接上篇 linux IO —— 异步IO之 kernel aio
posix aio 和 kernel aio 的区别 前面说的 linux kernel aio,提供了一些内核函数来支持异步IO,性能很好,但是没有可移植性。还有一点是网上绝大部分文章都描述 linux kernel aio 只支持 DIRECT IO,这个与我在 linux 5.10 上测试的结果不符,可能是内核版本的原因。
而 posix aio 是c标准库 glibc 中,基于posix接口规范,在用户态用多线程实现的一个aio。它的优点是可移植性好,在满足posix规范的平台上基本上都能跑,缺点是基于多线程实现的性能较差,而且受限于线程池的大小,可能出现任务积压影响IO速度。
posix aio posix aio api 见文档 https://man7.org/linux/man-pages/man7/aio.7.html
1 aio_read(3)
Enqueue a read request. This is the asynchronous analog
of read(2).
aio_write(3)
Enqueue a write request. This is the asynchronous analog
of write(2).
aio_fsync(3)
Enqueue a sync request for the I/O operations on a file
descriptor. This is the asynchronous analog of fsync(2)
and fdatasync(2).
aio_error(3)
Obtain the error status of an enqueued I/O request.
aio_return(3)
Obtain the return status of a completed I/O request.
aio_suspend(3)
Suspend the caller until one or more of a specified set of
I/O requests completes.
aio_cancel(3)
Attempt to cancel outstanding I/O requests on a specified
file descriptor.
lio_listio(3)
Enqueue multiple I/O requests using a single function
call.
编译时需要链接 -L/usr/lib64 -laio -lrt
这两个库。
基本使用流程 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 const int SEQ_BUF_SIZE = 1024 ;char buf[SEQ_BUF_SIZE];bzero(buf, SEQ_BUF_SIZE); struct aiocb cb;memset (&cb, 0 , sizeof (struct aiocb));int fd = open("./test.txt" , O_RDONLY);cb.aio_fildes = fd; cb.aio_buf = buf; cb.aio_nbytes = SEQ_BUF_SIZE; cb.aio_offset = 0 ; int ret = aio_read(&cb);if (ret != 0 ){ fprintf (stderr , "aio_read() failed. errno = %d\n" , errno); return "" ; }
异步通知方式 有很多方式可以花式判断 posix aio 任务的结束,和 kernel aio 一样,也是有办法通过 epoll 来监视异步IO任务的。
通过 sleep 判断 1 2 3 4 5 6 void CheckBySleep (aiocb &cb) { while (aio_error(&cb) == EINPROGRESS){ sleep(1 ); } fprintf (stdout , "%s\n" , (char *)(cb.aio_buf)); }
通过 signal 通知 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 * @brief 信号方式判断结束的handler * * @param sig * @param info * @param ucontext */ void SignalHandler (int sig, siginfo_t *info, void *ucontext) { printf ("handle sig: %d\n" , sig); std ::cout << "info->si_code:" << info->si_code << std ::endl; std ::cout << "info->si_errno:" << info->si_errno << std ::endl; std ::cout << "info->si_signo:" << info->si_signo << std ::endl; aiocb *ptr = (aiocb *)info->si_value.sival_ptr; printf ("p=%d\n" , ptr->aio_offset); printf ("p=%d\n" , ptr->aio_nbytes); printf ("p=%d\n" , ptr->aio_sigevent.sigev_signo); printf ("read=%s\n" , (char *)ptr->aio_buf); };
通过线程通知 1 2 3 4 5 6 7 8 9 10 * @brief 通过线程的方式判断结束 * @param cb */ void CheckByThread (aiocb &cb) { cb.aio_sigevent.sigev_value.sival_ptr = &cb; cb.aio_sigevent.sigev_notify = SIGEV_THREAD; cb.aio_sigevent.sigev_notify_function = ThreadHandler; cb.aio_sigevent.sigev_notify_attributes = NULL ; }
通过 signalfd 阻塞读来通知 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 ** * @brief 通过信号fd读写来监听结束 * https: * https: * @param cb */ void CheckBySignalFd (aiocb &cb) { cb.aio_sigevent.sigev_value.sival_ptr = &cb; cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; cb.aio_sigevent.sigev_signo = SIGUSR1; sigset_t mask; sigemptyset(&mask); sigaddset(&mask, SIGUSR1); sigprocmask(SIG_BLOCK, &mask, NULL ); int sfd = signalfd(-1 , &mask, 0 ); for (;;) { signalfd_siginfo sig; size_t s = read(sfd, &sig, sizeof (struct signalfd_siginfo)); if (s > 0 ){ SignalFdHandler(sig.ssi_signo, (aiocb*)sig.ssi_ptr, nullptr ); } else { printf ("get signal size %d\n" , s); } } }
通过 signalfd+epoll 来通知 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 * @brief 将 posix aio 的 signal 模式下的 signal 转换为 signalfd,这个fd可以用epoll来监听 * 普通的ext4文件系统的文件不能用 epoll 来监听,因为没有提供poll接口,参考 https://cloud.tencent.com/developer/article/1835294 * @param cb */ void CheckByEpoll (aiocb &cb) { cb.aio_sigevent.sigev_value.sival_ptr = &cb; cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL; cb.aio_sigevent.sigev_signo = SIGUSR1; sigset_t mask; sigemptyset(&mask); sigaddset(&mask, SIGUSR1); sigprocmask(SIG_BLOCK, &mask, NULL ); int sfd = signalfd(-1 , &mask, SFD_NONBLOCK); printf ("handle aio by signalfd \n" ); const int MAX_EVENT_NUM = 1 ; int efd = epoll_create(MAX_EVENT_NUM); epoll_event in_ev{EPOLLIN, {.fd=sfd}}; epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &in_ev); epoll_event out_ev; int ret = epoll_wait(efd, &out_ev, MAX_EVENT_NUM, -1 ); if (ret <=0 ){ printf (" epoll fail with ret: %d \n" , ret); return ; } if (EPOLLIN & out_ev.events){ signalfd_siginfo sig; size_t s = read(sfd, &sig, sizeof (struct signalfd_siginfo)); if (s > 0 ){ SignalFdHandler(sig.ssi_signo, (aiocb*)sig.ssi_ptr, nullptr ); } else { printf ("get signal size %d\n" , s); } } }
练习代码 练习代码仓库:https://github.com/zouchengzhuo/linux-io-learn/tree/master/4.aio/glibc_aio
参考文档:
本文链接:https://www.zoucz.com/blog/2022/06/07/e5d88170-e5b7-11ec-9fe7-534bbf9f369d/