diff --git a/pkg/kubelet/pleg/BUILD b/pkg/kubelet/pleg/BUILD
index 4bf6c7a7550..5d847199f2c 100644
--- a/pkg/kubelet/pleg/BUILD
+++ b/pkg/kubelet/pleg/BUILD
@@ -14,6 +14,7 @@ go_library(
         "pleg.go",
     ],
     deps = [
+        "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/metrics:go_default_library",
         "//vendor/github.com/golang/glog:go_default_library",
diff --git a/pkg/kubelet/pleg/generic.go b/pkg/kubelet/pleg/generic.go
index e12509f4b54..2d8a9a0c13d 100644
--- a/pkg/kubelet/pleg/generic.go
+++ b/pkg/kubelet/pleg/generic.go
@@ -26,6 +26,7 @@ import (
     "k8s.io/apimachinery/pkg/util/clock"
     "k8s.io/apimachinery/pkg/util/sets"
     "k8s.io/apimachinery/pkg/util/wait"
+    runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
     "k8s.io/kubernetes/pkg/kubelet/metrics"
 )
@@ -329,6 +330,41 @@ func (g *GenericPLEG) cacheEnabled() bool {
     return g.cache != nil
 }
 
+// Preserve an older cached status' pod IP if the new status has no pod IP
+// and its sandboxes have exited
+func (g *GenericPLEG) getPodIP(pid types.UID, status *kubecontainer.PodStatus) string {
+    if status.IP != "" {
+        return status.IP
+    }
+
+    oldStatus, err := g.cache.Get(pid)
+    if err != nil || oldStatus.IP == "" {
+        return ""
+    }
+
+    for _, sandboxStatus := range status.SandboxStatuses {
+        // If at least one sandbox is ready, then use this status update's pod IP
+        if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY {
+            return status.IP
+        }
+    }
+
+    if len(status.SandboxStatuses) == 0 {
+        // Without sandboxes (which built-in runtimes like rkt don't report)
+        // look at all the container statuses, and if any containers are
+        // running then use the new pod IP
+        for _, containerStatus := range status.ContainerStatuses {
+            if containerStatus.State == kubecontainer.ContainerStateCreated || containerStatus.State == kubecontainer.ContainerStateRunning {
+                return status.IP
+            }
+        }
+    }
+
+    // For pods with no ready containers or sandboxes (like exited pods)
+    // use the old status' pod IP
+    return oldStatus.IP
+}
+
 func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
     if pod == nil {
         // The pod is missing in the current relist. This means that
@@ -343,6 +379,14 @@ func (g *GenericPLEG) updateCache(pod *kubecontainer.Pod, pid types.UID) error {
     // all containers again.
     status, err := g.runtime.GetPodStatus(pod.ID, pod.Name, pod.Namespace)
     glog.V(4).Infof("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err)
+    if err == nil {
+        // Preserve the pod IP across cache updates if the new IP is empty.
+        // When a pod is torn down, kubelet may race with PLEG and retrieve
+        // a pod status after network teardown, but the kubernetes API expects
+        // the completed pod's IP to be available after the pod is dead.
+        status.IP = g.getPodIP(pid, status)
+    }
+
     g.cache.Set(pod.ID, status, err, timestamp)
     return err
 }
diff --git a/pkg/kubelet/pleg/generic_test.go b/pkg/kubelet/pleg/generic_test.go
index f5fd5635ca2..468f98bffae 100644
--- a/pkg/kubelet/pleg/generic_test.go
+++ b/pkg/kubelet/pleg/generic_test.go
@@ -496,3 +496,58 @@ func TestRelistingWithSandboxes(t *testing.T) {
     actual = getEventsFromChannel(ch)
     verifyEvents(t, expected, actual)
 }
+
+func TestRelistIPChange(t *testing.T) {
+    pleg, runtimeMock := newTestGenericPLEGWithRuntimeMock()
+    ch := pleg.Watch()
+
+    id := types.UID("test-pod-0")
+    cState := kubecontainer.ContainerStateRunning
+    container := createTestContainer("c0", cState)
+    pod := &kubecontainer.Pod{
+        ID:         id,
+        Containers: []*kubecontainer.Container{container},
+    }
+    ipAddr := "192.168.1.5/24"
+    status := &kubecontainer.PodStatus{
+        ID:                id,
+        IP:                ipAddr,
+        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: cState}},
+    }
+    event := &PodLifecycleEvent{ID: pod.ID, Type: ContainerStarted, Data: container.ID.ID}
+
+    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+    pleg.relist()
+    actualEvents := getEventsFromChannel(ch)
+    actualStatus, actualErr := pleg.cache.Get(pod.ID)
+    assert.Equal(t, status, actualStatus, "test0")
+    assert.Nil(t, actualErr, "test0")
+    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+
+    // Clear the IP address and mark the container terminated
+    container = createTestContainer("c0", kubecontainer.ContainerStateExited)
+    pod = &kubecontainer.Pod{
+        ID:         id,
+        Containers: []*kubecontainer.Container{container},
+    }
+    status = &kubecontainer.PodStatus{
+        ID:                id,
+        ContainerStatuses: []*kubecontainer.ContainerStatus{{ID: container.ID, State: kubecontainer.ContainerStateExited}},
+    }
+    event = &PodLifecycleEvent{ID: pod.ID, Type: ContainerDied, Data: container.ID.ID}
+    runtimeMock.On("GetPods", true).Return([]*kubecontainer.Pod{pod}, nil).Once()
+    runtimeMock.On("GetPodStatus", pod.ID, "", "").Return(status, nil).Once()
+
+    pleg.relist()
+    actualEvents = getEventsFromChannel(ch)
+    actualStatus, actualErr = pleg.cache.Get(pod.ID)
+    // Must copy status to compare since its pointer gets passed through all
+    // the way to the event
+    statusCopy := *status
+    statusCopy.IP = ipAddr
+    assert.Equal(t, &statusCopy, actualStatus, "test0")
+    assert.Nil(t, actualErr, "test0")
+    assert.Exactly(t, []*PodLifecycleEvent{event}, actualEvents)
+}
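
For readers skimming the patch, here is a minimal, standalone sketch (not part of the change itself) of the rule getPodIP implements: the previously cached IP is kept only when the new status reports no IP, no ready sandbox, and no created or running container. The simplified types and the preservedIP helper below are hypothetical stand-ins for the kubecontainer and runtimeapi types used in the real code.

package main

import "fmt"

type sandboxState string
type containerState string

const (
    sandboxReady     sandboxState   = "SANDBOX_READY"
    sandboxNotReady  sandboxState   = "SANDBOX_NOTREADY"
    containerCreated containerState = "created"
    containerRunning containerState = "running"
)

type podStatus struct {
    ip         string
    sandboxes  []sandboxState
    containers []containerState
}

// preservedIP mirrors the decision getPodIP makes: it returns the IP to cache
// for the new status, given the IP remembered from the previous cache entry.
func preservedIP(cachedIP string, status podStatus) string {
    if status.ip != "" {
        return status.ip // the new status already carries an IP
    }
    if cachedIP == "" {
        return "" // nothing to preserve
    }
    for _, s := range status.sandboxes {
        if s == sandboxReady {
            return status.ip // a ready sandbox makes the empty IP authoritative
        }
    }
    if len(status.sandboxes) == 0 {
        for _, c := range status.containers {
            if c == containerCreated || c == containerRunning {
                return status.ip // no sandboxes reported, but the pod is still live
            }
        }
    }
    // No ready sandboxes and no live containers: the pod is dead, so keep the
    // old IP for the API to report after network teardown.
    return cachedIP
}

func main() {
    tornDown := podStatus{sandboxes: []sandboxState{sandboxNotReady}}
    fmt.Println(preservedIP("192.168.1.5/24", tornDown)) // prints 192.168.1.5/24
}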