Commit
Merge pull request kubernetes#127454 from swetharepakula/automated-cherry-pick-of-#127417-origin-release-1.28

Automated cherry pick of kubernetes#127417: bugfix: endpoints controller track resource version
k8s-ci-robot authored Sep 19, 2024
2 parents 7ac76f7 + 6226e65 commit f1a5de9
Showing 2 changed files with 204 additions and 2 deletions.
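The fix is small: syncService now captures the Endpoints object returned by the Update call and only marks the cached copy stale when the API server actually bumped its resourceVersion. A minimal, self-contained sketch of that decision follows; the lowercase endpoints type and markStaleIfChanged helper are stand-ins for illustration, not the controller's actual API.

package main

import "fmt"

// endpoints is a stand-in for v1.Endpoints; only the resourceVersion matters here.
type endpoints struct {
	resourceVersion string
}

// markStaleIfChanged mirrors the patched logic: the cached object is marked
// stale only when the update actually produced a new resourceVersion. A no-op
// update (for example, one neutralized by a webhook or by truncation) returns
// the same resourceVersion, so the lister's copy is still valid.
func markStaleIfChanged(updated, current *endpoints, stale func(*endpoints)) {
	if updated != nil && updated.resourceVersion != current.resourceVersion {
		stale(current)
	}
}

func main() {
	current := &endpoints{resourceVersion: "42"}
	report := func(e *endpoints) { fmt.Println("marked stale:", e.resourceVersion) }

	// A real update bumps the resourceVersion: the cached copy is now outdated.
	markStaleIfChanged(&endpoints{resourceVersion: "43"}, current, report)

	// A no-op update returns the same resourceVersion: nothing is marked stale,
	// which is exactly the case the old `if !createEndpoints` check got wrong.
	markStaleIfChanged(&endpoints{resourceVersion: "42"}, current, report)
}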
8 changes: 6 additions & 2 deletions pkg/controller/endpoint/endpoints_controller.go
@@ -527,12 +527,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	}
 
 	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
+	var updatedEndpoints *v1.Endpoints
 	if createEndpoints {
 		// No previous endpoints, create them
 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
 	} else {
 		// Pre-existing
-		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
+		updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
 	}
 	if err != nil {
 		if createEndpoints && errors.IsForbidden(err) {
@@ -559,7 +560,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
 	// If the current endpoints object is updated, we track the old resource version, so
 	// if we obtain this resource version again from the lister we know it is outdated
 	// and we need to retry later to wait for the informer cache to be up-to-date.
-	if !createEndpoints {
+	// Some operations (webhooks, truncated endpoints, ...) can potentially cause endpoints
+	// updates to become no-ops and return the same resourceVersion.
+	// Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578
+	if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion {
 		e.staleEndpointsTracker.Stale(currentEndpoints)
 	}
 	return nil
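For context on why the controller tracks staleness at all: after writing an update, the informer cache may still serve the pre-update object, so the controller records the outdated resourceVersion and retries until the cache catches up. Below is a rough sketch of that pattern under assumed names; the real staleEndpointsTracker lives in pkg/controller/endpoint and differs in detail.

package main

import (
	"fmt"
	"sync"
)

// staleTracker remembers resourceVersions known to be outdated, keyed by
// namespace/name, so a sync loop can skip a lister copy it already replaced.
type staleTracker struct {
	mu    sync.Mutex
	stale map[string]string // key -> outdated resourceVersion
}

func newStaleTracker() *staleTracker {
	return &staleTracker{stale: map[string]string{}}
}

// Stale records that rv for key is outdated.
func (t *staleTracker) Stale(key, rv string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.stale[key] = rv
}

// IsStale reports whether the lister still serves the outdated copy.
func (t *staleTracker) IsStale(key, rv string) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.stale[key] == rv
}

func main() {
	tr := newStaleTracker()
	tr.Stale("default/test-service", "42")
	fmt.Println(tr.IsStale("default/test-service", "42")) // true: cache is behind, retry later
	fmt.Println(tr.IsStale("default/test-service", "43")) // false: cache caught up
}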
198 changes: 198 additions & 0 deletions test/integration/endpoints/endpoints_test.go
@@ -28,12 +28,14 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/informers"
 	clientset "k8s.io/client-go/kubernetes"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	"k8s.io/kubernetes/pkg/controller/endpoint"
 	"k8s.io/kubernetes/test/integration/framework"
+	netutils "k8s.io/utils/net"
 )
 
 func TestEndpointUpdates(t *testing.T) {
@@ -604,3 +606,199 @@ func newExternalNameService(namespace, name string) *v1.Service {
 	svc.Spec.ExternalName = "google.com"
 	return svc
 }
+
+func TestEndpointTruncate(t *testing.T) {
+	// Disable the ServiceAccount admission plugin, as we don't have the serviceaccount controller running.
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
+	defer server.TearDownFn()
+
+	client, err := clientset.NewForConfig(server.ClientConfig)
+	if err != nil {
+		t.Fatalf("Error creating clientset: %v", err)
+	}
+
+	informers := informers.NewSharedInformerFactory(client, 0)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	epController := endpoint.NewEndpointController(
+		informers.Core().V1().Pods(),
+		informers.Core().V1().Services(),
+		informers.Core().V1().Endpoints(),
+		client,
+		0)
+
+	// Start informers and the controller
+	informers.Start(ctx.Done())
+	go epController.Run(ctx, 1)
+
+	// Create namespace
+	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t)
+	defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+	// Create a pod with labels
+	basePod := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:   "test-pod",
+			Labels: labelMap(),
+		},
+		Spec: v1.PodSpec{
+			NodeName: "fake-node",
+			Containers: []v1.Container{
+				{
+					Name:  "fakename",
+					Image: "fakeimage",
+					Ports: []v1.ContainerPort{
+						{
+							Name:          "port-443",
+							ContainerPort: 443,
+						},
+					},
+				},
+			},
+		},
+		Status: v1.PodStatus{
+			Phase: v1.PodRunning,
+			Conditions: []v1.PodCondition{
+				{
+					Type:   v1.PodReady,
+					Status: v1.ConditionTrue,
+				},
+			},
+			PodIP: "10.0.0.1",
+			PodIPs: []v1.PodIP{
+				{
+					IP: "10.0.0.1",
+				},
+			},
+		},
+	}
+
+	// create 1001 Pods to exceed the Endpoints max capacity, which is set to 1000
+	allPodNames := sets.New[string]()
+	baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
+	for i := 0; i < 1001; i++ {
+		pod := basePod.DeepCopy()
+		pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i)
+		allPodNames.Insert(pod.Name)
+		podIP := netutils.AddIPOffset(baseIP, i).String()
+		pod.Status.PodIP = podIP
+		pod.Status.PodIPs[0] = v1.PodIP{IP: podIP}
+		createdPod, err := client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
+		}
+
+		createdPod.Status = pod.Status
+		_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, createdPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
+		}
+	}
+
+	// Create a service associated with the pods
+	svc := &v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-service",
+			Namespace: ns.Name,
+			Labels: map[string]string{
+				"foo": "bar",
+			},
+		},
+		Spec: v1.ServiceSpec{
+			Selector: map[string]string{
+				"foo": "bar",
+			},
+			Ports: []v1.ServicePort{
+				{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
+			},
+		},
+	}
+	_, err = client.CoreV1().Services(ns.Name).Create(ctx, svc, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatalf("Failed to create service %s: %v", svc.Name, err)
+	}
+
+	var truncatedPodName string
+	// poll until the Endpoints object associated with the previously created Service exists
+	if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		podNames := sets.New[string]()
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		for _, subset := range endpoints.Subsets {
+			for _, address := range subset.Addresses {
+				podNames.Insert(address.TargetRef.Name)
+			}
+		}
+
+		if podNames.Len() != 1000 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if !ok || truncated != "truncated" {
+			return false, nil
+		}
+		// There is only 1 truncated Pod.
+		truncatedPodName, _ = allPodNames.Difference(podNames).PopAny()
+		return true, nil
+	}); err != nil {
+		t.Fatalf("endpoints not found: %v", err)
+	}
+
+	// Update the truncated Pod several times to make the endpoints controller resync the service.
+	truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(ctx, truncatedPodName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err)
+	}
+	for i := 0; i < 10; i++ {
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+		truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue
+		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(ctx, truncatedPod, metav1.UpdateOptions{})
+		if err != nil {
+			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err)
+		}
+	}
+
+	// delete 501 Pods
+	for i := 500; i < 1001; i++ {
+		podName := fmt.Sprintf("%s-%d", basePod.Name, i)
+		err = client.CoreV1().Pods(ns.Name).Delete(ctx, podName, metav1.DeleteOptions{})
+		if err != nil {
+			t.Fatalf("error deleting test pod: %v", err)
+		}
+	}
+
+	// poll until the endpoints of the deleted Pods are no longer in the Endpoints object.
+	if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) {
+		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(ctx, svc.Name, metav1.GetOptions{})
+		if err != nil {
+			return false, nil
+		}
+
+		numEndpoints := 0
+		for _, subset := range endpoints.Subsets {
+			numEndpoints += len(subset.Addresses)
+		}
+
+		if numEndpoints != 500 {
+			return false, nil
+		}
+
+		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity]
+		if ok || truncated == "truncated" {
+			return false, nil
+		}
+
+		return true, nil
+	}); err != nil {
t.Fatalf("error checking for no endpoints with terminating pods: %v", err)
}
+}
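The test above hands each of the 1001 pods a unique address by treating the base IP as a big integer and adding the pod index, using helpers from k8s.io/utils/net. A standalone sketch of that arithmetic:

package main

import (
	"fmt"

	netutils "k8s.io/utils/net"
)

func main() {
	// BigForIP converts the address to a *big.Int; AddIPOffset adds an
	// integer offset and converts back, yielding sequential pod IPs.
	base := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1"))
	for i := 0; i < 3; i++ {
		fmt.Println(netutils.AddIPOffset(base, i)) // 10.0.0.1, 10.0.0.2, 10.0.0.3
	}
}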
