diff --git a/pkg/controller/cloud/BUILD b/pkg/controller/cloud/BUILD
new file mode 100644
index 00000000000..2fba1528357
--- /dev/null
+++ b/pkg/controller/cloud/BUILD
@@ -0,0 +1,49 @@
+package(default_visibility = ["//visibility:public"])
+
+licenses(["notice"])
+
+load(
+    "@io_bazel_rules_go//go:def.bzl",
+    "go_binary",
+    "go_library",
+    "go_test",
+    "cgo_library",
+)
+
+go_library(
+    name = "go_default_library",
+    srcs = ["cloud_node_controller.go"],
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/typed/core/internalversion:go_default_library",
+        "//pkg/client/record:go_default_library",
+        "//pkg/cloudprovider:go_default_library",
+        "//pkg/controller/informers:go_default_library",
+        "//pkg/types:go_default_library",
+        "//pkg/util/runtime:go_default_library",
+        "//pkg/util/wait:go_default_library",
+        "//vendor:github.com/golang/glog",
+    ],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["cloud_node_controller_test.go"],
+    library = "go_default_library",
+    tags = ["automanaged"],
+    deps = [
+        "//pkg/api:go_default_library",
+        "//pkg/api/unversioned:go_default_library",
+        "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
+        "//pkg/client/record:go_default_library",
+        "//pkg/cloudprovider:go_default_library",
+        "//pkg/cloudprovider/providers/fake:go_default_library",
+        "//pkg/controller:go_default_library",
+        "//pkg/controller/informers:go_default_library",
+        "//pkg/controller/node:go_default_library",
+        "//pkg/util/wait:go_default_library",
+        "//vendor:github.com/golang/glog",
+    ],
+)
diff --git a/pkg/controller/cloud/cloud_node_controller.go b/pkg/controller/cloud/cloud_node_controller.go
new file mode 100644
index 00000000000..29d5edf0149
--- /dev/null
+++ b/pkg/controller/cloud/cloud_node_controller.go
@@ -0,0 +1,153 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cloud
+
+import (
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/pkg/api"
+	clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
+	unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	"k8s.io/kubernetes/pkg/controller/informers"
+	"k8s.io/kubernetes/pkg/types"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
+type CloudNodeController struct {
+	nodeInformer informers.NodeInformer
+	kubeClient   clientset.Interface
+	recorder     record.EventRecorder
+
+	cloud cloudprovider.Interface
+
+	// nodeMonitorPeriod controls how often this controller checks the node
+	// status posted by the kubelet. This value should be lower than the
+	// nodeMonitorGracePeriod set in the controller manager.
+	nodeMonitorPeriod time.Duration
+}
+
+const (
+	// nodeStatusUpdateRetry controls the number of retries of writing a NodeStatus update.
+	nodeStatusUpdateRetry = 5
+
+	// retrySleepTime is the amount of time the controller sleeps between retrying NodeStatus updates.
+	retrySleepTime = 20 * time.Millisecond
+)
+
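+// NewCloudNodeController builds a CloudNodeController. When kubeClient is
+// non-nil, the controller's event broadcaster also records events to the API
+// server; otherwise events are only logged.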
+func NewCloudNodeController(
+	nodeInformer informers.NodeInformer,
+	kubeClient clientset.Interface,
+	cloud cloudprovider.Interface,
+	nodeMonitorPeriod time.Duration) (*CloudNodeController, error) {
+
+	eventBroadcaster := record.NewBroadcaster()
+	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "cloudcontrollermanager"})
+	eventBroadcaster.StartLogging(glog.Infof)
+	if kubeClient != nil {
+		glog.V(0).Infof("Sending events to api server.")
+		eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeClient.Core().Events("")})
+	} else {
+		glog.V(0).Infof("No API server defined; no events will be sent to the API server.")
+	}
+
+	cnc := &CloudNodeController{
+		nodeInformer:      nodeInformer,
+		kubeClient:        kubeClient,
+		recorder:          recorder,
+		cloud:             cloud,
+		nodeMonitorPeriod: nodeMonitorPeriod,
+	}
+	return cnc, nil
+}
+
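+// A caller such as a controller manager is expected to construct the
+// controller and then start it; Run returns immediately and the monitoring
+// loop keeps running in the background. A minimal sketch:
+//
+//	cnc, err := NewCloudNodeController(nodeInformer, kubeClient, cloud, 5*time.Second)
+//	if err != nil {
+//		// handle the error
+//	}
+//	cnc.Run()
+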
+// Run starts the monitoring loop, which deletes a node when the kubelet is
+// not reporting and the node is gone from the cloud provider.
+func (cnc *CloudNodeController) Run() {
+	go func() {
+		defer utilruntime.HandleCrash()
+
+		go wait.Until(func() {
+			nodes, err := cnc.kubeClient.Core().Nodes().List(api.ListOptions{ResourceVersion: "0"})
+			if err != nil {
+				glog.Errorf("Error monitoring node status: %v", err)
+				return
+			}
+
+			for i := range nodes.Items {
+				var currentReadyCondition *api.NodeCondition
+				node := &nodes.Items[i]
+				// Try to get the current node status. If the node status is empty,
+				// then the kubelet has not posted a ready status yet; retry a few
+				// times before processing the next node.
+				for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
+					_, currentReadyCondition = api.GetNodeCondition(&node.Status, api.NodeReady)
+					if currentReadyCondition != nil {
+						break
+					}
+					name := node.Name
+					node, err = cnc.kubeClient.Core().Nodes().Get(name)
+					if err != nil {
+						glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
+						break
+					}
+					time.Sleep(retrySleepTime)
+				}
+				if currentReadyCondition == nil {
+					glog.Errorf("Unable to retrieve the Ready condition of node %v after %d retries; skipping it.", node.Name, nodeStatusUpdateRetry)
+					continue
+				}
+				// If the known node status says that the node is NotReady, check whether
+				// the node has been removed from the cloud provider. If it cannot be
+				// found there, delete the node immediately.
+				if currentReadyCondition.Status != api.ConditionTrue {
+					instances, ok := cnc.cloud.Instances()
+					if !ok {
+						glog.Errorf("cloud provider does not support instances.")
+						continue
+					}
+					if _, err := instances.ExternalID(types.NodeName(node.Name)); err != nil {
+						if err != cloudprovider.InstanceNotFound {
+							glog.Errorf("Error getting node data from cloud: %v", err)
+							continue
+						}
+						glog.V(2).Infof("Deleting node no longer present in cloud provider: %s", node.Name)
+						ref := &api.ObjectReference{
+							Kind:      "Node",
+							Name:      node.Name,
+							UID:       types.UID(node.UID),
+							Namespace: "",
+						}
+						glog.V(2).Infof("Recording %s event message for node %s", "DeletingNode", node.Name)
+						cnc.recorder.Eventf(ref, api.EventTypeNormal, "DeletingNode", "Deleting node %s because it is no longer present in the cloud provider", node.Name)
+						// Delete the node in a goroutine so that a slow API server does
+						// not block the monitoring loop.
+						go func(nodeName string) {
+							defer utilruntime.HandleCrash()
+							if err := cnc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
+								glog.Errorf("unable to delete node %q: %v", nodeName, err)
+							}
+						}(node.Name)
+					}
+				}
+			}
+		}, cnc.nodeMonitorPeriod, wait.NeverStop)
+	}()
+}
diff --git a/pkg/controller/cloud/cloud_node_controller_test.go b/pkg/controller/cloud/cloud_node_controller_test.go
new file mode 100644
index 00000000000..c3771dd8b5c
--- /dev/null
+++ b/pkg/controller/cloud/cloud_node_controller_test.go
@@ -0,0 +1,122 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cloud
+
+import (
+	"testing"
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
+
+	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/client/record"
+	"k8s.io/kubernetes/pkg/cloudprovider"
+	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/informers"
+	"k8s.io/kubernetes/pkg/controller/node"
+	"k8s.io/kubernetes/pkg/util/wait"
+)
+
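+// node.FakeNodeHandler records deleted nodes in DeletedNodes and signals
+// DeleteWaitChan once a Delete call has been observed. fakecloud.FakeCloud is
+// configured below so that instance lookups fail with
+// cloudprovider.InstanceNotFound, making the node appear to have been removed
+// from the cloud.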
+// This test checks that the node is deleted when the kubelet stops reporting
+// and the cloud provider says the node is gone.
+func TestNodeDeleted(t *testing.T) {
+	pod0 := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Namespace: "default",
+			Name:      "pod0",
+		},
+		Spec: api.PodSpec{
+			NodeName: "node0",
+		},
+		Status: api.PodStatus{
+			Conditions: []api.PodCondition{
+				{
+					Type:   api.PodReady,
+					Status: api.ConditionTrue,
+				},
+			},
+		},
+	}
+
+	pod1 := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Namespace: "default",
+			Name:      "pod1",
+		},
+		Spec: api.PodSpec{
+			NodeName: "node0",
+		},
+		Status: api.PodStatus{
+			Conditions: []api.PodCondition{
+				{
+					Type:   api.PodReady,
+					Status: api.ConditionTrue,
+				},
+			},
+		},
+	}
+
+	fnh := &node.FakeNodeHandler{
+		Existing: []*api.Node{
+			{
+				ObjectMeta: api.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: api.NodeStatus{
+					Conditions: []api.NodeCondition{
+						{
+							Type:               api.NodeReady,
+							Status:             api.ConditionUnknown,
+							LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+		Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*pod0, *pod1}}),
+		DeleteWaitChan: make(chan struct{}),
+	}
+
+	factory := informers.NewSharedInformerFactory(fnh, controller.NoResyncPeriodFunc())
+
+	eventBroadcaster := record.NewBroadcaster()
+	cloudNodeController := &CloudNodeController{
+		kubeClient:        fnh,
+		nodeInformer:      factory.Nodes(),
+		cloud:             &fakecloud.FakeCloud{Err: cloudprovider.InstanceNotFound},
+		nodeMonitorPeriod: 5 * time.Second,
+		recorder:          eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}),
+	}
+	eventBroadcaster.StartLogging(glog.Infof)
+
+	cloudNodeController.Run()
+
+	select {
+	case <-fnh.DeleteWaitChan:
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
+	}
+	if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
+		t.Errorf("Node was not deleted")
+	}
+}
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 012ade79b1c..bf8b2e25866 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -1261,7 +1261,7 @@ func TestCloudProviderNoRateLimit(t *testing.T) {
 				},
 			},
 		},
-		Clientset:      fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
+		Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
 		DeleteWaitChan: make(chan struct{}),
 	}
 	nodeController, _ := NewNodeControllerFromClient(nil, fnh, 10*time.Minute,