From bf28c7fc75fe3d6522a83f1c50eec5e4ae327ac0 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Fri, 12 May 2017 13:14:11 -0700 Subject: [PATCH] Add initial sandbox management implementation Signed-off-by: Random-Liu --- cmd/cri-containerd/cri_containerd.go | 14 +- cmd/cri-containerd/options/options.go | 25 +-- pkg/os/os.go | 11 ++ pkg/os/testing/fake_os.go | 12 ++ pkg/registrar/registrar.go | 101 +++++++++++ pkg/server/helpers.go | 125 +++++++++++++ pkg/server/image_pull.go | 8 +- pkg/server/sandbox_list.go | 109 +++++++++++- pkg/server/sandbox_remove.go | 66 ++++++- pkg/server/sandbox_run.go | 242 +++++++++++++++++++++++++- pkg/server/sandbox_status.go | 77 +++++++- pkg/server/sandbox_stop.go | 35 +++- pkg/server/service.go | 68 ++++++-- 13 files changed, 842 insertions(+), 51 deletions(-) create mode 100644 pkg/registrar/registrar.go create mode 100644 pkg/server/helpers.go diff --git a/cmd/cri-containerd/cri_containerd.go b/cmd/cri-containerd/cri_containerd.go index 9a0fe1458..ed31effec 100644 --- a/cmd/cri-containerd/cri_containerd.go +++ b/cmd/cri-containerd/cri_containerd.go @@ -32,20 +32,20 @@ func main() { o.AddFlags(pflag.CommandLine) options.InitFlags() - if o.CRIContainerdVersion { + if o.PrintVersion { version.PrintVersion() os.Exit(0) } - glog.V(2).Infof("Connect to containerd socket %q with timeout %v", o.ContainerdSocketPath, o.ContainerdConnectionTimeout) - conn, err := server.ConnectToContainerd(o.ContainerdSocketPath, o.ContainerdConnectionTimeout) + glog.V(2).Infof("Connect to containerd endpoint %q with timeout %v", o.ContainerdEndpoint, o.ContainerdConnectionTimeout) + conn, err := server.ConnectToContainerd(o.ContainerdEndpoint, o.ContainerdConnectionTimeout) if err != nil { - glog.Exitf("Failed to connect containerd socket %q: %v", o.ContainerdSocketPath, err) + glog.Exitf("Failed to connect containerd endpoint %q: %v", o.ContainerdEndpoint, err) } - glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.CRIContainerdSocketPath) - 
service := server.NewCRIContainerdService(conn) - s := server.NewCRIContainerdServer(o.CRIContainerdSocketPath, service, service) + glog.V(2).Infof("Run cri-containerd grpc server on socket %q", o.SocketPath) + service := server.NewCRIContainerdService(conn, o.RootDir) + s := server.NewCRIContainerdServer(o.SocketPath, service, service) if err := s.Run(); err != nil { glog.Exitf("Failed to run cri-containerd grpc server: %v", err) } diff --git a/cmd/cri-containerd/options/options.go b/cmd/cri-containerd/options/options.go index 1d91f5a84..9304169d5 100644 --- a/cmd/cri-containerd/options/options.go +++ b/cmd/cri-containerd/options/options.go @@ -25,12 +25,15 @@ import ( // CRIContainerdOptions contains cri-containerd command line options. type CRIContainerdOptions struct { - // CRIContainerdSocketPath is the path to the socket which cri-containerd serves on. - CRIContainerdSocketPath string - // CRIContainerdVersion is the git release version of cri-containerd - CRIContainerdVersion bool - // ContainerdSocketPath is the path to the containerd socket. - ContainerdSocketPath string + // SocketPath is the path to the socket which cri-containerd serves on. + SocketPath string + // RootDir is the root directory path for managing cri-containerd files + // (metadata checkpoint etc.) + RootDir string + // PrintVersion indicates to print version information of cri-containerd. + PrintVersion bool + // ContainerdEndpoint is the containerd endpoint path. + ContainerdEndpoint string // ContainerdConnectionTimeout is the connection timeout for containerd client. ContainerdConnectionTimeout time.Duration } @@ -42,13 +45,15 @@ func NewCRIContainerdOptions() *CRIContainerdOptions { // AddFlags adds cri-containerd command line options to pflag. 
func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) { - fs.StringVar(&c.CRIContainerdSocketPath, "cri-containerd-socket", + fs.StringVar(&c.SocketPath, "socket-path", "/var/run/cri-containerd.sock", "Path to the socket which cri-containerd serves on.") - fs.StringVar(&c.ContainerdSocketPath, "containerd-socket", - "/run/containerd/containerd.sock", "Path to the containerd socket.") + fs.StringVar(&c.RootDir, "root-dir", + "/var/lib/cri-containerd", "Root directory path for cri-containerd managed files (metadata checkpoint etc).") + fs.StringVar(&c.ContainerdEndpoint, "containerd-endpoint", + "/run/containerd/containerd.sock", "Path to the containerd endpoint.") fs.DurationVar(&c.ContainerdConnectionTimeout, "containerd-connection-timeout", 2*time.Minute, "Connection timeout for containerd client.") - fs.BoolVar(&c.CRIContainerdVersion, "version", + fs.BoolVar(&c.PrintVersion, "version", false, "Print cri-containerd version information and quit.") } diff --git a/pkg/os/os.go b/pkg/os/os.go index 4ba2a9cee..7de48167b 100644 --- a/pkg/os/os.go +++ b/pkg/os/os.go @@ -17,7 +17,12 @@ limitations under the License. package os import ( + "io" "os" + + "golang.org/x/net/context" + + "github.com/tonistiigi/fifo" ) // OS collects system level operations that need to be mocked out @@ -25,6 +30,7 @@ import ( type OS interface { MkdirAll(path string, perm os.FileMode) error RemoveAll(path string) error + OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) } // RealOS is used to dispatch the real system level operations. @@ -39,3 +45,8 @@ func (RealOS) MkdirAll(path string, perm os.FileMode) error { func (RealOS) RemoveAll(path string) error { return os.RemoveAll(path) } + +// OpenFifo will call fifo.OpenFifo to open a fifo. 
+func (RealOS) OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + return fifo.OpenFifo(ctx, fn, flag, perm) +} diff --git a/pkg/os/testing/fake_os.go b/pkg/os/testing/fake_os.go index b385a08be..9edc89f43 100644 --- a/pkg/os/testing/fake_os.go +++ b/pkg/os/testing/fake_os.go @@ -17,8 +17,11 @@ limitations under the License. package testing import ( + "io" "os" + "golang.org/x/net/context" + osInterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" ) @@ -28,6 +31,7 @@ import ( type FakeOS struct { MkdirAllFn func(string, os.FileMode) error RemoveAllFn func(string) error + OpenFifoFn func(context.Context, string, int, os.FileMode) (io.ReadWriteCloser, error) } var _ osInterface.OS = &FakeOS{} @@ -52,3 +56,11 @@ func (f *FakeOS) RemoveAll(path string) error { } return nil } + +// OpenFifo is a fake call that invokes OpenFifoFn or just returns nil. +func (f *FakeOS) OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + if f.OpenFifoFn != nil { + return f.OpenFifoFn(ctx, fn, flag, perm) + } + return nil, nil +} diff --git a/pkg/registrar/registrar.go b/pkg/registrar/registrar.go new file mode 100644 index 000000000..cb1c78dfe --- /dev/null +++ b/pkg/registrar/registrar.go @@ -0,0 +1,101 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package registrar + +import ( + "fmt" + "sync" +) + +// Registrar stores one-to-one name<->key mappings. +// Names and keys must be unique. +// Registrar is safe for concurrent access. +type Registrar struct { + lock sync.Mutex + nameToKey map[string]string + keyToName map[string]string +} + +// NewRegistrar creates a new Registrar with the empty indexes. +func NewRegistrar() *Registrar { + return &Registrar{ + nameToKey: make(map[string]string), + keyToName: make(map[string]string), + } +} + +// Reserve registers a name<->key mapping, name or key must not +// be empty. +// Reserve is idempotent. +// Attempting to reserve a conflicting key<->name mapping results +// in an error. +// A name<->key reservation is globally unique. +func (r *Registrar) Reserve(name, key string) error { + r.lock.Lock() + defer r.lock.Unlock() + + if name == "" || key == "" { + return fmt.Errorf("invalid name %q or key %q", name, key) + } + + if k, exists := r.nameToKey[name]; exists { + if k != key { + return fmt.Errorf("name %q is reserved for %q", name, k) + } + return nil + } + + if n, exists := r.keyToName[key]; exists { + if n != name { + return fmt.Errorf("key %q is reserved for %q", key, n) + } + return nil + } + + r.nameToKey[name] = key + r.keyToName[key] = name + return nil +} + +// ReleaseByName releases the reserved name<->key mapping by name. +// Once released, the name and the key can be reserved again. +func (r *Registrar) ReleaseByName(name string) { + r.lock.Lock() + defer r.lock.Unlock() + + key, exists := r.nameToKey[name] + if !exists { + return + } + + delete(r.nameToKey, name) + delete(r.keyToName, key) +} + +// ReleaseByKey releases the reserved name<->key mapping by key. 
+func (r *Registrar) ReleaseByKey(key string) { + r.lock.Lock() + defer r.lock.Unlock() + + name, exists := r.keyToName[key] + if !exists { + return + } + + delete(r.nameToKey, name) + delete(r.keyToName, key) +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go new file mode 100644 index 000000000..30e0e717b --- /dev/null +++ b/pkg/server/helpers.go @@ -0,0 +1,125 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "fmt" + "path/filepath" + "strings" + + "github.com/docker/docker/pkg/stringid" + "github.com/docker/docker/pkg/truncindex" + "google.golang.org/grpc" + + "github.com/containerd/containerd" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +const ( + // relativeRootfsPath is the rootfs path relative to bundle path. + relativeRootfsPath = "rootfs" + // defaultRuntime is the runtime to use in containerd. We may support + // other runtime in the future. + defaultRuntime = "linux" + // sandboxesDir contains all sandbox root. A sandbox root is the running + // directory of the sandbox, all files created for the sandbox will be + // placed under this directory. + sandboxesDir = "sandboxes" + // stdinNamedPipe is the name of stdin named pipe. + stdinNamedPipe = "stdin" + // stdoutNamedPipe is the name of stdout named pipe. + stdoutNamedPipe = "stdout" + // stderrNamedPipe is the name of stderr named pipe. 
+ stderrNamedPipe = "stderr" + // Delimiter used to construct container/sandbox names. + nameDelimiter = "_" + // netNSFormat is the format of network namespace of a process. + netNSFormat = "/proc/%v/ns/net" +) + +// generateID generates a random unique id. +func generateID() string { + return stringid.GenerateNonCryptoID() +} + +// makeSandboxName generates sandbox name from sandbox metadata. The name +// generated is unique as long as sandbox metadata is unique. +func makeSandboxName(s *runtime.PodSandboxMetadata) string { + return strings.Join([]string{ + s.Name, // 0 + s.Namespace, // 1 + s.Uid, // 2 + fmt.Sprintf("%d", s.Attempt), // 3 + }, nameDelimiter) +} + +// getCgroupsPath generates container cgroups path. +func getCgroupsPath(cgroupsParent string, id string) string { + // TODO(random-liu): [P0] Handle systemd. + return filepath.Join(cgroupsParent, id) +} + +// getSandboxRootDir returns the root directory for managing sandbox files, +// e.g. named pipes. +func getSandboxRootDir(rootDir, id string) string { + return filepath.Join(rootDir, sandboxesDir, id) +} + +// getStreamingPipes returns the stdin/stdout/stderr pipes path in the root. +func getStreamingPipes(rootDir string) (string, string, string) { + stdin := filepath.Join(rootDir, stdinNamedPipe) + stdout := filepath.Join(rootDir, stdoutNamedPipe) + stderr := filepath.Join(rootDir, stderrNamedPipe) + return stdin, stdout, stderr +} + +// getNetworkNamespace returns the network namespace of a process. +func getNetworkNamespace(pid uint32) string { + return fmt.Sprintf(netNSFormat, pid) +} + +// isContainerdContainerNotExistError checks whether a grpc error is containerd +// ErrContainerNotExist error. +// TODO(random-liu): Containerd should expose error better through api. +func isContainerdContainerNotExistError(grpcError error) bool { + return grpc.ErrorDesc(grpcError) == containerd.ErrContainerNotExist.Error() +} + +// getSandbox gets the sandbox metadata from the sandbox store. 
It returns nil without +// error if the sandbox metadata is not found. It also tries to get full sandbox id and +// retry if the sandbox metadata is not found with the initial id. +func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata, error) { + sandbox, err := c.sandboxStore.Get(id) + if err != nil { + return nil, fmt.Errorf("sandbox metadata not found: %v", err) + } + if sandbox != nil { + return sandbox, nil + } + // sandbox is not found in metadata store, try to extract full id. + id, err = c.sandboxIDIndex.Get(id) + if err != nil { + if err == truncindex.ErrNotExist { + return nil, nil + } + return nil, fmt.Errorf("sandbox id not found: %v", err) + } + return c.sandboxStore.Get(id) +} diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index cb964d348..6183a85c4 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -94,7 +94,7 @@ func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref s return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to resolve ref %q: err: %v", ref, err) } - err = c.imageStore.Put(ctx, resolvedImageName, desc) + err = c.imageStoreService.Put(ctx, resolvedImageName, desc) if err != nil { return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err) } @@ -107,7 +107,7 @@ func (c *criContainerdService) imageReferenceResolver(ctx context.Context, ref s return resolvedImageName, manifest, compressedSize, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err) } - image, err := c.imageStore.Get(ctx, resolvedImageName) + image, err := c.imageStoreService.Get(ctx, resolvedImageName) if err != nil { return resolvedImageName, manifest, compressedSize, fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err) @@ -150,7 +150,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image return desc, size, fmt.Errorf("failed 
to resolve ref %q: err: %v", ref, err) } - err = c.imageStore.Put(ctx, resolvedImageName, desc) + err = c.imageStoreService.Put(ctx, resolvedImageName, desc) if err != nil { return desc, size, fmt.Errorf("failed to put %q: desc: %v err: %v", resolvedImageName, desc, err) } @@ -165,7 +165,7 @@ func (c *criContainerdService) pullImage(ctx context.Context, ref string) (image return desc, size, fmt.Errorf("failed to fetch %q: desc: %v err: %v", resolvedImageName, desc, err) } - image, err := c.imageStore.Get(ctx, resolvedImageName) + image, err := c.imageStoreService.Get(ctx, resolvedImageName) if err != nil { return desc, size, fmt.Errorf("get failed for image:%q err: %v", resolvedImageName, err) diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go index bc7832c79..5baf81919 100644 --- a/pkg/server/sandbox_list.go +++ b/pkg/server/sandbox_list.go @@ -17,14 +17,117 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + + "github.com/containerd/containerd/api/types/container" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // ListPodSandbox returns a list of Sandbox. -func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (*runtime.ListPodSandboxResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (retRes *runtime.ListPodSandboxResponse, retErr error) { + glog.V(4).Infof("ListPodSandbox with filter %+v", r.GetFilter()) + defer func() { + if retErr == nil { + glog.V(4).Infof("ListPodSandbox returns sandboxes %+v", retRes.GetItems()) + } + }() + + // List all sandbox metadata from store. 
+ sandboxesInStore, err := c.sandboxStore.List() + if err != nil { + return nil, fmt.Errorf("failed to list metadata from sandbox store: %v", err) + } + + resp, err := c.containerService.List(ctx, &execution.ListRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to list sandbox containers: %v", err) + } + sandboxesInContainerd := resp.Containers + + var sandboxes []*runtime.PodSandbox + for _, sandboxInStore := range sandboxesInStore { + var sandboxInContainerd *container.Container + for _, s := range sandboxesInContainerd { + if s.ID == sandboxInStore.ID { + sandboxInContainerd = s + break + } + } + + // Set sandbox state to NOTREADY by default. + state := runtime.PodSandboxState_SANDBOX_NOTREADY + // If the sandbox container is running, return the sandbox as READY. + if sandboxInContainerd != nil && sandboxInContainerd.Status == container.Status_RUNNING { + state = runtime.PodSandboxState_SANDBOX_READY + } + + sandboxes = append(sandboxes, toCRISandbox(sandboxInStore, state)) + } + + sandboxes = c.filterCRISandboxes(sandboxes, r.GetFilter()) + return &runtime.ListPodSandboxResponse{Items: sandboxes}, nil +} + +// toCRISandbox converts sandbox metadata into CRI pod sandbox. +func toCRISandbox(meta *metadata.SandboxMetadata, state runtime.PodSandboxState) *runtime.PodSandbox { + return &runtime.PodSandbox{ + Id: meta.ID, + Metadata: meta.Config.GetMetadata(), + State: state, + CreatedAt: meta.CreatedAt, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + } +} + +// filterCRISandboxes filters CRISandboxes. +func (c *criContainerdService) filterCRISandboxes(sandboxes []*runtime.PodSandbox, filter *runtime.PodSandboxFilter) []*runtime.PodSandbox { + if filter == nil { + return sandboxes + } + + var filterID string + if filter.GetId() != "" { + // Handle truncated id. Use the original filter id if conversion fails. 
+ var err error + filterID, err = c.sandboxIDIndex.Get(filter.GetId()) + if err != nil { + filterID = filter.GetId() + } + } + + filtered := []*runtime.PodSandbox{} + for _, s := range sandboxes { + // Filter by id + if filterID != "" && filterID != s.Id { + continue + } + // Filter by state + if filter.GetState() != nil && filter.GetState().GetState() != s.State { + continue + } + // Filter by label + if filter.GetLabelSelector() != nil { + match := true + for k, v := range filter.GetLabelSelector() { + if s.Labels[k] != v { + match = false + break + } + } + if !match { + continue + } + } + filtered = append(filtered, s) + } + + return filtered } diff --git a/pkg/server/sandbox_remove.go b/pkg/server/sandbox_remove.go index 90c741ca1..13ed8882f 100644 --- a/pkg/server/sandbox_remove.go +++ b/pkg/server/sandbox_remove.go @@ -17,15 +17,75 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) // RemovePodSandbox removes the sandbox. If there are running containers in the // sandbox, they should be forcibly removed. -func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (*runtime.RemovePodSandboxResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodSandboxRequest) (retRes *runtime.RemovePodSandboxResponse, retErr error) { + glog.V(2).Infof("RemovePodSandbox for sandbox %q", r.GetPodSandboxId()) + defer func() { + if retErr == nil { + glog.V(2).Info("RemovePodSandbox returns successfully") + } + }() + + sandbox, err := c.getSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox %q: %v", r.GetPodSandboxId(), err) + } + if sandbox == nil { + // Do not return error if the id doesn't exist. 
+ glog.V(5).Infof("RemovePodSandbox called for sandbox %q that does not exist", + r.GetPodSandboxId()) + return &runtime.RemovePodSandboxResponse{}, nil + } + // Use the full sandbox id. + id := sandbox.ID + + // TODO(random-liu): [P2] Remove all containers in the sandbox. + + // Return error if sandbox container is not fully stopped. + _, err = c.containerService.Info(ctx, &execution.InfoRequest{ID: id}) + if err != nil && !isContainerdContainerNotExistError(err) { + return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) + } + if err == nil { + return nil, fmt.Errorf("sandbox container %q is not fully stopped", id) + } + + // TODO(random-liu): [P0] Cleanup shm created in RunPodSandbox. + // TODO(random-liu): [P1] Remove permanent namespace once used. + + // Cleanup the sandbox root directory. + sandboxRootDir := getSandboxRootDir(c.rootDir, id) + if err := c.os.RemoveAll(sandboxRootDir); err != nil { + return nil, fmt.Errorf("failed to remove sandbox root directory %q: %v", + sandboxRootDir, err) + } + + // Remove sandbox metadata from metadata store. Note that once the sandbox + // metadata is successfully deleted: + // 1) ListPodSandbox will not include this sandbox. + // 2) PodSandboxStatus and StopPodSandbox will return error. + // 3) On-going operations which have held the metadata reference will not be + // affected. + if err := c.sandboxStore.Delete(id); err != nil { + return nil, fmt.Errorf("failed to delete sandbox metadata for %q: %v", id, err) + } + + // Release the sandbox id from id index. + c.sandboxIDIndex.Delete(id) // nolint: errcheck + + // Release the sandbox name reserved for the sandbox. + c.sandboxNameIndex.ReleaseByKey(id) + + return &runtime.RemovePodSandboxResponse{}, nil } diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index d3b0d46f2..4cbb7b8b2 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -17,15 +17,251 @@ limitations under the License. 
package server import ( - "errors" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "syscall" + "time" + prototypes "github.com/gogo/protobuf/types" + "github.com/golang/glog" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/mount" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure // the sandbox is in ready state. -func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (*runtime.RunPodSandboxResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandboxRequest) (retRes *runtime.RunPodSandboxResponse, retErr error) { + glog.V(2).Infof("RunPodSandbox with config %+v", r.GetConfig()) + defer func() { + if retErr == nil { + glog.V(2).Infof("RunPodSandbox returns sandbox id %q", retRes.GetPodSandboxId()) + } + }() + + config := r.GetConfig() + + // Generate unique id and name for the sandbox and reserve the name. + id := generateID() + name := makeSandboxName(config.GetMetadata()) + // Reserve the sandbox name to avoid concurrent `RunPodSandbox` request starting the + // same sandbox. + if err := c.sandboxNameIndex.Reserve(name, id); err != nil { + return nil, fmt.Errorf("failed to reserve sandbox name %q: %v", name, err) + } + defer func() { + // Release the name if the function returns with an error. + if retErr != nil { + c.sandboxNameIndex.ReleaseByName(name) + } + }() + // Register the sandbox id. 
+ if err := c.sandboxIDIndex.Add(id); err != nil { + return nil, fmt.Errorf("failed to insert sandbox id %q: %v", id, err) + } + defer func() { + // Delete the sandbox id if the function returns with an error. + if retErr != nil { + c.sandboxIDIndex.Delete(id) // nolint: errcheck + } + }() + + // Create initial sandbox metadata. + meta := metadata.SandboxMetadata{ + ID: id, + Name: name, + Config: config, + } + + // TODO(random-liu): [P0] Ensure pause image snapshot, apply default image config + // and get snapshot mounts. + // Use fixed rootfs path and sleep command. + const rootPath = "/" + + // TODO(random-liu): [P0] Set up sandbox network with network plugin. + + // Create sandbox container root directory. + // Prepare streaming named pipe. + sandboxRootDir := getSandboxRootDir(c.rootDir, id) + if err := c.os.MkdirAll(sandboxRootDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create sandbox root directory %q: %v", + sandboxRootDir, err) + } + defer func() { + if retErr != nil { + // Cleanup the sandbox root directory. + if err := c.os.RemoveAll(sandboxRootDir); err != nil { + glog.Errorf("Failed to remove sandbox root directory %q: %v", + sandboxRootDir, err) + } + } + }() + + // TODO(random-liu): [P1] Moving following logging related logic into util functions. + // Discard sandbox container output because we don't care about it. + _, stdout, stderr := getStreamingPipes(sandboxRootDir) + for _, p := range []string{stdout, stderr} { + f, err := c.os.OpenFifo(ctx, p, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) + if err != nil { + return nil, fmt.Errorf("failed to open named pipe %q: %v", p, err) + } + defer func(c io.Closer) { + if retErr != nil { + c.Close() + } + }(f) + go func(r io.ReadCloser) { + // Discard the output for now. + io.Copy(ioutil.Discard, r) // nolint: errcheck + r.Close() + }(f) + } + + // Start sandbox container. 
+ spec := c.generateSandboxContainerSpec(id, config) + rawSpec, err := json.Marshal(spec) + if err != nil { + return nil, fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err) + } + glog.V(4).Infof("Sandbox container spec: %+v", spec) + createOpts := &execution.CreateRequest{ + ID: id, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, + }, + // TODO(random-liu): [P0] Get rootfs mount from containerd. + Rootfs: []*mount.Mount{ + { + Type: "bind", + Source: rootPath, + Options: []string{ + "rw", + "rbind", + }, + }, + }, + Runtime: defaultRuntime, + // No stdin for sandbox container. + Stdout: stdout, + Stderr: stderr, + } + + // Create sandbox container in containerd. + glog.V(5).Infof("Create sandbox container (id=%q, name=%q) with options %+v.", + id, name, createOpts) + createResp, err := c.containerService.Create(ctx, createOpts) + if err != nil { + return nil, fmt.Errorf("failed to create sandbox container %q: %v", + id, err) + } + defer func() { + if retErr != nil { + // Cleanup the sandbox container if an error is returned. + if _, err := c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil { + glog.Errorf("Failed to delete sandbox container %q: %v", + id, err) + } + } + }() + + // Start sandbox container in containerd. + if _, err := c.containerService.Start(ctx, &execution.StartRequest{ID: id}); err != nil { + return nil, fmt.Errorf("failed to start sandbox container %q: %v", + id, err) + } + + // Add sandbox into sandbox store. + meta.CreatedAt = time.Now().UnixNano() + // TODO(random-liu): [P2] Replace with permanent network namespace. 
+ meta.NetNS = getNetworkNamespace(createResp.Pid) + if err := c.sandboxStore.Create(meta); err != nil { + return nil, fmt.Errorf("failed to add sandbox metadata %+v into store: %v", + meta, err) + } + + return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil +} + +func (c *criContainerdService) generateSandboxContainerSpec(id string, config *runtime.PodSandboxConfig) *runtimespec.Spec { + // TODO(random-liu): [P0] Get command from image config. + pauseCommand := []string{"sh", "-c", "while true; do sleep 1000000000; done"} + + // Creates a spec Generator with the default spec. + // TODO(random-liu): [P1] Compare the default settings with docker and containerd default. + g := generate.New() + + // Set relative root path. + g.SetRootPath(relativeRootfsPath) + + // Set process commands. + g.SetProcessArgs(pauseCommand) + + // Make root of sandbox container read-only. + g.SetRootReadonly(true) + + // Set hostname. + g.SetHostname(config.GetHostname()) + + // TODO(random-liu): [P0] Set DNS options. Maintain a resolv.conf for the sandbox. + + // TODO(random-liu): [P0] Add NamespaceGetter and PortMappingGetter to initialize network plugin. + + // TODO(random-liu): [P0] Add annotation to identify the container is managed by cri-containerd. + // TODO(random-liu): [P2] Consider whether to add labels and annotations to the container. + + // Set cgroups parent. + if config.GetLinux().GetCgroupParent() != "" { + cgroupsPath := getCgroupsPath(config.GetLinux().GetCgroupParent(), id) + g.SetLinuxCgroupsPath(cgroupsPath) + } + // When cgroup parent is not set, containerd-shim will create the container in a child cgroup + // of the cgroup it is itself in. + // TODO(random-liu): [P2] Set default cgroup path if cgroup parent is not specified. + + // Set namespace options. 
+ nsOptions := config.GetLinux().GetSecurityContext().GetNamespaceOptions() + // TODO(random-liu): [P1] Create permanent network namespace, so that we could still cleanup + // network namespace after sandbox container dies unexpectedly. + // By default, all namespaces are enabled for the container, runc will create a new namespace + // for it. By removing the namespace, the container will inherit the namespace of the runtime. + if nsOptions.GetHostNetwork() { + g.RemoveLinuxNamespace(string(runtimespec.NetworkNamespace)) // nolint: errcheck + // TODO(random-liu): [P1] Figure out how to handle UTS namespace. + } + + if nsOptions.GetHostPid() { + g.RemoveLinuxNamespace(string(runtimespec.PIDNamespace)) // nolint: errcheck + } + + // TODO(random-liu): [P0] Deal with /dev/shm. Use host for HostIpc, and create and mount for + // non-HostIpc. What about mqueue? + if nsOptions.GetHostIpc() { + g.RemoveLinuxNamespace(string(runtimespec.IPCNamespace)) // nolint: errcheck + } + + // TODO(random-liu): [P1] Apply SeLinux options. + + // TODO(random-liu): [P1] Set user. + + // TODO(random-liu): [P1] Set supplemental group. + + // TODO(random-liu): [P1] Set privileged. + + // TODO(random-liu): [P2] Set sysctl from annotations. + + // TODO(random-liu): [P2] Set apparmor and seccomp from annotations. + + // TODO(random-liu): [P1] Set default sandbox container resource limit. + + return g.Spec() } diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index 8034dfef5..e1ecb5ce1 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -17,14 +17,85 @@ limitations under the License. 
package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // PodSandboxStatus returns the status of the PodSandbox. -func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (*runtime.PodSandboxStatusResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (retRes *runtime.PodSandboxStatusResponse, retErr error) { + glog.V(4).Infof("PodSandboxStatus for sandbox %q", r.GetPodSandboxId()) + defer func() { + if retErr == nil { + glog.V(4).Infof("PodSandboxStatus returns status %+v", retRes.GetStatus()) + } + }() + + sandbox, err := c.getSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox %q: %v", r.GetPodSandboxId(), err) + } + if sandbox == nil { + return nil, fmt.Errorf("sandbox %q does not exist", r.GetPodSandboxId()) + } + // Use the full sandbox id stored in the sandbox metadata. + id := sandbox.ID + + info, err := c.containerService.Info(ctx, &execution.InfoRequest{ID: id}) + if err != nil && !isContainerdContainerNotExistError(err) { + return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) + } + + // Default the sandbox state to NOTREADY; this also covers a nil info result. + state := runtime.PodSandboxState_SANDBOX_NOTREADY + // Only a sandbox whose container info reports RUNNING is treated as READY. + if info != nil && info.Status == container.Status_RUNNING { + state = runtime.PodSandboxState_SANDBOX_READY + } + + return &runtime.PodSandboxStatusResponse{Status: toCRISandboxStatus(sandbox, state)}, nil +} + +// toCRISandboxStatus converts sandbox metadata and the given state into a CRI pod sandbox status.
+func toCRISandboxStatus(meta *metadata.SandboxMetadata, state runtime.PodSandboxState) *runtime.PodSandboxStatus { + nsOpts := meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions() + netNS := meta.NetNS + if state == runtime.PodSandboxState_SANDBOX_NOTREADY { + // Return an empty network namespace when the sandbox is not ready. + // For kubenet, when the sandbox is not running, both an empty + // network namespace and a valid permanent network namespace + // work. Go with the first option here because it's the current + // behavior in Kubernetes. + netNS = "" + } + return &runtime.PodSandboxStatus{ + Id: meta.ID, + Metadata: meta.Config.GetMetadata(), + State: state, + CreatedAt: meta.CreatedAt, + // TODO(random-liu): [P0] Get sandbox ip from network plugin. + Network: &runtime.PodSandboxNetworkStatus{}, + Linux: &runtime.LinuxPodSandboxStatus{ + Namespaces: &runtime.Namespace{ + // TODO(random-liu): Revendor the new CRI version and get + // rid of this field. + Network: netNS, + Options: &runtime.NamespaceOption{ + HostNetwork: nsOpts.GetHostNetwork(), + HostPid: nsOpts.GetHostPid(), + HostIpc: nsOpts.GetHostIpc(), + }, + }, + }, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + } } diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 06a3fedc7..c45f1cff6 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -17,15 +17,44 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be forcibly terminated.
-func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (retRes *runtime.StopPodSandboxResponse, retErr error) { + glog.V(2).Infof("StopPodSandbox for sandbox %q", r.GetPodSandboxId()) + defer func() { + if retErr == nil { + glog.V(2).Info("StopPodSandbox returns successfully") + } + }() + + sandbox, err := c.getSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox %q: %v", r.GetPodSandboxId(), err) + } + if sandbox == nil { + return nil, fmt.Errorf("sandbox %q does not exist", r.GetPodSandboxId()) + } + // Use the full sandbox id stored in the sandbox metadata. + id := sandbox.ID + + // TODO(random-liu): [P1] Handle sandbox container graceful deletion. + // Delete the sandbox container from containerd, ignoring errors matched by isContainerdContainerNotExistError. + _, err = c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}) + if err != nil && !isContainerdContainerNotExistError(err) { + return nil, fmt.Errorf("failed to delete sandbox container %q: %v", id, err) + } + + // TODO(random-liu): [P0] Call network plugin to tear down the network. + // TODO(random-liu): [P2] Stop all containers inside the sandbox. + return &runtime.StopPodSandboxResponse{}, nil } diff --git a/pkg/server/service.go b/pkg/server/service.go index 53d560e6c..9909ab8e5 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -17,6 +17,7 @@ limitations under the License.
package server import ( + "github.com/docker/docker/pkg/truncindex" "google.golang.org/grpc" contentapi "github.com/containerd/containerd/api/services/content" @@ -29,13 +30,20 @@ import ( contentservice "github.com/containerd/containerd/services/content" imagesservice "github.com/containerd/containerd/services/images" rootfsservice "github.com/containerd/containerd/services/rootfs" + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" - "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + osinterface "github.com/kubernetes-incubator/cri-containerd/pkg/os" + "github.com/kubernetes-incubator/cri-containerd/pkg/registrar" - // TODO remove the underscores from the following imports as the services are - // implemented. "_" is being used to hold the reference to keep autocomplete - // from deleting them until referenced below. + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +// TODO remove the underscores from the following imports as the services are +// implemented. "_" is being used to hold the reference to keep autocomplete +// from deleting them until referenced below. +// nolint: golint +import ( _ "github.com/containerd/containerd/api/types/container" _ "github.com/containerd/containerd/api/types/descriptor" _ "github.com/containerd/containerd/api/types/mount" @@ -51,23 +59,53 @@ type CRIContainerdService interface { // criContainerdService implements CRIContainerdService. type criContainerdService struct { - containerService execution.ContainerServiceClient - imageStore images.Store - contentIngester content.Ingester - contentProvider content.Provider - rootfsUnpacker rootfs.Unpacker + // os is an interface for all required os operations. + os osinterface.OS + // rootDir is the directory for managing cri-containerd files. + rootDir string + // sandboxStore stores all sandbox metadata. + sandboxStore metadata.SandboxStore + // imageMetadataStore stores all image metadata.
imageMetadataStore metadata.ImageMetadataStore + // sandboxNameIndex stores all sandbox names and makes sure each name + // is unique. + sandboxNameIndex *registrar.Registrar + // sandboxIDIndex is a trie for truncated id indexing, e.g. after an + // id "abcdefg" is added, we could use "abcd" to identify the same thing + // as long as there is no ambiguity. + sandboxIDIndex *truncindex.TruncIndex + // containerService is the containerd container service client. + containerService execution.ContainerServiceClient + // contentIngester is the containerd service to ingest content into + // the content store. + contentIngester content.Ingester + // contentProvider is the containerd service to get content from + // the content store. + contentProvider content.Provider + // rootfsUnpacker is the containerd service to unpack image content + // into snapshots. + rootfsUnpacker rootfs.Unpacker + // imageStoreService is the containerd service to store and track + // image metadata. + imageStoreService images.Store } // NewCRIContainerdService returns a new instance of CRIContainerdService -func NewCRIContainerdService(conn *grpc.ClientConn) CRIContainerdService { +func NewCRIContainerdService(conn *grpc.ClientConn, rootDir string) CRIContainerdService { // TODO: Initialize different containerd clients. + // TODO(random-liu): [P2] Recover from runtime state and metadata store.
return &criContainerdService{ - containerService: execution.NewContainerServiceClient(conn), - imageStore: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), - contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), - contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), - rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), + os: osinterface.RealOS{}, + rootDir: rootDir, + sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()), + // TODO(random-liu): Register sandbox id/name for recovered sandboxes. + sandboxNameIndex: registrar.NewRegistrar(), + sandboxIDIndex: truncindex.NewTruncIndex(nil), + containerService: execution.NewContainerServiceClient(conn), + imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), + contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), + contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), + rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), } }