Skip to content

Commit

Permalink
Fix watch for events; add test for interface implementation so it won…
Browse files Browse the repository at this point in the history
…'t break again.
  • Loading branch information
lavalamp committed Nov 12, 2014
1 parent 02a0593 commit 178d0af
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 69 deletions.
7 changes: 0 additions & 7 deletions pkg/api/errors/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,3 @@ func InterpretDeleteError(err error, kind, name string) error {
return err
}
}

// InterpretResourceVersionError returns the appropriate api error
// for a failure to convert the resource version of an object sent
// to the API to an etcd uint64 index.
func InterpretResourceVersionError(err error, kind, value string) error {
return errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", value)})
}
24 changes: 4 additions & 20 deletions pkg/registry/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package etcd

import (
"fmt"
"strconv"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
Expand Down Expand Up @@ -97,21 +96,6 @@ func makePodKey(ctx api.Context, id string) (string, error) {
return MakeEtcdItemKey(ctx, PodPath, id)
}

// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, etcderr.InterpretResourceVersionError(err, kind, resourceVersion)
}
return version + 1, nil
}

// ListPods obtains a list of pods with labels that match selector.
func (r *Registry) ListPods(ctx api.Context, selector labels.Selector) (*api.PodList, error) {
return r.ListPodsPredicate(ctx, func(pod *api.Pod) bool {
Expand Down Expand Up @@ -143,7 +127,7 @@ func (r *Registry) ListPodsPredicate(ctx api.Context, filter func(*api.Pod) bool

// WatchPods begins watching for new, changed, or deleted pods.
func (r *Registry) WatchPods(ctx api.Context, resourceVersion string, filter func(*api.Pod) bool) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "pod")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "pod")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -354,7 +338,7 @@ func (r *Registry) ListControllers(ctx api.Context) (*api.ReplicationControllerL

// WatchControllers begins watching for new, changed, or deleted controllers.
func (r *Registry) WatchControllers(ctx api.Context, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "replicationControllers")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "replicationControllers")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -516,7 +500,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) error {

// WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "service")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "service")
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -561,7 +545,7 @@ func (r *Registry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpoints) er

// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
version, err := ParseWatchResourceVersion(resourceVersion, "endpoints")
version, err := tools.ParseWatchResourceVersion(resourceVersion, "endpoints")
if err != nil {
return nil, err
}
Expand Down
35 changes: 0 additions & 35 deletions pkg/registry/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,41 +42,6 @@ func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
return registry
}

func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}

// TestEtcdGetPodDifferentNamespace ensures same-name pods in different namespaces do not clash
func TestEtcdGetPodDifferentNamespace(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/event/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (rs *REST) List(ctx api.Context, label, field labels.Selector) (runtime.Obj

// Watch returns Events events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVersion string) (watch.Interface, error) {
return rs.registry.Watch(ctx, &generic.SelectionPredicate{label, field, rs.getAttrs}, resourceVersion)
}

Expand Down
5 changes: 4 additions & 1 deletion pkg/registry/event/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
Expand Down Expand Up @@ -50,6 +51,8 @@ func TestRESTCreate(t *testing.T) {
if e, a := eventA, (<-c).Object; !reflect.DeepEqual(e, a) {
t.Errorf("diff: %s", util.ObjectDiff(e, a))
}
// Ensure we implement the interface
_ = apiserver.ResourceWatcher(rest)
}

func TestRESTDelete(t *testing.T) {
Expand Down Expand Up @@ -216,7 +219,7 @@ func TestRESTWatch(t *testing.T) {
Reason: "forTesting",
}
reg, rest := NewTestREST()
wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), 0)
wi, err := rest.Watch(api.NewContext(), labels.Everything(), labels.Everything(), "0")
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/registry/generic/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,12 @@ func (e *Etcd) Delete(ctx api.Context, id string) error {

// Watch starts a watch for the items that m matches.
// TODO: Detect if m references a single object instead of a list.
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) {
return e.Helper.WatchList(e.KeyRoot, resourceVersion, func(obj runtime.Object) bool {
func (e *Etcd) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName)
if err != nil {
return nil, err
}
return e.Helper.WatchList(e.KeyRoot, version, func(obj runtime.Object) bool {
matches, err := m.Matches(obj)
return err == nil && matches
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/etcd/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func TestEtcdWatch(t *testing.T) {
}

fakeClient, registry := NewTestGenericEtcdRegistry(t)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, 1)
wi, err := registry.Watch(api.NewContext(), EverythingMatcher{}, "1")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/generic/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type Registry interface {
Update(ctx api.Context, id string, obj runtime.Object) error
Get(ctx api.Context, id string) (runtime.Object, error)
Delete(ctx api.Context, id string) error
Watch(ctx api.Context, m Matcher, resourceVersion uint64) (watch.Interface, error)
Watch(ctx api.Context, m Matcher, resourceVersion string) (watch.Interface, error)
}

// FilterList filters any list object that conforms to the api conventions,
Expand Down
2 changes: 1 addition & 1 deletion pkg/registry/registrytest/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (r *GenericRegistry) List(ctx api.Context, m generic.Matcher) (runtime.Obje
return generic.FilterList(r.ObjectList, m)
}

func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion uint64) (watch.Interface, error) {
func (r *GenericRegistry) Watch(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) {
// TODO: wire filter down into the mux; it needs access to current and previous state :(
return r.Mux.Watch(), nil
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/tools/etcd_tools_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@ limitations under the License.
package tools

import (
"strconv"
"sync"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
Expand All @@ -37,6 +40,21 @@ func Everything(runtime.Object) bool {
return true
}

// ParseWatchResourceVersion takes a resource version argument and converts it to
// the etcd version we should pass to helper.Watch(). Because resourceVersion is
// an opaque value, the default watch behavior for non-zero watch is to watch
// the next value (if you pass "1", you will see updates from "2" onwards).
func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
if resourceVersion == "" || resourceVersion == "0" {
return 0, nil
}
version, err := strconv.ParseUint(resourceVersion, 10, 64)
if err != nil {
return 0, errors.NewInvalid(kind, "", errors.ValidationErrorList{errors.NewFieldInvalid("resourceVersion", resourceVersion)})
}
return version + 1, nil
}

// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface. resourceVersion may be used to specify what version to begin
Expand Down
36 changes: 36 additions & 0 deletions pkg/tools/etcd_tools_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
Expand Down Expand Up @@ -601,3 +602,38 @@ func TestWatchPurposefulShutdown(t *testing.T) {
t.Errorf("An injected error did not cause a graceful shutdown")
}
}

func TestEtcdParseWatchResourceVersion(t *testing.T) {
testCases := []struct {
Version string
Kind string
ExpectVersion uint64
Err bool
}{
{Version: "", ExpectVersion: 0},
{Version: "a", Err: true},
{Version: " ", Err: true},
{Version: "1", ExpectVersion: 2},
{Version: "10", ExpectVersion: 11},
}
for _, testCase := range testCases {
version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind)
switch {
case testCase.Err:
if err == nil {
t.Errorf("%s: unexpected non-error", testCase.Version)
continue
}
if !errors.IsInvalid(err) {
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
case !testCase.Err && err != nil:
t.Errorf("%s: unexpected error: %v", testCase.Version, err)
continue
}
if version != testCase.ExpectVersion {
t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version)
}
}
}

0 comments on commit 178d0af

Please sign in to comment.