diff --git a/cmd/cri-containerd/cri_containerd.go b/cmd/cri-containerd/cri_containerd.go index 5ddb6e6a2..cbeee1706 100644 --- a/cmd/cri-containerd/cri_containerd.go +++ b/cmd/cri-containerd/cri_containerd.go @@ -38,7 +38,14 @@ func main() { } glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath) - service, err := server.NewCRIContainerdService(o.ContainerdEndpoint, o.RootDir, o.NetworkPluginBinDir, o.NetworkPluginConfDir) + service, err := server.NewCRIContainerdService( + o.ContainerdEndpoint, + o.RootDir, + o.NetworkPluginBinDir, + o.NetworkPluginConfDir, + o.StreamServerAddress, + o.StreamServerPort, + ) if err != nil { glog.Exitf("Failed to create CRI containerd service %+v: %v", o, err) } diff --git a/cmd/cri-containerd/options/options.go b/cmd/cri-containerd/options/options.go index 097c704f0..edd196f97 100644 --- a/cmd/cri-containerd/options/options.go +++ b/cmd/cri-containerd/options/options.go @@ -40,6 +40,10 @@ type CRIContainerdOptions struct { NetworkPluginBinDir string // NetworkPluginConfDir is the directory in which the admin places a CNI conf. NetworkPluginConfDir string + // StreamServerAddress is the ip address streaming server is listening on. + StreamServerAddress string + // StreamServerPort is the port streaming server is listening on. + StreamServerPort string } // NewCRIContainerdOptions returns a reference to CRIContainerdOptions @@ -63,6 +67,10 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) { "/etc/cni/net.d", "The directory for putting network binaries.") fs.StringVar(&c.NetworkPluginConfDir, "network-conf-dir", "/opt/cni/bin", "The directory for putting network plugin configuration files.") + fs.StringVar(&c.StreamServerAddress, "stream-addr", + "", "The ip address streaming server is listening on. Default host interface is used if this is empty.") + fs.StringVar(&c.StreamServerPort, "stream-port", + "10010", "The port streaming server is listening on.") } // InitFlags must be called after adding all cli options flags are defined and diff --git a/pkg/server/container_exec.go b/pkg/server/container_exec.go index c89faea24..c0a050959 100644 --- a/pkg/server/container_exec.go +++ b/pkg/server/container_exec.go @@ -17,14 +17,30 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" - "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) // Exec prepares a streaming endpoint to execute a command in the container, and returns the address. -func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (*runtime.ExecResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) Exec(ctx context.Context, r *runtime.ExecRequest) (retRes *runtime.ExecResponse, retErr error) { + glog.V(2).Infof("Exec for %q with command %+v, tty %v and stdin %v", + r.GetContainerId(), r.GetCmd(), r.GetTty(), r.GetStdin()) + defer func() { + if retErr == nil { + glog.V(2).Infof("Exec for %q returns URL %q", r.GetContainerId(), retRes.Url) + } + }() + + cntr, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("failed to find container in store: %v", err) + } + state := cntr.Status.Get().State() + if state != runtime.ContainerState_CONTAINER_RUNNING { + return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) + } + return c.streamServer.GetExec(r) } diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 11fb3c488..e7f267e84 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -43,10 +43,9 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync } }() - var stdin, stdout, stderr bytes.Buffer + var stdout, stderr bytes.Buffer exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ cmd: r.GetCmd(), - stdin: &stdin, stdout: &stdout, stderr: &stderr, timeout: time.Duration(r.GetTimeout()) * time.Second, @@ -106,6 +105,10 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o pspec.Args = opts.cmd pspec.Terminal = opts.tty + if opts.stdin == nil { + // Create empty buffer if stdin is nil. + opts.stdin = new(bytes.Buffer) + } execID := generateID() process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( opts.stdin, diff --git a/pkg/server/service.go b/pkg/server/service.go index 40c2087b6..3bf193cc7 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -28,9 +28,11 @@ import ( "github.com/containerd/containerd/images" diffservice "github.com/containerd/containerd/services/diff" "github.com/containerd/containerd/snapshot" + "github.com/golang/glog" "github.com/kubernetes-incubator/cri-o/pkg/ocicni" healthapi "google.golang.org/grpc/health/grpc_health_v1" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" @@ -84,6 +86,8 @@ type criContainerdService struct { // imageStoreService is the containerd service to store and track // image metadata. imageStoreService images.Store + // eventsService is the containerd task service client + eventService events.EventsClient // versionService is the containerd version service client. versionService versionapi.VersionClient // healthService is the healthcheck service of containerd grpc server. @@ -94,12 +98,14 @@ type criContainerdService struct { agentFactory agents.AgentFactory // client is an instance of the containerd client client *containerd.Client - // eventsService is the containerd task service client - eventService events.EventsClient + // streamServer is the streaming server serves container streaming request. + streamServer streaming.Server } // NewCRIContainerdService returns a new instance of CRIContainerdService -func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir string) (CRIContainerdService, error) { +// TODO(random-liu): Add cri-containerd server config to get rid of the long arg list. +func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, networkPluginConfDir, + streamAddress, streamPort string) (CRIContainerdService, error) { // TODO(random-liu): [P2] Recover from runtime state and checkpoint. client, err := containerd.New(containerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace)) @@ -119,6 +125,7 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n containerService: client.ContainerService(), taskService: client.TaskService(), imageStoreService: client.ImageService(), + eventService: client.EventService(), contentStoreService: client.ContentStore(), // Use daemon default snapshotter. snapshotService: client.SnapshotService(""), @@ -127,7 +134,6 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n healthService: client.HealthService(), agentFactory: agents.NewAgentFactory(), client: client, - eventService: client.EventService(), } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir) @@ -136,9 +142,20 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n } c.netPlugin = netPlugin + // prepare streaming server + c.streamServer, err = newStreamServer(c, streamAddress, streamPort) + if err != nil { + return nil, fmt.Errorf("failed to create stream server: %v", err) + } + return c, nil } func (c *criContainerdService) Start() { c.startEventMonitor() + go func() { + if err := c.streamServer.Start(true); err != nil { + glog.Errorf("Failed to start streaming server: %v", err) + } + }() } diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go index 03bf99501..29eb9a62e 100644 --- a/pkg/server/streaming.go +++ b/pkg/server/streaming.go @@ -17,11 +17,75 @@ limitations under the License. package server import ( + "errors" + "fmt" + "io" + "net" + + "golang.org/x/net/context" + k8snet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/remotecommand" - _ "k8s.io/kubernetes/pkg/kubelet/server/streaming" + "k8s.io/kubernetes/pkg/kubelet/server/streaming" + // TODO(random-liu): k8s.io/utils/exec after bump up kubernetes version. + "k8s.io/kubernetes/pkg/util/exec" ) +func newStreamServer(c *criContainerdService, addr, port string) (streaming.Server, error) { + if addr == "" { + a, err := k8snet.ChooseBindAddress(nil) + if err != nil { + return nil, fmt.Errorf("failed to get stream server address: %v", err) + } + addr = a.String() + } + config := streaming.DefaultConfig + config.Addr = net.JoinHostPort(addr, port) + runtime := newStreamRuntime(c) + return streaming.NewServer(config, runtime) +} + +type streamRuntime struct { + c *criContainerdService +} + +func newStreamRuntime(c *criContainerdService) streaming.Runtime { + return &streamRuntime{c: c} +} + +// Exec executes a command inside the container. exec.ExitError is returned if the command +// returns non-zero exit code. +func (s *streamRuntime) Exec(containerID string, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, + tty bool, resize <-chan remotecommand.TerminalSize) error { + exitCode, err := s.c.execInContainer(context.Background(), containerID, execOptions{ + cmd: cmd, + stdin: stdin, + stdout: stdout, + stderr: stderr, + tty: tty, + resize: resize, + }) + if err != nil { + return fmt.Errorf("failed to exec in container: %v", err) + } + if *exitCode == 0 { + return nil + } + return &exec.CodeExitError{ + Err: fmt.Errorf("error executing command %v, exit code %d", cmd, *exitCode), + Code: int(*exitCode), + } +} + +func (s *streamRuntime) Attach(containerID string, in io.Reader, out, err io.WriteCloser, tty bool, + resize <-chan remotecommand.TerminalSize) error { + return errors.New("not implemented") +} + +func (s *streamRuntime) PortForward(podSandboxID string, port int32, stream io.ReadWriteCloser) error { + return errors.New("not implemented") +} + // handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each // remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the // goroutine.