From 48118ae08644653cc5decc5bf85122e958c502cc Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 25 Apr 2017 21:29:33 -0700 Subject: [PATCH 1/4] Update godeps. Signed-off-by: Lantao Liu --- Godeps/Godeps.json | 31 +-- vendor/github.com/blang/semver/README.md | 77 ++++-- vendor/github.com/blang/semver/range.go | 224 ++++++++++++++++++ .../runtime-tools/generate/generate.go | 8 +- .../runtime-tools/validate/validate.go | 6 +- 5 files changed, 309 insertions(+), 37 deletions(-) create mode 100644 vendor/github.com/blang/semver/range.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 6d8029c13..2d12ef6d1 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -18,8 +18,8 @@ }, { "ImportPath": "github.com/blang/semver", - "Comment": "v3.0.1", - "Rev": "31b736133b98f26d5e078ec9eb591666edfd091f" + "Comment": "v3.1.0", + "Rev": "aea32c919a18e5ef4537bbd283ff29594b1b0165" }, { "ImportPath": "github.com/boltdb/bolt", @@ -153,12 +153,12 @@ }, { "ImportPath": "github.com/docker/distribution/digestset", - "Comment": "v2.6.0-rc.1-130-gb38e5838", + "Comment": "v2.6.0-rc.1-130-gb38e583", "Rev": "b38e5838b7b2f2ad48e06ec4b500011976080621" }, { "ImportPath": "github.com/docker/distribution/reference", - "Comment": "v2.6.0-rc.1-130-gb38e5838", + "Comment": "v2.6.0-rc.1-130-gb38e583", "Rev": "b38e5838b7b2f2ad48e06ec4b500011976080621" }, { @@ -178,27 +178,27 @@ }, { "ImportPath": "github.com/gogo/protobuf/gogoproto", - "Comment": "v0.3-150-gd2e1ade2", + "Comment": "v0.3-150-gd2e1ade", "Rev": "d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8" }, { "ImportPath": "github.com/gogo/protobuf/proto", - "Comment": "v0.3-150-gd2e1ade2", + "Comment": "v0.3-150-gd2e1ade", "Rev": "d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8" }, { "ImportPath": "github.com/gogo/protobuf/protoc-gen-gogo/descriptor", - "Comment": "v0.3-150-gd2e1ade2", + "Comment": "v0.3-150-gd2e1ade", "Rev": "d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8" }, { "ImportPath": "github.com/gogo/protobuf/sortkeys", - "Comment": "v0.3-150-gd2e1ade2", + "Comment": "v0.3-150-gd2e1ade", "Rev": "d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8" }, { "ImportPath": "github.com/gogo/protobuf/types", - "Comment": "v0.3-150-gd2e1ade2", + "Comment": "v0.3-150-gd2e1ade", "Rev": "d2e1ade2d719b78fe5b061b4c18a9f7111b5bdc8" }, { @@ -238,7 +238,7 @@ }, { "ImportPath": "github.com/opencontainers/runc/libcontainer/system", - "Comment": "v1.0.0-rc3-21-g50401b5b", + "Comment": "v1.0.0-rc3-21-g50401b5", "Rev": "50401b5b4c2e01e4f1372b73a021742deeaf4e2d" }, { @@ -248,15 +248,15 @@ }, { "ImportPath": "github.com/opencontainers/runtime-tools/generate", - "Rev": "8addcc695096a0fc61010af8766952546bba7cd0" + "Rev": "68c195c3f2fa04a9a298b839eb2d94f31141271a" }, { "ImportPath": "github.com/opencontainers/runtime-tools/generate/seccomp", - "Rev": "8addcc695096a0fc61010af8766952546bba7cd0" + "Rev": "68c195c3f2fa04a9a298b839eb2d94f31141271a" }, { "ImportPath": "github.com/opencontainers/runtime-tools/validate", - "Rev": "8addcc695096a0fc61010af8766952546bba7cd0" + "Rev": "68c195c3f2fa04a9a298b839eb2d94f31141271a" }, { "ImportPath": "github.com/pkg/errors", @@ -265,6 +265,7 @@ }, { "ImportPath": "github.com/pmezard/go-difflib/difflib", + "Comment": "v1.0.0", "Rev": "792786c7400a136282c1664665ae0a8db921c6c2" }, { @@ -399,12 +400,12 @@ }, { "ImportPath": "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime", - "Comment": "v1.7.0-alpha.1-493-g32e927f4d8", + "Comment": "v1.7.0-alpha.1-493-g32e927f", "Rev": "32e927f4d86cfe7d3a7ad3c231fc445fb01463f5" }, { "ImportPath": "k8s.io/kubernetes/pkg/util/interrupt", - "Comment": "v1.7.0-alpha.1-493-g32e927f4d8", + "Comment": "v1.7.0-alpha.1-493-g32e927f", "Rev": "32e927f4d86cfe7d3a7ad3c231fc445fb01463f5" } ] diff --git a/vendor/github.com/blang/semver/README.md b/vendor/github.com/blang/semver/README.md index 5171c5c55..4399639e2 100644 --- a/vendor/github.com/blang/semver/README.md +++ b/vendor/github.com/blang/semver/README.md @@ -40,10 +40,52 @@ Features - Comparator-like comparisons - Compare Helper Methods - InPlace manipulation +- Ranges `>=1.0.0 <2.0.0 || >=3.0.0 !3.0.1-beta.1` - Sortable (implements sort.Interface) - database/sql compatible (sql.Scanner/Valuer) - encoding/json compatible (json.Marshaler/Unmarshaler) +Ranges +------ + +A `Range` is a set of conditions which specify which versions satisfy the range. + +A condition is composed of an operator and a version. The supported operators are: + +- `<1.0.0` Less than `1.0.0` +- `<=1.0.0` Less than or equal to `1.0.0` +- `>1.0.0` Greater than `1.0.0` +- `>=1.0.0` Greater than or equal to `1.0.0` +- `1.0.0`, `=1.0.0`, `==1.0.0` Equal to `1.0.0` +- `!1.0.0`, `!=1.0.0` Not equal to `1.0.0`. Excludes version `1.0.0`. + +A `Range` can link multiple `Ranges` separated by space: + +Ranges can be linked by logical AND: + + - `>1.0.0 <2.0.0` would match between both ranges, so `1.1.1` and `1.8.7` but not `1.0.0` or `2.0.0` + - `>1.0.0 <3.0.0 !2.0.3-beta.2` would match every version between `1.0.0` and `3.0.0` except `2.0.3-beta.2` + +Ranges can also be linked by logical OR: + + - `<2.0.0 || >=3.0.0` would match `1.x.x` and `3.x.x` but not `2.x.x` + +AND has a higher precedence than OR. It's not possible to use brackets. + +Ranges can be combined by both AND and OR + + - `>1.0.0 <2.0.0 || >3.0.0 !4.2.1` would match `1.2.3`, `1.9.9`, `3.1.1`, but not `4.2.1`, `2.1.1` + +Range usage: + +``` +v, err := semver.Parse("1.2.3") +range, err := semver.ParseRange(">1.0.0 <2.0.0 || >=3.0.0") +if range(v) { + //valid +} + +``` Example ----- @@ -103,23 +145,30 @@ if err != nil { } ``` + Benchmarks ----- - BenchmarkParseSimple 5000000 328 ns/op 49 B/op 1 allocs/op - BenchmarkParseComplex 1000000 2105 ns/op 263 B/op 7 allocs/op - BenchmarkParseAverage 1000000 1301 ns/op 168 B/op 4 allocs/op - BenchmarkStringSimple 10000000 130 ns/op 5 B/op 1 allocs/op - BenchmarkStringLarger 5000000 280 ns/op 32 B/op 2 allocs/op - BenchmarkStringComplex 3000000 512 ns/op 80 B/op 3 allocs/op - BenchmarkStringAverage 5000000 387 ns/op 47 B/op 2 allocs/op - BenchmarkValidateSimple 500000000 7.92 ns/op 0 B/op 0 allocs/op - BenchmarkValidateComplex 2000000 923 ns/op 0 B/op 0 allocs/op - BenchmarkValidateAverage 5000000 452 ns/op 0 B/op 0 allocs/op - BenchmarkCompareSimple 100000000 11.2 ns/op 0 B/op 0 allocs/op - BenchmarkCompareComplex 50000000 40.9 ns/op 0 B/op 0 allocs/op - BenchmarkCompareAverage 50000000 43.8 ns/op 0 B/op 0 allocs/op - BenchmarkSort 5000000 436 ns/op 259 B/op 2 allocs/op + BenchmarkParseSimple-4 5000000 390 ns/op 48 B/op 1 allocs/op + BenchmarkParseComplex-4 1000000 1813 ns/op 256 B/op 7 allocs/op + BenchmarkParseAverage-4 1000000 1171 ns/op 163 B/op 4 allocs/op + BenchmarkStringSimple-4 20000000 119 ns/op 16 B/op 1 allocs/op + BenchmarkStringLarger-4 10000000 206 ns/op 32 B/op 2 allocs/op + BenchmarkStringComplex-4 5000000 324 ns/op 80 B/op 3 allocs/op + BenchmarkStringAverage-4 5000000 273 ns/op 53 B/op 2 allocs/op + BenchmarkValidateSimple-4 200000000 9.33 ns/op 0 B/op 0 allocs/op + BenchmarkValidateComplex-4 3000000 469 ns/op 0 B/op 0 allocs/op + BenchmarkValidateAverage-4 5000000 256 ns/op 0 B/op 0 allocs/op + BenchmarkCompareSimple-4 100000000 11.8 ns/op 0 B/op 0 allocs/op + BenchmarkCompareComplex-4 50000000 30.8 ns/op 0 B/op 0 allocs/op + BenchmarkCompareAverage-4 30000000 41.5 ns/op 0 B/op 0 allocs/op + BenchmarkSort-4 3000000 419 ns/op 256 B/op 2 allocs/op + BenchmarkRangeParseSimple-4 2000000 850 ns/op 192 B/op 5 allocs/op + BenchmarkRangeParseAverage-4 1000000 1677 ns/op 400 B/op 10 allocs/op + BenchmarkRangeParseComplex-4 300000 5214 ns/op 1440 B/op 30 allocs/op + BenchmarkRangeMatchSimple-4 50000000 25.6 ns/op 0 B/op 0 allocs/op + BenchmarkRangeMatchAverage-4 30000000 56.4 ns/op 0 B/op 0 allocs/op + BenchmarkRangeMatchComplex-4 10000000 153 ns/op 0 B/op 0 allocs/op See benchmark cases at [semver_test.go](semver_test.go) diff --git a/vendor/github.com/blang/semver/range.go b/vendor/github.com/blang/semver/range.go new file mode 100644 index 000000000..0a8eaa1c9 --- /dev/null +++ b/vendor/github.com/blang/semver/range.go @@ -0,0 +1,224 @@ +package semver + +import ( + "fmt" + "strings" + "unicode" +) + +type comparator func(Version, Version) bool + +var ( + compEQ comparator = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) == 0 + } + compNE = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) != 0 + } + compGT = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) == 1 + } + compGE = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) >= 0 + } + compLT = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) == -1 + } + compLE = func(v1 Version, v2 Version) bool { + return v1.Compare(v2) <= 0 + } +) + +type versionRange struct { + v Version + c comparator +} + +// rangeFunc creates a Range from the given versionRange. +func (vr *versionRange) rangeFunc() Range { + return Range(func(v Version) bool { + return vr.c(v, vr.v) + }) +} + +// Range represents a range of versions. +// A Range can be used to check if a Version satisfies it: +// +// range, err := semver.ParseRange(">1.0.0 <2.0.0") +// range(semver.MustParse("1.1.1") // returns true +type Range func(Version) bool + +// OR combines the existing Range with another Range using logical OR. +func (rf Range) OR(f Range) Range { + return Range(func(v Version) bool { + return rf(v) || f(v) + }) +} + +// AND combines the existing Range with another Range using logical AND. +func (rf Range) AND(f Range) Range { + return Range(func(v Version) bool { + return rf(v) && f(v) + }) +} + +// ParseRange parses a range and returns a Range. +// If the range could not be parsed an error is returned. +// +// Valid ranges are: +// - "<1.0.0" +// - "<=1.0.0" +// - ">1.0.0" +// - ">=1.0.0" +// - "1.0.0", "=1.0.0", "==1.0.0" +// - "!1.0.0", "!=1.0.0" +// +// A Range can consist of multiple ranges separated by space: +// Ranges can be linked by logical AND: +// - ">1.0.0 <2.0.0" would match between both ranges, so "1.1.1" and "1.8.7" but not "1.0.0" or "2.0.0" +// - ">1.0.0 <3.0.0 !2.0.3-beta.2" would match every version between 1.0.0 and 3.0.0 except 2.0.3-beta.2 +// +// Ranges can also be linked by logical OR: +// - "<2.0.0 || >=3.0.0" would match "1.x.x" and "3.x.x" but not "2.x.x" +// +// AND has a higher precedence than OR. It's not possible to use brackets. +// +// Ranges can be combined by both AND and OR +// +// - `>1.0.0 <2.0.0 || >3.0.0 !4.2.1` would match `1.2.3`, `1.9.9`, `3.1.1`, but not `4.2.1`, `2.1.1` +func ParseRange(s string) (Range, error) { + parts := splitAndTrim(s) + orParts, err := splitORParts(parts) + if err != nil { + return nil, err + } + var orFn Range + for _, p := range orParts { + var andFn Range + for _, ap := range p { + opStr, vStr, err := splitComparatorVersion(ap) + if err != nil { + return nil, err + } + vr, err := buildVersionRange(opStr, vStr) + if err != nil { + return nil, fmt.Errorf("Could not parse Range %q: %s", ap, err) + } + rf := vr.rangeFunc() + + // Set function + if andFn == nil { + andFn = rf + } else { // Combine with existing function + andFn = andFn.AND(rf) + } + } + if orFn == nil { + orFn = andFn + } else { + orFn = orFn.OR(andFn) + } + + } + return orFn, nil +} + +// splitORParts splits the already cleaned parts by '||'. +// Checks for invalid positions of the operator and returns an +// error if found. +func splitORParts(parts []string) ([][]string, error) { + var ORparts [][]string + last := 0 + for i, p := range parts { + if p == "||" { + if i == 0 { + return nil, fmt.Errorf("First element in range is '||'") + } + ORparts = append(ORparts, parts[last:i]) + last = i + 1 + } + } + if last == len(parts) { + return nil, fmt.Errorf("Last element in range is '||'") + } + ORparts = append(ORparts, parts[last:]) + return ORparts, nil +} + +// buildVersionRange takes a slice of 2: operator and version +// and builds a versionRange, otherwise an error. +func buildVersionRange(opStr, vStr string) (*versionRange, error) { + c := parseComparator(opStr) + if c == nil { + return nil, fmt.Errorf("Could not parse comparator %q in %q", opStr, strings.Join([]string{opStr, vStr}, "")) + } + v, err := Parse(vStr) + if err != nil { + return nil, fmt.Errorf("Could not parse version %q in %q: %s", vStr, strings.Join([]string{opStr, vStr}, ""), err) + } + + return &versionRange{ + v: v, + c: c, + }, nil + +} + +// splitAndTrim splits a range string by spaces and cleans leading and trailing spaces +func splitAndTrim(s string) (result []string) { + last := 0 + for i := 0; i < len(s); i++ { + if s[i] == ' ' { + if last < i-1 { + result = append(result, s[last:i]) + } + last = i + 1 + } + } + if last < len(s)-1 { + result = append(result, s[last:]) + } + // parts := strings.Split(s, " ") + // for _, x := range parts { + // if s := strings.TrimSpace(x); len(s) != 0 { + // result = append(result, s) + // } + // } + return +} + +// splitComparatorVersion splits the comparator from the version. +// Spaces between the comparator and the version are not allowed. +// Input must be free of leading or trailing spaces. +func splitComparatorVersion(s string) (string, string, error) { + i := strings.IndexFunc(s, unicode.IsDigit) + if i == -1 { + return "", "", fmt.Errorf("Could not get version from string: %q", s) + } + return strings.TrimSpace(s[0:i]), s[i:], nil +} + +func parseComparator(s string) comparator { + switch s { + case "==": + fallthrough + case "": + fallthrough + case "=": + return compEQ + case ">": + return compGT + case ">=": + return compGE + case "<": + return compLT + case "<=": + return compLE + case "!": + fallthrough + case "!=": + return compNE + } + + return nil +} diff --git a/vendor/github.com/opencontainers/runtime-tools/generate/generate.go b/vendor/github.com/opencontainers/runtime-tools/generate/generate.go index 737cd9e0c..5ca0e3159 100644 --- a/vendor/github.com/opencontainers/runtime-tools/generate/generate.go +++ b/vendor/github.com/opencontainers/runtime-tools/generate/generate.go @@ -912,35 +912,30 @@ func (g *Generator) DropProcessCapability(c string) error { for i, cap := range g.spec.Process.Capabilities.Bounding { if strings.ToUpper(cap) == cp { g.spec.Process.Capabilities.Bounding = append(g.spec.Process.Capabilities.Bounding[:i], g.spec.Process.Capabilities.Bounding[i+1:]...) - return nil } } for i, cap := range g.spec.Process.Capabilities.Effective { if strings.ToUpper(cap) == cp { g.spec.Process.Capabilities.Effective = append(g.spec.Process.Capabilities.Effective[:i], g.spec.Process.Capabilities.Effective[i+1:]...) - return nil } } for i, cap := range g.spec.Process.Capabilities.Inheritable { if strings.ToUpper(cap) == cp { g.spec.Process.Capabilities.Inheritable = append(g.spec.Process.Capabilities.Inheritable[:i], g.spec.Process.Capabilities.Inheritable[i+1:]...) - return nil } } for i, cap := range g.spec.Process.Capabilities.Permitted { if strings.ToUpper(cap) == cp { g.spec.Process.Capabilities.Permitted = append(g.spec.Process.Capabilities.Permitted[:i], g.spec.Process.Capabilities.Permitted[i+1:]...) - return nil } } for i, cap := range g.spec.Process.Capabilities.Ambient { if strings.ToUpper(cap) == cp { g.spec.Process.Capabilities.Ambient = append(g.spec.Process.Capabilities.Ambient[:i], g.spec.Process.Capabilities.Ambient[i+1:]...) - return nil } } @@ -1031,7 +1026,7 @@ func (g *Generator) AddDevice(device rspec.LinuxDevice) { g.spec.Linux.Devices = append(g.spec.Linux.Devices, device) } -//RemoveDevice remove a device from g.spec.Linux.Devices +// RemoveDevice remove a device from g.spec.Linux.Devices func (g *Generator) RemoveDevice(path string) error { if g.spec == nil || g.spec.Linux == nil || g.spec.Linux.Devices == nil { return nil @@ -1046,6 +1041,7 @@ func (g *Generator) RemoveDevice(path string) error { return nil } +// ClearLinuxDevices clears g.spec.Linux.Devices func (g *Generator) ClearLinuxDevices() { if g.spec == nil || g.spec.Linux == nil || g.spec.Linux.Devices == nil { return diff --git a/vendor/github.com/opencontainers/runtime-tools/validate/validate.go b/vendor/github.com/opencontainers/runtime-tools/validate/validate.go index 1d71e6efa..b7260f9b3 100644 --- a/vendor/github.com/opencontainers/runtime-tools/validate/validate.go +++ b/vendor/github.com/opencontainers/runtime-tools/validate/validate.go @@ -259,7 +259,7 @@ func (v *Validator) CheckProcess() (msgs []string) { } } - msgs = append(msgs, v.CheckCapablities()...) + msgs = append(msgs, v.CheckCapabilities()...) msgs = append(msgs, v.CheckRlimits()...) if v.spec.Platform.OS == "linux" { @@ -276,7 +276,8 @@ func (v *Validator) CheckProcess() (msgs []string) { return } -func (v *Validator) CheckCapablities() (msgs []string) { +// CheckCapabilities checks v.spec.Process.Capabilities +func (v *Validator) CheckCapabilities() (msgs []string) { process := v.spec.Process if v.spec.Platform.OS == "linux" { var caps []string @@ -309,6 +310,7 @@ func (v *Validator) CheckCapablities() (msgs []string) { return } +// CheckRlimits checks v.spec.Process.Rlimits func (v *Validator) CheckRlimits() (msgs []string) { process := v.spec.Process for index, rlimit := range process.Rlimits { From 11fff60aff4776730ed7b3039c1e815a0b62be67 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Wed, 26 Apr 2017 22:15:42 -0700 Subject: [PATCH 2/4] Add container metadata store. Signed-off-by: Lantao Liu --- pkg/metadata/container.go | 183 +++++++++++++++++++++++++++++++++ pkg/metadata/container_test.go | 180 ++++++++++++++++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 pkg/metadata/container.go create mode 100644 pkg/metadata/container_test.go diff --git a/pkg/metadata/container.go b/pkg/metadata/container.go new file mode 100644 index 000000000..3e1cf77e4 --- /dev/null +++ b/pkg/metadata/container.go @@ -0,0 +1,183 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadata + +import ( + "encoding/json" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +// The code is very similar with sandbox.go, but there is no template support +// in golang, we have to have similar files for different types. +// TODO(random-liu): Figure out a way to simplify this. +// TODO(random-liu): Handle versioning with the same mechanism with container.go + +// containerMetadataVersion is current version of container metadata. +const containerMetadataVersion = "v1" // nolint + +// versionedContainerMetadata is the internal versioned container metadata. +// nolint +type versionedContainerMetadata struct { + // Version indicates the version of the versioned container metadata. + Version string + ContainerMetadata +} + +// ContainerMetadata is the unversioned container metadata. +type ContainerMetadata struct { + // ID is the container id. + ID string + // Name is the container name. + Name string + // SandboxID is the sandbox id the container belongs to. + SandboxID string + // Config is the CRI container config. + Config *runtime.ContainerConfig + // ImageRef is the reference of image used by the container. + ImageRef string + // Pid is the init process id of the container. + Pid uint32 + // CreatedAt is the created timestamp. + CreatedAt int64 + // StartedAt is the started timestamp. + StartedAt int64 + // FinishedAt is the finished timestamp. + FinishedAt int64 + // ExitCode is the container exit code. + ExitCode int32 + // CamelCase string explaining why container is in its current state. + Reason string + // Human-readable message indicating details about why container is in its + // current state. + Message string + // Removing indicates that the container is in removing state. + // In fact, this field doesn't need to be checkpointed. + // TODO(random-liu): Skip this during serialization when we put object + // into the store directly. + // TODO(random-liu): Reset this field to false during state recoverry. + Removing bool +} + +// State returns current state of the container based on the metadata. +func (c *ContainerMetadata) State() runtime.ContainerState { + if c.FinishedAt != 0 { + return runtime.ContainerState_CONTAINER_EXITED + } + if c.StartedAt != 0 { + return runtime.ContainerState_CONTAINER_RUNNING + } + if c.CreatedAt != 0 { + return runtime.ContainerState_CONTAINER_CREATED + } + return runtime.ContainerState_CONTAINER_UNKNOWN +} + +// ContainerUpdateFunc is the function used to update ContainerMetadata. +type ContainerUpdateFunc func(ContainerMetadata) (ContainerMetadata, error) + +// ContainerToStoreUpdateFunc generates a metadata store UpdateFunc from ContainerUpdateFunc. +func ContainerToStoreUpdateFunc(u ContainerUpdateFunc) store.UpdateFunc { + return func(data []byte) ([]byte, error) { + meta := &ContainerMetadata{} + if err := json.Unmarshal(data, meta); err != nil { + return nil, err + } + newMeta, err := u(*meta) + if err != nil { + return nil, err + } + return json.Marshal(newMeta) + } +} + +// ContainerStore is the store for metadata of all containers. +type ContainerStore interface { + // Create creates a container from ContainerMetadata in the store. + Create(ContainerMetadata) error + // Get gets a specified container. + Get(string) (*ContainerMetadata, error) + // Update updates a specified container. + Update(string, ContainerUpdateFunc) error + // List lists all containers. + List() ([]*ContainerMetadata, error) + // Delete deletes the container from the store. + Delete(string) error +} + +// containerStore is an implmentation of ContainerStore. +type containerStore struct { + store store.MetadataStore +} + +// NewContainerStore creates a ContainerStore from a basic MetadataStore. +func NewContainerStore(store store.MetadataStore) ContainerStore { + return &containerStore{store: store} +} + +// Create creates a container from ContainerMetadata in the store. +func (c *containerStore) Create(metadata ContainerMetadata) error { + data, err := json.Marshal(&metadata) + if err != nil { + return err + } + return c.store.Create(metadata.ID, data) +} + +// Get gets a specified container. +func (c *containerStore) Get(containerID string) (*ContainerMetadata, error) { + data, err := c.store.Get(containerID) + if err != nil { + return nil, err + } + container := &ContainerMetadata{} + if err := json.Unmarshal(data, container); err != nil { + return nil, err + } + return container, nil +} + +// Update updates a specified container. The function is running in a +// transaction. Update will not be applied when the update function +// returns error. +func (c *containerStore) Update(containerID string, u ContainerUpdateFunc) error { + return c.store.Update(containerID, ContainerToStoreUpdateFunc(u)) +} + +// List lists all containers. +func (c *containerStore) List() ([]*ContainerMetadata, error) { + allData, err := c.store.List() + if err != nil { + return nil, err + } + var containers []*ContainerMetadata + for _, data := range allData { + container := &ContainerMetadata{} + if err := json.Unmarshal(data, container); err != nil { + return nil, err + } + containers = append(containers, container) + } + return containers, nil +} + +// Delete deletes the Container from the store. +func (c *containerStore) Delete(containerID string) error { + return c.store.Delete(containerID) +} diff --git a/pkg/metadata/container_test.go b/pkg/metadata/container_test.go new file mode 100644 index 000000000..f022f0ee9 --- /dev/null +++ b/pkg/metadata/container_test.go @@ -0,0 +1,180 @@ +/* +Copyright 2017 The Kubernetes Authorc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metadata + +import ( + "testing" + "time" + + assertlib "github.com/stretchr/testify/assert" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata/store" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" +) + +func TestContainerState(t *testing.T) { + for c, test := range map[string]struct { + metadata *ContainerMetadata + state runtime.ContainerState + }{ + "unknown state": { + metadata: &ContainerMetadata{ + ID: "1", + Name: "Container-1", + }, + state: runtime.ContainerState_CONTAINER_UNKNOWN, + }, + "created state": { + metadata: &ContainerMetadata{ + ID: "2", + Name: "Container-2", + CreatedAt: time.Now().UnixNano(), + }, + state: runtime.ContainerState_CONTAINER_CREATED, + }, + "running state": { + metadata: &ContainerMetadata{ + ID: "3", + Name: "Container-3", + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + }, + state: runtime.ContainerState_CONTAINER_RUNNING, + }, + "exited state": { + metadata: &ContainerMetadata{ + ID: "3", + Name: "Container-3", + CreatedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + }, + state: runtime.ContainerState_CONTAINER_EXITED, + }, + } { + t.Logf("TestCase %q", c) + assertlib.Equal(t, test.state, test.metadata.State()) + } +} + +func TestContainerStore(t *testing.T) { + containers := map[string]*ContainerMetadata{ + "1": { + ID: "1", + Name: "Container-1", + SandboxID: "Sandbox-1", + Config: &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "TestPod-1", + Attempt: 1, + }, + }, + ImageRef: "TestImage-1", + Pid: 1, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 1, + Reason: "TestReason-1", + Message: "TestMessage-1", + }, + "2": { + ID: "2", + Name: "Container-2", + SandboxID: "Sandbox-2", + Config: &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "TestPod-2", + Attempt: 2, + }, + }, + ImageRef: "TestImage-2", + Pid: 2, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 2, + Reason: "TestReason-2", + Message: "TestMessage-2", + }, + "3": { + ID: "3", + Name: "Container-3", + SandboxID: "Sandbox-3", + Config: &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "TestPod-3", + Attempt: 3, + }, + }, + ImageRef: "TestImage-3", + Pid: 3, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 3, + Reason: "TestReason-3", + Message: "TestMessage-3", + Removing: true, + }, + } + assert := assertlib.New(t) + + c := NewContainerStore(store.NewMetadataStore()) + + t.Logf("should be able to create container metadata") + for _, meta := range containers { + assert.NoError(c.Create(*meta)) + } + + t.Logf("should be able to get container metadata") + for id, expectMeta := range containers { + meta, err := c.Get(id) + assert.NoError(err) + assert.Equal(expectMeta, meta) + } + + t.Logf("should be able to list container metadata") + cntrs, err := c.List() + assert.NoError(err) + assert.Len(cntrs, 3) + + t.Logf("should be able to update container metadata") + testID := "2" + newCreatedAt := time.Now().UnixNano() + expectMeta := *containers[testID] + expectMeta.CreatedAt = newCreatedAt + err = c.Update(testID, func(o ContainerMetadata) (ContainerMetadata, error) { + o.CreatedAt = newCreatedAt + return o, nil + }) + assert.NoError(err) + newMeta, err := c.Get(testID) + assert.NoError(err) + assert.Equal(&expectMeta, newMeta) + + t.Logf("should be able to delete container metadata") + assert.NoError(c.Delete(testID)) + cntrs, err = c.List() + assert.NoError(err) + assert.Len(cntrs, 2) + + t.Logf("get should return nil without error after deletion") + meta, err := c.Get(testID) + assert.Error(store.ErrNotExist, err) + assert.True(meta == nil) +} From 6ac71e5862e9343cfee4f47ae54379d6f7585ddc Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 2 May 2017 14:50:12 -0700 Subject: [PATCH 3/4] Add initial container implementation. Signed-off-by: Lantao Liu --- pkg/server/container_create.go | 76 +++++++- pkg/server/container_list.go | 79 +++++++- pkg/server/container_remove.go | 88 ++++++++- pkg/server/container_start.go | 320 ++++++++++++++++++++++++++++++++- pkg/server/container_status.go | 52 +++++- pkg/server/container_stop.go | 107 ++++++++++- pkg/server/events.go | 94 ++++++++++ pkg/server/helpers.go | 92 ++++++++++ pkg/server/sandbox_list.go | 3 +- pkg/server/sandbox_remove.go | 3 +- pkg/server/sandbox_run.go | 15 +- pkg/server/sandbox_status.go | 2 +- pkg/server/sandbox_stop.go | 2 +- pkg/server/service.go | 30 +++- 14 files changed, 925 insertions(+), 38 deletions(-) create mode 100644 pkg/server/events.go diff --git a/pkg/server/container_create.go b/pkg/server/container_create.go index 63d0643e6..cda810595 100644 --- a/pkg/server/container_create.go +++ b/pkg/server/container_create.go @@ -17,14 +17,84 @@ limitations under the License. package server import ( - "errors" + "fmt" + "time" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // CreateContainer creates a new container in the given PodSandbox. -func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (*runtime.CreateContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) CreateContainer(ctx context.Context, r *runtime.CreateContainerRequest) (retRes *runtime.CreateContainerResponse, retErr error) { + glog.V(2).Infof("CreateContainer within sandbox %q with container config %+v and sandbox config %+v", + r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig()) + defer func() { + if retErr == nil { + glog.V(2).Infof("CreateContainer returns container id %q", retRes.GetContainerId()) + } + }() + + config := r.GetConfig() + sandboxConfig := r.GetSandboxConfig() + sandbox, err := c.getSandbox(r.GetPodSandboxId()) + if err != nil { + return nil, fmt.Errorf("failed to find sandbox id %q: %v", r.GetPodSandboxId(), err) + } + + // Generate unique id and name for the container and reserve the name. + // Reserve the container name to avoid concurrent `CreateContainer` request creating + // the same container. + id := generateID() + name := makeContainerName(config.GetMetadata(), sandboxConfig.GetMetadata()) + if err := c.containerNameIndex.Reserve(name, id); err != nil { + return nil, fmt.Errorf("failed to reserve container name %q: %v", name, err) + } + defer func() { + // Release the name if the function returns with an error. + if retErr != nil { + c.containerNameIndex.ReleaseByName(name) + } + }() + + // Create initial container metadata. + meta := metadata.ContainerMetadata{ + ID: id, + Name: name, + SandboxID: sandbox.ID, + Config: config, + } + + // TODO(random-liu): [P0] Prepare container rootfs. + + // TODO(random-liu): [P0] Set ImageRef in ContainerMetadata with image id. + + // Create container root directory. + containerRootDir := getContainerRootDir(c.rootDir, id) + if err := c.os.MkdirAll(containerRootDir, 0755); err != nil { + return nil, fmt.Errorf("failed to create container root directory %q: %v", + containerRootDir, err) + } + defer func() { + if retErr != nil { + // Cleanup the container root directory. + if err := c.os.RemoveAll(containerRootDir); err != nil { + glog.Errorf("Failed to remove container root directory %q: %v", + containerRootDir, err) + } + } + }() + + // Update container CreatedAt. + meta.CreatedAt = time.Now().UnixNano() + // Add container into container store. + if err := c.containerStore.Create(meta); err != nil { + return nil, fmt.Errorf("failed to add container metadata %+v into store: %v", + meta, err) + } + + return &runtime.CreateContainerResponse{ContainerId: id}, nil } diff --git a/pkg/server/container_list.go b/pkg/server/container_list.go index 254323e44..0d7cd8978 100644 --- a/pkg/server/container_list.go +++ b/pkg/server/container_list.go @@ -17,14 +17,87 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // ListContainers lists all containers matching the filter. -func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (*runtime.ListContainersResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ListContainers(ctx context.Context, r *runtime.ListContainersRequest) (retRes *runtime.ListContainersResponse, retErr error) { + glog.V(4).Infof("ListContainers with filter %+v", r.GetFilter()) + defer func() { + if retErr == nil { + glog.V(4).Infof("ListContainers returns containers %+v", retRes.GetContainers()) + } + }() + + // List all container metadata from store. + metas, err := c.containerStore.List() + if err != nil { + return nil, fmt.Errorf("failed to list metadata from container store: %v", err) + } + + var containers []*runtime.Container + for _, meta := range metas { + containers = append(containers, toCRIContainer(meta)) + } + + containers = c.filterCRIContainers(containers, r.GetFilter()) + return &runtime.ListContainersResponse{Containers: containers}, nil +} + +// toCRIContainer converts container metadata into CRI container. +func toCRIContainer(meta *metadata.ContainerMetadata) *runtime.Container { + return &runtime.Container{ + Id: meta.ID, + PodSandboxId: meta.SandboxID, + Metadata: meta.Config.GetMetadata(), + Image: meta.Config.GetImage(), + ImageRef: meta.ImageRef, + State: meta.State(), + CreatedAt: meta.CreatedAt, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + } +} + +// filterCRIContainers filters CRIContainers. +func (c *criContainerdService) filterCRIContainers(containers []*runtime.Container, filter *runtime.ContainerFilter) []*runtime.Container { + if filter == nil { + return containers + } + + filtered := []*runtime.Container{} + for _, cntr := range containers { + if filter.GetId() != "" && filter.GetId() != cntr.Id { + continue + } + if filter.GetPodSandboxId() != "" && filter.GetPodSandboxId() != cntr.PodSandboxId { + continue + } + if filter.GetState() != nil && filter.GetState().GetState() != cntr.State { + continue + } + if filter.GetLabelSelector() != nil { + match := true + for k, v := range filter.GetLabelSelector() { + got, ok := cntr.Labels[k] + if !ok || got != v { + match = false + break + } + } + if !match { + continue + } + } + filtered = append(filtered, cntr) + } + + return filtered } diff --git a/pkg/server/container_remove.go b/pkg/server/container_remove.go index d3e8bd1ce..706f0b9d1 100644 --- a/pkg/server/container_remove.go +++ b/pkg/server/container_remove.go @@ -17,14 +17,96 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // RemoveContainer removes the container. -func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (*runtime.RemoveContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) RemoveContainer(ctx context.Context, r *runtime.RemoveContainerRequest) (retRes *runtime.RemoveContainerResponse, retErr error) { + glog.V(2).Infof("RemoveContainer for %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(2).Infof("RemoveContainer %q returns successfully", r.GetContainerId()) + } + }() + + id := r.GetContainerId() + + // Set removing state to prevent other start/remove operations against this container + // while it's being removed. + if err := c.setContainerRemoving(id); err != nil { + if !metadata.IsNotExistError(err) { + return nil, fmt.Errorf("failed to set removing state for container %q: %v", + id, err) + } + // Do not return error if container metadata doesn't exist. + glog.V(5).Infof("RemoveContainer called for container %q that does not exist", id) + return &runtime.RemoveContainerResponse{}, nil + } + defer func() { + if retErr == nil { + // Cleanup all index after successfully remove the container. + c.containerNameIndex.ReleaseByKey(id) + return + } + // Reset removing if remove failed. + if err := c.resetContainerRemoving(id); err != nil { + // TODO(random-liu): Deal with update failure. Actually Removing doesn't need to + // be checkpointed, we only need it to have the same lifecycle with container metadata. + glog.Errorf("failed to reset removing state for container %q: %v", + id, err) + } + }() + + // NOTE(random-liu): Docker set container to "Dead" state when start removing the + // container so as to avoid start/restart the container again. However, for current + // kubelet implementation, we'll never start a container once we decide to remove it, + // so we don't need the "Dead" state for now. + + // TODO(random-liu): [P0] Cleanup container rootfs. + + // Cleanup container root directory. + containerRootDir := getContainerRootDir(c.rootDir, id) + if err := c.os.RemoveAll(containerRootDir); err != nil { + return nil, fmt.Errorf("failed to remove container root directory %q: %v", + containerRootDir, err) + } + + // Delete container metadata. + if err := c.containerStore.Delete(id); err != nil { + return nil, fmt.Errorf("failed to delete container metadata for %q: %v", id, err) + } + + return &runtime.RemoveContainerResponse{}, nil +} + +// setContainerRemoving sets the container into removing state. In removing state, the +// container will not be started or removed again. +func (c *criContainerdService) setContainerRemoving(id string) error { + return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // Do not remove container if it's still running. + if meta.State() == runtime.ContainerState_CONTAINER_RUNNING { + return meta, fmt.Errorf("container %q is still running", id) + } + if meta.Removing { + return meta, fmt.Errorf("container is already in removing state") + } + meta.Removing = true + return meta, nil + }) +} + +// resetContainerRemoving resets the container removing state on remove failure. So +// that we could remove the container again. +func (c *criContainerdService) resetContainerRemoving(id string) error { + return c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + meta.Removing = false + return meta, nil + }) } diff --git a/pkg/server/container_start.go b/pkg/server/container_start.go index dfd5f80a6..cc4a8697b 100644 --- a/pkg/server/container_start.go +++ b/pkg/server/container_start.go @@ -17,14 +17,328 @@ limitations under the License. package server import ( - "errors" + "encoding/json" + "fmt" + "io" + "os" + "time" + prototypes "github.com/gogo/protobuf/types" + "github.com/golang/glog" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/opencontainers/runtime-tools/generate" "golang.org/x/net/context" + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + "github.com/containerd/containerd/api/types/mount" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // StartContainer starts the container. -func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (*runtime.StartContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) StartContainer(ctx context.Context, r *runtime.StartContainerRequest) (retRes *runtime.StartContainerResponse, retErr error) { + glog.V(2).Infof("StartContainer for %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(2).Infof("StartContainer %q returns successfully", r.GetContainerId()) + } + }() + + container, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + id := container.ID + + var startErr error + // start container in one transaction to avoid race with event monitor. + if err := c.containerStore.Update(id, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // Always apply metadata change no matter startContainer fails or not. Because startContainer + // may change container state no matter it fails or succeeds. + startErr = c.startContainer(ctx, id, &meta) + return meta, nil + }); startErr != nil { + return nil, startErr + } else if err != nil { + return nil, fmt.Errorf("failed to update container %q metadata: %v", id, err) + } + return &runtime.StartContainerResponse{}, nil +} + +// startContainer actually starts the container. The function needs to be run in one transaction. Any updates +// to the metadata passed in will be applied to container store no matter the function returns error or not. +func (c *criContainerdService) startContainer(ctx context.Context, id string, meta *metadata.ContainerMetadata) (retErr error) { + config := meta.Config + // Return error if container is not in created state. + if meta.State() != runtime.ContainerState_CONTAINER_CREATED { + return fmt.Errorf("container %q is in %s state", id, criContainerStateToString(meta.State())) + } + + // Do not start the container when there is a removal in progress. + if meta.Removing { + return fmt.Errorf("container %q is in removing state", id) + } + + defer func() { + if retErr != nil { + // Set container to exited if fail to start. + meta.Pid = 0 + meta.FinishedAt = time.Now().UnixNano() + meta.ExitCode = errorStartExitCode + meta.Reason = errorStartReason + meta.Message = retErr.Error() + } + }() + + // Get sandbox config from sandbox store. + sandboxMeta, err := c.getSandbox(meta.SandboxID) + if err != nil { + return fmt.Errorf("sandbox %q not found: %v", meta.SandboxID, err) + } + sandboxConfig := sandboxMeta.Config + sandboxID := meta.SandboxID + // Make sure sandbox is running. + sandboxInfo, err := c.containerService.Info(ctx, &execution.InfoRequest{ID: sandboxID}) + if err != nil { + return fmt.Errorf("failed to get sandbox container %q info: %v", sandboxID, err) + } + // This is only a best effort check, sandbox may still exit after this. If sandbox fails + // before starting the container, the start will fail. + if sandboxInfo.Status != container.Status_RUNNING { + return fmt.Errorf("sandbox container %q is not running", sandboxID) + } + sandboxPid := sandboxInfo.Pid + glog.V(2).Infof("Sandbox container %q is running with pid %d", sandboxID, sandboxPid) + + // Generate containerd container create options. + // TODO(random-liu): [P0] Create container rootfs with image ref. + // TODO(random-liu): [P0] Apply default image config. + // Use fixed rootfs path for now. + const rootPath = "/" + + spec, err := c.generateContainerSpec(id, sandboxPid, config, sandboxConfig) + if err != nil { + return fmt.Errorf("failed to generate container %q spec: %v", id, err) + } + rawSpec, err := json.Marshal(spec) + if err != nil { + return fmt.Errorf("failed to marshal oci spec %+v: %v", spec, err) + } + glog.V(4).Infof("Container spec: %+v", spec) + + containerRootDir := getContainerRootDir(c.rootDir, id) + stdin, stdout, stderr := getStreamingPipes(containerRootDir) + // Set stdin to empty if Stdin == false. + if !config.GetStdin() { + stdin = "" + } + stdinPipe, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, stdin, stdout, stderr) + if err != nil { + return fmt.Errorf("failed to prepare streaming pipes: %v", err) + } + defer func() { + if retErr != nil { + if stdinPipe != nil { + stdinPipe.Close() + } + stdoutPipe.Close() + stderrPipe.Close() + } + }() + // Redirect the stream to std for now. + // TODO(random-liu): [P1] Support container logging. + // TODO(random-liu): [P1] Support StdinOnce after container logging is added. + if stdinPipe != nil { + go func(w io.WriteCloser) { + io.Copy(w, os.Stdin) // nolint: errcheck + w.Close() + }(stdinPipe) + } + go func(r io.ReadCloser) { + io.Copy(os.Stdout, r) // nolint: errcheck + r.Close() + }(stdoutPipe) + // Only redirect stderr when there is no tty. + if !config.GetTty() { + go func(r io.ReadCloser) { + io.Copy(os.Stderr, r) // nolint: errcheck + r.Close() + }(stderrPipe) + } + + // Create containerd container. + createOpts := &execution.CreateRequest{ + ID: id, + Spec: &prototypes.Any{ + TypeUrl: runtimespec.Version, + Value: rawSpec, + }, + // TODO(random-liu): [P0] Get rootfs mount from containerd. + Rootfs: []*mount.Mount{ + { + Type: "bind", + Source: rootPath, + Options: []string{ + "rw", + "rbind", + }, + }, + }, + Runtime: defaultRuntime, + Stdin: stdin, + Stdout: stdout, + Stderr: stderr, + Terminal: config.GetTty(), + } + glog.V(2).Infof("Create containerd container (id=%q, name=%q) with options %+v.", + id, meta.Name, createOpts) + createResp, err := c.containerService.Create(ctx, createOpts) + if err != nil { + return fmt.Errorf("failed to create containerd container: %v", err) + } + defer func() { + if retErr != nil { + // Cleanup the containerd container if an error is returned. + if _, err := c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}); err != nil { + glog.Errorf("Failed to delete containerd container %q: %v", id, err) + } + } + }() + + // Start containerd container. + if _, err := c.containerService.Start(ctx, &execution.StartRequest{ID: id}); err != nil { + return fmt.Errorf("failed to start containerd container %q: %v", id, err) + } + + // Update container start timestamp. + meta.Pid = createResp.Pid + meta.StartedAt = time.Now().UnixNano() + return nil +} + +func (c *criContainerdService) generateContainerSpec(id string, sandboxPid uint32, config *runtime.ContainerConfig, sandboxConfig *runtime.PodSandboxConfig) (*runtimespec.Spec, error) { + // Creates a spec Generator with the default spec. + // TODO(random-liu): [P2] Move container runtime spec generation into a helper function. + g := generate.New() + + // Set the relative path to the rootfs of the container from containerd's + // pre-defined directory. + g.SetRootPath(relativeRootfsPath) + + if len(config.GetCommand()) != 0 || len(config.GetArgs()) != 0 { + g.SetProcessArgs(append(config.GetCommand(), config.GetArgs()...)) + } + + if config.GetWorkingDir() != "" { + g.SetProcessCwd(config.GetWorkingDir()) + } + + for _, e := range config.GetEnvs() { + g.AddProcessEnv(e.GetKey(), e.GetValue()) + } + + addOCIBindMounts(&g, config.GetMounts()) + + // TODO(random-liu): [P1] Set device mapping. + // Ref https://github.com/moby/moby/blob/master/oci/devices_linux.go. + + // TODO(random-liu): [P1] Handle container logging, decorate and redirect to file. + + setOCILinuxResource(&g, config.GetLinux().GetResources()) + + if sandboxConfig.GetLinux().GetCgroupParent() != "" { + cgroupsPath := getCgroupsPath(sandboxConfig.GetLinux().GetCgroupParent(), id) + g.SetLinuxCgroupsPath(cgroupsPath) + } + + g.SetProcessTerminal(config.GetTty()) + + securityContext := config.GetLinux().GetSecurityContext() + + if err := setOCICapabilities(&g, securityContext.GetCapabilities()); err != nil { + return nil, fmt.Errorf("failed to set capabilities %+v: %v", + securityContext.GetCapabilities(), err) + } + + // TODO(random-liu): [P0] Handle privileged. + + // Set namespaces, share namespace with sandbox container. + setOCINamespaces(&g, securityContext.GetNamespaceOptions(), sandboxPid) + + // TODO(random-liu): [P1] Set selinux options. + + // TODO(random-liu): [P1] Set user/username. + + supplementalGroups := securityContext.GetSupplementalGroups() + for _, group := range supplementalGroups { + g.AddProcessAdditionalGid(uint32(group)) + } + + g.SetRootReadonly(securityContext.GetReadonlyRootfs()) + + // TODO(random-liu): [P2] Add apparmor and seccomp. + + // TODO(random-liu): [P1] Bind mount sandbox /dev/shm. + + // TODO(random-liu): [P0] Bind mount sandbox resolv.conf. + + return g.Spec(), nil +} + +// addOCIBindMounts adds bind mounts. +func addOCIBindMounts(g *generate.Generator, mounts []*runtime.Mount) { + for _, mount := range mounts { + dst := mount.GetContainerPath() + src := mount.GetHostPath() + options := []string{"rw"} + if mount.GetReadonly() { + options = []string{"ro"} + } + // TODO(random-liu): [P1] Apply selinux label + g.AddBindMount(src, dst, options) + } +} + +// setOCILinuxResource set container resource limit. +func setOCILinuxResource(g *generate.Generator, resources *runtime.LinuxContainerResources) { + if resources == nil { + return + } + g.SetLinuxResourcesCPUPeriod(uint64(resources.GetCpuPeriod())) + g.SetLinuxResourcesCPUQuota(resources.GetCpuQuota()) + g.SetLinuxResourcesCPUShares(uint64(resources.GetCpuShares())) + g.SetLinuxResourcesMemoryLimit(uint64(resources.GetMemoryLimitInBytes())) + g.SetLinuxResourcesOOMScoreAdj(int(resources.GetOomScoreAdj())) +} + +// setOCICapabilities adds/drops process capabilities. +func setOCICapabilities(g *generate.Generator, capabilities *runtime.Capability) error { + if capabilities == nil { + return nil + } + + for _, c := range capabilities.GetAddCapabilities() { + if err := g.AddProcessCapability(c); err != nil { + return err + } + } + + for _, c := range capabilities.GetDropCapabilities() { + if err := g.DropProcessCapability(c); err != nil { + return err + } + } + + return nil +} + +// setOCINamespaces sets namespaces. +func setOCINamespaces(g *generate.Generator, namespaces *runtime.NamespaceOption, sandboxPid uint32) { + g.AddOrReplaceLinuxNamespace(string(runtimespec.NetworkNamespace), getNetworkNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.IPCNamespace), getIPCNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.UTSNamespace), getUTSNamespace(sandboxPid)) // nolint: errcheck + g.AddOrReplaceLinuxNamespace(string(runtimespec.PIDNamespace), getPIDNamespace(sandboxPid)) // nolint: errcheck } diff --git a/pkg/server/container_status.go b/pkg/server/container_status.go index 78c871964..26a40eb6a 100644 --- a/pkg/server/container_status.go +++ b/pkg/server/container_status.go @@ -17,14 +17,60 @@ limitations under the License. package server import ( - "errors" + "fmt" + "github.com/golang/glog" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" ) // ContainerStatus inspects the container and returns the status. -func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (*runtime.ContainerStatusResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) ContainerStatus(ctx context.Context, r *runtime.ContainerStatusRequest) (retRes *runtime.ContainerStatusResponse, retErr error) { + glog.V(4).Infof("ContainerStatus for container %q", r.GetContainerId()) + defer func() { + if retErr == nil { + glog.V(4).Infof("ContainerStatus for %q returns status %+v", r.GetContainerId(), retRes.GetStatus()) + } + }() + + meta, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + + return &runtime.ContainerStatusResponse{ + Status: toCRIContainerStatus(meta), + }, nil +} + +// toCRIContainerStatus converts container metadata to CRI container status. +func toCRIContainerStatus(meta *metadata.ContainerMetadata) *runtime.ContainerStatus { + state := meta.State() + reason := meta.Reason + if state == runtime.ContainerState_CONTAINER_EXITED && reason == "" { + if meta.ExitCode == 0 { + reason = completeExitReason + } else { + reason = errorExitReason + } + } + return &runtime.ContainerStatus{ + Id: meta.ID, + Metadata: meta.Config.GetMetadata(), + State: state, + CreatedAt: meta.CreatedAt, + StartedAt: meta.StartedAt, + FinishedAt: meta.FinishedAt, + ExitCode: meta.ExitCode, + Image: meta.Config.GetImage(), + ImageRef: meta.ImageRef, + Reason: reason, + Message: meta.Message, + Labels: meta.Config.GetLabels(), + Annotations: meta.Config.GetAnnotations(), + Mounts: meta.Config.GetMounts(), + } } diff --git a/pkg/server/container_stop.go b/pkg/server/container_stop.go index 28e7b156c..c686981c0 100644 --- a/pkg/server/container_stop.go +++ b/pkg/server/container_stop.go @@ -17,14 +17,115 @@ limitations under the License. package server import ( - "errors" + "fmt" + "time" + "github.com/golang/glog" "golang.org/x/net/context" + "golang.org/x/sys/unix" + + "github.com/containerd/containerd/api/services/execution" "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +const ( + // stopCheckPollInterval is the the interval to check whether a container + // is stopped successfully. + stopCheckPollInterval = 100 * time.Millisecond + + // killContainerTimeout is the timeout that we wait for the container to + // be SIGKILLed. + killContainerTimeout = 2 * time.Minute ) // StopContainer stops a running container with a grace period (i.e., timeout). -func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (*runtime.StopContainerResponse, error) { - return nil, errors.New("not implemented") +func (c *criContainerdService) StopContainer(ctx context.Context, r *runtime.StopContainerRequest) (retRes *runtime.StopContainerResponse, retErr error) { + glog.V(2).Infof("StopContainer for %q with timeout %d (s)", r.GetContainerId(), r.GetTimeout()) + defer func() { + if retErr == nil { + glog.V(2).Infof("StopContainer %q returns successfully", r.GetContainerId()) + } + }() + + // Get container config from container store. + meta, err := c.containerStore.Get(r.GetContainerId()) + if err != nil { + return nil, fmt.Errorf("an error occurred when try to find container %q: %v", r.GetContainerId(), err) + } + id := r.GetContainerId() + + // Return without error if container is not running. This makes sure that + // stop only takes real action after the container is started. + if meta.State() != runtime.ContainerState_CONTAINER_RUNNING { + glog.V(2).Infof("Container to stop %q is not running, current state %q", + id, criContainerStateToString(meta.State())) + return &runtime.StopContainerResponse{}, nil + } + + // TODO(random-liu): [P1] Get stop signal from image config. + stopSignal := unix.SIGTERM + glog.V(2).Infof("Stop container %q with signal %v", id, stopSignal) + _, err = c.containerService.Kill(ctx, &execution.KillRequest{ID: id, Signal: uint32(stopSignal)}) + if err != nil { + if isContainerdContainerNotExistError(err) { + return &runtime.StopContainerResponse{}, nil + } + return nil, fmt.Errorf("failed to stop container %q: %v", id, err) + } + + err = c.waitContainerStop(id, time.Duration(r.GetTimeout())*time.Second) + if err == nil { + return &runtime.StopContainerResponse{}, nil + } + glog.Errorf("Stop container %q timed out: %v", id, err) + + glog.V(2).Infof("Delete container from containerd %q", id) + _, err = c.containerService.Delete(ctx, &execution.DeleteRequest{ID: id}) + if err != nil { + if isContainerdContainerNotExistError(err) { + return &runtime.StopContainerResponse{}, nil + } + return nil, fmt.Errorf("failed to delete container %q: %v", id, err) + } + + // Wait forever until container stop is observed by event monitor. + if err := c.waitContainerStop(id, killContainerTimeout); err != nil { + return nil, fmt.Errorf("error occurs during waiting for container %q to stop: %v", + id, err) + } + return &runtime.StopContainerResponse{}, nil +} + +// waitContainerStop polls container state until timeout exceeds or container is stopped. +func (c *criContainerdService) waitContainerStop(id string, timeout time.Duration) error { + ticker := time.NewTicker(stopCheckPollInterval) + defer ticker.Stop() + timeoutTimer := time.NewTimer(timeout) + defer timeoutTimer.Stop() + for { + // Poll once before waiting for stopCheckPollInterval. + meta, err := c.containerStore.Get(id) + if err != nil { + if !metadata.IsNotExistError(err) { + return fmt.Errorf("failed to get container %q metadata: %v", id, err) + } + // Do not return error here because container was removed means + // it is already stopped. + glog.Warningf("Container %q was removed during stopping", id) + return nil + } + // TODO(random-liu): Use channel with event handler instead of polling. + if meta.State() == runtime.ContainerState_CONTAINER_EXITED { + return nil + } + select { + case <-timeoutTimer.C: + return fmt.Errorf("wait container %q stop timeout", id) + case <-ticker.C: + continue + } + } } diff --git a/pkg/server/events.go b/pkg/server/events.go new file mode 100644 index 000000000..38446126c --- /dev/null +++ b/pkg/server/events.go @@ -0,0 +1,94 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "github.com/golang/glog" + "golang.org/x/net/context" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +// startEventMonitor starts an event monitor which monitors and handles all +// container events. +func (c *criContainerdService) startEventMonitor() error { + events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{}) + if err != nil { + return err + } + go func() { + for { + c.handleEvent(events) + } + }() + return nil +} + +// handleEvent receives an event from contaienrd and handles the event. +func (c *criContainerdService) handleEvent(events execution.ContainerService_EventsClient) { + e, err := events.Recv() + if err != nil { + glog.Errorf("Failed to receive event: %v", err) + return + } + glog.V(2).Infof("Received container event: %+v", e) + switch e.Type { + // If containerd-shim exits unexpectedly, there will be no corresponding event. + // However, containerd could not retrieve container state in that case, so it's + // fine to leave out that case for now. + // TODO(random-liu): [P2] Handle container-shim exit. + case container.Event_EXIT: + meta, err := c.containerStore.Get(e.ID) + if err != nil { + glog.Errorf("Failed to get container %q metadata: %v", e.ID, err) + return + } + if e.Pid != meta.Pid { + // Not init process dies, ignore the event. + return + } + // Delete the container from containerd. + _, err = c.containerService.Delete(context.Background(), &execution.DeleteRequest{ID: e.ID}) + if err != nil && !isContainerdContainerNotExistError(err) { + // TODO(random-liu): [P0] Enqueue the event and retry. + glog.Errorf("Failed to delete container %q: %v", e.ID, err) + return + } + err = c.containerStore.Update(e.ID, func(meta metadata.ContainerMetadata) (metadata.ContainerMetadata, error) { + // If FinishedAt has been set (e.g. with start failure), keep as + // it is. + if meta.FinishedAt != 0 { + return meta, nil + } + meta.Pid = 0 + meta.FinishedAt = e.ExitedAt.UnixNano() + meta.ExitCode = int32(e.ExitStatus) + return meta, nil + }) + if err != nil { + glog.Errorf("Failed to update container %q state: %v", e.ID, err) + // TODO(random-liu): [P0] Enqueue the event and retry. + return + } + case container.Event_OOM: + // TODO(random-liu): [P1] Handle OOM event. + } + return +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 54cb7074a..791972581 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -18,11 +18,14 @@ package server import ( "fmt" + "io" "path/filepath" "strings" + "syscall" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/truncindex" + "golang.org/x/net/context" "google.golang.org/grpc" "github.com/containerd/containerd" @@ -32,6 +35,18 @@ import ( "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" ) +const ( + // errorStartReason is the exit reason when fails to start container. + errorStartReason = "StartError" + // errorStartExitCode is the exit code when fails to start container. + // 128 is the same with Docker's behavior. + errorStartExitCode = 128 + // completeExitReason is the exit reason when container exits with code 0. + completeExitReason = "Completed" + // errorExitReason is the exit reason when container exits with code non-zero. + errorExitReason = "Error" +) + const ( // relativeRootfsPath is the rootfs path relative to bundle path. relativeRootfsPath = "rootfs" @@ -42,6 +57,8 @@ const ( // directory of the sandbox, all files created for the sandbox will be // placed under this directory. sandboxesDir = "sandboxes" + // containersDir contains all container root. + containersDir = "containers" // stdinNamedPipe is the name of stdin named pipe. stdinNamedPipe = "stdin" // stdoutNamedPipe is the name of stdout named pipe. @@ -52,6 +69,12 @@ const ( nameDelimiter = "_" // netNSFormat is the format of network namespace of a process. netNSFormat = "/proc/%v/ns/net" + // ipcNSFormat is the format of ipc namespace of a process. + ipcNSFormat = "/proc/%v/ns/ipc" + // utsNSFormat is the format of uts namespace of a process. + utsNSFormat = "/proc/%v/ns/uts" + // pidNSFormat is the format of pid namespace of a process. + pidNSFormat = "/proc/%v/ns/pid" ) // generateID generates a random unique id. @@ -70,6 +93,19 @@ func makeSandboxName(s *runtime.PodSandboxMetadata) string { }, nameDelimiter) } +// makeContainerName generates container name from sandbox and container metadata. +// The name generated is unique as long as the sandbox container combination is +// unique. +func makeContainerName(c *runtime.ContainerMetadata, s *runtime.PodSandboxMetadata) string { + return strings.Join([]string{ + c.Name, // 0 + s.Name, // 1: sandbox name + s.Namespace, // 2: sandbox namespace + s.Uid, // 3: sandbox uid + fmt.Sprintf("%d", c.Attempt), // 4 + }, nameDelimiter) +} + // getCgroupsPath generates container cgroups path. func getCgroupsPath(cgroupsParent string, id string) string { // TODO(random-liu): [P0] Handle systemd. @@ -82,6 +118,11 @@ func getSandboxRootDir(rootDir, id string) string { return filepath.Join(rootDir, sandboxesDir, id) } +// getContainerRootDir returns the root directory for managing container files. +func getContainerRootDir(rootDir, id string) string { + return filepath.Join(rootDir, containersDir, id) +} + // getStreamingPipes returns the stdin/stdout/stderr pipes path in the root. func getStreamingPipes(rootDir string) (string, string, string) { stdin := filepath.Join(rootDir, stdinNamedPipe) @@ -90,11 +131,57 @@ func getStreamingPipes(rootDir string) (string, string, string) { return stdin, stdout, stderr } +// prepareStreamingPipes prepares stream named pipe for container. returns nil +// streaming handler if corresponding stream path is empty. +func (c *criContainerdService) prepareStreamingPipes(ctx context.Context, stdin, stdout, stderr string) ( + i io.WriteCloser, o io.ReadCloser, e io.ReadCloser, retErr error) { + pipes := map[string]io.ReadWriteCloser{} + for t, stream := range map[string]struct { + path string + flag int + }{ + "stdin": {stdin, syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + "stdout": {stdout, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + "stderr": {stderr, syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK}, + } { + if stream.path == "" { + continue + } + s, err := c.os.OpenFifo(ctx, stream.path, stream.flag, 0700) + if err != nil { + return nil, nil, nil, fmt.Errorf("failed to open named pipe %q: %v", + stream.path, err) + } + defer func(cl io.Closer) { + if retErr != nil { + cl.Close() + } + }(s) + pipes[t] = s + } + return pipes["stdin"], pipes["stdout"], pipes["stderr"], nil +} + // getNetworkNamespace returns the network namespace of a process. func getNetworkNamespace(pid uint32) string { return fmt.Sprintf(netNSFormat, pid) } +// getIPCNamespace returns the ipc namespace of a process. +func getIPCNamespace(pid uint32) string { + return fmt.Sprintf(ipcNSFormat, pid) +} + +// getUTSNamespace returns the uts namespace of a process. +func getUTSNamespace(pid uint32) string { + return fmt.Sprintf(utsNSFormat, pid) +} + +// getPIDNamespace returns the pid namespace of a process. +func getPIDNamespace(pid uint32) string { + return fmt.Sprintf(pidNSFormat, pid) +} + // isContainerdContainerNotExistError checks whether a grpc error is containerd // ErrContainerNotExist error. // TODO(random-liu): Containerd should expose error better through api. @@ -124,3 +211,8 @@ func (c *criContainerdService) getSandbox(id string) (*metadata.SandboxMetadata, } return c.sandboxStore.Get(id) } + +// criContainerStateToString formats CRI container state to string. +func criContainerStateToString(state runtime.ContainerState) string { + return runtime.ContainerState_name[int32(state)] +} diff --git a/pkg/server/sandbox_list.go b/pkg/server/sandbox_list.go index 5baf81919..0d1793190 100644 --- a/pkg/server/sandbox_list.go +++ b/pkg/server/sandbox_list.go @@ -117,7 +117,8 @@ func (c *criContainerdService) filterCRISandboxes(sandboxes []*runtime.PodSandbo if filter.GetLabelSelector() != nil { match := true for k, v := range filter.GetLabelSelector() { - if s.Labels[k] != v { + got, ok := s.Labels[k] + if !ok || got != v { match = false break } diff --git a/pkg/server/sandbox_remove.go b/pkg/server/sandbox_remove.go index eb11cf292..e47adc891 100644 --- a/pkg/server/sandbox_remove.go +++ b/pkg/server/sandbox_remove.go @@ -35,7 +35,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. glog.V(2).Infof("RemovePodSandbox for sandbox %q", r.GetPodSandboxId()) defer func() { if retErr == nil { - glog.V(2).Info("RemovePodSandbox returns successfully") + glog.V(2).Info("RemovePodSandbox %q returns successfully", r.GetPodSandboxId()) } }() @@ -56,6 +56,7 @@ func (c *criContainerdService) RemovePodSandbox(ctx context.Context, r *runtime. // TODO(random-liu): [P2] Remove all containers in the sandbox. // Return error if sandbox container is not fully stopped. + // TODO(random-liu): [P0] Make sure network is torn down, may need to introduce a state. _, err = c.containerService.Info(ctx, &execution.InfoRequest{ID: id}) if err != nil && !isContainerdContainerNotExistError(err) { return nil, fmt.Errorf("failed to get sandbox container info for %q: %v", id, err) diff --git a/pkg/server/sandbox_run.go b/pkg/server/sandbox_run.go index 4cbb7b8b2..c0acc84d4 100644 --- a/pkg/server/sandbox_run.go +++ b/pkg/server/sandbox_run.go @@ -21,7 +21,6 @@ import ( "fmt" "io" "io/ioutil" - "syscall" "time" prototypes "github.com/gogo/protobuf/types" @@ -109,14 +108,14 @@ func (c *criContainerdService) RunPodSandbox(ctx context.Context, r *runtime.Run // TODO(random-liu): [P1] Moving following logging related logic into util functions. // Discard sandbox container output because we don't care about it. _, stdout, stderr := getStreamingPipes(sandboxRootDir) - for _, p := range []string{stdout, stderr} { - f, err := c.os.OpenFifo(ctx, p, syscall.O_RDONLY|syscall.O_CREAT|syscall.O_NONBLOCK, 0700) - if err != nil { - return nil, fmt.Errorf("failed to open named pipe %q: %v", p, err) - } - defer func(c io.Closer) { + _, stdoutPipe, stderrPipe, err := c.prepareStreamingPipes(ctx, "", stdout, stderr) + if err != nil { + return nil, fmt.Errorf("failed to prepare streaming pipes: %v", err) + } + for _, f := range []io.ReadCloser{stdoutPipe, stderrPipe} { + defer func(cl io.Closer) { if retErr != nil { - c.Close() + cl.Close() } }(f) go func(r io.ReadCloser) { diff --git a/pkg/server/sandbox_status.go b/pkg/server/sandbox_status.go index a54c4a608..08fd0b53f 100644 --- a/pkg/server/sandbox_status.go +++ b/pkg/server/sandbox_status.go @@ -35,7 +35,7 @@ func (c *criContainerdService) PodSandboxStatus(ctx context.Context, r *runtime. glog.V(4).Infof("PodSandboxStatus for sandbox %q", r.GetPodSandboxId()) defer func() { if retErr == nil { - glog.V(4).Infof("PodSandboxStatus returns status %+v", retRes.GetStatus()) + glog.V(4).Infof("PodSandboxStatus for %q returns status %+v", r.GetPodSandboxId(), retRes.GetStatus()) } }() diff --git a/pkg/server/sandbox_stop.go b/pkg/server/sandbox_stop.go index 53285634a..c4107bb47 100644 --- a/pkg/server/sandbox_stop.go +++ b/pkg/server/sandbox_stop.go @@ -33,7 +33,7 @@ func (c *criContainerdService) StopPodSandbox(ctx context.Context, r *runtime.St glog.V(2).Infof("StopPodSandbox for sandbox %q", r.GetPodSandboxId()) defer func() { if retErr == nil { - glog.V(2).Info("StopPodSandbox returns successfully") + glog.V(2).Info("StopPodSandbox %q returns successfully", r.GetPodSandboxId()) } }() diff --git a/pkg/server/service.go b/pkg/server/service.go index 9909ab8e5..d05011c39 100644 --- a/pkg/server/service.go +++ b/pkg/server/service.go @@ -53,6 +53,7 @@ import ( // CRIContainerdService is the interface implement CRI remote service server. type CRIContainerdService interface { + Start() error runtime.RuntimeServiceServer runtime.ImageServiceServer } @@ -74,6 +75,11 @@ type criContainerdService struct { // id "abcdefg" is added, we could use "abcd" to identify the same thing // as long as there is no ambiguity. sandboxIDIndex *truncindex.TruncIndex + // containerStore stores all container metadata. + containerStore metadata.ContainerStore + // containerNameIndex stores all container names and make sure each + // name is unique. + containerNameIndex *registrar.Registrar // containerService is containerd container service client. containerService execution.ContainerServiceClient // contentIngester is the containerd service to ingest content into @@ -98,14 +104,22 @@ func NewCRIContainerdService(conn *grpc.ClientConn, rootDir string) CRIContainer os: osinterface.RealOS{}, rootDir: rootDir, sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), + containerStore: metadata.NewContainerStore(store.NewMetadataStore()), imageMetadataStore: metadata.NewImageMetadataStore(store.NewMetadataStore()), - // TODO(random-liu): Register sandbox id/name for recovered sandbox. - sandboxNameIndex: registrar.NewRegistrar(), - sandboxIDIndex: truncindex.NewTruncIndex(nil), - containerService: execution.NewContainerServiceClient(conn), - imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), - contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), - contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), - rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), + // TODO(random-liu): Register sandbox/container id/name for recovered sandbox/container. + // TODO(random-liu): Use the same name and id index for both container and sandbox. + sandboxNameIndex: registrar.NewRegistrar(), + sandboxIDIndex: truncindex.NewTruncIndex(nil), + // TODO(random-liu): Add container id index. + containerNameIndex: registrar.NewRegistrar(), + containerService: execution.NewContainerServiceClient(conn), + imageStoreService: imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), + contentIngester: contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)), + contentProvider: contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)), + rootfsUnpacker: rootfsservice.NewUnpackerFromClient(rootfsapi.NewRootFSClient(conn)), } } + +func (c *criContainerdService) Start() error { + return c.startEventMonitor() +} From 322b6ef33399e9c5ea06b943272ea6423708d4cd Mon Sep 17 00:00:00 2001 From: Lantao Liu Date: Tue, 16 May 2017 19:49:07 +0000 Subject: [PATCH 4/4] Add unit test. Signed-off-by: Lantao Liu --- pkg/server/container_create_test.go | 157 ++++++++ pkg/server/container_list_test.go | 229 +++++++++++ pkg/server/container_remove_test.go | 175 +++++++++ pkg/server/container_start_test.go | 402 ++++++++++++++++++++ pkg/server/container_status_test.go | 173 +++++++++ pkg/server/container_stop_test.go | 199 ++++++++++ pkg/server/events.go | 23 +- pkg/server/events_test.go | 154 ++++++++ pkg/server/helpers_test.go | 93 +++++ pkg/server/service_test.go | 19 +- pkg/server/testing/fake_execution_client.go | 24 +- 11 files changed, 1633 insertions(+), 15 deletions(-) create mode 100644 pkg/server/container_create_test.go create mode 100644 pkg/server/container_list_test.go create mode 100644 pkg/server/container_remove_test.go create mode 100644 pkg/server/container_start_test.go create mode 100644 pkg/server/container_status_test.go create mode 100644 pkg/server/container_stop_test.go create mode 100644 pkg/server/events_test.go diff --git a/pkg/server/container_create_test.go b/pkg/server/container_create_test.go new file mode 100644 index 000000000..6deee479f --- /dev/null +++ b/pkg/server/container_create_test.go @@ -0,0 +1,157 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "errors" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" +) + +func TestCreateContainer(t *testing.T) { + testSandboxID := "test-sandbox-id" + testNameMeta := &runtime.ContainerMetadata{ + Name: "test-name", + Attempt: 1, + } + testSandboxNameMeta := &runtime.PodSandboxMetadata{ + Name: "test-sandbox-name", + Uid: "test-sandbox-uid", + Namespace: "test-sandbox-namespace", + Attempt: 2, + } + testConfig := &runtime.ContainerConfig{ + Metadata: testNameMeta, + Image: &runtime.ImageSpec{ + Image: "test-image", + }, + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + } + testSandboxConfig := &runtime.PodSandboxConfig{ + Metadata: testSandboxNameMeta, + } + + for desc, test := range map[string]struct { + sandboxMetadata *metadata.SandboxMetadata + reserveNameErr bool + createRootDirErr error + createMetadataErr bool + expectErr bool + expectMeta *metadata.ContainerMetadata + }{ + "should return error if sandbox does not exist": { + sandboxMetadata: nil, + expectErr: true, + }, + "should return error if name is reserved": { + sandboxMetadata: &metadata.SandboxMetadata{ + ID: testSandboxID, + Name: makeSandboxName(testSandboxNameMeta), + Config: testSandboxConfig, + }, + reserveNameErr: true, + expectErr: true, + }, + "should return error if fail to create root directory": { + sandboxMetadata: &metadata.SandboxMetadata{ + ID: testSandboxID, + Name: makeSandboxName(testSandboxNameMeta), + Config: testSandboxConfig, + }, + createRootDirErr: errors.New("random error"), + expectErr: true, + }, + "should be able to create container successfully": { + sandboxMetadata: &metadata.SandboxMetadata{ + ID: testSandboxID, + Name: makeSandboxName(testSandboxNameMeta), + Config: testSandboxConfig, + }, + expectErr: false, + expectMeta: &metadata.ContainerMetadata{ + Name: makeContainerName(testNameMeta, testSandboxNameMeta), + SandboxID: testSandboxID, + Config: testConfig, + }, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fakeOS := c.os.(*ostesting.FakeOS) + if test.sandboxMetadata != nil { + assert.NoError(t, c.sandboxStore.Create(*test.sandboxMetadata)) + } + containerName := makeContainerName(testNameMeta, testSandboxNameMeta) + if test.reserveNameErr { + assert.NoError(t, c.containerNameIndex.Reserve(containerName, "random id")) + } + rootExists := false + rootPath := "" + fakeOS.MkdirAllFn = func(path string, perm os.FileMode) error { + assert.Equal(t, os.FileMode(0755), perm) + rootPath = path + if test.createRootDirErr == nil { + rootExists = true + } + return test.createRootDirErr + } + fakeOS.RemoveAllFn = func(path string) error { + assert.Equal(t, rootPath, path) + rootExists = false + return nil + } + resp, err := c.CreateContainer(context.Background(), &runtime.CreateContainerRequest{ + PodSandboxId: testSandboxID, + Config: testConfig, + SandboxConfig: testSandboxConfig, + }) + if test.expectErr { + assert.Error(t, err) + assert.Nil(t, resp) + assert.False(t, rootExists, "root directory should be cleaned up") + if !test.reserveNameErr { + assert.NoError(t, c.containerNameIndex.Reserve(containerName, "random id"), + "container name should be released") + } + metas, err := c.containerStore.List() + assert.NoError(t, err) + assert.Empty(t, metas, "container metadata should not be created") + continue + } + assert.NoError(t, err) + assert.NotNil(t, resp) + id := resp.GetContainerId() + assert.True(t, rootExists) + assert.Equal(t, getContainerRootDir(c.rootDir, id), rootPath, "root directory should be created") + meta, err := c.containerStore.Get(id) + assert.NoError(t, err) + require.NotNil(t, meta) + test.expectMeta.ID = id + // TODO(random-liu): Use fake clock to test CreatedAt. + test.expectMeta.CreatedAt = meta.CreatedAt + assert.Equal(t, test.expectMeta, meta, "container metadata should be created") + } +} diff --git a/pkg/server/container_list_test.go b/pkg/server/container_list_test.go new file mode 100644 index 000000000..2c3b6b44c --- /dev/null +++ b/pkg/server/container_list_test.go @@ -0,0 +1,229 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +func TestToCRIContainer(t *testing.T) { + config := &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "test-name", + Attempt: 1, + }, + Image: &runtime.ImageSpec{Image: "test-image"}, + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + } + createdAt := time.Now().UnixNano() + meta := &metadata.ContainerMetadata{ + ID: "test-id", + Name: "test-name", + SandboxID: "test-sandbox-id", + Config: config, + ImageRef: "test-image-ref", + Pid: 1234, + CreatedAt: createdAt, + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + ExitCode: 1, + Reason: "test-reason", + Message: "test-message", + } + expect := &runtime.Container{ + Id: "test-id", + PodSandboxId: "test-sandbox-id", + Metadata: config.GetMetadata(), + Image: config.GetImage(), + ImageRef: "test-image-ref", + State: runtime.ContainerState_CONTAINER_EXITED, + CreatedAt: createdAt, + Labels: config.GetLabels(), + Annotations: config.GetAnnotations(), + } + c := toCRIContainer(meta) + assert.Equal(t, expect, c) +} + +func TestFilterContainers(t *testing.T) { + c := newTestCRIContainerdService() + + testContainers := []*runtime.Container{ + { + Id: "1", + PodSandboxId: "s-1", + Metadata: &runtime.ContainerMetadata{Name: "name-1", Attempt: 1}, + State: runtime.ContainerState_CONTAINER_RUNNING, + }, + { + Id: "2", + PodSandboxId: "s-2", + Metadata: &runtime.ContainerMetadata{Name: "name-2", Attempt: 2}, + State: runtime.ContainerState_CONTAINER_EXITED, + Labels: map[string]string{"a": "b"}, + }, + { + Id: "3", + PodSandboxId: "s-2", + Metadata: &runtime.ContainerMetadata{Name: "name-2", Attempt: 3}, + State: runtime.ContainerState_CONTAINER_CREATED, + Labels: map[string]string{"c": "d"}, + }, + } + for desc, test := range map[string]struct { + filter *runtime.ContainerFilter + expect []*runtime.Container + }{ + "no filter": { + expect: testContainers, + }, + "id filter": { + filter: &runtime.ContainerFilter{Id: "2"}, + expect: []*runtime.Container{testContainers[1]}, + }, + "state filter": { + filter: &runtime.ContainerFilter{ + State: &runtime.ContainerStateValue{ + State: runtime.ContainerState_CONTAINER_EXITED, + }, + }, + expect: []*runtime.Container{testContainers[1]}, + }, + "label filter": { + filter: &runtime.ContainerFilter{ + LabelSelector: map[string]string{"a": "b"}, + }, + expect: []*runtime.Container{testContainers[1]}, + }, + "sandbox id filter": { + filter: &runtime.ContainerFilter{PodSandboxId: "s-2"}, + expect: []*runtime.Container{testContainers[1], testContainers[2]}, + }, + "mixed filter not matched": { + filter: &runtime.ContainerFilter{ + Id: "1", + PodSandboxId: "s-2", + LabelSelector: map[string]string{"a": "b"}, + }, + expect: []*runtime.Container{}, + }, + "mixed filter matched": { + filter: &runtime.ContainerFilter{ + PodSandboxId: "s-2", + State: &runtime.ContainerStateValue{ + State: runtime.ContainerState_CONTAINER_CREATED, + }, + LabelSelector: map[string]string{"c": "d"}, + }, + expect: []*runtime.Container{testContainers[2]}, + }, + } { + filtered := c.filterCRIContainers(testContainers, test.filter) + assert.Equal(t, test.expect, filtered, desc) + } +} + +func TestListContainers(t *testing.T) { + c := newTestCRIContainerdService() + + createdAt := time.Now().UnixNano() + startedAt := time.Now().UnixNano() + finishedAt := time.Now().UnixNano() + containersInStore := []metadata.ContainerMetadata{ + { + ID: "1", + Name: "name-1", + SandboxID: "s-1", + Config: &runtime.ContainerConfig{Metadata: &runtime.ContainerMetadata{Name: "name-1"}}, + CreatedAt: createdAt, + }, + { + ID: "2", + Name: "name-2", + SandboxID: "s-1", + Config: &runtime.ContainerConfig{Metadata: &runtime.ContainerMetadata{Name: "name-2"}}, + CreatedAt: createdAt, + StartedAt: startedAt, + }, + { + ID: "3", + Name: "name-3", + SandboxID: "s-1", + Config: &runtime.ContainerConfig{Metadata: &runtime.ContainerMetadata{Name: "name-3"}}, + CreatedAt: createdAt, + StartedAt: startedAt, + FinishedAt: finishedAt, + }, + { + ID: "4", + Name: "name-4", + SandboxID: "s-2", + Config: &runtime.ContainerConfig{Metadata: &runtime.ContainerMetadata{Name: "name-4"}}, + CreatedAt: createdAt, + }, + } + filter := &runtime.ContainerFilter{ + PodSandboxId: "s-1", + } + expect := []*runtime.Container{ + { + Id: "1", + PodSandboxId: "s-1", + Metadata: &runtime.ContainerMetadata{Name: "name-1"}, + State: runtime.ContainerState_CONTAINER_CREATED, + CreatedAt: createdAt, + }, + { + Id: "2", + PodSandboxId: "s-1", + Metadata: &runtime.ContainerMetadata{Name: "name-2"}, + State: runtime.ContainerState_CONTAINER_RUNNING, + CreatedAt: createdAt, + }, + { + Id: "3", + PodSandboxId: "s-1", + Metadata: &runtime.ContainerMetadata{Name: "name-3"}, + State: runtime.ContainerState_CONTAINER_EXITED, + CreatedAt: createdAt, + }, + } + + // Inject test metadata + for _, cntr := range containersInStore { + c.containerStore.Create(cntr) + } + + resp, err := c.ListContainers(context.Background(), &runtime.ListContainersRequest{Filter: filter}) + assert.NoError(t, err) + require.NotNil(t, resp) + containers := resp.GetContainers() + assert.Len(t, containers, len(expect)) + for _, cntr := range expect { + assert.Contains(t, containers, cntr) + } +} diff --git a/pkg/server/container_remove_test.go b/pkg/server/container_remove_test.go new file mode 100644 index 000000000..10a18d22d --- /dev/null +++ b/pkg/server/container_remove_test.go @@ -0,0 +1,175 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" +) + +// TestSetContainerRemoving tests setContainerRemoving sets removing +// state correctly. +func TestSetContainerRemoving(t *testing.T) { + testID := "test-id" + for desc, test := range map[string]struct { + metadata *metadata.ContainerMetadata + expectErr bool + }{ + "should return error when container is in running state": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + }, + expectErr: true, + }, + "should return error when container is in removing state": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + Removing: true, + }, + expectErr: true, + }, + "should not return error when container is not running and removing": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + }, + expectErr: false, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + if test.metadata != nil { + assert.NoError(t, c.containerStore.Create(*test.metadata)) + } + err := c.setContainerRemoving(testID) + meta, getErr := c.containerStore.Get(testID) + assert.NoError(t, getErr) + if test.expectErr { + assert.Error(t, err) + assert.Equal(t, test.metadata, meta, "metadata should not be updated") + } else { + assert.NoError(t, err) + assert.True(t, meta.Removing, "removing should be set") + } + } +} + +func TestRemoveContainer(t *testing.T) { + testID := "test-id" + testName := "test-name" + for desc, test := range map[string]struct { + metadata *metadata.ContainerMetadata + removeDirErr error + expectErr bool + expectUnsetRemoving bool + }{ + "should return error when container is still running": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + }, + expectErr: true, + }, + "should return error when there is ongoing removing": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + Removing: true, + }, + expectErr: true, + }, + "should not return error if container does not exist": { + metadata: nil, + expectErr: false, + }, + "should return error if remove container root fails": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + }, + removeDirErr: errors.New("random error"), + expectErr: true, + expectUnsetRemoving: true, + }, + "should be able to remove container successfully": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + }, + expectErr: false, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fakeOS := c.os.(*ostesting.FakeOS) + if test.metadata != nil { + assert.NoError(t, c.containerNameIndex.Reserve(testName, testID)) + assert.NoError(t, c.containerStore.Create(*test.metadata)) + } + fakeOS.RemoveAllFn = func(path string) error { + assert.Equal(t, getContainerRootDir(c.rootDir, testID), path) + return test.removeDirErr + } + resp, err := c.RemoveContainer(context.Background(), &runtime.RemoveContainerRequest{ + ContainerId: testID, + }) + if test.expectErr { + assert.Error(t, err) + assert.Nil(t, resp) + if !test.expectUnsetRemoving { + continue + } + meta, err := c.containerStore.Get(testID) + assert.NoError(t, err) + require.NotNil(t, meta) + // Also covers resetContainerRemoving. + assert.False(t, meta.Removing, "removing state should be unset") + continue + } + assert.NoError(t, err) + assert.NotNil(t, resp) + meta, err := c.containerStore.Get(testID) + assert.Error(t, err) + assert.True(t, metadata.IsNotExistError(err)) + assert.Nil(t, meta, "container metadata should be removed") + assert.NoError(t, c.containerNameIndex.Reserve(testName, testID), + "container name should be released") + } +} diff --git a/pkg/server/container_start_test.go b/pkg/server/container_start_test.go new file mode 100644 index 000000000..7ea788560 --- /dev/null +++ b/pkg/server/container_start_test.go @@ -0,0 +1,402 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "encoding/json" + "errors" + "io" + "os" + "testing" + "time" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + runtimespec "github.com/opencontainers/runtime-spec/specs-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" + servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" +) + +func getStartContainerTestData() (*runtime.ContainerConfig, *runtime.PodSandboxConfig, + func(*testing.T, string, uint32, *runtimespec.Spec)) { + config := &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "test-name", + Attempt: 1, + }, + Command: []string{"test", "command"}, + Args: []string{"test", "args"}, + WorkingDir: "test-cwd", + Envs: []*runtime.KeyValue{ + {Key: "k1", Value: "v1"}, + {Key: "k2", Value: "v2"}, + }, + Mounts: []*runtime.Mount{ + { + ContainerPath: "container-path-1", + HostPath: "host-path-1", + }, + { + ContainerPath: "container-path-2", + HostPath: "host-path-2", + Readonly: true, + }, + }, + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + Linux: &runtime.LinuxContainerConfig{ + Resources: &runtime.LinuxContainerResources{ + CpuPeriod: 100, + CpuQuota: 200, + CpuShares: 300, + MemoryLimitInBytes: 400, + OomScoreAdj: 500, + }, + SecurityContext: &runtime.LinuxContainerSecurityContext{ + Capabilities: &runtime.Capability{ + AddCapabilities: []string{"CAP_SYS_ADMIN"}, + DropCapabilities: []string{"CAP_CHOWN"}, + }, + SupplementalGroups: []int64{1111, 2222}, + }, + }, + } + sandboxConfig := &runtime.PodSandboxConfig{ + Metadata: &runtime.PodSandboxMetadata{ + Name: "test-sandbox-name", + Uid: "test-sandbox-uid", + Namespace: "test-sandbox-ns", + Attempt: 2, + }, + Linux: &runtime.LinuxPodSandboxConfig{ + CgroupParent: "/test/cgroup/parent", + }, + } + specCheck := func(t *testing.T, id string, sandboxPid uint32, spec *runtimespec.Spec) { + assert.Equal(t, relativeRootfsPath, spec.Root.Path) + assert.Equal(t, []string{"test", "command", "test", "args"}, spec.Process.Args) + assert.Equal(t, "test-cwd", spec.Process.Cwd) + assert.Contains(t, spec.Process.Env, "k1=v1", "k2=v2") + + t.Logf("Check bind mount") + found1, found2 := false, false + for _, m := range spec.Mounts { + if m.Source == "host-path-1" { + assert.Equal(t, m.Destination, "container-path-1") + assert.Contains(t, m.Options, "rw") + found1 = true + } + if m.Source == "host-path-2" { + assert.Equal(t, m.Destination, "container-path-2") + assert.Contains(t, m.Options, "ro") + found2 = true + } + } + assert.True(t, found1) + assert.True(t, found2) + + t.Logf("Check resource limits") + assert.EqualValues(t, *spec.Linux.Resources.CPU.Period, 100) + assert.EqualValues(t, *spec.Linux.Resources.CPU.Quota, 200) + assert.EqualValues(t, *spec.Linux.Resources.CPU.Shares, 300) + assert.EqualValues(t, *spec.Linux.Resources.Memory.Limit, 400) + assert.EqualValues(t, *spec.Linux.Resources.OOMScoreAdj, 500) + + t.Logf("Check capabilities") + assert.Contains(t, spec.Process.Capabilities.Bounding, "CAP_SYS_ADMIN") + assert.Contains(t, spec.Process.Capabilities.Effective, "CAP_SYS_ADMIN") + assert.Contains(t, spec.Process.Capabilities.Inheritable, "CAP_SYS_ADMIN") + assert.Contains(t, spec.Process.Capabilities.Permitted, "CAP_SYS_ADMIN") + assert.Contains(t, spec.Process.Capabilities.Ambient, "CAP_SYS_ADMIN") + assert.NotContains(t, spec.Process.Capabilities.Bounding, "CAP_CHOWN") + assert.NotContains(t, spec.Process.Capabilities.Effective, "CAP_CHOWN") + assert.NotContains(t, spec.Process.Capabilities.Inheritable, "CAP_CHOWN") + assert.NotContains(t, spec.Process.Capabilities.Permitted, "CAP_CHOWN") + assert.NotContains(t, spec.Process.Capabilities.Ambient, "CAP_CHOWN") + + t.Logf("Check supplemental groups") + assert.Contains(t, spec.Process.User.AdditionalGids, uint32(1111)) + assert.Contains(t, spec.Process.User.AdditionalGids, uint32(2222)) + + t.Logf("Check cgroup path") + assert.Equal(t, getCgroupsPath("/test/cgroup/parent", id), spec.Linux.CgroupsPath) + + t.Logf("Check namespaces") + assert.Contains(t, spec.Linux.Namespaces, runtimespec.LinuxNamespace{ + Type: runtimespec.NetworkNamespace, + Path: getNetworkNamespace(sandboxPid), + }) + assert.Contains(t, spec.Linux.Namespaces, runtimespec.LinuxNamespace{ + Type: runtimespec.IPCNamespace, + Path: getIPCNamespace(sandboxPid), + }) + assert.Contains(t, spec.Linux.Namespaces, runtimespec.LinuxNamespace{ + Type: runtimespec.UTSNamespace, + Path: getUTSNamespace(sandboxPid), + }) + assert.Contains(t, spec.Linux.Namespaces, runtimespec.LinuxNamespace{ + Type: runtimespec.PIDNamespace, + Path: getPIDNamespace(sandboxPid), + }) + } + return config, sandboxConfig, specCheck +} + +func TestGeneralContainerSpec(t *testing.T) { + testID := "test-id" + testPid := uint32(1234) + config, sandboxConfig, specCheck := getStartContainerTestData() + c := newTestCRIContainerdService() + spec, err := c.generateContainerSpec(testID, testPid, config, sandboxConfig) + assert.NoError(t, err) + specCheck(t, testID, testPid, spec) +} + +func TestContainerSpecTty(t *testing.T) { + testID := "test-id" + testPid := uint32(1234) + config, sandboxConfig, specCheck := getStartContainerTestData() + c := newTestCRIContainerdService() + for _, tty := range []bool{true, false} { + config.Tty = tty + spec, err := c.generateContainerSpec(testID, testPid, config, sandboxConfig) + assert.NoError(t, err) + specCheck(t, testID, testPid, spec) + assert.Equal(t, tty, spec.Process.Terminal) + } +} + +func TestContainerSpecReadonlyRootfs(t *testing.T) { + testID := "test-id" + testPid := uint32(1234) + config, sandboxConfig, specCheck := getStartContainerTestData() + c := newTestCRIContainerdService() + for _, readonly := range []bool{true, false} { + config.Linux.SecurityContext.ReadonlyRootfs = readonly + spec, err := c.generateContainerSpec(testID, testPid, config, sandboxConfig) + assert.NoError(t, err) + specCheck(t, testID, testPid, spec) + assert.Equal(t, readonly, spec.Root.Readonly) + } +} + +func TestStartContainer(t *testing.T) { + testID := "test-id" + testSandboxID := "test-sandbox-id" + testSandboxPid := uint32(4321) + config, sandboxConfig, specCheck := getStartContainerTestData() + testMetadata := &metadata.ContainerMetadata{ + ID: testID, + Name: "test-name", + SandboxID: testSandboxID, + Config: config, + CreatedAt: time.Now().UnixNano(), + } + testSandboxMetadata := &metadata.SandboxMetadata{ + ID: testSandboxID, + Name: "test-sandbox-name", + Config: sandboxConfig, + } + testSandboxContainer := &container.Container{ + ID: testSandboxID, + Pid: testSandboxPid, + Status: container.Status_RUNNING, + } + for desc, test := range map[string]struct { + containerMetadata *metadata.ContainerMetadata + sandboxMetadata *metadata.SandboxMetadata + sandboxContainerdContainer *container.Container + prepareFIFOErr error + createContainerErr error + startContainerErr error + expectStateChange bool + expectCalls []string + expectErr bool + }{ + "should return error when container does not exist": { + containerMetadata: nil, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + expectCalls: []string{}, + expectErr: true, + }, + "should return error when container is not in created state": { + containerMetadata: &metadata.ContainerMetadata{ + ID: testID, + Name: "test-name", + SandboxID: testSandboxID, + Config: config, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + }, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + expectCalls: []string{}, + expectErr: true, + }, + "should return error when container is in removing state": { + containerMetadata: &metadata.ContainerMetadata{ + ID: testID, + Name: "test-name", + SandboxID: testSandboxID, + Config: config, + CreatedAt: time.Now().UnixNano(), + Removing: true, + }, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + expectCalls: []string{}, + expectErr: true, + }, + "should return error when sandbox does not exist": { + containerMetadata: testMetadata, + sandboxMetadata: nil, + sandboxContainerdContainer: testSandboxContainer, + expectStateChange: true, + expectCalls: []string{}, + expectErr: true, + }, + "should return error when sandbox is not running": { + containerMetadata: testMetadata, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: &container.Container{ + ID: testSandboxID, + Pid: testSandboxPid, + Status: container.Status_STOPPED, + }, + expectStateChange: true, + expectCalls: []string{"info"}, + expectErr: true, + }, + "should return error when fail to open streaming pipes": { + containerMetadata: testMetadata, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + prepareFIFOErr: errors.New("open error"), + expectStateChange: true, + expectCalls: []string{"info"}, + expectErr: true, + }, + "should return error when fail to create container": { + containerMetadata: testMetadata, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + createContainerErr: errors.New("create error"), + expectStateChange: true, + expectCalls: []string{"info", "create"}, + expectErr: true, + }, + "should return error when fail to start container": { + containerMetadata: testMetadata, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + startContainerErr: errors.New("start error"), + expectStateChange: true, + // cleanup the containerd container. + expectCalls: []string{"info", "create", "start", "delete"}, + expectErr: true, + }, + "should be able to start container successfully": { + containerMetadata: testMetadata, + sandboxMetadata: testSandboxMetadata, + sandboxContainerdContainer: testSandboxContainer, + expectStateChange: true, + expectCalls: []string{"info", "create", "start"}, + expectErr: false, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fake := c.containerService.(*servertesting.FakeExecutionClient) + fakeOS := c.os.(*ostesting.FakeOS) + if test.containerMetadata != nil { + assert.NoError(t, c.containerStore.Create(*test.containerMetadata)) + } + if test.sandboxMetadata != nil { + assert.NoError(t, c.sandboxStore.Create(*test.sandboxMetadata)) + } + if test.sandboxContainerdContainer != nil { + fake.SetFakeContainers([]container.Container{*test.sandboxContainerdContainer}) + } + // TODO(random-liu): Test behavior with different streaming config. + fakeOS.OpenFifoFn = func(context.Context, string, int, os.FileMode) (io.ReadWriteCloser, error) { + return nopReadWriteCloser{}, test.prepareFIFOErr + } + if test.createContainerErr != nil { + fake.InjectError("create", test.createContainerErr) + } + if test.startContainerErr != nil { + fake.InjectError("start", test.startContainerErr) + } + resp, err := c.StartContainer(context.Background(), &runtime.StartContainerRequest{ + ContainerId: testID, + }) + // Check containerd functions called. + assert.Equal(t, test.expectCalls, fake.GetCalledNames()) + // Check results returned. + if test.expectErr { + assert.Error(t, err) + assert.Nil(t, resp) + } else { + assert.NoError(t, err) + assert.NotNil(t, resp) + } + // Check container state. + meta, err := c.containerStore.Get(testID) + if !test.expectStateChange { + // Do not check the error, because container may not exist + // in the test case. + assert.Equal(t, meta, test.containerMetadata) + continue + } + assert.NoError(t, err) + require.NotNil(t, meta) + if test.expectErr { + t.Logf("container state should be in exited state when fail to start") + assert.Equal(t, runtime.ContainerState_CONTAINER_EXITED, meta.State()) + assert.Zero(t, meta.Pid) + assert.EqualValues(t, errorStartExitCode, meta.ExitCode) + assert.Equal(t, errorStartReason, meta.Reason) + assert.NotEmpty(t, meta.Message) + _, err := fake.Info(context.Background(), &execution.InfoRequest{ID: testID}) + assert.True(t, isContainerdContainerNotExistError(err), + "containerd container should be cleaned up after when fail to start") + continue + } + t.Logf("container state should be running when start successfully") + assert.Equal(t, runtime.ContainerState_CONTAINER_RUNNING, meta.State()) + info, err := fake.Info(context.Background(), &execution.InfoRequest{ID: testID}) + assert.NoError(t, err) + pid := info.Pid + assert.Equal(t, pid, meta.Pid) + assert.Equal(t, container.Status_RUNNING, info.Status) + // Check runtime spec + calls := fake.GetCalledDetails() + createOpts, ok := calls[1].Argument.(*execution.CreateRequest) + assert.True(t, ok, "2nd call should be create") + // TODO(random-liu): Test other create options. + spec := &runtimespec.Spec{} + assert.NoError(t, json.Unmarshal(createOpts.Spec.Value, spec)) + specCheck(t, testID, testSandboxPid, spec) + } +} diff --git a/pkg/server/container_status_test.go b/pkg/server/container_status_test.go new file mode 100644 index 000000000..acd536967 --- /dev/null +++ b/pkg/server/container_status_test.go @@ -0,0 +1,173 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" +) + +func getContainerStatusTestData() (*metadata.ContainerMetadata, *runtime.ContainerStatus) { + testID := "test-id" + config := &runtime.ContainerConfig{ + Metadata: &runtime.ContainerMetadata{ + Name: "test-name", + Attempt: 1, + }, + Image: &runtime.ImageSpec{Image: "test-image"}, + Mounts: []*runtime.Mount{{ + ContainerPath: "test-container-path", + HostPath: "test-host-path", + }}, + Labels: map[string]string{"a": "b"}, + Annotations: map[string]string{"c": "d"}, + } + + createdAt := time.Now().UnixNano() + startedAt := time.Now().UnixNano() + + metadata := &metadata.ContainerMetadata{ + ID: testID, + Name: "test-long-name", + SandboxID: "test-sandbox-id", + Config: config, + ImageRef: "test-image-ref", + Pid: 1234, + CreatedAt: createdAt, + StartedAt: startedAt, + } + + expected := &runtime.ContainerStatus{ + Id: testID, + Metadata: config.GetMetadata(), + State: runtime.ContainerState_CONTAINER_RUNNING, + CreatedAt: createdAt, + StartedAt: startedAt, + Image: config.GetImage(), + ImageRef: "test-image-ref", + Reason: completeExitReason, + Labels: config.GetLabels(), + Annotations: config.GetAnnotations(), + Mounts: config.GetMounts(), + } + + return metadata, expected +} + +func TestToCRIContainerStatus(t *testing.T) { + for desc, test := range map[string]struct { + finishedAt int64 + exitCode int32 + reason string + message string + expectedState runtime.ContainerState + expectedReason string + }{ + "container running": { + expectedState: runtime.ContainerState_CONTAINER_RUNNING, + }, + "container exited with reason": { + finishedAt: time.Now().UnixNano(), + exitCode: 1, + reason: "test-reason", + message: "test-message", + expectedState: runtime.ContainerState_CONTAINER_EXITED, + expectedReason: "test-reason", + }, + "container exited with exit code 0 without reason": { + finishedAt: time.Now().UnixNano(), + exitCode: 0, + message: "test-message", + expectedState: runtime.ContainerState_CONTAINER_EXITED, + expectedReason: completeExitReason, + }, + "container exited with non-zero exit code without reason": { + finishedAt: time.Now().UnixNano(), + exitCode: 1, + message: "test-message", + expectedState: runtime.ContainerState_CONTAINER_EXITED, + expectedReason: errorExitReason, + }, + } { + meta, expected := getContainerStatusTestData() + // Update metadata with test case. + meta.FinishedAt = test.finishedAt + meta.ExitCode = test.exitCode + meta.Reason = test.reason + meta.Message = test.message + // Set expectation based on test case. + expected.State = test.expectedState + expected.Reason = test.expectedReason + expected.FinishedAt = test.finishedAt + expected.ExitCode = test.exitCode + expected.Message = test.message + assert.Equal(t, expected, toCRIContainerStatus(meta), desc) + } +} + +func TestContainerStatus(t *testing.T) { + for desc, test := range map[string]struct { + exist bool + finishedAt int64 + reason string + expectedState runtime.ContainerState + expectErr bool + }{ + "container running": { + exist: true, + expectedState: runtime.ContainerState_CONTAINER_RUNNING, + }, + "container exited": { + exist: true, + finishedAt: time.Now().UnixNano(), + reason: "test-reason", + expectedState: runtime.ContainerState_CONTAINER_EXITED, + }, + "container not exist": { + exist: false, + expectErr: true, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + meta, expected := getContainerStatusTestData() + // Update metadata with test case. + meta.FinishedAt = test.finishedAt + meta.Reason = test.reason + if test.exist { + assert.NoError(t, c.containerStore.Create(*meta)) + } + resp, err := c.ContainerStatus(context.Background(), &runtime.ContainerStatusRequest{ContainerId: meta.ID}) + if test.expectErr { + assert.Error(t, err) + assert.Nil(t, resp) + continue + } + // Set expectation based on test case. + expected.FinishedAt = test.finishedAt + expected.Reason = test.reason + expected.State = test.expectedState + assert.Equal(t, expected, resp.GetStatus()) + } +} diff --git a/pkg/server/container_stop_test.go b/pkg/server/container_stop_test.go new file mode 100644 index 000000000..31db1bab8 --- /dev/null +++ b/pkg/server/container_stop_test.go @@ -0,0 +1,199 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "errors" + "testing" + "time" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" +) + +func TestWaitContainerStop(t *testing.T) { + id := "test-id" + timeout := 2 * stopCheckPollInterval + for desc, test := range map[string]struct { + metadata *metadata.ContainerMetadata + expectErr bool + }{ + "should return error if timeout exceeds": { + metadata: &metadata.ContainerMetadata{ + ID: id, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + }, + expectErr: true, + }, + "should not return error if container is removed before timeout": { + metadata: nil, + expectErr: false, + }, + "should not return error if container is stopped before timeout": { + metadata: &metadata.ContainerMetadata{ + ID: id, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + FinishedAt: time.Now().UnixNano(), + }, + expectErr: false, + }, + } { + c := newTestCRIContainerdService() + if test.metadata != nil { + assert.NoError(t, c.containerStore.Create(*test.metadata)) + } + err := c.waitContainerStop(id, timeout) + assert.Equal(t, test.expectErr, err != nil, desc) + } +} + +func TestStopContainer(t *testing.T) { + testID := "test-id" + testPid := uint32(1234) + testMetadata := metadata.ContainerMetadata{ + ID: testID, + Pid: testPid, + CreatedAt: time.Now().UnixNano(), + StartedAt: time.Now().UnixNano(), + } + testContainer := container.Container{ + ID: testID, + Pid: testPid, + Status: container.Status_RUNNING, + } + for desc, test := range map[string]struct { + metadata *metadata.ContainerMetadata + containerdContainer *container.Container + killErr error + deleteErr error + discardEvents int + expectErr bool + expectCalls []string + }{ + "should return error when container does not exist": { + metadata: nil, + expectErr: true, + expectCalls: []string{}, + }, + "should not return error when container is not running": { + metadata: &metadata.ContainerMetadata{ + ID: testID, + CreatedAt: time.Now().UnixNano(), + }, + expectErr: false, + expectCalls: []string{}, + }, + "should not return error if containerd container does not exist": { + metadata: &testMetadata, + expectErr: false, + expectCalls: []string{"kill"}, + }, + "should not return error if containerd container is killed": { + metadata: &testMetadata, + containerdContainer: &testContainer, + expectErr: false, + // deleted by the event monitor. + expectCalls: []string{"kill", "delete"}, + }, + "should not return error if containerd container is deleted": { + metadata: &testMetadata, + containerdContainer: &testContainer, + // discard killed events to force a delete. This is only + // for testing. Actually real containerd should only generate + // one EXIT event. + discardEvents: 1, + expectErr: false, + // one more delete from the event monitor. + expectCalls: []string{"kill", "delete", "delete"}, + }, + "should return error if kill failed": { + metadata: &testMetadata, + containerdContainer: &testContainer, + killErr: errors.New("random error"), + expectErr: true, + expectCalls: []string{"kill"}, + }, + "should return error if delete failed": { + metadata: &testMetadata, + containerdContainer: &testContainer, + deleteErr: errors.New("random error"), + discardEvents: 1, + expectErr: true, + expectCalls: []string{"kill", "delete"}, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fake := servertesting.NewFakeExecutionClient().WithEvents() + defer fake.Stop() + c.containerService = fake + + // Inject metadata. + if test.metadata != nil { + assert.NoError(t, c.containerStore.Create(*test.metadata)) + } + // Inject containerd container. + if test.containerdContainer != nil { + fake.SetFakeContainers([]container.Container{*test.containerdContainer}) + } + if test.killErr != nil { + fake.InjectError("kill", test.killErr) + } + if test.deleteErr != nil { + fake.InjectError("delete", test.deleteErr) + } + eventClient, err := fake.Events(context.Background(), &execution.EventsRequest{}) + assert.NoError(t, err) + // Start a simple test event monitor. + go func(e execution.ContainerService_EventsClient, discard int) { + for { + e, err := e.Recv() // nolint: vetshadow + if err != nil { + return + } + if discard > 0 { + discard-- + continue + } + c.handleEvent(e) + } + }(eventClient, test.discardEvents) + fake.ClearCalls() + // 1 second timeout should be enough for the unit test. + // TODO(random-liu): Use fake clock for this test. + resp, err := c.StopContainer(context.Background(), &runtime.StopContainerRequest{ + ContainerId: testID, + Timeout: 1, + }) + if test.expectErr { + assert.Error(t, err) + assert.Nil(t, resp) + } else { + assert.NoError(t, err) + assert.NotNil(t, resp) + } + assert.Equal(t, test.expectCalls, fake.GetCalledNames()) + } +} diff --git a/pkg/server/events.go b/pkg/server/events.go index 38446126c..8d4d82998 100644 --- a/pkg/server/events.go +++ b/pkg/server/events.go @@ -28,6 +28,10 @@ import ( // startEventMonitor starts an event monitor which monitors and handles all // container events. +// TODO(random-liu): [P1] Figure out: +// 1) Is it possible to drop event during containerd is running? +// 2) How to deal with containerd down? We should restart event monitor, and +// we should recover all container state. func (c *criContainerdService) startEventMonitor() error { events, err := c.containerService.Events(context.Background(), &execution.EventsRequest{}) if err != nil { @@ -35,25 +39,33 @@ func (c *criContainerdService) startEventMonitor() error { } go func() { for { - c.handleEvent(events) + c.handleEventStream(events) } }() return nil } -// handleEvent receives an event from contaienrd and handles the event. -func (c *criContainerdService) handleEvent(events execution.ContainerService_EventsClient) { +// handleEventStream receives an event from containerd and handles the event. +func (c *criContainerdService) handleEventStream(events execution.ContainerService_EventsClient) { + // TODO(random-liu): [P1] Should backoff on this error, or else this will + // cause a busy loop. e, err := events.Recv() if err != nil { glog.Errorf("Failed to receive event: %v", err) return } glog.V(2).Infof("Received container event: %+v", e) + c.handleEvent(e) + return +} + +// handleEvent handles a containerd event. +func (c *criContainerdService) handleEvent(e *container.Event) { switch e.Type { // If containerd-shim exits unexpectedly, there will be no corresponding event. // However, containerd could not retrieve container state in that case, so it's // fine to leave out that case for now. - // TODO(random-liu): [P2] Handle container-shim exit. + // TODO(random-liu): [P2] Handle containerd-shim exit. case container.Event_EXIT: meta, err := c.containerStore.Get(e.ID) if err != nil { @@ -61,7 +73,7 @@ func (c *criContainerdService) handleEvent(events execution.ContainerService_Eve return } if e.Pid != meta.Pid { - // Not init process dies, ignore the event. + // Non-init process died, ignore the event. return } // Delete the container from containerd. @@ -90,5 +102,4 @@ func (c *criContainerdService) handleEvent(events execution.ContainerService_Eve case container.Event_OOM: // TODO(random-liu): [P1] Handle OOM event. } - return } diff --git a/pkg/server/events_test.go b/pkg/server/events_test.go new file mode 100644 index 000000000..79d75d9ff --- /dev/null +++ b/pkg/server/events_test.go @@ -0,0 +1,154 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package server + +import ( + "fmt" + "testing" + "time" + + "github.com/containerd/containerd/api/services/execution" + "github.com/containerd/containerd/api/types/container" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" + "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" + + "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + servertesting "github.com/kubernetes-incubator/cri-containerd/pkg/server/testing" +) + +func TestHandleEvent(t *testing.T) { + testID := "test-id" + testPid := uint32(1234) + testCreatedAt := time.Now().UnixNano() + testStartedAt := time.Now().UnixNano() + // Container metadata in running state. + testMetadata := metadata.ContainerMetadata{ + ID: testID, + Name: "test-name", + SandboxID: "test-sandbox-id", + Pid: testPid, + CreatedAt: testCreatedAt, + StartedAt: testStartedAt, + } + testExitedAt := time.Now() + testExitEvent := container.Event{ + ID: testID, + Type: container.Event_EXIT, + Pid: testPid, + ExitStatus: 1, + ExitedAt: testExitedAt, + } + testFinishedMetadata := metadata.ContainerMetadata{ + ID: testID, + Name: "test-name", + SandboxID: "test-sandbox-id", + Pid: 0, + CreatedAt: testCreatedAt, + StartedAt: testStartedAt, + FinishedAt: testExitedAt.UnixNano(), + ExitCode: 1, + } + assert.Equal(t, runtime.ContainerState_CONTAINER_RUNNING, testMetadata.State()) + testContainerdContainer := container.Container{ + ID: testID, + Pid: testPid, + Status: container.Status_RUNNING, + } + + for desc, test := range map[string]struct { + event *container.Event + metadata *metadata.ContainerMetadata + containerdContainer *container.Container + containerdErr error + expected *metadata.ContainerMetadata + }{ + "should not update state when no corresponding metadata for event": { + event: &testExitEvent, + expected: nil, + }, + "should not update state when exited process is not init process": { + event: &container.Event{ + ID: testID, + Type: container.Event_EXIT, + Pid: 9999, + ExitStatus: 1, + ExitedAt: testExitedAt, + }, + metadata: &testMetadata, + containerdContainer: &testContainerdContainer, + expected: &testMetadata, + }, + "should not update state when fail to delete containerd container": { + event: &testExitEvent, + metadata: &testMetadata, + containerdContainer: &testContainerdContainer, + containerdErr: fmt.Errorf("random error"), + expected: &testMetadata, + }, + "should not update state for non-exited events": { + event: &container.Event{ + ID: testID, + Type: container.Event_OOM, + Pid: testPid, + ExitStatus: 1, + ExitedAt: testExitedAt, + }, + metadata: &testMetadata, + containerdContainer: &testContainerdContainer, + expected: &testMetadata, + }, + "should update state when containerd container is already deleted": { + event: &testExitEvent, + metadata: &testMetadata, + expected: &testFinishedMetadata, + }, + "should update state when delete containerd container successfully": { + event: &testExitEvent, + metadata: &testMetadata, + containerdContainer: &testContainerdContainer, + expected: &testFinishedMetadata, + }, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fake := c.containerService.(*servertesting.FakeExecutionClient) + e, err := fake.Events(context.Background(), &execution.EventsRequest{}) + assert.NoError(t, err) + fakeEvents := e.(*servertesting.EventClient) + // Inject event. + if test.event != nil { + fakeEvents.Events <- test.event + } + // Inject metadata. + if test.metadata != nil { + // Make sure that original data will not be changed. + assert.NoError(t, c.containerStore.Create(*test.metadata)) + } + // Inject containerd container. + if test.containerdContainer != nil { + fake.SetFakeContainers([]container.Container{*test.containerdContainer}) + } + // Inject containerd delete error. + if test.containerdErr != nil { + fake.InjectError("delete", test.containerdErr) + } + c.handleEventStream(e) + got, _ := c.containerStore.Get(testID) + assert.Equal(t, test.expected, got) + } +} diff --git a/pkg/server/helpers_test.go b/pkg/server/helpers_test.go index b1ccb40cc..0b2370f1d 100644 --- a/pkg/server/helpers_test.go +++ b/pkg/server/helpers_test.go @@ -17,13 +17,106 @@ limitations under the License. package server import ( + "fmt" + "io" + "os" + "syscall" "testing" "github.com/stretchr/testify/assert" + "golang.org/x/net/context" "github.com/kubernetes-incubator/cri-containerd/pkg/metadata" + ostesting "github.com/kubernetes-incubator/cri-containerd/pkg/os/testing" ) +func TestPrepareStreamingPipes(t *testing.T) { + for desc, test := range map[string]struct { + stdin string + stdout string + stderr string + }{ + "empty stdin": { + stdout: "/test/stdout", + stderr: "/test/stderr", + }, + "empty stdout/stderr": { + stdin: "/test/stdin", + }, + "non-empty stdio": { + stdin: "/test/stdin", + stdout: "/test/stdout", + stderr: "/test/stderr", + }, + "empty stdio": {}, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fakeOS := c.os.(*ostesting.FakeOS) + fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + expectFlag := syscall.O_RDONLY | syscall.O_CREAT | syscall.O_NONBLOCK + if fn == test.stdin { + expectFlag = syscall.O_WRONLY | syscall.O_CREAT | syscall.O_NONBLOCK + } + assert.Equal(t, expectFlag, flag) + assert.Equal(t, os.FileMode(0700), perm) + return nopReadWriteCloser{}, nil + } + i, o, e, err := c.prepareStreamingPipes(context.Background(), test.stdin, test.stdout, test.stderr) + assert.NoError(t, err) + assert.Equal(t, test.stdin != "", i != nil) + assert.Equal(t, test.stdout != "", o != nil) + assert.Equal(t, test.stderr != "", e != nil) + } +} + +type closeTestReadWriteCloser struct { + CloseFn func() error + nopReadWriteCloser +} + +func (c closeTestReadWriteCloser) Close() error { + return c.CloseFn() +} + +func TestPrepareStreamingPipesError(t *testing.T) { + stdin, stdout, stderr := "/test/stdin", "/test/stdout", "/test/stderr" + for desc, inject := range map[string]map[string]error{ + "should cleanup on stdin error": {stdin: fmt.Errorf("stdin error")}, + "should cleanup on stdout error": {stdout: fmt.Errorf("stdout error")}, + "should cleanup on stderr error": {stderr: fmt.Errorf("stderr error")}, + } { + t.Logf("TestCase %q", desc) + c := newTestCRIContainerdService() + fakeOS := c.os.(*ostesting.FakeOS) + openFlags := map[string]bool{ + stdin: false, + stdout: false, + stderr: false, + } + fakeOS.OpenFifoFn = func(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) { + if inject[fn] != nil { + return nil, inject[fn] + } + openFlags[fn] = !openFlags[fn] + testCloser := closeTestReadWriteCloser{} + testCloser.CloseFn = func() error { + openFlags[fn] = !openFlags[fn] + return nil + } + return testCloser, nil + } + i, o, e, err := c.prepareStreamingPipes(context.Background(), stdin, stdout, stderr) + assert.Error(t, err) + assert.Nil(t, i) + assert.Nil(t, o) + assert.Nil(t, e) + assert.False(t, openFlags[stdin]) + assert.False(t, openFlags[stdout]) + assert.False(t, openFlags[stderr]) + } +} + func TestGetSandbox(t *testing.T) { c := newTestCRIContainerdService() testID := "abcdefg" diff --git a/pkg/server/service_test.go b/pkg/server/service_test.go index 9ac434ed3..c753815dd 100644 --- a/pkg/server/service_test.go +++ b/pkg/server/service_test.go @@ -39,8 +39,9 @@ import ( type nopReadWriteCloser struct{} -func (nopReadWriteCloser) Read(p []byte) (n int, err error) { return len(p), nil } -func (nopReadWriteCloser) Write(p []byte) (n int, err error) { return len(p), nil } +// Return error directly to avoid read/write. +func (nopReadWriteCloser) Read(p []byte) (n int, err error) { return 0, io.EOF } +func (nopReadWriteCloser) Write(p []byte) (n int, err error) { return 0, io.ErrShortWrite } func (nopReadWriteCloser) Close() error { return nil } const testRootDir = "/test/rootfs" @@ -48,12 +49,14 @@ const testRootDir = "/test/rootfs" // newTestCRIContainerdService creates a fake criContainerdService for test. func newTestCRIContainerdService() *criContainerdService { return &criContainerdService{ - os: ostesting.NewFakeOS(), - rootDir: testRootDir, - containerService: servertesting.NewFakeExecutionClient(), - sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), - sandboxNameIndex: registrar.NewRegistrar(), - sandboxIDIndex: truncindex.NewTruncIndex(nil), + os: ostesting.NewFakeOS(), + rootDir: testRootDir, + containerService: servertesting.NewFakeExecutionClient(), + sandboxStore: metadata.NewSandboxStore(store.NewMetadataStore()), + sandboxNameIndex: registrar.NewRegistrar(), + sandboxIDIndex: truncindex.NewTruncIndex(nil), + containerStore: metadata.NewContainerStore(store.NewMetadataStore()), + containerNameIndex: registrar.NewRegistrar(), } } diff --git a/pkg/server/testing/fake_execution_client.go b/pkg/server/testing/fake_execution_client.go index 73ff3da89..a241f97e6 100644 --- a/pkg/server/testing/fake_execution_client.go +++ b/pkg/server/testing/fake_execution_client.go @@ -51,7 +51,10 @@ type EventClient struct { // Recv is a test implementation of Recv func (cli *EventClient) Recv() (*container.Event, error) { - event := <-cli.Events + event, ok := <-cli.Events + if !ok { + return nil, fmt.Errorf("event channel closed") + } return event, nil } @@ -76,6 +79,18 @@ func NewFakeExecutionClient() *FakeExecutionClient { } } +// Stop the fake execution service. Needed when event is enabled. +func (f *FakeExecutionClient) Stop() { + if f.eventsQueue != nil { + close(f.eventsQueue) + } + f.Lock() + defer f.Unlock() + for _, client := range f.eventClients { + close(client.Events) + } +} + // WithEvents setup events publisher for FakeExecutionClient func (f *FakeExecutionClient) WithEvents() *FakeExecutionClient { f.eventsQueue = make(chan *container.Event, 1024) @@ -154,6 +169,13 @@ func (f *FakeExecutionClient) GetCalledNames() []string { return names } +// ClearCalls clear all call detail. +func (f *FakeExecutionClient) ClearCalls() { + f.Lock() + defer f.Unlock() + f.called = []CalledDetail{} +} + // GetCalledDetails get detail of each call. func (f *FakeExecutionClient) GetCalledDetails() []CalledDetail { f.Lock()