
MIT-6.5840 Lab 5 Notes


Prerequisites

Task Goals

Lab 5 asks us to build, on top of the Raft implementation from Lab 3, a distributed K/V server composed of multiple Raft groups, plus shard migration between different Raft groups. I had originally planned to slack off and skip Lab 5, but leaving the series unfinished felt inelegant and my perfectionism couldn't stand it, so I picked it back up and finished it.

The tests live in src/shardkv/test_test.go; run them with go test or go test -race. Use go test -v -run ${func_name} to run a single test case, or open the file in VSCode and click the Test button.

Reference Material

Material worth reading to complete Lab 5:

  • Lab 5 2024 handout: describes the Lab 5 tasks and conventions, and gives some implementation hints
  • MIT 6.824: the official 6.824 lecture videos; lectures six and seven are a partial reference for this lab
  • Raft paper: make absolutely sure your Lab 3 Raft implementation is rock solid and can survive thousands of test runs without failing. A shaky foundation brings the whole building down; if the underlying Raft has problems, the upper-layer application cannot be made to work at all
  • Go tutorial: for someone with a programming background who wants to get productive in Go quickly, this (Chinese-language) tutorial is enough; it covers the basic syntax and also lists the APIs of common standard-library packages for quick lookup
  • src/shardkv/test_test.go: when a test won't pass, read the test source to see exactly what it does and what it checks
  • An expert's blog (optional): a blog whose code and theory are both written very cleanly; useful as a reference when you get stuck

Course Code Layout

Lab 5 mainly involves the following files:

  • src/shardctrler/*.go
  • src/shardctrler/test_test.go
  • src/shardkv/*.go
  • src/shardkv/test_test.go: the test file, normally left untouched. If a particular test case fails, reading that case's function to understand roughly what it expects helps with debugging
  • src/raft/*.go: the Raft from Lab 3; if you hit a bug while working on Lab 5, check whether the underlying layer is misbehaving

Lab 5A

Task Goals

Lab 5A implements a Raft-based shardctrler, a highly available cluster configuration management service. It records which servers belong to each Raft group and which Raft group each shard is currently assigned to. Both clients and the KV storage servers pull the latest configuration from the shardctrler and communicate under that agreed-upon configuration.

Data Structures

Clerk

Just copy it wholesale from Lab 4; only the function names differ.

type Clerk struct {
	servers []*labrpc.ClientEnd
	// Your data here.
	rwMutex            sync.RWMutex
	instructionCounter int32
	clientId           int
	lastLeaderId       int
}

The shardctrler exposes 4 RPC services (the Leave/Move/Query argument structs are sketched right after this list; JoinArgs is shown in its own section below):

  1. Query: look up the Config with a given number; if the number is -1 or larger than the largest known config number, the shardctrler should reply with the latest configuration.
  2. Leave: remove some groups so they no longer serve KV requests
  3. Join: add some groups
  4. Move: move a given shard to a given group; the move is only at the configuration level, since the shardctrler only manages configurations
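
For reference, a sketch of the remaining argument/reply structs. The GIDs/Shard/GID/Num fields come from the lab skeleton; CommandId is this post's own deduplication field carried over from Lab 4, and QueryReply carries the returned Config.

type LeaveArgs struct {
	CommandId CommandId
	GIDs      []int // groups that are leaving
}

type MoveArgs struct {
	CommandId CommandId
	Shard     int // shard to reassign
	GID       int // destination group
}

type QueryArgs struct {
	CommandId CommandId
	Num       int // desired config number; -1 means latest
}

type QueryReply struct {
	WrongLeader bool
	Err         Err
	Config      Config
}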

ShardCtrler

ShardCtrler can likewise be copied wholesale from the Lab 4 server; only the concrete application type changes, while the overall architecture (how to interact with the underlying Raft, how logs are stored, and so on) stays the same.

Leave, Join, and Move should each produce a new configuration rather than modify the existing one in place, so the shardctrler keeps the history of configurations in order by number.

A Config contains the globally unique number of that configuration, the servers belonging to each Raft group, and the Raft group that owns each shard. In Lab 5 the number of shards is fixed at NShards = 10.

type ShardCtrler struct {
	rwMutex sync.RWMutex
	me      int
	rf      *raft.Raft
	applyCh chan raft.ApplyMsg
	dead    int32 // set by Kill()

	// Your data here.
	latestCmdMap map[int]LastCmdInfo
	reqChanMap   map[int]chan OpReply

	configs []Config // indexed by config num
}

type Config struct {
	Num    int              // config number
	Shards [NShards]int     // shard -> gid
	Groups map[int][]string // gid -> servers[]
}

Op

In Lab 4 the KV server's Op parameters were nearly identical across operations, so hard-coding them into Op was fine. The ShardCtrler's RPC services take different parameters, so Op's Args becomes an interface{}, and a type assertion at apply time decides which RPC operation to execute.

type Op struct {
	// Your data here.
	OpType    OperationType
	CommandId CommandId
	Args      interface{} // JoinArgs, LeaveArgs, MoveArgs, QueryArgs
}

type OpReply struct {
	Err    Err
	Config Config
}
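
Because Op.Args is an interface{}, every concrete argument type that can end up inside it has to be registered with labgob, otherwise the Raft layer cannot encode/decode those log entries. The registrations typically sit in StartServer next to the skeleton's labgob.Register(Op{}); roughly:

// Register every concrete type that may travel through Raft inside Op.Args.
labgob.Register(Op{})
labgob.Register(JoinArgs{})
labgob.Register(LeaveArgs{})
labgob.Register(MoveArgs{})
labgob.Register(QueryArgs{})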

Join

The RPC arguments are all spelled out in the Lab 5 handout; just write them as required. Don't forget to carry over some of the infrastructure from Lab 4, e.g. adding CommandId to the arguments to help with deduplication.

type JoinArgs struct {
	CommandId CommandId
	Servers   map[int][]string // new GID -> servers mappings
}

type JoinReply struct {
	WrongLeader bool
	Err         Err
}

The server-side request handlers have the same overall logic as Lab 4: copy them over and change them to operate on Configs. Functions like ReceiveRaftApply need no changes at all and can be reused directly.

func (sc *ShardCtrler) Join(args *JoinArgs, reply *JoinReply) {
	sc.rwMutex.Lock()
	if sc.checkDuplicateCommandId(args.CommandId.ClientId, args.CommandId) {
		sc.LogPrintf(DDebug, "Duplicate JOIN command id: %+v", args.CommandId)
		opReply := sc.latestCmdMap[args.CommandId.ClientId].Reply
		reply.Err = opReply.Err
		sc.rwMutex.Unlock()
		return
	}
	sc.rwMutex.Unlock()

	var startIndex, _, isLeader = sc.rf.Start(Op{
		OpType:    OP_JOIN,
		CommandId: args.CommandId,
		Args:      *args,
	})

	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	sc.rwMutex.Lock()
	var reqChan = sc.getReqChanWithDefault(startIndex)
	sc.rwMutex.Unlock()

	select {
	case result := <-reqChan:
		reply.Err = result.Err
	case <-time.After(APPLY_WAIT_INTERVAL):
		reply.Err = ErrTimeout
	}

	sc.LogPrintf(DDebug, "ShardCtrler.Join Finish: args=%+v, reply=%+v", args, reply)

	sc.rwMutex.Lock()
	close(reqChan)
	delete(sc.reqChanMap, startIndex)
	sc.rwMutex.Unlock()
}
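
Two helpers used above are carried straight over from Lab 4. Roughly (this sketch assumes LastCmdInfo stores the last applied CommandId together with its Reply, and that CommandId carries a per-client increasing SequenceNum — the field names here are illustrative):

// getReqChanWithDefault lazily creates and returns the reply channel for a log index.
// The caller must hold rwMutex.
func (sc *ShardCtrler) getReqChanWithDefault(index int) chan OpReply {
	if _, ok := sc.reqChanMap[index]; !ok {
		sc.reqChanMap[index] = make(chan OpReply, 1) // buffered so the applier never blocks
	}
	return sc.reqChanMap[index]
}

// checkDuplicateCommandId reports whether this client command has already been applied.
// The caller must hold rwMutex.
func (sc *ShardCtrler) checkDuplicateCommandId(clientId int, cmdId CommandId) bool {
	last, ok := sc.latestCmdMap[clientId]
	return ok && cmdId.SequenceNum <= last.CommandId.SequenceNum
}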

Leave, Query, Move

These work exactly the same way.

Rebalance Algorithm

The hard part of Lab 5A is how the Config is modified after an RPC operation commits, which corresponds to Lab 4's executeCmd function. After executing an operation, the shards each group is responsible for must be reassigned.

func (sc *ShardCtrler) executeCmd(op Op) OpReply {
	var reply OpReply
	switch op.OpType {
	case OP_JOIN:
		args := op.Args.(JoinArgs)
		newConfig := sc.joinBalance(sc.configs[len(sc.configs)-1], args.Servers)
		sc.configs = append(sc.configs, newConfig)
		reply.Config = newConfig
		reply.Err = OK
	case OP_LEAVE:
		args := op.Args.(LeaveArgs)
		newConfig := sc.leaveBalance(sc.configs[len(sc.configs)-1], args.GIDs)
		sc.configs = append(sc.configs, newConfig)
		reply.Config = newConfig
		reply.Err = OK
	case OP_MOVE:
		args := op.Args.(MoveArgs)
		newConfig := sc.moveBalance(sc.configs[len(sc.configs)-1], args.Shard, args.GID)
		sc.configs = append(sc.configs, newConfig)
		reply.Config = newConfig
		reply.Err = OK
	case OP_QUERY:
		// If the query number is out of range, return the latest config.
		args := op.Args.(QueryArgs)
		if args.Num >= INVALID_CONFIG_INDEX && args.Num < len(sc.configs) {
			reply.Config = sc.configs[args.Num]
		} else {
			reply.Config = sc.configs[len(sc.configs)-1]
		}
		reply.Err = OK
	}
	return reply
}
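
joinBalance and reAllocateShards are shown below; leaveBalance and moveBalance are much simpler. Roughly:

func (sc *ShardCtrler) leaveBalance(config Config, gids []int) Config {
	newConfig := config.Copy()
	newConfig.Num = config.Num + 1
	for _, gid := range gids {
		delete(newConfig.Groups, gid)
	}
	// Shards owned by the removed groups get reassigned by the same rebalance routine.
	newConfig.reAllocateShards()
	return newConfig
}

func (sc *ShardCtrler) moveBalance(config Config, shard int, gid int) Config {
	newConfig := config.Copy()
	newConfig.Num = config.Num + 1
	// Move is an explicit assignment; no rebalancing afterwards.
	newConfig.Shards[shard] = gid
	return newConfig
}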

The rebalance algorithm must, after Raft groups are added or removed, distribute the shards more evenly while causing as few migrations as possible. Evenly means every group takes at least NShards / GroupNum shards; if that does not divide evenly, a few groups are picked to each take 1 extra shard. On top of that, change each group's shard set as little as possible, since any shard that changes owner has to be migrated between the new owner and the old one.

I think my approach is reasonably clear. First deep-copy the old Config and update Num and Groups; note that at this point newConfig.Shards is still identical to the old Config's. Then reassign the shards (the reAllocateShards function):

  1. Walk c.Shards and mark every shard owned by a removed group as INVALID_GID.
  2. Build a gid2shards index from c.Groups, excluding INVALID_GID. If len(gid2shards) == 0 the new config has no groups left, so set all of c.Shards to INVALID_GID and return.
  3. Compute num_shards_per_group := NShards / num_groups and remaining_shards := NShards % num_groups. The case num_groups > NShards also needs handling: some groups each get exactly 1 shard and the rest simply stay idle. Create a shardBuffer to hold shards awaiting assignment; they will go to groups that currently hold fewer than num_shards_per_group shards.
  4. Find groups with more shards than numMaxShards. If remainCount is still greater than 0, the remainder has not been fully spread yet, so numMaxShards = num_shards_per_group + 1; that group absorbs 1 of the NShards % GroupNum leftover shards and therefore migrates 1 shard fewer. This greedy choice is what keeps the number of migrations low. Push the surplus shards into shardBuffer.
  5. Find groups with fewer shards than num_shards_per_group and top them up from shardBuffer until they reach num_shards_per_group.

func (sc *ShardCtrler) joinBalance(config Config, newServers map[int][]string) Config {

	// Create a new config based on the latest config
	newConfig := config.Copy()
	newConfig.Num = config.Num + 1

	num_unseen := 0
	// Add new servers to the new config
	for gid, servers := range newServers {
		if _, exists := config.Groups[gid]; !exists {
			newConfig.Groups[gid] = servers
			num_unseen++
			sc.LogPrintf(DDebug, "ShardCtrler.joinBalance: New group %d with servers %v", gid, servers)
		} else {
			newConfig.Groups[gid] = append(newConfig.Groups[gid], servers...)
		}
	}
	if num_unseen > 0 {
		newConfig.reAllocateShards()
	}

	sc.LogPrintf(DDebug, "ShardCtrler.joinBalance: newConfig=%+v", newConfig)

	return newConfig
}

func (c *Config) reAllocateShards() {
	for i := 0; i < len(c.Shards); i++ {
		if c.Shards[i] == INVALID_GID || c.Groups[c.Shards[i]] == nil {
			c.Shards[i] = INVALID_GID // Reset invalid shards
		}
	}
	gid2shards := c.buildGid2Shards()
	num_groups := len(gid2shards)
	if num_groups == 0 {
		c.Shards = [NShards]int{}
		return
	}
	num_shards_per_group := int(NShards / num_groups)
	remaining_shards := NShards % num_groups
	if num_groups > NShards {
		num_shards_per_group = 1
		remaining_shards = 0
	}
	// Balance shards among groups
	gids := make([]int, 0)
	for gid := range gid2shards {
		gids = append(gids, gid)
	}
	sort.Slice(gids, func(i, j int) bool {
		return gids[i] > gids[j]
	})
	shardBuffer := make([]int, 0, NShards)
	for shard, gid := range c.Shards {
		if gid == INVALID_GID {
			shardBuffer = append(shardBuffer, shard)
		}
	}
	remainCount := remaining_shards
	for _, gid := range gids {
		numMaxShards := num_shards_per_group
		if remainCount > 0 {
			numMaxShards += 1
			remainCount -= 1
		}
		if len(gid2shards[gid]) > numMaxShards {
			shardBuffer = append(shardBuffer, gid2shards[gid][numMaxShards:]...)
			gid2shards[gid] = gid2shards[gid][:numMaxShards]
		}
	}
	for _, gid := range gids {
		numMaxShards := num_shards_per_group
		if len(gid2shards[gid]) < numMaxShards && len(shardBuffer) > 0 {
			idx := numMaxShards - len(gid2shards[gid])
			gid2shards[gid] = append(gid2shards[gid], shardBuffer[:idx]...)
			shardBuffer = shardBuffer[idx:]
		}
	}
	for _, gid := range gids {
		for _, shard := range gid2shards[gid] {
			c.Shards[shard] = gid
		}
	}

}
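
Two Config helpers used above are not shown: Copy and buildGid2Shards. A sketch — the key points are that Copy must clone the Groups map, since Go maps are reference types, and that buildGid2Shards must include every group in Groups (even ones that currently own no shard) so that newly joined groups can receive shards:

// Copy returns a deep copy of the config. Shards is an array and copies by value;
// Groups is a map and must be cloned explicitly.
func (c Config) Copy() Config {
	newConfig := Config{
		Num:    c.Num,
		Shards: c.Shards,
		Groups: make(map[int][]string, len(c.Groups)),
	}
	for gid, servers := range c.Groups {
		newConfig.Groups[gid] = append([]string(nil), servers...)
	}
	return newConfig
}

// buildGid2Shards indexes the current assignment: gid -> shards it owns.
func (c *Config) buildGid2Shards() map[int][]int {
	gid2shards := make(map[int][]int)
	for gid := range c.Groups {
		gid2shards[gid] = make([]int, 0)
	}
	for shard, gid := range c.Shards {
		if gid != INVALID_GID {
			gid2shards[gid] = append(gid2shards[gid], shard)
		}
	}
	return gid2shards
}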

Lab 5B

Task Goals

When the ShardCtrler changes the shard assignment, move shards between Raft groups while still providing linearizable K/V operations to clients.

Data Structures

Clerk

Same old story: it is identical to Lab 4 and Lab 5A, so copy it over.

type Clerk struct {
	sm       *shardctrler.Clerk
	config   shardctrler.Config
	make_end func(string) *labrpc.ClientEnd
	// You will have to modify this struct.
	rwMutex            sync.RWMutex
	instructionCounter int32
	clientId           int
}

ShardKV

Compared with Lab 4, shardState and shardLatestCmdMap each gain an extra level of shard-ID mapping, since every group is only responsible for its own shards. waitShards records which shards still need to be fetched from which groups.

type ShardKV struct {
	rwMutex      sync.RWMutex
	me           int
	rf           *raft.Raft
	applyCh      chan raft.ApplyMsg
	make_end     func(string) *labrpc.ClientEnd
	gid          int
	ctrlers      []*labrpc.ClientEnd
	maxraftstate int // snapshot if log grows this big

	// Your definitions here.
	dead        int32 // set by Kill()
	lastApplied int
	// kvState     map[string]string

	shardLatestCmdMap map[int]map[int]LastCmdInfo // shard-> clientId -> latest command id
	reqChanMap        map[int]chan OpReply
	mck               *shardctrler.Clerk // shard controller clerk
	config            shardctrler.Config // current config
	lastConfig        shardctrler.Config
	shardState        map[int]map[string]string // shard id -> key-value state
	shards            map[int]bool              // shards that this server is responsible for
	waitShards        map[int]map[int]bool      //gid -> shard ids that this server is waiting to receive
}

Client Requests

The only difference is that each operation maps its key to a shard and then looks up the responsible group in the Config before sending the request. If the wrong group is contacted, pull the latest configuration from the ShardCtrler and retry. Each request can carry a little redundant information to ease debugging and to detect stale requests.

type GetArgs struct {
	Gid       int
	ConfigNum int
	Key       string
	// You'll have to add definitions here.
	CommandId CommandId
}

func (ck *Clerk) Get(key string) string {
	args := GetArgs{}
	args.Key = key
	args.CommandId = ck.genClientCommandId()

	for {
		shard := key2shard(key)
		gid := ck.config.Shards[shard]
		args.Gid = gid
		args.ConfigNum = ck.config.Num
		if servers, ok := ck.config.Groups[gid]; ok {
			// try each server for the shard.
		requestLoop:
			for si := 0; si < len(servers); si++ {
				srv := ck.make_end(servers[si])
				var reply GetReply
				if srv.Call("ShardKV.Get", &args, &reply) {
					switch reply.Err {
					case OK, ErrNoKey:
						ck.logPrintf(DInfo, "[Get] OK: shard : %d, to gid: %d  %+v, reply: %+v", shard, gid, args, reply)
						return reply.Value
					case ErrWrongGroup:
						ck.logPrintf(DInfo, "[Get] ErrWrongGroup: %+v", args)
						break requestLoop
					case ErrNotReadyToServe:
						ck.logPrintf(DInfo, "[Get] ErrNotReadyToServe: %+v", args)
						break requestLoop
					case ErrWrongLeader, ErrTimeout:
						continue
					}
				}
				// ... not ok, or ErrWrongLeader
			}
		}
		time.Sleep(100 * time.Millisecond)
		// ask controller for the latest configuration.
		ck.config = ck.sm.Query(-1)
	}

	return ""
}
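
genClientCommandId just hands out a per-client, monotonically increasing id; a sketch, assuming CommandId is a {ClientId, SequenceNum} pair (the field names here are illustrative):

func (ck *Clerk) genClientCommandId() CommandId {
	// Atomically bump the per-clerk counter so concurrent calls never reuse an id.
	seq := atomic.AddInt32(&ck.instructionCounter, 1)
	return CommandId{
		ClientId:    ck.clientId,
		SequenceNum: int(seq),
	}
}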

Pulling Configs

PullConfigLoop

Run a dedicated goroutine that keeps asking for the configuration that follows the current one, because configs must be applied in order; per the lab hints, config updates are also committed through the Raft log. A config update changes shard ownership, and going through the log keeps that consistent. If the shards of the current config have not finished migrating, do not fetch the next config yet; this, too, preserves the in-order application of configs.

func (kv *ShardKV) isReadyToConfig() bool {
	return len(kv.waitShards) == 0
}
func (kv *ShardKV) PullConfigLoop() {
	// ask controller for the latest configuration.

	for !kv.killed() {
		if _, isLeader := kv.rf.GetState(); isLeader {
			kv.rwMutex.RLock()
			currentVersion := kv.config.Num
			kv.rwMutex.RUnlock()
			latestConfig := kv.mck.Query(currentVersion + 1)
			kv.rwMutex.RLock()
			if latestConfig.Num == kv.config.Num+1 && kv.isReadyToConfig() {
				kv.rwMutex.RUnlock()
				// kv.LogPrintf(DInfo, "PullConfig: New config received: %+v", latestConfig)
				kv.rf.Start(latestConfig.Copy())
			} else {
				kv.rwMutex.RUnlock()
			}
		}
		time.Sleep(80 * time.Millisecond)
	}
}

ReceiveRaftApply

Following the usual pattern for applications built on Raft, upper-layer state is only updated in ReceiveRaftApply once a Raft log entry has been applied. The shard-pull results described later are also applied here.

func (kv *ShardKV) ReceiveRaftApply() {
	for !kv.killed() {
		select {
		case apply := <-kv.applyCh:
    .....
			if apply.CommandValid {
				if op, ok := apply.Command.(Op); ok {
					kv.applyOp(apply, op)
				} else if config, ok := apply.Command.(shardctrler.Config); ok {
					kv.applyConfig(apply, config)
				} else if shardReply, ok := apply.Command.(PullShardReply); ok {
					kv.applyShard(apply, shardReply)
				} else {
					panic("Panic! ReceiveRaftApply: unknown command type")
				}
			}
		}
    ....
	}
}
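
For the type assertions above to work, every command type handed to rf.Start — Op, shardctrler.Config, and PullShardReply — must be registered with labgob so that Raft can persist and replicate those entries (typically in StartServer, next to the skeleton's labgob.Register(Op{})):

labgob.Register(Op{})
labgob.Register(shardctrler.Config{})
labgob.Register(PullShardReply{})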

applyConfig

When the log entry is applied, swap in the new config and lastConfig. Diffing the two configs tells you which shards need to be deleted and which need to be pulled. If you skip the challenge exercises, Lab 5 does not actually require deleting shards; keeping them simplifies the lab and avoids some problems.

func (kv *ShardKV) applyConfig(apply raft.ApplyMsg, config shardctrler.Config) {
	kv.rwMutex.Lock()
	defer kv.rwMutex.Unlock()

	if config.Num != kv.config.Num+1 {
		kv.LogPrintf(DInfo, "[applyConfig] Ignore non-sequential config: %+v, kv.config: %+v", config, kv.config)
		return
	}

	kv.LogPrintf(DInfo, "[applyConfig] Apply new config: %+v", config)
	kv.lastConfig = kv.config
	kv.config = config.Copy()

	// Handle shard transfer logic here if needed.
	deleteShards := make([]int, 0, shardctrler.NShards)
	unseenShards := make([]int, 0, shardctrler.NShards)
	newShards := make([]int, 0, shardctrler.NShards)

	for shardId, gid := range kv.config.Shards {
		if gid == kv.gid {
			// This shard is assigned to this server.
			if gid != kv.lastConfig.Shards[shardId] {
				unseenShards = append(unseenShards, shardId)
			}
			newShards = append(newShards, shardId)
		} else {
			// This shard is not assigned to this server.
			if gid == kv.lastConfig.Shards[shardId] {
				deleteShards = append(deleteShards, shardId)
			}
		}
	}

	kv.shards = make(map[int]bool)
	for _, shardId := range newShards {
		kv.shards[shardId] = true
	}
	kv.waitShards = make(map[int]map[int]bool)
	if kv.lastConfig.Num != 0 {
		for _, shardId := range unseenShards {
			if _, ok := kv.waitShards[kv.lastConfig.Shards[shardId]]; !ok {
				kv.waitShards[kv.lastConfig.Shards[shardId]] = make(map[int]bool)
			}
			kv.waitShards[kv.lastConfig.Shards[shardId]][shardId] = true
		}
		kv.LogPrintf(DInfo, "[applyConfig] Waiting for shards: %+v", kv.waitShards)
	}
	// Check whether a snapshot is needed
	if kv.maxraftstate != -1 && kv.rf.StateSize() >= kv.maxraftstate {
		kv.makeSnapshot(apply.CommandIndex)
	}
}
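
makeSnapshot serializes everything that applyOp, applyConfig, and applyShard mutate; a minimal sketch, assuming the Lab 3D Snapshot(index, snapshot) API on Raft. The matching restore runs on the snapshot branch of ReceiveRaftApply and decodes the same fields in the same order.

func (kv *ShardKV) makeSnapshot(index int) {
	// The caller already holds rwMutex.
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(kv.shardState)
	e.Encode(kv.shardLatestCmdMap)
	e.Encode(kv.config)
	e.Encode(kv.lastConfig)
	e.Encode(kv.shards)
	e.Encode(kv.waitShards)
	kv.rf.Snapshot(index, w.Bytes())
}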

Shard Migration

Service Availability

After pulling a new config, if a group finds it is no longer responsible for a shard, it should stop serving that shard immediately; if it finds it has become responsible for a shard, it may only start serving once that shard has finished migrating from its previous owner.

One pitfall: it is best to run both the isReadyToServeShard and checkDuplicateCommandId checks before executing a request (in the RPC handler) and again before applying the log entry (in applyOp). Otherwise the concurrency and linearizability tests may fail, because the group might still own the shard at rf.Start time but no longer own it when the entry is applied; with a shard migration happening at the same time, the new owner may never see the result of that operation.

func (kv *ShardKV) isReadyToServeShard(shardId int) Err {

	gid := kv.lastConfig.Shards[shardId]
	if kv.config.Shards[shardId] != kv.gid {
		return ErrWrongGroup
	}
	if shards, ok := kv.waitShards[gid]; ok {
		if _, ok := shards[shardId]; ok {
			return ErrNotReadyToServe
		}
	}
	return OK
}

func (kv *ShardKV) PutAppend(args *PutAppendArgs, reply *PutAppendReply) {
	shardId := key2shard(args.Key)
	kv.rwMutex.Lock()
	if kv.config.Num < args.ConfigNum {
		reply.Err = ErrWrongConfig
		kv.rwMutex.Unlock()
		return
	}
	if err := kv.isReadyToServeShard(shardId); err != OK {
		reply.Err = err
		kv.rwMutex.Unlock()
		return
	}
	if kv.checkDuplicateCommandId(shardId, args.CommandId.ClientId, args.CommandId) {
		kv.LogPrintf(DDebug, "Duplicate PUT command id: %+v", args.CommandId)
		opReply := kv.shardLatestCmdMap[shardId][args.CommandId.ClientId].Reply
		reply.Err = opReply.Err
		kv.rwMutex.Unlock()
		return
	}
	kv.rwMutex.Unlock()
  .....
}

func (kv *ShardKV) applyOp(apply raft.ApplyMsg, op Op) {
	kv.rwMutex.Lock()
	var opReply OpReply
	shardId := key2shard(op.Key)
	if err := kv.isReadyToServeShard(shardId); err == OK {
		if op.OpType != OP_GET && kv.checkDuplicateCommandId(shardId, op.CommandId.ClientId, op.CommandId) {
      ....
    }
  }
}

PullShardLoop

Run another goroutine that checks whether any shards need to be pulled and, if so, sends PullShard RPCs. This is group-to-group communication.

func (kv *ShardKV) PullShardLoop() {
	// Periodically check waitShards and pull any missing shards from their previous owners.
	for !kv.killed() {
		if _, isLeader := kv.rf.GetState(); isLeader {
			kv.rwMutex.Lock()
			for gid, shardIds := range kv.waitShards {
				args := &PullShardArgs{
					ConfigNum: kv.config.Num,
				}
				args.Gid = gid
				shardIdsSlice := make([]int, 0, len(shardIds))
				for shardId := range shardIds {
					shardIdsSlice = append(shardIdsSlice, shardId)
				}
				args.ShardIds = shardIdsSlice
				go kv.askForShard(gid, args)
			}
			kv.rwMutex.Unlock()
		}
		time.Sleep(80 * time.Millisecond)
	}
}

PullShard

To keep deduplication working across the move, the LatestCmdMap must be migrated along with the shard data. The RPC handler has the same caveats as in Lab 3, e.g. checking whether this server is still the leader and comparing its own config number against the one in the request. When transferring reference-typed data, make a deep copy; otherwise different places end up holding the same underlying reference and it gets modified unexpectedly.

type PullShardArgs struct {
	Gid       int
	ShardIds  []int
	ConfigNum int
}

type PullShardReply struct {
	Gid          int
	ConfigNum    int
	ShardIds     []int
	ShardState   map[int]map[string]string   // shardId -> key -> value
	LatestCmdMap map[int]map[int]LastCmdInfo // shardId -> clientId -> LastCmdInfo
	Err          Err
}
func (kv *ShardKV) PullShard(args *PullShardArgs, reply *PullShardReply) {
	kv.rwMutex.Lock()
	defer kv.rwMutex.Unlock()
	reply.Gid = kv.gid
	reply.ConfigNum = kv.config.Num
	reply.ShardIds = args.ShardIds

	_, isLeader := kv.rf.GetState()
	if !isLeader {
		reply.Err = ErrWrongLeader
		return
	}

	if args.ConfigNum > kv.config.Num {
		reply.Err = ErrWrongConfig
		return
	}

	reply.ShardState = make(map[int]map[string]string)
	reply.LatestCmdMap = make(map[int]map[int]LastCmdInfo)
	for _, shardId := range args.ShardIds {
		if _, ok := kv.shardState[shardId]; !ok {
			reply.Err = ErrWrongGroup
			return
		}
		if _, ok := kv.shardLatestCmdMap[shardId]; !ok {
			reply.Err = ErrWrongGroup
			return
		}
		reply.ShardState[shardId] = deepCopyStringMapString(kv.shardState[shardId])
		reply.LatestCmdMap[shardId] = deepCopyStringMapCmdInfo(kv.shardLatestCmdMap[shardId])
	}
	reply.Err = OK
}
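
The deep-copy helpers are trivial but matter: handing out the live maps would let the RPC layer read them while this group keeps mutating them. A sketch (assuming LastCmdInfo holds no reference-typed fields, so copying by value is enough):

func deepCopyStringMapString(src map[string]string) map[string]string {
	dst := make(map[string]string, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

func deepCopyStringMapCmdInfo(src map[int]LastCmdInfo) map[int]LastCmdInfo {
	dst := make(map[int]LastCmdInfo, len(src))
	for k, v := range src {
		dst[k] = v
	}
	return dst
}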

applyShard

To preserve linearizability, shard migration should also go through the Raft log: after the ShardKV.PullShard RPC returns, submit the reply as a Raft log entry and only modify the shard KV state once that entry is applied.

func (kv *ShardKV) askForShard(gid int, args *PullShardArgs) {
	reply := &PullShardReply{}
	if servers, ok := kv.lastConfig.Groups[gid]; ok {
		for si := 0; si < len(servers); si++ {
			srv := kv.make_end(servers[si])
			if srv.Call("ShardKV.PullShard", args, reply) {
				switch reply.Err {
				case OK:
					if args.ConfigNum != kv.config.Num {
						kv.LogPrintf(DInfo, "[askForShard] ConfigNum mismatch: args:%+v reply:%+v", args, reply)
						return
					}
					if reply.ConfigNum < kv.config.Num {
						kv.LogPrintf(DInfo, "[askForShard] ConfigNum too old: args:%+v reply:%+v", args, reply)
						return
					}
					_, _, isLeader := kv.rf.Start(*reply)
					if !isLeader {
						return
					}
					kv.LogPrintf(DInfo, "[askForShard] OK: args:%+v reply:%+v", args, reply)
					return
				case ErrWrongGroup:
					kv.LogPrintf(DInfo, "[askForShard] ErrWrongGroup: %+v", args)
					return
          ....
				}
			}
			// ... not ok, or ErrWrongLeader
		}
	}
}

func (kv *ShardKV) applyShard(apply raft.ApplyMsg, shardReply PullShardReply) {
	kv.rwMutex.Lock()
	defer kv.rwMutex.Unlock()

	if shardReply.ConfigNum < kv.config.Num {
		kv.LogPrintf(DInfo, "[applyShard] Ignore outdated shard reply: %+v, current config: %+v", shardReply, kv.config)
		return
	}

	for _, shardId := range shardReply.ShardIds {
		if _, ok := kv.shardState[shardId]; !ok {
			continue
		}
		if _, ok := kv.shardLatestCmdMap[shardId]; !ok {
			continue
		}
		kv.shardState[shardId] = deepCopyStringMapString(shardReply.ShardState[shardId])
		kv.shardLatestCmdMap[shardId] = deepCopyStringMapCmdInfo(shardReply.LatestCmdMap[shardId])
	}
	kv.LogPrintf(DInfo, "[applyShard] Apply shard reply: %+v", shardReply)

	for _, shardId := range shardReply.ShardIds {
		delete(kv.waitShards[shardReply.Gid], shardId)
		if len(kv.waitShards[shardReply.Gid]) == 0 {
			delete(kv.waitShards, shardReply.Gid)
		}
	}

	// Check whether a snapshot is needed
	if kv.maxraftstate != -1 && kv.rf.StateSize() >= kv.maxraftstate {
		kv.makeSnapshot(apply.CommandIndex)
	}
}

Pitfalls

Looking back over all 5 labs of 6.5840, my biggest takeaway is how unfamiliar I was with writing concurrent programs: even after slathering locks everywhere I still ran into unexpected read/write problems with shared data, mostly because I took too much for granted while coding. Apart from Lab 3, where the Raft algorithm is easy to misunderstand and you need to reread the paper and other people's blogs, the other labs are logically fairly simple. The hardest parts are understanding what each line of your code might do to other parts of the system, and combing through thousands of lines of logs to work out what communication errors occurred between distributed nodes and where their data went astray.

A good event-loop code skeleton also matters a lot. The client and server frameworks of Lab 4 and Lab 5 are nearly identical; with a well-organized skeleton from the start, adding each new piece of logic meets very little resistance.

Here are a few of the data read/write pitfalls I hit.

Parallel Requests

The labs involve a lot of logic that sends a round of requests to multiple servers in parallel. All the requests of a round should be constructed before the goroutines are launched, not inside them. If a goroutine reads object state to build its arguments, fields such as Term may already have changed by then, so requests in the same round end up carrying different Terms. This is also why many people write it with the go func(…){}(…) closure pattern.

Incorrect:

for server := range rf.peers {
	go rf.sendAppendEntry(server)
}

func (rf *Raft) sendAppendEntry(server int) {
	args := ..... // built inside the goroutine: rf's state (e.g. Term) may have changed by now
}

Correct:

for server := range rf.peers {
	args := &AppendEntriesArgs{
		Term:         term,
		LeaderId:     rf.me,
		LeaderCommit: rf.commitIndex,
		PrevLogIndex: prevIdx,
		PrevLogTerm:  prevTerm,
		Entries:      append([]LogEntry(nil), rf.state.tail(prevIdx+1)...),
	}
	go rf.sendAppendEntry(server, args)
}

Closure Captures

Also about building request arguments: I once naively hoisted the args construction to the outermost scope, feeling clever about saving memory and improving performance. The result was that the service sent two identical requests, because they referenced the same args object, and the first request's arguments had already been overwritten by the for loop before it was sent.

Incorrect:

args := &PullShardArgs{
	ConfigNum: kv.config.Num,
}
for gid, shardIds := range kv.waitShards {
	args.Gid = gid
	go kv.askForShard(gid, args)
}

Correct:

for gid, shardIds := range kv.waitShards {
	args := &PullShardArgs{
		ConfigNum: kv.config.Num,
	}
	args.Gid = gid
	go kv.askForShard(gid, args)
}

