diff --git a/internal/cri/io/container_io.go b/internal/cri/io/container_io.go index c0d60e486..9fc554534 100644 --- a/internal/cri/io/container_io.go +++ b/internal/cri/io/container_io.go @@ -73,6 +73,17 @@ func WithNewFIFOs(root string, tty, stdin bool) ContainerIOOpts { } // WithStreams creates new streams for the container io. +// The stream address is in format of `protocol://address?stream_id=xyz`. +// It allocates ContainerID-stdin, ContainerID-stdout and ContainerID-stderr as streaming IDs. +// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and container ID is `app`. +// There are three streams if stdin is enabled and TTY is disabled. +// +// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin +// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout +// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr +// +// The streaming IDs will be used as unique key to establish stream tunnel. +// And it should support reconnection with the same streaming ID if containerd restarts. func WithStreams(address string, tty, stdin bool) ContainerIOOpts { return func(c *ContainerIO) error { if address == "" { diff --git a/internal/cri/io/exec_io.go b/internal/cri/io/exec_io.go index 495a40abc..bf31bc00e 100644 --- a/internal/cri/io/exec_io.go +++ b/internal/cri/io/exec_io.go @@ -55,6 +55,17 @@ func NewFifoExecIO(id, root string, tty, stdin bool) (*ExecIO, error) { } // NewStreamExecIO creates exec io with streaming. +// The stream address is in format of `protocol://address?stream_id=xyz`. +// It allocates ExecID-stdin, ExecID-stdout and ExecID-stderr as streaming IDs. +// For example, that advertiser address of shim is `ttrpc+unix:///run/demo.sock` and exec ID is `app`. +// There are three streams if stdin is enabled and TTY is disabled. +// +// - Stdin: ttrpc+unix:///run/demo.sock?stream_id=app-stdin +// - Stdout: ttrpc+unix:///run/demo.sock?stream_id=app-stdout +// - stderr: ttrpc+unix:///run/demo.sock?stream_id=app-stderr +// +// The streaming IDs will be used as unique key to establish stream tunnel. +// And it should support reconnection with the same streaming ID if containerd restarts. func NewStreamExecIO(id, address string, tty, stdin bool) (*ExecIO, error) { fifos, err := newStreams(address, id, tty, stdin) if err != nil { diff --git a/internal/cri/io/helpers.go b/internal/cri/io/helpers.go index 64344dd1f..6e148e8c1 100644 --- a/internal/cri/io/helpers.go +++ b/internal/cri/io/helpers.go @@ -105,13 +105,13 @@ func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) { fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil }) if stdin { streamID := id + "-stdin" - fifos.Stdin = fmt.Sprintf("%s/streaming?id=%s", address, streamID) + fifos.Stdin = fmt.Sprintf("%s?streaming_id=%s", address, streamID) } stdoutStreamID := id + "-stdout" - fifos.Stdout = fmt.Sprintf("%s/streaming?id=%s", address, stdoutStreamID) + fifos.Stdout = fmt.Sprintf("%s?streaming_id=%s", address, stdoutStreamID) if !tty { stderrStreamID := id + "-stderr" - fifos.Stderr = fmt.Sprintf("%s/streaming?id=%s", address, stderrStreamID) + fifos.Stderr = fmt.Sprintf("%s?streaming_id=%s", address, stderrStreamID) } fifos.Terminal = tty return fifos, nil @@ -209,6 +209,8 @@ func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) { } func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) { + // urlStr should be in the form of: + // +://?streaming_id= u, err := url.Parse(urlStr) if err != nil { return nil, fmt.Errorf("address url parse error: %v", err) @@ -221,12 +223,7 @@ func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) " the form of +, i.e. ttrpc+unix or grpc+vsock") } - if u.Path != "streaming" { - // TODO, support connect stream other than streaming api - return nil, fmt.Errorf("only
/streaming?id=xxx supported") - } - - id := u.Query().Get("id") + id := u.Query().Get("streaming_id") if id == "" { return nil, fmt.Errorf("no stream id in url queries") }