Skip to content

Commit

Permalink
rpc: reuse gRPC streams across unary BatchRequest RPCs
Browse files Browse the repository at this point in the history
Closes #136572.

This commit introduces pooling of gRPC streams that are used to send requests
and receive corresponding responses in a manner that mimics unary RPC
invocation. Pooling these streams allows for reuse of gRPC resources across
calls, as opposed to native unary RPCs, which create a new stream and throw it
away for each request (see grpc.invoke).

The new pooling mechanism is used for the Internal/Batch RPC method, which is
the dominant RPC method used to communicate between the KV client and KV server.
A new Internal/BatchStream RPC method is introduced to allow a client to send
and receive BatchRequest/BatchResponse pairs over a long-lived, pooled stream. A
pool of these streams is then maintained alongside each gRPC connection. The
pool grows and shrinks dynamically based on demand.

The change demonstrates a large performance improvement in both microbenchmarks
and full system benchmarks, which reveals just how expensive the gRPC stream
setup on each unary RPC is.

Microbenchmarks:
```
name                                            old time/op    new time/op    delta
Sysbench/KV/1node_remote/oltp_point_select-10     45.9µs ± 1%    28.8µs ± 2%  -37.31%  (p=0.000 n=9+8)
Sysbench/KV/1node_remote/oltp_read_only-10         958µs ± 6%     709µs ± 1%  -26.00%  (p=0.000 n=9+9)
Sysbench/SQL/1node_remote/oltp_read_only-10       3.65ms ± 6%    2.81ms ± 8%  -23.06%  (p=0.000 n=8+9)
Sysbench/KV/1node_remote/oltp_read_write-10       1.77ms ± 5%    1.38ms ± 1%  -22.09%  (p=0.000 n=10+8)
Sysbench/KV/1node_remote/oltp_write_only-10        688µs ± 4%     557µs ± 1%  -19.11%  (p=0.000 n=9+9)
Sysbench/SQL/1node_remote/oltp_point_select-10     181µs ± 8%     159µs ± 2%  -12.10%  (p=0.000 n=8+9)
Sysbench/SQL/1node_remote/oltp_write_only-10      2.16ms ± 4%    1.92ms ± 3%  -11.08%  (p=0.000 n=9+9)
Sysbench/SQL/1node_remote/oltp_read_write-10      5.89ms ± 2%    5.36ms ± 1%   -8.89%  (p=0.000 n=9+9)

name                                            old alloc/op   new alloc/op   delta
Sysbench/KV/1node_remote/oltp_point_select-10     16.3kB ± 0%     6.4kB ± 0%  -60.70%  (p=0.000 n=8+10)
Sysbench/KV/1node_remote/oltp_write_only-10        359kB ± 1%     256kB ± 1%  -28.92%  (p=0.000 n=10+10)
Sysbench/SQL/1node_remote/oltp_write_only-10       748kB ± 0%     548kB ± 1%  -26.78%  (p=0.000 n=8+10)
Sysbench/SQL/1node_remote/oltp_point_select-10    40.9kB ± 0%    30.8kB ± 0%  -24.74%  (p=0.000 n=9+10)
Sysbench/KV/1node_remote/oltp_read_write-10       1.11MB ± 1%    0.88MB ± 1%  -21.17%  (p=0.000 n=9+10)
Sysbench/SQL/1node_remote/oltp_read_write-10      2.00MB ± 0%    1.65MB ± 0%  -17.60%  (p=0.000 n=9+10)
Sysbench/KV/1node_remote/oltp_read_only-10         790kB ± 0%     655kB ± 0%  -17.11%  (p=0.000 n=9+9)
Sysbench/SQL/1node_remote/oltp_read_only-10       1.33MB ± 0%    1.19MB ± 0%  -10.97%  (p=0.000 n=10+9)

name                                            old allocs/op  new allocs/op  delta
Sysbench/KV/1node_remote/oltp_point_select-10        210 ± 0%        61 ± 0%  -70.95%  (p=0.000 n=10+10)
Sysbench/KV/1node_remote/oltp_read_only-10         3.98k ± 0%     1.88k ± 0%  -52.68%  (p=0.019 n=6+8)
Sysbench/KV/1node_remote/oltp_read_write-10        7.10k ± 0%     3.47k ± 0%  -51.07%  (p=0.000 n=10+9)
Sysbench/KV/1node_remote/oltp_write_only-10        3.10k ± 0%     1.58k ± 0%  -48.89%  (p=0.000 n=10+9)
Sysbench/SQL/1node_remote/oltp_write_only-10       6.73k ± 0%     3.82k ± 0%  -43.30%  (p=0.000 n=10+10)
Sysbench/SQL/1node_remote/oltp_read_write-10       14.4k ± 0%      9.2k ± 0%  -36.29%  (p=0.000 n=9+10)
Sysbench/SQL/1node_remote/oltp_point_select-10       429 ± 0%       277 ± 0%  -35.46%  (p=0.000 n=9+10)
Sysbench/SQL/1node_remote/oltp_read_only-10        7.52k ± 0%     5.37k ± 0%  -28.60%  (p=0.000 n=10+10)
```

Roachtests:
```
name                                            old queries/s  new queries/s  delta
sysbench/oltp_read_write/nodes=3/cpu=8/conc=64     17.6k ± 7%     19.2k ± 2%  +9.22%  (p=0.008 n=5+5)

name                                            old avg_ms/op  new avg_ms/op  delta
sysbench/oltp_read_write/nodes=3/cpu=8/conc=64      72.9 ± 7%      66.6 ± 2%  -8.57%  (p=0.008 n=5+5)

name                                            old p95_ms/op  new p95_ms/op  delta
sysbench/oltp_read_write/nodes=3/cpu=8/conc=64       116 ± 8%       106 ± 3%  -9.02%  (p=0.016 n=5+5)
```

Manual tests:
```
Running in a similar configuration to sysbench/oltp_read_write/nodes=3/cpu=8/conc=64,
but with a benchmarking related cluster settings (before and after) to reduce variance.

-- Before
Mean: 19771.03
Median: 19714.22
Standard Deviation: 282.96
Coefficient of variance: .0143

-- After
Mean: 21908.23
Median: 21923.03
Standard Deviation: 200.88
Coefficient of variance: .0091
```

Release note (performance improvement): gRPC streams are now pooled
across unary intra-cluster RPCs, allowing for reuse of gRPC resources
to reduce the cost of remote key-value layer access.
  • Loading branch information
nvanbenschoten committed Dec 7, 2024
1 parent 00aab22 commit f334557
Show file tree
Hide file tree
Showing 23 changed files with 1,092 additions and 29 deletions.
1 change: 1 addition & 0 deletions build/bazelutil/check.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ GIT_GREP="git $CONFIGS grep"
EXISTING_GO_GENERATE_COMMENTS="
pkg/config/field.go://go:generate stringer --type=Field --linecomment
pkg/rpc/context.go://go:generate mockgen -destination=mocks_generated_test.go --package=. Dialbacker
pkg/rpc/stream_pool.go://go:generate mockgen -destination=mocks_generated_test.go --package=. BatchStreamClient
pkg/roachprod/vm/aws/config.go://go:generate terraformgen -o terraform/main.tf
pkg/roachprod/prometheus/prometheus.go://go:generate mockgen -package=prometheus -destination=mocks_generated_test.go . Cluster
pkg/cmd/roachtest/clusterstats/collector.go://go:generate mockgen -package=clusterstats -destination mocks_generated_test.go github.com/cockroachdb/cockroach/pkg/roachprod/prometheus Client
Expand Down
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -401,4 +401,4 @@ trace.span_registry.enabled boolean false if set, ongoing traces can be seen at
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used. application
ui.database_locality_metadata.enabled boolean true if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute application
ui.display_timezone enumeration etc/utc the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1] application
version version 1000024.3-upgrading-to-1000025.1-step-008 set the active cluster version in the format '<major>.<minor>' application
version version 1000024.3-upgrading-to-1000025.1-step-010 set the active cluster version in the format '<major>.<minor>' application
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,6 @@
<tr><td><div id="setting-trace-zipkin-collector" class="anchored"><code>trace.zipkin.collector</code></div></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 9411 will be used.</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-database-locality-metadata-enabled" class="anchored"><code>ui.database_locality_metadata.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if enabled shows extended locality data about databases and tables in DB Console which can be expensive to compute</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-ui-display-timezone" class="anchored"><code>ui.display_timezone</code></div></td><td>enumeration</td><td><code>etc/utc</code></td><td>the timezone used to format timestamps in the ui [etc/utc = 0, america/new_york = 1]</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-008</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-version" class="anchored"><code>version</code></div></td><td>version</td><td><code>1000024.3-upgrading-to-1000025.1-step-010</code></td><td>set the active cluster version in the format &#39;&lt;major&gt;.&lt;minor&gt;&#39;</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
</tbody>
</table>
5 changes: 5 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ const (
// range-ID local key, which is written below raft.
V25_1_AddRangeForceFlushKey

// V25_1_BatchStreamRPC adds the BatchStream RPC, which allows for more
// efficient Batch unary RPCs.
V25_1_BatchStreamRPC

// *************************************************
// Step (1) Add new versions above this comment.
// Do not add new versions to a patch release.
Expand Down Expand Up @@ -304,6 +308,7 @@ var versionTable = [numKeys]roachpb.Version{
V25_1_AddJobsTables: {Major: 24, Minor: 3, Internal: 4},
V25_1_MoveRaftTruncatedState: {Major: 24, Minor: 3, Internal: 6},
V25_1_AddRangeForceFlushKey: {Major: 24, Minor: 3, Internal: 8},
V25_1_BatchStreamRPC: {Major: 24, Minor: 3, Internal: 10},

// *************************************************
// Step (2): Add new versions above this comment.
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvcoord/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func (n Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchRe
return &kvpb.BatchResponse{}, nil
}

func (n Node) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
panic("unimplemented")
}

func (n Node) RangeLookup(
_ context.Context, _ *kvpb.RangeLookupRequest,
) (*kvpb.RangeLookupResponse, error) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kv/kvclient/kvcoord/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,12 @@ func (m *mockInternalClient) Batch(
return br, nil
}

func (m *mockInternalClient) BatchStream(
ctx context.Context, opts ...grpc.CallOption,
) (kvpb.Internal_BatchStreamClient, error) {
return nil, fmt.Errorf("unsupported BatchStream call")
}

// RangeLookup implements the kvpb.InternalClient interface.
func (m *mockInternalClient) RangeLookup(
ctx context.Context, rl *kvpb.RangeLookupRequest, _ ...grpc.CallOption,
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/kvtenant/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (*mockServer) Batch(context.Context, *kvpb.BatchRequest) (*kvpb.BatchRespon
panic("unimplemented")
}

func (m *mockServer) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
panic("implement me")
}

func (m *mockServer) MuxRangeFeed(server kvpb.Internal_MuxRangeFeedServer) error {
panic("implement me")
}
Expand Down
26 changes: 16 additions & 10 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3668,44 +3668,50 @@ message JoinNodeResponse {

// Batch and RangeFeed service implemented by nodes for KV API requests.
service Internal {
rpc Batch (BatchRequest) returns (BatchResponse) {}
rpc Batch (BatchRequest) returns (BatchResponse) {}

// BatchStream is a streaming variant of Batch. There is a 1:1 correspondence
// between requests and responses. The method is used to facilitate pooling of
// gRPC streams to avoid the overhead of creating and discarding a new stream
// for each unary Batch RPC invocation. See rpc.BatchStreamPool.
rpc BatchStream (stream BatchRequest) returns (stream BatchResponse) {}

rpc RangeLookup (RangeLookupRequest) returns (RangeLookupResponse) {}
rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
rpc MuxRangeFeed (stream RangeFeedRequest) returns (stream MuxRangeFeedEvent) {}
rpc GossipSubscription (GossipSubscriptionRequest) returns (stream GossipSubscriptionEvent) {}
rpc ResetQuorum (ResetQuorumRequest) returns (ResetQuorumResponse) {}

// TokenBucket is used by tenants to obtain Request Units and report
// consumption.
rpc TokenBucket (TokenBucketRequest) returns (TokenBucketResponse) {}
rpc TokenBucket (TokenBucketRequest) returns (TokenBucketResponse) {}

// Join a bootstrapped cluster. If the target node is itself not part of a
// bootstrapped cluster, an appropriate error is returned.
rpc Join(JoinNodeRequest) returns (JoinNodeResponse) { }
rpc Join (JoinNodeRequest) returns (JoinNodeResponse) {}

// GetSpanConfigs is used to fetch the span configurations over a given
// keyspan.
rpc GetSpanConfigs (GetSpanConfigsRequest) returns (GetSpanConfigsResponse) { }
rpc GetSpanConfigs (GetSpanConfigsRequest) returns (GetSpanConfigsResponse) {}

// GetAllSystemSpanConfigsThatApply is used to fetch all system span
// configurations that apply over a tenant's ranges.
rpc GetAllSystemSpanConfigsThatApply (GetAllSystemSpanConfigsThatApplyRequest) returns (GetAllSystemSpanConfigsThatApplyResponse) {}

// UpdateSpanConfigs is used to update the span configurations over given
// keyspans.
rpc UpdateSpanConfigs (UpdateSpanConfigsRequest) returns (UpdateSpanConfigsResponse) { }
rpc UpdateSpanConfigs (UpdateSpanConfigsRequest) returns (UpdateSpanConfigsResponse) {}

// SpanConfigConformance is used to determine whether ranges backing the given
// keyspans conform to span configs that apply over them.
rpc SpanConfigConformance (SpanConfigConformanceRequest) returns (SpanConfigConformanceResponse) { }
rpc SpanConfigConformance (SpanConfigConformanceRequest) returns (SpanConfigConformanceResponse) {}

// TenantSettings is used by tenants to obtain and stay up to date with tenant
// setting overrides.
rpc TenantSettings (TenantSettingsRequest) returns (stream TenantSettingsEvent) { }

rpc TenantSettings (TenantSettingsRequest) returns (stream TenantSettingsEvent) {}

// GetRangeDescriptors is used by tenants to get range descriptors for their
// own ranges.
rpc GetRangeDescriptors (GetRangeDescriptorsRequest) returns (stream GetRangeDescriptorsResponse) { }
rpc GetRangeDescriptors (GetRangeDescriptorsRequest) returns (stream GetRangeDescriptorsResponse) {}
}

// GetRangeDescriptorsRequest is used to fetch range descriptors.
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/kvpb/kvpbmock/mocks_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/rpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"restricted_internal_client.go",
"settings.go",
"snappy.go",
"stream_pool.go",
"tls.go",
],
embed = [":rpc_go_proto"],
Expand Down Expand Up @@ -90,7 +91,10 @@ go_library(
gomock(
name = "mock_rpc",
out = "mocks_generated_test.go",
interfaces = ["Dialbacker"],
interfaces = [
"BatchStreamClient",
"Dialbacker",
],
library = ":rpc",
package = "rpc",
self_package = "github.com/cockroachdb/cockroach/pkg/rpc",
Expand All @@ -116,6 +120,7 @@ go_test(
"metrics_test.go",
"peer_test.go",
"snappy_test.go",
"stream_pool_test.go",
"tls_test.go",
":mock_rpc", # keep
],
Expand Down Expand Up @@ -173,6 +178,7 @@ go_test(
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//peer",
"@org_golang_google_grpc//status",
"@org_golang_x_sync//errgroup",
],
)

Expand Down
3 changes: 2 additions & 1 deletion pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,15 @@ func (a tenantAuthorizer) authorize(
req interface{},
) error {
switch fullMethod {
case "/cockroach.roachpb.Internal/Batch":
case "/cockroach.roachpb.Internal/Batch", "/cockroach.roachpb.Internal/BatchStream":
return a.authBatch(ctx, sv, tenID, req.(*kvpb.BatchRequest))

case "/cockroach.roachpb.Internal/RangeLookup":
return a.authRangeLookup(ctx, tenID, req.(*kvpb.RangeLookupRequest))

case "/cockroach.roachpb.Internal/RangeFeed", "/cockroach.roachpb.Internal/MuxRangeFeed":
return a.authRangeFeed(tenID, req.(*kvpb.RangeFeedRequest))

case "/cockroach.roachpb.Internal/GossipSubscription":
return a.authGossipSubscription(tenID, req.(*kvpb.GossipSubscriptionRequest))

Expand Down
26 changes: 25 additions & 1 deletion pkg/rpc/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,30 @@ func TestTenantAuthRequest(t *testing.T) {
expErr: noError,
},
},
"/cockroach.roachpb.Internal/BatchStream": {
{
req: &kvpb.BatchRequest{},
expErr: `requested key span /Max not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq("a", "b"),
)},
expErr: `requested key span {a-b} not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq(prefix(5, "a"), prefix(5, "b")),
)},
expErr: `requested key span /Tenant/5{a-b} not fully contained in tenant keyspace /Tenant/1{0-1}`,
},
{
req: &kvpb.BatchRequest{Requests: makeReqs(
makeReq(prefix(10, "a"), prefix(10, "b")),
)},
expErr: noError,
},
},
"/cockroach.roachpb.Internal/RangeLookup": {
{
req: &kvpb.RangeLookupRequest{},
Expand Down Expand Up @@ -1009,7 +1033,7 @@ func TestTenantAuthRequest(t *testing.T) {
// cross-read capability and the request is a read, expect no error.
if canCrossRead && strings.Contains(tc.expErr, "fully contained") {
switch method {
case "/cockroach.roachpb.Internal/Batch":
case "/cockroach.roachpb.Internal/Batch", "/cockroach.roachpb.Internal/BatchStream":
if tc.req.(*kvpb.BatchRequest).IsReadOnly() {
tc.expErr = noError
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/rpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,26 @@ type Connection struct {
// It always has to be signaled eventually, regardless of the stopper
// draining, etc, since callers might be blocking on it.
connFuture connFuture
// batchStreamPool holds a pool of BatchStreamClient streams established on
// the connection. The pool can be used to avoid the overhead of unary Batch
// RPCs.
//
// The pool is only initialized once the ClientConn is resolved.
batchStreamPool BatchStreamPool
}

// newConnectionToNodeID makes a Connection for the given node, class, and nontrivial Signal
// that should be queried in Connect().
func newConnectionToNodeID(k peerKey, breakerSignal func() circuit.Signal) *Connection {
func newConnectionToNodeID(
opts *ContextOptions, k peerKey, breakerSignal func() circuit.Signal,
) *Connection {
c := &Connection{
breakerSignalFn: breakerSignal,
k: k,
connFuture: connFuture{
ready: make(chan struct{}),
},
batchStreamPool: makeStreamPool(opts.Stopper, newBatchStream),
}
return c
}
Expand Down Expand Up @@ -156,6 +165,13 @@ func (c *Connection) Signal() circuit.Signal {
return c.breakerSignalFn()
}

func (c *Connection) BatchStreamPool() *BatchStreamPool {
if !c.connFuture.Resolved() {
panic("BatchStreamPool called on unresolved connection")
}
return &c.batchStreamPool
}

type connFuture struct {
ready chan struct{}
cc *grpc.ClientConn
Expand Down
4 changes: 4 additions & 0 deletions pkg/rpc/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ func (*internalServer) Batch(context.Context, *kvpb.BatchRequest) (*kvpb.BatchRe
return nil, nil
}

func (*internalServer) BatchStream(stream kvpb.Internal_BatchStreamServer) error {
panic("unimplemented")
}

func (*internalServer) RangeLookup(
context.Context, *kvpb.RangeLookupRequest,
) (*kvpb.RangeLookupResponse, error) {
Expand Down
Loading

0 comments on commit f334557

Please sign in to comment.