Merge pull request #410 from Random-Liu/refactor-and-fix-streaming

Refactor and fix container streaming.
This commit is contained in:
Lantao Liu 2017-11-10 14:03:49 -08:00 committed by GitHub
commit e1015b8d91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 585 additions and 418 deletions

View File

@ -175,10 +175,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
} }
containerIO, err := cio.NewContainerIO(id, containerIO, err := cio.NewContainerIO(id,
cio.WithStdinOpen(config.GetStdin()), cio.WithNewFIFOs(containerRootDir, config.GetTty(), config.GetStdin()))
cio.WithTerminal(config.GetTty()),
cio.WithRootDir(containerRootDir),
)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create container io: %v", err) return nil, fmt.Errorf("failed to create container io: %v", err)
} }

View File

@ -20,7 +20,6 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
@ -32,6 +31,8 @@ import (
"k8s.io/client-go/tools/remotecommand" "k8s.io/client-go/tools/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
"github.com/kubernetes-incubator/cri-containerd/pkg/util" "github.com/kubernetes-incubator/cri-containerd/pkg/util"
) )
@ -41,8 +42,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
var stdout, stderr bytes.Buffer var stdout, stderr bytes.Buffer
exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{
cmd: r.GetCmd(), cmd: r.GetCmd(),
stdout: &stdout, stdout: cioutil.NewNopWriteCloser(&stdout),
stderr: &stderr, stderr: cioutil.NewNopWriteCloser(&stderr),
timeout: time.Duration(r.GetTimeout()) * time.Second, timeout: time.Duration(r.GetTimeout()) * time.Second,
}) })
if err != nil { if err != nil {
@ -60,8 +61,8 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync
type execOptions struct { type execOptions struct {
cmd []string cmd []string
stdin io.Reader stdin io.Reader
stdout io.Writer stdout io.WriteCloser
stderr io.Writer stderr io.WriteCloser
tty bool tty bool
resize <-chan remotecommand.TerminalSize resize <-chan remotecommand.TerminalSize
timeout time.Duration timeout time.Duration
@ -106,22 +107,23 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
pspec.Args = opts.cmd pspec.Args = opts.cmd
pspec.Terminal = opts.tty pspec.Terminal = opts.tty
if opts.stdin == nil {
opts.stdin = new(bytes.Buffer)
}
if opts.stdout == nil { if opts.stdout == nil {
opts.stdout = ioutil.Discard opts.stdout = cio.NewDiscardLogger()
} }
if opts.stderr == nil { if opts.stderr == nil {
opts.stderr = ioutil.Discard opts.stderr = cio.NewDiscardLogger()
} }
execID := util.GenerateID() execID := util.GenerateID()
process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( glog.V(4).Infof("Generated exec id %q for container %q", execID, id)
opts.stdin, rootDir := getContainerRootDir(c.config.RootDir, id)
opts.stdout, var execIO *cio.ExecIO
opts.stderr, process, err := task.Exec(ctx, execID, pspec,
opts.tty, func(id string) (containerd.IO, error) {
)) var err error
execIO, err = cio.NewExecIO(id, rootDir, opts.tty, opts.stdin != nil)
return execIO, err
},
)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) return nil, fmt.Errorf("failed to create exec %q: %v", execID, err)
} }
@ -145,6 +147,17 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
} }
}) })
attachDone := execIO.Attach(cio.AttachOptions{
Stdin: opts.stdin,
Stdout: opts.stdout,
Stderr: opts.stderr,
Tty: opts.tty,
StdinOnce: true,
CloseStdin: func() error {
return process.CloseIO(ctx, containerd.WithStdinCloser)
},
})
var timeoutCh <-chan time.Time var timeoutCh <-chan time.Time
if opts.timeout == 0 { if opts.timeout == 0 {
// Do not set timeout if it's 0. // Do not set timeout if it's 0.
@ -163,6 +176,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
exitRes := <-exitCh exitRes := <-exitCh
glog.V(2).Infof("Timeout received while waiting for exec process kill %q code %d and error %v", glog.V(2).Infof("Timeout received while waiting for exec process kill %q code %d and error %v",
execID, exitRes.ExitCode(), exitRes.Error()) execID, exitRes.ExitCode(), exitRes.Error())
<-attachDone
glog.V(4).Infof("Stream pipe for exec process %q done", execID)
return nil, fmt.Errorf("timeout %v exceeded", opts.timeout) return nil, fmt.Errorf("timeout %v exceeded", opts.timeout)
case exitRes := <-exitCh: case exitRes := <-exitCh:
code, _, err := exitRes.Result() code, _, err := exitRes.Result()
@ -170,6 +185,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
if err != nil { if err != nil {
return nil, fmt.Errorf("failed while waiting for exec %q: %v", execID, err) return nil, fmt.Errorf("failed while waiting for exec %q: %v", execID, err)
} }
<-attachDone
glog.V(4).Infof("Stream pipe for exec process %q done", execID)
return &code, nil return &code, nil
} }
} }

View File

@ -121,9 +121,7 @@ func (c *criContainerdService) startContainer(ctx context.Context,
if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil {
return nil, fmt.Errorf("failed to add container log: %v", err) return nil, fmt.Errorf("failed to add container log: %v", err)
} }
if err := cntr.IO.Pipe(); err != nil { cntr.IO.Pipe()
return nil, fmt.Errorf("failed to pipe container io: %v", err)
}
return cntr.IO, nil return cntr.IO, nil
} }

View File

@ -0,0 +1,245 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"errors"
"io"
"os"
"strings"
"sync"
"github.com/containerd/containerd"
"github.com/golang/glog"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
// streamKey generates a key for the stream.
func streamKey(id, name string, stream StreamType) string {
return strings.Join([]string{id, name, string(stream)}, "-")
}
// ContainerIO holds the container io.
type ContainerIO struct {
id string
fifos *containerd.FIFOSet
*stdioPipes
stdoutGroup *cioutil.WriterGroup
stderrGroup *cioutil.WriterGroup
closer *wgCloser
}
var _ containerd.IO = &ContainerIO{}
// ContainerIOOpts sets specific information to newly created ContainerIO.
type ContainerIOOpts func(*ContainerIO) error
// WithOutput adds output stream to the container io.
func WithOutput(name string, stdout, stderr io.WriteCloser) ContainerIOOpts {
return func(c *ContainerIO) error {
if stdout != nil {
if err := c.stdoutGroup.Add(streamKey(c.id, name, Stdout), stdout); err != nil {
return err
}
}
if stderr != nil {
if err := c.stderrGroup.Add(streamKey(c.id, name, Stderr), stderr); err != nil {
return err
}
}
return nil
}
}
// WithFIFOs specifies existing fifos for the container io.
func WithFIFOs(fifos *containerd.FIFOSet) ContainerIOOpts {
return func(c *ContainerIO) error {
c.fifos = fifos
return nil
}
}
// WithNewFIFOs creates new fifos for the container io.
func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
fifos, err := newFifos(root, c.id, tty, stdin)
if err != nil {
return err
}
return WithFIFOs(fifos)(c)
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
id: id,
stdoutGroup: cioutil.NewWriterGroup(),
stderrGroup: cioutil.NewWriterGroup(),
}
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
if c.fifos == nil {
return nil, errors.New("fifos are not set")
}
// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.closer = closer
return c, nil
}
// Config returns io config.
func (c *ContainerIO) Config() containerd.IOConfig {
return containerd.IOConfig{
Terminal: c.fifos.Terminal,
Stdin: c.fifos.In,
Stdout: c.fifos.Out,
Stderr: c.fifos.Err,
}
}
// Pipe creates container fifos and pipe container output
// to output stream.
func (c *ContainerIO) Pipe() {
wg := c.closer.wg
wg.Add(1)
go func() {
if _, err := io.Copy(c.stdoutGroup, c.stdout); err != nil {
glog.Errorf("Failed to pipe stdout of container %q: %v", c.id, err)
}
c.stdout.Close()
c.stdoutGroup.Close()
wg.Done()
glog.V(2).Infof("Finish piping stdout of container %q", c.id)
}()
if !c.fifos.Terminal {
wg.Add(1)
go func() {
if _, err := io.Copy(c.stderrGroup, c.stderr); err != nil {
glog.Errorf("Failed to pipe stderr of container %q: %v", c.id, err)
}
c.stderr.Close()
c.stderrGroup.Close()
wg.Done()
glog.V(2).Infof("Finish piping stderr of container %q", c.id)
}()
}
}
// Attach attaches container stdio.
// TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
func (c *ContainerIO) Attach(opts AttachOptions) error {
var wg sync.WaitGroup
key := util.GenerateID()
stdinKey := streamKey(c.id, "attach-"+key, Stdin)
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinStreamRC io.ReadCloser
if c.stdin != nil && opts.Stdin != nil {
// Create a wrapper of stdin which could be closed. Note that the
// wrapper doesn't close the actual stdin, it only stops io.Copy.
// The actual stdin will be closed by stream server.
stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin)
wg.Add(1)
go func() {
if _, err := io.Copy(c.stdin, stdinStreamRC); err != nil {
glog.Errorf("Failed to pipe stdin for container attach %q: %v", c.id, err)
}
glog.V(2).Infof("Attach stream %q closed", stdinKey)
if opts.StdinOnce && !opts.Tty {
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
// container stops.
c.stdin.Close()
// Also closes the containerd side.
if err := opts.CloseStdin(); err != nil {
glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
}
} else {
if opts.Stdout != nil {
c.stdoutGroup.Remove(stdoutKey)
}
if opts.Stderr != nil {
c.stderrGroup.Remove(stderrKey)
}
}
wg.Done()
}()
}
attachStream := func(key string, close <-chan struct{}) {
<-close
glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed.
if stdinStreamRC != nil {
stdinStreamRC.Close()
}
wg.Done()
}
if opts.Stdout != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
if err := c.stdoutGroup.Add(stdoutKey, wc); err != nil {
return err
}
go attachStream(stdoutKey, close)
}
if !opts.Tty && opts.Stderr != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
if err := c.stderrGroup.Add(stderrKey, wc); err != nil {
return err
}
go attachStream(stderrKey, close)
}
wg.Wait()
return nil
}
// Cancel cancels container io.
func (c *ContainerIO) Cancel() {
c.closer.Cancel()
}
// Wait waits container io to finish.
func (c *ContainerIO) Wait() {
c.closer.Wait()
}
// Close closes all FIFOs.
func (c *ContainerIO) Close() error {
c.closer.Close()
if c.fifos != nil {
return os.RemoveAll(c.fifos.Dir)
}
return nil
}

152
pkg/server/io/exec_io.go Normal file
View File

@ -0,0 +1,152 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"io"
"os"
"sync"
"github.com/containerd/containerd"
"github.com/golang/glog"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
)
// ExecIO holds the exec io.
type ExecIO struct {
id string
fifos *containerd.FIFOSet
*stdioPipes
closer *wgCloser
}
var _ containerd.IO = &ExecIO{}
// NewExecIO creates exec io.
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newFifos(root, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioPipes(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioPipes: stdio,
closer: closer,
}, nil
}
// Config returns io config.
func (e *ExecIO) Config() containerd.IOConfig {
return containerd.IOConfig{
Terminal: e.fifos.Terminal,
Stdin: e.fifos.In,
Stdout: e.fifos.Out,
Stderr: e.fifos.Err,
}
}
// Attach attaches exec stdio. The logic is similar with container io attach.
func (e *ExecIO) Attach(opts AttachOptions) <-chan struct{} {
var wg sync.WaitGroup
var stdinStreamRC io.ReadCloser
if e.stdin != nil && opts.Stdin != nil {
stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin)
wg.Add(1)
go func() {
if _, err := io.Copy(e.stdin, stdinStreamRC); err != nil {
glog.Errorf("Failed to redirect stdin for container exec %q: %v", e.id, err)
}
glog.V(2).Infof("Container exec %q stdin closed", e.id)
if opts.StdinOnce && !opts.Tty {
e.stdin.Close()
if err := opts.CloseStdin(); err != nil {
glog.Errorf("Failed to close stdin for container exec %q: %v", e.id, err)
}
} else {
if e.stdout != nil {
e.stdout.Close()
}
if e.stderr != nil {
e.stderr.Close()
}
}
wg.Done()
}()
}
attachOutput := func(t StreamType, stream io.WriteCloser, out io.ReadCloser) {
if _, err := io.Copy(stream, out); err != nil {
glog.Errorf("Failed to pipe %q for container exec %q: %v", t, e.id, err)
}
out.Close()
stream.Close()
if stdinStreamRC != nil {
stdinStreamRC.Close()
}
e.closer.wg.Done()
wg.Done()
glog.V(2).Infof("Finish piping %q of container exec %q", t, e.id)
}
if opts.Stdout != nil {
wg.Add(1)
// Closer should wait for this routine to be over.
e.closer.wg.Add(1)
go attachOutput(Stdout, opts.Stdout, e.stdout)
}
if !opts.Tty && opts.Stderr != nil {
wg.Add(1)
// Closer should wait for this routine to be over.
e.closer.wg.Add(1)
go attachOutput(Stderr, opts.Stderr, e.stderr)
}
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
return done
}
// Cancel cancels exec io.
func (e *ExecIO) Cancel() {
e.closer.Cancel()
}
// Wait waits exec io to finish.
func (e *ExecIO) Wait() {
e.closer.Wait()
}
// Close closes all FIFOs.
func (e *ExecIO) Close() error {
if e.closer != nil {
e.closer.Close()
}
if e.fifos != nil {
return os.RemoveAll(e.fifos.Dir)
}
return nil
}

148
pkg/server/io/helpers.go Normal file
View File

@ -0,0 +1,148 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"sync"
"syscall"
"github.com/containerd/containerd"
"github.com/containerd/fifo"
"golang.org/x/net/context"
)
// AttachOptions specifies how to attach to a container.
type AttachOptions struct {
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool
StdinOnce bool
// CloseStdin is the function to close container stdin.
CloseStdin func() error
}
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdin stream type.
Stdin StreamType = "stdin"
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
type wgCloser struct {
ctx context.Context
wg *sync.WaitGroup
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() {
for _, f := range g.set {
f.Close()
}
}
func (g *wgCloser) Cancel() {
g.cancel()
}
// newFifos creates fifos directory for a container.
func newFifos(root, id string, tty, stdin bool) (*containerd.FIFOSet, error) {
root = filepath.Join(root, "io")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
dir, err := ioutil.TempDir(root, "")
if err != nil {
return nil, err
}
fifos := &containerd.FIFOSet{
Dir: dir,
In: filepath.Join(dir, id+"-stdin"),
Out: filepath.Join(dir, id+"-stdout"),
Err: filepath.Join(dir, id+"-stderr"),
Terminal: tty,
}
if !stdin {
fifos.In = ""
}
return fifos, nil
}
type stdioPipes struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}
// newStdioPipes creates actual fifos for stdio.
func newStdioPipes(fifos *containerd.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
p = &stdioPipes{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if fifos.In != "" {
if f, err = fifo.OpenFifo(ctx, fifos.In, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
}
p.stdin = f
set = append(set, f)
}
if f, err = fifo.OpenFifo(ctx, fifos.Out, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
}
p.stdout = f
set = append(set, f)
if f, err = fifo.OpenFifo(ctx, fifos.Err, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
}
p.stderr = f
set = append(set, f)
return p, &wgCloser{
wg: &sync.WaitGroup{},
set: set,
ctx: ctx,
cancel: cancel,
}, nil
}

View File

@ -1,384 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package agents
import (
"errors"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"github.com/containerd/containerd"
"github.com/containerd/fifo"
"github.com/golang/glog"
"golang.org/x/net/context"
cioutil "github.com/kubernetes-incubator/cri-containerd/pkg/ioutil"
"github.com/kubernetes-incubator/cri-containerd/pkg/util"
)
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdin stream type.
Stdin StreamType = "stdin"
// Stdout stream type.
Stdout StreamType = "stdout"
// Stderr stream type.
Stderr StreamType = "stderr"
)
type wgCloser struct {
ctx context.Context
wg *sync.WaitGroup
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() {
for _, f := range g.set {
f.Close()
}
}
func (g *wgCloser) Cancel() {
g.cancel()
}
// streamKey generates a key for the stream.
func streamKey(id, name string, stream StreamType) string {
return strings.Join([]string{id, name, string(stream)}, "-")
}
// ContainerIO holds the container io.
type ContainerIO struct {
id string
dir string
stdinPath string
stdoutPath string
stderrPath string
// Configs for the io.
tty bool
openStdin bool
root string
stdin io.WriteCloser
stdout *cioutil.WriterGroup
stderr *cioutil.WriterGroup
closer *wgCloser
}
var _ containerd.IO = &ContainerIO{}
// Opts sets specific information to newly created ContainerIO.
type Opts func(*ContainerIO) error
// WithStdinOpen enables stdin of the container io.
func WithStdinOpen(open bool) Opts {
return func(c *ContainerIO) error {
c.openStdin = open
return nil
}
}
// WithOutput adds output stream to the container io.
func WithOutput(name string, stdout, stderr io.WriteCloser) Opts {
return func(c *ContainerIO) error {
if stdout != nil {
if err := c.stdout.Add(streamKey(c.id, name, Stdout), stdout); err != nil {
return err
}
}
if stderr != nil {
if err := c.stderr.Add(streamKey(c.id, name, Stderr), stderr); err != nil {
return err
}
}
return nil
}
}
// WithTerminal enables tty of the container io.
func WithTerminal(tty bool) Opts {
return func(c *ContainerIO) error {
c.tty = tty
return nil
}
}
// WithRootDir sets the root directory to create container streams.
func WithRootDir(root string) Opts {
return func(c *ContainerIO) error {
c.root = root
return nil
}
}
// WithFIFOs specifies existing fifos for the container io.
func WithFIFOs(dir, stdin, stdout, stderr string) Opts {
return func(c *ContainerIO) error {
c.dir = dir
c.stdinPath = stdin
c.stdoutPath = stdout
c.stderrPath = stderr
return nil
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
c := &ContainerIO{
id: id,
stdout: cioutil.NewWriterGroup(),
stderr: cioutil.NewWriterGroup(),
root: os.TempDir(),
}
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
if c.dir != "" {
// Return if fifos are already set.
return c, nil
}
fifos, err := newFifos(c.root, id)
if err != nil {
return nil, err
}
c.dir = fifos.Dir
c.stdoutPath = fifos.Out
c.stderrPath = fifos.Err
if c.openStdin {
c.stdinPath = fifos.In
}
return c, nil
}
// Config returns io config.
func (c *ContainerIO) Config() containerd.IOConfig {
return containerd.IOConfig{
Terminal: c.tty,
Stdin: c.stdinPath,
Stdout: c.stdoutPath,
Stderr: c.stderrPath,
}
}
// Pipe creates container fifos and pipe container output
// to output stream.
func (c *ContainerIO) Pipe() (err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
wg = &sync.WaitGroup{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if c.stdinPath != "" {
if f, err = fifo.OpenFifo(ctx, c.stdinPath, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
c.stdin = f
set = append(set, f)
}
if f, err = fifo.OpenFifo(ctx, c.stdoutPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stdout, r); err != nil {
glog.Errorf("Failed to redirect stdout of container %q: %v", c.id, err)
}
r.Close()
c.stdout.Close()
wg.Done()
glog.V(2).Infof("Finish piping stdout of container %q", c.id)
}(f)
if f, err = fifo.OpenFifo(ctx, c.stderrPath, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return err
}
set = append(set, f)
if !c.tty {
wg.Add(1)
go func(r io.ReadCloser) {
if _, err := io.Copy(c.stderr, r); err != nil {
glog.Errorf("Failed to redirect stderr of container %q: %v", c.id, err)
}
r.Close()
c.stderr.Close()
wg.Done()
glog.V(2).Infof("Finish piping stderr of container %q", c.id)
}(f)
}
c.closer = &wgCloser{
wg: wg,
set: set,
ctx: ctx,
cancel: cancel,
}
return nil
}
// AttachOptions specifies how to attach to a container.
type AttachOptions struct {
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool
StdinOnce bool
// CloseStdin is the function to close container stdin.
CloseStdin func() error
}
// Attach attaches container stdio.
// TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
func (c *ContainerIO) Attach(opts AttachOptions) error {
if c.closer == nil {
return errors.New("container io is not initialized")
}
var wg sync.WaitGroup
key := util.GenerateID()
stdinKey := streamKey(c.id, "attach-"+key, Stdin)
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
var stdinRC io.ReadCloser
if c.stdin != nil && opts.Stdin != nil {
// Create a wrapper of stdin which could be closed. Note that the
// wrapper doesn't close the actual stdin, it only stops io.Copy.
// The actual stdin will be closed by stream server.
stdinRC = cioutil.NewWrapReadCloser(opts.Stdin)
// Also increase wait group here, so that `closer.Wait` will
// also wait for this fifo to be closed.
wg.Add(1)
go func() {
if _, err := io.Copy(c.stdin, stdinRC); err != nil {
glog.Errorf("Failed to redirect stdin for container attach %q: %v", c.id, err)
}
glog.V(2).Infof("Attach stream %q closed", stdinKey)
if opts.StdinOnce && !opts.Tty {
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
// container stops.
c.stdin.Close()
// Also closes the containerd side.
if err := opts.CloseStdin(); err != nil {
glog.Errorf("Failed to close stdin for container %q: %v", c.id, err)
}
} else {
if opts.Stdout != nil {
c.stdout.Remove(stdoutKey)
}
if opts.Stderr != nil {
c.stderr.Remove(stderrKey)
}
}
wg.Done()
}()
}
attachStream := func(key string, close <-chan struct{}) {
<-close
glog.V(2).Infof("Attach stream %q closed", key)
// Make sure stdin gets closed.
if stdinRC != nil {
stdinRC.Close()
}
wg.Done()
}
if opts.Stdout != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
if err := c.stdout.Add(stdoutKey, wc); err != nil {
return err
}
go attachStream(stdoutKey, close)
}
if !opts.Tty && opts.Stderr != nil {
wg.Add(1)
wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
if err := c.stderr.Add(stderrKey, wc); err != nil {
return err
}
go attachStream(stderrKey, close)
}
wg.Wait()
return nil
}
// Cancel cancels container io.
func (c *ContainerIO) Cancel() {
c.closer.Cancel()
}
// Wait waits container io to finish.
func (c *ContainerIO) Wait() {
c.closer.Wait()
}
// Close closes all FIFOs.
func (c *ContainerIO) Close() error {
if c.closer != nil {
c.closer.Close()
}
if c.dir != "" {
return os.RemoveAll(c.dir)
}
return nil
}
// newFifos creates fifos directory for a container.
func newFifos(root, id string) (*containerd.FIFOSet, error) {
root = filepath.Join(root, "io")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
dir, err := ioutil.TempDir(root, "")
if err != nil {
return nil, err
}
return &containerd.FIFOSet{
Dir: dir,
In: filepath.Join(dir, id+"-stdin"),
Out: filepath.Join(dir, id+"-stdout"),
Err: filepath.Join(dir, id+"-stderr"),
}, nil
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package agents package io
import ( import (
"bufio" "bufio"

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License. limitations under the License.
*/ */
package agents package io
import ( import (
"bytes" "bytes"

View File

@ -160,16 +160,13 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
return nil, err return nil, err
} }
containerIO, err = cio.NewContainerIO(id, containerIO, err = cio.NewContainerIO(id,
cio.WithTerminal(fifos.Terminal), cio.WithFIFOs(fifos),
cio.WithFIFOs(fifos.Dir, fifos.In, fifos.Out, fifos.Err),
cio.WithOutput("log", stdoutWC, stderrWC), cio.WithOutput("log", stdoutWC, stderrWC),
) )
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := containerIO.Pipe(); err != nil { containerIO.Pipe()
return nil, err
}
return containerIO, nil return containerIO, nil
}) })
if err != nil && !errdefs.IsNotFound(err) { if err != nil && !errdefs.IsNotFound(err) {
@ -196,14 +193,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir
// to generate container status. // to generate container status.
switch status.State() { switch status.State() {
case runtime.ContainerState_CONTAINER_CREATED: case runtime.ContainerState_CONTAINER_CREATED:
// TODO(random-liu): Do not create fifos directory in NewContainerIO.
// container is in created state, create container io for it.
// NOTE: Another possibility is that we've tried to start the container, but // NOTE: Another possibility is that we've tried to start the container, but
// cri-containerd got restarted just during that. In that case, we still // cri-containerd got restarted just during that. In that case, we still
// treat the container as `CREATED`. // treat the container as `CREATED`.
containerIO, err = cio.NewContainerIO(id, containerIO, err = cio.NewContainerIO(id,
cio.WithStdinOpen(meta.Config.GetStdin()), cio.WithNewFIFOs(containerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
cio.WithTerminal(meta.Config.GetTty()),
) )
if err != nil { if err != nil {
return container, fmt.Errorf("failed to create container io: %v", err) return container, fmt.Errorf("failed to create container io: %v", err)