Merge pull request #127451 from swetharepakula/automated-cherry-pick-of-#127417-origin-release-1.31
Automated cherry pick of #127417: bugfix: endpoints controller track resource version
This commit is contained in:
		| @@ -532,12 +532,13 @@ func (e *Controller) syncService(ctx context.Context, key string) error { | ||||
| 	} | ||||
|  | ||||
| 	logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) | ||||
| 	var updatedEndpoints *v1.Endpoints | ||||
| 	if createEndpoints { | ||||
| 		// No previous endpoints, create them | ||||
| 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{}) | ||||
| 	} else { | ||||
| 		// Pre-existing | ||||
| 		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{}) | ||||
| 		updatedEndpoints, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{}) | ||||
| 	} | ||||
| 	if err != nil { | ||||
| 		if createEndpoints && errors.IsForbidden(err) { | ||||
| @@ -564,7 +565,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error { | ||||
| 	// If the current endpoints is updated we track the old resource version, so | ||||
| 	// if we obtain this resource version again from the lister we know is outdated | ||||
| 	// and we need to retry later to wait for the informer cache to be up-to-date. | ||||
| 	if !createEndpoints { | ||||
| 	// there are some operations (webhooks, truncated endpoints, ...) that can potentially cause endpoints updates became noop | ||||
| 	// and return the same resourceVersion. | ||||
| 	// Ref: https://issues.k8s.io/127370 , https://issues.k8s.io/126578 | ||||
| 	if updatedEndpoints != nil && updatedEndpoints.ResourceVersion != currentEndpoints.ResourceVersion { | ||||
| 		e.staleEndpointsTracker.Stale(currentEndpoints) | ||||
| 	} | ||||
| 	return nil | ||||
|   | ||||
| @@ -28,6 +28,7 @@ import ( | ||||
| 	apierrors "k8s.io/apimachinery/pkg/api/errors" | ||||
| 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||||
| 	"k8s.io/apimachinery/pkg/util/intstr" | ||||
| 	"k8s.io/apimachinery/pkg/util/sets" | ||||
| 	"k8s.io/apimachinery/pkg/util/wait" | ||||
| 	"k8s.io/client-go/informers" | ||||
| 	clientset "k8s.io/client-go/kubernetes" | ||||
| @@ -35,6 +36,7 @@ import ( | ||||
| 	"k8s.io/kubernetes/pkg/controller/endpoint" | ||||
| 	"k8s.io/kubernetes/test/integration/framework" | ||||
| 	"k8s.io/kubernetes/test/utils/ktesting" | ||||
| 	netutils "k8s.io/utils/net" | ||||
| ) | ||||
|  | ||||
| func TestEndpointUpdates(t *testing.T) { | ||||
| @@ -605,3 +607,199 @@ func newExternalNameService(namespace, name string) *v1.Service { | ||||
| 	svc.Spec.ExternalName = "google.com" | ||||
| 	return svc | ||||
| } | ||||
|  | ||||
| func TestEndpointTruncate(t *testing.T) { | ||||
| 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. | ||||
| 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) | ||||
| 	defer server.TearDownFn() | ||||
|  | ||||
| 	client, err := clientset.NewForConfig(server.ClientConfig) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Error creating clientset: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	informers := informers.NewSharedInformerFactory(client, 0) | ||||
|  | ||||
| 	tCtx := ktesting.Init(t) | ||||
| 	epController := endpoint.NewEndpointController( | ||||
| 		tCtx, | ||||
| 		informers.Core().V1().Pods(), | ||||
| 		informers.Core().V1().Services(), | ||||
| 		informers.Core().V1().Endpoints(), | ||||
| 		client, | ||||
| 		0) | ||||
|  | ||||
| 	// Start informer and controllers | ||||
| 	informers.Start(tCtx.Done()) | ||||
| 	go epController.Run(tCtx, 1) | ||||
|  | ||||
| 	// Create namespace | ||||
| 	ns := framework.CreateNamespaceOrDie(client, "test-endpoints-truncate", t) | ||||
| 	defer framework.DeleteNamespaceOrDie(client, ns, t) | ||||
|  | ||||
| 	// Create a pod with labels | ||||
| 	basePod := &v1.Pod{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:   "test-pod", | ||||
| 			Labels: labelMap(), | ||||
| 		}, | ||||
| 		Spec: v1.PodSpec{ | ||||
| 			NodeName: "fake-node", | ||||
| 			Containers: []v1.Container{ | ||||
| 				{ | ||||
| 					Name:  "fakename", | ||||
| 					Image: "fakeimage", | ||||
| 					Ports: []v1.ContainerPort{ | ||||
| 						{ | ||||
| 							Name:          "port-443", | ||||
| 							ContainerPort: 443, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 		Status: v1.PodStatus{ | ||||
| 			Phase: v1.PodRunning, | ||||
| 			Conditions: []v1.PodCondition{ | ||||
| 				{ | ||||
| 					Type:   v1.PodReady, | ||||
| 					Status: v1.ConditionTrue, | ||||
| 				}, | ||||
| 			}, | ||||
| 			PodIP: "10.0.0.1", | ||||
| 			PodIPs: []v1.PodIP{ | ||||
| 				{ | ||||
| 					IP: "10.0.0.1", | ||||
| 				}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
|  | ||||
| 	// create 1001 Pods to reach endpoint max capacity that is set to 1000 | ||||
| 	allPodNames := sets.New[string]() | ||||
| 	baseIP := netutils.BigForIP(netutils.ParseIPSloppy("10.0.0.1")) | ||||
| 	for i := 0; i < 1001; i++ { | ||||
| 		pod := basePod.DeepCopy() | ||||
| 		pod.Name = fmt.Sprintf("%s-%d", basePod.Name, i) | ||||
| 		allPodNames.Insert(pod.Name) | ||||
| 		podIP := netutils.AddIPOffset(baseIP, i).String() | ||||
| 		pod.Status.PodIP = podIP | ||||
| 		pod.Status.PodIPs[0] = v1.PodIP{IP: podIP} | ||||
| 		createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Failed to create pod %s: %v", pod.Name, err) | ||||
| 		} | ||||
|  | ||||
| 		createdPod.Status = pod.Status | ||||
| 		_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{}) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Create a service associated to the pod | ||||
| 	svc := &v1.Service{ | ||||
| 		ObjectMeta: metav1.ObjectMeta{ | ||||
| 			Name:      "test-service", | ||||
| 			Namespace: ns.Name, | ||||
| 			Labels: map[string]string{ | ||||
| 				"foo": "bar", | ||||
| 			}, | ||||
| 		}, | ||||
| 		Spec: v1.ServiceSpec{ | ||||
| 			Selector: map[string]string{ | ||||
| 				"foo": "bar", | ||||
| 			}, | ||||
| 			Ports: []v1.ServicePort{ | ||||
| 				{Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)}, | ||||
| 			}, | ||||
| 		}, | ||||
| 	} | ||||
| 	_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Failed to create service %s: %v", svc.Name, err) | ||||
| 	} | ||||
|  | ||||
| 	var truncatedPodName string | ||||
| 	// poll until associated Endpoints to the previously created Service exists | ||||
| 	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) { | ||||
| 		podNames := sets.New[string]() | ||||
| 		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) | ||||
| 		if err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		for _, subset := range endpoints.Subsets { | ||||
| 			for _, address := range subset.Addresses { | ||||
| 				podNames.Insert(address.TargetRef.Name) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		if podNames.Len() != 1000 { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity] | ||||
| 		if !ok || truncated != "truncated" { | ||||
| 			return false, nil | ||||
| 		} | ||||
| 		// There is only 1 truncated Pod. | ||||
| 		truncatedPodName, _ = allPodNames.Difference(podNames).PopAny() | ||||
| 		return true, nil | ||||
| 	}); err != nil { | ||||
| 		t.Fatalf("endpoints not found: %v", err) | ||||
| 	} | ||||
|  | ||||
| 	// Update the truncated Pod several times to make endpoints controller resync the service. | ||||
| 	truncatedPod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, truncatedPodName, metav1.GetOptions{}) | ||||
| 	if err != nil { | ||||
| 		t.Fatalf("Failed to get pod %s: %v", truncatedPodName, err) | ||||
| 	} | ||||
| 	for i := 0; i < 10; i++ { | ||||
| 		truncatedPod.Status.Conditions[0].Status = v1.ConditionFalse | ||||
| 		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{}) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err) | ||||
| 		} | ||||
| 		truncatedPod.Status.Conditions[0].Status = v1.ConditionTrue | ||||
| 		truncatedPod, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, truncatedPod, metav1.UpdateOptions{}) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("Failed to update status of pod %s: %v", truncatedPod.Name, err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// delete 501 Pods | ||||
| 	for i := 500; i < 1001; i++ { | ||||
| 		podName := fmt.Sprintf("%s-%d", basePod.Name, i) | ||||
| 		err = client.CoreV1().Pods(ns.Name).Delete(tCtx, podName, metav1.DeleteOptions{}) | ||||
| 		if err != nil { | ||||
| 			t.Fatalf("error deleting test pod: %v", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// poll until endpoints for deleted Pod are no longer in Endpoints. | ||||
| 	if err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Second, true, func(context.Context) (bool, error) { | ||||
| 		endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{}) | ||||
| 		if err != nil { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		numEndpoints := 0 | ||||
| 		for _, subset := range endpoints.Subsets { | ||||
| 			numEndpoints += len(subset.Addresses) | ||||
| 		} | ||||
|  | ||||
| 		if numEndpoints != 500 { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		truncated, ok := endpoints.Annotations[v1.EndpointsOverCapacity] | ||||
| 		if ok || truncated == "truncated" { | ||||
| 			return false, nil | ||||
| 		} | ||||
|  | ||||
| 		return true, nil | ||||
| 	}); err != nil { | ||||
| 		t.Fatalf("error checking for no endpoints with terminating pods: %v", err) | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Kubernetes Prow Robot
					Kubernetes Prow Robot