Merge pull request #6931 from egernst/cri-stress

This commit is contained in:
Samuel Karp 2022-08-10 17:59:48 -07:00 committed by GitHub
commit f87a1b09fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 308 additions and 17 deletions

View File

@ -0,0 +1,166 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/sirupsen/logrus"
internalapi "github.com/containerd/containerd/integration/cri-api/pkg/apis"
"github.com/containerd/containerd/pkg/cri/util"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
)
type criWorker struct {
id int
wg *sync.WaitGroup
count int
failures int
client internalapi.RuntimeService
commit string
runtimeHandler string
snapshotter string
}
const podNamespaceLabel = "pod.namespace"
func (w *criWorker) incCount() {
w.count++
}
func (w *criWorker) getCount() int {
return w.count
}
func (w *criWorker) incFailures() {
w.failures++
}
func (w *criWorker) getFailures() int {
return w.failures
}
func (w *criWorker) run(ctx, tctx context.Context) {
defer func() {
w.wg.Done()
logrus.Infof("worker %d finished", w.id)
}()
for {
select {
case <-tctx.Done():
return
default:
}
w.count++
id := w.getID()
logrus.Debugf("starting container %s", id)
start := time.Now()
if err := w.runSandbox(tctx, ctx, id); err != nil {
if err != context.DeadlineExceeded ||
!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
w.failures++
logrus.WithError(err).Errorf("running container %s", id)
errCounter.WithValues(err.Error()).Inc()
}
continue
}
// only log times are success so we don't scew the results from failures that go really fast
ct.WithValues(w.commit).UpdateSince(start)
}
}
func (w *criWorker) runSandbox(tctx, ctx context.Context, id string) (err error) {
sbConfig := &runtime.PodSandboxConfig{
Metadata: &runtime.PodSandboxMetadata{
Name: id,
// Using random id as uuid is good enough for local
// integration test.
Uid: util.GenerateID(),
Namespace: "stress",
},
Labels: map[string]string{podNamespaceLabel: stressNs},
Linux: &runtime.LinuxPodSandboxConfig{},
}
sb, err := w.client.RunPodSandbox(sbConfig, w.runtimeHandler)
if err != nil {
w.failures++
return err
}
defer func() {
w.client.StopPodSandbox(sb)
w.client.RemovePodSandbox(sb)
}()
// verify it is running ?
ticker := time.NewTicker(250 * time.Millisecond)
quit := make(chan struct{})
go func() {
for {
select {
case <-tctx.Done():
close(quit)
return
case <-ticker.C:
// do stuff
status, err := w.client.PodSandboxStatus(sb)
if err != nil && status.GetState() == runtime.PodSandboxState_SANDBOX_READY {
close(quit)
return
}
case <-quit:
ticker.Stop()
return
}
}
}()
return nil
}
func (w *criWorker) getID() string {
return fmt.Sprintf("%d-%d", w.id, w.count)
}
// cleanup cleans up any containers in the "stress" namespace before the test run
func criCleanup(ctx context.Context, client internalapi.RuntimeService) error {
filter := &runtime.PodSandboxFilter{
LabelSelector: map[string]string{podNamespaceLabel: stressNs},
}
sandboxes, err := client.ListPodSandbox(filter)
if err != nil {
return err
}
for _, sb := range sandboxes {
client.StopPodSandbox(sb.Id)
client.RemovePodSandbox(sb.Id)
}
return nil
}

View File

@ -31,7 +31,7 @@ import (
) )
type execWorker struct { type execWorker struct {
worker ctrWorker
} }
func (w *execWorker) exec(ctx, tctx context.Context) { func (w *execWorker) exec(ctx, tctx context.Context) {

View File

@ -29,6 +29,7 @@ import (
"time" "time"
"github.com/containerd/containerd" "github.com/containerd/containerd"
"github.com/containerd/containerd/integration/remote"
"github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
metrics "github.com/docker/go-metrics" metrics "github.com/docker/go-metrics"
@ -43,8 +44,12 @@ var (
binarySizeGauge metrics.LabeledGauge binarySizeGauge metrics.LabeledGauge
) )
const (
stressNs string = "stress"
)
func init() { func init() {
ns := metrics.NewNamespace("stress", "", nil) ns := metrics.NewNamespace(stressNs, "", nil)
// if you want more fine grained metrics then you can drill down with the metrics in prom that // if you want more fine grained metrics then you can drill down with the metrics in prom that
// containerd is outputting // containerd is outputting
ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit") ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit")
@ -59,6 +64,14 @@ func init() {
} }
} }
type worker interface {
run(ctx, tcxt context.Context)
getCount() int
incCount()
getFailures() int
incFailures()
}
type run struct { type run struct {
total int total int
failures int failures int
@ -79,10 +92,10 @@ func (r *run) seconds() float64 {
return r.ended.Sub(r.started).Seconds() return r.ended.Sub(r.started).Seconds()
} }
func (r *run) gather(workers []*worker) *result { func (r *run) gather(workers []worker) *result {
for _, w := range workers { for _, w := range workers {
r.total += w.count r.total += w.getCount()
r.failures += w.failures r.failures += w.getFailures()
} }
sec := r.seconds() sec := r.seconds()
return &result{ return &result{
@ -125,6 +138,10 @@ func main() {
Value: 1, Value: 1,
Usage: "set the concurrency of the stress test", Usage: "set the concurrency of the stress test",
}, },
cli.BoolFlag{
Name: "cri",
Usage: "utilize CRI to create pods for the stress test. This requires a runtime that matches CRI runtime handler. Example: --runtime runc",
},
cli.DurationFlag{ cli.DurationFlag{
Name: "duration,d", Name: "duration,d",
Value: 1 * time.Minute, Value: 1 * time.Minute,
@ -132,7 +149,7 @@ func main() {
}, },
cli.BoolFlag{ cli.BoolFlag{
Name: "exec", Name: "exec",
Usage: "add execs to the stress tests", Usage: "add execs to the stress tests (non-CRI only)",
}, },
cli.StringFlag{ cli.StringFlag{
Name: "image,i", Name: "image,i",
@ -175,6 +192,7 @@ func main() {
Address: context.GlobalString("address"), Address: context.GlobalString("address"),
Duration: context.GlobalDuration("duration"), Duration: context.GlobalDuration("duration"),
Concurrency: context.GlobalInt("concurrent"), Concurrency: context.GlobalInt("concurrent"),
CRI: context.GlobalBool("cri"),
Exec: context.GlobalBool("exec"), Exec: context.GlobalBool("exec"),
Image: context.GlobalString("image"), Image: context.GlobalString("image"),
JSON: context.GlobalBool("json"), JSON: context.GlobalBool("json"),
@ -185,6 +203,11 @@ func main() {
if config.Metrics != "" { if config.Metrics != "" {
return serve(config) return serve(config)
} }
if config.CRI {
return criTest(config)
}
return test(config) return test(config)
} }
if err := app.Run(os.Args); err != nil { if err := app.Run(os.Args); err != nil {
@ -195,6 +218,7 @@ func main() {
type config struct { type config struct {
Concurrency int Concurrency int
CRI bool
Duration time.Duration Duration time.Duration
Address string Address string
Exec bool Exec bool
@ -216,9 +240,91 @@ func serve(c config) error {
} }
}() }()
checkBinarySizes() checkBinarySizes()
if c.CRI {
return criTest(c)
}
return test(c) return test(c)
} }
func criTest(c config) error {
var (
timeout = 1 * time.Minute
wg sync.WaitGroup
ctx = namespaces.WithNamespace(context.Background(), stressNs)
criEndpoint = "unix:///run/containerd/containerd.sock"
)
client, err := remote.NewRuntimeService(criEndpoint, timeout)
if err != nil {
return fmt.Errorf("failed to create runtime service: %w", err)
}
if err := criCleanup(ctx, client); err != nil {
return err
}
tctx, cancel := context.WithTimeout(ctx, c.Duration)
go func() {
s := make(chan os.Signal, 1)
signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
<-s
cancel()
}()
// get runtime version:
version, err := client.Version("")
if err != nil {
return fmt.Errorf("failed to get runtime version: %w", err)
}
var (
workers []worker
r = &run{}
)
logrus.Info("starting stress test run...")
// create the workers along with their spec
for i := 0; i < c.Concurrency; i++ {
wg.Add(1)
w := &criWorker{
id: i,
wg: &wg,
client: client,
commit: fmt.Sprintf("%s-%s", version.RuntimeName, version.RuntimeVersion),
runtimeHandler: c.Runtime,
snapshotter: c.Snapshotter,
}
workers = append(workers, w)
}
// start the timer and run the worker
r.start()
for _, w := range workers {
go w.run(ctx, tctx)
}
// wait and end the timer
wg.Wait()
r.end()
results := r.gather(workers)
logrus.Infof("ending test run in %0.3f seconds", results.Seconds)
logrus.WithField("failures", r.failures).Infof(
"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
results.Total,
results.Seconds,
results.ContainersPerSecond,
results.SecondsPerContainer,
)
if c.JSON {
if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
return err
}
}
return nil
}
func test(c config) error { func test(c config) error {
var ( var (
wg sync.WaitGroup wg sync.WaitGroup
@ -233,11 +339,17 @@ func test(c config) error {
if err := cleanup(ctx, client); err != nil { if err := cleanup(ctx, client); err != nil {
return err return err
} }
logrus.Infof("pulling %s", c.Image) logrus.Infof("pulling %s", c.Image)
image, err := client.Pull(ctx, c.Image, containerd.WithPullUnpack, containerd.WithPullSnapshotter(c.Snapshotter)) image, err := client.Pull(ctx, c.Image, containerd.WithPullUnpack, containerd.WithPullSnapshotter(c.Snapshotter))
if err != nil { if err != nil {
return err return err
} }
v, err := client.Version(ctx)
if err != nil {
return err
}
tctx, cancel := context.WithTimeout(ctx, c.Duration) tctx, cancel := context.WithTimeout(ctx, c.Duration)
go func() { go func() {
s := make(chan os.Signal, 1) s := make(chan os.Signal, 1)
@ -247,18 +359,15 @@ func test(c config) error {
}() }()
var ( var (
workers []*worker workers []worker
r = &run{} r = &run{}
) )
logrus.Info("starting stress test run...") logrus.Info("starting stress test run...")
v, err := client.Version(ctx)
if err != nil {
return err
}
// create the workers along with their spec // create the workers along with their spec
for i := 0; i < c.Concurrency; i++ { for i := 0; i < c.Concurrency; i++ {
wg.Add(1) wg.Add(1)
w := &worker{
w := &ctrWorker{
id: i, id: i,
wg: &wg, wg: &wg,
image: image, image: image,
@ -273,7 +382,7 @@ func test(c config) error {
for i := c.Concurrency; i < c.Concurrency+c.Concurrency; i++ { for i := c.Concurrency; i < c.Concurrency+c.Concurrency; i++ {
wg.Add(1) wg.Add(1)
exec = &execWorker{ exec = &execWorker{
worker: worker{ ctrWorker: ctrWorker{
id: i, id: i,
wg: &wg, wg: &wg,
image: image, image: image,

View File

@ -29,7 +29,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
type worker struct { type ctrWorker struct {
id int id int
wg *sync.WaitGroup wg *sync.WaitGroup
count int count int
@ -41,7 +41,7 @@ type worker struct {
snapshotter string snapshotter string
} }
func (w *worker) run(ctx, tctx context.Context) { func (w *ctrWorker) run(ctx, tctx context.Context) {
defer func() { defer func() {
w.wg.Done() w.wg.Done()
logrus.Infof("worker %d finished", w.id) logrus.Infof("worker %d finished", w.id)
@ -72,7 +72,23 @@ func (w *worker) run(ctx, tctx context.Context) {
} }
} }
func (w *worker) runContainer(ctx context.Context, id string) (err error) { func (w *ctrWorker) incCount() {
w.count++
}
func (w *ctrWorker) getCount() int {
return w.count
}
func (w *ctrWorker) incFailures() {
w.failures++
}
func (w *ctrWorker) getFailures() int {
return w.failures
}
func (w *ctrWorker) runContainer(ctx context.Context, id string) (err error) {
// fix up cgroups path for a default config // fix up cgroups path for a default config
c, err := w.client.NewContainer(ctx, id, c, err := w.client.NewContainer(ctx, id,
containerd.WithSnapshotter(w.snapshotter), containerd.WithSnapshotter(w.snapshotter),
@ -115,6 +131,6 @@ func (w *worker) runContainer(ctx context.Context, id string) (err error) {
return nil return nil
} }
func (w *worker) getID() string { func (w *ctrWorker) getID() string {
return fmt.Sprintf("%d-%d", w.id, w.count) return fmt.Sprintf("%d-%d", w.id, w.count)
} }