diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 2f977e4074cea..e9b8b3e4e385d 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -17,6 +17,8 @@ limitations under the License. package etcd import ( + "fmt" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd" @@ -228,6 +230,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool if err != nil { return nil, false, err } + // TODO: expose TTL creating := false out := e.NewFunc() err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) { @@ -237,7 +240,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } if version == 0 { if !e.UpdateStrategy.AllowCreateOnUpdate() { - return nil, kubeerr.NewAlreadyExists(e.EndpointName, name) + return nil, kubeerr.NewNotFound(e.EndpointName, name) } creating = true if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil { @@ -245,13 +248,22 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool } return obj, nil } + creating = false + newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj) + if err != nil { + return nil, err + } + if newVersion != version { + // TODO: return the most recent version to a client? + return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version)) + } if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil { return nil, err } - // TODO: expose TTL return obj, nil }) + if err != nil { if creating { err = etcderr.InterpretCreateError(err, e.EndpointName, name) diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 372af96ec6e19..59d9ee1782cc0 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -32,19 +32,52 @@ import ( "github.com/coreos/go-etcd/etcd" ) +type testRESTStrategy struct { + runtime.ObjectTyper + api.NameGenerator + namespaceScoped bool + allowCreateOnUpdate bool +} + +func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped } +func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate } + +func (t *testRESTStrategy) ResetBeforeCreate(obj runtime.Object) {} +func (t *testRESTStrategy) Validate(obj runtime.Object) errors.ValidationErrorList { + return nil +} +func (t *testRESTStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList { + return nil +} + +func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { + return func(obj runtime.Object) bool { + actualPod := obj.(*api.Pod) + if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) { + t.Errorf("not a deep derivative %#v", actualPod) + return false + } + return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta) + } +} + func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}} + strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false} return f, &Etcd{ - NewFunc: func() runtime.Object { return &api.Pod{} }, - NewListFunc: func() runtime.Object { return &api.PodList{} }, - EndpointName: "pods", - KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" }, + NewFunc: func() runtime.Object { return &api.Pod{} }, + NewListFunc: func() runtime.Object { return &api.PodList{} }, + EndpointName: "pods", + CreateStrategy: strategy, + UpdateStrategy: strategy, + KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" }, KeyFunc: func(ctx api.Context, id string) (string, error) { return path.Join("/registry/pods", id), nil }, - Helper: h, + ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil }, + Helper: h, } } @@ -153,11 +186,11 @@ func TestEtcdList(t *testing.T) { func TestEtcdCreate(t *testing.T) { podA := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Status: api.PodStatus{Host: "machine"}, } podB := &api.Pod{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Status: api.PodStatus{Host: "machine2"}, } @@ -178,18 +211,98 @@ func TestEtcdCreate(t *testing.T) { } path := "/registry/pods/foo" - key := "foo" table := map[string]struct { existing tools.EtcdResponseWithError expect tools.EtcdResponseWithError toCreate runtime.Object + objOK func(obj runtime.Object) bool errOK func(error) bool }{ "normal": { existing: emptyNode, + toCreate: podA, + objOK: hasCreated(t, podA), + errOK: func(err error) bool { return err == nil }, + }, + "preExisting": { + existing: nodeWithPodA, expect: nodeWithPodA, + toCreate: podB, + errOK: errors.IsAlreadyExists, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + fakeClient.Data[path] = item.existing + obj, err := registry.Create(api.NewDefaultContext(), item.toCreate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + actual := fakeClient.Data[path] + if item.objOK != nil { + if !item.objOK(obj) { + t.Errorf("%v: unexpected returned: %v", name, obj) + } + actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(actualObj) { + t.Errorf("%v: unexpected response: %v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } + } +} + +// DEPRECATED +func TestEtcdCreateWithName(t *testing.T) { + podA := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine"}, + } + podB := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + key := "foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toCreate runtime.Object + objOK func(obj runtime.Object) bool + errOK func(error) bool + }{ + "normal": { + existing: emptyNode, toCreate: podA, + objOK: hasCreated(t, podA), errOK: func(err error) bool { return err == nil }, }, "preExisting": { @@ -203,18 +316,146 @@ func TestEtcdCreate(t *testing.T) { for name, item := range table { fakeClient, registry := NewTestGenericEtcdRegistry(t) fakeClient.Data[path] = item.existing - err := registry.CreateWithName(api.NewContext(), key, item.toCreate) + err := registry.CreateWithName(api.NewDefaultContext(), key, item.toCreate) if !item.errOK(err) { t.Errorf("%v: unexpected error: %v", name, err) } - if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) { - t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + actual := fakeClient.Data[path] + if item.objOK != nil { + obj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(obj) { + t.Errorf("%v: unexpected response: %v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } } } } func TestEtcdUpdate(t *testing.T) { + podA := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Status: api.PodStatus{Host: "machine"}, + } + podB := &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"}, + Status: api.PodStatus{Host: "machine2"}, + } + + nodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + newerNodeWithPodA := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podA), + ModifiedIndex: 2, + CreatedIndex: 1, + }, + }, + E: nil, + } + + nodeWithPodB := tools.EtcdResponseWithError{ + R: &etcd.Response{ + Node: &etcd.Node{ + Value: runtime.EncodeOrDie(testapi.Codec(), podB), + ModifiedIndex: 1, + CreatedIndex: 1, + }, + }, + E: nil, + } + + emptyNode := tools.EtcdResponseWithError{ + R: &etcd.Response{}, + E: tools.EtcdErrorNotFound, + } + + path := "/registry/pods/foo" + + table := map[string]struct { + existing tools.EtcdResponseWithError + expect tools.EtcdResponseWithError + toUpdate runtime.Object + allowCreate bool + objOK func(obj runtime.Object) bool + errOK func(error) bool + }{ + "normal": { + existing: nodeWithPodA, + expect: nodeWithPodB, + toUpdate: podB, + errOK: func(err error) bool { return err == nil }, + }, + "notExisting": { + existing: emptyNode, + expect: emptyNode, + toUpdate: podA, + errOK: func(err error) bool { return errors.IsNotFound(err) }, + }, + "createIfNotFound": { + existing: emptyNode, + toUpdate: podA, + allowCreate: true, + objOK: hasCreated(t, podA), + errOK: func(err error) bool { return err == nil }, + }, + "outOfDate": { + existing: newerNodeWithPodA, + expect: newerNodeWithPodA, + toUpdate: podB, + errOK: func(err error) bool { return errors.IsConflict(err) }, + }, + } + + for name, item := range table { + fakeClient, registry := NewTestGenericEtcdRegistry(t) + registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate + fakeClient.Data[path] = item.existing + obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate) + if !item.errOK(err) { + t.Errorf("%v: unexpected error: %v", name, err) + } + + actual := fakeClient.Data[path] + if item.objOK != nil { + if !item.objOK(obj) { + t.Errorf("%v: unexpected returned: %#v", name, obj) + } + actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value)) + if err != nil { + t.Errorf("unable to decode stored value for %#v", actual) + continue + } + if !item.objOK(actualObj) { + t.Errorf("%v: unexpected response: %#v", name, actual) + } + } else { + if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) { + t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a)) + } + } + } +} + +// DEPRECATED +func TestEtcdUpdateWithName(t *testing.T) { podA := &api.Pod{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Status: api.PodStatus{Host: "machine"}, diff --git a/pkg/tools/etcd_tools.go b/pkg/tools/etcd_tools.go index 915feb2134562..3bffc224e375c 100644 --- a/pkg/tools/etcd_tools.go +++ b/pkg/tools/etcd_tools.go @@ -404,10 +404,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo // First time this key has been used, try creating new value. if index == 0 { - _, err = h.Client.Create(key, string(data), 0) + response, err := h.Client.Create(key, string(data), 0) if IsEtcdNodeExist(err) { continue } + _, _, err = h.extractObj(response, err, ptrToType, false, false) return err }