Merge pull request #1910 from crosbymichael/stress-json
Add metrics endpoint to stress test tool
This commit is contained in:
		@@ -2,29 +2,68 @@ package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"os"
 | 
			
		||||
	"os/signal"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"syscall"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/containerd/containerd"
 | 
			
		||||
	"github.com/containerd/containerd/cio"
 | 
			
		||||
	"github.com/containerd/containerd/containers"
 | 
			
		||||
	"github.com/containerd/containerd/namespaces"
 | 
			
		||||
	"github.com/containerd/containerd/oci"
 | 
			
		||||
	specs "github.com/opencontainers/runtime-spec/specs-go"
 | 
			
		||||
	metrics "github.com/docker/go-metrics"
 | 
			
		||||
	"github.com/sirupsen/logrus"
 | 
			
		||||
	"github.com/urfave/cli"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const imageName = "docker.io/library/alpine:latest"
 | 
			
		||||
 | 
			
		||||
// run holds the aggregate counters and the wall-clock window of one stress
// test run.
type run struct {
	// total is the sum of the per-worker counts, filled in by gather.
	total int
	// failures is the sum of the per-worker failures, filled in by gather.
	failures int

	// started is stamped by start.
	started time.Time
	// ended is stamped by end.
	ended time.Time
}

// start stamps the beginning of the run with the current time.
func (r *run) start() {
	now := time.Now()
	r.started = now
}

// end stamps the end of the run with the current time.
func (r *run) end() {
	now := time.Now()
	r.ended = now
}

// seconds reports the time between the start and end stamps, in seconds.
func (r *run) seconds() float64 {
	elapsed := r.ended.Sub(r.started)
	return elapsed.Seconds()
}
 | 
			
		||||
 | 
			
		||||
func (r *run) gather(workers []*worker) *result {
 | 
			
		||||
	for _, w := range workers {
 | 
			
		||||
		r.total += w.count
 | 
			
		||||
		r.failures += w.failures
 | 
			
		||||
	}
 | 
			
		||||
	sec := r.seconds()
 | 
			
		||||
	return &result{
 | 
			
		||||
		Total:               r.total,
 | 
			
		||||
		Seconds:             sec,
 | 
			
		||||
		ContainersPerSecond: float64(r.total) / sec,
 | 
			
		||||
		SecondsPerContainer: sec / float64(r.total),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// result is the summary of a stress test run; its JSON tags define the field
// names used when the summary is encoded as JSON.
type result struct {
	// Total is the run's container count.
	Total int `json:"total"`
	// Seconds is the duration of the run in seconds.
	Seconds float64 `json:"seconds"`
	// ContainersPerSecond is Total divided by Seconds.
	ContainersPerSecond float64 `json:"containersPerSecond"`
	// SecondsPerContainer is Seconds divided by Total.
	SecondsPerContainer float64 `json:"secondsPerContainer"`
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	// more power: let the Go scheduler use every available CPU
 | 
			
		||||
	runtime.GOMAXPROCS(runtime.NumCPU())
 | 
			
		||||
@@ -56,11 +95,22 @@ func main() {
 | 
			
		||||
			Name:  "exec",
 | 
			
		||||
			Usage: "add execs to the stress tests",
 | 
			
		||||
		},
 | 
			
		||||
		cli.BoolFlag{
 | 
			
		||||
			Name:  "json,j",
 | 
			
		||||
			Usage: "output results in json format",
 | 
			
		||||
		},
 | 
			
		||||
		cli.StringFlag{
 | 
			
		||||
			Name:  "metrics,m",
 | 
			
		||||
			Usage: "address to serve the metrics API",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
	app.Before = func(context *cli.Context) error {
 | 
			
		||||
		if context.GlobalBool("debug") {
 | 
			
		||||
			logrus.SetLevel(logrus.DebugLevel)
 | 
			
		||||
		}
 | 
			
		||||
		if context.GlobalBool("json") {
 | 
			
		||||
			logrus.SetLevel(logrus.WarnLevel)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	app.Action = func(context *cli.Context) error {
 | 
			
		||||
@@ -69,6 +119,11 @@ func main() {
 | 
			
		||||
			Duration:    context.GlobalDuration("duration"),
 | 
			
		||||
			Concurrency: context.GlobalInt("concurrent"),
 | 
			
		||||
			Exec:        context.GlobalBool("exec"),
 | 
			
		||||
			JSON:        context.GlobalBool("json"),
 | 
			
		||||
			Metrics:     context.GlobalString("metrics"),
 | 
			
		||||
		}
 | 
			
		||||
		if config.Metrics != "" {
 | 
			
		||||
			return serve(config)
 | 
			
		||||
		}
 | 
			
		||||
		return test(config)
 | 
			
		||||
	}
 | 
			
		||||
@@ -83,12 +138,23 @@ type config struct {
 | 
			
		||||
	Duration    time.Duration
 | 
			
		||||
	Address     string
 | 
			
		||||
	Exec        bool
 | 
			
		||||
	JSON        bool
 | 
			
		||||
	Metrics     string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c config) newClient() (*containerd.Client, error) {
 | 
			
		||||
	return containerd.New(c.Address)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func serve(c config) error {
 | 
			
		||||
	go func() {
 | 
			
		||||
		if err := http.ListenAndServe(c.Metrics, metrics.Handler()); err != nil {
 | 
			
		||||
			logrus.WithError(err).Error("listen and serve")
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
	return test(c)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func test(c config) error {
 | 
			
		||||
	var (
 | 
			
		||||
		wg  sync.WaitGroup
 | 
			
		||||
@@ -119,13 +185,18 @@ func test(c config) error {
 | 
			
		||||
 | 
			
		||||
	var (
 | 
			
		||||
		workers []*worker
 | 
			
		||||
		start   = time.Now()
 | 
			
		||||
		r       = &run{}
 | 
			
		||||
	)
 | 
			
		||||
	logrus.Info("starting stress test run...")
 | 
			
		||||
	args := oci.WithProcessArgs("true")
 | 
			
		||||
	if c.Exec {
 | 
			
		||||
		args = oci.WithProcessArgs("sleep", "10")
 | 
			
		||||
	}
 | 
			
		||||
	v, err := client.Version(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	// create the workers along with their spec
 | 
			
		||||
	for i := 0; i < c.Concurrency; i++ {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		spec, err := oci.GenerateSpec(ctx, client,
 | 
			
		||||
@@ -143,140 +214,37 @@ func test(c config) error {
 | 
			
		||||
			image:  image,
 | 
			
		||||
			client: client,
 | 
			
		||||
			doExec: c.Exec,
 | 
			
		||||
			commit: v.Revision,
 | 
			
		||||
		}
 | 
			
		||||
		workers = append(workers, w)
 | 
			
		||||
	}
 | 
			
		||||
	// start the timer and run the worker
 | 
			
		||||
	r.start()
 | 
			
		||||
	for _, w := range workers {
 | 
			
		||||
		go w.run(ctx, tctx)
 | 
			
		||||
	}
 | 
			
		||||
	// wait and end the timer
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	r.end()
 | 
			
		||||
 | 
			
		||||
	var (
 | 
			
		||||
		total    int
 | 
			
		||||
		failures int
 | 
			
		||||
		end      = time.Now().Sub(start).Seconds()
 | 
			
		||||
	)
 | 
			
		||||
	logrus.Infof("ending test run in %0.3f seconds", end)
 | 
			
		||||
	for _, w := range workers {
 | 
			
		||||
		total += w.count
 | 
			
		||||
		failures += w.failures
 | 
			
		||||
	}
 | 
			
		||||
	logrus.WithField("failures", failures).Infof(
 | 
			
		||||
	results := r.gather(workers)
 | 
			
		||||
	logrus.Infof("ending test run in %0.3f seconds", results.Seconds)
 | 
			
		||||
 | 
			
		||||
	logrus.WithField("failures", r.failures).Infof(
 | 
			
		||||
		"create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
 | 
			
		||||
		total,
 | 
			
		||||
		end,
 | 
			
		||||
		float64(total)/end,
 | 
			
		||||
		end/float64(total),
 | 
			
		||||
		results.Total,
 | 
			
		||||
		results.Seconds,
 | 
			
		||||
		results.ContainersPerSecond,
 | 
			
		||||
		results.SecondsPerContainer,
 | 
			
		||||
	)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type worker struct {
 | 
			
		||||
	id       int
 | 
			
		||||
	wg       *sync.WaitGroup
 | 
			
		||||
	count    int
 | 
			
		||||
	failures int
 | 
			
		||||
 | 
			
		||||
	client *containerd.Client
 | 
			
		||||
	image  containerd.Image
 | 
			
		||||
	spec   *specs.Spec
 | 
			
		||||
	doExec bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) run(ctx, tctx context.Context) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		w.wg.Done()
 | 
			
		||||
		logrus.Infof("worker %d finished", w.id)
 | 
			
		||||
	}()
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-tctx.Done():
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w.count++
 | 
			
		||||
		id := w.getID()
 | 
			
		||||
		logrus.Debugf("starting container %s", id)
 | 
			
		||||
		if err := w.runContainer(ctx, id); err != nil {
 | 
			
		||||
			if err != context.DeadlineExceeded ||
 | 
			
		||||
				!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
 | 
			
		||||
				w.failures++
 | 
			
		||||
				logrus.WithError(err).Errorf("running container %s", id)
 | 
			
		||||
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) runContainer(ctx context.Context, id string) error {
 | 
			
		||||
	// fix up cgroups path for a default config
 | 
			
		||||
	w.spec.Linux.CgroupsPath = filepath.Join("/", "stress", id)
 | 
			
		||||
	c, err := w.client.NewContainer(ctx, id,
 | 
			
		||||
		containerd.WithNewSnapshot(id, w.image),
 | 
			
		||||
		containerd.WithSpec(w.spec, oci.WithUsername("games")),
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
	if c.JSON {
 | 
			
		||||
		if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	defer c.Delete(ctx, containerd.WithSnapshotCleanup)
 | 
			
		||||
 | 
			
		||||
	task, err := c.NewTask(ctx, cio.NullIO)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer task.Delete(ctx, containerd.WithProcessKill)
 | 
			
		||||
 | 
			
		||||
	statusC, err := task.Wait(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := task.Start(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if w.doExec {
 | 
			
		||||
		for i := 0; i < 256; i++ {
 | 
			
		||||
			if err := w.exec(ctx, i, task); err != nil {
 | 
			
		||||
				w.failures++
 | 
			
		||||
				logrus.WithError(err).Error("exec failure")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	status := <-statusC
 | 
			
		||||
	_, _, err = status.Result()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err == context.DeadlineExceeded || err == context.Canceled {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		w.failures++
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error {
 | 
			
		||||
	pSpec := *w.spec.Process
 | 
			
		||||
	pSpec.Args = []string{"true"}
 | 
			
		||||
	process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer process.Delete(ctx)
 | 
			
		||||
	status, err := process.Wait(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := process.Start(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	<-status
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) getID() string {
 | 
			
		||||
	return fmt.Sprintf("%d-%d", w.id, w.count)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// cleanup cleans up any containers in the "stress" namespace before the test run
 | 
			
		||||
func cleanup(ctx context.Context, client *containerd.Client) error {
 | 
			
		||||
	containers, err := client.Containers(ctx)
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										147
									
								
								cmd/containerd-stress/worker.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										147
									
								
								cmd/containerd-stress/worker.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,147 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"path/filepath"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"syscall"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/containerd/containerd"
 | 
			
		||||
	"github.com/containerd/containerd/cio"
 | 
			
		||||
	"github.com/containerd/containerd/oci"
 | 
			
		||||
	metrics "github.com/docker/go-metrics"
 | 
			
		||||
	specs "github.com/opencontainers/runtime-spec/specs-go"
 | 
			
		||||
	"github.com/sirupsen/logrus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	ct         metrics.LabeledTimer
 | 
			
		||||
	errCounter metrics.LabeledCounter
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	ns := metrics.NewNamespace("stress", "", nil)
 | 
			
		||||
	// if you want more fine grained metrics then you can drill down with the metrics in prom that
 | 
			
		||||
	// containerd is outputing
 | 
			
		||||
	ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit")
 | 
			
		||||
	errCounter = ns.NewLabeledCounter("errors", "Errors encountered running the stress tests", "err")
 | 
			
		||||
	metrics.Register(ns)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type worker struct {
 | 
			
		||||
	id       int
 | 
			
		||||
	wg       *sync.WaitGroup
 | 
			
		||||
	count    int
 | 
			
		||||
	failures int
 | 
			
		||||
 | 
			
		||||
	client *containerd.Client
 | 
			
		||||
	image  containerd.Image
 | 
			
		||||
	spec   *specs.Spec
 | 
			
		||||
	doExec bool
 | 
			
		||||
	commit string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) run(ctx, tctx context.Context) {
 | 
			
		||||
	defer func() {
 | 
			
		||||
		w.wg.Done()
 | 
			
		||||
		logrus.Infof("worker %d finished", w.id)
 | 
			
		||||
	}()
 | 
			
		||||
	for {
 | 
			
		||||
		select {
 | 
			
		||||
		case <-tctx.Done():
 | 
			
		||||
			return
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		w.count++
 | 
			
		||||
		id := w.getID()
 | 
			
		||||
		logrus.Debugf("starting container %s", id)
 | 
			
		||||
		start := time.Now()
 | 
			
		||||
		if err := w.runContainer(ctx, id); err != nil {
 | 
			
		||||
			if err != context.DeadlineExceeded ||
 | 
			
		||||
				!strings.Contains(err.Error(), context.DeadlineExceeded.Error()) {
 | 
			
		||||
				w.failures++
 | 
			
		||||
				logrus.WithError(err).Errorf("running container %s", id)
 | 
			
		||||
				errCounter.WithValues(err.Error()).Inc()
 | 
			
		||||
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		// only log times are success so we don't scew the results from failures that go really fast
 | 
			
		||||
		ct.WithValues(w.commit).UpdateSince(start)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) runContainer(ctx context.Context, id string) error {
 | 
			
		||||
	// fix up cgroups path for a default config
 | 
			
		||||
	w.spec.Linux.CgroupsPath = filepath.Join("/", "stress", id)
 | 
			
		||||
	c, err := w.client.NewContainer(ctx, id,
 | 
			
		||||
		containerd.WithNewSnapshot(id, w.image),
 | 
			
		||||
		containerd.WithSpec(w.spec, oci.WithUsername("games")),
 | 
			
		||||
	)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer c.Delete(ctx, containerd.WithSnapshotCleanup)
 | 
			
		||||
 | 
			
		||||
	task, err := c.NewTask(ctx, cio.NullIO)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer task.Delete(ctx, containerd.WithProcessKill)
 | 
			
		||||
 | 
			
		||||
	statusC, err := task.Wait(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := task.Start(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if w.doExec {
 | 
			
		||||
		for i := 0; i < 256; i++ {
 | 
			
		||||
			if err := w.exec(ctx, i, task); err != nil {
 | 
			
		||||
				w.failures++
 | 
			
		||||
				logrus.WithError(err).Error("exec failure")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	status := <-statusC
 | 
			
		||||
	_, _, err = status.Result()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if err == context.DeadlineExceeded || err == context.Canceled {
 | 
			
		||||
			return nil
 | 
			
		||||
		}
 | 
			
		||||
		w.failures++
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error {
 | 
			
		||||
	pSpec := *w.spec.Process
 | 
			
		||||
	pSpec.Args = []string{"true"}
 | 
			
		||||
	process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer process.Delete(ctx)
 | 
			
		||||
	status, err := process.Wait(ctx)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if err := process.Start(ctx); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	<-status
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *worker) getID() string {
 | 
			
		||||
	return fmt.Sprintf("%d-%d", w.id, w.count)
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user