Merge pull request #406 from Random-Liu/fix-attach-stdin
Keep stdin open instead of opening when use it.
This commit is contained in:
commit
b6de04772d
@ -20,8 +20,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// WriterGroup is a group of writers. Writer could be dynamically
|
// WriterGroup is a group of writers. Writer could be dynamically
|
||||||
@ -73,11 +71,7 @@ func (g *WriterGroup) Write(p []byte) (int, error) {
|
|||||||
defer g.mu.Unlock()
|
defer g.mu.Unlock()
|
||||||
for k, w := range g.writers {
|
for k, w := range g.writers {
|
||||||
n, err := w.Write(p)
|
n, err := w.Write(p)
|
||||||
if err != nil {
|
if err == nil && len(p) == n {
|
||||||
glog.Errorf("Writer %q write error: %v", k, err)
|
|
||||||
} else if len(p) != n {
|
|
||||||
glog.Errorf("Writer %q short write error", k)
|
|
||||||
} else {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// The writer is closed or in bad state, remove it.
|
// The writer is closed or in bad state, remove it.
|
||||||
|
@ -175,7 +175,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
containerIO, err := cio.NewContainerIO(id,
|
containerIO, err := cio.NewContainerIO(id,
|
||||||
cio.WithStdin(config.GetStdin()),
|
cio.WithStdinOpen(config.GetStdin()),
|
||||||
cio.WithTerminal(config.GetTty()),
|
cio.WithTerminal(config.GetTty()),
|
||||||
cio.WithRootDir(containerRootDir),
|
cio.WithRootDir(containerRootDir),
|
||||||
)
|
)
|
||||||
|
@ -75,17 +75,21 @@ func streamKey(id, name string, stream StreamType) string {
|
|||||||
|
|
||||||
// ContainerIO holds the container io.
|
// ContainerIO holds the container io.
|
||||||
type ContainerIO struct {
|
type ContainerIO struct {
|
||||||
|
id string
|
||||||
|
|
||||||
dir string
|
dir string
|
||||||
stdinPath string
|
stdinPath string
|
||||||
stdoutPath string
|
stdoutPath string
|
||||||
stderrPath string
|
stderrPath string
|
||||||
|
|
||||||
id string
|
// Configs for the io.
|
||||||
tty bool
|
tty bool
|
||||||
stdin bool
|
openStdin bool
|
||||||
|
root string
|
||||||
|
|
||||||
|
stdin io.WriteCloser
|
||||||
stdout *cioutil.WriterGroup
|
stdout *cioutil.WriterGroup
|
||||||
stderr *cioutil.WriterGroup
|
stderr *cioutil.WriterGroup
|
||||||
root string
|
|
||||||
|
|
||||||
closer *wgCloser
|
closer *wgCloser
|
||||||
}
|
}
|
||||||
@ -95,10 +99,10 @@ var _ containerd.IO = &ContainerIO{}
|
|||||||
// Opts sets specific information to newly created ContainerIO.
|
// Opts sets specific information to newly created ContainerIO.
|
||||||
type Opts func(*ContainerIO) error
|
type Opts func(*ContainerIO) error
|
||||||
|
|
||||||
// WithStdin enables stdin of the container io.
|
// WithStdinOpen enables stdin of the container io.
|
||||||
func WithStdin(stdin bool) Opts {
|
func WithStdinOpen(open bool) Opts {
|
||||||
return func(c *ContainerIO) error {
|
return func(c *ContainerIO) error {
|
||||||
c.stdin = stdin
|
c.openStdin = open
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -171,7 +175,7 @@ func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
|
|||||||
c.dir = fifos.Dir
|
c.dir = fifos.Dir
|
||||||
c.stdoutPath = fifos.Out
|
c.stdoutPath = fifos.Out
|
||||||
c.stderrPath = fifos.Err
|
c.stderrPath = fifos.Err
|
||||||
if c.stdin {
|
if c.openStdin {
|
||||||
c.stdinPath = fifos.In
|
c.stdinPath = fifos.In
|
||||||
}
|
}
|
||||||
return c, nil
|
return c, nil
|
||||||
@ -205,11 +209,11 @@ func (c *ContainerIO) Pipe() (err error) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if c.stdinPath != "" {
|
if c.stdinPath != "" {
|
||||||
// Just create the stdin, only open it when used.
|
|
||||||
if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
f.Close()
|
c.stdin = f
|
||||||
|
set = append(set, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
|
||||||
@ -276,29 +280,25 @@ func (c *ContainerIO) Attach(opts AttachOptions) error {
|
|||||||
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
|
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
|
||||||
|
|
||||||
var stdinRC io.ReadCloser
|
var stdinRC io.ReadCloser
|
||||||
if c.stdinPath != "" && opts.Stdin != nil {
|
if c.stdin != nil && opts.Stdin != nil {
|
||||||
f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Create a wrapper of stdin which could be closed. Note that the
|
// Create a wrapper of stdin which could be closed. Note that the
|
||||||
// wrapper doesn't close the actual stdin, it only stops io.Copy.
|
// wrapper doesn't close the actual stdin, it only stops io.Copy.
|
||||||
// The actual stdin will be closed by stream server.
|
// The actual stdin will be closed by stream server.
|
||||||
stdinRC = cioutil.NewWrapReadCloser(opts.Stdin)
|
stdinRC = cioutil.NewWrapReadCloser(opts.Stdin)
|
||||||
// Also increase wait group here, so that `closer.Wait` will
|
// Also increase wait group here, so that `closer.Wait` will
|
||||||
// also wait for this fifo to be closed.
|
// also wait for this fifo to be closed.
|
||||||
c.closer.wg.Add(1)
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(w io.WriteCloser) {
|
go func() {
|
||||||
if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe {
|
if _, err := io.Copy(c.stdin, stdinRC); err != nil {
|
||||||
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
|
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
|
||||||
}
|
}
|
||||||
w.Close()
|
|
||||||
glog.V(2).Infof("Attach stream %q closed", stdinKey)
|
glog.V(2).Infof("Attach stream %q closed", stdinKey)
|
||||||
if opts.StdinOnce && !opts.Tty {
|
if opts.StdinOnce && !opts.Tty {
|
||||||
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
|
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
|
||||||
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
|
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
|
||||||
// container stops.
|
// container stops.
|
||||||
|
c.stdin.Close()
|
||||||
|
// Also closes the containerd side.
|
||||||
if err := opts.CloseStdin(); err != nil {
|
if err := opts.CloseStdin(); err != nil {
|
||||||
glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
|
glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
|
||||||
}
|
}
|
||||||
@ -311,8 +311,7 @@ func (c *ContainerIO) Attach(opts AttachOptions) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
wg.Done()
|
wg.Done()
|
||||||
c.closer.wg.Done()
|
}()
|
||||||
}(f)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
attachStream := func(key string, close <-chan struct{}) {
|
attachStream := func(key string, close <-chan struct{}) {
|
||||||
|
@ -202,7 +202,7 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
|
|||||||
// cri-containerd got restarted just during that. In that case, we still
|
// cri-containerd got restarted just during that. In that case, we still
|
||||||
// treat the container as `CREATED`.
|
// treat the container as `CREATED`.
|
||||||
containerIO, err = cio.NewContainerIO(id,
|
containerIO, err = cio.NewContainerIO(id,
|
||||||
cio.WithStdin(meta.Config.GetStdin()),
|
cio.WithStdinOpen(meta.Config.GetStdin()),
|
||||||
cio.WithTerminal(meta.Config.GetTty()),
|
cio.WithTerminal(meta.Config.GetTty()),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user