ETCD 源码学习--Raft log 的实现(十)
在 ETCD 源码学习过程,不会讲解太多的源码知识,只讲解相关的实现机制,需要关注源码细节的朋友可以自行根据文章中的提示,找到相关源码进行学习。
Raft log 主要有两部分组成,一是已提交但未被上层模块处理的 unstable 消息,另一部分是已被上层处理的 stable 消息。
主要文件
/raft/log.go raft log 对 stable 和 unstable 的封装
/raft/log_unstable.log unstable 存储的实现
/raft/storage.log stable 存储的实现
log
log 主要维护几个主要状态来维护节点的 log 的信息,committed (已提交的 Index)、applied(已被上层应用的 Index)、storage(已被上层模块处理的entries消息) 和 unstable(已提交未处理的消息)
数据结构:
type raftLog struct {
storage Storage //持久化存储
unstable unstable //未持久化存储
committed uint64 //当前节点已提交的最大Index
applied uint64 //当前节点已应用的最大Index
logger Logger
maxNextEntsSize uint64
}
追加:
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) { //查看 index 的 term 与 logTerm 是否匹配·
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents) //查找 ents 中,index 与 term 冲突的位置。
switch {
case ci == 0: //没有,全部追加完成
case ci <= l.committed: //如果冲突的位置在已提交的位置之前,有问题
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
//在提交位置之后,将未冲突的追加
offset := index + 1
l.append(ents[ci-offset:]...)
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}
return 0, false
}
func (l *raftLog) append(ents ...pb.Entry) uint64 {
if len(ents) == 0 {
return l.lastIndex()
}
if after := ents[0].Index - 1; after < l.committed {
l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
}
l.unstable.truncateAndAppend(ents)
return l.lastIndex()
}
提交 log 数据
func (l *raftLog) commitTo(tocommit uint64) {
// never decrease commit
if l.committed < tocommit {
if l.lastIndex() < tocommit {
l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
}
l.committed = tocommit
}
}
func (l *raftLog) appliedTo(i uint64) {
if i == 0 {
return
}
if l.committed < i || i < l.applied {
l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
}
l.applied = i
}
func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
unstable
unstable 的日志追加主要在 ratf/raft.log 中的 appendEntry(Leader),以及 handleAppendEntries(Follower/Candidate)。
unstable 数据结构:
type unstable struct {
snapshot *pb.Snapshot //快照
entries []pb.Entry //消息
offset uint64 // 已被上层模块处理的最大的 entry.Index + 1。
logger Logger
}
截断与追加entry:
func (u *unstable) truncateAndAppend(ents []pb.Entry) {
//ents[0] 是这批 entries 的第一个消息,所以它的 Index 最小
after := ents[0].Index
switch {
//如图1
case after == u.offset+uint64(len(u.entries)):
u.entries = append(u.entries, ents...)
//如图2
case after <= u.offset:
u.offset = after
u.entries = ents
//其他取并集
default:
u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
u.entries = append(u.entries, ents...)
}
}
func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
u.mustCheckOutOfBounds(lo, hi)//判断lo,hi 是否超过entries范围
return u.entries[lo-u.offset : hi-u.offset]
}
图1:
图2:
commit log:
//i entry.index, t entry.term
func (u *unstable) stableTo(i, t uint64) {
gt, ok := u.maybeTerm(i) //尝试获取 i 的任期
if !ok {
return
}
if gt == t && i >= u.offset {
u.entries = u.entries[i+1-u.offset:] //缩减数组
u.offset = i + 1 //更新已 stable 的最大index
u.shrinkEntriesArray()
}
}
//缩减entries
func (u *unstable) shrinkEntriesArray() {
const lenMultiple = 2
if len(u.entries) == 0 {
u.entries = nil
} else if len(u.entries)*lenMultiple < cap(u.entries) {
newEntries := make([]pb.Entry, len(u.entries))
copy(newEntries, u.entries)
u.entries = newEntries
}
}
stable
数据结构:
type MemoryStorage struct {
sync.Mutex
hardState pb.HardState // Term、Vote、commit 等节点信息
snapshot pb.Snapshot //快照
ents []pb.Entry //entries
}
追加 log:
func (ms *MemoryStorage) Append(entries []pb.Entry) error {
...
first := ms.firstIndex()
last := entries[0].Index + uint64(len(entries)) - 1
...
if first > entries[0].Index {
entries = entries[first-entries[0].Index:]
}
offset := entries[0].Index - ms.ents[0].Index
//这三种情况与 unstable 相似,这里不在做解释
switch {
case uint64(len(ms.ents)) > offset:
ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
ms.ents = append(ms.ents, entries...)
case uint64(len(ms.ents)) == offset:
ms.ents = append(ms.ents, entries...)
default:
...
}
return nil
}
其他:
ApplySnapshot 申请快照
CreateSnapshot 创建快照
Compact 压缩entries
总结
1.Raft 中 log 模板主要包括两个部分的存储,一是未持久化的 log,另一个是已持久化的 log,并通过 committed 和 applied 还维护节点 Index 信息。
2.未持久化的 log (unstable log) 是客户端进行的提案,当前节点接收到了消息,只是暂时放到了一个 msgs 队列,但未进行处理,需要等待上层模块进行持久化和进行相应的处理。
3.持久化 log (stable) 是当前节点的上层应用对 msgs 队列中的消息进行了相关处理之后,存储到 stable 中,并且对 unstable log 进行 commit。所以一个 log 要么存储在 unstable 中, 要么存在的 stable 中。
PS:欢迎纠正
上一篇: C语言课程设计(房屋管理系统)