diff --git a/pkg/cri/server/service.go b/pkg/cri/server/service.go index 8d8cf0f73..e8901d819 100644 --- a/pkg/cri/server/service.go +++ b/pkg/cri/server/service.go @@ -65,7 +65,7 @@ type CRIService interface { Run() error // io.Closer is used by containerd to gracefully stop cri service. io.Closer - plugin.Service + Register(*grpc.Server) error grpcServices } diff --git a/plugin/plugin.go b/plugin/plugin.go index 9bbfb0d9e..b8e5157c5 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -20,9 +20,7 @@ import ( "fmt" "sync" - "github.com/containerd/ttrpc" "github.com/pkg/errors" - "google.golang.org/grpc" ) var ( @@ -63,6 +61,8 @@ const ( ServicePlugin Type = "io.containerd.service.v1" // GRPCPlugin implements a grpc service GRPCPlugin Type = "io.containerd.grpc.v1" + // TTRPCPlugin implements a ttrpc shim service + TTRPCPlugin Type = "io.containerd.ttrpc.v1" // SnapshotPlugin implements a snapshotter SnapshotPlugin Type = "io.containerd.snapshotter.v1" // TaskMonitorPlugin implements a task monitor @@ -124,21 +124,6 @@ func (r *Registration) URI() string { return fmt.Sprintf("%s.%s", r.Type, r.ID) } -// Service allows GRPC services to be registered with the underlying server -type Service interface { - Register(*grpc.Server) error -} - -// TTRPCService allows TTRPC services to be registered with the underlying server -type TTRPCService interface { - RegisterTTRPC(*ttrpc.Server) error -} - -// TCPService allows GRPC services to be registered with the underlying tcp server -type TCPService interface { - RegisterTCP(*grpc.Server) error -} - var register = struct { sync.RWMutex r []*Registration diff --git a/runtime/v2/shim/shim.go b/runtime/v2/shim/shim.go index c14aacca9..691040bc7 100644 --- a/runtime/v2/shim/shim.go +++ b/runtime/v2/shim/shim.go @@ -30,6 +30,7 @@ import ( "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/plugin" shimapi "github.com/containerd/containerd/runtime/v2/task" "github.com/containerd/containerd/version" "github.com/containerd/ttrpc" @@ -38,13 +39,6 @@ import ( "github.com/sirupsen/logrus" ) -// Client for a shim server -type Client struct { - service shimapi.TaskService - context context.Context - signals chan os.Signal -} - // Publisher for events type Publisher interface { events.Publisher @@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error) // Shim server interface type Shim interface { - shimapi.TaskService Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error) StartShim(ctx context.Context, opts StartOpts) (string, error) } @@ -91,6 +84,19 @@ type Config struct { NoSetupLogger bool } +type ttrpcService interface { + RegisterTTRPC(*ttrpc.Server) error +} + +type taskService struct { + local shimapi.TaskService +} + +func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error { + shimapi.RegisterTaskService(server, t.local) + return nil +} + var ( debugFlag bool versionFlag bool @@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) { for _, o := range opts { o(&config) } + if err := run(id, initFunc, config); err != nil { fmt.Fprintf(os.Stderr, "%s: %s\n", id, err) os.Exit(1) @@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error { return err } + // Handle explicit actions switch action { case "delete": logger := logrus.WithFields(logrus.Fields{ @@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error { Address: addressFlag, TTRPCAddress: ttrpcAddress, } + address, err := service.StartShim(ctx, opts) if err != nil { return err @@ -242,46 +251,122 @@ func run(id string, initFunc Init, config Config) error { return err } return nil - default: - if !config.NoSetupLogger { - if err := setLogger(ctx, idFlag); err != nil { - return err - } + } + + if !config.NoSetupLogger { + if err := setLogger(ctx, idFlag); err != nil { + return err } - client := NewShimClient(ctx, service, signals) - if err := client.Serve(); err != nil { - if err != context.Canceled { - return err - } + } + + // Register event plugin + plugin.Register(&plugin.Registration{ + Type: plugin.EventPlugin, + ID: "publisher", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return publisher, nil + }, + }) + + // If service is an implementation of the task service, register it as a plugin + if ts, ok := service.(shimapi.TaskService); ok { + plugin.Register(&plugin.Registration{ + Type: plugin.TTRPCPlugin, + ID: "task", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + return &taskService{ts}, nil + }, + }) + } + + var ( + initialized = plugin.NewPluginSet() + ttrpcServices = []ttrpcService{} + ) + plugins := plugin.Graph(func(*plugin.Registration) bool { return false }) + for _, p := range plugins { + id := p.URI() + log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id) + + initContext := plugin.NewContext( + ctx, + p, + initialized, + // NOTE: Root is empty since the shim does not support persistent storage, + // shim plugins should make use state directory for writing files to disk. + // The state directory will be destroyed when the shim if cleaned up or + // on reboot + "", + bundlePath, + ) + initContext.Address = addressFlag + initContext.TTRPCAddress = ttrpcAddress + + // load the plugin specific configuration if it is provided + //TODO: Read configuration passed into shim, or from state directory? + //if p.Config != nil { + // pc, err := config.Decode(p) + // if err != nil { + // return nil, err + // } + // initContext.Config = pc + //} + + result := p.Init(initContext) + if err := initialized.Add(result); err != nil { + return errors.Wrapf(err, "could not add plugin result to plugin set") } - // NOTE: If the shim server is down(like oom killer), the address - // socket might be leaking. - if address, err := ReadAddress("address"); err == nil { - _ = RemoveSocket(address) + instance, err := result.Instance() + if err != nil { + if plugin.IsSkipPlugin(err) { + log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id) + } else { + log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id) + } + continue } - select { - case <-publisher.Done(): - return nil - case <-time.After(5 * time.Second): - return errors.New("publisher not closed") + if src, ok := instance.(ttrpcService); ok { + logrus.WithField("id", id).Debug("registering ttrpc service") + ttrpcServices = append(ttrpcServices, src) } } + + server, err := newServer() + if err != nil { + return errors.Wrap(err, "failed creating server") + } + + for _, srv := range ttrpcServices { + if err := srv.RegisterTTRPC(server); err != nil { + return errors.Wrap(err, "failed to register service") + } + } + + if err := serve(ctx, server, signals); err != nil { + if err != context.Canceled { + return err + } + } + + // NOTE: If the shim server is down(like oom killer), the address + // socket might be leaking. + if address, err := ReadAddress("address"); err == nil { + _ = RemoveSocket(address) + } + + select { + case <-publisher.Done(): + return nil + case <-time.After(5 * time.Second): + return errors.New("publisher not closed") + } } -// NewShimClient creates a new shim server client -func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client { - s := &Client{ - service: svc, - context: ctx, - signals: signals, - } - return s -} - -// Serve the shim server -func (s *Client) Serve() error { +// serve serves the ttrpc API over a unix socket in the current working directory +// and blocks until the context is canceled +func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) error { dump := make(chan os.Signal, 32) setupDumpStacks(dump) @@ -289,34 +374,8 @@ func (s *Client) Serve() error { if err != nil { return err } - server, err := newServer() - if err != nil { - return errors.Wrap(err, "failed creating server") - } - logrus.Debug("registering ttrpc server") - shimapi.RegisterTaskService(server, s.service) - - if err := serve(s.context, server, socketFlag); err != nil { - return err - } - logger := logrus.WithFields(logrus.Fields{ - "pid": os.Getpid(), - "path": path, - "namespace": namespaceFlag, - }) - go func() { - for range dump { - dumpStacks(logger) - } - }() - return handleSignals(s.context, logger, s.signals) -} - -// serve serves the ttrpc API over a unix socket at the provided path -// this function does not block -func serve(ctx context.Context, server *ttrpc.Server, path string) error { - l, err := serveListener(path) + l, err := serveListener(socketFlag) if err != nil { return err } @@ -327,7 +386,17 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error { logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure") } }() - return nil + logger := logrus.WithFields(logrus.Fields{ + "pid": os.Getpid(), + "path": path, + "namespace": namespaceFlag, + }) + go func() { + for range dump { + dumpStacks(logger) + } + }() + return handleSignals(ctx, logger, signals) } func dumpStacks(logger *logrus.Entry) { diff --git a/services/server/server.go b/services/server/server.go index 52d2dc840..6a36bd4a6 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -142,13 +142,29 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig))) } + + // grpcService allows GRPC services to be registered with the underlying server + type grpcService interface { + Register(*grpc.Server) error + } + + // tcpService allows GRPC services to be registered with the underlying tcp server + type tcpService interface { + RegisterTCP(*grpc.Server) error + } + + // ttrpcService allows TTRPC services to be registered with the underlying server + type ttrpcService interface { + RegisterTTRPC(*ttrpc.Server) error + } + var ( grpcServer = grpc.NewServer(serverOpts...) tcpServer = grpc.NewServer(tcpServerOpts...) - grpcServices []plugin.Service - tcpServices []plugin.TCPService - ttrpcServices []plugin.TTRPCService + grpcServices []grpcService + tcpServices []tcpService + ttrpcServices []ttrpcService s = &Server{ grpcServer: grpcServer, @@ -211,13 +227,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { delete(required, reqID) // check for grpc services that should be registered with the server - if src, ok := instance.(plugin.Service); ok { + if src, ok := instance.(grpcService); ok { grpcServices = append(grpcServices, src) } - if src, ok := instance.(plugin.TTRPCService); ok { + if src, ok := instance.(ttrpcService); ok { ttrpcServices = append(ttrpcServices, src) } - if service, ok := instance.(plugin.TCPService); ok { + if service, ok := instance.(tcpService); ok { tcpServices = append(tcpServices, service) }