Merge pull request #10190 from abel-von/fix-streaming-io-path

fix: modify streaming io url and add docs of sandboxer and io_type
This commit is contained in:
Maksym Pavlenko 2024-05-16 19:57:27 +00:00 committed by GitHub
commit 90a8667310
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 68 additions and 13 deletions

View File

@ -369,6 +369,17 @@ version = 2
# See https://github.com/containerd/containerd/issues/6657 for context.
snapshotter = ""
# sandboxer is the sandbox controller for the runtime.
# The default sandbox controller is the podsandbox controller, which create a "pause" container as a sandbox.
# We can create our own "shim" sandbox controller by implementing the sandbox api defined in runtime/sandbox/v1/sandbox.proto in our shim, and specifiy the sandboxer to "shim" here.
# We can also run a grpc or ttrpc server to serve the sandbox controller API defined in services/sandbox/v1/sandbox.proto, and define a ProxyPlugin of "sandbox" type, and specify the name of the ProxyPlugin here.
sandboxer = ""
# io_type is the way containerd get stdin/stdout/stderr from container or the execed process.
# The default value is "fifo", in which containerd will create a set of named pipes and transfer io by them.
# Currently the value of "streaming" is supported, in this way, sandbox should serve streaming api defined in services/streaming/v1/streaming.proto, and containerd will connect to sandbox's endpoint and create a set of streams to it, as channels to transfer io of container or process.
io_type = ""
# 'plugins."io.containerd.grpc.v1.cri".containerd.runtimes.runc.options' is options specific to
# "io.containerd.runc.v1" and "io.containerd.runc.v2". Its corresponding options type is:
# https://github.com/containerd/containerd/blob/v1.3.2/runtime/v2/runc/options/oci.pb.go#L26 .

View File

@ -73,6 +73,17 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts {
}
// WithStreams creates new streams for the container io.
// The stream address is in format of `protocol://address?stream_id=xyz`.
// It allocates ContainerID-stdin, ContainerID-stdout and ContainerID-stderr as streaming IDs.
// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and container ID is `app`.
// There are three streams if stdin is enabled and TTY is disabled.
//
// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin
// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout
// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr
//
// The streaming IDs will be used as unique key to establish stream tunnel.
// And it should support reconnection with the same streaming ID if containerd restarts.
func WithStreams(address string, tty, stdin bool) ContainerIOOpts {
return func(c *ContainerIO) error {
if address == "" {

View File

@ -55,6 +55,17 @@ func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) {
}
// NewStreamExecIO creates exec io with streaming.
// The stream address is in format of `protocol://address?stream_id=xyz`.
// It allocates ExecID-stdin, ExecID-stdout and ExecID-stderr as streaming IDs.
// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and exec ID is `app`.
// There are three streams if stdin is enabled and TTY is disabled.
//
// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin
// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout
// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr
//
// The streaming IDs will be used as unique key to establish stream tunnel.
// And it should support reconnection with the same streaming ID if containerd restarts.
func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) {
fifos, err := newStreams(address, id, tty, stdin)
if err != nil {

View File

@ -105,13 +105,13 @@ func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) {
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
if stdin {
streamID := id + "-stdin"
fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID)
fifos.Stdin = fmt.Sprintf("%s?streaming_id=%s", address, streamID)
}
stdoutStreamID := id + "-stdout"
fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID)
fifos.Stdout = fmt.Sprintf("%s?streaming_id=%s", address, stdoutStreamID)
if !tty {
stderrStreamID := id + "-stderr"
fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID)
fifos.Stderr = fmt.Sprintf("%s?streaming_id=%s", address, stderrStreamID)
}
fifos.Terminal = tty
return fifos, nil
@ -209,6 +209,8 @@ func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
}
func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) {
// urlStr should be in the form of:
// <ttrpc|grpc>+<unix|vsock|hvsock>://<uds-path|vsock-cid:vsock-port|uds-path:hvsock-port>?streaming_id=<stream-id>
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("address url parse error: %v", err)
@ -221,12 +223,7 @@ func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error)
" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
}
if u.Path != "streaming" {
// TODO, support connect stream other than streaming api
return nil, fmt.Errorf("only <address>/streaming?id=xxx supported")
}
id := u.Query().Get("id")
id := u.Query().Get("streaming_id")
if id == "" {
return nil, fmt.Errorf("no stream id in url queries")
}

View File

@ -257,7 +257,6 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
defer cancel()
id := cntr.ID()
containerDir := c.getContainerRootDir(id)
volatileContainerDir := c.getVolatileContainerRootDir(id)
var container containerstore.Container
// Load container metadata.
exts, err := cntr.Extensions(ctx)
@ -336,9 +335,7 @@ func (c *criService) loadContainer(ctx context.Context, cntr containerd.Containe
// NOTE: Another possibility is that we've tried to start the container, but
// containerd got restarted during that. In that case, we still
// treat the container as `CREATED`.
containerIO, err = cio.NewContainerIO(id,
cio.WithNewFIFOs(volatileContainerDir, meta.Config.GetTty(), meta.Config.GetStdin()),
)
containerIO, err = c.createContainerIO(id, meta.SandboxID, meta.Config)
if err != nil {
return fmt.Errorf("failed to create container io: %w", err)
}
@ -465,3 +462,31 @@ func cleanupOrphanedIDDirs(ctx context.Context, cntrs []containerd.Container, ba
}
return nil
}
func (c *criService) createContainerIO(containerID, sandboxID string, config *runtime.ContainerConfig) (*cio.ContainerIO, error) {
if config == nil {
return nil, fmt.Errorf("ContainerConfig should not be nil when create container io")
}
sb, err := c.sandboxStore.Get(sandboxID)
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %w", sandboxID, err)
}
ociRuntime, err := c.config.GetSandboxRuntime(sb.Config, sb.Metadata.RuntimeHandler)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
}
var containerIO *cio.ContainerIO
switch ociRuntime.IOType {
case criconfig.IOTypeStreaming:
containerIO, err = cio.NewContainerIO(containerID,
cio.WithStreams(sb.Endpoint.Address, config.GetTty(), config.GetStdin()))
default:
volatileContainerRootDir := c.getVolatileContainerRootDir(containerID)
containerIO, err = cio.NewContainerIO(containerID,
cio.WithNewFIFOs(volatileContainerRootDir, config.GetTty(), config.GetStdin()))
}
if err != nil {
return nil, fmt.Errorf("failed to create container io: %w", err)
}
return containerIO, nil
}