Skip to content

Commit

Permalink
Merge pull request kubernetes#4326 from yujuhong/phantom_pods
Browse files Browse the repository at this point in the history
Allow AtomicUpdate() to surface the error when the key doesn't exist
  • Loading branch information
smarterclayton committed Feb 12, 2015
2 parents 8557cd6 + 6813683 commit 7d43971
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 13 deletions.
10 changes: 5 additions & 5 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Registry) setPodHostTo(ctx api.Context, podID, oldMachine, machine stri
if err != nil {
return nil, err
}
err = r.AtomicUpdate(podKey, &api.Pod{}, func(obj runtime.Object) (runtime.Object, error) {
err = r.AtomicUpdate(podKey, &api.Pod{}, false, func(obj runtime.Object) (runtime.Object, error) {
pod, ok := obj.(*api.Pod)
if !ok {
return nil, fmt.Errorf("unexpected object: %#v", obj)
Expand All @@ -211,7 +211,7 @@ func (r *Registry) assignPod(ctx api.Context, podID string, machine string) erro
}
// Doing the constraint check this way provides atomicity guarantees.
contKey := makeBoundPodsKey(machine)
err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
err = r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPodList := in.(*api.BoundPods)
boundPodList.Items = append(boundPodList.Items, *boundPod)
if errors := constraint.Allowed(boundPodList.Items); len(errors) > 0 {
Expand Down Expand Up @@ -261,7 +261,7 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
}

containerKey := makeBoundPodsKey(podOut.Status.Host)
return r.AtomicUpdate(containerKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
return r.AtomicUpdate(containerKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
boundPods := in.(*api.BoundPods)
for ix := range boundPods.Items {
if boundPods.Items[ix].Name == pod.Name {
Expand Down Expand Up @@ -299,7 +299,7 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error {
}
// Next, remove the pod from the machine atomically.
contKey := makeBoundPodsKey(machine)
return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) {
return r.AtomicUpdate(contKey, &api.BoundPods{}, true, func(in runtime.Object) (runtime.Object, error) {
pods := in.(*api.BoundPods)
newPods := make([]api.BoundPod, 0, len(pods.Items))
found := false
Expand Down Expand Up @@ -553,7 +553,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er
return err
}
// TODO: this is a really bad misuse of AtomicUpdate, need to compute a diff inside the loop.
err = r.AtomicUpdate(key, &api.Endpoints{},
err = r.AtomicUpdate(key, &api.Endpoints{}, true,
func(input runtime.Object) (runtime.Object, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters
return endpoints, nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
// Example:
//
// h := &util.EtcdHelper{client, encoding, versioning}
// err := h.AtomicUpdate("myKey", &MyType{}, func(input runtime.Object) (runtime.Object, error) {
// err := h.AtomicUpdate("myKey", &MyType{}, true, func(input runtime.Object) (runtime.Object, error) {
// // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey".
//
Expand All @@ -323,15 +323,15 @@ type EtcdUpdateFunc func(input runtime.Object) (output runtime.Object, err error
// return cur, nil
// })
//
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, tryUpdate EtcdUpdateFunc) error {
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, index, err := h.bodyAndExtractObj(key, obj, true)
origBody, index, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
if err != nil {
return err
}
Expand Down
36 changes: 31 additions & 5 deletions pkg/tools/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func TestAtomicUpdate(t *testing.T) {
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
return obj, nil
})
if err != nil {
Expand All @@ -475,7 +475,7 @@ func TestAtomicUpdate(t *testing.T) {
// Update an existing node.
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2}
err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true

if in.(*TestResource).Value != 1 {
Expand Down Expand Up @@ -510,7 +510,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
return obj, nil
})
if err != nil {
Expand All @@ -521,7 +521,7 @@ func TestAtomicUpdateNoChange(t *testing.T) {
callbackCalled := false
objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}
fakeClient.Err = errors.New("should not be called")
err = helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
err = helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
callbackCalled = true
return objUpdate, nil
})
Expand All @@ -533,6 +533,32 @@ func TestAtomicUpdateNoChange(t *testing.T) {
}
}

func TestAtomicUpdateKeyNotFound(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
helper := EtcdHelper{fakeClient, codec, versioner}

// Create a new node.
fakeClient.ExpectNotFoundGet("/some/key")
obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1}

f := func(in runtime.Object) (runtime.Object, error) {
return obj, nil
}

ignoreNotFound := false
err := helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
if err == nil {
t.Errorf("Expected error for key not found.")
}

ignoreNotFound = true
err = helper.AtomicUpdate("/some/key", &TestResource{}, ignoreNotFound, f)
if err != nil {
t.Errorf("Unexpected error %v.", err)
}
}

func TestAtomicUpdate_CreateCollision(t *testing.T) {
fakeClient := NewFakeEtcdClient(t)
fakeClient.TestIndex = true
Expand All @@ -552,7 +578,7 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
defer wgDone.Done()

firstCall := true
err := helper.AtomicUpdate("/some/key", &TestResource{}, func(in runtime.Object) (runtime.Object, error) {
err := helper.AtomicUpdate("/some/key", &TestResource{}, true, func(in runtime.Object) (runtime.Object, error) {
defer func() { firstCall = false }()

if firstCall {
Expand Down

0 comments on commit 7d43971

Please sign in to comment.