Use containerGC in the Kubelet.

The new GC policy retains at most 100 dead containers globally by default.

Fixes #5457.
This commit is contained in:
Victor Marmol
2015-03-14 10:13:20 -07:00
parent 4c17c09a8f
commit d1ed571e28
3 changed files with 30 additions and 473 deletions

View File

@@ -65,6 +65,7 @@ type KubeletServer struct {
RunOnce bool RunOnce bool
EnableDebuggingHandlers bool EnableDebuggingHandlers bool
MinimumGCAge time.Duration MinimumGCAge time.Duration
MaxPerPodContainerCount int
MaxContainerCount int MaxContainerCount int
AuthPath string AuthPath string
CadvisorPort uint CadvisorPort uint
@@ -92,7 +93,8 @@ func NewKubeletServer() *KubeletServer {
RegistryBurst: 10, RegistryBurst: 10,
EnableDebuggingHandlers: true, EnableDebuggingHandlers: true,
MinimumGCAge: 1 * time.Minute, MinimumGCAge: 1 * time.Minute,
MaxContainerCount: 5, MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
CadvisorPort: 4194, CadvisorPort: 4194,
OOMScoreAdj: -900, OOMScoreAdj: -900,
MasterServiceNamespace: api.NamespaceDefault, MasterServiceNamespace: api.NamespaceDefault,
@@ -120,7 +122,8 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api_servers, and --enable-server") fs.BoolVar(&s.RunOnce, "runonce", s.RunOnce, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --api_servers, and --enable-server")
fs.BoolVar(&s.EnableDebuggingHandlers, "enable_debugging_handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands") fs.BoolVar(&s.EnableDebuggingHandlers, "enable_debugging_handlers", s.EnableDebuggingHandlers, "Enables server endpoints for log collection and local running of containers and commands")
fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'") fs.DurationVar(&s.MinimumGCAge, "minimum_container_ttl_duration", s.MinimumGCAge, "Minimum age for a finished container before it is garbage collected. Examples: '300ms', '10s' or '2h45m'")
fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers_per_container", s.MaxContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.") fs.IntVar(&s.MaxPerPodContainerCount, "maximum_dead_containers_per_container", s.MaxPerPodContainerCount, "Maximum number of old instances of a container to retain per container. Each container takes up some disk space. Default: 5.")
fs.IntVar(&s.MaxContainerCount, "maximum_dead_containers", s.MaxContainerCount, "Maximum number of old instances of a containers to retain globally. Each container takes up some disk space. Default: 100.")
fs.StringVar(&s.AuthPath, "auth_path", s.AuthPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.") fs.StringVar(&s.AuthPath, "auth_path", s.AuthPath, "Path to .kubernetes_auth file, specifying how to authenticate to API server.")
fs.UintVar(&s.CadvisorPort, "cadvisor_port", s.CadvisorPort, "The port of the localhost cAdvisor endpoint") fs.UintVar(&s.CadvisorPort, "cadvisor_port", s.CadvisorPort, "The port of the localhost cAdvisor endpoint")
fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]") fs.IntVar(&s.OOMScoreAdj, "oom_score_adj", s.OOMScoreAdj, "The oom_score_adj value for kubelet process. Values must be within the range [-1000, 1000]")
@@ -170,6 +173,7 @@ func (s *KubeletServer) Run(_ []string) error {
RegistryPullQPS: s.RegistryPullQPS, RegistryPullQPS: s.RegistryPullQPS,
RegistryBurst: s.RegistryBurst, RegistryBurst: s.RegistryBurst,
MinimumGCAge: s.MinimumGCAge, MinimumGCAge: s.MinimumGCAge,
MaxPerPodContainerCount: s.MaxPerPodContainerCount,
MaxContainerCount: s.MaxContainerCount, MaxContainerCount: s.MaxContainerCount,
ClusterDomain: s.ClusterDomain, ClusterDomain: s.ClusterDomain,
ClusterDNS: s.ClusterDNS, ClusterDNS: s.ClusterDNS,
@@ -260,7 +264,8 @@ func SimpleRunKubelet(client *client.Client,
StatusUpdateFrequency: 3 * time.Second, StatusUpdateFrequency: 3 * time.Second,
SyncFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second,
MinimumGCAge: 10 * time.Second, MinimumGCAge: 10 * time.Second,
MaxContainerCount: 5, MaxPerPodContainerCount: 5,
MaxContainerCount: 100,
MasterServiceNamespace: masterServiceNamespace, MasterServiceNamespace: masterServiceNamespace,
VolumePlugins: volumePlugins, VolumePlugins: volumePlugins,
TLSOptions: tlsOptions, TLSOptions: tlsOptions,
@@ -359,6 +364,7 @@ type KubeletConfig struct {
RegistryPullQPS float64 RegistryPullQPS float64
RegistryBurst int RegistryBurst int
MinimumGCAge time.Duration MinimumGCAge time.Duration
MaxPerPodContainerCount int
MaxContainerCount int MaxContainerCount int
ClusterDomain string ClusterDomain string
ClusterDNS util.IP ClusterDNS util.IP
@@ -386,6 +392,12 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kubeClient = kc.KubeClient kubeClient = kc.KubeClient
} }
gcPolicy := kubelet.ContainerGCPolicy{
MinAge: kc.MinimumGCAge,
MaxPerPodContainer: kc.MaxPerPodContainerCount,
MaxContainers: kc.MaxContainerCount,
}
k, err := kubelet.NewMainKubelet( k, err := kubelet.NewMainKubelet(
kc.Hostname, kc.Hostname,
kc.DockerClient, kc.DockerClient,
@@ -395,8 +407,7 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub
kc.SyncFrequency, kc.SyncFrequency,
float32(kc.RegistryPullQPS), float32(kc.RegistryPullQPS),
kc.RegistryBurst, kc.RegistryBurst,
kc.MinimumGCAge, gcPolicy,
kc.MaxContainerCount,
pc.SeenAllSources, pc.SeenAllSources,
kc.ClusterDomain, kc.ClusterDomain,
net.IP(kc.ClusterDNS), net.IP(kc.ClusterDNS),

View File

@@ -111,8 +111,7 @@ func NewMainKubelet(
resyncInterval time.Duration, resyncInterval time.Duration,
pullQPS float32, pullQPS float32,
pullBurst int, pullBurst int,
minimumGCAge time.Duration, containerGCPolicy ContainerGCPolicy,
maxContainerCount int,
sourcesReady SourcesReadyFn, sourcesReady SourcesReadyFn,
clusterDomain string, clusterDomain string,
clusterDNS net.IP, clusterDNS net.IP,
@@ -128,9 +127,7 @@ func NewMainKubelet(
if resyncInterval <= 0 { if resyncInterval <= 0 {
return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval) return nil, fmt.Errorf("invalid sync frequency %d", resyncInterval)
} }
if minimumGCAge <= 0 { dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient)
return nil, fmt.Errorf("invalid minimum GC age %d", minimumGCAge)
}
// Wait for the Docker daemon to be up (with a timeout). // Wait for the Docker daemon to be up (with a timeout).
waitStart := time.Now() waitStart := time.Now()
@@ -164,7 +161,11 @@ func NewMainKubelet(
} }
serviceLister := &cache.StoreToServiceLister{serviceStore} serviceLister := &cache.StoreToServiceLister{serviceStore}
dockerClient = metrics.NewInstrumentedDockerInterface(dockerClient) containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
if err != nil {
return nil, err
}
klet := &Kubelet{ klet := &Kubelet{
hostname: hostname, hostname: hostname,
dockerClient: dockerClient, dockerClient: dockerClient,
@@ -178,8 +179,6 @@ func NewMainKubelet(
httpClient: &http.Client{}, httpClient: &http.Client{},
pullQPS: pullQPS, pullQPS: pullQPS,
pullBurst: pullBurst, pullBurst: pullBurst,
minimumGCAge: minimumGCAge,
maxContainerCount: maxContainerCount,
sourcesReady: sourcesReady, sourcesReady: sourcesReady,
clusterDomain: clusterDomain, clusterDomain: clusterDomain,
clusterDNS: clusterDNS, clusterDNS: clusterDNS,
@@ -190,6 +189,7 @@ func NewMainKubelet(
streamingConnectionIdleTimeout: streamingConnectionIdleTimeout, streamingConnectionIdleTimeout: streamingConnectionIdleTimeout,
recorder: recorder, recorder: recorder,
cadvisor: cadvisorInterface, cadvisor: cadvisorInterface,
containerGC: containerGC,
} }
dockerCache, err := dockertools.NewDockerCache(dockerClient) dockerCache, err := dockertools.NewDockerCache(dockerClient)
@@ -268,10 +268,6 @@ type Kubelet struct {
// cAdvisor used for container information. // cAdvisor used for container information.
cadvisor cadvisor.Interface cadvisor cadvisor.Interface
// Optional, minimum age required for garbage collection. If zero, no limit.
minimumGCAge time.Duration
maxContainerCount int
// If non-empty, use this for container DNS search. // If non-empty, use this for container DNS search.
clusterDomain string clusterDomain string
@@ -302,6 +298,9 @@ type Kubelet struct {
// A mirror pod manager which provides helper functions. // A mirror pod manager which provides helper functions.
mirrorManager mirrorManager mirrorManager mirrorManager
// Policy for handling garbage collection of dead containers.
containerGC containerGC
} }
// getRootDir returns the full path to the directory under which kubelet can // getRootDir returns the full path to the directory under which kubelet can
@@ -443,109 +442,14 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
return pods, nil return pods, nil
} }
// ByCreated implements sort.Interface over inspected Docker containers,
// ordering them by creation time with the most recently created first.
type ByCreated []*docker.Container

// Len reports how many containers are in the collection.
func (c ByCreated) Len() int {
	return len(c)
}

// Swap exchanges the containers at positions i and j.
func (c ByCreated) Swap(i, j int) {
	c[j], c[i] = c[i], c[j]
}

// Less reports whether the container at i was created after the one at j,
// which puts newer containers at the front once sorted.
func (c ByCreated) Less(i, j int) bool {
	return c[j].Created.Before(c[i].Created)
}
// purgeOldest inspects each of the given Docker container IDs and gathers the
// ones that are not running and that finished more than kl.minimumGCAge ago.
// The kl.maxContainerCount most recently created of those are retained and
// every older one is removed. The first inspect or remove failure aborts the
// purge and is returned to the caller.
// TODO: these removals are racy, we should make dockerclient threadsafe across List/Inspect transactions.
func (kl *Kubelet) purgeOldest(ids []string) error {
	var dead []*docker.Container
	for _, id := range ids {
		info, err := kl.dockerClient.InspectContainer(id)
		if err != nil {
			return err
		}
		if info.State.Running {
			continue
		}
		if time.Since(info.State.FinishedAt) <= kl.minimumGCAge {
			// Finished too recently to be eligible for collection.
			continue
		}
		dead = append(dead, info)
	}

	// Newest first, so everything past the retention limit is the oldest.
	sort.Sort(ByCreated(dead))
	if len(dead) <= kl.maxContainerCount {
		return nil
	}
	for _, victim := range dead[kl.maxContainerCount:] {
		opts := docker.RemoveContainerOptions{ID: victim.ID}
		if err := kl.dockerClient.RemoveContainer(opts); err != nil {
			return err
		}
	}
	return nil
}
func (kl *Kubelet) GarbageCollectLoop() { func (kl *Kubelet) GarbageCollectLoop() {
util.Forever(func() { util.Forever(func() {
if err := kl.GarbageCollectContainers(); err != nil { if err := kl.containerGC.GarbageCollect(); err != nil {
glog.Errorf("Garbage collect failed: %v", err) glog.Errorf("Container garbage collect failed: %v", err)
} }
}, time.Minute*1) }, time.Minute*1)
} }
// GarbageCollectContainers performs one garbage-collection pass over the
// kubelet's Docker containers. It is a no-op when kl.maxContainerCount is
// zero. Containers whose Docker name cannot be parsed are removed if they are
// not running; the remaining containers are grouped by "<pod UID>.<container
// name>" and every group larger than kl.maxContainerCount is handed to
// purgeOldest. The first error encountered stops the pass and is returned.
// TODO: Also enforce a maximum total number of containers.
func (kl *Kubelet) GarbageCollectContainers() error {
	if kl.maxContainerCount == 0 {
		return nil
	}

	all, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, true)
	if err != nil {
		return err
	}

	// orphan is a container whose Docker name could not be parsed.
	type orphan struct {
		// Docker ID.
		dockerID string
		// Docker container name
		dockerName string
	}

	orphans := []orphan{}
	idsByContainer := make(map[string][]string)
	for _, c := range all {
		_, podUID, containerName, _, parseErr := dockertools.ParseDockerName(c.Names[0])
		if parseErr == nil {
			key := string(podUID) + "." + containerName
			idsByContainer[key] = append(idsByContainer[key], c.ID)
			continue
		}
		orphans = append(orphans, orphan{
			dockerID:   c.ID,
			dockerName: c.Names[0],
		})
	}

	// Dead containers we cannot attribute to a pod are dropped unconditionally.
	for _, o := range orphans {
		info, err := kl.dockerClient.InspectContainer(o.dockerID)
		if err != nil {
			return err
		}
		if !info.State.Running {
			glog.Infof("Removing unidentified dead container %q with ID %q", o.dockerName, o.dockerID)
			if err := kl.dockerClient.RemoveContainer(docker.RemoveContainerOptions{ID: o.dockerID}); err != nil {
				return err
			}
		}
	}

	// Apply the per-container retention limit to every oversized group.
	for _, ids := range idsByContainer {
		if len(ids) > kl.maxContainerCount {
			if err := kl.purgeOldest(ids); err != nil {
				return err
			}
		}
	}

	return nil
}
func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) { func (kl *Kubelet) getPodStatusFromCache(podFullName string) (api.PodStatus, bool) {
kl.podStatusesLock.RLock() kl.podStatusesLock.RLock()
defer kl.podStatusesLock.RUnlock() defer kl.podStatusesLock.RUnlock()

View File

@@ -139,7 +139,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) {
} }
} }
if !found { if !found {
t.Errorf("Expected element %s not found in %#v", exp, actual) t.Errorf("Expected element %q not found in %#v", exp, actual)
} }
} }
} }
@@ -1692,364 +1692,6 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
} }
} }
// TestKubeletGarbageCollection drives GarbageCollectContainers against the
// fake Docker client with maxContainerCount set to 2 and checks which
// container IDs end up removed (in any order).
//
// NOTE(review): listed containers without an entry in containerDetails are
// presumably answered by the fake's fallback fakeDocker.Container (ID
// "error") — confirm against the fake Docker client implementation.
func TestKubeletGarbageCollection(t *testing.T) {
	const (
		fooPodInfra  = "/k8s_POD_foo_new_.deadbeef_42"
		foo2PodInfra = "/k8s_POD_foo2_new_.beefbeef_40"
		unidentified = "/k8s_unidentified"
	)

	// listed builds an entry as returned by a Docker container listing.
	listed := func(name, id string) docker.APIContainers {
		return docker.APIContainers{Names: []string{name}, ID: id}
	}
	// inspected builds the inspect result for a container created "now".
	inspected := func(id string, running bool) *docker.Container {
		return &docker.Container{
			State:   docker.State{Running: running},
			ID:      id,
			Created: time.Now(),
		}
	}

	tests := []struct {
		containers       []docker.APIContainers
		containerDetails map[string]*docker.Container
		expectedRemoved  []string
	}{
		// Remove oldest containers.
		{
			containers: []docker.APIContainers{
				// pod infra containers
				listed(fooPodInfra, "1876"),
				listed(fooPodInfra, "2876"),
				listed(fooPodInfra, "3876"),
			},
			containerDetails: map[string]*docker.Container{
				"1876": inspected("1876", false),
			},
			expectedRemoved: []string{"1876"},
		},
		// Only remove non-running containers.
		{
			containers: []docker.APIContainers{
				// pod infra containers
				listed(fooPodInfra, "1876"),
				listed(fooPodInfra, "2876"),
				listed(fooPodInfra, "3876"),
				listed(fooPodInfra, "4876"),
			},
			containerDetails: map[string]*docker.Container{
				"1876": inspected("1876", true),
				"2876": inspected("2876", false),
			},
			expectedRemoved: []string{"2876"},
		},
		// Less than maxContainerCount doesn't delete any.
		{
			containers: []docker.APIContainers{
				// pod infra container
				listed(fooPodInfra, "1876"),
			},
		},
		// maxContainerCount applies per container.
		{
			containers: []docker.APIContainers{
				// pod infra containers
				listed(foo2PodInfra, "1706"),
				listed(foo2PodInfra, "2706"),
				listed(foo2PodInfra, "3706"),
				listed(fooPodInfra, "1876"),
				listed(fooPodInfra, "2876"),
				listed(fooPodInfra, "3876"),
			},
			containerDetails: map[string]*docker.Container{
				"1706": inspected("1706", false),
				"1876": inspected("1876", false),
			},
			expectedRemoved: []string{"1706", "1876"},
		},
		// Remove non-running unidentified Kubernetes containers.
		{
			containers: []docker.APIContainers{
				// Unidentified (non-running) Kubernetes container.
				listed(unidentified, "1876"),
				// Unidentified (running) Kubernetes container.
				listed(unidentified, "2309"),
				// Regular Kubernetes container.
				listed(fooPodInfra, "3876"),
			},
			containerDetails: map[string]*docker.Container{
				"1876": inspected("1876", false),
				"2309": inspected("2309", true),
			},
			expectedRemoved: []string{"1876"},
		},
	}

	for i, test := range tests {
		testKubelet := newTestKubelet(t)
		kubelet := testKubelet.kubelet
		fakeDocker := testKubelet.fakeDocker
		kubelet.maxContainerCount = 2
		fakeDocker.ContainerList = test.containers
		fakeDocker.ContainerMap = test.containerDetails
		fakeDocker.Container = &docker.Container{ID: "error", Created: time.Now()}
		if err := kubelet.GarbageCollectContainers(); err != nil {
			t.Errorf("case %d: unexpected error: %v", i, err)
		}
		// The helper takes (actual, expected); the removed IDs are the actual.
		verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
	}
}
// TestPurgeOldest calls purgeOldest directly with maxContainerCount set to 5
// and checks, in order, which container IDs were removed. Only non-running
// containers count toward the limit, and the ones with the earliest creation
// time are the ones expected to be removed.
func TestPurgeOldest(t *testing.T) {
	created := time.Now()

	// container builds an inspect result created at the shared base time plus
	// the given offset.
	container := func(id string, running bool, offset time.Duration) *docker.Container {
		return &docker.Container{
			State:   docker.State{Running: running},
			ID:      id,
			Created: created.Add(offset),
		}
	}

	tests := []struct {
		ids              []string
		containerDetails map[string]*docker.Container
		expectedRemoved  []string
	}{
		// One running and four dead containers: under the limit, nothing removed.
		{
			ids: []string{"1", "2", "3", "4", "5"},
			containerDetails: map[string]*docker.Container{
				"1": container("1", true, 0),
				"2": container("2", false, time.Second),
				"3": container("3", false, time.Second),
				"4": container("4", false, time.Second),
				"5": container("5", false, time.Second),
			},
		},
		// Six dead containers: the single oldest one is removed.
		{
			ids: []string{"1", "2", "3", "4", "5", "6"},
			containerDetails: map[string]*docker.Container{
				"1": container("1", false, time.Second),
				"2": container("2", false, time.Millisecond),
				"3": container("3", false, time.Second),
				"4": container("4", false, time.Second),
				"5": container("5", false, time.Second),
				"6": container("6", false, time.Second),
			},
			expectedRemoved: []string{"2"},
		},
		// Seven dead containers: the two oldest are removed, newer one first.
		{
			ids: []string{"1", "2", "3", "4", "5", "6", "7"},
			containerDetails: map[string]*docker.Container{
				"1": container("1", false, time.Second),
				"2": container("2", false, time.Millisecond),
				"3": container("3", false, time.Second),
				"4": container("4", false, time.Second),
				"5": container("5", false, time.Second),
				"6": container("6", false, time.Microsecond),
				"7": container("7", false, time.Second),
			},
			expectedRemoved: []string{"2", "6"},
		},
	}

	for i, test := range tests {
		testKubelet := newTestKubelet(t)
		kubelet := testKubelet.kubelet
		fakeDocker := testKubelet.fakeDocker
		kubelet.maxContainerCount = 5
		fakeDocker.ContainerMap = test.containerDetails
		// A failed purge must fail the test rather than be silently dropped.
		if err := kubelet.purgeOldest(test.ids); err != nil {
			t.Errorf("case %d: unexpected error: %v", i, err)
		}
		if !reflect.DeepEqual(fakeDocker.Removed, test.expectedRemoved) {
			t.Errorf("case %d: expected: %v, got: %v", i, test.expectedRemoved, fakeDocker.Removed)
		}
	}
}
func TestSyncPodsWithPullPolicy(t *testing.T) { func TestSyncPodsWithPullPolicy(t *testing.T) {
testKubelet := newTestKubelet(t) testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet kubelet := testKubelet.kubelet