diff --git a/integration/sandbox_clean_remove_test.go b/integration/sandbox_clean_remove_test.go index 16aa846a7..e1295567b 100644 --- a/integration/sandbox_clean_remove_test.go +++ b/integration/sandbox_clean_remove_test.go @@ -18,6 +18,7 @@ package integration import ( "testing" + "time" "github.com/containerd/containerd" "github.com/stretchr/testify/assert" @@ -50,9 +51,13 @@ func TestSandboxCleanRemove(t *testing.T) { require.NoError(t, err) t.Logf("Sandbox state should be NOTREADY") - status, err := runtimeService.PodSandboxStatus(sb) - require.NoError(t, err) - assert.Equal(t, runtime.PodSandboxState_SANDBOX_NOTREADY, status.GetState()) + assert.NoError(t, Eventually(func() (bool, error) { + status, err := runtimeService.PodSandboxStatus(sb) + if err != nil { + return false, err + } + return status.GetState() == runtime.PodSandboxState_SANDBOX_NOTREADY, nil + }, time.Second, 30*time.Second), "sandbox state should become NOTREADY") t.Logf("Should not be able to remove the sandbox when netns is not closed") assert.Error(t, runtimeService.RemovePodSandbox(sb)) diff --git a/pkg/server/container_list_test.go b/pkg/server/container_list_test.go index c4f31b386..09e2136c8 100644 --- a/pkg/server/container_list_test.go +++ b/pkg/server/container_list_test.go @@ -170,20 +170,26 @@ func (c containerForTest) toContainer() (containerstore.Container, error) { func TestListContainers(t *testing.T) { c := newTestCRIContainerdService() sandboxesInStore := []sandboxstore.Sandbox{ - { - Metadata: sandboxstore.Metadata{ + sandboxstore.NewSandbox( + sandboxstore.Metadata{ ID: "s-1abcdef1234", Name: "sandboxname-1", Config: &runtime.PodSandboxConfig{Metadata: &runtime.PodSandboxMetadata{Name: "podname-1"}}, }, - }, - { - Metadata: sandboxstore.Metadata{ + sandboxstore.Status{ + State: sandboxstore.StateReady, + }, + ), + sandboxstore.NewSandbox( + sandboxstore.Metadata{ ID: "s-2abcdef1234", Name: "sandboxname-2", Config: &runtime.PodSandboxConfig{Metadata: &runtime.PodSandboxMetadata{Name: "podname-2"}}, }, - }, + sandboxstore.Status{ + State: sandboxstore.StateNotReady, + }, + ), } createdAt := time.Now().UnixNano() startedAt := time.Now().UnixNano() diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 88dc8f5cb..56e05459d 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -23,12 +23,14 @@ import ( "github.com/containerd/containerd" containerdio "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/errdefs" "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" cio "github.com/containerd/cri-containerd/pkg/server/io" containerstore "github.com/containerd/cri-containerd/pkg/store/container" + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) // StartContainer starts the container. @@ -89,18 +91,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err) } sandboxID := meta.SandboxID - // Make sure sandbox is running. - s, err := sandbox.Container.Task(ctx, nil) - if err != nil { - return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err) - } - // This is only a best effort check, sandbox may still exit after this. If sandbox fails - // before starting the container, the start will fail. - taskStatus, err := s.Status(ctx) - if err != nil { - return fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err) - } - if taskStatus.Status != containerd.Running { + if sandbox.Status.Get().State != sandboxstore.StateReady { return fmt.Errorf("sandbox container %q is not running", sandboxID) } @@ -132,7 +123,8 @@ func (c *criContainerdService) startContainer(ctx context.Context, } defer func() { if retErr != nil { - if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil { + // It's possible that task is deleted by event monitor. + if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { logrus.WithError(err).Errorf("Failed to delete containerd task %q", id) } } diff --git a/pkg/server/container_status.go b/pkg/server/container_status.go index f78892d46..3c494410d 100644 --- a/pkg/server/container_status.go +++ b/pkg/server/container_status.go @@ -111,6 +111,7 @@ type containerInfo struct { } // toCRIContainerInfo converts internal container object information to CRI container status response info map. +// TODO(random-liu): Return error instead of logging. func toCRIContainerInfo(ctx context.Context, container containerstore.Container, verbose bool) (map[string]string, error) { if !verbose { return nil, nil diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index b24bdb9b9..a57e31514 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -31,11 +31,9 @@ import ( containerstore "github.com/containerd/cri-containerd/pkg/store/container" ) -const ( - // killContainerTimeout is the timeout that we wait for the container to - // be SIGKILLed. - killContainerTimeout = 2 * time.Minute -) +// killContainerTimeout is the timeout that we wait for the container to +// be SIGKILLed. +const killContainerTimeout = 2 * time.Minute // StopContainer stops a running container with a grace period (i.e., timeout). func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) { diff --git a/pkg/server/events.go b/pkg/server/events.go index 0df4e681e..23572f52f 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -28,12 +28,14 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/net/context" + "github.com/containerd/cri-containerd/pkg/store" containerstore "github.com/containerd/cri-containerd/pkg/store/container" sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) // eventMonitor monitors containerd event and updates internal state correspondingly. -// TODO(random-liu): [P1] Is it possible to drop event during containerd is running? +// TODO(random-liu): [P1] Figure out is it possible to drop event during containerd +// is running. If it is, we should do periodically list to sync state with containerd. type eventMonitor struct { containerStore *containerstore.Store sandboxStore *sandboxstore.Store @@ -106,57 +108,22 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { e := any.(*eventtypes.TaskExit) logrus.Infof("TaskExit event %+v", e) cntr, err := em.containerStore.Get(e.ContainerID) - if err != nil { - if _, err := em.sandboxStore.Get(e.ContainerID); err == nil { - return - } + if err == nil { + handleContainerExit(e, cntr) + return + } else if err != store.ErrNotExist { logrus.WithError(err).Errorf("Failed to get container %q", e.ContainerID) return } - if e.Pid != cntr.Status.Get().Pid { - // Non-init process died, ignore the event. + // Use GetAll to include sandbox in unknown state. + sb, err := em.sandboxStore.GetAll(e.ContainerID) + if err == nil { + handleSandboxExit(e, sb) + return + } else if err != store.ErrNotExist { + logrus.WithError(err).Errorf("Failed to get sandbox %q", e.ContainerID) return } - // Attach container IO so that `Delete` could cleanup the stream properly. - task, err := cntr.Container.Task(context.Background(), - func(*containerdio.FIFOSet) (containerdio.IO, error) { - return cntr.IO, nil - }, - ) - if err != nil { - if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to stop container, task not found for container %q", e.ContainerID) - return - } - } else { - // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker - if _, err = task.Delete(context.Background()); err != nil { - // TODO(random-liu): [P0] Enqueue the event and retry. - if !errdefs.IsNotFound(err) { - logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID) - return - } - // Move on to make sure container status is updated. - } - } - err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { - // If FinishedAt has been set (e.g. with start failure), keep as - // it is. - if status.FinishedAt != 0 { - return status, nil - } - status.Pid = 0 - status.FinishedAt = e.ExitedAt.UnixNano() - status.ExitCode = int32(e.ExitStatus) - return status, nil - }) - if err != nil { - logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID) - // TODO(random-liu): [P0] Enqueue the event and retry. - return - } - // Using channel to propagate the information of container stop - cntr.Stop() case *eventtypes.TaskOOM: e := any.(*eventtypes.TaskOOM) logrus.Infof("TaskOOM event %+v", e) @@ -177,3 +144,95 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { } } } + +// handleContainerExit handles TaskExit event for container. +func handleContainerExit(e *eventtypes.TaskExit, cntr containerstore.Container) { + if e.Pid != cntr.Status.Get().Pid { + // Non-init process died, ignore the event. + return + } + // Attach container IO so that `Delete` could cleanup the stream properly. + task, err := cntr.Container.Task(context.Background(), + func(*containerdio.FIFOSet) (containerdio.IO, error) { + return cntr.IO, nil + }, + ) + if err != nil { + if !errdefs.IsNotFound(err) { + logrus.WithError(err).Errorf("failed to load task for container %q", e.ContainerID) + return + } + } else { + // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker + if _, err = task.Delete(context.Background()); err != nil { + // TODO(random-liu): [P0] Enqueue the event and retry. + if !errdefs.IsNotFound(err) { + logrus.WithError(err).Errorf("failed to stop container %q", e.ContainerID) + return + } + // Move on to make sure container status is updated. + } + } + err = cntr.Status.UpdateSync(func(status containerstore.Status) (containerstore.Status, error) { + // If FinishedAt has been set (e.g. with start failure), keep as + // it is. + if status.FinishedAt != 0 { + return status, nil + } + status.Pid = 0 + status.FinishedAt = e.ExitedAt.UnixNano() + status.ExitCode = int32(e.ExitStatus) + return status, nil + }) + if err != nil { + logrus.WithError(err).Errorf("Failed to update container %q state", e.ContainerID) + // TODO(random-liu): [P0] Enqueue the event and retry. + return + } + // Using channel to propagate the information of container stop + cntr.Stop() +} + +// handleSandboxExit handles TaskExit event for sandbox. +func handleSandboxExit(e *eventtypes.TaskExit, sb sandboxstore.Sandbox) { + if e.Pid != sb.Status.Get().Pid { + // Non-init process died, ignore the event. + return + } + // No stream attached to sandbox container. + task, err := sb.Container.Task(context.Background(), nil) + if err != nil { + if !errdefs.IsNotFound(err) { + logrus.WithError(err).Errorf("failed to load task for sandbox %q", e.ContainerID) + return + } + } else { + // TODO(random-liu): [P1] This may block the loop, we may want to spawn a worker + if _, err = task.Delete(context.Background()); err != nil { + // TODO(random-liu): [P0] Enqueue the event and retry. + if !errdefs.IsNotFound(err) { + logrus.WithError(err).Errorf("failed to stop sandbox %q", e.ContainerID) + return + } + // Move on to make sure container status is updated. + } + } + err = sb.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + // NOTE(random-liu): We SHOULD NOT change UNKNOWN state here. + // If sandbox state is UNKNOWN when event monitor receives an TaskExit event, + // it means that sandbox start has failed. In that case, `RunPodSandbox` will + // cleanup everything immediately. + // Once sandbox state goes out of UNKNOWN, it becomes visable to the user, which + // is not what we want. + if status.State != sandboxstore.StateUnknown { + status.State = sandboxstore.StateNotReady + } + status.Pid = 0 + return status, nil + }) + if err != nil { + logrus.WithError(err).Errorf("Failed to update sandbox %q state", e.ContainerID) + // TODO(random-liu): [P0] Enqueue the event and retry. + return + } +} diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 22d0cdb4c..ee8f471ec 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -293,10 +293,63 @@ func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.S return sandbox, fmt.Errorf("failed to unmarshal metadata extension %q: %v", ext, err) } meta := data.(*sandboxstore.Metadata) - sandbox = sandboxstore.Sandbox{ - Metadata: *meta, - Container: cntr, + + // Load sandbox created timestamp. + info, err := cntr.Info(ctx) + if err != nil { + return sandbox, fmt.Errorf("failed to get sandbox container info: %v", err) } + createdAt := info.CreatedAt + + // Load sandbox status. + t, err := cntr.Task(ctx, nil) + if err != nil && !errdefs.IsNotFound(err) { + return sandbox, fmt.Errorf("failed to load task: %v", err) + } + var s containerd.Status + var notFound bool + if errdefs.IsNotFound(err) { + // Task is not found. + notFound = true + } else { + // Task is found. Get task status. + s, err = t.Status(ctx) + if err != nil { + // It's still possible that task is deleted during this window. + if !errdefs.IsNotFound(err) { + return sandbox, fmt.Errorf("failed to get task status: %v", err) + } + notFound = true + } + } + var state sandboxstore.State + var pid uint32 + if notFound { + // Task does not exist, set sandbox state as NOTREADY. + state = sandboxstore.StateNotReady + } else { + if s.Status == containerd.Running { + // Task is running, set sandbox state as READY. + state = sandboxstore.StateReady + pid = t.Pid() + } else { + // Task is not running. Delete the task and set sandbox state as NOTREADY. + if _, err := t.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + return sandbox, fmt.Errorf("failed to delete task: %v", err) + } + state = sandboxstore.StateNotReady + } + } + + sandbox = sandboxstore.NewSandbox( + *meta, + sandboxstore.Status{ + Pid: pid, + CreatedAt: createdAt, + State: state, + }, + ) + sandbox.Container = cntr // Load network namespace. if meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions().GetHostNetwork() { diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go index 02378c470..6f182e806 100644 --- a/pkg/server/sandbox_list.go +++ b/pkg/server/sandbox_list.go @@ -17,12 +17,6 @@ limitations under the License. package server import ( - "fmt" - "time" - - tasks "github.com/containerd/containerd/api/services/tasks/v1" - "github.com/containerd/containerd/api/types/task" - "github.com/containerd/containerd/errdefs" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -33,39 +27,12 @@ import ( func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.ListPodSandboxRequest) (*runtime.ListPodSandboxResponse, error) { // List all sandboxes from store. sandboxesInStore := c.sandboxStore.List() - - response, err := c.client.TaskService().List(ctx, &tasks.ListTasksRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to list sandbox containers: %v", err) - } - var sandboxes []*runtime.PodSandbox for _, sandboxInStore := range sandboxesInStore { - var sandboxInContainerd *task.Process - for _, s := range response.Tasks { - if s.ID == sandboxInStore.ID { - sandboxInContainerd = s - break - } - } - - // Set sandbox state to NOTREADY by default. - state := runtime.PodSandboxState_SANDBOX_NOTREADY - // If the sandbox container is running, return the sandbox as READY. - if sandboxInContainerd != nil && sandboxInContainerd.Status == task.StatusRunning { - state = runtime.PodSandboxState_SANDBOX_READY - } - - info, err := sandboxInStore.Container.Info(ctx) - if err != nil { - // It's possible that container gets deleted during list. - if errdefs.IsNotFound(err) { - continue - } - return nil, fmt.Errorf("failed to get sandbox container %q info: %v", sandboxInStore.ID, err) - } - createdAt := info.CreatedAt - sandboxes = append(sandboxes, toCRISandbox(sandboxInStore.Metadata, state, createdAt)) + sandboxes = append(sandboxes, toCRISandbox( + sandboxInStore.Metadata, + sandboxInStore.Status.Get(), + )) } sandboxes = c.filterCRISandboxes(sandboxes, r.GetFilter()) @@ -73,12 +40,17 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li } // toCRISandbox converts sandbox metadata into CRI pod sandbox. -func toCRISandbox(meta sandboxstore.Metadata, state runtime.PodSandboxState, createdAt time.Time) *runtime.PodSandbox { +func toCRISandbox(meta sandboxstore.Metadata, status sandboxstore.Status) *runtime.PodSandbox { + // Set sandbox state to NOTREADY by default. + state := runtime.PodSandboxState_SANDBOX_NOTREADY + if status.State == sandboxstore.StateReady { + state = runtime.PodSandboxState_SANDBOX_READY + } return &runtime.PodSandbox{ Id: meta.ID, Metadata: meta.Config.GetMetadata(), State: state, - CreatedAt: createdAt.UnixNano(), + CreatedAt: status.CreatedAt.UnixNano(), Labels: meta.Config.GetLabels(), Annotations: meta.Config.GetAnnotations(), } diff --git a/pkg/server/sandbox_list_test.go b/pkg/server/sandbox_list_test.go index ff4efe56d..988ed289c 100644 --- a/pkg/server/sandbox_list_test.go +++ b/pkg/server/sandbox_list_test.go @@ -44,90 +44,106 @@ func TestToCRISandbox(t *testing.T) { Config: config, NetNSPath: "test-netns", } - state := runtime.PodSandboxState_SANDBOX_READY expect := &runtime.PodSandbox{ Id: "test-id", Metadata: config.GetMetadata(), - State: state, CreatedAt: createdAt.UnixNano(), Labels: config.GetLabels(), Annotations: config.GetAnnotations(), } - s := toCRISandbox(meta, state, createdAt) - assert.Equal(t, expect, s) + for desc, test := range map[string]struct { + state sandboxstore.State + expectedState runtime.PodSandboxState + }{ + "sandbox state ready": { + state: sandboxstore.StateReady, + expectedState: runtime.PodSandboxState_SANDBOX_READY, + }, + "sandbox state not ready": { + state: sandboxstore.StateNotReady, + expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY, + }, + } { + status := sandboxstore.Status{ + CreatedAt: createdAt, + State: test.state, + } + expect.State = test.expectedState + s := toCRISandbox(meta, status) + assert.Equal(t, expect, s, desc) + } } func TestFilterSandboxes(t *testing.T) { c := newTestCRIContainerdService() - sandboxes := []struct { - sandbox sandboxstore.Sandbox - state runtime.PodSandboxState - }{ - { - sandbox: sandboxstore.Sandbox{ - Metadata: sandboxstore.Metadata{ - ID: "1abcdef", - Name: "sandboxname-1", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "podname-1", - Uid: "uid-1", - Namespace: "ns-1", - Attempt: 1, - }, + sandboxes := []sandboxstore.Sandbox{ + sandboxstore.NewSandbox( + sandboxstore.Metadata{ + ID: "1abcdef", + Name: "sandboxname-1", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "podname-1", + Uid: "uid-1", + Namespace: "ns-1", + Attempt: 1, }, }, }, - state: runtime.PodSandboxState_SANDBOX_READY, - }, - { - sandbox: sandboxstore.Sandbox{ - Metadata: sandboxstore.Metadata{ - ID: "2abcdef", - Name: "sandboxname-2", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "podname-2", - Uid: "uid-2", - Namespace: "ns-2", - Attempt: 2, - }, - Labels: map[string]string{"a": "b"}, + sandboxstore.Status{ + CreatedAt: time.Now(), + State: sandboxstore.StateReady, + }, + ), + sandboxstore.NewSandbox( + sandboxstore.Metadata{ + ID: "2abcdef", + Name: "sandboxname-2", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "podname-2", + Uid: "uid-2", + Namespace: "ns-2", + Attempt: 2, }, + Labels: map[string]string{"a": "b"}, }, }, - state: runtime.PodSandboxState_SANDBOX_NOTREADY, - }, - { - sandbox: sandboxstore.Sandbox{ - Metadata: sandboxstore.Metadata{ - ID: "3abcdef", - Name: "sandboxname-3", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "podname-2", - Uid: "uid-2", - Namespace: "ns-2", - Attempt: 2, - }, - Labels: map[string]string{"c": "d"}, + sandboxstore.Status{ + CreatedAt: time.Now(), + State: sandboxstore.StateNotReady, + }, + ), + sandboxstore.NewSandbox( + sandboxstore.Metadata{ + ID: "3abcdef", + Name: "sandboxname-3", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "podname-2", + Uid: "uid-2", + Namespace: "ns-2", + Attempt: 2, }, + Labels: map[string]string{"c": "d"}, }, }, - state: runtime.PodSandboxState_SANDBOX_READY, - }, + sandboxstore.Status{ + CreatedAt: time.Now(), + State: sandboxstore.StateReady, + }, + ), } // Create PodSandbox testSandboxes := []*runtime.PodSandbox{} - createdAt := time.Now() for _, sb := range sandboxes { - testSandboxes = append(testSandboxes, toCRISandbox(sb.sandbox.Metadata, sb.state, createdAt)) + testSandboxes = append(testSandboxes, toCRISandbox(sb.Metadata, sb.Status.Get())) } // Inject test sandbox metadata for _, sb := range sandboxes { - assert.NoError(t, c.sandboxStore.Add(sb.sandbox)) + assert.NoError(t, c.sandboxStore.Add(sb)) } for desc, test := range map[string]struct { diff --git a/pkg/server/sandbox_portforward.go b/pkg/server/sandbox_portforward.go index 696e36e3a..900ffa81f 100644 --- a/pkg/server/sandbox_portforward.go +++ b/pkg/server/sandbox_portforward.go @@ -24,10 +24,11 @@ import ( "os/exec" "strings" - "github.com/containerd/containerd" "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address. @@ -37,16 +38,7 @@ func (c *criContainerdService) PortForward(ctx context.Context, r *runtime.PortF if err != nil { return nil, fmt.Errorf("failed to find sandbox %q: %v", r.GetPodSandboxId(), err) } - - t, err := sandbox.Container.Task(ctx, nil) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox container task: %v", err) - } - status, err := t.Status(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox container status: %v", err) - } - if status.Status != containerd.Running { + if sandbox.Status.Get().State != sandboxstore.StateReady { return nil, errors.New("sandbox container is not running") } // TODO(random-liu): Verify that ports are exposed. diff --git a/pkg/server/sandbox_remove.go b/pkg/server/sandbox_remove.go index 1fe92acf7..6136cd989 100644 --- a/pkg/server/sandbox_remove.go +++ b/pkg/server/sandbox_remove.go @@ -27,6 +27,7 @@ import ( "github.com/containerd/cri-containerd/pkg/log" "github.com/containerd/cri-containerd/pkg/store" + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) // RemovePodSandbox removes the sandbox. If there are running containers in the @@ -46,12 +47,8 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. // Use the full sandbox id. id := sandbox.ID - // Return error if sandbox container is not fully stopped. - _, err = sandbox.Container.Task(ctx, nil) - if err != nil && !errdefs.IsNotFound(err) { - return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) - } - if err == nil { + // Return error if sandbox container is still running. + if sandbox.Status.Get().State == sandboxstore.StateReady { return nil, fmt.Errorf("sandbox container %q is not fully stopped", id) } diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index ff26f9f19..0a0c337e2 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -23,6 +23,7 @@ import ( "github.com/containerd/containerd" containerdio "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/linux/runctypes" "github.com/containerd/containerd/oci" "github.com/containerd/typeurl" @@ -68,13 +69,16 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run }() // Create initial internal sandbox object. - sandbox := sandboxstore.Sandbox{ - Metadata: sandboxstore.Metadata{ + sandbox := sandboxstore.NewSandbox( + sandboxstore.Metadata{ ID: id, Name: name, Config: config, }, - } + sandboxstore.Status{ + State: sandboxstore.StateUnknown, + }, + ) // Ensure sandbox container image snapshot. image, err := c.ensureImageExists(ctx, c.config.SandboxImage) @@ -224,33 +228,86 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run } }() - // Create sandbox task in containerd. - log.Tracef("Create sandbox container (id=%q, name=%q).", - id, name) - // We don't need stdio for sandbox container. - task, err := container.NewTask(ctx, containerdio.NullIO) + // Update sandbox created timestamp. + info, err := container.Info(ctx) if err != nil { - return nil, fmt.Errorf("failed to create containerd task: %v", err) + return nil, fmt.Errorf("failed to get sandbox container info: %v", err) } - defer func() { - if retErr != nil { - // Cleanup the sandbox container if an error is returned. - if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil { - logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id) - } - } - }() - - if err = task.Start(ctx); err != nil { - return nil, fmt.Errorf("failed to start sandbox container task %q: %v", - id, err) + if err := sandbox.Status.Update(func(status sandboxstore.Status) (sandboxstore.Status, error) { + status.CreatedAt = info.CreatedAt + return status, nil + }); err != nil { + return nil, fmt.Errorf("failed to update sandbox created timestamp: %v", err) } - // Add sandbox into sandbox store. + // Add sandbox into sandbox store in UNKNOWN state. sandbox.Container = container if err := c.sandboxStore.Add(sandbox); err != nil { return nil, fmt.Errorf("failed to add sandbox %+v into store: %v", sandbox, err) } + defer func() { + // Delete sandbox from sandbox store if there is an error. + if retErr != nil { + c.sandboxStore.Delete(id) + } + }() + // NOTE(random-liu): Sandbox state only stay in UNKNOWN state after this point + // and before the end of this function. + // * If `Update` succeeds, sandbox state will become READY in one transaction. + // * If `Update` fails, sandbox will be removed from the store in the defer above. + // * If cri-containerd stops at any point before `Update` finishes, because sandbox + // state is not checkpointed, it will be recovered from corresponding containerd task + // status during restart: + // * If the task is running, sandbox state will be READY, + // * Or else, sandbox state will be NOTREADY. + // + // In any case, sandbox will leave UNKNOWN state, so it's safe to ignore sandbox + // in UNKNOWN state in other functions. + + // Start sandbox container in one transaction to avoid race condition with + // event monitor. + if err := sandbox.Status.Update(func(status sandboxstore.Status) (_ sandboxstore.Status, retErr error) { + // NOTE(random-liu): We should not change the sandbox state to NOTREADY + // if `Update` fails. + // + // If `Update` fails, the sandbox will be cleaned up by all the defers + // above. We should not let user see this sandbox, or else they will + // see the sandbox disappear after the defer clean up, which may confuse + // them. + // + // Given so, we should keep the sandbox in UNKNOWN state if `Update` fails, + // and ignore sandbox in UNKNOWN state in all the inspection functions. + + // Create sandbox task in containerd. + log.Tracef("Create sandbox container (id=%q, name=%q).", + id, name) + // We don't need stdio for sandbox container. + task, err := container.NewTask(ctx, containerdio.NullIO) + if err != nil { + return status, fmt.Errorf("failed to create containerd task: %v", err) + } + defer func() { + if retErr != nil { + // Cleanup the sandbox container if an error is returned. + // It's possible that task is deleted by event monitor. + if _, err := task.Delete(ctx, containerd.WithProcessKill); err != nil && !errdefs.IsNotFound(err) { + logrus.WithError(err).Errorf("Failed to delete sandbox container %q", id) + } + } + }() + + if err := task.Start(ctx); err != nil { + return status, fmt.Errorf("failed to start sandbox container task %q: %v", + id, err) + } + + // Set the pod sandbox as ready after successfully start sandbox container. + status.Pid = task.Pid() + status.State = sandboxstore.StateReady + return status, nil + }); err != nil { + return nil, fmt.Errorf("failed to start sandbox container: %v", err) + } return &runtime.RunPodSandboxResponse{PodSandboxId: id}, nil } diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index 662af1cb9..d8b9307a2 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -19,12 +19,10 @@ package server import ( "encoding/json" "fmt" - "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" runtimespec "github.com/opencontainers/runtime-spec/specs-go" - "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -38,32 +36,14 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. return nil, fmt.Errorf("an error occurred when try to find sandbox: %v", err) } - task, err := sandbox.Container.Task(ctx, nil) - if err != nil && !errdefs.IsNotFound(err) { - return nil, fmt.Errorf("failed to get sandbox container task: %v", err) - } - - var pid uint32 - var processStatus containerd.ProcessStatus - // If the sandbox container is running, treat it as READY. - if task != nil { - taskStatus, err := task.Status(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get task status: %v", err) - } - - pid = task.Pid() - processStatus = taskStatus.Status - } - ip := c.getIP(sandbox) - ctrInfo, err := sandbox.Container.Info(ctx) - if err != nil { - return nil, fmt.Errorf("failed to get sandbox container info: %v", err) + status := toCRISandboxStatus(sandbox.Metadata, sandbox.Status.Get(), ip) + if !r.GetVerbose() { + return &runtime.PodSandboxStatusResponse{Status: status}, nil } - createdAt := ctrInfo.CreatedAt - status := toCRISandboxStatus(sandbox.Metadata, processStatus, createdAt, ip) - info, err := toCRISandboxInfo(ctx, sandbox, pid, processStatus, r.GetVerbose()) + + // Generate verbose information. + info, err := toCRISandboxInfo(ctx, sandbox) if err != nil { return nil, fmt.Errorf("failed to get verbose sandbox container info: %v", err) } @@ -92,10 +72,10 @@ func (c *criContainerdService) getIP(sandbox sandboxstore.Sandbox) string { } // toCRISandboxStatus converts sandbox metadata into CRI pod sandbox status. -func toCRISandboxStatus(meta sandboxstore.Metadata, status containerd.ProcessStatus, createdAt time.Time, ip string) *runtime.PodSandboxStatus { +func toCRISandboxStatus(meta sandboxstore.Metadata, status sandboxstore.Status, ip string) *runtime.PodSandboxStatus { // Set sandbox state to NOTREADY by default. state := runtime.PodSandboxState_SANDBOX_NOTREADY - if status == containerd.Running { + if status.State == sandboxstore.StateReady { state = runtime.PodSandboxState_SANDBOX_READY } nsOpts := meta.Config.GetLinux().GetSecurityContext().GetNamespaceOptions() @@ -103,7 +83,7 @@ func toCRISandboxStatus(meta sandboxstore.Metadata, status containerd.ProcessSta Id: meta.ID, Metadata: meta.Config.GetMetadata(), State: state, - CreatedAt: createdAt.UnixNano(), + CreatedAt: status.CreatedAt.UnixNano(), Network: &runtime.PodSandboxNetworkStatus{Ip: ip}, Linux: &runtime.LinuxPodSandboxStatus{ Namespaces: &runtime.Namespace{ @@ -132,14 +112,25 @@ type sandboxInfo struct { } // toCRISandboxInfo converts internal container object information to CRI sandbox status response info map. -func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox, - pid uint32, processStatus containerd.ProcessStatus, verbose bool) (map[string]string, error) { - if !verbose { - return nil, nil +func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox) (map[string]string, error) { + container := sandbox.Container + task, err := container.Task(ctx, nil) + if err != nil && !errdefs.IsNotFound(err) { + return nil, fmt.Errorf("failed to get sandbox container task: %v", err) + } + + var processStatus containerd.ProcessStatus + if task != nil { + taskStatus, err := task.Status(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get task status: %v", err) + } + + processStatus = taskStatus.Status } si := &sandboxInfo{ - Pid: pid, + Pid: sandbox.Status.Get().Pid, Status: string(processStatus), Config: sandbox.Config, } @@ -155,25 +146,22 @@ func toCRISandboxInfo(ctx context.Context, sandbox sandboxstore.Sandbox, si.NetNSClosed = (sandbox.NetNS == nil || sandbox.NetNS.Closed()) } - container := sandbox.Container spec, err := container.Spec(ctx) - if err == nil { - si.RuntimeSpec = spec - } else { - logrus.WithError(err).Errorf("Failed to get sandbox container %q runtime spec", sandbox.ID) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox container runtime spec: %v", err) } + si.RuntimeSpec = spec ctrInfo, err := container.Info(ctx) - if err == nil { - // Do not use config.SandboxImage because the configuration might - // be changed during restart. It may not reflect the actual image - // used by the sandbox container. - si.Image = ctrInfo.Image - si.SnapshotKey = ctrInfo.SnapshotKey - si.Snapshotter = ctrInfo.Snapshotter - } else { - logrus.WithError(err).Errorf("Failed to get sandbox container %q info", sandbox.ID) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox container info: %v", err) } + // Do not use config.SandboxImage because the configuration might + // be changed during restart. It may not reflect the actual image + // used by the sandbox container. + si.Image = ctrInfo.Image + si.SnapshotKey = ctrInfo.SnapshotKey + si.Snapshotter = ctrInfo.Snapshotter infoBytes, err := json.Marshal(si) if err != nil { diff --git a/pkg/server/sandbox_status_test.go b/pkg/server/sandbox_status_test.go index 945a22b05..c857d5dc4 100644 --- a/pkg/server/sandbox_status_test.go +++ b/pkg/server/sandbox_status_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "github.com/containerd/containerd" "github.com/stretchr/testify/assert" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -52,12 +51,10 @@ func TestPodSandboxStatus(t *testing.T) { Labels: map[string]string{"a": "b"}, Annotations: map[string]string{"c": "d"}, } - sandbox := &sandboxstore.Sandbox{ - Metadata: sandboxstore.Metadata{ - ID: id, - Name: "test-name", - Config: config, - }, + metadata := sandboxstore.Metadata{ + ID: id, + Name: "test-name", + Config: config, } expected := &runtime.PodSandboxStatus{ @@ -77,21 +74,26 @@ func TestPodSandboxStatus(t *testing.T) { Labels: config.GetLabels(), Annotations: config.GetAnnotations(), } - for _, status := range []containerd.ProcessStatus{ - "", - containerd.Running, - containerd.Created, - containerd.Stopped, - containerd.Paused, - containerd.Pausing, - containerd.Unknown, + for desc, test := range map[string]struct { + state sandboxstore.State + expectedState runtime.PodSandboxState + }{ + "sandbox state ready": { + state: sandboxstore.StateReady, + expectedState: runtime.PodSandboxState_SANDBOX_READY, + }, + "sandbox state not ready": { + state: sandboxstore.StateNotReady, + expectedState: runtime.PodSandboxState_SANDBOX_NOTREADY, + }, } { - state := runtime.PodSandboxState_SANDBOX_NOTREADY - if status == containerd.Running { - state = runtime.PodSandboxState_SANDBOX_READY + t.Logf("TestCase: %s", desc) + status := sandboxstore.Status{ + CreatedAt: createdAt, + State: test.state, } - expected.State = state - got := toCRISandboxStatus(sandbox.Metadata, status, createdAt, ip) + expected.State = test.expectedState + got := toCRISandboxStatus(metadata, status, ip) assert.Equal(t, expected, got) } } diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 39c35fbf9..7cdb8573a 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -19,6 +19,7 @@ package server import ( "fmt" "os" + "time" "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" @@ -26,8 +27,14 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" ) +// stopCheckPollInterval is the the interval to check whether a sandbox +// is stopped successfully. +const stopCheckPollInterval = 100 * time.Millisecond + // StopPodSandbox stops the sandbox. If there are any running containers in the // sandbox, they should be forcibly terminated. func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.StopPodSandboxRequest) (*runtime.StopPodSandboxResponse, error) { @@ -89,14 +96,18 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St return nil, fmt.Errorf("failed to unmount sandbox files in %q: %v", sandboxRoot, err) } - if err := c.stopSandboxContainer(ctx, sandbox.Container); err != nil { - return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err) + // Only stop sandbox container when it's running. + if sandbox.Status.Get().State == sandboxstore.StateReady { + if err := c.stopSandboxContainer(ctx, sandbox); err != nil { + return nil, fmt.Errorf("failed to stop sandbox container %q: %v", id, err) + } } return &runtime.StopPodSandboxResponse{}, nil } // stopSandboxContainer kills and deletes sandbox container. -func (c *criContainerdService) stopSandboxContainer(ctx context.Context, container containerd.Container) error { +func (c *criContainerdService) stopSandboxContainer(ctx context.Context, sandbox sandboxstore.Sandbox) error { + container := sandbox.Container task, err := container.Task(ctx, nil) if err != nil { if errdefs.IsNotFound(err) { @@ -111,5 +122,28 @@ func (c *criContainerdService) stopSandboxContainer(ctx context.Context, contain return fmt.Errorf("failed to delete sandbox container: %v", err) } - return nil + return c.waitSandboxStop(ctx, sandbox, killContainerTimeout) +} + +// waitSandboxStop polls sandbox state until timeout exceeds or sandbox is stopped. +func (c *criContainerdService) waitSandboxStop(ctx context.Context, sandbox sandboxstore.Sandbox, timeout time.Duration) error { + ticker := time.NewTicker(stopCheckPollInterval) + defer ticker.Stop() + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + for { + // Poll once before waiting for stopCheckPollInterval. + // TODO(random-liu): Use channel with event handler instead of polling. + if sandbox.Status.Get().State == sandboxstore.StateNotReady { + return nil + } + select { + case <-ctx.Done(): + return fmt.Errorf("wait sandbox container %q is cancelled", sandbox.ID) + case <-timeoutTimer.C: + return fmt.Errorf("wait sandbox container %q stop timeout", sandbox.ID) + case <-ticker.C: + continue + } + } } diff --git a/pkg/server/sandbox_stop_test.go b/pkg/server/sandbox_stop_test.go new file mode 100644 index 000000000..09ff82fb8 --- /dev/null +++ b/pkg/server/sandbox_stop_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + + sandboxstore "github.com/containerd/cri-containerd/pkg/store/sandbox" +) + +func TestWaitSandboxStop(t *testing.T) { + id := "test-id" + for desc, test := range map[string]struct { + state sandboxstore.State + cancel bool + timeout time.Duration + expectErr bool + }{ + "should return error if timeout exceeds": { + state: sandboxstore.StateReady, + timeout: 2 * stopCheckPollInterval, + expectErr: true, + }, + "should return error if context is cancelled": { + state: sandboxstore.StateReady, + timeout: time.Hour, + cancel: true, + expectErr: true, + }, + "should not return error if sandbox is stopped before timeout": { + state: sandboxstore.StateNotReady, + timeout: time.Hour, + expectErr: false, + }, + } { + c := newTestCRIContainerdService() + sandbox := sandboxstore.NewSandbox( + sandboxstore.Metadata{ID: id}, + sandboxstore.Status{State: test.state}, + ) + ctx := context.Background() + if test.cancel { + cancelledCtx, cancel := context.WithCancel(ctx) + cancel() + ctx = cancelledCtx + } + err := c.waitSandboxStop(ctx, sandbox, test.timeout) + assert.Equal(t, test.expectErr, err != nil, desc) + } +} diff --git a/pkg/store/container/status.go b/pkg/store/container/status.go index 0e6a21baa..d0573a721 100644 --- a/pkg/store/container/status.go +++ b/pkg/store/container/status.go @@ -119,8 +119,6 @@ type StatusStorage interface { Delete() error } -// TODO(random-liu): Add factory function and configure checkpoint path. - // StoreStatus creates the storage containing the passed in container status with the // specified id. // The status MUST be created in one transaction. diff --git a/pkg/store/sandbox/metadata.go b/pkg/store/sandbox/metadata.go index 910c30ce6..5e9847498 100644 --- a/pkg/store/sandbox/metadata.go +++ b/pkg/store/sandbox/metadata.go @@ -52,6 +52,8 @@ type Metadata struct { Config *runtime.PodSandboxConfig // NetNSPath is the network namespace used by the sandbox. NetNSPath string + // IP of Pod if it is attached to non host network + IP string } // MarshalJSON encodes Metadata into bytes in json format. diff --git a/pkg/store/sandbox/sandbox.go b/pkg/store/sandbox/sandbox.go index e2089f8fd..0b69c2b23 100644 --- a/pkg/store/sandbox/sandbox.go +++ b/pkg/store/sandbox/sandbox.go @@ -30,12 +30,21 @@ import ( type Sandbox struct { // Metadata is the metadata of the sandbox, it is immutable after created. Metadata + // Status stores the status of the sandbox. + Status StatusStorage // Container is the containerd sandbox container client Container containerd.Container // CNI network namespace client NetNS *NetNS - // IP of Pod if it is attached to non host network - IP string +} + +// NewSandbox creates an internally used sandbox type. This functions reminds +// the caller that a sandbox must have a status. +func NewSandbox(metadata Metadata, status Status) Sandbox { + return Sandbox{ + Metadata: metadata, + Status: StoreStatus(status), + } } // Store stores all sandboxes. @@ -67,9 +76,22 @@ func (s *Store) Add(sb Sandbox) error { return nil } -// Get returns the sandbox with specified id. Returns nil +// Get returns the sandbox with specified id. Returns store.ErrNotExist // if the sandbox doesn't exist. func (s *Store) Get(id string) (Sandbox, error) { + sb, err := s.GetAll(id) + if err != nil { + return sb, err + } + if sb.Status.Get().State == StateUnknown { + return Sandbox{}, store.ErrNotExist + } + return sb, nil +} + +// GetAll returns the sandbox with specified id, including sandbox in unknown +// state. Returns store.ErrNotExist if the sandbox doesn't exist. +func (s *Store) GetAll(id string) (Sandbox, error) { s.lock.RLock() defer s.lock.RUnlock() id, err := s.idIndex.Get(id) @@ -91,6 +113,9 @@ func (s *Store) List() []Sandbox { defer s.lock.RUnlock() var sandboxes []Sandbox for _, sb := range s.sandboxes { + if sb.Status.Get().State == StateUnknown { + continue + } sandboxes = append(sandboxes, sb) } return sandboxes diff --git a/pkg/store/sandbox/sandbox_test.go b/pkg/store/sandbox/sandbox_test.go index 288fc9ce2..12472878f 100644 --- a/pkg/store/sandbox/sandbox_test.go +++ b/pkg/store/sandbox/sandbox_test.go @@ -26,72 +26,96 @@ import ( ) func TestSandboxStore(t *testing.T) { - metadatas := map[string]Metadata{ - "1": { - ID: "1", - Name: "Sandbox-1", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "TestPod-1", - Uid: "TestUid-1", - Namespace: "TestNamespace-1", - Attempt: 1, + sandboxes := map[string]Sandbox{ + "1": NewSandbox( + Metadata{ + ID: "1", + Name: "Sandbox-1", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-1", + Uid: "TestUid-1", + Namespace: "TestNamespace-1", + Attempt: 1, + }, }, + NetNSPath: "TestNetNS-1", }, - NetNSPath: "TestNetNS-1", - }, - "2abcd": { - ID: "2abcd", - Name: "Sandbox-2abcd", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "TestPod-2abcd", - Uid: "TestUid-2abcd", - Namespace: "TestNamespace-2abcd", - Attempt: 2, + Status{State: StateReady}, + ), + "2abcd": NewSandbox( + Metadata{ + ID: "2abcd", + Name: "Sandbox-2abcd", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-2abcd", + Uid: "TestUid-2abcd", + Namespace: "TestNamespace-2abcd", + Attempt: 2, + }, }, + NetNSPath: "TestNetNS-2", }, - NetNSPath: "TestNetNS-2", - }, - "4a333": { - ID: "4a333", - Name: "Sandbox-4a333", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "TestPod-4a333", - Uid: "TestUid-4a333", - Namespace: "TestNamespace-4a333", - Attempt: 3, + Status{State: StateNotReady}, + ), + "4a333": NewSandbox( + Metadata{ + ID: "4a333", + Name: "Sandbox-4a333", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-4a333", + Uid: "TestUid-4a333", + Namespace: "TestNamespace-4a333", + Attempt: 3, + }, }, + NetNSPath: "TestNetNS-3", }, - NetNSPath: "TestNetNS-3", - }, - "4abcd": { - ID: "4abcd", - Name: "Sandbox-4abcd", - Config: &runtime.PodSandboxConfig{ - Metadata: &runtime.PodSandboxMetadata{ - Name: "TestPod-4abcd", - Uid: "TestUid-4abcd", - Namespace: "TestNamespace-4abcd", - Attempt: 1, + Status{State: StateNotReady}, + ), + "4abcd": NewSandbox( + Metadata{ + ID: "4abcd", + Name: "Sandbox-4abcd", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-4abcd", + Uid: "TestUid-4abcd", + Namespace: "TestNamespace-4abcd", + Attempt: 1, + }, }, + NetNSPath: "TestNetNS-4abcd", }, - NetNSPath: "TestNetNS-4abcd", - }, + Status{State: StateReady}, + ), } + unknown := NewSandbox( + Metadata{ + ID: "3defg", + Name: "Sandbox-3defg", + Config: &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "TestPod-3defg", + Uid: "TestUid-3defg", + Namespace: "TestNamespace-3defg", + Attempt: 1, + }, + }, + NetNSPath: "TestNetNS-3defg", + }, + Status{State: StateUnknown}, + ) assert := assertlib.New(t) - sandboxes := map[string]Sandbox{} - for id := range metadatas { - sandboxes[id] = Sandbox{Metadata: metadatas[id]} - } - s := NewStore() t.Logf("should be able to add sandbox") for _, sb := range sandboxes { assert.NoError(s.Add(sb)) } + assert.NoError(s.Add(unknown)) t.Logf("should be able to get sandbox") genTruncIndex := func(normalName string) string { return normalName[:(len(normalName)+1)/2] } @@ -101,6 +125,16 @@ func TestSandboxStore(t *testing.T) { assert.Equal(sb, got) } + t.Logf("should not be able to get unknown sandbox") + got, err := s.Get(unknown.ID) + assert.Equal(store.ErrNotExist, err) + assert.Equal(Sandbox{}, got) + + t.Logf("should be able to get unknown sandbox with GetAll") + got, err = s.GetAll(unknown.ID) + assert.NoError(err) + assert.Equal(unknown, got) + t.Logf("should be able to list sandboxes") sbs := s.List() assert.Len(sbs, len(sandboxes)) diff --git a/pkg/store/sandbox/status.go b/pkg/store/sandbox/status.go new file mode 100644 index 000000000..20df9749a --- /dev/null +++ b/pkg/store/sandbox/status.go @@ -0,0 +1,100 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sandbox + +import ( + "sync" + "time" +) + +// State is the sandbox state we use in cri-containerd. +// It has unknown state defined. +type State uint32 + +const ( + // StateUnknown is unknown state of sandbox. Sandbox + // is in unknown state before its corresponding sandbox container + // is created. Sandbox in unknown state should be ignored by most + // functions, unless the caller needs to update sandbox state. + StateUnknown State = iota + // StateReady is ready state, it means sandbox container + // is running. + StateReady + // StateNotReady is notready state, it ONLY means sandbox + // container is not running. + // StopPodSandbox should still be called for NOTREADY sandbox to + // cleanup resources other than sandbox container, e.g. network namespace. + // This is an assumption made in CRI. + StateNotReady +) + +// Status is the status of a sandbox. +type Status struct { + // Pid is the init process id of the sandbox container. + Pid uint32 + // CreatedAt is the created timestamp. + CreatedAt time.Time + // State is the state of the sandbox. + State State +} + +// UpdateFunc is function used to update the sandbox status. If there +// is an error, the update will be rolled back. +type UpdateFunc func(Status) (Status, error) + +// StatusStorage manages the sandbox status. +// The status storage for sandbox is different from container status storage, +// because we don't checkpoint sandbox status. If we need checkpoint in the +// future, we should combine this with container status storage. +type StatusStorage interface { + // Get a sandbox status. + Get() Status + // Update the sandbox status. Note that the update MUST be applied + // in one transaction. + Update(UpdateFunc) error +} + +// StoreStatus creates the storage containing the passed in sandbox status with the +// specified id. +// The status MUST be created in one transaction. +func StoreStatus(status Status) StatusStorage { + return &statusStorage{status: status} +} + +type statusStorage struct { + sync.RWMutex + status Status +} + +// Get a copy of sandbox status. +func (s *statusStorage) Get() Status { + s.RLock() + defer s.RUnlock() + return s.status +} + +// Update the sandbox status. +func (s *statusStorage) Update(u UpdateFunc) error { + s.Lock() + defer s.Unlock() + newStatus, err := u(s.status) + if err != nil { + return err + } + s.status = newStatus + return nil +} diff --git a/pkg/store/sandbox/status_test.go b/pkg/store/sandbox/status_test.go new file mode 100644 index 000000000..6973ac66c --- /dev/null +++ b/pkg/store/sandbox/status_test.go @@ -0,0 +1,61 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sandbox + +import ( + "errors" + "testing" + "time" + + assertlib "github.com/stretchr/testify/assert" +) + +func TestStatus(t *testing.T) { + testStatus := Status{ + Pid: 123, + CreatedAt: time.Now(), + State: StateUnknown, + } + updateStatus := Status{ + Pid: 456, + CreatedAt: time.Now(), + State: StateReady, + } + updateErr := errors.New("update error") + assert := assertlib.New(t) + + t.Logf("simple store and get") + s := StoreStatus(testStatus) + old := s.Get() + assert.Equal(testStatus, old) + + t.Logf("failed update should not take effect") + err := s.Update(func(o Status) (Status, error) { + o = updateStatus + return o, updateErr + }) + assert.Equal(updateErr, err) + assert.Equal(testStatus, s.Get()) + + t.Logf("successful update should take effect but not checkpoint") + err = s.Update(func(o Status) (Status, error) { + o = updateStatus + return o, nil + }) + assert.NoError(err) + assert.Equal(updateStatus, s.Get()) +}