diff --git a/metadata/db.go b/metadata/db.go index 52b98bc45..907b45755 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -26,9 +26,12 @@ import ( "sync/atomic" "time" + eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/events" "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/snapshots" bolt "go.etcd.io/bbolt" ) @@ -56,9 +59,18 @@ func WithPolicyIsolated(o *dbOptions) { o.shared = false } +// WithEventsPublisher adds an events publisher to the +// metadata db to directly publish events +func WithEventsPublisher(p events.Publisher) DBOpt { + return func(o *dbOptions) { + o.publisher = p + } +} + // dbOptions configure db options. type dbOptions struct { - shared bool + shared bool + publisher events.Publisher } // DB represents a metadata database backed by a bolt @@ -299,6 +311,32 @@ func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) { m.collectors[t] = c } +// namespacedEvent is used to handle any event for a namespace +type namespacedEvent struct { + namespace string + event interface{} +} + +func (m *DB) publishEvents(events []namespacedEvent) { + ctx := context.Background() + if publisher := m.dbopts.publisher; publisher != nil { + for _, ne := range events { + ctx := namespaces.WithNamespace(ctx, ne.namespace) + var topic string + switch ne.event.(type) { + case *eventstypes.SnapshotRemove: + topic = "/snapshot/remove" + default: + log.G(ctx).WithField("event", ne.event).Debug("unhandled event type from garbage collection removal") + continue + } + if err := publisher.Publish(ctx, topic, ne.event); err != nil { + log.G(ctx).WithError(err).WithField("topic", topic).Debug("publish event failed") + } + } + } +} + // GCStats holds the duration for the different phases of the garbage collector type GCStats struct { MetaD time.Duration @@ -323,6 +361,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { return nil, err } + events := []namespacedEvent{} if err := m.db.Update(func(tx *bolt.Tx) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -336,10 +375,20 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { if idx := strings.IndexRune(n.Key, '/'); idx > 0 { m.dirtySS[n.Key[:idx]] = struct{}{} } + // queue event to publish after successful commit } else if n.Type == ResourceContent || n.Type == ResourceIngest { m.dirtyCS = true } - return c.remove(ctx, tx, n) // From gc context + + event, err := c.remove(ctx, tx, n) + if event != nil && err == nil { + events = append(events, + namespacedEvent{ + namespace: n.Namespace, + event: event, + }) + } + return err } if err := c.scanAll(ctx, tx, rm); err != nil { // From gc context @@ -356,6 +405,13 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { var stats GCStats var wg sync.WaitGroup + // Flush events asynchronously after commit + wg.Add(1) + go func() { + m.publishEvents(events) + wg.Done() + }() + // reset dirty, no need for atomic inside of wlock.Lock m.dirty = 0 diff --git a/metadata/gc.go b/metadata/gc.go index 87645d6d2..be0b693e4 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -24,6 +24,7 @@ import ( "strings" "time" + eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" bolt "go.etcd.io/bbolt" @@ -569,10 +570,10 @@ func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx contex } // remove all buckets for the given node. -func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { +func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) (interface{}, error) { v1bkt := tx.Bucket(bucketKeyVersion) if v1bkt == nil { - return nil + return nil, nil } nsbkt := v1bkt.Bucket([]byte(node.Namespace)) @@ -581,7 +582,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error if cc, ok := c.contexts[node.Type]; ok { cc.Remove(node) } - return nil + return nil, nil } switch node.Type { @@ -592,25 +593,28 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error } if cbkt != nil { log.G(ctx).WithField("key", node.Key).Debug("remove content") - return cbkt.DeleteBucket([]byte(node.Key)) + return nil, cbkt.DeleteBucket([]byte(node.Key)) } case ResourceSnapshot: sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots) if sbkt != nil { ss, key, ok := strings.Cut(node.Key, "/") if !ok { - return fmt.Errorf("invalid snapshot gc key %s", node.Key) + return nil, fmt.Errorf("invalid snapshot gc key %s", node.Key) } ssbkt := sbkt.Bucket([]byte(ss)) if ssbkt != nil { log.G(ctx).WithField("key", key).WithField("snapshotter", ss).Debug("remove snapshot") - return ssbkt.DeleteBucket([]byte(key)) + return &eventstypes.SnapshotRemove{ + Key: key, + Snapshotter: ss, + }, ssbkt.DeleteBucket([]byte(key)) } } case ResourceLease: lbkt := nsbkt.Bucket(bucketKeyObjectLeases) if lbkt != nil { - return lbkt.DeleteBucket([]byte(node.Key)) + return nil, lbkt.DeleteBucket([]byte(node.Key)) } case ResourceIngest: ibkt := nsbkt.Bucket(bucketKeyObjectContent) @@ -619,7 +623,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error } if ibkt != nil { log.G(ctx).WithField("ref", node.Key).Debug("remove ingest") - return ibkt.DeleteBucket([]byte(node.Key)) + return nil, ibkt.DeleteBucket([]byte(node.Key)) } default: cc, ok := c.contexts[node.Type] @@ -630,7 +634,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error } } - return nil + return nil, nil } // sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt diff --git a/metadata/gc_test.go b/metadata/gc_test.go index fbd994e53..576dc6a91 100644 --- a/metadata/gc_test.go +++ b/metadata/gc_test.go @@ -237,7 +237,7 @@ func TestGCRemove(t *testing.T) { if err := db.Update(func(tx *bolt.Tx) error { for _, n := range deleted { - if err := c.remove(ctx, tx, n); err != nil { + if _, err := c.remove(ctx, tx, n); err != nil { return err } } @@ -482,7 +482,7 @@ func TestCollectibleResources(t *testing.T) { }) if err := db.Update(func(tx *bolt.Tx) error { - if err := c.remove(ctx, tx, all[removeIndex]); err != nil { + if _, err := c.remove(ctx, tx, all[removeIndex]); err != nil { return err } return nil diff --git a/metadata/plugin/plugin.go b/metadata/plugin/plugin.go index 870f23903..256f12d18 100644 --- a/metadata/plugin/plugin.go +++ b/metadata/plugin/plugin.go @@ -29,6 +29,7 @@ import ( "github.com/containerd/containerd/pkg/timeout" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/snapshots" + bolt "go.etcd.io/bbolt" ) @@ -162,7 +163,7 @@ func init() { } dbopts := []metadata.DBOpt{ - // metadata.WithEventsPublisher(ic.Events), + metadata.WithEventsPublisher(ic.Events), } if !shared { diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 348602093..e7774d36e 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -24,6 +24,7 @@ import ( "sync/atomic" "time" + eventstypes "github.com/containerd/containerd/api/events" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/labels" @@ -273,7 +274,22 @@ func (s *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er } func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { - return s.createSnapshot(ctx, key, parent, false, opts) + mounts, err := s.createSnapshot(ctx, key, parent, false, opts) + if err != nil { + return nil, err + } + + if s.db.dbopts.publisher != nil { + if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ + Key: key, + Parent: parent, + Snapshotter: s.name, + }); err != nil { + return nil, err + } + } + + return mounts, nil } func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { @@ -618,6 +634,16 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap return err } + if s.db.dbopts.publisher != nil { + if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ + Key: key, + Name: name, + Snapshotter: s.name, + }); err != nil { + return err + } + } + return nil } @@ -631,7 +657,7 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { return err } - return update(ctx, s.db, func(tx *bolt.Tx) error { + if err := update(ctx, s.db, func(tx *bolt.Tx) error { var sbkt *bolt.Bucket bkt := getSnapshotterBucket(tx, ns, s.name) if bkt != nil { @@ -674,7 +700,17 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { s.db.dirtySS[s.name] = struct{}{} return nil - }) + }); err != nil { + return err + } + + if s.db.dbopts.publisher != nil { + return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ + Key: key, + Snapshotter: s.name, + }) + } + return nil } type infoPair struct { diff --git a/services/snapshots/snapshotters.go b/services/snapshots/snapshotters.go index 279eff380..2dcc01cb7 100644 --- a/services/snapshots/snapshotters.go +++ b/services/snapshots/snapshotters.go @@ -17,30 +17,16 @@ package snapshots import ( - "context" - - eventstypes "github.com/containerd/containerd/api/events" - "github.com/containerd/containerd/events" "github.com/containerd/containerd/metadata" - "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/services" - "github.com/containerd/containerd/snapshots" ) -// snapshotter wraps snapshots.Snapshotter with proper events published. -type snapshotter struct { - snapshots.Snapshotter - name string - publisher events.Publisher -} - func init() { plugin.Register(&plugin.Registration{ Type: plugin.ServicePlugin, ID: services.SnapshotsService, Requires: []plugin.Type{ - plugin.EventPlugin, plugin.MetadataPlugin, }, InitFn: func(ic *plugin.InitContext) (interface{}, error) { @@ -48,61 +34,8 @@ func init() { if err != nil { return nil, err } - ep, err := ic.Get(plugin.EventPlugin) - if err != nil { - return nil, err - } - db := m.(*metadata.DB) - ss := make(map[string]snapshots.Snapshotter) - for n, sn := range db.Snapshotters() { - ss[n] = newSnapshotter(sn, n, ep.(events.Publisher)) - } - return ss, nil + return m.(*metadata.DB).Snapshotters(), nil }, }) } - -func newSnapshotter(sn snapshots.Snapshotter, name string, publisher events.Publisher) snapshots.Snapshotter { - return &snapshotter{ - Snapshotter: sn, - name: name, - publisher: publisher, - } -} - -func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) { - mounts, err := s.Snapshotter.Prepare(ctx, key, parent, opts...) - if err != nil { - return nil, err - } - if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{ - Key: key, - Parent: parent, - Snapshotter: s.name, - }); err != nil { - return nil, err - } - return mounts, nil -} - -func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { - if err := s.Snapshotter.Commit(ctx, name, key, opts...); err != nil { - return err - } - return s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{ - Key: key, - Name: name, - Snapshotter: s.name, - }) -} - -func (s *snapshotter) Remove(ctx context.Context, key string) error { - if err := s.Snapshotter.Remove(ctx, key); err != nil { - return err - } - return s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{ - Key: key, - Snapshotter: s.name, - }) -}