From 143015025a01d4bd0146b070b20ea58df1ccf4a3 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 4 Mar 2015 18:22:48 -0500 Subject: [PATCH] Support TTL in genericetcd#Update --- pkg/registry/etcd/etcd.go | 4 ++-- pkg/registry/generic/etcd/etcd.go | 32 ++++++++++++++++++++++--------- pkg/registry/pod/etcd/etcd.go | 8 ++++---- pkg/tools/etcd_tools.go | 15 ++++++++------- pkg/tools/etcd_tools_test.go | 24 +++++++++++------------ 5 files changed, 49 insertions(+), 34 deletions(-) diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 479d60f8de5f4..be03bea3f52db 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -322,9 +322,9 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er } // TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop. err = r.AtomicUpdate(key, &api.Endpoints{}, true, - func(input runtime.Object) (runtime.Object, error) { + func(input runtime.Object) (runtime.Object, uint64, error) { // TODO: racy - label query is returning different results for two simultaneous updaters - return endpoints, nil + return endpoints, 0, nil }) return etcderr.InterpretUpdateError(err, "endpoints", endpoints.Name) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index a746ebd01fb77..2d9a1deba8c28 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -251,35 +251,49 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool // TODO: expose TTL creating := false out := e.NewFunc() - err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) { + err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, uint64, error) { version, err := e.Helper.ResourceVersioner.ResourceVersion(existing) if err != nil { - return nil, err + return nil, 0, err } if version == 0 { if !e.UpdateStrategy.AllowCreateOnUpdate() { - return nil, kubeerr.NewNotFound(e.EndpointName, name) + return nil, 0, kubeerr.NewNotFound(e.EndpointName, name) } creating = true if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { - return nil, err + return nil, 0, err + } + ttl := uint64(0) + if e.TTLFunc != nil { + ttl, err = e.TTLFunc(obj, true) + if err != nil { + return nil, 0, err + } } - return obj, nil + return obj, ttl, nil } creating = false newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) if err != nil { - return nil, err + return nil, 0, err } if newVersion != version { // TODO: return the most recent version to a client? - return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) + return nil, 0, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) } if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { - return nil, err + return nil, 0, err } - return obj, nil + ttl := uint64(0) + if e.TTLFunc != nil { + ttl, err = e.TTLFunc(obj, false) + if err != nil { + return nil, 0, err + } + } + return obj, ttl, nil }) if err != nil { diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index 6878599b347d5..ff751f7d3c571 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -160,18 +160,18 @@ func (r *BindingREST) setPodHostTo(ctx api.Context, podID, oldMachine, machine s if err != nil { return nil, err } - err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) { + err = r.store.Helper.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, uint64, error) { pod, ok := obj.(*api.Pod) if !ok { - return nil, fmt.Errorf("unexpected object: %#v", obj) + return nil, 0, fmt.Errorf("unexpected object: %#v", obj) } if pod.Spec.Host != oldMachine || pod.Status.Host != oldMachine { - return nil, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host) + return nil, 0, fmt.Errorf("pod %v is already assigned to host %q or %q", pod.Name, pod.Spec.Host, pod.Status.Host) } pod.Spec.Host = machine pod.Status.Host = machine finalPod = pod - return pod, nil + return pod, 0, nil }) return finalPod, err } diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 2af7eb3a03835..c92f2e505a8c5 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -357,7 +357,7 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err // Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update. // See the comment for AtomicUpdate for more detail. -type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error) +type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, ttl uint64, err error) // AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects. // Note, tryUpdate may be called more than once. @@ -365,7 +365,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error // Example: // // h := &util.EtcdHelper{client, encoding, versioning} -// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) { +// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, uint64, error) { // // Before this function is called, currentObj has been reset to etcd's current // // contents for "myKey". // @@ -374,8 +374,9 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error // // Make a *modification*. // cur.Counter++ // -// // Return the modified object. Return an error to stop iterating. -// return cur, nil +// // Return the modified object. Return an error to stop iterating. Return a non-zero uint64 to set +// // the TTL on the object. +// return cur, 0, nil // }) // func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { @@ -391,7 +392,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo return err } - ret, err := tryUpdate(obj) + ret, ttl, err := tryUpdate(obj) if err != nil { return err } @@ -403,7 +404,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo // First time this key has been used, try creating new value. if index == 0 { - response, err := h.Client.Create(key, string(data), 0) + response, err := h.Client.Create(key, string(data), ttl) if IsEtcdNodeExist(err) { continue } @@ -415,7 +416,7 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo return nil } - response, err := h.Client.CompareAndSwap(key, string(data), 0, origBody, index) + response, err := h.Client.CompareAndSwap(key, string(data), ttl, origBody, index) if IsEtcdTestFailed(err) { continue } diff --git a/pkg/tools/etcd_tools_test.go b/pkg/tools/etcd_tools_test.go index 58d317e2f72dd..20ebfb1a87ace 100644 --- a/pkg/tools/etcd_tools_test.go +++ b/pkg/tools/etcd_tools_test.go @@ -505,8 +505,8 @@ func TestAtomicUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { - return obj, nil + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + return obj, 0, nil }) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -524,14 +524,14 @@ func TestAtomicUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { + err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { callbackCalled = true if in.(*TestResource).Value != 1 { t.Errorf("Callback input was not current set value") } - return objUpdate, nil + return objUpdate, 0, nil }) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -559,8 +559,8 @@ func TestAtomicUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { - return obj, nil + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { + return obj, 0, nil }) if err != nil { t.Errorf("Unexpected error %#v", err) @@ -569,10 +569,10 @@ func TestAtomicUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { + err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { fakeClient.Err = errors.New("should not be called") callbackCalled = true - return objUpdate, nil + return objUpdate, 0, nil }) if err != nil { t.Fatalf("Unexpected error %#v", err) @@ -591,8 +591,8 @@ func TestAtomicUpdateKeyNotFound(t *testing.T) { fakeClient.ExpectNotFoundGet("/some/key") obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - f := func(in runtime.Object) (runtime.Object, error) { - return obj, nil + f := func(in runtime.Object) (runtime.Object, uint64, error) { + return obj, 0, nil } ignoreNotFound := false @@ -627,7 +627,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) { + err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, uint64, error) { defer func() { firstCall = false }() if firstCall { @@ -638,7 +638,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) { currValue := in.(*TestResource).Value obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: currValue + 1} - return obj, nil + return obj, 0, nil }) if err != nil { t.Errorf("Unexpected error %#v", err)