diff --git a/pkg/metadata/container.go b/pkg/metadata/container.go index f91a1b7d7..5992ba668 100644 --- a/pkg/metadata/container.go +++ b/pkg/metadata/container.go @@ -19,9 +19,10 @@ package metadata import ( "encoding/json" - "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" - + runtimespec "github.com/opencontainers/runtime-spec/specs-go" runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" ) // The code is very similar with sandbox.go, but there is no template support @@ -71,8 +72,13 @@ type ContainerMetadata struct { // In fact, this field doesn't need to be checkpointed. // TODO(random-liu): Skip this during serialization when we put object // into the store directly. - // TODO(random-liu): Reset this field to false during state recoverry. + // TODO(random-liu): Reset this field to false during state recovery. Removing bool + // TODO(random-liu): Remove following field after switching to new containerd + // client. + // Not including them in unit test now because they will be removed soon. + // Spec is the oci runtime spec used to run the container. + Spec *runtimespec.Spec } // State returns current state of the container based on the metadata. diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 3124fddd9..5a762a8bb 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -160,6 +160,7 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C // Update container CreatedAt. meta.CreatedAt = time.Now().UnixNano() + meta.Spec = spec // Add container into container store. if err := c.containerStore.Create(meta); err != nil { return nil, fmt.Errorf("failed to add container metadata %+v into store: %v", diff --git a/pkg/server/container_create_test.go b/pkg/server/container_create_test.go index 186987e8f..137577d6c 100644 --- a/pkg/server/container_create_test.go +++ b/pkg/server/container_create_test.go @@ -565,13 +565,6 @@ func TestCreateContainer(t *testing.T) { id := resp.GetContainerId() assert.True(t, rootExists) assert.Equal(t, getContainerRootDir(c.rootDir, id), rootPath, "root directory should be created") - meta, err := c.containerStore.Get(id) - assert.NoError(t, err) - require.NotNil(t, meta) - test.expectMeta.ID = id - // TODO(random-liu): Use fake clock to test CreatedAt. - test.expectMeta.CreatedAt = meta.CreatedAt - assert.Equal(t, test.expectMeta, meta, "container metadata should be created") // Check runtime spec containersCalls := fake.GetCalledDetails() @@ -593,5 +586,14 @@ func TestCreateContainer(t *testing.T) { Key: id, Parent: testChainID, }, prepareOpts, "prepare request should be correct") + + meta, err := c.containerStore.Get(id) + assert.NoError(t, err) + require.NotNil(t, meta) + test.expectMeta.ID = id + // TODO(random-liu): Use fake clock to test CreatedAt. + test.expectMeta.CreatedAt = meta.CreatedAt + test.expectMeta.Spec = spec + assert.Equal(t, test.expectMeta, meta, "container metadata should be created") } } diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index a96b85f6b..e08533e93 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -17,15 +17,136 @@ limitations under the License. package server import ( - "errors" + "bytes" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/task" + prototypes "github.com/gogo/protobuf/types" + "github.com/golang/glog" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" "golang.org/x/net/context" - runtime "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1" ) // ExecSync executes a command in the container, and returns the stdout output. // If command exits with a non-zero exit code, an error is returned. -func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (*runtime.ExecSyncResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSyncRequest) (retRes *runtime.ExecSyncResponse, retErr error) { + glog.V(2).Infof("ExecSync for %q with command %+v and timeout %d (s)", r.GetContainerId(), r.GetCmd(), r.GetTimeout()) + defer func() { + if retErr == nil { + glog.V(2).Infof("ExecSync for %q returns with exit code %d", r.GetContainerId(), retRes.GetExitCode()) + glog.V(4).Infof("ExecSync for %q outputs - stdout: %q, stderr: %q", r.GetContainerId(), + retRes.GetStdout(), retRes.GetStderr()) + } + }() + + // Get container config from container store. + meta, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + id := meta.ID + + if meta.State() != runtime.ContainerState_CONTAINER_RUNNING { + return nil, fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State())) + } + + // TODO(random-liu): Replace the following logic with containerd client and add unit test. + // Prepare streaming pipes. + execDir, err := ioutil.TempDir(getContainerRootDir(c.rootDir, id), "exec") + if err != nil { + return nil, fmt.Errorf("failed to create exec streaming directory: %v", err) + } + defer func() { + if err = c.os.RemoveAll(execDir); err != nil { + glog.Errorf("Failed to remove exec streaming directory %q: %v", execDir, err) + } + }() + _, stdout, stderr := getStreamingPipes(execDir) + _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) + if err != nil { + return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) + } + defer stdoutPipe.Close() + defer stderrPipe.Close() + + // Start redirecting exec output. + stdoutBuf, stderrBuf := new(bytes.Buffer), new(bytes.Buffer) + go io.Copy(stdoutBuf, stdoutPipe) // nolint: errcheck + go io.Copy(stderrBuf, stderrPipe) // nolint: errcheck + + // Get containerd event client first, so that we won't miss any events. + // TODO(random-liu): Handle this in event handler. Create an events client for + // each exec introduces unnecessary overhead. + cancellable, cancel := context.WithCancel(ctx) + events, err := c.taskService.Events(cancellable, &execution.EventsRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to get containerd event: %v", err) + } + + spec := &meta.Spec.Process + spec.Args = r.GetCmd() + rawSpec, err := json.Marshal(spec) + if err != nil { + return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err) + } + + resp, err := c.taskService.Exec(ctx, &execution.ExecRequest{ + ContainerID: id, + Terminal: false, + Stdout: stdout, + Stderr: stderr, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, + }, + }) + if err != nil { + return nil, fmt.Errorf("failed to exec in container %q: %v", id, err) + } + exitCode, err := waitContainerExec(cancel, events, id, resp.Pid, r.GetTimeout()) + if err != nil { + return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err) + } + + // TODO(random-liu): Make sure stdout/stderr are drained. + return &runtime.ExecSyncResponse{ + Stdout: stdoutBuf.Bytes(), + Stderr: stderrBuf.Bytes(), + ExitCode: int32(exitCode), + }, nil +} + +// waitContainerExec waits for container exec to finish and returns the exit code. +func waitContainerExec(cancel context.CancelFunc, events execution.Tasks_EventsClient, id string, + pid uint32, timeout int64) (uint32, error) { + // TODO(random-liu): [P1] Support ExecSync timeout. + // TODO(random-liu): Delete process after containerd upgrade. + defer func() { + // Stop events and drain the event channel. grpc-go#188 + cancel() + for { + _, err := events.Recv() + if err != nil { + break + } + } + }() + for { + e, err := events.Recv() + if err != nil { + // Return non-zero exit code just in case. + return unknownExitCode, err + } + if e.Type != task.Event_EXIT { + continue + } + if e.ID == id && e.Pid == pid { + return e.ExitStatus, nil + } + } } diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index c4583196f..430866abb 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -50,6 +50,8 @@ const ( completeExitReason = "Completed" // errorExitReason is the exit reason when container exits with code non-zero. errorExitReason = "Error" + // unknownExitCode is the exit code when exit reason is unknown. + unknownExitCode = 255 ) const (