Redis源码 - Expire 策略

本文主要想聊聊key 的Expire策略

基于redis 4.x版本source code

希望通过本文能了解如下内容:

  • Redis 何时进行key的淘汰
  • 如何在淘汰大规模数据的过程中不影响main thread处理客户端请求
  • Redis如何处理较大key的淘汰
  • 从库会主动淘汰过期数据吗?
  • 了解了redis expire的机制后,使用者如何避免大规模的数据过期带来的影响?

Expire 检查时机

命令触发检查

  • expireIfNeeded

    • ttlCommand/pttlCommand
    • typeCommand
    • lookupKeyRead/lookupKeyWrite 大多数读写命令都会触发
    • delCommand
    • unlinkCommand
    • renameCommand
    • existsCommand
    • expireCommand
    • expireatCommand
    • pexpireatCommand
    • persistCommand
    • ......
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

    now = server.lua_caller ? server.lua_time_start : mstime();
    if (server.masterhost != NULL) return now > when;

    /* Return when this key has not expired */
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

/* Propagate expires into slaves and the AOF file.
 * When a key expires in the master, a DEL operation for this key is sent
 * to all the slaves and the AOF file if enabled.
 *
 * This way the key expiry is centralized in one place, and since both
 * AOF and the master->slave link guarantee operation ordering, everything
 * will be consistent even if we allow write operations against expiring
 * keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
    robj *argv[2];

    argv[0] = lazy ? shared.unlink : shared.del;
    argv[1] = key;
    incrRefCount(argv[0]);
    incrRefCount(argv[1]);
    // aof 存储,del/unlink 命令入库
    if (server.aof_state != AOF_OFF)
        feedAppendOnlyFile(server.delCommand,db->id,argv,2);
    // 同步 del/unlink 命令到从库
    replicationFeedSlaves(server.slaves,db->id,argv,2);

    decrRefCount(argv[0]);
    decrRefCount(argv[1]);
}
  • maxmemory 淘汰 前文已经讨论过,达到maxmemory,每次命令执行会触发淘汰机制,多种策略Rand/近似LRU/近似LFU

事件触发

I/O事件触发,处理事件前,触发快速检查

int main(int argc, char **argv) {
    ......
    aeSetBeforeSleepProc(server.el,beforeSleep);
    ......
    aeMain(server.el);
    ......
}

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
/* This function gets called every time Redis is entering the
 * main loop of the event driven library, that is, before to sleep
 * for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);

    /* Call the Redis Cluster before sleep function. Note that this function
     * may change the state of Redis Cluster (from ok to fail or vice versa),
     * so it's a good idea to call it before serving the unblocked clients
     * later in this function. */
    if (server.cluster_enabled) clusterBeforeSleep();

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
    ......
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * No more than CRON_DBS_PER_CALL databases are tested at every
 * iteration.
 *
 * This kind of call is used when Redis detects that timelimit_exit is
 * true, so there is more work to do, and we do it more incrementally from
 * the beforeSleep() function of the event loop.
 *
 * Expire cycle type:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */

void activeExpireCycle(int type) {
    //不在这里展开
    ......
}

/*-----------------------------------------------------------------------------
 * Incremental collection of expired keys.
 *
 * When keys are accessed they are expired on-access. However we need a
 * mechanism in order to ensure keys are eventually removed when expired even
 * if no access is performed on them.
 *----------------------------------------------------------------------------*/

/* Helper function for the activeExpireCycle() function.
 * This function will try to expire the key that is stored in the hash table
 * entry 'de' of the 'expires' hash table of a Redis database.
 *
 * If the key is found to be expired, it is removed from the database and
 * 1 is returned. Otherwise no operation is performed and 0 is returned.
 *
 * When a key is expired, server.stat_expiredkeys is incremented.
 *
 * The parameter 'now' is the current time in milliseconds as is passed
 * to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        robj *keyobj = createStringObject(key,sdslen(key));

        propagateExpire(db,keyobj,server.lazyfree_lazy_expire);
        if (server.lazyfree_lazy_expire)
            dbAsyncDelete(db,keyobj);
        else
            dbSyncDelete(db,keyobj);
        notifyKeyspaceEvent(NOTIFY_EXPIRED,
            "expired",keyobj,db->id);
        decrRefCount(keyobj);
        server.stat_expiredkeys++;
        return 1;
    } else {
        return 0;
    }
}
  • activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST) - beforeSleep 调用activeExpireCycle,需要快速执行,防止处理事件的latency

  • activeExpireCycle 将尝试使存储在 Redis 数据库redisDb的 'expires' dict中的键过期

  • 有了上面的命令触发淘汰过期key,为什么需要在事件循环时处理过期key?

    • 需要一种机制来确保,在访问key时未被淘汰的数据,最终也会被移除

    • 追问,那定时任务是不是也可以?

      • 是的,👇将讨论

定时检查

initServer时设置, serverCron定时触发databasesCron,最终activeExpireCycle进行key的过期淘汰

  • 可写的从库,会执行expireSlaveKeys,淘汰自身的过期keys
/* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
/* This is our timer interrupt, called server.hz times per second.
 * Here is where we do a number of things that need to be done asynchronously.
 * For instance:
 *
 * - Active expired keys collection (it is also performed in a lazy way on
 *   lookup).
 * - Software watchdog.
 * - Update some statistic.
 * - Incremental rehashing of the DBs hash tables.
 * - Triggering BGSAVE / AOF rewrite, and handling of terminated children.
 * - Clients timeout of different kinds.
 * - Replication reconnection.
 * - Many more...
 *
 * Everything directly called here will be called server.hz times per second,
 * so in order to throttle execution of things we want to do less frequently
 * a macro is used: run_with_period(milliseconds) { .... }
 */

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData){
    ......
        /* Handle background operations on Redis databases. */
    databasesCron();
    ......
}
/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled && server.masterhost == NULL) {
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
    } else if (server.masterhost != NULL) {
        expireSlaveKeys();
    }
    ......
}

activeExpireCycle为什么选择增量式处理?

无聊事件轮询还是定时触发activeExpireCycle,频率都比较高

redis 主逻辑在主线程中实现,要保证不能影响主业务前提下,检查过期数据,不能太影响系统性能,因此redis选择增量式处理

activeExpireCycle函数主要三方面进行限制:

  • 检查时间限制
  • 过期数据检查数量限制
  • 过期数据是否达到可接受比例

activeExpireCycle 检查有“快速”和“慢速”两种

  • 定时任务检查属于慢速类型
  • 事件轮训属于快速类型,因为触发频率更高

activeExpireCycle几个重要变量:

// effort,默认 1,也就是遍历过期字典的力度,力度越大,遍历数量越多,但是性能损耗更多。 effort = server.active_expire_effort-1, / Rescale from 0 to 9. / // 每次循环遍历键值个数。力度越大,遍历个数越多。 config_keys_per_loop = ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP + ACTIVE_EXPIRE_CYCLE_KEYS_PER_LOOP/4effort, // 快速遍历时间范围,力度越大,给予遍历时间越多。 config_cycle_fast_duration = ACTIVE_EXPIRE_CYCLE_FAST_DURATION + ACTIVE_EXPIRE_CYCLE_FAST_DURATION/4effort, // 慢速遍历检查时间片 config_cycle_slow_time_perc = ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC + 2effort, // 已经到期数据 / 检查数据 比例。达到可以接受的比例。 config_cycle_acceptable_stale = ACTIVE_EXPIRE_CYCLE_ACCEPTABLE_STALE- effort; timelimit = config_cycle_slow_time_perc1000000/server.hz/100

约束每次检查的时间,防止时间过长timelimit 约束每次检查的范围,防止数据过多config_cycle_acceptable_stale

Expire数据删除

  • 先从redisDb->expire中清除key

  • 从redisDb->dict中清除key

  • 最后释放内存空间

    • 最终dictGenericDelete 函数,是通过分别调用 dictFreeKey、dictFreeVal 和 zfree 三个函数来释放 key、value 和键值对对应哈希项这三者占用的内存空间的
  • 分为同步和异步删除

同步删除

整个过程比较简单

/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

异步 - 惰性删除

异步删除过程:

  • redisDb->expire中清除key

  • redisDb->dict中清除key

  • 计算释放内寸的effort:lazyfreeGetFreeEffort,如果开销较小,dbAsyncDelete 函数就直接在主 IO 线程中进行同步删除了。否则的话,dbAsyncDelete 函数会创建惰性删除任务,并交给后台线程来完成

    • 删除开销的评估逻辑很简单,就是根据要删除的键值对的类型,来计算删除开销。当键值对类型属于 List、Hash、Set 和 Sorted Set 这四种集合类型中的一种,并且没有使用紧凑型内存结构来保存的话,那么,这个键值对的删除开销就等于集合中的元素个数。否则的话,删除开销就等于 1
  • 大于LAZYFREE_THRESHOLD (default 64)时,执行 bioCreateBackgroundJob

    • 函数首先分配了一个 struct bio_job list
    • 它获取了与任务类型对应的互斥锁 bio_mutex[type]
    • 将任务添加到对应类型的任务列表 bio_jobs[type] 的末尾
    • 递增了对应类型的待处理任务计数器 bio_pending[type]
    • 发送了一个信号通知有新任务加入, 相当于唤醒bio线程处理
    • 最后释放了互斥锁
  • 否则dictFreeUnlinkedEntry 释放空间

/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, let's put it into the
         * lazy free list. */
        if (free_effort > LAZYFREE_THRESHOLD) {
            atomicIncr(lazyfree_objects,1);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

tatic pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
static list *bio_jobs[BIO_NUM_OPS];
static unsigned long long bio_pending[BIO_NUM_OPS];

void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

void *bioProcessBackgroundJobs(void *arg) {
    ......
    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1);
        } else if (type == BIO_AOF_FSYNC) {
            aof_fsync((long)job->arg1);
        } else if (type == BIO_LAZY_FREE) {
            /* What we free changes depending on what arguments are set:
             * arg1 -> free the object at pointer.
             * arg2 & arg3 -> free two dictionaries (a Redis DB).
             * only arg3 -> free the skiplist. */
            if (job->arg1)
                lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);
    ......
}

同步与异步删除的选择

Redis del命令时阻塞执行。这意味着服务器停止处理新命令,以同步方式回收与对象关联的所有内存

  • 如果被删除的键与一个小对象关联,执行DEL命令所需的时间非常短,可以与Redis中大多数其他O(1)或O(log_N)命令相媲美
  • 如果键与包含数百万元素的聚合值关联,则服务器可能会阻塞很长时间(甚至几秒钟)才能完成操作

出于上述原因,Redis还提供了非阻塞删除原语,例如UNLINK以及FLUSHALL和FLUSHDB命令的ASYNC选项

  • 这些命令在后台回收内存。在恒定时间内执行
  • 另一个线程将尽可能快地在后台逐步释放对象

DEL、UNLINK和FLUSHALL、FLUSHDB的ASYNC选项由用户控制

Redis服务器有时还必须在执行其他操作的副作用中删除键或刷新整个数据库

具体来说,Redis会在以下情况下独立于用户调用而删除对象:

  • 在eviction时,由于maxmemory和maxmemory策略配置,为了为新数据腾出空间,而不超过指定的内存限制
  • 因为到期:当具有关联生存时间的键(请参阅EXPIRE命令)必须从内存中删除时
  • 因为命令的副作用,该命令存储数据在可能已经存在的键上。例如,RENAME命令可能会在用另一个键替换旧键内容时删除旧键内容。类似地,SUNIONSTORE或带有STORE选项的SORT可能会删除现有键。SET命令本身会删除指定键的任何旧内容,以便用指定字符串替换它
  • 在复制过程中,当从库与其主服务器执行完全重新同步时,整个数据库的内容将被删除,以加载刚刚传输的RDB文件

在所有上述情况下,默认情况是以阻塞方式删除对象,就像调用DEL一样。但是,您可以针对每种情况进行特定配置,以便以非阻塞方式释放内存,就像调用UNLINK一样,使用以下配置指令:

  • lazyfree-lazy-eviction no eviction时的数据删除场景
  • lazyfree-lazy-expire no 过期 key 的删除场景
  • lazyfree-lazy-server-del no 进行删除操作的 server 命令执行场景
  • slave-lazy-flush no 从节点完成全量同步后,删除原有旧数据的场景

总结

现在可以回答篇首的几个问题:

  • Redis 何时进行key的淘汰?

    • 命令执行触发/事件轮询触发/定时触发
  • 如何在淘汰数据的过程中不影响main thread处理客户端请求,即减少blocked时间?

    • 开启惰性删除(异步删除)
    • 分而治之,Redis选择在不同的时机触发过期淘汰key,避免一次性过多的处理数据,减少对后续命令的阻塞
    • 每次触发淘汰数据都是有限的,activeExpireCycleTryExpire 是一个增量式淘汰数据的函数
  • Redis如何处理较大的key的淘汰?

    • Redis不会主动处理,需要开启lazyfree相关配置
  • 从库会淘汰过期数据吗?

    • 会,如果从库是可写的时候
  • 了解了redis expire的机制后,如何避免大规模的数据过期带来的影响?

    • ttl 可以考虑加入random,避免同时过期带来的影响