From a81a47bf9bffefe10e075c128010ef30d7ac62dc Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 3 Oct 2017 06:03:39 +0000 Subject: [PATCH] Fix update container resources Signed-off-by: Lantao Liu --- pkg/server/container_attach.go | 2 +- pkg/server/container_create.go | 2 + pkg/server/container_create_test.go | 4 + pkg/server/container_execsync.go | 2 +- pkg/server/container_remove.go | 2 +- pkg/server/container_start.go | 2 +- pkg/server/container_stop.go | 4 +- pkg/server/container_update_resources.go | 151 +++++++++++++++--- pkg/server/container_update_resources_test.go | 150 ++++++++++++++--- pkg/server/events.go | 2 +- pkg/store/container/client.go | 45 ++++++ pkg/store/container/container.go | 6 +- pkg/store/container/metadata.go | 2 + pkg/store/sandbox/sandbox.go | 2 +- 14 files changed, 324 insertions(+), 52 deletions(-) create mode 100644 pkg/store/container/client.go diff --git a/pkg/server/container_attach.go b/pkg/server/container_attach.go index a7621480c..bdf046da0 100644 --- a/pkg/server/container_attach.go +++ b/pkg/server/container_attach.go @@ -61,7 +61,7 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s return fmt.Errorf("container is in %s state", criContainerStateToString(state)) } - task, err := cntr.Container.Task(ctx, nil) + task, err := cntr.Container.Get().Task(ctx, nil) if err != nil { return fmt.Errorf("failed to load task: %v", err) } diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 5c7749bbe..89f3ef3fc 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -647,6 +647,8 @@ func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContaine g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares())) g.SetLinuxResourcesMemoryLimit(resources.GetMemoryLimitInBytes()) g.SetProcessOOMScoreAdj(int(resources.GetOomScoreAdj())) + g.SetLinuxResourcesCPUCpus(resources.GetCpusetCpus()) + 
g.SetLinuxResourcesCPUMems(resources.GetCpusetMems()) } // getOCICapabilitiesList returns a list of all available capabilities. diff --git a/pkg/server/container_create_test.go b/pkg/server/container_create_test.go index 483b0e1a1..b03235328 100644 --- a/pkg/server/container_create_test.go +++ b/pkg/server/container_create_test.go @@ -96,6 +96,8 @@ func getCreateContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandbox CpuShares: 300, MemoryLimitInBytes: 400, OomScoreAdj: 500, + CpusetCpus: "0-1", + CpusetMems: "2-3", }, SecurityContext: &runtime.LinuxContainerSecurityContext{ SupplementalGroups: []int64{1111, 2222}, @@ -138,6 +140,8 @@ func getCreateContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandbox assert.EqualValues(t, *spec.Linux.Resources.CPU.Period, 100) assert.EqualValues(t, *spec.Linux.Resources.CPU.Quota, 200) assert.EqualValues(t, *spec.Linux.Resources.CPU.Shares, 300) + assert.EqualValues(t, spec.Linux.Resources.CPU.Cpus, "0-1") + assert.EqualValues(t, spec.Linux.Resources.CPU.Mems, "2-3") assert.EqualValues(t, *spec.Linux.Resources.Memory.Limit, 400) assert.EqualValues(t, *spec.Process.OOMScoreAdj, 500) diff --git a/pkg/server/container_execsync.go b/pkg/server/container_execsync.go index 625787886..7d75e5ae7 100644 --- a/pkg/server/container_execsync.go +++ b/pkg/server/container_execsync.go @@ -87,7 +87,7 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state)) } - container := cntr.Container + container := cntr.Container.Get() spec, err := container.Spec() if err != nil { return nil, fmt.Errorf("failed to get container spec: %v", err) diff --git a/pkg/server/container_remove.go b/pkg/server/container_remove.go index 7caa54bda..80fe756e6 100644 --- a/pkg/server/container_remove.go +++ b/pkg/server/container_remove.go @@ -75,7 +75,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R } 
// Delete containerd container. - if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil { + if err := container.Container.Get().Delete(ctx, containerd.WithSnapshotCleanup); err != nil { if !errdefs.IsNotFound(err) { return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err) } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index e2fb5b871..14d800d8d 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -59,7 +59,7 @@ func (c *criContainerdService) startContainer(ctx context.Context, status *containerstore.Status) (retErr error) { id := cntr.ID meta := cntr.Metadata - container := cntr.Container + container := cntr.Container.Get() config := meta.Config // Return error if container is not in created state. diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 9c7260096..205c0550c 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -88,7 +88,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont } } glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal) - task, err := container.Container.Task(ctx, nil) + task, err := container.Container.Get().Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err) @@ -111,7 +111,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont glog.Errorf("Stop container %q timed out: %v", id, err) } - task, err := container.Container.Task(ctx, nil) + task, err := container.Container.Get().Task(ctx, nil) if err != nil { if !errdefs.IsNotFound(err) { return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err) diff --git a/pkg/server/container_update_resources.go b/pkg/server/container_update_resources.go index 1c62dd2d1..00ab1ee1f 100644 --- a/pkg/server/container_update_resources.go +++ 
b/pkg/server/container_update_resources.go @@ -20,41 +20,150 @@ import ( "fmt" "github.com/containerd/containerd" - "github.com/golang/protobuf/proto" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/typeurl" + "github.com/golang/glog" runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + + containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container" ) // UpdateContainerResources updates ContainerConfig of the container. func (c *criContainerdService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (retRes *runtime.UpdateContainerResourcesResponse, retErr error) { - cntr, err := c.containerStore.Get(r.GetContainerId()) + container, err := c.containerStore.Get(r.GetContainerId()) if err != nil { return nil, fmt.Errorf("failed to find container: %v", err) } - task, err := cntr.Container.Task(ctx, nil) - if err != nil { - return nil, fmt.Errorf("failed to find task: %v", err) - } - resources := toOCIResources(r.GetLinux()) - if err := task.Update(ctx, containerd.WithResources(resources)); err != nil { + // Update resources in status update transaction, so that: + // 1) There won't be race condition with container start. + // 2) There won't be concurrent resource update to the same container. + if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) { + return status, c.updateContainerResources(ctx, container, r.GetLinux(), status) + }); err != nil { return nil, fmt.Errorf("failed to update resources: %v", err) } return &runtime.UpdateContainerResourcesResponse{}, nil } -// toOCIResources converts CRI resource constraints to OCI. 
-func toOCIResources(r *runtime.LinuxContainerResources) *runtimespec.LinuxResources { - return &runtimespec.LinuxResources{ - CPU: &runtimespec.LinuxCPU{ - Shares: proto.Uint64(uint64(r.GetCpuShares())), - Quota: proto.Int64(r.GetCpuQuota()), - Period: proto.Uint64(uint64(r.GetCpuPeriod())), - Cpus: r.GetCpusetCpus(), - Mems: r.GetCpusetMems(), - }, - Memory: &runtimespec.LinuxMemory{ - Limit: proto.Int64(r.GetMemoryLimitInBytes()), - }, +func (c *criContainerdService) updateContainerResources(ctx context.Context, + cntr containerstore.Container, + resources *runtime.LinuxContainerResources, + status containerstore.Status) (retErr error) { + id := cntr.ID + // Do not update the container when there is a removal in progress. + if status.Removing { + return fmt.Errorf("container %q is in removing state", id) } + + // Update container spec. If the container is not started yet, updating + // spec makes sure that the resource limits are correct when start; + // if the container is already started, updating spec is still required, + // the spec will become our source of truth for resource limits. + oldSpec, err := cntr.Container.Get().Spec() + if err != nil { + return fmt.Errorf("failed to get container spec: %v", err) + } + newSpec, err := updateOCILinuxResource(oldSpec, resources) + if err != nil { + return fmt.Errorf("failed to update resource in spec: %v", err) + } + + info := cntr.Container.Get().Info() + any, err := typeurl.MarshalAny(newSpec) + if err != nil { + return fmt.Errorf("failed to marshal spec %+v: %v", newSpec, err) + } + info.Spec = any + // TODO(random-liu): Add helper function in containerd to do the update. + if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil { + return fmt.Errorf("failed to update container spec: %v", err) + } + defer func() { + if retErr != nil { + // Reset spec on error. 
+ any, err := typeurl.MarshalAny(oldSpec) + if err != nil { + glog.Errorf("Failed to marshal spec %+v for container %q: %v", oldSpec, id, err) + return + } + info.Spec = any + if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil { + glog.Errorf("Failed to recover spec %+v for container %q: %v", oldSpec, id, err) + } + } + }() + container, err := c.client.LoadContainer(ctx, id) + if err != nil { + return fmt.Errorf("failed to load container: %v", err) + } + defer func() { + if retErr == nil { + // Update container client if no error is returned. + // NOTE(random-liu): By updating container client, we'll be able + // to get latest OCI spec from it, which includes the up-to-date + // container resource limits. This will be useful after the debug + // api is introduced. + cntr.Container.Set(container) + } + }() + + // If container is not running, only update spec is enough, new resource + // limit will be applied when container start. + if status.State() != runtime.ContainerState_CONTAINER_RUNNING { + return nil + } + + task, err := container.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + // Task exited already. + return nil + } + return fmt.Errorf("failed to get task: %v", err) + } + // newSpec.Linux won't be nil + if err := task.Update(ctx, containerd.WithResources(newSpec.Linux.Resources)); err != nil { + if errdefs.IsNotFound(err) { + // Task exited already. + return nil + } + return fmt.Errorf("failed to update resources: %v", err) + } + return nil +} + +// updateOCILinuxResource updates container resource limit. +func updateOCILinuxResource(spec *runtimespec.Spec, new *runtime.LinuxContainerResources) (*runtimespec.Spec, error) { + // Copy to make sure old spec is not changed. 
+ cloned, err := conversion.NewCloner().DeepCopy(spec) + if err != nil { + return nil, fmt.Errorf("failed to deep copy: %v", err) + } + g := generate.NewFromSpec(cloned.(*runtimespec.Spec)) + + if new.GetCpuPeriod() != 0 { + g.SetLinuxResourcesCPUPeriod(uint64(new.GetCpuPeriod())) + } + if new.GetCpuQuota() != 0 { + g.SetLinuxResourcesCPUQuota(new.GetCpuQuota()) + } + if new.GetCpuShares() != 0 { + g.SetLinuxResourcesCPUShares(uint64(new.GetCpuShares())) + } + if new.GetMemoryLimitInBytes() != 0 { + g.SetLinuxResourcesMemoryLimit(new.GetMemoryLimitInBytes()) + } + // OOMScore is not updatable. + if new.GetCpusetCpus() != "" { + g.SetLinuxResourcesCPUCpus(new.GetCpusetCpus()) + } + if new.GetCpusetMems() != "" { + g.SetLinuxResourcesCPUMems(new.GetCpusetMems()) + } + + return g.Spec(), nil } diff --git a/pkg/server/container_update_resources_test.go b/pkg/server/container_update_resources_test.go index c8ccff0f9..ea04e2d77 100644 --- a/pkg/server/container_update_resources_test.go +++ b/pkg/server/container_update_resources_test.go @@ -25,27 +25,137 @@ import ( "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" ) -func TestToOCIResources(t *testing.T) { - resources := &runtime.LinuxContainerResources{ - CpuPeriod: 10000, - CpuQuota: 20000, - CpuShares: 300, - MemoryLimitInBytes: 4000000, - OomScoreAdj: -500, - CpusetCpus: "6,7", - CpusetMems: "8,9", - } - expected := &runtimespec.LinuxResources{ - CPU: &runtimespec.LinuxCPU{ - Period: proto.Uint64(10000), - Quota: proto.Int64(20000), - Shares: proto.Uint64(300), - Cpus: "6,7", - Mems: "8,9", +func TestUpdateOCILinuxResource(t *testing.T) { + oomscoreadj := new(int) + *oomscoreadj = -500 + for desc, test := range map[string]struct { + spec *runtimespec.Spec + resources *runtime.LinuxContainerResources + expected *runtimespec.Spec + expectErr bool + }{ + "should be able to update each resource": { + spec: &runtimespec.Spec{ + Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ 
+ Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)}, + CPU: &runtimespec.LinuxCPU{ + Shares: proto.Uint64(1111), + Quota: proto.Int64(2222), + Period: proto.Uint64(3333), + Cpus: "0-1", + Mems: "2-3", + }, + }, + }, + }, + resources: &runtime.LinuxContainerResources{ + CpuPeriod: 6666, + CpuQuota: 5555, + CpuShares: 4444, + MemoryLimitInBytes: 54321, + OomScoreAdj: 500, + CpusetCpus: "4-5", + CpusetMems: "6-7", + }, + expected: &runtimespec.Spec{ + Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)}, + CPU: &runtimespec.LinuxCPU{ + Shares: proto.Uint64(4444), + Quota: proto.Int64(5555), + Period: proto.Uint64(6666), + Cpus: "4-5", + Mems: "6-7", + }, + }, + }, + }, }, - Memory: &runtimespec.LinuxMemory{ - Limit: proto.Int64(4000000), + "should skip empty fields": { + spec: &runtimespec.Spec{ + Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)}, + CPU: &runtimespec.LinuxCPU{ + Shares: proto.Uint64(1111), + Quota: proto.Int64(2222), + Period: proto.Uint64(3333), + Cpus: "0-1", + Mems: "2-3", + }, + }, + }, + }, + resources: &runtime.LinuxContainerResources{ + CpuQuota: 5555, + CpuShares: 4444, + MemoryLimitInBytes: 54321, + OomScoreAdj: 500, + CpusetMems: "6-7", + }, + expected: &runtimespec.Spec{ + Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)}, + CPU: &runtimespec.LinuxCPU{ + Shares: proto.Uint64(4444), + Quota: proto.Int64(5555), + Period: proto.Uint64(3333), + Cpus: "0-1", + Mems: "6-7", + }, + }, + }, + }, }, + "should be able to fill empty fields": { + spec: &runtimespec.Spec{ + Process: 
&runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)}, + }, + }, + }, + resources: &runtime.LinuxContainerResources{ + CpuPeriod: 6666, + CpuQuota: 5555, + CpuShares: 4444, + MemoryLimitInBytes: 54321, + OomScoreAdj: 500, + CpusetCpus: "4-5", + CpusetMems: "6-7", + }, + expected: &runtimespec.Spec{ + Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj}, + Linux: &runtimespec.Linux{ + Resources: &runtimespec.LinuxResources{ + Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)}, + CPU: &runtimespec.LinuxCPU{ + Shares: proto.Uint64(4444), + Quota: proto.Int64(5555), + Period: proto.Uint64(6666), + Cpus: "4-5", + Mems: "6-7", + }, + }, + }, + }, + }, + } { + t.Logf("TestCase %q", desc) + got, err := updateOCILinuxResource(test.spec, test.resources) + if test.expectErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + assert.Equal(t, test.expected, got) } - assert.Equal(t, expected, toOCIResources(resources)) } diff --git a/pkg/server/events.go b/pkg/server/events.go index f38c0e2f9..c5b64531c 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -104,7 +104,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) { return } // Attach container IO so that `Delete` could cleanup the stream properly. - task, err := cntr.Container.Task(context.Background(), + task, err := cntr.Container.Get().Task(context.Background(), func(*containerd.FIFOSet) (containerd.IO, error) { return cntr.IO, nil }, diff --git a/pkg/store/container/client.go b/pkg/store/container/client.go new file mode 100644 index 000000000..92689eab9 --- /dev/null +++ b/pkg/store/container/client.go @@ -0,0 +1,45 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "sync" + + "github.com/containerd/containerd" +) + +// Client holds the containerd container client. +// containerd.Container is a pointer underneath, so assigning a new one does +// not affect the previous pointer; simply locking around access is enough. +type Client struct { + lock sync.RWMutex + container containerd.Container +} + +// Get returns the containerd container client. +func (c *Client) Get() containerd.Container { + c.lock.RLock() + defer c.lock.RUnlock() + return c.container +} + +// Set replaces the containerd container client. +func (c *Client) Set(container containerd.Container) { + c.lock.Lock() + defer c.lock.Unlock() + c.container = container +} diff --git a/pkg/store/container/container.go b/pkg/store/container/container.go index 247d9afb0..3d031ac54 100644 --- a/pkg/store/container/container.go +++ b/pkg/store/container/container.go @@ -32,8 +32,8 @@ type Container struct { Metadata // Status stores the status of the container. Status StatusStorage - // Containerd container - Container containerd.Container + // Container is the containerd container client. + Container *Client // Container IO IO *cio.ContainerIO // TODO(random-liu): Add stop channel to get rid of stop poll waiting. @@ -45,7 +45,7 @@ type Opts func(*Container) error // WithContainer adds the containerd Container to the internal data store.
func WithContainer(cntr containerd.Container) Opts { return func(c *Container) error { - c.Container = cntr + c.Container = &Client{container: cntr} return nil } } diff --git a/pkg/store/container/metadata.go b/pkg/store/container/metadata.go index 3c314090c..5f684123b 100644 --- a/pkg/store/container/metadata.go +++ b/pkg/store/container/metadata.go @@ -51,6 +51,8 @@ type Metadata struct { // SandboxID is the sandbox id the container belongs to. SandboxID string // Config is the CRI container config. + // NOTE(random-liu): Resource limits are updatable; the source + // of truth for resource limits is in containerd. Config *runtime.ContainerConfig // ImageRef is the reference of image used by the container. ImageRef string diff --git a/pkg/store/sandbox/sandbox.go b/pkg/store/sandbox/sandbox.go index 018aef9ae..363f9e664 100644 --- a/pkg/store/sandbox/sandbox.go +++ b/pkg/store/sandbox/sandbox.go @@ -29,7 +29,7 @@ import ( type Sandbox struct { // Metadata is the metadata of the sandbox, it is immutable after created. Metadata - // Containerd sandbox container + // Container is the containerd sandbox container client Container containerd.Container // CNI network namespace client NetNS *NetNS