文章目录
  1. 1. Raft
  2. 2. Raft的实现
    1. 2.1. State
    2. 2.2. RPC
    3. 2.3. Server
      1. 2.3.1. All Servers
      2. 2.3.2. Follower
      3. 2.3.3. Candidate
      4. 2.3.4. Leader
  3. 3. Conclusion

MIT6.824 是MIT的 Distributed System 课程,从RPC开始讲起,到主从备份,容错,再到分布式事务,分布式一致性,介绍了分布式系统各方面的内容。这门课程还包含一些实验,lab1是MapReduce实验,实现一个简单MapReduce框架,最终要能够运行WordCount、InvertedIndex这样经典的MapReduce程序;lab2及之后的lab则围绕着分布式存储这一块,从Raft开始,实现一个具备选主和日志复制功能的Raft(不包含日志压缩和快照),接着再基于Raft实现一个多副本的KV存储,以及进一步实现分片功能。

本文主要记录lab raft 的实现,以及对于Raft 的一些理解。课程实验本身提供了一些基础的代码框架,包括RPC的实现,以及单元测试代码,所以我们基本只要实现raft的核心部分代码就可以了,并且按照TDD的方式,可以从简至繁,逐步实现Raft的各个部分。

由于笔者接触分布式系统的时间还比较短,因此此文可能存在一些毗漏,欢迎读者指出纠正。

Raft

Raft 协议可以视为由选主和主从复制两部分组成的协议,协议开始选出一个strong leader,其他的成为follower,接下来所有命令则提交到leader,由leader复制给follower。

Raft是基于Replicated State Machine 模型:对于多个状态机,如果初始状态一致,输入一致,那么最终状态就会一致。那么,对于数据库来说,到底复制什么呢?一种方式,是把数据库当作状态机,把对数据库的操作当作状态机的输入,也就是“The log hold the truth, database is just an optimization”,只要记录所有的操作日志,就能把数据库恢复到任何状态。对于这种方式,client 首先向 leader 提交一个 “put(“hello”, “world”)”这样的命令,Raft首先把这条命令复制到多个副本,等待majority接受之后,这条命令就可以commit了。commit之后的命令,就可以apply到状态机了,也就是说可以在单机的存储引擎中执行“put(“hello”, “world”)”这条命令了。也就是说,一个命令需要经过 submit, commit, apply 这三个过程才能真正写到数据库。

那么除此之外有没有其他的方式呢,在我看来应该是有的。Raft的log可以当作数据库的REDO log,那么apply这个操作显然就应该交由数据库来处理,而不是raft来apply。再者,log本身是不是也可以作为数据呢,一定要apply到状态机吗?

Raft的实现

Raft能够流行开来,很重要的一个原因就是易于理解,容易实现。基本只要按照Figure2,就能实现一个具备基本功能的raft,不过显然对于工业级使用来说,这样的实现在性能上还是比较差的,同时也会有一些corner case 能够让raft挂掉。不过暂且不考虑这些,我们先来分析一下如何按照Figure2实现raft。

State

首先是Raft需要维护的状态。需要持久化的状态有三个,currentTerm, votedFor, log[]。currentTerm表示当前的term,votedFor表示投以选票的peer,这两个状态之所以需要进行持久化,是为了保证在重启之后,不会发生重复投票的情况,即“在一个term只给一个candidate投票”,这点是leader election的基本保证,否则就会发生多个leader 的情况。而votedFor还有一点需要注意,它不等价于当前term的leader,不要因为发现了leader就修改votedFor。另一个需要持久化的状态就是log,这个无需赘述。

接下来是所有server都需要维护的非持久化状态: commitIndex, lastApplied。这两个状态是为了实现日志复制。commitIndex指的是当前commit的日志,而lastApplied则是已经apply的日志。这也是前面所说的,submit -> commit -> apply。submitIndex 隐式包含在log[] 中,也就是最后一个log entry。

最后是leader需要维护的状态: nextIndex[], matchIndex[]。matchIndex是成功复制到peer的日志index,这里的成功定义为:appendEntries RPC返回成功。也就是说不管这个log 有没有成功复制到其他的peer,或者有没有commit。按理说每次复制日志的时候,只要把matchIndex后面的日志复制过去就好了,为什么还需要nextIndex呢?主要是为了处理log mismatch 的情况,matchIndex之前的日志还是可能mismatch的,发生这种情况的话,要回退nextIndex,重新尝试。

RPC

Raft主要有两个RPC:AppendEntries和RequestVote。至于InstallSnapshot则算是锦上添花。

AppendEntries用作日志复制和心跳。参数包括term, leaderId, prevLogIndex, prevLogTerm, entries[], leaderCommit,返回值有term, success。term和leaderId是sender的term和id;prevLogIndex, prevLogTerm, entries[]则是日志,prev这个参数一来为了保证Log Matching Safety,二来也是为了保证AppendEntries的幂等。还有一个很重要的参数是leaderCommit,表示leader 的commitIndex,用来进行two phase commit中的commit阶段。

对于AppendEntries的处理:

  • 如果term < currentTerm ,返回false。(这个leader已经过期)
  • 如果prevLog不匹配,返回false。(Log Matching Property)
  • 把entries追加到日志的prevLogIndex后面,如果存在冲突,直接覆盖掉
  • 如果leaderCommit > commitIndex,commitIndex = min(leaderCommit, indexOfLastLogEntry)。(commit log)

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (rf *Raft) AppendEntries(args AppendEntriesArgs, reply *AppendEntriesReply) {
rf.ruleForAll(args.LeaderId, args.Term)
if args.Term >= rf.currentTerm {
rf.heartBeatCh <- args
}

rf.Lock()
defer rf.Unlock()
term := rf.currentTerm
logOk := len(rf.log) > args.PrevLogIndex && args.PrevLogTerm == rf.log[args.PrevLogIndex].Term

reply.Term = term
if args.Term < term || !logOk {
reply.Success = false
if args.Term < term {
rf.logInfo("Reject AppendEntries, for TermMismatch: %d < %d", args.Term, term)
}
if !logOk {
rf.logInfo("Reject AppendEntries, for LogMisMatch: (%d, %d) not exists", args.PrevLogIndex, args.PrevLogTerm)
}
} else {
reply.Success = true
if len(args.Entries) > 0 {
rf.log = rf.log[:args.PrevLogIndex + 1]
rf.log = append(rf.log, args.Entries...)
rf.persist()
rf.logInfo("AppendEntries successfully: %v", args.Entries)
}
if args.LeaderCommit > rf.commitIndex && len(rf.log) - 1 > rf.commitIndex {
rf.commitIndex = int(math.Min(float64(args.LeaderCommit), float64(len(rf.log) - 1)))
rf.logInfo("Follower update commitIndex: commitIndex: %d", rf.commitIndex)
}
}
}

另一个RPC是RequestVote,用于选主。参数包括term, candidateId, lastLogIndex, lastLogTerm, 返回值包括term, voteGranted。term和candidateId还是用于标识一个candidate,而lastLog是为了保证section5.4.1 说的Election restriction,新选出的leader需要包含所有已经提交的日志,那么只要它的日志和其他的一样新,那么它就一定满足这个条件。

所以处理这个RPC时:

  • 如果term < currentTerm,返回false
  • 如果votedFor为空,或者就是candidateId,并且lastLog和自己的一样新,就投以选票。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (rf *Raft) RequestVote(args RequestVoteArgs, reply *RequestVoteReply) {
rf.ruleForAll(args.CandidateID, args.Term)

rf.Lock()
defer rf.Unlock()
voteFor := rf.votedFor
voteForOk := voteFor == -1 || voteFor == args.CandidateID
lastEntry := rf.log[len(rf.log) - 1]
logOk := args.LastLogTerm > lastEntry.Term ||
(args.LastLogTerm == lastEntry.Term && args.LastLogIndex >= len(rf.log) - 1)

if args.Term >= rf.currentTerm && voteForOk && logOk {
rf.beFollower(args.CandidateID, args.Term)
reply.VoteGranted = true
reply.Term = rf.currentTerm
go func() {
rf.beFollowerCh <- follower
}()
rf.logInfo("GrantVote to candidate<%d,%d>", args.CandidateID, args.Term)
} else {
reply.VoteGranted = false
reply.Term = rf.currentTerm
if args.Term < rf.currentTerm {
rf.logInfo("Not grant vote to <%d,%d>, because term %d < %d", args.Term, rf.currentTerm)
}
if !voteForOk {
rf.logInfo("Not grant vote to <%d,%d>, because votedFor: %d", args.CandidateID, args.Term, voteFor)
}
if !logOk {
rf.logInfo("Not grant vote to <%d,%d>, because logMismatch: %v", args.CandidateID, args.Term, rf.log)
}
}
}

Server

All Servers

对于所有server有一些通用的规则。如果commitIndex > lastApplied,就把lastApplied后面的日志apply到SM。如果RPC中的term大于当前term,说明当前节点已经过期,需要更新currentTerm,并且直接变成follower。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (rf *Raft) ruleForAll(candidate int, term int) {
// Apply command need to be serial!
rf.Lock()
old := rf.lastApplied
for rf.commitIndex > rf.lastApplied {
rf.lastApplied++
msg := ApplyMsg{Index: rf.lastApplied, Command: rf.log[rf.lastApplied].Command}
rf.applyCh <- msg
}
if rf.lastApplied > old {
rf.logInfo("Applyed commands until index=%d", rf.lastApplied)
}
rf.Unlock()

// exists a new term
rf.Lock()
if term > rf.currentTerm {
rf.logInfo("RuleForAll: find new term<%d,%d>, become follower", candidate, term)
rf.beFollower(-1, term)
go func() {
rf.beFollowerCh <- follower
}()
}
rf.Unlock()
}

Follower

Follower需要响应AppendEntries以及RequestVote RPC,并且维护一个 election timeout,如果没有收到心跳,需要进入candidate状态。相对来说比较简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func (rf *Raft) doFollower() {
rf.logInfo("Become follower")

for {
select {
case <-time.After(electionTO()):
rf.Lock()
rf.role = candidate
rf.votedFor = -1
rf.persist()
rf.Unlock()
rf.logInfo("Follower lose heartbeat, become candidate")
return
case <-rf.heartBeatCh:
case <-rf.beFollowerCh:
}
}
}

Candidate

进入Candidate状态需要不断发起选举,当然,为了避免互相竞争的活锁情况,每次选举的时间也是有间隔的。

对于一次选举,首先增加currentTerm, votedFor改成自己,通过RequestVote向其它peer发起投票。如果收到majority的投票,变成leader(不需要等待所有RPC返回);如果收到新的Leader的AppendEntries,变成follower;如果超时,进入下一轮选举。第二条规则可能和Rule For All那边有一些重叠,我们需要谨慎处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
func (rf *Raft) doCandidate() {
rf.logInfo("Start leader election")
defer rf.logInfo("Quit leader election")

for {
// increase term
rf.Lock()
rf.votedFor = rf.me
rf.currentTerm++
rf.persist()
rf.Unlock()

// request votes
votedCh := make(chan bool)
quitCh := make(chan bool)
go rf.requestingVote(votedCh, quitCh)

// 1. voted by majority: succeed then quit
// 2. find a new leader: become a follower and quit.
// 3. timeout, enter next election
start := time.Now()
election := electionTO()
select {
case <-time.After(election):
rf.logInfo("LeaderElection timeout")
case <-rf.beFollowerCh:
rf.logInfo("Convert to Follower from candidate")
return
case ok := <-votedCh:
if ok {
rf.Lock()
rf.role = leader
rf.Unlock()
rf.logInfo("Leader election succeed")
return
} else {
rf.logInfo("Leader election fail")
}
case args := <-rf.heartBeatCh:
rf.logInfo("LeaderElection fail, find new leader<%d,%d>", args.LeaderId, args.Term)
rf.beFollowerLocked(rf.votedFor, args.Term)
return
}

// election fail, quit requestVote goroutine
close(quitCh)
if time.Since(start) < election {
time.Sleep(election - time.Since(start))
}
}
}

Leader

Leader要做的事情会多一点,毕竟它是leader。

首先,它需要用心跳维护自己的leader地位,有点lease的意思。开始当选leader时,就要发送一个心跳,结束其他candidate。之后如果需要维护一个定期的心跳。

其次,它需要处理客户端的请求,如果收到客户端的command,leader需要将其append到自己的日志,得到command的index。至于何时返回给客户端,看起来则是可以选择的,如果append之后直接返回,那么可能发生日志没有成功提交的情况,如果apply 到SM之后再返回,那么可能永远不会返回,也许是需要一个timeout。

除此之外,就是日志复制了。paper将客户端提交日志和日志复制两块分开了,我们在实现的时候也需要仔细考虑,何时进行日志复制,如果提交一条就复制,那么吞吐量可能跟不上,如果定期复制,延迟也许又会比较大。不过通用的规则时,lastLogIndx >= nextIndex,就发送日志,如果返回成功,更新matchIndex nextIndex,如果失败,回退nextIndex,重试。

日志复制之后,需要进行commit。commit要满足majority的原则,如果存在 N > commitIndex, majority matchIndex[i] > N,并且log[N].term == currentTerm,commitIndex = N。Raft仅仅commit当前term的日志,不过也间接commit了之前的日志。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (rf *Raft) doLeader() {
rf.logInfo("Become leader")
defer rf.logInfo("Leader quit.")

rf.Lock()
rf.nextIndex = make([]int, len(rf.peers))
for i := range rf.nextIndex {
rf.nextIndex[i] = len(rf.log)
}
rf.matchIndex = make([]int, len(rf.peers))
rf.commitCh = make(chan bool)
rf.submitCh = make(chan bool)
rf.Unlock()
quitCh := make(chan bool)
done := new(sync.WaitGroup)

for i := range rf.peers {
if i != rf.me {
go rf.replicator(i, quitCh, done)
}
}

go rf.committer(quitCh, done)

// todo: need a elegant way to trigger leader to quit
<-rf.beFollowerCh
close(quitCh)
done.Wait()
rf.logInfo("Leader quit, close committer, replicators. And cleanup submit channel")
}

Conclusion

Raft说来简单,但深究其实也挺复杂。如果我们仅仅实现一个简单的Raft,那么很简单,如果要追求性能,可能会变得较为复杂,在这种情况下,用哪种cosensus protocol可能就不是那么重要了,不论用哪个,最后的复杂度可能都落在了处理各种corner case和性能提升。Raft在一些corner case 的情况下可能也会出现一些奇怪的情况,例如,如果一个follower和leader 失联,他会不断选举,增大term,并且通过RequestVote告知给其他的peer,进而导致所有节点都变成follower,并且这个选举过程可能一直不会成功。对于这个case,如果follower在进行选举之前,先询问一下其他的server,是否认为leader已经挂了,如果majority认为leader挂了,再进行选举,看起来是能比避免这种情况。

所以,Raft并不是银弹,也不存在银弹。

文章目录
  1. 1. Raft
  2. 2. Raft的实现
    1. 2.1. State
    2. 2.2. RPC
    3. 2.3. Server
      1. 2.3.1. All Servers
      2. 2.3.2. Follower
      3. 2.3.3. Candidate
      4. 2.3.4. Leader
  3. 3. Conclusion