Merge pull request #209 from Random-Liu/checkpoint-recovery
Checkpoint recovery
This commit is contained in:
commit
9015b6ec68
@ -157,6 +157,11 @@ func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.C
|
|||||||
}
|
}
|
||||||
meta.ImageRef = image.ID
|
meta.ImageRef = image.ID
|
||||||
|
|
||||||
|
// Get container log path.
|
||||||
|
if config.GetLogPath() != "" {
|
||||||
|
meta.LogPath = filepath.Join(sandbox.Config.GetLogDirectory(), config.GetLogPath())
|
||||||
|
}
|
||||||
|
|
||||||
containerIO, err := cio.NewContainerIO(id,
|
containerIO, err := cio.NewContainerIO(id,
|
||||||
cio.WithStdin(config.GetStdin()),
|
cio.WithStdin(config.GetStdin()),
|
||||||
cio.WithTerminal(config.GetTty()),
|
cio.WithTerminal(config.GetTty()),
|
||||||
|
@ -19,7 +19,6 @@ package server
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"path/filepath"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/containerd/containerd"
|
"github.com/containerd/containerd"
|
||||||
@ -88,7 +87,6 @@ func (c *criContainerdService) startContainer(ctx context.Context,
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err)
|
return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err)
|
||||||
}
|
}
|
||||||
sandboxConfig := sandbox.Config
|
|
||||||
sandboxID := meta.SandboxID
|
sandboxID := meta.SandboxID
|
||||||
// Make sure sandbox is running.
|
// Make sure sandbox is running.
|
||||||
s, err := sandbox.Container.Task(ctx, nil)
|
s, err := sandbox.Container.Task(ctx, nil)
|
||||||
@ -106,7 +104,10 @@ func (c *criContainerdService) startContainer(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
ioCreation := func(id string) (_ containerd.IO, err error) {
|
ioCreation := func(id string) (_ containerd.IO, err error) {
|
||||||
var stdoutWC, stderrWC io.WriteCloser
|
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, config.GetTty())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create container loggers: %v", err)
|
||||||
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if stdoutWC != nil {
|
if stdoutWC != nil {
|
||||||
@ -117,23 +118,6 @@ func (c *criContainerdService) startContainer(ctx context.Context,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if config.GetLogPath() != "" {
|
|
||||||
// Only generate container log when log path is specified.
|
|
||||||
logPath := filepath.Join(sandboxConfig.GetLogDirectory(), config.GetLogPath())
|
|
||||||
if stdoutWC, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to start container stdout logger: %v", err)
|
|
||||||
}
|
|
||||||
// Only redirect stderr when there is no tty.
|
|
||||||
if !config.GetTty() {
|
|
||||||
if stderrWC, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to start container stderr logger: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
stdoutWC = cio.NewDiscardLogger()
|
|
||||||
stderrWC = cio.NewDiscardLogger()
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil {
|
if err := cio.WithOutput("log", stdoutWC, stderrWC)(cntr.IO); err != nil {
|
||||||
return nil, fmt.Errorf("failed to add container log: %v", err)
|
return nil, fmt.Errorf("failed to add container log: %v", err)
|
||||||
}
|
}
|
||||||
@ -165,3 +149,28 @@ func (c *criContainerdService) startContainer(ctx context.Context,
|
|||||||
status.StartedAt = time.Now().UnixNano()
|
status.StartedAt = time.Now().UnixNano()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create container loggers and return write closer for stdout and stderr.
|
||||||
|
func createContainerLoggers(logPath string, tty bool) (stdout io.WriteCloser, stderr io.WriteCloser, err error) {
|
||||||
|
if logPath != "" {
|
||||||
|
// Only generate container log when log path is specified.
|
||||||
|
if stdout, err = cio.NewCRILogger(logPath, cio.Stdout); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to start container stdout logger: %v", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
stdout.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
// Only redirect stderr when there is no tty.
|
||||||
|
if !tty {
|
||||||
|
if stderr, err = cio.NewCRILogger(logPath, cio.Stderr); err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to start container stderr logger: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stdout = cio.NewDiscardLogger()
|
||||||
|
stderr = cio.NewDiscardLogger()
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
@ -426,3 +426,9 @@ func getSourceMount(source string, mountInfos []*mount.Info) (string, string, er
|
|||||||
// If we are here, we did not find parent mount. Something is wrong.
|
// If we are here, we did not find parent mount. Something is wrong.
|
||||||
return "", "", fmt.Errorf("Could not find source mount of %s", source)
|
return "", "", fmt.Errorf("Could not find source mount of %s", source)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// filterLabel builds a containerd filter expression that matches the label
// k with value v. Both the key and the value are formatted with `%q`,
// because the containerd filter needs the extra quoting to work properly.
func filterLabel(k, v string) string {
	key := fmt.Sprintf("%q", k)
	value := fmt.Sprintf("%q", v)
	return "labels." + key + "==" + value
}
|
||||||
|
@ -136,6 +136,17 @@ func WithRootDir(root string) Opts {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithFIFOs specifies existing fifos for the container io.
|
||||||
|
func WithFIFOs(dir, stdin, stdout, stderr string) Opts {
|
||||||
|
return func(c *ContainerIO) error {
|
||||||
|
c.dir = dir
|
||||||
|
c.stdinPath = stdin
|
||||||
|
c.stdoutPath = stdout
|
||||||
|
c.stderrPath = stderr
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewContainerIO creates container io.
|
// NewContainerIO creates container io.
|
||||||
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
|
func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
|
||||||
c := &ContainerIO{
|
c := &ContainerIO{
|
||||||
@ -149,6 +160,10 @@ func NewContainerIO(id string, opts ...Opts) (*ContainerIO, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if c.dir != "" {
|
||||||
|
// Return if fifos are already set.
|
||||||
|
return c, nil
|
||||||
|
}
|
||||||
fifos, err := newFifos(c.root, id)
|
fifos, err := newFifos(c.root, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
421
pkg/server/restart.go
Normal file
421
pkg/server/restart.go
Normal file
@ -0,0 +1,421 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2017 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd"
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/containerd/containerd/typeurl"
|
||||||
|
"github.com/docker/distribution/reference"
|
||||||
|
"github.com/docker/docker/pkg/system"
|
||||||
|
"github.com/golang/glog"
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||||
|
|
||||||
|
cio "github.com/kubernetes-incubator/cri-containerd/pkg/server/io"
|
||||||
|
containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
|
||||||
|
imagestore "github.com/kubernetes-incubator/cri-containerd/pkg/store/image"
|
||||||
|
sandboxstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/sandbox"
|
||||||
|
)
|
||||||
|
|
||||||
|
// NOTE: The recovery logic has following assumption: when cri-containerd is down:
|
||||||
|
// 1) Files (e.g. root directory, netns) and checkpoint maintained by cri-containerd MUST NOT be
|
||||||
|
// touched. Or else, recovery logic for those containers/sandboxes may return error.
|
||||||
|
// 2) Containerd containers may be deleted, but SHOULD NOT be added. Or else, recovery logic
|
||||||
|
// for the newly added container/sandbox will return error, because there is no corresponding root
|
||||||
|
// directory created.
|
||||||
|
// 3) Containerd container tasks may exit or be stopped, deleted. Even though current logic could
|
||||||
|
// tolerate tasks being created or started, we prefer that not to happen.
|
||||||
|
|
||||||
|
// recover recovers system state from containerd and status checkpoint.
|
||||||
|
func (c *criContainerdService) recover(ctx context.Context) error {
|
||||||
|
// Recover all sandboxes.
|
||||||
|
sandboxes, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindSandbox))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to list sandbox containers: %v", err)
|
||||||
|
}
|
||||||
|
for _, sandbox := range sandboxes {
|
||||||
|
sb, err := loadSandbox(ctx, sandbox)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to load sandbox %q: %v", sandbox.ID(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Loaded sandbox %+v", sb)
|
||||||
|
if err := c.sandboxStore.Add(sb); err != nil {
|
||||||
|
return fmt.Errorf("failed to add sandbox %q to store: %v", sandbox.ID(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover all containers.
|
||||||
|
containers, err := c.client.Containers(ctx, filterLabel(containerKindLabel, containerKindContainer))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to list containers: %v", err)
|
||||||
|
}
|
||||||
|
for _, container := range containers {
|
||||||
|
containerDir := getContainerRootDir(c.config.RootDir, container.ID())
|
||||||
|
cntr, err := loadContainer(ctx, container, containerDir)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Failed to load container %q: %v", container.ID(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("Loaded container %+v", cntr)
|
||||||
|
if err := c.containerStore.Add(cntr); err != nil {
|
||||||
|
return fmt.Errorf("failed to add container %q to store: %v", container.ID(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover all images.
|
||||||
|
cImages, err := c.client.ListImages(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to list images: %v", err)
|
||||||
|
}
|
||||||
|
images, err := loadImages(ctx, cImages, c.client.ContentStore())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to load images: %v", err)
|
||||||
|
}
|
||||||
|
for _, image := range images {
|
||||||
|
c.imageStore.Add(image)
|
||||||
|
glog.V(4).Infof("Loaded image %+v", image)
|
||||||
|
}
|
||||||
|
|
||||||
|
// It's possible that containerd containers are deleted unexpectedly. In that case,
|
||||||
|
// we can't even get metadata, we should cleanup orphaned sandbox/container directories
|
||||||
|
// with best effort.
|
||||||
|
|
||||||
|
// Cleanup orphaned sandbox directories without corresponding containerd container.
|
||||||
|
if err := cleanupOrphanedSandboxDirs(sandboxes, filepath.Join(c.config.RootDir, "sandboxes")); err != nil {
|
||||||
|
return fmt.Errorf("failed to cleanup orphaned sandbox directories: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup orphaned container directories without corresponding containerd container.
|
||||||
|
if err := cleanupOrphanedContainerDirs(containers, filepath.Join(c.config.RootDir, "containers")); err != nil {
|
||||||
|
return fmt.Errorf("failed to cleanup orphaned container directories: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadContainer loads container from containerd and status checkpoint.
|
||||||
|
func loadContainer(ctx context.Context, cntr containerd.Container, containerDir string) (containerstore.Container, error) {
|
||||||
|
id := cntr.ID()
|
||||||
|
var container containerstore.Container
|
||||||
|
// Load container metadata.
|
||||||
|
ext, ok := cntr.Extensions()[containerMetadataExtension]
|
||||||
|
if !ok {
|
||||||
|
return container, fmt.Errorf("metadata extension %q not found", containerMetadataExtension)
|
||||||
|
}
|
||||||
|
data, err := typeurl.UnmarshalAny(&ext)
|
||||||
|
if err != nil {
|
||||||
|
return container, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err)
|
||||||
|
}
|
||||||
|
meta := data.(*containerstore.Metadata)
|
||||||
|
|
||||||
|
// Load status from checkpoint.
|
||||||
|
status, err := containerstore.LoadStatus(containerDir, id)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Failed to load container status for %q: %v", id, err)
|
||||||
|
status = unknownContainerStatus()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load up-to-date status from containerd.
|
||||||
|
var containerIO *cio.ContainerIO
|
||||||
|
t, err := cntr.Task(ctx, func(fifos *containerd.FIFOSet) (containerd.IO, error) {
|
||||||
|
stdoutWC, stderrWC, err := createContainerLoggers(meta.LogPath, meta.Config.GetTty())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
containerIO, err = cio.NewContainerIO(id,
|
||||||
|
cio.WithTerminal(fifos.Terminal),
|
||||||
|
cio.WithFIFOs(fifos.Dir, fifos.In, fifos.Out, fifos.Err),
|
||||||
|
cio.WithOutput("log", stdoutWC, stderrWC),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if err := containerIO.Pipe(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return containerIO, nil
|
||||||
|
})
|
||||||
|
if err != nil && !errdefs.IsNotFound(err) {
|
||||||
|
return container, fmt.Errorf("failed to load task: %v", err)
|
||||||
|
}
|
||||||
|
var s containerd.Status
|
||||||
|
var notFound bool
|
||||||
|
if errdefs.IsNotFound(err) {
|
||||||
|
// Task is not found.
|
||||||
|
notFound = true
|
||||||
|
} else {
|
||||||
|
// Task is found. Get task status.
|
||||||
|
s, err = t.Status(ctx)
|
||||||
|
if err != nil {
|
||||||
|
// It's still possible that task is deleted during this window.
|
||||||
|
if !errdefs.IsNotFound(err) {
|
||||||
|
return container, fmt.Errorf("failed to get task status: %v", err)
|
||||||
|
}
|
||||||
|
notFound = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if notFound {
|
||||||
|
// Task is not created or has been deleted, use the checkpointed status
|
||||||
|
// to generate container status.
|
||||||
|
switch status.State() {
|
||||||
|
case runtime.ContainerState_CONTAINER_CREATED:
|
||||||
|
// TODO(random-liu): Do not create fifos directory in NewContainerIO.
|
||||||
|
// container is in created state, create container io for it.
|
||||||
|
// NOTE: Another possibility is that we've tried to start the container, but
|
||||||
|
// cri-containerd got restarted just during that. In that case, we still
|
||||||
|
// treat the container as `CREATED`.
|
||||||
|
containerIO, err = cio.NewContainerIO(id,
|
||||||
|
cio.WithStdin(meta.Config.GetStdin()),
|
||||||
|
cio.WithTerminal(meta.Config.GetTty()),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return container, fmt.Errorf("failed to create container io: %v", err)
|
||||||
|
}
|
||||||
|
case runtime.ContainerState_CONTAINER_RUNNING:
|
||||||
|
// Container was in running state, but its task has been deleted,
|
||||||
|
// set unknown exited state. Container io is not needed in this case.
|
||||||
|
status.FinishedAt = time.Now().UnixNano()
|
||||||
|
status.ExitCode = unknownExitCode
|
||||||
|
status.Reason = unknownExitReason
|
||||||
|
default:
|
||||||
|
// Container is in exited/unknown state, return the status as it is.
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Task status is found. Update container status based on the up-to-date task status.
|
||||||
|
switch s.Status {
|
||||||
|
case containerd.Created:
|
||||||
|
// Task has been created, but not started yet. This could only happen if cri-containerd
|
||||||
|
// gets restarted during container start.
|
||||||
|
// Container must be in `CREATED` state.
|
||||||
|
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||||
|
return container, fmt.Errorf("failed to delete task: %v", err)
|
||||||
|
}
|
||||||
|
if status.State() != runtime.ContainerState_CONTAINER_CREATED {
|
||||||
|
return container, fmt.Errorf("unexpected container state for created task: %q", status.State())
|
||||||
|
}
|
||||||
|
case containerd.Running:
|
||||||
|
// Task is running. Container must be in `RUNNING` state, based on our assuption that
|
||||||
|
// "task should not be started when cri-containerd is down".
|
||||||
|
switch status.State() {
|
||||||
|
case runtime.ContainerState_CONTAINER_EXITED:
|
||||||
|
return container, fmt.Errorf("unexpected container state for running task: %q", status.State())
|
||||||
|
case runtime.ContainerState_CONTAINER_RUNNING:
|
||||||
|
default:
|
||||||
|
// This may happen if cri-containerd gets restarted after task is started, but
|
||||||
|
// before status is checkpointed.
|
||||||
|
status.StartedAt = time.Now().UnixNano()
|
||||||
|
status.Pid = t.Pid()
|
||||||
|
}
|
||||||
|
case containerd.Stopped:
|
||||||
|
// Task is stopped. Updata status and delete the task.
|
||||||
|
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
|
||||||
|
return container, fmt.Errorf("failed to delete task: %v", err)
|
||||||
|
}
|
||||||
|
status.FinishedAt = s.ExitTime.UnixNano()
|
||||||
|
status.ExitCode = int32(s.ExitStatus)
|
||||||
|
default:
|
||||||
|
return container, fmt.Errorf("unexpected task status %q", s.Status)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
opts := []containerstore.Opts{
|
||||||
|
containerstore.WithStatus(status, containerDir),
|
||||||
|
containerstore.WithContainer(cntr),
|
||||||
|
}
|
||||||
|
if containerIO != nil {
|
||||||
|
opts = append(opts, containerstore.WithContainerIO(containerIO))
|
||||||
|
}
|
||||||
|
return containerstore.NewContainer(*meta, opts...)
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// unknownExitCode is the exit code when exit reason is unknown.
|
||||||
|
unknownExitCode = 255
|
||||||
|
// unknownExitReason is the exit reason when exit reason is unknown.
|
||||||
|
unknownExitReason = "Unknown"
|
||||||
|
)
|
||||||
|
|
||||||
|
// unknownContainerStatus returns the default container status when its status is unknown.
|
||||||
|
func unknownContainerStatus() containerstore.Status {
|
||||||
|
return containerstore.Status{
|
||||||
|
CreatedAt: time.Now().UnixNano(),
|
||||||
|
StartedAt: time.Now().UnixNano(),
|
||||||
|
FinishedAt: time.Now().UnixNano(),
|
||||||
|
ExitCode: unknownExitCode,
|
||||||
|
Reason: unknownExitReason,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadSandbox loads sandbox from containerd.
|
||||||
|
func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) {
|
||||||
|
var sandbox sandboxstore.Sandbox
|
||||||
|
// Load sandbox metadata.
|
||||||
|
ext, ok := cntr.Extensions()[sandboxMetadataExtension]
|
||||||
|
if !ok {
|
||||||
|
return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension)
|
||||||
|
}
|
||||||
|
data, err := typeurl.UnmarshalAny(&ext)
|
||||||
|
if err != nil {
|
||||||
|
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err)
|
||||||
|
}
|
||||||
|
meta := data.(*sandboxstore.Metadata)
|
||||||
|
sandbox = sandboxstore.Sandbox{
|
||||||
|
Metadata: *meta,
|
||||||
|
Container: cntr,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load network namespace.
|
||||||
|
if meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetHostNetwork() {
|
||||||
|
// Don't need to load netns for host network sandbox.
|
||||||
|
return sandbox, nil
|
||||||
|
}
|
||||||
|
netNS, err := sandboxstore.LoadNetNS(meta.NetNSPath)
|
||||||
|
if err != nil {
|
||||||
|
if err != sandboxstore.ErrClosedNetNS {
|
||||||
|
return sandbox, fmt.Errorf("failed to load netns %q: %v", meta.NetNSPath, err)
|
||||||
|
}
|
||||||
|
netNS = nil
|
||||||
|
}
|
||||||
|
sandbox.NetNS = netNS
|
||||||
|
|
||||||
|
// It doesn't matter whether task is running or not. If it is running, sandbox
|
||||||
|
// status will be `READY`; if it is not running, sandbox status will be `NOT_READY`,
|
||||||
|
// kubelet will stop the sandbox which will properly cleanup everything.
|
||||||
|
return sandbox, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadImages loads images from containerd.
|
||||||
|
// TODO(random-liu): Check whether image is unpacked, because containerd put image reference
|
||||||
|
// into store before image is unpacked.
|
||||||
|
func loadImages(ctx context.Context, cImages []containerd.Image, provider content.Provider) ([]imagestore.Image, error) {
|
||||||
|
// Group images by image id.
|
||||||
|
imageMap := make(map[string][]containerd.Image)
|
||||||
|
for _, i := range cImages {
|
||||||
|
desc, err := i.Config(ctx)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Failed to get image config for %q: %v", i.Name(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
id := desc.Digest.String()
|
||||||
|
imageMap[id] = append(imageMap[id], i)
|
||||||
|
}
|
||||||
|
var images []imagestore.Image
|
||||||
|
for id, imgs := range imageMap {
|
||||||
|
// imgs len must be > 0, or else the entry will not be created in
|
||||||
|
// previous loop.
|
||||||
|
i := imgs[0]
|
||||||
|
info, err := getImageInfo(ctx, i, provider)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Failed to get image info for %q: %v", i.Name(), err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
image := imagestore.Image{
|
||||||
|
ID: id,
|
||||||
|
ChainID: info.chainID.String(),
|
||||||
|
Size: info.size,
|
||||||
|
Config: &info.config,
|
||||||
|
Image: i,
|
||||||
|
}
|
||||||
|
// Recover repo digests and repo tags.
|
||||||
|
for _, i := range imgs {
|
||||||
|
name := i.Name()
|
||||||
|
r, err := reference.ParseAnyReference(name)
|
||||||
|
if err != nil {
|
||||||
|
glog.Warningf("Failed to parse image reference %q: %v", name, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := r.(reference.Canonical); ok {
|
||||||
|
image.RepoDigests = append(image.RepoDigests, name)
|
||||||
|
} else if _, ok := r.(reference.Tagged); ok {
|
||||||
|
image.RepoTags = append(image.RepoTags, name)
|
||||||
|
} else if _, ok := r.(reference.Digested); ok {
|
||||||
|
// This is an image id.
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
glog.Warningf("Invalid image reference %q", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
images = append(images, image)
|
||||||
|
}
|
||||||
|
return images, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanupOrphanedSandboxDirs(cntrs []containerd.Container, sandboxesRoot string) error {
|
||||||
|
// Cleanup orphaned sandbox directories.
|
||||||
|
dirs, err := ioutil.ReadDir(sandboxesRoot)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("failed to read sandboxes directory %q: %v", sandboxesRoot, err)
|
||||||
|
}
|
||||||
|
cntrsMap := make(map[string]containerd.Container)
|
||||||
|
for _, cntr := range cntrs {
|
||||||
|
cntrsMap[cntr.ID()] = cntr
|
||||||
|
}
|
||||||
|
for _, d := range dirs {
|
||||||
|
if !d.IsDir() {
|
||||||
|
glog.Warningf("Invalid file %q found in sandboxes directory", d.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := cntrsMap[d.Name()]; ok {
|
||||||
|
// Do not remove sandbox directory if corresponding container is found.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
sandboxDir := filepath.Join(sandboxesRoot, d.Name())
|
||||||
|
if err := system.EnsureRemoveAll(sandboxDir); err != nil {
|
||||||
|
glog.Warningf("Failed to remove sandbox directory %q: %v", sandboxDir, err)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Cleanup orphaned sandbox directory %q", sandboxDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func cleanupOrphanedContainerDirs(cntrs []containerd.Container, containersRoot string) error {
|
||||||
|
// Cleanup orphaned container directories.
|
||||||
|
dirs, err := ioutil.ReadDir(containersRoot)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("failed to read containers directory %q: %v", containersRoot, err)
|
||||||
|
}
|
||||||
|
cntrsMap := make(map[string]containerd.Container)
|
||||||
|
for _, cntr := range cntrs {
|
||||||
|
cntrsMap[cntr.ID()] = cntr
|
||||||
|
}
|
||||||
|
for _, d := range dirs {
|
||||||
|
if !d.IsDir() {
|
||||||
|
glog.Warningf("Invalid file %q found in containers directory", d.Name())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if _, ok := cntrsMap[d.Name()]; ok {
|
||||||
|
// Do not remove container directory if corresponding container is found.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
containerDir := filepath.Join(containersRoot, d.Name())
|
||||||
|
if err := system.EnsureRemoveAll(containerDir); err != nil {
|
||||||
|
glog.Warningf("Failed to remove container directory %q: %v", containerDir, err)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("Cleanup orphaned container directory %q", containerDir)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
@ -56,7 +56,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Teardown network for sandbox.
|
// Teardown network for sandbox.
|
||||||
if sandbox.NetNSPath != "" {
|
if sandbox.NetNSPath != "" && sandbox.NetNS != nil {
|
||||||
if _, err := os.Stat(sandbox.NetNSPath); err != nil {
|
if _, err := os.Stat(sandbox.NetNSPath); err != nil {
|
||||||
if !os.IsNotExist(err) {
|
if !os.IsNotExist(err) {
|
||||||
return nil, fmt.Errorf("failed to stat network namespace path %s :%v", sandbox.NetNSPath, err)
|
return nil, fmt.Errorf("failed to stat network namespace path %s :%v", sandbox.NetNSPath, err)
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
"github.com/cri-o/ocicni/pkg/ocicni"
|
"github.com/cri-o/ocicni/pkg/ocicni"
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||||
@ -94,8 +95,6 @@ type criContainerdService struct {
|
|||||||
|
|
||||||
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
// NewCRIContainerdService returns a new instance of CRIContainerdService
|
||||||
func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) {
|
func NewCRIContainerdService(config options.Config) (CRIContainerdService, error) {
|
||||||
// TODO(random-liu): [P2] Recover from runtime state and checkpoint.
|
|
||||||
|
|
||||||
client, err := containerd.New(config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
client, err := containerd.New(config.ContainerdEndpoint, containerd.WithDefaultNamespace(k8sContainerdNamespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
|
return nil, fmt.Errorf("failed to initialize containerd client with endpoint %q: %v",
|
||||||
@ -147,7 +146,11 @@ func NewCRIContainerdService(config options.Config) (CRIContainerdService, error
|
|||||||
// Run starts the cri-containerd service.
|
// Run starts the cri-containerd service.
|
||||||
func (c *criContainerdService) Run() error {
|
func (c *criContainerdService) Run() error {
|
||||||
glog.V(2).Info("Start cri-containerd service")
|
glog.V(2).Info("Start cri-containerd service")
|
||||||
// TODO(random-liu): Recover state.
|
|
||||||
|
glog.V(2).Infof("Start recovering state")
|
||||||
|
if err := c.recover(context.Background()); err != nil {
|
||||||
|
return fmt.Errorf("failed to recover state: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Start event handler.
|
// Start event handler.
|
||||||
glog.V(2).Info("Start event monitor")
|
glog.V(2).Info("Start event monitor")
|
||||||
|
@ -88,11 +88,6 @@ func (c *Container) Delete() error {
|
|||||||
return c.Status.Delete()
|
return c.Status.Delete()
|
||||||
}
|
}
|
||||||
|
|
||||||
// LoadContainer loads the internal used container type.
|
|
||||||
func LoadContainer() (Container, error) {
|
|
||||||
return Container{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store stores all Containers.
|
// Store stores all Containers.
|
||||||
type Store struct {
|
type Store struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
@ -41,6 +41,7 @@ func TestContainerStore(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
ImageRef: "TestImage-1",
|
ImageRef: "TestImage-1",
|
||||||
|
LogPath: "/test/log/path/1",
|
||||||
},
|
},
|
||||||
"2": {
|
"2": {
|
||||||
ID: "2",
|
ID: "2",
|
||||||
@ -53,6 +54,7 @@ func TestContainerStore(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
ImageRef: "TestImage-2",
|
ImageRef: "TestImage-2",
|
||||||
|
LogPath: "/test/log/path/2",
|
||||||
},
|
},
|
||||||
"3": {
|
"3": {
|
||||||
ID: "3",
|
ID: "3",
|
||||||
@ -65,6 +67,7 @@ func TestContainerStore(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
ImageRef: "TestImage-3",
|
ImageRef: "TestImage-3",
|
||||||
|
LogPath: "/test/log/path/3",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
statuses := map[string]Status{
|
statuses := map[string]Status{
|
||||||
@ -153,6 +156,7 @@ func TestWithContainerIO(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
ImageRef: "TestImage-1",
|
ImageRef: "TestImage-1",
|
||||||
|
LogPath: "/test/log/path",
|
||||||
}
|
}
|
||||||
status := Status{
|
status := Status{
|
||||||
Pid: 1,
|
Pid: 1,
|
||||||
|
@ -54,6 +54,8 @@ type Metadata struct {
|
|||||||
Config *runtime.ContainerConfig
|
Config *runtime.ContainerConfig
|
||||||
// ImageRef is the reference of image used by the container.
|
// ImageRef is the reference of image used by the container.
|
||||||
ImageRef string
|
ImageRef string
|
||||||
|
// LogPath is the container log path.
|
||||||
|
LogPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalJSON encodes Metadata into bytes in json format.
|
// MarshalJSON encodes Metadata into bytes in json format.
|
||||||
|
@ -36,6 +36,7 @@ func TestMetadataMarshalUnmarshal(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
ImageRef: "test-image-ref",
|
ImageRef: "test-image-ref",
|
||||||
|
LogPath: "/test/log/path",
|
||||||
}
|
}
|
||||||
|
|
||||||
assert := assertlib.New(t)
|
assert := assertlib.New(t)
|
||||||
|
@ -58,8 +58,7 @@ type Status struct {
|
|||||||
Message string
|
Message string
|
||||||
// Removing indicates that the container is in removing state.
|
// Removing indicates that the container is in removing state.
|
||||||
// This field doesn't need to be checkpointed.
|
// This field doesn't need to be checkpointed.
|
||||||
// TODO(random-liu): Reset this field to false during state recoverry.
|
Removing bool `json:"-"`
|
||||||
Removing bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// State returns current state of the container based on the container status.
|
// State returns current state of the container based on the container status.
|
||||||
|
@ -81,6 +81,7 @@ func TestStatusEncodeDecode(t *testing.T) {
|
|||||||
assert.NoError(err)
|
assert.NoError(err)
|
||||||
newS := &Status{}
|
newS := &Status{}
|
||||||
assert.NoError(newS.decode(data))
|
assert.NoError(newS.decode(data))
|
||||||
|
s.Removing = false // Removing should not be encoded.
|
||||||
assert.Equal(s, newS)
|
assert.Equal(s, newS)
|
||||||
|
|
||||||
unsupported, err := json.Marshal(&versionedStatus{
|
unsupported, err := json.Marshal(&versionedStatus{
|
||||||
|
@ -17,17 +17,25 @@ limitations under the License.
|
|||||||
package sandbox
|
package sandbox
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
cnins "github.com/containernetworking/plugins/pkg/ns"
|
cnins "github.com/containernetworking/plugins/pkg/ns"
|
||||||
|
"github.com/docker/docker/pkg/mount"
|
||||||
|
"golang.org/x/sys/unix"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrClosedNetNS is the error returned when network namespace is closed.
|
||||||
|
var ErrClosedNetNS = errors.New("network namespace is closed")
|
||||||
|
|
||||||
// NetNS holds network namespace for sandbox
|
// NetNS holds network namespace for sandbox
|
||||||
type NetNS struct {
|
type NetNS struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
ns cnins.NetNS
|
ns cnins.NetNS
|
||||||
closed bool
|
closed bool
|
||||||
|
restored bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNetNS creates a network namespace for the sandbox
|
// NewNetNS creates a network namespace for the sandbox
|
||||||
@ -41,6 +49,22 @@ func NewNetNS() (*NetNS, error) {
|
|||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LoadNetNS loads existing network namespace. It returns ErrClosedNetNS
|
||||||
|
// if the network namespace has already been closed.
|
||||||
|
func LoadNetNS(path string) (*NetNS, error) {
|
||||||
|
if err := cnins.IsNSorErr(path); err != nil {
|
||||||
|
if _, ok := err.(cnins.NSPathNotExistErr); ok {
|
||||||
|
return nil, ErrClosedNetNS
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
ns, err := cnins.GetNS(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to load network namespace %v", err)
|
||||||
|
}
|
||||||
|
return &NetNS{ns: ns, restored: true}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Remove removes network namepace if it exists and not closed. Remove is idempotent,
|
// Remove removes network namepace if it exists and not closed. Remove is idempotent,
|
||||||
// meaning it might be invoked multiple times and provides consistent result.
|
// meaning it might be invoked multiple times and provides consistent result.
|
||||||
func (n *NetNS) Remove() error {
|
func (n *NetNS) Remove() error {
|
||||||
@ -49,10 +73,34 @@ func (n *NetNS) Remove() error {
|
|||||||
if !n.closed {
|
if !n.closed {
|
||||||
err := n.ns.Close()
|
err := n.ns.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return fmt.Errorf("failed to close network namespace: %v", err)
|
||||||
}
|
}
|
||||||
n.closed = true
|
n.closed = true
|
||||||
}
|
}
|
||||||
|
if n.restored {
|
||||||
|
path := n.ns.Path()
|
||||||
|
// TODO(random-liu): Add util function for unmount.
|
||||||
|
// Check netns existence.
|
||||||
|
if _, err := os.Stat(path); err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return fmt.Errorf("failed to stat netns: %v", err)
|
||||||
|
}
|
||||||
|
mounted, err := mount.Mounted(path)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to check netns mounted: %v", err)
|
||||||
|
}
|
||||||
|
if mounted {
|
||||||
|
err := unix.Unmount(path, unix.MNT_DETACH)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return fmt.Errorf("failed to umount netns: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
return fmt.Errorf("failed to remove netns: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user