欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

redis源代码分析24–VM(中)

程序员文章站 2024-02-18 13:48:28
...

VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。 这一节主要介绍阻塞方式。 redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 v

VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。

这一节主要介绍阻塞方式。

redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 value(vmSwapOneObjectBlocking函数)。除此之外,redis只在serverCron函数(该函数事件处理章节分析过)中换出value。我们来看看serverCron中的处理代码,阻塞方式使用函数vmSwapOneObjectBlocking换出value,多线程方式使用函数vmSwapOneObjectThreaded换出value。

static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ---
    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            ---
            if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
            retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            ---
        }
    }
    ---
    return 100;
}

无论是阻塞方式vmSwapOneObjectBlocking换出value,还是多线程方式vmSwapOneObjectThreaded换出value,最终都调用vmSwapOneObject(调用参数不一样)来换出value。

vmSwapOneObject会对每个db,随机选择5项,计算它的swappability,然后如果是多线程方式,则调用vmSwapObjectThreaded来换出value,否则使用vmSwapObjectBlocking换出value。

static int vmSwapOneObject(int usethreads) {
    int j, i;
    struct dictEntry *best = NULL;
    double best_swappability = 0;
    redisDb *best_db = NULL;
    robj *key, *val;
    for (j = 0; j dict) == 0) continue;
        for (i = 0; i dict);
            key = dictGetEntryKey(de);
            val = dictGetEntryVal(de);
            /* Only swap objects that are currently in memory.
             *
             * Also don't swap shared objects if threaded VM is on, as we
             * try to ensure that the main thread does not touch the
             * object while the I/O thread is using it, but we can't
             * control other keys without adding additional mutex. */
            if (key->storage != REDIS_VM_MEMORY ||
                (server.vm_max_threads != 0 && val->refcount != 1)) {
                if (maxtries) i--; /* don't count this try */
                continue;
            }
            val->vm.atime = key->vm.atime; /* atime is updated on key object */
            swappability = computeObjectSwappability(val);
            if (!best || swappability > best_swappability) {
                best = de;
                best_swappability = swappability;
                best_db = db;
            }
        }
    }
    if (best == NULL) return REDIS_ERR;
    key = dictGetEntryKey(best);
    val = dictGetEntryVal(best);
    redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
        key->ptr, best_swappability);
    /* Unshare the key if needed */
    if (key->refcount > 1) {
        robj *newkey = dupStringObject(key);
        decrRefCount(key);
        key = dictGetEntryKey(best) = newkey;
    }
    /* Swap it */
    if (usethreads) {
        vmSwapObjectThreaded(key,val,best_db);
        return REDIS_OK;
    } else {
        if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
            dictGetEntryVal(best) = NULL;
            return REDIS_OK;
        } else {
            return REDIS_ERR;
        }
    }
}

vmSwapObjectBlocking会在计算所需的交换页后,阻塞性的将value写到vm文件中(函数vmWriteObjectOnSwap),最后标记相应vm页为已使用。

static int vmSwapObjectBlocking(robj *key, robj *val) {
    off_t pages = rdbSavedObjectPages(val,NULL);
    off_t page;
    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
    if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR;
    key->vm.page = page;
    key->vm.usedpages = pages;
    key->storage = REDIS_VM_SWAPPED;
    key->vtype = val->type;
    decrRefCount(val); /* Deallocate the object from memory. */
    vmMarkPagesUsed(page,pages);
    redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
        (unsigned char*) key->ptr,
        (unsigned long long) page, (unsigned long long) pages);
    server.vm_stats_swapped_objects++;
    server.vm_stats_swapouts++;
    return REDIS_OK;
}

对于value的加载,如果是多线程方式,会使用blockClientOnSwappedKeys提前加载,但阻塞方式则只有到相应命令执行时才会加载。最终无论是阻塞方式还是多线程方式,都会调用lookupKey来查找key是否在内存中,若不在,则使用vmLoadObject加载value,该函数是阻塞式的读入value。

static robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key);
    if (de) {
        robj *key = dictGetEntryKey(de);
        robj *val = dictGetEntryVal(de);
        if (server.vm_enabled) {
            if (key->storage == REDIS_VM_MEMORY ||
                key->storage == REDIS_VM_SWAPPING)
            {
                /* If we were swapping the object out, stop it, this key
                 * was requested. */
                if (key->storage == REDIS_VM_SWAPPING)
                    vmCancelThreadedIOJob(key);
                /* Update the access time of the key for the aging algorithm. */
                key->vm.atime = server.unixtime;
            } else {
                int notify = (key->storage == REDIS_VM_LOADING);
                /* Our value was swapped on disk. Bring it at home. */
                redisAssert(val == NULL);
                val = vmLoadObject(key);
                dictGetEntryVal(de) = val;
                /* Clients blocked by the VM subsystem may be waiting for
                 * this key... */
                if (notify) handleClientsBlockedOnSwappedKey(db,key);
            }
        }
        return val;
    } else {
        return NULL;
    }
}