wait for container cleanup before deletion

This commit is contained in:
David Ashpole 2017-08-28 10:43:43 -07:00
parent 870406bec5
commit 9ac30e2c28
10 changed files with 83 additions and 75 deletions

View File

@ -628,7 +628,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.SeccompProfileRoot,
containerRefManager,
machineInfo,
klet.podManager,
klet,
kubeDeps.OSInterface,
klet,
httpClient,
@ -665,7 +665,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet,
kubeDeps.Recorder,
containerRefManager,
klet.podManager,
klet,
klet.livenessManager,
httpClient,
klet.networkPlugin,

View File

@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
remotecommandserver "k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -819,6 +820,22 @@ func (kl *Kubelet) podIsTerminated(pod *v1.Pod) bool {
return status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded || (pod.DeletionTimestamp != nil && notRunning(status.ContainerStatuses))
}
// IsPodDeleted returns true if the pod is deleted. For the pod to be deleted, either:
// 1. The pod object is deleted
// 2. The pod's status is evicted
// 3. The pod's deletion timestamp is set, and containers are not running
func (kl *Kubelet) IsPodDeleted(uid types.UID) bool {
	pod, known := kl.podManager.GetPodByUID(uid)
	if !known {
		// The pod object itself is gone from the pod manager.
		return true
	}
	podStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
	if !ok {
		// No cached status yet; fall back to the status recorded on the pod object.
		podStatus = pod.Status
	}
	if eviction.PodIsEvicted(podStatus) {
		return true
	}
	return pod.DeletionTimestamp != nil && notRunning(podStatus.ContainerStatuses)
}
// PodResourcesAreReclaimed returns true if all required node-level resources that a pod was consuming have
// been reclaimed by the kubelet. Reclaiming resources is a prerequisite to deleting a pod from the API server.
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool {
@ -827,6 +844,16 @@ func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bo
glog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod))
return false
}
// pod's containers should be deleted
runtimeStatus, err := kl.podCache.Get(pod.UID)
if err != nil {
glog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err)
return false
}
if len(runtimeStatus.ContainerStatuses) > 0 {
glog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %+v", format.Pod(pod), runtimeStatus.ContainerStatuses)
return false
}
if kl.podVolumesExist(pod.UID) && !kl.kubeletConfiguration.KeepTerminatedPodVolumes {
// We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes
glog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod))

View File

@ -21,7 +21,6 @@ import (
"time"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
@ -43,17 +42,19 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
return nil, f.err
}
type fakePodGetter struct {
pods map[types.UID]*v1.Pod
// fakePodDeletionProvider is a test implementation of podDeletionProvider:
// a pod is reported as deleted unless its UID is present in the pods set.
type fakePodDeletionProvider struct {
	// pods holds the UIDs of pods that are considered to still exist.
	pods map[types.UID]struct{}
}
func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{make(map[types.UID]*v1.Pod)}
// newFakePodDeletionProvider returns an empty fakePodDeletionProvider,
// in which every pod UID is reported as deleted until added to pods.
func newFakePodDeletionProvider() *fakePodDeletionProvider {
	provider := &fakePodDeletionProvider{}
	provider.pods = make(map[types.UID]struct{})
	return provider
}
func (f *fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
pod, found := f.pods[uid]
return pod, found
// IsPodDeleted reports whether the given pod UID is absent from the fake's pod set.
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
	if _, tracked := f.pods[uid]; tracked {
		return false
	}
	return true
}
func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, machineInfo *cadvisorapi.MachineInfo, osInterface kubecontainer.OSInterface, runtimeHelper kubecontainer.RuntimeHelper, keyring credentialprovider.DockerKeyring) (*kubeGenericRuntimeManager, error) {
@ -76,7 +77,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
return nil, err
}
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodGetter(), kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, newFakePodDeletionProvider(), kubeRuntimeManager)
kubeRuntimeManager.runtimeName = typedVersion.RuntimeName
kubeRuntimeManager.imagePuller = images.NewImageManager(
kubecontainer.FilterEventRecorder(recorder),

View File

@ -341,13 +341,12 @@ func (m *kubeGenericRuntimeManager) makeMounts(opts *kubecontainer.RunContainerO
func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]*runtimeapi.Container, error) {
filter := &runtimeapi.ContainerFilter{}
if !allContainers {
runningState := runtimeapi.ContainerState_CONTAINER_RUNNING
filter.State = &runtimeapi.ContainerStateValue{
State: runningState,
State: runtimeapi.ContainerState_CONTAINER_RUNNING,
}
}
containers, err := m.getContainersHelper(filter)
containers, err := m.runtimeService.ListContainers(filter)
if err != nil {
glog.Errorf("getKubeletContainers failed: %v", err)
return nil, err
@ -356,16 +355,6 @@ func (m *kubeGenericRuntimeManager) getKubeletContainers(allContainers bool) ([]
return containers, nil
}
// getContainersHelper lists containers matching the given filter via the runtime service.
func (m *kubeGenericRuntimeManager) getContainersHelper(filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
	containers, err := m.runtimeService.ListContainers(filter)
	if err != nil {
		return nil, err
	}
	return containers, nil
}
// makeUID returns a randomly generated string.
func makeUID() string {
return fmt.Sprintf("%08x", rand.Uint32())

View File

@ -34,15 +34,15 @@ import (
type containerGC struct {
client internalapi.RuntimeService
manager *kubeGenericRuntimeManager
podGetter podGetter
podDeletionProvider podDeletionProvider
}
// NewContainerGC creates a new containerGC.
func NewContainerGC(client internalapi.RuntimeService, podGetter podGetter, manager *kubeGenericRuntimeManager) *containerGC {
func NewContainerGC(client internalapi.RuntimeService, podDeletionProvider podDeletionProvider, manager *kubeGenericRuntimeManager) *containerGC {
	// Wire the GC to the runtime client, the owning manager, and the
	// provider used to decide whether a pod's resources may be collected.
	gc := &containerGC{
		client:              client,
		manager:             manager,
		podDeletionProvider: podDeletionProvider,
	}
	return gc
}
@ -52,8 +52,6 @@ type containerGCInfo struct {
id string
// The name of the container.
name string
// The sandbox ID which this container belongs to
sandboxID string
// Creation time for the container.
createTime time.Time
}
@ -159,12 +157,6 @@ func (cgc *containerGC) removeSandbox(sandboxID string) {
}
}
// isPodDeleted returns true if the pod is already deleted.
func (cgc *containerGC) isPodDeleted(podUID types.UID) bool {
	if _, stillPresent := cgc.podGetter.GetPodByUID(podUID); stillPresent {
		return false
	}
	return true
}
// evictableContainers gets all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, error) {
@ -191,7 +183,6 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
id: container.Id,
name: container.Metadata.Name,
createTime: createdAt,
sandboxID: container.PodSandboxId,
}
key := evictUnit{
uid: labeledInfo.PodUID,
@ -219,7 +210,7 @@ func (cgc *containerGC) evictContainers(gcPolicy kubecontainer.ContainerGCPolicy
// Remove deleted pod containers if all sources are ready.
if allSourcesReady {
for key, unit := range evictUnits {
if cgc.isPodDeleted(key.uid) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(key.uid) || evictNonDeletedPods {
cgc.removeOldestN(unit, len(unit)) // Remove all.
delete(evictUnits, key)
}
@ -307,7 +298,7 @@ func (cgc *containerGC) evictSandboxes(evictNonDeletedPods bool) error {
}
for podUID, sandboxes := range sandboxesByPod {
if cgc.isPodDeleted(podUID) || evictNonDeletedPods {
if cgc.podDeletionProvider.IsPodDeleted(podUID) || evictNonDeletedPods {
// Remove all evictable sandboxes if the pod has been removed.
// Note that the latest dead sandbox is also removed if there is
// already an active one.
@ -333,7 +324,7 @@ func (cgc *containerGC) evictPodLogsDirectories(allSourcesReady bool) error {
for _, dir := range dirs {
name := dir.Name()
podUID := types.UID(name)
if !cgc.isPodDeleted(podUID) {
if !cgc.podDeletionProvider.IsPodDeleted(podUID) {
continue
}
err := osInterface.RemoveAll(filepath.Join(podLogsRootDirectory, name))

View File

@ -34,11 +34,11 @@ func TestSandboxGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCSandbox := func(pod *v1.Pod, attempt uint32, state runtimeapi.PodSandboxState, withPodGetter bool, createdAt int64) sandboxTemplate {
if withPodGetter {
// initialize the pod getter
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return sandboxTemplate{
pod: pod,
@ -162,13 +162,13 @@ func TestContainerGC(t *testing.T) {
fakeRuntime, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
makeGCContainer := func(podName, containerName string, attempt int, createdAt int64, state runtimeapi.ContainerState) containerTemplate {
container := makeTestContainer(containerName, "test-image")
pod := makeTestPod(podName, "test-ns", podName, []v1.Container{container})
if podName != "deleted" {
// initialize the pod getter, explicitly exclude deleted pod
fakePodGetter.pods[pod.UID] = pod
podDeletionProvider.pods[pod.UID] = struct{}{}
}
return containerTemplate{
pod: pod,
@ -361,11 +361,11 @@ func TestPodLogDirectoryGC(t *testing.T) {
_, _, m, err := createTestRuntimeManager()
assert.NoError(t, err)
fakeOS := m.osInterface.(*containertest.FakeOS)
fakePodGetter := m.containerGC.podGetter.(*fakePodGetter)
podDeletionProvider := m.containerGC.podDeletionProvider.(*fakePodDeletionProvider)
// pod log directories without corresponding pods should be removed.
fakePodGetter.pods["123"] = makeTestPod("foo1", "new", "123", nil)
fakePodGetter.pods["456"] = makeTestPod("foo2", "new", "456", nil)
podDeletionProvider.pods["123"] = struct{}{}
podDeletionProvider.pods["456"] = struct{}{}
files := []string{"123", "456", "789", "012"}
removed := []string{filepath.Join(podLogsRootDirectory, "789"), filepath.Join(podLogsRootDirectory, "012")}

View File

@ -64,9 +64,9 @@ var (
ErrVersionNotSupported = errors.New("Runtime api version is not supported")
)
// A subset of the pod.Manager interface extracted for garbage collection purposes.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}
type kubeGenericRuntimeManager struct {
@ -123,7 +123,7 @@ func NewKubeGenericRuntimeManager(
seccompProfileRoot string,
containerRefManager *kubecontainer.RefManager,
machineInfo *cadvisorapi.MachineInfo,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
osInterface kubecontainer.OSInterface,
runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter,
@ -187,7 +187,7 @@ func NewKubeGenericRuntimeManager(
imagePullQPS,
imagePullBurst)
kubeRuntimeManager.runner = lifecycle.NewHandlerRunner(httpClient, kubeRuntimeManager, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podGetter, kubeRuntimeManager)
kubeRuntimeManager.containerGC = NewContainerGC(runtimeService, podDeletionProvider, kubeRuntimeManager)
kubeRuntimeManager.versionCache = cache.NewObjectCache(
func() (interface{}, error) {

View File

@ -26,7 +26,6 @@ import (
rktapi "github.com/coreos/rkt/api/v1alpha"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
kubetypes "k8s.io/apimachinery/pkg/types"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -179,17 +178,19 @@ func (f *fakeRktCli) Reset() {
f.err = nil
}
type fakePodGetter struct {
pods map[types.UID]*v1.Pod
// fakePodDeletionProvider is a test implementation of podDeletionProvider
// for the rkt runtime tests: a pod is reported as deleted unless its UID
// is present in the pods set.
type fakePodDeletionProvider struct {
	// pods holds the UIDs of pods that are considered to still exist.
	pods map[types.UID]struct{}
}
func newFakePodGetter() *fakePodGetter {
return &fakePodGetter{pods: make(map[types.UID]*v1.Pod)}
// newFakePodDeletionProvider returns an empty fakePodDeletionProvider,
// in which every pod UID is reported as deleted until added to pods.
func newFakePodDeletionProvider() *fakePodDeletionProvider {
	p := &fakePodDeletionProvider{}
	p.pods = make(map[types.UID]struct{})
	return p
}
func (f fakePodGetter) GetPodByUID(uid types.UID) (*v1.Pod, bool) {
p, found := f.pods[uid]
return p, found
// IsPodDeleted reports whether the given pod UID is absent from the fake's pod set.
func (f *fakePodDeletionProvider) IsPodDeleted(uid types.UID) bool {
	if _, exists := f.pods[uid]; exists {
		return false
	}
	return true
}
type fakeUnitGetter struct {

View File

@ -160,7 +160,7 @@ type Runtime struct {
dockerKeyring credentialprovider.DockerKeyring
containerRefManager *kubecontainer.RefManager
podGetter podGetter
podDeletionProvider podDeletionProvider
runtimeHelper kubecontainer.RuntimeHelper
recorder record.EventRecorder
livenessManager proberesults.Manager
@ -201,9 +201,9 @@ type podServiceDirective struct {
var _ kubecontainer.Runtime = &Runtime{}
var _ kubecontainer.DirectStreamingRuntime = &Runtime{}
// TODO(yifan): This duplicates the podGetter in dockertools.
type podGetter interface {
GetPodByUID(kubetypes.UID) (*v1.Pod, bool)
// podDeletionProvider can determine if a pod is deleted
type podDeletionProvider interface {
IsPodDeleted(kubetypes.UID) bool
}
// cliInterface wrapps the command line calls for testing purpose.
@ -228,7 +228,7 @@ func New(
runtimeHelper kubecontainer.RuntimeHelper,
recorder record.EventRecorder,
containerRefManager *kubecontainer.RefManager,
podGetter podGetter,
podDeletionProvider podDeletionProvider,
livenessManager proberesults.Manager,
httpClient types.HttpGetter,
networkPlugin network.NetworkPlugin,
@ -285,7 +285,7 @@ func New(
config: config,
dockerKeyring: credentialprovider.NewDockerKeyring(),
containerRefManager: containerRefManager,
podGetter: podGetter,
podDeletionProvider: podDeletionProvider,
runtimeHelper: runtimeHelper,
recorder: recorder,
livenessManager: livenessManager,
@ -2020,8 +2020,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSo
removeCandidates = append(removeCandidates, pod)
continue
}
_, found := r.podGetter.GetPodByUID(uid)
if !found && allSourcesReady {
if r.podDeletionProvider.IsPodDeleted(uid) && allSourcesReady {
removeCandidates = append(removeCandidates, pod)
continue
}

View File

@ -1636,7 +1636,7 @@ func TestGarbageCollect(t *testing.T) {
fs := newFakeSystemd()
cli := newFakeRktCli()
fakeOS := kubetesting.NewFakeOS()
getter := newFakePodGetter()
deletionProvider := newFakePodDeletionProvider()
fug := newfakeUnitGetter()
frh := &containertesting.FakeRuntimeHelper{}
@ -1644,7 +1644,7 @@ func TestGarbageCollect(t *testing.T) {
os: fakeOS,
cli: cli,
apisvc: fr,
podGetter: getter,
podDeletionProvider: deletionProvider,
systemd: fs,
containerRefManager: kubecontainer.NewRefManager(),
unitGetter: fug,
@ -1830,7 +1830,7 @@ func TestGarbageCollect(t *testing.T) {
fr.pods = tt.pods
for _, p := range tt.apiPods {
getter.pods[p.UID] = p
deletionProvider.pods[p.UID] = struct{}{}
}
allSourcesReady := true
@ -1862,7 +1862,7 @@ func TestGarbageCollect(t *testing.T) {
ctrl.Finish()
fakeOS.Removes = []string{}
fs.resetFailedUnits = []string{}
getter.pods = make(map[kubetypes.UID]*v1.Pod)
deletionProvider.pods = make(map[kubetypes.UID]struct{})
}
}