Skip to content

Commit

Permalink
Merge pull request kubernetes#22708 from mikedanese/revert-revert
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Mar 8, 2016
2 parents dba955e + d8eaed9 commit fa21ef1
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 14 deletions.
37 changes: 25 additions & 12 deletions pkg/controller/node/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
Expand Down Expand Up @@ -284,6 +285,8 @@ func (nc *NodeController) Run(period time.Duration) {
return false, remaining
})
}, nodeEvictionPeriod, wait.NeverStop)

go wait.Until(nc.cleanupOrphanedPods, 30*time.Second, wait.NeverStop)
}

// Generates num pod CIDRs that could be assigned to nodes.
Expand Down Expand Up @@ -368,6 +371,28 @@ func (nc *NodeController) maybeDeleteTerminatingPod(obj interface{}) {
}
}

// cleanupOrphanedPods deletes pods that are bound to nodes that don't
// exist.
func (nc *NodeController) cleanupOrphanedPods() {
pods, err := nc.podStore.List(labels.Everything())
if err != nil {
utilruntime.HandleError(err)
return
}

for _, pod := range pods {
if pod.Spec.NodeName == "" {
continue
}
if _, exists, _ := nc.nodeStore.Store.GetByKey(pod.Spec.NodeName); exists {
continue
}
if err := nc.forcefullyDeletePod(pod); err != nil {
utilruntime.HandleError(err)
}
}
}

func forcefullyDeletePod(c clientset.Interface, pod *api.Pod) error {
var zero int64
err := c.Core().Pods(pod.Namespace).Delete(pod.Name, &api.DeleteOptions{GracePeriodSeconds: &zero})
Expand Down Expand Up @@ -759,18 +784,6 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
return gracePeriod, lastReadyCondition, readyCondition, err
}

// returns true if the provided node still has pods scheduled to it, or an error if
// the server could not be contacted.
func (nc *NodeController) hasPods(nodeName string) (bool, error) {
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
options := api.ListOptions{FieldSelector: selector}
pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
if err != nil {
return false, err
}
return len(pods.Items) > 0, nil
}

// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
Expand Down
42 changes: 42 additions & 0 deletions pkg/controller/node/nodecontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,48 @@ func TestCheckPod(t *testing.T) {
}
}

func TestCleanupOrphanedPods(t *testing.T) {
newPod := func(name, node string) api.Pod {
return api.Pod{
ObjectMeta: api.ObjectMeta{
Name: name,
},
Spec: api.PodSpec{
NodeName: node,
},
}
}
pods := []api.Pod{
newPod("a", "foo"),
newPod("b", "bar"),
newPod("c", "gone"),
}
nc := NewNodeController(nil, nil, 0, nil, nil, 0, 0, 0, nil, false)

nc.nodeStore.Store.Add(newNode("foo"))
nc.nodeStore.Store.Add(newNode("bar"))
for _, pod := range pods {
p := pod
nc.podStore.Store.Add(&p)
}

var deleteCalls int
var deletedPodName string
nc.forcefullyDeletePod = func(p *api.Pod) error {
deleteCalls++
deletedPodName = p.ObjectMeta.Name
return nil
}
nc.cleanupOrphanedPods()

if deleteCalls != 1 {
t.Fatalf("expected one delete, got: %v", deleteCalls)
}
if deletedPodName != "c" {
t.Fatalf("expected deleted pod name to be 'c', but got: %q", deletedPodName)
}
}

func newNode(name string) *api.Node {
return &api.Node{
ObjectMeta: api.ObjectMeta{Name: name},
Expand Down
6 changes: 4 additions & 2 deletions test/e2e/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ var _ = Describe("Garbage collector [Slow]", func() {
gcThreshold := 100

By(fmt.Sprintf("Waiting for gc controller to gc all but %d pods", gcThreshold))
pollErr := wait.Poll(30*time.Second, timeout, func() (bool, error) {
pollErr := wait.Poll(1*time.Minute, timeout, func() (bool, error) {
pods, err = f.Client.Pods(f.Namespace.Name).List(api.ListOptions{})
if err != nil {
Logf("Failed to list pod %v", err)
Expand All @@ -84,9 +84,11 @@ func createTerminatingPod(f *Framework) (*api.Pod, error) {
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: string(uuid),
Annotations: map[string]string{
"scheduler.alpha.kubernetes.io/name": "please don't schedule my pods",
},
},
Spec: api.PodSpec{
NodeName: "nonexistant-node",
Containers: []api.Container{
{
Name: string(uuid),
Expand Down

0 comments on commit fa21ef1

Please sign in to comment.