balancergroup: add a ParseConfig API and remove the UpdateBuilder API (#7232)
easwars authored May 22, 2024
1 parent a75dfa6 commit c822adf
Showing 5 changed files with 457 additions and 164 deletions.
49 changes: 15 additions & 34 deletions internal/balancergroup/balancergroup.go
@@ -19,6 +19,7 @@
package balancergroup

import (
"encoding/json"
"fmt"
"sync"
"time"
@@ -29,6 +30,7 @@ import (
"google.golang.org/grpc/internal/cache"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

// subBalancerWrapper is used to keep the configurations that will be used to start
@@ -148,20 +150,6 @@ func (sbc *subBalancerWrapper) resolverError(err error) {
b.ResolverError(err)
}

func (sbc *subBalancerWrapper) gracefulSwitch(builder balancer.Builder) {
sbc.builder = builder
b := sbc.balancer
// Even if you get an add and it persists builder but doesn't start
// balancer, this would leave graceful switch being nil, in which we are
// correctly overwriting with the recent builder here as well to use later.
// The graceful switch balancer's presence is an invariant of whether the
// balancer group is closed or not (if closed, nil, if started, present).
if sbc.balancer != nil {
sbc.group.logger.Infof("Switching child policy %v to type %v", sbc.id, sbc.builder.Name())
b.SwitchTo(sbc.builder)
}
}

func (sbc *subBalancerWrapper) stopBalancer() {
if sbc.balancer == nil {
return
@@ -170,7 +158,8 @@ func (sbc *subBalancerWrapper) stopBalancer() {
sbc.balancer = nil
}

// BalancerGroup takes a list of balancers, and make them into one balancer.
// BalancerGroup takes a list of balancers, each behind a gracefulswitch
// balancer, and makes them into one balancer.
//
// Note that this struct doesn't implement balancer.Balancer, because it's not
// intended to be used directly as a balancer. It's expected to be used as a
@@ -377,25 +366,6 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
bg.AddWithClientConn(id, builder.Name(), bg.cc)
}

// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
bg.outgoingMu.Lock()
// This does not deal with the balancer cache because this call should come
// after an Add call for a given child balancer. If the child is removed,
// the caller will call Add if the child balancer comes back which would
// then deal with the balancer cache.
sbc := bg.idToBalancerConfig[id]
if sbc == nil {
// simply ignore it if not present, don't error
return
}
sbc.gracefulSwitch(builder)
bg.outgoingMu.Unlock()
}

// Remove removes the balancer with id from the group.
//
// But doesn't close the balancer. The balancer is kept in a cache, and will be
@@ -636,3 +606,14 @@ func (bg *BalancerGroup) ExitIdleOne(id string) {
}
bg.outgoingMu.Unlock()
}

// ParseConfig parses a child config list and returns a LB config for the
// gracefulswitch Balancer.
//
// cfg is expected to be a json.RawMessage containing a JSON array of LB policy
// names and configs, in the same format as the "loadBalancingConfig" field in
// ServiceConfig. It returns a type that should be passed to
// UpdateClientConnState in the BalancerConfig field.
func ParseConfig(cfg json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return gracefulswitch.ParseConfig(cfg)
}
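
To illustrate the expected format (the same shape as a service config's "loadBalancingConfig" field: a JSON array of single-key objects mapping an LB policy name to its config), here is a minimal usage sketch. The exampleChildUpdate helper, the "round_robin" policy name, and the bg/addrs variables are illustrative assumptions, not part of this commit.

// Sketch only (assumed helper, not in this commit): parse a child config and
// push it to one child policy in the group.
func exampleChildUpdate(bg *BalancerGroup, addrs []resolver.Address) error {
	cfgJSON := json.RawMessage(`[{"round_robin": {}}]`)
	lbCfg, err := ParseConfig(cfgJSON)
	if err != nil {
		return err
	}
	// The parsed config reaches the child's gracefulswitch balancer via the
	// BalancerConfig field; a change in the child's policy type triggers a
	// graceful switch.
	return bg.UpdateClientConnState("child-1", balancer.ClientConnState{
		ResolverState:  resolver.State{Addresses: addrs},
		BalancerConfig: lbCfg,
	})
}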
13 changes: 10 additions & 3 deletions internal/balancergroup/balancergroup_test.go
@@ -18,6 +18,7 @@ package balancergroup

import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@@ -609,9 +610,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
return bal.UpdateClientConnState(ccs)
},
})
builder := balancer.Get(childPolicyName)
bg.UpdateBuilder(testBalancerIDs[0], builder)
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]}}); err != nil {
cfgJSON := json.RawMessage(fmt.Sprintf(`[{%q: {}}]`, t.Name()))
lbCfg, err := ParseConfig(cfgJSON)
if err != nil {
t.Fatalf("ParseConfig(%s) failed: %v", string(cfgJSON), err)
}
if err := bg.UpdateClientConnState(testBalancerIDs[0], balancer.ClientConnState{
ResolverState: resolver.State{Addresses: testBackendAddrs[2:4]},
BalancerConfig: lbCfg,
}); err != nil {
t.Fatalf("error updating ClientConn state: %v", err)
}

126 changes: 31 additions & 95 deletions xds/internal/balancer/clustermanager/balancerstateaggregator.go
@@ -45,17 +45,14 @@ func (s *subBalancerState) String() string {
type balancerStateAggregator struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
csEval *balancer.ConnectivityStateEvaluator

mu sync.Mutex
// If started is false, no updates should be sent to the parent cc. A closed
// sub-balancer could still send pickers to this aggregator. This makes sure
// that no updates will be forwarded to parent when the whole balancer group
// and states aggregator is closed.
started bool
// All balancer IDs exist as keys in this map, even if balancer group is not
// started.
//
// If an ID is not in map, it's either removed or never added.
// This field is used to ensure that no updates are forwarded to the parent
// CC once the aggregator is closed. A closed sub-balancer could still send
// pickers to this aggregator.
closed bool
// Map from child policy name to last reported state.
idToPickerState map[string]*subBalancerState
// Set when UpdateState call propagation is paused.
pauseUpdateState bool
@@ -68,34 +65,24 @@ func newBalancerStateAggregator(cc balancer.ClientConn, logger *grpclog.PrefixLo
return &balancerStateAggregator{
cc: cc,
logger: logger,
csEval: &balancer.ConnectivityStateEvaluator{},
idToPickerState: make(map[string]*subBalancerState),
}
}

// Start starts the aggregator. It can be called after Close to restart the
// aggretator.
func (bsa *balancerStateAggregator) start() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = true
}

// Close closes the aggregator. When the aggregator is closed, it won't call
// parent ClientConn to update balancer state.
func (bsa *balancerStateAggregator) close() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
bsa.started = false
bsa.clearStates()
bsa.closed = true
}

// add adds a sub-balancer state with weight. It adds a place holder, and waits
// for the real sub-balancer to update state.
// add adds a sub-balancer in CONNECTING state.
//
// This is called when there's a new child.
func (bsa *balancerStateAggregator) add(id string) {
bsa.mu.Lock()
defer bsa.mu.Unlock()

bsa.idToPickerState[id] = &subBalancerState{
// Start everything in CONNECTING, so if one of the sub-balancers
// reports TransientFailure, the RPCs will still wait for the other
@@ -106,6 +93,8 @@ func (bsa *balancerStateAggregator) add(id string) {
},
stateToAggregate: connectivity.Connecting,
}
bsa.csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
bsa.buildAndUpdateLocked()
}

// remove removes the sub-balancer state. Future updates from this sub-balancer,
@@ -118,9 +107,15 @@ func (bsa *balancerStateAggregator) remove(id string) {
if _, ok := bsa.idToPickerState[id]; !ok {
return
}
// Setting the state of the deleted sub-balancer to Shutdown will get
// csEval to remove the previous state for any aggregated state
// evaluations. Transitions to and from connectivity.Shutdown are ignored
// by csEval.
// Remove id and picker from picker map. This also results in future updates
// for this ID to be ignored.
delete(bsa.idToPickerState, id)
bsa.buildAndUpdateLocked()
}

// pauseStateUpdates causes UpdateState calls to not propagate to the parent
@@ -140,7 +135,7 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
defer bsa.mu.Unlock()
bsa.pauseUpdateState = false
if bsa.needUpdateStateOnResume {
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}
}

@@ -149,6 +144,8 @@ func (bsa *balancerStateAggregator) resumeStateUpdates() {
//
// It calls parent ClientConn's UpdateState with the new aggregated state.
func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State) {
bsa.logger.Infof("State update from sub-balancer %q: %+v", id, state)

bsa.mu.Lock()
defer bsa.mu.Unlock()
pickerSt, ok := bsa.idToPickerState[id]
@@ -162,42 +159,17 @@ func (bsa *balancerStateAggregator) UpdateState(id string, state balancer.State)
// update the state, to prevent the aggregated state from being always
// CONNECTING. Otherwise, stateToAggregate is the same as
// state.ConnectivityState.
bsa.csEval.RecordTransition(pickerSt.stateToAggregate, state.ConnectivityState)
pickerSt.stateToAggregate = state.ConnectivityState
}
pickerSt.state = state

if !bsa.started {
return
}
if bsa.pauseUpdateState {
// If updates are paused, do not call UpdateState, but remember that we
// need to call it when they are resumed.
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
}

// clearState Reset everything to init state (Connecting) but keep the entry in
// map (to keep the weight).
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) clearStates() {
for _, pState := range bsa.idToPickerState {
pState.state = balancer.State{
ConnectivityState: connectivity.Connecting,
Picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable),
}
pState.stateToAggregate = connectivity.Connecting
}
bsa.buildAndUpdateLocked()
}

// buildAndUpdate combines the sub-state from each sub-balancer into one state,
// and update it to parent ClientConn.
func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.mu.Lock()
defer bsa.mu.Unlock()
if !bsa.started {
// buildAndUpdateLocked combines the sub-state from each sub-balancer into one
// state, and sends a picker update to the parent ClientConn.
func (bsa *balancerStateAggregator) buildAndUpdateLocked() {
if bsa.closed {
return
}
if bsa.pauseUpdateState {
@@ -206,55 +178,19 @@ func (bsa *balancerStateAggregator) buildAndUpdate() {
bsa.needUpdateStateOnResume = true
return
}
bsa.cc.UpdateState(bsa.build())
bsa.cc.UpdateState(bsa.buildLocked())
}

// build combines sub-states into one. The picker will do a child pick.
//
// Caller must hold bsa.mu.
func (bsa *balancerStateAggregator) build() balancer.State {
// TODO: the majority of this function (and UpdateState) is exactly the same
// as weighted_target's state aggregator. Try to make a general utility
// function/struct to handle the logic.
//
// One option: make a SubBalancerState that handles Update(State), including
// handling the special connecting after ready, as in UpdateState(). Then a
// function to calculate the aggregated connectivity state as in this
// function.
//
// TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated
// state.
var readyN, connectingN, idleN int
for _, ps := range bsa.idToPickerState {
switch ps.stateToAggregate {
case connectivity.Ready:
readyN++
case connectivity.Connecting:
connectingN++
case connectivity.Idle:
idleN++
}
}
var aggregatedState connectivity.State
switch {
case readyN > 0:
aggregatedState = connectivity.Ready
case connectingN > 0:
aggregatedState = connectivity.Connecting
case idleN > 0:
aggregatedState = connectivity.Idle
default:
aggregatedState = connectivity.TransientFailure
}

// buildLocked combines sub-states into one.
func (bsa *balancerStateAggregator) buildLocked() balancer.State {
// The picker's return error might not be consistent with the
// aggregatedState. Because for this LB policy, we want to always build
// picker with all sub-pickers (not only ready sub-pickers), so even if the
// overall state is Ready, pick for certain RPCs can behave like Connecting
// or TransientFailure.
bsa.logger.Infof("Child pickers: %+v", bsa.idToPickerState)
return balancer.State{
ConnectivityState: aggregatedState,
ConnectivityState: bsa.csEval.CurrentState(),
Picker: newPickerGroup(bsa.idToPickerState),
}
}
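
For context, the hand-rolled state counting that this commit removes is what balancer.ConnectivityStateEvaluator now handles: RecordTransition adjusts per-state counters and CurrentState derives the aggregate with the same precedence as before (Ready, then Connecting, then Idle, then TransientFailure). A minimal sketch follows; the exampleAggregation helper is an illustrative assumption, not part of this commit.

// Sketch only: a child is added in CONNECTING and later reports READY.
func exampleAggregation() connectivity.State {
	csEval := &balancer.ConnectivityStateEvaluator{}
	// add(): a new child counts as a Shutdown -> Connecting transition
	// (Shutdown itself is not counted by the evaluator).
	csEval.RecordTransition(connectivity.Shutdown, connectivity.Connecting)
	// UpdateState(): the child moves from CONNECTING to READY.
	csEval.RecordTransition(connectivity.Connecting, connectivity.Ready)
	// READY wins over the other states, so the aggregate is READY.
	return csEval.CurrentState()
}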