Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

genericetcd.Etcd should test resourceVersion #5010

Merged
merged 1 commit into from
Mar 4, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package etcd

import (
"fmt"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kubeerr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
etcderr "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors/etcd"
Expand Down Expand Up @@ -228,6 +230,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
if err != nil {
return nil, false, err
}
// TODO: expose TTL
creating := false
out := e.NewFunc()
err = e.Helper.AtomicUpdate(key, out, true, func(existing runtime.Object) (runtime.Object, error) {
Expand All @@ -237,21 +240,30 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool
}
if version == 0 {
if !e.UpdateStrategy.AllowCreateOnUpdate() {
return nil, kubeerr.NewAlreadyExists(e.EndpointName, name)
return nil, kubeerr.NewNotFound(e.EndpointName, name)
}
creating = true
if err := rest.BeforeCreate(e.CreateStrategy, ctx, obj); err != nil {
return nil, err
}
return obj, nil
}

creating = false
newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this Update function using the AtomicUpdate pattern? Shouldn't it just use CompareAndSwap?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Setting the stage for being able to pass an arbitrary function into Update(), to allow Update() to be used from contexts that want to use the AtomicUpdate style remerge behavior on specific fields. The normal Update case is what it uses today.

----- Original Message -----

    creating = false
  •   newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
    

Why is this Update function using the AtomicUpdate pattern? Shouldn't it just
use CompareAndSwap?


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/5010/files#r25822475

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An example is the generic Resize controller. I'd like to expose Resize in a way that allows it to use the same update logic, but detect whether a size changed in the event of a conflict. If it didn't, it should allow the update to continue.

----- Original Message -----

Setting the stage for being able to pass an arbitrary function into Update(),
to allow Update() to be used from contexts that want to use the AtomicUpdate
style remerge behavior on specific fields. The normal Update case is what
it uses today.

----- Original Message -----

  creating = false
  • newVersion, err := e.Helper.ResourceVersioner.ResourceVersion(obj)
    

Why is this Update function using the AtomicUpdate pattern? Shouldn't it
just
use CompareAndSwap?


Reply to this email directly or view it on GitHub:
https://github.com/GoogleCloudPlatform/kubernetes/pull/5010/files#r25822475

if err != nil {
return nil, err
}
if newVersion != version {
// TODO: return the most recent version to a client?
return nil, kubeerr.NewConflict(e.EndpointName, name, fmt.Errorf("the resource was updated to %d", version))
}
if err := rest.BeforeUpdate(e.UpdateStrategy, ctx, obj, existing); err != nil {
return nil, err
}
// TODO: expose TTL
return obj, nil
})

if err != nil {
if creating {
err = etcderr.InterpretCreateError(err, e.EndpointName, name)
Expand Down
263 changes: 252 additions & 11 deletions pkg/registry/generic/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,52 @@ import (
"github.com/coreos/go-etcd/etcd"
)

type testRESTStrategy struct {
runtime.ObjectTyper
api.NameGenerator
namespaceScoped bool
allowCreateOnUpdate bool
}

func (t *testRESTStrategy) NamespaceScoped() bool { return t.namespaceScoped }
func (t *testRESTStrategy) AllowCreateOnUpdate() bool { return t.allowCreateOnUpdate }

func (t *testRESTStrategy) ResetBeforeCreate(obj runtime.Object) {}
func (t *testRESTStrategy) Validate(obj runtime.Object) errors.ValidationErrorList {
return nil
}
func (t *testRESTStrategy) ValidateUpdate(obj, old runtime.Object) errors.ValidationErrorList {
return nil
}

func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool {
return func(obj runtime.Object) bool {
actualPod := obj.(*api.Pod)
if !api.Semantic.DeepDerivative(pod.Status, actualPod.Status) {
t.Errorf("not a deep derivative %#v", actualPod)
return false
}
return api.HasObjectMetaSystemFieldValues(&actualPod.ObjectMeta)
}
}

func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) {
f := tools.NewFakeEtcdClient(t)
f.TestIndex = true
h := tools.EtcdHelper{f, testapi.Codec(), tools.RuntimeVersionAdapter{testapi.MetadataAccessor()}}
strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false}
return f, &Etcd{
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
EndpointName: "pods",
KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" },
NewFunc: func() runtime.Object { return &api.Pod{} },
NewListFunc: func() runtime.Object { return &api.PodList{} },
EndpointName: "pods",
CreateStrategy: strategy,
UpdateStrategy: strategy,
KeyRootFunc: func(ctx api.Context) string { return "/registry/pods" },
KeyFunc: func(ctx api.Context, id string) (string, error) {
return path.Join("/registry/pods", id), nil
},
Helper: h,
ObjectNameFunc: func(obj runtime.Object) (string, error) { return obj.(*api.Pod).Name, nil },
Helper: h,
}
}

Expand Down Expand Up @@ -153,11 +186,11 @@ func TestEtcdList(t *testing.T) {

func TestEtcdCreate(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine2"},
}

Expand All @@ -178,18 +211,98 @@ func TestEtcdCreate(t *testing.T) {
}

path := "/registry/pods/foo"
key := "foo"

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toCreate runtime.Object
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: emptyNode,
toCreate: podA,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"preExisting": {
existing: nodeWithPodA,
expect: nodeWithPodA,
toCreate: podB,
errOK: errors.IsAlreadyExists,
},
}

for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
obj, err := registry.Create(api.NewDefaultContext(), item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

actual := fakeClient.Data[path]
if item.objOK != nil {
if !item.objOK(obj) {
t.Errorf("%v: unexpected returned: %v", name, obj)
}
actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(actualObj) {
t.Errorf("%v: unexpected response: %v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}

// DEPRECATED
func TestEtcdCreateWithName(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine2"},
}

nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}

emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}

path := "/registry/pods/foo"
key := "foo"

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toCreate runtime.Object
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: emptyNode,
toCreate: podA,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"preExisting": {
Expand All @@ -203,18 +316,146 @@ func TestEtcdCreate(t *testing.T) {
for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.CreateWithName(api.NewContext(), key, item.toCreate)
err := registry.CreateWithName(api.NewDefaultContext(), key, item.toCreate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

if e, a := item.expect, fakeClient.Data[path]; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
actual := fakeClient.Data[path]
if item.objOK != nil {
obj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(obj) {
t.Errorf("%v: unexpected response: %v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}

func TestEtcdUpdate(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Status: api.PodStatus{Host: "machine"},
}
podB := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Status: api.PodStatus{Host: "machine2"},
}

nodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}

newerNodeWithPodA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podA),
ModifiedIndex: 2,
CreatedIndex: 1,
},
},
E: nil,
}

nodeWithPodB := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), podB),
ModifiedIndex: 1,
CreatedIndex: 1,
},
},
E: nil,
}

emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}

path := "/registry/pods/foo"

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toUpdate runtime.Object
allowCreate bool
objOK func(obj runtime.Object) bool
errOK func(error) bool
}{
"normal": {
existing: nodeWithPodA,
expect: nodeWithPodB,
toUpdate: podB,
errOK: func(err error) bool { return err == nil },
},
"notExisting": {
existing: emptyNode,
expect: emptyNode,
toUpdate: podA,
errOK: func(err error) bool { return errors.IsNotFound(err) },
},
"createIfNotFound": {
existing: emptyNode,
toUpdate: podA,
allowCreate: true,
objOK: hasCreated(t, podA),
errOK: func(err error) bool { return err == nil },
},
"outOfDate": {
existing: newerNodeWithPodA,
expect: newerNodeWithPodA,
toUpdate: podB,
errOK: func(err error) bool { return errors.IsConflict(err) },
},
}

for name, item := range table {
fakeClient, registry := NewTestGenericEtcdRegistry(t)
registry.UpdateStrategy.(*testRESTStrategy).allowCreateOnUpdate = item.allowCreate
fakeClient.Data[path] = item.existing
obj, _, err := registry.Update(api.NewDefaultContext(), item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

actual := fakeClient.Data[path]
if item.objOK != nil {
if !item.objOK(obj) {
t.Errorf("%v: unexpected returned: %#v", name, obj)
}
actualObj, err := api.Scheme.Decode([]byte(actual.R.Node.Value))
if err != nil {
t.Errorf("unable to decode stored value for %#v", actual)
continue
}
if !item.objOK(actualObj) {
t.Errorf("%v: unexpected response: %#v", name, actual)
}
} else {
if e, a := item.expect, actual; !api.Semantic.DeepDerivative(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectDiff(e, a))
}
}
}
}

// DEPRECATED
func TestEtcdUpdateWithName(t *testing.T) {
podA := &api.Pod{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Status: api.PodStatus{Host: "machine"},
Expand Down
3 changes: 2 additions & 1 deletion pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,11 @@ func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNo

// First time this key has been used, try creating new value.
if index == 0 {
_, err = h.Client.Create(key, string(data), 0)
response, err := h.Client.Create(key, string(data), 0)
if IsEtcdNodeExist(err) {
continue
}
_, _, err = h.extractObj(response, err, ptrToType, false, false)
return err
}

Expand Down