Skip to content

Commit

Permalink
Merge pull request kubernetes#5851 from smarterclayton/support_input_…
Browse files Browse the repository at this point in the history
…streams

Support input streams being returned by the APIserver
  • Loading branch information
derekwaynecarr committed Mar 25, 2015
2 parents f584069 + bfb6b05 commit cfb6f11
Show file tree
Hide file tree
Showing 17 changed files with 722 additions and 137 deletions.
7 changes: 7 additions & 0 deletions pkg/api/latest/latest.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta3"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

// Version is the string that represents the current external default version.
Expand Down Expand Up @@ -122,9 +123,15 @@ func init() {
"Namespace": true,
}

// these kinds should be excluded from the list of resources
ignoredKinds := util.NewStringSet("ListOptions", "DeleteOptions", "Status", "ContainerManifest")

// enumerate all supported versions, get the kinds, and register with the mapper how to address our resources
for _, version := range versions {
for kind := range api.Scheme.KnownTypes(version) {
if ignoredKinds.Has(kind) {
continue
}
mixedCase, found := versionMixedCase[version]
if !found {
mixedCase = false
Expand Down
19 changes: 19 additions & 0 deletions pkg/api/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package rest

import (
"io"
"net/http"
"net/url"

Expand Down Expand Up @@ -148,3 +149,21 @@ type Redirector interface {
// ResourceLocation should return the remote location of the given resource, and an optional transport to use to request it, or an error.
ResourceLocation(ctx api.Context, id string) (remoteLocation *url.URL, transport http.RoundTripper, err error)
}

// ResourceStreamer is an interface implemented by objects that prefer to be streamed from the server
// instead of decoded directly.
type ResourceStreamer interface {
// InputStream should return an io.Reader if the provided object supports streaming. The desired
// api version and a accept header (may be empty) are passed to the call. If no error occurs,
// the caller may return a content type string with the reader that indicates the type of the
// stream.
InputStream(apiVersion, acceptHeader string) (io.ReadCloser, string, error)
}

// StorageMetadata is an optional interface that callers can implement to provide additional
// information about their Storage objects.
type StorageMetadata interface {
// ProducesMIMETypes returns a list of the MIME types the specified HTTP verb (GET, POST, DELETE,
// PATCH) can respond with.
ProducesMIMETypes(verb string) []string
}
44 changes: 37 additions & 7 deletions pkg/apiserver/api_installer.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
patcher, isPatcher := storage.(rest.Patcher)
_, isWatcher := storage.(rest.Watcher)
_, isRedirector := storage.(rest.Redirector)
storageMeta, isMetadata := storage.(rest.StorageMetadata)
if !isMetadata {
storageMeta = defaultStorageMetadata{}
}

var versionedDeleterObject runtime.Object
switch {
Expand Down Expand Up @@ -283,56 +287,70 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
//
// test/integration/auth_test.go is currently the most comprehensive status code test

reqScope := RequestScope{
ContextFunc: ctxFn,
Codec: mapping.Codec,
APIVersion: a.group.Version,
Resource: resource,
Kind: kind,
}
for _, action := range actions {
reqScope.Namer = action.Namer
m := monitorFilter(action.Verb, resource)
switch action.Verb {
case "GET": // Get a resource.
route := ws.GET(action.Path).To(GetResource(getter, ctxFn, action.Namer, mapping.Codec)).
route := ws.GET(action.Path).To(GetResource(getter, reqScope)).
Filter(m).
Doc("read the specified " + kind).
Operation("read" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Writes(versionedObject)
addParams(route, action.Params)
ws.Route(route)
case "LIST": // List all resources of a kind.
route := ws.GET(action.Path).To(ListResource(lister, ctxFn, action.Namer, mapping.Codec, a.group.Version, resource)).
route := ws.GET(action.Path).To(ListResource(lister, reqScope)).
Filter(m).
Doc("list objects of kind " + kind).
Operation("list" + kind).
Produces("application/json").
Writes(versionedList)
addParams(route, action.Params)
ws.Route(route)
case "PUT": // Update a resource.
route := ws.PUT(action.Path).To(UpdateResource(updater, ctxFn, action.Namer, mapping.Codec, a.group.Typer, resource, admit)).
route := ws.PUT(action.Path).To(UpdateResource(updater, reqScope, a.group.Typer, admit)).
Filter(m).
Doc("replace the specified " + kind).
Operation("replace" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Reads(versionedObject)
addParams(route, action.Params)
ws.Route(route)
case "PATCH": // Partially update a resource
route := ws.PATCH(action.Path).To(PatchResource(patcher, ctxFn, action.Namer, mapping.Codec, a.group.Typer, resource, admit)).
route := ws.PATCH(action.Path).To(PatchResource(patcher, reqScope, a.group.Typer, admit)).
Filter(m).
Doc("partially update the specified " + kind).
// TODO: toggle patch strategy by content type
// Consumes("application/merge-patch+json", "application/json-patch+json").
Operation("patch" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Reads(versionedObject)
addParams(route, action.Params)
ws.Route(route)
case "POST": // Create a resource.
route := ws.POST(action.Path).To(CreateResource(creater, ctxFn, action.Namer, mapping.Codec, a.group.Typer, resource, admit)).
route := ws.POST(action.Path).To(CreateResource(creater, reqScope, a.group.Typer, admit)).
Filter(m).
Doc("create a " + kind).
Operation("create" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...).
Reads(versionedObject)
addParams(route, action.Params)
ws.Route(route)
case "DELETE": // Delete a resource.
route := ws.DELETE(action.Path).To(DeleteResource(gracefulDeleter, isGracefulDeleter, ctxFn, action.Namer, mapping.Codec, resource, kind, admit)).
route := ws.DELETE(action.Path).To(DeleteResource(gracefulDeleter, isGracefulDeleter, reqScope, admit)).
Filter(m).
Doc("delete a " + kind).
Operation("delete" + kind)
Operation("delete" + kind).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), "application/json")...)
if isGracefulDeleter {
route.Reads(versionedDeleterObject)
}
Expand All @@ -343,6 +361,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Filter(m).
Doc("watch a particular " + kind).
Operation("watch" + kind).
Produces("application/json").
Writes(versionedObject)
addParams(route, action.Params)
ws.Route(route)
Expand All @@ -351,6 +370,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Filter(m).
Doc("watch a list of " + kind).
Operation("watch" + kind + "list").
Produces("application/json").
Writes(versionedList)
addParams(route, action.Params)
ws.Route(route)
Expand Down Expand Up @@ -630,3 +650,13 @@ func addParams(route *restful.RouteBuilder, params []*restful.Parameter) {
route.Param(param)
}
}

// defaultStorageMetadata provides default answers to rest.StorageMetadata.
type defaultStorageMetadata struct{}

// defaultStorageMetadata implements rest.StorageMetadata
var _ rest.StorageMetadata = defaultStorageMetadata{}

func (defaultStorageMetadata) ProducesMIMETypes(verb string) []string {
return nil
}
21 changes: 21 additions & 0 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"path"
Expand Down Expand Up @@ -196,6 +197,26 @@ func APIVersionHandler(versions ...string) restful.RouteFunction {
}
}

// write renders a returned runtime.Object to the response as a stream or an encoded object.
func write(statusCode int, apiVersion string, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, req *http.Request) {
if stream, ok := object.(rest.ResourceStreamer); ok {
out, contentType, err := stream.InputStream(apiVersion, req.Header.Get("Accept"))
if err != nil {
errorJSONFatal(err, codec, w)
return
}
defer out.Close()
if len(contentType) == 0 {
contentType = "application/octet-stream"
}
w.Header().Set("Content-Type", contentType)
w.WriteHeader(statusCode)
io.Copy(w, out)
return
}
writeJSON(statusCode, codec, object, w)
}

// writeJSON renders an object as JSON to the response.
func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter) {
output, err := codec.Encode(object)
Expand Down
93 changes: 91 additions & 2 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -122,7 +123,8 @@ func init() {
// defaultAPIServer exposes nested objects for testability.
type defaultAPIServer struct {
http.Handler
group *APIGroupVersion
group *APIGroupVersion
container *restful.Container
}

// uses the default settings
Expand Down Expand Up @@ -169,7 +171,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
ws := new(restful.WebService)
InstallSupport(mux, ws)
container.Add(ws)
return &defaultAPIServer{mux, group}
return &defaultAPIServer{mux, group, container}
}

type Simple struct {
Expand Down Expand Up @@ -212,6 +214,8 @@ type SimpleRESTStorage struct {
updated *Simple
created *Simple

stream *SimpleStream

deleted string
deleteOptions *api.DeleteOptions

Expand Down Expand Up @@ -243,8 +247,34 @@ func (storage *SimpleRESTStorage) List(ctx api.Context, label labels.Selector, f
return result, storage.errors["list"]
}

type SimpleStream struct {
version string
accept string
contentType string
err error

io.Reader
closed bool
}

func (s *SimpleStream) Close() error {
s.closed = true
return nil
}

func (s *SimpleStream) IsAnAPIObject() {}

func (s *SimpleStream) InputStream(version, accept string) (io.ReadCloser, string, error) {
s.version = version
s.accept = accept
return s, s.contentType, s.err
}

func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Object, error) {
storage.checkContext(ctx)
if id == "binary" {
return storage.stream, storage.errors["get"]
}
return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"]
}

Expand Down Expand Up @@ -343,6 +373,15 @@ func (storage LegacyRESTStorage) Delete(ctx api.Context, id string) (runtime.Obj
return storage.SimpleRESTStorage.Delete(ctx, id, nil)
}

type MetadataRESTStorage struct {
*SimpleRESTStorage
types []string
}

func (m *MetadataRESTStorage) ProducesMIMETypes(method string) []string {
return m.types
}

func extractBody(response *http.Response, object runtime.Object) (string, error) {
defer response.Body.Close()
body, err := ioutil.ReadAll(response.Body)
Expand Down Expand Up @@ -646,6 +685,26 @@ func TestSelfLinkSkipsEmptyName(t *testing.T) {
}
}

func TestMetadata(t *testing.T) {
simpleStorage := &MetadataRESTStorage{&SimpleRESTStorage{}, []string{"text/plain"}}
h := handle(map[string]rest.Storage{"simple": simpleStorage})
ws := h.(*defaultAPIServer).container.RegisteredWebServices()
if len(ws) == 0 {
t.Fatal("no web services registered")
}
matches := map[string]int{}
for _, w := range ws {
for _, r := range w.Routes() {
s := strings.Join(r.Produces, ",")
i := matches[s]
matches[s] = i + 1
}
}
if matches["text/plain,application/json"] == 0 || matches["application/json"] == 0 || matches["*/*"] == 0 || len(matches) != 3 {
t.Errorf("unexpected mime types: %v", matches)
}
}

func TestGet(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
Expand Down Expand Up @@ -685,6 +744,36 @@ func TestGet(t *testing.T) {
}
}

func TestGetBinary(t *testing.T) {
simpleStorage := SimpleRESTStorage{
stream: &SimpleStream{
contentType: "text/plain",
Reader: bytes.NewBufferString("response data"),
},
}
stream := simpleStorage.stream
server := httptest.NewServer(handle(map[string]rest.Storage{"simple": &simpleStorage}))
defer server.Close()

req, _ := http.NewRequest("GET", server.URL+"/api/version/simple/binary", nil)
req.Header.Add("Accept", "text/other, */*")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response: %#v", resp)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !stream.closed || stream.version != "version" || stream.accept != "text/other, */*" ||
resp.Header.Get("Content-Type") != stream.contentType || string(body) != "response data" {
t.Errorf("unexpected stream: %#v", stream)
}
}

func TestGetAlternateSelfLink(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
Expand Down
Loading

0 comments on commit cfb6f11

Please sign in to comment.