Skip to content

Commit

Permalink
network: p2p traffic exchange for algorand node (algorand#5939)
Browse files Browse the repository at this point in the history
Functional p2p support for gossip network:
* DHT advertisement and peers capabilities
* Hybrid networking when a node handles both legacy websocket and libp2p networks
* HTTP over p2p support for catching up from p2p nodes
* P2P network cluster test scenarios
* libp2p and DHT metrics and logging handling

Co-authored-by: Eric Warehime <eric.warehime@gmail.com>
Co-authored-by: cce <51567+cce@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 28, 2024
1 parent 63c0d5b commit c8407ab
Show file tree
Hide file tree
Showing 114 changed files with 7,391 additions and 1,032 deletions.
21 changes: 14 additions & 7 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,16 @@ type NetworkFacade struct {
rand *rand.Rand
timeoutAtInitOnce sync.Once
timeoutAtInitWait sync.WaitGroup
peerToNode map[network.Peer]int
peerToNode map[*facadePeer]int
}

type facadePeer struct {
id int
net network.GossipNode
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
n := &NetworkFacade{
Expand All @@ -83,12 +90,12 @@ func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
eventsQueues: make(map[string]int),
eventsQueuesCh: make(chan int, 1000),
rand: rand.New(rand.NewSource(int64(nodeID))),
peerToNode: make(map[network.Peer]int, fuzzer.nodesCount),
peerToNode: make(map[*facadePeer]int, fuzzer.nodesCount),
debugMessages: false,
}
n.timeoutAtInitWait.Add(1)
for i := 0; i < fuzzer.nodesCount; i++ {
n.peerToNode[network.Peer(new(int))] = i
n.peerToNode[&facadePeer{id: i, net: n}] = i
}
return n
}
Expand Down Expand Up @@ -179,7 +186,7 @@ func (n *NetworkFacade) WaitForEventsQueue(cleared bool) {
func (n *NetworkFacade) Broadcast(ctx context.Context, tag protocol.Tag, data []byte, wait bool, exclude network.Peer) error {
excludeNode := -1
if exclude != nil {
excludeNode = n.peerToNode[exclude]
excludeNode = n.peerToNode[exclude.(*facadePeer)]
}
return n.broadcast(tag, data, excludeNode, "NetworkFacade service-%v Broadcast %v %v\n")
}
Expand Down Expand Up @@ -240,7 +247,7 @@ func (n *NetworkFacade) PushDownstreamMessage(newMsg context.CancelFunc) bool {
func (n *NetworkFacade) Address() (string, bool) { return "mock network", true }

// Start - unused function
func (n *NetworkFacade) Start() {}
func (n *NetworkFacade) Start() error { return nil }

// Stop - unused function
func (n *NetworkFacade) Stop() {}
Expand Down Expand Up @@ -341,8 +348,8 @@ func (n *NetworkFacade) ReceiveMessage(sourceNode int, tag protocol.Tag, data []
n.pushPendingReceivedMessage()
}

func (n *NetworkFacade) Disconnect(sender network.Peer) {
sourceNode := n.peerToNode[sender]
func (n *NetworkFacade) Disconnect(sender network.DisconnectablePeer) {
sourceNode := n.peerToNode[sender.(*facadePeer)]
n.fuzzer.Disconnect(n.nodeID, sourceNode)
}

Expand Down
9 changes: 4 additions & 5 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package gossip

import (
"context"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -136,7 +135,7 @@ func (w *whiteholeNetwork) Relay(ctx context.Context, tag protocol.Tag, data []b
func (w *whiteholeNetwork) BroadcastSimple(tag protocol.Tag, data []byte) error {
return w.Broadcast(context.Background(), tag, data, true, nil)
}
func (w *whiteholeNetwork) Disconnect(badnode network.Peer) {
func (w *whiteholeNetwork) Disconnect(badnode network.DisconnectablePeer) {
return
}
func (w *whiteholeNetwork) DisconnectPeers() {
Expand All @@ -156,11 +155,11 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee
}
func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) {
return nil
}

func (w *whiteholeNetwork) Start() {
func (w *whiteholeNetwork) Start() error {
w.quit = make(chan struct{})
go func(w *whiteholeNetwork) {
w.domain.messagesMu.Lock()
Expand Down Expand Up @@ -216,7 +215,7 @@ func (w *whiteholeNetwork) Start() {
atomic.AddUint32(&w.lastMsgRead, 1)
}
}(w)
return
return nil
}
func (w *whiteholeNetwork) getMux() *network.Multiplexer {
return w.mux
Expand Down
18 changes: 12 additions & 6 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"net"
"net/http"
"net/url"
"strings"
"testing"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -173,8 +172,8 @@ func (b *basicRPCNode) GetPeers(options ...network.PeerOption) []network.Peer {
return b.peers
}

func (b *basicRPCNode) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (b *basicRPCNode) GetGenesisID() string {
return "test genesisID"
}

type httpTestPeerSource struct {
Expand All @@ -191,8 +190,8 @@ func (s *httpTestPeerSource) RegisterHandlers(dispatch []network.TaggedMessageHa
s.dispatchHandlers = append(s.dispatchHandlers, dispatch...)
}

func (s *httpTestPeerSource) SubstituteGenesisID(rawURL string) string {
return strings.Replace(rawURL, "{genesisID}", "test genesisID", -1)
func (s *httpTestPeerSource) GetGenesisID() string {
return "test genesisID"
}

// implement network.HTTPPeer
Expand All @@ -201,8 +200,13 @@ type testHTTPPeer string
func (p *testHTTPPeer) GetAddress() string {
return string(*p)
}

func (p *testHTTPPeer) GetHTTPClient() *http.Client {
return &http.Client{}
return &http.Client{
Transport: &network.HTTPPAddressBoundTransport{
Addr: p.GetAddress(),
InnerTransport: http.DefaultTransport},
}
}
func (p *testHTTPPeer) GetHTTPPeer() network.HTTPPeer {
return p
Expand Down Expand Up @@ -238,6 +242,8 @@ func (p *testUnicastPeer) GetAddress() string {
return "test"
}

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
9 changes: 1 addition & 8 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"io"
"net/http"
"path"
"strconv"
"time"

Expand Down Expand Up @@ -74,13 +73,7 @@ func makeLedgerFetcher(net network.GossipNode, accessor ledger.CatchpointCatchup
}

func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPeer, round basics.Round, method string) (*http.Response, error) {
parsedURL, err := network.ParseHostOrURL(peer.GetAddress())
if err != nil {
return nil, err
}

parsedURL.Path = lf.net.SubstituteGenesisID(path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL := parsedURL.String()
ledgerURL := network.SubstituteGenesisID(lf.net, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36))
lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer)
request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil)
if err != nil {
Expand Down
56 changes: 50 additions & 6 deletions catchup/ledgerFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catchup

import (
"archive/tar"
"context"
"fmt"
"net"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/logging"
p2ptesting "github.com/algorand/go-algorand/network/p2p/testing"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand Down Expand Up @@ -125,7 +128,7 @@ func TestLedgerFetcherErrorResponseHandling(t *testing.T) {
}
}

func TestLedgerFetcherHeadLedger(t *testing.T) {
func TestLedgerFetcher(t *testing.T) {
partitiontest.PartitionTest(t)

// create a dummy server.
Expand All @@ -136,16 +139,19 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
listener, err := net.Listen("tcp", "localhost:")

var httpServerResponse = 0
var contentTypes = make([]string, 0)
require.NoError(t, err)
go s.Serve(listener)
defer s.Close()
defer listener.Close()
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
for _, contentType := range contentTypes {
w.Header().Add("Content-Type", contentType)
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
w.WriteHeader(httpServerResponse)
})
successPeer := testHTTPPeer(listener.Addr().String())
lf := makeLedgerFetcher(&mocks.MockNetwork{}, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())
Expand All @@ -157,7 +163,7 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
// headLedger parseURL failure
parseFailurePeer := testHTTPPeer("foobar")
err = lf.headLedger(context.Background(), &parseFailurePeer, basics.Round(0))
require.Equal(t, fmt.Errorf("could not parse a host from url"), err)
require.ErrorContains(t, err, "could not parse a host from url")

// headLedger 404 response
httpServerResponse = http.StatusNotFound
Expand All @@ -169,8 +175,46 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

// headLedger 500 response
httpServerResponse = http.StatusInternalServerError
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.Equal(t, fmt.Errorf("headLedger error response status code %d", http.StatusInternalServerError), err)
}

func TestLedgerFetcherP2P(t *testing.T) {
partitiontest.PartitionTest(t)

mux := http.NewServeMux()
nodeA := p2ptesting.MakeHTTPNode(t)
nodeA.RegisterHTTPHandler("/v1/ledger/0", mux)
var httpServerResponse = 0
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
})

nodeA.Start()
defer nodeA.Stop()

successPeer := nodeA.GetHTTPPeer()
lf := makeLedgerFetcher(nodeA, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())

// headLedger 200 response
httpServerResponse = http.StatusOK
err := lf.headLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)
}
7 changes: 1 addition & 6 deletions catchup/universalFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,8 @@ type HTTPFetcher struct {
// getBlockBytes gets a block.
// Core piece of FetcherClient interface
func (hf *HTTPFetcher) getBlockBytes(ctx context.Context, r basics.Round) (data []byte, err error) {
parsedURL, err := network.ParseHostOrURL(hf.rootURL)
if err != nil {
return nil, err
}
blockURL := rpcs.FormatBlockQuery(uint64(r), "", hf.net)

parsedURL.Path = rpcs.FormatBlockQuery(uint64(r), parsedURL.Path, hf.net)
blockURL := parsedURL.String()
hf.log.Debugf("block GET %#v peer %#v %T", blockURL, hf.peer, hf.peer)
request, err := http.NewRequest("GET", blockURL, nil)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/algorand/go-algorand/data/bookkeeping"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/network/addr"
"github.com/algorand/go-algorand/protocol"
toolsnet "github.com/algorand/go-algorand/tools/network"
"github.com/algorand/go-algorand/util"
Expand Down Expand Up @@ -282,7 +282,7 @@ func run() int {

// make sure that the format of each entry is valid:
for idx, peer := range peerOverrideArray {
addr, addrErr := network.ParseHostOrURLOrMultiaddr(peer)
addr, addrErr := addr.ParseHostOrURLOrMultiaddr(peer)
if addrErr != nil {
fmt.Fprintf(os.Stderr, "Provided command line parameter '%s' is not a valid host:port pair\n", peer)
return 1
Expand Down
4 changes: 2 additions & 2 deletions cmd/goal/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/algorand/go-algorand/daemon/algod/api/server/v2/generated/model"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/libgoal"
"github.com/algorand/go-algorand/network"
naddr "github.com/algorand/go-algorand/network/addr"
"github.com/algorand/go-algorand/nodecontrol"
"github.com/algorand/go-algorand/util"
"github.com/algorand/go-algorand/util/tokens"
Expand Down Expand Up @@ -751,7 +751,7 @@ func verifyPeerDialArg() bool {

// make sure that the format of each entry is valid:
for _, peer := range strings.Split(peerDial, ";") {
_, err := network.ParseHostOrURLOrMultiaddr(peer)
_, err := naddr.ParseHostOrURLOrMultiaddr(peer)
if err != nil {
reportErrorf("Provided peer '%s' is not a valid peer address : %v", peer, err)
return false
Expand Down
Loading

0 comments on commit c8407ab

Please sign in to comment.