欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  数据库

redis源代码分析20–发布/订阅

程序员文章站 2022-04-19 19:01:17
...

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以

redis的发布/订阅(publish/subscribe)功能类似于传统的消息路由功能,发布者发布消息,订阅者接收消息,沟通发布者和订阅者之间的桥梁是订阅的channel或者pattern。发布者向指定的publish或者pattern发布消息,订阅者阻塞在订阅的channel或者pattern。可以看到,发布者不会指定哪个订阅者才能接收消息,订阅者也无法只接收特定发布者的消息。这种订阅者和发布者之间的关系是松耦合的,订阅者不知道是谁发布的消息,发布者也不知道谁会接收消息。

redis的发布/订阅功能主要通过SUBSCRIBE、UNSUBSCRIBE、PSUBSCRIBE、PUNSUBSCRIBE 、PUBLISH五个命令来表现。其中SUBSCRIBE、UNSUBSCRIBE用于订阅或者取消订阅channel,而PSUBSCRIBE、PUNSUBSCRIBE用于订阅或者取消订阅pattern,发布消息则通过publish命令。

对于发布/订阅功能的实现,我们先来看看几个与此相关的结构。

struct redisServer {
    ---
   /* Pubsub */
   dict *pubsub_channels;/* Map channels to list of subscribed clients */
   list *pubsub_patterns;/* A list of pubsub_patterns */
   ---
}
typedef struct redisClient {
   ---
   dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
   list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
} redisClient;

在redis的全局server变量(redisServer类型)中,channel和订阅者之间的关系用字典pubsub_channels来保存,特定channel和所有订阅者组成的链表构成pubsub_channels字典中的一项,即字典中的每一项可表示为(channel,订阅者链表);pattern和订阅者之间的关系用链表pubsub_patterns来保存,链表中的每一项可表示成(pattern,redisClient)组成的字典。

在特定订阅者redisClient的结构中,pubsub_channels保存着它所订阅的channel的字典,而订阅的模式则保存在链表pubsub_patterns中。

从上面的解释,我们再来看看订阅/发布命令的最坏时间复杂度(注意字典增删查改一项的复杂度为O(1),而链表的查删复杂度为O(N),从链表尾部增加一项的复杂度为O(1))。

SUBSCRIBE:

订阅者用SUBSCRIBE订阅特定channel,这需要在订阅者的redisClient结构中的pubsub_channels增加一项(复杂度为 O(1)),然后在redisServer 的pubsub_channels找到该channel(复杂度为O(1)),并在该channel的订阅者链表的尾部增加一项(复杂度为O(1),注意,如果pubsub_channels中没找到该channel,则插入的复杂度也同为O(1)),因此订阅者用SUBSCRIBE订阅特定 channel的最坏时间复杂度为O(1)。

UNSUBSCRIBE:

订阅者取消订阅时,需要先从订阅者的redisClient结构中的pubsub_channels删除一项(复杂度为O(1)),然后在 redisServer 的pubsub_channels找到该channel(复杂度为O(1)),然后在channel的订阅者链表中删除该订阅者(复杂度为O(1)),因此总的复杂度为O(N),N为特定channel的订阅者数。

PSUBSCRIBE:

订阅者用PSUBSCRIBE订阅pattern时,需要先在redisClient结构中的pubsub_patterns先查找是否已存在该 pattern(复杂度为O(N)),并在不存在的情况下往redisClient结构中的pubsub_patterns和redisServer结构中的pubsub_patterns链表尾部各增加一项(复杂度都为O(1)),因此,总的复杂度为O(N),其中N为订阅者已订阅的模式。

PUNSUBSCRIBE:

订阅者用PUNSUBSCRIBE取消对pattern的订阅时,需要先在redisClient结构中的pubsub_patterns链表中删除该 pattern(复杂度为O(N)),并在redisServer结构中的pubsub_patterns链表中删除订阅者和pattern组成的映射(复杂度为O(M),因此,总的复杂度为O(N+M),其中N为订阅者已订阅的模式,而M为系统中所有订阅者和所有pattern组成的映射数。

PUBLISH:

发布消息时,只会向特定channel发布,但该channel可能会匹配某个pattern。因此,需要先在redisServer结构中的 pubsub_channels找到该channel的订阅者链表(O(1)),然后发送给所有订阅者(复杂度为O(N)),然后查看 redisServer结构中的pubsub_patterns链表中的所有项,看channel是否和该项中的pattern匹配(复杂度为O(M))(注意,这并不包括模式匹配的复杂度),因此,总的复杂度为O(N+M),。其中N为该channel的订阅者数,而M为系统中所有订阅者和所有 pattern组成的映射数。另外,从这也可以看出,一个订阅者是可能多次收到同一个消息的。

解释了发布/订阅的算法后,其代码就好理解了,这里仅给出PUBLISH命令的处理函数publishCommand的代码,更多相关命令的代码请参看redis的源代码。

static void publishCommand(redisClient *c) {
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
    addReplyLongLong(c,receivers);
}
/* Publish a message */
static int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    struct dictEntry *de;
    listNode *ln;
    listIter li;
    /* Send to clients listening for that channel */
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        list *list = dictGetEntryVal(de);
        listNode *ln;
        listIter li;
        listRewind(list,&li);
        while ((ln = listNext(&li)) != NULL) {
            redisClient *c = ln->value;
            addReply(c,shared.mbulk3);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* Send to clients listening to matching channels */
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                addReply(pat->client,shared.mbulk4);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

最后提醒一下,处于发布/订阅模式的client,是无法发布上述五种命令之外的命令(quit除外),这是在processCommand函数中检查的,可以参看前面命令处理章节对该函数的解释。