Skip to content

Commit

Permalink
block censorship integration tests
Browse files Browse the repository at this point in the history
Signed-off-by: Emil Elizarov <emil.elizarov@ibm.com>
  • Loading branch information
Emil Elizarov committed Jun 8, 2024
1 parent 8724a85 commit 94f545a
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 106 deletions.
105 changes: 60 additions & 45 deletions integration/smartbft/mock_orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type MockOrderer struct {
grpcServer *comm.GRPCServer
channel chan string
censorDataMode bool
malicious bool
peerFirstTry bool
sentCount int
stopDeliveryChannel chan struct{}
StopDelivery bool
StopDeliveryChannel chan struct{}
censorAfter int
}

func (mo *MockOrderer) parseEnvelope(ctx context.Context, envelope *cb.Envelope) (*cb.Payload, *cb.ChannelHeader, *cb.SignatureHeader, error) {
Expand All @@ -58,6 +58,20 @@ func (mo *MockOrderer) parseEnvelope(ctx context.Context, envelope *cb.Envelope)
return payload, chdr, shdr, nil
}

func (mo *MockOrderer) StartCensoring(censorAfter int) {
mo.censorDataMode = true
mo.censorAfter = censorAfter
mo.StopDeliveryChannel = make(chan struct{})
}

func (mo *MockOrderer) StopCensoring() {
mo.censorDataMode = false
mo.censorAfter = math.MaxInt
close(mo.StopDeliveryChannel)
mo.StopDelivery = false
mo.StopDeliveryChannel = nil
}

func (mo *MockOrderer) Broadcast(server ab.AtomicBroadcast_BroadcastServer) error {
panic("implement me: Broadcast")
}
Expand All @@ -71,7 +85,7 @@ func (mo *MockOrderer) Deliver(server ab.AtomicBroadcast_DeliverServer) error {
mo.logger.Infof("Attempting to read seek info message from %s", addr)
envelope, err := server.Recv()
if err == io.EOF {
mo.logger.Debugf("Received EOF from %s, hangup", addr)
mo.logger.Infof("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
Expand All @@ -92,7 +106,7 @@ func (mo *MockOrderer) Deliver(server ab.AtomicBroadcast_DeliverServer) error {
return err
}

mo.logger.Debugf("Waiting for new SeekInfo from %s", addr)
mo.logger.Infof("Waiting for new SeekInfo from %s", addr)

return nil
}
Expand Down Expand Up @@ -120,41 +134,44 @@ func (mo *MockOrderer) deliverBlocks(
return cb.Status_BAD_REQUEST, nil
}

mo.logger.Infof("[%s] Received seekInfo.Start %+v", mo.address, seekInfo.Start)

mo.logger.Infof("[channel: %s] Received seekInfo (%p) %+v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
mo.logger.Infof("[channel: %s address: %s] Received seekInfo %+v [start: %v] [stop: %v] from %s", chdr.ChannelId, mo.address, seekInfo, seekInfo.Start, seekInfo.Stop, addr)

ledgerIdx := seekInfo.Start.GetSpecified().Number
number := uint64(1)
ledgerLastIdx := uint64(len(mo.ledgerArray) - 1)
var stopNum uint64
mo.logger.Infof("[channel: %s] Received seekInfo %+v from %s with ledgerIdx: [%d]", chdr.ChannelId, seekInfo, addr, ledgerIdx)
var startIdx uint64

switch start := seekInfo.Start.Type.(type) {
case *ab.SeekPosition_Oldest:
startIdx = uint64(1)
case *ab.SeekPosition_Newest:
startIdx = ledgerLastIdx
case *ab.SeekPosition_Specified:
startIdx = start.Specified.Number
}

var stopIdx uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
stopIdx = startIdx
case *ab.SeekPosition_Newest:
// when seeking only the newest block (i.e. starting
// and stopping at newest), don't reevaluate the ledger
// height as this can lead to multiple blocks being
// sent when only one is expected
if proto.Equal(seekInfo.Start, seekInfo.Stop) {
stopNum = number
break
}
stopNum = ledgerLastIdx
stopIdx = ledgerLastIdx
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
mo.logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return cb.Status_BAD_REQUEST, nil
}
stopIdx = stop.Specified.Number
}

if stopIdx < startIdx {
mo.logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, startIdx, stopIdx)
return cb.Status_BAD_REQUEST, nil
}

ledgerIdx := startIdx
for {
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > ledgerLastIdx {
mo.logger.Warningf("[channel: %s] Block %d not found, block number greater than chain length bounds", chdr.ChannelId, number)
if ledgerIdx > ledgerLastIdx {
mo.logger.Warningf("[channel: %s] Block %d not found, block number greater than chain length bounds", chdr.ChannelId, ledgerIdx)
return cb.Status_NOT_FOUND, nil
}
}
Expand Down Expand Up @@ -187,29 +204,23 @@ func (mo *MockOrderer) deliverBlocks(
return status, nil
}

number++
block2send := &cb.Block{
Header: block.Header,
Metadata: block.Metadata,
Data: block.Data,
}

if seekInfo.ContentType == ab.SeekInfo_HEADER_WITH_SIG && !protoutil.IsConfigBlock(block) {
mo.logger.Debugf("asked for header block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
mo.logger.Infof("asked for header block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
block2send.Data = nil
} else if mo.censorDataMode {
mo.logger.Debugf("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
if !mo.peerFirstTry {
mo.malicious = true
mo.logger.Debugf("malicious mode activated for [%s]", mo.address)
}
if mo.sentCount >= 5 && mo.malicious {
<-mo.stopDeliveryChannel
}
} else {
mo.logger.Debugf("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
mo.logger.Infof("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
if mo.censorDataMode && mo.sentCount >= mo.censorAfter {
mo.logger.Infof("censoring blocks from [%s], stopping to respond...", mo.address)
mo.StopDelivery = true
<-mo.StopDeliveryChannel
}
}
mo.peerFirstTry = true

blockResponse := &ab.DeliverResponse{
Type: &ab.DeliverResponse_Block{Block: block2send},
Expand All @@ -221,13 +232,14 @@ func (mo *MockOrderer) deliverBlocks(
return cb.Status_INTERNAL_SERVER_ERROR, err
}
mo.sentCount += 1
mo.logger.Warningf("[channel: %s, orderer: %s] Sent to %s block number %d", mo.address, chdr.ChannelId, addr, block2send.Header.Number)

if stopNum == block.Header.Number {
if stopIdx == block.Header.Number {
break
}
}

mo.logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
mo.logger.Infof("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)

return cb.Status_SUCCESS, nil
}
Expand All @@ -238,7 +250,6 @@ func NewMockOrderer(address string, ledgerArray []*cb.Block, options comm.Secure
}

logger := flogging.MustGetLogger("mockorderer")

grpcServer, err := comm.NewGRPCServer(address, sc)
if err != nil {
logger.Errorf("Error creating GRPC server: %s", err)
Expand All @@ -247,17 +258,21 @@ func NewMockOrderer(address string, ledgerArray []*cb.Block, options comm.Secure
mo := &MockOrderer{
address: address,
ledgerArray: ledgerArray,
logger: flogging.MustGetLogger("mockorderer"),
logger: logger,
grpcServer: grpcServer,
censorDataMode: false,
malicious: false,
peerFirstTry: false,
sentCount: 0,
censorAfter: math.MaxInt,
}

ab.RegisterAtomicBroadcastServer(grpcServer.Server(), mo)

go grpcServer.Start()
go func() {
err := grpcServer.Start()
if err != nil {
panic("Orderer mock failed to start")
}
}()

return mo, nil
}
Loading

0 comments on commit 94f545a

Please sign in to comment.