Skip to content

Commit

Permalink
Implement match exact rv in watch cache
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Nov 24, 2024
1 parent 7ccf813 commit f81c52b
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
"k8s.io/client-go/tools/cache"
testingclock "k8s.io/utils/clock/testing"
)

Expand Down Expand Up @@ -293,7 +292,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) {
}

const numObjects = 10
store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
store := newStoreIndexer(nil)

for i := 0; i < numObjects; i++ {
elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod-%d", i), uint64(i)))
Expand Down
19 changes: 14 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,19 +773,28 @@ func (c *Cacher) shouldDelegateList(opts storage.ListOptions) (bool, error) {
isLegacyResourceVersionMatchExact := opts.ResourceVersionMatch == "" && opts.Predicate.Limit > 0 && nonEmptyResourceVersion
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
switch {
case nonEmptyResourceVersion && len(opts.Predicate.Continue) > 0:
_, _, err := storage.DecodeContinue(opts.Predicate.Continue, c.resourcePrefix)
if err != nil {
return false, err
}
return false, errors.NewBadRequest("specifying resource version is not allowed when using continue")
case opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact || isLegacyResourceVersionMatchExact:
return true, nil
rv, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return false, err
}
_, _, isCached := c.watchCache.continueCache.FindEqualOrLower(rv)
return !isCached, nil
case opts.ResourceVersionMatch != "" && opts.ResourceVersionMatch != metav1.ResourceVersionMatchNotOlderThan:
return false, fmt.Errorf("unsupported")
case len(opts.Predicate.Continue) > 0:
_, rv, err := storage.DecodeContinue(opts.Predicate.Continue, c.resourcePrefix)
if err != nil {
return false, err
}
if nonEmptyResourceVersion {
return false, errors.NewBadRequest("specifying resource version is not allowed when using continue")
}
_, isCached := c.watchCache.continueCache.Get(uint64(rv))
_, _, isCached := c.watchCache.continueCache.FindEqualOrLower(uint64(rv))

return !isCached, nil
case opts.ResourceVersion == "":
consistentListFromCacheEnabled := utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,23 +212,27 @@ func TestGetListCacheBypass(t *testing.T) {

{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{2}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{3}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3}}, cachedRevisions: []uint64{3}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},

{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: continueOnRev3, Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: false},

// Legacy exact match
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "1", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{3}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "3", Predicate: storage.SelectionPredicate{Limit: 500}}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{2}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "2", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{3}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "3", ResourceVersionMatch: metav1.ResourceVersionMatchExact}, cachedRevisions: []uint64{2}, expectBypass: false},
}

t.Run("ConsistentListFromStorage", func(t *testing.T) {
Expand Down
20 changes: 15 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,9 @@ const (
btreeDegree = 16
)

// storeIndexer is the basic interface for caching latest state.
type storeIndexer interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
mutableStore
List() []interface{}
ListKeys() []string
Get(obj interface{}) (item interface{}, exists bool, err error)
Expand All @@ -74,10 +73,21 @@ type storeIndexer interface {
ByIndex(indexName, indexedValue string) ([]interface{}, error)
}

type orderedLister interface {
type immutableOrderedStore interface {
Count(prefix, continueKey string) (count int)
ListPrefix(prefix, continueKey string, limit int) (items []interface{}, hasMore bool)
Clone() orderedLister
Clone() mutableOrderedStore
}

type mutableOrderedStore interface {
mutableStore
immutableOrderedStore
}

type mutableStore interface {
Add(obj interface{}) error
Update(obj interface{}) error
Delete(obj interface{}) error
}

func newStoreIndexer(indexers *cache.Indexers) storeIndexer {
Expand Down
75 changes: 35 additions & 40 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store_btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package cacher

import (
"container/heap"
"fmt"
"math"
"strings"
Expand Down Expand Up @@ -51,7 +50,7 @@ func (si *threadedStoreIndexer) Count(prefix, continueKey string) (count int) {
return si.store.Count(prefix, continueKey)
}

func (si *threadedStoreIndexer) Clone() orderedLister {
func (si *threadedStoreIndexer) Clone() immutableOrderedStore {
si.lock.RLock()
defer si.lock.RUnlock()
return si.store.Clone()
Expand Down Expand Up @@ -294,7 +293,7 @@ func (s *btreeStore) Count(prefix, continueKey string) (count int) {
return count
}

func (s *btreeStore) Clone() orderedLister {
func (s *btreeStore) Clone() mutableOrderedStore {
return &btreeStore{
tree: s.tree.Clone(),
}
Expand Down Expand Up @@ -421,64 +420,60 @@ func (i *indexer) delete(key, value string, index map[string]map[string]*storeEl
// created is removed from the watchCache.
type continueCache struct {
sync.RWMutex
revisions minIntHeap
cache map[uint64]orderedLister
revisions *btree.BTree
cache map[uint64]immutableOrderedStore
}

func newContinueCache() *continueCache {
return &continueCache{
cache: make(map[uint64]orderedLister)}
revisions: btree.New(32),
cache: make(map[uint64]immutableOrderedStore)}
}

func (c *continueCache) Get(rv uint64) (orderedLister, bool) {
func (c *continueCache) FindEqualOrLower(rv uint64) (indexer immutableOrderedStore, foundRV uint64, ok bool) {
c.RLock()
defer c.RUnlock()
indexer, ok := c.cache[rv]
return indexer, ok

indexer, ok = c.cache[rv]
if ok {
return indexer, rv, ok
}

c.revisions.DescendLessOrEqual(rev(rv), func(i btree.Item) bool {
foundRV = uint64(i.(rev))
ok = true
return false
})
if ok {
indexer, ok = c.cache[foundRV]
}
return indexer, foundRV, ok
}

func (c *continueCache) Set(rv uint64, indexer orderedLister) {
func (c *continueCache) Set(rv uint64, indexer immutableOrderedStore) {
c.Lock()
defer c.Unlock()
if _, ok := c.cache[rv]; !ok {
heap.Push(&c.revisions, rv)
c.revisions.ReplaceOrInsert(rev(rv))
c.cache[rv] = indexer.Clone()
}
c.cache[rv] = indexer.Clone()
}

func (c *continueCache) Cleanup(rv uint64) {
c.Lock()
defer c.Unlock()
for len(c.revisions) > 0 && rv >= c.revisions[0] {
delete(c.cache, c.revisions[0])
heap.Pop(&c.revisions)
for c.revisions.Len() > 0 {
minRV := uint64(c.revisions.Min().(rev))
if rv < minRV {
break
}
delete(c.cache, minRV)
c.revisions.DeleteMin()
}
}

type minIntHeap []uint64

func (h minIntHeap) Len() int {
return len(h)
}

func (h minIntHeap) Less(i, j int) bool {
return h[i] < h[j]
}

func (h minIntHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}

func (h *minIntHeap) Push(val interface{}) {
*h = append(*h, val.(uint64))
}

func (h *minIntHeap) Pop() interface{} {
old := *h

size := len(old)
val := old[size-1]
*h = old[:size-1]
type rev uint64

return val
func (r1 rev) Less(r2 btree.Item) bool {
return r1 < r2.(rev)
}
36 changes: 31 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/storage/cacher/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,22 +182,48 @@ func TestContinueCacheCleanup(t *testing.T) {
cache.Set(30, fakeOrderedLister{})
cache.Set(40, fakeOrderedLister{})
assert.Len(t, cache.cache, 3)
assert.Len(t, cache.revisions, 3)
assert.Equal(t, 3, cache.revisions.Len())
_, _, ok := cache.FindEqualOrLower(19)
assert.False(t, ok)
_, rv, ok := cache.FindEqualOrLower(20)
assert.True(t, ok)
assert.Equal(t, 20, int(rv))
_, rv, ok = cache.FindEqualOrLower(21)
assert.True(t, ok)
assert.Equal(t, 20, int(rv))
_, rv, ok = cache.FindEqualOrLower(32)
assert.True(t, ok)
assert.Equal(t, 30, int(rv))
_, rv, ok = cache.FindEqualOrLower(43)
assert.True(t, ok)
assert.Equal(t, 40, int(rv))

cache.Cleanup(20)
assert.Len(t, cache.cache, 2)
assert.Len(t, cache.revisions, 2)
assert.Equal(t, 2, cache.revisions.Len())
_, _, ok = cache.FindEqualOrLower(21)
assert.False(t, ok)

cache.Set(20, fakeOrderedLister{})
cache.Set(20, fakeOrderedLister{})
assert.Len(t, cache.cache, 3)
assert.Len(t, cache.revisions, 3)
assert.Equal(t, 3, cache.revisions.Len())
_, rv, ok = cache.FindEqualOrLower(21)
assert.True(t, ok)
assert.Equal(t, 20, int(rv))
cache.Cleanup(40)
assert.Empty(t, cache.cache)
assert.Empty(t, cache.revisions)
assert.Empty(t, cache.revisions.Len())
_, _, ok = cache.FindEqualOrLower(43)
assert.False(t, ok)
}

type fakeOrderedLister struct{}

func (f fakeOrderedLister) Clone() orderedLister { return f }
func (f fakeOrderedLister) Add(obj interface{}) error { return nil }
func (f fakeOrderedLister) Update(obj interface{}) error { return nil }
func (f fakeOrderedLister) Delete(obj interface{}) error { return nil }
func (f fakeOrderedLister) Clone() mutableOrderedStore { return f }
func (f fakeOrderedLister) ListPrefix(prefixKey, continueKey string, limit int) ([]interface{}, bool) {
return nil, false
}
Expand Down
Loading

0 comments on commit f81c52b

Please sign in to comment.