From fd75f1e52fb3614808a78c1036b545c0903023bf Mon Sep 17 00:00:00 2001
From: Michael Crosby
Date: Thu, 10 Aug 2017 15:40:01 -0400
Subject: [PATCH] Add stress test tool

This adds a `stress` binary to help stress test containerd. It differs
from a benchmarking tool in that it only reports a simple summary at the
end. It is built to run long, multi-hour/day stress tests across builds
of containerd.

Signed-off-by: Michael Crosby
---
 Makefile           |   2 +-
 cmd/stress/main.go | 273 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 274 insertions(+), 1 deletion(-)
 create mode 100644 cmd/stress/main.go

diff --git a/Makefile b/Makefile
index ea5cfb4d5..5813a1a38 100644
--- a/Makefile
+++ b/Makefile
@@ -48,7 +48,7 @@ TEST_REQUIRES_ROOT_PACKAGES=$(filter \
 	)
 
 # Project binaries.
-COMMANDS=ctr containerd
+COMMANDS=ctr containerd stress
 ifneq ("$(GOOS)", "windows")
 	COMMANDS += containerd-shim
 endif
diff --git a/cmd/stress/main.go b/cmd/stress/main.go
new file mode 100644
index 000000000..c24659a6f
--- /dev/null
+++ b/cmd/stress/main.go
@@ -0,0 +1,273 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/signal"
+	"runtime"
+	"strings"
+	"sync"
+	"syscall"
+	"time"
+
+	"github.com/containerd/containerd"
+	"github.com/containerd/containerd/namespaces"
+	specs "github.com/opencontainers/runtime-spec/specs-go"
+	"github.com/sirupsen/logrus"
+	"github.com/urfave/cli"
+)
+
+const imageName = "docker.io/library/alpine:latest"
+
+func main() {
+	// more power!
+	runtime.GOMAXPROCS(runtime.NumCPU())
+
+	app := cli.NewApp()
+	app.Name = "stress"
+	app.Description = "stress test a containerd daemon"
+	app.Flags = []cli.Flag{
+		cli.BoolFlag{
+			Name:  "debug",
+			Usage: "set debug output in the logs",
+		},
+		cli.StringFlag{
+			Name:  "address,a",
+			Value: "/run/containerd/containerd.sock",
+			Usage: "path to the containerd socket",
+		},
+		cli.IntFlag{
+			Name:  "concurrent,c",
+			Value: 1,
+			Usage: "set the concurrency of the stress test",
+		},
+		cli.DurationFlag{
+			Name:  "duration,d",
+			Value: 1 * time.Minute,
+			Usage: "set the duration of the stress test",
+		},
+	}
+	app.Before = func(context *cli.Context) error {
+		if context.GlobalBool("debug") {
+			logrus.SetLevel(logrus.DebugLevel)
+		}
+		return nil
+	}
+	app.Action = func(context *cli.Context) error {
+		config := config{
+			Address:     context.GlobalString("address"),
+			Duration:    context.GlobalDuration("duration"),
+			Concurrency: context.GlobalInt("concurrent"),
+		}
+		return test(config)
+	}
+	if err := app.Run(os.Args); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}
+
+type config struct {
+	Concurrency int
+	Duration    time.Duration
+	Address     string
+}
+
+func (c config) newClient() (*containerd.Client, error) {
+	return containerd.New(c.Address)
+}
+
+func test(c config) error {
+	var (
+		wg  sync.WaitGroup
+		ctx = namespaces.WithNamespace(context.Background(), "stress")
+	)
+
+	client, err := c.newClient()
+	if err != nil {
+		return err
+	}
+	defer client.Close()
+	if err := cleanup(ctx, client); err != nil {
+		return err
+	}
+	logrus.Infof("pulling %s", imageName)
+	image, err := client.Pull(ctx, imageName, containerd.WithPullUnpack)
+	if err != nil {
+		return err
+	}
+	logrus.Info("generating spec from image")
+	spec, err := containerd.GenerateSpec(containerd.WithImageConfig(ctx, image), containerd.WithProcessArgs("true"))
+	if err != nil {
+		return err
+	}
+	tctx, cancel := context.WithTimeout(ctx, c.Duration)
+	go func() {
+		s := make(chan os.Signal, 1)
+		signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
+		<-s
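+		// a signal arrived before the timeout; stop the test run early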
+		cancel()
+	}()
+
+	var (
+		workers []*worker
+		start   = time.Now()
+	)
+	logrus.Info("starting stress test run...")
+	for i := 0; i < c.Concurrency; i++ {
+		wg.Add(1)
+		w := &worker{
+			id:     i,
+			wg:     &wg,
+			spec:   spec,
+			image:  image,
+			client: client,
+		}
+		workers = append(workers, w)
+		go w.run(ctx, tctx)
+	}
+	wg.Wait()
+
+	var (
+		total    int
+		failures int
+		end      = time.Now().Sub(start).Seconds()
+	)
+	logrus.Infof("ending test run in %0.3f seconds", end)
+	for _, w := range workers {
+		total += w.count
+		failures += w.failures
+	}
+	logrus.WithField("failures", failures).Infof(
+		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
+		total,
+		end,
+		float64(total)/end,
+		end/float64(total),
+	)
+	return nil
+}
+
+type worker struct {
+	id          int
+	wg          *sync.WaitGroup
+	count       int
+	failures    int
+	waitContext context.Context
+
+	client *containerd.Client
+	image  containerd.Image
+	spec   *specs.Spec
+}
+
+func (w *worker) run(ctx, tctx context.Context) {
+	defer func() {
+		w.wg.Done()
+		logrus.Infof("worker %d finished", w.id)
+	}()
+	wctx, cancel := context.WithCancel(ctx)
+	w.waitContext = wctx
+	go func() {
+		<-tctx.Done()
+		cancel()
+	}()
+	for {
+		select {
+		case <-tctx.Done():
+			return
+		default:
+		}
+
+		w.count++
+		id := w.getID()
+		logrus.Debugf("starting container %s", id)
+		if err := w.runContainer(ctx, id); err != nil {
+			if err != context.DeadlineExceeded &&
+				!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
+				w.failures++
+				logrus.WithError(err).Errorf("running container %s", id)
+			}
+		}
+	}
+}
+
+func (w *worker) runContainer(ctx context.Context, id string) error {
+	c, err := w.client.NewContainer(ctx, id,
+		containerd.WithSpec(w.spec),
+		containerd.WithNewSnapshot(id, w.image),
+	)
+	if err != nil {
+		return err
+	}
+	defer c.Delete(ctx, containerd.WithSnapshotCleanup)
+
+	task, err := c.NewTask(ctx, containerd.NewIO(bytes.NewBuffer(nil), ioutil.Discard, ioutil.Discard))
+	if err != nil {
+		return err
+	}
+	defer task.Delete(ctx, containerd.WithProcessKill)
+	var (
+		start  sync.WaitGroup
+		status = make(chan uint32, 1)
+	)
+	start.Add(1)
+	go func() {
+		start.Done()
+		s, err := task.Wait(w.waitContext)
+		if err != nil {
+			if err == context.DeadlineExceeded ||
+				err == context.Canceled {
+				close(status)
+				return
+			}
+			w.failures++
+			logrus.WithError(err).Errorf("wait task %s", id)
+		}
+		status <- s
+	}()
+	start.Wait()
+	if err := task.Start(ctx); err != nil {
+		return err
+	}
+	<-status
+	return nil
+}
+
+func (w *worker) getID() string {
+	return fmt.Sprintf("%d-%d", w.id, w.count)
+}
+
+func (w *worker) cleanup(ctx context.Context, c containerd.Container) {
+	if err := c.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
+		if err == context.DeadlineExceeded {
+			return
+		}
+		w.failures++
+		logrus.WithError(err).Errorf("delete container %s", c.ID())
+	}
+}
+
+// cleanup cleans up any containers in the "stress" namespace before the test run
+func cleanup(ctx context.Context, client *containerd.Client) error {
+	containers, err := client.Containers(ctx)
+	if err != nil {
+		return err
+	}
+	for _, c := range containers {
+		task, err := c.Task(ctx, nil)
+		if err == nil {
+			task.Delete(ctx, containerd.WithProcessKill)
+		}
+		if err := c.Delete(ctx, containerd.WithSnapshotCleanup); err != nil {
+			if derr := c.Delete(ctx); derr == nil {
+				continue
+			}
+			return err
+		}
+	}
+	return nil
+}
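
With the Makefile change above, `make` builds the new binary alongside
ctr and containerd. A typical invocation against a running daemon, using
the flags defined in main() (the flag values here are illustrative):

    stress --address /run/containerd/containerd.sock --concurrent 10 --duration 10m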
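The core concurrency pattern in test() and worker.run() above is a timed
fan-out: each worker loops until a shared timeout context fires, keeps its
own counters so no locking is needed, and the totals are summed only after
wg.Wait() returns. A minimal self-contained sketch of the same pattern,
with a hypothetical work() standing in for runContainer (no containerd
required):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// work is a hypothetical stand-in for one create/start/delete cycle.
func work(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// the test context fires after the configured duration
	tctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	var (
		wg     sync.WaitGroup
		counts = make([]int, 4) // per-worker counters: no locking needed
	)
	for i := range counts {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for {
				// stop looping once the duration has elapsed
				select {
				case <-tctx.Done():
					return
				default:
				}
				if err := work(tctx); err == nil {
					counts[i]++
				}
			}
		}(i)
	}
	wg.Wait()

	var total int
	for _, c := range counts {
		total += c
	}
	fmt.Printf("completed %d cycles\n", total)
}

Per-worker counters avoid contention on a shared counter under high
concurrency, which matters when the point of the run is to measure the
daemon rather than the harness.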