Skip to content

Commit

Permalink
xds/cdsbalancer: stop handling subconn state updates (grpc#6509)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 7, 2023
1 parent e9a4e94 commit 9c46304
Show file tree
Hide file tree
Showing 2 changed files with 1 addition and 83 deletions.
21 changes: 1 addition & 20 deletions xds/internal/balancer/cdsbalancer/cdsbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,6 @@ type ccUpdate struct {
err error
}

// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the cluster_resolver balancer.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}

type exitIdle struct{}

// cdsBalancer implements a CDS based LB policy. It instantiates a
Expand Down Expand Up @@ -415,14 +408,6 @@ func (b *cdsBalancer) run() {
switch update := u.(type) {
case *ccUpdate:
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are passthrough and are simply handed over to
// the underlying cluster_resolver balancer.
if b.childLB == nil {
b.logger.Errorf("Received SubConn update with no child policy: %+v", update)
break
}
b.childLB.UpdateSubConnState(update.subConn, update.state)
case exitIdle:
if b.childLB == nil {
b.logger.Errorf("Received ExitIdle with no child policy")
Expand Down Expand Up @@ -540,11 +525,7 @@ func (b *cdsBalancer) ResolverError(err error) {

// UpdateSubConnState handles subConn updates from gRPC.
func (b *cdsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("Received subConn update after close: {%v, %v}", sc, state)
return
}
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// Close cancels the CDS watch, closes the child policy and closes the
Expand Down
63 changes: 0 additions & 63 deletions xds/internal/balancer/cdsbalancer/cdsbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpctest"
internalserviceconfig "google.golang.org/grpc/internal/serviceconfig"
Expand Down Expand Up @@ -175,20 +174,6 @@ func (tb *testEDSBalancer) waitForClientConnUpdate(ctx context.Context, wantCCS
return nil
}

// waitForSubConnUpdate verifies if the testEDSBalancer receives the provided
// SubConn update before the context expires.
func (tb *testEDSBalancer) waitForSubConnUpdate(ctx context.Context, wantSCS subConnWithState) error {
scs, err := tb.scStateCh.Receive(ctx)
if err != nil {
return err
}
gotSCS := scs.(subConnWithState)
if !cmp.Equal(gotSCS, wantSCS, cmp.AllowUnexported(subConnWithState{})) {
return fmt.Errorf("received SubConnState: %+v, want %+v", gotSCS, wantSCS)
}
return nil
}

// waitForResolverError verifies if the testEDSBalancer receives the provided
// resolver error before the context expires.
func (tb *testEDSBalancer) waitForResolverError(ctx context.Context, wantErr error) error {
Expand Down Expand Up @@ -698,45 +683,6 @@ func (s) TestResolverError(t *testing.T) {
}
}

// TestUpdateSubConnState verifies the UpdateSubConnState() method in the CDS
// balancer.
func (s) TestUpdateSubConnState(t *testing.T) {
// This creates a CDS balancer, pushes a ClientConnState update with a fake
// xdsClient, and makes sure that the CDS balancer registers a watch on the
// provided xdsClient.
xdsC, cdsB, edsB, _, cancel := setupWithWatch(t)
defer func() {
cancel()
cdsB.Close()
}()

// Here we invoke the watch callback registered on the fake xdsClient. This
// will trigger the watch handler on the CDS balancer, which will attempt to
// create a new EDS balancer. The fake EDS balancer created above will be
// returned to the CDS balancer, because we have overridden the
// newChildBalancer function as part of test setup.
cdsUpdate := xdsresource.ClusterUpdate{
ClusterName: serviceName,
LBPolicy: wrrLocalityLBConfigJSON,
}
wantCCS := edsCCS(serviceName, nil, false, wrrLocalityLBConfigJSON, noopODLBCfgJSON)
ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer ctxCancel()
if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil {
t.Fatal(err)
}

// Push a subConn state change to the CDS balancer.
var sc balancer.SubConn
state := balancer.SubConnState{ConnectivityState: connectivity.Ready}
cdsB.UpdateSubConnState(sc, state)

// Make sure that the update is forwarded to the EDS balancer.
if err := edsB.waitForSubConnUpdate(ctx, subConnWithState{sc: sc, state: state}); err != nil {
t.Fatal(err)
}
}

// TestCircuitBreaking verifies that the CDS balancer correctly updates a
// service's counter on watch updates.
func (s) TestCircuitBreaking(t *testing.T) {
Expand Down Expand Up @@ -829,15 +775,6 @@ func (s) TestClose(t *testing.T) {
t.Fatalf("UpdateClientConnState() after close returned %v, want %v", err, errBalancerClosed)
}

// Make sure that the UpdateSubConnState() method on the CDS balancer does
// not forward the update to the EDS balancer.
cdsB.UpdateSubConnState(&testutils.TestSubConn{}, balancer.SubConnState{})
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if err := edsB.waitForSubConnUpdate(sCtx, subConnWithState{}); err != context.DeadlineExceeded {
t.Fatal("UpdateSubConnState() forwarded to EDS balancer after Close()")
}

// Make sure that the ResolverErr() method on the CDS balancer does not
// forward the update to the EDS balancer.
rErr := errors.New("cdsBalancer resolver error")
Expand Down

0 comments on commit 9c46304

Please sign in to comment.