欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MongoDB源码阅读之通信协议

程序员文章站 2022-05-19 14:47:49
...

MongoDB wire protocol

MongoDB通过一种特殊的client/server 之间的协议:wire protocol。它是一种基于socket的request/response之间的通信协议。用户通过mongo shell或者driver API, 进行创建、删除、更新以及执行管理操作的命令, 都会产生一个Message, 用来表示该操作需要的信息。 每一个消息, 都有一些公共的部分, 被称作MsgHeader, 它分别记录了消息体的长度, 当前消息的requestId, 目的端的database ID, 以及代表操作的类型, 比如创建、删除等。

struct MsgHeader {
    int32   messageLength; // total message size, including this
    int32   requestID;     // identifier for this message
    int32   responseTo;    // requestID from the original request
                           //   (used in responses from db)
    int32   opCode;        // request type - see table below for details
}

在代码里面, 通过Message类, 来接受客户端传入的信息,具体的代码实现可以参考: src/mongo/Util/Net/message.h
在MongoDB的main函数里面, 会创建一个MyMessageHandler 对象, 由它来接收所有的来自于客户端的消息:

class MyMessageHandler : public MessageHandler {
public:
    virtual void process(Message& m, AbstractMessagingPort* port) {
        while (true) {
            DbResponse dbresponse;
            {
                auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
                assembleResponse(opCtx.get(), m, dbresponse, port->remote());

                // opCtx must go out of scope here so that the operation cannot show up in currentOp
                // results after the response reaches the client
            }

            if (!dbresponse.response.empty()) {
                port->reply(m, dbresponse.response, dbresponse.responseTo);
                ...
            }
            break;
        }
    }
};

消息的处理
从上面的代码可以看到, 所有的消息处理函数为assembleResponse, 在其里面会根据不同的消息类型, 进行不同的处理:

void assembleResponse(OperationContext* txn,
                      Message& m,
                      DbResponse& dbresponse,
                      const HostAndPort& remote) {
    OpDebug& debug = currentOp.debug();

    if (op == dbQuery) {
        if (isCommand) {
            receivedCommand(txn, nsString, c, dbresponse, m);
        } else {
            receivedQuery(txn, nsString, c, dbresponse, m);
        }
    } else if (op == dbCommand) {
        receivedRpc(txn, c, dbresponse, m);
    } else if (op == dbGetMore) {
        if (!receivedGetMore(txn, dbresponse, m, currentOp))
            shouldLogOpDebug = true;
    }  else {
            if (op == dbKillCursors) {
                receivedKillCursors(txn, m);
            } else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
                ...
            } else {
                if (op == dbInsert) {
                    receivedInsert(txn, nsString, m, currentOp);
                } else if (op == dbUpdate) {
                    receivedUpdate(txn, nsString, m, currentOp);
                } else if (op == dbDelete) {
                    receivedDelete(txn, nsString, m, currentOp);
                } else {
                    invariant(false);
                }
            }
    }

        // 记录下curOp的各种统计信息, 包括locks等的各种信息
        Locker::LockerInfo lockerInfo;
        txn->lockState()->getLockerInfo(&lockerInfo);
        log() << debug.report(currentOp, lockerInfo.stats);
     ...
}        

创建、删除以及更新等操作, 这些操作可以很容易从上面的operation 类型进行区分, 然后我们根据相应的入口函数向下调用就能够trace到其实现。
Command和其他的相随来讲要复杂一点, 因为我们面临了太多的command, 有database的, collection的, 副本集的, 以及sharding的,他们都来自于相同的基类:Command, 其继承结构如下:
MongoDB源码阅读之通信协议
每一种Command的构造函数, 在其基类的Command 部分会进行注册: typedef StringMap<Command*> CommandMap;
通过该一个command命令, 经过map找到相关的具体的Command对象, 每个command都有相关的权限校验, 以及命令执行函数Run().

相关标签: 笔记总结