diff --git a/cmd/containerd-stress/cri_worker.go b/cmd/containerd-stress/cri_worker.go
new file mode 100644
index 000000000..6aa7ee824
--- /dev/null
+++ b/cmd/containerd-stress/cri_worker.go
@@ -0,0 +1,166 @@
+/*
+   Copyright The containerd Authors.
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/sirupsen/logrus"
+
+	internalapi "github.com/containerd/containerd/integration/cri-api/pkg/apis"
+	"github.com/containerd/containerd/pkg/cri/util"
+	runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
+)
+
+type criWorker struct {
+	id       int
+	wg       *sync.WaitGroup
+	count    int
+	failures int
+	client   internalapi.RuntimeService
+
+	commit         string
+	runtimeHandler string
+	snapshotter    string
+}
+
+const podNamespaceLabel = "pod.namespace"
+
+func (w *criWorker) incCount() {
+	w.count++
+}
+
+func (w *criWorker) getCount() int {
+	return w.count
+}
+
+func (w *criWorker) incFailures() {
+	w.failures++
+}
+
+func (w *criWorker) getFailures() int {
+	return w.failures
+}
+
+func (w *criWorker) run(ctx, tctx context.Context) {
+	defer func() {
+		w.wg.Done()
+		logrus.Infof("worker %d finished", w.id)
+	}()
+	for {
+		select {
+		case <-tctx.Done():
+			return
+		default:
+		}
+
+		w.count++
+		id := w.getID()
+		logrus.Debugf("starting container %s", id)
+		start := time.Now()
+		if err := w.runSandbox(tctx, ctx, id); err != nil {
+			if err != context.DeadlineExceeded ||
+				!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
+				w.failures++
+				logrus.WithError(err).Errorf("running container %s", id)
+				errCounter.WithValues(err.Error()).Inc()
+
+			}
+			continue
+		}
+		// only log times on success so we don't skew the results with failures that return quickly
+		ct.WithValues(w.commit).UpdateSince(start)
+	}
+}
+
+func (w *criWorker) runSandbox(tctx, ctx context.Context, id string) (err error) {
+
+	sbConfig := &runtime.PodSandboxConfig{
+		Metadata: &runtime.PodSandboxMetadata{
+			Name: id,
+			// Using random id as uuid is good enough for local
+			// integration test.
+			Uid:       util.GenerateID(),
+			Namespace: "stress",
+		},
+		Labels: map[string]string{podNamespaceLabel: stressNs},
+		Linux:  &runtime.LinuxPodSandboxConfig{},
+	}
+
+	sb, err := w.client.RunPodSandbox(sbConfig, w.runtimeHandler)
+	if err != nil {
+		w.failures++
+		return err
+	}
+	defer func() {
+		w.client.StopPodSandbox(sb)
+		w.client.RemovePodSandbox(sb)
+	}()
+
+	// verify the sandbox reaches the ready state
+
+	ticker := time.NewTicker(250 * time.Millisecond)
+	quit := make(chan struct{})
+	go func() {
+		for {
+			select {
+			case <-tctx.Done():
+				close(quit)
+				return
+			case <-ticker.C:
+				// poll the sandbox status until it reports ready
+				status, err := w.client.PodSandboxStatus(sb)
+				if err == nil && status.GetState() == runtime.PodSandboxState_SANDBOX_READY {
+					close(quit)
+					return
+				}
+			case <-quit:
+				ticker.Stop()
+				return
+			}
+		}
+	}()
+
+	return nil
+}
+
+func (w *criWorker) getID() string {
+	return fmt.Sprintf("%d-%d", w.id, w.count)
+}
+
+// criCleanup cleans up any pod sandboxes in the "stress" namespace before the test run
+func criCleanup(ctx context.Context, client internalapi.RuntimeService) error {
+	filter := &runtime.PodSandboxFilter{
+		LabelSelector: map[string]string{podNamespaceLabel: stressNs},
+	}
+
+	sandboxes, err := client.ListPodSandbox(filter)
+	if err != nil {
+		return err
+	}
+
+	for _, sb := range sandboxes {
+		client.StopPodSandbox(sb.Id)
+		client.RemovePodSandbox(sb.Id)
+	}
+
+	return nil
+}
diff --git a/cmd/containerd-stress/exec_worker.go b/cmd/containerd-stress/exec_worker.go
index 6a30f5c18..66aaa4267 100644
--- a/cmd/containerd-stress/exec_worker.go
+++ b/cmd/containerd-stress/exec_worker.go
@@ -31,7 +31,7 @@ import (
 )
 
 type execWorker struct {
-	worker
+	ctrWorker
 }
 
 func (w *execWorker) exec(ctx, tctx context.Context) {
diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go
index 01c6c38c7..da0cd0200 100644
--- a/cmd/containerd-stress/main.go
+++ b/cmd/containerd-stress/main.go
@@ -29,6 +29,7 @@ import (
 	"time"
 
 	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/integration/remote"
 	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/plugin"
 	metrics "github.com/docker/go-metrics"
@@ -43,8 +44,12 @@ var (
 	binarySizeGauge metrics.LabeledGauge
 )
 
+const (
+	stressNs string = "stress"
+)
+
 func init() {
-	ns := metrics.NewNamespace("stress", "", nil)
+	ns := metrics.NewNamespace(stressNs, "", nil)
 	// if you want more fine grained metrics then you can drill down with the metrics in prom that
 	// containerd is outputting
 	ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit")
@@ -59,6 +64,14 @@ func init() {
 	}
 }
 
+type worker interface {
+	run(ctx, tctx context.Context)
+	getCount() int
+	incCount()
+	getFailures() int
+	incFailures()
+}
+
 type run struct {
 	total    int
 	failures int
@@ -79,10 +92,10 @@ func (r *run) seconds() float64 {
 	return r.ended.Sub(r.started).Seconds()
 }
 
-func (r *run) gather(workers []*worker) *result {
+func (r *run) gather(workers []worker) *result {
 	for _, w := range workers {
-		r.total += w.count
-		r.failures += w.failures
+		r.total += w.getCount()
+		r.failures += w.getFailures()
 	}
 	sec := r.seconds()
 	return &result{
@@ -125,6 +138,10 @@ func main() {
 			Value: 1,
 			Usage: "set the concurrency of the stress test",
 		},
+		cli.BoolFlag{
+			Name:  "cri",
+			Usage: "utilize CRI to create pods for the stress test. This requires a runtime that matches the CRI runtime handler. Example: --runtime runc",
+		},
 		cli.DurationFlag{
 			Name:  "duration,d",
 			Value: 1 * time.Minute,
@@ -132,7 +149,7 @@ func main() {
 		},
 		cli.BoolFlag{
 			Name:  "exec",
-			Usage: "add execs to the stress tests",
+			Usage: "add execs to the stress tests (non-CRI only)",
 		},
 		cli.StringFlag{
 			Name:  "image,i",
@@ -175,6 +192,7 @@ func main() {
 			Address:     context.GlobalString("address"),
 			Duration:    context.GlobalDuration("duration"),
 			Concurrency: context.GlobalInt("concurrent"),
+			CRI:         context.GlobalBool("cri"),
 			Exec:        context.GlobalBool("exec"),
 			Image:       context.GlobalString("image"),
 			JSON:        context.GlobalBool("json"),
@@ -185,6 +203,11 @@ func main() {
 		if config.Metrics != "" {
 			return serve(config)
 		}
+
+		if config.CRI {
+			return criTest(config)
+		}
+
 		return test(config)
 	}
 	if err := app.Run(os.Args); err != nil {
@@ -195,6 +218,7 @@
 
 type config struct {
 	Concurrency int
+	CRI         bool
 	Duration    time.Duration
 	Address     string
 	Exec        bool
@@ -216,9 +240,91 @@ func serve(c config) error {
 		}
 	}()
 	checkBinarySizes()
+
+	if c.CRI {
+		return criTest(c)
+	}
+
 	return test(c)
 }
 
+func criTest(c config) error {
+	var (
+		timeout     = 1 * time.Minute
+		wg          sync.WaitGroup
+		ctx         = namespaces.WithNamespace(context.Background(), stressNs)
+		criEndpoint = "unix:///run/containerd/containerd.sock"
+	)
+
+	client, err := remote.NewRuntimeService(criEndpoint, timeout)
+	if err != nil {
+		return fmt.Errorf("failed to create runtime service: %w", err)
+	}
+
+	if err := criCleanup(ctx, client); err != nil {
+		return err
+	}
+
+	tctx, cancel := context.WithTimeout(ctx, c.Duration)
+	go func() {
+		s := make(chan os.Signal, 1)
+		signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
+		<-s
+		cancel()
+	}()
+
+	// get runtime version:
+	version, err := client.Version("")
+	if err != nil {
+		return fmt.Errorf("failed to get runtime version: %w", err)
+	}
+	var (
+		workers []worker
+		r       = &run{}
+	)
+	logrus.Info("starting stress test run...")
+	// create the workers along with their spec
+	for i := 0; i < c.Concurrency; i++ {
+		wg.Add(1)
+		w := &criWorker{
+			id:             i,
+			wg:             &wg,
+			client:         client,
+			commit:         fmt.Sprintf("%s-%s", version.RuntimeName, version.RuntimeVersion),
+			runtimeHandler: c.Runtime,
+			snapshotter:    c.Snapshotter,
+		}
+		workers = append(workers, w)
+	}
+
+	// start the timer and run the workers
+	r.start()
+	for _, w := range workers {
+		go w.run(ctx, tctx)
+	}
+	// wait and end the timer
+	wg.Wait()
+	r.end()
+
+	results := r.gather(workers)
+	logrus.Infof("ending test run in %0.3f seconds", results.Seconds)
+
+	logrus.WithField("failures", r.failures).Infof(
+		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
+		results.Total,
+		results.Seconds,
+		results.ContainersPerSecond,
+		results.SecondsPerContainer,
+	)
+	if c.JSON {
+		if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func test(c config) error {
 	var (
 		wg sync.WaitGroup
@@ -233,11 +339,17 @@ func test(c config) error {
 	if err := cleanup(ctx, client); err != nil {
 		return err
 	}
+
 	logrus.Infof("pulling %s", c.Image)
 	image, err := client.Pull(ctx, c.Image, containerd.WithPullUnpack, containerd.WithPullSnapshotter(c.Snapshotter))
 	if err != nil {
 		return err
 	}
+
+	v, err := client.Version(ctx)
+	if err != nil {
+		return err
+	}
+
 	tctx, cancel := context.WithTimeout(ctx, c.Duration)
 	go func() {
 		s := make(chan os.Signal, 1)
@@ -247,18 +359,15 @@
 	}()
 
 	var (
-		workers []*worker
+		workers []worker
 		r       = &run{}
 	)
 	logrus.Info("starting stress test run...")
-	v, err := client.Version(ctx)
-	if err != nil {
-		return err
-	}
 	// create the workers along with their spec
 	for i := 0; i < c.Concurrency; i++ {
 		wg.Add(1)
-		w := &worker{
+
+		w := &ctrWorker{
 			id:    i,
 			wg:    &wg,
 			image: image,
@@ -273,7 +382,7 @@ func test(c config) error {
 	for i := c.Concurrency; i < c.Concurrency+c.Concurrency; i++ {
 		wg.Add(1)
 		exec = &execWorker{
-			worker: worker{
+			ctrWorker: ctrWorker{
 				id:    i,
 				wg:    &wg,
 				image: image,
diff --git a/cmd/containerd-stress/worker.go b/cmd/containerd-stress/worker.go
index cfa99a9be..69be22ce7 100644
--- a/cmd/containerd-stress/worker.go
+++ b/cmd/containerd-stress/worker.go
@@ -29,7 +29,7 @@ import (
 	"github.com/sirupsen/logrus"
 )
 
-type worker struct {
+type ctrWorker struct {
 	id       int
 	wg       *sync.WaitGroup
 	count    int
@@ -41,7 +41,7 @@ type worker struct {
 	snapshotter string
 }
 
-func (w *worker) run(ctx, tctx context.Context) {
+func (w *ctrWorker) run(ctx, tctx context.Context) {
 	defer func() {
 		w.wg.Done()
 		logrus.Infof("worker %d finished", w.id)
@@ -72,7 +72,23 @@ func (w *worker) run(ctx, tctx context.Context) {
 	}
 }
 
-func (w *worker) runContainer(ctx context.Context, id string) (err error) {
+func (w *ctrWorker) incCount() {
+	w.count++
+}
+
+func (w *ctrWorker) getCount() int {
+	return w.count
+}
+
+func (w *ctrWorker) incFailures() {
+	w.failures++
+}
+
+func (w *ctrWorker) getFailures() int {
+	return w.failures
+}
+
+func (w *ctrWorker) runContainer(ctx context.Context, id string) (err error) {
 	// fix up cgroups path for a default config
 	c, err := w.client.NewContainer(ctx, id,
 		containerd.WithSnapshotter(w.snapshotter),
@@ -115,6 +131,6 @@ func (w *worker) runContainer(ctx context.Context, id string) (err error) {
 	return nil
 }
 
-func (w *worker) getID() string {
+func (w *ctrWorker) getID() string {
 	return fmt.Sprintf("%d-%d", w.id, w.count)
 }
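
Usage note: with these changes, a CRI-mode stress run talks to the CRI endpoint hard-coded in criTest (unix:///run/containerd/containerd.sock) and creates pod sandboxes in the "stress" namespace. A possible invocation, sketched only from the flags added or shown in this diff (exact runtime handler and durations depend on the local containerd setup), might look like:

    containerd-stress --cri --runtime runc --duration 2m --json

The --exec flag remains specific to the non-CRI path, as its updated usage string indicates.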