Skip to content

Commit

Permalink
Address comments; also, demonstrate one more property in test.
Browse files Browse the repository at this point in the history
  • Loading branch information
lavalamp committed Jun 26, 2014
1 parent c9246dc commit afd13ed
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 39 deletions.
2 changes: 2 additions & 0 deletions pkg/api/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func init() {
MinionList{},
Minion{},
Status{},
ServerOpList{},
ServerOp{},
)
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,3 +207,14 @@ const (
StatusFailure = "failure"
StatusWorking = "working"
)

// Operation information, as delivered to API clients.
type ServerOp struct {
JSONBase `yaml:",inline" json:",inline"`
}

// Operation list, as delivered to API clients.
type ServerOpList struct {
JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}
11 changes: 8 additions & 3 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@ type RESTStorage interface {
Update(interface{}) (<-chan interface{}, error)
}

// WorkFunc is used to perform any time consuming work for an api call, after
// the input has been validated. Pass one of these to MakeAsync to create an
// appropriate return value for the Update, Delete, and Create methods.
type WorkFunc func() (result interface{}, err error)

// MakeAsync takes a function and executes it, delivering the result in the way required
// by RESTStorage's Update, Delete, and Create methods.
func MakeAsync(fn func() (interface{}, error)) <-chan interface{} {
func MakeAsync(fn WorkFunc) <-chan interface{} {
channel := make(chan interface{})
go func() {
defer util.HandleCrash()
Expand Down Expand Up @@ -171,7 +176,7 @@ func (server *ApiServer) finishReq(out <-chan interface{}, sync bool, timeout ti
if sync {
op.WaitFor(timeout)
}
obj, complete := op.Describe()
obj, complete := op.StatusOrResult()
if complete {
server.write(http.StatusOK, obj, w)
} else {
Expand Down Expand Up @@ -308,7 +313,7 @@ func (server *ApiServer) handleOperationRequest(parts []string, w http.ResponseW
server.notFound(req, w)
}

obj, complete := op.Describe()
obj, complete := op.StatusOrResult()
if complete {
server.write(http.StatusOK, obj, w)
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/apiserver/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type SimpleRESTStorage struct {
updated Simple
created Simple

// called when answering update, delete, create
// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj recieves the original input to the update, delete, or create call.
injectedFunction func(obj interface{}) (returnObj interface{}, err error)
}

Expand Down
58 changes: 26 additions & 32 deletions pkg/apiserver/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,16 @@ limitations under the License.
package apiserver

import (
"fmt"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

func init() {
api.AddKnownTypes(ServerOp{}, ServerOpList{})
}

// Operation information, as delivered to API clients.
type ServerOp struct {
api.JSONBase `yaml:",inline" json:",inline"`
}

// Operation list, as delivered to API clients.
type ServerOpList struct {
api.JSONBase `yaml:",inline" json:",inline"`
Items []ServerOp `yaml:"items,omitempty" json:"items,omitempty"`
}

// Operation represents an ongoing action which the server is performing.
type Operation struct {
ID string
Expand All @@ -53,9 +39,12 @@ type Operation struct {

// Operations tracks all the ongoing operations.
type Operations struct {
lock sync.Mutex
ops map[string]*Operation
nextID int
// Access only using functions from atomic.
lastID int64

// 'lock' guards the ops map.
lock sync.Mutex
ops map[string]*Operation
}

// Returns a new Operations repository.
Expand All @@ -67,25 +56,28 @@ func NewOperations() *Operations {
return ops
}

// Add a new operation.
// Add a new operation. Lock-free.
func (ops *Operations) NewOperation(from <-chan interface{}) *Operation {
ops.lock.Lock()
defer ops.lock.Unlock()
id := fmt.Sprintf("%v", ops.nextID)
ops.nextID++

id := atomic.AddInt64(&ops.lastID, 1)
op := &Operation{
ID: id,
ID: strconv.FormatInt(id, 10),
awaiting: from,
notify: make(chan bool, 1),
}
go op.wait()
ops.ops[id] = op
go ops.insert(op)
return op
}

// Inserts op into the ops map.
func (ops *Operations) insert(op *Operation) {
ops.lock.Lock()
defer ops.lock.Unlock()
ops.ops[op.ID] = op
}

// List operations for an API client.
func (ops *Operations) List() ServerOpList {
func (ops *Operations) List() api.ServerOpList {
ops.lock.Lock()
defer ops.lock.Unlock()

Expand All @@ -94,9 +86,9 @@ func (ops *Operations) List() ServerOpList {
ids = append(ids, id)
}
sort.StringSlice(ids).Sort()
ol := ServerOpList{}
ol := api.ServerOpList{}
for _, id := range ids {
ol.Items = append(ol.Items, ServerOp{JSONBase: api.JSONBase{ID: id}})
ol.Items = append(ol.Items, api.ServerOp{JSONBase: api.JSONBase{ID: id}})
}
return ol
}
Expand Down Expand Up @@ -124,7 +116,9 @@ func (ops *Operations) expire(maxAge time.Duration) {

// Waits forever for the operation to complete; call via go when
// the operation is created. Sets op.finished when the operation
// does complete. Does not keep op locked while waiting.
// does complete, and sends on the notify channel, in case there
// are any WaitFor() calls in progress.
// Does not keep op locked while waiting.
func (op *Operation) wait() {
defer util.HandleCrash()
result := <-op.awaiting
Expand Down Expand Up @@ -161,7 +155,7 @@ func (op *Operation) expired(limitTime time.Time) bool {

// Return status information or the result of the operation if it is complete,
// with a bool indicating true in the latter case.
func (op *Operation) Describe() (description interface{}, finished bool) {
func (op *Operation) StatusOrResult() (description interface{}, finished bool) {
op.lock.Lock()
defer op.lock.Unlock()

Expand Down
23 changes: 20 additions & 3 deletions pkg/apiserver/operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package apiserver

import (
"sync/atomic"
"testing"
"time"
)
Expand All @@ -26,6 +27,10 @@ func TestOperation(t *testing.T) {

c := make(chan interface{})
op := ops.NewOperation(c)
// Allow context switch, so that op's ID can get added to the map and Get will work.
// This is just so we can test Get. Ordinary users have no need to call Get immediately
// after calling NewOperation, because it returns the operation directly.
time.Sleep(time.Millisecond)
go func() {
time.Sleep(500 * time.Millisecond)
c <- "All done"
Expand All @@ -40,16 +45,28 @@ func TestOperation(t *testing.T) {
}

op.WaitFor(10 * time.Millisecond)
if _, completed := op.Describe(); completed {
if _, completed := op.StatusOrResult(); completed {
t.Errorf("Unexpectedly fast completion")
}

op.WaitFor(time.Second)
if _, completed := op.Describe(); !completed {
const waiters = 10
var waited int32
for i := 0; i < waiters; i++ {
go func() {
op.WaitFor(time.Hour)
atomic.AddInt32(&waited, 1)
}()
}

op.WaitFor(time.Minute)
if _, completed := op.StatusOrResult(); !completed {
t.Errorf("Unexpectedly slow completion")
}

time.Sleep(100 * time.Millisecond)
if waited != waiters {
t.Errorf("Multiple waiters doesn't work, only %v finished", waited)
}

if op.expired(time.Now().Add(-time.Second)) {
t.Errorf("Should not be expired: %#v", op)
Expand Down

0 comments on commit afd13ed

Please sign in to comment.