From cc1b0b6709405bb89a4a2a945260c377b5201aec Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Mon, 4 Sep 2017 02:47:58 +0000 Subject: [PATCH] Add restart recovery logic. Signed-off-by: Lantao Liu --- pkg/server/container_create.go | 5 + pkg/server/container_start.go | 49 +-- pkg/server/helpers.go | 6 + pkg/server/io/io.go | 15 + pkg/server/restart.go | 421 ++++++++++++++++++++++++++ pkg/server/sandbox_stop.go | 2 +- pkg/server/service.go | 9 +- pkg/store/container/container.go | 5 - pkg/store/container/container_test.go | 4 + pkg/store/container/metadata.go | 2 + pkg/store/container/metadata_test.go | 1 + pkg/store/container/status.go | 3 +- pkg/store/container/status_test.go | 1 + pkg/store/sandbox/netns.go | 54 +++- 14 files changed, 543 insertions(+), 34 deletions(-) create mode 100644 pkg/server/restart.go diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index dde565b1c..3a19348b0 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -159,6 +159,11 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C } meta.ImageRef = image.ID + // Get container log path. + if config.GetLogPath() != "" { + meta.LogPath = filepath.Join(sandbox.Config.GetLogDirectory(), config.GetLogPath()) + } + containerIO, err := cio.NewContainerIO(id, cio.WithStdin(config.GetStdin()), cio.WithTerminal(config.GetTty()), diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 0d500997e..e2fb5b871 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -19,7 +19,6 @@ package server import ( "fmt" "io" - "path/filepath" "time" "github.com/containerd/containerd" @@ -88,7 +87,6 @@ func (c *criContainerdService) startContainer(ctx context.Context, if err != nil { return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err) } - sandboxConfig := sandbox.Config sandboxID := meta.SandboxID // Make sure sandbox is running. 
s, err := sandbox.Container.Task(ctx, nil) @@ -106,7 +104,10 @@ func (c *criContainerdService) startContainer(ctx context.Context, } ioCreation := func(id string) (_ containerd.IO, err error) { - var stdoutWC, stderrWC io.WriteCloser + stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty()) + if err != nil { + return nil, fmt.Errorf("failed to create container loggers: %v", err) + } defer func() { if err != nil { if stdoutWC != nil { @@ -117,23 +118,6 @@ func (c *criContainerdService) startContainer(ctx context.Context, } } }() - if config.GetLogPath() != "" { - // Only generate container log when log path is specified. - logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath()) - if stdoutWC, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil { - return nil, fmt.Errorf("failed to start container stdout logger: %v", err) - } - // Only redirect stderr when there is no tty. - if !config.GetTty() { - if stderrWC, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil { - return nil, fmt.Errorf("failed to start container stderr logger: %v", err) - } - } - } else { - stdoutWC = cio.NewDiscardLogger() - stderrWC = cio.NewDiscardLogger() - } - if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil { return nil, fmt.Errorf("failed to add container log: %v", err) } @@ -165,3 +149,28 @@ func (c *criContainerdService) startContainer(ctx context.Context, status.StartedAt = time.Now().UnixNano() return nil } + +// Create container loggers and return write closer for stdout and stderr. +func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) { + if logPath != "" { + // Only generate container log when log path is specified. 
+ if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil { + return nil, nil, fmt.Errorf("failed to start container stdout logger: %v", err) + } + defer func() { + if err != nil { + stdout.Close() + } + }() + // Only redirect stderr when there is no tty. + if !tty { + if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil { + return nil, nil, fmt.Errorf("failed to start container stderr logger: %v", err) + } + } + } else { + stdout = cio.NewDiscardLogger() + stderr = cio.NewDiscardLogger() + } + return +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index fd250fbe6..6a3d62e60 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -426,3 +426,9 @@ func getSourceMount(source string, mountInfos []*mount.Info) (string, string, er // If we are here, we did not find parent mount. Something is wrong. return "", "", fmt.Errorf("Could not find source mount of %s", source) } + +// filterLabel returns a label filter. Use `%q` here because containerd +// filter needs extra quote to work properly. +func filterLabel(k, v string) string { + return fmt.Sprintf("labels.%q==%q", k, v) +} diff --git a/pkg/server/io/io.go b/pkg/server/io/io.go index 36643f053..42f66606a 100644 --- a/pkg/server/io/io.go +++ b/pkg/server/io/io.go @@ -136,6 +136,17 @@ func WithRootDir(root string) Opts { } } +// WithFIFOs specifies existing fifos for the container io. +func WithFIFOs(dir, stdin, stdout, stderr string) Opts { + return func(c *ContainerIO) error { + c.dir = dir + c.stdinPath = stdin + c.stdoutPath = stdout + c.stderrPath = stderr + return nil + } +} + // NewContainerIO creates container io. func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) { c := &ContainerIO{ @@ -149,6 +160,10 @@ func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) { return nil, err } } + if c.dir != "" { + // Return if fifos are already set. 
+ return c, nil + } fifos, err := newFifos(c.root, id) if err != nil { return nil, err diff --git a/pkg/server/restart.go b/pkg/server/restart.go new file mode 100644 index 000000000..2050b487e --- /dev/null +++ b/pkg/server/restart.go @@ -0,0 +1,421 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/containerd/containerd" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/typeurl" + "github.com/docker/distribution/reference" + "github.com/docker/docker/pkg/system" + "github.com/golang/glog" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io" + containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" + imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image" + sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox" +) + +// NOTE: The recovery logic has following assumption: when cri-containerd is down: +// 1) Files (e.g. root directory, netns) and checkpoint maintained by cri-containerd MUST NOT be +// touched. Or else, recovery logic for those containers/sandboxes may return error. +// 2) Containerd containers may be deleted, but SHOULD NOT be added. 
Or else, recovery logic +// for the newly added container/sandbox will return error, because there is no corresponding root +// directory created. +// 3) Containerd container tasks may exit or be stopped or deleted. Even though current logic could +// tolerate tasks being created or started, we prefer that not to happen. + +// recover recovers system state from containerd and status checkpoint. +func (c *criContainerdService) recover(ctx context.Context) error { + // Recover all sandboxes. + sandboxes, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindSandbox)) + if err != nil { + return fmt.Errorf("failed to list sandbox containers: %v", err) + } + for _, sandbox := range sandboxes { + sb, err := loadSandbox(ctx, sandbox) + if err != nil { + glog.Errorf("Failed to load sandbox %q: %v", sandbox.ID(), err) + continue + } + glog.V(4).Infof("Loaded sandbox %+v", sb) + if err := c.sandboxStore.Add(sb); err != nil { + return fmt.Errorf("failed to add sandbox %q to store: %v", sandbox.ID(), err) + } + } + + // Recover all containers. + containers, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindContainer)) + if err != nil { + return fmt.Errorf("failed to list containers: %v", err) + } + for _, container := range containers { + containerDir := getContainerRootDir(c.config.RootDir, container.ID()) + cntr, err := loadContainer(ctx, container, containerDir) + if err != nil { + glog.Errorf("Failed to load container %q: %v", container.ID(), err) + continue + } + glog.V(4).Infof("Loaded container %+v", cntr) + if err := c.containerStore.Add(cntr); err != nil { + return fmt.Errorf("failed to add container %q to store: %v", container.ID(), err) + } + } + + // Recover all images. 
+ cImages, err := c.client.ListImages(ctx) + if err != nil { + return fmt.Errorf("failed to list images: %v", err) + } + images, err := loadImages(ctx, cImages, c.client.ContentStore()) + if err != nil { + return fmt.Errorf("failed to load images: %v", err) + } + for _, image := range images { + c.imageStore.Add(image) + glog.V(4).Infof("Loaded image %+v", image) + } + + // It's possible that containerd containers are deleted unexpectedly. In that case, + // we can't even get metadata, we should cleanup orphaned sandbox/container directories + // with best effort. + + // Cleanup orphaned sandbox directories without corresponding containerd container. + if err := cleanupOrphanedSandboxDirs(sandboxes, filepath.Join(c.config.RootDir, "sandboxes")); err != nil { + return fmt.Errorf("failed to cleanup orphaned sandbox directories: %v", err) + } + + // Cleanup orphaned container directories without corresponding containerd container. + if err := cleanupOrphanedContainerDirs(containers, filepath.Join(c.config.RootDir, "containers")); err != nil { + return fmt.Errorf("failed to cleanup orphaned container directories: %v", err) + } + + return nil +} + +// loadContainer loads container from containerd and status checkpoint. +func loadContainer(ctx context.Context, cntr containerd.Container, containerDir string) (containerstore.Container, error) { + id := cntr.ID() + var container containerstore.Container + // Load container metadata. + ext, ok := cntr.Extensions()[containerMetadataExtension] + if !ok { + return container, fmt.Errorf("metadata extension %q not found", containerMetadataExtension) + } + data, err := typeurl.UnmarshalAny(&ext) + if err != nil { + return container, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err) + } + meta := data.(*containerstore.Metadata) + + // Load status from checkpoint. 
+ status, err := containerstore.LoadStatus(containerDir, id) + if err != nil { + glog.Warningf("Failed to load container status for %q: %v", id, err) + status = unknownContainerStatus() + } + + // Load up-to-date status from containerd. + var containerIO *cio.ContainerIO + t, err := cntr.Task(ctx, func(fifos *containerd.FIFOSet) (containerd.IO, error) { + stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty()) + if err != nil { + return nil, err + } + containerIO, err = cio.NewContainerIO(id, + cio.WithTerminal(fifos.Terminal), + cio.WithFIFOs(fifos.Dir, fifos.In, fifos.Out, fifos.Err), + cio.WithOutput("log", stdoutWC, stderrWC), + ) + if err != nil { + return nil, err + } + if err := containerIO.Pipe(); err != nil { + return nil, err + } + return containerIO, nil + }) + if err != nil && !errdefs.IsNotFound(err) { + return container, fmt.Errorf("failed to load task: %v", err) + } + var s containerd.Status + var notFound bool + if errdefs.IsNotFound(err) { + // Task is not found. + notFound = true + } else { + // Task is found. Get task status. + s, err = t.Status(ctx) + if err != nil { + // It's still possible that task is deleted during this window. + if !errdefs.IsNotFound(err) { + return container, fmt.Errorf("failed to get task status: %v", err) + } + notFound = true + } + } + if notFound { + // Task is not created or has been deleted, use the checkpointed status + // to generate container status. + switch status.State() { + case runtime.ContainerState_CONTAINER_CREATED: + // TODO(random-liu): Do not create fifos directory in NewContainerIO. + // container is in created state, create container io for it. + // NOTE: Another possibility is that we've tried to start the container, but + // cri-containerd got restarted just during that. In that case, we still + // treat the container as `CREATED`. 
+ containerIO, err = cio.NewContainerIO(id, + cio.WithStdin(meta.Config.GetStdin()), + cio.WithTerminal(meta.Config.GetTty()), + ) + if err != nil { + return container, fmt.Errorf("failed to create container io: %v", err) + } + case runtime.ContainerState_CONTAINER_RUNNING: + // Container was in running state, but its task has been deleted, + // set unknown exited state. Container io is not needed in this case. + status.FinishedAt = time.Now().UnixNano() + status.ExitCode = unknownExitCode + status.Reason = unknownExitReason + default: + // Container is in exited/unknown state, return the status as it is. + } + } else { + // Task status is found. Update container status based on the up-to-date task status. + switch s.Status { + case containerd.Created: + // Task has been created, but not started yet. This could only happen if cri-containerd + // gets restarted during container start. + // Container must be in `CREATED` state. + if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return container, fmt.Errorf("failed to delete task: %v", err) + } + if status.State() != runtime.ContainerState_CONTAINER_CREATED { + return container, fmt.Errorf("unexpected container state for created task: %q", status.State()) + } + case containerd.Running: + // Task is running. Container must be in `RUNNING` state, based on our assumption that + // "task should not be started when cri-containerd is down". + switch status.State() { + case runtime.ContainerState_CONTAINER_EXITED: + return container, fmt.Errorf("unexpected container state for running task: %q", status.State()) + case runtime.ContainerState_CONTAINER_RUNNING: + default: + // This may happen if cri-containerd gets restarted after task is started, but + // before status is checkpointed. + status.StartedAt = time.Now().UnixNano() + status.Pid = t.Pid() + } + case containerd.Stopped: + // Task is stopped. Update status and delete the task. 
+ if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return container, fmt.Errorf("failed to delete task: %v", err) + } + status.FinishedAt = s.ExitTime.UnixNano() + status.ExitCode = int32(s.ExitStatus) + default: + return container, fmt.Errorf("unexpected task status %q", s.Status) + } + } + opts := []containerstore.Opts{ + containerstore.WithStatus(status, containerDir), + containerstore.WithContainer(cntr), + } + if containerIO != nil { + opts = append(opts, containerstore.WithContainerIO(containerIO)) + } + return containerstore.NewContainer(*meta, opts...) +} + +const ( + // unknownExitCode is the exit code when exit reason is unknown. + unknownExitCode = 255 + // unknownExitReason is the exit reason when exit reason is unknown. + unknownExitReason = "Unknown" +) + +// unknownContainerStatus returns the default container status when its status is unknown. +func unknownContainerStatus() containerstore.Status { + return containerstore.Status{ + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: unknownExitCode, + Reason: unknownExitReason, + } +} + +// loadSandbox loads sandbox from containerd. +func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { + var sandbox sandboxstore.Sandbox + // Load sandbox metadata. + ext, ok := cntr.Extensions()[sandboxMetadataExtension] + if !ok { + return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) + } + data, err := typeurl.UnmarshalAny(&ext) + if err != nil { + return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err) + } + meta := data.(*sandboxstore.Metadata) + sandbox = sandboxstore.Sandbox{ + Metadata: *meta, + Container: cntr, + } + + // Load network namespace. + if meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetHostNetwork() { + // Don't need to load netns for host network sandbox. 
+ return sandbox, nil + } + netNS, err := sandboxstore.LoadNetNS(meta.NetNSPath) + if err != nil { + if err != sandboxstore.ErrClosedNetNS { + return sandbox, fmt.Errorf("failed to load netns %q: %v", meta.NetNSPath, err) + } + netNS = nil + } + sandbox.NetNS = netNS + + // It doesn't matter whether task is running or not. If it is running, sandbox + // status will be `READY`; if it is not running, sandbox status will be `NOT_READY`, + // kubelet will stop the sandbox which will properly cleanup everything. + return sandbox, nil +} + +// loadImages loads images from containerd. +// TODO(random-liu): Check whether image is unpacked, because containerd put image reference +// into store before image is unpacked. +func loadImages(ctx context.Context, cImages []containerd.Image, provider content.Provider) ([]imagestore.Image, error) { + // Group images by image id. + imageMap := make(map[string][]containerd.Image) + for _, i := range cImages { + desc, err := i.Config(ctx) + if err != nil { + glog.Warningf("Failed to get image config for %q: %v", i.Name(), err) + continue + } + id := desc.Digest.String() + imageMap[id] = append(imageMap[id], i) + } + var images []imagestore.Image + for id, imgs := range imageMap { + // imgs len must be > 0, or else the entry will not be created in + // previous loop. + i := imgs[0] + info, err := getImageInfo(ctx, i, provider) + if err != nil { + glog.Warningf("Failed to get image info for %q: %v", i.Name(), err) + continue + } + image := imagestore.Image{ + ID: id, + ChainID: info.chainID.String(), + Size: info.size, + Config: &info.config, + Image: i, + } + // Recover repo digests and repo tags. 
+ for _, i := range imgs { + name := i.Name() + r, err := reference.ParseAnyReference(name) + if err != nil { + glog.Warningf("Failed to parse image reference %q: %v", name, err) + continue + } + if _, ok := r.(reference.Canonical); ok { + image.RepoDigests = append(image.RepoDigests, name) + } else if _, ok := r.(reference.Tagged); ok { + image.RepoTags = append(image.RepoTags, name) + } else if _, ok := r.(reference.Digested); ok { + // This is an image id. + continue + } else { + glog.Warningf("Invalid image reference %q", name) + } + } + images = append(images, image) + } + return images, nil +} + +func cleanupOrphanedSandboxDirs(cntrs []containerd.Container, sandboxesRoot string) error { + // Cleanup orphaned sandbox directories. + dirs, err := ioutil.ReadDir(sandboxesRoot) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read sandboxes directory %q: %v", sandboxesRoot, err) + } + cntrsMap := make(map[string]containerd.Container) + for _, cntr := range cntrs { + cntrsMap[cntr.ID()] = cntr + } + for _, d := range dirs { + if !d.IsDir() { + glog.Warningf("Invalid file %q found in sandboxes directory", d.Name()) + continue + } + if _, ok := cntrsMap[d.Name()]; ok { + // Do not remove sandbox directory if corresponding container is found. + continue + } + sandboxDir := filepath.Join(sandboxesRoot, d.Name()) + if err := system.EnsureRemoveAll(sandboxDir); err != nil { + glog.Warningf("Failed to remove sandbox directory %q: %v", sandboxDir, err) + } else { + glog.V(4).Infof("Cleanup orphaned sandbox directory %q", sandboxDir) + } + } + return nil +} + +func cleanupOrphanedContainerDirs(cntrs []containerd.Container, containersRoot string) error { + // Cleanup orphaned container directories. 
+ dirs, err := ioutil.ReadDir(containersRoot) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to read containers directory %q: %v", containersRoot, err) + } + cntrsMap := make(map[string]containerd.Container) + for _, cntr := range cntrs { + cntrsMap[cntr.ID()] = cntr + } + for _, d := range dirs { + if !d.IsDir() { + glog.Warningf("Invalid file %q found in containers directory", d.Name()) + continue + } + if _, ok := cntrsMap[d.Name()]; ok { + // Do not remove container directory if corresponding container is found. + continue + } + containerDir := filepath.Join(containersRoot, d.Name()) + if err := system.EnsureRemoveAll(containerDir); err != nil { + glog.Warningf("Failed to remove container directory %q: %v", containerDir, err) + } else { + glog.V(4).Infof("Cleanup orphaned container directory %q", containerDir) + } + } + return nil +} diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index fc9865b23..b86a0c443 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -56,7 +56,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St } // Teardown network for sandbox. 
- if sandbox.NetNSPath != "" { + if sandbox.NetNSPath != "" && sandbox.NetNS != nil { if _, err := os.Stat(sandbox.NetNSPath); err != nil { if !os.IsNotExist(err) { return nil, fmt.Errorf("failed to stat network namespace path %s :%v", sandbox.NetNSPath, err) diff --git a/pkg/server/service.go b/pkg/server/service.go index 0a40d29d2..6bf0306ee 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -28,6 +28,7 @@ import ( "github.com/containerd/containerd/images" "github.com/cri-o/ocicni/pkg/ocicni" "github.com/golang/glog" + "golang.org/x/net/context" "google.golang.org/grpc" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/server/streaming" @@ -94,8 +95,6 @@ type criContainerdService struct { // NewCRIContainerdService returns a new instance of CRIContainerdService func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) { - // TODO(random-liu): [P2] Recover from runtime state and checkpoint. - client, err := containerd.New(config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace)) if err != nil { return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v", @@ -147,7 +146,11 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error // Run starts the cri-containerd service. func (c *criContainerdService) Run() error { glog.V(2).Info("Start cri-containerd service") - // TODO(random-liu): Recover state. + + glog.V(2).Infof("Start recovering state") + if err := c.recover(context.Background()); err != nil { + return fmt.Errorf("failed to recover state: %v", err) + } // Start event handler. 
glog.V(2).Info("Start event monitor") diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 11da1debe..247d9afb0 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -88,11 +88,6 @@ func (c *Container) Delete() error { return c.Status.Delete() } -// LoadContainer loads the internal used container type. -func LoadContainer() (Container, error) { - return Container{}, nil -} - // Store stores all Containers. type Store struct { lock sync.RWMutex diff --git a/pkg/store/container/container_test.go b/pkg/store/container/container_test.go index c3233b305..3f25887a4 100644 --- a/pkg/store/container/container_test.go +++ b/pkg/store/container/container_test.go @@ -41,6 +41,7 @@ func TestContainerStore(t *testing.T) { }, }, ImageRef: "TestImage-1", + LogPath: "/test/log/path/1", }, "2": { ID: "2", @@ -53,6 +54,7 @@ func TestContainerStore(t *testing.T) { }, }, ImageRef: "TestImage-2", + LogPath: "/test/log/path/2", }, "3": { ID: "3", @@ -65,6 +67,7 @@ func TestContainerStore(t *testing.T) { }, }, ImageRef: "TestImage-3", + LogPath: "/test/log/path/3", }, } statuses := map[string]Status{ @@ -153,6 +156,7 @@ func TestWithContainerIO(t *testing.T) { }, }, ImageRef: "TestImage-1", + LogPath: "/test/log/path", } status := Status{ Pid: 1, diff --git a/pkg/store/container/metadata.go b/pkg/store/container/metadata.go index 717fe52e9..3c314090c 100644 --- a/pkg/store/container/metadata.go +++ b/pkg/store/container/metadata.go @@ -54,6 +54,8 @@ type Metadata struct { Config *runtime.ContainerConfig // ImageRef is the reference of image used by the container. ImageRef string + // LogPath is the container log path. + LogPath string } // MarshalJSON encodes Metadata into bytes in json format. 
diff --git a/pkg/store/container/metadata_test.go b/pkg/store/container/metadata_test.go index 6ff3c8d7f..f6451a719 100644 --- a/pkg/store/container/metadata_test.go +++ b/pkg/store/container/metadata_test.go @@ -36,6 +36,7 @@ func TestMetadataMarshalUnmarshal(t *testing.T) { }, }, ImageRef: "test-image-ref", + LogPath: "/test/log/path", } assert := assertlib.New(t) diff --git a/pkg/store/container/status.go b/pkg/store/container/status.go index 56289e77c..a8390c81c 100644 --- a/pkg/store/container/status.go +++ b/pkg/store/container/status.go @@ -58,8 +58,7 @@ type Status struct { Message string // Removing indicates that the container is in removing state. // This field doesn't need to be checkpointed. - // TODO(random-liu): Reset this field to false during state recoverry. - Removing bool + Removing bool `json:"-"` } // State returns current state of the container based on the container status. diff --git a/pkg/store/container/status_test.go b/pkg/store/container/status_test.go index bd3537875..5e8560b4e 100644 --- a/pkg/store/container/status_test.go +++ b/pkg/store/container/status_test.go @@ -81,6 +81,7 @@ func TestStatusEncodeDecode(t *testing.T) { assert.NoError(err) newS := &Status{} assert.NoError(newS.decode(data)) + s.Removing = false // Removing should not be encoded. assert.Equal(s, newS) unsupported, err := json.Marshal(&versionedStatus{ diff --git a/pkg/store/sandbox/netns.go b/pkg/store/sandbox/netns.go index d39ce0165..5f4ab6758 100644 --- a/pkg/store/sandbox/netns.go +++ b/pkg/store/sandbox/netns.go @@ -17,17 +17,25 @@ limitations under the License. package sandbox import ( + "errors" "fmt" + "os" "sync" cnins "github.com/containernetworking/plugins/pkg/ns" + "github.com/docker/docker/pkg/mount" + "golang.org/x/sys/unix" ) +// ErrClosedNetNS is the error returned when network namespace is closed. 
+var ErrClosedNetNS = errors.New("network namespace is closed") + // NetNS holds network namespace for sandbox type NetNS struct { sync.Mutex - ns cnins.NetNS - closed bool + ns cnins.NetNS + closed bool + restored bool } // NewNetNS creates a network namespace for the sandbox @@ -41,6 +49,22 @@ func NewNetNS() (*NetNS, error) { return n, nil } +// LoadNetNS loads existing network namespace. It returns ErrClosedNetNS +// if the network namespace has already been closed. +func LoadNetNS(path string) (*NetNS, error) { + if err := cnins.IsNSorErr(path); err != nil { + if _, ok := err.(cnins.NSPathNotExistErr); ok { + return nil, ErrClosedNetNS + } + return nil, err + } + ns, err := cnins.GetNS(path) + if err != nil { + return nil, fmt.Errorf("failed to load network namespace %v", err) + } + return &NetNS{ns: ns, restored: true}, nil +} + // Remove removes network namepace if it exists and not closed. Remove is idempotent, // meaning it might be invoked multiple times and provides consistent result. func (n *NetNS) Remove() error { @@ -49,10 +73,34 @@ func (n *NetNS) Remove() error { if !n.closed { err := n.ns.Close() if err != nil { - return err + return fmt.Errorf("failed to close network namespace: %v", err) } n.closed = true } + if n.restored { + path := n.ns.Path() + // TODO(random-liu): Add util function for unmount. + // Check netns existence. + if _, err := os.Stat(path); err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed to stat netns: %v", err) + } + mounted, err := mount.Mounted(path) + if err != nil { + return fmt.Errorf("failed to check netns mounted: %v", err) + } + if mounted { + err := unix.Unmount(path, unix.MNT_DETACH) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to umount netns: %v", err) + } + } + if err := os.RemoveAll(path); err != nil { + return fmt.Errorf("failed to remove netns: %v", err) + } + } return nil }