diff --git a/pkg/process/io.go b/pkg/process/io.go index 78f86047e..9b5503497 100644 --- a/pkg/process/io.go +++ b/pkg/process/io.go @@ -34,7 +34,6 @@ import ( "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/pkg/stdio" - "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/sys" "github.com/containerd/fifo" runc "github.com/containerd/go-runc" @@ -42,9 +41,7 @@ import ( "github.com/pkg/errors" ) -const ( - shimLoggerTermTimeout = "io.containerd.timeout.shim.logger.shutdown" -) +const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup var bufPool = sync.Pool{ New: func() interface{} { @@ -249,11 +246,12 @@ func (c *countingWriteCloser) Close() error { } // NewBinaryIO runs a custom binary process for pluggable shim logging -func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) { +func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) { ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err } + var args []string for k, vs := range uri.Query() { args = append(args, k) @@ -262,20 +260,35 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) } } + var closers []func() error + defer func() { + if err == nil { + return + } + result := multierror.Append(err) + for _, fn := range closers { + result = multierror.Append(result, fn()) + } + err = multierror.Flatten(result) + }() + out, err := newPipe() if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create stdout pipes") } + closers = append(closers, out.Close) serr, err := newPipe() if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to create stderr pipes") } + closers = append(closers, serr.Close) r, w, err := os.Pipe() if err != nil { return nil, err } + closers = append(closers, r.Close, w.Close) cmd := exec.Command(uri.Path, args...) cmd.Env = append(cmd.Env, @@ -287,17 +300,21 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) // don't need to register this with the reaper or wait when // running inside a shim if err := cmd.Start(); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to start binary process") } + closers = append(closers, func() error { return cmd.Process.Kill() }) + // close our side of the pipe after start if err := w.Close(); err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to close write pipe after start") } + // wait for the logging binary to be ready b := make([]byte, 1) if _, err := r.Read(b); err != nil && err != io.EOF { - return nil, err + return nil, errors.Wrap(err, "failed to read from logging binary") } + return &binaryIO{ cmd: cmd, out: out, @@ -366,19 +383,13 @@ func (b *binaryIO) cancel() error { done := make(chan error) go func() { - err := b.cmd.Wait() - if err != nil { - err = errors.Wrap(err, "failed to wait for shim logger process after SIGTERM") - } - done <- err + done <- b.cmd.Wait() }() - termTimeout := timeout.Get(shimLoggerTermTimeout) - select { case err := <-done: return err - case <-time.After(termTimeout): + case <-time.After(binaryIOProcTermTimeout): log.L.Warn("failed to wait for shim logger process to exit, killing") err := b.cmd.Process.Kill() @@ -428,9 +439,15 @@ type pipe struct { } func (p *pipe) Close() error { - err := p.w.Close() - if rerr := p.r.Close(); err == nil { - err = rerr + var result *multierror.Error + + if err := p.w.Close(); err != nil { + result = multierror.Append(result, errors.Wrap(err, "failed to close write pipe")) } - return err + + if err := p.r.Close(); err != nil { + result = multierror.Append(result, errors.Wrap(err, "failed to close read pipe")) + } + + return multierror.Prefix(result.ErrorOrNil(), "pipe:") } diff --git a/pkg/process/io_test.go b/pkg/process/io_test.go new file mode 100644 index 000000000..21b4144f5 --- /dev/null +++ b/pkg/process/io_test.go @@ -0,0 +1,72 @@ +// +build linux + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package process + +import ( + "context" + "io/ioutil" + "net/url" + "testing" + + "github.com/containerd/containerd/namespaces" +) + +func TestNewBinaryIO(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), "test") + uri, _ := url.Parse("binary:///bin/echo?test") + + before := descriptorCount(t) + + io, err := NewBinaryIO(ctx, "1", uri) + if err != nil { + t.Fatal(err) + } + + err = io.Close() + if err != nil { + t.Fatal(err) + } + + after := descriptorCount(t) + if before != after-1 { // one descriptor must be closed from shim logger side + t.Fatalf("some descriptors weren't closed (%d != %d)", before, after) + } +} + +func TestNewBinaryIOCleanup(t *testing.T) { + ctx := namespaces.WithNamespace(context.Background(), "test") + uri, _ := url.Parse("binary:///not/existing") + + before := descriptorCount(t) + _, err := NewBinaryIO(ctx, "2", uri) + if err == nil { + t.Fatal("error expected for invalid binary") + } + + after := descriptorCount(t) + if before != after { + t.Fatalf("some descriptors weren't closed (%d != %d)", before, after) + } +} + +func descriptorCount(t *testing.T) int { + t.Helper() + files, _ := ioutil.ReadDir("/proc/self/fd") + return len(files) +}