From 650f6cb8260986701c4cbab0cf44caaa3cca74e2 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 23 Feb 2015 13:53:21 -0800 Subject: [PATCH] Revert "Multi-port Endpoints" --- cmd/integration/integration.go | 2 +- pkg/api/testing/fuzzer.go | 10 +- pkg/api/types.go | 30 +- pkg/api/v1beta1/conversion.go | 24 +- pkg/api/v1beta1/conversion_test.go | 26 +- pkg/api/v1beta1/defaults.go | 4 +- pkg/api/v1beta1/defaults_test.go | 14 +- pkg/api/v1beta1/register.go | 6 - pkg/api/v1beta1/types.go | 6 +- pkg/api/v1beta2/conversion.go | 24 +- pkg/api/v1beta2/conversion_test.go | 26 +- pkg/api/v1beta2/defaults.go | 4 +- pkg/api/v1beta2/defaults_test.go | 14 +- pkg/api/v1beta2/register.go | 6 - pkg/api/v1beta2/types.go | 6 +- pkg/api/v1beta3/defaults.go | 12 +- pkg/api/v1beta3/defaults_test.go | 22 +- pkg/api/v1beta3/types.go | 24 +- pkg/api/validation/validation.go | 6 +- pkg/api/validation/validation_test.go | 32 +- pkg/client/client_test.go | 4 +- pkg/constraint/constraint_test.go | 2 +- pkg/kubectl/resource_printer.go | 11 +- pkg/kubectl/resource_printer_test.go | 5 +- pkg/kubectl/run.go | 2 +- pkg/kubectl/run_test.go | 2 +- pkg/kubelet/handlers_test.go | 4 +- pkg/kubelet/kubelet.go | 4 +- pkg/kubelet/kubelet_test.go | 18 +- pkg/kubelet/probe_test.go | 6 +- pkg/master/publish.go | 22 +- pkg/proxy/config/api_test.go | 4 +- pkg/proxy/loadbalancer.go | 8 +- pkg/proxy/proxier.go | 9 +- pkg/proxy/proxier_test.go | 20 +- pkg/proxy/roundrobin.go | 305 +++++-------- pkg/proxy/roundrobin_test.go | 550 ++++++++--------------- pkg/registry/controller/rest_test.go | 2 +- pkg/registry/endpoint/rest_test.go | 14 +- pkg/registry/etcd/etcd_test.go | 22 +- pkg/registry/pod/etcd/etcd_test.go | 10 +- pkg/registry/service/rest.go | 49 +- pkg/registry/service/rest_test.go | 2 +- pkg/scheduler/scheduler_test.go | 4 +- pkg/service/endpoints_controller.go | 26 +- pkg/service/endpoints_controller_test.go | 31 +- pkg/tools/etcd_tools_watch_test.go | 8 +- test/e2e/events.go | 2 +- test/e2e/networking.go | 2 +- test/e2e/pods.go | 6 +- test/e2e/rc.go | 2 +- test/e2e/service.go | 9 +- 52 files changed, 525 insertions(+), 938 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 417081fbb05..08b6eaecb02 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -538,7 +538,7 @@ func runServiceTest(client *client.Client) { { Name: "c1", Image: "foo", - Ports: []api.ContainerPort{ + Ports: []api.Port{ {ContainerPort: 1234}, }, ImagePullPolicy: "PullIfNotPresent", diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go index 6918c4df28a..dbb0e05e2d3 100644 --- a/pkg/api/testing/fuzzer.go +++ b/pkg/api/testing/fuzzer.go @@ -24,8 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -240,13 +238,7 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer { func(ep *api.Endpoint, c fuzz.Continue) { // TODO: If our API used a particular type for IP fields we could just catch that here. ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256)) - // TODO: Once we drop single-port APIs, make this fuzz - // multiple ports and fuzz port.name. This will force - // a compile error when those APIs are deleted. - _ = v1beta1.Dependency - _ = v1beta2.Dependency - ep.Ports = []api.EndpointPort{{Name: "", Port: c.Rand.Intn(65536)}} - c.Fuzz(&ep.Ports[0].Protocol) + ep.Port = c.Rand.Intn(65536) }, ) return f diff --git a/pkg/api/types.go b/pkg/api/types.go index 81ff553fc7b..5f8607e40c0 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -239,8 +239,8 @@ type SecretVolumeSource struct { Target ObjectReference `json:"target"` } -// ContainerPort represents a network port in a single container -type ContainerPort struct { +// Port represents a network port in a single container +type Port struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty"` @@ -346,9 +346,9 @@ type Container struct { // Optional: Defaults to whatever is defined in the image. Command []string `json:"command,omitempty"` // Optional: Defaults to Docker's default. - WorkingDir string `json:"workingDir,omitempty"` - Ports []ContainerPort `json:"ports,omitempty"` - Env []EnvVar `json:"env,omitempty"` + WorkingDir string `json:"workingDir,omitempty"` + Ports []Port `json:"ports,omitempty"` + Env []EnvVar `json:"env,omitempty"` // Compute resource requirements. Resources ResourceRequirements `json:"resources,omitempty"` VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` @@ -750,6 +750,9 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -759,21 +762,8 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` - // The ports exposed on this IP. - Ports []EndpointPort -} - -type EndpointPort struct { - // Optional if only one port is defined in this Endpoint. - // The name of this port within the larger service/endpoint structure. - // This must be a DNS_LABEL. - Name string - - // The IP protocol for this port. Supports "TCP" and "UDP". - Protocol Protocol - - // The destination port to access. - Port int + // Required: The destination port to access. + Port int `json:"port"` } // EndpointsList is a list of endpoints. diff --git a/pkg/api/v1beta1/conversion.go b/pkg/api/v1beta1/conversion.go index eaa787d783a..6ba485bdbe9 100644 --- a/pkg/api/v1beta1/conversion.go +++ b/pkg/api/v1beta1/conversion.go @@ -1123,16 +1123,12 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } for i := range in.Endpoints { ep := &in.Endpoints[i] - // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. - if len(ep.Ports) > 0 { - port := &ep.Ports[0] - if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { - return err - } - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) - } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) } return nil }, @@ -1143,20 +1139,22 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } for i := range in.Endpoints { + out.Endpoints = append(out.Endpoints, newer.Endpoint{}) + ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } + ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - epp := newer.EndpointPort{Port: pn} - if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { - return err - } - out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) + ep.Port = pn } return nil }, diff --git a/pkg/api/v1beta1/conversion_test.go b/pkg/api/v1beta1/conversion_test.go index f5507ce6065..df0009b4198 100644 --- a/pkg/api/v1beta1/conversion_test.go +++ b/pkg/api/v1beta1/conversion_test.go @@ -390,35 +390,41 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - Protocol: "", + TypeMeta: current.TypeMeta{ + ID: "empty", + }, + Protocol: current.ProtocolTCP, Endpoints: []string{}, }, expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "one", + }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Endpoints: []newer.Endpoint{ - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, - }, + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, }, }, { given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "several", + }, Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Endpoints: []newer.Endpoint{ - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, - }, + Protocol: newer.ProtocolUDP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, }, }, } @@ -430,7 +436,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", i, err) continue } - if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta1/defaults.go b/pkg/api/v1beta1/defaults.go index a980ea50afe..7c9042dea32 100644 --- a/pkg/api/v1beta1/defaults.go +++ b/pkg/api/v1beta1/defaults.go @@ -32,7 +32,7 @@ func init() { } } }, - func(obj *ContainerPort) { + func(obj *Port) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -86,7 +86,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" && len(obj.Endpoints) > 0 { + if obj.Protocol == "" { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta1/defaults_test.go b/pkg/api/v1beta1/defaults_test.go index 2127ee0f647..d7372045247 100644 --- a/pkg/api/v1beta1/defaults_test.go +++ b/pkg/api/v1beta1/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} + bp.Spec.Containers[0].Ports = []current.Port{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -103,18 +103,8 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { - in := ¤t.Endpoints{} - obj := roundTrip(t, runtime.Object(in)) - out := obj.(*current.Endpoints) - - if out.Protocol != "" { - t.Errorf("Expected protocol \"\", got %s", out.Protocol) - } -} - func TestSetDefaulEndpointsProtocol(t *testing.T) { - in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) diff --git a/pkg/api/v1beta1/register.go b/pkg/api/v1beta1/register.go index d98c038230c..f61f806f08a 100644 --- a/pkg/api/v1beta1/register.go +++ b/pkg/api/v1beta1/register.go @@ -24,12 +24,6 @@ import ( // Codec encodes internal objects to the v1beta1 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta1") -// Dependency does nothing but give a hook for other packages to force a -// compile-time error when this API version is eventually removed. This is -// useful, for example, to clean up things that are implicitly tied to -// semantics of older APIs. -const Dependency = true - func init() { api.Scheme.AddKnownTypes("v1beta1", &Pod{}, diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index c9006798e77..9e2bd555a68 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -161,8 +161,8 @@ type SecretVolumeSource struct { Target ObjectReference `json:"target" description:"target is a reference to a secret"` } -// ContainerPort represents a network port in a single container -type ContainerPort struct { +// Port represents a network port in a single container +type Port struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"` @@ -283,7 +283,7 @@ type Container struct { Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"` - Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"` + Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"` Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` // Optional: Defaults to unlimited. diff --git a/pkg/api/v1beta2/conversion.go b/pkg/api/v1beta2/conversion.go index 44371412c91..837b9366a90 100644 --- a/pkg/api/v1beta2/conversion.go +++ b/pkg/api/v1beta2/conversion.go @@ -1038,16 +1038,12 @@ func init() { if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil { return err } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } for i := range in.Endpoints { ep := &in.Endpoints[i] - // newer.Endpoints.Endpoints[i].Ports is an array - take the first one. - if len(ep.Ports) > 0 { - port := &ep.Ports[0] - if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil { - return err - } - out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port))) - } + out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) } return nil }, @@ -1058,20 +1054,22 @@ func init() { if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil { return err } + if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil { + return err + } for i := range in.Endpoints { + out.Endpoints = append(out.Endpoints, newer.Endpoint{}) + ep := &out.Endpoints[i] host, port, err := net.SplitHostPort(in.Endpoints[i]) if err != nil { return err } + ep.IP = host pn, err := strconv.Atoi(port) if err != nil { return err } - epp := newer.EndpointPort{Port: pn} - if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil { - return err - } - out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}}) + ep.Port = pn } return nil }, diff --git a/pkg/api/v1beta2/conversion_test.go b/pkg/api/v1beta2/conversion_test.go index ac88b233d40..b606476466d 100644 --- a/pkg/api/v1beta2/conversion_test.go +++ b/pkg/api/v1beta2/conversion_test.go @@ -220,35 +220,41 @@ func TestEndpointsConversion(t *testing.T) { }{ { given: current.Endpoints{ - Protocol: "", + TypeMeta: current.TypeMeta{ + ID: "empty", + }, + Protocol: current.ProtocolTCP, Endpoints: []string{}, }, expected: newer.Endpoints{ + Protocol: newer.ProtocolTCP, Endpoints: []newer.Endpoint{}, }, }, { given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "one", + }, Protocol: current.ProtocolTCP, Endpoints: []string{"1.2.3.4:88"}, }, expected: newer.Endpoints{ - Endpoints: []newer.Endpoint{ - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}}, - }, + Protocol: newer.ProtocolTCP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}}, }, }, { given: current.Endpoints{ + TypeMeta: current.TypeMeta{ + ID: "several", + }, Protocol: current.ProtocolUDP, Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"}, }, expected: newer.Endpoints{ - Endpoints: []newer.Endpoint{ - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}}, - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}}, - {IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}}, - }, + Protocol: newer.ProtocolUDP, + Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}}, }, }, } @@ -260,7 +266,7 @@ func TestEndpointsConversion(t *testing.T) { t.Errorf("[Case: %d] Unexpected error: %v", i, err) continue } - if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { + if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) { t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got) } diff --git a/pkg/api/v1beta2/defaults.go b/pkg/api/v1beta2/defaults.go index ddf26cf5119..db59b8cd272 100644 --- a/pkg/api/v1beta2/defaults.go +++ b/pkg/api/v1beta2/defaults.go @@ -34,7 +34,7 @@ func init() { } } }, - func(obj *ContainerPort) { + func(obj *Port) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -88,7 +88,7 @@ func init() { } }, func(obj *Endpoints) { - if obj.Protocol == "" && len(obj.Endpoints) > 0 { + if obj.Protocol == "" { obj.Protocol = "TCP" } }, diff --git a/pkg/api/v1beta2/defaults_test.go b/pkg/api/v1beta2/defaults_test.go index 1141f53d343..a73c83a93e6 100644 --- a/pkg/api/v1beta2/defaults_test.go +++ b/pkg/api/v1beta2/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} + bp.Spec.Containers[0].Ports = []current.Port{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -103,18 +103,8 @@ func TestSetDefaultSecret(t *testing.T) { } } -func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) { - in := ¤t.Endpoints{} - obj := roundTrip(t, runtime.Object(in)) - out := obj.(*current.Endpoints) - - if out.Protocol != "" { - t.Errorf("Expected protocol \"\", got %s", out.Protocol) - } -} - func TestSetDefaulEndpointsProtocol(t *testing.T) { - in := ¤t.Endpoints{Endpoints: []string{"1.2.3.4:5678"}} + in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) diff --git a/pkg/api/v1beta2/register.go b/pkg/api/v1beta2/register.go index 39612227928..990aa7b2039 100644 --- a/pkg/api/v1beta2/register.go +++ b/pkg/api/v1beta2/register.go @@ -24,12 +24,6 @@ import ( // Codec encodes internal objects to the v1beta2 scheme var Codec = runtime.CodecFor(api.Scheme, "v1beta2") -// Dependency does nothing but give a hook for other packages to force a -// compile-time error when this API version is eventually removed. This is -// useful, for example, to clean up things that are implicitly tied to -// semantics of older APIs. -const Dependency = true - func init() { api.Scheme.AddKnownTypes("v1beta2", &Pod{}, diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 05ccd60f0df..6787cc6c32e 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -99,8 +99,8 @@ const ( ProtocolUDP Protocol = "UDP" ) -// ContainerPort represents a network port in a single container. -type ContainerPort struct { +// Port represents a network port in a single container. +type Port struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"` @@ -242,7 +242,7 @@ type Container struct { Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"` - Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"` + Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"` Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` // Optional: Defaults to unlimited. diff --git a/pkg/api/v1beta3/defaults.go b/pkg/api/v1beta3/defaults.go index c9cb2c851ee..55c867c3614 100644 --- a/pkg/api/v1beta3/defaults.go +++ b/pkg/api/v1beta3/defaults.go @@ -32,7 +32,7 @@ func init() { } } }, - func(obj *ContainerPort) { + func(obj *Port) { if obj.Protocol == "" { obj.Protocol = ProtocolTCP } @@ -81,14 +81,8 @@ func init() { } }, func(obj *Endpoints) { - for i := range obj.Endpoints { - ep := &obj.Endpoints[i] - for j := range ep.Ports { - port := &ep.Ports[j] - if port.Protocol == "" { - port.Protocol = ProtocolTCP - } - } + if obj.Protocol == "" { + obj.Protocol = "TCP" } }, ) diff --git a/pkg/api/v1beta3/defaults_test.go b/pkg/api/v1beta3/defaults_test.go index 2ac72ae3360..e1ebf388b1e 100644 --- a/pkg/api/v1beta3/defaults_test.go +++ b/pkg/api/v1beta3/defaults_test.go @@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) { func TestSetDefaultContainer(t *testing.T) { bp := ¤t.BoundPod{} bp.Spec.Containers = []current.Container{{}} - bp.Spec.Containers[0].Ports = []current.ContainerPort{{}} + bp.Spec.Containers[0].Ports = []current.Port{{}} obj2 := roundTrip(t, runtime.Object(bp)) bp2 := obj2.(*current.BoundPod) @@ -104,25 +104,11 @@ func TestSetDefaultSecret(t *testing.T) { } func TestSetDefaulEndpointsProtocol(t *testing.T) { - in := ¤t.Endpoints{ - Endpoints: []current.Endpoint{ - {IP: "1.2.3.4", Ports: []current.EndpointPort{ - {Protocol: "TCP"}, - {Protocol: "UDP"}, - {Protocol: ""}, - }}, - }, - } + in := ¤t.Endpoints{} obj := roundTrip(t, runtime.Object(in)) out := obj.(*current.Endpoints) - if out.Endpoints[0].Ports[0].Protocol != current.ProtocolTCP { - t.Errorf("Expected protocol[0] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[0].Protocol) - } - if out.Endpoints[0].Ports[1].Protocol != current.ProtocolUDP { - t.Errorf("Expected protocol[1] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[1].Protocol) - } - if out.Endpoints[0].Ports[2].Protocol != current.ProtocolTCP { - t.Errorf("Expected protocol[2] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[2].Protocol) + if out.Protocol != current.ProtocolTCP { + t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol) } } diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 8235091a08e..2e79c5e19d6 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -254,8 +254,8 @@ type SecretVolumeSource struct { Target ObjectReference `json:"target" description:"target is a reference to a secret"` } -// ContainerPort represents a network port in a single container. -type ContainerPort struct { +// Port represents a network port in a single container. +type Port struct { // Optional: If specified, this must be a DNS_LABEL. Each named port // in a pod must have a unique name. Name string `json:"name,omitempty"` @@ -367,7 +367,7 @@ type Container struct { Command []string `json:"command,omitempty"` // Optional: Defaults to Docker's default. WorkingDir string `json:"workingDir,omitempty"` - Ports []ContainerPort `json:"ports,omitempty"` + Ports []Port `json:"ports,omitempty"` Env []EnvVar `json:"env,omitempty"` Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"` VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"` @@ -782,6 +782,10 @@ type Endpoints struct { TypeMeta `json:",inline"` ObjectMeta `json:"metadata,omitempty"` + // Optional: The IP protocol for these endpoints. Supports "TCP" and + // "UDP". Defaults to "TCP". + Protocol Protocol `json:"protocol,omitempty"` + Endpoints []Endpoint `json:"endpoints,omitempty"` } @@ -791,20 +795,6 @@ type Endpoint struct { // TODO: This should allow hostname or IP, see #4447. IP string `json:"ip"` - // The ports exposed on this IP. - Ports []EndpointPort `json:"ports,omitempty"` -} - -type EndpointPort struct { - // Optional if only one port is defined in this Endpoint, otherwise required. - // The name of this port within the larger service/endpoint structure. - // This must be a DNS_LABEL. - Name string `json:"name,omitempty"` - - // Optional: The IP protocol for this port. Supports "TCP" and "UDP". - // Defaults to "TCP". - Protocol Protocol `json:"protocol,omitempty"` - // Required: The destination port to access. Port int `json:"port"` } diff --git a/pkg/api/validation/validation.go b/pkg/api/validation/validation.go index 2dba60266dc..d3b12f8d44b 100644 --- a/pkg/api/validation/validation.go +++ b/pkg/api/validation/validation.go @@ -325,7 +325,7 @@ func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.Valid var supportedPortProtocols = util.NewStringSet(string(api.ProtocolTCP), string(api.ProtocolUDP)) -func validatePorts(ports []api.ContainerPort) errs.ValidationErrorList { +func validatePorts(ports []api.Port) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} allNames := util.StringSet{} @@ -410,7 +410,7 @@ func validateProbe(probe *api.Probe) errs.ValidationErrorList { // AccumulateUniquePorts runs an extraction function on each Port of each Container, // accumulating the results and returning an error if any ports conflict. -func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.ContainerPort) int) errs.ValidationErrorList { +func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.Port) int) errs.ValidationErrorList { allErrs := errs.ValidationErrorList{} for ci, ctr := range containers { @@ -435,7 +435,7 @@ func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, // a slice of containers. func checkHostPortConflicts(containers []api.Container) errs.ValidationErrorList { allPorts := map[int]bool{} - return AccumulateUniquePorts(containers, allPorts, func(p *api.ContainerPort) int { return p.HostPort }) + return AccumulateUniquePorts(containers, allPorts, func(p *api.Port) int { return p.HostPort }) } func validateExecAction(exec *api.ExecAction) errs.ValidationErrorList { diff --git a/pkg/api/validation/validation_test.go b/pkg/api/validation/validation_test.go index 78b5a9a6554..ca2ac42f8dd 100644 --- a/pkg/api/validation/validation_test.go +++ b/pkg/api/validation/validation_test.go @@ -195,7 +195,7 @@ func TestValidateVolumes(t *testing.T) { } func TestValidatePorts(t *testing.T) { - successCase := []api.ContainerPort{ + successCase := []api.Port{ {Name: "abc", ContainerPort: 80, HostPort: 80, Protocol: "TCP"}, {Name: "easy", ContainerPort: 82, Protocol: "TCP"}, {Name: "as", ContainerPort: 83, Protocol: "UDP"}, @@ -207,7 +207,7 @@ func TestValidatePorts(t *testing.T) { t.Errorf("expected success: %v", errs) } - nonCanonicalCase := []api.ContainerPort{ + nonCanonicalCase := []api.Port{ {ContainerPort: 80, Protocol: "TCP"}, } if errs := validatePorts(nonCanonicalCase); len(errs) != 0 { @@ -215,22 +215,22 @@ func TestValidatePorts(t *testing.T) { } errorCases := map[string]struct { - P []api.ContainerPort + P []api.Port T errors.ValidationErrorType F string D string }{ - "name > 63 characters": {[]api.ContainerPort{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, - "name not a DNS label": {[]api.ContainerPort{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, - "name not unique": {[]api.ContainerPort{ + "name > 63 characters": {[]api.Port{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, + "name not a DNS label": {[]api.Port{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg}, + "name not unique": {[]api.Port{ {Name: "abc", ContainerPort: 80, Protocol: "TCP"}, {Name: "abc", ContainerPort: 81, Protocol: "TCP"}, }, errors.ValidationErrorTypeDuplicate, "[1].name", ""}, - "zero container port": {[]api.ContainerPort{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, - "invalid container port": {[]api.ContainerPort{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, - "invalid host port": {[]api.ContainerPort{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg}, - "invalid protocol": {[]api.ContainerPort{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""}, - "protocol required": {[]api.ContainerPort{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""}, + "zero container port": {[]api.Port{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, + "invalid container port": {[]api.Port{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg}, + "invalid host port": {[]api.Port{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg}, + "invalid protocol": {[]api.Port{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""}, + "protocol required": {[]api.Port{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""}, } for k, v := range errorCases { errs := validatePorts(v.P) @@ -433,9 +433,9 @@ func TestValidateContainers(t *testing.T) { }, "zero-length image": {{Name: "abc", Image: "", ImagePullPolicy: "IfNotPresent"}}, "host port not unique": { - {Name: "abc", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}}, + {Name: "abc", Image: "image", Ports: []api.Port{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}}, ImagePullPolicy: "IfNotPresent"}, - {Name: "def", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}}, + {Name: "def", Image: "image", Ports: []api.Port{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}}, ImagePullPolicy: "IfNotPresent"}, }, "invalid env var name": { @@ -587,7 +587,7 @@ func TestValidateManifest(t *testing.T) { "memory": resource.MustParse("1"), }, }, - Ports: []api.ContainerPort{ + Ports: []api.Port{ {Name: "p1", ContainerPort: 80, HostPort: 8080, Protocol: "TCP"}, {Name: "p2", ContainerPort: 81, Protocol: "TCP"}, {ContainerPort: 82, Protocol: "TCP"}, @@ -934,7 +934,7 @@ func TestValidatePodUpdate(t *testing.T) { Containers: []api.Container{ { Image: "foo:V1", - Ports: []api.ContainerPort{ + Ports: []api.Port{ {HostPort: 8080, ContainerPort: 80}, }, }, @@ -947,7 +947,7 @@ func TestValidatePodUpdate(t *testing.T) { Containers: []api.Container{ { Image: "foo:V2", - Ports: []api.ContainerPort{ + Ports: []api.Port{ {HostPort: 8000, ContainerPort: 80}, }, }, diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index e3dc1d213a5..096b887dbcf 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -616,9 +616,7 @@ func TestListEndpooints(t *testing.T) { { ObjectMeta: api.ObjectMeta{Name: "endpoint-1"}, Endpoints: []api.Endpoint{ - {IP: "10.245.1.2", Ports: []api.EndpointPort{{Port: 8080}}}, - {IP: "10.245.1.3", Ports: []api.EndpointPort{{Port: 8080}}}, - }, + {IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}}, }, }, }, diff --git a/pkg/constraint/constraint_test.go b/pkg/constraint/constraint_test.go index 7f76ddf196b..eae45221270 100644 --- a/pkg/constraint/constraint_test.go +++ b/pkg/constraint/constraint_test.go @@ -26,7 +26,7 @@ import ( func containerWithHostPorts(ports ...int) api.Container { c := api.Container{} for _, p := range ports { - c.Ports = append(c.Ports, api.ContainerPort{HostPort: p}) + c.Ports = append(c.Ports, api.Port{HostPort: p}) } return c } diff --git a/pkg/kubectl/resource_printer.go b/pkg/kubectl/resource_printer.go index 05b7024fa85..f3cf2714a6d 100644 --- a/pkg/kubectl/resource_printer.go +++ b/pkg/kubectl/resource_printer.go @@ -22,8 +22,10 @@ import ( "fmt" "io" "io/ioutil" + "net" "reflect" "sort" + "strconv" "strings" "text/tabwriter" "text/template" @@ -268,11 +270,10 @@ func formatEndpoints(endpoints []api.Endpoint) string { return "" } list := []string{} - //FIXME: What do we want to print, now that endpoints are more complex? - //for i := range endpoints { - // ep := &endpoints[i] - // list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) - //} + for i := range endpoints { + ep := &endpoints[i] + list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) + } return strings.Join(list, ",") } diff --git a/pkg/kubectl/resource_printer_test.go b/pkg/kubectl/resource_printer_test.go index fff9bf09ffd..cc6f7ab6b6b 100644 --- a/pkg/kubectl/resource_printer_test.go +++ b/pkg/kubectl/resource_printer_test.go @@ -452,10 +452,7 @@ func TestPrinters(t *testing.T) { "pod": &api.Pod{ObjectMeta: om("pod")}, "emptyPodList": &api.PodList{}, "nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}}, - "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{ - {IP: "127.0.0.1"}, - {IP: "localhost", Ports: []api.EndpointPort{{Port: 8080}}}, - }}, + "endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}}, } // map of printer name to set of objects it should fail on. expectedErrors := map[string]util.StringSet{ diff --git a/pkg/kubectl/run.go b/pkg/kubectl/run.go index 4e6933be39f..3b8e69e1cf2 100644 --- a/pkg/kubectl/run.go +++ b/pkg/kubectl/run.go @@ -82,7 +82,7 @@ func (BasicReplicationController) Generate(params map[string]string) (runtime.Ob // Don't include the port if it was not specified. if port > 0 { - controller.Spec.Template.Spec.Containers[0].Ports = []api.ContainerPort{ + controller.Spec.Template.Spec.Containers[0].Ports = []api.Port{ { ContainerPort: port, }, diff --git a/pkg/kubectl/run_test.go b/pkg/kubectl/run_test.go index 760b74ca707..f009d81c4ea 100644 --- a/pkg/kubectl/run_test.go +++ b/pkg/kubectl/run_test.go @@ -84,7 +84,7 @@ func TestGenerate(t *testing.T) { { Name: "foo", Image: "someimage", - Ports: []api.ContainerPort{ + Ports: []api.Port{ { ContainerPort: 80, }, diff --git a/pkg/kubelet/handlers_test.go b/pkg/kubelet/handlers_test.go index 8f4525a4e7c..a211f84a5a3 100644 --- a/pkg/kubelet/handlers_test.go +++ b/pkg/kubelet/handlers_test.go @@ -38,7 +38,7 @@ func TestResolvePortString(t *testing.T) { expected := 80 name := "foo" container := &api.Container{ - Ports: []api.ContainerPort{ + Ports: []api.Port{ {Name: name, ContainerPort: expected}, }, } @@ -55,7 +55,7 @@ func TestResolvePortStringUnknown(t *testing.T) { expected := 80 name := "foo" container := &api.Container{ - Ports: []api.ContainerPort{ + Ports: []api.Port{ {Name: "bar", ContainerPort: expected}, }, } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 70df616a16d..a7fcc45a956 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -917,7 +917,7 @@ const ( // createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container. func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.DockerID, error) { - var ports []api.ContainerPort + var ports []api.Port // Docker only exports ports from the pod infra container. Let's // collect all of the relevant ports and export them. for _, container := range pod.Spec.Containers { @@ -1411,7 +1411,7 @@ func updateBoundPods(changed []api.BoundPod, current []api.BoundPod) []api.Bound func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod { filtered := []api.BoundPod{} ports := map[int]bool{} - extract := func(p *api.ContainerPort) int { return p.HostPort } + extract := func(p *api.Port) int { return p.HostPort } for i := range pods { pod := &pods[i] if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 { diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 46d8fa67bea..b1e45591e3e 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1136,7 +1136,7 @@ func TestMakeVolumesAndBinds(t *testing.T) { func TestMakePortsAndBindings(t *testing.T) { container := api.Container{ - Ports: []api.ContainerPort{ + Ports: []api.Port{ { ContainerPort: 80, HostPort: 8080, @@ -1200,12 +1200,12 @@ func TestMakePortsAndBindings(t *testing.T) { func TestCheckHostPortConflicts(t *testing.T) { successCaseAll := []api.BoundPod{ - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } successCaseNew := api.BoundPod{ - Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}, + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}}, } expected := append(successCaseAll, successCaseNew) if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) { @@ -1213,12 +1213,12 @@ func TestCheckHostPortConflicts(t *testing.T) { } failureCaseAll := []api.BoundPod{ - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}}, - {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}}, + {Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}}, } failureCaseNew := api.BoundPod{ - Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}, + Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}, } if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) { t.Errorf("Expected %#v, Got %#v", expected, actual) diff --git a/pkg/kubelet/probe_test.go b/pkg/kubelet/probe_test.go index 988ebf55cdf..4a22e5e8a64 100644 --- a/pkg/kubelet/probe_test.go +++ b/pkg/kubelet/probe_test.go @@ -32,7 +32,7 @@ import ( func TestFindPortByName(t *testing.T) { container := api.Container{ - Ports: []api.ContainerPort{ + Ports: []api.Port{ { Name: "foo", HostPort: 8080, @@ -71,7 +71,7 @@ func TestGetURLParts(t *testing.T) { for _, test := range testCases { state := api.PodStatus{PodIP: "127.0.0.1"} container := api.Container{ - Ports: []api.ContainerPort{{Name: "found", HostPort: 93}}, + Ports: []api.Port{{Name: "found", HostPort: 93}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: test.probe, @@ -114,7 +114,7 @@ func TestGetTCPAddrParts(t *testing.T) { for _, test := range testCases { host := "1.2.3.4" container := api.Container{ - Ports: []api.ContainerPort{{Name: "found", HostPort: 93}}, + Ports: []api.Port{{Name: "found", HostPort: 93}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ TCPSocket: test.probe, diff --git a/pkg/master/publish.go b/pkg/master/publish.go index ce9e1b53e76..7cc73559965 100644 --- a/pkg/master/publish.go +++ b/pkg/master/publish.go @@ -128,35 +128,25 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error { ctx := api.NewDefaultContext() e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName) - if err != nil { + if err != nil || e.Protocol != api.ProtocolTCP { e = &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: serviceName, Namespace: api.NamespaceDefault, }, + Protocol: api.ProtocolTCP, } } found := false -FindEndpointLoop: for i := range e.Endpoints { ep := &e.Endpoints[i] - if ep.IP == ip.String() { - for j := range ep.Ports { - epp := &ep.Ports[j] - if epp.Protocol == api.ProtocolTCP && epp.Port == port { - found = true - break FindEndpointLoop - } - } + if ep.IP == ip.String() && ep.Port == port { + found = true + break } } if !found { - e.Endpoints = append(e.Endpoints, api.Endpoint{ - IP: ip.String(), - Ports: []api.EndpointPort{ - {Protocol: api.ProtocolTCP, Port: port}, - }, - }) + e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port}) } if len(e.Endpoints) > m.masterCount { // We append to the end and remove from the beginning, so this should diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 1b4430fc551..29489f839f3 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -185,7 +185,7 @@ func TestServicesFromZeroError(t *testing.T) { func TestEndpoints(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, } fakeWatch := watch.NewFake() @@ -235,7 +235,7 @@ func TestEndpoints(t *testing.T) { func TestEndpointsFromZero(t *testing.T) { endpoint := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, } fakeWatch := watch.NewFake() diff --git a/pkg/proxy/loadbalancer.go b/pkg/proxy/loadbalancer.go index bb9f93bf269..a94665383d8 100644 --- a/pkg/proxy/loadbalancer.go +++ b/pkg/proxy/loadbalancer.go @@ -25,8 +25,8 @@ import ( // LoadBalancer is an interface for distributing incoming requests to service endpoints. type LoadBalancer interface { // NextEndpoint returns the endpoint to handle a request for the given - // serviceName:portName and source address. - NextEndpoint(service string, port string, srcAddr net.Addr) (string, error) - NewService(service string, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error - CleanupStaleStickySessions(service string, port string) + // service and source address. + NextEndpoint(service string, srcAddr net.Addr) (string, error) + NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error + CleanupStaleStickySessions(service string) } diff --git a/pkg/proxy/proxier.go b/pkg/proxy/proxier.go index e2b08522362..245689cc134 100644 --- a/pkg/proxy/proxier.go +++ b/pkg/proxy/proxier.go @@ -69,8 +69,7 @@ type tcpProxySocket struct { func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) { for _, retryTimeout := range endpointDialTimeout { - // TODO: support multiple service ports - endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr) + endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr) if err != nil { glog.Errorf("Couldn't find an endpoint for %s: %v", service, err) return nil, err @@ -384,8 +383,7 @@ func (proxier *Proxier) ensurePortals() { func (proxier *Proxier) cleanupStaleStickySessions() { for name, info := range proxier.serviceMap { if info.sessionAffinityType != api.AffinityTypeNone { - // TODO: support multiple service ports - proxier.loadBalancer.CleanupStaleStickySessions(name, "") + proxier.loadBalancer.CleanupStaleStickySessions(name) } } } @@ -501,8 +499,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) { if err != nil { glog.Errorf("Failed to open portal for %q: %v", service.Name, err) } - // TODO: support multiple service ports - proxier.loadBalancer.NewService(service.Name, "", info.sessionAffinityType, info.stickyMaxAgeMinutes) + proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes) } proxier.mu.Lock() defer proxier.mu.Unlock() diff --git a/pkg/proxy/proxier_test.go b/pkg/proxy/proxier_test.go index 71c52b43924..5215bb546eb 100644 --- a/pkg/proxy/proxier_test.go +++ b/pkg/proxy/proxier_test.go @@ -197,7 +197,7 @@ func TestTCPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -217,7 +217,7 @@ func TestUDPProxy(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -246,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -277,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -308,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -338,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -368,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -407,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) @@ -446,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}}, }, }) @@ -482,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) { lb.OnUpdate([]api.Endpoints{ { ObjectMeta: api.ObjectMeta{Name: "echo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}}, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}}, }, }) diff --git a/pkg/proxy/roundrobin.go b/pkg/proxy/roundrobin.go index 384025774ef..070c82ee70f 100644 --- a/pkg/proxy/roundrobin.go +++ b/pkg/proxy/roundrobin.go @@ -18,7 +18,6 @@ package proxy import ( "errors" - "fmt" "net" "reflect" "strconv" @@ -35,83 +34,62 @@ var ( ErrMissingEndpoints = errors.New("missing endpoints") ) -type affinityState struct { - clientIP string +type sessionAffinityDetail struct { + clientIPAddress string //clientProtocol api.Protocol //not yet used //sessionCookie string //not yet used - endpoint string - lastUsed time.Time + endpoint string + lastUsedDTTM time.Time } -type affinityPolicy struct { - affinityType api.AffinityType - affinityMap map[string]*affinityState // map client IP -> affinity info - ttlMinutes int -} - -// balancerKey is a string that the balancer uses to key stored state. It is -// formatted as "service_name:port_name", but that should be opaque to most consumers. -type balancerKey string - -func makeBalancerKey(service, port string) balancerKey { - return balancerKey(fmt.Sprintf("%s:%s", service, port)) +type serviceDetail struct { + name string + sessionAffinityType api.AffinityType + sessionAffinityMap map[string]*sessionAffinityDetail + stickyMaxAgeMinutes int } // LoadBalancerRR is a round-robin load balancer. type LoadBalancerRR struct { - lock sync.RWMutex - services map[balancerKey]*balancerState + lock sync.RWMutex + endpointsMap map[string][]string + rrIndex map[string]int + serviceDtlMap map[string]serviceDetail } -// Ensure this implements LoadBalancer. -var _ LoadBalancer = &LoadBalancerRR{} - -type balancerState struct { - endpoints []string // a list of "ip:port" style strings - index int // index into endpoints - affinity affinityPolicy -} - -func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy { - return &affinityPolicy{ - affinityType: affinityType, - affinityMap: make(map[string]*affinityState), - ttlMinutes: ttlMinutes, +func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail { + return &serviceDetail{ + name: service, + sessionAffinityType: sessionAffinityType, + sessionAffinityMap: make(map[string]*sessionAffinityDetail), + stickyMaxAgeMinutes: stickyMaxAgeMinutes, } } // NewLoadBalancerRR returns a new LoadBalancerRR. func NewLoadBalancerRR() *LoadBalancerRR { return &LoadBalancerRR{ - services: map[balancerKey]*balancerState{}, + endpointsMap: make(map[string][]string), + rrIndex: make(map[string]int), + serviceDtlMap: make(map[string]serviceDetail), } } -func (lb *LoadBalancerRR) NewService(service, port string, affinityType api.AffinityType, ttlMinutes int) error { - lb.lock.Lock() - defer lb.lock.Unlock() - - lb.newServiceInternal(service, port, affinityType, ttlMinutes) +func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error { + if stickyMaxAgeMinutes == 0 { + stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? + } + if _, exists := lb.serviceDtlMap[service]; !exists { + lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes) + glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service]) + } return nil } -func (lb *LoadBalancerRR) newServiceInternal(service, port string, affinityType api.AffinityType, ttlMinutes int) *balancerState { - if ttlMinutes == 0 { - ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead???? - } - - key := makeBalancerKey(service, port) - if _, exists := lb.services[key]; !exists { - lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)} - glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service) - } - return lb.services[key] -} - -// return true if this service is using some form of session affinity. -func isSessionAffinity(affinity *affinityPolicy) bool { - // Should never be empty string, but checking for it to be safe. - if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone { +// return true if this service detail is using some form of session affinity. +func isSessionAffinity(serviceDtl serviceDetail) bool { + //Should never be empty string, but chekcing for it to be safe. + if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone { return false } return true @@ -119,111 +97,100 @@ func isSessionAffinity(affinity *affinityPolicy) bool { // NextEndpoint returns a service endpoint. // The service endpoint is chosen using the round-robin algorithm. -func (lb *LoadBalancerRR) NextEndpoint(service, port string, srcAddr net.Addr) (string, error) { - // Coarse locking is simple. We can get more fine-grained if/when we - // can prove it matters. - lb.lock.Lock() - defer lb.lock.Unlock() +func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) { + var ipaddr string + glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap) - key := makeBalancerKey(service, port) - state, exists := lb.services[key] - if !exists || state == nil { + lb.lock.RLock() + serviceDtls, exists := lb.serviceDtlMap[service] + endpoints, _ := lb.endpointsMap[service] + index := lb.rrIndex[service] + sessionAffinityEnabled := isSessionAffinity(serviceDtls) + + lb.lock.RUnlock() + if !exists { return "", ErrMissingServiceEntry } - if len(state.endpoints) == 0 { + if len(endpoints) == 0 { return "", ErrMissingEndpoints } - glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints) - - sessionAffinityEnabled := isSessionAffinity(&state.affinity) - - var ipaddr string if sessionAffinityEnabled { - // Caution: don't shadow ipaddr - var err error - ipaddr, _, err = net.SplitHostPort(srcAddr.String()) - if err != nil { - return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err) + if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil { + ipaddr, _, _ = net.SplitHostPort(srcAddr.String()) } - sessionAffinity, exists := state.affinity.affinityMap[ipaddr] - if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes { - // Affinity wins. + sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr] + glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) + if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes { endpoint := sessionAffinity.endpoint - sessionAffinity.lastUsed = time.Now() - glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint) + sessionAffinity.lastUsedDTTM = time.Now() + glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity) return endpoint, nil } } - // Take the next endpoint. - endpoint := state.endpoints[state.index] - state.index = (state.index + 1) % len(state.endpoints) + endpoint := endpoints[index] + lb.lock.Lock() + lb.rrIndex[service] = (index + 1) % len(endpoints) if sessionAffinityEnabled { - var affinity *affinityState - affinity = state.affinity.affinityMap[ipaddr] + var affinity *sessionAffinityDetail + affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] if affinity == nil { - affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()} - state.affinity.affinityMap[ipaddr] = affinity + affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()} + lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity } - affinity.lastUsed = time.Now() + affinity.lastUsedDTTM = time.Now() affinity.endpoint = endpoint - affinity.clientIP = ipaddr - glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr]) + affinity.clientIPAddress = ipaddr + + glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]) } + lb.lock.Unlock() return endpoint, nil } -type hostPortPair struct { - host string - port int +func isValidEndpoint(ep *api.Endpoint) bool { + return ep.IP != "" && ep.Port > 0 } -func isValidEndpoint(hpp *hostPortPair) bool { - return hpp.host != "" && hpp.port > 0 -} - -func getValidEndpoints(pairs []hostPortPair) []string { - // Convert structs into strings for easier use later. +func filterValidEndpoints(endpoints []api.Endpoint) []string { + // Convert Endpoint objects into strings for easier use later. Ignore + // the protocol field - we'll get that from the Service objects. var result []string - for i := range pairs { - hpp := &pairs[i] - if isValidEndpoint(hpp) { - result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port))) + for i := range endpoints { + ep := &endpoints[i] + if isValidEndpoint(ep) { + result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port))) } } return result } -// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down). -func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) { - for _, affinity := range state.affinity.affinityMap { - if affinity.endpoint == endpoint { - glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service) - delete(state.affinity.affinityMap, affinity.clientIP) +//remove any session affinity records associated to a particular endpoint (for example when a pod goes down). +func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) { + for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { + if affinityDetail.endpoint == endpoint { + glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service) + delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress) } } } -// Loop through the valid endpoints and then the endpoints associated with the Load Balancer. -// Then remove any session affinity records that are not in both lists. -// This assumes the lb.lock is held. -func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) { +//Loop through the valid endpoints and then the endpoints associated with the Load Balancer. +// Then remove any session affinity records that are not in both lists. +func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) { allEndpoints := map[string]int{} - for _, newEndpoint := range newEndpoints { - allEndpoints[newEndpoint] = 1 + for _, validEndpoint := range validEndpoints { + allEndpoints[validEndpoint] = 1 } - state, exists := lb.services[service] - if !exists { - return - } - for _, existingEndpoint := range state.endpoints { + for _, existingEndpoint := range lb.endpointsMap[service] { allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1 } for mKey, mVal := range allEndpoints { if mVal == 1 { - glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service) - removeSessionAffinityByEndpoint(state, service, mKey) + glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service) + removeSessionAffinityByEndpoint(lb, service, mKey) + delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey) } } } @@ -231,86 +198,44 @@ func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints [] // OnUpdate manages the registered service endpoints. // Registered endpoints are updated if found in the update set or // unregistered if missing from the update set. -func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) { - registeredEndpoints := make(map[balancerKey]bool) +func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) { + registeredEndpoints := make(map[string]bool) lb.lock.Lock() defer lb.lock.Unlock() - // Update endpoints for services. - for i := range allEndpoints { - svcEndpoints := &allEndpoints[i] + for _, endpoint := range endpoints { + existingEndpoints, exists := lb.endpointsMap[endpoint.Name] + validEndpoints := filterValidEndpoints(endpoint.Endpoints) + if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) { + glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints) + updateServiceDetailMap(lb, endpoint.Name, validEndpoints) + // On update can be called without NewService being called externally. + // to be safe we will call it here. A new service will only be created + // if one does not already exist. + lb.NewService(endpoint.Name, api.AffinityTypeNone, 0) + lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints) - // We need to build a map of portname -> all ip:ports for that portname. - portsToEndpoints := map[string][]hostPortPair{} - - // Explode the Endpoints.Endpoints[*].Ports[*] into the aforementioned map. - // FIXME: this is awkward. Maybe a different factoring of Endpoints is better? - for j := range svcEndpoints.Endpoints { - ep := &svcEndpoints.Endpoints[j] - for k := range ep.Ports { - epp := &ep.Ports[k] - portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port}) - // Ignore the protocol field - we'll get that from the Service objects. - } - } - - for portname := range portsToEndpoints { - key := makeBalancerKey(svcEndpoints.Name, portname) - state, exists := lb.services[key] - curEndpoints := []string{} - if state != nil { - curEndpoints = state.endpoints - } - newEndpoints := getValidEndpoints(portsToEndpoints[portname]) - if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) { - glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints) - lb.updateAffinityMap(key, newEndpoints) - // On update can be called without NewService being called externally. - // To be safe we will call it here. A new service will only be created - // if one does not already exist. - state = lb.newServiceInternal(svcEndpoints.Name, portname, api.AffinityTypeNone, 0) - state.endpoints = slice.ShuffleStrings(newEndpoints) - - // Reset the round-robin index. - state.index = 0 - } - registeredEndpoints[key] = true + // Reset the round-robin index. + lb.rrIndex[endpoint.Name] = 0 } + registeredEndpoints[endpoint.Name] = true } // Remove endpoints missing from the update. - for k := range lb.services { + for k, v := range lb.endpointsMap { if _, exists := registeredEndpoints[k]; !exists { - glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k) - delete(lb.services, k) + glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v) + delete(lb.endpointsMap, k) + delete(lb.serviceDtlMap, k) } } } -// Tests whether two slices are equivalent. This sorts both slices in-place. -func slicesEquiv(lhs, rhs []string) bool { - if len(lhs) != len(rhs) { - return false - } - if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) { - return true - } - return false -} - -func (lb *LoadBalancerRR) CleanupStaleStickySessions(service, port string) { - lb.lock.Lock() - defer lb.lock.Unlock() - - key := makeBalancerKey(service, port) - state, exists := lb.services[key] - if !exists { - glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service) - return - } - for ip, affinity := range state.affinity.affinityMap { - if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes { - glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service) - delete(state.affinity.affinityMap, ip) +func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) { + stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes + for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap { + if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes { + glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes) + delete(lb.serviceDtlMap[service].sessionAffinityMap, key) } } } diff --git a/pkg/proxy/roundrobin_test.go b/pkg/proxy/roundrobin_test.go index 0861c5ec72e..3298aa92bd5 100644 --- a/pkg/proxy/roundrobin_test.go +++ b/pkg/proxy/roundrobin_test.go @@ -24,29 +24,29 @@ import ( ) func TestValidateWorks(t *testing.T) { - if isValidEndpoint(&hostPortPair{}) { + if isValidEndpoint(&api.Endpoint{}) { t.Errorf("Didn't fail for empty string") } - if isValidEndpoint(&hostPortPair{host: "foobar"}) { + if isValidEndpoint(&api.Endpoint{IP: "foobar"}) { t.Errorf("Didn't fail with no port") } - if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) { + if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) { t.Errorf("Didn't fail with a negative port") } - if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) { + if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) { t.Errorf("Failed a valid config.") } } func TestFilterWorks(t *testing.T) { - endpoints := []hostPortPair{ - {host: "foobar", port: 1}, - {host: "foobar", port: 2}, - {host: "foobar", port: -1}, - {host: "foobar", port: 3}, - {host: "foobar", port: -2}, + endpoints := []api.Endpoint{ + {IP: "foobar", Port: 1}, + {IP: "foobar", Port: 2}, + {IP: "foobar", Port: -1}, + {IP: "foobar", Port: 3}, + {IP: "foobar", Port: -2}, } - filtered := getValidEndpoints(endpoints) + filtered := filterValidEndpoints(endpoints) if len(filtered) != 3 { t.Errorf("Failed to filter to the correct size") @@ -66,7 +66,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() var endpoints []api.Endpoints loadBalancer.OnUpdate(endpoints) - endpoint, err := loadBalancer.NextEndpoint("foo", "http", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil { t.Errorf("Didn't fail with non-existent service") } @@ -75,8 +75,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) { } } -func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, port string, expected string, netaddr net.Addr) { - endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr) +func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) { + endpoint, err := loadBalancer.NextEndpoint(service, netaddr) if err != nil { t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err) } @@ -87,41 +87,25 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 40}}}}, + Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil) -} - -func stringsInSlice(haystack []string, needles ...string) bool { - for _, needle := range needles { - found := false - for i := range haystack { - if haystack[i] == needle { - found = true - break - } - } - if found == false { - return false - } - } - return true + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) + expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil) } func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -129,77 +113,22 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) -} - -func TestLoadBalanceWorksWithMultipleEndpointsAndPorts(t *testing.T) { - loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) - if err == nil || len(endpoint) != 0 { - t.Errorf("Didn't fail with non-existent service") - } - endpoints := make([]api.Endpoints, 1) - endpoints[0] = api.Endpoints{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 1}, - {Name: "q", Port: 10}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 2}, - {Name: "q", Port: 20}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 3}, - {Name: "q", Port: 30}, - }}, - }, - } - loadBalancer.OnUpdate(endpoints) - - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) - - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) } func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -207,86 +136,37 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 1}, - {Name: "q", Port: 10}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 2}, - {Name: "q", Port: 20}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 3}, - {Name: "q", Port: 30}, - }}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 8}, - {Name: "q", Port: 80}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 9}, - {Name: "q", Port: 90}, - }}, + {IP: "endpoint", Port: 8}, + {IP: "endpoint", Port: 9}, }, } loadBalancer.OnUpdate(endpoints) - - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:8", "endpoint:9") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil) - - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:80", "endpoint:90") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil) - + shuffledEndpoints = loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil) // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) - if err == nil || len(endpoint) != 0 { - t.Errorf("Didn't fail with non-existent service") - } - - endpoint, err = loadBalancer.NextEndpoint("foo", "q", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -294,7 +174,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -302,183 +182,133 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) { endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 1}, - {Name: "q", Port: 10}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 2}, - {Name: "q", Port: 20}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 3}, - {Name: "q", Port: 30}, - }}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 4}, - {Name: "q", Port: 40}, - }}, - {IP: "endpoint", Ports: []api.EndpointPort{ - {Name: "p", Port: 5}, - {Name: "q", Port: 50}, - }}, + {IP: "endpoint", Port: 4}, + {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) + shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil) - shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledFooEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil) - - shuffledFooEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints - if !stringsInSlice(shuffledFooEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") { - t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[2], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil) - - shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints - if !stringsInSlice(shuffledBarEndpoints, "endpoint:4", "endpoint:5") { - t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) - } - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil) - - shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "q")].endpoints - if !stringsInSlice(shuffledBarEndpoints, "endpoint:40", "endpoint:50") { - t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints) - } - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil) - expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil) } func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - }, + Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}}, } loadBalancer.OnUpdate(endpoints) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) - expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) + expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2) } -func TestStickyLoadBalanceWorksWithMultipleEndpoints(t *testing.T) { +func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) } -func TestStickyLoadBalanceWorksWithMultipleEndpointsStickyNone(t *testing.T) { +func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeNone, 0) + loadBalancer.NewService("foo", api.AffinityTypeNone, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1) + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1) } -func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { +func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0} client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} @@ -486,44 +316,41 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0} client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) client1Endpoint := shuffledEndpoints[0] - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) client2Endpoint := shuffledEndpoints[1] - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) client3Endpoint := shuffledEndpoints[2] endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints + shuffledEndpoints = loadBalancer.endpointsMap["foo"] if client1Endpoint == "endpoint:3" { client1Endpoint = shuffledEndpoints[0] } else if client2Endpoint == "endpoint:3" { @@ -531,26 +358,26 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) { } else if client3Endpoint == "endpoint:3" { client3Endpoint = shuffledEndpoints[0] } - expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 4}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1) - expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2) - expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client4) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client5) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client6) + shuffledEndpoints = loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1) + expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2) + expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6) } func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { @@ -558,54 +385,51 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 1) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") { - t.Errorf("did not find expected endpoints: %v", shuffledEndpoints) - } - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + shuffledEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) // Then update the configuration with one fewer endpoints, make sure // we start in the beginning again endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, + {IP: "endpoint", Port: 4}, + {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) - shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2) + shuffledEndpoints = loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2) // Clear endpoints endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}} loadBalancer.OnUpdate(endpoints) - endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } @@ -616,58 +440,58 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) { client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0} client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0} loadBalancer := NewLoadBalancerRR() - endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err := loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } - loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0) endpoints := make([]api.Endpoints, 2) endpoints[0] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}}, + {IP: "endpoint", Port: 1}, + {IP: "endpoint", Port: 2}, + {IP: "endpoint", Port: 3}, }, } - loadBalancer.NewService("bar", "p", api.AffinityTypeClientIP, 0) + loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0) endpoints[1] = api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "bar"}, Endpoints: []api.Endpoint{ - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}}, - {IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 6}}}, + {IP: "endpoint", Port: 5}, + {IP: "endpoint", Port: 5}, }, } loadBalancer.OnUpdate(endpoints) - shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) + shuffledFooEndpoints := loadBalancer.endpointsMap["foo"] + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) - shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1) + shuffledBarEndpoints := loadBalancer.endpointsMap["bar"] + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1) // Then update the configuration by removing foo loadBalancer.OnUpdate(endpoints[1:]) - endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil) + endpoint, err = loadBalancer.NextEndpoint("foo", nil) if err == nil || len(endpoint) != 0 { t.Errorf("Didn't fail with non-existent service") } // but bar is still there, and we continue RR from where we left off. - shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "p")].endpoints - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) - expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1) + shuffledBarEndpoints = loadBalancer.endpointsMap["bar"] + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) + expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1) } diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 4db4c432da9..9616c654210 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -171,7 +171,7 @@ func TestControllerParsing(t *testing.T) { Containers: []api.Container{ { Image: "dockerfile/nginx", - Ports: []api.ContainerPort{ + Ports: []api.Port{ { ContainerPort: 80, HostPort: 8080, diff --git a/pkg/registry/endpoint/rest_test.go b/pkg/registry/endpoint/rest_test.go index 7deea343ae2..99e00798844 100644 --- a/pkg/registry/endpoint/rest_test.go +++ b/pkg/registry/endpoint/rest_test.go @@ -27,20 +27,10 @@ import ( ) func TestGetEndpoints(t *testing.T) { - expected := []api.Endpoint{ - {IP: "127.0.0.1", Ports: []api.EndpointPort{ - {Name: "p", Port: 9000, Protocol: api.ProtocolTCP}, - {Name: "q", Port: 9000, Protocol: api.ProtocolUDP}, - }}, - {IP: "127.0.0.2", Ports: []api.EndpointPort{ - {Name: "p", Port: 8000, Protocol: api.ProtocolTCP}, - {Name: "q", Port: 8000, Protocol: api.ProtocolUDP}, - }}, - } registry := ®istrytest.ServiceRegistry{ Endpoints: api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: expected, + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, }, } storage := NewREST(registry) @@ -49,7 +39,7 @@ func TestGetEndpoints(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %#v", err) } - if !reflect.DeepEqual(expected, obj.(*api.Endpoints).Endpoints) { + if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) { t.Errorf("unexpected endpoints: %#v", obj) } } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index cc6d77cbc0e..2384fb76a3f 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -24,8 +24,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -600,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) { Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Name: "p", Port: 8345, Protocol: api.ProtocolTCP}}}}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}), + Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}), }, }, }, @@ -627,7 +625,8 @@ func TestEtcdGetEndpoints(t *testing.T) { registry := NewTestEtcdRegistry(fakeClient) endpoints := &api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 34855, Protocol: api.ProtocolTCP}}}}, + Protocol: "TCP", + Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") @@ -648,19 +647,10 @@ func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true registry := NewTestEtcdRegistry(fakeClient) - - // TODO: Once we drop single-port APIs, make this test use the - // multi-port features. This will force a compile error when those APIs - // are deleted. - _ = v1beta1.Dependency - _ = v1beta2.Dependency - endpoints := api.Endpoints{ ObjectMeta: api.ObjectMeta{Name: "foo"}, - Endpoints: []api.Endpoint{ - {IP: "baz", Ports: []api.EndpointPort{{Port: 1, Protocol: api.ProtocolTCP}}}, - {IP: "bar", Ports: []api.EndpointPort{{Port: 2, Protocol: api.ProtocolTCP}}}, - }, + Protocol: "TCP", + Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}}, } key, _ := makeServiceEndpointsKey(ctx, "foo") diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index 449d99d9bf0..cb9d1d4b4a1 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -592,7 +592,7 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, + {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, }, }, }, @@ -604,7 +604,7 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, + {Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}}, }, }, }, @@ -617,7 +617,7 @@ func TestResourceLocation(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ {Name: "ctr1"}, - {Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, + {Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}}, }, }, }, @@ -629,8 +629,8 @@ func TestResourceLocation(t *testing.T) { ObjectMeta: api.ObjectMeta{Name: "foo"}, Spec: api.PodSpec{ Containers: []api.Container{ - {Name: "ctr1", Ports: []api.ContainerPort{{ContainerPort: 9376}}}, - {Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 1234}}}, + {Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}}, + {Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}}, }, }, }, diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index b000063e7a2..c3690f61431 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -21,7 +21,6 @@ import ( "math/rand" "net" "strconv" - "strings" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" @@ -219,57 +218,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo // ResourceLocation returns a URL to which one can send traffic for the specified service. func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) { - // Allow ID as "svcname" or "svcname:port". Choose an endpoint at - // random. If the port is specified as a number, use that value - // directly. If the port is specified as a name, try to look up that - // name on the chosen endpoint. If port is not specified, try to use - // the first unnamed port on the chosen endpoint. If there are no - // unnamed ports, try to use the first defined port. - parts := strings.Split(id, ":") - if len(parts) > 2 { - return "", errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id)) - } - name := parts[0] - port := "" - if len(parts) == 2 { - port = parts[1] - } - - eps, err := rs.registry.GetEndpoints(ctx, name) + eps, err := rs.registry.GetEndpoints(ctx, id) if err != nil { return "", err } if len(eps.Endpoints) == 0 { - return "", fmt.Errorf("no endpoints available for %v", name) + return "", fmt.Errorf("no endpoints available for %v", id) } - ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] - - // Try to figure out a port. - if _, err := strconv.Atoi(port); err != nil { - // Do nothing - port is correct as is. - } else { - // Try a name lookup, even if name is "". - for i := range ep.Ports { - if ep.Ports[i].Name == port { - port = strconv.Itoa(ep.Ports[i].Port) - break - } - } - } - if port == "" { - // Still nothing - try the first defined port. - if len(ep.Ports) > 0 { - port = strconv.Itoa(ep.Ports[0].Port) - } - } - // We leave off the scheme ('http://') because we have no idea what sort of server // is listening at this endpoint. - loc := ep.IP - if port != "" { - loc += fmt.Sprintf(":%s", port) - } - return loc, nil + ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))] + return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil } func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error { diff --git a/pkg/registry/service/rest_test.go b/pkg/registry/service/rest_test.go index e001abbb23f..8426ef4134a 100644 --- a/pkg/registry/service/rest_test.go +++ b/pkg/registry/service/rest_test.go @@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) { func TestServiceRegistryResourceLocation(t *testing.T) { ctx := api.NewDefaultContext() registry := registrytest.NewServiceRegistry() - registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Ports: []api.EndpointPort{{Port: 80}}}}} + registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}} fakeCloud := &cloud.FakeCloud{} machines := []string{"foo", "bar", "baz"} storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t)) diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e0c652a76f2..7cd296e1592 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -60,9 +60,9 @@ func (st *schedulerTester) expectFailure(pod api.Pod) { } func newPod(host string, hostPorts ...int) api.Pod { - networkPorts := []api.ContainerPort{} + networkPorts := []api.Port{} for _, port := range hostPorts { - networkPorts = append(networkPorts, api.ContainerPort{HostPort: port}) + networkPorts = append(networkPorts, api.Port{HostPort: port}) } return api.Pod{ Status: api.PodStatus{ diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 870880db3be..3f42a9d0d8b 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -87,11 +87,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { continue } - // TODO: Add multiple-ports to Service and expose them here. - endpoints = append(endpoints, api.Endpoint{ - IP: pod.Status.PodIP, - Ports: []api.EndpointPort{{Name: "", Protocol: service.Spec.Protocol, Port: port}}, - }) + endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port}) } currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) if err != nil { @@ -100,6 +96,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { ObjectMeta: api.ObjectMeta{ Name: service.Name, }, + Protocol: service.Spec.Protocol, } } else { glog.Errorf("Error getting endpoints: %v", err) @@ -115,7 +112,7 @@ func (e *EndpointController) SyncServiceEndpoints() error { _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) } else { // Pre-existing - if endpointsEqual(currentEndpoints, endpoints) { + if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) { glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) continue } @@ -129,27 +126,12 @@ func (e *EndpointController) SyncServiceEndpoints() error { return resultErr } -// TODO: It would be nice if we had a util function that reflectively compared -// two slices for order-insensitive equivalence. -func portsEqual(lhs, rhs []api.EndpointPort) bool { - if len(lhs) != len(rhs) { - return false - } - for i := range lhs { - if lhs[i] != rhs[i] { - return false - } - } - return true -} - func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool { if haystack == nil || needle == nil { return false } for ix := range haystack.Endpoints { - haystackEP := &haystack.Endpoints[ix] - if haystackEP.IP == needle.IP && portsEqual(haystackEP.Ports, needle.Ports) { + if haystack.Endpoints[ix] == *needle { return true } } diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 72336b53d88..9476f8df5ff 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -39,7 +39,7 @@ func newPodList(count int) *api.PodList { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.ContainerPort{ + Ports: []api.Port{ { ContainerPort: 8080, }, @@ -69,7 +69,7 @@ func TestFindPort(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.ContainerPort{ + Ports: []api.Port{ { Name: "foo", ContainerPort: 8080, @@ -90,7 +90,7 @@ func TestFindPort(t *testing.T) { Spec: api.PodSpec{ Containers: []api.Container{ { - Ports: []api.ContainerPort{}, + Ports: []api.Port{}, }, }, }, @@ -245,7 +245,8 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -276,7 +277,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -307,7 +309,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolUDP, Port: 1000}}}}, + Protocol: api.ProtocolUDP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -337,6 +340,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, + Protocol: api.ProtocolTCP, Endpoints: []api.Endpoint{}, }}) defer testServer.Close() @@ -350,7 +354,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data) } @@ -376,7 +381,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -389,7 +395,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data) } @@ -414,7 +421,8 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", }, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) @@ -452,7 +460,8 @@ func TestSyncEndpointsItems(t *testing.T) { ObjectMeta: api.ObjectMeta{ ResourceVersion: "", }, - Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}}, + Protocol: api.ProtocolTCP, + Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}}, }) endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data) } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index f9f09587836..9685898d6bb 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -308,7 +308,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) { }, From: 1, Expected: []*T{ - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, }, }, "from initial state": { @@ -343,7 +343,7 @@ func TestWatchEtcdState(t *testing.T) { { Action: "compareAndSwap", Node: &etcd.Node{ - Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})), + Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})), CreatedIndex: 1, ModifiedIndex: 2, }, @@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) { }, Expected: []*T{ {watch.Added, nil}, - {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}}, + {watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}}, }, }, } diff --git a/test/e2e/events.go b/test/e2e/events.go index 90f0de3a6b8..0f7f896f2ea 100644 --- a/test/e2e/events.go +++ b/test/e2e/events.go @@ -64,7 +64,7 @@ var _ = Describe("Events", func() { { Name: "p", Image: "kubernetes/serve_hostname", - Ports: []api.ContainerPort{{ContainerPort: 80}}, + Ports: []api.Port{{ContainerPort: 80}}, }, }, }, diff --git a/test/e2e/networking.go b/test/e2e/networking.go index 34de4720cb9..367d7300af4 100644 --- a/test/e2e/networking.go +++ b/test/e2e/networking.go @@ -109,7 +109,7 @@ var _ = Describe("Networking", func() { Name: "webserver", Image: "kubernetes/nettest:latest", Command: []string{"-service=" + name}, - Ports: []api.ContainerPort{{ContainerPort: 8080}}, + Ports: []api.Port{{ContainerPort: 8080}}, }, }, }, diff --git a/test/e2e/pods.go b/test/e2e/pods.go index 89e8f8f3cf2..717e3e87929 100644 --- a/test/e2e/pods.go +++ b/test/e2e/pods.go @@ -108,7 +108,7 @@ var _ = Describe("Pods", func() { { Name: "nginx", Image: "dockerfile/nginx", - Ports: []api.ContainerPort{{ContainerPort: 80}}, + Ports: []api.Port{{ContainerPort: 80}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: &api.HTTPGetAction{ @@ -165,7 +165,7 @@ var _ = Describe("Pods", func() { { Name: "nginx", Image: "dockerfile/nginx", - Ports: []api.ContainerPort{{ContainerPort: 80}}, + Ports: []api.Port{{ContainerPort: 80}}, LivenessProbe: &api.Probe{ Handler: api.Handler{ HTTPGet: &api.HTTPGetAction{ @@ -236,7 +236,7 @@ var _ = Describe("Pods", func() { { Name: "srv", Image: "kubernetes/serve_hostname", - Ports: []api.ContainerPort{{ContainerPort: 9376}}, + Ports: []api.Port{{ContainerPort: 9376}}, }, }, }, diff --git a/test/e2e/rc.go b/test/e2e/rc.go index d0da049edbc..cb9e8a3a61c 100644 --- a/test/e2e/rc.go +++ b/test/e2e/rc.go @@ -86,7 +86,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) { { Name: name, Image: image, - Ports: []api.ContainerPort{{ContainerPort: 9376, HostPort: 8080}}, + Ports: []api.Port{{ContainerPort: 9376, HostPort: 8080}}, }, }, }, diff --git a/test/e2e/service.go b/test/e2e/service.go index 554f4c3fc93..30707b2bb8b 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -250,11 +250,8 @@ var _ = Describe("Services", func() { func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) { ips := util.StringSet{} for _, ep := range endpoints.Endpoints { - if len(ep.Ports) == 0 { - Fail(fmt.Sprintf("invalid endpoint, no ports")) - } - if ep.Ports[0].Port != expectedPort { - Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Ports[0].Port)) + if ep.Port != expectedPort { + Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port)) } ips.Insert(ep.IP) } @@ -299,7 +296,7 @@ func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]s { Name: "test", Image: "kubernetes/pause", - Ports: []api.ContainerPort{{ContainerPort: 80}}, + Ports: []api.Port{{ContainerPort: 80}}, }, }, },