rls: make the data cache purge ticker a field in rlsBalancer (#5154)
easwars authored Jan 20, 2022
1 parent f93e8e6 commit fa62572
Showing 2 changed files with 22 additions and 16 deletions.
30 changes: 17 additions & 13 deletions balancer/rls/balancer.go
@@ -38,15 +38,22 @@ import (
 	"google.golang.org/grpc/resolver"
 )
 
+const (
+	// Name is the name of the RLS LB policy.
+	Name = "rls_experimental"
+	// Default frequency for data cache purging.
+	periodicCachePurgeFreq = time.Minute
+)
+
 var (
 	logger = grpclog.Component("rls")
 
 	// Below defined vars for overriding in unit tests.
 
 	// Default exponential backoff strategy for data cache entries.
 	defaultBackoffStrategy = backoff.Strategy(backoff.DefaultExponential)
-	// Default frequency for data cache purging.
-	periodicCachePurgeFreq = time.Minute
+	// Ticker used for periodic data cache purging.
+	dataCachePurgeTicker = func() *time.Ticker { return time.NewTicker(periodicCachePurgeFreq) }
 	// We want every cache entry to live in the cache for at least this
 	// duration. If we encounter a cache entry whose minimum expiration time is
 	// in the future, we abort the LRU pass, which may temporarily leave the
@@ -65,9 +72,6 @@ var (
 	resetBackoffHook = func() {}
 )
 
-// Name is the name of the RLS LB policy.
-const Name = "rls_experimental"
-
 func init() {
 	balancer.Register(&rlsBB{})
 }
@@ -83,6 +87,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
 		done:          grpcsync.NewEvent(),
 		cc:            cc,
 		bopts:         opts,
+		purgeTicker:   dataCachePurgeTicker(),
 		lbCfg:         &lbConfig{},
 		pendingMap:    make(map[cacheKey]*backoffState),
 		childPolicies: make(map[string]*childPolicyWrapper),
@@ -100,10 +105,11 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
 
 // rlsBalancer implements the RLS LB policy.
 type rlsBalancer struct {
-	done   *grpcsync.Event
-	cc     balancer.ClientConn
-	bopts  balancer.BuildOptions
-	logger *internalgrpclog.PrefixLogger
+	done        *grpcsync.Event
+	cc          balancer.ClientConn
+	bopts       balancer.BuildOptions
+	purgeTicker *time.Ticker
+	logger      *internalgrpclog.PrefixLogger
 
 	// If both cacheMu and stateMu need to be acquired, the former must be
 	// acquired first to prevent a deadlock. This order restriction is due to the
@@ -169,14 +175,11 @@ func (b *rlsBalancer) run() {
 // entries. An expired entry is one for which both the expiryTime and
 // backoffExpiryTime are in the past.
 func (b *rlsBalancer) purgeDataCache() {
-	ticker := time.NewTicker(periodicCachePurgeFreq)
-	defer ticker.Stop()
-
 	for {
 		select {
 		case <-b.done.Done():
 			return
-		case <-ticker.C:
+		case <-b.purgeTicker.C:
 			b.cacheMu.Lock()
 			updatePicker := b.dataCache.evictExpiredEntries()
 			b.cacheMu.Unlock()
@@ -376,6 +379,7 @@ func (b *rlsBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Sub
 func (b *rlsBalancer) Close() {
 	b.done.Fire()
 
+	b.purgeTicker.Stop()
 	b.stateMu.Lock()
 	if b.ctrlCh != nil {
 		b.ctrlCh.close()
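Taken together, the balancer.go changes follow a common Go testability pattern: create the ticker through an overridable package-level factory, hold it as a field on the owning struct, and stop it when the owner closes. A minimal, self-contained sketch of that pattern (hypothetical names worker, newWorker, newTicker; not code from this repository):

package main

import (
	"fmt"
	"time"
)

// newTicker is a package-level factory so tests can substitute a
// ticker they control; the default uses the production interval.
var newTicker = func() *time.Ticker { return time.NewTicker(time.Second) }

type worker struct {
	done   chan struct{}
	ticker *time.Ticker
}

func newWorker() *worker {
	return &worker{done: make(chan struct{}), ticker: newTicker()}
}

// run does periodic work until close is called, mirroring the
// select loop in purgeDataCache above.
func (w *worker) run() {
	for {
		select {
		case <-w.done:
			return
		case <-w.ticker.C:
			fmt.Println("periodic work")
		}
	}
}

// close stops the ticker the worker owns, then signals run to exit.
func (w *worker) close() {
	w.ticker.Stop()
	close(w.done)
}

func main() {
	w := newWorker()
	go w.run()
	time.Sleep(2500 * time.Millisecond) // let a couple of ticks fire
	w.close()
}

With the ticker owned by the struct, Close can release the timer deterministically, and a test can inject a ticker it controls instead of mutating a shared frequency variable.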
8 changes: 5 additions & 3 deletions balancer/rls/balancer_test.go
@@ -586,9 +586,11 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
 // entries from the data cache.
 func (s) TestDataCachePurging(t *testing.T) {
 	// Override the frequency of the data cache purger to a small one.
-	origPurgeFreq := periodicCachePurgeFreq
-	periodicCachePurgeFreq = defaultTestShortTimeout
-	defer func() { periodicCachePurgeFreq = origPurgeFreq }()
+	origDataCachePurgeTicker := dataCachePurgeTicker
+	ticker := time.NewTicker(defaultTestShortTimeout)
+	defer ticker.Stop()
+	dataCachePurgeTicker = func() *time.Ticker { return ticker }
+	defer func() { dataCachePurgeTicker = origDataCachePurgeTicker }()
 
 	// Override the data cache purge hook to get notified.
 	dataCachePurgeDone := make(chan struct{}, 1)
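Continuing the same hypothetical sketch (it assumes the newTicker and newWorker declarations above, plus the testing and time imports), a test can inject a fast ticker through the factory and restore the default on exit, mirroring the override idiom TestDataCachePurging uses:

func TestPeriodicWork(t *testing.T) {
	// Inject a fast ticker; restore the default factory on exit so
	// other tests are unaffected.
	origNewTicker := newTicker
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	newTicker = func() *time.Ticker { return ticker }
	defer func() { newTicker = origNewTicker }()

	w := newWorker() // picks up the injected ticker
	go w.run()
	defer w.close()

	// ... assert that the periodic work runs within the test deadline.
}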
