containerd/internal/cri/io/helpers.go
Abel Feng 42f778fc14 modify streaming io url form
sandbox address should be in the form of
<ttrpc|grpc>+<unix|vsock|hvsock>://<uds-path|vsock-cid:vsock-port|uds-path:hvsock-port>
for example: ttrpc+hvsock:///run/test.hvsock:1024
or: grpc+vsock://1111111:1024
and the Stdin/Stdout/Stderr will add a `streaming_id` as a parameter of the url
result form is:
<ttrpc|grpc>+<unix|vsock|hvsock>://<uds-path|vsock-cid:vsock-port|uds-path:hvsock-port>?streaming_id=<stream-id>
for example ttrpc+hvsock:///run/test.hvsock:1024?streaming_id=111111
or grpc+vsock://1111111:1024?streaming_id=222222

Signed-off-by: Abel Feng <fshb1988@gmail.com>
2024-05-13 17:42:51 +08:00

269 lines
6.7 KiB
Go

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io
import (
"context"
"fmt"
"io"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
"github.com/containerd/ttrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
streamingapi "github.com/containerd/containerd/v2/core/streaming"
"github.com/containerd/containerd/v2/core/streaming/proxy"
"github.com/containerd/containerd/v2/core/transfer/streaming"
"github.com/containerd/containerd/v2/pkg/cio"
"github.com/containerd/containerd/v2/pkg/shim"
)
// AttachOptions specifies how to attach to a container.
type AttachOptions struct {
Stdin io.Reader
Stdout io.WriteCloser
Stderr io.WriteCloser
Tty bool
StdinOnce bool
// CloseStdin is the function to close container stdin.
CloseStdin func() error
}
// StreamType is the type of the stream, stdout/stderr.
type StreamType string
const (
// Stdin stream type.
Stdin StreamType = "stdin"
// Stdout stream type.
Stdout = StreamType(runtime.Stdout)
// Stderr stream type.
Stderr = StreamType(runtime.Stderr)
)
type wgCloser struct {
ctx context.Context
wg *sync.WaitGroup
set []io.Closer
cancel context.CancelFunc
}
func (g *wgCloser) Wait() {
g.wg.Wait()
}
func (g *wgCloser) Close() {
for _, f := range g.set {
f.Close()
}
}
func (g *wgCloser) Cancel() {
g.cancel()
}
// newFifos creates fifos directory for a container.
func newFifos(root, id string, tty, stdin bool) (*cio.FIFOSet, error) {
root = filepath.Join(root, "io")
if err := os.MkdirAll(root, 0700); err != nil {
return nil, err
}
fifos, err := cio.NewFIFOSetInDir(root, id, tty)
if err != nil {
return nil, err
}
if !stdin {
fifos.Stdin = ""
}
return fifos, nil
}
// newStreams init streams for io of container.
func newStreams(address, id string, tty, stdin bool) (*cio.FIFOSet, error) {
fifos := cio.NewFIFOSet(cio.Config{}, func() error { return nil })
if stdin {
streamID := id + "-stdin"
fifos.Stdin = fmt.Sprintf("%s?streaming_id=%s", address, streamID)
}
stdoutStreamID := id + "-stdout"
fifos.Stdout = fmt.Sprintf("%s?streaming_id=%s", address, stdoutStreamID)
if !tty {
stderrStreamID := id + "-stderr"
fifos.Stderr = fmt.Sprintf("%s?streaming_id=%s", address, stderrStreamID)
}
fifos.Terminal = tty
return fifos, nil
}
type stdioStream struct {
stdin io.WriteCloser
stdout io.ReadCloser
stderr io.ReadCloser
}
// newStdioStream creates actual streams or fifos for stdio.
func newStdioStream(fifos *cio.FIFOSet) (_ *stdioStream, _ *wgCloser, err error) {
var (
set []io.Closer
ctx, cancel = context.WithCancel(context.Background())
p = &stdioStream{}
)
defer func() {
if err != nil {
for _, f := range set {
f.Close()
}
cancel()
}
}()
if fifos.Stdin != "" {
in, err := openStdin(ctx, fifos.Stdin)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdin, %w", err)
}
p.stdin = in
set = append(set, in)
}
if fifos.Stdout != "" {
out, err := openOutput(ctx, fifos.Stdout)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stdout, %w", err)
}
p.stdout = out
set = append(set, out)
}
if fifos.Stderr != "" {
out, err := openOutput(ctx, fifos.Stderr)
if err != nil {
return nil, nil, fmt.Errorf("failed to open stderr, %w", err)
}
p.stderr = out
set = append(set, out)
}
return p, &wgCloser{
wg: &sync.WaitGroup{},
set: set,
ctx: ctx,
cancel: cancel,
}, nil
}
func openStdin(ctx context.Context, url string) (io.WriteCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_WRONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openStdinStream(ctx, url)
}
func openStdinStream(ctx context.Context, url string) (io.WriteCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.WriteByteStream(ctx, stream), nil
}
func openOutput(ctx context.Context, url string) (io.ReadCloser, error) {
ok := strings.Contains(url, "://")
if !ok {
return openPipe(ctx, url, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
}
return openOutputStream(ctx, url)
}
func openOutputStream(ctx context.Context, url string) (io.ReadCloser, error) {
stream, err := openStream(ctx, url)
if err != nil {
return nil, err
}
return streaming.ReadByteStream(ctx, stream), nil
}
func openStream(ctx context.Context, urlStr string) (streamingapi.Stream, error) {
// urlStr should be in the form of:
// <ttrpc|grpc>+<unix|vsock|hvsock>://<uds-path|vsock-cid:vsock-port|uds-path:hvsock-port>?streaming_id=<stream-id>
u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("address url parse error: %v", err)
}
// The address returned from sandbox controller should be in the form like ttrpc+unix://<uds-path>
// or grpc+vsock://<cid>:<port>, we should get the protocol from the url first.
protocol, scheme, ok := strings.Cut(u.Scheme, "+")
if !ok {
return nil, fmt.Errorf("the scheme of sandbox address should be in " +
" the form of <protocol>+<unix|vsock|tcp>, i.e. ttrpc+unix or grpc+vsock")
}
id := u.Query().Get("streaming_id")
if id == "" {
return nil, fmt.Errorf("no stream id in url queries")
}
realAddress := fmt.Sprintf("%s://%s/%s", scheme, u.Host, u.Path)
conn, err := shim.AnonReconnectDialer(realAddress, 100*time.Second)
if err != nil {
return nil, fmt.Errorf("failed to connect the stream %v", err)
}
var stream streamingapi.Stream
switch protocol {
case "ttrpc":
c := ttrpc.NewClient(conn)
streamCreator := proxy.NewStreamCreator(c)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
case "grpc":
ctx, cancel := context.WithTimeout(ctx, time.Second*100)
defer cancel()
gopts := []grpc.DialOption{
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
}
conn, err := grpc.DialContext(ctx, realAddress, gopts...)
if err != nil {
return nil, err
}
streamCreator := proxy.NewStreamCreator(conn)
stream, err = streamCreator.Create(ctx, id)
if err != nil {
return nil, err
}
return stream, nil
default:
return nil, fmt.Errorf("protocol not supported")
}
}