diff --git a/cmd/containerd-shim/main_unix.go b/cmd/containerd-shim/main_unix.go index efde0424e..88fbffda9 100644 --- a/cmd/containerd-shim/main_unix.go +++ b/cmd/containerd-shim/main_unix.go @@ -8,15 +8,18 @@ import ( "os" "runtime" "strings" + "time" "golang.org/x/sys/unix" "google.golang.org/grpc" + events "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/linux/shim" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/reaper" "github.com/containerd/containerd/version" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/urfave/cli" ) @@ -72,10 +75,14 @@ func main() { return err } server := grpc.NewServer() + e, err := connectEvents(context.GlobalString("address")) + if err != nil { + return err + } sv, err := shim.NewService( path, context.GlobalString("namespace"), - context.GlobalString("address"), + e, ) if err != nil { return err @@ -155,3 +162,35 @@ func dumpStacks() { buf = buf[:stackSize] logrus.Infof("=== BEGIN goroutine stack dump ===\n%s\n=== END goroutine stack dump ===", buf) } + +func connectEvents(address string) (events.EventsClient, error) { + conn, err := connect(address, dialer) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return events.NewEventsClient(conn), nil +} + +func connect(address string, d func(string, time.Duration) (net.Conn, error)) (*grpc.ClientConn, error) { + gopts := []grpc.DialOption{ + grpc.WithBlock(), + grpc.WithInsecure(), + grpc.WithTimeout(100 * time.Second), + grpc.WithDialer(d), + grpc.FailOnNonTempDialError(true), + } + conn, err := grpc.Dial(dialAddress(address), gopts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial %q", address) + } + return conn, nil +} + +func dialer(address string, timeout time.Duration) (net.Conn, error) { + address = strings.TrimPrefix(address, "unix://") + return net.DialTimeout("unix", address, timeout) +} + +func dialAddress(address string) string { + return fmt.Sprintf("unix://%s", address) +} diff --git a/linux/bundle.go b/linux/bundle.go index dd07287b9..49615002f 100644 --- a/linux/bundle.go +++ b/linux/bundle.go @@ -9,21 +9,23 @@ import ( "os" "path/filepath" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/linux/runcopts" client "github.com/containerd/containerd/linux/shim" "github.com/containerd/containerd/runtime" "github.com/containerd/containerd/typeurl" ) -func loadBundle(path, namespace string) *bundle { +func loadBundle(path, namespace string, events *events.Exchange) *bundle { return &bundle{ path: path, namespace: namespace, + events: events, } } // newBundle creates a new bundle on disk at the provided path for the given id -func newBundle(path, namespace, id string, spec []byte) (b *bundle, err error) { +func newBundle(path, namespace, id string, spec []byte, events *events.Exchange) (b *bundle, err error) { if err := os.MkdirAll(path, 0711); err != nil { return nil, err } @@ -49,6 +51,7 @@ func newBundle(path, namespace, id string, spec []byte) (b *bundle, err error) { id: id, path: path, namespace: namespace, + events: events, }, err } @@ -56,13 +59,14 @@ type bundle struct { id string path string namespace string + events *events.Exchange } // NewShim connects to the shim managing the bundle and tasks func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote, debug bool, createOpts runtime.CreateOpts) (*client.Client, error) { opt := client.WithStart(binary, grpcAddress, debug) if !remote { - opt = client.WithLocal + opt = client.WithLocal(b.events) } var options runcopts.CreateOptions if createOpts.Options != nil { @@ -84,7 +88,7 @@ func (b *bundle) NewShim(ctx context.Context, binary, grpcAddress string, remote func (b *bundle) Connect(ctx context.Context, remote bool) (*client.Client, error) { opt := client.WithConnect if !remote { - opt = client.WithLocal + opt = client.WithLocal(b.events) } return client.New(ctx, client.Config{ Address: b.shimAddress(), diff --git a/linux/runtime.go b/linux/runtime.go index d2ff6a7d5..049c4328d 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -12,6 +12,7 @@ import ( "github.com/boltdb/bolt" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/identifiers" client "github.com/containerd/containerd/linux/shim" shim "github.com/containerd/containerd/linux/shim/v1" @@ -90,6 +91,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { tasks: runtime.NewTaskList(), db: m.(*bolt.DB), address: ic.Address, + events: ic.Events, } tasks, err := r.restoreTasks(ic.Context) if err != nil { @@ -114,6 +116,7 @@ type Runtime struct { monitor runtime.TaskMonitor tasks *runtime.TaskList db *bolt.DB + events *events.Exchange } func (r *Runtime) ID() string { @@ -130,7 +133,7 @@ func (r *Runtime) Create(ctx context.Context, id string, opts runtime.CreateOpts return nil, errors.Wrapf(err, "invalid task id") } - bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec.Value) + bundle, err := newBundle(filepath.Join(r.root, namespace), namespace, id, opts.Spec.Value, r.events) if err != nil { return nil, err } @@ -203,7 +206,7 @@ func (r *Runtime) Delete(ctx context.Context, c runtime.Task) (*runtime.Exit, er } r.tasks.Delete(ctx, lc) - bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace) + bundle := loadBundle(filepath.Join(r.root, namespace, lc.id), namespace, r.events) if err := bundle.Delete(); err != nil { return nil, err } @@ -254,7 +257,7 @@ func (r *Runtime) loadTasks(ctx context.Context, ns string) ([]*Task, error) { continue } id := path.Name() - bundle := loadBundle(filepath.Join(r.root, ns, id), ns) + bundle := loadBundle(filepath.Join(r.root, ns, id), ns, r.events) s, err := bundle.Connect(ctx, r.remote) if err != nil { diff --git a/linux/shim/client.go b/linux/shim/client.go index 0ebadb082..30b54a909 100644 --- a/linux/shim/client.go +++ b/linux/shim/client.go @@ -17,6 +17,7 @@ import ( "github.com/pkg/errors" "github.com/sirupsen/logrus" + "github.com/containerd/containerd/events" shim "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/reaper" @@ -149,12 +150,14 @@ func WithConnect(ctx context.Context, config Config) (shim.ShimClient, io.Closer } // WithLocal uses an in process shim -func WithLocal(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { - service, err := NewService(config.Path, config.Namespace, "") - if err != nil { - return nil, nil, err +func WithLocal(events *events.Exchange) func(context.Context, Config) (shim.ShimClient, io.Closer, error) { + return func(ctx context.Context, config Config) (shim.ShimClient, io.Closer, error) { + service, err := NewService(config.Path, config.Namespace, &localEventsClient{forwarder: events}) + if err != nil { + return nil, nil, err + } + return NewLocal(service), nil, nil } - return NewLocal(service), nil, nil } type Config struct { diff --git a/linux/shim/service.go b/linux/shim/service.go index 5555394a8..ce7027e54 100644 --- a/linux/shim/service.go +++ b/linux/shim/service.go @@ -15,7 +15,6 @@ import ( events "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/api/types/task" "github.com/containerd/containerd/errdefs" - evt "github.com/containerd/containerd/events" shimapi "github.com/containerd/containerd/linux/shim/v1" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" @@ -32,23 +31,11 @@ var empty = &google_protobuf.Empty{} const RuncRoot = "/run/containerd/runc" // NewService returns a new shim service that can be used via GRPC -func NewService(path, namespace, address string) (*Service, error) { +func NewService(path, namespace string, client publisher) (*Service, error) { if namespace == "" { return nil, fmt.Errorf("shim namespace cannot be empty") } context := namespaces.WithNamespace(context.Background(), namespace) - var client publisher - if address != "" { - conn, err := connect(address, dialer) - if err != nil { - return nil, errors.Wrapf(err, "failed to dial %q", address) - } - client = events.NewEventsClient(conn) - } else { - client = &localEventsClient{ - forwarder: evt.NewExchange(), - } - } s := &Service{ path: path, processes: make(map[string]process),