Move snapshot event publishing into metadata store
Removes the snapshot event publishing from the snapshot service. Adds an option to metadata db to add a publisher. Adds event publishing to prepare, commit, and remove snapshot operations. Adds remove snapshot event to garbage collection. Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
@@ -24,6 +24,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/log"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
@@ -569,10 +570,10 @@ func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx contex
|
||||
}
|
||||
|
||||
// remove all buckets for the given node.
|
||||
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) (interface{}, error) {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
nsbkt := v1bkt.Bucket([]byte(node.Namespace))
|
||||
@@ -581,7 +582,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
|
||||
if cc, ok := c.contexts[node.Type]; ok {
|
||||
cc.Remove(node)
|
||||
}
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
switch node.Type {
|
||||
@@ -592,25 +593,28 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
|
||||
}
|
||||
if cbkt != nil {
|
||||
log.G(ctx).WithField("key", node.Key).Debug("remove content")
|
||||
return cbkt.DeleteBucket([]byte(node.Key))
|
||||
return nil, cbkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
case ResourceSnapshot:
|
||||
sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
ss, key, ok := strings.Cut(node.Key, "/")
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid snapshot gc key %s", node.Key)
|
||||
return nil, fmt.Errorf("invalid snapshot gc key %s", node.Key)
|
||||
}
|
||||
ssbkt := sbkt.Bucket([]byte(ss))
|
||||
if ssbkt != nil {
|
||||
log.G(ctx).WithField("key", key).WithField("snapshotter", ss).Debug("remove snapshot")
|
||||
return ssbkt.DeleteBucket([]byte(key))
|
||||
return &eventstypes.SnapshotRemove{
|
||||
Key: key,
|
||||
Snapshotter: ss,
|
||||
}, ssbkt.DeleteBucket([]byte(key))
|
||||
}
|
||||
}
|
||||
case ResourceLease:
|
||||
lbkt := nsbkt.Bucket(bucketKeyObjectLeases)
|
||||
if lbkt != nil {
|
||||
return lbkt.DeleteBucket([]byte(node.Key))
|
||||
return nil, lbkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
case ResourceIngest:
|
||||
ibkt := nsbkt.Bucket(bucketKeyObjectContent)
|
||||
@@ -619,7 +623,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
|
||||
}
|
||||
if ibkt != nil {
|
||||
log.G(ctx).WithField("ref", node.Key).Debug("remove ingest")
|
||||
return ibkt.DeleteBucket([]byte(node.Key))
|
||||
return nil, ibkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
default:
|
||||
cc, ok := c.contexts[node.Type]
|
||||
@@ -630,7 +634,7 @@ func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt
|
||||
|
||||
Reference in New Issue
Block a user