Adds support for PodCIDR allocation from the GCE cloud provider.

If CIDRAllocatorType is set to `CloudCIDRAllocator`, CIDR allocation is instead done by the external cloud provider, and the node controller is only responsible for reflecting the allocation into the node spec.

- Splits off the rangeAllocator from the cidr_allocator.go file.
- Adds cloudCIDRAllocator, which is used when the cloud provider allocates the CIDR ranges externally (GCE support only).
- Updates RBAC permissions for the node controller to include PATCH.
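For illustration only, here is a minimal Go sketch of the selection the commit message describes; the names and signatures below are hypothetical stand-ins, not the actual Kubernetes types: the controller picks a cloud-backed allocator when the allocator type is `CloudCIDRAllocator`, and the in-tree range allocator otherwise.

// Illustrative sketch only; these names are hypothetical, not the real Kubernetes API.
package main

import "fmt"

// CIDRAllocator is the minimal behaviour shared by both allocators in this sketch.
type CIDRAllocator interface {
	AllocateOrOccupyCIDR(nodeName string) error
}

// rangeAllocator stands in for in-tree allocation from the cluster CIDR.
type rangeAllocator struct{}

// cloudCIDRAllocator stands in for reflecting CIDRs already assigned by the cloud provider.
type cloudCIDRAllocator struct{}

func (rangeAllocator) AllocateOrOccupyCIDR(nodeName string) error {
	fmt.Printf("allocating a CIDR for %s from the cluster CIDR\n", nodeName)
	return nil
}

func (cloudCIDRAllocator) AllocateOrOccupyCIDR(nodeName string) error {
	fmt.Printf("patching %s with the CIDR the cloud provider already assigned\n", nodeName)
	return nil
}

// newAllocator mirrors the selection described above: cloud-backed when the
// allocator type is "CloudCIDRAllocator", range-based otherwise.
func newAllocator(allocatorType string) CIDRAllocator {
	if allocatorType == "CloudCIDRAllocator" {
		return cloudCIDRAllocator{}
	}
	return rangeAllocator{}
}

func main() {
	a := newAllocator("CloudCIDRAllocator")
	_ = a.AllocateOrOccupyCIDR("node-1")
}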
pkg/controller/node/range_allocator.go (new file, 262 lines)
@@ -0,0 +1,262 @@
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package node

import (
	"fmt"
	"net"
	"sync"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	clientv1 "k8s.io/client-go/pkg/api/v1"
	"k8s.io/client-go/tools/record"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"

	"github.com/golang/glog"
)

// TODO: figure out the good setting for those constants.
const (
	// controls how many NodeSpec updates NC can process concurrently.
	cidrUpdateWorkers   = 10
	cidrUpdateQueueSize = 5000
	// podCIDRUpdateRetry controls the number of retries of writing Node.Spec.PodCIDR update.
	podCIDRUpdateRetry = 5
)

type rangeAllocator struct {
	client      clientset.Interface
	cidrs       *cidrSet
	clusterCIDR *net.IPNet
	maxCIDRs    int
	// Channel that is used to pass updating Nodes with assigned CIDRs to the background.
	// This increases the throughput of CIDR assignment by not blocking on long operations.
	nodeCIDRUpdateChannel chan nodeAndCIDR
	recorder              record.EventRecorder
	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation.
	sync.Mutex
	nodesInProcessing sets.String
}

// NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDR for node.
// Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
// Caller must always pass in a list of existing nodes so the new allocator
// can initialize its CIDR map. NodeList is only nil in testing.
func NewCIDRRangeAllocator(client clientset.Interface, clusterCIDR *net.IPNet, serviceCIDR *net.IPNet, subNetMaskSize int, nodeList *v1.NodeList) (CIDRAllocator, error) {
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cidrAllocator"})
	eventBroadcaster.StartLogging(glog.Infof)
	if client != nil {
		glog.V(0).Infof("Sending events to api server.")
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.Core().RESTClient()).Events("")})
	} else {
		glog.Fatalf("kubeClient is nil when starting NodeController")
	}

	ra := &rangeAllocator{
		client:                client,
		cidrs:                 newCIDRSet(clusterCIDR, subNetMaskSize),
		clusterCIDR:           clusterCIDR,
		nodeCIDRUpdateChannel: make(chan nodeAndCIDR, cidrUpdateQueueSize),
		recorder:              recorder,
		nodesInProcessing:     sets.NewString(),
	}

	if serviceCIDR != nil {
		ra.filterOutServiceRange(serviceCIDR)
	} else {
		glog.V(0).Info("No Service CIDR provided. Skipping filtering out service addresses.")
	}

	if nodeList != nil {
		for _, node := range nodeList.Items {
			if node.Spec.PodCIDR == "" {
				glog.Infof("Node %v has no CIDR, ignoring", node.Name)
				continue
			} else {
				glog.Infof("Node %v has CIDR %s, occupying it in CIDR map",
					node.Name, node.Spec.PodCIDR)
			}
			if err := ra.occupyCIDR(&node); err != nil {
				// This will happen if:
				// 1. We find garbage in the podCIDR field. Retrying is useless.
				// 2. CIDR out of range: This means a node CIDR has changed.
				// This error will keep crashing controller-manager.
				return nil, err
			}
		}
	}
	for i := 0; i < cidrUpdateWorkers; i++ {
		go func(stopChan <-chan struct{}) {
			for {
				select {
				case workItem, ok := <-ra.nodeCIDRUpdateChannel:
					if !ok {
						glog.Warning("NodeCIDRUpdateChannel read returned false.")
						return
					}
					ra.updateCIDRAllocation(workItem)
				case <-stopChan:
					return
				}
			}
		}(wait.NeverStop)
	}

	return ra, nil
}

func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
	r.Lock()
	defer r.Unlock()
	if r.nodesInProcessing.Has(nodeName) {
		return false
	}
	r.nodesInProcessing.Insert(nodeName)
	return true
}

func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
	r.Lock()
	defer r.Unlock()
	r.nodesInProcessing.Delete(nodeName)
}

func (r *rangeAllocator) occupyCIDR(node *v1.Node) error {
	defer r.removeNodeFromProcessing(node.Name)
	if node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("failed to parse node %s, CIDR %s", node.Name, node.Spec.PodCIDR)
	}
	if err := r.cidrs.occupy(podCIDR); err != nil {
		return fmt.Errorf("failed to mark cidr as occupied: %v", err)
	}
	return nil
}

// WARNING: If you're adding any return calls or defer any more work from this
// function you have to handle correctly nodesInProcessing.
func (r *rangeAllocator) AllocateOrOccupyCIDR(node *v1.Node) error {
	if node == nil {
		return nil
	}
	if !r.insertNodeToProcessing(node.Name) {
		glog.V(2).Infof("Node %v is already in a process of CIDR assignment.", node.Name)
		return nil
	}
	if node.Spec.PodCIDR != "" {
		return r.occupyCIDR(node)
	}
	podCIDR, err := r.cidrs.allocateNext()
	if err != nil {
		r.removeNodeFromProcessing(node.Name)
		recordNodeStatusChange(r.recorder, node, "CIDRNotAvailable")
		return fmt.Errorf("failed to allocate cidr: %v", err)
	}

	glog.V(10).Infof("Putting node %s with CIDR %s into the work queue", node.Name, podCIDR)
	r.nodeCIDRUpdateChannel <- nodeAndCIDR{
		nodeName: node.Name,
		cidr:     podCIDR,
	}
	return nil
}

func (r *rangeAllocator) ReleaseCIDR(node *v1.Node) error {
	if node == nil || node.Spec.PodCIDR == "" {
		return nil
	}
	_, podCIDR, err := net.ParseCIDR(node.Spec.PodCIDR)
	if err != nil {
		return fmt.Errorf("Failed to parse CIDR %s on Node %v: %v", node.Spec.PodCIDR, node.Name, err)
	}

	glog.V(4).Infof("release CIDR %s", node.Spec.PodCIDR)
	if err = r.cidrs.release(podCIDR); err != nil {
		return fmt.Errorf("Error when releasing CIDR %v: %v", node.Spec.PodCIDR, err)
	}
	return err
}

// Marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used,
// so that they won't be assignable.
func (r *rangeAllocator) filterOutServiceRange(serviceCIDR *net.IPNet) {
	// Checks if service CIDR has a nonempty intersection with cluster
	// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
	// clusterCIDR's Mask applied (this means that clusterCIDR contains
	// serviceCIDR) or vice versa (which means that serviceCIDR contains
	// clusterCIDR).
	if !r.clusterCIDR.Contains(serviceCIDR.IP.Mask(r.clusterCIDR.Mask)) && !serviceCIDR.Contains(r.clusterCIDR.IP.Mask(serviceCIDR.Mask)) {
		return
	}

	if err := r.cidrs.occupy(serviceCIDR); err != nil {
		glog.Errorf("Error filtering out service cidr %v: %v", serviceCIDR, err)
	}
}

// Assigns CIDR to Node and sends an update to the API server.
func (r *rangeAllocator) updateCIDRAllocation(data nodeAndCIDR) error {
	var err error
	var node *v1.Node
	defer r.removeNodeFromProcessing(data.nodeName)
	for rep := 0; rep < podCIDRUpdateRetry; rep++ {
		// TODO: change it to using PATCH instead of full Node updates.
		node, err = r.client.Core().Nodes().Get(data.nodeName, metav1.GetOptions{})
		if err != nil {
			glog.Errorf("Failed while getting node %v to retry updating Node.Spec.PodCIDR: %v", data.nodeName, err)
			continue
		}
		if node.Spec.PodCIDR != "" {
			glog.Errorf("Node %v already has allocated CIDR %v. Releasing assigned one if different.", node.Name, node.Spec.PodCIDR)
			if node.Spec.PodCIDR != data.cidr.String() {
				if err := r.cidrs.release(data.cidr); err != nil {
					glog.Errorf("Error when releasing CIDR %v", data.cidr.String())
				}
			}
			return nil
		}
		node.Spec.PodCIDR = data.cidr.String()
		if _, err := r.client.Core().Nodes().Update(node); err != nil {
			glog.Errorf("Failed while updating Node.Spec.PodCIDR (%d retries left): %v", podCIDRUpdateRetry-rep-1, err)
		} else {
			break
		}
	}
	if err != nil {
		recordNodeStatusChange(r.recorder, node, "CIDRAssignmentFailed")
		// We accept the fact that we may leak CIDRs here. This is safer than releasing
		// them in case when we don't know if the request went through.
		// NodeController restart will return all falsely allocated CIDRs to the pool.
		if !apierrors.IsServerTimeout(err) {
			glog.Errorf("CIDR assignment for node %v failed: %v. Releasing allocated CIDR", data.nodeName, err)
			if releaseErr := r.cidrs.release(data.cidr); releaseErr != nil {
				glog.Errorf("Error releasing allocated CIDR for node %v: %v", data.nodeName, releaseErr)
			}
		}
	}
	return err
}