diff --git a/pkg/ioutil/writer_group.go b/pkg/ioutil/writer_group.go index 603ecc7e5..7c3dde5cb 100644 --- a/pkg/ioutil/writer_group.go +++ b/pkg/ioutil/writer_group.go @@ -20,8 +20,6 @@ import ( "errors" "io" "sync" - - "github.com/golang/glog" ) // WriterGroup is a group of writers. Writer could be dynamically @@ -73,11 +71,7 @@ func (g *WriterGroup) Write(p []byte) (int, error) { defer g.mu.Unlock() for k, w := range g.writers { n, err := w.Write(p) - if err != nil { - glog.Errorf("Writer %q write error: %v", k, err) - } else if len(p) != n { - glog.Errorf("Writer %q short write error", k) - } else { + if err == nil && len(p) == n { continue } // The writer is closed or in bad state, remove it. diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 8b5b4bcc7..dd0e05259 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -175,7 +175,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C } containerIO, err := cio.NewContainerIO(id, - cio.WithStdin(config.GetStdin()), + cio.WithStdinOpen(config.GetStdin()), cio.WithTerminal(config.GetTty()), cio.WithRootDir(containerRootDir), ) diff --git a/pkg/server/io/io.go b/pkg/server/io/io.go index 259ae066d..0fab2aca0 100644 --- a/pkg/server/io/io.go +++ b/pkg/server/io/io.go @@ -75,17 +75,21 @@ func streamKey(id, name string, stream StreamType) string { // ContainerIO holds the container io. type ContainerIO struct { + id string + dir string stdinPath string stdoutPath string stderrPath string - id string - tty bool - stdin bool + // Configs for the io. + tty bool + openStdin bool + root string + + stdin io.WriteCloser stdout *cioutil.WriterGroup stderr *cioutil.WriterGroup - root string closer *wgCloser } @@ -95,10 +99,10 @@ var _ containerd.IO = &ContainerIO{} // Opts sets specific information to newly created ContainerIO. type Opts func(*ContainerIO) error -// WithStdin enables stdin of the container io. -func WithStdin(stdin bool) Opts { +// WithStdinOpen enables stdin of the container io. +func WithStdinOpen(open bool) Opts { return func(c *ContainerIO) error { - c.stdin = stdin + c.openStdin = open return nil } } @@ -171,7 +175,7 @@ func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) { c.dir = fifos.Dir c.stdoutPath = fifos.Out c.stderrPath = fifos.Err - if c.stdin { + if c.openStdin { c.stdinPath = fifos.In } return c, nil @@ -205,11 +209,11 @@ func (c *ContainerIO) Pipe() (err error) { } }() if c.stdinPath != "" { - // Just create the stdin, only open it when used. if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { return err } - f.Close() + c.stdin = f + set = append(set, f) } if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil { @@ -276,29 +280,25 @@ func (c *ContainerIO) Attach(opts AttachOptions) error { stderrKey := streamKey(c.id, "attach-"+key, Stderr) var stdinRC io.ReadCloser - if c.stdinPath != "" && opts.Stdin != nil { - f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700) - if err != nil { - return err - } + if c.stdin != nil && opts.Stdin != nil { // Create a wrapper of stdin which could be closed. Note that the // wrapper doesn't close the actual stdin, it only stops io.Copy. // The actual stdin will be closed by stream server. stdinRC = cioutil.NewWrapReadCloser(opts.Stdin) // Also increase wait group here, so that `closer.Wait` will // also wait for this fifo to be closed. - c.closer.wg.Add(1) wg.Add(1) - go func(w io.WriteCloser) { - if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe { + go func() { + if _, err := io.Copy(c.stdin, stdinRC); err != nil { glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) } - w.Close() glog.V(2).Infof("Attach stream %q closed", stdinKey) if opts.StdinOnce && !opts.Tty { // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce && // opts.Tty) we have to close container stdin and keep stdout and stderr open until // container stops. + c.stdin.Close() + // Also closes the containerd side. if err := opts.CloseStdin(); err != nil { glog.Errorf("Failed to close stdin for container %q: %v", c.id, err) } @@ -311,8 +311,7 @@ func (c *ContainerIO) Attach(opts AttachOptions) error { } } wg.Done() - c.closer.wg.Done() - }(f) + }() } attachStream := func(key string, close <-chan struct{}) { diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 4102b753e..80f56f31d 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -202,7 +202,7 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir // cri-containerd got restarted just during that. In that case, we still // treat the container as `CREATED`. containerIO, err = cio.NewContainerIO(id, - cio.WithStdin(meta.Config.GetStdin()), + cio.WithStdinOpen(meta.Config.GetStdin()), cio.WithTerminal(meta.Config.GetTty()), ) if err != nil {