From 5cdce0e35ae1579f36249f82447fe2e93a915b53 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 21 Jul 2014 21:26:03 -0700 Subject: [PATCH 1/3] Prepare for external scheduler 1. Change names of Pod statuses (Waiting, Running, Terminated). 2. Store assigned host in etcd. 3. Change pod key to /registry/pods/. Container location remains the same (/registry/hosts//kublet). --- pkg/api/types.go | 7 +- pkg/api/v1beta1/types.go | 7 +- pkg/controller/replication_controller.go | 2 +- pkg/master/pod_cache.go | 4 +- pkg/registry/etcdregistry.go | 147 +++++++++++++---------- pkg/registry/etcdregistry_test.go | 71 ++++++----- pkg/registry/podstorage.go | 12 +- pkg/registry/podstorage_test.go | 27 +++-- 8 files changed, 158 insertions(+), 119 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index 3c7b2df2c8634..5972cd0bc24d2 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -208,9 +208,12 @@ type PodStatus string // These are the valid statuses of pods. const ( + // PodWaiting means that we're waiting for the pod to begin running. + PodWaiting = "Waiting" + // PodRunning means that the pod is up and running. PodRunning PodStatus = "Running" - PodPending PodStatus = "Pending" - PodStopped PodStatus = "Stopped" + // PodTerminated means that the pod has stopped. + PodTerminated PodStatus = "Terminated" ) // PodInfo contains one entry for every container with available info. diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 16346bccad208..25a5d05558c0c 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -211,9 +211,12 @@ type PodStatus string // These are the valid statuses of pods. const ( + // PodWaiting means that we're waiting for the pod to begin running. + PodWaiting = "Waiting" + // PodRunning means that the pod is up and running. PodRunning PodStatus = "Running" - PodPending PodStatus = "Pending" - PodStopped PodStatus = "Stopped" + // PodTerminated means that the pod has stopped. + PodTerminated PodStatus = "Terminated" ) // PodInfo contains one entry for every container with available info. diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 8bacad7e870a5..9067286809034 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -130,7 +130,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) { func (rm *ReplicationManager) filterActivePods(pods []api.Pod) []api.Pod { var result []api.Pod for _, value := range pods { - if api.PodStopped != value.CurrentState.Status { + if api.PodTerminated != value.CurrentState.Status { result = append(result, value) } } diff --git a/pkg/master/pod_cache.go b/pkg/master/pod_cache.go index b652ab221fc34..a4d0ff24b7c70 100644 --- a/pkg/master/pod_cache.go +++ b/pkg/master/pod_cache.go @@ -76,13 +76,13 @@ func (p *PodCache) updatePodInfo(host, id string) error { func (p *PodCache) UpdateAllContainers() { pods, err := p.pods.ListPods(labels.Everything()) if err != nil { - glog.Errorf("Error synchronizing container list: %#v", err) + glog.Errorf("Error synchronizing container list: %v", err) return } for _, pod := range pods { err := p.updatePodInfo(pod.CurrentState.Host, pod.ID) if err != nil && err != client.ErrPodInfoNotAvailable { - glog.Errorf("Error synchronizing container: %#v", err) + glog.Errorf("Error synchronizing container: %v", err) } } } diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index d4cc759fcb6f3..de24edb13fba9 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -33,7 +33,6 @@ import ( // EtcdRegistry implements PodRegistry, ControllerRegistry and ServiceRegistry with backed by etcd. type EtcdRegistry struct { helper tools.EtcdHelper - machines MinionRegistry manifestFactory ManifestFactory } @@ -43,8 +42,7 @@ type EtcdRegistry struct { // 'scheduler' is the scheduling algorithm to use. func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdRegistry { registry := &EtcdRegistry{ - helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner}, - machines: machines, + helper: tools.EtcdHelper{client, api.Codec, api.ResourceVersioner}, } registry.manifestFactory = &BasicManifestFactory{ serviceRegistry: registry, @@ -52,37 +50,34 @@ func MakeEtcdRegistry(client tools.EtcdClient, machines MinionRegistry) *EtcdReg return registry } -func makePodKey(machine, podID string) string { - return "/registry/hosts/" + machine + "/pods/" + podID +func makePodKey(podID string) string { + return "/registry/pods/" + podID } // ListPods obtains a list of pods that match selector. func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, error) { - pods := []api.Pod{} - machines, err := registry.machines.List() + allPods := []api.Pod{} + filteredPods := []api.Pod{} + err := registry.helper.ExtractList("/registry/pods", &allPods) if err != nil { return nil, err } - for _, machine := range machines { - var machinePods []api.Pod - err := registry.helper.ExtractList("/registry/hosts/"+machine+"/pods", &machinePods) - if err != nil { - return pods, err - } - for _, pod := range machinePods { - if selector.Matches(labels.Set(pod.Labels)) { - pod.CurrentState.Host = machine - pods = append(pods, pod) - } + for _, pod := range allPods { + if selector.Matches(labels.Set(pod.Labels)) { + filteredPods = append(filteredPods, pod) } } - return pods, nil + return filteredPods, nil } // GetPod gets a specific pod specified by its ID. func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { - pod, _, err := registry.findPod(podID) - return &pod, err + var pod api.Pod + err := registry.helper.ExtractObj(makePodKey(podID), &pod, false) + if err != nil { + return nil, err + } + return &pod, nil } func makeContainerKey(machine string) string { @@ -90,32 +85,69 @@ func makeContainerKey(machine string) string { } // CreatePod creates a pod based on a specification, schedule it onto a specific machine. -func (registry *EtcdRegistry) CreatePod(machineIn string, pod api.Pod) error { - podOut, machine, err := registry.findPod(pod.ID) +func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error { + // TODO: When our client supports it, switch to atomic creates. + var pod2 api.Pod + err := registry.helper.ExtractObj(makePodKey(pod.ID), &pod2, false) if err == nil { - // TODO: this error message looks racy. - return fmt.Errorf("a pod named %s already exists on %s (%#v)", pod.ID, machine, podOut) + return fmt.Errorf("a pod named %s already exists (%#v)", pod.ID, pod2) } - return registry.runPod(pod, machineIn) + + // Set status to "Waiting". + pod.CurrentState.Status = api.PodWaiting + pod.CurrentState.Host = "" + + err = registry.helper.SetObj(makePodKey(pod.ID), &pod) + if err != nil { + return err + } + + // TODO: Until scheduler separation is completed, just assign here. + return registry.AssignPod(pod.ID, machine) } -func (registry *EtcdRegistry) runPod(pod api.Pod, machine string) error { - podKey := makePodKey(machine, pod.ID) - err := registry.helper.SetObj(podKey, pod) +// AssignPod assigns the given pod to the given machine. +// TODO: hook this up via apiserver, not by calling it from CreatePod(). +func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { + podKey := makePodKey(podID) + var finalPod *api.Pod + err := registry.helper.AtomicUpdate( + podKey, + &api.Pod{}, + func(obj interface{}) (interface{}, error) { + pod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object: %#v", obj) + } + pod.CurrentState.Host = machine + pod.CurrentState.Status = api.PodWaiting + finalPod = pod + return pod, nil + }, + ) + if err != nil { + return err + } - manifest, err := registry.manifestFactory.MakeManifest(machine, pod) + // TODO: move this to a watch/rectification loop. + manifest, err := registry.manifestFactory.MakeManifest(machine, *finalPod) if err != nil { return err } contKey := makeContainerKey(machine) - err = registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { - manifests := *in.(*api.ContainerManifestList) - manifests.Items = append(manifests.Items, manifest) - return manifests, nil - }) + err = registry.helper.AtomicUpdate( + contKey, + &api.ContainerManifestList{}, + func(in interface{}) (interface{}, error) { + manifests := *in.(*api.ContainerManifestList) + manifests.Items = append(manifests.Items, manifest) + return manifests, nil + }, + ) if err != nil { - // Don't strand stuff. + // Don't strand stuff. This is a terrible hack that won't be needed + // when the above TODO is fixed. err2 := registry.helper.Delete(podKey, false) if err2 != nil { glog.Errorf("Probably stranding a pod, couldn't delete %v: %#v", podKey, err2) @@ -130,18 +162,19 @@ func (registry *EtcdRegistry) UpdatePod(pod api.Pod) error { // DeletePod deletes an existing pod specified by its ID. func (registry *EtcdRegistry) DeletePod(podID string) error { - _, machine, err := registry.findPod(podID) + var pod api.Pod + podKey := makePodKey(podID) + err := registry.helper.ExtractObj(podKey, &pod, false) + if tools.IsEtcdNotFound(err) { + return apiserver.NewNotFoundErr("pod", podID) + } if err != nil { return err } - return registry.deletePodFromMachine(machine, podID) -} -func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error { // First delete the pod, so a scheduler doesn't notice it getting removed from the // machine and attempt to put it somewhere. - podKey := makePodKey(machine, podID) - err := registry.helper.Delete(podKey, true) + err = registry.helper.Delete(podKey, true) if tools.IsEtcdNotFound(err) { return apiserver.NewNotFoundErr("pod", podID) } @@ -149,6 +182,12 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error return err } + machine := pod.CurrentState.Host + if machine == "" { + // Pod was never scheduled anywhere, just return. + return nil + } + // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) return registry.helper.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in interface{}) (interface{}, error) { @@ -173,30 +212,6 @@ func (registry *EtcdRegistry) deletePodFromMachine(machine, podID string) error }) } -func (registry *EtcdRegistry) getPodForMachine(machine, podID string) (pod api.Pod, err error) { - key := makePodKey(machine, podID) - err = registry.helper.ExtractObj(key, &pod, false) - if err != nil { - return - } - pod.CurrentState.Host = machine - return -} - -func (registry *EtcdRegistry) findPod(podID string) (api.Pod, string, error) { - machines, err := registry.machines.List() - if err != nil { - return api.Pod{}, "", err - } - for _, machine := range machines { - pod, err := registry.getPodForMachine(machine, podID) - if err == nil { - return pod, machine, nil - } - } - return api.Pod{}, "", apiserver.NewNotFoundErr("pod", podID) -} - // ListControllers obtains a list of ReplicationControllers. func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, error) { var controllers []api.ReplicationController diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcdregistry_test.go index 9713016087fcd..93c6e323f14a3 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcdregistry_test.go @@ -24,7 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) @@ -38,7 +37,7 @@ func MakeTestEtcdRegistry(client tools.EtcdClient, machines []string) *EtcdRegis func TestEtcdGetPod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/hosts/machine/pods/foo", util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.Set("/registry/pods/foo", api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) pod, err := registry.GetPod("foo") if err != nil { @@ -52,7 +51,7 @@ func TestEtcdGetPod(t *testing.T) { func TestEtcdGetPodNotFound(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -68,7 +67,7 @@ func TestEtcdGetPodNotFound(t *testing.T) { func TestEtcdCreatePod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -94,7 +93,7 @@ func TestEtcdCreatePod(t *testing.T) { t.Errorf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) + resp, err := fakeClient.Get("/registry/pods/foo", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -121,10 +120,10 @@ func TestEtcdCreatePod(t *testing.T) { func TestEtcdCreatePodAlreadyExisting(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ - Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), + Value: api.EncodeOrDie(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), }, }, E: nil, @@ -142,7 +141,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { func TestEtcdCreatePodWithContainersError(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -163,7 +162,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { if err == nil { t.Error("Unexpected non-error") } - _, err = fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) + _, err = fakeClient.Get("/registry/pods/foo", false, false) if err == nil { t.Error("Unexpected non-error") } @@ -174,7 +173,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -206,7 +205,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { t.Errorf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) + resp, err := fakeClient.Get("/registry/pods/foo", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -234,7 +233,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { func TestEtcdCreatePodWithExistingContainers(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - fakeClient.Data["/registry/hosts/machine/pods/foo"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -265,7 +264,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { t.Errorf("unexpected error: %v", err) } - resp, err := fakeClient.Get("/registry/hosts/machine/pods/foo", false, false) + resp, err := fakeClient.Get("/registry/pods/foo", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -294,8 +293,11 @@ func TestEtcdDeletePod(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/hosts/machine/pods/foo" - fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) + key := "/registry/pods/foo" + fakeClient.Set(key, api.EncodeOrDie(api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{Host: "machine"}, + }), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ Items: []api.ContainerManifest{ {ID: "foo"}, @@ -327,8 +329,11 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - key := "/registry/hosts/machine/pods/foo" - fakeClient.Set(key, util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), 0) + key := "/registry/pods/foo" + fakeClient.Set(key, api.EncodeOrDie(api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{Host: "machine"}, + }), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ Items: []api.ContainerManifest{ {ID: "foo"}, @@ -363,7 +368,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { func TestEtcdEmptyListPods(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - key := "/registry/hosts/machine/pods" + key := "/registry/pods" fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -385,7 +390,7 @@ func TestEtcdEmptyListPods(t *testing.T) { func TestEtcdListPodsNotFound(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - key := "/registry/hosts/machine/pods" + key := "/registry/pods" fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, @@ -403,16 +408,22 @@ func TestEtcdListPodsNotFound(t *testing.T) { func TestEtcdListPods(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - key := "/registry/hosts/machine/pods" + key := "/registry/pods" fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "foo"}}), + Value: api.EncodeOrDie(api.Pod{ + JSONBase: api.JSONBase{ID: "foo"}, + CurrentState: api.PodState{Host: "machine"}, + }), }, { - Value: util.MakeJSONString(api.Pod{JSONBase: api.JSONBase{ID: "bar"}}), + Value: api.EncodeOrDie(api.Pod{ + JSONBase: api.JSONBase{ID: "bar"}, + CurrentState: api.PodState{Host: "machine"}, + }), }, }, }, @@ -478,10 +489,10 @@ func TestEtcdListControllers(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), + Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), }, { - Value: util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}), + Value: api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "bar"}}), }, }, }, @@ -501,7 +512,7 @@ func TestEtcdListControllers(t *testing.T) { func TestEtcdGetController(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) ctrl, err := registry.GetController("foo") if err != nil { @@ -594,7 +605,7 @@ func TestEtcdUpdateController(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - resp, _ := fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) + resp, _ := fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.UpdateController(api.ReplicationController{ JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, @@ -620,10 +631,10 @@ func TestEtcdListServices(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), + Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), }, { - Value: util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "bar"}}), + Value: api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "bar"}}), }, }, }, @@ -681,7 +692,7 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { func TestEtcdGetService(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) service, err := registry.GetService("foo") if err != nil { @@ -733,7 +744,7 @@ func TestEtcdUpdateService(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) fakeClient.TestIndex = true - resp, _ := fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) + resp, _ := fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) testService := api.Service{ JSONBase: api.JSONBase{ID: "foo", ResourceVersion: resp.Node.ModifiedIndex}, diff --git a/pkg/registry/podstorage.go b/pkg/registry/podstorage.go index f1860820e0578..065a8c1ff41b0 100644 --- a/pkg/registry/podstorage.go +++ b/pkg/registry/podstorage.go @@ -116,8 +116,8 @@ func (storage *PodRegistryStorage) fillPodInfo(pod *api.Pod) { } func makePodStatus(pod *api.Pod) api.PodStatus { - if pod.CurrentState.Info == nil { - return api.PodPending + if pod.CurrentState.Info == nil || pod.CurrentState.Host == "" { + return api.PodWaiting } running := 0 stopped := 0 @@ -138,11 +138,11 @@ func makePodStatus(pod *api.Pod) api.PodStatus { case running > 0 && stopped == 0 && unknown == 0: return api.PodRunning case running == 0 && stopped > 0 && unknown == 0: - return api.PodStopped + return api.PodTerminated case running == 0 && stopped == 0 && unknown > 0: - return api.PodPending + return api.PodWaiting default: - return api.PodPending + return api.PodWaiting } } @@ -251,7 +251,7 @@ func (storage *PodRegistryStorage) waitForPodRunning(pod api.Pod) (interface{}, return nil, fmt.Errorf("Error %#v is not an api.Pod!", podObj) } switch podPtr.CurrentState.Status { - case api.PodRunning, api.PodStopped: + case api.PodRunning, api.PodTerminated: return pod, nil default: time.Sleep(storage.podPollPeriod) diff --git a/pkg/registry/podstorage_test.go b/pkg/registry/podstorage_test.go index 6ebbac241a59f..7c4bc5e2af19e 100644 --- a/pkg/registry/podstorage_test.go +++ b/pkg/registry/podstorage_test.go @@ -288,10 +288,13 @@ func TestMakePodStatus(t *testing.T) { }, }, } - pod := &api.Pod{DesiredState: desiredState} + currentState := api.PodState{ + Host: "machine", + } + pod := &api.Pod{DesiredState: desiredState, CurrentState: currentState} status := makePodStatus(pod) - if status != api.PodPending { - t.Errorf("Expected 'Pending', got '%s'", status) + if status != api.PodWaiting { + t.Errorf("Expected 'Waiting', got '%s'", status) } runningState := docker.Container{ @@ -313,6 +316,7 @@ func TestMakePodStatus(t *testing.T) { "containerA": runningState, "containerB": runningState, }, + Host: "machine", }, } status = makePodStatus(pod) @@ -328,11 +332,12 @@ func TestMakePodStatus(t *testing.T) { "containerA": stoppedState, "containerB": stoppedState, }, + Host: "machine", }, } status = makePodStatus(pod) - if status != api.PodStopped { - t.Errorf("Expected 'Stopped', got '%s'", status) + if status != api.PodTerminated { + t.Errorf("Expected 'Terminated', got '%s'", status) } // Mixed state. @@ -343,11 +348,12 @@ func TestMakePodStatus(t *testing.T) { "containerA": runningState, "containerB": stoppedState, }, + Host: "machine", }, } status = makePodStatus(pod) - if status != api.PodPending { - t.Errorf("Expected 'Pending', got '%s'", status) + if status != api.PodWaiting { + t.Errorf("Expected 'Waiting', got '%s'", status) } // Mixed state. @@ -357,11 +363,12 @@ func TestMakePodStatus(t *testing.T) { Info: map[string]docker.Container{ "containerA": runningState, }, + Host: "machine", }, } status = makePodStatus(pod) - if status != api.PodPending { - t.Errorf("Expected 'Pending', got '%s'", status) + if status != api.PodWaiting { + t.Errorf("Expected 'Waiting', got '%s'", status) } } @@ -406,7 +413,7 @@ func TestCreatePod(t *testing.T) { pod: &api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, CurrentState: api.PodState{ - Status: api.PodPending, + Host: "machine", }, }, } From b7752a86d44048bd666a913943a6cad63842d51e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 8 Aug 2014 13:25:36 -0700 Subject: [PATCH 2/3] Add debugging info printing to etcd fake And make tests pass again. --- pkg/api/types.go | 2 +- pkg/api/v1beta1/types.go | 2 +- pkg/registry/etcdregistry.go | 9 +-------- pkg/registry/etcdregistry_test.go | 12 +++++++----- pkg/tools/fake_etcd_client.go | 9 ++++++++- 5 files changed, 18 insertions(+), 16 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index 5972cd0bc24d2..b63abde4bf618 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -209,7 +209,7 @@ type PodStatus string // These are the valid statuses of pods. const ( // PodWaiting means that we're waiting for the pod to begin running. - PodWaiting = "Waiting" + PodWaiting PodStatus = "Waiting" // PodRunning means that the pod is up and running. PodRunning PodStatus = "Running" // PodTerminated means that the pod has stopped. diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 25a5d05558c0c..54e5b489dbeae 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -212,7 +212,7 @@ type PodStatus string // These are the valid statuses of pods. const ( // PodWaiting means that we're waiting for the pod to begin running. - PodWaiting = "Waiting" + PodWaiting PodStatus = "Waiting" // PodRunning means that the pod is up and running. PodRunning PodStatus = "Running" // PodTerminated means that the pod has stopped. diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index de24edb13fba9..bafbd68076c8b 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -86,18 +86,11 @@ func makeContainerKey(machine string) string { // CreatePod creates a pod based on a specification, schedule it onto a specific machine. func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error { - // TODO: When our client supports it, switch to atomic creates. - var pod2 api.Pod - err := registry.helper.ExtractObj(makePodKey(pod.ID), &pod2, false) - if err == nil { - return fmt.Errorf("a pod named %s already exists (%#v)", pod.ID, pod2) - } - // Set status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" - err = registry.helper.SetObj(makePodKey(pod.ID), &pod) + err := registry.helper.CreateObj(makePodKey(pod.ID), &pod) if err != nil { return err } diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcdregistry_test.go index 93c6e323f14a3..3e3fece4c5919 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcdregistry_test.go @@ -141,6 +141,7 @@ func TestEtcdCreatePodAlreadyExisting(t *testing.T) { func TestEtcdCreatePodWithContainersError(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, @@ -160,7 +161,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { }, }) if err == nil { - t.Error("Unexpected non-error") + t.Fatalf("Unexpected non-error") } _, err = fakeClient.Get("/registry/pods/foo", false, false) if err == nil { @@ -173,6 +174,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) + fakeClient.TestIndex = true fakeClient.Data["/registry/pods/foo"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, @@ -202,7 +204,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } resp, err := fakeClient.Get("/registry/pods/foo", false, false) @@ -588,7 +590,7 @@ func TestEtcdCreateController(t *testing.T) { func TestEtcdCreateControllerAlreadyExisting(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/controllers/foo", util.MakeJSONString(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.Set("/registry/controllers/foo", api.EncodeOrDie(api.ReplicationController{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreateController(api.ReplicationController{ @@ -680,7 +682,7 @@ func TestEtcdCreateService(t *testing.T) { func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { fakeClient := tools.MakeFakeEtcdClient(t) - fakeClient.Set("/registry/services/specs/foo", util.MakeJSONString(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) + fakeClient.Set("/registry/services/specs/foo", api.EncodeOrDie(api.Service{JSONBase: api.JSONBase{ID: "foo"}}), 0) registry := MakeTestEtcdRegistry(fakeClient, []string{"machine"}) err := registry.CreateService(api.Service{ JSONBase: api.JSONBase{ID: "foo"}, @@ -782,7 +784,7 @@ func TestEtcdUpdateEndpoints(t *testing.T) { Endpoints: []string{"baz", "bar"}, } - fakeClient.Set("/registry/services/endpoints/foo", util.MakeJSONString(api.Endpoints{}), 0) + fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{}), 0) err := registry.UpdateEndpoints(endpoints) if err != nil { diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 39a0c32078366..cb4c914ecbd5e 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -87,6 +87,7 @@ func (f *FakeEtcdClient) generateIndex() uint64 { } f.ChangeIndex++ + f.t.Logf("generating index %v", f.ChangeIndex) return f.ChangeIndex } @@ -115,7 +116,7 @@ func (f *FakeEtcdClient) Get(key string, sort, recursive bool) (*etcd.Response, func (f *FakeEtcdClient) nodeExists(key string) bool { result, ok := f.Data[key] - return ok && result.R != nil && result.R.Node != nil + return ok && result.R != nil && result.R.Node != nil && result.E == nil } func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Response, error) { @@ -128,6 +129,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons if f.nodeExists(key) { prevResult := f.Data[key] createdIndex := prevResult.R.Node.CreatedIndex + f.t.Logf("updating %v, index %v -> %v", key, createdIndex, i) result := EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -141,6 +143,7 @@ func (f *FakeEtcdClient) setLocked(key, value string, ttl uint64) (*etcd.Respons return result.R, nil } + f.t.Logf("creating %v, index %v", key, i) result := EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ @@ -163,6 +166,7 @@ func (f *FakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, err func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue string, prevIndex uint64) (*etcd.Response, error) { if f.Err != nil { + f.t.Logf("c&s: returning err %v", f.Err) return nil, f.Err } @@ -179,16 +183,19 @@ func (f *FakeEtcdClient) CompareAndSwap(key, value string, ttl uint64, prevValue defer f.Mutex.Unlock() if !f.nodeExists(key) { + f.t.Logf("c&s: node doesn't exist") return nil, EtcdErrorNotFound } prevNode := f.Data[key].R.Node if prevValue != "" && prevValue != prevNode.Value { + f.t.Logf("body didn't match") return nil, EtcdErrorTestFailed } if prevIndex != 0 && prevIndex != prevNode.ModifiedIndex { + f.t.Logf("got index %v but needed %v", prevIndex, prevNode.ModifiedIndex) return nil, EtcdErrorTestFailed } From b5352a81c795ee18fde3da694a22d1ebcc383491 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 11 Aug 2014 15:12:51 -0700 Subject: [PATCH 3/3] Use DesiredState rather than CurrentState for Host. --- pkg/registry/etcdregistry.go | 19 +++++++++++++++---- pkg/registry/etcdregistry_test.go | 8 ++++---- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/registry/etcdregistry.go b/pkg/registry/etcdregistry.go index bafbd68076c8b..2eb9e30deb434 100644 --- a/pkg/registry/etcdregistry.go +++ b/pkg/registry/etcdregistry.go @@ -64,6 +64,10 @@ func (registry *EtcdRegistry) ListPods(selector labels.Selector) ([]api.Pod, err } for _, pod := range allPods { if selector.Matches(labels.Set(pod.Labels)) { + // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets + // the CurrentState.Host and Status fields. Here we pretend that reality perfectly + // matches our desires. + pod.CurrentState.Host = pod.DesiredState.Host filteredPods = append(filteredPods, pod) } } @@ -77,6 +81,10 @@ func (registry *EtcdRegistry) GetPod(podID string) (*api.Pod, error) { if err != nil { return nil, err } + // TODO: Currently nothing sets CurrentState.Host. We need a feedback loop that sets + // the CurrentState.Host and Status fields. Here we pretend that reality perfectly + // matches our desires. + pod.CurrentState.Host = pod.DesiredState.Host return &pod, nil } @@ -86,10 +94,14 @@ func makeContainerKey(machine string) string { // CreatePod creates a pod based on a specification, schedule it onto a specific machine. func (registry *EtcdRegistry) CreatePod(machine string, pod api.Pod) error { - // Set status to "Waiting". + // Set current status to "Waiting". pod.CurrentState.Status = api.PodWaiting pod.CurrentState.Host = "" + // DesiredState.Host == "" is a signal to the scheduler that this pod needs scheduling. + pod.DesiredState.Status = api.PodRunning + pod.DesiredState.Host = "" + err := registry.helper.CreateObj(makePodKey(pod.ID), &pod) if err != nil { return err @@ -112,8 +124,7 @@ func (registry *EtcdRegistry) AssignPod(podID string, machine string) error { if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) } - pod.CurrentState.Host = machine - pod.CurrentState.Status = api.PodWaiting + pod.DesiredState.Host = machine finalPod = pod return pod, nil }, @@ -175,7 +186,7 @@ func (registry *EtcdRegistry) DeletePod(podID string) error { return err } - machine := pod.CurrentState.Host + machine := pod.DesiredState.Host if machine == "" { // Pod was never scheduled anywhere, just return. return nil diff --git a/pkg/registry/etcdregistry_test.go b/pkg/registry/etcdregistry_test.go index 3e3fece4c5919..967e60645bbe9 100644 --- a/pkg/registry/etcdregistry_test.go +++ b/pkg/registry/etcdregistry_test.go @@ -298,7 +298,7 @@ func TestEtcdDeletePod(t *testing.T) { key := "/registry/pods/foo" fakeClient.Set(key, api.EncodeOrDie(api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{Host: "machine"}, + DesiredState: api.PodState{Host: "machine"}, }), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ Items: []api.ContainerManifest{ @@ -334,7 +334,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { key := "/registry/pods/foo" fakeClient.Set(key, api.EncodeOrDie(api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{Host: "machine"}, + DesiredState: api.PodState{Host: "machine"}, }), 0) fakeClient.Set("/registry/hosts/machine/kubelet", api.EncodeOrDie(&api.ContainerManifestList{ Items: []api.ContainerManifest{ @@ -418,13 +418,13 @@ func TestEtcdListPods(t *testing.T) { { Value: api.EncodeOrDie(api.Pod{ JSONBase: api.JSONBase{ID: "foo"}, - CurrentState: api.PodState{Host: "machine"}, + DesiredState: api.PodState{Host: "machine"}, }), }, { Value: api.EncodeOrDie(api.Pod{ JSONBase: api.JSONBase{ID: "bar"}, - CurrentState: api.PodState{Host: "machine"}, + DesiredState: api.PodState{Host: "machine"}, }), }, },