Merge pull request kubernetes#28770 from gmarek/coupling
Automatic merge from submit-queue

Reduce tightness of coupling in NodeController

Depends on kubernetes#28604
Showing 3 changed files with 361 additions and 335 deletions.
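The decoupling the commit title refers to is visible throughout the new file below: each helper takes its collaborators (a clientset.Interface, a record.EventRecorder, a cache.Store, or a forcefulDeletePodFunc callback) as explicit parameters instead of reading them off NodeController fields, so callers and tests can substitute their own. The following is a minimal sketch of how a caller might wire the deletion callback; the wrapper sweepOrphans and its arguments are illustrative assumptions, not part of the commit.

package node

import (
    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// sweepOrphans is a hypothetical caller of the helpers added in this commit.
// It injects forcefullyDeletePod as the deletion callback expected by
// cleanupOrphanedPods, closing over the client it was given.
func sweepOrphans(kubeClient clientset.Interface, pods []*api.Pod, nodeStore cache.Store) {
    forceDelete := func(p *api.Pod) error {
        return forcefullyDeletePod(kubeClient, p)
    }
    cleanupOrphanedPods(pods, nodeStore, forceDelete)
}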
@@ -0,0 +1,319 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
    "fmt"
    "strings"
    "time"

    "k8s.io/kubernetes/pkg/api"
    "k8s.io/kubernetes/pkg/api/unversioned"
    "k8s.io/kubernetes/pkg/client/cache"
    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/client/record"
    "k8s.io/kubernetes/pkg/cloudprovider"
    "k8s.io/kubernetes/pkg/fields"
    "k8s.io/kubernetes/pkg/kubelet/util/format"
    "k8s.io/kubernetes/pkg/types"
    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
    "k8s.io/kubernetes/pkg/version"

    "github.com/golang/glog"
)

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func cleanupOrphanedPods(pods []*api.Pod, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
    for _, pod := range pods {
        if pod.Spec.NodeName == "" {
            continue
        }
        if _, exists, _ := nodeStore.GetByKey(pod.Spec.NodeName); exists {
            continue
        }
        if err := forcefulDeletePodFunc(pod); err != nil {
            utilruntime.HandleError(err)
        }
    }
}

// deletePods will delete all pods from the master running on the given node,
// and return true if any pods were deleted.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, daemonStore cache.StoreToDaemonSetLister) (bool, error) {
    remaining := false
    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return remaining, err
    }

    if len(pods.Items) > 0 {
        recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
    }

    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }
        // if the pod has already been deleted, ignore it
        if pod.DeletionGracePeriodSeconds != nil {
            continue
        }
        // if the pod is managed by a daemonset, ignore it
        _, err := daemonStore.GetPodDaemonSets(&pod)
        if err == nil { // No error means at least one daemonset was found
            continue
        }

        glog.V(2).Infof("Starting deletion of pod %v", pod.Name)
        recorder.Eventf(&pod, api.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
        if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
            return false, err
        }
        remaining = true
    }
    return remaining, nil
}

func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
    var zero int64
    err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
    if err == nil {
        glog.V(4).Infof("forceful deletion of %s succeeded", pod.Name)
    }
    return err
}

// forcefullyDeleteNode immediately deletes all pods on the node, and then
// deletes the node itself.
func forcefullyDeleteNode(kubeClient clientset.Interface, nodeName string, forcefulDeletePodFunc func(*api.Pod) error) error {
    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
    }
    for _, pod := range pods.Items {
        if pod.Spec.NodeName != nodeName {
            continue
        }
        if err := forcefulDeletePodFunc(&pod); err != nil {
            return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
        }
    }
    if err := kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
        return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
    }
    return nil
}

// forceUpdateAllProbeTimes bumps all observed timestamps in saved nodeStatuses
// to now. This causes all eviction timers to reset.
func forceUpdateAllProbeTimes(now unversioned.Time, statusData map[string]nodeStatusData) {
    for k, v := range statusData {
        v.probeTimestamp = now
        v.readyTransitionTimestamp = now
        statusData[k] = v
    }
}

// maybeDeleteTerminatingPod non-gracefully deletes terminating pods that
// should not be gracefully terminated.
func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}, nodeStore cache.Store, forcefulDeletePodFunc func(*api.Pod) error) {
    pod, ok := obj.(*api.Pod)
    if !ok {
        return
    }

    // consider only terminating pods
    if pod.DeletionTimestamp == nil {
        return
    }

    // delete terminating pods that have not yet been scheduled
    if len(pod.Spec.NodeName) == 0 {
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }

    nodeObj, found, err := nodeStore.GetByKey(pod.Spec.NodeName)
    if err != nil {
        // this can only happen if the Store.KeyFunc has a problem creating
        // a key for the pod. If it happens once, it will happen again so
        // don't bother requeuing the pod.
        utilruntime.HandleError(err)
        return
    }

    // delete terminating pods that have been scheduled on
    // nonexistent nodes
    if !found {
        glog.Warningf("Unable to find Node: %v, deleting all assigned Pods.", pod.Spec.NodeName)
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }

    // delete terminating pods that have been scheduled on
    // nodes that do not support graceful termination
    // TODO(mikedanese): this can be removed when we no longer
    // guarantee backwards compatibility of master API to kubelets with
    // versions less than 1.1.0
    node := nodeObj.(*api.Node)
    v, err := version.Parse(node.Status.NodeInfo.KubeletVersion)
    if err != nil {
        glog.V(0).Infof("couldn't parse version %q of minion: %v", node.Status.NodeInfo.KubeletVersion, err)
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }
    if gracefulDeletionVersion.GT(v) {
        utilruntime.HandleError(forcefulDeletePodFunc(pod))
        return
    }
}

// markAllPodsNotReady updates the ready status of all pods running on the
// given node, from the master. It returns nil on success, or an aggregated
// error if any status update fails.
func markAllPodsNotReady(kubeClient clientset.Interface, nodeName string) error {
    glog.V(2).Infof("Update ready status of pods on node [%v]", nodeName)
    opts := api.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName)}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(opts)
    if err != nil {
        return err
    }

    errMsg := []string{}
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }

        for i, cond := range pod.Status.Conditions {
            if cond.Type == api.PodReady {
                pod.Status.Conditions[i].Status = api.ConditionFalse
                glog.V(2).Infof("Updating ready status of pod %v to false", pod.Name)
                _, err := kubeClient.Core().Pods(pod.Namespace).UpdateStatus(&pod)
                if err != nil {
                    glog.Warningf("Failed to update status for pod %q: %v", format.Pod(&pod), err)
                    errMsg = append(errMsg, fmt.Sprintf("%v", err))
                }
                break
            }
        }
    }
    if len(errMsg) == 0 {
        return nil
    }
    return fmt.Errorf("%v", strings.Join(errMsg, "; "))
}

func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
    instances, ok := cloud.Instances()
    if !ok {
        return false, fmt.Errorf("%v", ErrCloudInstance)
    }
    if _, err := instances.ExternalID(nodeName); err != nil {
        if err == cloudprovider.InstanceNotFound {
            return false, nil
        }
        return false, err
    }
    return true, nil
}

func recordNodeEvent(recorder record.EventRecorder, nodeName, eventtype, reason, event string) {
    ref := &api.ObjectReference{
        Kind:      "Node",
        Name:      nodeName,
        UID:       types.UID(nodeName),
        Namespace: "",
    }
    glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
    recorder.Eventf(ref, eventtype, reason, "Node %s event: %s", nodeName, event)
}

func recordNodeStatusChange(recorder record.EventRecorder, node *api.Node, new_status string) {
    ref := &api.ObjectReference{
        Kind:      "Node",
        Name:      node.Name,
        UID:       types.UID(node.Name),
        Namespace: "",
    }
    glog.V(2).Infof("Recording status change %s event message for node %s", new_status, node.Name)
    // TODO: This requires a transaction, either both node status is updated
    // and event is recorded or neither should happen, see issue #6055.
    recorder.Eventf(ref, api.EventTypeNormal, new_status, "Node %s status is now: %s", node.Name, new_status)
}

// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func terminatePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, since time.Time, maxGracePeriod time.Duration) (bool, time.Duration, error) {
    // the time before we should try again
    nextAttempt := time.Duration(0)
    // have we deleted all pods
    complete := true

    selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
    options := api.ListOptions{FieldSelector: selector}
    pods, err := kubeClient.Core().Pods(api.NamespaceAll).List(options)
    if err != nil {
        return false, nextAttempt, err
    }

    now := time.Now()
    elapsed := now.Sub(since)
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }
        // only clean terminated pods
        if pod.DeletionGracePeriodSeconds == nil {
            continue
        }

        // the user's requested grace period
        grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
        if grace > maxGracePeriod {
            grace = maxGracePeriod
        }

        // the time remaining before the pod should have been deleted
        remaining := grace - elapsed
        if remaining < 0 {
            remaining = 0
            glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
            recordNodeEvent(recorder, nodeName, api.EventTypeNormal, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
            if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
                glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
                complete = false
            }
        } else {
            glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
            complete = false
        }

        if nextAttempt < remaining {
            nextAttempt = remaining
        }
    }
    return complete, nextAttempt, nil
}
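For context on how the (complete, nextAttempt, error) contract of terminatePods might be consumed, here is a hypothetical retry loop; it is a sketch only, not the controller's actual eviction machinery, and the names evictNodePods and deletionStart are illustrative assumptions.

package node

import (
    "time"

    clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
    "k8s.io/kubernetes/pkg/client/record"
)

// evictNodePods is a hypothetical caller that keeps invoking terminatePods
// until every terminating pod on the node is gone, waiting for the suggested
// nextAttempt interval between checks.
func evictNodePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName string, deletionStart time.Time, maxGrace time.Duration) error {
    for {
        complete, nextAttempt, err := terminatePods(kubeClient, recorder, nodeName, deletionStart, maxGrace)
        if err != nil {
            return err
        }
        if complete {
            return nil
        }
        // nextAttempt is the longest remaining grace period observed; it can
        // be zero when a force delete failed, so enforce a small floor to
        // avoid a hot loop.
        if nextAttempt < time.Second {
            nextAttempt = time.Second
        }
        time.Sleep(nextAttempt)
    }
}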