diff --git a/hack/test-utils.sh b/hack/test-utils.sh index 7bdbed267..1a802a94d 100644 --- a/hack/test-utils.sh +++ b/hack/test-utils.sh @@ -18,9 +18,14 @@ ROOT="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"/.. . ${ROOT}/hack/versions # CRI_CONTAINERD_FLAGS are the extra flags to use when start cri-containerd. CRI_CONTAINERD_FLAGS=${CRI_CONTAINERD_FLAGS:-""} +# RESTART_WAIT_PERIOD is the period to wait before restarting cri-containerd/containerd. +RESTART_WAIT_PERIOD=${RESTART_WAIT_PERIOD:-10} CRICONTAINERD_SOCK=/var/run/cri-containerd.sock +cri_containerd_pid= +containerd_pid= + # test_setup starts containerd and cri-containerd. test_setup() { local report_dir=$1 @@ -35,22 +40,42 @@ test_setup() { exit 1 fi sudo pkill containerd - sudo containerd &> ${report_dir}/containerd.log & + keepalive "sudo containerd" ${RESTART_WAIT_PERIOD} &> ${report_dir}/containerd.log & + containerd_pid=$! # Wait for containerd to be running by using the containerd client ctr to check the version # of the containerd server. Wait an increasing amount of time after each of five attempts readiness_check "sudo ctr version" # Start cri-containerd - sudo ${ROOT}/_output/cri-containerd --alsologtostderr --v 4 ${CRI_CONTAINERD_FLAGS} \ - &> ${report_dir}/cri-containerd.log & + keepalive "sudo ${ROOT}/_output/cri-containerd --alsologtostderr --v 4 ${CRI_CONTAINERD_FLAGS}" \ + ${RESTART_WAIT_PERIOD} &> ${report_dir}/cri-containerd.log & + cri_containerd_pid=$! readiness_check "sudo ${GOPATH}/bin/crictl --runtime-endpoint=${CRICONTAINERD_SOCK} info" } # test_teardown kills containerd and cri-containerd. test_teardown() { + if [ -n "${containerd_pid}" ]; then + kill ${containerd_pid} + fi + if [ -n "${cri_containerd_pid}" ]; then + kill ${cri_containerd_pid} + fi sudo pkill containerd } +# keepalive runs a command and keeps it alive. +# keepalive process is eventually killed in test_teardown. +keepalive() { + local command=$1 + echo ${command} + local wait_period=$2 + while true; do + ${command} + sleep ${wait_period} + done +} + # readiness_check checks readiness of a daemon with specified command. readiness_check() { local command=$1 diff --git a/integration/restart_test.go b/integration/restart_test.go new file mode 100644 index 000000000..768b45fc9 --- /dev/null +++ b/integration/restart_test.go @@ -0,0 +1,200 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/containerd/containerd" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "golang.org/x/sys/unix" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" +) + +// Restart test must run sequentially. + +func TestSandboxAcrossCRIContainerdRestart(t *testing.T) { + ctx := context.Background() + sandboxNS := "sandbox-restart-cri-containerd" + sandboxes := []struct { + name string + id string + stateBeforeExit runtime.PodSandboxState + actionAfterExit string + expectedState runtime.PodSandboxState + }{ + { + name: "task-always-ready", + stateBeforeExit: runtime.PodSandboxState_SANDBOX_READY, + expectedState: runtime.PodSandboxState_SANDBOX_READY, + }, + { + name: "task-always-not-ready", + stateBeforeExit: runtime.PodSandboxState_SANDBOX_NOTREADY, + expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY, + }, + { + name: "task-exit-before-restart", + stateBeforeExit: runtime.PodSandboxState_SANDBOX_READY, + actionAfterExit: "kill", + expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY, + }, + { + name: "task-deleted-before-restart", + stateBeforeExit: runtime.PodSandboxState_SANDBOX_READY, + actionAfterExit: "delete", + expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY, + }, + } + t.Logf("Make sure no sandbox is running before test") + existingSandboxes, err := runtimeService.ListPodSandbox(&runtime.PodSandboxFilter{}) + require.NoError(t, err) + require.Empty(t, existingSandboxes) + + t.Logf("Start test sandboxes") + for i := range sandboxes { + s := &sandboxes[i] + cfg := PodSandboxConfig(s.name, sandboxNS) + sb, err := runtimeService.RunPodSandbox(cfg) + require.NoError(t, err) + defer func() { + // Make sure the sandbox is cleaned up in any case. + runtimeService.StopPodSandbox(sb) + runtimeService.RemovePodSandbox(sb) + }() + s.id = sb + if s.stateBeforeExit == runtime.PodSandboxState_SANDBOX_NOTREADY { + require.NoError(t, runtimeService.StopPodSandbox(sb)) + } + } + + t.Logf("Kill cri-containerd") + require.NoError(t, KillProcess("cri-containerd")) + defer func() { + assert.NoError(t, Eventually(func() (bool, error) { + return ConnectDaemons() == nil, nil + }, time.Second, 30*time.Second), "make sure cri-containerd is running before test finish") + }() + + t.Logf("Change sandbox state, must finish before cri-containerd is restarted") + for _, s := range sandboxes { + if s.actionAfterExit == "" { + continue + } + cntr, err := containerdClient.LoadContainer(ctx, s.id) + require.NoError(t, err) + task, err := cntr.Task(ctx, nil) + require.NoError(t, err) + switch s.actionAfterExit { + case "kill": + require.NoError(t, task.Kill(ctx, unix.SIGKILL, containerd.WithKillAll)) + case "delete": + _, err := task.Delete(ctx, containerd.WithProcessKill) + require.NoError(t, err) + } + } + + t.Logf("Wait until cri-containerd is restarted") + require.NoError(t, Eventually(func() (bool, error) { + return ConnectDaemons() == nil, nil + }, time.Second, 30*time.Second), "wait for cri-containerd to be restarted") + + t.Logf("Check sandbox state after restart") + loadedSandboxes, err := runtimeService.ListPodSandbox(&runtime.PodSandboxFilter{}) + require.NoError(t, err) + assert.Len(t, loadedSandboxes, len(sandboxes)) + for _, s := range sandboxes { + for _, loaded := range loadedSandboxes { + if s.id == loaded.Id { + assert.Equal(t, s.expectedState, loaded.State) + break + } + } + } + + t.Logf("Should be able to stop and remove sandbox after restart") + for _, s := range sandboxes { + // Properly stop the sandbox if it's ready before restart. + if s.stateBeforeExit == runtime.PodSandboxState_SANDBOX_READY { + assert.NoError(t, runtimeService.StopPodSandbox(s.id)) + } + assert.NoError(t, runtimeService.RemovePodSandbox(s.id)) + } +} + +// TestSandboxDeletionAcrossCRIContainerdRestart tests the case that sandbox container +// is deleted from containerd during cri-containerd is down. This should not happen. +// However, if this really happens, cri-containerd should not load such sandbox and +// should do best effort cleanup of the sandbox root directory. Note that in this case, +// cri-containerd loses the network namespace of the sandbox, so it won't be able to +// teardown the network properly. +// This test uses host network sandbox to avoid resource leakage. +func TestSandboxDeletionAcrossCRIContainerdRestart(t *testing.T) { + ctx := context.Background() + sandboxNS := "sandbox-delete-restart-cri-containerd" + t.Logf("Make sure no sandbox is running before test") + existingSandboxes, err := runtimeService.ListPodSandbox(&runtime.PodSandboxFilter{}) + require.NoError(t, err) + require.Empty(t, existingSandboxes) + + t.Logf("Start test sandboxes") + cfg := PodSandboxConfig("sandbox", sandboxNS, WithHostNetwork) + sb, err := runtimeService.RunPodSandbox(cfg) + require.NoError(t, err) + defer func() { + // Make sure the sandbox is cleaned up in any case. + runtimeService.StopPodSandbox(sb) + runtimeService.RemovePodSandbox(sb) + }() + + t.Logf("Kill cri-containerd") + require.NoError(t, KillProcess("cri-containerd")) + defer func() { + assert.NoError(t, Eventually(func() (bool, error) { + return ConnectDaemons() == nil, nil + }, time.Second, 30*time.Second), "make sure cri-containerd is running before test finish") + }() + + t.Logf("Delete sandbox container from containerd") + cntr, err := containerdClient.LoadContainer(ctx, sb) + require.NoError(t, err) + task, err := cntr.Task(ctx, nil) + require.NoError(t, err) + _, err = task.Delete(ctx, containerd.WithProcessKill) + require.NoError(t, err) + require.NoError(t, cntr.Delete(ctx, containerd.WithSnapshotCleanup)) + + t.Logf("Wait until cri-containerd is restarted") + require.NoError(t, Eventually(func() (bool, error) { + return ConnectDaemons() == nil, nil + }, time.Second, 30*time.Second), "wait for cri-containerd to be restarted") + + t.Logf("Check sandbox state after restart") + loadedSandboxes, err := runtimeService.ListPodSandbox(&runtime.PodSandboxFilter{}) + require.NoError(t, err) + assert.Empty(t, loadedSandboxes) + + t.Logf("Make sure sandbox root is removed") + sandboxRoot := filepath.Join(criContainerdRoot, "sandboxes", sb) + _, err = os.Stat(sandboxRoot) + assert.True(t, os.IsNotExist(err)) +} diff --git a/integration/test_utils.go b/integration/test_utils.go index a58047ba4..574e0ded7 100644 --- a/integration/test_utils.go +++ b/integration/test_utils.go @@ -18,6 +18,8 @@ package integration import ( "errors" + "fmt" + "os/exec" "time" "github.com/containerd/containerd" @@ -38,6 +40,7 @@ const ( k8sNamespace = "k8s.io" // This is the same with server.k8sContainerdNamespace. containerdEndpoint = "/run/containerd/containerd.sock" criContainerdEndpoint = "/var/run/cri-containerd.sock" + criContainerdRoot = "/var/lib/cri-containerd" ) var ( @@ -48,28 +51,61 @@ var ( ) func init() { + if err := ConnectDaemons(); err != nil { + glog.Exitf("Failed to connect daemons: %v", err) + } +} + +// ConnectDaemons connect cri-containerd and containerd, and initialize the clients. +func ConnectDaemons() error { var err error runtimeService, err = remote.NewRemoteRuntimeService(sock, timeout) if err != nil { - glog.Exitf("Failed to create runtime service: %v", err) + return fmt.Errorf("failed to create runtime service: %v", err) } imageService, err = remote.NewRemoteImageService(sock, timeout) if err != nil { - glog.Exitf("Failed to create image service: %v", err) + return fmt.Errorf("failed to create image service: %v", err) + } + // Since CRI grpc client doesn't have `WithBlock` specified, we + // need to check whether it is actually connected. + // TODO(random-liu): Extend cri remote client to accept extra grpc options. + _, err = runtimeService.ListContainers(&runtime.ContainerFilter{}) + if err != nil { + return fmt.Errorf("failed to list containers: %v", err) + } + _, err = imageService.ListImages(&runtime.ImageFilter{}) + if err != nil { + return fmt.Errorf("failed to list images: %v", err) } containerdClient, err = containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sNamespace)) if err != nil { - glog.Exitf("Failed to connect containerd: %v", err) + return fmt.Errorf("failed to connect containerd: %v", err) } criContainerdClient, err = client.NewCRIContainerdClient(criContainerdEndpoint, timeout) if err != nil { - glog.Exitf("Failed to connect cri-containerd: %v", err) + return fmt.Errorf("failed to connect cri-containerd: %v", err) } + return nil } // Opts sets specific information in pod sandbox config. type PodSandboxOpts func(*runtime.PodSandboxConfig) +func WithHostNetwork(p *runtime.PodSandboxConfig) { + if p.Linux == nil { + p.Linux = &runtime.LinuxPodSandboxConfig{} + } + if p.Linux.SecurityContext == nil { + p.Linux.SecurityContext = &runtime.LinuxSandboxSecurityContext{} + } + if p.Linux.SecurityContext.NamespaceOptions == nil { + p.Linux.SecurityContext.NamespaceOptions = &runtime.NamespaceOption{ + HostNetwork: true, + } + } +} + // PodSandboxConfig generates a pod sandbox config for test. func PodSandboxConfig(name, ns string, opts ...PodSandboxOpts) *runtime.PodSandboxConfig { config := &runtime.PodSandboxConfig{ @@ -164,3 +200,12 @@ func Eventually(f CheckFunc, period, timeout time.Duration) error { func Randomize(str string) string { return str + "-" + util.GenerateID() } + +// KillProcess kills the process by name. pkill is used. +func KillProcess(name string) error { + output, err := exec.Command("pkill", fmt.Sprintf("^%s$", name)).CombinedOutput() + if err != nil { + return fmt.Errorf("failed to kill %q - error: %v, output: %q", name, err, output) + } + return nil +} diff --git a/pkg/client/client.go b/pkg/client/client.go index a7d1cbed5..276186153 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -34,7 +34,9 @@ func NewCRIContainerdClient(endpoint string, timeout time.Duration) (api.CRICont return nil, fmt.Errorf("failed to get dialer: %v", err) } conn, err := grpc.Dial(addr, + grpc.WithBlock(), grpc.WithInsecure(), + // TODO(random-liu): WithTimeout is being deprecated, use context instead. grpc.WithTimeout(timeout), grpc.WithDialer(dialer), )