Merge pull request #6804 from dmcgowan/metadata-collectible-resources
Add collectible resources to metadata gc
This commit is contained in:
commit
2bfc2a587b
@ -94,6 +94,9 @@ type DB struct {
|
||||
// set indicating whether any dirty flags are set
|
||||
mutationCallbacks []func(bool)
|
||||
|
||||
// collectible resources
|
||||
collectors map[gc.ResourceType]Collector
|
||||
|
||||
dbopts dbOptions
|
||||
}
|
||||
|
||||
@ -265,6 +268,37 @@ func (m *DB) RegisterMutationCallback(fn func(bool)) {
|
||||
m.wlock.Unlock()
|
||||
}
|
||||
|
||||
// RegisterCollectibleResource registers a resource type which can be
|
||||
// referenced by metadata resources and garbage collected.
|
||||
// Collectible Resources are useful ephemeral resources which need to
|
||||
// to be tracked by go away after reboot or process restart.
|
||||
//
|
||||
// A few limitations to consider:
|
||||
// - Collectible Resources cannot reference other resources.
|
||||
// - A failure to complete collection will not fail the garbage collection,
|
||||
// however, the resources can be collected in a later run.
|
||||
// - Collectible Resources must track whether the resource is active and/or
|
||||
// lease membership.
|
||||
func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) {
|
||||
if t < resourceEnd {
|
||||
panic("cannot re-register metadata resource")
|
||||
} else if t >= gc.ResourceMax {
|
||||
panic("resource type greater than max")
|
||||
}
|
||||
|
||||
m.wlock.Lock()
|
||||
defer m.wlock.Unlock()
|
||||
|
||||
if m.collectors == nil {
|
||||
m.collectors = map[gc.ResourceType]Collector{}
|
||||
}
|
||||
|
||||
if _, ok := m.collectors[t]; ok {
|
||||
panic("cannot register collectible type twice")
|
||||
}
|
||||
m.collectors[t] = c
|
||||
}
|
||||
|
||||
// GCStats holds the duration for the different phases of the garbage collector
|
||||
type GCStats struct {
|
||||
MetaD time.Duration
|
||||
@ -281,8 +315,9 @@ func (s GCStats) Elapsed() time.Duration {
|
||||
func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
|
||||
m.wlock.Lock()
|
||||
t1 := time.Now()
|
||||
c := startGCContext(ctx, m.collectors)
|
||||
|
||||
marked, err := m.getMarked(ctx)
|
||||
marked, err := m.getMarked(ctx, c) // Pass in gc context
|
||||
if err != nil {
|
||||
m.wlock.Unlock()
|
||||
return nil, err
|
||||
@ -304,16 +339,17 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
|
||||
} else if n.Type == ResourceContent || n.Type == ResourceIngest {
|
||||
m.dirtyCS = true
|
||||
}
|
||||
return remove(ctx, tx, n)
|
||||
return c.remove(ctx, tx, n) // From gc context
|
||||
}
|
||||
|
||||
if err := scanAll(ctx, tx, rm); err != nil {
|
||||
if err := c.scanAll(ctx, tx, rm); err != nil { // From gc context
|
||||
return fmt.Errorf("failed to scan and remove: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
m.wlock.Unlock()
|
||||
c.cancel(ctx)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -358,13 +394,15 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
|
||||
stats.MetaD = time.Since(t1)
|
||||
m.wlock.Unlock()
|
||||
|
||||
c.finish(ctx)
|
||||
|
||||
wg.Wait()
|
||||
|
||||
return stats, err
|
||||
}
|
||||
|
||||
// getMarked returns all resources that are used.
|
||||
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||
func (m *DB) getMarked(ctx context.Context, c *gcContext) (map[gc.Node]struct{}, error) {
|
||||
var marked map[gc.Node]struct{}
|
||||
if err := m.db.View(func(tx *bolt.Tx) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@ -383,7 +421,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||
}
|
||||
}()
|
||||
// Call roots
|
||||
if err := scanRoots(ctx, tx, roots); err != nil {
|
||||
if err := c.scanRoots(ctx, tx, roots); err != nil { // From gc context
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
@ -392,7 +430,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
|
||||
|
||||
refs := func(n gc.Node) ([]gc.Node, error) {
|
||||
var sn []gc.Node
|
||||
if err := references(ctx, tx, n, func(nn gc.Node) {
|
||||
if err := c.references(ctx, tx, n, func(nn gc.Node) { // From gc context
|
||||
sn = append(sn, nn)
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
@ -406,9 +406,45 @@ func TestMetadataCollector(t *testing.T) {
|
||||
Type: "snapshots/native",
|
||||
},
|
||||
}, false, "containerd.io/gc.flat", time.Now().String()),
|
||||
|
||||
// Test Collectible Resource
|
||||
blob(bytesFor(11), false, "containerd.io/gc.ref.test", "test1"),
|
||||
blob(bytesFor(12), true, "containerd.io/gc.ref.test", "test2"),
|
||||
lease("lease-3", []leases.Resource{
|
||||
{
|
||||
ID: digestFor(11).String(),
|
||||
Type: "content",
|
||||
},
|
||||
}, false),
|
||||
}
|
||||
|
||||
testResource = gc.ResourceType(0x10)
|
||||
|
||||
remaining = []gc.Node{
|
||||
gcnode(testResource, "test", "test1"),
|
||||
gcnode(testResource, "test", "test3"),
|
||||
gcnode(testResource, "test", "test4"),
|
||||
}
|
||||
|
||||
collector = &testCollector{
|
||||
all: []gc.Node{
|
||||
gcnode(testResource, "random", "test1"),
|
||||
gcnode(testResource, "test", "test1"),
|
||||
gcnode(testResource, "test", "test2"),
|
||||
gcnode(testResource, "test", "test3"),
|
||||
gcnode(testResource, "test", "test4"),
|
||||
},
|
||||
active: []gc.Node{
|
||||
gcnode(testResource, "test", "test4"),
|
||||
},
|
||||
leased: map[string][]gc.Node{
|
||||
"lease-3": {
|
||||
gcnode(testResource, "test", "test3"),
|
||||
},
|
||||
},
|
||||
}
|
||||
remaining []gc.Node
|
||||
)
|
||||
mdb.RegisterCollectibleResource(testResource, collector)
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
@ -436,7 +472,8 @@ func TestMetadataCollector(t *testing.T) {
|
||||
actual = append(actual, node)
|
||||
return nil
|
||||
}
|
||||
return scanAll(ctx, tx, scanFn)
|
||||
cc := startGCContext(ctx, mdb.collectors)
|
||||
return cc.scanAll(ctx, tx, scanFn)
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
209
metadata/gc.go
209
metadata/gc.go
@ -20,6 +20,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -43,6 +44,8 @@ const (
|
||||
ResourceLease
|
||||
// ResourceIngest specifies a content ingest
|
||||
ResourceIngest
|
||||
// resourceEnd is the end of specified resource types
|
||||
resourceEnd
|
||||
)
|
||||
|
||||
const (
|
||||
@ -52,15 +55,161 @@ const (
|
||||
|
||||
var (
|
||||
labelGCRoot = []byte("containerd.io/gc.root")
|
||||
labelGCRef = []byte("containerd.io/gc.ref.")
|
||||
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
|
||||
labelGCContentRef = []byte("containerd.io/gc.ref.content")
|
||||
labelGCExpire = []byte("containerd.io/gc.expire")
|
||||
labelGCFlat = []byte("containerd.io/gc.flat")
|
||||
)
|
||||
|
||||
// CollectionContext manages a resource collection during a single run of
|
||||
// the garbage collector. The context is responsible for managing access to
|
||||
// resources as well as tracking removal.
|
||||
// Implementations should defer any longer running operations to the Finish
|
||||
// function and optimize other functions for running fast during garbage
|
||||
// collection write locks.
|
||||
type CollectionContext interface {
|
||||
// Sends all known resources
|
||||
All(func(gc.Node))
|
||||
|
||||
// Active sends all active resources
|
||||
// Leased resources may be excluded since lease ownership should take
|
||||
// precedence over active status.
|
||||
Active(namespace string, fn func(gc.Node))
|
||||
|
||||
// Leased sends all resources associated with the given lease
|
||||
Leased(namespace, lease string, fn func(gc.Node))
|
||||
|
||||
// Remove marks the given resource as removed
|
||||
Remove(gc.Node)
|
||||
|
||||
// Cancel is called to cleanup a context after a failed collection
|
||||
Cancel() error
|
||||
|
||||
// Finish is called to cleanup a context after a successful collection
|
||||
Finish() error
|
||||
}
|
||||
|
||||
// Collector is an interface to manage resource collection for any collectible
|
||||
// resource registered for garbage collection.
|
||||
type Collector interface {
|
||||
StartCollection(context.Context) (CollectionContext, error)
|
||||
|
||||
ReferenceLabel() string
|
||||
}
|
||||
|
||||
type gcContext struct {
|
||||
labelHandlers []referenceLabelHandler
|
||||
contexts map[gc.ResourceType]CollectionContext
|
||||
}
|
||||
|
||||
type referenceLabelHandler struct {
|
||||
key []byte
|
||||
fn func(string, []byte, []byte, func(gc.Node))
|
||||
}
|
||||
|
||||
func startGCContext(ctx context.Context, collectors map[gc.ResourceType]Collector) *gcContext {
|
||||
var contexts map[gc.ResourceType]CollectionContext
|
||||
labelHandlers := []referenceLabelHandler{
|
||||
{
|
||||
key: labelGCContentRef,
|
||||
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
|
||||
if ks := string(k); ks != string(labelGCContentRef) {
|
||||
// Allow reference naming separated by . or /, ignore names
|
||||
if ks[len(labelGCContentRef)] != '.' && ks[len(labelGCContentRef)] != '/' {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
fn(gcnode(ResourceContent, ns, string(v)))
|
||||
},
|
||||
},
|
||||
{
|
||||
key: labelGCSnapRef,
|
||||
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
|
||||
snapshotter := k[len(labelGCSnapRef):]
|
||||
if i := bytes.IndexByte(snapshotter, '/'); i >= 0 {
|
||||
snapshotter = snapshotter[:i]
|
||||
}
|
||||
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
|
||||
},
|
||||
},
|
||||
}
|
||||
if len(collectors) > 0 {
|
||||
contexts = map[gc.ResourceType]CollectionContext{}
|
||||
for rt, collector := range collectors {
|
||||
c, err := collector.StartCollection(ctx)
|
||||
if err != nil {
|
||||
// Only skipping this resource this round
|
||||
continue
|
||||
}
|
||||
|
||||
if reflabel := collector.ReferenceLabel(); reflabel != "" {
|
||||
key := append(labelGCRef, reflabel...)
|
||||
labelHandlers = append(labelHandlers, referenceLabelHandler{
|
||||
key: key,
|
||||
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
|
||||
if ks := string(k); ks != string(key) {
|
||||
// Allow reference naming separated by . or /, ignore names
|
||||
if ks[len(key)] != '.' && ks[len(key)] != '/' {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
fn(gcnode(rt, ns, string(v)))
|
||||
},
|
||||
})
|
||||
}
|
||||
contexts[rt] = c
|
||||
}
|
||||
// Sort labelHandlers to ensure key seeking is always forwardS
|
||||
sort.Slice(labelHandlers, func(i, j int) bool {
|
||||
return bytes.Compare(labelHandlers[i].key, labelHandlers[j].key) < 0
|
||||
})
|
||||
}
|
||||
return &gcContext{
|
||||
labelHandlers: labelHandlers,
|
||||
contexts: contexts,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gcContext) all(fn func(gc.Node)) {
|
||||
for _, gctx := range c.contexts {
|
||||
gctx.All(fn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gcContext) active(namespace string, fn func(gc.Node)) {
|
||||
for _, gctx := range c.contexts {
|
||||
gctx.Active(namespace, fn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gcContext) leased(namespace, lease string, fn func(gc.Node)) {
|
||||
for _, gctx := range c.contexts {
|
||||
gctx.Leased(namespace, lease, fn)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gcContext) cancel(ctx context.Context) {
|
||||
for _, gctx := range c.contexts {
|
||||
if err := gctx.Cancel(); err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to cancel collection context")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gcContext) finish(ctx context.Context) {
|
||||
for _, gctx := range c.contexts {
|
||||
if err := gctx.Finish(); err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to finish collection context")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// scanRoots sends the given channel "root" resources that are certainly used.
|
||||
// The caller could look the references of the resources to find all resources that are used.
|
||||
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
func (c *gcContext) scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
@ -170,6 +319,8 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
}
|
||||
}
|
||||
|
||||
c.leased(ns, string(k), fn)
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
@ -188,7 +339,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
contentKey := string(target.Get(bucketKeyDigest))
|
||||
fn(gcnode(ResourceContent, ns, contentKey))
|
||||
}
|
||||
return sendLabelRefs(ns, ibkt.Bucket(k), fn)
|
||||
return c.sendLabelRefs(ns, ibkt.Bucket(k), fn)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -247,7 +398,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)))
|
||||
}
|
||||
|
||||
return sendLabelRefs(ns, cibkt, fn)
|
||||
return c.sendLabelRefs(ns, cibkt, fn)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -274,12 +425,14 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.active(ns, fn)
|
||||
}
|
||||
return cerr
|
||||
}
|
||||
|
||||
// references finds the resources that are reachable from the given node.
|
||||
func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
|
||||
func (c *gcContext) references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
|
||||
switch node.Type {
|
||||
case ResourceContent:
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key))
|
||||
@ -288,7 +441,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
|
||||
return nil
|
||||
}
|
||||
|
||||
return sendLabelRefs(node.Namespace, bkt, fn)
|
||||
return c.sendLabelRefs(node.Namespace, bkt, fn)
|
||||
case ResourceSnapshot, resourceSnapshotFlat:
|
||||
parts := strings.SplitN(node.Key, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
@ -312,7 +465,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
|
||||
return nil
|
||||
}
|
||||
|
||||
return sendLabelRefs(node.Namespace, bkt, fn)
|
||||
return c.sendLabelRefs(node.Namespace, bkt, fn)
|
||||
case ResourceIngest:
|
||||
// Send expected value
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(node.Key))
|
||||
@ -332,7 +485,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
|
||||
}
|
||||
|
||||
// scanAll finds all resources regardless whether the resources are used or not.
|
||||
func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
|
||||
func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
@ -409,11 +562,15 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc
|
||||
}
|
||||
}
|
||||
|
||||
c.all(func(n gc.Node) {
|
||||
fn(ctx, n)
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// remove all buckets for the given node.
|
||||
func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
@ -421,6 +578,10 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
|
||||
nsbkt := v1bkt.Bucket([]byte(node.Namespace))
|
||||
if nsbkt == nil {
|
||||
// Still remove object if refenced outside the db
|
||||
if cc, ok := c.contexts[node.Type]; ok {
|
||||
cc.Remove(node)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -461,37 +622,29 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
log.G(ctx).WithField("ref", node.Key).Debug("remove ingest")
|
||||
return ibkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
default:
|
||||
cc, ok := c.contexts[node.Type]
|
||||
if ok {
|
||||
cc.Remove(node)
|
||||
} else {
|
||||
log.G(ctx).WithField("ref", node.Key).WithField("type", node.Type).Info("no remove defined for resource")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt
|
||||
func sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
|
||||
func (c *gcContext) sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
|
||||
lbkt := bkt.Bucket(bucketKeyObjectLabels)
|
||||
if lbkt != nil {
|
||||
lc := lbkt.Cursor()
|
||||
|
||||
labelRef := string(labelGCContentRef)
|
||||
for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
|
||||
if ks := string(k); ks != labelRef {
|
||||
// Allow reference naming separated by . or /, ignore names
|
||||
if ks[len(labelRef)] != '.' && ks[len(labelRef)] != '/' {
|
||||
continue
|
||||
}
|
||||
for i := range c.labelHandlers {
|
||||
labelRef := string(c.labelHandlers[i].key)
|
||||
for k, v := lc.Seek(c.labelHandlers[i].key); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
|
||||
c.labelHandlers[i].fn(ns, k, v, fn)
|
||||
}
|
||||
|
||||
fn(gcnode(ResourceContent, ns, string(v)))
|
||||
}
|
||||
|
||||
for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() {
|
||||
snapshotter := k[len(labelGCSnapRef):]
|
||||
if i := bytes.IndexByte(snapshotter, '/'); i >= 0 {
|
||||
snapshotter = snapshotter[:i]
|
||||
}
|
||||
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
|
||||
}
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -157,7 +157,7 @@ func TestGCRoots(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
checkNodeC(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanRoots(ctx, tx, nc)
|
||||
return startGCContext(ctx, nil).scanRoots(ctx, tx, nc)
|
||||
})
|
||||
}
|
||||
|
||||
@ -230,9 +230,10 @@ func TestGCRemove(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
c := startGCContext(ctx, nil)
|
||||
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return scanAll(ctx, tx, fn)
|
||||
return c.scanAll(ctx, tx, fn)
|
||||
})
|
||||
if t.Failed() {
|
||||
t.Fatal("Scan all failed")
|
||||
@ -240,7 +241,7 @@ func TestGCRemove(t *testing.T) {
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
for _, n := range deleted {
|
||||
if err := remove(ctx, tx, n); err != nil {
|
||||
if err := c.remove(ctx, tx, n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -250,7 +251,7 @@ func TestGCRemove(t *testing.T) {
|
||||
}
|
||||
|
||||
checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return scanAll(ctx, tx, fn)
|
||||
return c.scanAll(ctx, tx, fn)
|
||||
})
|
||||
}
|
||||
|
||||
@ -370,10 +371,11 @@ func TestGCRefs(t *testing.T) {
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
c := startGCContext(ctx, nil)
|
||||
|
||||
for n, nodes := range refs {
|
||||
checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return references(ctx, tx, n, func(n gc.Node) {
|
||||
return c.references(ctx, tx, n, func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
@ -386,6 +388,174 @@ func TestGCRefs(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCollectibleResources(t *testing.T) {
|
||||
db, cleanup, err := newDatabase(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
testResource := gc.ResourceType(0x10)
|
||||
defer cleanup()
|
||||
alters := []alterFunc{
|
||||
addContent("ns1", dgst(1), nil),
|
||||
addImage("ns1", "image1", dgst(1), nil),
|
||||
addContent("ns1", dgst(2), map[string]string{
|
||||
"containerd.io/gc.ref.test": "test2",
|
||||
}),
|
||||
addImage("ns1", "image2", dgst(2), nil),
|
||||
addLease("ns1", "lease1", labelmap(string(labelGCExpire), time.Now().Add(time.Hour).Format(time.RFC3339))),
|
||||
addLease("ns1", "lease2", labelmap(string(labelGCExpire), time.Now().Add(-1*time.Hour).Format(time.RFC3339))),
|
||||
}
|
||||
refs := map[gc.Node][]gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()): nil,
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()): {
|
||||
gcnode(testResource, "ns1", "test2"),
|
||||
},
|
||||
}
|
||||
all := []gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()),
|
||||
gcnode(ResourceLease, "ns1", "lease1"),
|
||||
gcnode(ResourceLease, "ns1", "lease2"),
|
||||
gcnode(testResource, "ns1", "test1"),
|
||||
gcnode(testResource, "ns1", "test2"), // 5: Will be removed
|
||||
gcnode(testResource, "ns1", "test3"),
|
||||
gcnode(testResource, "ns1", "test4"),
|
||||
}
|
||||
removeIndex := 5
|
||||
roots := []gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()),
|
||||
gcnode(ResourceLease, "ns1", "lease1"),
|
||||
gcnode(testResource, "ns1", "test1"),
|
||||
gcnode(testResource, "ns1", "test3"),
|
||||
}
|
||||
collector := &testCollector{
|
||||
all: []gc.Node{
|
||||
gcnode(testResource, "ns1", "test1"),
|
||||
gcnode(testResource, "ns1", "test2"),
|
||||
gcnode(testResource, "ns1", "test3"),
|
||||
gcnode(testResource, "ns1", "test4"),
|
||||
},
|
||||
active: []gc.Node{
|
||||
gcnode(testResource, "ns1", "test1"),
|
||||
},
|
||||
leased: map[string][]gc.Node{
|
||||
"lease1": {
|
||||
gcnode(testResource, "ns1", "test3"),
|
||||
},
|
||||
"lease2": {
|
||||
gcnode(testResource, "ns1", "test4"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, alter := range alters {
|
||||
if err := alter(v1bkt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
c := startGCContext(ctx, map[gc.ResourceType]Collector{
|
||||
testResource: collector,
|
||||
})
|
||||
|
||||
for n, nodes := range refs {
|
||||
checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return c.references(ctx, tx, n, func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
})
|
||||
})
|
||||
if t.Failed() {
|
||||
t.Fatalf("Failure scanning %v", n)
|
||||
}
|
||||
}
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return c.scanAll(ctx, tx, fn)
|
||||
})
|
||||
checkNodeC(ctx, t, db, roots, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return c.scanRoots(ctx, tx, nc)
|
||||
})
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
if err := c.remove(ctx, tx, all[removeIndex]); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
all = append(all[:removeIndex], all[removeIndex+1:]...)
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
|
||||
return c.scanAll(ctx, tx, fn)
|
||||
})
|
||||
}
|
||||
|
||||
type testCollector struct {
|
||||
all []gc.Node
|
||||
active []gc.Node
|
||||
leased map[string][]gc.Node
|
||||
}
|
||||
|
||||
func (tc *testCollector) StartCollection(context.Context) (CollectionContext, error) {
|
||||
return tc, nil
|
||||
}
|
||||
|
||||
func (tc *testCollector) ReferenceLabel() string {
|
||||
return "test"
|
||||
}
|
||||
|
||||
func (tc *testCollector) All(fn func(gc.Node)) {
|
||||
for _, n := range tc.all {
|
||||
fn(n)
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *testCollector) Active(namespace string, fn func(gc.Node)) {
|
||||
for _, n := range tc.active {
|
||||
if n.Namespace == namespace {
|
||||
fn(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *testCollector) Leased(namespace, lease string, fn func(gc.Node)) {
|
||||
for _, n := range tc.leased[lease] {
|
||||
if n.Namespace == namespace {
|
||||
fn(n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *testCollector) Remove(n gc.Node) {
|
||||
for i := range tc.all {
|
||||
if tc.all[i] == n {
|
||||
tc.all = append(tc.all[:i], tc.all[i+1:]...)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (tc *testCollector) Cancel() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (tc *testCollector) Finish() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newDatabase(t testing.TB) (*bolt.DB, func(), error) {
|
||||
td := t.TempDir()
|
||||
|
||||
@ -400,6 +570,7 @@ func newDatabase(t testing.TB) (*bolt.DB, func(), error) {
|
||||
}
|
||||
|
||||
func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
|
||||
t.Helper()
|
||||
var actual []gc.Node
|
||||
nc := make(chan gc.Node)
|
||||
done := make(chan struct{})
|
||||
@ -421,6 +592,7 @@ func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No
|
||||
}
|
||||
|
||||
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, func(context.Context, gc.Node) error) error) {
|
||||
t.Helper()
|
||||
var actual []gc.Node
|
||||
scanFn := func(ctx context.Context, n gc.Node) error {
|
||||
actual = append(actual, n)
|
||||
@ -437,6 +609,7 @@ func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No
|
||||
}
|
||||
|
||||
func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) {
|
||||
t.Helper()
|
||||
sort.Sort(nodeList(n1))
|
||||
sort.Sort(nodeList(n2))
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user