Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integration tests for censorship detection #4881

Merged
merged 2 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 60 additions & 45 deletions integration/smartbft/mock_orderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ type MockOrderer struct {
grpcServer *comm.GRPCServer
channel chan string
censorDataMode bool
malicious bool
peerFirstTry bool
sentCount int
stopDeliveryChannel chan struct{}
StopDelivery bool
StopDeliveryChannel chan struct{}
censorAfter int
}

func (mo *MockOrderer) parseEnvelope(ctx context.Context, envelope *cb.Envelope) (*cb.Payload, *cb.ChannelHeader, *cb.SignatureHeader, error) {
Expand All @@ -58,6 +58,20 @@ func (mo *MockOrderer) parseEnvelope(ctx context.Context, envelope *cb.Envelope)
return payload, chdr, shdr, nil
}

func (mo *MockOrderer) StartCensoring(censorAfter int) {
mo.censorDataMode = true
mo.censorAfter = censorAfter
mo.StopDeliveryChannel = make(chan struct{})
}

func (mo *MockOrderer) StopCensoring() {
mo.censorDataMode = false
mo.censorAfter = math.MaxInt
close(mo.StopDeliveryChannel)
mo.StopDelivery = false
mo.StopDeliveryChannel = nil
}

func (mo *MockOrderer) Broadcast(server ab.AtomicBroadcast_BroadcastServer) error {
panic("implement me: Broadcast")
}
Expand All @@ -71,7 +85,7 @@ func (mo *MockOrderer) Deliver(server ab.AtomicBroadcast_DeliverServer) error {
mo.logger.Infof("Attempting to read seek info message from %s", addr)
envelope, err := server.Recv()
if err == io.EOF {
mo.logger.Debugf("Received EOF from %s, hangup", addr)
mo.logger.Infof("Received EOF from %s, hangup", addr)
return nil
}
if err != nil {
Expand All @@ -92,7 +106,7 @@ func (mo *MockOrderer) Deliver(server ab.AtomicBroadcast_DeliverServer) error {
return err
}

mo.logger.Debugf("Waiting for new SeekInfo from %s", addr)
mo.logger.Infof("Waiting for new SeekInfo from %s", addr)

return nil
}
Expand Down Expand Up @@ -120,41 +134,44 @@ func (mo *MockOrderer) deliverBlocks(
return cb.Status_BAD_REQUEST, nil
}

mo.logger.Infof("[%s] Received seekInfo.Start %+v", mo.address, seekInfo.Start)

mo.logger.Infof("[channel: %s] Received seekInfo (%p) %+v from %s", chdr.ChannelId, seekInfo, seekInfo, addr)
mo.logger.Infof("[channel: %s address: %s] Received seekInfo %+v [start: %v] [stop: %v] from %s", chdr.ChannelId, mo.address, seekInfo, seekInfo.Start, seekInfo.Stop, addr)

ledgerIdx := seekInfo.Start.GetSpecified().Number
number := uint64(1)
ledgerLastIdx := uint64(len(mo.ledgerArray) - 1)
var stopNum uint64
mo.logger.Infof("[channel: %s] Received seekInfo %+v from %s with ledgerIdx: [%d]", chdr.ChannelId, seekInfo, addr, ledgerIdx)
var startIdx uint64

switch start := seekInfo.Start.Type.(type) {
case *ab.SeekPosition_Oldest:
startIdx = uint64(1)
case *ab.SeekPosition_Newest:
startIdx = ledgerLastIdx
case *ab.SeekPosition_Specified:
startIdx = start.Specified.Number
}

var stopIdx uint64
switch stop := seekInfo.Stop.Type.(type) {
case *ab.SeekPosition_Oldest:
stopNum = number
stopIdx = startIdx
case *ab.SeekPosition_Newest:
// when seeking only the newest block (i.e. starting
// and stopping at newest), don't reevaluate the ledger
// height as this can lead to multiple blocks being
// sent when only one is expected
if proto.Equal(seekInfo.Start, seekInfo.Stop) {
stopNum = number
break
}
stopNum = ledgerLastIdx
stopIdx = ledgerLastIdx
case *ab.SeekPosition_Specified:
stopNum = stop.Specified.Number
if stopNum < number {
mo.logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
return cb.Status_BAD_REQUEST, nil
}
stopIdx = stop.Specified.Number
}

if stopIdx < startIdx {
mo.logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, startIdx, stopIdx)
return cb.Status_BAD_REQUEST, nil
}

ledgerIdx := startIdx
for {
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
if number > ledgerLastIdx {
mo.logger.Warningf("[channel: %s] Block %d not found, block number greater than chain length bounds", chdr.ChannelId, number)
if ledgerIdx > ledgerLastIdx {
mo.logger.Warningf("[channel: %s] Block %d not found, block number greater than chain length bounds", chdr.ChannelId, ledgerIdx)
return cb.Status_NOT_FOUND, nil
}
}
Expand Down Expand Up @@ -187,29 +204,23 @@ func (mo *MockOrderer) deliverBlocks(
return status, nil
}

number++
block2send := &cb.Block{
Header: block.Header,
Metadata: block.Metadata,
Data: block.Data,
}

if seekInfo.ContentType == ab.SeekInfo_HEADER_WITH_SIG && !protoutil.IsConfigBlock(block) {
mo.logger.Debugf("asked for header block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
mo.logger.Infof("asked for header block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
block2send.Data = nil
} else if mo.censorDataMode {
mo.logger.Debugf("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
if !mo.peerFirstTry {
mo.malicious = true
mo.logger.Debugf("malicious mode activated for [%s]", mo.address)
}
if mo.sentCount >= 5 && mo.malicious {
<-mo.stopDeliveryChannel
}
} else {
mo.logger.Debugf("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
mo.logger.Infof("asked for data block from [%s]; block num [%d]", mo.address, block2send.Header.Number)
if mo.censorDataMode && mo.sentCount >= mo.censorAfter {
mo.logger.Infof("censoring blocks from [%s], stopping to respond...", mo.address)
mo.StopDelivery = true
<-mo.StopDeliveryChannel
}
}
mo.peerFirstTry = true

blockResponse := &ab.DeliverResponse{
Type: &ab.DeliverResponse_Block{Block: block2send},
Expand All @@ -221,13 +232,14 @@ func (mo *MockOrderer) deliverBlocks(
return cb.Status_INTERNAL_SERVER_ERROR, err
}
mo.sentCount += 1
mo.logger.Warningf("[channel: %s, orderer: %s] Sent to %s block number %d", mo.address, chdr.ChannelId, addr, block2send.Header.Number)

if stopNum == block.Header.Number {
if stopIdx == block.Header.Number {
break
}
}

mo.logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
mo.logger.Infof("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)

return cb.Status_SUCCESS, nil
}
Expand All @@ -238,7 +250,6 @@ func NewMockOrderer(address string, ledgerArray []*cb.Block, options comm.Secure
}

logger := flogging.MustGetLogger("mockorderer")

grpcServer, err := comm.NewGRPCServer(address, sc)
if err != nil {
logger.Errorf("Error creating GRPC server: %s", err)
Expand All @@ -247,17 +258,21 @@ func NewMockOrderer(address string, ledgerArray []*cb.Block, options comm.Secure
mo := &MockOrderer{
address: address,
ledgerArray: ledgerArray,
logger: flogging.MustGetLogger("mockorderer"),
logger: logger,
grpcServer: grpcServer,
censorDataMode: false,
malicious: false,
peerFirstTry: false,
sentCount: 0,
censorAfter: math.MaxInt,
}

ab.RegisterAtomicBroadcastServer(grpcServer.Server(), mo)

go grpcServer.Start()
go func() {
err := grpcServer.Start()
if err != nil {
panic("Orderer mock failed to start")
}
}()

return mo, nil
}
Loading