MIT-6.824 Lab 3: Fault-tolerant Key/Value Service
概述
lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间原因我只完成了第一部分。
设计思路
如上图,lab2实现了raft协议,本lab将实现kvserver。每个raft都关联一个kvserver,clerks发送put(), append(), get() rpc给leader服务器中的kvserver,kvserver收到请求后将操作打包成log entry提交给raft,然后阻塞等待raft将这个entry拷贝到其它server,当log entry被拷贝到大部分的server后,leader 的raft会通知kvserver(raft往管道中塞comitted entry,kvserver通过读这个管道获取通知),kvserver执行命令,然后响应clerk。
clerk
客户端通过clerk发送请求,来看下clerk代码:
type clerk struct { servers []*labrpc.clientend // you will have to modify this struct. lastleader int cid int64 seq int } func (ck *clerk) get(key string) string { // you will have to modify this function. // 参数: 要读的key, 当前clerk的id, 请求序列号 getargs := getargs{key: key, cid:ck.cid, seq:ck.seq} reply := getreply{} for { donech := make(chan bool, 1) go func() { //发送get() rpc ok := ck.servers[ck.lastleader].call("kvserver.get", &getargs, &reply) donech <- ok }() select { case <-time.after(600 * time.millisecond): dprintf("clerk(%d) retry putappend after timeout\n", ck.cid) continue case ok := <- donech: //收到响应后,并且是leader返回的,那么说明这个命令已经执行了 if ok && reply.wrongleader != wrongleader { //请求序列号加1 ck.seq++ return reply.value } } //换一个server重试 ck.lastleader++ ck.lastleader %= len(ck.servers) } return "" }
这里只给出了get()的代码,put()和append()类似,发送kvserver.get给一个server,如果这个server不是leader,换一个server重试。直到发给真正的leader,并且leader将这个命令拷贝到大部分其它server后,然后成功执行该命令,clerk.get()才会返回。
kvserver
再来看下服务端的代码,kvserver处理clerk的rpc请求:
type kvserver struct { mu sync.mutex me int rf *raft.raft applych chan raft.applymsg maxraftstate int // snapshot if log grows this big // your definitions here. // 保存键值对 db map[string]string latestreplies map[int64]*latestreply notify map[int]chan struct{} } func (kv *kvserver) get(args *getargs, reply *getreply) { // your code here. if _, isleader := kv.rf.getstate(); !isleader { reply.wrongleader = wrongleader reply.err = "" return } // 防止重复请求 kv.mu.lock() if latestreply, ok := kv.latestreplies[args.cid]; ok && args.seq <= latestreply.seq { reply.wrongleader = isleader reply.value = latestreply.reply.value reply.err = latestreply.reply.err kv.mu.unlock() return } kv.mu.unlock() command := op{operation:"get", key:args.key, cid:args.cid, seq:args.seq} index, term, _ := kv.rf.start(command) // 阻塞等待结果 kv.mu.lock() ch := make(chan struct{}) kv.notify[index] = ch kv.mu.unlock() select { case <-ch: curterm, isleader := kv.rf.getstate() dprintf("%v got notify at index %v, isleader = %v\n", kv.me, index, isleader) if !isleader || curterm != term { reply.wrongleader = wrongleader reply.err = "" } else { reply.wrongleader = isleader kv.mu.lock() if value, ok := kv.db[args.key]; ok { reply.value = value reply.err = ok } else { reply.err = errnokey } kv.mu.unlock() } } }
kvserver.db用于保存键值对。
kvserver.get()首先判断自己是不是leader,如果不是leader,直接返回,这样clerk好重试其它server。如果是leader,先在缓存中找,看这个请求是否已经执行过了。
因为可能出现这么一种情况:如果leader commit一个entry后立即奔溃了,那么clerk就收不到响应,那么clerk会将这个请求发给新的leader,新的leader收到请求后如果不做任何措施,将会二次commit该log entry,对于put()和append()请求执行两次是不正确的,所以需要一个办法防止一个请求执行两次。
可以这么做:每个clerk都分配一个唯一的cid,每个请求分配一个唯一的序列号seq,每成功一个请求,该序列号加一。服务端记录每个客户端cid最近一次apply的请求的序列号seq和对应的响应结果,根据这个信息可知,当再次收到这个客户端的序列号小于seq的请求时,说明已经执行过了,直接返回结果。
如果之前没有执行过,那么调用
kv.rf.start(command)
将log entry提交给raft,并且阻塞等待raft将这个entry拷贝到其它大部分server,从阻塞返回后,说明这个entry已经被拷贝到大部分server了,并且已经执行了命令,这时可以将结果返回给clerk了。
那么在哪里接收raft的消息呢?kvserver在创建的时候会在一个线程中执行如下函数:
func (kv *kvserver) applydaemon() { for appliedentry := range kv.applych { command := appliedentry.command.(op) // 执行命令, 过滤已经执行过得命令 kv.mu.lock() if latestreply, ok := kv.latestreplies[command.cid]; !ok || command.seq > latestreply.seq { switch command.operation { case "get": latestreply := latestreply{seq:command.seq,} reply := getreply{} if value, ok := kv.db[command.key]; ok { reply.value = value } else { reply.err = errnokey } latestreply.reply = reply kv.latestreplies[command.cid] = &latestreply case "put": kv.db[command.key] = command.value latestreply := latestreply{seq:command.seq} kv.latestreplies[command.cid] = &latestreply case "append": kv.db[command.key] += command.value latestreply := latestreply{seq:command.seq} kv.latestreplies[command.cid] = &latestreply default: panic("invalid command operation") } } dprintf("%d applied index:%d, cmd:%v\n", kv.me, appliedentry.commandindex, command) // 通知 if ch, ok := kv.notify[appliedentry.commandindex]; ok && ch != nil { dprintf("%d notify index %d\n",kv.me, appliedentry.commandindex) close(ch) delete(kv.notify, appliedentry.commandindex) } kv.mu.unlock() } }
kv.applych这个chanel会在创建raft的时候传给raft,当某个log entry可以被commit的时候,raft会往这个chanel中塞,只要for循环这个kv.applych,就能知道已经被commit的entry,拿到entry后,根据其中的命令执行相应的操作,然后通知kvserver.get()继续执行。
具体代码在:
如有错误,欢迎指正:
15313676365