MongoDB源码阅读之通信协议
MongoDB wire protocol
MongoDB通过一种特殊的client/server 之间的协议:wire protocol。它是一种基于socket的request/response之间的通信协议。用户通过mongo shell或者driver API, 进行创建、删除、更新以及执行管理操作的命令, 都会产生一个Message, 用来表示该操作需要的信息。 每一个消息, 都有一些公共的部分, 被称作MsgHeader, 它分别记录了消息体的长度, 当前消息的requestId, 目的端的database ID, 以及代表操作的类型, 比如创建、删除等。
struct MsgHeader {
int32 messageLength; // total message size, including this
int32 requestID; // identifier for this message
int32 responseTo; // requestID from the original request
// (used in responses from db)
int32 opCode; // request type - see table below for details
}
在代码里面, 通过Message类, 来接受客户端传入的信息,具体的代码实现可以参考: src/mongo/Util/Net/message.h
在MongoDB的main函数里面, 会创建一个MyMessageHandler 对象, 由它来接收所有的来自于客户端的消息:
class MyMessageHandler : public MessageHandler {
public:
virtual void process(Message& m, AbstractMessagingPort* port) {
while (true) {
DbResponse dbresponse;
{
auto opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
assembleResponse(opCtx.get(), m, dbresponse, port->remote());
// opCtx must go out of scope here so that the operation cannot show up in currentOp
// results after the response reaches the client
}
if (!dbresponse.response.empty()) {
port->reply(m, dbresponse.response, dbresponse.responseTo);
...
}
break;
}
}
};
消息的处理
从上面的代码可以看到, 所有的消息处理函数为assembleResponse, 在其里面会根据不同的消息类型, 进行不同的处理:
void assembleResponse(OperationContext* txn,
Message& m,
DbResponse& dbresponse,
const HostAndPort& remote) {
OpDebug& debug = currentOp.debug();
if (op == dbQuery) {
if (isCommand) {
receivedCommand(txn, nsString, c, dbresponse, m);
} else {
receivedQuery(txn, nsString, c, dbresponse, m);
}
} else if (op == dbCommand) {
receivedRpc(txn, c, dbresponse, m);
} else if (op == dbGetMore) {
if (!receivedGetMore(txn, dbresponse, m, currentOp))
shouldLogOpDebug = true;
} else {
if (op == dbKillCursors) {
receivedKillCursors(txn, m);
} else if (op != dbInsert && op != dbUpdate && op != dbDelete) {
...
} else {
if (op == dbInsert) {
receivedInsert(txn, nsString, m, currentOp);
} else if (op == dbUpdate) {
receivedUpdate(txn, nsString, m, currentOp);
} else if (op == dbDelete) {
receivedDelete(txn, nsString, m, currentOp);
} else {
invariant(false);
}
}
}
// 记录下curOp的各种统计信息, 包括locks等的各种信息
Locker::LockerInfo lockerInfo;
txn->lockState()->getLockerInfo(&lockerInfo);
log() << debug.report(currentOp, lockerInfo.stats);
...
}
创建、删除以及更新等操作, 这些操作可以很容易从上面的operation 类型进行区分, 然后我们根据相应的入口函数向下调用就能够trace到其实现。
Command和其他的相随来讲要复杂一点, 因为我们面临了太多的command, 有database的, collection的, 副本集的, 以及sharding的,他们都来自于相同的基类:Command, 其继承结构如下:
每一种Command的构造函数, 在其基类的Command 部分会进行注册: typedef StringMap<Command*> CommandMap;
通过该一个command命令, 经过map找到相关的具体的Command对象, 每个command都有相关的权限校验, 以及命令执行函数Run().