Merge pull request #2109 from crosbymichael/onclose

Use ttrpc OnClose to fix restored dead shims
Stephen Day 2018-02-06 10:57:29 -08:00, committed by GitHub
commit 0d9995e6e2
9 changed files with 47 additions and 28 deletions
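
In short: on daemon restart containerd reconnects to each restored task's shim over ttrpc; before this change a shim that had died in the meantime was never noticed, leaving a dead task behind. The fix threads an onClose callback from the runtime down to the ttrpc client, which now invokes it whenever the connection is torn down. Below is a minimal, self-contained sketch of that pattern, not part of the diff; the socket path and log message are illustrative only.

package main

import (
	"log"
	"net"

	"github.com/stevvooe/ttrpc" // vendored at d4528379 by this commit
)

func main() {
	// Illustrative address; containerd derives the real shim socket
	// address from the bundle path and namespace.
	conn, err := net.Dial("unix", "/run/containerd/example-shim.sock")
	if err != nil {
		log.Fatal(err)
	}

	client := ttrpc.NewClient(conn)
	// The hook this PR relies on: the callback runs when the client's
	// receive loop exits, i.e. when the shim end of the connection dies.
	client.OnClose(func() {
		log.Println("shim connection closed; clean up the task here")
	})
	defer client.Close()

	// ... wrap with shimapi.NewShimClient(client) and issue shim calls ...
}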

linux/bundle.go

@@ -84,9 +84,9 @@ func ShimLocal(exchange *exchange.Exchange) ShimOpt {
 }
 
 // ShimConnect is a ShimOpt for connecting to an existing remote shim
-func ShimConnect() ShimOpt {
+func ShimConnect(onClose func()) ShimOpt {
 	return func(b *bundle, ns string, ropts *runctypes.RuncOptions) (shim.Config, client.Opt) {
-		return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns))
+		return b.shimConfig(ns, ropts), client.WithConnect(b.shimAddress(ns), onClose)
 	}
 }

linux/runtime.go

@@ -201,10 +201,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts) (runtime.Task, error) {
 				"id":        id,
 				"namespace": namespace,
 			}).Warn("cleaning up after killed shim")
-			err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid)
-			if err == nil {
-				r.tasks.Delete(ctx, lc)
-			} else {
+			if err = r.cleanupAfterDeadShim(context.Background(), bundle, namespace, id, lc.pid); err != nil {
 				log.G(ctx).WithError(err).WithFields(logrus.Fields{
 					"id":        id,
 					"namespace": namespace,
@@ -313,7 +310,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, error) {
 		}
 		return nil, errdefs.FromGRPC(err)
 	}
-	r.tasks.Delete(ctx, lc)
+	r.tasks.Delete(ctx, lc.id)
 	if err := lc.shim.KillShim(ctx); err != nil {
 		log.G(ctx).WithError(err).Error("failed to kill shim")
 	}
@@ -383,7 +380,17 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) {
 		)
 		ctx = namespaces.WithNamespace(ctx, ns)
 		pid, _ := runc.ReadPidFile(filepath.Join(bundle.path, proc.InitPidFile))
-		s, err := bundle.NewShimClient(ctx, ns, ShimConnect(), nil)
+		s, err := bundle.NewShimClient(ctx, ns, ShimConnect(func() {
+			log.G(ctx).WithError(err).WithFields(logrus.Fields{
+				"id":        id,
+				"namespace": ns,
+			}).Error("connecting to shim")
+			err := r.cleanupAfterDeadShim(ctx, bundle, ns, id, pid)
+			if err != nil {
+				log.G(ctx).WithError(err).WithField("bundle", bundle.path).
+					Error("cleaning up after dead shim")
+			}
+		}), nil)
 		if err != nil {
 			log.G(ctx).WithError(err).WithFields(logrus.Fields{
 				"id":        id,
@@ -433,6 +440,7 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int) error {
 		ExitedAt:    exitedAt,
 	})
 
+	r.tasks.Delete(ctx, id)
 	if err := bundle.Delete(); err != nil {
 		log.G(ctx).WithError(err).Error("delete bundle")
 	}
@@ -448,12 +456,10 @@ func (r *Runtime) cleanupAfterDeadShim(ctx context.Context, bundle *bundle, ns, id string, pid int) error {
 }
 
 func (r *Runtime) terminate(ctx context.Context, bundle *bundle, ns, id string) error {
-	ctx = namespaces.WithNamespace(ctx, ns)
 	rt, err := r.getRuntime(ctx, ns, id)
 	if err != nil {
 		return err
 	}
-
 	if err := rt.Delete(ctx, id, &runc.DeleteOpts{
 		Force: true,
 	}); err != nil {

linux/shim/client/client.go

@@ -80,7 +80,7 @@ func WithStart(binary, address, daemonAddress, cgroup string, debug bool, exitHandler func()) Opt {
 	if err = sys.SetOOMScore(cmd.Process.Pid, sys.OOMScoreMaxKillable); err != nil {
 		return nil, nil, errors.Wrap(err, "failed to set OOM Score on shim")
 	}
-	c, clo, err := WithConnect(address)(ctx, config)
+	c, clo, err := WithConnect(address, func() {})(ctx, config)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "failed to connect")
 	}
@@ -149,13 +149,15 @@ func annonDialer(address string, timeout time.Duration) (net.Conn, error) {
 }
 
 // WithConnect connects to an existing shim
-func WithConnect(address string) Opt {
+func WithConnect(address string, onClose func()) Opt {
 	return func(ctx context.Context, config shim.Config) (shimapi.ShimService, io.Closer, error) {
 		conn, err := connect(address, annonDialer)
 		if err != nil {
 			return nil, nil, err
 		}
-		return shimapi.NewShimClient(ttrpc.NewClient(conn)), conn, nil
+		client := ttrpc.NewClient(conn)
+		client.OnClose(onClose)
+		return shimapi.NewShimClient(client), conn, nil
 	}
 }

runtime/task_list.go

@@ -92,7 +92,7 @@ func (l *TaskList) AddWithNamespace(namespace string, t Task) error {
 }
 
 // Delete a task
-func (l *TaskList) Delete(ctx context.Context, t Task) {
+func (l *TaskList) Delete(ctx context.Context, id string) {
 	l.mu.Lock()
 	defer l.mu.Unlock()
 	namespace, err := namespaces.NamespaceRequired(ctx)
@@ -101,6 +101,6 @@ func (l *TaskList) Delete(ctx context.Context, t Task) {
 	}
 	tasks, ok := l.tasks[namespace]
 	if ok {
-		delete(tasks, t.ID())
+		delete(tasks, id)
 	}
 }
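
Why Delete now takes an id rather than a Task (a reading of the change, not text from the PR): the new cleanup path runs after a shim has already died, when no live Task value exists to pass in, so entries must be evictable by id alone. A compilable sketch of the structure this hunk implies; the type and field names below are assumptions.

package main

import "sync"

// Task stands in for containerd's runtime.Task interface.
type Task interface {
	ID() string
}

// taskList mirrors what the diff shows of TaskList: a mutex-guarded
// namespace -> id -> task map.
type taskList struct {
	mu    sync.Mutex
	tasks map[string]map[string]Task
}

// delete evicts by id; callers like cleanupAfterDeadShim only know the id.
func (l *taskList) delete(namespace, id string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if tasks, ok := l.tasks[namespace]; ok {
		delete(tasks, id)
	}
}

func main() {}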

vendor.conf

@@ -40,5 +40,5 @@ github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
 google.golang.org/genproto d80a6e20e776b0b17a324d0ba1ab50a39c8e8944
 golang.org/x/text 19e51611da83d6be54ddafce4a4af510cb3e9ea4
 github.com/dmcgowan/go-tar go1.10
-github.com/stevvooe/ttrpc d2710463e497617f16f26d1e715a3308609e7982
+github.com/stevvooe/ttrpc d4528379866b0ce7e9d71f3eb96f0582fc374577
 github.com/syndtr/gocapability db04d3cc01c8b54962a58ec7e491717d06cfcc16

vendor/github.com/stevvooe/ttrpc/channel.go

@@ -5,6 +5,7 @@ import (
 	"context"
 	"encoding/binary"
 	"io"
+	"net"
 	"sync"
 
 	"github.com/pkg/errors"
@@ -60,16 +61,18 @@ func writeMessageHeader(w io.Writer, p []byte, mh messageHeader) error {
 var buffers sync.Pool
 
 type channel struct {
+	conn  net.Conn
 	bw    *bufio.Writer
 	br    *bufio.Reader
 	hrbuf [messageHeaderLength]byte // avoid alloc when reading header
 	hwbuf [messageHeaderLength]byte
 }
 
-func newChannel(w io.Writer, r io.Reader) *channel {
+func newChannel(conn net.Conn) *channel {
 	return &channel{
-		bw: bufio.NewWriter(w),
-		br: bufio.NewReader(r),
+		conn: conn,
+		bw:   bufio.NewWriter(conn),
+		br:   bufio.NewReader(conn),
 	}
 }

vendor/github.com/stevvooe/ttrpc/client.go

@@ -27,6 +27,7 @@ type Client struct {
 	closed    chan struct{}
 	closeOnce sync.Once
+	closeFunc func()
 	done      chan struct{}
 	err       error
 }
@@ -35,10 +36,11 @@ func NewClient(conn net.Conn) *Client {
 	c := &Client{
 		codec:     codec{},
 		conn:      conn,
-		channel:   newChannel(conn, conn),
+		channel:   newChannel(conn),
 		calls:     make(chan *callRequest),
 		closed:    make(chan struct{}),
 		done:      make(chan struct{}),
+		closeFunc: func() {},
 	}
 
 	go c.run()
@@ -113,6 +115,11 @@ func (c *Client) Close() error {
 	return nil
 }
 
+// OnClose allows a close func to be called when the server is closed
+func (c *Client) OnClose(closer func()) {
+	c.closeFunc = closer
+}
+
 type message struct {
 	messageHeader
 	p []byte
@@ -158,6 +165,7 @@ func (c *Client) run() {
 	defer c.conn.Close()
 	defer close(c.done)
+	defer c.closeFunc()
 
 	for {
 		select {
@@ -199,7 +207,7 @@
 			}
 			// broadcast the shutdown error to the remaining waiters.
 			for _, waiter := range waiters {
-				waiter.errs <- shutdownErr
+				waiter.errs <- c.err
 			}
 			return
 		}
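
An aside on why the hook is reliable: run() now defers closeFunc alongside conn.Close and close(done), so the callback fires on any exit of the receive loop, whether Close was called locally or the peer simply vanished. A runnable sketch against this vendored ttrpc revision; net.Pipe stands in for the shim socket.

package main

import (
	"fmt"
	"net"
	"time"

	"github.com/stevvooe/ttrpc"
)

func main() {
	// An in-memory connection stands in for the shim's unix socket.
	clientEnd, shimEnd := net.Pipe()

	c := ttrpc.NewClient(clientEnd)
	closed := make(chan struct{})
	c.OnClose(func() { close(closed) })

	// Simulate the shim dying: the client's receive loop hits a read
	// error, run() returns, and the deferred closeFunc fires.
	shimEnd.Close()

	select {
	case <-closed:
		fmt.Println("OnClose fired after connection loss")
	case <-time.After(time.Second):
		fmt.Println("timed out waiting for OnClose")
	}
}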

vendor/github.com/stevvooe/ttrpc/server.go

@@ -281,7 +281,7 @@ func (c *serverConn) run(sctx context.Context) {
 	)
 
 	var (
-		ch          = newChannel(c.conn, c.conn)
+		ch          = newChannel(c.conn)
 		ctx, cancel = context.WithCancel(sctx)
 		active      int
 		state       connState = connStateIdle

windows/runtime.go

@@ -195,7 +195,7 @@ func (r *windowsRuntime) Delete(ctx context.Context, t runtime.Task) (*runtime.Exit, error) {
 	}
 
 	wt.cleanup()
-	r.tasks.Delete(ctx, t)
+	r.tasks.Delete(ctx, t.ID())
 	r.publisher.Publish(ctx,
 		runtime.TaskDeleteEventTopic,