diff --git a/gc/gc.go b/gc/gc.go index b9c265044..e892be155 100644 --- a/gc/gc.go +++ b/gc/gc.go @@ -5,8 +5,25 @@ // under certain use cases. package gc +import ( + "context" + "sync" +) + +// ResourceType represents type of resource at a node +type ResourceType uint8 + +// Node represents a resource which has a type and key, +// this node can be used to look up other nodes. +type Node struct { + Type ResourceType + Namespace string + Key string +} + // Tricolor implements basic, single-thread tri-color GC. Given the roots, the -// complete set and a refs function, this returns the unreachable objects. +// complete set and a refs function, this function returns a map of all +// reachable objects. // // Correct usage requires that the caller not allow the arguments to change // until the result is used to delete objects in the system. @@ -15,11 +32,11 @@ package gc // // We can probably use this to inform a design for incremental GC by injecting // callbacks to the set modification algorithms. -func Tricolor(roots []string, all []string, refs func(ref string) []string) []string { +func Tricolor(roots []Node, refs func(ref Node) ([]Node, error)) (map[Node]struct{}, error) { var ( - grays []string // maintain a gray "stack" - seen = map[string]struct{}{} // or not "white", basically "seen" - reachable = map[string]struct{}{} // or "block", in tri-color parlance + grays []Node // maintain a gray "stack" + seen = map[Node]struct{}{} // or not "white", basically "seen" + reachable = map[Node]struct{}{} // or "block", in tri-color parlance ) grays = append(grays, roots...) 
@@ -29,9 +46,13 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st id := grays[len(grays)-1] // effectively "depth first" because first element grays = grays[:len(grays)-1] seen[id] = struct{}{} // post-mark this as not-white + rs, err := refs(id) + if err != nil { + return nil, err + } // mark all the referenced objects as gray - for _, target := range refs(id) { + for _, target := range rs { if _, ok := seen[target]; !ok { grays = append(grays, target) } @@ -41,14 +62,99 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st reachable[id] = struct{}{} } + return reachable, nil +} + +// ConcurrentMark implements simple, concurrent GC. All the roots are scanned +// and the complete set of references is formed by calling the refs function +// for each seen object. This function returns a map of all objects reachable +// from a root. +// +// Correct usage requires that the caller not allow the arguments to change +// until the result is used to delete objects in the system. +// +// It will allocate memory proportional to the size of the reachable set. 
+func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Context, Node, func(Node)) error) (map[Node]struct{}, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + grays = make(chan Node) + seen = map[Node]struct{}{} // or not "white", basically "seen" + wg sync.WaitGroup + + errOnce sync.Once + refErr error + ) + + go func() { + for gray := range grays { + if _, ok := seen[gray]; ok { + wg.Done() + continue + } + seen[gray] = struct{}{} // post-mark this as non-white + + go func(gray Node) { + defer wg.Done() + + send := func(n Node) { + wg.Add(1) + select { + case grays <- n: + case <-ctx.Done(): + wg.Done() + } + } + + if err := refs(ctx, gray, send); err != nil { + errOnce.Do(func() { + refErr = err + cancel() + }) + } + + }(gray) + } + }() + + for r := range root { + wg.Add(1) + select { + case grays <- r: + case <-ctx.Done(): + wg.Done() + } + + } + + // Wait for outstanding grays to be processed + wg.Wait() + + close(grays) + + if refErr != nil { + return nil, refErr + } + if cErr := ctx.Err(); cErr != nil { + return nil, cErr + } + + return seen, nil +} + +// Sweep removes all nodes returned through the channel which are not in +// the reachable set by calling the provided remove function. +func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error { // All black objects are now reachable, and all white objects are // unreachable. Free those that are white! 
- var whites []string - for _, obj := range all { - if _, ok := reachable[obj]; !ok { - whites = append(whites, obj) + for node := range all { + if _, ok := reachable[node]; !ok { + if err := remove(node); err != nil { + return err + } } } - return whites + return nil } diff --git a/gc/gc_test.go b/gc/gc_test.go index 94ad3dd27..c3c6a9590 100644 --- a/gc/gc_test.go +++ b/gc/gc_test.go @@ -1,30 +1,154 @@ package gc import ( + "context" "reflect" "testing" ) func TestTricolorBasic(t *testing.T) { roots := []string{"A", "C"} - all := []string{"A", "B", "C", "D", "E", "F", "G"} + all := []string{"A", "B", "C", "D", "E", "F", "G", "H"} refs := map[string][]string{ "A": {"B"}, "B": {"A"}, "C": {"D", "F", "B"}, "E": {"F", "G"}, + "F": {"H"}, } - unreachable := Tricolor(roots, all, lookup(refs)) - expected := []string{"E", "G"} + expected := toNodes([]string{"A", "B", "C", "D", "F", "H"}) - if !reflect.DeepEqual(unreachable, expected) { - t.Fatalf("incorrect unreachable set: %v != %v", unreachable, expected) + reachable, err := Tricolor(toNodes(roots), lookup(refs)) + if err != nil { + t.Fatal(err) + } + + var sweeped []Node + for _, a := range toNodes(all) { + if _, ok := reachable[a]; ok { + sweeped = append(sweeped, a) + } + } + + if !reflect.DeepEqual(sweeped, expected) { + t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected) } } -func lookup(refs map[string][]string) func(id string) []string { - return func(ref string) []string { - return refs[ref] +func TestConcurrentBasic(t *testing.T) { + roots := []string{"A", "C"} + all := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} + refs := map[string][]string{ + "A": {"B"}, + "B": {"A"}, + "C": {"D", "F", "B"}, + "E": {"F", "G"}, + "F": {"H"}, + "G": {"I"}, + } + + expected := toNodes([]string{"A", "B", "C", "D", "F", "H"}) + + ctx := context.Background() + rootC := make(chan Node) + go func() { + writeNodes(ctx, rootC, toNodes(roots)) + close(rootC) + }() + + reachable, err := ConcurrentMark(ctx, 
rootC, lookupc(refs)) + if err != nil { + t.Fatal(err) + } + + var sweeped []Node + for _, a := range toNodes(all) { + if _, ok := reachable[a]; ok { + sweeped = append(sweeped, a) + } + } + + if !reflect.DeepEqual(sweeped, expected) { + t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected) } } + +func writeNodes(ctx context.Context, nc chan<- Node, nodes []Node) { + for _, n := range nodes { + select { + case nc <- n: + case <-ctx.Done(): + return + } + } +} + +func lookup(refs map[string][]string) func(id Node) ([]Node, error) { + return func(ref Node) ([]Node, error) { + return toNodes(refs[ref.Key]), nil + } +} + +func lookupc(refs map[string][]string) func(context.Context, Node, func(Node)) error { + return func(ctx context.Context, ref Node, fn func(Node)) error { + for _, n := range toNodes(refs[ref.Key]) { + fn(n) + } + return nil + } +} + +func toNodes(s []string) []Node { + n := make([]Node, len(s)) + for i := range s { + n[i] = Node{ + Key: s[i], + } + } + return n +} + +func newScanner(refs []string) *stringScanner { + return &stringScanner{ + i: -1, + s: refs, + } +} + +type stringScanner struct { + i int + s []string +} + +func (ss *stringScanner) Next() bool { + ss.i++ + return ss.i < len(ss.s) +} + +func (ss *stringScanner) Node() Node { + return Node{ + Key: ss.s[ss.i], + } +} + +func (ss *stringScanner) Cleanup() error { + ss.s[ss.i] = "" + return nil +} + +func (ss *stringScanner) Err() error { + return nil +} + +func (ss *stringScanner) All() []Node { + remaining := make([]Node, 0, len(ss.s)) + for _, s := range ss.s { + if s != "" { + remaining = append(remaining, Node{ + Key: s, + }) + } + } + return remaining +} diff --git a/metadata/content.go b/metadata/content.go index 023d5d215..05064fdec 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "strings" + "sync" "time" "github.com/boltdb/bolt" @@ -11,6 +12,7 @@ import ( "github.com/containerd/containerd/errdefs" 
"github.com/containerd/containerd/filters" "github.com/containerd/containerd/labels" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata/boltutil" "github.com/containerd/containerd/namespaces" digest "github.com/opencontainers/go-digest" @@ -19,12 +21,13 @@ import ( type contentStore struct { content.Store - db transactor + db *DB + l sync.RWMutex } // newContentStore returns a namespaced content store using an existing // content store interface. -func newContentStore(db transactor, cs content.Store) content.Store { +func newContentStore(db *DB, cs content.Store) *contentStore { return &contentStore{ Store: cs, db: db, @@ -59,6 +62,9 @@ func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpath return content.Info{}, err } + cs.l.RLock() + defer cs.l.RUnlock() + updated := content.Info{ Digest: info.Digest, } @@ -166,15 +172,25 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { return err } + cs.l.RLock() + defer cs.l.RUnlock() + return update(ctx, cs.db, func(tx *bolt.Tx) error { bkt := getBlobBucket(tx, ns, dgst) if bkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst) } - // Just remove local reference, garbage collector is responsible for - // cleaning up on disk content - return getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())) + if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil { + return err + } + + // Mark content store as dirty for triggering garbage collection + cs.db.dirtyL.Lock() + cs.db.dirtyCS = true + cs.db.dirtyL.Unlock() + + return nil }) } @@ -269,6 +285,9 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error { return err } + cs.l.RLock() + defer cs.l.RUnlock() + return update(ctx, cs.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, ns) if bkt == nil { @@ -293,6 +312,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe return nil, err } + 
cs.l.RLock() + defer cs.l.RUnlock() + var w content.Writer if err := update(ctx, cs.db, func(tx *bolt.Tx) error { if expected != "" { @@ -346,6 +368,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe ref: ref, namespace: ns, db: cs.db, + l: &cs.l, }, nil } @@ -354,9 +377,13 @@ type namespacedWriter struct { ref string namespace string db transactor + l *sync.RWMutex } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + nw.l.RLock() + defer nw.l.RUnlock() + return update(ctx, nw.db, func(tx *bolt.Tx) error { bkt := getIngestBucket(tx, nw.namespace) if bkt != nil { @@ -495,3 +522,61 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { return bkt.Put(bucketKeySize, sizeEncoded) } + +func (cs *contentStore) garbageCollect(ctx context.Context) error { + lt1 := time.Now() + cs.l.Lock() + defer func() { + cs.l.Unlock() + log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected") + }() + + seen := map[string]struct{}{} + if err := cs.db.View(func(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + + cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent) + if cbkt == nil { + continue + } + bbkt := cbkt.Bucket(bucketKeyObjectBlob) + if err := bbkt.ForEach(func(ck, cv []byte) error { + if cv == nil { + seen[string(ck)] = struct{}{} + } + return nil + }); err != nil { + return err + } + } + + return nil + }); err != nil { + return err + } + + if err := cs.Store.Walk(ctx, func(info content.Info) error { + if _, ok := seen[info.Digest.String()]; !ok { + if err := cs.Store.Delete(ctx, info.Digest); err != nil { + return err + } + log.G(ctx).WithField("digest", info.Digest).Debug("removed content") + } + return nil + }); err != nil { + return err 
+ } + + return nil +} diff --git a/metadata/db.go b/metadata/db.go index aed6f1e9a..7ae293159 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -3,10 +3,13 @@ package metadata import ( "context" "encoding/binary" + "strings" + "sync" "time" "github.com/boltdb/bolt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/gc" "github.com/containerd/containerd/log" "github.com/containerd/containerd/snapshot" "github.com/pkg/errors" ) @@ -36,15 +39,32 @@ type DB struct { db *bolt.DB ss map[string]snapshot.Snapshotter cs content.Store + + // wlock is used to protect access to the data structures during garbage + // collection. While the wlock is held no writable transactions can be + // opened, preventing changes from occurring between the mark and + // sweep phases without preventing read transactions. + wlock sync.RWMutex + + // dirty flags and lock keep track of datastores which have had deletions + // since the last garbage collection. These datastores will be garbage + // collected during the next garbage collection. + dirtyL sync.Mutex + dirtySS map[string]struct{} + dirtyCS bool + + // TODO: Keep track of stats such as pause time, number of collected objects, errors + lastCollection time.Time } // NewDB creates a new metadata database using the provided // bolt database, content store, and snapshotters. func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB { return &DB{ - db: db, - ss: ss, - cs: cs, + db: db, + ss: ss, + cs: cs, + dirtySS: map[string]struct{}{}, } } @@ -158,5 +178,134 @@ func (m *DB) View(fn func(*bolt.Tx) error) error { // Update runs a writable transation on the metadata store. 
func (m *DB) Update(fn func(*bolt.Tx) error) error { + m.wlock.RLock() + defer m.wlock.RUnlock() return m.db.Update(fn) } + +func (m *DB) GarbageCollect(ctx context.Context) error { + lt1 := time.Now() + m.wlock.Lock() + defer func() { + m.wlock.Unlock() + log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected") + }() + + var marked map[gc.Node]struct{} + + if err := m.db.View(func(tx *bolt.Tx) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + roots := make(chan gc.Node) + errChan := make(chan error) + go func() { + defer close(errChan) + defer close(roots) + + // Call roots + if err := scanRoots(ctx, tx, roots); err != nil { + cancel() + errChan <- err + } + }() + + refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error { + return references(ctx, tx, n, fn) + } + + reachable, err := gc.ConcurrentMark(ctx, roots, refs) + if rerr := <-errChan; rerr != nil { + return rerr + } + if err != nil { + return err + } + marked = reachable + return nil + }); err != nil { + return err + } + + m.dirtyL.Lock() + defer m.dirtyL.Unlock() + + if err := m.db.Update(func(tx *bolt.Tx) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + nodeC := make(chan gc.Node) + var scanErr error + + go func() { + defer close(nodeC) + scanErr = scanAll(ctx, tx, nodeC) + }() + + rm := func(n gc.Node) error { + if n.Type == ResourceSnapshot { + if idx := strings.IndexRune(n.Key, '/'); idx > 0 { + m.dirtySS[n.Key[:idx]] = struct{}{} + } + } else if n.Type == ResourceContent { + m.dirtyCS = true + } + return remove(ctx, tx, n) + } + + if err := gc.Sweep(marked, nodeC, rm); err != nil { + return errors.Wrap(err, "failed to sweep") + } + + if scanErr != nil { + return errors.Wrap(scanErr, "failed to scan all") + } + + return nil + }); err != nil { + return err + } + + m.lastCollection = time.Now() + + if len(m.dirtySS) > 0 { + for snapshotterName := range m.dirtySS { + log.G(ctx).WithField("snapshotter", 
snapshotterName).Debug("scheduling snapshotter cleanup") + go m.cleanupSnapshotter(snapshotterName) + } + m.dirtySS = map[string]struct{}{} + } + + if m.dirtyCS { + log.G(ctx).Debug("scheduling content cleanup") + go m.cleanupContent() + m.dirtyCS = false + } + + return nil +} + +func (m *DB) cleanupSnapshotter(name string) { + ctx := context.Background() + sn, ok := m.ss[name] + if !ok { + return + } + + err := newSnapshotter(m, name, sn).garbageCollect(ctx) + if err != nil { + log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed") + } +} + +func (m *DB) cleanupContent() { + ctx := context.Background() + if m.cs == nil { + return + } + + err := newContentStore(m, m.cs).garbageCollect(ctx) + if err != nil { + log.G(ctx).WithError(err).Warn("content garbage collection failed") + } +} diff --git a/metadata/db_test.go b/metadata/db_test.go index e14242765..ff3e9cd15 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -1,11 +1,31 @@ package metadata import ( + "context" "encoding/binary" + "fmt" + "io" + "io/ioutil" + "math/rand" + "os" + "path/filepath" + "runtime/pprof" "testing" + "time" "github.com/boltdb/bolt" + "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/snapshot" + "github.com/containerd/containerd/snapshot/naive" + "github.com/gogo/protobuf/types" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -176,3 +196,366 @@ func readDBVersion(db *bolt.DB, schema []byte) (int, error) { } return version, nil } + +func TestMetadataCollector(t *testing.T) { + mdb, cs, sn, cleanup := newStores(t) + defer cleanup() + + var ( + ctx = 
context.Background() + + objects = []object{ + blob(bytesFor(1), true), + blob(bytesFor(2), false), + blob(bytesFor(3), true), + blob(bytesFor(4), false, "containerd.io/gc.root", time.Now().String()), + newSnapshot("1", "", false, false), + newSnapshot("2", "1", false, false), + newSnapshot("3", "2", false, false), + newSnapshot("4", "3", false, false), + newSnapshot("5", "3", false, true), + container("1", "4"), + image("image-1", digestFor(2)), + } + remaining []gc.Node + ) + + if err := mdb.Update(func(tx *bolt.Tx) error { + for _, obj := range objects { + node, err := create(obj, tx, cs, sn) + if err != nil { + return err + } + if node != nil { + remaining = append(remaining, *node) + } + } + return nil + }); err != nil { + t.Fatalf("Creation failed: %+v", err) + } + + if err := mdb.GarbageCollect(ctx); err != nil { + t.Fatal(err) + } + + var actual []gc.Node + + if err := mdb.View(func(tx *bolt.Tx) error { + nodeC := make(chan gc.Node) + var scanErr error + go func() { + defer close(nodeC) + scanErr = scanAll(ctx, tx, nodeC) + }() + for node := range nodeC { + actual = append(actual, node) + } + return scanErr + }); err != nil { + t.Fatal(err) + } + + checkNodesEqual(t, actual, remaining) +} + +func BenchmarkGarbageCollect(b *testing.B) { + b.Run("10-Sets", benchmarkTrigger(10)) + b.Run("100-Sets", benchmarkTrigger(100)) + b.Run("1000-Sets", benchmarkTrigger(1000)) + b.Run("10000-Sets", benchmarkTrigger(10000)) +} + +func benchmarkTrigger(n int) func(b *testing.B) { + return func(b *testing.B) { + mdb, cs, sn, cleanup := newStores(b) + defer cleanup() + + objects := []object{} + + // TODO: Allow max to be configurable + for i := 0; i < n; i++ { + objects = append(objects, + blob(bytesFor(int64(i)), false), + image(fmt.Sprintf("image-%d", i), digestFor(int64(i))), + ) + lastSnapshot := 6 + for j := 0; j <= lastSnapshot; j++ { + var parent string + key := fmt.Sprintf("snapshot-%d-%d", i, j) + if j > 0 { + parent = fmt.Sprintf("snapshot-%d-%d", i, j-1) + } + 
objects = append(objects, newSnapshot(key, parent, false, false)) + } + objects = append(objects, container(fmt.Sprintf("container-%d", i), fmt.Sprintf("snapshot-%d-%d", i, lastSnapshot))) + + } + + // TODO: Create set of objects for removal + + var ( + ctx = context.Background() + + remaining []gc.Node + ) + + if err := mdb.Update(func(tx *bolt.Tx) error { + for _, obj := range objects { + node, err := create(obj, tx, cs, sn) + if err != nil { + return err + } + if node != nil { + remaining = append(remaining, *node) + } + } + return nil + }); err != nil { + b.Fatalf("Creation failed: %+v", err) + } + + // TODO: reset benchmark + b.ResetTimer() + //b.StopTimer() + + labels := pprof.Labels("worker", "trigger") + pprof.Do(ctx, labels, func(ctx context.Context) { + for i := 0; i < b.N; i++ { + + // TODO: Add removal objects + + //b.StartTimer() + + if err := mdb.GarbageCollect(ctx); err != nil { + b.Fatal(err) + } + + //b.StopTimer() + + //var actual []gc.Node + + //if err := db.View(func(tx *bolt.Tx) error { + // nodeC := make(chan gc.Node) + // var scanErr error + // go func() { + // defer close(nodeC) + // scanErr = scanAll(ctx, tx, nodeC) + // }() + // for node := range nodeC { + // actual = append(actual, node) + // } + // return scanErr + //}); err != nil { + // t.Fatal(err) + //} + + //checkNodesEqual(t, actual, remaining) + } + }) + } +} + +func bytesFor(i int64) []byte { + r := rand.New(rand.NewSource(i)) + var b [256]byte + _, err := r.Read(b[:]) + if err != nil { + panic(err) + } + return b[:] +} + +func digestFor(i int64) digest.Digest { + r := rand.New(rand.NewSource(i)) + dgstr := digest.SHA256.Digester() + _, err := io.Copy(dgstr.Hash(), io.LimitReader(r, 256)) + if err != nil { + panic(err) + } + return dgstr.Digest() +} + +type object struct { + data interface{} + removed bool + labels map[string]string +} + +func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshot.Snapshotter) (*gc.Node, error) { + var ( + node *gc.Node + namespace = 
"test" + ctx = namespaces.WithNamespace(context.Background(), namespace) + ) + + switch v := obj.data.(type) { + case testContent: + ctx := WithTransactionContext(ctx, tx) + expected := digest.FromBytes(v.data) + w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected) + if err != nil { + return nil, errors.Wrap(err, "failed to create writer") + } + if _, err := w.Write(v.data); err != nil { + return nil, errors.Wrap(err, "write blob failed") + } + if err := w.Commit(ctx, int64(len(v.data)), expected, content.WithLabels(obj.labels)); err != nil { + return nil, errors.Wrap(err, "failed to commit blob") + } + if !obj.removed { + node = &gc.Node{ + Type: ResourceContent, + Namespace: namespace, + Key: expected.String(), + } + } + case testSnapshot: + ctx := WithTransactionContext(ctx, tx) + if v.active { + _, err := sn.Prepare(ctx, v.key, v.parent, snapshot.WithLabels(obj.labels)) + if err != nil { + return nil, err + } + } else { + akey := fmt.Sprintf("%s-active", v.key) + _, err := sn.Prepare(ctx, akey, v.parent) + if err != nil { + return nil, err + } + if err := sn.Commit(ctx, v.key, akey, snapshot.WithLabels(obj.labels)); err != nil { + return nil, err + } + } + if !obj.removed { + node = &gc.Node{ + Type: ResourceSnapshot, + Namespace: namespace, + Key: fmt.Sprintf("naive/%s", v.key), + } + } + case testImage: + image := images.Image{ + Name: v.name, + Target: v.target, + Labels: obj.labels, + } + _, err := NewImageStore(tx).Create(ctx, image) + if err != nil { + return nil, errors.Wrap(err, "failed to create image") + } + case testContainer: + container := containers.Container{ + ID: v.id, + SnapshotKey: v.snapshot, + Snapshotter: "naive", + Labels: obj.labels, + + Runtime: containers.RuntimeInfo{ + Name: "testruntime", + }, + Spec: &types.Any{}, + } + _, err := NewContainerStore(tx).Create(ctx, container) + if err != nil { + return nil, err + } + } + + return node, nil +} + +func blob(b []byte, r bool, l ...string) object { + return object{ + data: 
testContent{ + data: b, + }, + removed: r, + labels: labelmap(l...), + } +} + +func image(n string, d digest.Digest, l ...string) object { + return object{ + data: testImage{ + name: n, + target: ocispec.Descriptor{ + MediaType: "irrelevant", + Digest: d, + Size: 256, + }, + }, + removed: false, + labels: labelmap(l...), + } +} + +func newSnapshot(key, parent string, active, r bool, l ...string) object { + return object{ + data: testSnapshot{ + key: key, + parent: parent, + active: active, + }, + removed: r, + labels: labelmap(l...), + } +} + +func container(id, s string, l ...string) object { + return object{ + data: testContainer{ + id: id, + snapshot: s, + }, + removed: false, + labels: labelmap(l...), + } +} + +type testContent struct { + data []byte +} + +type testSnapshot struct { + key string + parent string + active bool +} + +type testImage struct { + name string + target ocispec.Descriptor +} + +type testContainer struct { + id string + snapshot string +} + +func newStores(t testing.TB) (*DB, content.Store, snapshot.Snapshotter, func()) { + td, err := ioutil.TempDir("", "gc-test-") + if err != nil { + t.Fatal(err) + } + db, err := bolt.Open(filepath.Join(td, "meta.db"), 0644, nil) + if err != nil { + t.Fatal(err) + } + + nsn, err := naive.NewSnapshotter(filepath.Join(td, "snapshots")) + if err != nil { + t.Fatal(err) + } + + lcs, err := local.NewStore(filepath.Join(td, "content")) + if err != nil { + t.Fatal(err) + } + + mdb := NewDB(db, lcs, map[string]snapshot.Snapshotter{"naive": nsn}) + + return mdb, mdb.ContentStore(), mdb.Snapshotter("naive"), func() { + os.RemoveAll(td) + } +} diff --git a/metadata/gc.go b/metadata/gc.go new file mode 100644 index 000000000..8434d694b --- /dev/null +++ b/metadata/gc.go @@ -0,0 +1,343 @@ +package metadata + +import ( + "context" + "fmt" + "strings" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/gc" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" +) + +const ( + ResourceUnknown 
gc.ResourceType = iota + ResourceContent + ResourceSnapshot + ResourceContainer + ResourceTask +) + +var ( + labelGCRoot = []byte("containerd.io/gc.root") + labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.") + labelGCContentRef = []byte("containerd.io/gc.ref.content") +) + +func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + nbkt := v1bkt.Bucket(k) + ns := string(k) + + ibkt := nbkt.Bucket(bucketKeyObjectImages) + if ibkt != nil { + if err := ibkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + + target := ibkt.Bucket(k).Bucket(bucketKeyTarget) + if target != nil { + contentKey := string(target.Get(bucketKeyDigest)) + select { + case nc <- gcnode(ResourceContent, ns, contentKey): + case <-ctx.Done(): + return ctx.Err() + } + } + return sendSnapshotRefs(ns, ibkt.Bucket(k), func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }); err != nil { + return err + } + } + + cbkt := nbkt.Bucket(bucketKeyObjectContent) + if cbkt != nil { + cbkt = cbkt.Bucket(bucketKeyObjectBlob) + } + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k)) + }); err != nil { + return err + } + } + + cbkt = nbkt.Bucket(bucketKeyObjectContainers) + if cbkt != nil { + if err := cbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + snapshotter := string(cbkt.Bucket(k).Get(bucketKeySnapshotter)) + if snapshotter != "" { + ss := string(cbkt.Bucket(k).Get(bucketKeySnapshotKey)) + select { + case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)): + case <-ctx.Done(): + return ctx.Err() + } + } + + // TODO: Send additional 
snapshot refs through labels + return sendSnapshotRefs(ns, cbkt.Bucket(k), func(n gc.Node) { + select { + case nc <- n: + case <-ctx.Done(): + } + }) + }); err != nil { + return err + } + } + + sbkt := nbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + snbkt := sbkt.Bucket(sk) + + return snbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + + return sendRootRef(ctx, nc, gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)), snbkt.Bucket(k)) + }) + }); err != nil { + return err + } + } + } + return nil +} + +func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error { + if node.Type == ResourceContent { + bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key)) + if bkt == nil { + // Node may be created from dead edge + return nil + } + + if err := sendSnapshotRefs(node.Namespace, bkt, fn); err != nil { + return err + } + return sendContentRefs(node.Namespace, bkt, fn) + } else if node.Type == ResourceSnapshot { + parts := strings.SplitN(node.Key, "/", 2) + if len(parts) != 2 { + return errors.Errorf("invalid snapshot gc key %s", node.Key) + } + ss := parts[0] + name := parts[1] + + bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots, []byte(ss), []byte(name)) + if bkt == nil { + getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots).ForEach(func(k, v []byte) error { + return nil + }) + + // Node may be created from dead edge + return nil + } + + if pv := bkt.Get(bucketKeyParent); len(pv) > 0 { + fn(gcnode(ResourceSnapshot, node.Namespace, fmt.Sprintf("%s/%s", ss, pv))) + } + + return sendSnapshotRefs(node.Namespace, bkt, fn) + } + + return nil +} + +func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return 
	nil
	}

	// NOTE(review): the lines above are the tail of a function whose start is
	// outside this chunk; reproduced as-is.

	// iterate through each namespace
	v1c := v1bkt.Cursor()

	for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
		// cursor entries with a non-nil value are key/value pairs, not nested
		// buckets; namespaces are stored as buckets, so skip the rest
		if v != nil {
			continue
		}
		nbkt := v1bkt.Bucket(k)
		ns := string(k)

		// emit a node for every snapshot in every snapshotter of this namespace
		sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
		if sbkt != nil {
			if err := sbkt.ForEach(func(sk, sv []byte) error {
				if sv != nil {
					return nil
				}
				snbkt := sbkt.Bucket(sk)
				// the closure parameters k, v shadow the outer cursor variables
				return snbkt.ForEach(func(k, v []byte) error {
					if v != nil {
						return nil
					}
					select {
					case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
					case <-ctx.Done():
						return ctx.Err()
					}
					return nil
				})
			}); err != nil {
				return err
			}
		}

		// emit a node for every content blob in this namespace
		cbkt := nbkt.Bucket(bucketKeyObjectContent)
		if cbkt != nil {
			cbkt = cbkt.Bucket(bucketKeyObjectBlob)
		}
		if cbkt != nil {
			if err := cbkt.ForEach(func(k, v []byte) error {
				if v != nil {
					return nil
				}
				select {
				case nc <- gcnode(ResourceContent, ns, string(k)):
				case <-ctx.Done():
					return ctx.Err()
				}
				return nil
			}); err != nil {
				return err
			}
		}
	}

	return nil
}

// remove deletes the metadata record for the given node within its namespace.
// Missing buckets along the way are treated as already-removed and return nil.
// NOTE(review): unknown node.Type values fall through the switch and are
// silently ignored — confirm this is intended forward-compatibility.
func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
	v1bkt := tx.Bucket(bucketKeyVersion)
	if v1bkt == nil {
		return nil
	}

	nsbkt := v1bkt.Bucket([]byte(node.Namespace))
	if nsbkt == nil {
		return nil
	}

	switch node.Type {
	case ResourceContent:
		cbkt := nsbkt.Bucket(bucketKeyObjectContent)
		if cbkt != nil {
			cbkt = cbkt.Bucket(bucketKeyObjectBlob)
		}
		if cbkt != nil {
			log.G(ctx).WithField("key", node.Key).Debug("delete content")
			return cbkt.DeleteBucket([]byte(node.Key))
		}
	case ResourceSnapshot:
		sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots)
		if sbkt != nil {
			// snapshot keys are "<snapshotter>/<name>"
			parts := strings.SplitN(node.Key, "/", 2)
			if len(parts) != 2 {
				return errors.Errorf("invalid snapshot gc key %s", node.Key)
			}
			ssbkt := sbkt.Bucket([]byte(parts[0]))
			if ssbkt != nil {
				log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("delete snapshot")
				return ssbkt.DeleteBucket([]byte(parts[1]))
			}
		}
	}

	return nil
}

// sendSnapshotRefs sends all snapshot references referred to by the labels in the bkt
func sendSnapshotRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt != nil {
		lc := lbkt.Cursor()

		// iterate only over labels carrying the snapshot-ref prefix; the label
		// suffix names the snapshotter, the label value names the snapshot
		for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() {
			snapshotter := string(k[len(labelGCSnapRef):])
			fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
		}
	}
	return nil
}

// sendContentRefs sends all content references referred to by the labels in the bkt
func sendContentRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt != nil {
		lc := lbkt.Cursor()

		labelRef := string(labelGCContentRef)
		for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
			// accept either the exact ref label, or "<label>.<name>" — any
			// other suffix (e.g. "<label>bad") is not a reference
			if ks := string(k); ks != labelRef {
				// Allow reference naming, ignore names
				if ks[len(labelRef)] != '.' {
					continue
				}
			}

			fn(gcnode(ResourceContent, ns, string(v)))
		}
	}
	return nil
}

// isRootRef reports whether the bucket carries the GC root label, marking the
// object as a retained root for the collector.
func isRootRef(bkt *bolt.Bucket) bool {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt != nil {
		rv := lbkt.Get(labelGCRoot)
		if rv != nil {
			// TODO: interpret rv as a timestamp and skip if expired
			return true
		}
	}
	return false
}

// sendRootRef sends n on nc if bkt is labeled as a GC root, honoring ctx
// cancellation while blocked on the send.
func sendRootRef(ctx context.Context, nc chan<- gc.Node, n gc.Node, bkt *bolt.Bucket) error {
	if isRootRef(bkt) {
		select {
		case nc <- n:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}

// gcnode constructs a gc.Node from its type, namespace and key.
func gcnode(t gc.ResourceType, ns, key string) gc.Node {
	return gc.Node{
		Type:      t,
		Namespace: ns,
		Key:       key,
	}
}
diff --git a/metadata/gc_test.go b/metadata/gc_test.go
new file mode 100644
index 000000000..78422cd81
--- /dev/null
+++ b/metadata/gc_test.go
@@ -0,0 +1,406 @@
package metadata

import (
	"context"
	"io"
	"io/ioutil"
	"math/rand"
	"os"
	"path/filepath"
	"sort"
	"testing"

	"github.com/boltdb/bolt"
	"github.com/containerd/containerd/gc"
	"github.com/containerd/containerd/metadata/boltutil"
	digest "github.com/opencontainers/go-digest"
)

// TestGCRoots verifies that scanRoots reports exactly the objects labeled as
// GC roots or referenced by image snapshot-ref labels.
func TestGCRoots(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	alters := []alterFunc{
		addImage("ns1", "image1", dgst(1), nil),
		addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "", nil),
		addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
	}

	expected := []gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()),
		gcnode(ResourceContent, "ns1", dgst(2).String()),
		gcnode(ResourceContent, "ns2",
			dgst(2).String()),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, alter := range alters {
			if err := alter(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	ctx := context.Background()

	checkNodes(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanRoots(ctx, tx, nc)
	})
}

// TestGCRemove verifies that remove deletes exactly the requested nodes:
// it seeds the database, removes every other node, and asserts scanAll
// reports only the remainder.
func TestGCRemove(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	alters := []alterFunc{
		addImage("ns1", "image1", dgst(1), nil),
		addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "", nil),
		addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns2", "overlay", "sn1", "", nil),
	}

	all := []gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()),
		gcnode(ResourceContent, "ns1", dgst(2).String()),
		gcnode(ResourceContent, "ns1", dgst(3).String()),
		gcnode(ResourceContent, "ns2", dgst(1).String()),
		gcnode(ResourceContent, "ns2", dgst(2).String()),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
		gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
	}

	// delete the even-indexed nodes, keep the odd-indexed ones
	var deleted, remaining []gc.Node
	for i, n := range all {
		if i%2 == 0 {
			deleted = append(deleted, n)
		} else {
			remaining = append(remaining, n)
		}
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, alter := range alters {
			if err := alter(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	ctx := context.Background()

	// sanity-check the seeded state before removing anything
	checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanAll(ctx, tx, nc)
	})
	if t.Failed() {
		t.Fatal("Scan all failed")
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		for _, n := range deleted {
			if err := remove(ctx, tx, n); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanAll(ctx, tx, nc)
	})
}

// TestGCRefs verifies that references reports, for each seeded node, exactly
// the set of nodes referred to by its parent link or reference labels.
func TestGCRefs(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	alters := []alterFunc{
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns1", dgst(4), labelmap(string(labelGCContentRef), dgst(1).String())),
		addContent("ns1", dgst(5), labelmap(string(labelGCContentRef)+".anything-1", dgst(2).String(), string(labelGCContentRef)+".anything-2", dgst(3).String())),
		// "bad" suffix (no '.' separator) must NOT count as a reference
		addContent("ns1", dgst(6), labelmap(string(labelGCContentRef)+"bad", dgst(1).String())),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), nil),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "sn1", nil),
		addSnapshot("ns1", "overlay", "sn3", "sn2", nil),
		addSnapshot("ns1", "overlay", "sn4", "", labelmap(string(labelGCSnapRef)+"btrfs", "sn1", string(labelGCSnapRef)+"overlay", "sn1")),
		addSnapshot("ns1", "btrfs", "sn1", "", nil),
		addSnapshot("ns2", "overlay", "sn1", "", nil),
		addSnapshot("ns2", "overlay", "sn2", "sn1", nil),
	}

	refs :=
		map[gc.Node][]gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(2).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(3).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(4).String()): {
			gcnode(ResourceContent, "ns1", dgst(1).String()),
		},
		gcnode(ResourceContent, "ns1", dgst(5).String()): {
			gcnode(ResourceContent, "ns1", dgst(2).String()),
			gcnode(ResourceContent, "ns1", dgst(3).String()),
		},
		gcnode(ResourceContent, "ns1", dgst(6).String()): nil,
		gcnode(ResourceContent, "ns2", dgst(1).String()): nil,
		gcnode(ResourceContent, "ns2", dgst(2).String()): nil,
		gcnode(ResourceSnapshot, "ns1", "overlay/sn1"):   nil,
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"): {
			gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		},
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"): {
			gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		},
		gcnode(ResourceSnapshot, "ns1", "overlay/sn4"): {
			gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"),
			gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		},
		gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"):   nil,
		gcnode(ResourceSnapshot, "ns2", "overlay/sn1"): nil,
		gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): {
			gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
		},
	}

	if err := db.Update(func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, alter := range alters {
			if err := alter(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	ctx := context.Background()

	// check the reference set of every node independently
	for n, nodes := range refs {
		checkNodes(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
			return references(ctx, tx, n, func(n gc.Node) {
				select {
				case nc <- n:
				case <-ctx.Done():
				}
			})
		})
		if t.Failed() {
			t.Fatalf("Failure scanning %v", n)
		}
	}
}

// newDatabase opens a fresh bolt database in a temporary directory and
// returns it along with a cleanup function that closes and deletes it.
func newDatabase() (*bolt.DB, func(), error) {
	td, err := ioutil.TempDir("", "gc-roots-")
	if err != nil {
		return nil, nil, err
	}

	db, err := bolt.Open(filepath.Join(td, "test.db"), 0777, nil)
	if err != nil {
		os.RemoveAll(td)
		return nil, nil, err
	}

	return db, func() {
		db.Close()
		os.RemoveAll(td)
	}, nil
}

// checkNodes collects every node emitted by fn inside a read transaction and
// compares the result against expected (order-insensitive).
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
	var actual []gc.Node
	nc := make(chan gc.Node)
	done := make(chan struct{})
	// drain the channel concurrently so fn's sends never block forever
	go func() {
		defer close(done)
		for n := range nc {
			actual = append(actual, n)
		}
	}()
	if err := db.View(func(tx *bolt.Tx) error {
		defer close(nc)
		return fn(ctx, tx, nc)
	}); err != nil {
		t.Fatal(err)
	}

	<-done
	checkNodesEqual(t, actual, expected)
}

// checkNodesEqual sorts both slices and reports any length or element
// mismatch via t.Errorf.
func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) {
	sort.Sort(nodeList(n1))
	sort.Sort(nodeList(n2))

	if len(n1) != len(n2) {
		t.Errorf("Nodes do not match\n\tExpected:\n\t%v\n\tActual:\n\t%v", n2, n1)
		return
	}

	for i := range n1 {
		if n1[i] != n2[i] {
			t.Errorf("[%d] root does not match expected: expected %v, got %v", i, n2[i], n1[i])
		}
	}
}

// nodeList implements sort.Interface over []gc.Node, ordering by
// Type, then Namespace, then Key.
type nodeList []gc.Node

func (nodes nodeList) Len() int {
	return len(nodes)
}

func (nodes nodeList) Less(i, j int) bool {
	if nodes[i].Type != nodes[j].Type {
		return nodes[i].Type < nodes[j].Type
	}
	if nodes[i].Namespace != nodes[j].Namespace {
		return nodes[i].Namespace < nodes[j].Namespace
	}
	return nodes[i].Key < nodes[j].Key
}

func (nodes nodeList) Swap(i, j int) {
	nodes[i], nodes[j] = nodes[j], nodes[i]
}

// alterFunc mutates the version bucket to seed test fixtures.
type alterFunc func(bkt *bolt.Bucket) error

// addImage seeds an image record with the given target digest and labels.
func addImage(ns, name string, dgst digest.Digest, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		ibkt, err := createBuckets(bkt, ns, string(bucketKeyObjectImages), name)
		if err != nil {
			return err
		}

		tbkt, err := ibkt.CreateBucket(bucketKeyTarget)
		if err != nil {
			return err
		}
		if err :=
			tbkt.Put(bucketKeyDigest, []byte(dgst.String())); err != nil {
			return err
		}

		return boltutil.WriteLabels(ibkt, labels)
	}
}

// addSnapshot seeds a snapshot record under the given snapshotter, with an
// optional parent link and labels.
func addSnapshot(ns, snapshotter, name, parent string, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectSnapshots), snapshotter, name)
		if err != nil {
			return err
		}
		if parent != "" {
			if err := sbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
				return err
			}
		}
		return boltutil.WriteLabels(sbkt, labels)
	}
}

// addContent seeds a content blob record keyed by digest, with labels.
func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectBlob), dgst.String())
		if err != nil {
			return err
		}
		return boltutil.WriteLabels(cbkt, labels)
	}
}

// addContainer seeds a container record pointing at a snapshotter/snapshot
// pair, with labels.
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)
		if err != nil {
			return err
		}
		if err := cbkt.Put(bucketKeySnapshotter, []byte(snapshotter)); err != nil {
			return err
		}
		if err := cbkt.Put(bucketKeySnapshotKey, []byte(snapshot)); err != nil {
			return err
		}
		return boltutil.WriteLabels(cbkt, labels)
	}
}

// createBuckets descends from bkt creating each named bucket in turn and
// returns the innermost one.
func createBuckets(bkt *bolt.Bucket, names ...string) (*bolt.Bucket, error) {
	for _, name := range names {
		nbkt, err := bkt.CreateBucketIfNotExists([]byte(name))
		if err != nil {
			return nil, err
		}
		bkt = nbkt
	}
	return bkt, nil
}

// labelmap builds a map from alternating key/value arguments; panics on an
// odd argument count (test-fixture helper).
func labelmap(kv ...string) map[string]string {
	if len(kv)%2 != 0 {
		panic("bad labels argument")
	}
	l := map[string]string{}
	for i := 0; i < len(kv); i = i + 2 {
		l[kv[i]] = kv[i+1]
	}
	return l
}

// dgst returns a digest that is deterministic in i: it hashes 256 bytes from
// a rand source seeded with i, so the same i always yields the same digest.
func dgst(i int64) digest.Digest {
	r := rand.New(rand.NewSource(i))
	dgstr := digest.SHA256.Digester()
	if _, err := io.CopyN(dgstr.Hash(), r, 256); err != nil {
		panic(err)
	}
	return dgstr.Digest()
}
diff --git a/metadata/snapshot.go b/metadata/snapshot.go
index 4103827a4..ad38e5915 100644
--- a/metadata/snapshot.go
+++ b/metadata/snapshot.go
@@ -4,11 +4,13 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/boltdb/bolt"
 	"github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/labels"
+	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/metadata/boltutil"
 	"github.com/containerd/containerd/mount"
 	"github.com/containerd/containerd/namespaces"
@@ -19,12 +21,13 @@ import (
 type snapshotter struct {
 	snapshot.Snapshotter
 	name string
-	db   transactor
+	db   *DB
+	// l is held for reading by mutating operations and for writing by
+	// garbageCollect, so GC excludes all other snapshotter operations.
+	l    sync.RWMutex
 }
 
 // newSnapshotter returns a new Snapshotter which namespaces the given snapshot
 // using the provided name and database.
-func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
+func newSnapshotter(db *DB, name string, sn snapshot.Snapshotter) *snapshotter {
 	return &snapshotter{
 		Snapshotter: sn,
 		name:        name,
@@ -125,6 +128,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro
 }
 
 func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) {
+	s.l.RLock()
+	defer s.l.RUnlock()
+
 	ns, err := namespaces.NamespaceRequired(ctx)
 	if err != nil {
 		return snapshot.Info{}, err
@@ -249,6 +255,9 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
 }
 
 func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshot.Opt) ([]mount.Mount, error) {
+	s.l.RLock()
+	defer s.l.RUnlock()
+
 	ns, err := namespaces.NamespaceRequired(ctx)
 	if err != nil {
 		return nil, err
@@ -332,6 +341,9 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
 }
 
 func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error {
+	s.l.RLock()
+	defer s.l.RUnlock()
+
 	ns, err :=
namespaces.NamespaceRequired(ctx)
 	if err != nil {
 		return err
@@ -421,6 +433,9 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
 }
 
 func (s *snapshotter) Remove(ctx context.Context, key string) error {
+	s.l.RLock()
+	defer s.l.RUnlock()
+
 	ns, err := namespaces.NamespaceRequired(ctx)
 	if err != nil {
 		return err
@@ -457,7 +472,16 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
 		}
 	}
 
-	return bkt.DeleteBucket([]byte(key))
+	if err := bkt.DeleteBucket([]byte(key)); err != nil {
+		return err
+	}
+
+	// Mark snapshotter as dirty for triggering garbage collection
+	s.db.dirtyL.Lock()
+	s.db.dirtySS[s.name] = struct{}{}
+	s.db.dirtyL.Unlock()
+
+	return nil
 	})
 }
 
@@ -565,3 +589,134 @@ func validateSnapshot(info *snapshot.Info) error {
 
 	return nil
 }

// garbageCollect removes all snapshots in the backend snapshotter that are no
// longer referenced by a metadata record. It holds the write lock for the
// whole operation, excluding concurrent snapshotter mutations.
// NOTE(review): time.Now().Sub(lt1) could be time.Since(lt1), and Debugf with
// no format args could be Debug — consider cleaning up in a follow-up.
func (s *snapshotter) garbageCollect(ctx context.Context) error {
	logger := log.G(ctx).WithField("snapshotter", s.name)
	lt1 := time.Now()
	s.l.Lock()
	defer func() {
		s.l.Unlock()
		logger.WithField("t", time.Now().Sub(lt1)).Debugf("garbage collected")
	}()

	// collect the backend keys of every snapshot still recorded in metadata,
	// across all namespaces, for this snapshotter
	seen := map[string]struct{}{}
	if err := s.db.View(func(tx *bolt.Tx) error {
		v1bkt := tx.Bucket(bucketKeyVersion)
		if v1bkt == nil {
			return nil
		}

		// iterate through each namespace
		v1c := v1bkt.Cursor()

		for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
			if v != nil {
				continue
			}

			sbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectSnapshots)
			if sbkt == nil {
				continue
			}

			// Load specific snapshotter
			ssbkt := sbkt.Bucket([]byte(s.name))
			if ssbkt == nil {
				continue
			}

			if err := ssbkt.ForEach(func(sk, sv []byte) error {
				if sv == nil {
					bkey := ssbkt.Bucket(sk).Get(bucketKeyName)
					if len(bkey) > 0 {
						seen[string(bkey)] = struct{}{}
					}
				}
				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	}); err != nil {
		return err
	}

	roots, err := s.walkTree(ctx, seen)
	if err != nil {
		return err
	}

	// TODO: Unlock before prune (once nodes are fully unavailable)

	for _, node := range roots {
		if err := s.pruneBranch(ctx, node); err != nil {
			return err
		}
	}

	return nil
}

// treeNode is a node in the backend snapshot parent/child tree; remove marks
// snapshots absent from the metadata "seen" set.
type treeNode struct {
	info     snapshot.Info
	remove   bool
	children []*treeNode
}

// walkTree walks the backend snapshotter and builds the parent/child tree,
// returning the parentless roots. A node is marked for removal when its name
// is not in seen.
func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([]*treeNode, error) {
	roots := []*treeNode{}
	nodes := map[string]*treeNode{}

	if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
		_, isSeen := seen[info.Name]
		node, ok := nodes[info.Name]
		if !ok {
			node = &treeNode{}
			nodes[info.Name] = node
		}

		node.remove = !isSeen
		node.info = info

		if info.Parent == "" {
			roots = append(roots, node)
		} else {
			// the parent may not have been walked yet; create a placeholder
			// that is filled in when the walk reaches it
			parent, ok := nodes[info.Parent]
			if !ok {
				parent = &treeNode{}
				nodes[info.Parent] = parent
			}
			parent.children = append(parent.children, node)
		}

		return nil
	}); err != nil {
		return nil, err
	}

	return roots, nil
}

// pruneBranch removes node and its descendants depth-first (children before
// parents, as required for snapshot removal). A FailedPrecondition error from
// the backend is logged and tolerated; other errors abort the prune.
// NOTE(review): Warnf with no format args could be Warn.
func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error {
	for _, child := range node.children {
		if err := s.pruneBranch(ctx, child); err != nil {
			return err
		}
	}

	if node.remove {
		logger := log.G(ctx).WithField("snapshotter", s.name)
		if err := s.Snapshotter.Remove(ctx, node.info.Name); err != nil {
			if !errdefs.IsFailedPrecondition(err) {
				return err
			}
			logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed")
		} else {
			logger.WithField("key", node.info.Name).Debug("removed snapshot")
		}
	}

	return nil
}