An Analysis of the btcd p2p Network

Bitcoin relies on a peer-to-peer network to share and propagate information, and every node in the network can act as both a client and a server. This article explores how that network is implemented, based on btcd, the Go implementation of Bitcoin. From the bottom up, the implementation divides into three layers: addresses, connections, and peers, each with its own role and responsibilities. The three parts are analyzed one by one below.

Address Management

The address manager's structure is shown below. Its two key members are addrNew and addrTried: the former maintains 1024 buckets, each holding up to 64 addresses, into which an address is placed by a hash; these hold addresses that have been added but not yet verified. The latter maintains 64 lists into which verified, known-good addresses are hashed. A simplified sketch of the bucket-hashing idea follows the struct.

    type AddrManager struct {
        mtx            sync.Mutex
        peersFile      string
        lookupFunc     func(string) ([]net.IP, error)
        rand           *rand.Rand
        key            [32]byte
        addrIndex      map[string]*KnownAddress // address key to ka for all addrs.
        addrNew        [newBucketCount]map[string]*KnownAddress // unverified addresses, 1024 buckets
        addrTried      [triedBucketCount]*list.List             // verified addresses, 64 lists
        started        int32
        shutdown       int32
        wg             sync.WaitGroup
        quit           chan struct{}
        nTried         int // count of addresses in the tried lists
        nNew           int // count of addresses in the new buckets
        lamtx          sync.Mutex
        localAddresses map[string]*localAddress
    }
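
The exact placement logic lives in btcd's getNewBucket, which also mixes in the source address group and uses double-SHA256; the following simplified, hypothetical sketch shows only the hash-then-mod idea:

    package main

    import (
        "crypto/sha256"
        "encoding/binary"
        "fmt"
    )

    const newBucketCount = 1024

    // newBucketIndex is a simplified stand-in for btcd's getNewBucket: hash
    // the manager's random key together with the address key, then reduce
    // the first eight bytes of the digest modulo the bucket count.
    func newBucketIndex(key [32]byte, addrKey string) int {
        h := sha256.Sum256(append(key[:], addrKey...))
        return int(binary.LittleEndian.Uint64(h[:8]) % newBucketCount)
    }

    func main() {
        var key [32]byte // the real manager fills this with random bytes
        fmt.Println(newBucketIndex(key, "203.0.113.7:8333"))
    }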

When a new address is added via AddAddress, it first goes into addrNew. GetAddress will, with a 50% chance, pick a random address from addrNew to try a network connection against; once verification succeeds, Good is called, moving the address from the new buckets into the tried ones.
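
As a rough sketch of that lifecycle through addrmgr's public API (the data directory and IP addresses are placeholders, and error handling is omitted):

    package main

    import (
        "net"

        "github.com/btcsuite/btcd/addrmgr"
        "github.com/btcsuite/btcd/wire"
    )

    func main() {
        // New takes a data directory for peers.json and a DNS lookup function.
        amgr := addrmgr.New("/tmp/addrdata", net.LookupIP)
        amgr.Start()
        defer amgr.Stop()

        // A freshly learned address lands in the addrNew buckets.
        na := wire.NewNetAddressIPPort(net.ParseIP("203.0.113.7"), 8333, wire.SFNodeNetwork)
        src := wire.NewNetAddressIPPort(net.ParseIP("198.51.100.1"), 8333, wire.SFNodeNetwork)
        amgr.AddAddress(na, src)

        // Pick a candidate, record the attempt, and promote it on success.
        if ka := amgr.GetAddress(); ka != nil {
            amgr.Attempt(ka.NetAddress())
            // ... dial and verify the connection, then:
            amgr.Good(ka.NetAddress())
        }
    }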

GetAddress chooses between addrNew and addrTried with equal probability; picking from addrTried keeps older, less reliable entries refreshed. The for loop guarantees that an address is eventually returned. chance() weights selection within a bucket: the more recently and frequently an address has been tried, the smaller the value it returns and the less likely that address is to be chosen again, which avoids hammering the same node repeatedly (a reconstructed sketch of chance() follows the listing). To keep the loop from cycling forever without a pick, the factor variable grows each round and relaxes the constraint; in effect, if no fresh address can be found, an already-used one will do.

    func (a *AddrManager) GetAddress() *KnownAddress {
        // (elided)
        // Use a 50% chance for choosing between tried and new table entries.
        if a.nTried > 0 && (a.nNew == 0 || a.rand.Intn(2) == 0) {
            // Tried entry.
            large := 1 << 30
            factor := 1.0
            for {
                // Pick a random bucket.
                bucket := a.rand.Intn(len(a.addrTried))
                if a.addrTried[bucket].Len() == 0 {
                    continue
                }

                // Then, a random entry from the bucket's list.
                e := a.addrTried[bucket].Front()
                for i := a.rand.Int63n(int64(a.addrTried[bucket].Len())); i > 0; i-- {
                    e = e.Next()
                }
                ka := e.Value.(*KnownAddress)

                randval := a.rand.Intn(large)
                // Weight by chance() so recently tried nodes are less
                // likely to be picked again right away.
                if float64(randval) < (factor * ka.chance() * float64(large)) {
                    log.Tracef("Selected %v from tried bucket",
                        NetAddressKey(ka.na))
                    return ka
                }
                factor *= 1.2
            }
        } else {
            // New node.
            // XXX use a closure/function to avoid repeating this.
            large := 1 << 30
            factor := 1.0
            for {
                // Pick a random bucket.
                bucket := a.rand.Intn(len(a.addrNew))
                if len(a.addrNew[bucket]) == 0 {
                    continue
                }
                // Then, a random entry in it.
                var ka *KnownAddress
                nth := a.rand.Intn(len(a.addrNew[bucket]))
                for _, value := range a.addrNew[bucket] {
                    if nth == 0 {
                        ka = value
                    }
                    nth--
                }
                randval := a.rand.Intn(large)
                if float64(randval) < (factor * ka.chance() * float64(large)) {
                    log.Tracef("Selected %v from new bucket",
                        NetAddressKey(ka.na))
                    return ka
                }
                factor *= 1.2
            }
        }
    }
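
For reference, chance() looks roughly like this (reconstructed from btcd's addrmgr; treat the exact constants as approximate). A very recent attempt slashes the probability, and every failed attempt lowers it further:

    func (ka *KnownAddress) chance() float64 {
        lastAttempt := time.Now().Sub(ka.lastattempt)
        if lastAttempt < 0 {
            lastAttempt = 0
        }
        c := 1.0
        // Very recent attempts are unlikely to be retried.
        if lastAttempt < 10*time.Minute {
            c *= 0.01
        }
        // Each failed attempt lowers the priority further.
        for i := ka.attempts; i > 0; i-- {
            c /= 1.5
        }
        return c
    }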

Connection Management

The connection layer mainly manages address reachability, driving each connection's lifecycle through a state machine. Every connection is in one of the following five states:

    ConnPending ConnState = iota
    ConnFailing
    ConnCanceled
    ConnEstablished
    ConnDisconnected

The connHandler function watches several channels and handles connection state changes accordingly.

1) ConnPending: requests created by NewConnReq enter this branch; the new connection is marked ConnPending and added to the pending map.

    case registerPending:
        connReq := msg.c
        connReq.updateState(ConnPending)
        pending[msg.c.id] = connReq
        close(msg.done)

2) ConnFailing: when Connect determines that the connection has failed, the handleFailed branch marks the connection as failed.

    case handleFailed:
        connReq := msg.c
        // A request that is no longer in the pending map was already
        // canceled, so the failure is ignored.
        if _, ok := pending[connReq.id]; !ok {
            log.Debugf("Ignoring connection for "+
                "canceled conn req: %v", connReq)
            continue
        }

        connReq.updateState(ConnFailing)
        log.Debugf("Failed to connect to %v: %v",
            connReq, msg.err)
        cm.handleFailedConn(connReq)

3) ConnCanceled: if the user disconnects a connection before it has been fully established, it is treated as ConnCanceled.

    case handleDisconnected:
        connReq, ok := conns[msg.id]
        if !ok {
            connReq, ok = pending[msg.id]
            if !ok {
                log.Errorf("Unknown connid=%d",
                    msg.id)
                continue
            }

            // Pending connection was found, remove
            // it from pending map if we should
            // ignore a later, successful
            // connection.
            connReq.updateState(ConnCanceled)
            log.Debugf("Canceling: %v", connReq)
            delete(pending, msg.id)
            continue
        }

4) ConnEstablished: once Connect confirms the connection succeeded, it becomes ConnEstablished.

    case handleConnected:
        connReq := msg.c

        if _, ok := pending[connReq.id]; !ok {
            if msg.conn != nil {
                msg.conn.Close()
            }
            log.Debugf("Ignoring connection for "+
                "canceled connreq=%v", connReq)
            continue
        }

        connReq.updateState(ConnEstablished)
        connReq.conn = msg.conn
        conns[connReq.id] = connReq
        log.Debugf("Connected to %v", connReq)
        connReq.retryCount = 0
        cm.failedAttempts = 0

        delete(pending, connReq.id)

        if cm.cfg.OnConnection != nil {
            go cm.cfg.OnConnection(connReq, msg.conn)
        }

5) ConnDisconnected: when the user disconnects an already established connection, its state becomes ConnDisconnected.

    case handleDisconnected:
        if connReq.conn != nil {
            connReq.conn.Close()
        }

        if cm.cfg.OnDisconnection != nil {
            go cm.cfg.OnDisconnection(connReq)
        }

        // All internal state has been cleaned up, if
        // this connection is being removed, we will
        // make no further attempts with this request.
        if !msg.retry {
            connReq.updateState(ConnDisconnected)
            continue
        }
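
To see these transitions from the caller's side, here is a minimal, hypothetical use of connmgr's public API (import path and shutdown handling trimmed); Connect drives a request through ConnPending and then into ConnEstablished or ConnFailing:

    package main

    import (
        "net"

        "github.com/btcsuite/btcd/connmgr"
    )

    func main() {
        cmgr, err := connmgr.New(&connmgr.Config{
            Dial: func(addr net.Addr) (net.Conn, error) {
                return net.Dial(addr.Network(), addr.String())
            },
            OnConnection: func(req *connmgr.ConnReq, conn net.Conn) {
                // Reached only once the request hits ConnEstablished.
            },
        })
        if err != nil {
            panic(err)
        }
        cmgr.Start()

        // Register and dial one specific peer; the request passes through
        // ConnPending on its way to ConnEstablished or ConnFailing.
        cmgr.Connect(&connmgr.ConnReq{
            Addr:      &net.TCPAddr{IP: net.ParseIP("203.0.113.7"), Port: 8333},
            Permanent: true,
        })

        select {} // block forever; a real caller would wire up cmgr.Stop()
    }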

Connection management also carries a dynamic ban score used to rate connection quality. The function that accumulates the score is as follows:

    func (s *DynamicBanScore) increase(persistent, transient uint32, t time.Time) uint32 {
        s.persistent += persistent
        tu := t.Unix()
        dt := tu - s.lastUnix

        if transient > 0 {
            if Lifetime < dt {
                // The previous transient score has fully expired.
                s.transient = 0
            } else if s.transient > 1 && dt > 0 {
                // Decay the transient score by the elapsed time.
                s.transient *= decayFactor(dt)
            }
            s.transient += float64(transient)
            s.lastUnix = tu
        }
        return s.persistent + uint32(s.transient)
    }

The arguments depend on the specific operation; roughly speaking, the score accumulates with each operation and grows faster the more operations occur. Once it crosses a threshold, the peer is made to rest for a while, mainly as a defense against malicious traffic. The call sites of increase are the places where heavy traffic can occur (GetData, MemPool, ...).
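
The decay of the transient score is exponential. A minimal sketch, assuming the score halves every Halflife seconds (btcd uses Halflife = 60 and precomputes a lookup table rather than calling Exp each time):

    package main

    import (
        "fmt"
        "math"
    )

    // Halflife is the time in seconds for the transient score to halve.
    const Halflife = 60

    var lambda = math.Ln2 / float64(Halflife)

    // decayFactor returns the multiplier for a transient score that was
    // last updated t seconds ago.
    func decayFactor(t int64) float64 {
        return math.Exp(-lambda * float64(t))
    }

    func main() {
        fmt.Println(decayFactor(60)) // ≈ 0.5 after one half-life
    }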

Protocol

The protocol layer defines the read/write format of network messages and how they are answered. It defines the following message types:

    CmdVersion      = "version"   版本
    CmdVerAck       = "verack"    联通
    CmdGetAddr      = "getaddr"   获取地址
    CmdAddr         = "addr"      发送地址
    CmdGetBlocks    = "getblocks" 获取区块
    CmdInv          = "inv"       发送inv(交易/区块)
    CmdGetData      = "getdata"   发送区块数据
    CmdNotFound     = "notfound"
    CmdBlock        = "block"     发送区块
    CmdTx           = "tx"        发送交易
    CmdGetHeaders   = "getheaders"获取区块头
    CmdHeaders      = "headers"   发送区块头
    CmdPing         = "ping"
    CmdPong         = "pong"      和ping成对使用 维护连接
    CmdAlert        = "alert"     无用
    CmdMemPool      = "mempool"   交易池(代码上看没有用处)
    CmdFilterAdd    = "filteradd"
    CmdFilterClear  = "filterclear"
    CmdFilterLoad   = "filterload"
    CmdMerkleBlock  = "merkleblock"
    CmdReject       = "reject"
    CmdSendHeaders  = "sendheaders"
    CmdFeeFilter    = "feefilter"
    CmdGetCFilters  = "getcfilters"
    CmdGetCFHeaders = "getcfheaders"
    CmdGetCFCheckpt = "getcfcheckpt"
    CmdCFilter      = "cfilter"
    CmdCFHeaders    = "cfheaders"
    CmdCFCheckpt    = "cfcheckpt"

inHandler

inHandler is responsible for receiving messages: it reads a message, resolves its concrete type, and then calls a more specific function to handle it; that function is usually passed down from the layer above via configuration.

    out:
    for atomic.LoadInt32(&p.disconnect) == 0 {
            rmsg, buf, err := p.readMessage(p.wireEncoding)
            idleTimer.Stop()
            // (elided)
            atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
            p.stallControl <- stallControlMsg{sccReceiveMessage, rmsg}

            // Handle each supported message type.
            p.stallControl <- stallControlMsg{sccHandlerStart, rmsg}
            switch msg := rmsg.(type) {
            case *wire.MsgVersion:
                // Limit to one version message per peer.
                p.PushRejectMsg(msg.Command(), wire.RejectDuplicate,
                    "duplicate version message", nil, true)
                break out

            case *wire.MsgVerAck:
                // No read lock is necessary because verAckReceived is not written
                // to in any other goroutine.
                if p.verAckReceived {
                    log.Infof("Already received 'verack' from peer %v -- "+
                        "disconnecting", p)
                    break out
                }
                p.flagsMtx.Lock()
                p.verAckReceived = true
                p.flagsMtx.Unlock()
                if p.cfg.Listeners.OnVerAck != nil {
                    p.cfg.Listeners.OnVerAck(p, msg)
                }
            // (elided)
            case *wire.MsgSendHeaders:
                p.flagsMtx.Lock()
                p.sendHeadersPreferred = true
                p.flagsMtx.Unlock()

                if p.cfg.Listeners.OnSendHeaders != nil {
                    p.cfg.Listeners.OnSendHeaders(p, msg)
                }

            default:
                log.Debugf("Received unhandled message of type %v "+
                    "from %v", rmsg.Command(), p)
            }
            p.stallControl <- stallControlMsg{sccHandlerDone, rmsg}

            // A message was received so reset the idle timer.
            idleTimer.Reset(idleTimeout)
        }
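
Those upper-layer handlers are supplied through the Listeners field of peer.Config. A trimmed, illustrative configuration (the user-agent values are placeholders):

    package main

    import (
        "github.com/btcsuite/btcd/chaincfg"
        "github.com/btcsuite/btcd/peer"
        "github.com/btcsuite/btcd/wire"
    )

    func main() {
        peerCfg := &peer.Config{
            UserAgentName:    "sample",
            UserAgentVersion: "0.1.0",
            ChainParams:      &chaincfg.MainNetParams,
            Listeners: peer.MessageListeners{
                OnVerAck: func(p *peer.Peer, msg *wire.MsgVerAck) {
                    // handshake finished; safe to start requesting data
                },
                OnInv: func(p *peer.Peer, msg *wire.MsgInv) {
                    // hand the inventory off to the sync manager
                },
            },
        }
        _, _ = peer.NewOutboundPeer(peerCfg, "203.0.113.7:8333")
    }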

outHandler

outHandler listens on sendQueue in a for/select loop; when a new message arrives, it calls writeMessage to send it.
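
A minimal, self-contained sketch of that pattern, with hypothetical names rather than btcd's actual types:

    package main

    import (
        "fmt"
        "sync"
    )

    type outMsg struct{ command string }

    // outHandler drains the send queue until quit is closed; btcd's version
    // additionally tracks pings and signals a done queue per message.
    func outHandler(sendQueue <-chan outMsg, quit <-chan struct{}) {
        for {
            select {
            case msg := <-sendQueue:
                // Stands in for p.writeMessage(msg, encoding).
                fmt.Println("sending", msg.command)
            case <-quit:
                return
            }
        }
    }

    func main() {
        sendQueue := make(chan outMsg)
        quit := make(chan struct{})
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
            defer wg.Done()
            outHandler(sendQueue, quit)
        }()
        sendQueue <- outMsg{command: "ping"}
        close(quit)
        wg.Wait()
    }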

Serialization and Deserialization

Every message consists of a header and a payload, and every message type implements the Message interface:

    type Message interface {
        BtcDecode(io.Reader, uint32, MessageEncoding) error
        BtcEncode(io.Writer, uint32, MessageEncoding) error
        Command() string
        MaxPayloadLength(uint32) uint32
    }

During serialization, BtcEncode is called first to produce the binary payload, writing the message fields one by one (the per-message BtcEncode implementations live under the wire directory for those interested). The payload is then hashed twice and the first four bytes of the digest become the checksum; the network magic, command name, payload length, and checksum are assembled into the message header. Deserialization is the reverse process.

    err := msg.BtcEncode(&bw, pver, encoding)
    if err != nil {
        return totalBytes, err
    }
    payload := bw.Bytes()
    lenp := len(payload)

    // (elided)

    // Create header for the message.
    hdr := messageHeader{}
    hdr.magic = btcnet
    hdr.command = cmd
    hdr.length = uint32(lenp)
    copy(hdr.checksum[:], chainhash.DoubleHashB(payload)[0:4])
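
The checksum step can be reproduced with the standard library alone, since chainhash.DoubleHashB is just SHA-256 applied twice:

    package main

    import (
        "crypto/sha256"
        "fmt"
    )

    // doubleHashB mirrors chainhash.DoubleHashB: sha256(sha256(b)).
    func doubleHashB(b []byte) []byte {
        first := sha256.Sum256(b)
        second := sha256.Sum256(first[:])
        return second[:]
    }

    func main() {
        payload := []byte("example payload")
        var checksum [4]byte
        copy(checksum[:], doubleHashB(payload)[0:4])
        fmt.Printf("checksum: %x\n", checksum)
    }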

Transaction Relay Control

In practice, peers broadcasting transactions to one another consumes significant bandwidth. To improve network performance, a node throttles retransmission: each peer carries a knownInventory set recording the inventory IDs already sent to it, and anything already sent is skipped. This mechanism can occasionally cause puzzling problems. While deploying a network we once saw that after a mining machine was power-cycled, some transactions took a very long time to be mined; the cause was that its directly connected peers believed those transactions had already been sent and would not send them a second time.

    for e := invSendQueue.Front(); e != nil; e = invSendQueue.Front() {
        iv := invSendQueue.Remove(e).(*wire.InvVect)

        // Don't send inventory that became known after
        // the initial check.
        if p.knownInventory.Exists(iv) {
            continue
        }

        invMsg.AddInvVect(iv)
        if len(invMsg.InvList) >= maxInvTrickleSize {
            waiting = queuePacket(
                outMsg{msg: invMsg},
                pendingMsgs, waiting)
            invMsg = wire.NewMsgInvSizeHint(uint(invSendQueue.Len()))
        }

        // Add the inventory that is being relayed to
        // the known inventory for the peer.
        p.AddKnownInventory(iv)
    }

Peer Management

Peer management is responsible for maintaining the peer set, the message request/response patterns, and so on.

Peer Maintenance

peerHandler runs a for/select loop over several channels, each corresponding to a group of operations (add, remove, update, query, and the like); they are used both internally and to back the external RPC interface.

    out:
    for {
        select {
        // New peers connected to the server.
        case p := <-s.newPeers:
            s.handleAddPeerMsg(state, p)

        // Disconnected peers.
        case p := <-s.donePeers:
            s.handleDonePeerMsg(state, p)

        // Block accepted in mainchain or orphan, update peer height.
        case umsg := <-s.peerHeightsUpdate:
            s.handleUpdatePeerHeights(state, umsg)

        // Peer to ban.
        case p := <-s.banPeers:
            s.handleBanPeerMsg(state, p)

        // New inventory to potentially be relayed to other peers.
        case invMsg := <-s.relayInv:
            s.handleRelayInvMsg(state, invMsg)

        // Message to broadcast to all connected peers except those
        // which are excluded by the message.
        case bmsg := <-s.broadcast:
            s.handleBroadcastMsg(state, &bmsg)

        case qmsg := <-s.query:
            s.handleQuery(state, qmsg)

        case <-s.quit:
            // Disconnect all peers on server shutdown.
            state.forAllPeers(func(sp *serverPeer) {
                srvrLog.Tracef("Shutdown peer %s", sp)
                sp.Disconnect()
            })
            break out
        }
    }

One thing worth mentioning here is the transaction rebroadcast mechanism, which mainly addresses syncing the mempool when a node has just started: once connected to a peer, after a short wait, the surrounding nodes broadcast and the transactions arrive. Given the knownInventory issue described above, connecting to a fresh node is the safer choice in that situation.

Peer Creation

When the server starts, it calls connManager's Start method, which spawns a number of connection requests:

    for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
        go cm.NewConnReq()
    }

NewConnReq walks through the state transitions described above and finally reaches the ConnEstablished branch, which invokes the OnConnection callback; here that callback is the outboundPeerConnected function. The structure is a bit convoluted: connmgr's config exposes a function field OnConnection, and when the p2p server starts it assigns its own function to that field (OnConnection: s.outboundPeerConnected).
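
In the server's setup this wiring looks roughly like the following (trimmed from btcd's server.go; connectionRetryInterval, defaultTargetOutbound, btcdDial, and newAddressFunc are the surrounding server helpers):

    cmgr, err := connmgr.New(&connmgr.Config{
        RetryDuration:  connectionRetryInterval,
        TargetOutbound: uint32(defaultTargetOutbound),
        Dial:           btcdDial,
        OnConnection:   s.outboundPeerConnected,
        GetNewAddress:  newAddressFunc,
    })
    if err != nil {
        return nil, err
    }
    s.connManager = cmgr

The outboundPeerConnected callback itself: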

    func (s *server) outboundPeerConnected(c *connmgr.ConnReq, conn net.Conn) {
        sp := newServerPeer(s, c.Permanent)
        p, err := peer.NewOutboundPeer(newPeerConfig(sp), c.Addr.String())
        if err != nil {
            srvrLog.Debugf("Cannot create outbound peer %s: %v", c.Addr, err)
            s.connManager.Disconnect(c.ID())
            return // avoid using the nil peer below
        }
        sp.Peer = p
        sp.connReq = c
        sp.isWhitelisted = isWhitelisted(conn.RemoteAddr())
        sp.AssociateConnection(conn)
        go s.peerDoneHandler(sp)
        s.addrManager.Attempt(sp.NA())
    }

After the peer is created, AssociateConnection is called to bind the connection object, and communication begins. The call chain is AssociateConnection -> start -> negotiateOutboundProtocol -> readRemoteVersionMsg (the code is omitted here). The last step ultimately calls back into the server's OnVersion function (wired up the same way as OnConnection), which mainly checks that the version, services, and other capabilities match. Once that passes, the peer is established and data can be broadcast and synchronized.

    if !cfg.SimNet && !isInbound {
        addrManager.SetServices(remoteAddr, msg.Services)  // service check
    }

    // Ignore peers that have a protocol version that is too old.  The peer
    // negotiation logic will disconnect it after this callback returns.
    if msg.ProtocolVersion < int32(peer.MinAcceptableProtocolVersion) { // version check
        return nil
    }

    // Address exchange uses the getaddr/addr protocol: when you connect to
    // a peer, the two sides swap known addresses.
    hasTimestamp := sp.ProtocolVersion() >= wire.NetAddressTimeVersion
    if addrManager.NeedMoreAddresses() && hasTimestamp {
        sp.QueueMessage(wire.NewMsgGetAddr(), nil)
    }

    // As noted above, Good moves the address from the new buckets into the
    // tried ones; only after this runs is the peer considered fully usable.
    // Mark the address as a known good address.
    addrManager.Good(remoteAddr)

Maintaining Peer Synchronization

A SyncManager struct is responsible for maintaining block and transaction synchronization. Continuing from above: after a new peer joins, the best peer is re-selected (the peer with the greatest height), and a getheaders or getblocks request is sent to it, parameterized by the node's own chain state:

    if sm.nextCheckpoint != nil &&
        best.Height < sm.nextCheckpoint.Height &&
        sm.chainParams != &chaincfg.RegressionNetParams {

        bestPeer.PushGetHeadersMsg(locator, sm.nextCheckpoint.Hash)
        sm.headersFirstMode = true
        log.Infof("Downloading headers for blocks %d to "+
            "%d from peer %s", best.Height+1,
            sm.nextCheckpoint.Height, bestPeer.Addr())
    } else {
        bestPeer.PushGetBlocksMsg(locator, &zeroHash)
    }

On receiving the request, the remote side compares heights, collects the block hashes the requester is missing, and replies with a NewMsgInv message of type InvTypeBlock; the requester then fetches the blocks one by one. The exchange is summarized below, with a sketch of the requester side after the list.

1) The sender issues PushGetBlocksMsg with its own height/locator; the receiver replies with a NewMsgInv of type InvTypeBlock listing the hashes the sender is missing.
2) The sender's OnInv handler responds with NewMsgGetData, requesting each received item; the receiver answers with MsgBlock messages carrying the full block data.
3) The sender's OnBlock handler attaches each received block to the chain.
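
A simplified sketch of the requester side of step 2, assuming btcd's wire and peer packages (the real handler goes through the SyncManager and tracks in-flight requests):

    package main

    import (
        "github.com/btcsuite/btcd/peer"
        "github.com/btcsuite/btcd/wire"
    )

    // onInv requests every advertised block from the remote peer via getdata.
    func onInv(p *peer.Peer, msg *wire.MsgInv) {
        gdmsg := wire.NewMsgGetData()
        for _, iv := range msg.InvList {
            if iv.Type == wire.InvTypeBlock {
                gdmsg.AddInvVect(iv)
            }
        }
        if len(gdmsg.InvList) > 0 {
            p.QueueMessage(gdmsg, nil)
        }
    }

    func main() {} // registered in practice via peer.MessageListeners.OnInv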

Reposted from: https://my.oschina.net/hunjixin/blog/2254414