Implement Raft leader election and heartbeats (AppendEntries RPCs with no log entries). The goal for Part 2A is for a single leader to be elected, for the leader to remain the leader if there are no failures, and for a new leader to take over if the old leader fails or if packets to/from the old leader are lost. Run go test -run 2A -race to test your 2A code.
You can’t easily run your Raft implementation directly; instead you should run it by way of the tester, i.e. go test -run 2A -race.
Follow the paper’s Figure 2. At this point you care about sending and receiving RequestVote RPCs, the Rules for Servers that relate to elections, and the State related to leader election.
Add the Figure 2 state for leader election to the Raft struct in raft.go. You’ll also need to define a struct to hold information about each log entry.
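As a starting point, the Figure 2 state for leader election might be sketched like this. Everything beyond the skeleton's own fields (mu, peers, persister, me, dead) is illustrative; the code later in this post adds its own bookkeeping fields (hasLeader, refreshCh, heartCh, isLeader) on top of it.

// LogEntry holds one entry of the replicated log. Fields are exported
// (capitalized) so labgob/labrpc can encode them.
type LogEntry struct {
    Index   int         // position of this entry in the log
    Term    int         // term in which the entry was created
    Command interface{} // client command carried by the entry
}

type Raft struct {
    mu        sync.Mutex          // protects shared access to this peer's state
    peers     []*labrpc.ClientEnd // RPC end points of all peers
    persister *Persister          // object to hold this peer's persisted state
    me        int                 // this peer's index into peers[]
    dead      int32               // set by Kill()

    // Persistent state on all servers (Figure 2).
    currentTerm int        // latest term this server has seen
    votedFor    int        // candidate that received this server's vote in currentTerm
    log         []LogEntry // log entries

    // Volatile state on all servers (Figure 2).
    commitIndex int // highest log entry known to be committed
    lastApplied int // highest log entry applied to the state machine
}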
Fill in the RequestVoteArgs and RequestVoteReply structs. Modify Make() to create a background goroutine that will kick off leader election periodically by sending out RequestVote RPCs when it hasn’t heard from another peer for a while. This way a peer will learn who is the leader, if there is already a leader, or become the leader itself. Implement the RequestVote() RPC handler so that servers will vote for one another.
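Based on Figure 2, the argument and reply structs might look like the following; the field names match the ones used by the RequestVote handler shown later in this post.

// RequestVoteArgs follows the arguments listed in Figure 2.
type RequestVoteArgs struct {
    Term         int // candidate's term
    CandidateId  int // candidate requesting the vote
    LastLogIndex int // index of the candidate's last log entry
    LastLogTerm  int // term of the candidate's last log entry
}

// RequestVoteReply follows the results listed in Figure 2.
type RequestVoteReply struct {
    Term        int  // receiver's currentTerm, for the candidate to update itself
    VoteGranted bool // true means the candidate granted its vote
}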
To implement heartbeats, define an AppendEntries RPC struct (though you may not need all the arguments yet), and have the leader send them out periodically. Write an AppendEntries RPC handler method that resets the election timeout so that other servers don’t step forward as leaders when one has already been elected.
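For 2A the request can simply carry an empty Entries slice. A sketch of the structs, using the AppendEntriesReq/AppendEntriesRsp names that the handler later in this post uses (LogEntry is the illustrative type from above):

// AppendEntriesReq carries the Figure 2 arguments; a heartbeat is just a
// request with no entries.
type AppendEntriesReq struct {
    Term         int        // leader's term
    LeaderId     int        // so followers can redirect clients
    PrevLogIndex int        // index of the log entry immediately preceding the new ones
    PrevLogTerm  int        // term of the entry at PrevLogIndex
    Entries      []LogEntry // entries to store (empty for a heartbeat)
    LeaderCommit int        // leader's commitIndex
}

type AppendEntriesRsp struct {
    Term    int  // receiver's currentTerm, for the leader to update itself
    Success bool // true if the follower accepted the request
}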
Make sure the election timeouts in different peers don’t always fire at the same time, or else all peers will vote only for themselves and no one will become the leader.
The tester requires that the leader send heartbeat RPCs no more than ten times per second.
The tester requires your Raft to elect a new leader within five seconds of the failure of the old leader (if a majority of peers can still communicate). Remember, however, that leader election may require multiple rounds in case of a split vote (which can happen if packets are lost or if candidates unluckily choose the same random backoff times). You must pick election timeouts (and thus heartbeat intervals) that are short enough that it’s very likely that an election will complete in less than five seconds even if it requires multiple rounds.
The paper’s Section 5.2 mentions election timeouts in the range of 150 to 300 milliseconds. Such a range only makes sense if the leader sends heartbeats considerably more often than once per 150 milliseconds. Because the tester limits you to 10 heartbeats per second, you will have to use an election timeout larger than the paper’s 150 to 300 milliseconds, but not too large, because then you may fail to elect a leader within five seconds.
You may find Go’s rand useful.
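For example (the 300–600 ms range here is just one plausible choice given the tester's constraints, not something the lab mandates):

// Seed math/rand once, e.g. near the top of Make(), so runs differ.
rand.Seed(time.Now().UnixNano())

// Pick a fresh randomized timeout every time the election timer is reset.
electionTimeout := time.Duration(300+rand.Intn(300)) * time.Millisecond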
You’ll need to write code that takes actions periodically or after delays in time. The easiest way to do this is to create a goroutine with a loop that calls time.Sleep() (see the ticker() goroutine that Make() creates for this purpose). Don’t use Go’s time.Timer or time.Ticker, which are difficult to use correctly.
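A minimal sketch of that pattern is below; lastHeartbeat, isLeader, and startElection are assumed names (the implementation later in this post drives the same logic with channels and time.After instead).

// ticker runs in its own goroutine, started by Make(), and kicks off an
// election when this peer hasn't heard from a leader or candidate recently.
func (rf *Raft) ticker() {
    for !rf.killed() {
        // Sleep for a randomized election timeout, then check whether a
        // heartbeat (or granted vote) arrived while we were asleep.
        timeout := time.Duration(300+rand.Intn(300)) * time.Millisecond
        time.Sleep(timeout)

        rf.mu.Lock()
        expired := !rf.isLeader && time.Since(rf.lastHeartbeat) >= timeout
        rf.mu.Unlock()

        if expired {
            go rf.startElection(rf.me)
        }
    }
}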
The Guidance page has some tips on how to develop and debug your code.
If your code has trouble passing the tests, read the paper’s Figure 2 again; the full logic for leader election is spread over multiple parts of the figure.
Don’t forget to implement GetState().
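The skeleton already declares it; a minimal version just reports this server's view of the current term and whether it believes it is the leader (isLeader is the same field the election code in this post maintains):

func (rf *Raft) GetState() (int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    return rf.currentTerm, rf.isLeader
}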
The tester calls your Raft’s rf.Kill() when it is permanently shutting down an instance. You can check whether Kill() has been called using rf.killed(). You may want to do this in all loops, to avoid having dead Raft instances print confusing messages.
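For example, a leader's heartbeat loop might check rf.killed() on every iteration. This is only a sketch: heartbeatLoop is a hypothetical name, heartBeatOne is the per-peer sender used later in this post, and the post's own code triggers heartbeats through a channel instead of a loop like this.

// Illustrative leader loop: broadcast heartbeats until this instance is killed.
func (rf *Raft) heartbeatLoop() {
    for !rf.killed() {
        for peer := range rf.peers {
            if peer != rf.me {
                go rf.heartBeatOne(peer)
            }
        }
        time.Sleep(100 * time.Millisecond) // stays within the tester's 10 heartbeats/second limit
    }
}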
Go RPC sends only struct fields whose names start with capital letters. Sub-structures must also have capitalized field names (e.g. fields of log records in an array). The labgob package will warn you about this; don’t ignore the warnings.
//
// the service or tester wants to create a Raft server. the ports
// of all the Raft servers (including this one) are in peers[]. this
// server's port is peers[me]. all the servers' peers[] arrays
// have the same order. persister is a place for this server to
// save its persistent state, and also initially holds the most
// recent saved state, if any. applyCh is a channel on which the
// tester or service expects Raft to send ApplyMsg messages.
// Make() must return quickly, so it should start goroutines
// for any long-running work.
//
// todo: add context
func Make(peers []*labrpc.ClientEnd, me int, persister *Persister, applyCh chan ApplyMsg) *Raft {
    log.Println("begin make.., me ", me)
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me
    rf.hasLeader = false
    rf.refreshCh = make(chan struct{})
    rf.heartCh = make(chan struct{})

    // Your initialization code here (2A, 2B, 2C).
    go func() {
        for {
            // randomized election timeout: 300-400 ms
            rand.Seed(time.Now().UTC().UnixNano())
            randtime := rand.Intn(100) + 300
            timeout := time.After(time.Duration(randtime) * time.Millisecond)

            select {
            // update log or leader
            // case ch := <-applyCh:
            //     if ch.CommandValid == true {
            //         rf.lastApplied = ch.CommandIndex
            //     }

            // refresh the election timeout
            case <-rf.refreshCh:

            // begin an election
            case <-timeout:
                if rf.killed() {
                    return
                }
                go rf.startElection(me)

            // begin a heartbeat round
            case <-rf.heartCh:
                if rf.killed() {
                    return
                }
                // The tester requires that the leader send heartbeat RPCs
                // no more than ten times per second.
                for num := range peers {
                    if num != me {
                        go rf.heartBeatOne(num)
                    }
                }
            }
        }
    }()

    // initialize from state persisted before a crash
    rf.readPersist(persister.ReadRaftState())
//
// example RequestVote RPC handler.
//
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    // Your code here (2A, 2B).
    // 1. Reply false if term < currentTerm (§5.1)
    // 2. If votedFor is null or candidateId, and candidate's log is at
    //    least as up-to-date as receiver's log, grant vote (§5.2, §5.4)
    if rf.killed() {
        reply.VoteGranted = false
        return
    }

    rf.mu.Lock()
    if args.Term < rf.currentTerm {
        reply.VoteGranted = false
    } else if args.Term > rf.currentTerm {
        reply.VoteGranted = true
    } else { // args.Term == rf.currentTerm
        // a server votes for at most one candidate per term
        if rf.votedFor == 0 || rf.votedFor == args.CandidateId {
            loglen := len(rf.log)
            if loglen == 0 {
                reply.VoteGranted = true
            } else if args.LastLogTerm > rf.log[loglen-1].Term ||
                (args.LastLogTerm == rf.log[loglen-1].Term && args.LastLogIndex >= rf.log[loglen-1].Index) {
                reply.VoteGranted = true
            }
        }
    }

    if reply.VoteGranted {
        log.Println("me vote him ", rf.me, args.CandidateId)
        rf.votedFor = args.CandidateId
        rf.isLeader = false
    }
// Receiver implementation:
// 1. Reply false if term < currentTerm (§5.1)
// 2. Reply false if log doesn't contain an entry at prevLogIndex
//    whose term matches prevLogTerm (§5.3)
// 3. If an existing entry conflicts with a new one (same index
//    but different terms), delete the existing entry and all that
//    follow it (§5.3)
// 4. Append any new entries not already in the log
// 5. If leaderCommit > commitIndex, set commitIndex =
//    min(leaderCommit, index of last new entry)
func (rf *Raft) AppendEntries(req *AppendEntriesReq, rsp *AppendEntriesRsp) {
ok pathname 7.480s
ok pathname 7.453s
ok pathname 7.463s
ok pathname 7.581s
ok pathname 7.587s
ok pathname 7.534s
ok pathname 7.483s
ok pathname 7.667s
ok pathname 7.499s
ok pathname 7.520s
ok pathname 7.539s
ok pathname 7.475s
ok pathname 7.519s
ok pathname 7.657s
ok pathname 7.617s
ok pathname 7.728s
ok pathname 7.539s
ok pathname 8.254s
ok pathname 7.591s
ok pathname 7.618s
ok pathname 7.703s
ok pathname 8.075s
ok pathname 7.509s
ok pathname 7.644s
ok pathname 7.381s
ok pathname 7.607s
ok pathname 7.616s
ok pathname 7.646s
ok pathname 7.571s
ok pathname 7.443s
ok pathname 7.533s
ok pathname 7.552s
ok pathname 7.562s
ok pathname 8.243s
ok pathname 7.559s
ok pathname 8.128s
ok pathname 8.000s
ok pathname 7.654s
ok pathname 7.579s