From f5c631e22088ccceecf9d23120e2dc940cab40b0 Mon Sep 17 00:00:00 2001 From: Xiaogang Xin Date: Wed, 16 Mar 2016 16:47:30 +0800 Subject: [PATCH] kubernetes/kubernetes#23034 Fix controller-manager race condition issue which cause endpoints flush during restart --- pkg/controller/endpoint/endpoints_controller.go | 17 +++++++++++++++++ .../endpoint/endpoints_controller_test.go | 15 +++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 07491f6966fff..e19c0ce91aefb 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -47,6 +47,10 @@ const ( // often. Higher numbers = lower CPU/network load; lower numbers = // shorter amount of time before a mistaken endpoint is corrected. FullServiceResyncPeriod = 30 * time.Second + + // We must avoid syncing service until the pod store has synced. If it hasn't synced, to + // avoid a hot loop, we'll wait this long between checks. + PodStoreSyncedPollPeriod = 100 * time.Millisecond ) var ( @@ -98,6 +102,7 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller. DeleteFunc: e.deletePod, }, ) + e.podStoreSynced = e.podController.HasSynced return e } @@ -120,6 +125,9 @@ type EndpointController struct { // controllers. serviceController *framework.Controller podController *framework.Controller + // podStoreSynced returns true if the pod store has been synced at least once. + // Added as a member to the struct to allow injection for testing. + podStoreSynced func() bool } // Runs e; will not return until stopCh is closed. workers determines how many @@ -268,6 +276,15 @@ func (e *EndpointController) syncService(key string) { defer func() { glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) }() + + if !e.podStoreSynced() { + // Sleep so we give the pod reflector goroutine a chance to run. + time.Sleep(PodStoreSyncedPollPeriod) + glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key) + e.queue.Add(key) + return + } + obj, exists, err := e.serviceStore.Store.GetByKey(key) if err != nil || !exists { // Delete the corresponding endpoint, as the service has been deleted. diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 670824a52e30b..6f5d43fd815d6 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -36,6 +36,8 @@ import ( utiltesting "k8s.io/kubernetes/pkg/util/testing" ) +var alwaysReady = func() bool { return true } + func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { for i := 0; i < nPods+nNotReady; i++ { p := &api.Pod{ @@ -107,6 +109,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, @@ -140,6 +143,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady endpoints.checkLeftoverEndpoints() if e, a := 1, endpoints.queue.Len(); e != a { @@ -169,6 +173,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady + addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -211,6 +217,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -250,6 +257,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -288,6 +296,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 0, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -326,6 +335,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -368,6 +378,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -409,6 +420,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, @@ -429,6 +441,7 @@ func TestSyncEndpointsItems(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 3, 2, 0) addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! endpoints.serviceStore.Store.Add(&api.Service{ @@ -472,6 +485,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Store.Add(&api.Service{ @@ -533,6 +547,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { // defer testServer.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) + endpoints.podStoreSynced = alwaysReady addPods(endpoints.podStore.Store, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Store.Add(&api.Service{