diff --git a/linux/bundle.go b/linux/bundle.go index fbed32224..f1d957cf8 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -84,9 +84,9 @@ func ShimLocal(exchange *exchange.Exchange) ShimOpt { } // ShimConnect is a ShimOpt for connecting to an existing remote shim -func ShimConnect() ShimOpt { +func ShimConnect(onClose func()) ShimOpt { return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) { - return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns)) + return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns), onClose) } } diff --git a/linux/runtime.go b/linux/runtime.go index 5eb08c187..494c6868d 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -201,10 +201,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts "id": id, "namespace": namespace, }).Warn("cleaning up after killed shim") - err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid) - if err == nil { - r.tasks.Delete(ctx, lc) - } else { + if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "id": id, "namespace": namespace, @@ -313,7 +310,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er } return nil, errdefs.FromGRPC(err) } - r.tasks.Delete(ctx, lc) + r.tasks.Delete(ctx, lc.id) if err := lc.shim.KillShim(ctx); err != nil { log.G(ctx).WithError(err).Error("failed to kill shim") } @@ -383,7 +380,17 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { ) ctx = namespaces.WithNamespace(ctx, ns) pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile)) - s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil) + s, err := bundle.NewShimClient(ctx, ns, ShimConnect(func() { + log.G(ctx).WithError(err).WithFields(logrus.Fields{ + "id": id, + "namespace": ns, + }).Error("connecting to shim") + err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid) + if err != nil { + log.G(ctx).WithError(err).WithField("bundle", bundle.path). + Error("cleaning up after dead shim") + } + }), nil) if err != nil { log.G(ctx).WithError(err).WithFields(logrus.Fields{ "id": id, @@ -433,6 +440,7 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, ExitedAt: exitedAt, }) + r.tasks.Delete(ctx, id) if err := bundle.Delete(); err != nil { log.G(ctx).WithError(err).Error("delete bundle") } @@ -448,12 +456,10 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, } func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error { - ctx = namespaces.WithNamespace(ctx, ns) rt, err := r.getRuntime(ctx, ns, id) if err != nil { return err } - if err := rt.Delete(ctx, id, &runc.DeleteOpts{ Force: true, }); err != nil { diff --git a/linux/shim/client/client.go b/linux/shim/client/client.go index 6dd1a9fd7..aea7c8172 100644 --- a/linux/shim/client/client.go +++ b/linux/shim/client/client.go @@ -80,7 +80,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHa if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil { return nil, nil, errors.Wrap(err, "failed to set OOM Score on shim") } - c, clo, err := WithConnect(address)(ctx, config) + c, clo, err := WithConnect(address, func() {})(ctx, config) if err != nil { return nil, nil, errors.Wrap(err, "failed to connect") } @@ -149,13 +149,15 @@ func annonDialer(address string, timeout time.Duration) (net.Conn, error) { } // WithConnect connects to an existing shim -func WithConnect(address string) Opt { +func WithConnect(address string, onClose func()) Opt { return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) { conn, err := connect(address, annonDialer) if err != nil { return nil, nil, err } - return shimapi.NewShimClient(ttrpc.NewClient(conn)), conn, nil + client := ttrpc.NewClient(conn) + client.OnClose(onClose) + return shimapi.NewShimClient(client), conn, nil } } diff --git a/runtime/task_list.go b/runtime/task_list.go index 05f34c323..98fc90974 100644 --- a/runtime/task_list.go +++ b/runtime/task_list.go @@ -92,7 +92,7 @@ func (l *TaskList) AddWithNamespace(namespace string, t Task) error { } // Delete a task -func (l *TaskList) Delete(ctx context.Context, t Task) { +func (l *TaskList) Delete(ctx context.Context, id string) { l.mu.Lock() defer l.mu.Unlock() namespace, err := namespaces.NamespaceRequired(ctx) @@ -101,6 +101,6 @@ func (l *TaskList) Delete(ctx context.Context, t Task) { } tasks, ok := l.tasks[namespace] if ok { - delete(tasks, t.ID()) + delete(tasks, id) } } diff --git a/vendor.conf b/vendor.conf index 67910bdca..bd9359413 100644 --- a/vendor.conf +++ b/vendor.conf @@ -40,5 +40,5 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4 github.com/dmcgowan/go-tar go1.10 -github.com/stevvooe/ttrpc d2710463e497617f16f26d1e715a3308609e7982 +github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577 github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16 diff --git a/vendor/github.com/stevvooe/ttrpc/channel.go b/vendor/github.com/stevvooe/ttrpc/channel.go index 4a33827a4..9493d6862 100644 --- a/vendor/github.com/stevvooe/ttrpc/channel.go +++ b/vendor/github.com/stevvooe/ttrpc/channel.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "io" + "net" "sync" "github.com/pkg/errors" @@ -60,16 +61,18 @@ func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error { var buffers sync.Pool type channel struct { + conn net.Conn bw *bufio.Writer br *bufio.Reader hrbuf [messageHeaderLength]byte // avoid alloc when reading header hwbuf [messageHeaderLength]byte } -func newChannel(w io.Writer, r io.Reader) *channel { +func newChannel(conn net.Conn) *channel { return &channel{ - bw: bufio.NewWriter(w), - br: bufio.NewReader(r), + conn: conn, + bw: bufio.NewWriter(conn), + br: bufio.NewReader(conn), } } diff --git a/vendor/github.com/stevvooe/ttrpc/client.go b/vendor/github.com/stevvooe/ttrpc/client.go index ae367f90e..f04718167 100644 --- a/vendor/github.com/stevvooe/ttrpc/client.go +++ b/vendor/github.com/stevvooe/ttrpc/client.go @@ -27,18 +27,20 @@ type Client struct { closed chan struct{} closeOnce sync.Once + closeFunc func() done chan struct{} err error } func NewClient(conn net.Conn) *Client { c := &Client{ - codec: codec{}, - conn: conn, - channel: newChannel(conn, conn), - calls: make(chan *callRequest), - closed: make(chan struct{}), - done: make(chan struct{}), + codec: codec{}, + conn: conn, + channel: newChannel(conn), + calls: make(chan *callRequest), + closed: make(chan struct{}), + done: make(chan struct{}), + closeFunc: func() {}, } go c.run() @@ -113,6 +115,11 @@ func (c *Client) Close() error { return nil } +// OnClose allows a close func to be called when the server is closed +func (c *Client) OnClose(closer func()) { + c.closeFunc = closer +} + type message struct { messageHeader p []byte @@ -158,6 +165,7 @@ func (c *Client) run() { defer c.conn.Close() defer close(c.done) + defer c.closeFunc() for { select { @@ -199,7 +207,7 @@ func (c *Client) run() { } // broadcast the shutdown error to the remaining waiters. for _, waiter := range waiters { - waiter.errs <- shutdownErr + waiter.errs <- c.err } return } diff --git a/vendor/github.com/stevvooe/ttrpc/server.go b/vendor/github.com/stevvooe/ttrpc/server.go index 5d688e5d5..fd29b719e 100644 --- a/vendor/github.com/stevvooe/ttrpc/server.go +++ b/vendor/github.com/stevvooe/ttrpc/server.go @@ -281,7 +281,7 @@ func (c *serverConn) run(sctx context.Context) { ) var ( - ch = newChannel(c.conn, c.conn) + ch = newChannel(c.conn) ctx, cancel = context.WithCancel(sctx) active int state connState = connStateIdle diff --git a/windows/runtime.go b/windows/runtime.go index 542c97363..d3721d4e3 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -195,7 +195,7 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.E } wt.cleanup() - r.tasks.Delete(ctx, t) + r.tasks.Delete(ctx, t.ID()) r.publisher.Publish(ctx, runtime.TaskDeleteEventTopic,