Added flag to set cluster class B network address for pods, add flag to disable allocation CIDRs for Pods. Fixed synchornization bug in NodeController registerNodes().

This commit is contained in:
Jerzy Szczepkowski
2015-04-28 17:02:45 +02:00
committed by CJ Cullen
parent 290c7b94ef
commit e967ffd522
12 changed files with 97 additions and 53 deletions

View File

@@ -53,7 +53,9 @@ type GCECloud struct {
projectID string
zone string
instanceID string
networkName string
// We assume here that nodes and master are in the same network. TODO(cjcullen) Fix it.
networkName string
// Used for accessing the metadata server
metadataAccess func(string) (string, error)
@@ -243,6 +245,7 @@ func waitForOp(op *compute.Operation, getOperation func() (*compute.Operation, e
pollOp := op
for pollOp.Status != "DONE" {
var err error
// TODO: add some backoff here.
time.Sleep(time.Second)
pollOp, err = getOperation()
if err != nil {
@@ -569,6 +572,7 @@ func (gce *GCECloud) Configure(name string, spec *api.NodeSpec) error {
}
return ErrMetadataConflict
}
// We are setting the metadata, so they can be picked-up by the configure-vm.sh script to start docker with the given CIDR for Pods.
instance.Metadata.Items = append(instance.Metadata.Items,
&compute.MetadataItems{
Key: podCIDRMetadataKey,

View File

@@ -22,6 +22,7 @@ import (
"net"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -88,6 +89,8 @@ type NodeController struct {
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod time.Duration
clusterName string
clusterClassB string
allocateNodeCIDRs bool
// Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
now func() util.Time
@@ -106,7 +109,9 @@ func NewNodeController(
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
clusterName string) *NodeController {
clusterName string,
clusterClassB string,
allocateNodeCIDRs bool) *NodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
if kubeClient != nil {
@@ -132,24 +137,26 @@ func NewNodeController(
lookupIP: net.LookupIP,
now: util.Now,
clusterName: clusterName,
clusterClassB: clusterClassB,
allocateNodeCIDRs: allocateNodeCIDRs,
}
}
// Generates num pod CIDRs that could be assigned to nodes.
func generateCIDRs(num int) util.StringSet {
func (nc *NodeController) generateCIDRs(num int) util.StringSet {
res := util.NewStringSet()
for i := 0; i < num; i++ {
// TODO: Make the CIDRs configurable.
res.Insert(fmt.Sprintf("10.244.%v.0/24", i))
res.Insert(fmt.Sprintf("%v.%v.0/24", nc.clusterClassB, i))
}
return res
}
// For each node from newNodes, finds its current spec in registeredNodes.
// If it is not there, it gets a new valid CIDR assigned.
func reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList {
func (nc *NodeController) reconcilePodCIDRs(newNodes, registeredNodes *api.NodeList) *api.NodeList {
registeredCIDRs := make(map[string]string)
availableCIDRs := generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items))
availableCIDRs := nc.generateCIDRs(len(newNodes.Items) + len(registeredNodes.Items))
for _, node := range registeredNodes.Items {
registeredCIDRs[node.Name] = node.Spec.PodCIDR
availableCIDRs.Delete(node.Spec.PodCIDR)
@@ -220,8 +227,8 @@ func (nc *NodeController) Run(period time.Duration, syncNodeList bool) {
if nodes, err = nc.populateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if nc.isRunningCloudProvider() {
reconcilePodCIDRs(nodes, &api.NodeList{})
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
nc.reconcilePodCIDRs(nodes, &api.NodeList{})
}
if err := nc.registerNodes(nodes, nc.registerRetryCount, period); err != nil {
glog.Errorf("Error registering node list %+v: %v", nodes, err)
@@ -249,38 +256,37 @@ func (nc *NodeController) registerNodes(nodes *api.NodeList, retryCount int, ret
if len(nodes.Items) == 0 {
return nil
}
registered := util.NewStringSet()
nodes = nc.canonicalizeName(nodes)
for i := 0; i < retryCount; i++ {
var wg sync.WaitGroup
wg.Add(len(nodes.Items))
for i := range nodes.Items {
go func(node *api.Node) {
toRegister := util.NewStringSet()
var wg sync.WaitGroup
var successfullyRegistered int32 = 0
for i := range nodes.Items {
node := &nodes.Items[i]
if !toRegister.Has(node.Name) {
wg.Add(1)
toRegister.Insert(node.Name)
go func(n *api.Node) {
defer wg.Done()
if registered.Has(node.Name) {
return
for i := 0; i < retryCount; i++ {
if nc.isRunningCloudProvider() && nc.allocateNodeCIDRs {
nc.configureNodeCIDR(n)
}
_, err := nc.kubeClient.Nodes().Create(n)
if err == nil || apierrors.IsAlreadyExists(err) {
glog.Infof("Registered node in registry: %v", n.Name)
atomic.AddInt32(&successfullyRegistered, 1)
return
} else {
glog.Errorf("Error registering node %v (retries left: %v): %v", n.Name, retryCount-i-1, err)
}
time.Sleep(retryInterval)
}
if nc.isRunningCloudProvider() {
nc.configureNodeCIDR(node)
}
_, err := nc.kubeClient.Nodes().Create(node)
if err == nil || apierrors.IsAlreadyExists(err) {
registered.Insert(node.Name)
glog.Infof("Registered node in registry: %s", node.Name)
} else {
glog.Errorf("Error registering node %s, retrying: %s", node.Name, err)
}
}(&nodes.Items[i])
glog.Errorf("Unable to register node %v", n.Name)
}(node)
}
wg.Wait()
if registered.Len() == len(nodes.Items) {
glog.Infof("Successfully registered all nodes")
return nil
}
time.Sleep(retryInterval)
}
if registered.Len() != len(nodes.Items) {
wg.Wait()
if int32(toRegister.Len()) != atomic.LoadInt32(&successfullyRegistered) {
return ErrRegistration
} else {
return nil
@@ -302,7 +308,9 @@ func (nc *NodeController) syncCloudNodes() error {
node := nodes.Items[i]
nodeMap[node.Name] = &node
}
reconcilePodCIDRs(matches, nodes)
if nc.allocateNodeCIDRs {
nc.reconcilePodCIDRs(matches, nodes)
}
var wg sync.WaitGroup
wg.Add(len(matches.Items))
// Create nodes which have been created in cloud, but not in kubernetes cluster
@@ -320,7 +328,9 @@ func (nc *NodeController) syncCloudNodes() error {
return
}
node.Status.Addresses = nodeList.Items[0].Status.Addresses
nc.configureNodeCIDR(node)
if nc.allocateNodeCIDRs {
nc.configureNodeCIDR(node)
}
glog.Infof("Create node in registry: %s", node.Name)
_, err = nc.kubeClient.Nodes().Create(node)
if err != nil {
@@ -337,7 +347,9 @@ func (nc *NodeController) syncCloudNodes() error {
for nodeID := range nodeMap {
go func(nodeID string) {
defer wg.Done()
nc.unassignNodeCIDR(nodeID)
if nc.allocateNodeCIDRs {
nc.unassignNodeCIDR(nodeID)
}
glog.Infof("Delete node from registry: %s", nodeID)
err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil {

View File

@@ -21,6 +21,7 @@ import (
"fmt"
"reflect"
"sort"
"sync"
"testing"
"time"
@@ -59,6 +60,9 @@ type FakeNodeHandler struct {
UpdatedNodes []*api.Node
UpdatedNodeStatuses []*api.Node
RequestCount int
// Synchronization
createLock sync.Mutex
}
func (c *FakeNodeHandler) Nodes() client.NodeInterface {
@@ -66,7 +70,11 @@ func (c *FakeNodeHandler) Nodes() client.NodeInterface {
}
func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) {
defer func() { m.RequestCount++ }()
m.createLock.Lock()
defer func() {
m.RequestCount++
m.createLock.Unlock()
}()
for _, n := range m.Existing {
if n.Name == node.Name {
return nil, apierrors.NewAlreadyExists("Minion", node.Name)
@@ -238,7 +246,7 @@ func TestRegisterNodes(t *testing.T) {
nodes.Items = append(nodes.Items, *newNode(machine))
}
nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
err := nodeController.registerNodes(&nodes, item.retryCount, time.Millisecond)
if !item.expectedFail && err != nil {
t.Errorf("unexpected error: %v", err)
@@ -324,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, "", item.machines, &resources, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
nodes, err := nodeController.getStaticNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -386,7 +394,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
nodes, err := nodeController.getCloudNodesWithSpec()
if err != nil {
t.Errorf("unexpected error: %v", err)
@@ -496,7 +504,7 @@ func TestSyncCloudNodes(t *testing.T) {
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
}
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
if err := nodeController.syncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -580,7 +588,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
item.fakeNodeHandler.Fake = testclient.NewSimpleFake()
}
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
if err := nodeController.syncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -620,7 +628,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, 10, time.Minute,
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
util.NewFakeRateLimiter(), testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
result, err := nodeController.populateAddresses(item.nodes)
// In case of IP querying error, we should continue.
if err != nil {
@@ -820,7 +828,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
@@ -1022,7 +1030,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, 10, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "")
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, "", "", false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)