diff --git a/hack/test-cmd.sh b/hack/test-cmd.sh index 26cab4a281e49..08b235f5d12ca 100755 --- a/hack/test-cmd.sh +++ b/hack/test-cmd.sh @@ -570,6 +570,7 @@ __EOF__ curl -s "http://127.0.0.1:${API_PORT}/swaggerapi/api/${version}" > "${file}" [[ "$(grep "list of returned" "${file}")" ]] [[ "$(grep "list of pods" "${file}")" ]] + [[ "$(grep "watch for changes to the described resources" "${file}")" ]] fi kube::test::clear_all diff --git a/pkg/api/types.go b/pkg/api/types.go index d25d7af82ef84..ac030fd1df4fb 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1170,7 +1170,8 @@ type DeleteOptions struct { GracePeriodSeconds *int64 `json:"gracePeriodSeconds"` } -// ListOptions is the query options to a standard REST list call +// ListOptions is the query options to a standard REST list call, and has future support for +// watch calls. type ListOptions struct { TypeMeta `json:",inline"` @@ -1178,6 +1179,10 @@ type ListOptions struct { LabelSelector labels.Selector // A selector based on fields FieldSelector fields.Selector + // If true, watch for changes to this list + Watch bool + // The resource version to watch (no effect on list yet) + ResourceVersion string } // Status is a return value for calls that don't return other objects. diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index afd3df0080dda..00416faf360a2 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -993,6 +993,10 @@ type ListOptions struct { LabelSelector string `json:"labels" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"` // A selector based on fields FieldSelector string `json:"fields" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"` + // If true, watch for changes to the selected resources + Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"` + // The desired resource version to watch + ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } // Status is a return value for calls that don't return other objects. diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index df53559266b0f..4a60b0da5bd74 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -1007,6 +1007,10 @@ type ListOptions struct { LabelSelector string `json:"labels" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"` // A selector based on fields FieldSelector string `json:"fields" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"` + // If true, watch for changes to the selected resources + Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"` + // The desired resource version to watch + ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } // Status is a return value for calls that don't return other objects. diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 876a2d9c7d084..56418b9617299 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -1166,6 +1166,10 @@ type ListOptions struct { LabelSelector string `json:"labelSelector" description:"a selector to restrict the list of returned objects by their labels; defaults to everything"` // A selector based on fields FieldSelector string `json:"fieldSelector" description:"a selector to restrict the list of returned objects by their fields; defaults to everything"` + // If true, watch for changes to the selected resources + Watch bool `json:"watch" description:"watch for changes to the described resources and return them as a stream of add, update, and remove notifications; specify resourceVersion"` + // The desired resource version to watch + ResourceVersion string `json:"resourceVersion" description:"when specified with a watch call, shows changes that occur after that particular version of a resource; defaults to changes from the beginning of history"` } // Status is a return value for calls that don't return other objects. diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index a3e7f3cc0063e..89e0132311013 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" "github.com/emicklei/go-restful" ) @@ -59,15 +60,6 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { // Create the WebService. ws = a.newWebService() - // Initialize the custom handlers. - watchHandler := (&WatchHandler{ - storage: a.group.Storage, - mapper: a.group.Mapper, - convertor: a.group.Convertor, - codec: a.group.Codec, - linker: a.group.Linker, - info: a.info, - }) redirectHandler := (&RedirectHandler{a.group.Storage, a.group.Codec, a.group.Context, a.info}) proxyHandler := (&ProxyHandler{a.prefix + "/proxy/", a.group.Storage, a.group.Codec, a.group.Context, a.info}) @@ -80,7 +72,7 @@ func (a *APIInstaller) Install() (ws *restful.WebService, errors []error) { } sort.Strings(paths) for _, path := range paths { - if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, watchHandler, redirectHandler, proxyHandler); err != nil { + if err := a.registerResourceHandlers(path, a.group.Storage[path], ws, redirectHandler, proxyHandler); err != nil { errors = append(errors, err) } } @@ -98,7 +90,7 @@ func (a *APIInstaller) newWebService() *restful.WebService { return ws } -func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, watchHandler, redirectHandler, proxyHandler http.Handler) error { +func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService, redirectHandler, proxyHandler http.Handler) error { admit := a.group.Admit context := a.group.Context @@ -129,17 +121,6 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } versionedObject := indirectArbitraryPointer(versionedPtr) - var versionedList interface{} - if lister, ok := storage.(rest.Lister); ok { - list := lister.NewList() - _, listKind, err := a.group.Typer.ObjectVersionAndKind(list) - versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind) - if err != nil { - return err - } - versionedList = indirectArbitraryPointer(versionedListPtr) - } - mapping, err := a.group.Mapper.RESTMapping(kind, a.group.Version) if err != nil { return err @@ -153,13 +134,24 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag gracefulDeleter, isGracefulDeleter := storage.(rest.GracefulDeleter) updater, isUpdater := storage.(rest.Updater) patcher, isPatcher := storage.(rest.Patcher) - _, isWatcher := storage.(rest.Watcher) + watcher, isWatcher := storage.(rest.Watcher) _, isRedirector := storage.(rest.Redirector) storageMeta, isMetadata := storage.(rest.StorageMetadata) if !isMetadata { storageMeta = defaultStorageMetadata{} } + var versionedList interface{} + if isLister { + list := lister.NewList() + _, listKind, err := a.group.Typer.ObjectVersionAndKind(list) + versionedListPtr, err := a.group.Creater.New(a.group.Version, listKind) + if err != nil { + return err + } + versionedList = indirectArbitraryPointer(versionedListPtr) + } + versionedListOptions, err := a.group.Creater.New(serverVersion, "ListOptions") if err != nil { return err @@ -324,7 +316,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag addParams(route, action.Params) ws.Route(route) case "LIST": // List all resources of a kind. - route := ws.GET(action.Path).To(ListResource(lister, reqScope)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, false)). Filter(m). Doc("list objects of kind " + kind). Operation("list" + kind). @@ -375,22 +367,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } addParams(route, action.Params) ws.Route(route) + // TODO: deprecated case "WATCH": // Watch a resource. - route := ws.GET(action.Path).To(routeFunction(watchHandler)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)). Filter(m). - Doc("watch a particular " + kind). + Doc("watch changes to an object of kind " + kind). Operation("watch" + kind). Produces("application/json"). - Writes(versionedObject) + Writes(watchjson.NewWatchEvent()) + if err := addObjectParams(ws, route, versionedListOptions); err != nil { + return err + } addParams(route, action.Params) ws.Route(route) + // TODO: deprecated case "WATCHLIST": // Watch all resources of a kind. - route := ws.GET(action.Path).To(routeFunction(watchHandler)). + route := ws.GET(action.Path).To(ListResource(lister, watcher, reqScope, true)). Filter(m). - Doc("watch a list of " + kind). + Doc("watch individual changes to a list of " + kind). Operation("watch" + kind + "list"). Produces("application/json"). - Writes(versionedList) + Writes(watchjson.NewWatchEvent()) + if err := addObjectParams(ws, route, versionedListOptions); err != nil { + return err + } addParams(route, action.Params) ws.Route(route) case "REDIRECT": // Get the redirect URL for a resource. diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 62a67a688699a..1bfe62be59433 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -19,6 +19,7 @@ package apiserver import ( "fmt" "net/http" + "net/url" gpath "path" "time" @@ -98,7 +99,7 @@ func GetResource(r rest.Getter, scope RequestScope) restful.RouteFunction { } // ListResource returns a function that handles retrieving a list of resources from a rest.Storage object. -func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction { +func ListResource(r rest.Lister, rw rest.Watcher, scope RequestScope, forceWatch bool) restful.RouteFunction { return func(req *restful.Request, res *restful.Response) { w := res.ResponseWriter @@ -110,22 +111,8 @@ func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction { ctx := scope.ContextFunc(req) ctx = api.WithNamespace(ctx, namespace) - // TODO: extract me into a method - query := req.Request.URL.Query() - versioned, err := scope.Creater.New(scope.ServerAPIVersion, "ListOptions") + out, err := queryToObject(req.Request.URL.Query(), scope, "ListOptions") if err != nil { - // programmer error - errorJSON(err, scope.Codec, w) - return - } - if err := scope.Convertor.Convert(&query, versioned); err != nil { - // bad request - errorJSON(err, scope.Codec, w) - return - } - out, err := scope.Convertor.ConvertToVersion(versioned, "") - if err != nil { - // programmer error errorJSON(err, scope.Codec, w) return } @@ -136,11 +123,22 @@ func ListResource(r rest.Lister, scope RequestScope) restful.RouteFunction { return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value) } if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil { - // invalid field + // TODO: allow bad request to set field causes based on query parameters + err = errors.NewBadRequest(err.Error()) errorJSON(err, scope.Codec, w) return } + if (opts.Watch || forceWatch) && rw != nil { + watcher, err := rw.Watch(ctx, opts.LabelSelector, opts.FieldSelector, opts.ResourceVersion) + if err != nil { + errorJSON(err, scope.Codec, w) + return + } + serveWatch(watcher, scope, w, req) + return + } + result, err := r.List(ctx, opts.LabelSelector, opts.FieldSelector) if err != nil { errorJSON(err, scope.Codec, w) @@ -419,6 +417,27 @@ func DeleteResource(r rest.GracefulDeleter, checkBody bool, scope RequestScope, } } +// queryToObject converts query parameters into a structured internal object by +// kind. The caller must cast the returned object to the matching internal Kind +// to use it. +// TODO: add appropriate structured error responses +func queryToObject(query url.Values, scope RequestScope, kind string) (runtime.Object, error) { + versioned, err := scope.Creater.New(scope.ServerAPIVersion, kind) + if err != nil { + // programmer error + return nil, err + } + if err := scope.Convertor.Convert(&query, versioned); err != nil { + return nil, errors.NewBadRequest(err.Error()) + } + out, err := scope.Convertor.ConvertToVersion(versioned, "") + if err != nil { + // programmer error + return nil, err + } + return out, nil +} + // resultFunc is a function that returns a rest result and can be run in a goroutine type resultFunc func() (runtime.Object, error) diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 49faa2e3cf98d..a95906805a1b2 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -17,155 +17,39 @@ limitations under the License. package apiserver import ( - "fmt" "net/http" - "net/url" - "path" + "reflect" "regexp" "strings" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" + "github.com/emicklei/go-restful" "github.com/golang/glog" "golang.org/x/net/websocket" ) -// TODO: convert me to resthandler custom verb -type WatchHandler struct { - storage map[string]rest.Storage - mapper meta.RESTMapper - convertor runtime.ObjectConvertor - codec runtime.Codec - linker runtime.SelfLinker - info *APIRequestInfoResolver -} - -// setSelfLinkAddName sets the self link, appending the object's name to the canonical path & type. -func (h *WatchHandler) setSelfLinkAddName(obj runtime.Object, req *http.Request) error { - name, err := h.linker.Name(obj) - if err != nil { - return err - } - newURL := *req.URL - newURL.Path = path.Join(req.URL.Path, name) - newURL.RawQuery = "" - newURL.Fragment = "" - return h.linker.SetSelfLink(obj, newURL.String()) -} - var connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") func isWebsocketRequest(req *http.Request) bool { return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket" } -// ServeHTTP processes watch requests. -func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - var verb string - var apiResource string - var httpCode int - reqStart := time.Now() - defer monitor("watch", &verb, &apiResource, &httpCode, reqStart) - - if req.Method != "GET" { - httpCode = errorJSON(errors.NewBadRequest( - fmt.Sprintf("unsupported method for watch: %s", req.Method)), h.codec, w) - return - } - - requestInfo, err := h.info.GetAPIRequestInfo(req) - if err != nil { - httpCode = errorJSON(errors.NewBadRequest( - fmt.Sprintf("failed to find api request info: %s", err.Error())), h.codec, w) - return - } - verb = requestInfo.Verb - ctx := api.WithNamespace(api.NewContext(), requestInfo.Namespace) - - storage := h.storage[requestInfo.Resource] - if storage == nil { - httpCode = errorJSON(errors.NewNotFound(requestInfo.Resource, "Resource"), h.codec, w) - return - } - apiResource = requestInfo.Resource - watcher, ok := storage.(rest.Watcher) - if !ok { - httpCode = errorJSON(errors.NewMethodNotSupported(requestInfo.Resource, "watch"), h.codec, w) - return - } - kind := requestInfo.Kind - if len(kind) == 0 { - if _, kind, err = h.mapper.VersionAndKindForResource(apiResource); err != nil { - glog.Errorf("No kind found for %s: %v", apiResource, err) - } - } - - scope := RequestScope{ - Convertor: h.convertor, - Kind: kind, - Resource: apiResource, - APIVersion: requestInfo.APIVersion, - // TODO: this must be parameterized per version, and is incorrect for implementors - // outside of Kubernetes. Fix by refactoring watch under resthandler as a custome - // resource. - ServerAPIVersion: requestInfo.APIVersion, - } - label, field, err := parseSelectorQueryParams(req.URL.Query(), scope) - if err != nil { - httpCode = errorJSON(err, h.codec, w) - return - } - - resourceVersion := req.URL.Query().Get("resourceVersion") - watching, err := watcher.Watch(ctx, label, field, resourceVersion) - if err != nil { - httpCode = errorJSON(err, h.codec, w) - return - } - httpCode = http.StatusOK - - // TODO: This is one watch per connection. We want to multiplex, so that - // multiple watches of the same thing don't create two watches downstream. - watchServer := &WatchServer{watching, h.codec, func(obj runtime.Object) { - if err := h.setSelfLinkAddName(obj, req); err != nil { - glog.Errorf("Failed to set self link for object %#v", obj) +// serveWatch handles serving requests to the server +func serveWatch(watcher watch.Interface, scope RequestScope, w http.ResponseWriter, req *restful.Request) { + watchServer := &WatchServer{watcher, scope.Codec, func(obj runtime.Object) { + if err := setSelfLink(obj, req, scope.Namer); err != nil { + glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) } }} - if isWebsocketRequest(req) { - websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req) + if isWebsocketRequest(req.Request) { + websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(w), req.Request) } else { - watchServer.ServeHTTP(w, req) - } -} - -// TODO: remove when watcher is refactored to fit under api_installer -func parseSelectorQueryParams(query url.Values, scope RequestScope) (label labels.Selector, field fields.Selector, err error) { - labelString := query.Get(api.LabelSelectorQueryParam(scope.ServerAPIVersion)) - label, err = labels.Parse(labelString) - if err != nil { - return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'labels' selector parameter (%s) could not be parsed: %v", labelString, err)) - } - - fn := func(label, value string) (newLabel, newValue string, err error) { - return scope.Convertor.ConvertFieldLabel(scope.APIVersion, scope.Kind, label, value) - } - fieldString := query.Get(api.FieldSelectorQueryParam(scope.ServerAPIVersion)) - field, err = fields.ParseAndTransformSelector(fieldString, fn) - if err != nil { - return nil, nil, errors.NewBadRequest(fmt.Sprintf("The 'fields' selector parameter (%s) could not be parsed: %v", fieldString, err)) + watchServer.ServeHTTP(w, req.Request) } - glog.Infof("Found %#v %#v from %v in scope %#v", label, field, query, scope) - return label, field, nil } // WatchServer serves a watch.Interface over a websocket or vanilla HTTP. diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index a394f6e5765cb..40368d58fc2a8 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -194,13 +194,13 @@ func TestWatchParamParsing(t *testing.T) { fieldSelector: "", namespace: api.NamespaceAll, }, { - rawQuery: "namespace=default&resourceVersion=314159&" + api.FieldSelectorQueryParam(testVersion) + "=Host%3D&" + api.LabelSelectorQueryParam(testVersion) + "=name%3Dfoo", + rawQuery: "namespace=default&resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo", resourceVersion: "314159", labelSelector: "name=foo", fieldSelector: "Host=", namespace: api.NamespaceDefault, }, { - rawQuery: "namespace=watchother&" + api.FieldSelectorQueryParam(testVersion) + "=id%3dfoo&resourceVersion=1492", + rawQuery: "namespace=watchother&fields=id%3dfoo&resourceVersion=1492", resourceVersion: "1492", labelSelector: "", fieldSelector: "id=foo", diff --git a/pkg/runtime/conversion.go b/pkg/runtime/conversion.go index b47f9e6bbe7c8..21df85acba109 100644 --- a/pkg/runtime/conversion.go +++ b/pkg/runtime/conversion.go @@ -40,6 +40,7 @@ func JSONKeyMapper(key string, sourceTag, destTag reflect.StructTag) (string, st var DefaultStringConversions = []interface{}{ convertStringSliceToString, convertStringSliceToInt, + convertStringSliceToBool, convertStringSliceToInt64, } @@ -64,6 +65,19 @@ func convertStringSliceToInt(input *[]string, out *int, s conversion.Scope) erro return nil } +func convertStringSliceToBool(input *[]string, out *bool, s conversion.Scope) error { + if len(*input) == 0 { + *out = false + } + switch strings.ToLower((*input)[0]) { + case "true", "1": + *out = true + default: + *out = true + } + return nil +} + func convertStringSliceToInt64(input *[]string, out *int64, s conversion.Scope) error { if len(*input) == 0 { *out = 0 diff --git a/pkg/watch/json/types.go b/pkg/watch/json/types.go index 6688e0fbba233..66c297a029a21 100644 --- a/pkg/watch/json/types.go +++ b/pkg/watch/json/types.go @@ -37,6 +37,11 @@ type watchEvent struct { Object runtime.RawExtension `json:"object,omitempty"` } +// NewWatchEvent returns the serialization form of watchEvent for structured schemas +func NewWatchEvent() interface{} { + return &watchEvent{} +} + // Object converts a watch.Event into an appropriately serializable JSON object func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) { obj, ok := event.Object.(runtime.Object)