
Organizes the cri related server packages under pkg/cri Signed-off-by: Derek McGowan <derek@mcg.dev>
237 lines
6.2 KiB
Go
237 lines
6.2 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package io
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/containerd/containerd/cio"
|
|
"github.com/sirupsen/logrus"
|
|
|
|
cioutil "github.com/containerd/containerd/pkg/ioutil"
|
|
"github.com/containerd/containerd/pkg/util"
|
|
)
|
|
|
|
// streamKey generates a key for the stream.
|
|
func streamKey(id, name string, stream StreamType) string {
|
|
return strings.Join([]string{id, name, string(stream)}, "-")
|
|
}
|
|
|
|
// ContainerIO holds the container io.
|
|
type ContainerIO struct {
|
|
id string
|
|
|
|
fifos *cio.FIFOSet
|
|
*stdioPipes
|
|
|
|
stdoutGroup *cioutil.WriterGroup
|
|
stderrGroup *cioutil.WriterGroup
|
|
|
|
closer *wgCloser
|
|
}
|
|
|
|
var _ cio.IO = &ContainerIO{}
|
|
|
|
// ContainerIOOpts sets specific information to newly created ContainerIO.
|
|
type ContainerIOOpts func(*ContainerIO) error
|
|
|
|
// WithFIFOs specifies existing fifos for the container io.
|
|
func WithFIFOs(fifos *cio.FIFOSet) ContainerIOOpts {
|
|
return func(c *ContainerIO) error {
|
|
c.fifos = fifos
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// WithNewFIFOs creates new fifos for the container io.
|
|
func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
|
|
return func(c *ContainerIO) error {
|
|
fifos, err := newFifos(root, c.id, tty, stdin)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return WithFIFOs(fifos)(c)
|
|
}
|
|
}
|
|
|
|
// NewContainerIO creates container io.
|
|
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
|
|
c := &ContainerIO{
|
|
id: id,
|
|
stdoutGroup: cioutil.NewWriterGroup(),
|
|
stderrGroup: cioutil.NewWriterGroup(),
|
|
}
|
|
for _, opt := range opts {
|
|
if err := opt(c); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
if c.fifos == nil {
|
|
return nil, errors.New("fifos are not set")
|
|
}
|
|
// Create actual fifos.
|
|
stdio, closer, err := newStdioPipes(c.fifos)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.stdioPipes = stdio
|
|
c.closer = closer
|
|
return c, nil
|
|
}
|
|
|
|
// Config returns io config.
|
|
func (c *ContainerIO) Config() cio.Config {
|
|
return c.fifos.Config
|
|
}
|
|
|
|
// Pipe creates container fifos and pipe container output
|
|
// to output stream.
|
|
func (c *ContainerIO) Pipe() {
|
|
wg := c.closer.wg
|
|
if c.stdout != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
if _, err := io.Copy(c.stdoutGroup, c.stdout); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to pipe stdout of container %q", c.id)
|
|
}
|
|
c.stdout.Close()
|
|
c.stdoutGroup.Close()
|
|
wg.Done()
|
|
logrus.Infof("Finish piping stdout of container %q", c.id)
|
|
}()
|
|
}
|
|
|
|
if !c.fifos.Terminal && c.stderr != nil {
|
|
wg.Add(1)
|
|
go func() {
|
|
if _, err := io.Copy(c.stderrGroup, c.stderr); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to pipe stderr of container %q", c.id)
|
|
}
|
|
c.stderr.Close()
|
|
c.stderrGroup.Close()
|
|
wg.Done()
|
|
logrus.Infof("Finish piping stderr of container %q", c.id)
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Attach attaches container stdio.
|
|
// TODO(random-liu): Use pools.Copy in docker to reduce memory usage?
|
|
func (c *ContainerIO) Attach(opts AttachOptions) {
|
|
var wg sync.WaitGroup
|
|
key := util.GenerateID()
|
|
stdinKey := streamKey(c.id, "attach-"+key, Stdin)
|
|
stdoutKey := streamKey(c.id, "attach-"+key, Stdout)
|
|
stderrKey := streamKey(c.id, "attach-"+key, Stderr)
|
|
|
|
var stdinStreamRC io.ReadCloser
|
|
if c.stdin != nil && opts.Stdin != nil {
|
|
// Create a wrapper of stdin which could be closed. Note that the
|
|
// wrapper doesn't close the actual stdin, it only stops io.Copy.
|
|
// The actual stdin will be closed by stream server.
|
|
stdinStreamRC = cioutil.NewWrapReadCloser(opts.Stdin)
|
|
wg.Add(1)
|
|
go func() {
|
|
if _, err := io.Copy(c.stdin, stdinStreamRC); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to pipe stdin for container attach %q", c.id)
|
|
}
|
|
logrus.Infof("Attach stream %q closed", stdinKey)
|
|
if opts.StdinOnce && !opts.Tty {
|
|
// Due to kubectl requirements and current docker behavior, when (opts.StdinOnce &&
|
|
// opts.Tty) we have to close container stdin and keep stdout and stderr open until
|
|
// container stops.
|
|
c.stdin.Close()
|
|
// Also closes the containerd side.
|
|
if err := opts.CloseStdin(); err != nil {
|
|
logrus.WithError(err).Errorf("Failed to close stdin for container %q", c.id)
|
|
}
|
|
} else {
|
|
if opts.Stdout != nil {
|
|
c.stdoutGroup.Remove(stdoutKey)
|
|
}
|
|
if opts.Stderr != nil {
|
|
c.stderrGroup.Remove(stderrKey)
|
|
}
|
|
}
|
|
wg.Done()
|
|
}()
|
|
}
|
|
|
|
attachStream := func(key string, close <-chan struct{}) {
|
|
<-close
|
|
logrus.Infof("Attach stream %q closed", key)
|
|
// Make sure stdin gets closed.
|
|
if stdinStreamRC != nil {
|
|
stdinStreamRC.Close()
|
|
}
|
|
wg.Done()
|
|
}
|
|
|
|
if opts.Stdout != nil {
|
|
wg.Add(1)
|
|
wc, close := cioutil.NewWriteCloseInformer(opts.Stdout)
|
|
c.stdoutGroup.Add(stdoutKey, wc)
|
|
go attachStream(stdoutKey, close)
|
|
}
|
|
if !opts.Tty && opts.Stderr != nil {
|
|
wg.Add(1)
|
|
wc, close := cioutil.NewWriteCloseInformer(opts.Stderr)
|
|
c.stderrGroup.Add(stderrKey, wc)
|
|
go attachStream(stderrKey, close)
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
// AddOutput adds new write closers to the container stream, and returns existing
|
|
// write closers if there are any.
|
|
func (c *ContainerIO) AddOutput(name string, stdout, stderr io.WriteCloser) (io.WriteCloser, io.WriteCloser) {
|
|
var oldStdout, oldStderr io.WriteCloser
|
|
if stdout != nil {
|
|
key := streamKey(c.id, name, Stdout)
|
|
oldStdout = c.stdoutGroup.Get(key)
|
|
c.stdoutGroup.Add(key, stdout)
|
|
}
|
|
if stderr != nil {
|
|
key := streamKey(c.id, name, Stderr)
|
|
oldStderr = c.stderrGroup.Get(key)
|
|
c.stderrGroup.Add(key, stderr)
|
|
}
|
|
return oldStdout, oldStderr
|
|
}
|
|
|
|
// Cancel cancels container io.
|
|
func (c *ContainerIO) Cancel() {
|
|
c.closer.Cancel()
|
|
}
|
|
|
|
// Wait waits container io to finish.
|
|
func (c *ContainerIO) Wait() {
|
|
c.closer.Wait()
|
|
}
|
|
|
|
// Close closes all FIFOs.
|
|
func (c *ContainerIO) Close() error {
|
|
c.closer.Close()
|
|
if c.fifos != nil {
|
|
return c.fifos.Close()
|
|
}
|
|
return nil
|
|
}
|