Skip to content

Commit

Permalink
Merge pull request #5971 from bprashanth/rc_watch_fields
Browse files Browse the repository at this point in the history
Add the ability to watch fields of a replication controller
  • Loading branch information
piosz committed Mar 27, 2015
2 parents eec9ad6 + b0068d0 commit 768c733
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 7 deletions.
15 changes: 15 additions & 0 deletions pkg/api/v1beta1/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,21 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "replicationControllers",
func(label, value string) (string, string, error) {
switch label {
case "name":
return "name", value, nil
case "currentState.replicas":
return "status.replicas", value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta1", "events",
func(label, value string) (string, string, error) {
switch label {
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/v1beta2/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -1440,6 +1440,21 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "replicationControllers",
func(label, value string) (string, string, error) {
switch label {
case "name":
return "name", value, nil
case "currentState.replicas":
return "status.replicas", value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta2", "events",
func(label, value string) (string, string, error) {
switch label {
Expand Down
15 changes: 15 additions & 0 deletions pkg/api/v1beta3/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ func init() {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "replicationControllers",
func(label, value string) (string, string, error) {
switch label {
case "name":
return "name", value, nil
case "status.replicas":
return "status.replicas", value, nil
default:
return "", "", fmt.Errorf("field label not supported: %s", label)
}
})
if err != nil {
// If one of the conversion functions is malformed, detect it immediately.
panic(err)
}
err = newer.Scheme.AddFieldLabelConversionFunc("v1beta3", "events",
func(label, value string) (string, string, error) {
switch label {
Expand Down
85 changes: 85 additions & 0 deletions pkg/registry/controller/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ import (
"github.com/coreos/go-etcd/etcd"
)

const (
PASS = iota
FAIL
)

// newStorage creates a REST storage backed by etcd helpers
func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient) {
fakeEtcdClient := tools.NewFakeEtcdClient(t)
Expand Down Expand Up @@ -531,6 +536,86 @@ func TestEtcdWatchControllersMatch(t *testing.T) {
watching.Stop()
}

func TestEtcdWatchControllersFields(t *testing.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), validController.Namespace)
storage, fakeClient := newStorage(t)
fakeClient.ExpectNotFoundGet(etcdgeneric.NamespaceKeyRootFunc(ctx, "/registry/pods"))

testFieldMap := map[int][]fields.Set{
PASS: {
{"status.replicas": "0"},
{"name": "foo"},
{"status.replicas": "0", "name": "foo"},
},
FAIL: {
{"status.replicas": "10"},
{"name": "bar"},
{"status.replicas": "10", "name": "foo"},
{"status.replicas": "0", "name": "bar"},
},
}
testEtcdActions := []string{
tools.EtcdCreate,
tools.EtcdSet,
tools.EtcdCAS,
tools.EtcdDelete}

controller := &api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Labels: validController.Spec.Selector,
Namespace: "default",
},
Status: api.ReplicationControllerStatus{
Replicas: 0,
},
}
controllerBytes, _ := latest.Codec.Encode(controller)

for expectedResult, fieldSet := range testFieldMap {
for _, field := range fieldSet {
for _, action := range testEtcdActions {
watching, err := storage.Watch(ctx,
labels.Everything(),
field.AsSelector(),
"1",
)
var prevNode *etcd.Node = nil
node := &etcd.Node{
Value: string(controllerBytes),
}
if action == tools.EtcdDelete {
prevNode = node
}
fakeClient.WaitForWatchCompletion()
fakeClient.WatchResponse <- &etcd.Response{
Action: action,
Node: node,
PrevNode: prevNode,
}
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

select {
case r, ok := <-watching.ResultChan():
if expectedResult == FAIL {
t.Errorf("Unexpected result from channel %#v", r)
}
if !ok {
t.Errorf("watching channel should be open")
}
case <-time.After(time.Millisecond * 100):
if expectedResult == PASS {
t.Error("unexpected timeout from result channel")
}
}
watching.Stop()
}
}
}
}

func TestEtcdWatchControllersNotMatch(t *testing.T) {
ctx := api.NewDefaultContext()
storage, fakeClient := newStorage(t)
Expand Down
15 changes: 11 additions & 4 deletions pkg/registry/controller/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors"
"strconv"
)

// rcStrategy implements verification logic for Replication Controllers.
Expand Down Expand Up @@ -73,19 +74,25 @@ func (rcStrategy) ValidateUpdate(obj, old runtime.Object) fielderrors.Validation
return validation.ValidateReplicationControllerUpdate(old.(*api.ReplicationController), obj.(*api.ReplicationController))
}

// ControllerToSelectableFields returns a label set that represents the object.
func ControllerToSelectableFields(controller *api.ReplicationController) labels.Set {
return labels.Set{
"name": controller.Name,
"status.replicas": strconv.Itoa(controller.Status.Replicas),
}
}

// MatchController is the filter used by the generic etcd backend to route
// watch events from etcd to clients of the apiserver only interested in specific
// labels/fields.
func MatchController(label labels.Selector, field fields.Selector) generic.Matcher {
return generic.MatcherFunc(
func(obj runtime.Object) (bool, error) {
if !field.Empty() {
return false, fmt.Errorf("field selector not supported yet")
}
controllerObj, ok := obj.(*api.ReplicationController)
if !ok {
return false, fmt.Errorf("Given object is not a replication controller.")
}
return label.Matches(labels.Set(controllerObj.Labels)), nil
fields := ControllerToSelectableFields(controllerObj)
return label.Matches(labels.Set(controllerObj.Labels)) && field.Matches(fields), nil
})
}
15 changes: 12 additions & 3 deletions pkg/tools/etcd_helper_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,15 @@ import (
"github.com/golang/glog"
)

// Etcd watch event actions
const (
EtcdCreate = "create"
EtcdGet = "get"
EtcdSet = "set"
EtcdCAS = "compareAndSwap"
EtcdDelete = "delete"
)

// FilterFunc is a predicate which takes an API object and returns true
// iff the object should remain in the set.
type FilterFunc func(obj runtime.Object) bool
Expand Down Expand Up @@ -377,11 +386,11 @@ func (w *etcdWatcher) sendDelete(res *etcd.Response) {

func (w *etcdWatcher) sendResult(res *etcd.Response) {
switch res.Action {
case "create", "get":
case EtcdCreate, EtcdGet:
w.sendAdd(res)
case "set", "compareAndSwap":
case EtcdSet, EtcdCAS:
w.sendModify(res)
case "delete":
case EtcdDelete:
w.sendDelete(res)
default:
glog.Errorf("unknown action: %v", res.Action)
Expand Down

0 comments on commit 768c733

Please sign in to comment.