Redis 的事件循环
IO 多路复用
此小节写给对 IO 多路复用不熟悉的同学,如果已经了解,可跳过。
考虑一个场景:一个服务器同时和多个客户端连接,每个客户端在任何时刻都用可能发送内容过来,服务器需要高效、低延迟地对这些客户端进行服务。服务器该如何实现呢?
方案一:多线程方案
要想同时处理这多个连接,一种方法是对每一个连接都分配一个线程,在单个线程里对 socket 使用阻塞型 IO 进行读写。使用多线程有以下缺点:
1、使用多线程会涉及到线程的切换,另外数据可能在不同的 CPU 上移动,即某个线程被不同的 CPU 调度。 2、使用多线程会牵扯到复杂的线程同步,编程容易出错。 3、创建和销毁线程有不小的开销,不过可以使用线程池来缓解。 4、一个并发服务器,比较优化的方案应该是线程数量和 CPU 数量对齐,而不是线程数和客户端数量对齐。
方案二:单线程轮询 (实际不可行,行文需要)
另外一种思路,在多个客户端连接的 socket 上使用非阻塞 IO 来轮训,当发现有可读的 socket 的,对其内容进行处理,然后继续轮训,使用这种思路利用单个线程就可以解决问题。不过这种方案需要频繁进行 IO 操作,涉及到大量的系统调用,会严重拉低服务器的性能,实践中是不可行的。
轮询的目的实际上是为了拿到当前有数据可读、可写去的 socket 连接,假如可以比较高效地得到这些活跃的 socket 连接,降低轮训的开销,以上方案就比较高效了。在操作系统层面,可以比较容易地知道哪些 socket 可以读写,因此可以将轮训的操作交给操作系统来完成。一次性传递给操作系统一组 socket 连接,然后让操作系统在这些 socket 上可以读写的时候通知应用程序,上层应用程序处理这些可读写的 socket,然后再次让操作系统帮忙轮训。
以上思路,就引出了基于 IO 多路复用的方案。
方案三:IO 多路复用
操作系统层面,可以直接和文件系统、网络驱动器交互,因此可以很容易地知道哪些 socket 可读可写(实际上不局限于 socket,准确地说是文件描述符)。操作系统提供了 select 、poll、epoll、kqueue 等系统接口,虽然实现细节、使用方法不同,但解决的问题是相同的。
I/O 多路复用,允许我们同时检查多个文件描述符的就绪状态。通俗一点讲,用户传给内核一组文件描述符,然后让内核在这些文件描述符上可以读或写的时候告诉用户。这样,用户大多数时间都在等待操作系统的通知。接到通知后,根据通知信息就知道那个文件描述符上可以进行 I/O 操作了。其他未产生事件的文件描述符就无需关注。
Redis 事件循环的实现
事件循环对 IO 多路复用的封装,是运用非常广泛的网络编程模型,本文将分析在 Redis 中事件循环是如何实现的。
事件循环的表示
为了屏蔽不同平台上 IO 多路复用相关的接口的差异,Redis 实现了一层抽象,屏蔽了如 select、epoll、kqueue 等之间的差异。事件循环的核心对象是 aeEventLoop,其中记录了事件循环相关的各种变量。
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
其中平台特定的信息存储在 apidata 这个指针里面。如果使用 epoll,apidata 是下面结构体的指针:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
如果使用 select,apidata 则是如下结构体的指针:
typedef struct aeApiState {
fd_set rfds, wfds;
/* We need to have a copy of the fd sets as it's not safe to reuse
* FD sets after select(). */
fd_set _rfds, _wfds;
} aeApiState;
事件的抽象表示
不同的 IO 多路复用 API 使用不同的结构来记录事件和文件描述符,比如 select 使用 fd_set,epoll 使用 epoll_event。为了屏蔽差异,Redis 定义了 aeFileEvent 和 aeFiredEvent 两个结构体实现抽象。
aeFileEvent::mask 中记录要注册的事件类型,rfileProc 和 wfileProc 是可读可写事件对应的事件回调, clientData 是用户在绑定事件的时候传入的数据,这个之后会被传入读写的回调函数。
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
对于涉及到文件描述符的事件,Redis 中使用一个 aeFileEvent 来表示,其中 mask 用来指定需要注册读或写事件。为了屏蔽事件类型的差异,定义了一些宏表示事件类型。mask 可以是事件类型中的任意一个,或者两者取或。
#define AE_READABLE 1 /* Fire when descriptor is readable. */
#define AE_WRITABLE 2 /* Fire when descriptor is writable. */
所有注册的文件事件都会以 fd 作为下标记录在 aeEventLoop::events 中。另外,时间事件会以链表的形式存放在 “aeEventLoop::timeEventHead` 中。
另外使用 aeFiredEvent 来表示触发后的事件。
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
在 aeEventLoop::events 中使用数组保存了 fd 到 aeFileEvent 的映射,利用 aeFiredEvent 中的 fd 字段即可找到 aeFileEvent,进而可以找到处理读写事件的回调。
底层实现
有了这样的设计,就可以屏蔽底层接口的不同了。比如使用 epoll 来实现事件循环。只需要将 epoll 使用到的专属数据 aeApiState 存储在 aeEventLoop 的 apidata 中。需要添加事件的时候,传递过来的是一个 aeFileEvent ,可以中 mask 属性中得出注册的事件的类型,然后使用 epoll 专用的事件类型来调用 epoll_ctl 等函数来处理事件的注册或删除。
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
使用不同的 API 实现 aeApiAddEvent、 aeApiDelEvent 等函数,然后使用条件编译,就可以使用某种 API 作为 EventLoop 的底层实现了。
// ae.c
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
#ifdef HAVE_EPOLL
#include "ae_epoll.c"
#else
#ifdef HAVE_KQUEUE
#include "ae_kqueue.c"
#else
#include "ae_select.c"
#endif
#endif
#endif
Redis 暴露的接口则对底层接口进行封装,实现通用的跨平台的接口,下面是一个例子:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData) {
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
时间事件的实现方法
在 JavaScript 中,可以调用 setTimeout(func, ms) 设置一个 ms 毫秒后调用的函数 func,这个功能在基于事件循环中很容易实现。这里将其称为时间事件,是在某个时间点到达后触发的事件。
因为 epoll、select 这类函数调用后,如果没有事件发生就会阻塞,并等待事件发生。那么要等多久呢?select 和 epoll_wait 都接受一个超时时间,如果在超时时间范围内没有事件发生,函数就会返回。如果不指的超时时间,那就一直等下去。
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这里时间事件的实现就是基于这个超时时间实现的,每次添加时间事件时,就将该事件记录下来。在执行 epoll_wait 函数的时候,找到最近将要触发的时间事件,将其时间设置为超时时间(比如两个时间事件,一个是 1 秒后,一个是 10 秒后,此时就设置超时时间为 1 秒后)。当 epoll_wait 函数返回后,有两种可能,一个是发生了超时,一个是某个文件描述符上注册的事件发生了。所以只需要拿当前时间点和时间事件设定的时间比较以下,如果当前时间大于时间事件中设定的定时时间,那就执行任务。如果不大于,那就等待下次事件循环。
回调函数的返回值就很有用,在 Redis 实现中,时间事件的返回值为整数,表示经过多少毫秒后再次执行当前函数。如果只是发生一次的时间事件,可以返回 AE_NOMORE。如果事件需要再次执行,那么返回间隔的毫秒数即可。
/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
monotime when;
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
前面提到过,所有的时间事件都要保存下来。而且还需要从中寻找定时最近的时间点,已经找到已经超时的事件。在 Redis 中,时间事件使用 aeTimeEvent 来表示,使用链表来存储。因此,寻找最近将要发生的时间事件是一个线性时间复杂度的过程。在 Redis 中之所以这么做,是因为 Redis 中只使用了一个时间事件。如果要支持大量的时间事件,那么就要修改一下这部分实现,使用时间堆或者哈希表+双向链表来保证能够快速找到最近要触发的时间点。
高层接口
这里解释 Redis 提供的事件循环的 API,Redis 的这套实现比较通用,完全可以在其他地方复用,值得学习。
// 创建与删除事件循环
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
// 注册一个事件,这里 proc 是可读回调还是可写回调是基于 mask 决定的
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
// 返回某个文件描述符上的事件类型
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 创建时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
// 进行一次事件循环,在这里面调用如 epoll_wait 这样的函数,然后处理事件
// 要构成事件循环,需要在 while 循环中调用此函数
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// 事件循环,即在 while 中不断调用 aeProcessEvents,知道停止事件循环
void aeMain(aeEventLoop *eventLoop);
// 停止事件循环
void aeStop(aeEventLoop *eventLoop);
// 在一个 fd 上等到 mask 指定的事件,最多能到 milliseconds 毫秒
int aeWait(int fd, int mask, long long milliseconds);
// 得到底层 API 的名称,如 epoll/select/kqueue 等
char *aeGetApiName(void);
// 在 poll 之前调用的回调函数
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
// 在 poll 返回后调用的回调
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
// 得到最大能够监听的文件描述符
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
// 设置是否非阻塞,即在调用 epoll_wait 这类函数的时候,如果没有事件发生,是否阻塞
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);
高层 API 的使用
借助 Redis 代码库中的 EventLoop 和网络相关的工具函数,可以很容易地实现一个 echo 服务器。
#include <stdio.h>
#include "ae.h"
#include "anet.h"
void serveEcho(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask){
char buf[128];
int n = read(fd, buf, sizeof(buf));
if(n < 0){
aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
} else if(n > 0) {
write(fd, buf, n);
}
}
void handleAccept(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask){
char ip[128];
int port = 0;
int connfd = anetTcpAccept(NULL, fd, ip, sizeof(ip), &port);
aeCreateFileEvent(eventLoop, connfd, AE_READABLE, serveEcho, NULL);
}
int main(){
aeEventLoop *el = aeCreateEventLoop(100);
char err[1024];
int fd = anetTcpServer(err, 9009, "127.0.0.1", 10);
if (fd == -1){
fprintf(stderr, "create server failed: %s\n", err);
exit(1);
}
aeCreateFileEvent(el, fd, AE_READABLE, handleAccept, NULL);
aeMain(el);
return 0;
}
下面的例子中使用时间事件,每隔 1000 毫秒打印一次当前时间。
#include "ae.h"
#include <time.h>
#include <stdio.h>
int printTime(struct aeEventLoop *eventLoop, long long id, void *clientData){
time_t now = time(NULL);
printf("current time is: %s\n", ctime(&now));
return 1000;
}
int main(){
aeEventLoop *el = aeCreateEventLoop(100);
aeCreateTimeEvent(el, 1000, printTime, NULL, NULL);
aeMain(el);
return 0;
}
轮询前后的回调
在 Redis 的实现中,某些场景需要在调用 poll 函数之前执行一些操作。比如在 Redis 中处理完命令,将有内容要写给客户端,此时应该向 eventloop 中注册写事件,但如果在处理读事件的过程中,去注册写事件,会不会干扰到当前这一轮的写事件的处理呢。因此,有些操作希望能够放到下一轮的事件循环中执行。Redis 的 EventLoop 提供了两个回调 eventLoop->beforesleep 和 eventLoop->aftersleep,这两个回调分别在 poll 之前和之后调用。