diff --git a/client_test.go b/client_test.go
index a6b1d5933..1a4cf3962 100644
--- a/client_test.go
+++ b/client_test.go
@@ -21,6 +21,8 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"os"
 	"os/exec"
 	"testing"
@@ -36,11 +38,12 @@ import (
 )
 
 var (
-	address       string
-	noDaemon      bool
-	noCriu        bool
-	supportsCriu  bool
-	testNamespace = "testing"
+	address           string
+	noDaemon          bool
+	noCriu            bool
+	supportsCriu      bool
+	testNamespace     = "testing"
+	ctrdStdioFilePath string
 
 	ctrd = &daemon{}
 )
@@ -76,13 +79,26 @@ func TestMain(m *testing.M) {
 	if !noDaemon {
 		sys.ForceRemoveAll(defaultRoot)
 
-		err := ctrd.start("containerd", address, []string{
+		stdioFile, err := ioutil.TempFile("", "")
+		if err != nil {
+			fmt.Fprintf(os.Stderr, "could not create a new stdio temp file: %s\n", err)
+			os.Exit(1)
+		}
+		defer func() {
+			stdioFile.Close()
+			os.Remove(stdioFile.Name())
+		}()
+		ctrdStdioFilePath = stdioFile.Name()
+		stdioWriter := io.MultiWriter(stdioFile, buf)
+
+		err = ctrd.start("containerd", address, []string{
 			"--root", defaultRoot,
 			"--state", defaultState,
 			"--log-level", "debug",
-		}, buf, buf)
+			"--config", createShimDebugConfig(),
+		}, stdioWriter, stdioWriter)
 		if err != nil {
-			fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
+			fmt.Fprintf(os.Stderr, "%s: %s\n", err, buf.String())
 			os.Exit(1)
 		}
 	}
@@ -137,6 +153,7 @@ func TestMain(m *testing.M) {
 				fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
 			}
 		}
+
 		if err := sys.ForceRemoveAll(defaultRoot); err != nil {
 			fmt.Fprintln(os.Stderr, "failed to remove test root dir", err)
 			os.Exit(1)
@@ -343,3 +360,19 @@ func TestClientReconnect(t *testing.T) {
 		t.Errorf("client closed returned error %v", err)
 	}
 }
+
+func createShimDebugConfig() string {
+	f, err := ioutil.TempFile("", "containerd-config-")
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to create config file: %s\n", err)
+		os.Exit(1)
+	}
+	defer f.Close()
+
+	if _, err := f.WriteString("[plugins.linux]\n\tshim_debug = true\n"); err != nil {
+		fmt.Fprintf(os.Stderr, "Failed to write to config file %s: %s\n", f.Name(), err)
+		os.Exit(1)
+	}
+
+	return f.Name()
+}
diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go
index ca0a90ad0..6c59cd1c9 100644
--- a/cmd/containerd-shim/main_unix.go
+++ b/cmd/containerd-shim/main_unix.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"flag"
 	"fmt"
+	"io"
 	"net"
 	"os"
 	"os/exec"
@@ -36,6 +37,7 @@ import (
 
 	"github.com/containerd/containerd/events"
 	"github.com/containerd/containerd/namespaces"
+	shimlog "github.com/containerd/containerd/runtime/v1"
 	"github.com/containerd/containerd/runtime/v1/linux/proc"
 	"github.com/containerd/containerd/runtime/v1/shim"
 	shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
@@ -92,12 +94,39 @@ func main() {
 		runtime.GOMAXPROCS(2)
 	}
 
+	stdout, stderr, err := openStdioKeepAlivePipes(workdirFlag)
+	if err != nil {
+		fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
+		os.Exit(1)
+	}
+	defer func() {
+		stdout.Close()
+		stderr.Close()
+	}()
+
 	if err := executeShim(); err != nil {
 		fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
 		os.Exit(1)
 	}
 }
 
+// openStdioKeepAlivePipes opens the shim stdout/stderr log pipes found in dir and
+// returns them so the shim keeps its own reader FDs for them: if the containerd
+// server process dies, Linux then does not SIGPIPE the shim when it writes to them.
+func openStdioKeepAlivePipes(dir string) (io.ReadCloser, io.ReadCloser, error) {
+	background := context.Background()
+	keepStdoutAlive, err := shimlog.OpenShimStdoutLog(background, dir)
+	if err != nil {
+		return nil, nil, err
+	}
+	keepStderrAlive, err := shimlog.OpenShimStderrLog(background, dir)
+	if err != nil {
+		keepStdoutAlive.Close() // do not leak the stdout pipe that was already opened
+		return nil, nil, err
+	}
+	return keepStdoutAlive, keepStderrAlive, nil
+}
+
 func executeShim() error {
 	// start handling signals as soon as possible so that things are properly reaped
 	// or if runtime exits before we hit the handler
diff --git a/container_linux_test.go b/container_linux_test.go
index 60b03362d..fa764d74e 100644
--- a/container_linux_test.go
+++ b/container_linux_test.go
@@ -24,7 +24,9 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
+	"os"
 	"os/exec"
+	"path/filepath"
 	"runtime"
 	"strings"
 	"sync"
@@ -258,6 +260,213 @@ func TestDaemonRestart(t *testing.T) {
 	<-statusC
 }
 
+func TestShimDoesNotLeakPipes(t *testing.T) {
+	containerdPid := ctrd.cmd.Process.Pid
+	initialPipes, err := numPipes(containerdPid)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	client, err := newClient(t, address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	var (
+		image       Image
+		ctx, cancel = testContext()
+		id          = t.Name()
+	)
+	defer cancel()
+
+	image, err = client.GetImage(ctx, testImage)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	task, err := container.NewTask(ctx, empty())
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	exitChannel, err := task.Wait(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := task.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
+		t.Fatal(err)
+	}
+
+	<-exitChannel
+
+	if _, err := task.Delete(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := container.Delete(ctx, WithSnapshotCleanup); err != nil {
+		t.Fatal(err)
+	}
+
+	currentPipes, err := numPipes(containerdPid)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if initialPipes != currentPipes {
+		t.Errorf("Pipes have leaked after container has been deleted. Initially there were %d pipes, after container deletion there were %d pipes", initialPipes, currentPipes)
+	}
+}
+
+func numPipes(pid int) (int, error) {
+	cmd := exec.Command("sh", "-c", fmt.Sprintf("lsof -p %d | grep pipe", pid))
+
+	var stdout bytes.Buffer
+	cmd.Stdout = &stdout
+	if err := cmd.Run(); err != nil {
+		return 0, err
+	}
+	return strings.Count(stdout.String(), "\n"), nil
+}
+
+func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
+	client, err := newClient(t, address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	var (
+		image       Image
+		ctx, cancel = testContext()
+		id          = t.Name()
+	)
+	defer cancel()
+
+	image, err = client.GetImage(ctx, testImage)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer container.Delete(ctx, WithSnapshotCleanup)
+
+	task, err := container.NewTask(ctx, empty())
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer task.Delete(ctx)
+
+	_, err = task.Wait(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := task.Start(ctx); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := ctrd.Restart(nil); err != nil {
+		t.Fatal(err)
+	}
+
+	waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
+	serving, err := client.IsServing(waitCtx)
+	waitCancel()
+	if !serving {
+		t.Fatalf("containerd did not start within 2s: %v", err)
+	}
+
+	// After we restarted containerd we write some messages to the log pipes, simulating shim writing stuff there.
+	// Then we make sure that these messages are available on the containerd log thus proving that the server reconnected to the log pipes
+	runtimeVersion := getRuntimeVersion()
+	logDirPath := getLogDirPath(runtimeVersion, id)
+
+	switch runtimeVersion {
+	case "v1":
+		writeToFile(t, filepath.Join(logDirPath, "shim.stdout.log"), fmt.Sprintf("%s writing to stdout\n", id))
+		writeToFile(t, filepath.Join(logDirPath, "shim.stderr.log"), fmt.Sprintf("%s writing to stderr\n", id))
+	case "v2":
+		writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
+	}
+
+	statusC, err := task.Wait(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
+		t.Fatal(err)
+	}
+
+	<-statusC
+
+	stdioContents, err := ioutil.ReadFile(ctrdStdioFilePath)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	switch runtimeVersion {
+	case "v1":
+		if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stdout", id)) {
+			t.Fatal("containerd did not connect to the shim stdout pipe")
+		}
+		if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stderr", id)) {
+			t.Fatal("containerd did not connect to the shim stderr pipe")
+		}
+	case "v2":
+		if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
+			t.Fatal("containerd did not connect to the shim log pipe")
+		}
+	}
+}
+
+func writeToFile(t *testing.T, filePath, message string) {
+	writer, err := os.OpenFile(filePath, os.O_WRONLY, 0600)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err := writer.WriteString(message); err != nil {
+		t.Fatal(err)
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func getLogDirPath(runtimeVersion, id string) string {
+	switch runtimeVersion {
+	case "v1":
+		return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
+	case "v2":
+		return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
+	default:
+		panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion))
+	}
+}
+
+func getRuntimeVersion() string {
+	switch rt := os.Getenv("TEST_RUNTIME"); rt {
+	case "io.containerd.runc.v1":
+		return "v2"
+	default:
+		return "v1"
+	}
+}
+
 func TestContainerPTY(t *testing.T) {
 	t.Parallel()
 
diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go
index d19b8e516..e1b3cacea 100644
--- a/runtime/v1/linux/runtime.go
+++ b/runtime/v1/linux/runtime.go
@@ -21,6 +21,7 @@ package linux
 import (
 	"context"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -40,6 +41,7 @@ import (
 	"github.com/containerd/containerd/plugin"
 	"github.com/containerd/containerd/runtime"
 	"github.com/containerd/containerd/runtime/linux/runctypes"
+	"github.com/containerd/containerd/runtime/v1"
 	"github.com/containerd/containerd/runtime/v1/linux/proc"
 	shim "github.com/containerd/containerd/runtime/v1/shim/v1"
 	runc "github.com/containerd/go-runc"
@@ -341,6 +343,30 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
 			continue
 		}
 
+		logDirPath := filepath.Join(r.root, ns, id)
+
+		shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
+		if err != nil {
+			log.G(ctx).WithError(err).WithFields(logrus.Fields{
+				"id":         id,
+				"namespace":  ns,
+				"logDirPath": logDirPath,
+			}).Error("opening shim stdout log pipe")
+			continue
+		}
+		go io.Copy(os.Stdout, shimStdoutLog)
+
+		shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
+		if err != nil {
+			log.G(ctx).WithError(err).WithFields(logrus.Fields{
+				"id":         id,
+				"namespace":  ns,
+				"logDirPath": logDirPath,
+			}).Error("opening shim stderr log pipe")
+			continue
+		}
+		go io.Copy(os.Stderr, shimStderrLog)
+
 		t, err := newTask(id, ns, pid, s, r.events, r.tasks, bundle)
 		if err != nil {
 			log.G(ctx).WithError(err).Error("loading task type")
diff --git a/runtime/v1/shim.go b/runtime/v1/shim.go
new file mode 100644
index 000000000..3942968e1
--- /dev/null
+++ b/runtime/v1/shim.go
@@ -0,0 +1,38 @@
+// +build !windows
+
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package v1
+
+import (
+	"context"
+	"io"
+	"path/filepath"
+
+	"github.com/containerd/fifo"
+	"golang.org/x/sys/unix"
+)
+
+// OpenShimStdoutLog opens the "shim.stdout.log" FIFO in logDirPath read-write and non-blocking, creating it if it does not exist
+func OpenShimStdoutLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
+	return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stdout.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+}
+
+// OpenShimStderrLog opens the "shim.stderr.log" FIFO in logDirPath read-write and non-blocking, creating it if it does not exist
+func OpenShimStderrLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
+	return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stderr.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
+}
diff --git a/runtime/v1/shim/client/client.go b/runtime/v1/shim/client/client.go
index 015d88c2d..ef740308d 100644
--- a/runtime/v1/shim/client/client.go
+++ b/runtime/v1/shim/client/client.go
@@ -37,6 +37,7 @@ import (
 
 	"github.com/containerd/containerd/events"
 	"github.com/containerd/containerd/log"
+	v1 "github.com/containerd/containerd/runtime/v1"
 	"github.com/containerd/containerd/runtime/v1/shim"
 	shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
 	"github.com/containerd/containerd/sys"
@@ -62,7 +63,25 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
 		}
 		defer f.Close()
 
-		cmd, err := newCommand(binary, daemonAddress, debug, config, f)
+		var stdoutLog io.ReadWriteCloser
+		var stderrLog io.ReadWriteCloser
+		if debug {
+			stdoutLog, err = v1.OpenShimStdoutLog(ctx, config.WorkDir)
+			if err != nil {
+				return nil, nil, errors.Wrapf(err, "failed to create stdout log")
+			}
+
+			stderrLog, err = v1.OpenShimStderrLog(ctx, config.WorkDir)
+			if err != nil {
+				stdoutLog.Close() // do not leak the stdout log that was already opened
+				return nil, nil, errors.Wrapf(err, "failed to create stderr log")
+			}
+
+			go io.Copy(os.Stdout, stdoutLog)
+			go io.Copy(os.Stderr, stderrLog)
+		}
+
+		cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
 		if err != nil {
 			return nil, nil, err
 		}
@@ -77,6 +96,12 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
 		go func() {
 			cmd.Wait()
 			exitHandler()
+			if stdoutLog != nil {
+				stdoutLog.Close() // shim has exited: release its stdout log
+			}
+			if stderrLog != nil {
+				stderrLog.Close() // shim has exited: release its stderr log
+			}
 		}()
 		log.G(ctx).WithFields(logrus.Fields{
 			"pid":     cmd.Process.Pid,
@@ -104,7 +129,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
 	}
 }
 
-func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) (*exec.Cmd, error) {
+func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
 	selfExe, err := os.Executable()
 	if err != nil {
 		return nil, err
@@ -137,10 +162,8 @@ func newCommand(binary, daemonAddress string, debug bool, config shim.Config, so
 	cmd.SysProcAttr = getSysProcAttr()
 	cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
 	cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
-	if debug {
-		cmd.Stdout = os.Stdout
-		cmd.Stderr = os.Stderr
-	}
+	cmd.Stdout = stdout
+	cmd.Stderr = stderr
 	return cmd, nil
 }
 