Skip to content

Commit

Permalink
Fix endpoints status out-of-sync when the pod state changes rapidly
Browse files Browse the repository at this point in the history
When Pod state changes rapidly, the endpoints controller may use an outdated
informer cache to sync a Service. If the outdated Endpoints appears to match
what the controller expects, the controller skips the update.

This commit fixes it by checking whether the endpoints informer cache is
outdated when processing a Service. If the cached Endpoints is stale, the
controller returns an error and retries later.

Signed-off-by: Quan Tian <quan.tian@broadcom.com>
  • Loading branch information
tnqn committed Jul 3, 2024
1 parent d2dae8f commit 0a005e1
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 13 deletions.
12 changes: 12 additions & 0 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced

e.staleEndpointsTracker = newStaleEndpointsTracker()
e.triggerTimeTracker = endpointsliceutil.NewTriggerTimeTracker()
e.eventBroadcaster = broadcaster
e.eventRecorder = recorder
Expand Down Expand Up @@ -140,6 +141,8 @@ type Controller struct {
// endpointsSynced returns true if the endpoints shared informer has been synced at least once.
// Added as a member to the struct to allow injection for testing.
endpointsSynced cache.InformerSynced
// staleEndpointsTracker can help determine if a cached Endpoints is out of date.
staleEndpointsTracker *staleEndpointsTracker

// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
Expand Down Expand Up @@ -379,6 +382,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
return err
}
e.triggerTimeTracker.DeleteService(namespace, name)
e.staleEndpointsTracker.Delete(namespace, name)
return nil
}

Expand Down Expand Up @@ -468,6 +472,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
Labels: service.Labels,
},
}
} else if e.staleEndpointsTracker.IsStale(currentEndpoints) {
return fmt.Errorf("endpoints informer cache is out of date, resource version %s already processed for endpoints %s", currentEndpoints.ResourceVersion, key)
}

createEndpoints := len(currentEndpoints.ResourceVersion) == 0
Expand Down Expand Up @@ -550,6 +556,12 @@ func (e *Controller) syncService(ctx context.Context, key string) error {

return err
}
// If the current endpoints is updated we track the old resource version, so
// if we obtain this resource version again from the lister we know it is outdated
// and we need to retry later to wait for the informer cache to be up-to-date.
if !createEndpoints {
e.staleEndpointsTracker.Stale(currentEndpoints)
}
return nil
}

Expand Down
116 changes: 103 additions & 13 deletions pkg/controller/endpoint/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,10 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
return httptest.NewServer(mux), &fakeEndpointsHandler
}

// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
// be sent in the response.
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
// makeBlockingEndpointTestServer will signal the blockNextAction channel on endpoint "POST", "PUT", and "DELETE"
// requests. "POST" and "PUT" requests will wait on a blockUpdate signal if provided, while "DELETE" requests will wait
// on a blockDelete signal if provided. If controller is nil, an error will be sent in the response.
func makeBlockingEndpointTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockUpdate, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {

handlerFunc := func(res http.ResponseWriter, req *http.Request) {
if controller == nil {
Expand All @@ -171,23 +171,37 @@ func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointCont
return
}

if req.Method == "POST" {
controller.endpointsStore.Add(endpoint)
if req.Method == "POST" || req.Method == "PUT" {
if blockUpdate != nil {
go func() {
// Delay the update of endpoints to make endpoints cache out of sync
<-blockUpdate
_ = controller.endpointsStore.Add(endpoint)
}()
} else {
_ = controller.endpointsStore.Add(endpoint)
}
blockNextAction <- struct{}{}
}

if req.Method == "DELETE" {
go func() {
// Delay the deletion of endoints to make endpoint cache out of sync
<-blockDelete
controller.endpointsStore.Delete(endpoint)
if blockDelete != nil {
go func() {
// Delay the deletion of endpoints to make endpoints cache out of sync
<-blockDelete
_ = controller.endpointsStore.Delete(endpoint)
controller.onEndpointsDelete(endpoint)
}()
} else {
_ = controller.endpointsStore.Delete(endpoint)
controller.onEndpointsDelete(endpoint)
}()
}
blockNextAction <- struct{}{}
}

res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusOK)
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
_, _ = res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), endpoint)))
}

mux := http.NewServeMux()
Expand Down Expand Up @@ -2247,7 +2261,7 @@ func TestMultipleServiceChanges(t *testing.T) {
blockDelete := make(chan struct{})
blockNextAction := make(chan struct{})
stopChan := make(chan struct{})
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
testServer := makeBlockingEndpointTestServer(t, controller, endpoint, nil, blockDelete, blockNextAction, ns)
defer testServer.Close()

*controller = *newController(testServer.URL, 0*time.Second)
Expand Down Expand Up @@ -2291,6 +2305,82 @@ func TestMultipleServiceChanges(t *testing.T) {
close(stopChan)
}

// TestMultiplePodChanges tests that endpoints that are not updated because of an out of sync endpoints cache are
// eventually resynced after multiple Pod changes.
//
// The scenario is a Pod flapping Ready -> NotReady -> Ready while the result of
// the first Endpoints write has not yet reached the endpoints cache:
//  1. The NotReady transition must produce a request to the test server.
//  2. The following Ready transition must NOT produce a request while the
//     cache still holds the Endpoints the controller already wrote over.
//  3. Once the cache receives the written Endpoints, the controller must send
//     the update that brings the Endpoints back to Ready.
func TestMultiplePodChanges(t *testing.T) {
	ns := metav1.NamespaceDefault

	// readyEndpoints is what the cache holds at the start of the test.
	readyEndpoints := &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
		Subsets: []v1.EndpointSubset{{
			Addresses: []v1.EndpointAddress{
				{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			},
			Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
		}},
	}
	// notReadyEndpoints is what the test server stores into the cache (and
	// returns) once blockUpdate is released.
	notReadyEndpoints := &v1.Endpoints{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "2"},
		Subsets: []v1.EndpointSubset{{
			NotReadyAddresses: []v1.EndpointAddress{
				{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
			},
			Ports: []v1.EndpointPort{{Port: 8080, Protocol: v1.ProtocolTCP}},
		}},
	}

	controller := &endpointController{}
	blockUpdate := make(chan struct{})
	blockNextAction := make(chan struct{})
	testServer := makeBlockingEndpointTestServer(t, controller, notReadyEndpoints, blockUpdate, nil, blockNextAction, ns)
	defer testServer.Close()

	*controller = *newController(testServer.URL, 0*time.Second)
	pod := testPod(ns, 0, 1, true, ipv4only)
	_ = controller.podStore.Add(pod)
	_ = controller.endpointsStore.Add(readyEndpoints)
	_ = controller.serviceStore.Add(&v1.Service{
		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
		Spec: v1.ServiceSpec{
			Selector:  map[string]string{"foo": "bar"},
			ClusterIP: "10.0.0.1",
			Ports:     []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt32(8080)}},
		},
	})

	// Run the controller under a cancellable context so it is asked to stop
	// when the test returns. Previously the test closed a stop channel that
	// nothing listened on, leaving the controller running after the test.
	// NOTE(review): assumes Controller.Run returns once ctx is cancelled — confirm.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go controller.Run(ctx, 1)

	// Rapidly update the Pod: Ready -> NotReady -> Ready.
	pod2 := pod.DeepCopy()
	pod2.ResourceVersion = "2"
	pod2.Status.Conditions[0].Status = v1.ConditionFalse
	_ = controller.podStore.Update(pod2)
	controller.updatePod(pod, pod2)
	// blockNextAction should eventually unblock once server gets endpoints request.
	waitForChanReceive(t, 1*time.Second, blockNextAction, "Pod Update should have caused a request to be sent to the test server")

	// The endpoints update hasn't been applied to the cache yet.
	pod3 := pod.DeepCopy()
	pod3.ResourceVersion = "3"
	pod3.Status.Conditions[0].Status = v1.ConditionTrue
	_ = controller.podStore.Update(pod3)
	controller.updatePod(pod2, pod3)
	// It shouldn't get endpoints request as the endpoints in the cache is out-of-date.
	select {
	case <-time.After(100 * time.Millisecond):
	case <-blockNextAction:
		t.Errorf("Pod Update shouldn't have caused a request to be sent to the test server")
	}

	// Applying the endpoints update to the cache should cause test server to update endpoints.
	close(blockUpdate)
	waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoints should have been updated")

	close(blockNextAction)
}

func TestSyncServiceAddresses(t *testing.T) {
makeService := func(tolerateUnready bool) *v1.Service {
return &v1.Service{
Expand Down
64 changes: 64 additions & 0 deletions pkg/controller/endpoint/endpoints_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)

// staleEndpointsTracker tracks Endpoints and their stale resource versions to
// help determine if an Endpoints is stale.
//
// At most one resource version is remembered per Endpoints: recording a new
// stale version for the same namespace/name replaces the previous one.
type staleEndpointsTracker struct {
	// lock protects staleResourceVersionByEndpoints.
	lock sync.RWMutex
	// staleResourceVersionByEndpoints tracks the stale resource version of Endpoints,
	// keyed by the Endpoints' namespace and name.
	staleResourceVersionByEndpoints map[types.NamespacedName]string
}

// newStaleEndpointsTracker returns a tracker with no recorded stale versions.
// The backing map is allocated here so the tracker can be written to right away.
func newStaleEndpointsTracker() *staleEndpointsTracker {
	tracker := new(staleEndpointsTracker)
	tracker.staleResourceVersionByEndpoints = make(map[types.NamespacedName]string)
	return tracker
}

// Stale records the resource version carried by the given Endpoints as stale,
// overwriting any version previously recorded for the same namespace/name.
func (t *staleEndpointsTracker) Stale(endpoints *v1.Endpoints) {
	t.lock.Lock()
	defer t.lock.Unlock()

	key := types.NamespacedName{
		Namespace: endpoints.Namespace,
		Name:      endpoints.Name,
	}
	t.staleResourceVersionByEndpoints[key] = endpoints.ResourceVersion
}

// IsStale reports whether the resource version of the given Endpoints equals
// the version recorded as stale for its namespace/name. It returns false when
// nothing has been recorded for that Endpoints.
func (t *staleEndpointsTracker) IsStale(endpoints *v1.Endpoints) bool {
	t.lock.RLock()
	defer t.lock.RUnlock()

	key := types.NamespacedName{
		Namespace: endpoints.Namespace,
		Name:      endpoints.Name,
	}
	recorded, tracked := t.staleResourceVersionByEndpoints[key]
	// The presence check matters: a missing key yields "", which must not be
	// treated as a match for an Endpoints whose resource version is empty.
	return tracked && recorded == endpoints.ResourceVersion
}

// Delete forgets any stale resource version recorded for the Endpoints with
// the given namespace and name. It is a no-op when nothing is recorded.
func (t *staleEndpointsTracker) Delete(namespace, name string) {
	key := types.NamespacedName{Name: name, Namespace: namespace}

	t.lock.Lock()
	defer t.lock.Unlock()
	delete(t.staleResourceVersionByEndpoints, key)
}
54 changes: 54 additions & 0 deletions pkg/controller/endpoint/endpoints_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package endpoint

import (
"testing"

"github.com/stretchr/testify/assert"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestStaleEndpointsTracker(t *testing.T) {
ns := metav1.NamespaceDefault
tracker := newStaleEndpointsTracker()

endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
Ports: []v1.EndpointPort{{Port: 1000}},
}},
}

assert.False(t, tracker.IsStale(endpoints), "IsStale should return false before the endpoint is staled")

tracker.Stale(endpoints)
assert.True(t, tracker.IsStale(endpoints), "IsStale should return true after the endpoint is staled")

endpoints.ResourceVersion = "2"
assert.False(t, tracker.IsStale(endpoints), "IsStale should return false after the endpoint is updated")

tracker.Delete(endpoints.Namespace, endpoints.Name)
assert.Empty(t, tracker.staleResourceVersionByEndpoints)
}
Loading

0 comments on commit 0a005e1

Please sign in to comment.