From 8c162601600ec257a77dbc2294d3782dbf33c107 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 20 Jun 2017 13:35:38 -0500 Subject: [PATCH] kubelet: fix inconsistent display of terminated pod IPs by using events instead PLEG and kubelet race when reading and sending pod status to the apiserver. PLEG inserts status into a cache, and then signals kubelet. Kubelet then eventually reads the status out of that cache, but in the meantime the status could have been changed by PLEG. When a pod exits, pod status will no longer include the pod's IP address because the network plugin/runtime will report "" for terminated pod IPs. If this status gets inserted into the PLEG cache before kubelet gets the status out of the cache, kubelet will see a blank pod IP address. This happens in about 1/5 of cases when pods are short-lived, and somewhat less frequently for longer-running pods. To ensure consistency for properties of dead pods, copy an old status update's IP address over to the new status update if (a) the new status update's IP is missing and (b) all sandboxes of the pod are dead/not-ready (e.g., no possibility for a valid IP from the sandbox). 
Fixes: https://github.com/kubernetes/kubernetes/issues/47265 --- pkg/kubelet/pleg/BUILD | 1 + pkg/kubelet/pleg/generic.go | 44 +++++++++++++++++++++++++ pkg/kubelet/pleg/generic_test.go | 55 ++++++++++++++++++++++++++++++++ 3 files changed, 100 insertions(+) diff --git a/pkg/kubelet/pleg/BUILD b/pkg/kubelet/pleg/BUILD index 6ad5cae0e5a..e907644f613 100644 --- a/pkg/kubelet/pleg/BUILD +++ b/pkg/kubelet/pleg/BUILD @@ -17,6 +17,7 @@ go_library( ], tags = ["automanaged"], deps = [ + "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/metrics:go_default_library", "//vendor/github.com/golang/glog:go_default_library", diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go index 8e447f0ef85..6c6c980c3d8 100644 --- a/pkg/kubelet/pleg/generic.go +++ b/pkg/kubelet/pleg/generic.go @@ -26,6 +26,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" ) @@ -329,6 +330,41 @@ func (g *GenericPLEG) cacheEnabled() bool { return g.cache != nil } +// Preserve an older cached status' pod IP if the new status has no pod IP +// and its sandboxes have exited +func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string { + if status.IP != "" { + return status.IP + } + + oldStatus, err := g.cache.Get(pid) + if err != nil || oldStatus.IP == "" { + return "" + } + + for _, sandboxStatus := range status.SandboxStatuses { + // If at least one sandbox is ready, then use this status update's pod IP + if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY { + return status.IP + } + } + + if len(status.SandboxStatuses) == 0 { + // Without sandboxes (which built-in runtimes like rkt don't report) + // look at all the container statuses, and if any 
containers are + // created or running then use the new pod IP + for _, containerStatus := range status.ContainerStatuses { + if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning { + return status.IP + } + } + } + + // For pods with no ready containers or sandboxes (like exited pods) + // use the old status' pod IP + return oldStatus.IP +} + func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error { if pod == nil { // The pod is missing in the current relist. This means that @@ -343,6 +379,14 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error { // all containers again. status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace) glog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err) + if err == nil { + // Preserve the pod IP across cache updates if the new IP is empty. + // When a pod is torn down, kubelet may race with PLEG and retrieve + // a pod status after network teardown, but the Kubernetes API expects + // the completed pod's IP to be available after the pod is dead. 
+ status.IP = g.getPodIP(pid, status) + } + g.cache.Set(pod.ID, status, err, timestamp) return err } diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go index f5fd5635ca2..468f98bffae 100644 --- a/pkg/kubelet/pleg/generic_test.go +++ b/pkg/kubelet/pleg/generic_test.go @@ -496,3 +496,58 @@ func TestRelistingWithSandboxes(t *testing.T) { actual = getEventsFromChannel(ch) verifyEvents(t, expected, actual) } + +func TestRelistIPChange(t *testing.T) { + pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock() + ch := pleg.Watch() + + id := types.UID("test-pod-0") + cState := kubecontainer.ContainerStateRunning + container := createTestContainer("c0", cState) + pod := &kubecontainer.Pod{ + ID: id, + Containers: []*kubecontainer.Container{container}, + } + ipAddr := "192.168.1.5/24" + status := &kubecontainer.PodStatus{ + ID: id, + IP: ipAddr, + ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}}, + } + event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID} + + runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once() + runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once() + + pleg.relist() + actualEvents := getEventsFromChannel(ch) + actualStatus, actualErr := pleg.cache.Get(pod.ID) + assert.Equal(t, status, actualStatus, "test0") + assert.Nil(t, actualErr, "test0") + assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents) + + // Clear the IP address and mark the container terminated + container = createTestContainer("c0", kubecontainer.ContainerStateExited) + pod = &kubecontainer.Pod{ + ID: id, + Containers: []*kubecontainer.Container{container}, + } + status = &kubecontainer.PodStatus{ + ID: id, + ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}}, + } + event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID} + runtimeMock.On("GetPods", 
true).Return([]*kubecontainer.Pod{pod}, nil).Once() + runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once() + + pleg.relist() + actualEvents = getEventsFromChannel(ch) + actualStatus, actualErr = pleg.cache.Get(pod.ID) + // Must copy status to compare since its pointer gets passed through all + // the way to the event + statusCopy := *status + statusCopy.IP = ipAddr + assert.Equal(t, &statusCopy, actualStatus, "test0") + assert.Nil(t, actualErr, "test0") + assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents) +}