Redis Source Code: Persistence

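Straight to the point. This article focuses on the following questions:

  • When does persistence happen, and is it synchronous or asynchronous?
  • How is asynchronous persistence handled when the server exits?
  • How is performance maintained when the persistence file grows large?
  • What mechanisms does Redis provide for reliable data storage?
  • What are the pros and cons of RDB and AOF?
  • How is new data handled while the AOF is being rewritten?

The starting point: suppose you had to implement a cache service with storage. How would you do it?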

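When persistence happens

Redis has two persistence mechanisms, AOF and RDB. RDB is enabled by default.

To enable AOF:

  • redis.conf: appendonly yes

RDB first

The RDB mechanism first writes the in-memory snapshot to a temp-<pid>.rdb file, then renames that file to dump.rdb.

RDB (point-in-time snapshot) cannot guarantee that no data is lost. It offers only weak support for reliability, a sacrifice made for performance.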

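Pros:

  • Simple to implement, with no in-place updates
  • The file can be analyzed on its own, for example to find big keys
  • Provides recovery after a failure

Cons:

  • Cannot provide a complete reliability guarantee
  • No support for querying the file
  • RDB needs to call fork() often so that a child process can persist the data to disk. If the dataset is large, fork() can be time-consuming and may cause Redis to stop serving clients for some milliseconds, or even for up to a second when the dataset is very large and the CPU is slow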

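Asynchronous RDB persistence

  • Triggered periodically by serverCron
  • Triggered by the BGSAVE command (bgsaveCommand)

rdbSaveBackground: the main process forks a child that performs the save asynchronously, childpid = fork()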

int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    start = ustime();
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-rdb-bgsave");
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        /* Parent */
        ......
        return C_OK;
    }
    return C_OK; /* unreached */
}
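
The reason the child can write a consistent snapshot while the parent keeps serving writes is that fork() gives the child its own copy-on-write view of memory. A minimal sketch of that effect follows; `dataset_value` and `value_seen_by_child` are hypothetical names for illustration and do not appear in Redis.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical stand-in for the in-memory dataset. */
static int dataset_value = 1;

/* Forks a child, changes the dataset in the parent, and only then lets
 * the child report the value it sees. Returns that value, or -1 on error. */
int value_seen_by_child(void) {
    int gate[2];
    int status;
    pid_t pid;
    ssize_t w;

    dataset_value = 1;
    if (pipe(gate) == -1) return -1;

    pid = fork();
    if (pid == -1) {
        close(gate[0]);
        close(gate[1]);
        return -1;
    }
    if (pid == 0) {
        /* Child: block until the parent has modified its own copy. */
        char c;
        close(gate[1]);
        if (read(gate[0], &c, 1) != 1) _exit(255);
        _exit(dataset_value); /* still the value at fork() time */
    }

    /* Parent: modify the data after the fork, then release the child. */
    close(gate[0]);
    dataset_value = 2;
    w = write(gate[1], "x", 1);
    close(gate[1]);
    if (waitpid(pid, &status, 0) == -1) return -1;
    if (w != 1) return -1;
    return WIFEXITED(status) ? WEXITSTATUS(status) : -1;
}
```

The child reports 1 even though the parent has already stored 2, which is the property the RDB child relies on. The cost is the private dirty memory that rdbSaveBackground logs as "memory used by copy-on-write".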

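How often the asynchronous save is triggered

  1. save 900 1 (at least 1 change within 900 seconds)
  2. save 300 10
  3. save 60 10000

// RDB periodic save parameters
struct saveparam {
    time_t seconds; // time window in seconds
    int changes;    // number of changes
};

Asynchronous persistence therefore cannot guarantee that no data is lost: writes made after the most recent snapshot are gone if the server crashes.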
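
To make the save points concrete, here is a simplified sketch of the check that serverCron performs against each `saveparam`. The function name `save_point_reached` is hypothetical. The real check also requires that no child process is running and takes the status of the last background save into account, both of which are left out here.

```c
#include <stddef.h>
#include <time.h>

struct saveparam {
    time_t seconds; /* time window in seconds */
    int changes;    /* number of changes */
};

/* Returns 1 when at least one save point is satisfied: enough changes
 * have accumulated AND enough time has passed since the last save. */
int save_point_reached(const struct saveparam *params, size_t count,
                       long long dirty, time_t since_last_save) {
    for (size_t i = 0; i < count; i++) {
        if (dirty >= params[i].changes &&
            since_last_save > params[i].seconds)
            return 1;
    }
    return 0;
}
```

With the three default save points, 9999 changes in 61 seconds do not trigger a save, while 10000 changes in the same window do.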

Synchronous RDB persistence

rdbSave

Triggered by commands:

  • saveCommand
  • flushallCommand
  • shutdownCommand
  • A kill signal: process exit triggers prepareForShutdown -> rdbSave
/* Save the DB on disk. Return C_ERR on error, C_OK on success. */
int rdbSave(char *filename, rdbSaveInfo *rsi) {
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;

    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Failed opening the RDB file %s (in server root dir %s) "
            "for saving: %s",
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        return C_ERR;
    }

    rioInitWithFile(&rdb,fp);
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) {
        errno = error;
        goto werr;
    }

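    // fflush is a libc function: it calls write() to hand the stdio buffer to the kernel, so the data is in the kernel buffer, not yet on disk
    // fsync is a system call: it flushes the kernel buffer to disk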
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    if (rename(tmpfile,filename) == -1) {
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        serverLog(LL_WARNING,
            "Error moving temp DB file %s on the final "
            "destination %s (in server root dir %s): %s",
            tmpfile,
            filename,
            cwdp ? cwdp : "unknown",
            strerror(errno));
        unlink(tmpfile);
        return C_ERR;
    }

    serverLog(LL_NOTICE,"DB saved on disk");
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    serverLog(LL_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;
}
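
The pattern in rdbSave (write to a temp file, fflush, fsync, then rename) is worth isolating, because it is what keeps a crash in the middle of a save from corrupting the existing dump file. A minimal sketch under stated assumptions: `atomic_write_file` and the `temp-<pid>.tmp` name are hypothetical, and the temp file is assumed to be on the same filesystem as the destination so that rename(2) is atomic.

```c
#include <stdio.h>
#include <unistd.h>

/* Writes data to a temp file, forces it to disk, then atomically
 * replaces path with it. Returns 0 on success, -1 on error. */
int atomic_write_file(const char *path, const void *data, size_t len) {
    char tmp[256];
    FILE *fp;

    snprintf(tmp, sizeof(tmp), "temp-%d.tmp", (int)getpid());
    fp = fopen(tmp, "w");
    if (!fp) return -1;

    if (len > 0 && fwrite(data, len, 1, fp) != 1) goto werr;
    if (fflush(fp) == EOF) goto werr;          /* stdio buffer -> kernel */
    if (fsync(fileno(fp)) == -1) goto werr;    /* kernel buffer -> disk */
    if (fclose(fp) == EOF) {
        fp = NULL;                             /* already released */
        goto werr;
    }
    fp = NULL;

    /* Readers see either the old file or the complete new one. */
    if (rename(tmp, path) == -1) {
        unlink(tmp);
        return -1;
    }
    return 0;

werr:
    if (fp) fclose(fp);
    unlink(tmp);
    return -1;
}
```

Calling `atomic_write_file("dump-demo.txt", "hello", 5)` either leaves a complete five-byte file or leaves any previous file untouched.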

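Does synchronous persistence guarantee that no data is lost?

  • No, it does not either. It is triggered by commands, so a failure between two saves loses the data written in between.

How can the impact on the system be reduced when a large amount of data is persisted?

  • rdb-save-incremental-fsync yes: flush to disk incrementally, available since Redis 5.0

    • Every REDIS_AUTOSYNC_BYTES of buffered data is flushed to disk with fsync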
void rioSetAutoSync(rio *r, off_t bytes) {
    if (r->write != rioFileIO.write) return;
    r->io.file.autosync = bytes;
}

/* Returns 1 or 0 for success/failure. */
static size_t rioFileWrite(rio *r, const void *buf, size_t len) {
    size_t retval;

    retval = fwrite(buf,len,1,r->io.file.fp);
    r->io.file.buffered += len;

    if (r->io.file.autosync &&
        r->io.file.buffered >= r->io.file.autosync)
    {
        fflush(r->io.file.fp);
        redis_fsync(fileno(r->io.file.fp));
        r->io.file.buffered = 0;
    }
    return retval;
}

Loading data

When Redis starts, it loads data from disk into memory: from the RDB file, or from the AOF file when AOF is enabled

int main(int argc, char **argv) {
    ...
    if (!server.sentinel_mode) {
        loadDataFromDisk();
    }
}

/* flags on the purpose of rdb save or load */
#define RDBFLAGS_NONE 0
#define RDBFLAGS_AOF_PREAMBLE (1<<0)
#define RDBFLAGS_REPLICATION (1<<1)

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            ......
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
            ......
        }
    }
    ......
}

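AOF persistence

RDB does not give a complete reliability guarantee; AOF provides a stronger one.

Pros:

  • AOF persists by appending. Like RDB it never modifies existing data; it only appends, which keeps disk I/O sequential, with no seek
  • AOF makes Redis more durable. There are different fsync policies: no fsync at all (appendfsync no), fsync every second (appendfsync everysec), and fsync at every query (appendfsync always). With the default policy of fsync every second, write performance is still good. fsync runs in a background thread and the main thread tries hard to perform writes when no fsync is in progress, so you lose at most about one second of writes
  • Redis can automatically rewrite the AOF in the background when it gets too big. The rewrite is completely safe: while Redis keeps appending to the old file, a brand-new file is produced with the minimal set of operations needed to create the current dataset. Once the new file is ready, Redis switches the two and starts appending to the new one
  • AOF contains a log of all operations in a format that is easy to understand and parse, and the file is easy to export. For example, even if you flushed everything by mistake with FLUSHALL, as long as no rewrite of the log happened in the meantime you can still save your data by stopping the server, removing the last command, and restarting Redis

Cons:

  • AOF files are usually much larger than the equivalent RDB files
  • Because AOF calls fsync (a system call) more often, RDB (asynchronous) offers better performance

When AOF is persisted

  • Triggered by write commands

  • Data is first written to the global AOF buffer (server.aof_buf)

  • When the buffer is written to disk

    • serverCron writes the buffer to the file periodically
    • beforeSleep, on every event-loop iteration
    • prepareForShutdown/stopAppendOnly
  • fsync policy

    • appendfsync everysec: fsync runs asynchronously; a job is queued and handled by a bio thread
    • appendfsync always: fsync is called synchronously; it hurts performance a lot but is the safest
    • appendfsync no: Redis does not call fsync and leaves flushing to the operating system; best performance, least safe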
void call(client *c, int flags) 
// the command needs to be appended to the AOF buffer
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)
{
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        replicationFeedSlaves(server.slaves,dbid,argv,argc);
}

struct redisServer {
    ......
    sds aof_buf;      /* AOF buffer, written before entering the event loop */
    ......
}

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
    ......
    // format the command in the Redis protocol, then append it to the AOF buffer
    buf = catAppendOnlyGenericCommand(buf,argc,argv);
    ......
    /* Append to the AOF buffer. This will be flushed on disk just before
     * of re-entering the event loop, so before the client will get a
     * positive reply about the operation performed. */
    if (server.aof_state == AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));
    // If a child is rewriting the AOF, the parent also sends the new data to that child so the rewritten file is more complete.
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
    ......
}
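
What ends up in the AOF buffer is the command encoded in the Redis protocol (RESP): an array header followed by one length-prefixed bulk string per argument. The sketch below shows only that framing. `format_resp_command` is a hypothetical helper, and unlike the real code it assumes NUL-terminated arguments, so it is not binary-safe.

```c
#include <stdio.h>
#include <string.h>

/* Encodes argv[0..argc-1] as a RESP array of bulk strings into out.
 * Returns the number of bytes written (excluding the trailing NUL),
 * or -1 if out is too small. */
int format_resp_command(char *out, size_t cap, int argc, const char **argv) {
    size_t used;
    int n = snprintf(out, cap, "*%d\r\n", argc);      /* array header */
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;

    for (int i = 0; i < argc; i++) {
        /* "$<len>\r\n<payload>\r\n" for each argument */
        n = snprintf(out + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}
```

For `SET k v` this produces `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n`, which is why an AOF file can be read and edited with ordinary text tools.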
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ......
    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    run_with_period(1000) {
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    }
    ......
}

/* Write the append only file buffer on disk.
 *
 * Since we are required to write the AOF before replying to the client,
 * and the only way the client socket can get a write is entering when the
 * the event loop, we accumulate all the AOF writes in a memory
 * buffer and write it on disk using this function just before entering
 * the event loop again.
 *
 * About the 'force' argument:
 *
 * When the fsync policy is set to 'everysec' we may delay the flush if there
 * is still an fsync() going on in the background thread, since for instance
 * on Linux write(2) will be blocked by the background fsync anyway.
 * When this happens we remember that there is some aof buffer to be
 * flushed ASAP, and will try to do that in the serverCron() function.
 *
 * However if force is set to 1 we'll write regardless of the background
 * fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force){
    ......
    if (sdslen(server.aof_buf) == 0) return;
    // check whether an fsync job is currently pending
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0;

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
        /* With this append fsync policy we do background fsyncing.
         * If the fsync is still in progress we can try to delay
         * the write for a couple of seconds. */
        // an fsync is already in progress, so delay the write
        if (sync_in_progress) {
            if (server.aof_flush_postponed_start == 0) {
                /* No previous write postponing, remember that we are
                 * postponing the flush and return. */
                server.aof_flush_postponed_start = server.unixtime;
                return;
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) {
                /* We were already waiting for fsync to finish, but for less
                 * than two seconds this is still ok. Postpone again. */
                return;
            }
        }
    }
    ......
    latencyStartMonitor(latency);
    // write the buffer to the kernel buffer with the write() system call
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    latencyEndMonitor(latency);
    ......
    /* Perform the fsync if needed. */
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid
         * flushing metadata. */
        latencyStartMonitor(latency);
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */
        latencyEndMonitor(latency);
        latencyAddSampleIfNeeded("aof-fsync-always",latency);
        server.aof_last_fsync = server.unixtime;
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                server.unixtime > server.aof_last_fsync)) {
        if (!sync_in_progress) aof_background_fsync(server.aof_fd);
        server.aof_last_fsync = server.unixtime;
    }
}

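The core logic of flushAppendOnlyFile has two parts:

  • Call write to write server.aof_buf to the file

  • Depending on the configuration, call fsync synchronously or asynchronously so the data reaches the disk

    • With AOF_FSYNC_ALWAYS, the flush is done synchronously through the redis_fsync macro
    • With AOF_FSYNC_EVERYSEC and no background flush job running, aof_background_fsync is called to create an asynchronous bio job (bioCreateBackgroundJob) that performs the flush
    • With AOF_FSYNC_EVERYSEC and an fsync job already in progress, the write is delayed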
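
The fsync branch at the end of flushAppendOnlyFile can be condensed into a small pure function. This is only a sketch of the decision shown in the excerpt above; the enum and function names are hypothetical, and the postponed-write handling is left out.

```c
/* Hypothetical names mirroring the three appendfsync settings. */
enum fsync_policy { FSYNC_NO, FSYNC_EVERYSEC, FSYNC_ALWAYS };
enum fsync_action { ACTION_NONE, ACTION_SYNC_NOW, ACTION_BACKGROUND };

/* Decides what to do after the AOF buffer has been handed to write().
 * now and last_fsync are timestamps in whole seconds. */
enum fsync_action choose_fsync_action(enum fsync_policy policy,
                                      int sync_in_progress,
                                      long now, long last_fsync) {
    if (policy == FSYNC_ALWAYS)
        return ACTION_SYNC_NOW;            /* block until data is on disk */
    if (policy == FSYNC_EVERYSEC && now > last_fsync && !sync_in_progress)
        return ACTION_BACKGROUND;          /* queue a job for the bio thread */
    return ACTION_NONE;                    /* leave it to the OS or a later call */
}
```

Writing it this way makes the trade-off visible: only the always policy puts fsync latency on the main thread.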

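AOF rewrite

The purpose of the AOF rewrite is to shrink the file and keep it from growing without bound.

The function that implements the AOF rewrite is rewriteAppendOnlyFileBackground.

When it is triggered:

  • bgrewriteaofCommand: triggered manually by sending the BGREWRITEAOF command to the server

    • Preconditions:

      • No AOF rewrite child is running
      • No RDB child is running
  • configSetCommand: triggered manually with config set appendonly yes

    • Once AOF is enabled, configSetCommand calls startAppendOnly, which performs an AOF rewrite
  • restartAOFAfterSYNC: called during master-replica replication; left for the replication article

  • Our old friend serverCron: it runs periodically and decides on each run whether to perform an AOF rewrite

    • Conditions

      • AOF is enabled

      • No RDB child and no AOF rewrite child is running

      • A growth percentage threshold is configured, and the AOF file size exceeds the absolute minimum size

        • auto-aof-rewrite-percentage: how far the AOF file has grown beyond its base size; the default is 100, meaning the file has doubled
        • auto-aof-rewrite-min-size: the minimum absolute AOF file size; the default is 64MB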
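
The size conditions translate into a short calculation. The following is a simplified sketch of the growth check: `should_auto_rewrite` is a hypothetical name, the parameters stand in for the corresponding server fields, and the "no child running" and "AOF enabled" conditions are assumed to have been checked already.

```c
/* Returns 1 when the AOF has grown enough to justify an automatic rewrite.
 * base_size is the AOF size after the last rewrite (or at startup). */
int should_auto_rewrite(long long current_size, long long base_size,
                        int rewrite_perc, long long min_size) {
    long long base, growth;

    if (rewrite_perc == 0) return 0;           /* feature disabled */
    if (current_size <= min_size) return 0;    /* file still too small */

    base = base_size ? base_size : 1;          /* avoid division by zero */
    growth = (current_size * 100 / base) - 100;
    return growth >= rewrite_perc;
}
```

With the defaults (100 and 64MB), an AOF that was 64MB after its last rewrite triggers a new rewrite once it reaches 128MB.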

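Execution model: asynchronous

Rewrite steps:

  • fork a child; the child calls rewriteAppendOnlyFile while the main thread keeps serving client requests without blocking (apart from the fork itself)

  • Create a temporary AOF file

  • The child, working on its copy-on-write view of the data, writes it to the temporary AOF file: rioInitWithFile(&aof,fp)

  • Flush the file cache to disk incrementally

    • Controlled by aof_rewrite_incremental_fsync/REDIS_AUTOSYNC_BYTES
  • Depending on the configuration, the content is written in RDB or AOF format; an AOF file can hold an RDB preamble followed by AOF content in the same file

  • fflush

  • fsync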

/* ----------------------------------------------------------------------------
 * AOF background rewrite
 * ------------------------------------------------------------------------- */

/* This is how rewriting of the append only file in background works:
 *
 * 1) The user calls BGREWRITEAOF
 * 2) Redis calls this function, that forks():
 *    2a) the child rewrite the append only file in a temp file.
 *    2b) the parent accumulates differences in server.aof_rewrite_buf.
 * 3) When the child finished '2a' exists.
 * 4) The parent will trap the exit code, if it's OK, will append the
 *    data accumulated into server.aof_rewrite_buf into the temp file, and
 *    finally will rename(2) the temp file in the actual file name.
 *    The the new file is reopened as the new append only file. Profit!
 */
int rewriteAppendOnlyFileBackground(void) {
    ......
    if ((childpid = fork()) == 0) {
        char tmpfile[256];

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) {
                serverLog(LL_NOTICE,
                    "AOF rewrite: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            }

            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    }
    ......
}
/* Write a sequence of commands able to fully rebuild the dataset into
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF.
 *
 * In order to minimize the number of commands needed in the rewritten
 * log Redis uses variadic commands when possible, such as RPUSH, SADD
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
 * are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename){
    ......
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
        return C_ERR;
    }

    server.aof_child_diff = sdsempty();
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);

    if (server.aof_use_rdb_preamble) {
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) {
            errno = error;
            goto werr;
        }
    } else {
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    }

    /* Do an initial slow fsync here while the parent is still sending
     * data, in order to make the next final fsync faster. */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    ......
}

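This raises a question: since the rewrite is done asynchronously by a child process, what happens to new commands that arrive during the rewrite?

Redis uses the aof_rewrite_buf_blocks list to hold the write commands received during this period; each item is an aofrwblock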

list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite. */

/* ----------------------------------------------------------------------------
 * AOF rewrite buffer implementation.
 *
 * The following code implement a simple buffer used in order to accumulate
 * changes while the background process is rewriting the AOF file.
 *
 * We only need to append, but can't just use realloc with a large block
 * because 'huge' reallocs are not always handled as one could expect
 * (via remapping of pages at OS level) but may involve copying data.
 *
 * For this reason we use a list of blocks, every block is
 * AOF_RW_BUF_BLOCK_SIZE bytes.
 * ------------------------------------------------------------------------- */

#define AOF_RW_BUF_BLOCK_SIZE (1024*1024*10)    /* 10 MB per block */

typedef struct aofrwblock {
    unsigned long used, free;
    char buf[AOF_RW_BUF_BLOCK_SIZE];
} aofrwblock;
/* Append data to the AOF rewrite buffer, allocating new blocks if needed. */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) {
    ......
    if (len) { /* First block to allocate, or need another block. */
            int numblocks;

            block = zmalloc(sizeof(*block));
            block->free = AOF_RW_BUF_BLOCK_SIZE;
            block->used = 0;
            listAddNodeTail(server.aof_rewrite_buf_blocks,block);

            /* Log every time we cross more 10 or 100 blocks, respectively
             * as a notice or warning. */
            numblocks = listLength(server.aof_rewrite_buf_blocks);
            if (((numblocks+1) % 10) == 0) {
                int level = ((numblocks+1) % 100) == 0 ? LL_WARNING :
                                                         LL_NOTICE;
                serverLog(level,"Background AOF buffer size: %lu MB",
                    aofRewriteBufferSize()/(1024*1024));
            }
    }

    /* Install a file event to send data to the rewrite child if there is
     * not one already. */
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) {
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,
            AE_WRITABLE, aofChildWriteDiffData, NULL);
    }
}

/* Event handler used to send data to the child process doing the AOF
 * rewrite. We send pieces of our AOF differences buffer so that the final
 * write when the child finishes the rewrite will be small. */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) {
    listNode *ln;
    aofrwblock *block;
    ssize_t nwritten;
    UNUSED(el);
    UNUSED(fd);
    UNUSED(privdata);
    UNUSED(mask);

    while(1) {
        // take the first block from the aof_rewrite_buf_blocks list
        ln = listFirst(server.aof_rewrite_buf_blocks);
        block = ln ? ln->value : NULL;
        if (server.aof_stop_sending_diff || !block) {
            aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,
                              AE_WRITABLE);
            return;
        }
        if (block->used > 0) {
            // write the block to the pipe between the main process and the rewrite child (explained in detail below)
            nwritten = write(server.aof_pipe_write_data_to_child,
                             block->buf,block->used);
            if (nwritten <= 0) return;
            memmove(block->buf,block->buf+nwritten,block->used-nwritten);
            block->used -= nwritten;
            block->free += nwritten;
        }
        if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
    }
}

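The aofrwblock struct is effectively a 10MB data block that records the commands the main process received during the AOF rewrite.

The aof_rewrite_buf_blocks list links these blocks together.

How the parent writes:

  • feedAppendOnlyFile performs its append while server.aof_child_pid != -1 (a child is rewriting)
  • aofRewriteBufferAppend is called and appends the command data to the aof_rewrite_buf_blocks list
  • aeCreateFileEvent registers a write event whose callback is aofChildWriteDiffData
  • When the fd (server.aof_pipe_write_data_to_child) becomes writable, aofChildWriteDiffData takes the data out and writes it to the pipe to the child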
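
The block-list buffer avoids reallocating one huge array: new data fills the free space of the tail block and a new fixed-size block is linked in only when the tail is full. A minimal sketch with a deliberately tiny block size follows; `block`, `block_list` and the helper functions are hypothetical and use a hand-rolled singly linked list instead of the Redis list type.

```c
#include <stdlib.h>
#include <string.h>

#define BLOCK_SIZE 10 /* tiny on purpose; the Redis blocks are 10 MB */

typedef struct block {
    size_t used, free;
    struct block *next;
    char buf[BLOCK_SIZE];
} block;

typedef struct block_list {
    block *head, *tail;
    size_t count;
} block_list;

/* Appends len bytes, filling the tail block first and allocating new
 * blocks as needed. Returns 0 on success, -1 on allocation failure. */
int block_list_append(block_list *l, const char *s, size_t len) {
    while (len > 0) {
        block *b = l->tail;
        size_t n;

        if (b == NULL || b->free == 0) {
            b = malloc(sizeof(*b));
            if (b == NULL) return -1;
            b->used = 0;
            b->free = BLOCK_SIZE;
            b->next = NULL;
            if (l->tail) l->tail->next = b; else l->head = b;
            l->tail = b;
            l->count++;
        }
        n = len < b->free ? len : b->free;
        memcpy(b->buf + b->used, s, n);
        b->used += n;
        b->free -= n;
        s += n;
        len -= n;
    }
    return 0;
}

/* Releases every block and resets the list to empty. */
void block_list_free(block_list *l) {
    block *b = l->head;
    while (b) {
        block *next = b->next;
        free(b);
        b = next;
    }
    l->head = l->tail = NULL;
    l->count = 0;
}
```

Appending 25 bytes to an empty list yields three blocks (10, 10 and 5 bytes used), and a consumer can drain from the head while the producer keeps appending at the tail.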

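How the child reads:

  • rewriteAppendOnlyFile calls aofReadDiffFromParent during the rewrite

    • rewriteAppendOnlyFileRio also calls aofReadDiffFromParent
    • rdbSaveRio is the function that creates RDB files. With mixed AOF and RDB persistence, it also calls aofReadDiffFromParent
  • aofReadDiffFromParent uses a 64KB buffer and calls read on the fd (server.aof_pipe_read_data_from_parent) to read the data the parent wrote above

  • The data is appended to the global string server.aof_child_diff

  • rewriteAppendOnlyFile writes the operations accumulated in aof_child_diff to the rewritten AOF file

When does the child wait for readable data?

  • During rewriteAppendOnlyFile; see the code below. The child does not register an event handler; it waits on the fd directly with aeWait
  • aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1)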
/* This function is called by the child rewriting the AOF file to read
 * the difference accumulated from the parent into a buffer, that is
 * concatenated at the end of the rewrite. */
ssize_t aofReadDiffFromParent(void) {
    char buf[65536]; /* Default pipe buffer size on most Linux systems. */
    ssize_t nread, total = 0;

    while ((nread =
            read(server.aof_pipe_read_data_from_parent,buf,sizeof(buf))) > 0) {
        server.aof_child_diff = sdscatlen(server.aof_child_diff,buf,nread);
        total += nread;
    }
    return total;
}

int rewriteAppendOnlyFile(char *filename) {
    ......
    int nodata = 0;
    mstime_t start = mstime();
    while(mstime()-start < 1000 && nodata < 20) {
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0) {
            nodata++;
            continue;
        }
        nodata = 0; /* Start counting from zero, we stop on N *contiguous*
                       timeouts. */
        aofReadDiffFromParent();
    }
}
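
The read loop in aofReadDiffFromParent only terminates cleanly when the pipe has nothing left to read, which works when the read end is non-blocking. A minimal sketch of that drain pattern follows, assuming a non-blocking descriptor; `set_nonblocking` and `drain_nonblocking` are hypothetical helpers, not Redis functions.

```c
#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

/* Puts fd into non-blocking mode. Returns 0 on success, -1 on error. */
int set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}

/* Reads whatever is currently available on a non-blocking fd into out,
 * up to cap bytes. Returns the total number of bytes read. */
ssize_t drain_nonblocking(int fd, char *out, size_t cap) {
    ssize_t nread, total = 0;

    while ((size_t)total < cap) {
        nread = read(fd, out + total, cap - (size_t)total);
        if (nread <= 0) break; /* 0 = EOF, -1 = nothing available or error */
        total += nread;
    }
    return total;
}
```

A second call right after a successful drain returns 0 instead of blocking, which is what lets the child go back to its own work between reads.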

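This completes the data synchronization between the parent and the child.

Reading AOF data

Similar to reading the RDB file; omitted here.

Summary

The questions from the beginning should all be answered by the text above.

References

Redis documentation

Redis source code 5.0.1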