Skip to content

Commit

Permalink
xds/clusterresolver: stop forwarding UpdateSubConnState calls (grpc#6526
Browse files Browse the repository at this point in the history
)
  • Loading branch information
dfawley authored Aug 9, 2023
1 parent 8f51ca8 commit 694cb64
Show file tree
Hide file tree
Showing 2 changed files with 2 additions and 125 deletions.
33 changes: 2 additions & 31 deletions xds/internal/balancer/clusterresolver/clusterresolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

// Package clusterresolver contains the implementation of the
// xds_cluster_resolver_experimental LB policy which resolves endpoint addresses
// cluster_resolver_experimental LB policy which resolves endpoint addresses
// using a list of one or more discovery mechanisms.
package clusterresolver

Expand Down Expand Up @@ -150,13 +150,6 @@ type ccUpdate struct {
err error
}

// scUpdate wraps a subConn update received from gRPC. This is directly passed
// on to the child policy.
type scUpdate struct {
subConn balancer.SubConn
state balancer.SubConnState
}

type exitIdle struct{}

// clusterResolverBalancer resolves endpoint addresses using a list of one or
Expand Down Expand Up @@ -314,14 +307,6 @@ func (b *clusterResolverBalancer) run() {
switch update := u.(type) {
case *ccUpdate:
b.handleClientConnUpdate(update)
case *scUpdate:
// SubConn updates are simply handed over to the underlying
// child balancer.
if b.child == nil {
b.logger.Errorf("Received a SubConn update {%+v} with no child policy", update)
break
}
b.child.UpdateSubConnState(update.subConn, update.state)
case exitIdle:
if b.child == nil {
b.logger.Errorf("xds: received ExitIdle with no child balancer")
Expand Down Expand Up @@ -388,11 +373,7 @@ func (b *clusterResolverBalancer) ResolverError(err error) {

// UpdateSubConnState handles subConn updates from gRPC.
func (b *clusterResolverBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
if b.closed.HasFired() {
b.logger.Warningf("Received subConn update {%v, %v} after close", sc, state)
return
}
b.updateCh.Put(&scUpdate{subConn: sc, state: state})
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, state)
}

// Close closes the cdsBalancer and the underlying child balancer.
Expand All @@ -419,13 +400,3 @@ type ccWrapper struct {
func (c *ccWrapper) ResolveNow(resolver.ResolveNowOptions) {
c.resourceWatcher.resolveNow()
}

func (c *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) {
if opts.StateListener == nil {
// If already set, just allow updates to be sent directly to the
// child's listener. Otherwise, we are responsible for forwarding the
// update we'll receive to the proper child.
opts.StateListener = func(state balancer.SubConnState) { c.b.UpdateSubConnState(sc, state) }
}
return c.ClientConn.NewSubConn(addrs, opts)
}
94 changes: 0 additions & 94 deletions xds/internal/balancer/clusterresolver/e2e_test/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/buffer"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -318,99 +317,6 @@ func (s) TestErrorFromParentLB_ResourceNotFound(t *testing.T) {
}
}

// testCCWrapper wraps a balancer.ClientConn and intercepts NewSubConn to make
// subConn state changes available to the test.
type testCCWrapper struct {
balancer.ClientConn
scStateCh *buffer.Unbounded
}

func (t *testCCWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (sc balancer.SubConn, err error) {
oldListener := opts.StateListener
opts.StateListener = func(state balancer.SubConnState) {
t.scStateCh.Put(state)
if oldListener != nil {
oldListener(state)
}
}
return t.ClientConn.NewSubConn(addrs, opts)
}

// Test verifies that SubConn state changes are propagated to the child policy
// by the cluster resolver LB policy.
func (s) TestSubConnStateChangePropagationToChildPolicy(t *testing.T) {
// Unregister the priority balancer builder for the duration of this test,
// and register a policy under the same name that makes SubConn state
// changes pushed to it available to the test.
priorityBuilder := balancer.Get(priority.Name)
internal.BalancerUnregister(priorityBuilder.Name())
var ccWrapper *testCCWrapper
stub.Register(priority.Name, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
ccWrapper = &testCCWrapper{
ClientConn: bd.ClientConn,
scStateCh: buffer.NewUnbounded(),
}
bd.Data = priorityBuilder.Build(ccWrapper, bd.BuildOptions)
},
ParseConfig: func(lbCfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return priorityBuilder.(balancer.ConfigParser).ParseConfig(lbCfg)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bal := bd.Data.(balancer.Balancer)
bal.Close()
},
})

defer balancer.Register(priorityBuilder)

managementServer, nodeID, bootstrapContents, _, cleanup := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{})
defer cleanup()

server := stubserver.StartTestService(t, nil)
defer server.Stop()

// Configure cluster and endpoints resources in the management server.
resources := e2e.UpdateOptions{
NodeID: nodeID,
Clusters: []*v3clusterpb.Cluster{e2e.DefaultCluster(clusterName, edsServiceName, e2e.SecurityLevelNone)},
Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{testutils.ParsePort(t, server.Address)})},
SkipValidation: true,
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}

// Create xDS client, configure cds_experimental LB policy with a manual
// resolver, and dial the test backends.
cc, cleanup := setupAndDial(t, bootstrapContents)
defer cleanup()

client := testgrpc.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
t.Fatalf("EmptyCall() failed: %v", err)
}

for {
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for child policy to see a READY SubConn")
case s := <-ccWrapper.scStateCh.Get():
ccWrapper.scStateCh.Load()
state := s.(balancer.SubConnState)
if state.ConnectivityState == connectivity.Ready {
return
}
}
}
}

// Test verifies that when the received Cluster resource contains outlier
// detection configuration, the LB config pushed to the child policy contains
// the appropriate configuration for the outlier detection LB policy.
Expand Down

0 comments on commit 694cb64

Please sign in to comment.