Add initial container implementation.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Random-Liu 2017-05-02 14:50:12 -07:00 committed by Lantao Liu
parent 11fff60aff
commit 6ac71e5862
14 changed files with 925 additions and 38 deletions

View File

@ -17,14 +17,84 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// CreateContainer creates a new container in the given PodSandbox.
func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (*runtime.CreateContainerResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (retRes *runtime.CreateContainerResponse, retErr error) {
glog.V(2).Infof("CreateContainer within sandbox %q with container config %+v and sandbox config %+v",
r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig())
defer func() {
if retErr == nil {
glog.V(2).Infof("CreateContainer returns container id %q", retRes.GetContainerId())
}
}()
config := r.GetConfig()
sandboxConfig := r.GetSandboxConfig()
sandbox, err := c.getSandbox(r.GetPodSandboxId())
if err != nil {
return nil, fmt.Errorf("failed to find sandbox id %q: %v", r.GetPodSandboxId(), err)
}
// Generate unique id and name for the container and reserve the name.
// Reserve the container name to avoid concurrent `CreateContainer` request creating
// the same container.
id := generateID()
name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata())
if err := c.containerNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err)
}
defer func() {
// Release the name if the function returns with an error.
if retErr != nil {
c.containerNameIndex.ReleaseByName(name)
}
}()
// Create initial container metadata.
meta := metadata.ContainerMetadata{
ID: id,
Name: name,
SandboxID: sandbox.ID,
Config: config,
}
// TODO(random-liu): [P0] Prepare container rootfs.
// TODO(random-liu): [P0] Set ImageRef in ContainerMetadata with image id.
// Create container root directory.
containerRootDir := getContainerRootDir(c.rootDir, id)
if err := c.os.MkdirAll(containerRootDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create container root directory %q: %v",
containerRootDir, err)
}
defer func() {
if retErr != nil {
// Cleanup the container root directory.
if err := c.os.RemoveAll(containerRootDir); err != nil {
glog.Errorf("Failed to remove container root directory %q: %v",
containerRootDir, err)
}
}
}()
// Update container CreatedAt.
meta.CreatedAt = time.Now().UnixNano()
// Add container into container store.
if err := c.containerStore.Create(meta); err != nil {
return nil, fmt.Errorf("failed to add container metadata %+v into store: %v",
meta, err)
}
return &runtime.CreateContainerResponse{ContainerId: id}, nil
}

View File

@ -17,14 +17,87 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// ListContainers lists all containers matching the filter.
func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (*runtime.ListContainersResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (retRes *runtime.ListContainersResponse, retErr error) {
glog.V(4).Infof("ListContainers with filter %+v", r.GetFilter())
defer func() {
if retErr == nil {
glog.V(4).Infof("ListContainers returns containers %+v", retRes.GetContainers())
}
}()
// List all container metadata from store.
metas, err := c.containerStore.List()
if err != nil {
return nil, fmt.Errorf("failed to list metadata from container store: %v", err)
}
var containers []*runtime.Container
for _, meta := range metas {
containers = append(containers, toCRIContainer(meta))
}
containers = c.filterCRIContainers(containers, r.GetFilter())
return &runtime.ListContainersResponse{Containers: containers}, nil
}
// toCRIContainer converts container metadata into CRI container.
func toCRIContainer(meta *metadata.ContainerMetadata) *runtime.Container {
return &runtime.Container{
Id: meta.ID,
PodSandboxId: meta.SandboxID,
Metadata: meta.Config.GetMetadata(),
Image: meta.Config.GetImage(),
ImageRef: meta.ImageRef,
State: meta.State(),
CreatedAt: meta.CreatedAt,
Labels: meta.Config.GetLabels(),
Annotations: meta.Config.GetAnnotations(),
}
}
// filterCRIContainers filters CRIContainers.
func (c *criContainerdService) filterCRIContainers(containers []*runtime.Container, filter *runtime.ContainerFilter) []*runtime.Container {
if filter == nil {
return containers
}
filtered := []*runtime.Container{}
for _, cntr := range containers {
if filter.GetId() != "" && filter.GetId() != cntr.Id {
continue
}
if filter.GetPodSandboxId() != "" && filter.GetPodSandboxId() != cntr.PodSandboxId {
continue
}
if filter.GetState() != nil && filter.GetState().GetState() != cntr.State {
continue
}
if filter.GetLabelSelector() != nil {
match := true
for k, v := range filter.GetLabelSelector() {
got, ok := cntr.Labels[k]
if !ok || got != v {
match = false
break
}
}
if !match {
continue
}
}
filtered = append(filtered, cntr)
}
return filtered
}

View File

@ -17,14 +17,96 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// RemoveContainer removes the container.
func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (*runtime.RemoveContainerResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (retRes *runtime.RemoveContainerResponse, retErr error) {
glog.V(2).Infof("RemoveContainer for %q", r.GetContainerId())
defer func() {
if retErr == nil {
glog.V(2).Infof("RemoveContainer %q returns successfully", r.GetContainerId())
}
}()
id := r.GetContainerId()
// Set removing state to prevent other start/remove operations against this container
// while it's being removed.
if err := c.setContainerRemoving(id); err != nil {
if !metadata.IsNotExistError(err) {
return nil, fmt.Errorf("failed to set removing state for container %q: %v",
id, err)
}
// Do not return error if container metadata doesn't exist.
glog.V(5).Infof("RemoveContainer called for container %q that does not exist", id)
return &runtime.RemoveContainerResponse{}, nil
}
defer func() {
if retErr == nil {
// Cleanup all index after successfully remove the container.
c.containerNameIndex.ReleaseByKey(id)
return
}
// Reset removing if remove failed.
if err := c.resetContainerRemoving(id); err != nil {
// TODO(random-liu): Deal with update failure. Actually Removing doesn't need to
// be checkpointed, we only need it to have the same lifecycle with container metadata.
glog.Errorf("failed to reset removing state for container %q: %v",
id, err)
}
}()
// NOTE(random-liu): Docker set container to "Dead" state when start removing the
// container so as to avoid start/restart the container again. However, for current
// kubelet implementation, we'll never start a container once we decide to remove it,
// so we don't need the "Dead" state for now.
// TODO(random-liu): [P0] Cleanup container rootfs.
// Cleanup container root directory.
containerRootDir := getContainerRootDir(c.rootDir, id)
if err := c.os.RemoveAll(containerRootDir); err != nil {
return nil, fmt.Errorf("failed to remove container root directory %q: %v",
containerRootDir, err)
}
// Delete container metadata.
if err := c.containerStore.Delete(id); err != nil {
return nil, fmt.Errorf("failed to delete container metadata for %q: %v", id, err)
}
return &runtime.RemoveContainerResponse{}, nil
}
// setContainerRemoving sets the container into removing state. In removing state, the
// container will not be started or removed again.
func (c *criContainerdService) setContainerRemoving(id string) error {
return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) {
// Do not remove container if it's still running.
if meta.State() == runtime.ContainerState_CONTAINER_RUNNING {
return meta, fmt.Errorf("container %q is still running", id)
}
if meta.Removing {
return meta, fmt.Errorf("container is already in removing state")
}
meta.Removing = true
return meta, nil
})
}
// resetContainerRemoving resets the container removing state on remove failure. So
// that we could remove the container again.
func (c *criContainerdService) resetContainerRemoving(id string) error {
return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) {
meta.Removing = false
return meta, nil
})
}

View File

@ -17,14 +17,328 @@ limitations under the License.
package server
import (
"errors"
"encoding/json"
"fmt"
"io"
"os"
"time"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/opencontainers/runtime-tools/generate"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"github.com/containerd/containerd/api/types/mount"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// StartContainer starts the container.
func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (*runtime.StartContainerResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) {
glog.V(2).Infof("StartContainer for %q", r.GetContainerId())
defer func() {
if retErr == nil {
glog.V(2).Infof("StartContainer %q returns successfully", r.GetContainerId())
}
}()
container, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
id := container.ID
var startErr error
// start container in one transaction to avoid race with event monitor.
if err := c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) {
// Always apply metadata change no matter startContainer fails or not. Because startContainer
// may change container state no matter it fails or succeeds.
startErr = c.startContainer(ctx, id, &meta)
return meta, nil
}); startErr != nil {
return nil, startErr
} else if err != nil {
return nil, fmt.Errorf("failed to update container %q metadata: %v", id, err)
}
return &runtime.StartContainerResponse{}, nil
}
// startContainer actually starts the container. The function needs to be run in one transaction. Any updates
// to the metadata passed in will be applied to container store no matter the function returns error or not.
func (c *criContainerdService) startContainer(ctx context.Context, id string, meta *metadata.ContainerMetadata) (retErr error) {
config := meta.Config
// Return error if container is not in created state.
if meta.State() != runtime.ContainerState_CONTAINER_CREATED {
return fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State()))
}
// Do not start the container when there is a removal in progress.
if meta.Removing {
return fmt.Errorf("container %q is in removing state", id)
}
defer func() {
if retErr != nil {
// Set container to exited if fail to start.
meta.Pid = 0
meta.FinishedAt = time.Now().UnixNano()
meta.ExitCode = errorStartExitCode
meta.Reason = errorStartReason
meta.Message = retErr.Error()
}
}()
// Get sandbox config from sandbox store.
sandboxMeta, err := c.getSandbox(meta.SandboxID)
if err != nil {
return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err)
}
sandboxConfig := sandboxMeta.Config
sandboxID := meta.SandboxID
// Make sure sandbox is running.
sandboxInfo, err := c.containerService.Info(ctx, &execution.InfoRequest{ID: sandboxID})
if err != nil {
return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err)
}
// This is only a best effort check, sandbox may still exit after this. If sandbox fails
// before starting the container, the start will fail.
if sandboxInfo.Status != container.Status_RUNNING {
return fmt.Errorf("sandbox container %q is not running", sandboxID)
}
sandboxPid := sandboxInfo.Pid
glog.V(2).Infof("Sandbox container %q is running with pid %d", sandboxID, sandboxPid)
// Generate containerd container create options.
// TODO(random-liu): [P0] Create container rootfs with image ref.
// TODO(random-liu): [P0] Apply default image config.
// Use fixed rootfs path for now.
const rootPath = "/"
spec, err := c.generateContainerSpec(id, sandboxPid, config, sandboxConfig)
if err != nil {
return fmt.Errorf("failed to generate container %q spec: %v", id, err)
}
rawSpec, err := json.Marshal(spec)
if err != nil {
return fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}
glog.V(4).Infof("Container spec: %+v", spec)
containerRootDir := getContainerRootDir(c.rootDir, id)
stdin, stdout, stderr := getStreamingPipes(containerRootDir)
// Set stdin to empty if Stdin == false.
if !config.GetStdin() {
stdin = ""
}
stdinPipe, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, stdin, stdout, stderr)
if err != nil {
return fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
defer func() {
if retErr != nil {
if stdinPipe != nil {
stdinPipe.Close()
}
stdoutPipe.Close()
stderrPipe.Close()
}
}()
// Redirect the stream to std for now.
// TODO(random-liu): [P1] Support container logging.
// TODO(random-liu): [P1] Support StdinOnce after container logging is added.
if stdinPipe != nil {
go func(w io.WriteCloser) {
io.Copy(w, os.Stdin) // nolint: errcheck
w.Close()
}(stdinPipe)
}
go func(r io.ReadCloser) {
io.Copy(os.Stdout, r) // nolint: errcheck
r.Close()
}(stdoutPipe)
// Only redirect stderr when there is no tty.
if !config.GetTty() {
go func(r io.ReadCloser) {
io.Copy(os.Stderr, r) // nolint: errcheck
r.Close()
}(stderrPipe)
}
// Create containerd container.
createOpts := &execution.CreateRequest{
ID: id,
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
// TODO(random-liu): [P0] Get rootfs mount from containerd.
Rootfs: []*mount.Mount{
{
Type: "bind",
Source: rootPath,
Options: []string{
"rw",
"rbind",
},
},
},
Runtime: defaultRuntime,
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
Terminal: config.GetTty(),
}
glog.V(2).Infof("Create containerd container (id=%q, name=%q) with options %+v.",
id, meta.Name, createOpts)
createResp, err := c.containerService.Create(ctx, createOpts)
if err != nil {
return fmt.Errorf("failed to create containerd container: %v", err)
}
defer func() {
if retErr != nil {
// Cleanup the containerd container if an error is returned.
if _, err := c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil {
glog.Errorf("Failed to delete containerd container %q: %v", id, err)
}
}
}()
// Start containerd container.
if _, err := c.containerService.Start(ctx, &execution.StartRequest{ID: id}); err != nil {
return fmt.Errorf("failed to start containerd container %q: %v", id, err)
}
// Update container start timestamp.
meta.Pid = createResp.Pid
meta.StartedAt = time.Now().UnixNano()
return nil
}
func (c *criContainerdService) generateContainerSpec(id string, sandboxPid uint32, config *runtime.ContainerConfig, sandboxConfig *runtime.PodSandboxConfig) (*runtimespec.Spec, error) {
// Creates a spec Generator with the default spec.
// TODO(random-liu): [P2] Move container runtime spec generation into a helper function.
g := generate.New()
// Set the relative path to the rootfs of the container from containerd's
// pre-defined directory.
g.SetRootPath(relativeRootfsPath)
if len(config.GetCommand()) != 0 || len(config.GetArgs()) != 0 {
g.SetProcessArgs(append(config.GetCommand(), config.GetArgs()...))
}
if config.GetWorkingDir() != "" {
g.SetProcessCwd(config.GetWorkingDir())
}
for _, e := range config.GetEnvs() {
g.AddProcessEnv(e.GetKey(), e.GetValue())
}
addOCIBindMounts(&g, config.GetMounts())
// TODO(random-liu): [P1] Set device mapping.
// Ref https://github.com/moby/moby/blob/master/oci/devices_linux.go.
// TODO(random-liu): [P1] Handle container logging, decorate and redirect to file.
setOCILinuxResource(&g, config.GetLinux().GetResources())
if sandboxConfig.GetLinux().GetCgroupParent() != "" {
cgroupsPath := getCgroupsPath(sandboxConfig.GetLinux().GetCgroupParent(), id)
g.SetLinuxCgroupsPath(cgroupsPath)
}
g.SetProcessTerminal(config.GetTty())
securityContext := config.GetLinux().GetSecurityContext()
if err := setOCICapabilities(&g, securityContext.GetCapabilities()); err != nil {
return nil, fmt.Errorf("failed to set capabilities %+v: %v",
securityContext.GetCapabilities(), err)
}
// TODO(random-liu): [P0] Handle privileged.
// Set namespaces, share namespace with sandbox container.
setOCINamespaces(&g, securityContext.GetNamespaceOptions(), sandboxPid)
// TODO(random-liu): [P1] Set selinux options.
// TODO(random-liu): [P1] Set user/username.
supplementalGroups := securityContext.GetSupplementalGroups()
for _, group := range supplementalGroups {
g.AddProcessAdditionalGid(uint32(group))
}
g.SetRootReadonly(securityContext.GetReadonlyRootfs())
// TODO(random-liu): [P2] Add apparmor and seccomp.
// TODO(random-liu): [P1] Bind mount sandbox /dev/shm.
// TODO(random-liu): [P0] Bind mount sandbox resolv.conf.
return g.Spec(), nil
}
// addOCIBindMounts adds bind mounts.
func addOCIBindMounts(g *generate.Generator, mounts []*runtime.Mount) {
for _, mount := range mounts {
dst := mount.GetContainerPath()
src := mount.GetHostPath()
options := []string{"rw"}
if mount.GetReadonly() {
options = []string{"ro"}
}
// TODO(random-liu): [P1] Apply selinux label
g.AddBindMount(src, dst, options)
}
}
// setOCILinuxResource set container resource limit.
func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContainerResources) {
if resources == nil {
return
}
g.SetLinuxResourcesCPUPeriod(uint64(resources.GetCpuPeriod()))
g.SetLinuxResourcesCPUQuota(resources.GetCpuQuota())
g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares()))
g.SetLinuxResourcesMemoryLimit(uint64(resources.GetMemoryLimitInBytes()))
g.SetLinuxResourcesOOMScoreAdj(int(resources.GetOomScoreAdj()))
}
// setOCICapabilities adds/drops process capabilities.
func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability) error {
if capabilities == nil {
return nil
}
for _, c := range capabilities.GetAddCapabilities() {
if err := g.AddProcessCapability(c); err != nil {
return err
}
}
for _, c := range capabilities.GetDropCapabilities() {
if err := g.DropProcessCapability(c); err != nil {
return err
}
}
return nil
}
// setOCINamespaces sets namespaces.
func setOCINamespaces(g *generate.Generator, namespaces *runtime.NamespaceOption, sandboxPid uint32) {
g.AddOrReplaceLinuxNamespace(string(runtimespec.NetworkNamespace), getNetworkNamespace(sandboxPid)) // nolint: errcheck
g.AddOrReplaceLinuxNamespace(string(runtimespec.IPCNamespace), getIPCNamespace(sandboxPid)) // nolint: errcheck
g.AddOrReplaceLinuxNamespace(string(runtimespec.UTSNamespace), getUTSNamespace(sandboxPid)) // nolint: errcheck
g.AddOrReplaceLinuxNamespace(string(runtimespec.PIDNamespace), getPIDNamespace(sandboxPid)) // nolint: errcheck
}

View File

@ -17,14 +17,60 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// ContainerStatus inspects the container and returns the status.
func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (*runtime.ContainerStatusResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (retRes *runtime.ContainerStatusResponse, retErr error) {
glog.V(4).Infof("ContainerStatus for container %q", r.GetContainerId())
defer func() {
if retErr == nil {
glog.V(4).Infof("ContainerStatus for %q returns status %+v", r.GetContainerId(), retRes.GetStatus())
}
}()
meta, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
return &runtime.ContainerStatusResponse{
Status: toCRIContainerStatus(meta),
}, nil
}
// toCRIContainerStatus converts container metadata to CRI container status.
func toCRIContainerStatus(meta *metadata.ContainerMetadata) *runtime.ContainerStatus {
state := meta.State()
reason := meta.Reason
if state == runtime.ContainerState_CONTAINER_EXITED && reason == "" {
if meta.ExitCode == 0 {
reason = completeExitReason
} else {
reason = errorExitReason
}
}
return &runtime.ContainerStatus{
Id: meta.ID,
Metadata: meta.Config.GetMetadata(),
State: state,
CreatedAt: meta.CreatedAt,
StartedAt: meta.StartedAt,
FinishedAt: meta.FinishedAt,
ExitCode: meta.ExitCode,
Image: meta.Config.GetImage(),
ImageRef: meta.ImageRef,
Reason: reason,
Message: meta.Message,
Labels: meta.Config.GetLabels(),
Annotations: meta.Config.GetAnnotations(),
Mounts: meta.Config.GetMounts(),
}
}

View File

@ -17,14 +17,115 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"time"
"github.com/golang/glog"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/api/services/execution"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
const (
// stopCheckPollInterval is the the interval to check whether a container
// is stopped successfully.
stopCheckPollInterval = 100 * time.Millisecond
// killContainerTimeout is the timeout that we wait for the container to
// be SIGKILLed.
killContainerTimeout = 2 * time.Minute
)
// StopContainer stops a running container with a grace period (i.e., timeout).
func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {
return nil, errors.New("not implemented")
func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (retRes *runtime.StopContainerResponse, retErr error) {
glog.V(2).Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout())
defer func() {
if retErr == nil {
glog.V(2).Infof("StopContainer %q returns successfully", r.GetContainerId())
}
}()
// Get container config from container store.
meta, err := c.containerStore.Get(r.GetContainerId())
if err != nil {
return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err)
}
id := r.GetContainerId()
// Return without error if container is not running. This makes sure that
// stop only takes real action after the container is started.
if meta.State() != runtime.ContainerState_CONTAINER_RUNNING {
glog.V(2).Infof("Container to stop %q is not running, current state %q",
id, criContainerStateToString(meta.State()))
return &runtime.StopContainerResponse{}, nil
}
// TODO(random-liu): [P1] Get stop signal from image config.
stopSignal := unix.SIGTERM
glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal)
_, err = c.containerService.Kill(ctx, &execution.KillRequest{ID: id, Signal: uint32(stopSignal)})
if err != nil {
if isContainerdContainerNotExistError(err) {
return &runtime.StopContainerResponse{}, nil
}
return nil, fmt.Errorf("failed to stop container %q: %v", id, err)
}
err = c.waitContainerStop(id, time.Duration(r.GetTimeout())*time.Second)
if err == nil {
return &runtime.StopContainerResponse{}, nil
}
glog.Errorf("Stop container %q timed out: %v", id, err)
glog.V(2).Infof("Delete container from containerd %q", id)
_, err = c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id})
if err != nil {
if isContainerdContainerNotExistError(err) {
return &runtime.StopContainerResponse{}, nil
}
return nil, fmt.Errorf("failed to delete container %q: %v", id, err)
}
// Wait forever until container stop is observed by event monitor.
if err := c.waitContainerStop(id, killContainerTimeout); err != nil {
return nil, fmt.Errorf("error occurs during waiting for container %q to stop: %v",
id, err)
}
return &runtime.StopContainerResponse{}, nil
}
// waitContainerStop polls container state until timeout exceeds or container is stopped.
func (c *criContainerdService) waitContainerStop(id string, timeout time.Duration) error {
ticker := time.NewTicker(stopCheckPollInterval)
defer ticker.Stop()
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()
for {
// Poll once before waiting for stopCheckPollInterval.
meta, err := c.containerStore.Get(id)
if err != nil {
if !metadata.IsNotExistError(err) {
return fmt.Errorf("failed to get container %q metadata: %v", id, err)
}
// Do not return error here because container was removed means
// it is already stopped.
glog.Warningf("Container %q was removed during stopping", id)
return nil
}
// TODO(random-liu): Use channel with event handler instead of polling.
if meta.State() == runtime.ContainerState_CONTAINER_EXITED {
return nil
}
select {
case <-timeoutTimer.C:
return fmt.Errorf("wait container %q stop timeout", id)
case <-ticker.C:
continue
}
}
}

94
pkg/server/events.go Normal file
View File

@ -0,0 +1,94 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// startEventMonitor starts an event monitor which monitors and handles all
// container events.
func (c *criContainerdService) startEventMonitor() error {
events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{})
if err != nil {
return err
}
go func() {
for {
c.handleEvent(events)
}
}()
return nil
}
// handleEvent receives an event from contaienrd and handles the event.
func (c *criContainerdService) handleEvent(events execution.ContainerService_EventsClient) {
e, err := events.Recv()
if err != nil {
glog.Errorf("Failed to receive event: %v", err)
return
}
glog.V(2).Infof("Received container event: %+v", e)
switch e.Type {
// If containerd-shim exits unexpectedly, there will be no corresponding event.
// However, containerd could not retrieve container state in that case, so it's
// fine to leave out that case for now.
// TODO(random-liu): [P2] Handle container-shim exit.
case container.Event_EXIT:
meta, err := c.containerStore.Get(e.ID)
if err != nil {
glog.Errorf("Failed to get container %q metadata: %v", e.ID, err)
return
}
if e.Pid != meta.Pid {
// Not init process dies, ignore the event.
return
}
// Delete the container from containerd.
_, err = c.containerService.Delete(context.Background(), &execution.DeleteRequest{ID: e.ID})
if err != nil && !isContainerdContainerNotExistError(err) {
// TODO(random-liu): [P0] Enqueue the event and retry.
glog.Errorf("Failed to delete container %q: %v", e.ID, err)
return
}
err = c.containerStore.Update(e.ID, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) {
// If FinishedAt has been set (e.g. with start failure), keep as
// it is.
if meta.FinishedAt != 0 {
return meta, nil
}
meta.Pid = 0
meta.FinishedAt = e.ExitedAt.UnixNano()
meta.ExitCode = int32(e.ExitStatus)
return meta, nil
})
if err != nil {
glog.Errorf("Failed to update container %q state: %v", e.ID, err)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
}
case container.Event_OOM:
// TODO(random-liu): [P1] Handle OOM event.
}
return
}

View File

@ -18,11 +18,14 @@ package server
import (
"fmt"
"io"
"path/filepath"
"strings"
"syscall"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/truncindex"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/containerd/containerd"
@ -32,6 +35,18 @@ import (
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
const (
// errorStartReason is the exit reason when fails to start container.
errorStartReason = "StartError"
// errorStartExitCode is the exit code when fails to start container.
// 128 is the same with Docker's behavior.
errorStartExitCode = 128
// completeExitReason is the exit reason when container exits with code 0.
completeExitReason = "Completed"
// errorExitReason is the exit reason when container exits with code non-zero.
errorExitReason = "Error"
)
const (
// relativeRootfsPath is the rootfs path relative to bundle path.
relativeRootfsPath = "rootfs"
@ -42,6 +57,8 @@ const (
// directory of the sandbox, all files created for the sandbox will be
// placed under this directory.
sandboxesDir = "sandboxes"
// containersDir contains all container root.
containersDir = "containers"
// stdinNamedPipe is the name of stdin named pipe.
stdinNamedPipe = "stdin"
// stdoutNamedPipe is the name of stdout named pipe.
@ -52,6 +69,12 @@ const (
nameDelimiter = "_"
// netNSFormat is the format of network namespace of a process.
netNSFormat = "/proc/%v/ns/net"
// ipcNSFormat is the format of ipc namespace of a process.
ipcNSFormat = "/proc/%v/ns/ipc"
// utsNSFormat is the format of uts namespace of a process.
utsNSFormat = "/proc/%v/ns/uts"
// pidNSFormat is the format of pid namespace of a process.
pidNSFormat = "/proc/%v/ns/pid"
)
// generateID generates a random unique id.
@ -70,6 +93,19 @@ func makeSandboxName(s *runtime.PodSandboxMetadata) string {
}, nameDelimiter)
}
// makeContainerName generates container name from sandbox and container metadata.
// The name generated is unique as long as the sandbox container combination is
// unique.
func makeContainerName(c *runtime.ContainerMetadata, s *runtime.PodSandboxMetadata) string {
return strings.Join([]string{
c.Name, // 0
s.Name, // 1: sandbox name
s.Namespace, // 2: sandbox namespace
s.Uid, // 3: sandbox uid
fmt.Sprintf("%d", c.Attempt), // 4
}, nameDelimiter)
}
// getCgroupsPath generates container cgroups path.
func getCgroupsPath(cgroupsParent string, id string) string {
// TODO(random-liu): [P0] Handle systemd.
@ -82,6 +118,11 @@ func getSandboxRootDir(rootDir, id string) string {
return filepath.Join(rootDir, sandboxesDir, id)
}
// getContainerRootDir returns the root directory for managing container files.
func getContainerRootDir(rootDir, id string) string {
return filepath.Join(rootDir, containersDir, id)
}
// getStreamingPipes returns the stdin/stdout/stderr pipes path in the root.
func getStreamingPipes(rootDir string) (string, string, string) {
stdin := filepath.Join(rootDir, stdinNamedPipe)
@ -90,11 +131,57 @@ func getStreamingPipes(rootDir string) (string, string, string) {
return stdin, stdout, stderr
}
// prepareStreamingPipes prepares stream named pipe for container. returns nil
// streaming handler if corresponding stream path is empty.
func (c *criContainerdService) prepareStreamingPipes(ctx context.Context, stdin, stdout, stderr string) (
i io.WriteCloser, o io.ReadCloser, e io.ReadCloser, retErr error) {
pipes := map[string]io.ReadWriteCloser{}
for t, stream := range map[string]struct {
path string
flag int
}{
"stdin": {stdin, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
"stdout": {stdout, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
"stderr": {stderr, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK},
} {
if stream.path == "" {
continue
}
s, err := c.os.OpenFifo(ctx, stream.path, stream.flag, 0700)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to open named pipe %q: %v",
stream.path, err)
}
defer func(cl io.Closer) {
if retErr != nil {
cl.Close()
}
}(s)
pipes[t] = s
}
return pipes["stdin"], pipes["stdout"], pipes["stderr"], nil
}
// getNetworkNamespace returns the network namespace of a process.
func getNetworkNamespace(pid uint32) string {
return fmt.Sprintf(netNSFormat, pid)
}
// getIPCNamespace returns the ipc namespace of a process.
func getIPCNamespace(pid uint32) string {
return fmt.Sprintf(ipcNSFormat, pid)
}
// getUTSNamespace returns the uts namespace of a process.
func getUTSNamespace(pid uint32) string {
return fmt.Sprintf(utsNSFormat, pid)
}
// getPIDNamespace returns the pid namespace of a process.
func getPIDNamespace(pid uint32) string {
return fmt.Sprintf(pidNSFormat, pid)
}
// isContainerdContainerNotExistError checks whether a grpc error is containerd
// ErrContainerNotExist error.
// TODO(random-liu): Containerd should expose error better through api.
@ -124,3 +211,8 @@ func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata,
}
return c.sandboxStore.Get(id)
}
// criContainerStateToString formats CRI container state to string.
func criContainerStateToString(state runtime.ContainerState) string {
return runtime.ContainerState_name[int32(state)]
}

View File

@ -117,7 +117,8 @@ func (c *criContainerdService) filterCRISandboxes(sandboxes []*runtime.PodSandbo
if filter.GetLabelSelector() != nil {
match := true
for k, v := range filter.GetLabelSelector() {
if s.Labels[k] != v {
got, ok := s.Labels[k]
if !ok || got != v {
match = false
break
}

View File

@ -35,7 +35,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
glog.V(2).Infof("RemovePodSandbox for sandbox %q", r.GetPodSandboxId())
defer func() {
if retErr == nil {
glog.V(2).Info("RemovePodSandbox returns successfully")
glog.V(2).Info("RemovePodSandbox %q returns successfully", r.GetPodSandboxId())
}
}()
@ -56,6 +56,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
// TODO(random-liu): [P2] Remove all containers in the sandbox.
// Return error if sandbox container is not fully stopped.
// TODO(random-liu): [P0] Make sure network is torn down, may need to introduce a state.
_, err = c.containerService.Info(ctx, &execution.InfoRequest{ID: id})
if err != nil && !isContainerdContainerNotExistError(err) {
return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)

View File

@ -21,7 +21,6 @@ import (
"fmt"
"io"
"io/ioutil"
"syscall"
"time"
prototypes "github.com/gogo/protobuf/types"
@ -109,14 +108,14 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
// TODO(random-liu): [P1] Moving following logging related logic into util functions.
// Discard sandbox container output because we don't care about it.
_, stdout, stderr := getStreamingPipes(sandboxRootDir)
for _, p := range []string{stdout, stderr} {
f, err := c.os.OpenFifo(ctx, p, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, fmt.Errorf("failed to open named pipe %q: %v", p, err)
}
defer func(c io.Closer) {
_, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr)
if err != nil {
return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err)
}
for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} {
defer func(cl io.Closer) {
if retErr != nil {
c.Close()
cl.Close()
}
}(f)
go func(r io.ReadCloser) {

View File

@ -35,7 +35,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.
glog.V(4).Infof("PodSandboxStatus for sandbox %q", r.GetPodSandboxId())
defer func() {
if retErr == nil {
glog.V(4).Infof("PodSandboxStatus returns status %+v", retRes.GetStatus())
glog.V(4).Infof("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), retRes.GetStatus())
}
}()

View File

@ -33,7 +33,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
glog.V(2).Infof("StopPodSandbox for sandbox %q", r.GetPodSandboxId())
defer func() {
if retErr == nil {
glog.V(2).Info("StopPodSandbox returns successfully")
glog.V(2).Info("StopPodSandbox %q returns successfully", r.GetPodSandboxId())
}
}()

View File

@ -53,6 +53,7 @@ import (
// CRIContainerdService is the interface implement CRI remote service server.
type CRIContainerdService interface {
Start() error
runtime.RuntimeServiceServer
runtime.ImageServiceServer
}
@ -74,6 +75,11 @@ type criContainerdService struct {
// id "abcdefg" is added, we could use "abcd" to identify the same thing
// as long as there is no ambiguity.
sandboxIDIndex *truncindex.TruncIndex
// containerStore stores all container metadata.
containerStore metadata.ContainerStore
// containerNameIndex stores all container names and make sure each
// name is unique.
containerNameIndex *registrar.Registrar
// containerService is containerd container service client.
containerService execution.ContainerServiceClient
// contentIngester is the containerd service to ingest content into
@ -98,14 +104,22 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir string) CRIContainer
os: osinterface.RealOS{},
rootDir: rootDir,
sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()),
containerStore: metadata.NewContainerStore(store.NewMetadataStore()),
imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()),
// TODO(random-liu): Register sandbox id/name for recovered sandbox.
sandboxNameIndex: registrar.NewRegistrar(),
sandboxIDIndex: truncindex.NewTruncIndex(nil),
containerService: execution.NewContainerServiceClient(conn),
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)),
contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)),
rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)),
// TODO(random-liu): Register sandbox/container id/name for recovered sandbox/container.
// TODO(random-liu): Use the same name and id index for both container and sandbox.
sandboxNameIndex: registrar.NewRegistrar(),
sandboxIDIndex: truncindex.NewTruncIndex(nil),
// TODO(random-liu): Add container id index.
containerNameIndex: registrar.NewRegistrar(),
containerService: execution.NewContainerServiceClient(conn),
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)),
contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)),
rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)),
}
}
func (c *criContainerdService) Start() error {
return c.startEventMonitor()
}