Merge pull request #4195 from mxpv/binary-io

Binary IO fixes and tests
This commit is contained in:
Wei Fu 2020-04-19 10:26:44 +08:00 committed by GitHub
commit 5bfab78acd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 111 additions and 22 deletions

View File

@ -34,7 +34,6 @@ import (
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/stdio" "github.com/containerd/containerd/pkg/stdio"
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/sys" "github.com/containerd/containerd/sys"
"github.com/containerd/fifo" "github.com/containerd/fifo"
runc "github.com/containerd/go-runc" runc "github.com/containerd/go-runc"
@ -42,9 +41,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
const ( const binaryIOProcTermTimeout = 12 * time.Second // Give logger process solid 10 seconds for cleanup
shimLoggerTermTimeout = "io.containerd.timeout.shim.logger.shutdown"
)
var bufPool = sync.Pool{ var bufPool = sync.Pool{
New: func() interface{} { New: func() interface{} {
@ -249,11 +246,12 @@ func (c *countingWriteCloser) Close() error {
} }
// NewBinaryIO runs a custom binary process for pluggable shim logging // NewBinaryIO runs a custom binary process for pluggable shim logging
func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error) { func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (_ runc.IO, err error) {
ns, err := namespaces.NamespaceRequired(ctx) ns, err := namespaces.NamespaceRequired(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
var args []string var args []string
for k, vs := range uri.Query() { for k, vs := range uri.Query() {
args = append(args, k) args = append(args, k)
@ -262,20 +260,35 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error)
} }
} }
var closers []func() error
defer func() {
if err == nil {
return
}
result := multierror.Append(err)
for _, fn := range closers {
result = multierror.Append(result, fn())
}
err = multierror.Flatten(result)
}()
out, err := newPipe() out, err := newPipe()
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "failed to create stdout pipes")
} }
closers = append(closers, out.Close)
serr, err := newPipe() serr, err := newPipe()
if err != nil { if err != nil {
return nil, err return nil, errors.Wrap(err, "failed to create stderr pipes")
} }
closers = append(closers, serr.Close)
r, w, err := os.Pipe() r, w, err := os.Pipe()
if err != nil { if err != nil {
return nil, err return nil, err
} }
closers = append(closers, r.Close, w.Close)
cmd := exec.Command(uri.Path, args...) cmd := exec.Command(uri.Path, args...)
cmd.Env = append(cmd.Env, cmd.Env = append(cmd.Env,
@ -287,17 +300,21 @@ func NewBinaryIO(ctx context.Context, id string, uri *url.URL) (runc.IO, error)
// don't need to register this with the reaper or wait when // don't need to register this with the reaper or wait when
// running inside a shim // running inside a shim
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
return nil, err return nil, errors.Wrap(err, "failed to start binary process")
} }
closers = append(closers, func() error { return cmd.Process.Kill() })
// close our side of the pipe after start // close our side of the pipe after start
if err := w.Close(); err != nil { if err := w.Close(); err != nil {
return nil, err return nil, errors.Wrap(err, "failed to close write pipe after start")
} }
// wait for the logging binary to be ready // wait for the logging binary to be ready
b := make([]byte, 1) b := make([]byte, 1)
if _, err := r.Read(b); err != nil && err != io.EOF { if _, err := r.Read(b); err != nil && err != io.EOF {
return nil, err return nil, errors.Wrap(err, "failed to read from logging binary")
} }
return &binaryIO{ return &binaryIO{
cmd: cmd, cmd: cmd,
out: out, out: out,
@ -366,19 +383,13 @@ func (b *binaryIO) cancel() error {
done := make(chan error) done := make(chan error)
go func() { go func() {
err := b.cmd.Wait() done <- b.cmd.Wait()
if err != nil {
err = errors.Wrap(err, "failed to wait for shim logger process after SIGTERM")
}
done <- err
}() }()
termTimeout := timeout.Get(shimLoggerTermTimeout)
select { select {
case err := <-done: case err := <-done:
return err return err
case <-time.After(termTimeout): case <-time.After(binaryIOProcTermTimeout):
log.L.Warn("failed to wait for shim logger process to exit, killing") log.L.Warn("failed to wait for shim logger process to exit, killing")
err := b.cmd.Process.Kill() err := b.cmd.Process.Kill()
@ -428,9 +439,15 @@ type pipe struct {
} }
func (p *pipe) Close() error { func (p *pipe) Close() error {
err := p.w.Close() var result *multierror.Error
if rerr := p.r.Close(); err == nil {
err = rerr if err := p.w.Close(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to close write pipe"))
} }
return err
if err := p.r.Close(); err != nil {
result = multierror.Append(result, errors.Wrap(err, "failed to close read pipe"))
}
return multierror.Prefix(result.ErrorOrNil(), "pipe:")
} }

72
pkg/process/io_test.go Normal file
View File

@ -0,0 +1,72 @@
// +build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package process
import (
"context"
"io/ioutil"
"net/url"
"testing"
"github.com/containerd/containerd/namespaces"
)
func TestNewBinaryIO(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("binary:///bin/echo?test")
before := descriptorCount(t)
io, err := NewBinaryIO(ctx, "1", uri)
if err != nil {
t.Fatal(err)
}
err = io.Close()
if err != nil {
t.Fatal(err)
}
after := descriptorCount(t)
if before != after-1 { // one descriptor must be closed from shim logger side
t.Fatalf("some descriptors weren't closed (%d != %d)", before, after)
}
}
func TestNewBinaryIOCleanup(t *testing.T) {
ctx := namespaces.WithNamespace(context.Background(), "test")
uri, _ := url.Parse("binary:///not/existing")
before := descriptorCount(t)
_, err := NewBinaryIO(ctx, "2", uri)
if err == nil {
t.Fatal("error expected for invalid binary")
}
after := descriptorCount(t)
if before != after {
t.Fatalf("some descriptors weren't closed (%d != %d)", before, after)
}
}
func descriptorCount(t *testing.T) int {
t.Helper()
files, _ := ioutil.ReadDir("/proc/self/fd")
return len(files)
}