Add collectible resources to metadata gc

Adds a registration function to metadata which allows plugins to
register resources to be garbage collected. These resources allow
defining resources types which are ephemeral and stored outside the
metadata plugin without extending it. The garbage collection of these
resources will not fail the metadata gc process if their removal fails.
These resources may be referenced by existing metadata store resources
but may not be used to reference metadata store resources for the purpose
of preventing garbage collection.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-04-11 20:03:13 -07:00
parent eaf286224b
commit 8367f69fb5
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
4 changed files with 439 additions and 41 deletions

View File

@ -94,6 +94,9 @@ type DB struct {
// set indicating whether any dirty flags are set
mutationCallbacks []func(bool)
// collectible resources
collectors map[gc.ResourceType]Collector
dbopts dbOptions
}
@ -265,6 +268,36 @@ func (m *DB) RegisterMutationCallback(fn func(bool)) {
m.wlock.Unlock()
}
// RegisterCollectibleResource registers a resource type which can be
// referenced by metadata resources and garbage collected.
// Collectible Resources are useful ephemeral resources which need to
// to be tracked by go away after reboot or process restart.
//
// A few limitations to consider:
// - Collectible Resources cannot reference other resources.
// - A failure to complete collection will not fail the garbage collection,
// however, the resources can be collected in a later run.
// - Collectible Resources must track whether the resource is active and/or
// lease membership.
func (m *DB) RegisterCollectibleResource(t gc.ResourceType, c Collector) {
m.wlock.Lock()
defer m.wlock.Unlock()
if m.collectors == nil {
m.collectors = map[gc.ResourceType]Collector{}
}
switch t {
case ResourceContainer:
panic("cannot re-register metadata resource")
default:
if _, ok := m.collectors[t]; ok {
panic("cannot register collectible type twice")
}
m.collectors[t] = c
}
}
// GCStats holds the duration for the different phases of the garbage collector
type GCStats struct {
MetaD time.Duration
@ -281,8 +314,9 @@ func (s GCStats) Elapsed() time.Duration {
func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
m.wlock.Lock()
t1 := time.Now()
c := startGCContext(ctx, m.collectors)
marked, err := m.getMarked(ctx)
marked, err := m.getMarked(ctx, c) // Pass in gc context
if err != nil {
m.wlock.Unlock()
return nil, err
@ -304,16 +338,17 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
} else if n.Type == ResourceContent || n.Type == ResourceIngest {
m.dirtyCS = true
}
return remove(ctx, tx, n)
return c.remove(ctx, tx, n) // From gc context
}
if err := scanAll(ctx, tx, rm); err != nil {
if err := c.scanAll(ctx, tx, rm); err != nil { // From gc context
return fmt.Errorf("failed to scan and remove: %w", err)
}
return nil
}); err != nil {
m.wlock.Unlock()
c.cancel(ctx)
return nil, err
}
@ -358,13 +393,15 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
stats.MetaD = time.Since(t1)
m.wlock.Unlock()
c.finish(ctx)
wg.Wait()
return stats, err
}
// getMarked returns all resources that are used.
func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
func (m *DB) getMarked(ctx context.Context, c *gcContext) (map[gc.Node]struct{}, error) {
var marked map[gc.Node]struct{}
if err := m.db.View(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
@ -383,7 +420,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
}
}()
// Call roots
if err := scanRoots(ctx, tx, roots); err != nil {
if err := c.scanRoots(ctx, tx, roots); err != nil { // From gc context
cancel()
return err
}
@ -392,7 +429,7 @@ func (m *DB) getMarked(ctx context.Context) (map[gc.Node]struct{}, error) {
refs := func(n gc.Node) ([]gc.Node, error) {
var sn []gc.Node
if err := references(ctx, tx, n, func(nn gc.Node) {
if err := c.references(ctx, tx, n, func(nn gc.Node) { // From gc context
sn = append(sn, nn)
}); err != nil {
return nil, err

View File

@ -406,9 +406,45 @@ func TestMetadataCollector(t *testing.T) {
Type: "snapshots/native",
},
}, false, "containerd.io/gc.flat", time.Now().String()),
// Test Collectible Resource
blob(bytesFor(11), false, "containerd.io/gc.ref.test", "test1"),
blob(bytesFor(12), true, "containerd.io/gc.ref.test", "test2"),
lease("lease-3", []leases.Resource{
{
ID: digestFor(11).String(),
Type: "content",
},
}, false),
}
testResource = gc.ResourceType(0x10)
remaining = []gc.Node{
gcnode(testResource, "test", "test1"),
gcnode(testResource, "test", "test3"),
gcnode(testResource, "test", "test4"),
}
collector = &testCollector{
all: []gc.Node{
gcnode(testResource, "random", "test1"),
gcnode(testResource, "test", "test1"),
gcnode(testResource, "test", "test2"),
gcnode(testResource, "test", "test3"),
gcnode(testResource, "test", "test4"),
},
active: []gc.Node{
gcnode(testResource, "test", "test4"),
},
leased: map[string][]gc.Node{
"lease-3": {
gcnode(testResource, "test", "test3"),
},
},
}
remaining []gc.Node
)
mdb.RegisterCollectibleResource(testResource, collector)
if err := mdb.Update(func(tx *bolt.Tx) error {
for _, obj := range objects {
@ -436,7 +472,8 @@ func TestMetadataCollector(t *testing.T) {
actual = append(actual, node)
return nil
}
return scanAll(ctx, tx, scanFn)
cc := startGCContext(ctx, mdb.collectors)
return cc.scanAll(ctx, tx, scanFn)
}); err != nil {
t.Fatal(err)
}

View File

@ -20,6 +20,7 @@ import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"time"
@ -52,15 +53,161 @@ const (
var (
labelGCRoot = []byte("containerd.io/gc.root")
labelGCRef = []byte("containerd.io/gc.ref.")
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
labelGCContentRef = []byte("containerd.io/gc.ref.content")
labelGCExpire = []byte("containerd.io/gc.expire")
labelGCFlat = []byte("containerd.io/gc.flat")
)
// CollectionContext manages a resource collection during a single run of
// the garbage collector. The context is responsible for managing access to
// resources as well as tracking removal.
// Implementations should defer any longer running operations to the Finish
// function and optimize other functions for running fast during garbage
// collection write locks.
type CollectionContext interface {
// Sends all known resources
All(func(gc.Node))
// Active sends all active resources
// Leased resources may be excluded since lease ownership should take
// precedence over active status.
Active(namespace string, fn func(gc.Node))
// Leased sends all resources associated with the given lease
Leased(namespace, lease string, fn func(gc.Node))
// Remove marks the given resource as removed
Remove(gc.Node)
// Cancel is called to cleanup a context after a failed collection
Cancel() error
// Finish is called to cleanup a context after a successful collection
Finish() error
}
// Collector is an interface to manage resource collection for any collectible
// resource registered for garbage collection.
type Collector interface {
StartCollection(context.Context) (CollectionContext, error)
ReferenceLabel() string
}
type gcContext struct {
labelHandlers []referenceLabelHandler
contexts map[gc.ResourceType]CollectionContext
}
type referenceLabelHandler struct {
key []byte
fn func(string, []byte, []byte, func(gc.Node))
}
func startGCContext(ctx context.Context, collectors map[gc.ResourceType]Collector) *gcContext {
var contexts map[gc.ResourceType]CollectionContext
labelHandlers := []referenceLabelHandler{
{
key: labelGCContentRef,
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
if ks := string(k); ks != string(labelGCContentRef) {
// Allow reference naming separated by . or /, ignore names
if ks[len(labelGCContentRef)] != '.' && ks[len(labelGCContentRef)] != '/' {
return
}
}
fn(gcnode(ResourceContent, ns, string(v)))
},
},
{
key: labelGCSnapRef,
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
snapshotter := k[len(labelGCSnapRef):]
if i := bytes.IndexByte(snapshotter, '/'); i >= 0 {
snapshotter = snapshotter[:i]
}
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
},
},
}
if len(collectors) > 0 {
contexts = map[gc.ResourceType]CollectionContext{}
for rt, collector := range collectors {
c, err := collector.StartCollection(ctx)
if err != nil {
// Only skipping this resource this round
continue
}
if reflabel := collector.ReferenceLabel(); reflabel != "" {
key := append(labelGCRef, reflabel...)
labelHandlers = append(labelHandlers, referenceLabelHandler{
key: key,
fn: func(ns string, k, v []byte, fn func(gc.Node)) {
if ks := string(k); ks != string(key) {
// Allow reference naming separated by . or /, ignore names
if ks[len(key)] != '.' && ks[len(key)] != '/' {
return
}
}
fn(gcnode(rt, ns, string(v)))
},
})
}
contexts[rt] = c
}
// Sort labelHandlers to ensure key seeking is always forwardS
sort.Slice(labelHandlers, func(i, j int) bool {
return bytes.Compare(labelHandlers[i].key, labelHandlers[j].key) < 0
})
}
return &gcContext{
labelHandlers: labelHandlers,
contexts: contexts,
}
}
func (c *gcContext) all(fn func(gc.Node)) {
for _, gctx := range c.contexts {
gctx.All(fn)
}
}
func (c *gcContext) active(namespace string, fn func(gc.Node)) {
for _, gctx := range c.contexts {
gctx.Active(namespace, fn)
}
}
func (c *gcContext) leased(namespace, lease string, fn func(gc.Node)) {
for _, gctx := range c.contexts {
gctx.Leased(namespace, lease, fn)
}
}
func (c *gcContext) cancel(ctx context.Context) {
for _, gctx := range c.contexts {
if err := gctx.Cancel(); err != nil {
log.G(ctx).WithError(err).Error("failed to cancel collection context")
}
}
}
func (c *gcContext) finish(ctx context.Context) {
for _, gctx := range c.contexts {
if err := gctx.Finish(); err != nil {
log.G(ctx).WithError(err).Error("failed to finish collection context")
}
}
}
// scanRoots sends the given channel "root" resources that are certainly used.
// The caller could look the references of the resources to find all resources that are used.
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
func (c *gcContext) scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
@ -170,6 +317,8 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
}
}
c.leased(ns, string(k), fn)
return nil
}); err != nil {
return err
@ -188,7 +337,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
contentKey := string(target.Get(bucketKeyDigest))
fn(gcnode(ResourceContent, ns, contentKey))
}
return sendLabelRefs(ns, ibkt.Bucket(k), fn)
return c.sendLabelRefs(ns, ibkt.Bucket(k), fn)
}); err != nil {
return err
}
@ -247,7 +396,7 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)))
}
return sendLabelRefs(ns, cibkt, fn)
return c.sendLabelRefs(ns, cibkt, fn)
}); err != nil {
return err
}
@ -274,12 +423,14 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return err
}
}
c.active(ns, fn)
}
return cerr
}
// references finds the resources that are reachable from the given node.
func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
func (c *gcContext) references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
switch node.Type {
case ResourceContent:
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key))
@ -288,7 +439,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
return nil
}
return sendLabelRefs(node.Namespace, bkt, fn)
return c.sendLabelRefs(node.Namespace, bkt, fn)
case ResourceSnapshot, resourceSnapshotFlat:
parts := strings.SplitN(node.Key, "/", 2)
if len(parts) != 2 {
@ -312,7 +463,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
return nil
}
return sendLabelRefs(node.Namespace, bkt, fn)
return c.sendLabelRefs(node.Namespace, bkt, fn)
case ResourceIngest:
// Send expected value
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(node.Key))
@ -332,7 +483,7 @@ func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)
}
// scanAll finds all resources regardless whether the resources are used or not.
func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
func (c *gcContext) scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc.Node) error) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
@ -409,11 +560,15 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc
}
}
c.all(func(n gc.Node) {
fn(ctx, n)
})
return nil
}
// remove all buckets for the given node.
func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
func (c *gcContext) remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
@ -421,6 +576,10 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
nsbkt := v1bkt.Bucket([]byte(node.Namespace))
if nsbkt == nil {
// Still remove object if refenced outside the db
if cc, ok := c.contexts[node.Type]; ok {
cc.Remove(node)
}
return nil
}
@ -461,37 +620,29 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
log.G(ctx).WithField("ref", node.Key).Debug("remove ingest")
return ibkt.DeleteBucket([]byte(node.Key))
}
default:
cc, ok := c.contexts[node.Type]
if ok {
cc.Remove(node)
} else {
log.G(ctx).WithField("ref", node.Key).WithField("type", node.Type).Info("no remove defined for resource")
}
}
return nil
}
// sendLabelRefs sends all snapshot and content references referred to by the labels in the bkt
func sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
func (c *gcContext) sendLabelRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
lbkt := bkt.Bucket(bucketKeyObjectLabels)
if lbkt != nil {
lc := lbkt.Cursor()
labelRef := string(labelGCContentRef)
for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
if ks := string(k); ks != labelRef {
// Allow reference naming separated by . or /, ignore names
if ks[len(labelRef)] != '.' && ks[len(labelRef)] != '/' {
continue
}
for i := range c.labelHandlers {
labelRef := string(c.labelHandlers[i].key)
for k, v := lc.Seek(c.labelHandlers[i].key); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
c.labelHandlers[i].fn(ns, k, v, fn)
}
fn(gcnode(ResourceContent, ns, string(v)))
}
for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() {
snapshotter := k[len(labelGCSnapRef):]
if i := bytes.IndexByte(snapshotter, '/'); i >= 0 {
snapshotter = snapshotter[:i]
}
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
}
}
return nil
}

View File

@ -157,7 +157,7 @@ func TestGCRoots(t *testing.T) {
ctx := context.Background()
checkNodeC(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return scanRoots(ctx, tx, nc)
return startGCContext(ctx, nil).scanRoots(ctx, tx, nc)
})
}
@ -230,9 +230,10 @@ func TestGCRemove(t *testing.T) {
}
ctx := context.Background()
c := startGCContext(ctx, nil)
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
return scanAll(ctx, tx, fn)
return c.scanAll(ctx, tx, fn)
})
if t.Failed() {
t.Fatal("Scan all failed")
@ -240,7 +241,7 @@ func TestGCRemove(t *testing.T) {
if err := db.Update(func(tx *bolt.Tx) error {
for _, n := range deleted {
if err := remove(ctx, tx, n); err != nil {
if err := c.remove(ctx, tx, n); err != nil {
return err
}
}
@ -250,7 +251,7 @@ func TestGCRemove(t *testing.T) {
}
checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
return scanAll(ctx, tx, fn)
return c.scanAll(ctx, tx, fn)
})
}
@ -370,10 +371,11 @@ func TestGCRefs(t *testing.T) {
}
ctx := context.Background()
c := startGCContext(ctx, nil)
for n, nodes := range refs {
checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return references(ctx, tx, n, func(n gc.Node) {
return c.references(ctx, tx, n, func(n gc.Node) {
select {
case nc <- n:
case <-ctx.Done():
@ -386,6 +388,174 @@ func TestGCRefs(t *testing.T) {
}
}
func TestCollectibleResources(t *testing.T) {
db, cleanup, err := newDatabase(t)
if err != nil {
t.Fatal(err)
}
testResource := gc.ResourceType(0x10)
defer cleanup()
alters := []alterFunc{
addContent("ns1", dgst(1), nil),
addImage("ns1", "image1", dgst(1), nil),
addContent("ns1", dgst(2), map[string]string{
"containerd.io/gc.ref.test": "test2",
}),
addImage("ns1", "image2", dgst(2), nil),
addLease("ns1", "lease1", labelmap(string(labelGCExpire), time.Now().Add(time.Hour).Format(time.RFC3339))),
addLease("ns1", "lease2", labelmap(string(labelGCExpire), time.Now().Add(-1*time.Hour).Format(time.RFC3339))),
}
refs := map[gc.Node][]gc.Node{
gcnode(ResourceContent, "ns1", dgst(1).String()): nil,
gcnode(ResourceContent, "ns1", dgst(2).String()): {
gcnode(testResource, "ns1", "test2"),
},
}
all := []gc.Node{
gcnode(ResourceContent, "ns1", dgst(1).String()),
gcnode(ResourceContent, "ns1", dgst(2).String()),
gcnode(ResourceLease, "ns1", "lease1"),
gcnode(ResourceLease, "ns1", "lease2"),
gcnode(testResource, "ns1", "test1"),
gcnode(testResource, "ns1", "test2"), // 5: Will be removed
gcnode(testResource, "ns1", "test3"),
gcnode(testResource, "ns1", "test4"),
}
removeIndex := 5
roots := []gc.Node{
gcnode(ResourceContent, "ns1", dgst(1).String()),
gcnode(ResourceContent, "ns1", dgst(2).String()),
gcnode(ResourceLease, "ns1", "lease1"),
gcnode(testResource, "ns1", "test1"),
gcnode(testResource, "ns1", "test3"),
}
collector := &testCollector{
all: []gc.Node{
gcnode(testResource, "ns1", "test1"),
gcnode(testResource, "ns1", "test2"),
gcnode(testResource, "ns1", "test3"),
gcnode(testResource, "ns1", "test4"),
},
active: []gc.Node{
gcnode(testResource, "ns1", "test1"),
},
leased: map[string][]gc.Node{
"lease1": {
gcnode(testResource, "ns1", "test3"),
},
"lease2": {
gcnode(testResource, "ns1", "test4"),
},
},
}
if err := db.Update(func(tx *bolt.Tx) error {
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
if err != nil {
return err
}
for _, alter := range alters {
if err := alter(v1bkt); err != nil {
return err
}
}
return nil
}); err != nil {
t.Fatalf("Update failed: %+v", err)
}
ctx := context.Background()
c := startGCContext(ctx, map[gc.ResourceType]Collector{
testResource: collector,
})
for n, nodes := range refs {
checkNodeC(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return c.references(ctx, tx, n, func(n gc.Node) {
select {
case nc <- n:
case <-ctx.Done():
}
})
})
if t.Failed() {
t.Fatalf("Failure scanning %v", n)
}
}
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
return c.scanAll(ctx, tx, fn)
})
checkNodeC(ctx, t, db, roots, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return c.scanRoots(ctx, tx, nc)
})
if err := db.Update(func(tx *bolt.Tx) error {
if err := c.remove(ctx, tx, all[removeIndex]); err != nil {
return err
}
return nil
}); err != nil {
t.Fatalf("Update failed: %+v", err)
}
all = append(all[:removeIndex], all[removeIndex+1:]...)
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, fn func(context.Context, gc.Node) error) error {
return c.scanAll(ctx, tx, fn)
})
}
type testCollector struct {
all []gc.Node
active []gc.Node
leased map[string][]gc.Node
}
func (tc *testCollector) StartCollection(context.Context) (CollectionContext, error) {
return tc, nil
}
func (tc *testCollector) ReferenceLabel() string {
return "test"
}
func (tc *testCollector) All(fn func(gc.Node)) {
for _, n := range tc.all {
fn(n)
}
}
func (tc *testCollector) Active(namespace string, fn func(gc.Node)) {
for _, n := range tc.active {
if n.Namespace == namespace {
fn(n)
}
}
}
func (tc *testCollector) Leased(namespace, lease string, fn func(gc.Node)) {
for _, n := range tc.leased[lease] {
if n.Namespace == namespace {
fn(n)
}
}
}
func (tc *testCollector) Remove(n gc.Node) {
for i := range tc.all {
if tc.all[i] == n {
tc.all = append(tc.all[:i], tc.all[i+1:]...)
return
}
}
}
func (tc *testCollector) Cancel() error {
return nil
}
func (tc *testCollector) Finish() error {
return nil
}
func newDatabase(t testing.TB) (*bolt.DB, func(), error) {
td := t.TempDir()
@ -400,6 +570,7 @@ func newDatabase(t testing.TB) (*bolt.DB, func(), error) {
}
func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
t.Helper()
var actual []gc.Node
nc := make(chan gc.Node)
done := make(chan struct{})
@ -421,6 +592,7 @@ func checkNodeC(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No
}
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, func(context.Context, gc.Node) error) error) {
t.Helper()
var actual []gc.Node
scanFn := func(ctx context.Context, n gc.Node) error {
actual = append(actual, n)
@ -437,6 +609,7 @@ func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.No
}
func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) {
t.Helper()
sort.Sort(nodeList(n1))
sort.Sort(nodeList(n2))