Revert "Multi-port Endpoints"

This commit is contained in:
Daniel Smith
2015-02-23 13:53:21 -08:00
parent eed36455a7
commit 650f6cb826
52 changed files with 525 additions and 938 deletions

View File

@@ -538,7 +538,7 @@ func runServiceTest(client *client.Client) {
{
Name: "c1",
Image: "foo",
Ports: []api.ContainerPort{
Ports: []api.Port{
{ContainerPort: 1234},
},
ImagePullPolicy: "PullIfNotPresent",

View File

@@ -24,8 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@@ -240,13 +238,7 @@ func FuzzerFor(t *testing.T, version string, src rand.Source) *fuzz.Fuzzer {
func(ep *api.Endpoint, c fuzz.Continue) {
// TODO: If our API used a particular type for IP fields we could just catch that here.
ep.IP = fmt.Sprintf("%d.%d.%d.%d", c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256), c.Rand.Intn(256))
// TODO: Once we drop single-port APIs, make this fuzz
// multiple ports and fuzz port.name. This will force
// a compile error when those APIs are deleted.
_ = v1beta1.Dependency
_ = v1beta2.Dependency
ep.Ports = []api.EndpointPort{{Name: "", Port: c.Rand.Intn(65536)}}
c.Fuzz(&ep.Ports[0].Protocol)
ep.Port = c.Rand.Intn(65536)
},
)
return f

View File

@@ -239,8 +239,8 @@ type SecretVolumeSource struct {
Target ObjectReference `json:"target"`
}
// ContainerPort represents a network port in a single container
type ContainerPort struct {
// Port represents a network port in a single container
type Port struct {
// Optional: If specified, this must be a DNS_LABEL. Each named port
// in a pod must have a unique name.
Name string `json:"name,omitempty"`
@@ -347,7 +347,7 @@ type Container struct {
Command []string `json:"command,omitempty"`
// Optional: Defaults to Docker's default.
WorkingDir string `json:"workingDir,omitempty"`
Ports []ContainerPort `json:"ports,omitempty"`
Ports []Port `json:"ports,omitempty"`
Env []EnvVar `json:"env,omitempty"`
// Compute resource requirements.
Resources ResourceRequirements `json:"resources,omitempty"`
@@ -750,6 +750,9 @@ type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
@@ -759,21 +762,8 @@ type Endpoint struct {
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// The ports exposed on this IP.
Ports []EndpointPort
}
type EndpointPort struct {
// Optional if only one port is defined in this Endpoint.
// The name of this port within the larger service/endpoint structure.
// This must be a DNS_LABEL.
Name string
// The IP protocol for this port. Supports "TCP" and "UDP".
Protocol Protocol
// The destination port to access.
Port int
// Required: The destination port to access.
Port int `json:"port"`
}
// EndpointsList is a list of endpoints.

View File

@@ -1123,16 +1123,12 @@ func init() {
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
// newer.Endpoints.Endpoints[i].Ports is an array - take the first one.
if len(ep.Ports) > 0 {
port := &ep.Ports[0]
if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil {
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port)))
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return nil
},
@@ -1143,20 +1139,22 @@ func init() {
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
epp := newer.EndpointPort{Port: pn}
if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}})
ep.Port = pn
}
return nil
},

View File

@@ -390,35 +390,41 @@ func TestEndpointsConversion(t *testing.T) {
}{
{
given: current.Endpoints{
Protocol: "",
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}},
},
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}},
},
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
},
},
}
@@ -430,7 +436,7 @@ func TestEndpointsConversion(t *testing.T) {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}

View File

@@ -32,7 +32,7 @@ func init() {
}
}
},
func(obj *ContainerPort) {
func(obj *Port) {
if obj.Protocol == "" {
obj.Protocol = ProtocolTCP
}
@@ -86,7 +86,7 @@ func init() {
}
},
func(obj *Endpoints) {
if obj.Protocol == "" && len(obj.Endpoints) > 0 {
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},

View File

@@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) {
func TestSetDefaultContainer(t *testing.T) {
bp := &current.BoundPod{}
bp.Spec.Containers = []current.Container{{}}
bp.Spec.Containers[0].Ports = []current.ContainerPort{{}}
bp.Spec.Containers[0].Ports = []current.Port{{}}
obj2 := roundTrip(t, runtime.Object(bp))
bp2 := obj2.(*current.BoundPod)
@@ -103,18 +103,8 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != "" {
t.Errorf("Expected protocol \"\", got %s", out.Protocol)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{Endpoints: []string{"1.2.3.4:5678"}}
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)

View File

@@ -24,12 +24,6 @@ import (
// Codec encodes internal objects to the v1beta1 scheme
var Codec = runtime.CodecFor(api.Scheme, "v1beta1")
// Dependency does nothing but give a hook for other packages to force a
// compile-time error when this API version is eventually removed. This is
// useful, for example, to clean up things that are implicitly tied to
// semantics of older APIs.
const Dependency = true
func init() {
api.Scheme.AddKnownTypes("v1beta1",
&Pod{},

View File

@@ -161,8 +161,8 @@ type SecretVolumeSource struct {
Target ObjectReference `json:"target" description:"target is a reference to a secret"`
}
// ContainerPort represents a network port in a single container
type ContainerPort struct {
// Port represents a network port in a single container
type Port struct {
// Optional: If specified, this must be a DNS_LABEL. Each named port
// in a pod must have a unique name.
Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"`
@@ -283,7 +283,7 @@ type Container struct {
Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"`
// Optional: Defaults to Docker's default.
WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"`
Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"`
Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"`
Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"`
Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"`
// Optional: Defaults to unlimited.

View File

@@ -1038,16 +1038,12 @@ func init() {
if err := s.Convert(&in.ObjectMeta, &out.TypeMeta, 0); err != nil {
return err
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
// newer.Endpoints.Endpoints[i].Ports is an array - take the first one.
if len(ep.Ports) > 0 {
port := &ep.Ports[0]
if err := s.Convert(&port.Protocol, &out.Protocol, 0); err != nil {
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(port.Port)))
}
for i := range in.Endpoints {
ep := &in.Endpoints[i]
out.Endpoints = append(out.Endpoints, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return nil
},
@@ -1058,20 +1054,22 @@ func init() {
if err := s.Convert(&in.TypeMeta, &out.ObjectMeta, 0); err != nil {
return err
}
if err := s.Convert(&in.Protocol, &out.Protocol, 0); err != nil {
return err
}
for i := range in.Endpoints {
out.Endpoints = append(out.Endpoints, newer.Endpoint{})
ep := &out.Endpoints[i]
host, port, err := net.SplitHostPort(in.Endpoints[i])
if err != nil {
return err
}
ep.IP = host
pn, err := strconv.Atoi(port)
if err != nil {
return err
}
epp := newer.EndpointPort{Port: pn}
if err := s.Convert(&in.Protocol, &epp.Protocol, 0); err != nil {
return err
}
out.Endpoints = append(out.Endpoints, newer.Endpoint{IP: host, Ports: []newer.EndpointPort{epp}})
ep.Port = pn
}
return nil
},

View File

@@ -220,35 +220,41 @@ func TestEndpointsConversion(t *testing.T) {
}{
{
given: current.Endpoints{
Protocol: "",
TypeMeta: current.TypeMeta{
ID: "empty",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{},
},
expected: newer.Endpoints{
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "one",
},
Protocol: current.ProtocolTCP,
Endpoints: []string{"1.2.3.4:88"},
},
expected: newer.Endpoints{
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolTCP, Port: 88}}},
},
Protocol: newer.ProtocolTCP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}},
},
},
{
given: current.Endpoints{
TypeMeta: current.TypeMeta{
ID: "several",
},
Protocol: current.ProtocolUDP,
Endpoints: []string{"1.2.3.4:88", "1.2.3.4:89", "1.2.3.4:90"},
},
expected: newer.Endpoints{
Endpoints: []newer.Endpoint{
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 88}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 89}}},
{IP: "1.2.3.4", Ports: []newer.EndpointPort{{Protocol: newer.ProtocolUDP, Port: 90}}},
},
Protocol: newer.ProtocolUDP,
Endpoints: []newer.Endpoint{{IP: "1.2.3.4", Port: 88}, {IP: "1.2.3.4", Port: 89}, {IP: "1.2.3.4", Port: 90}},
},
},
}
@@ -260,7 +266,7 @@ func TestEndpointsConversion(t *testing.T) {
t.Errorf("[Case: %d] Unexpected error: %v", i, err)
continue
}
if !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
if got.Protocol != tc.expected.Protocol || !newer.Semantic.DeepEqual(got.Endpoints, tc.expected.Endpoints) {
t.Errorf("[Case: %d] Expected %v, got %v", i, tc.expected, got)
}

View File

@@ -34,7 +34,7 @@ func init() {
}
}
},
func(obj *ContainerPort) {
func(obj *Port) {
if obj.Protocol == "" {
obj.Protocol = ProtocolTCP
}
@@ -88,7 +88,7 @@ func init() {
}
},
func(obj *Endpoints) {
if obj.Protocol == "" && len(obj.Endpoints) > 0 {
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},

View File

@@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) {
func TestSetDefaultContainer(t *testing.T) {
bp := &current.BoundPod{}
bp.Spec.Containers = []current.Container{{}}
bp.Spec.Containers[0].Ports = []current.ContainerPort{{}}
bp.Spec.Containers[0].Ports = []current.Port{{}}
obj2 := roundTrip(t, runtime.Object(bp))
bp2 := obj2.(*current.BoundPod)
@@ -103,18 +103,8 @@ func TestSetDefaultSecret(t *testing.T) {
}
}
func TestSetDefaulEndpointsProtocolEmpty(t *testing.T) {
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Protocol != "" {
t.Errorf("Expected protocol \"\", got %s", out.Protocol)
}
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{Endpoints: []string{"1.2.3.4:5678"}}
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)

View File

@@ -24,12 +24,6 @@ import (
// Codec encodes internal objects to the v1beta2 scheme
var Codec = runtime.CodecFor(api.Scheme, "v1beta2")
// Dependency does nothing but give a hook for other packages to force a
// compile-time error when this API version is eventually removed. This is
// useful, for example, to clean up things that are implicitly tied to
// semantics of older APIs.
const Dependency = true
func init() {
api.Scheme.AddKnownTypes("v1beta2",
&Pod{},

View File

@@ -99,8 +99,8 @@ const (
ProtocolUDP Protocol = "UDP"
)
// ContainerPort represents a network port in a single container.
type ContainerPort struct {
// Port represents a network port in a single container.
type Port struct {
// Optional: If specified, this must be a DNS_LABEL. Each named port
// in a pod must have a unique name.
Name string `json:"name,omitempty" description:"name for the port that can be referred to by services; must be a DNS_LABEL and unique without the pod"`
@@ -242,7 +242,7 @@ type Container struct {
Command []string `json:"command,omitempty" description:"command argv array; not executed within a shell; defaults to entrypoint or command in the image"`
// Optional: Defaults to Docker's default.
WorkingDir string `json:"workingDir,omitempty" description:"container's working directory; defaults to image's default"`
Ports []ContainerPort `json:"ports,omitempty" description:"list of ports to expose from the container"`
Ports []Port `json:"ports,omitempty" description:"list of ports to expose from the container"`
Env []EnvVar `json:"env,omitempty" description:"list of environment variables to set in the container"`
Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"`
// Optional: Defaults to unlimited.

View File

@@ -32,7 +32,7 @@ func init() {
}
}
},
func(obj *ContainerPort) {
func(obj *Port) {
if obj.Protocol == "" {
obj.Protocol = ProtocolTCP
}
@@ -81,14 +81,8 @@ func init() {
}
},
func(obj *Endpoints) {
for i := range obj.Endpoints {
ep := &obj.Endpoints[i]
for j := range ep.Ports {
port := &ep.Ports[j]
if port.Protocol == "" {
port.Protocol = ProtocolTCP
}
}
if obj.Protocol == "" {
obj.Protocol = "TCP"
}
},
)

View File

@@ -73,7 +73,7 @@ func TestSetDefaulPodSpec(t *testing.T) {
func TestSetDefaultContainer(t *testing.T) {
bp := &current.BoundPod{}
bp.Spec.Containers = []current.Container{{}}
bp.Spec.Containers[0].Ports = []current.ContainerPort{{}}
bp.Spec.Containers[0].Ports = []current.Port{{}}
obj2 := roundTrip(t, runtime.Object(bp))
bp2 := obj2.(*current.BoundPod)
@@ -104,25 +104,11 @@ func TestSetDefaultSecret(t *testing.T) {
}
func TestSetDefaulEndpointsProtocol(t *testing.T) {
in := &current.Endpoints{
Endpoints: []current.Endpoint{
{IP: "1.2.3.4", Ports: []current.EndpointPort{
{Protocol: "TCP"},
{Protocol: "UDP"},
{Protocol: ""},
}},
},
}
in := &current.Endpoints{}
obj := roundTrip(t, runtime.Object(in))
out := obj.(*current.Endpoints)
if out.Endpoints[0].Ports[0].Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol[0] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[0].Protocol)
}
if out.Endpoints[0].Ports[1].Protocol != current.ProtocolUDP {
t.Errorf("Expected protocol[1] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[1].Protocol)
}
if out.Endpoints[0].Ports[2].Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol[2] %s, got %s", current.ProtocolTCP, out.Endpoints[0].Ports[2].Protocol)
if out.Protocol != current.ProtocolTCP {
t.Errorf("Expected protocol %s, got %s", current.ProtocolTCP, out.Protocol)
}
}

View File

@@ -254,8 +254,8 @@ type SecretVolumeSource struct {
Target ObjectReference `json:"target" description:"target is a reference to a secret"`
}
// ContainerPort represents a network port in a single container.
type ContainerPort struct {
// Port represents a network port in a single container.
type Port struct {
// Optional: If specified, this must be a DNS_LABEL. Each named port
// in a pod must have a unique name.
Name string `json:"name,omitempty"`
@@ -367,7 +367,7 @@ type Container struct {
Command []string `json:"command,omitempty"`
// Optional: Defaults to Docker's default.
WorkingDir string `json:"workingDir,omitempty"`
Ports []ContainerPort `json:"ports,omitempty"`
Ports []Port `json:"ports,omitempty"`
Env []EnvVar `json:"env,omitempty"`
Resources ResourceRequirements `json:"resources,omitempty" description:"Compute Resources required by this container"`
VolumeMounts []VolumeMount `json:"volumeMounts,omitempty"`
@@ -782,6 +782,10 @@ type Endpoints struct {
TypeMeta `json:",inline"`
ObjectMeta `json:"metadata,omitempty"`
// Optional: The IP protocol for these endpoints. Supports "TCP" and
// "UDP". Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
Endpoints []Endpoint `json:"endpoints,omitempty"`
}
@@ -791,20 +795,6 @@ type Endpoint struct {
// TODO: This should allow hostname or IP, see #4447.
IP string `json:"ip"`
// The ports exposed on this IP.
Ports []EndpointPort `json:"ports,omitempty"`
}
type EndpointPort struct {
// Optional if only one port is defined in this Endpoint, otherwise required.
// The name of this port within the larger service/endpoint structure.
// This must be a DNS_LABEL.
Name string `json:"name,omitempty"`
// Optional: The IP protocol for this port. Supports "TCP" and "UDP".
// Defaults to "TCP".
Protocol Protocol `json:"protocol,omitempty"`
// Required: The destination port to access.
Port int `json:"port"`
}

View File

@@ -325,7 +325,7 @@ func validateSecretVolumeSource(secretSource *api.SecretVolumeSource) errs.Valid
var supportedPortProtocols = util.NewStringSet(string(api.ProtocolTCP), string(api.ProtocolUDP))
func validatePorts(ports []api.ContainerPort) errs.ValidationErrorList {
func validatePorts(ports []api.Port) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
allNames := util.StringSet{}
@@ -410,7 +410,7 @@ func validateProbe(probe *api.Probe) errs.ValidationErrorList {
// AccumulateUniquePorts runs an extraction function on each Port of each Container,
// accumulating the results and returning an error if any ports conflict.
func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.ContainerPort) int) errs.ValidationErrorList {
func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool, extract func(*api.Port) int) errs.ValidationErrorList {
allErrs := errs.ValidationErrorList{}
for ci, ctr := range containers {
@@ -435,7 +435,7 @@ func AccumulateUniquePorts(containers []api.Container, accumulator map[int]bool,
// a slice of containers.
func checkHostPortConflicts(containers []api.Container) errs.ValidationErrorList {
allPorts := map[int]bool{}
return AccumulateUniquePorts(containers, allPorts, func(p *api.ContainerPort) int { return p.HostPort })
return AccumulateUniquePorts(containers, allPorts, func(p *api.Port) int { return p.HostPort })
}
func validateExecAction(exec *api.ExecAction) errs.ValidationErrorList {

View File

@@ -195,7 +195,7 @@ func TestValidateVolumes(t *testing.T) {
}
func TestValidatePorts(t *testing.T) {
successCase := []api.ContainerPort{
successCase := []api.Port{
{Name: "abc", ContainerPort: 80, HostPort: 80, Protocol: "TCP"},
{Name: "easy", ContainerPort: 82, Protocol: "TCP"},
{Name: "as", ContainerPort: 83, Protocol: "UDP"},
@@ -207,7 +207,7 @@ func TestValidatePorts(t *testing.T) {
t.Errorf("expected success: %v", errs)
}
nonCanonicalCase := []api.ContainerPort{
nonCanonicalCase := []api.Port{
{ContainerPort: 80, Protocol: "TCP"},
}
if errs := validatePorts(nonCanonicalCase); len(errs) != 0 {
@@ -215,22 +215,22 @@ func TestValidatePorts(t *testing.T) {
}
errorCases := map[string]struct {
P []api.ContainerPort
P []api.Port
T errors.ValidationErrorType
F string
D string
}{
"name > 63 characters": {[]api.ContainerPort{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg},
"name not a DNS label": {[]api.ContainerPort{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg},
"name not unique": {[]api.ContainerPort{
"name > 63 characters": {[]api.Port{{Name: strings.Repeat("a", 64), ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg},
"name not a DNS label": {[]api.Port{{Name: "a.b.c", ContainerPort: 80, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].name", dnsLabelErrorMsg},
"name not unique": {[]api.Port{
{Name: "abc", ContainerPort: 80, Protocol: "TCP"},
{Name: "abc", ContainerPort: 81, Protocol: "TCP"},
}, errors.ValidationErrorTypeDuplicate, "[1].name", ""},
"zero container port": {[]api.ContainerPort{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg},
"invalid container port": {[]api.ContainerPort{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg},
"invalid host port": {[]api.ContainerPort{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg},
"invalid protocol": {[]api.ContainerPort{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""},
"protocol required": {[]api.ContainerPort{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""},
"zero container port": {[]api.Port{{ContainerPort: 0, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg},
"invalid container port": {[]api.Port{{ContainerPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].containerPort", portRangeErrorMsg},
"invalid host port": {[]api.Port{{ContainerPort: 80, HostPort: 65536, Protocol: "TCP"}}, errors.ValidationErrorTypeInvalid, "[0].hostPort", portRangeErrorMsg},
"invalid protocol": {[]api.Port{{ContainerPort: 80, Protocol: "ICMP"}}, errors.ValidationErrorTypeNotSupported, "[0].protocol", ""},
"protocol required": {[]api.Port{{Name: "abc", ContainerPort: 80}}, errors.ValidationErrorTypeRequired, "[0].protocol", ""},
}
for k, v := range errorCases {
errs := validatePorts(v.P)
@@ -433,9 +433,9 @@ func TestValidateContainers(t *testing.T) {
},
"zero-length image": {{Name: "abc", Image: "", ImagePullPolicy: "IfNotPresent"}},
"host port not unique": {
{Name: "abc", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}},
{Name: "abc", Image: "image", Ports: []api.Port{{ContainerPort: 80, HostPort: 80, Protocol: "TCP"}},
ImagePullPolicy: "IfNotPresent"},
{Name: "def", Image: "image", Ports: []api.ContainerPort{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}},
{Name: "def", Image: "image", Ports: []api.Port{{ContainerPort: 81, HostPort: 80, Protocol: "TCP"}},
ImagePullPolicy: "IfNotPresent"},
},
"invalid env var name": {
@@ -587,7 +587,7 @@ func TestValidateManifest(t *testing.T) {
"memory": resource.MustParse("1"),
},
},
Ports: []api.ContainerPort{
Ports: []api.Port{
{Name: "p1", ContainerPort: 80, HostPort: 8080, Protocol: "TCP"},
{Name: "p2", ContainerPort: 81, Protocol: "TCP"},
{ContainerPort: 82, Protocol: "TCP"},
@@ -934,7 +934,7 @@ func TestValidatePodUpdate(t *testing.T) {
Containers: []api.Container{
{
Image: "foo:V1",
Ports: []api.ContainerPort{
Ports: []api.Port{
{HostPort: 8080, ContainerPort: 80},
},
},
@@ -947,7 +947,7 @@ func TestValidatePodUpdate(t *testing.T) {
Containers: []api.Container{
{
Image: "foo:V2",
Ports: []api.ContainerPort{
Ports: []api.Port{
{HostPort: 8000, ContainerPort: 80},
},
},

View File

@@ -616,9 +616,7 @@ func TestListEndpooints(t *testing.T) {
{
ObjectMeta: api.ObjectMeta{Name: "endpoint-1"},
Endpoints: []api.Endpoint{
{IP: "10.245.1.2", Ports: []api.EndpointPort{{Port: 8080}}},
{IP: "10.245.1.3", Ports: []api.EndpointPort{{Port: 8080}}},
},
{IP: "10.245.1.2", Port: 8080}, {IP: "10.245.1.3", Port: 8080}},
},
},
},

View File

@@ -26,7 +26,7 @@ import (
func containerWithHostPorts(ports ...int) api.Container {
c := api.Container{}
for _, p := range ports {
c.Ports = append(c.Ports, api.ContainerPort{HostPort: p})
c.Ports = append(c.Ports, api.Port{HostPort: p})
}
return c
}

View File

@@ -22,8 +22,10 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"reflect"
"sort"
"strconv"
"strings"
"text/tabwriter"
"text/template"
@@ -268,11 +270,10 @@ func formatEndpoints(endpoints []api.Endpoint) string {
return "<none>"
}
list := []string{}
//FIXME: What do we want to print, now that endpoints are more complex?
//for i := range endpoints {
// ep := &endpoints[i]
// list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
//}
for i := range endpoints {
ep := &endpoints[i]
list = append(list, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
return strings.Join(list, ",")
}

View File

@@ -452,10 +452,7 @@ func TestPrinters(t *testing.T) {
"pod": &api.Pod{ObjectMeta: om("pod")},
"emptyPodList": &api.PodList{},
"nonEmptyPodList": &api.PodList{Items: []api.Pod{{}}},
"endpoints": &api.Endpoints{Endpoints: []api.Endpoint{
{IP: "127.0.0.1"},
{IP: "localhost", Ports: []api.EndpointPort{{Port: 8080}}},
}},
"endpoints": &api.Endpoints{Endpoints: []api.Endpoint{{IP: "127.0.0.1"}, {IP: "localhost", Port: 8080}}},
}
// map of printer name to set of objects it should fail on.
expectedErrors := map[string]util.StringSet{

View File

@@ -82,7 +82,7 @@ func (BasicReplicationController) Generate(params map[string]string) (runtime.Ob
// Don't include the port if it was not specified.
if port > 0 {
controller.Spec.Template.Spec.Containers[0].Ports = []api.ContainerPort{
controller.Spec.Template.Spec.Containers[0].Ports = []api.Port{
{
ContainerPort: port,
},

View File

@@ -84,7 +84,7 @@ func TestGenerate(t *testing.T) {
{
Name: "foo",
Image: "someimage",
Ports: []api.ContainerPort{
Ports: []api.Port{
{
ContainerPort: 80,
},

View File

@@ -38,7 +38,7 @@ func TestResolvePortString(t *testing.T) {
expected := 80
name := "foo"
container := &api.Container{
Ports: []api.ContainerPort{
Ports: []api.Port{
{Name: name, ContainerPort: expected},
},
}
@@ -55,7 +55,7 @@ func TestResolvePortStringUnknown(t *testing.T) {
expected := 80
name := "foo"
container := &api.Container{
Ports: []api.ContainerPort{
Ports: []api.Port{
{Name: "bar", ContainerPort: expected},
},
}

View File

@@ -917,7 +917,7 @@ const (
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (kl *Kubelet) createPodInfraContainer(pod *api.BoundPod) (dockertools.DockerID, error) {
var ports []api.ContainerPort
var ports []api.Port
// Docker only exports ports from the pod infra container. Let's
// collect all of the relevant ports and export them.
for _, container := range pod.Spec.Containers {
@@ -1411,7 +1411,7 @@ func updateBoundPods(changed []api.BoundPod, current []api.BoundPod) []api.Bound
func filterHostPortConflicts(pods []api.BoundPod) []api.BoundPod {
filtered := []api.BoundPod{}
ports := map[int]bool{}
extract := func(p *api.ContainerPort) int { return p.HostPort }
extract := func(p *api.Port) int { return p.HostPort }
for i := range pods {
pod := &pods[i]
if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {

View File

@@ -1136,7 +1136,7 @@ func TestMakeVolumesAndBinds(t *testing.T) {
func TestMakePortsAndBindings(t *testing.T) {
container := api.Container{
Ports: []api.ContainerPort{
Ports: []api.Port{
{
ContainerPort: 80,
HostPort: 8080,
@@ -1200,12 +1200,12 @@ func TestMakePortsAndBindings(t *testing.T) {
func TestCheckHostPortConflicts(t *testing.T) {
successCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
successCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}},
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 83}}}}},
}
expected := append(successCaseAll, successCaseNew)
if actual := filterHostPortConflicts(expected); !reflect.DeepEqual(actual, expected) {
@@ -1213,12 +1213,12 @@ func TestCheckHostPortConflicts(t *testing.T) {
}
failureCaseAll := []api.BoundPod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 82}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 82}}}}}},
}
failureCaseNew := api.BoundPod{
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.Port{{HostPort: 81}}}}},
}
if actual := filterHostPortConflicts(append(failureCaseAll, failureCaseNew)); !reflect.DeepEqual(failureCaseAll, actual) {
t.Errorf("Expected %#v, Got %#v", expected, actual)

View File

@@ -32,7 +32,7 @@ import (
func TestFindPortByName(t *testing.T) {
container := api.Container{
Ports: []api.ContainerPort{
Ports: []api.Port{
{
Name: "foo",
HostPort: 8080,
@@ -71,7 +71,7 @@ func TestGetURLParts(t *testing.T) {
for _, test := range testCases {
state := api.PodStatus{PodIP: "127.0.0.1"}
container := api.Container{
Ports: []api.ContainerPort{{Name: "found", HostPort: 93}},
Ports: []api.Port{{Name: "found", HostPort: 93}},
LivenessProbe: &api.Probe{
Handler: api.Handler{
HTTPGet: test.probe,
@@ -114,7 +114,7 @@ func TestGetTCPAddrParts(t *testing.T) {
for _, test := range testCases {
host := "1.2.3.4"
container := api.Container{
Ports: []api.ContainerPort{{Name: "found", HostPort: 93}},
Ports: []api.Port{{Name: "found", HostPort: 93}},
LivenessProbe: &api.Probe{
Handler: api.Handler{
TCPSocket: test.probe,

View File

@@ -128,35 +128,25 @@ func (m *Master) createMasterServiceIfNeeded(serviceName string, serviceIP net.I
func (m *Master) ensureEndpointsContain(serviceName string, ip net.IP, port int) error {
ctx := api.NewDefaultContext()
e, err := m.endpointRegistry.GetEndpoints(ctx, serviceName)
if err != nil {
if err != nil || e.Protocol != api.ProtocolTCP {
e = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
Namespace: api.NamespaceDefault,
},
Protocol: api.ProtocolTCP,
}
}
found := false
FindEndpointLoop:
for i := range e.Endpoints {
ep := &e.Endpoints[i]
if ep.IP == ip.String() {
for j := range ep.Ports {
epp := &ep.Ports[j]
if epp.Protocol == api.ProtocolTCP && epp.Port == port {
if ep.IP == ip.String() && ep.Port == port {
found = true
break FindEndpointLoop
}
}
break
}
}
if !found {
e.Endpoints = append(e.Endpoints, api.Endpoint{
IP: ip.String(),
Ports: []api.EndpointPort{
{Protocol: api.ProtocolTCP, Port: port},
},
})
e.Endpoints = append(e.Endpoints, api.Endpoint{IP: ip.String(), Port: port})
}
if len(e.Endpoints) > m.masterCount {
// We append to the end and remove from the beginning, so this should

View File

@@ -185,7 +185,7 @@ func TestServicesFromZeroError(t *testing.T) {
func TestEndpoints(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()
@@ -235,7 +235,7 @@ func TestEndpoints(t *testing.T) {
func TestEndpointsFromZero(t *testing.T) {
endpoint := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar", ResourceVersion: "2"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
}
fakeWatch := watch.NewFake()

View File

@@ -25,8 +25,8 @@ import (
// LoadBalancer is an interface for distributing incoming requests to service endpoints.
type LoadBalancer interface {
// NextEndpoint returns the endpoint to handle a request for the given
// serviceName:portName and source address.
NextEndpoint(service string, port string, srcAddr net.Addr) (string, error)
NewService(service string, port string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string, port string)
// service and source address.
NextEndpoint(service string, srcAddr net.Addr) (string, error)
NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error
CleanupStaleStickySessions(service string)
}

View File

@@ -69,8 +69,7 @@ type tcpProxySocket struct {
func tryConnect(service string, srcAddr net.Addr, protocol string, proxier *Proxier) (out net.Conn, err error) {
for _, retryTimeout := range endpointDialTimeout {
// TODO: support multiple service ports
endpoint, err := proxier.loadBalancer.NextEndpoint(service, "", srcAddr)
endpoint, err := proxier.loadBalancer.NextEndpoint(service, srcAddr)
if err != nil {
glog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
return nil, err
@@ -384,8 +383,7 @@ func (proxier *Proxier) ensurePortals() {
func (proxier *Proxier) cleanupStaleStickySessions() {
for name, info := range proxier.serviceMap {
if info.sessionAffinityType != api.AffinityTypeNone {
// TODO: support multiple service ports
proxier.loadBalancer.CleanupStaleStickySessions(name, "")
proxier.loadBalancer.CleanupStaleStickySessions(name)
}
}
}
@@ -501,8 +499,7 @@ func (proxier *Proxier) OnUpdate(services []api.Service) {
if err != nil {
glog.Errorf("Failed to open portal for %q: %v", service.Name, err)
}
// TODO: support multiple service ports
proxier.loadBalancer.NewService(service.Name, "", info.sessionAffinityType, info.stickyMaxAgeMinutes)
proxier.loadBalancer.NewService(service.Name, info.sessionAffinityType, info.stickyMaxAgeMinutes)
}
proxier.mu.Lock()
defer proxier.mu.Unlock()

View File

@@ -197,7 +197,7 @@ func TestTCPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -217,7 +217,7 @@ func TestUDPProxy(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -246,7 +246,7 @@ func TestTCPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -277,7 +277,7 @@ func TestUDPProxyStop(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -308,7 +308,7 @@ func TestTCPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -338,7 +338,7 @@ func TestUDPProxyUpdateDelete(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -368,7 +368,7 @@ func TestTCPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -407,7 +407,7 @@ func TestUDPProxyUpdateDeleteUpdate(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})
@@ -446,7 +446,7 @@ func TestTCPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: tcpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: tcpServerPort}},
},
})
@@ -482,7 +482,7 @@ func TestUDPProxyUpdatePort(t *testing.T) {
lb.OnUpdate([]api.Endpoints{
{
ObjectMeta: api.ObjectMeta{Name: "echo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: udpServerPort}}}},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: udpServerPort}},
},
})

View File

@@ -18,7 +18,6 @@ package proxy
import (
"errors"
"fmt"
"net"
"reflect"
"strconv"
@@ -35,83 +34,62 @@ var (
ErrMissingEndpoints = errors.New("missing endpoints")
)
type affinityState struct {
clientIP string
type sessionAffinityDetail struct {
clientIPAddress string
//clientProtocol api.Protocol //not yet used
//sessionCookie string //not yet used
endpoint string
lastUsed time.Time
lastUsedDTTM time.Time
}
type affinityPolicy struct {
affinityType api.AffinityType
affinityMap map[string]*affinityState // map client IP -> affinity info
ttlMinutes int
}
// balancerKey is a string that the balancer uses to key stored state. It is
// formatted as "service_name:port_name", but that should be opaque to most consumers.
type balancerKey string
func makeBalancerKey(service, port string) balancerKey {
return balancerKey(fmt.Sprintf("%s:%s", service, port))
type serviceDetail struct {
name string
sessionAffinityType api.AffinityType
sessionAffinityMap map[string]*sessionAffinityDetail
stickyMaxAgeMinutes int
}
// LoadBalancerRR is a round-robin load balancer.
type LoadBalancerRR struct {
lock sync.RWMutex
services map[balancerKey]*balancerState
endpointsMap map[string][]string
rrIndex map[string]int
serviceDtlMap map[string]serviceDetail
}
// Ensure this implements LoadBalancer.
var _ LoadBalancer = &LoadBalancerRR{}
type balancerState struct {
endpoints []string // a list of "ip:port" style strings
index int // index into endpoints
affinity affinityPolicy
}
func newAffinityPolicy(affinityType api.AffinityType, ttlMinutes int) *affinityPolicy {
return &affinityPolicy{
affinityType: affinityType,
affinityMap: make(map[string]*affinityState),
ttlMinutes: ttlMinutes,
func newServiceDetail(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) *serviceDetail {
return &serviceDetail{
name: service,
sessionAffinityType: sessionAffinityType,
sessionAffinityMap: make(map[string]*sessionAffinityDetail),
stickyMaxAgeMinutes: stickyMaxAgeMinutes,
}
}
// NewLoadBalancerRR returns a new LoadBalancerRR.
func NewLoadBalancerRR() *LoadBalancerRR {
return &LoadBalancerRR{
services: map[balancerKey]*balancerState{},
endpointsMap: make(map[string][]string),
rrIndex: make(map[string]int),
serviceDtlMap: make(map[string]serviceDetail),
}
}
func (lb *LoadBalancerRR) NewService(service, port string, affinityType api.AffinityType, ttlMinutes int) error {
lb.lock.Lock()
defer lb.lock.Unlock()
lb.newServiceInternal(service, port, affinityType, ttlMinutes)
func (lb *LoadBalancerRR) NewService(service string, sessionAffinityType api.AffinityType, stickyMaxAgeMinutes int) error {
if stickyMaxAgeMinutes == 0 {
stickyMaxAgeMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
if _, exists := lb.serviceDtlMap[service]; !exists {
lb.serviceDtlMap[service] = *newServiceDetail(service, sessionAffinityType, stickyMaxAgeMinutes)
glog.V(4).Infof("NewService. Service does not exist. So I created it: %+v", lb.serviceDtlMap[service])
}
return nil
}
func (lb *LoadBalancerRR) newServiceInternal(service, port string, affinityType api.AffinityType, ttlMinutes int) *balancerState {
if ttlMinutes == 0 {
ttlMinutes = 180 //default to 3 hours if not specified. Should 0 be unlimeted instead????
}
key := makeBalancerKey(service, port)
if _, exists := lb.services[key]; !exists {
lb.services[key] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlMinutes)}
glog.V(4).Infof("LoadBalancerRR service %q did not exist, created", service)
}
return lb.services[key]
}
// return true if this service is using some form of session affinity.
func isSessionAffinity(affinity *affinityPolicy) bool {
// Should never be empty string, but checking for it to be safe.
if affinity.affinityType == "" || affinity.affinityType == api.AffinityTypeNone {
// return true if this service detail is using some form of session affinity.
func isSessionAffinity(serviceDtl serviceDetail) bool {
//Should never be empty string, but chekcing for it to be safe.
if serviceDtl.sessionAffinityType == "" || serviceDtl.sessionAffinityType == api.AffinityTypeNone {
return false
}
return true
@@ -119,111 +97,100 @@ func isSessionAffinity(affinity *affinityPolicy) bool {
// NextEndpoint returns a service endpoint.
// The service endpoint is chosen using the round-robin algorithm.
func (lb *LoadBalancerRR) NextEndpoint(service, port string, srcAddr net.Addr) (string, error) {
// Coarse locking is simple. We can get more fine-grained if/when we
// can prove it matters.
lb.lock.Lock()
defer lb.lock.Unlock()
func (lb *LoadBalancerRR) NextEndpoint(service string, srcAddr net.Addr) (string, error) {
var ipaddr string
glog.V(4).Infof("NextEndpoint. service: %s. srcAddr: %+v. Endpoints: %+v", service, srcAddr, lb.endpointsMap)
key := makeBalancerKey(service, port)
state, exists := lb.services[key]
if !exists || state == nil {
lb.lock.RLock()
serviceDtls, exists := lb.serviceDtlMap[service]
endpoints, _ := lb.endpointsMap[service]
index := lb.rrIndex[service]
sessionAffinityEnabled := isSessionAffinity(serviceDtls)
lb.lock.RUnlock()
if !exists {
return "", ErrMissingServiceEntry
}
if len(state.endpoints) == 0 {
if len(endpoints) == 0 {
return "", ErrMissingEndpoints
}
glog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", service, srcAddr, state.endpoints)
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
var ipaddr string
if sessionAffinityEnabled {
// Caution: don't shadow ipaddr
var err error
ipaddr, _, err = net.SplitHostPort(srcAddr.String())
if err != nil {
return "", fmt.Errorf("malformed source address %q: %v", srcAddr.String(), err)
if _, _, err := net.SplitHostPort(srcAddr.String()); err == nil {
ipaddr, _, _ = net.SplitHostPort(srcAddr.String())
}
sessionAffinity, exists := state.affinity.affinityMap[ipaddr]
if exists && int(time.Now().Sub(sessionAffinity.lastUsed).Minutes()) < state.affinity.ttlMinutes {
// Affinity wins.
sessionAffinity, exists := serviceDtls.sessionAffinityMap[ipaddr]
glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
if exists && int(time.Now().Sub(sessionAffinity.lastUsedDTTM).Minutes()) < serviceDtls.stickyMaxAgeMinutes {
endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now()
glog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %+v: %s", service, ipaddr, sessionAffinity, endpoint)
sessionAffinity.lastUsedDTTM = time.Now()
glog.V(4).Infof("NextEndpoint. Key: %s. sessionAffinity: %+v", ipaddr, sessionAffinity)
return endpoint, nil
}
}
// Take the next endpoint.
endpoint := state.endpoints[state.index]
state.index = (state.index + 1) % len(state.endpoints)
endpoint := endpoints[index]
lb.lock.Lock()
lb.rrIndex[service] = (index + 1) % len(endpoints)
if sessionAffinityEnabled {
var affinity *affinityState
affinity = state.affinity.affinityMap[ipaddr]
var affinity *sessionAffinityDetail
affinity, _ = lb.serviceDtlMap[service].sessionAffinityMap[ipaddr]
if affinity == nil {
affinity = new(affinityState) //&affinityState{ipaddr, "TCP", "", endpoint, time.Now()}
state.affinity.affinityMap[ipaddr] = affinity
affinity = new(sessionAffinityDetail) //&sessionAffinityDetail{ipaddr, "TCP", "", endpoint, time.Now()}
lb.serviceDtlMap[service].sessionAffinityMap[ipaddr] = affinity
}
affinity.lastUsed = time.Now()
affinity.lastUsedDTTM = time.Now()
affinity.endpoint = endpoint
affinity.clientIP = ipaddr
glog.V(4).Infof("Updated affinity key %s: %+v", ipaddr, state.affinity.affinityMap[ipaddr])
affinity.clientIPAddress = ipaddr
glog.V(4).Infof("NextEndpoint. New Affinity key %s: %+v", ipaddr, lb.serviceDtlMap[service].sessionAffinityMap[ipaddr])
}
lb.lock.Unlock()
return endpoint, nil
}
type hostPortPair struct {
host string
port int
func isValidEndpoint(ep *api.Endpoint) bool {
return ep.IP != "" && ep.Port > 0
}
func isValidEndpoint(hpp *hostPortPair) bool {
return hpp.host != "" && hpp.port > 0
}
func getValidEndpoints(pairs []hostPortPair) []string {
// Convert structs into strings for easier use later.
func filterValidEndpoints(endpoints []api.Endpoint) []string {
// Convert Endpoint objects into strings for easier use later. Ignore
// the protocol field - we'll get that from the Service objects.
var result []string
for i := range pairs {
hpp := &pairs[i]
if isValidEndpoint(hpp) {
result = append(result, net.JoinHostPort(hpp.host, strconv.Itoa(hpp.port)))
for i := range endpoints {
ep := &endpoints[i]
if isValidEndpoint(ep) {
result = append(result, net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)))
}
}
return result
}
// Remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(state *balancerState, service balancerKey, endpoint string) {
for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, service)
delete(state.affinity.affinityMap, affinity.clientIP)
//remove any session affinity records associated to a particular endpoint (for example when a pod goes down).
func removeSessionAffinityByEndpoint(lb *LoadBalancerRR, service string, endpoint string) {
for _, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
if affinityDetail.endpoint == endpoint {
glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s", affinityDetail.endpoint, service)
delete(lb.serviceDtlMap[service].sessionAffinityMap, affinityDetail.clientIPAddress)
}
}
}
// Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
//Loop through the valid endpoints and then the endpoints associated with the Load Balancer.
// Then remove any session affinity records that are not in both lists.
// This assumes the lb.lock is held.
func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []string) {
func updateServiceDetailMap(lb *LoadBalancerRR, service string, validEndpoints []string) {
allEndpoints := map[string]int{}
for _, newEndpoint := range newEndpoints {
allEndpoints[newEndpoint] = 1
for _, validEndpoint := range validEndpoints {
allEndpoints[validEndpoint] = 1
}
state, exists := lb.services[service]
if !exists {
return
}
for _, existingEndpoint := range state.endpoints {
for _, existingEndpoint := range lb.endpointsMap[service] {
allEndpoints[existingEndpoint] = allEndpoints[existingEndpoint] + 1
}
for mKey, mVal := range allEndpoints {
if mVal == 1 {
glog.V(3).Infof("Delete endpoint %s for service %q", mKey, service)
removeSessionAffinityByEndpoint(state, service, mKey)
glog.V(3).Infof("Delete endpoint %s for service: %s", mKey, service)
removeSessionAffinityByEndpoint(lb, service, mKey)
delete(lb.serviceDtlMap[service].sessionAffinityMap, mKey)
}
}
}
@@ -231,86 +198,44 @@ func (lb *LoadBalancerRR) updateAffinityMap(service balancerKey, newEndpoints []
// OnUpdate manages the registered service endpoints.
// Registered endpoints are updated if found in the update set or
// unregistered if missing from the update set.
func (lb *LoadBalancerRR) OnUpdate(allEndpoints []api.Endpoints) {
registeredEndpoints := make(map[balancerKey]bool)
func (lb *LoadBalancerRR) OnUpdate(endpoints []api.Endpoints) {
registeredEndpoints := make(map[string]bool)
lb.lock.Lock()
defer lb.lock.Unlock()
// Update endpoints for services.
for i := range allEndpoints {
svcEndpoints := &allEndpoints[i]
// We need to build a map of portname -> all ip:ports for that portname.
portsToEndpoints := map[string][]hostPortPair{}
// Explode the Endpoints.Endpoints[*].Ports[*] into the aforementioned map.
// FIXME: this is awkward. Maybe a different factoring of Endpoints is better?
for j := range svcEndpoints.Endpoints {
ep := &svcEndpoints.Endpoints[j]
for k := range ep.Ports {
epp := &ep.Ports[k]
portsToEndpoints[epp.Name] = append(portsToEndpoints[epp.Name], hostPortPair{ep.IP, epp.Port})
// Ignore the protocol field - we'll get that from the Service objects.
}
}
for portname := range portsToEndpoints {
key := makeBalancerKey(svcEndpoints.Name, portname)
state, exists := lb.services[key]
curEndpoints := []string{}
if state != nil {
curEndpoints = state.endpoints
}
newEndpoints := getValidEndpoints(portsToEndpoints[portname])
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(slice.CopyStrings(curEndpoints), newEndpoints) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcEndpoints.Name, svcEndpoints.Endpoints)
lb.updateAffinityMap(key, newEndpoints)
for _, endpoint := range endpoints {
existingEndpoints, exists := lb.endpointsMap[endpoint.Name]
validEndpoints := filterValidEndpoints(endpoint.Endpoints)
if !exists || !reflect.DeepEqual(slice.SortStrings(slice.CopyStrings(existingEndpoints)), slice.SortStrings(validEndpoints)) {
glog.V(3).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", endpoint.Name, endpoint.Endpoints)
updateServiceDetailMap(lb, endpoint.Name, validEndpoints)
// On update can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created
// to be safe we will call it here. A new service will only be created
// if one does not already exist.
state = lb.newServiceInternal(svcEndpoints.Name, portname, api.AffinityTypeNone, 0)
state.endpoints = slice.ShuffleStrings(newEndpoints)
lb.NewService(endpoint.Name, api.AffinityTypeNone, 0)
lb.endpointsMap[endpoint.Name] = slice.ShuffleStrings(validEndpoints)
// Reset the round-robin index.
state.index = 0
}
registeredEndpoints[key] = true
lb.rrIndex[endpoint.Name] = 0
}
registeredEndpoints[endpoint.Name] = true
}
// Remove endpoints missing from the update.
for k := range lb.services {
for k, v := range lb.endpointsMap {
if _, exists := registeredEndpoints[k]; !exists {
glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s", k)
delete(lb.services, k)
glog.V(3).Infof("LoadBalancerRR: Removing endpoints for %s -> %+v", k, v)
delete(lb.endpointsMap, k)
delete(lb.serviceDtlMap, k)
}
}
}
// Tests whether two slices are equivalent. This sorts both slices in-place.
func slicesEquiv(lhs, rhs []string) bool {
if len(lhs) != len(rhs) {
return false
}
if reflect.DeepEqual(slice.SortStrings(lhs), slice.SortStrings(rhs)) {
return true
}
return false
}
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service, port string) {
lb.lock.Lock()
defer lb.lock.Unlock()
key := makeBalancerKey(service, port)
state, exists := lb.services[key]
if !exists {
glog.Warning("CleanupStaleStickySessions called for non-existent balancer key %q", service)
return
}
for ip, affinity := range state.affinity.affinityMap {
if int(time.Now().Sub(affinity.lastUsed).Minutes()) >= state.affinity.ttlMinutes {
glog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, service)
delete(state.affinity.affinityMap, ip)
func (lb *LoadBalancerRR) CleanupStaleStickySessions(service string) {
stickyMaxAgeMinutes := lb.serviceDtlMap[service].stickyMaxAgeMinutes
for key, affinityDetail := range lb.serviceDtlMap[service].sessionAffinityMap {
if int(time.Now().Sub(affinityDetail.lastUsedDTTM).Minutes()) >= stickyMaxAgeMinutes {
glog.V(4).Infof("Removing client: %s from sessionAffinityMap for service: %s. Last used is greater than %d minutes....", affinityDetail.clientIPAddress, service, stickyMaxAgeMinutes)
delete(lb.serviceDtlMap[service].sessionAffinityMap, key)
}
}
}

View File

@@ -24,29 +24,29 @@ import (
)
func TestValidateWorks(t *testing.T) {
if isValidEndpoint(&hostPortPair{}) {
if isValidEndpoint(&api.Endpoint{}) {
t.Errorf("Didn't fail for empty string")
}
if isValidEndpoint(&hostPortPair{host: "foobar"}) {
if isValidEndpoint(&api.Endpoint{IP: "foobar"}) {
t.Errorf("Didn't fail with no port")
}
if isValidEndpoint(&hostPortPair{host: "foobar", port: -1}) {
if isValidEndpoint(&api.Endpoint{IP: "foobar", Port: -1}) {
t.Errorf("Didn't fail with a negative port")
}
if !isValidEndpoint(&hostPortPair{host: "foobar", port: 8080}) {
if !isValidEndpoint(&api.Endpoint{IP: "foobar", Port: 8080}) {
t.Errorf("Failed a valid config.")
}
}
func TestFilterWorks(t *testing.T) {
endpoints := []hostPortPair{
{host: "foobar", port: 1},
{host: "foobar", port: 2},
{host: "foobar", port: -1},
{host: "foobar", port: 3},
{host: "foobar", port: -2},
endpoints := []api.Endpoint{
{IP: "foobar", Port: 1},
{IP: "foobar", Port: 2},
{IP: "foobar", Port: -1},
{IP: "foobar", Port: 3},
{IP: "foobar", Port: -2},
}
filtered := getValidEndpoints(endpoints)
filtered := filterValidEndpoints(endpoints)
if len(filtered) != 3 {
t.Errorf("Failed to filter to the correct size")
@@ -66,7 +66,7 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
var endpoints []api.Endpoints
loadBalancer.OnUpdate(endpoints)
endpoint, err := loadBalancer.NextEndpoint("foo", "http", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil {
t.Errorf("Didn't fail with non-existent service")
}
@@ -75,8 +75,8 @@ func TestLoadBalanceFailsWithNoEndpoints(t *testing.T) {
}
}
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, port string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, port, netaddr)
func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string, expected string, netaddr net.Addr) {
endpoint, err := loadBalancer.NextEndpoint(service, netaddr)
if err != nil {
t.Errorf("Didn't find a service for %s, expected %s, failed with: %v", service, expected, err)
}
@@ -87,41 +87,25 @@ func expectEndpoint(t *testing.T, loadBalancer *LoadBalancerRR, service string,
func TestLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 40}}}},
Endpoints: []api.Endpoint{{IP: "endpoint1", Port: 40}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:40", nil)
}
func stringsInSlice(haystack []string, needles ...string) bool {
for _, needle := range needles {
found := false
for i := range haystack {
if haystack[i] == needle {
found = true
break
}
}
if found == false {
return false
}
}
return true
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
expectEndpoint(t, loadBalancer, "foo", "endpoint1:40", nil)
}
func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -129,77 +113,22 @@ func TestLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndPorts(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
}
func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -207,86 +136,37 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 8},
{Name: "q", Port: 80},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 9},
{Name: "q", Port: 90},
}},
{IP: "endpoint", Port: 8},
{IP: "endpoint", Port: 9},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:8", "endpoint:9") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:80", "endpoint:90") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledEndpoints[1], nil)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], nil)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
endpoint, err = loadBalancer.NextEndpoint("foo", "q", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -294,7 +174,7 @@ func TestLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -302,183 +182,133 @@ func TestLoadBalanceWorksWithServiceRemoval(t *testing.T) {
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 1},
{Name: "q", Port: 10},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 2},
{Name: "q", Port: 20},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 3},
{Name: "q", Port: 30},
}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 4},
{Name: "q", Port: 40},
}},
{IP: "endpoint", Ports: []api.EndpointPort{
{Name: "p", Port: 5},
{Name: "q", Port: 50},
}},
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], nil)
shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledFooEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], nil)
shuffledFooEndpoints = loadBalancer.services[makeBalancerKey("foo", "q")].endpoints
if !stringsInSlice(shuffledFooEndpoints, "endpoint:10", "endpoint:20", "endpoint:30") {
t.Errorf("did not find expected endpoints: %v", shuffledFooEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[2], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "foo", "q", shuffledFooEndpoints[1], nil)
shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
if !stringsInSlice(shuffledBarEndpoints, "endpoint:4", "endpoint:5") {
t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints)
}
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], nil)
shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "q")].endpoints
if !stringsInSlice(shuffledBarEndpoints, "endpoint:40", "endpoint:50") {
t.Errorf("did not find expected endpoints: %v", shuffledBarEndpoints)
}
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", "q", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], nil)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], nil)
}
func TestStickyLoadBalanceWorksWithSingleEndpoint(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
},
Endpoints: []api.Endpoint{{IP: "endpoint", Port: 1}},
}
loadBalancer.OnUpdate(endpoints)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "p", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client1)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
expectEndpoint(t, loadBalancer, "foo", "endpoint:1", client2)
}
func TestStickyLoadBalanceWorksWithMultipleEndpoints(t *testing.T) {
func TestStickyLoadBalanaceWorksWithMultipleEndpoints(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
func TestStickyLoadBalanaceWorksWithMultipleEndpointsStickyNone(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeNone, 0)
loadBalancer.NewService("foo", api.AffinityTypeNone, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client1)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client1)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
func TestStickyLoadBalanaceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client1 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 0}
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
@@ -486,44 +316,41 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
client5 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 5), Port: 0}
client6 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 6), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
client1Endpoint := shuffledEndpoints[0]
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
client2Endpoint := shuffledEndpoints[1]
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
client3Endpoint := shuffledEndpoints[2]
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
if client1Endpoint == "endpoint:3" {
client1Endpoint = shuffledEndpoints[0]
} else if client2Endpoint == "endpoint:3" {
@@ -531,26 +358,26 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsRemoveOne(t *testing.T) {
} else if client3Endpoint == "endpoint:3" {
client3Endpoint = shuffledEndpoints[0]
}
expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 4},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", "p", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", "p", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client6)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", client1Endpoint, client1)
expectEndpoint(t, loadBalancer, "foo", client2Endpoint, client2)
expectEndpoint(t, loadBalancer, "foo", client3Endpoint, client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client4)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client5)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client6)
}
func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
@@ -558,54 +385,51 @@ func TestStickyLoadBalanceWorksWithMultipleEndpointsAndUpdates(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 1)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
if !stringsInSlice(shuffledEndpoints, "endpoint:1", "endpoint:2", "endpoint:3") {
t.Errorf("did not find expected endpoints: %v", shuffledEndpoints)
}
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
shuffledEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
// Then update the configuration with one fewer endpoints, make sure
// we start in the beginning again
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 4}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}},
{IP: "endpoint", Port: 4},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledEndpoints = loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledEndpoints[1], client2)
shuffledEndpoints = loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledEndpoints[1], client2)
// Clear endpoints
endpoints[0] = api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{}}
loadBalancer.OnUpdate(endpoints)
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
@@ -616,58 +440,58 @@ func TestStickyLoadBalanceWorksWithServiceRemoval(t *testing.T) {
client2 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 2), Port: 0}
client3 := &net.TCPAddr{IP: net.IPv4(127, 0, 0, 3), Port: 0}
loadBalancer := NewLoadBalancerRR()
endpoint, err := loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err := loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
loadBalancer.NewService("foo", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("foo", api.AffinityTypeClientIP, 0)
endpoints := make([]api.Endpoints, 2)
endpoints[0] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 1}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 2}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 3}}},
{IP: "endpoint", Port: 1},
{IP: "endpoint", Port: 2},
{IP: "endpoint", Port: 3},
},
}
loadBalancer.NewService("bar", "p", api.AffinityTypeClientIP, 0)
loadBalancer.NewService("bar", api.AffinityTypeClientIP, 0)
endpoints[1] = api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "bar"},
Endpoints: []api.Endpoint{
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 5}}},
{IP: "endpoint", Ports: []api.EndpointPort{{Name: "p", Port: 6}}},
{IP: "endpoint", Port: 5},
{IP: "endpoint", Port: 5},
},
}
loadBalancer.OnUpdate(endpoints)
shuffledFooEndpoints := loadBalancer.services[makeBalancerKey("foo", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
shuffledFooEndpoints := loadBalancer.endpointsMap["foo"]
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[2], client3)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
shuffledBarEndpoints := loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", "p", shuffledFooEndpoints[0], client1)
shuffledBarEndpoints := loadBalancer.endpointsMap["bar"]
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "foo", shuffledFooEndpoints[0], client1)
// Then update the configuration by removing foo
loadBalancer.OnUpdate(endpoints[1:])
endpoint, err = loadBalancer.NextEndpoint("foo", "p", nil)
endpoint, err = loadBalancer.NextEndpoint("foo", nil)
if err == nil || len(endpoint) != 0 {
t.Errorf("Didn't fail with non-existent service")
}
// but bar is still there, and we continue RR from where we left off.
shuffledBarEndpoints = loadBalancer.services[makeBalancerKey("bar", "p")].endpoints
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", "p", shuffledBarEndpoints[0], client1)
shuffledBarEndpoints = loadBalancer.endpointsMap["bar"]
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[1], client2)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
expectEndpoint(t, loadBalancer, "bar", shuffledBarEndpoints[0], client1)
}

View File

@@ -171,7 +171,7 @@ func TestControllerParsing(t *testing.T) {
Containers: []api.Container{
{
Image: "dockerfile/nginx",
Ports: []api.ContainerPort{
Ports: []api.Port{
{
ContainerPort: 80,
HostPort: 8080,

View File

@@ -27,20 +27,10 @@ import (
)
func TestGetEndpoints(t *testing.T) {
expected := []api.Endpoint{
{IP: "127.0.0.1", Ports: []api.EndpointPort{
{Name: "p", Port: 9000, Protocol: api.ProtocolTCP},
{Name: "q", Port: 9000, Protocol: api.ProtocolUDP},
}},
{IP: "127.0.0.2", Ports: []api.EndpointPort{
{Name: "p", Port: 8000, Protocol: api.ProtocolTCP},
{Name: "q", Port: 8000, Protocol: api.ProtocolUDP},
}},
}
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: expected,
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}},
},
}
storage := NewREST(registry)
@@ -49,7 +39,7 @@ func TestGetEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual(expected, obj.(*api.Endpoints).Endpoints) {
if !reflect.DeepEqual([]api.Endpoint{{IP: "127.0.0.1", Port: 9000}}, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}

View File

@@ -24,8 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@@ -600,10 +598,10 @@ func TestEtcdListEndpoints(t *testing.T) {
Node: &etcd.Node{
Nodes: []*etcd.Node{
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Name: "p", Port: 8345, Protocol: api.ProtocolTCP}}}}}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Protocol: "TCP", Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 8345}}}),
},
{
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}}),
Value: runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "bar"}, Protocol: "TCP"}),
},
},
},
@@ -627,7 +625,8 @@ func TestEtcdGetEndpoints(t *testing.T) {
registry := NewTestEtcdRegistry(fakeClient)
endpoints := &api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 34855, Protocol: api.ProtocolTCP}}}},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 34855}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")
@@ -648,19 +647,10 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true
registry := NewTestEtcdRegistry(fakeClient)
// TODO: Once we drop single-port APIs, make this test use the
// multi-port features. This will force a compile error when those APIs
// are deleted.
_ = v1beta1.Dependency
_ = v1beta2.Dependency
endpoints := api.Endpoints{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Endpoints: []api.Endpoint{
{IP: "baz", Ports: []api.EndpointPort{{Port: 1, Protocol: api.ProtocolTCP}}},
{IP: "bar", Ports: []api.EndpointPort{{Port: 2, Protocol: api.ProtocolTCP}}},
},
Protocol: "TCP",
Endpoints: []api.Endpoint{{IP: "baz"}, {IP: "bar"}},
}
key, _ := makeServiceEndpointsKey(ctx, "foo")

View File

@@ -592,7 +592,7 @@ func TestResourceLocation(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
@@ -604,7 +604,7 @@ func TestResourceLocation(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
{Name: "ctr", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
@@ -617,7 +617,7 @@ func TestResourceLocation(t *testing.T) {
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1"},
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 9376}}},
},
},
},
@@ -629,8 +629,8 @@ func TestResourceLocation(t *testing.T) {
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.PodSpec{
Containers: []api.Container{
{Name: "ctr1", Ports: []api.ContainerPort{{ContainerPort: 9376}}},
{Name: "ctr2", Ports: []api.ContainerPort{{ContainerPort: 1234}}},
{Name: "ctr1", Ports: []api.Port{{ContainerPort: 9376}}},
{Name: "ctr2", Ports: []api.Port{{ContainerPort: 1234}}},
},
},
},

View File

@@ -21,7 +21,6 @@ import (
"math/rand"
"net"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
@@ -219,57 +218,17 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo
// ResourceLocation returns a URL to which one can send traffic for the specified service.
func (rs *REST) ResourceLocation(ctx api.Context, id string) (string, error) {
// Allow ID as "svcname" or "svcname:port". Choose an endpoint at
// random. If the port is specified as a number, use that value
// directly. If the port is specified as a name, try to look up that
// name on the chosen endpoint. If port is not specified, try to use
// the first unnamed port on the chosen endpoint. If there are no
// unnamed ports, try to use the first defined port.
parts := strings.Split(id, ":")
if len(parts) > 2 {
return "", errors.NewBadRequest(fmt.Sprintf("invalid service request %q", id))
}
name := parts[0]
port := ""
if len(parts) == 2 {
port = parts[1]
}
eps, err := rs.registry.GetEndpoints(ctx, name)
eps, err := rs.registry.GetEndpoints(ctx, id)
if err != nil {
return "", err
}
if len(eps.Endpoints) == 0 {
return "", fmt.Errorf("no endpoints available for %v", name)
return "", fmt.Errorf("no endpoints available for %v", id)
}
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
// Try to figure out a port.
if _, err := strconv.Atoi(port); err != nil {
// Do nothing - port is correct as is.
} else {
// Try a name lookup, even if name is "".
for i := range ep.Ports {
if ep.Ports[i].Name == port {
port = strconv.Itoa(ep.Ports[i].Port)
break
}
}
}
if port == "" {
// Still nothing - try the first defined port.
if len(ep.Ports) > 0 {
port = strconv.Itoa(ep.Ports[0].Port)
}
}
// We leave off the scheme ('http://') because we have no idea what sort of server
// is listening at this endpoint.
loc := ep.IP
if port != "" {
loc += fmt.Sprintf(":%s", port)
}
return loc, nil
ep := &eps.Endpoints[rand.Intn(len(eps.Endpoints))]
return net.JoinHostPort(ep.IP, strconv.Itoa(ep.Port)), nil
}
func (rs *REST) createExternalLoadBalancer(ctx api.Context, service *api.Service) error {

View File

@@ -370,7 +370,7 @@ func TestServiceRegistryGet(t *testing.T) {
func TestServiceRegistryResourceLocation(t *testing.T) {
ctx := api.NewDefaultContext()
registry := registrytest.NewServiceRegistry()
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Ports: []api.EndpointPort{{Port: 80}}}}}
registry.Endpoints = api.Endpoints{Endpoints: []api.Endpoint{{IP: "foo", Port: 80}}}
fakeCloud := &cloud.FakeCloud{}
machines := []string{"foo", "bar", "baz"}
storage := NewREST(registry, fakeCloud, registrytest.NewMinionRegistry(machines, api.NodeResources{}), makeIPNet(t))

View File

@@ -60,9 +60,9 @@ func (st *schedulerTester) expectFailure(pod api.Pod) {
}
func newPod(host string, hostPorts ...int) api.Pod {
networkPorts := []api.ContainerPort{}
networkPorts := []api.Port{}
for _, port := range hostPorts {
networkPorts = append(networkPorts, api.ContainerPort{HostPort: port})
networkPorts = append(networkPorts, api.Port{HostPort: port})
}
return api.Pod{
Status: api.PodStatus{

View File

@@ -87,11 +87,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
continue
}
// TODO: Add multiple-ports to Service and expose them here.
endpoints = append(endpoints, api.Endpoint{
IP: pod.Status.PodIP,
Ports: []api.EndpointPort{{Name: "", Protocol: service.Spec.Protocol, Port: port}},
})
endpoints = append(endpoints, api.Endpoint{IP: pod.Status.PodIP, Port: port})
}
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
@@ -100,6 +96,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
ObjectMeta: api.ObjectMeta{
Name: service.Name,
},
Protocol: service.Spec.Protocol,
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
@@ -115,7 +112,7 @@ func (e *EndpointController) SyncServiceEndpoints() error {
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
} else {
// Pre-existing
if endpointsEqual(currentEndpoints, endpoints) {
if currentEndpoints.Protocol == service.Spec.Protocol && endpointsEqual(currentEndpoints, endpoints) {
glog.V(5).Infof("protocol and endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
continue
}
@@ -129,27 +126,12 @@ func (e *EndpointController) SyncServiceEndpoints() error {
return resultErr
}
// TODO: It would be nice if we had a util function that reflectively compared
// two slices for order-insensitive equivalence.
func portsEqual(lhs, rhs []api.EndpointPort) bool {
if len(lhs) != len(rhs) {
return false
}
for i := range lhs {
if lhs[i] != rhs[i] {
return false
}
}
return true
}
func containsEndpoint(haystack *api.Endpoints, needle *api.Endpoint) bool {
if haystack == nil || needle == nil {
return false
}
for ix := range haystack.Endpoints {
haystackEP := &haystack.Endpoints[ix]
if haystackEP.IP == needle.IP && portsEqual(haystackEP.Ports, needle.Ports) {
if haystack.Endpoints[ix] == *needle {
return true
}
}

View File

@@ -39,7 +39,7 @@ func newPodList(count int) *api.PodList {
Spec: api.PodSpec{
Containers: []api.Container{
{
Ports: []api.ContainerPort{
Ports: []api.Port{
{
ContainerPort: 8080,
},
@@ -69,7 +69,7 @@ func TestFindPort(t *testing.T) {
Spec: api.PodSpec{
Containers: []api.Container{
{
Ports: []api.ContainerPort{
Ports: []api.Port{
{
Name: "foo",
ContainerPort: 8080,
@@ -90,7 +90,7 @@ func TestFindPort(t *testing.T) {
Spec: api.PodSpec{
Containers: []api.Container{
{
Ports: []api.ContainerPort{},
Ports: []api.Port{},
},
},
},
@@ -245,7 +245,8 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@@ -276,7 +277,8 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@@ -307,7 +309,8 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolUDP, Port: 1000}}}},
Protocol: api.ProtocolUDP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@@ -337,6 +340,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{},
}})
defer testServer.Close()
@@ -350,7 +354,8 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=other", "PUT", &data)
}
@@ -376,7 +381,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 1000}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "6.7.8.9", Port: 1000}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@@ -389,7 +395,8 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints/foo?namespace=bar", "PUT", &data)
}
@@ -414,7 +421,8 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "1",
},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
@@ -452,7 +460,8 @@ func TestSyncEndpointsItems(t *testing.T) {
ObjectMeta: api.ObjectMeta{
ResourceVersion: "",
},
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Ports: []api.EndpointPort{{Protocol: api.ProtocolTCP, Port: 8080}}}},
Protocol: api.ProtocolTCP,
Endpoints: []api.Endpoint{{IP: "1.2.3.4", Port: 8080}},
})
endpointsHandler.ValidateRequest(t, "/api/"+testapi.Version()+"/endpoints?namespace=other", "POST", &data)
}

View File

@@ -308,7 +308,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
@@ -321,7 +321,7 @@ func TestWatchEtcdState(t *testing.T) {
},
From: 1,
Expected: []*T{
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
},
},
"from initial state": {
@@ -343,7 +343,7 @@ func TestWatchEtcdState(t *testing.T) {
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}})),
Value: string(runtime.EncodeOrDie(codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}, Endpoints: []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
@@ -356,7 +356,7 @@ func TestWatchEtcdState(t *testing.T) {
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Ports: []api.EndpointPort{{Port: 9000}}}}},
{watch.Modified, []api.Endpoint{{IP: "127.0.0.1", Port: 9000}}},
},
},
}

View File

@@ -64,7 +64,7 @@ var _ = Describe("Events", func() {
{
Name: "p",
Image: "kubernetes/serve_hostname",
Ports: []api.ContainerPort{{ContainerPort: 80}},
Ports: []api.Port{{ContainerPort: 80}},
},
},
},

View File

@@ -109,7 +109,7 @@ var _ = Describe("Networking", func() {
Name: "webserver",
Image: "kubernetes/nettest:latest",
Command: []string{"-service=" + name},
Ports: []api.ContainerPort{{ContainerPort: 8080}},
Ports: []api.Port{{ContainerPort: 8080}},
},
},
},

View File

@@ -108,7 +108,7 @@ var _ = Describe("Pods", func() {
{
Name: "nginx",
Image: "dockerfile/nginx",
Ports: []api.ContainerPort{{ContainerPort: 80}},
Ports: []api.Port{{ContainerPort: 80}},
LivenessProbe: &api.Probe{
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
@@ -165,7 +165,7 @@ var _ = Describe("Pods", func() {
{
Name: "nginx",
Image: "dockerfile/nginx",
Ports: []api.ContainerPort{{ContainerPort: 80}},
Ports: []api.Port{{ContainerPort: 80}},
LivenessProbe: &api.Probe{
Handler: api.Handler{
HTTPGet: &api.HTTPGetAction{
@@ -236,7 +236,7 @@ var _ = Describe("Pods", func() {
{
Name: "srv",
Image: "kubernetes/serve_hostname",
Ports: []api.ContainerPort{{ContainerPort: 9376}},
Ports: []api.Port{{ContainerPort: 9376}},
},
},
},

View File

@@ -86,7 +86,7 @@ func ServeImageOrFail(c *client.Client, test string, image string) {
{
Name: name,
Image: image,
Ports: []api.ContainerPort{{ContainerPort: 9376, HostPort: 8080}},
Ports: []api.Port{{ContainerPort: 9376, HostPort: 8080}},
},
},
},

View File

@@ -250,11 +250,8 @@ var _ = Describe("Services", func() {
func validateIPsOrFail(c *client.Client, ns string, expectedPort int, expectedEndpoints []string, endpoints *api.Endpoints) {
ips := util.StringSet{}
for _, ep := range endpoints.Endpoints {
if len(ep.Ports) == 0 {
Fail(fmt.Sprintf("invalid endpoint, no ports"))
}
if ep.Ports[0].Port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Ports[0].Port))
if ep.Port != expectedPort {
Fail(fmt.Sprintf("invalid port, expected %d, got %d", expectedPort, ep.Port))
}
ips.Insert(ep.IP)
}
@@ -299,7 +296,7 @@ func addEndpointPodOrFail(c *client.Client, ns, name string, labels map[string]s
{
Name: "test",
Image: "kubernetes/pause",
Ports: []api.ContainerPort{{ContainerPort: 80}},
Ports: []api.Port{{ContainerPort: 80}},
},
},
},