MIT-6.5840 Lab4笔记
前置知识
任务目标
Lab 4
要求我们基于Lab3的Raft算法实现一个分布式K/V Server以及快照功能
用于测试的文件在src/kvraft/test_test.go
,运行命令go test
或go test -race
进行测试。命令go test -v -run ${func_name}
对指定测试样例进行测试,也可以在VSCode中进入文件点击Test按钮
文档资料
以下列出完成Lab 4所需要阅读的资料:
- Lab 4 2024文档 介绍了Lab4的任务和一些规范,同时给出了一些实现提示
- MIT 6.824 6.824的官方视频课程,完成Lab 4可以看第六、七节,有部分参考
- Raft论文 Raft论文,要多看几遍,尤其是论文里的几张图,最好严格实现不要自己DIY。Section 8提到了Lab 4一些要注意的点
- Go语言教程 对于有一定编程基础,要快速入门书写Go程序的。这本国内的教程足够,既涵盖了基本语法,也提供了一些常用标准库的API供随时查看
- src/kvraft/test_test.go 测试过不去的时候可以阅读测试样例源码,看测试具体做了什么检测什么
课程程序规范
Lab4主要关注如下文件
- src/kvraft/client.go K/V Server。接受客户端的RPC请求,提交日志,在日志同步后返回结果
- src/kvraft/server.go K/V Client。用于给K/V Server发送RPC请求
- src/kvraft/config.go 一些测试用到的函数,把它当作测试文件的源码之一。阅读方便理解测试
- src/kvraft/test_test.go 测试文件,一般不需要动。如果某个测试样例过不去可以看该样例的函数,了解该样例大概需要完成什么任务,有助于Debug
- src/raft/*.go Lab3的Raft,如果在做Lab4的时候遇到了bug,需要检查是不是底层执行出了问题
Lab 4A
任务目标
Lab3实现一个基于Raft的key/value数据库
数据结构
Clerk
clientId标识唯一客户端。instructionCounter
标识本客户端的唯一请求。lastLeaderId
用于记录上次发送的LeaderId
,请求优先向LeaderId
发送,可以减少尝试请求的次数。
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
rwMutex sync.RWMutex
instructionCounter int32
clientId int
lastLeaderId int
}
KVServer
kvState
作为内存键值数据库。latestCmdMap
保存每个客户端最新的请求Id,以及请求的结果,方便过滤重复请求。reqChanMap
保存了每个请求的Channel,当接受Raft底层的提交后通过该channel响应客户端请求
type KVServer struct {
rwMutex sync.RWMutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()
maxraftstate int // snapshot if log grows this big
// Your definitions here.
lastApplied int
kvState map[string]string
// clientId -> latest command id
latestCmdMap map[int]LastCmdInfo
reqChanMap map[int]chan OpReply
}
type LastCmdInfo struct {
CommandId CommandId
Reply OpReply
}
Op
type Op struct {
// Your definitions here.
// Field names must start with capital letters,
// otherwise RPC will break.
// 0: Get 1:Put 2:Append
OpType OperationType
Key string
Value string
CommandId CommandId
}
type OpReply struct {
Value string
Err Err
}
客户端请求
Get
每个客户端只能串行地发送请求,因此开一个for循环直至请求得到响应。请求如果超时或发送给的不是Leader,则切换下一个服务器重试。记得更新lastLeaderId
用于记录上次发送的LeaderId
,请求优先向LeaderId
发送,可以减少尝试请求的次数。
func (ck *Clerk) Get(key string) string {
// You will have to modify this function.
var args GetArgs
var reply GetReply
args.Key = key
args.CommandId = ck.genClientCommandId()
ck.rwMutex.Lock()
var serverId = ck.lastLeaderId
ck.rwMutex.Unlock()
for {
// ck.logPrintf("Send Get %v request to server %d", args.CommandId, serverId)
if ck.servers[serverId].Call("KVServer.Get", &args, &reply) {
switch reply.Err {
case OK:
return reply.Value
case ErrNoKey:
return ""
case ErrWrongLeader:
serverId = (serverId + 1) % len(ck.servers)
case ErrCommandOverwritten:
continue
}
ck.rwMutex.Lock()
ck.lastLeaderId = serverId
ck.rwMutex.Unlock()
} else {
serverId = (serverId + 1) % len(ck.servers)
}
time.Sleep(REQUEST_INTERVAL * time.Millisecond)
}
}
PutAppend
和Get同理,只需要根据请求本身性质稍微改动即可
服务器处理请求
ReceiveRaftApply
注意,只有Leader KV Server才能处理KV Client的请求。但所有的KV Server都会通过kv.applyCh
从自己的下层Raft接受提交的日志/快照,且正常情况下所有的KV Server都会以相同顺序执行相同的指令。我们需要开一个协程专门接受Raft过来的applyMsg
func (kv *KVServer) ReceiveRaftApply() {
for !kv.killed() {
select {
case apply := <-kv.applyCh:
kv.LogPrintf(DDebug, "[ReceiveRaftApply] ApplyMsg: %v", apply)
if apply.CommandValid {
kv.rwMutex.Lock()
if apply.CommandIndex <= kv.lastApplied {
kv.rwMutex.Unlock()
continue
}
kv.lastApplied = apply.CommandIndex
op := apply.Command.(Op)
var opReply OpReply
if op.OpType != OP_GET && kv.checkDuplicateCommandId(op.CommandId.ClientId, op.CommandId) {
kv.LogPrintf(DDebug, "[ReceiveRaftApply] PutAppend Duplicate command id: %+v", op.CommandId)
opReply = kv.latestCmdMap[op.CommandId.ClientId].Reply
} else {
opReply = kv.executeCmd(op)
if op.OpType != OP_GET {
kv.latestCmdMap[op.CommandId.ClientId] = LastCmdInfo{
CommandId: op.CommandId,
Reply: opReply,
}
}
}
kv.LogPrintf(DInfo, "[ReceiveRaftApply] Command: %+v, reply: %+v", op, opReply)
kv.LogPrintf(DInfo, "[ReceiveRaftApply] KV State: %v", kv.kvState)
if _, isLeader := kv.rf.GetState(); isLeader {
reqChan := kv.getReqChanWithDefault(apply.CommandIndex)
reqChan <- opReply
}
kv.rwMutex.Unlock()
}
}
}
}
reqChan
最好提供大小为1的缓存。一开始我发现Leader节点总是在KV Sever应用层阻塞,而其他节点则没这个问题。因为Leader节点需要向reqChan
发送消息,然后在Get
或PutAppend
函数里处理,而Get
等函数是有超时机制的,如果不设置缓冲区,会导致超时后,消息到达却没地方接受,导致发送方阻塞(在ReceiveRaftApply)
func (kv *KVServer) getReqChanWithDefault(commandIdx int) chan OpReply {
if recvChan, ok := kv.reqChanMap[commandIdx]; ok {
return recvChan
}
recvChan := make(chan OpReply, 1)
kv.reqChanMap[commandIdx] = recvChan
return recvChan
}
PutAppend
Leader接受客户端请求后,在kv.recvChanMap
为该请求创建一个Channel等待 该请求对应的日志被提交,只有日志被提交后我们才可以回复客户端。
kv.recvChanMap[ComandId]
什么时候会有数据过来呢?就在ReceiveRaftApply函数里,该函数统一接受所有applyMsg,然后检查该applyMsg是否有对应的请求Channel,有则发送给Channel。相当于ReceiveRaftApply是一个统一拦截器,对于特殊的applyMsg会转发给Get/Put/Append再进行处理
func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
kv.rwMutex.Lock()
if kv.checkDuplicateCommandId(args.CommandId.ClientId, args.CommandId) {
kv.LogPrintf(DDebug, "Duplicate PUT command id: %+v", args.CommandId)
opReply := kv.latestCmdMap[args.CommandId.ClientId].Reply
reply.Err = opReply.Err
kv.rwMutex.Unlock()
return
}
kv.rwMutex.Unlock()
// Your code here.
var startIndex, _, isLeader = kv.rf.Start(Op{
OpType: args.OpType,
Key: args.Key,
Value: args.Value,
CommandId: args.CommandId,
})
if !isLeader {
reply.Err = ErrWrongLeader
return
}
// kv.LogPrintf(DInfo, "Receive PutAppend request: %+v.Raft Log Index:%v. Args: %v %v", args.CommandId, startIndex, args.Key, args.Value)
kv.rwMutex.Lock()
var reqChan = kv.getReqChanWithDefault(startIndex)
kv.rwMutex.Unlock()
select {
case result := <-reqChan:
reply.Err = result.Err
case <-time.After(APPLY_WAIT_INTERVAL):
reply.Err = ErrTimeout
}
kv.rwMutex.Lock()
close(reqChan)
delete(kv.reqChanMap, startIndex)
kv.rwMutex.Unlock()
}
Get
和PutAppend同理
Lab4B
任务目标
Lab4B要求我们对KV Server应用实现快照功能
数据结构
Snapshot
KV Server需要保存自己的KV内存数据库和Server维护的关于每个Client Request的状态。
说白了就是kvState
和latestCmdMap
两个字段
快照存储
SnapshotLoop
根据试验要求,我们需要检查Snapshot的大小,如果大于程序为我们传递的maxraftstate
,则需要进行快照存储。这里要求我们手动制作快照并调用Raft的Snapshot
函数
那么什么时候检测是否需要进行快照并执行呢?在kvState
大小改变时,也就是接受Raft的Apply时,在ReceiveRaftApply
函数中插入逻辑即可
别忘了在Lab 3D中,Snapshot制作完成后也会通过Apply Channel发送一个ApplyMsg
,同样是在ReceiveRaftApply
函数中处理
// 判断是否需要 snapshot
if kv.maxraftstate != -1 && kv.rf.StateSize() >= kv.maxraftstate {
kv.makeSnapshot(apply.CommandIndex)
}
if apply.SnapshotValid {
kv.rwMutex.Lock()
kv.installSnapshot(apply.Snapshot)
kv.lastApplied = apply.SnapshotIndex
kv.rwMutex.Unlock()
}
installSnapshot
安装Snapshot就是解码的过程
func (kv *KVServer) installSnapshot(snapshot []byte) {
if snapshot == nil || len(snapshot) <= 0 {
return
}
var buffer = bytes.NewBuffer(snapshot)
var dec = labgob.NewDecoder(buffer)
var kvState map[string]string
var latestCmdMap map[int]CommandId
if dec.Decode(&kvState) != nil ||
dec.Decode(&latestCmdMap) != nil {
kv.logPrintf("reading persister got a problem")
} else {
kv.kvState = kvState
kv.latestCmdMap = latestCmdMap
}
}
注意KV Server启动的时候应该先检查persister
中是否有快照,如果有则安装
kv.installSnapshot(persister.ReadSnapshot())