Merge pull request #5674 from dmcgowan/metadata-snapshot-publish

This commit is contained in:
Fu Wei 2023-01-03 09:23:48 +08:00 committed by GitHub
commit 9a7c264d25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 115 additions and 85 deletions

View File

@ -26,9 +26,12 @@ import (
"sync/atomic"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
bolt "go.etcd.io/bbolt"
)
@ -56,9 +59,18 @@ func WithPolicyIsolated(o *dbOptions) {
o.shared = false
}
// WithEventsPublisher adds an events publisher to the
// metadata db to directly publish events
func WithEventsPublisher(p events.Publisher) DBOpt {
return func(o *dbOptions) {
o.publisher = p
}
}
// dbOptions configure db options.
type dbOptions struct {
shared bool
publisher events.Publisher
}
// DB represents a metadata database backed by a bolt
@ -299,6 +311,32 @@ func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) {
m.collectors[t] = c
}
// namespacedEvent is used to handle any event for a namespace
type namespacedEvent struct {
namespace string
event interface{}
}
func (m *DB) publishEvents(events []namespacedEvent) {
ctx := context.Background()
if publisher := m.dbopts.publisher; publisher != nil {
for _, ne := range events {
ctx := namespaces.WithNamespace(ctx, ne.namespace)
var topic string
switch ne.event.(type) {
case *eventstypes.SnapshotRemove:
topic = "/snapshot/remove"
default:
log.G(ctx).WithField("event", ne.event).Debug("unhandled event type from garbage collection removal")
continue
}
if err := publisher.Publish(ctx, topic, ne.event); err != nil {
log.G(ctx).WithError(err).WithField("topic", topic).Debug("publish event failed")
}
}
}
}
// GCStats holds the duration for the different phases of the garbage collector
type GCStats struct {
MetaD time.Duration
@ -323,6 +361,7 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
return nil, err
}
events := []namespacedEvent{}
if err := m.db.Update(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -336,10 +375,20 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
m.dirtySS[n.Key[:idx]] = struct{}{}
}
// queue event to publish after successful commit
} else if n.Type == ResourceContent || n.Type == ResourceIngest {
m.dirtyCS = true
}
return c.remove(ctx, tx, n) // From gc context
event, err := c.remove(ctx, tx, n)
if event != nil && err == nil {
events = append(events,
namespacedEvent{
namespace: n.Namespace,
event: event,
})
}
return err
}
if err := c.scanAll(ctx, tx, rm); err != nil { // From gc context
@ -356,6 +405,13 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
var stats GCStats
var wg sync.WaitGroup
// Flush events asynchronously after commit
wg.Add(1)
go func() {
m.publishEvents(events)
wg.Done()
}()
// reset dirty, no need for atomic inside of wlock.Lock
m.dirty = 0

View File

@ -24,6 +24,7 @@ import (
"strings"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
bolt "go.etcd.io/bbolt"
@ -569,10 +570,10 @@ func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx contex
}
// remove all buckets for the given node.
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) (interface{}, error) {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
return nil, nil
}
nsbkt := v1bkt.Bucket([]byte(node.Namespace))
@ -581,7 +582,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
if cc, ok := c.contexts[node.Type]; ok {
cc.Remove(node)
}
return nil
return nil, nil
}
switch node.Type {
@ -592,25 +593,28 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
}
if cbkt != nil {
log.G(ctx).WithField("key", node.Key).Debug("remove content")
return cbkt.DeleteBucket([]byte(node.Key))
return nil, cbkt.DeleteBucket([]byte(node.Key))
}
case ResourceSnapshot:
sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
ss, key, ok := strings.Cut(node.Key, "/")
if !ok {
return fmt.Errorf("invalid snapshot gc key %s", node.Key)
return nil, fmt.Errorf("invalid snapshot gc key %s", node.Key)
}
ssbkt := sbkt.Bucket([]byte(ss))
if ssbkt != nil {
log.G(ctx).WithField("key", key).WithField("snapshotter", ss).Debug("remove snapshot")
return ssbkt.DeleteBucket([]byte(key))
return &eventstypes.SnapshotRemove{
Key: key,
Snapshotter: ss,
}, ssbkt.DeleteBucket([]byte(key))
}
}
case ResourceLease:
lbkt := nsbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
return lbkt.DeleteBucket([]byte(node.Key))
return nil, lbkt.DeleteBucket([]byte(node.Key))
}
case ResourceIngest:
ibkt := nsbkt.Bucket(bucketKeyObjectContent)
@ -619,7 +623,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
}
if ibkt != nil {
log.G(ctx).WithField("ref", node.Key).Debug("remove ingest")
return ibkt.DeleteBucket([]byte(node.Key))
return nil, ibkt.DeleteBucket([]byte(node.Key))
}
default:
cc, ok := c.contexts[node.Type]
@ -630,7 +634,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
}
}
return nil
return nil, nil
}
// sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt

View File

@ -237,7 +237,7 @@ func TestGCRemove(t *testing.T) {
if err := db.Update(func(tx *bolt.Tx) error {
for _, n := range deleted {
if err := c.remove(ctx, tx, n); err != nil {
if _, err := c.remove(ctx, tx, n); err != nil {
return err
}
}
@ -482,7 +482,7 @@ func TestCollectibleResources(t *testing.T) {
})
if err := db.Update(func(tx *bolt.Tx) error {
if err := c.remove(ctx, tx, all[removeIndex]); err != nil {
if _, err := c.remove(ctx, tx, all[removeIndex]); err != nil {
return err
}
return nil

View File

@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd/pkg/timeout"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/snapshots"
bolt "go.etcd.io/bbolt"
)
@ -162,7 +163,7 @@ func init() {
}
dbopts := []metadata.DBOpt{
// metadata.WithEventsPublisher(ic.Events),
metadata.WithEventsPublisher(ic.Events),
}
if !shared {

View File

@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/labels"
@ -273,7 +274,22 @@ func (s *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er
}
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
return s.createSnapshot(ctx, key, parent, false, opts)
mounts, err := s.createSnapshot(ctx, key, parent, false, opts)
if err != nil {
return nil, err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
Key: key,
Parent: parent,
Snapshotter: s.name,
}); err != nil {
return nil, err
}
}
return mounts, nil
}
func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
@ -618,6 +634,16 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}
if s.db.dbopts.publisher != nil {
if err := s.db.dbopts.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
Snapshotter: s.name,
}); err != nil {
return err
}
}
return nil
}
@ -631,7 +657,7 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
return err
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
var sbkt *bolt.Bucket
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt != nil {
@ -674,7 +700,17 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
s.db.dirtySS[s.name] = struct{}{}
return nil
}); err != nil {
return err
}
if s.db.dbopts.publisher != nil {
return s.db.dbopts.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
Key: key,
Snapshotter: s.name,
})
}
return nil
}
type infoPair struct {

View File

@ -17,30 +17,16 @@
package snapshots
import (
"context"
eventstypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
"github.com/containerd/containerd/snapshots"
)
// snapshotter wraps snapshots.Snapshotter with proper events published.
type snapshotter struct {
snapshots.Snapshotter
name string
publisher events.Publisher
}
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.ServicePlugin,
ID: services.SnapshotsService,
Requires: []plugin.Type{
plugin.EventPlugin,
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
@ -48,61 +34,8 @@ func init() {
if err != nil {
return nil, err
}
ep, err := ic.Get(plugin.EventPlugin)
if err != nil {
return nil, err
}
db := m.(*metadata.DB)
ss := make(map[string]snapshots.Snapshotter)
for n, sn := range db.Snapshotters() {
ss[n] = newSnapshotter(sn, n, ep.(events.Publisher))
}
return ss, nil
return m.(*metadata.DB).Snapshotters(), nil
},
})
}
func newSnapshotter(sn snapshots.Snapshotter, name string, publisher events.Publisher) snapshots.Snapshotter {
return &snapshotter{
Snapshotter: sn,
name: name,
publisher: publisher,
}
}
func (s *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
mounts, err := s.Snapshotter.Prepare(ctx, key, parent, opts...)
if err != nil {
return nil, err
}
if err := s.publisher.Publish(ctx, "/snapshot/prepare", &eventstypes.SnapshotPrepare{
Key: key,
Parent: parent,
Snapshotter: s.name,
}); err != nil {
return nil, err
}
return mounts, nil
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
if err := s.Snapshotter.Commit(ctx, name, key, opts...); err != nil {
return err
}
return s.publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
Key: key,
Name: name,
Snapshotter: s.name,
})
}
func (s *snapshotter) Remove(ctx context.Context, key string) error {
if err := s.Snapshotter.Remove(ctx, key); err != nil {
return err
}
return s.publisher.Publish(ctx, "/snapshot/remove", &eventstypes.SnapshotRemove{
Key: key,
Snapshotter: s.name,
})
}