diff --git a/integration/container_update_resources_test.go b/integration/container_update_resources_test.go index 23be1d034..2d301feff 100644 --- a/integration/container_update_resources_test.go +++ b/integration/container_update_resources_test.go @@ -60,7 +60,7 @@ func TestUpdateContainerResources(t *testing.T) { t.Logf("Check memory limit in container OCI spec") container, err := containerdClient.LoadContainer(context.Background(), cn) require.NoError(t, err) - spec, err := container.Spec() + spec, err := container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 2*1024*1024) @@ -71,9 +71,7 @@ func TestUpdateContainerResources(t *testing.T) { require.NoError(t, err) t.Logf("Check memory limit in container OCI spec") - container, err = containerdClient.LoadContainer(context.Background(), cn) - require.NoError(t, err) - spec, err = container.Spec() + spec, err = container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 4*1024*1024) @@ -96,9 +94,7 @@ func TestUpdateContainerResources(t *testing.T) { require.NoError(t, err) t.Logf("Check memory limit in container OCI spec") - container, err = containerdClient.LoadContainer(context.Background(), cn) - require.NoError(t, err) - spec, err = container.Spec() + spec, err = container.Spec(context.Background()) require.NoError(t, err) checkMemoryLimit(t, spec, 8*1024*1024) diff --git a/pkg/server/container_attach.go b/pkg/server/container_attach.go index bdf046da0..a7621480c 100644 --- a/pkg/server/container_attach.go +++ b/pkg/server/container_attach.go @@ -61,7 +61,7 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s return fmt.Errorf("container is in %s state", criContainerStateToString(state)) } - task, err := cntr.Container.Get().Task(ctx, nil) + task, err := cntr.Container.Task(ctx, nil) if err != nil { return fmt.Errorf("failed to load task: %v", err) } diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 7d75e5ae7..8136909e2 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -87,8 +87,8 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) } - container := cntr.Container.Get() - spec, err := container.Spec() + container := cntr.Container + spec, err := container.Spec(ctx) if err != nil { return nil, fmt.Errorf("failed to get container spec: %v", err) } diff --git a/pkg/server/container_remove.go b/pkg/server/container_remove.go index 80fe756e6..7caa54bda 100644 --- a/pkg/server/container_remove.go +++ b/pkg/server/container_remove.go @@ -75,7 +75,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R } // Delete containerd container. - if err := container.Container.Get().Delete(ctx, containerd.WithSnapshotCleanup); err != nil { + if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { if !errdefs.IsNotFound(err) { return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err) } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index 14d800d8d..e2fb5b871 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -59,7 +59,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, status *containerstore.Status) (retErr error) { id := cntr.ID meta := cntr.Metadata - container := cntr.Container.Get() + container := cntr.Container config := meta.Config // Return error if container is not in created state. diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 205c0550c..9c7260096 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -88,7 +88,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont } } glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal) - task, err := container.Container.Get().Task(ctx, nil) + task, err := container.Container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err) @@ -111,7 +111,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont glog.Errorf("Stop container %q timed out: %v", id, err) } - task, err := container.Container.Get().Task(ctx, nil) + task, err := container.Container.Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err) diff --git a/pkg/server/container_update_resources.go b/pkg/server/container_update_resources.go index 00ab1ee1f..97189d033 100644 --- a/pkg/server/container_update_resources.go +++ b/pkg/server/container_update_resources.go @@ -17,9 +17,11 @@ limitations under the License. package server import ( + gocontext "context" "fmt" "github.com/containerd/containerd" + "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/typeurl" "github.com/golang/glog" @@ -63,7 +65,7 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context, // spec makes sure that the resource limits are correct when start; // if the container is already started, updating spec is still required, // the spec will become our source of truth for resource limits. - oldSpec, err := cntr.Container.Get().Spec() + oldSpec, err := cntr.Container.Spec(ctx) if err != nil { return fmt.Errorf("failed to get container spec: %v", err) } @@ -72,42 +74,15 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context, return fmt.Errorf("failed to update resource in spec: %v", err) } - info := cntr.Container.Get().Info() - any, err := typeurl.MarshalAny(newSpec) - if err != nil { - return fmt.Errorf("failed to marshal spec %+v: %v", newSpec, err) - } - info.Spec = any - // TODO(random-liu): Add helper function in containerd to do the update. - if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil { - return fmt.Errorf("failed to update container spec: %v", err) + if err := updateContainerSpec(ctx, cntr.Container, newSpec); err != nil { + return err } defer func() { if retErr != nil { // Reset spec on error. - any, err := typeurl.MarshalAny(oldSpec) - if err != nil { - glog.Errorf("Failed to marshal spec %+v for container %q: %v", oldSpec, id, err) - return + if err := updateContainerSpec(ctx, cntr.Container, oldSpec); err != nil { + glog.Errorf("Failed to update spec %+v for container %q: %v", oldSpec, id, err) } - info.Spec = any - if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil { - glog.Errorf("Failed to recover spec %+v for container %q: %v", oldSpec, id, err) - } - } - }() - container, err := c.client.LoadContainer(ctx, id) - if err != nil { - return fmt.Errorf("failed to load container: %v", err) - } - defer func() { - if retErr == nil { - // Update container client if no error is returned. - // NOTE(random-liu): By updating container client, we'll be able - // to get latest OCI spec from it, which includes the up-to-date - // container resource limits. This will be useful after the debug - // api is introduced. - cntr.Container.Set(container) } }() @@ -117,7 +92,7 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context, return nil } - task, err := container.Task(ctx, nil) + task, err := cntr.Container.Task(ctx, nil) if err != nil { if errdefs.IsNotFound(err) { // Task exited already. @@ -136,6 +111,21 @@ func (c *criContainerdService) updateContainerResources(ctx context.Context, return nil } +// updateContainerSpec updates container spec. +func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error { + any, err := typeurl.MarshalAny(spec) + if err != nil { + return fmt.Errorf("failed to marshal spec %+v: %v", spec, err) + } + if err := cntr.Update(ctx, func(ctx gocontext.Context, client *containerd.Client, c *containers.Container) error { + c.Spec = any + return nil + }); err != nil { + return fmt.Errorf("failed to update container spec: %v", err) + } + return nil +} + // updateOCILinuxResource updates container resource limit. func updateOCILinuxResource(spec *runtimespec.Spec, new *runtime.LinuxContainerResources) (*runtimespec.Spec, error) { // Copy to make sure old spec is not changed. diff --git a/pkg/server/events.go b/pkg/server/events.go index c5b64531c..f38c0e2f9 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -104,7 +104,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { return } // Attach container IO so that `Delete` could cleanup the stream properly. - task, err := cntr.Container.Get().Task(context.Background(), + task, err := cntr.Container.Task(context.Background(), func(*containerd.FIFOSet) (containerd.IO, error) { return cntr.IO, nil }, diff --git a/pkg/server/restart.go b/pkg/server/restart.go index 92c8c04a1..22bd2e029 100644 --- a/pkg/server/restart.go +++ b/pkg/server/restart.go @@ -127,7 +127,11 @@ func loadContainer(ctx context.Context, cntr containerd.Container, containerDir id := cntr.ID() var container containerstore.Container // Load container metadata. - ext, ok := cntr.Extensions()[containerMetadataExtension] + exts, err := cntr.Extensions(ctx) + if err != nil { + return container, fmt.Errorf("failed to get container extensions: %v", err) + } + ext, ok := exts[containerMetadataExtension] if !ok { return container, fmt.Errorf("metadata extension %q not found", containerMetadataExtension) } @@ -278,7 +282,11 @@ func unknownContainerStatus() containerstore.Status { func loadSandbox(ctx context.Context, cntr containerd.Container) (sandboxstore.Sandbox, error) { var sandbox sandboxstore.Sandbox // Load sandbox metadata. - ext, ok := cntr.Extensions()[sandboxMetadataExtension] + exts, err := cntr.Extensions(ctx) + if err != nil { + return sandbox, fmt.Errorf("failed to get sandbox container extensions: %v", err) + } + ext, ok := exts[sandboxMetadataExtension] if !ok { return sandbox, fmt.Errorf("metadata extension %q not found", sandboxMetadataExtension) } diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go index ba2e04170..aca59e2de 100644 --- a/pkg/server/sandbox_list.go +++ b/pkg/server/sandbox_list.go @@ -22,6 +22,7 @@ import ( tasks "github.com/containerd/containerd/api/services/tasks/v1" "github.com/containerd/containerd/api/types/task" + "github.com/containerd/containerd/errdefs" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" @@ -55,7 +56,15 @@ func (c *criContainerdService) ListPodSandbox(ctx context.Context, r *runtime.Li state = runtime.PodSandboxState_SANDBOX_READY } - createdAt := sandboxInStore.Container.Info().CreatedAt + info, err := sandboxInStore.Container.Info(ctx) + if err != nil { + // It's possible that container gets deleted during list. + if errdefs.IsNotFound(err) { + continue + } + return nil, fmt.Errorf("failed to get sandbox container %q info: %v", sandboxInStore.ID, err) + } + createdAt := info.CreatedAt sandboxes = append(sandboxes, toCRISandbox(sandboxInStore.Metadata, state, createdAt)) } diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index ae1e2c275..6c7863206 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -34,15 +34,12 @@ import ( func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime.PodSandboxStatusRequest) (*runtime.PodSandboxStatusResponse, error) { sandbox, err := c.sandboxStore.Get(r.GetPodSandboxId()) if err != nil { - return nil, fmt.Errorf("an error occurred when try to find sandbox %q: %v", - r.GetPodSandboxId(), err) + return nil, fmt.Errorf("an error occurred when try to find sandbox: %v", err) } - // Use the full sandbox id. - id := sandbox.ID task, err := sandbox.Container.Task(ctx, nil) if err != nil && !errdefs.IsNotFound(err) { - return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) + return nil, fmt.Errorf("failed to get sandbox container task: %v", err) } // Set sandbox state to NOTREADY by default. @@ -51,7 +48,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. if task != nil { taskStatus, err := task.Status(ctx) if err != nil { - return nil, fmt.Errorf("failed to get task status for sandbox container %q: %v", id, err) + return nil, fmt.Errorf("failed to get task status: %v", err) } if taskStatus.Status == containerd.Running { @@ -64,7 +61,11 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. return nil, fmt.Errorf("failed to get sandbox ip: %v", err) } - createdAt := sandbox.Container.Info().CreatedAt + info, err := sandbox.Container.Info(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get sandbox container info: %v", err) + } + createdAt := info.CreatedAt status := toCRISandboxStatus(sandbox.Metadata, state, createdAt, ip) return &runtime.PodSandboxStatusResponse{Status: status}, nil } diff --git a/pkg/store/container/client.go b/pkg/store/container/client.go deleted file mode 100644 index 92689eab9..000000000 --- a/pkg/store/container/client.go +++ /dev/null @@ -1,45 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package container - -import ( - "sync" - - "github.com/containerd/containerd" -) - -// Client holds the containerd container client. -// containerd.Container is a pointer underlying. New assignment won't affect -// the previous pointer, so simply lock around is enough. -type Client struct { - lock sync.RWMutex - container containerd.Container -} - -// Get containerd container client. -func (c *Client) Get() containerd.Container { - c.lock.RLock() - defer c.lock.RUnlock() - return c.container -} - -// Set containerd container client. -func (c *Client) Set(container containerd.Container) { - c.lock.Lock() - defer c.lock.Unlock() - c.container = container -} diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 3d031ac54..d2bbb6ca8 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -33,7 +33,7 @@ type Container struct { // Status stores the status of the container. Status StatusStorage // Container is the containerd container client. - Container *Client + Container containerd.Container // Container IO IO *cio.ContainerIO // TODO(random-liu): Add stop channel to get rid of stop poll waiting. @@ -45,7 +45,7 @@ type Opts func(*Container) error // WithContainer adds the containerd Container to the internal data store. func WithContainer(cntr containerd.Container) Opts { return func(c *Container) error { - c.Container = &Client{container: cntr} + c.Container = cntr return nil } } @@ -95,10 +95,6 @@ type Store struct { // TODO(random-liu): Add trunc index. } -// LoadStore loads containers from runtime. -// TODO(random-liu): Implement LoadStore. -func LoadStore() *Store { return nil } - // NewStore creates a container store. func NewStore() *Store { return &Store{containers: make(map[string]Container)} diff --git a/pkg/store/image/image.go b/pkg/store/image/image.go index f010651be..abd81ea50 100644 --- a/pkg/store/image/image.go +++ b/pkg/store/image/image.go @@ -51,10 +51,6 @@ type Store struct { // TODO(random-liu): Add trunc index. } -// LoadStore loads images from runtime. -// TODO(random-liu): Implement LoadStore. -func LoadStore() *Store { return nil } - // NewStore creates an image store. func NewStore() *Store { return &Store{images: make(map[string]Image)} diff --git a/pkg/store/sandbox/sandbox.go b/pkg/store/sandbox/sandbox.go index 363f9e664..74501bf58 100644 --- a/pkg/store/sandbox/sandbox.go +++ b/pkg/store/sandbox/sandbox.go @@ -42,10 +42,6 @@ type Store struct { // TODO(random-liu): Add trunc index. } -// LoadStore loads sandboxes from runtime. -// TODO(random-liu): Implement LoadStore. -func LoadStore() *Store { return nil } - // NewStore creates a sandbox store. func NewStore() *Store { return &Store{sandboxes: make(map[string]Sandbox)}