Skip to content

Commit

Permalink
fix: default to bitswap-client-only
Browse files Browse the repository at this point in the history
This restores bitswap setup we had in v1.0.0 where only the client
is initialized by default.
lidel authored and hacdias committed May 15, 2024
1 parent ad18bc8 commit 0b01c13
Showing 5 changed files with 134 additions and 107 deletions.
2 changes: 1 addition & 1 deletion docs/environment-variables.md
Original file line number Diff line number Diff line change
@@ -145,7 +145,7 @@ Default: 100
### `RAINBOW_PEERING_SHARED_CACHE`

> [!WARNING]
> Experimental feature.
> Experimental feature, will result in increased network I/O due to Bitswap server being run in addition to the lean client.
Enable sharing of local cache to peers safe-listed with `RAINBOW_PEERING`
or `RAINBOW_SEED_PEERING`.
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -234,7 +234,7 @@ Generate an identity seed and launch a gateway:
Name: "peering-shared-cache",
Value: false,
EnvVars: []string{"RAINBOW_PEERING_SHARED_CACHE"},
Usage: "Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
Usage: "(EXPERIMENTAL: increased network I/O) Enable sharing of local cache to peers safe-listed with --peering. Rainbow will respond to Bitswap queries from these peers, serving locally cached data as needed.",
},
&cli.StringFlag{
Name: "blockstore",
@@ -360,7 +360,7 @@ share the same seed as long as the indexes are different.
IpnsMaxCacheTTL: cctx.Duration("ipns-max-cache-ttl"),
DenylistSubs: cctx.StringSlice("denylists"),
Peering: peeringAddrs,
PeeringCache: cctx.Bool("peering-shared-cache"),
PeeringSharedCache: cctx.Bool("peering-shared-cache"),
Seed: seed,
SeedIndex: index,
SeedPeering: cctx.Bool("seed-peering"),
109 changes: 7 additions & 102 deletions setup.go
Original file line number Diff line number Diff line change
@@ -14,11 +14,6 @@ import (
"github.com/dgraph-io/badger/v4/options"
nopfs "github.com/ipfs-shipyard/nopfs"
nopfsipfs "github.com/ipfs-shipyard/nopfs/ipfs"
"github.com/ipfs/boxo/bitswap"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockservice"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
@@ -27,13 +22,9 @@ import (
"github.com/ipfs/boxo/namesys"
"github.com/ipfs/boxo/path/resolver"
"github.com/ipfs/boxo/peering"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
badger4 "github.com/ipfs/go-ds-badger4"
flatfs "github.com/ipfs/go-ds-flatfs"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
mprome "github.com/ipfs/go-metrics-prometheus"
"github.com/ipfs/go-unixfsnode"
dagpb "github.com/ipld/go-codec-dagpb"
@@ -76,7 +67,7 @@ type Node struct {
dataDir string
datastore datastore.Batching
blockstore blockstore.Blockstore
bs *bitswap.Bitswap
exchange exchange.Interface
bsrv blockservice.BlockService
resolver resolver.Resolver
ns namesys.NameSystem
@@ -107,8 +98,9 @@ type Config struct {
IpnsMaxCacheTTL time.Duration

DenylistSubs []string
Peering []peer.AddrInfo
PeeringCache bool

Peering []peer.AddrInfo
PeeringSharedCache bool

Seed string
SeedIndex int
@@ -208,7 +200,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bswap := setupBitswap(ctx, cfg, h, cr, blkst)
bswap := setupBitswapExchange(ctx, cfg, h, cr, blkst)

err = os.Mkdir(filepath.Join(cfg.DataDir, "denylists"), 0755)
if err != nil && !errors.Is(err, fs.ErrExist) {
@@ -233,7 +225,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
return nil, err
}

bsrv := blockservice.New(blkst, &noNotifyExchange{bswap},
bsrv := blockservice.New(blkst, bswap,
// if we are doing things right, our bitswap wantlists should
// not have blocks that we already have (see
// https://github.com/ipfs/boxo/blob/e0d4b3e9b91e9904066a10278e366c9a6d9645c7/blockservice/blockservice.go#L272). Thus
@@ -269,7 +261,7 @@ func Setup(ctx context.Context, cfg Config, key crypto.PrivKey, dnsCache *cached
blockstore: blkst,
dataDir: cfg.DataDir,
datastore: ds,
bs: bswap,
exchange: bswap,
ns: ns,
vs: vs,
bsrv: bsrv,
@@ -398,90 +390,3 @@ func setupPeering(cfg Config, h host.Host) error {

return nil
}

func setupBitswap(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) *bitswap.Bitswap {
var (
peerBlockRequestFilter bsserver.PeerBlockRequestFilter
)
if cfg.PeeringCache && len(cfg.Peering) > 0 {
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}

peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}
} else {
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
return false
}
}

bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
// default is 1 minute to search for a random live-want (1
// CID). I think we want to search for random live-wants more
// often although probably it overlaps with general
// rebroadcasts.
bitswap.RebroadcastDelay(delay.Fixed(10*time.Second)),
// ProviderSearchDelay: default is 1 second.
bitswap.ProviderSearchDelay(time.Second),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)

return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
122 changes: 122 additions & 0 deletions setup_bitswap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"time"

"github.com/ipfs/boxo/bitswap"
bsclient "github.com/ipfs/boxo/bitswap/client"
wl "github.com/ipfs/boxo/bitswap/client/wantlist"
bsmspb "github.com/ipfs/boxo/bitswap/message/pb"
bsnet "github.com/ipfs/boxo/bitswap/network"
bsserver "github.com/ipfs/boxo/bitswap/server"
"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/exchange"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
metri "github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

func setupBitswapExchange(ctx context.Context, cfg Config, h host.Host, cr routing.ContentRouting, bstore blockstore.Blockstore) exchange.Interface {
bsctx := metri.CtxScope(ctx, "ipfs_bitswap")
bn := bsnet.NewFromIpfsHost(h, cr)

// --- Client Options
// bitswap.RebroadcastDelay: default is 1 minute to search for a random
// live-want (1 CID). I think we want to search for random live-wants more
// often although probably it overlaps with general rebroadcasts.
rebroadcastDelay := delay.Fixed(10 * time.Second)
// bitswap.ProviderSearchDelay: default is 1 second.
providerSearchDelay := 1 * time.Second

// If peering and shared cache are both enabled, we initialize both a
// Client and a Server with custom request filter and custom options.
// client+server is more expensive but necessary when deployment requires
// serving cached blocks to safelisted peerids
if cfg.PeeringSharedCache && len(cfg.Peering) > 0 {
var peerBlockRequestFilter bsserver.PeerBlockRequestFilter

// Set up request filter to only respond to request for safelisted (peered) nodes
peers := make(map[peer.ID]struct{}, len(cfg.Peering))
for _, a := range cfg.Peering {
peers[a.ID] = struct{}{}
}
peerBlockRequestFilter = func(p peer.ID, c cid.Cid) bool {
_, ok := peers[p]
return ok
}

// Initialize client+server
bswap := bitswap.New(bsctx, bn, bstore,
// --- Client Options
bitswap.RebroadcastDelay(rebroadcastDelay),
bitswap.ProviderSearchDelay(providerSearchDelay),
bitswap.WithoutDuplicatedBlockStats(),

// ---- Server Options
bitswap.WithPeerBlockRequestFilter(peerBlockRequestFilter),
bitswap.ProvideEnabled(false),
// Do not keep track of other peer's wantlists, we only want to reply if we
// have a block. If we get it later, it's no longer relevant.
bitswap.WithPeerLedger(&noopPeerLedger{}),
// When we don't have a block, don't reply. This reduces processment.
bitswap.SetSendDontHaves(false),
)
bn.Start(bswap)
return &noNotifyExchange{bswap}
}

// By default, rainbow runs with bitswap client alone
bswap := bsclient.New(bsctx, bn, bstore,
// --- Client Options
bsclient.RebroadcastDelay(rebroadcastDelay),
bsclient.ProviderSearchDelay(providerSearchDelay),
bsclient.WithoutDuplicatedBlockStats(),
)
bn.Start(bswap)
return bswap
}

type noopPeerLedger struct{}

func (*noopPeerLedger) Wants(p peer.ID, e wl.Entry) {}

func (*noopPeerLedger) CancelWant(p peer.ID, k cid.Cid) bool {
return false
}

func (*noopPeerLedger) CancelWantWithType(p peer.ID, k cid.Cid, typ bsmspb.Message_Wantlist_WantType) {
}

func (*noopPeerLedger) Peers(k cid.Cid) []bsserver.PeerEntry {
return nil
}

func (*noopPeerLedger) CollectPeerIDs() []peer.ID {
return nil
}

func (*noopPeerLedger) WantlistSizeForPeer(p peer.ID) int {
return 0
}

func (*noopPeerLedger) WantlistForPeer(p peer.ID) []wl.Entry {
return nil
}

func (*noopPeerLedger) ClearPeerWantlist(p peer.ID) {}

func (*noopPeerLedger) PeerDisconnected(p peer.ID) {}

type noNotifyExchange struct {
exchange.Interface
}

func (e *noNotifyExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// Rainbow does not notify when we get new blocks in our Blockservice.
return nil
}
4 changes: 2 additions & 2 deletions setup_test.go
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ func mustPeeredNodes(t *testing.T, configuration [][]int, peeringShareCache bool
RoutingV1Endpoints: []string{},
ListenAddrs: []string{mas[i].String()},
Peering: []peer.AddrInfo{},
PeeringCache: peeringShareCache,
PeeringSharedCache: peeringShareCache,
}

for _, j := range configuration[i] {
@@ -118,7 +118,7 @@ func TestPeering(t *testing.T) {
}, false)
}

func TestPeeringCache(t *testing.T) {
func TestPeeringSharedCache(t *testing.T) {
nodes := mustPeeredNodes(t, [][]int{
{1}, // 0 peered to 1
{0}, // 1 peered to 0

0 comments on commit 0b01c13

Please sign in to comment.