From b0068d05f818a2f8c04d71368a7186b5bc7c5655 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Tue, 24 Mar 2015 19:57:04 -0700 Subject: [PATCH] Add the ability to watch fields of an rc --- pkg/api/v1beta1/conversion.go | 15 ++++ pkg/api/v1beta2/conversion.go | 15 ++++ pkg/api/v1beta3/conversion.go | 15 ++++ pkg/registry/controller/etcd/etcd_test.go | 85 +++++++++++++++++++++++ pkg/registry/controller/rest.go | 15 ++-- pkg/tools/etcd_helper_watch.go | 15 +++- 6 files changed, 153 insertions(+), 7 deletions(-) diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index da18a18c1466a..971b7be34cdca 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1442,6 +1442,21 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "replicationControllers", + func(label, value string) (string, string, error) { + switch label { + case "name": + return "name", value, nil + case "currentState.replicas": + return "status.replicas", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "events", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 89b5cd805715b..0daedc37c893e 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1368,6 +1368,21 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "replicationControllers", + func(label, value string) (string, string, error) { + switch label { + case "name": + return "name", value, nil + case "currentState.replicas": + return "status.replicas", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "events", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/api/v1beta3/conversion.go b/pkg/api/v1beta3/conversion.go index c40fb780fee9c..50a025eb9304e 100644 --- a/pkg/api/v1beta3/conversion.go +++ b/pkg/api/v1beta3/conversion.go @@ -39,6 +39,21 @@ func init() { // If one of the conversion functions is malformed, detect it immediately. panic(err) } + err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "replicationControllers", + func(label, value string) (string, string, error) { + switch label { + case "name": + return "name", value, nil + case "status.replicas": + return "status.replicas", value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) + if err != nil { + // If one of the conversion functions is malformed, detect it immediately. + panic(err) + } err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "events", func(label, value string) (string, string, error) { switch label { diff --git a/pkg/registry/controller/etcd/etcd_test.go b/pkg/registry/controller/etcd/etcd_test.go index 54f675e910960..ef51bf982ad69 100644 --- a/pkg/registry/controller/etcd/etcd_test.go +++ b/pkg/registry/controller/etcd/etcd_test.go @@ -34,6 +34,11 @@ import ( "github.com/coreos/go-etcd/etcd" ) +const ( + PASS = iota + FAIL +) + // newStorage creates a REST storage backed by etcd helpers func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) { fakeEtcdClient := tools.NewFakeEtcdClient(t) @@ -531,6 +536,86 @@ func TestEtcdWatchControllersMatch(t *testing.T) { watching.Stop() } +func TestEtcdWatchControllersFields(t *testing.T) { + ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace) + storage, fakeClient := newStorage(t) + fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods")) + + testFieldMap := map[int][]fields.Set{ + PASS: { + {"status.replicas": "0"}, + {"name": "foo"}, + {"status.replicas": "0", "name": "foo"}, + }, + FAIL: { + {"status.replicas": "10"}, + {"name": "bar"}, + {"status.replicas": "10", "name": "foo"}, + {"status.replicas": "0", "name": "bar"}, + }, + } + testEtcdActions := []string{ + tools.EtcdCreate, + tools.EtcdSet, + tools.EtcdCAS, + tools.EtcdDelete} + + controller := &api.ReplicationController{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Labels: validController.Spec.Selector, + Namespace: "default", + }, + Status: api.ReplicationControllerStatus{ + Replicas: 0, + }, + } + controllerBytes, _ := latest.Codec.Encode(controller) + + for expectedResult, fieldSet := range testFieldMap { + for _, field := range fieldSet { + for _, action := range testEtcdActions { + watching, err := storage.Watch(ctx, + labels.Everything(), + field.AsSelector(), + "1", + ) + var prevNode *etcd.Node = nil + node := &etcd.Node{ + Value: string(controllerBytes), + } + if action == tools.EtcdDelete { + prevNode = node + } + fakeClient.WaitForWatchCompletion() + fakeClient.WatchResponse <- &etcd.Response{ + Action: action, + Node: node, + PrevNode: prevNode, + } + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + select { + case r, ok := <-watching.ResultChan(): + if expectedResult == FAIL { + t.Errorf("Unexpected result from channel %#v", r) + } + if !ok { + t.Errorf("watching channel should be open") + } + case <-time.After(time.Millisecond * 100): + if expectedResult == PASS { + t.Error("unexpected timeout from result channel") + } + } + watching.Stop() + } + } + } +} + func TestEtcdWatchControllersNotMatch(t *testing.T) { ctx := api.NewDefaultContext() storage, fakeClient := newStorage(t) diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 4ca1e22707d06..996eb646486a1 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" + "strconv" ) // rcStrategy implements verification logic for Replication Controllers. @@ -65,19 +66,25 @@ func (rcStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.Validation return validation.ValidateReplicationControllerUpdate(old.(*api.ReplicationController), obj.(*api.ReplicationController)) } +// ControllerToSelectableFields returns a label set that represents the object. +func ControllerToSelectableFields(controller *api.ReplicationController) labels.Set { + return labels.Set{ + "name": controller.Name, + "status.replicas": strconv.Itoa(controller.Status.Replicas), + } +} + // MatchController is the filter used by the generic etcd backend to route // watch events from etcd to clients of the apiserver only interested in specific // labels/fields. func MatchController(label labels.Selector, field fields.Selector) generic.Matcher { return generic.MatcherFunc( func(obj runtime.Object) (bool, error) { - if !field.Empty() { - return false, fmt.Errorf("field selector not supported yet") - } controllerObj, ok := obj.(*api.ReplicationController) if !ok { return false, fmt.Errorf("Given object is not a replication controller.") } - return label.Matches(labels.Set(controllerObj.Labels)), nil + fields := ControllerToSelectableFields(controllerObj) + return label.Matches(labels.Set(controllerObj.Labels)) && field.Matches(fields), nil }) } diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index 2a55994c6ccfe..f7081134941c9 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -32,6 +32,15 @@ import ( "github.com/golang/glog" ) +// Etcd watch event actions +const ( + EtcdCreate = "create" + EtcdGet = "get" + EtcdSet = "set" + EtcdCAS = "compareAndSwap" + EtcdDelete = "delete" +) + // FilterFunc is a predicate which takes an API object and returns true // iff the object should remain in the set. type FilterFunc func(obj runtime.Object) bool @@ -377,11 +386,11 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) { func (w *etcdWatcher) sendResult(res *etcd.Response) { switch res.Action { - case "create", "get": + case EtcdCreate, EtcdGet: w.sendAdd(res) - case "set", "compareAndSwap": + case EtcdSet, EtcdCAS: w.sendModify(res) - case "delete": + case EtcdDelete: w.sendDelete(res) default: glog.Errorf("unknown action: %v", res.Action)