From 0a0621bb47d48d8cbcbb614ee38422ce8b98a340 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Thu, 5 Aug 2021 21:50:40 -0700 Subject: [PATCH] Move plugin context events into separate plugin Signed-off-by: Derek McGowan --- cmd/containerd/builtins.go | 1 + events/plugin/plugin.go | 32 ++++++++++++++++++++++++++++++ metrics/cgroups/cgroups.go | 14 +++++++++++-- pkg/cri/cri.go | 8 +++++++- plugin/context.go | 17 +++++++++++++++- plugin/plugin.go | 2 ++ runtime/restart/monitor/monitor.go | 9 ++++++++- runtime/v1/linux/runtime.go | 9 ++++++++- runtime/v2/manager.go | 8 +++++++- services/containers/local.go | 7 ++++++- services/content/store.go | 7 ++++++- services/events/service.go | 9 ++++++++- services/namespaces/local.go | 7 ++++++- services/server/server.go | 6 +++--- services/snapshots/snapshotters.go | 7 ++++++- services/tasks/local.go | 7 ++++++- services/tasks/local_freebsd.go | 1 + services/tasks/local_unix.go | 1 + services/tasks/local_windows.go | 1 + 19 files changed, 137 insertions(+), 16 deletions(-) create mode 100644 events/plugin/plugin.go diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index b120b6078..3d79eee09 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -19,6 +19,7 @@ package main // register containerd builtins here import ( _ "github.com/containerd/containerd/diff/walking/plugin" + _ "github.com/containerd/containerd/events/plugin" _ "github.com/containerd/containerd/gc/scheduler" _ "github.com/containerd/containerd/runtime/restart/monitor" _ "github.com/containerd/containerd/services/containers" diff --git a/events/plugin/plugin.go b/events/plugin/plugin.go new file mode 100644 index 000000000..eab0a3be9 --- /dev/null +++ b/events/plugin/plugin.go @@ -0,0 +1,32 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package plugin + +import ( + "github.com/containerd/containerd/plugin" +) + +func init() { + plugin.Register(&plugin.Registration{ + Type: plugin.EventPlugin, + ID: "exchange", + InitFn: func(ic *plugin.InitContext) (interface{}, error) { + // TODO: In 2.0, create exchange since ic.Events will be removed + return ic.Events, nil + }, + }) +} diff --git a/metrics/cgroups/cgroups.go b/metrics/cgroups/cgroups.go index 6807b19b8..d2f3fb2df 100644 --- a/metrics/cgroups/cgroups.go +++ b/metrics/cgroups/cgroups.go @@ -20,6 +20,7 @@ package cgroups import ( "github.com/containerd/cgroups" + "github.com/containerd/containerd/events" v1 "github.com/containerd/containerd/metrics/cgroups/v1" v2 "github.com/containerd/containerd/metrics/cgroups/v2" "github.com/containerd/containerd/platforms" @@ -38,6 +39,9 @@ func init() { Type: plugin.TaskMonitorPlugin, ID: "cgroups", InitFn: New, + Requires: []plugin.Type{ + plugin.EventPlugin, + }, Config: &Config{}, }) } @@ -53,10 +57,16 @@ func New(ic *plugin.InitContext) (interface{}, error) { tm runtime.TaskMonitor err error ) + + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } + if cgroups.Mode() == cgroups.Unified { - tm, err = v2.NewTaskMonitor(ic.Context, ic.Events, ns) + tm, err = v2.NewTaskMonitor(ic.Context, ep.(events.Publisher), ns) } else { - tm, err = v1.NewTaskMonitor(ic.Context, ic.Events, ns) + tm, err = v1.NewTaskMonitor(ic.Context, ep.(events.Publisher), ns) } if err != nil { return nil, err diff --git a/pkg/cri/cri.go b/pkg/cri/cri.go index 28c80f301..d798e8732 100644 --- a/pkg/cri/cri.go +++ b/pkg/cri/cri.go @@ -53,6 +53,7 @@ func init() { ID: "cri", Config: &config, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.ServicePlugin, }, InitFn: initCRIService, @@ -118,8 +119,13 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) { return nil, errors.Wrap(err, "failed to get service plugin") } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, errors.Wrap(err, "failed to get event plugin") + } + opts := []containerd.ServicesOpt{ - containerd.WithEventService(ic.Events), + containerd.WithEventService(ep.(containerd.EventService)), } for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ services.ContentService: func(s interface{}) containerd.ServicesOpt { diff --git a/plugin/context.go b/plugin/context.go index 75b7366fc..0ad5ac889 100644 --- a/plugin/context.go +++ b/plugin/context.go @@ -34,7 +34,9 @@ type InitContext struct { Config interface{} Address string TTRPCAddress string - Events *exchange.Exchange + + // deprecated: will be removed in 2.0, use plugin.EventType + Events *exchange.Exchange Meta *Meta // plugins can fill in metadata at init. @@ -135,6 +137,19 @@ func (i *InitContext) GetAll() []*Plugin { return i.plugins.ordered } +// GetByID returns the plugin of the given type and ID +func (i *InitContext) GetByID(t Type, id string) (interface{}, error) { + ps, err := i.GetByType(t) + if err != nil { + return nil, err + } + p, ok := ps[id] + if !ok { + return nil, errors.Wrapf(errdefs.ErrNotFound, "no %s plugins with id %s", t, id) + } + return p.Instance() +} + // GetByType returns all plugins with the specific type. func (i *InitContext) GetByType(t Type) (map[string]*Plugin, error) { p, ok := i.plugins.byTypeAndID[t] diff --git a/plugin/plugin.go b/plugin/plugin.go index 1894afe8c..9bbfb0d9e 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -75,6 +75,8 @@ const ( ContentPlugin Type = "io.containerd.content.v1" // GCPlugin implements garbage collection policy GCPlugin Type = "io.containerd.gc.v1" + // EventPlugin implements event handling + EventPlugin Type = "io.containerd.event.v1" ) const ( diff --git a/runtime/restart/monitor/monitor.go b/runtime/restart/monitor/monitor.go index 19c0af0ef..623fc6f2a 100644 --- a/runtime/restart/monitor/monitor.go +++ b/runtime/restart/monitor/monitor.go @@ -63,6 +63,7 @@ func init() { plugin.Register(&plugin.Registration{ Type: plugin.InternalPlugin, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.ServicePlugin, }, ID: "restart", @@ -95,8 +96,14 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) { if err != nil { return nil, errors.Wrap(err, "failed to get service plugin") } + + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, errors.Wrap(err, "failed to get event plugin") + } + opts := []containerd.ServicesOpt{ - containerd.WithEventService(ic.Events), + containerd.WithEventService(ep.(containerd.EventService)), } for s, fn := range map[string]func(interface{}) containerd.ServicesOpt{ services.ContentService: func(s interface{}) containerd.ServicesOpt { diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index aa6d3f314..fddfb71d2 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -73,6 +73,7 @@ func init() { ID: "linux", InitFn: New, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, Config: &Config{ @@ -112,6 +113,12 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + + ep, err := ic.GetByID(plugin.EventPlugin, "exchange") + if err != nil { + return nil, err + } + cfg := ic.Config.(*Config) r := &Runtime{ root: ic.Root, @@ -119,7 +126,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { tasks: runtime.NewTaskList(), containers: metadata.NewContainerStore(m.(*metadata.DB)), address: ic.Address, - events: ic.Events, + events: ep.(*exchange.Exchange), config: cfg, } tasks, err := r.restoreTasks(ic.Context) diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index f8cdd9c6c..7798e7481 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -49,6 +49,7 @@ func init() { Type: plugin.RuntimePluginV2, ID: "task", Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, Config: &Config{ @@ -71,9 +72,14 @@ func init() { if err != nil { return nil, err } + ep, err := ic.GetByID(plugin.EventPlugin, "exchange") + if err != nil { + return nil, err + } cs := metadata.NewContainerStore(m.(*metadata.DB)) + events := ep.(*exchange.Exchange) - return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs) + return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, events, cs) }, }) } diff --git a/services/containers/local.go b/services/containers/local.go index b1336494c..843512e0e 100644 --- a/services/containers/local.go +++ b/services/containers/local.go @@ -41,6 +41,7 @@ func init() { Type: plugin.ServicePlugin, ID: services.ContainersService, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -48,12 +49,16 @@ func init() { if err != nil { return nil, err } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } db := m.(*metadata.DB) return &local{ Store: metadata.NewContainerStore(db), db: db, - publisher: ic.Events, + publisher: ep.(events.Publisher), }, nil }, }) diff --git a/services/content/store.go b/services/content/store.go index 3de91d37c..016cd0676 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -39,6 +39,7 @@ func init() { Type: plugin.ServicePlugin, ID: services.ContentService, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -46,8 +47,12 @@ func init() { if err != nil { return nil, err } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } - s, err := newContentStore(m.(*metadata.DB).ContentStore(), ic.Events) + s, err := newContentStore(m.(*metadata.DB).ContentStore(), ep.(events.Publisher)) return s, err }, }) diff --git a/services/events/service.go b/services/events/service.go index fc1684862..ef647582a 100644 --- a/services/events/service.go +++ b/services/events/service.go @@ -35,8 +35,15 @@ func init() { plugin.Register(&plugin.Registration{ Type: plugin.GRPCPlugin, ID: "events", + Requires: []plugin.Type{ + plugin.EventPlugin, + }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { - return NewService(ic.Events), nil + ep, err := ic.GetByID(plugin.EventPlugin, "exchange") + if err != nil { + return nil, err + } + return NewService(ep.(*exchange.Exchange)), nil }, }) } diff --git a/services/namespaces/local.go b/services/namespaces/local.go index f50b65355..c45ae8074 100644 --- a/services/namespaces/local.go +++ b/services/namespaces/local.go @@ -40,6 +40,7 @@ func init() { Type: plugin.ServicePlugin, ID: services.NamespacesService, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -47,9 +48,13 @@ func init() { if err != nil { return nil, err } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } return &local{ db: m.(*metadata.DB), - publisher: ic.Events, + publisher: ep.(events.Publisher), }, nil }, }) diff --git a/services/server/server.go b/services/server/server.go index 444e64f59..761b3d6ac 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -146,9 +146,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { grpcServer: grpcServer, tcpServer: tcpServer, ttrpcServer: ttrpcServer, - events: exchange.NewExchange(), config: config, } + // TODO: Remove this in 2.0 and let event plugin crease it + events = exchange.NewExchange() initialized = plugin.NewPluginSet() required = make(map[string]struct{}) ) @@ -170,7 +171,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { config.Root, config.State, ) - initContext.Events = s.events + initContext.Events = events initContext.Address = config.GRPC.Address initContext.TTRPCAddress = config.TTRPC.Address @@ -246,7 +247,6 @@ type Server struct { grpcServer *grpc.Server ttrpcServer *ttrpc.Server tcpServer *grpc.Server - events *exchange.Exchange config *srvconfig.Config plugins []*plugin.Plugin } diff --git a/services/snapshots/snapshotters.go b/services/snapshots/snapshotters.go index 5da365110..6405cb690 100644 --- a/services/snapshots/snapshotters.go +++ b/services/snapshots/snapshotters.go @@ -39,6 +39,7 @@ func init() { Type: plugin.ServicePlugin, ID: services.SnapshotsService, Requires: []plugin.Type{ + plugin.EventPlugin, plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -46,11 +47,15 @@ func init() { if err != nil { return nil, err } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } db := m.(*metadata.DB) ss := make(map[string]snapshots.Snapshotter) for n, sn := range db.Snapshotters() { - ss[n] = newSnapshotter(sn, ic.Events) + ss[n] = newSnapshotter(sn, ep.(events.Publisher)) } return ss, nil }, diff --git a/services/tasks/local.go b/services/tasks/local.go index cd5280a7f..8c5b9d4c4 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -92,6 +92,11 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { return nil, err } + ep, err := ic.Get(plugin.EventPlugin) + if err != nil { + return nil, err + } + monitor, err := ic.Get(plugin.TaskMonitorPlugin) if err != nil { if !errdefs.IsNotFound(err) { @@ -105,7 +110,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { runtimes: runtimes, containers: metadata.NewContainerStore(db), store: db.ContentStore(), - publisher: ic.Events, + publisher: ep.(events.Publisher), monitor: monitor.(runtime.TaskMonitor), v2Runtime: v2r.(*v2.TaskManager), } diff --git a/services/tasks/local_freebsd.go b/services/tasks/local_freebsd.go index d206b8f25..ee88ee9fd 100644 --- a/services/tasks/local_freebsd.go +++ b/services/tasks/local_freebsd.go @@ -24,6 +24,7 @@ import ( ) var tasksServiceRequires = []plugin.Type{ + plugin.EventPlugin, plugin.RuntimePluginV2, plugin.MetadataPlugin, plugin.TaskMonitorPlugin, diff --git a/services/tasks/local_unix.go b/services/tasks/local_unix.go index 2879df0c4..14ce94fbe 100644 --- a/services/tasks/local_unix.go +++ b/services/tasks/local_unix.go @@ -26,6 +26,7 @@ import ( ) var tasksServiceRequires = []plugin.Type{ + plugin.EventPlugin, plugin.RuntimePlugin, plugin.RuntimePluginV2, plugin.MetadataPlugin, diff --git a/services/tasks/local_windows.go b/services/tasks/local_windows.go index d12f9e470..17ce878f2 100644 --- a/services/tasks/local_windows.go +++ b/services/tasks/local_windows.go @@ -24,6 +24,7 @@ import ( ) var tasksServiceRequires = []plugin.Type{ + plugin.EventPlugin, plugin.RuntimePluginV2, plugin.MetadataPlugin, plugin.TaskMonitorPlugin,