
6.824 Lab 2: Raft


Introduction

Lab 2 implements a replicated state machine based on the paper In Search of an Understandable Consensus Algorithm (Extended Version); it serves as the foundation for a fault-tolerant, distributed key/value storage system.

Approach

Lab 2 consists of three parts, A, B, and C:

  1. Lab 2A implements leader election: elect a single leader from among the servers and have the elected leader periodically send heartbeats to every follower;
  2. Lab 2B implements appending, committing, and applying log entries;
  3. Lab 2C implements persisting Raft state and restoring it after a server crashes and restarts.

Implementation

package raft

//
// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
//
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
//   start agreement on a new log entry
// rf.GetState() (Term, isLeader)
//   ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.
//

import (
	"bytes"
	"log"
	"math/rand"
	"sync"
	"time"
)
import "sync/atomic"
import "../labrpc"
import "../labgob"

//
// as each Raft peer becomes aware that successive log entries are
// committed, the peer should send an ApplyMsg to the service (or
// tester) on the same server, via the applyCh passed to Make(). set
// CommandValid to true to indicate that the ApplyMsg contains a newly
// committed log entry.
//
// in Lab 3 you'll want to send other kinds of messages (e.g.,
// snapshots) on the applyCh; at that point you can add fields to
// ApplyMsg, but set CommandValid to false for these other uses.
//
type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

type Entry struct {
	Command interface{}
	Term    int
}

const (
	Follower = iota
	Leader
	Candidate
)

// time-related constants, in millisecond
const (
	FixedTimeout       = 210
	RandomTimeout      = 500
	CheckTimeoutPeriod = 10
	HeartbeatPeriod    = 100
)

//
// A Go object implementing a single Raft peer.
//
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// persistent state on all servers:
	currentTerm int     // latest Term server has seen (initialized to 0 on first boot, increases monotonically)
	votedFor    int     //	CandidateId that received vote in current Term (or null if none)
	log         []Entry // log entries; each entry contains command for state machine, and Term when entry was received by leader (first index is 1)

	// volatile state on all servers:
	commitIndex   int // index of highest log entry known to be committed (initialized to 0, increases monotonically)
	lastApplied   int // index of highest log entry applied to state machine (initialized to 0, increases monotonically)
	state         int           // each server is in one of three states: Follower/Leader/Candidate
	timer         Timer         // election timer; when it expires, convert to candidate and kick off an election
	applyCh       chan ApplyMsg // channel for sending ApplyMsg to the service/application
	newApplicable *sync.Cond    // condition variable used to wake the goroutine that applies committed entries

	// volatile state on leaders:
	nextIndex  []int // for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)

}

type AppendEntriesArgs struct {
	Term         int     // leader’s term
	LeaderId     int     // so follower can redirect clients
	PrevLogIndex int     // index of log entry immediately preceding new ones
	PrevLogTerm  int     // term of prevLogIndex entry
	Entries      []Entry // log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int     // leader’s commitIndex
}

type AppendEntriesReply struct {
	Term    int  // currentTerm, for leader to update itself
	Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm

	// additional information needed to back up faster
	ConflictTerm           int // term of the conflicting entry
	FirstConflictTermIndex int // index of the first entry of conflicting term
	LengthLog              int // length of log
}

type RequestVoteArgs struct {
	Term         int // candidate’s term
	CandidateId  int // candidate requesting vote
	LastLogIndex int // index of candidate’s last log entry
	LastLogTerm  int // Term of candidate’s last log entry
}

type RequestVoteReply struct {
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means candidate received vote
}

type Timer struct {
	startTime time.Time
	timeout   time.Duration
	r         *rand.Rand
}

func (t *Timer) isExpired() bool {
	return time.Now().Sub(t.startTime) > t.timeout
}

func (t *Timer) reset() {
	t.timeout = FixedTimeout*time.Millisecond +
		time.Duration(t.r.Int63n(RandomTimeout))*time.Millisecond
	t.startTime = time.Now()
}

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

	var term int
	var isLeader bool

	rf.mu.Lock()
	defer rf.mu.Unlock()

	term = rf.currentTerm
	isLeader = rf.state == Leader
	return term, isLeader
}

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	err := e.Encode(rf.currentTerm)
	if err != nil {
		log.Fatalf("[%d] fails to encode currentTerm", &rf.me)
	}
	err = e.Encode(rf.votedFor)
	if err != nil {
		log.Fatalf("[%d] fails to encode votedFor", &rf.me)
	}
	//DPrintf("[%d] encoding log to persist(%v)", rf.me, rf.log)
	err = e.Encode(rf.log)
	if err != nil {
		log.Fatalf("[%d] fails to encode log", &rf.me)
	}
	data := w.Bytes()
	rf.persister.SaveRaftState(data)
	//DPrintf("[%d] saves persistent state", rf.me)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var currentTerm, votedFor int
	var persistedLog []Entry
	if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&persistedLog) != nil {
		// fail to read persistent data
		//DPrintf("[%d] fail to read persistent data", rf.me)
	} else {
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.log = persistedLog
	}
}

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	reply.VoteGranted = false
	reply.Term = rf.currentTerm

	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < rf.currentTerm {
		//DPrintf("[%d] refuse to vote for %d", rf.me, args.CandidateId)
		//DPrintf("votedFor: %d, CandidateId: %d", rf.votedFor, args.CandidateId)
		return
	}

	// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
	// this server MUST NOT immediately convert to follower of this candidate
	// we still need to check condition 2. to decide whether we vote for a server
	if args.Term > rf.currentTerm {
		rf.convertToFollower(args.Term)
	}

	// 2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
	// Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
	// If the logs have last entries with different terms, then the log with the later term is more up-to-date.
	// If the logs end with the same term, then whichever log is longer is more up-to-date.
	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
		(args.LastLogTerm > rf.getLogTerm(len(rf.log)) ||
			args.LastLogTerm == rf.getLogTerm(len(rf.log)) && args.LastLogIndex >= len(rf.log)) {
		rf.state = Follower
		rf.timer.reset()
		rf.votedFor = args.CandidateId
		rf.persist()
		reply.VoteGranted = true
	}
}

// return term of the given index
// log index starts from 1
func (rf *Raft) getLogTerm(index int) int {
	// return -1 if index is out of lower bound
	if index-1 < 0 {
		return -1
	}
	return rf.log[index-1].Term
}

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Term = rf.currentTerm
	reply.Success = false
	reply.LengthLog = -1
	reply.ConflictTerm = -1
	reply.FirstConflictTermIndex = -1

	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < rf.currentTerm {
		return
	}

	// this is a legit leader because term >= currentTerm, therefore we reset timer
	rf.timer.reset()

	// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
	// if this leader has a higher term, then we should adopt his term and convert to follower.
	// we should set votedFor = args.leaderId, since this is a legit leader and we do not expect to vote for anyone else.
	// if we set votedFor = -1, and then some server from previous term crashes,
	// and then reboot and kick off an election at this term, servers with votedFor = -1
	// may vote for them even though they are following this leader.
	// this reboot server may become a leader and overwrite committed entries.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.state = Follower
		rf.votedFor = args.LeaderId
		rf.persist()
	} else {
		rf.state = Follower
	}

	//DPrintf("[%d] receives AppendEntries RPC from %d, (currentTerm = %d, logLength = %d)", rf.me, args.LeaderId, rf.currentTerm, len(rf.log))

	// 2. Reply false if log does not contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
	if len(rf.log) < args.PrevLogIndex || rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
		reply.LengthLog = len(rf.log)
		if len(rf.log) >= args.PrevLogIndex {
			reply.ConflictTerm = rf.getLogTerm(args.PrevLogIndex)
			reply.FirstConflictTermIndex = rf.findFirstIndex(reply.ConflictTerm)
		}
		return
	}

	// 3. If an existing entry conflicts with a new one (same index but different terms),
	// delete the existing entry and all that follow it (§5.3)
	// do not truncate its entries if they have the same term
	// check entries until they don't match
	index := 0
	for index < len(args.Entries) && args.PrevLogIndex+index < len(rf.log) {
		if rf.log[args.PrevLogIndex+index].Term != args.Entries[index].Term {
			break
		}
		index++
	}
	args.Entries = args.Entries[index:]

	// 4. Append any new entries not already in the log
	// truncate rf.log only when new entries present, prevent it from truncating valid entries
	if len(args.Entries) > 0 {
		//DPrintf("[%d] truncates its log from len = %d to len = %d", rf.me, len(rf.log), args.PrevLogIndex+index)
		rf.log = rf.log[:args.PrevLogIndex+index]
		//for i := 0; i < len(args.Entries); i++ {
		//	DPrintf("[%d] appends entry(Command = %v, term = %d), preLogIndex = %d", rf.me, args.Entries[i].Command, args.Entries[i].Term, args.PrevLogIndex)
		//}
		rf.log = append(rf.log, args.Entries...)
		rf.persist()
	}

	// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
	//DPrintf("[%d] receives AppendEntryRPC from %d with leaderCommit=%d", rf.me, args.LeaderId, args.LeaderCommit)
	if args.LeaderCommit > rf.commitIndex {
		//DPrintf("[%d] updates its commitIndex from %d to %d", rf.me, rf.commitIndex, min(args.LeaderCommit, len(rf.log)))
		rf.commitIndex = min(args.LeaderCommit, len(rf.log))
		rf.newApplicable.Signal()
	}

	reply.Success = true
}

// find first index of the given term
func (rf *Raft) findFirstIndex(term int) int {
	left, right, res := 0, len(rf.log)-1, -1
	for left <= right {
		mid := left + (right-left)/2
		if rf.log[mid].Term < term {
			left = mid + 1
		} else if rf.log[mid].Term > term {
			right = mid - 1
		} else {
			res = mid
			right = mid - 1
		}
	}
	return res + 1
}

// find last index of the given term
func (rf *Raft) findLastIndex(term int) int {
	left, right, res := 0, len(rf.log)-1, -1
	for left <= right {
		mid := left + (right-left)/2
		if rf.log[mid].Term < term {
			left = mid + 1
		} else if rf.log[mid].Term > term {
			right = mid - 1
		} else {
			res = mid
			left = mid + 1
		}
	}
	return res + 1
}

// the service using Raft (e.g. a k/v server) wants to start
// agreement on the next command to be appended to Raft's log. if this
// server isn't the leader, returns false. otherwise start the
// agreement and return immediately. there is no guarantee that this
// command will ever be committed to the Raft log, since the leader
// may fail or lose an election. even if the Raft instance has been killed,
// this function should return gracefully.
//
// the first return value is the index that the command will appear at
// if it's ever committed. the second return value is the current
// Term. the third return value is true if this server believes it is
// the leader.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1

	rf.mu.Lock()

	// return false if this is not the leader
	if rf.state != Leader || rf.killed() {
		rf.mu.Unlock()
		return index, term, false
	}

	index = len(rf.log) + 1
	term = rf.currentTerm

	// append new entry to leader
	e := Entry{Command: command, Term: rf.currentTerm}
	rf.log = append(rf.log, e)
	//DPrintf("[%d] appends new entry to log from start function.(command=%v, term=%v)", rf.me, e.Command, e.Term)
	rf.persist()
	rf.mu.Unlock()

	rf.callAppendEntries()
	return index, term, true
}

// caller should hold rf.mu while calling this function
func (rf *Raft) getMaxCommitIndex() int {
	maxCommit := rf.commitIndex
	next := rf.commitIndex + 1
	for rf.canCommit(next) {
		// to eliminate problems like figure 8
		// Raft never commits log entries from previous terms by counting replicas.
		// leader commits an entry from this term to implicitly commit entries from previous term
		if rf.getLogTerm(next) == rf.currentTerm {
			maxCommit = next
		}
		next++
	}
	//if maxCommit != rf.commitIndex {
	//	DPrintf("[%d] updates its commit index from %d to %d", rf.me, rf.commitIndex, maxCommit)
	//}
	return maxCommit
}

// check if entry[0...index] is allowed to be committed
func (rf *Raft) canCommit(index int) bool {
	if index > len(rf.log) {
		return false
	}
	// count servers that have log[index]
	count := 1
	for i, n := range rf.matchIndex {
		if i == rf.me {
			continue
		}
		if n >= index {
			count++
		}
	}
	return count > len(rf.peers)/2
}

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.
}

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1
}

// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.applyCh = applyCh

	rf.commitIndex = 0
	rf.lastApplied = 0
	rf.newApplicable = sync.NewCond(&rf.mu)
	rf.timer = Timer{startTime: time.Now(), r: rand.New(rand.NewSource(int64(me + 1)))}
	rf.state = Follower
	rf.timer.reset()

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())
	//DPrintf("[%d] readPersist: currentTerm = %d, votedFor = %d, log = %v", rf.me, rf.currentTerm, rf.votedFor, rf.log)

	// periodically check if it hits timeout
	go rf.periodicTimeout()
	// check if there's any newly committed entry to apply
	go rf.applyCommittedEntries()

	return rf
}

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok
}

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok
}

func (rf *Raft) periodicTimeout() {
	for !rf.killed() {
		rf.mu.Lock()
		// not a leader when timer expires, convert to candidate and kick off an election
		if rf.state != Leader && rf.timer.isExpired() {
			go rf.kickOffElection()
		}
		rf.mu.Unlock()
		time.Sleep(CheckTimeoutPeriod * time.Millisecond)
	}
}

func (rf *Raft) periodicHeartbeat() {
	for !rf.killed() {
		rf.mu.Lock()
		if rf.state == Leader {
			rf.mu.Unlock()
			rf.callAppendEntries()
		} else {
			// not a leader anymore, exit this goroutine
			rf.mu.Unlock()
			return
		}
		time.Sleep(HeartbeatPeriod * time.Millisecond)
	}
}

func (rf *Raft) kickOffElection() {
	rf.mu.Lock()
	rf.timer.reset()
	rf.state = Candidate
	rf.currentTerm++
	rf.votedFor = rf.me
	rf.persist()
	//DPrintf("[%d] is kicking off an election at term %d", rf.me, rf.currentTerm)
	// record rf.currentTerm because we need to release the lock while calling RequestVote RPC
	term := rf.currentTerm
	voteCount := 1
	done := false
	rf.mu.Unlock()
	for i := range rf.peers {
		if i == rf.me {
			continue
		}
		go func(server int) {
			voteGranted := rf.CallRequestVote(server, term)
			if !voteGranted {
				return
			}
			// receive vote from server
			rf.mu.Lock()
			voteCount++
			//DPrintf("[%d] gets vote from %d", rf.me, server)
			// if this goroutine sees there's not enough vote, exits
			if done || voteCount <= len(rf.peers)/2 {
				rf.mu.Unlock()
				return
			}
			// get enough votes, become a leader
			done = true
			// double check if it's still in the term when the election started
			if rf.currentTerm == term {
				//DPrintf("[%d] gets enough votes and now becomes the leader (currentTerm = %d, voteCount = %d, numPeer = %d)", rf.me, rf.currentTerm, voteCount, len(rf.peers))
				// we should not set votedFor = -1, since a leader should not vote for anyone else in his term
				rf.initializeLeader()
				rf.mu.Unlock()
				// start a go routine to send out heartbeat periodically
				go rf.periodicHeartbeat()
			} else {
				rf.mu.Unlock()
			}
		}(i)
	}
}

// caller should hold rf.mu while calling this function
func (rf *Raft) initializeLeader() {
	rf.state = Leader
	// nextIndex is initialized to the end of the log
	rf.nextIndex = make([]int, len(rf.peers))
	for i := 0; i < len(rf.nextIndex); i++ {
		rf.nextIndex[i] = len(rf.log) + 1
	}
	// match index is initialized to 0
	rf.matchIndex = make([]int, len(rf.peers))
	for i := 0; i < len(rf.matchIndex); i++ {
		rf.matchIndex[i] = 0
	}
}

// since sending on the applyCh can block, it must be done in a single goroutine.
// this goroutine shouldn't hold any lock while sending message into rf.applyCh
func (rf *Raft) applyCommittedEntries() {
	for !rf.killed() {
		rf.newApplicable.L.Lock()
		// check if there is any new applicable entry
		for rf.lastApplied >= rf.commitIndex {
			rf.newApplicable.Wait()
		}

		var messages []ApplyMsg
		// apply committed entries
		for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
			msg := ApplyMsg{
				CommandValid: true,
				Command:      rf.log[i-1].Command,
				CommandIndex: i,
			}
			messages = append(messages, msg)
		}

		rf.newApplicable.L.Unlock()

		for _, m := range messages {
			rf.applyCh <- m
			rf.mu.Lock()
			rf.lastApplied++
			//DPrintf("[%d] applies entry[%d].(Command=%v)", rf.me, m.CommandIndex, m.Command)
			rf.mu.Unlock()
		}
	}
}

func (rf *Raft) callAppendEntries() {
	rf.mu.Lock()
	// update leader's commitIndex every time before sending heartbeat
	commitIndex := rf.getMaxCommitIndex()
	// new committed entry, wake the applyCommittedEntries goroutine to apply these entries
	if commitIndex != rf.commitIndex {
		rf.commitIndex = commitIndex
		rf.newApplicable.Signal()
	}
	me := rf.me
	rf.mu.Unlock()
	for i := 0; i < len(rf.peers); i++ {
		if i == me {
			continue
		}
		rf.mu.Lock()
		args := AppendEntriesArgs{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: rf.nextIndex[i] - 1,
			PrevLogTerm:  rf.getLogTerm(rf.nextIndex[i] - 1),
			Entries:      rf.log[rf.nextIndex[i]-1:],
			LeaderCommit: rf.commitIndex,
		}
		reply := AppendEntriesReply{}
		rf.mu.Unlock()
		go func(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) {
			ok := rf.sendAppendEntries(server, args, reply)
			if !ok {
				//DPrintf("[%d] failed to contact %d", me, server)
				return
			}
			rf.mu.Lock()
			// a reply contains higher term, convert to follower
			if reply.Term > rf.currentTerm {
				rf.convertToFollower(reply.Term)
			} else if !reply.Success && reply.LengthLog != -1 {
				// back up strategy
				// case 1: follower does not have any entry at args.PrevLogIndex,
				// back up to follower's end of log
				// s1 4
				// s2 4 6 6 6
				// case 2: leader has entry with the same term of follower's conflicting entry,
				// back up to leader's last index of entry with the conflicting term
				// s1 4 4 4
				// s2 4 6 6 6
				// case 3: leader does not have follower's conflicting term at all
				// back up to follower's first index of entry with the conflicting term
				// s1 4 5 5
				// s2 4 6 6 6
				if reply.LengthLog < args.PrevLogIndex {
					rf.nextIndex[server] = reply.LengthLog + 1
				} else if lastIndex := rf.findLastIndex(reply.ConflictTerm); lastIndex != 0 { // 0 means the leader has no entry with ConflictTerm
					rf.nextIndex[server] = lastIndex + 1
				} else {
					// leader does not have entry with reply's term at all
					rf.nextIndex[server] = reply.FirstConflictTermIndex
				}
			} else {
				rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
				rf.nextIndex[server] = rf.matchIndex[server] + 1
			}
			rf.mu.Unlock()
		}(i, &args, &reply)
	}
}

func (rf *Raft) CallRequestVote(server int, term int) bool {
	rf.mu.Lock()
	//DPrintf("[%d] is sending request vote to %d", rf.me, server)
	args := RequestVoteArgs{
		Term:         term,
		CandidateId:  rf.me,
		LastLogIndex: len(rf.log),
		LastLogTerm:  rf.getLogTerm(len(rf.log)),
	}
	rf.mu.Unlock()
	var reply RequestVoteReply

	// actually send a RPC
	ok := rf.sendRequestVote(server, &args, &reply)

	if !ok {
		return false
	}
	rf.mu.Lock()
	// a reply contains higher term, convert to follower
	if rf.currentTerm < reply.Term {
		rf.convertToFollower(reply.Term)
	}
	rf.mu.Unlock()
	return reply.VoteGranted
}

func min(a, b int) int {
	if a < b {
		return a
	} else {
		return b
	}
}

func (rf *Raft) convertToFollower(term int) {
	rf.currentTerm = term
	rf.state = Follower
	rf.votedFor = -1
	rf.timer.reset()
	rf.persist()
}

Notes

  • Each candidate restarts its randomized election timeout at the start
    of an election, and it waits for that timeout to elapse before
    starting the next election.

  • Use a different seed for each server when generating random numbers;
    otherwise all servers will produce the same random-number sequence and
    repeatedly cause split votes (see the seeding sketch after this list).

  • you will need to either have a dedicated “applier”, or to lock around these applies, so that some other routine doesn’t also detect that entries need to be applied and also tries to apply (see the applier sketch after this list).

  • Make sure that you check for commitIndex > lastApplied either periodically, or after commitIndex is updated (i.e., after matchIndex is updated).

  • nextIndex is a guess as to what prefix the leader shares with a given follower. When a leader has just been elected, nextIndex is set to the index just past the end of the log (len(rf.log) + 1 in this implementation).

  • matchIndex is used for safety. It is a conservative measurement of what prefix of the log the leader shares with a given follower. This is why matchIndex is initialized so that we agree on no prefix (0 in this implementation, since log indices start at 1), and it is only updated when a follower positively acknowledges an AppendEntries RPC.

  • the correct thing to do is update matchIndex to be prevLogIndex + len(entries[]) from the arguments you sent in the RPC originally.

  • commitIndex is volatile because Raft can figure out a correct value for it after a reboot using just the persistent state. Once a leader successfully gets a new log entry committed, it knows everything before that point is also committed. A follower that crashes and comes back up will be told about the right commitIndex whenever the current leader sends it an AppendEntriesRPC.

  • Upon receiving an AppendEntries RPC: if the leader has a higher term, we should adopt its term and convert to follower. We should set votedFor = args.LeaderId, since this is a legitimate leader and we do not expect to vote for anyone else. If we instead set votedFor = -1, a server from a previous term might crash, reboot, and kick off an election in this term, and servers with votedFor = -1 may vote for it even though they are following this leader. That rebooted server may become leader and overwrite committed entries.

  • you should only restart your election timer if:

  1. you get an AppendEntries RPC from the current leader (i.e., if the term in the AppendEntries arguments is outdated, you should not reset your timer);
  2. you are starting an election; or
  3. you grant a vote to another peer.
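
To tie those three rules to code, here is a minimal, self-contained sketch (hypothetical, not the lab code) of the same discipline: a polling loop, like periodicTimeout above, only observes the deadline, and the deadline is pushed forward exclusively by the three events listed.

package main

import (
	"math/rand"
	"sync"
	"time"
)

type electionTimer struct {
	mu       sync.Mutex
	deadline time.Time
	r        *rand.Rand
}

// reset is the only way the deadline moves forward. In the implementation
// above it corresponds to rf.timer.reset(), which is called when:
//  1. AppendEntries arrives with args.Term >= currentTerm (a current leader),
//  2. this peer starts an election (kickOffElection), or
//  3. this peer grants its vote (RequestVote).
func (t *electionTimer) reset() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.deadline = time.Now().Add(210*time.Millisecond +
		time.Duration(t.r.Int63n(500))*time.Millisecond)
}

func (t *electionTimer) expired() bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return time.Now().After(t.deadline)
}

func main() {
	t := &electionTimer{r: rand.New(rand.NewSource(1))}
	t.reset() // initialize the deadline, as Make does via rf.timer.reset()

	// poll like periodicTimeout; merely checking never resets the timer
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		if t.expired() {
			// here a follower would become a candidate; starting the
			// election is what resets the timer (rule 2)
			t.reset()
		}
	}
}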
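
The seeding note above is illustrated by the following minimal, self-contained sketch (hypothetical, not the lab code); it reuses the timeout formula from Timer.reset and the me+1 seed from Make.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// timing constants as in the implementation above, in milliseconds
const (
	FixedTimeout  = 210
	RandomTimeout = 500
)

// electionTimeout mirrors Timer.reset: a fixed floor plus a random component
// drawn from this peer's own RNG.
func electionTimeout(r *rand.Rand) time.Duration {
	return FixedTimeout*time.Millisecond +
		time.Duration(r.Int63n(RandomTimeout))*time.Millisecond
}

func main() {
	for me := 0; me < 3; me++ {
		// each peer seeds with me+1, as Make does, so no two peers
		// share a random-number sequence (and hence a timeout sequence)
		r := rand.New(rand.NewSource(int64(me + 1)))
		fmt.Printf("peer %d first timeout: %v\n", me, electionTimeout(r))
	}
}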
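
The dedicated-applier note above corresponds to applyCommittedEntries; the following is a condensed, hypothetical skeleton of that pattern (not the lab code): one goroutine waits on a condition variable, snapshots the newly committed entries under the lock, and releases the lock before sending on the apply channel so a blocked send cannot hold up the rest of Raft.

package main

import (
	"fmt"
	"sync"
)

type ApplyMsg struct {
	CommandValid bool
	Command      interface{}
	CommandIndex int
}

type Entry struct {
	Command interface{}
	Term    int
}

type node struct {
	mu            sync.Mutex
	log           []Entry // log index i is stored at log[i-1]
	commitIndex   int
	lastApplied   int
	newApplicable *sync.Cond
	applyCh       chan ApplyMsg
}

// applier is the only goroutine that advances lastApplied.
func (n *node) applier() {
	for {
		n.mu.Lock()
		// sleep until commitIndex moves past lastApplied
		for n.lastApplied >= n.commitIndex {
			n.newApplicable.Wait()
		}
		// snapshot the applicable entries while holding the lock
		var msgs []ApplyMsg
		for i := n.lastApplied + 1; i <= n.commitIndex; i++ {
			msgs = append(msgs, ApplyMsg{
				CommandValid: true,
				Command:      n.log[i-1].Command,
				CommandIndex: i,
			})
		}
		n.mu.Unlock()

		// send without holding the lock; bump lastApplied under the lock
		for _, m := range msgs {
			n.applyCh <- m
			n.mu.Lock()
			n.lastApplied++
			n.mu.Unlock()
		}
	}
}

func main() {
	n := &node{applyCh: make(chan ApplyMsg, 8)}
	n.newApplicable = sync.NewCond(&n.mu)
	go n.applier()

	// simulate a leader advancing commitIndex past lastApplied
	n.mu.Lock()
	n.log = append(n.log, Entry{Command: "x", Term: 1})
	n.commitIndex = 1
	n.newApplicable.Signal()
	n.mu.Unlock()

	fmt.Println(<-n.applyCh) // prints {true x 1}
}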

References

Students' Guide to Raft
In Search of an Understandable Consensus Algorithm (Extended Version) (the extended Raft paper)
