From 8367f69fb5b02e4d59eca4252ea10527862bb07c Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 11 Apr 2022 20:03:13 -0700 Subject: [PATCH 1/2] Add collectible resources to metadata gc Adds a registration function to metadata which allows plugins to register resources to be garbage collected. These resources allow defining resources types which are ephemeral and stored outside the metadata plugin without extending it. The garbage collection of these resources will not fail the metadata gc process if their removal fails. These resources may be referenced by existing metadata store resources but may not be used to reference metadata store resources for the purpose of preventing garbage collection. Signed-off-by: Derek McGowan --- metadata/db.go | 49 +++++++++-- metadata/db_test.go | 41 ++++++++- metadata/gc.go | 207 ++++++++++++++++++++++++++++++++++++++------ metadata/gc_test.go | 183 +++++++++++++++++++++++++++++++++++++-- 4 files changed, 439 insertions(+), 41 deletions(-) diff --git a/metadata/db.go b/metadata/db.go index 2d9cbf31a..9d8a4f5c9 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -94,6 +94,9 @@ type DB struct { // set indicating whether any dirty flags are set mutationCallbacks []func(bool) + // collectible resources + collectors map[gc.ResourceType]Collector + dbopts dbOptions } @@ -265,6 +268,36 @@ func (m *DB) RegisterMutationCallback(fn func(bool)) { m.wlock.Unlock() } +// RegisterCollectibleResource registers a resource type which can be +// referenced by metadata resources and garbage collected. +// Collectible Resources are useful ephemeral resources which need to +// to be tracked by go away after reboot or process restart. +// +// A few limitations to consider: +// - Collectible Resources cannot reference other resources. +// - A failure to complete collection will not fail the garbage collection, +// however, the resources can be collected in a later run. +// - Collectible Resources must track whether the resource is active and/or +// lease membership. +func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) { + m.wlock.Lock() + defer m.wlock.Unlock() + + if m.collectors == nil { + m.collectors = map[gc.ResourceType]Collector{} + } + + switch t { + case ResourceContainer: + panic("cannot re-register metadata resource") + default: + if _, ok := m.collectors[t]; ok { + panic("cannot register collectible type twice") + } + m.collectors[t] = c + } +} + // GCStats holds the duration for the different phases of the garbage collector type GCStats struct { MetaD time.Duration @@ -281,8 +314,9 @@ func (s GCStats) Elapsed() time.Duration { func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { m.wlock.Lock() t1 := time.Now() + c := startGCContext(ctx, m.collectors) - marked, err := m.getMarked(ctx) + marked, err := m.getMarked(ctx, c) // Pass in gc context if err != nil { m.wlock.Unlock() return nil, err @@ -304,16 +338,17 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { } else if n.Type == ResourceContent || n.Type == ResourceIngest { m.dirtyCS = true } - return remove(ctx, tx, n) + return c.remove(ctx, tx, n) // From gc context } - if err := scanAll(ctx, tx, rm); err != nil { + if err := c.scanAll(ctx, tx, rm); err != nil { // From gc context return fmt.Errorf("failed to scan and remove: %w", err) } return nil }); err != nil { m.wlock.Unlock() + c.cancel(ctx) return nil, err } @@ -358,13 +393,15 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { stats.MetaD = time.Since(t1) m.wlock.Unlock() + c.finish(ctx) + wg.Wait() return stats, err } // getMarked returns all resources that are used. -func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { +func (m *DB) getMarked(ctx context.Context, c *gcContext) (map[gc.Node]struct{}, error) { var marked map[gc.Node]struct{} if err := m.db.View(func(tx *bolt.Tx) error { ctx, cancel := context.WithCancel(ctx) @@ -383,7 +420,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { } }() // Call roots - if err := scanRoots(ctx, tx, roots); err != nil { + if err := c.scanRoots(ctx, tx, roots); err != nil { // From gc context cancel() return err } @@ -392,7 +429,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) { refs := func(n gc.Node) ([]gc.Node, error) { var sn []gc.Node - if err := references(ctx, tx, n, func(nn gc.Node) { + if err := c.references(ctx, tx, n, func(nn gc.Node) { // From gc context sn = append(sn, nn) }); err != nil { return nil, err diff --git a/metadata/db_test.go b/metadata/db_test.go index 707d9a9f4..a983533ca 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -406,9 +406,45 @@ func TestMetadataCollector(t *testing.T) { Type: "snapshots/native", }, }, false, "containerd.io/gc.flat", time.Now().String()), + + // Test Collectible Resource + blob(bytesFor(11), false, "containerd.io/gc.ref.test", "test1"), + blob(bytesFor(12), true, "containerd.io/gc.ref.test", "test2"), + lease("lease-3", []leases.Resource{ + { + ID: digestFor(11).String(), + Type: "content", + }, + }, false), + } + + testResource = gc.ResourceType(0x10) + + remaining = []gc.Node{ + gcnode(testResource, "test", "test1"), + gcnode(testResource, "test", "test3"), + gcnode(testResource, "test", "test4"), + } + + collector = &testCollector{ + all: []gc.Node{ + gcnode(testResource, "random", "test1"), + gcnode(testResource, "test", "test1"), + gcnode(testResource, "test", "test2"), + gcnode(testResource, "test", "test3"), + gcnode(testResource, "test", "test4"), + }, + active: []gc.Node{ + gcnode(testResource, "test", "test4"), + }, + leased: map[string][]gc.Node{ + "lease-3": { + gcnode(testResource, "test", "test3"), + }, + }, } - remaining []gc.Node ) + mdb.RegisterCollectibleResource(testResource, collector) if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { @@ -436,7 +472,8 @@ func TestMetadataCollector(t *testing.T) { actual = append(actual, node) return nil } - return scanAll(ctx, tx, scanFn) + cc := startGCContext(ctx, mdb.collectors) + return cc.scanAll(ctx, tx, scanFn) }); err != nil { t.Fatal(err) } diff --git a/metadata/gc.go b/metadata/gc.go index 60bf410a6..4fadb40bf 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "fmt" + "sort" "strings" "time" @@ -52,15 +53,161 @@ const ( var ( labelGCRoot = []byte("containerd.io/gc.root") + labelGCRef = []byte("containerd.io/gc.ref.") labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.") labelGCContentRef = []byte("containerd.io/gc.ref.content") labelGCExpire = []byte("containerd.io/gc.expire") labelGCFlat = []byte("containerd.io/gc.flat") ) +// CollectionContext manages a resource collection during a single run of +// the garbage collector. The context is responsible for managing access to +// resources as well as tracking removal. +// Implementations should defer any longer running operations to the Finish +// function and optimize other functions for running fast during garbage +// collection write locks. +type CollectionContext interface { + // Sends all known resources + All(func(gc.Node)) + + // Active sends all active resources + // Leased resources may be excluded since lease ownership should take + // precedence over active status. + Active(namespace string, fn func(gc.Node)) + + // Leased sends all resources associated with the given lease + Leased(namespace, lease string, fn func(gc.Node)) + + // Remove marks the given resource as removed + Remove(gc.Node) + + // Cancel is called to cleanup a context after a failed collection + Cancel() error + + // Finish is called to cleanup a context after a successful collection + Finish() error +} + +// Collector is an interface to manage resource collection for any collectible +// resource registered for garbage collection. +type Collector interface { + StartCollection(context.Context) (CollectionContext, error) + + ReferenceLabel() string +} + +type gcContext struct { + labelHandlers []referenceLabelHandler + contexts map[gc.ResourceType]CollectionContext +} + +type referenceLabelHandler struct { + key []byte + fn func(string, []byte, []byte, func(gc.Node)) +} + +func startGCContext(ctx context.Context, collectors map[gc.ResourceType]Collector) *gcContext { + var contexts map[gc.ResourceType]CollectionContext + labelHandlers := []referenceLabelHandler{ + { + key: labelGCContentRef, + fn: func(ns string, k, v []byte, fn func(gc.Node)) { + if ks := string(k); ks != string(labelGCContentRef) { + // Allow reference naming separated by . or /, ignore names + if ks[len(labelGCContentRef)] != '.' && ks[len(labelGCContentRef)] != '/' { + return + } + } + + fn(gcnode(ResourceContent, ns, string(v))) + }, + }, + { + key: labelGCSnapRef, + fn: func(ns string, k, v []byte, fn func(gc.Node)) { + snapshotter := k[len(labelGCSnapRef):] + if i := bytes.IndexByte(snapshotter, '/'); i >= 0 { + snapshotter = snapshotter[:i] + } + fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v))) + }, + }, + } + if len(collectors) > 0 { + contexts = map[gc.ResourceType]CollectionContext{} + for rt, collector := range collectors { + c, err := collector.StartCollection(ctx) + if err != nil { + // Only skipping this resource this round + continue + } + + if reflabel := collector.ReferenceLabel(); reflabel != "" { + key := append(labelGCRef, reflabel...) + labelHandlers = append(labelHandlers, referenceLabelHandler{ + key: key, + fn: func(ns string, k, v []byte, fn func(gc.Node)) { + if ks := string(k); ks != string(key) { + // Allow reference naming separated by . or /, ignore names + if ks[len(key)] != '.' && ks[len(key)] != '/' { + return + } + } + + fn(gcnode(rt, ns, string(v))) + }, + }) + } + contexts[rt] = c + } + // Sort labelHandlers to ensure key seeking is always forwardS + sort.Slice(labelHandlers, func(i, j int) bool { + return bytes.Compare(labelHandlers[i].key, labelHandlers[j].key) < 0 + }) + } + return &gcContext{ + labelHandlers: labelHandlers, + contexts: contexts, + } +} + +func (c *gcContext) all(fn func(gc.Node)) { + for _, gctx := range c.contexts { + gctx.All(fn) + } +} + +func (c *gcContext) active(namespace string, fn func(gc.Node)) { + for _, gctx := range c.contexts { + gctx.Active(namespace, fn) + } +} + +func (c *gcContext) leased(namespace, lease string, fn func(gc.Node)) { + for _, gctx := range c.contexts { + gctx.Leased(namespace, lease, fn) + } +} + +func (c *gcContext) cancel(ctx context.Context) { + for _, gctx := range c.contexts { + if err := gctx.Cancel(); err != nil { + log.G(ctx).WithError(err).Error("failed to cancel collection context") + } + } +} + +func (c *gcContext) finish(ctx context.Context) { + for _, gctx := range c.contexts { + if err := gctx.Finish(); err != nil { + log.G(ctx).WithError(err).Error("failed to finish collection context") + } + } +} + // scanRoots sends the given channel "root" resources that are certainly used. // The caller could look the references of the resources to find all resources that are used. -func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { +func (c *gcContext) scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { v1bkt := tx.Bucket(bucketKeyVersion) if v1bkt == nil { return nil @@ -170,6 +317,8 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { } } + c.leased(ns, string(k), fn) + return nil }); err != nil { return err @@ -188,7 +337,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { contentKey := string(target.Get(bucketKeyDigest)) fn(gcnode(ResourceContent, ns, contentKey)) } - return sendLabelRefs(ns, ibkt.Bucket(k), fn) + return c.sendLabelRefs(ns, ibkt.Bucket(k), fn) }); err != nil { return err } @@ -247,7 +396,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss))) } - return sendLabelRefs(ns, cibkt, fn) + return c.sendLabelRefs(ns, cibkt, fn) }); err != nil { return err } @@ -274,12 +423,14 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { return err } } + + c.active(ns, fn) } return cerr } // references finds the resources that are reachable from the given node. -func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error { +func (c *gcContext) references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error { switch node.Type { case ResourceContent: bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key)) @@ -288,7 +439,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node) return nil } - return sendLabelRefs(node.Namespace, bkt, fn) + return c.sendLabelRefs(node.Namespace, bkt, fn) case ResourceSnapshot, resourceSnapshotFlat: parts := strings.SplitN(node.Key, "/", 2) if len(parts) != 2 { @@ -312,7 +463,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node) return nil } - return sendLabelRefs(node.Namespace, bkt, fn) + return c.sendLabelRefs(node.Namespace, bkt, fn) case ResourceIngest: // Send expected value bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(node.Key)) @@ -332,7 +483,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node) } // scanAll finds all resources regardless whether the resources are used or not. -func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error { +func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error { v1bkt := tx.Bucket(bucketKeyVersion) if v1bkt == nil { return nil @@ -409,11 +560,15 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc } } + c.all(func(n gc.Node) { + fn(ctx, n) + }) + return nil } // remove all buckets for the given node. -func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { +func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { v1bkt := tx.Bucket(bucketKeyVersion) if v1bkt == nil { return nil @@ -421,6 +576,10 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { nsbkt := v1bkt.Bucket([]byte(node.Namespace)) if nsbkt == nil { + // Still remove object if refenced outside the db + if cc, ok := c.contexts[node.Type]; ok { + cc.Remove(node) + } return nil } @@ -461,37 +620,29 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { log.G(ctx).WithField("ref", node.Key).Debug("remove ingest") return ibkt.DeleteBucket([]byte(node.Key)) } + default: + cc, ok := c.contexts[node.Type] + if ok { + cc.Remove(node) + } else { + log.G(ctx).WithField("ref", node.Key).WithField("type", node.Type).Info("no remove defined for resource") + } } return nil } // sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt -func sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error { +func (c *gcContext) sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error { lbkt := bkt.Bucket(bucketKeyObjectLabels) if lbkt != nil { lc := lbkt.Cursor() - - labelRef := string(labelGCContentRef) - for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() { - if ks := string(k); ks != labelRef { - // Allow reference naming separated by . or /, ignore names - if ks[len(labelRef)] != '.' && ks[len(labelRef)] != '/' { - continue - } + for i := range c.labelHandlers { + labelRef := string(c.labelHandlers[i].key) + for k, v := lc.Seek(c.labelHandlers[i].key); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() { + c.labelHandlers[i].fn(ns, k, v, fn) } - - fn(gcnode(ResourceContent, ns, string(v))) } - - for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() { - snapshotter := k[len(labelGCSnapRef):] - if i := bytes.IndexByte(snapshotter, '/'); i >= 0 { - snapshotter = snapshotter[:i] - } - fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v))) - } - } return nil } diff --git a/metadata/gc_test.go b/metadata/gc_test.go index 9a514e935..a27210a2b 100644 --- a/metadata/gc_test.go +++ b/metadata/gc_test.go @@ -157,7 +157,7 @@ func TestGCRoots(t *testing.T) { ctx := context.Background() checkNodeC(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { - return scanRoots(ctx, tx, nc) + return startGCContext(ctx, nil).scanRoots(ctx, tx, nc) }) } @@ -230,9 +230,10 @@ func TestGCRemove(t *testing.T) { } ctx := context.Background() + c := startGCContext(ctx, nil) checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error { - return scanAll(ctx, tx, fn) + return c.scanAll(ctx, tx, fn) }) if t.Failed() { t.Fatal("Scan all failed") @@ -240,7 +241,7 @@ func TestGCRemove(t *testing.T) { if err := db.Update(func(tx *bolt.Tx) error { for _, n := range deleted { - if err := remove(ctx, tx, n); err != nil { + if err := c.remove(ctx, tx, n); err != nil { return err } } @@ -250,7 +251,7 @@ func TestGCRemove(t *testing.T) { } checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error { - return scanAll(ctx, tx, fn) + return c.scanAll(ctx, tx, fn) }) } @@ -370,10 +371,11 @@ func TestGCRefs(t *testing.T) { } ctx := context.Background() + c := startGCContext(ctx, nil) for n, nodes := range refs { checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { - return references(ctx, tx, n, func(n gc.Node) { + return c.references(ctx, tx, n, func(n gc.Node) { select { case nc <- n: case <-ctx.Done(): @@ -386,6 +388,174 @@ func TestGCRefs(t *testing.T) { } } +func TestCollectibleResources(t *testing.T) { + db, cleanup, err := newDatabase(t) + if err != nil { + t.Fatal(err) + } + testResource := gc.ResourceType(0x10) + defer cleanup() + alters := []alterFunc{ + addContent("ns1", dgst(1), nil), + addImage("ns1", "image1", dgst(1), nil), + addContent("ns1", dgst(2), map[string]string{ + "containerd.io/gc.ref.test": "test2", + }), + addImage("ns1", "image2", dgst(2), nil), + addLease("ns1", "lease1", labelmap(string(labelGCExpire), time.Now().Add(time.Hour).Format(time.RFC3339))), + addLease("ns1", "lease2", labelmap(string(labelGCExpire), time.Now().Add(-1*time.Hour).Format(time.RFC3339))), + } + refs := map[gc.Node][]gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()): nil, + gcnode(ResourceContent, "ns1", dgst(2).String()): { + gcnode(testResource, "ns1", "test2"), + }, + } + all := []gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()), + gcnode(ResourceContent, "ns1", dgst(2).String()), + gcnode(ResourceLease, "ns1", "lease1"), + gcnode(ResourceLease, "ns1", "lease2"), + gcnode(testResource, "ns1", "test1"), + gcnode(testResource, "ns1", "test2"), // 5: Will be removed + gcnode(testResource, "ns1", "test3"), + gcnode(testResource, "ns1", "test4"), + } + removeIndex := 5 + roots := []gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()), + gcnode(ResourceContent, "ns1", dgst(2).String()), + gcnode(ResourceLease, "ns1", "lease1"), + gcnode(testResource, "ns1", "test1"), + gcnode(testResource, "ns1", "test3"), + } + collector := &testCollector{ + all: []gc.Node{ + gcnode(testResource, "ns1", "test1"), + gcnode(testResource, "ns1", "test2"), + gcnode(testResource, "ns1", "test3"), + gcnode(testResource, "ns1", "test4"), + }, + active: []gc.Node{ + gcnode(testResource, "ns1", "test1"), + }, + leased: map[string][]gc.Node{ + "lease1": { + gcnode(testResource, "ns1", "test3"), + }, + "lease2": { + gcnode(testResource, "ns1", "test4"), + }, + }, + } + + if err := db.Update(func(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + for _, alter := range alters { + if err := alter(v1bkt); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + + ctx := context.Background() + c := startGCContext(ctx, map[gc.ResourceType]Collector{ + testResource: collector, + }) + + for n, nodes := range refs { + checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return c.references(ctx, tx, n, func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }) + if t.Failed() { + t.Fatalf("Failure scanning %v", n) + } + } + checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error { + return c.scanAll(ctx, tx, fn) + }) + checkNodeC(ctx, t, db, roots, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return c.scanRoots(ctx, tx, nc) + }) + + if err := db.Update(func(tx *bolt.Tx) error { + if err := c.remove(ctx, tx, all[removeIndex]); err != nil { + return err + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + all = append(all[:removeIndex], all[removeIndex+1:]...) + checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error { + return c.scanAll(ctx, tx, fn) + }) +} + +type testCollector struct { + all []gc.Node + active []gc.Node + leased map[string][]gc.Node +} + +func (tc *testCollector) StartCollection(context.Context) (CollectionContext, error) { + return tc, nil +} + +func (tc *testCollector) ReferenceLabel() string { + return "test" +} + +func (tc *testCollector) All(fn func(gc.Node)) { + for _, n := range tc.all { + fn(n) + } +} + +func (tc *testCollector) Active(namespace string, fn func(gc.Node)) { + for _, n := range tc.active { + if n.Namespace == namespace { + fn(n) + } + } +} + +func (tc *testCollector) Leased(namespace, lease string, fn func(gc.Node)) { + for _, n := range tc.leased[lease] { + if n.Namespace == namespace { + fn(n) + } + } +} + +func (tc *testCollector) Remove(n gc.Node) { + for i := range tc.all { + if tc.all[i] == n { + tc.all = append(tc.all[:i], tc.all[i+1:]...) + return + } + } +} + +func (tc *testCollector) Cancel() error { + return nil +} + +func (tc *testCollector) Finish() error { + return nil +} + func newDatabase(t testing.TB) (*bolt.DB, func(), error) { td := t.TempDir() @@ -400,6 +570,7 @@ func newDatabase(t testing.TB) (*bolt.DB, func(), error) { } func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) { + t.Helper() var actual []gc.Node nc := make(chan gc.Node) done := make(chan struct{}) @@ -421,6 +592,7 @@ func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No } func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, func(context.Context, gc.Node) error) error) { + t.Helper() var actual []gc.Node scanFn := func(ctx context.Context, n gc.Node) error { actual = append(actual, n) @@ -437,6 +609,7 @@ func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No } func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) { + t.Helper() sort.Sort(nodeList(n1)) sort.Sort(nodeList(n2)) From 3b82f9e33c3f30e80e1625231235dd1837ab74ce Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Tue, 19 Apr 2022 14:25:31 -0700 Subject: [PATCH 2/2] metadata: use resource max and end on registration Ensure the registered resource type does not conflict with existing resource types or over the max. Signed-off-by: Derek McGowan --- metadata/db.go | 17 +++++++++-------- metadata/gc.go | 2 ++ 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/metadata/db.go b/metadata/db.go index 9d8a4f5c9..d746e1560 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -280,6 +280,12 @@ func (m *DB) RegisterMutationCallback(fn func(bool)) { // - Collectible Resources must track whether the resource is active and/or // lease membership. func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) { + if t < resourceEnd { + panic("cannot re-register metadata resource") + } else if t >= gc.ResourceMax { + panic("resource type greater than max") + } + m.wlock.Lock() defer m.wlock.Unlock() @@ -287,15 +293,10 @@ func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) { m.collectors = map[gc.ResourceType]Collector{} } - switch t { - case ResourceContainer: - panic("cannot re-register metadata resource") - default: - if _, ok := m.collectors[t]; ok { - panic("cannot register collectible type twice") - } - m.collectors[t] = c + if _, ok := m.collectors[t]; ok { + panic("cannot register collectible type twice") } + m.collectors[t] = c } // GCStats holds the duration for the different phases of the garbage collector diff --git a/metadata/gc.go b/metadata/gc.go index 4fadb40bf..a5551380b 100644 --- a/metadata/gc.go +++ b/metadata/gc.go @@ -44,6 +44,8 @@ const ( ResourceLease // ResourceIngest specifies a content ingest ResourceIngest + // resourceEnd is the end of specified resource types + resourceEnd ) const (