lab2中实现了raft协议,本lab将在raft之上实现一个可容错的k/v存储服务,第一部分是实现一个不带日志压缩的版本,第二部分是实现日志压缩。时间缘由我只完成了第一部分。git
如上图,lab2实现了raft协议,本lab将实现kvserver。每一个raft都关联一个kvserver,Clerks发送Put(), Append(), Get() RPC给leader服务器中的kvserver,kvserver收到请求后将操做打包成Log Entry提交给raft,而后阻塞等待raft将这个Entry拷贝到其它server,当Log Entry被拷贝到大部分的server后,leader 的raft会通知kvserver(raft往管道中塞comitted Entry,kvserver经过读这个管道获取通知),kvserver执行命令,而后响应Clerk。github
客户端经过Clerk发送请求,来看下Clerk代码:缓存
type Clerk struct { servers []*labrpc.ClientEnd // You will have to modify this struct. lastLeader int cid int64 seq int } func (ck *Clerk) Get(key string) string { // You will have to modify this function. // 参数: 要读的key, 当前clerk的id, 请求序列号 getArgs := GetArgs{Key: key, Cid:ck.cid, Seq:ck.seq} reply := GetReply{} for { doneCh := make(chan bool, 1) go func() { //发送Get() RPC ok := ck.servers[ck.lastLeader].Call("KVServer.Get", &getArgs, &reply) doneCh <- ok }() select { case <-time.After(600 * time.Millisecond): DPrintf("clerk(%d) retry PutAppend after timeout\n", ck.cid) continue case ok := <- doneCh: //收到响应后,而且是leader返回的,那么说明这个命令已经执行了 if ok && reply.WrongLeader != WrongLeader { //请求序列号加1 ck.seq++ return reply.Value } } //换一个server重试 ck.lastLeader++ ck.lastLeader %= len(ck.servers) } return "" }
这里只给出了Get()的代码,Put()和Append()相似,发送KVServer.Get给一个server,若是这个server不是leader,换一个server重试。直到发给真正的leader,而且leader将这个命令拷贝到大部分其它server后,而后成功执行该命令,Clerk.Get()才会返回。服务器
再来看下服务端的代码,KVServer处理Clerk的RPC请求:app
type KVServer struct { mu sync.Mutex me int rf *raft.Raft applyCh chan raft.ApplyMsg maxraftstate int // snapshot if log grows this big // Your definitions here. // 保存键值对 db map[string]string latestReplies map[int64]*LatestReply notify map[int]chan struct{} } func (kv *KVServer) Get(args *GetArgs, reply *GetReply) { // Your code here. if _, isLeader := kv.rf.GetState(); !isLeader { reply.WrongLeader = WrongLeader reply.Err = "" return } // 防止重复请求 kv.mu.Lock() if latestReply, ok := kv.latestReplies[args.Cid]; ok && args.Seq <= latestReply.Seq { reply.WrongLeader = IsLeader reply.Value = latestReply.Reply.Value reply.Err = latestReply.Reply.Err kv.mu.Unlock() return } kv.mu.Unlock() command := Op{Operation:"Get", Key:args.Key, Cid:args.Cid, Seq:args.Seq} index, term, _ := kv.rf.Start(command) // 阻塞等待结果 kv.mu.Lock() ch := make(chan struct{}) kv.notify[index] = ch kv.mu.Unlock() select { case <-ch: curTerm, isLeader := kv.rf.GetState() DPrintf("%v got notify at index %v, isLeader = %v\n", kv.me, index, isLeader) if !isLeader || curTerm != term { reply.WrongLeader = WrongLeader reply.Err = "" } else { reply.WrongLeader = IsLeader kv.mu.Lock() if value, ok := kv.db[args.Key]; ok { reply.Value = value reply.Err = OK } else { reply.Err = ErrNoKey } kv.mu.Unlock() } } }
KVServer.db用于保存键值对。
KVServer.Get()首先判断本身是否是leader,若是不是leader,直接返回,这样Clerk好重试其它server。若是是leader,先在缓存中找,看这个请求是否已经执行过了。
由于可能出现这么一种状况:若是leader commit一个Entry后当即奔溃了,那么Clerk就收不到响应,那么Clerk会将这个请求发给新的leader,新的leader收到请求后若是不作任何措施,将会二次commit该Log Entry,对于Put()和Append()请求执行两次是不正确的,因此须要一个办法防止一个请求执行两次。
能够这么作:每一个Clerk都分配一个惟一的cid,每一个请求分配一个惟一的序列号seq,每成功一个请求,该序列号加一。服务端记录每一个客户端cid最近一次apply的请求的序列号seq和对应的响应结果,根据这个信息可知,当再次收到这个客户端的序列号小于seq的请求时,说明已经执行过了,直接返回结果。函数
若是以前没有执行过,那么调用this
kv.rf.Start(command)
将Log Entry提交给raft,而且阻塞等待raft将这个Entry拷贝到其它大部分server,从阻塞返回后,说明这个Entry已经被拷贝到大部分server了,而且已经执行了命令,这时能够将结果返回给Clerk了。线程
那么在哪里接收raft的消息呢?KVServer在建立的时候会在一个线程中执行以下函数:设计
func (kv *KVServer) applyDaemon() { for appliedEntry := range kv.applyCh { command := appliedEntry.Command.(Op) // 执行命令, 过滤已经执行过得命令 kv.mu.Lock() if latestReply, ok := kv.latestReplies[command.Cid]; !ok || command.Seq > latestReply.Seq { switch command.Operation { case "Get": latestReply := LatestReply{Seq:command.Seq,} reply := GetReply{} if value, ok := kv.db[command.Key]; ok { reply.Value = value } else { reply.Err = ErrNoKey } latestReply.Reply = reply kv.latestReplies[command.Cid] = &latestReply case "Put": kv.db[command.Key] = command.Value latestReply := LatestReply{Seq:command.Seq} kv.latestReplies[command.Cid] = &latestReply case "Append": kv.db[command.Key] += command.Value latestReply := LatestReply{Seq:command.Seq} kv.latestReplies[command.Cid] = &latestReply default: panic("invalid command operation") } } DPrintf("%d applied index:%d, cmd:%v\n", kv.me, appliedEntry.CommandIndex, command) // 通知 if ch, ok := kv.notify[appliedEntry.CommandIndex]; ok && ch != nil { DPrintf("%d notify index %d\n",kv.me, appliedEntry.CommandIndex) close(ch) delete(kv.notify, appliedEntry.CommandIndex) } kv.mu.Unlock() } }
kv.applyCh这个chanel会在建立raft的时候传给raft,当某个Log Entry能够被commit的时候,raft会往这个chanel中塞,只要for循环这个kv.applyCh,就能知道已经被commit的Entry,拿到Entry后,根据其中的命令执行相应的操做,而后通知KVServer.Get()继续执行。3d
具体代码在:https://github.com/gatsbyd/mit_6.824_2018 若有错误,欢迎指正: 15313676365