rpc,kv: use drpc for BatchRequest
I cherry-picked the stream reuse PR cockroachdb#136648 so that both are
more likely to evolve in unison, and because I anticipated
"piggybacking" on top of it. This did not materialize, but
the important integration point is now side by side and should
lend itself well to a future change that adds stream reuse for
drpc as well.

This commit roughly encompasses the following:

1. We change `*rpc.Connection` to also maintain a `drpcpool.Conn`.

It took me a while to grok how the `drpcpool.Pool` is architected.
Essentially, its `Get()` method gives you a handle to a `drpcpool.Conn`
that reflects a specific use case of the pool. When a client is created
on it, it gets assigned an actual `drpc.Conn` from the pool (dialing if
necessary), and when the client is closed, this conn is returned to the
pool.

So in a sense, `drpcpool.Conn` is an actual pool for some keyed use
case; `drpcpool.Pool` is simply the bucket in which the actual
connections get pooled, but they're never pulled from or returned to it
directly. We don't currently use the key since we create a pool per
remote node, and if we're not sharing TCP conns they all look the same
to us anyway (i.e. there's no point in DefaultClass vs SystemClass).

If we squint, a `drpcpool.Conn` parallels `*grpc.ClientConn` in the sense
that you can "just" make multiple clients on top of it. Of course
internally a `*grpc.ClientConn` multiplexes multiple clients over one
HTTP2 connection, whereas a `drpcpool.Conn` represents multiple
independent TCP connections.
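To make that concrete, here is a rough sketch of the client-side dial
path. This is not the exact code in this commit: `ctx` and `target` are
assumed to be in scope, TLS setup is elided, and the pool options and
generic parameters are simplified.

	// One pool per remote node; the key is unused, hence struct{}.
	pool := drpcpool.New[struct{}, drpcpool.Conn](drpcpool.Options{})
	dconn := pool.Get(ctx, struct{}{}, func(ctx context.Context, _ struct{}) (drpcpool.Conn, error) {
		// Dial a fresh TCP conn, prefixed with the drpcmigrate header so
		// the server's listener mux hands it to the drpc server.
		netConn, err := drpcmigrate.DialWithHeader(ctx, "tcp", target, drpcmigrate.DRPCHeader)
		if err != nil {
			return nil, err
		}
		return drpcconn.New(netConn), nil
	})
	// Clients created on dconn check an actual conn out of the pool
	// (dialing via the closure above if necessary) and return it to the
	// pool when the client is closed.
	client := kvpb.NewDRPCDRPCBatchServiceClient(dconn)
	_ = client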

The lifecycle of these pools is currently completely broken, and I was
honestly surprised TestTestServerDRPC passes! Future commits will need
to clean this up to the point where we can at least run a representative
benchmark.
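For orientation, the consumer side (sketched from the nodedialer and
connection diffs below; error handling and the `useDRPC` toggle are
elided, and `rpcConn` stands for the `*rpc.Connection` returned by
GRPCDialNode) ends up looking roughly like:

	// Connect2 resolves the gRPC conn and the pooled drpc conn together.
	cc, dconn, err := rpcConn.Connect2(ctx)
	if err != nil {
		return nil, err
	}
	// Batch goes over drpc; everything else (notably RangeFeed) still
	// uses the embedded gRPC InternalClient.
	return &unaryDRPCBatchServiceToInternalAdapter{
		InternalClient: kvpb.NewInternalClient(cc),
		drpcClient:     kvpb.NewDRPCDRPCBatchServiceClient(dconn),
	}, nil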
tbg committed Dec 9, 2024
1 parent 01275cc commit cfb25fc
Showing 7 changed files with 134 additions and 31 deletions.
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/kvtenant/connector.go
@@ -1032,7 +1032,8 @@ func (c *connector) dialAddrs(ctx context.Context) (*client, error) {

func (c *connector) dialAddr(ctx context.Context, addr string) (conn *grpc.ClientConn, err error) {
if c.rpcDialTimeout == 0 {
return c.rpcContext.GRPCUnvalidatedDial(addr, roachpb.Locality{}).Connect(ctx)
cc, err := c.rpcContext.GRPCUnvalidatedDial(addr, roachpb.Locality{}).Connect(ctx)
return cc, err
}
err = timeutil.RunWithTimeout(ctx, "dial addr", c.rpcDialTimeout, func(ctx context.Context) error {
conn, err = c.rpcContext.GRPCUnvalidatedDial(addr, roachpb.Locality{}).Connect(ctx)
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/loqrecovery/server.go
@@ -750,7 +750,7 @@ func visitNodeWithRetry(
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
// Nodes would contain dead nodes that we don't need to visit. We can skip
// them and let caller handle incomplete info.
if err != nil {
@@ -803,7 +803,7 @@ func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context)
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
if err != nil {
if grpcutil.IsClosedConnection(err) {
log.Infof(ctx, "can't dial node n%d because connection is permanently closed: %s",
3 changes: 3 additions & 0 deletions pkg/rpc/BUILD.bazel
@@ -73,7 +73,10 @@ go_library(
"@com_github_montanaflynn_stats//:stats",
"@com_github_vividcortex_ewma//:ewma",
"@io_opentelemetry_go_otel//attribute",
"@io_storj_drpc//drpcconn",
"@io_storj_drpc//drpcmigrate",
"@io_storj_drpc//drpcmux",
"@io_storj_drpc//drpcpool",
"@io_storj_drpc//drpcserver",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
41 changes: 29 additions & 12 deletions pkg/rpc/connection.go
@@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"storj.io/drpc/drpcpool"
)

// Connection is a wrapper around grpc.ClientConn. It prevents the underlying
@@ -65,14 +66,14 @@ func newConnectionToNodeID(
// block but fall back to defErr in this case.
func (c *Connection) waitOrDefault(
ctx context.Context, defErr error, sig circuit.Signal,
) (*grpc.ClientConn, error) {
) (*grpc.ClientConn, drpcpool.Conn, error) {
// Check the circuit breaker first. If it is already tripped now, we
// want it to take precedence over connFuture below (which is closed in
// the common case of a connection going bad after having been healthy
// for a while).
select {
case <-sig.C():
return nil, sig.Err()
return nil, nil, sig.Err()
default:
}

@@ -83,26 +84,26 @@ func (c *Connection) waitOrDefault(
select {
case <-c.connFuture.C():
case <-sig.C():
return nil, sig.Err()
return nil, nil, sig.Err()
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
return nil, nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
}
} else {
select {
case <-c.connFuture.C():
case <-sig.C():
return nil, sig.Err()
return nil, nil, sig.Err()
case <-ctx.Done():
return nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
return nil, nil, errors.Wrapf(ctx.Err(), "while connecting to n%d at %s", c.k.NodeID, c.k.TargetAddr)
default:
return nil, defErr
return nil, nil, defErr
}
}

// Done waiting, c.connFuture has resolved, return the result. Note that this
// conn could be unhealthy (or there may not even be a conn, i.e. Err() !=
// nil), if that's what the caller wanted (ConnectNoBreaker).
return c.connFuture.Conn(), c.connFuture.Err()
return c.connFuture.Conn(), c.connFuture.DRPCConn(), c.connFuture.Err()
}

// Connect returns the underlying grpc.ClientConn after it has been validated,
@@ -112,6 +113,11 @@ func (c *Connection) waitOrDefault(
// an error. In rare cases, this behavior is undesired and ConnectNoBreaker may
// be used instead.
func (c *Connection) Connect(ctx context.Context) (*grpc.ClientConn, error) {
cc, _, err := c.waitOrDefault(ctx, nil /* defErr */, c.breakerSignalFn())
return cc, err
}

func (c *Connection) Connect2(ctx context.Context) (*grpc.ClientConn, drpcpool.Conn, error) {
return c.waitOrDefault(ctx, nil /* defErr */, c.breakerSignalFn())
}

@@ -133,7 +139,9 @@ func (s *neverTripSignal) IsTripped() bool {
// that it will latch onto (or start) an existing connection attempt even if
// previous attempts have not succeeded. This may be preferable to Connect
// if the caller is already certain that a peer is available.
func (c *Connection) ConnectNoBreaker(ctx context.Context) (*grpc.ClientConn, error) {
func (c *Connection) ConnectNoBreaker(
ctx context.Context,
) (*grpc.ClientConn, drpcpool.Conn, error) {
// For ConnectNoBreaker we don't use the default Signal but pass a dummy one
// that never trips. (The probe tears down the Conn on quiesce so we don't rely
// on the Signal for that).
@@ -157,7 +165,7 @@ func (c *Connection) ConnectNoBreaker(ctx context.Context) (*grpc.ClientConn, er
// latest heartbeat. Returns ErrNotHeartbeated if the peer was just contacted for
// the first time and the first heartbeat has not occurred yet.
func (c *Connection) Health() error {
_, err := c.waitOrDefault(context.Background(), ErrNotHeartbeated, c.breakerSignalFn())
_, _, err := c.waitOrDefault(context.Background(), ErrNotHeartbeated, c.breakerSignalFn())
return err
}

@@ -175,6 +183,7 @@ func (c *Connection) BatchStreamPool() *BatchStreamPool {
type connFuture struct {
ready chan struct{}
cc *grpc.ClientConn
dc drpcpool.Conn
err error
}

@@ -201,6 +210,14 @@ func (s *connFuture) Conn() *grpc.ClientConn {
return s.cc
}

// DRPCConn must only be called after C() has been closed.
func (s *connFuture) DRPCConn() drpcpool.Conn {
if s.err != nil {
return nil
}
return s.dc
}

func (s *connFuture) Resolved() bool {
select {
case <-s.ready:
@@ -212,12 +229,12 @@ func (s *connFuture) Resolved() bool {

// Resolve is idempotent. Only the first call has any effect.
// Not thread safe.
func (s *connFuture) Resolve(cc *grpc.ClientConn, err error) {
func (s *connFuture) Resolve(cc *grpc.ClientConn, dc drpcpool.Conn, err error) {
select {
case <-s.ready:
// Already resolved, noop.
default:
s.cc, s.err = cc, err
s.cc, s.dc, s.err = cc, dc, err
close(s.ready)
}
}
1 change: 1 addition & 0 deletions pkg/rpc/nodedialer/BUILD.bazel
@@ -20,6 +20,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@io_storj_drpc//drpcpool",
"@org_golang_google_grpc//:grpc",
],
)
44 changes: 33 additions & 11 deletions pkg/rpc/nodedialer/nodedialer.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"google.golang.org/grpc"
"storj.io/drpc/drpcpool"
)

// An AddressResolver translates NodeIDs into addresses.
@@ -100,7 +101,7 @@ func (n *Dialer) Dial(
err = errors.Wrapf(err, "failed to resolve n%d", nodeID)
return nil, err
}
conn, _, err := n.dial(ctx, nodeID, addr, locality, true, class)
conn, _, _, err := n.dial(ctx, nodeID, addr, locality, true, class)
return conn, err
}

Expand All @@ -117,7 +118,7 @@ func (n *Dialer) DialNoBreaker(
if err != nil {
return nil, err
}
conn, _, err := n.dial(ctx, nodeID, addr, locality, false, class)
conn, _, _, err := n.dial(ctx, nodeID, addr, locality, false, class)
return conn, err
}

Expand Down Expand Up @@ -147,16 +148,37 @@ func (n *Dialer) DialInternalClient(
return nil, errors.Wrap(err, "resolver error")
}
log.VEventf(ctx, 2, "sending request to %s", addr)
conn, pool, err := n.dial(ctx, nodeID, addr, locality, true, class)
conn, pool, dconn, err := n.dial(ctx, nodeID, addr, locality, true, class)
if err != nil {
return nil, err
}

client := kvpb.NewInternalClient(conn)
client = maybeWrapInBatchStreamPoolClient(ctx, n.rpcContext.Settings, client, pool)
client = maybeWrapInTracingClient(ctx, client)

const useDRPC = true
if useDRPC {
client := &unaryDRPCBatchServiceToInternalAdapter{
InternalClient: client, // for RangeFeed only
drpcClient: kvpb.NewDRPCDRPCBatchServiceClient(dconn),
}
return client, nil
}
client = maybeWrapInBatchStreamPoolClient(ctx, n.rpcContext.Settings, client, pool)
return client, nil
}

type unaryDRPCBatchServiceToInternalAdapter struct {
kvpb.InternalClient
drpcClient kvpb.DRPCDRPCBatchServiceClient
}

func (a *unaryDRPCBatchServiceToInternalAdapter) Batch(
ctx context.Context, in *kvpb.BatchRequest, opts ...grpc.CallOption,
) (*kvpb.BatchResponse, error) {
return a.drpcClient.Batch(ctx, in)
}

// dial performs the dialing of the remote connection. If checkBreaker
// is set (which it usually is), circuit breakers for the peer will be
// checked.
Expand All @@ -167,28 +189,28 @@ func (n *Dialer) dial(
locality roachpb.Locality,
checkBreaker bool,
class rpc.ConnectionClass,
) (_ *grpc.ClientConn, _ *rpc.BatchStreamPool, err error) {
) (_ *grpc.ClientConn, _ *rpc.BatchStreamPool, _ drpcpool.Conn, err error) {
const ctxWrapMsg = "dial"
// Don't trip the breaker if we're already canceled.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
return nil, nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
rpcConn := n.rpcContext.GRPCDialNode(addr.String(), nodeID, locality, class)
connect := rpcConn.Connect
connect := rpcConn.Connect2
if !checkBreaker {
connect = rpcConn.ConnectNoBreaker
}
conn, err := connect(ctx)
conn, dconn, err := connect(ctx)
if err != nil {
// If we were canceled during the dial, don't trip the breaker.
if ctxErr := ctx.Err(); ctxErr != nil {
return nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
return nil, nil, nil, errors.Wrap(ctxErr, ctxWrapMsg)
}
err = errors.Wrapf(err, "failed to connect to n%d at %v", nodeID, addr)
return nil, nil, err
return nil, nil, nil, err
}
pool := rpcConn.BatchStreamPool()
return conn, pool, nil
return conn, pool, dconn, nil
}

// ConnHealth returns nil if we have an open connection of the request
