Merge pull request #394 from Random-Liu/fix-container-streaming

Various fixes for container streaming.
This commit is contained in:
Lantao Liu 2017-11-06 14:09:30 -08:00 committed by GitHub
commit 68e74dc16a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 167 additions and 31 deletions

54
pkg/ioutil/read_closer.go Normal file
View File

@ -0,0 +1,54 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"io"
"sync"
)
// writeCloseInformer wraps a reader with a close function.
type wrapReadCloser struct {
// TODO(random-liu): Evaluate whether the lock introduces
// performance regression.
sync.RWMutex
r io.Reader
closed bool
}
// NewWrapReadCloser creates a wrapReadCloser from a reader.
func NewWrapReadCloser(r io.Reader) io.ReadCloser {
return &wrapReadCloser{r: r}
}
// Read reads up to len(p) bytes into p.
func (w *wrapReadCloser) Read(p []byte) (int, error) {
w.RLock()
defer w.RUnlock()
if w.closed {
return 0, io.EOF
}
return w.r.Read(p)
}
// Close closes read closer.
func (w *wrapReadCloser) Close() error {
w.Lock()
defer w.Unlock()
w.closed = true
return nil
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWrapReadCloser(t *testing.T) {
buf := bytes.NewBufferString("abc")
rc := NewWrapReadCloser(buf)
dst := make([]byte, 1)
n, err := rc.Read(dst)
assert.Equal(t, 1, n)
assert.NoError(t, err)
assert.Equal(t, []byte("a"), dst)
n, err = rc.Read(dst)
assert.Equal(t, 1, n)
assert.NoError(t, err)
assert.Equal(t, []byte("b"), dst)
rc.Close()
n, err = rc.Read(dst)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
assert.Equal(t, []byte("b"), dst)
}

View File

@ -25,6 +25,8 @@ import (
"golang.org/x/net/context"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
)
// Attach prepares a streaming endpoint to attach to a running container, and returns the address.
@ -64,15 +66,19 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s
}
})
// TODO(random-liu): Figure out whether we need to support historical output.
if err := cntr.IO.Attach(stdin, stdout, stderr); err != nil {
return fmt.Errorf("failed to attach container: %v", err)
opts := cio.AttachOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Tty: tty,
StdinOnce: cntr.Config.StdinOnce,
CloseStdin: func() error {
return task.CloseIO(ctx, containerd.WithStdinCloser)
},
}
// Close stdin after first attach if StdinOnce is specified, otherwise stdin will
// be kept open until container exits.
if cntr.Config.StdinOnce {
task.CloseIO(ctx, containerd.WithStdinCloser)
// TODO(random-liu): Figure out whether we need to support historical output.
if err := cntr.IO.Attach(opts); err != nil {
return fmt.Errorf("failed to attach container: %v", err)
}
return nil
}

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/golang/glog"
"github.com/opencontainers/runtime-tools/generate"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"k8s.io/client-go/tools/remotecommand"
@ -96,6 +97,11 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
if err != nil {
return nil, fmt.Errorf("failed to load task: %v", err)
}
if opts.tty {
g := generate.NewFromSpec(spec)
g.AddProcessEnv("TERM", "xterm")
spec = g.Spec()
}
pspec := spec.Process
pspec.Args = opts.cmd
pspec.Terminal = opts.tty
@ -125,12 +131,6 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
}
}()
handleResizing(opts.resize, func(size remotecommand.TerminalSize) {
if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err)
}
})
exitCh, err := process.Wait(ctx)
if err != nil {
return nil, fmt.Errorf("failed to wait for process %q: %v", execID, err)
@ -139,6 +139,12 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
return nil, fmt.Errorf("failed to start exec %q: %v", execID, err)
}
handleResizing(opts.resize, func(size remotecommand.TerminalSize) {
if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err)
}
})
var timeoutCh <-chan time.Time
if opts.timeout == 0 {
// Do not set timeout if it's 0.

View File

@ -252,8 +252,20 @@ func (c *ContainerIO) Pipe() (err error) {
return nil
}
// AttachOptions specifies how to attach to a container.
type AttachOptions struct {
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool
StdinOnce bool
// CloseStdin is the function to close container stdin.
CloseStdin func() error
}
// Attach attaches container stdio.
func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error {
// TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
func (c *ContainerIO) Attach(opts AttachOptions) error {
if c.closer == nil {
return errors.New("container io is not initialized")
}
@ -263,56 +275,67 @@ func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) err
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinCloser io.Closer
if c.stdinPath != "" && stdin != nil {
var stdinRC io.ReadCloser
if c.stdinPath != "" && opts.Stdin != nil {
f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700)
if err != nil {
return err
}
// Create a wrapper of stdin which could be closed. Note that the
// wrapper doesn't close the actual stdin, it only stops io.Copy.
// The actual stdin will be closed by stream server.
stdinRC = cioutil.NewWrapReadCloser(opts.Stdin)
// Also increase wait group here, so that `closer.Wait` will
// also wait for this fifo to be closed.
c.closer.wg.Add(1)
wg.Add(1)
go func(w io.WriteCloser) {
if _, err := io.Copy(w, stdin); err != nil {
if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe {
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
}
w.Close()
glog.V(2).Infof("Attach stream %q closed", stdinKey)
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if stdout != nil {
if opts.StdinOnce && !opts.Tty {
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
// container stops.
if err := opts.CloseStdin(); err != nil {
glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
}
} else {
if opts.Stdout != nil {
c.stdout.Remove(stdoutKey)
}
if stderr != nil {
if opts.Stderr != nil {
c.stderr.Remove(stderrKey)
}
}
wg.Done()
c.closer.wg.Done()
}(f)
stdinCloser = f
}
attachStream := func(key string, close <-chan struct{}) {
<-close
glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed.
if stdinCloser != nil {
stdinCloser.Close()
if stdinRC != nil {
stdinRC.Close()
}
wg.Done()
}
if stdout != nil {
if opts.Stdout != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(stdout)
wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
if err := c.stdout.Add(stdoutKey, wc); err != nil {
return err
}
go attachStream(stdoutKey, close)
}
if !c.tty && stderr != nil {
if opts.Tty && opts.Stderr != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(stderr)
wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
if err := c.stderr.Add(stderrKey, wc); err != nil {
return err
}