Make nodecontroller configure nodes' pod IP ranges

Author: Tomek Kulczynski
Date: 2015-02-24 13:32:44 +01:00
Committed by: CJ Cullen
Parent: db6586bdab
Commit: 290c7b94ef
13 changed files with 286 additions and 126 deletions


@@ -973,3 +973,11 @@ func (aws *AWSCloud) DeleteVolume(volumeName string) error {
}
return awsDisk.delete()
}
func (v *AWSCloud) Configure(name string, spec *api.NodeSpec) error {
return nil
}
func (v *AWSCloud) Release(name string) error {
return nil
}


@@ -80,6 +80,10 @@ type Instances interface {
List(filter string) ([]string, error)
// GetNodeResources gets the resources for a particular node
GetNodeResources(name string) (*api.NodeResources, error)
// Configure the specified instance using the spec
Configure(name string, spec *api.NodeSpec) error
// Delete all the configuration related to the instance, including other cloud resources
Release(name string) error
}
// Zone represents the location of a particular machine.
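Every provider now has to satisfy these two extra methods; the AWS stub above and the OpenStack, oVirt, Rackspace, and Vagrant providers below all do so with no-ops. A minimal standalone sketch of the contract, with the types trimmed to the one field this commit uses (all names here are stand-ins, not the repo's types):

package main

import "fmt"

// Trimmed stand-ins for api.NodeSpec and the extended Instances interface.
type NodeSpec struct{ PodCIDR string }

type Instances interface {
	Configure(name string, spec *NodeSpec) error
	Release(name string) error
}

// noopProvider mirrors the no-op implementations most providers add here.
type noopProvider struct{}

func (noopProvider) Configure(name string, spec *NodeSpec) error { return nil }
func (noopProvider) Release(name string) error                   { return nil }

var _ Instances = noopProvider{} // compile-time interface check

func main() {
	fmt.Println(noopProvider{}.Configure("node-a", &NodeSpec{PodCIDR: "10.244.0.0/24"}))
}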


@@ -159,3 +159,13 @@ func (f *FakeCloud) GetNodeResources(name string) (*api.NodeResources, error) {
f.addCall("get-node-resources")
return f.NodeResources, f.Err
}
func (f *FakeCloud) Configure(name string, spec *api.NodeSpec) error {
f.addCall("configure")
return f.Err
}
func (f *FakeCloud) Release(name string) error {
f.addCall("release")
return f.Err
}
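A sketch of how a test might lean on this fake: the addCall bookkeeping suggests recorded call names land in a slice on FakeCloud (the Calls field and the fake_cloud package name are assumptions here, not confirmed by this diff):

f := &fake_cloud.FakeCloud{}
_ = f.Configure("node-a", &api.NodeSpec{PodCIDR: "10.244.0.0/24"})
_ = f.Release("node-a")
// Assuming addCall appends to f.Calls, it now holds {"configure", "release"},
// which a test can assert against.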


@@ -17,6 +17,7 @@ limitations under the License.
package gce_cloud
import (
"errors"
"fmt"
"io"
"io/ioutil"
@@ -42,6 +43,9 @@ import (
"google.golang.org/cloud/compute/metadata"
)
var ErrMetadataConflict = errors.New("Metadata already set at the same key")
const podCIDRMetadataKey string = "node-ip-range"
// GCECloud is an implementation of Interface, TCPLoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
@@ -49,6 +53,7 @@ type GCECloud struct {
projectID string
zone string
instanceID string
networkName string
// Used for accessing the metadata server
metadataAccess func(string) (string, error)
@@ -113,6 +118,18 @@ func getInstanceID() (string, error) {
return parts[0], nil
}
func getNetworkName() (string, error) {
result, err := metadata.Get("instance/network-interfaces/0/network")
if err != nil {
return "", err
}
parts := strings.Split(result, "/")
if len(parts) != 4 {
return "", fmt.Errorf("unexpected response: %s", result)
}
return parts[3], nil
}
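For context, the metadata server returns the network as a four-segment path of the form projects/<project-number>/networks/<name>, which is why the code above requires exactly four parts and keeps the last one. A standalone illustration (the value is made up):

package main

import (
	"fmt"
	"strings"
)

func main() {
	result := "projects/123456789/networks/default" // illustrative metadata value
	parts := strings.Split(result, "/")
	fmt.Println(len(parts), parts[3]) // 4 default
}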
// newGCECloud creates a new instance of GCECloud.
func newGCECloud(config io.Reader) (*GCECloud, error) {
projectID, zone, err := getProjectAndZone()
@@ -126,6 +143,10 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
if err != nil {
return nil, err
}
networkName, err := getNetworkName()
if err != nil {
return nil, err
}
tokenSource := google.ComputeTokenSource("")
if config != nil {
var cfg Config
@@ -152,6 +173,7 @@ func newGCECloud(config io.Reader) (*GCECloud, error) {
projectID: projectID,
zone: zone,
instanceID: instanceID,
networkName: networkName,
metadataAccess: getMetadata,
}, nil
}
@@ -217,12 +239,12 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, error)) error {
pollOp := op
for pollOp.Status != "DONE" {
var err error
time.Sleep(time.Second)
pollOp, err = getOperation()
if err != nil {
return err
}
@@ -234,6 +256,25 @@ func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error
}
}
return nil
}
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
return waitForOp(op, func() (*compute.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, op.Name).Do()
})
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
return waitForOp(op, func() (*compute.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, op.Name).Do()
})
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation) error {
return waitForOp(op, func() (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, gce.zone, op.Name).Do()
})
}
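The refactor above folds three nearly identical polling loops into one by letting the caller supply a closure that re-fetches the operation's status. A minimal standalone version of the same pattern, with a stub type standing in for *compute.Operation:

package main

import (
	"fmt"
	"time"
)

type operation struct{ Status string }

// waitFor polls via the supplied closure until the operation reports DONE.
func waitFor(get func() (*operation, error)) error {
	for {
		op, err := get()
		if err != nil {
			return err
		}
		if op.Status == "DONE" {
			return nil
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	tries := 0
	err := waitFor(func() (*operation, error) {
		tries++
		if tries < 3 {
			return &operation{Status: "RUNNING"}, nil
		}
		return &operation{Status: "DONE"}, nil
	})
	fmt.Println(err, tries) // <nil> 3
}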
// GetTCPLoadBalancer is an implementation of TCPLoadBalancer.GetTCPLoadBalancer
@@ -506,6 +547,66 @@ func (gce *GCECloud) GetNodeResources(name string) (*api.NodeResources, error) {
}
}
func getMetadataValue(metadata *compute.Metadata, key string) (string, bool) {
for _, item := range metadata.Items {
if item.Key == key {
return item.Value, true
}
}
return "", false
}
func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
instanceName := canonicalizeInstanceName(name)
instance, err := gce.service.Instances.Get(gce.projectID, gce.zone, instanceName).Do()
if err != nil {
return err
}
if currentValue, ok := getMetadataValue(instance.Metadata, podCIDRMetadataKey); ok {
if currentValue == spec.PodCIDR {
// IP range already set to proper value.
return nil
}
return ErrMetadataConflict
}
instance.Metadata.Items = append(instance.Metadata.Items,
&compute.MetadataItems{
Key: podCIDRMetadataKey,
Value: spec.PodCIDR,
})
setMetadataCall := gce.service.Instances.SetMetadata(gce.projectID, gce.zone, instanceName, instance.Metadata)
setMetadataOp, err := setMetadataCall.Do()
if err != nil {
return err
}
err = gce.waitForZoneOp(setMetadataOp)
if err != nil {
return err
}
insertCall := gce.service.Routes.Insert(gce.projectID, &compute.Route{
Name: instanceName,
DestRange: spec.PodCIDR,
NextHopInstance: fmt.Sprintf("zones/%s/instances/%s", gce.zone, instanceName),
Network: fmt.Sprintf("global/networks/%s", gce.networkName),
Priority: 1000,
})
insertOp, err := insertCall.Do()
if err != nil {
return err
}
return gce.waitForGlobalOp(insertOp)
}
func (gce *GCECloud) Release(name string) error {
instanceName := canonicalizeInstanceName(name)
deleteCall := gce.service.Routes.Delete(gce.projectID, instanceName)
deleteOp, err := deleteCall.Do()
if err != nil {
return err
}
return gce.waitForGlobalOp(deleteOp)
}
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
region, err := getGceRegion(gce.zone)
if err != nil {
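A side note on the design visible in Configure and Release above: each route is created with Name set to the instance name, which is what lets Release delete the route given only the node name, with no extra bookkeeping in the controller.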


@@ -21,6 +21,7 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -134,6 +135,61 @@ func NewNodeController(
}
}
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(num int) util.StringSet {
res := util.NewStringSet()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
res.Insert(fmt.Sprintf("10.244.%v.0/24", i))
}
return res
}
// For each node in newNodes, finds its current spec in registeredNodes.
// If it is not there yet, the node gets a new valid CIDR assigned.
func reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList {
registeredCIDRs := make(map[string]string)
availableCIDRs := generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items))
for _, node := range registeredNodes.Items {
registeredCIDRs[node.Name] = node.Spec.PodCIDR
availableCIDRs.Delete(node.Spec.PodCIDR)
}
for i, node := range newNodes.Items {
podCIDR, registered := registeredCIDRs[node.Name]
if !registered {
podCIDR, _ = availableCIDRs.PopAny()
}
newNodes.Items[i].Spec.PodCIDR = podCIDR
}
return newNodes
}
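To make the reconciliation above concrete, here is a trimmed-down standalone model of it, with node names standing in for api.Node and a plain map standing in for util.StringSet (PopAny simply takes an arbitrary element):

package main

import "fmt"

func main() {
	// One node already registered with a CIDR; two nodes reported by the cloud.
	registered := map[string]string{"node-a": "10.244.0.0/24"}
	newNodes := []string{"node-a", "node-b"}

	// Generate enough candidate CIDRs for every node we might see.
	available := map[string]bool{}
	for i := 0; i < len(newNodes)+len(registered); i++ {
		available[fmt.Sprintf("10.244.%v.0/24", i)] = true
	}
	// CIDRs held by registered nodes are not available.
	for _, cidr := range registered {
		delete(available, cidr)
	}
	// Known nodes keep their CIDR; new nodes pop an arbitrary free one.
	for _, name := range newNodes {
		cidr, ok := registered[name]
		if !ok {
			for c := range available {
				cidr = c
				delete(available, c)
				break
			}
		}
		fmt.Printf("%s -> %s\n", name, cidr)
	}
	// node-a keeps 10.244.0.0/24; node-b gets one of the remaining ranges.
}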
func (nc *NodeController) configureNodeCIDR(node *api.Node) {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("Error configuring node %s: CloudProvider does not support Instances()", node.Name)
return
}
err := instances.Configure(node.Name, &node.Spec)
if err != nil {
glog.Errorf("Error configuring node %s: %s", node.Name, err)
// The newly assigned CIDR was not properly configured, so don't save it in the API server.
node.Spec.PodCIDR = ""
}
}
func (nc *NodeController) unassignNodeCIDR(nodeName string) {
instances, ok := nc.cloud.Instances()
if !ok {
glog.Errorf("Error deconfiguring node %s: CloudProvider does not support Instances()", nodeName)
return
}
err := instances.Release(nodeName)
if err != nil {
glog.Errorf("Error deconfiguring node %s: %s", nodeName, err)
}
}
// Run creates initial node list and start syncing instances from cloudprovider, if any.
// It also starts syncing or monitoring cluster node status.
// 1. registerNodes() is called only once to register all initial nodes (from cloudprovider
@@ -164,6 +220,9 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
if nodes, err = nc.populateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if nc.isRunningCloudProvider() {
reconcilePodCIDRs(nodes, &api.NodeList{})
}
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
glog.Errorf("Error registering node list %+v: %v", nodes, err)
}
@@ -194,21 +253,30 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
registered := util.NewStringSet()
nodes = nc.canonicalizeName(nodes)
for i := 0; i < retryCount; i++ {
var wg sync.WaitGroup
wg.Add(len(nodes.Items))
for i := range nodes.Items {
go func(node *api.Node) {
defer wg.Done()
if registered.Has(node.Name) {
return
}
if nc.isRunningCloudProvider() {
nc.configureNodeCIDR(node)
}
_, err := nc.kubeClient.Nodes().Create(node)
if err == nil || apierrors.IsAlreadyExists(err) {
registered.Insert(node.Name)
glog.Infof("Registered node in registry: %s", node.Name)
} else {
glog.Errorf("Error registering node %s, retrying: %s", node.Name, err)
}
}(&nodes.Items[i])
}
wg.Wait()
if registered.Len() == len(nodes.Items) {
glog.Infof("Successfully registered all nodes")
return nil
}
time.Sleep(retryInterval)
}
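One subtlety in the parallelized loop above: the goroutines all read and write the shared registered set, and util.StringSet is a plain map-backed set. Concurrent writers to a Go map ordinarily want a lock; a minimal sketch of that pattern using only the standard library (this guard is an illustration, not something the diff adds):

package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu         sync.Mutex
		registered = map[string]bool{}
		wg         sync.WaitGroup
	)
	nodes := []string{"node-a", "node-b", "node-c"}
	wg.Add(len(nodes))
	for _, name := range nodes {
		go func(name string) {
			defer wg.Done()
			mu.Lock() // serialize writes to the shared set
			registered[name] = true
			mu.Unlock()
		}(name)
	}
	wg.Wait()
	fmt.Println(len(registered)) // 3
}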
@@ -234,39 +302,51 @@ func (nc *NodeController) syncCloudNodes() error {
node := nodes.Items[i]
nodeMap[node.Name] = &node
}
reconcilePodCIDRs(matches, nodes)
var wg sync.WaitGroup
wg.Add(len(matches.Items))
// Create nodes which have been created in cloud, but not in kubernetes cluster
// Skip nodes if we hit an error while trying to get their addresses.
for i := range matches.Items {
go func(node *api.Node) {
defer wg.Done()
if _, ok := nodeMap[node.Name]; !ok {
glog.V(3).Infof("Querying addresses for new node: %s", node.Name)
nodeList := &api.NodeList{}
nodeList.Items = []api.Node{*node}
_, err = nc.populateAddresses(nodeList)
if err != nil {
glog.Errorf("Error fetching addresses for new node %s: %v", node.Name, err)
return
}
node.Status.Addresses = nodeList.Items[0].Status.Addresses
nc.configureNodeCIDR(node)
glog.Infof("Create node in registry: %s", node.Name)
_, err = nc.kubeClient.Nodes().Create(node)
if err != nil {
glog.Errorf("Create node %s error: %v", node.Name, err)
}
}
delete(nodeMap, node.Name)
}(&matches.Items[i])
}
wg.Wait()
wg.Add(len(nodeMap))
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
for nodeID := range nodeMap {
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {
glog.Errorf("Delete node %s error: %v", nodeID, err)
}
nc.deletePods(nodeID)
go func(nodeID string) {
defer wg.Done()
nc.unassignNodeCIDR(nodeID)
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {
glog.Errorf("Delete node %s error: %v", nodeID, err)
}
nc.deletePods(nodeID)
}(nodeID)
}
wg.Wait()
return nil
}


@@ -389,6 +389,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
return rsrc, nil
}
func (i *Instances) Configure(name string, spec *api.NodeSpec) error {
return nil
}
func (i *Instances) Release(name string) error {
return nil
}
func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}


@@ -250,3 +250,11 @@ func (v *OVirtCloud) List(filter string) ([]string, error) {
func (v *OVirtCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
func (v *OVirtCloud) Configure(name string, spec *api.NodeSpec) error {
return nil
}
func (v *OVirtCloud) Release(name string) error {
return nil
}


@@ -395,6 +395,14 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) {
return rsrc, nil
}
func (i *Instances) Configure(name string, spec *api.NodeSpec) error {
return nil
}
func (i *Instances) Release(name string) error {
return nil
}
func (os *Rackspace) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}


@@ -239,3 +239,11 @@ func (v *VagrantCloud) List(filter string) ([]string, error) {
func (v *VagrantCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, nil
}
func (v *VagrantCloud) Configure(name string, spec *api.NodeSpec) error {
return nil
}
func (v *VagrantCloud) Release(name string) error {
return nil
}