Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: Fix infinite election loop #1310

Merged
merged 2 commits into from
Aug 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 64 additions & 18 deletions manager/state/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,18 +87,18 @@ type Node struct {
StateDir string
Error error

raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
wasLeader bool
restored bool
isMember uint32
joinAddr string
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
restored bool
signalledLeadership uint32
isMember uint32
joinAddr string

// waitProp waits for all the proposals to be terminated before
// shutting down the node.
Expand Down Expand Up @@ -329,6 +329,8 @@ func (n *Node) Run(ctx context.Context) error {
close(n.doneCh)
}()

wasLeader := false

for {
select {
case <-n.ticker.C():
Expand Down Expand Up @@ -389,12 +391,23 @@ func (n *Node) Run(ctx context.Context) error {
// if that happens we will apply them as any
// follower would.
if rd.SoftState != nil {
if n.wasLeader && rd.SoftState.RaftState != raft.StateLeader {
n.wasLeader = false
if wasLeader && rd.SoftState.RaftState != raft.StateLeader {
wasLeader = false
n.wait.cancelAll()
n.leadershipBroadcast.Write(IsFollower)
} else if !n.wasLeader && rd.SoftState.RaftState == raft.StateLeader {
n.wasLeader = true
if atomic.LoadUint32(&n.signalledLeadership) == 1 {
atomic.StoreUint32(&n.signalledLeadership, 0)
n.leadershipBroadcast.Write(IsFollower)
}
} else if !wasLeader && rd.SoftState.RaftState == raft.StateLeader {
wasLeader = true
}
}

if wasLeader && atomic.LoadUint32(&n.signalledLeadership) != 1 {
// If all the entries in the log have become
// committed, broadcast our leadership status.
if n.caughtUp() {
atomic.StoreUint32(&n.signalledLeadership, 1)
n.leadershipBroadcast.Write(IsLeader)
}
}
Expand Down Expand Up @@ -506,6 +519,19 @@ func (n *Node) Leader() uint64 {
return n.Node.Status().Lead
}

// ReadyForProposals reports whether the signalledLeadership flag is set,
// i.e. whether this node has already broadcast that it became the leader.
// When it returns true the node is ready to accept proposals.
func (n *Node) ReadyForProposals() bool {
	// The flag is accessed atomically; 1 means leadership has been signalled.
	signalled := atomic.LoadUint32(&n.signalledLeadership)
	return signalled == 1
}

// caughtUp reports whether the node's applied index has reached the last
// index held in the raft store.
func (n *Node) caughtUp() bool {
	// The error result is discarded on purpose: per the original author's
	// note, this call always returns a nil error.
	last, _ := n.raftStore.LastIndex()
	return last <= n.appliedIndex
}

// Join asks a member of the raft to propose
// a configuration change and add us as a member thus
// beginning the log replication process. This method
Expand Down Expand Up @@ -719,12 +745,24 @@ func (n *Node) RemoveMember(ctx context.Context, id uint64) error {
// raft state machine with the provided message on the
// receiving node
func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessageRequest) (*api.ProcessRaftMessageResponse, error) {
if msg == nil || msg.Message == nil {
return nil, grpc.Errorf(codes.InvalidArgument, "no message provided")
}

// Don't process the message if this comes from
// a node in the remove set
if n.cluster.IsIDRemoved(msg.Message.From) {
return nil, ErrMemberRemoved
}

if msg.Message.Type == raftpb.MsgProp {
// We don't accept forwarded proposals. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
return nil, grpc.Errorf(codes.InvalidArgument, "proposals not accepted")
}

// can't stop the raft node while an async RPC is in progress
n.stopMu.RLock()
defer n.stopMu.RUnlock()
Expand Down Expand Up @@ -982,6 +1020,14 @@ func (n *Node) send(messages []raftpb.Message) error {
continue
}

if m.Type == raftpb.MsgProp {
// We don't forward proposals to the leader. Our
// current architecture depends on only the leader
// making proposals, so in-flight proposals can be
// guaranteed not to conflict.
continue
}

n.asyncTasks.Add(1)
go n.sendToMember(members, m)
}
Expand Down Expand Up @@ -1093,7 +1139,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
ch := n.wait.register(r.ID, cb)

// Do this check after calling register to avoid a race.
if !n.IsLeader() {
if atomic.LoadUint32(&n.signalledLeadership) != 1 {
n.wait.cancel(r.ID)
return nil, ErrLostLeadership
}
Expand Down
9 changes: 9 additions & 0 deletions manager/state/raft/testutils/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func WaitForCluster(t *testing.T, clockSource *fakeclock.FakeClock, nodes map[ui
if cur.Lead != prev.Lead || cur.Term != prev.Term || cur.Applied != prev.Applied {
return errors.New("state does not match on all nodes")
}
if !n2.ReadyForProposals() {
return errors.New("leader not ready")
}
continue nodeLoop
}
}
Expand Down Expand Up @@ -254,8 +257,14 @@ func NewInitNode(t *testing.T, tc *cautils.TestCA, raftConfig *api.RaftConfig, o
err := n.Node.JoinAndStart()
require.NoError(t, err, "can't join cluster")

leadershipCh, cancel := n.SubscribeLeadership()
defer cancel()

go n.Run(ctx)

// Wait for the node to become the leader.
<-leadershipCh

if raftConfig != nil {
assert.NoError(t, n.MemoryStore().Update(func(tx store.Tx) error {
return store.CreateCluster(tx, &api.Cluster{
Expand Down