plugin: refactor plugin system to support initialization reporting

Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
Stephen J Day 2017-10-09 20:43:04 -07:00
parent fe52d9369f
commit 8508e8252b
No known key found for this signature in database
GPG Key ID: 67B3DED84EDC823F
21 changed files with 216 additions and 80 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -27,11 +28,13 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
md, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
return NewWalkingDiff(md.(*metadata.DB).ContentStore())
},
})

View File

@ -23,6 +23,7 @@ import (
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/reaper"
"github.com/containerd/containerd/runtime"
@ -30,9 +31,9 @@ import (
runc "github.com/containerd/go-runc"
"github.com/containerd/typeurl"
google_protobuf "github.com/golang/protobuf/ptypes/empty"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)
@ -49,9 +50,9 @@ const (
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.RuntimePlugin,
ID: "linux",
Init: New,
Type: plugin.RuntimePlugin,
ID: "linux",
InitFn: New,
Requires: []plugin.Type{
plugin.TaskMonitorPlugin,
plugin.MetadataPlugin,
@ -92,6 +93,8 @@ type Config struct {
// New returns a configured runtime
func New(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
@ -121,6 +124,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err != nil {
return nil, err
}
// TODO: need to add the tasks to the monitor
for _, t := range tasks {
if err := r.tasks.AddWithNamespace(t.namespace, t); err != nil {

View File

@ -9,6 +9,7 @@ import (
"github.com/containerd/containerd/linux"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
metrics "github.com/docker/go-metrics"
@ -24,7 +25,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.TaskMonitorPlugin,
ID: "cgroups",
Init: New,
InitFn: New,
Config: &Config{},
})
}
@ -44,6 +45,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if ns != nil {
metrics.Register(ns)
}
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
return &cgroupsMonitor{
collector: collector,
oom: oom,

View File

@ -2,48 +2,123 @@ package plugin
import (
"context"
"fmt"
"path/filepath"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, plugins map[Type]map[string]interface{}, root, state, id string) *InitContext {
return &InitContext{
plugins: plugins,
Root: filepath.Join(root, id),
State: filepath.Join(state, id),
Context: log.WithModule(ctx, id),
}
}
// InitContext is used for plugin inititalization
type InitContext struct {
Context context.Context
Root string
State string
Address string
Context context.Context
Config interface{}
Address string
Events *events.Exchange
plugins map[Type]map[string]interface{}
Meta *Meta // plugins can fill in metadata at init.
plugins *PluginSet
}
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, r *Registration, plugins *PluginSet, root, state string) *InitContext {
return &InitContext{
Context: log.WithModule(ctx, r.URI()),
Root: filepath.Join(root, r.URI()),
State: filepath.Join(state, r.URI()),
Meta: &Meta{
Exports: map[string]string{},
},
plugins: plugins,
}
}
// Get returns the first plugin by its type
func (i *InitContext) Get(t Type) (interface{}, error) {
for _, v := range i.plugins[t] {
return v, nil
}
return nil, fmt.Errorf("no plugins registered for %s", t)
return i.plugins.Get(t)
}
// GetAll returns all plugins with the specific type
func (i *InitContext) GetAll(t Type) (map[string]interface{}, error) {
p, ok := i.plugins[t]
if !ok {
return nil, fmt.Errorf("no plugins registered for %s", t)
// Meta contains information gathered from the registration and initialization
// process.
type Meta struct {
Platforms []ocispec.Platform // platforms supported by plugin
Exports map[string]string // values exported by plugin
Capabilities []string // feature switches for plugin
}
// Plugin represents an initialized plugin, used with an init context.
type Plugin struct {
Registration *Registration // registration, as initialized
Config interface{} // config, as initialized
Meta *Meta
instance interface{}
err error // will be set if there was an error initializing the plugin
}
func (p *Plugin) Err() error {
return p.err
}
func (p *Plugin) Instance() (interface{}, error) {
return p.instance, p.err
}
// PluginSet defines a plugin collection, used with InitContext.
//
// This maintains ordering and unique indexing over the set.
//
// After iteratively instantiating plugins, this set should represent, the
// ordered, initialization set of plugins for a containerd instance.
type PluginSet struct {
ordered []*Plugin // order of initialization
byTypeAndID map[Type]map[string]*Plugin
}
func NewPluginSet() *PluginSet {
return &PluginSet{
byTypeAndID: make(map[Type]map[string]*Plugin),
}
}
func (ps *PluginSet) Add(p *Plugin) error {
if byID, typeok := ps.byTypeAndID[p.Registration.Type]; !typeok {
ps.byTypeAndID[p.Registration.Type] = map[string]*Plugin{
p.Registration.ID: p,
}
} else if _, idok := byID[p.Registration.ID]; !idok {
byID[p.Registration.ID] = p
} else {
return errors.Wrapf(errdefs.ErrAlreadyExists, "plugin %v already initialized", p.Registration.URI())
}
ps.ordered = append(ps.ordered, p)
return nil
}
// Get returns the first plugin by its type
func (ps *PluginSet) Get(t Type) (interface{}, error) {
for _, v := range ps.byTypeAndID[t] {
return v.Instance()
}
return nil, errors.Wrapf(errdefs.ErrNotFound, "no plugins registered for %s", t)
}
func (i *InitContext) GetAll() []*Plugin {
return i.plugins.ordered
}
// GetByType returns all plugins with the specific type.
func (i *InitContext) GetByType(t Type) (map[string]*Plugin, error) {
p, ok := i.plugins.byTypeAndID[t]
if !ok {
return nil, errors.Wrapf(errdefs.ErrNotFound, "no plugins registered for %s", t)
}
return p, nil
}

View File

@ -31,6 +31,8 @@ func IsSkipPlugin(err error) bool {
// Type is the type of the plugin
type Type string
func (t Type) String() string { return string(t) }
const (
// RuntimePlugin implements a runtime
RuntimePlugin Type = "io.containerd.runtime.v1"
@ -54,9 +56,22 @@ type Registration struct {
ID string
Config interface{}
Requires []Type
Init func(*InitContext) (interface{}, error)
added bool
// InitFn is called when initializing a plugin. The registration and
// context are passed in. The init function may modify the registration to
// add exports, capabilites and platform support declarations.
InitFn func(*InitContext) (interface{}, error)
}
func (r *Registration) Init(ic *InitContext) *Plugin {
p, err := r.InitFn(ic)
return &Plugin{
Registration: r,
Config: ic.Config,
Meta: ic.Meta,
instance: p,
err: err,
}
}
// URI returns the full plugin URI

View File

@ -1,7 +1,6 @@
package server
import (
"errors"
"expvar"
"net"
"net/http"
@ -29,6 +28,7 @@ import (
"github.com/containerd/containerd/snapshot"
metrics "github.com/docker/go-metrics"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -66,7 +66,7 @@ func New(ctx context.Context, config *Config) (*Server, error) {
rpc: rpc,
events: events.NewExchange(),
}
initialized = make(map[plugin.Type]map[string]interface{})
initialized = plugin.NewPluginSet()
)
for _, p := range plugins {
id := p.URI()
@ -74,10 +74,10 @@ func New(ctx context.Context, config *Config) (*Server, error) {
initContext := plugin.NewContext(
ctx,
p,
initialized,
config.Root,
config.State,
id,
)
initContext.Events = s.events
initContext.Address = config.GRPC.Address
@ -90,7 +90,12 @@ func New(ctx context.Context, config *Config) (*Server, error) {
}
initContext.Config = pluginConfig
}
instance, err := p.Init(initContext)
result := p.Init(initContext)
if err := initialized.Add(result); err != nil {
return nil, errors.Wrapf(err, "could not add plugin result to plugin set")
}
instance, err := result.Instance()
if err != nil {
if plugin.IsSkipPlugin(err) {
log.G(ctx).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
@ -99,14 +104,6 @@ func New(ctx context.Context, config *Config) (*Server, error) {
}
continue
}
if types, ok := initialized[p.Type]; ok {
types[p.ID] = instance
} else {
initialized[p.Type] = map[string]interface{}{
p.ID: instance,
}
}
// check for grpc services that should be registered with the server
if service, ok := instance.(plugin.Service); ok {
services = append(services, service)
@ -171,7 +168,8 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
plugin.Register(&plugin.Registration{
Type: plugin.ContentPlugin,
ID: "content",
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Exports["root"] = ic.Root
return local.NewStore(ic.Root)
},
})
@ -182,7 +180,7 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
plugin.ContentPlugin,
plugin.SnapshotPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
if err := os.MkdirAll(ic.Root, 0711); err != nil {
return nil, err
}
@ -191,17 +189,26 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
return nil, err
}
rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin)
snapshottersRaw, err := ic.GetByType(plugin.SnapshotPlugin)
if err != nil {
return nil, err
}
snapshotters := make(map[string]snapshot.Snapshotter)
for name, sn := range rawSnapshotters {
for name, sn := range snapshottersRaw {
sn, err := sn.Instance()
if err != nil {
log.G(ic.Context).WithError(err).
Warnf("could not use snapshotter %v in metadata plugin", name)
continue
}
snapshotters[name] = sn.(snapshot.Snapshotter)
}
db, err := bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil)
path := filepath.Join(ic.Root, "meta.db")
ic.Meta.Exports["path"] = path
db, err := bolt.Open(path, 0644, nil)
if err != nil {
return nil, err
}

View File

@ -23,7 +23,7 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err

View File

@ -41,19 +41,22 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: NewService,
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
s, err := NewService(m.(*metadata.DB).ContentStore(), ic.Events)
return s, err
},
})
}
func NewService(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
func NewService(cs content.Store, publisher events.Publisher) (*Service, error) {
return &Service{
store: m.(*metadata.DB).ContentStore(),
publisher: ic.Events,
store: cs,
publisher: publisher,
}, nil
}

View File

@ -32,8 +32,8 @@ func init() {
Config: &config{
Order: []string{"walking"},
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
differs, err := ic.GetAll(plugin.DiffPlugin)
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
differs, err := ic.GetByType(plugin.DiffPlugin)
if err != nil {
return nil, err
}
@ -41,10 +41,15 @@ func init() {
orderedNames := ic.Config.(*config).Order
ordered := make([]plugin.Differ, len(orderedNames))
for i, n := range orderedNames {
differ, ok := differs[n]
differp, ok := differs[n]
if !ok {
return nil, errors.Errorf("needed differ not loaded: %s", n)
}
differ, err := differp.Instance()
if err != nil {
return nil, errors.Wrapf(err, "could not load required differ due plugin init error: %s", n)
}
ordered[i] = differ.(plugin.Differ)
}

View File

@ -15,7 +15,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "events",
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return NewService(ic.Events), nil
},
})

View File

@ -16,11 +16,13 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "healthcheck",
Init: NewService,
InitFn: func(*plugin.InitContext) (interface{}, error) {
return NewService()
},
})
}
func NewService(ic *plugin.InitContext) (interface{}, error) {
func NewService() (*Service, error) {
return &Service{
health.NewServer(),
}, nil

View File

@ -23,7 +23,7 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err

View File

@ -24,7 +24,7 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err

View File

@ -25,7 +25,7 @@ func init() {
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
Init: newService,
InitFn: newService,
})
}

View File

@ -2,6 +2,7 @@ package tasks
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
@ -46,15 +47,16 @@ func init() {
plugin.RuntimePlugin,
plugin.MetadataPlugin,
},
Init: New,
InitFn: New,
})
}
func New(ic *plugin.InitContext) (interface{}, error) {
rt, err := ic.GetAll(plugin.RuntimePlugin)
rt, err := ic.GetByType(plugin.RuntimePlugin)
if err != nil {
return nil, err
}
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
@ -62,9 +64,18 @@ func New(ic *plugin.InitContext) (interface{}, error) {
cs := m.(*metadata.DB).ContentStore()
runtimes := make(map[string]runtime.Runtime)
for _, rr := range rt {
r := rr.(runtime.Runtime)
ri, err := rr.Instance()
if err != nil {
log.G(ic.Context).WithError(err).Warn("could not load runtime instance due to initialization error")
continue
}
r := ri.(runtime.Runtime)
runtimes[r.ID()] = r
}
if len(runtimes) == 0 {
return nil, errors.New("no runtimes available to create task service")
}
return &Service{
runtimes: runtimes,
db: m.(*metadata.DB),

View File

@ -13,9 +13,9 @@ var _ api.VersionServer = &Service{}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "version",
Init: New,
Type: plugin.GRPCPlugin,
ID: "version",
InitFn: New,
})
}

View File

@ -12,9 +12,11 @@ import (
"github.com/containerd/btrfs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshot/storage"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -23,7 +25,9 @@ func init() {
plugin.Register(&plugin.Registration{
ID: "btrfs",
Type: plugin.SnapshotPlugin,
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = []ocispec.Platform{platforms.DefaultSpec()}
ic.Meta.Exports = map[string]string{"root": ic.Root}
return NewSnapshotter(ic.Root)
},
})

View File

@ -9,6 +9,7 @@ import (
"github.com/containerd/containerd/fs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshot/storage"
@ -19,7 +20,8 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SnapshotPlugin,
ID: "naive",
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
return NewSnapshotter(ic.Root)
},
})

View File

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/fs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshot/storage"
@ -24,7 +25,9 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SnapshotPlugin,
ID: "overlayfs",
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
ic.Meta.Exports["root"] = ic.Root
return NewSnapshotter(ic.Root)
},
})

View File

@ -19,7 +19,7 @@ func init() {
plugin.Register(&plugin.Registration{
Type: plugin.SnapshotPlugin,
ID: "windows",
Init: func(ic *plugin.InitContext) (interface{}, error) {
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return NewSnapshotter(ic.Root)
},
})

View File

@ -41,9 +41,9 @@ var _ = (runtime.Runtime)(&windowsRuntime{})
func init() {
plugin.Register(&plugin.Registration{
ID: runtimeName,
Type: plugin.RuntimePlugin,
Init: New,
ID: runtimeName,
Type: plugin.RuntimePlugin,
InitFn: New,
Requires: []plugin.Type{
plugin.MetadataPlugin,
},