Redis源码 - 如何处理命令

本文亦可以看作事件处理 PART2 的续篇,上一篇谈了谈Redis 客户端连接的过程和客户端数据就绪过程

本文主要想聊聊数据就绪后,如何处理命令,如何Reply给客户端

从一条命令的执行说起

127.0.0.1:6379> set key1 v1

(gdb) bt
(gdb) bt
#0  _addReplyToBuffer (c=0x7ffff6b4e680, s=0x7ffff6a16689 "+OK\r\n", len=5) at networking.c:214
#1  0x0000000000437ba1 in addReply (c=0x7ffff6b4e680, obj=0x7ffff6a178c0) at networking.c:340
#2  0x000000000044b922 in setGenericCommand (c=0x7ffff6b4e680, flags=0, key=0x7ffff6a551b8, val=0x7ffff6a551a0, expire=0x0, unit=0, ok_reply=0x0, abort_reply=0x0) at t_string.c:92
#3  0x000000000044bbc6 in setCommand (c=0x7ffff6b4e680) at t_string.c:139
#4  0x0000000000429c9d in call (c=0x7ffff6b4e680, flags=15) at server.c:2265
#5  0x000000000042a7e9 in processCommand (c=0x7ffff6b4e680) at server.c:2544
#6  0x000000000043a377 in processInputBuffer (c=0x7ffff6b4e680) at networking.c:1311
#7  0x000000000043a668 in readQueryFromClient (el=0x7ffff6a2f0a0, fd=5, privdata=0x7ffff6b4e680, mask=1) at networking.c:1375
#8  0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#9  0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#10 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133

server 利用epoll_wait监听就绪事件,当客户端的请求到达时触发回调(上一篇已详细说明,本文不再重复)

回调即从readQueryFromClient开始,处理命令请求

readQueryFromClient-> processInputBuffer-> processCommand->call->setCommand...

  • readQueryFromClient 函数会从客户端连接的 socket 中,读取数据

  • processInputBuffer 解析收到的客户端数据,处理两类命令

    • 管道命令 PROTO_REQ_INLINE
    • RESP命令 PROTO_REQ_MULTIBULK
  • processCommand

    • lookupCommand 查找匹配命令,据解析的命令名称,在 commands 对应的哈希表中查找相应的命令--redisCommandTable
    • call()处理命令
    • addReply
    • 调用 _addReplyToBuffer 等函数,将要返回的结果添加到客户端的输出缓冲区中
    • processCommand还需要处理达到内存限制的情况,将在后面的文章探讨
/* Call() is the core of Redis execution of a command.
 *
 * The following flags can be passed:
 * CMD_CALL_NONE        No flags.
 * CMD_CALL_SLOWLOG     Check command speed and log in the slow log if needed.
 * CMD_CALL_STATS       Populate command stats.
 * CMD_CALL_PROPAGATE_AOF   Append command to AOF if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE_REPL  Send command to salves if it modified the dataset
 *                          or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE   Alias for PROPAGATE_AOF|PROPAGATE_REPL.
 * CMD_CALL_FULL        Alias for SLOWLOG|STATS|PROPAGATE.
 *
 * The exact propagation behavior depends on the client flags.
 * Specifically:
 *
 * 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
 *    and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
 *    in the call flags, then the command is propagated even if the
 *    dataset was not affected by the command.
 * 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
 *    are set, the propagation into AOF or to slaves is not performed even
 *    if the command modified the dataset.
 *
 * Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
 * or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
 * slaves propagation will never occur.
 *
 * Client flags are modified by the implementation of a given command
 * using the following API:
 *
 * forceCommandPropagation(client *c, int flags);
 * preventCommandPropagation(client *c);
 * preventCommandAOF(client *c);
 * preventCommandReplication(client *c);
 *
 */
 void call(client *c, int flags)

/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If C_OK is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    ......
}

struct redisCommand *lookupCommand(sds name) {
    return dictFetchValue(server.commands, name);
}

addReply之后,数据如何写回给client呢?

aeMain loop循环调用beforeSleep->handleClientsWithPendingWrites->writeToClient

(gdb) bt
#0  writeToClient (fd=5, c=0x7ffff6b4e680, handler_installed=0) at networking.c:967
#1  0x0000000000439733 in handleClientsWithPendingWrites () at networking.c:1010
#2  0x000000000042795d in beforeSleep (eventLoop=0x7ffff6a2f0a0) at server.c:1366
#3  0x0000000000421e57 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:488
#4  0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133

最终写到socket 发送缓冲区, 系统调用write写回客户端

write是如何处理数据的呢?

发送数据从用户层通过系统调用进入到内核逻辑,通过 fd 文件描述符,找到对应的文件,然后再找到与文件关联的对应的 socket,进行发送数据

socket 文件系统处理

write -> fd -> file -> sock_sendmsg

/* ./include/linux/fs.h */
SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf,
        size_t, count) {
    return ksys_write(fd, buf, count);
}
ssize_t ksys_write(unsigned int fd, const char __user *buf, size_t count) {
    ...
    if (f.file) {
        ...
        ret = vfs_write(f.file, buf, count, &pos);
        ...
    }
    ...
}

ssize_t vfs_write(struct file *file, const char __user *buf, size_t count, loff_t *pos) {
    ...
    ret = __vfs_write(file, buf, count, pos);
    ...
}

ssize_t __vfs_write(struct file *file, const char __user *p, size_t count,
            loff_t *pos) {
    if (file->f_op->write)
        return file->f_op->write(file, p, count, pos);
    /* sock_write_iter */
    else if (file->f_op->write_iter)
        return new_sync_write(file, p, count, pos);
    else
        return -EINVAL;
}

static ssize_t new_sync_write(struct file *filp, const char __user *buf, size_t len, loff_t *ppos) {
    ...
    ret = call_write_iter(filp, &kiocb, &iter);
    ...
}

static inline ssize_t call_write_iter(struct file *file, struct kiocb *kio,
                      struct iov_iter *iter) {
    /* sock_write_iter */
    return file->f_op->write_iter(kio, iter);
}

static ssize_t sock_write_iter(struct kiocb *iocb, struct iov_iter *from) {
    struct file *file = iocb->ki_filp;
    struct socket *sock = file->private_data;
    struct msghdr msg = {
        .msg_iter = *from, 
        .msg_iocb = iocb
    };
    ssize_t res;
    ...
    res = sock_sendmsg(sock, &msg);
    *from = msg.msg_iter;
    return res;
}

int sock_sendmsg(struct socket *sock, struct msghdr *msg) {
    int err = security_socket_sendmsg(sock, msg,
                      msg_data_left(msg));

    return err ?: sock_sendmsg_nosec(sock, msg);
}

/* net/socket.c */
static inline int sock_sendmsg_nosec(struct socket *sock, struct msghdr *msg) {
    /* inet_sendmsg */
    int ret = sock->ops->sendmsg(sock, msg, msg_data_left(msg));
    ...
}

/* net/ipv4/af_inet.c */
int inet_sendmsg(struct socket *sock, struct msghdr *msg, size_t size) {
    struct sock *sk = sock->sk;
    ...
    return sk->sk_prot->sendmsg(sk, msg, size);
}

/* Location: net/ipv4/tcp.c
  *
  * Parameter:
  * sk 传输所使用的套接字
  * msg 要传输的用户层的数据包
  * size 用户要传输的数据的大小
  */
int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size) {
    ...
    //主要是把用户层的数据填充到内核的发送队列进行发送
    ret = tcp_sendmsg_locked(sk, msg, size);
    ...
}

......

/* Push out any pending frames which were held back due to
 * TCP_CORK or attempt at coalescing tiny packets.
 * The socket must be locked by the caller.
 */
void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,
			       int nonagle)
{
	/* If we are closed, the bytes will have to remain here.
	 * In time closedown will finish, we empty the write queue and
	 * all will be happy.
	 */
	if (unlikely(sk->sk_state == TCP_CLOSE))
		return;

	if (tcp_write_xmit(sk, cur_mss, nonagle, 0,
			   sk_gfp_mask(sk, GFP_ATOMIC)))
		tcp_check_probe_timer(sk);
}

/* This routine writes packets to the network.  It advances the
 * send_head.  This happens as incoming acks open up the remote
 * window for us.
 *
 * LARGESEND note: !tcp_urg_mode is overkill, only frames between
 * snd_up-64k-mss .. snd_up cannot be large. However, taking into
 * account rare use of URG, this is not a big flaw.
 *
 * Send at most one packet when push_one > 0. Temporarily ignore
 * cwnd limit to force at most one packet out when push_one == 2.

 * Returns true, if no segments are in flight and we have queued segments,
 * but cannot send anything now because of SWS or another problem.
 */
static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
			   int push_one, gfp_t gfp)

如👆的调用链路,最终处理在 tcp_sendmsg ,进一步说,socket系统调用完成数据从用户空间到内核空间的copy,最后交给上层协议TCP/IP处理

  • sk_buff socket 数据缓存,sk_buff 用于保存接收或者发送的数据报文信息,目的为了方便网络协议栈的各层间进行无缝传递数据

TCP层处理

tcp_sendmsg->tcp_sendmsg_locked->__tcp_push_pending_frames( or tcp_push_one)-> tcp_write_xmit

tcp_write_xmit()的核心部分为一个循环,每次调用tcp_send_head()获取头部sk_buff,若已经读完则退出循环。循环内逻辑为:

  1. 调用tcp_init_tso_segs()进行TSO(TCP Segmentation Offload)相关工作。当需要发送较大的网络包的时候,我们可以选择在协议栈中进行分段,也可以选择延迟到硬件网卡去进行自动分段以降低CPU负载。
  2. 调用tcp_cwnd_test()检查现在拥塞窗口是否允许发包,如果允许,返回可以发送多少个sk_buff。
  3. 调用tcp_snd_wnd_test()检测当前第一个sk_buff的序列号是否满足要求: sk_buff 中的 end_seq 和 tcp_wnd_end(tp) 之间的关系,也即这个 sk_buff 是否在滑动窗口的允许范围之内。
  4. tso_segs为1可能是nagle协议导致,需要进行判断。其次需要判断TSO是否延迟到硬件网卡进行。
  5. 调用tcp_mss_split_point()判断是否会因为超出 mss 而分段,还会判断另一个条件,就是是否在滑动窗口的运行范围之内,如果小于窗口的大小,也需要分段,也即需要调用 tso_fragment()。
  6. 调用tcp_small_queue_check()检查是否需要采取小队列:TCP小队列对每个TCP数据流中,能够同时参与排队的字节数做出了限制,这个限制是通过net.ipv4.tcp_limit_output_bytes内核选项实现的。当TCP发送的数据超过这个限制时,多余的数据会被放入另外一个队列中,再通过tastlet机制择机发送。由于该限制的存在,TCP通过一味增大缓冲区的方式是无法发出更多的数据包的。
  7. 调用tcp_transmit_skb()完成sk_buff的真正发送工作,将数据通过 tcp_transmit_skb 填充 TCP 头部,从传输层发送到 IP 层处理

复杂的流控也在这里,暂不进行展开

IP/MAC/Driver 层处理

过于底层,略

最后的话

本文省略了RESP协议解析过程,网络上已经有很多优秀的文章讲解,这里不再画蛇添足

Redis面向的是内存数据库的操作,选择了简洁的设计模式

易于解析(序列化和反序列化)、易于理解,值得借鉴

参考: read more