Skip to content

Commit

Permalink
Merge "Allow orderer startup with no system channel defined"
Browse files Browse the repository at this point in the history
  • Loading branch information
sykesm authored and Gerrit Code Review committed Nov 14, 2019
2 parents 028d9fb + 727850f commit 445fd06
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 73 deletions.
43 changes: 43 additions & 0 deletions integration/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,49 @@ var _ = Describe("EndToEnd", func() {
nwo.EnableCapabilities(network, "testchannel", "Application", "V2_0", orderer, network.Peer("Org1", "peer0"), network.Peer("Org2", "peer0"))
})
})

Describe("basic solo network without a system channel", func() {
var ordererProcess ifrit.Process
BeforeEach(func() {
soloConfig := nwo.BasicSolo()
soloConfig.RemovePeer("Org1", "peer1")
soloConfig.RemovePeer("Org2", "peer1")
network = nwo.New(soloConfig, testDir, client, StartPort(), components)
network.GenerateConfigTree()

orderer := network.Orderer("orderer")
ordererConfig := network.ReadOrdererConfig(orderer)
ordererConfig.General.GenesisMethod = "none"
network.WriteOrdererConfig(orderer, ordererConfig)
network.Bootstrap()

ordererRunner := network.OrdererRunner(orderer)
ordererProcess = ifrit.Invoke(ordererRunner)
Eventually(ordererProcess.Ready, network.EventuallyTimeout).Should(BeClosed())
Eventually(ordererRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("registrar initializing with no system channel"))
})

AfterEach(func() {
if ordererProcess != nil {
ordererProcess.Signal(syscall.SIGTERM)
Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
}
})

It("starts the orderer but rejects channel creation requests", func() {
By("attempting to create a channel without a system channel defined")
sess, err := network.PeerAdminSession(network.Peer("Org1", "peer0"), commands.ChannelCreate{
ChannelID: "testchannel",
Orderer: network.OrdererAddress(network.Orderer("orderer"), nwo.ListenPort),
File: network.CreateChannelTxPath("testchannel"),
OutputBlock: "/dev/null",
ClientAuth: network.ClientAuthRequired,
})
Expect(err).NotTo(HaveOccurred())
Eventually(sess, network.EventuallyTimeout).Should(gexec.Exit(1))
Eventually(sess.Err, network.EventuallyTimeout).Should(gbytes.Say("channel creation request not allowed because the orderer system channel is not yet defined"))
})
})
})

func RunQueryInvokeQuery(n *nwo.Network, orderer *nwo.Orderer, peer *nwo.Peer, channel string) {
Expand Down
5 changes: 4 additions & 1 deletion orderer/common/multichannel/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (r *Registrar) Initialize(consenters map[string]consensus.Consenter) {
}

if r.systemChannelID == "" {
logger.Panicf("No system chain found. If bootstrapping, does your system channel contain a consortiums group definition?")
logger.Info("registrar initializing with no system channel")
}
}

Expand All @@ -244,6 +244,9 @@ func (r *Registrar) BroadcastChannelSupport(msg *cb.Envelope) (*cb.ChannelHeader
cs := r.GetChain(chdr.ChannelId)
// New channel creation
if cs == nil {
if r.systemChannel == nil {
return nil, false, nil, errors.New("channel creation request not allowed because the orderer system channel is not yet defined")
}
cs = r.systemChannel
}

Expand Down
35 changes: 28 additions & 7 deletions orderer/common/multichannel/registrar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,12 @@ func newLedgerAndFactory(dir string, chainID string, genesisBlockSys *cb.Block)
if err != nil {
panic(err)
}
err = rl.Append(genesisBlockSys)
if err != nil {
panic(err)

if genesisBlockSys != nil {
err = rl.Append(genesisBlockSys)
if err != nil {
panic(err)
}
}
return rlf, rl
}
Expand Down Expand Up @@ -157,9 +160,9 @@ func TestNewRegistrar(t *testing.T) {
consenters := make(map[string]consensus.Consenter)
consenters[confSys.Orderer.OrdererType] = &mockConsenter{}

assert.Panics(t, func() {
assert.NotPanics(t, func() {
NewRegistrar(localconfig.TopLevel{}, lf, mockCrypto(), &disabled.Provider{}, cryptoProvider).Initialize(consenters)
}, "Should have panicked when starting without a system chain")
}, "Should not panic when starting without a system channel")
})

// This test checks to make sure that the orderer refuses to come up if there are multiple system channels
Expand Down Expand Up @@ -421,8 +424,8 @@ func TestResourcesCheck(t *testing.T) {
}

// The registrar's BroadcastChannelSupport implementation should reject message types which should not be processed directly.
func TestBroadcastChannelSupportRejection(t *testing.T) {
//system channel
func TestBroadcastChannelSupport(t *testing.T) {
// system channel
confSys := genesisconfig.Load(genesisconfig.SampleInsecureSoloProfile, configtest.GetDevConfigDir())
genesisBlockSys := encoder.New(confSys).GenesisBlock()

Expand All @@ -443,4 +446,22 @@ func TestBroadcastChannelSupportRejection(t *testing.T) {
_, _, _, err = registrar.BroadcastChannelSupport(configTx)
assert.Error(t, err, "Messages of type HeaderType_CONFIG should return an error.")
})

t.Run("No system channel", func(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "registrar_test-")
require.NoError(t, err)
defer os.RemoveAll(tmpdir)

ledgerFactory, _ := newLedgerAndFactory(tmpdir, "", nil)
mockConsenters := map[string]consensus.Consenter{confSys.Orderer.OrdererType: &mockConsenter{}}
config := localconfig.TopLevel{}
config.General.GenesisMethod = "none"
config.General.GenesisFile = ""
registrar := NewRegistrar(config, ledgerFactory, mockCrypto(), &disabled.Provider{}, cryptoProvider)
registrar.Initialize(mockConsenters)
configTx := makeConfigTxFull("testchannelid", 1)
_, _, _, err = registrar.BroadcastChannelSupport(configTx)
assert.Error(t, err)
assert.Equal(t, "channel creation request not allowed because the orderer system channel is not yet defined", err.Error())
})
}
96 changes: 48 additions & 48 deletions orderer/common/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ func Main() {

prettyPrintStruct(conf)

bootstrapBlock := extractBootstrapBlock(conf)

cryptoProvider := factory.GetDefault()

if err := ValidateBootstrapBlock(bootstrapBlock, cryptoProvider); err != nil {
logger.Panicf("Failed validating bootstrap block: %v", err)
signer, signErr := loadLocalMSP(conf).GetDefaultSigningIdentity()
if signErr != nil {
logger.Panicf("Failed to get local MSP identity: %s", signErr)
}

opsSystem := newOperationsSystem(conf.Operations, conf.Metrics)
Expand All @@ -100,19 +99,6 @@ func Main() {
}
defer opsSystem.Stop()
metricsProvider := opsSystem.Provider

lf, _, err := createLedgerFactory(conf, metricsProvider)
if err != nil {
logger.Panicf("Failed in creating ledger factory: %v", err)
}
sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
clusterBootBlock := selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)

signer, signErr := loadLocalMSP(conf).GetDefaultSigningIdentity()
if signErr != nil {
logger.Panicf("Failed to get local MSP identity: %s", signErr)
}

logObserver := floggingmetrics.NewObserver(metricsProvider)
flogging.SetObserver(logObserver)

Expand All @@ -124,38 +110,58 @@ func Main() {
clientRootCAs: serverConfig.SecOpts.ClientRootCAs,
}

lf, _, err := createLedgerFactory(conf, metricsProvider)
if err != nil {
logger.Panicf("Failed to create ledger factory: %v", err)
}

var clusterBootBlock *cb.Block
// configure following artifacts properly if orderer is of cluster type
var r *replicationInitiator
clusterServerConfig := serverConfig
clusterGRPCServer := grpcServer // by default, cluster shares the same grpc server
var clusterClientConfig comm.ClientConfig
var clusterDialer *cluster.PredicateDialer

var reuseGrpcListener bool
typ := consensusType(bootstrapBlock, cryptoProvider)
var clusterType, reuseGrpcListener bool
var serversToUpdate []*comm.GRPCServer

clusterType := isClusterType(clusterBootBlock, cryptoProvider)
if clusterType {
logger.Infof("Setting up cluster for orderer type %s", typ)
clusterClientConfig = initializeClusterClientConfig(conf)
clusterDialer = &cluster.PredicateDialer{
Config: clusterClientConfig,
if conf.General.GenesisMethod == "file" {
bootstrapBlock := extractBootstrapBlock(conf)
if err := ValidateBootstrapBlock(bootstrapBlock, cryptoProvider); err != nil {
logger.Panicf("Failed validating bootstrap block: %v", err)
}
sysChanLastConfigBlock := extractSysChanLastConfig(lf, bootstrapBlock)
clusterBootBlock = selectClusterBootBlock(bootstrapBlock, sysChanLastConfigBlock)

r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer, cryptoProvider)
// Only clusters that are equipped with a recent config block can replicate.
if conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}
typ := consensusType(bootstrapBlock, cryptoProvider)
clusterType = isClusterType(clusterBootBlock, cryptoProvider)
if clusterType {
logger.Infof("Setting up cluster for orderer type %s", typ)
clusterClientConfig = initializeClusterClientConfig(conf)
clusterDialer = &cluster.PredicateDialer{
Config: clusterClientConfig,
}

r = createReplicator(lf, bootstrapBlock, conf, clusterClientConfig.SecOpts, signer, cryptoProvider)
// Only clusters that are equipped with a recent config block can replicate.
if conf.General.GenesisMethod == "file" {
r.replicateIfNeeded(bootstrapBlock)
}

if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
if reuseGrpcListener = reuseListener(conf, typ); !reuseGrpcListener {
clusterServerConfig, clusterGRPCServer = configureClusterListener(conf, serverConfig, ioutil.ReadFile)
}

// If we have a separate gRPC server for the cluster,
// we need to update its TLS CA certificate pool.
serversToUpdate = append(serversToUpdate, clusterGRPCServer)
}
// Are we bootstrapping?
if len(lf.ChannelIDs()) == 0 {
initializeBootstrapChannel(clusterBootBlock, lf)
} else {
logger.Info("Not bootstrapping because of existing channels")
}

// If we have a separate gRPC server for the cluster,
// we need to update its TLS CA certificate pool.
serversToUpdate = append(serversToUpdate, clusterGRPCServer)
}

identityBytes, err := signer.Serialize()
Expand Down Expand Up @@ -697,25 +703,19 @@ func initializeMultichannelRegistrar(
bccsp bccsp.BCCSP,
callbacks ...channelconfig.BundleActor,
) *multichannel.Registrar {
// Are we bootstrapping?
if len(lf.ChannelIDs()) == 0 {
initializeBootstrapChannel(bootstrapBlock, lf)
} else {
logger.Info("Not bootstrapping because of existing channels")
}

consenters := make(map[string]consensus.Consenter)

registrar := multichannel.NewRegistrar(*conf, lf, signer, metricsProvider, bccsp, callbacks...)

consenters := map[string]consensus.Consenter{}
consenters["solo"] = solo.New()
var kafkaMetrics *kafka.Metrics
consenters["kafka"], kafkaMetrics = kafka.New(conf.Kafka, metricsProvider, healthChecker)
// Note, we pass a 'nil' channel here, we could pass a channel that
// closes if we wished to cleanup this routine on exit.
go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil)
if isClusterType(bootstrapBlock, bccsp) {
initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider, bccsp)
if conf.General.GenesisMethod == "file" {
if isClusterType(bootstrapBlock, bccsp) {
initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider, bccsp)
}
}
registrar.Initialize(consenters)
return registrar
Expand Down
63 changes: 46 additions & 17 deletions orderer/common/server/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestLoadLocalMSP(t *testing.T) {
})
}

func TestInitializeMultiChainManager(t *testing.T) {
func TestInitializeMultichannelRegistrar(t *testing.T) {
cleanup := configtest.SetDevFabricConfigPath(t)
defer cleanup()
genesisFile := produceGenesisFile(t, genesisconfig.SampleDevModeSoloProfile, "testchannelid")
Expand All @@ -382,22 +382,50 @@ func TestInitializeMultiChainManager(t *testing.T) {
assert.NoError(t, err)

signer := &server_mocks.SignerSerializer{}
lf, _, err := createLedgerFactory(conf, &disabled.Provider{})
assert.NoError(t, err)
bootBlock := file.New(genesisFile).GenesisBlock()
initializeMultichannelRegistrar(
bootBlock,
&replicationInitiator{cryptoProvider: cryptoProvider},
&cluster.PredicateDialer{},
comm.ServerConfig{},
nil,
conf,
signer,
&disabled.Provider{},
&server_mocks.HealthChecker{},
lf,
cryptoProvider,
)

t.Run("registrar with system channel", func(t *testing.T) {
lf, _, err := createLedgerFactory(conf, &disabled.Provider{})
assert.NoError(t, err)
bootBlock := file.New(genesisFile).GenesisBlock()
initializeBootstrapChannel(bootBlock, lf)
registrar := initializeMultichannelRegistrar(
bootBlock,
&replicationInitiator{cryptoProvider: cryptoProvider},
&cluster.PredicateDialer{},
comm.ServerConfig{},
nil,
conf,
signer,
&disabled.Provider{},
&server_mocks.HealthChecker{},
lf,
cryptoProvider,
)
assert.NotNil(t, registrar)
assert.Equal(t, "testchannelid", registrar.SystemChannelID())
})

t.Run("registrar with no system channel", func(t *testing.T) {
conf.General.GenesisMethod = "none"
conf.General.GenesisFile = ""
lf, _, err := createLedgerFactory(conf, &disabled.Provider{})
assert.NoError(t, err)
registrar := initializeMultichannelRegistrar(
nil,
&replicationInitiator{cryptoProvider: cryptoProvider},
&cluster.PredicateDialer{},
comm.ServerConfig{},
nil,
conf,
signer,
&disabled.Provider{},
&server_mocks.HealthChecker{},
lf,
cryptoProvider,
)
assert.NotNil(t, registrar)
assert.Empty(t, registrar.SystemChannelID())
})
}

func TestInitializeGrpcServer(t *testing.T) {
Expand Down Expand Up @@ -465,6 +493,7 @@ func TestUpdateTrustedRoots(t *testing.T) {
lf, _, err := createLedgerFactory(conf, &disabled.Provider{})
assert.NoError(t, err)
bootBlock := file.New(genesisFile).GenesisBlock()
initializeBootstrapChannel(bootBlock, lf)
signer := &server_mocks.SignerSerializer{}

cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
Expand Down

0 comments on commit 445fd06

Please sign in to comment.