Skip to content

Commit

Permalink
Merge pull request kubernetes#35218 from wojtek-t/get_to_list_support…
Browse files Browse the repository at this point in the history
…_resource_version

Automatic merge from submit-queue

Support resourceVersion in GetToList - unify interface of List and Ge…

This pretty much unifies the interface of List() and GetToList() methods of storage interface.

I'm going to use it in a subsequent PR to improve performance of the whole cluster.
  • Loading branch information
Kubernetes Submit Queue authored Oct 21, 2016
2 parents 0dbd954 + 93c008f commit 850c586
Showing 8 changed files with 114 additions and 16 deletions.
8 changes: 4 additions & 4 deletions pkg/registry/generic/registry/store.go
Original file line number Diff line number Diff line change
@@ -201,18 +201,18 @@ func (e *Store) List(ctx api.Context, options *api.ListOptions) (runtime.Object,

// ListPredicate returns a list of all the items matching m.
func (e *Store) ListPredicate(ctx api.Context, p storage.SelectionPredicate, options *api.ListOptions) (runtime.Object, error) {
if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
}
list := e.NewListFunc()
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, p, list)
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}

if options == nil {
options = &api.ListOptions{ResourceVersion: "0"}
}
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
return list, storeerr.InterpretListError(err, e.QualifiedResource)
}
58 changes: 54 additions & 4 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
@@ -344,8 +344,59 @@ func (c *Cacher) Get(ctx context.Context, key string, objPtr runtime.Object, ign
}

// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, pred SelectionPredicate, listObj runtime.Object) error {
return c.storage.GetToList(ctx, key, pred, listObj)
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}

// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}

trace := util.NewTrace(fmt.Sprintf("cacher %v: List", c.objectType.String()))
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
trace.Step("Ready")

// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil || listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterFunction(key, pred)

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return fmt.Errorf("failed to wait for fresh list: %v", err)
}
trace.Step("Got from cache")

if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Object) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
return err
}
}
return nil
}

// Implements storage.Interface.
@@ -359,7 +410,6 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.

listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
@@ -371,7 +421,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
c.ready.wait()
trace.Step("Ready")

// List elements from cache, with at least 'listRV'.
// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
@@ -297,7 +297,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
}

// Implements storage.Interface.
func (h *etcdHelper) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
2 changes: 1 addition & 1 deletion pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
@@ -270,7 +270,7 @@ func (s *store) GuaranteedUpdate(ctx context.Context, key string, out runtime.Ob
}

// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
2 changes: 1 addition & 1 deletion pkg/storage/etcd3/store_test.go
Original file line number Diff line number Diff line change
@@ -253,7 +253,7 @@ func TestGetToList(t *testing.T) {

for i, tt := range tests {
out := &api.PodList{}
err := store.GetToList(ctx, tt.key, tt.pred, out)
err := store.GetToList(ctx, tt.key, "", tt.pred, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
4 changes: 3 additions & 1 deletion pkg/storage/interfaces.go
Original file line number Diff line number Diff line change
@@ -128,7 +128,9 @@ type Interface interface {

// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
GetToList(ctx context.Context, key string, p SelectionPredicate, listObj runtime.Object) error
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error

// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
30 changes: 26 additions & 4 deletions pkg/storage/watch_cache.go
Original file line number Diff line number Diff line change
@@ -245,8 +245,10 @@ func (w *watchCache) List() []interface{} {
return w.store.List()
}

// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
// waitUntilFreshAndBlock waits until cache is at least as fresh as given <resourceVersion>.
// NOTE: This function acquired lock and doesn't release it.
// You HAVE TO explicitly call w.RUnlock() after this function.
func (w *watchCache) waitUntilFreshAndBlock(resourceVersion uint64, trace *util.Trace) error {
startTime := w.clock.Now()
go func() {
// Wake us up when the time limit has expired. The docs
@@ -261,22 +263,42 @@ func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.T
}()

w.RLock()
defer w.RUnlock()
if trace != nil {
trace.Step("watchCache locked acquired")
}
for w.resourceVersion < resourceVersion {
if w.clock.Since(startTime) >= MaximumListWait {
return nil, 0, fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
return fmt.Errorf("time limit exceeded while waiting for resource version %v (current value: %v)", resourceVersion, w.resourceVersion)
}
w.cond.Wait()
}
if trace != nil {
trace.Step("watchCache fresh enough")
}
return nil
}

// WaitUntilFreshAndList returns list of pointers to <storeElement> objects.
func (w *watchCache) WaitUntilFreshAndList(resourceVersion uint64, trace *util.Trace) ([]interface{}, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, 0, err
}
return w.store.List(), w.resourceVersion, nil
}

// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
func (w *watchCache) WaitUntilFreshAndGet(resourceVersion uint64, key string, trace *util.Trace) (interface{}, bool, uint64, error) {
err := w.waitUntilFreshAndBlock(resourceVersion, trace)
defer w.RUnlock()
if err != nil {
return nil, false, 0, err
}
value, exists, err := w.store.GetByKey(key)
return value, exists, w.resourceVersion, err
}

func (w *watchCache) ListKeys() []string {
w.RLock()
defer w.RUnlock()
24 changes: 24 additions & 0 deletions pkg/storage/watch_cache_test.go
Original file line number Diff line number Diff line change
@@ -263,6 +263,30 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}
}

func TestWaitUntilFreshAndGet(t *testing.T) {
store := newTestWatchCache(3)

// In background, update the store.
go func() {
store.Add(makeTestPod("foo", 2))
store.Add(makeTestPod("bar", 5))
}()

obj, exists, resourceVersion, err := store.WaitUntilFreshAndGet(5, "prefix/ns/bar", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resourceVersion != 5 {
t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion)
}
if !exists {
t.Fatalf("no results returned: %#v", obj)
}
if !api.Semantic.DeepEqual(&storeElement{Key: "prefix/ns/bar", Object: makeTestPod("bar", 5)}, obj) {
t.Errorf("unexpected element returned: %#v", obj)
}
}

func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store := newTestWatchCache(3)
fc := store.clock.(*clock.FakeClock)

0 comments on commit 850c586

Please sign in to comment.