From 723f37d8466135dd8ed01b08ace22132d584f5bc Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 28 Nov 2017 14:16:09 -0500 Subject: [PATCH 1/2] Add exec support to stress test tool Signed-off-by: Michael Crosby --- cmd/containerd-stress/main.go | 62 +++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go index 05864a424..522fc7400 100644 --- a/cmd/containerd-stress/main.go +++ b/cmd/containerd-stress/main.go @@ -7,6 +7,7 @@ import ( "os/signal" "path/filepath" "runtime" + "strconv" "strings" "sync" "syscall" @@ -51,6 +52,10 @@ func main() { Value: 1 * time.Minute, Usage: "set the duration of the stress test", }, + cli.BoolFlag{ + Name: "exec", + Usage: "add execs to the stress tests", + }, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { @@ -63,6 +68,7 @@ func main() { Address: context.GlobalString("address"), Duration: context.GlobalDuration("duration"), Concurrency: context.GlobalInt("concurrent"), + Exec: context.GlobalBool("exec"), } return test(config) } @@ -76,6 +82,7 @@ type config struct { Concurrency int Duration time.Duration Address string + Exec bool } func (c config) newClient() (*containerd.Client, error) { @@ -115,12 +122,17 @@ func test(c config) error { start = time.Now() ) logrus.Info("starting stress test run...") + args := oci.WithProcessArgs("true") + if c.Exec { + args = oci.WithProcessArgs("sleep", "10") + } for i := 0; i < c.Concurrency; i++ { wg.Add(1) spec, err := oci.GenerateSpec(ctx, client, &containers.Container{}, oci.WithImageConfig(image), - oci.WithProcessArgs("true")) + args, + ) if err != nil { return err } @@ -130,6 +142,7 @@ func test(c config) error { spec: spec, image: image, client: client, + doExec: c.Exec, } workers = append(workers, w) go w.run(ctx, tctx) @@ -157,15 +170,15 @@ func test(c config) error { } type worker struct { - id int - wg *sync.WaitGroup - count int - failures 
int - waitContext context.Context + id int + wg *sync.WaitGroup + count int + failures int client *containerd.Client image containerd.Image spec *specs.Spec + doExec bool } func (w *worker) run(ctx, tctx context.Context) { @@ -173,12 +186,6 @@ func (w *worker) run(ctx, tctx context.Context) { w.wg.Done() logrus.Infof("worker %d finished", w.id) }() - wctx, cancel := context.WithCancel(ctx) - w.waitContext = wctx - go func() { - <-tctx.Done() - cancel() - }() for { select { case <-tctx.Done(): @@ -222,10 +229,20 @@ func (w *worker) runContainer(ctx context.Context, id string) error { if err != nil { return err } - if err := task.Start(ctx); err != nil { return err } + if w.doExec { + for i := 0; i < 256; i++ { + if err := w.exec(ctx, i, task); err != nil { + w.failures++ + logrus.WithError(err).Error("exec failure") + } + } + } + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + return err + } status := <-statusC _, _, err = status.Result() if err != nil { @@ -237,6 +254,25 @@ func (w *worker) runContainer(ctx context.Context, id string) error { return nil } +func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error { + pSpec := *w.spec.Process + pSpec.Args = []string{"true"} + process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO) + if err != nil { + return err + } + defer process.Delete(ctx) + status, err := process.Wait(ctx) + if err != nil { + return err + } + if err := process.Start(ctx); err != nil { + return err + } + <-status + return nil +} + func (w *worker) getID() string { return fmt.Sprintf("%d-%d", w.id, w.count) } From 74b3cb3391ab292ffde4d096c850977d2738aba8 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 28 Nov 2017 14:17:22 -0500 Subject: [PATCH 2/2] Fix exit event handling in shim First, there could be issues where, when exec processes fail, the wait block is not released. Second, you could not dump stacks if the reaper loop locks up. Third, the publisher was not waiting on the correct pid. 
Signed-off-by: Michael Crosby --- cmd/containerd-shim/main_unix.go | 18 ++++++++++++++---- cmd/containerd-shim/shim_linux.go | 3 ++- linux/proc/exec.go | 1 + linux/shim/service.go | 3 +-- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 2c6d0ff0a..f26c15007 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -11,6 +11,7 @@ import ( "net" "os" "os/exec" + "os/signal" "runtime" "strings" "sync" @@ -80,6 +81,9 @@ func executeShim() error { if err != nil { return err } + dump := make(chan os.Signal, 32) + signal.Notify(dump, syscall.SIGUSR1) + path, err := os.Getwd() if err != nil { return err @@ -111,6 +115,11 @@ func executeShim() error { "path": path, "namespace": namespaceFlag, }) + go func() { + for range dump { + dumpStacks(logger) + } + }() return handleSignals(logger, signals, server, sv) } @@ -171,8 +180,6 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S sv.Delete(context.Background(), &ptypes.Empty{}) close(done) }) - case unix.SIGUSR1: - dumpStacks(logger) } } } @@ -213,8 +220,11 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event if err != nil { return err } - exit := <-c - if exit.Status != 0 { + status, err := reaper.Default.Wait(cmd, c) + if err != nil { + return err + } + if status != 0 { return errors.New("failed to publish event") } return nil diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index 140795ede..73c1b68f9 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -3,6 +3,7 @@ package main import ( "os" "os/signal" + "syscall" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" @@ -14,7 +15,7 @@ import ( // sub-reaper so that the container processes are reparented func setupSignals() (chan os.Signal, error) { signals := make(chan os.Signal, 2048) - 
signal.Notify(signals) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGCHLD) // make sure runc is setup to use the monitor // for waiting on processes runc.Monitor = reaper.Default diff --git a/linux/proc/exec.go b/linux/proc/exec.go index 8154af550..00a8547b8 100644 --- a/linux/proc/exec.go +++ b/linux/proc/exec.go @@ -143,6 +143,7 @@ func (e *execProcess) start(ctx context.Context) (err error) { opts.ConsoleSocket = socket } if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil { + close(e.waitBlock) return e.parent.runtimeError(err, "OCI runtime exec failed") } if e.stdio.Stdin != "" { diff --git a/linux/shim/service.go b/linux/shim/service.go index 5568621a1..1150d1cc8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -148,7 +148,6 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.Delete(ctx); err != nil { return nil, err } @@ -480,7 +479,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *Service) forward(publisher events.Publisher) { for e := range s.events { if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { - logrus.WithError(err).Error("post event") + log.G(s.context).WithError(err).Error("post event") } } }