Skip to content

Commit

Permalink
Merge pull request kubernetes#18207 from wojtek-t/string_resource_ver…
Browse files Browse the repository at this point in the history
…sion

Change resourceVersion to string in storage.Interface
  • Loading branch information
wojtek-t committed Dec 9, 2015
2 parents c1828b3 + 793da62 commit 0369805
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 54 deletions.
14 changes: 3 additions & 11 deletions pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,7 @@ func (e *Etcd) ListPredicate(ctx api.Context, m generic.Matcher, options *unvers
if options == nil {
options = &unversioned.ListOptions{ResourceVersion: "0"}
}
version, err := storage.ParseWatchResourceVersion(options.ResourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
err = e.Storage.List(ctx, e.KeyRootFunc(ctx), version, filterFunc, list)
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, filterFunc, list)
return list, etcderr.InterpretListError(err, e.EndpointName)
}

Expand Down Expand Up @@ -479,23 +475,19 @@ func (e *Etcd) Watch(ctx api.Context, options *unversioned.ListOptions) (watch.I

// WatchPredicate starts a watch for the items that m matches.
func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := storage.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
filterFunc := e.filterAndDecorateFunction(m)

if name, ok := m.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
if err != nil {
return nil, err
}
return e.Storage.Watch(ctx, key, version, filterFunc)
return e.Storage.Watch(ctx, key, resourceVersion, filterFunc)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}

return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), version, filterFunc)
return e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, filterFunc)
}

func (e *Etcd) filterAndDecorateFunction(m generic.Matcher) func(runtime.Object) bool {
Expand Down
32 changes: 19 additions & 13 deletions pkg/storage/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,12 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object) err
}

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}

// Do NOT allow Watch to start when the underlying structures are not propagated.
c.usable.RLock()
defer c.usable.RUnlock()
Expand All @@ -235,7 +240,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64,
// underlying watchCache is calling processEvent under its lock.
c.watchCache.RLock()
defer c.watchCache.RUnlock()
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(resourceVersion)
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
return nil, err
}
Expand All @@ -249,7 +254,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion uint64,
}

// Implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, filter)
}

Expand All @@ -264,11 +269,16 @@ func (c *Cacher) GetToList(ctx context.Context, key string, filter FilterFunc, l
}

// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error {
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error {
if !c.ListFromCache {
return c.storage.List(ctx, key, resourceVersion, filter, listObj)
}

listRV, err := ParseListResourceVersion(resourceVersion)
if err != nil {
return err
}

// To avoid situation when List is proceesed before the underlying
// watchCache is propagated for the first time, we acquire and immediately
// release the 'usable' lock.
Expand All @@ -277,7 +287,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
c.usable.RLock()
c.usable.RUnlock()

// List elements from cache, with at least 'resourceVersion'.
// List elements from cache, with at least 'listRV'.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
Expand All @@ -288,7 +298,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
}
filterFunc := filterFunction(key, c.keyFunc, filter)

objs, resourceVersion := c.watchCache.WaitUntilFreshAndList(resourceVersion)
objs, readResourceVersion := c.watchCache.WaitUntilFreshAndList(listRV)
for _, obj := range objs {
object, ok := obj.(runtime.Object)
if !ok {
Expand All @@ -299,7 +309,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion uint64, f
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, resourceVersion); err != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
return err
}
}
Expand Down Expand Up @@ -389,19 +399,15 @@ func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFun
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options unversioned.ListOptions) (runtime.Object, error) {
list := lw.newListFunc()
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, 0, Everything, list); err != nil {
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
return nil, err
}
return list, nil
}

// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options unversioned.ListOptions) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(options.ResourceVersion, lw.resourcePrefix)
if err != nil {
return nil, err
}
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, version, Everything)
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

// cacherWatch implements watch.Interface
Expand Down
20 changes: 12 additions & 8 deletions pkg/storage/cacher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestList(t *testing.T) {
result := &api.PodList{}
// TODO: We need to pass ResourceVersion of barPod deletion operation.
// However, there is no easy way to get it, so it is hardcoded to 8.
if err := cacher.List(context.TODO(), "pods/ns", 8, storage.Everything, result); err != nil {
if err := cacher.List(context.TODO(), "pods/ns", "8", storage.Everything, result); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if result.ListMeta.ResourceVersion != "8" {
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestWatch(t *testing.T) {
podFooBis.Spec.NodeName = "anotherFakeNode"

// Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -192,17 +192,19 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)

// Check whether we get too-old error.
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", 1, storage.Everything)
_, err = cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
if err == nil {
t.Errorf("Expected 'error too old' error")
}

// Now test watch with initial state.
// We want to observe fooCreation too, so need to pass smaller resource version.
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
}
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), storage.Everything)
initialVersion--
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -211,7 +213,7 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)

// Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", 0, storage.Everything)
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -229,14 +231,14 @@ func TestWatcherTimeout(t *testing.T) {
cacher := newTestCacher(etcdStorage)

// Create a watcher that will not be reading any result.
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer watcher.Stop()

// Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", 1, storage.Everything)
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -281,11 +283,13 @@ func TestFiltering(t *testing.T) {
}
return selector.Matches(labels.Set(metadata.Labels()))
}
// We want to observe fooCreation too, so need to pass smaller resource version.
initialVersion, err := strconv.Atoi(fooCreated.ResourceVersion)
if err != nil {
t.Fatalf("Incorrect resourceVersion: %s", fooCreated.ResourceVersion)
}
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", uint64(initialVersion), filter)
initialVersion--
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", strconv.Itoa(initialVersion), filter)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/storage/etcd/etcd_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,32 @@ func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object)
}

// Implements storage.Interface.
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
go w.etcdWatch(h.client, key, watchRV)
return w, nil
}

// Implements storage.Interface.
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) {
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
if ctx == nil {
glog.Errorf("Context is nil")
}
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h)
go w.etcdWatch(h.client, key, resourceVersion)
go w.etcdWatch(h.client, key, watchRV)
return w, nil
}

Expand Down Expand Up @@ -352,7 +360,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFun
}

// Implements storage.Interface.
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion uint64, filter storage.FilterFunc, listObj runtime.Object) error {
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
if ctx == nil {
glog.Errorf("Context is nil")
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/etcd/etcd_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestList(t *testing.T) {
var got api.PodList
// TODO: a sorted filter function could be applied such implied
// ordering on the returned list doesn't matter.
err := helper.List(context.TODO(), key, 0, storage.Everything, &got)
err := helper.List(context.TODO(), key, "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down Expand Up @@ -156,7 +156,7 @@ func TestListFiltered(t *testing.T) {
}

var got api.PodList
err := helper.List(context.TODO(), key, 0, filter, &got)
err := helper.List(context.TODO(), key, "", filter, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down Expand Up @@ -206,7 +206,7 @@ func TestListAcrossDirectories(t *testing.T) {
list.Items[2] = *returnedObj

var got api.PodList
err := roothelper.List(context.TODO(), rootkey, 0, storage.Everything, &got)
err := roothelper.List(context.TODO(), rootkey, "", storage.Everything, &got)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/storage/etcd/etcd_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func TestWatch(t *testing.T) {
key := "/some/key"
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())

watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestWatchEtcdState(t *testing.T) {
defer server.Terminate(t)

h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -353,7 +353,7 @@ func TestWatchFromZeroIndex(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}

watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -388,7 +388,7 @@ func TestWatchListFromZeroIndex(t *testing.T) {
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)

watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestWatchListIgnoresRootKey(t *testing.T) {
defer server.Terminate(t)
h := newEtcdHelper(server.Client, codec, key)

watching, err := h.WatchList(context.TODO(), key, 0, storage.Everything)
watching, err := h.WatchList(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down Expand Up @@ -451,7 +451,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
h := newEtcdHelper(server.Client, codec, etcdtest.PathPrefix())

// Test purposeful shutdown
watching, err := h.Watch(context.TODO(), key, 0, storage.Everything)
watching, err := h.Watch(context.TODO(), key, "0", storage.Everything)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,13 @@ type Interface interface {
// and any items passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
Watch(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
Watch(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)

// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
WatchList(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
WatchList(ctx context.Context, key string, resourceVersion string, filter FilterFunc) (watch.Interface, error)

// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
Expand All @@ -118,7 +118,7 @@ type Interface interface {
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion uint64, filter FilterFunc, listObj runtime.Object) error
List(ctx context.Context, key string, resourceVersion string, filter FilterFunc, listObj runtime.Object) error

// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
Expand Down
Loading

0 comments on commit 0369805

Please sign in to comment.