Source Code Analysis of the Open-Source memberlist Library
WeChat article link: https://mp.weixin.qq.com/s/abY24PhBgNDJgh5m9Taq4w
memberlist is a package written in Go that uses the Gossip protocol to propagate messages; it handles node discovery, node failure detection, and maintenance of the member list in a distributed cluster.
I have previously written an article about the Gossip protocol: "An Introduction to the Gossip Protocol: a P2P Algorithm Modeled on Viral Infection".
Source repository: https://github.com/hashicorp/memberlist
To study the principles and design of memberlist, I follow my usual habit of starting from an early version of the code; the analysis here is based on commit fe04265.
Note again: studying an early version is only meant to learn the design principles and underlying mechanics of the open-source code, and to see how the source improves as the versions evolve.
Source layout:
The overall code style resembles object-oriented C, and the modules map neatly onto the file names:
1. broadcast.go: broadcast module
2. net.go: transport and protocol handling module
3. state.go: node state management module
4. memberlist.go: main module
github.com/hashicorp/memberlist/memberlist.go
Memberlist
In the Memberlist struct, the fields are also grouped by function:
type Memberlist struct {
config *Config // configuration
shutdown bool // flag: the local service has been shut down
leave bool // flag: the local node has left the cluster
udpListener *net.UDPConn
tcpListener *net.TCPListener
//UDP and TCP connection management; handled in net.go (transport and protocol handling)
sequenceNum uint32 // Local sequence number
incarnation uint32 // Local incarnation number
nodeLock sync.RWMutex
nodes []*NodeState // Known nodes
nodeMap map[string]*NodeState // Maps Addr.String() -> NodeState
//node and state management, handled in state.go
tickerLock sync.Mutex
tickers []*time.Ticker
stopTick chan struct{}
probeIndex int
ackLock sync.Mutex
ackHandlers map[uint32]*ackHandler
broadcastLock sync.Mutex
bcQueue broadcasts
//broadcast management, handled in broadcast.go
}
Config
In Config, the leading fields are basic configuration options, each explained by its comment.
type Config struct {
Name string // Node name (FQDN)
BindAddr string // Binding address
UDPPort int // UDP port to listen on
TCPPort int // TCP port to listen on
TCPTimeout time.Duration // TCP timeout
IndirectChecks int // Number of indirect checks to use
RetransmitMult int // Retransmits = RetransmitMult * log(N+1)
SuspicionMult int // Suspicion time = SuspcicionMult * log(N+1) * Interval
PushPullInterval time.Duration // How often we do a Push/Pull update
RTT time.Duration // 99% precentile of round-trip-time
ProbeInterval time.Duration // Failure probing interval length
GossipNodes int // Number of nodes to gossip to per GossipInterval
GossipInterval time.Duration // Gossip interval for non-piggyback messages (only if GossipNodes > 0)
JoinCh chan<- *Node
LeaveCh chan<- *Node
}
The last two fields:
JoinCh: an externally exposed hook; when a new node joins, the caller registered on this channel is notified.
LeaveCh: an externally exposed hook; when a node is removed, the caller registered on this channel is notified.
In even earlier versions these two channels lived in the Memberlist struct; they were later moved into Config. A hedged usage sketch follows.
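As a usage sketch (the concrete values below are illustrative and the consumption loop is my own; only the Config fields, Create, Shutdown, Node and the two channels come from the source discussed here), a caller can register both channels and watch membership changes:

package main

import (
    "log"
    "time"

    "github.com/hashicorp/memberlist"
)

func main() {
    joinCh := make(chan *memberlist.Node, 16)
    leaveCh := make(chan *memberlist.Node, 16)

    // Illustrative configuration; every field name comes from the Config
    // struct shown above, the values are placeholders.
    conf := &memberlist.Config{
        Name:             "node-1",
        BindAddr:         "0.0.0.0",
        UDPPort:          7946,
        TCPPort:          7946,
        TCPTimeout:       10 * time.Second,
        IndirectChecks:   3,
        RetransmitMult:   4,
        SuspicionMult:    5,
        PushPullInterval: 30 * time.Second,
        RTT:              20 * time.Millisecond,
        ProbeInterval:    1 * time.Second,
        GossipNodes:      3,
        GossipInterval:   200 * time.Millisecond,
        JoinCh:           joinCh,
        LeaveCh:          leaveCh,
    }

    m, err := memberlist.Create(conf)
    if err != nil {
        log.Fatalf("failed to create memberlist: %s", err)
    }
    defer m.Shutdown()

    // Watch membership changes reported by the library.
    for {
        select {
        case n := <-joinCh:
            log.Printf("node joined: %s", n.Name)
        case n := <-leaveCh:
            log.Printf("node left: %s", n.Name)
        }
    }
}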
Now let's walk through the flow.
Create
// Create will start memberlist and create a new gossip pool, but
// will not connect to an existing node. This should only be used
// for the first node in the cluster.
func Create(conf *Config) (*Memberlist, error) {
m, err := newMemberlist(conf)
//newMemberlist starts the TCP and UDP listeners
if err != nil {
return nil, err
}
if err := m.setAlive(); err != nil {
m.Shutdown()
return nil, err
}
m.schedule()
//schedule starts three periodic tasks: probe, pushPull, and gossip
return m, nil
}
There are two important steps here:
1. newMemberlist
2. m.schedule
newMemberlist
// newMemberlist creates the network listeners.
// Does not schedule exeuction of background maintenence.
func newMemberlist(conf *Config) (*Memberlist, error) {
tcpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.TCPPort)
tcpLn, err := net.Listen("tcp", tcpAddr)
if err != nil {
return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
}
//the above creates the TCP listener
udpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.UDPPort)
udpLn, err := net.ListenPacket("udp", udpAddr)
if err != nil {
tcpLn.Close()
return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
}
//the above creates the UDP listener
m := &Memberlist{config: conf,
udpListener: udpLn.(*net.UDPConn),
tcpListener: tcpLn.(*net.TCPListener),
nodeMap: make(map[string]*NodeState),
stopTick: make(chan struct{}, 32),
ackHandlers: make(map[uint32]*ackHandler),
} //build the Memberlist instance
go m.tcpListen() //start the TCP service
go m.udpListen() //start the UDP service
return m, nil
}
The most important thing newMemberlist does is start the TCP and UDP services.
So let's look at the network services (TCP and UDP).
github.com/hashicorp/memberlist/net.go
tcp
tcpListen
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
for {
//tcp accept
conn, err := m.tcpListener.AcceptTCP()
if err != nil {
if m.shutdown {
break
}
log.Printf("[ERR] Error accepting TCP connection: %s", err)
continue
}
//each connection is handled by handleConn
go m.handleConn(conn)
}
}
Let's continue with
handleConn
// handleConn handles a single incoming TCP connection
func (m *Memberlist) handleConn(conn *net.TCPConn) {
defer conn.Close()
//read the remote node states
remoteNodes, err := readRemoteState(conn)
if err != nil {
log.Printf("[ERR] Failed to receive remote state: %s", err)
return
}
//send the local node states
if err := m.sendLocalState(conn); err != nil {
log.Printf("[ERR] Failed to push local state: %s", err)
}
//merge the received remote states into the local state
m.mergeState(remoteNodes)
}
The TCP service's job is to synchronize node state.
readRemoteState
Reads the remote node state information and returns it.
// recvRemoteState is used to read the remote state from a connection
func readRemoteState(conn net.Conn) ([]pushNodeState, error) {
// Read the message type
//read the first byte of the message
buf := []byte{0}
if _, err := conn.Read(buf); err != nil {
return nil, err
}
//extract the message type
msgType := uint8(buf[0])
// Quit if not push/pull
//only push/pull messages are accepted here
if msgType != pushPullMsg {
err := fmt.Errorf("received invalid msgType (%d)", msgType)
return nil, err
}
// Read the push/pull header
//decode the header
var header pushPullHeader
hd := codec.MsgpackHandle{}
dec := codec.NewDecoder(conn, &hd)
if err := dec.Decode(&header); err != nil {
return nil, err
}
// Allocate space for the transfer
//decode all node states
remoteNodes := make([]pushNodeState, header.Nodes)
// Try to decode all the states
for i := 0; i < header.Nodes; i++ {
if err := dec.Decode(&remoteNodes[i]); err != nil {
return remoteNodes, err
}
}
//return the node state information
return remoteNodes, nil
}
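For reference, the two wire types used above presumably look like the following. This is a sketch reconstructed from the fields read and written in readRemoteState and sendLocalState; the exact field types (for example whether Addr is []byte or net.IP) are assumptions.

// pushPullHeader precedes the node states in a push/pull exchange (sketch).
type pushPullHeader struct {
    Nodes int // number of pushNodeState entries that follow
}

// pushNodeState carries one node's state during a push/pull exchange (sketch).
type pushNodeState struct {
    Name        string
    Addr        []byte // assumed representation of the node address
    Incarnation uint32
    State       int // StateAlive / StateSuspect / StateDead
}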
sendLocalState
Sends the locally stored node state information.
// sendLocalState is invoked to send our local state over a tcp connection
func (m *Memberlist) sendLocalState(conn net.Conn) error {
// Prepare the local node state
//collect the locally stored node states
m.nodeLock.RLock()
localNodes := make([]pushNodeState, len(m.nodes))
for idx, n := range m.nodes {
localNodes[idx].Name = n.Name
localNodes[idx].Addr = n.Addr
localNodes[idx].Incarnation = n.Incarnation
localNodes[idx].State = n.State
}
m.nodeLock.RUnlock()
//prepend the header
// Send our node state
header := pushPullHeader{Nodes: len(localNodes)}
hd := codec.MsgpackHandle{}
enc := codec.NewEncoder(conn, &hd)
// Begin state push
conn.Write([]byte{pushPullMsg})
//encode and send
if err := enc.Encode(&header); err != nil {
return err
}
for i := 0; i < header.Nodes; i++ {
if err := enc.Encode(&localNodes[i]); err != nil {
return err
}
}
return nil
}
mergeState
Updates the node states.
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
for _, r := range remote {
// Look for a matching local node
m.nodeLock.RLock()
local, ok := m.nodeMap[r.Name]
m.nodeLock.RUnlock()
// Skip if we agree on states
if ok && local.State == r.State {
//skip if the received state matches the local one
continue
}
//three possible states
switch r.State {
//StateAlive
case StateAlive:
a := alive{Incarnation: r.Incarnation, Node: r.Name, Addr: r.Addr}
m.aliveNode(&a)
//StateSuspect
case StateSuspect:
s := suspect{Incarnation: r.Incarnation, Node: r.Name}
m.suspectNode(&s)
//StateDead
case StateDead:
d := dead{Incarnation: r.Incarnation, Node: r.Name}
m.deadNode(&d)
}
}
}
There are three states:
StateAlive: handled by aliveNode
StateSuspect: handled by suspectNode
StateDead: handled by deadNode
These three handlers are examined later; a sketch of the underlying types follows.
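For orientation before reading those handlers, the node bookkeeping types look roughly like the following. This is a sketch inferred from the fields used throughout state.go in this article, not a verbatim copy of the definitions at commit fe04265.

// Possible node states (sketch; the concrete constant values are assumptions).
const (
    StateAlive = iota
    StateSuspect
    StateDead
)

// Node is the externally visible identity of a member (sketch).
type Node struct {
    Name string // node name (FQDN)
    Addr []byte // node address (assumed representation)
}

// NodeState is the per-member record kept in m.nodes / m.nodeMap (sketch).
type NodeState struct {
    Node
    Incarnation uint32    // last known incarnation number
    State       int       // StateAlive / StateSuspect / StateDead
    StateChange time.Time // when the state last changed
}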
TCP summary:
The TCP connection mainly handles synchronizing and updating node state information.
udp
udpListen
The code is straightforward: it keeps reading packets and hands them to handleCommand.
// udpListen listens for and handles incoming UDP packets
func (m *Memberlist) udpListen() {
mainBuf := make([]byte, udpBufSize)
var n int
var addr net.Addr
var err error
for {
// Reset buffer
buf := mainBuf[0:udpBufSize]
// Read a packet
//keep reading packets from the UDP listener
n, addr, err = m.udpListener.ReadFrom(buf)
if err != nil {
if m.shutdown {
break
}
log.Printf("[ERR] Error reading UDP packet: %s", err)
continue
}
// Check the length
if n < 1 {
log.Printf("[ERR] UDP packet too short (%d bytes). From: %s", len(buf), addr)
continue
}
// Handle the command
//the actual handling
m.handleCommand(buf[:n], addr)
}
}
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
// Decode the message type
//decode the message type
msgType := uint8(buf[0])
buf = buf[1:]
// Switch on the msgType
//dispatch on the message type
switch msgType {
case compoundMsg:
m.handleCompound(buf, from)
case pingMsg:
m.handlePing(buf, from)
case indirectPingMsg:
m.handleIndirectPing(buf, from)
case ackRespMsg:
m.handleAck(buf, from)
case suspectMsg:
m.handleSuspect(buf, from)
case aliveMsg:
m.handleAlive(buf, from)
case deadMsg:
m.handleDead(buf, from)
default:
log.Printf("[ERR] UDP msg type (%d) not supported. From: %s", msgType, from)
}
}
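The message-type constants that handleCommand switches on are presumably declared along these lines; this is a sketch, and the exact values and ordering in net.go at this commit are assumptions.

// Message types: the first byte of every UDP packet (and of the TCP
// push/pull stream) carries one of these values (sketch).
const (
    pingMsg = iota
    indirectPingMsg
    ackRespMsg
    suspectMsg
    aliveMsg
    deadMsg
    pushPullMsg
    compoundMsg
)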
The message types are:
compoundMsg: handled by handleCompound
Multiple messages aggregated into one; it splits them apart and feeds each part back into handleCommand.
func (m *Memberlist) handleCompound(buf []byte, from net.Addr) {
// Decode the parts
//split the compound message
trunc, parts, err := decodeCompoundMessage(buf)
if err != nil {
log.Printf("[ERR] Failed to decode compound request: %s", err)
return
}
// Log any truncation
if trunc > 0 {
log.Printf("[WARN] Compound request had %d truncated messages", trunc)
}
// Handle each message
for _, part := range parts {
//feed each split message back into handleCommand
m.handleCommand(part, from)
}
}
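The compound format that decodeCompoundMessage unpacks is, as far as I can tell, one byte for the number of parts, a uint16 length per part, then the message bodies back to back. Below is a hedged sketch of an encoder for that layout; makeCompoundMessage in net.go should do essentially this, but the integer widths and byte order are assumptions.

// makeCompoundMessageSketch: [compoundMsg][#parts uint8][len_i uint16 ...][part_0][part_1]...
func makeCompoundMessageSketch(msgs []*bytes.Buffer) *bytes.Buffer {
    buf := bytes.NewBuffer(nil)
    buf.WriteByte(uint8(compoundMsg)) // overall message type
    buf.WriteByte(uint8(len(msgs)))   // number of embedded parts
    for _, m := range msgs {
        binary.Write(buf, binary.BigEndian, uint16(m.Len())) // one length per part
    }
    for _, m := range msgs {
        buf.Write(m.Bytes()) // the message bodies, back to back
    }
    return buf
}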
pingMsg: handled by handlePing
indirectPingMsg: handled by handleIndirectPing
ackRespMsg: handled by handleAck
These three handlers are all fairly simple; a rough sketch follows.
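A hedged sketch of two of them, reconstructed from how probeNode uses the ack machinery rather than copied from net.go; the ackResp struct name and the ackHandler field names are assumptions. handlePing answers with an ack carrying the same sequence number, and handleAck fires the handler that setAckChannel registered for that sequence number.

func (m *Memberlist) handlePingSketch(buf []byte, from net.Addr) {
    var p ping
    if err := decode(buf, &p); err != nil {
        log.Printf("[ERR] Failed to decode ping request: %s", err)
        return
    }
    // Answer the sender with an ack that echoes the sequence number
    // (assuming encodeAndSendMsg accepts a net.Addr destination).
    ack := ackResp{SeqNo: p.SeqNo}
    if err := m.encodeAndSendMsg(from, ackRespMsg, &ack); err != nil {
        log.Printf("[ERR] Failed to send ack: %s", err)
    }
}

func (m *Memberlist) handleAckSketch(buf []byte, from net.Addr) {
    var ack ackResp
    if err := decode(buf, &ack); err != nil {
        log.Printf("[ERR] Failed to decode ack response: %s", err)
        return
    }
    // Look up and fire the handler registered for this sequence number;
    // it pushes true into the ackCh that probeNode is waiting on.
    m.ackLock.Lock()
    ah, ok := m.ackHandlers[ack.SeqNo]
    delete(m.ackHandlers, ack.SeqNo)
    m.ackLock.Unlock()
    if ok {
        if ah.timer != nil { // assumed field names on ackHandler
            ah.timer.Stop()
        }
        ah.handler()
    }
}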
suspectMsg: handled by handleSuspect
which calls suspectNode
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
var sus suspect
if err := decode(buf, &sus); err != nil {
log.Printf("[ERR] Failed to decode suspect message: %s", err)
return
}
m.suspectNode(&sus)
}
aliveMsg: handled by handleAlive
which calls aliveNode
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
var live alive
if err := decode(buf, &live); err != nil {
log.Printf("[ERR] Failed to decode alive message: %s", err)
return
}
m.aliveNode(&live)
}
deadMsg: handled by handleDead
which calls deadNode
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
var d dead
if err := decode(buf, &d); err != nil {
log.Printf("[ERR] Failed to decode dead message: %s", err)
return
}
m.deadNode(&d)
}
UDP summary:
The UDP service provides the basic command operations.
github.com/hashicorp/memberlist/state.go
Node state management
In github.com/hashicorp/memberlist/memberlist.go, Create ends by calling schedule:
// Schedule is used to ensure the Tick is performed periodically
func (m *Memberlist) schedule() {
m.tickerLock.Lock()
defer m.tickerLock.Unlock()
// Create a new probeTicker
//start the probe goroutine
if m.config.ProbeInterval > 0 {
t := time.NewTicker(m.config.ProbeInterval)
go m.triggerFunc(t.C, m.probe)
m.tickers = append(m.tickers, t)
}
// Create a push pull ticker if needed
//start the push/pull goroutine
if m.config.PushPullInterval > 0 {
t := time.NewTicker(m.config.PushPullInterval)
go m.triggerFunc(t.C, m.pushPull)
m.tickers = append(m.tickers, t)
}
// Create a gossip ticker if needed
//start the gossip goroutine
if m.config.GossipNodes > 0 {
t := time.NewTicker(m.config.GossipInterval)
go m.triggerFunc(t.C, m.gossip)
m.tickers = append(m.tickers, t)
}
}
This starts three periodic tasks in total:
probe, pushPull, and gossip. Each ticker is wired to its handler through triggerFunc, sketched below.
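triggerFunc itself is not shown in the article. Under the assumption that it simply bridges a ticker channel to its handler and honors stopTick, a minimal sketch looks like this:

// Sketch of triggerFunc: run f on every tick until stopTick fires.
func (m *Memberlist) triggerFuncSketch(c <-chan time.Time, f func()) {
    for {
        select {
        case <-c:
            f()
        case <-m.stopTick:
            return
        }
    }
}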
probe
After a node starts, it periodically picks one node and sends it a PING message. If the PING fails, it randomly selects IndirectChecks nodes and asks them to PING the target on its behalf (later versions additionally fall back to a direct TCP ping). A node that receives an indirect-PING request sends a PING to the address in the request and relays the result back to the originating node. If, within the probe timeout, this node receives no ACK at all for the probed node, the probed node is marked as suspect.
https://www.colabug.com/1010287.html
// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
// Track the number of indexes we've considered probing
numCheck := 0
START:
// Make sure we don't wrap around infinitely
if numCheck >= len(m.nodes) {
return
}
// Handle the wrap around case
//probeIndex indexes into the node list; probing cycles through it
if m.probeIndex >= len(m.nodes) {
m.resetNodes()
m.probeIndex = 0
numCheck++
goto START
}
// Determine if we should probe this node
skip := false
var node *NodeState
m.nodeLock.RLock()
node = m.nodes[m.probeIndex]
if node.Name == m.config.Name {
skip = true //the node is the local node itself (matches the configured name)
} else if node.State == StateDead {
skip = true //the node is already dead
}
// Potentially skip
m.nodeLock.RUnlock()
if skip { //skip ourselves and dead nodes
numCheck++
m.probeIndex++
goto START
}
// Probe the specific node
m.probeNode(node)
}
// probeNode handles a single round of failure checking on a node
func (m *Memberlist) probeNode(node *NodeState) {
// Send a ping to the node
ping := ping{SeqNo: m.nextSeqNo()}
destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}
// Setup an ack handler
ackCh := make(chan bool, m.config.IndirectChecks+1)
m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)
// Send the ping message
//send the pingMsg
if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
log.Printf("[ERR] Failed to send ping: %s", err)
return
}
// Wait for response or round-trip-time
select {
case v := <-ackCh:
if v == true {
return
}
case <-time.After(m.config.RTT):
}
// Get some random live nodes
m.nodeLock.RLock()
excludes := []string{m.config.Name, node.Name}
//pick some random nodes
kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
m.nodeLock.RUnlock()
// Attempt an indirect ping
ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr}
for _, peer := range kNodes {
destAddr := &net.UDPAddr{IP: peer.Addr, Port: m.config.UDPPort}
//send the indirectPingMsg
if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
log.Printf("[ERR] Failed to send indirect ping: %s", err)
}
}
// Wait for the acks or timeout
select {
case v := <-ackCh:
if v == true {
return
}
}
// No acks received from target, suspect
s := suspect{Incarnation: node.Incarnation, Node: node.Name}
//probing failed, so mark the node as suspect
m.suspectNode(&s)
}
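Note that the second select has no timeout case: it relies on setAckChannel having armed a timer that pushes false into ackCh when ProbeInterval expires, so the select always wakes up. Below is a hedged sketch of that mechanism; the ackHandler field names and the exact bookkeeping are assumptions, only the m.ackHandlers map is visible in the Memberlist struct shown earlier.

// Sketch of setAckChannel: register a handler for seqNo that sends true on
// an ack, and a timer that deregisters it and sends false on timeout.
func (m *Memberlist) setAckChannelSketch(seqNo uint32, ch chan bool, timeout time.Duration) {
    ah := &ackHandler{
        handler: func() { // assumed field name; invoked by handleAck
            select {
            case ch <- true:
            default:
            }
        },
    }
    m.ackLock.Lock()
    m.ackHandlers[seqNo] = ah
    m.ackLock.Unlock()

    // On timeout, remove the handler and report failure without blocking,
    // which is what lets probeNode's second select terminate.
    ah.timer = time.AfterFunc(timeout, func() { // assumed field name
        m.ackLock.Lock()
        delete(m.ackHandlers, seqNo)
        m.ackLock.Unlock()
        select {
        case ch <- false:
        default:
        }
    })
}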
pushpull
At each interval, the node randomly picks one member, opens a TCP connection to it, and sends over all of its node states and user data; the peer sends back all the node states and user data it holds, and the two data sets are merged. This speeds up convergence of information within the cluster.
https://www.jianshu.com/p/e2173b44db65
// pushPull is invoked periodically to randomly perform a state
// exchange. Used to ensure a high level of convergence.
func (m *Memberlist) pushPull() {
// Get a random live node
m.nodeLock.RLock()
excludes := []string{m.config.Name}
//randomly pick one node
nodes := kRandomNodes(1, excludes, m.nodes)
m.nodeLock.RUnlock()
// If no nodes, bail
if len(nodes) == 0 {
return
}
node := nodes[0]
// Attempt a push pull
//call pushPullNode
if err := m.pushPullNode(node.Addr); err != nil {
log.Printf("[ERR] Push/Pull with %s failed: %s", node.Name, err)
}
}
The code above picks one node at random; pushPullNode then does the actual exchange.
// pushPullNode is invoked to do a state exchange with
// a given node
func (m *Memberlist) pushPullNode(addr []byte) error {
// Attempt to send and receive with the node
//send our state and receive the remote state
remote, err := m.sendAndReceiveState(addr)
if err != nil {
return err
}
// Merge the state
//merge the remote state into the local state
m.mergeState(remote)
return nil
}
// sendState is used to initiate a push/pull over TCP with a remote node
func (m *Memberlist) sendAndReceiveState(addr []byte) ([]pushNodeState, error) {
// Attempt to connect
//dial a TCP client connection
dialer := net.Dialer{Timeout: m.config.TCPTimeout}
dest := net.TCPAddr{IP: addr, Port: m.config.TCPPort}
conn, err := dialer.Dial("tcp", dest.String())
if err != nil {
return nil, err
}
defer conn.Close()
// Send our state
//send the local node states
if err := m.sendLocalState(conn); err != nil {
return nil, err
}
// Read remote state
//read the remote node states and return them
remote, err := readRemoteState(conn)
if err != nil {
return nil, err
}
// Return the remote state
return remote, nil
}
gossip
Over UDP, the node sends messages to k nodes, pulling the messages from the broadcast queue; a message is dropped from the queue once it has been transmitted more than a certain number of times (see the RetransmitMult comment in Config).
https://www.jianshu.com/p/e2173b44db65
// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
// Get some random live nodes
m.nodeLock.RLock()
excludes := []string{m.config.Name}
//randomly pick GossipNodes (a config option) nodes
kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
m.nodeLock.RUnlock()
// Compute the bytes available
bytesAvail := udpSendBuf - compoundHeaderOverhead
for _, node := range kNodes {
// Get any pending broadcasts
//fetch pending broadcasts that fit in the remaining space
msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
return
}
// Create a compound message
//build a compound message
compound := makeCompoundMessage(msgs)
// Send the compound message
destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}
//send it
if err := m.rawSendMsg(destAddr, compound); err != nil {
log.Printf("[ERR] Failed to send gossip to %s: %s", destAddr, err)
}
}
}
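kRandomNodes, used by probe, pushPull and gossip above, is not shown in the article. Here is a hedged sketch of its selection logic; the retry bound and exclusion rules are assumptions based on how the callers use it.

// Sketch of kRandomNodes: pick up to k random alive nodes, skipping the
// excluded names and duplicates; fewer than k may be returned.
func kRandomNodesSketch(k int, excludes []string, nodes []*NodeState) []*NodeState {
    n := len(nodes)
    picked := make([]*NodeState, 0, k)
OUTER:
    for tries := 0; tries < 3*n && len(picked) < k; tries++ {
        node := nodes[randomOffset(n)] // randomOffset also appears in aliveNode
        for _, ex := range excludes {
            if node.Name == ex {
                continue OUTER
            }
        }
        if node.State != StateAlive {
            continue
        }
        for _, p := range picked {
            if p == node {
                continue OUTER
            }
        }
        picked = append(picked, node)
    }
    return picked
}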
Now for the three node states.
alive
Marks a node as alive.
aliveNode
// aliveNode is invoked by the network layer when we get a message
// about a live node
func (m *Memberlist) aliveNode(a *alive) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
//look up the node in the local node map
state, ok := m.nodeMap[a.Node]
// Check if we've never seen this node before
if !ok {
//the following adds a new node
state = &NodeState{
Node: Node{
Name: a.Node,
Addr: a.Addr,
},
State: StateDead,
}
// Add to map
m.nodeMap[a.Node] = state
// Get a random offset. This is important to ensure
// the failure detection bound is low on average. If all
// nodes did an append, failure detection bound would be
// very high.
n := len(m.nodes)
offset := randomOffset(n)
// Add at the end and swap with the node at the offset
m.nodes = append(m.nodes, state)
m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
}
// Bail if the incarnation number is old
//if the incoming incarnation is not newer than ours, return
if a.Incarnation <= state.Incarnation {
return
}
//the incarnation is newer, so re-broadcast the aliveMsg
// Re-Broadcast
m.encodeAndBroadcast(a.Node, aliveMsg, a)
// Update the state and incarnation number
oldState := state.State
state.Incarnation = a.Incarnation
if state.State != StateAlive {
state.State = StateAlive
state.StateChange = time.Now()
}
//if the state changed from dead to alive, notify JoinCh
// if Dead -> Alive, notify of join
if oldState == StateDead {
notify(m.config.JoinCh, &state.Node)
}
}
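notify is the small helper that delivers the node onto JoinCh / LeaveCh. A minimal sketch, assuming it is simply a non-blocking send on a possibly-nil channel:

// Sketch of notify: deliver n on ch if a channel was registered, never block.
func notifySketch(ch chan<- *Node, n *Node) {
    if ch == nil {
        return
    }
    select {
    case ch <- n:
    default: // receiver not keeping up; drop instead of blocking the state path
    }
}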
suspect
When probing a node fails, or when suspect information about a node is received, the local record is marked suspect, a timer is started, and a suspect broadcast is sent. If identical suspect messages arrive from other nodes during this period, the local confirmation count is incremented. When the timer expires, if the node is still not alive and the confirmation count meets the requirement, the node is marked dead. When this node receives a suspect message about itself, it sends an alive broadcast to clear the suspect mark held by the other nodes.
https://www.colabug.com/1010287.html
suspectNode
// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[s.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if s.Incarnation < state.Incarnation {
return
}
// Ignore non-alive nodes
if state.State != StateAlive {
return
}
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name {
inc := m.nextIncarnation()
a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
m.encodeAndBroadcast(s.Node, aliveMsg, a)
state.Incarnation = inc
return // Do not mark ourself suspect
} else {
m.encodeAndBroadcast(s.Node, suspectMsg, s)
}
// Update the state
state.Incarnation = s.Incarnation
state.State = StateSuspect
changeTime := time.Now()
state.StateChange = changeTime
// Setup a timeout for this
timeout := suspicionTimeout(m.config.SuspicionMult, len(m.nodes), m.config.ProbeInterval)
time.AfterFunc(timeout, func() {
if state.State == StateSuspect && state.StateChange == changeTime {
m.suspectTimeout(state)
}
})
}
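suspicionTimeout scales with cluster size, matching the Config comment "Suspicion time = SuspicionMult * log(N+1) * Interval". A hedged sketch follows; the base of the logarithm and the rounding are assumptions.

// Sketch of suspicionTimeout: larger clusters get proportionally more time
// before a suspect node is declared dead.
func suspicionTimeoutSketch(suspicionMult, n int, interval time.Duration) time.Duration {
    nodeScale := math.Ceil(math.Log10(float64(n + 1)))
    return time.Duration(suspicionMult) * time.Duration(nodeScale) * interval
}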
dead
When this node leaves the cluster, or when another node it probes times out and is declared dead, a dead broadcast for that node is sent to the cluster. A node receiving a dead broadcast compares it with its local record: if the local record is already dead, the message is ignored; otherwise it updates the local record and re-broadcasts the dead message, propagating it further. If a node receives a dead broadcast about itself, it means this node has been partitioned from the others, so it sends an alive broadcast to correct the data the other nodes hold about it.
https://www.colabug.com/1010287.html
deadNode
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
m.nodeLock.Lock()
defer m.nodeLock.Unlock()
state, ok := m.nodeMap[d.Node]
// If we've never heard about this node before, ignore it
if !ok {
return
}
// Ignore old incarnation numbers
if d.Incarnation < state.Incarnation {
return
}
// Ignore if node is already dead
if state.State == StateDead {
return
}
// If this is us we need to refute, otherwise re-broadcast
if state.Name == m.config.Name && !m.leave {
inc := m.nextIncarnation()
a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
m.encodeAndBroadcast(d.Node, aliveMsg, a)
state.Incarnation = inc
return // Do not mark ourself dead
} else {
m.encodeAndBroadcast(d.Node, deadMsg, d)
}
// Update the state
state.Incarnation = d.Incarnation
state.State = StateDead
state.StateChange = time.Now()
// Notify of death
//notify LeaveCh of the dead node
notify(m.config.LeaveCh, &state.Node)
}
github.com/hashicorp/memberlist/broadcast.go
broadcast.go is the broadcast module.
It provides three functions; the most important one is
getBroadcasts
It returns the queued broadcasts that fit within a maximum byte size and is used to fill (piggyback onto) a UDP packet. The code is simple:
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) []*bytes.Buffer {
m.broadcastLock.Lock()
defer m.broadcastLock.Unlock()
transmitLimit := retransmitLimit(m.config.RetransmitMult, len(m.nodes))
bytesUsed := 0
var toSend []*bytes.Buffer
for i := len(m.bcQueue) - 1; i >= 0; i-- {
// Check if this is within our limits
b := m.bcQueue[i]
if bytesUsed+overhead+b.msg.Len() > limit {
continue
}
// Add to slice to send
bytesUsed += overhead + b.msg.Len()
toSend = append(toSend, b.msg)
// Check if we should stop transmission
b.transmits++
if b.transmits >= transmitLimit {
n := len(m.bcQueue)
m.bcQueue[i], m.bcQueue[n-1] = m.bcQueue[n-1], nil
m.bcQueue = m.bcQueue[:n-1]
}
}
// If we are sending anything, we need to re-sort to deal
// with adjusted transmit counts
if len(toSend) > 0 {
m.bcQueue.Sort()
}
return toSend
}
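retransmitLimit caps how many times each queued broadcast is re-sent before it is dropped from bcQueue, matching the Config comment "Retransmits = RetransmitMult * log(N+1)". A hedged sketch; the logarithm base and rounding are assumptions.

// Sketch of retransmitLimit: more known nodes means each broadcast is
// retransmitted more times before being dropped.
func retransmitLimitSketch(retransmitMult, n int) int {
    nodeScale := int(math.Ceil(math.Log10(float64(n + 1))))
    return retransmitMult * nodeScale
}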
Summary:
This version of the code is fairly compact, with a clear separation into functional modules, but it does not yet expose an interface for users. In later versions the module code was adjusted and an interface was abstracted out for callers.
The use of the Delegate interface will be covered with examples in a future article.
龚浩华 (月牙寂道长)
QQ: 29185807
June 13, 2019