Various fixes for container streaming.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu 2017-11-06 05:19:20 +00:00
parent 8a1a2f2713
commit eec818e6ab
5 changed files with 167 additions and 31 deletions

54
pkg/ioutil/read_closer.go Normal file
View File

@ -0,0 +1,54 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"io"
"sync"
)
// writeCloseInformer wraps a reader with a close function.
type wrapReadCloser struct {
// TODO(random-liu): Evaluate whether the lock introduces
// performance regression.
sync.RWMutex
r io.Reader
closed bool
}
// NewWrapReadCloser creates a wrapReadCloser from a reader.
func NewWrapReadCloser(r io.Reader) io.ReadCloser {
return &wrapReadCloser{r: r}
}
// Read reads up to len(p) bytes into p.
func (w *wrapReadCloser) Read(p []byte) (int, error) {
w.RLock()
defer w.RUnlock()
if w.closed {
return 0, io.EOF
}
return w.r.Read(p)
}
// Close closes read closer.
func (w *wrapReadCloser) Close() error {
w.Lock()
defer w.Unlock()
w.closed = true
return nil
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ioutil
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/assert"
)
func TestWrapReadCloser(t *testing.T) {
buf := bytes.NewBufferString("abc")
rc := NewWrapReadCloser(buf)
dst := make([]byte, 1)
n, err := rc.Read(dst)
assert.Equal(t, 1, n)
assert.NoError(t, err)
assert.Equal(t, []byte("a"), dst)
n, err = rc.Read(dst)
assert.Equal(t, 1, n)
assert.NoError(t, err)
assert.Equal(t, []byte("b"), dst)
rc.Close()
n, err = rc.Read(dst)
assert.Equal(t, 0, n)
assert.Equal(t, io.EOF, err)
assert.Equal(t, []byte("b"), dst)
}

View File

@ -25,6 +25,8 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
) )
// Attach prepares a streaming endpoint to attach to a running container, and returns the address. // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
@ -71,15 +73,19 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s
} }
}) })
// TODO(random-liu): Figure out whether we need to support historical output. opts := cio.AttachOptions{
if err := cntr.IO.Attach(stdin, stdout, stderr); err != nil { Stdin: stdin,
return fmt.Errorf("failed to attach container: %v", err) Stdout: stdout,
Stderr: stderr,
Tty: tty,
StdinOnce: cntr.Config.StdinOnce,
CloseStdin: func() error {
return task.CloseIO(ctx, containerd.WithStdinCloser)
},
} }
// TODO(random-liu): Figure out whether we need to support historical output.
// Close stdin after first attach if StdinOnce is specified, otherwise stdin will if err := cntr.IO.Attach(opts); err != nil {
// be kept open until container exits. return fmt.Errorf("failed to attach container: %v", err)
if cntr.Config.StdinOnce {
task.CloseIO(ctx, containerd.WithStdinCloser)
} }
return nil return nil
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/opencontainers/runtime-tools/generate"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
@ -96,6 +97,11 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to load task: %v", err) return nil, fmt.Errorf("failed to load task: %v", err)
} }
if opts.tty {
g := generate.NewFromSpec(spec)
g.AddProcessEnv("TERM", "xterm")
spec = g.Spec()
}
pspec := spec.Process pspec := spec.Process
pspec.Args = opts.cmd pspec.Args = opts.cmd
pspec.Terminal = opts.tty pspec.Terminal = opts.tty
@ -125,12 +131,6 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
} }
}() }()
handleResizing(opts.resize, func(size remotecommand.TerminalSize) {
if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err)
}
})
exitCh, err := process.Wait(ctx) exitCh, err := process.Wait(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to wait for process %q: %v", execID, err) return nil, fmt.Errorf("failed to wait for process %q: %v", execID, err)
@ -139,6 +139,12 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
return nil, fmt.Errorf("failed to start exec %q: %v", execID, err) return nil, fmt.Errorf("failed to start exec %q: %v", execID, err)
} }
handleResizing(opts.resize, func(size remotecommand.TerminalSize) {
if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil {
glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err)
}
})
var timeoutCh <-chan time.Time var timeoutCh <-chan time.Time
if opts.timeout == 0 { if opts.timeout == 0 {
// Do not set timeout if it's 0. // Do not set timeout if it's 0.

View File

@ -252,8 +252,20 @@ func (c *ContainerIO) Pipe() (err error) {
return nil return nil
} }
// AttachOptions specifies how to attach to a container.
type AttachOptions struct {
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool
StdinOnce bool
// CloseStdin is the function to close container stdin.
CloseStdin func() error
}
// Attach attaches container stdio. // Attach attaches container stdio.
func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) error { // TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
func (c *ContainerIO) Attach(opts AttachOptions) error {
if c.closer == nil { if c.closer == nil {
return errors.New("container io is not initialized") return errors.New("container io is not initialized")
} }
@ -263,56 +275,67 @@ func (c *ContainerIO) Attach(stdin io.Reader, stdout, stderr io.WriteCloser) err
stdoutKey := streamKey(c.id, "attach-"+key, Stdout) stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr) stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinCloser io.Closer var stdinRC io.ReadCloser
if c.stdinPath != "" && stdin != nil { if c.stdinPath != "" && opts.Stdin != nil {
f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700) f, err := fifo.OpenFifo(c.closer.ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_NONBLOCK, 0700)
if err != nil { if err != nil {
return err return err
} }
// Create a wrapper of stdin which could be closed. Note that the
// wrapper doesn't close the actual stdin, it only stops io.Copy.
// The actual stdin will be closed by stream server.
stdinRC = cioutil.NewWrapReadCloser(opts.Stdin)
// Also increase wait group here, so that `closer.Wait` will // Also increase wait group here, so that `closer.Wait` will
// also wait for this fifo to be closed. // also wait for this fifo to be closed.
c.closer.wg.Add(1) c.closer.wg.Add(1)
wg.Add(1) wg.Add(1)
go func(w io.WriteCloser) { go func(w io.WriteCloser) {
if _, err := io.Copy(w, stdin); err != nil { if _, err := io.Copy(w, stdinRC); err != nil && err != io.ErrClosedPipe {
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err) glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
} }
w.Close() w.Close()
glog.V(2).Infof("Attach stream %q closed", stdinKey) glog.V(2).Infof("Attach stream %q closed", stdinKey)
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr if opts.StdinOnce && !opts.Tty {
if stdout != nil { // Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
c.stdout.Remove(stdoutKey) // opts.Tty) we have to close container stdin and keep stdout and stderr open until
} // container stops.
if stderr != nil { if err := opts.CloseStdin(); err != nil {
c.stderr.Remove(stderrKey) glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
}
} else {
if opts.Stdout != nil {
c.stdout.Remove(stdoutKey)
}
if opts.Stderr != nil {
c.stderr.Remove(stderrKey)
}
} }
wg.Done() wg.Done()
c.closer.wg.Done() c.closer.wg.Done()
}(f) }(f)
stdinCloser = f
} }
attachStream := func(key string, close <-chan struct{}) { attachStream := func(key string, close <-chan struct{}) {
<-close <-close
glog.V(2).Infof("Attach stream %q closed", key) glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed. // Make sure stdin gets closed.
if stdinCloser != nil { if stdinRC != nil {
stdinCloser.Close() stdinRC.Close()
} }
wg.Done() wg.Done()
} }
if stdout != nil { if opts.Stdout != nil {
wg.Add(1) wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(stdout) wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
if err := c.stdout.Add(stdoutKey, wc); err != nil { if err := c.stdout.Add(stdoutKey, wc); err != nil {
return err return err
} }
go attachStream(stdoutKey, close) go attachStream(stdoutKey, close)
} }
if !c.tty && stderr != nil { if opts.Tty && opts.Stderr != nil {
wg.Add(1) wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(stderr) wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
if err := c.stderr.Add(stderrKey, wc); err != nil { if err := c.stderr.Add(stderrKey, wc); err != nil {
return err return err
} }