1. libevent库是什么

1.1 定义

libevent 是一个高性能的事件通知库,提供了一种机制,在文件描述符上发生特定事件或超时后执行回调函数。它抽象了底层的事件通知机制(如 epoll、kqueue 或 select),并在不同平台上提供了一致的接口。常用于网络服务器和其他需要高效事件驱动编程的应用场景。同类型的异步事件库还有 libev,以及 nodejs 中使用的 libuv 等。

1.2 前置技能

阻塞/非阻塞IO和多路复用

正如定义中所说,要理解 libevent 的工作原理,首先需要理解一下基于文件描述符的阻塞/非阻塞IO、epoll/kqueue/select 等多路复用的原理。

可以参考:

reactor模式

在前面一小节提到的文章中,可以看到基于 epoll 等方案实现多路复用的代码中,业务代码需要做非常多的事情,如事件注册、回调函数&数据管理、循环等待等。reactor模式可以通过一定的封装,为业务提供一个基础库,让业务代码的异步调用变得更简单,很多常见的开源软件很多都采用了这个设计思路,如 redis、nginx、netty 等等。

reactor,翻译过来就是“反应器”,还挺直观的 —— 对某事件做出反应,执行对应的操作。

我觉得讲的比较清楚的一篇文章:高性能网络模式:Reactor 和 Proactor (本小节的图都来自这篇文章)

单reactor线程:

image.png

可以通过单 reactor 线程的示意图来简单理解 reactor 模式的工作原理:通过 多路复用库(如select)的事件等待机制,将不同的事件(可读、可写),分发(dispatch)给不同的处理器(acceptor / handler)进行处理(accept、read、send)。

单Reactor多线程:

image.png

如果要避免单线程中业务处理逻辑阻塞 eventloop,可以使用单 reactor 多线程模式,即一个 reactor 线程用来处理事件分发,具体的业务逻辑由 worker 线程池来完成。

主从reactor多线程:

image.png

更进一步,还可以把 server socket 的可读事件(即建立连接),和 client socket 的读写事件分为多个 reactor 线程来处理。主 reactor 收到 client fd 后,即分发给子 reactor,由子 reactor 来管理 client fd 的可读、可写事件,同时把具体的业务逻辑交给 worker 线程池来完成。

Proactor模式

image.png

和 reacotr 模式差不多,区别是 reactor 是基于同步IO,内核在数据可读写时通知用户,由用户完成数据的读写,而异步IO是内核完成读写后通知用户。

2. libevent 1.1b 的使用

为了理解 libevent 的基本原理,我选择了它最早的一个子版本 tag 来分析,此版本中已经包含的核心的 reactor 逻辑,没有后续版本的优化,用来看源码非常合适。

https://github.com/libevent/libevent/tree/release-1.1b

2.1 epoll不同封装程度实现异步事件

前面 “前置技能” 的描述还是有点抽象,我们可以看一下,基于 epoll 做不同程度的封装来实现异步事件,是什么样的编码体验。下面的例子中,我们需要监听一个 fd 上的可读可写事件,事件触发后执行相关操作。

直接使用 epoll

直接使用 epoll,我们需要自己管理 epoll fd、自己管理事件循环、自己管理回调参数。下面是一部分伪代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//step1.创建一个epoll,后面的size是给系统的建议初始size,并不是指定固定的size
int efd = epoll_create(10);
//step2.向epoll中注册server fd
epoll_event ev{ EPOLLIN|EPOLLOUT, {.fd=sfd} };
epoll_ctl(efd, EPOLL_CTL_ADD, target_fd, &ev);
//step3.循环阻塞等待 epoll 事件
epoll_event evs[EPOLL_MAX_EVENTS];
while (1)
{
ev_num = epoll_wait(efd, evs, EPOLL_MAX_EVENTS, -1);
while (i < ev_num)
{
epoll_event *c_ev = &evs[i];
// handle event ...
// 这里需要根据读写事件,甚至 fd 的值,来自行判断应该执行如何操作,如:
// server socket fd 可读,执行 accept 操作
// client socket fd 可读,执行从远端读取数据的操作
// client socket fd 可写,执行向远端写入数据的操作
}
}

自行封装简单的 reactor 模式

epoll_event 结构体中,有个 epoll_data 结构体,可以存储一个指针/数值信息

1
2
3
4
5
6
7
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

可以基于这个参数简单封装,将读写等事件触发后的回调函数(或者如回调参数等更多信息)保存在这个数据中。

1
2
3
4
5
6
7
8
9
10
11
12
//step1.创建一个epoll
int efd = epoll_create(10);
//step2.向epoll中注册server fd
epoll_event ev{ EPOLLIN|EPOLLOUT, {.fd=sfd} };
// 封装通过 epoll_ctl 绑定事件,并将 handler 写入 epoll_event 的 data 属性
bind_reacotr_event(&ev, sfd, handler);
//step3. 执行 eventloop 逻辑
while (1)
{
// 封装了循环 epoll_wait 阻塞等待事件出发 —— 从 epoll_event 的 data 属性中获取回调 —— 执行handler 的逻辑
dispatch(efd);
}

使用 libevent 1.1b

使用 libevent 来实现可以让代码变得更加简单,libevent 将 epoll 相关操作的逻辑封装在了内部,并做了平台兼容,根据不同的运行平台选择合适的库。

1
2
3
4
5
6
7
8
// step1. 创建 event_base
event_base *base = (event_base*)event_init();
// step2. 创建 event 对象,设置其监听的事件、回调函数、回调参数,并加入 libevent 的监听
event ev;
event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL);
event_add(&ev, NULL);
// step3. 执行 eventloop 逻辑
event_base_dispatch(base);

相对最初的直接使用 epoll 的版本,是不是直接又清晰了?

2.2 编译安装

编译运行环境为 centos7,1.1b是个非常老的版本了,编译安装的过程中,得安装一些依赖工具。

1
yum install -y autoconf automake libtool

然后按下面的步骤操作,即可。

  • 将 configure.in 重命名为 configure.ac
  • 运行 aclocal 生成 aclocal.m4 文件
  • 运行 autoconf 生成 configure 文件
  • 运行 automake --add-missing,会报错,缺少 Makefile.in 和 config.h.in
  • 运行 autoconf -i 重新生成 Makefile.in 等文件
  • 运行 automake --add-missing
  • 运行 ./configure,生成 makefile
  • make,编译完成
  • make install 安装

安装成功后,可以在 /usr/local/lib 中找到 libevent 库文件,/usr/local/include 中找到 event.h 头文件。

2.3 处理各种事件的示例

研究源码之前,可以先看一下我们可以用 libevent 来做些什么,具体来说就是可以处理哪些类型的事件,然后按图索骥,从源码来分析是如何实现这些事件的处理的。

IO

下面是一个使用 libevent 来处理网络IO的示例,server socket 监听起来后,将其加入 libevent 的监听,在有新的客户端连接建立时,将 client fd 也加入到 libevent 的监听,并处理 client socket 上的读写事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <event.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <strings.h>
#include <errno.h>

#define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)
#define SERVER_PORT 8888
#define BUFFER_SIZE 1024

void read_cb(int cfd, short e, void *arg) {
char buf[1024];
while (true)
{
bzero(buf, sizeof(buf));
int ret = recv(cfd, buf, sizeof(buf), 0);
if(ret == 0) {
// 断开连接,清空 libevent 监听
event_del((event*)arg);
close(cfd);
break;
}
if(ret == -1){
// 读完了数据,直接退出
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("recv client fd data error");
event_del((event*)arg);
close(cfd);
free(arg);
break;
}
printf("<---: %s \n", buf);
ret = send(cfd, buf, sizeof(buf), 0);
if(ret !=0 ) perror("send data error");
printf("--->: %s \n", buf);
}
}

void connect_cb(int sfd, short e, void *arg) {
// libevent 1.1b 中使用的是 水平触发 模式,必须把值给一次性读完,不然一直触发
while (true)
{
// 接收 client fd
sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int cfd = accept(sfd, (sockaddr*)&addr, &addr_len);
if(cfd == -1) {
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("accept from server fd error");
break;
}
int flags = fcntl(cfd, F_GETFL, 0);
fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
// 将 client fd 加入到 libevent 监听中
// event ev; // 这种方式会报错,因为 connect_cb 执行完毕后 ev 就会被释放
struct event *ev = (event*)malloc(sizeof(struct event));
event_set(ev, cfd, EV_READ|EV_PERSIST, read_cb, ev);
event_add(ev, NULL);
}
}

int main(){

// 创建一个 server socket,并监听端口
int sfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_family = AF_INET;
addr.sin_port = htons(SERVER_PORT);
int ret = bind(sfd, (sockaddr*)&addr, sizeof(addr));
if(ret != 0) handle_error("bind fail");
ret = listen(sfd, 10);
if(ret != 0) handle_error("listen fail");

// 创建 event_base
event_base *base = (event_base*)event_init();

// server fd 加入监听,将 event_base 传入,以将 client fd 也加入监听
event ev;
event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL);
event_add(&ev, NULL);

// 进入事件循环
event_base_dispatch(base);
printf("exit loop\n");
return 0;
}

timerfd

下面是一个使用 libevent 处理 timerfd 事件的例子,将 timerfd 加入 libevent 的持续监听,每隔一段时间触发一次回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/timerfd.h>
#include <event.h>

void timer_cb(int fd, short event, void *arg) {
// libevent 1.1b 中使用的是 水平触发 模式,必须把值给一次性读完,不然一直触发
while (true)
{
u_int64_t count;
int ret = read(fd, &count, sizeof(count));
if(ret == -1){
// perror("read timerfd error");
break;
}
printf("read timeout count: %u\n", count);
}
}

int main() {
// 初始化 libevent
event_base *base = (event_base*)event_init();
// 创建 timerfd
int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
// 设置 timerfd 的超时时间
itimerspec ts = {{2, 0}, {6, 0}};
timerfd_settime(tfd, 0, &ts, NULL);
// 定义 event 结构体
event ev;
// 为 event 设置触发事件的fd、触发的目标事件、触发后的回调
event_set(&ev, tfd, EV_READ | EV_PERSIST, timer_cb, NULL);
// 添加 event 到 event_base
event_add(&ev, NULL);
// 进入事件循环
event_base_dispatch(base);
printf("exit loop\n");
return 0;
}

signal

下面是一个使用 libevent 处理进程信号的例子。运行过程中使用 kill 发送监听的信号值,即可触发回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <event.h>
#include <signal.h>
#include <sys/signalfd.h>
#include <stdio.h>
#include <event.h>
#include <signal.h>

void signal_cb(int fd, short event, void *arg)
{

struct event *sig = (struct event*)arg;
printf("caught an signal %d \n", fd);
// printf("Caught an interrupt signal; exiting cleanly in two seconds.\n");
// event_base_loopexit(sig->ev_base, NULL);
}

int main(int argc, char **argv)
{

struct event signal_int;
struct timeval tv;

/* 初始化 libevent */
struct event_base *base = (event_base*)event_init();

/* 设置信号处理函数 */
// event_set(&signal_int, SIGINT, EV_SIGNAL|EV_PERSIST, signal_cb, &signal_int);
event_set(&signal_int, SIGRTMIN + 10, EV_SIGNAL|EV_PERSIST, signal_cb, &signal_int);

/* 添加事件 */
event_add(&signal_int, NULL);

/* 进入事件循环 */
event_base_dispatch(base);

return 0;
}

timeout

监听事件时,可以添加一个超时事件,一旦超时时间到,则会调用一次回调,回调中的 event 设置为 EV_TIMEOUT,并将事件从监听中移除。在前面 timerfd 的示例基础上,加入超时时间:

1
2
timeval timeout = {1, 0};
event_add(&ev, &timeout);

回调中可以判断触发的事件类型

1
2
3
4
5
6
void timer_cb(int fd, short event, void *arg) {
if (event & EV_TIMEOUT) {
printf(" event timeout \n");
return;
}
}

evbuffer 和 bufferevent 管理内存缓冲区

前面处理 IO 的例子中,是自行管理的 client fd 上的读写事件,实际上 libevent 还提供了一个 evbuffer 和 bufferevent 来管理内存缓冲区和 fd 上的读写事件。

把 IO 例子中的 fd 读写操作替换为 evbuffer 和 bufferevent:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <event.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <strings.h>
#include <errno.h>

#define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)
#define SERVER_PORT 8888
#define BUFFER_SIZE 1024

void read_cb(bufferevent* bufev, void* carg){
// 触发可读事件后,尝试读取数据
char* buf = (char*)carg;
int size = bufferevent_read(bufev, buf, BUFFER_SIZE);
if(size > 0) {
// 读到了数据,回写
printf("<---: %s \n", buf);
bufferevent_write(bufev, buf, size);
printf("--->: %s \n", buf);
}
}

void write_cb(bufferevent* bufev, void* carg){
printf("---> write done \n");
}

void error_cb(struct bufferevent * bufev, short what, void * carg) {
// EVBUFFER_EOF EVBUFFER_ERROR
// 如果连接断开,则关掉fd,删除事件
if(what & EVBUFFER_EOF) {
close(bufev->ev_read.ev_fd);
bufferevent_free(bufev);
return;
}
perror("client fd error");
close(bufev->ev_read.ev_fd);
bufferevent_free(bufev);

}

void connect_cb(int sfd, short e, void *arg) {
// libevent 1.1b 中使用的是 水平触发 模式,必须把值给一次性读完,不然一直触发
while (true)
{
// 接收 client fd
sockaddr_in addr;
socklen_t addr_len = sizeof(addr);
int cfd = accept(sfd, (sockaddr*)&addr, &addr_len);
if(cfd == -1) {
if(errno == EAGAIN || errno == EWOULDBLOCK) break;
perror("accept from server fd error");
break;
}
int flags = fcntl(cfd, F_GETFL, 0);
fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
// 连接建立后,创建一个 bufferevent 用于管理 client fd 的读写
char *buf = (char*)malloc(BUFFER_SIZE);
bufferevent* bufev = bufferevent_new(cfd, read_cb, write_cb, error_cb, buf);
bufferevent_enable(bufev, EV_READ);
}
}

int main(){

// 创建一个 server socket,并监听端口
int sfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0);
sockaddr_in addr;
addr.sin_addr.s_addr = htonl(INADDR_ANY);
addr.sin_family = AF_INET;
addr.sin_port = htons(SERVER_PORT);
int ret = bind(sfd, (sockaddr*)&addr, sizeof(addr));
if(ret != 0) handle_error("bind fail");
ret = listen(sfd, 10);
if(ret != 0) handle_error("listen fail");

// 创建 event_base
event_base *base = (event_base*)event_init();

// server fd 加入监听,将 event_base 传入,以将 client fd 也加入监听
event ev;
event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL);
event_add(&ev, NULL);

// 进入事件循环
event_base_dispatch(base);
printf("exit loop\n");
return 0;
}

3. libevent 1.1b 源码分析

选择 1.1b 版本的原因是,它是 libevent 最早的一个版本,实现了主体功能,没有做后期复杂的优化和更多高级功能的支持,对阅读源码来说比较简单。

我在这里 https://github.com/zouchengzhuo/libevent/tree/release-1.1b-comment 提交了一个带阅读注释的分支。

3.1 整体视图

image.png

如图中所示,libevent 工作的基本流程就是:

  • 提供一个封装事件的结构体 event,一个核心入口 event_base 用于存储事件和事件调度器
  • 通过 event_add 将事件加入存储,同时注册到根据运行平台选择的多路复用管理器 eventop 上
  • eventop 注册事件到内核多路复用库的监听,并在 eventloop 中执行 dispatch 操作,在有事件触发时将 event 加入到带优先级的激活队列中
  • 在 eventloop 中执行激活队列中 event 携带的回调函数

下面将逐个分析这些环节的细节,并分析 libevent 内部是如何处理各类事件的。

3.2 event 核心事件

event 结构体就是异步事件驱动中的 “事件”。所有的事件均由此结构体描述,并绑定到 libevent 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
struct event {
// 该事件在事件链表中的前后节点
TAILQ_ENTRY (event) ev_next;
// 该事件在活跃事件链表中的前后节点
TAILQ_ENTRY (event) ev_active_next;
// 该事件在信号事件链表中的前后节点
TAILQ_ENTRY (event) ev_signal_next;
// 该事件在超时红黑树中的节点描述信息
RB_ENTRY (event) ev_timeout_node;
// 该事件被绑定到的 event_base 变量指针
struct event_base *ev_base;
// event 绑定的目标 fd
int ev_fd;
// event 需要监听的事件(可读、可写等)
short ev_events;
// 事件被触发的次数
short ev_ncalls;
// 事件上次被激活后触发的次数
// 如果该事件在回调函数中被重新注册,那么ev_pncalls会被设置为ev_ncalls的值,表示该事件在下一次被激活时应该触发ev_ncalls-ev_pncalls次。
// 这样可以确保事件在回调函数中被重新注册后,不会立即被再次触发,从而避免事件处理器进入死循环。
short *ev_pncalls; /* Allows deletes in callback */
// 超时时间,在 event_add 时传入一个时间,加上当前时间后赋值给 ev_timeout
struct timeval ev_timeout;
// 事件的优先级
int ev_pri; /* smaller numbers are higher priority */
// 事件触发时执行的回调函数
void (*ev_callback)(int, short, void *arg);
// 事件参数,回调时传给 arg
void *ev_arg;
// 事件结果,回调时传给第二个参数
int ev_res; /* result passed to event callback */
// 该 event 当前所处的事件队列或超时红黑树(可以同时在多个队列中)
int ev_flags;
};

其中,ev_events 可以设置的事件类型含:

1
2
3
4
5
#define EV_TIMEOUT	0x01
#define EV_READ 0x02
#define EV_WRITE 0x04
#define EV_SIGNAL 0x08
#define EV_PERSIST 0x10 /* Persistant event */

ev_flags 可以描述的当前 event 所在的队列类型有:

1
2
3
4
5
6
#define EVLIST_TIMEOUT	0x01 // 表示 event 已经超时,等待被处理
#define EVLIST_INSERTED 0x02 // 表示 event 已经被添加到 event_base 中,等待被处理
#define EVLIST_SIGNAL 0x04 // 表示 event 是一个信号事件。
#define EVLIST_ACTIVE 0x08 // 表示 event 正在被处理
#define EVLIST_INTERNAL 0x10 // 表示 event 是一个内部事件
#define EVLIST_INIT 0x80 // 表示 event 已经被初始化,但还没有被添加到 event_base 中

3.3 event_base 核心入口

event_base 结构体是 libevent 库的核心和主入口,它保存了多路复用库对象和所有被监听的事件信息。

在这里 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/event-internal.h#L40 可以看到 event_base 的结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
struct event_base {
// 选择的 多路复用库 的 eventop 对象
const struct eventop *evsel;
// 选择的 多路复用库 的属性对象,如 epollop、kqop(kqueue op)等
void *evbase;
// event计数
int event_count; /* counts number of total events */
// 激活状态 event 计数
int event_count_active; /* counts number of active events */
// 将 eventloop 设置为停止状态
int event_gotterm; /* Set to terminate loop */

/* active event management */
// 激活状态的 event 链表列表,注意是个双重指针,一个优先级指向一个队列
// 若没有指定,则event_init的时候默认设置优先级数量为1,即只有一个队列
struct event_list **activequeues;
// 激活状态的队列数量,根据指定的优先级分配
int nactivequeues;
// 全部的 event 链表
struct event_list eventqueue;
// event_base创建时设置为当前时间,事件循环中每次循环时设置为当前时间
struct timeval event_tv;
// 超时事件的tree
RB_HEAD(event_tree, event) timetree;
};

从结构中可以看到,event_base 结构体中保存了:

  • 当前运行环境中选择的多路复用库的 eventop 对象
  • 当前运行环境中选择的多路复用库的属性对象,如 epollop、kqop(kqueue op)等
  • 当前被注册监听的事件计数
  • 当前被激活的事件计数
  • eventloop停止开关
  • 激活事件优先级队列数组
  • 激活事件优先级队列数组长度
  • 保存全部 event 的链表
  • 当前时间
  • 保存超时事件的红黑树

3.4 eventop 多路复用库封装

libevent 根据不同的运行平台选择不同的多路复用库,不过所有平台的库都会被封装为一个 eventop 的结构体实现。

eventop 的结构在这里 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/event.h#L123 可以看到。

epoll.ckqueue.cpoll.cselect.c 中则实现了各种多路复用库的封装。

以 epoll 的封装为例 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/epoll.c#L65

1
2
3
4
5
6
7
8
struct eventop epollops = {
"epoll",
epoll_init,
epoll_add,
epoll_del,
epoll_recalc,
epoll_dispatch
};

下面挑几个比较重要的方法分析下它们都做了哪些事情。

epoll_init

epoll_init 中,执行了 epoll fd 初始化的逻辑,并且创建了一个 epollop 变量返回,这个变量将被保存到 event_base 的 evbase 属性中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
struct evepoll {
// 若监听了可读事件,则有值
struct event *evread;
// 若监听了可写事件,则有值
struct event *evwrite;
};

struct epollop {
// 被监听的事件和fd绑定的列表,支持0~32000的fd,超过后会报错
struct evepoll *fds;
// 最大监听的 fd 数量
int nfds;
// 监听的 epoll_event 数组
struct epoll_event *events;
// 监听的 epoll_event 数组长度
int nevents;
// epoll fd 的值
int epfd;
// 监听的进程信号值
sigset_t evsigmask;
};

epoll_add

epoll_add 中,实现了事件注册的逻辑。

1
2
3
4
5
6
7
8
9
/**
* @brief 向 epoll 中添加事件注册
*
* @param arg epollop 指针
* @param ev 需要被添加到 epoll 监听的 event 指针
* @return int
*/

int
epoll_add(void *arg, struct event *ev)

这里内部实现中需要注意的是,libevent实现分配好了 32000 个 epollop 变量的空间(即 epollop.fds 属性),然后把 event 中的 fd 当做索引,判断该索引是否已经绑定过了时间,若绑定过了,则使用 epoll_ctl + EPOLL_CTL_MOD 更新 epoll 监听,否则使用 epoll_ctl + EPOLL_CTL_ADD 向 epoll 添加监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
evep = &epollop->fds[fd];
op = EPOLL_CTL_ADD;
events = 0;
if (evep->evread != NULL) {
events |= EPOLLIN;
op = EPOLL_CTL_MOD;
}
if (evep->evwrite != NULL) {
events |= EPOLLOUT;
op = EPOLL_CTL_MOD;
}

if (ev->ev_events & EV_READ)
events |= EPOLLIN;
if (ev->ev_events & EV_WRITE)
events |= EPOLLOUT;

epev.data.ptr = evep;
epev.events = events;
if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -1)
return (-1);

epoll_del

和 epoll_add 类似,做了相反的操作,即如果读和写都被删掉了,则同过 EPOLL_CTL_DEL 去掉 epoll 监听,若还有剩下的,则通过 EPOLL_CTL_MOD 更新 epoll 监听。

1
2
int
epoll_del(void *arg, struct event *ev)

epoll_dispatch

在 epoll_dispatch 中,调用一次 epoll_wait,若有触发了可读/可写事件的 event,则调用 event_active 函数,将它们放到 event_base 的激活队列中去。

同时,若 event 的 ev_events 属性中没有 EV_PERSIST 的 flag,则代表这个事件是一次性的,本次触发完成后,通过 epoll_del 函数将其从 epoll 的监听中删除。

1
2
3
4
5
6
7
8
9
10
/**
* @brief 执行一次 epoll_wait,并分发被激活的事件
*
* @param base event_base 指针
* @param arg epolltop指针
* @param tv epoll_wait 超时时间
* @return int
*/

int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)

3.5 event_add 事件注册

可以看到,event_add 中做了三件事:

  • 将事件插入到 event_base 的事件队列中
  • 如果传入了超时时间,则将 event 加入到超时事件红黑树
  • 将事件加入到多路复用库的监听列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* @brief 向 event_base 中绑定一个 event
*
* @param ev 被绑定的 event
* @param tv 超时时间
* @return int
*/

int
event_add(struct event *ev, struct timeval *tv)
{

// 从 event 中取出 event_base 信息
struct event_base *base = ev->ev_base;
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;

event_debug((
"event_add: event: %p, %s%s%scall %p",
ev,
ev->ev_events & EV_READ ? "EV_READ " : " ",
ev->ev_events & EV_WRITE ? "EV_WRITE " : " ",
tv ? "EV_TIMEOUT " : " ",
ev->ev_callback));

assert(!(ev->ev_flags & ~EVLIST_ALL));
// 如果绑定时附带了超时信息
if (tv != NULL) {
struct timeval now;
// 判断 event 的 flag,如果已经在超时队列中了,则给移除掉
if (ev->ev_flags & EVLIST_TIMEOUT)
event_queue_remove(base, ev, EVLIST_TIMEOUT);

/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */

// 如果 event 已经在 激活队列中了,而且是因为超时而激活的,则重新添加时,在超时回调执行之前将其从激活队列中移除
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
/* See if we are just active executing this
* event in a loop
*/

// 清空触发次数,之前触发的就不会调用回调了
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
// 从激活队列中移除
event_queue_remove(base, ev, EVLIST_ACTIVE);
}

// 将 event 的 ev_timeout 设置为 当前时间点加上传入的 tv timeval
gettimeofday(&now, NULL);
timeradd(&now, tv, &ev->ev_timeout);

event_debug((
"event_add: timeout in %d seconds, call %p",
tv->tv_sec, ev->ev_callback));
// 将 event 插入超时队列,timeout事件为了快速查找写入红黑树,给 event 的 flag 添加 EVLIST_TIMEOUT 的flag
event_queue_insert(base, ev, EVLIST_TIMEOUT);
}
// 如果 event 监听了 可读或者可写事件,而且它不在 event_base 的已插入队列或者活跃队列中,则将它加入已插入队列
if ((ev->ev_events & (EV_READ|EV_WRITE)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
event_queue_insert(base, ev, EVLIST_INSERTED);
// 执行多路复用管理器的add操作,比如 epoll_ctl add
return (evsel->add(evbase, ev));
} else if ((ev->ev_events & EV_SIGNAL) &&
!(ev->ev_flags & EVLIST_SIGNAL)) {
// 如果 event 监听了 signal 事件,而且不在 event_base 的 signal 队列中,则加入
event_queue_insert(base, ev, EVLIST_SIGNAL);
// 执行多路复用管理器的add操作,比如 epoll_ctl add
return (evsel->add(evbase, ev));
}

return (0);
}

3.6 event_del 事件删除

删除的逻辑和添加相反:

  • 将事件从 event_base 的事件队列中移除
  • 将事件从多路复用库的监听列表中移除
1
2
int
event_del(struct event *ev)

3.7 event_base_loop 事件循环

event_base_loop 就是 libevent 库的时间循环启动方法,在时间循环中不断调用多路复用库的 dispatch,阻塞等待可用的事件触发。

事件触发后,会由多路复用库将事件写入到激活队列中,此时在 event_base_loop 中按优先级遍历激活队列中的事件,并调用它们的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
int
event_base_loop(struct event_base *base, int flags)
{

// 取出 event_base 信息
const struct eventop *evsel = base->evsel;
void *evbase = base->evbase;
struct timeval tv;
int res, done;

/* Calculate the initial events that we are waiting for */
// 如果没有监听的 event,直接退出
if (evsel->recalc(base, evbase, 0) == -1)
return (-1);

done = 0;
while (!done) {
/* Terminate the loop if we have been asked to */
// 收到停止信号时,退出事件循环
if (base->event_gotterm) {
base->event_gotterm = 0;
break;
}

/* You cannot use this interface for multi-threaded apps */
// TODO: 信号处理逻辑,就只是在每个事件循环里边判断一下?纵观代码,这个应该是用不上的
while (event_gotsig) {
event_gotsig = 0;
if (event_sigcb) {
res = (*event_sigcb)();
if (res == -1) {
errno = EINTR;
return (-1);
}
}
}

/* Check if time is running backwards */
// 检查时间是否在倒退,如果倒退,对超时事件红黑树中的所有 events 的超时时间进行修正,减去要给 offset
gettimeofday(&tv, NULL);
if (timercmp(&tv, &base->event_tv, <)) {
struct timeval off;
event_debug(("%s: time is running backwards, corrected",
__func__));
timersub(&base->event_tv, &tv, &off);
timeout_correct(base, &off);
}
// 更新 event_base 的 event_tv 时间
base->event_tv = tv;
// 计算下一次 epoll_wait 的超时时间
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK))
// 如果一个激活状态的 event 都没有,而且 flag 中不包含 EVLOOP_NONBLOCK,即是一个阻塞epoll(从 event_base_dispatch 调用进来的 flag 是默认值0)
// 那么计算得到一个 epoll wait 超时时间 tv
timeout_next(base, &tv);
else
// 否则将tv设置为时间 0
timerclear(&tv);

/* If we have no events, we just exit */
// 如果没有被监听的 fd,直接退出
if (!event_haveevents(base)) {
event_debug(("%s: no events registered.", __func__));
return (1);
}
// 执行多路复用的 wait 操作,使用上面计算得到的 tv,如果 tv 是0,则1微秒超时
// epoll_wait 超时后,若有读写事件发生,将有事件发生的 event 放到激活队列中
res = evsel->dispatch(base, evbase, &tv);
// 如果出错,直接返回
if (res == -1)
return (-1);
// 处理超时事件,若有超时的事件,给放到激活队列中,然后从超时事件红黑树和其它结构中移除,也就是说超时事件仅触发一次
timeout_process(base);
// 如果存在激活状态的 event,处理激活事件
if (base->event_count_active) {
// 处理一个优先级最高的激活事件队列,处理完毕后从激活队列中移除,其它优先级的等待下一次处理
event_process_active(base);
// 如果事件处理完了,而且 flat 中有 EVLOOP_ONCE,则退出
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
// 如果没有激活状态的 event,而且 event_base 设置了 EVLOOP_NONBLOCK flag,则直接退出
done = 1;

if (evsel->recalc(base, evbase, 0) == -1)
return (-1);
}

event_debug(("%s: asked to terminate loop.", __func__));
return (0);
}

上面的代码中,值的注意的是:

  • 为了避免时钟回拨的问题,每轮 loop 中都会判断一次时间是否倒退,如果是,则修正超时红黑树中的时间。
  • dispatch 的超时时间是每次循环时重新计算的,若有超时事件,则使用最近一次超时事件作为 dispatch 的超时时间,否则使用默认的 5s。

3.8 IO/timeout/signal的处理逻辑

IO event 处理

IO 事件的处理比较简单,就通过多路复用库来实现监听触发就可以了。比较有特点的是它对事件优先级的处理。

创建 event,并通过 event_set 设置 event 属性。

1
2
3
4
5
6
7
8
9
10
// event.c 
void
event_set(struct event *ev, int fd, short events,
void (*callback)(int, short, void *), void *arg)
{
//...... 省略其它属性设置逻辑
/* by default, we put new events into the middle priority */
// 设置默认优先级,为 event_base 的 活跃队列数量的 一半,即默认中等优先级
ev->ev_pri = current_base->nactivequeues/2;
}

通过 event_add 添加到 event_base 监听,进入事件队列,添加到多路复用库监听,若设置了超时时间,加入到超时事件红黑树。细节参考前面的 event_add 的逻辑。

1
2
3
4
5
6
7
8
9
/**
* @brief 向 event_base 中绑定一个 event
*
* @param ev 被绑定的 event
* @param tv 超时时间
* @return int
*/

int
event_add(struct event *ev, struct timeval *tv)

通过 event_base_dispatch 循环调用多路复用库的 dispatch,等待事件触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
// event.c
int
event_base_loop(struct event_base *base, int flags)
{

// ...
while (!done) {
//...
// 调用多路复用库的 dispatch
res = evsel->dispatch(base, evbase, &tv);
//...
}
//...
}

多路复用库的 dispatch 中,若有触发事件,将其加入激活队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// epoll.c
/**
* @brief 执行一次 epoll_wait,并分发被激活的事件
*
* @param base event_base 指针
* @param arg epolltop指针
* @param tv epoll_wait 超时时间
* @return int
*/

int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{

struct epollop *epollop = arg;
struct epoll_event *events = epollop->events;
struct evepoll *evep;
int i, res, timeout;
// 将要监听的信号量解除阻塞,以让进程可以响应此信号量
if (evsignal_deliver(&epollop->evsigmask) == -1)
return (-1);

timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999) / 1000;
res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout);

if (evsignal_recalc(&epollop->evsigmask) == -1)
return (-1);

if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}

evsignal_process();
return (0);
} else if (evsignal_caught)
evsignal_process();

event_debug(("%s: epoll_wait reports %d", __func__, res));
// 遍历被触发的事件
for (i = 0; i < res; i++) {
int which = 0;
int what = events[i].events;
struct event *evread = NULL, *evwrite = NULL;

evep = (struct evepoll *)events[i].data.ptr;

if (what & EPOLLHUP)
what |= EPOLLIN | EPOLLOUT;
else if (what & EPOLLERR)
what |= EPOLLIN | EPOLLOUT;

if (what & EPOLLIN) {
evread = evep->evread;
which |= EV_READ;
}

if (what & EPOLLOUT) {
evwrite = evep->evwrite;
which |= EV_WRITE;
}

if (!which)
continue;

if (evread != NULL && !(evread->ev_events & EV_PERSIST))
event_del(evread);
if (evwrite != NULL && evwrite != evread &&
!(evwrite->ev_events & EV_PERSIST))
event_del(evwrite);

if (evread != NULL)
// 将触发可读事件的 event 放到激活队列中
event_active(evread, EV_READ, 1);
if (evwrite != NULL)
// 将触发可写事件的 event 放到激活队列中
event_active(evwrite, EV_WRITE, 1);
}

return (0);
}

事件加入激活队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// event.c
/**
* @brief 将 event 激活并放入到激活队列中
*
* @param ev event
* @param res 触发事件,如 EV_TIMEOUT / EV_READ / EV_WRITE
* @param ncalls 触发次数
*/

void
event_active(struct event *ev, int res, short ncalls)
{

/* We get different kinds of events, add them together */
// 如果event已经是活跃,将 res 事件合并到 ev_res中
if (ev->ev_flags & EVLIST_ACTIVE) {
ev->ev_res |= res;
return;
}
// 写入本次激活触发的事件,如 EV_TIMEOUT / EV_READ / EV_WRITE
ev->ev_res = res;
// 写入激活次数
ev->ev_ncalls = ncalls;
ev->ev_pncalls = NULL;
// 将 event 插入到激活队列中
event_queue_insert(ev->ev_base, ev, EVLIST_ACTIVE);
}

在 event_base_dispatch 一次循环等待完成后,执行激活事件列表中的回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// event.c
int
event_base_loop(struct event_base *base, int flags)
{

// ...
while (!done) {
//...
// 如果存在激活状态的 event,处理激活事件
if (base->event_count_active) {
// 处理一个优先级最高的激活事件队列,处理完毕后从激活队列中移除,其它优先级的等待下一次处理
event_process_active(base);
// 如果事件处理完了,而且 flat 中有 EVLOOP_ONCE,则退出
if (!base->event_count_active && (flags & EVLOOP_ONCE))
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
// 如果没有激活状态的 event,而且 event_base 设置了 EVLOOP_NONBLOCK flag,则直接退出
done = 1;
//...
}
//...
}

处理激活队列中的事件回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// event.c
/**
* @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除
*
* @param base
*/

static void
event_process_active(struct event_base *base)
{

struct event *ev;
struct event_list *activeq = NULL;
int i;
short ncalls;
// 没有激活事件,直接返回
if (!base->event_count_active)
return;
// 取出第一个激活状态的事件队列(一次只处理一个优先级)
for (i = 0; i < base->nactivequeues; ++i) {
if (TAILQ_FIRST(base->activequeues[i]) != NULL) {
activeq = base->activequeues[i];
break;
}
}

for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) {
// 每处理一个,就从激活队列中移除一个
event_queue_remove(base, ev, EVLIST_ACTIVE);

/* Allows deletes to work */
// 读取 event 触发事件的数量
ncalls = ev->ev_ncalls;
// 将 ev_pncalls 设置为上一轮激活事件的数量
ev->ev_pncalls = &ncalls;
// 根据激活次数,调用n次回调函数
while (ncalls) {
ncalls--;
ev->ev_ncalls = ncalls;
// 调用回调时传入参数: fd、激活的事件类型、ev中附带的args
(*ev->ev_callback)((int)ev->ev_fd, ev->ev_res, ev->ev_arg);
}
}
}

timeout 处理

libevent 通过不断的动态计算每次 dispatch 的超时时间,来实现对事件自身超时的处理。

event 通过 event_add 添加到 event_base 的监听中时,若传入了超时时间,则加入超时红黑树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// event.c
/**
* @brief 向 event_base 中绑定一个 event
*
* @param ev 被绑定的 event
* @param tv 超时时间
* @return int
*/

int
event_add(struct event *ev, struct timeval *tv)
{

//...
// 如果绑定时附带了超时信息
if (tv != NULL) {
struct timeval now;
// 判断 event 的 flag,如果已经在超时队列中了,则给移除掉
if (ev->ev_flags & EVLIST_TIMEOUT)
event_queue_remove(base, ev, EVLIST_TIMEOUT);

/* Check if it is active due to a timeout. Rescheduling
* this timeout before the callback can be executed
* removes it from the active list. */

// 如果 event 已经在 激活队列中了,而且是因为超时而激活的,则重新添加时,在超时回调执行之前将其从激活队列中移除
if ((ev->ev_flags & EVLIST_ACTIVE) &&
(ev->ev_res & EV_TIMEOUT)) {
/* See if we are just active executing this
* event in a loop
*/

// 清空触发次数,之前触发的就不会调用回调了
if (ev->ev_ncalls && ev->ev_pncalls) {
/* Abort loop */
*ev->ev_pncalls = 0;
}
// 从激活队列中移除
event_queue_remove(base, ev, EVLIST_ACTIVE);
}

// 将 event 的 ev_timeout 设置为 当前时间点加上传入的 tv timeval
gettimeofday(&now, NULL);
timeradd(&now, tv, &ev->ev_timeout);

event_debug((
"event_add: timeout in %d seconds, call %p",
tv->tv_sec, ev->ev_callback));
// 将 event 插入超时队列,timeout事件为了快速查找写入红黑树,给 event 的 flag 添加 EVLIST_TIMEOUT 的flag
event_queue_insert(base, ev, EVLIST_TIMEOUT);
}
//...
}

通过 event_base_dispatch 循环调用多路复用库的 dispatch 时,查找最近的一个超时时间的超时时间点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// event.c
int
event_base_loop(struct event_base *base, int flags)
{

// ...
while (!done) {
//...
// 更新 event_base 的 event_tv 时间
base->event_tv = tv;
// 计算下一次 epoll_wait 的超时时间
if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK))
// 如果一个激活状态的 event 都没有,而且 flag 中不包含 EVLOOP_NONBLOCK,即是一个阻塞epoll(从 event_base_dispatch 调用进来的 flag 是默认值0)
// 那么计算得到一个 epoll wait 超时时间 tv
timeout_next(base, &tv);
else
// 否则将tv设置为时间 0
timerclear(&tv);

/* If we have no events, we just exit */
// 如果没有被监听的 fd,直接退出
if (!event_haveevents(base)) {
event_debug(("%s: no events registered.", __func__));
return (1);
}
// 执行多路复用的 wait 操作,使用上面计算得到的 tv,如果 tv 是0,则1微秒超时
// epoll_wait 超时后,若有读写事件发生,将有事件发生的 event 放到激活队列中
res = evsel->dispatch(base, evbase, &tv);
//...
}
//...
}

查找最近的超时时间点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* @brief 计算 epoll_wait 的超时时间,若没有超时 event,返回默认的 5s;若有超时的 event,则返回当前时间距离最近超时 event 的时间差(最小值为0)。
*
* @param base event_base 对象
* @param tv epoll_wait 的超时时间
* @return int
*/

int
timeout_next(struct event_base *base, struct timeval *tv)
{

// 默认超时时间,5秒
struct timeval dflt = TIMEOUT_DEFAULT;

struct timeval now;
struct event *ev;
// 从超时红黑树中取出超时时间最小的 event,如果没有,则将 tv 设置为默认超时时间,并返回
if ((ev = RB_MIN(event_tree, &base->timetree)) == NULL) {
*tv = dflt;
return (0);
}
// 获取当前时间戳
if (gettimeofday(&now, NULL) == -1)
return (-1);
// 如果 event 的超时时间小于当前时间,即 event 已经超时了,将 tv 设置为 0,返回
if (timercmp(&ev->ev_timeout, &now, <=)) {
timerclear(tv);
return (0);
}
// event 还没超时,用 event 的超时时间点减去当前时间点,作为 tv 的值,返回
timersub(&ev->ev_timeout, &now, tv);

assert(tv->tv_sec >= 0);
assert(tv->tv_usec >= 0);

event_debug(("timeout_next: in %d seconds", tv->tv_sec));
return (0);
}

多路复用库的 dispatch 返回时,判断是否有已经超时的时间,若有,将其加入到激活队列,并从事件队列、超时红黑树中删除。

1
2
3
4
5
6
7
8
9
10
11
12
13
// event.c
int
event_base_loop(struct event_base *base, int flags)
{

// ...
while (!done) {
//...
// 处理超时事件,若有超时的事件,给放到激活队列中,然后从超时事件红黑树和其它结构中移除,也就是说超时事件仅触发一次
timeout_process(base);
//...
}
//...
}

判断超时,加入激活队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* @brief 处理超时队列中的 event,判断是否超时,已经超时的从其它队列移除,给放到激活队列中
*
* @param base
*/

void
timeout_process(struct event_base *base)
{

struct timeval now;
struct event *ev, *next;
// 获取当前时间
gettimeofday(&now, NULL);
// 从红黑树中查找最小节点,并依次查找下一个节点
for (ev = RB_MIN(event_tree, &base->timetree); ev; ev = next) {
// 如果超时时间还没到,直接结束此函数
if (timercmp(&ev->ev_timeout, &now, >))
break;
// 已经超时,查找下一个 event
next = RB_NEXT(event_tree, &base->timetree, ev);
// 将 event 从 timeout queue中移除
event_queue_remove(base, ev, EVLIST_TIMEOUT);

/* delete this event from the I/O queues */
// 把此 event 从所有的事件队列中移除
event_del(ev);

event_debug(("timeout_process: call %p",
ev->ev_callback));
// 触发一次 EV_TIMEOUT 事件
event_active(ev, EV_TIMEOUT, 1);
}
}

event_base_dispatch 中处理激活队列中的事件,若发现超时的 event,则执行其回调,这里的逻辑就和 IO 事件处理中的一致了。

1
2
3
4
5
6
7
8
// event.c
/**
* @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除
*
* @param base
*/

static void
event_process_active(struct event_base *base)

signal 处理

在比较新版本的 linux 内核中,进程信号可以通过 signalfd 来监听。

libevent 1.1b 在 2002 年就发布了,signalfd 从 linux kernel 2.6.22 版本开始支持 https://man7.org/linux/man-pages/man2/signalfd.2.html#VERSIONS,而 2.6.22 版本是 2008 年才发布的。 https://zh.wikipedia.org/wiki/Linux%E5%86%85%E6%A0%B8%E7%89%88%E6%9C%AC%E5%8E%86%E5%8F%B2#%E7%89%88%E6%9C%AC_2.6.x.y

所以没办法用前面的多路复用机制来实现进程信号的监听,下面我们看看 libevent 是怎么巧妙的实现 signal 监听的。

image.png

整体流程如上图。首先,通过 event_init 初始化 event_base 时,会同时初始化多路复用库。

1
2
3
4
5
6
7
8
9
10
void *
event_init(void)
{

// 根据不同平台选择一个多路复用系统,选择后进行初始化
current_base->evbase = NULL;
for (i = 0; eventops[i] && !current_base->evbase; i++) {
current_base->evsel = eventops[i];
current_base->evbase = current_base->evsel->init();
}
}

多路复用库初始化时:

  • 初始化一个 sigset_t 变量 evsigmask,用于记录该 epoll 上要监听的信号值
  • 由于多路复用库只能处理 fd,不能处理进程信号,所以需要创建一个用于占位的内部事件 ev_signal 加入 epoll 监听,不加这个的话,就没办法利用多路复用库来阻塞等待信号的发生了
  • 初始化一对全双工的 socket :ev_signal_pair,将其中一端 ev_signal_pair[1] 的可读事件设置给 ev_signal,绑定到 event_base 的监听中,可读事件触发后,调用一次 evsignal_cb。

下面以 epoll.c 中的实现为例:

1
2
3
4
5
6
7
void *
epoll_init(void)
{

//...
evsignal_init(&epollop->evsigmask);
//...
}

evsignal_init 中,做了上面提到的三个工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* @brief 初始化 ev_signal 这个 event,监听它的读事件,读到信号后,触发 evsignal_cb
*
* @param evsigmask
*/

void
evsignal_init(sigset_t *evsigmask)
{

sigemptyset(evsigmask);

/*
* Our signal handler is going to write to one end of the socket
* pair to wake up our event loop. The event loop then scans for
* signals that got delivered.
*/

// 创建一对全双工通信的 socket,信号处理时会向一端写入,event loop 从另一端读取
if (socketpair(AF_UNIX, SOCK_STREAM, 0, ev_signal_pair) == -1)
event_err(1, "%s: socketpair", __func__);

FD_CLOSEONEXEC(ev_signal_pair[0]);
FD_CLOSEONEXEC(ev_signal_pair[1]);
// 为 ev_signal 信号 event 设置可读事件
event_set(&ev_signal, ev_signal_pair[1], EV_READ,
evsignal_cb, &ev_signal);
// 设置为内部事件,不影响 event_base 的事件计数
ev_signal.ev_flags |= EVLIST_INTERNAL;
}

初始化完毕后,在使用时,创建 event并通过 event_set 设置 event 属性。其中,若要监听进程信号,则在监听的事件类型 ev_events 中加上 EV_SIGNAL,并将需要监听的信号值设置为 event 的 ev_fd。

event创建并设置好后,通过 event_add 加入 event_base 监听,事件加入信号事件队列,并添加多路复用库监听。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// event.c
/**
* @brief 向 event_base 中绑定一个 event
*
* @param ev 被绑定的 event
* @param tv 超时时间
* @return int
*/

int
event_add(struct event *ev, struct timeval *tv)
{

//...
if ((ev->ev_events & EV_SIGNAL) &&
!(ev->ev_flags & EVLIST_SIGNAL)) {
// 如果 event 监听了 signal 事件,而且不在 event_base 的 signal 队列中,则加入
event_queue_insert(base, ev, EVLIST_SIGNAL);
// 执行多路复用管理器的add操作,比如 epoll_ctl add
return (evsel->add(evbase, ev));
}
//...
}

多路复用库添加监听时,判断如果是 signal 事件,则不是直接将 event.ev_fd 加入到 epoll 监听,此时 event.ev_fd 保存的实际上是要监听的 信号值,将其加入到 epoll 的变量 evsigmask 即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// epoll.c
/**
* @brief 向 epoll 中添加事件注册
*
* @param arg epollop 指针
* @param ev 需要被添加到 epoll 监听的 event 指针
* @return int
*/

int
epoll_add(void *arg, struct event *ev)
{

//...
// 如果 event 是 signal event,将要监听的信号量写入 pollop->evsigmask 中
if (ev->ev_events & EV_SIGNAL)
return (evsignal_add(&epollop->evsigmask, ev));
//...
}

在 evsignal_add 函数中,将 event 中携带的,要监听的信号值,设置给 epollop 的 evsigmask 属性。

多路复用库的 dispatch 操作中,在一次阻塞等待结束后,调用一次 evsignal_recalc。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// epoll.c
/**
* @brief 执行一次 epoll_wait,并分发被激活的事件
*
* @param base event_base 指针
* @param arg epolltop指针
* @param tv epoll_wait 超时时间
* @return int
*/

int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{

//...
if (evsignal_recalc(&epollop->evsigmask) == -1)
return (-1);
//...
}

evsignal_recalc 中遍历 signal events 列表,通过 sigaction 将进程信号值加入到内核的信号监听中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// signal.c
/**
* @brief 遍历 信号event 队列, 用 sigaction 重新注册需要监听的信号量,以及收到信号后的响应事件
*
* @param evsigmask
* @return int
*/

int
evsignal_recalc(sigset_t *evsigmask)
{

struct sigaction sa;
struct event *ev;
// 如果没添加,把信号事件添加到 event_base 的信号队列中
if (!ev_signal_added) {
ev_signal_added = 1;
event_add(&ev_signal, NULL);
}
// 如果信号队列是空的而且不需要重算,直接返回
if (TAILQ_FIRST(&signalqueue) == NULL && !needrecalc)
return (0);
needrecalc = 0;
// 将 evsigmask 信号集中的信号添加到进程的 sigmask 中
if (sigprocmask(SIG_BLOCK, evsigmask, NULL) == -1)
return (-1);

/* Reinstall our signal handler. */
memset(&sa, 0, sizeof(sa));
sa.sa_handler = evsignal_handler;
sa.sa_mask = *evsigmask;
sa.sa_flags |= SA_RESTART;
// 遍历信号队列,注册新号事件
TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
if (sigaction(EVENT_SIGNAL(ev), &sa, NULL) == -1)
return (-1);
}
return (0);
}

当信号触发时,调用一次 evsignal_handler。evsignal_handler中,设置一下信号被触发的标记 evsignal_caught ,信号触发次数 evsigcaught[sig]+1,并向 ev_signal_pair[0] 中写入一个字符 a,此操作会触发 ev_signal 绑定的可读回调 evsignal_cb,将 ev_signal 重新注册进 event_base 中。这里要注意的是,触发 evsignal_cb 的操作本身并无意义,只是重新添加一次 ev_signal,它的真正意义是能立即结束多路复用库的等待阻塞,立即结束 wait 并进入后面的激活事件处理流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//signal.c
/**
* @brief 信号触发时,设置evsignal_caught为1,
* 增加evsigcaught中该sig的触发计数,并向 ev_signal_pair[0] 写入一个字符,以触发 此文件中 ev_signal 的可读事件
*
* @param sig 信号值,是 event 的 fd
*/

static void
evsignal_handler(int sig)
{

evsigcaught[sig]++;
evsignal_caught = 1;

/* Wake up our notification mechanism */
write(ev_signal_pair[0], "a", 1);
}

多路复用 dispatch 中,判断,若 evsignal_caught 标记已经被设置,则调用一次 evsignal_process。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// epoll.c
/**
* @brief 执行一次 epoll_wait,并分发被激活的事件
*
* @param base event_base 指针
* @param arg epolltop指针
* @param tv epoll_wait 超时时间
* @return int
*/

int
epoll_dispatch(struct event_base *base, void *arg, struct timeval *tv)
{

//...
if (res == -1) {
if (errno != EINTR) {
event_warn("epoll_wait");
return (-1);
}
evsignal_process();
return (0);
} else if (evsignal_caught)
evsignal_process();
//...
}

evsignal_process函数中,遍历信号事件队列,从 evsigcaught[sig] 中取出信号的触发次数,将信号 event 加入到 event_base 的激活队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* @brief 处理信号 event,通过 evsigcaught[event->fd] 判断触发次数,将信号 event 放入 event_base 的激活队列
*
*/

void
evsignal_process(void)
{

struct event *ev;
short ncalls;
// 遍历信号 event 队列,找出有收到信号的 event,将其加入激活队列
TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) {
ncalls = evsigcaught[EVENT_SIGNAL(ev)];
if (ncalls) {
if (!(ev->ev_events & EV_PERSIST))
event_del(ev);
event_active(ev, EV_SIGNAL, ncalls);
}
}

memset(evsigcaught, 0, sizeof(evsigcaught));
evsignal_caught = 0;
}

后面的流程就和 IO 一致了,event_base 的 loop 中处理激活队列中的事件,把信号事件处理掉,调用注册的回调。

1
2
3
4
5
6
7
8
// event.c
/**
* @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除
*
* @param base
*/

static void
event_process_active(struct event_base *base)

3.9 evbuffer

除了事件库本身,libevent 还提供了一个内存缓冲区管理的组件,有了它,我们就不需要自己分配、扩容、释放内存空间了。

这块的源码组织有点奇怪,用于管理内存的组件名为 evbuffer,主体逻辑源码位于 buffer.c 中;基于evbuffer封装,用于管理 fd 上的读写事件的组件叫 bufferevent,源码位于 evbuffer.c 中。

evbuffer

evbuffer 结构体的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
struct evbuffer {
// 有数据的内存区块起点
u_char *buffer;
// 原始分配的内存区块起点
u_char *orig_buffer;
// 有数据的内存区块相对于起点的偏移量,即 buffer - orig_buffer
size_t misalign;
// 原始分配的内存区块总长
size_t totallen;
// 实际数据便宜量,如实际数据起点是 *buffer,终点是 buffer + 100,则 off 是 100
size_t off;
// 各种操作完毕后被执行的回调函数
void (*cb)(struct evbuffer *, size_t, size_t, void *);
// 传递给回调函数的最后一个参数
void *cbarg;
};

光看代码和注释还是不够直观,我找到一张图,感觉描述的比较清楚。

image.png

在此数据结构上,提供了一堆用于读、写、扩容、删除等操作的函数,就不一一分析了,这里有一些阅读注释 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/buffer.c。只挑一些比较有意思的点放在这里说一下吧。

在写入数据时,若内存空间不够,则会调用 evbuffer_expand 对内存区块进行扩容,扩容的策略是不够就翻倍。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* @brief 对 buf 的内存区域进行扩容,最小256字节,每次翻倍扩容。扩容前先对数据进行强制对齐。
*
* @param buf 要扩容的 evbuffer
* @param datlen 本次扩容后要插入的数据长度
* @return int
*/

int
evbuffer_expand(struct evbuffer *buf, size_t datlen)
{

// 当前需要的内存块长度
size_t need = buf->misalign + buf->off + datlen;

// 如果已经满足要求,就啥也不用做了
if (buf->totallen >= need)
return (0);

/*
* If the misalignment fulfills our data needs, we just force an
* alignment to happen. Afterwards, we have enough space.
*/

// 如果因 misalign 错位的空间比 datlen 需要的长度大,只需强制进行对齐,完了就有足够的空间了
if (buf->misalign >= datlen) {
evbuffer_align(buf);
} else {
// 如果对齐后空间也不够,则需要进行扩容逻辑
void *newbuf;
size_t length = buf->totallen;
// 最小 256 字节,如果空间不够,则翻倍后再判断,不够则继续翻倍
if (length < 256)
length = 256;
while (length < need)
length <<= 1;
// 扩容时,若内存区块起点和实际内存区块起点不一致(即misalign不为0),则进行一次对齐
if (buf->orig_buffer != buf->buffer)
evbuffer_align(buf);
// 将原内存块的内容复制到新内存块中,并返回新内存块的指针
if ((newbuf = realloc(buf->buffer, length)) == NULL)
return (-1);
// 将 buf 的 orig_buffer 和 buffer 都修改到新的内存块指针
buf->orig_buffer = buf->buffer = newbuf;
// 修改内存块总大小
buf->totallen = length;
}

return (0);
}

清理数据时,只是对 buffer 指向的位置和 off、misalign 的值进行调整,并不真的去做内存操作,能够减小内存操作的开销。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* @brief 把 evbuffer 的实际数据清理一段
*
* @param buf 要被清空的 evbuffer
* @param len 要清空的实际数据长度
*/

void
evbuffer_drain(struct evbuffer *buf, size_t len)
{

size_t oldoff = buf->off;
// 如果传入的 len 比实际长度大,说明全部清完,直接将实际数据的起点设置到内存块头部,实际偏移量设置为0,错位偏移量设置为0,然后调用回调
if (len >= buf->off) {
buf->off = 0;
buf->buffer = buf->orig_buffer;
buf->misalign = 0;
goto done;
}
// 否则只是将 buffer 的指针从实际数据块的头部移动到尾部,并将 misalign 添加一个被清空的长度
// 实际上就是把之前的实际数据忽略了,把内存块的错位偏移量往前移动
// 这样做能避免一次内存拷贝,可以等到后续内存块需要扩容时再做内存拷贝操作
buf->buffer += len;
buf->misalign += len;
// 实际数据长度减去被清理
buf->off -= len;

done:
// 清理完成,调用回调函数
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);

}

数据写入或者删除操作完成后,都会尝试调用回调函数。这样从 fd 中读取数据到 evbuffer 中之后,业务逻辑可以在回调中获取数据操作前后的大小等信息,从而根据实际情况决定下一步动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int
evbuffer_add(struct evbuffer *buf, void *data, size_t datlen)
{

//...
// 添加完毕后调用回调函数
if (datlen && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
//...
}

void
evbuffer_drain(struct evbuffer *buf, size_t len)
{

//...
// 清理完成,调用回调函数
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
//...
}

bufferevent

基于 evbuffer,bufferevent 可以为 fd 封装一套数据读写的事件注册到 event_base 上,并提供了读写高低水位线的功能。下面是 bufferevent 的工作流程。

初始化一个 bufferevent 对象,设置好 读写缓冲区、读写事件、读写回调、出错回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* @brief 初始化一个 bufferevent 对象,设置好 读写缓冲区、读写事件、读写回调、出错回调
*
* @param fd 目标fd
* @param readcb 读取回调
* @param writecb 写入回调
* @param errorcb 出错回调
* @param cbarg 回调时携带的参数
* @return struct bufferevent*
*/

struct bufferevent *
bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
everrorcb errorcb, void *cbarg)

{

struct bufferevent *bufev;
// 初始化 bufferevent 并分配空间
if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
return (NULL);
// 创建输入缓冲区 evbuffer
if ((bufev->input = evbuffer_new()) == NULL) {
free(bufev);
return (NULL);
}
// 创建输出缓冲区 evbuffer
if ((bufev->output = evbuffer_new()) == NULL) {
evbuffer_free(bufev->input);
free(bufev);
return (NULL);
}
// 绑定 fd 的读事件到 bufferevent 的 ev_read
event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
// 绑定 fd 的写事件到 bufferevent 的 ev_write
event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
// 各类回调函数赋值
bufev->readcb = readcb;
bufev->writecb = writecb;
bufev->errorcb = errorcb;
// 回调参数赋值
bufev->cbarg = cbarg;
// enabled 固定为 读写都开启
bufev->enabled = EV_READ | EV_WRITE;

return (bufev);
}

启动 bufferevent 的读写事件,将上面生成的读写 event 注册到 event_base 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* @brief 启用 bufferevent 的 可读/可写事件
*
* @param bufev
* @param event
* @return int
*/

int
bufferevent_enable(struct bufferevent *bufev, short event)
{

if (event & EV_READ) {
if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
return (-1);
}
if (event & EV_WRITE) {
if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
return (-1);
}

bufev->enabled |= event;
return (0);
}

/**
* @brief 将一个 event 添加到 event_base 中
*
* @param ev event
* @param timeout 超时时间,单位秒
* @return int
*/

static int
bufferevent_add(struct event *ev, int timeout)
{

struct timeval tv, *ptv = NULL;

if (timeout) {
timerclear(&tv);
tv.tv_sec = timeout;
ptv = &tv;
}

return (event_add(ev, ptv));
}

从 fd 读取数据时

  • 当 evbuffer 缓冲区中的数据低于水位线,啥也不做
  • 当 evbuffer 缓冲区中的数据高于低水位线,低于高水位线,调用用户设置的读取回调
  • 当 evbuffer 缓冲区中的数据高于高水位线,将 fd 的可读事件从 event_base 上移除,并为 evbuffer 绑定一个回调,等待用户消费数据。回调中当前数据大小下降到高水位线以下时,移除此回调并重新将 fd 的可读事件注册回 event_base 中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* @brief eventbuffer 绑定的 fd 的可读回调,尝试从 fd 中读取数据放到 input 缓冲区中,并控制水位线
*
* @param fd 目标 fd
* @param event 可读 event
* @param arg eventbuffer 对象指针
*/

static void
bufferevent_readcb(int fd, short event, void *arg)
{

//...
// 读取缓冲区中当前的实际数据长度
len = EVBUFFER_LENGTH(bufev->input);
// 如果低水位线的长度不为0,而且当前实际数据长度不到低水位线,则啥也不做
if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
return;
// 如果高水位线的值不为0,而且当前实际数据长度大于高水位线
// 说明已经读过头了,不再读了,把 bufferevent 的可读事件从 event_base 的监听中移除
if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
struct evbuffer *buf = bufev->input;

event_del(&bufev->ev_read);

/* Now schedule a callback for us */
// 为 input buffer 设置一个回调函数
evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
return;
}
// 如果读取缓冲区的实际数据长度在高低水位线之间,则调用一次用户的读取回调
/* Invoke the user callback - must always be called last */
(*bufev->readcb)(bufev, bufev->cbarg);
//...
}

上面的代码中,当前数据大于高水位线时,为 evbuffer 绑定了 bufferevent_read_pressure_cb 回调函数,此函数中检测 evbuffer 做了数据操作之后,最新的数据大小,若小于高水位线了,则删掉 evbuffer 的回调,并将 fd 的可读事件重新注册回 event_base。

通过这种方式,用户就可以通过指定高低水位线的方式控制数据读取的量了,不需要自己管理 fd 的读写事件,非常方便。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @brief 当 bufferevent 的输入缓冲区 input 这个 evbuffer 中的数据被消费,导致实际数据长度在高水位线之下之后
* 将 bufferevent 的读取事件重新注册会 event_base 的监听,以继续从fd中读取数据
*
* @param buf
* @param old
* @param now
* @param arg
*/

void
bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
void *arg)
{

struct bufferevent *bufev = arg;
/*
* If we are below the watermak then reschedule reading if it's
* still enabled.
*/

// 如果 evbuffer 所属的 bufferevent 没有高水位线,或者有高水位线但是当前数据在高水位线之下
if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
// evbuffer 触发一次回调之后,将回调移除
evbuffer_setcb(buf, NULL, NULL);
// 如果 bufferevent 启用了读事件,将其再次加入到 event_base 的监听中
if (bufev->enabled & EV_READ)
bufferevent_add(&bufev->ev_read, bufev->timeout_read);
}
}

写数据时,只有低水位线起作用,将可写事件绑定到 event_base 上,可写触发时就尝试写入数据,如果还有没写完的,则不断循环绑定可写事件,直到数据写完。当数据写到小于写低水位线时,触发一次回调。

1
2
3
4
5
6
7
8
9
10
11
12
13
bufferevent_writecb(int fd, short event, void *arg)
{
// ...
// 如果还没写完,把可写事件重新加回 event_base 的监听,等待下一轮
if (EVBUFFER_LENGTH(bufev->output) != 0)
bufferevent_add(&bufev->ev_write, bufev->timeout_write);

// ...
// 输出缓冲区中的数据写到低水位线之下后,调用一次用户的写数据回调
if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
(*bufev->writecb)(bufev, bufev->cbarg);
// ...
}

参考文档

☞ 参与评论