接上篇 linux IO —— 异步IO之 kernel aio

posix aio 和 kernel aio 的区别

前面说的 linux kernel aio,提供了一些内核函数来支持异步IO,性能很好,但是没有可移植性。还有一点是网上绝大部分文章都描述 linux kernel aio 只支持 DIRECT IO,这个与我在 linux 5.10 上测试的结果不符,可能是内核版本的原因。

而 posix aio 是c标准库 glibc 中,基于posix接口规范,在用户态用多线程实现的一个aio。它的优点是可移植性好,在满足posix规范的平台上基本上都能跑,缺点是基于多线程实现的性能较差,而且受限于线程池的大小,可能出现任务积压影响IO速度。

posix aio

posix aio api

见文档 https://man7.org/linux/man-pages/man7/aio.7.html

1
aio_read(3)
        Enqueue a read request.  This is the asynchronous analog
        of read(2).

aio_write(3)
        Enqueue a write request.  This is the asynchronous analog
        of write(2).

aio_fsync(3)
        Enqueue a sync request for the I/O operations on a file
        descriptor.  This is the asynchronous analog of fsync(2)
        and fdatasync(2).

aio_error(3)
        Obtain the error status of an enqueued I/O request.

aio_return(3)
        Obtain the return status of a completed I/O request.

aio_suspend(3)
        Suspend the caller until one or more of a specified set of
        I/O requests completes.

aio_cancel(3)
        Attempt to cancel outstanding I/O requests on a specified
        file descriptor.

lio_listio(3)
        Enqueue multiple I/O requests using a single function
        call.

编译时需要链接 -L/usr/lib64 -laio -lrt 这两个库。

基本使用流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
const int SEQ_BUF_SIZE = 1024;
char buf[SEQ_BUF_SIZE];
bzero(buf, SEQ_BUF_SIZE);
struct aiocb cb;
memset(&cb, 0, sizeof(struct aiocb));
int fd = open("./test.txt", O_RDONLY);

//step1. 创建 posix aio 回调对象
cb.aio_fildes = fd;
cb.aio_buf = buf;
cb.aio_nbytes = SEQ_BUF_SIZE;
cb.aio_offset = 0; // start offset

//step2. 创建异步读取任务
int ret = aio_read(&cb);
if (ret != 0){
fprintf(stderr, "aio_read() failed. errno = %d\n", errno);
return "";
}
//step3. 等待异步通知

异步通知方式

有很多方式可以花式判断 posix aio 任务的结束,和 kernel aio 一样,也是有办法通过 epoll 来监视异步IO任务的。

通过 sleep 判断

1
2
3
4
5
6
void CheckBySleep(aiocb &cb){
while (aio_error(&cb) == EINPROGRESS){
sleep(1);
}
fprintf(stdout, "%s\n", (char*)(cb.aio_buf));
}

通过 signal 通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* @brief 信号方式判断结束的handler
*
* @param sig
* @param info
* @param ucontext
*/

void SignalHandler(int sig, siginfo_t *info, void *ucontext){
printf("handle sig: %d\n", sig);
// handle sig: 10
// info->si_code:-4
// info->si_errno:0
// info->si_signo:10
std::cout << "info->si_code:" << info->si_code << std::endl;
std::cout << "info->si_errno:" << info->si_errno << std::endl;
std::cout << "info->si_signo:" << info->si_signo << std::endl;
//一定要设置 cb.aio_sigevent.sigev_value.sival_ptr = &cb, 否则拿不到二进制数据
aiocb *ptr = (aiocb *)info->si_value.sival_ptr;
printf("p=%d\n", ptr->aio_offset);
printf("p=%d\n", ptr->aio_nbytes);
printf("p=%d\n", ptr->aio_sigevent.sigev_signo);
printf("read=%s\n", (char *)ptr->aio_buf);
};

通过线程通知

1
2
3
4
5
6
7
8
9
10
/**
* @brief 通过线程的方式判断结束
* @param cb
*/

void CheckByThread(aiocb &cb){
cb.aio_sigevent.sigev_value.sival_ptr = &cb;
cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
cb.aio_sigevent.sigev_notify_function = ThreadHandler;
cb.aio_sigevent.sigev_notify_attributes = NULL;
}

通过 signalfd 阻塞读来通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
**
* @brief 通过信号fd读写来监听结束
* https://man7.org/linux/man-pages/man2/signalfd.2.html
* https://zhuanlan.zhihu.com/p/418256266
* @param cb
*/
void CheckBySignalFd(aiocb &cb){
//指定用信号的方式触发
cb.aio_sigevent.sigev_value.sival_ptr = &cb;//这个必须要填写,会在signalfd_siginfo中以ssi_ptr指针返回
cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
cb.aio_sigevent.sigev_signo = SIGUSR1;
//创建 signalfd
sigset_t mask;
// 信号清零
sigemptyset(&mask);
// 添加信号到掩码集
sigaddset(&mask, SIGUSR1);
// 设置该进程为对应的信号集的内容(当前已经的信号集合做并集、交集、覆盖)
// 这行代码才是真正的信号设置;
sigprocmask(SIG_BLOCK, &mask, NULL);

// 创建 signalfd 句柄(绑定信号)
int sfd = signalfd(-1, &mask, 0);
for (;;) {
// 阻塞读取 signalfd 数据(数据代表信号)
signalfd_siginfo sig;
size_t s = read(sfd, &sig, sizeof(struct signalfd_siginfo));
// ...
// 信号的逻辑处理
if(s > 0){
SignalFdHandler(sig.ssi_signo, (aiocb*)sig.ssi_ptr, nullptr);
} else {
printf("get signal size %d\n", s);
}
}
}

通过 signalfd+epoll 来通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* @brief 将 posix aio 的 signal 模式下的 signal 转换为 signalfd,这个fd可以用epoll来监听
* 普通的ext4文件系统的文件不能用 epoll 来监听,因为没有提供poll接口,参考 https://cloud.tencent.com/developer/article/1835294
* @param cb
*/

void CheckByEpoll(aiocb &cb){
//指定用信号的方式触发
cb.aio_sigevent.sigev_value.sival_ptr = &cb;//这个必须要填写,会在signalfd_siginfo中以ssi_ptr指针返回
cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
cb.aio_sigevent.sigev_signo = SIGUSR1;
//创建 signalfd
sigset_t mask;
// 信号清零
sigemptyset(&mask);
// 添加信号到掩码集
sigaddset(&mask, SIGUSR1);
// 设置该进程为对应的信号集的内容(当前已经的信号集合做并集、交集、覆盖)
// 这行代码才是真正的信号设置;
sigprocmask(SIG_BLOCK, &mask, NULL);

// 创建 signalfd 句柄(绑定信号),用于epoll下可以考虑设置为非阻塞fd
int sfd = signalfd(-1, &mask, SFD_NONBLOCK);

printf("handle aio by signalfd \n");
//创建epollfd
const int MAX_EVENT_NUM = 1;
int efd = epoll_create(MAX_EVENT_NUM);
//添加sfd到epoll的监听中
epoll_event in_ev{EPOLLIN, {.fd=sfd}};
epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &in_ev);
//阻塞,监听epoll返回
epoll_event out_ev;
int ret = epoll_wait(efd, &out_ev, MAX_EVENT_NUM, -1);
if(ret <=0){
printf(" epoll fail with ret: %d \n", ret);
return;
}
//若out_ev里边有可写事件,尝试读取数据
if(EPOLLIN & out_ev.events){
signalfd_siginfo sig;
size_t s = read(sfd, &sig, sizeof(struct signalfd_siginfo));
// ...
// 信号的逻辑处理
if(s > 0){
SignalFdHandler(sig.ssi_signo, (aiocb*)sig.ssi_ptr, nullptr);
} else {
printf("get signal size %d\n", s);
}
}
}

练习代码

练习代码仓库:https://github.com/zouchengzhuo/linux-io-learn/tree/master/4.aio/glibc_aio

参考文档:

☞ 参与评论