diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index d314a149101ea..44715dbac771f 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -196,7 +196,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st // TODO: Write an integration test for the replication controllers watch. go controllerManager.Run(3, util.NeverStop) - nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()), + nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) nodeController.Run(5 * time.Second) cadvisorInterface := new(cadvisor.Fake) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 935224d9d03e9..7deea27fb0919 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -196,7 +196,7 @@ func (s *CMServer) Run(_ []string) error { } nodeController := nodecontroller.NewNodeController(cloud, kubeClient, - s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)), + s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index f354430640da5..ad6aa59408a27 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -123,7 +123,7 @@ func (s *CMServer) Run(_ []string) error { } nodeController := nodecontroller.NewNodeController(cloud, kubeClient, - s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)), + s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst), s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs) nodeController.Run(s.NodeSyncPeriod) diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go index 22fa7f89c42bb..aa0f3e62ce79c 100644 --- a/pkg/controller/node/nodecontroller.go +++ b/pkg/controller/node/nodecontroller.go @@ -89,8 +89,11 @@ type NodeController struct { nodeStatusMap map[string]nodeStatusData now func() util.Time // worker that evicts pods from unresponsive nodes. - podEvictor *PodEvictor + podEvictor *RateLimitedTimedQueue + terminationEvictor *RateLimitedTimedQueue podEvictionTimeout time.Duration + // The maximum duration before a pod evicted from a node can be forcefully terminated. 
+ maximumGracePeriod time.Duration recorder record.EventRecorder } @@ -99,7 +102,7 @@ func NewNodeController( cloud cloudprovider.Interface, kubeClient client.Interface, podEvictionTimeout time.Duration, - podEvictor *PodEvictor, + podEvictionLimiter util.RateLimiter, nodeMonitorGracePeriod time.Duration, nodeStartupGracePeriod time.Duration, nodeMonitorPeriod time.Duration, @@ -123,7 +126,9 @@ func NewNodeController( kubeClient: kubeClient, recorder: recorder, podEvictionTimeout: podEvictionTimeout, - podEvictor: podEvictor, + maximumGracePeriod: 5 * time.Minute, + podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false), + terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false), nodeStatusMap: make(map[string]nodeStatusData), nodeMonitorGracePeriod: nodeMonitorGracePeriod, nodeMonitorPeriod: nodeMonitorPeriod, @@ -145,38 +150,36 @@ func (nc *NodeController) Run(period time.Duration) { }, nc.nodeMonitorPeriod) go util.Forever(func() { - nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) }) + nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { + remaining, err := nc.deletePods(value.Value) + if err != nil { + util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) + return false, 0 + } + if remaining { + glog.V(2).Infof("Pods terminating on %q", value.Value) + nc.terminationEvictor.Add(value.Value) + } + return true, 0 + }) }, nodeEvictionPeriod) -} -// We observed a Node deletion in etcd. Currently we only need to remove Pods that -// were assigned to it. -func (nc *NodeController) deleteNode(nodeID string) error { - return nc.deletePods(nodeID) -} - -// deletePods will delete all pods from master running on given node. -func (nc *NodeController) deletePods(nodeID string) error { - glog.V(2).Infof("Delete all pods from %v", nodeID) - pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), - fields.OneTermEqualSelector(client.PodHost, nodeID)) - if err != nil { - return err - } - nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID)) - for _, pod := range pods.Items { - // Defensive check, also needed for tests. - if pod.Spec.NodeName != nodeID { - continue - } - glog.V(2).Infof("Delete pod %v", pod.Name) - nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID) - if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { - glog.Errorf("Error deleting pod %v: %v", pod.Name, err) - } - } - - return nil + // TODO: replace with a controller that ensures pods that are terminating complete + // in a particular time period + go util.Forever(func() { + nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) { + remaining, err := nc.terminatePods(value.Value, value.Added) + if err != nil { + util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err)) + return false, 0 + } + if remaining != 0 { + glog.V(2).Infof("Pods still terminating on %q, estimated completion %s", value.Value, remaining) + return false, remaining + } + return true, 0 + }) + }, nodeEvictionPeriod) } // Generates num pod CIDRs that could be assigned to nodes. 
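The Run() hunk above splits eviction into two rate-limited loops: the first issues graceful pod deletions for a failed node, and any node that still has terminating pods is handed to a second loop that force-kills those pods once their grace period expires. Below is a minimal stand-alone sketch of that handoff pattern; deletePods, terminatePods, and "node0" are illustrative stand-ins, not the controller's real methods or wiring.

```go
package main

import (
	"fmt"
	"time"
)

// deletePods stands in for the controller's first stage: issue graceful deletes
// for every pod on the node and report whether any pods are still terminating.
func deletePods(node string) (remaining bool) {
	fmt.Println("gracefully deleting pods on", node)
	return true
}

// terminatePods stands in for the second stage: once a pod's grace period has
// elapsed, force-delete it; otherwise report how long to wait before retrying.
func terminatePods(node string, since time.Time) time.Duration {
	if wait := 200*time.Millisecond - time.Since(since); wait > 0 {
		return wait
	}
	fmt.Println("force-killing remaining pods on", node)
	return 0
}

func main() {
	added := time.Now()

	// Stage one: graceful eviction. A node that still has terminating pods is
	// handed to the termination stage (the terminationEvictor queue above).
	if deletePods("node0") {
		// Stage two: poll until the grace period expires, then force-kill.
		for {
			wait := terminatePods("node0", added)
			if wait <= 0 {
				break
			}
			time.Sleep(wait)
		}
	}
}
```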
@@ -271,18 +274,18 @@ func (nc *NodeController) monitorNodeStatus() error { // Check eviction timeout against decisionTimestamp if lastReadyCondition.Status == api.ConditionFalse && decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) { - if nc.podEvictor.AddNodeToEvict(node.Name) { + if nc.podEvictor.Add(node.Name) { glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout) } } if lastReadyCondition.Status == api.ConditionUnknown && decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) { - if nc.podEvictor.AddNodeToEvict(node.Name) { + if nc.podEvictor.Add(node.Name) { glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod) } } if lastReadyCondition.Status == api.ConditionTrue { - if nc.podEvictor.RemoveNodeToEvict(node.Name) { + if nc.podEvictor.Remove(node.Name) { glog.Infof("Pods on %v won't be evicted", node.Name) } } @@ -302,8 +305,8 @@ func (nc *NodeController) monitorNodeStatus() error { } if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound { glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name) - nc.recordNodeEvent(node.Name, "DeleteingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) - if err := nc.deletePods(node.Name); err != nil { + nc.recordNodeEvent(node.Name, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name)) + if _, err := nc.deletePods(node.Name); err != nil { glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err) continue } @@ -311,6 +314,7 @@ func (nc *NodeController) monitorNodeStatus() error { glog.Errorf("Unable to delete node %s: %v", node.Name, err) continue } + nc.podEvictor.Add(node.Name) } } } @@ -502,3 +506,88 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap return gracePeriod, lastReadyCondition, readyCondition, err } + +// We observed a Node deletion in etcd. Currently we only need to remove Pods that +// were assigned to it. +func (nc *NodeController) deleteNode(nodeID string) error { + nc.podEvictor.Add(nodeID) + return nil +} + +// deletePods will delete all pods from master running on given node, and return true +// if any pods were deleted. +func (nc *NodeController) deletePods(nodeID string) (bool, error) { + remaining := false + glog.V(2).Infof("Delete all pods from %s", nodeID) + pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), + fields.OneTermEqualSelector(client.PodHost, nodeID)) + if err != nil { + return remaining, err + } + + nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID)) + + for _, pod := range pods.Items { + // Defensive check, also needed for tests. 
+ if pod.Spec.NodeName != nodeID { + continue + } + // if the pod has already been deleted, ignore it + if pod.DeletionGracePeriodSeconds != nil { + continue + } + + glog.V(2).Infof("Delete pod %v", pod.Name) + nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID) + if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { + return false, err + } + remaining = true + } + return remaining, nil +} + +// terminatePods will ensure all pods on the given node that are in terminating state are eventually +// cleaned up +func (nc *NodeController) terminatePods(nodeID string, since time.Time) (time.Duration, error) { + remaining := time.Duration(0) + glog.V(2).Infof("Terminating all pods on %s", nodeID) + pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), + fields.OneTermEqualSelector(client.PodHost, nodeID)) + if err != nil { + return remaining, err + } + + nc.recordNodeEvent(nodeID, "TerminatingAllPods", fmt.Sprintf("Terminating all Pods on Node %s.", nodeID)) + + now := time.Now() + elapsed := now.Sub(since) + for _, pod := range pods.Items { + // Defensive check, also needed for tests. + if pod.Spec.NodeName != nodeID { + continue + } + // only clean terminated pods + if pod.DeletionGracePeriodSeconds == nil { + continue + } + + grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second + if grace > nc.maximumGracePeriod { + grace = nc.maximumGracePeriod + } + next := grace - elapsed + if next < 0 { + glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace) + nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID)) + if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil { + glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err) + next = 1 + } + } + if remaining < next { + remaining = next + } + } + return remaining, nil +} diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go index af73e2fc03dd1..6239673967193 100644 --- a/pkg/controller/node/nodecontroller_test.go +++ b/pkg/controller/node/nodecontroller_test.go @@ -324,9 +324,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } for _, item := range table { - podEvictor := NewPodEvictor(util.NewFakeRateLimiter()) nodeController := NewNodeController(nil, item.fakeNodeHandler, - evictionTimeout, podEvictor, testNodeMonitorGracePeriod, + evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -340,7 +339,17 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { t.Errorf("unexpected error: %v", err) } - podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) }) + nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { + remaining, _ := nodeController.deletePods(value.Value) + if remaining { + nodeController.terminationEvictor.Add(value.Value) + } + return true, 0 + }) + nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { + nodeController.terminatePods(value.Value, value.Added) + return true, 0 + }) podEvicted := false for _, action := range item.fakeNodeHandler.Actions() { if action.GetVerb() == "delete" && 
action.GetResource() == "pods" { @@ -531,7 +540,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()), + nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -610,7 +619,7 @@ func TestNodeDeletion(t *testing.T) { Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}), } - nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()), + nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false) nodeController.now = func() util.Time { return fakeNow } if err := nodeController.monitorNodeStatus(); err != nil { @@ -620,6 +629,10 @@ func TestNodeDeletion(t *testing.T) { if err := nodeController.monitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } + nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) { + nodeController.deletePods(value.Value) + return true, 0 + }) podEvicted := false for _, action := range fakeNodeHandler.Actions() { if action.GetVerb() == "delete" && action.GetResource() == "pods" { diff --git a/pkg/controller/node/podevictor.go b/pkg/controller/node/podevictor.go deleted file mode 100644 index 1c66d0a211edf..0000000000000 --- a/pkg/controller/node/podevictor.go +++ /dev/null @@ -1,129 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package nodecontroller - -import ( - "sync" - - "k8s.io/kubernetes/pkg/util" - - "github.com/golang/glog" -) - -// A FIFO queue which additionally guarantees that any element can be added only once until -// it is removed. -type UniqueQueue struct { - lock sync.Mutex - queue []string - set util.StringSet -} - -// Entity responsible for evicting Pods from inserted Nodes. It uses RateLimiter to avoid -// evicting everything at once. Note that we rate limit eviction of Nodes not individual Pods. -type PodEvictor struct { - queue UniqueQueue - deletingPodsRateLimiter util.RateLimiter -} - -// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the -// Remove call. Returns true if new value was added. -func (q *UniqueQueue) Add(value string) bool { - q.lock.Lock() - defer q.lock.Unlock() - - if !q.set.Has(value) { - q.queue = append(q.queue, value) - q.set.Insert(value) - return true - } else { - return false - } -} - -// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition -// of the given value. 
If the value is not present does nothing and returns false. -func (q *UniqueQueue) Remove(value string) bool { - q.lock.Lock() - defer q.lock.Unlock() - - q.set.Delete(value) - for i, val := range q.queue { - if val == value { - if i > 0 && i < len(q.queue)-1 { - q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...) - } else if i > 0 { - q.queue = q.queue[0 : len(q.queue)-1] - } else { - q.queue = q.queue[1:len(q.queue)] - } - return true - } - } - return false -} - -// Returns the oldest added value that wasn't returned yet. -func (q *UniqueQueue) Get() (string, bool) { - q.lock.Lock() - defer q.lock.Unlock() - if len(q.queue) == 0 { - return "", false - } - - result := q.queue[0] - q.queue = q.queue[1:len(q.queue)] - return result, true -} - -// Creates new PodEvictor which will use given RateLimiter to oversee eviction. -func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor { - return &PodEvictor{ - queue: UniqueQueue{ - queue: make([]string, 0), - set: util.NewStringSet(), - }, - deletingPodsRateLimiter: deletingPodsRateLimiter, - } -} - -// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if RateLimiter forbids any eviction. -// Each Node is processed only once, as long as it's not Removed, i.e. calling multiple AddNodeToEvict does not result -// with multiple evictions as long as RemoveNodeToEvict is not called. -func (pe *PodEvictor) TryEvict(delFunc func(string)) { - val, ok := pe.queue.Get() - for ok { - if pe.deletingPodsRateLimiter.CanAccept() { - glog.Infof("PodEvictor is evicting Pods on Node: %v", val) - delFunc(val) - } else { - glog.V(1).Info("PodEvictor is rate limitted.") - break - } - val, ok = pe.queue.Get() - } -} - -// Adds Node to the Evictor to be processed later. Won't add the same Node second time if it was already -// added and not removed. -func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool { - return pe.queue.Add(nodeName) -} - -// Removes Node from the Evictor. The Node won't be processed until added again. -func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool { - return pe.queue.Remove(nodeName) -} diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go new file mode 100644 index 0000000000000..a30d62a1a8aad --- /dev/null +++ b/pkg/controller/node/rate_limited_queue.go @@ -0,0 +1,183 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodecontroller + +import ( + "container/heap" + "sync" + "time" + + "k8s.io/kubernetes/pkg/util" +) + +// TimedValue is a value that should be processed at a designated time. 
+type TimedValue struct {
+	Value string
+	Added time.Time
+	Next  time.Time
+}
+
+// now is a variable so that unit tests can override the clock
+var now func() time.Time = time.Now
+
+// TimedQueue is a priority heap where the lowest Next is at the front of the queue.
+type TimedQueue []*TimedValue
+
+func (h TimedQueue) Len() int           { return len(h) }
+func (h TimedQueue) Less(i, j int) bool { return h[i].Next.Before(h[j].Next) }
+func (h TimedQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *TimedQueue) Push(x interface{}) {
+	*h = append(*h, x.(*TimedValue))
+}
+
+func (h *TimedQueue) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// A priority queue of unique values ordered by Next, which additionally guarantees that any
+// element can be added only once until it is removed.
+type UniqueQueue struct {
+	lock  sync.Mutex
+	queue TimedQueue
+	set   util.StringSet
+}
+
+// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
+// Remove call. Returns true if the value was added.
+func (q *UniqueQueue) Add(value TimedValue) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	if q.set.Has(value.Value) {
+		return false
+	}
+	heap.Push(&q.queue, &value)
+	q.set.Insert(value.Value)
+	return true
+}
+
+// Removes the value from the queue, so that Get() won't return it, and allows subsequent addition
+// of the given value. If the value is not present, does nothing and returns false.
+func (q *UniqueQueue) Remove(value string) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	q.set.Delete(value)
+	for i, val := range q.queue {
+		if val.Value == value {
+			if i > 0 && i < len(q.queue)-1 {
+				q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
+			} else if i > 0 {
+				q.queue = q.queue[0 : len(q.queue)-1]
+			} else {
+				q.queue = q.queue[1:len(q.queue)]
+			}
+			return true
+		}
+	}
+	return false
+}
+
+// Returns the value with the earliest Next time and removes it from the queue.
+func (q *UniqueQueue) Get() (TimedValue, bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if len(q.queue) == 0 {
+		return TimedValue{}, false
+	}
+	result := heap.Pop(&q.queue).(*TimedValue)
+	q.set.Delete(result.Value)
+	return *result, true
+}
+
+// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
+// of execution. It is also rate limited.
+type RateLimitedTimedQueue struct {
+	queue   UniqueQueue
+	limiter util.RateLimiter
+	leak    bool
+}
+
+// Creates a new queue which will use the given RateLimiter to oversee execution. If leak is true,
+// items that are rate limited are dropped; otherwise Try blocks on the rate limiter before each item.
+func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
+	return &RateLimitedTimedQueue{
+		queue: UniqueQueue{
+			queue: TimedQueue{},
+			set:   util.NewStringSet(),
+		},
+		limiter: limiter,
+		leak:    leak,
+	}
+}
+
+// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
+// time.Duration indicating the minimum wait interval before the retry.
+type ActionFunc func(TimedValue) (bool, time.Duration)
+
+// Try processes the queue. It ends prematurely if the RateLimiter forbids an action and leak is
+// true; otherwise it waits on the rate limiter before each item. Each value is processed once if
+// fn returns true; if fn returns false the value is added back to the queue with its Next time
+// pushed out by the returned wait duration, and may be retried within the same call.
+func (q *RateLimitedTimedQueue) Try(fn ActionFunc) { + val, ok := q.queue.Get() + for ok { + // rate limit the queue checking + if q.leak { + if !q.limiter.CanAccept() { + break + } + } else { + q.limiter.Accept() + } + + now := now() + if now.Before(val.Next) { + q.queue.Add(val) + val, ok = q.queue.Get() + // we do not sleep here because other values may be added at the front of the queue + continue + } + + if ok, wait := fn(val); !ok { + val.Next = now.Add(wait + 1) + q.queue.Add(val) + } + val, ok = q.queue.Get() + } +} + +// Adds value to the queue to be processed. Won't add the same value a second time if it was already +// added and not removed. +func (q *RateLimitedTimedQueue) Add(value string) bool { + now := now() + return q.queue.Add(TimedValue{ + Value: value, + Added: now, + Next: now, + }) +} + +// Removes Node from the Evictor. The Node won't be processed until added again. +func (q *RateLimitedTimedQueue) Remove(value string) bool { + return q.queue.Remove(value) +} diff --git a/pkg/controller/node/podevictor_test.go b/pkg/controller/node/rate_limited_queue_test.go similarity index 67% rename from pkg/controller/node/podevictor_test.go rename to pkg/controller/node/rate_limited_queue_test.go index dd9532efff7df..d57baf9d24df2 100644 --- a/pkg/controller/node/podevictor_test.go +++ b/pkg/controller/node/rate_limited_queue_test.go @@ -17,14 +17,16 @@ limitations under the License. package nodecontroller import ( + "reflect" "testing" + "time" "k8s.io/kubernetes/pkg/util" ) -func CheckQueueEq(lhs, rhs []string) bool { +func CheckQueueEq(lhs []string, rhs TimedQueue) bool { for i := 0; i < len(lhs); i++ { - if rhs[i] != lhs[i] { + if rhs[i].Value != lhs[i] { return false } } @@ -36,10 +38,10 @@ func CheckSetEq(lhs, rhs util.StringSet) bool { } func TestAddNode(t *testing.T) { - evictor := NewPodEvictor(util.NewFakeRateLimiter()) - evictor.AddNodeToEvict("first") - evictor.AddNodeToEvict("second") - evictor.AddNodeToEvict("third") + evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true) + evictor.Add("first") + evictor.Add("second") + evictor.Add("third") queuePattern := []string{"first", "second", "third"} if len(evictor.queue.queue) != len(queuePattern) { @@ -59,11 +61,11 @@ func TestAddNode(t *testing.T) { } func TestDelNode(t *testing.T) { - evictor := NewPodEvictor(util.NewFakeRateLimiter()) - evictor.AddNodeToEvict("first") - evictor.AddNodeToEvict("second") - evictor.AddNodeToEvict("third") - evictor.RemoveNodeToEvict("first") + evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true) + evictor.Add("first") + evictor.Add("second") + evictor.Add("third") + evictor.Remove("first") queuePattern := []string{"second", "third"} if len(evictor.queue.queue) != len(queuePattern) { @@ -81,11 +83,11 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern) } - evictor = NewPodEvictor(util.NewFakeRateLimiter()) - evictor.AddNodeToEvict("first") - evictor.AddNodeToEvict("second") - evictor.AddNodeToEvict("third") - evictor.RemoveNodeToEvict("second") + evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true) + evictor.Add("first") + evictor.Add("second") + evictor.Add("third") + evictor.Remove("second") queuePattern = []string{"first", "third"} if len(evictor.queue.queue) != len(queuePattern) { @@ -103,11 +105,11 @@ func TestDelNode(t *testing.T) { t.Errorf("Invalid map. 
Got %v, expected %v", evictor.queue.set, setPattern)
 	}
 
-	evictor = NewPodEvictor(util.NewFakeRateLimiter())
-	evictor.AddNodeToEvict("first")
-	evictor.AddNodeToEvict("second")
-	evictor.AddNodeToEvict("third")
-	evictor.RemoveNodeToEvict("third")
+	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor.Add("first")
+	evictor.Add("second")
+	evictor.Add("third")
+	evictor.Remove("third")
 
 	queuePattern = []string{"first", "second"}
 	if len(evictor.queue.queue) != len(queuePattern) {
@@ -126,15 +128,18 @@ func TestDelNode(t *testing.T) {
 	}
 }
 
-func TestEvictNode(t *testing.T) {
-	evictor := NewPodEvictor(util.NewFakeRateLimiter())
-	evictor.AddNodeToEvict("first")
-	evictor.AddNodeToEvict("second")
-	evictor.AddNodeToEvict("third")
-	evictor.RemoveNodeToEvict("second")
+func TestTry(t *testing.T) {
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor.Add("first")
+	evictor.Add("second")
+	evictor.Add("third")
+	evictor.Remove("second")
 
 	deletedMap := util.NewStringSet()
-	evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
+	evictor.Try(func(value TimedValue) (bool, time.Duration) {
+		deletedMap.Insert(value.Value)
+		return true, 0
+	})
 
 	setPattern := util.NewStringSet("first", "third")
 	if len(deletedMap) != len(setPattern) {
@@ -144,3 +149,35 @@ func TestEvictNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
 	}
 }
+
+func TestTryOrdering(t *testing.T) {
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor.Add("first")
+	evictor.Add("second")
+	evictor.Add("third")
+
+	order := []string{}
+	count := 0
+	queued := false
+	evictor.Try(func(value TimedValue) (bool, time.Duration) {
+		count++
+		if value.Added.IsZero() {
+			t.Fatalf("added should not be zero")
+		}
+		if value.Next.IsZero() {
+			t.Fatalf("next should not be zero")
+		}
+		if !queued && value.Value == "second" {
+			queued = true
+			return false, time.Millisecond
+		}
+		order = append(order, value.Value)
+		return true, 0
+	})
+	if !reflect.DeepEqual(order, []string{"first", "third", "second"}) {
+		t.Fatalf("order was wrong: %v", order)
+	}
+	if count != 4 {
+		t.Fatalf("unexpected iterations: %d", count)
+	}
+}
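The new RateLimitedTimedQueue drives both eviction loops, and the contract worth calling out is that an entry stays queued until the ActionFunc returns true; a (false, wait) return requeues it with Next pushed at least wait into the future. The test-style sketch below illustrates that contract using only types introduced by this patch; the test name and the 10ms wait are illustrative, not part of the change.

```go
package nodecontroller

import (
	"testing"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

// TestTryRequeueSketch is an illustrative test, not part of the patch: returning
// (false, wait) from the ActionFunc causes the same entry to be handed back to
// the function once the wait has elapsed.
func TestTryRequeueSketch(t *testing.T) {
	q := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
	q.Add("node0")

	attempts := 0
	q.Try(func(v TimedValue) (bool, time.Duration) {
		attempts++
		if attempts == 1 {
			// Not done yet: ask to be retried after roughly 10ms.
			return false, 10 * time.Millisecond
		}
		return true, 0
	})

	if attempts != 2 {
		t.Fatalf("expected exactly one retry, got %d attempt(s)", attempts)
	}
}
```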
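terminatePods decides how much longer to wait before force-killing an evicted pod: the pod's own DeletionGracePeriodSeconds, clamped to the controller's maximumGracePeriod (5 minutes in NewNodeController), minus the time already spent terminating. A small worked example of that arithmetic follows; the helper name forceKillAfter is illustrative and does not appear in the patch.

```go
package main

import (
	"fmt"
	"time"
)

// forceKillAfter mirrors the clamp in terminatePods: honor the pod's requested
// grace period, but never wait longer than the controller-wide maximum.
func forceKillAfter(gracePeriodSeconds int64, maximumGracePeriod, elapsed time.Duration) time.Duration {
	grace := time.Duration(gracePeriodSeconds) * time.Second
	if grace > maximumGracePeriod {
		grace = maximumGracePeriod
	}
	return grace - elapsed
}

func main() {
	// A pod that asked for 10 minutes is clamped to the 5 minute maximum; two
	// minutes into termination, the controller waits 3 more minutes before
	// deleting it with a zero-second grace period.
	fmt.Println(forceKillAfter(600, 5*time.Minute, 2*time.Minute)) // 3m0s
}
```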