Fix update container resources

Signed-off-by: Lantao Liu <lantaol@google.com>
Lantao Liu 2017-10-03 06:03:39 +00:00
parent 29d9a788e6
commit a81a47bf9b
14 changed files with 324 additions and 52 deletions


@@ -61,7 +61,7 @@ func (c *criContainerdService) attachContainer(ctx context.Context, id string, s
         return fmt.Errorf("container is in %s state", criContainerStateToString(state))
     }
-    task, err := cntr.Container.Task(ctx, nil)
+    task, err := cntr.Container.Get().Task(ctx, nil)
     if err != nil {
         return fmt.Errorf("failed to load task: %v", err)
     }


@@ -647,6 +647,8 @@ func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContaine
     g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares()))
     g.SetLinuxResourcesMemoryLimit(resources.GetMemoryLimitInBytes())
     g.SetProcessOOMScoreAdj(int(resources.GetOomScoreAdj()))
+    g.SetLinuxResourcesCPUCpus(resources.GetCpusetCpus())
+    g.SetLinuxResourcesCPUMems(resources.GetCpusetMems())
 }

 // getOCICapabilitiesList returns a list of all available capabilities.


@@ -96,6 +96,8 @@ func getCreateContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandbox
             CpuShares:          300,
             MemoryLimitInBytes: 400,
             OomScoreAdj:        500,
+            CpusetCpus:         "0-1",
+            CpusetMems:         "2-3",
         },
         SecurityContext: &runtime.LinuxContainerSecurityContext{
             SupplementalGroups: []int64{1111, 2222},
@@ -138,6 +140,8 @@ func getCreateContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandbox
     assert.EqualValues(t, *spec.Linux.Resources.CPU.Period, 100)
     assert.EqualValues(t, *spec.Linux.Resources.CPU.Quota, 200)
     assert.EqualValues(t, *spec.Linux.Resources.CPU.Shares, 300)
+    assert.EqualValues(t, spec.Linux.Resources.CPU.Cpus, "0-1")
+    assert.EqualValues(t, spec.Linux.Resources.CPU.Mems, "2-3")
     assert.EqualValues(t, *spec.Linux.Resources.Memory.Limit, 400)
     assert.EqualValues(t, *spec.Process.OOMScoreAdj, 500)


@@ -87,7 +87,7 @@ func (c *criContainerdService) execInContainer(ctx context.Context, id string, o
         return nil, fmt.Errorf("container is in %s state", criContainerStateToString(state))
     }
-    container := cntr.Container
+    container := cntr.Container.Get()
     spec, err := container.Spec()
     if err != nil {
         return nil, fmt.Errorf("failed to get container spec: %v", err)


@@ -75,7 +75,7 @@ func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.R
     }
     // Delete containerd container.
-    if err := container.Container.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
+    if err := container.Container.Get().Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
         if !errdefs.IsNotFound(err) {
             return nil, fmt.Errorf("failed to delete containerd container %q: %v", id, err)
         }


@@ -59,7 +59,7 @@ func (c *criContainerdService) startContainer(ctx context.Context,
     status *containerstore.Status) (retErr error) {
     id := cntr.ID
     meta := cntr.Metadata
-    container := cntr.Container
+    container := cntr.Container.Get()
     config := meta.Config

     // Return error if container is not in created state.


@@ -88,7 +88,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
         }
     }
     glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal)
-    task, err := container.Container.Task(ctx, nil)
+    task, err := container.Container.Get().Task(ctx, nil)
     if err != nil {
         if !errdefs.IsNotFound(err) {
             return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)
@@ -111,7 +111,7 @@ func (c *criContainerdService) stopContainer(ctx context.Context, container cont
         glog.Errorf("Stop container %q timed out: %v", id, err)
     }
-    task, err := container.Container.Task(ctx, nil)
+    task, err := container.Container.Get().Task(ctx, nil)
     if err != nil {
         if !errdefs.IsNotFound(err) {
             return fmt.Errorf("failed to stop container, task not found for container %q: %v", id, err)


@@ -20,41 +20,150 @@ import (
     "fmt"

     "github.com/containerd/containerd"
-    "github.com/golang/protobuf/proto"
+    "github.com/containerd/containerd/errdefs"
+    "github.com/containerd/typeurl"
+    "github.com/golang/glog"
     runtimespec "github.com/opencontainers/runtime-spec/specs-go"
+    "github.com/opencontainers/runtime-tools/generate"
     "golang.org/x/net/context"
+    "k8s.io/apimachinery/pkg/conversion"
     "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
+
+    containerstore "github.com/kubernetes-incubator/cri-containerd/pkg/store/container"
 )

 // UpdateContainerResources updates ContainerConfig of the container.
 func (c *criContainerdService) UpdateContainerResources(ctx context.Context, r *runtime.UpdateContainerResourcesRequest) (retRes *runtime.UpdateContainerResourcesResponse, retErr error) {
-    cntr, err := c.containerStore.Get(r.GetContainerId())
+    container, err := c.containerStore.Get(r.GetContainerId())
     if err != nil {
         return nil, fmt.Errorf("failed to find container: %v", err)
     }
-    task, err := cntr.Container.Task(ctx, nil)
-    if err != nil {
-        return nil, fmt.Errorf("failed to find task: %v", err)
-    }
-    resources := toOCIResources(r.GetLinux())
-    if err := task.Update(ctx, containerd.WithResources(resources)); err != nil {
+    // Update resources in status update transaction, so that:
+    // 1) There won't be race condition with container start.
+    // 2) There won't be concurrent resource update to the same container.
+    if err := container.Status.Update(func(status containerstore.Status) (containerstore.Status, error) {
+        return status, c.updateContainerResources(ctx, container, r.GetLinux(), status)
+    }); err != nil {
         return nil, fmt.Errorf("failed to update resources: %v", err)
     }
     return &runtime.UpdateContainerResourcesResponse{}, nil
 }

-// toOCIResources converts CRI resource constraints to OCI.
-func toOCIResources(r *runtime.LinuxContainerResources) *runtimespec.LinuxResources {
-    return &runtimespec.LinuxResources{
-        CPU: &runtimespec.LinuxCPU{
-            Shares: proto.Uint64(uint64(r.GetCpuShares())),
-            Quota:  proto.Int64(r.GetCpuQuota()),
-            Period: proto.Uint64(uint64(r.GetCpuPeriod())),
-            Cpus:   r.GetCpusetCpus(),
-            Mems:   r.GetCpusetMems(),
-        },
-        Memory: &runtimespec.LinuxMemory{
-            Limit: proto.Int64(r.GetMemoryLimitInBytes()),
-        },
-    }
-}
+func (c *criContainerdService) updateContainerResources(ctx context.Context,
+    cntr containerstore.Container,
+    resources *runtime.LinuxContainerResources,
+    status containerstore.Status) (retErr error) {
+    id := cntr.ID
+    // Do not update the container when there is a removal in progress.
+    if status.Removing {
+        return fmt.Errorf("container %q is in removing state", id)
+    }
+    // Update container spec. If the container is not started yet, updating
+    // spec makes sure that the resource limits are correct when start;
+    // if the container is already started, updating spec is still required,
+    // the spec will become our source of truth for resource limits.
+    oldSpec, err := cntr.Container.Get().Spec()
+    if err != nil {
+        return fmt.Errorf("failed to get container spec: %v", err)
+    }
+    newSpec, err := updateOCILinuxResource(oldSpec, resources)
+    if err != nil {
+        return fmt.Errorf("failed to update resource in spec: %v", err)
+    }
+    info := cntr.Container.Get().Info()
+    any, err := typeurl.MarshalAny(newSpec)
+    if err != nil {
+        return fmt.Errorf("failed to marshal spec %+v: %v", newSpec, err)
+    }
+    info.Spec = any
+    // TODO(random-liu): Add helper function in containerd to do the update.
+    if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil {
+        return fmt.Errorf("failed to update container spec: %v", err)
+    }
+    defer func() {
+        if retErr != nil {
+            // Reset spec on error.
+            any, err := typeurl.MarshalAny(oldSpec)
+            if err != nil {
+                glog.Errorf("Failed to marshal spec %+v for container %q: %v", oldSpec, id, err)
+                return
+            }
+            info.Spec = any
+            if _, err := c.client.ContainerService().Update(ctx, info, "spec"); err != nil {
+                glog.Errorf("Failed to recover spec %+v for container %q: %v", oldSpec, id, err)
+            }
+        }
+    }()
+    container, err := c.client.LoadContainer(ctx, id)
+    if err != nil {
+        return fmt.Errorf("failed to load container: %v", err)
+    }
+    defer func() {
+        if retErr == nil {
+            // Update container client if no error is returned.
+            // NOTE(random-liu): By updating container client, we'll be able
+            // to get latest OCI spec from it, which includes the up-to-date
+            // container resource limits. This will be useful after the debug
+            // api is introduced.
+            cntr.Container.Set(container)
+        }
+    }()
+    // If container is not running, only update spec is enough, new resource
+    // limit will be applied when container start.
+    if status.State() != runtime.ContainerState_CONTAINER_RUNNING {
+        return nil
+    }
+    task, err := container.Task(ctx, nil)
+    if err != nil {
+        if errdefs.IsNotFound(err) {
+            // Task exited already.
+            return nil
+        }
+        return fmt.Errorf("failed to get task: %v", err)
+    }
+    // newSpec.Linux won't be nil
+    if err := task.Update(ctx, containerd.WithResources(newSpec.Linux.Resources)); err != nil {
+        if errdefs.IsNotFound(err) {
+            // Task exited already.
+            return nil
+        }
+        return fmt.Errorf("failed to update resources: %v", err)
+    }
+    return nil
+}
+
+// updateOCILinuxResource updates container resource limit.
+func updateOCILinuxResource(spec *runtimespec.Spec, new *runtime.LinuxContainerResources) (*runtimespec.Spec, error) {
+    // Copy to make sure old spec is not changed.
+    cloned, err := conversion.NewCloner().DeepCopy(spec)
+    if err != nil {
+        return nil, fmt.Errorf("failed to deep copy: %v", err)
+    }
+    g := generate.NewFromSpec(cloned.(*runtimespec.Spec))
+    if new.GetCpuPeriod() != 0 {
+        g.SetLinuxResourcesCPUPeriod(uint64(new.GetCpuPeriod()))
+    }
+    if new.GetCpuQuota() != 0 {
+        g.SetLinuxResourcesCPUQuota(new.GetCpuQuota())
+    }
+    if new.GetCpuShares() != 0 {
+        g.SetLinuxResourcesCPUShares(uint64(new.GetCpuShares()))
+    }
+    if new.GetMemoryLimitInBytes() != 0 {
+        g.SetLinuxResourcesMemoryLimit(new.GetMemoryLimitInBytes())
+    }
+    // OOMScore is not updatable.
+    if new.GetCpusetCpus() != "" {
+        g.SetLinuxResourcesCPUCpus(new.GetCpusetCpus())
+    }
+    if new.GetCpusetMems() != "" {
+        g.SetLinuxResourcesCPUMems(new.GetCpusetMems())
+    }
+    return g.Spec(), nil
+}
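The merge rule in updateOCILinuxResource above is the key detail: a zero or empty field in the CRI request means "keep the existing limit", any other value overrides it, and the OOM score adjustment is deliberately left untouched. A standalone sketch of that rule, using placeholder types rather than the cri-containerd/OCI ones (the real code goes through the runtime-tools generator):

package main

import "fmt"

// limits stands in for the OCI LinuxResources fields touched by this commit.
type limits struct {
    CPUShares   uint64
    CPUQuota    int64
    CPUPeriod   uint64
    MemoryLimit int64
    CpusetCpus  string
    CpusetMems  string
}

// merge applies the same "only non-zero/non-empty fields override" rule as
// updateOCILinuxResource.
func merge(old, req limits) limits {
    out := old
    if req.CPUShares != 0 {
        out.CPUShares = req.CPUShares
    }
    if req.CPUQuota != 0 {
        out.CPUQuota = req.CPUQuota
    }
    if req.CPUPeriod != 0 {
        out.CPUPeriod = req.CPUPeriod
    }
    if req.MemoryLimit != 0 {
        out.MemoryLimit = req.MemoryLimit
    }
    if req.CpusetCpus != "" {
        out.CpusetCpus = req.CpusetCpus
    }
    if req.CpusetMems != "" {
        out.CpusetMems = req.CpusetMems
    }
    return out
}

func main() {
    old := limits{CPUShares: 1111, CPUQuota: 2222, CPUPeriod: 3333, MemoryLimit: 12345, CpusetCpus: "0-1", CpusetMems: "2-3"}
    req := limits{MemoryLimit: 54321, CpusetMems: "6-7"} // partial update from the kubelet
    fmt.Printf("%+v\n", merge(old, req))                 // untouched fields keep their old values
}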


@@ -25,27 +25,137 @@ import (
     "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
 )

-func TestToOCIResources(t *testing.T) {
-    resources := &runtime.LinuxContainerResources{
-        CpuPeriod:          10000,
-        CpuQuota:           20000,
-        CpuShares:          300,
-        MemoryLimitInBytes: 4000000,
-        OomScoreAdj:        -500,
-        CpusetCpus:         "6,7",
-        CpusetMems:         "8,9",
-    }
-    expected := &runtimespec.LinuxResources{
-        CPU: &runtimespec.LinuxCPU{
-            Period: proto.Uint64(10000),
-            Quota:  proto.Int64(20000),
-            Shares: proto.Uint64(300),
-            Cpus:   "6,7",
-            Mems:   "8,9",
-        },
-        Memory: &runtimespec.LinuxMemory{
-            Limit: proto.Int64(4000000),
-        },
-    }
-    assert.Equal(t, expected, toOCIResources(resources))
-}
+func TestUpdateOCILinuxResource(t *testing.T) {
+    oomscoreadj := new(int)
+    *oomscoreadj = -500
+    for desc, test := range map[string]struct {
+        spec      *runtimespec.Spec
+        resources *runtime.LinuxContainerResources
+        expected  *runtimespec.Spec
+        expectErr bool
+    }{
+        "should be able to update each resource": {
+            spec: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)},
+                        CPU: &runtimespec.LinuxCPU{
+                            Shares: proto.Uint64(1111),
+                            Quota:  proto.Int64(2222),
+                            Period: proto.Uint64(3333),
+                            Cpus:   "0-1",
+                            Mems:   "2-3",
+                        },
+                    },
+                },
+            },
+            resources: &runtime.LinuxContainerResources{
+                CpuPeriod:          6666,
+                CpuQuota:           5555,
+                CpuShares:          4444,
+                MemoryLimitInBytes: 54321,
+                OomScoreAdj:        500,
+                CpusetCpus:         "4-5",
+                CpusetMems:         "6-7",
+            },
+            expected: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)},
+                        CPU: &runtimespec.LinuxCPU{
+                            Shares: proto.Uint64(4444),
+                            Quota:  proto.Int64(5555),
+                            Period: proto.Uint64(6666),
+                            Cpus:   "4-5",
+                            Mems:   "6-7",
+                        },
+                    },
+                },
+            },
+        },
+        "should skip empty fields": {
+            spec: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)},
+                        CPU: &runtimespec.LinuxCPU{
+                            Shares: proto.Uint64(1111),
+                            Quota:  proto.Int64(2222),
+                            Period: proto.Uint64(3333),
+                            Cpus:   "0-1",
+                            Mems:   "2-3",
+                        },
+                    },
+                },
+            },
+            resources: &runtime.LinuxContainerResources{
+                CpuQuota:           5555,
+                CpuShares:          4444,
+                MemoryLimitInBytes: 54321,
+                OomScoreAdj:        500,
+                CpusetMems:         "6-7",
+            },
+            expected: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)},
+                        CPU: &runtimespec.LinuxCPU{
+                            Shares: proto.Uint64(4444),
+                            Quota:  proto.Int64(5555),
+                            Period: proto.Uint64(3333),
+                            Cpus:   "0-1",
+                            Mems:   "6-7",
+                        },
+                    },
+                },
+            },
+        },
+        "should be able to fill empty fields": {
+            spec: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(12345)},
+                    },
+                },
+            },
+            resources: &runtime.LinuxContainerResources{
+                CpuPeriod:          6666,
+                CpuQuota:           5555,
+                CpuShares:          4444,
+                MemoryLimitInBytes: 54321,
+                OomScoreAdj:        500,
+                CpusetCpus:         "4-5",
+                CpusetMems:         "6-7",
+            },
+            expected: &runtimespec.Spec{
+                Process: &runtimespec.Process{OOMScoreAdj: oomscoreadj},
+                Linux: &runtimespec.Linux{
+                    Resources: &runtimespec.LinuxResources{
+                        Memory: &runtimespec.LinuxMemory{Limit: proto.Int64(54321)},
+                        CPU: &runtimespec.LinuxCPU{
+                            Shares: proto.Uint64(4444),
+                            Quota:  proto.Int64(5555),
+                            Period: proto.Uint64(6666),
+                            Cpus:   "4-5",
+                            Mems:   "6-7",
+                        },
+                    },
+                },
+            },
+        },
+    } {
+        t.Logf("TestCase %q", desc)
+        got, err := updateOCILinuxResource(test.spec, test.resources)
+        if test.expectErr {
+            assert.Error(t, err)
+        } else {
+            assert.NoError(t, err)
+        }
+        assert.Equal(t, test.expected, got)
+    }
+}


@@ -104,7 +104,7 @@ func (em *eventMonitor) handleEvent(evt *events.Envelope) {
         return
     }
     // Attach container IO so that `Delete` could cleanup the stream properly.
-    task, err := cntr.Container.Task(context.Background(),
+    task, err := cntr.Container.Get().Task(context.Background(),
         func(*containerd.FIFOSet) (containerd.IO, error) {
             return cntr.IO, nil
         },


@@ -0,0 +1,45 @@
+/*
+Copyright 2017 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package container
+
+import (
+    "sync"
+
+    "github.com/containerd/containerd"
+)
+
+// Client holds the containerd container client.
+// containerd.Container is a pointer underlying. New assignment won't affect
+// the previous pointer, so simply lock around is enough.
+type Client struct {
+    lock      sync.RWMutex
+    container containerd.Container
+}
+
+// Get containerd container client.
+func (c *Client) Get() containerd.Container {
+    c.lock.RLock()
+    defer c.lock.RUnlock()
+    return c.container
+}
+
+// Set containerd container client.
+func (c *Client) Set(container containerd.Container) {
+    c.lock.Lock()
+    defer c.lock.Unlock()
+    c.container = container
+}
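The wrapper above exists so the CRI service can swap the underlying containerd.Container after UpdateContainerResources reloads it, while other goroutines keep reading through Get(). A rough sketch of that usage, assuming a hypothetical refreshContainerClient helper (LoadContainer, Spec, Get and Set are the calls the diff itself relies on):

package container

import (
    "fmt"

    "github.com/containerd/containerd"
    "golang.org/x/net/context"
)

// refreshContainerClient is illustrative only: reload the container after its
// spec was updated in containerd, then publish the new client through Set so
// later Get() callers observe the up-to-date OCI spec.
func refreshContainerClient(ctx context.Context, client *containerd.Client, c *Client, id string) error {
    loaded, err := client.LoadContainer(ctx, id)
    if err != nil {
        return fmt.Errorf("failed to load container: %v", err)
    }
    c.Set(loaded) // readers now see the refreshed client
    if _, err := c.Get().Spec(); err != nil { // spec reflects the updated resource limits
        return fmt.Errorf("failed to get container spec: %v", err)
    }
    return nil
}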


@@ -32,8 +32,8 @@ type Container struct {
     Metadata
     // Status stores the status of the container.
     Status StatusStorage
-    // Containerd container
-    Container containerd.Container
+    // Container is the containerd container client.
+    Container *Client
     // Container IO
     IO *cio.ContainerIO
     // TODO(random-liu): Add stop channel to get rid of stop poll waiting.
@@ -45,7 +45,7 @@ type Opts func(*Container) error
 // WithContainer adds the containerd Container to the internal data store.
 func WithContainer(cntr containerd.Container) Opts {
     return func(c *Container) error {
-        c.Container = cntr
+        c.Container = &Client{container: cntr}
         return nil
     }
 }


@@ -51,6 +51,8 @@ type Metadata struct {
     // SandboxID is the sandbox id the container belongs to.
     SandboxID string
     // Config is the CRI container config.
+    // NOTE(random-liu): Resource limits are updatable, the source
+    // of truth for resource limits are in containerd.
     Config *runtime.ContainerConfig
     // ImageRef is the reference of image used by the container.
     ImageRef string


@@ -29,7 +29,7 @@ import (
 type Sandbox struct {
     // Metadata is the metadata of the sandbox, it is immutable after created.
     Metadata
-    // Containerd sandbox container
+    // Container is the containerd sandbox container client
     Container containerd.Container
     // CNI network namespace client
     NetNS *NetNS