Skip to content

Commit

Permalink
Move watch to being a resthandler resource and expose it on LIST
Browse files Browse the repository at this point in the history
    GET /pods?watch=true&resourceVersion=10

will now function equivalent to GET /watch/pods.
  • Loading branch information
smarterclayton committed Mar 27, 2015
1 parent 1618c39 commit eb0eff6
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 176 deletions.
1 change: 1 addition & 0 deletions hack/test-cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ __EOF__
curl -s "http://127.0.0.1:${API_PORT}/swaggerapi/api/${version}" > "${file}"
[[ "$(grep "list of returned" "${file}")" ]]
[[ "$(grep "list of pods" "${file}")" ]]
[[ "$(grep "watch for changes to the described resources" "${file}")" ]]
fi

kube::test::clear_all
Expand Down
7 changes: 6 additions & 1 deletion pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1170,14 +1170,19 @@ type DeleteOptions struct {
GracePeriodSeconds *int64 `json:"gracePeriodSeconds"`
}

// ListOptions is the query options to a standard REST list call
// ListOptions is the query options to a standard REST list call, and has future support for
// watch calls.
type ListOptions struct {
TypeMeta `json:",inline"`

// A selector based on labels
LabelSelector labels.Selector
// A selector based on fields
FieldSelector fields.Selector
// If true, watch for changes to this list
Watch bool
// The resource version to watch (no effect on list yet)
ResourceVersion string
}

// Status is a return value for calls that don't return other objects.
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,10 @@ type ListOptions struct {
LabelSelector string `json:"labels" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"`
// A selector based on fields
FieldSelector string `json:"fields" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"`
// If true, watch for changes to the selected resources
Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"`
// The desired resource version to watch
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}

// Status is a return value for calls that don't return other objects.
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,10 @@ type ListOptions struct {
LabelSelector string `json:"labels" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"`
// A selector based on fields
FieldSelector string `json:"fields" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"`
// If true, watch for changes to the selected resources
Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"`
// The desired resource version to watch
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}

// Status is a return value for calls that don't return other objects.
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ type ListOptions struct {
LabelSelector string `json:"labelSelector" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"`
// A selector based on fields
FieldSelector string `json:"fieldSelector" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"`
// If true, watch for changes to the selected resources
Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"`
// The desired resource version to watch
ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"`
}

// Status is a return value for calls that don't return other objects.
Expand Down
60 changes: 30 additions & 30 deletions pkg/apiserver/api_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"

"github.com/emicklei/go-restful"
)
Expand Down Expand Up @@ -59,15 +60,6 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
// Create the WebService.
ws = a.newWebService()

// Initialize the custom handlers.
watchHandler := (&WatchHandler{
storage: a.group.Storage,
mapper: a.group.Mapper,
convertor: a.group.Convertor,
codec: a.group.Codec,
linker: a.group.Linker,
info: a.info,
})
redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info})
proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info})

Expand All @@ -80,7 +72,7 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) {
}
sort.Strings(paths)
for _, path := range paths {
if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, watchHandler, redirectHandler, proxyHandler); err != nil {
if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, redirectHandler, proxyHandler); err != nil {
errors = append(errors, err)
}
}
Expand All @@ -98,7 +90,7 @@ func (a *APIInstaller) newWebService() *restful.WebService {
return ws
}

func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, watchHandler, redirectHandler, proxyHandler http.Handler) error {
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, redirectHandler, proxyHandler http.Handler) error {
admit := a.group.Admit
context := a.group.Context

Expand Down Expand Up @@ -129,17 +121,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
versionedObject := indirectArbitraryPointer(versionedPtr)

var versionedList interface{}
if lister, ok := storage.(rest.Lister); ok {
list := lister.NewList()
_, listKind, err := a.group.Typer.ObjectVersionAndKind(list)
versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind)
if err != nil {
return err
}
versionedList = indirectArbitraryPointer(versionedListPtr)
}

mapping, err := a.group.Mapper.RESTMapping(kind, a.group.Version)
if err != nil {
return err
Expand All @@ -153,13 +134,24 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter)
updater, isUpdater := storage.(rest.Updater)
patcher, isPatcher := storage.(rest.Patcher)
_, isWatcher := storage.(rest.Watcher)
watcher, isWatcher := storage.(rest.Watcher)
_, isRedirector := storage.(rest.Redirector)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
}

var versionedList interface{}
if isLister {
list := lister.NewList()
_, listKind, err := a.group.Typer.ObjectVersionAndKind(list)
versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind)
if err != nil {
return err
}
versionedList = indirectArbitraryPointer(versionedListPtr)
}

versionedListOptions, err := a.group.Creater.New(serverVersion, "ListOptions")
if err != nil {
return err
Expand Down Expand Up @@ -324,7 +316,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
addParams(route, action.Params)
ws.Route(route)
case "LIST": // List all resources of a kind.
route := ws.GET(action.Path).To(ListResource(lister, reqScope)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)).
Filter(m).
Doc("list objects of kind " + kind).
Operation("list" + kind).
Expand Down Expand Up @@ -375,22 +367,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
addParams(route, action.Params)
ws.Route(route)
// TODO: deprecated
case "WATCH": // Watch a resource.
route := ws.GET(action.Path).To(routeFunction(watchHandler)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
Filter(m).
Doc("watch a particular " + kind).
Doc("watch changes to an object of kind " + kind).
Operation("watch" + kind).
Produces("application/json").
Writes(versionedObject)
Writes(watchjson.NewWatchEvent())
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return err
}
addParams(route, action.Params)
ws.Route(route)
// TODO: deprecated
case "WATCHLIST": // Watch all resources of a kind.
route := ws.GET(action.Path).To(routeFunction(watchHandler)).
route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)).
Filter(m).
Doc("watch a list of " + kind).
Doc("watch individual changes to a list of " + kind).
Operation("watch" + kind + "list").
Produces("application/json").
Writes(versionedList)
Writes(watchjson.NewWatchEvent())
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return err
}
addParams(route, action.Params)
ws.Route(route)
case "REDIRECT": // Get the redirect URL for a resource.
Expand Down
53 changes: 36 additions & 17 deletions pkg/apiserver/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package apiserver
import (
"fmt"
"net/http"
"net/url"
gpath "path"
"time"

Expand Down Expand Up @@ -98,7 +99,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction {
}

// ListResource returns a function that handles retrieving a list of resources from a rest.Storage object.
func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction {
func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction {
return func(req *restful.Request, res *restful.Response) {
w := res.ResponseWriter

Expand All @@ -110,22 +111,8 @@ func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction {
ctx := scope.ContextFunc(req)
ctx = api.WithNamespace(ctx, namespace)

// TODO: extract me into a method
query := req.Request.URL.Query()
versioned, err := scope.Creater.New(scope.ServerAPIVersion, "ListOptions")
out, err := queryToObject(req.Request.URL.Query(), scope, "ListOptions")
if err != nil {
// programmer error
errorJSON(err, scope.Codec, w)
return
}
if err := scope.Convertor.Convert(&query, versioned); err != nil {
// bad request
errorJSON(err, scope.Codec, w)
return
}
out, err := scope.Convertor.ConvertToVersion(versioned, "")
if err != nil {
// programmer error
errorJSON(err, scope.Codec, w)
return
}
Expand All @@ -136,11 +123,22 @@ func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction {
return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// invalid field
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
errorJSON(err, scope.Codec, w)
return
}

if (opts.Watch || forceWatch) && rw != nil {
watcher, err := rw.Watch(ctx, opts.LabelSelector, opts.FieldSelector, opts.ResourceVersion)
if err != nil {
errorJSON(err, scope.Codec, w)
return
}
serveWatch(watcher, scope, w, req)
return
}

result, err := r.List(ctx, opts.LabelSelector, opts.FieldSelector)
if err != nil {
errorJSON(err, scope.Codec, w)
Expand Down Expand Up @@ -419,6 +417,27 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope,
}
}

// queryToObject converts query parameters into a structured internal object by
// kind. The caller must cast the returned object to the matching internal Kind
// to use it.
// TODO: add appropriate structured error responses
func queryToObject(query url.Values, scope RequestScope, kind string) (runtime.Object, error) {
versioned, err := scope.Creater.New(scope.ServerAPIVersion, kind)
if err != nil {
// programmer error
return nil, err
}
if err := scope.Convertor.Convert(&query, versioned); err != nil {
return nil, errors.NewBadRequest(err.Error())
}
out, err := scope.Convertor.ConvertToVersion(versioned, "")
if err != nil {
// programmer error
return nil, err
}
return out, nil
}

// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)

Expand Down
Loading

0 comments on commit eb0eff6

Please sign in to comment.