MIT-6.5840 Lab3笔记
前置知识
任务目标
Lab 3
要求我们实现Raft算法并实现持久化和日记压缩
用于测试的文件在src/raft/test_test.go
,运行命令go test
或go test -race
进行测试。命令go test -v -run ${func_name}
对指定测试样例进行测试,也可以在VSCode中进入文件点击Test按钮
文档资料
以下列出完成Lab 3所需要阅读的资料:
- Lab 3 2024文档 介绍了Lab3的任务和一些规范,同时给出了一些实现提示
- MIT 6.824 6.824的官方视频课程,完成Lab 3需要看第六、七节
- Raft官方网站 介绍了Raft,页面上还有一个Raft的可视化程序,便于理解
- Raft论文 Raft论文,要多看几遍,尤其是论文里的几张图,最好严格实现不要自己DIY
- Raft可视化 另一个Raft可视化程序,同时有Raft算法的分模块简单讲解,有利于Lab3A和Lab3B
- Go语言教程 对于有一定编程基础,要快速入门书写Go程序的。这本国内的教程足够,既涵盖了基本语法,也提供了一些常用标准库的API供随时查看
- src/raft/test_test.go 测试过不去的时候可以阅读测试样例源码,看测试具体做了什么检测什么
课程程序规范
Lab3主要关注如下文件
- src/raft/raft.go Raft算法文件,需要在这里完成各操作的具体实现
- src/raft/persister.go 持久化工具,一般不需要动。在Lab3C和Lab3D才用得到
- src/raft/config.go 一些测试用到的函数,把它当作测试文件的源码之一。阅读方便理解测试
- src/raft/test_test.go 测试文件,一般不需要动。如果某个测试样例过不去可以看该样例的函数,了解该样例大概需要完成什么任务,有助于Debug
Lab3A
任务目标
Lab 3A
要求我们实现Raft算法的选举和Leader的心跳
数据结构
按照论文里的Figure来即可,因为每张图都贯穿整个Lab,因此不在这里重复放了,看论文即可。
Raft
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
rwMutex sync.RWMutex
peers []*labrpc.ClientEnd // RPC end points of all peers
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int
logs []LogEntry
// 0 follower 1 candidate 2 leader
state RaftState
electionTimer time.Time
voteGotten int
}
逻辑梳理
尽管一切实现细节都在图里,但因为各个图是按模块展示的,具体实现需要把这些图综合在一起,很容易对一些细节遗漏或者理不清细节的顺序。下面列举了一些Lab3A的逻辑梳理
- Follower有两种超时类型。1. 超过ElectionTimeout没收到Leader的心跳。2. 超过ElectionTimeout没收到Candidate的请求投票。同时意味着Follower接收到这两类信号后应重制自己的ElectionTimer。Follower超时后转为Candidate。由于英语水平太差误解了论文上这段话,以为Follower接收到Candidate的请求投票后就会变成Candidate
- 当currentTerm>arg.Term时,直接投票。因为Follower并没有在这轮Term中投过票,不需要考虑之前的voteFor值。当currentTerm==arg.Term时,需要检查voteFor
- Server在某个状态发送了RPC请求后,在接受RPC恢复时需要检查自己的状态是否一致,若有变化则要丢弃回复不做处理。
选举投票
RequestVoteArgs
type RequestVoteArgs struct {
// Your data here (3A, 3B).
Term int
CandidateId int
}
RequestVoteReply
type RequestVoteReply struct {
// Your data here (3A).
Term int
VoteGranted bool
}
askForVote
func (rf *Raft) askForVote(server int) {
var args RequestVoteArgs
var reply RequestVoteReply
args.Term = rf.currentTerm
args.CandidateId = rf.me
if rf.sendRequestVote(server, &args, &reply) {
// 如果当前不是 CANDIDATE 状态,说明已经发生了角色变化,退出
if rf.state != STATE_CANDIDATE || args.Term != rf.currentTerm {
return
}
// 如果收到的任期比当前大,变为 FOLLOWER 并更新任期
if reply.Term > rf.currentTerm {
// your code
return
}
// 如果得到了投票
if reply.VoteGranted {
// your code
// 如果获得了大多数投票,成为 Leader
if rf.voteGotten > len(rf.peers)/2 {
// your code
}
}
}
}
RequestVote
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
// 如果请求者的任期小于当前任期,拒绝投票
// 检查是否已经投过票给其他人
if args.Term < rf.currentTerm ||
(args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) {
// your code
return
}
// 如果请求者的任期大于当前任期,更新当前任期并转换为 FOLLOWER
if args.Term > rf.currentTerm {
// your code
}
// 如果满足投票条件,投票给请求者
// 重置选举超时
// your code
reply.VoteGranted = true
reply.Term = rf.currentTerm
}
心跳包
AppendEntriesArgs
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
// empty for heartbeat
Entries []LogEntry
LeaderCommit int
}
AppendEntriesReply
type AppendEntriesReply struct {
Term int
Success bool
}
sendAppendEntry
func (rf *Raft) sendAppendEntry(server int) {
var args AppendEntriesArgs
var reply AppendEntriesReply
args.Term = rf.currentTerm
args.LeaderId = rf.me
args.Entries = make([]LogEntry, 0)
rf.sendAppendEntries(server, &args, &reply)
if rf.state != STATE_LEADER {
return
}
if !reply.Success {
if reply.Term > rf.currentTerm {
// your code
}
}
}
AppendEntries
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
if rf.currentTerm > args.Term {
// your code
} else if rf.currentTerm < args.Term {
// your code
return
} else if rf.currentTerm == args.Term {
// your code
// heartbeat
if len(args.Entries) <= 0 {
// your code
return
}
}
}
Lab3B
任务目标
Lab 3B
要求我们实现Raft算法的日志同步,实现一致性算法
数据结构
Raft
相比Lab3A,Lab3B需要用到的属性多了一些,但还是源自论文里的图。
注意我的实现里,日志数组的编号是从0开始的,也就是和数组的索引一致。
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
rwMutex sync.RWMutex
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int
logs []LogEntry
commitIndex int
lastApplied int
// for leader only
nextIndex []int
// for leader only
matchIndex []int
// 0 follower 1 candidate 2 leader
state RaftState
electionTimer time.Time
voteGotten int
applyCh chan ApplyMsg
}
逻辑梳理
- MatchIndex和NextIndex其实没有太大关系。NextIndex表示Leader下一次要给某Follower发送的日志编号,注意是下一次发送,也就是当前还未发送。MatchIndex表示某Follower和Leader当前达成共识的最后一个日志编号。MatchIndex[i]=3表示第i个Follower的第[0, 3]个日志达成了共识
- 正常情况下。Leader接受一个日志后,将它复制给其他Follower,如果大多数Follower都成功复制了该日志,则Leader提交(Commit)该日志,然后应用(Apply)该日志。后续Follower根据心跳包中的LeaderCommitId字段来判断自己是否要提交并应用一些日志。
注意Leader和Follower提交同一个日志的顺序 - 本实验中提交(Commit)和应用(Apply)的时机没有太多区别,Commit一个日志后立即Apply该日志即可
选举投票
RequestVoteArgs
Lab3B需要在RequestVoteArgs中标识Candidate最后一个日志的Index和Term
type RequestVoteArgs struct {
// Your data here (3A, 3B).
Term int
CandidateId int
LastLogIndex int
LastLogTerm int
}
RequestVote
RequestVote需要额外检查一下自己的日志是否比当前服务器更新,如果是的话,则拒绝投票(不需要重置计时器)
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
rf.rwMutex.Lock()
defer rf.rwMutex.Unlock()
// 如果请求者的任期小于当前任期,拒绝投票
// 检查是否已经投过票给其他人,或者日志是否比当前服务器更新
if args.Term < rf.currentTerm ||
(args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) ||
!rf.checkLastLogUpToDate(args.LastLogTerm, args.LastLogIndex) {
if args.Term > rf.currentTerm {
// your code
}
reply.Term = rf.currentTerm
reply.VoteGranted = false
return
}
// 如果请求者的任期大于当前任期,更新当前任期并转换为 FOLLOWER
if args.Term > rf.currentTerm {
// 重置投票记录
// your code
}
// 如果满足投票条件,投票给请求者
// 重置选举超时
// your code
reply.VoteGranted = true
reply.Term = rf.currentTerm
}
checkLastLogUpToDate
// 检查候选者的日志是否比当前服务器更新
// 返回 true 表示候选者的日志是最新的
func (rf *Raft) checkLastLogUpToDate(lastLogTerm int, lastLogIndex int) bool {
// 当前服务器的最后一个日志条目
lastTerm := -1
if len(rf.logs) > 0 {
lastTerm = rf.logs[len(rf.logs)-1].Term
}
// 比较日志条目的任期,如果任期不同,则任期大的日志更新
if lastLogTerm != lastTerm {
return lastLogTerm > lastTerm
}
// 如果任期相同,则比较日志索引
return lastLogIndex >= len(rf.logs)-1
}
日志同步
AppendEntriesArgs
type AppendEntriesArgs struct {
Term int
LeaderId int
PrevLogIndex int
PrevLogTerm int
// empty for heartbeat
Entries []LogEntry
LeaderCommit int
}
AppendEntriesReply
type AppendEntriesReply struct {
Term int
Success bool
ConflictIndex int
ConflictTerm int
}
sendAppendEntry
sendAppendEntry额外增加了很多逻辑,难点在于如何更新NextIndex和MatchIndex。
如果Follower返回失败,最基本的做法是令NextIndex[i]–。我这里用到了一个加速的技术,在视频课里提到了,当Follower发现不匹配后,需要在Reply加入冲突的日志Index和Term。Leader根据
- 自己含有XTerm的日志
- 自己不含有XTerm的日志
- Follower日志过短(无法查询第PrevLogIndex个日志)
三种情况加速更新NextIndex
func (rf *Raft) sendAppendEntry(server int) {
var args AppendEntriesArgs
var reply AppendEntriesReply
args.Term = rf.currentTerm
args.LeaderId = rf.me
args.LeaderCommit = rf.commitIndex
// 5.3 start from -1
var prevLogIdx = rf.nextIndex[server] - 1
args.PrevLogIndex = prevLogIdx
if len(rf.logs)-1 >= prevLogIdx+1 {
args.Entries = make([]LogEntry, len(rf.logs[(prevLogIdx+1):]))
copy(args.Entries, rf.logs[(prevLogIdx+1):])
} else {
args.Entries = make([]LogEntry, 0)
}
if prevLogIdx >= 0 {
args.PrevLogTerm = rf.logs[prevLogIdx].Term
} else {
args.PrevLogTerm = -1
}
// rf.LogPrintf("Leader NextIndex %v matchIndex %v", rf.nextIndex, rf.matchIndex)
if rf.sendAppendEntries(server, &args, &reply) {
if rf.state != STATE_LEADER || args.Term != rf.currentTerm {
return
}
if reply.Success {
rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
rf.nextIndex[server] = rf.matchIndex[server] + 1
for N := len(rf.logs) - 1; N > rf.commitIndex; N-- {
// 检查N,如果大多数都达成共识,则Commit
// your code
}
} else {
if reply.Term > rf.currentTerm {
// your code
} else {
// 5.3
nextIdx := rf.lastIdxByTerm(reply.ConflictTerm)
if nextIdx != -1 {
rf.nextIndex[server] = nextIdx + 1
} else {
rf.nextIndex[server] = max(reply.ConflictIndex, 0)
}
}
}
}
}
AppendEntries
AppendEntries额外的逻辑在于判断自己的日志和Leader的日志在PrevLogIndex号上是否有冲突。同时删除冲突的所有日志,并补充上所有缺少的日志。
检查冲突主要有三步:
- 查看自己是否有PrevLogIndex号日志,如果没有说明自己日志过短
- 查看自己的PrevLogIndex号日志和Leader的PrevLogIndex号日志Term是否一致
- 检查在PrevLogIndex号之后。自己的每个日志和Leader传来的entries里对应的日志 Term是否一致
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
if rf.currentTerm > args.Term {
// your code
return
}
rf.ResetElectionTimer()
rf.state = STATE_FOLLOWER
// 5.3 check same preLogIndex
var myLastLogIdx = len(rf.logs) - 1
if myLastLogIdx < args.PrevLogIndex {
// your code
reply.ConflictIndex = len(rf.logs)
reply.ConflictTerm = -1
return
}
rf.currentTerm = args.Term
rf.state = STATE_FOLLOWER
rf.ResetElectionTimer()
// 5.3 check log at same preLogIndex
if args.PrevLogIndex > 0 && rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
// your code
reply.ConflictTerm = rf.logs[args.PrevLogIndex].Term
reply.ConflictIndex = rf.firstIdxByTerm(reply.ConflictTerm)
return
}
var conflictIdx = -1
for i := 0; i < len(args.Entries); i++ {
var logIndex = i + args.PrevLogIndex + 1
// 检查冲突
// your code
}
if conflictIdx != -1 {
// 3. delete all logs conflict
rf.logs = rf.logs[:args.PrevLogIndex+1+conflictIdx]
// 4. append any log not in logs
rf.logs = append(rf.logs, args.Entries[conflictIdx:]...)
}
// 5
if args.LeaderCommit > rf.commitIndex && args.LeaderCommit >= 0 {
// commit日志
// your code
}
reply.Success = true
reply.Term = rf.currentTerm
}
Lab3C
任务目标
Lab 3C
要求我们实现Raft状态的持久化
数据结构
和Lab3B一致
持久化状态
根据论文里的图,我们需要持久化Raft结构的以下状态:
- currentTerm int
- votedFor int
- logs []LogEntry
Persist
照着给的Example代码写即可。ReadPersist同理
func (rf *Raft) persist() {
var buff = new(bytes.Buffer)
var enc = labgob.NewEncoder(buff)
rf.rwMutex.RLock()
enc.Encode(rf.currentTerm)
enc.Encode(rf.votedFor)
enc.Encode(rf.logs)
rf.rwMutex.RUnlock()
var raftState = buff.Bytes()
rf.persister.Save(raftState, nil)
}
逻辑梳理
最轻松的一集!
只需要在各个函数操作前加入
defer rf.persist()
Lab3D
任务目标
Lab 3D
要求我们实现Raft的状态快照和日志压缩
数据结构
Raft
添加了snapshot字段,用于保存快照。根据论文里的描述,SnapShot除了日志外还有lastIncludedIndex和lastIncludedTerm两个字段。这里将它们提出来作为Raft的字段
type Raft struct {
mu sync.Mutex // Lock to protect shared access to this peer's state
rwMutex sync.RWMutex
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// Your data here (3A, 3B, 3C).
// Look at the paper's Figure 2 for a description of what
// state a Raft server must maintain.
currentTerm int
votedFor int
logs []LogEntry
lastIncludedIndex int
lastIncludedTerm int
commitIndex int
lastApplied int
// for leader only
nextIndex []int
// for leader only
matchIndex []int
// 0 follower 1 candidate 2 leader
state RaftState
electionTimer time.Time
voteGotten int
applyCh chan ApplyMsg
snapshot []byte
}
持久化状态
因为SnapShot需要持久化,意味着lastIncludedIndex和lastIncludedTerm两个字段也要持久化。由于我将它们提出来了,要显式持久化一下
func (rf *Raft) persist() {
var buff = new(bytes.Buffer)
var enc = labgob.NewEncoder(buff)
enc.Encode(rf.currentTerm)
enc.Encode(rf.votedFor)
enc.Encode(rf.logs)
enc.Encode(rf.lastIncludedIndex)
enc.Encode(rf.lastIncludedTerm)
var raftState = buff.Bytes()
var snapshot = clone(rf.snapshot)
rf.persister.Save(raftState, snapshot)
}
InstallSnapshot
日志Index
最恶心的一集!
由于快照的存在,被快照存储的日志不需要再存储在log数组中,我们需要将它们删除。这意味着log在数组里的索引不再意味着他们在日志中的索引。我们需要对日志在日志列表和日志数组里的索引进行转换。
记日志在日志列表的索引为绝对索引;在日志数组里的索引为相对索引。同时需要将之前函数中各个日志索引访问进行绝对化或相对化,这里一定要细心。
func (rf *Raft) toAbsoluteIdx(idx int) int {
return idx + (rf.lastIncludedIndex + 1)
}
func (rf *Raft) toRelativeIdx(idx int) int {
return idx - (rf.lastIncludedIndex + 1)
}
InstallSnapshotArgs
type InstallSnapshotArgs struct {
Term int
LeaderId int
LastIncludedIndex int
LastIncludedTerm int
Data []byte
// not used
Offset int
Done bool
}
InstallSnapshotReply
type InstallSnapshotReply struct {
Term int
}
syncSnapshot
发送Snapshot的逻辑和AppenEntry很像,收到回复后更新Follower的MatchIndex和NextIndex即可
func (rf *Raft) syncSnapshot(server int) {
defer rf.persist()
var args InstallSnapshotArgs
var reply InstallSnapshotReply
rf.rwMutex.RLock()
args.Term = rf.currentTerm
args.LeaderId = rf.me
args.LastIncludedIndex = rf.lastIncludedIndex
args.LastIncludedTerm = rf.lastIncludedTerm
if rf.snapshot != nil {
args.Data = clone(rf.snapshot)
} else {
args.Data = nil
}
rf.rwMutex.RUnlock()
if rf.sendInstallSnapshot(server, &args, &reply) {
rf.rwMutex.Lock()
if rf.currentTerm < reply.Term {
rf.currentTerm = reply.Term
rf.state = STATE_FOLLOWER
rf.rwMutex.Unlock()
return
}
if rf.snapshot != nil {
rf.nextIndex[server] = args.LastIncludedIndex + 1
rf.matchIndex[server] = args.LastIncludedIndex
}
rf.rwMutex.Unlock()
}
}
InstallSnapshot
InstallSnapshot同样类似,检查自己的哪些日志需要被删掉,更新自己的日志信息。注意需要重置定时器
func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
defer rf.persist()
rf.rwMutex.Lock()
reply.Term = rf.currentTerm
if rf.currentTerm > args.Term {
rf.rwMutex.Unlock()
return
}
rf.ResetElectionTimer()
rf.state = STATE_FOLLOWER
rf.currentTerm = args.Term
if args.Data == nil {
rf.rwMutex.Unlock()
return
}
if (rf.snapshot == nil) ||
(rf.snapshot != nil && rf.lastIncludedIndex < args.LastIncludedIndex) {
var sliceIndex = rf.toRelativeIdx(args.LastIncludedIndex + 1)
if sliceIndex >= 0 && sliceIndex <= len(rf.logs)-1 {
rf.logs = rf.logs[sliceIndex:]
} else {
rf.logs = make([]LogEntry, 0)
}
rf.snapshot = args.Data
rf.lastIncludedIndex = args.LastIncludedIndex
rf.lastIncludedTerm = args.LastIncludedTerm
rf.commitIndex = args.LastIncludedIndex
rf.applyCh <- ApplyMsg{
SnapshotValid: true,
Snapshot: rf.snapshot,
SnapshotTerm: rf.lastIncludedTerm,
SnapshotIndex: rf.lastIncludedIndex + 1,
}
rf.lastApplied = rf.lastIncludedIndex
}
rf.rwMutex.Unlock()
}
InstallSnapshot时机
当Leader发现下一个要发送给Follower的日志已经不在自己的日志数组里,即已经被快照化了。此时需要向Follower发送快照
for i := 0; i < len(rf.peers); i++ {
if i == rf.me {
continue
}
rf.rwMutex.RLock()
if rf.snapshot != nil && (rf.nextIndex[i] <= rf.lastIncludedIndex) {
rf.rwMutex.RUnlock()
go rf.syncSnapshot(i)
} else {
rf.rwMutex.RUnlock()
go rf.sendAppendEntry(i)
}
}