Generalize the plugin package

Remove containerd specific parts of the plugin package to prepare its
move out of the main repository. Separate the plugin registration
singleton into a separate package.

Separating out the plugin package and registration makes it easier to
implement external plugins without creating a dependency loop.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2023-10-12 15:36:02 -07:00
parent a80606bc2d
commit 7b2a918213
65 changed files with 363 additions and 299 deletions

View File

@@ -19,20 +19,15 @@ package plugin
import (
"context"
"fmt"
"path/filepath"
"github.com/containerd/containerd/errdefs"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
)
// InitContext is used for plugin initialization
type InitContext struct {
Context context.Context
Root string
State string
Properties map[string]string
Config interface{}
Address string
TTRPCAddress string
RegisterReadiness func() func()
// Meta is metadata plugins can fill in at init
@@ -42,11 +37,13 @@ type InitContext struct {
}
// NewContext returns a new plugin InitContext
func NewContext(ctx context.Context, r *Registration, plugins *Set, root, state string) *InitContext {
func NewContext(ctx context.Context, plugins *Set, properties map[string]string) *InitContext {
if properties == nil {
properties = map[string]string{}
}
return &InitContext{
Context: ctx,
Root: filepath.Join(root, r.URI()),
State: filepath.Join(state, r.URI()),
Context: ctx,
Properties: properties,
Meta: &Meta{
Exports: map[string]string{},
},
@@ -62,16 +59,16 @@ func (i *InitContext) Get(t Type) (interface{}, error) {
// Meta contains information gathered from the registration and initialization
// process.
type Meta struct {
Platforms []ocispec.Platform // platforms supported by plugin
Exports map[string]string // values exported by plugin
Capabilities []string // feature switches for plugin
Platforms []imagespec.Platform // platforms supported by plugin
Exports map[string]string // values exported by plugin
Capabilities []string // feature switches for plugin
}
// Plugin represents an initialized plugin, used with an init context.
type Plugin struct {
Registration *Registration // registration, as initialized
Config interface{} // config, as initialized
Meta *Meta
Registration Registration // registration, as initialized
Config interface{} // config, as initialized
Meta Meta
instance interface{}
err error // will be set if there was an error initializing the plugin
@@ -115,7 +112,7 @@ func (ps *Set) Add(p *Plugin) error {
} else if _, idok := byID[p.Registration.ID]; !idok {
byID[p.Registration.ID] = p
} else {
return fmt.Errorf("plugin %v already initialized: %w", p.Registration.URI(), errdefs.ErrAlreadyExists)
return fmt.Errorf("plugin add failed for %s: %w", p.Registration.URI(), ErrPluginInitialized)
}
ps.ordered = append(ps.ordered, p)
@@ -127,7 +124,7 @@ func (ps *Set) Get(t Type) (interface{}, error) {
for _, v := range ps.byTypeAndID[t] {
return v.Instance()
}
return nil, fmt.Errorf("no plugins registered for %s: %w", t, errdefs.ErrNotFound)
return nil, fmt.Errorf("no plugins registered for %s: %w", t, ErrPluginNotFound)
}
// GetAll returns all initialized plugins
@@ -153,7 +150,7 @@ func (i *InitContext) GetByID(t Type, id string) (interface{}, error) {
}
p, ok := ps[id]
if !ok {
return nil, fmt.Errorf("no %s plugins with id %s: %w", t, id, errdefs.ErrNotFound)
return nil, fmt.Errorf("no %s plugins with id %s: %w", t, id, ErrPluginNotFound)
}
return p.Instance()
}
@@ -162,7 +159,7 @@ func (i *InitContext) GetByID(t Type, id string) (interface{}, error) {
func (i *InitContext) GetByType(t Type) (map[string]*Plugin, error) {
p, ok := i.plugins.byTypeAndID[t]
if !ok {
return nil, fmt.Errorf("no plugins registered for %s: %w", t, errdefs.ErrNotFound)
return nil, fmt.Errorf("no plugins registered for %s: %w", t, ErrPluginNotFound)
}
return p, nil

View File

@@ -20,7 +20,6 @@ import (
"context"
"errors"
"fmt"
"sync"
)
var (
@@ -34,6 +33,10 @@ var (
// this allows the plugin loader differentiate between a plugin which is configured
// not to load and one that fails to load.
ErrSkipPlugin = errors.New("skip plugin")
// ErrPluginInitialized is used when a plugin is already initialized
ErrPluginInitialized = errors.New("plugin: already initialized")
// ErrPluginNotFound is used when a plugin is looked up but not found
ErrPluginNotFound = errors.New("plugin: not found")
// ErrInvalidRequires will be thrown if the requirements for a plugin are
// defined in an invalid manner.
@@ -65,8 +68,6 @@ type Registration struct {
// context are passed in. The init function may modify the registration to
// add exports, capabilities and platform support declarations.
InitFn func(*InitContext) (interface{}, error)
// Disable the plugin from loading
Disable bool
// ConfigMigration allows a plugin to migrate configurations from an older
// version to handle plugin renames or moving of features from one plugin
@@ -79,12 +80,12 @@ type Registration struct {
}
// Init the registered plugin
func (r *Registration) Init(ic *InitContext) *Plugin {
func (r Registration) Init(ic *InitContext) *Plugin {
p, err := r.InitFn(ic)
return &Plugin{
Registration: r,
Config: ic.Config,
Meta: ic.Meta,
Meta: *ic.Meta,
instance: p,
err: err,
}
@@ -95,11 +96,6 @@ func (r *Registration) URI() string {
return r.Type.String() + "." + r.ID
}
var register = struct {
sync.RWMutex
r []*Registration
}{}
// Load loads all plugins at the provided path into containerd.
//
// Load is currently only implemented on non-static, non-gccgo builds for amd64
@@ -118,18 +114,64 @@ func Load(path string) (err error) {
return loadPlugins(path)
}
// Register allows plugins to register
func Register(r *Registration) {
register.Lock()
defer register.Unlock()
// DisableFilter filters out disabled plugins
type DisableFilter func(r *Registration) bool
// Registry is list of registrations which can be registered to and
// produce a filtered and ordered output.
// The Registry itself is immutable and the list will be copied
// and appeneded to a new registry when new items are registered.
type Registry []*Registration
// Graph computes the ordered list of registrations based on their dependencies,
// filtering out any plugins which match the provided filter.
func (registry Registry) Graph(filter DisableFilter) []Registration {
disabled := map[*Registration]bool{}
for _, r := range registry {
if filter(r) {
disabled[r] = true
}
}
ordered := make([]Registration, 0, len(registry)-len(disabled))
added := map[*Registration]bool{}
for _, r := range registry {
if disabled[r] {
continue
}
children(r, registry, added, disabled, &ordered)
if !added[r] {
ordered = append(ordered, *r)
added[r] = true
}
}
return ordered
}
func children(reg *Registration, registry []*Registration, added, disabled map[*Registration]bool, ordered *[]Registration) {
for _, t := range reg.Requires {
for _, r := range registry {
if !disabled[r] && r.URI() != reg.URI() && (t == "*" || r.Type == t) {
children(r, registry, added, disabled, ordered)
if !added[r] {
*ordered = append(*ordered, *r)
added[r] = true
}
}
}
}
}
// Register adds the registration to a Registry and returns the
// updated Registry, panicking if registration could not succeed.
func (registry Registry) Register(r *Registration) Registry {
if r.Type == "" {
panic(ErrNoType)
}
if r.ID == "" {
panic(ErrNoPluginID)
}
if err := checkUnique(r); err != nil {
if err := checkUnique(registry, r); err != nil {
panic(err)
}
@@ -139,66 +181,14 @@ func Register(r *Registration) {
}
}
register.r = append(register.r, r)
return append(registry, r)
}
// Reset removes all global registrations
func Reset() {
register.Lock()
defer register.Unlock()
register.r = nil
}
func checkUnique(r *Registration) error {
for _, registered := range register.r {
func checkUnique(registry Registry, r *Registration) error {
for _, registered := range registry {
if r.URI() == registered.URI() {
return fmt.Errorf("%s: %w", r.URI(), ErrIDRegistered)
}
}
return nil
}
// DisableFilter filters out disabled plugins
type DisableFilter func(r *Registration) bool
// Graph returns an ordered list of registered plugins for initialization.
// Plugins in disableList specified by id will be disabled.
func Graph(filter DisableFilter) (ordered []*Registration) {
register.RLock()
defer register.RUnlock()
for _, r := range register.r {
if filter(r) {
r.Disable = true
}
}
added := map[*Registration]bool{}
for _, r := range register.r {
if r.Disable {
continue
}
children(r, added, &ordered)
if !added[r] {
ordered = append(ordered, r)
added[r] = true
}
}
return ordered
}
func children(reg *Registration, added map[*Registration]bool, ordered *[]*Registration) {
for _, t := range reg.Requires {
for _, r := range register.r {
if !r.Disable &&
r.URI() != reg.URI() &&
(t == "*" || r.Type == t) {
children(r, added, ordered)
if !added[r] {
*ordered = append(*ordered, r)
added[r] = true
}
}
}
}
}

View File

@@ -22,19 +22,12 @@ import (
"github.com/containerd/containerd/services"
)
func registerClear() {
register.Lock()
defer register.Unlock()
register.r = nil
}
func mockPluginFilter(*Registration) bool {
return false
}
// TestContainerdPlugin tests the logic of Graph, use the containerd's plugin
func TestContainerdPlugin(t *testing.T) {
registerClear()
// Plugin types commonly used by containerd
const (
InternalPlugin Type = "io.containerd.internal.v1"
@@ -52,11 +45,11 @@ func TestContainerdPlugin(t *testing.T) {
TracingProcessorPlugin Type = "io.containerd.tracing.processor.v1"
)
Register(&Registration{
var register Registry
register = register.Register(&Registration{
Type: TaskMonitorPlugin,
ID: "cgroups",
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.TasksService,
Requires: []Type{
@@ -65,199 +58,165 @@ func TestContainerdPlugin(t *testing.T) {
MetadataPlugin,
TaskMonitorPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.IntrospectionService,
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.NamespacesService,
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "namespaces",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "content",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "containers",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.ContainersService,
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "events",
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "leases",
Requires: []Type{
LeasePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: LeasePlugin,
ID: "manager",
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "diff",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.DiffService,
Requires: []Type{
DiffPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.SnapshotsService,
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "snapshots",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "version",
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "images",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GCPlugin,
ID: "scheduler",
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: RuntimePluginV2,
ID: "task",
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "tasks",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "introspection",
Requires: []Type{"*"},
})
Register(&Registration{
}).Register(&Registration{
Type: ServicePlugin,
ID: services.ContentService,
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "healthcheck",
})
Register(&Registration{
}).Register(&Registration{
Type: InternalPlugin,
ID: "opt",
})
Register(&Registration{
}).Register(&Registration{
Type: GRPCPlugin,
ID: "cri",
Requires: []Type{
ServicePlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: RuntimePlugin,
ID: "linux",
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: InternalPlugin,
Requires: []Type{
ServicePlugin,
},
ID: "restart",
})
Register(&Registration{
}).Register(&Registration{
Type: DiffPlugin,
ID: "walking",
Requires: []Type{
MetadataPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: SnapshotPlugin,
ID: "native",
})
Register(&Registration{
}).Register(&Registration{
Type: SnapshotPlugin,
ID: "overlayfs",
})
Register(&Registration{
}).Register(&Registration{
Type: ContentPlugin,
ID: "content",
})
Register(&Registration{
}).Register(&Registration{
Type: MetadataPlugin,
ID: "bolt",
Requires: []Type{
ContentPlugin,
SnapshotPlugin,
},
})
Register(&Registration{
}).Register(&Registration{
Type: TracingProcessorPlugin,
ID: "otlp",
})
Register(&Registration{
}).Register(&Registration{
Type: InternalPlugin,
ID: "tracing",
Requires: []Type{
@@ -265,7 +224,7 @@ func TestContainerdPlugin(t *testing.T) {
},
})
ordered := Graph(mockPluginFilter)
ordered := register.Graph(mockPluginFilter)
expectedURI := []string{
"io.containerd.monitor.v1.cgroups",
"io.containerd.content.v1.content",
@@ -305,7 +264,7 @@ func TestContainerdPlugin(t *testing.T) {
cmpOrdered(t, ordered, expectedURI)
}
func cmpOrdered(t *testing.T, ordered []*Registration, expectedURI []string) {
func cmpOrdered(t *testing.T, ordered []Registration, expectedURI []string) {
if len(ordered) != len(expectedURI) {
t.Fatalf("ordered compare failed, %d != %d", len(ordered), len(expectedURI))
}
@@ -321,6 +280,7 @@ func TestPluginGraph(t *testing.T) {
for _, testcase := range []struct {
input []*Registration
expectedURI []string
filter DisableFilter
}{
// test requires *
{
@@ -395,21 +355,27 @@ func TestPluginGraph(t *testing.T) {
ID: "content",
},
{
Type: "disable",
ID: "disable",
Disable: true,
Type: "disable",
ID: "disable",
},
},
expectedURI: []string{
"content.content",
},
filter: func(r *Registration) bool {
return r.Type == "disable"
},
},
} {
registerClear()
var register Registry
for _, in := range testcase.input {
Register(in)
register = register.Register(in)
}
ordered := Graph(mockPluginFilter)
var filter DisableFilter = mockPluginFilter
if testcase.filter != nil {
filter = testcase.filter
}
ordered := register.Graph(filter)
cmpOrdered(t, ordered, testcase.expectedURI)
}
}

View File

@@ -0,0 +1,50 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"sync"
"github.com/containerd/containerd/plugin"
)
var register = struct {
sync.RWMutex
r plugin.Registry
}{}
// Register allows plugins to register
func Register(r *plugin.Registration) {
register.Lock()
defer register.Unlock()
register.r = register.r.Register(r)
}
// Reset removes all global registrations
func Reset() {
register.Lock()
defer register.Unlock()
register.r = nil
}
// Graph returns an ordered list of registered plugins for initialization.
// Plugins in disableList specified by id will be disabled.
func Graph(filter plugin.DisableFilter) []plugin.Registration {
register.RLock()
defer register.RUnlock()
return register.r.Graph(filter)
}