Move plugin context events into separate plugin

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2021-08-05 21:50:40 -07:00
parent 7d4891783a
commit 0a0621bb47
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
19 changed files with 137 additions and 16 deletions

View File

@ -19,6 +19,7 @@ package main
// register containerd builtins here // register containerd builtins here
import ( import (
_ "github.com/containerd/containerd/diff/walking/plugin" _ "github.com/containerd/containerd/diff/walking/plugin"
_ "github.com/containerd/containerd/events/plugin"
_ "github.com/containerd/containerd/gc/scheduler" _ "github.com/containerd/containerd/gc/scheduler"
_ "github.com/containerd/containerd/runtime/restart/monitor" _ "github.com/containerd/containerd/runtime/restart/monitor"
_ "github.com/containerd/containerd/services/containers" _ "github.com/containerd/containerd/services/containers"

32
events/plugin/plugin.go Normal file
View File

@ -0,0 +1,32 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"github.com/containerd/containerd/plugin"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.EventPlugin,
ID: "exchange",
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
// TODO: In 2.0, create exchange since ic.Events will be removed
return ic.Events, nil
},
})
}

View File

@ -20,6 +20,7 @@ package cgroups
import ( import (
"github.com/containerd/cgroups" "github.com/containerd/cgroups"
"github.com/containerd/containerd/events"
v1 "github.com/containerd/containerd/metrics/cgroups/v1" v1 "github.com/containerd/containerd/metrics/cgroups/v1"
v2 "github.com/containerd/containerd/metrics/cgroups/v2" v2 "github.com/containerd/containerd/metrics/cgroups/v2"
"github.com/containerd/containerd/platforms" "github.com/containerd/containerd/platforms"
@ -38,6 +39,9 @@ func init() {
Type: plugin.TaskMonitorPlugin, Type: plugin.TaskMonitorPlugin,
ID: "cgroups", ID: "cgroups",
InitFn: New, InitFn: New,
Requires: []plugin.Type{
plugin.EventPlugin,
},
Config: &Config{}, Config: &Config{},
}) })
} }
@ -53,10 +57,16 @@ func New(ic *plugin.InitContext) (interface{}, error) {
tm runtime.TaskMonitor tm runtime.TaskMonitor
err error err error
) )
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
if cgroups.Mode() == cgroups.Unified { if cgroups.Mode() == cgroups.Unified {
tm, err = v2.NewTaskMonitor(ic.Context, ic.Events, ns) tm, err = v2.NewTaskMonitor(ic.Context, ep.(events.Publisher), ns)
} else { } else {
tm, err = v1.NewTaskMonitor(ic.Context, ic.Events, ns) tm, err = v1.NewTaskMonitor(ic.Context, ep.(events.Publisher), ns)
} }
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -53,6 +53,7 @@ func init() {
ID: "cri", ID: "cri",
Config: &config, Config: &config,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.ServicePlugin, plugin.ServicePlugin,
}, },
InitFn: initCRIService, InitFn: initCRIService,
@ -118,8 +119,13 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
return nil, errors.Wrap(err, "failed to get service plugin") return nil, errors.Wrap(err, "failed to get service plugin")
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, errors.Wrap(err, "failed to get event plugin")
}
opts := []containerd.ServicesOpt{ opts := []containerd.ServicesOpt{
containerd.WithEventService(ic.Events), containerd.WithEventService(ep.(containerd.EventService)),
} }
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt { services.ContentService: func(s interface{}) containerd.ServicesOpt {

View File

@ -34,6 +34,8 @@ type InitContext struct {
Config interface{} Config interface{}
Address string Address string
TTRPCAddress string TTRPCAddress string
// deprecated: will be removed in 2.0, use plugin.EventType
Events *exchange.Exchange Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init. Meta *Meta // plugins can fill in metadata at init.
@ -135,6 +137,19 @@ func (i *InitContext) GetAll() []*Plugin {
return i.plugins.ordered return i.plugins.ordered
} }
// GetByID returns the plugin of the given type and ID
func (i *InitContext) GetByID(t Type, id string) (interface{}, error) {
ps, err := i.GetByType(t)
if err != nil {
return nil, err
}
p, ok := ps[id]
if !ok {
return nil, errors.Wrapf(errdefs.ErrNotFound, "no %s plugins with id %s", t, id)
}
return p.Instance()
}
// GetByType returns all plugins with the specific type. // GetByType returns all plugins with the specific type.
func (i *InitContext) GetByType(t Type) (map[string]*Plugin, error) { func (i *InitContext) GetByType(t Type) (map[string]*Plugin, error) {
p, ok := i.plugins.byTypeAndID[t] p, ok := i.plugins.byTypeAndID[t]

View File

@ -75,6 +75,8 @@ const (
ContentPlugin Type = "io.containerd.content.v1" ContentPlugin Type = "io.containerd.content.v1"
// GCPlugin implements garbage collection policy // GCPlugin implements garbage collection policy
GCPlugin Type = "io.containerd.gc.v1" GCPlugin Type = "io.containerd.gc.v1"
// EventPlugin implements event handling
EventPlugin Type = "io.containerd.event.v1"
) )
const ( const (

View File

@ -63,6 +63,7 @@ func init() {
plugin.Register(&plugin.Registration{ plugin.Register(&plugin.Registration{
Type: plugin.InternalPlugin, Type: plugin.InternalPlugin,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.ServicePlugin, plugin.ServicePlugin,
}, },
ID: "restart", ID: "restart",
@ -95,8 +96,14 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to get service plugin") return nil, errors.Wrap(err, "failed to get service plugin")
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, errors.Wrap(err, "failed to get event plugin")
}
opts := []containerd.ServicesOpt{ opts := []containerd.ServicesOpt{
containerd.WithEventService(ic.Events), containerd.WithEventService(ep.(containerd.EventService)),
} }
for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{
services.ContentService: func(s interface{}) containerd.ServicesOpt { services.ContentService: func(s interface{}) containerd.ServicesOpt {

View File

@ -73,6 +73,7 @@ func init() {
ID: "linux", ID: "linux",
InitFn: New, InitFn: New,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
Config: &Config{ Config: &Config{
@ -112,6 +113,12 @@ func New(ic *plugin.InitContext) (interface{}, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
cfg := ic.Config.(*Config) cfg := ic.Config.(*Config)
r := &Runtime{ r := &Runtime{
root: ic.Root, root: ic.Root,
@ -119,7 +126,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
tasks: runtime.NewTaskList(), tasks: runtime.NewTaskList(),
containers: metadata.NewContainerStore(m.(*metadata.DB)), containers: metadata.NewContainerStore(m.(*metadata.DB)),
address: ic.Address, address: ic.Address,
events: ic.Events, events: ep.(*exchange.Exchange),
config: cfg, config: cfg,
} }
tasks, err := r.restoreTasks(ic.Context) tasks, err := r.restoreTasks(ic.Context)

View File

@ -49,6 +49,7 @@ func init() {
Type: plugin.RuntimePluginV2, Type: plugin.RuntimePluginV2,
ID: "task", ID: "task",
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
Config: &Config{ Config: &Config{
@ -71,9 +72,14 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
cs := metadata.NewContainerStore(m.(*metadata.DB)) cs := metadata.NewContainerStore(m.(*metadata.DB))
events := ep.(*exchange.Exchange)
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs) return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, events, cs)
}, },
}) })
} }

View File

@ -41,6 +41,7 @@ func init() {
Type: plugin.ServicePlugin, Type: plugin.ServicePlugin,
ID: services.ContainersService, ID: services.ContainersService,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -48,12 +49,16 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
db := m.(*metadata.DB) db := m.(*metadata.DB)
return &local{ return &local{
Store: metadata.NewContainerStore(db), Store: metadata.NewContainerStore(db),
db: db, db: db,
publisher: ic.Events, publisher: ep.(events.Publisher),
}, nil }, nil
}, },
}) })

View File

@ -39,6 +39,7 @@ func init() {
Type: plugin.ServicePlugin, Type: plugin.ServicePlugin,
ID: services.ContentService, ID: services.ContentService,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -46,8 +47,12 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events) s, err := newContentStore(m.(*metadata.DB).ContentStore(), ep.(events.Publisher))
return s, err return s, err
}, },
}) })

View File

@ -35,8 +35,15 @@ func init() {
plugin.Register(&plugin.Registration{ plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin, Type: plugin.GRPCPlugin,
ID: "events", ID: "events",
Requires: []plugin.Type{
plugin.EventPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
return NewService(ic.Events), nil ep, err := ic.GetByID(plugin.EventPlugin, "exchange")
if err != nil {
return nil, err
}
return NewService(ep.(*exchange.Exchange)), nil
}, },
}) })
} }

View File

@ -40,6 +40,7 @@ func init() {
Type: plugin.ServicePlugin, Type: plugin.ServicePlugin,
ID: services.NamespacesService, ID: services.NamespacesService,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -47,9 +48,13 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
return &local{ return &local{
db: m.(*metadata.DB), db: m.(*metadata.DB),
publisher: ic.Events, publisher: ep.(events.Publisher),
}, nil }, nil
}, },
}) })

View File

@ -146,9 +146,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
grpcServer: grpcServer, grpcServer: grpcServer,
tcpServer: tcpServer, tcpServer: tcpServer,
ttrpcServer: ttrpcServer, ttrpcServer: ttrpcServer,
events: exchange.NewExchange(),
config: config, config: config,
} }
// TODO: Remove this in 2.0 and let event plugin crease it
events = exchange.NewExchange()
initialized = plugin.NewPluginSet() initialized = plugin.NewPluginSet()
required = make(map[string]struct{}) required = make(map[string]struct{})
) )
@ -170,7 +171,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
config.Root, config.Root,
config.State, config.State,
) )
initContext.Events = s.events initContext.Events = events
initContext.Address = config.GRPC.Address initContext.Address = config.GRPC.Address
initContext.TTRPCAddress = config.TTRPC.Address initContext.TTRPCAddress = config.TTRPC.Address
@ -246,7 +247,6 @@ type Server struct {
grpcServer *grpc.Server grpcServer *grpc.Server
ttrpcServer *ttrpc.Server ttrpcServer *ttrpc.Server
tcpServer *grpc.Server tcpServer *grpc.Server
events *exchange.Exchange
config *srvconfig.Config config *srvconfig.Config
plugins []*plugin.Plugin plugins []*plugin.Plugin
} }

View File

@ -39,6 +39,7 @@ func init() {
Type: plugin.ServicePlugin, Type: plugin.ServicePlugin,
ID: services.SnapshotsService, ID: services.SnapshotsService,
Requires: []plugin.Type{ Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin, plugin.MetadataPlugin,
}, },
InitFn: func(ic *plugin.InitContext) (interface{}, error) { InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -46,11 +47,15 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
db := m.(*metadata.DB) db := m.(*metadata.DB)
ss := make(map[string]snapshots.Snapshotter) ss := make(map[string]snapshots.Snapshotter)
for n, sn := range db.Snapshotters() { for n, sn := range db.Snapshotters() {
ss[n] = newSnapshotter(sn, ic.Events) ss[n] = newSnapshotter(sn, ep.(events.Publisher))
} }
return ss, nil return ss, nil
}, },

View File

@ -92,6 +92,11 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
return nil, err return nil, err
} }
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
monitor, err := ic.Get(plugin.TaskMonitorPlugin) monitor, err := ic.Get(plugin.TaskMonitorPlugin)
if err != nil { if err != nil {
if !errdefs.IsNotFound(err) { if !errdefs.IsNotFound(err) {
@ -105,7 +110,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
runtimes: runtimes, runtimes: runtimes,
containers: metadata.NewContainerStore(db), containers: metadata.NewContainerStore(db),
store: db.ContentStore(), store: db.ContentStore(),
publisher: ic.Events, publisher: ep.(events.Publisher),
monitor: monitor.(runtime.TaskMonitor), monitor: monitor.(runtime.TaskMonitor),
v2Runtime: v2r.(*v2.TaskManager), v2Runtime: v2r.(*v2.TaskManager),
} }

View File

@ -24,6 +24,7 @@ import (
) )
var tasksServiceRequires = []plugin.Type{ var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePluginV2, plugin.RuntimePluginV2,
plugin.MetadataPlugin, plugin.MetadataPlugin,
plugin.TaskMonitorPlugin, plugin.TaskMonitorPlugin,

View File

@ -26,6 +26,7 @@ import (
) )
var tasksServiceRequires = []plugin.Type{ var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePlugin, plugin.RuntimePlugin,
plugin.RuntimePluginV2, plugin.RuntimePluginV2,
plugin.MetadataPlugin, plugin.MetadataPlugin,

View File

@ -24,6 +24,7 @@ import (
) )
var tasksServiceRequires = []plugin.Type{ var tasksServiceRequires = []plugin.Type{
plugin.EventPlugin,
plugin.RuntimePluginV2, plugin.RuntimePluginV2,
plugin.MetadataPlugin, plugin.MetadataPlugin,
plugin.TaskMonitorPlugin, plugin.TaskMonitorPlugin,