From 8d135d28420a1f8aa7ce8bb2f56103555f380a5f Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 17 Aug 2021 11:05:07 -0700 Subject: [PATCH] Add support for shim plugins Refactor shim v2 to load and register plugins. Update init shim interface to not require task service implementation on returned service, but register as plugin if it is. Signed-off-by: Derek McGowan --- pkg/cri/server/service.go | 2 +- plugin/plugin.go | 19 +--- runtime/v2/shim/shim.go | 203 +++++++++++++++++++++++++------------- services/server/server.go | 28 ++++-- 4 files changed, 161 insertions(+), 91 deletions(-) diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 8d8cf0f73..e8901d819 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -65,7 +65,7 @@ type CRIService interface { Run() error // io.Closer is used by containerd to gracefully stop cri service. io.Closer - plugin.Service + Register(*grpc.Server) error grpcServices } diff --git a/plugin/plugin.go b/plugin/plugin.go index 9bbfb0d9e..b8e5157c5 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,9 +20,7 @@ import ( "fmt" "sync" - "github.com/containerd/ttrpc" "github.com/pkg/errors" - "google.golang.org/grpc" ) var ( @@ -63,6 +61,8 @@ const ( ServicePlugin Type = "io.containerd.service.v1" // GRPCPlugin implements a grpc service GRPCPlugin Type = "io.containerd.grpc.v1" + // TTRPCPlugin implements a ttrpc shim service + TTRPCPlugin Type = "io.containerd.ttrpc.v1" // SnapshotPlugin implements a snapshotter SnapshotPlugin Type = "io.containerd.snapshotter.v1" // TaskMonitorPlugin implements a task monitor @@ -124,21 +124,6 @@ func (r *Registration) URI() string { return fmt.Sprintf("%s.%s", r.Type, r.ID) } -// Service allows GRPC services to be registered with the underlying server -type Service interface { - Register(*grpc.Server) error -} - -// TTRPCService allows TTRPC services to be registered with the underlying server -type TTRPCService interface { - RegisterTTRPC(*ttrpc.Server) error -} - -// TCPService allows GRPC services to be registered with the underlying tcp server -type TCPService interface { - RegisterTCP(*grpc.Server) error -} - var register = struct { sync.RWMutex r []*Registration diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index c14aacca9..691040bc7 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/version" "github.com/containerd/ttrpc" @@ -38,13 +39,6 @@ import ( "github.com/sirupsen/logrus" ) -// Client for a shim server -type Client struct { - service shimapi.TaskService - context context.Context - signals chan os.Signal -} - // Publisher for events type Publisher interface { events.Publisher @@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface type Shim interface { - shimapi.TaskService Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) StartShim(ctx context.Context, opts StartOpts) (string, error) } @@ -91,6 +84,19 @@ type Config struct { NoSetupLogger bool } +type ttrpcService interface { + RegisterTTRPC(*ttrpc.Server) error +} + +type taskService struct { + local shimapi.TaskService +} + +func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, t.local) + return nil +} + var ( debugFlag bool versionFlag bool @@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) { for _, o := range opts { o(&config) } + if err := run(id, initFunc, config); err != nil { fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) os.Exit(1) @@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error { return err } + // Handle explicit actions switch action { case "delete": logger := logrus.WithFields(logrus.Fields{ @@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error { Address: addressFlag, TTRPCAddress: ttrpcAddress, } + address, err := service.StartShim(ctx, opts) if err != nil { return err @@ -242,46 +251,122 @@ func run(id string, initFunc Init, config Config) error { return err } return nil - default: - if !config.NoSetupLogger { - if err := setLogger(ctx, idFlag); err != nil { - return err - } + } + + if !config.NoSetupLogger { + if err := setLogger(ctx, idFlag); err != nil { + return err } - client := NewShimClient(ctx, service, signals) - if err := client.Serve(); err != nil { - if err != context.Canceled { - return err - } + } + + // Register event plugin + plugin.Register(&plugin.Registration{ + Type: plugin.EventPlugin, + ID: "publisher", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return publisher, nil + }, + }) + + // If service is an implementation of the task service, register it as a plugin + if ts, ok := service.(shimapi.TaskService); ok { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return &taskService{ts}, nil + }, + }) + } + + var ( + initialized = plugin.NewPluginSet() + ttrpcServices = []ttrpcService{} + ) + plugins := plugin.Graph(func(*plugin.Registration) bool { return false }) + for _, p := range plugins { + id := p.URI() + log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id) + + initContext := plugin.NewContext( + ctx, + p, + initialized, + // NOTE: Root is empty since the shim does not support persistent storage, + // shim plugins should make use state directory for writing files to disk. + // The state directory will be destroyed when the shim if cleaned up or + // on reboot + "", + bundlePath, + ) + initContext.Address = addressFlag + initContext.TTRPCAddress = ttrpcAddress + + // load the plugin specific configuration if it is provided + //TODO: Read configuration passed into shim, or from state directory? + //if p.Config != nil { + // pc, err := config.Decode(p) + // if err != nil { + // return nil, err + // } + // initContext.Config = pc + //} + + result := p.Init(initContext) + if err := initialized.Add(result); err != nil { + return errors.Wrapf(err, "could not add plugin result to plugin set") } - // NOTE: If the shim server is down(like oom killer), the address - // socket might be leaking. - if address, err := ReadAddress("address"); err == nil { - _ = RemoveSocket(address) + instance, err := result.Instance() + if err != nil { + if plugin.IsSkipPlugin(err) { + log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id) + } else { + log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id) + } + continue } - select { - case <-publisher.Done(): - return nil - case <-time.After(5 * time.Second): - return errors.New("publisher not closed") + if src, ok := instance.(ttrpcService); ok { + logrus.WithField("id", id).Debug("registering ttrpc service") + ttrpcServices = append(ttrpcServices, src) } } + + server, err := newServer() + if err != nil { + return errors.Wrap(err, "failed creating server") + } + + for _, srv := range ttrpcServices { + if err := srv.RegisterTTRPC(server); err != nil { + return errors.Wrap(err, "failed to register service") + } + } + + if err := serve(ctx, server, signals); err != nil { + if err != context.Canceled { + return err + } + } + + // NOTE: If the shim server is down(like oom killer), the address + // socket might be leaking. + if address, err := ReadAddress("address"); err == nil { + _ = RemoveSocket(address) + } + + select { + case <-publisher.Done(): + return nil + case <-time.After(5 * time.Second): + return errors.New("publisher not closed") + } } -// NewShimClient creates a new shim server client -func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client { - s := &Client{ - service: svc, - context: ctx, - signals: signals, - } - return s -} - -// Serve the shim server -func (s *Client) Serve() error { +// serve serves the ttrpc API over a unix socket in the current working directory +// and blocks until the context is canceled +func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) error { dump := make(chan os.Signal, 32) setupDumpStacks(dump) @@ -289,34 +374,8 @@ func (s *Client) Serve() error { if err != nil { return err } - server, err := newServer() - if err != nil { - return errors.Wrap(err, "failed creating server") - } - logrus.Debug("registering ttrpc server") - shimapi.RegisterTaskService(server, s.service) - - if err := serve(s.context, server, socketFlag); err != nil { - return err - } - logger := logrus.WithFields(logrus.Fields{ - "pid": os.Getpid(), - "path": path, - "namespace": namespaceFlag, - }) - go func() { - for range dump { - dumpStacks(logger) - } - }() - return handleSignals(s.context, logger, s.signals) -} - -// serve serves the ttrpc API over a unix socket at the provided path -// this function does not block -func serve(ctx context.Context, server *ttrpc.Server, path string) error { - l, err := serveListener(path) + l, err := serveListener(socketFlag) if err != nil { return err } @@ -327,7 +386,17 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error { logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") } }() - return nil + logger := logrus.WithFields(logrus.Fields{ + "pid": os.Getpid(), + "path": path, + "namespace": namespaceFlag, + }) + go func() { + for range dump { + dumpStacks(logger) + } + }() + return handleSignals(ctx, logger, signals) } func dumpStacks(logger *logrus.Entry) { diff --git a/services/server/server.go b/services/server/server.go index 52d2dc840..6a36bd4a6 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -142,13 +142,29 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) } + + // grpcService allows GRPC services to be registered with the underlying server + type grpcService interface { + Register(*grpc.Server) error + } + + // tcpService allows GRPC services to be registered with the underlying tcp server + type tcpService interface { + RegisterTCP(*grpc.Server) error + } + + // ttrpcService allows TTRPC services to be registered with the underlying server + type ttrpcService interface { + RegisterTTRPC(*ttrpc.Server) error + } + var ( grpcServer = grpc.NewServer(serverOpts...) tcpServer = grpc.NewServer(tcpServerOpts...) - grpcServices []plugin.Service - tcpServices []plugin.TCPService - ttrpcServices []plugin.TTRPCService + grpcServices []grpcService + tcpServices []tcpService + ttrpcServices []ttrpcService s = &Server{ grpcServer: grpcServer, @@ -211,13 +227,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { delete(required, reqID) // check for grpc services that should be registered with the server - if src, ok := instance.(plugin.Service); ok { + if src, ok := instance.(grpcService); ok { grpcServices = append(grpcServices, src) } - if src, ok := instance.(plugin.TTRPCService); ok { + if src, ok := instance.(ttrpcService); ok { ttrpcServices = append(ttrpcServices, src) } - if service, ok := instance.(plugin.TCPService); ok { + if service, ok := instance.(tcpService); ok { tcpServices = append(tcpServices, service) }