daemonset: Implement MaxSurge on daemonset update

If MaxSurge is set, the controller will attempt to double up nodes,
up to the allowed limit, with a new pod, and then when the newest
(by hash) pod is ready, trigger deletion of the old pod. If the old
pod goes unready before the new pod is ready, the old pod is immediately
deleted. If an old pod goes unready before a new pod is placed on that
node, a new pod is immediately added for that node, even beyond the
MaxSurge limit.
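
To make these rules concrete, here is a minimal, self-contained Go sketch of the per-node decision the paragraph above describes. This is an illustration only, not the controller's code; nodeState, surgeAction, and the returned action strings are hypothetical names.

package main

import "fmt"

// nodeState captures what the controller sees on one node mid-rollout.
type nodeState struct {
	hasOldPod   bool
	oldPodReady bool
	hasNewPod   bool
	newPodReady bool
}

// surgeAction applies the per-node rules described in the commit message.
func surgeAction(n nodeState, surgeBudgetLeft int) string {
	switch {
	case n.hasOldPod && !n.oldPodReady && n.hasNewPod:
		// the old pod went unready while a new pod exists: delete it immediately
		return "delete old pod"
	case n.hasOldPod && !n.oldPodReady && !n.hasNewPod:
		// the old pod went unready before a new pod was placed: create past MaxSurge
		return "create new pod (past MaxSurge)"
	case n.hasOldPod && n.hasNewPod && n.newPodReady:
		// the new pod is ready: retire the old pod
		return "delete old pod"
	case n.hasOldPod && !n.hasNewPod && surgeBudgetLeft > 0:
		// double up this node while surge budget remains
		return "create new pod (consumes surge budget)"
	default:
		return "wait"
	}
}

func main() {
	fmt.Println(surgeAction(nodeState{hasOldPod: true, oldPodReady: true}, 1))
	fmt.Println(surgeAction(nodeState{hasOldPod: true, oldPodReady: true, hasNewPod: true, newPodReady: true}, 0))
}

In the controller itself, "old" versus "new" is decided by the pod's controller-revision-hash label and readiness by the PodReady condition, which is exactly what the tests below manipulate.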

The backoff clock is used consistently throughout the daemonset controller
as an injectable clock for testing purposes.
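
As a side note, the injectable-clock pattern referred to here can be sketched briefly with the same k8s.io/apimachinery/pkg/util/clock package the test file imports. The controller struct and podAge method below are hypothetical stand-ins; clock.RealClock and clock.NewFakeClock are the real API:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// controller stands in for the daemonset controller: all time reads go
// through the injected clock.Clock rather than time.Now directly.
type controller struct {
	clock clock.Clock
}

// podAge is how long ago a pod was created, per the injected clock.
func (c *controller) podAge(created time.Time) time.Duration {
	return c.clock.Since(created)
}

func main() {
	// Production wiring uses the real clock...
	prod := &controller{clock: clock.RealClock{}}
	_ = prod.podAge(time.Now())

	// ...while tests pin time exactly, as the surge tests below do with
	// manager.DaemonSetsController.failedPodsBackoff.Clock.
	fake := clock.NewFakeClock(time.Unix(60, 0))
	c := &controller{clock: fake}
	fmt.Println(c.podAge(time.Unix(50, 0))) // 10s, deterministic
}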
Clayton Coleman
2021-01-27 00:20:56 -05:00
parent 6bac5019aa
commit 18f43e4120
6 changed files with 1190 additions and 68 deletions


@@ -33,9 +33,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage/names"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
@@ -43,10 +45,14 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/daemon/util"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/securitycontext"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)
@@ -429,6 +435,39 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae
		return
	}
	manager.expectations.DeleteExpectations(key)

	now := manager.failedPodsBackoff.Clock.Now()
	hash, _ := currentDSHash(manager, ds)
	// log all the pods in the store
	var lines []string
	for _, obj := range manager.podStore.List() {
		pod := obj.(*v1.Pod)
		if pod.CreationTimestamp.IsZero() {
			pod.CreationTimestamp.Time = now
		}
		var readyLast time.Time
		ready := podutil.IsPodReady(pod)
		if ready {
			if c := podutil.GetPodReadyCondition(pod.Status); c != nil {
				readyLast = c.LastTransitionTime.Time.Add(time.Duration(ds.Spec.MinReadySeconds) * time.Second)
			}
		}
		nodeName, _ := util.GetTargetNodeName(pod)
		lines = append(lines, fmt.Sprintf("node=%s current=%-5t ready=%-5t age=%-4d pod=%s now=%d available=%d",
			nodeName,
			hash == pod.Labels[apps.ControllerRevisionHashLabelKey],
			ready,
			now.Unix()-pod.CreationTimestamp.Unix(),
			pod.Name,
			now.Unix(),
			readyLast.Unix(),
		))
	}
	sort.Strings(lines)
	for _, line := range lines {
		klog.Info(line)
	}
}

func TestDeleteFinalStateUnknown(t *testing.T) {
@@ -3042,3 +3081,237 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string {
	sort.Strings(keys)
	return keys
}

// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
func TestSurgeDealsWithExistingPods(t *testing.T) {
	ds := newDaemonSet("foo")
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1)
	addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2)
	addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5)
	addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2)
	expectSyncDaemonSets(t, manager, ds, podControl, 2, 5, 0)
}
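
// Editor's note: newUpdateSurge is a test helper added elsewhere in this
// commit. Judging from its call sites, it plausibly builds a rolling-update
// strategy with MaxUnavailable=0 and MaxSurge set to the given value; a
// sketch (an assumption, not the commit's exact code):
//
//	func newUpdateSurge(value intstr.IntOrString) apps.DaemonSetUpdateStrategy {
//		zero := intstr.FromInt(0)
//		return apps.DaemonSetUpdateStrategy{
//			Type: apps.RollingUpdateDaemonSetStrategyType,
//			RollingUpdate: &apps.RollingUpdateDaemonSet{
//				MaxUnavailable: &zero,
//				MaxSurge:       &value,
//			},
//		}
//	}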

func TestSurgePreservesReadyOldPods(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)()
	ds := newDaemonSet("foo")
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	// will be preserved because it's the current hash
	pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds)
	pod.CreationTimestamp.Time = time.Unix(100, 0)
	manager.podStore.Add(pod)
	// will be preserved because it's the oldest AND it is ready
	pod = newPod("node-1-old-", "node-1", simpleDaemonSetLabel, ds)
	delete(pod.Labels, apps.ControllerRevisionHashLabelKey)
	pod.CreationTimestamp.Time = time.Unix(50, 0)
	pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(pod)
	// will be deleted because it's not the oldest, even though it is ready
	oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0)
	oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldReadyPod)
	addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1)
	expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0)
	actual := sets.NewString(podControl.DeletePodName...)
	expected := sets.NewString(oldReadyPod.Name)
	if !actual.Equal(expected) {
		t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List())
	}
}

func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)()
	ds := newDaemonSet("foo")
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	// will be preserved because it has the newest hash, and is also consuming the surge budget
	pod := newPod("node-0-", "node-0", simpleDaemonSetLabel, ds)
	pod.CreationTimestamp.Time = time.Unix(100, 0)
	pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}}
	manager.podStore.Add(pod)
	// will be preserved because it is ready
	oldPodReady := newPod("node-0-old-ready-", "node-0", simpleDaemonSetLabel, ds)
	delete(oldPodReady.Labels, apps.ControllerRevisionHashLabelKey)
	oldPodReady.CreationTimestamp.Time = time.Unix(50, 0)
	oldPodReady.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldPodReady)
	// create old ready pods on all other nodes
	for i := 1; i < 5; i++ {
		oldPod := newPod(fmt.Sprintf("node-%d-preserve-", i), fmt.Sprintf("node-%d", i), simpleDaemonSetLabel, ds)
		delete(oldPod.Labels, apps.ControllerRevisionHashLabelKey)
		oldPod.CreationTimestamp.Time = time.Unix(1, 0)
		oldPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
		manager.podStore.Add(oldPod)
		// mark the last old pod as deleted, which should trigger a creation above surge
		if i == 4 {
			thirty := int64(30)
			timestamp := metav1.Time{Time: time.Unix(1+thirty, 0)}
			oldPod.DeletionGracePeriodSeconds = &thirty
			oldPod.DeletionTimestamp = &timestamp
		}
	}
	// controller should detect that node-4 has only a deleted pod
	clearExpectations(t, manager, ds, podControl)
	expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0)
	clearExpectations(t, manager, ds, podControl)
}

func TestSurgeDeletesUnreadyOldPods(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)()
	ds := newDaemonSet("foo")
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	// will be preserved because it has the newest hash
	pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds)
	pod.CreationTimestamp.Time = time.Unix(100, 0)
	manager.podStore.Add(pod)
	// will be deleted because it is unready
	oldUnreadyPod := newPod("node-1-old-unready-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldUnreadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldUnreadyPod.CreationTimestamp.Time = time.Unix(50, 0)
	oldUnreadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}}
	manager.podStore.Add(oldUnreadyPod)
	// will be deleted because it is not the oldest
	oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0)
	oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldReadyPod)
	addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1)
	expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0)
	actual := sets.NewString(podControl.DeletePodName...)
	expected := sets.NewString(oldReadyPod.Name, oldUnreadyPod.Name)
	if !actual.Equal(expected) {
		t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List())
	}
}

func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)()
	ds := newDaemonSet("foo")
	ds.Spec.MinReadySeconds = 15
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	// the clock will be set 10s after the newest pod on node-1 went ready, which is not long enough to be available
	manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+10, 0))
	// will be preserved because it has the newest hash
	pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds)
	pod.CreationTimestamp.Time = time.Unix(100, 0)
	pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}}
	manager.podStore.Add(pod)
	// will be preserved because it is ready AND the newest pod is not yet available for long enough
	oldReadyPod := newPod("node-1-old-ready-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0)
	oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldReadyPod)
	// will be deleted because it is not the oldest
	oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0)
	oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldExcessReadyPod)
	addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1)
	expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0)
	actual := sets.NewString(podControl.DeletePodName...)
	expected := sets.NewString(oldExcessReadyPod.Name)
	if !actual.Equal(expected) {
		t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List())
	}
}

func TestSurgeDeletesOldReadyWithUnsatisfiedMinReady(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)()
	ds := newDaemonSet("foo")
	ds.Spec.MinReadySeconds = 15
	ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1))
	manager, podControl, _, err := newTestController(ds)
	if err != nil {
		t.Fatalf("error creating DaemonSets controller: %v", err)
	}
	manager.dsStore.Add(ds)
	addNodes(manager.nodeStore, 0, 5, nil)
	// the clock will be set 20s after the newest pod on node-1 went ready, which is long enough to be available
	manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+20, 0))
	// will be preserved because it has the newest hash
	pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds)
	pod.CreationTimestamp.Time = time.Unix(100, 0)
	pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}}
	manager.podStore.Add(pod)
	// will be deleted because the newest pod has been available for long enough
	oldReadyPod := newPod("node-1-old-ready-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0)
	oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldReadyPod)
	// will be deleted because it is not the oldest
	oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds)
	delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey)
	oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0)
	oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
	manager.podStore.Add(oldExcessReadyPod)
	addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1)
	expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0)
	actual := sets.NewString(podControl.DeletePodName...)
	expected := sets.NewString(oldExcessReadyPod.Name, oldReadyPod.Name)
	if !actual.Equal(expected) {
		t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List())
	}
}
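
The two MinReady tests hinge on the availability rule: a pod counts as available only once it has been ready for at least MinReadySeconds. A minimal sketch of that check, with isAvailable as a hypothetical helper rather than the podutil function the file actually uses:

package main

import (
	"fmt"
	"time"
)

// isAvailable reports whether a pod that became ready at readySince is
// available at time now, given the DaemonSet's MinReadySeconds.
func isAvailable(ready bool, readySince time.Time, minReadySeconds int32, now time.Time) bool {
	if !ready {
		return false
	}
	return !readySince.Add(time.Duration(minReadySeconds) * time.Second).After(now)
}

func main() {
	readySince := time.Unix(50, 0)
	// clock at 50+10 with MinReadySeconds=15: the new pod is not yet available,
	// so the old ready pod is preserved
	fmt.Println(isAvailable(true, readySince, 15, time.Unix(60, 0))) // false
	// clock at 50+20: the new pod is available, so the old ready pod is deleted
	fmt.Println(isAvailable(true, readySince, 15, time.Unix(70, 0))) // true
}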