cri: support io by streaming api

Signed-off-by: Abel Feng <fshb1988@gmail.com>
This commit is contained in:
Abel Feng
2024-03-13 09:37:19 +08:00
parent a26c686ea2
commit b8dfb4d8f5
8 changed files with 364 additions and 38 deletions

View File

@@ -18,14 +18,15 @@ package io
import (
"errors"
"fmt"
"io"
"strings"
"sync"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/internal/cri/util"
"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
)
@@ -39,7 +40,7 @@ type ContainerIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
stdoutGroup *cioutil.WriterGroup
stderrGroup *cioutil.WriterGroup
@@ -71,6 +72,20 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}
}
// WithStreams creates new streams for the container io.
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
if address == "" {
return fmt.Errorf("address can not be empty for io stream")
}
fifos, err := newStreams(address, c.id, tty, stdin)
if err != nil {
return err
}
return WithFIFOs(fifos)(c)
}
}
// NewContainerIO creates container io.
func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err error) {
c := &ContainerIO{
@@ -87,11 +102,11 @@ func NewContainerIO(id string, opts ...ContainerIOOpts) (_ *ContainerIO, err err
return nil, errors.New("fifos are not set")
}
// Create actual fifos.
stdio, closer, err := newStdioPipes(c.fifos)
stdio, closer, err := newStdioStream(c.fifos)
if err != nil {
return nil, err
}
c.stdioPipes = stdio
c.stdioStream = stdio
c.closer = closer
return c, nil
}

View File

@@ -20,36 +20,55 @@ import (
"io"
"sync"
"github.com/containerd/log"
"github.com/containerd/containerd/v2/pkg/cio"
cioutil "github.com/containerd/containerd/v2/pkg/ioutil"
"github.com/containerd/log"
)
// ExecIO holds the exec io.
type ExecIO struct {
id string
fifos *cio.FIFOSet
*stdioPipes
*stdioStream
closer *wgCloser
}
var _ cio.IO = &ExecIO{}
// NewExecIO creates exec io.
func NewExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
// NewFifoExecIO creates exec io by named pipes.
func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newFifos(root, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioPipes(fifos)
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioPipes: stdio,
closer: closer,
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}
// NewStreamExecIO creates exec io with streaming.
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newStreams(address, id, tty, stdin)
if err != nil {
return nil, err
}
stdio, closer, err := newStdioStream(fifos)
if err != nil {
return nil, err
}
return &ExecIO{
id: id,
fifos: fifos,
stdioStream: stdio,
closer: closer,
}, nil
}

View File

@@ -18,14 +18,26 @@ package io
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
streamingapi "github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/streaming/proxy"
"github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/shim"
)
// AttachOptions specifies how to attach to a container.
@@ -88,19 +100,35 @@ func newFifos(root, id string, tty, stdin bool) (*cio.FIFOSet, error) {
return fifos, nil
}
type stdioPipes struct {
// newStreams init streams for io of container.
func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) {
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
if stdin {
streamID := id + "-stdin"
fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID)
}
stdoutStreamID := id + "-stdout"
fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID)
if !tty {
stderrStreamID := id + "-stderr"
fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID)
}
fifos.Terminal = tty
return fifos, nil
}
type stdioStream struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}
// newStdioPipes creates actual fifos for stdio.
func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
// newStdioStream creates actual streams or fifos for stdio.
func newStdioStream(fifos *cio.FIFOSet) (_ *stdioStream, _ *wgCloser, err error) {
var (
f io.ReadWriteCloser
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
p = &stdioPipes{}
p = &stdioStream{}
)
defer func() {
if err != nil {
@@ -112,27 +140,30 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
}()
if fifos.Stdin != "" {
if f, err = openPipe(ctx, fifos.Stdin, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
in, err := openStdin(ctx, fifos.Stdin)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdin, %w", err)
}
p.stdin = f
set = append(set, f)
p.stdin = in
set = append(set, in)
}
if fifos.Stdout != "" {
if f, err = openPipe(ctx, fifos.Stdout, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stdout)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdout, %w", err)
}
p.stdout = f
set = append(set, f)
p.stdout = out
set = append(set, out)
}
if fifos.Stderr != "" {
if f, err = openPipe(ctx, fifos.Stderr, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700); err != nil {
return nil, nil, err
out, err := openOutput(ctx, fifos.Stderr)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stderr, %w", err)
}
p.stderr = f
set = append(set, f)
p.stderr = out
set = append(set, out)
}
return p, &wgCloser{
@@ -142,3 +173,99 @@ func newStdioPipes(fifos *cio.FIFOSet) (_ *stdioPipes, _ *wgCloser, err error) {
cancel: cancel,
}, nil
}
func openStdin(ctx context.Context, url string) (io.WriteCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openStdinStream(ctx, url)
}
func openStdinStream(ctx context.Context, url string) (io.WriteCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.WriteByteStream(ctx, stream), nil
}
func openOutput(ctx context.Context, url string) (io.ReadCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openOutputStream(ctx, url)
}
func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.ReadByteStream(ctx, stream), nil
}
func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) {
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("address url parse error: %v", err)
}
// The address returned from sandbox controller should be in the form like ttrpc+unix://<uds-path>
// or grpc+vsock://<cid>:<port>, we should get the protocol from the url first.
protocol, scheme, ok := strings.Cut(u.Scheme, "+")
if !ok {
return nil, fmt.Errorf("the scheme of sandbox address should be in " +
" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
}
if u.Path != "streaming" {
// TODO, support connect stream other than streaming api
return nil, fmt.Errorf("only <address>/streaming?id=xxx supported")
}
id := u.Query().Get("id")
if id == "" {
return nil, fmt.Errorf("no stream id in url queries")
}
realAddress := fmt.Sprintf("%s://%s/%s", scheme, u.Host, u.Path)
conn, err := shim.AnonReconnectDialer(realAddress, 100*time.Second)
if err != nil {
return nil, fmt.Errorf("failed to connect the stream %v", err)
}
var stream streamingapi.Stream
switch protocol {
case "ttrpc":
c := ttrpc.NewClient(conn)
streamCreator := proxy.NewStreamCreator(c)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, realAddress, gopts...)
if err != nil {
return nil, err
}
streamCreator := proxy.NewStreamCreator(conn)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
default:
return nil, fmt.Errorf("protocol not supported")
}
}