Skip to content

Commit

Permalink
Adds protocol versioning of RPCs and new transitional protocol versio…
Browse files Browse the repository at this point in the history
…n 1.
James Phillips committed Jul 25, 2016
1 parent 3d2949a commit e648f4e
Showing 5 changed files with 130 additions and 51 deletions.
21 changes: 9 additions & 12 deletions api.go
Original file line number Diff line number Diff line change
@@ -206,7 +206,7 @@ func BootstrapCluster(conf *Config, logs LogStore, stable StableStore,
Index: 1,
Term: 1,
}
if conf.ProtocolVersion < 1 {
if conf.ProtocolVersion < 2 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
@@ -258,7 +258,7 @@ func RecoverCluster(conf *Config, logs LogStore, trans Transport,
Index: fakeIndex,
Term: lastLog.Term,
}
if conf.ProtocolVersion < 1 {
if conf.ProtocolVersion < 2 {
entry.Type = LogRemovePeerDeprecated
entry.Data = encodePeers(configuration, trans)
} else {
@@ -318,13 +318,10 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
localAddr := ServerAddress(trans.LocalAddr())
localID := conf.LocalID

// TODO (slackpad) - When we deprecate protocol version 0, remove this
// TODO (slackpad) - When we deprecate protocol version 1, remove this
// along with the AddPeer() and RemovePeer() APIs.
if protocolVersion < 1 || localID == "" {
// During the transition to the new ID system, keep this as an
// INFO level message. Once the new scheme has been out for a
// while, change this to a deprecation WARN message.
logger.Printf("[INFO] raft: No server ID given, or ProtocolVersion < 1, using network address as server ID: %v",
if protocolVersion < 2 {
logger.Printf("[INFO] raft: Configured ProtocolVersion < 2, using network address as LocalID: %v",
localAddr)
localID = ServerID(localAddr)
}
@@ -559,7 +556,7 @@ func (r *Raft) GetConfiguration() (Configuration, uint64, error) {
// AddPeer (deprecated) is used to add a new peer into the cluster. This must be
// run on the leader or it will fail. Use AddVoter/AddNonvoter instead.
func (r *Raft) AddPeer(peer ServerAddress) Future {
if r.protocolVersion > 0 {
if r.protocolVersion > 1 {
return errorFuture{ErrUnsupportedProtocol}
}

@@ -576,7 +573,7 @@ func (r *Raft) AddPeer(peer ServerAddress) Future {
// to occur. This must be run on the leader or it will fail.
// Use RemoveServer instead.
func (r *Raft) RemovePeer(peer ServerAddress) Future {
if r.protocolVersion > 0 {
if r.protocolVersion > 1 {
return errorFuture{ErrUnsupportedProtocol}
}

@@ -614,7 +611,7 @@ func (r *Raft) AddVoter(id ServerID, address ServerAddress, prevIndex uint64, ti
// a staging server or voter, this does nothing. This must be run on the leader
// or it will fail. For prevIndex and timeout, see AddVoter.
func (r *Raft) AddNonvoter(id ServerID, address ServerAddress, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}

@@ -647,7 +644,7 @@ func (r *Raft) RemoveServer(id ServerID, prevIndex uint64, timeout time.Duration
// does nothing. This must be run on the leader or it will fail. For prevIndex
// and timeout, see AddVoter.
func (r *Raft) DemoteVoter(id ServerID, prevIndex uint64, timeout time.Duration) IndexFuture {
if r.protocolVersion < 1 {
if r.protocolVersion < 2 {
return errorFuture{ErrUnsupportedProtocol}
}

52 changes: 45 additions & 7 deletions config.go
Original file line number Diff line number Diff line change
@@ -11,17 +11,55 @@ import (
// well as Raft-specific log entries) that this server can _understand_. Use
// the ProtocolVersion member of the Config object to control the version of
// the protocol to use when _speaking_ to other servers. This is not currently
// written into snapshots so they are unversioned.
// written into snapshots so they are unversioned. Note that depending on the
// protocol version being spoken, some otherwise understood RPC messages may be
// refused. See isVersionCompatible for details of this logic.
//
// There are notes about the upgrade path in the description of the versions
// below. If you are starting a fresh cluster then there's no reason not to
// jump right to the latest protocol version. If you need to interoperate with
// older, unversioned Raft servers you'll need to drive the cluster through the
// different versions in order. This may best be done by moving forward across a
// series of releases of your application.
//
// Version History
//
// 0: Unversioned original protocol spoken until Q2 2016.
// 1: Added server IDs and a new peer change mechanism via a new LogConfiguration
// log entry type. All servers must be running >= 1 in order to support new
// staging and nonvoter modes for servers.
// 0: Unversioned, original protocol, used to interoperate with unversioned
// Raft servers. Under this version all configuration changes are propagated
// using the deprecated RemovePeerDeprecated Raft log entry. This means that
// server IDs are always set to be the same as the server addresses (since
// the old log entry type cannot transmit an ID), and only AddPeer/RemovePeer
// APIs are supported. This version can understand the new LogConfiguration
// Raft log entry but it will never generate one.
// 1: Transitional protocol used when migrating an existing cluster to the new
// server ID system. Server IDs are still set to be the same as server
// addresses, but all configuration changes are propagated using the new
// LogConfiguration Raft log entry type, which can carry full ID information.
// This version supports the old AddPeer/RemovePeer APIs as well as the new
// ID-based AddVoter/RemoveServer APIs which should be used when adding
// version 2 servers to the cluster later. Note that non-voting roles are not
// yet supported. This version sheds all interoperability with unversioned
// Raft servers, but can interoperate with newer Raft servers running with
// protocol version 0 since they can understand the new LogConfiguration Raft
// log entry, and this version can still understand their RemovePeerDeprecated
// Raft log entries. We need this protocol version as an intermediate step
// between 0 and 2 so that servers will propagate the ID information that
// will come from newly-added (or -cycled) servers using protocol version 2,
// but since they are still using their address-based IDs from the previous
// step they will still be able to track commitments and their own voting
// status properly. If we skipped this step, servers would be started with
// their new IDs, but they wouldn't see themselves in the old address-based
// configuration, so none of the servers would think they had a vote.
// 2: Protocol adding full support for server IDs and new ID-based server APIs
// (AddVoter, AddNonvoter, etc.), old AddPeer/RemovePeer APIs are no longer
// supported. Version 1 servers should be swapped out by removing them from
// the cluster one-by-one and re-adding them with updated configuration for
// this protocol version, along with their server ID. The remove/add cycle
// is required to populate their server ID. Note that removing must be done
// by ID, which will be the old server's address.
const (
ProtocolVersionMin = 0
ProtocolVersionMax = 1
ProtocolVersionMax = 2
)

// Config provides any necessary configuration to
@@ -131,7 +169,7 @@ func ValidateConfig(config *Config) error {
config.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
}
if config.ProtocolVersion > 0 && len(config.LocalID) == 0 {
return fmt.Errorf("LocalID must be supplied with protocol version > 0")
return fmt.Errorf("LocalID cannot be empty")
}
if config.HeartbeatTimeout < 5*time.Millisecond {
return fmt.Errorf("Heartbeat timeout is too low")
29 changes: 24 additions & 5 deletions raft.go
Original file line number Diff line number Diff line change
@@ -21,13 +21,33 @@ var (
)

// getVersionInfo returns an initialized VersionInfo struct for the given
// Raft instance.
// Raft instance. This structure is sent along with RPC requests and
// responses.
func (r *Raft) getVersionInfo() VersionInfo {
return VersionInfo{
ProtocolVersion: r.conf.ProtocolVersion,
}
}

// isVersionCompatible houses logic about whether this instance of Raft can
// process an RPC message with the given version.
func (r *Raft) isVersionCompatible(vi VersionInfo) bool {
// First check is to just make sure the code can understand the
// protocol at all.
if vi.ProtocolVersion < ProtocolVersionMin ||
vi.ProtocolVersion > ProtocolVersionMax {
return false
}

// Second check is whether we should support this message, given the
// current protocol we are configured to run. This will drop support
// for protocol version 0 starting at protocol version 2, which is
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
return vi.ProtocolVersion > r.conf.ProtocolVersion-1
}

// commitTuple is used to send an index that was committed,
// with an optional associated future that should be invoked.
type commitTuple struct {
@@ -635,7 +655,7 @@ func (r *Raft) appendConfigurationEntry(future *configurationChangeFuture) {
}

r.logger.Printf("[INFO] raft: Updating configuration with %s (%v, %v) to %v",
future.req.command, future.req.serverAddress, future.req.serverID, configuration)
future.req.command, future.req.serverID, future.req.serverAddress, configuration)

// In pre-ID compatibility mode we translate all configuration changes
// in to an old remove peer message, which can handle all supported
@@ -779,9 +799,8 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
if v, ok := rpc.Command.(WithVersionInfo); ok {
pv := v.GetVersionInfo().ProtocolVersion
if pv < ProtocolVersionMin || pv > ProtocolVersionMax {
r.logger.Printf("[ERR] raft: Ignoring unsupported protocol version %d for command: %#v", pv, rpc.Command)
if !r.isVersionCompatible(v.GetVersionInfo()) {
r.logger.Printf("[ERR] raft: Ignoring unsupported %#v for command: %#v", v, rpc.Command)
rpc.Respond(nil, ErrUnsupportedProtocol)
return
}
78 changes: 51 additions & 27 deletions raft_test.go
Original file line number Diff line number Diff line change
@@ -1958,6 +1958,7 @@ func TestRaft_Voting(t *testing.T) {
ldrT := c.trans[c.IndexOf(ldr)]

reqVote := RequestVoteRequest{
VersionInfo: ldr.getVersionInfo(),
Term: ldr.getCurrentTerm() + 10,
Candidate: ldrT.EncodePeer(ldr.localAddr),
LastLogIndex: ldr.LastIndex(),
@@ -1998,29 +1999,37 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) {
LastLogTerm: ldr.getCurrentTerm(),
}

// Try the vote and make sure the request gets an error back.
// Reject a message from a future version we don't understand.
var resp RequestVoteResponse
err := ldrT.RequestVote(followers[0].localAddr, &reqVote, &resp)
if err == nil || !strings.Contains(err.Error(), "protocol version") {
c.FailNowf("[ERR] expected RPC to get rejected: %v", err)
}

// Reject a message that's too old.
reqVote.VersionInfo.ProtocolVersion = followers[0].protocolVersion - 2
err = ldrT.RequestVote(followers[0].localAddr, &reqVote, &resp)
if err == nil || !strings.Contains(err.Error(), "protocol version") {
c.FailNowf("[ERR] expected RPC to get rejected: %v", err)
}
}

func TestRaft_ProtocolVersion_0(t *testing.T) {
// Make a cluster on the old protocol.
func TestRaft_ProtocolVersion_Upgrade_0_1(t *testing.T) {
// Make a cluster back on protocol version 0.
conf := inmemConfig(t)
conf.ProtocolVersion = 0
c := MakeCluster(2, t, conf)
defer c.Close()

// Set up another server, also speaking the old protocol.
// Set up another server speaking protocol version 1.
conf.ProtocolVersion = 1
c1 := MakeClusterNoBootstrap(1, t, conf)

// Merge clusters.
c.Merge(c1)
c.FullyConnect()

// Make sure the new id-based operations aren't supported in the old
// Make sure the new ID-based operations aren't supported in the old
// protocol.
future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second)
if err := future.Error(); err != ErrUnsupportedProtocol {
@@ -2039,38 +2048,53 @@ func TestRaft_ProtocolVersion_0(t *testing.T) {
c.FailNowf("[ERR] err: %v", err)
}

// Now do the join using the old address-based API (this returns a
// different type of future).
{
future := c.Leader().AddPeer(c1.rafts[0].localAddr)
if err := future.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}
// Now do the join using the old address-based API.
if future := c.Leader().AddPeer(c1.rafts[0].localAddr); future.Error() != nil {
c.FailNowf("[ERR] err: %v", future.Error())
}

// Set up another server, this time speaking the new protocol.
c2 := MakeClusterNoBootstrap(1, t, nil)
// Sanity check the cluster.
c.EnsureSame(t)
c.EnsureSamePeers(t)
c.EnsureLeader(t, c.Leader().localAddr)

// Now do the remove using the old address-based API.
if future := c.Leader().RemovePeer(c1.rafts[0].localAddr); future.Error() != nil {
c.FailNowf("[ERR] err: %v", future.Error())
}
}

func TestRaft_ProtocolVersion_Upgrade_1_2(t *testing.T) {
// Make a cluster back on protocol version 1.
conf := inmemConfig(t)
conf.ProtocolVersion = 1
c := MakeCluster(2, t, conf)
defer c.Close()
oldAddr := c.Followers()[0].localAddr

// Set up another server speaking protocol version 2.
conf.ProtocolVersion = 2
c1 := MakeClusterNoBootstrap(1, t, conf)

// Merge this one in.
c.Merge(c2)
// Merge clusters.
c.Merge(c1)
c.FullyConnect()

// Make sure this can join in and inter-operate with the old servers.
{
future := c.Leader().AddPeer(c2.rafts[0].localAddr)
if err := future.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}
// Use the new ID-based API to add the server with its ID.
future := c.Leader().AddVoter(c1.rafts[0].localID, c1.rafts[0].localAddr, 0, 1*time.Second)
if err := future.Error(); err != nil {
c.FailNowf("[ERR] err: %v", err)
}

// Check the FSMs
// Sanity check the cluster.
c.EnsureSame(t)

// Check the peers
c.EnsureSamePeers(t)

// Ensure one leader
c.EnsureLeader(t, c.Leader().localAddr)

// Remove an old server using the old address-based API.
if future := c.Leader().RemovePeer(oldAddr); future.Error() != nil {
c.FailNowf("[ERR] err: %v", future.Error())
}
}

// TODO: These are test cases we'd like to write for appendEntries().
1 change: 1 addition & 0 deletions replication.go
Original file line number Diff line number Diff line change
@@ -275,6 +275,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {

// Setup the request
req := InstallSnapshotRequest{
VersionInfo: r.getVersionInfo(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
LastLogIndex: meta.Index,

0 comments on commit e648f4e

Please sign in to comment.