Don't remove all containers of deleted pods until sources are ready
Without this fix, kubelet may assume a pod from a not-ready source has already been deleted, and GC all its dead containers.
This commit is contained in:
		@@ -39,7 +39,7 @@ type ContainerGCPolicy struct {
 | 
				
			|||||||
// Implementation is thread-compatible.
 | 
					// Implementation is thread-compatible.
 | 
				
			||||||
type ContainerGC interface {
 | 
					type ContainerGC interface {
 | 
				
			||||||
	// Garbage collect containers.
 | 
						// Garbage collect containers.
 | 
				
			||||||
	GarbageCollect() error
 | 
						GarbageCollect(allSourcesReady bool) error
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// TODO(vmarmol): Preferentially remove pod infra containers.
 | 
					// TODO(vmarmol): Preferentially remove pod infra containers.
 | 
				
			||||||
@@ -63,6 +63,6 @@ func NewContainerGC(runtime Runtime, policy ContainerGCPolicy) (ContainerGC, err
 | 
				
			|||||||
	}, nil
 | 
						}, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (cgc *realContainerGC) GarbageCollect() error {
 | 
					func (cgc *realContainerGC) GarbageCollect(allSourcesReady bool) error {
 | 
				
			||||||
	return cgc.runtime.GarbageCollect(cgc.policy)
 | 
						return cgc.runtime.GarbageCollect(cgc.policy, allSourcesReady)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -74,7 +74,12 @@ type Runtime interface {
 | 
				
			|||||||
	// exited and dead containers (used for garbage collection).
 | 
						// exited and dead containers (used for garbage collection).
 | 
				
			||||||
	GetPods(all bool) ([]*Pod, error)
 | 
						GetPods(all bool) ([]*Pod, error)
 | 
				
			||||||
	// GarbageCollect removes dead containers using the specified container gc policy
 | 
						// GarbageCollect removes dead containers using the specified container gc policy
 | 
				
			||||||
	GarbageCollect(gcPolicy ContainerGCPolicy) error
 | 
						// If allSourcesReady is not true, it means that kubelet doesn't have the
 | 
				
			||||||
 | 
						// complete list of pods from all available sources (e.g., apiserver, http,
 | 
				
			||||||
 | 
						// file). In this case, the garbage collector should refrain from aggressive
 | 
				
			||||||
 | 
						// behavior such as removing all containers of unrecognized pods (yet).
 | 
				
			||||||
 | 
						// TODO: Revisit this method and make it cleaner.
 | 
				
			||||||
 | 
						GarbageCollect(gcPolicy ContainerGCPolicy, allSourcesReady bool) error
 | 
				
			||||||
	// Syncs the running pod into the desired pod.
 | 
						// Syncs the running pod into the desired pod.
 | 
				
			||||||
	SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
 | 
						SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *PodStatus, pullSecrets []api.Secret, backOff *flowcontrol.Backoff) PodSyncResult
 | 
				
			||||||
	// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
 | 
						// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -346,7 +346,7 @@ func (f *FakeRuntime) GetNetNS(containerID ContainerID) (string, error) {
 | 
				
			|||||||
	return "", f.Err
 | 
						return "", f.Err
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy) error {
 | 
					func (f *FakeRuntime) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool) error {
 | 
				
			||||||
	f.Lock()
 | 
						f.Lock()
 | 
				
			||||||
	defer f.Unlock()
 | 
						defer f.Unlock()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -133,8 +133,8 @@ func (r *Mock) GetNetNS(containerID ContainerID) (string, error) {
 | 
				
			|||||||
	return "", args.Error(0)
 | 
						return "", args.Error(0)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy) error {
 | 
					func (r *Mock) GarbageCollect(gcPolicy ContainerGCPolicy, ready bool) error {
 | 
				
			||||||
	args := r.Called(gcPolicy)
 | 
						args := r.Called(gcPolicy, ready)
 | 
				
			||||||
	return args.Error(0)
 | 
						return args.Error(0)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -185,7 +185,7 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// GarbageCollect removes dead containers using the specified container gc policy
 | 
					// GarbageCollect removes dead containers using the specified container gc policy
 | 
				
			||||||
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
 | 
					func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
 | 
				
			||||||
	// Separate containers by evict units.
 | 
						// Separate containers by evict units.
 | 
				
			||||||
	evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
 | 
						evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
 | 
				
			||||||
	if err != nil {
 | 
						if err != nil {
 | 
				
			||||||
@@ -201,11 +201,13 @@ func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy)
 | 
				
			|||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	// Remove deleted pod containers.
 | 
						// Remove deleted pod containers if all sources are ready.
 | 
				
			||||||
	for key, unit := range evictUnits {
 | 
						if allSourcesReady {
 | 
				
			||||||
		if cgc.isPodDeleted(key.uid) {
 | 
							for key, unit := range evictUnits {
 | 
				
			||||||
			cgc.removeOldestN(unit, len(unit)) // Remove all.
 | 
								if cgc.isPodDeleted(key.uid) {
 | 
				
			||||||
			delete(evictUnits, key)
 | 
									cgc.removeOldestN(unit, len(unit)) // Remove all.
 | 
				
			||||||
 | 
									delete(evictUnits, key)
 | 
				
			||||||
 | 
								}
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -96,7 +96,7 @@ func TestGarbageCollectZeroMaxContainers(t *testing.T) {
 | 
				
			|||||||
	})
 | 
						})
 | 
				
			||||||
	addPods(gc.podGetter, "foo")
 | 
						addPods(gc.podGetter, "foo")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}))
 | 
						assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}, true))
 | 
				
			||||||
	assert.Len(t, fakeDocker.Removed, 1)
 | 
						assert.Len(t, fakeDocker.Removed, 1)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -111,7 +111,7 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
 | 
				
			|||||||
	})
 | 
						})
 | 
				
			||||||
	addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
 | 
						addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
	assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}))
 | 
						assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}, true))
 | 
				
			||||||
	assert.Len(t, fakeDocker.Removed, 1)
 | 
						assert.Len(t, fakeDocker.Removed, 1)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -240,7 +240,7 @@ func TestGarbageCollect(t *testing.T) {
 | 
				
			|||||||
		gc, fakeDocker := newTestContainerGC(t)
 | 
							gc, fakeDocker := newTestContainerGC(t)
 | 
				
			||||||
		fakeDocker.SetFakeContainers(test.containers)
 | 
							fakeDocker.SetFakeContainers(test.containers)
 | 
				
			||||||
		addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7")
 | 
							addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7")
 | 
				
			||||||
		assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}))
 | 
							assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}, true))
 | 
				
			||||||
		verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
 | 
							verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
 | 
				
			||||||
	}
 | 
						}
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -2352,8 +2352,8 @@ func (dm *DockerManager) GetNetNS(containerID kubecontainer.ContainerID) (string
 | 
				
			|||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Garbage collection of dead containers
 | 
					// Garbage collection of dead containers
 | 
				
			||||||
func (dm *DockerManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
 | 
					func (dm *DockerManager) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
 | 
				
			||||||
	return dm.containerGC.GarbageCollect(gcPolicy)
 | 
						return dm.containerGC.GarbageCollect(gcPolicy, allSourcesReady)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
 | 
					func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kubecontainer.PodStatus, error) {
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -911,7 +911,7 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
 | 
				
			|||||||
// Starts garbage collection threads.
 | 
					// Starts garbage collection threads.
 | 
				
			||||||
func (kl *Kubelet) StartGarbageCollection() {
 | 
					func (kl *Kubelet) StartGarbageCollection() {
 | 
				
			||||||
	go wait.Until(func() {
 | 
						go wait.Until(func() {
 | 
				
			||||||
		if err := kl.containerGC.GarbageCollect(); err != nil {
 | 
							if err := kl.containerGC.GarbageCollect(kl.sourcesReady.AllReady()); err != nil {
 | 
				
			||||||
			glog.Errorf("Container garbage collection failed: %v", err)
 | 
								glog.Errorf("Container garbage collection failed: %v", err)
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
	}, ContainerGCPeriod, wait.NeverStop)
 | 
						}, ContainerGCPeriod, wait.NeverStop)
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1813,7 +1813,7 @@ func podDetailsFromServiceFile(serviceFilePath string) (string, string, string,
 | 
				
			|||||||
// - If the number of containers exceeds gcPolicy.MaxContainers,
 | 
					// - If the number of containers exceeds gcPolicy.MaxContainers,
 | 
				
			||||||
//   then containers whose ages are older than gcPolicy.minAge will
 | 
					//   then containers whose ages are older than gcPolicy.minAge will
 | 
				
			||||||
//   be removed.
 | 
					//   be removed.
 | 
				
			||||||
func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error {
 | 
					func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
 | 
				
			||||||
	var errlist []error
 | 
						var errlist []error
 | 
				
			||||||
	var totalInactiveContainers int
 | 
						var totalInactiveContainers int
 | 
				
			||||||
	var inactivePods []*rktapi.Pod
 | 
						var inactivePods []*rktapi.Pod
 | 
				
			||||||
@@ -1846,7 +1846,7 @@ func (r *Runtime) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy) error
 | 
				
			|||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
			_, found := r.podGetter.GetPodByUID(uid)
 | 
								_, found := r.podGetter.GetPodByUID(uid)
 | 
				
			||||||
			if !found {
 | 
								if !found && allSourcesReady {
 | 
				
			||||||
				removeCandidates = append(removeCandidates, pod)
 | 
									removeCandidates = append(removeCandidates, pod)
 | 
				
			||||||
				continue
 | 
									continue
 | 
				
			||||||
			}
 | 
								}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -1694,7 +1694,8 @@ func TestGarbageCollect(t *testing.T) {
 | 
				
			|||||||
			getter.pods[p.UID] = p
 | 
								getter.pods[p.UID] = p
 | 
				
			||||||
		}
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		err := rkt.GarbageCollect(tt.gcPolicy)
 | 
							allSourcesReady := true
 | 
				
			||||||
 | 
							err := rkt.GarbageCollect(tt.gcPolicy, allSourcesReady)
 | 
				
			||||||
		assert.NoError(t, err, testCaseHint)
 | 
							assert.NoError(t, err, testCaseHint)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
		sort.Sort(sortedStringList(tt.expectedCommands))
 | 
							sort.Sort(sortedStringList(tt.expectedCommands))
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user