
Source Code Analysis of the Open-Source memberlist Package

程序员文章站 2024-03-20 22:40:28

WeChat official-account link for this article: https://mp.weixin.qq.com/s/abY24PhBgNDJgh5m9Taq4w

 

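memberlist is a package written in Go that propagates messages with a gossip protocol. It manages node discovery, node failure detection, and the node list within a distributed cluster.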

 

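I wrote an earlier article about the gossip protocol: "Gossip协议简介---病毒感染模型的p2p算法" (An Introduction to the Gossip Protocol: a P2P Algorithm Based on the Epidemic Model).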

 

Source code: https://github.com/hashicorp/memberlist

 

To learn how memberlist is designed, I follow my usual habit of starting from an early version of the code. This analysis is based on commit fe04265.

 

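To repeat: the point of studying an early version is only to learn the design and the underlying mechanics of the open-source code, and to see how the source improved as the versions evolved.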


 

Source directory: [figure: listing of the source tree]

 

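The overall code style resembles object-oriented C. The modules map directly onto the file names:

1. broadcast.go: broadcast module

2. net.go: transport and protocol handling module

3. state.go: node state management module

4. memberlist.go: main module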

 

 

github.com/hashicorp/memberlist/memberlist.go

Memberlist

In the Memberlist struct, the fields are also grouped by function.

type Memberlist struct {
    config   *Config // configuration
    shutdown bool    // flag: the local service has been shut down
    leave    bool    // flag: this node is leaving the cluster

    // UDP and TCP listeners; see net.go (transport and protocol handling)
    udpListener *net.UDPConn
    tcpListener *net.TCPListener

    sequenceNum uint32 // Local sequence number
    incarnation uint32 // Local incarnation number

    // Node and state management; see state.go
    nodeLock sync.RWMutex
    nodes    []*NodeState          // Known nodes
    nodeMap  map[string]*NodeState // Maps Addr.String() -> NodeState

    tickerLock sync.Mutex
    tickers    []*time.Ticker
    stopTick   chan struct{}
    probeIndex int

    ackLock     sync.Mutex
    ackHandlers map[uint32]*ackHandler

    // Broadcast management; see broadcast.go
    broadcastLock sync.Mutex
    bcQueue       broadcasts
}

 

 

Config

In Config, the first fields are basic configuration items, each explained by its comment.

type Config struct {
    Name             string        // Node name (FQDN)
    BindAddr         string        // Binding address
    UDPPort          int           // UDP port to listen on
    TCPPort          int           // TCP port to listen on
    TCPTimeout       time.Duration // TCP timeout
    IndirectChecks   int           // Number of indirect checks to use
    RetransmitMult   int           // Retransmits = RetransmitMult * log(N+1)
    SuspicionMult    int           // Suspicion time = SuspicionMult * log(N+1) * Interval
    PushPullInterval time.Duration // How often we do a Push/Pull update
    RTT              time.Duration // 99% percentile of round-trip-time
    ProbeInterval    time.Duration // Failure probing interval length

    GossipNodes    int           // Number of nodes to gossip to per GossipInterval
    GossipInterval time.Duration // Gossip interval for non-piggyback messages (only if GossipNodes > 0)

    JoinCh  chan<- *Node
    LeaveCh chan<- *Node
}

 

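The last two fields are:

JoinCh: a hook exposed to the caller. When a node is added, the event is delivered on this channel so that external code can react to it.

LeaveCh: a hook exposed to the caller. When a node is removed, the event is delivered on this channel so that external code can react to it.

In even earlier versions these two channels lived in the Memberlist struct. They were later moved into Config.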
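As an illustration of how a caller might consume these channels, here is a minimal sketch. The `Node` type is a simplified stand-in, and the non-blocking behaviour of `notify` is an assumption: the listings in this article only show `notify` being called, not how it is implemented.

```go
package main

import "fmt"

// Node is a hypothetical, simplified stand-in for memberlist's Node type.
type Node struct {
	Name string
}

// notify performs a non-blocking send so that a slow or missing consumer
// cannot stall the caller. A nil channel means "no subscriber".
// (Assumed behaviour; the helper in the analysed commit may differ.)
func notify(ch chan<- *Node, n *Node) bool {
	if ch == nil {
		return false
	}
	select {
	case ch <- n:
		return true
	default:
		return false // buffer full: the event is dropped
	}
}

func main() {
	joinCh := make(chan *Node, 1)
	fmt.Println(notify(joinCh, &Node{Name: "node-a"})) // delivered
	fmt.Println(notify(joinCh, &Node{Name: "node-b"})) // dropped, buffer is full
	fmt.Println((<-joinCh).Name)
	fmt.Println(notify(nil, &Node{Name: "node-c"})) // no subscriber
}
```

With a design like this, the caller chooses the buffer size of JoinCh and LeaveCh, which decides how many events can queue up before new ones are dropped.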

 

Now for the startup flow.

Create

// Create will start memberlist and create a new gossip pool, but
// will not connect to an existing node. This should only be used
// for the first node in the cluster.
func Create(conf *Config) (*Memberlist, error) {
    // newMemberlist starts the TCP and UDP listeners
    m, err := newMemberlist(conf)
    if err != nil {
        return nil, err
    }
    if err := m.setAlive(); err != nil {
        m.Shutdown()
        return nil, err
    }
    // schedule starts three periodic tasks: probe, pushpull, gossip
    m.schedule()
    return m, nil
}
There are two important steps here:
1. newMemberlist
2. m.schedule

 

newMemberlist
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
    // Create the TCP listener
    tcpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.TCPPort)
    tcpLn, err := net.Listen("tcp", tcpAddr)
    if err != nil {
        return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
    }

    // Create the UDP listener
    udpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.UDPPort)
    udpLn, err := net.ListenPacket("udp", udpAddr)
    if err != nil {
        tcpLn.Close()
        return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
    }

    // Build the Memberlist instance
    m := &Memberlist{config: conf,
        udpListener: udpLn.(*net.UDPConn),
        tcpListener: tcpLn.(*net.TCPListener),
        nodeMap:     make(map[string]*NodeState),
        stopTick:    make(chan struct{}, 32),
        ackHandlers: make(map[uint32]*ackHandler),
    }

    go m.tcpListen() // start serving TCP
    go m.udpListen() // start serving UDP
    return m, nil
}
The main job of newMemberlist is to start the TCP and UDP services.
So let's look at those network services (TCP and UDP) next.
github.com/hashicorp/memberlist/net.go
tcp
tcpListen
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
    for {
        // Accept the next TCP connection
        conn, err := m.tcpListener.AcceptTCP()
        if err != nil {
            if m.shutdown {
                break
            }
            log.Printf("[ERR] Error accepting TCP connection: %s", err)
            continue
        }
        // Each connection is handled in its own goroutine by handleConn
        go m.handleConn(conn)
    }
}
Next, look at:
handleConn
// handleConn handles a single incoming TCP connection
func (m *Memberlist) handleConn(conn *net.TCPConn) {
    defer conn.Close()

    // Read the remote side's state
    remoteNodes, err := readRemoteState(conn)
    if err != nil {
        log.Printf("[ERR] Failed to receive remote state: %s", err)
        return
    }
    // Send the local node state back
    if err := m.sendLocalState(conn); err != nil {
        log.Printf("[ERR] Failed to push local state: %s", err)
    }
    // Merge the received remote state into the local view
    m.mergeState(remoteNodes)
}
The TCP service provides a single function: synchronizing node state.
readRemoteState
Reads the node state information and returns it.
// readRemoteState is used to read the remote state from a connection
func readRemoteState(conn net.Conn) ([]pushNodeState, error) {
    // Read the message type (a single byte)
    buf := []byte{0}
    if _, err := conn.Read(buf); err != nil {
        return nil, err
    }
    msgType := uint8(buf[0])

    // Quit if not push/pull (only push/pull messages are supported here)
    if msgType != pushPullMsg {
        err := fmt.Errorf("received invalid msgType (%d)", msgType)
        return nil, err
    }

    // Read the push/pull header (msgpack-decoded)
    var header pushPullHeader
    hd := codec.MsgpackHandle{}
    dec := codec.NewDecoder(conn, &hd)
    if err := dec.Decode(&header); err != nil {
        return nil, err
    }

    // Allocate space for the transfer
    remoteNodes := make([]pushNodeState, header.Nodes)
    // Try to decode all the states
    for i := 0; i < header.Nodes; i++ {
        if err := dec.Decode(&remoteNodes[i]); err != nil {
            return remoteNodes, err
        }
    }
    // Return the node states
    return remoteNodes, nil
}
 
sendLocalState
Sends the node state information stored locally.
// sendLocalState is invoked to send our local state over a tcp connection
func (m *Memberlist) sendLocalState(conn net.Conn) error {
    // Prepare the local node state
    // (collect the state of every node known locally)
    m.nodeLock.RLock()
    localNodes := make([]pushNodeState, len(m.nodes))
    for idx, n := range m.nodes {
        localNodes[idx].Name = n.Name
        localNodes[idx].Addr = n.Addr
        localNodes[idx].Incarnation = n.Incarnation
        localNodes[idx].State = n.State
    }
    m.nodeLock.RUnlock()

    // Send our node state, starting with the header
    header := pushPullHeader{Nodes: len(localNodes)}
    hd := codec.MsgpackHandle{}
    enc := codec.NewEncoder(conn, &hd)

    // Begin state push
    conn.Write([]byte{pushPullMsg})
    // Encode and send the header, then each node state
    if err := enc.Encode(&header); err != nil {
        return err
    }
    for i := 0; i < header.Nodes; i++ {
        if err := enc.Encode(&localNodes[i]); err != nil {
            return err
        }
    }
    return nil
}
 
mergeState
Updates the node states.
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
    for _, r := range remote {
        // Look for a matching local node
        m.nodeLock.RLock()
        local, ok := m.nodeMap[r.Name]
        m.nodeLock.RUnlock()

        // Skip if we agree on states
        // (the remote state matches the one stored locally)
        if ok && local.State == r.State {
            continue
        }

        // Dispatch on the three possible states
        switch r.State {
        case StateAlive:
            a := alive{Incarnation: r.Incarnation, Node: r.Name, Addr: r.Addr}
            m.aliveNode(&a)

        case StateSuspect:
            s := suspect{Incarnation: r.Incarnation, Node: r.Name}
            m.suspectNode(&s)

        case StateDead:
            d := dead{Incarnation: r.Incarnation, Node: r.Name}
            m.deadNode(&d)
        }
    }
}
There are three states:
StateAlive: handled by aliveNode
StateSuspect: handled by suspectNode
StateDead: handled by deadNode
These three handlers are covered later.
TCP summary
The TCP connection is mainly used to synchronize and update node state information.
 
udp
udpListen
The code is straightforward: it keeps reading packets and hands each one to handleCommand.
// udpListen listens for and handles incoming UDP packets
func (m *Memberlist) udpListen() {
    mainBuf := make([]byte, udpBufSize)
    var n int
    var addr net.Addr
    var err error
    for {
        // Reset buffer
        buf := mainBuf[0:udpBufSize]

        // Read a packet (loops forever on the UDP listener)
        n, addr, err = m.udpListener.ReadFrom(buf)
        if err != nil {
            if m.shutdown {
                break
            }
            log.Printf("[ERR] Error reading UDP packet: %s", err)
            continue
        }

        // Check the length
        if n < 1 {
            log.Printf("[ERR] UDP packet too short (%d bytes). From: %s", len(buf), addr)
            continue
        }

        // Handle the command (this is where the real work happens)
        m.handleCommand(buf[:n], addr)
    }
}
 
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
    // Decode the message type (the first byte)
    msgType := uint8(buf[0])
    buf = buf[1:]

    // Switch on the msgType: each type has its own handler
    switch msgType {
    case compoundMsg:
        m.handleCompound(buf, from)
    case pingMsg:
        m.handlePing(buf, from)
    case indirectPingMsg:
        m.handleIndirectPing(buf, from)
    case ackRespMsg:
        m.handleAck(buf, from)
    case suspectMsg:
        m.handleSuspect(buf, from)
    case aliveMsg:
        m.handleAlive(buf, from)
    case deadMsg:
        m.handleDead(buf, from)
    default:
        log.Printf("[ERR] UDP msg type (%d) not supported. From: %s", msgType, from)
    }
}
The message types are:
compoundMsg: handled by handleCompound
    Several messages are bundled together. They are split apart, and each part is passed back to handleCommand.
func (m *Memberlist) handleCompound(buf []byte, from net.Addr) {
    // Decode the parts (split the bundle)
    trunc, parts, err := decodeCompoundMessage(buf)
    if err != nil {
        log.Printf("[ERR] Failed to decode compound request: %s", err)
        return
    }

    // Log any truncation
    if trunc > 0 {
        log.Printf("[WARN] Compound request had %d truncated messages", trunc)
    }

    // Handle each message
    for _, part := range parts {
        // Each part goes through handleCommand again
        m.handleCommand(part, from)
    }
}
pingMsg: handled by handlePing
indirectPingMsg: handled by handleIndirectPing
ackRespMsg: handled by handleAck
    These three messages are all fairly simple.
suspectMsg: handled by handleSuspect
    It calls suspectNode.
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
   var sus suspect
   if err := decode(buf, &sus); err != nil {
      log.Printf("[ERR] Failed to decode suspect message: %s", err)
      return
   }
   m.suspectNode(&sus)
}
 
aliveMsg: handled by handleAlive
    It calls aliveNode.
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
   var live alive
   if err := decode(buf, &live); err != nil {
      log.Printf("[ERR] Failed to decode alive message: %s", err)
      return
   }
   m.aliveNode(&live)
}
 
deadMsg: handled by handleDead
    It calls deadNode.
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
   var d dead
   if err := decode(buf, &d); err != nil {
      log.Printf("[ERR] Failed to decode dead message: %s", err)
      return
   }
   m.deadNode(&d)
}
 
UDP summary:
The UDP service handles the basic protocol commands.
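To make the compound message handling above concrete, here is a small sketch of how several messages could be bundled into one UDP payload and split again. The wire layout (a count byte, a table of 2-byte big-endian lengths, then the bodies) and the names `makeCompound` and `decodeCompound` are assumptions chosen for illustration; the article does not show the actual layout used by makeCompoundMessage and decodeCompoundMessage.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// makeCompound packs parts as: count byte, one uint16 length per part,
// then the bodies. It assumes at most 255 parts of at most 65535 bytes each.
func makeCompound(parts [][]byte) []byte {
	buf := []byte{uint8(len(parts))}
	for _, p := range parts {
		var l [2]byte
		binary.BigEndian.PutUint16(l[:], uint16(len(p)))
		buf = append(buf, l[:]...)
	}
	for _, p := range parts {
		buf = append(buf, p...)
	}
	return buf
}

// decodeCompound splits a bundle back into its parts. trunc reports how many
// trailing parts were cut off, for example by a short UDP read.
func decodeCompound(buf []byte) (trunc int, parts [][]byte, err error) {
	if len(buf) < 1 {
		return 0, nil, errors.New("missing compound length byte")
	}
	n := int(buf[0])
	buf = buf[1:]
	if len(buf) < 2*n {
		return 0, nil, errors.New("truncated length table")
	}
	lengths := make([]int, n)
	for i := range lengths {
		lengths[i] = int(binary.BigEndian.Uint16(buf[2*i:]))
	}
	buf = buf[2*n:]
	for i, l := range lengths {
		if len(buf) < l {
			return n - i, parts, nil // the remaining parts were cut off
		}
		parts = append(parts, buf[:l])
		buf = buf[l:]
	}
	return 0, parts, nil
}

func main() {
	msg := makeCompound([][]byte{[]byte("ping"), []byte("alive!")})
	trunc, parts, err := decodeCompound(msg)
	fmt.Println(trunc, len(parts), err)
	for _, p := range parts {
		fmt.Printf("%s\n", p) // each part would go back through handleCommand
	}
}
```

Putting all lengths up front means a receiver can still recover the leading parts of a truncated packet, which matches the "truncated messages" warning in handleCompound.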
 
github.com/hashicorp/memberlist/state.go
Node state management
In github.com/hashicorp/memberlist/memberlist.go,
the last function that Create calls is schedule:
// Schedule is used to ensure the Tick is performed periodically
func (m *Memberlist) schedule() {
    m.tickerLock.Lock()
    defer m.tickerLock.Unlock()

    // Create a new probeTicker (starts the probe goroutine)
    if m.config.ProbeInterval > 0 {
        t := time.NewTicker(m.config.ProbeInterval)
        go m.triggerFunc(t.C, m.probe)
        m.tickers = append(m.tickers, t)
    }

    // Create a push pull ticker if needed (starts the pushpull goroutine)
    if m.config.PushPullInterval > 0 {
        t := time.NewTicker(m.config.PushPullInterval)
        go m.triggerFunc(t.C, m.pushPull)
        m.tickers = append(m.tickers, t)
    }

    // Create a gossip ticker if needed (starts the gossip goroutine)
    if m.config.GossipNodes > 0 {
        t := time.NewTicker(m.config.GossipInterval)
        go m.triggerFunc(t.C, m.gossip)
        m.tickers = append(m.tickers, t)
    }
}
Three periodic tasks are started here:
probe, pushpull, and gossip
 
probe

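After a node starts, at a fixed interval it picks one node and sends it a PING message. If the PING fails, it randomly selects IndirectChecks nodes and asks them to ping the target indirectly, and it also sends the target a PING over TCP directly (this TCP fallback does not appear in the probeNode listing of this early commit). A node that receives an indirect PING request pings the address given in the request and returns the result to the node that asked. If the local node receives no ACK from the target within the probe timeout, it marks the target as suspect.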

https://www.colabug.com/1010287.html

 

// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
    // Track the number of indexes we've considered probing
    numCheck := 0
START:
    // Make sure we don't wrap around infinitely
    if numCheck >= len(m.nodes) {
        return
    }

    // Handle the wrap around case
    // (probeIndex is the index into nodes; probing cycles through the list)
    if m.probeIndex >= len(m.nodes) {
        m.resetNodes()
        m.probeIndex = 0
        numCheck++
        goto START
    }

    // Determine if we should probe this node
    skip := false
    var node *NodeState
    m.nodeLock.RLock()

    node = m.nodes[m.probeIndex]
    if node.Name == m.config.Name {
        skip = true // this is the local node (its name matches the config)
    } else if node.State == StateDead {
        skip = true // the node is already dead
    }

    // Potentially skip
    m.nodeLock.RUnlock()
    if skip { // skip the local node and dead nodes
        numCheck++
        m.probeIndex++
        goto START
    }

    // Probe the specific node
    m.probeNode(node)
}
// probeNode handles a single round of failure checking on a node
func (m *Memberlist) probeNode(node *NodeState) {
    // Send a ping to the node
    ping := ping{SeqNo: m.nextSeqNo()}
    destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}

    // Setup an ack handler
    ackCh := make(chan bool, m.config.IndirectChecks+1)
    m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)

    // Send the ping message (pingMsg)
    if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
        log.Printf("[ERR] Failed to send ping: %s", err)
        return
    }

    // Wait for response or round-trip-time
    select {
    case v := <-ackCh:
        if v == true {
            return
        }
    case <-time.After(m.config.RTT):
    }

    // Get some random live nodes
    m.nodeLock.RLock()
    excludes := []string{m.config.Name, node.Name}
    kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // Attempt an indirect ping
    ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr}
    for _, peer := range kNodes {
        destAddr := &net.UDPAddr{IP: peer.Addr, Port: m.config.UDPPort}
        // Send the indirectPingMsg to each chosen peer
        if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
            log.Printf("[ERR] Failed to send indirect ping: %s", err)
        }
    }

    // Wait for the acks or timeout
    select {
    case v := <-ackCh:
        if v == true {
            return
        }
    }

    // No acks received from target, suspect
    // (the probe failed, so the node is marked suspect)
    s := suspect{Incarnation: node.Incarnation, Node: node.Name}
    m.suspectNode(&s)
}
 
pushpull

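At a fixed interval, a node picks one random peer, opens a TCP connection to it, and sends its complete set of node states and user data. The peer replies with all the node states and user data it holds, and the two data sets are merged. This speeds up the convergence of information across the cluster.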

https://www.jianshu.com/p/e2173b44db65

// pushPull is invoked periodically to randomly perform a state
// exchange. Used to ensure a high level of convergence.
func (m *Memberlist) pushPull() {
    // Get a random live node (exactly one is requested)
    m.nodeLock.RLock()
    excludes := []string{m.config.Name}
    nodes := kRandomNodes(1, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // If no nodes, bail
    if len(nodes) == 0 {
        return
    }
    node := nodes[0]

    // Attempt a push pull via pushPullNode
    if err := m.pushPullNode(node.Addr); err != nil {
        log.Printf("[ERR] Push/Pull with %s failed: %s", node.Name, err)
    }
}
The code above picks one node at random.
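kRandomNodes itself is not listed in this article. The sketch below shows one way to pick up to k random peers while skipping excluded names. Skipping dead nodes is an assumption based on the "random live node" comment, and the `NodeState` type here is trimmed down to what the sketch needs.

```go
package main

import (
	"fmt"
	"math/rand"
)

const (
	StateAlive = iota
	StateSuspect
	StateDead
)

// NodeState is a simplified stand-in for memberlist's NodeState.
type NodeState struct {
	Name  string
	State int
}

// kRandomNodes returns up to k distinct nodes chosen at random, skipping
// dead nodes and any node whose name appears in excludes.
func kRandomNodes(k int, excludes []string, nodes []*NodeState) []*NodeState {
	skip := make(map[string]bool, len(excludes))
	for _, name := range excludes {
		skip[name] = true
	}
	var picked []*NodeState
	// rand.Perm visits every index exactly once, in random order.
	for _, idx := range rand.Perm(len(nodes)) {
		if len(picked) >= k {
			break
		}
		n := nodes[idx]
		if n.State == StateDead || skip[n.Name] {
			continue
		}
		picked = append(picked, n)
	}
	return picked
}

func main() {
	nodes := []*NodeState{
		{Name: "self"},
		{Name: "a"},
		{Name: "b", State: StateDead},
		{Name: "c"},
	}
	for _, n := range kRandomNodes(1, []string{"self"}, nodes) {
		fmt.Println("push/pull with", n.Name) // either "a" or "c"
	}
}
```

If fewer than k nodes qualify, the function simply returns what it found, which is why pushPull checks for an empty result before continuing.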

 
// pushPullNode is invoked to do a state exchange with
// a given node
func (m *Memberlist) pushPullNode(addr []byte) error {
    // Attempt to send and receive with the node
    // (send the local state and fetch the remote state)
    remote, err := m.sendAndReceiveState(addr)
    if err != nil {
        return nil // note: as listed, the error is dropped here rather than returned
    }

    // Merge the state into the local view
    m.mergeState(remote)
    return nil
}
// sendAndReceiveState is used to initiate a push/pull over TCP with a remote node
func (m *Memberlist) sendAndReceiveState(addr []byte) ([]pushNodeState, error) {
    // Attempt to connect (open a TCP client connection)
    dialer := net.Dialer{Timeout: m.config.TCPTimeout}
    dest := net.TCPAddr{IP: addr, Port: m.config.TCPPort}
    conn, err := dialer.Dial("tcp", dest.String())
    if err != nil {
        return nil, err
    }

    // Send our state
    if err := m.sendLocalState(conn); err != nil {
        return nil, err
    }

    // Read remote state
    remote, err := readRemoteState(conn)
    if err != nil {
        return nil, err
    }

    // Return the remote state
    return remote, nil
}
 
gossip

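A node sends messages over UDP to K nodes, taking the messages from the broadcast queue. Once a message in the queue has been transmitted a certain number of times, it is dropped. For that number, see the comment on RetransmitMult in Config.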

https://www.jianshu.com/p/e2173b44db65
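The Config comment gives the formula as Retransmits = RetransmitMult * log(N+1). A small sketch of that calculation follows. The base-10 logarithm and the rounding up are assumptions, since the article only quotes the comment and not the body of retransmitLimit.

```go
package main

import (
	"fmt"
	"math"
)

// retransmitLimit returns how many times one broadcast is sent before it is
// dropped from the queue: retransmitMult * ceil(log10(n+1)).
// (Base 10 and the ceiling are assumptions for this sketch.)
func retransmitLimit(retransmitMult, n int) int {
	nodeScale := math.Ceil(math.Log10(float64(n + 1)))
	return retransmitMult * int(nodeScale)
}

func main() {
	for _, n := range []int{1, 50, 500} {
		fmt.Printf("nodes=%d limit=%d\n", n, retransmitLimit(4, n))
	}
}
```

The logarithm keeps the per-message cost low: the cluster can grow by an order of magnitude while each message is only retransmitted RetransmitMult more times.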

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
    // Get some random live nodes (GossipNodes of them, from the config)
    m.nodeLock.RLock()
    excludes := []string{m.config.Name}
    kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
    m.nodeLock.RUnlock()

    // Compute the bytes available
    bytesAvail := udpSendBuf - compoundHeaderOverhead
    for _, node := range kNodes {
        // Get any pending broadcasts that fit in the available bytes
        msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
        if len(msgs) == 0 {
            return
        }

        // Create a compound message that bundles them
        compound := makeCompoundMessage(msgs)

        // Send the compound message
        destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}
        if err := m.rawSendMsg(destAddr, compound); err != nil {
            log.Printf("[ERR] Failed to send gossip to %s: %s", destAddr, err)
        }
    }
}
 
 
The three node states are covered next.
alive
Marks a node as active.
aliveNode
// aliveNode is invoked by the network layer when we get a message
// about a live node
func (m *Memberlist) aliveNode(a *alive) {
    m.nodeLock.Lock()
    defer m.nodeLock.Unlock()
    // Look the node up in the local node map
    state, ok := m.nodeMap[a.Node]

    // Check if we've never seen this node before
    if !ok {
        // Add the new node
        state = &NodeState{
            Node: Node{
                Name: a.Node,
                Addr: a.Addr,
            },
            State: StateDead,
        }

        // Add to map
        m.nodeMap[a.Node] = state
        // Get a random offset. This is important to ensure
        // the failure detection bound is low on average. If all
        // nodes did an append, failure detection bound would be
        // very high.
        n := len(m.nodes)
        offset := randomOffset(n)

        // Add at the end and swap with the node at the offset
        m.nodes = append(m.nodes, state)
        m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
    }

    // Bail if the incarnation number is old
    // (the incoming incarnation is not newer than the stored one)
    if a.Incarnation <= state.Incarnation {
        return
    }

    // Re-Broadcast
    // (the incoming incarnation is newer, so pass the alive message on)
    m.encodeAndBroadcast(a.Node, aliveMsg, a)

    // Update the state and incarnation number
    oldState := state.State
    state.Incarnation = a.Incarnation
    if state.State != StateAlive {
        state.State = StateAlive
        state.StateChange = time.Now()
    }

    // if Dead -> Alive, notify of join (delivered on JoinCh)
    if oldState == StateDead {
        notify(m.config.JoinCh, &state.Node)
    }
}
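The append-then-swap step in aliveNode can be tried out in isolation. The sketch below uses plain strings instead of NodeState pointers, and its `randomOffset` is an assumed implementation, since only the call appears in the listing.

```go
package main

import (
	"fmt"
	"math/rand"
)

// randomOffset returns an index in [0, n), or 0 when n is 0.
// (Assumed implementation for this sketch.)
func randomOffset(n int) int {
	if n == 0 {
		return 0
	}
	return rand.Intn(n)
}

// insertRandom appends name and swaps it with a randomly chosen existing
// slot, so a new member does not always sit at the tail of the probe order.
func insertRandom(nodes []string, name string) []string {
	n := len(nodes)
	offset := randomOffset(n)
	nodes = append(nodes, name)
	nodes[offset], nodes[n] = nodes[n], nodes[offset]
	return nodes
}

func main() {
	var nodes []string
	for _, name := range []string{"a", "b", "c", "d"} {
		nodes = insertRandom(nodes, name)
	}
	fmt.Println(nodes) // the order varies from run to run
}
```

Since probe walks the list in index order, placing newcomers at a random position means a new node waits about half a round on average before its first probe, instead of a full round.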
 
suspect

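When probing a node fails, or when a suspect message about a node is received, the local record for that node is marked suspect, a timer is started, and a suspect broadcast is sent. If the same suspect information arrives from other nodes during this period, the local confirmation count for the suspicion is incremented. When the timer expires, if the node is still not alive and the confirmation count is high enough, the node is marked dead. When a node receives a suspect message about itself from another node, it sends an alive broadcast to clear the suspect mark on the other nodes.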

https://www.colabug.com/1010287.html

suspectNode

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
   m.nodeLock.Lock()
   defer m.nodeLock.Unlock()
   state, ok := m.nodeMap[s.Node]

   // If we've never heard about this node before, ignore it
   if !ok {
      return
   }

   // Ignore old incarnation numbers
   if s.Incarnation < state.Incarnation {
      return
   }

   // Ignore non-alive nodes
   if state.State != StateAlive {
      return
   }

   // If this is us we need to refute, otherwise re-broadcast
   if state.Name == m.config.Name {
      inc := m.nextIncarnation()
      a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
      m.encodeAndBroadcast(s.Node, aliveMsg, a)

      state.Incarnation = inc
      return // Do not mark ourself suspect
   } else {
      m.encodeAndBroadcast(s.Node, suspectMsg, s)
   }

   // Update the state
   state.Incarnation = s.Incarnation
   state.State = StateSuspect
   changeTime := time.Now()
   state.StateChange = changeTime
   // Setup a timeout for this
   timeout := suspicionTimeout(m.config.SuspicionMult, len(m.nodes), m.config.ProbeInterval)
   time.AfterFunc(timeout, func() {
      if state.State == StateSuspect && state.StateChange == changeTime {
         m.suspectTimeout(state)
      }
   })
}
 
dead

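When this node leaves the cluster, or when a node it probed times out and is marked dead, a dead broadcast is sent to the cluster. A node that receives a dead broadcast compares it with its local record. If the local record is already dead, the message is ignored. Otherwise the local record is removed and the dead message is broadcast again so that it keeps spreading. If a node receives a dead broadcast about itself from another node, it has been partitioned from those nodes. It then sends an alive broadcast to correct the data about itself that the other nodes hold.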

https://www.colabug.com/1010287.html

 

deadNode
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
   m.nodeLock.Lock()
   defer m.nodeLock.Unlock()
   state, ok := m.nodeMap[d.Node]

   // If we've never heard about this node before, ignore it
   if !ok {
      return
   }

   // Ignore old incarnation numbers
   if d.Incarnation < state.Incarnation {
      return
   }

   // Ignore if node is already dead
   if state.State == StateDead {
      return
   }

   // If this is us we need to refute, otherwise re-broadcast
   if state.Name == m.config.Name && !m.leave {
      inc := m.nextIncarnation()
      a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
      m.encodeAndBroadcast(d.Node, aliveMsg, a)

      state.Incarnation = inc
      return // Do not mark ourself dead
   } else {
      m.encodeAndBroadcast(d.Node, deadMsg, d)
   }

   // Update the state
   state.Incarnation = d.Incarnation
   state.State = StateDead
   state.StateChange = time.Now()

   // Notify of death
   // (the dead node's information is delivered on LeaveCh)
   notify(m.config.LeaveCh, &state.Node)
}
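Both suspectNode and deadNode refute a claim about the local node by bumping the incarnation number and broadcasting alive. The sketch below isolates that rule. The `member` type and `onAccusation` are hypothetical names, and the atomic counter is an assumption about how nextIncarnation could be implemented.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// member is a hypothetical, minimal view of the local node.
type member struct {
	name        string
	incarnation uint32
}

// nextIncarnation atomically bumps and returns the local incarnation number.
func (m *member) nextIncarnation() uint32 {
	return atomic.AddUint32(&m.incarnation, 1)
}

// onAccusation handles a suspect or dead claim carrying incarnation inc.
// It returns the incarnation to announce in an alive message, and whether
// a refutation is needed at all.
func (m *member) onAccusation(target string, inc uint32) (uint32, bool) {
	if target != m.name {
		return 0, false // not about us; the caller would re-broadcast instead
	}
	if inc < atomic.LoadUint32(&m.incarnation) {
		return 0, false // stale claim, already superseded
	}
	return m.nextIncarnation(), true
}

func main() {
	self := &member{name: "self", incarnation: 3}

	inc, refute := self.onAccusation("self", 3)
	fmt.Println(inc, refute) // 4 true: announce alive with incarnation 4

	inc, refute = self.onAccusation("self", 2)
	fmt.Println(inc, refute) // 0 false: the claim is older than our state

	inc, refute = self.onAccusation("other", 9)
	fmt.Println(inc, refute) // 0 false: not about the local node
}
```

The refutation works because aliveNode on the receiving side only accepts an alive message whose incarnation is strictly greater than the one it has stored.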

github.com/hashicorp/memberlist/broadcast.go
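The broadcast module handles broadcasting.
It provides three functions. The most important one is
getBroadcasts
It returns the queued broadcasts that fit within a maximum byte size and is mainly used to fill a UDP packet. The code is simple: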
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) []*bytes.Buffer {
   m.broadcastLock.Lock()
   defer m.broadcastLock.Unlock()

   transmitLimit := retransmitLimit(m.config.RetransmitMult, len(m.nodes))
   bytesUsed := 0
   var toSend []*bytes.Buffer
   for i := len(m.bcQueue) - 1; i >= 0; i-- {
      // Check if this is within our limits
      b := m.bcQueue[i]
      if bytesUsed+overhead+b.msg.Len() > limit {
         continue
      }

      // Add to slice to send
      bytesUsed += overhead + b.msg.Len()
      toSend = append(toSend, b.msg)

      // Check if we should stop transmission
      b.transmits++
      if b.transmits >= transmitLimit {
         n := len(m.bcQueue)
         m.bcQueue[i], m.bcQueue[n-1] = m.bcQueue[n-1], nil
         m.bcQueue = m.bcQueue[:n-1]
      }
   }

   // If we are sending anything, we need to re-sort to deal
   // with adjusted transmit counts
   if len(toSend) > 0 {
      m.bcQueue.Sort()
   }

   return toSend
}
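The selection logic of getBroadcasts can be exercised with a simplified queue. In the sketch below, messages are plain strings, the function returns the remaining queue instead of mutating it in place, and the descending sort order is an assumption inferred from the loop walking the queue from the tail. The real `broadcasts.Sort` is not shown in this article.

```go
package main

import (
	"fmt"
	"sort"
)

type broadcast struct {
	transmits int
	msg       string
}

type broadcasts []*broadcast

// Sort orders the queue by descending transmit count, so the tail holds
// the messages that have been sent the fewest times. (Assumed ordering.)
func (b broadcasts) Sort() {
	sort.SliceStable(b, func(i, j int) bool { return b[i].transmits > b[j].transmits })
}

// getBroadcasts walks the queue from the tail, picks messages that fit in
// limit bytes (charging overhead per message), and drops any message that
// reaches transmitLimit. It returns the remaining queue and the picks.
func getBroadcasts(q broadcasts, overhead, limit, transmitLimit int) (broadcasts, []string) {
	used := 0
	var toSend []string
	var remaining broadcasts
	for i := len(q) - 1; i >= 0; i-- {
		b := q[i]
		size := overhead + len(b.msg)
		if used+size > limit {
			remaining = append(remaining, b) // does not fit this round
			continue
		}
		used += size
		toSend = append(toSend, b.msg)
		b.transmits++
		if b.transmits < transmitLimit {
			remaining = append(remaining, b)
		}
	}
	remaining.Sort()
	return remaining, toSend
}

func main() {
	q := broadcasts{{transmits: 2, msg: "old"}, {transmits: 0, msg: "fresh"}}
	q, sent := getBroadcasts(q, 2, 20, 3)
	fmt.Println(sent, len(q)) // [fresh old] 1: "old" hit the limit and was dropped
}
```

Serving the least-transmitted messages first gives new information priority, while the transmit limit ensures old messages eventually leave the queue.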
 
 
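Summary:
The version analysed above is a fairly minimal one, and its functional modules are cleanly separated.
It does not yet expose an interface for users, though. In later versions the module code was
reorganized and an interface, Delegate, was abstracted out for users of the package.

How to use the Delegate interface will be covered with examples in a later article.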
 

 

 

 

龚浩华

月牙寂道长

qq:29185807

June 13, 2019
