Skip to content

Commit

Permalink
Merge pull request kubernetes#27349 from resouer/delete
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Generates DELETE pod update operations

fixes kubernetes#27105

Generates DELETE pod update operations  to make the code and log more intuitive.

1. main refactoring is in `kubelet/config`
2. kubelet will log if it received DELETE, just like other OPs

cc @Random-Liu :)
  • Loading branch information
k8s-merge-robot authored Jul 12, 2016
2 parents 629f3c1 + 0d5dddc commit 72f6493
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 28 deletions.
2 changes: 1 addition & 1 deletion contrib/mesos/pkg/executor/service/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,5 +80,5 @@ func (kl *executorKubelet) Run(mergedUpdates <-chan kubetypes.PodUpdate) {

//TODO(jdef) revisit this if/when executor failover lands
// Force kubelet to delete all pods.
kl.HandlePodDeletions(kl.GetPods())
kl.HandlePodRemoves(kl.GetPods())
}
64 changes: 43 additions & 21 deletions pkg/kubelet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ const (
// PodConfigNotificationSnapshot delivers the full configuration as a SET whenever
// any change occurs.
PodConfigNotificationSnapshot
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE message whenever pods are
// PodConfigNotificationSnapshotAndUpdates delivers an UPDATE and DELETE message whenever pods are
// changed, and a SET message if there are any additions or removals.
PodConfigNotificationSnapshotAndUpdates
// PodConfigNotificationIncremental delivers ADD, UPDATE, REMOVE, RECONCILE to the update channel.
// PodConfigNotificationIncremental delivers ADD, UPDATE, DELETE, REMOVE, RECONCILE to the update channel.
PodConfigNotificationIncremental
)

Expand Down Expand Up @@ -152,24 +152,27 @@ func (s *podStorage) Merge(source string, change interface{}) error {
defer s.updateLock.Unlock()

seenBefore := s.sourcesSeen.Has(source)
adds, updates, deletes, reconciles := s.merge(source, change)
adds, updates, deletes, removes, reconciles := s.merge(source, change)
firstSet := !seenBefore && s.sourcesSeen.Has(source)

// deliver update notifications
switch s.mode {
case PodConfigNotificationIncremental:
if len(deletes.Pods) > 0 {
s.updates <- *deletes
if len(removes.Pods) > 0 {
s.updates <- *removes
}
if len(adds.Pods) > 0 {
s.updates <- *adds
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 {
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}
if firstSet && len(adds.Pods) == 0 && len(updates.Pods) == 0 && len(deletes.Pods) == 0 {
// Send an empty update when first seeing the source and there are
// no ADD or UPDATE pods from the source. This signals kubelet that
// no ADD or UPDATE or DELETE pods from the source. This signals kubelet that
// the source is ready.
s.updates <- *adds
}
Expand All @@ -179,15 +182,18 @@ func (s *podStorage) Merge(source string, change interface{}) error {
}

case PodConfigNotificationSnapshotAndUpdates:
if len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
if len(removes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}
if len(updates.Pods) > 0 {
s.updates <- *updates
}
if len(deletes.Pods) > 0 {
s.updates <- *deletes
}

case PodConfigNotificationSnapshot:
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || firstSet {
if len(updates.Pods) > 0 || len(deletes.Pods) > 0 || len(adds.Pods) > 0 || len(removes.Pods) > 0 || firstSet {
s.updates <- kubetypes.PodUpdate{Pods: s.MergedState().([]*api.Pod), Op: kubetypes.SET, Source: source}
}

Expand All @@ -200,13 +206,14 @@ func (s *podStorage) Merge(source string, change interface{}) error {
return nil
}

func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, reconciles *kubetypes.PodUpdate) {
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
s.podLock.Lock()
defer s.podLock.Unlock()

addPods := []*api.Pod{}
updatePods := []*api.Pod{}
deletePods := []*api.Pod{}
removePods := []*api.Pod{}
reconcilePods := []*api.Pod{}

pods := s.pods[source]
Expand All @@ -228,11 +235,13 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
ref.Annotations[kubetypes.ConfigSourceAnnotationKey] = source
if existing, found := oldPods[name]; found {
pods[name] = existing
needUpdate, needReconcile := checkAndUpdatePod(existing, ref)
needUpdate, needReconcile, needGracefulDelete := checkAndUpdatePod(existing, ref)
if needUpdate {
updatePods = append(updatePods, existing)
} else if needReconcile {
reconcilePods = append(reconcilePods, existing)
} else if needGracefulDelete {
deletePods = append(deletePods, existing)
}
continue
}
Expand All @@ -244,9 +253,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

update := change.(kubetypes.PodUpdate)
switch update.Op {
case kubetypes.ADD, kubetypes.UPDATE:
case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
if update.Op == kubetypes.ADD {
glog.V(4).Infof("Adding new pods from source %s : %v", source, update.Pods)
} else if update.Op == kubetypes.DELETE {
glog.V(4).Infof("Graceful deleting pods from source %s : %v", source, update.Pods)
} else {
glog.V(4).Infof("Updating pods from source %s : %v", source, update.Pods)
}
Expand All @@ -259,7 +270,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
if existing, found := pods[name]; found {
// this is a delete
delete(pods, name)
deletePods = append(deletePods, existing)
removePods = append(removePods, existing)
continue
}
// this is a no-op
Expand All @@ -275,7 +286,7 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
for name, existing := range oldPods {
if _, found := pods[name]; !found {
// this is a delete
deletePods = append(deletePods, existing)
removePods = append(removePods, existing)
}
}

Expand All @@ -288,10 +299,11 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de

adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(deletePods), Source: source}
deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

return adds, updates, deletes, reconciles
return adds, updates, deletes, removes, reconciles
}

func (s *podStorage) markSourceSet(source string) {
Expand Down Expand Up @@ -413,10 +425,13 @@ func podsDifferSemantically(existing, ref *api.Pod) bool {

// checkAndUpdatePod updates existing, and:
// * if ref makes a meaningful change, returns needUpdate=true
// * if ref makes a meaningful change, and this change is graceful deletion, returns needGracefulDelete=true
// * if ref makes no meaningful change, but changes the pod status, returns needReconcile=true
// * else return both false
// Now, needUpdate and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool) {
// * else return all false
// Now, needUpdate, needGracefulDelete and needReconcile should never be both true
func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile, needGracefulDelete bool) {

// 1. this is a reconcile
// TODO: it would be better to update the whole object and only preserve certain things
// like the source annotation or the UID (to ensure safety)
if !podsDifferSemantically(existing, ref) {
Expand All @@ -431,7 +446,6 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
}
return
}
// this is an update

// Overwrite the first-seen time with the existing one. This is our own
// internal annotation, there is no need to update.
Expand All @@ -443,7 +457,15 @@ func checkAndUpdatePod(existing, ref *api.Pod) (needUpdate, needReconcile bool)
existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
existing.Status = ref.Status
updateAnnotations(existing, ref)
needUpdate = true

// 2. this is an graceful delete
if ref.DeletionTimestamp != nil {
needGracefulDelete = true
} else {
// 3. this is an update
needUpdate = true
}

return
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/kubelet/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"sort"
"strconv"
"testing"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/conversion"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -248,6 +250,25 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.REMOVE, TestSource, pod))
}

func TestNewPodAddedDelete(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

// should register an add
addedPod := CreateValidPod("foo", "new")
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, addedPod)
channel <- podUpdate
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.ADD, TestSource, addedPod))

// mark this pod as deleted
timestamp := unversioned.NewTime(time.Now())
deletedPod := CreateValidPod("foo", "new")
deletedPod.ObjectMeta.DeletionTimestamp = &timestamp
podUpdate = CreatePodUpdate(kubetypes.DELETE, TestSource, deletedPod)
channel <- podUpdate
// the existing pod should be gracefully deleted
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.DELETE, TestSource, addedPod))
}

func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)

Expand Down
17 changes: 11 additions & 6 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ const (
type SyncHandler interface {
HandlePodAdditions(pods []*api.Pod)
HandlePodUpdates(pods []*api.Pod)
HandlePodDeletions(pods []*api.Pod)
HandlePodRemoves(pods []*api.Pod)
HandlePodReconcile(pods []*api.Pod)
HandlePodSyncs(pods []*api.Pod)
HandlePodCleanups() error
Expand Down Expand Up @@ -1673,7 +1673,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
// pod - the pod to sync
// mirrorPod - the mirror pod for the pod to sync, if it is a static pod
// podStatus - the current status (TODO: always from the status manager?)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE)
// updateType - the type of update (ADD, UPDATE, REMOVE, RECONCILE, DELETE)
//
// The workflow is:
// * If the pod is being created, record pod worker start latency
Expand Down Expand Up @@ -2321,13 +2321,18 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
handler.HandlePodUpdates(u.Pods)
case kubetypes.REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodDeletions(u.Pods)
handler.HandlePodRemoves(u.Pods)
case kubetypes.RECONCILE:
glog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods))
handler.HandlePodReconcile(u.Pods)
case kubetypes.DELETE:
glog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods))
// DELETE is treated as a UPDATE because of graceful deletion.
handler.HandlePodUpdates(u.Pods)
case kubetypes.SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")

}
case e := <-plegCh:
// PLEG event for a pod; sync it.
Expand Down Expand Up @@ -2463,9 +2468,9 @@ func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod) {
}
}

// HandlePodDeletions is the callback in the SyncHandler interface for pods
// being deleted from a config source.
func (kl *Kubelet) HandlePodDeletions(pods []*api.Pod) {
// HandlePodRemoves is the callback in the SyncHandler interface for pods
// being removed from a config source.
func (kl *Kubelet) HandlePodRemoves(pods []*api.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.DeletePod(pod)
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/types/pod_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
SET PodOperation = iota
// Pods with the given ids are new to this source
ADD
// Pods with the given ids are gracefully deleted from this source
DELETE
// Pods with the given ids have been removed from this source
REMOVE
// Pods with the given ids have been updated in this source
Expand Down

0 comments on commit 72f6493

Please sign in to comment.