Skip to content

Commit

Permalink
[FAB-5266] Replace Enqueue with Order/Configure
Browse files Browse the repository at this point in the history
The current consenter interface only allows for one sort of message
ingress.  All messages are received via 'Enqueue', and treated
identically.

In order for the consenter to be able to differentiate and handle config
vs non-config messages differently, the consenter needs two diferent
ingress points for messages.

This CR replaces the Enqueue method with two methods: Order and
Configure.

For the time being, these methods behave exactly as Enqueue, but will be
leveraged in future CRs.

Change-Id: I3701e5e3c0de4833a455c49acebbad70c6ed763c
Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
  • Loading branch information
Jason Yellick committed Jul 27, 2017
1 parent ed9517e commit 3e5c3e4
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 42 deletions.
22 changes: 19 additions & 3 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ type SupportManager interface {

// Support provides the backing resources needed to support broadcast on a chain
type Support interface {
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
Enqueue(env *cb.Envelope) bool
// Order accepts a message or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Order(env *cb.Envelope, configSeq uint64) error

// Configure accepts a reconfiguration or returns an error indicating the cause of failure
// It ultimately passes through to the consensus.Chain interface
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error

// Filters returns the set of broadcast filters for this chain
Filters() *filter.RuleSet
Expand Down Expand Up @@ -102,6 +107,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

isConfig := false
configUpdateMsg := msg
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
logger.Debugf("Preprocessing CONFIG_UPDATE")
msg, err = bh.sm.Process(msg)
Expand All @@ -126,6 +133,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
}

isConfig = true
}

support, ok := bh.sm.GetChain(chdr.ChannelId)
Expand All @@ -144,7 +153,14 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if !support.Enqueue(msg) {
// XXX temporary hack to mesh interface definitions, will remove.
if isConfig {
err = support.Configure(configUpdateMsg, msg, 0)
} else {
err = support.Order(msg, 0)
}

if err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

Expand Down
22 changes: 15 additions & 7 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,17 @@ func (ms *mockSupport) Filters() *filter.RuleSet {
return ms.filters
}

// Enqueue sends a message for ordering
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
return !ms.rejectEnqueue
// Order sends a message for ordering
func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error {
if ms.rejectEnqueue {
return fmt.Errorf("Reject")
}
return nil
}

// Configure sends a reconfiguration message for ordering
func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return ms.Order(config, configSeq)
}

func makeConfigMessage(chainID string) *cb.Envelope {
Expand Down Expand Up @@ -264,9 +272,9 @@ func TestGoodConfigUpdate(t *testing.T) {
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChannelId := "New Chain"
newChannelID := "New Chain"

m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
}
Expand Down Expand Up @@ -301,9 +309,9 @@ func TestRejected(t *testing.T) {
defer close(m.recvChan)
go bh.Handle(m)

newChannelId := "New Chain"
newChannelID := "New Chain"

m.recvChan <- makeConfigMessage(newChannelId)
m.recvChan <- makeConfigMessage(newChannelID)
reply := <-m.sendChan
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
}
Expand Down
11 changes: 8 additions & 3 deletions orderer/common/multichannel/chainsupport.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,14 @@ func (cs *ChainSupport) Reader() ledger.Reader {
return cs.ledger
}

// Enqueue takes a message and sends it to the consenter for ordering.
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
return cs.chain.Enqueue(env)
// Order passes through to the Consenter implementation.
func (cs *ChainSupport) Order(env *cb.Envelope, configSeq uint64) error {
return cs.chain.Order(env, configSeq)
}

// Configure passes through to the Consenter implementation.
func (cs *ChainSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return cs.chain.Configure(configUpdate, config, configSeq)
}

// Errored returns whether the backing consenter has errored
Expand Down
6 changes: 3 additions & 3 deletions orderer/common/multichannel/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestManagerImpl(t *testing.T) {
}

for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}

it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
Expand Down Expand Up @@ -489,7 +489,7 @@ func TestNewChain(t *testing.T) {
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
assert.True(t, ok, "Could not find system channel")

chainSupport.Enqueue(wrapped)
chainSupport.Configure(wrapped, wrapped, 0)
func() {
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
defer it.Close()
Expand Down Expand Up @@ -521,7 +521,7 @@ func TestNewChain(t *testing.T) {
}

for _, message := range messages {
chainSupport.Enqueue(message)
chainSupport.Order(message, 0)
}

it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})
Expand Down
9 changes: 7 additions & 2 deletions orderer/common/multichannel/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,14 @@ func (mch *mockChain) Errored() <-chan struct{} {
return nil
}

func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
func (mch *mockChain) Order(env *cb.Envelope, configSeq uint64) error {
mch.queue <- env
return true
return nil
}

func (mch *mockChain) Configure(configUpdate, config *cb.Envelope, configSeq uint64) error {
mch.queue <- config
return nil
}

func (mch *mockChain) Start() {
Expand Down
23 changes: 21 additions & 2 deletions orderer/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,27 @@ type Consenter interface {
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
type Chain interface {
// Enqueue accepts a message and returns true on acceptance, or false on failure.
Enqueue(env *cb.Envelope) bool
// NOTE: The solo/kafka consenters have not been updated to perform the revalidation
// checks conditionally. For now, Order/Configure are essentially Enqueue as before.
// This does not cause data inconsistency, but it wastes cycles and will be required
// to properly support the ConfigUpdate concept once introduced

// Order accepts a message which has been processed at a given configSeq.
// If the configSeq advances, it is the responsibility of the consenter
// to revalidate and potentially discard the message
// The consenter may return an error, indicating the message was not accepted
Order(env *cb.Envelope, configSeq uint64) error

// Configure accepts a message which reconfigures the channel and will
// trigger an update to the configSeq if committed. The configuration must have
// been triggered by a ConfigUpdate message, which is included. If the config
// sequence advances, it is the responsibility of the consenter to recompute the
// resulting config, discarding the message if the reconfiguration is no longer
// valid. While a configure message is in flight, the consenter should lock
// and block additional calls to Order/Configure, any messages received will
// need to be revalidated before ordering.
// The consenter may return an error, indicating the message was not accepted
Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error

// Errored returns a channel which will close when an error has occurred.
// This is especially useful for the Deliver client, who must terminate waiting
Expand Down
16 changes: 14 additions & 2 deletions orderer/consensus/kafka/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,21 @@ func (chain *chainImpl) Halt() {
}
}

// Enqueue accepts a message and returns true on acceptance, or false otheriwse.
// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool {
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
if !chain.enqueue(env) {
return fmt.Errorf("Could not enqueue")
}
return nil
}

// Implements the consensus.Chain interface. Called by Broadcast().
func (chain *chainImpl) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
return chain.Order(config, configSeq)
}

// enqueue accepts a message and returns true on acceptance, or false otheriwse.
func (chain *chainImpl) enqueue(env *cb.Envelope) bool {
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID())
select {
case <-chain.startChan: // The Start phase has completed
Expand Down
18 changes: 9 additions & 9 deletions orderer/consensus/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})

t.Run("EnqueueIfNotStarted", func(t *testing.T) {
t.Run("enqueueIfNotStarted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
Expand All @@ -216,7 +216,7 @@ func TestChain(t *testing.T) {
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
})

assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})

t.Run("StartWithConsumerForChannelError", func(t *testing.T) {
Expand Down Expand Up @@ -247,7 +247,7 @@ func TestChain(t *testing.T) {
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
})

t.Run("EnqueueProper", func(t *testing.T) {
t.Run("enqueueProper", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
Expand All @@ -273,14 +273,14 @@ func TestChain(t *testing.T) {
t.Fatal("startChan should have been closed by now")
}

// Enqueue should have access to the post path, and its ProduceRequest
// enqueue should have access to the post path, and its ProduceRequest
// should go by without error
assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true")
assert.True(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return true")

chain.Halt()
})

t.Run("EnqueueIfHalted", func(t *testing.T) {
t.Run("enqueueIfHalted", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
Expand Down Expand Up @@ -308,10 +308,10 @@ func TestChain(t *testing.T) {
chain.Halt()

// haltChan should close access to the post path
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})

t.Run("EnqueueError", func(t *testing.T) {
t.Run("enqueueError", func(t *testing.T) {
mockChannel, mockBroker, mockSupport := newMocks(t)
defer func() { mockBroker.Close() }()
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestChain(t *testing.T) {
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition),
})

assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
})
}

Expand Down
2 changes: 1 addition & 1 deletion orderer/consensus/kafka/consenter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func setupTestLogging(logLevel string, verbose bool) {

// Taken from orderer/solo/consensus_test.go
func syncQueueMessage(message *cb.Envelope, chain *chainImpl, mockBlockcutter *mockblockcutter.Receiver) {
chain.Enqueue(message)
chain.enqueue(message)
mockBlockcutter.Block <- struct{}{} // We'll move past this line (and the function will return) only when the mock blockcutter is about to return
}

Expand Down
17 changes: 12 additions & 5 deletions orderer/consensus/solo/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package solo

import (
"fmt"
"time"

"github.com/hyperledger/fabric/orderer/common/msgprocessor"
Expand All @@ -37,7 +38,7 @@ type chain struct {

// New creates a new consenter for the solo consensus scheme.
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages
// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages
// into blocks before writing to the given ledger
func New() consensus.Consenter {
return &consenter{}
Expand Down Expand Up @@ -68,16 +69,22 @@ func (ch *chain) Halt() {
}
}

// Enqueue accepts a message and returns true on acceptance, or false on shutdown
func (ch *chain) Enqueue(env *cb.Envelope) bool {
// Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- env:
return true
return nil
case <-ch.exitChan:
return false
return fmt.Errorf("Exiting")
}
}

// Order accepts normal messages for ordering
func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
// TODO, handle this specially
return ch.Order(config, configSeq)
}

// Errored only closes on exit
func (ch *chain) Errored() <-chan struct{} {
return ch.exitChan
Expand Down
10 changes: 5 additions & 5 deletions orderer/consensus/solo/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func init() {
var testMessage = &cb.Envelope{Payload: []byte("TEST_MESSAGE")}

func syncQueueMessage(msg *cb.Envelope, chain *chain, bc *mockblockcutter.Receiver) {
chain.Enqueue(msg)
chain.Order(msg, 0)
bc.Block <- struct{}{}
}

Expand Down Expand Up @@ -91,15 +91,15 @@ func TestStart(t *testing.T) {
defer bs.Halt()

support.BlockCutterVal.CutNext = true
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))
select {
case <-support.Blocks:
case <-bs.Errored():
t.Fatalf("Expected not to exit")
}
}

func TestEnqueueAfterHalt(t *testing.T) {
func TestOrderAfterHalt(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichannel.ConsenterSupport{
Blocks: make(chan *cb.Block),
Expand All @@ -109,7 +109,7 @@ func TestEnqueueAfterHalt(t *testing.T) {
defer close(support.BlockCutterVal.Block)
bs := newChain(support)
bs.Halt()
assert.False(t, bs.Enqueue(testMessage), "Enqueue should not be accepted after halt")
assert.NotNil(t, bs.Order(testMessage, 0), "Order should not be accepted after halt")
select {
case <-bs.Errored():
default:
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestConfigMsg(t *testing.T) {

syncQueueMessage(testMessage, bs, support.BlockCutterVal)
support.ClassifyMsgVal = msgprocessor.ConfigUpdateMsg
bs.Enqueue(testMessage)
assert.Nil(t, bs.Order(testMessage, 0))

select {
case <-support.Blocks:
Expand Down

0 comments on commit 3e5c3e4

Please sign in to comment.