分布式Raft算法原理及实现

Raft 状态

一个 Raft 集群包含若干个服务器节点;通常是 5 个,这允许整个系统容忍 2 个节点的失效,每个节点处于以下三种状态之一:

  • follower(跟随者) :所有结点都以 follower 的状态开始。如果没收到 leader消息则会变成 candidate状态。
  • candidate(候选人):会向其他结点“拉选票”,如果得到大部分的票则成为leader。这个过程就叫做Leader选举(Leader Election)。
  • leader(领导者):所有对系统的修改都会先经过leader

Raft 一致性算法

Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。

基于leader的方法,Raft算法可以分解成三个子问题:

Leader election (领导选举):原来的leader挂掉后,必须选出一个新的leader

Log replication (日志复制):leader从客户端接收日志,并复制到整个集群中

Safety (安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项

Leader election (领导选举)

Raft 使用一种心跳机制来触发领导人选举。当服务器程序启动时,他们都是 follower(跟随者) 身份。如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者。要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态。

然后他会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。候选人的状态维持直到发生以下任何一个条件发生的时候,

  • 他自己赢得了这次的选举

    • 如果这个节点赢得了半数以上的vote就会成为leader,每个节点会按照first-come-first-served的原则进行投票,并且一个term中只能投给一个节点, 这样就保证了一个term最多有一个节点赢得半数以上的vote。
    • 当一个节点赢得选举, 他会成为leader, 并且给所有节点发送这个信息, 这样所有节点都会回退成follower。
  • 其他的服务器成为领导者

    如果在等待选举期间,candidate接收到其他server要成为leader的RPC,分两种情况处理:

    • 如果leader的term大于或等于自身的term,那么改candidate 会转成follower 状态
    • 如果leader的term小于自身的term,那么会拒绝该 leader,并继续保持candidate 状态
  • 一段时间之后没有任何一个获胜的人

    • 有可能,很多follower同时变成candidate,导致没有candidate能获得大多数的选举,从而导致无法选出主。当这个情况发生时,每个candidate会超时,然后重新发增加term,发起新一轮选举RPC。需要注意的是,如果没有特别处理,可能出导致无限地重复选主的情况。

    • Raft采用随机定时器的方法来避免上述情况,每个candidate选择一个时间间隔内的随机值,例如150-300ms,采用这种机制,一般只有一个server会进入candidate状态,然后获得大多数server的选举,最后成为主。每个candidate在收到leader的心跳信息后会重启定时器,从而避免在leader正常工作时,会发生选举的情况。

Log replication (日志复制)

当选出 leader 后,它会开始接受客户端请求,每个请求会带有一个指令,可以被回放到状态机中。leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行的发送给其他的server,当改entry被多数派server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端。

follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry。

raft的log replication保证以下性质(Log Matching Property):

  • 如果两个log entry有相同的index和term,那么它们存储相同的指令

  • 如果两个log entry在两份不同的日志中,并且有相同的index和term,那么它们之前的log entry是完全相同的

其中特性一通过以下保证:

  • leader在一个特定的term和index下,只会创建一个log entry
  • log entry不会改变它们在日志中的位置

特性二通过以下保证:

  • AppendEntries会做log entry的一致性检查,当发送一个AppendEntriesRPC时,leader会带上需要复制的log entry前一个log entry的(index, iterm)

如果follower没有发现与它一样的log entry,那么它会拒绝接受新的log entry 这样就能保证特性二得以满足。

安全性

选举限制

在一些一致性算法中,即使一台server没有包含所有之前已提交的log entry,也能被选为主,这些算法需要把leader上缺失的日志从其他的server拷贝到leader上,这种方法会导致额外的复杂度。相对而言,raft使用一种更简单的方法,即它保证所有已提交的log entry都会在当前选举的leader上,因此,在raft算法中,日志只会从leader流向follower。

为了实现上述目标,raft在选举中会保证,一个candidate只有得到大多数的server的选票之后,才能被选为主。得到大多数的选票表明,选举它的server中至少有一个server是拥有所有已经提交的log entry的,而leader的日志至少和follower的一样新,这样就保证了leader肯定有所有已提交的log entry。

提交之前任期内的日志条目

领导人知道一条当前任期内的日志记录是可以被提交的,只要它被存储到了大多数的服务器上。如果一个领导人在提交日志条目之前崩溃了,未来后续的领导人会继续尝试复制这条日志记录。然而,一个领导人不能断定一个之前任期里的日志条目被保存到大多数服务器上的时候就一定已经提交了。下图展示了一种情况,一条已经被存储到大多数节点上的老日志条目,也依然有可能会被未来的领导人覆盖掉。

如上图的例子,图(c)就发生了一个log entry虽然已经复制到大多数的服务器,但是仍然有可能被覆盖掉的可能,如图(d),整个发生的时序如下:

  • 图a中,S1被选为主,然后复制到log index为2的log entry到S2上

  • 图b中,S1挂掉,然后S5获得了S3,S4和自身的选举,成为leader,然后,其从客户端收到了一个新的log entry(3)

  • 图c中,S5挂掉,S1重新正常工作,又被选为主,继续复制log entry(2),在log entry(2)被提交前,S1又挂掉

  • 图d中,S5又重新被选为领导者,然后,会把term 3的log entry覆盖到其他log index为2的log entry

为了上图描述的情况,Raft 永远不会通过计算副本数目的方式去提交一个之前任期内的日志条目。只有领导人当前任期里的日志条目通过计算副本数目可以被提交;一旦当前任期的日志条目以这种方式被提交,那么由于日志匹配特性,之前的日志条目也都会被间接的提交。例如,图e中,如果S1在挂掉前把log entry(4)复制到了大多数的server后,就能保证之前的log entry(2)被提交了,之后S5也就不可能被选为领导者了。

安全性论证

以反证法来证明,假设任期 T 的领导人(领导人 T)在任期内提交了一条日志条目,但是这条日志条目没有被存储到未来某个任期的领导人的日志中。设大于 T 的最小任期 U 的领导人 U 没有这条日志条目。

如果 S1 (任期 T 的领导者)提交了一条新的日志在它的任期里,然后 S5 在之后的任期 U 里被选举为领导人,然后至少会有一个机器,如 S3,既拥有来自 S1 的日志,也给 S5 投票了。

  1. 在领导人 U 选举的时候一定没有那条被提交的日志条目(领导人从不会删除或者覆盖任何条目)。

  2. 领导人 T 复制这条日志条目给集群中的大多数节点,同时,领导人U 从集群中的大多数节点赢得了选票。因此,至少有一个节点(投票者、选民)同时接受了来自领导人T 的日志条目,并且给领导人U 投票了,这个投票者是产生这个矛盾的关键。

  3. 这个投票者必须在给领导人 U 投票之前先接受了从领导人 T 发来的已经被提交的日志条目;否则他就会拒绝来自领导人 T 的附加日志请求(因为此时他的任期号会比 T 大)。

  4. 投票者在给领导人 U 投票时依然保有这条日志条目,因为任何中间的领导人都包含该日志条目(根据上述的假设),领导人从不会删除条目,并且跟随者只有和领导人冲突的时候才会删除条目。

  5. 投票者把自己选票投给领导人 U 时,领导人 U 的日志必须和投票者自己一样新。这就导致了两者矛盾之一。

    • 首先,如果投票者和领导人 U 的最后一条日志的任期号相同,那么领导人 U 的日志至少和投票者一样长,所以领导人 U 的日志一定包含所有投票者的日志。这是另一处矛盾,因为投票者包含了那条已经被提交的日志条目,但是在上述的假设里,领导人 U 是不包含的。

    • 除此之外,领导人 U 的最后一条日志的任期号就必须比投票人大了。此外,他也比 T 大,因为投票人的最后一条日志的任期号至少和 T 一样大(他包含了来自任期 T 的已提交的日志)。创建了领导人 U 最后一条日志的之前领导人一定已经包含了那条被提交的日志(根据上述假设,领导人 U 是第一个不包含该日志条目的领导人)。所以,根据日志匹配特性,领导人 U 一定也包含那条被提交当然日志,这里产生矛盾。

  6. 因此,假设不成立,所有比 T 大的领导人一定包含了所有来自 T 的已经被提交的日志。日志匹配原则保证了未来的领导人也同时会包含被间接提交的条目

跟随者和候选人崩溃

跟随者或者候选人崩溃,会按如下处理:

  • 领导者会不断给它发送选举和追加日志的RPC,直到成功
  • 跟随者会忽略它已经处理过的追加日志的RPC

时间和可用性

领导人选举是 Raft 中对时间要求最为关键的方面。Raft 可以选举并维持一个稳定的领导人,只要系统满足下面的时间要求:

1
广播时间(broadcastTime) << 选举超时时间(electionTimeout) << 平均故障间隔时间(MTBF)
  • 广播时间指的是从一个服务器并行的发送 RPCs 给集群中的其他服务器并接收响应的平均时间;

  • 选举超时时间就是选举的超时时间限制

  • 平均故障间隔时间就是对于一台服务器而言,两次故障之间的平均时间。

选举超时时间要大于广播时间的原因是,防止跟随者因为还没收到领导者的心跳,而重新选主。

选举超时时间要小于MTBF的原因是,防止选举时,能正常工作的server没有达到大多数。

对于广播时间,一般在[0.5ms,20ms]之间,而平均故障间隔时间一般非常大,至少是按照月为单位。因此,一般选举超时时间一般选择范围为[10ms,500ms]。因此,当领导者挂掉后,能在较短时间内重新选主。

动画演示 Raft

http://thesecretlivesofdata.com/raft/

代码实现

实现效果:
4个子节点投票,选出leader,投票停止,leader状态变为leader,子节点状态重置。leader向子节点发送心跳数据,表名自己活着。leader使用浏览器自定义给子节点发送数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
package main

import (
"fmt"
"flag"
"net"
"strconv"
"time"
"strings"
"net/http"
"math/rand"
)

const (
LEADER = iota
CANDIDATE
FOLLOWER
)

//声明地址信息
type Addr struct {
Host string //ip
Port int
Addr string
}

type RaftServer struct {

Votes int //选票
Role int // 角色 follower candidate leader
Nodes []Addr
isElecting bool //判断当前节点是否处于选举中
Timeout int //选举间隔时间(也叫超时时间)
ElecChan chan bool //通道信号
HeartBeatChan chan bool //leader 的心跳信号
Port int //端口号

//网页接收到的参数 由主节点向子节点传参
CusMsg chan string


}

func (rs *RaftServer)changeRole(role int) {
switch role {
case LEADER:
fmt.Println("leader")
case CANDIDATE:
fmt.Println("candidate")
case FOLLOWER:
fmt.Println("follower")

}
rs.Role = role
}

func (rs *RaftServer)resetTimeout() {
//Raft系统一般为1500-3000毫秒选一次
rs.Timeout = 2000
}

//运行服务器
func (rs *RaftServer)Run() {
//rs监听 是否有人 给我投票
listen , _ := net.Listen("tcp", ":"+strconv.Itoa(rs.Port))

defer listen.Close()

go rs.elect()

//控制投票时间
go rs.electTimeDuration()

//go rs.printRole()

// 主节点发送心跳
go rs.sendHeartBeat()
//
go rs.sendDataToOtherNodes()

//监听http协议
go rs.setHttpServer()

for {
conn,_ := listen.Accept()
go func(){

for {
by := make([]byte, 1024)
n,_:= conn.Read(by)
fmt.Println("收到消息", string(by[:n]))

value := string(by[:n])
v,_ := strconv.Atoi(value)
if v == rs.Port {
rs.Votes++
fmt.Println("当前票数:", rs.Votes)
// leader 选举成功
if VoteSuccess(rs.Votes, 5) == true {
fmt.Printf("我是 %v, 我被选举成leader", rs.Port)

//通知其他节点。停止选举
//重置其他节点状态和票数
rs.VoteToOther("stopVote")
rs.isElecting = false
//改变当前节点状态

rs.changeRole(LEADER)
break
}
}



//收到leader发来的消息
if strings.HasPrefix(string(by[:n]), "stopVote") {
//停止给别人投票
rs.isElecting = false
//回退自己的状态
rs.changeRole(FOLLOWER)
break
}

}

}()
}

}

func VoteSuccess(vote int, target int) bool {
if vote >= target {
return true
}
return false
}


//发送数据)
func (rs *RaftServer)VoteToOther(data string) {
for _,k := range rs.Nodes {
if k.Port != rs.Port {
if data == "1234" {
fmt.Println("-------------", k.Port)
}

label :conn,err := net.Dial("tcp", ":"+strconv.Itoa(k.Port))
for {
if err != nil {
time.Sleep(1*time.Second)
goto label
}
break
}
conn.Write([]byte(data))

}
}
}

//给别人投票
func (rs *RaftServer)elect() {

for {
//通过通道确定现在可以给别人投票

<- rs.ElecChan

//给其他节点投票,不能投给自己
vote := getVoteNum()

rs.VoteToOther(strconv.Itoa(vote))
// 设置选举状态
if rs.Role != LEADER {
rs.changeRole(CANDIDATE)
} else {
//是leader的情况
return
}

}
}

func getVoteNum() int {

rand.Seed(time.Now().UnixNano())
return rand.Intn(4) + 5000
}


func (rs *RaftServer)electTimeDuration() {
//
fmt.Println("+++", rs.isElecting)
for {
if rs.isElecting {

rs.ElecChan <- true
time.Sleep(time.Duration(rs.Timeout) * time.Millisecond)

}


}
}

//打印当前对象的角色
func (rs *RaftServer)printRole() {
for {
time.Sleep(1 * time.Second)
fmt.Println(rs.Port, "状态为", rs.Role, rs.isElecting)
}
}


func main() {

//获取参数
//运行 go run main.go -p 5000 (p 后面就是要启动的端口)
port := flag.Int("p",1234,"port")
flag.Parse()
fmt.Println(*port)

rs := RaftServer{}
rs.isElecting = true
rs.Votes = 0
rs.Role = FOLLOWER
//控制是否开始投票
rs.ElecChan = make(chan bool)
rs.HeartBeatChan = make(chan bool)
rs.CusMsg = make(chan string)
rs.resetTimeout()
rs.Nodes = []Addr{
{"127.0.0.1",5000,"5000"},
{"127.0.0.1",5001,"5001"},
{"127.0.0.1",5002,"5002"},
{"127.0.0.1",5003,"5003"},
}
rs.Port = *port

rs.Run()

}
//主节点发送心跳信号给其他节点
func (rs *RaftServer)sendHeartBeat() {
// 每隔1s 发送一次心跳
for {
time.Sleep(1 * time.Second)
if rs.Role == LEADER {
//发送消息
rs.VoteToOther("heat beating")
}
}
}

//通过leader 给其他所有子节点发送数据
func (rs *RaftServer)sendDataToOtherNodes() {
for {
msg :=<-rs.CusMsg
if rs.Role == LEADER {
//发送消息
rs.VoteToOther(msg)

}
}
}

//开启http服务器
func (rs *RaftServer)setHttpServer() {

http.HandleFunc("/req", rs.request)
httpPort := rs.Port + 10
if err:=http.ListenAndServe(":"+strconv.Itoa(httpPort), nil); err == nil {
fmt.Println(err)
}

}
//leader向其他子节点发送数据
func (rs *RaftServer)request(writer http.ResponseWriter, request *http.Request){

request.ParseForm()
if len(request.Form["data"][0]) > 0 {
writer.Write([]byte("ok"))
fmt.Println(request.Form["data"][0])
rs.CusMsg <- request.Form["data"][0]
}

}

运行:
开启4个终端 分别执行

  • go run main.go -p 5000
  • go run main.go -p 5001
  • go run main.go -p 5002
  • go run main.go -p 5003

如果5001成为了leader,在浏览器输入http://127.0.0.1:5011/req?data=XXX
xxx就是leader向子节点发送的数据,浏览器的端口是终端的端口加上10,如果5000是leader,浏览器就是5010.

坚持原创技术分享,您的支持将鼓励我继续创作!