Skip to content

Commit

Permalink
balancergroup: do not cache closed sub-balancers by default (#6523)
Browse files Browse the repository at this point in the history
  • Loading branch information
easwars authored Aug 10, 2023
1 parent 68704f8 commit bb41067
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 96 deletions.
8 changes: 7 additions & 1 deletion balancer/rls/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,13 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger)
lb.bg = balancergroup.New(cc, opts, lb, lb.logger)
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
StateAggregator: lb,
Logger: lb.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
lb.bg.Start()
go lb.run()
return lb
Expand Down
5 changes: 0 additions & 5 deletions balancer/rls/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"google.golang.org/grpc/balancer/rls/internal/test/e2e"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/grpctest"
rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1"
Expand All @@ -48,10 +47,6 @@ const (
defaultTestShortTimeout = 100 * time.Millisecond
)

func init() {
balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
}

type s struct {
grpctest.Tester
}
Expand Down
9 changes: 8 additions & 1 deletion balancer/weightedtarget/weightedtarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package weightedtarget
import (
"encoding/json"
"fmt"
"time"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedtarget/weightedaggregator"
Expand Down Expand Up @@ -54,7 +55,13 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b.logger = prefixLogger(b)
b.stateAggregator = weightedaggregator.New(cc, b.logger, NewRandomWRR)
b.stateAggregator.Start()
b.bg = balancergroup.New(cc, bOpts, b.stateAggregator, b.logger)
b.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: bOpts,
StateAggregator: b.stateAggregator,
Logger: b.logger,
SubBalancerCloseTimeout: time.Duration(0), // Disable caching of removed child policies
})
b.bg.Start()
b.logger.Infof("Created")
return b
Expand Down
2 changes: 0 additions & 2 deletions balancer/weightedtarget/weightedtarget_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/balancergroup"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/testutils"
Expand Down Expand Up @@ -159,7 +158,6 @@ func init() {
wtbBuilder = balancer.Get(Name)
wtbParser = wtbBuilder.(balancer.ConfigParser)

balancergroup.DefaultSubBalancerCloseTimeout = time.Millisecond
NewRandomWRR = testutils.NewTestWRR
}

Expand Down
115 changes: 77 additions & 38 deletions internal/balancergroup/balancergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ type BalancerGroup struct {
outgoingMu sync.Mutex
outgoingStarted bool
idToBalancerConfig map[string]*subBalancerWrapper
// Cache for sub-balancers when they are removed.
balancerCache *cache.TimeoutCache
// Cache for sub-balancers when they are removed. This is `nil` if caching
// is disabled by passing `0` for Options.SubBalancerCloseTimeout`.
deletedBalancerCache *cache.TimeoutCache

// incomingMu is to make sure this balancer group doesn't send updates to cc
// after it's closed.
Expand Down Expand Up @@ -244,24 +245,40 @@ type BalancerGroup struct {
scToSubBalancer map[balancer.SubConn]*subBalancerWrapper
}

// DefaultSubBalancerCloseTimeout is defined as a variable instead of const for
// testing.
//
// TODO: make it a parameter for New().
var DefaultSubBalancerCloseTimeout = 15 * time.Minute
// Options wraps the arguments to be passed to the BalancerGroup ctor.
type Options struct {
// CC is a reference to the parent balancer.ClientConn.
CC balancer.ClientConn
// BuildOpts contains build options to be used when creating sub-balancers.
BuildOpts balancer.BuildOptions
// StateAggregator is an implementation of the BalancerStateAggregator
// interface to aggregate picker and connectivity states from sub-balancers.
StateAggregator BalancerStateAggregator
// Logger is a group specific prefix logger.
Logger *grpclog.PrefixLogger
// SubBalancerCloseTimeout is the amount of time deleted sub-balancers spend
// in the idle cache. A value of zero here disables caching of deleted
// sub-balancers.
SubBalancerCloseTimeout time.Duration
}

// New creates a new BalancerGroup. Note that the BalancerGroup
// needs to be started to work.
func New(cc balancer.ClientConn, bOpts balancer.BuildOptions, stateAggregator BalancerStateAggregator, logger *grpclog.PrefixLogger) *BalancerGroup {
return &BalancerGroup{
cc: cc,
buildOpts: bOpts,
logger: logger,
stateAggregator: stateAggregator,
func New(opts Options) *BalancerGroup {
var bc *cache.TimeoutCache
if opts.SubBalancerCloseTimeout != time.Duration(0) {
bc = cache.NewTimeoutCache(opts.SubBalancerCloseTimeout)
}

idToBalancerConfig: make(map[string]*subBalancerWrapper),
balancerCache: cache.NewTimeoutCache(DefaultSubBalancerCloseTimeout),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
return &BalancerGroup{
cc: opts.CC,
buildOpts: opts.BuildOpts,
stateAggregator: opts.StateAggregator,
logger: opts.Logger,

deletedBalancerCache: bc,
idToBalancerConfig: make(map[string]*subBalancerWrapper),
scToSubBalancer: make(map[balancer.SubConn]*subBalancerWrapper),
}
}

Expand Down Expand Up @@ -307,9 +324,10 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
defer bg.outgoingMu.Unlock()
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary.
if bg.outgoingStarted {
if old, ok := bg.balancerCache.Remove(id); ok {
// guaranteed to be empty, searching is unnecessary. Also, skip the cache if
// caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok {
sbc, _ = old.(*subBalancerWrapper)
if sbc != nil && sbc.builder != builder {
// If the sub-balancer in cache was built with a different
Expand Down Expand Up @@ -380,28 +398,47 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
// subconns) will be done after timeout.
func (bg *BalancerGroup) Remove(id string) {
bg.logger.Infof("Removing child policy for locality %q", id)

bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {
bg.balancerCache.Add(id, sbToRemove, func() {
// A sub-balancer evicted from the timeout cache needs to closed
// and its subConns need to removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby
// moving it to the cache) around the same time that the
// balancergroup is closed, and by the time we get here the
// balancergroup might be closed. Check for `outgoingStarted ==
// true` at that point can lead to a leaked sub-balancer.
bg.outgoingMu.Lock()
sbToRemove.stopBalancer()
bg.outgoingMu.Unlock()
bg.cleanupSubConns(sbToRemove)
})
}
delete(bg.idToBalancerConfig, id)
} else {

sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok {
bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id)
bg.outgoingMu.Unlock()
return
}

// Unconditionally remove the sub-balancer config from the map.
delete(bg.idToBalancerConfig, id)
if !bg.outgoingStarted {
// Nothing needs to be done here, since we wouldn't have created the
// sub-balancer.
bg.outgoingMu.Unlock()
return
}

if bg.deletedBalancerCache != nil {
bg.deletedBalancerCache.Add(id, sbToRemove, func() {
// A sub-balancer evicted from the timeout cache needs to closed
// and its subConns need to removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby
// moving it to the cache) around the same time that the
// balancergroup is closed, and by the time we get here the
// balancergroup might be closed. Check for `outgoingStarted ==
// true` at that point can lead to a leaked sub-balancer.
bg.outgoingMu.Lock()
sbToRemove.stopBalancer()
bg.outgoingMu.Unlock()
bg.cleanupSubConns(sbToRemove)
})
bg.outgoingMu.Unlock()
return
}

// Remove the sub-balancer with immediate effect if we are not caching.
sbToRemove.stopBalancer()
bg.outgoingMu.Unlock()
bg.cleanupSubConns(sbToRemove)
}

// bg.remove(id) doesn't do cleanup for the sub-balancer. This function does
Expand Down Expand Up @@ -546,7 +583,9 @@ func (bg *BalancerGroup) Close() {

// Clear(true) runs clear function to close sub-balancers in cache. It
// must be called out of outgoing mutex.
bg.balancerCache.Clear(true)
if bg.deletedBalancerCache != nil {
bg.deletedBalancerCache.Clear(true)
}

bg.outgoingMu.Lock()
if bg.outgoingStarted {
Expand Down
Loading

0 comments on commit bb41067

Please sign in to comment.