19848: Retry service IP repair on conflict
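Components can write services during startup, which updates the IP allocator map while the repair controller is running and causes the repair's own write to fail with a conflict. Since the core controllers *must* succeed for the master to start, the repair should retry a few times on conflict instead of failing immediately.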
This commit is contained in:
		@@ -22,6 +22,8 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/errors"
 | 
				
			||||||
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/service"
 | 
						"k8s.io/kubernetes/pkg/registry/service"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/service/ipallocator"
 | 
						"k8s.io/kubernetes/pkg/registry/service/ipallocator"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util"
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
@@ -72,6 +74,11 @@ func (c *Repair) RunUntil(ch chan struct{}) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 | 
					// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 | 
				
			||||||
func (c *Repair) RunOnce() error {
 | 
					func (c *Repair) RunOnce() error {
 | 
				
			||||||
 | 
						return client.RetryOnConflict(client.DefaultBackoff, c.runOnce)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 | 
				
			||||||
 | 
					func (c *Repair) runOnce() error {
 | 
				
			||||||
	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 | 
						// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 | 
				
			||||||
	// or if they are executed against different leaders,
 | 
						// or if they are executed against different leaders,
 | 
				
			||||||
	// the ordering guarantee required to ensure no IP is allocated twice is violated.
 | 
						// the ordering guarantee required to ensure no IP is allocated twice is violated.
 | 
				
			||||||
@@ -127,12 +134,14 @@ func (c *Repair) RunOnce() error {
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	err = r.Snapshot(latest)
 | 
						if err := r.Snapshot(latest); err != nil {
 | 
				
			||||||
	if err != nil {
 | 
							return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
 | 
				
			||||||
		return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
 | 
					 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := c.alloc.CreateOrUpdate(latest); err != nil {
 | 
						if err := c.alloc.CreateOrUpdate(latest); err != nil {
 | 
				
			||||||
 | 
							if errors.IsConflict(err) {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
 | 
							return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -21,6 +21,8 @@ import (
 | 
				
			|||||||
	"time"
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/api"
 | 
						"k8s.io/kubernetes/pkg/api"
 | 
				
			||||||
 | 
						"k8s.io/kubernetes/pkg/api/errors"
 | 
				
			||||||
 | 
						client "k8s.io/kubernetes/pkg/client/unversioned"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/service"
 | 
						"k8s.io/kubernetes/pkg/registry/service"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/registry/service/portallocator"
 | 
						"k8s.io/kubernetes/pkg/registry/service/portallocator"
 | 
				
			||||||
	"k8s.io/kubernetes/pkg/util"
 | 
						"k8s.io/kubernetes/pkg/util"
 | 
				
			||||||
@@ -57,6 +59,11 @@ func (c *Repair) RunUntil(ch chan struct{}) {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 | 
					// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 | 
				
			||||||
func (c *Repair) RunOnce() error {
 | 
					func (c *Repair) RunOnce() error {
 | 
				
			||||||
 | 
						return client.RetryOnConflict(client.DefaultBackoff, c.runOnce)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					// runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 | 
				
			||||||
 | 
					func (c *Repair) runOnce() error {
 | 
				
			||||||
	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 | 
						// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 | 
				
			||||||
	// or if they are executed against different leaders,
 | 
						// or if they are executed against different leaders,
 | 
				
			||||||
	// the ordering guarantee required to ensure no port is allocated twice is violated.
 | 
						// the ordering guarantee required to ensure no port is allocated twice is violated.
 | 
				
			||||||
@@ -116,10 +123,13 @@ func (c *Repair) RunOnce() error {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
	err = r.Snapshot(latest)
 | 
						err = r.Snapshot(latest)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
		return fmt.Errorf("unable to persist the updated port allocations: %v", err)
 | 
							return fmt.Errorf("unable to snapshot the updated port allocations: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	if err := c.alloc.CreateOrUpdate(latest); err != nil {
 | 
						if err := c.alloc.CreateOrUpdate(latest); err != nil {
 | 
				
			||||||
 | 
							if errors.IsConflict(err) {
 | 
				
			||||||
 | 
								return err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
		return fmt.Errorf("unable to persist the updated port allocations: %v", err)
 | 
							return fmt.Errorf("unable to persist the updated port allocations: %v", err)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
	return nil
 | 
						return nil
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user