containerd-stress: add support for running through CRI

Introduce a --cri flag, which enables running containerd-stress through the CRI
instead of containerd's task API.

In doing so, we introduce cri_worker, rename the existing worker to ctr_worker,
and introduce a worker interface that both of these implement.

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
Eric Ernst, 2022-04-13 13:39:29 -07:00
commit 52593cfc86, parent 14af2bdfa3
4 changed files with 308 additions and 17 deletions

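As a quick orientation before the diff: a CRI-mode run could be launched with something like the following (values are hypothetical; the flag names all appear in the diff below):

    containerd-stress --cri --runtime runc --concurrent 4 --duration 2m

Without --cri, the tool keeps its existing behavior and drives containerd's task API directly.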

@@ -29,6 +29,7 @@ import (
     "time"

     "github.com/containerd/containerd"
+    "github.com/containerd/containerd/integration/remote"
     "github.com/containerd/containerd/namespaces"
     "github.com/containerd/containerd/plugin"
     metrics "github.com/docker/go-metrics"
@@ -43,8 +44,12 @@ var (
     binarySizeGauge metrics.LabeledGauge
 )

+const (
+    stressNs string = "stress"
+)
+
 func init() {
-    ns := metrics.NewNamespace("stress", "", nil)
+    ns := metrics.NewNamespace(stressNs, "", nil)
     // if you want more fine grained metrics then you can drill down with the metrics in prom that
     // containerd is outputting
     ct = ns.NewLabeledTimer("run", "Run time of a full container during the test", "commit")
@@ -59,6 +64,14 @@ func init() {
     }
 }

+type worker interface {
+    run(ctx, tctx context.Context)
+    getCount() int
+    incCount()
+    getFailures() int
+    incFailures()
+}
+
 type run struct {
     total    int
     failures int
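The interface above is the pivot of the refactor: the shared timing and gather plumbing only ever sees workers through these five methods, so the CRI and task-API backends are interchangeable. Below is a minimal self-contained sketch of the pattern; noopWorker and main are invented for illustration and are not part of the commit. Note that the slice must be declared as []worker up front, since Go does not convert a []*noopWorker to a []worker.

package main

import (
    "context"
    "fmt"
    "sync"
)

// worker mirrors the interface introduced in the hunk above.
type worker interface {
    run(ctx, tctx context.Context)
    getCount() int
    incCount()
    getFailures() int
    incFailures()
}

// noopWorker is a hypothetical backend used only for this sketch; the
// commit's real implementations are ctrWorker and criWorker.
type noopWorker struct {
    count, failures int
    wg              *sync.WaitGroup
}

func (w *noopWorker) run(ctx, tctx context.Context) {
    defer w.wg.Done()
    w.incCount() // a real worker loops creating containers until tctx expires
}

func (w *noopWorker) getCount() int    { return w.count }
func (w *noopWorker) incCount()        { w.count++ }
func (w *noopWorker) getFailures() int { return w.failures }
func (w *noopWorker) incFailures()     { w.failures++ }

func main() {
    var (
        wg      sync.WaitGroup
        workers []worker // must be []worker from the start
    )
    for i := 0; i < 2; i++ {
        wg.Add(1)
        workers = append(workers, &noopWorker{wg: &wg})
    }
    ctx := context.Background()
    for _, w := range workers {
        go w.run(ctx, ctx)
    }
    wg.Wait()
    total := 0
    for _, w := range workers {
        total += w.getCount() // same accounting path gather() uses
    }
    fmt.Println("total runs:", total)
}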
@@ -79,10 +92,10 @@ func (r *run) seconds() float64 {
     return r.ended.Sub(r.started).Seconds()
 }

-func (r *run) gather(workers []*worker) *result {
+func (r *run) gather(workers []worker) *result {
     for _, w := range workers {
-        r.total += w.count
-        r.failures += w.failures
+        r.total += w.getCount()
+        r.failures += w.getFailures()
     }
     sec := r.seconds()
     return &result{
@@ -125,6 +138,10 @@ func main() {
             Value: 1,
             Usage: "set the concurrency of the stress test",
         },
+        cli.BoolFlag{
+            Name:  "cri",
+            Usage: "utilize CRI to create pods for the stress test. This requires a runtime that matches CRI runtime handler. Example: --runtime runc",
+        },
         cli.DurationFlag{
             Name:  "duration,d",
             Value: 1 * time.Minute,
@@ -132,7 +149,7 @@ func main() {
         },
         cli.BoolFlag{
             Name:  "exec",
-            Usage: "add execs to the stress tests",
+            Usage: "add execs to the stress tests (non-CRI only)",
         },
         cli.StringFlag{
             Name:  "image,i",
@@ -175,6 +192,7 @@ func main() {
             Address:     context.GlobalString("address"),
             Duration:    context.GlobalDuration("duration"),
             Concurrency: context.GlobalInt("concurrent"),
+            CRI:         context.GlobalBool("cri"),
             Exec:        context.GlobalBool("exec"),
             Image:       context.GlobalString("image"),
             JSON:        context.GlobalBool("json"),
@@ -185,6 +203,11 @@ func main() {
         if config.Metrics != "" {
             return serve(config)
         }
+
+        if config.CRI {
+            return criTest(config)
+        }
+
         return test(config)
     }
     if err := app.Run(os.Args); err != nil {
@@ -195,6 +218,7 @@ func main() {
 type config struct {
     Concurrency int
+    CRI         bool
     Duration    time.Duration
     Address     string
     Exec        bool
@@ -216,9 +240,91 @@ func serve(c config) error {
         }
     }()
     checkBinarySizes()
+
+    if c.CRI {
+        return criTest(c)
+    }
+
     return test(c)
 }

+func criTest(c config) error {
+    var (
+        timeout     = 1 * time.Minute
+        wg          sync.WaitGroup
+        ctx         = namespaces.WithNamespace(context.Background(), stressNs)
+        criEndpoint = "unix:///run/containerd/containerd.sock"
+    )
+
+    client, err := remote.NewRuntimeService(criEndpoint, timeout)
+    if err != nil {
+        return fmt.Errorf("failed to create runtime service: %w", err)
+    }
+
+    if err := criCleanup(ctx, client); err != nil {
+        return err
+    }
+
+    tctx, cancel := context.WithTimeout(ctx, c.Duration)
+    go func() {
+        s := make(chan os.Signal, 1)
+        signal.Notify(s, syscall.SIGTERM, syscall.SIGINT)
+        <-s
+        cancel()
+    }()
+
+    // get runtime version:
+    version, err := client.Version("")
+    if err != nil {
+        return fmt.Errorf("failed to get runtime version: %w", err)
+    }
+
+    var (
+        workers []worker
+        r       = &run{}
+    )
+    logrus.Info("starting stress test run...")
+    // create the workers along with their spec
+    for i := 0; i < c.Concurrency; i++ {
+        wg.Add(1)
+        w := &criWorker{
+            id:             i,
+            wg:             &wg,
+            client:         client,
+            commit:         fmt.Sprintf("%s-%s", version.RuntimeName, version.RuntimeVersion),
+            runtimeHandler: c.Runtime,
+            snapshotter:    c.Snapshotter,
+        }
+        workers = append(workers, w)
+    }
+
+    // start the timer and run the worker
+    r.start()
+    for _, w := range workers {
+        go w.run(ctx, tctx)
+    }
+
+    // wait and end the timer
+    wg.Wait()
+    r.end()
+
+    results := r.gather(workers)
+    logrus.Infof("ending test run in %0.3f seconds", results.Seconds)
+
+    logrus.WithField("failures", r.failures).Infof(
+        "create/start/delete %d containers in %0.3f seconds (%0.3f c/sec) or (%0.3f sec/c)",
+        results.Total,
+        results.Seconds,
+        results.ContainersPerSecond,
+        results.SecondsPerContainer,
+    )
+    if c.JSON {
+        if err := json.NewEncoder(os.Stdout).Encode(results); err != nil {
+            return err
+        }
+    }
+    return nil
+}
+
 func test(c config) error {
     var (
         wg sync.WaitGroup
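cri_worker.go is one of the four changed files, but its diff is not shown in this view. For orientation only, the pod-churn loop that criTest implies might look roughly like the sketch below. The cri-api import path, the podSandboxer narrowing, and the entire run body are assumptions rather than the commit's code; RunPodSandbox, StopPodSandbox, and RemovePodSandbox are real methods on the client that remote.NewRuntimeService returns.

package stress

import (
    "context"
    "fmt"
    "sync"

    runtime "k8s.io/cri-api/pkg/apis/runtime/v1" // assumed CRI API version
)

const stressNs = "stress" // mirrors the constant introduced in main.go

// podSandboxer narrows the CRI client to the calls this sketch needs.
type podSandboxer interface {
    RunPodSandbox(config *runtime.PodSandboxConfig, runtimeHandler string) (string, error)
    StopPodSandbox(podSandboxID string) error
    RemovePodSandbox(podSandboxID string) error
}

// criWorker mirrors the fields constructed in criTest above; the counters
// back the getCount/getFailures methods the worker interface requires.
type criWorker struct {
    id              int
    wg              *sync.WaitGroup
    client          podSandboxer
    commit          string
    runtimeHandler  string
    snapshotter     string
    count, failures int
}

func (w *criWorker) run(ctx, tctx context.Context) {
    defer w.wg.Done()
    for i := 0; ; i++ {
        select {
        case <-tctx.Done():
            return // duration elapsed or signal received
        default:
        }
        // unique sandbox metadata per iteration, namespaced under "stress"
        name := fmt.Sprintf("stress-pod-%d-%d", w.id, i)
        cfg := &runtime.PodSandboxConfig{
            Metadata: &runtime.PodSandboxMetadata{
                Name:      name,
                Uid:       name,
                Namespace: stressNs,
            },
        }
        pod, err := w.client.RunPodSandbox(cfg, w.runtimeHandler)
        if err != nil {
            w.failures++
            continue
        }
        // tear the sandbox down again; any failure counts against the run
        if err := w.client.StopPodSandbox(pod); err != nil {
            w.failures++
        }
        if err := w.client.RemovePodSandbox(pod); err != nil {
            w.failures++
        }
        w.count++
    }
}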
@@ -233,11 +339,17 @@ func test(c config) error {
     if err := cleanup(ctx, client); err != nil {
         return err
     }
+    logrus.Infof("pulling %s", c.Image)
     image, err := client.Pull(ctx, c.Image, containerd.WithPullUnpack, containerd.WithPullSnapshotter(c.Snapshotter))
     if err != nil {
         return err
     }
+
+    v, err := client.Version(ctx)
+    if err != nil {
+        return err
+    }
     tctx, cancel := context.WithTimeout(ctx, c.Duration)
     go func() {
         s := make(chan os.Signal, 1)
@@ -247,18 +359,15 @@
     }()

     var (
-        workers []*worker
+        workers []worker
         r       = &run{}
     )
     logrus.Info("starting stress test run...")
-    v, err := client.Version(ctx)
-    if err != nil {
-        return err
-    }
     // create the workers along with their spec
     for i := 0; i < c.Concurrency; i++ {
         wg.Add(1)
-        w := &worker{
+        w := &ctrWorker{
             id:    i,
             wg:    &wg,
             image: image,
@@ -273,7 +382,7 @@ func test(c config) error {
     for i := c.Concurrency; i < c.Concurrency+c.Concurrency; i++ {
         wg.Add(1)
         exec = &execWorker{
-            worker: worker{
+            ctrWorker: ctrWorker{
                 id:    i,
                 wg:    &wg,
                 image: image,
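One detail in the hunk above: because execWorker embeds the renamed type, the rename ripples into the embedded field's name, which in Go follows the type. A tiny standalone illustration (types invented for the example):

package main

import "fmt"

type ctrWorker struct{ id int }

// execWorker embeds ctrWorker, as in the diff above; the embedded field
// is implicitly named after the type, so renaming worker to ctrWorker
// changes every explicit selector on that field.
type execWorker struct {
    ctrWorker
}

func main() {
    e := execWorker{ctrWorker{id: 7}}
    fmt.Println(e.id)           // field promotion still works unchanged
    fmt.Println(e.ctrWorker.id) // explicit selectors must use the new name
}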