Skip to content

Commit

Permalink
Merge pull request #32776 from m1093782566/m109-fix-endpoint-controll…
Browse files Browse the repository at this point in the history
…er-hotloop

Automatic merge from submit-queue

[Controller Manager] Fix endpoint controller hot loop and use utilruntime.HandleError to replace glog.Errorf

<!--  Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md
2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md
3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes
-->

**Why**:

Fix endpoint controller hot loop and use `utilruntime.HandleError` to replace `glog.Errorf`

**What**

1. Fix endpoint controller hot loop in `pkg/controller/endpoint`

2. Fix endpoint controller hot loop in `contrib/mesos/pkg/service`

3. Sweep cases of `glog.Errorf` and use `utilruntime.HandleError` instead.

**Which issue this PR fixes**

Fixes #32843
Related issue is #30629 

**Special notes for your reviewer**:

@deads2k @derekwaynecarr 

The changes to `pkg/controller/endpoints_controller.go` and `contrib/mesos/pkg/service/endpoints_controller.go` are almost the same, except that `contrib/mesos/pkg/service/endpoints_controller.go` does not pass `podInformer` as a parameter of `NewEndpointController()`.

So, I didn't wait for `podStoreSynced` before calling `syncService()` (I just left it as it was). Could that lead to a problem?
  • Loading branch information
Kubernetes Submit Queue authored Sep 18, 2016
2 parents 66d06d4 + a388833 commit 41fc0a4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 92 deletions.
88 changes: 49 additions & 39 deletions contrib/mesos/pkg/service/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type EndpointController interface {
func NewEndpointController(client *clientset.Clientset) *endpointController {
e := &endpointController{
client: client,
queue: workqueue.New(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
}
e.serviceStore.Store, e.serviceController = cache.NewInformer(
&cache.ListWatch{
Expand Down Expand Up @@ -108,7 +108,7 @@ type endpointController struct {
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue *workqueue.Type
queue workqueue.RateLimitingInterface

// Since we join two objects, we'll watch both of them with
// controllers.
Expand Down Expand Up @@ -158,7 +158,7 @@ func (e *endpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err))
return
}
for key := range services {
Expand All @@ -176,7 +176,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
newPod := old.(*api.Pod)
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err))
return
}

Expand All @@ -185,7 +185,7 @@ func (e *endpointController) updatePod(old, cur interface{}) {
if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
oldServices, err := e.getPodServiceMemberships(oldPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
utilruntime.HandleError(fmt.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err))
return
}
services = services.Union(oldServices)
Expand All @@ -207,9 +207,9 @@ func (e *endpointController) deletePod(obj interface{}) {
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)
glog.V(4).Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, kservice.FullServiceResyncPeriod)

// TODO: keep a map of pods to services to handle this condition.
}
Expand All @@ -218,7 +218,7 @@ func (e *endpointController) deletePod(obj interface{}) {
func (e *endpointController) enqueueService(obj interface{}) {
key, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
}

e.queue.Add(key)
Expand All @@ -229,24 +229,35 @@ func (e *endpointController) enqueueService(obj interface{}) {
// workqueue guarantees that they will not end up processing the same service
// at the same time.
func (e *endpointController) worker() {
for {
func() {
key, quit := e.queue.Get()
if quit {
return
}
// Use defer: in the unlikely event that there's a
// panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string))
}()
for e.processNextWorkItem() {
}
}

// processNextWorkItem pulls a single service key off the work queue, syncs
// it, and handles the key's rate-limiting bookkeeping. It returns false
// only when the queue has been shut down, which tells the worker loop to
// exit; in every other case it returns true so the loop keeps draining.
func (e *endpointController) processNextWorkItem() bool {
	key, shutdown := e.queue.Get()
	if shutdown {
		return false
	}
	// Deferred so that even if syncService panics the key is still marked
	// done; otherwise the controller could never sync this service again
	// until it is restarted.
	defer e.queue.Done(key)

	if err := e.syncService(key.(string)); err != nil {
		// Report the failure and requeue with rate-limited backoff so the
		// key is retried later without hot-looping.
		utilruntime.HandleError(fmt.Errorf("Sync %v failed with %v", key, err))
		e.queue.AddRateLimited(key)
		return true
	}

	// Success: clear any accumulated rate-limit history for this key.
	e.queue.Forget(key)
	return true
}

// HACK(sttts): add annotations to the endpoint about the respective container ports
func (e *endpointController) syncService(key string) {
func (e *endpointController) syncService(key string) error {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
Expand All @@ -260,33 +271,32 @@ func (e *endpointController) syncService(key string) {
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
utilruntime.HandleError(fmt.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err))
// Don't retry, as the key isn't going to magically become understandable.
return
return nil
}
err = e.client.Endpoints(namespace).Delete(name, nil)
if err != nil && !errors.IsNotFound(err) {
glog.Errorf("Error deleting endpoint %q: %v", key, err)
e.queue.Add(key) // Retry
utilruntime.HandleError(fmt.Errorf("Error deleting endpoint %q: %v", key, err))
return err
}
return
return nil
}

service := obj.(*api.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
return nil
}

glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
utilruntime.HandleError(fmt.Errorf("Error syncing service %q: %v", key, err))
return err
}

subsets := []api.EndpointSubset{}
Expand Down Expand Up @@ -346,14 +356,13 @@ func (e *endpointController) syncService(key string) {
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
utilruntime.HandleError(fmt.Errorf("Error getting endpoints: %v", err))
return err
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("Endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
return nil
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
Expand All @@ -374,9 +383,10 @@ func (e *endpointController) syncService(key string) {
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %v", err)
e.queue.Add(key) // Retry
utilruntime.HandleError(fmt.Errorf("Error updating endpoints: %v", err))
return err
}
return nil
}

// checkLeftoverEndpoints lists all currently existing endpoints and adds their
Expand All @@ -388,14 +398,14 @@ func (e *endpointController) syncService(key string) {
func (e *endpointController) checkLeftoverEndpoints() {
list, err := e.client.Endpoints(api.NamespaceAll).List(api.ListOptions{})
if err != nil {
glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
utilruntime.HandleError(fmt.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err))
return
}
for i := range list.Items {
ep := &list.Items[i]
key, err := keyFunc(ep)
if err != nil {
glog.Errorf("Unable to get key for endpoint %#v", ep)
utilruntime.HandleError(fmt.Errorf("Unable to get key for endpoint %#v", ep))
continue
}
e.queue.Add(key)
Expand Down
Loading

0 comments on commit 41fc0a4

Please sign in to comment.