Skip to content

Commit

Permalink
rpcs: simplify API for BlockService to handle multiple HTTP paths (al…
Browse files Browse the repository at this point in the history
…gorand#5718)

Co-authored-by: Pavel Zbitskiy <pavel@algorand.com>
  • Loading branch information
zeldovich and algorandskiy authored Jul 26, 2024
1 parent 04ec5f9 commit edda2ee
Show file tree
Hide file tree
Showing 14 changed files with 80 additions and 30 deletions.
7 changes: 7 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
b.rmux.Handle(path, handler)
}

func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if b.rmux == nil {
b.rmux = mux.NewRouter()
}
b.rmux.HandleFunc(path, handler)
}

func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
}

Expand Down
2 changes: 1 addition & 1 deletion catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
net := &httpTestPeerSource{}
ls := rpcs.MakeBlockService(logging.TestingLog(b), config.GetDefaultLocal(), remote, net, "test genesisID")
nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
22 changes: 11 additions & 11 deletions catchup/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestServiceFetchBlocksSameRange(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestSyncRound(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestPeriodicSync(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -379,7 +379,7 @@ func TestServiceFetchBlocksOneBlock(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -443,7 +443,7 @@ func TestAbruptWrites(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestServiceFetchBlocksMultiBlocks(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -555,7 +555,7 @@ func TestServiceFetchBlocksMalformed(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -709,7 +709,7 @@ func helperTestOnSwitchToUnSupportedProtocol(
ls := rpcs.MakeBlockService(logging.Base(), config, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -932,7 +932,7 @@ func TestCatchupUnmatchedCertificate(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -1064,7 +1064,7 @@ func TestServiceLedgerUnavailable(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down Expand Up @@ -1110,7 +1110,7 @@ func TestServiceNoBlockForRound(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, remote, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
2 changes: 1 addition & 1 deletion catchup/universalFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestUGetBlockHTTP(t *testing.T) {
ls := rpcs.MakeBlockService(logging.Base(), blockServiceConfig, ledger, net, "test genesisID")

nodeA := basicRPCNode{}
nodeA.RegisterHTTPHandler(rpcs.BlockServiceBlockPath, ls)
ls.RegisterHandlers(&nodeA)
nodeA.start()
defer nodeA.stop()
rootURL := nodeA.rootURL()
Expand Down
4 changes: 4 additions & 0 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func (network *MockNetwork) ClearProcessors() {
func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}

// RegisterHTTPHandlerFunc - empty implementation
func (network *MockNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
}

// OnNetworkAdvance - empty implementation
func (network *MockNetwork) OnNetworkAdvance() {}

Expand Down
3 changes: 2 additions & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ type GossipNode interface {
Disconnect(badnode DisconnectablePeer)
DisconnectPeers() // only used by testing

// RegisterHTTPHandler path accepts gorilla/mux path annotations
// RegisterHTTPHandler and RegisterHTTPHandlerFunc: path accepts gorilla/mux path annotations
RegisterHTTPHandler(path string, handler http.Handler)
RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request))

// RequestConnectOutgoing asks the system to actually connect to peers.
// `replace` optionally drops existing connections before making new ones.
Expand Down
6 changes: 6 additions & 0 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ func (n *HybridP2PNetwork) RegisterHTTPHandler(path string, handler http.Handler
n.wsNetwork.RegisterHTTPHandler(path, handler)
}

// RegisterHTTPHandlerFunc implements GossipNode
func (n *HybridP2PNetwork) RegisterHTTPHandlerFunc(path string, handlerFunc func(http.ResponseWriter, *http.Request)) {
n.p2pNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)
n.wsNetwork.RegisterHTTPHandlerFunc(path, handlerFunc)
}

// RequestConnectOutgoing implements GossipNode
func (n *HybridP2PNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {}

Expand Down
8 changes: 8 additions & 0 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func (s *HTTPServer) RegisterHTTPHandler(path string, handler http.Handler) {
})
}

// RegisterHTTPHandlerFunc registers a http handler with a given path.
func (s *HTTPServer) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
s.p2phttpMux.HandleFunc(path, handler)
s.p2phttpMuxRegistrarOnce.Do(func() {
s.Host.SetHTTPHandlerAtPath(algorandP2pHTTPProtocol, "/", s.p2phttpMux)
})
}

// MakeHTTPClient creates a http.Client that uses libp2p transport for a given protocol and peer address.
func MakeHTTPClient(addrInfo *peer.AddrInfo) (*http.Client, error) {
clientStreamHost, err := libp2p.New(libp2p.NoListenAddrs)
Expand Down
6 changes: 6 additions & 0 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,12 @@ func (n *P2PNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
n.httpServer.RegisterHTTPHandler(path, handler)
}

// RegisterHTTPHandlerFunc is like RegisterHTTPHandler but accepts
// a callback handler function instead of a method receiver.
func (n *P2PNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
n.httpServer.RegisterHTTPHandlerFunc(path, handler)
}

// RequestConnectOutgoing asks the system to actually connect to peers.
// `replace` optionally drops existing connections before making new ones.
// `quit` chan allows cancellation.
Expand Down
5 changes: 5 additions & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ func (wn *WebsocketNetwork) RegisterHTTPHandler(path string, handler http.Handle
wn.router.Handle(path, handler)
}

// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations
func (wn *WebsocketNetwork) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
wn.router.HandleFunc(path, handler)
}

// RequestConnectOutgoing tries to actually do the connect to new peers.
// `replace` drop all connections first and find new peers.
func (wn *WebsocketNetwork) RequestConnectOutgoing(replace bool, quit <-chan struct{}) {
Expand Down
17 changes: 11 additions & 6 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const blockServerCatchupRequestBufferSize = 10
const BlockResponseLatestRoundHeader = "X-Latest-Round"

// BlockServiceBlockPath is the path to register BlockService as a handler for when using gorilla/mux
// e.g. .Handle(BlockServiceBlockPath, &ls)
// e.g. .HandleFunc(BlockServiceBlockPath, ls.ServeBlockPath)
const BlockServiceBlockPath = "/v{version:[0-9.]+}/{genesisID}/block/{round:[0-9a-z]+}"

// Constant strings used as keys for topics
Expand Down Expand Up @@ -147,11 +147,16 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB
memoryCap: config.BlockServiceMemCap,
}
if service.enableService {
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
service.RegisterHandlers(net)
}
return service
}

// RegisterHandlers registers the request handlers for BlockService's paths with the registrar.
func (bs *BlockService) RegisterHandlers(registrar Registrar) {
registrar.RegisterHTTPHandlerFunc(BlockServiceBlockPath, bs.ServeBlockPath)
}

// Start listening to catchup requests over ws
func (bs *BlockService) Start() {
bs.mu.Lock()
Expand Down Expand Up @@ -179,10 +184,10 @@ func (bs *BlockService) Stop() {
bs.closeWaitGroup.Wait()
}

// ServerHTTP returns blocks
// ServeBlockPath returns blocks
// Either /v{version}/{genesisID}/block/{round} or ?b={round}&v={version}
// Uses gorilla/mux for path argument parsing.
func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Request) {
func (bs *BlockService) ServeBlockPath(response http.ResponseWriter, request *http.Request) {
pathVars := mux.Vars(request)
versionStr, hasVersionStr := pathVars["version"]
roundStr, hasRoundStr := pathVars["round"]
Expand Down Expand Up @@ -260,13 +265,13 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
if !ok {
response.Header().Set("Retry-After", blockResponseRetryAfter)
response.WriteHeader(http.StatusServiceUnavailable)
bs.log.Debugf("ServeHTTP: returned retry-after: %v", err)
bs.log.Debugf("ServeBlockPath: returned retry-after: %v", err)
}
httpBlockMessagesDroppedCounter.Inc(nil)
return
default:
// unexpected error.
bs.log.Warnf("ServeHTTP : failed to retrieve block %d %v", round, err)
bs.log.Warnf("ServeBlockPath: failed to retrieve block %d %v", round, err)
response.WriteHeader(http.StatusInternalServerError)
return
}
Expand Down
19 changes: 9 additions & 10 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ func TestRedirectFallbackEndpoints(t *testing.T) {
bs1 := MakeBlockService(log, config, ledger1, net1, "test-genesis-ID")
bs2 := MakeBlockService(log, config, ledger2, net2, "test-genesis-ID")

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestBlockServiceShutdown(t *testing.T) {

nodeA := &basicRPCNode{}

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
bs1.RegisterHandlers(nodeA)
nodeA.start()
defer nodeA.stop()

Expand Down Expand Up @@ -292,9 +292,8 @@ func TestRedirectOnFullCapacity(t *testing.T) {
bs1.memoryCap = 250
bs2.memoryCap = 250

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)

nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down Expand Up @@ -371,11 +370,11 @@ forloop:

// First node redirects, does not return retry
require.True(t, strings.Contains(logBuffer1.String(), "redirectRequest: redirected block request to"))
require.False(t, strings.Contains(logBuffer1.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
require.False(t, strings.Contains(logBuffer1.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))

// Second node cannot redirect, it returns retry-after when over capacity
require.False(t, strings.Contains(logBuffer2.String(), "redirectRequest: redirected block request to"))
require.True(t, strings.Contains(logBuffer2.String(), "ServeHTTP: returned retry-after: block service memory over capacity"))
require.True(t, strings.Contains(logBuffer2.String(), "ServeBlockPath: returned retry-after: block service memory over capacity"))
}

// TestWsBlockLimiting ensures that limits are applied correctly on the websocket side of the service
Expand Down Expand Up @@ -474,8 +473,8 @@ func TestRedirectExceptions(t *testing.T) {
bs1 := MakeBlockService(log1, configInvalidRedirects, ledger1, net1, "{genesisID}")
bs2 := MakeBlockService(log2, configWithRedirectToSelf, ledger2, net2, "{genesisID}")

nodeA.RegisterHTTPHandler(BlockServiceBlockPath, bs1)
nodeB.RegisterHTTPHandler(BlockServiceBlockPath, bs2)
bs1.RegisterHandlers(nodeA)
bs2.RegisterHandlers(nodeB)

parsedURL, err := addr.ParseHostOrURL(nodeA.rootURL())
require.NoError(t, err)
Expand Down
2 changes: 2 additions & 0 deletions rpcs/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
type Registrar interface {
// RegisterHTTPHandler path accepts gorilla/mux path annotations
RegisterHTTPHandler(path string, handler http.Handler)
// RegisterHTTPHandlerFunc path accepts gorilla/mux path annotations and a HandlerFunc
RegisterHTTPHandlerFunc(path string, handler func(response http.ResponseWriter, request *http.Request))
// RegisterHandlers exposes global websocket handler registration
RegisterHandlers(dispatch []network.TaggedMessageHandler)
}
7 changes: 7 additions & 0 deletions rpcs/txService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ func (b *basicRPCNode) RegisterHTTPHandler(path string, handler http.Handler) {
b.rmux.Handle(path, handler)
}

func (b *basicRPCNode) RegisterHTTPHandlerFunc(path string, handler func(http.ResponseWriter, *http.Request)) {
if b.rmux == nil {
b.rmux = mux.NewRouter()
}
b.rmux.HandleFunc(path, handler)
}

func (b *basicRPCNode) RegisterHandlers(dispatch []network.TaggedMessageHandler) {
}

Expand Down

0 comments on commit edda2ee

Please sign in to comment.