Linux native AIO与eventfd、epoll的结合使用
在前面的示例libaio_test.c和native_aio_test.c中,可以看到对磁盘aio请求(本文的aio都指此类)的使用有阻塞等待,这明显之处为对io_getevents()函数(当然,其它函数,比如io_submit()也有一定程度的阻塞)的调用,它会等待并获取已完成的io请求,如果当前没有或少于指定数目的io请求完成,那么就会等待直到timeout。
io_getevents()函数的等待会导致整个进程的阻塞使得程序无法继续向下执行,如果程序还有其它阻塞点,那么有必要想办法把这多处等待合而为一同时进行,从而提高并行性,也就是通常所说的select/epoll等这类多路复用技术。
本文就以epoll为例,介绍一下在linux下,如何把aio结合并应用到epoll机制里。我们知道,epoll机制的最大好处就是它能够在同一时刻对多个文件描述符(通常是由众多套接字形成的描述符集合)进行监听,并将其上发生的读/写(或错误等)事件通知给应用程序,也就是做到时间上的复用。如果能够把aio也放到epoll机制里,即把aio当作epoll机制里的“一路io”,那么就能使得aio与其它可能的等待操作(比如:读/写套接字)共同工作,从而达到时间复用的目的。
作为epoll机制里的“一路io”,需要一个文件描述符来反馈对应的发生事件,而对于纯aio而言,是没有文件描述符作为代表的,因此linux系统上多出了一个eventfd()的系统调用:
#include <sys/eventfd.h> int eventfd(unsigned int initval, int flags);
当然,这个系统调用是否就是因此原因才出现,我不得而知(也没去细查),但要把aio应用到epoll机制里,的确少不了它。从man手册http://man7.org/linux/man-pages/man2/eventfd.2.html可以看到,eventfd()函数的作用是提供一种让内核通知应用程序有事件发生的机制。根据给定参数的不同,对eventfd进行read()的语义也有所不同,看本文aio应用的场景情况:
int efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
对该描述符efd进行read(),如果读取成功,那么将返回8-byte的整型数据,而该数据也就是表示已经完成的aio请求个数。
充当中间桥梁的eventfd有了,并且eventfd()函数返回的描述符可以添加到epoll机制内,因此剩下需要做的就是把eventfd与aio联系起来,而目前aio当然已经有了这个支持,不过,由于native aio的相关结构体有两套封装,即一种是libaio的封装,一种是内核的直接封装(便于直接使用aio),比如iocb:
libaio的封装(来之:/usr/include/libaio.h):
struct io_iocb_common { PADDEDptr(void *buf, __pad1); PADDEDul(nbytes, __pad2); long long offset; long long __pad3; unsigned flags; unsigned resfd; }; /* result code is the amount read or -'ve errno */ struct io_iocb_vector { const struct iovec *vec; int nr; long long offset; }; /* result code is the amount read or -'ve errno */ struct iocb { PADDEDptr(void *data, __pad1); /* Return in the io completion event */ PADDED(unsigned key, __pad2); /* For use in identifying io requests */ short aio_lio_opcode; short aio_reqprio; int aio_fildes; union { struct io_iocb_common c; struct io_iocb_vector v; struct io_iocb_poll poll; struct io_iocb_sockaddr saddr; } u; };
内核的封装(来之:/usr/include/linux/aio_abi.h或/usr/src/linux-2.6.38.8/include/linux/aio_abi.h):
/*
* we always use a 64bit off_t when communicating
* with userland. its up to libraries to do the
* proper padding and aio_error abstraction
*/
struct iocb {
/* these are internal to the kernel/libc. */
__u64 aio_data; /* data to be returned in event's data */
__u32 PADDED(aio_key, aio_reserved1);
/* the kernel sets aio_key to the req # */
/* common fields */
__u16 aio_lio_opcode; /* see IOCB_CMD_ above */
__s16 aio_reqprio;
__u32 aio_fildes;
__u64 aio_buf;
__u64 aio_nbytes;
__s64 aio_offset;
/* extra parameters */
__u64 aio_reserved2; /* TODO: use this for a (struct sigevent *) */
/* flags for the "struct iocb" */
__u32 aio_flags;
/*
* if the IOCB_FLAG_RESFD flag of "aio_flags" is set, this is an
* eventfd to signal AIO readiness to
*/
__u32 aio_resfd;
}; /* 64 bytes */
两个结构体是等价的,只是字段名称有所不同而已,此处仅看内核封装的情况(后续将提到nginx对aio的使用实现,而nginx是采用的就是syscall手动封装),有一段很明显的英文注释出卖了aio对eventfd的使用支持,即两个字段:aio_flags与aio_resfd,详细来说就是将aio_flags打上IOCB_FLAG_RESFD标记并且将eventfd()函数返回的描述符设置到aio_resfd即可。
废话少说,看两个示例,第一个来之:http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html
#define _GNU_SOURCE #define __STDC_FORMAT_MACROS #include <stdio.h> #include <errno.h> #include <libaio.h> #include <sys/eventfd.h> #include <sys/epoll.h> #include <stdlib.h> #include <sys/types.h> #include <unistd.h> #include <stdint.h> #include <sys/stat.h> #include <fcntl.h> #include <inttypes.h> #define TEST_FILE "aio_test_file" #define TEST_FILE_SIZE (127 * 1024) #define NUM_EVENTS 128 #define ALIGN_SIZE 512 #define RD_WR_SIZE 1024 struct custom_iocb { struct iocb iocb; int nth_request; }; void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2) { struct custom_iocb *iocbp = (struct custom_iocb *)iocb; printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n", iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE", iocb->u.c.offset, iocb->u.c.nbytes, res, res2); } int main(int argc, char *argv[]) { int efd, fd, epfd; io_context_t ctx; struct timespec tms; struct io_event events[NUM_EVENTS]; struct custom_iocb iocbs[NUM_EVENTS]; struct iocb *iocbps[NUM_EVENTS]; struct custom_iocb *iocbp; int i, j, r; void *buf; struct epoll_event epevent; efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (efd == -1) { perror("eventfd"); return 2; } fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644); if (fd == -1) { perror("open"); return 3; } ftruncate(fd, TEST_FILE_SIZE); ctx = 0; if (io_setup(8192, &ctx)) { perror("io_setup"); return 4; } if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) { perror("posix_memalign"); return 5; } printf("buf: %p\n", buf); for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) { iocbps[i] = &iocbp->iocb; io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE); io_set_eventfd(&iocbp->iocb, efd); io_set_callback(&iocbp->iocb, aio_callback); iocbp->nth_request = i + 1; } if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) { perror("io_submit"); return 6; } epfd = epoll_create(1); if (epfd == -1) { perror("epoll_create"); return 7; } epevent.events = EPOLLIN | EPOLLET; epevent.data.ptr = NULL; if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) { perror("epoll_ctl"); return 8; } i = 0; while (i < NUM_EVENTS) { uint64_t finished_aio; if (epoll_wait(epfd, &epevent, 1, -1) != 1) { perror("epoll_wait"); return 9; } if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) { perror("read"); return 10; } printf("finished io number: %"PRIu64"\n", finished_aio); while (finished_aio > 0) { tms.tv_sec = 0; tms.tv_nsec = 0; r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms); if (r > 0) { for (j = 0; j < r; ++j) { ((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2); } i += r; finished_aio -= r; } } } close(epfd); free(buf); io_destroy(ctx); close(fd); close(efd); remove(TEST_FILE); return 0; }
编译执行,OK无误(特别注意:上面示例代码仅只是演示aio+eventfd+epoll的使用,而细节部分是有严重bug的,比如所有请求共用一个缓存区buf):
[root@www 1]# gcc t.c -laio [root@www 1]# ./a.out
上面示例采用了libaio库,试试syscall简单封装(由上面示例修改而来):
/** * gcc aio_eventfd_epoll.c -o aio_eventfd_epoll * ref: http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html * modified by: http://lenky.info/ */ #define _GNU_SOURCE #define __STDC_FORMAT_MACROS #include <sys/epoll.h> #include <stdio.h> /* for perror() */ #include <unistd.h> /* for syscall() */ #include <sys/syscall.h> /* for __NR_* definitions */ #include <linux/aio_abi.h> /* for AIO types and constants */ #include <fcntl.h> /* O_RDWR */ #include <string.h> /* memset() */ #include <inttypes.h> /* uint64_t */ #include <stdlib.h> #define TEST_FILE "aio_test_file" #define TEST_FILE_SIZE (128 * 1024) #define NUM_EVENTS 128 #define ALIGN_SIZE 512 #define RD_WR_SIZE 1024 inline int io_setup(unsigned nr, aio_context_t *ctxp) { return syscall(__NR_io_setup, nr, ctxp); } inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) { return syscall(__NR_io_submit, ctx, nr, iocbpp); } inline int io_getevents(aio_context_t ctx, long min_nr, long max_nr, struct io_event *events, struct timespec *timeout) { return syscall(__NR_io_getevents, ctx, min_nr, max_nr, events, timeout); } inline int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } inline int eventfd2(unsigned int initval, int flags) { return syscall(__NR_eventfd2, initval, flags); } struct custom_iocb { struct iocb iocb; int nth_request; }; typedef void io_callback_t(aio_context_t ctx, struct iocb *iocb, long res, long res2); void aio_callback(aio_context_t ctx, struct iocb *iocb, long res, long res2) { struct custom_iocb *iocbp = (struct custom_iocb *)iocb; printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n", iocbp->nth_request, (iocb->aio_lio_opcode == IOCB_CMD_PREAD) ? "READ" : "WRITE", iocb->aio_offset, iocb->aio_nbytes, res, res2); } int main(int argc, char *argv[]) { int efd, fd, epfd; aio_context_t ctx; struct timespec tms; struct io_event events[NUM_EVENTS]; struct custom_iocb iocbs[NUM_EVENTS]; struct iocb *iocbps[NUM_EVENTS]; struct custom_iocb *iocbp; int i, j, r; void *buf; void *aio_buf; struct epoll_event epevent; efd = eventfd2(0, O_NONBLOCK | O_CLOEXEC); if (efd == -1) { perror("eventfd2"); return 2; } fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644); if (fd == -1) { perror("open"); return 3; } ftruncate(fd, TEST_FILE_SIZE); ctx = 0; if (io_setup(NUM_EVENTS, &ctx)) { perror("io_setup"); return 4; } if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE * NUM_EVENTS)) { perror("posix_memalign"); return 5; } printf("buf: %p\n", buf); for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) { aio_buf = (void *)((char *)buf + (i*RD_WR_SIZE)); memset(aio_buf, 0, RD_WR_SIZE); //io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE); iocbp->iocb.aio_fildes = fd; iocbp->iocb.aio_lio_opcode = IOCB_CMD_PREAD; iocbp->iocb.aio_buf = (uint64_t)aio_buf; iocbp->iocb.aio_offset = i * RD_WR_SIZE; iocbp->iocb.aio_nbytes = RD_WR_SIZE; //io_set_eventfd(&iocbp->iocb, efd); iocbp->iocb.aio_flags = IOCB_FLAG_RESFD; iocbp->iocb.aio_resfd = efd; //io_set_callback(&iocbp->iocb, aio_callback); iocbp->iocb.aio_data = (__u64)aio_callback; iocbp->nth_request = i + 1; iocbps[i] = &iocbp->iocb; } if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) { perror("io_submit"); return 6; } epfd = epoll_create(1); if (epfd == -1) { perror("epoll_create"); return 7; } epevent.events = EPOLLIN | EPOLLET; epevent.data.ptr = NULL; if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) { perror("epoll_ctl"); return 8; } i = 0; while (i < NUM_EVENTS) { uint64_t finished_aio; if (epoll_wait(epfd, &epevent, 1, -1) != 1) { perror("epoll_wait"); return 9; } if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) { perror("read"); return 10; } printf("finished io number: %"PRIu64"\n", finished_aio); while (finished_aio > 0) { tms.tv_sec = 0; tms.tv_nsec = 0; r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms); if (r > 0) { for (j = 0; j < r; ++j) { ((io_callback_t *)(events[j].data))(ctx, (struct iocb *)events[j].obj, events[j].res, events[j].res2); } i += r; finished_aio -= r; } } } close(epfd); free(buf); io_destroy(ctx); close(fd); close(efd); remove(TEST_FILE); return 0; }
转载请保留地址:http://lenky.info/archives/2013/01/09/2183 或 http://lenky.info/?p=2183
备注:如无特殊说明,文章内容均出自Lenky个人的真实理解而并非存心妄自揣测来故意愚人耳目。由于个人水平有限,虽力求内容正确无误,但仍然难免出错,请勿见怪,如果可以则请留言告之,并欢迎来讨论。另外值得说明的是,Lenky的部分文章以及部分内容参考借鉴了网络上各位网友的热心分享,特别是一些带有完全参考的文章,其后附带的链接内容也许更直接、更丰富,而我只是做了一下归纳&转述,在此也一并表示感谢。关于本站的所有技术文章,欢迎转载,但请遵从CC创作共享协议,而一些私人性质较强的心情随笔,建议不要转载。
法律:根据最新颁布的《信息网络传播权保护条例》,如果您认为本文章的任何内容侵犯了您的权利,请以或书面等方式告知,本站将及时删除相关内容或链接。