P2P之Noise代码分析
Noise是一个用Go写的去中心化的P2P网络(A decentralized P2P networking stack written in Go.)
是perlin公司开发的一个基础P2P网络,Perlin平台由三个主要部分组成: 一个算力资源加密证明机制,一个基于DAG的分布式账本,以及一个高度并行的盲计算框架。他们的白皮书刚刚发布,相关代码只有Noise项目,其他还未完成。其代币目前基于ERC20,也基本没上交易所。
实现的主要功能:
- 通过对等体之间的实时双向流传输。基于KCP / TCP和Protobufs。
- NAT遍历/自动端口转发(NAT-PMP,UPnP)。
- 用于对等身份和签名的NaCL / Ed25519方案。
- Kademlia受DHT启发的节点发现。
- 请求/响应和消息传递RPC。
- 使用glog记录日志。
- 插件系统
项目提供的测试效果图:
下面看主要代码。
func main() {
flag.Set("logtostderr", "true")
portFlag := flag.Int("port", 3000, "port to listen to")
hostFlag := flag.String("host", "localhost", "host to listen to")
protocolFlag := flag.String("protocol", "tcp", "protocol to use (kcp/tcp)")
peersFlag := flag.String("peers", "", "peers to connect to")
flag.Parse() //解析启动参数
port := uint16(*portFlag)
host := *hostFlag
protocol := *protocolFlag
peers := strings.Split(*peersFlag, ",")
keys := ed25519.RandomKeyPair() //生产ed25519的公私钥
glog.Infof("Private Key: %s", keys.PrivateKeyHex())
glog.Infof("Public Key: %s", keys.PublicKeyHex())
builder := network.NewBuilder() //构造builder对象
builder.SetKeys(keys) //将公私钥作为本节点的ID
builder.SetAddress(network.FormatAddress(protocol, host, port)) //设置本地地址端口等
// Register peer discovery plugin.
builder.AddPlugin(new(discovery.Plugin)) //添加节点发现的插件,实现了DHT的功能
// Add custom chat plugin.
builder.AddPlugin(new(ChatPlugin)) //本测试节点间发送消息的插件
nat.RegisterPlugin(builder) //可以启用NAT插件,如果没有开启Upnp则没有效果
net, err := builder.Build() //Build返回Network对象
if err != nil {
glog.Fatal(err)
return
}
go net.Listen() //启用监听
if len(peers) > 0 {
net.Bootstrap(peers...) //与bootstrap节点建立连接
}
reader := bufio.NewReader(os.Stdin) //读取控制台
for {
input, _ := reader.ReadString('\n') //如有输入
// skip blank lines
if len(strings.TrimSpace(input)) == 0 {
continue
}
glog.Infof("<%s> %s", net.Address, input)
net.Broadcast(&messages.ChatMessage{Message: input}) //将输入广播到其他节点
}
}
1. discovery.Plugin分析
type Plugin struct {
*network.Plugin
DisablePing bool
DisablePong bool
DisableLookup bool
Routes *dht.RoutingTable
}
此插件继承自network.Plugin对象,实现了PluginInterface接口,如下:
// Plugin is an abstract class which all plugins extend.
type Plugin struct{}
// Hook callbacks of network builder plugins
// Startup is called only once when the plugin is loaded
func (*Plugin) Startup(net *Network) {}
// Receive is called every time when messages are received
func (*Plugin) Receive(ctx *PluginContext) error { return nil }
// Cleanup is called only once after network stops listening
func (*Plugin) Cleanup(net *Network) {}
// PeerConnect is called every time a PeerClient is initialized and connected
func (*Plugin) PeerConnect(client *PeerClient) {}
// PeerDisconnect is called every time a PeerClient connection is closed
func (*Plugin) PeerDisconnect(client *PeerClient) {}
分别在启动、接受、清理、节点连接、断开连接时调用插件。
启动时:state.Routes = dht.CreateRoutingTable(net.ID),创建含有len(id.PublicKey)*8个Bucket的RoutingTable,如果id.PublicKey=20则RoutingTable有160个Bucket,每个Bucket含有一个List,同时table.Update(id)更新id对应的bucket里的id到顶部。
func NewBucket() *Bucket {
return &Bucket{
List: list.New(),
mutex: &sync.RWMutex{},
}
}
每个bucket保存const BucketSize = 16个节点信息。
接受其他节点的消息后处理逻辑:
func (state *Plugin) Receive(ctx *network.PluginContext) error {
state.Routes.Update(ctx.Sender()) //更新发送节点在bucket中的位置
switch msg := ctx.Message().(type) {// Handle RPC. 判断消息类型
case *protobuf.Ping:
if state.DisablePing {
break
}
// Send pong to peer.
err := ctx.Reply(&protobuf.Pong{}) //回复Pong消息
if err != nil {
return err
}
case *protobuf.Pong:
if state.DisablePong {
break
}
peers := FindNode(ctx.Network(), ctx.Sender(), dht.BucketSize, 8) //使用KAD算法找到离ctx.Sender最近的节点
// Update routing table w/ closest peers to self.
for _, peerID := range peers {
state.Routes.Update(peerID) //将这些最近的节点更新到bucket
}
glog.Infof("bootstrapped w/ peer(s): %s.", strings.Join(state.Routes.GetPeerAddresses(), ", "))
case *protobuf.LookupNodeRequest:
if state.DisableLookup {
break
}
// Prepare response.
response := &protobuf.LookupNodeResponse{} //实例化LookupResponse相应对象
// Respond back with closest peers to a provided target.
for _, peerID := range state.Routes.FindClosestPeers(peer.ID(*msg.Target), dht.BucketSize) { //在本节点找到离target最近的节点
id := protobuf.ID(peerID)
response.Peers = append(response.Peers, &id)
}
err := ctx.Reply(response) //输出response
if err != nil {
return err
}
glog.Infof("connected peers: %s.", strings.Join(state.Routes.GetPeerAddresses(), ", "))
}
return nil
}
节点间发送的数据对象:
type PluginContext struct {
client *PeerClient
message proto.Message
nonce uint64
}
2. ChatPlugin主要就是收到消息后输出日志
type ChatPlugin struct{ *network.Plugin }
func (state *ChatPlugin) Receive(ctx *network.PluginContext) error {
switch msg := ctx.Message().(type) {
case *messages.ChatMessage:
glog.Infof("<%s> %s", ctx.Client().ID.Address, msg.Message)
}
return nil
}
3. net, err := builder.Build()主要是构造Network对象,并初始化,初始化代码如下:
func (n *Network) Init() {
workerCount := runtime.NumCPU() + 1
// Spawn worker routines for receiving and handling messages in the application layer.
go n.handleRecvQueue() //从n.RecvQueue读取消息
for i := 0; i < workerCount; i++ {
// Spawn worker routines for sending queued messages to the networking layer.
go n.handleSendQueue() //从n.SendQueue发送消息
}
}
4.go net.Listen() 启用监听网络,支持KCP、TCP协议
5.net.Bootstrap(peers...) 与peers节点建立连接
func (n *Network) Bootstrap(addresses ...string) {
n.BlockUntilListening()
addresses = FilterPeers(n.Address, addresses)
for _, address := range addresses {
client, err := n.Client(address)
if err != nil {
glog.Error(err)
continue
}
err = client.Tell(&protobuf.Ping{}) //发送Ping消息
if err != nil {
continue
}
}
}
6. net.Broadcast(&messages.ChatMessage{Message: input}) 循环所有节点广播控制台输入的消息
func (n *Network) Broadcast(message proto.Message) {
n.Peers.Range(func(key, value interface{}) bool {
client := value.(*PeerClient)
err := client.Tell(message)
if err != nil {
glog.Warningf("failed to send message to peer %v [err=%s]", client.ID, err)
}
return true
})
}
本文作者:architect.bian,欢迎收藏,转载请保留原文地址并保留版权声明!谢谢~
还没完!往下看!!!
下一篇: Lucene全文检索应用