Skip to content

Commit

Permalink
Merge pull request kubernetes#805 from lavalamp/serverWatch
Browse files Browse the repository at this point in the history
Improve watch
  • Loading branch information
smarterclayton committed Aug 8, 2014
2 parents 4c00acd + 5dd130a commit 71c6e08
Show file tree
Hide file tree
Showing 21 changed files with 474 additions and 142 deletions.
2 changes: 1 addition & 1 deletion cmd/kubecfg/kubecfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func executeAPIRequest(method string, s *kube_client.Client) bool {

r := s.Verb(verb).
Path(path).
ParseSelector(*selector)
ParseSelectorParam("labels", *selector)
if setBody {
if version != 0 {
data := readConfig(storage)
Expand Down
51 changes: 22 additions & 29 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ type SimpleRESTStorage struct {
updated *Simple
created *Simple

// Valid if WatchAll or WatchSingle is called
fakeWatch *watch.FakeWatcher

// Set if WatchSingle is called
requestedID string
// These are set when Watch is called
fakeWatch *watch.FakeWatcher
requestedLabelSelector labels.Selector
requestedFieldSelector labels.Selector
requestedResourceVersion uint64

// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj receives the original input to the update, delete, or create call.
Expand All @@ -95,7 +95,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error)
if storage.injectedFunction != nil {
return storage.injectedFunction(id)
}
return api.Status{Status: api.StatusSuccess}, nil
return &api.Status{Status: api.StatusSuccess}, nil
}), nil
}

Expand Down Expand Up @@ -130,18 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e
}

// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) {
if err := storage.errors["watchAll"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
return storage.fakeWatch, nil
}

// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) {
storage.requestedID = id
if err := storage.errors["watchSingle"]; err != nil {
func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
storage.requestedLabelSelector = label
storage.requestedFieldSelector = field
storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watch"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
Expand All @@ -164,17 +157,17 @@ func TestNotFound(t *testing.T) {
Path string
}
cases := map[string]T{
"PATCH method": T{"PATCH", "/prefix/version/foo"},
"GET long prefix": T{"GET", "/prefix/"},
"GET missing storage": T{"GET", "/prefix/version/blah"},
"GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": T{"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": T{"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": T{"PUT", "/prefix/version/foo"},
"PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": T{"GET", "/prefix/version/watch/"},
"watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"},
"PATCH method": {"PATCH", "/prefix/version/foo"},
"GET long prefix": {"GET", "/prefix/"},
"GET missing storage": {"GET", "/prefix/version/blah"},
"GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": {"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": {"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": {"PUT", "/prefix/version/foo"},
"PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": {"GET", "/prefix/version/watch/"},
"watch with bad method": {"POST", "/prefix/version/watch/foo/bar"},
}
handler := New(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},
Expand Down
15 changes: 10 additions & 5 deletions pkg/apiserver/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ type RESTStorage interface {
New() interface{}

// List selects resources in the storage which match to the selector.
// TODO: add field selector in addition to label selector.
List(labels.Selector) (interface{}, error)

// Get finds a resource in the storage by id and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(id string) (interface{}, error)

// Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Delete(id string) (<-chan interface{}, error)

Create(interface{}) (<-chan interface{}, error)
Expand All @@ -46,7 +49,9 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
// TODO: take a query, like List, to filter out unwanted events.
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
// 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}
29 changes: 22 additions & 7 deletions pkg/apiserver/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,37 @@ package apiserver
import (
"encoding/json"
"net/http"
"net/url"
"strconv"

"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

type WatchHandler struct {
storage map[string]RESTStorage
}

func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) {
if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything()
} else {
label = s
}
if s, err := labels.ParseSelector(query.Get("fields")); err != nil {
field = labels.Everything()
} else {
field = s
}
if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil {
resourceVersion = rv
}
return label, field, resourceVersion
}

// handleWatch processes a watch request
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
Expand All @@ -41,13 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
notFound(w, req)
}
if watcher, ok := storage.(ResourceWatcher); ok {
var watching watch.Interface
var err error
if id := req.URL.Query().Get("id"); id != "" {
watching, err = watcher.WatchSingle(id)
} else {
watching, err = watcher.WatchAll()
}
label, field, resourceVersion := getWatchParams(req.URL.Query())
watching, err := watcher.Watch(label, field, resourceVersion)
if err != nil {
internalError(err, w)
return
Expand Down
75 changes: 65 additions & 10 deletions pkg/apiserver/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var watchTestTable = []struct {

func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix/version")
Expand All @@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) {
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
dest.RawQuery = ""

ws, err := websocket.Dial(dest.String(), "", "http://localhost")
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}

try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
Expand Down Expand Up @@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) {

dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
dest.RawQuery = ""

request, err := http.NewRequest("GET", dest.String(), nil)
if err != nil {
Expand All @@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected response %#v", response)
}

if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}

decoder := json.NewDecoder(response.Body)

try := func(action watch.EventType, object interface{}) {
Expand Down Expand Up @@ -148,3 +141,65 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}

func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix/version")
server := httptest.NewServer(handler)

dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"

table := []struct {
rawQuery string
resourceVersion uint64
labelSelector string
fieldSelector string
}{
{
rawQuery: "resourceVersion=1234",
resourceVersion: 1234,
labelSelector: "",
fieldSelector: "",
}, {
rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: 314159,
labelSelector: "name=foo",
fieldSelector: "Host=",
}, {
rawQuery: "fields=ID%3dfoo&resourceVersion=1492",
resourceVersion: 1492,
labelSelector: "",
fieldSelector: "ID=foo",
}, {
rawQuery: "",
resourceVersion: 0,
labelSelector: "",
fieldSelector: "",
},
}

for _, item := range table {
simpleStorage.requestedLabelSelector = nil
simpleStorage.requestedFieldSelector = nil
simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases
dest.RawQuery = item.rawQuery
resp, err := http.Get(dest.String())
if err != nil {
t.Errorf("%v: unexpected error: %v", item.rawQuery, err)
continue
}
resp.Body.Close()
if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
}
}
19 changes: 17 additions & 2 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)

// Interface holds the methods for clients of Kubenetes,
// an interface to allow mock testing.
// TODO: split this up by resource?
// TODO: these should return/take pointers.
type Interface interface {
ListPods(selector labels.Selector) (api.PodList, error)
GetPod(name string) (api.Pod, error)
Expand All @@ -45,6 +48,7 @@ type Interface interface {
CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
DeleteReplicationController(string) error
WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)

GetService(name string) (api.Service, error)
CreateService(api.Service) (api.Service, error)
Expand Down Expand Up @@ -169,7 +173,7 @@ func (c *Client) makeURL(path string) string {

// ListPods takes a selector, and returns the list of pods that match that selector
func (c *Client) ListPods(selector labels.Selector) (result api.PodList, err error) {
err = c.Get().Path("pods").Selector(selector).Do().Into(&result)
err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(&result)
return
}

Expand Down Expand Up @@ -202,7 +206,7 @@ func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) {

// ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector
func (c *Client) ListReplicationControllers(selector labels.Selector) (result api.ReplicationControllerList, err error) {
err = c.Get().Path("replicationControllers").Selector(selector).Do().Into(&result)
err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(&result)
return
}

Expand Down Expand Up @@ -233,6 +237,17 @@ func (c *Client) DeleteReplicationController(name string) error {
return c.Delete().Path("replicationControllers").Path(name).Do().Error()
}

// WatchReplicationControllers returns a watch.Interface that watches the requested controllers.
func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("replicationControllers").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}

// GetService returns information about a particular service.
func (c *Client) GetService(name string) (result api.Service, err error) {
err = c.Get().Path("services").Path(name).Do().Into(&result)
Expand Down
6 changes: 3 additions & 3 deletions pkg/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestListEmptyPods(t *testing.T) {
Request: testRequest{Method: "GET", Path: "/pods"},
Response: Response{StatusCode: 200, Body: api.PodList{}},
}
podList, err := c.Setup().ListPods(nil)
podList, err := c.Setup().ListPods(labels.Everything())
c.Validate(t, podList, err)
}

Expand All @@ -65,7 +65,7 @@ func TestListPods(t *testing.T) {
},
},
}
receivedPodList, err := c.Setup().ListPods(nil)
receivedPodList, err := c.Setup().ListPods(labels.Everything())
c.Validate(t, receivedPodList, err)
}

Expand Down Expand Up @@ -191,7 +191,7 @@ func TestListControllers(t *testing.T) {
},
},
}
receivedControllerList, err := c.Setup().ListReplicationControllers(nil)
receivedControllerList, err := c.Setup().ListReplicationControllers(labels.Everything())
c.Validate(t, receivedControllerList, err)

}
Expand Down
Loading

0 comments on commit 71c6e08

Please sign in to comment.