Fix race in service IP allocation repair loop

Tim Hockin 2016-12-02 10:54:12 -08:00
parent 2b5a10f8ff
commit 0777ecd030
5 changed files with 164 additions and 25 deletions
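The change below closes a race in the Service cluster-IP repair loop: the loop rebuilds the allocation bitmap from the list of Services and writes it back wholesale, so an IP that had been committed to the allocator but whose Service was not yet visible in the list could be wiped out and handed to a second Service. The fix rebuilds the state against a snapshot of what is stored and holds any apparently-leaked IP for numRepairsBeforeLeakCleanup repair passes before actually releasing it. Below is a minimal sketch of that holdoff pattern, distilled from the repair.go changes further down; the names (repairer, observeLeak) are simplified stand-ins, not the verbatim code:

    package main

    import "fmt"

    // How many repair passes must see an IP as leaked before it is reclaimed
    // (mirrors the numRepairsBeforeLeakCleanup constant added below).
    const numRepairsBeforeLeakCleanup = 3

    type repairer struct {
        leaks map[string]int // countdown per suspected-leaked IP
    }

    // observeLeak is called once per repair pass for an IP that is marked
    // allocated in storage but referenced by no Service. It returns true while
    // the IP should still be treated as in-use.
    func (r *repairer) observeLeak(ip string) bool {
        count, found := r.leaks[ip]
        if !found {
            count = numRepairsBeforeLeakCleanup - 1 // first sighting: start the countdown
        }
        if count > 0 {
            r.leaks[ip] = count - 1
            return true // keep it allocated; a racing Service create may still claim it
        }
        delete(r.leaks, ip)
        return false // leaked long enough; release it for reuse
    }

    func main() {
        r := &repairer{leaks: map[string]int{}}
        for pass := 1; pass <= 3; pass++ {
            fmt.Printf("pass %d: hold=%v\n", pass, r.observeLeak("10.0.0.7"))
        }
        // pass 1: hold=true, pass 2: hold=true, pass 3: hold=false
    }

Holding a suspect IP for the extra passes gives an in-flight Service create time to show up in the Services list before the IP is returned to the pool.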

View File

@@ -65,6 +65,8 @@ kube::log::status "Starting kube-apiserver"
     --cert-dir="${TMP_DIR}/certs" \
     --runtime-config="api/all=true" \
     --token-auth-file=$TMP_DIR/tokenauth.csv \
+    --logtostderr \
+    --v=2 \
     --service-cluster-ip-range="10.0.0.0/24" >/tmp/openapi-api-server.log 2>&1 &
 APISERVER_PID=$!

View File

@@ -90,6 +90,19 @@ func NewCIDRRange(cidr *net.IPNet) *Range {
     })
 }
 
+// NewFromSnapshot allocates a Range and initializes it from a snapshot.
+func NewFromSnapshot(snap *api.RangeAllocation) (*Range, error) {
+    _, ipnet, err := net.ParseCIDR(snap.Range)
+    if err != nil {
+        return nil, err
+    }
+    r := NewCIDRRange(ipnet)
+    if err := r.Restore(ipnet, snap.Data); err != nil {
+        return nil, err
+    }
+    return r, nil
+}
+
 func maximum(a, b int) int {
     if a > b {
         return a
@@ -102,6 +115,16 @@ func (r *Range) Free() int {
     return r.alloc.Free()
 }
 
+// Used returns the count of IP addresses used in the range.
+func (r *Range) Used() int {
+    return r.max - r.alloc.Free()
+}
+
+// CIDR returns the CIDR covered by the range.
+func (r *Range) CIDR() net.IPNet {
+    return *r.net
+}
+
 // Allocate attempts to reserve the provided IP. ErrNotInRange or
 // ErrAllocated will be returned if the IP is not valid for this range
 // or has already been reserved. ErrFull will be returned if there
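The new NewFromSnapshot constructor is what lets the repair loop reconstruct a Range from the stored api.RangeAllocation record instead of starting empty. A rough usage sketch of the round trip, based only on the calls visible in this commit (the import paths are assumed from the package layout of this era and do not appear in the diff):

    package main

    import (
        "fmt"
        "net"

        "k8s.io/kubernetes/pkg/api"
        "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
    )

    func main() {
        _, cidr, _ := net.ParseCIDR("10.0.0.0/24")
        r := ipallocator.NewCIDRRange(cidr)
        if err := r.Allocate(net.ParseIP("10.0.0.10")); err != nil {
            panic(err)
        }

        // Snapshot writes the CIDR string and the allocation bitmap into the record.
        var snap api.RangeAllocation
        if err := r.Snapshot(&snap); err != nil {
            panic(err)
        }

        // NewFromSnapshot rebuilds an equivalent Range from that record.
        restored, err := ipallocator.NewFromSnapshot(&snap)
        if err != nil {
            panic(err)
        }
        fmt.Println(restored.Used(), restored.Free(), restored.Has(net.ParseIP("10.0.0.10"))) // 1 253 true
    }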

View File

@@ -34,6 +34,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 254 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 0 {
+        t.Errorf("unexpected used %d", f)
+    }
     found := sets.NewString()
     count := 0
     for r.Free() > 0 {
@@ -61,6 +64,9 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 253 {
+        t.Errorf("unexpected used %d", f)
+    }
     ip, err := r.AllocateNext()
     if err != nil {
         t.Fatal(err)
@@ -87,12 +93,18 @@ func TestAllocate(t *testing.T) {
     if f := r.Free(); f != 1 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 253 {
+        t.Errorf("unexpected used %d", f)
+    }
     if err := r.Allocate(released); err != nil {
         t.Fatal(err)
     }
     if f := r.Free(); f != 0 {
         t.Errorf("unexpected free %d", f)
     }
+    if f := r.Used(); f != 254 {
+        t.Errorf("unexpected used %d", f)
+    }
 }
 
 func TestAllocateTiny(t *testing.T) {
@@ -256,3 +268,42 @@ func TestSnapshot(t *testing.T) {
         t.Errorf("counts do not match: %d", other.Free())
     }
 }
+
+func TestNewFromSnapshot(t *testing.T) {
+    _, cidr, err := net.ParseCIDR("192.168.0.0/24")
+    if err != nil {
+        t.Fatal(err)
+    }
+    r := NewCIDRRange(cidr)
+
+    allocated := []net.IP{}
+    for i := 0; i < 128; i++ {
+        ip, err := r.AllocateNext()
+        if err != nil {
+            t.Fatal(err)
+        }
+        allocated = append(allocated, ip)
+    }
+
+    snapshot := api.RangeAllocation{}
+    if err = r.Snapshot(&snapshot); err != nil {
+        t.Fatal(err)
+    }
+
+    r, err = NewFromSnapshot(&snapshot)
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if x := r.Free(); x != 126 {
+        t.Fatalf("expected 126 free IPs, got %d", x)
+    }
+    if x := r.Used(); x != 128 {
+        t.Fatalf("expected 128 used IPs, got %d", x)
+    }
+    for _, ip := range allocated {
+        if !r.Has(ip) {
+            t.Fatalf("expected IP to be allocated, but it was not")
+        }
+    }
+}

View File

@@ -51,8 +51,13 @@ type Repair struct {
     serviceClient coreclient.ServicesGetter
     network       *net.IPNet
     alloc         rangeallocation.RangeRegistry
+    leaks         map[string]int // counter per leaked IP
 }
 
+// How many times we need to detect a leak before we clean up. This is to
+// avoid races between allocating an IP and using it.
+const numRepairsBeforeLeakCleanup = 3
+
 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
 // and generates informational warnings for a cluster that is not in sync.
 func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry) *Repair {
@@ -61,6 +66,7 @@ func NewRepair(interval time.Duration, serviceClient coreclient.ServicesGetter,
         serviceClient: serviceClient,
         network:       network,
         alloc:         alloc,
+        leaks:         map[string]int{},
     }
 }
@@ -89,18 +95,27 @@ func (c *Repair) runOnce() error {
     // If etcd server is not running we should wait for some time and fail only then. This is particularly
     // important when we start apiserver and etcd at the same time.
-    var latest *api.RangeAllocation
-    var err error
-    err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
-        latest, err = c.alloc.Get()
+    var snapshot *api.RangeAllocation
+    err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+        var err error
+        snapshot, err = c.alloc.Get()
         return err == nil, err
     })
     if err != nil {
         return fmt.Errorf("unable to refresh the service IP block: %v", err)
     }
 
+    // If not yet initialized.
+    if snapshot.Range == "" {
+        snapshot.Range = c.network.String()
+    }
+
+    // Create an allocator because it is easy to use.
+    stored, err := ipallocator.NewFromSnapshot(snapshot)
+    if err != nil {
+        return fmt.Errorf("unable to rebuild allocator from snapshot: %v", err)
+    }
+
     // We explicitly send no resource version, since the resource version
-    // of 'latest' is from a different collection, it's not comparable to
+    // of 'snapshot' is from a different collection, it's not comparable to
     // the service collection. The caching layer keeps per-collection RVs,
     // and this is proper, since in theory the collections could be hosted
     // in separate etcd (or even non-etcd) instances.
@@ -109,40 +124,73 @@ func (c *Repair) runOnce() error {
         return fmt.Errorf("unable to refresh the service IP block: %v", err)
     }
 
-    r := ipallocator.NewCIDRRange(c.network)
+    rebuilt := ipallocator.NewCIDRRange(c.network)
+    // Check every Service's ClusterIP, and rebuild the state as we think it should be.
     for _, svc := range list.Items {
         if !api.IsServiceIPSet(&svc) {
+            // didn't need a cluster IP
             continue
         }
         ip := net.ParseIP(svc.Spec.ClusterIP)
         if ip == nil {
-            // cluster IP is broken, reallocate
+            // cluster IP is corrupt
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
             continue
         }
-        switch err := r.Allocate(ip); err {
+        // mark it as in-use
+        switch err := rebuilt.Allocate(ip); err {
         case nil:
+            if stored.Has(ip) {
+                // remove it from the old set, so we can find leaks
+                stored.Release(ip)
+            } else {
+                // cluster IP doesn't seem to be allocated
+                runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not allocated; repairing", svc.Spec.ClusterIP, svc.Name, svc.Namespace))
+            }
+            delete(c.leaks, ip.String()) // it is used, so it can't be leaked
         case ipallocator.ErrAllocated:
             // TODO: send event
-            // cluster IP is broken, reallocate
+            // cluster IP is duplicate
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s was assigned to multiple services; please recreate", ip, svc.Name, svc.Namespace))
         case ipallocator.ErrNotInRange:
             // TODO: send event
-            // cluster IP is broken, reallocate
+            // cluster IP is out of range
             runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not within the service CIDR %s; please recreate", ip, svc.Name, svc.Namespace, c.network))
         case ipallocator.ErrFull:
             // TODO: send event
-            return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", r)
+            // somehow we are out of IPs
+            return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services", rebuilt)
         default:
             return fmt.Errorf("unable to allocate cluster IP %s for service %s/%s due to an unknown error, exiting: %v", ip, svc.Name, svc.Namespace, err)
         }
     }
 
-    if err := r.Snapshot(latest); err != nil {
+    // Check for IPs that are left in the old set. They appear to have been leaked.
+    stored.ForEach(func(ip net.IP) {
+        count, found := c.leaks[ip.String()]
+        switch {
+        case !found:
+            // flag it to be cleaned up after any races (hopefully) are gone
+            runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
+            count = numRepairsBeforeLeakCleanup - 1
+            fallthrough
+        case count > 0:
+            // pretend it is still in use until count expires
+            c.leaks[ip.String()] = count - 1
+            if err := rebuilt.Allocate(ip); err != nil {
+                runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
+            }
+        default:
+            // do not add it to the rebuilt set, which means it will be available for reuse
+            runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
+        }
+    })
+
+    // Blast the rebuilt state into storage.
+    if err := rebuilt.Snapshot(snapshot); err != nil {
         return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
     }
 
-    if err := c.alloc.CreateOrUpdate(latest); err != nil {
+    if err := c.alloc.CreateOrUpdate(snapshot); err != nil {
         if errors.IsConflict(err) {
             return err
         }

View File

@@ -50,10 +50,10 @@ func (r *mockRangeRegistry) CreateOrUpdate(alloc *api.RangeAllocation) error {
 func TestRepair(t *testing.T) {
     fakeClient := fake.NewSimpleClientset()
-    _, cidr, _ := net.ParseCIDR("192.168.1.0/24")
     ipregistry := &mockRangeRegistry{
-        item: &api.RangeAllocation{},
+        item: &api.RangeAllocation{Range: "192.168.1.0/24"},
     }
+    _, cidr, _ := net.ParseCIDR(ipregistry.item.Range)
     r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
 
     if err := r.RunOnce(); err != nil {
@@ -64,7 +64,7 @@ func TestRepair(t *testing.T) {
     }
 
     ipregistry = &mockRangeRegistry{
-        item:      &api.RangeAllocation{},
+        item:      &api.RangeAllocation{Range: "192.168.1.0/24"},
         updateErr: fmt.Errorf("test error"),
     }
     r = NewRepair(0, fakeClient.Core(), cidr, ipregistry)
@@ -73,7 +73,7 @@ func TestRepair(t *testing.T) {
     }
 }
 
-func TestRepairEmpty(t *testing.T) {
+func TestRepairLeak(t *testing.T) {
     _, cidr, _ := net.ParseCIDR("192.168.1.0/24")
     previous := ipallocator.NewCIDRRange(cidr)
     previous.Allocate(net.ParseIP("192.168.1.10"))
@@ -94,16 +94,31 @@ func TestRepairEmpty(t *testing.T) {
             Data:  dst.Data,
         },
     }
 
     r := NewRepair(0, fakeClient.Core(), cidr, ipregistry)
+    // Run through the "leak detection holdoff" loops.
+    for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
+        if err := r.RunOnce(); err != nil {
+            t.Fatal(err)
+        }
+        after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+        if err != nil {
+            t.Fatal(err)
+        }
+        if !after.Has(net.ParseIP("192.168.1.10")) {
+            t.Errorf("expected ipallocator to still have leaked IP")
+        }
+    }
+    // Run one more time to actually remove the leak.
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }
-    after := ipallocator.NewCIDRRange(cidr)
-    if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
+    after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+    if err != nil {
         t.Fatal(err)
     }
     if after.Has(net.ParseIP("192.168.1.10")) {
-        t.Errorf("unexpected ipallocator state: %#v", after)
+        t.Errorf("expected ipallocator to not have leaked IP")
     }
 }
@@ -157,14 +172,14 @@ func TestRepairWithExisting(t *testing.T) {
     if err := r.RunOnce(); err != nil {
         t.Fatal(err)
     }
-    after := ipallocator.NewCIDRRange(cidr)
-    if err := after.Restore(cidr, ipregistry.updated.Data); err != nil {
+    after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
+    if err != nil {
         t.Fatal(err)
     }
     if !after.Has(net.ParseIP("192.168.1.1")) || !after.Has(net.ParseIP("192.168.1.100")) {
         t.Errorf("unexpected ipallocator state: %#v", after)
     }
-    if after.Free() != 252 {
-        t.Errorf("unexpected ipallocator state: %#v", after)
+    if free := after.Free(); free != 252 {
+        t.Errorf("unexpected ipallocator state: %d free", free)
     }
 }