欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

以太坊源码分析之 P2P网络(六、p2p连接控制与消息处理(下))

程序员文章站 2022-07-01 15:32:30
...

区块链特辑 :https://blog.csdn.net/fusan2004/article/details/80879343,欢迎查阅,原创作品,转载请标明!

这是p2p网络系列文章的最后一篇,前面很多篇主要都是在描述p2p的底层实现,那么p2p在整个系统中处于什么位置和提供什么功能,都将会在本篇进行一部分的总结,首先我们看下host相关代码,来了解p2p的运行模式,然后再去了解capacity的处理逻辑。

一、Host

先来看下host.h文件,对Host建立一个初步的印象。

class Host: public Worker
{
	friend class HostNodeTableHandler;
	friend class RLPXHandshake;	
	friend class Session;
	friend class HostCapabilityFace;
public:
    // 启动服务端,在给定端口进行监听
	Host(
		std::string const& _clientVersion,
		NetworkPreferences const& _n = NetworkPreferences(),
		bytesConstRef _restoreNetwork = bytesConstRef()
	);
    // 可选的构造函数,运行直接提供节点**,不需要重新载入网络信息
	Host(
		std::string const& _clientVersion,
		KeyPair const& _alias,
		NetworkPreferences const& _n = NetworkPreferences()
	);
    // 将会在网络处理事件时阻塞
	virtual ~Host();
    // 默认host,大家有印象的话就记得nodetable进行节点发现的时候需要预先设定几个节点来获取信息
	static std::unordered_map<Public, std::string> pocHosts();
    // 注册一个peer-capability,所有新建立的连接都将会拥有该capability
	template <class T> std::shared_ptr<T> registerCapability(std::shared_ptr<T> const& _t) { _t->m_host = this; m_capabilities[std::make_pair(T::staticName(), T::staticVersion())] = _t; return _t; }
    // 后面会分析这两个函数的区别
	template <class T> void addCapability(std::shared_ptr<T> const & _p, std::string const& _name, u256 const& _version) { m_capabilities[std::make_pair(_name, _version)] = _p; }
    //当前节点是否包含某项功能
	bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; }
    // 返回所有capability的desc
	CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
	template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
    // 增加一个潜在的peer
	void addPeer(NodeSpec const& _s, PeerType _t);
    // 添加节点作为一个peer的候选,当节点发现到这个节点并且还有剩余额度的时候该节点被添加
	void addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint);
    // 创建peer,并尝试保持该peer连接
	void requirePeer(NodeID const& _node, NodeIPEndpoint const& _endpoint);
	void requirePeer(NodeID const& _node, bi::address const& _addr, unsigned short _udpPort, unsigned short _tcpPort) { requirePeer(_node, NodeIPEndpoint(_addr, _udpPort, _tcpPort)); }
    // 标记peer不再被需要
	void relinquishPeer(NodeID const& _node);
    // 设置peer的理想数量
	void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
    // 设置最大接受连接数的乘法因子
	void setPeerStretch(unsigned _n) { m_stretchPeers = _n; }
	// 获取peer信息
	PeerSessionInfos peerSessionInfo() const;
    // 获取已经连接的peer数
	size_t peerCount() const;
    // 获取我们正在监听的地址
	std::string listenAddress() const { return m_tcpPublic.address().is_unspecified() ? "0.0.0.0" : m_tcpPublic.address().to_string(); }
    // 获取我们正在监听的端口
	unsigned short listenPort() const { return std::max(0, m_listenPort.load()); }
    // 序列化已知peer的set
	bytes saveNetwork() const;
	// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
	Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
    // 网络设置
	NetworkPreferences const& networkPreferences() const { return m_netPrefs; }
    // 保存网络设置
	void setNetworkPreferences(NetworkPreferences const& _p, bool _dropPeers = false) { m_dropPeers = _dropPeers; auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }
    // 启动网络,
	void start();
    // 停止网络
	void stop();
	// 返回网络是否start
	bool isStarted() const { return isWorking(); }
	/// @returns our reputation manager.
	ReputationManager& repMan() { return m_repMan; }
	// 如果网络以及启动,并且具有交互性
	bool haveNetwork() const { Guard l(x_runTimer); Guard ll(x_nodeTable); return m_run && !!m_nodeTable; }
    // 确认并开启网络连接
	void startPeerSession(Public const& _id, RLP const& _hello, std::unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s);
    // 根据id来获取session
	std::shared_ptr<SessionFace> peerSession(NodeID const& _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? m_sessions[_id].lock() : std::shared_ptr<SessionFace>(); }
    // 获取节点id
	NodeID id() const { return m_alias.pub(); }
    // 获取tcp端口的公开地址
	bi::tcp::endpoint const& tcpPublic() const { return m_tcpPublic; }
	/// Get the public endpoint information.
	std::string enode() const { return "enode://" + id().hex() + "@" + (networkPreferences().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkPreferences().publicIPAddress) + ":" + toString(m_tcpPublic.port()); }
    // 获取节点信息
	p2p::NodeInfo nodeInfo() const { return NodeInfo(id(), (networkPreferences().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkPreferences().publicIPAddress), m_tcpPublic.port(), m_clientVersion); }
protected:
    // 响应nodetable事件
	void onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e);

	/// Deserialise the data and populate the set of known peers.
	void restoreNetwork(bytesConstRef _b);

private:
	enum PeerSlotType { Egress, Ingress };
	
	// 不同类型的peer连接数量限制,主动连接少点,被动连接多一点
	unsigned peerSlots(PeerSlotType _type) { return _type == Egress ? m_idealPeerCount : m_idealPeerCount * m_stretchPeers; }
    // 判断是否与某个node建立了连接
	bool havePeerSession(NodeID const& _id) { return !!peerSession(_id); }
    // 确定并设置公共地址
	void determinePublic();
    // 主动连接peer
	void connect(std::shared_ptr<Peer> const& _p);
	// 如果等待和已连接的peer数量小于最大值返回true
	bool peerSlotsAvailable(PeerSlotType _type = Ingress);
	// ping已连接的peer,来更新连接信息,并在某些超时的peer断开
	void keepAlivePeers();
	// 断开在c_keepAliveTimeOut之前没有回复keepAlivePeers ping的peers
	void disconnectLatePeers();
    // 只会在startedWorking里面调用
	void runAcceptor();
    // 被worker调用
	virtual void startedWorking();
	/// Called by startedWorking. Not thread-safe; to be called only be Worker.
	void run(boost::system::error_code const& error);			///< Run network. Called serially via ASIO deadline timer. Manages connection state transitions.
	// 运行network. Not thread-safe; to be called only by worker.
	virtual void doWork();
	/// 关闭 network. Not thread-safe; to be called only by worker.
	virtual void doneWorking();
	/// 获取或者创建节点keypair
	static KeyPair networkAlias(bytesConstRef _b);
    // 返回节点是否是必要的节点之一
	bool isRequiredPeer(NodeID const&) const;
	bool nodeTableHasNode(Public const& _id) const;
	Node nodeFromNodeTable(Public const& _id) const;
	bool addNodeToNodeTable(Node const& _node, NodeTable::NodeRelation _relation = NodeTable::NodeRelation::Unknown);
	bytes m_restoreNetwork;  // 通过构造函数设置,用于设置host的key和重新载入网络中的peer和node信息
	std::atomic<bool> m_run{false};	// 网络是否正在运行
	std::string m_clientVersion; // 版本信息
	NetworkPreferences m_netPrefs; // 网络设置
	/// Interface addresses (private, public)
	std::set<bi::address> m_ifAddresses; // 接口地址
	std::atomic<int> m_listenPort{-1};// 监听端口,-1表示绑定失败或者acceptor没有初始化					
	ba::io_service m_ioService; ///< IOService for network stuff.
	bi::tcp::acceptor m_tcp4Acceptor; // 监听的acceptor
	std::unique_ptr<boost::asio::deadline_timer> m_timer;	//定时器
	mutable std::mutex x_runTimer;	///< Start/stop mutex.
	static const unsigned c_timerInterval = 100; // 定时器间隔
	std::condition_variable m_timerReset;
	std::set<Peer*> m_pendingPeerConns;	// 只会在connect函数中被使用,用于现在去连接同一个节点,注意这里使用的是裸指针
	bi::tcp::endpoint m_tcpPublic;	//< Our public listening endpoint.
	KeyPair m_alias;														
	std::shared_ptr<NodeTable> m_nodeTable;	// node table,用于节点发现
	mutable std::mutex x_nodeTable;
	std::shared_ptr<NodeTable> nodeTable() const { Guard l(x_nodeTable); return m_nodeTable; }
	// Peer结构的共享存储,Peers会被Host根据需要创建或释放,每一个活跃的session都会包含一个peer的shared_ptr
	std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;
    /// Peers we try to connect regardless of p2p network.
	std::set<NodeID> m_requiredPeers;
	mutable Mutex x_requiredPeers;

	/// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
	/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
	mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;
	mutable RecursiveMutex x_sessions;
	std::list<std::weak_ptr<RLPXHandshake>> m_connecting;//< Pending connections.
	Mutex x_connecting;					///< Mutex for m_connecting.
	unsigned m_idealPeerCount = 11;	// 主动去连的peer理想数目
	unsigned m_stretchPeers = 7;	// 如果是接收的话,可连接的peer数目是主动连接的若干倍
	std::map<CapDesc, std::shared_ptr<HostCapabilityFace>> m_capabilities; //< Each of the capabilities we support.
	/// Deadline timers used for isolated network events. GC'd by run.
	std::list<std::shared_ptr<boost::asio::deadline_timer>> m_timers;
	Mutex x_timers;
	std::chrono::steady_clock::time_point m_lastPing; //< Time we sent the last ping to all peers.
	bool m_accepting = false;
	bool m_dropPeers = false;
	ReputationManager m_repMan;
    Logger m_logger{createLogger(VerbosityDebug, "net")};
};

从Host的定义来看,首先host继承与Worker,说明host是一个独立的线程来运行,实际上p2p作为系统底层的一个独立部分,一般必须是独立线程,避免来自于其他组件的干扰,另外p2p系统底层有频繁的网络交互、节点退出等情况,但基本上都是io密集型操作,不能够被cpu密集型操作阻塞。

另一方面,从代码上看,Host的成员函数和变量很多,我们这里如果全部讲述篇幅会显得过大,因此主要重点关注两个功能,一个是节点的连接管理(peer相关),另一方面是功能说明(capability),其他的一些辅助函数也是围绕这两个部分进行的。首先看下Host的构造函数

Host::Host(string const& _clientVersion, KeyPair const& _alias, NetworkPreferences const& _n):
    Worker("p2p", 0),
    m_clientVersion(_clientVersion),
    m_netPrefs(_n),
    m_ifAddresses(Network::getInterfaceAddresses()),
    m_ioService(2),
    m_tcp4Acceptor(m_ioService),
    m_alias(_alias),
    m_lastPing(chrono::steady_clock::time_point::min())
{
    cnetnote << "Id: " << id();
}

Host::Host(string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
    Host(_clientVersion, networkAlias(_restoreNetwork), _n)
{
    m_restoreNetwork = _restoreNetwork.toBytes();
}

构造函数这个没啥说的,就是初始化配置了网络还有节点等信息,因为这个是独立线程,再来看下线程启动的过程

void Host::start()
{
    DEV_TIMED_FUNCTION_ABOVE(500);
    startWorking();  // 还记得worker的这个函数额
    while (isWorking() && !haveNetwork())
        this_thread::sleep_for(chrono::milliseconds(10));  //等待线程初始化完成
    
    // network start failed!
    if (isWorking())
        return;  

    cwarn << "Network start failed!";
    doneWorking();  //网络结束
} 

大家可以回过头去看看worker的类定义,有几个函数是虚函数,startWorking就是一个,这个里面是用来初始化线程的,也就是这里面开始工作线程运行,Host继承了Worker的startWorking函数,如果大家还记得的话,不记得也没有关系,可以翻开下前面的文章,可以看到startWorking里面,主要一次调用了三个函数,分别是startedWorking,workLoop, doneWorking,而workLoop里面调用的就是doWork函数,下面来看下这几个线程流程相关的函数

void Host::startedWorking()
{
    asserts(!m_timer);  // 定时器应该不存在

    {
        // 这里面加锁,是为了防止m_run被设置成true的同时在stop里面被设置成false
        // 在m_timer设置之前不要释放锁,这是因为同时stop的时候会等待m_timer,并且优雅的网络退出
        Guard l(x_runTimer);
        // create deadline timer
        m_timer.reset(new boost::asio::deadline_timer(m_ioService));
        m_run = true;
    }
    // 启动capability线程,等待接入的连接
    for (auto const& h: m_capabilities)  // 构造函数中没有,说明这些是在start之前设置
        h.second->onStarting();
    // 尝试开启acceptor (ipv6待完成)
    int port = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs);
    if (port > 0)
    {
        m_listenPort = port;
        determinePublic();
        runAcceptor();  // 对acceptor的回调设置,后面会专门介绍
    }
    else
        LOG(m_logger) << "p2p.start.notice id: " << id() << " TCP Listen port is invalid or unavailable.";  //不退出吗?只连别人?

    auto nodeTable = make_shared<NodeTable>(
        m_ioService,
        m_alias,
        NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
        m_netPrefs.discovery
    ); //在新启动的线程中,执行nodeTable的更新工作,nodeTable没有自己的独立线程,udp
    nodeTable->setEventHandler(new HostNodeTableHandler(*this)); // 设置句柄
    DEV_GUARDED(x_nodeTable)
        m_nodeTable = nodeTable;  // 这里有点奇怪,为啥不直接赋值给m_nodeTable,我看了下有一个nodeTable函数,难道这个还会被别的线程调用?
    restoreNetwork(&m_restoreNetwork);  // 载入网络数据

    LOG(m_logger) << "p2p.started id: " << id();

    run(boost::system::error_code());  //主流程启动
}

这个函数是doWork之前的准备工作,从这个函数里面可以看到主要是定义了定时器,设置了运行状态m_run,配置了监听acceptor,启动了nodeTable来开始节点发现,已经run,配置主流程,注意,这里面都只是准备,但是io_service还没有run起来,实际上都没有开始工作,来看看run函数

void Host::run(boost::system::error_code const&)
{
    if (!m_run)  // 被设置了stop
    {
        // reset NodeTable
        DEV_GUARDED(x_nodeTable)
            m_nodeTable.reset();
        // 停止io_service允许运行的网络操作停止,并且停止阻塞的worker线程,允许工作线程退出
        // 重制定时器告知网络,没有任何事情再被调度运行了
        DEV_GUARDED(x_runTimer)
            m_timer.reset();
        m_timerReset.notify_all(); // stop的时候会阻塞,这里通知完线程退出
        return;
    }
    // 这里面说明了为啥需要加锁,这是谨慎的写法,防止的是外界对nodetable有操作,实际上
    // 很多变量只会在worker线程里面调用
    if (auto nodeTable = this->nodeTable()) // This again requires x_nodeTable, which is why an additional variable nodeTable is used.
        nodeTable->processEvents();  // 定期处理事件,add Node, del Node
    // 清理僵尸
    DEV_GUARDED(x_connecting)
        m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); });  // handshake一直未完成的,不然会导致内存一直在增长
    DEV_GUARDED(x_timers)
        m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
        {
            return t->expires_from_now().total_milliseconds() < 0;
        });

    //与节点保活
    keepAlivePeers();
    
    // At this time peers will be disconnected based on natural TCP timeout.
    // disconnectLatePeers needs to be updated for the assumption that Session
    // is always live and to ensure reputation and fallback timers are properly
    // updated. // disconnectLatePeers();

    // todo: update peerSlotsAvailable()
    // 查看需要连接peer
    list<shared_ptr<Peer>> toConnect;
    unsigned reqConn = 0;
    {
        RecursiveGuard l(x_sessions);
        for (auto const& p: m_peers)
        {
            bool haveSession = havePeerSession(p.second->id);
            bool required = p.second->peerType == PeerType::Required;
            if (haveSession && required)
                reqConn++;  //已有session且为必须的节点数 ++
            else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
                toConnect.push_back(p.second);
        }
    } 
    // 待连接的peer,去连接
    for (auto p: toConnect)
        if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
            connect(p);
    
    if (!m_netPrefs.pin)  // 如果没有说只连require,可信的peer
    {
        unsigned const maxSlots = m_idealPeerCount + reqConn;
        unsigned occupiedSlots = peerCount() + m_pendingPeerConns.size();
        for (auto peerToConnect = toConnect.cbegin();
             occupiedSlots <= maxSlots && peerToConnect != toConnect.cend(); ++peerToConnect)
        {
            if ((*peerToConnect)->peerType == PeerType::Optional)
            {
                connect(*peerToConnect);
                ++occupiedSlots;
            }
        }
    }

    auto runcb = [this](boost::system::error_code const& error) { run(error); };
    m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
    m_timer->async_wait(runcb);  // 设置定时器间隔,下一次继续调run,进行这类检查
}

这个也是p2p管理中比较统一的操作,一般p2p系统管理会有若干个定时器,这个定时器需要处理保活和重连的操作,这里也不例外,这时候仍然要注意,这个函数也只是表明了需要做的操作,但是仍然没有开始,必须等到io_service运行起来,这块就是在doWork中

void Host::doWork()
{
    try
    {
        if (m_run)
            m_ioService.run();
    }
    catch (std::exception const& _e)
    {
        cwarn << "Exception in Network Thread: " << _e.what();
        cwarn << "Network Restart is Recommended.";
    }
}

这时候线程就会阻塞在这个函数里面,前面说到的网络、节点发现还有定时器都开始了运行。前面介绍过程中提到了runAcceptor,这里对函数进行下说明,p2p节点连接无非是主动和被动两种方式,acceptor就是被动等待其他节点,主动连接部分我们后面介绍

void Host::runAcceptor()
{
    assert(m_listenPort > 0); // 这个是要来判断监听是否成功的
    if (m_run && !m_accepting) // 线程在运行,且不处于accept的状态
    {
        cnetdetails << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
        m_accepting = true; // 修改状态,避免误调用,其实可以不需要判断
        auto socket = make_shared<RLPXSocket>(m_ioService); // 构造socket,监听到新连接的
        m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec) // 异步accept
        {
            m_accepting = false;  // 不需要担心是否有多线程问题
            if (ec || !m_run)
            {
                socket->close();  // 如果发生错误,或者p2p已经被stop,那么关闭socket
                return;
            }
            if (peerCount() > peerSlots(Ingress)) // 现在被动连接的peer数已经超过了设置的最大值,那么要拒绝新连接
            {
                cnetdetails << "Dropping incoming connect due to maximum peer count (" << Ingress << " * ideal peer count): " << socket->remoteEndpoint();
                socket->close();
                if (ec.value() < 1)
                    runAcceptor(); // 没有发生错误的话,还是要继续监听的,这是异步调用中必须要有的
                return;
            }
            
            bool success = false;
            try
            {
                // 被动连接,还不知道nodeid
                // 被动连接和主动连接的构造函数RLPXHandshake是不一样的,这一点前面说明过
                auto handshake = make_shared<RLPXHandshake>(this, socket);
                m_connecting.push_back(handshake);
                handshake->start(); // 启动握手,握手成功后就会调用前面提到startPeerSession函数回调了
                success = true;
            }
            catch (Exception const& _e)
            {
                cwarn << "ERROR: " << diagnostic_information(_e);
            }
            catch (std::exception const& _e)
            {
                cwarn << "ERROR: " << _e.what();
            }

            if (!success)
                socket->ref().close(); // 出错了要关闭调该socket
            runAcceptor(); // 继续下一次监听
        });
    }
}

这就是被动连接的过程,再来看看主动连接,主动连接是由nodeTable来触发的,当nodeTable发生了节点变化事件时候,都会将事件放到handle中,然后我们可以看到run函数中,有一个nodeTable->processEvents()语句,这里面就是来处理节点变化的事件的,最终这些事件的处理都落到了host的onNodeTableEvent函数里面

void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
{
    if (_e == NodeEntryAdded)
    {
        LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
        if (Node n = nodeFromNodeTable(_n))  //获取node信息
        {
            shared_ptr<Peer> p;
            DEV_RECURSIVE_GUARDED(x_sessions)
            {
                if (m_peers.count(_n))  //这个node id已经存在
                {
                    p = m_peers[_n];
                    p->endpoint = n.endpoint; //替换端点信息
                }
                else
                {
                    p = make_shared<Peer>(n);  //生成peer
                    m_peers[_n] = p;
                    LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _n << " " << p->endpoint;
                }
            }
            if (peerSlotsAvailable(Egress))  //查看主动去连的空位是否还有
                connect(p);  //发起连接
        }
    }
    else if (_e == NodeEntryDropped)
    {
        LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
        RecursiveGuard l(x_sessions);
        if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
            m_peers.erase(_n);
    }
}

可以看出,peer不一定代表连接成功,只有session才表示真正建立的连接,主动发起的连接都放到了connect函数

void Host::connect(std::shared_ptr<Peer> const& _p)
{
    if (!m_run) // 线程没有跑起来的话,那就不尽兴
        return;   
    if (havePeerSession(_p->id)) //判断是否已经建立过连接,session是否存在
    {
        cnetdetails << "Aborted connect. Node already connected.";
        return;
    }
    if (!nodeTableHasNode(_p->id) && _p->peerType == PeerType::Optional) //如果该node在node table中不存在,且不是必须要连接的,跳过
        return;
    // prevent concurrently connecting to a node
    Peer *nptr = _p.get();
    if (m_pendingPeerConns.count(nptr))  //这里用的是裸指针
        return;
    m_pendingPeerConns.insert(nptr);  //放到等待连接中去
    _p->m_lastAttempted = std::chrono::system_clock::now();  //最近一次尝试连接的时间
    bi::tcp::endpoint ep(_p->endpoint);
    cnetdetails << "Attempting connection to node " << _p->id << "@" << ep << " from " << id();
    auto socket = make_shared<RLPXSocket>(m_ioService);  //创建RLPXSocket,实际上就是在socket上面封装了一层
    socket->ref().async_connect(ep, [=](boost::system::error_code const& ec) //异步连接
    {
        _p->m_lastAttempted = std::chrono::system_clock::now(); // 更新
        _p->m_failedAttempts++; // 为啥这个也要加一呢?
        
        if (ec)
        {
            cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
                        << ec.message() << ")";
            // Manually set error (session not present)
            _p->m_lastDisconnect = TCPError;
        }
        else
        {
            cnetdetails << "Connecting to " << _p->id << "@" << ep;
            auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id); //连接成功,下面进行握手,注意第一个this,就是后面的回调
            {
                Guard l(x_connecting);
                m_connecting.push_back(handshake);  //放到connecting中去
            }

            handshake->start(); //调用握手的start函数
        }
        
        m_pendingPeerConns.erase(nptr);  //连接成功,将其从pending中删除
    });
}

host的部分就介绍到这里,主要流程都已经说明了,还有一些比如重点关注状况好的节点,节点区分是必须和可选的,种子节点如何设置以及一些网络存储等操作均在host有所提及,这里就不赘述了。

二、Capability

这一小节来介绍下基于p2p系统之上的逻辑抽象,以太坊基于p2p无非是做一些同步和转发等功能,这里面的设计是将这个业务与p2p不要建立强耦合,而是抽象成Capability,开发人员可以基于需要开发不同的Capability,那么这些Capability怎么和p2p系统结合的呢,首先看下关于Capability的两个基础类

class Capability: public std::enable_shared_from_this<Capability>
{
    friend class Session;

public:
    Capability(std::shared_ptr<SessionFace> _s, HostCapabilityFace* _h, unsigned _idOffset);
    virtual ~Capability() {}
    // Implement these in the derived class.
/*  static std::string name() { return ""; }  // 名称
    static u256 version() { return 0; }       // 版本
    static unsigned messageCount() { return 0; }   // 消息数量,支持的消息个数
*/
protected:
    std::shared_ptr<SessionFace> session() const { return m_session.lock(); }  // 这个功能对应的session,可以看出Capability是跟session相关的
    HostCapabilityFace* hostCapability() const { return m_hostCap; } // 返回节点Capability,后面可以看到HostCapability是与Capability
    virtual bool interpret(unsigned _id, RLP const&) = 0; // 这个函数解析rlp,并找到对应的消息处理函数
    virtual void onDisconnect() {} // 断开后的回调
    void disable(std::string const& _problem); // 停止功能
    RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
    void sealAndSend(RLPStream& _s); // 发送
    void addRating(int _r);
private:
    std::weak_ptr<SessionFace> m_session;
    HostCapabilityFace* m_hostCap;
    bool m_enabled = true;
    unsigned m_idOffset;  // 偏移,跟session有关
};

从这个定义可以看出,Capability是指系统支持的某项能力,但是这个能力是与session相关的,在后面可以看到,不同的session可能支持的功能不完全一致,因此每个Capability会有一个offset,这个offset也是跟session相关的,另外我们还可以回过头看看session里面的定义,其中定义了一个m_capabilities来保存了每一个session所支持的所有能力。

class HostCapabilityFace
{
	friend class Host;
	template <class T> friend class HostCapability;
	friend class Capability;
	friend class Session;

public:
	HostCapabilityFace() {}
	virtual ~HostCapabilityFace() {}
	Host* host() const { return m_host; }  // 获取host
	std::vector<std::pair<std::shared_ptr<SessionFace>, std::shared_ptr<Peer>>> peerSessions() const;  // 返回所有支持当前版本功能的所有session信息
	std::vector<std::pair<std::shared_ptr<SessionFace>, std::shared_ptr<Peer>>> peerSessions(u256 const& _version) const; // 返回所有某个版本功能的所有session信息
protected:
	virtual std::string name() const = 0;
	virtual u256 version() const = 0;
	CapDesc capDesc() const { return std::make_pair(name(), version()); } // 功能描述
	virtual unsigned messageCount() const = 0;
	virtual std::shared_ptr<Capability> newPeerCapability(std::shared_ptr<SessionFace> const& _s, unsigned _idOffset, CapDesc const& _cap) = 0; // 为某个session生成一个Capability,还记得说过Capability和sesion有关么?
	virtual void onStarting() {} // 开始回调
	virtual void onStopping() {} // 停止回调

private:
	Host* m_host = nullptr;
};

template<class PeerCap> // 注意这个模板,说明了HostCapability没干别的,基于某个PeerCap的封装
class HostCapability: public HostCapabilityFace
{
public:
	HostCapability() {}
	virtual ~HostCapability() {}
	static std::string staticName() { return PeerCap::name(); }
	static u256 staticVersion() { return PeerCap::version(); }
	static unsigned staticMessageCount() { return PeerCap::messageCount(); }

protected:
	virtual std::string name() const { return PeerCap::name(); }
	virtual u256 version() const { return PeerCap::version(); }
	virtual unsigned messageCount() const { return PeerCap::messageCount(); }

	virtual std::shared_ptr<Capability> newPeerCapability(std::shared_ptr<SessionFace> const& _s, unsigned _idOffset, CapDesc const& _cap)
	{
		auto p = std::make_shared<PeerCap>(_s, this, _idOffset, _cap); // new一个Capability
		_s->registerCapability(_cap, p); // 将新的capacibity加到session的m_capabilities里面去
		return p;
	}
};

可以看到HostCapability实际上是对Capability做了一层封装,使得HostCapability是和Host相关联,当然目前来看,host保存HostCapability的唯一功能就是根据双方通信结果为每个session创建Capability而已,这就是两个基础类,实际上运用的时候,都是需要写具体的功能子类继承他们,在目前的以太坊源码中,分别是EthereumPeer.h和EthereumHost.h两个文件所定义,大家需要有兴趣可以翻开这两个文件看下,这里面就有关于同步、传递block、传递transaction、同步状态等功能的具体体现。

接下来我们看下capability和session是如何关联上的以及消息接受如何回调相应处理函数的整个流程

1、互通有无

在握手完成阶段,每个节点都会告诉对方,目前自己支持的capability的CapDesc,这个步骤在RLPxHandshake.cpp中transition函数中

   else if (m_nextState == WriteHello)
    {
        m_nextState = ReadHello;
        m_io.reset(new RLPXFrameCoder(*this));
        RLPStream s;
        s.append((unsigned)HelloPacket).appendList(5)
            << dev::p2p::c_protocolVersion
            << m_host->m_clientVersion
            << m_host->caps()  // 将当前host支持的功能集合
            << m_host->listenPort()
            << m_host->id();
        bytes packet;
        s.swapOut(packet);
        m_io->writeSingleFramePacket(&packet, m_handshakeOutBuffer);
        ba::async_write(m_socket->ref(), ba::buffer(m_handshakeOutBuffer), [this, self](boost::system::error_code ec, std::size_t)
        {
            transition(ec);
        });
    }

2、求同去异

当收到hello消息后,就会回调host的startPeerSession函数了,在这个函数里面首先要进行去重,只留下双方共有的

    // leave only highset mutually supported capability version
    // 如果当前host不支持,或者有重复,清理
    caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());

    for (auto cap: caps)
        capslog << "(" << cap.first << "," << dec << cap.second << ")";

这里面一个是把对方有但是自己没有的capability删掉,另一个是如果双方都有,但是出现同一个功能两个版本的,只保留最新版本

3、给session赋能

现在只剩下双方都有的capability了,那么就把这些capablity注册到各自的session里面去,这部分仍然是在startPeerSession中

        // todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
        for (auto const& i: caps)
        {
            auto pcap = m_capabilities[i];
            if (!pcap) // 这个判断很奇怪,上面已经进行去了整理,只保留了双方都支持的cap
                return ps->disconnect(IncompatibleProtocol);

            pcap->newPeerCapability(ps, offset, i); //
            offset += pcap->messageCount(); //消息的数量
        }

这里面的offset我有个疑惑,怎么保证两个节点的offset完全一致的呢?难道不会出现两个节点版本不一致,导致传过来的caps顺序不一样么?如果有朋友搞清楚了别的逻辑,还请赐教!

4、capability响应

当创建完session之后,与peer的session收发就与host没有关系了,当session收到消息后,会将消息转发给capability中,这部分在session.cpp的readPacket函数中

//读取数据包
bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
    m_lastReceived = chrono::steady_clock::now();
    clog(VerbosityTrace, "net") << "-> " << _t << " " << _r;
    try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
    {
        // v4 frame headers are useless, offset packet type used
        // v5 protocol type is in header, packet type not offset
        if (_capId == 0 && _t < UserPacket)  //如果 _t < UserPacket,说明是底层通信数据包,比如ping pong之类,具体类型定义在Common.h文件
            return interpret(_t, _r);

        // 消息转发在这里面!!!!!!!!!!
        for (auto const& i: m_capabilities):
            if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
                return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;

        return false;  //如果未找到相应的capability,返回错误
    }
    catch (std::exception const& _e)
    {
        cnetlog << "Exception caught in p2p::Session::interpret(): " << _e.what()
                << ". PacketType: " << _t << ". RLP: " << _r;
        disconnect(BadProtocol);
        return true;
    }
    return true;
}

这个函数里,会根据packetType,根据offset找到这个packetType所处于的capability,然后调用这个capability的interpret函数来进一步处理,大家可以看看EthereumPeer这个函数多了解一些。

 

好了,到这里以太坊c++版本的p2p部分全部介绍完了,比我之前预想的要晚了好久,差点把这个主题最开始立下自己要坚持的flag给破坏了,后面介绍别的部分会尽量跟上节奏;这部分介绍的主要是p2p的内部运转逻辑,不涉及到具体的业务功能实现,所以看起来跟区块链关系不大,后面再往上层的逻辑就会涉及到区块、交易等相关信息,就不放在p2p部分来说明了,而是放到后面别的部分来介绍。

接下来我会再写一篇关于p2p的总结性文章,作为p2p部分的收篇,这是因为部门研发的联盟链底层也会需要p2p系统,但是对p2p系统的设计有很大不一样,我会做一些说明,未完待续!!