Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network:ws block byte limiter #5472

Merged
merged 30 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
17c532f
Base implementation, no testing
iansuvak Jun 6, 2023
80d71d4
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 13, 2023
413a2bd
Make existing tests pass
iansuvak Jun 13, 2023
c0bf235
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 16, 2023
503c439
proposl: count memory in blockServer, use callback to decrement mem c…
algonautshant Jun 16, 2023
2f43c59
Merge pull request #4 from algonautshant/shant/suggestion/ian/ws-bloc…
iansuvak Jun 17, 2023
e6c7908
rename CallWhenDone to Callback
iansuvak Jun 20, 2023
a6716ea
encoding: Update go-codec version. (#5471)
winder Jun 16, 2023
3529573
Chore: Use strings.Cut for clarity (#5474)
jannotti Jun 16, 2023
ff1cdfe
network: improve MsgOfInterest message handling (#5476)
cce Jun 16, 2023
68664af
ledger: refactor store module interfaces before kv impl merge (#5451)
icorderi Jun 16, 2023
d2d8f9d
tools: replace upload_metrics (#5470)
shiqizng Jun 16, 2023
5dcf685
Unify ws and http limits to block_service.go
iansuvak Jun 21, 2023
bd0ee61
call after succeeding or failing instead of before
iansuvak Jun 21, 2023
b1b8277
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 21, 2023
42c34fb
noop test skeleton
iansuvak Jun 22, 2023
77771ed
Almost working network test
iansuvak Jun 22, 2023
83c8bea
network side test
iansuvak Jun 23, 2023
977ce11
use locking methods to create responsechannels
iansuvak Jun 23, 2023
5aa8e33
fix blockservice test
iansuvak Jun 23, 2023
f623eec
Merge remote-tracking branch 'upstream/master' into ws-block-byte-lim…
iansuvak Jun 26, 2023
46d9d53
add callbacks to shutdown cases
iansuvak Jun 26, 2023
72ec677
add another test to confirm draining explicitly
iansuvak Jun 26, 2023
84bb544
Change callback to OnMessageRelease()
iansuvak Jun 27, 2023
62414ed
Use a separate counter for memory
iansuvak Jun 27, 2023
fc1306b
move onRelease to OutgoingMessage
iansuvak Jun 29, 2023
6ba239e
add counters for both http and ws and don't share memory information …
iansuvak Jun 29, 2023
18f3869
remove the now unused network side metric
iansuvak Jun 29, 2023
e1b6411
fix reviewdog issue
iansuvak Jun 29, 2023
254ec5c
fix failing test
iansuvak Jun 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,15 @@ func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics
}
}

func (p *testUnicastPeer) Respond(ctx context.Context, reqMsg network.IncomingMessage, responseTopics network.Topics) (e error) {
func (p *testUnicastPeer) Respond(ctx context.Context, reqMsg network.IncomingMessage, outMsg network.OutgoingMessage) (e error) {

hashKey := uint64(0)
channel, found := p.responseChannels[hashKey]
if !found {
}

select {
case channel <- &network.Response{Topics: responseTopics}:
case channel <- &network.Response{Topics: outMsg.Topics}:
default:
}

Expand Down
2 changes: 1 addition & 1 deletion catchup/peerSelector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (d *mockUnicastPeer) Version() string {
func (d *mockUnicastPeer) Request(ctx context.Context, tag network.Tag, topics network.Topics) (resp *network.Response, e error) {
return nil, nil
}
func (d *mockUnicastPeer) Respond(ctx context.Context, reqMsg network.IncomingMessage, topics network.Topics) (e error) {
func (d *mockUnicastPeer) Respond(ctx context.Context, reqMsg network.IncomingMessage, outMsg network.OutgoingMessage) (e error) {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions config/localTemplate.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,9 +517,9 @@ type Local struct {
// only relevant if TxIncomingFilteringFlags is non-zero
TxIncomingFilterMaxSize uint64 `version[28]:"500000"`

// BlockServiceHTTPMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// BlockServiceMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
// When it exceeds this capacity, it redirects the block requests to a different node
BlockServiceHTTPMemCap uint64 `version[28]:"500000000"`
BlockServiceMemCap uint64 `version[28]:"500000000"`
}

// DNSBootstrapArray returns an array of one or more DNS Bootstrap identifiers
Expand Down
2 changes: 1 addition & 1 deletion config/local_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var defaultLocal = Local{
Archival: false,
BaseLoggerDebugLevel: 4,
BlockServiceCustomFallbackEndpoints: "",
BlockServiceHTTPMemCap: 500000000,
BlockServiceMemCap: 500000000,
BroadcastConnectionsLimit: -1,
CadaverDirectory: "",
CadaverSizeTarget: 0,
Expand Down
2 changes: 1 addition & 1 deletion installer/config.json.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"Archival": false,
"BaseLoggerDebugLevel": 4,
"BlockServiceCustomFallbackEndpoints": "",
"BlockServiceHTTPMemCap": 500000000,
"BlockServiceMemCap": 500000000,
"BroadcastConnectionsLimit": -1,
"CadaverDirectory": "",
"CadaverSizeTarget": 0,
Expand Down
6 changes: 5 additions & 1 deletion network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ type OutgoingMessage struct {
Payload []byte
Topics Topics
reason disconnectReason // used when Action == Disconnect

// OnRelease is a function called when outgoing message, resulting from this incoming message, is released
// either by being sent or discarded.
OnRelease func()
}

// ForwardingPolicy is an enum indicating to whom we should send a message
Expand Down Expand Up @@ -1320,7 +1324,7 @@ func (wn *WebsocketNetwork) messageHandlerThread(peersConnectivityCheckCh <-chan
wn.log.Warnf("WebsocketNetwork.messageHandlerThread: WebsocketNetwork.Broadcast returned unexpected error %v", err)
}
case Respond:
err := msg.Sender.(*wsPeer).Respond(wn.ctx, msg, outmsg.Topics)
err := msg.Sender.(*wsPeer).Respond(wn.ctx, msg, outmsg)
if err != nil && err != wn.ctx.Err() {
wn.log.Warnf("WebsocketNetwork.messageHandlerThread: wsPeer.Respond returned unexpected error %v", err)
}
Expand Down
82 changes: 81 additions & 1 deletion network/wsNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,7 @@ func TestWebsocketNetworkCancel(t *testing.T) {
msgs[50].ctx = ctx

for _, peer := range peers {
peer.sendBufferHighPrio <- sendMessages{msgs}
peer.sendBufferHighPrio <- sendMessages{msgs: msgs}
}

select {
Expand Down Expand Up @@ -4509,3 +4509,83 @@ func TestMergePrimarySecondaryRelayAddressListsNoDedupExp(t *testing.T) {
assert.ElementsMatch(t, expectedRelayAddresses, mergedRelayAddresses)
})
}

// TestSendMessageCallbacks tests that the SendMessage callbacks are called correctly. These are currently used for
// decrementing the number of bytes considered currently in flight for blockservice memcaps.
func TestSendMessageCallbacks(t *testing.T) {
partitiontest.PartitionTest(t)
netA, netB, _, closeFunc := setupWebsocketNetworkAB(t, 2)
_ = netB
defer closeFunc()

var counter uint64
require.NotZero(t, netA.NumPeers())
peer := netA.peers[0]
// Need to create a channel so that the message doesn't get filtered out
netB.peers[0].makeResponseChannel(1)
for i := 0; i < 100; i++ {
randInt := crypto.RandUint64()%(128) + 1
atomic.AddUint64(&counter, randInt)
topic := MakeTopic("val", []byte("blah"))
callback := func() {
atomic.AddUint64(&counter, ^uint64(randInt-1))
}
msg := IncomingMessage{Sender: netA.peers[0], Tag: protocol.UniEnsBlockReqTag}
peer.Respond(context.Background(), msg, OutgoingMessage{OnRelease: callback, Topics: Topics{topic}})
}
// force it to disconnect by removing the only response channel -- this is breach of protocol.
netB.peers[0].getAndRemoveResponseChannel(1)
// confirm that we still have messages in the send buffer
require.NotZero(t, len(peer.sendBufferBulk))

// confirm that eventually the messages get drained during the cleanup
require.Eventually(t,
func() bool { return atomic.LoadUint64(&counter) == uint64(0) },
500*time.Millisecond,
25*time.Millisecond,
)
require.Eventually(t,
func() bool { return netA.NumPeers() == 0 },
500*time.Millisecond,
25*time.Millisecond,
)
}

func TestSendMessageCallbackDrain(t *testing.T) {
partitiontest.PartitionTest(t)

node := makeTestWebsocketNode(t)
destPeer := wsPeer{
closing: make(chan struct{}),
sendBufferHighPrio: make(chan sendMessages, sendBufferLength),
sendBufferBulk: make(chan sendMessages, sendBufferLength),
conn: &nopConnSingleton,
}
node.addPeer(&destPeer)
node.Start()
defer node.Stop()

var target, counter uint64
// send messages to the peer that won't read them so they will sit in the sendQueue
for i := 0; i < 10; i++ {
randInt := crypto.RandUint64()%(128) + 1
target += randInt
topic := MakeTopic("val", []byte("blah"))
callback := func() {
counter += randInt
}
msg := IncomingMessage{Sender: node.peers[0], Tag: protocol.UniEnsBlockReqTag}
destPeer.Respond(context.Background(), msg, OutgoingMessage{OnRelease: callback, Topics: Topics{topic}})
}
require.Len(t, destPeer.sendBufferBulk, 10)
require.Zero(t, counter)
require.Positive(t, target)
// close the peer to trigger draining of the queue callbacks
destPeer.Close(time.Now().Add(time.Second))

require.Eventually(t,
func() bool { return target == counter },
2*time.Second,
50*time.Millisecond,
)
}
31 changes: 27 additions & 4 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type Response struct {

type sendMessages struct {
msgs []sendMessage

// onRelease function is called when the message is released either by being sent or discarded.
onRelease func()
}

type wsPeer struct {
Expand Down Expand Up @@ -312,7 +315,7 @@ type UnicastPeer interface {
// Version returns the matching version from network.SupportedProtocolVersions
Version() string
Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error)
Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error)
Respond(ctx context.Context, reqMsg IncomingMessage, outMsg OutgoingMessage) (e error)
}

// TCPInfoUnicastPeer exposes information about the underlying connection if available on the platform
Expand Down Expand Up @@ -388,15 +391,15 @@ func (wp *wsPeer) GetUnderlyingConnTCPInfo() (*util.TCPInfo, error) {
}

// Respond sends the response of a request message
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseTopics Topics) (e error) {
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, outMsg OutgoingMessage) (e error) {

// Get the hash/key of the request message
requestHash := hashTopics(reqMsg.Data)

// Add the request hash
requestHashData := make([]byte, binary.MaxVarintLen64)
binary.PutUvarint(requestHashData, requestHash)
responseTopics = append(responseTopics, Topic{key: requestHashKey, data: requestHashData})
responseTopics := append(outMsg.Topics, Topic{key: requestHashKey, data: requestHashData})

// Serialize the topics
serializedMsg := responseTopics.MarshallTopics()
Expand All @@ -411,11 +414,13 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT
}

select {
case wp.sendBufferBulk <- sendMessages{msgs: msg}:
case wp.sendBufferBulk <- sendMessages{msgs: msg, onRelease: outMsg.OnRelease}:
case <-wp.closing:
outMsg.OnRelease()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you need to check if OnRelease is nil or not?

wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String())
return
case <-ctx.Done():
outMsg.OnRelease()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar here

return ctx.Err()
}
return nil
Expand Down Expand Up @@ -715,6 +720,9 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) {
}

func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason {
if msgs.onRelease != nil {
defer msgs.onRelease()
}
for _, msg := range msgs.msgs {
select {
case <-msg.ctx.Done():
Expand Down Expand Up @@ -923,6 +931,21 @@ func (wp *wsPeer) Close(deadline time.Time) {
wp.net.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddr().String())
}
}

// We need to loop through all of the messages with callbacks still in the send queue and call them
// to ensure that state of counters such as wsBlockBytesUsed is correct.
L:
for {
select {
case msgs := <-wp.sendBufferBulk:
if msgs.onRelease != nil {
msgs.onRelease()
}
default:
break L
}

}
// now call all registered closers
for _, f := range wp.closers {
f()
Expand Down
51 changes: 44 additions & 7 deletions rpcs/blockService.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/gorilla/mux"

Expand All @@ -42,6 +43,7 @@ import (
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
)

// BlockResponseContentType is the HTTP Content-Type header for a raw binary block
Expand All @@ -67,12 +69,21 @@ const (

var errBlockServiceClosed = errors.New("block service is shutting down")

const errMemoryAtCapacityPublic = "block service memory over capacity"

type errMemoryAtCapacity struct{ capacity, used uint64 }

func (err errMemoryAtCapacity) Error() string {
return fmt.Sprintf("block service memory over capacity: %d / %d", err.used, err.capacity)
}

var wsBlockMessagesDroppedCounter = metrics.MakeCounter(
metrics.MetricName{Name: "algod_rpcs_ws_reqs_dropped", Description: "Number of websocket block requests dropped due to memory capacity"},
)
var httpBlockMessagesDroppedCounter = metrics.MakeCounter(
metrics.MetricName{Name: "algod_rpcs_http_reqs_dropped", Description: "Number of http block requests dropped due to memory capacity"},
)

// LedgerForBlockService describes the Ledger methods used by BlockService.
type LedgerForBlockService interface {
EncodedBlockCert(rnd basics.Round) (blk []byte, cert []byte, err error)
Expand All @@ -93,6 +104,7 @@ type BlockService struct {
closeWaitGroup sync.WaitGroup
mu deadlock.Mutex
memoryUsed uint64
wsMemoryUsed uint64
memoryCap uint64
}

Expand Down Expand Up @@ -130,7 +142,7 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB
fallbackEndpoints: makeFallbackEndpoints(log, config.BlockServiceCustomFallbackEndpoints),
enableArchiverFallback: config.EnableBlockServiceFallbackToArchiver,
log: log,
memoryCap: config.BlockServiceHTTPMemCap,
memoryCap: config.BlockServiceMemCap,
}
if service.enableService {
net.RegisterHTTPHandler(BlockServiceBlockPath, service)
Expand Down Expand Up @@ -244,6 +256,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re
response.WriteHeader(http.StatusServiceUnavailable)
bs.log.Debugf("ServeHTTP: returned retry-after: %v", err)
}
httpBlockMessagesDroppedCounter.Inc(nil)
return
default:
// unexpected error.
Expand Down Expand Up @@ -301,11 +314,35 @@ const datatypeUnsupportedErrMsg = "requested data type is unsupported"
func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.IncomingMessage) {
target := reqMsg.Sender.(network.UnicastPeer)
var respTopics network.Topics
var n uint64

defer func() {
target.Respond(ctx, reqMsg, respTopics)
outMsg := network.OutgoingMessage{Topics: respTopics}
if n > 0 {
outMsg.OnRelease = func() {
atomic.AddUint64(&bs.wsMemoryUsed, ^uint64(n-1))
}
atomic.AddUint64(&bs.wsMemoryUsed, (n))
}
err := target.Respond(ctx, reqMsg, outMsg)
if err != nil {
bs.log.Warnf("BlockService handleCatchupReq: failed to respond: %s", err)
}
}()

// If we are over-capacity, we will not process the request
// respond to sender with error message
memUsed := atomic.LoadUint64(&bs.wsMemoryUsed)
if memUsed > bs.memoryCap {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to just not return a response here? Or at least not give the peer memory information

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question.
We need to do something similar to the HTTP request. There we got it for free, using the protocol retry message.
Here, we need to implement on the requester side, how to react to this error.

In this case, the requester should request it from another peer. It cannot wait and request again, since these requests are time sensitive.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what it already does after a 4 second timeout even if you return nothing so not returning a response should be the same as returning an error response

Copy link
Contributor

@algonautshant algonautshant Jun 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for WS 4 seconds is a very long time if we want to address this properly.
The request will come from the agreement service, so it needs the block in milliseconds, not seconds :-)

If there is a system cause for not receiving the proposed block (i.e. network disruption), for the agreement to move forward will need the blocks serviced as fast as possible. 4 seconds might be okay, we get a 20 second round, but this may get longer very quickly, and we can easily do better.

Again, this is a product question: should be add a little bit more protocol implementation to address this rare situation or we leave it as is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Returning an error in that case should still be good enough, but 4 second timeouts do happen somewhat regularly right now

err := errMemoryAtCapacity{capacity: bs.memoryCap, used: memUsed}
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error())
respTopics = network.Topics{
network.MakeTopic(network.ErrorKey, []byte(errMemoryAtCapacityPublic)),
}
wsBlockMessagesDroppedCounter.Inc(nil)
return
}

topics, err := network.UnmarshallTopics(reqMsg.Data)
if err != nil {
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error())
Expand Down Expand Up @@ -338,7 +375,7 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc
[]byte(roundNumberParseErrMsg))}
return
}
respTopics = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType))
respTopics, n = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType))
return
}

Expand Down Expand Up @@ -416,7 +453,7 @@ func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) {
return data, err
}

func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) network.Topics {
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) (network.Topics, uint64) {
blk, cert, err := dataLedger.EncodedBlockCert(round)
if err != nil {
switch err.(type) {
Expand All @@ -425,7 +462,7 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round
log.Infof("BlockService topicBlockBytes: %s", err)
}
return network.Topics{
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}, 0
}
switch requestType {
case BlockAndCertValue:
Expand All @@ -434,10 +471,10 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round
BlockDataKey, blk),
network.MakeTopic(
CertDataKey, cert),
}
}, uint64(len(blk) + len(cert))
default:
return network.Topics{
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}, 0
}
}

Expand Down
Loading