WangYu::Space

Study, think, create, and grow. Teach yourself and teach others.

Redis 的事件循环

分类:Redis创建时间:2021-05-04 00:00:00

IO 多路复用

此小节写给对 IO 多路复用不熟悉的同学,如果已经了解,可跳过。

考虑一个场景:一个服务器同时和多个客户端连接,每个客户端在任何时刻都用可能发送内容过来,服务器需要高效、低延迟地对这些客户端进行服务。服务器该如何实现呢?

方案一:多线程方案

要想同时处理这多个连接,一种方法是对每一个连接都分配一个线程,在单个线程里对 socket 使用阻塞型 IO 进行读写。使用多线程有以下缺点:

1、使用多线程会涉及到线程的切换,另外数据可能在不同的 CPU 上移动,即某个线程被不同的 CPU 调度。 2、使用多线程会牵扯到复杂的线程同步,编程容易出错。 3、创建和销毁线程有不小的开销,不过可以使用线程池来缓解。 4、一个并发服务器,比较优化的方案应该是线程数量和 CPU 数量对齐,而不是线程数和客户端数量对齐。

方案二:单线程轮询 (实际不可行,行文需要)

另外一种思路,在多个客户端连接的 socket 上使用非阻塞 IO 来轮训,当发现有可读的 socket 的,对其内容进行处理,然后继续轮训,使用这种思路利用单个线程就可以解决问题。不过这种方案需要频繁进行 IO 操作,涉及到大量的系统调用,会严重拉低服务器的性能,实践中是不可行的。

轮询的目的实际上是为了拿到当前有数据可读、可写去的 socket 连接,假如可以比较高效地得到这些活跃的 socket 连接,降低轮训的开销,以上方案就比较高效了。在操作系统层面,可以比较容易地知道哪些 socket 可以读写,因此可以将轮训的操作交给操作系统来完成。一次性传递给操作系统一组 socket 连接,然后让操作系统在这些 socket 上可以读写的时候通知应用程序,上层应用程序处理这些可读写的 socket,然后再次让操作系统帮忙轮训。

以上思路,就引出了基于 IO 多路复用的方案。

方案三:IO 多路复用

操作系统层面,可以直接和文件系统、网络驱动器交互,因此可以很容易地知道哪些 socket 可读可写(实际上不局限于 socket,准确地说是文件描述符)。操作系统提供了 selectpollepollkqueue 等系统接口,虽然实现细节、使用方法不同,但解决的问题是相同的。

I/O 多路复用,允许我们同时检查多个文件描述符的就绪状态。通俗一点讲,用户传给内核一组文件描述符,然后让内核在这些文件描述符上可以读或写的时候告诉用户。这样,用户大多数时间都在等待操作系统的通知。接到通知后,根据通知信息就知道那个文件描述符上可以进行 I/O 操作了。其他未产生事件的文件描述符就无需关注。

Redis 事件循环的实现

事件循环对 IO 多路复用的封装,是运用非常广泛的网络编程模型,本文将分析在 Redis 中事件循环是如何实现的。

事件循环的表示

为了屏蔽不同平台上 IO 多路复用相关的接口的差异,Redis 实现了一层抽象,屏蔽了如 select、epoll、kqueue 等之间的差异。事件循环的核心对象是 aeEventLoop,其中记录了事件循环相关的各种变量。

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;

其中平台特定的信息存储在 apidata 这个指针里面。如果使用 epoll,apidata 是下面结构体的指针:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

如果使用 selectapidata 则是如下结构体的指针:

typedef struct aeApiState {
    fd_set rfds, wfds;
    /* We need to have a copy of the fd sets as it's not safe to reuse
     * FD sets after select(). */
    fd_set _rfds, _wfds;
} aeApiState;

事件的抽象表示

不同的 IO 多路复用 API 使用不同的结构来记录事件和文件描述符,比如 select 使用 fd_setepoll 使用 epoll_event。为了屏蔽差异,Redis 定义了 aeFileEventaeFiredEvent 两个结构体实现抽象。

aeFileEvent::mask 中记录要注册的事件类型,rfileProcwfileProc 是可读可写事件对应的事件回调, clientData 是用户在绑定事件的时候传入的数据,这个之后会被传入读写的回调函数。

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

对于涉及到文件描述符的事件,Redis 中使用一个 aeFileEvent 来表示,其中 mask 用来指定需要注册读或写事件。为了屏蔽事件类型的差异,定义了一些宏表示事件类型。mask 可以是事件类型中的任意一个,或者两者取或。

#define AE_READABLE 1   /* Fire when descriptor is readable. */
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */

所有注册的文件事件都会以 fd 作为下标记录在 aeEventLoop::events 中。另外,时间事件会以链表的形式存放在 “aeEventLoop::timeEventHead` 中。

另外使用 aeFiredEvent 来表示触发后的事件。

/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;

aeEventLoop::events 中使用数组保存了 fd 到 aeFileEvent 的映射,利用 aeFiredEvent 中的 fd 字段即可找到 aeFileEvent,进而可以找到处理读写事件的回调。

底层实现

有了这样的设计,就可以屏蔽底层接口的不同了。比如使用 epoll 来实现事件循环。只需要将 epoll 使用到的专属数据 aeApiState 存储在 aeEventLoopapidata 中。需要添加事件的时候,传递过来的是一个 aeFileEvent ,可以中 mask 属性中得出注册的事件的类型,然后使用 epoll 专用的事件类型来调用 epoll_ctl 等函数来处理事件的注册或删除。

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;


static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

使用不同的 API 实现 aeApiAddEventaeApiDelEvent 等函数,然后使用条件编译,就可以使用某种 API 作为 EventLoop 的底层实现了。

// ae.c

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

Redis 暴露的接口则对底层接口进行封装,实现通用的跨平台的接口,下面是一个例子:

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
                      aeFileProc *proc, void *clientData) {
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

时间事件的实现方法

在 JavaScript 中,可以调用 setTimeout(func, ms) 设置一个 ms 毫秒后调用的函数 func,这个功能在基于事件循环中很容易实现。这里将其称为时间事件,是在某个时间点到达后触发的事件。

因为 epollselect 这类函数调用后,如果没有事件发生就会阻塞,并等待事件发生。那么要等多久呢?selectepoll_wait 都接受一个超时时间,如果在超时时间范围内没有事件发生,函数就会返回。如果不指的超时时间,那就一直等下去。

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

这里时间事件的实现就是基于这个超时时间实现的,每次添加时间事件时,就将该事件记录下来。在执行 epoll_wait 函数的时候,找到最近将要触发的时间事件,将其时间设置为超时时间(比如两个时间事件,一个是 1 秒后,一个是 10 秒后,此时就设置超时时间为 1 秒后)。当 epoll_wait 函数返回后,有两种可能,一个是发生了超时,一个是某个文件描述符上注册的事件发生了。所以只需要拿当前时间点和时间事件设定的时间比较以下,如果当前时间大于时间事件中设定的定时时间,那就执行任务。如果不大于,那就等待下次事件循环。

回调函数的返回值就很有用,在 Redis 实现中,时间事件的返回值为整数,表示经过多少毫秒后再次执行当前函数。如果只是发生一次的时间事件,可以返回 AE_NOMORE。如果事件需要再次执行,那么返回间隔的毫秒数即可。

/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    monotime when;
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
  		   * freed in recursive time event calls. */
} aeTimeEvent;

前面提到过,所有的时间事件都要保存下来。而且还需要从中寻找定时最近的时间点,已经找到已经超时的事件。在 Redis 中,时间事件使用 aeTimeEvent 来表示,使用链表来存储。因此,寻找最近将要发生的时间事件是一个线性时间复杂度的过程。在 Redis 中之所以这么做,是因为 Redis 中只使用了一个时间事件。如果要支持大量的时间事件,那么就要修改一下这部分实现,使用时间堆或者哈希表+双向链表来保证能够快速找到最近要触发的时间点。

高层接口

这里解释 Redis 提供的事件循环的 API,Redis 的这套实现比较通用,完全可以在其他地方复用,值得学习。

// 创建与删除事件循环
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);

// 注册一个事件,这里 proc 是可读回调还是可写回调是基于 mask 决定的
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);

// 返回某个文件描述符上的事件类型
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);

// 创建时间事件
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

// 进行一次事件循环,在这里面调用如 epoll_wait 这样的函数,然后处理事件
// 要构成事件循环,需要在 while 循环中调用此函数
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
// 事件循环,即在 while 中不断调用 aeProcessEvents,知道停止事件循环 
void aeMain(aeEventLoop *eventLoop);
// 停止事件循环
void aeStop(aeEventLoop *eventLoop);

// 在一个 fd 上等到 mask 指定的事件,最多能到 milliseconds 毫秒
int aeWait(int fd, int mask, long long milliseconds);

// 得到底层 API 的名称,如 epoll/select/kqueue 等
char *aeGetApiName(void);

// 在 poll 之前调用的回调函数
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
// 在 poll 返回后调用的回调
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);

// 得到最大能够监听的文件描述符
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

// 设置是否非阻塞,即在调用 epoll_wait 这类函数的时候,如果没有事件发生,是否阻塞
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

高层 API 的使用

借助 Redis 代码库中的 EventLoop 和网络相关的工具函数,可以很容易地实现一个 echo 服务器。

#include <stdio.h>
#include "ae.h"
#include "anet.h"

void serveEcho(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask){
    char buf[128];
    int n = read(fd, buf, sizeof(buf));
    if(n < 0){
        aeDeleteFileEvent(eventLoop, fd, AE_READABLE);
    } else if(n > 0) {
        write(fd, buf, n);
    }
}

void handleAccept(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask){
    char ip[128];
    int port = 0;
    int connfd = anetTcpAccept(NULL, fd, ip, sizeof(ip), &port);

    aeCreateFileEvent(eventLoop, connfd, AE_READABLE, serveEcho, NULL);
}

int main(){
    aeEventLoop *el = aeCreateEventLoop(100);

    char err[1024];
    int fd = anetTcpServer(err, 9009, "127.0.0.1", 10);
  	if (fd == -1){
    	fprintf(stderr, "create server failed: %s\n", err);
      exit(1);
    }

    aeCreateFileEvent(el, fd, AE_READABLE, handleAccept, NULL);

    aeMain(el);
  
	  return 0;
}

下面的例子中使用时间事件,每隔 1000 毫秒打印一次当前时间。

#include "ae.h"
#include <time.h>
#include <stdio.h>


int printTime(struct aeEventLoop *eventLoop, long long id, void *clientData){
    time_t now = time(NULL);
    printf("current time is: %s\n", ctime(&now));
    return 1000;
}

int main(){
    aeEventLoop *el = aeCreateEventLoop(100);

    aeCreateTimeEvent(el, 1000, printTime, NULL, NULL);

    aeMain(el);

	  return 0;
}

轮询前后的回调

在 Redis 的实现中,某些场景需要在调用 poll 函数之前执行一些操作。比如在 Redis 中处理完命令,将有内容要写给客户端,此时应该向 eventloop 中注册写事件,但如果在处理读事件的过程中,去注册写事件,会不会干扰到当前这一轮的写事件的处理呢。因此,有些操作希望能够放到下一轮的事件循环中执行。Redis 的 EventLoop 提供了两个回调 eventLoop->beforesleepeventLoop->aftersleep,这两个回调分别在 poll 之前和之后调用。

评论 (评论内容仅博主可见,不会公开显示)