欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

以太坊源码分析之 P2P网络(五、p2p连接控制与消息处理(中))

程序员文章站 2022-07-01 15:32:24
...

区块链特辑 :https://blog.csdn.net/fusan2004/article/details/80879343,欢迎查阅,原创作品,转载请标明!

承继前篇, 前一篇介绍了连接底层的一些细节,包括socket,握手等详细步骤,等待握手完成后,会调用startPeerSession抛向上层,这个时候连接表明已经建立了,从代码里面,为了表征这种连接的建立,会再定义一些数据结构来表达这种关系,本篇文章我们将首先去了解下startPeerSession里面都做了些什么事情,然后再去详细了解逻辑上的连接表达方式。

一、建立连接

这部分我们只介绍host.cpp中的startPeerSession函数,其他关于host的部分将在下篇中介绍,那也是整个p2p系列代码部分的最后一篇。首先研读下cpp的代码,如下

// 这是在握手成功之后被handshake调用,握手是在connect或者acceptor中进行
// 可以细致了解下这个函数的参数,_id表示对方的公钥,也是对方的节点id,rlp实际上
// 是writeHello中填写的对方的一些信息,io是数据帧编解码,_s就是底层通信的socket
void Host::startPeerSession(Public const& _id, RLP const& _rlp, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s)
{
    // session可能是主动或者被动建立的,所以peers和node table里面可能不包括这个node id
    shared_ptr<Peer> p;
    DEV_RECURSIVE_GUARDED(x_sessions)
    {
        if (m_peers.count(_id))   //判断peer是否存在
            p = m_peers[_id];
        else
        {
            // 不存在的话,先尝试从node table中获取端口信息
            if (Node n = nodeFromNodeTable(_id))
                p = make_shared<Peer>(n);

            if (!p)
                p = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint));  //如果node_table中也没有,先不指定ip断点信息

            m_peers[_id] = p;  //添加到m_peers中去
        }
    }
    if (p->isOffline()) // 如果之前已经下线了,session不存在了
        p->m_lastConnected = std::chrono::system_clock::now();  //更新
    p->endpoint.address = _s->remoteEndpoint().address();  //更新地址

    auto protocolVersion = _rlp[0].toInt<unsigned>();  // 协议版本号
    auto clientVersion = _rlp[1].toString();           // 客户版本
    auto caps = _rlp[2].toVector<CapDesc>();           // 远方端点支持的能力
    auto listenPort = _rlp[3].toInt<unsigned short>(); // 监听的端口
    auto pub = _rlp[4].toHash<Public>();  // 公钥,与node id相同

    if (pub != _id)
    {
        cdebug << "Wrong ID: " << pub << " vs. " << _id; // 不同认为有问题,不建立session
        return;
    }

    // clang error (previously: ... << hex << caps ...)
    // "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
    stringstream capslog;

    // leave only highset mutually supported capability version
    // 如果当前host不支持,或者有重复,清理
    caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());

    for (auto cap: caps)
        capslog << "(" << cap.first << "," << dec << cap.second << ")";

    cnetlog << "Hello: " << clientVersion << " V[" << protocolVersion << "]"
            << " " << _id << " " << showbase << capslog.str() << " " << dec << listenPort;

    // create session so disconnects are managed
    // session来管理连接,所以先创建session,然后再去判断信息的合法性
    shared_ptr<SessionFace> ps = make_shared<Session>(this, move(_io), _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
    if (protocolVersion < dev::p2p::c_protocolVersion - 1)  //协议版本不一致的话断开
    {
        ps->disconnect(IncompatibleProtocol);
        return;
    }
    if (caps.empty())
    {
        ps->disconnect(UselessPeer);  //没有注册功能的话,直接断开连接
        return;
    }

    if (m_netPrefs.pin && !isRequiredPeer(_id))   //如果设置要求只接受可信的peer,但是当前不可信,也要断开
    {
        cdebug << "Unexpected identity from peer (got" << _id << ", must be one of " << m_requiredPeers << ")";
        ps->disconnect(UnexpectedIdentity);
        return;
    }
    
    {
        RecursiveGuard l(x_sessions);
        if (m_sessions.count(_id) && !!m_sessions[_id].lock()) // 该节点已有session,且该session已建立
            if (auto s = m_sessions[_id].lock())
                if(s->isConnected())  //该连接正在连接,端口
                {
                    // Already connected.
                    cnetlog << "Session already exists for peer with id " << _id;
                    ps->disconnect(DuplicatePeer);
                    return;
                }
        
        if (!peerSlotsAvailable())  //如果已经满了
        {
            cnetdetails << "Too many peers, can't connect. peer count: " << peerCount() // peerCount已经连接且连接有效的peer数
                        << " pending peers: " << m_pendingPeerConns.size();
            ps->disconnect(TooManyPeers);
            return;
        }

        unsigned offset = (unsigned)UserPacket;

        // todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
        for (auto const& i: caps)
        {
            auto pcap = m_capabilities[i];
            if (!pcap) // 这个判断很奇怪,上面已经进行去了整理,只保留了双方都支持的cap
                return ps->disconnect(IncompatibleProtocol);

            pcap->newPeerCapability(ps, offset, i); //
            offset += pcap->messageCount(); //消息的数量
        }

        ps->start(); //发完ping之后再添加到m_sessions
        m_sessions[_id] = ps;
    }
    
    LOG(m_logger) << "p2p.host.peer.register " << _id;
}

这个函数里面,主要是根据当前建立的连接,生成了一个peer,当然如果这个peer已经存在的话就复用了,然后再根据已有数据为这个连接生成了一个逻辑上的结构session,最后根据交互的信息来查看这个连接是否合理合法,没问题的话就将这个新建的session放到m_sessions里面去,否则会调用session的disconnect函数,这里面也说明了为啥有些判断条件不是在session生成之前来判断,这是因为作者希望是通过session来管理连接,这可以从disconnect的时候要加上断开的说明看出来。

除了这些基本逻辑,大家还需要关心几个结构,这些也将会在后面来依次说明:

1)peer,要区分peer和node的区别,node是在NodeTable中体现的,peer是tcp连接握手成功后体现的,peer是真正用来p2p消息的对象,node仅仅是候选,后面看peer的构造函数的参数是Node类型也可以说明些事情;

2)session,session是整个连接管理中最重要的结构,他表示的就是peer真正建立了连接后的逻辑关系,从构造函数中可以看到,他把peer 、peerSessionInfo 、socket 、io 等全部包括了;

3)capability,这是指节点提供的能力,实际上也就是在p2p层之上的业务能力,p2p只是提供通信信道,具体用p2p干什么、传递什么消息,都是由这些capability来触发的,这个部分也会在后面详细说明。

二、连接的数据结构

1、peer

首先我们来看看peer提供了哪些能力

class Peer: public Node
{
	friend class Session;		/// Allows Session to update score and rating.
	friend class Host;		/// For Host: saveNetwork(), restoreNetwork()
	friend class RLPXHandshake;
public:
	/// Construct Peer from Node.
	Peer(Node const& _node): Node(_node) {}
	Peer(Peer const&);
	bool isOffline() const { return !m_session.lock(); }  //判断是否断开
    // peer间一个比较
	virtual bool operator<(Peer const& _p) const;
    // 返回当前peer的评分
	int rating() const { return m_rating; }
    // 判断是否需要尝试去连接这个peer
	bool shouldReconnect() const;
    // 尝试连接这个peer的次数
	int failedAttempts() const { return m_failedAttempts; }
    // 上一次断开的原因
	DisconnectReason lastDisconnect() const { return m_lastDisconnect; }	
	// 表明和这个peer的连接是有用的
	void noteSessionGood() { m_failedAttempts = 0; }
	
protected:
    // 根据之前连接的历史来决定下一次尝试连接等待的时间
	unsigned fallbackSeconds() const;
	std::atomic<int> m_score{0};									///< All time cumulative.
	std::atomic<int> m_rating{0};									///< Trending.	
	/// Network Availability	
	std::chrono::system_clock::time_point m_lastConnected;  // 最近一次连接时间
	std::chrono::system_clock::time_point m_lastAttempted;  // 最近一次尝试重连时间
	std::atomic<unsigned> m_failedAttempts{0};  // 失败次数
	DisconnectReason m_lastDisconnect = NoDisconnect;	// 上一次断开原因
	/// Used by isOffline() and (todo) for peer to emit session information.
	std::weak_ptr<Session> m_session;
};

从peer继承于node看,实际上是在node上面加了一层连接相关的逻辑,从代码中可以看出,主要是一些连接的逻辑,包括重连,是否断开,连接次数等等。

2、session

session是上层网络逻辑最重要的部分,主要负责维护连接的稳定性以及保持消息通信,下面一起看下session里面的操作

class Session: public SessionFace, public std::enable_shared_from_this<SessionFace>
{
public:
    // 构造函数
    // _server, 其实是为了和创建session的host有个交互的可能,目前没有大的作用
    // 也说明当创建完session之后,p2p的业务逻辑都是通过session来完成的了
    // _io, rlp数据编解码,这是在网络底层数据接受的rlp封装对象
    // _s,底层通信的socket
    // _n, 与之连接peer,session的另一头
    // _info, 简单的维护了对方的一些信息
	Session(Host* _server, std::unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
	virtual ~Session();
    // session的启动,这里主要是发送了ping数据,然后开启read模式,等待收消息
	void start() override;
    // 主动断开,并添加原因
	void disconnect(DisconnectReason _reason) override;
    // 发送ping包
	void ping() override;
    // 判断是否还在连接
	bool isConnected() const override { return m_socket->ref().is_open(); }
    // 返回连接的node id
	NodeID id() const override;
    // 封装且发送
	void sealAndSend(RLPStream& _s) override;
    // 返回评分
	int rating() const override;
	void addRating(int _r) override;

	void addNote(std::string const& _k, std::string const& _v) override { Guard l(x_info); m_info.notes[_k] = _v; }

	PeerSessionInfo info() const override { Guard l(x_info); return m_info; }
    // 返回连接建立的时间,也就是session被new出来的时间点
	std::chrono::steady_clock::time_point connectionTime() override { return m_connect; } 
    // 注册功能
	void registerCapability(CapDesc const& _desc, std::shared_ptr<Capability> _p) override;
    // 返回目前支持的功能
	std::map<CapDesc, std::shared_ptr<Capability>> const& capabilities() const override { return m_capabilities; }
    // 返回对方peer
	std::shared_ptr<Peer> peer() const override { return m_peer; }
    // 返回最近一次接收到消息的时间点
	std::chrono::steady_clock::time_point lastReceived() const override { return m_lastReceived; }
    // 名誉管理器
	ReputationManager& repMan() override;

private:
	static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
    // 发送数据
	void send(bytes&& _msg);
	// 因为某种原因丢弃连接
	void drop(DisconnectReason _r);
	// 在socket上执行读操作
	void doRead();
	// 在读完之后检查error code,如果error code存在那就丢弃peer
	bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length);
	// 执行一轮写操作,可能结束的时候异步调用自己
	void write();
	/// Deliver RLPX packet to Session or Capability for interpretation.
	// 为了可解释性传送rlpx包给session或capablity
	bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r);
	/// Interpret an incoming Session packet.
	// 解释一个传入session包
	bool interpret(PacketType _t, RLP const& _r);
	/// @returns true iff the _msg forms a valid message for sending or receiving on the network.
	// 检查网络上发送或读取的msg是否是一个有用的消息
	static bool checkPacket(bytesConstRef _msg);

	Host* m_server;							    // 保存session的server,绝不为null

	std::unique_ptr<RLPXFrameCoder> m_io;	    // 编码,基于此发送数据报
	std::shared_ptr<RLPXSocket> m_socket;		// peer连接底层的socket
	Mutex x_framing;						    // 写队列的互斥锁
	std::deque<bytes> m_writeQueue;			    // 写队列
	std::vector<byte> m_data;			        // 读进来的数据缓冲区
	bytes m_incoming;						    // 读进来字节的读缓冲区

	std::shared_ptr<Peer> m_peer;			    // peer对象
	bool m_dropped = false;					// 是否丢弃这个session
	                                        //< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.

	mutable Mutex x_info;
	PeerSessionInfo m_info;						// 关于这个peer的动态信息

	std::chrono::steady_clock::time_point m_connect;		// 连接的时间点
	std::chrono::steady_clock::time_point m_ping;			// 最近一次ping的时间点
	std::chrono::steady_clock::time_point m_lastReceived;	// 最近一次获取消息的时间点

	std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities;	// 这个peer的支持的功能

    std::string const m_logContext;
};

可以看出,session最主要的功能还是处理消息的发送和接受,来看看主要的几个函数,首先看下start

void Session::start()
{
    ping();   // 先发一个ping消息,打个招呼
    doRead(); // 进入读等待阶段
}

void Session::ping()  //发送ping包
{
    RLPStream s;
    sealAndSend(prep(s, PingPacket));  // prep是准备消息,sealAndSend在准备好的消息发送
    m_ping = std::chrono::steady_clock::now();
}

start函数其实最主要的就是让socket处于异步读的逻辑中,这个ping除了更新保活状态,没有其他作用了,doRead函数较大,单独提出来说明。

void Session::doRead()
{
    // 如果以及处于被丢弃状态的话忽略接收到的packet
    if (m_dropped)
        return;

    auto self(shared_from_this());
    m_data.resize(h256::size); // 将读缓冲区resize到h256的大小空间,数据头的大小
    ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size),
        [this, self](boost::system::error_code ec, std::size_t length) {
            LOG_SCOPED_CONTEXT(m_logContext);

            if (!checkRead(h256::size, ec, length))  //读取过程中报错,或者长度不一致
                return;
            else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length))) //解密数据头
            {
                cnetlog << "header decrypt failed";
                drop(BadProtocol);  // todo: better error
                return;
            }

            uint16_t hProtocolId;
            uint32_t hLength;
            uint8_t hPadding;
            try
            {
                RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
                hProtocolId = header.protocolId;
                hLength = header.length;
                hPadding = header.padding;
            }
            catch (std::exception const& _e)
            {
                cnetlog << "Exception decoding frame header RLP: " << _e.what() << " "
                        << bytesConstRef(m_data.data(), h128::size).cropped(3);
                drop(BadProtocol);
                return;
            }

            /// read padded frame and mac
            // 读取后面的帧数据以及mac
            auto tlen = hLength + hPadding + h128::size;
            m_data.resize(tlen);
            ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen),
                [this, self, hLength, hProtocolId, tlen](
                    boost::system::error_code ec, std::size_t length) {
                    LOG_SCOPED_CONTEXT(m_logContext);

                    if (!checkRead(tlen, ec, length))  //同样检查下数据对不对,全不全
                        return;
                    else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) //解密
                    {
                        cnetlog << "frame decrypt failed";
                        drop(BadProtocol);  // todo: better error
                        return;
                    }

                    bytesConstRef frame(m_data.data(), hLength);
                    if (!checkPacket(frame))  //验证下消息格式和大小
                    {
                        cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
                        cnetlog << "INVALID MESSAGE RECEIVED";
                        disconnect(BadProtocol);
                        return;
                    }
                    else
                    {
                        //第一个保存的是packetType
                        auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
                        RLP r(frame.cropped(1)); //后面是正文
                        bool ok = readPacket(hProtocolId, packetType, r); //处理逻辑
                        if (!ok)
                            cnetlog << "Couldn't interpret packet. " << RLP(r);
                    }
                    doRead();  //成功读取之后继续下一次doRead
                });
        });
}

从doRead函数中可以看出来,这个函数大概就是先读取一个数据头,根据数据头读取数据体,然后把完整的消息体都扔给了readPacket来处理,当一次读过程没有问题的时候,就继续下一次读过程,session本身不处理消息,那么可以从readPacket看看消息是怎么交给其他部分来处理的

//读取数据包
bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
    m_lastReceived = chrono::steady_clock::now();
    clog(VerbosityTrace, "net") << "-> " << _t << " " << _r;
    try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
    {
        // v4 frame headers are useless, offset packet type used
        // v5 protocol type is in header, packet type not offset
        if (_capId == 0 && _t < UserPacket)  //如果 _t < UserPacket,说明是底层通信数据包,比如ping pong之类,具体类型定义在Common.h文件
            return interpret(_t, _r);

        //这里面是根据支持的功能进行相应的操作
        for (auto const& i: m_capabilities):
            if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
                return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;

        return false;  //如果未找到相应的capability,返回错误
    }
    catch (std::exception const& _e)
    {
        cnetlog << "Exception caught in p2p::Session::interpret(): " << _e.what()
                << ". PacketType: " << _t << ". RLP: " << _r;
        disconnect(BadProtocol);
        return true;
    }
    return true;
}

enum PacketType
{
    HelloPacket = 0,
    DisconnectPacket,
    PingPacket,
    PongPacket,
    GetPeersPacket,
    PeersPacket,
    UserPacket = 0x10
};

在readPacket可以看到,数据类型有系统定义的类型和用户自定义的类型之分,当数据包类型属于enum PacketType之中的时候,会直接由session的interpret函数来处理,而当超出这个时候,就会由注册的capability来处理,大家读起来其实挺费劲的,这个capability没有明确的说明什么消息类型,而是说给定一个偏移量,根据当前值与偏移量的之差来确定消息类型,然后由各capability自己的interpret函数来解析和处理,这一点我个人是不喜欢的,个人感觉一个系统没有必要设计的如此晦涩,再来看看interpret里面都如何处理系统定义的消息的吧

bool Session::interpret(PacketType _t, RLP const& _r)
{
    switch (_t)
    {
    case DisconnectPacket:  // 断开连接数据包
    {
        string reason = "Unspecified";
        auto r = (DisconnectReason)_r[0].toInt<int>();
        if (!_r[0].isInt())  //不是整型
            drop(BadProtocol);
        else
        {
            reason = reasonOf(r);
            cnetlog << "Disconnect (reason: " << reason << ")";
            drop(DisconnectRequested);
        }
        break;
    }
    case PingPacket:  // ping数据包
    {
        cnetdetails << "Ping " << m_info.id;
        RLPStream s;
        sealAndSend(prep(s, PongPacket));  // 回复pong数据包
        break;
    }
    case PongPacket:
        DEV_GUARDED(x_info)
        {
            m_info.lastPing = std::chrono::steady_clock::now() - m_ping;  //更新ping时间
            cnetdetails << "Latency: "
                        << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count()
                        << " ms";
        }
        break;
    case GetPeersPacket: // 这些实际上是在节点发现时候用到的,session中用不到
    case PeersPacket:
        break;
    default:
        return false;
    }
    return true;
}

从这里可以看到,发送给对方的时候都有一个sealAndSend的过程,这个函数里面前面也提到实际上就是调用了send函数将数据发送出去,最后来看下send的过程吧

void Session::send(bytes&& _msg)
{
    bytesConstRef msg(&_msg); 
    clog(VerbosityTrace, "net") << "<- " << RLP(msg.cropped(1));
    if (!checkPacket(msg))  // 检查是否合法
        cnetlog << "INVALID PACKET CONSTRUCTED!";

    if (!m_socket->ref().is_open()) // 检查连接是否保持
        return;

    bool doWrite = false;
    DEV_GUARDED(x_framing)
    {
        m_writeQueue.push_back(std::move(_msg));  // 塞到写队列中去
        doWrite = (m_writeQueue.size() == 1); // 只有当队列长度为一的时候才写,这个跟之前介绍节点发现时候理由一致
    }

    if (doWrite)
        write();  // 调用write
}

void Session::write()
{
    bytes const* out = nullptr;
    DEV_GUARDED(x_framing)
    {
        m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);  // io用来编码的
        out = &m_writeQueue[0];
    }
    auto self(shared_from_this());
    ba::async_write(m_socket->ref(), ba::buffer(*out),
        [this, self](boost::system::error_code ec, std::size_t /*length*/) {
            LOG_SCOPED_CONTEXT(m_logContext);

            // must check queue, as write callback can occur following dropped()
            if (ec)
            {
                cnetlog << "Error sending: " << ec.message();
                drop(TCPError);
                return;
            }

            DEV_GUARDED(x_framing)
            {
                m_writeQueue.pop_front();  //丢弃已写的数据
                if (m_writeQueue.empty())
                    return;
            }
            write(); //继续写
        });
}

可以看出,在网络这块,作者的思路还是很有延续性的,对write的处理和节点发现过程基本上是一直的,只不过在send的时候加了些合法性判断以及io编码等操作而已

到这里,就把节点相关的连接逻辑全部介绍完毕了,最后一节将介绍host的功能以及每个节点提供的能力是如何体现的,未完待续。。。

相关标签: 区块链 p2p