Skip to content

Commit

Permalink
balancer/endpointsharding: Call ExitIdle() on child if child reports …
Browse files Browse the repository at this point in the history
…IDLE (#7782)
  • Loading branch information
zasweq authored Oct 29, 2024
1 parent 2e3f547 commit d66fc3a
Showing 1 changed file with 21 additions and 0 deletions.
21 changes: 21 additions & 0 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ type endpointSharding struct {
cc balancer.ClientConn
bOpts balancer.BuildOptions

childMu sync.Mutex // syncs balancer.Balancer calls into children
children atomic.Pointer[resolver.EndpointMap]
closed bool

// inhibitChildUpdates is set during UpdateClientConnState/ResolverError
// calls (calls to children will each produce an update, only want one
Expand All @@ -83,6 +85,9 @@ type endpointSharding struct {
// addresses it will ignore that endpoint. Otherwise, returns first error found
// from a child, but fully processes the new update.
func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState) error {
es.childMu.Lock()
defer es.childMu.Unlock()

es.inhibitChildUpdates.Store(true)
defer func() {
es.inhibitChildUpdates.Store(false)
Expand Down Expand Up @@ -145,6 +150,8 @@ func (es *endpointSharding) UpdateClientConnState(state balancer.ClientConnState
// children and sends a single synchronous update of the childStates at the end
// of the ResolverError operation.
func (es *endpointSharding) ResolverError(err error) {
es.childMu.Lock()
defer es.childMu.Unlock()
es.inhibitChildUpdates.Store(true)
defer func() {
es.inhibitChildUpdates.Store(false)
Expand All @@ -162,11 +169,14 @@ func (es *endpointSharding) UpdateSubConnState(balancer.SubConn, balancer.SubCon
}

func (es *endpointSharding) Close() {
es.childMu.Lock()
defer es.childMu.Unlock()
children := es.children.Load()
for _, child := range children.Values() {
bal := child.(balancer.Balancer)
bal.Close()
}
es.closed = true
}

// updateState updates this component's state. It sends the aggregated state,
Expand Down Expand Up @@ -274,6 +284,17 @@ func (bw *balancerWrapper) UpdateState(state balancer.State) {
bw.es.mu.Lock()
bw.childState.State = state
bw.es.mu.Unlock()
// When a child balancer says it's IDLE, ping it to exit idle and reconnect.
// TODO: In the future, perhaps make this a knob in configuration.
if ei, ok := bw.Balancer.(balancer.ExitIdler); state.ConnectivityState == connectivity.Idle && ok {
go func() {
bw.es.childMu.Lock()
if !bw.es.closed {
ei.ExitIdle()
}
bw.es.childMu.Unlock()
}()
}
bw.es.updateState()
}

Expand Down

0 comments on commit d66fc3a

Please sign in to comment.