diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go index f0a129443a7..e12f1e3eb76 100644 --- a/pkg/kubelet/kubelet_pods.go +++ b/pkg/kubelet/kubelet_pods.go @@ -1013,14 +1013,14 @@ func (kl *Kubelet) HandlePodCleanups() error { // These two conditions could be alleviated by checkpointing kubelet. activePods := kl.filterOutTerminatedPods(allPods) - desiredPods := make(map[types.UID]empty) + desiredPods := make(map[types.UID]sets.Empty) for _, pod := range activePods { - desiredPods[pod.UID] = empty{} + desiredPods[pod.UID] = sets.Empty{} } // Stop the workers for no-longer existing pods. // TODO: is here the best place to forget pod workers? kl.podWorkers.ForgetNonExistingPodWorkers(desiredPods) - kl.probeManager.CleanupPods(activePods) + kl.probeManager.CleanupPods(desiredPods) runningPods, err := kl.runtimeCache.GetPods() if err != nil { diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 6b7c2244f34..56964890932 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -25,6 +25,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/klog" @@ -72,7 +73,7 @@ type UpdatePodOptions struct { // PodWorkers is an abstract interface for testability. type PodWorkers interface { UpdatePod(options *UpdatePodOptions) - ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) + ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) ForgetWorker(uid types.UID) } @@ -251,7 +252,7 @@ func (p *podWorkers) ForgetWorker(uid types.UID) { p.removeWorker(uid) } -func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) { +func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) { p.podLock.Lock() defer p.podLock.Unlock() for key := range p.podUpdates { diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 2dc7cd644fa..3584b54fa49 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" @@ -57,7 +58,7 @@ func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) { } } -func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {} +func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]sets.Empty) {} func (f *fakePodWorkers) ForgetWorker(uid types.UID) {} @@ -219,9 +220,9 @@ func TestForgetNonExistingPodWorkers(t *testing.T) { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) } - desiredPods := map[types.UID]empty{} - desiredPods[types.UID(2)] = empty{} - desiredPods[types.UID(14)] = empty{} + desiredPods := map[types.UID]sets.Empty{} + desiredPods[types.UID(2)] = sets.Empty{} + desiredPods[types.UID(14)] = sets.Empty{} podWorkers.ForgetNonExistingPodWorkers(desiredPods) if len(podWorkers.podUpdates) != 2 { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) @@ -233,7 +234,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) { t.Errorf("No updates channel for pod 14") } - podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{}) + podWorkers.ForgetNonExistingPodWorkers(map[types.UID]sets.Empty{}) if len(podWorkers.podUpdates) != 0 { t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates)) } diff --git a/pkg/kubelet/prober/BUILD b/pkg/kubelet/prober/BUILD index 18922e59e33..d3b3046bcc3 100644 --- a/pkg/kubelet/prober/BUILD +++ b/pkg/kubelet/prober/BUILD @@ -61,6 +61,7 @@ go_test( "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/intstr:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", diff --git a/pkg/kubelet/prober/prober_manager.go b/pkg/kubelet/prober/prober_manager.go index 9ee312eb63e..efa7d827145 100644 --- a/pkg/kubelet/prober/prober_manager.go +++ b/pkg/kubelet/prober/prober_manager.go @@ -62,8 +62,8 @@ type Manager interface { RemovePod(pod *v1.Pod) // CleanupPods handles cleaning up pods which should no longer be running. - // It takes a list of "active pods" which should not be cleaned up. - CleanupPods(activePods []*v1.Pod) + // It takes a map of "desired pods" which should not be cleaned up. + CleanupPods(desiredPods map[types.UID]sets.Empty) // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each // container based on container running status, cached probe results and worker states. @@ -198,12 +198,7 @@ func (m *manager) RemovePod(pod *v1.Pod) { } } -func (m *manager) CleanupPods(activePods []*v1.Pod) { - desiredPods := make(map[types.UID]sets.Empty) - for _, pod := range activePods { - desiredPods[pod.UID] = sets.Empty{} - } - +func (m *manager) CleanupPods(desiredPods map[types.UID]sets.Empty) { m.workerLock.RLock() defer m.workerLock.RUnlock() diff --git a/pkg/kubelet/prober/prober_manager_test.go b/pkg/kubelet/prober/prober_manager_test.go index 72593c85d5c..7021c392597 100644 --- a/pkg/kubelet/prober/prober_manager_test.go +++ b/pkg/kubelet/prober/prober_manager_test.go @@ -26,6 +26,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -158,7 +159,9 @@ func TestCleanupPods(t *testing.T) { m.AddPod(&podToCleanup) m.AddPod(&podToKeep) - m.CleanupPods([]*v1.Pod{&podToKeep}) + desiredPods := map[types.UID]sets.Empty{} + desiredPods[podToKeep.UID] = sets.Empty{} + m.CleanupPods(desiredPods) removedProbes := []probeKey{ {"pod_cleanup", "prober1", readiness}, @@ -197,7 +200,7 @@ func TestCleanupRepeated(t *testing.T) { } for i := 0; i < 10; i++ { - m.CleanupPods([]*v1.Pod{}) + m.CleanupPods(map[types.UID]sets.Empty{}) } } diff --git a/pkg/kubelet/prober/testing/BUILD b/pkg/kubelet/prober/testing/BUILD index aa7f5e5287d..348d458f706 100644 --- a/pkg/kubelet/prober/testing/BUILD +++ b/pkg/kubelet/prober/testing/BUILD @@ -12,6 +12,7 @@ go_library( deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) diff --git a/pkg/kubelet/prober/testing/fake_manager.go b/pkg/kubelet/prober/testing/fake_manager.go index e0cfc785133..41910382d38 100644 --- a/pkg/kubelet/prober/testing/fake_manager.go +++ b/pkg/kubelet/prober/testing/fake_manager.go @@ -19,6 +19,7 @@ package testing import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" ) // FakeManager simulates a prober.Manager for testing. @@ -33,7 +34,7 @@ func (FakeManager) AddPod(_ *v1.Pod) {} func (FakeManager) RemovePod(_ *v1.Pod) {} // CleanupPods simulates cleaning up Pods. -func (FakeManager) CleanupPods(_ []*v1.Pod) {} +func (FakeManager) CleanupPods(_ map[types.UID]sets.Empty) {} // Start simulates start syncing the probe status func (FakeManager) Start() {}