MIT 6.824 (2020) Raft Lab 4A
Preface
I am working through MIT 6.824 (2020) and have finished Lab 4A, passing all of its tests. For the earlier Raft-based labs, see my posts on Lab 2A, Lab 2B, Lab 2C, Lab 3A, and Lab 3B.
Lab 4A builds the shard master for a sharded key/value store: clients send requests to servers, and each request is routed to the right shard group to read or modify data depending on the request type. The main work is implementing the master's Join/Leave/Move operations on replica groups, plus the Query operation that clients use to fetch configurations from the servers.
There are three things to watch out for in this lab:
- the load-balance algorithm
- configs must be deep-copied, not shared by reference
- JoinArgs, LeaveArgs, MoveArgs, and QueryArgs must be registered with labgob
1. Overview
1.1 Architecture
The lab improves performance by sharding the database; the master is in charge of the sharding, i.e. deciding which replica group serves which shard.
1.2 Architecture Details
First, the database is split into a fixed number of shards, 10 in this lab, as the constant in the common file shows:
// The number of shards.
const NShards = 10
Second, the master itself runs on multiple servers, so Raft is needed to keep the shard configuration consistent. As the Config struct shows, three fields have to be maintained: Num is the configuration number, Shards is the shard assignment (initially [0,0,0,0,0,0,0,0,0,0], where gid 0 means unassigned), and Groups maps each group id to that group's server information, e.g. group 1 -> servers [a, b, c].
type Config struct {
    Num    int              // config number
    Shards [NShards]int     // shard -> gid
    Groups map[int][]string // gid -> servers[]
}
For example, Shards starts as [0,0,0,0,0,0,0,0,0,0]. After Join(group 1 -> servers [a, b, c]) the system has one group, so Shards becomes [1,1,1,1,1,1,1,1,1,1]. If group 2 -> servers [e, f, g] then joins, there are two groups, and the 10 shards have to be divided between them as evenly as possible, e.g. [1,1,1,1,1,2,2,2,2,2].
This is where the load-balance algorithm comes in. My implementation follows an approach I found online and does not use a fancier data structure such as a heap.
2. Client
This is very similar to the kvraft client; most of that code can be carried over.
The Clerk struct and its constructor:
type Clerk struct {
    servers []*labrpc.ClientEnd
    mu      sync.Mutex
    leaderId   int
    clientId   int64
    sequenceId int64
    // Your data here.
}

func MakeClerk(servers []*labrpc.ClientEnd) *Clerk {
    ck := new(Clerk)
    ck.servers = servers
    // Your code here.
    ck.clientId = nrand()
    return ck
}
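nrand() above comes from the lab's provided client skeleton; for reference, it draws a random 62-bit integer to use as the client id:

func nrand() int64 {
    // Uses crypto/rand and math/big from the standard library.
    max := big.NewInt(int64(1) << 62)
    bigx, _ := rand.Int(rand.Reader, max)
    return bigx.Int64()
}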
The argument structs gain a couple of new fields, mainly ClientId and SequenceId:
type JoinArgs struct {
    Servers    map[int][]string // new GID -> servers mappings
    ClientId   int64
    SequenceId int64
}

type LeaveArgs struct {
    GIDs       []int
    ClientId   int64
    SequenceId int64
}

type MoveArgs struct {
    Shard      int
    GID        int
    ClientId   int64
    SequenceId int64
}

type QueryArgs struct {
    Num        int // desired config number
    ClientId   int64
    SequenceId int64
}
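The matching reply types are defined in the provided common.go; they are reproduced here because the code below reads reply.WrongLeader, reply.Err, and (for Query) reply.Config:

type JoinReply struct {
    WrongLeader bool
    Err         Err
}

type LeaveReply struct {
    WrongLeader bool
    Err         Err
}

type MoveReply struct {
    WrongLeader bool
    Err         Err
}

type QueryReply struct {
    WrongLeader bool
    Err         Err
    Config      Config
}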
Next, the Join/Move/Leave/Query implementations. Join, Move, and Leave are essentially identical, so only Query and Join are shown:
func (ck *Clerk) Query(num int) Config {
    args := &QueryArgs{}
    // Your code here.
    args.Num = num
    args.ClientId = ck.clientId
    args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)
    for {
        reply := QueryReply{}
        if ck.servers[ck.currentLeader()].Call("ShardMaster.Query", args, &reply) && !reply.WrongLeader {
            return reply.Config
        }
        ck.leaderId = ck.changeLeader()
        time.Sleep(100 * time.Millisecond)
    }
}

func (ck *Clerk) Join(servers map[int][]string) {
    args := &JoinArgs{}
    args.Servers = servers
    args.ClientId = ck.clientId
    args.SequenceId = atomic.AddInt64(&ck.sequenceId, 1)
    for {
        reply := JoinReply{}
        if ck.servers[ck.currentLeader()].Call("ShardMaster.Join", args, &reply) && !reply.WrongLeader {
            return
        }
        ck.leaderId = ck.changeLeader()
        time.Sleep(100 * time.Millisecond)
    }
}
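currentLeader and changeLeader are used above but not shown in this post. A minimal sketch, assuming the Clerk simply remembers the last known leader and rotates to the next server on failure (the helper names match the calls above, the bodies are a guess):

func (ck *Clerk) currentLeader() int {
    ck.mu.Lock()
    defer ck.mu.Unlock()
    return ck.leaderId
}

func (ck *Clerk) changeLeader() int {
    ck.mu.Lock()
    defer ck.mu.Unlock()
    // Round-robin to the next server; the caller stores the returned id.
    ck.leaderId = (ck.leaderId + 1) % len(ck.servers)
    return ck.leaderId
}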
3. Master Server
3.1 Fields
This is again quite similar to the kvraft server. Two maps are needed: requestMapper, which maps a log index to the channel used to notify the waiting RPC handler, and sequenceMapper, which records the last applied sequence id per client for duplicate detection.
type ShardMaster struct {
    mu      sync.Mutex
    me      int
    rf      *raft.Raft
    applyCh chan raft.ApplyMsg

    // Your data here.
    dead           int32
    configs        []Config // indexed by config num
    requestMapper  map[int]chan Op
    sequenceMapper map[int64]int64
}

type Op struct {
    // Your data here.
    SequenceId int64
    ClientId   int64
    OpType     string
    OpArgs     interface{}
    Index      int
    Term       int
}

type joinLeaveMoveReply struct {
    WrongLeader bool
    Err         Err
}

const (
    JOIN  string = "Join"
    LEAVE string = "Leave"
    MOVE  string = "Move"
    QUERY string = "Query"
)
3.2 Constructor
One important point here: every custom struct that is stored inside an interface{} (the OpArgs field) must be registered with labgob, otherwise decoding fails and you end up with a nil interface{} error.
func StartServer(servers []*labrpc.ClientEnd, me int, persister *raft.Persister) *ShardMaster {
    sm := new(ShardMaster)
    sm.me = me
    sm.configs = make([]Config, 1)
    sm.configs[0].Groups = map[int][]string{}

    // Register every custom struct carried inside Op.OpArgs, otherwise
    // labgob decodes it to a nil interface{}.
    labgob.Register(Op{})
    labgob.Register(JoinArgs{})
    labgob.Register(LeaveArgs{})
    labgob.Register(MoveArgs{})
    labgob.Register(QueryArgs{})

    sm.applyCh = make(chan raft.ApplyMsg)
    sm.rf = raft.Make(servers, me, persister, sm.applyCh)

    // Your code here.
    sm.sequenceMapper = make(map[int64]int64)
    sm.requestMapper = make(map[int]chan Op)
    go sm.serverMonitor()
    return sm
}
3.3 Join/Move/Leave
Join/Move/Leave all go through Raft to guarantee consistency, and their implementations are essentially identical:
- call rf.Start to hand the op to Raft
- wait for the notification from the applyCh monitor, compare the term, and report success if it matches
func (sm *ShardMaster) Join(args *JoinArgs, reply *JoinReply) {
    var isLeader bool
    clientOp := Op{OpType: JOIN, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
    if !isLeader {
        reply.WrongLeader = true
        return
    }
    joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
    reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Leave(args *LeaveArgs, reply *LeaveReply) {
    var isLeader bool
    clientOp := Op{OpType: LEAVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
    if !isLeader {
        reply.WrongLeader = true
        return
    }
    joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
    reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) Move(args *MoveArgs, reply *MoveReply) {
    var isLeader bool
    clientOp := Op{OpType: MOVE, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
    if !isLeader {
        reply.WrongLeader = true
        return
    }
    joinLeaveMoveReply := sm.joinLeaveMove(clientOp)
    reply.WrongLeader, reply.Err = joinLeaveMoveReply.WrongLeader, joinLeaveMoveReply.Err
}

func (sm *ShardMaster) joinLeaveMove(clientOp Op) joinLeaveMoveReply {
    reply := joinLeaveMoveReply{}
    ch := sm.getChannel(clientOp.Index)
    defer func() {
        sm.mu.Lock()
        delete(sm.requestMapper, clientOp.Index)
        sm.mu.Unlock()
    }()
    timer := time.NewTicker(2000 * time.Millisecond)
    defer timer.Stop()
    select {
    case op := <-ch:
        sm.mu.Lock()
        opTerm := op.Term
        sm.mu.Unlock()
        if clientOp.Term != opTerm {
            reply.WrongLeader = true
        } else {
            reply.Err = OK
        }
    case <-timer.C:
        reply.WrongLeader = true
    }
    return reply
}
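getChannel is also not shown in the post. A minimal sketch consistent with how it is used above: lazily create a buffered channel per log index, so that the applyCh monitor never blocks when no RPC handler happens to be waiting on that index (for example on followers):

func (sm *ShardMaster) getChannel(index int) chan Op {
    sm.mu.Lock()
    defer sm.mu.Unlock()
    ch, ok := sm.requestMapper[index]
    if !ok {
        // Buffer of 1 so serverMonitor's send cannot block forever.
        ch = make(chan Op, 1)
        sm.requestMapper[index] = ch
    }
    return ch
}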
3.4 Query
Query has to return a bit more, namely the Config itself. Per the requirements, if the requested number is -1 or larger than the biggest known configuration number, the latest configuration is returned; otherwise the configuration with that number is returned.
func (sm *ShardMaster) Query(args *QueryArgs, reply *QueryReply) {
    // Your code here.
    DPrintf("[%v] Query is called", sm.me)
    var isLeader bool
    clientOp := Op{OpType: QUERY, OpArgs: *args, SequenceId: args.SequenceId, ClientId: args.ClientId}
    clientOp.Index, clientOp.Term, isLeader = sm.rf.Start(clientOp)
    if !isLeader {
        reply.WrongLeader = true
        return
    }
    DPrintf("Query [%v] leader is found", sm.me)
    ch := sm.getChannel(clientOp.Index)
    defer func() {
        sm.mu.Lock()
        delete(sm.requestMapper, clientOp.Index)
        sm.mu.Unlock()
    }()
    timer := time.NewTicker(2000 * time.Millisecond)
    defer timer.Stop()
    select {
    case op := <-ch:
        DPrintf("[%v] QUERY receive op", sm.me)
        sm.mu.Lock()
        opTerm := op.Term
        sm.mu.Unlock()
        DPrintf("[%v] QUERY clientOp.Term[%v] vs opTerm[%v]", sm.me, clientOp.Term, opTerm)
        if clientOp.Term != opTerm {
            reply.WrongLeader = true
        } else {
            sm.mu.Lock()
            DPrintf("[%v] QUERY args.Num=%v sm.config=%v", sm.me, args.Num, sm.configs)
            reply.Err = OK
            if args.Num >= 0 && args.Num < len(sm.configs) {
                reply.Config = sm.configs[args.Num]
            } else {
                reply.Config = sm.configs[len(sm.configs)-1]
            }
            sm.mu.Unlock()
        }
    case <-timer.C:
        reply.WrongLeader = true
    }
}
3.5 serverMonitor
This goroutine watches Raft's applyCh. For Join and Leave, the assignment of shards to groups must be rebalanced. addNewConfig is important here: it deep-copies the previous config so that the new config's Groups map is not shared by reference with the old one, which would otherwise corrupt earlier configs.
func (sm *ShardMaster) serverMonitor() {
    for {
        if sm.killed() {
            return
        }
        select {
        case msg := <-sm.applyCh:
            if msg.IsSnapshot || !msg.CommandValid {
                continue
            }
            index := msg.CommandIndex
            term := msg.CommandTerm
            op := msg.Command.(Op)
            sm.mu.Lock()
            sequenceInMapper, hasSequence := sm.sequenceMapper[op.ClientId]
            op.Term = term
            if !hasSequence || op.SequenceId > sequenceInMapper {
                switch op.OpType {
                case JOIN:
                    newConfig := sm.addNewConfig()
                    joinArgs := op.OpArgs.(JoinArgs)
                    // Apply the joining gids in sorted order (standard sort
                    // package): map iteration order is random, and every
                    // replica must build exactly the same config from the
                    // same log entry.
                    gids := make([]int, 0, len(joinArgs.Servers))
                    for gid := range joinArgs.Servers {
                        gids = append(gids, gid)
                    }
                    sort.Ints(gids)
                    for _, gid := range gids {
                        newConfig.Groups[gid] = joinArgs.Servers[gid]
                        sm.balanceLoad(&newConfig, gid, JOIN)
                    }
                    sm.configs = append(sm.configs, newConfig)
                case LEAVE:
                    newConfig := sm.addNewConfig()
                    leaveArgs := op.OpArgs.(LeaveArgs)
                    for _, gid := range leaveArgs.GIDs {
                        delete(newConfig.Groups, gid)
                        sm.balanceLoad(&newConfig, gid, LEAVE)
                    }
                    sm.configs = append(sm.configs, newConfig)
                case MOVE:
                    newConfig := sm.addNewConfig()
                    moveArgs := op.OpArgs.(MoveArgs)
                    // A move to a gid that does not exist is ignored.
                    if _, isExists := newConfig.Groups[moveArgs.GID]; isExists {
                        newConfig.Shards[moveArgs.Shard] = moveArgs.GID
                        sm.configs = append(sm.configs, newConfig)
                    }
                }
                sm.sequenceMapper[op.ClientId] = op.SequenceId
            }
            sm.mu.Unlock()
            sm.getChannel(index) <- op
        }
    }
}

func (sm *ShardMaster) addNewConfig() Config {
    lastConfig := sm.configs[len(sm.configs)-1]
    // Shards is an array, so assignment copies it; the Groups map has to be
    // copied explicitly so the new config does not alias the old one.
    nextConfig := Config{Num: lastConfig.Num + 1, Shards: lastConfig.Shards, Groups: make(map[int][]string)}
    for gid, servers := range lastConfig.Groups {
        nextConfig.Groups[gid] = append([]string{}, servers...)
    }
    return nextConfig
}
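serverMonitor checks sm.killed() on every iteration; that helper and the matching Kill are not shown above. They follow the standard 6.824 shutdown-flag pattern using the dead field declared on ShardMaster:

func (sm *ShardMaster) Kill() {
    atomic.StoreInt32(&sm.dead, 1)
    sm.rf.Kill()
}

func (sm *ShardMaster) killed() bool {
    return atomic.LoadInt32(&sm.dead) == 1
}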
3.6 Load balance
For Join, shards are taken one at a time from the currently most loaded group and handed to the new group.
For example, starting from [1,1,1,1,1,2,2,2,2,2], when group 3 joins, the assignment evolves as:
[1,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,2,2,2,2,2] ->
[3,1,1,1,1,3,2,2,2,2] ->
[3,3,1,1,1,3,2,2,2,2]
Leave is the reverse: each shard owned by the leaving group is handed to the currently least loaded remaining group. Starting from [3,3,1,1,1,3,2,2,2,2] and removing group 3, the assignment evolves as:
[3,3,1,1,1,3,2,2,2,2] ->
[1,3,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,3,2,2,2,2] ->
[1,1,1,1,1,2,2,2,2,2]
func (sm *ShardMaster) balanceLoad(c *Config, gid int, request string) {
    shardsMap := groupByGid(c)
    switch request {
    case JOIN:
        totalGroups := len(c.Groups)
        newShardNum := NShards / totalGroups
        // Hand newShardNum shards to the joining gid, always stealing from
        // the currently most loaded group.
        for i := 0; i < newShardNum; i++ {
            maxGid := getMaxShardGid(shardsMap)
            c.Shards[shardsMap[maxGid][0]] = gid
            shardsMap[maxGid] = shardsMap[maxGid][1:]
        }
    case LEAVE:
        shardsList, isExists := shardsMap[gid]
        if !isExists {
            return
        }
        delete(shardsMap, gid)
        if len(c.Groups) == 0 {
            c.Shards = [NShards]int{}
            return
        }
        // Give each shard owned by the leaving gid to the currently least
        // loaded remaining group.
        for _, value := range shardsList {
            minGid := getMinShardGid(shardsMap)
            c.Shards[value] = minGid
            shardsMap[minGid] = append(shardsMap[minGid], value)
        }
    }
}
Helper functions. groupByGid groups the Shards array by gid, producing map[gid] -> []shard index.
For example, [3,3,1,1,1,3,2,2,2,2] becomes
map[1] -> [2,3,4]
map[2] -> [6,7,8,9]
map[3] -> [0,1,5]
func groupByGid(c *Config) map[int][]int {
    shardsMap := map[int][]int{}
    for k := range c.Groups {
        shardsMap[k] = []int{}
    }
    for index, gid := range c.Shards {
        shardsMap[gid] = append(shardsMap[gid], index)
    }
    return shardsMap
}

// getMaxShardGid returns the gid owning the most shards. Ties are broken by
// the smaller gid so that every replica picks the same group regardless of
// map iteration order.
func getMaxShardGid(shardsMap map[int][]int) int {
    max := -1
    gid := -1
    for index, shards := range shardsMap {
        if len(shards) > max || (len(shards) == max && index < gid) {
            max = len(shards)
            gid = index
        }
    }
    return gid
}

// getMinShardGid returns the gid owning the fewest shards, with the same
// deterministic tie-breaking.
func getMinShardGid(shardsMap map[int][]int) int {
    min := NShards + 1
    gid := -1
    for index, shards := range shardsMap {
        if len(shards) < min || (len(shards) == min && index < gid) {
            min = len(shards)
            gid = index
        }
    }
    return gid
}
4. Summary
The hard part of this lab is the load-balance algorithm. A heap is worth considering here: repeatedly finding the most or least loaded group then costs O(log G) per step instead of O(G), which should give the lowest complexity. A rough sketch follows.
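As an illustration only (this is not the code used above), here is a minimal container/heap sketch of the Join case from section 3.6: pop the most loaded group, steal one shard for the new group, and push the group back. The names groupLoad and maxLoadHeap are made up for this example.

package main

import (
    "container/heap"
    "fmt"
)

type groupLoad struct {
    gid    int
    shards []int // shard indexes currently owned by this group
}

// maxLoadHeap pops the group with the most shards first; ties are broken by
// the smaller gid so every replica would make the same choice.
type maxLoadHeap []groupLoad

func (h maxLoadHeap) Len() int { return len(h) }
func (h maxLoadHeap) Less(i, j int) bool {
    if len(h[i].shards) != len(h[j].shards) {
        return len(h[i].shards) > len(h[j].shards)
    }
    return h[i].gid < h[j].gid
}
func (h maxLoadHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *maxLoadHeap) Push(x interface{}) { *h = append(*h, x.(groupLoad)) }
func (h *maxLoadHeap) Pop() interface{} {
    old := *h
    n := len(old)
    item := old[n-1]
    *h = old[:n-1]
    return item
}

func main() {
    // Same starting point as the Join walkthrough above: gid 1 and gid 2
    // own five shards each, and gid 3 has just joined with none.
    h := &maxLoadHeap{
        {gid: 1, shards: []int{0, 1, 2, 3, 4}},
        {gid: 2, shards: []int{5, 6, 7, 8, 9}},
    }
    heap.Init(h)

    var moved []int
    for i := 0; i < 10/3; i++ { // NShards / totalGroups shards go to the new group
        top := heap.Pop(h).(groupLoad)
        moved = append(moved, top.shards[0]) // steal one shard from the busiest group
        top.shards = top.shards[1:]
        heap.Push(h, top)
    }
    fmt.Println("shards handed to gid 3:", moved) // prints [0 5 1], matching section 3.6
}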
Original post: https://blog.csdn.net/Joshmo/article/details/111955297