Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Separate rate limiters for Pod evictions for different zones in NodeController #28843

Merged
merged 1 commit into from
Jul 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ import (
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/configz"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"

"github.com/golang/glog"
Expand Down Expand Up @@ -239,8 +238,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Warningf("Unsuccessful parsing of service CIDR %v: %v", s.ServiceCIDR, err)
}
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
Expand Down
4 changes: 1 addition & 3 deletions contrib/mesos/pkg/controllermanager/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ import (
quotainstall "k8s.io/kubernetes/pkg/quota/install"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/crypto"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/wait"

"k8s.io/kubernetes/contrib/mesos/pkg/profile"
Expand Down Expand Up @@ -155,8 +154,7 @@ func (s *CMServer) Run(_ []string) error {
_, clusterCIDR, _ := net.ParseCIDR(s.ClusterCIDR)
_, serviceCIDR, _ := net.ParseCIDR(s.ServiceCIDR)
nodeController := nodecontroller.NewNodeController(cloud, clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "node-controller")),
s.PodEvictionTimeout.Duration, flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
flowcontrol.NewTokenBucketRateLimiter(s.DeletingPodsQps, int(s.DeletingPodsBurst)),
s.PodEvictionTimeout.Duration, s.DeletingPodsQps, int(s.DeletingPodsBurst),
s.NodeMonitorGracePeriod.Duration, s.NodeStartupGracePeriod.Duration, s.NodeMonitorPeriod.Duration, clusterCIDR, serviceCIDR, int(s.NodeCIDRMaskSize), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod.Duration)

Expand Down
127 changes: 77 additions & 50 deletions pkg/controller/node/nodecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,12 @@ type nodeStatusData struct {
}

type NodeController struct {
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
deletingPodsRateLimiter flowcontrol.RateLimiter
knownNodeSet map[string]*api.Node
kubeClient clientset.Interface
allocateNodeCIDRs bool
cloud cloudprovider.Interface
clusterCIDR *net.IPNet
serviceCIDR *net.IPNet
knownNodeSet map[string]*api.Node
kubeClient clientset.Interface
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
// Value used if sync_nodes_status=False. NodeController will not proactively
Expand Down Expand Up @@ -112,9 +111,11 @@ type NodeController struct {
// Lock to access evictor workers
evictorLock *sync.Mutex
// workers that evicts pods from unresponsive nodes.
podEvictor *RateLimitedTimedQueue
terminationEvictor *RateLimitedTimedQueue
podEvictionTimeout time.Duration
zonePodEvictor map[string]*RateLimitedTimedQueue
zoneTerminationEvictor map[string]*RateLimitedTimedQueue
evictionLimiterQPS float32
evictionLimiterBurst int
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
Expand Down Expand Up @@ -142,8 +143,8 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
deletionEvictionLimiter flowcontrol.RateLimiter,
terminationEvictionLimiter flowcontrol.RateLimiter,
evictionLimiterQPS float32,
evictionLimiterBurst int,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
Expand Down Expand Up @@ -184,8 +185,8 @@ func NewNodeController(
podEvictionTimeout: podEvictionTimeout,
maximumGracePeriod: 5 * time.Minute,
evictorLock: &evictorLock,
podEvictor: NewRateLimitedTimedQueue(deletionEvictionLimiter),
terminationEvictor: NewRateLimitedTimedQueue(terminationEvictionLimiter),
zonePodEvictor: make(map[string]*RateLimitedTimedQueue),
zoneTerminationEvictor: make(map[string]*RateLimitedTimedQueue),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
Expand All @@ -198,6 +199,8 @@ func NewNodeController(
forcefullyDeletePod: func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
computeZoneStateFunc: ComputeZoneState,
evictionLimiterQPS: evictionLimiterQPS,
evictionLimiterBurst: evictionLimiterBurst,
zoneStates: make(map[string]zoneState),
}

Expand Down Expand Up @@ -309,45 +312,49 @@ func (nc *NodeController) Run(period time.Duration) {
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
for k := range nc.zonePodEvictor {
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nc.daemonSetStore)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}

if remaining {
nc.terminationEvictor.Add(value.Value)
}
return true, 0
})
if remaining {
nc.zoneTerminationEvictor[k].Add(value.Value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/zoneTerminationEvictor/zonePodEvictor/

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe you should even do (performance is not important in this case):

for _, evictor := range nv.zonePodEvictor {
  evictor.Try(...

     evictor.Add(...)
)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed f2f

}
return true, 0
})
}
}, nodeEvictionPeriod, wait.NeverStop)

// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go wait.Until(func() {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}
for k := range nc.zoneTerminationEvictor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the same here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed f2f

nc.zoneTerminationEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := terminatePods(nc.kubeClient, nc.recorder, value.Value, value.AddedAt, nc.maximumGracePeriod)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}

if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
if completed {
glog.V(2).Infof("All pods terminated on %s", value.Value)
recordNodeEvent(nc.recorder, value.Value, api.EventTypeNormal, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}

glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
}
}, nodeEvictionPeriod, wait.NeverStop)

go wait.Until(func() {
Expand All @@ -372,8 +379,19 @@ func (nc *NodeController) monitorNodeStatus() error {
for i := range added {
glog.V(1).Infof("NodeController observed a new Node: %#v", added[i].Name)
recordNodeEvent(nc.recorder, added[i].Name, api.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", added[i].Name))
nc.cancelPodEviction(added[i])
nc.knownNodeSet[added[i].Name] = added[i]
// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
zone := utilnode.GetZoneKey(added[i])
if _, found := nc.zonePodEvictor[zone]; !found {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
}
if _, found := nc.zoneTerminationEvictor[zone]; !found {
nc.zoneTerminationEvictor[zone] = NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, nc.evictionLimiterBurst))
}
nc.cancelPodEviction(added[i])
}

for i := range deleted {
Expand Down Expand Up @@ -689,10 +707,11 @@ func (nc *NodeController) checkForNodeAddedDeleted(nodes *api.NodeList) (added,
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
zone := utilnode.GetZoneKey(node)
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
wasDeleting := nc.podEvictor.Remove(node.Name)
wasTerminating := nc.terminationEvictor.Remove(node.Name)
wasDeleting := nc.zonePodEvictor[zone].Remove(node.Name)
wasTerminating := nc.zoneTerminationEvictor[zone].Remove(node.Name)
if wasDeleting || wasTerminating {
glog.V(2).Infof("Cancelling pod Eviction on Node: %v", node.Name)
return true
Expand All @@ -703,10 +722,18 @@ func (nc *NodeController) cancelPodEviction(node *api.Node) bool {
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(node *api.Node) bool {
if nc.zoneStates[utilnode.GetZoneKey(node)] == stateFullSegmentation {
return false
}
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
return nc.podEvictor.Add(node.Name)
foundHealty := false
for _, state := range nc.zoneStates {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you are iterating over all zones - shouldn't you only look into zone in which the node is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the current logic.

if state != stateFullSegmentation {
foundHealty = true
break
}
}
if !foundHealty {
return false
}
zone := utilnode.GetZoneKey(node)
return nc.zonePodEvictor[zone].Add(node.Name)
}
Loading