From 3557cffbbbd8edf88f0875adee952d2ffd9dcb61 Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Fri, 10 Nov 2017 18:34:59 +0000 Subject: [PATCH] Fix container exec Signed-off-by: Lantao Liu --- pkg/server/container_execsync.go | 49 ++++++---- pkg/server/io/exec_io.go | 152 +++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 16 deletions(-) create mode 100644 pkg/server/io/exec_io.go diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index b5fec799f..594147bb1 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -20,7 +20,6 @@ import ( "bytes" "fmt" "io" - "io/ioutil" "time" "github.com/containerd/containerd" @@ -32,6 +31,8 @@ import ( "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" "github.com/kubernetes-incubator/cri-containerd/pkg/util" ) @@ -41,8 +42,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync var stdout, stderr bytes.Buffer exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ cmd: r.GetCmd(), - stdout: &stdout, - stderr: &stderr, + stdout: cioutil.NewNopWriteCloser(&stdout), + stderr: cioutil.NewNopWriteCloser(&stderr), timeout: time.Duration(r.GetTimeout()) * time.Second, }) if err != nil { @@ -60,8 +61,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync type execOptions struct { cmd []string stdin io.Reader - stdout io.Writer - stderr io.Writer + stdout io.WriteCloser + stderr io.WriteCloser tty bool resize <-chan remotecommand.TerminalSize timeout time.Duration @@ -106,22 +107,23 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o pspec.Args = opts.cmd pspec.Terminal = opts.tty - if opts.stdin == nil { - opts.stdin = new(bytes.Buffer) - } if opts.stdout == nil { - opts.stdout = ioutil.Discard + opts.stdout = cio.NewDiscardLogger() } if opts.stderr == nil { - opts.stderr = ioutil.Discard + opts.stderr = cio.NewDiscardLogger() } execID := util.GenerateID() - process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( - opts.stdin, - opts.stdout, - opts.stderr, - opts.tty, - )) + glog.V(4).Infof("Generated exec id %q for container %q", execID, id) + rootDir := getContainerRootDir(c.config.RootDir, id) + var execIO *cio.ExecIO + process, err := task.Exec(ctx, execID, pspec, + func(id string) (containerd.IO, error) { + var err error + execIO, err = cio.NewExecIO(id, rootDir, opts.tty, opts.stdin != nil) + return execIO, err + }, + ) if err != nil { return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) } @@ -145,6 +147,17 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o } }) + attachDone := execIO.Attach(cio.AttachOptions{ + Stdin: opts.stdin, + Stdout: opts.stdout, + Stderr: opts.stderr, + Tty: opts.tty, + StdinOnce: true, + CloseStdin: func() error { + return process.CloseIO(ctx, containerd.WithStdinCloser) + }, + }) + var timeoutCh <-chan time.Time if opts.timeout == 0 { // Do not set timeout if it's 0. @@ -163,6 +176,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o exitRes := <-exitCh glog.V(2).Infof("Timeout received while waiting for exec process kill %q code %d and error %v", execID, exitRes.ExitCode(), exitRes.Error()) + <-attachDone + glog.V(4).Infof("Stream pipe for exec process %q done", execID) return nil, fmt.Errorf("timeout %v exceeded", opts.timeout) case exitRes := <-exitCh: code, _, err := exitRes.Result() @@ -170,6 +185,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o if err != nil { return nil, fmt.Errorf("failed while waiting for exec %q: %v", execID, err) } + <-attachDone + glog.V(4).Infof("Stream pipe for exec process %q done", execID) return &code, nil } } diff --git a/pkg/server/io/exec_io.go b/pkg/server/io/exec_io.go new file mode 100644 index 000000000..35e2dde8a --- /dev/null +++ b/pkg/server/io/exec_io.go @@ -0,0 +1,152 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io + +import ( + "io" + "os" + "sync" + + "github.com/containerd/containerd" + "github.com/golang/glog" + + cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil" +) + +// ExecIO holds the exec io. +type ExecIO struct { + id string + fifos *containerd.FIFOSet + *stdioPipes + closer *wgCloser +} + +var _ containerd.IO = &ExecIO{} + +// NewExecIO creates exec io. +func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { + fifos, err := newFifos(root, id, tty, stdin) + if err != nil { + return nil, err + } + stdio, closer, err := newStdioPipes(fifos) + if err != nil { + return nil, err + } + return &ExecIO{ + id: id, + fifos: fifos, + stdioPipes: stdio, + closer: closer, + }, nil +} + +// Config returns io config. +func (e *ExecIO) Config() containerd.IOConfig { + return containerd.IOConfig{ + Terminal: e.fifos.Terminal, + Stdin: e.fifos.In, + Stdout: e.fifos.Out, + Stderr: e.fifos.Err, + } +} + +// Attach attaches exec stdio. The logic is similar with container io attach. +func (e *ExecIO) Attach(opts AttachOptions) <-chan struct{} { + var wg sync.WaitGroup + var stdinStreamRC io.ReadCloser + if e.stdin != nil && opts.Stdin != nil { + stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin) + wg.Add(1) + go func() { + if _, err := io.Copy(e.stdin, stdinStreamRC); err != nil { + glog.Errorf("Failed to redirect stdin for container exec %q: %v", e.id, err) + } + glog.V(2).Infof("Container exec %q stdin closed", e.id) + if opts.StdinOnce && !opts.Tty { + e.stdin.Close() + if err := opts.CloseStdin(); err != nil { + glog.Errorf("Failed to close stdin for container exec %q: %v", e.id, err) + } + } else { + if e.stdout != nil { + e.stdout.Close() + } + if e.stderr != nil { + e.stderr.Close() + } + } + wg.Done() + }() + } + + attachOutput := func(t StreamType, stream io.WriteCloser, out io.ReadCloser) { + if _, err := io.Copy(stream, out); err != nil { + glog.Errorf("Failed to pipe %q for container exec %q: %v", t, e.id, err) + } + out.Close() + stream.Close() + if stdinStreamRC != nil { + stdinStreamRC.Close() + } + e.closer.wg.Done() + wg.Done() + glog.V(2).Infof("Finish piping %q of container exec %q", t, e.id) + } + + if opts.Stdout != nil { + wg.Add(1) + // Closer should wait for this routine to be over. + e.closer.wg.Add(1) + go attachOutput(Stdout, opts.Stdout, e.stdout) + } + + if !opts.Tty && opts.Stderr != nil { + wg.Add(1) + // Closer should wait for this routine to be over. + e.closer.wg.Add(1) + go attachOutput(Stderr, opts.Stderr, e.stderr) + } + + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + return done +} + +// Cancel cancels exec io. +func (e *ExecIO) Cancel() { + e.closer.Cancel() +} + +// Wait waits exec io to finish. +func (e *ExecIO) Wait() { + e.closer.Wait() +} + +// Close closes all FIFOs. +func (e *ExecIO) Close() error { + if e.closer != nil { + e.closer.Close() + } + if e.fifos != nil { + return os.RemoveAll(e.fifos.Dir) + } + return nil +}