OceanPresent

MIT-6.5840 Lab3笔记


前置知识

任务目标

Lab 3要求我们实现Raft算法并实现持久化和日记压缩

用于测试的文件在src/raft/test_test.go,运行命令go testgo test -race进行测试。命令go test -v -run ${func_name}对指定测试样例进行测试,也可以在VSCode中进入文件点击Test按钮

文档资料

以下列出完成Lab 3所需要阅读的资料:

  • Lab 3 2024文档 介绍了Lab3的任务和一些规范,同时给出了一些实现提示
  • MIT 6.824 6.824的官方视频课程,完成Lab 3需要看第六、七节
  • Raft官方网站 介绍了Raft,页面上还有一个Raft的可视化程序,便于理解
  • Raft论文 Raft论文,要多看几遍,尤其是论文里的几张图,最好严格实现不要自己DIY
  • Raft可视化 另一个Raft可视化程序,同时有Raft算法的分模块简单讲解,有利于Lab3A和Lab3B
  • Go语言教程 对于有一定编程基础,要快速入门书写Go程序的。这本国内的教程足够,既涵盖了基本语法,也提供了一些常用标准库的API供随时查看
  • src/raft/test_test.go 测试过不去的时候可以阅读测试样例源码,看测试具体做了什么检测什么

课程程序规范

Lab3主要关注如下文件

  • src/raft/raft.go Raft算法文件,需要在这里完成各操作的具体实现
  • src/raft/persister.go 持久化工具,一般不需要动。在Lab3C和Lab3D才用得到
  • src/raft/config.go 一些测试用到的函数,把它当作测试文件的源码之一。阅读方便理解测试
  • src/raft/test_test.go 测试文件,一般不需要动。如果某个测试样例过不去可以看该样例的函数,了解该样例大概需要完成什么任务,有助于Debug

Lab3A

任务目标

Lab 3A要求我们实现Raft算法的选举和Leader的心跳

数据结构

按照论文里的Figure来即可,因为每张图都贯穿整个Lab,因此不在这里重复放了,看论文即可。

Raft

type Raft struct {
	mu        sync.Mutex // Lock to protect shared access to this peer's state
	rwMutex   sync.RWMutex
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (3A, 3B, 3C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int
	logs        []LogEntry
	// 0 follower 1 candidate 2 leader
	state         RaftState
	electionTimer time.Time
	voteGotten    int
}

逻辑梳理

尽管一切实现细节都在图里,但因为各个图是按模块展示的,具体实现需要把这些图综合在一起,很容易对一些细节遗漏或者理不清细节的顺序。下面列举了一些Lab3A的逻辑梳理

  • Follower有两种超时类型。1. 超过ElectionTimeout没收到Leader的心跳。2. 超过ElectionTimeout没收到Candidate的请求投票。同时意味着Follower接收到这两类信号后应重制自己的ElectionTimer。Follower超时后转为Candidate。由于英语水平太差误解了论文上这段话,以为Follower接收到Candidate的请求投票后就会变成Candidate
  • 当currentTerm>arg.Term时,直接投票。因为Follower并没有在这轮Term中投过票,不需要考虑之前的voteFor值。当currentTerm==arg.Term时,需要检查voteFor
  • Server在某个状态发送了RPC请求后,在接受RPC恢复时需要检查自己的状态是否一致,若有变化则要丢弃回复不做处理。

选举投票

RequestVoteArgs

type RequestVoteArgs struct {
  // Your data here (3A, 3B).
  Term         int
  CandidateId  int
}

RequestVoteReply

type RequestVoteReply struct {
  // Your data here (3A).
  Term        int
  VoteGranted bool
}

askForVote

func (rf *Raft) askForVote(server int) {
  var args RequestVoteArgs
  var reply RequestVoteReply

  args.Term = rf.currentTerm
  args.CandidateId = rf.me
    
  if rf.sendRequestVote(server, &args, &reply) {
    // 如果当前不是 CANDIDATE 状态,说明已经发生了角色变化,退出
    if rf.state != STATE_CANDIDATE || args.Term != rf.currentTerm {
      return
    }

    // 如果收到的任期比当前大,变为 FOLLOWER 并更新任期
    if reply.Term > rf.currentTerm {
      // your code
      return
    }

    // 如果得到了投票
    if reply.VoteGranted {
      // your code
      // 如果获得了大多数投票,成为 Leader
      if rf.voteGotten > len(rf.peers)/2 {
        // your code
      }
    }
  }
}

RequestVote

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {

  // 如果请求者的任期小于当前任期,拒绝投票
  // 检查是否已经投过票给其他人
  if args.Term < rf.currentTerm || 
    (args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) {
    // your code
    return
  }

  // 如果请求者的任期大于当前任期,更新当前任期并转换为 FOLLOWER
  if args.Term > rf.currentTerm {
    // your code
  }

  // 如果满足投票条件,投票给请求者
  // 重置选举超时
  // your code

  reply.VoteGranted = true
  reply.Term = rf.currentTerm
}

心跳包

AppendEntriesArgs

type AppendEntriesArgs struct {
  Term         int
  LeaderId     int
  PrevLogIndex int
  PrevLogTerm  int
  // empty for heartbeat
  Entries      []LogEntry
  LeaderCommit int
}

AppendEntriesReply

type AppendEntriesReply struct {
  Term    int
  Success bool
}

sendAppendEntry

func (rf *Raft) sendAppendEntry(server int) {
    var args AppendEntriesArgs
    var reply AppendEntriesReply
    args.Term = rf.currentTerm
    args.LeaderId = rf.me
    args.Entries = make([]LogEntry, 0)
    
    rf.sendAppendEntries(server, &args, &reply)
    if rf.state != STATE_LEADER {
        return
    }

    if !reply.Success {
        if reply.Term > rf.currentTerm {
            // your code
        }
    }
}

AppendEntries

func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
  if rf.currentTerm > args.Term {
    // your code
  } else if rf.currentTerm < args.Term {
    // your code
    return
  } else if rf.currentTerm == args.Term {
    // your code
    // heartbeat
    if len(args.Entries) <= 0 {
      // your code
      return
    }
  }
}

Lab3B

任务目标

Lab 3B要求我们实现Raft算法的日志同步,实现一致性算法

数据结构

Raft

相比Lab3A,Lab3B需要用到的属性多了一些,但还是源自论文里的图。

注意我的实现里,日志数组的编号是从0开始的,也就是和数组的索引一致。

type Raft struct {
	mu        sync.Mutex // Lock to protect shared access to this peer's state
	rwMutex   sync.RWMutex
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	// Your data here (3A, 3B, 3C).
	// Look at the paper's Figure 2 for a description of what
	// state a Raft server must maintain.
	currentTerm int
	votedFor    int
	logs        []LogEntry
	commitIndex int
	lastApplied int
	// for leader only
	nextIndex []int
	// for leader only
	matchIndex []int
	// 0 follower 1 candidate 2 leader
	state         RaftState
	electionTimer time.Time
	voteGotten    int
	applyCh       chan ApplyMsg
}

逻辑梳理

  • MatchIndex和NextIndex其实没有太大关系。NextIndex表示Leader下一次要给某Follower发送的日志编号,注意是下一次发送,也就是当前还未发送。MatchIndex表示某Follower和Leader当前达成共识的最后一个日志编号。MatchIndex[i]=3表示第i个Follower的第[0, 3]个日志达成了共识
  • 正常情况下。Leader接受一个日志后,将它复制给其他Follower,如果大多数Follower都成功复制了该日志,则Leader提交(Commit)该日志,然后应用(Apply)该日志。后续Follower根据心跳包中的LeaderCommitId字段来判断自己是否要提交并应用一些日志。
    注意Leader和Follower提交同一个日志的顺序
  • 本实验中提交(Commit)和应用(Apply)的时机没有太多区别,Commit一个日志后立即Apply该日志即可

选举投票

RequestVoteArgs

Lab3B需要在RequestVoteArgs中标识Candidate最后一个日志的Index和Term

type RequestVoteArgs struct {
  // Your data here (3A, 3B).
  Term         int
  CandidateId  int
  LastLogIndex int
  LastLogTerm  int
}

RequestVote

RequestVote需要额外检查一下自己的日志是否比当前服务器更新,如果是的话,则拒绝投票(不需要重置计时器)

func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
  rf.rwMutex.Lock()
  defer rf.rwMutex.Unlock()

  // 如果请求者的任期小于当前任期,拒绝投票
  // 检查是否已经投过票给其他人,或者日志是否比当前服务器更新
  if args.Term < rf.currentTerm || 
    (args.Term == rf.currentTerm && rf.votedFor != -1 && rf.votedFor != args.CandidateId) || 
    !rf.checkLastLogUpToDate(args.LastLogTerm, args.LastLogIndex) {
    if args.Term > rf.currentTerm {
      // your code
    }
    reply.Term = rf.currentTerm
    reply.VoteGranted = false
    return
  }

  // 如果请求者的任期大于当前任期,更新当前任期并转换为 FOLLOWER
  if args.Term > rf.currentTerm {
    // 重置投票记录
    // your code
  }

  // 如果满足投票条件,投票给请求者
  // 重置选举超时
  // your code

  reply.VoteGranted = true
  reply.Term = rf.currentTerm
}

checkLastLogUpToDate

// 检查候选者的日志是否比当前服务器更新
// 返回 true 表示候选者的日志是最新的
func (rf *Raft) checkLastLogUpToDate(lastLogTerm int, lastLogIndex int) bool {
  // 当前服务器的最后一个日志条目
  lastTerm := -1
  if len(rf.logs) > 0 {
    lastTerm = rf.logs[len(rf.logs)-1].Term
  }

  // 比较日志条目的任期,如果任期不同,则任期大的日志更新
  if lastLogTerm != lastTerm {
    return lastLogTerm > lastTerm
  }

  // 如果任期相同,则比较日志索引
  return lastLogIndex >= len(rf.logs)-1
}

日志同步

AppendEntriesArgs

type AppendEntriesArgs struct {
  Term         int
  LeaderId     int
  PrevLogIndex int
  PrevLogTerm  int
  // empty for heartbeat
  Entries      []LogEntry
  LeaderCommit int
}

AppendEntriesReply

type AppendEntriesReply struct {
  Term          int
  Success       bool
  ConflictIndex int
  ConflictTerm  int
}

sendAppendEntry

sendAppendEntry额外增加了很多逻辑,难点在于如何更新NextIndex和MatchIndex。

如果Follower返回失败,最基本的做法是令NextIndex[i]–。我这里用到了一个加速的技术,在视频课里提到了,当Follower发现不匹配后,需要在Reply加入冲突的日志Index和Term。Leader根据

  1. 自己含有XTerm的日志
  2. 自己不含有XTerm的日志
  3. Follower日志过短(无法查询第PrevLogIndex个日志)

三种情况加速更新NextIndex

func (rf *Raft) sendAppendEntry(server int) {
  var args AppendEntriesArgs
  var reply AppendEntriesReply
  args.Term = rf.currentTerm
  args.LeaderId = rf.me
  args.LeaderCommit = rf.commitIndex
  // 5.3 start from -1
  var prevLogIdx = rf.nextIndex[server] - 1
  args.PrevLogIndex = prevLogIdx
    
  if len(rf.logs)-1 >= prevLogIdx+1 {
    args.Entries = make([]LogEntry, len(rf.logs[(prevLogIdx+1):]))
    copy(args.Entries, rf.logs[(prevLogIdx+1):])
  } else {
    args.Entries = make([]LogEntry, 0)
  }
  if prevLogIdx >= 0 {
    args.PrevLogTerm = rf.logs[prevLogIdx].Term
  } else {
    args.PrevLogTerm = -1
  }

  // rf.LogPrintf("Leader NextIndex %v matchIndex %v", rf.nextIndex, rf.matchIndex)
  if rf.sendAppendEntries(server, &args, &reply) {
    if rf.state != STATE_LEADER || args.Term != rf.currentTerm {
      return
    }
    if reply.Success {
      rf.matchIndex[server] = args.PrevLogIndex + len(args.Entries)
      rf.nextIndex[server] = rf.matchIndex[server] + 1

      for N := len(rf.logs) - 1; N > rf.commitIndex; N-- {
        // 检查N,如果大多数都达成共识,则Commit
        // your code
      }
    } else {
      if reply.Term > rf.currentTerm {
        // your code
      } else {
        // 5.3
        nextIdx := rf.lastIdxByTerm(reply.ConflictTerm)
        if nextIdx != -1 {
          rf.nextIndex[server] = nextIdx + 1
        } else {
          rf.nextIndex[server] = max(reply.ConflictIndex, 0)
        }
      }
    }
  }
}

AppendEntries

AppendEntries额外的逻辑在于判断自己的日志和Leader的日志在PrevLogIndex号上是否有冲突。同时删除冲突的所有日志,并补充上所有缺少的日志。

检查冲突主要有三步:

  1. 查看自己是否有PrevLogIndex号日志,如果没有说明自己日志过短
  2. 查看自己的PrevLogIndex号日志和Leader的PrevLogIndex号日志Term是否一致
  3. 检查在PrevLogIndex号之后。自己的每个日志和Leader传来的entries里对应的日志 Term是否一致
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
  if rf.currentTerm > args.Term {
      // your code
    return
  }

  rf.ResetElectionTimer()
  rf.state = STATE_FOLLOWER

  // 5.3 check same preLogIndex
  var myLastLogIdx = len(rf.logs) - 1
  if myLastLogIdx < args.PrevLogIndex {
    // your code
    reply.ConflictIndex = len(rf.logs)
    reply.ConflictTerm = -1
    return
  }

  rf.currentTerm = args.Term
  rf.state = STATE_FOLLOWER
  rf.ResetElectionTimer()

  // 5.3 check log at same preLogIndex
  if args.PrevLogIndex > 0 && rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
    // your code
    reply.ConflictTerm = rf.logs[args.PrevLogIndex].Term
    reply.ConflictIndex = rf.firstIdxByTerm(reply.ConflictTerm)
    return
  }

  var conflictIdx = -1
  for i := 0; i < len(args.Entries); i++ {
    var logIndex = i + args.PrevLogIndex + 1
    // 检查冲突
    // your code
  }

  if conflictIdx != -1 {
    // 3. delete all logs conflict
    rf.logs = rf.logs[:args.PrevLogIndex+1+conflictIdx]
    // 4. append any log not in logs
    rf.logs = append(rf.logs, args.Entries[conflictIdx:]...)
  }

  // 5
  if args.LeaderCommit > rf.commitIndex && args.LeaderCommit >= 0 {
    // commit日志
    // your code
  }
  reply.Success = true
  reply.Term = rf.currentTerm
}

Lab3C

任务目标

Lab 3C要求我们实现Raft状态的持久化

数据结构

和Lab3B一致

持久化状态

根据论文里的图,我们需要持久化Raft结构的以下状态:

  1. currentTerm int
  2. votedFor int
  3. logs []LogEntry

Persist

照着给的Example代码写即可。ReadPersist同理

func (rf *Raft) persist() {
  var buff = new(bytes.Buffer)
  var enc = labgob.NewEncoder(buff)
  rf.rwMutex.RLock()
  enc.Encode(rf.currentTerm)
  enc.Encode(rf.votedFor)
  enc.Encode(rf.logs)
  rf.rwMutex.RUnlock()
  var raftState = buff.Bytes()
  rf.persister.Save(raftState, nil)
}

逻辑梳理

最轻松的一集!

只需要在各个函数操作前加入

defer rf.persist()

Lab3D

任务目标

Lab 3D要求我们实现Raft的状态快照和日志压缩

数据结构

Raft

添加了snapshot字段,用于保存快照。根据论文里的描述,SnapShot除了日志外还有lastIncludedIndex和lastIncludedTerm两个字段。这里将它们提出来作为Raft的字段

type Raft struct {
  mu        sync.Mutex // Lock to protect shared access to this peer's state
  rwMutex   sync.RWMutex
  peers     []*labrpc.ClientEnd // RPC end points of all peers
  persister *Persister          // Object to hold this peer's persisted state
  me        int                 // this peer's index into peers[]
  dead      int32               // set by Kill()

  // Your data here (3A, 3B, 3C).
  // Look at the paper's Figure 2 for a description of what
  // state a Raft server must maintain.
  currentTerm       int
  votedFor          int
  logs              []LogEntry
  lastIncludedIndex int
  lastIncludedTerm  int

  commitIndex int
  lastApplied int
  // for leader only
  nextIndex []int
  // for leader only
  matchIndex []int
  // 0 follower 1 candidate 2 leader
  state         RaftState
  electionTimer time.Time
  voteGotten    int
  applyCh       chan ApplyMsg
  snapshot      []byte
}

持久化状态

因为SnapShot需要持久化,意味着lastIncludedIndex和lastIncludedTerm两个字段也要持久化。由于我将它们提出来了,要显式持久化一下

func (rf *Raft) persist() {
  var buff = new(bytes.Buffer)
  var enc = labgob.NewEncoder(buff)
  enc.Encode(rf.currentTerm)
  enc.Encode(rf.votedFor)
  enc.Encode(rf.logs)
  enc.Encode(rf.lastIncludedIndex)
  enc.Encode(rf.lastIncludedTerm)

  var raftState = buff.Bytes()

  var snapshot = clone(rf.snapshot)

  rf.persister.Save(raftState, snapshot)
}

InstallSnapshot

日志Index

最恶心的一集!

由于快照的存在,被快照存储的日志不需要再存储在log数组中,我们需要将它们删除。这意味着log在数组里的索引不再意味着他们在日志中的索引。我们需要对日志在日志列表和日志数组里的索引进行转换。

记日志在日志列表的索引为绝对索引;在日志数组里的索引为相对索引。同时需要将之前函数中各个日志索引访问进行绝对化或相对化,这里一定要细心。

func (rf *Raft) toAbsoluteIdx(idx int) int {
  return idx + (rf.lastIncludedIndex + 1)
}

func (rf *Raft) toRelativeIdx(idx int) int {
  return idx - (rf.lastIncludedIndex + 1)
}

InstallSnapshotArgs

type InstallSnapshotArgs struct {
  Term              int
  LeaderId          int
  LastIncludedIndex int
  LastIncludedTerm  int
  Data              []byte
  // not used
  Offset int
  Done   bool
}

InstallSnapshotReply

type InstallSnapshotReply struct {
  Term int
}

syncSnapshot

发送Snapshot的逻辑和AppenEntry很像,收到回复后更新Follower的MatchIndex和NextIndex即可

func (rf *Raft) syncSnapshot(server int) {
	defer rf.persist()
	var args InstallSnapshotArgs
	var reply InstallSnapshotReply
	rf.rwMutex.RLock()
	args.Term = rf.currentTerm
	args.LeaderId = rf.me
	args.LastIncludedIndex = rf.lastIncludedIndex
	args.LastIncludedTerm = rf.lastIncludedTerm
	if rf.snapshot != nil {
		args.Data = clone(rf.snapshot)
	} else {
		args.Data = nil
	}
	rf.rwMutex.RUnlock()
	if rf.sendInstallSnapshot(server, &args, &reply) {
		rf.rwMutex.Lock()
		if rf.currentTerm < reply.Term {
			rf.currentTerm = reply.Term
			rf.state = STATE_FOLLOWER
			rf.rwMutex.Unlock()
			return
		}
		if rf.snapshot != nil {
			rf.nextIndex[server] = args.LastIncludedIndex + 1
			rf.matchIndex[server] = args.LastIncludedIndex
		}
		rf.rwMutex.Unlock()
	}
}

InstallSnapshot

InstallSnapshot同样类似,检查自己的哪些日志需要被删掉,更新自己的日志信息。注意需要重置定时器

func (rf *Raft) InstallSnapshot(args *InstallSnapshotArgs, reply *InstallSnapshotReply) {
  defer rf.persist()
  rf.rwMutex.Lock()
  reply.Term = rf.currentTerm
  if rf.currentTerm > args.Term {
    rf.rwMutex.Unlock()
    return
  }
  rf.ResetElectionTimer()
  rf.state = STATE_FOLLOWER
  rf.currentTerm = args.Term

  if args.Data == nil {
    rf.rwMutex.Unlock()
    return
  }

  if (rf.snapshot == nil) ||
    (rf.snapshot != nil && rf.lastIncludedIndex < args.LastIncludedIndex) {
    var sliceIndex = rf.toRelativeIdx(args.LastIncludedIndex + 1)
    if sliceIndex >= 0 && sliceIndex <= len(rf.logs)-1 {
      rf.logs = rf.logs[sliceIndex:]
    } else {
      rf.logs = make([]LogEntry, 0)
    }
    rf.snapshot = args.Data
    rf.lastIncludedIndex = args.LastIncludedIndex
    rf.lastIncludedTerm = args.LastIncludedTerm

    rf.commitIndex = args.LastIncludedIndex

    rf.applyCh <- ApplyMsg{
      SnapshotValid: true,
      Snapshot:      rf.snapshot,
      SnapshotTerm:  rf.lastIncludedTerm,
      SnapshotIndex: rf.lastIncludedIndex + 1,
    }

    rf.lastApplied = rf.lastIncludedIndex
  }
  rf.rwMutex.Unlock()
}

InstallSnapshot时机

当Leader发现下一个要发送给Follower的日志已经不在自己的日志数组里,即已经被快照化了。此时需要向Follower发送快照

for i := 0; i < len(rf.peers); i++ {
    if i == rf.me {
        continue
    }
    rf.rwMutex.RLock()
    if rf.snapshot != nil && (rf.nextIndex[i] <= rf.lastIncludedIndex) {
        rf.rwMutex.RUnlock()
        go rf.syncSnapshot(i)
    } else {
        rf.rwMutex.RUnlock()
        go rf.sendAppendEntry(i)
    }
}

This website has been running for 3 years 2 months 12 days 15 hours 28 minutes 45 seconds

皖ICP备2021007094号 2021-PRESENT © OceanPresent