Containerd client integration

This commit:
1) Replaces the usage of containerd GRPC APIs with the containerd client for all operations related to containerd.
2) Updates containerd to v1.0alpha4+
3) Updates runc to v1.0.0

Signed-off-by: Abhinandan Prativadi <abhi@docker.com>
This commit is contained in:
Abhinandan Prativadi
2017-08-04 09:55:36 -07:00
parent 2427d332f0
commit 32e0313418
170 changed files with 5819 additions and 7262 deletions

View File

@@ -17,14 +17,11 @@ limitations under the License.
package server
import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/devices"
@@ -94,29 +91,15 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
if err != nil {
return nil, fmt.Errorf("failed to generate container %q spec: %v", id, err)
}
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}
glog.V(4).Infof("Container spec: %+v", spec)
var opts []containerd.NewContainerOpts
// Prepare container rootfs.
if config.GetLinux().GetSecurityContext().GetReadonlyRootfs() {
if _, err := c.snapshotService.View(ctx, id, image.ChainID); err != nil {
return nil, fmt.Errorf("failed to view container rootfs %q: %v", image.ChainID, err)
}
opts = append(opts, containerd.WithNewSnapshotView(id, image.Image))
} else {
if _, err := c.snapshotService.Prepare(ctx, id, image.ChainID); err != nil {
return nil, fmt.Errorf("failed to prepare container rootfs %q: %v", image.ChainID, err)
}
opts = append(opts, containerd.WithNewSnapshot(id, image.Image))
}
defer func() {
if retErr != nil {
if err := c.snapshotService.Remove(ctx, id); err != nil {
glog.Errorf("Failed to remove container snapshot %q: %v", id, err)
}
}
}()
meta.ImageRef = image.ID
// Create container root directory.
@@ -135,29 +118,22 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
}
}()
// Create containerd container.
if _, err = c.containerService.Create(ctx, containers.Container{
ID: id,
// TODO(random-liu): Checkpoint metadata into container labels.
Image: image.ID,
Runtime: containers.RuntimeInfo{Name: defaultRuntime},
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
RootFS: id,
}); err != nil {
opts = append(opts, containerd.WithSpec(spec), containerd.WithRuntime(defaultRuntime))
var cntr containerd.Container
if cntr, err = c.client.NewContainer(ctx, id, opts...); err != nil {
return nil, fmt.Errorf("failed to create containerd container: %v", err)
}
defer func() {
if retErr != nil {
if err := c.containerService.Delete(ctx, id); err != nil {
if err := cntr.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
glog.Errorf("Failed to delete containerd container %q: %v", id, err)
}
}
}()
container, err := containerstore.NewContainer(meta, containerstore.Status{CreatedAt: time.Now().UnixNano()})
container, err := containerstore.NewContainer(meta,
containerstore.Status{CreatedAt: time.Now().UnixNano()},
containerstore.WithContainer(cntr))
if err != nil {
return nil, fmt.Errorf("failed to create internal container object for %q: %v",
id, err)

View File

@@ -19,6 +19,7 @@ package server
import (
"fmt"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/golang/glog"
"golang.org/x/net/context"
@@ -68,14 +69,6 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
// kubelet implementation, we'll never start a container once we decide to remove it,
// so we don't need the "Dead" state for now.
// Remove container snapshot.
if err := c.snapshotService.Remove(ctx, id); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to remove container snapshot %q: %v", id, err)
}
glog.V(5).Infof("Remove called for snapshot %q that does not exist", id)
}
containerRootDir := getContainerRootDir(c.rootDir, id)
if err := c.os.RemoveAll(containerRootDir); err != nil {
return nil, fmt.Errorf("failed to remove container root directory %q: %v",
@@ -88,8 +81,8 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
}
// Delete containerd container.
if err := c.containerService.Delete(ctx, id); err != nil {
if !isContainerdGRPCNotFoundError(err) {
if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err)
}
glog.V(5).Infof("Remove called for containerd container %q that does not exist", id, err)

View File

@@ -17,15 +17,13 @@ limitations under the License.
package server
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -54,7 +52,7 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St
if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
// Always apply status change no matter startContainer fails or not. Because startContainer
// may change container state no matter it fails or succeeds.
startErr = c.startContainer(ctx, id, container.Metadata, &status)
startErr = c.startContainer(ctx, container.Container, container.Metadata, &status)
return status, nil
}); startErr != nil {
return nil, startErr
@@ -66,13 +64,17 @@ func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.St
// startContainer actually starts the container. The function needs to be run in one transaction. Any updates
// to the status passed in will be applied no matter the function returns error or not.
func (c *criContainerdService) startContainer(ctx context.Context, id string, meta containerstore.Metadata, status *containerstore.Status) (retErr error) {
func (c *criContainerdService) startContainer(ctx context.Context,
container containerd.Container,
meta containerstore.Metadata,
status *containerstore.Status) (retErr error) {
config := meta.Config
id := container.ID()
// Return error if container is not in created state.
if status.State() != runtime.ContainerState_CONTAINER_CREATED {
return fmt.Errorf("container %q is in %s state", id, criContainerStateToString(status.State()))
}
// Do not start the container when there is a removal in progress.
if status.Removing {
return fmt.Errorf("container %q is in removing state", id)
@@ -97,102 +99,65 @@ func (c *criContainerdService) startContainer(ctx context.Context, id string, me
sandboxConfig := sandbox.Config
sandboxID := meta.SandboxID
// Make sure sandbox is running.
sandboxInfo, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: sandboxID})
s, err := sandbox.Container.Task(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err)
}
// This is only a best effort check, sandbox may still exit after this. If sandbox fails
// before starting the container, the start will fail.
if sandboxInfo.Task.Status != task.StatusRunning {
taskStatus, err := s.Status(ctx)
if err != nil {
return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err)
}
if taskStatus.Status != containerd.Running {
return fmt.Errorf("sandbox container %q is not running", sandboxID)
}
containerRootDir := getContainerRootDir(c.rootDir, id)
stdin, stdout, stderr := getStreamingPipes(containerRootDir)
// Set stdin to empty if Stdin == false.
if !config.GetStdin() {
stdin = ""
}
stdinPipe, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, stdin, stdout, stderr)
if err != nil {
return fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
defer func() {
if retErr != nil {
if stdinPipe != nil {
stdinPipe.Close()
}
stdoutPipe.Close()
stderrPipe.Close()
}
}()
// Redirect the stream to std for now.
// TODO(random-liu): [P1] Support StdinOnce after container logging is added.
if stdinPipe != nil {
go func(w io.WriteCloser) {
io.Copy(w, os.Stdin) // nolint: errcheck
w.Close()
}(stdinPipe)
}
rStdoutPipe, wStdoutPipe := io.Pipe()
rStderrPipe, wStderrPipe := io.Pipe()
stdin := new(bytes.Buffer)
defer func() {
if retErr != nil {
rStdoutPipe.Close()
rStderrPipe.Close()
}
}()
if config.GetLogPath() != "" {
// Only generate container log when log path is specified.
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stdout, stdoutPipe).Start(); err != nil {
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stdout, rStdoutPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stdout logger: %v", err)
}
// Only redirect stderr when there is no tty.
if !config.GetTty() {
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stderr, stderrPipe).Start(); err != nil {
if err = c.agentFactory.NewContainerLogger(logPath, agents.Stderr, rStderrPipe).Start(); err != nil {
return fmt.Errorf("failed to start container stderr logger: %v", err)
}
}
}
// Get rootfs mounts.
rootfsMounts, err := c.snapshotService.Mounts(ctx, id)
if err != nil {
return fmt.Errorf("failed to get rootfs mounts %q: %v", id, err)
}
var rootfs []*types.Mount
for _, m := range rootfsMounts {
rootfs = append(rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
}
// Create containerd task.
createOpts := &tasks.CreateTaskRequest{
ContainerID: id,
Rootfs: rootfs,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Terminal: config.GetTty(),
}
glog.V(5).Infof("Create containerd task (id=%q, name=%q) with options %+v.",
id, meta.Name, createOpts)
createResp, err := c.taskService.Create(ctx, createOpts)
//TODO(Abhi): close stdin/pass a managed IOCreation
task, err := container.NewTask(ctx, containerd.NewIO(stdin, wStdoutPipe, wStderrPipe))
if err != nil {
return fmt.Errorf("failed to create containerd task: %v", err)
}
defer func() {
if retErr != nil {
// Cleanup the containerd task if an error is returned.
if _, err := c.taskService.Delete(ctx, &tasks.DeleteTaskRequest{ContainerID: id}); err != nil {
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil {
glog.Errorf("Failed to delete containerd task %q: %v", id, err)
}
}
}()
// Start containerd task.
if _, err := c.taskService.Start(ctx, &tasks.StartTaskRequest{ContainerID: id}); err != nil {
if err := task.Start(ctx); err != nil {
return fmt.Errorf("failed to start containerd task %q: %v", id, err)
}
// Update container start timestamp.
status.Pid = createResp.Pid
status.Pid = task.Pid()
status.StartedAt = time.Now().UnixNano()
return nil
}

View File

@@ -20,7 +20,7 @@ import (
"fmt"
"time"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/errdefs"
"github.com/docker/docker/pkg/signal"
"github.com/golang/glog"
"golang.org/x/net/context"
@@ -94,16 +94,20 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
}
}
glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal)
_, err = c.taskService.Kill(ctx, &tasks.KillRequest{
ContainerID: id,
Signal: uint32(stopSignal),
All: true,
})
task, err := container.Container.Task(ctx, nil)
if err != nil {
if !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) {
return fmt.Errorf("failed to stop container %q: %v", id, err)
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)
}
return nil
}
if task != nil {
if err = task.Kill(ctx, stopSignal); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container %q: %v", id, err)
}
// Move on to make sure container status is updated.
}
// Move on to make sure container status is updated.
}
err = c.waitContainerStop(ctx, id, timeout)
@@ -113,18 +117,22 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
glog.Errorf("Stop container %q timed out: %v", id, err)
}
task, err := container.Container.Task(ctx, nil)
if err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)
}
return nil
}
// Event handler will Delete the container from containerd after it handles the Exited event.
glog.V(2).Infof("Kill container %q", id)
_, err := c.taskService.Kill(ctx, &tasks.KillRequest{
ContainerID: id,
Signal: uint32(unix.SIGKILL),
All: true,
})
if err != nil {
if !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) {
return fmt.Errorf("failed to kill container %q: %v", id, err)
if task != nil {
if err = task.Kill(ctx, unix.SIGKILL); err != nil {
if !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to kill container %q: %v", id, err)
}
// Move on to make sure container status is updated.
}
// Move on to make sure container status is updated.
}
// Wait for a fixed timeout until container stop is observed by event monitor.

View File

@@ -20,7 +20,7 @@ import (
"time"
"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/typeurl"
"github.com/golang/glog"
"github.com/jpillora/backoff"
@@ -104,13 +104,22 @@ func (c *criContainerdService) handleEvent(evt *events.Envelope) {
// Non-init process died, ignore the event.
return
}
// Delete the container from containerd.
_, err = c.taskService.Delete(context.Background(), &tasks.DeleteTaskRequest{ContainerID: e.ContainerID})
// TODO(random-liu): Change isContainerdGRPCNotFoundError to use errdefs.
if err != nil && !isContainerdGRPCNotFoundError(err) {
// TODO(random-liu): [P0] Enqueue the event and retry.
glog.Errorf("Failed to delete container %q: %v", e.ContainerID, err)
return
task, err := cntr.Container.Task(context.Background(), nil)
if err != nil {
if !errdefs.IsNotFound(err) {
glog.Errorf("failed to stop container, task not found for container %q: %v", e.ContainerID, err)
return
}
}
if task != nil {
if _, err = task.Delete(context.Background()); err != nil {
if !errdefs.IsNotFound(err) {
// TODO(random-liu): [P0] Enqueue the event and retry.
glog.Errorf("failed to stop container %q: %v", e.ContainerID, err)
return
}
// Move on to make sure container status is updated.
}
}
err = cntr.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
// If FinishedAt has been set (e.g. with start failure), keep as

View File

@@ -26,6 +26,7 @@ import (
"strings"
"syscall"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/docker/distribution/reference"
"github.com/docker/docker/pkg/stringid"
@@ -33,8 +34,6 @@ import (
"github.com/opencontainers/image-spec/identity"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/store"
@@ -78,12 +77,6 @@ const (
// According to http://man7.org/linux/man-pages/man5/resolv.conf.5.html:
// "The search list is currently limited to six domains with a total of 256 characters."
maxDNSSearches = 6
// stdinNamedPipe is the name of stdin named pipe.
stdinNamedPipe = "stdin"
// stdoutNamedPipe is the name of stdout named pipe.
stdoutNamedPipe = "stdout"
// stderrNamedPipe is the name of stderr named pipe.
stderrNamedPipe = "stderr"
// Delimiter used to construct container/sandbox names.
nameDelimiter = "_"
// netNSFormat is the format of network namespace of a process.
@@ -148,15 +141,6 @@ func getContainerRootDir(rootDir, id string) string {
return filepath.Join(rootDir, containersDir, id)
}
// getStreamingPipes returns the stdin/stdout/stderr pipes path in the
// container/sandbox root.
func getStreamingPipes(rootDir string) (string, string, string) {
	// Each stream gets its own named pipe directly under the root directory.
	return filepath.Join(rootDir, stdinNamedPipe),
		filepath.Join(rootDir, stdoutNamedPipe),
		filepath.Join(rootDir, stderrNamedPipe)
}
// getSandboxHosts returns the hosts file path inside the sandbox root directory.
func getSandboxHosts(sandboxRootDir string) string {
return filepath.Join(sandboxRootDir, "hosts")
@@ -223,18 +207,6 @@ func getPIDNamespace(pid uint32) string {
return fmt.Sprintf(pidNSFormat, pid)
}
// isContainerdGRPCNotFoundError checks whether a grpc error is not found error.
func isContainerdGRPCNotFoundError(grpcError error) bool {
	code := grpc.Code(grpcError)
	return code == codes.NotFound
}
// isRuncProcessAlreadyFinishedError checks whether a grpc error is a process already
// finished error.
// TODO(random-liu): Containerd should expose this error in api. (containerd#999)
func isRuncProcessAlreadyFinishedError(grpcError error) bool {
	desc := grpc.ErrorDesc(grpcError)
	return strings.Contains(desc, "os: process already finished")
}
// criContainerStateToString formats CRI container state to string.
func criContainerStateToString(state runtime.ContainerState) string {
return runtime.ContainerState_name[int32(state)]
@@ -269,46 +241,48 @@ func normalizeImageRef(ref string) (reference.Named, error) {
return reference.TagNameOnly(named), nil
}
// getImageInfo returns image chainID, compressed size and oci config. Note that getImageInfo
// getImageInfo returns image chainID, compressed size, oci config, imageID. Note that getImageInfo
// assumes that the image has been pulled or it will return an error.
func (c *criContainerdService) getImageInfo(ctx context.Context, ref string) (
imagedigest.Digest, int64, *imagespec.ImageConfig, error) {
imagedigest.Digest, int64, *imagespec.ImageConfig, imagedigest.Digest, error) {
// Get image config
normalized, err := normalizeImageRef(ref)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to normalize image reference %q: %v", ref, err)
return "", 0, nil, "", fmt.Errorf("failed to normalize image reference %q: %v", ref, err)
}
normalizedRef := normalized.String()
//TODO(Abhi): Switch to using containerd client GetImage() api
image, err := c.imageStoreService.Get(ctx, normalizedRef)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image %q from containerd image store: %v",
return "", 0, nil, "", fmt.Errorf("failed to get image %q from containerd image store: %v",
normalizedRef, err)
}
// Get image config
desc, err := image.Config(ctx, c.contentStoreService)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image config descriptor: %v", err)
return "", 0, nil, "", fmt.Errorf("failed to get image config descriptor: %v", err)
}
rc, err := c.contentStoreService.Reader(ctx, desc.Digest)
rb, err := content.ReadBlob(ctx, c.contentStoreService, desc.Digest)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image config reader: %v", err)
return "", 0, nil, "", fmt.Errorf("failed to get image config reader: %v", err)
}
defer rc.Close()
var imageConfig imagespec.Image
if err = json.NewDecoder(rc).Decode(&imageConfig); err != nil {
return "", 0, nil, fmt.Errorf("failed to decode image config: %v", err)
if err = json.Unmarshal(rb, &imageConfig); err != nil {
return "", 0, nil, "", err
}
// Get image chainID
diffIDs, err := image.RootFS(ctx, c.contentStoreService)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image diff ids: %v", err)
return "", 0, nil, "", fmt.Errorf("failed to get image diff ids: %v", err)
}
chainID := identity.ChainID(diffIDs)
// Get image size
size, err := image.Size(ctx, c.contentStoreService)
if err != nil {
return "", 0, nil, fmt.Errorf("failed to get image size: %v", err)
return "", 0, nil, "", fmt.Errorf("failed to get image size: %v", err)
}
return chainID, size, &imageConfig.Config, nil
return chainID, size, &imageConfig.Config, desc.Digest, nil
}
// getRepoDigestAndTag returns image repoDigest and repoTag of the named image reference.
@@ -336,6 +310,7 @@ func (c *criContainerdService) localResolve(ctx context.Context, ref string) (*i
if err != nil {
return nil, fmt.Errorf("invalid image reference %q: %v", ref, err)
}
//TODO(Abhi): Switch to using containerd client GetImage() api
imageInContainerd, err := c.imageStoreService.Get(ctx, normalized.String())
if err != nil {
if errdefs.IsNotFound(err) {

View File

@@ -17,22 +17,15 @@ limitations under the License.
package server
import (
gocontext "context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
containerdimages "github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/schema1"
containerdrootfs "github.com/containerd/containerd/rootfs"
"github.com/golang/glog"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/net/context"
@@ -88,70 +81,70 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma
r.GetImage().GetImage(), retRes.GetImageRef())
}
}()
imageRef := r.GetImage().GetImage()
namedRef, err := normalizeImageRef(imageRef)
if err != nil {
return nil, fmt.Errorf("failed to parse image reference %q: %v", imageRef, err)
}
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
ref := namedRef.String()
if ref != imageRef {
glog.V(4).Infof("PullImage using normalized image ref: %q", ref)
}
resolver := docker.NewResolver(docker.ResolverOptions{
Credentials: func(string) (string, string, error) { return ParseAuth(r.GetAuth()) },
Client: http.DefaultClient,
})
// TODO(mikebrow): add truncIndex for image id
imageID, repoTag, repoDigest, err := c.pullImage(ctx, imageRef, r.GetAuth())
image, err := c.client.Pull(ctx, ref, containerd.WithPullUnpack, containerd.WithSchema1Conversion, containerd.WithResolver(resolver))
if err != nil {
return nil, fmt.Errorf("failed to pull image %q: %v", imageRef, err)
return nil, fmt.Errorf("failed to pull image %q: %v", ref, err)
}
repoDigest, repoTag := getRepoDigestAndTag(namedRef, image.Target().Digest, image.Target().MediaType == containerdimages.MediaTypeDockerSchema1Manifest)
for _, r := range []string{repoTag, repoDigest} {
if r == "" {
continue
}
if err := c.createImageReference(ctx, r, image.Target()); err != nil {
return nil, fmt.Errorf("failed to update image reference %q: %v", r, err)
}
}
glog.V(4).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q", imageRef, imageID,
repoTag, repoDigest)
// Get image information.
chainID, size, config, err := c.getImageInfo(ctx, imageRef)
chainID, size, config, id, err := c.getImageInfo(ctx, imageRef)
if err != nil {
return nil, fmt.Errorf("failed to get image %q information: %v", imageRef, err)
}
image := imagestore.Image{
imageID := id.String()
if err := c.createImageReference(ctx, imageID, image.Target()); err != nil {
return nil, fmt.Errorf("failed to update image reference %q: %v", imageID, err)
}
glog.V(4).Infof("Pulled image %q with image id %q, repo tag %q, repo digest %q", imageRef, imageID,
repoTag, repoDigest)
img := imagestore.Image{
ID: imageID,
ChainID: chainID.String(),
Size: size,
Config: config,
Image: image,
}
if repoDigest != "" {
image.RepoDigests = []string{repoDigest}
img.RepoDigests = []string{repoDigest}
}
if repoTag != "" {
image.RepoTags = []string{repoTag}
img.RepoTags = []string{repoTag}
}
c.imageStore.Add(image)
c.imageStore.Add(img)
// NOTE(random-liu): the actual state in containerd is the source of truth, even we maintain
// in-memory image store, it's only for in-memory indexing. The image could be removed
// by someone else anytime, before/during/after we create the metadata. We should always
// check the actual state in containerd before using the image or returning status of the
// image.
return &runtime.PullImageResponse{ImageRef: imageID}, err
}
// resourceSet is the helper struct to help tracking all resources associated
// with an image.
type resourceSet struct {
	sync.Mutex
	// resources holds the tracked resource references as a set.
	resources map[string]struct{}
}

// newResourceSet returns an empty, ready-to-use resource set.
func newResourceSet() *resourceSet {
	return &resourceSet{
		resources: map[string]struct{}{},
	}
}

// add records a resource reference; safe for concurrent use.
func (r *resourceSet) add(resource string) {
	r.Lock()
	r.resources[resource] = struct{}{}
	r.Unlock()
}
// all returns an array of all resources added.
func (r *resourceSet) all() map[string]struct{} {
r.Lock()
defer r.Unlock()
resources := make(map[string]struct{})
for resource := range r.resources {
resources[resource] = struct{}{}
}
return resources
return &runtime.PullImageResponse{ImageRef: img.ID}, err
}
// ParseAuth parses AuthConfig and returns username and password/secret required by containerd.
@@ -183,160 +176,6 @@ func ParseAuth(auth *runtime.AuthConfig) (string, string, error) {
return "", "", fmt.Errorf("invalid auth config")
}
// pullImage pulls image and returns image id (config digest), repoTag and repoDigest.
func (c *criContainerdService) pullImage(ctx context.Context, rawRef string, auth *runtime.AuthConfig) (
// TODO(random-liu): Replace with client.Pull.
string, string, string, error) {
namedRef, err := normalizeImageRef(rawRef)
if err != nil {
return "", "", "", fmt.Errorf("failed to parse image reference %q: %v", rawRef, err)
}
// TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference.
ref := namedRef.String()
if ref != rawRef {
glog.V(4).Infof("PullImage using normalized image ref: %q", ref)
}
// Resolve the image reference to get descriptor and fetcher.
resolver := docker.NewResolver(docker.ResolverOptions{
Credentials: func(string) (string, string, error) { return ParseAuth(auth) },
Client: http.DefaultClient,
})
_, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to resolve ref %q: %v", ref, err)
}
fetcher, err := resolver.Fetcher(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to get fetcher for ref %q: %v", ref, err)
}
// Currently, the resolved image name is the same with ref in docker resolver,
// but they may be different in the future.
// TODO(random-liu): Always resolve image reference and use resolved image name in
// the system.
glog.V(4).Infof("Start downloading resources for image %q", ref)
resources := newResourceSet()
resourceTrackHandler := containerdimages.HandlerFunc(func(ctx gocontext.Context, desc imagespec.Descriptor) (
[]imagespec.Descriptor, error) {
resources.add(remotes.MakeRefKey(ctx, desc))
return nil, nil
})
// Fetch all image resources into content store.
// Dispatch a handler which will run a sequence of handlers to:
// 1) track all resources associated using a customized handler;
// 2) fetch the object using a FetchHandler;
// 3) recurse through any sub-layers via a ChildrenHandler.
// Support schema1 image.
var (
schema1Converter *schema1.Converter
handler containerdimages.Handler
)
if desc.MediaType == containerdimages.MediaTypeDockerSchema1Manifest {
schema1Converter = schema1.NewConverter(c.contentStoreService, fetcher)
handler = containerdimages.Handlers(
resourceTrackHandler,
schema1Converter,
)
} else {
handler = containerdimages.Handlers(
resourceTrackHandler,
remotes.FetchHandler(c.contentStoreService, fetcher),
containerdimages.ChildrenHandler(c.contentStoreService),
)
}
if err := containerdimages.Dispatch(ctx, handler, desc); err != nil {
// Dispatch returns error when requested resources are locked.
// In that case, we should start waiting and checking the pulling
// progress.
// TODO(random-liu): Check specific resource locked error type.
glog.V(5).Infof("Dispatch for %q returns error: %v", ref, err)
}
// Wait for the image pulling to finish
if err := c.waitForResourcesDownloading(ctx, resources.all()); err != nil {
return "", "", "", fmt.Errorf("failed to wait for image %q downloading: %v", ref, err)
}
glog.V(4).Infof("Finished downloading resources for image %q", ref)
if schema1Converter != nil {
desc, err = schema1Converter.Convert(ctx)
if err != nil {
return "", "", "", fmt.Errorf("failed to convert schema 1 image %q: %v", ref, err)
}
}
// In the future, containerd will rely on the information in the image store to perform image
// garbage collection.
// For now, we simply use it to store and retrieve information required for pulling an image.
// @stevvooe said we should `Put` before downloading content, However:
// 1) Containerd client put image metadata after downloading;
// 2) We need desc returned by schema1 converter.
// So just put the image metadata after downloading now.
// TODO(random-liu): Fix the potential garbage collection race.
repoDigest, repoTag := getRepoDigestAndTag(namedRef, desc.Digest, schema1Converter != nil)
if ref != repoTag && ref != repoDigest {
return "", "", "", fmt.Errorf("unexpected repo tag %q and repo digest %q for %q", repoTag, repoDigest, ref)
}
for _, r := range []string{repoTag, repoDigest} {
if r == "" {
continue
}
if err := c.createImageReference(ctx, r, desc); err != nil {
return "", "", "", fmt.Errorf("failed to update image reference %q: %v", r, err)
}
}
// Do not cleanup if following operations fail so as to make resumable download possible.
// TODO(random-liu): Replace with image.Unpack.
// Unpack the image layers into snapshots.
image, err := c.imageStoreService.Get(ctx, ref)
if err != nil {
return "", "", "", fmt.Errorf("failed to get image %q from containerd image store: %v", ref, err)
}
// Read the image manifest from content store.
manifestDigest := image.Target.Digest
p, err := content.ReadBlob(ctx, c.contentStoreService, manifestDigest)
if err != nil {
return "", "", "", fmt.Errorf("readblob failed for manifest digest %q: %v", manifestDigest, err)
}
var manifest imagespec.Manifest
if err := json.Unmarshal(p, &manifest); err != nil {
return "", "", "", fmt.Errorf("unmarshal blob to manifest failed for manifest digest %q: %v",
manifestDigest, err)
}
diffIDs, err := image.RootFS(ctx, c.contentStoreService)
if err != nil {
return "", "", "", fmt.Errorf("failed to get image rootfs: %v", err)
}
if len(diffIDs) != len(manifest.Layers) {
return "", "", "", fmt.Errorf("mismatched image rootfs and manifest layers")
}
layers := make([]containerdrootfs.Layer, len(diffIDs))
for i := range diffIDs {
layers[i].Diff = imagespec.Descriptor{
// TODO: derive media type from compressed type
MediaType: imagespec.MediaTypeImageLayer,
Digest: diffIDs[i],
}
layers[i].Blob = manifest.Layers[i]
}
if _, err := containerdrootfs.ApplyLayers(ctx, layers, c.snapshotService, c.diffService); err != nil {
return "", "", "", fmt.Errorf("failed to apply layers %+v: %v", layers, err)
}
// TODO(random-liu): Considering how to deal with the disk usage of content.
configDesc, err := image.Config(ctx, c.contentStoreService)
if err != nil {
return "", "", "", fmt.Errorf("failed to get config descriptor for image %q: %v", ref, err)
}
// Use config digest as imageID to conform to oci image spec, and also add image id as
// image reference.
imageID := configDesc.Digest.String()
if err := c.createImageReference(ctx, imageID, desc); err != nil {
return "", "", "", fmt.Errorf("failed to update image id %q: %v", imageID, err)
}
return imageID, repoTag, repoDigest, nil
}
// createImageReference creates image reference inside containerd image store.
// Note that because create and update are not finished in one transaction, there could be race. E.g.
// the image reference is deleted by someone else after create returns already exists, but before update
@@ -358,40 +197,3 @@ func (c *criContainerdService) createImageReference(ctx context.Context, name st
_, err = c.imageStoreService.Update(ctx, img, "target")
return err
}
// waitDownloadingPollInterval is the interval to check resource downloading progress.
const waitDownloadingPollInterval = 200 * time.Millisecond

// waitForResourcesDownloading waits for all resource downloading to finish.
func (c *criContainerdService) waitForResourcesDownloading(ctx context.Context, resources map[string]struct{}) error {
	ticker := time.NewTicker(waitDownloadingPollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			// TODO(random-liu): Abort ongoing pulling if cancelled.
			return fmt.Errorf("image resources pulling is cancelled")
		case <-ticker.C:
			// TODO(random-liu): Use better regexp when containerd `MakeRefKey` contains more
			// information.
			statuses, err := c.contentStoreService.ListStatuses(ctx, "")
			if err != nil {
				return fmt.Errorf("failed to get content status: %v", err)
			}
			// TODO(random-liu): Move Dispatch into a separate goroutine, so that we could report
			// image pulling progress concurrently.
			inProgress := false
			for _, st := range statuses {
				if _, tracked := resources[st.Ref]; tracked {
					glog.V(5).Infof("Pulling resource %q with progress %d/%d",
						st.Ref, st.Offset, st.Total)
					inProgress = true
				}
			}
			// All tracked resources are done downloading.
			if !inProgress {
				return nil
			}
		}
	}
}

View File

@@ -18,33 +18,12 @@ package server
import (
"encoding/base64"
"fmt"
"sync"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
// TestResources verifies that resourceSet.add is safe to call concurrently
// from many goroutines and that every added ref is visible via all().
func TestResources(t *testing.T) {
	const threads = 10
	set := newResourceSet()
	var wg sync.WaitGroup
	wg.Add(threads)
	for i := 0; i < threads; i++ {
		ref := fmt.Sprintf("sha256:%d", i)
		go func() {
			defer wg.Done()
			set.add(ref)
		}()
	}
	wg.Wait()
	got := set.all()
	for i := 0; i < threads; i++ {
		_, found := got[fmt.Sprintf("sha256:%d", i)]
		assert.True(t, found)
	}
}
func TestParseAuth(t *testing.T) {
testUser := "username"
testPasswd := "password"

View File

@@ -19,12 +19,10 @@ package server
import (
"fmt"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
@@ -42,16 +40,15 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li
// List all sandboxes from store.
sandboxesInStore := c.sandboxStore.List()
resp, err := c.taskService.List(ctx, &tasks.ListTasksRequest{})
response, err := c.taskService.List(ctx, &tasks.ListTasksRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list sandbox containers: %v", err)
}
sandboxesInContainerd := resp.Tasks
var sandboxes []*runtime.PodSandbox
for _, sandboxInStore := range sandboxesInStore {
var sandboxInContainerd *task.Task
for _, s := range sandboxesInContainerd {
var sandboxInContainerd *task.Process
for _, s := range response.Tasks {
if s.ID == sandboxInStore.ID {
sandboxInContainerd = s
break

View File

@@ -19,7 +19,7 @@ package server
import (
"fmt"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/golang/glog"
"golang.org/x/net/context"
@@ -54,22 +54,14 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
// Return error if sandbox container is not fully stopped.
// TODO(random-liu): [P0] Make sure network is torn down, may need to introduce a state.
_, err = c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id})
if err != nil && !isContainerdGRPCNotFoundError(err) {
_, err = sandbox.Container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
}
if err == nil {
return nil, fmt.Errorf("sandbox container %q is not fully stopped", id)
}
// Remove sandbox container snapshot.
if err := c.snapshotService.Remove(ctx, id); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to remove sandbox container snapshot %q: %v", id, err)
}
glog.V(5).Infof("Remove called for snapshot %q that does not exist", id)
}
// Remove all containers inside the sandbox.
// NOTE(random-liu): container could still be created after this point, Kubelet should
// not rely on this behavior.
@@ -96,8 +88,8 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
}
// Delete sandbox container.
if err := c.containerService.Delete(ctx, id); err != nil {
if !isContainerdGRPCNotFoundError(err) {
if err := sandbox.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
if !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to delete sandbox container %q: %v", id, err)
}
glog.V(5).Infof("Remove called for sandbox container %q that does not exist", id, err)

View File

@@ -17,17 +17,14 @@ limitations under the License.
package server
import (
"encoding/json"
"bytes"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types"
"github.com/containerd/containerd/containers"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
@@ -78,26 +75,7 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
// Ensure sandbox container image snapshot.
image, err := c.ensureImageExists(ctx, c.sandboxImage)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox image %q: %v", defaultSandboxImage, err)
}
rootfsMounts, err := c.snapshotService.View(ctx, id, image.ChainID)
if err != nil {
return nil, fmt.Errorf("failed to prepare sandbox rootfs %q: %v", image.ChainID, err)
}
defer func() {
if retErr != nil {
if err := c.snapshotService.Remove(ctx, id); err != nil {
glog.Errorf("Failed to remove sandbox container snapshot %q: %v", id, err)
}
}
}()
var rootfs []*types.Mount
for _, m := range rootfsMounts {
rootfs = append(rootfs, &types.Mount{
Type: m.Type,
Source: m.Source,
Options: m.Options,
})
return nil, fmt.Errorf("failed to get sandbox image %q: %v", c.sandboxImage, err)
}
// Create sandbox container.
@@ -105,34 +83,26 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
if err != nil {
return nil, fmt.Errorf("failed to generate sandbox container spec: %v", err)
}
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}
glog.V(4).Infof("Sandbox container spec: %+v", spec)
if _, err = c.containerService.Create(ctx, containers.Container{
ID: id,
// TODO(random-liu): Checkpoint metadata into container labels.
Image: image.ID,
Runtime: containers.RuntimeInfo{Name: defaultRuntime},
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
RootFS: id,
}); err != nil {
// TODO(random-liu): Checkpoint metadata into container labels.
opts := []containerd.NewContainerOpts{
containerd.WithSpec(spec),
containerd.WithRuntime(defaultRuntime),
containerd.WithNewSnapshotView(id, image.Image)}
container, err := c.client.NewContainer(ctx, id, opts...)
if err != nil {
return nil, fmt.Errorf("failed to create containerd container: %v", err)
}
defer func() {
if retErr != nil {
if err := c.containerService.Delete(ctx, id); err != nil {
glog.Errorf("Failed to delete containerd container%q: %v", id, err)
if err := container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
glog.Errorf("Failed to delete containerd container %q: %v", id, err)
}
}
}()
// Create sandbox container root directory.
// Prepare streaming named pipe.
sandboxRootDir := getSandboxRootDir(c.rootDir, id)
if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create sandbox root directory %q: %v",
@@ -149,21 +119,18 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}()
// Discard sandbox container output because we don't care about it.
_, stdout, stderr := getStreamingPipes(sandboxRootDir)
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
rStdoutPipe, wStdoutPipe := io.Pipe()
rStderrPipe, wStderrPipe := io.Pipe()
defer func() {
if retErr != nil {
stdoutPipe.Close()
stderrPipe.Close()
rStdoutPipe.Close()
rStderrPipe.Close()
}
}()
if err := c.agentFactory.NewSandboxLogger(stdoutPipe).Start(); err != nil {
if err := c.agentFactory.NewSandboxLogger(rStdoutPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stdout logger: %v", err)
}
if err := c.agentFactory.NewSandboxLogger(stderrPipe).Start(); err != nil {
if err := c.agentFactory.NewSandboxLogger(rStderrPipe).Start(); err != nil {
return nil, fmt.Errorf("failed to start sandbox stderr logger: %v", err)
}
@@ -180,32 +147,25 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}
}()
createOpts := &tasks.CreateTaskRequest{
ContainerID: id,
Rootfs: rootfs,
// No stdin for sandbox container.
Stdout: stdout,
Stderr: stderr,
}
// Create sandbox task in containerd.
glog.V(5).Infof("Create sandbox container (id=%q, name=%q) with options %+v.",
id, name, createOpts)
createResp, err := c.taskService.Create(ctx, createOpts)
glog.V(5).Infof("Create sandbox container (id=%q, name=%q).",
id, name)
//TODO(Abhi): close the stdin or pass newIOCreation with /dev/null stdin
task, err := container.NewTask(ctx, containerd.NewIO(new(bytes.Buffer), wStdoutPipe, wStderrPipe))
if err != nil {
return nil, fmt.Errorf("failed to create sandbox container %q: %v",
id, err)
return nil, fmt.Errorf("failed to create task for sandbox %q: %v", id, err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox container if an error is returned.
if err := c.stopSandboxContainer(ctx, id); err != nil {
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil {
glog.Errorf("Failed to delete sandbox container %q: %v", id, err)
}
}
}()
sandbox.Pid = createResp.Pid
sandbox.NetNS = getNetworkNamespace(createResp.Pid)
sandbox.Pid = task.Pid()
sandbox.NetNS = getNetworkNamespace(task.Pid())
if !config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetHostNetwork() {
// Setup network for sandbox.
// TODO(random-liu): [P2] Replace with permanent network namespace.
@@ -223,14 +183,14 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}()
}
// Start sandbox container in containerd.
if _, err := c.taskService.Start(ctx, &tasks.StartTaskRequest{ContainerID: id}); err != nil {
return nil, fmt.Errorf("failed to start sandbox container %q: %v",
if err = task.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start sandbox container task %q: %v",
id, err)
}
// Add sandbox into sandbox store.
sandbox.CreatedAt = time.Now().UnixNano()
sandbox.Container = container
if err := c.sandboxStore.Add(sandbox); err != nil {
return nil, fmt.Errorf("failed to add sandbox %+v into store: %v", sandbox, err)
}

View File

@@ -22,9 +22,8 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
@@ -47,18 +46,24 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.
// Use the full sandbox id.
id := sandbox.ID
info, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id})
if err != nil && !isContainerdGRPCNotFoundError(err) {
task, err := sandbox.Container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
}
// Set sandbox state to NOTREADY by default.
state := runtime.PodSandboxState_SANDBOX_NOTREADY
// If the sandbox container is running, treat it as READY.
if info != nil && info.Task.Status == task.StatusRunning {
state = runtime.PodSandboxState_SANDBOX_READY
}
if task != nil {
taskStatus, err := task.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err)
}
if taskStatus.Status == containerd.Running {
state = runtime.PodSandboxState_SANDBOX_READY
}
}
ip, err := c.netPlugin.GetContainerNetworkStatus(sandbox.NetNS, sandbox.Config.GetMetadata().GetNamespace(), sandbox.Config.GetMetadata().GetName(), id)
if err != nil {
// Ignore the error on network status

View File

@@ -20,13 +20,10 @@ import (
"fmt"
"os"
"github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/typeurl"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
"github.com/golang/glog"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
@@ -83,69 +80,26 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
return nil, fmt.Errorf("failed to unmount sandbox files in %q: %v", sandboxRoot, err)
}
if err := c.stopSandboxContainer(ctx, id); err != nil {
if err := c.stopSandboxContainer(ctx, sandbox.Container); err != nil {
return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err)
}
return &runtime.StopPodSandboxResponse{}, nil
}
// stopSandboxContainer kills and deletes sandbox container.
func (c *criContainerdService) stopSandboxContainer(ctx context.Context, id string) error {
cancellable, cancel := context.WithCancel(ctx)
eventstream, err := c.eventService.Subscribe(cancellable, &events.SubscribeRequest{})
func (c *criContainerdService) stopSandboxContainer(ctx context.Context, container containerd.Container) error {
task, err := container.Task(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get containerd event: %v", err)
}
defer cancel()
resp, err := c.taskService.Get(ctx, &tasks.GetTaskRequest{ContainerID: id})
if err != nil {
if isContainerdGRPCNotFoundError(err) {
if errdefs.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to get sandbox container: %v", err)
}
if resp.Task.Status != task.StatusStopped {
// TODO(random-liu): [P1] Handle sandbox container graceful deletion.
if _, err := c.taskService.Kill(ctx, &tasks.KillRequest{
ContainerID: id,
Signal: uint32(unix.SIGKILL),
All: true,
}); err != nil && !isContainerdGRPCNotFoundError(err) && !isRuncProcessAlreadyFinishedError(err) {
return fmt.Errorf("failed to kill sandbox container: %v", err)
}
if err := c.waitSandboxContainer(eventstream, id, resp.Task.Pid); err != nil {
return fmt.Errorf("failed to wait for pod sandbox to stop: %v", err)
}
}
// Delete the sandbox container from containerd.
_, err = c.taskService.Delete(ctx, &tasks.DeleteTaskRequest{ContainerID: id})
if err != nil && !isContainerdGRPCNotFoundError(err) {
_, err = task.Delete(ctx, containerd.WithProcessKill)
if err != nil && !errdefs.IsNotFound(err) {
return fmt.Errorf("failed to delete sandbox container: %v", err)
}
return nil
}
// waitSandboxContainer blocks on the containerd event stream until it sees a
// TaskExit event whose container id and pid match the given sandbox, returning
// nil at that point or the first stream/unmarshal error encountered.
func (c *criContainerdService) waitSandboxContainer(eventstream events.Events_SubscribeClient, id string, pid uint32) error {
	for {
		msg, err := eventstream.Recv()
		if err != nil {
			return err
		}
		// Skip everything that is not a task exit event.
		if !typeurl.Is(msg.Event, &events.TaskExit{}) {
			continue
		}
		decoded, err := typeurl.UnmarshalAny(msg.Event)
		if err != nil {
			return err
		}
		exit := decoded.(*events.TaskExit)
		if exit.ContainerID != id || exit.Pid != pid {
			continue
		}
		return nil
	}
}

View File

@@ -20,7 +20,6 @@ import (
"fmt"
"golang.org/x/net/context"
healthapi "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
)
@@ -38,9 +37,8 @@ func (c *criContainerdService) Status(ctx context.Context, r *runtime.StatusRequ
Type: runtime.RuntimeReady,
Status: true,
}
// Use containerd grpc server healthcheck service to check its readiness.
resp, err := c.healthService.Check(ctx, &healthapi.HealthCheckRequest{})
if err != nil || resp.Status != healthapi.HealthCheckResponse_SERVING {
serving, err := c.client.IsServing(ctx)
if err != nil || !serving {
runtimeCondition.Status = false
runtimeCondition.Reason = runtimeNotReadyReason
if err != nil {
@@ -49,7 +47,6 @@ func (c *criContainerdService) Status(ctx context.Context, r *runtime.StatusRequ
runtimeCondition.Message = "Containerd grpc server is not serving"
}
}
networkCondition := &runtime.RuntimeCondition{
Type: runtime.NetworkReady,
Status: true,

View File

@@ -1,97 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
healthapi "google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing"
)
// TestStatus verifies criContainerdService.Status: the runtime condition must
// reflect the containerd healthcheck result (NOT_SERVING or a healthcheck
// error means runtime not ready), and the network condition must reflect the
// CNI plugin status (a Status error means network not ready).
func TestStatus(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
for desc, test := range map[string]struct {
containerdCheckRes *healthapi.HealthCheckResponse
containerdCheckErr error
networkStatusErr error
expectRuntimeNotReady bool
expectNetworkNotReady bool
}{
"runtime should not be ready when containerd is not serving": {
containerdCheckRes: &healthapi.HealthCheckResponse{
Status: healthapi.HealthCheckResponse_NOT_SERVING,
},
expectRuntimeNotReady: true,
},
"runtime should not be ready when containerd healthcheck returns error": {
containerdCheckErr: errors.New("healthcheck error"),
expectRuntimeNotReady: true,
},
"network should not be ready when network plugin status returns error": {
containerdCheckRes: &healthapi.HealthCheckResponse{
Status: healthapi.HealthCheckResponse_SERVING,
},
networkStatusErr: errors.New("status error"),
expectNetworkNotReady: true,
},
"runtime should be ready when containerd is serving": {
containerdCheckRes: &healthapi.HealthCheckResponse{
Status: healthapi.HealthCheckResponse_SERVING,
},
},
} {
t.Logf("TestCase %q", desc)
c := newTestCRIContainerdService()
ctx := context.Background()
// Route the service's healthcheck through a gomock HealthClient that
// returns this case's canned response/error.
mock := servertesting.NewMockHealthClient(ctrl)
mock.EXPECT().Check(ctx, &healthapi.HealthCheckRequest{}).Return(
test.containerdCheckRes, test.containerdCheckErr)
c.healthService = mock
if test.networkStatusErr != nil {
// Make the fake CNI plugin fail its Status call for this case.
c.netPlugin.(*servertesting.FakeCNIPlugin).InjectError(
"Status", test.networkStatusErr)
}
resp, err := c.Status(ctx, &runtime.StatusRequest{})
assert.NoError(t, err)
require.NotNil(t, resp)
// The test relies on Status returning the runtime condition first and
// the network condition second.
runtimeCondition := resp.Status.Conditions[0]
networkCondition := resp.Status.Conditions[1]
assert.Equal(t, runtime.RuntimeReady, runtimeCondition.Type)
assert.Equal(t, test.expectRuntimeNotReady, !runtimeCondition.Status)
if test.expectRuntimeNotReady {
assert.Equal(t, runtimeNotReadyReason, runtimeCondition.Reason)
assert.NotEmpty(t, runtimeCondition.Message)
}
assert.Equal(t, runtime.NetworkReady, networkCondition.Type)
assert.Equal(t, test.expectNetworkNotReady, !networkCondition.Status)
if test.expectNetworkNotReady {
assert.Equal(t, networkNotReadyReason, networkCondition.Reason)
assert.NotEmpty(t, networkCondition.Message)
}
}
}

View File

@@ -1,64 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Automatically generated by MockGen. DO NOT EDIT!
// Source: google.golang.org/grpc/health/grpc_health_v1 (interfaces: HealthClient)
package testing
import (
gomock "github.com/golang/mock/gomock"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
grpc_health_v1 "google.golang.org/grpc/health/grpc_health_v1"
)
// Mock of HealthClient interface
type MockHealthClient struct {
ctrl *gomock.Controller
recorder *_MockHealthClientRecorder
}
// Recorder for MockHealthClient (not exported)
type _MockHealthClientRecorder struct {
mock *MockHealthClient
}
// NewMockHealthClient creates a MockHealthClient registered with the given
// gomock controller, which tracks and verifies its expected calls.
func NewMockHealthClient(ctrl *gomock.Controller) *MockHealthClient {
mock := &MockHealthClient{ctrl: ctrl}
mock.recorder = &_MockHealthClientRecorder{mock}
return mock
}
// EXPECT returns the recorder used to declare expected calls on the mock.
func (_m *MockHealthClient) EXPECT() *_MockHealthClientRecorder {
return _m.recorder
}
// Check records the call with gomock and returns whatever response/error the
// matching expectation was configured to return.
func (_m *MockHealthClient) Check(_param0 context.Context, _param1 *grpc_health_v1.HealthCheckRequest, _param2 ...grpc.CallOption) (*grpc_health_v1.HealthCheckResponse, error) {
// Flatten the fixed and variadic arguments into one slice for gomock.
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "Check", _s...)
ret0, _ := ret[0].(*grpc_health_v1.HealthCheckResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Check registers an expected Check call (context, request, optional call
// options) and returns the gomock.Call for further configuration.
func (_mr *_MockHealthClientRecorder) Check(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "Check", _s...)
}

View File

@@ -1,65 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Automatically generated by MockGen. DO NOT EDIT!
// Source: github.com/containerd/containerd/api/services/version (interfaces: VersionClient)
package testing
import (
version "github.com/containerd/containerd/api/services/version/v1"
gomock "github.com/golang/mock/gomock"
empty "github.com/golang/protobuf/ptypes/empty"
context "golang.org/x/net/context"
grpc "google.golang.org/grpc"
)
// Mock of VersionClient interface
type MockVersionClient struct {
ctrl *gomock.Controller
recorder *_MockVersionClientRecorder
}
// Recorder for MockVersionClient (not exported)
type _MockVersionClientRecorder struct {
mock *MockVersionClient
}
// NewMockVersionClient creates a MockVersionClient registered with the given
// gomock controller, which tracks and verifies its expected calls.
func NewMockVersionClient(ctrl *gomock.Controller) *MockVersionClient {
mock := &MockVersionClient{ctrl: ctrl}
mock.recorder = &_MockVersionClientRecorder{mock}
return mock
}
// EXPECT returns the recorder used to declare expected calls on the mock.
func (_m *MockVersionClient) EXPECT() *_MockVersionClientRecorder {
return _m.recorder
}
// Version records the call with gomock and returns whatever response/error the
// matching expectation was configured to return.
func (_m *MockVersionClient) Version(_param0 context.Context, _param1 *empty.Empty, _param2 ...grpc.CallOption) (*version.VersionResponse, error) {
// Flatten the fixed and variadic arguments into one slice for gomock.
_s := []interface{}{_param0, _param1}
for _, _x := range _param2 {
_s = append(_s, _x)
}
ret := _m.ctrl.Call(_m, "Version", _s...)
ret0, _ := ret[0].(*version.VersionResponse)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Version registers an expected Version call (context, request, optional call
// options) and returns the gomock.Call for further configuration.
func (_mr *_MockVersionClientRecorder) Version(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
_s := append([]interface{}{arg0, arg1}, arg2...)
return _mr.mock.ctrl.RecordCall(_mr.mock, "Version", _s...)
}

View File

@@ -19,7 +19,6 @@ package server
import (
"fmt"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -34,7 +33,7 @@ const (
// Version returns the runtime name, runtime version and runtime API version.
func (c *criContainerdService) Version(ctx context.Context, r *runtime.VersionRequest) (*runtime.VersionResponse, error) {
resp, err := c.versionService.Version(ctx, &empty.Empty{})
resp, err := c.client.Version(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get containerd version: %v", err)
}

View File

@@ -1,64 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"errors"
"testing"
versionapi "github.com/containerd/containerd/api/services/version/v1"
"github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes/empty"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing"
)
// TestVersion verifies criContainerdService.Version: an error from the
// containerd version service is propagated to the caller, and on success the
// reported RuntimeVersion matches the version returned by containerd.
func TestVersion(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
for desc, test := range map[string]struct {
versionRes *versionapi.VersionResponse
versionErr error
expectErr bool
}{
"should return error if containerd version returns error": {
versionErr: errors.New("random error"),
expectErr: true,
},
"should not return error if containerd version returns successfully": {
versionRes: &versionapi.VersionResponse{Version: "1.1.1"},
expectErr: false,
},
} {
t.Logf("TestCase %q", desc)
c := newTestCRIContainerdService()
ctx := context.Background()
// Route the service's version lookup through a gomock VersionClient that
// returns this case's canned response/error.
mock := servertesting.NewMockVersionClient(ctrl)
mock.EXPECT().Version(ctx, &empty.Empty{}).Return(test.versionRes, test.versionErr)
c.versionService = mock
v, err := c.Version(ctx, &runtime.VersionRequest{})
if test.expectErr {
assert.Equal(t, test.expectErr, err != nil)
} else {
assert.Equal(t, test.versionRes.Version, v.RuntimeVersion)
}
}
}