diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 7c768051984..16a1d5fe73d 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -21,7 +21,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/oom" @@ -31,7 +31,7 @@ import ( func NewFakeDockerManager( client DockerInterface, recorder record.EventRecorder, - prober prober.Prober, + livenessManager proberesults.Manager, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorapi.MachineInfo, podInfraContainerImage string, @@ -45,7 +45,7 @@ func NewFakeDockerManager( fakeOOMAdjuster := oom.NewFakeOOMAdjuster() fakeProcFs := procfs.NewFakeProcFs() - dm := NewDockerManager(client, recorder, prober, containerRefManager, machineInfo, podInfraContainerImage, qps, + dm := NewDockerManager(client, recorder, livenessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, fakeOOMAdjuster, fakeProcFs, false, imageBackOff) dm.dockerPuller = &FakeDockerPuller{} diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 8cfac56c5ca..e8f1bfc8353 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -44,10 +44,9 @@ import ( "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/hairpin" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -119,8 +118,8 @@ type DockerManager struct { // Network plugin. networkPlugin network.NetworkPlugin - // Health check prober. - prober prober.Prober + // Health check results. + livenessManager proberesults.Manager // Generator of runtime container options. generator kubecontainer.RunContainerOptionsGenerator @@ -147,7 +146,7 @@ type DockerManager struct { func NewDockerManager( client DockerInterface, recorder record.EventRecorder, - prober prober.Prober, + livenessManager proberesults.Manager, containerRefManager *kubecontainer.RefManager, machineInfo *cadvisorapi.MachineInfo, podInfraContainerImage string, @@ -208,7 +207,7 @@ func NewDockerManager( dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, - prober: prober, + livenessManager: livenessManager, generator: generator, execHandler: execHandler, oomAdjuster: oomAdjuster, @@ -1762,20 +1761,13 @@ func (dm *DockerManager) computePodContainerChanges(pod *api.Pod, runningPod kub continue } - result, err := dm.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created) - if err != nil { - // TODO(vmarmol): examine this logic. 
- glog.V(2).Infof("probe no-error: %q", container.Name) - containersToKeep[containerID] = index - continue - } - if result == probe.Success { - glog.V(4).Infof("probe success: %q", container.Name) + liveness, found := dm.livenessManager.Get(c.ID) + if !found || liveness == proberesults.Success { containersToKeep[containerID] = index continue } if pod.Spec.RestartPolicy != api.RestartPolicyNever { - glog.Infof("pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) + glog.Infof("pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name) containersToStart[index] = empty{} } } diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index f62c7dac5ed..fada0015c48 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -38,7 +38,8 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" uexec "k8s.io/kubernetes/pkg/util/exec" @@ -83,7 +84,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage dockerManager := NewFakeDockerManager( fakeDocker, fakeRecorder, - prober.FakeProber{}, + proberesults.NewManager(), containerRefManager, &cadvisorapi.MachineInfo{}, PodInfraContainerImage, @@ -854,6 +855,10 @@ func TestSyncPodBadHash(t *testing.T) { } func TestSyncPodsUnhealthy(t *testing.T) { + const ( + unhealthyContainerID = "1234" + infraContainerID = "9876" + ) dm, fakeDocker := newTestDockerManager() pod := &api.Pod{ ObjectMeta: api.ObjectMeta{ @@ -862,40 +867,35 @@ func TestSyncPodsUnhealthy(t *testing.T) { Namespace: "new", }, Spec: api.PodSpec{ - Containers: []api.Container{ - {Name: "bar", - LivenessProbe: &api.Probe{ - // Always returns healthy == false - }, - }, - }, + Containers: []api.Container{{Name: "unhealthy"}}, }, } fakeDocker.ContainerList = []docker.APIContainers{ { // the k8s prefix is required for the kubelet to manage the container - Names: []string{"/k8s_bar_foo_new_12345678_42"}, - ID: "1234", + Names: []string{"/k8s_unhealthy_foo_new_12345678_42"}, + ID: unhealthyContainerID, }, { // pod infra container Names: []string{"/k8s_POD." 
+ strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42"}, - ID: "9876", + ID: infraContainerID, }, } fakeDocker.ContainerMap = map[string]*docker.Container{ - "1234": { - ID: "1234", + unhealthyContainerID: { + ID: unhealthyContainerID, Config: &docker.Config{}, HostConfig: &docker.HostConfig{}, }, - "9876": { - ID: "9876", + infraContainerID: { + ID: infraContainerID, Config: &docker.Config{}, HostConfig: &docker.HostConfig{}, }, } + dm.livenessManager.Set(kubetypes.DockerID(unhealthyContainerID).ContainerID(), proberesults.Failure, nil) runSyncPod(t, dm, fakeDocker, pod, nil) @@ -908,7 +908,7 @@ func TestSyncPodsUnhealthy(t *testing.T) { "create", "start", "inspect_container", }) - if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil { + if err := fakeDocker.AssertStopped([]string{unhealthyContainerID}); err != nil { t.Errorf("%v", err) } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1c2d63a6d93..3c2f05a8585 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -54,12 +54,12 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/rkt" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -309,6 +309,10 @@ func NewMainKubelet( procFs := procfs.NewProcFs() imageBackOff := util.NewBackOff(resyncInterval, MaxContainerBackOff) + + readinessManager := proberesults.NewManager() + klet.livenessManager = proberesults.NewManagerWithUpdates() + // Initialize the runtime. switch containerRuntime { case "docker": @@ -316,7 +320,7 @@ func NewMainKubelet( klet.containerRuntime = dockertools.NewDockerManager( dockerClient, recorder, - klet, // prober + klet.livenessManager, containerRefManager, machineInfo, podInfraContainerImage, @@ -344,7 +348,7 @@ func NewMainKubelet( klet, recorder, containerRefManager, - klet, // prober + klet.livenessManager, klet.volumeManager, imageBackOff) if err != nil { @@ -396,11 +400,14 @@ func NewMainKubelet( klet.runner = klet.containerRuntime klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient)) - klet.prober = prober.New(klet.runner, containerRefManager, recorder) klet.probeManager = prober.NewManager( klet.resyncInterval, klet.statusManager, - klet.prober) + readinessManager, + klet.livenessManager, + klet.runner, + containerRefManager, + recorder) runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime) if err != nil { @@ -508,10 +515,10 @@ type Kubelet struct { // Network plugin. networkPlugin network.NetworkPlugin - // Handles container readiness probing + // Handles container probing. probeManager prober.Manager - // TODO: Move prober ownership to the probeManager once the runtime no longer depends on it. - prober prober.Prober + // Manages container health check results. + livenessManager proberesults.Manager // How long to keep idle streaming command execution/port forwarding // connections open before terminating them @@ -1982,6 +1989,12 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler // Periodically syncs all the pods and performs cleanup tasks. 
glog.V(4).Infof("SyncLoop (periodic sync)") handler.HandlePodSyncs(kl.podManager.GetPods()) + case update := <-kl.livenessManager.Updates(): + // We only care about failures (signalling container death) here. + if update.Result == proberesults.Failure { + glog.V(1).Infof("SyncLoop (container unhealthy).") + handler.HandlePodSyncs([]*api.Pod{update.Pod}) + } } kl.syncLoopMonitor.Store(time.Now()) return true @@ -2831,16 +2844,6 @@ func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { return kl.containerRuntime } -// Proxy prober calls through the Kubelet to break the circular dependency between the runtime & -// prober. -// TODO: Remove this hack once the runtime no longer depends on the prober. -func (kl *Kubelet) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) { - return kl.prober.ProbeLiveness(pod, status, container, containerID, createdAt) -} -func (kl *Kubelet) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { - return kl.prober.ProbeReadiness(pod, status, container, containerID) -} - var minRsrc = resource.MustParse("1k") var maxRsrc = resource.MustParse("1P") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 92e606fb0e2..6c82c6091c8 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" @@ -134,8 +135,8 @@ func newTestKubelet(t *testing.T) *TestKubelet { t: t, } - kubelet.prober = prober.FakeProber{} kubelet.probeManager = prober.FakeManager{} + kubelet.livenessManager = proberesults.NewManager() kubelet.volumeManager = newVolumeManager() kubelet.containerManager, _ = newContainerManager(fakeContainerMgrMountInt(), mockCadvisor, "", "", "") diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index 7ff5d363b6e..38e6c4fcb41 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -37,7 +37,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/util/sets" ) @@ -152,7 +152,7 @@ func newTestDockerManager() (*dockertools.DockerManager, *dockertools.FakeDocker dockerManager := dockertools.NewFakeDockerManager( fakeDocker, fakeRecorder, - prober.FakeProber{}, + proberesults.NewManager(), containerRefManager, &cadvisorapi.MachineInfo{}, dockertools.PodInfraContainerImage, diff --git a/pkg/kubelet/prober/fake_prober.go b/pkg/kubelet/prober/fake_prober.go deleted file mode 100644 index fd18dbd05d0..00000000000 --- a/pkg/kubelet/prober/fake_prober.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package prober - -import ( - "k8s.io/kubernetes/pkg/api" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/probe" -) - -var _ Prober = FakeProber{} - -type FakeProber struct { - Readiness probe.Result - Liveness probe.Result - Error error -} - -func (f FakeProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) { - if c.LivenessProbe == nil { - return probe.Success, nil - } - return f.Liveness, f.Error -} - -func (f FakeProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) { - if c.ReadinessProbe == nil { - return probe.Success, nil - } - return f.Readiness, f.Error -} diff --git a/pkg/kubelet/prober/manager.go b/pkg/kubelet/prober/manager.go index a39089a3f5d..13045107c69 100644 --- a/pkg/kubelet/prober/manager.go +++ b/pkg/kubelet/prober/manager.go @@ -22,9 +22,11 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" + kubeutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" ) @@ -53,19 +55,22 @@ type Manager interface { } type manager struct { - // Caches the results of readiness probes. - readinessCache results.Manager - - // Map of active workers for readiness - readinessProbes map[containerPath]*worker - // Lock for accessing & mutating readinessProbes + // Map of active workers for probes + workers map[probeKey]*worker + // Lock for accessing & mutating workers workerLock sync.RWMutex // The statusManager cache provides pod IP and container IDs for probing. statusManager status.Manager + // readinessManager manages the results of readiness probes + readinessManager results.Manager + + // livenessManager manages the results of liveness probes + livenessManager results.Manager + // prober executes the probe actions. - prober Prober + prober *prober // Default period for workers to execute a probe. 
defaultProbePeriod time.Duration @@ -74,36 +79,79 @@ type manager struct { func NewManager( defaultProbePeriod time.Duration, statusManager status.Manager, - prober Prober) Manager { + readinessManager results.Manager, + livenessManager results.Manager, + runner kubecontainer.ContainerCommandRunner, + refManager *kubecontainer.RefManager, + recorder record.EventRecorder) Manager { + prober := newProber(runner, refManager, recorder) return &manager{ defaultProbePeriod: defaultProbePeriod, statusManager: statusManager, prober: prober, - readinessCache: results.NewManager(), - readinessProbes: make(map[containerPath]*worker), + readinessManager: readinessManager, + livenessManager: livenessManager, + workers: make(map[probeKey]*worker), } } -// Key uniquely identifying containers -type containerPath struct { +// Key uniquely identifying container probes +type probeKey struct { podUID types.UID containerName string + probeType probeType +} + +// Type of probe (readiness or liveness) +type probeType int + +const ( + liveness probeType = iota + readiness +) + +// For debugging. +func (t probeType) String() string { + switch t { + case readiness: + return "Readiness" + case liveness: + return "Liveness" + default: + return "UNKNOWN" + } } func (m *manager) AddPod(pod *api.Pod) { m.workerLock.Lock() defer m.workerLock.Unlock() - key := containerPath{podUID: pod.UID} + key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name - if _, ok := m.readinessProbes[key]; ok { - glog.Errorf("Readiness probe already exists! %v - %v", - kubecontainer.GetPodFullName(pod), c.Name) - return - } + if c.ReadinessProbe != nil { - m.readinessProbes[key] = m.newWorker(pod, c) + key.probeType = readiness + if _, ok := m.workers[key]; ok { + glog.Errorf("Readiness probe already exists! %v - %v", + kubeutil.FormatPodName(pod), c.Name) + return + } + w := newWorker(m, readiness, pod, c) + m.workers[key] = w + go w.run() + } + + if c.LivenessProbe != nil { + key.probeType = liveness + if _, ok := m.workers[key]; ok { + glog.Errorf("Liveness probe already exists! %v - %v", + kubeutil.FormatPodName(pod), c.Name) + return + } + w := newWorker(m, liveness, pod, c) + m.workers[key] = w + go w.run() } } } @@ -112,11 +160,14 @@ func (m *manager) RemovePod(pod *api.Pod) { m.workerLock.RLock() defer m.workerLock.RUnlock() - key := containerPath{podUID: pod.UID} + key := probeKey{podUID: pod.UID} for _, c := range pod.Spec.Containers { key.containerName = c.Name - if worker, ok := m.readinessProbes[key]; ok { - close(worker.stop) + for _, probeType := range [...]probeType{readiness, liveness} { + key.probeType = probeType + if worker, ok := m.workers[key]; ok { + close(worker.stop) + } } } } @@ -130,8 +181,8 @@ func (m *manager) CleanupPods(activePods []*api.Pod) { m.workerLock.RLock() defer m.workerLock.RUnlock() - for path, worker := range m.readinessProbes { - if _, ok := desiredPods[path.podUID]; !ok { + for key, worker := range m.workers { + if _, ok := desiredPods[key.podUID]; !ok { close(worker.stop) } } @@ -142,28 +193,27 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *api.PodStatus) { var ready bool if c.State.Running == nil { ready = false - } else if result, ok := m.readinessCache.Get( - kubecontainer.ParseContainerID(c.ContainerID)); ok { + } else if result, ok := m.readinessManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok { ready = result == results.Success } else { // The check whether there is a probe which hasn't run yet. 
- _, exists := m.getReadinessProbe(podUID, c.Name) + _, exists := m.getWorker(podUID, c.Name, readiness) ready = !exists } podStatus.ContainerStatuses[i].Ready = ready } } -func (m *manager) getReadinessProbe(podUID types.UID, containerName string) (*worker, bool) { +func (m *manager) getWorker(podUID types.UID, containerName string, probeType probeType) (*worker, bool) { m.workerLock.RLock() defer m.workerLock.RUnlock() - probe, ok := m.readinessProbes[containerPath{podUID, containerName}] - return probe, ok + worker, ok := m.workers[probeKey{podUID, containerName, probeType}] + return worker, ok } // Called by the worker after exiting. -func (m *manager) removeReadinessProbe(podUID types.UID, containerName string) { +func (m *manager) removeWorker(podUID types.UID, containerName string, probeType probeType) { m.workerLock.Lock() defer m.workerLock.Unlock() - delete(m.readinessProbes, containerPath{podUID, containerName}) + delete(m.workers, probeKey{podUID, containerName, probeType}) } diff --git a/pkg/kubelet/prober/manager_test.go b/pkg/kubelet/prober/manager_test.go index 9caf1cc37d0..d03151598b4 100644 --- a/pkg/kubelet/prober/manager_test.go +++ b/pkg/kubelet/prober/manager_test.go @@ -23,11 +23,13 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/unversioned/testclient" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) @@ -53,13 +55,13 @@ func TestAddRemovePods(t *testing.T) { Containers: []api.Container{{ Name: "no_probe1", }, { - Name: "prober1", + Name: "readiness", ReadinessProbe: &api.Probe{}, }, { Name: "no_probe2", }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "liveness", + LivenessProbe: &api.Probe{}, }}, }, } @@ -77,7 +79,10 @@ func TestAddRemovePods(t *testing.T) { // Adding a pod with probes. m.AddPod(&probePod) - probePaths := []containerPath{{"probe_pod", "prober1"}, {"probe_pod", "prober2"}} + probePaths := []probeKey{ + {"probe_pod", "readiness", readiness}, + {"probe_pod", "liveness", liveness}, + } if err := expectProbes(m, probePaths); err != nil { t.Error(err) } @@ -115,8 +120,8 @@ func TestCleanupPods(t *testing.T) { Name: "prober1", ReadinessProbe: &api.Probe{}, }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "prober2", + LivenessProbe: &api.Probe{}, }}, }, } @@ -129,8 +134,8 @@ func TestCleanupPods(t *testing.T) { Name: "prober1", ReadinessProbe: &api.Probe{}, }, { - Name: "prober2", - ReadinessProbe: &api.Probe{}, + Name: "prober2", + LivenessProbe: &api.Probe{}, }}, }, } @@ -139,8 +144,14 @@ func TestCleanupPods(t *testing.T) { m.CleanupPods([]*api.Pod{&podToKeep}) - removedProbes := []containerPath{{"pod_cleanup", "prober1"}, {"pod_cleanup", "prober2"}} - expectedProbes := []containerPath{{"pod_keep", "prober1"}, {"pod_keep", "prober2"}} + removedProbes := []probeKey{ + {"pod_cleanup", "prober1", readiness}, + {"pod_cleanup", "prober2", liveness}, + } + expectedProbes := []probeKey{ + {"pod_keep", "prober1", readiness}, + {"pod_keep", "prober2", liveness}, + } if err := waitForWorkerExit(m, removedProbes); err != nil { t.Fatal(err) } @@ -195,28 +206,28 @@ func TestUpdatePodStatus(t *testing.T) { m := newTestManager() // Setup probe "workers" and cached results. 
- m.readinessProbes = map[containerPath]*worker{ - containerPath{podUID, probedReady.Name}: {}, - containerPath{podUID, probedPending.Name}: {}, - containerPath{podUID, probedUnready.Name}: {}, - containerPath{podUID, terminated.Name}: {}, + m.workers = map[probeKey]*worker{ + probeKey{podUID, unprobed.Name, liveness}: {}, + probeKey{podUID, probedReady.Name, readiness}: {}, + probeKey{podUID, probedPending.Name, readiness}: {}, + probeKey{podUID, probedUnready.Name, readiness}: {}, + probeKey{podUID, terminated.Name, readiness}: {}, } - - m.readinessCache.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success) - m.readinessCache.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure) - m.readinessCache.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success) + m.readinessManager.Set(kubecontainer.ParseContainerID(probedReady.ContainerID), results.Success, nil) + m.readinessManager.Set(kubecontainer.ParseContainerID(probedUnready.ContainerID), results.Failure, nil) + m.readinessManager.Set(kubecontainer.ParseContainerID(terminated.ContainerID), results.Success, nil) m.UpdatePodStatus(podUID, &podStatus) - expectedReadiness := map[containerPath]bool{ - containerPath{podUID, unprobed.Name}: true, - containerPath{podUID, probedReady.Name}: true, - containerPath{podUID, probedPending.Name}: false, - containerPath{podUID, probedUnready.Name}: false, - containerPath{podUID, terminated.Name}: false, + expectedReadiness := map[probeKey]bool{ + probeKey{podUID, unprobed.Name, readiness}: true, + probeKey{podUID, probedReady.Name, readiness}: true, + probeKey{podUID, probedPending.Name, readiness}: false, + probeKey{podUID, probedUnready.Name, readiness}: false, + probeKey{podUID, terminated.Name, readiness}: false, } for _, c := range podStatus.ContainerStatuses { - expected, ok := expectedReadiness[containerPath{podUID, c.Name}] + expected, ok := expectedReadiness[probeKey{podUID, c.Name, readiness}] if !ok { t.Fatalf("Missing expectation for test case: %v", c.Name) } @@ -227,16 +238,16 @@ func TestUpdatePodStatus(t *testing.T) { } } -func expectProbes(m *manager, expectedReadinessProbes []containerPath) error { +func expectProbes(m *manager, expectedProbes []probeKey) error { m.workerLock.RLock() defer m.workerLock.RUnlock() - var unexpected []containerPath - missing := make([]containerPath, len(expectedReadinessProbes)) - copy(missing, expectedReadinessProbes) + var unexpected []probeKey + missing := make([]probeKey, len(expectedProbes)) + copy(missing, expectedProbes) outer: - for probePath := range m.readinessProbes { + for probePath := range m.workers { for i, expectedPath := range missing { if probePath == expectedPath { missing = append(missing[:i], missing[i+1:]...) @@ -255,26 +266,34 @@ outer: func newTestManager() *manager { const probePeriod = 1 - statusManager := status.NewManager(&testclient.Fake{}) - prober := FakeProber{Readiness: probe.Success} - return NewManager(probePeriod, statusManager, prober).(*manager) + m := NewManager( + probePeriod, + status.NewManager(&testclient.Fake{}), + results.NewManager(), + results.NewManager(), + nil, // runner + kubecontainer.NewRefManager(), + &record.FakeRecorder{}, + ).(*manager) + // Don't actually execute probes. + m.prober.exec = fakeExecProber{probe.Success, nil} + return m } // Wait for the given workers to exit & clean up. 
-func waitForWorkerExit(m *manager, workerPaths []containerPath) error { +func waitForWorkerExit(m *manager, workerPaths []probeKey) error { const interval = 100 * time.Millisecond - const timeout = 30 * time.Second for _, w := range workerPaths { condition := func() (bool, error) { - _, exists := m.getReadinessProbe(w.podUID, w.containerName) + _, exists := m.getWorker(w.podUID, w.containerName, w.probeType) return !exists, nil } if exited, _ := condition(); exited { continue // Already exited, no need to poll. } glog.Infof("Polling %v", w) - if err := wait.Poll(interval, timeout, condition); err != nil { + if err := wait.Poll(interval, util.ForeverTestTimeout, condition); err != nil { return err } } diff --git a/pkg/kubelet/prober/prober.go b/pkg/kubelet/prober/prober.go index befdcc18b0e..b57e1203872 100644 --- a/pkg/kubelet/prober/prober.go +++ b/pkg/kubelet/prober/prober.go @@ -39,12 +39,6 @@ import ( const maxProbeRetries = 3 -// Prober checks the healthiness of a container. -type Prober interface { - ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) - ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) -} - // Prober helps to check the liveness/readiness of a container. type prober struct { exec execprobe.ExecProber @@ -58,10 +52,10 @@ type prober struct { // NewProber creates a Prober, it takes a command runner and // several container info managers. -func New( +func newProber( runner kubecontainer.ContainerCommandRunner, refManager *kubecontainer.RefManager, - recorder record.EventRecorder) Prober { + recorder record.EventRecorder) *prober { return &prober{ exec: execprobe.New(), @@ -73,9 +67,19 @@ func New( } } -// ProbeLiveness probes the liveness of a container. -// If the initalDelay since container creation on liveness probe has not passed the probe will return probe.Success. -func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID, createdAt int64) (probe.Result, error) { +// probe probes the container. +func (pb *prober) probe(probeType probeType, pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { + switch probeType { + case readiness: + return pb.probeReadiness(pod, status, container, containerID) + case liveness: + return pb.probeLiveness(pod, status, container, containerID) + } + return probe.Unknown, fmt.Errorf("Unknown probe type: %q", probeType) +} + +// probeLiveness probes the liveness of a container. +func (pb *prober) probeLiveness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { var live probe.Result var output string var err error @@ -83,11 +87,7 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap if p == nil { return probe.Success, nil } - if time.Now().Unix()-createdAt < p.InitialDelaySeconds { - return probe.Success, nil - } else { - live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) - } + live, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) if err != nil || live != probe.Success { // Liveness failed in one way or another. 
@@ -113,17 +113,16 @@ func (pb *prober) ProbeLiveness(pod *api.Pod, status api.PodStatus, container ap return probe.Success, nil } -// ProbeReadiness probes and sets the readiness of a container. -func (pb *prober) ProbeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { +// probeReadiness probes and sets the readiness of a container. +func (pb *prober) probeReadiness(pod *api.Pod, status api.PodStatus, container api.Container, containerID kubecontainer.ContainerID) (probe.Result, error) { var ready probe.Result var output string var err error p := container.ReadinessProbe if p == nil { - ready = probe.Success - } else { - ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) + return probe.Success, nil } + ready, output, err = pb.runProbeWithRetries(p, pod, status, container, containerID, maxProbeRetries) ctrName := fmt.Sprintf("%s:%s", kubecontainer.GetPodFullName(pod), container.Name) if err != nil || ready == probe.Failure { // Readiness failed in one way or another. diff --git a/pkg/kubelet/prober/prober_test.go b/pkg/kubelet/prober/prober_test.go index ebf62c59ebf..954b9ddf977 100644 --- a/pkg/kubelet/prober/prober_test.go +++ b/pkg/kubelet/prober/prober_test.go @@ -19,7 +19,6 @@ package prober import ( "errors" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" @@ -184,7 +183,6 @@ func TestProbeContainer(t *testing.T) { recorder: &record.FakeRecorder{}, } containerID := kubecontainer.ContainerID{"test", "foobar"} - createdAt := time.Now().Unix() tests := []struct { testContainer api.Container @@ -201,14 +199,7 @@ func TestProbeContainer(t *testing.T) { // Only LivenessProbe. expectedReadiness should always be true here. { testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Success, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, + LivenessProbe: &api.Probe{}, }, expectedLiveness: probe.Unknown, expectedReadiness: probe.Success, @@ -216,7 +207,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -228,7 +218,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -240,7 +229,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -252,7 +240,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -265,7 +252,7 @@ func TestProbeContainer(t *testing.T) { // // Only ReadinessProbe. expectedLiveness should always be probe.Success here. 
{ testContainer: api.Container{ - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Success, expectedReadiness: probe.Unknown, @@ -273,7 +260,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -285,7 +271,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -297,7 +282,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -309,7 +293,6 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -322,32 +305,8 @@ func TestProbeContainer(t *testing.T) { // Both LivenessProbe and ReadinessProbe. { testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: 100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, - }, - expectedLiveness: probe.Success, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: 100}, - }, - expectedLiveness: probe.Unknown, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{InitialDelaySeconds: -100}, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, + LivenessProbe: &api.Probe{}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Unknown, expectedReadiness: probe.Unknown, @@ -355,25 +314,11 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, }, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, - }, - expectedLiveness: probe.Unknown, - expectedReadiness: probe.Unknown, - }, - { - testContainer: api.Container{ - LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, - Handler: api.Handler{ - Exec: &api.ExecAction{}, - }, - }, - ReadinessProbe: &api.Probe{InitialDelaySeconds: -100}, + ReadinessProbe: &api.Probe{}, }, expectedLiveness: probe.Failure, expectedReadiness: probe.Unknown, @@ -381,13 +326,11 @@ func TestProbeContainer(t *testing.T) { { testContainer: api.Container{ LivenessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, }, ReadinessProbe: &api.Probe{ - InitialDelaySeconds: -100, Handler: api.Handler{ Exec: &api.ExecAction{}, }, @@ -405,7 +348,7 @@ func TestProbeContainer(t *testing.T) { prober.exec = fakeExecProber{test.expectedLiveness, nil} } - liveness, err := prober.ProbeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID, createdAt) + liveness, err := prober.probeLiveness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID) if test.expectError && err == nil { t.Errorf("[%d] Expected liveness probe error but no error was returned.", i) } @@ -418,7 +361,7 @@ func TestProbeContainer(t *testing.T) { // TODO: Test readiness 
errors
 		prober.exec = fakeExecProber{test.expectedReadiness, nil}
 
-		readiness, err := prober.ProbeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
+		readiness, err := prober.probeReadiness(&api.Pod{}, api.PodStatus{}, test.testContainer, containerID)
 		if err != nil {
 			t.Errorf("[%d] Unexpected readiness probe error: %v", i, err)
 		}
diff --git a/pkg/kubelet/prober/results/results_manager.go b/pkg/kubelet/prober/results/results_manager.go
index 208d3d5ffd4..eb55f71e85d 100644
--- a/pkg/kubelet/prober/results/results_manager.go
+++ b/pkg/kubelet/prober/results/results_manager.go
@@ -19,17 +19,23 @@ package results
 import (
 	"sync"
 
+	"k8s.io/kubernetes/pkg/api"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
-// Manager provides a probe results cache.
+// Manager provides a probe results cache and channel of updates.
 type Manager interface {
 	// Get returns the cached result for the container with the given ID.
-	Get(id kubecontainer.ContainerID) (Result, bool)
+	Get(kubecontainer.ContainerID) (Result, bool)
 	// Set sets the cached result for the container with the given ID.
-	Set(id kubecontainer.ContainerID, result Result)
+	// The pod is only included to be sent with the update.
+	Set(kubecontainer.ContainerID, Result, *api.Pod)
 	// Remove clears the cached result for the container with the given ID.
-	Remove(id kubecontainer.ContainerID)
+	Remove(kubecontainer.ContainerID)
+	// Updates returns a channel that receives an Update whenever a cached result changes (but not
+	// when it is removed).
+	// NOTE: The current implementation only supports a single updates channel.
+	Updates() <-chan Update
 }
 
 // Result is the type for probe results.
@@ -51,19 +57,36 @@ func (r Result) String() string {
 	}
 }
 
+// Update describes a probe result change, sent over the Updates channel.
+type Update struct {
+	ContainerID kubecontainer.ContainerID
+	Result      Result
+	Pod         *api.Pod
+}
+
 // Manager implementation.
 type manager struct {
 	// guards the cache
 	sync.RWMutex
 	// map of container ID -> probe Result
 	cache map[kubecontainer.ContainerID]Result
+	// channel of updates (may be nil)
+	updates chan Update
 }
 
 var _ Manager = &manager{}
 
 // NewManager creates ane returns an empty results manager.
 func NewManager() Manager {
-	return &manager{cache: make(map[kubecontainer.ContainerID]Result)}
+	m := &manager{cache: make(map[kubecontainer.ContainerID]Result)}
+	return m
+}
+
+// NewManagerWithUpdates creates and returns an empty results manager that publishes result changes on its Updates channel.
+func NewManagerWithUpdates() Manager {
+	m := NewManager().(*manager)
+	m.updates = make(chan Update, 20)
+	return m
 }
 
 func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
@@ -73,13 +96,22 @@ func (m *manager) Get(id kubecontainer.ContainerID) (Result, bool) {
 	return result, found
 }
 
-func (m *manager) Set(id kubecontainer.ContainerID, result Result) {
+func (m *manager) Set(id kubecontainer.ContainerID, result Result, pod *api.Pod) {
+	if m.setInternal(id, result) {
+		m.pushUpdate(Update{id, result, pod})
+	}
+}
+
+// Internal helper for locked portion of set. Returns whether an update should be sent.
+func (m *manager) setInternal(id kubecontainer.ContainerID, result Result) bool {
 	m.Lock()
 	defer m.Unlock()
 	prev, exists := m.cache[id]
 	if !exists || prev != result {
 		m.cache[id] = result
+		return true
 	}
+	return false
 }
 
 func (m *manager) Remove(id kubecontainer.ContainerID) {
@@ -87,3 +119,14 @@ func (m *manager) Remove(id kubecontainer.ContainerID) {
 	defer m.Unlock()
 	delete(m.cache, id)
 }
+
+func (m *manager) Updates() <-chan Update {
+	return m.updates
+}
+
+// pushUpdate sends an update on the updates channel if it is initialized.
+func (m *manager) pushUpdate(update Update) {
+	if m.updates != nil {
+		m.updates <- update
+	}
+}
diff --git a/pkg/kubelet/prober/results/results_manager_test.go b/pkg/kubelet/prober/results/results_manager_test.go
index d815c1a84b8..24b131958b7 100644
--- a/pkg/kubelet/prober/results/results_manager_test.go
+++ b/pkg/kubelet/prober/results/results_manager_test.go
@@ -18,9 +18,12 @@ package results
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
+	"k8s.io/kubernetes/pkg/api"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/util"
 )
 
 func TestCacheOperations(t *testing.T) {
@@ -32,7 +35,7 @@ func TestCacheOperations(t *testing.T) {
 	_, found := m.Get(unsetID)
 	assert.False(t, found, "unset result found")
 
-	m.Set(setID, Success)
+	m.Set(setID, Success, nil)
 	result, found := m.Get(setID)
 	assert.True(t, result == Success, "set result")
 	assert.True(t, found, "set result found")
@@ -41,3 +44,55 @@ func TestCacheOperations(t *testing.T) {
 	_, found = m.Get(setID)
 	assert.False(t, found, "removed result found")
 }
+
+func TestUpdates(t *testing.T) {
+	m := NewManagerWithUpdates()
+
+	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "test-pod"}}
+	fooID := kubecontainer.ContainerID{"test", "foo"}
+	barID := kubecontainer.ContainerID{"test", "bar"}
+
+	expectUpdate := func(expected Update, msg string) {
+		select {
+		case u := <-m.Updates():
+			if expected != u {
+				t.Errorf("Expected update %v, received %v: %s", expected, u, msg)
+			}
+		case <-time.After(util.ForeverTestTimeout):
+			t.Errorf("Timed out waiting for update %v: %s", expected, msg)
+		}
+	}
+
+	expectNoUpdate := func(msg string) {
+		// NOTE: Since updates are accumulated asynchronously, this method is not guaranteed to fail
+		// when it should. In the event it misses a failure, the following calls to expectUpdate should
+		// still fail.
+		select {
+		case u := <-m.Updates():
+			t.Errorf("Unexpected update %v: %s", u, msg)
+		default:
+			// Pass
+		}
+	}
+
+	// New result should always push an update.
+	m.Set(fooID, Success, pod)
+	expectUpdate(Update{fooID, Success, pod}, "new success")
+
+	m.Set(barID, Failure, pod)
+	expectUpdate(Update{barID, Failure, pod}, "new failure")
+
+	// Unchanged results should not send an update.
+	m.Set(fooID, Success, pod)
+	expectNoUpdate("unchanged foo")
+
+	m.Set(barID, Failure, pod)
+	expectNoUpdate("unchanged bar")
+
+	// Changed results should send an update.
+	m.Set(fooID, Failure, pod)
+	expectUpdate(Update{fooID, Failure, pod}, "changed foo")
+
+	m.Set(barID, Success, pod)
+	expectUpdate(Update{barID, Success, pod}, "changed bar")
+}
diff --git a/pkg/kubelet/prober/worker.go b/pkg/kubelet/prober/worker.go
index 480218efe11..996b85d0045 100644
--- a/pkg/kubelet/prober/worker.go
+++ b/pkg/kubelet/prober/worker.go
@@ -32,7 +32,6 @@ import (
 // associated with it which runs the probe loop until the container permanently terminates, or the
The worker uses the probe Manager's statusManager to get up-to-date // container IDs. -// TODO: Handle liveness probing type worker struct { // Channel for stopping the probe, it should be closed to trigger a stop. stop chan struct{} @@ -46,44 +45,65 @@ type worker struct { // Describes the probe configuration (read-only) spec *api.Probe + // The type of the worker. + probeType probeType + + // The probe value during the initial delay. + initialValue results.Result + + // Where to store this workers results. + resultsManager results.Manager + probeManager *manager + // The last known container ID for this worker. containerID kubecontainer.ContainerID } // Creates and starts a new probe worker. -func (m *manager) newWorker( +func newWorker( + m *manager, + probeType probeType, pod *api.Pod, container api.Container) *worker { w := &worker{ - stop: make(chan struct{}), - pod: pod, - container: container, - spec: container.ReadinessProbe, + stop: make(chan struct{}), + pod: pod, + container: container, + probeType: probeType, + probeManager: m, } - // Start the worker thread. - go run(m, w) + switch probeType { + case readiness: + w.spec = container.ReadinessProbe + w.resultsManager = m.readinessManager + w.initialValue = results.Failure + case liveness: + w.spec = container.LivenessProbe + w.resultsManager = m.livenessManager + w.initialValue = results.Success + } return w } // run periodically probes the container. -func run(m *manager, w *worker) { - probeTicker := time.NewTicker(m.defaultProbePeriod) +func (w *worker) run() { + probeTicker := time.NewTicker(w.probeManager.defaultProbePeriod) defer func() { // Clean up. probeTicker.Stop() if !w.containerID.IsEmpty() { - m.readinessCache.Remove(w.containerID) + w.resultsManager.Remove(w.containerID) } - m.removeReadinessProbe(w.pod.UID, w.container.Name) + w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType) }() probeLoop: - for doProbe(m, w) { + for w.doProbe() { // Wait for next probe tick. select { case <-w.stop: @@ -96,10 +116,10 @@ probeLoop: // doProbe probes the container once and records the result. // Returns whether the worker should continue. -func doProbe(m *manager, w *worker) (keepGoing bool) { +func (w *worker) doProbe() (keepGoing bool) { defer util.HandleCrash(func(_ interface{}) { keepGoing = true }) - status, ok := m.statusManager.GetPodStatus(w.pod.UID) + status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID) if !ok { // Either the pod has not been created yet, or it was already deleted. glog.V(3).Infof("No status for pod: %v", kubeletutil.FormatPodName(w.pod)) @@ -123,7 +143,7 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if w.containerID.String() != c.ContainerID { if !w.containerID.IsEmpty() { - m.readinessCache.Remove(w.containerID) + w.resultsManager.Remove(w.containerID) } w.containerID = kubecontainer.ParseContainerID(c.ContainerID) } @@ -131,22 +151,23 @@ func doProbe(m *manager, w *worker) (keepGoing bool) { if c.State.Running == nil { glog.V(3).Infof("Non-running container probed: %v - %v", kubeletutil.FormatPodName(w.pod), w.container.Name) - m.readinessCache.Set(w.containerID, results.Failure) + if !w.containerID.IsEmpty() { + w.resultsManager.Set(w.containerID, results.Failure, w.pod) + } // Abort if the container will not be restarted. 
return c.State.Terminated == nil || w.pod.Spec.RestartPolicy != api.RestartPolicyNever } if int64(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds { - // Readiness defaults to false during the initial delay. - m.readinessCache.Set(w.containerID, results.Failure) + w.resultsManager.Set(w.containerID, w.initialValue, w.pod) return true } // TODO: Move error handling out of prober. - result, _ := m.prober.ProbeReadiness(w.pod, status, w.container, w.containerID) + result, _ := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID) if result != probe.Unknown { - m.readinessCache.Set(w.containerID, result != probe.Failure) + w.resultsManager.Set(w.containerID, result != probe.Failure, w.pod) } return true diff --git a/pkg/kubelet/prober/worker_test.go b/pkg/kubelet/prober/worker_test.go index 3a7a6262f73..7006dc4c428 100644 --- a/pkg/kubelet/prober/worker_test.go +++ b/pkg/kubelet/prober/worker_test.go @@ -17,14 +17,19 @@ limitations under the License. package prober import ( + "fmt" "testing" "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/probe" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/exec" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -52,12 +57,11 @@ func TestDoProbe(t *testing.T) { failedStatus.Phase = api.PodFailed tests := []struct { - probe api.Probe - podStatus *api.PodStatus - - expectContinue bool - expectReadySet bool - expectedReadiness results.Result + probe api.Probe + podStatus *api.PodStatus + expectContinue bool + expectSet bool + expectedResult results.Result }{ { // No status. expectContinue: true, @@ -72,136 +76,158 @@ func TestDoProbe(t *testing.T) { { // Container waiting podStatus: &pendingStatus, expectContinue: true, - expectReadySet: true, + expectSet: true, }, { // Container terminated - podStatus: &terminatedStatus, - expectReadySet: true, + podStatus: &terminatedStatus, + expectSet: true, }, { // Probe successful. 
- podStatus: &runningStatus, - expectContinue: true, - expectReadySet: true, - expectedReadiness: results.Success, + podStatus: &runningStatus, + expectContinue: true, + expectSet: true, + expectedResult: results.Success, }, { // Initial delay passed podStatus: &runningStatus, probe: api.Probe{ InitialDelaySeconds: -100, }, - expectContinue: true, - expectReadySet: true, - expectedReadiness: results.Success, + expectContinue: true, + expectSet: true, + expectedResult: results.Success, }, } - for i, test := range tests { - w := newTestWorker(test.probe) - if test.podStatus != nil { - m.statusManager.SetPodStatus(w.pod, *test.podStatus) - } - if c := doProbe(m, w); c != test.expectContinue { - t.Errorf("[%d] Expected continue to be %v but got %v", i, test.expectContinue, c) - } - ready, ok := m.readinessCache.Get(containerID) - if ok != test.expectReadySet { - t.Errorf("[%d] Expected to have readiness: %v but got %v", i, test.expectReadySet, ok) - } - if ready != test.expectedReadiness { - t.Errorf("[%d] Expected readiness: %v but got %v", i, test.expectedReadiness, ready) - } + for _, probeType := range [...]probeType{liveness, readiness} { + for i, test := range tests { + w := newTestWorker(m, probeType, test.probe) + if test.podStatus != nil { + m.statusManager.SetPodStatus(w.pod, *test.podStatus) + } + if c := w.doProbe(); c != test.expectContinue { + t.Errorf("[%s-%d] Expected continue to be %v but got %v", probeType, i, test.expectContinue, c) + } + result, ok := resultsManager(m, probeType).Get(containerID) + if ok != test.expectSet { + t.Errorf("[%s-%d] Expected to have result: %v but got %v", probeType, i, test.expectSet, ok) + } + if result != test.expectedResult { + t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result) + } - // Clean up. - m.statusManager.DeletePodStatus(podUID) - m.readinessCache.Remove(containerID) + // Clean up. + m.statusManager.DeletePodStatus(podUID) + resultsManager(m, probeType).Remove(containerID) + } } } func TestInitialDelay(t *testing.T) { m := newTestManager() - w := newTestWorker(api.Probe{ - InitialDelaySeconds: 10, - }) - m.statusManager.SetPodStatus(w.pod, getRunningStatus()) - if !doProbe(m, w) { - t.Errorf("Expected to continue, but did not") - } + for _, probeType := range [...]probeType{liveness, readiness} { + w := newTestWorker(m, probeType, api.Probe{ + InitialDelaySeconds: 10, + }) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) - ready, ok := m.readinessCache.Get(containerID) - if !ok { - t.Errorf("Expected readiness to be false, but was not set") - } else if ready { - t.Errorf("Expected readiness to be false, but was true") - } + if !w.doProbe() { + t.Errorf("[%s] Expected to continue, but did not", probeType) + } - // 100 seconds later... - laterStatus := getRunningStatus() - laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = - time.Now().Add(-100 * time.Second) - m.statusManager.SetPodStatus(w.pod, laterStatus) + expectedResult := results.Result(probeType == liveness) + result, ok := resultsManager(m, probeType).Get(containerID) + if !ok { + t.Errorf("[%s] Expected result to be set during initial delay, but was not set", probeType) + } else if result != expectedResult { + t.Errorf("[%s] Expected result to be %v during initial delay, but was %v", + probeType, expectedResult, result) + } - // Second call should succeed (already waited). - if !doProbe(m, w) { - t.Errorf("Expected to continue, but did not") - } + // 100 seconds later... 
+ laterStatus := getRunningStatus() + laterStatus.ContainerStatuses[0].State.Running.StartedAt.Time = + time.Now().Add(-100 * time.Second) + m.statusManager.SetPodStatus(w.pod, laterStatus) - ready, ok = m.readinessCache.Get(containerID) - if !ok { - t.Errorf("Expected readiness to be true, but was not set") - } else if !ready { - t.Errorf("Expected readiness to be true, but was false") + // Second call should succeed (already waited). + if !w.doProbe() { + t.Errorf("[%s] Expected to continue, but did not", probeType) + } + + result, ok = resultsManager(m, probeType).Get(containerID) + if !ok { + t.Errorf("[%s] Expected result to be true, but was not set", probeType) + } else if !result { + t.Errorf("[%s] Expected result to be true, but was false", probeType) + } } } func TestCleanUp(t *testing.T) { m := newTestManager() - pod := getTestPod(api.Probe{}) - m.statusManager.SetPodStatus(&pod, getRunningStatus()) - m.readinessCache.Set(containerID, results.Success) - w := m.newWorker(&pod, pod.Spec.Containers[0]) - m.readinessProbes[containerPath{podUID, containerName}] = w - if ready, _ := m.readinessCache.Get(containerID); !ready { - t.Fatal("Expected readiness to be true.") - } + for _, probeType := range [...]probeType{liveness, readiness} { + key := probeKey{podUID, containerName, probeType} + w := newTestWorker(m, probeType, api.Probe{}) + m.statusManager.SetPodStatus(w.pod, getRunningStatus()) + go w.run() + m.workers[key] = w - close(w.stop) - if err := waitForWorkerExit(m, []containerPath{{podUID, containerName}}); err != nil { - t.Fatal(err) - } + // Wait for worker to run. + condition := func() (bool, error) { + ready, _ := resultsManager(m, probeType).Get(containerID) + return ready == results.Success, nil + } + if ready, _ := condition(); !ready { + if err := wait.Poll(100*time.Millisecond, util.ForeverTestTimeout, condition); err != nil { + t.Fatalf("[%s] Error waiting for worker ready: %v", probeType, err) + } + } - if _, ok := m.readinessCache.Get(containerID); ok { - t.Error("Expected readiness to be cleared.") - } - if _, ok := m.readinessProbes[containerPath{podUID, containerName}]; ok { - t.Error("Expected worker to be cleared.") + close(w.stop) + if err := waitForWorkerExit(m, []probeKey{key}); err != nil { + t.Fatalf("[%s] error waiting for worker exit: %v", probeType, err) + } + + if _, ok := resultsManager(m, probeType).Get(containerID); ok { + t.Errorf("[%s] Expected result to be cleared.", probeType) + } + if _, ok := m.workers[key]; ok { + t.Errorf("[%s] Expected worker to be cleared.", probeType) + } } } func TestHandleCrash(t *testing.T) { m := newTestManager() - m.prober = CrashingProber{} - w := newTestWorker(api.Probe{}) + m.prober = &prober{ + refManager: kubecontainer.NewRefManager(), + recorder: &record.FakeRecorder{}, + exec: crashingExecProber{}, + } + + w := newTestWorker(m, readiness, api.Probe{}) m.statusManager.SetPodStatus(w.pod, getRunningStatus()) // doProbe should recover from the crash, and keep going. 
- if !doProbe(m, w) { + if !w.doProbe() { t.Error("Expected to keep going, but terminated.") } - if _, ok := m.readinessCache.Get(containerID); ok { + if _, ok := m.readinessManager.Get(containerID); ok { t.Error("Expected readiness to be unchanged from crash.") } } -func newTestWorker(probeSpec api.Probe) *worker { - pod := getTestPod(probeSpec) - return &worker{ - stop: make(chan struct{}), - pod: &pod, - container: pod.Spec.Containers[0], - spec: &probeSpec, +func newTestWorker(m *manager, probeType probeType, probeSpec api.Probe) *worker { + // All tests rely on the fake exec prober. + probeSpec.Handler = api.Handler{ + Exec: &api.ExecAction{}, } + + pod := getTestPod(probeType, probeSpec) + return newWorker(m, probeType, &pod, pod.Spec.Containers[0]) } func getRunningStatus() api.PodStatus { @@ -217,10 +243,15 @@ func getRunningStatus() api.PodStatus { return podStatus } -func getTestPod(probeSpec api.Probe) api.Pod { +func getTestPod(probeType probeType, probeSpec api.Probe) api.Pod { container := api.Container{ - Name: containerName, - ReadinessProbe: &probeSpec, + Name: containerName, + } + switch probeType { + case readiness: + container.ReadinessProbe = &probeSpec + case liveness: + container.LivenessProbe = &probeSpec } pod := api.Pod{ Spec: api.PodSpec{ @@ -232,12 +263,18 @@ func getTestPod(probeSpec api.Probe) api.Pod { return pod } -type CrashingProber struct{} - -func (f CrashingProber) ProbeLiveness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID, _ int64) (probe.Result, error) { - panic("Intentional ProbeLiveness crash.") +func resultsManager(m *manager, probeType probeType) results.Manager { + switch probeType { + case readiness: + return m.readinessManager + case liveness: + return m.livenessManager + } + panic(fmt.Errorf("Unhandled case: %v", probeType)) } -func (f CrashingProber) ProbeReadiness(_ *api.Pod, _ api.PodStatus, c api.Container, _ kubecontainer.ContainerID) (probe.Result, error) { - panic("Intentional ProbeReadiness crash.") +type crashingExecProber struct{} + +func (p crashingExecProber) Probe(_ exec.Cmd) (probe.Result, string, error) { + panic("Intentional Probe crash.") } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 1865a7b456c..23a59ffedc2 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -41,9 +41,8 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" - "k8s.io/kubernetes/pkg/kubelet/prober" + proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" kubeletutil "k8s.io/kubernetes/pkg/kubelet/util" - "k8s.io/kubernetes/pkg/probe" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" @@ -89,7 +88,7 @@ type Runtime struct { containerRefManager *kubecontainer.RefManager generator kubecontainer.RunContainerOptionsGenerator recorder record.EventRecorder - prober prober.Prober + livenessManager proberesults.Manager volumeGetter volumeGetter imagePuller kubecontainer.ImagePuller } @@ -108,8 +107,9 @@ func New(config *Config, generator kubecontainer.RunContainerOptionsGenerator, recorder record.EventRecorder, containerRefManager *kubecontainer.RefManager, - prober prober.Prober, - volumeGetter volumeGetter, imageBackOff *util.Backoff) (*Runtime, error) { + livenessManager proberesults.Manager, + volumeGetter volumeGetter, + imageBackOff *util.Backoff) (*Runtime, error) { systemdVersion, err := getSystemdVersion() if err != nil { @@ -146,7 +146,7 @@ func 
New(config *Config, containerRefManager: containerRefManager, generator: generator, recorder: recorder, - prober: prober, + livenessManager: livenessManager, volumeGetter: volumeGetter, } rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) @@ -1032,17 +1032,13 @@ func (r *Runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus break } - result, err := r.prober.ProbeLiveness(pod, podStatus, container, c.ID, c.Created) - // TODO(vmarmol): examine this logic. - if err == nil && result != probe.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever { - glog.Infof("Pod %q container %q is unhealthy (probe result: %v), it will be killed and re-created.", podFullName, container.Name, result) + liveness, found := r.livenessManager.Get(c.ID) + if found && liveness != proberesults.Success && pod.Spec.RestartPolicy != api.RestartPolicyNever { + glog.Infof("Pod %q container %q is unhealthy, it will be killed and re-created.", podFullName, container.Name) restartPod = true break } - if err != nil { - glog.V(2).Infof("Probe container %q failed: %v", container.Name, err) - } delete(unidentifiedContainers, c.ID) }