network:ws block byte limiter #5472
Conversation
Codecov Report
@@           Coverage Diff            @@
##           master    #5472    +/-   ##
==========================================
+ Coverage   55.78%   55.81%   +0.02%
==========================================
  Files         446      446
  Lines       63253    63282      +29
==========================================
+ Hits        35288    35322      +34
+ Misses      25593    25584       -9
- Partials     2372     2376       +4
... and 7 files with indirect coverage changes
config/localTemplate.go
Outdated
@@ -524,6 +524,9 @@ type Local struct {
 	// BlockServiceHTTPMemCap is the memory capacity in bytes which is allowed for the block service to use for HTTP block requests.
 	// When it exceeds this capacity, it redirects the block requests to a different node
 	BlockServiceHTTPMemCap uint64 `version[28]:"500000000"`
+
+	// BlockServiceWSMemCap is the memory capacity in bytes which is allowed for the block service to use for websocket block requests.
+	BlockServiceWSMemCap int64 `version[28]:"500000000"`
I think we can make a single parameter and use it for both HTTP and WS. We have too many config params as it is...
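A minimal sketch of what that consolidation might look like. `BlockServiceMemCap` is a hypothetical name, and the struct below is a stand-in for illustration, not the real config/localTemplate.go:

```go
package config

// Hypothetical consolidation of the two knobs into one; BlockServiceMemCap
// is an illustrative name, not a field in this PR.
type Local struct {
	// BlockServiceMemCap is the memory capacity in bytes which is allowed for
	// the block service to use for HTTP and websocket block requests combined.
	// When it exceeds this capacity, HTTP requests are redirected to a
	// different node and websocket requests are dropped.
	BlockServiceMemCap uint64 `version[28]:"500000000"`
}
```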
network/wsNetwork.go
Outdated
@@ -142,6 +142,8 @@ var networkPrioBatchesPPWithoutCompression = metrics.MakeCounter(metrics.MetricN
 var networkPrioPPCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_compressed_size_total", Description: "cumulative size of all compressed PP"})
 var networkPrioPPNonCompressedSize = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_prio_pp_non_compressed_size_total", Description: "cumulative size of all non-compressed PP"})
+
+var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})
If you want to add metrics, maybe we need the same one for HTTP?
Right now it's unused, since my change removed its use. Should I remove it, or add both?
I think we want both of these metrics.
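A sketch of keeping both counters, per this thread. The WS counter matches the one in the diff above; the HTTP counter's name and description are illustrative assumptions:

```go
package rpcs

import "github.com/algorand/go-algorand/util/metrics"

// WS counter as in this PR's diff.
var wsBlockMessagesDropped = metrics.MakeCounter(metrics.MetricName{
	Name:        "algod_network_ue_messages_dropped",
	Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})

// Hypothetical HTTP counterpart, bumped where the HTTP path redirects.
var httpBlockMessagesDropped = metrics.MakeCounter(metrics.MetricName{
	Name:        "algod_http_block_requests_dropped",
	Description: "number of HTTP block requests redirected due to being at byte limit"})
```

At each rejection site the matching counter would then be bumped with `Inc(nil)`.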
network/wsNetwork.go
Outdated
@@ -470,6 +472,13 @@ type WebsocketNetwork struct {
 
 	// resolveSRVRecords is a function that resolves SRV records for a given service, protocol and name
 	resolveSRVRecords func(service string, protocol string, name string, fallbackDNSResolverAddress string, secure bool) (addrs []string, err error)
+
I think this is too much complication in the network layer.
I have a prototype suggestion here: iansuvak#4
Keep the accounting similar to #5428 and use a callback as in iansuvak#4
	// If we are over-capacity, we will not process the request
	// respond to sender with error message
	memUsed := atomic.LoadUint64(&bs.memoryUsed)
	if memUsed > bs.memoryCap {
Do we want to just not return a response here? Or at least not give the peer memory information?
Great question.
We need to do something similar to what we do for the HTTP request. There we got it for free, using the protocol's retry message.
Here, we need to implement on the requester side how to react to this error.
In this case, the requester should request the block from another peer. It cannot wait and request again, since these requests are time sensitive.
This is what it already does after a 4-second timeout even if you return nothing, so not returning a response should be the same as returning an error response.
For WS, 4 seconds is a very long time if we want to address this properly.
The request will come from the agreement service, so it needs the block in milliseconds, not seconds :-)
If there is a systemic cause for not receiving the proposed block (i.e., network disruption), then for agreement to move forward it will need the blocks serviced as fast as possible. 4 seconds might be okay, and we get a 20-second round, but this may get longer very quickly, and we can easily do better.
Again, this is a product question: should we add a little more protocol implementation to address this rare situation, or leave it as is?
Returning an error in that case should still be good enough, but 4-second timeouts do happen somewhat regularly right now.
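A sketch of the error-response option being discussed here, assuming Respond keeps its pre-PR Topics signature. MakeTopic and ErrorKey follow network/topics.go; the helper name and error text are illustrative:

```go
package rpcs

import (
	"context"

	"github.com/algorand/go-algorand/network"
)

// rejectIfOverCapacity replies with an error topic when the byte budget is
// exhausted, so the requester can fail over to another peer immediately
// instead of waiting out its timeout.
func rejectIfOverCapacity(ctx context.Context, target network.UnicastPeer,
	reqMsg network.IncomingMessage, memUsed, memCap uint64) bool {
	if memUsed <= memCap {
		return false // within budget; serve the request normally
	}
	errTopic := network.MakeTopic(network.ErrorKey, []byte("block service over capacity"))
	// best effort: if this response cannot be sent, the requester still
	// falls back to its own timeout
	_ = target.Respond(ctx, reqMsg, network.Topics{errTopic})
	return true
}
```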
network/wsPeer.go
Outdated
@@ -410,7 +411,7 @@ func (wp *wsPeer) Respond(ctx context.Context, reqMsg IncomingMessage, responseT
 	}
 
 	select {
-	case wp.sendBufferBulk <- sendMessages{msgs: msg}:
+	case wp.sendBufferBulk <- sendMessages{msgs: msg, callback: reqMsg.Callback}:
If this select case is not picked, you may leak the counter decrement.
For now, this likely won't matter, since ctx is canceled only when the service is shutting down.
Even if that is the case, we are still vulnerable to future changes.
Better to handle the case here and make the behavior robust irrespective of why, or by whom, the channel is closed or the context canceled.
Agreed, thanks!
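A sketch of the leak-free enqueue being agreed on here. The types are minimal stand-ins so the snippet compiles on its own; the point is that whichever select branch wins, the accounting callback is consumed exactly once:

```go
package network

import (
	"context"
	"errors"
)

// sendMessages mirrors this PR's struct: a message batch plus an optional
// release callback used for byte accounting.
type sendMessages struct {
	msgs     [][]byte
	callback func()
}

var errPeerClosing = errors.New("peer closing")

// enqueueWithRelease hands the message to the write loop, or, if the peer is
// closing or the context is done, invokes the callback itself so the byte
// counter cannot leak no matter why the send was abandoned.
func enqueueWithRelease(ctx context.Context, sendBufferBulk chan<- sendMessages,
	closing <-chan struct{}, msg sendMessages) error {
	select {
	case sendBufferBulk <- msg:
		return nil // the write loop now owns the callback
	case <-closing:
		if msg.callback != nil {
			msg.callback()
		}
		return errPeerClosing
	case <-ctx.Done():
		if msg.callback != nil {
			msg.callback()
		}
		return ctx.Err()
	}
}
```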
network/wsNetwork.go
Outdated
+var networkCatchupMessagesDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_ue_messages_dropped", Description: "number of (UE) block catchup request messages dropped due to being at byte limit"})
Note the lint warning about this var being unused.
Indeed. I was originally using it but removed that use. There's a conversation above asking whether we should record this for WS only, for both WS and HTTP rejections, or for neither.
I guess having a callback for when a response leaves this node is fine, but it only works for topics, since Respond takes IncomingMessage as an argument; it is not a generic implementation, so it should not be part of IncomingMessage.
The Respond handler is used only in blockService (the usage in wsNetwork appears to be unreachable, since the Respond action is not used).
I think the Respond handler should be refactored to accept OutgoingMessage instead of responseTopics Topics, especially since OutgoingMessage has a Topics field, so no functionality will be lost. Adding OnRelease (or OnSent) to OutgoingMessage would make it more generic/usable.
Maybe call it "OnResponseSent" instead of "OnRelease"?
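A sketch of the refactor being suggested, with minimal stand-in types so it compiles on its own. OnRelease/OnResponseSent are the reviewers' proposed names, not merged API:

```go
package network

import "context"

// Minimal stand-ins for the real network package types.
type Topic struct {
	Key  string
	Data []byte
}
type Topics []Topic
type IncomingMessage struct{} // fields elided

// OutgoingMessage carries the response payload in Topics, so switching
// Respond from `responseTopics Topics` to OutgoingMessage loses nothing.
// The optional hook fires once the response has left this node.
type OutgoingMessage struct {
	Topics    Topics
	OnRelease func() // or OnResponseSent, per the naming suggestion above
}

// The refactored handler shape: blockService would set OnRelease to its
// byte-accounting release function.
type UnicastPeer interface {
	Respond(ctx context.Context, reqMsg IncomingMessage, outMsg OutgoingMessage) error
}
```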
Looks good; please fix the reviewdog warning and the failing test.
rpcs/blockService.go
Outdated
	}
	atomic.AddUint64(&bs.wsMemoryUsed, (n))
}
target.Respond(ctx, reqMsg, outMsg)
Maybe log an error here?
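A sketch of that logging, assuming target.Respond still returns an error and bs.log is the block service's logger as elsewhere in blockService.go; the message text is illustrative:

```go
if err := target.Respond(ctx, reqMsg, outMsg); err != nil {
	// don't swallow the failure: the peer never got its block response
	bs.log.Warnf("BlockService: failed to respond to catchup request: %v", err)
}
```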
case <-wp.closing:
	outMsg.OnRelease()
Don't you need to check whether OnRelease is nil or not?
	wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String())
	return
case <-ctx.Done():
	outMsg.OnRelease()
Similar here.
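A sketch of the nil-guarded cleanup both comments ask for, in the shape of the hunks above; the surrounding write-loop code is from this PR, and only the guards are new:

```go
case <-wp.closing:
	if outMsg.OnRelease != nil { // responses without byte accounting carry no hook
		outMsg.OnRelease()
	}
	wp.net.log.Debugf("peer closing %s", wp.conn.RemoteAddr().String())
	return
case <-ctx.Done():
	if outMsg.OnRelease != nil {
		outMsg.OnRelease()
	}
	return
```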
Summary
This is the websocket counterpart of the HTTP block server PR implemented in #5428.
The goal is to limit the number of bytes' worth of concurrent block requests that we can serve at a given time. It counts the bytes as used from the moment they land in the send queue channel and subtracts them once they are sent or the connection is terminated.
The tricky implementation choice here is where to track the number of bytes. Ideally we would do it in blockService.go, as the HTTP PR does, but the bytes aren't freed until the network package is done with them, and the network doesn't have access to the block service. We would have to change the interface so that messages can communicate back to the block service, perhaps via a channel? All thoughts and opinions are much appreciated.
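A sketch of the accounting lifecycle described above, with illustrative names: bytes are reserved when a response enters the send queue and released exactly once when it is sent or the connection dies:

```go
package rpcs

import (
	"sync"
	"sync/atomic"
)

// byteBudget tracks in-flight response bytes against a fixed cap.
type byteBudget struct {
	used uint64 // accessed atomically
	cap  uint64
}

// tryReserve accounts n bytes and hands back an idempotent release func,
// which the block service would pass to the network layer as the message's
// callback. It returns ok=false when the budget is already exhausted.
func (b *byteBudget) tryReserve(n uint64) (release func(), ok bool) {
	if atomic.LoadUint64(&b.used) > b.cap {
		return nil, false // over capacity: drop or redirect the request
	}
	atomic.AddUint64(&b.used, n)
	var once sync.Once
	return func() {
		once.Do(func() {
			atomic.AddUint64(&b.used, ^(n - 1)) // atomic decrement by n
		})
	}, true
}
```

Like the check in the diff above, this check-then-add allows one in-flight request to overshoot the cap before further requests are rejected.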
Test Plan
I don't have tests written yet, since I wanted to get feedback on the approach first.
I will write new tests focusing on ensuring that the send queue is drained properly in different cases.
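A sketch of one such drain test, reusing the hypothetical enqueueWithRelease helper and sendMessages stand-in from the earlier sketch: close the peer before the send can win, and assert the release callback still fired:

```go
package network

import (
	"context"
	"testing"
)

func TestSendQueueDrainReleasesBytes(t *testing.T) {
	released := false
	msg := sendMessages{callback: func() { released = true }}

	closing := make(chan struct{})
	close(closing) // simulate a peer shutting down before the enqueue wins

	// unbuffered channel with no reader: the send branch can never be chosen
	err := enqueueWithRelease(context.Background(), make(chan sendMessages), closing, msg)
	if err == nil || !released {
		t.Fatalf("expected release on closed peer, err=%v released=%v", err, released)
	}
}
```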