-
Notifications
You must be signed in to change notification settings - Fork 491
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
network:ws block byte limiter #5472
Changes from all commits
17c532f
80d71d4
413a2bd
c0bf235
503c439
2f43c59
e6c7908
a6716ea
3529573
ff1cdfe
68664af
d2d8f9d
5dcf685
bd0ee61
b1b8277
42c34fb
77771ed
83c8bea
977ce11
5aa8e33
f623eec
46d9d53
72ec677
84bb544
62414ed
fc1306b
6ba239e
18f3869
e1b6411
254ec5c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -175,6 +175,9 @@ type Response struct { | |
|
||
type sendMessages struct { | ||
msgs []sendMessage | ||
|
||
// onRelease function is called when the message is released either by being sent or discarded. | ||
onRelease func() | ||
} | ||
|
||
type wsPeer struct { | ||
|
@@ -312,7 +315,7 @@ type UnicastPeer interface { | |
// Version returns the matching version from network.SupportedProtocolVersions | ||
Version() string | ||
Request(ctx context.Context, tag Tag, topics Topics) (resp *Response, e error) | ||
Respond(ctx context.Context, reqMsg IncomingMessage, topics Topics) (e error) | ||
Respond(ctx context.Context, reqMsg IncomingMessage, outMsg OutgoingMessage) (e error) | ||
} | ||
|
||
// TCPInfoUnicastPeer exposes information about the underlying connection if available on the platform | ||
|
@@ -388,15 +391,15 @@ func (wp *wsPeer) GetUnderlyingConnTCPInfo() (*util.TCPInfo, error) { | |
} | ||
|
||
// Respond sends the response of a request message | ||
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseTopics Topics) (e error) { | ||
func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, outMsg OutgoingMessage) (e error) { | ||
|
||
// Get the hash/key of the request message | ||
requestHash := hashTopics(reqMsg.Data) | ||
|
||
// Add the request hash | ||
requestHashData := make([]byte, binary.MaxVarintLen64) | ||
binary.PutUvarint(requestHashData, requestHash) | ||
responseTopics = append(responseTopics, Topic{key: requestHashKey, data: requestHashData}) | ||
responseTopics := append(outMsg.Topics, Topic{key: requestHashKey, data: requestHashData}) | ||
|
||
// Serialize the topics | ||
serializedMsg := responseTopics.MarshallTopics() | ||
|
@@ -411,11 +414,13 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT | |
} | ||
|
||
select { | ||
case wp.sendBufferBulk <- sendMessages{msgs: msg}: | ||
case wp.sendBufferBulk <- sendMessages{msgs: msg, onRelease: outMsg.OnRelease}: | ||
case <-wp.closing: | ||
outMsg.OnRelease() | ||
wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String()) | ||
return | ||
case <-ctx.Done(): | ||
outMsg.OnRelease() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Similar here. |
||
return ctx.Err() | ||
} | ||
return nil | ||
|
@@ -715,6 +720,9 @@ func (wp *wsPeer) handleFilterMessage(msg IncomingMessage) { | |
} | ||
|
||
func (wp *wsPeer) writeLoopSend(msgs sendMessages) disconnectReason { | ||
if msgs.onRelease != nil { | ||
defer msgs.onRelease() | ||
} | ||
for _, msg := range msgs.msgs { | ||
select { | ||
case <-msg.ctx.Done(): | ||
|
@@ -923,6 +931,21 @@ func (wp *wsPeer) Close(deadline time.Time) { | |
wp.net.log.Infof("failed to CloseWithoutFlush to connection for %s", wp.conn.RemoteAddr().String()) | ||
} | ||
} | ||
|
||
// We need to loop through all of the messages with callbacks still in the send queue and call them | ||
// to ensure that state of counters such as wsBlockBytesUsed is correct. | ||
L: | ||
for { | ||
select { | ||
case msgs := <-wp.sendBufferBulk: | ||
if msgs.onRelease != nil { | ||
msgs.onRelease() | ||
} | ||
default: | ||
break L | ||
} | ||
|
||
} | ||
// now call all registered closers | ||
for _, f := range wp.closers { | ||
f() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ import ( | |
"strconv" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
|
||
"github.com/gorilla/mux" | ||
|
||
|
@@ -42,6 +43,7 @@ import ( | |
"github.com/algorand/go-algorand/logging" | ||
"github.com/algorand/go-algorand/network" | ||
"github.com/algorand/go-algorand/protocol" | ||
"github.com/algorand/go-algorand/util/metrics" | ||
) | ||
|
||
// BlockResponseContentType is the HTTP Content-Type header for a raw binary block | ||
|
@@ -67,12 +69,21 @@ const ( | |
|
||
var errBlockServiceClosed = errors.New("block service is shutting down") | ||
|
||
const errMemoryAtCapacityPublic = "block service memory over capacity" | ||
|
||
type errMemoryAtCapacity struct{ capacity, used uint64 } | ||
|
||
func (err errMemoryAtCapacity) Error() string { | ||
return fmt.Sprintf("block service memory over capacity: %d / %d", err.used, err.capacity) | ||
} | ||
|
||
var wsBlockMessagesDroppedCounter = metrics.MakeCounter( | ||
metrics.MetricName{Name: "algod_rpcs_ws_reqs_dropped", Description: "Number of websocket block requests dropped due to memory capacity"}, | ||
) | ||
var httpBlockMessagesDroppedCounter = metrics.MakeCounter( | ||
metrics.MetricName{Name: "algod_rpcs_http_reqs_dropped", Description: "Number of http block requests dropped due to memory capacity"}, | ||
) | ||
|
||
// LedgerForBlockService describes the Ledger methods used by BlockService. | ||
type LedgerForBlockService interface { | ||
EncodedBlockCert(rnd basics.Round) (blk []byte, cert []byte, err error) | ||
|
@@ -93,6 +104,7 @@ type BlockService struct { | |
closeWaitGroup sync.WaitGroup | ||
mu deadlock.Mutex | ||
memoryUsed uint64 | ||
wsMemoryUsed uint64 | ||
memoryCap uint64 | ||
} | ||
|
||
|
@@ -130,7 +142,7 @@ func MakeBlockService(log logging.Logger, config config.Local, ledger LedgerForB | |
fallbackEndpoints: makeFallbackEndpoints(log, config.BlockServiceCustomFallbackEndpoints), | ||
enableArchiverFallback: config.EnableBlockServiceFallbackToArchiver, | ||
log: log, | ||
memoryCap: config.BlockServiceHTTPMemCap, | ||
memoryCap: config.BlockServiceMemCap, | ||
} | ||
if service.enableService { | ||
net.RegisterHTTPHandler(BlockServiceBlockPath, service) | ||
|
@@ -244,6 +256,7 @@ func (bs *BlockService) ServeHTTP(response http.ResponseWriter, request *http.Re | |
response.WriteHeader(http.StatusServiceUnavailable) | ||
bs.log.Debugf("ServeHTTP: returned retry-after: %v", err) | ||
} | ||
httpBlockMessagesDroppedCounter.Inc(nil) | ||
return | ||
default: | ||
// unexpected error. | ||
|
@@ -301,11 +314,35 @@ const datatypeUnsupportedErrMsg = "requested data type is unsupported" | |
func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.IncomingMessage) { | ||
target := reqMsg.Sender.(network.UnicastPeer) | ||
var respTopics network.Topics | ||
var n uint64 | ||
|
||
defer func() { | ||
target.Respond(ctx, reqMsg, respTopics) | ||
outMsg := network.OutgoingMessage{Topics: respTopics} | ||
if n > 0 { | ||
outMsg.OnRelease = func() { | ||
atomic.AddUint64(&bs.wsMemoryUsed, ^uint64(n-1)) | ||
} | ||
atomic.AddUint64(&bs.wsMemoryUsed, (n)) | ||
} | ||
err := target.Respond(ctx, reqMsg, outMsg) | ||
if err != nil { | ||
bs.log.Warnf("BlockService handleCatchupReq: failed to respond: %s", err) | ||
} | ||
}() | ||
|
||
// If we are over-capacity, we will not process the request | ||
// respond to sender with error message | ||
memUsed := atomic.LoadUint64(&bs.wsMemoryUsed) | ||
if memUsed > bs.memoryCap { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we want to just not return a response here? Or at least not give the peer memory information? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Great question. In this case, the requester should request it from another peer. It cannot wait and request again, since these requests are time sensitive. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is what it already does after a 4 second timeout even if you return nothing, so not returning a response should be the same as returning an error response. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. For WS, 4 seconds is a very long time if we want to address this properly. If there is a system cause for not receiving the proposed block (e.g. network disruption), the agreement will need the blocks serviced as fast as possible in order to move forward. 4 seconds might be okay, since we get a 20 second round, but this may get longer very quickly, and we can easily do better. Again, this is a product question: should we add a little bit more protocol implementation to address this rare situation, or should we leave it as is? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Returning an error in that case should still be good enough, but 4 second timeouts do happen somewhat regularly right now. |
||
err := errMemoryAtCapacity{capacity: bs.memoryCap, used: memUsed} | ||
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error()) | ||
respTopics = network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(errMemoryAtCapacityPublic)), | ||
} | ||
wsBlockMessagesDroppedCounter.Inc(nil) | ||
return | ||
} | ||
|
||
topics, err := network.UnmarshallTopics(reqMsg.Data) | ||
if err != nil { | ||
bs.log.Infof("BlockService handleCatchupReq: %s", err.Error()) | ||
|
@@ -338,7 +375,7 @@ func (bs *BlockService) handleCatchupReq(ctx context.Context, reqMsg network.Inc | |
[]byte(roundNumberParseErrMsg))} | ||
return | ||
} | ||
respTopics = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType)) | ||
respTopics, n = topicBlockBytes(bs.log, bs.ledger, basics.Round(round), string(requestType)) | ||
return | ||
} | ||
|
||
|
@@ -416,7 +453,7 @@ func (bs *BlockService) rawBlockBytes(round basics.Round) ([]byte, error) { | |
return data, err | ||
} | ||
|
||
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) network.Topics { | ||
func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round basics.Round, requestType string) (network.Topics, uint64) { | ||
blk, cert, err := dataLedger.EncodedBlockCert(round) | ||
if err != nil { | ||
switch err.(type) { | ||
|
@@ -425,7 +462,7 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round | |
log.Infof("BlockService topicBlockBytes: %s", err) | ||
} | ||
return network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))} | ||
network.MakeTopic(network.ErrorKey, []byte(blockNotAvailableErrMsg))}, 0 | ||
} | ||
switch requestType { | ||
case BlockAndCertValue: | ||
|
@@ -434,10 +471,10 @@ func topicBlockBytes(log logging.Logger, dataLedger LedgerForBlockService, round | |
BlockDataKey, blk), | ||
network.MakeTopic( | ||
CertDataKey, cert), | ||
} | ||
}, uint64(len(blk) + len(cert)) | ||
default: | ||
return network.Topics{ | ||
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))} | ||
network.MakeTopic(network.ErrorKey, []byte(datatypeUnsupportedErrMsg))}, 0 | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
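Don't you need to check whether `OnRelease` is nil before calling it? handleCatchupReq only sets `OnRelease` when n > 0, but Respond calls `outMsg.OnRelease()` unconditionally on the closing and ctx.Done() paths.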