Merge pull request #96441 from smarterclayton/daemonset_surge_impl

DaemonSet controller respects MaxSurge during update
This commit is contained in:
Kubernetes Prow Robot
2021-03-06 08:23:42 -08:00
committed by GitHub
7 changed files with 1344 additions and 460 deletions

View File

@@ -17,10 +17,14 @@ limitations under the License.
package apps
import (
"bytes"
"context"
"fmt"
"math/rand"
"reflect"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/onsi/ginkgo"
@@ -32,6 +36,8 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
@@ -482,8 +488,334 @@ var _ = SIGDescribe("Daemon set [Serial]", func() {
framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("unexpected pod %s be restarted", pod.Name))
}
})
// TODO: This test is expected to be promoted to conformance after the feature is promoted
ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate [Feature:DaemonSetUpdateSurge]", func() {
label := map[string]string{daemonsetNameLabel: dsName}
framework.Logf("Creating surge daemon set %s", dsName)
maxSurgeOverlap := 60 * time.Second
maxSurge := 1
surgePercent := intstr.FromString("20%")
zero := intstr.FromInt(0)
oldVersion := "1"
ds := newDaemonSet(dsName, image, label)
ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{
{Name: "VERSION", Value: oldVersion},
}
// delay shutdown by 15s to allow containers to overlap in time
ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{
PreStop: &v1.Handler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sh", "-c", "sleep 15"},
},
},
}
// use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready)
ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
Handler: v1.Handler{
Exec: &v1.ExecAction{
Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`},
},
},
InitialDelaySeconds: 7,
PeriodSeconds: 3,
SuccessThreshold: 1,
FailureThreshold: 1,
}
// use a simple surge strategy
ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{
Type: appsv1.RollingUpdateDaemonSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateDaemonSet{
MaxUnavailable: &zero,
MaxSurge: &surgePercent,
},
}
// The pod must be ready for at least 10s before we delete the old pod
ds.Spec.MinReadySeconds = 10
ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods launch on every node of the cluster.")
err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(c, ns, label, 1)
cur := curHistory(listDaemonHistories(c, ns, label), ds)
hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(1))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
newVersion := "2"
ginkgo.By("Update daemon pods environment var")
patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion)
ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
framework.ExpectNoError(err)
// Time to complete the rolling upgrade is proportional to the number of nodes in the cluster.
// Get the number of nodes, and set the timeout appropriately.
nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err)
nodeCount := len(nodes.Items)
retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second
ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout")
ageOfOldPod := make(map[string]time.Time)
deliberatelyDeletedPods := sets.NewString()
err = wait.PollImmediate(dsRetryPeriod, retryTimeout, func() (bool, error) {
podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return false, err
}
pods := podList.Items
var buf bytes.Buffer
pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0)
fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n")
now := time.Now()
podUIDs := sets.NewString()
deletedPodUIDs := sets.NewString()
nodes := sets.NewString()
versions := sets.NewString()
nodesToVersions := make(map[string]map[string]int)
nodesToDeletedVersions := make(map[string]map[string]int)
var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
nodeName := pod.Spec.NodeName
nodes.Insert(nodeName)
podVersion := pod.Spec.Containers[0].Env[0].Value
if pod.DeletionTimestamp != nil {
if !deliberatelyDeletedPods.Has(string(pod.UID)) {
versions := nodesToDeletedVersions[nodeName]
if versions == nil {
versions = make(map[string]int)
nodesToDeletedVersions[nodeName] = versions
}
versions[podVersion]++
}
} else {
versions := nodesToVersions[nodeName]
if versions == nil {
versions = make(map[string]int)
nodesToVersions[nodeName] = versions
}
versions[podVersion]++
}
ready := podutil.IsPodAvailable(&pod, ds.Spec.MinReadySeconds, metav1.Now())
if podVersion == newVersion {
surgeCount++
if !ready || pod.DeletionTimestamp != nil {
if deliberatelyDeletedPods.Has(string(pod.UID)) {
newDeliberatelyDeletedCount++
}
newUnavailableCount++
}
} else {
if !ready || pod.DeletionTimestamp != nil {
oldUnavailableCount++
}
}
fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready)
}
// print a stable sorted list of pods by node for debugging
pw.Flush()
lines := strings.Split(buf.String(), "\n")
lines = lines[:len(lines)-1]
sort.Strings(lines[1:])
for _, line := range lines {
framework.Logf("%s", line)
}
// if there is an old and new pod at the same time, record a timestamp
deletedPerNode := make(map[string]int)
for _, pod := range pods {
if !metav1.IsControlledBy(&pod, ds) {
continue
}
// ignore deleted pods
if pod.DeletionTimestamp != nil {
deletedPodUIDs.Insert(string(pod.UID))
if !deliberatelyDeletedPods.Has(string(pod.UID)) {
deletedPerNode[pod.Spec.NodeName]++
}
continue
}
podUIDs.Insert(string(pod.UID))
podVersion := pod.Spec.Containers[0].Env[0].Value
if podVersion == newVersion {
continue
}
// if this is a pod in an older version AND there is a new version of this pod, record when
// we started seeing this, otherwise delete the record (perhaps the node was drained)
if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 {
if _, ok := ageOfOldPod[string(pod.UID)]; !ok {
ageOfOldPod[string(pod.UID)] = now
}
} else {
delete(ageOfOldPod, string(pod.UID))
}
}
// purge the old pods list of any deleted pods
for uid := range ageOfOldPod {
if !podUIDs.Has(uid) {
delete(ageOfOldPod, uid)
}
}
deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs)
for _, versions := range nodesToVersions {
if versions[oldVersion] == 0 {
nodesWithoutOldVersion++
}
}
var errs []string
// invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving
for node, count := range deletedPerNode {
if count > 1 {
errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count))
}
}
// invariant: the controller must react to the new pod becoming ready within a reasonable timeframe (2x grace period)
for uid, firstSeen := range ageOfOldPod {
if now.Sub(firstSeen) > maxSurgeOverlap {
errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap))
}
}
// invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or
// if we deliberately deleted one of the new pods
if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) {
errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion))
}
// invariant: the total number of versions created should be 2
if versions.Len() > 2 {
errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len()))
}
for _, node := range nodes.List() {
// ignore pods that haven't been scheduled yet
if len(node) == 0 {
continue
}
versionCount := make(map[string]int)
// invariant: surge should never have more than one instance of a pod per node running
for version, count := range nodesToVersions[node] {
if count > 1 {
errs = append(errs, fmt.Sprintf("node %s has %d instances of version %s running simultaneously, must have max 1", node, count, version))
}
versionCount[version] += count
}
// invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted)
for version, count := range nodesToDeletedVersions[node] {
if count > 2 {
errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 1", node, count, version))
}
versionCount[version] += count
}
// invariant: on any node, we should never have more than two instances of a version (if we are getting evicted)
for version, count := range versionCount {
if count > 2 {
errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version))
}
}
}
if len(errs) > 0 {
sort.Strings(errs)
return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n"))
}
// Make sure every daemon pod on the node has been updated
nodeNames := schedulableNodes(c, ds)
for _, node := range nodeNames {
switch {
case
// if we don't have the new version yet
nodesToVersions[node][newVersion] == 0,
// if there are more than one version on a node
len(nodesToVersions[node]) > 1,
// if there are still any deleted pods
len(nodesToDeletedVersions[node]) > 0,
// if any of the new pods are unavailable
newUnavailableCount > 0:
// inject a failure randomly to ensure the controller recovers
switch rand.Intn(25) {
// cause a random old pod to go unready
case 0:
// select a not-deleted pod of the old version
if pod := randomPod(pods, func(pod *v1.Pod) bool {
return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value
}); pod != nil {
// make the /tmp/ready file read only, which will cause readiness to fail
if _, err := framework.RunKubectl(pod.Namespace, "exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil {
framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err)
} else {
framework.Logf("Marked old pod %s as unready", pod.Name)
}
}
case 1:
// delete a random pod
if pod := randomPod(pods, func(pod *v1.Pod) bool {
return pod.DeletionTimestamp == nil
}); pod != nil {
if err := c.CoreV1().Pods(ds.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil {
framework.Logf("Failed to delete pod %s early: %v", pod.Name, err)
} else {
framework.Logf("Deleted pod %s prematurely", pod.Name)
deliberatelyDeletedPods.Insert(string(pod.UID))
}
}
}
// then wait
return false, nil
}
}
return true, nil
})
framework.ExpectNoError(err)
ginkgo.By("Check that daemon pods are still running on every node of the cluster.")
err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds))
framework.ExpectNoError(err, "error waiting for daemon pod to start")
// Check history and labels
ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
waitForHistoryCreated(c, ns, label, 2)
cur = curHistory(listDaemonHistories(c, ns, label), ds)
hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey]
framework.ExpectEqual(cur.Revision, int64(2))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash)
})
})
// randomPod selects a random pod within pods that causes fn to return true, or nil
// if no pod can be found matching the criteria.
func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod {
podCount := len(pods)
for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ {
pod := &pods[(offset+i)%podCount]
if fn(pod) {
return pod
}
}
return nil
}
// getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image
func getDaemonSetImagePatch(containerName, containerImage string) string {
return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)