diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 1f388a856..11fb3c488 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -19,12 +19,15 @@ package server import ( "bytes" "fmt" + "io" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/typeurl" "github.com/golang/glog" "golang.org/x/net/context" + "k8s.io/client-go/tools/remotecommand" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) @@ -40,12 +43,46 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync } }() + var stdin, stdout, stderr bytes.Buffer + exitCode, err := c.execInContainer(ctx, r.GetContainerId(), execOptions{ + cmd: r.GetCmd(), + stdin: &stdin, + stdout: &stdout, + stderr: &stderr, + timeout: time.Duration(r.GetTimeout()) * time.Second, + }) + if err != nil { + return nil, fmt.Errorf("failed to exec in container: %v", err) + } + + return &runtime.ExecSyncResponse{ + Stdout: stdout.Bytes(), + Stderr: stderr.Bytes(), + ExitCode: int32(*exitCode), + }, nil +} + +// execOptions specifies how to execute command in container. +type execOptions struct { + cmd []string + stdin io.Reader + stdout io.Writer + stderr io.Writer + tty bool + resize <-chan remotecommand.TerminalSize + timeout time.Duration +} + +// execInContainer executes a command inside the container synchronously, and +// redirects stdio stream properly. +// TODO(random-liu): Support timeout. +func (c *criContainerdService) execInContainer(ctx context.Context, id string, opts execOptions) (*uint32, error) { // Get container from our container store. - cntr, err := c.containerStore.Get(r.GetContainerId()) + cntr, err := c.containerStore.Get(id) if err != nil { return nil, fmt.Errorf("failed to find container in store: %v", err) } - id := cntr.ID + id = cntr.ID state := cntr.Status.Get().State() if state != runtime.ContainerState_CONTAINER_RUNNING { @@ -65,14 +102,17 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync if err != nil { return nil, fmt.Errorf("failed to load task: %v", err) } - pspec := spec.Process - pspec.Args = r.GetCmd() + pspec.Args = opts.cmd + pspec.Terminal = opts.tty execID := generateID() - stdinBuf, stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer), new(bytes.Buffer) - io := containerd.NewIOWithTerminal(stdinBuf, stdoutBuf, stderrBuf, pspec.Terminal) - process, err := task.Exec(ctx, execID, pspec, io) + process, err := task.Exec(ctx, execID, pspec, containerd.NewIOWithTerminal( + opts.stdin, + opts.stdout, + opts.stderr, + opts.tty, + )) if err != nil { return nil, fmt.Errorf("failed to create exec %q: %v", execID, err) } @@ -82,6 +122,12 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync } }() + handleResizing(opts.resize, func(size remotecommand.TerminalSize) { + if err := process.Resize(ctx, uint32(size.Width), uint32(size.Height)); err != nil { + glog.Errorf("Failed to resize process %q console for container %q: %v", execID, id, err) + } + }) + // Get containerd event client first, so that we won't miss any events. // TODO(random-liu): Add filter to only subscribe events of the exec process. // TODO(random-liu): Use `Wait` after is fixed. (containerd#1279, containerd#1287) @@ -100,26 +146,20 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync if err != nil { return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err) } - // TODO(random-liu): [P1] Deal with timeout, kill and wait again on timeout. // Wait for the io to be drained. process.IO().Wait() - return &runtime.ExecSyncResponse{ - Stdout: stdoutBuf.Bytes(), - Stderr: stderrBuf.Bytes(), - ExitCode: int32(exitCode), - }, nil + return exitCode, nil } // waitContainerExec waits for container exec to finish and returns the exit code. func (c *criContainerdService) waitContainerExec(eventstream events.Events_SubscribeClient, id string, - execID string) (uint32, error) { + execID string) (*uint32, error) { for { evt, err := eventstream.Recv() if err != nil { - // Return non-zero exit code just in case. - return unknownExitCode, err + return nil, err } // Continue until the event received is of type task exit. if !typeurl.Is(evt.Event, &events.TaskExit{}) { @@ -127,11 +167,11 @@ func (c *criContainerdService) waitContainerExec(eventstream events.Events_Subsc } any, err := typeurl.UnmarshalAny(evt.Event) if err != nil { - return unknownExitCode, err + return nil, err } e := any.(*events.TaskExit) if e.ContainerID == id && e.ID == execID { - return e.ExitStatus, nil + return &e.ExitStatus, nil } } } diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 3dc59762b..4a08d58a7 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -52,8 +52,6 @@ const ( errorExitReason = "Error" // oomExitReason is the exit reason when process in container is oom killed. oomExitReason = "OOMKilled" - // unknownExitCode is the exit code when exit reason is unknown. - unknownExitCode = 255 ) const ( diff --git a/pkg/server/streaming.go b/pkg/server/streaming.go new file mode 100644 index 000000000..03bf99501 --- /dev/null +++ b/pkg/server/streaming.go @@ -0,0 +1,47 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/tools/remotecommand" + _ "k8s.io/kubernetes/pkg/kubelet/server/streaming" +) + +// handleResizing spawns a goroutine that processes the resize channel, calling resizeFunc for each +// remotecommand.TerminalSize received from the channel. The resize channel must be closed elsewhere to stop the +// goroutine. +func handleResizing(resize <-chan remotecommand.TerminalSize, resizeFunc func(size remotecommand.TerminalSize)) { + if resize == nil { + return + } + + go func() { + defer runtime.HandleCrash() + + for { + size, ok := <-resize + if !ok { + return + } + if size.Height < 1 || size.Width < 1 { + continue + } + resizeFunc(size) + } + }() +}