Skip to content

Commit

Permalink
Merge pull request kubernetes#23035 from xinxiaogang/xnxin-master
Browse files Browse the repository at this point in the history
Auto commit by PR queue bot
  • Loading branch information
k8s-merge-robot committed Mar 17, 2016
2 parents f1c7bd0 + f5c631e commit cda4583
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
17 changes: 17 additions & 0 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ const (
// often. Higher numbers = lower CPU/network load; lower numbers =
// shorter amount of time before a mistaken endpoint is corrected.
FullServiceResyncPeriod = 30 * time.Second

// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
)

var (
Expand Down Expand Up @@ -98,6 +102,7 @@ func NewEndpointController(client *clientset.Clientset, resyncPeriod controller.
DeleteFunc: e.deletePod,
},
)
e.podStoreSynced = e.podController.HasSynced

return e
}
Expand All @@ -120,6 +125,9 @@ type EndpointController struct {
// controllers.
serviceController *framework.Controller
podController *framework.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
}

// Runs e; will not return until stopCh is closed. workers determines how many
Expand Down Expand Up @@ -268,6 +276,15 @@ func (e *EndpointController) syncService(key string) {
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()

if !e.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing rc %v", key)
e.queue.Add(key)
return
}

obj, exists, err := e.serviceStore.Store.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
Expand Down
15 changes: 15 additions & 0 deletions pkg/controller/endpoint/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
utiltesting "k8s.io/kubernetes/pkg/util/testing"
)

var alwaysReady = func() bool { return true }

func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
for i := 0; i < nPods+nNotReady; i++ {
p := &api.Pod{
Expand Down Expand Up @@ -107,6 +109,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
Expand Down Expand Up @@ -140,6 +143,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
endpoints.checkLeftoverEndpoints()

if e, a := 1, endpoints.queue.Len(); e != a {
Expand Down Expand Up @@ -169,6 +173,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady

addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -211,6 +217,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -250,6 +257,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -288,6 +296,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -326,6 +335,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -368,6 +378,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Expand Down Expand Up @@ -409,6 +420,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Expand All @@ -429,6 +441,7 @@ func TestSyncEndpointsItems(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -472,6 +485,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down Expand Up @@ -533,6 +547,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
// defer testServer.Close()
client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.podStoreSynced = alwaysReady
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Store.Add(&api.Service{
Expand Down

0 comments on commit cda4583

Please sign in to comment.