Remove crash loop detection from the dynamic kubelet config feature

The subfeature was a cool idea, but in the end it is very complex to
separate Kubelet restarts into crash-loops caused by config vs.
crash-loops caused by other phenomena, like admin-triggered node restarts,
kernel panics, and and process babysitter behavior. Dynamic kubelet config
will be better off without the potential for false positives here.

Removing this subfeature also simplifies dynamic configuration by
reducing persistent state:
- we no longer need to track bad config in a file
- we no longer need to track kubelet startups in a file
This commit is contained in:
Michael Taufen
2017-08-22 12:30:20 -07:00
parent e225666813
commit 76c41a252c
22 changed files with 53 additions and 1552 deletions

View File

@@ -56,17 +56,8 @@ type KubeletConfiguration struct {
metav1.TypeMeta
// Only used for dynamic configuration.
// The length of the trial period for this configuration. If the Kubelet records CrashLoopThreshold or
// more startups during this period, the current configuration will be marked bad and the
// Kubelet will roll-back to the last-known-good. Default 10 minutes.
ConfigTrialDuration metav1.Duration
// Only used for dynamic configuration.
// If this number of Kubelet "crashes" during ConfigTrialDuration meets this threshold,
// the configuration fails the trial and the Kubelet rolls back to its last-known-good config.
// Crash-loops are detected by counting Kubelet startups, so one startup is implicitly added
// to this threshold to always allow a single restart per config change.
// Default 10, mimimum allowed is 0, maximum allowed is 10.
CrashLoopThreshold int32
// The length of the trial period for this configuration. This configuration will become the last-known-good after this duration.
ConfigTrialDuration *metav1.Duration
// podManifestPath is the path to the directory containing pod manifests to
// run, or the path to a single manifest file
PodManifestPath string

View File

@@ -53,9 +53,6 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) {
if obj.ConfigTrialDuration == nil {
obj.ConfigTrialDuration = &metav1.Duration{Duration: 10 * time.Minute}
}
if obj.CrashLoopThreshold == nil {
obj.CrashLoopThreshold = utilpointer.Int32Ptr(10)
}
if obj.Authentication.Anonymous.Enabled == nil {
obj.Authentication.Anonymous.Enabled = boolVar(true)
}

View File

@@ -51,17 +51,8 @@ type KubeletConfiguration struct {
metav1.TypeMeta `json:",inline"`
// Only used for dynamic configuration.
// The length of the trial period for this configuration. If the Kubelet records CrashLoopThreshold or
// more startups during this period, the current configuration will be marked bad and the
// Kubelet will roll-back to the last-known-good. Default 10 minutes.
// The length of the trial period for this configuration. This configuration will become the last-known-good after this duration.
ConfigTrialDuration *metav1.Duration `json:"configTrialDuration"`
// Only used for dynamic configuration.
// If this number of Kubelet "crashes" during ConfigTrialDuration meets this threshold,
// the configuration fails the trial and the Kubelet rolls back to its last-known-good config.
// Crash-loops are detected by counting Kubelet startups, so one startup is implicitly added
// to this threshold to always allow a single restart per config change.
// Default 10, mimimum allowed is 0, maximum allowed is 10.
CrashLoopThreshold *int32 `json:"crashLoopThreshold"`
// podManifestPath is the path to the directory containing pod manifests to
// run, or the path to a single manifest file
PodManifestPath string `json:"podManifestPath"`

View File

@@ -142,12 +142,7 @@ func Convert_kubeletconfig_KubeletAuthorization_To_v1alpha1_KubeletAuthorization
}
func autoConvert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfiguration(in *KubeletConfiguration, out *kubeletconfig.KubeletConfiguration, s conversion.Scope) error {
if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.ConfigTrialDuration, &out.ConfigTrialDuration, s); err != nil {
return err
}
if err := v1.Convert_Pointer_int32_To_int32(&in.CrashLoopThreshold, &out.CrashLoopThreshold, s); err != nil {
return err
}
out.ConfigTrialDuration = (*v1.Duration)(unsafe.Pointer(in.ConfigTrialDuration))
out.PodManifestPath = in.PodManifestPath
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
@@ -306,12 +301,7 @@ func Convert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfiguration
}
func autoConvert_kubeletconfig_KubeletConfiguration_To_v1alpha1_KubeletConfiguration(in *kubeletconfig.KubeletConfiguration, out *KubeletConfiguration, s conversion.Scope) error {
if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.ConfigTrialDuration, &out.ConfigTrialDuration, s); err != nil {
return err
}
if err := v1.Convert_int32_To_Pointer_int32(&in.CrashLoopThreshold, &out.CrashLoopThreshold, s); err != nil {
return err
}
out.ConfigTrialDuration = (*v1.Duration)(unsafe.Pointer(in.ConfigTrialDuration))
out.PodManifestPath = in.PodManifestPath
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency

View File

@@ -143,15 +143,6 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
**out = **in
}
}
if in.CrashLoopThreshold != nil {
in, out := &in.CrashLoopThreshold, &out.CrashLoopThreshold
if *in == nil {
*out = nil
} else {
*out = new(int32)
**out = **in
}
}
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
out.HTTPCheckFrequency = in.HTTPCheckFrequency

View File

@@ -23,18 +23,8 @@ import (
containermanager "k8s.io/kubernetes/pkg/kubelet/cm"
)
// MaxCrashLoopThreshold is the maximum allowed KubeletConfiguraiton.CrashLoopThreshold
const MaxCrashLoopThreshold = 10
// ValidateKubeletConfiguration validates `kc` and returns an error if it is invalid
func ValidateKubeletConfiguration(kc *kubeletconfig.KubeletConfiguration) error {
// restrict crashloop threshold to between 0 and `maxCrashLoopThreshold`, inclusive
// more than `maxStartups=maxCrashLoopThreshold` adds unnecessary bloat to the .startups.json file,
// and negative values would be silly.
if kc.CrashLoopThreshold < 0 || kc.CrashLoopThreshold > MaxCrashLoopThreshold {
return fmt.Errorf("field `CrashLoopThreshold` must be between 0 and %d, inclusive", MaxCrashLoopThreshold)
}
if !kc.CgroupsPerQOS && len(kc.EnforceNodeAllocatable) > 0 {
return fmt.Errorf("node allocatable enforcement is not supported unless Cgroups Per QOS feature is turned on")
}

View File

@@ -21,6 +21,7 @@ limitations under the License.
package kubeletconfig
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
conversion "k8s.io/apimachinery/pkg/conversion"
runtime "k8s.io/apimachinery/pkg/runtime"
api "k8s.io/kubernetes/pkg/api"
@@ -124,7 +125,15 @@ func (in *KubeletAuthorization) DeepCopy() *KubeletAuthorization {
func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ConfigTrialDuration = in.ConfigTrialDuration
if in.ConfigTrialDuration != nil {
in, out := &in.ConfigTrialDuration, &out.ConfigTrialDuration
if *in == nil {
*out = nil
} else {
*out = new(v1.Duration)
**out = **in
}
}
out.SyncFrequency = in.SyncFrequency
out.FileCheckFrequency = in.FileCheckFrequency
out.HTTPCheckFrequency = in.HTTPCheckFrequency

View File

@@ -16,17 +16,14 @@ go_library(
deps = [
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/kubeletconfig/validation:go_default_library",
"//pkg/kubelet/kubeletconfig/badconfig:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint:go_default_library",
"//pkg/kubelet/kubeletconfig/checkpoint/store:go_default_library",
"//pkg/kubelet/kubeletconfig/configfiles:go_default_library",
"//pkg/kubelet/kubeletconfig/startups:go_default_library",
"//pkg/kubelet/kubeletconfig/status:go_default_library",
"//pkg/kubelet/kubeletconfig/util/equal:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
"//pkg/kubelet/kubeletconfig/util/panic:go_default_library",
"//pkg/version:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
@@ -49,10 +46,8 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/kubeletconfig/badconfig:all-srcs",
"//pkg/kubelet/kubeletconfig/checkpoint:all-srcs",
"//pkg/kubelet/kubeletconfig/configfiles:all-srcs",
"//pkg/kubelet/kubeletconfig/startups:all-srcs",
"//pkg/kubelet/kubeletconfig/status:all-srcs",
"//pkg/kubelet/kubeletconfig/util/codec:all-srcs",
"//pkg/kubelet/kubeletconfig/util/equal:all-srcs",

View File

@@ -1,47 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_test(
name = "go_default_test",
srcs = [
"badconfig_test.go",
"fstracker_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
],
)
go_library(
name = "go_default_library",
srcs = [
"badconfig.go",
"fstracker.go",
],
deps = [
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@@ -1,83 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package badconfig
import (
"encoding/json"
"fmt"
"time"
)
// Tracker tracks "bad" configurations in a storage layer
type Tracker interface {
// Initialize sets up the storage layer
Initialize() error
// MarkBad marks `uid` as a bad config and records `reason` as the reason for marking it bad
MarkBad(uid, reason string) error
// Entry returns the Entry for `uid` if it exists in the tracker, otherise nil
Entry(uid string) (*Entry, error)
}
// Entry describes when a configuration was marked bad and why
type Entry struct {
Time string `json:"time"`
Reason string `json:"reason"`
}
// markBad makes an entry in `m` for the config with `uid` and reason `reason`
func markBad(m map[string]Entry, uid, reason string) {
now := time.Now()
entry := Entry{
Time: now.Format(time.RFC3339), // use RFC3339 time format
Reason: reason,
}
m[uid] = entry
}
// getEntry returns the Entry for `uid` in `m`, or nil if no such entry exists
func getEntry(m map[string]Entry, uid string) *Entry {
entry, ok := m[uid]
if ok {
return &entry
}
return nil
}
// encode retuns a []byte representation of `m`, for saving `m` to a storage layer
func encode(m map[string]Entry) ([]byte, error) {
data, err := json.Marshal(m)
if err != nil {
return nil, err
}
return data, nil
}
// decode transforms a []byte into a `map[string]Entry`, or returns an error if it can't produce said map
// if `data` is empty, returns an empty map
func decode(data []byte) (map[string]Entry, error) {
// create the map
m := map[string]Entry{}
// if the data is empty, just return the empty map
if len(data) == 0 {
return m, nil
}
// otherwise unmarshal the json
if err := json.Unmarshal(data, &m); err != nil {
return nil, fmt.Errorf("failed to unmarshal, error: %v", err)
}
return m, nil
}

View File

@@ -1,157 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package badconfig
import (
"fmt"
"reflect"
"testing"
"time"
)
func TestMarkBad(t *testing.T) {
// build a map with one entry
m := map[string]Entry{}
uid := "uid"
reason := "reason"
markBad(m, uid, reason)
// the entry should exist for uid
entry, ok := m[uid]
if !ok {
t.Fatalf("expect entry for uid %q, but none exists", uid)
}
// the entry's reason should match the reason it was marked bad with
if entry.Reason != reason {
t.Errorf("expect Entry.Reason %q, but got %q", reason, entry.Reason)
}
// the entry's timestamp should be in RFC3339 format
if err := assertRFC3339(entry.Time); err != nil {
t.Errorf("expect Entry.Time to use RFC3339 format, but got %q, error: %v", entry.Time, err)
}
// it should be the only entry in the map thus far
if n := len(m); n != 1 {
t.Errorf("expect one entry in the map, but got %d", n)
}
}
func TestGetEntry(t *testing.T) {
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
expect := &Entry{
Time: nowstamp,
Reason: "reason",
}
m := map[string]Entry{uid: *expect}
// should return nil for entries that don't exist
bogus := "bogus-uid"
if e := getEntry(m, bogus); e != nil {
t.Errorf("expect nil for entries that don't exist (uid: %q), but got %#v", bogus, e)
}
// should return non-nil for entries that exist
if e := getEntry(m, uid); e == nil {
t.Errorf("expect non-nil for entries that exist (uid: %q), but got nil", uid)
} else if !reflect.DeepEqual(expect, e) {
// entry should match what we inserted for the given UID
t.Errorf("expect entry for uid %q to match %#v, but got %#v", uid, expect, e)
}
}
func TestEncode(t *testing.T) {
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
expect := fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp)
m := map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}}
data, err := encode(m)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
json := string(data)
if json != expect {
t.Errorf("expect encoding of %#v to match %q, but got %q", m, expect, json)
}
}
func TestDecode(t *testing.T) {
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
valid := []byte(fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp))
expect := map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}}
// decoding valid json should result in an object with the correct values
if m, err := decode(valid); err != nil {
t.Errorf("expect decoding valid json %q to produce a map, but got error: %v", valid, err)
} else if !reflect.DeepEqual(expect, m) {
// m should equal expected decoded object
t.Errorf("expect decoding valid json %q to produce %#v, but got %#v", valid, expect, m)
}
// decoding invalid json should return an error
invalid := []byte(`invalid`)
if m, err := decode(invalid); err == nil {
t.Errorf("expect decoding invalid json %q to return an error, but decoded to %#v", invalid, m)
}
}
func TestRoundTrip(t *testing.T) {
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
expect := map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}}
// test that encoding and decoding an object results in the same value
data, err := encode(expect)
if err != nil {
t.Fatalf("failed to encode %#v, error: %v", expect, err)
}
after, err := decode(data)
if err != nil {
t.Fatalf("failed to decode %q, error: %v", string(data), err)
}
if !reflect.DeepEqual(expect, after) {
t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
}
}
func assertRFC3339(s string) error {
tm, err := time.Parse(time.RFC3339, s)
if err != nil {
return fmt.Errorf("expect RFC3339 format, but failed to parse, error: %v", err)
}
// parsing succeeded, now finish round-trip and compare
rt := tm.Format(time.RFC3339)
if rt != s {
return fmt.Errorf("expect RFC3339 format, but failed to round trip unchanged, original %q, round-trip %q", s, rt)
}
return nil
}

View File

@@ -1,105 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package badconfig
import (
"path/filepath"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
const (
badConfigsFile = "bad-configs.json"
)
// fsTracker tracks bad config in the local filesystem
type fsTracker struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// trackingDir is the absolute path to the storage directory for fsTracker
trackingDir string
}
// NewFsTracker returns a new Tracker that will store information in the `trackingDir`
func NewFsTracker(fs utilfs.Filesystem, trackingDir string) Tracker {
return &fsTracker{
fs: fs,
trackingDir: trackingDir,
}
}
func (tracker *fsTracker) Initialize() error {
utillog.Infof("initializing bad config tracking directory %q", tracker.trackingDir)
if err := utilfiles.EnsureDir(tracker.fs, tracker.trackingDir); err != nil {
return err
}
if err := utilfiles.EnsureFile(tracker.fs, filepath.Join(tracker.trackingDir, badConfigsFile)); err != nil {
return err
}
return nil
}
func (tracker *fsTracker) MarkBad(uid, reason string) error {
m, err := tracker.load()
if err != nil {
return err
}
// create the bad config entry in the map
markBad(m, uid, reason)
// save the file
if err := tracker.save(m); err != nil {
return err
}
return nil
}
func (tracker *fsTracker) Entry(uid string) (*Entry, error) {
m, err := tracker.load()
if err != nil {
return nil, err
}
// return the entry, or nil if it doesn't exist
return getEntry(m, uid), nil
}
// load loads the bad-config-tracking file from disk and decodes the map encoding it contains
func (tracker *fsTracker) load() (map[string]Entry, error) {
path := filepath.Join(tracker.trackingDir, badConfigsFile)
// load the file
data, err := tracker.fs.ReadFile(path)
if err != nil {
return nil, err
}
return decode(data)
}
// save replaces the contents of the bad-config-tracking file with the encoding of `m`
func (tracker *fsTracker) save(m map[string]Entry) error {
// encode the map
data, err := encode(m)
if err != nil {
return err
}
// save the file
path := filepath.Join(tracker.trackingDir, badConfigsFile)
if err := utilfiles.ReplaceFile(tracker.fs, path, data); err != nil {
return err
}
return nil
}

View File

@@ -1,255 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package badconfig
import (
"fmt"
"path/filepath"
"reflect"
"testing"
"time"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
const testTrackingDir = "/test-tracking-dir"
// TODO(mtaufen): this file reuses a lot of test code from badconfig_test.go, should consolidate
func newInitializedFakeFsTracker() (*fsTracker, error) {
fs := utilfs.NewFakeFs()
tracker := NewFsTracker(fs, testTrackingDir)
if err := tracker.Initialize(); err != nil {
return nil, err
}
return tracker.(*fsTracker), nil
}
func TestFsTrackerInitialize(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("fsTracker.Initialize() failed with error: %v", err)
}
// check that testTrackingDir exists
_, err = tracker.fs.Stat(testTrackingDir)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", testTrackingDir, err)
}
// check that testTrackingDir contains the badConfigsFile
path := filepath.Join(testTrackingDir, badConfigsFile)
_, err = tracker.fs.Stat(path)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", path, err)
}
}
func TestFsTrackerMarkBad(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
// create a bad config entry in the fs
uid := "uid"
reason := "reason"
tracker.MarkBad(uid, reason)
// load the map from the fs
m, err := tracker.load()
if err != nil {
t.Fatalf("failed to load bad-config data, error: %v", err)
}
// the entry should exist for uid
entry, ok := m[uid]
if !ok {
t.Fatalf("expect entry for uid %q, but none exists", uid)
}
// the entry's reason should match the reason it was marked bad with
if entry.Reason != reason {
t.Errorf("expect Entry.Reason %q, but got %q", reason, entry.Reason)
}
// the entry's timestamp should be in RFC3339 format
if err := assertRFC3339(entry.Time); err != nil {
t.Errorf("expect Entry.Time to use RFC3339 format, but got %q, error: %v", entry.Time, err)
}
// it should be the only entry in the map thus far
if n := len(m); n != 1 {
t.Errorf("expect one entry in the map, but got %d", n)
}
}
func TestFsTrackerEntry(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
// manually save a correct entry to fs
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
expect := &Entry{
Time: nowstamp,
Reason: "reason",
}
m := map[string]Entry{uid: *expect}
err = tracker.save(m)
if err != nil {
t.Fatalf("failed to save bad-config data, error: %v", err)
}
// should return nil for entries that don't exist
bogus := "bogus-uid"
e, err := tracker.Entry(bogus)
if err != nil {
t.Errorf("expect nil for entries that don't exist (uid: %q), but got error: %v", bogus, err)
} else if e != nil {
t.Errorf("expect nil for entries that don't exist (uid: %q), but got %#v", bogus, e)
}
// should return non-nil for entries that exist
e, err = tracker.Entry(uid)
if err != nil {
t.Errorf("expect non-nil for entries that exist (uid: %q), but got error: %v", uid, err)
} else if e == nil {
t.Errorf("expect non-nil for entries that exist (uid: %q), but got nil", uid)
} else if !reflect.DeepEqual(expect, e) {
// entry should match what we inserted for the given UID
t.Errorf("expect entry for uid %q to match %#v, but got %#v", uid, expect, e)
}
}
// TODO(mtaufen): test loading invalid json (see startups/fstracker_test.go for example)
func TestFsTrackerLoad(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
uid := "uid"
nowstamp := time.Now().Format(time.RFC3339)
cases := []struct {
desc string
data []byte
expect map[string]Entry
err string
}{
// empty file
{"empty file", []byte(""), map[string]Entry{}, ""},
// empty map
{"empty map", []byte("{}"), map[string]Entry{}, ""},
// valid json
{"valid json", []byte(fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp)),
map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}}, ""},
// invalid json
{"invalid json", []byte(`*`), map[string]Entry{}, "failed to unmarshal"},
}
for _, c := range cases {
// save a file containing the correct serialization
utilfiles.ReplaceFile(tracker.fs, filepath.Join(testTrackingDir, badConfigsFile), c.data)
// loading valid json should result in an object with the correct values
m, err := tracker.load()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !reflect.DeepEqual(c.expect, m) {
// m should equal expected decoded object
t.Errorf("case %q, expect %#v but got %#v", c.desc, c.expect, m)
}
}
}
func TestFsTrackerSave(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
uid := "uid"
nowstamp := time.Now().Format(time.RFC3339)
cases := []struct {
desc string
m map[string]Entry
expect string
err string
}{
// empty map
{"empty map", map[string]Entry{}, "{}", ""},
// 1-entry map
{"1-entry map",
map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}},
fmt.Sprintf(`{"%s":{"time":"%s","reason":"reason"}}`, uid, nowstamp), ""},
}
for _, c := range cases {
if err := tracker.save(c.m); utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
data, err := tracker.fs.ReadFile(filepath.Join(testTrackingDir, badConfigsFile))
if err != nil {
t.Fatalf("failed to read bad-config file, error: %v", err)
}
json := string(data)
if json != c.expect {
t.Errorf("case %q, expect %q but got %q", c.desc, c.expect, json)
}
}
}
func TestFsTrackerRoundTrip(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
nowstamp := time.Now().Format(time.RFC3339)
uid := "uid"
expect := map[string]Entry{uid: {
Time: nowstamp,
Reason: "reason",
}}
// test that saving and loading an object results in the same value
err = tracker.save(expect)
if err != nil {
t.Fatalf("failed to save bad-config data, error: %v", err)
}
after, err := tracker.load()
if err != nil {
t.Fatalf("failed to load bad-config data, error: %v", err)
}
if !reflect.DeepEqual(expect, after) {
t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
}
}

View File

@@ -75,32 +75,8 @@ func (cc *Controller) syncConfigSource(client clientset.Interface, nodeName stri
// If we get here:
// - there is no need to restart to update the current config
// - there was no error trying to sync configuration
// - if, previously, there was an error trying to sync configuration, we need to update to the correct condition
errfmt := `sync succeeded but unable to clear "failed to sync" message from ConfigOK, error: %v`
currentUID := ""
if currentSource, err := cc.checkpointStore.Current(); err != nil {
utillog.Errorf(errfmt, err)
return
} else if currentSource != nil {
currentUID = currentSource.UID()
}
lkgUID := ""
if lkgSource, err := cc.checkpointStore.LastKnownGood(); err != nil {
utillog.Errorf(errfmt, err)
return
} else if lkgSource != nil {
lkgUID = lkgSource.UID()
}
currentBadReason := ""
if entry, err := cc.badConfigTracker.Entry(currentUID); err != nil {
utillog.Errorf(errfmt, err)
} else if entry != nil {
currentBadReason = entry.Reason
}
cc.configOK.ClearFailedSyncCondition(currentUID, lkgUID, currentBadReason, cc.initConfig != nil)
// - if, previously, there was an error trying to sync configuration, we need to clear that error from the condition
cc.configOK.ClearFailedSyncCondition()
}
// doSyncConfigSource checkpoints and sets the store's current config to the new config or resets config,

View File

@@ -27,12 +27,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/badconfig"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/checkpoint/store"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/configfiles"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/startups"
"k8s.io/kubernetes/pkg/kubelet/kubeletconfig/status"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
@@ -40,10 +37,8 @@ import (
)
const (
badConfigTrackingDir = "bad-config-tracking"
startupTrackingDir = "startup-tracking"
checkpointsDir = "checkpoints"
initConfigDir = "init"
checkpointsDir = "checkpoints"
initConfigDir = "init"
)
// Controller is the controller which, among other things:
@@ -51,7 +46,6 @@ const (
// - checkpoints configuration to disk
// - downloads new configuration from the API server
// - validates configuration
// - monitors for potential crash-loops caused by new configurations
// - tracks the last-known-good configuration, and rolls-back to last-known-good when necessary
// For more information, see the proposal: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/dynamic-kubelet-configuration.md
type Controller struct {
@@ -78,12 +72,6 @@ type Controller struct {
// checkpointStore persists config source checkpoints to a storage layer
checkpointStore store.Store
// badConfigTracker persists bad-config records to a storage layer
badConfigTracker badconfig.Tracker
// startupTracker persists Kubelet startup records, used for crash-loop detection, to a storage layer
startupTracker startups.Tracker
}
// NewController constructs a new Controller object and returns it. Directory paths must be absolute.
@@ -109,14 +97,6 @@ func NewController(initConfigDir string,
dynamicConfig = true
}
// Get the current kubelet version; bad-config and startup-tracking information can be kubelet-version specific,
// e.g. a bug that crash loops an old Kubelet under a given config might be fixed in a new Kubelet or vice-versa,
// validation might be relaxed in a new Kubelet, etc.
// We also don't want a change in a file format to break Kubelet upgrades; this makes sure a new kubelet gets
// a fresh dir to put its config health data in.
// Note that config checkpoints use the api machinery to store ConfigMaps, and thus get file format versioning for free.
kubeletVersion := version.Get().String()
return &Controller{
dynamicConfig: dynamicConfig,
defaultConfig: defaultConfig,
@@ -124,8 +104,6 @@ func NewController(initConfigDir string,
pendingConfigSource: make(chan bool, 1),
configOK: status.NewConfigOKCondition(),
checkpointStore: store.NewFsStore(fs, filepath.Join(dynamicConfigDir, checkpointsDir)),
badConfigTracker: badconfig.NewFsTracker(fs, filepath.Join(dynamicConfigDir, badConfigTrackingDir, kubeletVersion)),
startupTracker: startups.NewFsTracker(fs, filepath.Join(dynamicConfigDir, startupTrackingDir, kubeletVersion)),
initLoader: initLoader,
}, nil
}
@@ -170,11 +148,6 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return nil, err
}
// record the kubelet startup time, used for crashloop detection
if err := cc.startupTracker.RecordStartup(); err != nil {
return nil, err
}
// determine UID of the current config source
curUID := ""
if curSource, err := cc.checkpointStore.Current(); err != nil {
@@ -188,14 +161,6 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
return cc.localConfig(), nil
} // Assert: we will not use the local configurations, unless we roll back to lkg; curUID is non-empty
// check whether the current config is marked bad
if entry, err := cc.badConfigTracker.Entry(curUID); err != nil {
return nil, err
} else if entry != nil {
utillog.Infof("current config %q was marked bad for reason %q at time %q", curUID, entry.Reason, entry.Time)
return cc.lkgRollback(entry.Reason)
}
// TODO(mtaufen): consider re-verifying integrity and re-attempting download when a load/verify/parse/validate
// error happens outside trial period, we already made it past the trial so it's probably filesystem corruption
// or something else scary (unless someone is using a 0-length trial period)
@@ -203,33 +168,26 @@ func (cc *Controller) Bootstrap() (*kubeletconfig.KubeletConfiguration, error) {
// load the current config
checkpoint, err := cc.checkpointStore.Load(curUID)
if err != nil {
// TODO(mtaufen): rollback and mark bad for now, but this could reasonably be handled by re-attempting a download,
// TODO(mtaufen): rollback for now, but this could reasonably be handled by re-attempting a download,
// it probably indicates some sort of corruption
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailLoadReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailLoadReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}
// parse the checkpoint into a KubeletConfiguration
cur, err := checkpoint.Parse()
if err != nil {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailParseReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailParseReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}
// validate current config
if err := validation.ValidateKubeletConfiguration(cur); err != nil {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailValidateReasonFmt, curUID), fmt.Sprintf("error: %v", err))
return cc.lkgRollback(fmt.Sprintf(status.CurFailValidateReasonFmt, curUID), fmt.Sprintf("error: %v", err))
}
// check for crash loops if we're still in the trial period
// when the trial period is over, the current config becomes the last-known-good
if trial, err := cc.inTrial(cur.ConfigTrialDuration.Duration); err != nil {
return nil, err
} else if trial {
if crashing, err := cc.crashLooping(cur.CrashLoopThreshold); err != nil {
return nil, err
} else if crashing {
return cc.badRollback(curUID, fmt.Sprintf(status.CurFailCrashLoopReasonFmt, curUID), "")
}
} else {
// when the trial period is over, the current config becomes the last-known-good
} else if !trial {
if err := cc.graduateCurrentToLastKnownGood(); err != nil {
return nil, err
}
@@ -293,14 +251,6 @@ func (cc *Controller) initialize() error {
if err := cc.checkpointStore.Initialize(); err != nil {
return err
}
// initialize bad config tracker
if err := cc.badConfigTracker.Initialize(); err != nil {
return err
}
// initialize startup tracker
if err := cc.startupTracker.Initialize(); err != nil {
return err
}
return nil
}
@@ -327,21 +277,6 @@ func (cc *Controller) inTrial(trialDur time.Duration) (bool, error) {
return false, nil
}
// crashLooping returns true if the number of startups since the last modification of the current config exceeds `threshold`, false otherwise
func (cc *Controller) crashLooping(threshold int32) (bool, error) {
// determine the last time the current config changed
modTime, err := cc.checkpointStore.CurrentModified()
if err != nil {
return false, err
}
// get the number of startups since that modification time
num, err := cc.startupTracker.StartupsSince(modTime)
if err != nil {
return false, err
}
return num > threshold, nil
}
// graduateCurrentToLastKnownGood sets the last-known-good UID on the checkpointStore
// to the same value as the current UID maintained by the checkpointStore
func (cc *Controller) graduateCurrentToLastKnownGood() error {

View File

@@ -26,19 +26,10 @@ import (
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
// badRollback makes an entry in the bad-config-tracking file for `uid` with `reason`, and returns the result of rolling back to the last-known-good config
func (cc *Controller) badRollback(uid, reason, detail string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Errorf(fmt.Sprintf("%s, %s", reason, detail))
if err := cc.badConfigTracker.MarkBad(uid, reason); err != nil {
return nil, err
}
return cc.lkgRollback(reason)
}
// lkgRollback returns a valid last-known-good configuration, and updates the `cc.configOK` condition
// regarding the `reason` for the rollback, or returns an error if a valid last-known-good could not be produced
func (cc *Controller) lkgRollback(reason string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Infof("rolling back to last-known-good config")
func (cc *Controller) lkgRollback(reason, detail string) (*kubeletconfig.KubeletConfiguration, error) {
utillog.Errorf(fmt.Sprintf("%s, %s", reason, detail))
lkgUID := ""
if lkgSource, err := cc.checkpointStore.LastKnownGood(); err != nil {

View File

@@ -1,48 +0,0 @@
package(default_visibility = ["//visibility:public"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"fstracker.go",
"startups.go",
],
deps = [
"//pkg/kubelet/apis/kubeletconfig/validation:go_default_library",
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/log:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = [
"fstracker_test.go",
"startups_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/kubelet/kubeletconfig/util/files:go_default_library",
"//pkg/kubelet/kubeletconfig/util/filesystem:go_default_library",
"//pkg/kubelet/kubeletconfig/util/test:go_default_library",
],
)

View File

@@ -1,127 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package startups
import (
"encoding/json"
"fmt"
"path/filepath"
"time"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utillog "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/log"
)
const (
startupsFile = "startups.json"
)
// fsTracker tracks startups in the local filesystem
type fsTracker struct {
// fs is the filesystem to use for storage operations; can be mocked for testing
fs utilfs.Filesystem
// trackingDir is the absolute path to the storage directory for fsTracker
trackingDir string
}
// NewFsTracker returns a Tracker that will store information in the `trackingDir`
func NewFsTracker(fs utilfs.Filesystem, trackingDir string) Tracker {
return &fsTracker{
fs: fs,
trackingDir: trackingDir,
}
}
func (tracker *fsTracker) Initialize() error {
utillog.Infof("initializing startups tracking directory %q", tracker.trackingDir)
if err := utilfiles.EnsureDir(tracker.fs, tracker.trackingDir); err != nil {
return err
}
if err := utilfiles.EnsureFile(tracker.fs, filepath.Join(tracker.trackingDir, startupsFile)); err != nil {
return err
}
return nil
}
func (tracker *fsTracker) RecordStartup() error {
// load the file
ls, err := tracker.load()
if err != nil {
return err
}
ls = recordStartup(ls)
// save the file
err = tracker.save(ls)
if err != nil {
return err
}
return nil
}
func (tracker *fsTracker) StartupsSince(t time.Time) (int32, error) {
// load the startups-tracking file
ls, err := tracker.load()
if err != nil {
return 0, err
}
return startupsSince(ls, t)
}
// TODO(mtaufen): refactor into encode/decode like in badconfig.go
// load loads the startups-tracking file from disk
func (tracker *fsTracker) load() ([]string, error) {
path := filepath.Join(tracker.trackingDir, startupsFile)
// load the file
b, err := tracker.fs.ReadFile(path)
if err != nil {
return nil, fmt.Errorf("failed to load startups-tracking file %q, error: %v", path, err)
}
// parse json into the slice
ls := []string{}
// if the file is empty, just return empty slice
if len(b) == 0 {
return ls, nil
}
// otherwise unmarshal the json
if err := json.Unmarshal(b, &ls); err != nil {
return nil, fmt.Errorf("failed to unmarshal json from startups-tracking file %q, error: %v", path, err)
}
return ls, nil
}
// save replaces the contents of the startups-tracking file with `ls`
func (tracker *fsTracker) save(ls []string) error {
// marshal the json
b, err := json.Marshal(ls)
if err != nil {
return err
}
// save the file
path := filepath.Join(tracker.trackingDir, startupsFile)
if err := utilfiles.ReplaceFile(tracker.fs, path, b); err != nil {
return err
}
return nil
}

View File

@@ -1,294 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package startups
import (
"fmt"
"path/filepath"
"reflect"
"testing"
"time"
utilfiles "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/files"
utilfs "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/filesystem"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
const testTrackingDir = "/test-tracking-dir"
// TODO(mtaufen): this file reuses a lot of test code from startups_test.go, should consolidate
func newInitializedFakeFsTracker() (*fsTracker, error) {
fs := utilfs.NewFakeFs()
tracker := NewFsTracker(fs, testTrackingDir)
if err := tracker.Initialize(); err != nil {
return nil, err
}
return tracker.(*fsTracker), nil
}
func TestFsTrackerInitialize(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("tracker.Initialize() failed with error: %v", err)
}
// check that testTrackingDir exists
_, err = tracker.fs.Stat(testTrackingDir)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", testTrackingDir, err)
}
// check that testTrackingDir contains the startupsFile
path := filepath.Join(testTrackingDir, startupsFile)
_, err = tracker.fs.Stat(path)
if err != nil {
t.Fatalf("expect %q to exist, but stat failed with error: %v", path, err)
}
}
func TestFsTrackerRecordStartup(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
now := time.Now()
fullList := func() []string {
ls := []string{}
for i := maxStartups; i > 0; i-- {
// subtract decreasing amounts so timestamps increase but remain in the past
ls = append(ls, now.Add(-time.Duration(i)*time.Second).Format(time.RFC3339))
}
return ls
}()
cases := []struct {
desc string
ls []string
expectHead []string // what we expect the first length-1 elements to look like after recording a new timestamp
expectLen int // how long the list should be after recording
}{
// start empty
{
"start empty",
[]string{},
[]string{},
1,
},
// start non-empty
{
"start non-empty",
// subtract 1 so stamps are in the past
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
2,
},
// rotate list
{
"rotate list",
// make a slice with len == maxStartups, containing monotonically-increasing timestamps
fullList,
fullList[1:],
maxStartups,
},
}
for _, c := range cases {
// save the starting point, record a "startup" time, then load list from fs
if err := tracker.save(c.ls); err != nil {
t.Fatalf("unexpected error: %v", err)
}
if err := tracker.RecordStartup(); err != nil {
t.Fatalf("unexpected error: %v", err)
}
ls, err := tracker.load()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if c.expectLen != len(ls) {
t.Errorf("case %q, expected list %q to have length %d", c.desc, ls, c.expectLen)
}
if !reflect.DeepEqual(c.expectHead, ls[:len(ls)-1]) {
t.Errorf("case %q, expected elements 0 through n-1 of list %q to equal %q", c.desc, ls, c.expectHead)
}
// timestamps should be monotonically increasing (assuming system clock isn't jumping around at least)
if sorted, err := timestampsSorted(ls); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !sorted {
t.Errorf("case %q, expected monotonically increasing timestamps, but got %q", c.desc, ls)
}
}
}
func TestFsTrackerStartupsSince(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
now, err := time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
ls []string
expect int32
err string
}{
// empty list
{"empty list", []string{}, 0, ""},
// no startups since
{
"no startups since",
[]string{"2014-01-02T15:04:05Z", "2015-01-02T15:04:05Z", "2016-01-02T15:04:05Z"},
0,
"",
},
// 2 startups since
{
"some startups since",
[]string{"2016-01-02T15:04:05Z", "2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z"},
2,
"",
},
// all startups since
{
"all startups since",
[]string{"2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z", "2020-01-02T15:04:05Z"},
3,
"",
},
// invalid timestamp
{"invalid timestamp", []string{"2018-01-02T15:04:05Z08:00"}, 0, "failed to parse"},
}
for _, c := range cases {
if err := tracker.save(c.ls); err != nil {
t.Fatalf("unexected error: %v", err)
}
num, err := tracker.StartupsSince(now)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if num != c.expect {
t.Errorf("case %q, expect %d startups but got %d", c.desc, c.expect, num)
}
}
}
func TestFsTrackerLoad(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
nowstamp := time.Now().Format(time.RFC3339)
cases := []struct {
desc string
data []byte
expect []string
err string
}{
// empty file
{"empty file", []byte(""), []string{}, ""},
// empty list
{"empty list", []byte("[]"), []string{}, ""},
// valid json
{"valid json", []byte(fmt.Sprintf(`["%s"]`, nowstamp)), []string{nowstamp}, ""},
// invalid json
{"invalid json", []byte(`*`), []string{}, "failed to unmarshal"},
}
for _, c := range cases {
// save a file containing the correct serialization
utilfiles.ReplaceFile(tracker.fs, filepath.Join(testTrackingDir, startupsFile), c.data)
// loading valid json should result in an object with the correct serialization
ls, err := tracker.load()
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if !reflect.DeepEqual(c.expect, ls) {
// ls should equal expected decoded object
t.Errorf("case %q, expect %#v but got %#v", c.desc, c.expect, ls)
}
}
}
func TestFsTrackerSave(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
nowstamp := time.Now().Format(time.RFC3339)
cases := []struct {
desc string
ls []string
expect string
err string
}{
// empty list
{"empty list", []string{}, "[]", ""},
// 1-entry list
{"valid json", []string{nowstamp}, fmt.Sprintf(`["%s"]`, nowstamp), ""},
}
for _, c := range cases {
if err := tracker.save(c.ls); utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
data, err := tracker.fs.ReadFile(filepath.Join(testTrackingDir, startupsFile))
if err != nil {
t.Fatalf("failed to read startups file, error: %v", err)
}
json := string(data)
if json != c.expect {
t.Errorf("case %q, expect %q but got %q", c.desc, c.expect, json)
}
}
}
func TestFsTrackerRoundTrip(t *testing.T) {
tracker, err := newInitializedFakeFsTracker()
if err != nil {
t.Fatalf("failed to construct a tracker, error: %v", err)
}
nowstamp := time.Now().Format(time.RFC3339)
expect := []string{nowstamp}
// test that saving and loading an object results in the same value
err = tracker.save(expect)
if err != nil {
t.Fatalf("failed to save startups data, error: %v", err)
}
after, err := tracker.load()
if err != nil {
t.Fatalf("failed to load startups data, error: %v", err)
}
if !reflect.DeepEqual(expect, after) {
t.Errorf("expect round-tripping %#v to result in the same value, but got %#v", expect, after)
}
}

View File

@@ -1,69 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package startups
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig/validation"
)
const (
// we allow one extra startup to account for the startup necessary to update configuration
maxStartups = validation.MaxCrashLoopThreshold + 1
)
// Tracker tracks Kubelet startups in a storage layer
type Tracker interface {
// Initialize sets up the storage layer
Initialize() error
// RecordStartup records the current time as a Kubelet startup
RecordStartup() error
// StartupsSince returns the number of Kubelet startus recorded since `t`
StartupsSince(t time.Time) (int32, error)
}
func startupsSince(ls []string, start time.Time) (int32, error) {
// since the list is append-only we only need to count the number of timestamps since `t`
startups := int32(0)
for _, stamp := range ls {
t, err := time.Parse(time.RFC3339, stamp)
if err != nil {
return 0, fmt.Errorf("failed to parse timestamp while counting startups, error: %v", err)
}
if t.After(start) {
startups++
}
}
return startups, nil
}
func recordStartup(ls []string) []string {
// record current time
now := time.Now()
stamp := now.Format(time.RFC3339) // use RFC3339 time format
ls = append(ls, stamp)
// rotate the slice if necessary
if len(ls) > maxStartups {
ls = ls[1:]
}
// return the new slice
return ls
}

View File

@@ -1,157 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package startups
import (
"reflect"
"testing"
"time"
utiltest "k8s.io/kubernetes/pkg/kubelet/kubeletconfig/util/test"
)
func TestRecordStartup(t *testing.T) {
now := time.Now()
fullList := func() []string {
ls := []string{}
for i := maxStartups; i > 0; i-- {
// subtract decreasing amounts so timestamps increase but remain in the past
ls = append(ls, now.Add(-time.Duration(i)*time.Second).Format(time.RFC3339))
}
return ls
}()
cases := []struct {
desc string
ls []string
expectHead []string // what we expect the first length-1 elements to look like after recording a new timestamp
expectLen int // how long the list should be after recording
}{
// start empty
{
"start empty",
[]string{},
[]string{},
1,
},
// start non-empty
{
"start non-empty",
// subtract 1 so stamps are in the past
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
[]string{now.Add(-1 * time.Second).Format(time.RFC3339)},
2,
},
// rotate list
{
"rotate list",
// make a slice with len == maxStartups, containing monotonically-increasing timestamps
fullList,
fullList[1:],
maxStartups,
},
}
for _, c := range cases {
ls := recordStartup(c.ls)
if c.expectLen != len(ls) {
t.Errorf("case %q, expected list %q to have length %d", c.desc, ls, c.expectLen)
}
if !reflect.DeepEqual(c.expectHead, ls[:len(ls)-1]) {
t.Errorf("case %q, expected elements 0 through n-1 of list %q to equal %q", c.desc, ls, c.expectHead)
}
// timestamps should be monotonically increasing (assuming system clock isn't jumping around at least)
if sorted, err := timestampsSorted(ls); err != nil {
t.Fatalf("unexpected error: %v", err)
} else if !sorted {
t.Errorf("case %q, expected monotonically increasing timestamps, but got %q", c.desc, ls)
}
}
}
func TestStartupsSince(t *testing.T) {
now, err := time.Parse(time.RFC3339, "2017-01-02T15:04:05Z")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
cases := []struct {
desc string
ls []string
expect int32
err string
}{
// empty list
{"empty list", []string{}, 0, ""},
// no startups since
{
"no startups since",
[]string{"2014-01-02T15:04:05Z", "2015-01-02T15:04:05Z", "2016-01-02T15:04:05Z"},
0,
"",
},
// 2 startups since
{
"some startups since",
[]string{"2016-01-02T15:04:05Z", "2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z"},
2,
"",
},
// all startups since
{
"all startups since",
[]string{"2018-01-02T15:04:05Z", "2019-01-02T15:04:05Z", "2020-01-02T15:04:05Z"},
3,
"",
},
// invalid timestamp
{"invalid timestamp", []string{"2018-01-02T15:04:05Z08:00"}, 0, "failed to parse"},
}
for _, c := range cases {
num, err := startupsSince(c.ls, now)
if utiltest.SkipRest(t, c.desc, err, c.err) {
continue
}
if num != c.expect {
t.Errorf("case %q, expect %d startups but got %d", c.desc, c.expect, num)
}
}
}
// returns true if the timestamps are monotically increasing, false otherwise
func timestampsSorted(ls []string) (bool, error) {
if len(ls) < 2 {
return true, nil
}
prev, err := time.Parse(time.RFC3339, ls[0])
if err != nil {
return false, err
}
for _, stamp := range ls[1:] {
cur, err := time.Parse(time.RFC3339, stamp)
if err != nil {
return false, err
}
if !cur.After(prev) {
return false, nil
}
prev = cur
}
return true, nil
}

View File

@@ -18,7 +18,6 @@ package status
import (
"fmt"
"strings"
"sync"
"time"
@@ -83,8 +82,8 @@ type ConfigOKCondition interface {
Set(message, reason string, status apiv1.ConditionStatus)
// SetFailedSyncCondition sets the condition for when syncing Kubelet config fails
SetFailedSyncCondition(reason string)
// ClearFailedSyncCondition resets ConfigOKCondition to the correct condition for successfully syncing the kubelet config
ClearFailedSyncCondition(current string, lastKnownGood string, currentBadReason string, initConfig bool)
// ClearFailedSyncCondition clears the overlay from SetFailedSyncCondition
ClearFailedSyncCondition()
// Sync patches the current condition into the Node identified by `nodeName`
Sync(client clientset.Interface, nodeName string)
}
@@ -95,6 +94,8 @@ type configOKCondition struct {
conditionMux sync.Mutex
// condition is the current ConfigOK node condition, which will be reported in the Node.status.conditions
condition *apiv1.NodeCondition
// failedSyncReason is sent in place of the usual reason when the Kubelet is failing to sync the remote config
failedSyncReason string
// pendingCondition; write to this channel to indicate that ConfigOK needs to be synced to the API server
pendingCondition chan bool
}
@@ -142,44 +143,20 @@ func (c *configOKCondition) Set(message, reason string, status apiv1.ConditionSt
// SetFailedSyncCondition updates the ConfigOK status to reflect that we failed to sync to the latest config because we couldn't figure out what
// config to use (e.g. due to a malformed reference, a download failure, etc)
func (c *configOKCondition) SetFailedSyncCondition(reason string) {
c.Set(c.condition.Message, fmt.Sprintf("failed to sync, desired config unclear, reason: %s", reason), apiv1.ConditionUnknown)
}
// ClearFailedSyncCondition resets ConfigOK to the correct condition for the config UIDs
// `current` and `lastKnownGood`, depending on whether current is bad (non-empty `currentBadReason`)
// and whether an init config exists (`initConfig` is true).
func (c *configOKCondition) ClearFailedSyncCondition(current string,
lastKnownGood string,
currentBadReason string,
initConfig bool) {
// since our reason-check relies on c.condition we must manually take the lock and use c.unsafeSet instead of c.Set
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
if strings.Contains(c.condition.Reason, "failed to sync, desired config unclear") {
// if we should report a "current is bad, rolled back" state
if len(currentBadReason) > 0 {
if len(current) == 0 {
if initConfig {
c.unsafeSet(LkgInitMessage, currentBadReason, apiv1.ConditionFalse)
return
}
c.unsafeSet(LkgDefaultMessage, currentBadReason, apiv1.ConditionFalse)
return
}
c.unsafeSet(fmt.Sprintf(LkgRemoteMessageFmt, lastKnownGood), currentBadReason, apiv1.ConditionFalse)
return
}
// if we should report a "current is ok" state
if len(current) == 0 {
if initConfig {
c.unsafeSet(CurInitMessage, CurInitOKReason, apiv1.ConditionTrue)
return
}
c.unsafeSet(CurDefaultMessage, CurDefaultOKReason, apiv1.ConditionTrue)
return
}
c.unsafeSet(fmt.Sprintf(CurRemoteMessageFmt, current), CurRemoteOKReason, apiv1.ConditionTrue)
}
// set the reason overlay and poke the sync worker to send the update
c.failedSyncReason = fmt.Sprintf("failed to sync, desired config unclear, reason: %s", reason)
c.pokeSyncWorker()
}
// ClearFailedSyncCondition removes the "failed to sync" reason overlay
func (c *configOKCondition) ClearFailedSyncCondition() {
c.conditionMux.Lock()
defer c.conditionMux.Unlock()
// clear the reason overlay and poke the sync worker to send the update
c.failedSyncReason = ""
c.pokeSyncWorker()
}
// pokeSyncWorker notes that the ConfigOK condition needs to be synced to the API server
@@ -239,6 +216,16 @@ func (c *configOKCondition) Sync(client clientset.Interface, nodeName string) {
c.condition.LastTransitionTime = remote.LastTransitionTime
}
// overlay the failedSyncReason if necessary
var condition *apiv1.NodeCondition
if len(c.failedSyncReason) > 0 {
// get a copy of the condition before we edit it
condition = c.condition.DeepCopy()
condition.Reason = c.failedSyncReason
} else {
condition = c.condition
}
// generate the patch
mediaType := "application/json"
info, ok := kuberuntime.SerializerInfoForMediaType(api.Codecs.SupportedMediaTypes(), mediaType)