
2A Raft leader election

2A Introduction

Part 2A

Implement Raft leader election and heartbeats (AppendEntries RPCs with no log entries). The goal for Part 2A is for a single leader to be elected, for the leader to remain the leader if there are no failures, and for a new leader to take over if the old leader fails or if packets to/from the old leader are lost. Run go test -run 2A -race to test your 2A code.

  • You can’t easily run your Raft implementation directly; instead you should run it by way of the tester, i.e. go test -run 2A -race.
  • Follow the paper’s Figure 2. At this point you care about sending and receiving RequestVote RPCs, the Rules for Servers that relate to elections, and the State related to leader election.
  • Add the Figure 2 state for leader election to the Raft struct in raft.go. You’ll also need to define a struct to hold information about each log entry. (A sketch of these definitions, along with the RPC argument and reply structs, follows this list.)
  • Fill in the RequestVoteArgs and RequestVoteReply structs. Modify Make() to create a background goroutine that will kick off leader election periodically by sending out RequestVote RPCs when it hasn’t heard from another peer for a while. This way a peer will learn who is the leader, if there is already a leader, or become the leader itself. Implement the RequestVote() RPC handler so that servers will vote for one another.
  • To implement heartbeats, define an AppendEntries RPC struct (though you may not need all the arguments yet), and have the leader send them out periodically. Write an AppendEntries RPC handler method that resets the election timeout so that other servers don’t step forward as leaders when one has already been elected.
  • Make sure the election timeouts in different peers don’t always fire at the same time, or else all peers will vote only for themselves and no one will become the leader.
  • The tester requires that the leader send heartbeat RPCs no more than ten times per second.
  • The tester requires your Raft to elect a new leader within five seconds of the failure of the old leader (if a majority of peers can still communicate). Remember, however, that leader election may require multiple rounds in case of a split vote (which can happen if packets are lost or if candidates unluckily choose the same random backoff times). You must pick election timeouts (and thus heartbeat intervals) that are short enough that it’s very likely that an election will complete in less than five seconds even if it requires multiple rounds.
  • The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.
  • You may find Go’s rand useful.
  • You’ll need to write code that takes actions periodically or after delays in time. The easiest way to do this is to create a goroutine with a loop that calls time.Sleep(); (see the ticker() goroutine that Make() creates for this purpose). Don’t use Go’s time.Timer or time.Ticker, which are difficult to use correctly.
  • The Guidance page has some tips on how to develop and debug your code.
  • If your code has trouble passing the tests, read the paper’s Figure 2 again; the full logic for leader election is spread over multiple parts of the figure.
  • Don’t forget to implement GetState(); a minimal version is sketched after this list.
  • The tester calls your Raft’s rf.Kill() when it is permanently shutting down an instance. You can check whether Kill() has been called using rf.killed(). You may want to do this in all loops, to avoid having dead Raft instances print confusing messages.
  • Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names (e.g. fields of log records in an array). The labgob package will warn you about this; don’t ignore the warnings.
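Before the code, here is a minimal sketch of the definitions the handlers below assume. The field names are taken from the code in this post; isLeader, hasLeader, votenum, refreshCh, and heartCh are this implementation's own bookkeeping rather than anything from Figure 2, and the exact layout (plus the extra 2B/2C fields) is up to you.

// A sketch of the election state and RPC structs, assuming the names
// used by the code below.
type LogEntry struct {
	Index   int
	Term    int
	Command interface{}
}

type Raft struct {
	mu        sync.Mutex
	peers     []*labrpc.ClientEnd
	persister *Persister
	me        int
	dead      int32 // set by Kill(); checked via rf.killed()

	// Figure 2: persistent state
	currentTerm int
	votedFor    int
	log         []LogEntry

	// Figure 2: volatile state
	commitIndex int
	lastApplied int

	// this implementation's own bookkeeping
	isLeader  bool
	hasLeader bool
	votenum   int
	refreshCh chan struct{} // a heartbeat arrived: reset the election timeout
	heartCh   chan struct{} // we just became leader: start sending heartbeats
}

// Field names must be exported (capitalized) so labgob/labrpc can encode them.
type RequestVoteArgs struct {
	Term         int
	CandidateId  int
	LastLogIndex int
	LastLogTerm  int
}

type RequestVoteReply struct {
	Term        int
	VoteGranted bool
}

type AppendEntriesReq struct {
	Term         int
	LeaderId     int
	PrevLogIndex int
	PrevLogTerm  int
	Entries      []LogEntry
	LeaderCommit int
}

type AppendEntriesRsp struct {
	Term    int
	Success bool
}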
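GetState() is equally small; a minimal version, assuming the currentTerm and isLeader fields sketched above:

// return currentTerm and whether this server
// believes it is the leader.
func (rf *Raft) GetState() (int, bool) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	return rf.currentTerm, rf.isLeader
}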

My Code

Make

//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
// todo add context
func Make(peers []*labrpc.ClientEnd, me int,
	persister *Persister, applyCh chan ApplyMsg) *Raft {
	log.Println("begin make.., me ", me)
	rf := &Raft{}
	rf.peers = peers
	rf.persister = persister
	rf.me = me
	rf.hasLeader = false
	rf.refreshCh = make(chan struct{})
	rf.heartCh = make(chan struct{})

	// Your initialization code here (2A, 2B, 2C).
	go func() {
		for {
			// rand time 300-400 ms
			rand.Seed(time.Now().UTC().UnixNano())
			randtime := rand.Intn(100) + 300
			timeout := time.After(time.Duration(randtime) * time.Millisecond)

			select {
			// update log or leader
			// case ch := <-applyCh:
			// 	{
			// 		if ch.CommandValid == true {
			// 			rf.lastApplied = ch.CommandIndex
			// 		}
			// 	}

			// refresh timeout
			case <-rf.refreshCh:

			// begin election
			case <-timeout:
				if rf.killed() {
					return
				}

				go rf.startElection(me)

			// begin heartbeat
			case <-rf.heartCh:
				if rf.killed() {
					return
				}

				// The tester requires that the leader send heartbeat RPCs no more than ten times per second.
				for num := range peers {
					// heartbeat
					if num != me {
						go rf.heartBeatOne(num)
					}
				}
			}
		}
	}()

	// initialize from state persisted before a crash
	rf.readPersist(persister.ReadRaftState())

	return rf
}

startElection

func (rf *Raft) startElection(me int) {
	count := len(rf.peers)

	// become candidate
	log.Println("me become candidate ", me)
	rf.mu.Lock()
	rf.hasLeader = false
	rf.currentTerm++
	rf.votenum = 1
	rf.isLeader = false
	rf.mu.Unlock()

	for num := range rf.peers {
		if num != me {
			var args RequestVoteArgs
			var reply RequestVoteReply

			rf.mu.Lock()
			// already has leader
			if rf.hasLeader {
				log.Println("break elect, hasleader, who = ", me)
				rf.mu.Unlock()
				break
			}
			args.Term = rf.currentTerm
			args.CandidateId = me
			lognum := len(rf.log)
			if lognum > 0 {
				args.LastLogIndex = rf.log[lognum-1].Index
				args.LastLogTerm = rf.log[lognum-1].Term
			}
			rf.mu.Unlock()

			go func(server int) {
				ret := rf.sendRequestVote(server, &args, &reply)
				if !ret {
					return
				}

				rf.mu.Lock()
				defer rf.mu.Unlock()
				// become follower
				if reply.Term > rf.currentTerm {
					log.Println("me become follower ", me)
					rf.isLeader = false
					return
				}

				if reply.VoteGranted {
					rf.votenum++
				}

				log.Println("votenum count/2 me ", rf.votenum, count/2, me)
				// become leader
				if rf.votenum > count/2 {
					log.Println("become leader me", me)
					rf.hasLeader = true
					if !rf.isLeader {
						rf.isLeader = true
						rf.heartCh <- struct{}{}
					}
				}
			}(num)
		}
	}
}

heartBeatOne

func (rf *Raft) heartBeatOne(num int) {
	for {
		var req AppendEntriesReq
		var rsp AppendEntriesRsp

		rf.mu.Lock()
		rf.refreshCh <- struct{}{}
		if !rf.hasLeader || !rf.isLeader || rf.killed() {
			rf.mu.Unlock()
			break
		}

		req.Term = rf.currentTerm
		req.LeaderId = rf.me
		rf.mu.Unlock()

		ret := rf.sendAppendEntries(num, &req, &rsp)
		if !ret {
			log.Println("sendAppendEntries failed, server =", num, "me =", rf.me)
		}

		// expired
		rf.mu.Lock()
		if rsp.Term > rf.currentTerm {
			rf.isLeader = false
			rf.mu.Unlock()
			break
		}
		rf.mu.Unlock()

		time.Sleep(150 * time.Millisecond)
	}
}

RequestVote


//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
	// Your code here (2A, 2B).
	// 1. Reply false if term < currentTerm (§5.1)
	// 2. If votedFor is null or candidateId, and candidate’s log is at
	//    least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
	if rf.killed() {
		reply.VoteGranted = false
		return
	}

	rf.mu.Lock()
	if args.Term < rf.currentTerm {
		reply.VoteGranted = false
	} else if args.Term > rf.currentTerm {
		reply.VoteGranted = true
	} else { // args.Term == rf.currentTerm
		// vote for at most one candidate per term
		if rf.votedFor == 0 || rf.votedFor == args.CandidateId {
			loglen := len(rf.log)
			if loglen == 0 {
				reply.VoteGranted = true
			} else if args.LastLogTerm > rf.log[loglen-1].Term ||
				(args.LastLogTerm == rf.log[loglen-1].Term && args.LastLogIndex >= rf.log[loglen-1].Index) {
				reply.VoteGranted = true
			}
		}
	}

	if reply.VoteGranted {
		log.Println("me vote him ", rf.me, args.CandidateId)
		rf.votedFor = args.CandidateId
		rf.isLeader = false
	}

	reply.Term = rf.currentTerm
	rf.mu.Unlock()
}

AppendEntries

// Receiver implementation:
// 1. Reply false if term < currentTerm (§5.1)
// 2. Reply false if log doesn’t contain an entry at prevLogIndex
//    whose term matches prevLogTerm (§5.3)
// 3. If an existing entry conflicts with a new one (same index
//    but different terms), delete the existing entry and all that
//    follow it (§5.3)
// 4. Append any new entries not already in the log
// 5. If leaderCommit > commitIndex, set commitIndex = min(leaderCommit, index of last new entry)
func (rf *Raft) AppendEntries(req *AppendEntriesReq, rsp *AppendEntriesRsp) {

	if rf.killed() {
		rsp.Success = false
		return
	}

	rf.mu.Lock()
	defer func() {
		rsp.Term = rf.currentTerm
		rf.mu.Unlock()
	}()

	// expired
	if req.Term < rf.currentTerm {
		rsp.Success = false
		return
	}

	// heartbeat
	if len(req.Entries) == 0 {
		rf.hasLeader = true
		rf.currentTerm = req.Term
		rf.isLeader = false
		rf.refreshCh <- struct{}{}
		rsp.Success = true
		return
	}

	if len(rf.log) > req.PrevLogIndex && rf.log[req.PrevLogIndex].Term != req.PrevLogTerm {
		rsp.Success = false
		return
	}

	rsp.Success = true
	for _, value := range req.Entries {
		rf.log[value.Index] = value
	}

	// The leader's commitIndex may point past the last entry sent in this
	// RPC, and a follower may only mark as committed entries it actually
	// stores, hence min(leaderCommit, index of last new entry).
	if req.LeaderCommit > rf.commitIndex {
		if req.LeaderCommit < req.Entries[len(req.Entries)-1].Index {
			rf.commitIndex = req.LeaderCommit
		} else {
			rf.commitIndex = req.Entries[len(req.Entries)-1].Index
		}
	}
}

test

#!/bin/bash

for ((i = 1; i <= 500; i++)); do
	echo "---------------------$i" >>2a.log
	go test -run 2A -race >>2a.log 2>&1
	echo ""
done
grep PASS 2a.log | wc -l
500

ok pathname 7.480s
ok pathname 7.453s
ok pathname 7.463s
ok pathname 7.581s
ok pathname 7.587s
ok pathname 7.534s
ok pathname 7.483s
ok pathname 7.667s
ok pathname 7.499s
ok pathname 7.520s
ok pathname 7.539s
ok pathname 7.475s
ok pathname 7.519s
ok pathname 7.657s
ok pathname 7.617s
ok pathname 7.728s
ok pathname 7.539s
ok pathname 8.254s
ok pathname 7.591s
ok pathname 7.618s
ok pathname 7.703s
ok pathname 8.075s
ok pathname 7.509s
ok pathname 7.644s
ok pathname 7.381s
ok pathname 7.607s
ok pathname 7.616s
ok pathname 7.646s
ok pathname 7.571s
ok pathname 7.443s
ok pathname 7.533s
ok pathname 7.552s
ok pathname 7.562s
ok pathname 8.243s
ok pathname 7.559s
ok pathname 8.128s
ok pathname 8.000s
ok pathname 7.654s
ok pathname 7.579s