diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 63d0643e6..cda810595 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -17,14 +17,84 @@ limitations under the License. package server import ( - "errors" + "fmt" + "time" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // CreateContainer creates a new container in the given PodSandbox. -func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (*runtime.CreateContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (retRes *runtime.CreateContainerResponse, retErr error) { + glog.V(2).Infof("CreateContainer within sandbox %q with container config %+v and sandbox config %+v", + r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig()) + defer func() { + if retErr == nil { + glog.V(2).Infof("CreateContainer returns container id %q", retRes.GetContainerId()) + } + }() + + config := r.GetConfig() + sandboxConfig := r.GetSandboxConfig() + sandbox, err := c.getSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox id %q: %v", r.GetPodSandboxId(), err) + } + + // Generate unique id and name for the container and reserve the name. + // Reserve the container name to avoid concurrent `CreateContainer` request creating + // the same container. + id := generateID() + name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata()) + if err := c.containerNameIndex.Reserve(name, id); err != nil { + return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err) + } + defer func() { + // Release the name if the function returns with an error. 
+ if retErr != nil { + c.containerNameIndex.ReleaseByName(name) + } + }() + + // Create initial container metadata. + meta := metadata.ContainerMetadata{ + ID: id, + Name: name, + SandboxID: sandbox.ID, + Config: config, + } + + // TODO(random-liu): [P0] Prepare container rootfs. + + // TODO(random-liu): [P0] Set ImageRef in ContainerMetadata with image id. + + // Create container root directory. + containerRootDir := getContainerRootDir(c.rootDir, id) + if err := c.os.MkdirAll(containerRootDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create container root directory %q: %v", + containerRootDir, err) + } + defer func() { + if retErr != nil { + // Cleanup the container root directory. + if err := c.os.RemoveAll(containerRootDir); err != nil { + glog.Errorf("Failed to remove container root directory %q: %v", + containerRootDir, err) + } + } + }() + + // Update container CreatedAt. + meta.CreatedAt = time.Now().UnixNano() + // Add container into container store. + if err := c.containerStore.Create(meta); err != nil { + return nil, fmt.Errorf("failed to add container metadata %+v into store: %v", + meta, err) + } + + return &runtime.CreateContainerResponse{ContainerId: id}, nil } diff --git a/pkg/server/container_list.go b/pkg/server/container_list.go index 254323e44..0d7cd8978 100644 --- a/pkg/server/container_list.go +++ b/pkg/server/container_list.go @@ -17,14 +17,87 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // ListContainers lists all containers matching the filter. 
-func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (*runtime.ListContainersResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (retRes *runtime.ListContainersResponse, retErr error) { + glog.V(4).Infof("ListContainers with filter %+v", r.GetFilter()) + defer func() { + if retErr == nil { + glog.V(4).Infof("ListContainers returns containers %+v", retRes.GetContainers()) + } + }() + + // List all container metadata from store. + metas, err := c.containerStore.List() + if err != nil { + return nil, fmt.Errorf("failed to list metadata from container store: %v", err) + } + + var containers []*runtime.Container + for _, meta := range metas { + containers = append(containers, toCRIContainer(meta)) + } + + containers = c.filterCRIContainers(containers, r.GetFilter()) + return &runtime.ListContainersResponse{Containers: containers}, nil +} + +// toCRIContainer converts container metadata into CRI container. +func toCRIContainer(meta *metadata.ContainerMetadata) *runtime.Container { + return &runtime.Container{ + Id: meta.ID, + PodSandboxId: meta.SandboxID, + Metadata: meta.Config.GetMetadata(), + Image: meta.Config.GetImage(), + ImageRef: meta.ImageRef, + State: meta.State(), + CreatedAt: meta.CreatedAt, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + } +} + +// filterCRIContainers filters CRIContainers. 
+func (c *criContainerdService) filterCRIContainers(containers []*runtime.Container, filter *runtime.ContainerFilter) []*runtime.Container { + if filter == nil { + return containers + } + + filtered := []*runtime.Container{} + for _, cntr := range containers { + if filter.GetId() != "" && filter.GetId() != cntr.Id { + continue + } + if filter.GetPodSandboxId() != "" && filter.GetPodSandboxId() != cntr.PodSandboxId { + continue + } + if filter.GetState() != nil && filter.GetState().GetState() != cntr.State { + continue + } + if filter.GetLabelSelector() != nil { + match := true + for k, v := range filter.GetLabelSelector() { + got, ok := cntr.Labels[k] + if !ok || got != v { + match = false + break + } + } + if !match { + continue + } + } + filtered = append(filtered, cntr) + } + + return filtered } diff --git a/pkg/server/container_remove.go b/pkg/server/container_remove.go index d3e8bd1ce..706f0b9d1 100644 --- a/pkg/server/container_remove.go +++ b/pkg/server/container_remove.go @@ -17,14 +17,96 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // RemoveContainer removes the container. 
-func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (*runtime.RemoveContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (retRes *runtime.RemoveContainerResponse, retErr error) { + glog.V(2).Infof("RemoveContainer for %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(2).Infof("RemoveContainer %q returns successfully", r.GetContainerId()) + } + }() + + id := r.GetContainerId() + + // Set removing state to prevent other start/remove operations against this container + // while it's being removed. + if err := c.setContainerRemoving(id); err != nil { + if !metadata.IsNotExistError(err) { + return nil, fmt.Errorf("failed to set removing state for container %q: %v", + id, err) + } + // Do not return error if container metadata doesn't exist. + glog.V(5).Infof("RemoveContainer called for container %q that does not exist", id) + return &runtime.RemoveContainerResponse{}, nil + } + defer func() { + if retErr == nil { + // Cleanup all index after successfully remove the container. + c.containerNameIndex.ReleaseByKey(id) + return + } + // Reset removing if remove failed. + if err := c.resetContainerRemoving(id); err != nil { + // TODO(random-liu): Deal with update failure. Actually Removing doesn't need to + // be checkpointed, we only need it to have the same lifecycle with container metadata. + glog.Errorf("failed to reset removing state for container %q: %v", + id, err) + } + }() + + // NOTE(random-liu): Docker set container to "Dead" state when start removing the + // container so as to avoid start/restart the container again. However, for current + // kubelet implementation, we'll never start a container once we decide to remove it, + // so we don't need the "Dead" state for now. + + // TODO(random-liu): [P0] Cleanup container rootfs. + + // Cleanup container root directory. 
+ containerRootDir := getContainerRootDir(c.rootDir, id) + if err := c.os.RemoveAll(containerRootDir); err != nil { + return nil, fmt.Errorf("failed to remove container root directory %q: %v", + containerRootDir, err) + } + + // Delete container metadata. + if err := c.containerStore.Delete(id); err != nil { + return nil, fmt.Errorf("failed to delete container metadata for %q: %v", id, err) + } + + return &runtime.RemoveContainerResponse{}, nil +} + +// setContainerRemoving sets the container into removing state. In removing state, the +// container will not be started or removed again. +func (c *criContainerdService) setContainerRemoving(id string) error { + return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // Do not remove container if it's still running. + if meta.State() == runtime.ContainerState_CONTAINER_RUNNING { + return meta, fmt.Errorf("container %q is still running", id) + } + if meta.Removing { + return meta, fmt.Errorf("container is already in removing state") + } + meta.Removing = true + return meta, nil + }) +} + +// resetContainerRemoving resets the container removing state on remove failure. So +// that we could remove the container again. +func (c *criContainerdService) resetContainerRemoving(id string) error { + return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + meta.Removing = false + return meta, nil + }) } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index dfd5f80a6..cc4a8697b 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -17,14 +17,328 @@ limitations under the License. 
package server import ( - "errors" + "encoding/json" + "fmt" + "io" + "os" + "time" + prototypes "github.com/gogo/protobuf/types" + "github.com/golang/glog" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + "github.com/containerd/containerd/api/types/mount" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // StartContainer starts the container. -func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (*runtime.StartContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) { + glog.V(2).Infof("StartContainer for %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(2).Infof("StartContainer %q returns successfully", r.GetContainerId()) + } + }() + + container, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + id := container.ID + + var startErr error + // start container in one transaction to avoid race with event monitor. + if err := c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // Always apply metadata change no matter startContainer fails or not. Because startContainer + // may change container state no matter it fails or succeeds. 
+ startErr = c.startContainer(ctx, id, &meta) + return meta, nil + }); startErr != nil { + return nil, startErr + } else if err != nil { + return nil, fmt.Errorf("failed to update container %q metadata: %v", id, err) + } + return &runtime.StartContainerResponse{}, nil +} + +// startContainer actually starts the container. The function needs to be run in one transaction. Any updates +// to the metadata passed in will be applied to container store no matter the function returns error or not. +func (c *criContainerdService) startContainer(ctx context.Context, id string, meta *metadata.ContainerMetadata) (retErr error) { + config := meta.Config + // Return error if container is not in created state. + if meta.State() != runtime.ContainerState_CONTAINER_CREATED { + return fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State())) + } + + // Do not start the container when there is a removal in progress. + if meta.Removing { + return fmt.Errorf("container %q is in removing state", id) + } + + defer func() { + if retErr != nil { + // Set container to exited if fail to start. + meta.Pid = 0 + meta.FinishedAt = time.Now().UnixNano() + meta.ExitCode = errorStartExitCode + meta.Reason = errorStartReason + meta.Message = retErr.Error() + } + }() + + // Get sandbox config from sandbox store. + sandboxMeta, err := c.getSandbox(meta.SandboxID) + if err != nil { + return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err) + } + sandboxConfig := sandboxMeta.Config + sandboxID := meta.SandboxID + // Make sure sandbox is running. + sandboxInfo, err := c.containerService.Info(ctx, &execution.InfoRequest{ID: sandboxID}) + if err != nil { + return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err) + } + // This is only a best effort check, sandbox may still exit after this. If sandbox fails + // before starting the container, the start will fail. 
+ if sandboxInfo.Status != container.Status_RUNNING { + return fmt.Errorf("sandbox container %q is not running", sandboxID) + } + sandboxPid := sandboxInfo.Pid + glog.V(2).Infof("Sandbox container %q is running with pid %d", sandboxID, sandboxPid) + + // Generate containerd container create options. + // TODO(random-liu): [P0] Create container rootfs with image ref. + // TODO(random-liu): [P0] Apply default image config. + // Use fixed rootfs path for now. + const rootPath = "/" + + spec, err := c.generateContainerSpec(id, sandboxPid, config, sandboxConfig) + if err != nil { + return fmt.Errorf("failed to generate container %q spec: %v", id, err) + } + rawSpec, err := json.Marshal(spec) + if err != nil { + return fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err) + } + glog.V(4).Infof("Container spec: %+v", spec) + + containerRootDir := getContainerRootDir(c.rootDir, id) + stdin, stdout, stderr := getStreamingPipes(containerRootDir) + // Set stdin to empty if Stdin == false. + if !config.GetStdin() { + stdin = "" + } + stdinPipe, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, stdin, stdout, stderr) + if err != nil { + return fmt.Errorf("failed to prepare streaming pipes: %v", err) + } + defer func() { + if retErr != nil { + if stdinPipe != nil { + stdinPipe.Close() + } + stdoutPipe.Close() + stderrPipe.Close() + } + }() + // Redirect the stream to std for now. + // TODO(random-liu): [P1] Support container logging. + // TODO(random-liu): [P1] Support StdinOnce after container logging is added. + if stdinPipe != nil { + go func(w io.WriteCloser) { + io.Copy(w, os.Stdin) // nolint: errcheck + w.Close() + }(stdinPipe) + } + go func(r io.ReadCloser) { + io.Copy(os.Stdout, r) // nolint: errcheck + r.Close() + }(stdoutPipe) + // Only redirect stderr when there is no tty. + if !config.GetTty() { + go func(r io.ReadCloser) { + io.Copy(os.Stderr, r) // nolint: errcheck + r.Close() + }(stderrPipe) + } + + // Create containerd container. 
+ createOpts := &execution.CreateRequest{ + ID: id, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, + }, + // TODO(random-liu): [P0] Get rootfs mount from containerd. + Rootfs: []*mount.Mount{ + { + Type: "bind", + Source: rootPath, + Options: []string{ + "rw", + "rbind", + }, + }, + }, + Runtime: defaultRuntime, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Terminal: config.GetTty(), + } + glog.V(2).Infof("Create containerd container (id=%q, name=%q) with options %+v.", + id, meta.Name, createOpts) + createResp, err := c.containerService.Create(ctx, createOpts) + if err != nil { + return fmt.Errorf("failed to create containerd container: %v", err) + } + defer func() { + if retErr != nil { + // Cleanup the containerd container if an error is returned. + if _, err := c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil { + glog.Errorf("Failed to delete containerd container %q: %v", id, err) + } + } + }() + + // Start containerd container. + if _, err := c.containerService.Start(ctx, &execution.StartRequest{ID: id}); err != nil { + return fmt.Errorf("failed to start containerd container %q: %v", id, err) + } + + // Update container start timestamp. + meta.Pid = createResp.Pid + meta.StartedAt = time.Now().UnixNano() + return nil +} + +func (c *criContainerdService) generateContainerSpec(id string, sandboxPid uint32, config *runtime.ContainerConfig, sandboxConfig *runtime.PodSandboxConfig) (*runtimespec.Spec, error) { + // Creates a spec Generator with the default spec. + // TODO(random-liu): [P2] Move container runtime spec generation into a helper function. + g := generate.New() + + // Set the relative path to the rootfs of the container from containerd's + // pre-defined directory. 
+ g.SetRootPath(relativeRootfsPath) + + if len(config.GetCommand()) != 0 || len(config.GetArgs()) != 0 { + g.SetProcessArgs(append(config.GetCommand(), config.GetArgs()...)) + } + + if config.GetWorkingDir() != "" { + g.SetProcessCwd(config.GetWorkingDir()) + } + + for _, e := range config.GetEnvs() { + g.AddProcessEnv(e.GetKey(), e.GetValue()) + } + + addOCIBindMounts(&g, config.GetMounts()) + + // TODO(random-liu): [P1] Set device mapping. + // Ref https://github.com/moby/moby/blob/master/oci/devices_linux.go. + + // TODO(random-liu): [P1] Handle container logging, decorate and redirect to file. + + setOCILinuxResource(&g, config.GetLinux().GetResources()) + + if sandboxConfig.GetLinux().GetCgroupParent() != "" { + cgroupsPath := getCgroupsPath(sandboxConfig.GetLinux().GetCgroupParent(), id) + g.SetLinuxCgroupsPath(cgroupsPath) + } + + g.SetProcessTerminal(config.GetTty()) + + securityContext := config.GetLinux().GetSecurityContext() + + if err := setOCICapabilities(&g, securityContext.GetCapabilities()); err != nil { + return nil, fmt.Errorf("failed to set capabilities %+v: %v", + securityContext.GetCapabilities(), err) + } + + // TODO(random-liu): [P0] Handle privileged. + + // Set namespaces, share namespace with sandbox container. + setOCINamespaces(&g, securityContext.GetNamespaceOptions(), sandboxPid) + + // TODO(random-liu): [P1] Set selinux options. + + // TODO(random-liu): [P1] Set user/username. + + supplementalGroups := securityContext.GetSupplementalGroups() + for _, group := range supplementalGroups { + g.AddProcessAdditionalGid(uint32(group)) + } + + g.SetRootReadonly(securityContext.GetReadonlyRootfs()) + + // TODO(random-liu): [P2] Add apparmor and seccomp. + + // TODO(random-liu): [P1] Bind mount sandbox /dev/shm. + + // TODO(random-liu): [P0] Bind mount sandbox resolv.conf. + + return g.Spec(), nil +} + +// addOCIBindMounts adds bind mounts. 
+func addOCIBindMounts(g *generate.Generator, mounts []*runtime.Mount) { + for _, mount := range mounts { + dst := mount.GetContainerPath() + src := mount.GetHostPath() + options := []string{"rw"} + if mount.GetReadonly() { + options = []string{"ro"} + } + // TODO(random-liu): [P1] Apply selinux label + g.AddBindMount(src, dst, options) + } +} + +// setOCILinuxResource set container resource limit. +func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContainerResources) { + if resources == nil { + return + } + g.SetLinuxResourcesCPUPeriod(uint64(resources.GetCpuPeriod())) + g.SetLinuxResourcesCPUQuota(resources.GetCpuQuota()) + g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares())) + g.SetLinuxResourcesMemoryLimit(uint64(resources.GetMemoryLimitInBytes())) + g.SetLinuxResourcesOOMScoreAdj(int(resources.GetOomScoreAdj())) +} + +// setOCICapabilities adds/drops process capabilities. +func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability) error { + if capabilities == nil { + return nil + } + + for _, c := range capabilities.GetAddCapabilities() { + if err := g.AddProcessCapability(c); err != nil { + return err + } + } + + for _, c := range capabilities.GetDropCapabilities() { + if err := g.DropProcessCapability(c); err != nil { + return err + } + } + + return nil +} + +// setOCINamespaces sets namespaces. 
+func setOCINamespaces(g *generate.Generator, namespaces *runtime.NamespaceOption, sandboxPid uint32) { + g.AddOrReplaceLinuxNamespace(string(runtimespec.NetworkNamespace), getNetworkNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.IPCNamespace), getIPCNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.UTSNamespace), getUTSNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.PIDNamespace), getPIDNamespace(sandboxPid)) // nolint: errcheck } diff --git a/pkg/server/container_status.go b/pkg/server/container_status.go index 78c871964..26a40eb6a 100644 --- a/pkg/server/container_status.go +++ b/pkg/server/container_status.go @@ -17,14 +17,60 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // ContainerStatus inspects the container and returns the status. 
-func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (*runtime.ContainerStatusResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (retRes *runtime.ContainerStatusResponse, retErr error) { + glog.V(4).Infof("ContainerStatus for container %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(4).Infof("ContainerStatus for %q returns status %+v", r.GetContainerId(), retRes.GetStatus()) + } + }() + + meta, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + + return &runtime.ContainerStatusResponse{ + Status: toCRIContainerStatus(meta), + }, nil +} + +// toCRIContainerStatus converts container metadata to CRI container status. +func toCRIContainerStatus(meta *metadata.ContainerMetadata) *runtime.ContainerStatus { + state := meta.State() + reason := meta.Reason + if state == runtime.ContainerState_CONTAINER_EXITED && reason == "" { + if meta.ExitCode == 0 { + reason = completeExitReason + } else { + reason = errorExitReason + } + } + return &runtime.ContainerStatus{ + Id: meta.ID, + Metadata: meta.Config.GetMetadata(), + State: state, + CreatedAt: meta.CreatedAt, + StartedAt: meta.StartedAt, + FinishedAt: meta.FinishedAt, + ExitCode: meta.ExitCode, + Image: meta.Config.GetImage(), + ImageRef: meta.ImageRef, + Reason: reason, + Message: meta.Message, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + Mounts: meta.Config.GetMounts(), + } } diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 28e7b156c..c686981c0 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -17,14 +17,115 @@ limitations under the License. 
package server import ( - "errors" + "fmt" + "time" + "github.com/golang/glog" "golang.org/x/net/context" + "golang.org/x/sys/unix" + + "github.com/containerd/containerd/api/services/execution" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +const ( + // stopCheckPollInterval is the the interval to check whether a container + // is stopped successfully. + stopCheckPollInterval = 100 * time.Millisecond + + // killContainerTimeout is the timeout that we wait for the container to + // be SIGKILLed. + killContainerTimeout = 2 * time.Minute ) // StopContainer stops a running container with a grace period (i.e., timeout). -func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (retRes *runtime.StopContainerResponse, retErr error) { + glog.V(2).Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout()) + defer func() { + if retErr == nil { + glog.V(2).Infof("StopContainer %q returns successfully", r.GetContainerId()) + } + }() + + // Get container config from container store. + meta, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + id := r.GetContainerId() + + // Return without error if container is not running. This makes sure that + // stop only takes real action after the container is started. + if meta.State() != runtime.ContainerState_CONTAINER_RUNNING { + glog.V(2).Infof("Container to stop %q is not running, current state %q", + id, criContainerStateToString(meta.State())) + return &runtime.StopContainerResponse{}, nil + } + + // TODO(random-liu): [P1] Get stop signal from image config. 
+ stopSignal := unix.SIGTERM + glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal) + _, err = c.containerService.Kill(ctx, &execution.KillRequest{ID: id, Signal: uint32(stopSignal)}) + if err != nil { + if isContainerdContainerNotExistError(err) { + return &runtime.StopContainerResponse{}, nil + } + return nil, fmt.Errorf("failed to stop container %q: %v", id, err) + } + + err = c.waitContainerStop(id, time.Duration(r.GetTimeout())*time.Second) + if err == nil { + return &runtime.StopContainerResponse{}, nil + } + glog.Errorf("Stop container %q timed out: %v", id, err) + + glog.V(2).Infof("Delete container from containerd %q", id) + _, err = c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}) + if err != nil { + if isContainerdContainerNotExistError(err) { + return &runtime.StopContainerResponse{}, nil + } + return nil, fmt.Errorf("failed to delete container %q: %v", id, err) + } + + // Wait forever until container stop is observed by event monitor. + if err := c.waitContainerStop(id, killContainerTimeout); err != nil { + return nil, fmt.Errorf("error occurs during waiting for container %q to stop: %v", + id, err) + } + return &runtime.StopContainerResponse{}, nil +} + +// waitContainerStop polls container state until timeout exceeds or container is stopped. +func (c *criContainerdService) waitContainerStop(id string, timeout time.Duration) error { + ticker := time.NewTicker(stopCheckPollInterval) + defer ticker.Stop() + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + for { + // Poll once before waiting for stopCheckPollInterval. + meta, err := c.containerStore.Get(id) + if err != nil { + if !metadata.IsNotExistError(err) { + return fmt.Errorf("failed to get container %q metadata: %v", id, err) + } + // Do not return error here because container was removed means + // it is already stopped. 
+ glog.Warningf("Container %q was removed during stopping", id) + return nil + } + // TODO(random-liu): Use channel with event handler instead of polling. + if meta.State() == runtime.ContainerState_CONTAINER_EXITED { + return nil + } + select { + case <-timeoutTimer.C: + return fmt.Errorf("wait container %q stop timeout", id) + case <-ticker.C: + continue + } + } } diff --git a/pkg/server/events.go b/pkg/server/events.go new file mode 100644 index 000000000..38446126c --- /dev/null +++ b/pkg/server/events.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +// startEventMonitor starts an event monitor which monitors and handles all +// container events. +func (c *criContainerdService) startEventMonitor() error { + events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{}) + if err != nil { + return err + } + go func() { + for { + c.handleEvent(events) + } + }() + return nil +} + +// handleEvent receives an event from contaienrd and handles the event. 
+func (c *criContainerdService) handleEvent(events execution.ContainerService_EventsClient) { + e, err := events.Recv() + if err != nil { + glog.Errorf("Failed to receive event: %v", err) + return + } + glog.V(2).Infof("Received container event: %+v", e) + switch e.Type { + // If containerd-shim exits unexpectedly, there will be no corresponding event. + // However, containerd could not retrieve container state in that case, so it's + // fine to leave out that case for now. + // TODO(random-liu): [P2] Handle container-shim exit. + case container.Event_EXIT: + meta, err := c.containerStore.Get(e.ID) + if err != nil { + glog.Errorf("Failed to get container %q metadata: %v", e.ID, err) + return + } + if e.Pid != meta.Pid { + // Not init process dies, ignore the event. + return + } + // Delete the container from containerd. + _, err = c.containerService.Delete(context.Background(), &execution.DeleteRequest{ID: e.ID}) + if err != nil && !isContainerdContainerNotExistError(err) { + // TODO(random-liu): [P0] Enqueue the event and retry. + glog.Errorf("Failed to delete container %q: %v", e.ID, err) + return + } + err = c.containerStore.Update(e.ID, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // If FinishedAt has been set (e.g. with start failure), keep as + // it is. + if meta.FinishedAt != 0 { + return meta, nil + } + meta.Pid = 0 + meta.FinishedAt = e.ExitedAt.UnixNano() + meta.ExitCode = int32(e.ExitStatus) + return meta, nil + }) + if err != nil { + glog.Errorf("Failed to update container %q state: %v", e.ID, err) + // TODO(random-liu): [P0] Enqueue the event and retry. + return + } + case container.Event_OOM: + // TODO(random-liu): [P1] Handle OOM event. 
+ } + return +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 54cb7074a..791972581 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -18,11 +18,14 @@ package server import ( "fmt" + "io" "path/filepath" "strings" + "syscall" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/truncindex" + "golang.org/x/net/context" "google.golang.org/grpc" "github.com/containerd/containerd" @@ -32,6 +35,18 @@ import ( "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) +const ( + // errorStartReason is the exit reason when fails to start container. + errorStartReason = "StartError" + // errorStartExitCode is the exit code when fails to start container. + // 128 is the same with Docker's behavior. + errorStartExitCode = 128 + // completeExitReason is the exit reason when container exits with code 0. + completeExitReason = "Completed" + // errorExitReason is the exit reason when container exits with code non-zero. + errorExitReason = "Error" +) + const ( // relativeRootfsPath is the rootfs path relative to bundle path. relativeRootfsPath = "rootfs" @@ -42,6 +57,8 @@ const ( // directory of the sandbox, all files created for the sandbox will be // placed under this directory. sandboxesDir = "sandboxes" + // containersDir contains all container root. + containersDir = "containers" // stdinNamedPipe is the name of stdin named pipe. stdinNamedPipe = "stdin" // stdoutNamedPipe is the name of stdout named pipe. @@ -52,6 +69,12 @@ const ( nameDelimiter = "_" // netNSFormat is the format of network namespace of a process. netNSFormat = "/proc/%v/ns/net" + // ipcNSFormat is the format of ipc namespace of a process. + ipcNSFormat = "/proc/%v/ns/ipc" + // utsNSFormat is the format of uts namespace of a process. + utsNSFormat = "/proc/%v/ns/uts" + // pidNSFormat is the format of pid namespace of a process. + pidNSFormat = "/proc/%v/ns/pid" ) // generateID generates a random unique id. 
@@ -70,6 +93,19 @@ func makeSandboxName(s *runtime.PodSandboxMetadata) string { }, nameDelimiter) } +// makeContainerName generates container name from sandbox and container metadata. +// The name generated is unique as long as the sandbox container combination is +// unique. +func makeContainerName(c *runtime.ContainerMetadata, s *runtime.PodSandboxMetadata) string { + return strings.Join([]string{ + c.Name, // 0 + s.Name, // 1: sandbox name + s.Namespace, // 2: sandbox namespace + s.Uid, // 3: sandbox uid + fmt.Sprintf("%d", c.Attempt), // 4 + }, nameDelimiter) +} + // getCgroupsPath generates container cgroups path. func getCgroupsPath(cgroupsParent string, id string) string { // TODO(random-liu): [P0] Handle systemd. @@ -82,6 +118,11 @@ func getSandboxRootDir(rootDir, id string) string { return filepath.Join(rootDir, sandboxesDir, id) } +// getContainerRootDir returns the root directory for managing container files. +func getContainerRootDir(rootDir, id string) string { + return filepath.Join(rootDir, containersDir, id) +} + // getStreamingPipes returns the stdin/stdout/stderr pipes path in the root. func getStreamingPipes(rootDir string) (string, string, string) { stdin := filepath.Join(rootDir, stdinNamedPipe) @@ -90,11 +131,57 @@ func getStreamingPipes(rootDir string) (string, string, string) { return stdin, stdout, stderr } +// prepareStreamingPipes prepares stream named pipe for container. returns nil +// streaming handler if corresponding stream path is empty. 
+func (c *criContainerdService) prepareStreamingPipes(ctx context.Context, stdin, stdout, stderr string) ( + i io.WriteCloser, o io.ReadCloser, e io.ReadCloser, retErr error) { + pipes := map[string]io.ReadWriteCloser{} + for t, stream := range map[string]struct { + path string + flag int + }{ + "stdin": {stdin, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + "stdout": {stdout, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + "stderr": {stderr, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + } { + if stream.path == "" { + continue + } + s, err := c.os.OpenFifo(ctx, stream.path, stream.flag, 0700) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to open named pipe %q: %v", + stream.path, err) + } + defer func(cl io.Closer) { + if retErr != nil { + cl.Close() + } + }(s) + pipes[t] = s + } + return pipes["stdin"], pipes["stdout"], pipes["stderr"], nil +} + // getNetworkNamespace returns the network namespace of a process. func getNetworkNamespace(pid uint32) string { return fmt.Sprintf(netNSFormat, pid) } +// getIPCNamespace returns the ipc namespace of a process. +func getIPCNamespace(pid uint32) string { + return fmt.Sprintf(ipcNSFormat, pid) +} + +// getUTSNamespace returns the uts namespace of a process. +func getUTSNamespace(pid uint32) string { + return fmt.Sprintf(utsNSFormat, pid) +} + +// getPIDNamespace returns the pid namespace of a process. +func getPIDNamespace(pid uint32) string { + return fmt.Sprintf(pidNSFormat, pid) +} + // isContainerdContainerNotExistError checks whether a grpc error is containerd // ErrContainerNotExist error. // TODO(random-liu): Containerd should expose error better through api. @@ -124,3 +211,8 @@ func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata, } return c.sandboxStore.Get(id) } + +// criContainerStateToString formats CRI container state to string. 
+func criContainerStateToString(state runtime.ContainerState) string {
+	return runtime.ContainerState_name[int32(state)]
+}
diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go
index 5baf81919..0d1793190 100644
--- a/pkg/server/sandbox_list.go
+++ b/pkg/server/sandbox_list.go
@@ -117,7 +117,8 @@ func (c *criContainerdService) filterCRISandboxes(sandboxes []*runtime.PodSandbo
 		if filter.GetLabelSelector() != nil {
 			match := true
 			for k, v := range filter.GetLabelSelector() {
-				if s.Labels[k] != v {
+				got, ok := s.Labels[k]
+				if !ok || got != v {
 					match = false
 					break
 				}
diff --git a/pkg/server/sandbox_remove.go b/pkg/server/sandbox_remove.go
index eb11cf292..e47adc891 100644
--- a/pkg/server/sandbox_remove.go
+++ b/pkg/server/sandbox_remove.go
@@ -35,7 +35,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
 	glog.V(2).Infof("RemovePodSandbox for sandbox %q", r.GetPodSandboxId())
 	defer func() {
 		if retErr == nil {
-			glog.V(2).Info("RemovePodSandbox returns successfully")
+			glog.V(2).Infof("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
 		}
 	}()
@@ -56,6 +56,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
 	// TODO(random-liu): [P2] Remove all containers in the sandbox.
 	// Return error if sandbox container is not fully stopped.
+	// TODO(random-liu): [P0] Make sure network is torn down, may need to introduce a state.
_, err = c.containerService.Info(ctx, &execution.InfoRequest{ID: id}) if err != nil && !isContainerdContainerNotExistError(err) { return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 4cbb7b8b2..c0acc84d4 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "io/ioutil" - "syscall" "time" prototypes "github.com/gogo/protobuf/types" @@ -109,14 +108,14 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run // TODO(random-liu): [P1] Moving following logging related logic into util functions. // Discard sandbox container output because we don't care about it. _, stdout, stderr := getStreamingPipes(sandboxRootDir) - for _, p := range []string{stdout, stderr} { - f, err := c.os.OpenFifo(ctx, p, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) - if err != nil { - return nil, fmt.Errorf("failed to open named pipe %q: %v", p, err) - } - defer func(c io.Closer) { + _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) + if err != nil { + return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) + } + for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} { + defer func(cl io.Closer) { if retErr != nil { - c.Close() + cl.Close() } }(f) go func(r io.ReadCloser) { diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index a54c4a608..08fd0b53f 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -35,7 +35,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. 
 	glog.V(4).Infof("PodSandboxStatus for sandbox %q", r.GetPodSandboxId())
 	defer func() {
 		if retErr == nil {
-			glog.V(4).Infof("PodSandboxStatus returns status %+v", retRes.GetStatus())
+			glog.V(4).Infof("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), retRes.GetStatus())
 		}
 	}()
diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go
index 53285634a..c4107bb47 100644
--- a/pkg/server/sandbox_stop.go
+++ b/pkg/server/sandbox_stop.go
@@ -33,7 +33,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
 	glog.V(2).Infof("StopPodSandbox for sandbox %q", r.GetPodSandboxId())
 	defer func() {
 		if retErr == nil {
-			glog.V(2).Info("StopPodSandbox returns successfully")
+			glog.V(2).Infof("StopPodSandbox %q returns successfully", r.GetPodSandboxId())
 		}
 	}()
diff --git a/pkg/server/service.go b/pkg/server/service.go
index 9909ab8e5..d05011c39 100644
--- a/pkg/server/service.go
+++ b/pkg/server/service.go
@@ -53,6 +53,7 @@ import (
 // CRIContainerdService is the interface implement CRI remote service server.
 type CRIContainerdService interface {
+	Start() error
 	runtime.RuntimeServiceServer
 	runtime.ImageServiceServer
 }
@@ -74,6 +75,11 @@ type criContainerdService struct {
 	// id "abcdefg" is added, we could use "abcd" to identify the same thing
 	// as long as there is no ambiguity.
 	sandboxIDIndex *truncindex.TruncIndex
+	// containerStore stores all container metadata.
+	containerStore metadata.ContainerStore
+	// containerNameIndex stores all container names and makes sure each
+	// name is unique.
+	containerNameIndex *registrar.Registrar
 	// containerService is containerd container service client.
containerService execution.ContainerServiceClient // contentIngester is the containerd service to ingest content into @@ -98,14 +104,22 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir string) CRIContainer os: osinterface.RealOS{}, rootDir: rootDir, sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), + containerStore: metadata.NewContainerStore(store.NewMetadataStore()), imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()), - // TODO(random-liu): Register sandbox id/name for recovered sandbox. - sandboxNameIndex: registrar.NewRegistrar(), - sandboxIDIndex: truncindex.NewTruncIndex(nil), - containerService: execution.NewContainerServiceClient(conn), - imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), - contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), - contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), - rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), + // TODO(random-liu): Register sandbox/container id/name for recovered sandbox/container. + // TODO(random-liu): Use the same name and id index for both container and sandbox. + sandboxNameIndex: registrar.NewRegistrar(), + sandboxIDIndex: truncindex.NewTruncIndex(nil), + // TODO(random-liu): Add container id index. + containerNameIndex: registrar.NewRegistrar(), + containerService: execution.NewContainerServiceClient(conn), + imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), + contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), + contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), + rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), } } + +func (c *criContainerdService) Start() error { + return c.startEventMonitor() +}