redis源代码分析24–VM(中)
VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。 这一节主要介绍阻塞方式。 redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 v
VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。
这一节主要介绍阻塞方式。
redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 value(vmSwapOneObjectBlocking函数)。除此之外,redis只在serverCron函数(该函数事件处理章节分析过)中换出value。我们来看看serverCron中的处理代码,阻塞方式使用函数vmSwapOneObjectBlocking换出value,多线程方式使用函数vmSwapOneObjectThreaded换出value。
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { --- /* Swap a few keys on disk if we are over the memory limit and VM * is enbled. Try to free objects from the free list first. */ if (vmCanSwapOut()) { while (server.vm_enabled && zmalloc_used_memory() > server.vm_max_memory) { --- if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue; retval = (server.vm_max_threads == 0) ? vmSwapOneObjectBlocking() : vmSwapOneObjectThreaded(); --- } } --- return 100; }
无论是阻塞方式vmSwapOneObjectBlocking换出value,还是多线程方式vmSwapOneObjectThreaded换出value,最终都调用vmSwapOneObject(调用参数不一样)来换出value。
vmSwapOneObject会对每个db,随机选择5项,计算它的swappability,然后如果是多线程方式,则调用vmSwapObjectThreaded来换出value,否则使用vmSwapObjectBlocking换出value。
static int vmSwapOneObject(int usethreads) { int j, i; struct dictEntry *best = NULL; double best_swappability = 0; redisDb *best_db = NULL; robj *key, *val; for (j = 0; j dict) == 0) continue; for (i = 0; i dict); key = dictGetEntryKey(de); val = dictGetEntryVal(de); /* Only swap objects that are currently in memory. * * Also don't swap shared objects if threaded VM is on, as we * try to ensure that the main thread does not touch the * object while the I/O thread is using it, but we can't * control other keys without adding additional mutex. */ if (key->storage != REDIS_VM_MEMORY || (server.vm_max_threads != 0 && val->refcount != 1)) { if (maxtries) i--; /* don't count this try */ continue; } val->vm.atime = key->vm.atime; /* atime is updated on key object */ swappability = computeObjectSwappability(val); if (!best || swappability > best_swappability) { best = de; best_swappability = swappability; best_db = db; } } } if (best == NULL) return REDIS_ERR; key = dictGetEntryKey(best); val = dictGetEntryVal(best); redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f", key->ptr, best_swappability); /* Unshare the key if needed */ if (key->refcount > 1) { robj *newkey = dupStringObject(key); decrRefCount(key); key = dictGetEntryKey(best) = newkey; } /* Swap it */ if (usethreads) { vmSwapObjectThreaded(key,val,best_db); return REDIS_OK; } else { if (vmSwapObjectBlocking(key,val) == REDIS_OK) { dictGetEntryVal(best) = NULL; return REDIS_OK; } else { return REDIS_ERR; } } }
vmSwapObjectBlocking会在计算所需的交换页后,阻塞性的将value写到vm文件中(函数vmWriteObjectOnSwap),最后标记相应vm页为已使用。
static int vmSwapObjectBlocking(robj *key, robj *val) { off_t pages = rdbSavedObjectPages(val,NULL); off_t page; assert(key->storage == REDIS_VM_MEMORY); assert(key->refcount == 1); if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR; if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR; key->vm.page = page; key->vm.usedpages = pages; key->storage = REDIS_VM_SWAPPED; key->vtype = val->type; decrRefCount(val); /* Deallocate the object from memory. */ vmMarkPagesUsed(page,pages); redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)", (unsigned char*) key->ptr, (unsigned long long) page, (unsigned long long) pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; return REDIS_OK; }
对于value的加载,如果是多线程方式,会使用blockClientOnSwappedKeys提前加载,但阻塞方式则只有到相应命令执行时才会加载。最终无论是阻塞方式还是多线程方式,都会调用lookupKey来查找key是否在内存中,若不在,则使用vmLoadObject加载value,该函数是阻塞式的读入value。
static robj *lookupKey(redisDb *db, robj *key) { dictEntry *de = dictFind(db->dict,key); if (de) { robj *key = dictGetEntryKey(de); robj *val = dictGetEntryVal(de); if (server.vm_enabled) { if (key->storage == REDIS_VM_MEMORY || key->storage == REDIS_VM_SWAPPING) { /* If we were swapping the object out, stop it, this key * was requested. */ if (key->storage == REDIS_VM_SWAPPING) vmCancelThreadedIOJob(key); /* Update the access time of the key for the aging algorithm. */ key->vm.atime = server.unixtime; } else { int notify = (key->storage == REDIS_VM_LOADING); /* Our value was swapped on disk. Bring it at home. */ redisAssert(val == NULL); val = vmLoadObject(key); dictGetEntryVal(de) = val; /* Clients blocked by the VM subsystem may be waiting for * this key... */ if (notify) handleClientsBlockedOnSwappedKey(db,key); } } return val; } else { return NULL; } }
原文地址:redis源代码分析24–VM(中), 感谢原作者分享。