6.824 Lab 2: Raft
Introduction
Lab 2 implements a replicated state machine based on the paper In Search of an Understandable Consensus Algorithm (Extended Version), which then serves as the basis for a fault-tolerant distributed key/value store.
Approach
Lab 2 is divided into three parts, A, B, and C:
- Lab 2A covers leader election: electing a leader from among the servers and having the elected leader periodically send heartbeats to its followers;
- Lab 2B covers appending, committing, and applying log entries;
- Lab 2C covers persisting Raft state and recovering it after a server crash.
Implementation
package raft
//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
// create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
// start agreement on a new log entry
// rf.GetState() (Term, isLeader)
// ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
// each time a new entry is committed to the log, each Raft peer
// should send an ApplyMsg to the service (or tester)
// in the same server.
//
import (
"bytes"
"log"
"math/rand"
"sync"
"time"
)
import "sync/atomic"
import "../labrpc"
import "../labgob"
//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in Lab 3 you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh; at that point you can add fields to
// ApplyMsg, but set CommandValid to false for these other uses.
//
type ApplyMsg struct {
CommandValid bool
Command interface{}
CommandIndex int
}
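// Entry is a single log entry: a command for the state machine and the term in which the leader received it.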
type Entry struct {
Command interface{}
Term int
}
const (
Follower = iota
Leader
Candidate
)
// time-related constants, in milliseconds
const (
FixedTimeout = 210
RandomTimeout = 500
CheckTimeoutPeriod = 10
HeartbeatPeriod = 100
)
//
// A Go object implementing a single Raft peer.
//
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (2A, 2B, 2C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
// persistent state on all servers:
currentTerm int // latest Term server has seen (initialized to 0 on first boot, increases monotonically)
votedFor int // CandidateId that received vote in current Term (or null if none)
log []Entry // log entries; each entry contains command for state machine, and Term when entry was received by leader (first index is 1)
// volatile state on all servers:
commitIndex int // index of highest log entry known to be committed (initialized to 0, increases monotonically)
lastApplied int // index of highest log entry applied to state machine (initialized to 0, increases monotonically)
state int // each server is in one of three states: Follower/Leader/Candidate
timer Timer // election timer; when it expires, the server converts to candidate and kicks off an election
applyCh chan ApplyMsg // channel on which to send messages to the application
newApplicable *sync.Cond // condition variable used to wake the goroutine that applies committed entries
// volatile state on leaders:
nextIndex []int // for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)
}
type AppendEntriesArgs struct {
Term int // leader’s term
LeaderId int // so follower can redirect clients
PrevLogIndex int // index of log entry immediately preceding new ones
PrevLogTerm int // term of prevLogIndex entry
Entries []Entry // log entries to store (empty for heartbeat; may send more than one for efficiency)
LeaderCommit int // leader’s commitIndex
}
type AppendEntriesReply struct {
Term int // currentTerm, for leader to update itself
Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm
// additional information needed to back up faster
ConflictTerm int // term of the conflicting entry
FirstConflictTermIndex int // index of the first entry of conflicting term
LengthLog int // length of log
}
type RequestVoteArgs struct {
Term int // candidate’s term
CandidateId int // candidate requesting vote
LastLogIndex int // index of candidate’s last log entry
LastLogTerm int // Term of candidate’s last log entry
}
type RequestVoteReply struct {
Term int // currentTerm, for candidate to update itself
VoteGranted bool // true means candidate received vote
}
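// Timer implements the randomized election timeout: reset() picks a new random timeout and records the start time, and isExpired() reports whether that timeout has elapsed since the last reset.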
type Timer struct {
startTime time.Time
timeout time.Duration
r *rand.Rand
}
func (t *Timer) isExpired() bool {
return time.Since(t.startTime) > t.timeout
}
func (t *Timer) reset() {
t.timeout = FixedTimeout*time.Millisecond +
time.Duration(t.r.Int63n(RandomTimeout))*time.Millisecond
t.startTime = time.Now()
}
// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
var term int
var isLeader bool
rf.mu.Lock()
defer rf.mu.Unlock()
term = rf.currentTerm
isLeader = rf.state == Leader
return term, isLeader
}
// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
w := new(bytes.Buffer)
e := labgob.NewEncoder(w)
err := e.Encode(rf.currentTerm)
if err != nil {
log.Fatalf("[%d] fails to encode currentTerm", &rf.me)
}
err = e.Encode(rf.votedFor)
if err != nil {
log.Fatalf("[%d] fails to encode votedFor", &rf.me)
}
//DPrintf("[%d] encoding log to persist(%v)", rf.me, rf.log)
err = e.Encode(rf.log)
if err != nil {
log.Fatalf("[%d] fails to encode log", &rf.me)
}
data := w.Bytes()
rf.persister.SaveRaftState(data)
//DPrintf("[%d] saves persistent state", rf.me)
}
// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
if data == nil || len(data) < 1 { // bootstrap without any state?
return
}
rf.mu.Lock()
defer rf.mu.Unlock()
r := bytes.NewBuffer(data)
d := labgob.NewDecoder(r)
var currentTerm, votedFor int
var persistedLog []Entry
if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&persistedLog) != nil {
// fail to read persistent data
//DPrintf("[%d] fail to read persistent data", rf.me)
} else {
rf.currentTerm = currentTerm
rf.votedFor = votedFor
rf.log = persistedLog
}
}
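// RequestVote RPC handler: a candidate asks this server to vote for it in the candidate's term.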
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.VoteGranted = false
reply.Term = rf.currentTerm
// 1. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
//DPrintf("[%d] refuse to vote for %d", rf.me, args.CandidateId)
//DPrintf("votedFor: %d, CandidateId: %d", rf.votedFor, args.CandidateId)
return
}
// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
// this server MUST NOT immediately become a follower of this candidate;
// we still need to check condition 2 below to decide whether to grant the vote
if args.Term > rf.currentTerm {
rf.convertToFollower(args.Term)
}
// 2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
// Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
// If the logs have last entries with different terms, then the log with the later term is more up-to-date.
// If the logs end with the same term, then whichever log is longer is more up-to-date.
if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
(args.LastLogTerm > rf.getLogTerm(len(rf.log)) ||
args.LastLogTerm == rf.getLogTerm(len(rf.log)) && args.LastLogIndex >= len(rf.log)) {
rf.state = Follower
rf.timer.reset()
rf.votedFor = args.CandidateId
rf.persist()
reply.VoteGranted = true
}
}
// return term of the given index
// log index starts from 1
func (rf *Raft) getLogTerm(index int) int {
// return -1 if index is out of lower bound
if index-1 < 0 {
return -1
}
return rf.log[index-1].Term
}
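// AppendEntries RPC handler: the leader uses this RPC both as a heartbeat and to replicate log entries.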
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
rf.mu.Lock()
defer rf.mu.Unlock()
reply.Term = rf.currentTerm
reply.Success = false
reply.LengthLog = -1
reply.ConflictTerm = -1
reply.FirstConflictTermIndex = -1
// 1. Reply false if term < currentTerm (§5.1)
if args.Term < rf.currentTerm {
return
}
// this is a legit leader because term >= currentTerm, therefore we reset timer
rf.timer.reset()
// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
// if this leader has a higher term, we adopt its term and convert to follower.
// we set votedFor = args.LeaderId, since this is a legitimate leader and we do not expect to vote for anyone else.
// if we instead set votedFor = -1, a server from a previous term could crash, reboot,
// and kick off an election at this term, and servers with votedFor = -1
// might vote for it even though they are already following this leader;
// that rebooted server could then become leader and overwrite committed entries.
if args.Term > rf.currentTerm {
rf.currentTerm = args.Term
rf.state = Follower
rf.votedFor = args.LeaderId
rf.persist()
} else {
rf.state = Follower
}
//DPrintf("[%d] receives AppendEntries RPC from %d, (currentTerm = %d, logLength = %d)", rf.me, args.LeaderId, rf.currentTerm, len(rf.log))
// 2. Reply false if log does not contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
if len(rf.log) < args.PrevLogIndex || rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
reply.LengthLog = len(rf.log)
if len(rf.log) >= args.PrevLogIndex {
reply.ConflictTerm = rf.getLogTerm(args.PrevLogIndex)
reply.FirstConflictTermIndex = rf.findFirstIndex(reply.ConflictTerm)
}
return
}
// 3. If an existing entry conflicts with a new one (same index but different terms),
// delete the existing entry and all that follow it (§5.3)
// do not truncate its entries if they have the same term
// check entries until they don't match
index := 0
for index < len(args.Entries) && args.PrevLogIndex+index < len(rf.log) {
if rf.log[args.PrevLogIndex+index].Term != args.Entries[index].Term {
break
}
index++
}
args.Entries = args.Entries[index:]
// 4. Append any new entries not already in the log
// truncate rf.log only when new entries present, prevent it from truncating valid entries
if len(args.Entries) > 0 {
//DPrintf("[%d] truncates its log from len = %d to len = %d", rf.me, len(rf.log), args.PrevLogIndex+index)
rf.log = rf.log[:args.PrevLogIndex+index]
//for i := 0; i < len(args.Entries); i++ {
// DPrintf("[%d] appends entry(Command = %v, term = %d), preLogIndex = %d", rf.me, args.Entries[i].Command, args.Entries[i].Term, args.PrevLogIndex)
//}
rf.log = append(rf.log, args.Entries...)
rf.persist()
}
// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
//DPrintf("[%d] receives AppendEntryRPC from %d with leaderCommit=%d", rf.me, args.LeaderId, args.LeaderCommit)
if args.LeaderCommit > rf.commitIndex {
//DPrintf("[%d] updates its commitIndex from %d to %d", rf.me, rf.commitIndex, min(args.LeaderCommit, len(rf.log)))
rf.commitIndex = min(args.LeaderCommit, len(rf.log))
rf.newApplicable.Signal()
}
reply.Success = true
}
// find the first (1-based) log index whose entry has the given term
func (rf *Raft) findFirstIndex(term int) int {
left, right, res := 0, len(rf.log)-1, -1
for left <= right {
mid := left + (right-left)/2
if rf.log[mid].Term < term {
left = mid + 1
} else if rf.log[mid].Term > term {
right = mid - 1
} else {
res = mid
right = mid - 1
}
}
return res + 1
}
// find the last (1-based) log index whose entry has the given term,
// or -1 if no entry with that term exists
func (rf *Raft) findLastIndex(term int) int {
left, right, res := 0, len(rf.log)-1, -1
for left <= right {
mid := left + (right-left)/2
if rf.log[mid].Term < term {
left = mid + 1
} else if rf.log[mid].Term > term {
right = mid - 1
} else {
res = mid
left = mid + 1
}
}
if res == -1 {
return -1
}
return res + 1
}
// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// Term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
index := -1
term := -1
rf.mu.Lock()
// return false if this is not the leader
if rf.state != Leader || rf.killed() {
rf.mu.Unlock()
return index, term, false
}
index = len(rf.log) + 1
term = rf.currentTerm
// append new entry to leader
e := Entry{Command: command, Term: rf.currentTerm}
rf.log = append(rf.log, e)
//DPrintf("[%d] appends new entry to log from start function.(command=%v, term=%v)", rf.me, e.Command, e.Term)
rf.persist()
rf.mu.Unlock()
rf.callAppendEntries()
return index, term, true
}
// caller should hold rf.mu while calling this function
func (rf *Raft) getMaxCommitIndex() int {
maxCommit := rf.commitIndex
next := rf.commitIndex + 1
for rf.canCommit(next) {
// to eliminate problems like figure 8
// Raft never commits log entries from previous terms by counting replicas.
// leader commits an entry from this term to implicitly commit entries from previous term
if rf.getLogTerm(next) == rf.currentTerm {
maxCommit = next
}
next++
}
//if maxCommit != rf.commitIndex {
// DPrintf("[%d] updates its commit index from %d to %d", rf.me, rf.commitIndex, maxCommit)
//}
return maxCommit
}
// check if entry[0...index] is allowed to be committed
func (rf *Raft) canCommit(index int) bool {
if index > len(rf.log) {
return false
}
// count servers that have log[index]
count := 1
for i, n := range rf.matchIndex {
if i == rf.me {
continue
}
if n >= index {
count++
}
}
return count > len(rf.peers)/2
}
// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
atomic.StoreInt32(&rf.dead, 1)
// Your code here, if desired.
}
func (rf *Raft) killed() bool {
z := atomic.LoadInt32(&rf.dead)
return z == 1
}
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
persister *Persister, applyCh chan ApplyMsg) *Raft {
rf := &Raft{}
rf.peers = peers
rf.persister = persister
rf.me = me
rf.applyCh = applyCh
rf.commitIndex = 0
rf.lastApplied = 0
rf.newApplicable = sync.NewCond(&rf.mu)
rf.timer = Timer{startTime: time.Now(), r: rand.New(rand.NewSource(int64(me + 1)))}
rf.state = Follower
rf.timer.reset()
// initialize from state persisted before a crash
rf.readPersist(persister.ReadRaftState())
//DPrintf("[%d] readPersist: currentTerm = %d, votedFor = %d, log = %v", rf.me, rf.currentTerm, rf.votedFor, rf.log)
// periodically check if it hits timeout
go rf.periodicTimeout()
// check if there's any newly committed entry to apply
go rf.applyCommittedEntries()
return rf
}
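// thin RPC wrappers around labrpc.ClientEnd.Call; they return false if no reply was received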
func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
return ok
}
func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
return ok
}
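// periodicTimeout loops until the server is killed, checking every CheckTimeoutPeriod ms whether the election timer has expired; a non-leader whose timer has expired kicks off an election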
func (rf *Raft) periodicTimeout() {
for !rf.killed() {
rf.mu.Lock()
// not a leader when timer expires, convert to candidate and kick off an election
if rf.state != Leader && rf.timer.isExpired() {
go rf.kickOffElection()
}
rf.mu.Unlock()
time.Sleep(CheckTimeoutPeriod * time.Millisecond)
}
}
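// periodicHeartbeat loops while this server is the leader, calling callAppendEntries every HeartbeatPeriod ms; it exits as soon as the server is no longer the leader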
func (rf *Raft) periodicHeartbeat() {
for !rf.killed() {
rf.mu.Lock()
if rf.state == Leader {
rf.mu.Unlock()
rf.callAppendEntries()
} else {
// not a leader anymore, exit this goroutine
rf.mu.Unlock()
return
}
time.Sleep(HeartbeatPeriod * time.Millisecond)
}
}
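// kickOffElection converts this server to candidate, increments currentTerm, votes for itself, and requests votes from all other peers in parallel; it becomes leader once a majority of votes are granted in the same term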
func (rf *Raft) kickOffElection() {
rf.mu.Lock()
rf.timer.reset()
rf.state = Candidate
rf.currentTerm++
rf.votedFor = rf.me
rf.persist()
//DPrintf("[%d] is kicking off an election at term %d", rf.me, rf.currentTerm)
// record rf.currentTerm because we need to release the lock while calling RequestVote RPC
term := rf.currentTerm
voteCount := 1
done := false
rf.mu.Unlock()
for i := range rf.peers {
if i == rf.me {
continue
}
go func(server int) {
voteGranted := rf.CallRequestVote(server, term)
if !voteGranted {
return
}
// receive vote from server
rf.mu.Lock()
voteCount++
//DPrintf("[%d] gets vote from %d", rf.me, server)
// return if another goroutine already completed the election, or if there are not yet enough votes
if done || voteCount <= len(rf.peers)/2 {
rf.mu.Unlock()
return
}
// get enough votes, become a leader
done = true
// double check if it's still in the term when the election started
if rf.currentTerm == term {
//DPrintf("[%d] gets enough votes and now becomes the leader (currentTerm = %d, voteCount = %d, numPeer = %d)", rf.me, rf.currentTerm, voteCount, len(rf.peers))
// we should not set votedFor = -1, since a leader should not vote for anyone else in his term
rf.initializeLeader()
rf.mu.Unlock()
// start a go routine to send out heartbeat periodically
go rf.periodicHeartbeat()
} else {
rf.mu.Unlock()
}
}(i)
}
}
// caller should hold rf.mu while calling this function
func (rf *Raft) initializeLeader() {
rf.state = Leader
// nextIndex is initialized to the index right after the last log entry
rf.nextIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.nextIndex); i++ {
rf.nextIndex[i] = len(rf.log) + 1
}
// match index is initialized to 0
rf.matchIndex = make([]int, len(rf.peers))
for i := 0; i < len(rf.matchIndex); i++ {
rf.matchIndex[i] = 0
}
}
// since sending on the applyCh can block, it must be done in a single goroutine.
// this goroutine shouldn't hold any lock while sending message into rf.applyCh
func (rf *Raft) applyCommittedEntries() {
for !rf.killed() {
rf.newApplicable.L.Lock()
// check if there is any new applicable entry
for rf.lastApplied >= rf.commitIndex {
rf.newApplicable.Wait()
}
var messages []ApplyMsg
// apply committed entries
for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
msg := ApplyMsg{
CommandValid: true,
Command: rf.log[i-1].Command,
CommandIndex: i,
}
messages = append(messages, msg)
}
rf.newApplicable.L.Unlock()
for _, m := range messages {
rf.applyCh <- m
rf.mu.Lock()
rf.lastApplied++
//DPrintf("[%d] applies entry[%d].(Command=%v)", rf.me, m.CommandIndex, m.Command)
rf.mu.Unlock()
}
}
}
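// callAppendEntries first advances the leader's commitIndex if a majority has replicated new entries, then sends an AppendEntries RPC (heartbeat or log replication) to every other peer in parallel and uses the replies to update nextIndex/matchIndex or to back up nextIndex on conflicts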
func (rf *Raft) callAppendEntries() {
rf.mu.Lock()
// update leader's commitIndex every time before sending heartbeat
commitIndex := rf.getMaxCommitIndex()
// newly committed entries: wake the applyCommittedEntries goroutine to apply these entries
if commitIndex != rf.commitIndex {
rf.commitIndex = commitIndex
rf.newApplicable.Signal()
}
me := rf.me
rf.mu.Unlock()
for i := 0; i < len(rf.peers); i++ {
if i == me {
continue
}
rf.mu.Lock()
args := AppendEntriesArgs{
Term: rf.currentTerm,
LeaderId: rf.me,
PrevLogIndex: rf.nextIndex[i] - 1,
PrevLogTerm: rf.getLogTerm(rf.nextIndex[i] - 1),
Entries: rf.log[rf.nextIndex[i]-1:],
LeaderCommit: rf.commitIndex,
}
reply := AppendEntriesReply{}
rf.mu.Unlock()
go func(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) {
ok := rf.sendAppendEntries(server, args, reply)
if !ok {
//DPrintf("[%d] failed to contact %d", me, server)
return
}
rf.mu.Lock()
// a reply contains higher term, convert to follower
if reply.Term > rf.currentTerm {
rf.convertToFollower(reply.Term)
} else if !reply.Success && reply.LengthLog != -1 {
// back up strategy
// case 1: follower does not have any entry at args.PrevLogIndex,
// back up to follower's end of log
// s1 4
// s2 4 6 6 6
// case 2: leader has entry with the same term of follower's conflicting entry,
// back up to leader's last index of entry with the conflicting term
// s1 4 4 4
// s2 4 6 6 6
// case 3: leader does not have follower's conflicting term at all
// back up to follower's first index of entry with the conflicting term
// s1 4 5 5
// s2 4 6 6 6
if reply.LengthLog < args.PrevLogIndex {
rf.nextIndex[server] = reply.LengthLog + 1
} else if lastIndex := rf.findLastIndex(reply.ConflictTerm); lastIndex != -1 {
rf.nextIndex[server] = lastIndex + 1
} else {
// leader does not have entry with reply's term at all
rf.nextIndex[server] = reply.FirstConflictTermIndex
}
} else {
rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = rf.matchIndex[server] + 1
}
rf.mu.Unlock()
}(i, &args, &reply)
}
}
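// CallRequestVote sends a RequestVote RPC to a single peer and reports whether the vote was granted; if the reply carries a higher term, this server converts to follower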
func (rf *Raft) CallRequestVote(server int, term int) bool {
rf.mu.Lock()
//DPrintf("[%d] is sending request vote to %d", rf.me, server)
args := RequestVoteArgs{
Term: term,
CandidateId: rf.me,
LastLogIndex: len(rf.log),
LastLogTerm: rf.getLogTerm(len(rf.log)),
}
rf.mu.Unlock()
var reply RequestVoteReply
// actually send a RPC
ok := rf.sendRequestVote(server, &args, &reply)
if !ok {
return false
}
rf.mu.Lock()
// a reply contains higher term, convert to follower
if rf.currentTerm < reply.Term {
rf.convertToFollower(reply.Term)
}
rf.mu.Unlock()
return reply.VoteGranted
}
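// min returns the smaller of two ints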
func min(a, b int) int {
if a < b {
return a
} else {
return b
}
}
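// convertToFollower adopts the given (higher) term, clears votedFor, resets the election timer, and persists the change.
// caller should hold rf.mu while calling this function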
func (rf *Raft) convertToFollower(term int) {
rf.currentTerm = term
rf.state = Follower
rf.votedFor = -1
rf.timer.reset()
rf.persist()
}
Notes
- Each candidate restarts its randomized election timeout at the start of an election, and it waits for that timeout to elapse before starting the next election.
- Use a different seed for each server when generating random numbers; otherwise all servers produce the same random sequence and repeatedly split the vote.
- You will need either a dedicated "applier" goroutine, or to lock around the applies, so that no other routine also detects that entries need to be applied and tries to apply them.
- Make sure that you check commitIndex > lastApplied either periodically, or after commitIndex is updated (i.e., after matchIndex is updated).
- nextIndex is a guess as to what prefix the leader shares with a given follower. When a leader has just been elected, nextIndex is set to the index right after the last log entry.
- matchIndex is used for safety. It is a conservative measurement of what prefix of the log the leader shares with a given follower. This is why matchIndex is initialized to "we agree on no prefix" (-1 with 0-indexed logs, 0 in the 1-indexed log used here), and is only updated when a follower positively acknowledges an AppendEntries RPC.
- The correct thing to do is update matchIndex to prevLogIndex + len(entries[]) from the arguments you originally sent in the RPC.
- commitIndex is volatile because Raft can figure out a correct value for it after a reboot using just the persistent state. Once a leader successfully gets a new log entry committed, it knows everything before that point is also committed. A follower that crashes and comes back up will be told the right commitIndex whenever the current leader sends it an AppendEntries RPC.
- Upon receiving an AppendEntries RPC: if the leader has a higher term, adopt its term, convert to follower, and set votedFor = args.LeaderId, since this is a legitimate leader. If we instead set votedFor = -1 and a server from a previous term crashes, reboots, and kicks off an election at this term, servers with votedFor = -1 may vote for it even though they are already following this leader; that rebooted server could then become leader and overwrite committed entries.
- You should only restart your election timer if (a consolidated sketch of this rule follows the list):
  - you get an AppendEntries RPC from the current leader (i.e., if the term in the AppendEntries arguments is outdated, you should not reset your timer);
  - you are starting an election; or
  - you grant a vote to another peer.
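The timer-reset rule in the last bullet is easy to get wrong because, in the code above, the resets are scattered across three call sites (granting a vote in RequestVote, accepting an AppendEntries whose term is at least currentTerm, and starting an election in kickOffElection). The helper below is only a sketch that consolidates the rule in one place; it is not part of the implementation above, and it assumes the Raft and Timer types defined there.

// resetTimerOn is NOT part of the implementation above; it is a hypothetical
// helper that gathers the three legitimate reasons to restart the election timer.
// caller should hold rf.mu while calling this function.
func (rf *Raft) resetTimerOn(event string, rpcTerm int) {
	switch event {
	case "grantVote", "startElection":
		// granting a vote to a peer, or starting our own election, always resets the timer
		rf.timer.reset()
	case "appendEntries":
		// only an AppendEntries from the current leader counts:
		// if the term carried by the RPC is outdated, do not reset the timer
		if rpcTerm >= rf.currentTerm {
			rf.timer.reset()
		}
	}
}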