Skip to content

Commit

Permalink
Merge pull request #269 from lavalamp/exampleEtcd
Browse files Browse the repository at this point in the history
Generic atomic update code
  • Loading branch information
brendandburns committed Jun 27, 2014
2 parents 4102abe + 54ad458 commit 4fdf55e
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 77 deletions.
6 changes: 6 additions & 0 deletions cmd/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ func main() {
util.InitLogs()
defer util.FlushLogs()

go func() {
defer util.FlushLogs()
time.Sleep(3 * time.Minute)
glog.Fatalf("This test has timed out.")
}()

manifestUrl := ServeCachedManifestFile()
// Setup
servers := []string{"http://localhost:4001"}
Expand Down
13 changes: 10 additions & 3 deletions pkg/kubelet/kubelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,10 +436,17 @@ func TestSyncManifestsDeletes(t *testing.T) {
err := kubelet.SyncManifests([]api.ContainerManifest{})
expectNoError(t, err)
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop"})

// A map interation is used to delete containers, so must not depend on
// order here.
expectedToStop := map[string]bool{
"1234": true,
"9876": true,
}
if len(fakeDocker.stopped) != 2 ||
fakeDocker.stopped[0] != "1234" ||
fakeDocker.stopped[1] != "9876" {
t.Errorf("Unexpected sequence of stopped containers: %s", fakeDocker.stopped)
!expectedToStop[fakeDocker.stopped[0]] ||
!expectedToStop[fakeDocker.stopped[1]] {
t.Errorf("Wrong containers were stopped: %v", fakeDocker.stopped)
}
}

Expand Down
86 changes: 32 additions & 54 deletions pkg/registry/etcd_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package registry

import (
"encoding/json"
"fmt"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
Expand Down Expand Up @@ -90,56 +89,37 @@ func makeContainerKey(machine string) string {
return "/registry/hosts/" + machine + "/kubelet"
}

func (registry *EtcdRegistry) loadManifests(machine string) (manifests []api.ContainerManifest, index uint64, err error) {
err, index = registry.helper().ExtractObj(makeContainerKey(machine), &manifests, true)
return manifests, index, err
}

func (registry *EtcdRegistry) updateManifests(machine string, manifests []api.ContainerManifest, index uint64) error {
if index != 0 {
return registry.helper().CompareAndSwapObj(makeContainerKey(machine), manifests, index)
} else {
return registry.helper().SetObj(makeContainerKey(machine), manifests)
}
}

func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error {
podOut, machine, err := registry.findPod(pod.ID)
if err == nil {
// TODO: this error message looks racy.
return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut)
}
return registry.runPod(pod, machineIn)
}

func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error {
manifests, index, err := registry.loadManifests(machine)
if err != nil {
return err
}
podKey := makePodKey(machine, pod.ID)
err := registry.helper().SetObj(podKey, pod)

key := makePodKey(machine, pod.ID)
data, err := json.Marshal(pod)
manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
if err != nil {
return err
}
_, err = registry.etcdClient.Create(key, string(data), 0)

manifest, err := registry.manifestFactory.MakeManifest(machine, pod)
contKey := makeContainerKey(machine)
err = registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest)
return append(manifests, manifest), nil
})
if err != nil {
return err
}
for {
manifests = append(manifests, manifest)
err = registry.updateManifests(machine, manifests, index)
if util.IsEtcdConflict(err) {
manifests, index, err = registry.loadManifests(machine)
if err != nil {
return err
}
continue
// Don't strand stuff.
_, err2 := registry.etcdClient.Delete(podKey, false)
if err2 != nil {
glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2)
}
return err
}
return err
}

func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error {
Expand All @@ -155,12 +135,19 @@ func (registry *EtcdRegistry) DeletePod(podID string) error {
}

func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error {
for {
manifests, index, err := registry.loadManifests(machine)
if err != nil {
return err
}
newManifests := make([]api.ContainerManifest, 0)
// First delete the pod, so a scheduler doesn't notice it getting removed from the
// machine and attempt to put it somewhere.
podKey := makePodKey(machine, podID)
_, err := registry.etcdClient.Delete(podKey, true)
if err != nil {
return err
}

// Next, remove the pod from the machine atomically.
contKey := makeContainerKey(machine)
return registry.helper().AtomicUpdate(contKey, &[]api.ContainerManifest{}, func(in interface{}) (interface{}, error) {
manifests := *in.(*[]api.ContainerManifest)
newManifests := make([]api.ContainerManifest, 0, len(manifests))
found := false
for _, manifest := range manifests {
if manifest.Id != podID {
Expand All @@ -175,22 +162,13 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error
// However it is "deleted" so log it and move on
glog.Infof("Couldn't find: %s in %#v", podID, manifests)
}
if err = registry.updateManifests(machine, newManifests, index); err != nil {
if util.IsEtcdConflict(err) {
continue
}
return err
}
break
}
key := makePodKey(machine, podID)
_, err := registry.etcdClient.Delete(key, true)
return err
return newManifests, nil
})
}

func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) {
key := makePodKey(machine, podID)
err, _ = registry.helper().ExtractObj(key, &pod, false)
err = registry.helper().ExtractObj(key, &pod, false)
if err != nil {
return
}
Expand Down Expand Up @@ -225,7 +203,7 @@ func makeControllerKey(id string) string {
func (registry *EtcdRegistry) GetController(controllerID string) (*api.ReplicationController, error) {
var controller api.ReplicationController
key := makeControllerKey(controllerID)
err, _ := registry.helper().ExtractObj(key, &controller, false)
err := registry.helper().ExtractObj(key, &controller, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -264,7 +242,7 @@ func (registry *EtcdRegistry) CreateService(svc api.Service) error {
func (registry *EtcdRegistry) GetService(name string) (*api.Service, error) {
key := makeServiceKey(name)
var svc api.Service
err, _ := registry.helper().ExtractObj(key, &svc, false)
err := registry.helper().ExtractObj(key, &svc, false)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/registry/etcd_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ func TestEtcdDeletePod(t *testing.T) {
expectNoError(t, err)
if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 1 delete, found %#v", fakeClient.DeletedKeys)
}
if fakeClient.DeletedKeys[0] != key {
} else if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
}
response, _ := fakeClient.Get("/registry/hosts/machine/kubelet", false, false)
Expand Down
86 changes: 71 additions & 15 deletions pkg/util/etcd_tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,41 +116,97 @@ func (h *EtcdHelper) ExtractList(key string, slicePtr interface{}) error {
// Unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (error, uint64) {
func (h *EtcdHelper) ExtractObj(key string, objPtr interface{}, ignoreNotFound bool) error {
_, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
}

func (h *EtcdHelper) bodyAndExtractObj(key string, objPtr interface{}, ignoreNotFound bool) (body string, modifiedIndex uint64, err error) {
response, err := h.Client.Get(key, false, false)

if err != nil && !IsEtcdNotFound(err) {
return err, 0
return "", 0, err
}
if err != nil || response.Node == nil || len(response.Node.Value) == 0 {
if ignoreNotFound {
pv := reflect.ValueOf(objPtr)
pv.Elem().Set(reflect.Zero(pv.Type().Elem()))
return nil, 0
return "", 0, nil
} else if err != nil {
return err, 0
return "", 0, err
}
return fmt.Errorf("key '%v' found no nodes field: %#v", key, response), 0
return "", 0, fmt.Errorf("key '%v' found no nodes field: %#v", key, response)
}
return json.Unmarshal([]byte(response.Node.Value), objPtr), response.Node.ModifiedIndex
body = response.Node.Value
return body, response.Node.ModifiedIndex, json.Unmarshal([]byte(body), objPtr)
}

// CompareAndSwapObj marshals obj via json, and stores under key so long as index matches the previous modified index
func (h *EtcdHelper) CompareAndSwapObj(key string, obj interface{}, index uint64) error {
// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, "", index)
_, err = h.Client.Set(key, string(data), 0)
return err
}

// SetObj marshals obj via json, and stores under key.
func (h *EtcdHelper) SetObj(key string, obj interface{}) error {
data, err := json.Marshal(obj)
if err != nil {
// Pass an EtcdUpdateFunc to EtcdHelper.AtomicUpdate to make an atomic etcd update.
// See the comment for AtomicUpdate for more detail.
type EtcdUpdateFunc func(input interface{}) (output interface{}, err error)

// AtomicUpdate generalizes the pattern that allows for making atomic updates to etcd objects.
// Note, tryUpdate may be called more than once.
//
// Example:
//
// h := &util.EtcdHelper{client}
// err := h.AtomicUpdate("myKey", &MyType{}, func(input interface{}) (interface{}, error) {
// // Before this function is called, currentObj has been reset to etcd's current
// // contents for "myKey".
//
// cur := input.(*MyType) // Gauranteed to work.
//
// // Make a *modification*.
// cur.Counter++
//
// // Return the modified object. Return an error to stop iterating.
// return cur, nil
// })
//
func (h *EtcdHelper) AtomicUpdate(key string, ptrToType interface{}, tryUpdate EtcdUpdateFunc) error {
pt := reflect.TypeOf(ptrToType)
if pt.Kind() != reflect.Ptr {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
for {
obj := reflect.New(pt.Elem()).Interface()
origBody, index, err := h.bodyAndExtractObj(key, obj, true)
if err != nil {
return err
}

ret, err := tryUpdate(obj)
if err != nil {
return err
}

// First time this key has been used, just set.
// TODO: This is racy. Fix when our client supports prevExist. See:
// https://github.com/coreos/etcd/blob/master/Documentation/api.md#atomic-compare-and-swap
if index == 0 {
return h.SetObj(key, ret)
}

data, err := json.Marshal(ret)
if err != nil {
return err
}
_, err = h.Client.CompareAndSwap(key, string(data), 0, origBody, index)
if IsEtcdConflict(err) {
continue
}
return err
}
_, err = h.Client.Set(key, string(data), 0)
return err
}
6 changes: 3 additions & 3 deletions pkg/util/etcd_tools_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestExtractObj(t *testing.T) {
fakeClient.Set("/some/key", MakeJSONString(expect), 0)
helper := EtcdHelper{fakeClient}
var got testMarshalType
err, _ := helper.ExtractObj("/some/key", &got, false)
err := helper.ExtractObj("/some/key", &got, false)
if err != nil {
t.Errorf("Unexpected error %#v", err)
}
Expand Down Expand Up @@ -120,11 +120,11 @@ func TestExtractObjNotFoundErr(t *testing.T) {
helper := EtcdHelper{fakeClient}
try := func(key string) {
var got testMarshalType
err, _ := helper.ExtractObj(key, &got, false)
err := helper.ExtractObj(key, &got, false)
if err == nil {
t.Errorf("%s: wanted error but didn't get one", key)
}
err, _ = helper.ExtractObj(key, &got, true)
err = helper.ExtractObj(key, &got, true)
if err != nil {
t.Errorf("%s: didn't want error but got %#v", key, err)
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/fake_etcd_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response,
f.t.Errorf("Unexpected get for %s", key)
return &etcd.Response{}, &etcd.EtcdError{ErrorCode: 100} // Key not found
}
f.t.Logf("returning %v: %v %#v", key, result.R, result.E)
return result.R, result.E
}

Expand All @@ -85,6 +86,13 @@ func (f *FakeEtcdClient) Create(key, value string, ttl uint64) (*etcd.Response,
return f.Set(key, value, ttl)
}
func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
f.Data[key] = EtcdResponseWithError{
R: &etcd.Response{
Node: nil,
},
E: &etcd.EtcdError{ErrorCode: 100},
}

f.DeletedKeys = append(f.DeletedKeys, key)
return &etcd.Response{}, f.Err
}
Expand Down

0 comments on commit 4fdf55e

Please sign in to comment.