Skip to content

Commit

Permalink
Making all operations synchronous
Browse files Browse the repository at this point in the history
  • Loading branch information
nikhiljindal committed Jan 28, 2015
1 parent 790a784 commit de60600
Show file tree
Hide file tree
Showing 15 changed files with 117 additions and 527 deletions.
1 change: 0 additions & 1 deletion cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ func startComponents(manifestURL string) (apiServerURL string) {

cl := client.NewOrDie(&client.Config{Host: apiServer.URL, Version: testapi.Version()})
cl.PollPeriod = time.Millisecond * 100
cl.Sync = true

helper, err := master.NewEtcdHelper(etcdClient, "")
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func newApiClient(addr string, port int) *client.Client {
apiServerURL := fmt.Sprintf("http://%s:%d", addr, port)
cl := client.NewOrDie(&client.Config{Host: apiServerURL, Version: testapi.Version()})
cl.PollPeriod = time.Second * 1
cl.Sync = true
return cl
}

Expand Down
18 changes: 6 additions & 12 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,18 +866,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""

// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"

// StatusReasonForbidden means the server can be reached and understood the request, but refuses
// to take any further action. It is the result of the server being configured to deny access for some reason
// to the requested resource by the client.
Expand Down Expand Up @@ -926,6 +914,12 @@ const (
// Status code 422
StatusReasonInvalid StatusReason = "Invalid"

// StatusReasonTimeout means that the request could not be completed within the given time.
// Clients can get this response only when they specified a timeout param in the request.
// The request might succeed with an increased value of timeout param.
// Status code 504
StatusReasonTimeout StatusReason = "Timeout"

// StatusReasonBadRequest means that the request itself was invalid, because the request
// doesn't make any sense, for example deleting a read-only object. This is different than
// StatusReasonInvalid above which indicates that the API call could possibly succeed, but the
Expand Down
12 changes: 0 additions & 12 deletions pkg/api/v1beta1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,18 +683,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""

// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"

// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):
Expand Down
12 changes: 0 additions & 12 deletions pkg/api/v1beta2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,18 +643,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""

// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"

// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):
Expand Down
12 changes: 0 additions & 12 deletions pkg/api/v1beta3/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -884,18 +884,6 @@ const (
// Status code 500.
StatusReasonUnknown StatusReason = ""

// StatusReasonWorking means the server is processing this request and will complete
// at a future time.
// Details (optional):
// "kind" string - the name of the resource being referenced ("operation" today)
// "id" string - the identifier of the Operation resource where updates
// will be returned
// Headers (optional):
// "Location" - HTTP header populated with a URL that can retrieved the final
// status of this operation.
// Status code 202
StatusReasonWorking StatusReason = "Working"

// StatusReasonNotFound means one or more resources required for this operation
// could not be found.
// Details (optional):
Expand Down
2 changes: 0 additions & 2 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ func NewAPIGroupVersion(storage map[string]RESTStorage, codec runtime.Codec, can
selfLinker: selfLinker,
ops: NewOperations(),
admissionControl: admissionControl,
// Delay just long enough to handle most simple write operations
asyncOpWait: time.Millisecond * 25,
}}
}

Expand Down
197 changes: 51 additions & 146 deletions pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -689,88 +689,6 @@ func TestUpdateMissing(t *testing.T) {
}
}

func TestCreate(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(1)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
wait.Wait()
return &Simple{}, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}

simple := &Simple{
Other: "foo",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}

response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if response.StatusCode != http.StatusAccepted {
t.Errorf("Unexpected response %#v", response)
}

var itemOut api.Status
body, err := extractBody(response, &itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}

if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
t.Errorf("Unexpected status: %#v (%s)", itemOut, string(body))
}
wait.Done()
}

func TestCreateInvokesAdmissionControl(t *testing.T) {
wait := sync.WaitGroup{}
wait.Add(1)
simpleStorage := &SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (returnObj runtime.Object, err error) {
wait.Wait()
return &Simple{}, nil
},
}
handler := Handle(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}

simple := &Simple{
Other: "foo",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}

response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected response %#v", response)
}
}

func TestCreateNotFound(t *testing.T) {
handler := Handle(map[string]RESTStorage{
"simple": &SimpleRESTStorage{
Expand Down Expand Up @@ -831,7 +749,7 @@ func (s *setTestSelfLinker) SetSelfLink(obj runtime.Object, selfLink string) err
return nil
}

func TestSyncCreate(t *testing.T) {
func TestCreate(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
time.Sleep(5 * time.Millisecond)
Expand All @@ -855,7 +773,7 @@ func TestSyncCreate(t *testing.T) {
Other: "bar",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo?sync=true", bytes.NewBuffer(data))
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down Expand Up @@ -889,6 +807,51 @@ func TestSyncCreate(t *testing.T) {
}
}

func TestCreateInvokesAdmissionControl(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
time.Sleep(5 * time.Millisecond)
return obj, nil
},
}
selfLinker := &setTestSelfLinker{
t: t,
name: "bar",
namespace: "other",
expectedSet: "/prefix/version/ns/other/foo/bar",
}
handler := Handle(map[string]RESTStorage{
"foo": &storage,
}, codec, "/prefix", testVersion, selfLinker, deny.NewAlwaysDeny())
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}

simple := &Simple{
Other: "bar",
}
data, _ := codec.Encode(simple)
request, err := http.NewRequest("POST", server.URL+"/prefix/version/ns/other/foo", bytes.NewBuffer(data))
if err != nil {
t.Errorf("unexpected error: %v", err)
}

wg := sync.WaitGroup{}
wg.Add(1)
var response *http.Response
go func() {
response, err = client.Do(request)
wg.Done()
}()
wg.Wait()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusForbidden {
t.Errorf("Unexpected status: %d, Expected: %d, %#v", response.StatusCode, http.StatusForbidden, response)
}
}

func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *api.Status {
client := http.Client{}
request, err := http.NewRequest(method, url, bytes.NewBuffer(data))
Expand All @@ -913,14 +876,13 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *a
return &status
}

func TestAsyncDelayReturnsError(t *testing.T) {
func TestDelayReturnsError(t *testing.T) {
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
return nil, apierrs.NewAlreadyExists("foo", "bar")
},
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = time.Millisecond / 2
server := httptest.NewServer(handler)
defer server.Close()

Expand All @@ -930,63 +892,6 @@ func TestAsyncDelayReturnsError(t *testing.T) {
}
}

func TestAsyncCreateError(t *testing.T) {
ch := make(chan struct{})
storage := SimpleRESTStorage{
injectedFunction: func(obj runtime.Object) (runtime.Object, error) {
<-ch
return nil, apierrs.NewAlreadyExists("foo", "bar")
},
}
selfLinker := &setTestSelfLinker{
t: t,
name: "bar",
expectedSet: "/prefix/version/foo/bar",
}
handler := Handle(map[string]RESTStorage{"foo": &storage}, codec, "/prefix", testVersion, selfLinker, admissionControl)
handler.(*defaultAPIServer).group.handler.asyncOpWait = 0
server := httptest.NewServer(handler)
defer server.Close()

simple := &Simple{Other: "bar"}
data, _ := codec.Encode(simple)

status := expectApiStatus(t, "POST", fmt.Sprintf("%s/prefix/version/foo", server.URL), data, http.StatusAccepted)
if status.Status != api.StatusWorking || status.Details == nil || status.Details.ID == "" {
t.Errorf("Unexpected status %#v", status)
}

otherStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s", server.URL, status.Details.ID), []byte{}, http.StatusAccepted)
if !reflect.DeepEqual(status, otherStatus) {
t.Errorf("Expected %#v, Got %#v", status, otherStatus)
}

ch <- struct{}{}
time.Sleep(time.Millisecond)

finalStatus := expectApiStatus(t, "GET", fmt.Sprintf("%s/prefix/version/operations/%s?after=1", server.URL, status.Details.ID), []byte{}, http.StatusOK)
expectedErr := apierrs.NewAlreadyExists("foo", "bar")
expectedStatus := &api.Status{
Status: api.StatusFailure,
Code: http.StatusConflict,
Reason: "AlreadyExists",
Message: expectedErr.Error(),
Details: &api.StatusDetails{
Kind: "foo",
ID: "bar",
},
}
if !reflect.DeepEqual(expectedStatus, finalStatus) {
t.Errorf("Expected %#v, Got %#v", expectedStatus, finalStatus)
if finalStatus.Details != nil {
t.Logf("Details %#v, Got %#v", *expectedStatus.Details, *finalStatus.Details)
}
}
if !selfLinker.called {
t.Errorf("Never set self link")
}
}

type UnregisteredAPIObject struct {
Value string
}
Expand Down Expand Up @@ -1031,7 +936,7 @@ func TestWriteRAWJSONMarshalError(t *testing.T) {
}
}

func TestSyncCreateTimeout(t *testing.T) {
func TestCreateTimeout(t *testing.T) {
testOver := make(chan struct{})
defer close(testOver)
storage := SimpleRESTStorage{
Expand All @@ -1049,8 +954,8 @@ func TestSyncCreateTimeout(t *testing.T) {

simple := &Simple{Other: "foo"}
data, _ := codec.Encode(simple)
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?sync=true&timeout=4ms", data, http.StatusAccepted)
if itemOut.Status != api.StatusWorking || itemOut.Details == nil || itemOut.Details.ID == "" {
itemOut := expectApiStatus(t, "POST", server.URL+"/prefix/version/foo?timeout=4ms", data, http.StatusAccepted)
if itemOut.Status != api.StatusFailure || itemOut.Reason != api.StatusReasonTimeout {
t.Errorf("Unexpected status %#v", itemOut)
}
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/apiserver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,8 @@ func (op *Operation) StatusOrResult() (description RESTResult, finished bool) {

if op.finished == nil {
return RESTResult{Object: &api.Status{
Status: api.StatusWorking,
Reason: api.StatusReasonWorking,
Details: &api.StatusDetails{ID: op.ID, Kind: "operation"},
Status: api.StatusFailure,
Reason: api.StatusReasonTimeout,
}}, false
}
return op.result, true
Expand Down
Loading

0 comments on commit de60600

Please sign in to comment.