异步I/O之glibc AIO篇
Linux下可用的异步I/O(以下简称为AIO)有两种,一种由Glibc实现提供,另一种是由Linux内核实现提供,而本文介绍的是glibc内的AIO。
在介绍AIO之前,需要对Linux上的各种不同I/O模型有一个较好的了解,而在这里:http://www.ibm.com/developerworks/cn/linux/l-async/,已经有了很详细的解释,这无需多说。
先来看示例,从感官上来进行认识:
/**
 * gcc glibc_aio_test.c -o glibc_aio_test -lrt
 */
#include <aio.h>
#include <errno.h>
#include <fcntl.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <malloc.h>

#define SIZE_TO_READ (100)

/*
 * Demo of the glibc POSIX AIO interface: submit one asynchronous read of
 * the first SIZE_TO_READ bytes of glibc_aio_test.c, poll the request once
 * per second until it is no longer in progress, then print what was read.
 *
 * Returns 0 when the read completed successfully, -1 on any failure.
 */
int main(void)
{
    int rc = -1;
    int fd;
    int status;
    ssize_t num;
    char *buffer;
    struct aiocb cb;

    /* open() and O_RDONLY come from <fcntl.h>; do not rely on <aio.h>
       pulling that header in implicitly. */
    if ((fd = open("glibc_aio_test.c", O_RDONLY)) == -1) {
        perror("Unable to open file.");
        return -1;
    }

    /* One extra zeroed byte keeps the data NUL-terminated for "%s". */
    if ((buffer = calloc(1, SIZE_TO_READ + 1)) == NULL) {
        perror("Malloc failed.");
        goto out_close;
    }

    /* Describe the request: same meaning as pread(fd, buf, count, offset). */
    memset(&cb, 0, sizeof cb);
    cb.aio_nbytes = SIZE_TO_READ;
    cb.aio_fildes = fd;
    cb.aio_offset = 0;
    cb.aio_buf = buffer;

    if (aio_read(&cb) == -1) {
        perror("Unable to create request.");
        goto out_free;
    }
    printf("Request enqueued.\n");

    /* Keep the last status so a failed request is not mistaken for success. */
    while ((status = aio_error(&cb)) == EINPROGRESS) {
        printf("Working...\n");
        sleep(1);
    }

    /* aio_return() yields a ssize_t and is called exactly once, after the
       request has left the EINPROGRESS state. */
    num = aio_return(&cb);
    if (status == 0 && num != -1) {
        printf("Success return: %zd.\n", num);
        printf("Data:\n%s\n", buffer);
        rc = 0;
    } else {
        printf("Error.\n");
    }

out_free:
    free(buffer);
out_close:
    close(fd);
    return rc;
}
在我的电脑上对上面示例进行编译、运行,结果如下:
[root@localhost aio]# uname -a Linux localhost.localdomain 2.6.32-220.el6.i686 #1 SMP Tue Dec 6 16:15:40 GMT 2011 i686 i686 i386 GNU/Linux [root@localhost aio]# cat /etc/issue CentOS release 6.2 (Final) Kernel \r on an \m [root@localhost aio]# rpm -q glibc glibc-2.12-1.47.el6.i686 [root@localhost aio]# gcc glibc_aio_test.c -o glibc_aio_test -lrt [root@localhost aio]# ./glibc_aio_test Request enqueued. Working... Success return: 100. Data: /** * gcc glibc_aio_test.c -o glibc_aio_test -lrt */ #include <aio.h> #include <errno.h> #inc [root@localhost aio]# hexdump -C -n 100 glibc_aio_test.c 00000000 2f 2a 2a 0d 0a 20 2a 20 67 63 63 20 67 6c 69 62 |/**.. * gcc glib| 00000010 63 5f 61 69 6f 5f 74 65 73 74 2e 63 20 2d 6f 20 |c_aio_test.c -o | 00000020 67 6c 69 62 63 5f 61 69 6f 5f 74 65 73 74 20 2d |glibc_aio_test -| 00000030 6c 72 74 0d 0a 20 2a 2f 0d 0a 23 69 6e 63 6c 75 |lrt.. */..#inclu| 00000040 64 65 20 3c 61 69 6f 2e 68 3e 0d 0a 23 69 6e 63 |de <aio.h>..#inc| 00000050 6c 75 64 65 20 3c 65 72 72 6e 6f 2e 68 3e 0d 0a |lude <errno.h>..| 00000060 23 69 6e 63 |#inc| 00000064 [root@localhost aio]#
回过头来看示例源代码,用到的几个AIO接口函数分别如下:
aio_read(),通过该函数告诉系统我们所请求IO操作的具体信息,这些信息存储在结构体aiocb变量cb的几个重要字段内:
aio_fildes – 文件描述符
aio_offset – 文件内偏移
aio_nbytes – 读取字节数
aio_buf – 数据存储区
可以把函数aio_read()与函数pread()对应起来理解:
ssize_t pread(int fd, void *buf, size_t count, off_t offset);
pread() reads up to count bytes from file descriptor fd at offset offset (from the start of the file) into
the buffer starting at buf. The file offset is not changed.
因此:aio_fildes == fd,aio_offset == offset,aio_nbytes == count,aio_buf == buf。
aio_error(),检查IO请求的当前状态,返回0表示请求处理完成,EINPROGRESS表示还在处理当中(即示例源码中while循环所轮询的状态),或其他一些表示错误的状态值。
aio_return(),检查IO请求的结果,返回实际读取的字节数或-1表示IO请求处理失败。
其他接口:
提交一个异步写
int aio_write(struct aiocb *aiocbp);
取消一个异步请求(或基于一个fd的所有异步请求,aiocbp==NULL)
int aio_cancel(int fildes, struct aiocb *aiocbp);
阻塞等待请求完成
int aio_suspend(const struct aiocb * const list[], int nent, const struct timespec *timeout);
所有这些接口的声明在头文件/usr/include/aio.h里可以看到。
下面具体来看一下Glibc的异步I/O实现,首先是aio_read接口定义:
glibc-2.17\sysdeps\pthread\aio_read.c
/* Submit the asynchronous read described by AIOCBP.  The request is
   passed to __aio_enqueue_request with opcode LIO_READ; the result is
   -1 when that call returns NULL and 0 otherwise.  */
int
aio_read (aiocbp)
     struct aiocb *aiocbp;
{
  if (__aio_enqueue_request ((aiocb_union *) aiocbp, LIO_READ) == NULL)
    return -1;

  return 0;
}
其中的__aio_enqueue_request()函数来自这里:
glibc-2.17\sysdeps\pthread\aio_misc.c
/* The main function of the async I/O handling.  It enqueues requests
   and if necessary starts and handles threads.

   AIOCBP is the control block of the request; OPERATION is the opcode
   that gets stored in its aio_lio_opcode field.  For LIO_SYNC and
   LIO_DSYNC the request priority is forced to 0; for every other
   operation aio_reqprio must lie in [0, AIO_PRIO_DELTA_MAX].

   Returns the request-list element created for the request, or NULL
   with errno set:
     EINVAL  - aio_reqprio is out of range;
     EAGAIN  - get_elem () could not provide a list element;
     other   - the code returned by aio_create_helper_thread when the
               helper thread could not be started and no thread is
               running (nthreads == 0).

   __aio_requests_mutex is taken before the request lists are touched
   and released on every return path after that point.  */
struct requestlist *
internal_function
__aio_enqueue_request (aiocb_union *aiocbp, int operation)
{
  int result = 0;
  int policy, prio;
  struct sched_param param;
  struct requestlist *last, *runp, *newp;
  int running = no;

  if (operation == LIO_SYNC || operation == LIO_DSYNC)
    aiocbp->aiocb.aio_reqprio = 0;
  else if (aiocbp->aiocb.aio_reqprio < 0
           || aiocbp->aiocb.aio_reqprio > AIO_PRIO_DELTA_MAX)
    {
      /* Invalid priority value.  */
      __set_errno (EINVAL);
      aiocbp->aiocb.__error_code = EINVAL;
      aiocbp->aiocb.__return_value = -1;
      return NULL;
    }

  /* Compute priority for this request: the caller's scheduling priority
     lowered by the requested delta.  */
  pthread_getschedparam (pthread_self (), &policy, &param);
  prio = param.sched_priority - aiocbp->aiocb.aio_reqprio;

  /* Get the mutex.  */
  pthread_mutex_lock (&__aio_requests_mutex);

  last = NULL;
  runp = requests;
  /* First look whether the current file descriptor is currently
     worked with.  The loop stops at the first entry whose descriptor
     is not smaller than ours, keeping LAST as its predecessor.  */
  while (runp != NULL
         && runp->aiocbp->aiocb.aio_fildes < aiocbp->aiocb.aio_fildes)
    {
      last = runp;
      runp = runp->next_fd;
    }

  /* Get a new element for the waiting list.  */
  newp = get_elem ();
  if (newp == NULL)
    {
      pthread_mutex_unlock (&__aio_requests_mutex);
      __set_errno (EAGAIN);
      return NULL;
    }
  newp->aiocbp = aiocbp;
#ifdef BROKEN_THREAD_SIGNALS
  newp->caller_pid = (aiocbp->aiocb.aio_sigevent.sigev_notify == SIGEV_SIGNAL
                      ? getpid () : 0);
#endif
  newp->waiting = NULL;

  /* Record scheduling data and mark the request as in progress.  */
  aiocbp->aiocb.__abs_prio = prio;
  aiocbp->aiocb.__policy = policy;
  aiocbp->aiocb.aio_lio_opcode = operation;
  aiocbp->aiocb.__error_code = EINPROGRESS;
  aiocbp->aiocb.__return_value = 0;

  if (runp != NULL
      && runp->aiocbp->aiocb.aio_fildes == aiocbp->aiocb.aio_fildes)
    {
      /* The current file descriptor is worked on.  It makes no sense
         to start another thread since this new thread would fight
         with the running thread for the resources.  But we also cannot
         say that the thread processing this descriptor shall immediately
         after finishing the current job process this request if there
         are other threads in the running queue which have a higher
         priority.  */

      /* Simply enqueue it after the running one according to the
         priority.  */
      last = NULL;
      while (runp->next_prio != NULL
             && runp->next_prio->aiocbp->aiocb.__abs_prio >= prio)
        {
          last = runp;
          runp = runp->next_prio;
        }

      newp->next_prio = runp->next_prio;
      runp->next_prio = newp;

      running = queued;
    }
  else
    {
      running = yes;
      /* Enqueue this request for a new descriptor.  */
      if (last == NULL)
        {
          /* Insert at the head of the descriptor list.  */
          newp->last_fd = NULL;
          newp->next_fd = requests;
          if (requests != NULL)
            requests->last_fd = newp;
          requests = newp;
        }
      else
        {
          /* Insert right after LAST.  */
          newp->next_fd = last->next_fd;
          newp->last_fd = last;
          last->next_fd = newp;
          if (newp->next_fd != NULL)
            newp->next_fd->last_fd = newp;
        }

      newp->next_prio = NULL;
      last = NULL;
    }

  if (running == yes)
    {
      /* We try to create a new thread for this file descriptor.  The
         function which gets called will handle all available requests
         for this descriptor and when all are processed it will
         terminate.

         If no new thread can be created or if the specified limit of
         threads for AIO is reached we queue the request.  */

      /* See if we need to and are able to create a thread.  */
      if (nthreads < optim.aio_threads && idle_thread_count == 0)
        {
          pthread_t thid;

          running = newp->running = allocated;

          /* Now try to start a thread.  */
          result = aio_create_helper_thread (&thid, handle_fildes_io, newp);
          if (result == 0)
            /* We managed to enqueue the request.  All errors which can
               happen now can be recognized by calls to `aio_return' and
               `aio_error'.  */
            ++nthreads;
          else
            {
              /* Reset the running flag.  The new request is not running.  */
              running = newp->running = yes;

              if (nthreads == 0)
                {
                  /* We cannot create a thread in the moment and there is
                     also no thread running.  This is a problem.  `errno' is
                     set to EAGAIN if this is only a temporary problem.  */
                  __aio_remove_request (last, newp, 0);
                }
              else
                /* Another thread exists; leave the request queued.  */
                result = 0;
            }
        }
    }

  /* Enqueue the request in the run queue if it is not yet running.  */
  if (running == yes && result == 0)
    {
      add_request_to_runlist (newp);

      /* If there is a thread waiting for work, then let it know that we
         have just given it something to do. */
      if (idle_thread_count > 0)
        pthread_cond_signal (&__aio_new_request_notification);
    }

  if (result == 0)
    newp->running = running;
  else
    {
      /* Something went wrong.  */
      __aio_free_request (newp);
      aiocbp->aiocb.__error_code = result;
      __set_errno (result);
      newp = NULL;
    }

  /* Release the mutex.  */
  pthread_mutex_unlock (&__aio_requests_mutex);

  return newp;
}
该函数以及相关函数的代码量有点多,具体不一一细说,但其做的事情可以总结如下(来自完全参考2):
1、异步请求被提交到request_queue中;
2、request_queue实际上是一个表结构,“行”是fd、“列”是具体的请求。也就是说,同一个fd的请求会被组织在一起;
3、异步请求有优先级概念,属于同一个fd的请求会按优先级排序,并且最终被按优先级顺序处理;
4、随着异步请求的提交,一些异步处理线程被动态创建。这些线程要做的事情就是从request_queue中取出请求,然后处理之;
5、为避免异步处理线程之间的竞争,同一个fd所对应的请求只由一个线程来处理;
6、异步处理线程同步地处理每一个请求,处理完成后在对应的aiocb中填充结果,然后触发可能的信号通知或回调函数(回调函数是需要创建新线程来调用的);
7、异步处理线程在完成某个fd的所有请求后,进入闲置状态;
8、异步处理线程在闲置状态时,如果request_queue中有新的fd加入,则重新投入工作,去处理这个新fd的请求(新fd和它上一次处理的fd可以不是同一个);
9、异步处理线程处于闲置状态一段时间后(没有新的请求),则会自动退出。等到再有新的请求时,再去动态创建;
总结:可以看到Glibc的异步I/O是一种在用户级(user-level)通过多线程来模拟实现的,它不是真正意义上的异步I/O,但正因为它只是一种模拟实现,使用的都是普通的技术(比如线程、非阻塞I/O),因此它适用于任何这些普通技术能够使用的场景,比如任意文件系统、任意操作系统、不要求direct IO;Glibc的异步I/O的缺点也很明显,性能相对真正意义上的异步I/O要差得多。
完全参考:
1,http://fwheel.net/aio.html
2,http://hi.baidu.com/_kouu/item/2b3cfecd49c17d10515058d9
3,http://stackoverflow.com/questions/8768083/difference-between-posix-aio-and-libaio-on-linux
转载请保留地址:http://lenky.info/archives/2013/01/01/2165 或 http://lenky.info/?p=2165
备注:如无特殊说明,文章内容均出自Lenky个人的真实理解而并非存心妄自揣测来故意愚人耳目。由于个人水平有限,虽力求内容正确无误,但仍然难免出错,请勿见怪,如果可以则请留言告之,并欢迎来讨论。另外值得说明的是,Lenky的部分文章以及部分内容参考借鉴了网络上各位网友的热心分享,特别是一些带有完全参考的文章,其后附带的链接内容也许更直接、更丰富,而我只是做了一下归纳&转述,在此也一并表示感谢。关于本站的所有技术文章,欢迎转载,但请遵从CC创作共享协议,而一些私人性质较强的心情随笔,建议不要转载。
法律:根据最新颁布的《信息网络传播权保护条例》,如果您认为本文章的任何内容侵犯了您的权利,请以电子邮件或书面等方式告知,本站将及时删除相关内容或链接。
@小聪明
新年快乐!:)
元旦也不休息,努力呀!