From 3e5c3e446cc2548ab1c48a031df5d1f1c8460743 Mon Sep 17 00:00:00 2001 From: Jason Yellick Date: Fri, 7 Jul 2017 15:18:43 -0400 Subject: [PATCH] [FAB-5266] Replace Enqueue with Order/Configure The current consenter interface only allows for one sort of message ingress. All messages are received via 'Enqueue', and treated identically. In order for the consenter to be able to differentiate and handle config vs non-config messages differently, the consenter needs two diferent ingress points for messages. This CR replaces the Enqueue method with two methods: Order and Configure. For the time being, these methods behave exactly as Enqueue, but will be leveraged in future CRs. Change-Id: I3701e5e3c0de4833a455c49acebbad70c6ed763c Signed-off-by: Jason Yellick --- orderer/common/broadcast/broadcast.go | 22 +++++++++++++++++--- orderer/common/broadcast/broadcast_test.go | 22 +++++++++++++------- orderer/common/multichannel/chainsupport.go | 11 +++++++--- orderer/common/multichannel/manager_test.go | 6 +++--- orderer/common/multichannel/util_test.go | 9 ++++++-- orderer/consensus/consensus.go | 23 +++++++++++++++++++-- orderer/consensus/kafka/chain.go | 16 ++++++++++++-- orderer/consensus/kafka/chain_test.go | 18 ++++++++-------- orderer/consensus/kafka/consenter_test.go | 2 +- orderer/consensus/solo/consensus.go | 17 ++++++++++----- orderer/consensus/solo/consensus_test.go | 10 ++++----- 11 files changed, 114 insertions(+), 42 deletions(-) diff --git a/orderer/common/broadcast/broadcast.go b/orderer/common/broadcast/broadcast.go index e023366e669..b62e3d010d8 100644 --- a/orderer/common/broadcast/broadcast.go +++ b/orderer/common/broadcast/broadcast.go @@ -53,8 +53,13 @@ type SupportManager interface { // Support provides the backing resources needed to support broadcast on a chain type Support interface { - // Enqueue accepts a message and returns true on acceptance, or false on shutdown - Enqueue(env *cb.Envelope) bool + // Order accepts a message or returns an error indicating the cause of failure + // It ultimately passes through to the consensus.Chain interface + Order(env *cb.Envelope, configSeq uint64) error + + // Configure accepts a reconfiguration or returns an error indicating the cause of failure + // It ultimately passes through to the consensus.Chain interface + Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error // Filters returns the set of broadcast filters for this chain Filters() *filter.RuleSet @@ -102,6 +107,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } + isConfig := false + configUpdateMsg := msg if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) { logger.Debugf("Preprocessing CONFIG_UPDATE") msg, err = bh.sm.Process(msg) @@ -126,6 +133,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)") return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR}) } + + isConfig = true } support, ok := bh.sm.GetChain(chdr.ChannelId) @@ -144,7 +153,14 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error { return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST}) } - if !support.Enqueue(msg) { + // XXX temporary hack to mesh interface definitions, will remove. + if isConfig { + err = support.Configure(configUpdateMsg, msg, 0) + } else { + err = support.Order(msg, 0) + } + + if err != nil { return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE}) } diff --git a/orderer/common/broadcast/broadcast_test.go b/orderer/common/broadcast/broadcast_test.go index 8b814888676..5f147dfb60f 100644 --- a/orderer/common/broadcast/broadcast_test.go +++ b/orderer/common/broadcast/broadcast_test.go @@ -127,9 +127,17 @@ func (ms *mockSupport) Filters() *filter.RuleSet { return ms.filters } -// Enqueue sends a message for ordering -func (ms *mockSupport) Enqueue(env *cb.Envelope) bool { - return !ms.rejectEnqueue +// Order sends a message for ordering +func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error { + if ms.rejectEnqueue { + return fmt.Errorf("Reject") + } + return nil +} + +// Configure sends a reconfiguration message for ordering +func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error { + return ms.Order(config, configSeq) } func makeConfigMessage(chainID string) *cb.Envelope { @@ -264,9 +272,9 @@ func TestGoodConfigUpdate(t *testing.T) { m := newMockB() defer close(m.recvChan) go bh.Handle(m) - newChannelId := "New Chain" + newChannelID := "New Chain" - m.recvChan <- makeConfigMessage(newChannelId) + m.recvChan <- makeConfigMessage(newChannelID) reply := <-m.sendChan assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE") } @@ -301,9 +309,9 @@ func TestRejected(t *testing.T) { defer close(m.recvChan) go bh.Handle(m) - newChannelId := "New Chain" + newChannelID := "New Chain" - m.recvChan <- makeConfigMessage(newChannelId) + m.recvChan <- makeConfigMessage(newChannelID) reply := <-m.sendChan assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE") } diff --git a/orderer/common/multichannel/chainsupport.go b/orderer/common/multichannel/chainsupport.go index 2a4e85d8edc..cd6a7f33ebb 100644 --- a/orderer/common/multichannel/chainsupport.go +++ b/orderer/common/multichannel/chainsupport.go @@ -197,9 +197,14 @@ func (cs *ChainSupport) Reader() ledger.Reader { return cs.ledger } -// Enqueue takes a message and sends it to the consenter for ordering. -func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool { - return cs.chain.Enqueue(env) +// Order passes through to the Consenter implementation. +func (cs *ChainSupport) Order(env *cb.Envelope, configSeq uint64) error { + return cs.chain.Order(env, configSeq) +} + +// Configure passes through to the Consenter implementation. +func (cs *ChainSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error { + return cs.chain.Configure(configUpdate, config, configSeq) } // Errored returns whether the backing consenter has errored diff --git a/orderer/common/multichannel/manager_test.go b/orderer/common/multichannel/manager_test.go index cf7ebc4fa8b..d06c3d8b91a 100644 --- a/orderer/common/multichannel/manager_test.go +++ b/orderer/common/multichannel/manager_test.go @@ -178,7 +178,7 @@ func TestManagerImpl(t *testing.T) { } for _, message := range messages { - chainSupport.Enqueue(message) + chainSupport.Order(message, 0) } it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) @@ -489,7 +489,7 @@ func TestNewChain(t *testing.T) { chainSupport, ok := manager.GetChain(manager.SystemChannelID()) assert.True(t, ok, "Could not find system channel") - chainSupport.Enqueue(wrapped) + chainSupport.Configure(wrapped, wrapped, 0) func() { it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}}) defer it.Close() @@ -521,7 +521,7 @@ func TestNewChain(t *testing.T) { } for _, message := range messages { - chainSupport.Enqueue(message) + chainSupport.Order(message, 0) } it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}}) diff --git a/orderer/common/multichannel/util_test.go b/orderer/common/multichannel/util_test.go index 8f8f59979f4..dc448e9db9f 100644 --- a/orderer/common/multichannel/util_test.go +++ b/orderer/common/multichannel/util_test.go @@ -54,9 +54,14 @@ func (mch *mockChain) Errored() <-chan struct{} { return nil } -func (mch *mockChain) Enqueue(env *cb.Envelope) bool { +func (mch *mockChain) Order(env *cb.Envelope, configSeq uint64) error { mch.queue <- env - return true + return nil +} + +func (mch *mockChain) Configure(configUpdate, config *cb.Envelope, configSeq uint64) error { + mch.queue <- config + return nil } func (mch *mockChain) Start() { diff --git a/orderer/consensus/consensus.go b/orderer/consensus/consensus.go index 6b86902b356..238719c432d 100644 --- a/orderer/consensus/consensus.go +++ b/orderer/consensus/consensus.go @@ -32,8 +32,27 @@ type Consenter interface { // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka) // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft) type Chain interface { - // Enqueue accepts a message and returns true on acceptance, or false on failure. - Enqueue(env *cb.Envelope) bool + // NOTE: The solo/kafka consenters have not been updated to perform the revalidation + // checks conditionally. For now, Order/Configure are essentially Enqueue as before. + // This does not cause data inconsistency, but it wastes cycles and will be required + // to properly support the ConfigUpdate concept once introduced + + // Order accepts a message which has been processed at a given configSeq. + // If the configSeq advances, it is the responsibility of the consenter + // to revalidate and potentially discard the message + // The consenter may return an error, indicating the message was not accepted + Order(env *cb.Envelope, configSeq uint64) error + + // Configure accepts a message which reconfigures the channel and will + // trigger an update to the configSeq if committed. The configuration must have + // been triggered by a ConfigUpdate message, which is included. If the config + // sequence advances, it is the responsibility of the consenter to recompute the + // resulting config, discarding the message if the reconfiguration is no longer + // valid. While a configure message is in flight, the consenter should lock + // and block additional calls to Order/Configure, any messages received will + // need to be revalidated before ordering. + // The consenter may return an error, indicating the message was not accepted + Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error // Errored returns a channel which will close when an error has occurred. // This is especially useful for the Deliver client, who must terminate waiting diff --git a/orderer/consensus/kafka/chain.go b/orderer/consensus/kafka/chain.go index 80ca854f422..d0bbb4939af 100644 --- a/orderer/consensus/kafka/chain.go +++ b/orderer/consensus/kafka/chain.go @@ -112,9 +112,21 @@ func (chain *chainImpl) Halt() { } } -// Enqueue accepts a message and returns true on acceptance, or false otheriwse. // Implements the consensus.Chain interface. Called by Broadcast(). -func (chain *chainImpl) Enqueue(env *cb.Envelope) bool { +func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error { + if !chain.enqueue(env) { + return fmt.Errorf("Could not enqueue") + } + return nil +} + +// Implements the consensus.Chain interface. Called by Broadcast(). +func (chain *chainImpl) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error { + return chain.Order(config, configSeq) +} + +// enqueue accepts a message and returns true on acceptance, or false otheriwse. +func (chain *chainImpl) enqueue(env *cb.Envelope) bool { logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID()) select { case <-chain.startChan: // The Start phase has completed diff --git a/orderer/consensus/kafka/chain_test.go b/orderer/consensus/kafka/chain_test.go index 2512fa18568..45bc0a22e97 100644 --- a/orderer/consensus/kafka/chain_test.go +++ b/orderer/consensus/kafka/chain_test.go @@ -195,7 +195,7 @@ func TestChain(t *testing.T) { assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic") }) - t.Run("EnqueueIfNotStarted", func(t *testing.T) { + t.Run("enqueueIfNotStarted", func(t *testing.T) { mockChannel, mockBroker, mockSupport := newMocks(t) defer func() { mockBroker.Close() }() chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) @@ -216,7 +216,7 @@ func TestChain(t *testing.T) { SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message), }) - assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false") + assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false") }) t.Run("StartWithConsumerForChannelError", func(t *testing.T) { @@ -247,7 +247,7 @@ func TestChain(t *testing.T) { assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic") }) - t.Run("EnqueueProper", func(t *testing.T) { + t.Run("enqueueProper", func(t *testing.T) { mockChannel, mockBroker, mockSupport := newMocks(t) defer func() { mockBroker.Close() }() chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) @@ -273,14 +273,14 @@ func TestChain(t *testing.T) { t.Fatal("startChan should have been closed by now") } - // Enqueue should have access to the post path, and its ProduceRequest + // enqueue should have access to the post path, and its ProduceRequest // should go by without error - assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true") + assert.True(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return true") chain.Halt() }) - t.Run("EnqueueIfHalted", func(t *testing.T) { + t.Run("enqueueIfHalted", func(t *testing.T) { mockChannel, mockBroker, mockSupport := newMocks(t) defer func() { mockBroker.Close() }() chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) @@ -308,10 +308,10 @@ func TestChain(t *testing.T) { chain.Halt() // haltChan should close access to the post path - assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false") + assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false") }) - t.Run("EnqueueError", func(t *testing.T) { + t.Run("enqueueError", func(t *testing.T) { mockChannel, mockBroker, mockSupport := newMocks(t) defer func() { mockBroker.Close() }() chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1) @@ -345,7 +345,7 @@ func TestChain(t *testing.T) { SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition), }) - assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false") + assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false") }) } diff --git a/orderer/consensus/kafka/consenter_test.go b/orderer/consensus/kafka/consenter_test.go index 883652573f8..b27547dc999 100644 --- a/orderer/consensus/kafka/consenter_test.go +++ b/orderer/consensus/kafka/consenter_test.go @@ -165,7 +165,7 @@ func setupTestLogging(logLevel string, verbose bool) { // Taken from orderer/solo/consensus_test.go func syncQueueMessage(message *cb.Envelope, chain *chainImpl, mockBlockcutter *mockblockcutter.Receiver) { - chain.Enqueue(message) + chain.enqueue(message) mockBlockcutter.Block <- struct{}{} // We'll move past this line (and the function will return) only when the mock blockcutter is about to return } diff --git a/orderer/consensus/solo/consensus.go b/orderer/consensus/solo/consensus.go index a9bae3b2c0d..5ed0efc9135 100644 --- a/orderer/consensus/solo/consensus.go +++ b/orderer/consensus/solo/consensus.go @@ -17,6 +17,7 @@ limitations under the License. package solo import ( + "fmt" "time" "github.com/hyperledger/fabric/orderer/common/msgprocessor" @@ -37,7 +38,7 @@ type chain struct { // New creates a new consenter for the solo consensus scheme. // The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process). -// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages +// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages // into blocks before writing to the given ledger func New() consensus.Consenter { return &consenter{} @@ -68,16 +69,22 @@ func (ch *chain) Halt() { } } -// Enqueue accepts a message and returns true on acceptance, or false on shutdown -func (ch *chain) Enqueue(env *cb.Envelope) bool { +// Order accepts normal messages for ordering +func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error { select { case ch.sendChan <- env: - return true + return nil case <-ch.exitChan: - return false + return fmt.Errorf("Exiting") } } +// Order accepts normal messages for ordering +func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error { + // TODO, handle this specially + return ch.Order(config, configSeq) +} + // Errored only closes on exit func (ch *chain) Errored() <-chan struct{} { return ch.exitChan diff --git a/orderer/consensus/solo/consensus_test.go b/orderer/consensus/solo/consensus_test.go index 4d7c32522f6..6c269cb5dec 100644 --- a/orderer/consensus/solo/consensus_test.go +++ b/orderer/consensus/solo/consensus_test.go @@ -37,7 +37,7 @@ func init() { var testMessage = &cb.Envelope{Payload: []byte("TEST_MESSAGE")} func syncQueueMessage(msg *cb.Envelope, chain *chain, bc *mockblockcutter.Receiver) { - chain.Enqueue(msg) + chain.Order(msg, 0) bc.Block <- struct{}{} } @@ -91,7 +91,7 @@ func TestStart(t *testing.T) { defer bs.Halt() support.BlockCutterVal.CutNext = true - bs.Enqueue(testMessage) + assert.Nil(t, bs.Order(testMessage, 0)) select { case <-support.Blocks: case <-bs.Errored(): @@ -99,7 +99,7 @@ func TestStart(t *testing.T) { } } -func TestEnqueueAfterHalt(t *testing.T) { +func TestOrderAfterHalt(t *testing.T) { batchTimeout, _ := time.ParseDuration("1ms") support := &mockmultichannel.ConsenterSupport{ Blocks: make(chan *cb.Block), @@ -109,7 +109,7 @@ func TestEnqueueAfterHalt(t *testing.T) { defer close(support.BlockCutterVal.Block) bs := newChain(support) bs.Halt() - assert.False(t, bs.Enqueue(testMessage), "Enqueue should not be accepted after halt") + assert.NotNil(t, bs.Order(testMessage, 0), "Order should not be accepted after halt") select { case <-bs.Errored(): default: @@ -253,7 +253,7 @@ func TestConfigMsg(t *testing.T) { syncQueueMessage(testMessage, bs, support.BlockCutterVal) support.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg - bs.Enqueue(testMessage) + assert.Nil(t, bs.Order(testMessage, 0)) select { case <-support.Blocks: