Use named pipes for shim logs

Relating to issue [#2606](https://github.com/containerd/containerd/issues/2606)

Co-authored-by: Oliver Stenbom <ostenbom@pivotal.io>
Co-authored-by: Georgi Sabev <georgethebeatle@gmail.com>
Co-authored-by: Giuseppe Capizzi <gcapizzi@pivotal.io>
Co-authored-by: Danail Branekov <danailster@gmail.com>

Signed-off-by: Oliver Stenbom <ostenbom@pivotal.io>
Signed-off-by: Georgi Sabev <georgethebeatle@gmail.com>
Signed-off-by: Giuseppe Capizzi <gcapizzi@pivotal.io>
Signed-off-by: Danail Branekov <danailster@gmail.com>
This commit is contained in:
Julia Nedialkova 2018-11-16 16:11:43 +02:00
parent 44b90df286
commit 1d4105cacf
6 changed files with 370 additions and 14 deletions

View File

@ -21,6 +21,8 @@ import (
"context"
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"testing"
@ -41,6 +43,7 @@ var (
noCriu bool
supportsCriu bool
testNamespace = "testing"
ctrdStdioFilePath string
ctrd = &daemon{}
)
@ -76,13 +79,26 @@ func TestMain(m *testing.M) {
if !noDaemon {
sys.ForceRemoveAll(defaultRoot)
err := ctrd.start("containerd", address, []string{
stdioFile, err := ioutil.TempFile("", "")
if err != nil {
fmt.Fprintf(os.Stderr, "could not create a new stdio temp file: %s\n", err)
os.Exit(1)
}
defer func() {
stdioFile.Close()
os.Remove(stdioFile.Name())
}()
ctrdStdioFilePath = stdioFile.Name()
stdioWriter := io.MultiWriter(stdioFile, buf)
err = ctrd.start("containerd", address, []string{
"--root", defaultRoot,
"--state", defaultState,
"--log-level", "debug",
}, buf, buf)
"--config", createShimDebugConfig(),
}, stdioWriter, stdioWriter)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %s", err, buf.String())
fmt.Fprintf(os.Stderr, "%s: %s\n", err, buf.String())
os.Exit(1)
}
}
@ -137,6 +153,7 @@ func TestMain(m *testing.M) {
fmt.Fprintln(os.Stderr, "failed to wait for containerd", err)
}
}
if err := sys.ForceRemoveAll(defaultRoot); err != nil {
fmt.Fprintln(os.Stderr, "failed to remove test root dir", err)
os.Exit(1)
@ -343,3 +360,19 @@ func TestClientReconnect(t *testing.T) {
t.Errorf("client closed returned error %v", err)
}
}
func createShimDebugConfig() string {
f, err := ioutil.TempFile("", "containerd-config-")
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to create config file: %s\n", err)
os.Exit(1)
}
defer f.Close()
if _, err := f.WriteString("[plugins.linux]\n\tshim_debug = true\n"); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write to config file %s: %s\n", f.Name(), err)
os.Exit(1)
}
return f.Name()
}

View File

@ -23,6 +23,7 @@ import (
"context"
"flag"
"fmt"
"io"
"net"
"os"
"os/exec"
@ -36,6 +37,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/namespaces"
shimlog "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/linux/proc"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
@ -92,12 +94,38 @@ func main() {
runtime.GOMAXPROCS(2)
}
stdout, stderr, err := openStdioKeepAlivePipes(workdirFlag)
if err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
defer func() {
stdout.Close()
stderr.Close()
}()
if err := executeShim(); err != nil {
fmt.Fprintf(os.Stderr, "containerd-shim: %s\n", err)
os.Exit(1)
}
}
// If containerd server process dies, we need the shim to keep stdout/err reader
// FDs so that Linux does not SIGPIPE the shim process if it tries to use its end of
// these pipes.
func openStdioKeepAlivePipes(dir string) (io.ReadCloser, io.ReadCloser, error) {
background := context.Background()
keepStdoutAlive, err := shimlog.OpenShimStdoutLog(background, dir)
if err != nil {
return nil, nil, err
}
keepStderrAlive, err := shimlog.OpenShimStderrLog(background, dir)
if err != nil {
return nil, nil, err
}
return keepStdoutAlive, keepStderrAlive, nil
}
func executeShim() error {
// start handling signals as soon as possible so that things are properly reaped
// or if runtime exits before we hit the handler

View File

@ -24,7 +24,9 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
@ -258,6 +260,213 @@ func TestDaemonRestart(t *testing.T) {
<-statusC
}
func TestShimDoesNotLeakPipes(t *testing.T) {
containerdPid := ctrd.cmd.Process.Pid
initialPipes, err := numPipes(containerdPid)
if err != nil {
t.Fatal(err)
}
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Fatal(err)
}
container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
if err != nil {
t.Fatal(err)
}
task, err := container.NewTask(ctx, empty())
if err != nil {
t.Fatal(err)
}
exitChannel, err := task.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if err := task.Start(ctx); err != nil {
t.Fatal(err)
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Fatal(err)
}
<-exitChannel
if _, err := task.Delete(ctx); err != nil {
t.Fatal(err)
}
if err := container.Delete(ctx, WithSnapshotCleanup); err != nil {
t.Fatal(err)
}
currentPipes, err := numPipes(containerdPid)
if err != nil {
t.Fatal(err)
}
if initialPipes != currentPipes {
t.Errorf("Pipes have leaked after container has been deleted. Initially there were %d pipes, after container deletion there were %d pipes", initialPipes, currentPipes)
}
}
func numPipes(pid int) (int, error) {
cmd := exec.Command("sh", "-c", fmt.Sprintf("lsof -p %d | grep pipe", pid))
var stdout bytes.Buffer
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return 0, err
}
return strings.Count(stdout.String(), "\n"), nil
}
func TestDaemonReconnectsToShimIOPipesOnRestart(t *testing.T) {
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
image Image
ctx, cancel = testContext()
id = t.Name()
)
defer cancel()
image, err = client.GetImage(ctx, testImage)
if err != nil {
t.Fatal(err)
}
container, err := client.NewContainer(ctx, id, WithNewSnapshot(id, image), WithNewSpec(oci.WithImageConfig(image), withProcessArgs("sleep", "30")))
if err != nil {
t.Fatal(err)
}
defer container.Delete(ctx, WithSnapshotCleanup)
task, err := container.NewTask(ctx, empty())
if err != nil {
t.Fatal(err)
}
defer task.Delete(ctx)
_, err = task.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if err := task.Start(ctx); err != nil {
t.Fatal(err)
}
if err := ctrd.Restart(nil); err != nil {
t.Fatal(err)
}
waitCtx, waitCancel := context.WithTimeout(ctx, 2*time.Second)
serving, err := client.IsServing(waitCtx)
waitCancel()
if !serving {
t.Fatalf("containerd did not start within 2s: %v", err)
}
// After we restared containerd we write some messages to the log pipes, simulating shim writing stuff there.
// Then we make sure that these messages are available on the containerd log thus proving that the server reconnected to the log pipes
runtimeVersion := getRuntimeVersion()
logDirPath := getLogDirPath(runtimeVersion, id)
switch runtimeVersion {
case "v1":
writeToFile(t, filepath.Join(logDirPath, "shim.stdout.log"), fmt.Sprintf("%s writing to stdout\n", id))
writeToFile(t, filepath.Join(logDirPath, "shim.stderr.log"), fmt.Sprintf("%s writing to stderr\n", id))
case "v2":
writeToFile(t, filepath.Join(logDirPath, "log"), fmt.Sprintf("%s writing to log\n", id))
}
statusC, err := task.Wait(ctx)
if err != nil {
t.Fatal(err)
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
t.Fatal(err)
}
<-statusC
stdioContents, err := ioutil.ReadFile(ctrdStdioFilePath)
if err != nil {
t.Fatal(err)
}
switch runtimeVersion {
case "v1":
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stdout", id)) {
t.Fatal("containerd did not connect to the shim stdout pipe")
}
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to stderr", id)) {
t.Fatal("containerd did not connect to the shim stderr pipe")
}
case "v2":
if !strings.Contains(string(stdioContents), fmt.Sprintf("%s writing to log", id)) {
t.Fatal("containerd did not connect to the shim log pipe")
}
}
}
func writeToFile(t *testing.T, filePath, message string) {
writer, err := os.OpenFile(filePath, os.O_WRONLY, 0600)
if err != nil {
t.Fatal(err)
}
if _, err := writer.WriteString(message); err != nil {
t.Fatal(err)
}
if err := writer.Close(); err != nil {
t.Fatal(err)
}
}
func getLogDirPath(runtimeVersion, id string) string {
switch runtimeVersion {
case "v1":
return filepath.Join(defaultRoot, "io.containerd.runtime.v1.linux", testNamespace, id)
case "v2":
return filepath.Join(defaultState, "io.containerd.runtime.v2.task", testNamespace, id)
default:
panic(fmt.Errorf("Unsupported runtime version %s", runtimeVersion))
}
}
func getRuntimeVersion() string {
switch rt := os.Getenv("TEST_RUNTIME"); rt {
case "io.containerd.runc.v1":
return "v2"
default:
return "v1"
}
}
func TestContainerPTY(t *testing.T) {
t.Parallel()

View File

@ -21,6 +21,7 @@ package linux
import (
"context"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
@ -40,6 +41,7 @@ import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
"github.com/containerd/containerd/runtime/linux/runctypes"
"github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/linux/proc"
shim "github.com/containerd/containerd/runtime/v1/shim/v1"
runc "github.com/containerd/go-runc"
@ -341,6 +343,30 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
continue
}
logDirPath := filepath.Join(r.root, ns, id)
shimStdoutLog, err := v1.OpenShimStdoutLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
"logDirPath": logDirPath,
}).Error("opening shim stdout log pipe")
continue
}
go io.Copy(os.Stdout, shimStdoutLog)
shimStderrLog, err := v1.OpenShimStderrLog(ctx, logDirPath)
if err != nil {
log.G(ctx).WithError(err).WithFields(logrus.Fields{
"id": id,
"namespace": ns,
"logDirPath": logDirPath,
}).Error("opening shim stderr log pipe")
continue
}
go io.Copy(os.Stderr, shimStderrLog)
t, err := newTask(id, ns, pid, s, r.events, r.tasks, bundle)
if err != nil {
log.G(ctx).WithError(err).Error("loading task type")

38
runtime/v1/shim.go Normal file
View File

@ -0,0 +1,38 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"context"
"io"
"path/filepath"
"github.com/containerd/fifo"
"golang.org/x/sys/unix"
)
// OpenShimStdoutLog opens the shim log for reading
func OpenShimStdoutLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stdout.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
}
// OpenShimStderrLog opens the shim log
func OpenShimStderrLog(ctx context.Context, logDirPath string) (io.ReadWriteCloser, error) {
return fifo.OpenFifo(ctx, filepath.Join(logDirPath, "shim.stderr.log"), unix.O_RDWR|unix.O_CREAT|unix.O_NONBLOCK, 0700)
}

View File

@ -37,6 +37,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
v1 "github.com/containerd/containerd/runtime/v1"
"github.com/containerd/containerd/runtime/v1/shim"
shimapi "github.com/containerd/containerd/runtime/v1/shim/v1"
"github.com/containerd/containerd/sys"
@ -62,7 +63,24 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
defer f.Close()
cmd, err := newCommand(binary, daemonAddress, debug, config, f)
var stdoutLog io.ReadWriteCloser
var stderrLog io.ReadWriteCloser
if debug {
stdoutLog, err = v1.OpenShimStdoutLog(ctx, config.WorkDir)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create stdout log")
}
stderrLog, err = v1.OpenShimStderrLog(ctx, config.WorkDir)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to create stderr log")
}
go io.Copy(os.Stdout, stdoutLog)
go io.Copy(os.Stderr, stderrLog)
}
cmd, err := newCommand(binary, daemonAddress, debug, config, f, stdoutLog, stderrLog)
if err != nil {
return nil, nil, err
}
@ -77,6 +95,12 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
go func() {
cmd.Wait()
exitHandler()
if stdoutLog != nil {
stderrLog.Close()
}
if stdoutLog != nil {
stderrLog.Close()
}
}()
log.G(ctx).WithFields(logrus.Fields{
"pid": cmd.Process.Pid,
@ -104,7 +128,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa
}
}
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File) (*exec.Cmd, error) {
func newCommand(binary, daemonAddress string, debug bool, config shim.Config, socket *os.File, stdout, stderr io.Writer) (*exec.Cmd, error) {
selfExe, err := os.Executable()
if err != nil {
return nil, err
@ -137,10 +161,8 @@ func newCommand(binary, daemonAddress string, debug bool, config shim.Config, so
cmd.SysProcAttr = getSysProcAttr()
cmd.ExtraFiles = append(cmd.ExtraFiles, socket)
cmd.Env = append(os.Environ(), "GOMAXPROCS=2")
if debug {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
}
cmd.Stdout = stdout
cmd.Stderr = stderr
return cmd, nil
}