Redis源码剖析--列表t_list实现
Redis中的列表对象比较特殊,在版本3.2之前,列表底层的编码是 ziplist 和 linkedlist 实现的, 但是在版本3.2之后,重新引入了一个 quicklist 的数据结构,列表的底层都由quicklist实现。
这边是在看源码和实际验证的时候发现的区别,然后上网查证。由于目前使用的redis基本都在3.2了, 而且老版本肯定会被取代, 所以我们只分析3.2版本之后的实现。对于老版本的列表实现,一笔带过吧。
在老版本中,当列表对象可以同时满足以下两个条件时, 列表对象使用 ziplist 编码:
- 列表对象保存的所有字符串元素的长度都小于 64 字节;
- 列表对象保存的元素数量小于 512 个;
不能满足这两个条件的列表对象需要使用 linkedlist 编码。当这两个条件任何一个不满足的时候,就会有一个格式的转换。
对于quicklist的结构,下节中在具体分析,先来看下list的实现。
List的结构
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
unsigned char encoding;
unsigned char direction; /* Iteration direction */
quicklistIter *iter;
} listTypeIterator;
/* Structure for an entry while iterating over a list. */
typedef struct {
listTypeIterator *li;
quicklistEntry entry; /* Entry in quicklist */
} listTypeEntry;
List的结构其实就是定义了一个列表的头节点, 以及一个迭代器指针,指针中指定了编码格式和迭代方向。
List命令
命令 | 说明 |
---|---|
BLPOP key1 [key2 ] timeout | 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
BRPOP key1 [key2 ] timeout | 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
BRPOPLPUSH source destination timeout | 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它;如但果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。 |
LINDEX key index | 通过索引获取列表中的元素 |
LINSERT key BEFORE | AFTER pivot value |
LLEN key | 获取列表长度 |
LPOP key | 移出并获取列表的第一个元素 |
LPUSH key value1 [value2] | 将一个或多个值插入到列表头部 |
LPUSHX key value | 将一个或多个值插入到已存在的列表头部 |
LRANGE key start stop | 获取列表指定范围内的元素 |
LREM key count value | 移除列表元素 |
LSET key index value | 通过索引设置列表元素的值 |
LTRIM key start stop | 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。 |
RPOP key | 移除并获取列表最后一个元素 |
RPOPLPUSH source destination | 移除列表的最后一个元素,并将该元素添加到另一个列表并返回 |
RPUSH key value1 [value2] | 在列表中添加一个或多个值 |
RPUSHX key value | 为已存在的列表添加值 |
List命令实现
push命令实现
lpush和rpush分别调用的下边的两个函数:
// lpush操作
void lpushCommand(client *c) {
pushGenericCommand(c,LIST_HEAD);
}
// rpush操作
void rpushCommand(client *c) {
pushGenericCommand(c,LIST_TAIL);
}
可以看到, 这两个操作其实都是调用的pushGenericCommand这个函数实现, 不同的点是指定了是从HEAD的位置push一个数据还是从TAIL的位置push一个数据。接下来来看pushGenericCommand的实现。
void pushGenericCommand(client *c, int where) {
int j, pushed = 0;
// 现在数据库中查找是否已经存在了该键
robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
// 如果已经存在了该键,验证该键是否是list类型,如果不是的话,返回错误
if (lobj && lobj->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
}
// 遍历剩余的参数
for (j = 2; j < c->argc; j++) {
// 如果该键不存在,创建一个quicklist
if (!lobj) {
lobj = createQuicklistObject();
quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
server.list_compress_depth);
// 将创建的键添加到对应的db
dbAdd(c->db,c->argv[1],lobj);
}
// 执行push操作
listTypePush(lobj,c->argv[j],where);
// 个数加1
pushed++;
}
// 返回添加的节点数量
addReplyLongLong(c, (lobj ? listTypeLength(lobj) : 0));
// 至少有一个添加成功则进行操作
if (pushed) {
char *event = (where == LIST_HEAD) ? "lpush" : "rpush";
// 发送键修改信号
signalModifiedKey(c->db,c->argv[1]);
// 发送事件通知
notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
}
// 服务器的脏数据个数增加
server.dirty += pushed;
}
内部最终还是调用了一个listTypePush的函数:
/* The function pushes an element to the specified list object 'subject',
* at head or tail position as specified by 'where'.
*
* There is no need for the caller to increment the refcount of 'value' as
* the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
// 判断类型是否为quciklist,不是的话返回错误
if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
// 解码数据
value = getDecodedObject(value);
// 得到数据的长度
size_t len = sdslen(value->ptr);
// 调用quicklistPush插入数据
quicklistPush(subject->ptr, value->ptr, len, pos);
// 将数据项对象的引用次数减1,也就是释放value
decrRefCount(value);
} else {
serverPanic("Unknown list encoding");
}
}
pop的操作其实跟push的操作很相似,比较容易读懂。List的操作比较有特点的一项是阻塞操作,可以来分析一下。
阻塞pop操作的实现
首先,一样是调用了最上层的两个函数:
// blpop
void blpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_HEAD);
}
// brpop
void brpopCommand(client *c) {
blockingPopGenericCommand(c,LIST_TAIL);
}
他们其实都是调用blockingPopGenericCommand来实现的。
/* Blocking RPOP/LPOP */
void blockingPopGenericCommand(client *c, int where) {
robj *o;
mstime_t timeout;
int j;
// 取出timeout参数
if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
!= C_OK) return;
// 遍历参数
for (j = 1; j < c->argc-1; j++) {
// 查看数据库中是否存在该键
o = lookupKeyWrite(c->db,c->argv[j]);
if (o != NULL) {
// 如果存在,判断该键是否是list类型,不是的话报错
if (o->type != OBJ_LIST) {
addReply(c,shared.wrongtypeerr);
return;
} else {
// 当前列表非空,直接执行pop操作
if (listTypeLength(o) != 0) {
/* Non empty list, this is like a non normal [LR]POP. */
char *event = (where == LIST_HEAD) ? "lpop" : "rpop";
robj *value = listTypePop(o,where);
serverAssert(value != NULL);
addReplyMultiBulkLen(c,2);
addReplyBulk(c,c->argv[j]);
addReplyBulk(c,value);
decrRefCount(value);
notifyKeyspaceEvent(NOTIFY_LIST,event,
c->argv[j],c->db->id);
// 如果当前key弹出一个值之后为空,删除这个列表
if (listTypeLength(o) == 0) {
dbDelete(c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",
c->argv[j],c->db->id);
}
signalModifiedKey(c->db,c->argv[j]);
server.dirty++;
/* Replicate it as an [LR]POP instead of B[LR]POP. */
rewriteClientCommandVector(c,2,
(where == LIST_HEAD) ? shared.lpop : shared.rpop,
c->argv[j]);
return;
}
}
}
}
/* If we are inside a MULTI/EXEC and the list is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
addReply(c,shared.nullmultibulk);
return;
}
/* If the list is empty or the key does not exists we must block */
// 参数中的所有键都不存在,则阻塞这些键
blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}
可以看到, 当指定的list存在于当前数据库中且list不为空,就会执行一次普通的pop操作;但是当指定的list键不存在,或者该list为空,就会阻塞该操作。就是上边代码中的最后一句。
接下去,就是看redis如何处理这个被阻塞的操作。
/* Set a client in blocking mode for the specified key, with the specified
* timeout */
// 设置键的阻塞状态
void blockForKeys(client *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
dictEntry *de;
list *l;
int j;
c->bpop.timeout = timeout;
c->bpop.target = target;
if (target != NULL) incrRefCount(target);
// 遍历所有的key
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dict ignore it. */
// 如果当前键存在,则忽略;反之则添加该键
// bpop.keys记录所有造成客户端阻塞的键
if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
//当前的key引用计数加1
incrRefCount(keys[j]);
/* And in the other "side", to map keys -> clients */
// blocking_keys是一个字典,其键为造成阻塞的键,值是一个链表,记录所有被该键阻塞的客户端
// 查找当前造成阻塞的键
de = dictFind(c->db->blocking_keys,keys[j]);
if (de == NULL) {
// 如果不存在,需要新创建一个, 并加入到blocking_keys中
int retval;
/* For every key we take a list of clients blocked for it */
l = listCreate();
// 将键和新创建的列表加入
retval = dictAdd(c->db->blocking_keys,keys[j],l);
incrRefCount(keys[j]);
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
} else {
// 如果存在,获取该键的值,即客户端列表
l = dictGetVal(de);
}
// 将当前的客户端加入的该键的阻塞列表中
listAddNodeTail(l,c);
}
// 阻塞该客户端
blockClient(c,BLOCKED_LIST);
}
从上边的代码中,可以看到客户端分别用来c->bpop.xxxx 和 c->db->blocking_keys用来保存被阻塞的键,以及阻塞的键和客户端的对应关系。
他们的定义如下:
// server.h
typedef struct client {
//client当前使用的数据库
redisDb *db; /* Pointer to currently SELECTed DB. */
//阻塞状态
blockingState bpop; /* blocking state */
//其他成员省略
} client;
// 阻塞状态结构体
typedef struct blockingState {
mstime_t timeout; // 阻塞超时时间
dict *keys; // 记录所有造成客户端阻塞的键
robj *target; // 目标选项,target在执行RPOPLPUSH命令时使用,
/* BLOCKED_WAIT */
int numreplicas; /* Number of replicas we are waiting for ACK. */
long long reploffset; /* Replication offset to reach. */
} blockingState;
typedef struct redisDb {
dict *blocking_keys; // 记录所有造成阻塞的键,及其相应的客户端
// ...其他参数省略
} redisDb;
typedef struct redisDb {
//正处于阻塞状态的键
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
//可以解除阻塞的键
dict *ready_keys; /* Blocked keys that received a PUSH */
} redisDb;
redisDb里边利用了一个名为blocking_keys的dict来存储每个阻塞的键,以及等待该键的客户端的对应关系。
做完这些之后,这个客户端就被阻塞了。 那么这个客户端如何从阻塞状态重新回复到非阻塞状态呢。 一个当时是等待的时间超过了timeout的时间,从阻塞状态恢复; 另一个则是其他的客户端往这个列表中插入了数据,正好是当前阻塞的客户端所需要的,当前客户端收到信号之后,从阻塞状态中恢复。
根据我的理解, 如果是客户端超时被解阻塞,调用的是这个函数:
超时解阻塞
//解阻塞一个正在阻塞中的client
void unblockClientWaitingData(client *c) {
dictEntry *de;
dictIterator *di;
list *l;
serverAssertWithInfo(c,NULL,dictSize(c->bpop.keys) != 0);
//创建一个字典的迭代器,指向的是造成client阻塞的键所组成的字典
di = dictGetIterator(c->bpop.keys);
/* The client may wait for multiple keys, so unblock it for every key. */
//因为client可能被多个key所阻塞,所以要遍历所有的键
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de); //获得key对象
/* Remove this client from the list of clients waiting for this key. */
//根据key找到对应的列表类型值,值保存着被阻塞的client,从中找c->db->blocking_keys中寻找
l = dictFetchValue(c->db->blocking_keys,key);
serverAssertWithInfo(c,key,l != NULL);
// 将阻塞的client从列表中移除
listDelNode(l,listSearchKey(l,c));
/* If the list is empty we need to remove it to avoid wasting memory */
//如果当前列表为空了,则从c->db->blocking_keys中将key删除
if (listLength(l) == 0)
dictDelete(c->db->blocking_keys,key);
}
dictReleaseIterator(di); //释放迭代器
/* Cleanup the client structure */
//清空bpop.keys的所有节点
dictEmpty(c->bpop.keys,NULL);
//如果保存有新添加的元素,则应该释放
if (c->bpop.target) {
decrRefCount(c->bpop.target);
c->bpop.target = NULL;
}
}
如果是其他的客户端插入了一个数据,则是调用下边的函数:
插入解阻塞
// 如果客户端因为等待某个 key 被阻塞,那么将此key加入到server.ready_keys中
// 这个列表最终会被 handleClientsBlockedOnLists() 函数处理。
void signalListAsReady(redisDb *db, robj *key) {
readyList *rl;
// 如果在所有造成客户端阻塞的键中找不到此键,则不作处理
if (dictFind(db->blocking_keys,key) == NULL) return;
// 这个键已经存在于ready_keys中了,则不作处理
if (dictFind(db->ready_keys,key) != NULL) return;
// 创建一个新的readylists结构,保存键和数据库
// 然后将该结构添加到server.ready_keys中
rl = zmalloc(sizeof(*rl));
rl->key = key;
rl->db = db;
// 该键的索引加1
incrRefCount(key);
listAddNodeTail(server.ready_keys,rl);
// 同样,将key添加到db->ready_keys中
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
在将所有可用的key加入到ready_keys之后,会有统一的函数去检查哪些客户端等待着这些key,然后将他们解阻塞
/* 遍历server.ready_keys中所有已经准备好的key,同时在c->db->blocking_keys中
遍历所有由此键造成阻塞的客户端,如果key不为空的话,就从key中弹出一个元素返回给客户端并解除该客户端的阻塞状态,直到server.ready_keys为空,或没有因该key而阻塞的客户端为止 */
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client.
*
* All the keys with at least one client blocked that received at least
* one new element via some PUSH operation are accumulated into
* the server.ready_keys list. This function will run the list and will
* serve clients accordingly. Note that the function will iterate again and
* again as a result of serving BRPOPLPUSH we can have new blocking clients
* to serve because of the PUSH side of BRPOPLPUSH. */
void handleClientsBlockedOnLists(void) {
while(listLength(server.ready_keys) != 0) {
list *l;
/* Point server.ready_keys to a fresh list and save the current one
* locally. This way as we run the old list we are free to call
* signalListAsReady() that may push new elements in server.ready_keys
* when handling clients blocked into BRPOPLPUSH. */
l = server.ready_keys;
server.ready_keys = listCreate();
while(listLength(l) != 0) {
listNode *ln = listFirst(l);
readyList *rl = ln->value;
/* First of all remove this key from db->ready_keys so that
* we can safely call signalListAsReady() against this key. */
dictDelete(rl->db->ready_keys,rl->key);
/* If the key exists and it's a list, serve blocked clients
* with data. */
robj *o = lookupKeyWrite(rl->db,rl->key);
if (o != NULL && o->type == OBJ_LIST) {
dictEntry *de;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
robj *dstkey = receiver->bpop.target;
int where = (receiver->lastcmd &&
receiver->lastcmd->proc == blpopCommand) ?
LIST_HEAD : LIST_TAIL;
robj *value = listTypePop(o,where);
if (value) {
/* Protect receiver->bpop.target, that will be
* freed by the next unblockClient()
* call. */
if (dstkey) incrRefCount(dstkey);
unblockClient(receiver);
if (serveClientBlockedOnList(receiver,
rl->key,dstkey,rl->db,value,
where) == C_ERR)
{
/* If we failed serving the client we need
* to also undo the POP operation. */
listTypePush(o,value,where);
}
if (dstkey) decrRefCount(dstkey);
decrRefCount(value);
} else {
break;
}
}
}
if (listTypeLength(o) == 0) {
dbDelete(rl->db,rl->key);
}
/* We don't call signalModifiedKey() as it was already called
* when an element was pushed on the list. */
}
/* Free this item. */
decrRefCount(rl->key);
zfree(rl);
listDelNode(l,ln);
}
listRelease(l); /* We have the new list on place at this point. */
}
}
补一张从http://czrzchao.com/redisSourceBPOP 转过来的图,很好的说明了阻塞中使用的结构:
- redisServer都是遍历需求,因此采用list作为存储结构。其中ready_keys需要key加db才能确定一个唯一阻塞值,因此list元素为一个简单的结构体。
- redisDb的blocking_keys用于存储单个db的阻塞key,有精确查找需求,采用dict作为基础数据结构。由于db的阻塞key和client为1对多关系,blocking_keys的value为clients的list。
- redisDb的ready_keys只是起到一个单纯的去重逻辑,db是key阻塞的单位,因此去重逻辑放在db结构体中最为合适。采用dict存储,将value置为NULL,只用到dict的索引的。
- client中的flags和btype用于记录阻塞的一些状态标志,bpop为一个复杂结构体,保存着阻塞超时时间和阻塞keys等,其中keys为dict数据结构,value为NULL,同db的ready_keys。