Skip to content

Commit

Permalink
stats: Add RPC event for blocking for a picker update (#6422)
Browse files Browse the repository at this point in the history
  • Loading branch information
zasweq authored Jul 18, 2023
1 parent 02946a3 commit 7aab9c0
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 14 deletions.
2 changes: 1 addition & 1 deletion clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (cc *ClientConn) exitIdleMode() error {
cc.idlenessState = ccIdlenessStateExitingIdle
exitedIdle := false
if cc.blockingpicker == nil {
cc.blockingpicker = newPickerWrapper()
cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers)
} else {
cc.blockingpicker.exitIdleMode()
exitedIdle = true
Expand Down
34 changes: 27 additions & 7 deletions picker_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,26 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick
// actions and unblock when there's a picker update.
type pickerWrapper struct {
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
mu sync.Mutex
done bool
idle bool
blockingCh chan struct{}
picker balancer.Picker
statsHandlers []stats.Handler // to record blocking picker calls
}

func newPickerWrapper() *pickerWrapper {
return &pickerWrapper{blockingCh: make(chan struct{})}
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
return &pickerWrapper{
blockingCh: make(chan struct{}),
statsHandlers: statsHandlers,
}
}

// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick.
Expand Down Expand Up @@ -95,6 +100,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
var ch chan struct{}

var lastPickErr error

for {
pw.mu.Lock()
if pw.done {
Expand Down Expand Up @@ -129,6 +135,20 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
continue
}

// If the channel is set, it means that the pick call had to wait for a
// new picker at some point. Either it's the first iteration and this
// function received the first picker, or a picker errored with
// ErrNoSubConnAvailable or errored with failfast set to false, which
// will trigger a continue to the next iteration. In the first case this
// conditional will hit if this call had to block (the channel is set).
// In the second case, the only way it will get to this conditional is
// if there is a new picker.
if ch != nil {
for _, sh := range pw.statsHandlers {
sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
}

ch = pw.blockingCh
p := pw.picker
pw.mu.Unlock()
Expand Down
10 changes: 5 additions & 5 deletions picker_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error
}

func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
Expand All @@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) {
}

func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
for i := goroutineCount; i > 0; i-- {
Expand All @@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) {
}

func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
// All goroutines should block because picker returns no subConn available.
Expand All @@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}

func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because picker returns transientFailure and
Expand All @@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}

func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper()
bp := newPickerWrapper(nil)
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
// All goroutines should block because subConn is not ready.
Expand Down
4 changes: 3 additions & 1 deletion stats/opencensus/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) {
trace.BoolAttribute("Client", rs.Client),
trace.BoolAttribute("FailFast", rs.FailFast),
)
case *stats.PickerUpdated:
span.Annotate(nil, "Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting
// from 1 one for sent messages and one for received messages."
// from one for sent messages and one for received messages."
mi := atomic.AddUint32(&ti.countRecvMsg, 1)
span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength))
case *stats.OutPayload:
Expand Down
10 changes: 10 additions & 0 deletions stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,16 @@ func (s *Begin) IsClient() bool { return s.Client }

func (s *Begin) isRPCStats() {}

// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
type PickerUpdated struct{}

// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }

func (*PickerUpdated) isRPCStats() {}

// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
Expand Down
167 changes: 167 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"flag"
"fmt"
Expand All @@ -44,6 +45,8 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
Expand All @@ -62,6 +65,7 @@ import (
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/serviceconfig"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"google.golang.org/grpc/tap"
Expand All @@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health"

func init() {
channelz.TurnOn()
balancer.Register(triggerRPCBlockPickerBalancerBuilder{})
}

type s struct {
Expand Down Expand Up @@ -6362,3 +6367,165 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) {
t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events)
}
}

type statsHandlerRecordEvents struct {
mu sync.Mutex
s []stats.RPCStats
}

func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
return ctx
}
func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) {
h.mu.Lock()
defer h.mu.Unlock()
h.s = append(h.s, s)
}
func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
return ctx
}
func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {}

type triggerRPCBlockPicker struct {
pickDone func()
}

func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) {
bp.pickDone()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}

const name = "triggerRPCBlockBalancer"

type triggerRPCBlockPickerBalancerBuilder struct{}

func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &triggerRPCBlockBalancer{
blockingPickerDone: grpcsync.NewEvent(),
ClientConn: cc,
}
// round_robin child to complete balancer tree with a usable leaf policy and
// have RPCs actually work.
builder := balancer.Get(roundrobin.Name)
rr := builder.Build(b, bOpts)
if rr == nil {
panic("round robin builder returned nil")
}
b.Balancer = rr
return b
}

func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return &bpbConfig{}, nil
}

func (triggerRPCBlockPickerBalancerBuilder) Name() string {
return name
}

type bpbConfig struct {
serviceconfig.LoadBalancingConfig
}

// triggerRPCBlockBalancer uses a child RR balancer, but blocks all UpdateState
// calls until the first Pick call. That first Pick returns
// ErrNoSubConnAvailable to make the RPC block and trigger the appropriate stats
// handler callout. After the first Pick call, it will forward at least one
// READY picker update from the child, causing RPCs to proceed as normal using a
// round robin balancer's picker if it updates with a READY picker.
type triggerRPCBlockBalancer struct {
stateMu sync.Mutex
childState balancer.State

blockingPickerDone *grpcsync.Event
// embed a ClientConn to wrap only UpdateState() operation
balancer.ClientConn
// embed a Balancer to wrap only UpdateClientConnState() operation
balancer.Balancer
}

func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
err := bpb.Balancer.UpdateClientConnState(s)
bpb.ClientConn.UpdateState(balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: &triggerRPCBlockPicker{
pickDone: func() {
bpb.stateMu.Lock()
defer bpb.stateMu.Unlock()
bpb.blockingPickerDone.Fire()
if bpb.childState.ConnectivityState == connectivity.Ready {
bpb.ClientConn.UpdateState(bpb.childState)
}
},
},
})
return err
}

func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) {
bpb.stateMu.Lock()
defer bpb.stateMu.Unlock()
bpb.childState = state
if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first
if state.ConnectivityState == connectivity.Ready {
bpb.ClientConn.UpdateState(state) // after the first rr picker update, only forward once READY for deterministic picker counts
}
}
}

// TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call
// that represents the RPC had to block waiting for a new picker due to
// ErrNoSubConnAvailable being returned from the first picker call.
func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
sh := &statsHandlerRecordEvents{}
ss := &stubserver.StubServer{
UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}

if err := ss.StartServer(); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()

lbCfgJSON := `{
"loadBalancingConfig": [
{
"triggerRPCBlockBalancer": {}
}
]
}`

sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON)
mr := manual.NewBuilderWithScheme("pickerupdatedbalancer")
defer mr.Close()
mr.InitialState(resolver.State{
Addresses: []resolver.Address{
{Addr: ss.Address},
},
ServiceConfig: sc,
})

cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("grpc.Dial() failed: %v", err)
}
defer cc.Close()
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
testServiceClient := testgrpc.NewTestServiceClient(cc)
if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}

var pickerUpdatedCount uint
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
}
}
if pickerUpdatedCount != 1 {
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
}
}
4 changes: 4 additions & 0 deletions test/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,10 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
return ctx
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// these calls come in nondeterministically - so can just ignore
if _, ok := s.(*stats.PickerUpdated); ok {
return
}
h.mu.Lock()
h.s = append(h.s, s)
h.mu.Unlock()
Expand Down

0 comments on commit 7aab9c0

Please sign in to comment.