Merge pull request #112895 from nokia/kep-1435-GA

KEP-1435 Mixed Protocol values in LoadBalancer Service GA
Authored by Kubernetes Prow Robot on 2022-11-03 05:43:35 -07:00; committed by GitHub.
11 changed files with 350 additions and 522 deletions
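For context, the kind of Service this KEP graduates to GA exposes the same port number over more than one protocol behind a single load balancer. A minimal sketch, mirroring the integration test added later in this PR (the service name, port names, and port value are illustrative only):

    import (
        corev1 "k8s.io/api/core/v1"
        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    )

    // mixedProtocolService builds a LoadBalancer Service that serves port 53
    // over both TCP and UDP, which KEP-1435 permits within a single Service.
    func mixedProtocolService() *corev1.Service {
        return &corev1.Service{
            ObjectMeta: metav1.ObjectMeta{Name: "dns-lb"},
            Spec: corev1.ServiceSpec{
                Type: corev1.ServiceTypeLoadBalancer,
                Ports: []corev1.ServicePort{
                    {Name: "dns-tcp", Port: 53, Protocol: corev1.ProtocolTCP},
                    {Name: "dns-udp", Port: 53, Protocol: corev1.ProtocolUDP},
                },
            },
        }
    }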

View File

@@ -24,6 +24,9 @@ import (
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports, including their names and protocols.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
@@ -40,3 +43,24 @@ func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID {
}
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoints with all the port data.
func GetFullContainerPortsByPodUID(ep *v1.Endpoints) FullPortsByPodUID {
m := FullPortsByPodUID{}
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
containerPort := v1.ContainerPort{
Name: port.Name,
ContainerPort: port.Port,
Protocol: port.Protocol,
}
for _, addr := range ss.Addresses {
if _, ok := m[addr.TargetRef.UID]; !ok {
m[addr.TargetRef.UID] = make([]v1.ContainerPort, 0)
}
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort)
}
}
}
return m
}
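For reference, a minimal usage sketch of the new helper, written as if it lived in the same package (the clientset, namespace, and Service name are passed in by the caller; the e2e validation added later in this PR makes the same call inside a polling loop):

    import (
        "context"
        "fmt"

        metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
        "k8s.io/client-go/kubernetes"
    )

    // logEndpointPortsByPod prints every container port, with its protocol, that
    // the Endpoints object of svcName currently exposes, grouped by pod UID.
    func logEndpointPortsByPod(c kubernetes.Interface, ns, svcName string) error {
        ep, err := c.CoreV1().Endpoints(ns).Get(context.TODO(), svcName, metav1.GetOptions{})
        if err != nil {
            return err
        }
        for uid, ports := range GetFullContainerPortsByPodUID(ep) {
            for _, p := range ports {
                fmt.Printf("pod %s exposes %d/%s (%s)\n", uid, p.ContainerPort, p.Protocol, p.Name)
            }
        }
        return nil
    }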

View File

@@ -17,6 +17,7 @@ limitations under the License.
package endpointslice
import (
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/types"
)
@@ -24,6 +25,9 @@ import (
// PortsByPodUID is a map that maps pod UID to container ports.
type PortsByPodUID map[types.UID][]int
// FullPortsByPodUID is a map that maps pod UID to container ports, including their names and protocols.
type FullPortsByPodUID map[types.UID][]v1.ContainerPort
// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints.
func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID {
m := PortsByPodUID{}
@@ -44,3 +48,28 @@ func GetContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) PortsByPodUID {
}
return m
}
// GetFullContainerPortsByPodUID returns a FullPortsByPodUID map on the given endpoint slices with all the port data.
func GetFullContainerPortsByPodUID(eps []discoveryv1.EndpointSlice) FullPortsByPodUID {
m := FullPortsByPodUID{}
for _, es := range eps {
for _, port := range es.Ports {
if port.Port == nil {
continue
}
containerPort := v1.ContainerPort{
Name: *port.Name,
ContainerPort: *port.Port,
Protocol: *port.Protocol,
}
for _, ep := range es.Endpoints {
if _, ok := m[ep.TargetRef.UID]; !ok {
m[ep.TargetRef.UID] = make([]v1.ContainerPort, 0)
}
m[ep.TargetRef.UID] = append(m[ep.TargetRef.UID], containerPort)
}
}
}
return m
}
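The EndpointSlice counterpart looks much the same; the slices backing a Service are selected by the kubernetes.io/service-name label, exactly as the e2e validation below does (a sketch under the same assumptions and imports as the previous example):

    // logEndpointSlicePortsByPod prints every container port, with its protocol,
    // exposed by the EndpointSlices that belong to svcName, grouped by pod UID.
    func logEndpointSlicePortsByPod(c kubernetes.Interface, ns, svcName string) error {
        opts := metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + svcName}
        slices, err := c.DiscoveryV1().EndpointSlices(ns).List(context.TODO(), opts)
        if err != nil {
            return err
        }
        for uid, ports := range GetFullContainerPortsByPodUID(slices.Items) {
            for _, p := range ports {
                fmt.Printf("pod %s exposes %d/%s (%s)\n", uid, p.ContainerPort, p.Protocol, p.Name)
            }
        }
        return nil
    }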

View File

@@ -1030,6 +1030,8 @@ func (j *TestJig) CheckServiceReachability(svc *v1.Service, pod *v1.Pod) error {
return j.checkNodePortServiceReachability(svc, pod)
case v1.ServiceTypeExternalName:
return j.checkExternalServiceReachability(svc, pod)
case v1.ServiceTypeLoadBalancer:
return j.checkClusterIPServiceReachability(svc, pod)
default:
return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type", svcType, svc.Name)
}
@@ -1065,3 +1067,22 @@ func (j *TestJig) CreateSCTPServiceWithPort(tweak func(svc *v1.Service), port in
}
return j.sanityCheckService(result, svc.Spec.Type)
}
// CreateLoadBalancerServiceWaitForClusterIPOnly creates a LoadBalancer Service and waits only
// for it to acquire a cluster IP; it does not wait for the external load balancer to be provisioned.
func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(tweak func(svc *v1.Service)) (*v1.Service, error) {
ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
svc.Spec.Type = v1.ServiceTypeLoadBalancer
// We need to turn affinity off for our LB distribution tests
svc.Spec.SessionAffinity = v1.ServiceAffinityNone
if tweak != nil {
tweak(svc)
}
result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
if err != nil {
return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %v", svc.Name, err)
}
return j.sanityCheckService(result, v1.ServiceTypeLoadBalancer)
}

View File

@@ -125,6 +125,12 @@ type portsByPodName map[string][]int
// portsByPodUID is a map that maps pod UID to container ports.
type portsByPodUID map[types.UID][]int
// fullPortsByPodName is a map that maps pod name to container ports including their protocols.
type fullPortsByPodName map[string][]v1.ContainerPort
// fullPortsByPodUID is a map that maps pod UID to container ports including their protocols.
type fullPortsByPodUID map[types.UID][]v1.ContainerPort
// affinityCheckFromPod returns interval, timeout and function pinging the service and
// returning pinged hosts for pinging the service from execPod.
func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
@@ -3731,6 +3737,77 @@ var _ = common.SIGDescribe("Services", func() {
framework.Logf("Collection of services has been deleted")
})
/*
Release: v1.26
Testname: Service, same ports with different protocols on a Load Balancer Service
Description: Create a LoadBalancer service with two ports that have the same value but use different protocols. Add a Pod that listens on both ports. The Pod must be reachable via the ClusterIP on both ports.
*/
ginkgo.It("should serve endpoints on same port and different protocol for internal traffic on Type LoadBalancer ", func() {
serviceName := "multiprotocol-lb-test"
ns := f.Namespace.Name
jig := e2eservice.NewTestJig(cs, ns, serviceName)
defer func() {
err := cs.CoreV1().Services(ns).Delete(context.TODO(), serviceName, metav1.DeleteOptions{})
framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
}()
svc1port := "svc1"
svc2port := "svc2"
ginkgo.By("creating service " + serviceName + " in namespace " + ns)
svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(service *v1.Service) {
service.Spec.Ports = []v1.ServicePort{
{
Name: "portname1",
Port: 80,
TargetPort: intstr.FromString(svc1port),
Protocol: v1.ProtocolTCP,
},
{
Name: "portname2",
Port: 80,
TargetPort: intstr.FromString(svc2port),
Protocol: v1.ProtocolUDP,
},
}
})
framework.ExpectNoError(err)
containerPort := 100
names := map[string]bool{}
defer func() {
for name := range names {
err := cs.CoreV1().Pods(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
}
}()
containerPorts := []v1.ContainerPort{
{
Name: svc1port,
ContainerPort: int32(containerPort),
Protocol: v1.ProtocolTCP,
},
{
Name: svc2port,
ContainerPort: int32(containerPort),
Protocol: v1.ProtocolUDP,
},
}
podname1 := "pod1"
createPodOrFail(f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
ginkgo.By("Checking if the Service forwards traffic to pods")
execPod := e2epod.CreateExecPodOrFail(cs, ns, "execpod", nil)
err = jig.CheckServiceReachability(svc, execPod)
framework.ExpectNoError(err)
e2epod.DeletePodOrFail(cs, ns, podname1)
})
})
@@ -4164,6 +4241,108 @@ func restartComponent(cs clientset.Interface, cName, ns string, matchLabels map[
return err
}
// validateEndpointsPortsWithProtocolsOrFail validates that the given service exists and is served by the given expectedEndpoints.
func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints fullPortsByPodName) {
ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
expectedPortsByPodUID, err := translatePortsByPodNameToPortsByPodUID(c, namespace, expectedEndpoints)
framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)
var (
pollErr error
i = 0
)
if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
i++
ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
framework.Logf("Failed go get Endpoints object: %v", err)
// Retry the error
return false, nil
}
portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
// If the EndpointSlice API is enabled, then validate that the appropriate EndpointSlice objects
// were also created/updated/deleted.
if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
opts := metav1.ListOptions{
LabelSelector: "kubernetes.io/service-name=" + serviceName,
}
es, err := c.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), opts)
if err != nil {
framework.Logf("Failed go list EndpointSlice objects: %v", err)
// Retry the error
return false, nil
}
portsByUID = fullPortsByPodUID(e2eendpointslice.GetFullContainerPortsByPodUID(es.Items))
if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
if i%5 == 0 {
framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
}
return false, nil
}
}
framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
serviceName, namespace, expectedEndpoints)
return true, nil
}); pollErr != nil {
if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}); err == nil {
for _, pod := range pods.Items {
framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
}
} else {
framework.Logf("Can't list pod debug info: %v", err)
}
}
framework.ExpectNoError(pollErr, "error waiting for service %s in namespace %s to expose endpoints %v", serviceName, namespace, expectedEndpoints)
}
func translatePortsByPodNameToPortsByPodUID(c clientset.Interface, ns string, expectedEndpoints fullPortsByPodName) (fullPortsByPodUID, error) {
portsByUID := make(fullPortsByPodUID)
for name, portList := range expectedEndpoints {
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
}
portsByUID[pod.ObjectMeta.UID] = portList
}
return portsByUID, nil
}
func validatePortsAndProtocols(ep, expectedEndpoints fullPortsByPodUID) error {
if len(ep) != len(expectedEndpoints) {
// should not happen because this condition is checked beforehand
return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
}
for podUID := range expectedEndpoints {
if _, ok := ep[podUID]; !ok {
return fmt.Errorf("endpoint %v not found", podUID)
}
if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
}
var match bool
for _, epPort := range ep[podUID] {
match = false
for _, expectedPort := range expectedEndpoints[podUID] {
if epPort.ContainerPort == expectedPort.ContainerPort && epPort.Protocol == expectedPort.Protocol {
match = true
}
}
if !match {
return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
}
}
}
return nil
}
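To make the matching semantics concrete: validatePortsAndProtocols compares the two maps order-independently and keys only on ContainerPort and Protocol, not on the port name. A small illustrative check (the pod UID and port values are made up for the example):

    uid := types.UID("1234-abcd")
    got := fullPortsByPodUID{uid: {
        {Name: "svc2", ContainerPort: 100, Protocol: v1.ProtocolUDP},
        {Name: "svc1", ContainerPort: 100, Protocol: v1.ProtocolTCP},
    }}
    want := fullPortsByPodUID{uid: {
        {Name: "portname1", ContainerPort: 100, Protocol: v1.ProtocolTCP},
        {Name: "portname2", ContainerPort: 100, Protocol: v1.ProtocolUDP},
    }}
    // Passes: each observed port has a counterpart with the same number and
    // protocol, even though the names and the ordering differ.
    if err := validatePortsAndProtocols(got, want); err != nil {
        framework.Failf("unexpected mismatch: %v", err)
    }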
var _ = common.SIGDescribe("SCTP [LinuxOnly]", func() {
f := framework.NewDefaultFramework("sctp")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged

View File

@@ -381,6 +381,59 @@ func Test_UpdateLoadBalancerWithLoadBalancerClass(t *testing.T) {
}
}
// Test_ServiceLoadBalancerMixedProtocolSetup tests that a LoadBalancer Service whose ports use
// different protocols can be created and is reconciled by the service controller.
func Test_ServiceLoadBalancerMixedProtocolSetup(t *testing.T) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd())
defer server.TearDownFn()
client, err := clientset.NewForConfig(server.ClientConfig)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
ns := framework.CreateNamespaceOrDie(client, "test-service-mixed-protocols", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
controller, cloud, informer := newServiceController(t, client)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
informer.Start(ctx.Done())
go controller.Run(ctx, 1, controllersmetrics.NewControllerManagerMetrics("loadbalancer-test"))
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeLoadBalancer,
Ports: []corev1.ServicePort{
{
Name: "tcpport",
Port: int32(53),
Protocol: corev1.ProtocolTCP,
},
{
Name: "udpport",
Port: int32(53),
Protocol: corev1.ProtocolUDP,
},
},
},
}
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating test service: %v", err)
}
time.Sleep(5 * time.Second) // sleep 5 seconds to give the service controller time to reconcile
if len(cloud.Calls) == 0 {
t.Errorf("expected cloud provider calls to create load balancer")
}
}
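A side note on the fixed sleep above: a poll-based wait would keep the same success criterion (the fake cloud having recorded at least one call) without a hard-coded delay. A sketch, assuming the wait package from k8s.io/apimachinery/pkg/util/wait:

    if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
        // Done once the fake cloud provider has been asked to do anything,
        // i.e. the service controller has reconciled the Service.
        return len(cloud.Calls) > 0, nil
    }); err != nil {
        t.Errorf("expected cloud provider calls to create load balancer: %v", err)
    }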
func newServiceController(t *testing.T, client *clientset.Clientset) (*servicecontroller.Controller, *fakecloud.Cloud, informers.SharedInformerFactory) {
cloud := &fakecloud.Cloud{}
informerFactory := informers.NewSharedInformerFactory(client, 0)