Redis源码 - Scale机制 Cluster part1

Redis Cluster设计目标

  • High performance and linear scalability ,最多可达 1000 个节点。该系统没有代理,使用异步复制,且不对值执行合并操作

  • write safety:系统试图(尽力而为地)保留所有来自连接了大多数主节点的客户端的写入。通常情况下,确认写入可能丢失的时间窗口很小。当客户端处于少数分区时,丢失确认写入的时间窗口会更大

  • availability :Redis Cluster 能够在大多数主节点可达的分区中存活,并且对于每个不再可达的主节点,至少有一个可达的副本。

    • 通过replicas migration实现

其他功能:

  • cluster划分若干slots,key存储再slot对应的node上
  • 实现单机版 Redis 中所有单键命令
  • 对于执行复杂的多键操作(如集合并和交集)的命令,只有当操作中涉及的所有键都散列到同一个slot才支持
  • 实现hash-tag,用于强制将某些键存储在同一个哈希槽中
  • cluster 不支持像独立版本 Redis 那样的多个数据库。我们仅支持数据库 0,禁止使用select

核心组件Overview

通信机制

在 Redis Cluster 中,节点负责保存数据并维护集群的状态,包括将键映射到正确的节点。集群节点还能够自动发现其他节点,检测非工作节点,并在需要时将副本节点提升为主节点,以便在发生故障时继续运行

所有集群节点都使用 TCP 总线和二进制协议连接,称为 Redis Cluster Bus。每个节点都通过集群总线连接到集群中的其他节点。节点使用一种Gossip协议来传播有关集群的信息,以发现新节点,发送 ping 数据包以确保所有其他节点正常工作,并发送集群消息以信号特定条件。集群总线还用于在集群中传播发布/订阅消息,并在用户请求时协调手动故障转移(手动故障转移是由系统管理员直接发起的故障转移,而不是由 Redis Cluster 故障检测器发起的)

通信消息

  • redis 定义了一个结构体 clusterMsg,它用来表示节点间通信的一条消息。它包含的信息包括发送消息节点的名称、IP、集群通信端口和负责的 slots,以及消息类型、消息长度和具体的消息体

  • 重点是clusterMsgDataGossip结构

    • 记录ping_sent 节点发送Ping的时间
    • 记录pong_received 节点收到Pong的时间
/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1     /* The node is a master */
#define CLUSTER_NODE_SLAVE 2      /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4      /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8       /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16    /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR   64  /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128     /* Send a MEET message to this node */

/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
    mstime_t ctime;             /* Link creation time */
    int fd;                     /* TCP socket file descriptor */
    sds sndbuf;                 /* Packet send buffer */
    sds rcvbuf;                 /* Packet reception buffer */
    struct clusterNode *node;   /* Node related to this link if any, or NULL */
} clusterLink;
typedef struct {
    char sig[4];        /* Signature "RCmb" (Redis Cluster message bus). */
    uint32_t totlen;    /* Total length of this message */
    uint16_t ver;       /* Protocol version, currently set to 1. */
    uint16_t port;      /* TCP base port number. */
    uint16_t type;      /* Message type */
    uint16_t count;     /* Only used for some kind of messages. */
    uint64_t currentEpoch;  /* The epoch accordingly to the sending node. */
    uint64_t configEpoch;   /* The config epoch if it's a master, or the last
                               epoch advertised by its master if it is a
                               slave. */
    uint64_t offset;    /* Master replication offset if node is a master or
                           processed replication offset if node is a slave. */
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
    unsigned char myslots[CLUSTER_SLOTS/8];
    char slaveof[CLUSTER_NAMELEN];
    char myip[NET_IP_STR_LEN];    /* Sender IP, if not all zeroed. */
    char notused1[34];  /* 34 bytes reserved for future usage. */
    uint16_t cport;      /* Sender TCP cluster bus port */
    uint16_t flags;      /* Sender node flags */
    unsigned char state; /* Cluster state from the POV of the sender */
    unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
    union clusterMsgData data;
} clusterMsg;


union clusterMsgData {
    /* PING, MEET and PONG */
    struct {
        /* Array of N clusterMsgDataGossip structures */
        clusterMsgDataGossip gossip[1];
    } ping;

    /* FAIL */
    struct {
        clusterMsgDataFail about;
    } fail;

    /* PUBLISH */
    struct {
        clusterMsgDataPublish msg;
    } publish;

    /* UPDATE */
    struct {
        clusterMsgDataUpdate nodecfg;
    } update;

    /* MODULE */
    struct {
        clusterMsgModule msg;
    } module;
};

/* Initially we don't know our "name", but we'll find it once we connect
 * to the first node, using the getsockname() function. Then we'll use this
 * address for all the next messages. */
typedef struct {
    char nodename[CLUSTER_NAMELEN];
    uint32_t ping_sent;
    uint32_t pong_received;
    char ip[NET_IP_STR_LEN];  /* IP address last time it was seen */
    uint16_t port;              /* base port last time it was seen */
    uint16_t cport;             /* cluster port last time it was seen */
    uint16_t flags;             /* node->flags copy */
    uint32_t notused1;
} clusterMsgDataGossip;


/* Message types.
 *
 * Note that the PING, PONG and MEET messages are actually the same exact
 * kind of packet. PONG is the reply to ping, in the exact format as a PING,
 * while MEET is a special PING that forces the receiver to add the sender
 * as a node (if it is not already in the list). */
#define CLUSTERMSG_TYPE_PING 0          /* Ping */
#define CLUSTERMSG_TYPE_PONG 1          /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2          /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3          /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4       /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6     /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7        /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8       /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9        /* Module cluster API message. */
#define CLUSTERMSG_TYPE_COUNT 10        /* Total number of message types. */

Node Handshake

  • cluster启动触发

    • 创建TCP监听连接initServer->clusterInit,创建clusterLink对象用于监听来自其他node的通信
    • clusterAcceptHandler 完成接收连接的工作
  • CLUSTER MEET 加入节点,处理命令后会更新本地server.cluster->nodes,然后处理连接

    • 一个节点只会以两种方式将另一个节点视为集群的一部分

      • 一个节点通过 MEET 消息MEET 消息与 PING 消息完全相同,但会强制当前节点接受发送节点作为集群的一部分
      • 如果已经信任的节点向其他节点传递关于该节点的信息。例如,如果节点 A 知道节点 B,而节点 B 知道节点 C,那么最终节点 B 将向节点 A 传递关于节点 C 的gossip消息。当这种情况发生时,节点 A 将注册节点 C 为网络的一部分,并尝试连接节点 C
    • 实现: clusterCron 中将加入集群且未创建连接的node - server.cluster->nodes,建立tcp连接

//clusterInit函数
    if (listenToPort(server.port+CLUSTER_PORT_INCR,
        server.cfd,&server.cfd_count) == C_ERR)
    {
        exit(1);
    } else {
        int j;

        for (j = 0; j < server.cfd_count; j++) {
            if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
                clusterAcceptHandler, NULL) == AE_ERR)
                    serverPanic("Unrecoverable error creating Redis Cluster "
                                "file event.");
        }
    }
//clusterCron函数
if (node->link == NULL) {
            int fd;
            mstime_t old_ping_sent;
            clusterLink *link;

            fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
                node->cport, NET_FIRST_BIND_ADDR);
            if (fd == -1) {
                /* We got a synchronous error from connect before
                 * clusterSendPing() had a chance to be called.
                 * If node->ping_sent is zero, failure detection can't work,
                 * so we claim we actually sent a ping now (that will
                 * be really sent as soon as the link is obtained). */
                if (node->ping_sent == 0) node->ping_sent = mstime();
                serverLog(LL_DEBUG, "Unable to connect to "
                    "Cluster Node [%s]:%d -> %s", node->ip,
                    node->cport, server.neterr);
                continue;
            }
            link = createClusterLink(node);
            link->fd = fd;
            node->link = link;
            aeCreateFileEvent(server.el,link->fd,AE_READABLE,
                    clusterReadHandler,link);
            ......
}

Gossip Protocol

  • 按一定频率随机选一些节点进行通信 serverCron->clusterCron 100ms执行一次

  • 每执行10次clusterCron函数,执行1次该分支代码,即每秒发送一次ping

  • 随机选5个节点,然后从中选择最久没有与之通信的节点,发送ping命令 - clusterSendPing

    • freshnodes = dictSize(server.cluster->nodes)-2 等于集群节点数减 2
    • wanted 默认值是集群节点数的 1/10,但是如果这个默认值小于 3,那么 wanted 就等于 3。如果这个默认值大于 freshnodes,那么 wanted 就等于 freshnodes 的大小
    • maxiterations = wanted*3
  • 收到ping的节点使用回调函数clusterReadHandler进行处理

    • clusterReadHandler -> clusterProcessPacket

    • 最终调用clusterSendPing 发送回复PONG

    • 收到Pong 消息的节点,就可以根据消息体中的信息,更新本地记录的对应节点的信息了 - clusterProcessPacket函数处理

      • 当收到Pong消息时,更新本地记录的目标节点Pong消息最新返回时间
      • 如果发送消息的节点是主节点,更新本地记录的slots分布信息 - 调用clusterProcessGossipSection函数处理Ping或Pong消息的消息体
/* Ping some random node 1 time every 10 iterations, so that we usually ping
     * one random node every second. */
    if (!(iteration % 10)) {
        int j;

        /* Check a few random nodes and ping the one with the oldest
         * pong_received time. */
        for (j = 0; j < 5; j++) {
            de = dictGetRandomKey(server.cluster->nodes);
            clusterNode *this = dictGetVal(de);

            /* Don't ping nodes disconnected or with a ping currently active. */
            if (this->link == NULL || this->ping_sent != 0) continue;
            if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
                continue;
            if (min_pong_node == NULL || min_pong > this->pong_received) {
                min_pong_node = this;
                min_pong = this->pong_received;
            }
        }
        if (min_pong_node) {
            serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
            clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
        }
    }

Cluster topology

Key 划分模式

  • 集群的键空间被分成16384个槽位
  • crc16计算key hash
  • HASH_SLOT = CRC16(key) mod 16384
/* -----------------------------------------------------------------------------
 * Key space handling
 * -------------------------------------------------------------------------- */

/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing between {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}
uint16_t crc16(const char *buf, int len) {
    int counter;
    uint16_t crc = 0;
    for (counter = 0; counter < len; counter++)
            crc = (crc<<8) ^ crc16tab[((crc>>8) ^ *buf++)&0x00FF];
    return crc;
}

命令处理

normal command

redirection command

由于负载均衡或是节点故障而导致数据迁移时,请求重定向是不可避免的

  • MOVED Redirection - 负载均衡

clientsCron-> clientsCronHandleTimeout-> 定时处理被blocked clients clusterRedirectBlockedClientIfNeeded-> clusterRedirectClient函数处理

根据error_code == moved处理重定向

void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    ......
    else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    {
        addReplySds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d\r\n",
            (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot,n->ip,n->port));
    } else {
        serverPanic("getNodeByQuery() unknown error.");
    }
    ......
}

一个 Redis 客户端可以向集群中的每个节点发送查询,包括副本节点。节点会分析查询,并且如果查询是可接受的(也就是说,查询只涉及一个键,或者涉及的多个键都属于同一个哈希槽),它将查找负责所涉及键的哈希槽的节点

如果slot由该节点服务,则查询将被简单处理,否则节点将检查其内部的哈希槽到节点映射,并向客户端回复一个 MOVED 错误:

GET x

-MOVED 3999 127.0.0.1:6381

3999 表示 slot位置

client需要重新向127.0.0.1:6381请求

在重定向之前,client被blocked 直至收到err重定向

也就是说,频繁更换slot将影响client操作延时,尽可能保持slot稳定,减少moved ,可以减少客户端延时

  • ASKED Redirection
  • Live reconfiguration

Redis Cluster 支持在集群运行时添加和移除节点的能力。添加或移除节点被抽象为相同的操作:将哈希槽从一个节点移动到另一个节点。这意味着可以使用相同的基本机制来重新平衡集群、添加或移除节点等

要重新平衡集群,需要在节点之间移动一组给定的slots

实现的核心是移动哈希槽的能力。从实际角度来看,哈希槽只是一组键。

因此 Redis Cluster 在重新分片期间实际上是将键从一个实例移动到另一个实例。移动一个slot意味着移动所有哈希到该slot的键