Merge pull request #46197 from xiangpengzhao/fix-allocate-clusterip

Automatic merge from submit-queue (batch tested with PRs 47850, 47835, 46197, 47250, 48284)

Allocate clusterIP when change service type from ExternalName to ClusterIP

**What this PR does / why we need it**:

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #35354 #46190

**Special notes for your reviewer**:
/cc @smarterclayton @thockin 

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue
2017-06-29 15:16:42 -07:00
committed by GitHub
8 changed files with 536 additions and 60 deletions

View File

@@ -33,6 +33,9 @@ type Interface interface {
AllocateNext() (net.IP, error)
Release(net.IP) error
ForEach(func(net.IP))
// For testing
Has(ip net.IP) bool
}
var (

View File

@@ -34,6 +34,9 @@ type Interface interface {
AllocateNext() (int, error)
Release(int) error
ForEach(func(int))
// For testing
Has(int) bool
}
var (

View File

@@ -28,7 +28,7 @@ package portallocator
// ...
// write(updatedOwner)
/// op.Commit()
type portAllocationOperation struct {
type PortAllocationOperation struct {
pa Interface
allocated []int
releaseDeferred []int
@@ -36,8 +36,8 @@ type portAllocationOperation struct {
}
// Creates a portAllocationOperation, tracking a set of allocations & releases
func StartOperation(pa Interface) *portAllocationOperation {
op := &portAllocationOperation{}
func StartOperation(pa Interface) *PortAllocationOperation {
op := &PortAllocationOperation{}
op.pa = pa
op.allocated = []int{}
op.releaseDeferred = []int{}
@@ -46,14 +46,14 @@ func StartOperation(pa Interface) *portAllocationOperation {
}
// Will rollback unless marked as shouldRollback = false by a Commit(). Call from a defer block
func (op *portAllocationOperation) Finish() {
func (op *PortAllocationOperation) Finish() {
if op.shouldRollback {
op.Rollback()
}
}
// (Try to) undo any operations we did
func (op *portAllocationOperation) Rollback() []error {
func (op *PortAllocationOperation) Rollback() []error {
errors := []error{}
for _, allocated := range op.allocated {
@@ -72,7 +72,7 @@ func (op *portAllocationOperation) Rollback() []error {
// (Try to) perform any deferred operations.
// Note that even if this fails, we don't rollback; we always want to err on the side of over-allocation,
// and Commit should be called _after_ the owner is written
func (op *portAllocationOperation) Commit() []error {
func (op *PortAllocationOperation) Commit() []error {
errors := []error{}
for _, release := range op.releaseDeferred {
@@ -94,7 +94,7 @@ func (op *portAllocationOperation) Commit() []error {
}
// Allocates a port, and record it for future rollback
func (op *portAllocationOperation) Allocate(port int) error {
func (op *PortAllocationOperation) Allocate(port int) error {
err := op.pa.Allocate(port)
if err == nil {
op.allocated = append(op.allocated, port)
@@ -103,7 +103,7 @@ func (op *portAllocationOperation) Allocate(port int) error {
}
// Allocates a port, and record it for future rollback
func (op *portAllocationOperation) AllocateNext() (int, error) {
func (op *PortAllocationOperation) AllocateNext() (int, error) {
port, err := op.pa.AllocateNext()
if err == nil {
op.allocated = append(op.allocated, port)
@@ -112,6 +112,6 @@ func (op *portAllocationOperation) AllocateNext() (int, error) {
}
// Marks a port so that it will be released if this operation Commits
func (op *portAllocationOperation) ReleaseDeferred(port int) {
func (op *PortAllocationOperation) ReleaseDeferred(port int) {
op.releaseDeferred = append(op.releaseDeferred, port)
}

View File

@@ -357,58 +357,45 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, errs)
}
// TODO: this should probably move to strategy.PrepareForCreate()
releaseServiceIP := false
defer func() {
if releaseServiceIP {
if helper.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
}
}
}()
nodePortOp := portallocator.StartOperation(rs.serviceNodePorts)
defer nodePortOp.Finish()
assignNodePorts := shouldAssignNodePorts(service)
oldNodePorts := CollectServiceNodePorts(oldService)
newNodePorts := []int{}
if assignNodePorts {
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
nodePort := int(servicePort.NodePort)
if nodePort != 0 {
if !contains(oldNodePorts, nodePort) {
err := nodePortOp.Allocate(nodePort)
if err != nil {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())}
return nil, false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
} else {
nodePort, err = nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return nil, false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
}
// Detect duplicate node ports; this should have been caught by validation, so we panic
if contains(newNodePorts, nodePort) {
panic("duplicate node port")
}
newNodePorts = append(newNodePorts, nodePort)
// Update service from ExternalName to non-ExternalName, should initialize ClusterIP.
if oldService.Spec.Type == api.ServiceTypeExternalName && service.Spec.Type != api.ServiceTypeExternalName {
if releaseServiceIP, err = rs.initClusterIP(service); err != nil {
return nil, false, err
}
} else {
// Validate should have validated that nodePort == 0
}
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
for _, oldNodePort := range oldNodePorts {
if contains(newNodePorts, oldNodePort) {
continue
// Update service from non-ExternalName to ExternalName, should release ClusterIP if exists.
if oldService.Spec.Type != api.ServiceTypeExternalName && service.Spec.Type == api.ServiceTypeExternalName {
if helper.IsServiceIPSet(oldService) {
rs.serviceIPs.Release(net.ParseIP(oldService.Spec.ClusterIP))
}
nodePortOp.ReleaseDeferred(oldNodePort)
}
// Remove any LoadBalancerStatus now if Type != LoadBalancer;
// although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
// Update service from NodePort or LoadBalancer to ExternalName or ClusterIP, should release NodePort if exists.
if (oldService.Spec.Type == api.ServiceTypeNodePort || oldService.Spec.Type == api.ServiceTypeLoadBalancer) &&
(service.Spec.Type == api.ServiceTypeExternalName || service.Spec.Type == api.ServiceTypeClusterIP) {
rs.releaseNodePort(oldService, nodePortOp)
}
// Update service from any type to NodePort or LoadBalancer, should update NodePort.
if service.Spec.Type == api.ServiceTypeNodePort || service.Spec.Type == api.ServiceTypeLoadBalancer {
if err := rs.updateNodePort(oldService, service, nodePortOp); err != nil {
return nil, false, err
}
}
// Update service from LoadBalancer to non-LoadBalancer, should remove any LoadBalancerStatus.
if service.Spec.Type != api.ServiceTypeLoadBalancer {
// Although loadbalancer delete is actually asynchronous, we don't need to expose the user to that complexity.
service.Status.LoadBalancer = api.LoadBalancerStatus{}
}
@@ -425,13 +412,14 @@ func (rs *REST) Update(ctx genericapirequest.Context, name string, objInfo rest.
}
out, err := rs.registry.UpdateService(ctx, service)
if err == nil {
el := nodePortOp.Commit()
if el != nil {
// problems should be fixed by an eventual reconciliation / restart
glog.Errorf("error(s) committing NodePorts changes: %v", el)
}
releaseServiceIP = false
}
return out, false, err
@@ -570,3 +558,82 @@ func (rs *REST) allocateHealthCheckNodePort(service *api.Service) error {
}
return nil
}
// The return bool value indicates if a cluster IP is allocated successfully.
func (rs *REST) initClusterIP(service *api.Service) (bool, error) {
switch {
case service.Spec.ClusterIP == "":
// Allocate next available.
ip, err := rs.serviceIPs.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return false, errors.NewInternalError(fmt.Errorf("failed to allocate a serviceIP: %v", err))
}
service.Spec.ClusterIP = ip.String()
return true, nil
case service.Spec.ClusterIP != api.ClusterIPNone && service.Spec.ClusterIP != "":
// Try to respect the requested IP.
if err := rs.serviceIPs.Allocate(net.ParseIP(service.Spec.ClusterIP)); err != nil {
// TODO: when validation becomes versioned, this gets more complicated.
el := field.ErrorList{field.Invalid(field.NewPath("spec", "clusterIP"), service.Spec.ClusterIP, err.Error())}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
return true, nil
}
return false, nil
}
func (rs *REST) updateNodePort(oldService, newService *api.Service, nodePortOp *portallocator.PortAllocationOperation) error {
oldNodePorts := CollectServiceNodePorts(oldService)
newNodePorts := []int{}
for i := range newService.Spec.Ports {
servicePort := &newService.Spec.Ports[i]
nodePort := int(servicePort.NodePort)
if nodePort != 0 {
if !contains(oldNodePorts, nodePort) {
err := nodePortOp.Allocate(nodePort)
if err != nil {
el := field.ErrorList{field.Invalid(field.NewPath("spec", "ports").Index(i).Child("nodePort"), nodePort, err.Error())}
return errors.NewInvalid(api.Kind("Service"), newService.Name, el)
}
}
} else {
nodePort, err := nodePortOp.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
servicePort.NodePort = int32(nodePort)
}
// Detect duplicate node ports; this should have been caught by validation, so we panic
if contains(newNodePorts, nodePort) {
panic("duplicate node port")
}
newNodePorts = append(newNodePorts, nodePort)
}
// The comparison loops are O(N^2), but we don't expect N to be huge
// (there's a hard-limit at 2^16, because they're ports; and even 4 ports would be a lot)
for _, oldNodePort := range oldNodePorts {
if contains(newNodePorts, oldNodePort) {
continue
}
nodePortOp.ReleaseDeferred(oldNodePort)
}
return nil
}
func (rs *REST) releaseNodePort(service *api.Service, nodePortOp *portallocator.PortAllocationOperation) {
nodePorts := CollectServiceNodePorts(service)
for _, nodePort := range nodePorts {
nodePortOp.ReleaseDeferred(nodePort)
}
}

View File

@@ -1276,3 +1276,269 @@ func TestServiceRegistryExternalTrafficAnnotationClusterIP(t *testing.T) {
t.Errorf("Unexpected allocation of health check node port annotation %s", api.BetaAnnotationHealthCheckNodePort)
}
}
func TestInitClusterIP(t *testing.T) {
storage, _ := NewTestREST(t, nil)
testCases := []struct {
name string
svc *api.Service
expectClusterIP bool
}{
{
name: "Allocate new ClusterIP",
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectClusterIP: true,
},
{
name: "Allocate specified ClusterIP",
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
ClusterIP: "1.2.3.4",
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectClusterIP: true,
},
{
name: "Shouldn't allocate ClusterIP",
svc: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
ClusterIP: api.ClusterIPNone,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectClusterIP: false,
},
}
for _, test := range testCases {
hasAllocatedIP, err := storage.initClusterIP(test.svc)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
}
if hasAllocatedIP != test.expectClusterIP {
t.Errorf("%q: expected %v, but got %v", test.name, test.expectClusterIP, hasAllocatedIP)
}
if test.expectClusterIP {
if !storage.serviceIPs.Has(net.ParseIP(test.svc.Spec.ClusterIP)) {
t.Errorf("%q: unexpected ClusterIP %q, out of range", test.name, test.svc.Spec.ClusterIP)
}
}
if test.name == "Allocate specified ClusterIP" && test.svc.Spec.ClusterIP != "1.2.3.4" {
t.Errorf("%q: expected ClusterIP %q, but got %q", test.name, "1.2.3.4", test.svc.Spec.ClusterIP)
}
}
}
func TestUpdateNodePort(t *testing.T) {
storage, _ := NewTestREST(t, nil)
nodePortOp := portallocator.StartOperation(storage.serviceNodePorts)
defer nodePortOp.Finish()
testCases := []struct {
name string
oldService *api.Service
newService *api.Service
expectSpecifiedNodePorts []int
}{
{
name: "Old service and new service have the same NodePort",
oldService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
NodePort: 30053,
}},
},
},
newService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
NodePort: 30053,
}},
},
},
expectSpecifiedNodePorts: []int{30053},
},
{
name: "Old service has more NodePorts than new service has",
oldService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 53,
TargetPort: intstr.FromInt(6502),
Protocol: api.ProtocolTCP,
NodePort: 30053,
},
{
Name: "port-udp",
Port: 53,
TargetPort: intstr.FromInt(6502),
Protocol: api.ProtocolUDP,
NodePort: 30053,
},
},
},
},
newService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 53,
TargetPort: intstr.FromInt(6502),
Protocol: api.ProtocolTCP,
NodePort: 30053,
},
},
},
},
expectSpecifiedNodePorts: []int{30053},
},
{
name: "Change protocol of ServicePort without changing NodePort",
oldService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-tcp",
Port: 53,
TargetPort: intstr.FromInt(6502),
Protocol: api.ProtocolTCP,
NodePort: 30053,
},
},
},
},
newService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{
{
Name: "port-udp",
Port: 53,
TargetPort: intstr.FromInt(6502),
Protocol: api.ProtocolUDP,
NodePort: 30053,
},
},
},
},
expectSpecifiedNodePorts: []int{30053},
},
{
name: "Should allocate NodePort when changing service type to NodePort",
oldService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeClusterIP,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
newService: &api.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{"bar": "baz"},
SessionAffinity: api.ServiceAffinityNone,
Type: api.ServiceTypeNodePort,
Ports: []api.ServicePort{{
Port: 6502,
Protocol: api.ProtocolTCP,
TargetPort: intstr.FromInt(6502),
}},
},
},
expectSpecifiedNodePorts: []int{},
},
}
for _, test := range testCases {
err := storage.updateNodePort(test.oldService, test.newService, nodePortOp)
if err != nil {
t.Errorf("%q: unexpected error: %v", test.name, err)
continue
}
_ = nodePortOp.Commit()
serviceNodePorts := CollectServiceNodePorts(test.newService)
if len(test.expectSpecifiedNodePorts) == 0 {
for _, nodePort := range serviceNodePorts {
if !storage.serviceNodePorts.Has(nodePort) {
t.Errorf("%q: unexpected NodePort %d, out of range", test.name, nodePort)
}
}
} else if !reflect.DeepEqual(serviceNodePorts, test.expectSpecifiedNodePorts) {
t.Errorf("%q: expected NodePorts %v, but got %v", test.name, test.expectSpecifiedNodePorts, serviceNodePorts)
}
}
}