Skip to content

Commit

Permalink
Implement match exact rv in watch cache
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Nov 24, 2024
1 parent 7ccf813 commit 111c1a9
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cacher
import (
"context"
"fmt"
"k8s.io/client-go/tools/cache"
"reflect"
"sync"
"testing"
Expand All @@ -35,7 +36,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
testingclock "k8s.io/utils/clock/testing"
)

Expand Down
19 changes: 14 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,19 +773,28 @@ func (c *Cacher) shouldDelegateList(opts storage.ListOptions) (bool, error) {
isLegacyResourceVersionMatchExact := opts.ResourceVersionMatch == "" && opts.Predicate.Limit > 0 && nonEmptyResourceVersion
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
switch {
case nonEmptyResourceVersion && len(opts.Predicate.Continue) > 0:
_, _, err := storage.DecodeContinue(opts.Predicate.Continue, c.resourcePrefix)
if err != nil {
return false, err
}
return false, errors.NewBadRequest("specifying resource version is not allowed when using continue")
case opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact || isLegacyResourceVersionMatchExact:
return true, nil
rv, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return false, err
}
isCached := c.watchCache.storeSnapshots.IncludesRV(rv)
return !isCached, nil
case opts.ResourceVersionMatch != "" && opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan:
return false, fmt.Errorf("unsupported")
case len(opts.Predicate.Continue) > 0:
_, rv, err := storage.DecodeContinue(opts.Predicate.Continue, c.resourcePrefix)
if err != nil {
return false, err
}
if nonEmptyResourceVersion {
return false, errors.NewBadRequest("specifying resource version is not allowed when using continue")
}
_, isCached := c.watchCache.continueCache.Get(uint64(rv))
isCached := c.watchCache.storeSnapshots.IncludesRV(uint64(rv))

return !isCached, nil
case opts.ResourceVersion == "":
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,23 +212,27 @@ func TestGetListCacheBypass(t *testing.T) {

{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{3}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{3}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: false},

// Legacy exact match
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "3", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{3}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "3", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{2}, expectBypass: false},
}

t.Run("ConsistentListFromStorage", func(t *testing.T) {
Expand Down Expand Up @@ -271,7 +275,7 @@ func testGetListCacheBypass(t *testing.T, options storage.ListOptions, cachedRev
}
defer cacher.Stop()
for _, rev := range cachedRevisions {
cacher.watchCache.continueCache.Set(rev, fakeOrderedLister{})
cacher.watchCache.storeSnapshots.CacheRV(rev, fakeOrderedLister{})
}

result := &example.PodList{}
Expand Down
20 changes: 15 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ const (
btreeDegree = 16
)

// storeIndexer is the basic interface for caching latest state.
type storeIndexer interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
mutableStore
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
Expand All @@ -74,10 +73,21 @@ type storeIndexer interface {
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}

type orderedLister interface {
type immutableOrderedStore interface {
Count(prefix, continueKey string) (count int)
ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
Clone() orderedLister
Clone() mutableOrderedStore
}

type mutableOrderedStore interface {
mutableStore
immutableOrderedStore
}

type mutableStore interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
}

func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
Expand Down
97 changes: 48 additions & 49 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package cacher

import (
"container/heap"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -51,7 +50,7 @@ func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) {
return si.store.Count(prefix, continueKey)
}

func (si *threadedStoreIndexer) Clone() orderedLister {
func (si *threadedStoreIndexer) Clone() immutableOrderedStore {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.Clone()
Expand Down Expand Up @@ -294,7 +293,7 @@ func (s *btreeStore) Count(prefix, continueKey string) (count int) {
return count
}

func (s *btreeStore) Clone() orderedLister {
func (s *btreeStore) Clone() mutableOrderedStore {
return &btreeStore{
tree: s.tree.Clone(),
}
Expand Down Expand Up @@ -411,74 +410,74 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl
}
}

// continueCache caches roots of trees that were created as
// clones to serve LIST requests. When a continue request is
// meant to be served for a certain LIST request, we retrieve
// the tree that served the LIST request and serve the continue
// request from there.
// storeSnapshotter caches roots of trees that were created as
// clones to serve LIST requests.
//
// A tree is removed from this cache when the RV at which it was
// created is removed from the watchCache.
type continueCache struct {
type storeSnapshotter struct {
sync.RWMutex
revisions minIntHeap
cache map[uint64]orderedLister
revisions *btree.BTree
cache map[uint64]immutableOrderedStore
}

func newContinueCache() *continueCache {
return &continueCache{
cache: make(map[uint64]orderedLister)}
func newStoreSnapshotter() *storeSnapshotter {
return &storeSnapshotter{
revisions: btree.New(32),
cache: make(map[uint64]immutableOrderedStore)}
}

func (c *continueCache) Get(rv uint64) (orderedLister, bool) {
func (c *storeSnapshotter) EqualOrLowerRV(rv uint64) (immutableOrderedStore, uint64) {
c.RLock()
defer c.RUnlock()
indexer, ok := c.cache[rv]
return indexer, ok

var equalOrLower uint64
c.revisions.DescendLessOrEqual(rev(rv), func(i btree.Item) bool {
equalOrLower = uint64(i.(rev))
return false
})
if equalOrLower == 0 {
return nil, 0
}
return c.cache[equalOrLower], equalOrLower
}

func (c *continueCache) Set(rv uint64, indexer orderedLister) {
func (c *storeSnapshotter) IncludesRV(rv uint64) bool {
c.RLock()
defer c.RUnlock()

minRV := c.revisions.Min()
if minRV == nil {
return false
}

return uint64(minRV.(rev)) <= rv
}

func (c *storeSnapshotter) CacheRV(rv uint64, indexer immutableOrderedStore) {
c.Lock()
defer c.Unlock()
if _, ok := c.cache[rv]; !ok {
heap.Push(&c.revisions, rv)
c.revisions.ReplaceOrInsert(rev(rv))
c.cache[rv] = indexer.Clone()
}
c.cache[rv] = indexer.Clone()
}

func (c *continueCache) Cleanup(rv uint64) {
func (c *storeSnapshotter) CleanUpToRV(rv uint64) {
c.Lock()
defer c.Unlock()
for len(c.revisions) > 0 && rv >= c.revisions[0] {
delete(c.cache, c.revisions[0])
heap.Pop(&c.revisions)
for c.revisions.Len() > 0 {
minRV := uint64(c.revisions.Min().(rev))
if rv < minRV {
break
}
delete(c.cache, minRV)
c.revisions.DeleteMin()
}
}

type minIntHeap []uint64

func (h minIntHeap) Len() int {
return len(h)
}

func (h minIntHeap) Less(i, j int) bool {
return h[i] < h[j]
}

func (h minIntHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *minIntHeap) Push(val interface{}) {
*h = append(*h, val.(uint64))
}

func (h *minIntHeap) Pop() interface{} {
old := *h

size := len(old)
val := old[size-1]
*h = old[:size-1]
type rev uint64

return val
func (r1 rev) Less(r2 btree.Item) bool {
return r1 < r2.(rev)
}
47 changes: 34 additions & 13 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,27 +177,48 @@ func testStoreIndexers() *cache.Indexers {
}

func TestContinueCacheCleanup(t *testing.T) {
cache := newContinueCache()
cache.Set(20, fakeOrderedLister{})
cache.Set(30, fakeOrderedLister{})
cache.Set(40, fakeOrderedLister{})
cache := newStoreSnapshotter()
cache.CacheRV(20, fakeOrderedLister{})
cache.CacheRV(30, fakeOrderedLister{})
cache.CacheRV(40, fakeOrderedLister{})
assert.Len(t, cache.cache, 3)
assert.Len(t, cache.revisions, 3)
cache.Cleanup(20)
assert.Equal(t, 3, cache.revisions.Len())
_, rv := cache.EqualOrLowerRV(19)
assert.Equal(t, uint64(0), rv)
_, rv = cache.EqualOrLowerRV(20)
assert.Equal(t, 20, int(rv))
_, rv = cache.EqualOrLowerRV(21)
assert.Equal(t, 20, int(rv))
_, rv = cache.EqualOrLowerRV(32)
assert.Equal(t, 30, int(rv))
_, rv = cache.EqualOrLowerRV(43)
assert.Equal(t, 40, int(rv))

cache.CleanUpToRV(20)
assert.Len(t, cache.cache, 2)
assert.Len(t, cache.revisions, 2)
cache.Set(20, fakeOrderedLister{})
cache.Set(20, fakeOrderedLister{})
assert.Equal(t, 2, cache.revisions.Len())
_, rv = cache.EqualOrLowerRV(21)
assert.Equal(t, uint64(0), rv)

cache.CacheRV(20, fakeOrderedLister{})
cache.CacheRV(20, fakeOrderedLister{})
assert.Len(t, cache.cache, 3)
assert.Len(t, cache.revisions, 3)
cache.Cleanup(40)
assert.Equal(t, 3, cache.revisions.Len())
_, rv = cache.EqualOrLowerRV(21)
assert.Equal(t, 20, int(rv))
cache.CleanUpToRV(40)
assert.Empty(t, cache.cache)
assert.Empty(t, cache.revisions)
assert.Empty(t, cache.revisions.Len())
_, rv = cache.EqualOrLowerRV(43)
assert.Equal(t, uint64(0), rv)
}

type fakeOrderedLister struct{}

func (f fakeOrderedLister) Clone() orderedLister { return f }
func (f fakeOrderedLister) Add(obj interface{}) error { return nil }
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }
func (f fakeOrderedLister) Clone() mutableOrderedStore { return f }
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string, limit int) ([]interface{}, bool) {
return nil, false
}
Expand Down
Loading

0 comments on commit 111c1a9

Please sign in to comment.