From ad403fde5e4d9ec3a9a91e739dd8bf24a63259c6 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Wed, 8 Oct 2014 15:56:02 -0400 Subject: [PATCH 1/3] Read BoundPods from etcd instead of ContainerManifestList There are three values that uniquely identify a pod on a host - the configuration source (etcd, file, http), the pod name, and the pod namespace. This change ensures that configuration properly makes those names unique by changing podFullName to contain both name (currently ID in v1beta1, Name in v1beta3) and namespace. The Kubelet does not properly handle information requests for pods not in the default namespace at this time. --- pkg/api/conversion.go | 40 +++++ pkg/api/register.go | 4 + pkg/api/serialization_test.go | 4 + pkg/api/v1beta1/register.go | 4 + pkg/api/v1beta2/register.go | 4 + pkg/api/validation/validation.go | 22 +++ pkg/api/validation/validation_test.go | 18 ++ pkg/kubelet/config/config.go | 75 +++++--- pkg/kubelet/config/config_test.go | 121 ++++++++----- pkg/kubelet/config/etcd.go | 28 +-- pkg/kubelet/config/etcd_test.go | 37 ++-- pkg/kubelet/config/file.go | 41 +++-- pkg/kubelet/config/file_test.go | 104 ++++++++--- pkg/kubelet/config/http.go | 39 +++-- pkg/kubelet/config/http_test.go | 33 ++-- pkg/kubelet/dockertools/docker.go | 4 +- pkg/kubelet/kubelet.go | 70 ++++---- pkg/kubelet/kubelet_test.go | 238 +++++++++++++++----------- pkg/kubelet/runonce.go | 27 +-- pkg/kubelet/runonce_test.go | 20 ++- pkg/kubelet/server.go | 78 +++++++-- pkg/kubelet/server_test.go | 133 ++++++++++++-- pkg/kubelet/types.go | 18 +- pkg/kubelet/validation.go | 33 ---- pkg/kubelet/validation_test.go | 43 ----- 25 files changed, 800 insertions(+), 438 deletions(-) delete mode 100644 pkg/kubelet/validation.go delete mode 100644 pkg/kubelet/validation_test.go diff --git a/pkg/api/conversion.go b/pkg/api/conversion.go index 5c8cd1c809701..05323e9e262a9 100644 --- a/pkg/api/conversion.go +++ b/pkg/api/conversion.go @@ -17,9 +17,49 @@ limitations under the License. package api import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" ) // Codec is the identity codec for this package - it can only convert itself // to itself. var Codec = runtime.CodecFor(Scheme, "") + +func init() { + Scheme.AddConversionFuncs( + // Convert ContainerManifest to BoundPod + func(in *ContainerManifest, out *BoundPod, s conversion.Scope) error { + out.Spec.Containers = in.Containers + out.Spec.Volumes = in.Volumes + out.Spec.RestartPolicy = in.RestartPolicy + out.ID = in.ID + out.UID = in.UUID + return nil + }, + func(in *BoundPod, out *ContainerManifest, s conversion.Scope) error { + out.Containers = in.Spec.Containers + out.Volumes = in.Spec.Volumes + out.RestartPolicy = in.Spec.RestartPolicy + out.Version = "v1beta2" + out.ID = in.ID + out.UUID = in.UID + return nil + }, + func(in *ContainerManifestList, out *BoundPods, s conversion.Scope) error { + if err := s.Convert(&in.Items, &out.Items, 0); err != nil { + return err + } + for i := range out.Items { + item := &out.Items[i] + item.ResourceVersion = in.ResourceVersion + } + return nil + }, + func(in *BoundPods, out *ContainerManifestList, s conversion.Scope) error { + if err := s.Convert(&in.Items, &out.Items, 0); err != nil { + return err + } + out.ResourceVersion = in.ResourceVersion + return nil + }) +} diff --git a/pkg/api/register.go b/pkg/api/register.go index 5e7bb8cc42e50..95b22ca87f171 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -40,7 +40,9 @@ func init() { &Binding{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -61,5 +63,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index eb0e24815595b..4feeda1bd8a8c 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -170,6 +170,10 @@ func TestTypes(t *testing.T) { t.Errorf("Couldn't make a %v? %v", kind, err) continue } + if _, err := runtime.FindTypeMeta(item); err != nil { + t.Logf("%s is not a TypeMeta and cannot be round tripped: %v", kind, err) + continue + } runTest(t, v1beta1.Codec, item) runTest(t, v1beta2.Codec, item) runTest(t, api.Codec, item) diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index 7a125b8a92dba..b278d7769ecc2 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -42,7 +42,9 @@ func init() { &ServerOpList{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index 2a5439b87c508..e0319b5ee19b9 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -42,7 +42,9 @@ func init() { &ServerOpList{}, &Event{}, &EventList{}, + &ContainerManifest{}, &ContainerManifestList{}, + &BoundPod{}, &BoundPods{}, ) } @@ -63,5 +65,7 @@ func (*ServerOp) IsAnAPIObject() {} func (*ServerOpList) IsAnAPIObject() {} func (*Event) IsAnAPIObject() {} func (*EventList) IsAnAPIObject() {} +func (*ContainerManifest) IsAnAPIObject() {} func (*ContainerManifestList) IsAnAPIObject() {} +func (*BoundPod) IsAnAPIObject() {} func (*BoundPods) IsAnAPIObject() {} diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index a68dc68114896..25bc6aa6a7603 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -437,3 +437,25 @@ func ValidateReadOnlyPersistentDisks(volumes []api.Volume) errs.ErrorList { } return allErrs } + +// ValidateBoundPod tests if required fields on a bound pod are set. +func ValidateBoundPod(pod *api.BoundPod) (errors []error) { + if !util.IsDNSSubdomain(pod.ID) { + errors = append(errors, errs.NewFieldInvalid("id", pod.ID)) + } + if !util.IsDNSSubdomain(pod.Namespace) { + errors = append(errors, errs.NewFieldInvalid("namespace", pod.Namespace)) + } + containerManifest := &api.ContainerManifest{ + Version: "v1beta2", + ID: pod.ID, + UUID: pod.UID, + Containers: pod.Spec.Containers, + Volumes: pod.Spec.Volumes, + RestartPolicy: pod.Spec.RestartPolicy, + } + if errs := ValidateManifest(containerManifest); len(errs) != 0 { + errors = append(errors, errs...) + } + return errors +} diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 9e0ed6366b388..baa3b0171fd73 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -823,3 +823,21 @@ func TestValidateReplicationController(t *testing.T) { } } } + +func TestValidateBoundPodNoName(t *testing.T) { + errorCases := map[string]api.BoundPod{ + // manifest is tested in api/validation_test.go, ensure it is invoked + "empty version": {TypeMeta: api.TypeMeta{ID: "test"}, Spec: api.PodSpec{Containers: []api.Container{{Name: ""}}}}, + + // Name + "zero-length name": {TypeMeta: api.TypeMeta{ID: ""}}, + "name > 255 characters": {TypeMeta: api.TypeMeta{ID: strings.Repeat("a", 256)}}, + "name not a DNS subdomain": {TypeMeta: api.TypeMeta{ID: "a.b.c."}}, + "name with underscore": {TypeMeta: api.TypeMeta{ID: "a_b_c"}}, + } + for k, v := range errorCases { + if errs := ValidateBoundPod(&v); len(errs) == 0 { + t.Errorf("expected failure for %s", k) + } + } +} diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index a110044f0727a..9c57ccc52d899 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -21,7 +21,9 @@ import ( "reflect" "sync" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/config" @@ -89,7 +91,7 @@ func (c *PodConfig) Sync() { type podStorage struct { podLock sync.RWMutex // map of source name to pod name to pod reference - pods map[string]map[string]*kubelet.Pod + pods map[string]map[string]*api.BoundPod mode PodConfigNotificationMode // ensures that updates are delivered in strict order @@ -103,7 +105,7 @@ type podStorage struct { // TODO: allow initialization of the current state of the store with snapshotted version. func newPodStorage(updates chan<- kubelet.PodUpdate, mode PodConfigNotificationMode) *podStorage { return &podStorage{ - pods: make(map[string]map[string]*kubelet.Pod), + pods: make(map[string]map[string]*api.BoundPod), mode: mode, updates: updates, } @@ -136,12 +138,12 @@ func (s *podStorage) Merge(source string, change interface{}) error { s.updates <- *updates } if len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } case PodConfigNotificationSnapshot: if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 { - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } default: @@ -161,7 +163,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de pods := s.pods[source] if pods == nil { - pods = make(map[string]*kubelet.Pod) + pods = make(map[string]*api.BoundPod) } update := change.(kubelet.PodUpdate) @@ -175,11 +177,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de filtered := filterInvalidPods(update.Pods, source) for _, ref := range filtered { - name := ref.Name + name := podUniqueName(ref) if existing, found := pods[name]; found { - if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + if !reflect.DeepEqual(existing.Spec, ref.Spec) { // this is an update - existing.Manifest = ref.Manifest + existing.Spec = ref.Spec updates.Pods = append(updates.Pods, *existing) continue } @@ -187,7 +189,10 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de continue } // this is an add - ref.Namespace = source + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source pods[name] = ref adds.Pods = append(adds.Pods, *ref) } @@ -195,7 +200,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de case kubelet.REMOVE: glog.V(4).Infof("Removing a pod %v", update) for _, value := range update.Pods { - name := value.Name + name := podUniqueName(&value) if existing, found := pods[name]; found { // this is a delete delete(pods, name) @@ -209,23 +214,26 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de glog.V(4).Infof("Setting pods for source %s : %v", source, update) // Clear the old map entries by just creating a new map oldPods := pods - pods = make(map[string]*kubelet.Pod) + pods = make(map[string]*api.BoundPod) filtered := filterInvalidPods(update.Pods, source) for _, ref := range filtered { - name := ref.Name + name := podUniqueName(ref) if existing, found := oldPods[name]; found { pods[name] = existing - if !reflect.DeepEqual(existing.Manifest, ref.Manifest) { + if !reflect.DeepEqual(existing.Spec, ref.Spec) { // this is an update - existing.Manifest = ref.Manifest + existing.Spec = ref.Spec updates.Pods = append(updates.Pods, *existing) continue } // this is a no-op continue } - ref.Namespace = source + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source pods[name] = ref adds.Pods = append(adds.Pods, *ref) } @@ -246,20 +254,21 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de return adds, updates, deletes } -func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.Pod) { +func filterInvalidPods(pods []api.BoundPod, source string) (filtered []*api.BoundPod) { names := util.StringSet{} for i := range pods { var errors []error - if names.Has(pods[i].Name) { - errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].Name)) + name := podUniqueName(&pods[i]) + if names.Has(name) { + errors = append(errors, apierrs.NewFieldDuplicate("name", pods[i].ID)) } else { - names.Insert(pods[i].Name) + names.Insert(name) } - if errs := kubelet.ValidatePod(&pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&pods[i]); len(errs) != 0 { errors = append(errors, errs...) } if len(errors) > 0 { - glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].Name, source, errors) + glog.Warningf("Pod %d (%s) from %s failed validation, ignoring: %v", i+1, pods[i].ID, source, errors) continue } filtered = append(filtered, &pods[i]) @@ -271,20 +280,32 @@ func filterInvalidPods(pods []kubelet.Pod, source string) (filtered []*kubelet.P func (s *podStorage) Sync() { s.updateLock.Lock() defer s.updateLock.Unlock() - s.updates <- kubelet.PodUpdate{s.MergedState().([]kubelet.Pod), kubelet.SET} + s.updates <- kubelet.PodUpdate{s.MergedState().([]api.BoundPod), kubelet.SET} } // Object implements config.Accessor func (s *podStorage) MergedState() interface{} { s.podLock.RLock() defer s.podLock.RUnlock() - pods := make([]kubelet.Pod, 0) - for source, sourcePods := range s.pods { + var pods []api.BoundPod + for _, sourcePods := range s.pods { for _, podRef := range sourcePods { - pod := *podRef - pod.Namespace = source - pods = append(pods, pod) + pod, err := api.Scheme.Copy(podRef) + if err != nil { + glog.Errorf("unable to copy pod: %v", err) + } + pods = append(pods, *pod.(*api.BoundPod)) } } return pods } + +// podUniqueName returns a value for a given pod that is unique across a source, +// which is the combination of namespace and ID. +func podUniqueName(pod *api.BoundPod) string { + namespace := pod.Namespace + if len(namespace) == 0 { + namespace = api.NamespaceDefault + } + return fmt.Sprintf("%s.%s", pod.ID, namespace) +} diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 05d85386255fa..4671c65aa80fe 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -18,6 +18,7 @@ package config import ( "reflect" + "sort" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -32,7 +33,7 @@ func expectEmptyChannel(t *testing.T, ch <-chan interface{}) { } } -type sortedPods []kubelet.Pod +type sortedPods []api.BoundPod func (s sortedPods) Len() int { return len(s) @@ -41,25 +42,27 @@ func (s sortedPods) Swap(i, j int) { s[i], s[j] = s[j], s[i] } func (s sortedPods) Less(i, j int) bool { - if s[i].Namespace < s[j].Namespace { - return true - } - return s[i].Name < s[j].Name + return s[i].ID < s[j].ID } -func CreateValidPod(name, namespace string) kubelet.Pod { - return kubelet.Pod{ - Name: name, - Namespace: namespace, - Manifest: api.ContainerManifest{ - Version: "v1beta1", +func CreateValidPod(name, namespace, source string) api.BoundPod { + return api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: name, + Namespace: namespace, + Annotations: map[string]string{kubelet.ConfigSourceAnnotationKey: source}, + }, + Spec: api.PodSpec{ RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, }, } } -func CreatePodUpdate(op kubelet.PodOperation, pods ...kubelet.Pod) kubelet.PodUpdate { - newPods := make([]kubelet.Pod, len(pods)) +func CreatePodUpdate(op kubelet.PodOperation, pods ...api.BoundPod) kubelet.PodUpdate { + if len(pods) == 0 { + return kubelet.PodUpdate{Op: op} + } + newPods := make([]api.BoundPod, len(pods)) for i := range pods { newPods[i] = pods[i] } @@ -76,6 +79,7 @@ func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kubelet.PodUpdate) { for i := range expected { update := <-ch + sort.Sort(sortedPods(update.Pods)) if !reflect.DeepEqual(expected[i], update) { t.Fatalf("Expected %#v, Got %#v", expected[i], update) } @@ -95,24 +99,63 @@ func TestNewPodAdded(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) +} + +func TestNewPodAddedInvalidNamespace(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "", "")) + channel <- podUpdate + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET)) +} + +func TestNewPodAddedDefaultNamespace(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + + config.Sync() + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"))) +} + +func TestNewPodAddedDifferentNamespaces(t *testing.T) { + channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + + // see an update + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "")) + channel <- podUpdate + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "default", "test"))) + + // see an update in another namespace + podUpdate = CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) } func TestInvalidPodFiltered(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) // add an invalid update - podUpdate = CreatePodUpdate(kubelet.UPDATE, kubelet.Pod{Name: "foo"}) + podUpdate = CreatePodUpdate(kubelet.UPDATE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo"}}) channel <- podUpdate expectNoPodUpdate(t, ch) } @@ -121,16 +164,16 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} channel <- CreatePodUpdate(kubelet.ADD, pod) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) } @@ -139,16 +182,16 @@ func TestNewPodAddedSnapshot(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, CreateValidPod("foo", "new", "test"))) // container updates are separated as UPDATE pod := podUpdate.Pods[0] - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} channel <- CreatePodUpdate(kubelet.ADD, pod) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, pod)) } @@ -157,21 +200,21 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // an kubelet.ADD should be converted to kubelet.UPDATE - pod := CreateValidPod("foo", "test") - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} + pod := CreateValidPod("foo", "new", "test") + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} podUpdate = CreatePodUpdate(kubelet.ADD, pod) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, pod)) - podUpdate = CreatePodUpdate(kubelet.REMOVE, kubelet.Pod{Name: "foo"}) + podUpdate = CreatePodUpdate(kubelet.REMOVE, api.BoundPod{TypeMeta: api.TypeMeta{ID: "foo", Namespace: "new"}}) channel <- podUpdate expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, pod)) } @@ -180,20 +223,20 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", ""), CreateValidPod("foo2", ""), CreateValidPod("foo3", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "test"), CreateValidPod("foo2", "test"), CreateValidPod("foo3", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE - pod := CreateValidPod("foo2", "test") - pod.Manifest.Containers = []api.Container{{Name: "bar", Image: "test"}} - podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", ""), CreateValidPod("foo4", "test")) + pod := CreateValidPod("foo2", "new", "test") + pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test"}} + podUpdate = CreatePodUpdate(kubelet.SET, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) channel <- podUpdate expectPodUpdate(t, ch, - CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "test")), - CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "test")), + CreatePodUpdate(kubelet.REMOVE, CreateValidPod("foo", "new", "test")), + CreatePodUpdate(kubelet.ADD, CreateValidPod("foo4", "new", "test")), CreatePodUpdate(kubelet.UPDATE, pod)) } diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index a8a822c3b83ec..d53e787b5552f 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "path" + "strconv" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -86,21 +87,26 @@ func (s *SourceEtcd) run() { // eventToPods takes a watch.Event object, and turns it into a structured list of pods. // It returns a list of containers, or an error if one occurs. -func eventToPods(ev watch.Event) ([]kubelet.Pod, error) { - pods := []kubelet.Pod{} - manifests, ok := ev.Object.(*api.ContainerManifestList) +func eventToPods(ev watch.Event) ([]api.BoundPod, error) { + pods := []api.BoundPod{} + boundPods, ok := ev.Object.(*api.BoundPods) if !ok { - return pods, errors.New("unable to parse response as ContainerManifestList") + return pods, errors.New("unable to parse response as BoundPods") } - for i, manifest := range manifests.Items { - name := manifest.ID - if name == "" { - name = fmt.Sprintf("%d", i+1) + for i, pod := range boundPods.Items { + if len(pod.ID) == 0 { + pod.ID = fmt.Sprintf("%d", i+1) } - pods = append(pods, kubelet.Pod{ - Name: name, - Manifest: manifest}) + // TODO: generate random UID if not present + if pod.UID == "" && !pod.CreationTimestamp.IsZero() { + pod.UID = strconv.FormatInt(pod.CreationTimestamp.Unix(), 10) + } + // Backwards compatibility with old api servers + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault + } + pods = append(pods, pod) } return pods, nil diff --git a/pkg/kubelet/config/etcd_test.go b/pkg/kubelet/config/etcd_test.go index 917acd40e53e8..6e3cb10126e82 100644 --- a/pkg/kubelet/config/etcd_test.go +++ b/pkg/kubelet/config/etcd_test.go @@ -21,53 +21,52 @@ import ( "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestEventToPods(t *testing.T) { tests := []struct { input watch.Event - pods []kubelet.Pod + pods []api.BoundPod fail bool }{ { input: watch.Event{Object: nil}, - pods: []kubelet.Pod{}, + pods: []api.BoundPod{}, fail: true, }, { - input: watch.Event{Object: &api.ContainerManifestList{}}, - pods: []kubelet.Pod{}, + input: watch.Event{Object: &api.BoundPods{}}, + pods: []api.BoundPod{}, fail: false, }, { input: watch.Event{ - Object: &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, + Object: &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }, }, - pods: []kubelet.Pod{ - {Name: "foo", Manifest: api.ContainerManifest{ID: "foo"}}, - {Name: "bar", Manifest: api.ContainerManifest{ID: "bar"}}, + pods: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo", Namespace: "default"}, Spec: api.PodSpec{}}, + {TypeMeta: api.TypeMeta{ID: "bar", Namespace: "default"}, Spec: api.PodSpec{}}, }, fail: false, }, { input: watch.Event{ - Object: &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: ""}, - {ID: ""}, + Object: &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "1"}}, + {TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}}, }, }, }, - pods: []kubelet.Pod{ - {Name: "1", Manifest: api.ContainerManifest{ID: ""}}, - {Name: "2", Manifest: api.ContainerManifest{ID: ""}}, + pods: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "1", Namespace: "default"}, Spec: api.PodSpec{}}, + {TypeMeta: api.TypeMeta{ID: "2", Namespace: "foo"}, Spec: api.PodSpec{}}, }, fail: false, }, diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index 111f7cf83f0ec..107f8d3dced37 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -29,8 +29,10 @@ import ( "strings" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" "gopkg.in/v1/yaml" ) @@ -79,7 +81,7 @@ func (s *SourceFile) extractFromPath() error { if err != nil { return err } - s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} default: return fmt.Errorf("path is not a directory or file") @@ -88,28 +90,29 @@ func (s *SourceFile) extractFromPath() error { return nil } -func extractFromDir(name string) ([]kubelet.Pod, error) { - pods := []kubelet.Pod{} - +func extractFromDir(name string) ([]api.BoundPod, error) { files, err := filepath.Glob(filepath.Join(name, "[^.]*")) if err != nil { - return pods, err + return nil, err + } + if len(files) == 0 { + return nil, nil } sort.Strings(files) - + pods := []api.BoundPod{} for _, file := range files { pod, err := extractFromFile(file) if err != nil { - return []kubelet.Pod{}, err + return nil, err } pods = append(pods, pod) } return pods, nil } -func extractFromFile(name string) (kubelet.Pod, error) { - var pod kubelet.Pod +func extractFromFile(name string) (api.BoundPod, error) { + var pod api.BoundPod file, err := os.Open(name) if err != nil { @@ -123,15 +126,23 @@ func extractFromFile(name string) (kubelet.Pod, error) { return pod, err } - if err := yaml.Unmarshal(data, &pod.Manifest); err != nil { - return pod, fmt.Errorf("could not unmarshal manifest: %v", err) + manifest := &api.ContainerManifest{} + // TODO: use api.Scheme.DecodeInto + if err := yaml.Unmarshal(data, manifest); err != nil { + return pod, err } - podName := pod.Manifest.ID - if podName == "" { - podName = simpleSubdomainSafeHash(name) + if err := api.Scheme.Convert(manifest, &pod); err != nil { + return pod, err + } + + pod.ID = simpleSubdomainSafeHash(name) + if len(pod.UID) == 0 { + pod.UID = simpleSubdomainSafeHash(name) + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault } - pod.Name = podName return pod, nil } diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go index 4fed13fd2981f..0e280f49e7e51 100644 --- a/pkg/kubelet/config/file_test.go +++ b/pkg/kubelet/config/file_test.go @@ -26,10 +26,57 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" + "gopkg.in/v1/yaml" ) +func ExampleManifestAndPod(id string) (api.ContainerManifest, api.BoundPod) { + manifest := api.ContainerManifest{ + ID: id, + UUID: "uid", + Containers: []api.Container{ + { + Name: "c" + id, + Image: "foo", + }, + }, + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, + }, + }, + } + expectedPod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: id, + UID: "uid", + Namespace: "default", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "c" + id, + Image: "foo", + }, + }, + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, + }, + }, + }, + } + return manifest, expectedPod +} + func TestExtractFromNonExistentFile(t *testing.T) { ch := make(chan interface{}, 1) c := SourceFile{"/some/fake/file", ch} @@ -70,16 +117,18 @@ func TestReadFromFile(t *testing.T) { select { case got := <-ch: update := got.(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{ - Name: "test", - Manifest: api.ContainerManifest{ - ID: "test", - Version: "v1beta1", + expected := CreatePodUpdate(kubelet.SET, api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: simpleSubdomainSafeHash(file.Name()), + UID: simpleSubdomainSafeHash(file.Name()), + Namespace: "default", + }, + Spec: api.PodSpec{ Containers: []api.Container{{Image: "test/image"}}, }, }) if !reflect.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Fatalf("Expected %#v, Got %#v", expected, update) } case <-time.After(2 * time.Millisecond): @@ -95,29 +144,31 @@ func TestExtractFromBadDataFile(t *testing.T) { c := SourceFile{file.Name(), ch} err := c.extractFromPath() if err == nil { - t.Errorf("Expected error") + t.Fatalf("Expected error") } expectEmptyChannel(t, ch) } func TestExtractFromValidDataFile(t *testing.T) { - manifest := api.ContainerManifest{ID: ""} + manifest, expectedPod := ExampleManifestAndPod("id") text, err := json.Marshal(manifest) if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } file := writeTestFile(t, os.TempDir(), "test_pod_config", string(text)) defer os.Remove(file.Name()) + expectedPod.ID = simpleSubdomainSafeHash(file.Name()) + ch := make(chan interface{}, 1) c := SourceFile{file.Name(), ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate(kubelet.SET, kubelet.Pod{Name: simpleSubdomainSafeHash(file.Name()), Manifest: manifest}) + expected := CreatePodUpdate(kubelet.SET, expectedPod) if !reflect.DeepEqual(expected, update) { t.Errorf("Expected %#v, Got %#v", expected, update) } @@ -126,7 +177,7 @@ func TestExtractFromValidDataFile(t *testing.T) { func TestExtractFromEmptyDir(t *testing.T) { dirName, err := ioutil.TempDir("", "foo") if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } defer os.RemoveAll(dirName) @@ -134,7 +185,7 @@ func TestExtractFromEmptyDir(t *testing.T) { c := SourceFile{dirName, ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) @@ -145,54 +196,55 @@ func TestExtractFromEmptyDir(t *testing.T) { } func TestExtractFromDir(t *testing.T) { - manifests := []api.ContainerManifest{ - {Version: "v1beta1", Containers: []api.Container{{Name: "1", Image: "foo"}}}, - {Version: "v1beta1", Containers: []api.Container{{Name: "2", Image: "bar"}}}, - } + manifest, expectedPod := ExampleManifestAndPod("1") + manifest2, expectedPod2 := ExampleManifestAndPod("2") + + manifests := []api.ContainerManifest{manifest, manifest2} + pods := []api.BoundPod{expectedPod, expectedPod2} files := make([]*os.File, len(manifests)) dirName, err := ioutil.TempDir("", "foo") if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } for i, manifest := range manifests { data, err := json.Marshal(manifest) if err != nil { t.Errorf("Unexpected error: %v", err) + continue } file, err := ioutil.TempFile(dirName, manifest.ID) if err != nil { t.Errorf("Unexpected error: %v", err) + continue } name := file.Name() if err := file.Close(); err != nil { t.Errorf("Unexpected error: %v", err) + continue } ioutil.WriteFile(name, data, 0755) files[i] = file + pods[i].ID = simpleSubdomainSafeHash(name) } ch := make(chan interface{}, 1) c := SourceFile{dirName, ch} err = c.extractFromPath() if err != nil { - t.Errorf("Unexpected error: %v", err) + t.Fatalf("Unexpected error: %v", err) } update := (<-ch).(kubelet.PodUpdate) - expected := CreatePodUpdate( - kubelet.SET, - kubelet.Pod{Name: simpleSubdomainSafeHash(files[0].Name()), Manifest: manifests[0]}, - kubelet.Pod{Name: simpleSubdomainSafeHash(files[1].Name()), Manifest: manifests[1]}, - ) + expected := CreatePodUpdate(kubelet.SET, pods...) sort.Sort(sortedPods(update.Pods)) sort.Sort(sortedPods(expected.Pods)) if !reflect.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) + t.Fatalf("Expected %#v, Got %#v", expected, update) } for i := range update.Pods { - if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 { t.Errorf("Expected no validation errors on %#v, Got %#v", update.Pods[i], errs) } } diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index 6aca6333b2f90..455201b13b324 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -28,6 +28,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" "gopkg.in/v1/yaml" ) @@ -79,6 +80,7 @@ func (s *SourceURL) extractFromURL() error { // First try as if it's a single manifest var manifest api.ContainerManifest + // TODO: should be api.Scheme.Decode singleErr := yaml.Unmarshal(data, &manifest) if singleErr == nil { if errs := validation.ValidateManifest(&manifest); len(errs) > 0 { @@ -86,16 +88,23 @@ func (s *SourceURL) extractFromURL() error { } } if singleErr == nil { - pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest} - if pod.Name == "" { - pod.Name = "1" + pod := api.BoundPod{} + if err := api.Scheme.Convert(&manifest, &pod); err != nil { + return err + } + if len(pod.ID) == 0 { + pod.ID = "1" + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault } - s.updates <- kubelet.PodUpdate{[]kubelet.Pod{pod}, kubelet.SET} + s.updates <- kubelet.PodUpdate{[]api.BoundPod{pod}, kubelet.SET} return nil } // That didn't work, so try an array of manifests. var manifests []api.ContainerManifest + // TODO: should be api.Scheme.Decode multiErr := yaml.Unmarshal(data, &manifests) // We're not sure if the person reading the logs is going to care about the single or // multiple manifest unmarshalling attempt, so we need to put both in the logs, as is @@ -113,18 +122,24 @@ func (s *SourceURL) extractFromURL() error { // array of manifests (and no error) when unmarshaled as such. In that case, // if the single manifest at least had a Version, we return the single-manifest // error (if any). - if len(manifests) == 0 && manifest.Version != "" { + if len(manifests) == 0 && len(manifest.Version) != 0 { return singleErr } - pods := []kubelet.Pod{} - for i, manifest := range manifests { - pod := kubelet.Pod{Name: manifest.ID, Manifest: manifest} - if pod.Name == "" { - pod.Name = fmt.Sprintf("%d", i+1) + list := api.ContainerManifestList{Items: manifests} + boundPods := &api.BoundPods{} + if err := api.Scheme.Convert(&list, boundPods); err != nil { + return err + } + for i := range boundPods.Items { + pod := &boundPods.Items[i] + if len(pod.ID) == 0 { + pod.ID = fmt.Sprintf("%d", i+1) + } + if len(pod.Namespace) == 0 { + pod.Namespace = api.NamespaceDefault } - pods = append(pods, pod) } - s.updates <- kubelet.PodUpdate{pods, kubelet.SET} + s.updates <- kubelet.PodUpdate{boundPods.Items, kubelet.SET} return nil } diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index e561d0d7d5738..13a08aeaf4137 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -122,11 +123,12 @@ func TestExtractFromHTTP(t *testing.T) { desc: "Single manifest", manifests: api.ContainerManifest{Version: "v1beta1", ID: "foo"}, expected: CreatePodUpdate(kubelet.SET, - kubelet.Pod{ - Name: "foo", - Manifest: api.ContainerManifest{ - Version: "v1beta1", - ID: "foo", + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "default", + }, + Spec: api.PodSpec{ RestartPolicy: api.RestartPolicy{Always: &api.RestartPolicyAlways{}}, }, }), @@ -138,13 +140,19 @@ func TestExtractFromHTTP(t *testing.T) { {Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, }, expected: CreatePodUpdate(kubelet.SET, - kubelet.Pod{ - Name: "1", - Manifest: api.ContainerManifest{Version: "v1beta1", ID: "", Containers: []api.Container{{Name: "1", Image: "foo"}}}, + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "1", + Namespace: "default", + }, + Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}}, }, - kubelet.Pod{ - Name: "bar", - Manifest: api.ContainerManifest{Version: "v1beta1", ID: "bar", Containers: []api.Container{{Name: "1", Image: "foo"}}}, + api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "bar", + Namespace: "default", + }, + Spec: api.PodSpec{Containers: []api.Container{{Name: "1", Image: "foo"}}}, }), }, { @@ -167,13 +175,14 @@ func TestExtractFromHTTP(t *testing.T) { c := SourceURL{testServer.URL, ch, nil} if err := c.extractFromURL(); err != nil { t.Errorf("%s: Unexpected error: %v", testCase.desc, err) + continue } update := (<-ch).(kubelet.PodUpdate) if !reflect.DeepEqual(testCase.expected, update) { t.Errorf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update) } for i := range update.Pods { - if errs := kubelet.ValidatePod(&update.Pods[i]); len(errs) != 0 { + if errs := validation.ValidateBoundPod(&update.Pods[i]); len(errs) != 0 { t.Errorf("%s: Expected no validation errors on %#v, Got %#v", testCase.desc, update.Pods[i], errs) } } diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index 6dc70d300c688..ecb2229b6dfb5 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -212,7 +212,7 @@ func GetKubeletDockerContainers(client DockerInterface, allContainers bool) (Doc // TODO(dchen1107): Remove the old separator "--" by end of Oct if !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"_") && !strings.HasPrefix(container.Names[0], "/"+containerNamePrefix+"--") { - glog.Infof("Docker Container:%s is not managed by kubelet.", container.Names[0]) + glog.Infof("Docker Container: %s is not managed by kubelet.", container.Names[0]) continue } result[DockerID(container.ID)] = container @@ -330,7 +330,7 @@ func inspectContainer(client DockerInterface, dockerID, containerName string) (* } // GetDockerPodInfo returns docker info for all containers in the pod/manifest. -func GetDockerPodInfo(client DockerInterface, manifest api.ContainerManifest, podFullName, uuid string) (api.PodInfo, error) { +func GetDockerPodInfo(client DockerInterface, manifest api.PodSpec, podFullName, uuid string) (api.PodInfo, error) { info := api.PodInfo{} containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 32e6b02e385f7..d681e453a7fc8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -55,7 +55,7 @@ type CadvisorInterface interface { // SyncHandler is an interface implemented by Kubelet, for testability type SyncHandler interface { - SyncPods([]Pod) error + SyncPods([]api.BoundPod) error } type volumeMap map[string]volume.Interface @@ -111,7 +111,7 @@ type Kubelet struct { networkContainerImage string podWorkers *podWorkers resyncInterval time.Duration - pods []Pod + pods []api.BoundPod // Optional, no events will be sent without it etcdClient tools.EtcdClient @@ -213,7 +213,7 @@ func makeEnvironmentVariables(container *api.Container) []string { return result } -func makeBinds(pod *Pod, container *api.Container, podVolumes volumeMap) []string { +func makeBinds(pod *api.BoundPod, container *api.Container, podVolumes volumeMap) []string { binds := []string{} for _, mount := range container.VolumeMounts { vol, ok := podVolumes[mount.Name] @@ -276,10 +276,10 @@ func milliCPUToShares(milliCPU int) int { return shares } -func (kl *Kubelet) mountExternalVolumes(manifest *api.ContainerManifest) (volumeMap, error) { +func (kl *Kubelet) mountExternalVolumes(pod *api.BoundPod) (volumeMap, error) { podVolumes := make(volumeMap) - for _, vol := range manifest.Volumes { - extVolume, err := volume.CreateVolumeBuilder(&vol, manifest.ID, kl.rootDirectory) + for _, vol := range pod.Spec.Volumes { + extVolume, err := volume.CreateVolumeBuilder(&vol, pod.ID, kl.rootDirectory) if err != nil { return nil, err } @@ -323,18 +323,18 @@ func (kl *Kubelet) runHandler(podFullName, uuid string, container *api.Container } // Run a single container from a pod. Returns the docker container ID -func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) { +func (kl *Kubelet) runContainer(pod *api.BoundPod, container *api.Container, podVolumes volumeMap, netMode string) (id dockertools.DockerID, err error) { envVariables := makeEnvironmentVariables(container) binds := makeBinds(pod, container, podVolumes) exposedPorts, portBindings := makePortsAndBindings(container) opts := docker.CreateContainerOptions{ - Name: dockertools.BuildDockerName(pod.Manifest.UUID, GetPodFullName(pod), container), + Name: dockertools.BuildDockerName(pod.UID, GetPodFullName(pod), container), Config: &docker.Config{ Cmd: container.Command, Env: envVariables, ExposedPorts: exposedPorts, - Hostname: pod.Name, + Hostname: pod.ID, Image: container.Image, Memory: int64(container.Memory), CpuShares: int64(milliCPUToShares(container.CPU)), @@ -358,7 +358,7 @@ func (kl *Kubelet) runContainer(pod *Pod, container *api.Container, podVolumes v Privileged: privileged, }) if err == nil && container.Lifecycle != nil && container.Lifecycle.PostStart != nil { - handlerErr := kl.runHandler(GetPodFullName(pod), pod.Manifest.UUID, container, container.Lifecycle.PostStart) + handlerErr := kl.runHandler(GetPodFullName(pod), pod.UID, container, container.Lifecycle.PostStart) if handlerErr != nil { kl.killContainerByID(dockerContainer.ID, "") return dockertools.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr) @@ -392,11 +392,11 @@ const ( ) // createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. -func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error) { +func (kl *Kubelet) createNetworkContainer(pod *api.BoundPod) (dockertools.DockerID, error) { var ports []api.Port // Docker only exports ports from the network container. Let's // collect all of the relevant ports and export them. - for _, container := range pod.Manifest.Containers { + for _, container := range pod.Spec.Containers { ports = append(ports, container.Ports...) } container := &api.Container{ @@ -419,12 +419,12 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error // Delete all containers in a pod (except the network container) returns the number of containers deleted // and an error if one occurs. -func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) { +func (kl *Kubelet) deleteAllContainers(pod *api.BoundPod, podFullName string, dockerContainers dockertools.DockerContainers) (int, error) { count := 0 - errs := make(chan error, len(pod.Manifest.Containers)) + errs := make(chan error, len(pod.Spec.Containers)) wg := sync.WaitGroup{} - for _, container := range pod.Manifest.Containers { - if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.Manifest.UUID, container.Name); found { + for _, container := range pod.Spec.Containers { + if dockerContainer, found, _ := dockerContainers.FindPodContainer(podFullName, pod.UID, container.Name); found { count++ wg.Add(1) go func() { @@ -451,9 +451,9 @@ func (kl *Kubelet) deleteAllContainers(pod *Pod, podFullName string, dockerConta type empty struct{} -func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContainers) error { +func (kl *Kubelet) syncPod(pod *api.BoundPod, dockerContainers dockertools.DockerContainers) error { podFullName := GetPodFullName(pod) - uuid := pod.Manifest.UUID + uuid := pod.UID containersToKeep := make(map[dockertools.DockerID]empty) killedContainers := make(map[dockertools.DockerID]empty) @@ -484,7 +484,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine } containersToKeep[netID] = empty{} - podVolumes, err := kl.mountExternalVolumes(&pod.Manifest) + podVolumes, err := kl.mountExternalVolumes(pod) if err != nil { glog.Errorf("Unable to mount volumes for pod %s: (%v) Skipping pod.", podFullName, err) return err @@ -501,7 +501,7 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine podState.PodIP = netInfo.PodIP } - for _, container := range pod.Manifest.Containers { + for _, container := range pod.Spec.Containers { expectedHash := dockertools.HashContainer(&container) if dockerContainer, found, hash := dockerContainers.FindPodContainer(podFullName, uuid, container.Name); found { containerID := dockertools.DockerID(dockerContainer.ID) @@ -538,13 +538,13 @@ func (kl *Kubelet) syncPod(pod *Pod, dockerContainers dockertools.DockerContaine // TODO(dawnchen): error handling here? } - if len(recentContainers) > 0 && pod.Manifest.RestartPolicy.Always == nil { - if pod.Manifest.RestartPolicy.Never != nil { + if len(recentContainers) > 0 && pod.Spec.RestartPolicy.Always == nil { + if pod.Spec.RestartPolicy.Never != nil { glog.V(3).Infof("Already ran container with name %s--%s--%s, do nothing", podFullName, uuid, container.Name) continue } - if pod.Manifest.RestartPolicy.OnFailure != nil { + if pod.Spec.RestartPolicy.OnFailure != nil { // Check the exit code of last run if recentContainers[0].State.ExitCode == 0 { glog.V(3).Infof("Already successfully ran container with name %s--%s--%s, do nothing", @@ -605,11 +605,11 @@ type podContainer struct { // Stores all volumes defined by the set of pods into a map. // Keys for each entry are in the format (POD_ID)/(VOLUME_NAME) -func getDesiredVolumes(pods []Pod) map[string]api.Volume { +func getDesiredVolumes(pods []api.BoundPod) map[string]api.Volume { desiredVolumes := make(map[string]api.Volume) for _, pod := range pods { - for _, volume := range pod.Manifest.Volumes { - identifier := path.Join(pod.Manifest.ID, volume.Name) + for _, volume := range pod.Spec.Volumes { + identifier := path.Join(pod.ID, volume.Name) desiredVolumes[identifier] = volume } } @@ -618,7 +618,7 @@ func getDesiredVolumes(pods []Pod) map[string]api.Volume { // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. -func (kl *Kubelet) reconcileVolumes(pods []Pod) error { +func (kl *Kubelet) reconcileVolumes(pods []api.BoundPod) error { desiredVolumes := getDesiredVolumes(pods) currentVolumes := volume.GetCurrentVolumes(kl.rootDirectory) for name, vol := range currentVolumes { @@ -637,7 +637,7 @@ func (kl *Kubelet) reconcileVolumes(pods []Pod) error { } // SyncPods synchronizes the configured list of pods (desired state) with the host current state. -func (kl *Kubelet) SyncPods(pods []Pod) error { +func (kl *Kubelet) SyncPods(pods []api.BoundPod) error { glog.V(4).Infof("Desired [%s]: %+v", kl.hostname, pods) var err error desiredContainers := make(map[podContainer]empty) @@ -652,11 +652,11 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { for ix := range pods { pod := &pods[ix] podFullName := GetPodFullName(pod) - uuid := pod.Manifest.UUID + uuid := pod.UID // Add all containers (including net) to the map. desiredContainers[podContainer{podFullName, uuid, networkContainerName}] = empty{} - for _, cont := range pod.Manifest.Containers { + for _, cont := range pod.Spec.Containers { desiredContainers[podContainer{podFullName, uuid, cont.Name}] = empty{} } @@ -693,13 +693,13 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } // filterHostPortConflicts removes pods that conflict on Port.HostPort values -func filterHostPortConflicts(pods []Pod) []Pod { - filtered := []Pod{} +func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { + filtered := []api.BoundPod{} ports := map[int]bool{} extract := func(p *api.Port) int { return p.HostPort } for i := range pods { pod := &pods[i] - if errs := validation.AccumulateUniquePorts(pod.Manifest.Containers, ports, extract); len(errs) != 0 { + if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { glog.Warningf("Pod %s has conflicting ports, ignoring: %v", GetPodFullName(pod), errs) continue } @@ -784,10 +784,10 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName, tail stri // GetPodInfo returns information from Docker about the containers in a pod func (kl *Kubelet) GetPodInfo(podFullName, uuid string) (api.PodInfo, error) { - var manifest api.ContainerManifest + var manifest api.PodSpec for _, pod := range kl.pods { if GetPodFullName(&pod) == podFullName { - manifest = pod.Manifest + manifest = pod.Spec break } } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index df323fccc262c..b0dff6f97056d 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -125,7 +125,7 @@ func TestKillContainer(t *testing.T) { } type channelReader struct { - list [][]Pod + list [][]api.BoundPod wg sync.WaitGroup } @@ -145,7 +145,7 @@ func startReading(channel <-chan interface{}) *channelReader { return cr } -func (cr *channelReader) GetList() [][]Pod { +func (cr *channelReader) GetList() [][]api.BoundPod { cr.wg.Wait() return cr.list } @@ -156,21 +156,23 @@ func TestSyncPodsDoesNothing(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // format is k8s__ - Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.test"}, + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.new.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ container, }, @@ -209,12 +211,14 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) kubelet.networkContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -242,8 +246,8 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { } if len(fakeDocker.Created) != 2 || - !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { + !matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) || + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -255,12 +259,14 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { puller.HasImages = []string{} kubelet.networkContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -282,8 +288,8 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { } if len(fakeDocker.Created) != 2 || - !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { + !matchString(t, "k8s_net\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) || + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[1]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -294,16 +300,18 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -320,7 +328,7 @@ func TestSyncPodsWithNetCreatesContainer(t *testing.T) { fakeDocker.Lock() if len(fakeDocker.Created) != 1 || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) { + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -333,16 +341,18 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ { Name: "bar", @@ -370,7 +380,7 @@ func TestSyncPodsWithNetCreatesContainerCallsHandler(t *testing.T) { fakeDocker.Lock() if len(fakeDocker.Created) != 1 || - !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) { + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.new.test_", fakeDocker.Created[0]) { t.Errorf("Unexpected containers created %v", fakeDocker.Created) } fakeDocker.Unlock() @@ -384,16 +394,18 @@ func TestSyncPodsDeletesWithNoNetContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // format is k8s__ - Names: []string{"/k8s_bar_foo.test"}, + Names: []string{"/k8s_bar_foo.new.test"}, ID: "1234", }, } - err := kubelet.SyncPods([]Pod{ + err := kubelet.SyncPods([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -425,12 +437,12 @@ func TestSyncPodsDeletes(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_foo_bar.test"}, + Names: []string{"/k8s_foo_bar.new.test"}, ID: "1234", }, { // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, { @@ -438,7 +450,7 @@ func TestSyncPodsDeletes(t *testing.T) { ID: "4567", }, } - err := kubelet.SyncPods([]Pod{}) + err := kubelet.SyncPods([]api.BoundPod{}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -463,30 +475,32 @@ func TestSyncPodDeletesDuplicate(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_foo_bar.test_1"}, + Names: []string{"/k8s_foo_bar.new.test_1"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_bar.test_"}, + Names: []string{"/k8s_net_bar.new.test_"}, ID: "9876", }, "4567": &docker.APIContainers{ // Duplicate for the same container. - Names: []string{"/k8s_foo_bar.test_2"}, + Names: []string{"/k8s_foo_bar.new.test_2"}, ID: "4567", }, "2304": &docker.APIContainers{ // Container for another pod, untouched. - Names: []string{"/k8s_baz_fiz.test_6"}, + Names: []string{"/k8s_baz_fiz.new.test_6"}, ID: "2304", }, } - err := kubelet.syncPod(&Pod{ - Name: "bar", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "bar", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "bar", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "foo"}, }, @@ -520,20 +534,22 @@ func TestSyncPodBadHash(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar.1234_foo.test"}, + Names: []string{"/k8s_bar.1234_foo.new.test"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -562,20 +578,22 @@ func TestSyncPodUnhealthy(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "1234": &docker.APIContainers{ // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar_foo.test"}, + Names: []string{"/k8s_bar_foo.new.test"}, ID: "1234", }, "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar", LivenessProbe: &api.LivenessProbe{ @@ -629,25 +647,31 @@ func TestMakeEnvVariables(t *testing.T) { func TestMountExternalVolumes(t *testing.T) { kubelet, _, _ := newTestKubelet(t) - manifest := api.ContainerManifest{ - Volumes: []api.Volume{ - { - Name: "host-dir", - Source: &api.VolumeSource{ - HostDir: &api.HostDir{"/dir/path"}, + pod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "test", + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "host-dir", + Source: &api.VolumeSource{ + HostDir: &api.HostDir{"/dir/path"}, + }, }, }, }, } - podVolumes, _ := kubelet.mountExternalVolumes(&manifest) + podVolumes, _ := kubelet.mountExternalVolumes(&pod) expectedPodVolumes := make(volumeMap) expectedPodVolumes["host-dir"] = &volume.HostDir{"/dir/path"} if len(expectedPodVolumes) != len(podVolumes) { - t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, manifest) + t.Errorf("Unexpected volumes. Expected %#v got %#v. Manifest was: %#v", expectedPodVolumes, podVolumes, pod) } for name, expectedVolume := range expectedPodVolumes { if _, ok := podVolumes[name]; !ok { - t.Errorf("Pod volumes map is missing key: %s. %#v", expectedVolume, podVolumes) + t.Errorf("api.BoundPod volumes map is missing key: %s. %#v", expectedVolume, podVolumes) } } } @@ -678,9 +702,11 @@ func TestMakeVolumesAndBinds(t *testing.T) { }, } - pod := Pod{ - Name: "pod", - Namespace: "test", + pod := api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "pod", + Namespace: "test", + }, } podVolumes := volumeMap{ @@ -769,26 +795,26 @@ func TestMakePortsAndBindings(t *testing.T) { } func TestCheckHostPortConflicts(t *testing.T) { - successCaseAll := []Pod{ - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + successCaseAll := []api.BoundPod{ + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - successCaseNew := Pod{ - Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, + successCaseNew := api.BoundPod{ + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, } expected := append(successCaseAll, successCaseNew) if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { t.Errorf("Expected %#v, Got %#v", expected, actual) } - failureCaseAll := []Pod{ - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, - {Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, + failureCaseAll := []api.BoundPod{ + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } - failureCaseNew := Pod{ - Manifest: api.ContainerManifest{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, + failureCaseNew := api.BoundPod{ + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, } if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { t.Errorf("Expected %#v, Got %#v", expected, actual) @@ -965,7 +991,7 @@ func TestRunInContainerNoSuchPod(t *testing.T) { podNamespace := "etcd" containerName := "containerFoo" output, err := kubelet.RunInContainer( - GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}), + GetPodFullName(&api.BoundPod{TypeMeta: api.TypeMeta{ID: podName, Namespace: podNamespace}}), "", containerName, []string{"ls"}) @@ -990,13 +1016,19 @@ func TestRunInContainer(t *testing.T) { fakeDocker.ContainerList = []docker.APIContainers{ { ID: containerID, - Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + "_1234"}, + Names: []string{"/k8s_" + containerName + "_" + podName + "." + podNamespace + ".test_1234"}, }, } cmd := []string{"ls"} _, err := kubelet.RunInContainer( - GetPodFullName(&Pod{Name: podName, Namespace: podNamespace}), + GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podName, + Namespace: podNamespace, + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + }), "", containerName, cmd) @@ -1128,15 +1160,17 @@ func TestSyncPodEventHandlerFails(t *testing.T) { dockerContainers := dockertools.DockerContainers{ "9876": &docker.APIContainers{ // network container - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", }, } - err := kubelet.syncPod(&Pod{ - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + err := kubelet.syncPod(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar", Lifecycle: &api.Lifecycle{ diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index cdb00a6edfd3f..0053a4bb8b0e4 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/golang/glog" ) @@ -32,7 +33,7 @@ const ( ) type RunPodResult struct { - Pod *Pod + Pod *api.BoundPod Err error } @@ -50,7 +51,7 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { } // runOnce runs a given set of pods and returns their status. -func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { +func (kl *Kubelet) runOnce(pods []api.BoundPod) (results []RunPodResult, err error) { if kl.dockerPuller == nil { kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) } @@ -72,10 +73,10 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { results = append(results, res) if res.Err != nil { // TODO(proppy): report which containers failed the pod. - glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err) - failedPods = append(failedPods, res.Pod.Name) + glog.Infof("failed to start pod %q: %v", res.Pod.ID, res.Err) + failedPods = append(failedPods, res.Pod.ID) } else { - glog.Infof("started pod %q", res.Pod.Name) + glog.Infof("started pod %q", res.Pod.ID) } } if len(failedPods) > 0 { @@ -86,7 +87,7 @@ func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { } // runPod runs a single pod and wait until all containers are running. -func (kl *Kubelet) runPod(pod Pod) error { +func (kl *Kubelet) runPod(pod api.BoundPod) error { delay := RunOnceRetryDelay retry := 0 for { @@ -95,18 +96,18 @@ func (kl *Kubelet) runPod(pod Pod) error { return fmt.Errorf("failed to get kubelet docker containers: %v", err) } if running := kl.isPodRunning(pod, dockerContainers); running { - glog.Infof("pod %q containers running", pod.Name) + glog.Infof("pod %q containers running", pod.ID) return nil } - glog.Infof("pod %q containers not running: syncing", pod.Name) + glog.Infof("pod %q containers not running: syncing", pod.ID) if err = kl.syncPod(&pod, dockerContainers); err != nil { return fmt.Errorf("error syncing pod: %v", err) } if retry >= RunOnceMaxRetries { - return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) + return fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.ID, RunOnceMaxRetries) } // TODO(proppy): health checking would be better than waiting + checking the state at the next iteration. - glog.Infof("pod %q containers synced, waiting for %v", pod.Name, delay) + glog.Infof("pod %q containers synced, waiting for %v", pod.ID, delay) <-time.After(delay) retry++ delay *= RunOnceRetryDelayBackoff @@ -114,9 +115,9 @@ func (kl *Kubelet) runPod(pod Pod) error { } // isPodRunning returns true if all containers of a manifest are running. -func (kl *Kubelet) isPodRunning(pod Pod, dockerContainers dockertools.DockerContainers) bool { - for _, container := range pod.Manifest.Containers { - if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.Manifest.UUID, container.Name); !found || dockerContainer.Status != "running" { +func (kl *Kubelet) isPodRunning(pod api.BoundPod, dockerContainers dockertools.DockerContainers) bool { + for _, container := range pod.Spec.Containers { + if dockerContainer, found, _ := dockerContainers.FindPodContainer(GetPodFullName(&pod), pod.UID, container.Name); !found || dockerContainer.Status != "running" { glog.Infof("container %q not found (%v) or not running: %#v", container.Name, found, dockerContainer) return false } diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index c110eccf92867..e23be9005850c 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -69,12 +69,12 @@ func TestRunOnce(t *testing.T) { kb := &Kubelet{} podContainers := []docker.APIContainers{ { - Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.test"}, + Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&api.Container{Name: "bar"}), 16) + "_foo.new.test"}, ID: "1234", Status: "running", }, { - Names: []string{"/k8s_net_foo.test_"}, + Names: []string{"/k8s_net_foo.new.test_"}, ID: "9876", Status: "running", }, @@ -106,12 +106,14 @@ func TestRunOnce(t *testing.T) { t: t, } kb.dockerPuller = &dockertools.FakeDockerPuller{} - results, err := kb.runOnce([]Pod{ + results, err := kb.runOnce([]api.BoundPod{ { - Name: "foo", - Namespace: "test", - Manifest: api.ContainerManifest{ - ID: "foo", + TypeMeta: api.TypeMeta{ + ID: "foo", + Namespace: "new", + Annotations: map[string]string{ConfigSourceAnnotationKey: "test"}, + }, + Spec: api.PodSpec{ Containers: []api.Container{ {Name: "bar"}, }, @@ -124,7 +126,7 @@ func TestRunOnce(t *testing.T) { if results[0].Err != nil { t.Errorf("unexpected run pod error: %v", results[0].Err) } - if results[0].Pod.Name != "foo" { - t.Errorf("unexpected pod: %q", results[0].Pod.Name) + if results[0].Pod.ID != "foo" { + t.Errorf("unexpected pod: %q", results[0].Pod.ID) } } diff --git a/pkg/kubelet/server.go b/pkg/kubelet/server.go index 0fdf1a8678513..16d7f68934254 100644 --- a/pkg/kubelet/server.go +++ b/pkg/kubelet/server.go @@ -119,15 +119,26 @@ func (s *Server) handleContainer(w http.ResponseWriter, req *http.Request) { return } // This is to provide backward compatibility. It only supports a single manifest - var pod Pod - err = yaml.Unmarshal(data, &pod.Manifest) + var pod api.BoundPod + var containerManifest api.ContainerManifest + err = yaml.Unmarshal(data, &containerManifest) if err != nil { s.error(w, err) return } + pod.ID = containerManifest.ID + pod.UID = containerManifest.UUID + pod.Spec.Containers = containerManifest.Containers + pod.Spec.Volumes = containerManifest.Volumes + pod.Spec.RestartPolicy = containerManifest.RestartPolicy //TODO: sha1 of manifest? - pod.Name = "1" - s.updates <- PodUpdate{[]Pod{pod}, SET} + if pod.ID == "" { + pod.ID = "1" + } + if pod.UID == "" { + pod.UID = "1" + } + s.updates <- PodUpdate{[]api.BoundPod{pod}, SET} } @@ -139,16 +150,16 @@ func (s *Server) handleContainers(w http.ResponseWriter, req *http.Request) { s.error(w, err) return } - var manifests []api.ContainerManifest - err = yaml.Unmarshal(data, &manifests) + var specs []api.PodSpec + err = yaml.Unmarshal(data, &specs) if err != nil { s.error(w, err) return } - pods := make([]Pod, len(manifests)) - for i := range manifests { - pods[i].Name = fmt.Sprintf("%d", i+1) - pods[i].Manifest = manifests[i] + pods := make([]api.BoundPod, len(specs)) + for i := range specs { + pods[i].ID = fmt.Sprintf("%d", i+1) + pods[i].Spec = specs[i] } s.updates <- PodUpdate{pods, SET} @@ -186,7 +197,14 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) { follow, _ := strconv.ParseBool(uriValues.Get("follow")) tail := uriValues.Get("tail") - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) fw := FlushWriter{writer: w} if flusher, ok := w.(http.Flusher); ok { @@ -216,10 +234,17 @@ func (s *Server) handlePodInfo(w http.ResponseWriter, req *http.Request) { return } // TODO: backwards compatibility with existing API, needs API change - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) info, err := s.host.GetPodInfo(podFullName, podUUID) if err == dockertools.ErrNoContainersInPod { - http.Error(w, "Pod does not exist", http.StatusNotFound) + http.Error(w, "api.BoundPod does not exist", http.StatusNotFound) return } if err != nil { @@ -283,7 +308,14 @@ func (s *Server) handleRun(w http.ResponseWriter, req *http.Request) { http.Error(w, "Unexpected path for command running", http.StatusBadRequest) return } - podFullName := GetPodFullName(&Pod{Name: podID, Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: podID, + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) command := strings.Split(u.Query().Get("cmd"), " ") data, err := s.host.RunInContainer(podFullName, uuid, container, command) if err != nil { @@ -327,10 +359,24 @@ func (s *Server) serveStats(w http.ResponseWriter, req *http.Request) { errors.New("pod level status currently unimplemented") case 3: // Backward compatibility without uuid information - podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: components[1], + // TODO: I am broken + Namespace: api.NamespaceDefault, + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) stats, err = s.host.GetContainerInfo(podFullName, "", components[2], &query) case 4: - podFullName := GetPodFullName(&Pod{Name: components[1], Namespace: "etcd"}) + podFullName := GetPodFullName(&api.BoundPod{ + TypeMeta: api.TypeMeta{ + ID: components[1], + // TODO: I am broken + Namespace: "", + Annotations: map[string]string{ConfigSourceAnnotationKey: "etcd"}, + }, + }) stats, err = s.host.GetContainerInfo(podFullName, components[2], components[2], &query) default: http.Error(w, "unknown resource.", http.StatusNotFound) diff --git a/pkg/kubelet/server_test.go b/pkg/kubelet/server_test.go index 91a742fbea0ed..3261337c47c96 100644 --- a/pkg/kubelet/server_test.go +++ b/pkg/kubelet/server_test.go @@ -101,7 +101,23 @@ func readResp(resp *http.Response) (string, error) { func TestContainer(t *testing.T) { fw := newServerTest() expected := []api.ContainerManifest{ - {ID: "test_manifest"}, + { + ID: "test_manifest", + UUID: "value", + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, } body := bytes.NewBuffer([]byte(util.EncodeJSON(expected[0]))) // Only send a single ContainerManifest resp, err := http.Post(fw.testHTTPServer.URL+"/container", "application/json", body) @@ -114,7 +130,29 @@ func TestContainer(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 manifest, but got %v", len(received)) } - expectedPods := []Pod{{Name: "1", Manifest: expected[0]}} + expectedPods := []api.BoundPod{ + { + TypeMeta: api.TypeMeta{ + ID: "test_manifest", + UID: "value", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + } if !reflect.DeepEqual(expectedPods, received[0]) { t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } @@ -123,8 +161,38 @@ func TestContainer(t *testing.T) { func TestContainers(t *testing.T) { fw := newServerTest() expected := []api.ContainerManifest{ - {ID: "test_manifest_1"}, - {ID: "test_manifest_2"}, + { + ID: "test_manifest_1", + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + { + ID: "test_manifest_2", + Containers: []api.Container{ + { + Name: "container2", + }, + }, + Volumes: []api.Volume{ + { + Name: "test2", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, } body := bytes.NewBuffer([]byte(util.EncodeJSON(expected))) resp, err := http.Post(fw.testHTTPServer.URL+"/containers", "application/json", body) @@ -137,7 +205,48 @@ func TestContainers(t *testing.T) { if len(received) != 1 { t.Errorf("Expected 1 update, but got %v", len(received)) } - expectedPods := []Pod{{Name: "1", Manifest: expected[0]}, {Name: "2", Manifest: expected[1]}} + expectedPods := []api.BoundPod{ + { + TypeMeta: api.TypeMeta{ + ID: "1", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + }, + }, + Volumes: []api.Volume{ + { + Name: "test", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + { + TypeMeta: api.TypeMeta{ + ID: "2", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container2", + }, + }, + Volumes: []api.Volume{ + { + Name: "test2", + }, + }, + RestartPolicy: api.RestartPolicy{ + Never: &api.RestartPolicyNever{}, + }, + }, + }, + } if !reflect.DeepEqual(expectedPods, received[0]) { t.Errorf("Expected %#v, but got %#v", expectedPods, received[0]) } @@ -149,7 +258,7 @@ func TestPodInfo(t *testing.T) { "goodpod": api.ContainerStatus{}, } fw.fakeKubelet.infoFunc = func(name string) (api.PodInfo, error) { - if name == "goodpod.etcd" { + if name == "goodpod.default.etcd" { return expected, nil } return nil, fmt.Errorf("bad pod %s", name) @@ -175,7 +284,7 @@ func TestContainerInfo(t *testing.T) { fw := newServerTest() expectedInfo := &info.ContainerInfo{} podID := "somepod" - expectedPodID := "somepod" + ".etcd" + expectedPodID := "somepod" + ".default.etcd" expectedContainerName := "goodcontainer" fw.fakeKubelet.containerInfoFunc = func(podID, containerName string, req *info.ContainerInfoRequest) (*info.ContainerInfo, error) { if podID != expectedPodID || containerName != expectedContainerName { @@ -278,7 +387,7 @@ func TestServeRunInContainer(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedCommand := "ls -a" fw.fakeKubelet.runFunc = func(podFullName, uuid, containerName string, cmd []string) ([]byte, error) { @@ -317,7 +426,7 @@ func TestServeRunInContainerWithUUID(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedUuid := "7e00838d_-_3523_-_11e4_-_8421_-_42010af0a720" expectedContainerName := "baz" expectedCommand := "ls -a" @@ -360,7 +469,7 @@ func TestContainerLogs(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "" expectedFollow := false @@ -399,7 +508,7 @@ func TestContainerLogsWithTail(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "5" expectedFollow := false @@ -438,7 +547,7 @@ func TestContainerLogsWithFollow(t *testing.T) { fw := newServerTest() output := "foo bar" podName := "foo" - expectedPodName := podName + ".etcd" + expectedPodName := podName + ".default.etcd" expectedContainerName := "baz" expectedTail := "" expectedFollow := true diff --git a/pkg/kubelet/types.go b/pkg/kubelet/types.go index bd063fc129aaa..d8f167f909d2d 100644 --- a/pkg/kubelet/types.go +++ b/pkg/kubelet/types.go @@ -22,13 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -// Pod represents the structure of a pod on the Kubelet, distinct from the apiserver -// representation of a Pod. -type Pod struct { - Namespace string - Name string - Manifest api.ContainerManifest -} +const ConfigSourceAnnotationKey = "kubernetes/config.source" // PodOperation defines what changes will be made on a pod configuration. type PodOperation int @@ -48,13 +42,13 @@ const ( // sending an array of size one and Op == ADD|REMOVE (with REMOVE, only the ID is required). // For setting the state of the system to a given state for this source configuration, set // Pods as desired and Op to SET, which will reset the system state to that specified in this -// operation for this source channel. To remove all pods, set Pods to empty array and Op to SET. +// operation for this source channel. To remove all pods, set Pods to empty object and Op to SET. type PodUpdate struct { - Pods []Pod + Pods []api.BoundPod Op PodOperation } -// GetPodFullName returns a name that full identifies a pod across all config sources. -func GetPodFullName(pod *Pod) string { - return fmt.Sprintf("%s.%s", pod.Name, pod.Namespace) +// GetPodFullName returns a name that uniquely identifies a pod across all config sources. +func GetPodFullName(pod *api.BoundPod) string { + return fmt.Sprintf("%s.%s.%s", pod.ID, pod.Namespace, pod.Annotations[ConfigSourceAnnotationKey]) } diff --git a/pkg/kubelet/validation.go b/pkg/kubelet/validation.go deleted file mode 100644 index 6cf3cfbada9c4..0000000000000 --- a/pkg/kubelet/validation.go +++ /dev/null @@ -1,33 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - apierrs "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" -) - -func ValidatePod(pod *Pod) (errors []error) { - if !util.IsDNSSubdomain(pod.Name) { - errors = append(errors, apierrs.NewFieldInvalid("name", pod.Name)) - } - if errs := validation.ValidateManifest(&pod.Manifest); len(errs) != 0 { - errors = append(errors, errs...) - } - return errors -} diff --git a/pkg/kubelet/validation_test.go b/pkg/kubelet/validation_test.go deleted file mode 100644 index 499199750bda3..0000000000000 --- a/pkg/kubelet/validation_test.go +++ /dev/null @@ -1,43 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet_test - -import ( - "strings" - "testing" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - . "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet" -) - -func TestValidatePodNoName(t *testing.T) { - errorCases := map[string]Pod{ - // manifest is tested in api/validation_test.go, ensure it is invoked - "empty version": {Name: "test", Manifest: api.ContainerManifest{Version: "", ID: "abc"}}, - - // Name - "zero-length name": {Name: "", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name > 255 characters": {Name: strings.Repeat("a", 256), Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name not a DNS subdomain": {Name: "a.b.c.", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - "name with underscore": {Name: "a_b_c", Manifest: api.ContainerManifest{Version: "v1beta1"}}, - } - for k, v := range errorCases { - if errs := ValidatePod(&v); len(errs) == 0 { - t.Errorf("expected failure for %s", k) - } - } -} From 5053a4589f653d826493a22f59fbd3380b210013 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Thu, 9 Oct 2014 13:27:47 -0400 Subject: [PATCH 2/3] Write BoundPods to etcd instead of ContainerManifestList Rename ManifestFactory -> BoundPodFactory and change the general structure of the call to focus on BoundPod. --- pkg/api/conversion.go | 17 ++++- .../controller/minioncontroller_test.go | 8 ++- pkg/constraint/constraint.go | 6 +- pkg/constraint/constraint_test.go | 36 +++++----- pkg/constraint/ports.go | 6 +- pkg/master/master.go | 6 +- pkg/registry/etcd/etcd.go | 36 +++++----- pkg/registry/etcd/etcd_test.go | 66 +++++++++---------- ...nifest_factory.go => bound_pod_factory.go} | 21 +++--- ...tory_test.go => bound_pod_factory_test.go} | 42 ++++++------ 10 files changed, 131 insertions(+), 113 deletions(-) rename pkg/registry/pod/{manifest_factory.go => bound_pod_factory.go} (67%) rename pkg/registry/pod/{manifest_factory_test.go => bound_pod_factory_test.go} (79%) diff --git a/pkg/api/conversion.go b/pkg/api/conversion.go index 05323e9e262a9..e6f937103186c 100644 --- a/pkg/api/conversion.go +++ b/pkg/api/conversion.go @@ -61,5 +61,20 @@ func init() { } out.ResourceVersion = in.ResourceVersion return nil - }) + }, + + // Convert Pod to BoundPod + func(in *Pod, out *BoundPod, s conversion.Scope) error { + if err := s.Convert(&in.DesiredState.Manifest, out, 0); err != nil { + return err + } + // Only copy a subset of fields, and override manifest attributes with the pod + // metadata + out.UID = in.UID + out.ID = in.ID + out.Namespace = in.Namespace + out.CreationTimestamp = in.CreationTimestamp + return nil + }, + ) } diff --git a/pkg/cloudprovider/controller/minioncontroller_test.go b/pkg/cloudprovider/controller/minioncontroller_test.go index 4e3f3221cbd3f..ef117fc44cddf 100644 --- a/pkg/cloudprovider/controller/minioncontroller_test.go +++ b/pkg/cloudprovider/controller/minioncontroller_test.go @@ -33,10 +33,12 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *etcdregistry.Registry { - registry := etcdregistry.NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicManifestFactory{ + registry := etcdregistry.NewRegistry( + tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, + &pod.BasicBoundPodFactory{ ServiceRegistry: ®istrytest.ServiceRegistry{}, - }) + }, + ) return registry } diff --git a/pkg/constraint/constraint.go b/pkg/constraint/constraint.go index 0e08162792f3e..8acc8c073d86e 100644 --- a/pkg/constraint/constraint.go +++ b/pkg/constraint/constraint.go @@ -20,8 +20,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" ) -// Allowed returns true if manifests is a collection of manifests +// Allowed returns true if pods is a collection of bound pods // which can run without conflict on a single minion. -func Allowed(manifests []api.ContainerManifest) bool { - return !PortsConflict(manifests) +func Allowed(pods []api.BoundPod) bool { + return !PortsConflict(pods) } diff --git a/pkg/constraint/constraint_test.go b/pkg/constraint/constraint_test.go index 4d168551fc77f..1677d935df526 100644 --- a/pkg/constraint/constraint_test.go +++ b/pkg/constraint/constraint_test.go @@ -30,27 +30,27 @@ func containerWithHostPorts(ports ...int) api.Container { return c } -func manifestWithContainers(containers ...api.Container) api.ContainerManifest { - m := api.ContainerManifest{} +func podWithContainers(containers ...api.Container) api.BoundPod { + m := api.BoundPod{} for _, c := range containers { - m.Containers = append(m.Containers, c) + m.Spec.Containers = append(m.Spec.Containers, c) } return m } func TestAllowed(t *testing.T) { table := []struct { - allowed bool - manifests []api.ContainerManifest + allowed bool + pods []api.BoundPod }{ { allowed: true, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(1, 2, 3), containerWithHostPorts(4, 5, 6), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(7, 8, 9), containerWithHostPorts(10, 11, 12), ), @@ -58,12 +58,12 @@ func TestAllowed(t *testing.T) { }, { allowed: true, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(0, 0), containerWithHostPorts(0, 0), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(0, 0), containerWithHostPorts(0, 0), ), @@ -71,19 +71,19 @@ func TestAllowed(t *testing.T) { }, { allowed: false, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(3, 3), ), }, }, { allowed: false, - manifests: []api.ContainerManifest{ - manifestWithContainers( + pods: []api.BoundPod{ + podWithContainers( containerWithHostPorts(6), ), - manifestWithContainers( + podWithContainers( containerWithHostPorts(6), ), }, @@ -91,8 +91,8 @@ func TestAllowed(t *testing.T) { } for _, item := range table { - if e, a := item.allowed, Allowed(item.manifests); e != a { - t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.manifests) + if e, a := item.allowed, Allowed(item.pods); e != a { + t.Errorf("Expected %v, got %v: \n%v\v", e, a, item.pods) } } } diff --git a/pkg/constraint/ports.go b/pkg/constraint/ports.go index 92622469a2c87..f5c95f5fb2d42 100644 --- a/pkg/constraint/ports.go +++ b/pkg/constraint/ports.go @@ -22,10 +22,10 @@ import ( // PortsConflict returns true iff two containers attempt to expose // the same host port. -func PortsConflict(manifests []api.ContainerManifest) bool { +func PortsConflict(pods []api.BoundPod) bool { hostPorts := map[int]struct{}{} - for _, manifest := range manifests { - for _, container := range manifest.Containers { + for _, pod := range pods { + for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.HostPort == 0 { continue diff --git a/pkg/master/master.go b/pkg/master/master.go index 366e8a3557c59..5bcabfac035f7 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -86,15 +86,15 @@ func NewEtcdHelper(client tools.EtcdGetSet, version string) (helper tools.EtcdHe func New(c *Config) *Master { minionRegistry := makeMinionRegistry(c) serviceRegistry := etcd.NewRegistry(c.EtcdHelper, nil) - manifestFactory := &pod.BasicManifestFactory{ + boundPodFactory := &pod.BasicBoundPodFactory{ ServiceRegistry: serviceRegistry, } m := &Master{ - podRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + podRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), controllerRegistry: etcd.NewRegistry(c.EtcdHelper, nil), serviceRegistry: serviceRegistry, endpointRegistry: etcd.NewRegistry(c.EtcdHelper, nil), - bindingRegistry: etcd.NewRegistry(c.EtcdHelper, manifestFactory), + bindingRegistry: etcd.NewRegistry(c.EtcdHelper, boundPodFactory), eventRegistry: event.NewEtcdRegistry(c.EtcdHelper, uint64(c.EventTTL.Seconds())), minionRegistry: minionRegistry, client: c.Client, diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 9d0d4860b8816..331af8e02c0f7 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -40,15 +40,15 @@ import ( // Registry implements PodRegistry, ControllerRegistry, ServiceRegistry and MinionRegistry, backed by etcd. type Registry struct { tools.EtcdHelper - manifestFactory pod.ManifestFactory + boundPodFactory pod.BoundPodFactory } // NewRegistry creates an etcd registry. -func NewRegistry(helper tools.EtcdHelper, manifestFactory pod.ManifestFactory) *Registry { +func NewRegistry(helper tools.EtcdHelper, boundPodFactory pod.BoundPodFactory) *Registry { registry := &Registry{ EtcdHelper: helper, } - registry.manifestFactory = manifestFactory + registry.boundPodFactory = boundPodFactory return registry } @@ -176,18 +176,18 @@ func (r *Registry) assignPod(podID string, machine string) error { return err } // TODO: move this to a watch/rectification loop. - manifest, err := r.manifestFactory.MakeManifest(machine, *finalPod) + pod, err := r.boundPodFactory.MakeBoundPod(machine, finalPod) if err != nil { return err } contKey := makeContainerKey(machine) - err = r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { - manifests := *in.(*api.ContainerManifestList) - manifests.Items = append(manifests.Items, manifest) - if !constraint.Allowed(manifests.Items) { + err = r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + pods := *in.(*api.BoundPods) + pods.Items = append(pods.Items, *pod) + if !constraint.Allowed(pods.Items) { return nil, fmt.Errorf("The assignment would cause a constraint violation") } - return &manifests, nil + return &pods, nil }) if err != nil { // Put the pod's host back the way it was. This is a terrible hack that @@ -261,13 +261,13 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { } // Next, remove the pod from the machine atomically. contKey := makeContainerKey(machine) - return r.AtomicUpdate(contKey, &api.ContainerManifestList{}, func(in runtime.Object) (runtime.Object, error) { - manifests := in.(*api.ContainerManifestList) - newManifests := make([]api.ContainerManifest, 0, len(manifests.Items)) + return r.AtomicUpdate(contKey, &api.BoundPods{}, func(in runtime.Object) (runtime.Object, error) { + pods := in.(*api.BoundPods) + newPods := make([]api.BoundPod, 0, len(pods.Items)) found := false - for _, manifest := range manifests.Items { - if manifest.ID != podID { - newManifests = append(newManifests, manifest) + for _, pod := range pods.Items { + if pod.ID != podID { + newPods = append(newPods, pod) } else { found = true } @@ -276,10 +276,10 @@ func (r *Registry) DeletePod(ctx api.Context, podID string) error { // This really shouldn't happen, it indicates something is broken, and likely // there is a lost pod somewhere. // However it is "deleted" so log it and move on - glog.Warningf("Couldn't find: %s in %#v", podID, manifests) + glog.Warningf("Couldn't find: %s in %#v", podID, pods) } - manifests.Items = newManifests - return manifests, nil + pods.Items = newPods + return pods, nil }) } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index dbd964ffe8fa4..bc3fd38c795f8 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -35,7 +35,7 @@ import ( func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { registry := NewRegistry(tools.EtcdHelper{client, latest.Codec, tools.RuntimeVersionAdapter{latest.ResourceVersioner}}, - &pod.BasicManifestFactory{ + &pod.BasicBoundPodFactory{ ServiceRegistry: ®istrytest.ServiceRegistry{}, }) return registry @@ -117,7 +117,7 @@ func TestEtcdCreatePod(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{}), 0) + fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ TypeMeta: api.TypeMeta{ @@ -156,15 +156,15 @@ func TestEtcdCreatePod(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList + var boundPods api.BoundPods resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -286,15 +286,15 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList + var boundPods api.BoundPods resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 1 || manifests.Items[0].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 1 || boundPods.Items[0].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -308,9 +308,9 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "bar"}, + fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -352,15 +352,15 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { if pod.ID != "foo" { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } - var manifests api.ContainerManifestList + var boundPods api.BoundPods resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } - err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &manifests) - if len(manifests.Items) != 2 || manifests.Items[1].ID != "foo" { - t.Errorf("Unexpected manifest list: %#v", manifests) + err = latest.Codec.DecodeInto([]byte(resp.Node.Value), &boundPods) + if len(boundPods.Items) != 2 || boundPods.Items[1].ID != "foo" { + t.Errorf("Unexpected boundPod list: %#v", boundPods) } } @@ -516,9 +516,9 @@ func TestEtcdDeletePod(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, + fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -536,9 +536,9 @@ func TestEtcdDeletePod(t *testing.T) { if err != nil { t.Fatalf("Unexpected error %v", err) } - var manifests api.ContainerManifestList - latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests) - if len(manifests.Items) != 0 { + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 0 { t.Errorf("Unexpected container set: %s, expected empty", response.Node.Value) } } @@ -553,10 +553,10 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ - Items: []api.ContainerManifest{ - {ID: "foo"}, - {ID: "bar"}, + fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + Items: []api.BoundPod{ + {TypeMeta: api.TypeMeta{ID: "foo"}}, + {TypeMeta: api.TypeMeta{ID: "bar"}}, }, }), 0) registry := NewTestEtcdRegistry(fakeClient) @@ -575,13 +575,13 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error %v", err) } - var manifests api.ContainerManifestList - latest.Codec.DecodeInto([]byte(response.Node.Value), &manifests) - if len(manifests.Items) != 1 { - t.Fatalf("Unexpected manifest set: %#v, expected empty", manifests) + var boundPods api.BoundPods + latest.Codec.DecodeInto([]byte(response.Node.Value), &boundPods) + if len(boundPods.Items) != 1 { + t.Fatalf("Unexpected boundPod set: %#v, expected empty", boundPods) } - if manifests.Items[0].ID != "bar" { - t.Errorf("Deleted wrong manifest: %#v", manifests) + if boundPods.Items[0].ID != "bar" { + t.Errorf("Deleted wrong boundPod: %#v", boundPods) } } diff --git a/pkg/registry/pod/manifest_factory.go b/pkg/registry/pod/bound_pod_factory.go similarity index 67% rename from pkg/registry/pod/manifest_factory.go rename to pkg/registry/pod/bound_pod_factory.go index db34efa43e069..00030acac5875 100644 --- a/pkg/registry/pod/manifest_factory.go +++ b/pkg/registry/pod/bound_pod_factory.go @@ -21,24 +21,27 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" ) -type ManifestFactory interface { +type BoundPodFactory interface { // Make a container object for a given pod, given the machine that the pod is running on. - MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) + MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) } -type BasicManifestFactory struct { +type BasicBoundPodFactory struct { // TODO: this should really point at the API rather than a registry ServiceRegistry service.Registry } -func (b *BasicManifestFactory) MakeManifest(machine string, pod api.Pod) (api.ContainerManifest, error) { +func (b *BasicBoundPodFactory) MakeBoundPod(machine string, pod *api.Pod) (*api.BoundPod, error) { envVars, err := service.GetServiceEnvironmentVariables(api.NewContext(), b.ServiceRegistry, machine) if err != nil { - return api.ContainerManifest{}, err + return nil, err } - for ix, container := range pod.DesiredState.Manifest.Containers { - pod.DesiredState.Manifest.ID = pod.ID - pod.DesiredState.Manifest.Containers[ix].Env = append(container.Env, envVars...) + boundPod := &api.BoundPod{} + if err := api.Scheme.Convert(pod, boundPod); err != nil { + return nil, err } - return pod.DesiredState.Manifest, nil + for ix, container := range boundPod.Spec.Containers { + boundPod.Spec.Containers[ix].Env = append(container.Env, envVars...) + } + return boundPod, nil } diff --git a/pkg/registry/pod/manifest_factory_test.go b/pkg/registry/pod/bound_pod_factory_test.go similarity index 79% rename from pkg/registry/pod/manifest_factory_test.go rename to pkg/registry/pod/bound_pod_factory_test.go index af1ccec3a80bb..5f2daac20ddb5 100644 --- a/pkg/registry/pod/manifest_factory_test.go +++ b/pkg/registry/pod/bound_pod_factory_test.go @@ -25,13 +25,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func TestMakeManifestNoServices(t *testing.T) { +func TestMakeBoundPodNoServices(t *testing.T) { registry := registrytest.ServiceRegistry{} - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ TypeMeta: api.TypeMeta{ID: "foobar"}, DesiredState: api.PodState{ Manifest: api.ContainerManifest{ @@ -44,21 +44,21 @@ func TestMakeManifestNoServices(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] if len(container.Env) != 1 || container.Env[0].Name != "SERVICE_HOST" || container.Env[0].Value != "machine" { - t.Errorf("Expected one env vars, got: %#v", manifest) + t.Errorf("Expected one env vars, got: %#v", pod) } - if manifest.ID != "foobar" { - t.Errorf("Failed to assign ID to manifest: %#v", manifest.ID) + if pod.ID != "foobar" { + t.Errorf("Failed to assign ID to pod: %#v", pod.ID) } } -func TestMakeManifestServices(t *testing.T) { +func TestMakeBoundPodServices(t *testing.T) { registry := registrytest.ServiceRegistry{ List: api.ServiceList{ Items: []api.Service{ @@ -73,11 +73,11 @@ func TestMakeManifestServices(t *testing.T) { }, }, } - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ DesiredState: api.PodState{ Manifest: api.ContainerManifest{ Containers: []api.Container{ @@ -89,10 +89,10 @@ func TestMakeManifestServices(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] envs := []api.EnvVar{ { Name: "TEST_SERVICE_HOST", @@ -128,8 +128,7 @@ func TestMakeManifestServices(t *testing.T) { }, } if len(container.Env) != len(envs) { - t.Errorf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), manifest) - return + t.Fatalf("Expected %d env vars, got %d: %#v", len(envs), len(container.Env), pod) } for ix := range container.Env { if !reflect.DeepEqual(envs[ix], container.Env[ix]) { @@ -138,7 +137,7 @@ func TestMakeManifestServices(t *testing.T) { } } -func TestMakeManifestServicesExistingEnvVar(t *testing.T) { +func TestMakeBoundPodServicesExistingEnvVar(t *testing.T) { registry := registrytest.ServiceRegistry{ List: api.ServiceList{ Items: []api.Service{ @@ -153,11 +152,11 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, }, } - factory := &BasicManifestFactory{ + factory := &BasicBoundPodFactory{ ServiceRegistry: ®istry, } - manifest, err := factory.MakeManifest("machine", api.Pod{ + pod, err := factory.MakeBoundPod("machine", &api.Pod{ DesiredState: api.PodState{ Manifest: api.ContainerManifest{ Containers: []api.Container{ @@ -174,10 +173,10 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, }) if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } - container := manifest.Containers[0] + container := pod.Spec.Containers[0] envs := []api.EnvVar{ { @@ -218,8 +217,7 @@ func TestMakeManifestServicesExistingEnvVar(t *testing.T) { }, } if len(container.Env) != len(envs) { - t.Errorf("Expected %d env vars, got: %#v", len(envs), manifest) - return + t.Fatalf("Expected %d env vars, got: %#v", len(envs), pod) } for ix := range container.Env { if !reflect.DeepEqual(envs[ix], container.Env[ix]) { From 2325140e1fa895d0b5f784985bdeeb09cbd927ba Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Tue, 14 Oct 2014 17:31:11 -0400 Subject: [PATCH 3/3] Rename the etcd path for pods to be /registry/nodes/<>/boundpods --- pkg/kubelet/config/etcd.go | 6 +----- pkg/registry/etcd/etcd.go | 2 +- pkg/registry/etcd/etcd_test.go | 24 ++++++++++++------------ 3 files changed, 14 insertions(+), 18 deletions(-) diff --git a/pkg/kubelet/config/etcd.go b/pkg/kubelet/config/etcd.go index d53e787b5552f..aae9d07d2d6cf 100644 --- a/pkg/kubelet/config/etcd.go +++ b/pkg/kubelet/config/etcd.go @@ -34,7 +34,7 @@ import ( ) func EtcdKeyForHost(hostname string) string { - return path.Join("/", "registry", "hosts", hostname, "kubelet") + return path.Join("/", "registry", "nodes", hostname, "boundpods") } type SourceEtcd struct { @@ -111,7 +111,3 @@ func eventToPods(ev watch.Event) ([]api.BoundPod, error) { return pods, nil } - -func makeContainerKey(machine string) string { - return "/registry/hosts/" + machine + "/kubelet" -} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 331af8e02c0f7..1a7505d321b25 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -130,7 +130,7 @@ func (r *Registry) GetPod(ctx api.Context, podID string) (*api.Pod, error) { } func makeContainerKey(machine string) string { - return "/registry/hosts/" + machine + "/kubelet" + return "/registry/nodes/" + machine + "/boundpods" } // CreatePod creates a pod based on a specification. diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index bc3fd38c795f8..f87a5f19ed1b8 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -117,7 +117,7 @@ func TestEtcdCreatePod(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{}), 0) registry := NewTestEtcdRegistry(fakeClient) err := registry.CreatePod(ctx, &api.Pod{ TypeMeta: api.TypeMeta{ @@ -157,7 +157,7 @@ func TestEtcdCreatePod(t *testing.T) { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -200,7 +200,7 @@ func TestEtcdCreatePodWithContainersError(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -241,7 +241,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Data["/registry/hosts/machine/kubelet"] = tools.EtcdResponseWithError{ + fakeClient.Data["/registry/nodes/machine/boundpods"] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -287,7 +287,7 @@ func TestEtcdCreatePodWithContainersNotFound(t *testing.T) { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -308,7 +308,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { }, E: tools.EtcdErrorNotFound, } - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ Items: []api.BoundPod{ {TypeMeta: api.TypeMeta{ID: "bar"}}, }, @@ -353,7 +353,7 @@ func TestEtcdCreatePodWithExistingContainers(t *testing.T) { t.Errorf("Unexpected pod: %#v %s", pod, resp.Node.Value) } var boundPods api.BoundPods - resp, err = fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + resp, err = fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -441,7 +441,7 @@ func TestEtcdUpdatePodScheduled(t *testing.T) { }, }), 1) - contKey := "/registry/hosts/machine/kubelet" + contKey := "/registry/nodes/machine/boundpods" fakeClient.Set(contKey, runtime.EncodeOrDie(latest.Codec, &api.ContainerManifestList{ Items: []api.ContainerManifest{ { @@ -516,7 +516,7 @@ func TestEtcdDeletePod(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ Items: []api.BoundPod{ {TypeMeta: api.TypeMeta{ID: "foo"}}, }, @@ -532,7 +532,7 @@ func TestEtcdDeletePod(t *testing.T) { } else if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) } @@ -553,7 +553,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { TypeMeta: api.TypeMeta{ID: "foo"}, DesiredState: api.PodState{Host: "machine"}, }), 0) - fakeClient.Set("/registry/hosts/machine/kubelet", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ + fakeClient.Set("/registry/nodes/machine/boundpods", runtime.EncodeOrDie(latest.Codec, &api.BoundPods{ Items: []api.BoundPod{ {TypeMeta: api.TypeMeta{ID: "foo"}}, {TypeMeta: api.TypeMeta{ID: "bar"}}, @@ -571,7 +571,7 @@ func TestEtcdDeletePodMultipleContainers(t *testing.T) { if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - response, err := fakeClient.Get("/registry/hosts/machine/kubelet", false, false) + response, err := fakeClient.Get("/registry/nodes/machine/boundpods", false, false) if err != nil { t.Fatalf("Unexpected error %v", err) }