WangYu::Space

Study, think, create, and grow. Teach yourself and teach others.

Redis 的整体设计

分类:Redis创建时间:2021-12-16 00:00:00

Redis 是一个内存型的 KV 数据库,它提供了 string、list、dict、set、sorted-set、stream、geo、hyperloglog 等数据结构,还提供 pubsub 模块实现发布定义。应用程序可以在 Redis 中存储进程所需的数据结构,Redis 简直可以说是一个数据结构服务器。

为了完善 KV 数据库的功能,Redis 实现了 AOF、RDB 两种持久化方案,保证 Redis 停止后,数据不会丢失,至少不会全部丢失。为了支持主从架构,Redis 实现了主从复制。Redis 的数据都存储在内存中,因此 Redis 需要尽可能地节省内存,由于频繁地分配释放内存,必然会出现内存碎片,为此 Redis 基于 jemalloc 实现了内存碎片整理功能。Redis 的其他功能模块,就不再这里逐一描述了,笔者在后面的日子里会对几乎所有模块展开讨论。

本文的目的是对 Redis 的整体实现进行描述,我相信对 Redis 的整体有了初步认识后,再去了解各模块的实现细节就会容易很多。

主体结构

Redis 的实现中,其主体是一个 redisServer 结构体,这里面存储了与 Redis 相关的所有数据,这里列出一些主要的成员,如下:

struct redisServer {
    redisDb *db;
    int dbnum;

    aeEventLoop *el;

    dict *commands;

    list *clients;
};

数据存储

redisServer->db 是存储数据的地方,它包含 dbnum 个 redisDB 元素,每一个 redisDB 就是一个 DB,Redis 就是这样支持多 DB 的。redisDB 的定义如下:

struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    // ...
}

redisDb 里面使用一个 dict 来存储 KV 数据,另外使用 expires 用来保存过期时间。dict 中 key 是一个 sds 字符串,value 则是 redisObj 结构,redisObj 的结构如下:

struct redisObject {
    unsigned type:4;
    unsigned encoding:4;
    unsigned lru:LRU_BITS;
    int refcount;
    void *ptr;
}

其中 type 是对象的类型,encoding 是编码方式,lru 字段用于记录该对象的访问时间、频次等信息,refcount 为引用计数,ptr 则为指向底层实际数据结构的指针。跟对细节可以查看 Redis 中数据的编码方式

网络模型

Redis 使用 IO 多路复用的网络模型,在 ae.hae.c 中 Redis 封装了不同平台的 IO 多路复用接口,做了一层抽象,实现了一套跨平台的接口。在 Linux 平台上使用 epoll 配合水平触发模式。具体实现细节可以参考 Redis 的事件循环

initServer() 中,redis 创建了端口并启动监听,之后创建事件循环,给监听的 socket 绑定了事件。

if (server.port != 0 && listenToPort(server.port,&server.ipfd) == C_ERR) {
    /* Note: the following log text is matched by the test suite. */
    serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
    exit(1);
}

server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
    serverPanic("Unrecoverable error creating TCP socket accept handler.");
}

之后会在主函数中启动事件循环,此后就等待客户端连接了。当有新的客户端连接到来时候,就会调用 acceptTcpHandler 函数,这个函数里面会执行 accept 得到客户端的 socket,然后将 socket 封装为一个 connection 结构。可以阅读 connection.h|c 了解 connection 的细节。connection 里面保存了一些回调函数,并实现了读写的方法,使用 connection 的借口,可以避免使用底层的 read 和 write 函数。

connection 创建完成后,会基于此连接创建一个 client:

client *c;
if ((c = createClient(conn)) == NULL) {
    // ...
    connClose(conn); /* May be already closed, just ignore errors */
    return;
}

在创建客户端的代码中,会在事件循环中注册此连结的可读事件,当客户端连接上有数据可读时,就会调用 readQueryFromClient 函数。

if (conn) {
    connEnableTcpNoDelay(conn);
    if (server.tcpkeepalive)
        connKeepAlive(conn,server.tcpkeepalive);
    connSetReadHandler(conn, readQueryFromClient);
    connSetPrivateData(conn, c);
}

命令处理流程

Redis 中客户端使用 client 结构描述,下面是 client 结构的部分字段:

struct client {
    // 当前客户端连接
    connection *conn;
    
    // 当前客户端选择的 DB,指向 server 中的 DB
    redisDb *db;

    // 读缓冲区
    sds querybuf;

    // 当前在执行的命令
    struct redisCommand *cmd;

    // 解析出的命令的参数个数和参数内容
    int argc;
    robj **argv;

    // ...

    // 输出缓冲区。下面的 buf 是固定长度的输出缓冲区,当 buf 存不下的时候,会
    // 将内容存储在 clientReplyBlock 构成的链表中。
    list *reply;

    // 输出缓冲区
    char buf[PROTO_REPLY_CHUNK_BYTES];
};


struct clientReplyBlock {
    size_t size, used;
    char buf[];
};

当客户端连接上有数据可读时,就会调用 readQueryFromClient 函数,这个函数中会读取数据。然后调用 processInputBuffer 根据 Redis 协议执行解析。解析完成后,会调用 processCommandAndResetClient 来执行命令。

最终在 processCommand 函数中会根据第一个参数,在 redisServer->commands 这个 dict 中找到命令,命令使用 redisCommand 结构描述:

typedef void redisCommandProc(client *c);

struct redisCommand {
    /* Declarative data */
    char *name;
    redisCommandProc *proc;
    int arity;
    // ...
};

在 redisCommand 中,有一个类型为 redisCommandProc 的 proc 函数指针,这个指针指向的就是命令的处理函数。在 processCommand 中经过重重判断,最终会调用 call 函数来调用 cmd->proc 执行命令。

在命令执行过程中,可以调用 addReply* 系列函数给出命令的响应。

void addReplyBulk(client *c, robj *obj);
void addReplyBulkCString(client *c, const char *s);
void addReplyBulkCBuffer(client *c, const void *p, size_t len);
void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj);
// ...

addReply 中数据都被优先写入到 client->buf 中。因为 client->buf 是固定长度的 buf,当内容写不下的时候,会写入 reply 中。最后在事件循环的 beforeSleep 回调函数中,会遍历所有客户端,并给有数据需要发送的客户端连接注册写事件的回调函数 sendReplyToClient,此函数最终会调用 connWrite(c->conn, ...) 来发送数据给客户端。

void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}

数据恢复

redis 有 AOF 和 RDB 两种持久化方案,AOF 就是将 Redis 的命令写入到文件中。下次 redis 启动的时候,执行所有的命令,就能增量地恢复数据。RDB 持久化是将数据保存在文件中,重启时候只需要将文件中的数据读取并解析出来,然后插入到 server->db 中。

redis 启动阶段,会尝试从本地磁盘载入数据。如果同时开启了 RDB 和 AOF 两种持久化方式,会优先尝试载入 AOF,然后尝试 RDB。

loadDataFromDisk();

void loadDataFromDisk(void) {
    if (server.aof_state == AOF_ON) {
        
        int ret = loadAppendOnlyFile(server.aof_filename);
        if (ret == AOF_FAILED || ret == AOF_OPEN_ERR)
            exit(1);
        if (ret == AOF_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        errno = 0; /* Prevent a stale value from affecting error checking */
        int rdb_flags = RDBFLAGS_NONE;
        // ...
        if (rdbLoad(server.rdb_filename,&rsi,rdb_flags) == C_OK) {
            // ...
        } else if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
        // ...
    }
}

AOF 的载入过程就是从 AOF 文件中解析出一条条的命令,然后执行这些命令。RDB 的载入过程是从 RDB 文件解析出数据,然后将其插入到 server->db 中。

数据持久化

关于 Redis 持久化的原理,可以阅读文章 Redis 持久化 详细了解。这里会从宏观层面描述 Redis 如何实现持久化。

AOF 持久化

启动 AOF 持久化后,执行一条会改变数据库内容的命令后,就会将命令内容保存下来。但每执行一条命令,就往磁盘中写数据,这将严重拖慢 redis 的速度。在 redis 实现中,会将 AOF 的数据先写入到 server->aof_buf 中,并在合适的时机写入到文件中。

每次执行了写命令后,会调用 propagate 来传播命令,在 propagate 中会调用 feedAppendOnlyFile 将命令内容写入 AOF 缓冲区。

缓冲区中的数据只有写入到磁盘中,才能保证持久化。那么缓冲区中的数据何时写入磁盘呢?redis 会在每轮事件循环后,将 AOF 数据写入文件,但仅限于调用 write 函数写入。执行 write 写入数据并不意味着数据已经被写入磁盘了,只有内核将内核中的缓存同步到磁盘,数据才真正持久化保存了。为了保证数据真正写入了文件,需要主动调用 fsync 来让内核将数据同步到磁盘。

但是 fsync 是需要操作磁盘的,往往是比较慢的,要想保证数据的安全性,就需要尽可能高频地执行 fsync,但要想保证性能,就需要较少地执行 fsync。为此 redis 提供了三种策略:

在 redis 的 event loop 的 beforeSleep 回调函数中,会执行 AOF 的写入操作。在 serverCron 函数周期性尝试主动执行 fsync。它们调用的都是 flushAppendOnlyFile 函数。

因为 fsync 是一个阻塞操作,如果 redis 在主线程中执行,有可能会长时间阻塞主线程,影响服务。因此如果同步策略不是 always,redis 会在后台线程执行 fsync。

RDB 持久化

如果启动了 RDB 持久化,redis 会周期性地将当前数据库中的数据编码后存入磁盘上的 RDB 文件中。多久保存一次呢?这个是由 save 配置项决定的。save 配置的语法如下:

save 600 10  # 如果在 600 秒以内,数据进行 10 次以上的修改,就执行 RDB 持久化
save 60 1000 # 在 60 秒内,执行了 1000 次以上的修改,就执行 RDB 持久化

redis 会在 serverCron 中周期检查保存 RDB 的条件是否已经满足,则调用 rdbSaveBackground,来保存 RDB 文件。

for (j = 0; j < server.saveparamslen; j++) {
    struct saveparam *sp = server.saveparams+j;

    if (server.dirty >= sp->changes &&
        server.unixtime-server.lastsave > sp->seconds &&
        (server.unixtime-server.lastbgsave_try >
            CONFIG_BGSAVE_RETRY_DELAY ||
            server.lastbgsave_status == C_OK))
    {
        serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
            sp->changes, (int)sp->seconds);
        rdbSaveInfo rsi, *rsiptr;
        rsiptr = rdbPopulateSaveInfo(&rsi);
        rdbSaveBackground(server.rdb_filename,rsiptr);
        break;
    }
}

其中 server.dirty 用来记录对数据库修改的次数,这是在各个命令执行的时候设置的,以 flushdbCommand 为例:

void flushdbCommand(client *c) {
    int flags;

    if (getFlushCommandFlags(c,&flags) == C_ERR) return;
    server.dirty += emptyDb(c->db->id,flags,NULL);
    addReply(c,shared.ok);
    // ...
}

主从同步

主从同步始于 replicaofpsync 这两个命令。给从节点发送 replicaof 后,从节点会开始和主节点建立主从关系。此时从节点会给主节点发送 psync 命令来同步数据。主节点基于从节点当前的状态,判断是进行全量同步还是增量同步。关于同步的实现原理,可以阅读 Redis 主从复制原理 来详细了解。

syncCommand 中,redis 构建了一个状态机,逐步执行握手、保存 RDB、发送 RDB、发送增量数据等等步骤。当 RDB 生成完毕后,会给从节点的连接上设置写回调函数 sendBulkToSlave,在 sendBulkToSlave 中会执行读取 RDB 文件并发送给从节点。

当 RDB 发送完成后,从节点的状态变成 ONLINE,此时会调用 putSlaveOnline,此函数总会修改从节点的连接上的写回调函数为 sendReplyToClient。此后,就进入命令传播阶段。Redis 每次执行一条写命令后,会将命令内容写入从节点的发送缓冲区中,这是在 propagate 函数中进行的。在 propagate 中,会调用 replicationFeedSlaves 将命令写入从节点的发送缓冲区。

void propagate(int dbid, robj **argv, int argc, int flags) {
    if (!server.replication_allowed)
        return;

    // ...

    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

缓冲区中保存的命令会由回调函数 sendReplyToClient 发送给从节点。这里的逻辑和给客户端返回响应是一致的,对主节点而言,从节点就相当于一个客户端。

总结

本文从宏观层面,对 Redis 的核心数据结构、事件驱动网络模型、命令处理方式、主从复制等机制进行了描述。相信对整体的实现有了初步认识后,再仔细阅读各个模块的代码,定能容易很多。

评论 (评论内容仅博主可见,不会公开显示)