From 73748840daddc24e2fc1113cbe13e200aa998914 Mon Sep 17 00:00:00 2001 From: Mike Brown Date: Mon, 24 Jul 2017 20:24:04 -0500 Subject: [PATCH] Swicth to 1.0.0-alpha2 containerd api. Signed-off-by: Mike Brown --- hack/test-cri.sh | 1 + hack/versions | 4 +- pkg/server/container_create.go | 28 ++++---- pkg/server/container_create_test.go | 2 +- pkg/server/container_execsync.go | 59 +++++++++-------- pkg/server/container_remove.go | 7 +- pkg/server/container_start.go | 16 ++--- pkg/server/container_stop.go | 10 +-- pkg/server/events.go | 49 ++++++++------ pkg/server/helpers.go | 6 +- pkg/server/image_pull.go | 35 ++++++++-- pkg/server/image_remove.go | 6 +- pkg/server/sandbox_list.go | 4 +- pkg/server/sandbox_remove.go | 11 ++-- pkg/server/sandbox_run.go | 43 ++++++------ pkg/server/sandbox_run_test.go | 2 +- pkg/server/sandbox_status.go | 4 +- pkg/server/sandbox_stop.go | 79 ++++++++++++++++++++--- pkg/server/service.go | 27 ++++---- pkg/server/testing/mock_version_client.go | 2 +- pkg/server/version_test.go | 2 +- 21 files changed, 246 insertions(+), 151 deletions(-) diff --git a/hack/test-cri.sh b/hack/test-cri.sh index 1006f5b29..604ff05eb 100755 --- a/hack/test-cri.sh +++ b/hack/test-cri.sh @@ -61,6 +61,7 @@ if [ ! -x "$(command -v containerd)" ]; then fi sudo pkill containerd sudo containerd -l debug &> ${REPORT_DIR}/containerd.log & +sleep 1 # sleep 1 seconds for containerd to be ready. # Start cri-containerd cd ${ROOT} diff --git a/hack/versions b/hack/versions index 66ff2ad34..5d9898380 100644 --- a/hack/versions +++ b/hack/versions @@ -1,4 +1,4 @@ -RUNC_VERSION=639454475cb9c8b861cc599f8bcd5c8c790ae402 +RUNC_VERSION=e775f0fba3ea329b8b766451c892c41a3d49594d CNI_VERSION=v0.4.0 -CONTAINERD_VERSION=8ed1e24ae925b5c6d8195858ee89dddb0507d65f +CONTAINERD_VERSION=2386062ce152d6f158d22be5991fe11c7cf67535 CRITEST_VERSION=74bbd4e142f752f13c648d9dde23defed3e472a2 diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index d4ce0aa6d..8a83efa49 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -22,7 +22,7 @@ import ( "strings" "time" - "github.com/containerd/containerd/api/services/containers" + "github.com/containerd/containerd/containers" prototypes "github.com/gogo/protobuf/types" "github.com/golang/glog" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -135,24 +135,22 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C }() // Create containerd container. - if _, err = c.containerService.Create(ctx, &containers.CreateContainerRequest{ - Container: containers.Container{ - ID: id, - // TODO(random-liu): Checkpoint metadata into container labels. - Image: image.ID, - Runtime: defaultRuntime, - Spec: &prototypes.Any{ - TypeUrl: runtimespec.Version, - Value: rawSpec, - }, - RootFS: id, + if _, err = c.containerService.Create(ctx, containers.Container{ + ID: id, + // TODO(random-liu): Checkpoint metadata into container labels. + Image: image.ID, + Runtime: containers.RuntimeInfo{Name: defaultRuntime}, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, }, + RootFS: id, }); err != nil { return nil, fmt.Errorf("failed to create containerd container: %v", err) } defer func() { if retErr != nil { - if _, err := c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil { + if err := c.containerService.Delete(ctx, id); err != nil { glog.Errorf("Failed to delete containerd container %q: %v", id, err) } } @@ -419,8 +417,8 @@ func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContaine g.SetLinuxResourcesCPUPeriod(uint64(resources.GetCpuPeriod())) g.SetLinuxResourcesCPUQuota(resources.GetCpuQuota()) g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares())) - g.SetLinuxResourcesMemoryLimit(uint64(resources.GetMemoryLimitInBytes())) - g.SetLinuxResourcesOOMScoreAdj(int(resources.GetOomScoreAdj())) + g.SetLinuxResourcesMemoryLimit(resources.GetMemoryLimitInBytes()) + g.SetProcessOOMScoreAdj(int(resources.GetOomScoreAdj())) } // setOCICapabilities adds/drops process capabilities. diff --git a/pkg/server/container_create_test.go b/pkg/server/container_create_test.go index aa7172852..86420db2b 100644 --- a/pkg/server/container_create_test.go +++ b/pkg/server/container_create_test.go @@ -128,7 +128,7 @@ func getCreateContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandbox assert.EqualValues(t, *spec.Linux.Resources.CPU.Quota, 200) assert.EqualValues(t, *spec.Linux.Resources.CPU.Shares, 300) assert.EqualValues(t, *spec.Linux.Resources.Memory.Limit, 400) - assert.EqualValues(t, *spec.Linux.Resources.OOMScoreAdj, 500) + assert.EqualValues(t, *spec.Process.OOMScoreAdj, 500) t.Logf("Check capabilities") assert.Contains(t, spec.Process.Capabilities.Bounding, "CAP_SYS_ADMIN") diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index c24602f76..87bc789db 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -23,9 +23,9 @@ import ( "io" "io/ioutil" - "github.com/containerd/containerd/api/services/containers" - "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/typeurl" prototypes "github.com/gogo/protobuf/types" "github.com/golang/glog" runtimespec "github.com/opencontainers/runtime-spec/specs-go" @@ -58,15 +58,15 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync } // Get exec process spec. - cntrResp, err := c.containerService.Get(ctx, &containers.GetContainerRequest{ID: id}) + container, err := c.containerService.Get(ctx, id) if err != nil { return nil, fmt.Errorf("failed to get container %q from containerd: %v", id, err) } var spec runtimespec.Spec - if err := json.Unmarshal(cntrResp.Container.Spec.Value, &spec); err != nil { + if err := json.Unmarshal(container.Spec.Value, &spec); err != nil { return nil, fmt.Errorf("failed to unmarshal container spec: %v", err) } - pspec := &spec.Process + pspec := spec.Process pspec.Args = r.GetCmd() rawSpec, err := json.Marshal(pspec) if err != nil { @@ -98,15 +98,16 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync go io.Copy(stderrBuf, stderrPipe) // nolint: errcheck // Get containerd event client first, so that we won't miss any events. - // TODO(random-liu): Handle this in event handler. Create an events client for - // each exec introduces unnecessary overhead. + // TODO(random-liu): Add filter to only subscribe events of the exec process. cancellable, cancel := context.WithCancel(ctx) - events, err := c.taskService.Events(cancellable, &execution.EventsRequest{}) + eventstream, err := c.eventService.Subscribe(cancellable, &events.SubscribeRequest{}) if err != nil { return nil, fmt.Errorf("failed to get containerd event: %v", err) } + defer cancel() - resp, err := c.taskService.Exec(ctx, &execution.ExecRequest{ + execID := generateID() + _, err = c.taskService.Exec(ctx, &tasks.ExecProcessRequest{ ContainerID: id, Terminal: false, Stdout: stdout, @@ -115,14 +116,22 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync TypeUrl: runtimespec.Version, Value: rawSpec, }, + ExecID: execID, }) if err != nil { return nil, fmt.Errorf("failed to exec in container %q: %v", id, err) } - exitCode, err := waitContainerExec(cancel, events, id, resp.Pid, r.GetTimeout()) + exitCode, err := c.waitContainerExec(eventstream, id, execID) if err != nil { return nil, fmt.Errorf("failed to wait for exec in container %q to finish: %v", id, err) } + if _, err := c.taskService.DeleteProcess(ctx, &tasks.DeleteProcessRequest{ + ContainerID: id, + ExecID: execID, + }); err != nil && !isContainerdGRPCNotFoundError(err) { + return nil, fmt.Errorf("failed to delete exec %q in container %q: %v", execID, id, err) + } + // TODO(random-liu): [P1] Deal with timeout, kill and wait again on timeout. // TODO(random-liu): Make sure stdout/stderr are drained. return &runtime.ExecSyncResponse{ @@ -133,30 +142,24 @@ func (c *criContainerdService) ExecSync(ctx context.Context, r *runtime.ExecSync } // waitContainerExec waits for container exec to finish and returns the exit code. -func waitContainerExec(cancel context.CancelFunc, events execution.Tasks_EventsClient, id string, - pid uint32, timeout int64) (uint32, error) { - // TODO(random-liu): [P1] Support ExecSync timeout. - // TODO(random-liu): Delete process after containerd upgrade. - defer func() { - // Stop events and drain the event channel. grpc-go#188 - cancel() - for { - _, err := events.Recv() - if err != nil { - break - } - } - }() +func (c *criContainerdService) waitContainerExec(eventstream events.Events_SubscribeClient, id string, + execID string) (uint32, error) { for { - e, err := events.Recv() + evt, err := eventstream.Recv() if err != nil { // Return non-zero exit code just in case. return unknownExitCode, err } - if e.Type != task.Event_EXIT { + // Continue until the event received is of type task exit. + if !typeurl.Is(evt.Event, &events.TaskExit{}) { continue } - if e.ID == id && e.Pid == pid { + any, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + return unknownExitCode, err + } + e := any.(*events.TaskExit) + if e.ContainerID == id && e.ID == execID { return e.ExitStatus, nil } } diff --git a/pkg/server/container_remove.go b/pkg/server/container_remove.go index 5897d7f9b..3a2575ac9 100644 --- a/pkg/server/container_remove.go +++ b/pkg/server/container_remove.go @@ -19,8 +19,7 @@ package server import ( "fmt" - "github.com/containerd/containerd/api/services/containers" - "github.com/containerd/containerd/snapshot" + "github.com/containerd/containerd/errdefs" "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -71,7 +70,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R // Remove container snapshot. if err := c.snapshotService.Remove(ctx, id); err != nil { - if !snapshot.IsNotExist(err) { + if !errdefs.IsNotFound(err) { return nil, fmt.Errorf("failed to remove container snapshot %q: %v", id, err) } glog.V(5).Infof("Remove called for snapshot %q that does not exist", id) @@ -89,7 +88,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R } // Delete containerd container. - if _, err := c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil { + if err := c.containerService.Delete(ctx, id); err != nil { if !isContainerdGRPCNotFoundError(err) { return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err) } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 32a2bcbe5..0eafb3114 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -23,8 +23,8 @@ import ( "path/filepath" "time" - "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/api/types/mount" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/api/types/task" "github.com/golang/glog" "golang.org/x/net/context" @@ -97,7 +97,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me sandboxConfig := sandbox.Config sandboxID := meta.SandboxID // Make sure sandbox is running. - sandboxInfo, err := c.taskService.Info(ctx, &execution.InfoRequest{ContainerID: sandboxID}) + sandboxInfo, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: sandboxID}) if err != nil { return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err) } @@ -153,9 +153,9 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me if err != nil { return fmt.Errorf("failed to get rootfs mounts %q: %v", id, err) } - var rootfs []*mount.Mount + var rootfs []*types.Mount for _, m := range rootfsMounts { - rootfs = append(rootfs, &mount.Mount{ + rootfs = append(rootfs, &types.Mount{ Type: m.Type, Source: m.Source, Options: m.Options, @@ -163,7 +163,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me } // Create containerd task. - createOpts := &execution.CreateRequest{ + createOpts := &tasks.CreateTaskRequest{ ContainerID: id, Rootfs: rootfs, Stdin: stdin, @@ -180,14 +180,14 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me defer func() { if retErr != nil { // Cleanup the containerd task if an error is returned. - if _, err := c.taskService.Delete(ctx, &execution.DeleteRequest{ContainerID: id}); err != nil { + if _, err := c.taskService.Delete(ctx, &tasks.DeleteTaskRequest{ContainerID: id}); err != nil { glog.Errorf("Failed to delete containerd task %q: %v", id, err) } } }() // Start containerd task. - if _, err := c.taskService.Start(ctx, &execution.StartRequest{ContainerID: id}); err != nil { + if _, err := c.taskService.Start(ctx, &tasks.StartTaskRequest{ContainerID: id}); err != nil { return fmt.Errorf("failed to start containerd task %q: %v", id, err) } diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 299550d4b..52593f920 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -20,7 +20,7 @@ import ( "fmt" "time" - "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/services/tasks/v1" "github.com/docker/docker/pkg/signal" "github.com/golang/glog" "golang.org/x/net/context" @@ -94,10 +94,10 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont } } glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal) - _, err = c.taskService.Kill(ctx, &execution.KillRequest{ + _, err = c.taskService.Kill(ctx, &tasks.KillRequest{ ContainerID: id, Signal: uint32(stopSignal), - PidOrAll: &execution.KillRequest_All{All: true}, + All: true, }) if err != nil { if !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) { @@ -115,10 +115,10 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont // Event handler will Delete the container from containerd after it handles the Exited event. glog.V(2).Infof("Kill container %q", id) - _, err := c.taskService.Kill(ctx, &execution.KillRequest{ + _, err := c.taskService.Kill(ctx, &tasks.KillRequest{ ContainerID: id, Signal: uint32(unix.SIGKILL), - PidOrAll: &execution.KillRequest_All{All: true}, + All: true, }) if err != nil { if !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) { diff --git a/pkg/server/events.go b/pkg/server/events.go index 0d9c96914..c26f3738c 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -19,8 +19,9 @@ package server import ( "time" - "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/typeurl" "github.com/golang/glog" "github.com/jpillora/backoff" "golang.org/x/net/context" @@ -48,7 +49,7 @@ func (c *criContainerdService) startEventMonitor() { } go func() { for { - events, err := c.taskService.Events(context.Background(), &execution.EventsRequest{}) + eventstream, err := c.eventService.Subscribe(context.Background(), &events.SubscribeRequest{}) if err != nil { glog.Errorf("Failed to connect to containerd event stream: %v", err) time.Sleep(b.Duration()) @@ -59,7 +60,7 @@ func (c *criContainerdService) startEventMonitor() { // TODO(random-liu): Relist to recover state, should prevent other operations // until state is fully recovered. for { - if err := c.handleEventStream(events); err != nil { + if err := c.handleEventStream(eventstream); err != nil { glog.Errorf("Failed to handle event stream: %v", err) break } @@ -69,27 +70,34 @@ func (c *criContainerdService) startEventMonitor() { } // handleEventStream receives an event from containerd and handles the event. -func (c *criContainerdService) handleEventStream(events execution.Tasks_EventsClient) error { - e, err := events.Recv() +func (c *criContainerdService) handleEventStream(eventstream events.Events_SubscribeClient) error { + e, err := eventstream.Recv() if err != nil { return err } - glog.V(2).Infof("Received container event: %+v", e) + glog.V(4).Infof("Received container event timestamp - %v, namespace - %q, topic - %q", e.Timestamp, e.Namespace, e.Topic) c.handleEvent(e) return nil } // handleEvent handles a containerd event. -func (c *criContainerdService) handleEvent(e *task.Event) { - switch e.Type { +func (c *criContainerdService) handleEvent(evt *events.Envelope) { + any, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + glog.Errorf("Failed to convert event envelope %+v: %v", evt, err) + return + } + switch any.(type) { // If containerd-shim exits unexpectedly, there will be no corresponding event. // However, containerd could not retrieve container state in that case, so it's // fine to leave out that case for now. // TODO(random-liu): [P2] Handle containerd-shim exit. - case task.Event_EXIT: - cntr, err := c.containerStore.Get(e.ID) + case *events.TaskExit: + e := any.(*events.TaskExit) + glog.V(2).Infof("TaskExit event %+v", e) + cntr, err := c.containerStore.Get(e.ContainerID) if err != nil { - glog.Errorf("Failed to get container %q: %v", e.ID, err) + glog.Errorf("Failed to get container %q: %v", e.ContainerID, err) return } if e.Pid != cntr.Status.Get().Pid { @@ -97,10 +105,11 @@ func (c *criContainerdService) handleEvent(e *task.Event) { return } // Delete the container from containerd. - _, err = c.taskService.Delete(context.Background(), &execution.DeleteRequest{ContainerID: e.ID}) + _, err = c.taskService.Delete(context.Background(), &tasks.DeleteTaskRequest{ContainerID: e.ContainerID}) + // TODO(random-liu): Change isContainerdGRPCNotFoundError to use errdefs. if err != nil && !isContainerdGRPCNotFoundError(err) { // TODO(random-liu): [P0] Enqueue the event and retry. - glog.Errorf("Failed to delete container %q: %v", e.ID, err) + glog.Errorf("Failed to delete container %q: %v", e.ContainerID, err) return } err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { @@ -115,21 +124,23 @@ func (c *criContainerdService) handleEvent(e *task.Event) { return status, nil }) if err != nil { - glog.Errorf("Failed to update container %q state: %v", e.ID, err) + glog.Errorf("Failed to update container %q state: %v", e.ContainerID, err) // TODO(random-liu): [P0] Enqueue the event and retry. return } - case task.Event_OOM: - cntr, err := c.containerStore.Get(e.ID) + case *events.TaskOOM: + e := any.(*events.TaskOOM) + glog.V(2).Infof("TaskOOM event %+v", e) + cntr, err := c.containerStore.Get(e.ContainerID) if err != nil { - glog.Errorf("Failed to get container %q: %v", e.ID, err) + glog.Errorf("Failed to get container %q: %v", e.ContainerID, err) } err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { status.Reason = oomExitReason return status, nil }) if err != nil { - glog.Errorf("Failed to update container %q oom: %v", e.ID, err) + glog.Errorf("Failed to update container %q oom: %v", e.ContainerID, err) return } } diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index f95a7b382..3dc59762b 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -25,7 +25,7 @@ import ( "strings" "syscall" - containerdmetadata "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/errdefs" "github.com/docker/distribution/reference" "github.com/docker/docker/pkg/stringid" imagedigest "github.com/opencontainers/go-digest" @@ -69,7 +69,7 @@ const ( relativeRootfsPath = "rootfs" // defaultRuntime is the runtime to use in containerd. We may support // other runtime in the future. - defaultRuntime = "linux" + defaultRuntime = "io.containerd.runtime.v1.linux" // sandboxesDir contains all sandbox root. A sandbox root is the running // directory of the sandbox, all files created for the sandbox will be // placed under this directory. @@ -339,7 +339,7 @@ func (c *criContainerdService) localResolve(ctx context.Context, ref string) (*i } imageInContainerd, err := c.imageStoreService.Get(ctx, normalized.String()) if err != nil { - if containerdmetadata.IsNotFound(err) { + if errdefs.IsNotFound(err) { return nil, nil } return nil, fmt.Errorf("an error occurred when getting image %q from containerd image store: %v", diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index 6eb4992df..f39eb0370 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -27,6 +27,7 @@ import ( "time" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" containerdimages "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" @@ -279,9 +280,8 @@ func (c *criContainerdService) pullImage(ctx context.Context, rawRef string, aut if r == "" { continue } - if err := c.imageStoreService.Put(ctx, r, desc); err != nil { - return "", "", "", fmt.Errorf("failed to put image reference %q desc %v into containerd image store: %v", - r, desc, err) + if err := c.createImageReference(ctx, r, desc); err != nil { + return "", "", "", fmt.Errorf("failed to update image reference %q: %v", r, err) } } // Do not cleanup if following operations fail so as to make resumable download possible. @@ -331,13 +331,34 @@ func (c *criContainerdService) pullImage(ctx context.Context, rawRef string, aut // Use config digest as imageID to conform to oci image spec, and also add image id as // image reference. imageID := configDesc.Digest.String() - if err := c.imageStoreService.Put(ctx, imageID, desc); err != nil { - return "", "", "", fmt.Errorf("failed to put image id %q into containerd image store: %v", - imageID, err) + if err := c.createImageReference(ctx, imageID, desc); err != nil { + return "", "", "", fmt.Errorf("failed to update image id %q: %v", imageID, err) } return imageID, repoTag, repoDigest, nil } +// createImageReference creates image reference inside containerd image store. +// Note that because create and update are not finished in one transaction, there could be race. E.g. +// the image reference is deleted by someone else after create returns already exists, but before update +// happens. +func (c *criContainerdService) createImageReference(ctx context.Context, name string, desc imagespec.Descriptor) error { + img := containerdimages.Image{ + Name: name, + Target: desc, + } + // TODO(random-liu): Figure out which is the more performant sequence create then update or + // update then create. + _, err := c.imageStoreService.Create(ctx, img) + if err == nil { + return nil + } + if err != nil && !errdefs.IsAlreadyExists(err) { + return err + } + _, err = c.imageStoreService.Update(ctx, img, "target") + return err +} + // waitDownloadingPollInterval is the interval to check resource downloading progress. const waitDownloadingPollInterval = 200 * time.Millisecond @@ -350,7 +371,7 @@ func (c *criContainerdService) waitForResourcesDownloading(ctx context.Context, case <-ticker.C: // TODO(random-liu): Use better regexp when containerd `MakeRefKey` contains more // information. - statuses, err := c.contentStoreService.Status(ctx, "") + statuses, err := c.contentStoreService.ListStatuses(ctx, "") if err != nil { return fmt.Errorf("failed to get content status: %v", err) } diff --git a/pkg/server/image_remove.go b/pkg/server/image_remove.go index 6e3e23956..4f319cbd0 100644 --- a/pkg/server/image_remove.go +++ b/pkg/server/image_remove.go @@ -19,7 +19,7 @@ package server import ( "fmt" - containerdmetadata "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/errdefs" "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -51,8 +51,10 @@ func (c *criContainerdService) RemoveImage(ctx context.Context, r *runtime.Remov for _, ref := range append(append(image.RepoTags, image.RepoDigests...), image.ID) { // TODO(random-liu): Containerd should schedule a garbage collection immediately, // and we may want to wait for the garbage collection to be over here. + // TODO(random-liu): Should check whether descriptor is as expected before delete, + // so as to avoid deleting new reference because of staled reference. err = c.imageStoreService.Delete(ctx, ref) - if err == nil || containerdmetadata.IsNotFound(err) { + if err == nil || errdefs.IsNotFound(err) { continue } return nil, fmt.Errorf("failed to delete image reference %q for image %q: %v", ref, image.ID, err) diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go index fea4f8bf0..71ac6aa1f 100644 --- a/pkg/server/sandbox_list.go +++ b/pkg/server/sandbox_list.go @@ -22,7 +22,7 @@ import ( "github.com/golang/glog" "golang.org/x/net/context" - "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/types/task" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -42,7 +42,7 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li // List all sandboxes from store. sandboxesInStore := c.sandboxStore.List() - resp, err := c.taskService.List(ctx, &execution.ListRequest{}) + resp, err := c.taskService.List(ctx, &tasks.ListTasksRequest{}) if err != nil { return nil, fmt.Errorf("failed to list sandbox containers: %v", err) } diff --git a/pkg/server/sandbox_remove.go b/pkg/server/sandbox_remove.go index c064ddce8..8d2cb8e77 100644 --- a/pkg/server/sandbox_remove.go +++ b/pkg/server/sandbox_remove.go @@ -19,9 +19,8 @@ package server import ( "fmt" - "github.com/containerd/containerd/api/services/containers" - "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/snapshot" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/errdefs" "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -55,7 +54,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. // Return error if sandbox container is not fully stopped. // TODO(random-liu): [P0] Make sure network is torn down, may need to introduce a state. - _, err = c.taskService.Info(ctx, &execution.InfoRequest{ContainerID: id}) + _, err = c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id}) if err != nil && !isContainerdGRPCNotFoundError(err) { return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) } @@ -65,7 +64,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. // Remove sandbox container snapshot. if err := c.snapshotService.Remove(ctx, id); err != nil { - if !snapshot.IsNotExist(err) { + if !errdefs.IsNotFound(err) { return nil, fmt.Errorf("failed to remove sandbox container snapshot %q: %v", id, err) } glog.V(5).Infof("Remove called for snapshot %q that does not exist", id) @@ -97,7 +96,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. } // Delete sandbox container. - if _, err := c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil { + if err := c.containerService.Delete(ctx, id); err != nil { if !isContainerdGRPCNotFoundError(err) { return nil, fmt.Errorf("failed to delete sandbox container %q: %v", id, err) } diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index adb3b57e9..6a15130cf 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -23,9 +23,9 @@ import ( "strings" "time" - "github.com/containerd/containerd/api/services/containers" - "github.com/containerd/containerd/api/services/execution" - "github.com/containerd/containerd/api/types/mount" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types" + "github.com/containerd/containerd/containers" prototypes "github.com/gogo/protobuf/types" "github.com/golang/glog" imagespec "github.com/opencontainers/image-spec/specs-go/v1" @@ -90,9 +90,9 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } } }() - var rootfs []*mount.Mount + var rootfs []*types.Mount for _, m := range rootfsMounts { - rootfs = append(rootfs, &mount.Mount{ + rootfs = append(rootfs, &types.Mount{ Type: m.Type, Source: m.Source, Options: m.Options, @@ -109,24 +109,22 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err) } glog.V(4).Infof("Sandbox container spec: %+v", spec) - if _, err = c.containerService.Create(ctx, &containers.CreateContainerRequest{ - Container: containers.Container{ - ID: id, - // TODO(random-liu): Checkpoint metadata into container labels. - Image: image.ID, - Runtime: defaultRuntime, - Spec: &prototypes.Any{ - TypeUrl: runtimespec.Version, - Value: rawSpec, - }, - RootFS: id, + if _, err = c.containerService.Create(ctx, containers.Container{ + ID: id, + // TODO(random-liu): Checkpoint metadata into container labels. + Image: image.ID, + Runtime: containers.RuntimeInfo{Name: defaultRuntime}, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, }, + RootFS: id, }); err != nil { return nil, fmt.Errorf("failed to create containerd container: %v", err) } defer func() { if retErr != nil { - if _, err := c.containerService.Delete(ctx, &containers.DeleteContainerRequest{ID: id}); err != nil { + if err := c.containerService.Delete(ctx, id); err != nil { glog.Errorf("Failed to delete containerd container%q: %v", id, err) } } @@ -181,7 +179,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } }() - createOpts := &execution.CreateRequest{ + createOpts := &tasks.CreateTaskRequest{ ContainerID: id, Rootfs: rootfs, // No stdin for sandbox container. @@ -199,9 +197,8 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run defer func() { if retErr != nil { // Cleanup the sandbox container if an error is returned. - if _, err = c.taskService.Delete(ctx, &execution.DeleteRequest{ContainerID: id}); err != nil { - glog.Errorf("Failed to delete sandbox container %q: %v", - id, err) + if err := c.stopSandboxContainer(ctx, id); err != nil { + glog.Errorf("Failed to delete sandbox container %q: %v", id, err) } } }() @@ -226,7 +223,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } // Start sandbox container in containerd. - if _, err := c.taskService.Start(ctx, &execution.StartRequest{ContainerID: id}); err != nil { + if _, err := c.taskService.Start(ctx, &tasks.StartTaskRequest{ContainerID: id}); err != nil { return nil, fmt.Errorf("failed to start sandbox container %q: %v", id, err) } @@ -314,7 +311,7 @@ func (c *criContainerdService) generateSandboxContainerSpec(id string, config *r // TODO(random-liu): [P2] Set apparmor and seccomp from annotations. g.SetLinuxResourcesCPUShares(uint64(defaultSandboxCPUshares)) - g.SetLinuxResourcesOOMScoreAdj(int(defaultSandboxOOMAdj)) + g.SetProcessOOMScoreAdj(int(defaultSandboxOOMAdj)) return g.Spec(), nil } diff --git a/pkg/server/sandbox_run_test.go b/pkg/server/sandbox_run_test.go index 6fe05cb31..ae281c166 100644 --- a/pkg/server/sandbox_run_test.go +++ b/pkg/server/sandbox_run_test.go @@ -60,7 +60,7 @@ func getRunPodSandboxTestData() (*runtime.PodSandboxConfig, *imagespec.ImageConf assert.Equal(t, []string{"/pause", "forever"}, spec.Process.Args) assert.Equal(t, "/workspace", spec.Process.Cwd) assert.EqualValues(t, *spec.Linux.Resources.CPU.Shares, defaultSandboxCPUshares) - assert.EqualValues(t, *spec.Linux.Resources.OOMScoreAdj, defaultSandboxOOMAdj) + assert.EqualValues(t, *spec.Process.OOMScoreAdj, defaultSandboxOOMAdj) } return config, imageConfig, specCheck } diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index b9364629d..9ddbb2a49 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -22,7 +22,7 @@ import ( "github.com/golang/glog" "golang.org/x/net/context" - "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/types/task" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -47,7 +47,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. // Use the full sandbox id. id := sandbox.ID - info, err := c.taskService.Info(ctx, &execution.InfoRequest{ContainerID: id}) + info, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id}) if err != nil && !isContainerdGRPCNotFoundError(err) { return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) } diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 4b9157dcd..3d6b28cf3 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -20,11 +20,13 @@ import ( "fmt" "os" + "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/typeurl" "github.com/golang/glog" "golang.org/x/net/context" - - "github.com/containerd/containerd/api/services/execution" - + "golang.org/x/sys/unix" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) @@ -77,16 +79,73 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St glog.V(2).Infof("TearDown network for sandbox %q successfully", id) sandboxRoot := getSandboxRootDir(c.rootDir, id) - if err = c.unmountSandboxFiles(sandboxRoot, sandbox.Config); err != nil { + if err := c.unmountSandboxFiles(sandboxRoot, sandbox.Config); err != nil { return nil, fmt.Errorf("failed to unmount sandbox files in %q: %v", sandboxRoot, err) } - // TODO(random-liu): [P1] Handle sandbox container graceful deletion. - // Delete the sandbox container from containerd. - _, err = c.taskService.Delete(ctx, &execution.DeleteRequest{ContainerID: id}) - if err != nil && !isContainerdGRPCNotFoundError(err) { - return nil, fmt.Errorf("failed to delete sandbox container %q: %v", id, err) + if err := c.stopSandboxContainer(ctx, id); err != nil { + return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err) } - return &runtime.StopPodSandboxResponse{}, nil } + +// stopSandboxContainer kills and deletes sandbox container. +func (c *criContainerdService) stopSandboxContainer(ctx context.Context, id string) error { + cancellable, cancel := context.WithCancel(ctx) + eventstream, err := c.eventService.Subscribe(cancellable, &events.SubscribeRequest{}) + if err != nil { + return fmt.Errorf("failed to get containerd event: %v", err) + } + defer cancel() + + resp, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id}) + if err != nil { + if isContainerdGRPCNotFoundError(err) { + return nil + } + return fmt.Errorf("failed to get sandbox container: %v", err) + } + if resp.Task.Status != task.StatusStopped { + // TODO(random-liu): [P1] Handle sandbox container graceful deletion. + if _, err := c.taskService.Kill(ctx, &tasks.KillRequest{ + ContainerID: id, + Signal: uint32(unix.SIGKILL), + All: true, + }); err != nil && !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) { + return fmt.Errorf("failed to kill sandbox container: %v", err) + } + + if err := c.waitSandboxContainer(eventstream, id, resp.Task.Pid); err != nil { + return fmt.Errorf("failed to wait for pod sandbox to stop: %v", err) + } + } + + // Delete the sandbox container from containerd. + _, err = c.taskService.Delete(ctx, &tasks.DeleteTaskRequest{ContainerID: id}) + if err != nil && !isContainerdGRPCNotFoundError(err) { + return fmt.Errorf("failed to delete sandbox container: %v", err) + } + return nil +} + +// waitSandboxContainer wait sandbox container stop event. +func (c *criContainerdService) waitSandboxContainer(eventstream events.Events_SubscribeClient, id string, pid uint32) error { + for { + evt, err := eventstream.Recv() + if err != nil { + return err + } + // Continue until the event received is of type task exit. + if !typeurl.Is(evt.Event, &events.TaskExit{}) { + continue + } + any, err := typeurl.UnmarshalAny(evt.Event) + if err != nil { + return err + } + e := any.(*events.TaskExit) + if e.ContainerID == id && e.Pid == pid { + return nil + } + } +} diff --git a/pkg/server/service.go b/pkg/server/service.go index eb189bf3c..40c2087b6 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -20,9 +20,10 @@ import ( "fmt" "github.com/containerd/containerd" - "github.com/containerd/containerd/api/services/containers" - "github.com/containerd/containerd/api/services/execution" - versionapi "github.com/containerd/containerd/api/services/version" + "github.com/containerd/containerd/api/services/events/v1" + "github.com/containerd/containerd/api/services/tasks/v1" + versionapi "github.com/containerd/containerd/api/services/version/v1" + "github.com/containerd/containerd/containers" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" diffservice "github.com/containerd/containerd/services/diff" @@ -71,9 +72,9 @@ type criContainerdService struct { // imageStore stores all resources associated with images. imageStore *imagestore.Store // containerService is containerd containers client. - containerService containers.ContainersClient + containerService containers.Store // taskService is containerd tasks client. - taskService execution.TasksClient + taskService tasks.TasksClient // contentStoreService is the containerd content service client. contentStoreService content.Store // snapshotService is the containerd snapshot service client. @@ -93,6 +94,8 @@ type criContainerdService struct { agentFactory agents.AgentFactory // client is an instance of the containerd client client *containerd.Client + // eventsService is the containerd task service client + eventService events.EventsClient } // NewCRIContainerdService returns a new instance of CRIContainerdService @@ -117,12 +120,14 @@ func NewCRIContainerdService(containerdEndpoint, rootDir, networkPluginBinDir, n taskService: client.TaskService(), imageStoreService: client.ImageService(), contentStoreService: client.ContentStore(), - snapshotService: client.SnapshotService(), - diffService: client.DiffService(), - versionService: client.VersionService(), - healthService: client.HealthService(), - agentFactory: agents.NewAgentFactory(), - client: client, + // Use daemon default snapshotter. + snapshotService: client.SnapshotService(""), + diffService: client.DiffService(), + versionService: client.VersionService(), + healthService: client.HealthService(), + agentFactory: agents.NewAgentFactory(), + client: client, + eventService: client.EventService(), } netPlugin, err := ocicni.InitCNI(networkPluginBinDir, networkPluginConfDir) diff --git a/pkg/server/testing/mock_version_client.go b/pkg/server/testing/mock_version_client.go index b33723b9a..d6e65360e 100644 --- a/pkg/server/testing/mock_version_client.go +++ b/pkg/server/testing/mock_version_client.go @@ -20,7 +20,7 @@ limitations under the License. package testing import ( - version "github.com/containerd/containerd/api/services/version" + version "github.com/containerd/containerd/api/services/version/v1" gomock "github.com/golang/mock/gomock" empty "github.com/golang/protobuf/ptypes/empty" context "golang.org/x/net/context" diff --git a/pkg/server/version_test.go b/pkg/server/version_test.go index 8f7273867..593dbe9c5 100644 --- a/pkg/server/version_test.go +++ b/pkg/server/version_test.go @@ -20,7 +20,7 @@ import ( "errors" "testing" - versionapi "github.com/containerd/containerd/api/services/version" + versionapi "github.com/containerd/containerd/api/services/version/v1" "github.com/golang/mock/gomock" "github.com/golang/protobuf/ptypes/empty" "github.com/stretchr/testify/assert"