Merge pull request #1820 from crosbymichael/shim-lockup

Resolve issues with signal handling and exec exit events
Kenfe-Mickaël Laventure, 2017-11-28 11:57:21 -08:00, committed by GitHub
commit 118c0a279e
5 changed files with 67 additions and 20 deletions

View File

@ -11,6 +11,7 @@ import (
"net"
"os"
"os/exec"
"os/signal"
"runtime"
"strings"
"sync"
@ -80,6 +81,9 @@ func executeShim() error {
if err != nil {
return err
}
dump := make(chan os.Signal, 32)
signal.Notify(dump, syscall.SIGUSR1)
path, err := os.Getwd()
if err != nil {
return err
@ -111,6 +115,11 @@ func executeShim() error {
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(logger, signals, server, sv)
}
@ -171,8 +180,6 @@ func handleSignals(logger *logrus.Entry, signals chan os.Signal, server *ttrpc.S
sv.Delete(context.Background(), &ptypes.Empty{})
close(done)
})
case unix.SIGUSR1:
dumpStacks(logger)
}
}
}
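
Note on the shim signal changes above: SIGUSR1 now gets its own buffered channel, drained by a dedicated goroutine, so a stack-dump request is handled independently of the main signal loop (which no longer needs a SIGUSR1 case). A minimal, self-contained sketch of the pattern follows; dumpStacks here is an illustrative stand-in, not the shim's actual helper.

package main

import (
    "os"
    "os/signal"
    "runtime"
    "syscall"

    "github.com/sirupsen/logrus"
)

// dumpStacks is an illustrative stand-in for a stack-dump helper: it grows
// the buffer until runtime.Stack can fit every goroutine's stack.
func dumpStacks(logger *logrus.Entry) {
    buf := make([]byte, 32*1024)
    for {
        n := runtime.Stack(buf, true)
        if n < len(buf) {
            buf = buf[:n]
            break
        }
        buf = make([]byte, 2*len(buf))
    }
    logger.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf)
}

func main() {
    logger := logrus.WithField("component", "example")

    // SIGUSR1 gets its own buffered channel and goroutine, separate from the
    // channel used for SIGTERM/SIGINT/SIGCHLD, so a dump request can neither
    // be consumed nor blocked by the main signal-handling loop.
    dump := make(chan os.Signal, 32)
    signal.Notify(dump, syscall.SIGUSR1)
    go func() {
        for range dump {
            dumpStacks(logger)
        }
    }()

    // Block until interrupted (stand-in for the shim's real work).
    stop := make(chan os.Signal, 1)
    signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
    <-stop
}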
@ -213,8 +220,11 @@ func (l *remoteEventsPublisher) Publish(ctx context.Context, topic string, event
if err != nil {
return err
}
exit := <-c
if exit.Status != 0 {
status, err := reaper.Default.Wait(cmd, c)
if err != nil {
return err
}
if status != 0 {
return errors.New("failed to publish event")
}
return nil
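
Note on the Publish change: instead of reading the first exit that arrives on the channel (exit := <-c), the publisher now asks the shared reaper to wait for the `containerd publish` command, so the status it gets back is matched to that command's pid and does not race with the SIGCHLD-driven reaper. A rough sketch of the idea; the Exit type and waitViaReaper helper below are hypothetical stand-ins for the reaper package's API.

package example

import (
    "errors"
    "os/exec"
)

// Exit mirrors the kind of record a SIGCHLD-driven reaper publishes for each
// child it collects (a hypothetical shape for this sketch).
type Exit struct {
    Pid    int
    Status int
}

// waitViaReaper returns cmd's exit status by reading the reaper's exit stream
// and matching on the pid, instead of calling cmd.Wait (which would race with
// the reaper over wait4(2)) or trusting whichever exit arrives first.
func waitViaReaper(cmd *exec.Cmd, exits <-chan Exit) (int, error) {
    for e := range exits {
        if e.Pid == cmd.Process.Pid {
            return e.Status, nil
        }
    }
    return -1, errors.New("exit channel closed before the command exited")
}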

View File

@ -3,6 +3,7 @@ package main
import (
"os"
"os/signal"
"syscall"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/sys"
@ -14,7 +15,7 @@ import (
// sub-reaper so that the container processes are reparented
func setupSignals() (chan os.Signal, error) {
signals := make(chan os.Signal, 2048)
signal.Notify(signals)
signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGCHLD)
// make sure runc is setup to use the monitor
// for waiting on processes
runc.Monitor = reaper.Default
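
Note on setupSignals: the bare signal.Notify(signals) call subscribed the channel to every deliverable signal; it is narrowed here to the three signals the shim's main loop actually handles (SIGTERM, SIGINT, SIGCHLD), with SIGUSR1 delivered on its separate dump channel shown earlier. A small sketch of the narrowed registration, assuming a loop that only knows how to shut down and reap children:

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    // Subscribe only to the signals this loop knows how to handle; with a
    // bare signal.Notify(signals) every incoming signal would land here.
    signals := make(chan os.Signal, 2048)
    signal.Notify(signals, syscall.SIGTERM, syscall.SIGINT, syscall.SIGCHLD)

    for s := range signals {
        switch s {
        case syscall.SIGCHLD:
            fmt.Println("child exited: reap it")
        case syscall.SIGTERM, syscall.SIGINT:
            fmt.Println("shutting down")
            return
        }
    }
}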

View File

@ -7,6 +7,7 @@ import (
"os/signal"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
@ -51,6 +52,10 @@ func main() {
Value: 1 * time.Minute,
Usage: "set the duration of the stress test",
},
cli.BoolFlag{
Name: "exec",
Usage: "add execs to the stress tests",
},
}
app.Before = func(context *cli.Context) error {
if context.GlobalBool("debug") {
@ -63,6 +68,7 @@ func main() {
Address: context.GlobalString("address"),
Duration: context.GlobalDuration("duration"),
Concurrency: context.GlobalInt("concurrent"),
Exec: context.GlobalBool("exec"),
}
return test(config)
}
@ -76,6 +82,7 @@ type config struct {
Concurrency int
Duration time.Duration
Address string
Exec bool
}
func (c config) newClient() (*containerd.Client, error) {
@ -115,12 +122,17 @@ func test(c config) error {
start = time.Now()
)
logrus.Info("starting stress test run...")
args := oci.WithProcessArgs("true")
if c.Exec {
args = oci.WithProcessArgs("sleep", "10")
}
for i := 0; i < c.Concurrency; i++ {
wg.Add(1)
spec, err := oci.GenerateSpec(ctx, client,
&containers.Container{},
oci.WithImageConfig(image),
oci.WithProcessArgs("true"))
args,
)
if err != nil {
return err
}
@ -130,6 +142,7 @@ func test(c config) error {
spec: spec,
image: image,
client: client,
doExec: c.Exec,
}
workers = append(workers, w)
go w.run(ctx, tctx)
@ -157,15 +170,15 @@ func test(c config) error {
}
type worker struct {
id int
wg *sync.WaitGroup
count int
failures int
waitContext context.Context
id int
wg *sync.WaitGroup
count int
failures int
client *containerd.Client
image containerd.Image
spec *specs.Spec
doExec bool
}
func (w *worker) run(ctx, tctx context.Context) {
@ -173,12 +186,6 @@ func (w *worker) run(ctx, tctx context.Context) {
w.wg.Done()
logrus.Infof("worker %d finished", w.id)
}()
wctx, cancel := context.WithCancel(ctx)
w.waitContext = wctx
go func() {
<-tctx.Done()
cancel()
}()
for {
select {
case <-tctx.Done():
@ -222,10 +229,20 @@ func (w *worker) runContainer(ctx context.Context, id string) error {
if err != nil {
return err
}
if err := task.Start(ctx); err != nil {
return err
}
if w.doExec {
for i := 0; i < 256; i++ {
if err := w.exec(ctx, i, task); err != nil {
w.failures++
logrus.WithError(err).Error("exec failure")
}
}
}
if err := task.Kill(ctx, syscall.SIGKILL); err != nil {
return err
}
status := <-statusC
_, _, err = status.Result()
if err != nil {
@ -237,6 +254,25 @@ func (w *worker) runContainer(ctx context.Context, id string) error {
return nil
}
func (w *worker) exec(ctx context.Context, i int, t containerd.Task) error {
pSpec := *w.spec.Process
pSpec.Args = []string{"true"}
process, err := t.Exec(ctx, strconv.Itoa(i), &pSpec, cio.NullIO)
if err != nil {
return err
}
defer process.Delete(ctx)
status, err := process.Wait(ctx)
if err != nil {
return err
}
if err := process.Start(ctx); err != nil {
return err
}
<-status
return nil
}
func (w *worker) getID() string {
return fmt.Sprintf("%d-%d", w.id, w.count)
}
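
Note on the stress-test exec path: the ordering in exec (and in runContainer above) is deliberate. Wait is called to obtain the exit-status channel before Start, so a process that exits quickly cannot finish before anyone is subscribed to its exit event. A condensed sketch of that ordering against the containerd client API; runExec is a hypothetical helper, not part of the stress tool.

package example

import (
    "context"
    "fmt"

    "github.com/containerd/containerd"
    "github.com/containerd/containerd/cio"
    specs "github.com/opencontainers/runtime-spec/specs-go"
)

// runExec sketches the Wait-before-Start ordering used by the stress worker:
// subscribing to the exit first guarantees the exit status of a short-lived
// exec is never missed.
func runExec(ctx context.Context, task containerd.Task, id string, pspec *specs.Process) error {
    proc, err := task.Exec(ctx, id, pspec, cio.NullIO)
    if err != nil {
        return err
    }
    defer proc.Delete(ctx)
    // Register for the exit before starting the process.
    statusC, err := proc.Wait(ctx)
    if err != nil {
        return err
    }
    if err := proc.Start(ctx); err != nil {
        return err
    }
    status := <-statusC
    code, _, err := status.Result()
    if err != nil {
        return err
    }
    if code != 0 {
        return fmt.Errorf("exec %q exited with status %d", id, code)
    }
    return nil
}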

View File

@ -143,6 +143,7 @@ func (e *execProcess) start(ctx context.Context) (err error) {
opts.ConsoleSocket = socket
}
if err := e.parent.runtime.Exec(ctx, e.parent.id, e.spec, opts); err != nil {
close(e.waitBlock)
return e.parent.runtimeError(err, "OCI runtime exec failed")
}
if e.stdio.Stdin != "" {
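
Note on the exec start fix: closing e.waitBlock on the error path releases anything already blocked in the process's Wait, which would otherwise hang forever because a process that never started will never produce an exit. A stripped-down sketch of the pattern; execProcess here is a simplified stand-in for the real shim type.

package example

// execProcess is a simplified stand-in for the shim's exec process type.
type execProcess struct {
    // waitBlock is what wait blocks on; it must be closed on every path after
    // which no exit will ever be delivered for this process.
    waitBlock chan struct{}
}

// start sketches the fix: when the OCI runtime exec call fails, close
// waitBlock immediately so concurrent waiters are released instead of hanging.
func (e *execProcess) start(runtimeExec func() error) error {
    if err := runtimeExec(); err != nil {
        close(e.waitBlock)
        return err
    }
    // On success, waitBlock is closed later, when the process's exit is reaped.
    return nil
}

// wait blocks until the process can no longer run, either because it exited
// or because it never started successfully.
func (e *execProcess) wait() {
    <-e.waitBlock
}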

View File

@ -148,7 +148,6 @@ func (s *Service) Delete(ctx context.Context, r *ptypes.Empty) (*shimapi.DeleteR
if p == nil {
return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container must be created")
}
if err := p.Delete(ctx); err != nil {
return nil, err
}
@ -480,7 +479,7 @@ func (s *Service) getContainerPids(ctx context.Context, id string) ([]uint32, er
func (s *Service) forward(publisher events.Publisher) {
for e := range s.events {
if err := publisher.Publish(s.context, getTopic(s.context, e), e); err != nil {
logrus.WithError(err).Error("post event")
log.G(s.context).WithError(err).Error("post event")
}
}
}
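
Note on the logging change in forward: log.G(s.context) pulls the logger stored in the shim's context, so the error keeps whatever fields were attached when the shim started (namespace, path, and so on) instead of going through the bare global logrus logger. A small usage sketch of the containerd log package; the field values are made up for illustration.

package main

import (
    "context"
    "errors"

    "github.com/containerd/containerd/log"
    "github.com/sirupsen/logrus"
)

func main() {
    // Attach a logger carrying component-specific fields to the context once...
    ctx := log.WithLogger(context.Background(),
        logrus.WithFields(logrus.Fields{"namespace": "example", "path": "/run/example"}))

    // ...and log.G(ctx) retrieves that entry wherever the context flows, so
    // the message keeps those fields rather than using the global logger.
    log.G(ctx).WithError(errors.New("publish failed")).Error("post event")
}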