Add support for shim plugins

Refactor shim v2 to load and register plugins.
Update init shim interface to not require task service implementation on
returned service, but register as plugin if it is.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2021-08-17 11:05:07 -07:00
parent fda782a7b9
commit 8d135d2842
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
4 changed files with 161 additions and 91 deletions

View File

@ -65,7 +65,7 @@ type CRIService interface {
Run() error
// io.Closer is used by containerd to gracefully stop cri service.
io.Closer
plugin.Service
Register(*grpc.Server) error
grpcServices
}

View File

@ -20,9 +20,7 @@ import (
"fmt"
"sync"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
var (
@ -63,6 +61,8 @@ const (
ServicePlugin Type = "io.containerd.service.v1"
// GRPCPlugin implements a grpc service
GRPCPlugin Type = "io.containerd.grpc.v1"
// TTRPCPlugin implements a ttrpc shim service
TTRPCPlugin Type = "io.containerd.ttrpc.v1"
// SnapshotPlugin implements a snapshotter
SnapshotPlugin Type = "io.containerd.snapshotter.v1"
// TaskMonitorPlugin implements a task monitor
@ -124,21 +124,6 @@ func (r *Registration) URI() string {
return fmt.Sprintf("%s.%s", r.Type, r.ID)
}
// Service allows GRPC services to be registered with the underlying server
type Service interface {
Register(*grpc.Server) error
}
// TTRPCService allows TTRPC services to be registered with the underlying server
type TTRPCService interface {
RegisterTTRPC(*ttrpc.Server) error
}
// TCPService allows GRPC services to be registered with the underlying tcp server
type TCPService interface {
RegisterTCP(*grpc.Server) error
}
var register = struct {
sync.RWMutex
r []*Registration

View File

@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/version"
"github.com/containerd/ttrpc"
@ -38,13 +39,6 @@ import (
"github.com/sirupsen/logrus"
)
// Client for a shim server
type Client struct {
service shimapi.TaskService
context context.Context
signals chan os.Signal
}
// Publisher for events
type Publisher interface {
events.Publisher
@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
// Shim server interface
type Shim interface {
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, opts StartOpts) (string, error)
}
@ -91,6 +84,19 @@ type Config struct {
NoSetupLogger bool
}
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
type taskService struct {
local shimapi.TaskService
}
func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, t.local)
return nil
}
var (
debugFlag bool
versionFlag bool
@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) {
for _, o := range opts {
o(&config)
}
if err := run(id, initFunc, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
os.Exit(1)
@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error {
return err
}
// Handle explicit actions
switch action {
case "delete":
logger := logrus.WithFields(logrus.Fields{
@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error {
Address: addressFlag,
TTRPCAddress: ttrpcAddress,
}
address, err := service.StartShim(ctx, opts)
if err != nil {
return err
@ -242,46 +251,122 @@ func run(id string, initFunc Init, config Config) error {
return err
}
return nil
default:
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
return err
}
}
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
return err
}
client := NewShimClient(ctx, service, signals)
if err := client.Serve(); err != nil {
if err != context.Canceled {
return err
}
}
// Register event plugin
plugin.Register(&plugin.Registration{
Type: plugin.EventPlugin,
ID: "publisher",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return publisher, nil
},
})
// If service is an implementation of the task service, register it as a plugin
if ts, ok := service.(shimapi.TaskService); ok {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return &taskService{ts}, nil
},
})
}
var (
initialized = plugin.NewPluginSet()
ttrpcServices = []ttrpcService{}
)
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
for _, p := range plugins {
id := p.URI()
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
initContext := plugin.NewContext(
ctx,
p,
initialized,
// NOTE: Root is empty since the shim does not support persistent storage,
// shim plugins should make use state directory for writing files to disk.
// The state directory will be destroyed when the shim if cleaned up or
// on reboot
"",
bundlePath,
)
initContext.Address = addressFlag
initContext.TTRPCAddress = ttrpcAddress
// load the plugin specific configuration if it is provided
//TODO: Read configuration passed into shim, or from state directory?
//if p.Config != nil {
// pc, err := config.Decode(p)
// if err != nil {
// return nil, err
// }
// initContext.Config = pc
//}
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return errors.Wrapf(err, "could not add plugin result to plugin set")
}
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
instance, err := result.Instance()
if err != nil {
if plugin.IsSkipPlugin(err) {
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
} else {
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
}
continue
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
if src, ok := instance.(ttrpcService); ok {
logrus.WithField("id", id).Debug("registering ttrpc service")
ttrpcServices = append(ttrpcServices, src)
}
}
server, err := newServer()
if err != nil {
return errors.Wrap(err, "failed creating server")
}
for _, srv := range ttrpcServices {
if err := srv.RegisterTTRPC(server); err != nil {
return errors.Wrap(err, "failed to register service")
}
}
if err := serve(ctx, server, signals); err != nil {
if err != context.Canceled {
return err
}
}
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
}
}
// NewShimClient creates a new shim server client
func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client {
s := &Client{
service: svc,
context: ctx,
signals: signals,
}
return s
}
// Serve the shim server
func (s *Client) Serve() error {
// serve serves the ttrpc API over a unix socket in the current working directory
// and blocks until the context is canceled
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) error {
dump := make(chan os.Signal, 32)
setupDumpStacks(dump)
@ -289,34 +374,8 @@ func (s *Client) Serve() error {
if err != nil {
return err
}
server, err := newServer()
if err != nil {
return errors.Wrap(err, "failed creating server")
}
logrus.Debug("registering ttrpc server")
shimapi.RegisterTaskService(server, s.service)
if err := serve(s.context, server, socketFlag); err != nil {
return err
}
logger := logrus.WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(s.context, logger, s.signals)
}
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
l, err := serveListener(path)
l, err := serveListener(socketFlag)
if err != nil {
return err
}
@ -327,7 +386,17 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error {
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
}()
return nil
logger := logrus.WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(ctx, logger, signals)
}
func dumpStacks(logger *logrus.Entry) {

View File

@ -142,13 +142,29 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// grpcService allows GRPC services to be registered with the underlying server
type grpcService interface {
Register(*grpc.Server) error
}
// tcpService allows GRPC services to be registered with the underlying tcp server
type tcpService interface {
RegisterTCP(*grpc.Server) error
}
// ttrpcService allows TTRPC services to be registered with the underlying server
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
var (
grpcServer = grpc.NewServer(serverOpts...)
tcpServer = grpc.NewServer(tcpServerOpts...)
grpcServices []plugin.Service
tcpServices []plugin.TCPService
ttrpcServices []plugin.TTRPCService
grpcServices []grpcService
tcpServices []tcpService
ttrpcServices []ttrpcService
s = &Server{
grpcServer: grpcServer,
@ -211,13 +227,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
delete(required, reqID)
// check for grpc services that should be registered with the server
if src, ok := instance.(plugin.Service); ok {
if src, ok := instance.(grpcService); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
if src, ok := instance.(ttrpcService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(plugin.TCPService); ok {
if service, ok := instance.(tcpService); ok {
tcpServices = append(tcpServices, service)
}