Skip to content

Commit

Permalink
balancer/weightedtarget: pause picker updates during UpdateClientConn…
Browse files Browse the repository at this point in the history
…State (#5527)
  • Loading branch information
dfawley authored Jul 21, 2022
1 parent 679138d commit 86117db
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 0 deletions.
41 changes: 41 additions & 0 deletions balancer/weightedtarget/weightedaggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ type Aggregator struct {
//
// If an ID is not in map, it's either removed or never added.
idToPickerState map[string]*weightedPickerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
// Set when UpdateState call propagation is paused and an UpdateState call
// is suppressed.
needUpdateStateOnResume bool
}

// New creates a new weighted balancer state aggregator.
Expand Down Expand Up @@ -141,6 +146,27 @@ func (wbsa *Aggregator) UpdateWeight(id string, newWeight uint32) {
pState.weight = newWeight
}

// PauseStateUpdates causes UpdateState calls to not propagate to the parent
// ClientConn. The last state will be remembered and propagated when
// ResumeStateUpdates is called.
func (wbsa *Aggregator) PauseStateUpdates() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.pauseUpdateState = true
wbsa.needUpdateStateOnResume = false
}

// ResumeStateUpdates will resume propagating UpdateState calls to the parent,
// and call UpdateState on the parent if any UpdateState call was suppressed.
func (wbsa *Aggregator) ResumeStateUpdates() {
wbsa.mu.Lock()
defer wbsa.mu.Unlock()
wbsa.pauseUpdateState = false
if wbsa.needUpdateStateOnResume {
wbsa.cc.UpdateState(wbsa.build())
}
}

// UpdateState is called to report a balancer state change from sub-balancer.
// It's usually called by the balancer group.
//
Expand All @@ -166,6 +192,14 @@ func (wbsa *Aggregator) UpdateState(id string, newState balancer.State) {
if !wbsa.started {
return
}

if wbsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
wbsa.needUpdateStateOnResume = true
return
}

wbsa.cc.UpdateState(wbsa.build())
}

Expand All @@ -191,6 +225,13 @@ func (wbsa *Aggregator) BuildAndUpdate() {
if !wbsa.started {
return
}
if wbsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
wbsa.needUpdateStateOnResume = true
return
}

wbsa.cc.UpdateState(wbsa.build())
}

Expand Down
3 changes: 3 additions & 0 deletions balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func (b *weightedTargetBalancer) UpdateClientConnState(s balancer.ClientConnStat

var rebuildStateAndPicker bool

b.stateAggregator.PauseStateUpdates()
defer b.stateAggregator.ResumeStateUpdates()

// Remove sub-pickers and sub-balancers that are not in the new config.
for name := range b.targets {
if _, ok := newConfig.Targets[name]; !ok {
Expand Down
55 changes: 55 additions & 0 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,3 +1218,58 @@ func (s) TestInitialIdle(t *testing.T) {
t.Fatalf("Received aggregated state: %v, want Idle", state)
}
}

// tcc wraps a testutils.TestClientConn but stores all state transitions in a
// slice.
type tcc struct {
*testutils.TestClientConn
states []balancer.State
}

func (t *tcc) UpdateState(bs balancer.State) {
t.states = append(t.states, bs)
t.TestClientConn.UpdateState(bs)
}

func (s) TestUpdateStatePauses(t *testing.T) {
cc := &tcc{TestClientConn: testutils.NewTestClientConn(t)}

balFuncs := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, s balancer.ClientConnState) error {
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.TransientFailure, Picker: nil})
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: nil})
return nil
},
}
stub.Register("update_state_balancer", balFuncs)

wtb := wtbBuilder.Build(cc, balancer.BuildOptions{})
defer wtb.Close()

config, err := wtbParser.ParseConfig([]byte(`
{
"targets": {
"cluster_1": {
"weight":1,
"childPolicy": [{"update_state_balancer": ""}]
}
}
}`))
if err != nil {
t.Fatalf("failed to parse balancer config: %v", err)
}

// Send the config, and an address with hierarchy path ["cluster_1"].
addrs := []resolver.Address{{Addr: testBackendAddrStrs[0], Attributes: nil}}
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{hierarchy.Set(addrs[0], []string{"cds:cluster_1"})}},
BalancerConfig: config,
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}

// Verify that the only state update is the second one called by the child.
if len(cc.states) != 1 || cc.states[0].ConnectivityState != connectivity.Ready {
t.Fatalf("cc.states = %v; want [connectivity.Ready]", cc.states)
}
}

0 comments on commit 86117db

Please sign in to comment.