以下は、2017年2月時点の https://github.com/coreos/etcd/raft/doc.go の、kanda.motohiro@gmail.com による抄訳です。Apache License, Version 2.0 の元で配布します。
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
raft パッケージは、raftpb パッケージで定義した Protocol Buffer 形式で、メッセージを送受信します。
Raft は、ノードのクラスタが複製された状態マシンを維持するために使われるプロトコルです。
状態マシンは複製されたログを使うことで、同期された状態を維持します。Raft について詳しくは、"In Search of an Understandable Consensus Algorithm"
(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout を参照下さい。
訳注。この raft 論文の抄訳あります。
単純な例となるアプリケーション _raftexample_ もあって、このパッケージを実際にどうやって使うかを明らかにする助けとなります。
https://github.com/coreos/etcd/tree/master/contrib/raftexample
使い方
raft における主となるオプジェクトは Node です。あなたは Node をゼロから、raft.StartNode を使って始めるか、何らかの初期状態から、raft.RestartNode を使って始めます。
Node をゼロから始めるには:
storage := raft.NewMemoryStorage()
c := &Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
ノードを以前の状態から再開始するには:
storage := raft.NewMemoryStorage()
// recover the in-memory storage from persistent
// snapshot, state and entries.
storage.ApplySnapshot(snapshot)
storage.SetHardState(state)
storage.Append(entries)
c := &Config{
ID: 0x01,
ElectionTick: 10,
HeartbeatTick: 1,
Storage: storage,
MaxSizePerMsg: 4096,
MaxInflightMsgs: 256,
}
// restart raft without peer information.
// peer information is already included in the storage.
n := raft.RestartNode(c)
さて、あなたは Node を持つことになったので、いくつかの義務があります:
最初に、あなたは Node.Ready() チャネルを読んで、それが含む更新を処理しなくてはいけません。これらのステップは、ステップ2の注意を除いては、並行して実行してもかまいません。
1.HardState, Entries, and Snapshot が空でない時は、それらを永続的ストレージに書きます。インデックスが i の Entry を書くときには、Index >= i を持つ、それ以前に永続化された全てのエントリを捨てなくてはいけないことに注意下さい、
2.To フィールドに指定されているノードに、全ての Message を送ります。最新の HardState がディスクに永続化され、そして以前の Ready バッチによって書かれた全ての Entry が書かれるまで、メッセージは何も送られないことが重要です(メッセージは、同じバッチからのエントリが永続化されている間に送ることはできます)。I/O 遅延を減らすために、リーダーがそのフォロワーと並行してディスクに書くようにするという最適化を使うことができます(Raft 論文の 10.2.1 節で説明してあるように)。メッセージが MsgSnap 型であるなら、それを送った後に、Node.ReportSnapshot() を呼びます(このメッセージは大きいことがあります)。
注意:メッセージのマーシャリングはスレッドセーフではありません。マーシャリングをしている間は、いかなる新しいエントリも永続化しないことを保証するのは重要です。
これを達成する一番簡単な方法は、メッセージをあなたのメインの raft ループ内で直接シリアライズすることです。
3.状態マシンにスナップショット(もしあれば)と CommittedEntries を適用します。
もし、コミットされたエントリが EntryConfChange 型であるなら、Node.ApplyConfChange() を呼んで、それをそのノードに適用します。構成変更は、この時点で、ApplyConfChange を呼ぶ前に NodeID フィールドをゼロにすることでキャンセルできます。
(ただし、いずれにしても、ApplyConfChange は呼ばなくてはいけません。そして、キャンセルするという判断は、状態マシンだけに基づくものであり、ノードの健康状態を観測したなどの外部的な情報によるものではいけません。)
4.Node.Advance() を呼んで、次の更新のバッチを受け入れる準備ができたことを知らせます。
これは、ステップ1の後、いつ行なってもよいです。ただし、全ての更新は、それらが、Ready によって返された順序で処理しなくてはいけません。
2つめに、全ての永続化されたログエントリは、Storage インタフェースの実装を通して、raft パッケージ側から使用可能とならなくてはいけません。提供されている、MemoryStorage 型を、このために使うことができます(あなたは、再開始の時に、その状態を再度充填して下さい)。あるいは、ご自分のディスクベースの実装を提供してもよいです。
3つめに、他のノードからメッセージを受けたら、Node.Step にそれを渡して下さい:
func recvRaftRPC(ctx context.Context, m raftpb.Message) {
n.Step(ctx, m)
}
最後に、あなたは、Node.Tick() を定期的な間隔で呼ぶ必要があります(たぶん、time.Ticker を使って)Raft は二つの重要なタイムアウトを持ちます。ハートビートと選出のタイムアウトです。しかし、raft パッケージの内部では、時間は抽象的な "tick" で表現されます。
状態マシンを処理するループは全体で、こんなふうになるでしょう:
for {
select {
case <-s.Ticker:
n.Tick()
case rd := <-s.Node.Ready():
saveToStorage(rd.State, rd.Entries, rd.Snapshot)
send(rd.Messages)
if !raft.IsEmptySnap(rd.Snapshot) {
processSnapshot(rd.Snapshot)
}
for _, entry := range rd.CommittedEntries {
process(entry)
if entry.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
cc.Unmarshal(entry.Data)
s.Node.ApplyConfChange(cc)
}
}
s.Node.Advance()
case <-s.done:
return
}
}
あなたのノードから状態マシンに変更を提案するには、あなたのアプリケーションのデータを取って、バイトスライスにそれをシリアライズして、以下を呼びます:
n.Propose(ctx, data)
その提案がコミットされたら、データは、raftpb.EntryNormal 型を持つコミット済みエントリ内に現れるでしょう。提案したコマンドがコミットされる保証はありません。タイムアウトの後、また再提案しなくてはいけないかもしれません。
省略
メッセージ型
raft パッケージは、メッセージを Protocol Buffer 形式(raftpb パッケージで定義されます)で送受信します。それぞれの状態(フォロワー、候補者、リーダー)は、指定された raftpb.Message を持って進行する時に、自分自身の 'step' メソッド ('stepFollower', 'stepCandidate', 'stepLeader') を実装しています。それぞれのステップは、その raftpb.MessageType で決まります。なお、全てのステップは、一つの共通メソッドである 'Step' によってチェックされ、それが、ノードと受信されたメッセージのタームを安全のためにチェックして、古いログエントリを防ぐことに注意下さい。
'MsgHup' は、選出に使われます。
省略
*/