From 42f778fc14124d6f81afae075960778d682f3457 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Wed, 8 May 2024 14:26:36 +0800 Subject: [PATCH 1/3] modify streaming io url form sandbox address should be in the form of +:// for example: ttrpc+hvsock:///run/test.hvsock:1024 or: grpc+vsock://1111111:1024 and the Stdin/Stdout/Stderr will add a `streaming_id` as a parameter of the url result form is: +://?streaming_id= for example ttrpc+hvsock:///run/test.hvsock:1024?streaming_id=111111 or grpc+vsock://1111111:1024?streaming_id=222222 Signed-off-by: Abel Feng --- internal/cri/io/container_io.go | 11 +++++++++++ internal/cri/io/exec_io.go | 11 +++++++++++ internal/cri/io/helpers.go | 15 ++++++--------- 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/internal/cri/io/container_io.go b/internal/cri/io/container_io.go index c0d60e486..9fc554534 100644 --- a/internal/cri/io/container_io.go +++ b/internal/cri/io/container_io.go @@ -73,6 +73,17 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts { } // WithStreams creates new streams for the container io. +// The stream address is in format of `protocol://address?stream_id=xyz`. +// It allocates ContainerID-stdin, ContainerID-stdout and ContainerID-stderr as streaming IDs. +// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and container ID is `app`. +// There are three streams if stdin is enabled and TTY is disabled. +// +// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin +// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout +// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr +// +// The streaming IDs will be used as unique key to establish stream tunnel. +// And it should support reconnection with the same streaming ID if containerd restarts. func WithStreams(address string, tty, stdin bool) ContainerIOOpts { return func(c *ContainerIO) error { if address == "" { diff --git a/internal/cri/io/exec_io.go b/internal/cri/io/exec_io.go index 495a40abc..bf31bc00e 100644 --- a/internal/cri/io/exec_io.go +++ b/internal/cri/io/exec_io.go @@ -55,6 +55,17 @@ func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { } // NewStreamExecIO creates exec io with streaming. +// The stream address is in format of `protocol://address?stream_id=xyz`. +// It allocates ExecID-stdin, ExecID-stdout and ExecID-stderr as streaming IDs. +// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and exec ID is `app`. +// There are three streams if stdin is enabled and TTY is disabled. +// +// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin +// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout +// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr +// +// The streaming IDs will be used as unique key to establish stream tunnel. +// And it should support reconnection with the same streaming ID if containerd restarts. func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) { fifos, err := newStreams(address, id, tty, stdin) if err != nil { diff --git a/internal/cri/io/helpers.go b/internal/cri/io/helpers.go index 64344dd1f..6e148e8c1 100644 --- a/internal/cri/io/helpers.go +++ b/internal/cri/io/helpers.go @@ -105,13 +105,13 @@ func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) { fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil }) if stdin { streamID := id + "-stdin" - fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID) + fifos.Stdin = fmt.Sprintf("%s?streaming_id=%s", address, streamID) } stdoutStreamID := id + "-stdout" - fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID) + fifos.Stdout = fmt.Sprintf("%s?streaming_id=%s", address, stdoutStreamID) if !tty { stderrStreamID := id + "-stderr" - fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID) + fifos.Stderr = fmt.Sprintf("%s?streaming_id=%s", address, stderrStreamID) } fifos.Terminal = tty return fifos, nil @@ -209,6 +209,8 @@ func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) { } func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) { + // urlStr should be in the form of: + // +://?streaming_id= u, err := url.Parse(urlStr) if err != nil { return nil, fmt.Errorf("address url parse error: %v", err) @@ -221,12 +223,7 @@ func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) " the form of +, i.e. ttrpc+unix or grpc+vsock") } - if u.Path != "streaming" { - // TODO, support connect stream other than streaming api - return nil, fmt.Errorf("only
/streaming?id=xxx supported") - } - - id := u.Query().Get("id") + id := u.Query().Get("streaming_id") if id == "" { return nil, fmt.Errorf("no stream id in url queries") } From 7cead88004e2f25c40a83d1d9ffa6ed167c44a8b Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Mon, 13 May 2024 17:15:50 +0800 Subject: [PATCH 2/3] cri: restart created container with correct io type Signed-off-by: Abel Feng --- internal/cri/server/restart.go | 33 +++++++++++++++++++++++++++++---- 1 file changed, 29 insertions(+), 4 deletions(-) diff --git a/internal/cri/server/restart.go b/internal/cri/server/restart.go index 70f433570..f487c9aed 100644 --- a/internal/cri/server/restart.go +++ b/internal/cri/server/restart.go @@ -257,7 +257,6 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe defer cancel() id := cntr.ID() containerDir := c.getContainerRootDir(id) - volatileContainerDir := c.getVolatileContainerRootDir(id) var container containerstore.Container // Load container metadata. exts, err := cntr.Extensions(ctx) @@ -336,9 +335,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe // NOTE: Another possibility is that we've tried to start the container, but // containerd got restarted during that. In that case, we still // treat the container as `CREATED`. - containerIO, err = cio.NewContainerIO(id, - cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()), - ) + containerIO, err = c.createContainerIO(id, meta.SandboxID, meta.Config) if err != nil { return fmt.Errorf("failed to create container io: %w", err) } @@ -465,3 +462,31 @@ func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, ba } return nil } + +func (c *criService) createContainerIO(containerID, sandboxID string, config *runtime.ContainerConfig) (*cio.ContainerIO, error) { + if config == nil { + return nil, fmt.Errorf("ContainerConfig should not be nil when create container io") + } + sb, err := c.sandboxStore.Get(sandboxID) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, err) + } + ociRuntime, err := c.config.GetSandboxRuntime(sb.Config, sb.Metadata.RuntimeHandler) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox runtime: %w", err) + } + var containerIO *cio.ContainerIO + switch ociRuntime.IOType { + case criconfig.IOTypeStreaming: + containerIO, err = cio.NewContainerIO(containerID, + cio.WithStreams(sb.Endpoint.Address, config.GetTty(), config.GetStdin())) + default: + volatileContainerRootDir := c.getVolatileContainerRootDir(containerID) + containerIO, err = cio.NewContainerIO(containerID, + cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin())) + } + if err != nil { + return nil, fmt.Errorf("failed to create container io: %w", err) + } + return containerIO, nil +} From 0b113d78d446cf07c5c0a319139ae4b17e340129 Mon Sep 17 00:00:00 2001 From: Abel Feng Date: Wed, 8 May 2024 15:53:01 +0800 Subject: [PATCH 3/3] doc: add the description of sandboxer and io_type Signed-off-by: Abel Feng --- docs/cri/config.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/cri/config.md b/docs/cri/config.md index 63041db75..2cdcf4bcf 100644 --- a/docs/cri/config.md +++ b/docs/cri/config.md @@ -369,6 +369,17 @@ version = 2 # See https://github.com/containerd/containerd/issues/6657 for context. snapshotter = "" + # sandboxer is the sandbox controller for the runtime. + # The default sandbox controller is the podsandbox controller, which create a "pause" container as a sandbox. + # We can create our own "shim" sandbox controller by implementing the sandbox api defined in runtime/sandbox/v1/sandbox.proto in our shim, and specifiy the sandboxer to "shim" here. + # We can also run a grpc or ttrpc server to serve the sandbox controller API defined in services/sandbox/v1/sandbox.proto, and define a ProxyPlugin of "sandbox" type, and specify the name of the ProxyPlugin here. + sandboxer = "" + + # io_type is the way containerd get stdin/stdout/stderr from container or the execed process. + # The default value is "fifo", in which containerd will create a set of named pipes and transfer io by them. + # Currently the value of "streaming" is supported, in this way, sandbox should serve streaming api defined in services/streaming/v1/streaming.proto, and containerd will connect to sandbox's endpoint and create a set of streams to it, as channels to transfer io of container or process. + io_type = "" + # 'plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options' is options specific to # "io.containerd.runc.v1" and "io.containerd.runc.v2". Its corresponding options type is: # https://github.com/containerd/containerd/blob/v1.3.2/runtime/v2/runc/options/oci.pb.go#L26 .