Merge pull request #5817 from dmcgowan/shim-plugins

Add support for shim plugins
This commit is contained in:
Fu Wei 2021-09-12 18:18:20 +08:00 committed by GitHub
commit e1ad779107
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 161 additions and 91 deletions

View File

@ -65,7 +65,7 @@ type CRIService interface {
Run() error
// io.Closer is used by containerd to gracefully stop cri service.
io.Closer
plugin.Service
Register(*grpc.Server) error
grpcServices
}

View File

@ -20,9 +20,7 @@ import (
"fmt"
"sync"
"github.com/containerd/ttrpc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
var (
@ -63,6 +61,8 @@ const (
ServicePlugin Type = "io.containerd.service.v1"
// GRPCPlugin implements a grpc service
GRPCPlugin Type = "io.containerd.grpc.v1"
// TTRPCPlugin implements a ttrpc shim service
TTRPCPlugin Type = "io.containerd.ttrpc.v1"
// SnapshotPlugin implements a snapshotter
SnapshotPlugin Type = "io.containerd.snapshotter.v1"
// TaskMonitorPlugin implements a task monitor
@ -124,21 +124,6 @@ func (r *Registration) URI() string {
return fmt.Sprintf("%s.%s", r.Type, r.ID)
}
// Service allows GRPC services to be registered with the underlying server
type Service interface {
Register(*grpc.Server) error
}
// TTRPCService allows TTRPC services to be registered with the underlying server
type TTRPCService interface {
RegisterTTRPC(*ttrpc.Server) error
}
// TCPService allows GRPC services to be registered with the underlying tcp server
type TCPService interface {
RegisterTCP(*grpc.Server) error
}
var register = struct {
sync.RWMutex
r []*Registration

View File

@ -30,6 +30,7 @@ import (
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
shimapi "github.com/containerd/containerd/runtime/v2/task"
"github.com/containerd/containerd/version"
"github.com/containerd/ttrpc"
@ -38,13 +39,6 @@ import (
"github.com/sirupsen/logrus"
)
// Client for a shim server
type Client struct {
service shimapi.TaskService
context context.Context
signals chan os.Signal
}
// Publisher for events
type Publisher interface {
events.Publisher
@ -64,7 +58,6 @@ type Init func(context.Context, string, Publisher, func()) (Shim, error)
// Shim server interface
type Shim interface {
shimapi.TaskService
Cleanup(ctx context.Context) (*shimapi.DeleteResponse, error)
StartShim(ctx context.Context, opts StartOpts) (string, error)
}
@ -91,6 +84,19 @@ type Config struct {
NoSetupLogger bool
}
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
type taskService struct {
local shimapi.TaskService
}
func (t *taskService) RegisterTTRPC(server *ttrpc.Server) error {
shimapi.RegisterTaskService(server, t.local)
return nil
}
var (
debugFlag bool
versionFlag bool
@ -158,6 +164,7 @@ func Run(id string, initFunc Init, opts ...BinaryOpts) {
for _, o := range opts {
o(&config)
}
if err := run(id, initFunc, config); err != nil {
fmt.Fprintf(os.Stderr, "%s: %s\n", id, err)
os.Exit(1)
@ -208,6 +215,7 @@ func run(id string, initFunc Init, config Config) error {
return err
}
// Handle explicit actions
switch action {
case "delete":
logger := logrus.WithFields(logrus.Fields{
@ -234,6 +242,7 @@ func run(id string, initFunc Init, config Config) error {
Address: addressFlag,
TTRPCAddress: ttrpcAddress,
}
address, err := service.StartShim(ctx, opts)
if err != nil {
return err
@ -242,46 +251,122 @@ func run(id string, initFunc Init, config Config) error {
return err
}
return nil
default:
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
return err
}
}
if !config.NoSetupLogger {
if err := setLogger(ctx, idFlag); err != nil {
return err
}
client := NewShimClient(ctx, service, signals)
if err := client.Serve(); err != nil {
if err != context.Canceled {
return err
}
}
// Register event plugin
plugin.Register(&plugin.Registration{
Type: plugin.EventPlugin,
ID: "publisher",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return publisher, nil
},
})
// If service is an implementation of the task service, register it as a plugin
if ts, ok := service.(shimapi.TaskService); ok {
plugin.Register(&plugin.Registration{
Type: plugin.TTRPCPlugin,
ID: "task",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return &taskService{ts}, nil
},
})
}
var (
initialized = plugin.NewPluginSet()
ttrpcServices = []ttrpcService{}
)
plugins := plugin.Graph(func(*plugin.Registration) bool { return false })
for _, p := range plugins {
id := p.URI()
log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)
initContext := plugin.NewContext(
ctx,
p,
initialized,
// NOTE: Root is empty since the shim does not support persistent storage,
// shim plugins should make use state directory for writing files to disk.
// The state directory will be destroyed when the shim if cleaned up or
// on reboot
"",
bundlePath,
)
initContext.Address = addressFlag
initContext.TTRPCAddress = ttrpcAddress
// load the plugin specific configuration if it is provided
//TODO: Read configuration passed into shim, or from state directory?
//if p.Config != nil {
// pc, err := config.Decode(p)
// if err != nil {
// return nil, err
// }
// initContext.Config = pc
//}
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return errors.Wrapf(err, "could not add plugin result to plugin set")
}
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
instance, err := result.Instance()
if err != nil {
if plugin.IsSkipPlugin(err) {
log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
} else {
log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
}
continue
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
if src, ok := instance.(ttrpcService); ok {
logrus.WithField("id", id).Debug("registering ttrpc service")
ttrpcServices = append(ttrpcServices, src)
}
}
server, err := newServer()
if err != nil {
return errors.Wrap(err, "failed creating server")
}
for _, srv := range ttrpcServices {
if err := srv.RegisterTTRPC(server); err != nil {
return errors.Wrap(err, "failed to register service")
}
}
if err := serve(ctx, server, signals); err != nil {
if err != context.Canceled {
return err
}
}
// NOTE: If the shim server is down(like oom killer), the address
// socket might be leaking.
if address, err := ReadAddress("address"); err == nil {
_ = RemoveSocket(address)
}
select {
case <-publisher.Done():
return nil
case <-time.After(5 * time.Second):
return errors.New("publisher not closed")
}
}
// NewShimClient creates a new shim server client
func NewShimClient(ctx context.Context, svc shimapi.TaskService, signals chan os.Signal) *Client {
s := &Client{
service: svc,
context: ctx,
signals: signals,
}
return s
}
// Serve the shim server
func (s *Client) Serve() error {
// serve serves the ttrpc API over a unix socket in the current working directory
// and blocks until the context is canceled
func serve(ctx context.Context, server *ttrpc.Server, signals chan os.Signal) error {
dump := make(chan os.Signal, 32)
setupDumpStacks(dump)
@ -289,34 +374,8 @@ func (s *Client) Serve() error {
if err != nil {
return err
}
server, err := newServer()
if err != nil {
return errors.Wrap(err, "failed creating server")
}
logrus.Debug("registering ttrpc server")
shimapi.RegisterTaskService(server, s.service)
if err := serve(s.context, server, socketFlag); err != nil {
return err
}
logger := logrus.WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(s.context, logger, s.signals)
}
// serve serves the ttrpc API over a unix socket at the provided path
// this function does not block
func serve(ctx context.Context, server *ttrpc.Server, path string) error {
l, err := serveListener(path)
l, err := serveListener(socketFlag)
if err != nil {
return err
}
@ -327,7 +386,17 @@ func serve(ctx context.Context, server *ttrpc.Server, path string) error {
logrus.WithError(err).Fatal("containerd-shim: ttrpc server failure")
}
}()
return nil
logger := logrus.WithFields(logrus.Fields{
"pid": os.Getpid(),
"path": path,
"namespace": namespaceFlag,
})
go func() {
for range dump {
dumpStacks(logger)
}
}()
return handleSignals(ctx, logger, signals)
}
func dumpStacks(logger *logrus.Entry) {

View File

@ -142,13 +142,29 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
}
// grpcService allows GRPC services to be registered with the underlying server
type grpcService interface {
Register(*grpc.Server) error
}
// tcpService allows GRPC services to be registered with the underlying tcp server
type tcpService interface {
RegisterTCP(*grpc.Server) error
}
// ttrpcService allows TTRPC services to be registered with the underlying server
type ttrpcService interface {
RegisterTTRPC(*ttrpc.Server) error
}
var (
grpcServer = grpc.NewServer(serverOpts...)
tcpServer = grpc.NewServer(tcpServerOpts...)
grpcServices []plugin.Service
tcpServices []plugin.TCPService
ttrpcServices []plugin.TTRPCService
grpcServices []grpcService
tcpServices []tcpService
ttrpcServices []ttrpcService
s = &Server{
grpcServer: grpcServer,
@ -211,13 +227,13 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
delete(required, reqID)
// check for grpc services that should be registered with the server
if src, ok := instance.(plugin.Service); ok {
if src, ok := instance.(grpcService); ok {
grpcServices = append(grpcServices, src)
}
if src, ok := instance.(plugin.TTRPCService); ok {
if src, ok := instance.(ttrpcService); ok {
ttrpcServices = append(ttrpcServices, src)
}
if service, ok := instance.(plugin.TCPService); ok {
if service, ok := instance.(tcpService); ok {
tcpServices = append(tcpServices, service)
}