add sandbox deletor to delete sandboxes on pod delete event

This commit is contained in:
Keerthan Reddy,Mala 2020-07-05 15:54:02 -07:00
parent d4325f42fb
commit 90cc954eed
13 changed files with 338 additions and 18 deletions

View File

@ -25,6 +25,7 @@ go_library(
"kubelet_resources.go",
"kubelet_volumes.go",
"pod_container_deletor.go",
"pod_sandbox_deleter.go",
"pod_workers.go",
"reason_cache.go",
"runonce.go",
@ -177,6 +178,7 @@ go_test(
"kubelet_volumes_linux_test.go",
"kubelet_volumes_test.go",
"pod_container_deletor_test.go",
"pod_sandbox_deleter_test.go",
"pod_workers_test.go",
"reason_cache_test.go",
"runonce_test.go",

View File

@ -114,6 +114,8 @@ type Runtime interface {
GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)
// Delete a container. If the container is still running, an error is returned.
DeleteContainer(containerID ContainerID) error
// DeleteSandbox deletes the sandbox with the given ID. An error is returned if the deletion fails.
DeleteSandbox(sandboxID string) error
// ImageService provides methods to image-related methods.
ImageService
// UpdatePodCIDR sends a new podCIDR to the runtime.
@ -308,6 +310,8 @@ type Status struct {
ID ContainerID
// Name of the container.
Name string
// ID of the sandbox to which this container belongs.
PodSandboxId string
// Status of the container.
State State
// Creation time of the container.

View File

@ -364,6 +364,14 @@ func (f *FakeRuntime) DeleteContainer(containerID kubecontainer.ContainerID) err
return f.Err
}
// DeleteSandbox records that it was called and returns the error configured
// on the fake runtime. The sandboxID argument is not inspected.
func (f *FakeRuntime) DeleteSandbox(sandboxID string) error {
	const calledName = "DeleteSandbox"

	f.Lock()
	defer f.Unlock()

	f.CalledFunctions = append(f.CalledFunctions, calledName)
	return f.Err
}
func (f *FakeRuntime) ImageStats() (*kubecontainer.ImageStats, error) {
f.Lock()
defer f.Unlock()

View File

@ -147,6 +147,11 @@ func (r *Mock) DeleteContainer(containerID kubecontainer.ContainerID) error {
return args.Error(0)
}
// DeleteSandbox registers the call, together with its sandboxID argument, on
// the mock and returns the error stored at return index 0.
func (r *Mock) DeleteSandbox(sandboxID string) error {
	return r.Called(sandboxID).Error(0)
}
func (r *Mock) ImageStats() (*kubecontainer.ImageStats, error) {
args := r.Called()
return args.Get(0).(*kubecontainer.ImageStats), args.Error(1)

View File

@ -654,6 +654,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
klet.containerGC = containerGC
klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))
klet.sandboxDeleter = newPodSandboxDeleter(klet.containerRuntime)
// setup imageManager
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
@ -1095,6 +1096,9 @@ type Kubelet struct {
// trigger deleting containers in a pod
containerDeletor *podContainerDeletor
// trigger deleting sandboxes in a pod
sandboxDeleter *podSandboxDeleter
// config iptables util rules
makeIPTablesUtilChains bool
@ -1866,6 +1870,9 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e)
}
}
if e.Type == pleg.ContainerRemoved {
kl.deletePodSandbox(e.ID)
}
if e.Type == pleg.ContainerDied {
if containerID, ok := e.Data.(string); ok {
@ -2193,6 +2200,16 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
}
}
// deletePodSandbox asks the sandbox deleter to clean up the sandboxes of the
// pod identified by podID. Nothing happens when the pod status cannot be read
// from the pod cache. One sandbox is kept unless the pod is reported as
// deleted, in which case none are kept.
func (kl *Kubelet) deletePodSandbox(podID types.UID) {
	podStatus, err := kl.podCache.Get(podID)
	if err != nil {
		return
	}

	sandboxesToKeep := 1
	if kl.IsPodDeleted(podID) {
		sandboxesToKeep = 0
	}
	kl.sandboxDeleter.deleteSandboxesInPod(podStatus, sandboxesToKeep)
}
// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state

View File

@ -2454,7 +2454,7 @@ func TestPodResourcesAreReclaimed(t *testing.T) {
pod: &v1.Pod{},
status: v1.PodStatus{},
runtimeStatus: kubecontainer.PodStatus{
ContainerStatuses: []*kubecontainer.ContainerStatus{
ContainerStatuses: []*kubecontainer.Status{
{},
},
},

View File

@ -474,6 +474,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(uid kubetypes.UID, n
cStatus.Message += tMessage
}
}
cStatus.PodSandboxId = c.PodSandboxId
statuses[i] = cStatus
}

View File

@ -161,26 +161,13 @@ func (cgc *containerGC) removeOldestNSandboxes(sandboxes []sandboxGCInfo, toRemo
// Remove from oldest to newest (last to first).
for i := len(sandboxes) - 1; i >= numToKeep; i-- {
if !sandboxes[i].active {
cgc.removeSandbox(sandboxes[i].id)
if err := cgc.manager.DeleteSandbox(sandboxes[i].id); err != nil {
klog.Errorf("Failed to remove sandbox %q: %v", sandboxes[i].id, err)
}
}
}
}
// removeSandbox stops the sandbox identified by sandboxID and then removes it.
// Failures are logged; a failed stop skips the removal.
func (cgc *containerGC) removeSandbox(sandboxID string) {
	klog.V(4).Infof("Removing sandbox %q", sandboxID)

	// Normally the kubelet has already called StopPodSandbox by the time GC
	// runs. Stop the sandbox again here to cover the rare case where it has not.
	stopErr := cgc.client.StopPodSandbox(sandboxID)
	if stopErr != nil {
		klog.Errorf("Failed to stop sandbox %q before removing: %v", sandboxID, stopErr)
		return
	}

	removeErr := cgc.client.RemovePodSandbox(sandboxID)
	if removeErr != nil {
		klog.Errorf("Failed to remove sandbox %q: %v", sandboxID, removeErr)
	}
}
// evictableContainers gets all containers that are evictable. Evictable containers are: not running
// and created more than MinAge ago.
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, error) {

View File

@ -318,7 +318,7 @@ func TestGetPodStatus(t *testing.T) {
}
// Set fake sandbox and faked containers to fakeRuntime.
makeAndSetFakePod(t, m, fakeRuntime, pod)
sandbox, _ := makeAndSetFakePod(t, m, fakeRuntime, pod)
podStatus, err := m.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
assert.NoError(t, err)
@ -326,6 +326,9 @@ func TestGetPodStatus(t *testing.T) {
assert.Equal(t, pod.Name, podStatus.Name)
assert.Equal(t, pod.Namespace, podStatus.Namespace)
assert.Equal(t, apitest.FakePodSandboxIPs, podStatus.IPs)
for _, containerStatus := range podStatus.ContainerStatuses {
assert.Equal(t, sandbox.Id, containerStatus.PodSandboxId)
}
}
func TestGetPods(t *testing.T) {

View File

@ -304,3 +304,15 @@ func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string,
}
return url.Parse(resp.Url)
}
// DeleteSandbox stops the sandbox identified by sandboxID and then removes it.
// The error from the stop call, if any, is returned without attempting the
// removal; otherwise the result of the removal is returned.
func (m *kubeGenericRuntimeManager) DeleteSandbox(sandboxID string) error {
	klog.V(4).Infof("Removing sandbox %q", sandboxID)

	// Stopping the sandbox is also attempted when the pod is killed, but the
	// error is ignored there. Stop it again so that the runtime gets a chance
	// to clean up resources such as the network before the sandbox is removed.
	stopErr := m.runtimeService.StopPodSandbox(sandboxID)
	if stopErr == nil {
		return m.runtimeService.RemovePodSandbox(sandboxID)
	}
	return stopErr
}

View File

@ -28,6 +28,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
apitest "k8s.io/cri-api/pkg/apis/testing"
"k8s.io/kubernetes/pkg/features"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/runtimeclass"
@ -177,3 +178,40 @@ func newSeccompPod(podFieldProfile, containerFieldProfile *v1.SeccompProfile, po
}
return pod
}
// TestDeleteSandbox checks that DeleteSandbox stops and removes a sandbox and
// that the fake runtime no longer lists it afterwards.
func TestDeleteSandbox(t *testing.T) {
	fakeRuntime, _, m, err := createTestRuntimeManager()
	require.NoError(t, err)

	container := v1.Container{
		Name:            "foo",
		Image:           "busybox",
		ImagePullPolicy: v1.PullIfNotPresent,
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:       "12345678",
			Name:      "bar",
			Namespace: "new",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{container},
		},
	}

	// Register a single not-ready sandbox for the pod with the fake runtime.
	template := sandboxTemplate{
		pod:       pod,
		createdAt: fakeCreatedAt,
		state:     runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
	}
	sandbox := makeFakePodSandbox(t, m, template)
	fakeRuntime.SetFakeSandboxes([]*apitest.FakePodSandbox{sandbox})

	err = m.DeleteSandbox(sandbox.Id)
	assert.NoError(t, err)
	assert.Contains(t, fakeRuntime.Called, "StopPodSandbox")
	assert.Contains(t, fakeRuntime.Called, "RemovePodSandbox")

	// The deleted sandbox must not be listed any more.
	remaining, err := fakeRuntime.ListPodSandbox(&runtimeapi.PodSandboxFilter{Id: sandbox.Id})
	assert.NoError(t, err)
	assert.Empty(t, remaining)
}

View File

@ -0,0 +1,83 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"sort"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
const (
// sandboxDeletionBufferLimit is the capacity of the buffered channel that
// queues sandbox IDs awaiting deletion. Queued IDs are consumed by a single
// goroutine; requests issued while the queue is full are dropped with a
// warning.
sandboxDeletionBufferLimit = 20
)
// sandboxStatusByCreatedList is a list of sandbox statuses that implements
// sort.Interface, ordering entries by creation time with the newest first.
type sandboxStatusByCreatedList []*runtimeapi.PodSandboxStatus
// podSandboxDeleter queues sandbox deletion requests for a background worker.
type podSandboxDeleter struct {
// worker is the send side of the queue of sandbox IDs to be deleted.
worker chan<- string
}
// Len returns the number of sandbox statuses in the list.
func (l sandboxStatusByCreatedList) Len() int {
	return len(l)
}

// Swap exchanges the entries at indexes i and j.
func (l sandboxStatusByCreatedList) Swap(i, j int) {
	l[j], l[i] = l[i], l[j]
}

// Less reports whether the entry at index i was created after the entry at
// index j, which makes a sort place the newest sandboxes first.
func (l sandboxStatusByCreatedList) Less(i, j int) bool {
	return l[j].CreatedAt < l[i].CreatedAt
}
// newPodSandboxDeleter creates a podSandboxDeleter and starts a goroutine that
// receives sandbox IDs from its queue and deletes them one at a time through
// runtime. A failed deletion is logged as a warning and otherwise ignored.
func newPodSandboxDeleter(runtime kubecontainer.Runtime) *podSandboxDeleter {
	queue := make(chan string, sandboxDeletionBufferLimit)

	// drain blocks on the queue and never returns on its own.
	drain := func() {
		for {
			sandboxID := <-queue
			err := runtime.DeleteSandbox(sandboxID)
			if err == nil {
				continue
			}
			klog.Warningf("[pod_sandbox_deleter] DeleteSandbox returned error for (id=%v): %v", sandboxID, err)
		}
	}
	go wait.Forever(drain, 0)

	return &podSandboxDeleter{worker: queue}
}
// deleteSandboxesInPod queues deletion requests for the sandboxes in podStatus
// that are not in the ready state and are not referenced by any container
// status of the pod.
//
// When toKeep is greater than zero the sandboxes are ordered newest first and
// the toKeep newest ones are skipped regardless of their state; a negative
// toKeep is treated as zero. The ordering is done on a copy, so
// podStatus.SandboxStatuses is never modified. A nil podStatus is a no-op.
//
// Requests are sent without blocking: when the deletion queue is full the
// request is dropped and a warning is logged.
func (p *podSandboxDeleter) deleteSandboxesInPod(podStatus *kubecontainer.PodStatus, toKeep int) {
	if podStatus == nil {
		return
	}
	if toKeep < 0 {
		// A negative lower bound would let the loop below index out of range.
		toKeep = 0
	}

	// Sandboxes that still have containers attached must not be removed.
	sandboxIDs := sets.NewString()
	for _, containerStatus := range podStatus.ContainerStatuses {
		sandboxIDs.Insert(containerStatus.PodSandboxId)
	}

	sandboxStatuses := podStatus.SandboxStatuses
	if toKeep > 0 {
		// Sort a copy so the caller's pod status, which may be shared with
		// other readers, is not reordered as a side effect.
		sandboxStatuses = append([]*runtimeapi.PodSandboxStatus(nil), sandboxStatuses...)
		sort.Sort(sandboxStatusByCreatedList(sandboxStatuses))
	}

	// Walk from the last entry back to index toKeep, leaving the first toKeep
	// entries untouched.
	for i := len(sandboxStatuses) - 1; i >= toKeep; i-- {
		sandbox := sandboxStatuses[i]
		if sandboxIDs.Has(sandbox.Id) || sandbox.State == runtimeapi.PodSandboxState_SANDBOX_READY {
			continue
		}
		select {
		case p.worker <- sandbox.Id:
		default:
			klog.Warningf("Failed to issue the request to remove sandbox %v", sandbox.Id)
		}
	}
}

View File

@ -0,0 +1,160 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/wait"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// testPodSandboxDeleter wraps podSandboxDeleter and records the sandbox IDs
// that were sent to its worker channel.
type testPodSandboxDeleter struct {
podSandboxDeleter
// deletedSandoxes collects the IDs of the sandboxes requested for deletion.
deletedSandoxes []string
}
// newTestPodSandboxDeleter builds a testPodSandboxDeleter whose worker channel
// is drained by a background goroutine that records every received sandbox ID.
// The returned channel is closed once the worker channel has been closed and
// fully drained.
func newTestPodSandboxDeleter() (*testPodSandboxDeleter, chan struct{}) {
	queue := make(chan string, 5)
	done := make(chan struct{})
	deleter := &testPodSandboxDeleter{
		podSandboxDeleter: podSandboxDeleter{worker: queue},
		deletedSandoxes:   []string{},
	}

	collect := func() {
		for id := range queue {
			deleter.deletedSandoxes = append(deleter.deletedSandoxes, id)
		}
		// The queue was closed: signal completion, which also stops wait.Until.
		close(done)
	}
	go wait.Until(collect, 0, done)

	return deleter, done
}
// Test_podSandboxDeleter_deleteSandboxesInPod runs deleteSandboxesInPod against
// a table of pod statuses and checks which sandbox IDs end up queued for
// deletion, ignoring order.
func Test_podSandboxDeleter_deleteSandboxesInPod(t *testing.T) {
// args holds the inputs passed to deleteSandboxesInPod.
type args struct {
podStatus *kubecontainer.PodStatus
toKeep int
}
tests := []struct {
name string
args args
// want lists the sandbox IDs expected to be queued for deletion.
want []string
}{
{
name: "ready sandboxes shouldn't be deleted ever",
args: args{
podStatus: &kubecontainer.PodStatus{
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "testsandbox",
State: runtimeapi.PodSandboxState_SANDBOX_READY,
},
},
},
toKeep: 0,
},
want: []string{},
},
{
name: "all unready sandboxes should be deleted if to keep is 0",
args: args{
podStatus: &kubecontainer.PodStatus{
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "testsandbox",
State: runtimeapi.PodSandboxState_SANDBOX_READY,
},
{
Id: "testsandbox1",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
},
{
Id: "testsandbox2",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
},
},
},
toKeep: 0,
},
want: []string{"testsandbox1", "testsandbox2"},
},
{
name: "sandboxes with containers shouldn't be deleted",
args: args{
podStatus: &kubecontainer.PodStatus{
// testsandbox1 is referenced by a container status and must survive.
ContainerStatuses: []*kubecontainer.Status{
{
PodSandboxId: "testsandbox1",
},
},
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "testsandbox1",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
},
{
Id: "testsandbox2",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
},
},
},
toKeep: 0,
},
want: []string{"testsandbox2"},
},
{
name: "latest unready sandboxes shouldn't be deleted if to keep is 1",
args: args{
podStatus: &kubecontainer.PodStatus{
// testsandbox2 has the later creation time, so it is the one kept.
SandboxStatuses: []*runtimeapi.PodSandboxStatus{
{
Id: "testsandbox1",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
CreatedAt: time.Now().Add(time.Second).UnixNano(),
},
{
Id: "testsandbox2",
State: runtimeapi.PodSandboxState_SANDBOX_NOTREADY,
CreatedAt: time.Now().Add(2 * time.Second).UnixNano(),
},
},
},
toKeep: 1,
},
want: []string{"testsandbox1"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p, stopCh := newTestPodSandboxDeleter()
p.deleteSandboxesInPod(tt.args.podStatus, tt.args.toKeep)
// Close the queue and wait for the collector goroutine to drain it
// before inspecting the recorded IDs.
close(p.worker)
<-stopCh
assert.ElementsMatch(t, tt.want, p.deletedSandoxes, tt.name)
})
}
}