Avoid containerd access as much as possible.

Signed-off-by: Lantao Liu <lantaol@google.com>
This commit is contained in:
Lantao Liu
2018-01-25 01:15:45 +00:00
parent 11042a4141
commit df58d6825d
22 changed files with 797 additions and 337 deletions

View File

@@ -170,20 +170,26 @@ func (c containerForTest) toContainer() (containerstore.Container, error) {
func TestListContainers(t *testing.T) {
c := newTestCRIContainerdService()
sandboxesInStore := []sandboxstore.Sandbox{
{
Metadata: sandboxstore.Metadata{
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "s-1abcdef1234",
Name: "sandboxname-1",
Config: &runtime.PodSandboxConfig{Metadata: &runtime.PodSandboxMetadata{Name: "podname-1"}},
},
},
{
Metadata: sandboxstore.Metadata{
sandboxstore.Status{
State: sandboxstore.StateReady,
},
),
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "s-2abcdef1234",
Name: "sandboxname-2",
Config: &runtime.PodSandboxConfig{Metadata: &runtime.PodSandboxMetadata{Name: "podname-2"}},
},
},
sandboxstore.Status{
State: sandboxstore.StateNotReady,
},
),
}
createdAt := time.Now().UnixNano()
startedAt := time.Now().UnixNano()

View File

@@ -23,12 +23,14 @@ import (
"github.com/containerd/containerd"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
cio "github.com/containerd/cri-containerd/pkg/server/io"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// StartContainer starts the container.
@@ -89,18 +91,7 @@ func (c *criContainerdService) startContainer(ctx context.Context,
return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err)
}
sandboxID := meta.SandboxID
// Make sure sandbox is running.
s, err := sandbox.Container.Task(ctx, nil)
if err != nil {
return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err)
}
// This is only a best effort check, sandbox may still exit after this. If sandbox fails
// before starting the container, the start will fail.
taskStatus, err := s.Status(ctx)
if err != nil {
return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err)
}
if taskStatus.Status != containerd.Running {
if sandbox.Status.Get().State != sandboxstore.StateReady {
return fmt.Errorf("sandbox container %q is not running", sandboxID)
}
@@ -132,7 +123,8 @@ func (c *criContainerdService) startContainer(ctx context.Context,
}
defer func() {
if retErr != nil {
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil {
// It's possible that task is deleted by event monitor.
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("Failed to delete containerd task %q", id)
}
}

View File

@@ -111,6 +111,7 @@ type containerInfo struct {
}
// toCRIContainerInfo converts internal container object information to CRI container status response info map.
// TODO(random-liu): Return error instead of logging.
func toCRIContainerInfo(ctx context.Context, container containerstore.Container, verbose bool) (map[string]string, error) {
if !verbose {
return nil, nil

View File

@@ -31,11 +31,9 @@ import (
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
)
const (
// killContainerTimeout is the timeout that we wait for the container to
// be SIGKILLed.
killContainerTimeout = 2 * time.Minute
)
// killContainerTimeout is the timeout that we wait for the container to
// be SIGKILLed.
const killContainerTimeout = 2 * time.Minute
// StopContainer stops a running container with a grace period (i.e., timeout).
func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) {

View File

@@ -28,12 +28,14 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"github.com/containerd/cri-containerd/pkg/store"
containerstore "github.com/containerd/cri-containerd/pkg/store/container"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// eventMonitor monitors containerd event and updates internal state correspondingly.
// TODO(random-liu): [P1] Is it possible to drop event during containerd is running?
// TODO(random-liu): [P1] Figure out is it possible to drop event during containerd
// is running. If it is, we should do periodically list to sync state with containerd.
type eventMonitor struct {
containerStore *containerstore.Store
sandboxStore *sandboxstore.Store
@@ -106,57 +108,22 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
e := any.(*eventtypes.TaskExit)
logrus.Infof("TaskExit event %+v", e)
cntr, err := em.containerStore.Get(e.ContainerID)
if err != nil {
if _, err := em.sandboxStore.Get(e.ContainerID); err == nil {
return
}
if err == nil {
handleContainerExit(e, cntr)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID)
return
}
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
// Use GetAll to include sandbox in unknown state.
sb, err := em.sandboxStore.GetAll(e.ContainerID)
if err == nil {
handleSandboxExit(e, sb)
return
} else if err != store.ErrNotExist {
logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID)
return
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(context.Background(),
func(*containerdio.FIFOSet) (containerdio.IO, error) {
return cntr.IO, nil
},
)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container, task not found for container %q", e.ContainerID)
return
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
return
}
// Move on to make sure container status is updated.
}
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
// If FinishedAt has been set (e.g. with start failure), keep as
// it is.
if status.FinishedAt != 0 {
return status, nil
}
status.Pid = 0
status.FinishedAt = e.ExitedAt.UnixNano()
status.ExitCode = int32(e.ExitStatus)
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
}
// Using channel to propagate the information of container stop
cntr.Stop()
case *eventtypes.TaskOOM:
e := any.(*eventtypes.TaskOOM)
logrus.Infof("TaskOOM event %+v", e)
@@ -177,3 +144,95 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
}
}
}
// handleContainerExit handles TaskExit event for container.
func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) {
if e.Pid != cntr.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// Attach container IO so that `Delete` could cleanup the stream properly.
task, err := cntr.Container.Task(context.Background(),
func(*containerdio.FIFOSet) (containerdio.IO, error) {
return cntr.IO, nil
},
)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for container %q", e.ContainerID)
return
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID)
return
}
// Move on to make sure container status is updated.
}
}
err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) {
// If FinishedAt has been set (e.g. with start failure), keep as
// it is.
if status.FinishedAt != 0 {
return status, nil
}
status.Pid = 0
status.FinishedAt = e.ExitedAt.UnixNano()
status.ExitCode = int32(e.ExitStatus)
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
}
// Using channel to propagate the information of container stop
cntr.Stop()
}
// handleSandboxExit handles TaskExit event for sandbox.
func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) {
if e.Pid != sb.Status.Get().Pid {
// Non-init process died, ignore the event.
return
}
// No stream attached to sandbox container.
task, err := sb.Container.Task(context.Background(), nil)
if err != nil {
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID)
return
}
} else {
// TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker
if _, err = task.Delete(context.Background()); err != nil {
// TODO(random-liu): [P0] Enqueue the event and retry.
if !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID)
return
}
// Move on to make sure container status is updated.
}
}
err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
// NOTE(random-liu): We SHOULD NOT change UNKNOWN state here.
// If sandbox state is UNKNOWN when event monitor receives an TaskExit event,
// it means that sandbox start has failed. In that case, `RunPodSandbox` will
// cleanup everything immediately.
// Once sandbox state goes out of UNKNOWN, it becomes visable to the user, which
// is not what we want.
if status.State != sandboxstore.StateUnknown {
status.State = sandboxstore.StateNotReady
}
status.Pid = 0
return status, nil
})
if err != nil {
logrus.WithError(err).Errorf("Failed to update sandbox %q state", e.ContainerID)
// TODO(random-liu): [P0] Enqueue the event and retry.
return
}
}

View File

@@ -293,10 +293,63 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S
return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err)
}
meta := data.(*sandboxstore.Metadata)
sandbox = sandboxstore.Sandbox{
Metadata: *meta,
Container: cntr,
// Load sandbox created timestamp.
info, err := cntr.Info(ctx)
if err != nil {
return sandbox, fmt.Errorf("failed to get sandbox container info: %v", err)
}
createdAt := info.CreatedAt
// Load sandbox status.
t, err := cntr.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return sandbox, fmt.Errorf("failed to load task: %v", err)
}
var s containerd.Status
var notFound bool
if errdefs.IsNotFound(err) {
// Task is not found.
notFound = true
} else {
// Task is found. Get task status.
s, err = t.Status(ctx)
if err != nil {
// It's still possible that task is deleted during this window.
if !errdefs.IsNotFound(err) {
return sandbox, fmt.Errorf("failed to get task status: %v", err)
}
notFound = true
}
}
var state sandboxstore.State
var pid uint32
if notFound {
// Task does not exist, set sandbox state as NOTREADY.
state = sandboxstore.StateNotReady
} else {
if s.Status == containerd.Running {
// Task is running, set sandbox state as READY.
state = sandboxstore.StateReady
pid = t.Pid()
} else {
// Task is not running. Delete the task and set sandbox state as NOTREADY.
if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
return sandbox, fmt.Errorf("failed to delete task: %v", err)
}
state = sandboxstore.StateNotReady
}
}
sandbox = sandboxstore.NewSandbox(
*meta,
sandboxstore.Status{
Pid: pid,
CreatedAt: createdAt,
State: state,
},
)
sandbox.Container = cntr
// Load network namespace.
if meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetHostNetwork() {

View File

@@ -17,12 +17,6 @@ limitations under the License.
package server
import (
"fmt"
"time"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/api/types/task"
"github.com/containerd/containerd/errdefs"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -33,39 +27,12 @@ import (
func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (*runtime.ListPodSandboxResponse, error) {
// List all sandboxes from store.
sandboxesInStore := c.sandboxStore.List()
response, err := c.client.TaskService().List(ctx, &tasks.ListTasksRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list sandbox containers: %v", err)
}
var sandboxes []*runtime.PodSandbox
for _, sandboxInStore := range sandboxesInStore {
var sandboxInContainerd *task.Process
for _, s := range response.Tasks {
if s.ID == sandboxInStore.ID {
sandboxInContainerd = s
break
}
}
// Set sandbox state to NOTREADY by default.
state := runtime.PodSandboxState_SANDBOX_NOTREADY
// If the sandbox container is running, return the sandbox as READY.
if sandboxInContainerd != nil && sandboxInContainerd.Status == task.StatusRunning {
state = runtime.PodSandboxState_SANDBOX_READY
}
info, err := sandboxInStore.Container.Info(ctx)
if err != nil {
// It's possible that container gets deleted during list.
if errdefs.IsNotFound(err) {
continue
}
return nil, fmt.Errorf("failed to get sandbox container %q info: %v", sandboxInStore.ID, err)
}
createdAt := info.CreatedAt
sandboxes = append(sandboxes, toCRISandbox(sandboxInStore.Metadata, state, createdAt))
sandboxes = append(sandboxes, toCRISandbox(
sandboxInStore.Metadata,
sandboxInStore.Status.Get(),
))
}
sandboxes = c.filterCRISandboxes(sandboxes, r.GetFilter())
@@ -73,12 +40,17 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li
}
// toCRISandbox converts sandbox metadata into CRI pod sandbox.
func toCRISandbox(meta sandboxstore.Metadata, state runtime.PodSandboxState, createdAt time.Time) *runtime.PodSandbox {
func toCRISandbox(meta sandboxstore.Metadata, status sandboxstore.Status) *runtime.PodSandbox {
// Set sandbox state to NOTREADY by default.
state := runtime.PodSandboxState_SANDBOX_NOTREADY
if status.State == sandboxstore.StateReady {
state = runtime.PodSandboxState_SANDBOX_READY
}
return &runtime.PodSandbox{
Id: meta.ID,
Metadata: meta.Config.GetMetadata(),
State: state,
CreatedAt: createdAt.UnixNano(),
CreatedAt: status.CreatedAt.UnixNano(),
Labels: meta.Config.GetLabels(),
Annotations: meta.Config.GetAnnotations(),
}

View File

@@ -44,90 +44,106 @@ func TestToCRISandbox(t *testing.T) {
Config: config,
NetNSPath: "test-netns",
}
state := runtime.PodSandboxState_SANDBOX_READY
expect := &runtime.PodSandbox{
Id: "test-id",
Metadata: config.GetMetadata(),
State: state,
CreatedAt: createdAt.UnixNano(),
Labels: config.GetLabels(),
Annotations: config.GetAnnotations(),
}
s := toCRISandbox(meta, state, createdAt)
assert.Equal(t, expect, s)
for desc, test := range map[string]struct {
state sandboxstore.State
expectedState runtime.PodSandboxState
}{
"sandbox state ready": {
state: sandboxstore.StateReady,
expectedState: runtime.PodSandboxState_SANDBOX_READY,
},
"sandbox state not ready": {
state: sandboxstore.StateNotReady,
expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY,
},
} {
status := sandboxstore.Status{
CreatedAt: createdAt,
State: test.state,
}
expect.State = test.expectedState
s := toCRISandbox(meta, status)
assert.Equal(t, expect, s, desc)
}
}
func TestFilterSandboxes(t *testing.T) {
c := newTestCRIContainerdService()
sandboxes := []struct {
sandbox sandboxstore.Sandbox
state runtime.PodSandboxState
}{
{
sandbox: sandboxstore.Sandbox{
Metadata: sandboxstore.Metadata{
ID: "1abcdef",
Name: "sandboxname-1",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-1",
Uid: "uid-1",
Namespace: "ns-1",
Attempt: 1,
},
sandboxes := []sandboxstore.Sandbox{
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "1abcdef",
Name: "sandboxname-1",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-1",
Uid: "uid-1",
Namespace: "ns-1",
Attempt: 1,
},
},
},
state: runtime.PodSandboxState_SANDBOX_READY,
},
{
sandbox: sandboxstore.Sandbox{
Metadata: sandboxstore.Metadata{
ID: "2abcdef",
Name: "sandboxname-2",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-2",
Uid: "uid-2",
Namespace: "ns-2",
Attempt: 2,
},
Labels: map[string]string{"a": "b"},
sandboxstore.Status{
CreatedAt: time.Now(),
State: sandboxstore.StateReady,
},
),
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "2abcdef",
Name: "sandboxname-2",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-2",
Uid: "uid-2",
Namespace: "ns-2",
Attempt: 2,
},
Labels: map[string]string{"a": "b"},
},
},
state: runtime.PodSandboxState_SANDBOX_NOTREADY,
},
{
sandbox: sandboxstore.Sandbox{
Metadata: sandboxstore.Metadata{
ID: "3abcdef",
Name: "sandboxname-3",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-2",
Uid: "uid-2",
Namespace: "ns-2",
Attempt: 2,
},
Labels: map[string]string{"c": "d"},
sandboxstore.Status{
CreatedAt: time.Now(),
State: sandboxstore.StateNotReady,
},
),
sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: "3abcdef",
Name: "sandboxname-3",
Config: &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: "podname-2",
Uid: "uid-2",
Namespace: "ns-2",
Attempt: 2,
},
Labels: map[string]string{"c": "d"},
},
},
state: runtime.PodSandboxState_SANDBOX_READY,
},
sandboxstore.Status{
CreatedAt: time.Now(),
State: sandboxstore.StateReady,
},
),
}
// Create PodSandbox
testSandboxes := []*runtime.PodSandbox{}
createdAt := time.Now()
for _, sb := range sandboxes {
testSandboxes = append(testSandboxes, toCRISandbox(sb.sandbox.Metadata, sb.state, createdAt))
testSandboxes = append(testSandboxes, toCRISandbox(sb.Metadata, sb.Status.Get()))
}
// Inject test sandbox metadata
for _, sb := range sandboxes {
assert.NoError(t, c.sandboxStore.Add(sb.sandbox))
assert.NoError(t, c.sandboxStore.Add(sb))
}
for desc, test := range map[string]struct {

View File

@@ -24,10 +24,11 @@ import (
"os/exec"
"strings"
"github.com/containerd/containerd"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
@@ -37,16 +38,7 @@ func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortF
if err != nil {
return nil, fmt.Errorf("failed to find sandbox %q: %v", r.GetPodSandboxId(), err)
}
t, err := sandbox.Container.Task(ctx, nil)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container task: %v", err)
}
status, err := t.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container status: %v", err)
}
if status.Status != containerd.Running {
if sandbox.Status.Get().State != sandboxstore.StateReady {
return nil, errors.New("sandbox container is not running")
}
// TODO(random-liu): Verify that ports are exposed.

View File

@@ -27,6 +27,7 @@ import (
"github.com/containerd/cri-containerd/pkg/log"
"github.com/containerd/cri-containerd/pkg/store"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// RemovePodSandbox removes the sandbox. If there are running containers in the
@@ -46,12 +47,8 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime.
// Use the full sandbox id.
id := sandbox.ID
// Return error if sandbox container is not fully stopped.
_, err = sandbox.Container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err)
}
if err == nil {
// Return error if sandbox container is still running.
if sandbox.Status.Get().State == sandboxstore.StateReady {
return nil, fmt.Errorf("sandbox container %q is not fully stopped", id)
}

View File

@@ -23,6 +23,7 @@ import (
"github.com/containerd/containerd"
containerdio "github.com/containerd/containerd/cio"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/linux/runctypes"
"github.com/containerd/containerd/oci"
"github.com/containerd/typeurl"
@@ -68,13 +69,16 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}()
// Create initial internal sandbox object.
sandbox := sandboxstore.Sandbox{
Metadata: sandboxstore.Metadata{
sandbox := sandboxstore.NewSandbox(
sandboxstore.Metadata{
ID: id,
Name: name,
Config: config,
},
}
sandboxstore.Status{
State: sandboxstore.StateUnknown,
},
)
// Ensure sandbox container image snapshot.
image, err := c.ensureImageExists(ctx, c.config.SandboxImage)
@@ -224,33 +228,86 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run
}
}()
// Create sandbox task in containerd.
log.Tracef("Create sandbox container (id=%q, name=%q).",
id, name)
// We don't need stdio for sandbox container.
task, err := container.NewTask(ctx, containerdio.NullIO)
// Update sandbox created timestamp.
info, err := container.Info(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create containerd task: %v", err)
return nil, fmt.Errorf("failed to get sandbox container info: %v", err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox container if an error is returned.
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil {
logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id)
}
}
}()
if err = task.Start(ctx); err != nil {
return nil, fmt.Errorf("failed to start sandbox container task %q: %v",
id, err)
if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) {
status.CreatedAt = info.CreatedAt
return status, nil
}); err != nil {
return nil, fmt.Errorf("failed to update sandbox created timestamp: %v", err)
}
// Add sandbox into sandbox store.
// Add sandbox into sandbox store in UNKNOWN state.
sandbox.Container = container
if err := c.sandboxStore.Add(sandbox); err != nil {
return nil, fmt.Errorf("failed to add sandbox %+v into store: %v", sandbox, err)
}
defer func() {
// Delete sandbox from sandbox store if there is an error.
if retErr != nil {
c.sandboxStore.Delete(id)
}
}()
// NOTE(random-liu): Sandbox state only stay in UNKNOWN state after this point
// and before the end of this function.
// * If `Update` succeeds, sandbox state will become READY in one transaction.
// * If `Update` fails, sandbox will be removed from the store in the defer above.
// * If cri-containerd stops at any point before `Update` finishes, because sandbox
// state is not checkpointed, it will be recovered from corresponding containerd task
// status during restart:
// * If the task is running, sandbox state will be READY,
// * Or else, sandbox state will be NOTREADY.
//
// In any case, sandbox will leave UNKNOWN state, so it's safe to ignore sandbox
// in UNKNOWN state in other functions.
// Start sandbox container in one transaction to avoid race condition with
// event monitor.
if err := sandbox.Status.Update(func(status sandboxstore.Status) (_ sandboxstore.Status, retErr error) {
// NOTE(random-liu): We should not change the sandbox state to NOTREADY
// if `Update` fails.
//
// If `Update` fails, the sandbox will be cleaned up by all the defers
// above. We should not let user see this sandbox, or else they will
// see the sandbox disappear after the defer clean up, which may confuse
// them.
//
// Given so, we should keep the sandbox in UNKNOWN state if `Update` fails,
// and ignore sandbox in UNKNOWN state in all the inspection functions.
// Create sandbox task in containerd.
log.Tracef("Create sandbox container (id=%q, name=%q).",
id, name)
// We don't need stdio for sandbox container.
task, err := container.NewTask(ctx, containerdio.NullIO)
if err != nil {
return status, fmt.Errorf("failed to create containerd task: %v", err)
}
defer func() {
if retErr != nil {
// Cleanup the sandbox container if an error is returned.
// It's possible that task is deleted by event monitor.
if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) {
logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id)
}
}
}()
if err := task.Start(ctx); err != nil {
return status, fmt.Errorf("failed to start sandbox container task %q: %v",
id, err)
}
// Set the pod sandbox as ready after successfully start sandbox container.
status.Pid = task.Pid()
status.State = sandboxstore.StateReady
return status, nil
}); err != nil {
return nil, fmt.Errorf("failed to start sandbox container: %v", err)
}
return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil
}

View File

@@ -19,12 +19,10 @@ package server
import (
"encoding/json"
"fmt"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -38,32 +36,14 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.
return nil, fmt.Errorf("an error occurred when try to find sandbox: %v", err)
}
task, err := sandbox.Container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container task: %v", err)
}
var pid uint32
var processStatus containerd.ProcessStatus
// If the sandbox container is running, treat it as READY.
if task != nil {
taskStatus, err := task.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get task status: %v", err)
}
pid = task.Pid()
processStatus = taskStatus.Status
}
ip := c.getIP(sandbox)
ctrInfo, err := sandbox.Container.Info(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container info: %v", err)
status := toCRISandboxStatus(sandbox.Metadata, sandbox.Status.Get(), ip)
if !r.GetVerbose() {
return &runtime.PodSandboxStatusResponse{Status: status}, nil
}
createdAt := ctrInfo.CreatedAt
status := toCRISandboxStatus(sandbox.Metadata, processStatus, createdAt, ip)
info, err := toCRISandboxInfo(ctx, sandbox, pid, processStatus, r.GetVerbose())
// Generate verbose information.
info, err := toCRISandboxInfo(ctx, sandbox)
if err != nil {
return nil, fmt.Errorf("failed to get verbose sandbox container info: %v", err)
}
@@ -92,10 +72,10 @@ func (c *criContainerdService) getIP(sandbox sandboxstore.Sandbox) string {
}
// toCRISandboxStatus converts sandbox metadata into CRI pod sandbox status.
func toCRISandboxStatus(meta sandboxstore.Metadata, status containerd.ProcessStatus, createdAt time.Time, ip string) *runtime.PodSandboxStatus {
func toCRISandboxStatus(meta sandboxstore.Metadata, status sandboxstore.Status, ip string) *runtime.PodSandboxStatus {
// Set sandbox state to NOTREADY by default.
state := runtime.PodSandboxState_SANDBOX_NOTREADY
if status == containerd.Running {
if status.State == sandboxstore.StateReady {
state = runtime.PodSandboxState_SANDBOX_READY
}
nsOpts := meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions()
@@ -103,7 +83,7 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status containerd.ProcessSta
Id: meta.ID,
Metadata: meta.Config.GetMetadata(),
State: state,
CreatedAt: createdAt.UnixNano(),
CreatedAt: status.CreatedAt.UnixNano(),
Network: &runtime.PodSandboxNetworkStatus{Ip: ip},
Linux: &runtime.LinuxPodSandboxStatus{
Namespaces: &runtime.Namespace{
@@ -132,14 +112,25 @@ type sandboxInfo struct {
}
// toCRISandboxInfo converts internal container object information to CRI sandbox status response info map.
func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox,
pid uint32, processStatus containerd.ProcessStatus, verbose bool) (map[string]string, error) {
if !verbose {
return nil, nil
func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[string]string, error) {
container := sandbox.Container
task, err := container.Task(ctx, nil)
if err != nil && !errdefs.IsNotFound(err) {
return nil, fmt.Errorf("failed to get sandbox container task: %v", err)
}
var processStatus containerd.ProcessStatus
if task != nil {
taskStatus, err := task.Status(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get task status: %v", err)
}
processStatus = taskStatus.Status
}
si := &sandboxInfo{
Pid: pid,
Pid: sandbox.Status.Get().Pid,
Status: string(processStatus),
Config: sandbox.Config,
}
@@ -155,25 +146,22 @@ func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox,
si.NetNSClosed = (sandbox.NetNS == nil || sandbox.NetNS.Closed())
}
container := sandbox.Container
spec, err := container.Spec(ctx)
if err == nil {
si.RuntimeSpec = spec
} else {
logrus.WithError(err).Errorf("Failed to get sandbox container %q runtime spec", sandbox.ID)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container runtime spec: %v", err)
}
si.RuntimeSpec = spec
ctrInfo, err := container.Info(ctx)
if err == nil {
// Do not use config.SandboxImage because the configuration might
// be changed during restart. It may not reflect the actual image
// used by the sandbox container.
si.Image = ctrInfo.Image
si.SnapshotKey = ctrInfo.SnapshotKey
si.Snapshotter = ctrInfo.Snapshotter
} else {
logrus.WithError(err).Errorf("Failed to get sandbox container %q info", sandbox.ID)
if err != nil {
return nil, fmt.Errorf("failed to get sandbox container info: %v", err)
}
// Do not use config.SandboxImage because the configuration might
// be changed during restart. It may not reflect the actual image
// used by the sandbox container.
si.Image = ctrInfo.Image
si.SnapshotKey = ctrInfo.SnapshotKey
si.Snapshotter = ctrInfo.Snapshotter
infoBytes, err := json.Marshal(si)
if err != nil {

View File

@@ -20,7 +20,6 @@ import (
"testing"
"time"
"github.com/containerd/containerd"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
@@ -52,12 +51,10 @@ func TestPodSandboxStatus(t *testing.T) {
Labels: map[string]string{"a": "b"},
Annotations: map[string]string{"c": "d"},
}
sandbox := &sandboxstore.Sandbox{
Metadata: sandboxstore.Metadata{
ID: id,
Name: "test-name",
Config: config,
},
metadata := sandboxstore.Metadata{
ID: id,
Name: "test-name",
Config: config,
}
expected := &runtime.PodSandboxStatus{
@@ -77,21 +74,26 @@ func TestPodSandboxStatus(t *testing.T) {
Labels: config.GetLabels(),
Annotations: config.GetAnnotations(),
}
for _, status := range []containerd.ProcessStatus{
"",
containerd.Running,
containerd.Created,
containerd.Stopped,
containerd.Paused,
containerd.Pausing,
containerd.Unknown,
for desc, test := range map[string]struct {
state sandboxstore.State
expectedState runtime.PodSandboxState
}{
"sandbox state ready": {
state: sandboxstore.StateReady,
expectedState: runtime.PodSandboxState_SANDBOX_READY,
},
"sandbox state not ready": {
state: sandboxstore.StateNotReady,
expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY,
},
} {
state := runtime.PodSandboxState_SANDBOX_NOTREADY
if status == containerd.Running {
state = runtime.PodSandboxState_SANDBOX_READY
t.Logf("TestCase: %s", desc)
status := sandboxstore.Status{
CreatedAt: createdAt,
State: test.state,
}
expected.State = state
got := toCRISandboxStatus(sandbox.Metadata, status, createdAt, ip)
expected.State = test.expectedState
got := toCRISandboxStatus(metadata, status, ip)
assert.Equal(t, expected, got)
}
}

View File

@@ -19,6 +19,7 @@ package server
import (
"fmt"
"os"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/errdefs"
@@ -26,8 +27,14 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/net/context"
"k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
// stopCheckPollInterval is the the interval to check whether a sandbox
// is stopped successfully.
const stopCheckPollInterval = 100 * time.Millisecond
// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be forcibly terminated.
func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) {
@@ -89,14 +96,18 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St
return nil, fmt.Errorf("failed to unmount sandbox files in %q: %v", sandboxRoot, err)
}
if err := c.stopSandboxContainer(ctx, sandbox.Container); err != nil {
return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err)
// Only stop sandbox container when it's running.
if sandbox.Status.Get().State == sandboxstore.StateReady {
if err := c.stopSandboxContainer(ctx, sandbox); err != nil {
return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err)
}
}
return &runtime.StopPodSandboxResponse{}, nil
}
// stopSandboxContainer kills and deletes sandbox container.
func (c *criContainerdService) stopSandboxContainer(ctx context.Context, container containerd.Container) error {
func (c *criContainerdService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error {
container := sandbox.Container
task, err := container.Task(ctx, nil)
if err != nil {
if errdefs.IsNotFound(err) {
@@ -111,5 +122,28 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, contain
return fmt.Errorf("failed to delete sandbox container: %v", err)
}
return nil
return c.waitSandboxStop(ctx, sandbox, killContainerTimeout)
}
// waitSandboxStop polls sandbox state until timeout exceeds or sandbox is stopped.
func (c *criContainerdService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error {
ticker := time.NewTicker(stopCheckPollInterval)
defer ticker.Stop()
timeoutTimer := time.NewTimer(timeout)
defer timeoutTimer.Stop()
for {
// Poll once before waiting for stopCheckPollInterval.
// TODO(random-liu): Use channel with event handler instead of polling.
if sandbox.Status.Get().State == sandboxstore.StateNotReady {
return nil
}
select {
case <-ctx.Done():
return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID)
case <-timeoutTimer.C:
return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID)
case <-ticker.C:
continue
}
}
}

View File

@@ -0,0 +1,68 @@
/*
Copyright 2018 The Containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package server
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/net/context"
sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox"
)
func TestWaitSandboxStop(t *testing.T) {
id := "test-id"
for desc, test := range map[string]struct {
state sandboxstore.State
cancel bool
timeout time.Duration
expectErr bool
}{
"should return error if timeout exceeds": {
state: sandboxstore.StateReady,
timeout: 2 * stopCheckPollInterval,
expectErr: true,
},
"should return error if context is cancelled": {
state: sandboxstore.StateReady,
timeout: time.Hour,
cancel: true,
expectErr: true,
},
"should not return error if sandbox is stopped before timeout": {
state: sandboxstore.StateNotReady,
timeout: time.Hour,
expectErr: false,
},
} {
c := newTestCRIContainerdService()
sandbox := sandboxstore.NewSandbox(
sandboxstore.Metadata{ID: id},
sandboxstore.Status{State: test.state},
)
ctx := context.Background()
if test.cancel {
cancelledCtx, cancel := context.WithCancel(ctx)
cancel()
ctx = cancelledCtx
}
err := c.waitSandboxStop(ctx, sandbox, test.timeout)
assert.Equal(t, test.expectErr, err != nil, desc)
}
}