From 3d52aac13c71cfb3ac28e5b3695c04648c96bc94 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 3 Mar 2015 16:16:50 -0500 Subject: [PATCH] genericetcd.Etcd should test resourceVersion Also fix that Update was returning AlreadyExists instead of NotFound (when create is disabled) and that Update when CreateOnUpdate was true was not populating the returned object. Added more tests --- pkg/registry/generic/etcd/etcd.go | 16 +- pkg/registry/generic/etcd/etcd_test.go | 263 +++++++++++++++++++++++-- pkg/tools/etcd_tools.go | 3 +- 3 files changed, 268 insertions(+), 14 deletions(-) diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 2b6c2b6404f77..b48797b7d42cb 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -17,6 +17,8 @@ limitations under the License. package etcd import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" @@ -228,6 +230,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool if err != nil { return nil, false, err } + // TODO: expose TTL creating := false out := e.NewFunc() err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) { @@ -237,7 +240,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } if version == 0 { if !e.UpdateStrategy.AllowCreateOnUpdate() { - return nil, kubeerr.NewAlreadyExists(e.EndpointName, name) + return nil, kubeerr.NewNotFound(e.EndpointName, name) } creating = true if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { @@ -245,13 +248,22 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } return obj, nil } + creating = false + newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) + if err != nil { + return nil, err + } + if newVersion != version { + // TODO: return the most recent version to a client? + return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) + } if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { return nil, err } - // TODO: expose TTL return obj, nil }) + if err != nil { if creating { err = etcderr.InterpretCreateError(err, e.EndpointName, name) diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 372af96ec6e19..59d9ee1782cc0 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -32,19 +32,52 @@ import ( "github.com/coreos/go-etcd/etcd" ) +type testRESTStrategy struct { + runtime.ObjectTyper + api.NameGenerator + namespaceScoped bool + allowCreateOnUpdate bool +} + +func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped } +func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate } + +func (t *testRESTStrategy) ResetBeforeCreate(obj runtime.Object) {} +func (t *testRESTStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { + return nil +} +func (t *testRESTStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList { + return nil +} + +func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { + return func(obj runtime.Object) bool { + actualPod := obj.(*api.Pod) + if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) { + t.Errorf("not a deep derivative %#v", actualPod) + return false + } + return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta) + } +} + func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false} return f, &Etcd{ - NewFunc: func() runtime.Object { return &api.Pod{} }, - NewListFunc: func() runtime.Object { return &api.PodList{} }, - EndpointName: "pods", - KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" }, + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + EndpointName: "pods", + CreateStrategy: strategy, + UpdateStrategy: strategy, + KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" }, KeyFunc: func(ctx api.Context, id string) (string, error) { return path.Join("/registry/pods", id), nil }, - Helper: h, + ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil }, + Helper: h, } } @@ -153,11 +186,11 @@ func TestEtcdList(t *testing.T) { func TestEtcdCreate(t *testing.T) { podA := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Status: api.PodStatus{Host: "machine"}, } podB := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Status: api.PodStatus{Host: "machine2"}, } @@ -178,18 +211,98 @@ func TestEtcdCreate(t *testing.T) { } path := "/registry/pods/foo" - key := "foo" table := map[string]struct { existing tools.EtcdResponseWithError expect tools.EtcdResponseWithError toCreate runtime.Object + objOK func(obj runtime.Object) bool errOK func(error) bool }{ "normal": { existing: emptyNode, + toCreate: podA, + objOK: hasCreated(t, podA), + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithPodA, expect: nodeWithPodA, + toCreate: podB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + obj, err := registry.Create(api.NewDefaultContext(), item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + actual := fakeClient.Data[path] + if item.objOK != nil { + if !item.objOK(obj) { + t.Errorf("%v: unexpected returned: %v", name, obj) + } + actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(actualObj) { + t.Errorf("%v: unexpected response: %v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } + } +} + +// DEPRECATED +func TestEtcdCreateWithName(t *testing.T) { + podA := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine"}, + } + podB := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + objOK func(obj runtime.Object) bool + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, toCreate: podA, + objOK: hasCreated(t, podA), errOK: func(err error) bool { return err == nil }, }, "preExisting": { @@ -203,18 +316,146 @@ func TestEtcdCreate(t *testing.T) { for name, item := range table { fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient.Data[path] = item.existing - err := registry.CreateWithName(api.NewContext(), key, item.toCreate) + err := registry.CreateWithName(api.NewDefaultContext(), key, item.toCreate) if !item.errOK(err) { t.Errorf("%v: unexpected error: %v", name, err) } - if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) { - t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + actual := fakeClient.Data[path] + if item.objOK != nil { + obj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(obj) { + t.Errorf("%v: unexpected response: %v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } } } } func TestEtcdUpdate(t *testing.T) { + podA := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine"}, + } + podB := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"}, + Status: api.PodStatus{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + newerNodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 2, + CreatedIndex: 1, + }, + }, + E: nil, + } + + nodeWithPodB := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podB), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toUpdate runtime.Object + allowCreate bool + objOK func(obj runtime.Object) bool + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: nodeWithPodB, + toUpdate: podB, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: emptyNode, + toUpdate: podA, + errOK: func(err error) bool { return errors.IsNotFound(err) }, + }, + "createIfNotFound": { + existing: emptyNode, + toUpdate: podA, + allowCreate: true, + objOK: hasCreated(t, podA), + errOK: func(err error) bool { return err == nil }, + }, + "outOfDate": { + existing: newerNodeWithPodA, + expect: newerNodeWithPodA, + toUpdate: podB, + errOK: func(err error) bool { return errors.IsConflict(err) }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate + fakeClient.Data[path] = item.existing + obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + actual := fakeClient.Data[path] + if item.objOK != nil { + if !item.objOK(obj) { + t.Errorf("%v: unexpected returned: %#v", name, obj) + } + actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(actualObj) { + t.Errorf("%v: unexpected response: %#v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } + } +} + +// DEPRECATED +func TestEtcdUpdateWithName(t *testing.T) { podA := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Status: api.PodStatus{Host: "machine"}, diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 751b03c320fbf..59a1c01d38868 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -398,10 +398,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo // First time this key has been used, try creating new value. if index == 0 { - _, err = h.Client.Create(key, string(data), 0) + response, err := h.Client.Create(key, string(data), 0) if IsEtcdNodeExist(err) { continue } + _, _, err = h.extractObj(response, err, ptrToType, false, false) return err }