diff --git a/pkg/ioutil/read_closer.go b/pkg/ioutil/read_closer.go new file mode 100644 index 000000000..39d23be9e --- /dev/null +++ b/pkg/ioutil/read_closer.go @@ -0,0 +1,54 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "io" + "sync" +) + +// writeCloseInformer wraps a reader with a close function. +type wrapReadCloser struct { + // TODO(random-liu): Evaluate whether the lock introduces + // performance regression. + sync.RWMutex + r io.Reader + closed bool +} + +// NewWrapReadCloser creates a wrapReadCloser from a reader. +func NewWrapReadCloser(r io.Reader) io.ReadCloser { + return &wrapReadCloser{r: r} +} + +// Read reads up to len(p) bytes into p. +func (w *wrapReadCloser) Read(p []byte) (int, error) { + w.RLock() + defer w.RUnlock() + if w.closed { + return 0, io.EOF + } + return w.r.Read(p) +} + +// Close closes read closer. +func (w *wrapReadCloser) Close() error { + w.Lock() + defer w.Unlock() + w.closed = true + return nil +} diff --git a/pkg/ioutil/read_closer_test.go b/pkg/ioutil/read_closer_test.go new file mode 100644 index 000000000..7318ea16f --- /dev/null +++ b/pkg/ioutil/read_closer_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ioutil + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestWrapReadCloser(t *testing.T) { + buf := bytes.NewBufferString("abc") + + rc := NewWrapReadCloser(buf) + dst := make([]byte, 1) + n, err := rc.Read(dst) + assert.Equal(t, 1, n) + assert.NoError(t, err) + assert.Equal(t, []byte("a"), dst) + + n, err = rc.Read(dst) + assert.Equal(t, 1, n) + assert.NoError(t, err) + assert.Equal(t, []byte("b"), dst) + + rc.Close() + n, err = rc.Read(dst) + assert.Equal(t, 0, n) + assert.Equal(t, io.EOF, err) + assert.Equal(t, []byte("b"), dst) +} diff --git a/pkg/server/container_attach.go b/pkg/server/container_attach.go index 2e3825624..7cbf71f63 100644 --- a/pkg/server/container_attach.go +++ b/pkg/server/container_attach.go @@ -25,6 +25,8 @@ import ( "golang.org/x/net/context" "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" ) // Attach prepares a streaming endpoint to attach to a running container, and returns the address. @@ -71,15 +73,19 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s } }) - // TODO(random-liu): Figure out whether we need to support historical output. - if err := cntr.IO.Attach(stdin, stdout, stderr); err != nil { - return fmt.Errorf("failed to attach container: %v", err) + opts := cio.AttachOptions{ + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Tty: tty, + StdinOnce: cntr.Config.StdinOnce, + CloseStdin: func() error { + return task.CloseIO(ctx, containerd.WithStdinCloser) + }, } - - // Close stdin after first attach if StdinOnce is specified, otherwise stdin will - // be kept open until container exits. - if cntr.Config.StdinOnce { - task.CloseIO(ctx, containerd.WithStdinCloser) + // TODO(random-liu): Figure out whether we need to support historical output. + if err := cntr.IO.Attach(opts); err != nil { + return fmt.Errorf("failed to attach container: %v", err) } return nil } diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 732889ca2..b5fec799f 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" "github.com/golang/glog" + "github.com/opencontainers/runtime-tools/generate" "golang.org/x/net/context" "golang.org/x/sys/unix" "k8s.io/client-go/tools/remotecommand" @@ -96,6 +97,11 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o if err != nil { return nil, fmt.Errorf("failed to load task: %v", err) } + if opts.tty { + g := generate.NewFromSpec(spec) + g.AddProcessEnv("TERM", "xterm") + spec = g.Spec() + } pspec := spec.Process pspec.Args = opts.cmd pspec.Terminal = opts.tty @@ -125,12 +131,6 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o } }() - handleResizing(opts.resize, func(size remotecommand.TerminalSize) { - if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil { - glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err) - } - }) - exitCh, err := process.Wait(ctx) if err != nil { return nil, fmt.Errorf("failed to wait for process %q: %v", execID, err) @@ -139,6 +139,12 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o return nil, fmt.Errorf("failed to start exec %q: %v", execID, err) } + handleResizing(opts.resize, func(size remotecommand.TerminalSize) { + if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil { + glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err) + } + }) + var timeoutCh <-chan time.Time if opts.timeout == 0 { // Do not set timeout if it's 0. diff --git a/pkg/server/io/io.go b/pkg/server/io/io.go index 42f66606a..ba2af8039 100644 --- a/pkg/server/io/io.go +++ b/pkg/server/io/io.go @@ -252,8 +252,20 @@ func (c *ContainerIO) Pipe() (err error) { return nil } +// AttachOptions specifies how to attach to a container. +type AttachOptions struct { + Stdin io.Reader + Stdout io.WriteCloser + Stderr io.WriteCloser + Tty bool + StdinOnce bool + // CloseStdin is the function to close container stdin. + CloseStdin func() error +} + // Attach attaches container stdio. -func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error { +// TODO(random-liu): Use pools.Copy in docker to reduce memory usage? +func (c *ContainerIO) Attach(opts AttachOptions) error { if c.closer == nil { return errors.New("container io is not initialized") } @@ -263,56 +275,67 @@ func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) err stdoutKey := streamKey(c.id, "attach-"+key, Stdout) stderrKey := streamKey(c.id, "attach-"+key, Stderr) - var stdinCloser io.Closer - if c.stdinPath != "" && stdin != nil { + var stdinRC io.ReadCloser + if c.stdinPath != "" && opts.Stdin != nil { f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700) if err != nil { return err } + // Create a wrapper of stdin which could be closed. Note that the + // wrapper doesn't close the actual stdin, it only stops io.Copy. + // The actual stdin will be closed by stream server. + stdinRC = cioutil.NewWrapReadCloser(opts.Stdin) // Also increase wait group here, so that `closer.Wait` will // also wait for this fifo to be closed. c.closer.wg.Add(1) wg.Add(1) go func(w io.WriteCloser) { - if _, err := io.Copy(w, stdin); err != nil { + if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe { glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) } w.Close() glog.V(2).Infof("Attach stream %q closed", stdinKey) - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr - if stdout != nil { - c.stdout.Remove(stdoutKey) - } - if stderr != nil { - c.stderr.Remove(stderrKey) + if opts.StdinOnce && !opts.Tty { + // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce && + // opts.Tty) we have to close container stdin and keep stdout and stderr open until + // container stops. + if err := opts.CloseStdin(); err != nil { + glog.Errorf("Failed to close stdin for container %q: %v", c.id, err) + } + } else { + if opts.Stdout != nil { + c.stdout.Remove(stdoutKey) + } + if opts.Stderr != nil { + c.stderr.Remove(stderrKey) + } } wg.Done() c.closer.wg.Done() }(f) - stdinCloser = f } attachStream := func(key string, close <-chan struct{}) { <-close glog.V(2).Infof("Attach stream %q closed", key) // Make sure stdin gets closed. - if stdinCloser != nil { - stdinCloser.Close() + if stdinRC != nil { + stdinRC.Close() } wg.Done() } - if stdout != nil { + if opts.Stdout != nil { wg.Add(1) - wc, close := cioutil.NewWriteCloseInformer(stdout) + wc, close := cioutil.NewWriteCloseInformer(opts.Stdout) if err := c.stdout.Add(stdoutKey, wc); err != nil { return err } go attachStream(stdoutKey, close) } - if !c.tty && stderr != nil { + if opts.Tty && opts.Stderr != nil { wg.Add(1) - wc, close := cioutil.NewWriteCloseInformer(stderr) + wc, close := cioutil.NewWriteCloseInformer(opts.Stderr) if err := c.stderr.Add(stderrKey, wc); err != nil { return err }