Merge pull request #9965 from abel-von/streaming-io

cri: support io by streaming API
This commit is contained in:
Fu Wei
2024-05-07 14:22:12 +00:00
committed by GitHub
8 changed files with 364 additions and 38 deletions

View File

@@ -71,6 +71,10 @@ const (
// DefaultSandboxImage is the default image to use for sandboxes when empty or
// for default configurations.
DefaultSandboxImage = "registry.k8s.io/pause:3.9"
// IOTypeFifo is container io implemented by creating named pipe
IOTypeFifo = "fifo"
// IOTypeStreaming is container io implemented by connecting the streaming api to sandbox endpoint
IOTypeStreaming = "streaming"
)
// Runtime struct to contain the type(ID), engine, and root variables for a default runtime
@@ -116,6 +120,11 @@ type Runtime struct {
// shim - means use whatever Controller implementation provided by shim (e.g. use RemoteController).
// podsandbox - means use Controller implementation from sbserver podsandbox package.
Sandboxer string `toml:"sandboxer" json:"sandboxer"`
// IOType defines how containerd transfer the io streams of the container
// if it is not set, the named pipe will be created for the container
// we can also set it to "streaming" to create a stream by streaming api,
// and use it as a channel to transfer the io stream
IOType string `toml:"io_type" json:"io_type"`
}
// ContainerdConfig contains toml config related to containerd
@@ -527,6 +536,13 @@ func ValidateRuntimeConfig(ctx context.Context, c *RuntimeConfig) ([]deprecation
r.Sandboxer = string(ModePodSandbox)
c.ContainerdConfig.Runtimes[k] = r
}
if len(r.IOType) == 0 {
r.IOType = IOTypeFifo
}
if r.IOType != IOTypeStreaming && r.IOType != IOTypeFifo {
return warnings, errors.New("`io_type` can only be `streaming` or `named_pipe`")
}
}
// Validation for drain_exec_sync_io_timeout

View File

@@ -18,14 +18,15 @@ package io
import (
"errors"
"fmt"
"io"
"strings"
"sync"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
)
@@ -39,7 +40,7 @@ type ContainerIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
stdoutGroup *cioutil.WriterGroup
stderrGroup *cioutil.WriterGroup
@@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}
}
// WithStreams creates new streams for the container io.
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
if address == "" {
return fmt.Errorf("address can not be empty for io stream")
}
fifos, err := newStreams(address, c.id, tty, stdin)
if err != nil {
return err
}
return WithFIFOs(fifos)(c)
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
@@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
return nil, errors.New("fifos are not set")
}
// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
stdio, closer, err := newStdioStream(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.stdioStream = stdio
c.closer = closer
return c, nil
}

View File

@@ -20,36 +20,55 @@ import (
"io"
"sync"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
"github.com/containerd/log"
)
// ExecIO holds the exec io.
type ExecIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
closer *wgCloser
}
var _ cio.IO = &ExecIO{}
// NewExecIO creates exec io.
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
// NewFifoExecIO creates exec io by named pipes.
func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newFifos(root, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioPipes(fifos)
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioPipes: stdio,
closer: closer,
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}
// NewStreamExecIO creates exec io with streaming.
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newStreams(address, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

View File

@@ -18,14 +18,26 @@ package io
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
streamingapi "github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/streaming/proxy"
"github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/shim"
)
// AttachOptions specifies how to attach to a container.
@@ -88,19 +100,35 @@ func newFifos(root, id string, tty, stdin bool) (*cio.FIFOSet, error) {
return fifos, nil
}
type stdioPipes struct {
// newStreams init streams for io of container.
func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) {
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
if stdin {
streamID := id + "-stdin"
fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID)
}
stdoutStreamID := id + "-stdout"
fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID)
if !tty {
stderrStreamID := id + "-stderr"
fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID)
}
fifos.Terminal = tty
return fifos, nil
}
type stdioStream struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}
// newStdioPipes creates actual fifos for stdio.
func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
// newStdioStream creates actual streams or fifos for stdio.
func newStdioStream(fifos *cio.FIFOSet) (_ *stdioStream, _ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
p = &stdioPipes{}
p = &stdioStream{}
)
defer func() {
if err != nil {
@@ -112,27 +140,30 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
}()
if fifos.Stdin != "" {
if f, err = openPipe(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
in, err := openStdin(ctx, fifos.Stdin)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdin, %w", err)
}
p.stdin = f
set = append(set, f)
p.stdin = in
set = append(set, in)
}
if fifos.Stdout != "" {
if f, err = openPipe(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stdout)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdout, %w", err)
}
p.stdout = f
set = append(set, f)
p.stdout = out
set = append(set, out)
}
if fifos.Stderr != "" {
if f, err = openPipe(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stderr)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stderr, %w", err)
}
p.stderr = f
set = append(set, f)
p.stderr = out
set = append(set, out)
}
return p, &wgCloser{
@@ -142,3 +173,99 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
cancel: cancel,
}, nil
}
func openStdin(ctx context.Context, url string) (io.WriteCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openStdinStream(ctx, url)
}
func openStdinStream(ctx context.Context, url string) (io.WriteCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.WriteByteStream(ctx, stream), nil
}
func openOutput(ctx context.Context, url string) (io.ReadCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openOutputStream(ctx, url)
}
func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.ReadByteStream(ctx, stream), nil
}
func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("address url parse error: %v", err)
}
// The address returned from sandbox controller should be in the form like ttrpc+unix://<uds-path>
// or grpc+vsock://<cid>:<port>, we should get the protocol from the url first.
protocol, scheme, ok := strings.Cut(u.Scheme, "+")
if !ok {
return nil, fmt.Errorf("the scheme of sandbox address should be in " +
" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
}
if u.Path != "streaming" {
// TODO, support connect stream other than streaming api
return nil, fmt.Errorf("only <address>/streaming?id=xxx supported")
}
id := u.Query().Get("id")
if id == "" {
return nil, fmt.Errorf("no stream id in url queries")
}
realAddress := fmt.Sprintf("%s://%s/%s", scheme, u.Host, u.Path)
conn, err := shim.AnonReconnectDialer(realAddress, 100*time.Second)
if err != nil {
return nil, fmt.Errorf("failed to connect the stream %v", err)
}
var stream streamingapi.Stream
switch protocol {
case "ttrpc":
c := ttrpc.NewClient(conn)
streamCreator := proxy.NewStreamCreator(c)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, realAddress, gopts...)
if err != nil {
return nil, err
}
streamCreator := proxy.NewStreamCreator(conn)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
default:
return nil, fmt.Errorf("protocol not supported")
}
}

View File

@@ -247,8 +247,15 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
sandboxConfig.GetLogDirectory(), config.GetLogPath())
}
containerIO, err := cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
var containerIO *cio.ContainerIO
switch ociRuntime.IOType {
case criconfig.IOTypeStreaming:
containerIO, err = cio.NewContainerIO(id,
cio.WithStreams(sandbox.Endpoint.Address, config.GetTty(), config.GetStdin()))
default:
containerIO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
}
if err != nil {
return nil, fmt.Errorf("failed to create container io: %w", err)
}

View File

@@ -24,16 +24,17 @@ import (
"syscall"
"time"
containerd "github.com/containerd/containerd/v2/client"
containerdio "github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/oci"
"github.com/containerd/errdefs"
"github.com/containerd/log"
"k8s.io/client-go/tools/remotecommand"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
containerd "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/internal/cri/config"
cio "github.com/containerd/containerd/v2/internal/cri/io"
"github.com/containerd/containerd/v2/internal/cri/util"
containerdio "github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
)
@@ -159,10 +160,28 @@ func (c *criService) execInternal(ctx context.Context, container containerd.Cont
log.G(ctx).Debugf("Generated exec id %q for container %q", execID, id)
volatileRootDir := c.getVolatileContainerRootDir(id)
var execIO *cio.ExecIO
process, err := task.Exec(ctx, execID, pspec,
func(id string) (containerdio.IO, error) {
var err error
execIO, err = cio.NewExecIO(id, volatileRootDir, opts.tty, opts.stdin != nil)
cntr, err := c.containerStore.Get(container.ID())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %w", container.ID(), err)
}
sb, err := c.sandboxStore.Get(cntr.SandboxID)
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", cntr.SandboxID, err)
}
ociRuntime, err := c.config.GetSandboxRuntime(sb.Config, sb.Metadata.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}
switch ociRuntime.IOType {
case config.IOTypeStreaming:
execIO, err = cio.NewStreamExecIO(id, sb.Endpoint.Address, opts.tty, opts.stdin != nil)
default:
execIO, err = cio.NewFifoExecIO(id, volatileRootDir, opts.tty, opts.stdin != nil)
}
return execIO, err
},
)