Handle gracefully terminated pods in node controller
Eviction should retry longer and wait for pods to finish terminating.
smarterclayton committed Aug 18, 2015
1 parent 780accb commit edb1088
Showing 8 changed files with 396 additions and 203 deletions.
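
The change replaces the single PodEvictor with two rate-limited queues in the node controller: podEvictor, which gracefully deletes the pods on an unresponsive node, and terminationEvictor, which re-checks the node until every terminating pod has either finished or exceeded its (capped) grace period and been force-deleted. Below is a minimal, self-contained sketch of that flow; the function names mirror the diff, but the bodies are hard-coded stand-ins and plain slices take the place of the commit's RateLimitedTimedQueue.

package main

import (
	"fmt"
	"time"
)

// timedValue mirrors the TimedValue passed to the Try callbacks in the diff:
// a node name plus the time the node was first queued for eviction.
type timedValue struct {
	value string
	added time.Time
}

// deletePods is a hard-coded stand-in for NodeController.deletePods: issue
// graceful deletes and report whether any pods are still terminating.
func deletePods(node string) (remaining bool) {
	fmt.Printf("gracefully deleting pods on %s\n", node)
	return true
}

// terminatePods is a hard-coded stand-in for NodeController.terminatePods:
// report how long until the longest remaining grace period expires.
func terminatePods(node string, since time.Time) time.Duration {
	return 30 * time.Second
}

func main() {
	evictions := []timedValue{{value: "node0", added: time.Now()}}
	var terminations []timedValue

	// Phase 1 (the podEvictor loop): gracefully delete pods from unresponsive
	// nodes; a node that still has terminating pods moves on to phase 2.
	for _, v := range evictions {
		if deletePods(v.value) {
			terminations = append(terminations, v)
		}
	}

	// Phase 2 (the terminationEvictor loop): wait out each pod's grace period,
	// capped at maximumGracePeriod, then force-delete whatever is left.
	for _, v := range terminations {
		if wait := terminatePods(v.value, v.added); wait > 0 {
			fmt.Printf("%s: pods still terminating, re-check in %s\n", v.value, wait)
		}
	}
}
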
2 changes: 1 addition & 1 deletion cmd/integration/integration.go
@@ -196,7 +196,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop)

nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisor.Fake)
2 changes: 1 addition & 1 deletion cmd/kube-controller-manager/app/controllermanager.go
@@ -196,7 +196,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)

2 changes: 1 addition & 1 deletion contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -123,7 +123,7 @@ func (s *CMServer) Run(_ []string) error {
}

nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)

165 changes: 127 additions & 38 deletions pkg/controller/node/nodecontroller.go
@@ -89,8 +89,11 @@ type NodeController struct {
nodeStatusMap map[string]nodeStatusData
now func() util.Time
// worker that evicts pods from unresponsive nodes.
podEvictor *PodEvictor
podEvictor *RateLimitedTimedQueue
terminationEvictor *RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
}

@@ -99,7 +102,7 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient client.Interface,
podEvictionTimeout time.Duration,
podEvictor *PodEvictor,
podEvictionLimiter util.RateLimiter,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@@ -123,7 +126,9 @@
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
podEvictor: podEvictor,
maximumGracePeriod: 5 * time.Minute,
podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@@ -145,38 +150,36 @@ func (nc *NodeController) Run(period time.Duration) {
}, nc.nodeMonitorPeriod)

go util.Forever(func() {
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := nc.deletePods(value.Value)
if err != nil {
util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
glog.V(2).Infof("Pods terminating on %q", value.Value)
nc.terminationEvictor.Add(value.Value)
}
return true, 0
})
}, nodeEvictionPeriod)
}

// We observed a Node deletion in etcd. Currently we only need to remove Pods that
// were assigned to it.
func (nc *NodeController) deleteNode(nodeID string) error {
return nc.deletePods(nodeID)
}

// deletePods will delete all pods from master running on given node.
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return err
}
nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
}
}

return nil
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go util.Forever(func() {
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := nc.terminatePods(value.Value, value.Added)
if err != nil {
util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}
if remaining != 0 {
glog.V(2).Infof("Pods still terminating on %q, estimated completion %s", value.Value, remaining)
return false, remaining
}
return true, 0
})
}, nodeEvictionPeriod)
}

// Generates num pod CIDRs that could be assigned to nodes.
@@ -271,18 +274,18 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check eviction timeout against decisionTimestamp
if lastReadyCondition.Status == api.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
if nc.podEvictor.Add(node.Name) {
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
if nc.podEvictor.Add(node.Name) {
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if lastReadyCondition.Status == api.ConditionTrue {
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
if nc.podEvictor.Remove(node.Name) {
glog.Infof("Pods on %v won't be evicted", node.Name)
}
}
@@ -302,15 +305,16 @@
}
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, "DeleteingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
if err := nc.deletePods(node.Name); err != nil {
nc.recordNodeEvent(node.Name, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
if _, err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
continue
}
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
}
nc.podEvictor.Add(node.Name)
}
}
}
@@ -502,3 +506,88 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap

return gracePeriod, lastReadyCondition, readyCondition, err
}

// We observed a Node deletion in etcd. Currently we only need to remove Pods that
// were assigned to it.
func (nc *NodeController) deleteNode(nodeID string) error {
nc.podEvictor.Add(nodeID)
return nil
}

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func (nc *NodeController) deletePods(nodeID string) (bool, error) {
remaining := false
glog.V(2).Infof("Delete all pods from %s", nodeID)
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return remaining, err
}

nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))

for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}

glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}

// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up
func (nc *NodeController) terminatePods(nodeID string, since time.Time) (time.Duration, error) {
remaining := time.Duration(0)
glog.V(2).Infof("Terminating all pods on %s", nodeID)
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return remaining, err
}

nc.recordNodeEvent(nodeID, "TerminatingAllPods", fmt.Sprintf("Terminating all Pods on Node %s.", nodeID))

now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}

grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > nc.maximumGracePeriod {
grace = nc.maximumGracePeriod
}
next := grace - elapsed
if next < 0 {
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID))
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
next = 1
}
}
if remaining < next {
remaining = next
}
}
return remaining, nil
}
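
For illustration only, the clamp-and-countdown arithmetic that terminatePods applies per pod, run standalone with hypothetical inputs: an eviction that entered the termination queue 60 seconds ago, and pods that requested 30-second and 3600-second grace periods. The five-minute cap is the maximumGracePeriod set in NewNodeController above; the longest remaining wait is what the Try callback hands back to the queue, so a node is revisited only once its slowest pod could have finished.

package main

import (
	"fmt"
	"time"
)

func main() {
	maximumGracePeriod := 5 * time.Minute // set in NewNodeController above
	elapsed := 60 * time.Second           // hypothetical: time since the node entered the termination queue

	for _, graceSeconds := range []int64{30, 3600} {
		grace := time.Duration(graceSeconds) * time.Second
		if grace > maximumGracePeriod {
			grace = maximumGracePeriod // a one-hour request is capped at five minutes
		}
		next := grace - elapsed
		if next < 0 {
			// Grace period already exceeded: force-delete now with grace 0.
			fmt.Printf("pod with %ds grace: overdue, force-deleting\n", graceSeconds)
			continue
		}
		// Otherwise the longest such wait is returned through the Try callback,
		// so the node is re-examined once it has elapsed.
		fmt.Printf("pod with %ds grace: re-check in %s\n", graceSeconds, next)
	}
}
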
23 changes: 18 additions & 5 deletions pkg/controller/node/nodecontroller_test.go
@@ -324,9 +324,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}

for _, item := range table {
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -340,7 +339,17 @@
t.Errorf("unexpected error: %v", err)
}

podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) })
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := nodeController.deletePods(value.Value)
if remaining {
nodeController.terminationEvictor.Add(value.Value)
}
return true, 0
})
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.terminatePods(value.Value, value.Added)
return true, 0
})
podEvicted := false
for _, action := range item.fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {
@@ -531,7 +540,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}

for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@@ -610,7 +619,7 @@ func TestNodeDeletion(t *testing.T) {
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}

nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
Expand All @@ -620,6 +629,10 @@ func TestNodeDeletion(t *testing.T) {
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.deletePods(value.Value)
return true, 0
})
podEvicted := false
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {