diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index e019495caa4..8c55d779b3c 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -1261,6 +1261,10 @@ func (dm *DockerManager) killContainer(containerID types.UID, container *api.Con
 		gracePeriod = minimumGracePeriodInSeconds
 	}
 	err := dm.client.StopContainer(ID, uint(gracePeriod))
+	if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil {
+		glog.V(4).Infof("Container %q has already exited", name)
+		return nil
+	}
 	if err == nil {
 		glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time))
 	} else {
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 41850d039c7..a34d756b3a4 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1367,6 +1367,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+	var terminating []*api.Pod
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			found := false
+			for _, runningPod := range runningPods {
+				if runningPod.ID == pod.UID {
+					found = true
+					break
+				}
+			}
+			if found {
+				podFullName := kubecontainer.GetPodFullName(pod)
+				glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID)
+				continue
+			}
+			terminating = append(terminating, pod)
+		}
+	}
+	if !kl.statusManager.TerminatePods(terminating) {
+		return errors.New("not all pods were successfully terminated")
+	}
+	return nil
+}
+
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -1529,6 +1555,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
 
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
+	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+		glog.Errorf("Failed to cleanup terminated pods: %v", err)
+	}
+
 	return err
 }
diff --git a/pkg/kubelet/mirror_client.go b/pkg/kubelet/mirror_client.go
index ae54c33017c..8ef630d1b7e 100644
--- a/pkg/kubelet/mirror_client.go
+++ b/pkg/kubelet/mirror_client.go
@@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
 		return err
 	}
 	glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
-	if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil {
+	if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil {
 		glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
 	}
 	return nil
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 4d754286309..448c5bd5853 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -131,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	}
 }
 
+// TerminatePods resets the container status for the provided pods to terminated and triggers
+// a status update. This function may not enqueue all the provided pods, in which case it will
+// return false.
+func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+	sent := true
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	for _, pod := range pods {
+		for i := range pod.Status.ContainerStatuses {
+			pod.Status.ContainerStatuses[i].State = api.ContainerState{
+				Terminated: &api.ContainerStateTerminated{},
+			}
+		}
+		select {
+		case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+		default:
+			sent = false
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
+		}
+	}
+	return sent
+}
+
 func (s *statusManager) DeletePodStatus(podFullName string) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
@@ -167,6 +190,10 @@ func (s *statusManager) syncBatch() error {
 		return nil
 	}
 	if err == nil {
+		if len(pod.UID) > 0 && statusPod.UID != pod.UID {
+			glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod))
+			return nil
+		}
 		statusPod.Status = status
 		// TODO: handle conflict as a retry, make that easier too.
 		statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go
index 585d2ffd34f..22e93275024 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status_manager_test.go
@@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) {
 	verifyUpdates(t, syncer, 1)
 }
 
+func TestSyncBatchIgnoresNotFound(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 func TestSyncBatch(t *testing.T) {
 	syncer := newTestStatusManager()
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	err := syncer.syncBatch()
 	if err != nil {
@@ -167,6 +180,22 @@
 	)
 }
 
+func TestSyncBatchChecksMismatchedUID(t *testing.T) {
+	syncer := newTestStatusManager()
+	testPod.UID = "first"
+	differentPod := *testPod
+	differentPod.UID = "second"
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
+	syncer.SetPodStatus(&differentPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 // shuffle returns a new shuffled list of container statuses.
 func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
 	numStatuses := len(statuses)
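
Note on the TerminatePods change above: the select/default send is what makes termination notices best-effort rather than blocking, which is why the function can return false. The following is an illustrative sketch of that pattern only, not kubelet code; statusUpdate and enqueueAll are hypothetical stand-ins for podStatusSyncRequest and the channel send loop.

// Sketch (assumed names): best-effort enqueue onto a bounded channel.
package main

import "fmt"

// statusUpdate stands in for podStatusSyncRequest; the name is hypothetical.
type statusUpdate struct {
	podName string
}

// enqueueAll mirrors the TerminatePods contract: it returns false when at
// least one update was dropped because the channel had no free capacity.
func enqueueAll(ch chan statusUpdate, updates []statusUpdate) bool {
	sent := true
	for _, u := range updates {
		select {
		case ch <- u: // queued for the sync loop to pick up
		default: // channel full: drop rather than block the caller
			sent = false
			fmt.Printf("dropped update for %s: channel full\n", u.podName)
		}
	}
	return sent
}

func main() {
	ch := make(chan statusUpdate, 2) // small buffer to force a drop
	updates := []statusUpdate{{"pod-a"}, {"pod-b"}, {"pod-c"}}
	if !enqueueAll(ch, updates) {
		fmt.Println("not all pods were successfully terminated; caller should retry")
	}
}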
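
Similarly, the UID guard added to syncBatch exists because a pod can be deleted and recreated under the same name: the new incarnation gets a fresh UID, so a queued status update for the old incarnation must not be applied to it. A minimal sketch of that check, with hypothetical pod and shouldSkipUpdate names:

// Sketch (assumed names): skip a stale status update after pod recreation.
package main

import "fmt"

type pod struct {
	name string
	uid  string
}

// shouldSkipUpdate mirrors the guard in syncBatch: skip when the cached pod
// carries a UID that no longer matches the pod fetched from the apiserver.
func shouldSkipUpdate(cached, fromAPIServer pod) bool {
	return len(cached.uid) > 0 && cached.uid != fromAPIServer.uid
}

func main() {
	old := pod{name: "web-1", uid: "first"}
	recreated := pod{name: "web-1", uid: "second"}
	if shouldSkipUpdate(old, recreated) {
		fmt.Println("pod was deleted and then recreated; dropping stale status update")
	}
}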