欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

2020 6.824 的 Raft Lab 4A

程序员文章站 2022-06-26 14:48:29
目录前言一、Overview1.1 架构图1.2 架构细节二、client三、Master server3.1 属性3.2 构造函数3.3 Join/Move/Leave3.4 Query3.5 serverMonitor3.6 Load balance四、总结前言做2020的MIT6.824,完成了实验 Lab4A,通过了测试,对于之前的Raft实现的实验请参考Lab 2A, Lab 2B 和 Lab 2C 和 Lab 3A 以及 Lab 3BLab4A主要是做DB的分片,也就是Client向Se...

前言

做2020的MIT6.824,完成了实验 Lab4A,通过了测试,对于之前的Raft实现的实验请参考Lab 2ALab 2BLab 2CLab 3A 以及 Lab 3B

Lab4A主要是做DB的分片,也就是Client向Server请求,Server根据不同的Request类型,向不同的DB获取/更改数据。实验重点需要完成master对DB server的Join/Leave/Move的三个操作,并且实验Client对Server的configs查询的操作。

整个实验有三个点需要注意

  1. load balance的算法
  2. config需要拷贝而不是引用
  3. labgob需要注册JoinArgs,LeaveArgs,MoveArgs 和 QueryArgs

一、Overview

1.1 架构图

实验是通过数据库分片来提升性能的,Master负责数据库的分片,大概架构如下
2020 6.824 的 Raft Lab 4A

1.2 架构细节

第一,对于本次分片来说,最多可以有10个Group,这里Common文件可以看出来

// The number of shards.
const NShards = 10

第二,对于Master来说,也是有多个servers的,所以需要通过Raft保证shards configuration的一致性,从config可以看出来,一个有三个属性需要维护的,其中Shards就是分片configuration, 初始是[0,0,0,0,0,0,0,0,0,0],Groups就是对应每个shard分片中Raft servers的information,比如Group1 -> server[a, b, c]

type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

举个例子,shards初始为[0,0,0,0,0,0,0,0,0,0],那么Join了Group1 -> servers[a, b, c] 之后,整个系统就有1个Group了,那么,shards就会变成[1,1,1,1,1,1,1,1,1,1],如果JoinGroup2 -> servers[e, f, g]之后,整个系统就有2个groups了,那么,10个shards就需要尽量平均分配给两个Groups,也就是[1,1,1,1,1,2,2,2,2,2]

这里也就是涉及到了load balance的算法,我的算法实现参考了 网上 的方法,并没有用到高级的数据结构比如heap来实现


二、client

这个跟kvraft中client的实现非常相似,基本搬过来就行

其中,Clerk的构造函数

type Clerk struct {
	servers []*labrpc.ClientEnd

	mu         sync.Mutex
	leaderId   int
	clientId   int64
	sequenceId int64

	// Your data here.
}
func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
	ck := new(Clerk)
	ck.servers = servers
	// Your code here.
	ck.clientId = nrand()
	return ck
}

增加一些新的属性, 主要就是ClientId很SequenceId

type JoinArgs struct {
	Servers    map[int][]string // new GID -> servers mappings
	ClientId   int64
	SequenceId int64
}

type LeaveArgs struct {
	GIDs       []int
	ClientId   int64
	SequenceId int64
}

type MoveArgs struct {
	Shard      int
	GID        int
	ClientId   int64
	SequenceId int64
}

type QueryArgs struct {
	Num        int // desired config number
	ClientId   int64
	SequenceId int64
}

Join/Move/Leave/Query 的实现,对于Join/Move/Leave 基本都是一样的

func (ck *Clerk) Query(num int) Config {
	args := &QueryArgs{}
	// Your code here.
	args.Num = num
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for {
		reply := QueryReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Query", args, &reply) && !reply.WrongLeader {
			return reply.Config
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}

func (ck *Clerk) Join(servers map[int][]string) {
	args := &JoinArgs{}
	args.Servers = servers
	args.ClientId = ck.clientId
	args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)

	for {
		reply := JoinReply{}
		if ck.servers[ck.currentLeader()].Call("ShardMaster.Join", args, &reply) && !reply.WrongLeader {
			return
		}
		ck.leaderId = ck.changeLeader()
		time.Sleep(100 * time.Millisecond)
	}
}


三、Master server

3.1 属性

跟kvRaft的server还是很类似的,需要有两个mapper,分别是requestMapper和sequenceMapper


type ShardMaster struct {
	mu      sync.Mutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg

	// Your data here.
	dead    int32
	configs []Config // indexed by config num

	requestMapper  map[int]chan Op
	sequenceMapper map[int64]int64
}

type Op struct {
	// Your data here.
	SequenceId int64
	ClientId   int64
	OpType     string
	OpArgs     interface{}
	Index      int
	Term       int
}

type joinLeaveMoveReply struct {
	WrongLeader bool
	Err         Err
}

const (
	JOIN  string = "Join"
	LEAVE string = "Leave"
	MOVE  string = "Move"
	QUERY string = "Query"
)

3.2 构造函数

这里有个很重要的地方,那就是需要向labgob Register 任何自定义的STRUCT 否则会出现interface{ } is nil 的报错

func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
	sm := new(ShardMaster)
	sm.me = me

	sm.configs = make([]Config, 1)
	sm.configs[0].Groups = map[int][]string{}

	labgob.Register(Op{})
	labgob.Register(JoinArgs{})
	labgob.Register(LeaveArgs{})
	labgob.Register(MoveArgs{})
	labgob.Register(QueryArgs{})
	sm.applyCh = make(chan raft.ApplyMsg)
	sm.rf = raft.Make(servers, me, persister, sm.applyCh)

	// Your code here.
	sm.sequenceMapper = make(map[int64]int64)
	sm.requestMapper = make(map[int]chan Op)

	go sm.serverMonitor()

	return sm
}

3.3 Join/Move/Leave

对于Join/Move/Leave 通过Raft保证一致性,实现基本都是一样的,就是

  1. rf.Start通知raft
  2. 等待让她applyMsg的通知,对比term,如果一致则返回success
func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
	var isLeader bool
	clientOp := Op{OpType: JOIN, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
	var isLeader bool
	clientOp := Op{OpType: LEAVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
	var isLeader bool
	clientOp := Op{OpType: MOVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}
	joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
	reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) joinLeaveMove(clientOp Op) joinLeaveMoveReply {
	reply := joinLeaveMoveReply{}
	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			reply.Err = OK
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
	return reply
}

3.4 Query

query的实现return的内容稍微多了一点,那就是需要return config,根据要求,query为-1或者大于configs长度的都返回最新的config,否则返回对应num的config

func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
	// Your code here.
	DPrintf("[%v] Query is called", sm.me)
	var isLeader bool
	clientOp := Op{OpType: QUERY, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
	clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
	if !isLeader {
		reply.WrongLeader = true
		return
	}

	DPrintf("Query [%v] leader is found", sm.me)

	ch := sm.getChannel(clientOp.Index)
	defer func() {
		sm.mu.Lock()
		delete(sm.requestMapper, clientOp.Index)
		sm.mu.Unlock()
	}()

	timer := time.NewTicker(2000 * time.Millisecond)
	defer timer.Stop()
	select {
	case op := <-ch:
		DPrintf("[%v] QUERY receive op", sm.me)
		sm.mu.Lock()
		opTerm := op.Term
		sm.mu.Unlock()
		DPrintf("[%v] QUERY clientOp.Term[%v] vs opTerm[%v]", sm.me, clientOp.Term, opTerm)
		if clientOp.Term != opTerm {
			reply.WrongLeader = true
		} else {
			DPrintf("[%v] QUERY args.Num=%v sm.config=%v", sm.me, args.Num, sm.configs)
			sm.mu.Lock()
			reply.Err = OK
			if args.Num >= 0 && args.Num < len(sm.configs) {
				reply.Config = sm.configs[args.Num]
			} else {
				reply.Config = sm.configs[len(sm.configs)-1]
			}
			sm.mu.Unlock()
		}
	case <-timer.C:
		reply.WrongLeader = true
	}
}

3.5 serverMonitor

这个函数是用来监视Raft的applyCh,如果是Join或者是Leave,需要重新平衡Shards中Groups的分配,其中addNewConfig是很重要的,这是用来避免map的引用带来的error

func (sm *ShardMaster) serverMonitor() {
	for {
		if sm.killed() {
			return
		}
		select {
		case msg := <-sm.applyCh:
			if msg.IsSnapshot || !msg.CommandValid {
				continue
			}

			index := msg.CommandIndex
			term := msg.CommandTerm
			op := msg.Command.(Op)
			sm.mu.Lock()
			sequenceInMapper, hasSequence := sm.sequenceMapper[op.ClientId]
			op.Term = term
			if !hasSequence || op.SequenceId > sequenceInMapper {
				switch op.OpType {
				case JOIN:
					newConfig := sm.addNewConfig()
					joinArgs := op.OpArgs.(JoinArgs)
					for i, servers := range joinArgs.Servers {
						newConfig.Groups[i] = servers
						sm.balanceLoad(&newConfig, i, JOIN)
					}
					sm.configs = append(sm.configs, newConfig)
				case LEAVE:
					newConfig := sm.addNewConfig()
					leaveArgs := op.OpArgs.(LeaveArgs)
					for _, gid := range leaveArgs.GIDs {
						delete(newConfig.Groups, gid)
						sm.balanceLoad(&newConfig, gid, LEAVE)
					}
					sm.configs = append(sm.configs, newConfig)
				case MOVE:
					newConfig := sm.addNewConfig()
					moveArgs := op.OpArgs.(MoveArgs)
					if _, isExists := newConfig.Groups[moveArgs.GID]; isExists {
						newConfig.Shards[moveArgs.Shard] = moveArgs.GID
					} else {
						return
					}
					sm.configs = append(sm.configs, newConfig)
				}
				sm.sequenceMapper[op.ClientId] = op.SequenceId
			}
			sm.mu.Unlock()
			sm.getChannel(index) <- op
		}
	}
}

func (sm *ShardMaster) addNewConfig() Config {
	lastConfig := sm.configs[len(sm.configs)-1]
	nextConfig := Config{Num: lastConfig.Num + 1, Shards: lastConfig.Shards, Groups: make(map[int][]string)}
	for gid, servers := range lastConfig.Groups {
		nextConfig.Groups[gid] = append([]string{}, servers...)
	}
	return nextConfig
}

3.6 Load balance

对于Join来说,就是把多数的GroupId换成新的GroupId
比如,开始是[1,1,1,1,1,2,2,2,2,2],然后又groupId=3加入,那么添加流程就是
[1,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,3,2,2,2,2] ->
[3,3,1,1,1,3,2,2,2,2]

Leave则是逆向操作,比如开始是[3,3,1,1,1,3,2,2,2,2], 然后需要把3撤走,那么撤走流程是
[3,3,1,1,1,3,2,2,2,2] ->
[1,3,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,2,2,2,2,2]

func (sm *ShardMaster) balanceLoad(c *Config, gid int, request string) {
	shardsMap := groupByGid(c)
	switch request {
	case JOIN:
		totalGroups := len(c.Groups)
		newShardNum := NShards / totalGroups
		for i := 0; i < newShardNum; i++ {
			maxGid := getMaxShardGid(shardsMap)
			c.Shards[shardsMap[maxGid][0]] = gid
			shardsMap[maxGid] = shardsMap[maxGid][1:]
		}
	case LEAVE:
		shardsList, isExists := shardsMap[gid]
		if !isExists {
			return
		}
		delete(shardsMap, gid)
		if len(c.Groups) == 0 {
			c.Shards = [NShards]int{}
			return
		}
		for _, value := range shardsList {
			minGid := getMinShardGid(shardsMap)
			c.Shards[value] = minGid
			shardsMap[minGid] = append(shardsMap[minGid], value)
		}
	}
}

helper functions, 这是把shards做group操作,变成map[gid] -> shards[]
比如 [3,3,1,1,1,3,2,2,2,2] 就会变成
map[1] -> [2,3,4]
map[2] ->[6,7,8,9]
map[3] ->[0,1]

func groupByGid(c *Config) map[int][]int {
	shardsMap := map[int][]int{}
	for k, _ := range c.Groups {
		shardsMap[k] = []int{}
	}
	for index, gid := range c.Shards {
		shardsMap[gid] = append(shardsMap[gid], index)
	}

	return shardsMap
}

func getMaxShardGid(shardsMap map[int][]int) int {
	max := -1
	gid := -1
	for index, shards := range shardsMap {
		if max < len(shards) {
			max = len(shards)
			gid = index
		}
	}
	return gid
}

func getMinShardGid(shardsMap map[int][]int) int {
	min := NShards
	gid := -1
	for index, shards := range shardsMap {
		if min > len(shards) {
			min = len(shards)
			gid = index
		}
	}
	return gid
}

四、总结

本实验难点在于完成load balance算法,推荐使用heap,这样应该是复杂度最低的

本文地址:https://blog.csdn.net/Joshmo/article/details/111955297

相关标签: 6.824