diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index 2c6d0ff0a..f26c15007 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -11,6 +11,7 @@ import ( "net" "os" "os/exec" + "os/signal" "runtime" "strings" "sync" @@ -80,6 +81,9 @@ func executeShim() error { if err != nil { return err } + dump := make(chan os.Signal, 32) + signal.Notify(dump, syscall.SIGUSR1) + path, err := os.Getwd() if err != nil { return err @@ -111,6 +115,11 @@ func executeShim() error { "path": path, "namespace": namespaceFlag, }) + go func() { + for range dump { + dumpStacks(logger) + } + }() return handleSignals(logger, signals, server, sv) } @@ -171,8 +180,6 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S sv.Delete(context.Background(), &ptypes.Empty{}) close(done) }) - case unix.SIGUSR1: - dumpStacks(logger) } } } @@ -213,8 +220,11 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event if err != nil { return err } - exit := <-c - if exit.Status != 0 { + status, err := reaper.Default.Wait(cmd, c) + if err != nil { + return err + } + if status != 0 { return errors.New("failed to publish event") } return nil diff --git a/cmd/containerd-shim/shim_linux.go b/cmd/containerd-shim/shim_linux.go index 140795ede..73c1b68f9 100644 --- a/cmd/containerd-shim/shim_linux.go +++ b/cmd/containerd-shim/shim_linux.go @@ -3,6 +3,7 @@ package main import ( "os" "os/signal" + "syscall" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/sys" @@ -14,7 +15,7 @@ import ( // sub-reaper so that the container processes are reparented func setupSignals() (chan os.Signal, error) { signals := make(chan os.Signal, 2048) - signal.Notify(signals) + signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGCHLD) // make sure runc is setup to use the monitor // for waiting on processes runc.Monitor = reaper.Default diff --git a/cmd/containerd-stress/main.go b/cmd/containerd-stress/main.go index 05864a424..522fc7400 100644 --- a/cmd/containerd-stress/main.go +++ b/cmd/containerd-stress/main.go @@ -7,6 +7,7 @@ import ( "os/signal" "path/filepath" "runtime" + "strconv" "strings" "sync" "syscall" @@ -51,6 +52,10 @@ func main() { Value: 1 * time.Minute, Usage: "set the duration of the stress test", }, + cli.BoolFlag{ + Name: "exec", + Usage: "add execs to the stress tests", + }, } app.Before = func(context *cli.Context) error { if context.GlobalBool("debug") { @@ -63,6 +68,7 @@ func main() { Address: context.GlobalString("address"), Duration: context.GlobalDuration("duration"), Concurrency: context.GlobalInt("concurrent"), + Exec: context.GlobalBool("exec"), } return test(config) } @@ -76,6 +82,7 @@ type config struct { Concurrency int Duration time.Duration Address string + Exec bool } func (c config) newClient() (*containerd.Client, error) { @@ -115,12 +122,17 @@ func test(c config) error { start = time.Now() ) logrus.Info("starting stress test run...") + args := oci.WithProcessArgs("true") + if c.Exec { + args = oci.WithProcessArgs("sleep", "10") + } for i := 0; i < c.Concurrency; i++ { wg.Add(1) spec, err := oci.GenerateSpec(ctx, client, &containers.Container{}, oci.WithImageConfig(image), - oci.WithProcessArgs("true")) + args, + ) if err != nil { return err } @@ -130,6 +142,7 @@ func test(c config) error { spec: spec, image: image, client: client, + doExec: c.Exec, } workers = append(workers, w) go w.run(ctx, tctx) @@ -157,15 +170,15 @@ func test(c config) error { } type worker struct { - id int - wg *sync.WaitGroup - count int - failures int - waitContext context.Context + id int + wg *sync.WaitGroup + count int + failures int client *containerd.Client image containerd.Image spec *specs.Spec + doExec bool } func (w *worker) run(ctx, tctx context.Context) { @@ -173,12 +186,6 @@ func (w *worker) run(ctx, tctx context.Context) { w.wg.Done() logrus.Infof("worker %d finished", w.id) }() - wctx, cancel := context.WithCancel(ctx) - w.waitContext = wctx - go func() { - <-tctx.Done() - cancel() - }() for { select { case <-tctx.Done(): @@ -222,10 +229,20 @@ func (w *worker) runContainer(ctx context.Context, id string) error { if err != nil { return err } - if err := task.Start(ctx); err != nil { return err } + if w.doExec { + for i := 0; i < 256; i++ { + if err := w.exec(ctx, i, task); err != nil { + w.failures++ + logrus.WithError(err).Error("exec failure") + } + } + } + if err := task.Kill(ctx, syscall.SIGKILL); err != nil { + return err + } status := <-statusC _, _, err = status.Result() if err != nil { @@ -237,6 +254,25 @@ func (w *worker) runContainer(ctx context.Context, id string) error { return nil } +func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error { + pSpec := *w.spec.Process + pSpec.Args = []string{"true"} + process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO) + if err != nil { + return err + } + defer process.Delete(ctx) + status, err := process.Wait(ctx) + if err != nil { + return err + } + if err := process.Start(ctx); err != nil { + return err + } + <-status + return nil +} + func (w *worker) getID() string { return fmt.Sprintf("%d-%d", w.id, w.count) } diff --git a/linux/proc/exec.go b/linux/proc/exec.go index 8154af550..00a8547b8 100644 --- a/linux/proc/exec.go +++ b/linux/proc/exec.go @@ -143,6 +143,7 @@ func (e *execProcess) start(ctx context.Context) (err error) { opts.ConsoleSocket = socket } if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil { + close(e.waitBlock) return e.parent.runtimeError(err, "OCI runtime exec failed") } if e.stdio.Stdin != "" { diff --git a/linux/shim/service.go b/linux/shim/service.go index 5568621a1..1150d1cc8 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -148,7 +148,6 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR if p == nil { return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created") } - if err := p.Delete(ctx); err != nil { return nil, err } @@ -480,7 +479,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er func (s *Service) forward(publisher events.Publisher) { for e := range s.events { if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil { - logrus.WithError(err).Error("post event") + log.G(s.context).WithError(err).Error("post event") } } }