以太坊源码分析之 P2P网络(五、p2p连接控制与消息处理(中))
区块链特辑 :https://blog.csdn.net/fusan2004/article/details/80879343,欢迎查阅,原创作品,转载请标明!
承继前篇, 前一篇介绍了连接底层的一些细节,包括socket,握手等详细步骤,等待握手完成后,会调用startPeerSession抛向上层,这个时候连接表明已经建立了,从代码里面,为了表征这种连接的建立,会再定义一些数据结构来表达这种关系,本篇文章我们将首先去了解下startPeerSession里面都做了些什么事情,然后再去详细了解逻辑上的连接表达方式。
一、建立连接
这部分我们只介绍host.cpp中的startPeerSession函数,其他关于host的部分将在下篇中介绍,那也是整个p2p系列代码部分的最后一篇。首先研读下cpp的代码,如下
// 这是在握手成功之后被handshake调用,握手是在connect或者acceptor中进行
// 可以细致了解下这个函数的参数,_id表示对方的公钥,也是对方的节点id,rlp实际上
// 是writeHello中填写的对方的一些信息,io是数据帧编解码,_s就是底层通信的socket
void Host::startPeerSession(Public const& _id, RLP const& _rlp, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s)
{
// session可能是主动或者被动建立的,所以peers和node table里面可能不包括这个node id
shared_ptr<Peer> p;
DEV_RECURSIVE_GUARDED(x_sessions)
{
if (m_peers.count(_id)) //判断peer是否存在
p = m_peers[_id];
else
{
// 不存在的话,先尝试从node table中获取端口信息
if (Node n = nodeFromNodeTable(_id))
p = make_shared<Peer>(n);
if (!p)
p = make_shared<Peer>(Node(_id, UnspecifiedNodeIPEndpoint)); //如果node_table中也没有,先不指定ip断点信息
m_peers[_id] = p; //添加到m_peers中去
}
}
if (p->isOffline()) // 如果之前已经下线了,session不存在了
p->m_lastConnected = std::chrono::system_clock::now(); //更新
p->endpoint.address = _s->remoteEndpoint().address(); //更新地址
auto protocolVersion = _rlp[0].toInt<unsigned>(); // 协议版本号
auto clientVersion = _rlp[1].toString(); // 客户版本
auto caps = _rlp[2].toVector<CapDesc>(); // 远方端点支持的能力
auto listenPort = _rlp[3].toInt<unsigned short>(); // 监听的端口
auto pub = _rlp[4].toHash<Public>(); // 公钥,与node id相同
if (pub != _id)
{
cdebug << "Wrong ID: " << pub << " vs. " << _id; // 不同认为有问题,不建立session
return;
}
// clang error (previously: ... << hex << caps ...)
// "'operator<<' should be declared prior to the call site or in an associated namespace of one of its arguments"
stringstream capslog;
// leave only highset mutually supported capability version
// 如果当前host不支持,或者有重复,清理
caps.erase(remove_if(caps.begin(), caps.end(), [&](CapDesc const& _r){ return !haveCapability(_r) || any_of(caps.begin(), caps.end(), [&](CapDesc const& _o){ return _r.first == _o.first && _o.second > _r.second && haveCapability(_o); }); }), caps.end());
for (auto cap: caps)
capslog << "(" << cap.first << "," << dec << cap.second << ")";
cnetlog << "Hello: " << clientVersion << " V[" << protocolVersion << "]"
<< " " << _id << " " << showbase << capslog.str() << " " << dec << listenPort;
// create session so disconnects are managed
// session来管理连接,所以先创建session,然后再去判断信息的合法性
shared_ptr<SessionFace> ps = make_shared<Session>(this, move(_io), _s, p, PeerSessionInfo({_id, clientVersion, p->endpoint.address.to_string(), listenPort, chrono::steady_clock::duration(), _rlp[2].toSet<CapDesc>(), 0, map<string, string>(), protocolVersion}));
if (protocolVersion < dev::p2p::c_protocolVersion - 1) //协议版本不一致的话断开
{
ps->disconnect(IncompatibleProtocol);
return;
}
if (caps.empty())
{
ps->disconnect(UselessPeer); //没有注册功能的话,直接断开连接
return;
}
if (m_netPrefs.pin && !isRequiredPeer(_id)) //如果设置要求只接受可信的peer,但是当前不可信,也要断开
{
cdebug << "Unexpected identity from peer (got" << _id << ", must be one of " << m_requiredPeers << ")";
ps->disconnect(UnexpectedIdentity);
return;
}
{
RecursiveGuard l(x_sessions);
if (m_sessions.count(_id) && !!m_sessions[_id].lock()) // 该节点已有session,且该session已建立
if (auto s = m_sessions[_id].lock())
if(s->isConnected()) //该连接正在连接,端口
{
// Already connected.
cnetlog << "Session already exists for peer with id " << _id;
ps->disconnect(DuplicatePeer);
return;
}
if (!peerSlotsAvailable()) //如果已经满了
{
cnetdetails << "Too many peers, can't connect. peer count: " << peerCount() // peerCount已经连接且连接有效的peer数
<< " pending peers: " << m_pendingPeerConns.size();
ps->disconnect(TooManyPeers);
return;
}
unsigned offset = (unsigned)UserPacket;
// todo: mutex Session::m_capabilities and move for(:caps) out of mutex.
for (auto const& i: caps)
{
auto pcap = m_capabilities[i];
if (!pcap) // 这个判断很奇怪,上面已经进行去了整理,只保留了双方都支持的cap
return ps->disconnect(IncompatibleProtocol);
pcap->newPeerCapability(ps, offset, i); //
offset += pcap->messageCount(); //消息的数量
}
ps->start(); //发完ping之后再添加到m_sessions
m_sessions[_id] = ps;
}
LOG(m_logger) << "p2p.host.peer.register " << _id;
}
这个函数里面,主要是根据当前建立的连接,生成了一个peer,当然如果这个peer已经存在的话就复用了,然后再根据已有数据为这个连接生成了一个逻辑上的结构session,最后根据交互的信息来查看这个连接是否合理合法,没问题的话就将这个新建的session放到m_sessions里面去,否则会调用session的disconnect函数,这里面也说明了为啥有些判断条件不是在session生成之前来判断,这是因为作者希望是通过session来管理连接,这可以从disconnect的时候要加上断开的说明看出来。
除了这些基本逻辑,大家还需要关心几个结构,这些也将会在后面来依次说明:
1)peer,要区分peer和node的区别,node是在NodeTable中体现的,peer是tcp连接握手成功后体现的,peer是真正用来p2p消息的对象,node仅仅是候选,后面看peer的构造函数的参数是Node类型也可以说明些事情;
2)session,session是整个连接管理中最重要的结构,他表示的就是peer真正建立了连接后的逻辑关系,从构造函数中可以看到,他把peer 、peerSessionInfo 、socket 、io 等全部包括了;
3)capability,这是指节点提供的能力,实际上也就是在p2p层之上的业务能力,p2p只是提供通信信道,具体用p2p干什么、传递什么消息,都是由这些capability来触发的,这个部分也会在后面详细说明。
二、连接的数据结构
1、peer
首先我们来看看peer提供了哪些能力
class Peer: public Node
{
friend class Session; /// Allows Session to update score and rating.
friend class Host; /// For Host: saveNetwork(), restoreNetwork()
friend class RLPXHandshake;
public:
/// Construct Peer from Node.
Peer(Node const& _node): Node(_node) {}
Peer(Peer const&);
bool isOffline() const { return !m_session.lock(); } //判断是否断开
// peer间一个比较
virtual bool operator<(Peer const& _p) const;
// 返回当前peer的评分
int rating() const { return m_rating; }
// 判断是否需要尝试去连接这个peer
bool shouldReconnect() const;
// 尝试连接这个peer的次数
int failedAttempts() const { return m_failedAttempts; }
// 上一次断开的原因
DisconnectReason lastDisconnect() const { return m_lastDisconnect; }
// 表明和这个peer的连接是有用的
void noteSessionGood() { m_failedAttempts = 0; }
protected:
// 根据之前连接的历史来决定下一次尝试连接等待的时间
unsigned fallbackSeconds() const;
std::atomic<int> m_score{0}; ///< All time cumulative.
std::atomic<int> m_rating{0}; ///< Trending.
/// Network Availability
std::chrono::system_clock::time_point m_lastConnected; // 最近一次连接时间
std::chrono::system_clock::time_point m_lastAttempted; // 最近一次尝试重连时间
std::atomic<unsigned> m_failedAttempts{0}; // 失败次数
DisconnectReason m_lastDisconnect = NoDisconnect; // 上一次断开原因
/// Used by isOffline() and (todo) for peer to emit session information.
std::weak_ptr<Session> m_session;
};
从peer继承于node看,实际上是在node上面加了一层连接相关的逻辑,从代码中可以看出,主要是一些连接的逻辑,包括重连,是否断开,连接次数等等。
2、session
session是上层网络逻辑最重要的部分,主要负责维护连接的稳定性以及保持消息通信,下面一起看下session里面的操作
class Session: public SessionFace, public std::enable_shared_from_this<SessionFace>
{
public:
// 构造函数
// _server, 其实是为了和创建session的host有个交互的可能,目前没有大的作用
// 也说明当创建完session之后,p2p的业务逻辑都是通过session来完成的了
// _io, rlp数据编解码,这是在网络底层数据接受的rlp封装对象
// _s,底层通信的socket
// _n, 与之连接peer,session的另一头
// _info, 简单的维护了对方的一些信息
Session(Host* _server, std::unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info);
virtual ~Session();
// session的启动,这里主要是发送了ping数据,然后开启read模式,等待收消息
void start() override;
// 主动断开,并添加原因
void disconnect(DisconnectReason _reason) override;
// 发送ping包
void ping() override;
// 判断是否还在连接
bool isConnected() const override { return m_socket->ref().is_open(); }
// 返回连接的node id
NodeID id() const override;
// 封装且发送
void sealAndSend(RLPStream& _s) override;
// 返回评分
int rating() const override;
void addRating(int _r) override;
void addNote(std::string const& _k, std::string const& _v) override { Guard l(x_info); m_info.notes[_k] = _v; }
PeerSessionInfo info() const override { Guard l(x_info); return m_info; }
// 返回连接建立的时间,也就是session被new出来的时间点
std::chrono::steady_clock::time_point connectionTime() override { return m_connect; }
// 注册功能
void registerCapability(CapDesc const& _desc, std::shared_ptr<Capability> _p) override;
// 返回目前支持的功能
std::map<CapDesc, std::shared_ptr<Capability>> const& capabilities() const override { return m_capabilities; }
// 返回对方peer
std::shared_ptr<Peer> peer() const override { return m_peer; }
// 返回最近一次接收到消息的时间点
std::chrono::steady_clock::time_point lastReceived() const override { return m_lastReceived; }
// 名誉管理器
ReputationManager& repMan() override;
private:
static RLPStream& prep(RLPStream& _s, PacketType _t, unsigned _args = 0);
// 发送数据
void send(bytes&& _msg);
// 因为某种原因丢弃连接
void drop(DisconnectReason _r);
// 在socket上执行读操作
void doRead();
// 在读完之后检查error code,如果error code存在那就丢弃peer
bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length);
// 执行一轮写操作,可能结束的时候异步调用自己
void write();
/// Deliver RLPX packet to Session or Capability for interpretation.
// 为了可解释性传送rlpx包给session或capablity
bool readPacket(uint16_t _capId, PacketType _t, RLP const& _r);
/// Interpret an incoming Session packet.
// 解释一个传入session包
bool interpret(PacketType _t, RLP const& _r);
/// @returns true iff the _msg forms a valid message for sending or receiving on the network.
// 检查网络上发送或读取的msg是否是一个有用的消息
static bool checkPacket(bytesConstRef _msg);
Host* m_server; // 保存session的server,绝不为null
std::unique_ptr<RLPXFrameCoder> m_io; // 编码,基于此发送数据报
std::shared_ptr<RLPXSocket> m_socket; // peer连接底层的socket
Mutex x_framing; // 写队列的互斥锁
std::deque<bytes> m_writeQueue; // 写队列
std::vector<byte> m_data; // 读进来的数据缓冲区
bytes m_incoming; // 读进来字节的读缓冲区
std::shared_ptr<Peer> m_peer; // peer对象
bool m_dropped = false; // 是否丢弃这个session
//< If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to fail before the shared_ptr goes OOS and the destructor kicks in.
mutable Mutex x_info;
PeerSessionInfo m_info; // 关于这个peer的动态信息
std::chrono::steady_clock::time_point m_connect; // 连接的时间点
std::chrono::steady_clock::time_point m_ping; // 最近一次ping的时间点
std::chrono::steady_clock::time_point m_lastReceived; // 最近一次获取消息的时间点
std::map<CapDesc, std::shared_ptr<Capability>> m_capabilities; // 这个peer的支持的功能
std::string const m_logContext;
};
可以看出,session最主要的功能还是处理消息的发送和接受,来看看主要的几个函数,首先看下start
void Session::start()
{
ping(); // 先发一个ping消息,打个招呼
doRead(); // 进入读等待阶段
}
void Session::ping() //发送ping包
{
RLPStream s;
sealAndSend(prep(s, PingPacket)); // prep是准备消息,sealAndSend在准备好的消息发送
m_ping = std::chrono::steady_clock::now();
}
start函数其实最主要的就是让socket处于异步读的逻辑中,这个ping除了更新保活状态,没有其他作用了,doRead函数较大,单独提出来说明。
void Session::doRead()
{
// 如果以及处于被丢弃状态的话忽略接收到的packet
if (m_dropped)
return;
auto self(shared_from_this());
m_data.resize(h256::size); // 将读缓冲区resize到h256的大小空间,数据头的大小
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size),
[this, self](boost::system::error_code ec, std::size_t length) {
LOG_SCOPED_CONTEXT(m_logContext);
if (!checkRead(h256::size, ec, length)) //读取过程中报错,或者长度不一致
return;
else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length))) //解密数据头
{
cnetlog << "header decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
uint16_t hProtocolId;
uint32_t hLength;
uint8_t hPadding;
try
{
RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
hProtocolId = header.protocolId;
hLength = header.length;
hPadding = header.padding;
}
catch (std::exception const& _e)
{
cnetlog << "Exception decoding frame header RLP: " << _e.what() << " "
<< bytesConstRef(m_data.data(), h128::size).cropped(3);
drop(BadProtocol);
return;
}
/// read padded frame and mac
// 读取后面的帧数据以及mac
auto tlen = hLength + hPadding + h128::size;
m_data.resize(tlen);
ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen),
[this, self, hLength, hProtocolId, tlen](
boost::system::error_code ec, std::size_t length) {
LOG_SCOPED_CONTEXT(m_logContext);
if (!checkRead(tlen, ec, length)) //同样检查下数据对不对,全不全
return;
else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen))) //解密
{
cnetlog << "frame decrypt failed";
drop(BadProtocol); // todo: better error
return;
}
bytesConstRef frame(m_data.data(), hLength);
if (!checkPacket(frame)) //验证下消息格式和大小
{
cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
cnetlog << "INVALID MESSAGE RECEIVED";
disconnect(BadProtocol);
return;
}
else
{
//第一个保存的是packetType
auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
RLP r(frame.cropped(1)); //后面是正文
bool ok = readPacket(hProtocolId, packetType, r); //处理逻辑
if (!ok)
cnetlog << "Couldn't interpret packet. " << RLP(r);
}
doRead(); //成功读取之后继续下一次doRead
});
});
}
从doRead函数中可以看出来,这个函数大概就是先读取一个数据头,根据数据头读取数据体,然后把完整的消息体都扔给了readPacket来处理,当一次读过程没有问题的时候,就继续下一次读过程,session本身不处理消息,那么可以从readPacket看看消息是怎么交给其他部分来处理的
//读取数据包
bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
{
m_lastReceived = chrono::steady_clock::now();
clog(VerbosityTrace, "net") << "-> " << _t << " " << _r;
try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
{
// v4 frame headers are useless, offset packet type used
// v5 protocol type is in header, packet type not offset
if (_capId == 0 && _t < UserPacket) //如果 _t < UserPacket,说明是底层通信数据包,比如ping pong之类,具体类型定义在Common.h文件
return interpret(_t, _r);
//这里面是根据支持的功能进行相应的操作
for (auto const& i: m_capabilities):
if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
return false; //如果未找到相应的capability,返回错误
}
catch (std::exception const& _e)
{
cnetlog << "Exception caught in p2p::Session::interpret(): " << _e.what()
<< ". PacketType: " << _t << ". RLP: " << _r;
disconnect(BadProtocol);
return true;
}
return true;
}
enum PacketType
{
HelloPacket = 0,
DisconnectPacket,
PingPacket,
PongPacket,
GetPeersPacket,
PeersPacket,
UserPacket = 0x10
};
在readPacket可以看到,数据类型有系统定义的类型和用户自定义的类型之分,当数据包类型属于enum PacketType之中的时候,会直接由session的interpret函数来处理,而当超出这个时候,就会由注册的capability来处理,大家读起来其实挺费劲的,这个capability没有明确的说明什么消息类型,而是说给定一个偏移量,根据当前值与偏移量的之差来确定消息类型,然后由各capability自己的interpret函数来解析和处理,这一点我个人是不喜欢的,个人感觉一个系统没有必要设计的如此晦涩,再来看看interpret里面都如何处理系统定义的消息的吧
bool Session::interpret(PacketType _t, RLP const& _r)
{
switch (_t)
{
case DisconnectPacket: // 断开连接数据包
{
string reason = "Unspecified";
auto r = (DisconnectReason)_r[0].toInt<int>();
if (!_r[0].isInt()) //不是整型
drop(BadProtocol);
else
{
reason = reasonOf(r);
cnetlog << "Disconnect (reason: " << reason << ")";
drop(DisconnectRequested);
}
break;
}
case PingPacket: // ping数据包
{
cnetdetails << "Ping " << m_info.id;
RLPStream s;
sealAndSend(prep(s, PongPacket)); // 回复pong数据包
break;
}
case PongPacket:
DEV_GUARDED(x_info)
{
m_info.lastPing = std::chrono::steady_clock::now() - m_ping; //更新ping时间
cnetdetails << "Latency: "
<< chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count()
<< " ms";
}
break;
case GetPeersPacket: // 这些实际上是在节点发现时候用到的,session中用不到
case PeersPacket:
break;
default:
return false;
}
return true;
}
从这里可以看到,发送给对方的时候都有一个sealAndSend的过程,这个函数里面前面也提到实际上就是调用了send函数将数据发送出去,最后来看下send的过程吧
void Session::send(bytes&& _msg)
{
bytesConstRef msg(&_msg);
clog(VerbosityTrace, "net") << "<- " << RLP(msg.cropped(1));
if (!checkPacket(msg)) // 检查是否合法
cnetlog << "INVALID PACKET CONSTRUCTED!";
if (!m_socket->ref().is_open()) // 检查连接是否保持
return;
bool doWrite = false;
DEV_GUARDED(x_framing)
{
m_writeQueue.push_back(std::move(_msg)); // 塞到写队列中去
doWrite = (m_writeQueue.size() == 1); // 只有当队列长度为一的时候才写,这个跟之前介绍节点发现时候理由一致
}
if (doWrite)
write(); // 调用write
}
void Session::write()
{
bytes const* out = nullptr;
DEV_GUARDED(x_framing)
{
m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]); // io用来编码的
out = &m_writeQueue[0];
}
auto self(shared_from_this());
ba::async_write(m_socket->ref(), ba::buffer(*out),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
LOG_SCOPED_CONTEXT(m_logContext);
// must check queue, as write callback can occur following dropped()
if (ec)
{
cnetlog << "Error sending: " << ec.message();
drop(TCPError);
return;
}
DEV_GUARDED(x_framing)
{
m_writeQueue.pop_front(); //丢弃已写的数据
if (m_writeQueue.empty())
return;
}
write(); //继续写
});
}
可以看出,在网络这块,作者的思路还是很有延续性的,对write的处理和节点发现过程基本上是一直的,只不过在send的时候加了些合法性判断以及io编码等操作而已
到这里,就把节点相关的连接逻辑全部介绍完毕了,最后一节将介绍host的功能以及每个节点提供的能力是如何体现的,未完待续。。。
上一篇: 以太坊源码分析之 P2P网络(六、p2p连接控制与消息处理(下))
下一篇: 切换摄像头