Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic atomic update code #269

Merged
merged 4 commits into from
Jun 27, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Generic atomic update code
  • Loading branch information
lavalamp committed Jun 27, 2014
commit 529870d12150b05387dbb18c51c62523f69df2a1
8 changes: 4 additions & 4 deletions pkg/registry/etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func makeContainerKey(machine string) string {
}

func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
index, err = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
return manifests, index, err
}

Expand Down Expand Up @@ -190,7 +190,7 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error

func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID)
err, _ = registry.helper().ExtractObj(key, &pod, false)
_, err = registry.helper().ExtractObj(key, &pod, false)
if err != nil {
return
}
Expand Down Expand Up @@ -225,7 +225,7 @@ func makeControllerKey(id string) string {
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController
key := makeControllerKey(controllerID)
err, _ := registry.helper().ExtractObj(key, &controller, false)
_, err := registry.helper().ExtractObj(key, &controller, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +264,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
key := makeServiceKey(name)
var svc api.Service
err, _ := registry.helper().ExtractObj(key, &svc, false)
_, err := registry.helper().ExtractObj(key, &svc, false)
if err != nil {
return nil, err
}
Expand Down
86 changes: 71 additions & 15 deletions pkg/util/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,41 +116,97 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) {
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (modifiedIndex uint64, err error) {
_, modifiedIndex, err = h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return modifiedIndex, err
}

// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches
// the previous modified index.
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", index)
return err
}

/*
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
}*/

func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
response, err := h.Client.Get(key, false, false)

if err != nil && !IsEtcdNotFound(err) {
return err, 0
return "", 0, err
}
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
if ignoreNotFound {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil, 0
return "", 0, nil
} else if err != nil {
return err, 0
return "", 0, err
}
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex
body = response.Node.Value
return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr)
}

// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", index)
_, err = h.Client.Set(key, string(data), 0)
return err
}

// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
// See the comment for AtomicUpdate for more detail.
type EtcdUpdateFunc func() (interface{}, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe have this function take the interface in? That way you don't have to rely on scope binding like you do above.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was trying to avoid forcing people to do a type assertion, but on second thought, passing it in would allow one to use a non-closure as an EtcdUpdateFunc, so I guess I'll change it.


// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
// Note, tryUpdate may be called more than once.
//
// Example:
//
// h := &util.EtcdHelper{client}
// var currentObj MyType
// err := h.AtomicUpdate("myKey", &currentObj, func() (interface{}, error) {
// // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey".
//
// // Make a *modification*.
// currentObj.Counter++
//
// // Return the modified object. Return an error to stop iterating.
// return currentObj, nil
// })
//
func (h *EtcdHelper) AtomicUpdate(key string, objPtr interface{}, tryUpdate EtcdUpdateFunc) error {
for {
origBody, index, err := h.bodyAndExtractObj(key, objPtr, true)
if err != nil {
return err
}

ret, err := tryUpdate()
if err != nil {
return err
}

data, err := json.Marshal(ret)
if err != nil {
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
return err
}
_, err = h.Client.Set(key, string(data), 0)
return err
}
6 changes: 3 additions & 3 deletions pkg/util/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) {
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
var got testMarshalType
err, _ := helper.ExtractObj("/some/key", &got, false)
_, err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand Down Expand Up @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
helper := EtcdHelper{fakeClient}
try := func(key string) {
var got testMarshalType
err, _ := helper.ExtractObj(key, &got, false)
_, err := helper.ExtractObj(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err, _ = helper.ExtractObj(key, &got, true)
_, err = helper.ExtractObj(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
Expand Down