Separate rate limiters for Pod evictions for different zones in NodeController #28843
@@ -73,13 +73,12 @@ type nodeStatusData struct {
 }
 
 type NodeController struct {
-	allocateNodeCIDRs       bool
-	cloud                   cloudprovider.Interface
-	clusterCIDR             *net.IPNet
-	serviceCIDR             *net.IPNet
-	deletingPodsRateLimiter flowcontrol.RateLimiter
-	knownNodeSet            map[string]*api.Node
-	kubeClient              clientset.Interface
+	allocateNodeCIDRs bool
+	cloud             cloudprovider.Interface
+	clusterCIDR       *net.IPNet
+	serviceCIDR       *net.IPNet
+	knownNodeSet      map[string]*api.Node
+	kubeClient        clientset.Interface
 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
 	// Value used if sync_nodes_status=False. NodeController will not proactively
@@ -112,9 +111,11 @@ type NodeController struct {
 	// Lock to access evictor workers
 	evictorLock *sync.Mutex
 	// workers that evicts pods from unresponsive nodes.
-	podEvictor         *RateLimitedTimedQueue
-	terminationEvictor *RateLimitedTimedQueue
-	podEvictionTimeout time.Duration
+	zonePodEvictor         map[string]*RateLimitedTimedQueue
+	zoneTerminationEvictor map[string]*RateLimitedTimedQueue
+	evictionLimiterQPS     float32
+	evictionLimiterBurst   int
+	podEvictionTimeout     time.Duration
 	// The maximum duration before a pod evicted from a node can be forcefully terminated.
 	maximumGracePeriod time.Duration
 	recorder           record.EventRecorder
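For readers outside this file: the heart of the change is that the single podEvictor/terminationEvictor pair becomes a map keyed by zone, so each zone drains under its own rate limiter and a flaky zone cannot starve a healthy one of eviction budget. A minimal sketch of that pattern, with a hypothetical zoneQueue type standing in for RateLimitedTimedQueue:

```go
package main

import "fmt"

// zoneQueue is a stand-in for RateLimitedTimedQueue: a queue of node
// names drained at a rate set by its own token bucket (qps/burst).
type zoneQueue struct {
	qps   float32
	burst int
	nodes []string
}

func main() {
	// Mirrors zonePodEvictor: one independently limited queue per zone.
	zonePodEvictor := map[string]*zoneQueue{}

	for _, node := range []struct{ name, zone string }{
		{"node-1", "us-central1-a"},
		{"node-2", "us-central1-b"},
	} {
		// Lazily create the zone's queue on first sight, as
		// monitorNodeStatus does when a new zone appears.
		q, ok := zonePodEvictor[node.zone]
		if !ok {
			q = &zoneQueue{qps: 0.1, burst: 10}
			zonePodEvictor[node.zone] = q
		}
		q.nodes = append(q.nodes, node.name)
	}

	fmt.Println(len(zonePodEvictor), "independent eviction queues") // 2
}
```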
@@ -142,8 +143,8 @@ func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient clientset.Interface,
 	podEvictionTimeout time.Duration,
-	deletionEvictionLimiter flowcontrol.RateLimiter,
-	terminationEvictionLimiter flowcontrol.RateLimiter,
+	evictionLimiterQPS float32,
+	evictionLimiterBurst int,
 	nodeMonitorGracePeriod time.Duration,
 	nodeStartupGracePeriod time.Duration,
 	nodeMonitorPeriod time.Duration,
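The constructor now takes plain QPS and burst numbers instead of prebuilt limiters, because NodeController must mint a fresh token-bucket limiter for every zone it later discovers (see the monitorNodeStatus hunk below). A sketch of the behavior those two numbers buy, assuming the flowcontrol package path from this era of the tree:

```go
package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/flowcontrol"
)

func main() {
	// qps=0.1, burst=10: up to 10 tokens can be banked, then tokens
	// refill at one per 10 seconds. A zone can absorb a small outage
	// quickly, but sustained eviction is deliberately slow.
	limiter := flowcontrol.NewTokenBucketRateLimiter(0.1, 10)

	granted := 0
	for i := 0; i < 20; i++ {
		// TryAccept consumes a token if one is available; it never blocks.
		if limiter.TryAccept() {
			granted++
		}
	}
	fmt.Printf("granted %d of 20 back-to-back requests\n", granted) // roughly the burst, 10
}
```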
@@ -184,8 +185,8 @@ func NewNodeController(
 		podEvictionTimeout:     podEvictionTimeout,
 		maximumGracePeriod:     5 * time.Minute,
 		evictorLock:            &evictorLock,
-		podEvictor:             NewRateLimitedTimedQueue(deletionEvictionLimiter),
-		terminationEvictor:     NewRateLimitedTimedQueue(terminationEvictionLimiter),
+		zonePodEvictor:         make(map[string]*RateLimitedTimedQueue),
+		zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
 		nodeStatusMap:          make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -198,6 +199,8 @@ func NewNodeController(
 		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
 		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 		computeZoneStateFunc:      ComputeZoneState,
+		evictionLimiterQPS:        evictionLimiterQPS,
+		evictionLimiterBurst:      evictionLimiterBurst,
 		zoneStates:                make(map[string]zoneState),
 	}
@@ -309,45 +312,49 @@ func (nc *NodeController) Run(period time.Duration) {
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
-				return false, 0
-			}
-
-			if remaining {
-				nc.terminationEvictor.Add(value.Value)
-			}
-			return true, 0
-		})
+		for k := range nc.zonePodEvictor {
+			nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
+					return false, 0
+				}
+
+				if remaining {
+					nc.zoneTerminationEvictor[k].Add(value.Value)
+				}
+				return true, 0
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)
 
 	// TODO: replace with a controller that ensures pods that are terminating complete
 	// in a particular time period
 	go wait.Until(func() {
 		nc.evictorLock.Lock()
 		defer nc.evictorLock.Unlock()
-		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
-			if err != nil {
-				utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
-				return false, 0
-			}
-
-			if completed {
-				glog.V(2).Infof("All pods terminated on %s", value.Value)
-				recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
-				return true, 0
-			}
-
-			glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
-			// clamp very short intervals
-			if remaining < nodeEvictionPeriod {
-				remaining = nodeEvictionPeriod
-			}
-			return false, remaining
-		})
+		for k := range nc.zoneTerminationEvictor {
+			nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
+				completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
+					return false, 0
+				}
+
+				if completed {
+					glog.V(2).Infof("All pods terminated on %s", value.Value)
+					recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
+					return true, 0
+				}
+
+				glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
+				// clamp very short intervals
+				if remaining < nodeEvictionPeriod {
+					remaining = nodeEvictionPeriod
+				}
+				return false, remaining
+			})
+		}
 	}, nodeEvictionPeriod, wait.NeverStop)
 
 	go wait.Until(func() {

Review comment (on the zoneTerminationEvictor loop): And the same here.
Reply: Discussed f2f.
@@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 	for i := range added {
 		glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
 		recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
-		nc.cancelPodEviction(added[i])
 		nc.knownNodeSet[added[i].Name] = added[i]
+		// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
+		zone := utilnode.GetZoneKey(added[i])
+		if _, found := nc.zonePodEvictor[zone]; !found {
+			nc.zonePodEvictor[zone] =
+				NewRateLimitedTimedQueue(
+					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		if _, found := nc.zoneTerminationEvictor[zone]; !found {
+			nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
+				flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
+		}
+		nc.cancelPodEviction(added[i])
 	}
 
 	for i := range deleted {
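The zone key used for these maps comes from utilnode.GetZoneKey, derived from the node's failure-domain labels; nodes with no zone labels all collapse to one key, so single-zone and unlabeled clusters keep exactly one evictor pair and thus today's behavior. A sketch of the idea, where the label names and the separator are assumptions, not the verbatim implementation:

```go
package main

import "fmt"

// getZoneKey sketches utilnode.GetZoneKey: combine the region and zone
// failure-domain labels into one map key.
func getZoneKey(labels map[string]string) string {
	region := labels["failure-domain.beta.kubernetes.io/region"]
	zone := labels["failure-domain.beta.kubernetes.io/zone"]
	return region + ":" + zone
}

func main() {
	fmt.Println(getZoneKey(map[string]string{
		"failure-domain.beta.kubernetes.io/region": "us-central1",
		"failure-domain.beta.kubernetes.io/zone":   "us-central1-a",
	})) // "us-central1:us-central1-a"
	fmt.Println(getZoneKey(nil)) // ":", i.e. all unlabeled nodes share one zone key
}
```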
@@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added,
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
+	zone := utilnode.GetZoneKey(node)
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	wasDeleting := nc.podEvictor.Remove(node.Name)
-	wasTerminating := nc.terminationEvictor.Remove(node.Name)
+	wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
+	wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
 	if wasDeleting || wasTerminating {
 		glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
 		return true
@@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(node *api.Node) bool {
-	if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
-		return false
-	}
 	nc.evictorLock.Lock()
 	defer nc.evictorLock.Unlock()
-	return nc.podEvictor.Add(node.Name)
+	foundHealthy := false
+	for _, state := range nc.zoneStates {
+		if state != stateFullSegmentation {
+			foundHealthy = true
+			break
+		}
+	}
+	if !foundHealthy {
+		return false
+	}
+	zone := utilnode.GetZoneKey(node)
+	return nc.zonePodEvictor[zone].Add(node.Name)
 }

Review comment (on the loop over nc.zoneStates): Why are you iterating over all zones? Shouldn't you only look at the zone the node is in?
Reply: This is the current logic.
Review comment: s/zoneTerminationEvictor/zonePodEvictor/
Review comment: Or maybe you should even do (performance is not important in this case):
Reply: Discussed f2f.
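Stepping back from the review thread, the behavioral core of the new evictPods is a gate: if every known zone is fully segmented, the controller assumes it is itself the partitioned party and evicts nothing. A pure-function sketch of that gate, where zoneState and stateFullSegmentation mirror the controller's names but the helper is illustrative:

```go
package main

import "fmt"

type zoneState string

const stateFullSegmentation zoneState = "FullSegmentation"

// shouldEvict mirrors the guard in evictPods: evict only if at least
// one zone still looks healthy. When every zone appears segmented,
// the likelier explanation is that the controller lost connectivity,
// so doing nothing is the safer choice.
func shouldEvict(zoneStates map[string]zoneState) bool {
	for _, state := range zoneStates {
		if state != stateFullSegmentation {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(shouldEvict(map[string]zoneState{"a": stateFullSegmentation, "b": "Normal"})) // true
	fmt.Println(shouldEvict(map[string]zoneState{"a": stateFullSegmentation}))                // false
}
```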