consolidate node deletion logic between node lifecycle and cloud node controller

Author: andrewsykim
Date: 2018-10-28 21:57:23 -04:00
parent 6be4f1bbf3
commit 5329f09663
18 changed files with 750 additions and 702 deletions
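Note on scope: the hunks below only show the cloud-dependent deletion path being removed from the node lifecycle controller; the code that picks this logic up in the cloud node controller lives in other changed files not included in this excerpt. As orientation, the per-node decision being consolidated looks roughly like the sketch below. All names are illustrative, and the check functions play the role of the nodeShutdownInCloudProvider / nodeExistsInCloudProvider hooks this commit deletes (signatures simplified to a common shape); this is not the code added by the commit.

```go
package nodelifecyclesketch

import (
	"context"

	v1 "k8s.io/api/core/v1"
)

// cloudAction is a hypothetical summary of what the consolidated controller
// does with a not-ready node after consulting the cloud provider.
type cloudAction int

const (
	keepNode   cloudAction = iota // instance is fine, or the check failed; retry next sync
	taintNode                     // instance is shut down: taint the node, do not delete it
	deleteNode                    // instance no longer exists: delete the node immediately
)

// reconcileNotReadyNode mirrors the structure of the block removed from
// monitorNodeHealth below: the shutdown check wins over the existence check,
// and any error leaves the node untouched until the next sync.
func reconcileNotReadyNode(
	ctx context.Context,
	node *v1.Node,
	shutdownInCloud func(context.Context, *v1.Node) (bool, error),
	existsInCloud func(context.Context, *v1.Node) (bool, error),
) (cloudAction, error) {
	shutdown, err := shutdownInCloud(ctx, node)
	if err != nil {
		return keepNode, err
	}
	if shutdown {
		return taintNode, nil
	}
	exists, err := existsInCloud(ctx, node)
	if err != nil {
		return keepNode, err
	}
	if !exists {
		return deleteNode, nil
	}
	return keepNode, nil
}
```

The ordering matters: a shut-down instance is tainted and kept so its pods can be handled gracefully when it returns, while an instance the provider no longer knows about is deleted without a grace period.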

pkg/controller/nodelifecycle/BUILD

@@ -25,7 +25,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
@@ -42,7 +41,6 @@ go_library(
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
@@ -70,7 +68,6 @@ go_test(
srcs = ["node_lifecycle_controller_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/cloudprovider/providers/fake:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/testutil:go_default_library",
@@ -86,9 +83,7 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature/testing:go_default_library",
"//staging/src/k8s.io/client-go/informers:go_default_library",
@@ -98,7 +93,6 @@ go_test(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//vendor/k8s.io/utils/pointer:go_default_library",
],
)

pkg/controller/nodelifecycle/node_lifecycle_controller.go

@@ -22,7 +22,6 @@ limitations under the License.
package nodelifecycle
import (
"context"
"fmt"
"hash/fnv"
"io"
@@ -37,7 +36,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -54,7 +52,6 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
v1node "k8s.io/kubernetes/pkg/api/v1/node"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
@@ -150,7 +147,6 @@ type Controller struct {
taintManager *scheduler.NoExecuteTaintManager
podInformerSynced cache.InformerSynced
cloud cloudprovider.Interface
kubeClient clientset.Interface
// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
@@ -179,12 +175,10 @@ type Controller struct {
daemonSetStore extensionslisters.DaemonSetLister
daemonSetInformerSynced cache.InformerSynced
-leaseLister                 coordlisters.LeaseLister
-leaseInformerSynced         cache.InformerSynced
-nodeLister                  corelisters.NodeLister
-nodeInformerSynced          cache.InformerSynced
-nodeExistsInCloudProvider   func(types.NodeName) (bool, error)
-nodeShutdownInCloudProvider func(context.Context, *v1.Node) (bool, error)
+leaseLister         coordlisters.LeaseLister
+leaseInformerSynced cache.InformerSynced
+nodeLister          corelisters.NodeLister
+nodeInformerSynced  cache.InformerSynced
recorder record.EventRecorder
@@ -247,7 +241,6 @@ func NewNodeLifecycleController(
podInformer coreinformers.PodInformer,
nodeInformer coreinformers.NodeInformer,
daemonSetInformer extensionsinformers.DaemonSetInformer,
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
nodeMonitorPeriod time.Duration,
nodeStartupGracePeriod time.Duration,
@@ -280,17 +273,10 @@ func NewNodeLifecycleController(
}
nc := &Controller{
-cloud: cloud,
-kubeClient: kubeClient,
-now: metav1.Now,
-knownNodeSet: make(map[string]*v1.Node),
-nodeHealthMap: make(map[string]*nodeHealthData),
-nodeExistsInCloudProvider: func(nodeName types.NodeName) (bool, error) {
-	return nodeutil.ExistsInCloudProvider(cloud, nodeName)
-},
-nodeShutdownInCloudProvider: func(ctx context.Context, node *v1.Node) (bool, error) {
-	return nodeutil.ShutdownInCloudProvider(ctx, cloud, node)
-},
+kubeClient: kubeClient,
+now: metav1.Now,
+knownNodeSet: make(map[string]*v1.Node),
+nodeHealthMap: make(map[string]*nodeHealthData),
recorder: recorder,
nodeMonitorPeriod: nodeMonitorPeriod,
nodeStartupGracePeriod: nodeStartupGracePeriod,
@@ -779,11 +765,6 @@ func (nc *Controller) monitorNodeHealth() error {
klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
// Remove the shutdown taint. This is always needed, regardless of whether taint-based eviction is used.
err := nc.markNodeAsNotShutdown(node)
if err != nil {
klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
}
}
// Report node event.
@@ -793,42 +774,6 @@ func (nc *Controller) monitorNodeHealth() error {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
if currentReadyCondition.Status != v1.ConditionTrue && nc.cloud != nil {
// Check whether the node has been shut down; if so, do not delete it and add a taint instead.
shutdown, err := nc.nodeShutdownInCloudProvider(context.TODO(), node)
if err != nil {
klog.Errorf("Error determining if node %v shutdown in cloud: %v", node.Name, err)
}
// node shutdown
if shutdown && err == nil {
err = controller.AddOrUpdateTaintOnNode(nc.kubeClient, node.Name, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Error patching node taints: %v", err)
}
continue
}
exists, err := nc.nodeExistsInCloudProvider(types.NodeName(node.Name))
if err != nil {
klog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
continue
}
if !exists {
klog.V(2).Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nodeutil.RecordNodeEvent(nc.recorder, node.Name, string(node.UID), v1.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
go func(nodeName string) {
defer utilruntime.HandleCrash()
// Kubelet is not reporting and Cloud Provider says node
// is gone. Delete it without worrying about grace
// periods.
if err := nodeutil.ForcefullyDeleteNode(nc.kubeClient, nodeName); err != nil {
klog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
}
}(node.Name)
}
}
}
}
nc.handleDisruption(zoneToNodeConditions, nodes)
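The nodeShutdownInCloudProvider and nodeExistsInCloudProvider hooks used in the block removed above wrap nodeutil helpers around the configured cloud provider. As a rough illustration only (not necessarily how the nodeutil helpers are implemented), such checks can be answered from the Instances interface of k8s.io/cloud-provider, keyed by the node's Spec.ProviderID:

```go
package nodelifecyclesketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	cloudprovider "k8s.io/cloud-provider"
)

// shutdownInCloud answers "is this instance shut down?" via the cloud
// provider's Instances interface.
func shutdownInCloud(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
	instances, ok := cloud.Instances()
	if !ok {
		return false, fmt.Errorf("cloud provider does not support instances")
	}
	shutdown, err := instances.InstanceShutdownByProviderID(ctx, node.Spec.ProviderID)
	if err == cloudprovider.NotImplemented {
		// Providers that cannot report shutdown state simply never taint.
		return false, nil
	}
	return shutdown, err
}

// existsInCloud answers "does this instance still exist in the cloud?".
func existsInCloud(ctx context.Context, cloud cloudprovider.Interface, node *v1.Node) (bool, error) {
	instances, ok := cloud.Instances()
	if !ok {
		return false, fmt.Errorf("cloud provider does not support instances")
	}
	return instances.InstanceExistsByProviderID(ctx, node.Spec.ProviderID)
}
```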
@@ -1268,17 +1213,6 @@ func (nc *Controller) markNodeAsReachable(node *v1.Node) (bool, error) {
return nc.zoneNoExecuteTainter[utilnode.GetZoneKey(node)].Remove(node.Name), nil
}
func (nc *Controller) markNodeAsNotShutdown(node *v1.Node) error {
nc.evictorLock.Lock()
defer nc.evictorLock.Unlock()
err := controller.RemoveTaintOffNode(nc.kubeClient, node.Name, node, controller.ShutdownTaint)
if err != nil {
klog.Errorf("Failed to remove taint from node %v: %v", node.Name, err)
return err
}
return nil
}
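The markNodeAsNotShutdown helper removed here is the counterpart to the AddOrUpdateTaintOnNode call in the block removed above: together they add the shutdown taint while the instance is stopped and clear it once the node is Ready again. A condensed sketch of that pairing, using the same helpers and the ShutdownTaint value referenced in the removed lines (function name hypothetical, error handling and locking simplified):

```go
package nodelifecyclesketch

import (
	v1 "k8s.io/api/core/v1"
	clientset "k8s.io/client-go/kubernetes"

	"k8s.io/kubernetes/pkg/controller"
)

// reconcileShutdownTaint is a hypothetical condensation of the two removed
// code paths: taint a node whose cloud instance is shut down so new pods are
// not scheduled onto it, and remove the taint once the node is healthy again.
func reconcileShutdownTaint(kubeClient clientset.Interface, node *v1.Node, shutdown bool) error {
	if shutdown {
		// Matches the AddOrUpdateTaintOnNode call removed from monitorNodeHealth.
		return controller.AddOrUpdateTaintOnNode(kubeClient, node.Name, controller.ShutdownTaint)
	}
	// Matches the removed markNodeAsNotShutdown helper (minus the evictor lock).
	return controller.RemoveTaintOffNode(kubeClient, node.Name, node, controller.ShutdownTaint)
}
```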
// ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,

pkg/controller/nodelifecycle/node_lifecycle_controller_test.go

@@ -17,7 +17,6 @@ limitations under the License.
package nodelifecycle
import (
"context"
"strings"
"testing"
"time"
@@ -28,9 +27,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilfeaturetesting "k8s.io/apiserver/pkg/util/feature/testing"
"k8s.io/client-go/informers"
@@ -40,8 +37,6 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
testcore "k8s.io/client-go/testing"
cloudprovider "k8s.io/cloud-provider"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
"k8s.io/kubernetes/pkg/controller/testutil"
@@ -128,7 +123,6 @@ func (nc *nodeLifecycleController) syncNodeStore(fakeNodeHandler *testutil.FakeN
}
func newNodeLifecycleControllerFromClient(
cloud cloudprovider.Interface,
kubeClient clientset.Interface,
podEvictionTimeout time.Duration,
evictionLimiterQPS float32,
@@ -152,7 +146,6 @@ func newNodeLifecycleControllerFromClient(
factory.Core().V1().Pods(),
nodeInformer,
daemonSetInformer,
cloud,
kubeClient,
nodeMonitorPeriod,
nodeStartupGracePeriod,
@@ -641,7 +634,6 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -802,7 +794,6 @@ func TestPodStatusChange(t *testing.T) {
for _, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -1327,7 +1318,6 @@ func TestMonitorNodeHealthEvictPodsWithDisruption(t *testing.T) {
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: item.podList}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -1392,185 +1382,6 @@ func TestMonitorNodeHealthEvictPodsWithDisruption(t *testing.T) {
}
}
func TestCloudProviderNodeShutdown(t *testing.T) {
testCases := []struct {
testName string
node *v1.Node
shutdown bool
}{
{
testName: "node shutdowned add taint",
shutdown: true,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
{
testName: "node started after shutdown remove taint",
shutdown: false,
node: &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Spec: v1.NodeSpec{
ProviderID: "node0",
Taints: []v1.Taint{
{
Key: schedulerapi.TaintNodeShutdown,
Effect: v1.TaintEffectNoSchedule,
},
},
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionTrue,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{tc.node},
Clientset: fake.NewSimpleClientset(),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fnh,
10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return tc.shutdown, nil
}
if err := nodeController.syncNodeStore(fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(fnh.UpdatedNodes) != 1 {
t.Errorf("Node was not updated")
}
if tc.shutdown {
if len(fnh.UpdatedNodes[0].Spec.Taints) != 1 {
t.Errorf("Node Taint was not added")
}
if fnh.UpdatedNodes[0].Spec.Taints[0].Key != "node.cloudprovider.kubernetes.io/shutdown" {
t.Errorf("Node Taint key is not correct")
}
} else {
if len(fnh.UpdatedNodes[0].Spec.Taints) != 0 {
t.Errorf("Node Taint was not removed after node is back in ready state")
}
}
})
}
}
// TestCloudProviderNoRateLimit tests that monitorNodeHealth() immediately deletes
// pods and the node when kubelet has not reported, and the cloudprovider says
// the node is gone.
func TestCloudProviderNoRateLimit(t *testing.T) {
fnh := &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
{
Type: v1.NodeReady,
Status: v1.ConditionUnknown,
LastHeartbeatTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
LastTransitionTime: metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0"), *testutil.NewPod("pod1", "node0")}}),
DeleteWaitChan: make(chan struct{}),
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fnh,
10*time.Minute,
testRateLimiterQPS,
testRateLimiterQPS,
testLargeClusterThreshold,
testUnhealthyThreshold,
testNodeMonitorGracePeriod,
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.now = func() metav1.Time { return metav1.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
nodeController.recorder = testutil.NewFakeRecorder()
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return false, nil
}
// monitorNodeHealth should allow this node to be immediately deleted
if err := nodeController.syncNodeStore(fnh); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
select {
case <-fnh.DeleteWaitChan:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
}
if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
t.Errorf("Node was not deleted")
}
if nodeOnQueue := nodeController.zonePodEvictor[""].Remove("node0"); nodeOnQueue {
t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
}
}
func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
fakeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct {
@@ -1822,7 +1633,6 @@ func TestMonitorNodeHealthUpdateStatus(t *testing.T) {
}
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@@ -2403,7 +2213,6 @@ func TestMonitorNodeHealthUpdateNodeAndPodStatusWithLease(t *testing.T) {
for _, item := range testcases {
t.Run(item.description, func(t *testing.T) {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@@ -2567,7 +2376,6 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
for i, item := range table {
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
item.fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@@ -2697,7 +2505,6 @@ func TestApplyNoExecuteTaints(t *testing.T) {
}
originalTaint := UnreachableTaintTemplate
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -2838,7 +2645,6 @@ func TestSwapUnreachableNotReadyTaints(t *testing.T) {
updatedTaint := NotReadyTaintTemplate
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -2940,7 +2746,6 @@ func TestTaintsNodeByCondition(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,
@@ -3180,7 +2985,6 @@ func TestNodeEventGeneration(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
5*time.Minute,
testRateLimiterQPS,
@@ -3191,26 +2995,20 @@ func TestNodeEventGeneration(t *testing.T) {
testNodeStartupGracePeriod,
testNodeMonitorPeriod,
false)
nodeController.cloud = &fakecloud.FakeCloud{}
nodeController.nodeExistsInCloudProvider = func(nodeName types.NodeName) (bool, error) {
return false, nil
}
nodeController.nodeShutdownInCloudProvider = func(ctx context.Context, node *v1.Node) (bool, error) {
return false, nil
}
nodeController.now = func() metav1.Time { return fakeNow }
fakeRecorder := testutil.NewFakeRecorder()
nodeController.recorder = fakeRecorder
if err := nodeController.syncNodeStore(fakeNodeHandler); err != nil {
t.Errorf("unexpected error: %v", err)
}
if err := nodeController.monitorNodeHealth(); err != nil {
t.Errorf("unexpected error: %v", err)
}
-if len(fakeRecorder.Events) != 2 {
-	t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.Events), 2, fakeRecorder.Events)
+if len(fakeRecorder.Events) != 1 {
+	t.Fatalf("unexpected events, got %v, expected %v: %+v", len(fakeRecorder.Events), 1, fakeRecorder.Events)
}
-if fakeRecorder.Events[0].Reason != "RegisteredNode" || fakeRecorder.Events[1].Reason != "DeletingNode" {
+if fakeRecorder.Events[0].Reason != "RegisteredNode" {
var reasons []string
for _, event := range fakeRecorder.Events {
reasons = append(reasons, event.Reason)
@@ -3249,7 +3047,6 @@ func TestFixDeprecatedTaintKey(t *testing.T) {
}
nodeController, _ := newNodeLifecycleControllerFromClient(
nil,
fakeNodeHandler,
evictionTimeout,
testRateLimiterQPS,