欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

MIT-6.824 Lab 3: Fault-tolerant Key/Value Service

程序员文章站 2023-10-17 18:28:12
概述 lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间原因我只完成了第一部分。 设计思路 ![kvserver](https://blog 1253119293.cos.ap beijing.my ......

概述

lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间原因我只完成了第一部分。

设计思路

MIT-6.824 Lab 3: Fault-tolerant Key/Value Service

如上图,lab2实现了raft协议,本lab将实现kvserver。每个raft都关联一个kvserver,clerks发送put(), append(), get() rpc给leader服务器中的kvserver,kvserver收到请求后将操作打包成log entry提交给raft,然后阻塞等待raft将这个entry拷贝到其它server,当log entry被拷贝到大部分的server后,leader 的raft会通知kvserver(raft往管道中塞comitted entry,kvserver通过读这个管道获取通知),kvserver执行命令,然后响应clerk。

clerk

客户端通过clerk发送请求,来看下clerk代码:

type clerk struct {
    servers []*labrpc.clientend
    // you will have to modify this struct.

    lastleader  int
    cid         int64
    seq         int
}

func (ck *clerk) get(key string) string {

    // you will have to modify this function.
    // 参数: 要读的key, 当前clerk的id,  请求序列号
    getargs := getargs{key: key, cid:ck.cid, seq:ck.seq}
    reply := getreply{}

    for {
        donech := make(chan bool, 1)
        go func() {
           //发送get() rpc
            ok := ck.servers[ck.lastleader].call("kvserver.get", &getargs, &reply)
            donech <- ok
        }()

        select {
        case <-time.after(600 * time.millisecond):
            dprintf("clerk(%d) retry putappend after timeout\n", ck.cid)
            continue
        case ok := <- donech:
           //收到响应后,并且是leader返回的,那么说明这个命令已经执行了
            if ok && reply.wrongleader != wrongleader {
                //请求序列号加1
              ck.seq++
                return reply.value
            }
        }

       //换一个server重试
        ck.lastleader++
        ck.lastleader %= len(ck.servers)
    }

    return ""
}

这里只给出了get()的代码,put()和append()类似,发送kvserver.get给一个server,如果这个server不是leader,换一个server重试。直到发给真正的leader,并且leader将这个命令拷贝到大部分其它server后,然后成功执行该命令,clerk.get()才会返回。

kvserver

再来看下服务端的代码,kvserver处理clerk的rpc请求:

type kvserver struct {
    mu      sync.mutex
    me      int
    rf      *raft.raft
    applych chan raft.applymsg

    maxraftstate int // snapshot if log grows this big

    // your definitions here.
   // 保存键值对
    db      map[string]string
    latestreplies map[int64]*latestreply
    notify map[int]chan struct{}
}

func (kv *kvserver) get(args *getargs, reply *getreply) {
    // your code here.
    if _, isleader := kv.rf.getstate(); !isleader {
        reply.wrongleader = wrongleader
        reply.err = ""
        return
    }

    // 防止重复请求
    kv.mu.lock()
    if latestreply, ok := kv.latestreplies[args.cid]; ok && args.seq <= latestreply.seq {
        reply.wrongleader = isleader
        reply.value = latestreply.reply.value
        reply.err = latestreply.reply.err
        kv.mu.unlock()
        return
    }
    kv.mu.unlock()

    command := op{operation:"get", key:args.key, cid:args.cid, seq:args.seq}
    index, term, _ := kv.rf.start(command)

    // 阻塞等待结果
    kv.mu.lock()
    ch := make(chan struct{})
    kv.notify[index] = ch
    kv.mu.unlock()

    select {
    case <-ch:
        curterm, isleader := kv.rf.getstate()
        dprintf("%v got notify at index %v, isleader = %v\n", kv.me, index, isleader)
        if !isleader || curterm != term {
            reply.wrongleader = wrongleader
            reply.err = ""
        } else {
            reply.wrongleader = isleader
            kv.mu.lock()
            if value, ok := kv.db[args.key]; ok {
                reply.value = value
                reply.err = ok
            } else {
                reply.err = errnokey
            }
            kv.mu.unlock()
        }

    }

}

kvserver.db用于保存键值对。
kvserver.get()首先判断自己是不是leader,如果不是leader,直接返回,这样clerk好重试其它server。如果是leader,先在缓存中找,看这个请求是否已经执行过了。
因为可能出现这么一种情况:如果leader commit一个entry后立即奔溃了,那么clerk就收不到响应,那么clerk会将这个请求发给新的leader,新的leader收到请求后如果不做任何措施,将会二次commit该log entry,对于put()和append()请求执行两次是不正确的,所以需要一个办法防止一个请求执行两次。
可以这么做:每个clerk都分配一个唯一的cid,每个请求分配一个唯一的序列号seq,每成功一个请求,该序列号加一。服务端记录每个客户端cid最近一次apply的请求的序列号seq和对应的响应结果,根据这个信息可知,当再次收到这个客户端的序列号小于seq的请求时,说明已经执行过了,直接返回结果。

如果之前没有执行过,那么调用

kv.rf.start(command)

将log entry提交给raft,并且阻塞等待raft将这个entry拷贝到其它大部分server,从阻塞返回后,说明这个entry已经被拷贝到大部分server了,并且已经执行了命令,这时可以将结果返回给clerk了。

那么在哪里接收raft的消息呢?kvserver在创建的时候会在一个线程中执行如下函数:

func (kv *kvserver) applydaemon()  {
    for appliedentry := range kv.applych {
        command := appliedentry.command.(op)

        // 执行命令, 过滤已经执行过得命令
        kv.mu.lock()
        if latestreply, ok := kv.latestreplies[command.cid]; !ok || command.seq > latestreply.seq {
            switch command.operation {
            case "get":
                latestreply := latestreply{seq:command.seq,}
                reply := getreply{}
                if value, ok := kv.db[command.key]; ok {
                    reply.value = value
                } else {
                    reply.err = errnokey
                }
                latestreply.reply = reply
                kv.latestreplies[command.cid] = &latestreply
            case "put":
                kv.db[command.key] = command.value
                latestreply := latestreply{seq:command.seq}
                kv.latestreplies[command.cid] = &latestreply
            case "append":
                kv.db[command.key] += command.value
                latestreply := latestreply{seq:command.seq}
                kv.latestreplies[command.cid] = &latestreply
            default:
                panic("invalid command operation")
            }
        }

        dprintf("%d applied index:%d, cmd:%v\n", kv.me, appliedentry.commandindex, command)

        // 通知
        if ch, ok := kv.notify[appliedentry.commandindex]; ok && ch != nil {
            dprintf("%d notify index %d\n",kv.me, appliedentry.commandindex)
            close(ch)
            delete(kv.notify, appliedentry.commandindex)
        }
        kv.mu.unlock()
    }
}

kv.applych这个chanel会在创建raft的时候传给raft,当某个log entry可以被commit的时候,raft会往这个chanel中塞,只要for循环这个kv.applych,就能知道已经被commit的entry,拿到entry后,根据其中的命令执行相应的操作,然后通知kvserver.get()继续执行。

具体代码在:
如有错误,欢迎指正:
15313676365