Skip to content

Commit

Permalink
Merge pull request kubernetes#28770 from gmarek/coupling
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Reduce tightness of coupling in NodeController

Depends on kubernetes#28604
  • Loading branch information
k8s-merge-robot authored Jul 12, 2016
2 parents 9467c21 + 7524da8 commit ea70eca
Show file tree
Hide file tree
Showing 3 changed files with 361 additions and 335 deletions.
319 changes: 319 additions & 0 deletions pkg/controller/node/deletion_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,319 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
"fmt"
"strings"
"time"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/types"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/version"

"github.com/golang/glog"
)

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := forcefulDeletePodFunc(pod); err != nil {
utilruntime.HandleError(err)
}
}
}

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return remaining, err
}

if len(pods.Items) > 0 {
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}

for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}
// if the pod is managed by a daemonset, ignore it
_, err := daemonStore.GetPodDaemonSets(&pod)
if err == nil { // No error means at least one daemonset was found
continue
}

glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}

func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
if err == nil {
glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
}
return err
}

// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
}
for _, pod := range pods.Items {
if pod.Spec.NodeName != nodeName {
continue
}
if err := forcefulDeletePodFunc(&pod); err != nil {
return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
}
}
if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
}
return nil
}

// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses to now. This makes
// all eviction timer to reset.
func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
for k, v := range statusData {
v.probeTimestamp = now
v.readyTransitionTimestamp = now
statusData[k] = v
}
}

// maybeDeleteTerminatingPod non-gracefully deletes pods that are terminating
// that should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
pod, ok := obj.(*api.Pod)
if !ok {
return
}

// consider only terminating pods
if pod.DeletionTimestamp == nil {
return
}

// delete terminating pods that have not yet been scheduled
if len(pod.Spec.NodeName) == 0 {
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}

nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
if err != nil {
// this can only happen if the Store.KeyFunc has a problem creating
// a key for the pod. If it happens once, it will happen again so
// don't bother requeuing the pod.
utilruntime.HandleError(err)
return
}

// delete terminating pods that have been scheduled on
// nonexistent nodes
if !found {
glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}

// delete terminating pods that have been scheduled on
// nodes that do not support graceful termination
// TODO(mikedanese): this can be removed when we no longer
// guarantee backwards compatibility of master API to kubelets with
// versions less than 1.1.0
node := nodeObj.(*api.Node)
v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
if err != nil {
glog.V(0).Infof("couldn't parse verions %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
if gracefulDeletionVersion.GT(v) {
utilruntime.HandleError(forcefulDeletePodFunc(pod))
return
}
}

// update ready status of all pods running on given node from master
// return true if success
func markAllPodsNotReady(kubeClient clientset.Interface, nodeName string) error {
glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
if err != nil {
return err
}

errMsg := []string{}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}

for i, cond := range pod.Status.Conditions {
if cond.Type == api.PodReady {
pod.Status.Conditions[i].Status = api.ConditionFalse
glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
_, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
if err != nil {
glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
errMsg = append(errMsg, fmt.Sprintf("%v", err))
}
break
}
}
}
if len(errMsg) == 0 {
return nil
}
return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}

func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
instances, ok := cloud.Instances()
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.ExternalID(nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}
return false, err
}
return true, nil
}

func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: nodeName,
UID: types.UID(nodeName),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}

func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
ref := &api.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.Name),
Namespace: "",
}
glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
// TODO: This requires a transaction, either both node status is updated
// and event is recorded or neither should happen, see issue #6055.
recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}

// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true

selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, nextAttempt, err
}

now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}

// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > maxGracePeriod {
grace = maxGracePeriod
}

// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}

if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}
Loading

0 comments on commit ea70eca

Please sign in to comment.