bugfix: endpoints controller tracks resource version correctly
The endpoints controller stores the resource version of the previous
Endpoints object to avoid issues caused by stale information in the
cache.

However, some update operations can succeed without increasing the
resource version, causing the endpoints controller to declare the
existing resource version stale and preventing the Endpoints object
from being updated again.

Co-authored-by: Quan Tian <quan.tian@broadcom.com>
Co-authored-by: Yang Yang <yyyng@amazon.com>
M00nF1sh authored and swetharepakula committed Sep 18, 2024
1 parent 7ac76f7 commit 6226e65
Showing 2 changed files with 204 additions and 2 deletions.
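
For context, the failure mode is that an update call can succeed without bumping the object's resourceVersion, for example when an admission webhook reverts the change or over-capacity truncation turns the write into a no-op. The following is a minimal sketch of the pattern the fix adopts, with hypothetical names (updateAndTrack, staleMarker) standing in for the controller's actual code: the cached object is treated as stale only when the write actually produced a new revision.

package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// staleMarker abstracts the controller's staleEndpointsTracker; the real
// type lives in the endpoints controller package and may differ.
type staleMarker interface {
	Stale(*v1.Endpoints)
}

// updateAndTrack updates an Endpoints object and marks the cached copy
// stale only when the write actually produced a new resourceVersion.
func updateAndTrack(ctx context.Context, c clientset.Interface, cached, desired *v1.Endpoints, tracker staleMarker) error {
	updated, err := c.CoreV1().Endpoints(desired.Namespace).Update(ctx, desired, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	// If the write was a no-op the server returns the same resourceVersion;
	// marking the cache stale in that case would make every later read from
	// the lister look outdated and block future syncs.
	if updated.ResourceVersion != cached.ResourceVersion {
		tracker.Stale(cached)
	}
	return nil
}
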
8 changes: 6 additions & 2 deletions pkg/controller/endpoint/endpoints_controller.go
@@ -527,12 +527,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
}

logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
var updatedEndpoints *v1.Endpoints
if createEndpoints {
// No previous endpoints, create them
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
} else {
// Pre-existing
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
}
if err != nil {
if createEndpoints && errors.IsForbidden(err) {
@@ -559,7 +560,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// If the current endpoints is updated we track the old resource version, so
// if we obtain this resource version again from the lister we know it is outdated
// and we need to retry later to wait for the informer cache to be up-to-date.
if !createEndpoints {
// There are some operations (webhooks, truncated endpoints, ...) that can cause an endpoints update to become a no-op
// and return the same resourceVersion.
// Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
e.staleEndpointsTracker.Stale(currentEndpoints)
}
return nil
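On the read side of the same mechanism, syncService consults the tracker before acting on an Endpoints object served by the informer cache. A minimal sketch of that guard, assuming a hypothetical isStale predicate in place of the controller's staleEndpointsTracker:

package sketch

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	listers "k8s.io/client-go/listers/core/v1"
)

// getFreshEndpoints returns the cached Endpoints only if a previous write
// has not marked that revision stale; otherwise it errors so the caller
// requeues and retries once the informer cache has caught up.
func getFreshEndpoints(lister listers.EndpointsLister, ns, name string, isStale func(*v1.Endpoints) bool) (*v1.Endpoints, error) {
	cached, err := lister.Endpoints(ns).Get(name)
	if err != nil {
		return nil, err
	}
	if isStale(cached) {
		return nil, fmt.Errorf("endpoints %s/%s informer cache is out of date", ns, name)
	}
	return cached, nil
}

The integration test that follows exercises the truncation path end to end: it creates 1001 pods (one over the 1000-address capacity), flaps the truncated pod's readiness to force repeated resyncs, then deletes half the pods and verifies that the Endpoints object shrinks and drops its over-capacity annotation.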
198 changes: 198 additions & 0 deletions test/integration/endpoints/endpoints_test.go
@@ -28,12 +28,14 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/test/integration/framework"
netutils "k8s.io/utils/net"
)

func TestEndpointUpdates(t *testing.T) {
@@ -604,3 +606,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
svc.Spec.ExternalName = "google.com"
return svc
}

func TestEndpointTruncate(t *testing.T) {
// Disable the ServiceAccount admission plugin as we don't have the serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
defer server.TearDownFn()

client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}

informers := informers.NewSharedInformerFactory(client, 0)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
epController := endpoint.NewEndpointController(
informers.Core().V1().Pods(),
informers.Core().V1().Services(),
informers.Core().V1().Endpoints(),
client,
0)

// Start informers and the endpoints controller
informers.Start(ctx.Done())
go epController.Run(ctx, 1)

// Create namespace
ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)

// Create a pod with labels
basePod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Labels: labelMap(),
},
Spec: v1.PodSpec{
NodeName: "fake-node",
Containers: []v1.Container{
{
Name: "fakename",
Image: "fakeimage",
Ports: []v1.ContainerPort{
{
Name: "port-443",
ContainerPort: 443,
},
},
},
},
},
Status: v1.PodStatus{
Phase: v1.PodRunning,
Conditions: []v1.PodCondition{
{
Type: v1.PodReady,
Status: v1.ConditionTrue,
},
},
PodIP: "10.0.0.1",
PodIPs: []v1.PodIP{
{
IP: "10.0.0.1",
},
},
},
}

// Create 1001 Pods to exceed the Endpoints maximum capacity, which is set to 1000 addresses
allPodNames := sets.New[string]()
baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
for i := 0; i < 1001; i++ {
pod := basePod.DeepCopy()
pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
allPodNames.Insert(pod.Name)
podIP := netutils.AddIPOffset(baseIP, i).String()
pod.Status.PodIP = podIP
pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
}

createdPod.Status = pod.Status
_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
}
}

// Create a service associated to the pod
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
Namespace: ns.Name,
Labels: map[string]string{
"foo": "bar",
},
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"foo": "bar",
},
Ports: []v1.ServicePort{
{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
},
},
}
_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Failed to create service %s: %v", svc.Name, err)
}

var truncatedPodName string
// Poll until the Endpoints object associated with the previously created Service exists
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
podNames := sets.New[string]()
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}

for _, subset := range endpoints.Subsets {
for _, address := range subset.Addresses {
podNames.Insert(address.TargetRef.Name)
}
}

if podNames.Len() != 1000 {
return false, nil
}

truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
if !ok || truncated != "truncated" {
return false, nil
}
// There is only 1 truncated Pod.
truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
return true, nil
}); err != nil {
t.Fatalf("endpoints not found: %v", err)
}

// Update the truncated Pod several times to make the endpoints controller resync the service.
truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(ctx, truncatedPodName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
}
for i := 0; i < 10; i++ {
truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, truncatedPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
}
truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, truncatedPod, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
}
}

// Delete 501 Pods so the remaining 500 fit below the capacity limit
for i := 500; i < 1001; i++ {
podName := fmt.Sprintf("%s-%d", basePod.Name, i)
err = client.CoreV1().Pods(ns.Name).Delete(ctx, podName, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("error deleting test pod: %v", err)
}
}

// Poll until the deleted Pods' addresses are no longer present in the Endpoints object.
if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
if err != nil {
return false, nil
}

numEndpoints := 0
for _, subset := range endpoints.Subsets {
numEndpoints += len(subset.Addresses)
}

if numEndpoints != 500 {
return false, nil
}

truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
if ok || truncated == "truncated" {
return false, nil
}

return true, nil
}); err != nil {
t.Fatalf("error checking for no endpoints with terminating pods: %v", err)
}
}
