欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

P2P之Noise代码分析

程序员文章站 2022-03-04 23:29:28
...

Noise是一个用Go写的去中心化的P2P网络(A decentralized P2P networking stack written in Go.)

是perlin公司开发的一个基础P2P网络,Perlin平台由三个主要部分组成: 一个算力资源加密证明机制,一个基于DAG的分布式账本,以及一个高度并行的盲计算框架。他们的白皮书刚刚发布,相关代码只有Noise项目,其他还未完成。其代币目前基于ERC20,也基本没上交易所。

实现的主要功能:

  • 通过对等体之间的实时双向流传输。基于KCP / TCP和Protobufs
  • NAT遍历/自动端口转发(NAT-PMP,UPnP)。
  • 用于对等身份和签名的NaCL / Ed25519方案。
  • Kademlia受DHT启发的节点发现。
  • 请求/响应和消息传递RPC。
  • 使用glog记录日志。
  • 插件系统

项目提供的测试效果图:

P2P之Noise代码分析

下面看主要代码。


func main() {
	flag.Set("logtostderr", "true")

	portFlag := flag.Int("port", 3000, "port to listen to")
	hostFlag := flag.String("host", "localhost", "host to listen to")
	protocolFlag := flag.String("protocol", "tcp", "protocol to use (kcp/tcp)")
	peersFlag := flag.String("peers", "", "peers to connect to")
	flag.Parse() //解析启动参数

	port := uint16(*portFlag)
	host := *hostFlag
	protocol := *protocolFlag
	peers := strings.Split(*peersFlag, ",")

	keys := ed25519.RandomKeyPair() //生产ed25519的公私钥

	glog.Infof("Private Key: %s", keys.PrivateKeyHex())
	glog.Infof("Public Key: %s", keys.PublicKeyHex())

	builder := network.NewBuilder() //构造builder对象
	builder.SetKeys(keys) //将公私钥作为本节点的ID
	builder.SetAddress(network.FormatAddress(protocol, host, port)) //设置本地地址端口等

	// Register peer discovery plugin.
	builder.AddPlugin(new(discovery.Plugin)) //添加节点发现的插件,实现了DHT的功能

	// Add custom chat plugin.
	builder.AddPlugin(new(ChatPlugin)) //本测试节点间发送消息的插件
	nat.RegisterPlugin(builder) //可以启用NAT插件,如果没有开启Upnp则没有效果
	net, err := builder.Build() //Build返回Network对象
	if err != nil {
		glog.Fatal(err)
		return
	}

	go net.Listen() //启用监听

	if len(peers) > 0 {
		net.Bootstrap(peers...) //与bootstrap节点建立连接
	}

	reader := bufio.NewReader(os.Stdin) //读取控制台
	for {
		input, _ := reader.ReadString('\n') //如有输入

		// skip blank lines
		if len(strings.TrimSpace(input)) == 0 {
			continue
		}

		glog.Infof("<%s> %s", net.Address, input)

		net.Broadcast(&messages.ChatMessage{Message: input}) //将输入广播到其他节点
	}
}

1. discovery.Plugin分析


type Plugin struct {
	*network.Plugin

	DisablePing   bool
	DisablePong   bool
	DisableLookup bool

	Routes *dht.RoutingTable
}

此插件继承自network.Plugin对象,实现了PluginInterface接口,如下:


// Plugin is an abstract class which all plugins extend.
type Plugin struct{}

// Hook callbacks of network builder plugins

// Startup is called only once when the plugin is loaded
func (*Plugin) Startup(net *Network) {}

// Receive is called every time when messages are received
func (*Plugin) Receive(ctx *PluginContext) error { return nil }

// Cleanup is called only once after network stops listening
func (*Plugin) Cleanup(net *Network) {}

// PeerConnect is called every time a PeerClient is initialized and connected
func (*Plugin) PeerConnect(client *PeerClient) {}

// PeerDisconnect is called every time a PeerClient connection is closed
func (*Plugin) PeerDisconnect(client *PeerClient) {}

分别在启动、接受、清理、节点连接、断开连接时调用插件。

启动时:state.Routes = dht.CreateRoutingTable(net.ID),创建含有len(id.PublicKey)*8个Bucket的RoutingTable,如果id.PublicKey=20则RoutingTable有160个Bucket,每个Bucket含有一个List,同时table.Update(id)更新id对应的bucket里的id到顶部。

func NewBucket() *Bucket {
	return &Bucket{
		List:  list.New(),
		mutex: &sync.RWMutex{},
	}
}

每个bucket保存const BucketSize = 16个节点信息。

接受其他节点的消息后处理逻辑:


func (state *Plugin) Receive(ctx *network.PluginContext) error {
	state.Routes.Update(ctx.Sender()) //更新发送节点在bucket中的位置

	switch msg := ctx.Message().(type) {// Handle RPC. 判断消息类型
	case *protobuf.Ping:
		if state.DisablePing {
			break
		}

		// Send pong to peer.
		err := ctx.Reply(&protobuf.Pong{}) //回复Pong消息

		if err != nil {
			return err
		}
	case *protobuf.Pong:
		if state.DisablePong {
			break
		}

		peers := FindNode(ctx.Network(), ctx.Sender(), dht.BucketSize, 8) //使用KAD算法找到离ctx.Sender最近的节点

		// Update routing table w/ closest peers to self.
		for _, peerID := range peers {
			state.Routes.Update(peerID) //将这些最近的节点更新到bucket
		}

		glog.Infof("bootstrapped w/ peer(s): %s.", strings.Join(state.Routes.GetPeerAddresses(), ", "))
	case *protobuf.LookupNodeRequest:
		if state.DisableLookup {
			break
		}

		// Prepare response.
		response := &protobuf.LookupNodeResponse{} //实例化LookupResponse相应对象

		// Respond back with closest peers to a provided target.
		for _, peerID := range state.Routes.FindClosestPeers(peer.ID(*msg.Target), dht.BucketSize) { //在本节点找到离target最近的节点
			id := protobuf.ID(peerID)
			response.Peers = append(response.Peers, &id)
		}

		err := ctx.Reply(response) //输出response
		if err != nil {
			return err
		}

		glog.Infof("connected peers: %s.", strings.Join(state.Routes.GetPeerAddresses(), ", "))
	}

	return nil
}

节点间发送的数据对象:

type PluginContext struct {
	client  *PeerClient
	message proto.Message
	nonce   uint64
}

2. ChatPlugin主要就是收到消息后输出日志

type ChatPlugin struct{ *network.Plugin }

func (state *ChatPlugin) Receive(ctx *network.PluginContext) error {
	switch msg := ctx.Message().(type) {
	case *messages.ChatMessage:
		glog.Infof("<%s> %s", ctx.Client().ID.Address, msg.Message)
	}

	return nil
}

3. net, err := builder.Build()主要是构造Network对象,并初始化,初始化代码如下:

func (n *Network) Init() {
	workerCount := runtime.NumCPU() + 1

	// Spawn worker routines for receiving and handling messages in the application layer.
	go n.handleRecvQueue() //从n.RecvQueue读取消息

	for i := 0; i < workerCount; i++ {
		// Spawn worker routines for sending queued messages to the networking layer.
		go n.handleSendQueue() //从n.SendQueue发送消息
	}
}

4.go net.Listen() 启用监听网络,支持KCP、TCP协议

5.net.Bootstrap(peers...) 与peers节点建立连接

func (n *Network) Bootstrap(addresses ...string) {
	n.BlockUntilListening()

	addresses = FilterPeers(n.Address, addresses)

	for _, address := range addresses {
		client, err := n.Client(address)

		if err != nil {
			glog.Error(err)
			continue
		}

		err = client.Tell(&protobuf.Ping{}) //发送Ping消息
		if err != nil {
			continue
		}
	}
}

6. net.Broadcast(&messages.ChatMessage{Message: input}) 循环所有节点广播控制台输入的消息

func (n *Network) Broadcast(message proto.Message) {
	n.Peers.Range(func(key, value interface{}) bool {
		client := value.(*PeerClient)

		err := client.Tell(message)

		if err != nil {
			glog.Warningf("failed to send message to peer %v [err=%s]", client.ID, err)
		}

		return true
	})
}


本文作者:architect.bian,欢迎收藏,转载请保留原文地址并保留版权声明!谢谢~
还没完!往下看!!!

相关标签: p2p noise