diff --git a/container_opts.go b/container_opts.go index b8d334638..96ba75812 100644 --- a/container_opts.go +++ b/container_opts.go @@ -2,10 +2,12 @@ package containerd import ( "context" + "time" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/snapshot" "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" "github.com/opencontainers/image-spec/identity" @@ -91,7 +93,11 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts { return err } setSnapshotterIfEmpty(c) - if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().String(), + } + parent := identity.ChainID(diffIDs).String() + if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil { return err } c.SnapshotKey = id @@ -120,7 +126,11 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts { return err } setSnapshotterIfEmpty(c) - if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().String(), + } + parent := identity.ChainID(diffIDs).String() + if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil { return err } c.SnapshotKey = id diff --git a/content/helpers.go b/content/helpers.go index 1c1087057..af05d0688 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -69,7 +69,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i // the size or digest is unknown, these values may be empty. // // Copy is buffered, so no need to wrap reader in buffered io. 
-func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error { +func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { ws, err := cw.Status() if err != nil { return err } @@ -96,7 +96,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return err } - if err := cw.Commit(ctx, size, expected); err != nil { + if err := cw.Commit(ctx, size, expected, opts...); err != nil { if !errdefs.IsAlreadyExists(err) { return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) } diff --git a/gc/gc.go b/gc/gc.go index b9c265044..e892be155 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -5,8 +5,25 @@ // under certain use cases. package gc +import ( + "context" + "sync" +) + +// ResourceType represents type of resource at a node +type ResourceType uint8 + +// Node represents a resource which has a type and key, +// this node can be used to lookup other nodes. +type Node struct { + Type ResourceType + Namespace string + Key string +} + // Tricolor implements basic, single-thread tri-color GC. Given the roots, the -// complete set and a refs function, this returns the unreachable objects. +// complete set and a refs function, this function returns a map of all +// reachable objects. // // Correct usage requires that the caller not allow the arguments to change // until the result is used to delete objects in the system. @@ -15,11 +32,11 @@ package gc // // We can probably use this to inform a design for incremental GC by injecting // callbacks to the set modification algorithms. 
-func Tricolor(roots []string, all []string, refs func(ref string) []string) []string { +func Tricolor(roots []Node, refs func(ref Node) ([]Node, error)) (map[Node]struct{}, error) { var ( - grays []string // maintain a gray "stack" - seen = map[string]struct{}{} // or not "white", basically "seen" - reachable = map[string]struct{}{} // or "block", in tri-color parlance + grays []Node // maintain a gray "stack" + seen = map[Node]struct{}{} // or not "white", basically "seen" + reachable = map[Node]struct{}{} // or "block", in tri-color parlance ) grays = append(grays, roots...) @@ -29,9 +46,13 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st id := grays[len(grays)-1] // effectively "depth first" because first element grays = grays[:len(grays)-1] seen[id] = struct{}{} // post-mark this as not-white + rs, err := refs(id) + if err != nil { + return nil, err + } // mark all the referenced objects as gray - for _, target := range refs(id) { + for _, target := range rs { if _, ok := seen[target]; !ok { grays = append(grays, target) } @@ -41,14 +62,99 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st reachable[id] = struct{}{} } + return reachable, nil +} + +// ConcurrentMark implements simple, concurrent GC. All the roots are scanned +// and the complete set of references is formed by calling the refs function +// for each seen object. This function returns a map of all object reachable +// from a root. +// +// Correct usage requires that the caller not allow the arguments to change +// until the result is used to delete objects in the system. +// +// It will allocate memory proportional to the size of the reachable set. 
+func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Context, Node, func(Node)) error) (map[Node]struct{}, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + grays = make(chan Node) + seen = map[Node]struct{}{} // or not "white", basically "seen" + wg sync.WaitGroup + + errOnce sync.Once + refErr error + ) + + go func() { + for gray := range grays { + if _, ok := seen[gray]; ok { + wg.Done() + continue + } + seen[gray] = struct{}{} // post-mark this as non-white + + go func(gray Node) { + defer wg.Done() + + send := func(n Node) { + wg.Add(1) + select { + case grays <- n: + case <-ctx.Done(): + wg.Done() + } + } + + if err := refs(ctx, gray, send); err != nil { + errOnce.Do(func() { + refErr = err + cancel() + }) + } + + }(gray) + } + }() + + for r := range root { + wg.Add(1) + select { + case grays <- r: + case <-ctx.Done(): + wg.Done() + } + + } + + // Wait for outstanding grays to be processed + wg.Wait() + + close(grays) + + if refErr != nil { + return nil, refErr + } + if cErr := ctx.Err(); cErr != nil { + return nil, cErr + } + + return seen, nil +} + +// Sweep removes all nodes returned through the channel which are not in +// the reachable set by calling the provided remove function. +func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error { // All black objects are now reachable, and all white objects are // unreachable. Free those that are white! 
- var whites []string - for _, obj := range all { - if _, ok := reachable[obj]; !ok { - whites = append(whites, obj) + for node := range all { + if _, ok := reachable[node]; !ok { + if err := remove(node); err != nil { + return err + } } } - return whites + return nil } diff --git a/gc/gc_test.go b/gc/gc_test.go index 94ad3dd27..c3c6a9590 100644 --- a/gc/gc_test.go +++ b/gc/gc_test.go @@ -1,30 +1,154 @@ package gc import ( + "context" "reflect" "testing" ) func TestTricolorBasic(t *testing.T) { roots := []string{"A", "C"} - all := []string{"A", "B", "C", "D", "E", "F", "G"} + all := []string{"A", "B", "C", "D", "E", "F", "G", "H"} refs := map[string][]string{ "A": {"B"}, "B": {"A"}, "C": {"D", "F", "B"}, "E": {"F", "G"}, + "F": {"H"}, } - unreachable := Tricolor(roots, all, lookup(refs)) - expected := []string{"E", "G"} + expected := toNodes([]string{"A", "B", "C", "D", "F", "H"}) - if !reflect.DeepEqual(unreachable, expected) { - t.Fatalf("incorrect unreachable set: %v != %v", unreachable, expected) + reachable, err := Tricolor(toNodes(roots), lookup(refs)) + if err != nil { + t.Fatal(err) + } + + var sweeped []Node + for _, a := range toNodes(all) { + if _, ok := reachable[a]; ok { + sweeped = append(sweeped, a) + } + } + + if !reflect.DeepEqual(sweeped, expected) { + t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected) } } -func lookup(refs map[string][]string) func(id string) []string { - return func(ref string) []string { - return refs[ref] +func TestConcurrentBasic(t *testing.T) { + roots := []string{"A", "C"} + all := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} + refs := map[string][]string{ + "A": {"B"}, + "B": {"A"}, + "C": {"D", "F", "B"}, + "E": {"F", "G"}, + "F": {"H"}, + "G": {"I"}, + } + + expected := toNodes([]string{"A", "B", "C", "D", "F", "H"}) + + ctx := context.Background() + rootC := make(chan Node) + go func() { + writeNodes(ctx, rootC, toNodes(roots)) + close(rootC) + }() + + reachable, err := ConcurrentMark(ctx, 
rootC, lookupc(refs)) + if err != nil { + t.Fatal(err) + } + + var sweeped []Node + for _, a := range toNodes(all) { + if _, ok := reachable[a]; ok { + sweeped = append(sweeped, a) + } + } + + if !reflect.DeepEqual(sweeped, expected) { + t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected) } } + +func writeNodes(ctx context.Context, nc chan<- Node, nodes []Node) { + for _, n := range nodes { + select { + case nc <- n: + case <-ctx.Done(): + return + } + } +} + +func lookup(refs map[string][]string) func(id Node) ([]Node, error) { + return func(ref Node) ([]Node, error) { + return toNodes(refs[ref.Key]), nil + } +} + +func lookupc(refs map[string][]string) func(context.Context, Node, func(Node)) error { + return func(ctx context.Context, ref Node, fn func(Node)) error { + for _, n := range toNodes(refs[ref.Key]) { + fn(n) + } + return nil + } +} + +func toNodes(s []string) []Node { + n := make([]Node, len(s)) + for i := range s { + n[i] = Node{ + Key: s[i], + } + } + return n +} + +func newScanner(refs []string) *stringScanner { + return &stringScanner{ + i: -1, + s: refs, + } +} + +type stringScanner struct { + i int + s []string +} + +func (ss *stringScanner) Next() bool { + ss.i++ + return ss.i < len(ss.s) +} + +func (ss *stringScanner) Node() Node { + return Node{ + Key: ss.s[ss.i], + } +} + +func (ss *stringScanner) Cleanup() error { + ss.s[ss.i] = "" + return nil +} + +func (ss *stringScanner) Err() error { + return nil +} + +func (ss *stringScanner) All() []Node { + remaining := make([]Node, 0, len(ss.s)) + for _, s := range ss.s { + if s != "" { + remaining = append(remaining, Node{ + Key: s, + }) + } + } + return remaining +} diff --git a/image.go b/image.go index d333c44fa..b41037a16 100644 --- a/image.go +++ b/image.go @@ -2,11 +2,16 @@ package containerd import ( "context" + "fmt" + "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" 
"github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/snapshot" digest "github.com/opencontainers/go-digest" + "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -64,36 +69,68 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error { return err } - sn := i.client.SnapshotService(snapshotterName) - a := i.client.DiffService() - cs := i.client.ContentStore() + var ( + sn = i.client.SnapshotService(snapshotterName) + a = i.client.DiffService() + cs = i.client.ContentStore() - var chain []digest.Digest + chain []digest.Digest + unpacked bool + ) for _, layer := range layers { - unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a) + labels := map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + "containerd.io/uncompressed": layer.Diff.Digest.String(), + } + lastUnpacked := unpacked + + unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels)) if err != nil { - // TODO: possibly wait and retry if extraction of same chain id was in progress return err } - if unpacked { - info, err := cs.Info(ctx, layer.Blob.Digest) - if err != nil { + + if lastUnpacked { + info := snapshot.Info{ + Name: identity.ChainID(chain).String(), + } + + // Remove previously created gc.root label + if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil { return err } - if info.Labels == nil { - info.Labels = map[string]string{} - } - if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() { - info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String() - if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil { - return err - } - } } chain = append(chain, layer.Diff.Digest) } + if unpacked { + desc, err := i.i.Config(ctx, cs, platforms.Default()) + if err != nil { + return err + } + + rootfs := 
identity.ChainID(chain).String() + + cinfo := content.Info{ + Digest: desc.Digest, + Labels: map[string]string{ + fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotterName): rootfs, + }, + } + if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil { + return err + } + + sinfo := snapshot.Info{ + Name: rootfs, + } + + // Config now referenced snapshot, release root reference + if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil { + return err + } + } + return nil } diff --git a/metadata/content.go b/metadata/content.go index 023d5d215..05064fdec 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "strings" + "sync" "time" "github.com/boltdb/bolt" @@ -11,6 +12,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/labels" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" @@ -19,12 +21,13 @@ import ( type contentStore struct { content.Store - db transactor + db *DB + l sync.RWMutex } // newContentStore returns a namespaced content store using an existing // content store interface. 
-func newContentStore(db transactor, cs content.Store) content.Store { +func newContentStore(db *DB, cs content.Store) *contentStore { return &contentStore{ Store: cs, db: db, @@ -59,6 +62,9 @@ func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpath return content.Info{}, err } + cs.l.RLock() + defer cs.l.RUnlock() + updated := content.Info{ Digest: info.Digest, } @@ -166,15 +172,25 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { return err } + cs.l.RLock() + defer cs.l.RUnlock() + return update(ctx, cs.db, func(tx *bolt.Tx) error { bkt := getBlobBucket(tx, ns, dgst) if bkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) } - // Just remove local reference, garbage collector is responsible for - // cleaning up on disk content - return getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())) + if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil { + return err + } + + // Mark content store as dirty for triggering garbage collection + cs.db.dirtyL.Lock() + cs.db.dirtyCS = true + cs.db.dirtyL.Unlock() + + return nil }) } @@ -269,6 +285,9 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { return err } + cs.l.RLock() + defer cs.l.RUnlock() + return update(ctx, cs.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, ns) if bkt == nil { @@ -293,6 +312,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, err } + cs.l.RLock() + defer cs.l.RUnlock() + var w content.Writer if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if expected != "" { @@ -346,6 +368,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe ref: ref, namespace: ns, db: cs.db, + l: &cs.l, }, nil } @@ -354,9 +377,13 @@ type namespacedWriter struct { ref string namespace string db transactor + l *sync.RWMutex } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected 
digest.Digest, opts ...content.Opt) error { + nw.l.RLock() + defer nw.l.RUnlock() + return update(ctx, nw.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, nw.namespace) if bkt != nil { @@ -495,3 +522,61 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { return bkt.Put(bucketKeySize, sizeEncoded) } + +func (cs *contentStore) garbageCollect(ctx context.Context) error { + lt1 := time.Now() + cs.l.Lock() + defer func() { + cs.l.Unlock() + log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected") + }() + + seen := map[string]struct{}{} + if err := cs.db.View(func(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + + cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) + if cbkt == nil { + continue + } + bbkt := cbkt.Bucket(bucketKeyObjectBlob) + if err := bbkt.ForEach(func(ck, cv []byte) error { + if cv == nil { + seen[string(ck)] = struct{}{} + } + return nil + }); err != nil { + return err + } + } + + return nil + }); err != nil { + return err + } + + if err := cs.Store.Walk(ctx, func(info content.Info) error { + if _, ok := seen[info.Digest.String()]; !ok { + if err := cs.Store.Delete(ctx, info.Digest); err != nil { + return err + } + log.G(ctx).WithField("digest", info.Digest).Debug("removed content") + } + return nil + }); err != nil { + return err + } + + return nil +} diff --git a/metadata/db.go b/metadata/db.go index aed6f1e9a..7ae293159 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -3,10 +3,13 @@ package metadata import ( "context" "encoding/binary" + "strings" + "sync" "time" "github.com/boltdb/bolt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" "github.com/containerd/containerd/snapshot" "github.com/pkg/errors" @@ -36,15 +39,32 @@ type 
DB struct { db *bolt.DB ss map[string]snapshot.Snapshotter cs content.Store + + // wlock is used to protect access to the data structures during garbage + // collection. While the wlock is held no writable transactions can be + // opened, preventing changes from occurring between the mark and + // sweep phases without preventing read transactions. + wlock sync.RWMutex + + // dirty flags and lock keeps track of datastores which have had deletions + // since the last garbage collection. These datastores will be garbage + // collected during the next garbage collection. + dirtyL sync.Mutex + dirtySS map[string]struct{} + dirtyCS bool + + // TODO: Keep track of stats such as pause time, number of collected objects, errors + lastCollection time.Time } // NewDB creates a new metadata database using the provided // bolt database, content store, and snapshotters. func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB { return &DB{ - db: db, - ss: ss, - cs: cs, + db: db, + ss: ss, + cs: cs, + dirtySS: map[string]struct{}{}, } } @@ -158,5 +178,134 @@ func (m *DB) View(fn func(*bolt.Tx) error) error { // Update runs a writable transation on the metadata store. 
func (m *DB) Update(fn func(*bolt.Tx) error) error { + m.wlock.RLock() + defer m.wlock.RUnlock() return m.db.Update(fn) } + +func (m *DB) GarbageCollect(ctx context.Context) error { + lt1 := time.Now() + m.wlock.Lock() + defer func() { + m.wlock.Unlock() + log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") + }() + + var marked map[gc.Node]struct{} + + if err := m.db.View(func(tx *bolt.Tx) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + roots := make(chan gc.Node) + errChan := make(chan error) + go func() { + defer close(errChan) + defer close(roots) + + // Call roots + if err := scanRoots(ctx, tx, roots); err != nil { + cancel() + errChan <- err + } + }() + + refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error { + return references(ctx, tx, n, fn) + } + + reachable, err := gc.ConcurrentMark(ctx, roots, refs) + if rerr := <-errChan; rerr != nil { + return rerr + } + if err != nil { + return err + } + marked = reachable + return nil + }); err != nil { + return err + } + + m.dirtyL.Lock() + defer m.dirtyL.Unlock() + + if err := m.db.Update(func(tx *bolt.Tx) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + nodeC := make(chan gc.Node) + var scanErr error + + go func() { + defer close(nodeC) + scanErr = scanAll(ctx, tx, nodeC) + }() + + rm := func(n gc.Node) error { + if n.Type == ResourceSnapshot { + if idx := strings.IndexRune(n.Key, '/'); idx > 0 { + m.dirtySS[n.Key[:idx]] = struct{}{} + } + } else if n.Type == ResourceContent { + m.dirtyCS = true + } + return remove(ctx, tx, n) + } + + if err := gc.Sweep(marked, nodeC, rm); err != nil { + return errors.Wrap(err, "failed to sweep") + } + + if scanErr != nil { + return errors.Wrap(scanErr, "failed to scan all") + } + + return nil + }); err != nil { + return err + } + + m.lastCollection = time.Now() + + if len(m.dirtySS) > 0 { + for snapshotterName := range m.dirtySS { + log.G(ctx).WithField("snapshotter", 
snapshotterName).Debug("scheduling snapshotter cleanup") + go m.cleanupSnapshotter(snapshotterName) + } + m.dirtySS = map[string]struct{}{} + } + + if m.dirtyCS { + log.G(ctx).Debug("scheduling content cleanup") + go m.cleanupContent() + m.dirtyCS = false + } + + return nil +} + +func (m *DB) cleanupSnapshotter(name string) { + ctx := context.Background() + sn, ok := m.ss[name] + if !ok { + return + } + + err := newSnapshotter(m, name, sn).garbageCollect(ctx) + if err != nil { + log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed") + } +} + +func (m *DB) cleanupContent() { + ctx := context.Background() + if m.cs == nil { + return + } + + err := newContentStore(m, m.cs).garbageCollect(ctx) + if err != nil { + log.G(ctx).WithError(err).Warn("content garbage collection failed") + } +} diff --git a/metadata/db_test.go b/metadata/db_test.go index e14242765..ff3e9cd15 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -1,11 +1,31 @@ package metadata import ( + "context" "encoding/binary" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "runtime/pprof" "testing" + "time" "github.com/boltdb/bolt" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshot" + "github.com/containerd/containerd/snapshot/naive" + "github.com/gogo/protobuf/types" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -176,3 +196,366 @@ func readDBVersion(db *bolt.DB, schema []byte) (int, error) { } return version, nil } + +func TestMetadataCollector(t *testing.T) { + mdb, cs, sn, cleanup := newStores(t) + defer cleanup() + + var ( + ctx = 
context.Background() + + objects = []object{ + blob(bytesFor(1), true), + blob(bytesFor(2), false), + blob(bytesFor(3), true), + blob(bytesFor(4), false, "containerd.io/gc.root", time.Now().String()), + newSnapshot("1", "", false, false), + newSnapshot("2", "1", false, false), + newSnapshot("3", "2", false, false), + newSnapshot("4", "3", false, false), + newSnapshot("5", "3", false, true), + container("1", "4"), + image("image-1", digestFor(2)), + } + remaining []gc.Node + ) + + if err := mdb.Update(func(tx *bolt.Tx) error { + for _, obj := range objects { + node, err := create(obj, tx, cs, sn) + if err != nil { + return err + } + if node != nil { + remaining = append(remaining, *node) + } + } + return nil + }); err != nil { + t.Fatalf("Creation failed: %+v", err) + } + + if err := mdb.GarbageCollect(ctx); err != nil { + t.Fatal(err) + } + + var actual []gc.Node + + if err := mdb.View(func(tx *bolt.Tx) error { + nodeC := make(chan gc.Node) + var scanErr error + go func() { + defer close(nodeC) + scanErr = scanAll(ctx, tx, nodeC) + }() + for node := range nodeC { + actual = append(actual, node) + } + return scanErr + }); err != nil { + t.Fatal(err) + } + + checkNodesEqual(t, actual, remaining) +} + +func BenchmarkGarbageCollect(b *testing.B) { + b.Run("10-Sets", benchmarkTrigger(10)) + b.Run("100-Sets", benchmarkTrigger(100)) + b.Run("1000-Sets", benchmarkTrigger(1000)) + b.Run("10000-Sets", benchmarkTrigger(10000)) +} + +func benchmarkTrigger(n int) func(b *testing.B) { + return func(b *testing.B) { + mdb, cs, sn, cleanup := newStores(b) + defer cleanup() + + objects := []object{} + + // TODO: Allow max to be configurable + for i := 0; i < n; i++ { + objects = append(objects, + blob(bytesFor(int64(i)), false), + image(fmt.Sprintf("image-%d", i), digestFor(int64(i))), + ) + lastSnapshot := 6 + for j := 0; j <= lastSnapshot; j++ { + var parent string + key := fmt.Sprintf("snapshot-%d-%d", i, j) + if j > 0 { + parent = fmt.Sprintf("snapshot-%d-%d", i, j-1) + } + 
objects = append(objects, newSnapshot(key, parent, false, false)) + } + objects = append(objects, container(fmt.Sprintf("container-%d", i), fmt.Sprintf("snapshot-%d-%d", i, lastSnapshot))) + + } + + // TODO: Create set of objects for removal + + var ( + ctx = context.Background() + + remaining []gc.Node + ) + + if err := mdb.Update(func(tx *bolt.Tx) error { + for _, obj := range objects { + node, err := create(obj, tx, cs, sn) + if err != nil { + return err + } + if node != nil { + remaining = append(remaining, *node) + } + } + return nil + }); err != nil { + b.Fatalf("Creation failed: %+v", err) + } + + // TODO: reset benchmark + b.ResetTimer() + //b.StopTimer() + + labels := pprof.Labels("worker", "trigger") + pprof.Do(ctx, labels, func(ctx context.Context) { + for i := 0; i < b.N; i++ { + + // TODO: Add removal objects + + //b.StartTimer() + + if err := mdb.GarbageCollect(ctx); err != nil { + b.Fatal(err) + } + + //b.StopTimer() + + //var actual []gc.Node + + //if err := db.View(func(tx *bolt.Tx) error { + // nodeC := make(chan gc.Node) + // var scanErr error + // go func() { + // defer close(nodeC) + // scanErr = scanAll(ctx, tx, nodeC) + // }() + // for node := range nodeC { + // actual = append(actual, node) + // } + // return scanErr + //}); err != nil { + // t.Fatal(err) + //} + + //checkNodesEqual(t, actual, remaining) + } + }) + } +} + +func bytesFor(i int64) []byte { + r := rand.New(rand.NewSource(i)) + var b [256]byte + _, err := r.Read(b[:]) + if err != nil { + panic(err) + } + return b[:] +} + +func digestFor(i int64) digest.Digest { + r := rand.New(rand.NewSource(i)) + dgstr := digest.SHA256.Digester() + _, err := io.Copy(dgstr.Hash(), io.LimitReader(r, 256)) + if err != nil { + panic(err) + } + return dgstr.Digest() +} + +type object struct { + data interface{} + removed bool + labels map[string]string +} + +func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshot.Snapshotter) (*gc.Node, error) { + var ( + node *gc.Node + namespace = 
"test" + ctx = namespaces.WithNamespace(context.Background(), namespace) + ) + + switch v := obj.data.(type) { + case testContent: + ctx := WithTransactionContext(ctx, tx) + expected := digest.FromBytes(v.data) + w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) + if err != nil { + return nil, errors.Wrap(err, "failed to create writer") + } + if _, err := w.Write(v.data); err != nil { + return nil, errors.Wrap(err, "write blob failed") + } + if err := w.Commit(ctx, int64(len(v.data)), expected, content.WithLabels(obj.labels)); err != nil { + return nil, errors.Wrap(err, "failed to commit blob") + } + if !obj.removed { + node = &gc.Node{ + Type: ResourceContent, + Namespace: namespace, + Key: expected.String(), + } + } + case testSnapshot: + ctx := WithTransactionContext(ctx, tx) + if v.active { + _, err := sn.Prepare(ctx, v.key, v.parent, snapshot.WithLabels(obj.labels)) + if err != nil { + return nil, err + } + } else { + akey := fmt.Sprintf("%s-active", v.key) + _, err := sn.Prepare(ctx, akey, v.parent) + if err != nil { + return nil, err + } + if err := sn.Commit(ctx, v.key, akey, snapshot.WithLabels(obj.labels)); err != nil { + return nil, err + } + } + if !obj.removed { + node = &gc.Node{ + Type: ResourceSnapshot, + Namespace: namespace, + Key: fmt.Sprintf("naive/%s", v.key), + } + } + case testImage: + image := images.Image{ + Name: v.name, + Target: v.target, + Labels: obj.labels, + } + _, err := NewImageStore(tx).Create(ctx, image) + if err != nil { + return nil, errors.Wrap(err, "failed to create image") + } + case testContainer: + container := containers.Container{ + ID: v.id, + SnapshotKey: v.snapshot, + Snapshotter: "naive", + Labels: obj.labels, + + Runtime: containers.RuntimeInfo{ + Name: "testruntime", + }, + Spec: &types.Any{}, + } + _, err := NewContainerStore(tx).Create(ctx, container) + if err != nil { + return nil, err + } + } + + return node, nil +} + +func blob(b []byte, r bool, l ...string) object { + return object{ + data: 
testContent{ + data: b, + }, + removed: r, + labels: labelmap(l...), + } +} + +func image(n string, d digest.Digest, l ...string) object { + return object{ + data: testImage{ + name: n, + target: ocispec.Descriptor{ + MediaType: "irrelevant", + Digest: d, + Size: 256, + }, + }, + removed: false, + labels: labelmap(l...), + } +} + +func newSnapshot(key, parent string, active, r bool, l ...string) object { + return object{ + data: testSnapshot{ + key: key, + parent: parent, + active: active, + }, + removed: r, + labels: labelmap(l...), + } +} + +func container(id, s string, l ...string) object { + return object{ + data: testContainer{ + id: id, + snapshot: s, + }, + removed: false, + labels: labelmap(l...), + } +} + +type testContent struct { + data []byte +} + +type testSnapshot struct { + key string + parent string + active bool +} + +type testImage struct { + name string + target ocispec.Descriptor +} + +type testContainer struct { + id string + snapshot string +} + +func newStores(t testing.TB) (*DB, content.Store, snapshot.Snapshotter, func()) { + td, err := ioutil.TempDir("", "gc-test-") + if err != nil { + t.Fatal(err) + } + db, err := bolt.Open(filepath.Join(td, "meta.db"), 0644, nil) + if err != nil { + t.Fatal(err) + } + + nsn, err := naive.NewSnapshotter(filepath.Join(td, "snapshots")) + if err != nil { + t.Fatal(err) + } + + lcs, err := local.NewStore(filepath.Join(td, "content")) + if err != nil { + t.Fatal(err) + } + + mdb := NewDB(db, lcs, map[string]snapshot.Snapshotter{"naive": nsn}) + + return mdb, mdb.ContentStore(), mdb.Snapshotter("naive"), func() { + os.RemoveAll(td) + } +} diff --git a/metadata/gc.go b/metadata/gc.go new file mode 100644 index 000000000..8434d694b --- /dev/null +++ b/metadata/gc.go @@ -0,0 +1,343 @@ +package metadata + +import ( + "context" + "fmt" + "strings" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" +) + +const ( + ResourceUnknown 
gc.ResourceType = iota + ResourceContent + ResourceSnapshot + ResourceContainer + ResourceTask +) + +var ( + labelGCRoot = []byte("containerd.io/gc.root") + labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.") + labelGCContentRef = []byte("containerd.io/gc.ref.content") +) + +func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + nbkt := v1bkt.Bucket(k) + ns := string(k) + + ibkt := nbkt.Bucket(bucketKeyObjectImages) + if ibkt != nil { + if err := ibkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + + target := ibkt.Bucket(k).Bucket(bucketKeyTarget) + if target != nil { + contentKey := string(target.Get(bucketKeyDigest)) + select { + case nc <- gcnode(ResourceContent, ns, contentKey): + case <-ctx.Done(): + return ctx.Err() + } + } + return sendSnapshotRefs(ns, ibkt.Bucket(k), func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }); err != nil { + return err + } + } + + cbkt := nbkt.Bucket(bucketKeyObjectContent) + if cbkt != nil { + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + } + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k)) + }); err != nil { + return err + } + } + + cbkt = nbkt.Bucket(bucketKeyObjectContainers) + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + snapshotter := string(cbkt.Bucket(k).Get(bucketKeySnapshotter)) + if snapshotter != "" { + ss := string(cbkt.Bucket(k).Get(bucketKeySnapshotKey)) + select { + case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)): + case <-ctx.Done(): + return ctx.Err() + } + } + + // TODO: Send additional 
snapshot refs through labels + return sendSnapshotRefs(ns, cbkt.Bucket(k), func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }); err != nil { + return err + } + } + + sbkt := nbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + snbkt := sbkt.Bucket(sk) + + return snbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + + return sendRootRef(ctx, nc, gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)), snbkt.Bucket(k)) + }) + }); err != nil { + return err + } + } + } + return nil +} + +func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error { + if node.Type == ResourceContent { + bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key)) + if bkt == nil { + // Node may be created from dead edge + return nil + } + + if err := sendSnapshotRefs(node.Namespace, bkt, fn); err != nil { + return err + } + return sendContentRefs(node.Namespace, bkt, fn) + } else if node.Type == ResourceSnapshot { + parts := strings.SplitN(node.Key, "/", 2) + if len(parts) != 2 { + return errors.Errorf("invalid snapshot gc key %s", node.Key) + } + ss := parts[0] + name := parts[1] + + bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots, []byte(ss), []byte(name)) + if bkt == nil { + getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots).ForEach(func(k, v []byte) error { + return nil + }) + + // Node may be created from dead edge + return nil + } + + if pv := bkt.Get(bucketKeyParent); len(pv) > 0 { + fn(gcnode(ResourceSnapshot, node.Namespace, fmt.Sprintf("%s/%s", ss, pv))) + } + + return sendSnapshotRefs(node.Namespace, bkt, fn) + } + + return nil +} + +func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return 
nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + nbkt := v1bkt.Bucket(k) + ns := string(k) + + sbkt := nbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + snbkt := sbkt.Bucket(sk) + return snbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + select { + case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)): + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + }); err != nil { + return err + } + } + + cbkt := nbkt.Bucket(bucketKeyObjectContent) + if cbkt != nil { + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + } + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + select { + case nc <- gcnode(ResourceContent, ns, string(k)): + case <-ctx.Done(): + return ctx.Err() + } + return nil + }); err != nil { + return err + } + } + } + + return nil +} + +func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + nsbkt := v1bkt.Bucket([]byte(node.Namespace)) + if nsbkt == nil { + return nil + } + + switch node.Type { + case ResourceContent: + cbkt := nsbkt.Bucket(bucketKeyObjectContent) + if cbkt != nil { + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + } + if cbkt != nil { + log.G(ctx).WithField("key", node.Key).Debug("delete content") + return cbkt.DeleteBucket([]byte(node.Key)) + } + case ResourceSnapshot: + sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + parts := strings.SplitN(node.Key, "/", 2) + if len(parts) != 2 { + return errors.Errorf("invalid snapshot gc key %s", node.Key) + } + ssbkt := sbkt.Bucket([]byte(parts[0])) + if ssbkt != nil { + log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("delete snapshot") + return 
ssbkt.DeleteBucket([]byte(parts[1])) + } + } + } + + return nil +} + +// sendSnapshotRefs sends all snapshot references referred to by the labels in the bkt +func sendSnapshotRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error { + lbkt := bkt.Bucket(bucketKeyObjectLabels) + if lbkt != nil { + lc := lbkt.Cursor() + + for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() { + snapshotter := string(k[len(labelGCSnapRef):]) + fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v))) + } + } + return nil +} + +// sendContentRefs sends all content references referred to by the labels in the bkt +func sendContentRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error { + lbkt := bkt.Bucket(bucketKeyObjectLabels) + if lbkt != nil { + lc := lbkt.Cursor() + + labelRef := string(labelGCContentRef) + for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() { + if ks := string(k); ks != labelRef { + // Allow reference naming, ignore names + if ks[len(labelRef)] != '.' 
{ + continue + } + } + + fn(gcnode(ResourceContent, ns, string(v))) + } + } + return nil +} + +func isRootRef(bkt *bolt.Bucket) bool { + lbkt := bkt.Bucket(bucketKeyObjectLabels) + if lbkt != nil { + rv := lbkt.Get(labelGCRoot) + if rv != nil { + // TODO: interpret rv as a timestamp and skip if expired + return true + } + } + return false +} + +func sendRootRef(ctx context.Context, nc chan<- gc.Node, n gc.Node, bkt *bolt.Bucket) error { + if isRootRef(bkt) { + select { + case nc <- n: + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} + +func gcnode(t gc.ResourceType, ns, key string) gc.Node { + return gc.Node{ + Type: t, + Namespace: ns, + Key: key, + } +} diff --git a/metadata/gc_test.go b/metadata/gc_test.go new file mode 100644 index 000000000..78422cd81 --- /dev/null +++ b/metadata/gc_test.go @@ -0,0 +1,406 @@ +package metadata + +import ( + "context" + "io" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "sort" + "testing" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/metadata/boltutil" + digest "github.com/opencontainers/go-digest" +) + +func TestGCRoots(t *testing.T) { + db, cleanup, err := newDatabase() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + alters := []alterFunc{ + addImage("ns1", "image1", dgst(1), nil), + addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")), + addContent("ns1", dgst(1), nil), + addContent("ns1", dgst(2), nil), + addContent("ns1", dgst(3), nil), + addContent("ns2", dgst(1), nil), + addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")), + addSnapshot("ns1", "overlay", "sn1", "", nil), + addSnapshot("ns1", "overlay", "sn2", "", nil), + addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")), + } + + expected := []gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()), + gcnode(ResourceContent, "ns1", dgst(2).String()), + gcnode(ResourceContent, "ns2", 
dgst(2).String()), + gcnode(ResourceSnapshot, "ns1", "overlay/sn2"), + gcnode(ResourceSnapshot, "ns1", "overlay/sn3"), + } + + if err := db.Update(func(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + for _, alter := range alters { + if err := alter(v1bkt); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + + ctx := context.Background() + + checkNodes(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return scanRoots(ctx, tx, nc) + }) +} + +func TestGCRemove(t *testing.T) { + db, cleanup, err := newDatabase() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + alters := []alterFunc{ + addImage("ns1", "image1", dgst(1), nil), + addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")), + addContent("ns1", dgst(1), nil), + addContent("ns1", dgst(2), nil), + addContent("ns1", dgst(3), nil), + addContent("ns2", dgst(1), nil), + addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")), + addSnapshot("ns1", "overlay", "sn1", "", nil), + addSnapshot("ns1", "overlay", "sn2", "", nil), + addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")), + addSnapshot("ns2", "overlay", "sn1", "", nil), + } + + all := []gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()), + gcnode(ResourceContent, "ns1", dgst(2).String()), + gcnode(ResourceContent, "ns1", dgst(3).String()), + gcnode(ResourceContent, "ns2", dgst(1).String()), + gcnode(ResourceContent, "ns2", dgst(2).String()), + gcnode(ResourceSnapshot, "ns1", "overlay/sn1"), + gcnode(ResourceSnapshot, "ns1", "overlay/sn2"), + gcnode(ResourceSnapshot, "ns1", "overlay/sn3"), + gcnode(ResourceSnapshot, "ns2", "overlay/sn1"), + } + + var deleted, remaining []gc.Node + for i, n := range all { + if i%2 == 0 { + deleted = append(deleted, n) + } else { + remaining = append(remaining, n) + } + } + + 
if err := db.Update(func(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + for _, alter := range alters { + if err := alter(v1bkt); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + + ctx := context.Background() + + checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return scanAll(ctx, tx, nc) + }) + if t.Failed() { + t.Fatal("Scan all failed") + } + + if err := db.Update(func(tx *bolt.Tx) error { + for _, n := range deleted { + if err := remove(ctx, tx, n); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + + checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return scanAll(ctx, tx, nc) + }) +} + +func TestGCRefs(t *testing.T) { + db, cleanup, err := newDatabase() + if err != nil { + t.Fatal(err) + } + defer cleanup() + + alters := []alterFunc{ + addContent("ns1", dgst(1), nil), + addContent("ns1", dgst(2), nil), + addContent("ns1", dgst(3), nil), + addContent("ns1", dgst(4), labelmap(string(labelGCContentRef), dgst(1).String())), + addContent("ns1", dgst(5), labelmap(string(labelGCContentRef)+".anything-1", dgst(2).String(), string(labelGCContentRef)+".anything-2", dgst(3).String())), + addContent("ns1", dgst(6), labelmap(string(labelGCContentRef)+"bad", dgst(1).String())), + addContent("ns2", dgst(1), nil), + addContent("ns2", dgst(2), nil), + addSnapshot("ns1", "overlay", "sn1", "", nil), + addSnapshot("ns1", "overlay", "sn2", "sn1", nil), + addSnapshot("ns1", "overlay", "sn3", "sn2", nil), + addSnapshot("ns1", "overlay", "sn4", "", labelmap(string(labelGCSnapRef)+"btrfs", "sn1", string(labelGCSnapRef)+"overlay", "sn1")), + addSnapshot("ns1", "btrfs", "sn1", "", nil), + addSnapshot("ns2", "overlay", "sn1", "", nil), + addSnapshot("ns2", "overlay", "sn2", "sn1", nil), + } + + refs := 
map[gc.Node][]gc.Node{ + gcnode(ResourceContent, "ns1", dgst(1).String()): nil, + gcnode(ResourceContent, "ns1", dgst(2).String()): nil, + gcnode(ResourceContent, "ns1", dgst(3).String()): nil, + gcnode(ResourceContent, "ns1", dgst(4).String()): { + gcnode(ResourceContent, "ns1", dgst(1).String()), + }, + gcnode(ResourceContent, "ns1", dgst(5).String()): { + gcnode(ResourceContent, "ns1", dgst(2).String()), + gcnode(ResourceContent, "ns1", dgst(3).String()), + }, + gcnode(ResourceContent, "ns1", dgst(6).String()): nil, + gcnode(ResourceContent, "ns2", dgst(1).String()): nil, + gcnode(ResourceContent, "ns2", dgst(2).String()): nil, + gcnode(ResourceSnapshot, "ns1", "overlay/sn1"): nil, + gcnode(ResourceSnapshot, "ns1", "overlay/sn2"): { + gcnode(ResourceSnapshot, "ns1", "overlay/sn1"), + }, + gcnode(ResourceSnapshot, "ns1", "overlay/sn3"): { + gcnode(ResourceSnapshot, "ns1", "overlay/sn2"), + }, + gcnode(ResourceSnapshot, "ns1", "overlay/sn4"): { + gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"), + gcnode(ResourceSnapshot, "ns1", "overlay/sn1"), + }, + gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"): nil, + gcnode(ResourceSnapshot, "ns2", "overlay/sn1"): nil, + gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): { + gcnode(ResourceSnapshot, "ns2", "overlay/sn1"), + }, + } + + if err := db.Update(func(tx *bolt.Tx) error { + v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + for _, alter := range alters { + if err := alter(v1bkt); err != nil { + return err + } + } + return nil + }); err != nil { + t.Fatalf("Update failed: %+v", err) + } + + ctx := context.Background() + + for n, nodes := range refs { + checkNodes(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + return references(ctx, tx, n, func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }) + if t.Failed() { + t.Fatalf("Failure scanning %v", n) + } + } +} + +func newDatabase() (*bolt.DB, func(), error) { + td, err := 
ioutil.TempDir("", "gc-roots-") + if err != nil { + return nil, nil, err + } + + db, err := bolt.Open(filepath.Join(td, "test.db"), 0777, nil) + if err != nil { + os.RemoveAll(td) + return nil, nil, err + } + + return db, func() { + db.Close() + os.RemoveAll(td) + }, nil +} + +func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) { + var actual []gc.Node + nc := make(chan gc.Node) + done := make(chan struct{}) + go func() { + defer close(done) + for n := range nc { + actual = append(actual, n) + } + }() + if err := db.View(func(tx *bolt.Tx) error { + defer close(nc) + return fn(ctx, tx, nc) + }); err != nil { + t.Fatal(err) + } + + <-done + checkNodesEqual(t, actual, expected) +} + +func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) { + sort.Sort(nodeList(n1)) + sort.Sort(nodeList(n2)) + + if len(n1) != len(n2) { + t.Errorf("Nodes do not match\n\tExpected:\n\t%v\n\tActual:\n\t%v", n2, n1) + return + } + + for i := range n1 { + if n1[i] != n2[i] { + t.Errorf("[%d] root does not match expected: expected %v, got %v", i, n2[i], n1[i]) + } + } +} + +type nodeList []gc.Node + +func (nodes nodeList) Len() int { + return len(nodes) +} + +func (nodes nodeList) Less(i, j int) bool { + if nodes[i].Type != nodes[j].Type { + return nodes[i].Type < nodes[j].Type + } + if nodes[i].Namespace != nodes[j].Namespace { + return nodes[i].Namespace < nodes[j].Namespace + } + return nodes[i].Key < nodes[j].Key +} + +func (nodes nodeList) Swap(i, j int) { + nodes[i], nodes[j] = nodes[j], nodes[i] +} + +type alterFunc func(bkt *bolt.Bucket) error + +func addImage(ns, name string, dgst digest.Digest, labels map[string]string) alterFunc { + return func(bkt *bolt.Bucket) error { + ibkt, err := createBuckets(bkt, ns, string(bucketKeyObjectImages), name) + if err != nil { + return err + } + + tbkt, err := ibkt.CreateBucket(bucketKeyTarget) + if err != nil { + return err + } + if err := 
tbkt.Put(bucketKeyDigest, []byte(dgst.String())); err != nil { + return err + } + + return boltutil.WriteLabels(ibkt, labels) + } +} + +func addSnapshot(ns, snapshotter, name, parent string, labels map[string]string) alterFunc { + return func(bkt *bolt.Bucket) error { + sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectSnapshots), snapshotter, name) + if err != nil { + return err + } + if parent != "" { + if err := sbkt.Put(bucketKeyParent, []byte(parent)); err != nil { + return err + } + } + return boltutil.WriteLabels(sbkt, labels) + } +} + +func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFunc { + return func(bkt *bolt.Bucket) error { + cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectBlob), dgst.String()) + if err != nil { + return err + } + return boltutil.WriteLabels(cbkt, labels) + } +} + +func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc { + return func(bkt *bolt.Bucket) error { + cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name) + if err != nil { + return err + } + if err := cbkt.Put(bucketKeySnapshotter, []byte(snapshotter)); err != nil { + return err + } + if err := cbkt.Put(bucketKeySnapshotKey, []byte(snapshot)); err != nil { + return err + } + return boltutil.WriteLabels(cbkt, labels) + } +} + +func createBuckets(bkt *bolt.Bucket, names ...string) (*bolt.Bucket, error) { + for _, name := range names { + nbkt, err := bkt.CreateBucketIfNotExists([]byte(name)) + if err != nil { + return nil, err + } + bkt = nbkt + } + return bkt, nil +} + +func labelmap(kv ...string) map[string]string { + if len(kv)%2 != 0 { + panic("bad labels argument") + } + l := map[string]string{} + for i := 0; i < len(kv); i = i + 2 { + l[kv[i]] = kv[i+1] + } + return l +} + +func dgst(i int64) digest.Digest { + r := rand.New(rand.NewSource(i)) + dgstr := digest.SHA256.Digester() + if _, err := io.CopyN(dgstr.Hash(), r, 256); err != 
nil { + panic(err) + } + return dgstr.Digest() +} diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 4103827a4..ad38e5915 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -4,11 +4,13 @@ import ( "context" "fmt" "strings" + "sync" "time" "github.com/boltdb/bolt" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/labels" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/namespaces" @@ -19,12 +21,13 @@ import ( type snapshotter struct { snapshot.Snapshotter name string - db transactor + db *DB + l sync.RWMutex } // newSnapshotter returns a new Snapshotter which namespaces the given snapshot // using the provided name and database. -func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter { +func newSnapshotter(db *DB, name string, sn snapshot.Snapshotter) *snapshotter { return &snapshotter{ Snapshotter: sn, name: name, @@ -125,6 +128,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro } func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) { + s.l.RLock() + defer s.l.RUnlock() + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return snapshot.Info{}, err @@ -249,6 +255,9 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap } func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshot.Opt) ([]mount.Mount, error) { + s.l.RLock() + defer s.l.RUnlock() + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return nil, err @@ -332,6 +341,9 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re } func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error { + s.l.RLock() + defer s.l.RUnlock() + ns, err := 
namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -421,6 +433,9 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap } func (s *snapshotter) Remove(ctx context.Context, key string) error { + s.l.RLock() + defer s.l.RUnlock() + ns, err := namespaces.NamespaceRequired(ctx) if err != nil { return err @@ -457,7 +472,16 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { } } - return bkt.DeleteBucket([]byte(key)) + if err := bkt.DeleteBucket([]byte(key)); err != nil { + return err + } + + // Mark snapshotter as dirty for triggering garbage collection + s.db.dirtyL.Lock() + s.db.dirtySS[s.name] = struct{}{} + s.db.dirtyL.Unlock() + + return nil }) } @@ -565,3 +589,134 @@ func validateSnapshot(info *snapshot.Info) error { return nil } + +func (s *snapshotter) garbageCollect(ctx context.Context) error { + logger := log.G(ctx).WithField("snapshotter", s.name) + lt1 := time.Now() + s.l.Lock() + defer func() { + s.l.Unlock() + logger.WithField("t", time.Now().Sub(lt1)).Debugf("garbage collected") + }() + + seen := map[string]struct{}{} + if err := s.db.View(func(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + + sbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectSnapshots) + if sbkt == nil { + continue + } + + // Load specific snapshotter + ssbkt := sbkt.Bucket([]byte(s.name)) + if ssbkt == nil { + continue + } + + if err := ssbkt.ForEach(func(sk, sv []byte) error { + if sv == nil { + bkey := ssbkt.Bucket(sk).Get(bucketKeyName) + if len(bkey) > 0 { + seen[string(bkey)] = struct{}{} + } + } + return nil + }); err != nil { + return err + } + } + + return nil + }); err != nil { + return err + } + + roots, err := s.walkTree(ctx, seen) + if err != nil { + return err + } + + // TODO: Unlock before prune (once nodes are 
fully unavailable) + + for _, node := range roots { + if err := s.pruneBranch(ctx, node); err != nil { + return err + } + } + + return nil +} + +type treeNode struct { + info snapshot.Info + remove bool + children []*treeNode +} + +func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([]*treeNode, error) { + roots := []*treeNode{} + nodes := map[string]*treeNode{} + + if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error { + _, isSeen := seen[info.Name] + node, ok := nodes[info.Name] + if !ok { + node = &treeNode{} + nodes[info.Name] = node + } + + node.remove = !isSeen + node.info = info + + if info.Parent == "" { + roots = append(roots, node) + } else { + parent, ok := nodes[info.Parent] + if !ok { + parent = &treeNode{} + nodes[info.Parent] = parent + } + parent.children = append(parent.children, node) + } + + return nil + }); err != nil { + return nil, err + } + + return roots, nil +} + +func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error { + for _, child := range node.children { + if err := s.pruneBranch(ctx, child); err != nil { + return err + } + } + + if node.remove { + logger := log.G(ctx).WithField("snapshotter", s.name) + if err := s.Snapshotter.Remove(ctx, node.info.Name); err != nil { + if !errdefs.IsFailedPrecondition(err) { + return err + } + logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed") + } else { + logger.WithField("key", node.info.Name).Debug("removed snapshot") + } + } + + return nil +} diff --git a/remotes/handlers.go b/remotes/handlers.go index 0a4db6bce..f92bb469d 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -2,6 +2,7 @@ package remotes import ( "context" + "encoding/json" "fmt" "io" "time" @@ -11,6 +12,7 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" 
"github.com/sirupsen/logrus" ) @@ -102,7 +104,76 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc } defer rc.Close() - return content.Copy(ctx, cw, rc, desc.Size, desc.Digest) + r, opts := commitOpts(desc, rc) + return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) +} + +// commitOpts gets the appropriate content options to alter +// the content info on commit based on media type. +func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) { + var childrenF func(r io.Reader) ([]ocispec.Descriptor, error) + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { + var ( + manifest ocispec.Manifest + decoder = json.NewDecoder(r) + ) + if err := decoder.Decode(&manifest); err != nil { + return nil, err + } + + return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil + } + case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { + var ( + index ocispec.Index + decoder = json.NewDecoder(r) + ) + if err := decoder.Decode(&index); err != nil { + return nil, err + } + + return index.Manifests, nil + } + default: + return r, nil + } + + pr, pw := io.Pipe() + + var children []ocispec.Descriptor + errC := make(chan error) + + go func() { + defer close(errC) + ch, err := childrenF(pr) + if err != nil { + errC <- err + } + children = ch + }() + + opt := func(info *content.Info) error { + err := <-errC + if err != nil { + return errors.Wrap(err, "unable to get commit labels") + } + + if len(children) > 0 { + if info.Labels == nil { + info.Labels = map[string]string{} + } + for i, ch := range children { + info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String() + } + } + return nil + } + + return io.TeeReader(r, pw), []content.Opt{opt} } // PushHandler returns a handler 
that will push all content from the provider diff --git a/rootfs/apply.go b/rootfs/apply.go index 4e21fd10f..a198c99f9 100644 --- a/rootfs/apply.go +++ b/rootfs/apply.go @@ -63,8 +63,8 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID) - // Prepare snapshot with from parent - mounts, err := sn.Prepare(ctx, key, parent.String()) + // Prepare snapshot from parent, label as root + mounts, err := sn.Prepare(ctx, key, parent.String(), opts...) if err != nil { //TODO: If is snapshot exists error, retry return false, errors.Wrap(err, "failed to prepare extraction layer") @@ -87,7 +87,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap return false, err } - if err = sn.Commit(ctx, chainID.String(), key); err != nil { + if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil { if !errdefs.IsAlreadyExists(err) { return false, errors.Wrapf(err, "failed to commit snapshot %s", parent) } diff --git a/services/containers/service.go b/services/containers/service.go index 428de476d..c5322f874 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -10,6 +10,7 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -160,6 +161,10 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) ( return &empty.Empty{}, err } + if err := s.db.GarbageCollect(ctx); err != nil { + return &empty.Empty{}, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed")) + } + return &empty.Empty{}, nil } diff --git a/services/images/service.go b/services/images/service.go index 6c5d3b685..fa8a00aae 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" + "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -161,6 +162,10 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return nil, err } + if err := s.db.GarbageCollect(ctx); err != nil { + return nil, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed")) + } + return &empty.Empty{}, nil } diff --git a/services/tasks/service.go b/services/tasks/service.go index 6db93b5f8..75904a98c 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -567,7 +567,10 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io. if err != nil { return nil, err } - if err := writer.Commit(ctx, 0, ""); err != nil { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + } + if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil { return nil, err } return &types.Descriptor{ diff --git a/spec_opts_unix.go b/spec_opts_unix.go index 337095cfa..7009522d2 100644 --- a/spec_opts_unix.go +++ b/spec_opts_unix.go @@ -11,6 +11,7 @@ import ( "path/filepath" "strconv" "strings" + "time" "golang.org/x/sys/unix" @@ -20,6 +21,7 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/snapshot" "github.com/opencontainers/image-spec/identity" "github.com/opencontainers/image-spec/specs-go/v1" "github.com/opencontainers/runc/libcontainer/user" @@ -258,16 +260,19 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool snapshotter = client.SnapshotService(c.Snapshotter) parent = identity.ChainID(diffIDs).String() usernsID = fmt.Sprintf("%s-%d-%d", parent, uid, gid) + opt = snapshot.WithLabels(map[string]string{ + 
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + }) ) if _, err := snapshotter.Stat(ctx, usernsID); err == nil { - if _, err := snapshotter.Prepare(ctx, id, usernsID); err != nil { + if _, err := snapshotter.Prepare(ctx, id, usernsID, opt); err != nil { return err } c.SnapshotKey = id c.Image = i.Name() return nil } - mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent) + mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent, opt) if err != nil { return err } @@ -275,13 +280,13 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool snapshotter.Remove(ctx, usernsID) return err } - if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap"); err != nil { + if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap", opt); err != nil { return err } if readonly { - _, err = snapshotter.View(ctx, id, usernsID) + _, err = snapshotter.View(ctx, id, usernsID, opt) } else { - _, err = snapshotter.Prepare(ctx, id, usernsID) + _, err = snapshotter.Prepare(ctx, id, usernsID, opt) } if err != nil { return err diff --git a/task.go b/task.go index 0496b5953..bf830d2cf 100644 --- a/task.go +++ b/task.go @@ -17,6 +17,7 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/rootfs" @@ -24,6 +25,7 @@ import ( google_protobuf "github.com/gogo/protobuf/types" digest "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/specs-go/v1" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" specs "github.com/opencontainers/runtime-spec/specs-go" "github.com/pkg/errors" ) @@ -486,7 +488,13 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas } func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, 
snapshotterName string, id string) error { - rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id))) + opts := []diff.Opt{ + diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)), + diff.WithLabels(map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + }), + } + rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...) if err != nil { return err } @@ -510,15 +518,32 @@ func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image strin return nil } -func (t *task) writeIndex(ctx context.Context, index *v1.Index) (v1.Descriptor, error) { +func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + } + + for i, m := range index.Manifests { + labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String() + defer func(m ocispec.Descriptor) { + if err == nil { + info := content.Info{Digest: m.Digest} + if _, uerr := t.client.ContentStore().Update(ctx, info, "labels.containerd.io/gc.root"); uerr != nil { + log.G(ctx).WithError(uerr).WithField("dgst", m.Digest).Warnf("failed to remove root marker") + } + } + }(m) + } + buf := bytes.NewBuffer(nil) if err := json.NewEncoder(buf).Encode(index); err != nil { return v1.Descriptor{}, err } - return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.id, buf) + + return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.id, buf, content.WithLabels(labels)) } -func writeContent(ctx context.Context, store content.Store, mediaType, ref string, r io.Reader) (d v1.Descriptor, err error) { +func writeContent(ctx context.Context, store content.Store, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) { writer, err := 
store.Writer(ctx, ref, 0, "") if err != nil { return d, err @@ -528,7 +553,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin if err != nil { return d, err } - if err := writer.Commit(ctx, size, ""); err != nil { + if err := writer.Commit(ctx, size, "", opts...); err != nil { return d, err } return v1.Descriptor{