Skip to content

Commit

Permalink
Merge pull request #13880 from yujuhong/fix_annotations
Browse files Browse the repository at this point in the history
Fix source annotation in kubelet
  • Loading branch information
nikhiljindal committed Sep 18, 2015
2 parents 5577430 + fb27077 commit 9e5ed1d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 81 deletions.
61 changes: 39 additions & 22 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,19 +190,13 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}

// recordFirstSeenTime records the first seen time of this pod.
func recordFirstSeenTime(pod *api.Pod) {
glog.V(4).Infof("Receiving a new pod %q", kubeletUtil.FormatPodName(pod))
pod.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = kubeletTypes.NewTimestamp().GetString()
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes *kubelet.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()

adds = &kubelet.PodUpdate{Op: kubelet.ADD}
updates = &kubelet.PodUpdate{Op: kubelet.UPDATE}
deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE}
adds = &kubelet.PodUpdate{Op: kubelet.ADD, Source: source}
updates = &kubelet.PodUpdate{Op: kubelet.UPDATE, Source: source}
deletes = &kubelet.PodUpdate{Op: kubelet.REMOVE, Source: source}

pods := s.pods[source]
if pods == nil {
Expand All @@ -221,6 +215,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
filtered := filterInvalidPods(update.Pods, source, s.recorder)
for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
if existing, found := pods[name]; found {
if checkAndUpdatePod(existing, ref) {
// this is an update
Expand All @@ -231,10 +230,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
continue
}
// this is an add
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
recordFirstSeenTime(ref)
pods[name] = ref
adds.Pods = append(adds.Pods, ref)
Expand Down Expand Up @@ -263,6 +258,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
filtered := filterInvalidPods(update.Pods, source, s.recorder)
for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
// Annotate the pod with the source before any comparison.
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[name]; found {
pods[name] = existing
if checkAndUpdatePod(existing, ref) {
Expand All @@ -273,10 +273,6 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
// this is a no-op
continue
}
if ref.Annotations == nil {
ref.Annotations = make(map[string]string)
}
ref.Annotations[kubelet.ConfigSourceAnnotationKey] = source
recordFirstSeenTime(ref)
pods[name] = ref
adds.Pods = append(adds.Pods, ref)
Expand Down Expand Up @@ -358,9 +354,12 @@ func isLocalAnnotationKey(key string) bool {
// for local annotations.
func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
if candidateMap == nil {
return true
candidateMap = make(map[string]string)
}
for k, v := range candidateMap {
if isLocalAnnotationKey(k) {
continue
}
if existingValue, ok := existingMap[k]; ok && existingValue == v {
continue
}
Expand All @@ -378,6 +377,12 @@ func isAnnotationMapEqual(existingMap, candidateMap map[string]string) bool {
return true
}

// recordFirstSeenTime records the first seen time of this pod.
func recordFirstSeenTime(pod *api.Pod) {
glog.V(4).Infof("Receiving a new pod %q", kubeletUtil.FormatPodName(pod))
pod.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = kubeletTypes.NewTimestamp().GetString()
}

// updateAnnotations returns an Annotation map containing the api annotation map plus
// locally managed annotations
func updateAnnotations(existing, ref *api.Pod) {
Expand All @@ -393,18 +398,30 @@ func updateAnnotations(existing, ref *api.Pod) {
existing.Annotations = annotations
}

func podsDifferSemantically(existing, ref *api.Pod) bool {
if reflect.DeepEqual(existing.Spec, ref.Spec) &&
reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
return false
}
return true
}

// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
// returns false if there was no update.
func checkAndUpdatePod(existing, ref *api.Pod) bool {
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if reflect.DeepEqual(existing.Spec, ref.Spec) &&
reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) &&
isAnnotationMapEqual(existing.Annotations, ref.Annotations) {
if !podsDifferSemantically(existing, ref) {
return false
}
// this is an update

// Overwrite the first-seen time with the existing one. This is our own
// internal annotation, there is no need to update.
ref.Annotations[kubelet.ConfigFirstSeenAnnotationKey] = existing.Annotations[kubelet.ConfigFirstSeenAnnotationKey]

existing.Spec = ref.Spec
existing.DeletionTimestamp = ref.DeletionTimestamp
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
Expand Down
Loading

0 comments on commit 9e5ed1d

Please sign in to comment.