diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 19b2c83181289..6263e0e2edecb 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -190,19 +190,13 @@ func (s *podStorage) Merge(source string, change interface{}) error { return nil } -// recordFirstSeenTime records the first seen time of this pod. -func recordFirstSeenTime(pod *api.Pod) { - glog.V(4).Infof("Receiving a new pod %q", kubeletUtil.FormatPodName(pod)) - pod.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = kubeletTypes.NewTimestamp().GetString() -} - func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) { s.podLock.Lock() defer s.podLock.Unlock() - adds = &kubelet.PodUpdate{Op: kubelet.ADD} - updates = &kubelet.PodUpdate{Op: kubelet.UPDATE} - deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE} + adds = &kubelet.PodUpdate{Op: kubelet.ADD, Source: source} + updates = &kubelet.PodUpdate{Op: kubelet.UPDATE, Source: source} + deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE, Source: source} pods := s.pods[source] if pods == nil { @@ -221,6 +215,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de filtered := filterInvalidPods(update.Pods, source, s.recorder) for _, ref := range filtered { name := kubecontainer.GetPodFullName(ref) + // Annotate the pod with the source before any comparison. + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source if existing, found := pods[name]; found { if checkAndUpdatePod(existing, ref) { // this is an update @@ -231,10 +230,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de continue } // this is an add - if ref.Annotations == nil { - ref.Annotations = make(map[string]string) - } - ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source recordFirstSeenTime(ref) pods[name] = ref adds.Pods = append(adds.Pods, ref) @@ -263,6 +258,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de filtered := filterInvalidPods(update.Pods, source, s.recorder) for _, ref := range filtered { name := kubecontainer.GetPodFullName(ref) + // Annotate the pod with the source before any comparison. + if ref.Annotations == nil { + ref.Annotations = make(map[string]string) + } + ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source if existing, found := oldPods[name]; found { pods[name] = existing if checkAndUpdatePod(existing, ref) { @@ -273,10 +273,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de // this is a no-op continue } - if ref.Annotations == nil { - ref.Annotations = make(map[string]string) - } - ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source recordFirstSeenTime(ref) pods[name] = ref adds.Pods = append(adds.Pods, ref) @@ -358,9 +354,12 @@ func isLocalAnnotationKey(key string) bool { // for local annotations. func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool { if candidateMap == nil { - return true + candidateMap = make(map[string]string) } for k, v := range candidateMap { + if isLocalAnnotationKey(k) { + continue + } if existingValue, ok := existingMap[k]; ok && existingValue == v { continue } @@ -378,6 +377,12 @@ func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool { return true } +// recordFirstSeenTime records the first seen time of this pod. +func recordFirstSeenTime(pod *api.Pod) { + glog.V(4).Infof("Receiving a new pod %q", kubeletUtil.FormatPodName(pod)) + pod.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = kubeletTypes.NewTimestamp().GetString() +} + // updateAnnotations returns an Annotation map containing the api annotation map plus // locally managed annotations func updateAnnotations(existing, ref *api.Pod) { @@ -393,18 +398,30 @@ func updateAnnotations(existing, ref *api.Pod) { existing.Annotations = annotations } +func podsDifferSemantically(existing, ref *api.Pod) bool { + if reflect.DeepEqual(existing.Spec, ref.Spec) && + reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) && + reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) && + isAnnotationMapEqual(existing.Annotations, ref.Annotations) { + return false + } + return true +} + // checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or // returns false if there was no update. func checkAndUpdatePod(existing, ref *api.Pod) bool { // TODO: it would be better to update the whole object and only preserve certain things // like the source annotation or the UID (to ensure safety) - if reflect.DeepEqual(existing.Spec, ref.Spec) && - reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) && - reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) && - isAnnotationMapEqual(existing.Annotations, ref.Annotations) { + if !podsDifferSemantically(existing, ref) { return false } // this is an update + + // Overwrite the first-seen time with the existing one. This is our own + // internal annotation, there is no need to update. + ref.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubelet.ConfigFirstSeenAnnotationKey] + existing.Spec = ref.Spec existing.DeletionTimestamp = ref.DeletionTimestamp existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index fac24eca7ab5e..2fdcccec6a3b5 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -29,7 +29,6 @@ import ( ) const ( - NoneSource = "" TestSource = "test" ) @@ -53,7 +52,7 @@ func (s sortedPods) Less(i, j int) bool { return s[i].Namespace < s[j].Namespace } -func CreateValidPod(name, namespace, source string) *api.Pod { +func CreateValidPod(name, namespace string) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ UID: types.UID(name), // for the purpose of testing, this is unique enough @@ -91,19 +90,25 @@ func expectPodUpdate(t *testing.T, ch <-chan kubelet.PodUpdate, expected ...kube for i := range expected { update := <-ch sort.Sort(sortedPods(update.Pods)) - // Clear the annotation field before the comparison. - // TODO: consider mock out recordFirstSeen in config.go - for _, pod := range update.Pods { - delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey) - delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey) + sort.Sort(sortedPods(expected[i].Pods)) + // Make copies of the expected/actual update to compare all fields + // except for "Pods", which are compared separately below. + expectedCopy, updateCopy := expected[i], update + expectedCopy.Pods, updateCopy.Pods = nil, nil + if !api.Semantic.DeepEqual(expectedCopy, updateCopy) { + t.Fatalf("Expected %#v, Got %#v", expectedCopy, updateCopy) } - for _, pod := range expected[i].Pods { - delete(pod.Annotations, kubelet.ConfigFirstSeenAnnotationKey) - delete(pod.Annotations, kubelet.ConfigSourceAnnotationKey) - } - if !api.Semantic.DeepEqual(expected[i], update) { + + if len(expected[i].Pods) != len(update.Pods) { t.Fatalf("Expected %#v, Got %#v", expected[i], update) } + // Compare pods one by one. This is necessary beacuse we don't want to + // compare local annotations. + for j := range expected[i].Pods { + if podsDifferSemantically(expected[i].Pods[j], update.Pods[j]) { + t.Fatalf("Expected %#v, Got %#v", expected[i].Pods[j], update.Pods[j]) + } + } } expectNoPodUpdate(t, ch) } @@ -120,19 +125,19 @@ func TestNewPodAdded(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new"))) } func TestNewPodAddedInvalidNamespace(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "")) channel <- podUpdate config.Sync() expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource)) @@ -142,41 +147,41 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "default")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "default"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default"))) } func TestNewPodAddedDifferentNamespaces(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "default")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "default", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "default"))) // see an update in another namespace - podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate = CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default", "test"), CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "default"), CreateValidPod("foo", "new"))) } func TestInvalidPodFiltered(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // see an update - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"))) // add an invalid update - podUpdate = CreatePodUpdate(kubelet.UPDATE, NoneSource, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) + podUpdate = CreatePodUpdate(kubelet.UPDATE, TestSource, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}) channel <- podUpdate expectNoPodUpdate(t, ch) } @@ -185,35 +190,35 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new"))) // container updates are separated as UPDATE pod := *podUpdate.Pods[0] pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test", ImagePullPolicy: api.PullIfNotPresent}} - channel <- CreatePodUpdate(kubelet.ADD, NoneSource, &pod) - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, &pod)) + channel <- CreatePodUpdate(kubelet.ADD, TestSource, &pod) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, TestSource, &pod)) } func TestNewPodAddedSnapshot(t *testing.T) { channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) // see an set - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo", "new"))) config.Sync() - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, kubelet.AllSource, CreateValidPod("foo", "new"))) // container updates are separated as UPDATE pod := *podUpdate.Pods[0] pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test", ImagePullPolicy: api.PullIfNotPresent}} - channel <- CreatePodUpdate(kubelet.ADD, NoneSource, &pod) + channel <- CreatePodUpdate(kubelet.ADD, TestSource, &pod) expectPodUpdate(t, ch, CreatePodUpdate(kubelet.SET, TestSource, &pod)) } @@ -221,51 +226,51 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // an kubelet.ADD should be converted to kubelet.UPDATE - pod := CreateValidPod("foo", "new", "test") + pod := CreateValidPod("foo", "new") pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test", ImagePullPolicy: api.PullIfNotPresent}} - podUpdate = CreatePodUpdate(kubelet.ADD, NoneSource, pod) + podUpdate = CreatePodUpdate(kubelet.ADD, TestSource, pod) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, TestSource, pod)) - podUpdate = CreatePodUpdate(kubelet.REMOVE, NoneSource, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}}) + podUpdate = CreatePodUpdate(kubelet.REMOVE, TestSource, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "new"}}) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, NoneSource, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.REMOVE, TestSource, pod)) } func TestNewPodAddedUpdatedSet(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) // should register an add - podUpdate := CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", ""), CreateValidPod("foo2", "new", ""), CreateValidPod("foo3", "new", "")) + podUpdate := CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo", "new", "test"), CreateValidPod("foo2", "new", "test"), CreateValidPod("foo3", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))) // should ignore ADDs that are identical expectNoPodUpdate(t, ch) // should be converted to an kubelet.ADD, kubelet.REMOVE, and kubelet.UPDATE - pod := CreateValidPod("foo2", "new", "test") + pod := CreateValidPod("foo2", "new") pod.Spec.Containers = []api.Container{{Name: "bar", Image: "test", ImagePullPolicy: api.PullIfNotPresent}} - podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, pod, CreateValidPod("foo3", "new", ""), CreateValidPod("foo4", "new", "test")) + podUpdate = CreatePodUpdate(kubelet.SET, TestSource, pod, CreateValidPod("foo3", "new"), CreateValidPod("foo4", "new")) channel <- podUpdate expectPodUpdate(t, ch, - CreatePodUpdate(kubelet.REMOVE, NoneSource, CreateValidPod("foo", "new", "test")), - CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo4", "new", "test")), - CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + CreatePodUpdate(kubelet.REMOVE, TestSource, CreateValidPod("foo", "new")), + CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo4", "new")), + CreatePodUpdate(kubelet.UPDATE, TestSource, pod)) } func TestPodUpdateAnnotations(t *testing.T) { channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) - pod := CreateValidPod("foo2", "new", "test") + pod := CreateValidPod("foo2", "new") pod.Annotations = make(map[string]string, 0) pod.Annotations["kubernetes.io/blah"] = "blah" @@ -274,22 +279,22 @@ func TestPodUpdateAnnotations(t *testing.T) { t.Fatalf("%v", err) } - podUpdate := CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), clone.(*api.Pod), CreateValidPod("foo3", "new", "test")) + podUpdate := CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo1", "new"), clone.(*api.Pod), CreateValidPod("foo3", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test"))) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.ADD, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new"))) pod.Annotations["kubenetes.io/blah"] = "superblah" - podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + podUpdate = CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, TestSource, pod)) pod.Annotations["kubernetes.io/otherblah"] = "doh" - podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + podUpdate = CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, TestSource, pod)) delete(pod.Annotations, "kubernetes.io/blah") - podUpdate = CreatePodUpdate(kubelet.SET, NoneSource, CreateValidPod("foo1", "new", "test"), pod, CreateValidPod("foo3", "new", "test")) + podUpdate = CreatePodUpdate(kubelet.SET, TestSource, CreateValidPod("foo1", "new"), pod, CreateValidPod("foo3", "new")) channel <- podUpdate - expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, NoneSource, pod)) + expectPodUpdate(t, ch, CreatePodUpdate(kubelet.UPDATE, TestSource, pod)) }