OceanPresent

MIT-6.5840 Lab4笔记


前置知识

任务目标

Lab 4要求我们基于Lab3的Raft算法实现一个分布式K/V Server以及快照功能

用于测试的文件在src/kvraft/test_test.go,运行命令go testgo test -race进行测试。命令go test -v -run ${func_name}对指定测试样例进行测试,也可以在VSCode中进入文件点击Test按钮

文档资料

以下列出完成Lab 4所需要阅读的资料:

  • Lab 4 2024文档 介绍了Lab4的任务和一些规范,同时给出了一些实现提示
  • MIT 6.824 6.824的官方视频课程,完成Lab 4可以看第六、七节,有部分参考
  • Raft论文 Raft论文,要多看几遍,尤其是论文里的几张图,最好严格实现不要自己DIY。Section 8提到了Lab 4一些要注意的点
  • Go语言教程 对于有一定编程基础,要快速入门书写Go程序的。这本国内的教程足够,既涵盖了基本语法,也提供了一些常用标准库的API供随时查看
  • src/kvraft/test_test.go 测试过不去的时候可以阅读测试样例源码,看测试具体做了什么检测什么

课程程序规范

Lab4主要关注如下文件

  • src/kvraft/client.go K/V Server。接受客户端的RPC请求,提交日志,在日志同步后返回结果
  • src/kvraft/server.go K/V Client。用于给K/V Server发送RPC请求
  • src/kvraft/config.go 一些测试用到的函数,把它当作测试文件的源码之一。阅读方便理解测试
  • src/kvraft/test_test.go 测试文件,一般不需要动。如果某个测试样例过不去可以看该样例的函数,了解该样例大概需要完成什么任务,有助于Debug
  • src/raft/*.go Lab3的Raft,如果在做Lab4的时候遇到了bug,需要检查是不是底层执行出了问题

Lab 4A

任务目标

Lab3实现一个基于Raft的key/value数据库

数据结构

Clerk

clientId标识唯一客户端。instructionCounter标识本客户端的唯一请求。lastLeaderId用于记录上次发送的LeaderId,请求优先向LeaderId发送,可以减少尝试请求的次数。

type Clerk struct {
	servers []*labrpc.ClientEnd
	// You will have to modify this struct.
	rwMutex            sync.RWMutex
	instructionCounter int32
	clientId           int
	lastLeaderId       int
}

KVServer

kvState作为内存键值数据库。latestCmdMap保存每个客户端最新的请求Id,以及请求的结果,方便过滤重复请求。reqChanMap保存了每个请求的Channel,当接受Raft底层的提交后通过该channel响应客户端请求

type KVServer struct {
	rwMutex sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	lastApplied int
	kvState     map[string]string
	// clientId -> latest command id
	latestCmdMap map[int]LastCmdInfo
	reqChanMap   map[int]chan OpReply
}

type LastCmdInfo struct {
	CommandId CommandId
	Reply     OpReply
}

Op

type Op struct {
	// Your definitions here.
	// Field names must start with capital letters,
	// otherwise RPC will break.

	// 0: Get 1:Put 2:Append
	OpType    OperationType
	Key       string
	Value     string
	CommandId CommandId
}

type OpReply struct {
	Value string
	Err   Err
}

客户端请求

Get

每个客户端只能串行地发送请求,因此开一个for循环直至请求得到响应。请求如果超时或发送给的不是Leader,则切换下一个服务器重试。记得更新lastLeaderId用于记录上次发送的LeaderId,请求优先向LeaderId发送,可以减少尝试请求的次数。

func (ck *Clerk) Get(key string) string {

	// You will have to modify this function.
	var args GetArgs
	var reply GetReply
	args.Key = key
	args.CommandId = ck.genClientCommandId()

	ck.rwMutex.Lock()
	var serverId = ck.lastLeaderId
	ck.rwMutex.Unlock()
	for {
		// ck.logPrintf("Send Get %v request to server %d", args.CommandId, serverId)
		if ck.servers[serverId].Call("KVServer.Get", &args, &reply) {
			switch reply.Err {
			case OK:
				return reply.Value
			case ErrNoKey:
				return ""
			case ErrWrongLeader:
				serverId = (serverId + 1) % len(ck.servers)
			case ErrCommandOverwritten:
				continue
			}
			ck.rwMutex.Lock()
			ck.lastLeaderId = serverId
			ck.rwMutex.Unlock()
		} else {
			serverId = (serverId + 1) % len(ck.servers)
		}
		time.Sleep(REQUEST_INTERVAL * time.Millisecond)
	}
}

PutAppend

和Get同理,只需要根据请求本身性质稍微改动即可

服务器处理请求

ReceiveRaftApply

注意,只有Leader KV Server才能处理KV Client的请求。但所有的KV Server都会通过kv.applyCh从自己的下层Raft接受提交的日志/快照,且正常情况下所有的KV Server都会以相同顺序执行相同的指令。我们需要开一个协程专门接受Raft过来的applyMsg

func (kv *KVServer) ReceiveRaftApply() {
	for !kv.killed() {
		select {
		case apply := <-kv.applyCh:
			kv.LogPrintf(DDebug, "[ReceiveRaftApply] ApplyMsg: %v", apply)
			if apply.CommandValid {
				kv.rwMutex.Lock()
				if apply.CommandIndex <= kv.lastApplied {
					kv.rwMutex.Unlock()
					continue
				}
				kv.lastApplied = apply.CommandIndex
				op := apply.Command.(Op)
				var opReply OpReply
				if op.OpType != OP_GET && kv.checkDuplicateCommandId(op.CommandId.ClientId, op.CommandId) {
					kv.LogPrintf(DDebug, "[ReceiveRaftApply] PutAppend Duplicate command id: %+v", op.CommandId)
					opReply = kv.latestCmdMap[op.CommandId.ClientId].Reply
				} else {
					opReply = kv.executeCmd(op)
					if op.OpType != OP_GET {
						kv.latestCmdMap[op.CommandId.ClientId] = LastCmdInfo{
							CommandId: op.CommandId,
							Reply:     opReply,
						}
					}
				}
				kv.LogPrintf(DInfo, "[ReceiveRaftApply] Command: %+v, reply: %+v", op, opReply)
				kv.LogPrintf(DInfo, "[ReceiveRaftApply] KV State: %v", kv.kvState)
				if _, isLeader := kv.rf.GetState(); isLeader {
					reqChan := kv.getReqChanWithDefault(apply.CommandIndex)
					reqChan <- opReply
				}
				kv.rwMutex.Unlock()
			} 
		}
	}
}

reqChan最好提供大小为1的缓存。一开始我发现Leader节点总是在KV Sever应用层阻塞,而其他节点则没这个问题。因为Leader节点需要向reqChan发送消息,然后在GetPutAppend函数里处理,而Get等函数是有超时机制的,如果不设置缓冲区,会导致超时后,消息到达却没地方接受,导致发送方阻塞(在ReceiveRaftApply)

func (kv *KVServer) getReqChanWithDefault(commandIdx int) chan OpReply {

	if recvChan, ok := kv.reqChanMap[commandIdx]; ok {
		return recvChan
	}

	recvChan := make(chan OpReply, 1)
	kv.reqChanMap[commandIdx] = recvChan
	return recvChan
}

PutAppend

Leader接受客户端请求后,在kv.recvChanMap为该请求创建一个Channel等待 该请求对应的日志被提交,只有日志被提交后我们才可以回复客户端。

kv.recvChanMap[ComandId]什么时候会有数据过来呢?就在ReceiveRaftApply函数里,该函数统一接受所有applyMsg,然后检查该applyMsg是否有对应的请求Channel,有则发送给Channel。相当于ReceiveRaftApply是一个统一拦截器,对于特殊的applyMsg会转发给Get/Put/Append再进行处理

func (kv *KVServer) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	kv.rwMutex.Lock()
	if kv.checkDuplicateCommandId(args.CommandId.ClientId, args.CommandId) {
		kv.LogPrintf(DDebug, "Duplicate PUT command id: %+v", args.CommandId)
		opReply := kv.latestCmdMap[args.CommandId.ClientId].Reply
		reply.Err = opReply.Err
		kv.rwMutex.Unlock()
		return
	}
	kv.rwMutex.Unlock()

	// Your code here.
	var startIndex, _, isLeader = kv.rf.Start(Op{
		OpType:    args.OpType,
		Key:       args.Key,
		Value:     args.Value,
		CommandId: args.CommandId,
	})

	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}
	// kv.LogPrintf(DInfo, "Receive PutAppend request: %+v.Raft Log Index:%v. Args: %v %v", args.CommandId, startIndex, args.Key, args.Value)

	kv.rwMutex.Lock()
	var reqChan = kv.getReqChanWithDefault(startIndex)
	kv.rwMutex.Unlock()

	select {
	case result := <-reqChan:
		reply.Err = result.Err
	case <-time.After(APPLY_WAIT_INTERVAL):
		reply.Err = ErrTimeout
	}

	kv.rwMutex.Lock()
	close(reqChan)
	delete(kv.reqChanMap, startIndex)
	kv.rwMutex.Unlock()
}

Get

和PutAppend同理

Lab4B

任务目标

Lab4B要求我们对KV Server应用实现快照功能

数据结构

Snapshot

KV Server需要保存自己的KV内存数据库和Server维护的关于每个Client Request的状态。

说白了就是kvStatelatestCmdMap两个字段

快照存储

SnapshotLoop

根据试验要求,我们需要检查Snapshot的大小,如果大于程序为我们传递的maxraftstate,则需要进行快照存储。这里要求我们手动制作快照并调用Raft的Snapshot函数

那么什么时候检测是否需要进行快照并执行呢?在kvState大小改变时,也就是接受Raft的Apply时,在ReceiveRaftApply函数中插入逻辑即可

别忘了在Lab 3D中,Snapshot制作完成后也会通过Apply Channel发送一个ApplyMsg,同样是在ReceiveRaftApply函数中处理

// 判断是否需要 snapshot
if kv.maxraftstate != -1 && kv.rf.StateSize() >= kv.maxraftstate {
	kv.makeSnapshot(apply.CommandIndex)
}
if apply.SnapshotValid {
	kv.rwMutex.Lock()
	kv.installSnapshot(apply.Snapshot)
	kv.lastApplied = apply.SnapshotIndex
	kv.rwMutex.Unlock()
}

installSnapshot

安装Snapshot就是解码的过程

func (kv *KVServer) installSnapshot(snapshot []byte) {
	if snapshot == nil || len(snapshot) <= 0 {
		return
	}

	var buffer = bytes.NewBuffer(snapshot)
	var dec = labgob.NewDecoder(buffer)

	var kvState map[string]string
	var latestCmdMap map[int]CommandId

	if dec.Decode(&kvState) != nil ||
		dec.Decode(&latestCmdMap) != nil {
		kv.logPrintf("reading persister got a problem")
	} else {
		kv.kvState = kvState
		kv.latestCmdMap = latestCmdMap
	}
}

注意KV Server启动的时候应该先检查persister中是否有快照,如果有则安装

kv.installSnapshot(persister.ReadSnapshot())

This website has been running for 3 years 2 months 12 days 15 hours 28 minutes 46 seconds

皖ICP备2021007094号 2021-PRESENT © OceanPresent