6.824 Lab 2: Raft

lab2主要是根据论文 In Search of an Understandable Consensus Algorithm (Extended Version) 做一个复制状态机(replicated state machine)并以此做一个有容错性的存储k/v的分布式系统。



  1. lab2A主要完成leader election部分,从多个server中选举出一个leader,并让选出的leader周期性地发送心跳包给各follower;
  2. lab2B主要完成日志的追加、提交(commit)、应用(apply);
  3. lab2C主要完成日志的持久化和server崩溃后的恢复。


package raft

// this is an outline of the API that raft must expose to
// the service (or tester). see comments below for
// each of these functions for more details.
// rf = Make(...)
//   create a new Raft server.
// rf.Start(command interface{}) (index, Term, isleader)
//   start agreement on a new log entry
// rf.GetState() (Term, isLeader)
//   ask a Raft for its current Term, and whether it thinks it is leader
// ApplyMsg
//   each time a new entry is committed to the log, each Raft peer
//   should send an ApplyMsg to the service (or tester)
//   in the same server.

import (
import "sync/atomic"
import "../labrpc"
import "../labgob"

// ApplyMsg is what a Raft peer sends on applyCh (the channel passed to Make)
// each time it learns that a log entry has been committed, so that the
// service (or tester) on the same server can apply it.
// CommandValid is true when the message carries a newly committed log entry.
// In Lab 3 other kinds of messages (e.g., snapshots) may be sent on applyCh;
// fields can be added for them then, but CommandValid must be false for
// those other uses.
type ApplyMsg struct {
	CommandValid bool        // true if Command/CommandIndex describe a committed entry
	Command      interface{} // the committed command
	CommandIndex int         // log index of the command (log indices start at 1)
}

// Entry is a single Raft log entry: a command for the state machine together
// with the term in which the leader received it.
type Entry struct {
	Command interface{} // command for the replicated state machine
	Term    int         // term when the entry was received by the leader
}

// Server roles: every Raft peer is in exactly one of these states at a time.
// Follower is the zero value, so a freshly allocated Raft starts as a follower.
const (
	Follower = iota
	Candidate
	Leader
)

// Time-related constants, in milliseconds.
const (
	FixedTimeout       = 210 // fixed part of the election timeout
	RandomTimeout      = 500 // presumably the range of the random part added in Timer.reset — TODO confirm
	CheckTimeoutPeriod = 10  // how often periodicTimeout polls the election timer
	HeartbeatPeriod    = 100 // interval between leader heartbeats
)

// A Go object implementing a single Raft peer.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (2A, 2B, 2C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.

	// persistent state on all servers:
	currentTerm int     // latest Term server has seen (initialized to 0 on first boot, increases monotonically)
	votedFor    int     //	CandidateId that received vote in current Term (or null if none)
	log         []Entry // log entries; each entry contains command for state machine, and Term when entry was received by leader (first index is 1)

	// volatile state on all servers:
	commitIndex   int // index of highest log entry known to be committed (initialized to 0, increases monotonically)
	lastApplied   int // index of highest log entry applied to state machine (initialized to 0, increases monotonically)
	state         int	// each server has three state: Follower/Leader/Candidate
	timer         Timer	// time a server, if time-out, then convert to candidate and kick off an election
	applyCh       chan ApplyMsg	// channel to send message to application
	newApplicable *sync.Cond	// condition variable used to wake goroutine that apply committed entries

	// volatile state on leaders:
	nextIndex  []int // for each server, index of the next log entry to send to that server (initialized to leader last log index + 1)
	matchIndex []int // for each server, index of highest log entry known to be replicated on server (initialized to 0, increases monotonically)


type AppendEntriesArgs struct {
	Term         int     // leader’s term
	LeaderId     int     // so follower can redirect clients
	PrevLogIndex int     // index of log entry immediately preceding new ones
	PrevLogTerm  int     // term of prevLogIndex entry
	Entries      []Entry // log entries to store (empty for heartbeat; may send more than one for efficiency)
	LeaderCommit int     // leader’s commitIndex

// AppendEntriesReply is the reply of the AppendEntries RPC. Besides the
// fields from Figure 2, it carries hints that let the leader back up
// nextIndex by more than one entry per rejected RPC.
type AppendEntriesReply struct {
	Term    int  // currentTerm, for leader to update itself
	Success bool // true if follower contained entry matching prevLogIndex and prevLogTerm

	// additional information needed to back up faster
	ConflictTerm           int // term of the conflicting entry (-1 if not set)
	FirstConflictTermIndex int // 1-based index of the first entry of the conflicting term
	LengthLog              int // length of the follower's log (-1 if not set)
}

// RequestVoteArgs are the arguments of the RequestVote RPC, sent by
// candidates to gather votes.
type RequestVoteArgs struct {
	Term         int // candidate’s term
	CandidateId  int // candidate requesting vote
	LastLogIndex int // index of candidate’s last log entry
	LastLogTerm  int // term of candidate’s last log entry
}

// RequestVoteReply is the reply of the RequestVote RPC.
type RequestVoteReply struct {
	Term        int  // currentTerm, for candidate to update itself
	VoteGranted bool // true means candidate received vote
}

// Timer is the election timer: it counts as expired once more than timeout
// has elapsed since startTime.
type Timer struct {
	startTime time.Time     // when the timer was last (re)started
	timeout   time.Duration // how long after startTime the timer expires
	r         *rand.Rand    // per-server random source (seeded with me+1 in Make)
}

func (t *Timer) isExpired() bool {
	return time.Now().Sub(t.startTime) > t.timeout

// reset restarts the election timer: it sets a new timeout and records the
// current time as startTime.
// NOTE(review): the right-hand operand of the `+` below (and the closing
// brace) are missing from this excerpt. The timeout is meant to be
// FixedTimeout plus a random component, presumably drawn from t.r and bounded
// by RandomTimeout, so that servers do not time out together — TODO confirm.
func (t *Timer) reset() {
	t.timeout = FixedTimeout*time.Millisecond +
	t.startTime = time.Now()

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {

	var term int
	var isLeader bool

	defer rf.mu.Unlock()

	term = rf.currentTerm
	isLeader = rf.state == Leader
	return term, isLeader

// save Raft's persistent state to stable storage,
// where it can later be retrieved after a crash and restart.
// see paper's Figure 2 for a description of what should be persistent.
func (rf *Raft) persist() {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	err := e.Encode(rf.currentTerm)
	if err != nil {
		log.Fatalf("[%d] fails to encode currentTerm", &rf.me)
	err = e.Encode(rf.votedFor)
	if err != nil {
		log.Fatalf("[%d] fails to encode votedFor", &rf.me)
	//DPrintf("[%d] encoding log to persist(%v)", rf.me, rf.log)
	err = e.Encode(rf.log)
	if err != nil {
		log.Fatalf("[%d] fails to encode log", &rf.me)
	data := w.Bytes()
	//DPrintf("[%d] saves persistent state", rf.me)

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	if data == nil || len(data) < 1 { // bootstrap without any state?
	defer rf.mu.Unlock()
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var currentTerm, votedFor int
	var persistedLog []Entry
	if d.Decode(&currentTerm) != nil || d.Decode(&votedFor) != nil || d.Decode(&persistedLog) != nil {
		// fail to read persistent data
		//DPrintf("[%d] fail to read persistent data", rf.me)
	} else {
		rf.currentTerm = currentTerm
		rf.votedFor = votedFor
		rf.log = persistedLog

// RequestVote is the RequestVote RPC handler (Figure 2 of the Raft paper).
// It reports this server's currentTerm in reply.Term and sets
// reply.VoteGranted when it votes for args.CandidateId.
// NOTE(review): several lines are missing from this excerpt: the rf.mu.Lock()
// that must precede the deferred Unlock, the early return for a stale term,
// the body of the higher-term branch, and closing braces. TODO restore
// before compiling.
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	defer rf.mu.Unlock()
	// Default: refuse the vote.
	reply.VoteGranted = false
	reply.Term = rf.currentTerm

	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < rf.currentTerm {
		//DPrintf("[%d] refuse to vote for %d", rf.me, args.CandidateId)
		//DPrintf("votedFor: %d, CandidateId: %d", rf.votedFor, args.CandidateId)

	// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
	// this server MUST NOT immediately convert to follower of this candidate
	// we still need to check condition 2. to decide whether we vote for a server
	if args.Term > rf.currentTerm {

	// 2. If votedFor is null (-1 here) or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
	// Raft determines which of two logs is more up-to-date by comparing the index and term of the last entries in the logs.
	// If the logs have last entries with different terms, then the log with the later term is more up-to-date.
	// If the logs end with the same term, then whichever log is longer is more up-to-date.
	// len(rf.log) is the index of the receiver's last entry, since log indices start at 1.
	if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) &&
		(args.LastLogTerm > rf.getLogTerm(len(rf.log)) ||
			args.LastLogTerm == rf.getLogTerm(len(rf.log)) && args.LastLogIndex >= len(rf.log)) {
		rf.state = Follower
		rf.votedFor = args.CandidateId
		reply.VoteGranted = true

// return term of the given index
// log index starts from 1
func (rf *Raft) getLogTerm(index int) int {
	// return -1 if index is out of lower bound
	if index-1 < 0 {
		return -1
	return rf.log[index-1].Term

// AppendEntries is the AppendEntries RPC handler (Figure 2 of the Raft paper).
// The leader uses it both to replicate log entries and as a heartbeat.
// reply.Term carries this server's currentTerm. When the consistency check at
// PrevLogIndex fails, the reply also carries backup hints for the leader:
//   - LengthLog: this server's log length.
//   - ConflictTerm and FirstConflictTermIndex, only when this server has an
//     entry at PrevLogIndex: that entry's term, and the 1-based index of the
//     first entry with that term.
// NOTE(review): several lines are missing from this excerpt: the rf.mu.Lock()
// that must precede the deferred Unlock, early returns, the timer reset, the
// loop break/increment, and closing braces. TODO restore before compiling.
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {

	defer rf.mu.Unlock()

	// Defaults: failure, with no backup hints (-1 means "not set").
	reply.Term = rf.currentTerm
	reply.Success = false
	reply.LengthLog = -1
	reply.ConflictTerm = -1
	// NOTE(review): this repeats the ConflictTerm assignment above; it was
	// probably meant to be reply.FirstConflictTermIndex = -1 — TODO confirm.
	reply.ConflictTerm = -1

	// 1. Reply false if term < currentTerm (§5.1)
	if args.Term < rf.currentTerm {

	// this is a legit leader because term >= currentTerm, therefore we reset timer

	// If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (§5.1)
	// if this leader has a higher term, then we should adopt his term and convert to follower.
	// we should set votedFor = args.leaderId, since this is a legit leader and we do not expect to vote for anyone else.
	// if we set votedFor = -1, and then some server from previous term crashes,
	// and then reboot and kick off an election at this term, servers with votedFor = -1
	// may vote for them even though they are following this leader.
	// this reboot server may become a leader and overwrite committed entries.
	if args.Term > rf.currentTerm {
		rf.currentTerm = args.Term
		rf.state = Follower
		rf.votedFor = args.LeaderId
	} else {
		rf.state = Follower

	//DPrintf("[%d] receives AppendEntries RPC from %d, (currentTerm = %d, logLength = %d)", rf.me, args.LeaderId, rf.currentTerm, len(rf.log))

	// 2. Reply false if log does not contain an entry at prevLogIndex whose term matches prevLogTerm (§5.3)
	// For PrevLogIndex == 0, both sides use getLogTerm(0) == -1, so the check passes.
	if len(rf.log) < args.PrevLogIndex || rf.getLogTerm(args.PrevLogIndex) != args.PrevLogTerm {
		reply.LengthLog = len(rf.log)
		if len(rf.log) >= args.PrevLogIndex {
			reply.ConflictTerm = rf.getLogTerm(args.PrevLogIndex)
			reply.FirstConflictTermIndex = rf.findFirstIndex(reply.ConflictTerm)

	// 3. If an existing entry conflicts with a new one (same index but different terms),
	// delete the existing entry and all that follow it (§5.3)
	// do not truncate its entries if they have the same term
	// check entries until they don't match
	// rf.log is a 0-based slice, so rf.log[args.PrevLogIndex+index] is the
	// existing entry occupying the slot that args.Entries[index] would fill.
	index := 0
	for index < len(args.Entries) && args.PrevLogIndex+index < len(rf.log) {
		if rf.log[args.PrevLogIndex+index].Term != args.Entries[index].Term {
	// Keep only the entries that are not already present (matching) in the log.
	args.Entries = args.Entries[index:]

	// 4. Append any new entries not already in the log
	// truncate rf.log only when new entries present, prevent it from truncating valid entries
	if len(args.Entries) > 0 {
		//DPrintf("[%d] truncates its log from len = %d to len = %d", rf.me, len(rf.log), args.PrevLogIndex+index)
		rf.log = rf.log[:args.PrevLogIndex+index]
		//for i := 0; i < len(args.Entries); i++ {
		//	DPrintf("[%d] appends entry(Command = %v, term = %d), preLogIndex = %d", rf.me, args.Entries[i].Command, args.Entries[i].Term, args.PrevLogIndex)
		rf.log = append(rf.log, args.Entries...)

	// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
	// NOTE(review): the code below uses len(rf.log), not the index of the last
	// new entry (args.PrevLogIndex+index+len(args.Entries) after the reslice
	// above). Step 3 only truncates on conflict, so this server may still
	// hold unverified entries beyond the ones the leader sent; len(rf.log)
	// could then mark those entries committed. See the Students' Guide to
	// Raft. TODO confirm and fix.
	//DPrintf("[%d] receives AppendEntryRPC from %d with leaderCommit=%d", rf.me, args.LeaderId, args.LeaderCommit)
	if args.LeaderCommit > rf.commitIndex {
		//DPrintf("[%d] updates its commitIndex from %d to %d", rf.me, rf.commitIndex, min(args.LeaderCommit, len(rf.log)))
		rf.commitIndex = min(args.LeaderCommit, len(rf.log))

	reply.Success = true

// find first index of the given term
func (rf *Raft) findFirstIndex(term int) int {
	left, right, res := 0, len(rf.log)-1, -1
	for left <= right {
		mid := left + (right-left)/2
		if rf.log[mid].Term < term {
			left = mid + 1
		} else if rf.log[mid].Term > term {
			right = mid - 1
		} else {
			res = mid
			right = mid - 1
	return res + 1

// find last index of the given term
func (rf *Raft) findLastIndex(term int) int {
	left, right, res := 0, len(rf.log)-1, -1
	for left <= right {
		mid := left + (right-left)/2
		if rf.log[mid].Term < term {
			left = mid + 1
		} else if rf.log[mid].Term > term {
			right = mid - 1
		} else {
			res = mid
			left = mid + 1
	return res + 1

// Start is called by the service using Raft (e.g. a k/v server) to start
// agreement on the next command to be appended to Raft's log. If this
// server isn't the leader (or has been killed), it returns -1, -1, false.
// Otherwise it appends the command to the leader's log and returns
// immediately; there is no guarantee that this command will ever be
// committed, since the leader may fail or lose an election. Even if the
// Raft instance has been killed, this function should return gracefully.
// The first return value is the index that the command will appear at if
// it's ever committed, the second is the current term, and the third is true
// if this server believes it is the leader.
// NOTE(review): the lines that acquire and release rf.mu appear to be
// missing from this excerpt (note the blank lines after `term := -1`).
// Start reads rf.state and appends to rf.log, which the RPC handlers also
// modify. Appending changes persistent state (see persist). TODO confirm
// locking and persistence.
func (rf *Raft) Start(command interface{}) (int, int, bool) {
	index := -1
	term := -1


	// return false if this is not the leader
	if rf.state != Leader || rf.killed() {
		return index, term, false

	// log indices start at 1, so the new entry will live at len(rf.log)+1
	index = len(rf.log) + 1
	term = rf.currentTerm

	// append new entry to leader
	e := Entry{Command: command, Term: rf.currentTerm}
	rf.log = append(rf.log, e)
	//DPrintf("[%d] appends new entry to log from start function.(command=%v, term=%v)", rf.me, e.Command, e.Term)

	return index, term, true

// caller should hold rf.mu while calling this function
func (rf *Raft) getMaxCommitIndex() int {
	maxCommit := rf.commitIndex
	next := rf.commitIndex + 1
	for rf.canCommit(next) {
		// to eliminate problems like figure 8
		// Raft never commits log entries from previous terms by counting replicas.
		// leader commits an entry from this term to implicitly commit entries from previous term
		if rf.getLogTerm(next) == rf.currentTerm {
			maxCommit = next
	//if maxCommit != rf.commitIndex {
	//	DPrintf("[%d] updates its commit index from %d to %d", rf.me, rf.commitIndex, maxCommit)
	return maxCommit

// check if entry[0...index] is allowed to be committed
func (rf *Raft) canCommit(index int) bool {
	if index > len(rf.log) {
		return false
	// count servers that have log[index]
	count := 1
	for i, n := range rf.matchIndex {
		if i == rf.me {
		if n >= index {
	return count > len(rf.peers)/2

// the tester doesn't halt goroutines created by Raft after each test,
// but it does call the Kill() method. your code can use killed() to
// check whether Kill() has been called. the use of atomic avoids the
// need for a lock.
// the issue is that long-running goroutines use memory and may chew
// up CPU time, perhaps causing later tests to fail and generating
// confusing debug output. any goroutine with a long-running loop
// should call killed() to check whether it should stop.
func (rf *Raft) Kill() {
	atomic.StoreInt32(&rf.dead, 1)
	// Your code here, if desired.

func (rf *Raft) killed() bool {
	z := atomic.LoadInt32(&rf.dead)
	return z == 1

// Make creates a new Raft server. The ports of all the Raft servers
// (including this one) are in peers[], and this server's port is peers[me].
// All the servers' peers[] arrays have the same order. persister is a place
// for this server to save its persistent state, and also initially holds
// the most recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it starts goroutines for any long-running
// work.
// NOTE(review): the visible lines do not call readPersist (see the
// "initialize from state persisted" comment below) and do not initialize
// currentTerm/votedFor/log. RequestVote treats votedFor == -1 as "no vote",
// so leaving votedFor at Go's zero value 0 would look like a vote for
// server 0. TODO confirm these are set in lines missing from this excerpt.
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.applyCh = applyCh

	rf.commitIndex = 0
	rf.lastApplied = 0
	// the condition variable shares rf.mu
	rf.newApplicable = sync.NewCond(&rf.mu)
	// Seed each server differently (me+1) so their random timeouts differ.
	// timeout is left at zero here, so isExpired reports true until reset is called.
	rf.timer = Timer{startTime: time.Now(), r: rand.New(rand.NewSource(int64(me + 1)))}
	rf.state = Follower

	// initialize from state persisted before a crash
	//DPrintf("[%d] readPersist: currentTerm = %d, votedFor = %d, log = %v", rf.me, rf.currentTerm, rf.votedFor, rf.log)

	// periodically check if it hits timeout
	go rf.periodicTimeout()
	// check if there's any newly committed entry to apply
	go rf.applyCommittedEntries()

	return rf

func (rf *Raft) sendRequestVote(server int, args *RequestVoteArgs, reply *RequestVoteReply) bool {
	ok := rf.peers[server].Call("Raft.RequestVote", args, reply)
	return ok

func (rf *Raft) sendAppendEntries(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) bool {
	ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
	return ok

// periodicTimeout runs until Kill is called. Every CheckTimeoutPeriod
// milliseconds it checks the election timer and, if this server is not the
// leader and the timer has expired, starts an election in a new goroutine.
// NOTE(review): the visible lines read rf.state and rf.timer without holding
// rf.mu; unless locking lines are missing from this excerpt, this is a data
// race. Because kickOffElection runs asynchronously, the timer must be reset
// before the next check, or another election could be started
// CheckTimeoutPeriod later. TODO confirm.
func (rf *Raft) periodicTimeout() {
	for !rf.killed() {
		// not a leader when timer expires, convert to candidate and kick off an election
		if rf.state != Leader && rf.timer.isExpired() {
			go rf.kickOffElection()
		time.Sleep(CheckTimeoutPeriod * time.Millisecond)

// periodicHeartbeat runs on a leader until Kill is called or the server stops
// being the leader. Each round, while still the leader, it does the leader's
// heartbeat work, then sleeps HeartbeatPeriod milliseconds.
// NOTE(review): the body of the leader branch (presumably a locked call to
// callAppendEntries), the exit in the else branch, and closing braces are
// missing from this excerpt. rf.state is read here without rf.mu in the
// visible lines. TODO confirm.
func (rf *Raft) periodicHeartbeat() {
	for !rf.killed() {
		if rf.state == Leader {
		} else {
			// not a leader anymore, exit this goroutine
		time.Sleep(HeartbeatPeriod * time.Millisecond)

// kickOffElection turns this server into a candidate that votes for itself.
// It then sends RequestVote RPCs to every other peer in parallel, one
// goroutine per peer. Once a majority of votes is seen (and the server is
// still in the election's term), it starts the periodic heartbeat.
// NOTE(review): many lines are missing from this excerpt. These include
// locking, the currentTerm increment and timer reset that Figure 2 requires
// when starting an election, the voteCount increment, early returns,
// presumably the initializeLeader call, and closing braces. voteCount and
// done are shared by the per-peer goroutines and must only be touched under
// rf.mu. TODO restore before compiling.
func (rf *Raft) kickOffElection() {
	rf.state = Candidate
	rf.votedFor = rf.me
	//DPrintf("[%d] is kicking off an election at term %d", rf.me, rf.currentTerm)
	// record rf.currentTerm because we need to release the lock while calling RequestVote RPC
	term := rf.currentTerm
	// this server's vote for itself
	voteCount := 1
	// set once some goroutine has acted on a winning majority
	done := false
	for i := range rf.peers {
		if i == rf.me {
		go func(server int) {
			voteGranted := rf.CallRequestVote(server, term)
			if !voteGranted {
			// receive vote from server
			//DPrintf("[%d] gets vote from %d", rf.me, server)
			// if this goroutine sees there's not enough vote, exits
			if done || voteCount <= len(rf.peers)/2 {
			// get enough votes, become a leader
			done = true
			// double check if it's still in the term when the election started
			if rf.currentTerm == term {
				//DPrintf("[%d] gets enough votes and now becomes the leader (currentTerm = %d, voteCount = %d, numPeer = %d)", rf.me, rf.currentTerm, voteCount, len(rf.peers))
				// we should not set votedFor = -1, since a leader should not vote for anyone else in his term
				// start a go routine to send out heartbeat periodically
				go rf.periodicHeartbeat()
			} else {

// caller should hold rf.mu while calling this function
func (rf *Raft) initializeLeader() {
	rf.state = Leader
	// nextIndex is initialized to the end of the log
	rf.nextIndex = make([]int, len(rf.peers))
	for i := 0; i < len(rf.nextIndex); i++ {
		rf.nextIndex[i] = len(rf.log) + 1
	// match index is initialized to 0
	rf.matchIndex = make([]int, len(rf.peers))
	for i := 0; i < len(rf.matchIndex); i++ {
		rf.matchIndex[i] = 0

// applyCommittedEntries runs until Kill is called. It sends every committed
// but not yet applied log entry (indices lastApplied+1..commitIndex) to the
// service as an ApplyMsg on rf.applyCh.
// Since sending on the applyCh can block, it must be done in a single
// goroutine. This goroutine shouldn't hold any lock while sending a message
// into rf.applyCh, so the messages are collected first and sent afterwards.
// NOTE(review): the wait inside the first loop (presumably
// rf.newApplicable.Wait()), the lock/unlock around reading rf.log, the
// lastApplied update, and closing braces are missing from this excerpt.
// TODO restore before compiling.
func (rf *Raft) applyCommittedEntries() {
	for !rf.killed() {
		// check if there is any new applicable entry
		for rf.lastApplied >= rf.commitIndex {

		var messages []ApplyMsg
		// apply committed entries
		for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
			msg := ApplyMsg{
				CommandValid: true,
				// log index i lives at slice position i-1
				Command:      rf.log[i-1].Command,
				CommandIndex: i,
			messages = append(messages, msg)


		for _, m := range messages {
			rf.applyCh <- m
			//DPrintf("[%d] applies entry[%d].(Command=%v)", rf.me, m.CommandIndex, m.Command)

// callAppendEntries is the leader's per-heartbeat work. It first advances
// commitIndex (via getMaxCommitIndex). It then sends each other peer an
// AppendEntries RPC carrying the entries from that peer's nextIndex onward,
// and updates nextIndex/matchIndex from each reply in a per-peer goroutine.
// Presumably it is called with rf.mu held, since it reads leader state —
// TODO confirm.
// NOTE(review): several lines are missing from this excerpt. These include:
//   - the newApplicable signal after commitIndex changes;
//   - `continue`/`return` statements;
//   - locking in the reply goroutine;
//   - the higher-term conversion body;
//   - closing braces.
// TODO restore before compiling. Also:
//   - args.Entries is a sub-slice of rf.log and shares its backing array.
//     The RPC goroutine reads it without rf.mu, so an in-place overwrite of
//     rf.log elsewhere (e.g. truncate+append in AppendEntries) could race.
//     Consider copying.
//   - The final else branch also runs for a failed reply with
//     LengthLog == -1 (a stale-term rejection). Unless a missing line
//     returns earlier, it would record a match that did not happen.
//     Consider requiring reply.Success, and ignoring replies when
//     rf.currentTerm != args.Term.
func (rf *Raft) callAppendEntries() {
	// update leader's commitIndex every time before sending heartbeat
	commitIndex := rf.getMaxCommitIndex()
	// new committed entry, wake goroutine applyCommittedEntries to apply these entries
	if commitIndex != rf.commitIndex {
		rf.commitIndex = commitIndex
	me := rf.me
	for i := 0; i < len(rf.peers); i++ {
		if i == me {
		// PrevLogIndex is the entry just before nextIndex; getLogTerm(0) == -1
		// covers the case where nothing precedes it.
		args := AppendEntriesArgs{
			Term:         rf.currentTerm,
			LeaderId:     rf.me,
			PrevLogIndex: rf.nextIndex[i] - 1,
			PrevLogTerm:  rf.getLogTerm(rf.nextIndex[i] - 1),
			Entries:      rf.log[rf.nextIndex[i]-1:],
			LeaderCommit: rf.commitIndex,
		reply := AppendEntriesReply{}
		go func(server int, args *AppendEntriesArgs, reply *AppendEntriesReply) {
			ok := rf.sendAppendEntries(server, args, reply)
			if !ok {
				//DPrintf("[%d] failed to contact %d", me, server)
			// a reply contains higher term, convert to follower
			if reply.Term > rf.currentTerm {
			} else if !reply.Success && reply.LengthLog != -1 {
				// back up strategy
				// case 1: follower does not have any entry at args.PrevLogIndex,
				// back up to follower's end of log
				// s1 4
				// s2 4 6 6 6
				// case 2: leader has entry with the same term of follower's conflicting entry,
				// back up to leader's last index of entry with the conflicting term
				// s1 4 4 4
				// s2 4 6 6 6
				// case 3: leader does not have follower's conflicting term at all
				// back up to follower's first index of entry with the conflicting term
				// s1 4 5 5
				// s2 4 6 6 6
				// NOTE(review): case 3 is only reachable if findLastIndex returns -1
				// when the term is absent; confirm its sentinel matches this check.
				if reply.LengthLog < args.PrevLogIndex {
					rf.nextIndex[server] = reply.LengthLog + 1
				} else if lastIndex := rf.findLastIndex(reply.ConflictTerm); lastIndex != -1 {
					rf.nextIndex[server] = lastIndex + 1
				} else {
					// leader does not have entry with reply's term at all
					rf.nextIndex[server] = reply.FirstConflictTermIndex
			} else {
				// Use the arguments originally sent, not the current log length,
				// since the log may have grown while the RPC was in flight.
				rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
				rf.nextIndex[server] = rf.matchIndex[server] + 1
		}(i, &args, &reply)

// CallRequestVote asks peers[server] for its vote in the given term. It
// describes this server's last log entry (index len(rf.log) and its term).
// It returns true only if the RPC succeeded and the vote was granted.
// NOTE(review): the visible lines read rf.log and rf.currentTerm without
// rf.mu. The body of the higher-term branch (presumably convertToFollower)
// and closing braces are missing from this excerpt. TODO confirm.
func (rf *Raft) CallRequestVote(server int, term int) bool {
	//DPrintf("[%d] is sending request vote to %d", rf.me, server)
	args := RequestVoteArgs{
		Term:         term,
		CandidateId:  rf.me,
		LastLogIndex: len(rf.log),
		LastLogTerm:  rf.getLogTerm(len(rf.log)),
	var reply RequestVoteReply

	// actually send a RPC
	ok := rf.sendRequestVote(server, &args, &reply)

	// no reply: treat as a vote not granted
	if !ok {
		return false
	// a reply contains higher term, convert to follower
	if rf.currentTerm < reply.Term {
	return reply.VoteGranted

// min returns the smaller of a and b.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func (rf *Raft) convertToFollower(term int) {
	rf.currentTerm = term
	rf.state = Follower
	rf.votedFor = -1


  • Each candidate restarts its randomized election timeout at the start
    of an election, and it waits for that timeout to elapse before
    starting the next election.

  • Use a different seed for every server when generating random numbers;
    otherwise they will generate the same random number sequence and
    repeatedly cause split votes.

  • You will need to either have a dedicated “applier”, or to lock around these applies, so that no other routine also detects that entries need to be applied and tries to apply them too.

  • Make sure that you check for commitIndex > lastApplied either periodically, or after commitIndex is updated (i.e., after matchIndex is updated).

  • nextIndex is a guess as to what prefix the leader shares with a given follower. When a leader has just been elected, nextIndex is set to the index just past the end of its log (len(log) + 1 here, since log indices start at 1).

  • matchIndex is used for safety. It is a conservative measurement of what prefix of the log the leader shares with a given follower. This is why matchIndex is initialized to -1 in a 0-indexed log (i.e., we agree on no prefix), which is 0 in this implementation because its log indices start at 1. It is only updated when a follower positively acknowledges an AppendEntries RPC.

  • the correct thing to do is update matchIndex to be prevLogIndex + len(entries[]) from the arguments you sent in the RPC originally.

  • commitIndex is volatile because Raft can figure out a correct value for it after a reboot using just the persistent state. Once a leader successfully gets a new log entry committed, it knows everything before that point is also committed. A follower that crashes and comes back up will be told about the right commitIndex whenever the current leader sends it an AppendEntries RPC.

  • upon receiving AppendEntries RPC: If leader has a higher term, then we should adopt his term and convert to follower. We should set votedFor = args.leaderId, since this is a legit leader. If we set votedFor = -1, and then some server from previous term crashes, and then reboot and kick off an election at this term, servers with votedFor = -1 may vote for them even though they are following this leader. This reboot server may become a leader and overwrite committed entries.

  • you should only restart your election timer if:

  1. you get an AppendEntries RPC from the current leader (i.e., if the term in the AppendEntries arguments is outdated, you should not reset your timer);
  2. you are starting an election; or
  3. you grant a vote to another peer.


Student’s Guide to Raft
Extended Raft Paper

