Add initial sandbox management implementation

Signed-off-by: Random-Liu <lantaol@google.com>
This commit is contained in:
Random-Liu 2017-05-12 13:14:11 -07:00
parent 507eff04b3
commit bf28c7fc75
13 changed files with 842 additions and 51 deletions

View File

@ -32,20 +32,20 @@ func main() {
o.AddFlags(pflag.CommandLine)
options.InitFlags()
if o.CRIContainerdVersion {
if o.PrintVersion {
version.PrintVersion()
os.Exit(0)
}
glog.V(2).Infof("Connect to containerd socket %q with timeout %v", o.ContainerdSocketPath, o.ContainerdConnectionTimeout)
conn, err := server.ConnectToContainerd(o.ContainerdSocketPath, o.ContainerdConnectionTimeout)
glog.V(2).Infof("Connect to containerd endpoint %q with timeout %v", o.ContainerdEndpoint, o.ContainerdConnectionTimeout)
conn, err := server.ConnectToContainerd(o.ContainerdEndpoint, o.ContainerdConnectionTimeout)
if err != nil {
glog.Exitf("Failed to connect containerd socket %q: %v", o.ContainerdSocketPath, err)
glog.Exitf("Failed to connect containerd endpoint %q: %v", o.ContainerdEndpoint, err)
}
glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.CRIContainerdSocketPath)
service := server.NewCRIContainerdService(conn)
s := server.NewCRIContainerdServer(o.CRIContainerdSocketPath, service, service)
glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath)
service := server.NewCRIContainerdService(conn, o.RootDir)
s := server.NewCRIContainerdServer(o.SocketPath, service, service)
if err := s.Run(); err != nil {
glog.Exitf("Failed to run cri-containerd grpc server: %v", err)
}

View File

@ -25,12 +25,15 @@ import (
// CRIContainerdOptions contains cri-containerd command line options.
type CRIContainerdOptions struct {
// CRIContainerdSocketPath is the path to the socket which cri-containerd serves on.
CRIContainerdSocketPath string
// CRIContainerdVersion is the git release version of cri-containerd
CRIContainerdVersion bool
// ContainerdSocketPath is the path to the containerd socket.
ContainerdSocketPath string
// SocketPath is the path to the socket which cri-containerd serves on.
SocketPath string
// RootDir is the root directory path for managing cri-containerd files
// (metadata checkpoint etc.)
RootDir string
// PrintVersion indicates to print version information of cri-containerd.
PrintVersion bool
// ContainerdEndpoint is the containerd endpoint path.
ContainerdEndpoint string
// ContainerdConnectionTimeout is the connection timeout for containerd client.
ContainerdConnectionTimeout time.Duration
}
@ -42,13 +45,15 @@ func NewCRIContainerdOptions() *CRIContainerdOptions {
// AddFlags adds cri-containerd command line options to pflag.
func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&c.CRIContainerdSocketPath, "cri-containerd-socket",
fs.StringVar(&c.SocketPath, "socket-path",
"/var/run/cri-containerd.sock", "Path to the socket which cri-containerd serves on.")
fs.StringVar(&c.ContainerdSocketPath, "containerd-socket",
"/run/containerd/containerd.sock", "Path to the containerd socket.")
fs.StringVar(&c.RootDir, "root-dir",
"/var/lib/cri-containerd", "Root directory path for cri-containerd managed files (metadata checkpoint etc).")
fs.StringVar(&c.ContainerdEndpoint, "containerd-endpoint",
"/run/containerd/containerd.sock", "Path to the containerd endpoint.")
fs.DurationVar(&c.ContainerdConnectionTimeout, "containerd-connection-timeout",
2*time.Minute, "Connection timeout for containerd client.")
fs.BoolVar(&c.CRIContainerdVersion, "version",
fs.BoolVar(&c.PrintVersion, "version",
false, "Print cri-containerd version information and quit.")
}

View File

@ -17,7 +17,12 @@ limitations under the License.
package os
import (
"io"
"os"
"golang.org/x/net/context"
"github.com/tonistiigi/fifo"
)
// OS collects system level operations that need to be mocked out
@ -25,6 +30,7 @@ import (
type OS interface {
// MkdirAll creates the directory at path with permission bits perm.
// NOTE(review): presumably mirrors os.MkdirAll (creates missing parents) —
// the RealOS implementation body is not visible here, confirm.
MkdirAll(path string, perm os.FileMode) error
// RemoveAll removes path. RealOS implements it by calling os.RemoveAll.
RemoveAll(path string) error
// OpenFifo opens the fifo named fn with the given open flags and permission
// bits and returns it as a read/write/close stream. RealOS implements it by
// calling fifo.OpenFifo with the same arguments.
OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
}
// RealOS is used to dispatch the real system level operations.
@ -39,3 +45,8 @@ func (RealOS) MkdirAll(path string, perm os.FileMode) error {
// RemoveAll calls os.RemoveAll on path and returns its error unchanged.
func (RealOS) RemoveAll(path string) error {
return os.RemoveAll(path)
}
// OpenFifo will call fifo.OpenFifo to open a fifo.
// All arguments (context, fifo name fn, open flags and permission bits) are
// forwarded unchanged, and the result of fifo.OpenFifo is returned as is.
func (RealOS) OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
return fifo.OpenFifo(ctx, fn, flag, perm)
}

View File

@ -17,8 +17,11 @@ limitations under the License.
package testing
import (
"io"
"os"
"golang.org/x/net/context"
osInterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
)
@ -28,6 +31,7 @@ import (
type FakeOS struct {
// MkdirAllFn is the hook for the fake MkdirAll call.
// NOTE(review): presumably invoked by FakeOS.MkdirAll when non-nil — that
// method is not visible here, confirm.
MkdirAllFn func(string, os.FileMode) error
// RemoveAllFn is the hook for the fake RemoveAll call.
// NOTE(review): presumably invoked by FakeOS.RemoveAll when non-nil — only
// the tail of that method is visible here, confirm.
RemoveAllFn func(string) error
// OpenFifoFn is the hook for the fake OpenFifo call. When it is non-nil
// FakeOS.OpenFifo invokes it with the caller's arguments; when it is nil
// FakeOS.OpenFifo returns (nil, nil).
OpenFifoFn func(context.Context, string, int, os.FileMode) (io.ReadWriteCloser, error)
}
var _ osInterface.OS = &FakeOS{}
@ -52,3 +56,11 @@ func (f *FakeOS) RemoveAll(path string) error {
}
return nil
}
// OpenFifo is a fake call. When OpenFifoFn is set, the call is delegated to it
// with the caller's arguments; otherwise a nil stream and a nil error are
// returned.
func (f *FakeOS) OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
	hook := f.OpenFifoFn
	if hook == nil {
		return nil, nil
	}
	return hook(ctx, fn, flag, perm)
}

101
pkg/registrar/registrar.go Normal file
View File

@ -0,0 +1,101 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registrar
import (
"fmt"
"sync"
)
// Registrar keeps a bidirectional, one-to-one index between names and keys.
// A name maps to exactly one key and a key maps to exactly one name.
// All methods take the internal lock, so a Registrar can be shared between
// goroutines.
type Registrar struct {
	lock      sync.Mutex
	nameToKey map[string]string
	keyToName map[string]string
}

// NewRegistrar returns a Registrar whose two indexes are empty.
func NewRegistrar() *Registrar {
	r := new(Registrar)
	r.nameToKey = map[string]string{}
	r.keyToName = map[string]string{}
	return r
}

// Reserve records the name<->key pair. Both name and key must be non-empty,
// otherwise an error is returned.
// Reserving a pair that is already recorded is a no-op (the call is
// idempotent). Reserving a name that is bound to a different key, or a key
// that is bound to a different name, fails with an error and leaves the
// Registrar unchanged.
func (r *Registrar) Reserve(name, key string) error {
	r.lock.Lock()
	defer r.lock.Unlock()

	if name == "" || key == "" {
		return fmt.Errorf("invalid name %q or key %q", name, key)
	}

	// The name is already known: accept only the very same pair.
	if owner, taken := r.nameToKey[name]; taken {
		if owner == key {
			return nil
		}
		return fmt.Errorf("name %q is reserved for %q", name, owner)
	}

	// The key is already known: accept only the very same pair.
	if holder, taken := r.keyToName[key]; taken {
		if holder == name {
			return nil
		}
		return fmt.Errorf("key %q is reserved for %q", key, holder)
	}

	r.nameToKey[name], r.keyToName[key] = key, name
	return nil
}

// ReleaseByName drops the name<->key pair identified by name. Unknown names
// are ignored. After the call both the name and its former key are free to be
// reserved again.
func (r *Registrar) ReleaseByName(name string) {
	r.lock.Lock()
	defer r.lock.Unlock()

	if key, reserved := r.nameToKey[name]; reserved {
		delete(r.keyToName, key)
		delete(r.nameToKey, name)
	}
}

// ReleaseByKey drops the name<->key pair identified by key. Unknown keys are
// ignored.
func (r *Registrar) ReleaseByKey(key string) {
	r.lock.Lock()
	defer r.lock.Unlock()

	if name, reserved := r.keyToName[key]; reserved {
		delete(r.keyToName, key)
		delete(r.nameToKey, name)
	}
}

125
pkg/server/helpers.go Normal file
View File

@ -0,0 +1,125 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"fmt"
"path/filepath"
"strings"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/pkg/truncindex"
"google.golang.org/grpc"
"github.com/containerd/containerd"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
const (
// relativeRootfsPath is the rootfs path relative to bundle path.
relativeRootfsPath = "rootfs"
// defaultRuntime is the runtime to use in containerd. We may support
// other runtime in the future.
defaultRuntime = "linux"
// sandboxesDir contains all sandbox root. A sandbox root is the running
// directory of the sandbox, all files created for the sandbox will be
// placed under this directory. It is joined under the cri-containerd root
// directory by getSandboxRootDir.
sandboxesDir = "sandboxes"
// stdinNamedPipe is the name of stdin named pipe.
stdinNamedPipe = "stdin"
// stdoutNamedPipe is the name of stdout named pipe.
stdoutNamedPipe = "stdout"
// stderrNamedPipe is the name of stderr named pipe.
stderrNamedPipe = "stderr"
// nameDelimiter is the delimiter used to join the fields of a
// container/sandbox name.
nameDelimiter = "_"
// netNSFormat is the format of network namespace of a process. The single
// verb is filled with the process id.
netNSFormat = "/proc/%v/ns/net"
)
// generateID generates a random unique id.
// The id is produced by stringid.GenerateNonCryptoID.
// NOTE(review): the helper's name suggests the id is not cryptographically
// random — confirm before relying on it for anything security sensitive.
func generateID() string {
return stringid.GenerateNonCryptoID()
}
// makeSandboxName derives the sandbox name from the sandbox metadata by
// joining, in order, the name, namespace, uid and attempt number with
// nameDelimiter. Distinct metadata therefore yields distinct names.
func makeSandboxName(s *runtime.PodSandboxMetadata) string {
	attempt := fmt.Sprintf("%d", s.Attempt)
	fields := []string{s.Name, s.Namespace, s.Uid, attempt}
	return strings.Join(fields, nameDelimiter)
}
// getCgroupsPath generates container cgroups path.
func getCgroupsPath(cgroupsParent string, id string) string {
// TODO(random-liu): [P0] Handle systemd.
return filepath.Join(cgroupsParent, id)
}
// getSandboxRootDir returns the directory that holds the files of sandbox id
// (e.g. its named pipes): <rootDir>/<sandboxesDir>/<id>.
func getSandboxRootDir(rootDir, id string) string {
	segments := []string{rootDir, sandboxesDir, id}
	return filepath.Join(segments...)
}
// getStreamingPipes returns, in this order, the paths of the stdin, stdout
// and stderr named pipes located directly under rootDir.
func getStreamingPipes(rootDir string) (string, string, string) {
	pipeIn := func(name string) string {
		return filepath.Join(rootDir, name)
	}
	return pipeIn(stdinNamedPipe), pipeIn(stdoutNamedPipe), pipeIn(stderrNamedPipe)
}
// getNetworkNamespace returns the network namespace of a process.
// The result is netNSFormat with pid substituted, i.e. a path of the form
// "/proc/<pid>/ns/net". The function only formats the path; it does not check
// that the process or the path exists.
func getNetworkNamespace(pid uint32) string {
return fmt.Sprintf(netNSFormat, pid)
}
// isContainerdContainerNotExistError checks whether a grpc error is containerd
// ErrContainerNotExist error.
// The check is a string comparison between the grpc error description and the
// message of containerd.ErrContainerNotExist.
// TODO(random-liu): Containerd should expose error better through api.
func isContainerdContainerNotExistError(grpcError error) bool {
return grpc.ErrorDesc(grpcError) == containerd.ErrContainerNotExist.Error()
}
// getSandbox looks up sandbox metadata in the sandbox store.
// The given id is tried as is first. If the store has no entry for it, the id
// is treated as a truncated id, expanded through the sandbox id index, and the
// store is queried again with the full id.
// It returns (nil, nil) when the id index does not know the id.
func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata, error) {
	meta, err := c.sandboxStore.Get(id)
	switch {
	case err != nil:
		return nil, fmt.Errorf("sandbox metadata not found: %v", err)
	case meta != nil:
		return meta, nil
	}

	// Nothing stored under the given id, try to expand it to a full id.
	fullID, err := c.sandboxIDIndex.Get(id)
	switch err {
	case nil:
		return c.sandboxStore.Get(fullID)
	case truncindex.ErrNotExist:
		return nil, nil
	default:
		return nil, fmt.Errorf("sandbox id not found: %v", err)
	}
}

View File

@ -94,7 +94,7 @@ func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref s
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err)
}
err = c.imageStore.Put(ctx, resolvedImageName, desc)
err = c.imageStoreService.Put(ctx, resolvedImageName, desc)
if err != nil {
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err)
}
@ -107,7 +107,7 @@ func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref s
return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err)
}
image, err := c.imageStore.Get(ctx, resolvedImageName)
image, err := c.imageStoreService.Get(ctx, resolvedImageName)
if err != nil {
return resolvedImageName, manifest, compressedSize,
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)
@ -150,7 +150,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image
return desc, size, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err)
}
err = c.imageStore.Put(ctx, resolvedImageName, desc)
err = c.imageStoreService.Put(ctx, resolvedImageName, desc)
if err != nil {
return desc, size, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err)
}
@ -165,7 +165,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image
return desc, size, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err)
}
image, err := c.imageStore.Get(ctx, resolvedImageName)
image, err := c.imageStoreService.Get(ctx, resolvedImageName)
if err != nil {
return desc, size,
fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err)

View File

@ -17,14 +17,117 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// ListPodSandbox returns a list of Sandbox.
func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (*runtime.ListPodSandboxResponse, error) {
return nil, errors.New("not implemented")
// ListPodSandbox lists the sandboxes recorded in the sandbox metadata store,
// restricted by the filter carried in the request.
//
// The state of each sandbox is derived from containerd: a sandbox is READY
// only when a containerd container with the same id exists and is running,
// and NOTREADY otherwise.
//
// An error is returned when either the metadata store or containerd cannot be
// listed.
func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (retRes *runtime.ListPodSandboxResponse, retErr error) {
	glog.V(4).Infof("ListPodSandbox with filter %+v", r.GetFilter())
	defer func() {
		if retErr == nil {
			glog.V(4).Infof("ListPodSandbox returns sandboxes %+v", retRes.GetItems())
		}
	}()

	// List all sandbox metadata from store.
	sandboxesInStore, err := c.sandboxStore.List()
	if err != nil {
		return nil, fmt.Errorf("failed to list metadata from sandbox store: %v", err)
	}

	resp, err := c.containerService.List(ctx, &execution.ListRequest{})
	if err != nil {
		return nil, fmt.Errorf("failed to list sandbox containers: %v", err)
	}

	// Index the containerd containers by id once, so that matching every
	// stored sandbox is a map lookup instead of a scan over the whole list.
	// The first container seen for an id wins, which is what a linear search
	// would have picked.
	containersByID := make(map[string]*container.Container, len(resp.Containers))
	for _, cntr := range resp.Containers {
		if _, seen := containersByID[cntr.ID]; !seen {
			containersByID[cntr.ID] = cntr
		}
	}

	var sandboxes []*runtime.PodSandbox
	for _, sandboxInStore := range sandboxesInStore {
		// Set sandbox state to NOTREADY by default.
		state := runtime.PodSandboxState_SANDBOX_NOTREADY
		// If the sandbox container is running, return the sandbox as READY.
		if cntr, found := containersByID[sandboxInStore.ID]; found && cntr.Status == container.Status_RUNNING {
			state = runtime.PodSandboxState_SANDBOX_READY
		}

		sandboxes = append(sandboxes, toCRISandbox(sandboxInStore, state))
	}

	sandboxes = c.filterCRISandboxes(sandboxes, r.GetFilter())
	return &runtime.ListPodSandboxResponse{Items: sandboxes}, nil
}
// toCRISandbox builds the CRI pod sandbox for the given sandbox metadata.
// Id and creation time come from the metadata itself; metadata, labels and
// annotations come from the sandbox config; state is used as passed in.
func toCRISandbox(meta *metadata.SandboxMetadata, state runtime.PodSandboxState) *runtime.PodSandbox {
	cfg := meta.Config
	sandbox := &runtime.PodSandbox{
		Id:        meta.ID,
		State:     state,
		CreatedAt: meta.CreatedAt,
	}
	sandbox.Metadata = cfg.GetMetadata()
	sandbox.Labels = cfg.GetLabels()
	sandbox.Annotations = cfg.GetAnnotations()
	return sandbox
}
// filterCRISandboxes returns the sandboxes accepted by filter.
// A nil filter accepts everything and the input slice is returned as is.
// Otherwise a sandbox is kept only when it passes every criterion that is set
// in the filter: id (truncated ids are expanded through the sandbox id index
// when possible), state and label selector.
func (c *criContainerdService) filterCRISandboxes(sandboxes []*runtime.PodSandbox, filter *runtime.PodSandboxFilter) []*runtime.PodSandbox {
	if filter == nil {
		return sandboxes
	}

	// Handle truncate id. Use original filter if failed to convert.
	wantID := filter.GetId()
	if wantID != "" {
		if fullID, err := c.sandboxIDIndex.Get(wantID); err == nil {
			wantID = fullID
		}
	}
	wantState := filter.GetState()
	selector := filter.GetLabelSelector()

	// keep reports whether a sandbox passes all configured criteria.
	keep := func(sb *runtime.PodSandbox) bool {
		if wantID != "" && sb.Id != wantID {
			return false
		}
		if wantState != nil && wantState.GetState() != sb.State {
			return false
		}
		for k, v := range selector {
			if sb.Labels[k] != v {
				return false
			}
		}
		return true
	}

	kept := []*runtime.PodSandbox{}
	for _, sb := range sandboxes {
		if keep(sb) {
			kept = append(kept, sb)
		}
	}
	return kept
}

View File

@ -17,15 +17,75 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
// RemovePodSandbox removes the sandbox. If there are running containers in the
// sandbox, they should be forcibly removed.
func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (*runtime.RemovePodSandboxResponse, error) {
return nil, errors.New("not implemented")
// RemovePodSandbox removes a sandbox: its root directory, its metadata and
// its id/name reservations. Removing a sandbox that does not exist succeeds.
// The call fails when the sandbox container still exists in containerd.
func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (retRes *runtime.RemovePodSandboxResponse, retErr error) {
	requestedID := r.GetPodSandboxId()
	glog.V(2).Infof("RemovePodSandbox for sandbox %q", requestedID)
	defer func() {
		if retErr != nil {
			return
		}
		glog.V(2).Info("RemovePodSandbox returns successfully")
	}()

	sandbox, err := c.getSandbox(requestedID)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to find sandbox %q: %v", requestedID, err)
	case sandbox == nil:
		// Do not return error if the id doesn't exist.
		glog.V(5).Infof("RemovePodSandbox called for sandbox %q that does not exist",
			requestedID)
		return &runtime.RemovePodSandboxResponse{}, nil
	}
	// From here on work with the full sandbox id.
	id := sandbox.ID

	// TODO(random-liu): [P2] Remove all containers in the sandbox.

	// The sandbox container must be gone from containerd before the sandbox
	// can be removed: a successful Info call means it is still there.
	_, err = c.containerService.Info(ctx, &execution.InfoRequest{ID: id})
	switch {
	case err == nil:
		return nil, fmt.Errorf("sandbox container %q is not fully stopped", id)
	case !isContainerdContainerNotExistError(err):
		return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
	}

	// TODO(random-liu): [P0] Cleanup shm created in RunPodSandbox.
	// TODO(random-liu): [P1] Remove permanent namespace once used.

	// Cleanup the sandbox root directory.
	sandboxRootDir := getSandboxRootDir(c.rootDir, id)
	if err = c.os.RemoveAll(sandboxRootDir); err != nil {
		return nil, fmt.Errorf("failed to remove sandbox root directory %q: %v",
			sandboxRootDir, err)
	}

	// Remove sandbox metadata from metadata store. Note that once the sandbox
	// metadata is successfully deleted:
	// 1) ListPodSandbox will not include this sandbox.
	// 2) PodSandboxStatus and StopPodSandbox will return error.
	// 3) On-going operations which have held the metadata reference will not be
	// affected.
	if err = c.sandboxStore.Delete(id); err != nil {
		return nil, fmt.Errorf("failed to delete sandbox metadata for %q: %v", id, err)
	}

	// Finally give back the sandbox id and the sandbox name.
	c.sandboxIDIndex.Delete(id) // nolint: errcheck
	c.sandboxNameIndex.ReleaseByKey(id)

	return &runtime.RemovePodSandboxResponse{}, nil
}

View File

@ -17,15 +17,251 @@ limitations under the License.
package server
import (
"errors"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"syscall"
"time"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/glog"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/opencontainers/runtime-tools/generate"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/mount"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
// the sandbox is in ready state.
func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (*runtime.RunPodSandboxResponse, error) {
return nil, errors.New("not implemented")
// RunPodSandbox creates and starts the sandbox container for the pod described
// by the request config and returns the generated sandbox id.
//
// Steps, in order: generate an id and a name, reserve the name and register the
// id, create the sandbox root directory, open the stdout/stderr named pipes and
// drain them, create and start the container in containerd, and finally store
// the sandbox metadata.
//
// Every step that allocates something registers a deferred cleanup that only
// runs when the named result retErr is non-nil, so a failure at any later step
// undoes the earlier ones.
func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (retRes *runtime.RunPodSandboxResponse, retErr error) {
glog.V(2).Infof("RunPodSandbox with config %+v", r.GetConfig())
defer func() {
if retErr == nil {
glog.V(2).Infof("RunPodSandbox returns sandbox id %q", retRes.GetPodSandboxId())
}
}()
config := r.GetConfig()
// Generate unique id and name for the sandbox and reserve the name.
id := generateID()
name := makeSandboxName(config.GetMetadata())
// Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the
// same sandbox.
if err := c.sandboxNameIndex.Reserve(name, id); err != nil {
return nil, fmt.Errorf("failed to reserve sandbox name %q: %v", name, err)
}
defer func() {
// Release the name if the function returns with an error.
if retErr != nil {
c.sandboxNameIndex.ReleaseByName(name)
}
}()
// Register the sandbox id.
if err := c.sandboxIDIndex.Add(id); err != nil {
return nil, fmt.Errorf("failed to insert sandbox id %q: %v", id, err)
}
defer func() {
// Delete the sandbox id if the function returns with an error.
if retErr != nil {
c.sandboxIDIndex.Delete(id) // nolint: errcheck
}
}()
// Create initial sandbox metadata.
// CreatedAt and NetNS are filled in later, once the container has started.
meta := metadata.SandboxMetadata{
ID: id,
Name: name,
Config: config,
}
// TODO(random-liu): [P0] Ensure pause image snapshot, apply default image config
// and get snapshot mounts.
// Use fixed rootfs path and sleep command.
const rootPath = "/"
// TODO(random-liu): [P0] Set up sandbox network with network plugin.
// Create sandbox container root directory.
// Prepare streaming named pipe.
sandboxRootDir := getSandboxRootDir(c.rootDir, id)
if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create sandbox root directory %q: %v",
sandboxRootDir, err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox root directory.
if err := c.os.RemoveAll(sandboxRootDir); err != nil {
glog.Errorf("Failed to remove sandbox root directory %q: %v",
sandboxRootDir, err)
}
}
}()
// TODO(random-liu): [P1] Moving following logging related logic into util functions.
// Discard sandbox container output because we don't care about it.
_, stdout, stderr := getStreamingPipes(sandboxRootDir)
for _, p := range []string{stdout, stderr} {
// Each pipe is opened read-only, created if missing, in non-blocking mode.
f, err := c.os.OpenFifo(ctx, p, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700)
if err != nil {
return nil, fmt.Errorf("failed to open named pipe %q: %v", p, err)
}
// On failure the pipe is closed here; on success it is closed by the
// draining goroutine below once io.Copy returns.
// NOTE(review): the closure parameter c shadows the method receiver c.
defer func(c io.Closer) {
if retErr != nil {
c.Close()
}
}(f)
// NOTE(review): the closure parameter r shadows the request parameter r.
go func(r io.ReadCloser) {
// Discard the output for now.
io.Copy(ioutil.Discard, r) // nolint: errcheck
r.Close()
}(f)
}
// Start sandbox container.
spec := c.generateSandboxContainerSpec(id, config)
rawSpec, err := json.Marshal(spec)
if err != nil {
return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err)
}
glog.V(4).Infof("Sandbox container spec: %+v", spec)
createOpts := &execution.CreateRequest{
ID: id,
Spec: &prototypes.Any{
TypeUrl: runtimespec.Version,
Value: rawSpec,
},
// TODO(random-liu): [P0] Get rootfs mount from containerd.
Rootfs: []*mount.Mount{
{
Type: "bind",
Source: rootPath,
Options: []string{
"rw",
"rbind",
},
},
},
Runtime: defaultRuntime,
// No stdin for sandbox container.
Stdout: stdout,
Stderr: stderr,
}
// Create sandbox container in containerd.
glog.V(5).Infof("Create sandbox container (id=%q, name=%q) with options %+v.",
id, name, createOpts)
createResp, err := c.containerService.Create(ctx, createOpts)
if err != nil {
return nil, fmt.Errorf("failed to create sandbox container %q: %v",
id, err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox container if an error is returned.
if _, err := c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil {
glog.Errorf("Failed to delete sandbox container %q: %v",
id, err)
}
}
}()
// Start sandbox container in containerd.
if _, err := c.containerService.Start(ctx, &execution.StartRequest{ID: id}); err != nil {
return nil, fmt.Errorf("failed to start sandbox container %q: %v",
id, err)
}
// Add sandbox into sandbox store.
meta.CreatedAt = time.Now().UnixNano()
// TODO(random-liu): [P2] Replace with permanent network namespace.
// The namespace path is derived from the pid reported by the Create call.
meta.NetNS = getNetworkNamespace(createResp.Pid)
if err := c.sandboxStore.Create(meta); err != nil {
return nil, fmt.Errorf("failed to add sandbox metadata %+v into store: %v",
meta, err)
}
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}
// generateSandboxContainerSpec builds the OCI runtime spec of the sandbox
// container with the given id from the pod sandbox config.
//
// Starting from the generator's default spec it sets the relative rootfs path,
// a shell loop that sleeps forever as the process, a read-only root and the
// hostname from the config. When the config carries a cgroup parent, the
// cgroups path is set to <cgroup parent>/<id>. For each of host network, host
// pid and host ipc requested in the namespace options, the corresponding
// namespace is removed from the spec.
func (c *criContainerdService) generateSandboxContainerSpec(id string, config *runtime.PodSandboxConfig) *runtimespec.Spec {
// TODO(random-liu): [P0] Get command from image config.
pauseCommand := []string{"sh", "-c", "while true; do sleep 1000000000; done"}
// Creates a spec Generator with the default spec.
// TODO(random-liu): [P1] Compare the default settings with docker and containerd default.
g := generate.New()
// Set relative root path.
g.SetRootPath(relativeRootfsPath)
// Set process commands.
g.SetProcessArgs(pauseCommand)
// Make root of sandbox container read-only.
g.SetRootReadonly(true)
// Set hostname.
g.SetHostname(config.GetHostname())
// TODO(random-liu): [P0] Set DNS options. Maintain a resolv.conf for the sandbox.
// TODO(random-liu): [P0] Add NamespaceGetter and PortMappingGetter to initialize network plugin.
// TODO(random-liu): [P0] Add annotation to identify the container is managed by cri-containerd.
// TODO(random-liu): [P2] Consider whether to add labels and annotations to the container.
// Set cgroups parent.
if config.GetLinux().GetCgroupParent() != "" {
cgroupsPath := getCgroupsPath(config.GetLinux().GetCgroupParent(), id)
g.SetLinuxCgroupsPath(cgroupsPath)
}
// When cgroup parent is not set, containerd-shim will create container in a child cgroup
// of the cgroup itself is in.
// TODO(random-liu): [P2] Set default cgroup path if cgroup parent is not specified.
// Set namespace options.
nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions()
// TODO(random-liu): [P1] Create permanent network namespace, so that we could still cleanup
// network namespace after sandbox container dies unexpectedly.
// By default, all namespaces are enabled for the container, runc will create a new namespace
// for it. By removing the namespace, the container will inherit the namespace of the runtime.
if nsOptions.GetHostNetwork() {
g.RemoveLinuxNamespace(string(runtimespec.NetworkNamespace)) // nolint: errcheck
// TODO(random-liu): [P1] Figure out how to handle UTS namespace.
}
if nsOptions.GetHostPid() {
g.RemoveLinuxNamespace(string(runtimespec.PIDNamespace)) // nolint: errcheck
}
// TODO(random-liu): [P0] Deal with /dev/shm. Use host for HostIpc, and create and mount for
// non-HostIpc. What about mqueue?
if nsOptions.GetHostIpc() {
g.RemoveLinuxNamespace(string(runtimespec.IPCNamespace)) // nolint: errcheck
}
// TODO(random-liu): [P1] Apply SeLinux options.
// TODO(random-liu): [P1] Set user.
// TODO(random-liu): [P1] Set supplemental group.
// TODO(random-liu): [P1] Set privileged.
// TODO(random-liu): [P2] Set sysctl from annotations.
// TODO(random-liu): [P2] Set apparmor and seccomp from annotations.
// TODO(random-liu): [P1] Set default sandbox container resource limit.
return g.Spec()
}

View File

@ -17,14 +17,85 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"github.com/containerd/containerd/api/types/container"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
)
// PodSandboxStatus returns the status of the PodSandbox.
func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (*runtime.PodSandboxStatusResponse, error) {
return nil, errors.New("not implemented")
// PodSandboxStatus reports the status of the requested sandbox.
// The sandbox is looked up by (possibly truncated) id; an unknown sandbox is
// an error. The sandbox is READY only when containerd reports its container as
// running, and NOTREADY otherwise, including when the container no longer
// exists in containerd.
func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (retRes *runtime.PodSandboxStatusResponse, retErr error) {
	requestedID := r.GetPodSandboxId()
	glog.V(4).Infof("PodSandboxStatus for sandbox %q", requestedID)
	defer func() {
		if retErr != nil {
			return
		}
		glog.V(4).Infof("PodSandboxStatus returns status %+v", retRes.GetStatus())
	}()

	sandbox, err := c.getSandbox(requestedID)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to find sandbox %q: %v", requestedID, err)
	case sandbox == nil:
		return nil, fmt.Errorf("sandbox %q does not exist", requestedID)
	}
	// From here on work with the full sandbox id.
	id := sandbox.ID

	// A missing container is not an error, the sandbox is simply not ready.
	info, err := c.containerService.Info(ctx, &execution.InfoRequest{ID: id})
	if err != nil && !isContainerdContainerNotExistError(err) {
		return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
	}

	// NOTREADY unless the sandbox container is known to be running.
	state := runtime.PodSandboxState_SANDBOX_NOTREADY
	if info != nil && info.Status == container.Status_RUNNING {
		state = runtime.PodSandboxState_SANDBOX_READY
	}

	status := toCRISandboxStatus(sandbox, state)
	return &runtime.PodSandboxStatusResponse{Status: status}, nil
}
// toCRISandboxStatus builds the CRI pod sandbox status for the given sandbox
// metadata and state. The network namespace path is only reported for a
// sandbox that is not NOTREADY.
func toCRISandboxStatus(meta *metadata.SandboxMetadata, state runtime.PodSandboxState) *runtime.PodSandboxStatus {
	cfg := meta.Config
	nsOpts := cfg.GetLinux().GetSecurityContext().GetNamespaceOptions()

	// Return empty network namespace when sandbox is not ready.
	// For kubenet, when sandbox is not running, both empty
	// network namespace and a valid permanent network namespace
	// work. Go with the first option here because it's the current
	// behavior in Kubernetes.
	var netNS string
	if state != runtime.PodSandboxState_SANDBOX_NOTREADY {
		netNS = meta.NetNS
	}

	namespaces := &runtime.Namespace{
		// TODO(random-liu): Revendor new CRI version and get
		// rid of this field.
		Network: netNS,
		Options: &runtime.NamespaceOption{
			HostNetwork: nsOpts.GetHostNetwork(),
			HostPid:     nsOpts.GetHostPid(),
			HostIpc:     nsOpts.GetHostIpc(),
		},
	}

	return &runtime.PodSandboxStatus{
		Id:        meta.ID,
		Metadata:  cfg.GetMetadata(),
		State:     state,
		CreatedAt: meta.CreatedAt,
		// TODO(random-liu): [P0] Get sandbox ip from network plugin.
		Network:     &runtime.PodSandboxNetworkStatus{},
		Linux:       &runtime.LinuxPodSandboxStatus{Namespaces: namespaces},
		Labels:      cfg.GetLabels(),
		Annotations: cfg.GetAnnotations(),
	}
}

View File

@ -17,15 +17,44 @@ limitations under the License.
package server
import (
"errors"
"fmt"
"github.com/golang/glog"
"golang.org/x/net/context"
"github.com/containerd/containerd/api/services/execution"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be forcibly terminated.
func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) {
return nil, errors.New("not implemented")
// StopPodSandbox stops the requested sandbox by deleting its container from
// containerd. The sandbox is looked up by (possibly truncated) id; an unknown
// sandbox is an error. A container that is already gone from containerd is not
// an error.
func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (retRes *runtime.StopPodSandboxResponse, retErr error) {
	requestedID := r.GetPodSandboxId()
	glog.V(2).Infof("StopPodSandbox for sandbox %q", requestedID)
	defer func() {
		if retErr != nil {
			return
		}
		glog.V(2).Info("StopPodSandbox returns successfully")
	}()

	sandbox, err := c.getSandbox(requestedID)
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to find sandbox %q: %v", requestedID, err)
	case sandbox == nil:
		return nil, fmt.Errorf("sandbox %q does not exist", requestedID)
	}
	// From here on work with the full sandbox id.
	id := sandbox.ID

	// TODO(random-liu): [P1] Handle sandbox container graceful deletion.
	// Delete the sandbox container from containerd.
	if _, err = c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil && !isContainerdContainerNotExistError(err) {
		return nil, fmt.Errorf("failed to delete sandbox container %q: %v", id, err)
	}

	// TODO(random-liu): [P0] Call network plugin to teardown network.
	// TODO(random-liu): [P2] Stop all containers inside the sandbox.
	return &runtime.StopPodSandboxResponse{}, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"github.com/docker/docker/pkg/truncindex"
"google.golang.org/grpc"
contentapi "github.com/containerd/containerd/api/services/content"
@ -29,13 +30,20 @@ import (
contentservice "github.com/containerd/containerd/services/content"
imagesservice "github.com/containerd/containerd/services/images"
rootfsservice "github.com/containerd/containerd/services/rootfs"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata"
"github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store"
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os"
"github.com/kubernetes-incubator/cri-containerd/pkg/registrar"
// TODO remove the underscores from the following imports as the services are
// implemented. "_" is being used to hold the reference to keep autocomplete
// from deleting them until referenced below.
"k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
)
// TODO remove the underscores from the following imports as the services are
// implemented. "_" is being used to hold the reference to keep autocomplete
// from deleting them until referenced below.
// nolint: golint
import (
_ "github.com/containerd/containerd/api/types/container"
_ "github.com/containerd/containerd/api/types/descriptor"
_ "github.com/containerd/containerd/api/types/mount"
@ -51,23 +59,53 @@ type CRIContainerdService interface {
// criContainerdService implements CRIContainerdService.
//
// It groups the local state kept by cri-containerd (root directory, metadata
// stores, sandbox name and id indexes) with the containerd service clients
// used to carry out requests.
//
// NOTE(review): the first five undocumented fields (containerService,
// imageStore, contentIngester, contentProvider, rootfsUnpacker) look like the
// removed (pre-change) lines of this commit; four of them are declared again,
// with comments, further down, and imageStore appears to be replaced by
// imageStoreService. Confirm against the real file.
type criContainerdService struct {
containerService execution.ContainerServiceClient
imageStore images.Store
contentIngester content.Ingester
contentProvider content.Provider
rootfsUnpacker rootfs.Unpacker
// os is an interface for all required os operations.
os osinterface.OS
// rootDir is the directory for managing cri-containerd files.
rootDir string
// sandboxStore stores all sandbox metadata.
sandboxStore metadata.SandboxStore
// imageMetadataStore stores all image metadata.
imageMetadataStore metadata.ImageMetadataStore
// sandboxNameIndex stores all sandbox names and makes sure each name
// is unique.
sandboxNameIndex *registrar.Registrar
// sandboxIDIndex is a trie for truncated id indexing, e.g. after an
// id "abcdefg" is added, we could use "abcd" to identify the same thing
// as long as there is no ambiguity.
sandboxIDIndex *truncindex.TruncIndex
// containerService is the containerd container service client.
containerService execution.ContainerServiceClient
// contentIngester is the containerd service to ingest content into
// the content store.
contentIngester content.Ingester
// contentProvider is the containerd service to get content from
// the content store.
contentProvider content.Provider
// rootfsUnpacker is the containerd service to unpack image content
// into snapshots.
rootfsUnpacker rootfs.Unpacker
// imageStoreService is the containerd service to store and track
// image metadata.
imageStoreService images.Store
}
// NewCRIContainerdService returns a new instance of CRIContainerdService.
//
// conn is the gRPC connection from which every containerd client (container,
// images, content, rootfs) is built. rootDir is stored as the directory for
// cri-containerd files. The sandbox and image metadata stores are each given
// their own store.NewMetadataStore(), and the sandbox name and id indexes are
// created fresh; restoring earlier state is still a TODO.
//
// NOTE(review): the one-argument signature, the imageStore field and the
// second imageMetadataStore entry below look like the removed (pre-change)
// lines of this commit, shown next to their replacements. Confirm against the
// real file.
func NewCRIContainerdService(conn *grpc.ClientConn) CRIContainerdService {
func NewCRIContainerdService(conn *grpc.ClientConn, rootDir string) CRIContainerdService {
// TODO: Initialize different containerd clients.
// TODO(random-liu): [P2] Recover from runtime state and metadata store.
return &criContainerdService{
os: osinterface.RealOS{},
rootDir: rootDir,
sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()),
imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()),
// TODO(random-liu): Register sandbox id/name for recovered sandbox.
sandboxNameIndex: registrar.NewRegistrar(),
// The id index is created from a nil id list.
sandboxIDIndex: truncindex.NewTruncIndex(nil),
containerService: execution.NewContainerServiceClient(conn),
imageStore: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)),
// The ingester and the provider each wrap their own content client built
// on the same connection.
contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)),
contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)),
rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)),
imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()),
}
}