Merge pull request #12648 from samsabed/crashloop
back off restarts of crashlooping containers
@@ -24,6 +24,7 @@ import (
 	"time"

 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/volume"
 )
@@ -151,7 +152,7 @@ func (f *FakeRuntime) GetPods(all bool) ([]*Pod, error) {
 	return f.PodList, f.Err
 }

-func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secret) error {
+func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secret, backOff *util.Backoff) error {
 	f.Lock()
 	defer f.Unlock()

@@ -17,6 +17,7 @@ limitations under the License.
 package container

 import (
+	"errors"
 	"fmt"
 	"io"
 	"reflect"
@@ -24,9 +25,13 @@ import (

 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/volume"
 )

+// ErrCrashLoopBackOff is returned when a container has terminated and the kubelet is backing off the restart.
+var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
+
 type Version interface {
 	// Compare compares two versions of the runtime. On success it returns -1
 	// if the version is less than the other, 1 if it is greater than the other,
@@ -53,7 +58,7 @@ type Runtime interface {
 	// exited and dead containers (used for garbage collection).
 	GetPods(all bool) ([]*Pod, error)
 	// Syncs the running pod into the desired pod.
-	SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
+	SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error
 	// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
 	KillPod(pod *api.Pod, runningPod Pod) error
 	// GetPodStatus retrieves the status of the pod, including the information of

@@ -233,14 +233,15 @@ func (c DockerContainers) FindPodContainer(podFullName string, uid types.UID, co
 const containerNamePrefix = "k8s"

 // Creates a name which can be reversed to identify both full pod name and container name.
-func BuildDockerName(dockerName KubeletContainerName, container *api.Container) string {
+func BuildDockerName(dockerName KubeletContainerName, container *api.Container) (string, string) {
 	containerName := dockerName.ContainerName + "." + strconv.FormatUint(kubecontainer.HashContainer(container), 16)
-	return fmt.Sprintf("%s_%s_%s_%s_%08x",
+	stableName := fmt.Sprintf("%s_%s_%s_%s",
 		containerNamePrefix,
 		containerName,
 		dockerName.PodFullName,
-		dockerName.PodUID,
-		rand.Uint32())
+		dockerName.PodUID)
+
+	return stableName, fmt.Sprintf("%s_%08x", stableName, rand.Uint32())
 }

 // Unpacks a container name, returning the pod full name and container name we would have used to

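To make the split concrete, here is a small standalone sketch (not the kubelet's code; the hash and pod values are invented) of the two names the changed BuildDockerName returns: a stable name that stays the same across restarts and is used later in this change to key the restart back-off, and the same name plus a random suffix used as the actual docker container name.

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// Mirrors the format strings in the hunk above; the concrete values are made up.
	containerName := "bad.5678abcd" // <container name>.<hash of the container spec>
	podFullName := "podfoo_nsnew"   // <pod name>_<namespace>
	podUID := "12345678"

	stableName := fmt.Sprintf("k8s_%s_%s_%s", containerName, podFullName, podUID)
	uniqueName := fmt.Sprintf("%s_%08x", stableName, rand.Uint32())

	fmt.Println(stableName) // constant across restarts; keys the back-off state
	fmt.Println(uniqueName) // unique per start; passed to docker as the container name
}
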
@@ -100,7 +100,7 @@ func verifyPackUnpack(t *testing.T, podNamespace, podUID, podName, containerName
 	util.DeepHashObject(hasher, *container)
 	computedHash := uint64(hasher.Sum32())
 	podFullName := fmt.Sprintf("%s_%s", podName, podNamespace)
-	name := BuildDockerName(KubeletContainerName{podFullName, types.UID(podUID), container.Name}, container)
+	_, name := BuildDockerName(KubeletContainerName{podFullName, types.UID(podUID), container.Name}, container)
 	returned, hash, err := ParseDockerName(name)
 	if err != nil {
 		t.Errorf("Failed to parse Docker container name %q: %v", name, err)

@@ -468,10 +468,15 @@ func (dm *DockerManager) GetPodStatus(pod *api.Pod) (*api.PodStatus, error) {
 		}
 	}

-	// Handle the containers for which we cannot find any associated active or
-	// dead docker containers.
+	// Handle the containers for which we cannot find any associated active or dead docker containers, or which are in restart backoff.
 	for _, container := range manifest.Containers {
-		if _, found := statuses[container.Name]; found {
+		if containerStatus, found := statuses[container.Name]; found {
+			reason, ok := dm.reasonCache.Get(uid, container.Name)
+			if ok && reason == kubecontainer.ErrCrashLoopBackOff.Error() {
+				containerStatus.LastTerminationState = containerStatus.State
+				containerStatus.State.Waiting = &api.ContainerStateWaiting{Reason: reason}
+				containerStatus.State.Running = nil
+			}
 			continue
 		}
 		var containerStatus api.ContainerStatus

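For illustration only (field values invented), this is roughly the status shape GetPodStatus now reports for a container whose cached reason is CrashLoopBackOff: whatever state was previously reported is preserved as the last termination state, and the current state becomes Waiting with the back-off reason.

package example

import "k8s.io/kubernetes/pkg/api"

// backedOff sketches the reported status for a backed-off container; not part of the change above.
var backedOff = api.ContainerStatus{
	Name: "bad",
	// The state that was previously reported is kept as the last termination state.
	LastTerminationState: api.ContainerState{
		Terminated: &api.ContainerStateTerminated{ExitCode: 42},
	},
	// The current state is rewritten to Waiting with the back-off reason.
	State: api.ContainerState{
		Waiting: &api.ContainerStateWaiting{Reason: "CrashLoopBackOff"},
	},
}
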
@@ -634,8 +639,9 @@ func (dm *DockerManager) runContainer(
 		// of CPU shares.
 		cpuShares = milliCPUToShares(cpuRequest.MilliValue())
 	}
+	_, containerName := BuildDockerName(dockerName, container)
 	dockerOpts := docker.CreateContainerOptions{
-		Name: BuildDockerName(dockerName, container),
+		Name: containerName,
 		Config: &docker.Config{
 			Env:          makeEnvList(opts.Envs),
 			ExposedPorts: exposedPorts,

@@ -1641,7 +1647,7 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
 }

 // Sync the running pod to match the specified desired pod.
-func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
+func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
 	start := time.Now()
 	defer func() {
 		metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
@@ -1707,6 +1713,13 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 	// Start everything
 	for idx := range containerChanges.ContainersToStart {
 		container := &pod.Spec.Containers[idx]
+
+		// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons,
+		// so ignore the backoff in that case.
+		if !containerChanges.StartInfraContainer && dm.doBackOff(pod, container, podStatus, backOff) {
+			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, podFullName)
+			continue
+		}
 		glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
 		err := dm.imagePuller.PullImage(pod, container, pullSecrets)
 		dm.updateReasonCache(pod, container, err)

@@ -1799,3 +1812,43 @@ func getUidFromUser(id string) string {
 	// no gid, just return the id
 	return id
 }
+
+func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus api.PodStatus, backOff *util.Backoff) bool {
+	var ts util.Time
+	for _, containerStatus := range podStatus.ContainerStatuses {
+		if containerStatus.Name != container.Name {
+			continue
+		}
+		// first failure
+		if containerStatus.State.Terminated != nil {
+			ts = containerStatus.State.Terminated.FinishedAt
+			break
+		}
+		// state is waiting and the failure timestamp is in LastTerminationState
+		if (containerStatus.State.Waiting != nil) && (containerStatus.LastTerminationState.Terminated != nil) {
+			ts = containerStatus.LastTerminationState.Terminated.FinishedAt
+			break
+		}
+	}
+
+	// found a container that requires backoff
+	if !ts.IsZero() {
+		dockerName := KubeletContainerName{
+			PodFullName:   kubecontainer.GetPodFullName(pod),
+			PodUID:        pod.UID,
+			ContainerName: container.Name,
+		}
+		stableName, _ := BuildDockerName(dockerName, container)
+		if backOff.IsInBackOffSince(stableName, ts.Time) {
+			if ref, err := kubecontainer.GenerateContainerRef(pod, container); err == nil {
+				dm.recorder.Eventf(ref, "Backoff", "Back-off restarting failed docker container")
+			}
+			dm.updateReasonCache(pod, container, kubecontainer.ErrCrashLoopBackOff)
+			glog.Infof("Back-off %s restarting failed container=%s pod=%s", backOff.Get(stableName), container.Name, kubecontainer.GetPodFullName(pod))
+			return true
+		}
+		backOff.Next(stableName, ts.Time)
+	}
+	dm.clearReasonCache(pod, container)
+	return false
+}

@@ -844,7 +844,7 @@ func generatePodInfraContainerHash(pod *api.Pod) uint64 {

 // runSyncPod is a helper function to retrieve the running pods from the fake
 // docker client and runs SyncPod for the given pod.
-func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod) {
+func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, pod *api.Pod, backOff *util.Backoff) {
 	runningPods, err := dm.GetPods(false)
 	if err != nil {
 		t.Fatalf("unexpected error: %v", err)
@@ -855,7 +855,10 @@ func runSyncPod(t *testing.T, dm *DockerManager, fakeDocker *FakeDockerClient, p
 		t.Errorf("unexpected error: %v", err)
 	}
 	fakeDocker.ClearCalls()
-	err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{})
+	if backOff == nil {
+		backOff = util.NewBackOff(time.Second, time.Minute)
+	}
+	err = dm.SyncPod(pod, runningPod, *podStatus, []api.Secret{}, backOff)
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}

@@ -878,7 +881,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
 		"create", "start", "inspect_container",
@@ -926,7 +929,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
@@ -978,7 +981,7 @@ func TestSyncPodWithPodInfraCreatesContainer(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Inspect pod infra container (but does not create)
@@ -1017,7 +1020,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Kill the container since pod infra container is not running.
@@ -1090,7 +1093,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1144,7 +1147,7 @@ func TestSyncPodBadHash(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1202,7 +1205,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1259,7 +1262,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1292,7 +1295,7 @@ func TestSyncPodWithPullPolicy(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	fakeDocker.Lock()

@@ -1456,7 +1459,7 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
 		fakeDocker.ContainerMap = containerMap
 		pod.Spec.RestartPolicy = tt.policy

-		runSyncPod(t, dm, fakeDocker, pod)
+		runSyncPod(t, dm, fakeDocker, pod, nil)

 		// 'stop' is because the pod infra container is killed when no container is running.
 		verifyCalls(t, fakeDocker, tt.calls)
@@ -1575,7 +1578,7 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
@@ -1603,6 +1606,123 @@ func TestGetPodStatusWithLastTermination(t *testing.T) {
 	}
 }

+func TestSyncPodBackoff(t *testing.T) {
+	var fakeClock = &util.FakeClock{Time: time.Now()}
+	startTime := fakeClock.Now()
+
+	dm, fakeDocker := newTestDockerManager()
+	containers := []api.Container{
+		{Name: "good"},
+		{Name: "bad"},
+	}
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			UID:       "12345678",
+			Name:      "podfoo",
+			Namespace: "nsnew",
+		},
+		Spec: api.PodSpec{
+			Containers: containers,
+		},
+	}
+
+	containerList := []docker.APIContainers{
+		// format is k8s_<container-id>_<pod-fullname>_<pod-uid>_<random>
+		{
+			// pod infra container
+			Names: []string{"/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_podfoo_nsnew_12345678_0"},
+			ID:    "9876",
+		},
+		{
+			Names: []string{"/k8s_good." + strconv.FormatUint(kubecontainer.HashContainer(&containers[0]), 16) + "_podfoo_nsnew_12345678_0"},
+			ID:    "1234",
+		},
+	}
+
+	exitedAPIContainers := []docker.APIContainers{
+		{
+			// format is k8s_<container-id>_<pod-fullname>_<pod-uid>
+			Names: []string{"/k8s_bad." + strconv.FormatUint(kubecontainer.HashContainer(&containers[1]), 16) + "_podfoo_nsnew_12345678_0"},
+			ID:    "5678",
+		},
+	}
+	stableId := "k8s_bad." + strconv.FormatUint(kubecontainer.HashContainer(&containers[1]), 16) + "_podfoo_nsnew_12345678"
+	containerMap := map[string]*docker.Container{
+		"9876": {
+			ID:         "9876",
+			Name:       "POD",
+			Config:     &docker.Config{},
+			HostConfig: &docker.HostConfig{},
+			State: docker.State{
+				StartedAt: startTime,
+				Running:   true,
+			},
+		},
+		"1234": {
+			ID:         "1234",
+			Name:       "good",
+			Config:     &docker.Config{},
+			HostConfig: &docker.HostConfig{},
+			State: docker.State{
+				StartedAt: startTime,
+				Running:   true,
+			},
+		},
+		"5678": {
+			ID:         "5678",
+			Name:       "bad",
+			Config:     &docker.Config{},
+			HostConfig: &docker.HostConfig{},
+			State: docker.State{
+				ExitCode:   42,
+				StartedAt:  startTime,
+				FinishedAt: fakeClock.Now(),
+			},
+		},
+	}
+
+	startCalls := []string{"inspect_container", "create", "start", "inspect_container"}
+	backOffCalls := []string{"inspect_container"}
+	tests := []struct {
+		tick      int
+		backoff   int
+		killDelay int
+		result    []string
+	}{
+		{1, 1, 1, startCalls},
+		{2, 2, 2, startCalls},
+		{3, 2, 3, backOffCalls},
+		{4, 4, 4, startCalls},
+		{5, 4, 5, backOffCalls},
+		{6, 4, 6, backOffCalls},
+		{7, 4, 7, backOffCalls},
+		{8, 8, 129, startCalls},
+		{130, 1, 0, startCalls},
+	}
+
+	backOff := util.NewBackOff(time.Second, time.Minute)
+	backOff.Clock = fakeClock
+	for _, c := range tests {
+		fakeDocker.ContainerMap = containerMap
+		fakeDocker.ExitedContainerList = exitedAPIContainers
+		fakeDocker.ContainerList = containerList
+		fakeClock.Time = startTime.Add(time.Duration(c.tick) * time.Second)
+
+		runSyncPod(t, dm, fakeDocker, pod, backOff)
+		verifyCalls(t, fakeDocker, c.result)
+
+		if backOff.Get(stableId) != time.Duration(c.backoff)*time.Second {
+			t.Errorf("At tick %s expected backoff=%s got=%s", time.Duration(c.tick)*time.Second, time.Duration(c.backoff)*time.Second, backOff.Get(stableId))
+		}
+
+		if len(fakeDocker.Created) > 0 {
+			// pretend to kill the container
+			fakeDocker.Created = nil
+			containerMap["5678"].State.FinishedAt = startTime.Add(time.Duration(c.killDelay) * time.Second)
+		}
+	}
+}
+
 func TestGetPodCreationFailureReason(t *testing.T) {
 	dm, fakeDocker := newTestDockerManager()

@@ -1639,7 +1759,7 @@ func TestGetPodCreationFailureReason(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)
 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
 	if err != nil {
@@ -1695,7 +1815,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)
 	// Check if we can retrieve the pod status.
 	status, err := dm.GetPodStatus(pod)
 	if err != nil {
@@ -1839,7 +1959,7 @@ func TestSyncPodWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1902,7 +2022,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Check the pod infra container.
@@ -1969,7 +2089,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)
 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.
 		"create", "start", "inspect_container",
@@ -2007,7 +2127,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
 		},
 	}

-	runSyncPod(t, dm, fakeDocker, pod)
+	runSyncPod(t, dm, fakeDocker, pod, nil)

 	verifyCalls(t, fakeDocker, []string{
 		// Create pod infra container.

@@ -79,6 +79,9 @@ const (

 	// Location of container logs.
 	containerLogsDir = "/var/log/containers"
+
+	// max backoff period
+	maxContainerBackOff = 300 * time.Second
 )

 var (
@@ -373,6 +376,7 @@ func NewMainKubelet(
 		}
 	}

+	klet.backOff = util.NewBackOff(resyncInterval, maxContainerBackOff)
 	return klet, nil
 }

@@ -525,6 +529,9 @@ type Kubelet struct {

 	// Monitor Kubelet's sync loop
 	syncLoopMonitor util.AtomicValue
+
+	// Container restart Backoff
+	backOff *util.Backoff
 }

 // getRootDir returns the full path to the directory under which kubelet can
@@ -1258,7 +1265,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 		return err
 	}

-	err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets)
+	err = kl.containerRuntime.SyncPod(pod, runningPod, podStatus, pullSecrets, kl.backOff)
 	if err != nil {
 		return err
 	}
@@ -1560,6 +1567,7 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
 		glog.Errorf("Failed to cleanup terminated pods: %v", err)
 	}

+	kl.backOff.GC()
 	return err
 }

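For intuition about the effect of wiring NewBackOff(resyncInterval, maxContainerBackOff) into the kubelet: each consecutive crash roughly doubles the wait before the next restart attempt, capped at 300 seconds. A standalone sketch of that schedule, assuming a 10-second resync interval (an assumption for the example, not a value taken from this change):

package main

import (
	"fmt"
	"time"
)

func main() {
	initial := 10 * time.Second // assumed resync interval; not part of this change
	max := 300 * time.Second    // maxContainerBackOff from the hunk above

	delay := initial
	for i := 1; i <= 8; i++ {
		fmt.Printf("restart %d: wait %s\n", i, delay)
		delay *= 2
		if delay > max {
			delay = max // Backoff.Next caps the delay at maxDuration
		}
	}
	// Prints: 10s, 20s, 40s, 1m20s, 2m40s, 5m0s, 5m0s, 5m0s
}
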
@@ -129,6 +129,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.volumeManager = newVolumeManager()
 	kubelet.containerManager, _ = newContainerManager(mockCadvisor, "", "", "")
 	kubelet.networkConfigured = true
+	fakeClock := &util.FakeClock{Time: time.Now()}
+	kubelet.backOff = util.NewBackOff(time.Second, time.Minute)
+	kubelet.backOff.Clock = fakeClock
 	return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient}
 }

@@ -45,6 +45,7 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 	"k8s.io/kubernetes/pkg/securitycontext"
 	"k8s.io/kubernetes/pkg/types"
+	"k8s.io/kubernetes/pkg/util"
 	"k8s.io/kubernetes/pkg/util/errors"
 )

@@ -910,7 +911,7 @@ func (r *runtime) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
 }

 // SyncPod syncs the running pod to match the specified desired pod.
-func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
+func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
 	podFullName := kubecontainer.GetPodFullName(pod)

 	// Add references to all containers.

pkg/util/backoff.go (new file, 111 lines)
@@ -0,0 +1,111 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"math"
+	"sync"
+	"time"
+)
+
+type backoffEntry struct {
+	backoff    time.Duration
+	lastUpdate time.Time
+}
+
+type Backoff struct {
+	sync.Mutex
+	Clock           Clock
+	defaultDuration time.Duration
+	maxDuration     time.Duration
+	perItemBackoff  map[string]*backoffEntry
+}
+
+func NewBackOff(initial, max time.Duration) *Backoff {
+	return &Backoff{
+		perItemBackoff:  map[string]*backoffEntry{},
+		Clock:           RealClock{},
+		defaultDuration: initial,
+		maxDuration:     max,
+	}
+}
+
+// Get returns the current backoff duration for id.
+func (p *Backoff) Get(id string) time.Duration {
+	p.Lock()
+	defer p.Unlock()
+	var delay time.Duration
+	entry, ok := p.perItemBackoff[id]
+	if ok {
+		delay = entry.backoff
+	}
+	return delay
+}
+
+// Next moves the backoff for id to the next mark, capping at maxDuration.
+func (p *Backoff) Next(id string, eventTime time.Time) {
+	p.Lock()
+	defer p.Unlock()
+	entry, ok := p.perItemBackoff[id]
+	if !ok || hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+		entry = p.initEntryUnsafe(id)
+	} else {
+		delay := entry.backoff * 2 // exponential
+		entry.backoff = time.Duration(math.Min(float64(delay), float64(p.maxDuration)))
+	}
+	entry.lastUpdate = p.Clock.Now()
+}
+
+// IsInBackOffSince returns true if the elapsed time since eventTime is smaller than the current backoff window.
+func (p *Backoff) IsInBackOffSince(id string, eventTime time.Time) bool {
+	p.Lock()
+	defer p.Unlock()
+	entry, ok := p.perItemBackoff[id]
+	if !ok {
+		return false
+	}
+	if hasExpired(eventTime, entry.lastUpdate, p.maxDuration) {
+		return false
+	}
+	return p.Clock.Now().Sub(eventTime) < entry.backoff
+}
+
+// GC garbage collects records that have aged past 2*maxDuration. Backoff users are expected
+// to invoke this periodically.
+func (p *Backoff) GC() {
+	p.Lock()
+	defer p.Unlock()
+	now := p.Clock.Now()
+	for id, entry := range p.perItemBackoff {
+		if now.Sub(entry.lastUpdate) > p.maxDuration*2 {
+			// GC when the entry has not been updated for 2*maxDuration
			delete(p.perItemBackoff, id)
+		}
+	}
+}
+
+// initEntryUnsafe must be called with the lock on *Backoff held.
+func (p *Backoff) initEntryUnsafe(id string) *backoffEntry {
+	entry := &backoffEntry{backoff: p.defaultDuration}
+	p.perItemBackoff[id] = entry
+	return entry
+}
+
+// After 2*maxDuration we restart the backoff factor to the beginning.
+func hasExpired(eventTime time.Time, lastUpdate time.Time, maxDuration time.Duration) bool {
+	return eventTime.Sub(lastUpdate) > maxDuration*2 // consider stable if it's ok for twice the maxDuration
+}

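A minimal usage sketch of the new Backoff type (the id and durations are invented for the example); it walks through Get, Next, IsInBackOffSince, and the 2*maxDuration reset implemented by hasExpired, driving time with the package's FakeClock:

package main

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

func main() {
	clock := &util.FakeClock{Time: time.Now()}
	b := util.NewBackOff(time.Second, 5*time.Second)
	b.Clock = clock

	id := "example-container" // any stable key, e.g. the stable docker name

	// First failure: initializes the entry at the default duration.
	b.Next(id, clock.Now())
	fmt.Println(b.Get(id)) // 1s

	// While inside the window, callers are expected to skip the restart.
	fmt.Println(b.IsInBackOffSince(id, clock.Now())) // true

	// Another failure doubles the delay, capped at maxDuration.
	b.Next(id, clock.Now())
	fmt.Println(b.Get(id)) // 2s

	// A failure reported more than 2*maxDuration after the last update is
	// treated as a fresh start and resets to the default duration.
	clock.Step(11 * time.Second)
	b.Next(id, clock.Now())
	fmt.Println(b.Get(id)) // 1s again
}
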
pkg/util/backoff_test.go (new file, 125 lines)
@@ -0,0 +1,125 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"testing"
+	"time"
+)
+
+func NewFakeBackOff(initial, max time.Duration, tc *FakeClock) *Backoff {
+	return &Backoff{
+		perItemBackoff:  map[string]*backoffEntry{},
+		Clock:           tc,
+		defaultDuration: initial,
+		maxDuration:     max,
+	}
+}
+
+func TestSlowBackoff(t *testing.T) {
+	id := "_idSlow"
+	tc := &FakeClock{Time: time.Now()}
+	step := time.Second
+	maxDuration := 50 * step
+
+	b := NewFakeBackOff(step, maxDuration, tc)
+	cases := []time.Duration{0, 1, 2, 4, 8, 16, 32, 50, 50, 50}
+	for ix, c := range cases {
+		tc.Step(step)
+		w := b.Get(id)
+		if w != c*step {
+			t.Errorf("input: '%d': expected %s, got %s", ix, c*step, w)
+		}
+		b.Next(id, tc.Now())
+	}
+}
+
+func TestBackoffReset(t *testing.T) {
+	id := "_idReset"
+	tc := &FakeClock{Time: time.Now()}
+	step := time.Second
+	maxDuration := step * 5
+	b := NewFakeBackOff(step, maxDuration, tc)
+	startTime := tc.Now()
+
+	// get to backoff = maxDuration
+	for i := 0; i <= int(maxDuration/step); i++ {
+		tc.Step(step)
+		b.Next(id, tc.Now())
+	}
+
+	// backoff should be capped at maxDuration
+	if !b.IsInBackOffSince(id, tc.Now()) {
+		t.Errorf("expected to be in backoff, got %s", b.Get(id))
+	}
+
+	lastUpdate := tc.Now()
+	tc.Step(2*maxDuration + step) // time += 11s, 11 > 2*maxDuration
+	if b.IsInBackOffSince(id, lastUpdate) {
+		t.Errorf("now=%s startTime=%s elapsedSinceLastUpdate=%s expected backoff reset, got %s", tc.Now(), startTime, tc.Now().Sub(lastUpdate), b.Get(id))
+	}
+}
+
+func TestBackoffHighWaterMark(t *testing.T) {
+	id := "_idHiWaterMark"
+	tc := &FakeClock{Time: time.Now()}
+	step := time.Second
+	maxDuration := 5 * step
+	b := NewFakeBackOff(step, maxDuration, tc)
+
+	// get to backoff = maxDuration
+	for i := 0; i <= int(maxDuration/step); i++ {
+		tc.Step(step)
+		b.Next(id, tc.Now())
+	}
+
+	// backoff high watermark expires after 2*maxDuration
+	tc.Step(maxDuration + step)
+	b.Next(id, tc.Now())
+
+	if b.Get(id) != maxDuration {
+		t.Errorf("expected backoff to stay at high watermark %s, got %s", maxDuration, b.Get(id))
+	}
+}
+
+func TestBackoffGC(t *testing.T) {
+	id := "_idGC"
+	tc := &FakeClock{Time: time.Now()}
+	step := time.Second
+	maxDuration := 5 * step
+
+	b := NewFakeBackOff(step, maxDuration, tc)
+
+	for i := 0; i <= int(maxDuration/step); i++ {
+		tc.Step(step)
+		b.Next(id, tc.Now())
+	}
+	lastUpdate := tc.Now()
+	tc.Step(maxDuration + step)
+	b.GC()
+	_, found := b.perItemBackoff[id]
+	if !found {
+		t.Errorf("expected GC to skip entry, elapsed time=%s maxDuration=%s", tc.Now().Sub(lastUpdate), maxDuration)
+	}
+
+	tc.Step(maxDuration + step)
+	b.GC()
+	r, found := b.perItemBackoff[id]
+	if found {
+		t.Errorf("expected GC of entry after %s, got entry %v", tc.Now().Sub(lastUpdate), r)
+	}
+}

@@ -54,3 +54,8 @@ func (f *FakeClock) Now() time.Time {
 func (f *FakeClock) Since(ts time.Time) time.Duration {
 	return f.Time.Sub(ts)
 }
+
+// Step moves the clock forward by d.
+func (f *FakeClock) Step(d time.Duration) {
+	f.Time = f.Time.Add(d)
+}

pkg/util/clock_test.go (new file, 38 lines)
@@ -0,0 +1,38 @@
+/*
+Copyright 2015 The Kubernetes Authors All rights reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"testing"
+	"time"
+)
+
+func TestFakeClock(t *testing.T) {
+	startTime := time.Now()
+	tc := &FakeClock{Time: startTime}
+	tc.Step(time.Second)
+	now := tc.Now()
+	if now.Sub(startTime) != time.Second {
+		t.Errorf("input: %s now=%s gap=%s expected=%s", startTime, now, now.Sub(startTime), time.Second)
+	}
+
+	tt := tc.Now()
+	tc.Time = tt.Add(time.Hour)
+	if tc.Now().Sub(tt) != time.Hour {
+		t.Errorf("input: %s now=%s gap=%s expected=%s", tt, tc.Now(), tc.Now().Sub(tt), time.Hour)
+	}
+}