以太坊源码分析之 P2P网络(六、p2p连接控制与消息处理(下))
区块链特辑 :https://blog.csdn.net/fusan2004/article/details/80879343,欢迎查阅,原创作品,转载请标明!
这是p2p网络系列文章的最后一篇,前面很多篇主要都是在描述p2p的底层实现,那么p2p在整个系统中处于什么位置和提供什么功能,都将会在本篇进行一部分的总结,首先我们看下host相关代码,来了解p2p的运行模式,然后再去了解capacity的处理逻辑。
一、Host
先来看下host.h文件,对Host建立一个初步的印象。
class Host: public Worker
{
friend class HostNodeTableHandler;
friend class RLPXHandshake;
friend class Session;
friend class HostCapabilityFace;
public:
// 启动服务端,在给定端口进行监听
Host(
std::string const& _clientVersion,
NetworkPreferences const& _n = NetworkPreferences(),
bytesConstRef _restoreNetwork = bytesConstRef()
);
// 可选的构造函数,运行直接提供节点**,不需要重新载入网络信息
Host(
std::string const& _clientVersion,
KeyPair const& _alias,
NetworkPreferences const& _n = NetworkPreferences()
);
// 将会在网络处理事件时阻塞
virtual ~Host();
// 默认host,大家有印象的话就记得nodetable进行节点发现的时候需要预先设定几个节点来获取信息
static std::unordered_map<Public, std::string> pocHosts();
// 注册一个peer-capability,所有新建立的连接都将会拥有该capability
template <class T> std::shared_ptr<T> registerCapability(std::shared_ptr<T> const& _t) { _t->m_host = this; m_capabilities[std::make_pair(T::staticName(), T::staticVersion())] = _t; return _t; }
// 后面会分析这两个函数的区别
template <class T> void addCapability(std::shared_ptr<T> const & _p, std::string const& _name, u256 const& _version) { m_capabilities[std::make_pair(_name, _version)] = _p; }
//当前节点是否包含某项功能
bool haveCapability(CapDesc const& _name) const { return m_capabilities.count(_name) != 0; }
// 返回所有capability的desc
CapDescs caps() const { CapDescs ret; for (auto const& i: m_capabilities) ret.push_back(i.first); return ret; }
template <class T> std::shared_ptr<T> cap() const { try { return std::static_pointer_cast<T>(m_capabilities.at(std::make_pair(T::staticName(), T::staticVersion()))); } catch (...) { return nullptr; } }
// 增加一个潜在的peer
void addPeer(NodeSpec const& _s, PeerType _t);
// 添加节点作为一个peer的候选,当节点发现到这个节点并且还有剩余额度的时候该节点被添加
void addNode(NodeID const& _node, NodeIPEndpoint const& _endpoint);
// 创建peer,并尝试保持该peer连接
void requirePeer(NodeID const& _node, NodeIPEndpoint const& _endpoint);
void requirePeer(NodeID const& _node, bi::address const& _addr, unsigned short _udpPort, unsigned short _tcpPort) { requirePeer(_node, NodeIPEndpoint(_addr, _udpPort, _tcpPort)); }
// 标记peer不再被需要
void relinquishPeer(NodeID const& _node);
// 设置peer的理想数量
void setIdealPeerCount(unsigned _n) { m_idealPeerCount = _n; }
// 设置最大接受连接数的乘法因子
void setPeerStretch(unsigned _n) { m_stretchPeers = _n; }
// 获取peer信息
PeerSessionInfos peerSessionInfo() const;
// 获取已经连接的peer数
size_t peerCount() const;
// 获取我们正在监听的地址
std::string listenAddress() const { return m_tcpPublic.address().is_unspecified() ? "0.0.0.0" : m_tcpPublic.address().to_string(); }
// 获取我们正在监听的端口
unsigned short listenPort() const { return std::max(0, m_listenPort.load()); }
// 序列化已知peer的set
bytes saveNetwork() const;
// TODO: P2P this should be combined with peers into a HostStat object of some kind; coalesce data, as it's only used for status information.
Peers getPeers() const { RecursiveGuard l(x_sessions); Peers ret; for (auto const& i: m_peers) ret.push_back(*i.second); return ret; }
// 网络设置
NetworkPreferences const& networkPreferences() const { return m_netPrefs; }
// 保存网络设置
void setNetworkPreferences(NetworkPreferences const& _p, bool _dropPeers = false) { m_dropPeers = _dropPeers; auto had = isStarted(); if (had) stop(); m_netPrefs = _p; if (had) start(); }
// 启动网络,
void start();
// 停止网络
void stop();
// 返回网络是否start
bool isStarted() const { return isWorking(); }
/// @returns our reputation manager.
ReputationManager& repMan() { return m_repMan; }
// 如果网络以及启动,并且具有交互性
bool haveNetwork() const { Guard l(x_runTimer); Guard ll(x_nodeTable); return m_run && !!m_nodeTable; }
// 确认并开启网络连接
void startPeerSession(Public const& _id, RLP const& _hello, std::unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s);
// 根据id来获取session
std::shared_ptr<SessionFace> peerSession(NodeID const& _id) { RecursiveGuard l(x_sessions); return m_sessions.count(_id) ? m_sessions[_id].lock() : std::shared_ptr<SessionFace>(); }
// 获取节点id
NodeID id() const { return m_alias.pub(); }
// 获取tcp端口的公开地址
bi::tcp::endpoint const& tcpPublic() const { return m_tcpPublic; }
/// Get the public endpoint information.
std::string enode() const { return "enode://" + id().hex() + "@" + (networkPreferences().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkPreferences().publicIPAddress) + ":" + toString(m_tcpPublic.port()); }
// 获取节点信息
p2p::NodeInfo nodeInfo() const { return NodeInfo(id(), (networkPreferences().publicIPAddress.empty() ? m_tcpPublic.address().to_string() : networkPreferences().publicIPAddress), m_tcpPublic.port(), m_clientVersion); }
protected:
// 响应nodetable事件
void onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e);
/// Deserialise the data and populate the set of known peers.
void restoreNetwork(bytesConstRef _b);
private:
enum PeerSlotType { Egress, Ingress };
// 不同类型的peer连接数量限制,主动连接少点,被动连接多一点
unsigned peerSlots(PeerSlotType _type) { return _type == Egress ? m_idealPeerCount : m_idealPeerCount * m_stretchPeers; }
// 判断是否与某个node建立了连接
bool havePeerSession(NodeID const& _id) { return !!peerSession(_id); }
// 确定并设置公共地址
void determinePublic();
// 主动连接peer
void connect(std::shared_ptr<Peer> const& _p);
// 如果等待和已连接的peer数量小于最大值返回true
bool peerSlotsAvailable(PeerSlotType _type = Ingress);
// ping已连接的peer,来更新连接信息,并在某些超时的peer断开
void keepAlivePeers();
// 断开在c_keepAliveTimeOut之前没有回复keepAlivePeers ping的peers
void disconnectLatePeers();
// 只会在startedWorking里面调用
void runAcceptor();
// 被worker调用
virtual void startedWorking();
/// Called by startedWorking. Not thread-safe; to be called only be Worker.
void run(boost::system::error_code const& error); ///< Run network. Called serially via ASIO deadline timer. Manages connection state transitions.
// 运行network. Not thread-safe; to be called only by worker.
virtual void doWork();
/// 关闭 network. Not thread-safe; to be called only by worker.
virtual void doneWorking();
/// 获取或者创建节点keypair
static KeyPair networkAlias(bytesConstRef _b);
// 返回节点是否是必要的节点之一
bool isRequiredPeer(NodeID const&) const;
bool nodeTableHasNode(Public const& _id) const;
Node nodeFromNodeTable(Public const& _id) const;
bool addNodeToNodeTable(Node const& _node, NodeTable::NodeRelation _relation = NodeTable::NodeRelation::Unknown);
bytes m_restoreNetwork; // 通过构造函数设置,用于设置host的key和重新载入网络中的peer和node信息
std::atomic<bool> m_run{false}; // 网络是否正在运行
std::string m_clientVersion; // 版本信息
NetworkPreferences m_netPrefs; // 网络设置
/// Interface addresses (private, public)
std::set<bi::address> m_ifAddresses; // 接口地址
std::atomic<int> m_listenPort{-1};// 监听端口,-1表示绑定失败或者acceptor没有初始化
ba::io_service m_ioService; ///< IOService for network stuff.
bi::tcp::acceptor m_tcp4Acceptor; // 监听的acceptor
std::unique_ptr<boost::asio::deadline_timer> m_timer; //定时器
mutable std::mutex x_runTimer; ///< Start/stop mutex.
static const unsigned c_timerInterval = 100; // 定时器间隔
std::condition_variable m_timerReset;
std::set<Peer*> m_pendingPeerConns; // 只会在connect函数中被使用,用于现在去连接同一个节点,注意这里使用的是裸指针
bi::tcp::endpoint m_tcpPublic; //< Our public listening endpoint.
KeyPair m_alias;
std::shared_ptr<NodeTable> m_nodeTable; // node table,用于节点发现
mutable std::mutex x_nodeTable;
std::shared_ptr<NodeTable> nodeTable() const { Guard l(x_nodeTable); return m_nodeTable; }
// Peer结构的共享存储,Peers会被Host根据需要创建或释放,每一个活跃的session都会包含一个peer的shared_ptr
std::unordered_map<NodeID, std::shared_ptr<Peer>> m_peers;
/// Peers we try to connect regardless of p2p network.
std::set<NodeID> m_requiredPeers;
mutable Mutex x_requiredPeers;
/// The nodes to which we are currently connected. Used by host to service peer requests and keepAlivePeers and for shutdown. (see run())
/// Mutable because we flush zombie entries (null-weakptrs) as regular maintenance from a const method.
mutable std::unordered_map<NodeID, std::weak_ptr<SessionFace>> m_sessions;
mutable RecursiveMutex x_sessions;
std::list<std::weak_ptr<RLPXHandshake>> m_connecting;//< Pending connections.
Mutex x_connecting; ///< Mutex for m_connecting.
unsigned m_idealPeerCount = 11; // 主动去连的peer理想数目
unsigned m_stretchPeers = 7; // 如果是接收的话,可连接的peer数目是主动连接的若干倍
std::map<CapDesc, std::shared_ptr<HostCapabilityFace>> m_capabilities; //< Each of the capabilities we support.
/// Deadline timers used for isolated network events. GC'd by run.
std::list<std::shared_ptr<boost::asio::deadline_timer>> m_timers;
Mutex x_timers;
std::chrono::steady_clock::time_point m_lastPing; //< Time we sent the last ping to all peers.
bool m_accepting = false;
bool m_dropPeers = false;
ReputationManager m_repMan;
Logger m_logger{createLogger(VerbosityDebug, "net")};
};
从Host的定义来看,首先host继承与Worker,说明host是一个独立的线程来运行,实际上p2p作为系统底层的一个独立部分,一般必须是独立线程,避免来自于其他组件的干扰,另外p2p系统底层有频繁的网络交互、节点退出等情况,但基本上都是io密集型操作,不能够被cpu密集型操作阻塞。
另一方面,从代码上看,Host的成员函数和变量很多,我们这里如果全部讲述篇幅会显得过大,因此主要重点关注两个功能,一个是节点的连接管理(peer相关),另一方面是功能说明(capability),其他的一些辅助函数也是围绕这两个部分进行的。首先看下Host的构造函数
Host::Host(string const& _clientVersion, KeyPair const& _alias, NetworkPreferences const& _n):
Worker("p2p", 0),
m_clientVersion(_clientVersion),
m_netPrefs(_n),
m_ifAddresses(Network::getInterfaceAddresses()),
m_ioService(2),
m_tcp4Acceptor(m_ioService),
m_alias(_alias),
m_lastPing(chrono::steady_clock::time_point::min())
{
cnetnote << "Id: " << id();
}
Host::Host(string const& _clientVersion, NetworkPreferences const& _n, bytesConstRef _restoreNetwork):
Host(_clientVersion, networkAlias(_restoreNetwork), _n)
{
m_restoreNetwork = _restoreNetwork.toBytes();
}
构造函数这个没啥说的,就是初始化配置了网络还有节点等信息,因为这个是独立线程,再来看下线程启动的过程
void Host::start()
{
DEV_TIMED_FUNCTION_ABOVE(500);
startWorking(); // 还记得worker的这个函数额
while (isWorking() && !haveNetwork())
this_thread::sleep_for(chrono::milliseconds(10)); //等待线程初始化完成
// network start failed!
if (isWorking())
return;
cwarn << "Network start failed!";
doneWorking(); //网络结束
}
大家可以回过头去看看worker的类定义,有几个函数是虚函数,startWorking就是一个,这个里面是用来初始化线程的,也就是这里面开始工作线程运行,Host继承了Worker的startWorking函数,如果大家还记得的话,不记得也没有关系,可以翻开下前面的文章,可以看到startWorking里面,主要一次调用了三个函数,分别是startedWorking,workLoop, doneWorking,而workLoop里面调用的就是doWork函数,下面来看下这几个线程流程相关的函数
void Host::startedWorking()
{
asserts(!m_timer); // 定时器应该不存在
{
// 这里面加锁,是为了防止m_run被设置成true的同时在stop里面被设置成false
// 在m_timer设置之前不要释放锁,这是因为同时stop的时候会等待m_timer,并且优雅的网络退出
Guard l(x_runTimer);
// create deadline timer
m_timer.reset(new boost::asio::deadline_timer(m_ioService));
m_run = true;
}
// 启动capability线程,等待接入的连接
for (auto const& h: m_capabilities) // 构造函数中没有,说明这些是在start之前设置
h.second->onStarting();
// 尝试开启acceptor (ipv6待完成)
int port = Network::tcp4Listen(m_tcp4Acceptor, m_netPrefs);
if (port > 0)
{
m_listenPort = port;
determinePublic();
runAcceptor(); // 对acceptor的回调设置,后面会专门介绍
}
else
LOG(m_logger) << "p2p.start.notice id: " << id() << " TCP Listen port is invalid or unavailable."; //不退出吗?只连别人?
auto nodeTable = make_shared<NodeTable>(
m_ioService,
m_alias,
NodeIPEndpoint(bi::address::from_string(listenAddress()), listenPort(), listenPort()),
m_netPrefs.discovery
); //在新启动的线程中,执行nodeTable的更新工作,nodeTable没有自己的独立线程,udp
nodeTable->setEventHandler(new HostNodeTableHandler(*this)); // 设置句柄
DEV_GUARDED(x_nodeTable)
m_nodeTable = nodeTable; // 这里有点奇怪,为啥不直接赋值给m_nodeTable,我看了下有一个nodeTable函数,难道这个还会被别的线程调用?
restoreNetwork(&m_restoreNetwork); // 载入网络数据
LOG(m_logger) << "p2p.started id: " << id();
run(boost::system::error_code()); //主流程启动
}
这个函数是doWork之前的准备工作,从这个函数里面可以看到主要是定义了定时器,设置了运行状态m_run,配置了监听acceptor,启动了nodeTable来开始节点发现,已经run,配置主流程,注意,这里面都只是准备,但是io_service还没有run起来,实际上都没有开始工作,来看看run函数
void Host::run(boost::system::error_code const&)
{
if (!m_run) // 被设置了stop
{
// reset NodeTable
DEV_GUARDED(x_nodeTable)
m_nodeTable.reset();
// 停止io_service允许运行的网络操作停止,并且停止阻塞的worker线程,允许工作线程退出
// 重制定时器告知网络,没有任何事情再被调度运行了
DEV_GUARDED(x_runTimer)
m_timer.reset();
m_timerReset.notify_all(); // stop的时候会阻塞,这里通知完线程退出
return;
}
// 这里面说明了为啥需要加锁,这是谨慎的写法,防止的是外界对nodetable有操作,实际上
// 很多变量只会在worker线程里面调用
if (auto nodeTable = this->nodeTable()) // This again requires x_nodeTable, which is why an additional variable nodeTable is used.
nodeTable->processEvents(); // 定期处理事件,add Node, del Node
// 清理僵尸
DEV_GUARDED(x_connecting)
m_connecting.remove_if([](std::weak_ptr<RLPXHandshake> h){ return h.expired(); }); // handshake一直未完成的,不然会导致内存一直在增长
DEV_GUARDED(x_timers)
m_timers.remove_if([](std::shared_ptr<boost::asio::deadline_timer> t)
{
return t->expires_from_now().total_milliseconds() < 0;
});
//与节点保活
keepAlivePeers();
// At this time peers will be disconnected based on natural TCP timeout.
// disconnectLatePeers needs to be updated for the assumption that Session
// is always live and to ensure reputation and fallback timers are properly
// updated. // disconnectLatePeers();
// todo: update peerSlotsAvailable()
// 查看需要连接peer
list<shared_ptr<Peer>> toConnect;
unsigned reqConn = 0;
{
RecursiveGuard l(x_sessions);
for (auto const& p: m_peers)
{
bool haveSession = havePeerSession(p.second->id);
bool required = p.second->peerType == PeerType::Required;
if (haveSession && required)
reqConn++; //已有session且为必须的节点数 ++
else if (!haveSession && p.second->shouldReconnect() && (!m_netPrefs.pin || required))
toConnect.push_back(p.second);
}
}
// 待连接的peer,去连接
for (auto p: toConnect)
if (p->peerType == PeerType::Required && reqConn++ < m_idealPeerCount)
connect(p);
if (!m_netPrefs.pin) // 如果没有说只连require,可信的peer
{
unsigned const maxSlots = m_idealPeerCount + reqConn;
unsigned occupiedSlots = peerCount() + m_pendingPeerConns.size();
for (auto peerToConnect = toConnect.cbegin();
occupiedSlots <= maxSlots && peerToConnect != toConnect.cend(); ++peerToConnect)
{
if ((*peerToConnect)->peerType == PeerType::Optional)
{
connect(*peerToConnect);
++occupiedSlots;
}
}
}
auto runcb = [this](boost::system::error_code const& error) { run(error); };
m_timer->expires_from_now(boost::posix_time::milliseconds(c_timerInterval));
m_timer->async_wait(runcb); // 设置定时器间隔,下一次继续调run,进行这类检查
}
这个也是p2p管理中比较统一的操作,一般p2p系统管理会有若干个定时器,这个定时器需要处理保活和重连的操作,这里也不例外,这时候仍然要注意,这个函数也只是表明了需要做的操作,但是仍然没有开始,必须等到io_service运行起来,这块就是在doWork中
void Host::doWork()
{
try
{
if (m_run)
m_ioService.run();
}
catch (std::exception const& _e)
{
cwarn << "Exception in Network Thread: " << _e.what();
cwarn << "Network Restart is Recommended.";
}
}
这时候线程就会阻塞在这个函数里面,前面说到的网络、节点发现还有定时器都开始了运行。前面介绍过程中提到了runAcceptor,这里对函数进行下说明,p2p节点连接无非是主动和被动两种方式,acceptor就是被动等待其他节点,主动连接部分我们后面介绍
void Host::runAcceptor()
{
assert(m_listenPort > 0); // 这个是要来判断监听是否成功的
if (m_run && !m_accepting) // 线程在运行,且不处于accept的状态
{
cnetdetails << "Listening on local port " << m_listenPort << " (public: " << m_tcpPublic << ")";
m_accepting = true; // 修改状态,避免误调用,其实可以不需要判断
auto socket = make_shared<RLPXSocket>(m_ioService); // 构造socket,监听到新连接的
m_tcp4Acceptor.async_accept(socket->ref(), [=](boost::system::error_code ec) // 异步accept
{
m_accepting = false; // 不需要担心是否有多线程问题
if (ec || !m_run)
{
socket->close(); // 如果发生错误,或者p2p已经被stop,那么关闭socket
return;
}
if (peerCount() > peerSlots(Ingress)) // 现在被动连接的peer数已经超过了设置的最大值,那么要拒绝新连接
{
cnetdetails << "Dropping incoming connect due to maximum peer count (" << Ingress << " * ideal peer count): " << socket->remoteEndpoint();
socket->close();
if (ec.value() < 1)
runAcceptor(); // 没有发生错误的话,还是要继续监听的,这是异步调用中必须要有的
return;
}
bool success = false;
try
{
// 被动连接,还不知道nodeid
// 被动连接和主动连接的构造函数RLPXHandshake是不一样的,这一点前面说明过
auto handshake = make_shared<RLPXHandshake>(this, socket);
m_connecting.push_back(handshake);
handshake->start(); // 启动握手,握手成功后就会调用前面提到startPeerSession函数回调了
success = true;
}
catch (Exception const& _e)
{
cwarn << "ERROR: " << diagnostic_information(_e);
}
catch (std::exception const& _e)
{
cwarn << "ERROR: " << _e.what();
}
if (!success)
socket->ref().close(); // 出错了要关闭调该socket
runAcceptor(); // 继续下一次监听
});
}
}
这就是被动连接的过程,再来看看主动连接,主动连接是由nodeTable来触发的,当nodeTable发生了节点变化事件时候,都会将事件放到handle中,然后我们可以看到run函数中,有一个nodeTable->processEvents()语句,这里面就是来处理节点变化的事件的,最终这些事件的处理都落到了host的onNodeTableEvent函数里面
void Host::onNodeTableEvent(NodeID const& _n, NodeTableEventType const& _e)
{
if (_e == NodeEntryAdded)
{
LOG(m_logger) << "p2p.host.nodeTable.events.nodeEntryAdded " << _n;
if (Node n = nodeFromNodeTable(_n)) //获取node信息
{
shared_ptr<Peer> p;
DEV_RECURSIVE_GUARDED(x_sessions)
{
if (m_peers.count(_n)) //这个node id已经存在
{
p = m_peers[_n];
p->endpoint = n.endpoint; //替换端点信息
}
else
{
p = make_shared<Peer>(n); //生成peer
m_peers[_n] = p;
LOG(m_logger) << "p2p.host.peers.events.peerAdded " << _n << " " << p->endpoint;
}
}
if (peerSlotsAvailable(Egress)) //查看主动去连的空位是否还有
connect(p); //发起连接
}
}
else if (_e == NodeEntryDropped)
{
LOG(m_logger) << "p2p.host.nodeTable.events.NodeEntryDropped " << _n;
RecursiveGuard l(x_sessions);
if (m_peers.count(_n) && m_peers[_n]->peerType == PeerType::Optional)
m_peers.erase(_n);
}
}
可以看出,peer不一定代表连接成功,只有session才表示真正建立的连接,主动发起的连接都放到了connect函数
void Host::connect(std::shared_ptr<Peer> const& _p)
{
if (!m_run) // 线程没有跑起来的话,那就不尽兴
return;
if (havePeerSession(_p->id)) //判断是否已经建立过连接,session是否存在
{
cnetdetails << "Aborted connect. Node already connected.";
return;
}
if (!nodeTableHasNode(_p->id) && _p->peerType == PeerType::Optional) //如果该node在node table中不存在,且不是必须要连接的,跳过
return;
// prevent concurrently connecting to a node
Peer *nptr = _p.get();
if (m_pendingPeerConns.count(nptr)) //这里用的是裸指针
return;
m_pendingPeerConns.insert(nptr); //放到等待连接中去
_p->m_lastAttempted = std::chrono::system_clock::now(); //最近一次尝试连接的时间
bi::tcp::endpoint ep(_p->endpoint);
cnetdetails << "Attempting connection to node " << _p->id << "@" << ep << " from " << id();
auto socket = make_shared<RLPXSocket>(m_ioService); //创建RLPXSocket,实际上就是在socket上面封装了一层
socket->ref().async_connect(ep, [=](boost::system::error_code const& ec) //异步连接
{
_p->m_lastAttempted = std::chrono::system_clock::now(); // 更新
_p->m_failedAttempts++; // 为啥这个也要加一呢?
if (ec)
{
cnetdetails << "Connection refused to node " << _p->id << "@" << ep << " ("
<< ec.message() << ")";
// Manually set error (session not present)
_p->m_lastDisconnect = TCPError;
}
else
{
cnetdetails << "Connecting to " << _p->id << "@" << ep;
auto handshake = make_shared<RLPXHandshake>(this, socket, _p->id); //连接成功,下面进行握手,注意第一个this,就是后面的回调
{
Guard l(x_connecting);
m_connecting.push_back(handshake); //放到connecting中去
}
handshake->start(); //调用握手的start函数
}
m_pendingPeerConns.erase(nptr); //连接成功,将其从pending中删除
});
}
host的部分就介绍到这里,主要流程都已经说明了,还有一些比如重点关注状况好的节点,节点区分是必须和可选的,种子节点如何设置以及一些网络存储等操作均在host有所提及,这里就不赘述了。
二、Capability
这一小节来介绍下基于p2p系统之上的逻辑抽象,以太坊基于p2p无非是做一些同步和转发等功能,这里面的设计是将这个业务与p2p不要建立强耦合,而是抽象成Capability,开发人员可以基于需要开发不同的Capability,那么这些Capability怎么和p2p系统结合的呢,首先看下关于Capability的两个基础类
class Capability: public std::enable_shared_from_this<Capability>
{
friend class Session;
public:
Capability(std::shared_ptr<SessionFace> _s, HostCapabilityFace* _h, unsigned _idOffset);
virtual ~Capability() {}
// Implement these in the derived class.
/* static std::string name() { return ""; } // 名称
static u256 version() { return 0; } // 版本
static unsigned messageCount() { return 0; } // 消息数量,支持的消息个数
*/
protected:
std::shared_ptr<SessionFace> session() const { return m_session.lock(); } // 这个功能对应的session,可以看出Capability是跟session相关的
HostCapabilityFace* hostCapability() const { return m_hostCap; } // 返回节点Capability,后面可以看到HostCapability是与Capability
virtual bool interpret(unsigned _id, RLP const&) = 0; // 这个函数解析rlp,并找到对应的消息处理函数
virtual void onDisconnect() {} // 断开后的回调
void disable(std::string const& _problem); // 停止功能
RLPStream& prep(RLPStream& _s, unsigned _id, unsigned _args = 0);
void sealAndSend(RLPStream& _s); // 发送
void addRating(int _r);
private:
std::weak_ptr<SessionFace> m_session;
HostCapabilityFace* m_hostCap;
bool m_enabled = true;
unsigned m_idOffset; // 偏移,跟session有关
};
从这个定义可以看出,Capability是指系统支持的某项能力,但是这个能力是与session相关的,在后面可以看到,不同的session可能支持的功能不完全一致,因此每个Capability会有一个offset,这个offset也是跟session相关的,另外我们还可以回过头看看session里面的定义,其中定义了一个m_capabilities来保存了每一个session所支持的所有能力。
class HostCapabilityFace
{
friend class Host;
template <class T> friend class HostCapability;
friend class Capability;
friend class Session;
public:
HostCapabilityFace() {}
virtual ~HostCapabilityFace() {}
Host* host() const { return m_host; } // 获取host
std::vector<std::pair<std::shared_ptr<SessionFace>, std::shared_ptr<Peer>>> peerSessions() const; // 返回所有支持当前版本功能的所有session信息
std::vector<std::pair<std::shared_ptr<SessionFace>, std::shared_ptr<Peer>>> peerSessions(u256 const& _version) const; // 返回所有某个版本功能的所有session信息
protected:
virtual std::string name() const = 0;
virtual u256 version() const = 0;
CapDesc capDesc() const { return std::make_pair(name(), version()); } // 功能描述
virtual unsigned messageCount() const = 0;
virtual std::shared_ptr<Capability> newPeerCapability(std::shared_ptr<SessionFace> const& _s, unsigned _idOffset, CapDesc const& _cap) = 0; // 为某个session生成一个Capability,还记得说过Capability和sesion有关么?
virtual void onStarting() {} // 开始回调
virtual void onStopping() {} // 停止回调
private:
Host* m_host = nullptr;
};
template<class PeerCap> // 注意这个模板,说明了HostCapability没干别的,基于某个PeerCap的封装
class HostCapability: public HostCapabilityFace
{
public:
HostCapability() {}
virtual ~HostCapability() {}
static std::string staticName() { return PeerCap::name(); }
static u256 staticVersion() { return PeerCap::version(); }
static unsigned staticMessageCount() { return PeerCap::messageCount(); }
protected:
virtual std::string name() const { return PeerCap::name(); }
virtual u256 version() const { return PeerCap::version(); }
virtual unsigned messageCount() const { return PeerCap::messageCount(); }
virtual std::shared_ptr<Capability> newPeerCapability(std::shared_ptr<SessionFace> const& _s, unsigned _idOffset, CapDesc const& _cap)
{
auto p = std::make_shared<PeerCap>(_s, this, _idOffset, _cap); // new一个Capability
_s->registerCapability(_cap, p); // 将新的capacibity加到session的m_capabilities里面去
return p;
}
};
可以看到HostCapability实际上是对Capability做了一层封装,使得HostCapability是和Host相关联,当然目前来看,host保存HostCapability的唯一功能就是根据双方通信结果为每个session创建Capability而已,这就是两个基础类,实际上运用的时候,都是需要写具体的功能子类继承他们,在目前的以太坊源码中,分别是EthereumPeer.h和EthereumHost.h两个文件所定义,大家需要有兴趣可以翻开这两个文件看下,这里面就有关于同步、传递block、传递transaction、同步状态等功能的具体体现。
接下来我们看下capability和session是如何关联上的以及消息接受如何回调相应处理函数的整个流程
1、互通有无
在握手完成阶段,每个节点都会告诉对方,目前自己支持的capability的CapDesc,这个步骤在RLPxHandshake.cpp中transition函数中
else if (m_nextState == WriteHello)
{
m_nextState = ReadHello;
m_io.reset(new RLPXFrameCoder(*this));
RLPStream s;
s.append((unsigned)HelloPacket).appendList(5)
<< dev::p2p::c_protocolVersion
<< m_host->m_clientVersion
<< m_host->caps() // 将当前host支持的功能集合
<< m_host->listenPort()
<< m_host->id();
bytes packet;
s.swapOut(packet);
m_io->writeSingleFramePacket(&packet, m_handshakeOutBuffer);
ba::async_write(m_socket->ref(), ba::buffer(m_handshakeOutBuffer), [this, self](boost::system::error_code ec, std::size_t)
{
transition(ec);
});
}
2、求同去异
当收到hello消息后,就会回调host的startPeerSession函数了,在这个函数里面首先要进行去重,只留下双方共有的
// leave only highset mutually supported capability version
// 如果当前host不支持,或者有重复,清理
caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());
for (auto cap: caps)
capslog << "(" << cap.first << "," << dec << cap.second << ")";
这里面一个是把对方有但是自己没有的capability删掉,另一个是如果双方都有,但是出现同一个功能两个版本的,只保留最新版本
3、给session赋能
现在只剩下双方都有的capability了,那么就把这些capablity注册到各自的session里面去,这部分仍然是在startPeerSession中
// todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
for (auto const& i: caps)
{
auto pcap = m_capabilities[i];
if (!pcap) // 这个判断很奇怪,上面已经进行去了整理,只保留了双方都支持的cap
return ps->disconnect(IncompatibleProtocol);
pcap->newPeerCapability(ps, offset, i); //
offset += pcap->messageCount(); //消息的数量
}
这里面的offset我有个疑惑,怎么保证两个节点的offset完全一致的呢?难道不会出现两个节点版本不一致,导致传过来的caps顺序不一样么?如果有朋友搞清楚了别的逻辑,还请赐教!
4、capability响应
当创建完session之后,与peer的session收发就与host没有关系了,当session收到消息后,会将消息转发给capability中,这部分在session.cpp的readPacket函数中
//读取数据包
bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
m_lastReceived = chrono::steady_clock::now();
clog(VerbosityTrace, "net") << "-> " << _t << " " << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{
// v4 frame headers are useless, offset packet type used
// v5 protocol type is in header, packet type not offset
if (_capId == 0 && _t < UserPacket) //如果 _t < UserPacket,说明是底层通信数据包,比如ping pong之类,具体类型定义在Common.h文件
return interpret(_t, _r);
// 消息转发在这里面!!!!!!!!!!
for (auto const& i: m_capabilities):
if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
return false; //如果未找到相应的capability,返回错误
}
catch (std::exception const& _e)
{
cnetlog << "Exception caught in p2p::Session::interpret(): " << _e.what()
<< ". PacketType: " << _t << ". RLP: " << _r;
disconnect(BadProtocol);
return true;
}
return true;
}
这个函数里,会根据packetType,根据offset找到这个packetType所处于的capability,然后调用这个capability的interpret函数来进一步处理,大家可以看看EthereumPeer这个函数多了解一些。
好了,到这里以太坊c++版本的p2p部分全部介绍完了,比我之前预想的要晚了好久,差点把这个主题最开始立下自己要坚持的flag给破坏了,后面介绍别的部分会尽量跟上节奏;这部分介绍的主要是p2p的内部运转逻辑,不涉及到具体的业务功能实现,所以看起来跟区块链关系不大,后面再往上层的逻辑就会涉及到区块、交易等相关信息,就不放在p2p部分来说明了,而是放到后面别的部分来介绍。
接下来我会再写一篇关于p2p的总结性文章,作为p2p部分的收篇,这是因为部门研发的联盟链底层也会需要p2p系统,但是对p2p系统的设计有很大不一样,我会做一些说明,未完待续!!
上一篇: ios 切换摄像头