Skip to content

Commit

Permalink
Sets up for a more generic RPC header.
Browse files Browse the repository at this point in the history
This can later be the place where we put a cluster ID.
  • Loading branch information
slackpad committed Jul 30, 2016
1 parent 25fb028 commit d20f436
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 59 deletions.
63 changes: 32 additions & 31 deletions commands.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
package raft

// VersionInfo is a common sub-structure used to pass along
// protocol version information. For older Raft implementations before
// versioning was added this will default to protocol version 0.
type VersionInfo struct {
// RPCHeader is a common sub-structure used to pass along protocol version and
// other information about the cluster. For older Raft implementations before
// versioning was added this will default to a zero-valued structure when read
// by newer Raft versions.
type RPCHeader struct {
// ProtocolVersion is the version of the protocol the sender is
// speaking.
ProtocolVersion int
}

// WithVersionInfo is an interface that exposes version info.
type WithVersionInfo interface {
GetVersionInfo() VersionInfo
// WithRPCHeader is an interface that exposes the RPC header.
type WithRPCHeader interface {
GetRPCHeader() RPCHeader
}

// AppendEntriesRequest is the command used to append entries to the
// replicated log.
type AppendEntriesRequest struct {
VersionInfo
RPCHeader

// Provide the current term and leader
Term uint64
Expand All @@ -34,15 +35,15 @@ type AppendEntriesRequest struct {
LeaderCommitIndex uint64
}

// See WithVersionInfo.
func (r *AppendEntriesRequest) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *AppendEntriesRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// AppendEntriesResponse is the response returned from an
// AppendEntriesRequest.
type AppendEntriesResponse struct {
VersionInfo
RPCHeader

// Newer term if leader is out of date
Term uint64
Expand All @@ -58,15 +59,15 @@ type AppendEntriesResponse struct {
NoRetryBackoff bool
}

// See WithVersionInfo.
func (r *AppendEntriesResponse) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *AppendEntriesResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// RequestVoteRequest is the command used by a candidate to ask a Raft peer
// for a vote in an election.
type RequestVoteRequest struct {
VersionInfo
RPCHeader

// Provide the term and our id
Term uint64
Expand All @@ -77,14 +78,14 @@ type RequestVoteRequest struct {
LastLogTerm uint64
}

// See WithVersionInfo.
func (r *RequestVoteRequest) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *RequestVoteRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// RequestVoteResponse is the response returned from a RequestVoteRequest.
type RequestVoteResponse struct {
VersionInfo
RPCHeader

// Newer term if leader is out of date
Term uint64
Expand All @@ -93,15 +94,15 @@ type RequestVoteResponse struct {
Granted bool
}

// See WithVersionInfo.
func (r *RequestVoteResponse) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its
// log (and state machine) from a snapshot on another peer.
type InstallSnapshotRequest struct {
VersionInfo
RPCHeader

Term uint64
Leader []byte
Expand All @@ -124,21 +125,21 @@ type InstallSnapshotRequest struct {
Size int64
}

// See WithVersionInfo.
func (r *InstallSnapshotRequest) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *InstallSnapshotRequest) GetRPCHeader() RPCHeader {
return r.RPCHeader
}

// InstallSnapshotResponse is the response returned from an
// InstallSnapshotRequest.
type InstallSnapshotResponse struct {
VersionInfo
RPCHeader

Term uint64
Success bool
}

// See WithVersionInfo.
func (r *InstallSnapshotResponse) GetVersionInfo() VersionInfo {
return r.VersionInfo
// See WithRPCHeader.
func (r *InstallSnapshotResponse) GetRPCHeader() RPCHeader {
return r.RPCHeader
}
40 changes: 20 additions & 20 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ var (
keyLastVoteCand = []byte("LastVoteCand")
)

// getVersionInfo returns an initialized VersionInfo struct for the given
// getRPCHeader returns an initialized RPCHeader struct for the given
// Raft instance. This structure is sent along with RPC requests and
// responses.
func (r *Raft) getVersionInfo() VersionInfo {
return VersionInfo{
func (r *Raft) getRPCHeader() RPCHeader {
return RPCHeader{
ProtocolVersion: r.conf.ProtocolVersion,
}
}

// isVersionCompatible houses logic about whether this instance of Raft can
// process an RPC message with the given version.
func (r *Raft) isVersionCompatible(vi VersionInfo) bool {
// isCompatible houses logic about whether this instance of Raft can process
// an RPC message with the given header.
func (r *Raft) isCompatible(header RPCHeader) bool {
// First check is to just make sure the code can understand the
// protocol at all.
if vi.ProtocolVersion < ProtocolVersionMin ||
vi.ProtocolVersion > ProtocolVersionMax {
if header.ProtocolVersion < ProtocolVersionMin ||
header.ProtocolVersion > ProtocolVersionMax {
return false
}

Expand All @@ -45,7 +45,7 @@ func (r *Raft) isVersionCompatible(vi VersionInfo) bool {
// currently what we want, and in general support one version back. We
// may need to revisit this policy depending on how future protocol
// changes evolve.
return vi.ProtocolVersion >= r.conf.ProtocolVersion-1
return header.ProtocolVersion >= r.conf.ProtocolVersion-1
}

// commitTuple is used to send an index that was committed,
Expand Down Expand Up @@ -798,9 +798,9 @@ func (r *Raft) processLog(l *Log, future *logFuture) {
// processRPC is called to handle an incoming RPC request. This must only be
// called from the main thread.
func (r *Raft) processRPC(rpc RPC) {
if v, ok := rpc.Command.(WithVersionInfo); ok {
if !r.isVersionCompatible(v.GetVersionInfo()) {
r.logger.Printf("[ERR] raft: Ignoring unsupported %#v for command: %#v", v, rpc.Command)
if wh, ok := rpc.Command.(WithRPCHeader); ok {
if !r.isCompatible(wh.GetRPCHeader()) {
r.logger.Printf("[ERR] raft: Ignoring unsupported RPC %#v for command: %#v", wh, rpc.Command)
rpc.Respond(nil, ErrUnsupportedProtocol)
return
}
Expand Down Expand Up @@ -853,7 +853,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
defer metrics.MeasureSince([]string{"raft", "rpc", "appendEntries"}, time.Now())
// Setup a response
resp := &AppendEntriesResponse{
VersionInfo: r.getVersionInfo(),
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
LastLog: r.getLastIndex(),
Success: false,
Expand Down Expand Up @@ -1006,9 +1006,9 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {

// Setup a response
resp := &RequestVoteResponse{
VersionInfo: r.getVersionInfo(),
Term: r.getCurrentTerm(),
Granted: false,
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Granted: false,
}
var rpcErr error
defer func() {
Expand Down Expand Up @@ -1224,7 +1224,7 @@ func (r *Raft) electSelf() <-chan *voteResult {
// Construct the request
lastIdx, lastTerm := r.getLastEntry()
req := &RequestVoteRequest{
VersionInfo: r.getVersionInfo(),
RPCHeader: r.getRPCHeader(),
Term: r.getCurrentTerm(),
Candidate: r.trans.EncodePeer(r.localAddr),
LastLogIndex: lastIdx,
Expand Down Expand Up @@ -1258,9 +1258,9 @@ func (r *Raft) electSelf() <-chan *voteResult {
// Include our own vote
respCh <- &voteResult{
RequestVoteResponse: RequestVoteResponse{
VersionInfo: r.getVersionInfo(),
Term: req.Term,
Granted: true,
RPCHeader: r.getRPCHeader(),
Term: req.Term,
Granted: true,
},
voterID: r.localID,
}
Expand Down
6 changes: 3 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2107,7 +2107,7 @@ func TestRaft_Voting(t *testing.T) {
ldrT := c.trans[c.IndexOf(ldr)]

reqVote := RequestVoteRequest{
VersionInfo: ldr.getVersionInfo(),
RPCHeader: ldr.getRPCHeader(),
Term: ldr.getCurrentTerm() + 10,
Candidate: ldrT.EncodePeer(ldr.localAddr),
LastLogIndex: ldr.LastIndex(),
Expand Down Expand Up @@ -2139,7 +2139,7 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) {
ldrT := c.trans[c.IndexOf(ldr)]

reqVote := RequestVoteRequest{
VersionInfo: VersionInfo{
RPCHeader: RPCHeader{
ProtocolVersion: ProtocolVersionMax + 1,
},
Term: ldr.getCurrentTerm() + 10,
Expand All @@ -2156,7 +2156,7 @@ func TestRaft_ProtocolVersion_RejectRPC(t *testing.T) {
}

// Reject a message that's too old.
reqVote.VersionInfo.ProtocolVersion = followers[0].protocolVersion - 2
reqVote.RPCHeader.ProtocolVersion = followers[0].protocolVersion - 2
err = ldrT.RequestVote(followers[0].localAddr, &reqVote, &resp)
if err == nil || !strings.Contains(err.Error(), "protocol version") {
c.FailNowf("[ERR] expected RPC to get rejected: %v", err)
Expand Down
10 changes: 5 additions & 5 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {

// Setup the request
req := InstallSnapshotRequest{
VersionInfo: r.getVersionInfo(),
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
LastLogIndex: meta.Index,
Expand Down Expand Up @@ -329,9 +329,9 @@ func (r *Raft) sendLatestSnapshot(s *followerReplication) (bool, error) {
func (r *Raft) heartbeat(s *followerReplication, stopCh chan struct{}) {
var failures uint64
req := AppendEntriesRequest{
VersionInfo: r.getVersionInfo(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
RPCHeader: r.getRPCHeader(),
Term: s.currentTerm,
Leader: r.trans.EncodePeer(r.localAddr),
}
var resp AppendEntriesResponse
for {
Expand Down Expand Up @@ -473,7 +473,7 @@ func (r *Raft) pipelineDecode(s *followerReplication, p AppendPipeline, stopCh,

// setupAppendEntries is used to setup an append entries request.
func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequest, nextIndex, lastIndex uint64) error {
req.VersionInfo = r.getVersionInfo()
req.RPCHeader = r.getRPCHeader()
req.Term = s.currentTerm
req.Leader = r.trans.EncodePeer(r.localAddr)
req.LeaderCommitIndex = r.getCommitIndex()
Expand Down

0 comments on commit d20f436

Please sign in to comment.