Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Add WireTap interface #444

Merged
merged 2 commits into from
Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ type Bitswap struct {
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// External statistics interface
wiretap WireTap

// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager

Expand Down Expand Up @@ -419,6 +422,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

if bs.wiretap != nil {
bs.wiretap.MessageReceived(p, incoming)
}

iblocks := incoming.Blocks()

if len(iblocks) > 0 {
Expand Down
144 changes: 143 additions & 1 deletion bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
decision "github.com/ipfs/go-bitswap/internal/decision"
bssession "github.com/ipfs/go-bitswap/internal/session"
"github.com/ipfs/go-bitswap/message"
bsmsg "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
Expand Down Expand Up @@ -468,7 +470,6 @@ func TestBasicBitswap(t *testing.T) {
if err != nil {
t.Fatal(err)
}

st1, err := instances[1].Exchange.Stat()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -860,3 +861,144 @@ func TestWithScoreLedger(t *testing.T) {
t.Fatal("Expected the score ledger to be closed within 5s")
}
}

type logItem struct {
dir byte
pid peer.ID
msg bsmsg.BitSwapMessage
}
type mockWireTap struct {
log []logItem
}

func (m *mockWireTap) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) {
m.log = append(m.log, logItem{'r', p, msg})
}
func (m *mockWireTap) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) {
m.log = append(m.log, logItem{'s', p, msg})
}

func TestWireTap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

instances := ig.Instances(3)
blocks := bg.Blocks(2)

// Install WireTap
wiretap := new(mockWireTap)
bitswap.EnableWireTap(wiretap)(instances[0].Exchange)

// First peer has block
err := instances[0].Exchange.HasBlock(blocks[0])
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

// Second peer broadcasts want for block CID
// (Received by first and third peers)
_, err = instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
if err != nil {
t.Fatal(err)
}

// When second peer receives block, it should send out a cancel, so third
// peer should no longer keep second peer's want
if err = tu.WaitFor(ctx, func() error {
if len(instances[2].Exchange.WantlistForPeer(instances[1].Peer)) != 0 {
return fmt.Errorf("should have no items in other peers wantlist")
}
if len(instances[1].Exchange.GetWantlist()) != 0 {
return fmt.Errorf("shouldnt have anything in wantlist")
}
return nil
}); err != nil {
t.Fatal(err)
}

// After communication, 3 messages should be logged via WireTap
if l := len(wiretap.log); l != 3 {
t.Fatal("expected 3 items logged via WireTap, found", l)
}

// Received: 'Have'
if wiretap.log[0].dir != 'r' {
t.Error("expected message to be received")
}
if wiretap.log[0].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[0].pid)
}
if l := len(wiretap.log[0].msg.Wantlist()); l != 1 {
t.Fatal("expected 1 entry in Wantlist, found", l)
}
if wiretap.log[0].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Have {
t.Error("expected WantType equal to 'Have', found 'Block'")
}

// Sent: Block
if wiretap.log[1].dir != 's' {
t.Error("expected message to be sent")
}
if wiretap.log[1].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[1].pid)
}
if l := len(wiretap.log[1].msg.Blocks()); l != 1 {
t.Fatal("expected 1 entry in Blocks, found", l)
}
if wiretap.log[1].msg.Blocks()[0].Cid() != blocks[0].Cid() {
t.Error("wrong block Cid")
}

// Received: 'Cancel'
if wiretap.log[2].dir != 'r' {
t.Error("expected message to be received")
}
if wiretap.log[2].pid != instances[1].Peer {
t.Error("expected peer", instances[1].Peer, ", found", wiretap.log[2].pid)
}
if l := len(wiretap.log[2].msg.Wantlist()); l != 1 {
t.Fatal("expected 1 entry in Wantlist, found", l)
}
if wiretap.log[2].msg.Wantlist()[0].WantType != pb.Message_Wantlist_Block {
t.Error("expected WantType equal to 'Block', found 'Have'")
}
if wiretap.log[2].msg.Wantlist()[0].Cancel != true {
t.Error("expected entry with Cancel set to 'true'")
}

// After disabling WireTap, no new messages are logged
bitswap.DisableWireTap()(instances[0].Exchange)

err = instances[0].Exchange.HasBlock(blocks[1])
if err != nil {
t.Fatal(err)
}
_, err = instances[1].Exchange.GetBlock(ctx, blocks[1].Cid())
if err != nil {
t.Fatal(err)
}
if err = tu.WaitFor(ctx, func() error {
if len(instances[1].Exchange.GetWantlist()) != 0 {
return fmt.Errorf("shouldnt have anything in wantlist")
}
return nil
}); err != nil {
t.Fatal(err)
}

if l := len(wiretap.log); l != 3 {
t.Fatal("expected 3 items logged via WireTap, found", l)
}

for _, inst := range instances {
err := inst.Exchange.Close()
if err != nil {
t.Fatal(err)
}
}
}
27 changes: 27 additions & 0 deletions wiretap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package bitswap

import (
bsmsg "github.com/ipfs/go-bitswap/message"
peer "github.com/libp2p/go-libp2p-core/peer"
)

// WireTap provides methods to access all messages sent and received by Bitswap.
// This interface can be used to implement various statistics (this is original intent).
type WireTap interface {
MessageReceived(peer.ID, bsmsg.BitSwapMessage)
MessageSent(peer.ID, bsmsg.BitSwapMessage)
}

// Configures Bitswap to use given wiretap.
func EnableWireTap(tap WireTap) Option {
return func(bs *Bitswap) {
bs.wiretap = tap
}
}

// Configures Bitswap not to use any wiretap.
func DisableWireTap() Option {
return func(bs *Bitswap) {
bs.wiretap = nil
}
}
3 changes: 3 additions & 0 deletions workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
// Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer.
bs.engine.MessageSent(envelope.Peer, envelope.Message)
if bs.wiretap != nil {
bs.wiretap.MessageSent(envelope.Peer, envelope.Message)
}
bs.sendBlocks(ctx, envelope)
case <-ctx.Done():
return
Expand Down