Skip to content

Commit

Permalink
Remove asynchronous channel on RESTStorage interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
smarterclayton committed Feb 11, 2015
1 parent d167c11 commit 79cb930
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 134 deletions.
20 changes: 8 additions & 12 deletions pkg/api/rest/resttest/resttest.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ func (t *Tester) TestCreateResetsUserData(valid runtime.Object) {
objectMeta.UID = "bad-uid"
objectMeta.CreationTimestamp = now

channel, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
obj, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if obj := <-channel; obj.Object == nil {
t.Fatalf("Unexpected object from channel: %#v", obj)
if obj == nil {
t.Fatalf("Unexpected object from result: %#v", obj)
}
if objectMeta.UID == "bad-uid" || objectMeta.CreationTimestamp == now {
t.Errorf("ObjectMeta did not reset basic fields: %#v", objectMeta)
Expand All @@ -111,12 +111,12 @@ func (t *Tester) TestCreateHasMetadata(valid runtime.Object) {
context = api.NewContext()
}

channel, err := t.storage.(apiserver.RESTCreater).Create(context, valid)
obj, err := t.storage.(apiserver.RESTCreater).Create(context, valid)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if obj := <-channel; obj.Object == nil {
t.Fatalf("Unexpected object from channel: %#v", obj)
if obj == nil {
t.Fatalf("Unexpected object from result: %#v", obj)
}
if !api.HasObjectMetaSystemFieldValues(objectMeta) {
t.Errorf("storage did not populate object meta field values")
Expand Down Expand Up @@ -148,12 +148,8 @@ func (t *Tester) TestCreateGeneratesNameReturnsTryAgain(valid runtime.Object) {

objectMeta.GenerateName = "test-"
t.withStorageError(errors.NewAlreadyExists("kind", "thing"), func() {
ch, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
res := <-ch
if err := errors.FromObject(res.Object); err == nil || !errors.IsTryAgainLater(err) {
_, err := t.storage.(apiserver.RESTCreater).Create(api.NewDefaultContext(), valid)
if err == nil || !errors.IsTryAgainLater(err) {
t.Fatalf("Unexpected error: %v", err)
}
})
Expand Down
44 changes: 21 additions & 23 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,17 +180,17 @@ func (storage *SimpleRESTStorage) Get(ctx api.Context, id string) (runtime.Objec
return api.Scheme.CopyOrDie(&storage.item), storage.errors["get"]
}

func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (<-chan RESTResult, error) {
func (storage *SimpleRESTStorage) Delete(ctx api.Context, id string) (runtime.Object, error) {
storage.deleted = id
if err := storage.errors["delete"]; err != nil {
return nil, err
}
return MakeAsync(func() (runtime.Object, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}})
}
return &api.Status{Status: api.StatusSuccess}, nil
}), nil
var obj runtime.Object = &api.Status{Status: api.StatusSuccess}
var err error
if storage.injectedFunction != nil {
obj, err = storage.injectedFunction(&Simple{ObjectMeta: api.ObjectMeta{Name: id}})
}
return obj, err
}

func (storage *SimpleRESTStorage) New() runtime.Object {
Expand All @@ -201,30 +201,28 @@ func (storage *SimpleRESTStorage) NewList() runtime.Object {
return &SimpleList{}
}

func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
func (storage *SimpleRESTStorage) Create(ctx api.Context, obj runtime.Object) (runtime.Object, error) {
storage.created = obj.(*Simple)
if err := storage.errors["create"]; err != nil {
return nil, err
}
return MakeAsync(func() (runtime.Object, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(obj)
}
return obj, nil
}), nil
var err error
if storage.injectedFunction != nil {
obj, err = storage.injectedFunction(obj)
}
return obj, err
}

func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error) {
func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error) {
storage.updated = obj.(*Simple)
if err := storage.errors["update"]; err != nil {
return nil, err
return nil, false, err
}
return MakeAsync(func() (runtime.Object, error) {
if storage.injectedFunction != nil {
return storage.injectedFunction(obj)
}
return obj, nil
}), nil
var err error
if storage.injectedFunction != nil {
obj, err = storage.injectedFunction(obj)
}
return obj, false, err
}

// Implement ResourceWatcher.
Expand Down Expand Up @@ -994,7 +992,7 @@ func TestCreate(t *testing.T) {
if !reflect.DeepEqual(&itemOut, simple) {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simple, string(body))
}
if response.StatusCode != http.StatusOK {
if response.StatusCode != http.StatusCreated {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusOK, response)
}
if !selfLinker.called {
Expand Down
25 changes: 0 additions & 25 deletions pkg/apiserver/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,3 @@ func MakeAsync(fn WorkFunc) <-chan RESTResult {
}()
return channel
}

// WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods.
type WorkResultFunc func() (result RESTResult, err error)

// MakeAsync takes a function and executes it, delivering the result in the way required
// by RESTStorage's Update, Delete, and Create methods.
func MakeAsyncResult(fn WorkResultFunc) <-chan RESTResult {
channel := make(chan RESTResult)
go func() {
defer util.HandleCrash()
obj, err := fn()
if err != nil {
channel <- RESTResult{Object: errToAPIStatus(err)}
} else {
channel <- obj
}
// 'close' is used to signal that no further values will
// be written to the channel. Not strictly necessary, but
// also won't hurt.
close(channel)
}()
return channel
}
13 changes: 7 additions & 6 deletions pkg/apiserver/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ type RESTDeleter interface {
// Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Delete(ctx api.Context, id string) (<-chan RESTResult, error)
// Delete *may* return the object that was deleted, or a status object indicating additional
// information about deletion.
Delete(ctx api.Context, id string) (runtime.Object, error)
}

type RESTCreater interface {
Expand All @@ -61,7 +63,7 @@ type RESTCreater interface {
New() runtime.Object

// Create creates a new version of a resource.
Create(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
Create(ctx api.Context, obj runtime.Object) (runtime.Object, error)
}

type RESTUpdater interface {
Expand All @@ -70,10 +72,9 @@ type RESTUpdater interface {
New() runtime.Object

// Update finds a resource in the storage and updates it. Some implementations
// may allow updates creates the object - they should set the Created flag of
// the returned RESTResult to true. In the event of an asynchronous error returned
// via an api.Status object, the Created flag is ignored.
Update(ctx api.Context, obj runtime.Object) (<-chan RESTResult, error)
// may allow updates creates the object - they should set the created boolean
// to true.
Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool, error)
}

// RESTResult indicates the result of a REST transformation.
Expand Down
94 changes: 48 additions & 46 deletions pkg/apiserver/resthandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package apiserver

import (
"fmt"
"net/http"
"time"

Expand Down Expand Up @@ -145,29 +144,20 @@ func CreateResource(r RESTCreater, namespaceFn ResourceNamespaceFunc, linkFn Lin
return
}

out, err := r.Create(ctx, obj)
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Create(ctx, obj)
})
if err != nil {
errorJSON(err, codec, w)
return
}

result, err := finishRequest(out, timeout, codec)
if err != nil {
if err := linkFn(req, result); err != nil {
errorJSON(err, codec, w)
return
}

item := result.Object
if err := linkFn(req, item); err != nil {
errorJSON(err, codec, w)
return
}

status := http.StatusOK
if result.Created {
status = http.StatusCreated
}
writeJSON(status, codec, item, w)
writeJSON(http.StatusCreated, codec, result, w)
}
}

Expand Down Expand Up @@ -223,29 +213,27 @@ func UpdateResource(r RESTUpdater, nameFn ResourceNameFunc, objNameFunc ObjectNa
return
}

out, err := r.Update(ctx, obj)
if err != nil {
errorJSON(err, codec, w)
return
}

result, err := finishRequest(out, timeout, codec)
wasCreated := false
result, err := finishRequest(timeout, func() (runtime.Object, error) {
obj, created, err := r.Update(ctx, obj)
wasCreated = created
return obj, err
})
if err != nil {
errorJSON(err, codec, w)
return
}

item := result.Object
if err := linkFn(req, item); err != nil {
if err := linkFn(req, result); err != nil {
errorJSON(err, codec, w)
return
}

status := http.StatusOK
if result.Created {
if wasCreated {
status = http.StatusCreated
}
writeJSON(status, codec, item, w)
writeJSON(status, codec, result, w)
}
}

Expand Down Expand Up @@ -273,48 +261,62 @@ func DeleteResource(r RESTDeleter, nameFn ResourceNameFunc, linkFn LinkResourceF
return
}

out, err := r.Delete(ctx, name)
if err != nil {
errorJSON(err, codec, w)
return
}

result, err := finishRequest(out, timeout, codec)
result, err := finishRequest(timeout, func() (runtime.Object, error) {
return r.Delete(ctx, name)
})
if err != nil {
errorJSON(err, codec, w)
return
}

// if the RESTDeleter returns a nil object, fill out a status. Callers may return a valid
// object with the response.
item := result.Object
if item == nil {
item = &api.Status{
if result == nil {
result = &api.Status{
Status: api.StatusSuccess,
Code: http.StatusOK,
Details: &api.StatusDetails{
ID: name,
Kind: kind,
},
}
} else {
// when a non-status response is returned, set the self link
if _, ok := result.(*api.Status); !ok {
if err := linkFn(req, result); err != nil {
errorJSON(err, codec, w)
return
}
}
}
writeJSON(http.StatusOK, codec, item, w)
writeJSON(http.StatusOK, codec, result, w)
}
}

// finishRequest waits for the result channel to close or clear, and writes the appropriate response.
// resultFunc is a function that returns a rest result and can be run in a goroutine
type resultFunc func() (runtime.Object, error)

// finishRequest makes a given resultFunc asynchronous and handles errors returned by the response.
// Any api.Status object returned is considered an "error", which interrupts the normal response flow.
func finishRequest(ch <-chan RESTResult, timeout time.Duration, codec runtime.Codec) (*RESTResult, error) {
select {
case result, ok := <-ch:
if !ok {
// likely programming error
return nil, fmt.Errorf("operation channel closed without returning result")
func finishRequest(timeout time.Duration, fn resultFunc) (result runtime.Object, err error) {
ch := make(chan runtime.Object)
errCh := make(chan error)
go func() {
if result, err := fn(); err != nil {
errCh <- err
} else {
ch <- result
}
if status, ok := result.Object.(*api.Status); ok {
}()

select {
case result = <-ch:
if status, ok := result.(*api.Status); ok {
return nil, errors.FromObject(status)
}
return &result, nil
return result, nil
case err = <-errCh:
return nil, err
case <-time.After(timeout):
return nil, errors.NewTimeoutError("request did not complete within allowed duration")
}
Expand Down
26 changes: 4 additions & 22 deletions pkg/master/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package master

import (
"fmt"
"net"
"strconv"
"time"
Expand Down Expand Up @@ -92,15 +91,8 @@ func (m *Master) createMasterNamespaceIfNeeded(ns string) error {
Namespace: "",
},
}
c, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace)
if err != nil {
return err
}
resp := <-c
if _, ok := resp.Object.(*api.Service); ok {
return nil
}
return fmt.Errorf("unexpected response %#v", resp)
_, err := m.storage["namespaces"].(apiserver.RESTCreater).Create(ctx, namespace)
return err
}

// createMasterServiceIfNeeded will create the specified service if it
Expand All @@ -126,18 +118,8 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
SessionAffinity: api.AffinityTypeNone,
},
}
// Kids, don't do this at home: this is a hack. There's no good way to call the business
// logic which lives in the REST object from here.
c, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc)
if err != nil {
return err
}
resp := <-c
if _, ok := resp.Object.(*api.Service); ok {
// If all worked, we get back an *api.Service object.
return nil
}
return fmt.Errorf("unexpected response: %#v", resp.Object)
_, err := m.storage["services"].(apiserver.RESTCreater).Create(ctx, svc)
return err
}

// ensureEndpointsContain sets the endpoints for the given service. Also removes
Expand Down

0 comments on commit 79cb930

Please sign in to comment.