startupProbe: Kubelet changes
parent e4d26f845e
commit 323f99ea8c
@@ -36,7 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/probe"
 	execprobe "k8s.io/kubernetes/pkg/probe/exec"
 	httpprobe "k8s.io/kubernetes/pkg/probe/http"
-	tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
+	tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
 	"k8s.io/utils/exec"

 	"k8s.io/klog"
@@ -44,7 +44,7 @@ import (

 const maxProbeRetries = 3

-// Prober helps to check the liveness/readiness of a container.
+// Prober helps to check the liveness/readiness/startup of a container.
 type prober struct {
 	exec execprobe.Prober
 	// probe types needs different httprobe instances so they don't
@@ -52,7 +52,8 @@ type prober struct {
 	// same host:port and transient failures. See #49740.
 	readinessHTTP httpprobe.Prober
 	livenessHTTP  httpprobe.Prober
-	tcp           tcprobe.Prober
+	startupHTTP   httpprobe.Prober
+	tcp           tcpprobe.Prober
 	runner kubecontainer.ContainerCommandRunner

 	refManager *kubecontainer.RefManager
@@ -71,7 +72,8 @@ func newProber(
 		exec:          execprobe.New(),
 		readinessHTTP: httpprobe.New(followNonLocalRedirects),
 		livenessHTTP:  httpprobe.New(followNonLocalRedirects),
-		tcp:           tcprobe.New(),
+		startupHTTP:   httpprobe.New(followNonLocalRedirects),
+		tcp:           tcpprobe.New(),
 		runner:        runner,
 		refManager:    refManager,
 		recorder:      recorder,
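For context (not part of the diff): these first hunks appear to be the kubelet prober itself (likely pkg/kubelet/prober/prober.go). The prober deliberately keeps a separate HTTP prober per probe type so liveness, readiness and startup probes do not share connections against the same host:port, which can mask transient failures (see #49740); the new startupHTTP field simply follows that existing pattern. Below is a minimal standalone sketch of the same idea using only net/http — the names probeType, newHTTPClient and prober here are illustrative, not the kubelet API:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

type probeType int

const (
	liveness probeType = iota
	readiness
	startup
)

// newHTTPClient builds a dedicated client (and therefore a dedicated
// Transport and connection pool) for one probe type, mirroring the way
// the kubelet gives each probe type its own httpprobe.Prober.
func newHTTPClient(timeout time.Duration) *http.Client {
	return &http.Client{
		Timeout:   timeout,
		Transport: &http.Transport{}, // separate connection pool per probe type
	}
}

type prober struct {
	clients map[probeType]*http.Client
}

func newProber() *prober {
	return &prober{clients: map[probeType]*http.Client{
		liveness:  newHTTPClient(time.Second),
		readiness: newHTTPClient(time.Second),
		startup:   newHTTPClient(time.Second),
	}}
}

func (p *prober) probe(t probeType, url string) error {
	resp, err := p.clients[t].Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= 400 {
		return fmt.Errorf("probe failed: %s", resp.Status)
	}
	return nil
}

func main() {
	p := newProber()
	fmt.Println(p.probe(startup, "http://127.0.0.1:8080/healthz"))
}
```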
@@ -86,6 +88,8 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
 		probeSpec = container.ReadinessProbe
 	case liveness:
 		probeSpec = container.LivenessProbe
+	case startup:
+		probeSpec = container.StartupProbe
 	default:
 		return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
 	}
@@ -174,11 +178,14 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
 		url := formatURL(scheme, host, port, path)
 		headers := buildHeader(p.HTTPGet.HTTPHeaders)
 		klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
-		if probeType == liveness {
+		switch probeType {
+		case liveness:
 			return pb.livenessHTTP.Probe(url, headers, timeout)
+		case startup:
+			return pb.startupHTTP.Probe(url, headers, timeout)
+		default:
+			return pb.readinessHTTP.Probe(url, headers, timeout)
 		}
-		// readiness
-		return pb.readinessHTTP.Probe(url, headers, timeout)
 	}
 	if p.TCPSocket != nil {
 		port, err := extractPort(p.TCPSocket.Port, container)
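The runProbe change above only affects which HTTP prober is selected per probe type; the URL, header and timeout handling is unchanged. As a rough standalone illustration of what an HTTP probe boils down to — formatURL and httpProbe below are simplified stand-ins written for this sketch, not the kubelet helpers:

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"time"
)

// formatURL assembles scheme://host:port/path for the probe request,
// loosely following the kubelet helper of the same name.
func formatURL(scheme, host string, port int, path string) *url.URL {
	u, _ := url.Parse(path) // path may carry a query string
	u.Scheme = scheme
	u.Host = fmt.Sprintf("%s:%d", host, port)
	return u
}

// httpProbe issues a single GET with the probe's timeout and custom headers
// and treats any status below 400 as success.
func httpProbe(u *url.URL, headers http.Header, timeout time.Duration) error {
	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return err
	}
	req.Header = headers
	client := &http.Client{Timeout: timeout}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode >= http.StatusBadRequest {
		return fmt.Errorf("unhealthy: %s", resp.Status)
	}
	return nil
}

func main() {
	u := formatURL("http", "127.0.0.1", 8080, "/started")
	fmt.Println(httpProbe(u, http.Header{"X-Probe": []string{"startup"}}, time.Second))
}
```

The hunks that follow switch to the probe manager (this looks like pkg/kubelet/prober/prober_manager.go), which wires the new probe type into worker lifecycle, metrics and pod status.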
@@ -23,9 +23,11 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/component-base/metrics"
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/features"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/kubelet/status"
@@ -37,7 +39,7 @@ var ProberResults = metrics.NewCounterVec(
 	&metrics.CounterOpts{
 		Subsystem:      "prober",
 		Name:           "probe_total",
-		Help:           "Cumulative number of a liveness or readiness probe for a container by result.",
+		Help:           "Cumulative number of a liveness, readiness or startup probe for a container by result.",
 		StabilityLevel: metrics.ALPHA,
 	},
 	[]string{"probe_type",
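Only the metric's help text changes here: startup results are recorded on the existing prober_probe_total counter as a new probe_type label value. A small sketch of that shape using the upstream Prometheus client rather than the k8s.io/component-base/metrics wrapper (the real kubelet counter also carries container, pod, namespace and pod UID labels, which are omitted here):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// proberResults mimics the shape of the kubelet's ProberResults counter:
// one time series per probe type and result.
var proberResults = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Subsystem: "prober",
		Name:      "probe_total",
		Help:      "Cumulative number of a liveness, readiness or startup probe for a container by result.",
	},
	[]string{"probe_type", "result"},
)

func main() {
	prometheus.MustRegister(proberResults)

	// A startup probe result is recorded the same way as the existing
	// probe types, just with a new value for the probe_type label.
	proberResults.WithLabelValues("Startup", "successful").Inc()
	proberResults.WithLabelValues("Liveness", "failed").Inc()

	fmt.Println("recorded probe results")
}
```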
@@ -89,6 +91,9 @@ type manager struct {
 	// livenessManager manages the results of liveness probes
 	livenessManager results.Manager

+	// startupManager manages the results of startup probes
+	startupManager results.Manager
+
 	// prober executes the probe actions.
 	prober *prober
 }
@@ -103,11 +108,13 @@ func NewManager(

 	prober := newProber(runner, refManager, recorder)
 	readinessManager := results.NewManager()
+	startupManager := results.NewManager()
 	return &manager{
 		statusManager:    statusManager,
 		prober:           prober,
 		readinessManager: readinessManager,
 		livenessManager:  livenessManager,
+		startupManager:   startupManager,
 		workers:          make(map[probeKey]*worker),
 	}
 }
@@ -116,6 +123,8 @@ func NewManager(
 func (m *manager) Start() {
 	// Start syncing readiness.
 	go wait.Forever(m.updateReadiness, 0)
+	// Start syncing startup.
+	go wait.Forever(m.updateStartup, 0)
 }

 // Key uniquely identifying container probes
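Start now runs one forever-loop per result stream: startup results flow through their own results.Manager, exactly like readiness. A standalone sketch of that pattern, with results.Manager and wait.Forever replaced by plain channels and goroutines (all names here are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// result mirrors the idea of a probe result update: which container it is
// about and whether the probe succeeded.
type result struct {
	containerID string
	success     bool
}

// manager owns one update channel per probe type; workers publish results
// and the loops below push them into container status.
type manager struct {
	readinessUpdates chan result
	startupUpdates   chan result
}

func (m *manager) updateReadiness() {
	for update := range m.readinessUpdates {
		fmt.Printf("container %s ready=%v\n", update.containerID, update.success)
	}
}

func (m *manager) updateStartup() {
	for update := range m.startupUpdates {
		fmt.Printf("container %s started=%v\n", update.containerID, update.success)
	}
}

func main() {
	m := &manager{
		readinessUpdates: make(chan result),
		startupUpdates:   make(chan result),
	}
	// Analogous to go wait.Forever(m.updateReadiness, 0) and m.updateStartup.
	go m.updateReadiness()
	go m.updateStartup()

	m.startupUpdates <- result{containerID: "app", success: true}
	m.readinessUpdates <- result{containerID: "app", success: true}
	time.Sleep(100 * time.Millisecond)
}
```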
@@ -125,12 +134,13 @@ type probeKey struct {
 	probeType probeType
 }

-// Type of probe (readiness or liveness)
+// Type of probe (liveness, readiness or startup)
 type probeType int

 const (
 	liveness probeType = iota
 	readiness
+	startup

 	probeResultSuccessful string = "successful"
 	probeResultFailed     string = "failed"
@@ -144,6 +154,8 @@ func (t probeType) String() string {
 		return "Readiness"
 	case liveness:
 		return "Liveness"
+	case startup:
+		return "Startup"
 	default:
 		return "UNKNOWN"
 	}
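Because startup is just another probeType value, each container can now own up to three workers, keyed by (pod UID, container name, probe type). A toy sketch of that keying — probeKey and the worker map are simplified for illustration, not the kubelet structs:

```go
package main

import "fmt"

type probeType int

const (
	liveness probeType = iota
	readiness
	startup
)

func (t probeType) String() string {
	switch t {
	case readiness:
		return "Readiness"
	case liveness:
		return "Liveness"
	case startup:
		return "Startup"
	default:
		return "UNKNOWN"
	}
}

// probeKey uniquely identifies one probe worker; being a comparable struct,
// it can be used directly as a map key.
type probeKey struct {
	podUID        string
	containerName string
	probeType     probeType
}

func main() {
	workers := map[probeKey]string{}
	for _, t := range []probeType{liveness, readiness, startup} {
		workers[probeKey{"pod-1234", "app", t}] = "worker for " + t.String()
	}
	fmt.Println(len(workers), "workers registered for container app")
}
```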
@@ -157,6 +169,18 @@ func (m *manager) AddPod(pod *v1.Pod) {
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name

+		if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			key.probeType = startup
+			if _, ok := m.workers[key]; ok {
+				klog.Errorf("Startup probe already exists! %v - %v",
+					format.Pod(pod), c.Name)
+				return
+			}
+			w := newWorker(m, startup, pod, c)
+			m.workers[key] = w
+			go w.run()
+		}
+
 		if c.ReadinessProbe != nil {
 			key.probeType = readiness
 			if _, ok := m.workers[key]; ok {
@@ -190,7 +214,7 @@ func (m *manager) RemovePod(pod *v1.Pod) {
 	key := probeKey{podUID: pod.UID}
 	for _, c := range pod.Spec.Containers {
 		key.containerName = c.Name
-		for _, probeType := range [...]probeType{readiness, liveness} {
+		for _, probeType := range [...]probeType{readiness, liveness, startup} {
 			key.probeType = probeType
 			if worker, ok := m.workers[key]; ok {
 				worker.stop()
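Startup workers are only created when the StartupProbe feature gate is enabled, so clusters that leave the alpha feature off behave exactly as before, while RemovePod always cleans up all three worker kinds. A minimal standalone sketch of that guard — the startupProbeGateEnabled flag and the container/worker types are illustrative, not the kubelet's utilfeature gate:

```go
package main

import "fmt"

type container struct {
	name              string
	hasStartupProbe   bool
	hasReadinessProbe bool
}

type worker struct{ kind, container string }

// startupProbeGateEnabled stands in for
// utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe).
var startupProbeGateEnabled = true

// addPod spawns one worker per configured probe; startup workers are only
// created when the feature gate is enabled.
func addPod(containers []container) []worker {
	var workers []worker
	for _, c := range containers {
		if c.hasStartupProbe && startupProbeGateEnabled {
			workers = append(workers, worker{kind: "startup", container: c.name})
		}
		if c.hasReadinessProbe {
			workers = append(workers, worker{kind: "readiness", container: c.name})
		}
	}
	return workers
}

func main() {
	ws := addPod([]container{{name: "app", hasStartupProbe: true, hasReadinessProbe: true}})
	fmt.Println(ws)
}
```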
@@ -223,6 +247,21 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
 			ready = !exists
 		}
 		podStatus.ContainerStatuses[i].Ready = ready
+
+		var started bool
+		if c.State.Running == nil {
+			started = false
+		} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
+			// the container is running, assume it is started if the StartupProbe feature is disabled
+			started = true
+		} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
+			started = result == results.Success
+		} else {
+			// The check whether there is a probe which hasn't run yet.
+			_, exists := m.getWorker(podUID, c.Name, startup)
+			started = !exists
+		}
+		podStatus.ContainerStatuses[i].Started = &started
 	}
 	// init containers are ready if they have exited with success or if a readiness probe has
 	// succeeded.
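The started value is derived in a fixed order: a container that is not running is never started; with the feature gate off, any running container counts as started; otherwise the cached startup result wins; and if no result exists yet, the container only counts as started when no startup worker is still pending. A standalone sketch of that decision chain, with the kubelet types replaced by simplified stand-ins:

```go
package main

import "fmt"

type startupResult int

const (
	noResult startupResult = iota
	startupFailure
	startupSuccess
)

type containerView struct {
	running           bool
	gateEnabled       bool
	cachedResult      startupResult
	hasPendingStartup bool // a startup worker exists but has not reported yet
}

// startedFor mirrors the order of checks in UpdatePodStatus.
func startedFor(c containerView) bool {
	switch {
	case !c.running:
		return false
	case !c.gateEnabled:
		// Feature disabled: a running container is assumed started.
		return true
	case c.cachedResult != noResult:
		return c.cachedResult == startupSuccess
	default:
		// No result yet: started only if there is no probe still to run.
		return !c.hasPendingStartup
	}
}

func main() {
	fmt.Println(startedFor(containerView{running: true, gateEnabled: true, hasPendingStartup: true}))     // false
	fmt.Println(startedFor(containerView{running: true, gateEnabled: true, cachedResult: startupSuccess})) // true
}
```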
@@ -262,3 +301,10 @@ func (m *manager) updateReadiness() {
 	ready := update.Result == results.Success
 	m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
 }
+
+func (m *manager) updateStartup() {
+	update := <-m.startupManager.Updates()
+
+	started := update.Result == results.Success
+	m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
+}
@@ -98,6 +98,10 @@ func newWorker(
 		w.spec = container.LivenessProbe
 		w.resultsManager = m.livenessManager
 		w.initialValue = results.Success
+	case startup:
+		w.spec = container.StartupProbe
+		w.resultsManager = m.startupManager
+		w.initialValue = results.Failure
 	}

 	basicMetricLabels := prometheus.Labels{
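These hunks are from the probe worker (this looks like pkg/kubelet/prober/worker.go). The initial values encode the semantics before the first probe run: liveness assumes Success so a container is not restarted before it is ever probed, while startup, like readiness, assumes Failure — the container is treated as not yet started until a startup probe succeeds. A small sketch of those defaults, with the result and probe types simplified:

```go
package main

import "fmt"

type result bool

const (
	failure result = false
	success result = true
)

type probeType int

const (
	liveness probeType = iota
	readiness
	startup
)

// initialResult returns the value a worker reports before its probe has run,
// mirroring the defaults picked in newWorker.
func initialResult(t probeType) result {
	switch t {
	case liveness:
		// Assume alive until proven otherwise, so the container is not
		// restarted before the first probe runs.
		return success
	default:
		// Readiness and startup assume failure: not ready / not started
		// until a probe succeeds.
		return failure
	}
}

func main() {
	for _, t := range []probeType{liveness, readiness, startup} {
		fmt.Printf("probe %d initial result: %v\n", t, initialResult(t))
	}
}
```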
@@ -218,10 +222,23 @@ func (w *worker) doProbe() (keepGoing bool) {
 			w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
 	}

 	// Probe disabled for InitialDelaySeconds.
 	if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
 		return true
 	}
+
+	if c.Started != nil && *c.Started {
+		// Stop probing for startup once container has started.
+		if w.probeType == startup {
+			return true
+		}
+	} else {
+		// Disable other probes until container has started.
+		if w.probeType != startup {
+			return true
+		}
+	}
+
 	// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
 	// the full container environment here, OR we must make a call to the CRI in order to get those environment
 	// values from the running container.
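Once ContainerStatus.Started is true the startup worker has nothing left to do, and until it is true every other probe is held back — this is what keeps a slow-starting container from being killed by its liveness probe. A standalone sketch of that gate, with probeType and the Started pointer as simplified stand-ins for the kubelet and API types:

```go
package main

import "fmt"

type probeType int

const (
	liveness probeType = iota
	readiness
	startup
)

// shouldSkipProbe mirrors the gating added to doProbe: started is the
// container's Started field (nil until the status manager has set it).
func shouldSkipProbe(t probeType, started *bool) bool {
	if started != nil && *started {
		// Container has started: the startup probe is done for good.
		return t == startup
	}
	// Container has not (yet) started: only the startup probe may run.
	return t != startup
}

func main() {
	notStarted := false
	hasStarted := true
	fmt.Println(shouldSkipProbe(liveness, &notStarted)) // true: liveness waits for startup
	fmt.Println(shouldSkipProbe(startup, &notStarted))  // false: startup probe runs
	fmt.Println(shouldSkipProbe(startup, &hasStarted))  // true: startup probe stops
	fmt.Println(shouldSkipProbe(liveness, &hasStarted)) // false: liveness takes over
}
```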
@@ -255,8 +272,8 @@ func (w *worker) doProbe() (keepGoing bool) {

 	w.resultsManager.Set(w.containerID, result, w.pod)

-	if w.probeType == liveness && result == results.Failure {
-		// The container fails a liveness check, it will need to be restarted.
+	if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
+		// The container fails a liveness/startup check, it will need to be restarted.
 		// Stop probing until we see a new container ID. This is to reduce the
 		// chance of hitting #21751, where running `docker exec` when a
 		// container is being stopped may lead to corrupted container state.
@@ -100,6 +100,10 @@ type Manager interface {
 	// triggers a status update.
 	SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)

+	// SetContainerStartup updates the cached container status with the given startup, and
+	// triggers a status update.
+	SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
+
 	// TerminatePod resets the container status for the provided pod to terminated and triggers
 	// a status update.
 	TerminatePod(pod *v1.Pod)
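The remaining hunks are from the status manager (this looks like pkg/kubelet/status/status_manager.go). Started is a *bool on the container status, so "unknown / not yet evaluated" (nil) is distinct from an explicit false or true. A small sketch of handling that tri-state — containerStatus here is a cut-down stand-in, not v1.ContainerStatus:

```go
package main

import "fmt"

// containerStatus is a cut-down stand-in for the relevant part of
// v1.ContainerStatus: Started is a pointer so "not yet known" (nil)
// is distinct from an explicit false.
type containerStatus struct {
	Name    string
	Ready   bool
	Started *bool
}

func describe(cs containerStatus) string {
	switch {
	case cs.Started == nil:
		return cs.Name + ": startup state not reported yet"
	case *cs.Started:
		return cs.Name + ": started"
	default:
		return cs.Name + ": still starting"
	}
}

// setStartup mimics what a SetContainerStartup-style call does to the
// cached status: it always writes an explicit value.
func setStartup(cs *containerStatus, started bool) {
	cs.Started = &started
}

func main() {
	cs := containerStatus{Name: "app"}
	fmt.Println(describe(cs))
	setStartup(&cs, false)
	fmt.Println(describe(cs))
	setStartup(&cs, true)
	fmt.Println(describe(cs))
}
```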
@@ -248,6 +252,45 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
 	m.updateStatusInternal(pod, status, false)
 }

+func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
+	m.podStatusesLock.Lock()
+	defer m.podStatusesLock.Unlock()
+
+	pod, ok := m.podManager.GetPodByUID(podUID)
+	if !ok {
+		klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
+		return
+	}
+
+	oldStatus, found := m.podStatuses[pod.UID]
+	if !found {
+		klog.Warningf("Container startup changed before pod has synced: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Find the container to update.
+	containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
+	if !ok {
+		klog.Warningf("Container startup changed for unknown container: %q - %q",
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	if containerStatus.Started != nil && *containerStatus.Started == started {
+		klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
+			format.Pod(pod), containerID.String())
+		return
+	}
+
+	// Make sure we're not updating the cached version.
+	status := *oldStatus.status.DeepCopy()
+	containerStatus, _, _ = findContainerStatus(&status, containerID.String())
+	containerStatus.Started = &started
+
+	m.updateStatusInternal(pod, status, false)
+}
+
 func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
 	// Find the container to update.
 	for i, c := range status.ContainerStatuses {
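SetContainerStartup follows the same copy-on-write discipline as SetContainerReadiness: it never mutates the cached status in place; it deep-copies it, flips Started on the copy, and hands the copy to the normal status-update path. A standalone sketch of that pattern — the types and the deepCopy helper are simplified stand-ins, not the generated v1.PodStatus DeepCopy:

```go
package main

import "fmt"

type containerStatus struct {
	Name    string
	Started *bool
}

type podStatus struct {
	Containers []containerStatus
}

// deepCopy is a hand-written stand-in for the generated DeepCopy method:
// it copies the slice and the pointed-to bools so the cached value can
// never be mutated through the copy.
func (s podStatus) deepCopy() podStatus {
	out := podStatus{Containers: make([]containerStatus, len(s.Containers))}
	copy(out.Containers, s.Containers)
	for i, c := range s.Containers {
		if c.Started != nil {
			v := *c.Started
			out.Containers[i].Started = &v
		}
	}
	return out
}

// setContainerStartup updates the startup state on a copy of the cached
// status and returns the copy, leaving the cache untouched.
func setContainerStartup(cached podStatus, name string, started bool) podStatus {
	status := cached.deepCopy()
	for i := range status.Containers {
		if status.Containers[i].Name == name {
			status.Containers[i].Started = &started
		}
	}
	return status
}

func main() {
	cached := podStatus{Containers: []containerStatus{{Name: "app"}}}
	updated := setContainerStartup(cached, "app", true)
	fmt.Println("cached still unset:", cached.Containers[0].Started == nil)
	fmt.Println("updated started:", *updated.Containers[0].Started)
}
```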