cloud initialize node in external cloud controller

This commit is contained in:
wlan0
2017-03-29 16:21:42 -07:00
committed by Sidhartha Mani
parent 069a25f378
commit 45d2bc06b7
15 changed files with 1131 additions and 91 deletions

View File

@@ -28,7 +28,7 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider"
)
const ProviderName = "fake"
const defaultProviderName = "fake"
// FakeBalancer is a fake storage of balancer information
type FakeBalancer struct {
@@ -61,6 +61,8 @@ type FakeCloud struct {
UpdateCalls []FakeUpdateBalancerCall
RouteMap map[string]*FakeRoute
Lock sync.Mutex
Provider string
addCallLock sync.Mutex
cloudprovider.Zone
}
@@ -70,6 +72,8 @@ type FakeRoute struct {
}
func (f *FakeCloud) addCall(desc string) {
f.addCallLock.Lock()
defer f.addCallLock.Unlock()
f.Calls = append(f.Calls, desc)
}
@@ -92,7 +96,10 @@ func (f *FakeCloud) Clusters() (cloudprovider.Clusters, bool) {
// ProviderName returns the cloud provider ID.
func (f *FakeCloud) ProviderName() string {
return ProviderName
if f.Provider == "" {
return defaultProviderName
}
return f.Provider
}
// ScrubDNS filters DNS settings for pods.

View File

@@ -18,7 +18,9 @@ go_library(
"//pkg/api/v1/node:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
"//pkg/client/retry:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/util/node:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
@@ -26,6 +28,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)
@@ -46,6 +49,7 @@ go_test(
"//pkg/controller/node/testutil:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",

View File

@@ -28,15 +28,24 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
nodeutil "k8s.io/kubernetes/pkg/api/v1/node"
nodeutilv1 "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
clientretry "k8s.io/kubernetes/pkg/client/retry"
"k8s.io/kubernetes/pkg/cloudprovider"
nodeutil "k8s.io/kubernetes/pkg/util/node"
)
var UpdateNodeSpecBackoff = wait.Backoff{
Steps: 20,
Duration: 50 * time.Millisecond,
Jitter: 1.0,
}
type CloudNodeController struct {
nodeInformer coreinformers.NodeInformer
kubeClient clientset.Interface
@@ -48,6 +57,8 @@ type CloudNodeController struct {
// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration
nodeStatusUpdateFrequency time.Duration
}
const (
@@ -63,96 +74,358 @@ func NewCloudNodeController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) *CloudNodeController {
nodeMonitorPeriod time.Duration,
nodeStatusUpdateFrequency time.Duration) *CloudNodeController {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloudcontrollermanager"})
eventBroadcaster.StartLogging(glog.Infof)
if kubeClient != nil {
glog.V(0).Infof("Sending events to api server.")
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
} else {
glog.V(0).Infof("No api server defined - no events will be sent to API server.")
}
cnc := &CloudNodeController{
nodeInformer: nodeInformer,
kubeClient: kubeClient,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeInformer: nodeInformer,
kubeClient: kubeClient,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cnc.AddCloudNode,
})
return cnc
}
// This controller deletes a node if kubelet is not reporting
// and the node is gone from the cloud provider.
func (cnc *CloudNodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
defer utilruntime.HandleCrash()
go wait.Until(func() {
nodes, err := cnc.kubeClient.Core().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
// The following loops run communicate with the APIServer with a worst case complexity
// of O(num_nodes) per cycle. These functions are justified here because these events fire
// very infrequently. DO NOT MODIFY this to perform frequent operations.
for i := range nodes.Items {
var currentReadyCondition *v1.NodeCondition
node := &nodes.Items[i]
// Try to get the current node status
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
_, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition != nil {
break
}
name := node.Name
node, err = cnc.kubeClient.Core().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
break
}
time.Sleep(retrySleepTime)
}
if currentReadyCondition == nil {
glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name)
continue
}
// If the known node status says that Node is NotReady, then check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
if currentReadyCondition != nil {
if currentReadyCondition.Status != v1.ConditionTrue {
instances, ok := cnc.cloud.Instances()
if !ok {
glog.Errorf("cloud provider does not support instances.")
continue
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
if err == cloudprovider.InstanceNotFound {
glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
go func(nodeName string) {
defer utilruntime.HandleCrash()
if err := cnc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("unable to delete node %q: %v", node.Name, err)
}
}(node.Name)
}
glog.Errorf("Error getting node data from cloud: %v", err)
}
}
}
}
}, cnc.nodeMonitorPeriod, wait.NeverStop)
}()
// Start a loop to periodically update the node addresses obtained from the cloud
go wait.Until(cnc.UpdateNodeStatus, cnc.nodeStatusUpdateFrequency, wait.NeverStop)
// Start a loop to periodically check if any nodes have been deleted from cloudprovider
go wait.Until(cnc.MonitorNode, cnc.nodeMonitorPeriod, wait.NeverStop)
}
// UpdateNodeStatus updates the node status, such as node addresses
func (cnc *CloudNodeController) UpdateNodeStatus() {
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
glog.Errorf("Error monitoring node status: %v", err)
return
}
for i := range nodes.Items {
cnc.updateNodeAddress(&nodes.Items[i], instances)
}
}
// UpdateNodeAddress updates the nodeAddress of a single node
func (cnc *CloudNodeController) updateNodeAddress(node *v1.Node, instances cloudprovider.Instances) {
// Do not process nodes that are still tainted
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint != nil {
glog.V(5).Infof("This node %s is still tainted. Will not process.", node.Name)
return
}
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, node)
if err != nil {
glog.Errorf("%v", err)
return
}
// Check if a hostname address exists in the cloud provided addresses
hostnameExists := false
for i := range nodeAddresses {
if nodeAddresses[i].Type == v1.NodeHostName {
hostnameExists = true
}
}
// If hostname was not present in cloud provided addresses, use the hostname
// from the existing node (populated by kubelet)
if !hostnameExists {
for _, addr := range node.Status.Addresses {
if addr.Type == v1.NodeHostName {
nodeAddresses = append(nodeAddresses, addr)
}
}
}
// If nodeIP was suggested by user, ensure that
// it can be found in the cloud as well (consistent with the behaviour in kubelet)
if nodeIP, ok := ensureNodeProvidedIPExists(node, nodeAddresses); ok {
if nodeIP == nil {
glog.Errorf("Specified Node IP not found in cloudprovider")
return
}
nodeAddresses = []v1.NodeAddress{*nodeIP}
}
nodeCopy, err := api.Scheme.DeepCopy(node)
if err != nil {
glog.Errorf("failed to copy node to a new object")
return
}
newNode := nodeCopy.(*v1.Node)
newNode.Status.Addresses = nodeAddresses
if !nodeAddressesChangeDetected(node.Status.Addresses, newNode.Status.Addresses) {
return
}
_, err = nodeutil.PatchNodeStatus(cnc.kubeClient, types.NodeName(node.Name), node, newNode)
if err != nil {
glog.Errorf("Error patching node with cloud ip addresses = [%v]", err)
}
}
// Monitor node queries the cloudprovider for non-ready nodes and deletes them
// if they cannot be found in the cloud provider
func (cnc *CloudNodeController) MonitorNode() {
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
nodes, err := cnc.kubeClient.CoreV1().Nodes().List(metav1.ListOptions{ResourceVersion: "0"})
if err != nil {
glog.Errorf("Error monitoring node status: %v", err)
return
}
for i := range nodes.Items {
var currentReadyCondition *v1.NodeCondition
node := &nodes.Items[i]
// Try to get the current node status
// If node status is empty, then kubelet has not posted ready status yet. In this case, process next node
for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
_, currentReadyCondition = nodeutilv1.GetNodeCondition(&node.Status, v1.NodeReady)
if currentReadyCondition != nil {
break
}
name := node.Name
node, err = cnc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
break
}
time.Sleep(retrySleepTime)
}
if currentReadyCondition == nil {
glog.Errorf("Update status of Node %v from CloudNodeController exceeds retry count.", node.Name)
continue
}
// If the known node status says that Node is NotReady, then check if the node has been removed
// from the cloud provider. If node cannot be found in cloudprovider, then delete the node immediately
if currentReadyCondition != nil {
if currentReadyCondition.Status != v1.ConditionTrue {
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
if err == cloudprovider.InstanceNotFound {
glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
ref := &v1.ObjectReference{
Kind: "Node",
Name: node.Name,
UID: types.UID(node.UID),
Namespace: "",
}
glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
cnc.recorder.Eventf(ref, v1.EventTypeNormal, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name), "Node %s event: %s", node.Name, "DeletingNode")
go func(nodeName string) {
defer utilruntime.HandleCrash()
if err := cnc.kubeClient.CoreV1().Nodes().Delete(node.Name, nil); err != nil {
glog.Errorf("unable to delete node %q: %v", node.Name, err)
}
}(node.Name)
}
glog.Errorf("Error getting node data from cloud: %v", err)
}
}
}
}
}
// This processes nodes that were added into the cluster, and cloud initializea them if appropriate
func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
node := obj.(*v1.Node)
instances, ok := cnc.cloud.Instances()
if !ok {
utilruntime.HandleError(fmt.Errorf("failed to get instances from cloud provider"))
return
}
cloudTaint := getCloudTaint(node.Spec.Taints)
if cloudTaint == nil {
glog.V(2).Infof("This node %s is registered without the cloud taint. Will not process.", node.Name)
return
}
err := clientretry.RetryOnConflict(UpdateNodeSpecBackoff, func() error {
curNode, err := cnc.kubeClient.CoreV1().Nodes().Get(node.Name, metav1.GetOptions{})
if err != nil {
return err
}
nodeAddresses, err := getNodeAddressesByProviderIDOrName(instances, curNode)
if err != nil {
glog.Errorf("%v", err)
return nil
}
// If user provided an IP address, ensure that IP address is found
// in the cloud provider before removing the taint on the node
if nodeIP, ok := ensureNodeProvidedIPExists(curNode, nodeAddresses); ok {
if nodeIP == nil {
glog.Errorf("failed to get specified nodeIP in cloudprovider")
return nil
}
}
if instanceType, err := getInstanceTypeByProviderIDOrName(instances, curNode); err != nil {
glog.Errorf("%v", err)
return err
} else if instanceType != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelInstanceType, instanceType)
curNode.ObjectMeta.Labels[metav1.LabelInstanceType] = instanceType
}
// TODO(wlan0): Move this logic to the route controller using the node taint instead of condition
// Since there are node taints, do we still need this?
// This condition marks the node as unusable until routes are initialized in the cloud provider
if cnc.cloud.ProviderName() == "gce" {
curNode.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
Type: v1.NodeNetworkUnavailable,
Status: v1.ConditionTrue,
Reason: "NoRouteCreated",
Message: "Node created without a route",
LastTransitionTime: metav1.Now(),
})
}
if zones, ok := cnc.cloud.Zones(); ok {
zone, err := zones.GetZone()
if err != nil {
return fmt.Errorf("failed to get zone from cloud provider: %v", err)
}
if zone.FailureDomain != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelZoneFailureDomain, zone.FailureDomain)
curNode.ObjectMeta.Labels[metav1.LabelZoneFailureDomain] = zone.FailureDomain
}
if zone.Region != "" {
glog.Infof("Adding node label from cloud provider: %s=%s", metav1.LabelZoneRegion, zone.Region)
curNode.ObjectMeta.Labels[metav1.LabelZoneRegion] = zone.Region
}
}
curNode.Spec.Taints = excludeTaintFromList(curNode.Spec.Taints, *cloudTaint)
_, err = cnc.kubeClient.CoreV1().Nodes().Update(curNode)
if err != nil {
return err
}
// After adding, call UpdateNodeAddress to set the CloudProvider provided IPAddresses
// So that users do not see any significant delay in IP addresses being filled into the node
cnc.updateNodeAddress(curNode, instances)
return nil
})
if err != nil {
utilruntime.HandleError(err)
return
}
}
func getCloudTaint(taints []v1.Taint) *v1.Taint {
for _, taint := range taints {
if taint.Key == metav1.TaintExternalCloudProvider {
return &taint
}
}
return nil
}
func excludeTaintFromList(taints []v1.Taint, toExclude v1.Taint) []v1.Taint {
newTaints := []v1.Taint{}
for _, taint := range taints {
if toExclude.MatchTaint(&taint) {
continue
}
newTaints = append(newTaints, taint)
}
return newTaints
}
func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
nodeAddresses, err := instances.NodeAddressesByProviderID(node.Spec.ProviderID)
if err != nil {
providerIDErr := err
nodeAddresses, err = instances.NodeAddresses(types.NodeName(node.Name))
if err != nil {
return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
}
return nodeAddresses, nil
}
func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool {
if len(addressSet1) != len(addressSet2) {
return true
}
addressMap1 := map[v1.NodeAddressType]string{}
addressMap2 := map[v1.NodeAddressType]string{}
for i := range addressSet1 {
addressMap1[addressSet1[i].Type] = addressSet1[i].Address
addressMap2[addressSet2[i].Type] = addressSet2[i].Address
}
for k, v := range addressMap1 {
if addressMap2[k] != v {
return true
}
}
return false
}
func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (*v1.NodeAddress, bool) {
var nodeIP *v1.NodeAddress
nodeIPExists := false
if providedIP, ok := node.ObjectMeta.Annotations[metav1.AnnotationProvidedIPAddr]; ok {
nodeIPExists = true
for i := range nodeAddresses {
if nodeAddresses[i].Address == providedIP {
nodeIP = &nodeAddresses[i]
break
}
}
}
return nodeIP, nodeIPExists
}
func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) {
instanceType, err := instances.InstanceTypeByProviderID(node.Spec.ProviderID)
if err != nil {
providerIDErr := err
instanceType, err = instances.InstanceType(types.NodeName(node.Name))
if err != nil {
return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
}
return instanceType, err
}

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
@@ -103,11 +104,12 @@ func TestNodeDeleted(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound},
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "controllermanager"}),
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound},
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
eventBroadcaster.StartLogging(glog.Infof)
@@ -122,3 +124,682 @@ func TestNodeDeleted(t *testing.T) {
t.Errorf("Node was not deleted")
}
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized
func TestNodeInitialized(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{
types.NodeName("node0"): "t1.micro",
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
nodeStatusUpdateFrequency: 1 * time.Second,
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 1 || fnh.UpdatedNodes[0].Name != "node0" {
t.Errorf("Node was not updated")
}
if len(fnh.UpdatedNodes[0].Spec.Taints) != 0 {
t.Errorf("Node Taint was not removed after cloud init")
}
}
// This test checks that a node without the external cloud provider taint are NOT cloudprovider initialized
func TestNodeIgnored(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{
types.NodeName("node0"): "t1.micro",
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 0 {
t.Errorf("Node was wrongly updated")
}
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized and
// the GCE route condition is added if cloudprovider is GCE
func TestGCECondition(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{
types.NodeName("node0"): "t1.micro",
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Provider: "gce",
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" {
t.Errorf("Node was not updated")
}
if len(fnh.UpdatedNodes[0].Status.Conditions) != 2 {
t.Errorf("No new conditions were added for GCE")
}
conditionAdded := false
for _, cond := range fnh.UpdatedNodes[0].Status.Conditions {
if cond.Status == "True" && cond.Type == "NetworkUnavailable" && cond.Reason == "NoRouteCreated" {
conditionAdded = true
}
}
if !conditionAdded {
t.Errorf("Network Route Condition for GCE not added by external cloud intializer")
}
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized and
// and that zone labels are added correctly
func TestZoneInitialized(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{
types.NodeName("node0"): "t1.micro",
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Provider: "aws",
Zone: cloudprovider.Zone{
FailureDomain: "us-west-1a",
Region: "us-west",
},
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" {
t.Errorf("Node was not updated")
}
if len(fnh.UpdatedNodes[0].ObjectMeta.Labels) != 2 {
t.Errorf("Node label for Region and Zone were not set")
}
if fnh.UpdatedNodes[0].ObjectMeta.Labels[metav1.LabelZoneRegion] != "us-west" {
t.Errorf("Node Region not correctly updated")
}
if fnh.UpdatedNodes[0].ObjectMeta.Labels[metav1.LabelZoneFailureDomain] != "us-west-1a" {
t.Errorf("Node FailureDomain not correctly updated")
}
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized and
// and nodeAddresses are updated from the cloudprovider
func TestNodeAddresses(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: "ImproveCoverageTaint",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Provider: "aws",
Zone: cloudprovider.Zone{
FailureDomain: "us-west-1a",
Region: "us-west",
},
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" {
t.Errorf("Node was not updated")
}
if len(fnh.UpdatedNodes[0].Status.Addresses) != 3 {
t.Errorf("Node status not updated")
}
fakeCloud.Addresses = []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
}
cloudNodeController.Run()
<-time.After(2 * time.Second)
updatedNodes := fnh.GetUpdatedNodesCopy()
if len(updatedNodes[0].Status.Addresses) != 2 {
t.Errorf("Node Addresses not correctly updated")
}
}
// This test checks that a node with the external cloud provider taint is cloudprovider initialized and
// and the provided node ip is validated with the cloudprovider and nodeAddresses are updated from the cloudprovider
func TestNodeProvidedIPAddresses(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
Labels: map[string]string{},
Annotations: map[string]string{
metav1.AnnotationProvidedIPAddr: "10.0.0.1",
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeHostName,
Address: "node0.cloud.internal",
},
},
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{
Key: "ImproveCoverageTaint",
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
},
},
ProviderID: "node0.aws.12345",
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{}),
DeleteWaitChan: make(chan struct{}),
}
factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
fakeCloud := &fakecloud.FakeCloud{
InstanceTypes: map[types.NodeName]string{
types.NodeName("node0"): "t1.micro",
types.NodeName("node0.aws.12345"): "t2.macro",
},
Addresses: []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
},
Provider: "aws",
Zone: cloudprovider.Zone{
FailureDomain: "us-west-1a",
Region: "us-west",
},
Err: nil,
}
eventBroadcaster := record.NewBroadcaster()
cloudNodeController := &CloudNodeController{
kubeClient: fnh,
nodeInformer: factory.Core().V1().Nodes(),
cloud: fakeCloud,
nodeMonitorPeriod: 5 * time.Second,
nodeStatusUpdateFrequency: 1 * time.Second,
recorder: eventBroadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "cloud-controller-manager"}),
}
eventBroadcaster.StartLogging(glog.Infof)
cloudNodeController.AddCloudNode(fnh.Existing[0])
if len(fnh.UpdatedNodes) != 1 && fnh.UpdatedNodes[0].Name != "node0" {
t.Errorf("Node was not updated")
}
if len(fnh.UpdatedNodes[0].Status.Addresses) != 1 {
t.Errorf("Node status unexpectedly updated")
}
cloudNodeController.Run()
<-time.After(2 * time.Second)
updatedNodes := fnh.GetUpdatedNodesCopy()
if len(updatedNodes[0].Status.Addresses) != 1 || updatedNodes[0].Status.Addresses[0].Address != "10.0.0.1" {
t.Errorf("Node Addresses not correctly updated")
}
}
// Tests that node address changes are detected correctly
func TestNodeAddressesChangeDetected(t *testing.T) {
addressSet1 := []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
}
addressSet2 := []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
}
if nodeAddressesChangeDetected(addressSet1, addressSet2) {
t.Errorf("Node address changes are not detected correctly")
}
addressSet1 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.164",
},
}
addressSet2 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
}
if !nodeAddressesChangeDetected(addressSet1, addressSet2) {
t.Errorf("Node address changes are not detected correctly")
}
addressSet1 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.164",
},
{
Type: v1.NodeHostName,
Address: "hostname.zone.region.aws.test",
},
}
addressSet2 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.164",
},
}
if !nodeAddressesChangeDetected(addressSet1, addressSet2) {
t.Errorf("Node address changes are not detected correctly")
}
addressSet1 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.164",
},
}
addressSet2 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.164",
},
{
Type: v1.NodeHostName,
Address: "hostname.zone.region.aws.test",
},
}
if !nodeAddressesChangeDetected(addressSet1, addressSet2) {
t.Errorf("Node address changes are not detected correctly")
}
addressSet1 = []v1.NodeAddress{
{
Type: v1.NodeExternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeInternalIP,
Address: "132.143.154.163",
},
}
addressSet2 = []v1.NodeAddress{
{
Type: v1.NodeInternalIP,
Address: "10.0.0.1",
},
{
Type: v1.NodeExternalIP,
Address: "132.143.154.163",
},
}
if !nodeAddressesChangeDetected(addressSet1, addressSet2) {
t.Errorf("Node address changes are not detected correctly")
}
}

View File

@@ -90,6 +90,11 @@ func (c *FakeNodeHandler) Core() v1core.CoreV1Interface {
return &FakeLegacyHandler{c.Clientset.Core(), c}
}
// CoreV1 returns fake CoreV1Interface
func (c *FakeNodeHandler) CoreV1() v1core.CoreV1Interface {
return &FakeLegacyHandler{c.Clientset.CoreV1(), c}
}
// Nodes return fake NodeInterfaces.
func (m *FakeLegacyHandler) Nodes() v1core.NodeInterface {
return m.n

View File

@@ -186,7 +186,7 @@ type KubeletBootstrap interface {
}
// create and initialize a Kubelet instance
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir string) (KubeletBootstrap, error)
type KubeletBuilder func(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (KubeletBootstrap, error)
// KubeletDeps is a bin for things we might consider "injected dependencies" -- objects constructed
// at runtime that are necessary for running the Kubelet. This is a temporary solution for grouping
@@ -281,7 +281,7 @@ func getRuntimeAndImageServices(config *componentconfig.KubeletConfiguration) (i
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir string) (*Kubelet, error) {
func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool, hostnameOverride, nodeIP, dockershimRootDir, providerID string) (*Kubelet, error) {
if kubeCfg.RootDirectory == "" {
return nil, fmt.Errorf("invalid root directory %q", kubeCfg.RootDirectory)
}
@@ -433,6 +433,8 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
diskSpaceManager: diskSpaceManager,
cloud: kubeDeps.Cloud,
autoDetectCloudProvider: (componentconfigv1alpha1.AutoDetectCloudProvider == kubeCfg.CloudProvider),
externalCloudProvider: cloudprovider.IsExternal(kubeCfg.CloudProvider),
providerID: providerID,
nodeRef: nodeRef,
nodeLabels: kubeCfg.NodeLabels,
nodeStatusUpdateFrequency: kubeCfg.NodeStatusUpdateFrequency.Duration,
@@ -904,7 +906,8 @@ type Kubelet struct {
// Cloud provider interface.
cloud cloudprovider.Interface
autoDetectCloudProvider bool
// Indicates that the node initialization happens in an external cloud controller
externalCloudProvider bool
// Reference to this node.
nodeRef *clientv1.ObjectReference
@@ -998,6 +1001,9 @@ type Kubelet struct {
// If non-nil, use this IP address for the node
nodeIP net.IP
// If non-nil, this is a unique identifier for the node in an external database, eg. cloudprovider
providerID string
// clock is an interface that provides time related functionality in a way that makes it
// easy to test the code.
clock clock.Clock

View File

@@ -202,6 +202,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
Unschedulable: !kl.registerSchedulable,
},
}
nodeTaints := make([]v1.Taint, 0)
if len(kl.kubeletConfiguration.RegisterWithTaints) > 0 {
taints := make([]v1.Taint, len(kl.kubeletConfiguration.RegisterWithTaints))
for i := range kl.kubeletConfiguration.RegisterWithTaints {
@@ -209,7 +210,19 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
return nil, err
}
}
node.Spec.Taints = taints
nodeTaints = append(nodeTaints, taints...)
}
if kl.externalCloudProvider {
taint := v1.Taint{
Key: metav1.TaintExternalCloudProvider,
Value: "true",
Effect: v1.TaintEffectNoSchedule,
}
nodeTaints = append(nodeTaints, taint)
}
if len(nodeTaints) > 0 {
node.Spec.Taints = nodeTaints
}
// Initially, set NodeNetworkUnavailable to true.
if kl.providerRequiresNetworkingConfiguration() {
@@ -241,6 +254,10 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
node.ObjectMeta.Labels[k] = v
}
if kl.providerID != "" {
node.Spec.ProviderID = kl.providerID
}
if kl.cloud != nil {
instances, ok := kl.cloud.Instances()
if !ok {
@@ -259,9 +276,11 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
// TODO: We can't assume that the node has credentials to talk to the
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
if node.Spec.ProviderID == "" {
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
}
instanceType, err := instances.InstanceType(kl.nodeName)
@@ -443,6 +462,7 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
// 4) Try to get the IP from the network interface used as default gateway
if kl.nodeIP != nil {
ipAddr = kl.nodeIP
node.ObjectMeta.Annotations[metav1.AnnotationProvidedIPAddr] = kl.nodeIP.String()
} else if addr := net.ParseIP(kl.hostname); addr != nil {
ipAddr = addr
} else {