Skip to content

Commit

Permalink
Merge pull request kubernetes#2047 from smarterclayton/make_request_t…
Browse files Browse the repository at this point in the history
…estable

Make client.Request/RESTClient more testable and fakeable
  • Loading branch information
smarterclayton committed Oct 29, 2014
2 parents 39ee257 + 71fecef commit 2c10dd8
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 173 deletions.
3 changes: 1 addition & 2 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func startComponents(manifestURL string) (apiServerURL string) {
}

cl := client.NewOrDie(&client.Config{Host: apiServer.URL, Version: testapi.Version()})
cl.PollPeriod = time.Second * 1
cl.PollPeriod = time.Millisecond * 100
cl.Sync = true

helper, err := master.NewEtcdHelper(etcdClient, "")
Expand Down Expand Up @@ -312,7 +312,6 @@ func runAtomicPutTest(c *client.Client) {
err := c.Get().
Path("services").
Path(svc.Name).
PollPeriod(100 * time.Millisecond).
Do().
Into(&tmpSvc)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ type Status struct {
type StatusDetails struct {
// The ID attribute of the resource associated with the status StatusReason
// (when there is a single ID which can be described).
// TODO: replace with Name with v1beta3
ID string `json:"id,omitempty" yaml:"id,omitempty"`
// The kind attribute of the resource associated with the status StatusReason.
// On some operations may differ from the requested resource Kind.
Expand Down
76 changes: 76 additions & 0 deletions pkg/client/flags_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"testing"

"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

type fakeFlagSet struct {
t *testing.T
set util.StringSet
}

func (f *fakeFlagSet) StringVar(p *string, name, value, usage string) {
if p == nil {
f.t.Errorf("unexpected nil pointer")
}
if usage == "" {
f.t.Errorf("unexpected empty usage")
}
f.set.Insert(name)
}

func (f *fakeFlagSet) BoolVar(p *bool, name string, value bool, usage string) {
if p == nil {
f.t.Errorf("unexpected nil pointer")
}
if usage == "" {
f.t.Errorf("unexpected empty usage")
}
f.set.Insert(name)
}

func (f *fakeFlagSet) UintVar(p *uint, name string, value uint, usage string) {
if p == nil {
f.t.Errorf("unexpected nil pointer")
}
if usage == "" {
f.t.Errorf("unexpected empty usage")
}
f.set.Insert(name)
}

func TestBindClientConfigFlags(t *testing.T) {
flags := &fakeFlagSet{t, util.StringSet{}}
config := &Config{}
BindClientConfigFlags(flags, config)
if len(flags.set) != 6 {
t.Errorf("unexpected flag set: %#v", flags)
}
}

func TestBindKubeletClientConfigFlags(t *testing.T) {
flags := &fakeFlagSet{t, util.StringSet{}}
config := &KubeletConfig{}
BindKubeletClientConfigFlags(flags, config)
if len(flags.set) != 5 {
t.Errorf("unexpected flag set: %#v", flags)
}
}
193 changes: 145 additions & 48 deletions pkg/client/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,65 @@ import (
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
"github.com/golang/glog"
)

// specialParams lists parameters that are handled specially and which users of Request
// are therefore not allowed to set manually.
var specialParams = util.NewStringSet("sync", "timeout")

// PollFunc is called when a server operation returns 202 accepted. The name of the
// operation is extracted from the response and passed to this function. Return a
// request to retrieve the result of the operation, or false for the second argument
// if polling should end.
type PollFunc func(name string) (*Request, bool)

// HTTPClient is an interface for testing a request object.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}

// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
c *RESTClient
err error
verb string
path string
body io.Reader
params map[string]string
selector labels.Selector
timeout time.Duration
sync bool
pollPeriod time.Duration
// required
client HTTPClient
verb string
baseURL *url.URL
codec runtime.Codec

// optional, will be invoked if the server returns a 202 to decide
// whether to poll.
poller PollFunc

// accessible via method setters
path string
params map[string]string
selector labels.Selector
sync bool
timeout time.Duration

// output
err error
body io.Reader
}

// NewRequest creates a new request with the core attributes.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, codec runtime.Codec) *Request {
return &Request{
client: client,
verb: verb,
baseURL: baseURL,
codec: codec,

path: baseURL.Path,
}
}

// Path appends an item to the request path. You must call Path at least once.
Expand All @@ -76,6 +109,9 @@ func (r *Request) Sync(sync bool) *Request {

// Namespace applies the namespace scope to a request
func (r *Request) Namespace(namespace string) *Request {
if r.err != nil {
return r
}
if len(namespace) > 0 {
return r.setParam("namespace", namespace)
}
Expand Down Expand Up @@ -135,6 +171,9 @@ func (r *Request) setParam(paramName, value string) *Request {
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
return r
}
if r.params == nil {
r.params = make(map[string]string)
}
r.params[paramName] = value
return r
}
Expand Down Expand Up @@ -172,7 +211,7 @@ func (r *Request) Body(obj interface{}) *Request {
case io.Reader:
r.body = t
case runtime.Object:
data, err := r.c.Codec.Encode(t)
data, err := r.codec.Encode(t)
if err != nil {
r.err = err
return r
Expand All @@ -184,21 +223,24 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}

// PollPeriod sets the poll period.
// If the server sends back a "working" status message, then repeatedly poll the server
// to see if the operation has completed yet, waiting 'd' between each poll.
// If you want to handle the "working" status yourself (it'll be delivered as StatusErr),
// set d to 0 to turn off this behavior.
func (r *Request) PollPeriod(d time.Duration) *Request {
// NoPoll indicates a server "working" response should be returned as an error
func (r *Request) NoPoll() *Request {
return r.Poller(nil)
}

// Poller indicates this request should use the specified poll function to determine whether
// a server "working" response should be retried. The poller is responsible for waiting or
// outputting messages to the client.
func (r *Request) Poller(poller PollFunc) *Request {
if r.err != nil {
return r
}
r.pollPeriod = d
r.poller = poller
return r
}

func (r *Request) finalURL() string {
finalURL := *r.c.baseURL
finalURL := *r.baseURL
finalURL.Path = r.path
query := url.Values{}
for key, value := range r.params {
Expand Down Expand Up @@ -227,18 +269,18 @@ func (r *Request) Watch() (watch.Interface, error) {
if err != nil {
return nil, err
}
client := r.c.Client
client := r.client
if client == nil {
client = http.DefaultClient
}
response, err := client.Do(req)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", resp.StatusCode)
}
return watch.NewStreamWatcher(watchjson.NewDecoder(response.Body, r.c.Codec)), nil
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.codec)), nil
}

// Stream formats and executes the request, and offers streaming of the response.
Expand All @@ -251,49 +293,104 @@ func (r *Request) Stream() (io.ReadCloser, error) {
if err != nil {
return nil, err
}
client := r.c.Client
client := r.client
if client == nil {
client = http.DefaultClient
}
response, err := client.Do(req)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return response.Body, nil
return resp.Body, nil
}

// Do formats and executes the request. Returns the API object received, or an error.
// Do formats and executes the request. Returns a Result object for easy response
// processing. Handles polling the server in the event a continuation was sent.
func (r *Request) Do() Result {
client := r.client
if client == nil {
client = http.DefaultClient
}

for {
if r.err != nil {
return Result{err: r.err}
}

req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return Result{err: err}
}
respBody, created, err := r.c.doRequest(req)

resp, err := client.Do(req)
if err != nil {
if s, ok := err.(APIStatus); ok {
status := s.Status()
if status.Status == api.StatusWorking && r.pollPeriod != 0 {
if status.Details != nil {
id := status.Details.ID
if len(id) > 0 {
glog.Infof("Waiting for completion of /operations/%s", id)
time.Sleep(r.pollPeriod)
// Make a poll request
pollOp := r.c.PollFor(id).PollPeriod(r.pollPeriod)
// Could also say "return r.Do()" but this way doesn't grow the callstack.
r = pollOp
continue
}
}
}
}
return Result{err: err}
}

respBody, created, err := r.transformResponse(resp, req)
if poll, ok := r.shouldPoll(err); ok {
r = poll
continue
}
return Result{respBody, created, err, r.c.Codec}

return Result{respBody, created, err, r.codec}
}
}

// shouldPoll checks the server error for an incomplete operation
// and if found returns a request that would check the response.
// If no polling is necessary or possible, it will return false.
func (r *Request) shouldPoll(err error) (*Request, bool) {
if err == nil || r.poller == nil {
return nil, false
}
apistatus, ok := err.(APIStatus)
if !ok {
return nil, false
}
status := apistatus.Status()
if status.Status != api.StatusWorking {
return nil, false
}
if status.Details == nil || len(status.Details.ID) == 0 {
return nil, false
}
return r.poller(status.Details.ID)
}

// transformResponse converts an API response into a structured API object.
func (r *Request) transformResponse(resp *http.Response, req *http.Request) ([]byte, bool, error) {
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, false, err
}

// Did the server give us a status response?
isStatusResponse := false
var status api.Status
if err := r.codec.DecodeInto(body, &status); err == nil && status.Status != "" {
isStatusResponse = true
}

switch {
case resp.StatusCode < http.StatusOK || resp.StatusCode > http.StatusPartialContent:
if !isStatusResponse {
return nil, false, fmt.Errorf("request [%#v] failed (%d) %s: %s", req, resp.StatusCode, resp.Status, string(body))
}
return nil, false, errors.FromObject(&status)
}

// If the server gave us a status back, look at what it was.
if isStatusResponse && status.Status != api.StatusSuccess {
// "Working" requests need to be handled specially.
// "Failed" requests are clearly just an error and it makes sense to return them as such.
return nil, false, errors.FromObject(&status)
}

created := resp.StatusCode == http.StatusCreated
return body, created, err
}

// Result contains the result of calling Request.Do().
Expand Down
Loading

0 comments on commit 2c10dd8

Please sign in to comment.