Skip to content

Commit

Permalink
xds/clusterimpl: stop forwarding UpdateSubConnState calls (grpc#6518)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Aug 9, 2023
1 parent 8def12a commit dceb6ee
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 40 deletions.
25 changes: 13 additions & 12 deletions xds/internal/balancer/clusterimpl/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,24 +374,25 @@ func (s) TestPickerUpdateAfterClose(t *testing.T) {
stub.Register(childPolicyName, stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
// Create a subConn which will be used later on to test the race
// between UpdateSubConnState() and Close().
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{})
// between StateListener() and Close().
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, balancer.NewSubConnOptions{
StateListener: func(balancer.SubConnState) {
go func() {
// Wait for Close() to be called on the parent policy before
// sending the picker update.
<-closeCh
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
}()
},
})
if err != nil {
return err
}
sc.Connect()
return nil
},
UpdateSubConnState: func(bd *stub.BalancerData, _ balancer.SubConn, _ balancer.SubConnState) {
go func() {
// Wait for Close() to be called on the parent policy before
// sending the picker update.
<-closeCh
bd.ClientConn.UpdateState(balancer.State{
Picker: base.NewErrPicker(errors.New("dummy error picker")),
})
}()
},
})

var maxRequest uint32 = 50
Expand Down
29 changes: 1 addition & 28 deletions xds/internal/balancer/clusterimpl/clusterimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
closed: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
loadWrapper: loadstore.NewWrapper(),
scWrappers: make(map[balancer.SubConn]*scWrapper),
pickerUpdateCh: buffer.NewUnbounded(),
requestCountMax: defaultRequestCountMax,
}
Expand Down Expand Up @@ -113,18 +112,6 @@ type clusterImplBalancer struct {
clusterNameMu sync.Mutex
clusterName string

scWrappersMu sync.Mutex
// The SubConns passed to the child policy are wrapped in a wrapper, to keep
// locality ID. But when the parent ClientConn sends updates, it's going to
// give the original SubConn, not the wrapper. But the child policies only
// know about the wrapper, so when forwarding SubConn updates, they must be
// sent for the wrappers.
//
// This keeps a map from original SubConn to wrapper, so that when
// forwarding the SubConn state update, the child policy will get the
// wrappers.
scWrappers map[balancer.SubConn]*scWrapper

// childState/drops/requestCounter keeps the state used by the most recently
// generated picker. All fields can only be accessed in run(). And run() is
// the only goroutine that sends picker to the parent ClientConn. All
Expand Down Expand Up @@ -296,24 +283,13 @@ func (b *clusterImplBalancer) updateSubConnState(sc balancer.SubConn, s balancer
b.ClientConn.ResolveNow(resolver.ResolveNowOptions{})
}

b.scWrappersMu.Lock()
if scw, ok := b.scWrappers[sc]; ok {
sc = scw
if s.ConnectivityState == connectivity.Shutdown {
// Remove this SubConn from the map on Shutdown.
delete(b.scWrappers, scw.SubConn)
}
}
b.scWrappersMu.Unlock()
if cb != nil {
cb(s)
} else {
b.child.UpdateSubConnState(sc, s)
}
}

func (b *clusterImplBalancer) UpdateSubConnState(sc balancer.SubConn, s balancer.SubConnState) {
b.updateSubConnState(sc, s, nil)
b.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, s)
}

func (b *clusterImplBalancer) Close() {
Expand Down Expand Up @@ -394,11 +370,8 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
// Wrap this SubConn in a wrapper, and add it to the map.
b.scWrappersMu.Lock()
ret := &scWrapper{SubConn: sc}
ret.updateLocalityID(lID)
b.scWrappers[sc] = ret
b.scWrappersMu.Unlock()
return ret, nil
}

Expand Down

0 comments on commit dceb6ee

Please sign in to comment.