Skip to content

Commit

Permalink
Add "Update Event" to Kubernetes API
Browse files Browse the repository at this point in the history
  • Loading branch information
saad-ali committed Feb 5, 2015
1 parent 5de2e91 commit a41f520
Show file tree
Hide file tree
Showing 12 changed files with 330 additions and 35 deletions.
4 changes: 2 additions & 2 deletions pkg/api/errors/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func InterpretCreateError(err error, kind, name string) error {
}
}

// InterpretUpdateError converts a generic etcd error on a create
// InterpretUpdateError converts a generic etcd error on a update
// operation into the appropriate API error.
func InterpretUpdateError(err error, kind, name string) error {
switch {
Expand All @@ -54,7 +54,7 @@ func InterpretUpdateError(err error, kind, name string) error {
}
}

// InterpretDeleteError converts a generic etcd error on a create
// InterpretDeleteError converts a generic etcd error on a delete
// operation into the appropriate API error.
func InterpretDeleteError(err error, kind, name string) error {
switch {
Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (r *Registry) UpdatePod(ctx api.Context, pod *api.Pod) error {
}
// There's no race with the scheduler, because either this write will fail because the host
// has been updated, or the host update will fail because this pod has been updated.
err = r.EtcdHelper.SetObj(podKey, pod)
err = r.EtcdHelper.SetObj(podKey, pod, 0 /* ttl */)
if err != nil {
return err
}
Expand Down Expand Up @@ -404,7 +404,7 @@ func (r *Registry) UpdateController(ctx api.Context, controller *api.Replication
if err != nil {
return err
}
err = r.SetObj(key, controller)
err = r.SetObj(key, controller, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "replicationController", controller.Name)
}

Expand Down Expand Up @@ -512,7 +512,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {
if err != nil {
return err
}
err = r.SetObj(key, svc)
err = r.SetObj(key, svc, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "service", svc.Name)
}

Expand Down Expand Up @@ -605,7 +605,7 @@ func (r *Registry) CreateMinion(ctx api.Context, minion *api.Node) error {

func (r *Registry) UpdateMinion(ctx api.Context, minion *api.Node) error {
// TODO: Add some validations.
err := r.SetObj(makeNodeKey(minion.Name), minion)
err := r.SetObj(makeNodeKey(minion.Name), minion, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, "minion", minion.Name)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/registry/event/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ func (r registry) Create(ctx api.Context, id string, obj runtime.Object) error {
return etcderr.InterpretCreateError(err, r.Etcd.EndpointName, id)
}

// Update replaces an existing instance of the object, and sets a ttl so that the event
// doesn't stay in the system forever.
func (r registry) Update(ctx api.Context, id string, obj runtime.Object) error {
key, err := r.Etcd.KeyFunc(ctx, id)
if err != nil {
return err
}
err = r.Etcd.Helper.SetObj(key, obj, r.ttl)
return etcderr.InterpretUpdateError(err, r.Etcd.EndpointName, id)
}

// NewEtcdRegistry returns a registry which will store Events in the given
// EtcdHelper. ttl is the time that Events will be retained by the system.
func NewEtcdRegistry(h tools.EtcdHelper, ttl uint64) generic.Registry {
Expand Down
102 changes: 102 additions & 0 deletions pkg/registry/event/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,105 @@ func TestEventCreate(t *testing.T) {
}
}
}

func TestEventUpdate(t *testing.T) {
eventA := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Reason: "forTesting",
}
eventB := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "bar", Namespace: api.NamespaceDefault},
Reason: "for testing again",
}
eventC := &api.Event{
ObjectMeta: api.ObjectMeta{Name: "pan", Namespace: api.NamespaceDefault, ResourceVersion: "1"},
Reason: "for testing again something else",
}

nodeWithEventA := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventA),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

nodeWithEventB := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventB),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

nodeWithEventC := tools.EtcdResponseWithError{
R: &etcd.Response{
Node: &etcd.Node{
Value: runtime.EncodeOrDie(testapi.Codec(), eventC),
ModifiedIndex: 1,
CreatedIndex: 1,
TTL: int64(testTTL),
},
},
E: nil,
}

emptyNode := tools.EtcdResponseWithError{
R: &etcd.Response{},
E: tools.EtcdErrorNotFound,
}

ctx := api.NewDefaultContext()
key := "foo"
path, err := etcdgeneric.NamespaceKeyFunc(ctx, "/registry/events", key)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

table := map[string]struct {
existing tools.EtcdResponseWithError
expect tools.EtcdResponseWithError
toUpdate runtime.Object
errOK func(error) bool
}{
"doesNotExist": {
existing: emptyNode,
expect: nodeWithEventA,
toUpdate: eventA,
errOK: func(err error) bool { return err == nil },
},
"doesNotExist2": {
existing: emptyNode,
expect: nodeWithEventB,
toUpdate: eventB,
errOK: func(err error) bool { return err == nil },
},
"replaceExisting": {
existing: nodeWithEventA,
expect: nodeWithEventC,
toUpdate: eventC,
errOK: func(err error) bool { return err == nil },
},
}

for name, item := range table {
fakeClient, registry := NewTestEventEtcdRegistry(t)
fakeClient.Data[path] = item.existing
err := registry.Update(ctx, key, item.toUpdate)
if !item.errOK(err) {
t.Errorf("%v: unexpected error: %v", name, err)
}

if e, a := item.expect, fakeClient.Data[path]; !reflect.DeepEqual(e, a) {
t.Errorf("%v:\n%s", name, util.ObjectGoPrintDiff(e, a))
}
}
}
25 changes: 25 additions & 0 deletions pkg/registry/event/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan apiserver.RE
}), nil
}

// Update replaces an existing Event instance in storage.registry, with the given instance.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan apiserver.RESTResult, error) {
event, ok := obj.(*api.Event)
if !ok {
return nil, fmt.Errorf("not an event object: %#v", obj)
}
if api.Namespace(ctx) != "" {
if !api.ValidNamespace(ctx, &event.ObjectMeta) {
return nil, errors.NewConflict("event", event.Namespace, fmt.Errorf("event.namespace does not match the provided context"))
}
}
if errs := validation.ValidateEvent(event); len(errs) > 0 {
return nil, errors.NewInvalid("event", event.Name, errs)
}
api.FillObjectMetaSystemFields(ctx, &event.ObjectMeta)

return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.Update(ctx, event.Name, event)
if err != nil {
return nil, err
}
return rs.registry.Get(ctx, event.Name)
}), nil
}

func (rs *REST) Delete(ctx api.Context, id string) (<-chan apiserver.RESTResult, error) {
obj, err := rs.registry.Get(ctx, id)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/registry/event/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,37 @@ func TestRESTCreate(t *testing.T) {
}
}

func TestRESTUpdate(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
c, err := rest.Create(api.NewDefaultContext(), eventA)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
<-c
got, err := rest.Get(api.NewDefaultContext(), eventA.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventA, got; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
eventB := testEvent("bar")
u, err := rest.Update(api.NewDefaultContext(), eventB)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
<-u
got2, err := rest.Get(api.NewDefaultContext(), eventB.Name)
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if e, a := eventB, got2; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}

}

func TestRESTDelete(t *testing.T) {
_, rest := NewTestREST()
eventA := testEvent("foo")
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (e *Etcd) Update(ctx api.Context, id string, obj runtime.Object) error {
return err
}
// TODO: verify that SetObj checks ResourceVersion before succeeding.
err = e.Helper.SetObj(key, obj)
err = e.Helper.SetObj(key, obj, 0 /* ttl */)
return etcderr.InterpretUpdateError(err, e.EndpointName, id)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/tools/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,22 +281,22 @@ func (h *EtcdHelper) Delete(key string, recursive bool) error {
return err
}

// SetObj marshals obj via json, and stores under key. Will do an
// atomic update if obj's ResourceVersion field is set.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object) error {
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever.
func (h *EtcdHelper) SetObj(key string, obj runtime.Object, ttl uint64) error {
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
if h.ResourceVersioner != nil {
if version, err := h.ResourceVersioner.ResourceVersion(obj); err == nil && version != 0 {
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", version)
_, err = h.Client.CompareAndSwap(key, string(data), ttl, "", version)
return err // err is shadowed!
}
}

// Create will fail if a key already exists.
_, err = h.Client.Create(key, string(data), 0)
_, err = h.Client.Create(key, string(data), ttl)
return err
}

Expand Down
16 changes: 13 additions & 3 deletions pkg/tools/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func TestSetObj(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 5)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand All @@ -388,6 +388,10 @@ func TestSetObj(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(5), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}

}

func TestSetObjWithVersion(t *testing.T) {
Expand All @@ -404,7 +408,7 @@ func TestSetObjWithVersion(t *testing.T) {
}

helper := EtcdHelper{fakeClient, testapi.Codec(), versioner}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 7)
if err != nil {
t.Fatalf("Unexpected error %#v", err)
}
Expand All @@ -417,13 +421,16 @@ func TestSetObjWithVersion(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(7), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
}

func TestSetObjWithoutResourceVersioner(t *testing.T) {
obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
fakeClient := NewFakeEtcdClient(t)
helper := EtcdHelper{fakeClient, testapi.Codec(), nil}
err := helper.SetObj("/some/key", obj)
err := helper.SetObj("/some/key", obj, 3)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand All @@ -436,6 +443,9 @@ func TestSetObjWithoutResourceVersioner(t *testing.T) {
if expect != got {
t.Errorf("Wanted %v, got %v", expect, got)
}
if e, a := uint64(3), fakeClient.LastSetTTL; e != a {
t.Errorf("Wanted %v, got %v", e, a)
}
}

func TestAtomicUpdate(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/tools/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons
Value: value,
CreatedIndex: createdIndex,
ModifiedIndex: i,
TTL: int64(ttl),
},
},
}
Expand Down
Loading

0 comments on commit a41f520

Please sign in to comment.