Metadata garbage collection

Marks and sweeps unreferenced objects.
Add snapshot cleanup to metadata.
Add content garbage collection

Add dirty flags for snapshotters and content store which
are set on deletion and used during the next garbage collection.
Cleanup content store backend when content metadata is removed.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-10-02 17:44:35 -07:00
parent 7884707c2f
commit 17471d5592
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
8 changed files with 1781 additions and 30 deletions

128
gc/gc.go
View File

@ -5,8 +5,25 @@
// under certain use cases.
package gc
import (
"context"
"sync"
)
// ResourceType represents type of resource at a node
type ResourceType uint8
// Node represents a resource which has a type and key,
// this node can be used to lookup other nodes.
type Node struct {
	Type      ResourceType // which kind of resource (content, snapshot, ...)
	Namespace string       // namespace the resource belongs to
	Key       string       // resource-type-specific identifier
}
// Tricolor implements basic, single-thread tri-color GC. Given the roots, the
// complete set and a refs function, this returns the unreachable objects.
// complete set and a refs function, this function returns a map of all
// reachable objects.
//
// Correct usage requires that the caller not allow the arguments to change
// until the result is used to delete objects in the system.
@ -15,11 +32,11 @@ package gc
//
// We can probably use this to inform a design for incremental GC by injecting
// callbacks to the set modification algorithms.
func Tricolor(roots []string, all []string, refs func(ref string) []string) []string {
func Tricolor(roots []Node, refs func(ref Node) ([]Node, error)) (map[Node]struct{}, error) {
var (
grays []string // maintain a gray "stack"
seen = map[string]struct{}{} // or not "white", basically "seen"
reachable = map[string]struct{}{} // or "block", in tri-color parlance
grays []Node // maintain a gray "stack"
seen = map[Node]struct{}{} // or not "white", basically "seen"
reachable = map[Node]struct{}{} // or "block", in tri-color parlance
)
grays = append(grays, roots...)
@ -29,9 +46,13 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st
id := grays[len(grays)-1] // effectively "depth first" because first element
grays = grays[:len(grays)-1]
seen[id] = struct{}{} // post-mark this as not-white
rs, err := refs(id)
if err != nil {
return nil, err
}
// mark all the referenced objects as gray
for _, target := range refs(id) {
for _, target := range rs {
if _, ok := seen[target]; !ok {
grays = append(grays, target)
}
@ -41,14 +62,99 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st
reachable[id] = struct{}{}
}
return reachable, nil
}
// ConcurrentMark implements simple, concurrent GC. All the roots are scanned
// and the complete set of references is formed by calling the refs function
// for each seen object. This function returns a map of all object reachable
// from a root.
//
// Correct usage requires that the caller not allow the arguments to change
// until the result is used to delete objects in the system.
//
// It will allocate memory proportional to the size of the reachable set.
// ConcurrentMark implements simple, concurrent GC. All the roots are scanned
// and the complete set of references is formed by calling the refs function
// for each seen object. This function returns a map of all objects reachable
// from a root.
//
// Correct usage requires that the caller not allow the arguments to change
// until the result is used to delete objects in the system.
//
// It will allocate memory proportional to the size of the reachable set.
func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Context, Node, func(Node)) error) (map[Node]struct{}, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		grays   = make(chan Node)
		seen    = map[Node]struct{}{} // or not "white", basically "seen"
		wg      sync.WaitGroup
		errOnce sync.Once
		refErr  error
	)

	// Single consumer goroutine: it alone reads and writes seen, and spawns
	// one worker per newly seen node to expand that node's references.
	go func() {
		for gray := range grays {
			if _, ok := seen[gray]; ok {
				wg.Done()
				continue
			}
			seen[gray] = struct{}{} // post-mark this as non-white

			go func(gray Node) {
				defer wg.Done()

				// send feeds a discovered reference back into the gray
				// channel; the WaitGroup counts every in-flight node so
				// the caller can wait for the full expansion to finish.
				send := func(n Node) {
					wg.Add(1)
					select {
					case grays <- n:
					case <-ctx.Done():
						wg.Done()
					}
				}

				// Only the first refs error is recorded; it also cancels
				// the context so remaining workers unwind.
				if err := refs(ctx, gray, send); err != nil {
					errOnce.Do(func() {
						refErr = err
						cancel()
					})
				}
			}(gray)
		}
	}()

	// Feed the roots into the gray set; each queued node holds a wg count
	// until it has been processed (or dropped on cancellation).
	for r := range root {
		wg.Add(1)
		select {
		case grays <- r:
		case <-ctx.Done():
			wg.Done()
		}
	}

	// Wait for outstanding grays to be processed
	wg.Wait()

	close(grays)

	if refErr != nil {
		return nil, refErr
	}
	if cErr := ctx.Err(); cErr != nil {
		return nil, cErr
	}

	return seen, nil
}
// Sweep removes all nodes returned through the channel which are not in
// the reachable set by calling the provided remove function.
func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error {
// All black objects are now reachable, and all white objects are
// unreachable. Free those that are white!
var whites []string
for _, obj := range all {
if _, ok := reachable[obj]; !ok {
whites = append(whites, obj)
for node := range all {
if _, ok := reachable[node]; !ok {
if err := remove(node); err != nil {
return err
}
}
}
return whites
return nil
}

View File

@ -1,30 +1,154 @@
package gc
import (
"context"
"reflect"
"testing"
)
func TestTricolorBasic(t *testing.T) {
roots := []string{"A", "C"}
all := []string{"A", "B", "C", "D", "E", "F", "G"}
all := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
refs := map[string][]string{
"A": {"B"},
"B": {"A"},
"C": {"D", "F", "B"},
"E": {"F", "G"},
"F": {"H"},
}
unreachable := Tricolor(roots, all, lookup(refs))
expected := []string{"E", "G"}
expected := toNodes([]string{"A", "B", "C", "D", "F", "H"})
if !reflect.DeepEqual(unreachable, expected) {
t.Fatalf("incorrect unreachable set: %v != %v", unreachable, expected)
reachable, err := Tricolor(toNodes(roots), lookup(refs))
if err != nil {
t.Fatal(err)
}
var sweeped []Node
for _, a := range toNodes(all) {
if _, ok := reachable[a]; ok {
sweeped = append(sweeped, a)
}
}
if !reflect.DeepEqual(sweeped, expected) {
t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected)
}
}
func lookup(refs map[string][]string) func(id string) []string {
return func(ref string) []string {
return refs[ref]
// TestConcurrentBasic checks that ConcurrentMark reaches exactly the nodes
// transitively referenced from the roots A and C; E, G, and I are not
// reachable from either root (I is referenced only by the unreachable G).
func TestConcurrentBasic(t *testing.T) {
	roots := []string{"A", "C"}
	all := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
	refs := map[string][]string{
		"A": {"B"},
		"B": {"A"},
		"C": {"D", "F", "B"},
		"E": {"F", "G"},
		"F": {"H"},
		"G": {"I"},
	}
	expected := toNodes([]string{"A", "B", "C", "D", "F", "H"})

	ctx := context.Background()
	rootC := make(chan Node)
	go func() {
		writeNodes(ctx, rootC, toNodes(roots))
		close(rootC)
	}()

	reachable, err := ConcurrentMark(ctx, rootC, lookupc(refs))
	if err != nil {
		t.Fatal(err)
	}

	// Filter `all` by membership in the reachable set so the comparison
	// below has a deterministic order.
	var sweeped []Node
	for _, a := range toNodes(all) {
		if _, ok := reachable[a]; ok {
			sweeped = append(sweeped, a)
		}
	}
	if !reflect.DeepEqual(sweeped, expected) {
		t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected)
	}
}
// writeNodes feeds nodes into nc one at a time, returning early if ctx is
// cancelled before all nodes have been sent.
func writeNodes(ctx context.Context, nc chan<- Node, nodes []Node) {
	for i := range nodes {
		select {
		case <-ctx.Done():
			return
		case nc <- nodes[i]:
		}
	}
}
// lookup adapts a plain string reference map to the Tricolor refs callback
// signature; unknown keys yield an empty (nil) reference list.
func lookup(refs map[string][]string) func(id Node) ([]Node, error) {
	return func(ref Node) ([]Node, error) {
		return toNodes(refs[ref.Key]), nil
	}
}
// lookupc adapts a plain string reference map to the ConcurrentMark refs
// callback, emitting each referenced node through fn.
func lookupc(refs map[string][]string) func(context.Context, Node, func(Node)) error {
	return func(ctx context.Context, ref Node, fn func(Node)) error {
		for _, n := range toNodes(refs[ref.Key]) {
			fn(n)
		}
		return nil
	}
}
// toNodes wraps each string in s as a Node with that string as its key.
func toNodes(s []string) []Node {
	nodes := make([]Node, 0, len(s))
	for _, key := range s {
		nodes = append(nodes, Node{Key: key})
	}
	return nodes
}
// newScanner wraps refs in a stringScanner positioned before the first
// element.
func newScanner(refs []string) *stringScanner {
	return &stringScanner{
		i: -1,
		s: refs,
	}
}

// stringScanner iterates over a string slice, exposing elements as Nodes.
// NOTE(review): nothing visible in this chunk uses these helpers — confirm
// they are exercised elsewhere or remove them.
type stringScanner struct {
	i int      // index of the current element; -1 before the first Next
	s []string // underlying values; "" marks an element removed by Cleanup
}

// Next advances the scanner and reports whether another element exists.
func (ss *stringScanner) Next() bool {
	ss.i++
	return ss.i < len(ss.s)
}

// Node returns the current element as a Node keyed by the string value.
func (ss *stringScanner) Node() Node {
	return Node{
		Key: ss.s[ss.i],
	}
}

// Cleanup blanks the current element, marking it removed.
func (ss *stringScanner) Cleanup() error {
	ss.s[ss.i] = ""
	return nil
}

// Err always reports success.
func (ss *stringScanner) Err() error {
	return nil
}

// All returns every element not yet blanked by Cleanup, as Nodes.
func (ss *stringScanner) All() []Node {
	remaining := make([]Node, 0, len(ss.s))
	for _, s := range ss.s {
		if s != "" {
			remaining = append(remaining, Node{
				Key: s,
			})
		}
	}
	return remaining
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/binary"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
@ -11,6 +12,7 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
@ -19,12 +21,13 @@ import (
type contentStore struct {
content.Store
db transactor
db *DB
l sync.RWMutex
}
// newContentStore returns a namespaced content store using an existing
// content store interface.
func newContentStore(db transactor, cs content.Store) content.Store {
func newContentStore(db *DB, cs content.Store) *contentStore {
return &contentStore{
Store: cs,
db: db,
@ -59,6 +62,9 @@ func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpath
return content.Info{}, err
}
cs.l.RLock()
defer cs.l.RUnlock()
updated := content.Info{
Digest: info.Digest,
}
@ -166,15 +172,25 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
// Just remove local reference, garbage collector is responsible for
// cleaning up on disk content
return getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String()))
if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil {
return err
}
// Mark content store as dirty for triggering garbage collection
cs.db.dirtyL.Lock()
cs.db.dirtyCS = true
cs.db.dirtyL.Unlock()
return nil
})
}
@ -269,6 +285,9 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
@ -293,6 +312,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
return nil, err
}
cs.l.RLock()
defer cs.l.RUnlock()
var w content.Writer
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
if expected != "" {
@ -346,6 +368,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
ref: ref,
namespace: ns,
db: cs.db,
l: &cs.l,
}, nil
}
@ -354,9 +377,13 @@ type namespacedWriter struct {
ref string
namespace string
db transactor
l *sync.RWMutex
}
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
nw.l.RLock()
defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, nw.namespace)
if bkt != nil {
@ -495,3 +522,61 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
return bkt.Put(bucketKeySize, sizeEncoded)
}
// garbageCollect removes from the backing content store every blob that is
// no longer referenced by the metadata store in any namespace. The
// store-wide write lock is held for the duration so no new references can
// appear between the scan and the deletions.
func (cs *contentStore) garbageCollect(ctx context.Context) error {
	start := time.Now()
	cs.l.Lock()
	defer func() {
		cs.l.Unlock()
		log.G(ctx).WithField("t", time.Since(start)).Debugf("content garbage collected")
	}()

	// Gather the set of digests still referenced by any namespace.
	seen := map[string]struct{}{}
	if err := cs.db.View(func(tx *bolt.Tx) error {
		v1bkt := tx.Bucket(bucketKeyVersion)
		if v1bkt == nil {
			return nil
		}

		// iterate through each namespace
		v1c := v1bkt.Cursor()

		for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
			if v != nil {
				continue
			}
			cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
			if cbkt == nil {
				continue
			}
			bbkt := cbkt.Bucket(bucketKeyObjectBlob)
			if bbkt == nil {
				// Namespace has a content bucket but no blobs yet;
				// calling ForEach on a nil bucket would panic.
				continue
			}
			if err := bbkt.ForEach(func(ck, cv []byte) error {
				if cv == nil {
					seen[string(ck)] = struct{}{}
				}
				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	}); err != nil {
		return err
	}

	// Walk the backing store and delete anything not referenced above.
	return cs.Store.Walk(ctx, func(info content.Info) error {
		if _, ok := seen[info.Digest.String()]; !ok {
			if err := cs.Store.Delete(ctx, info.Digest); err != nil {
				return err
			}
			log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
		}
		return nil
	})
}

View File

@ -3,10 +3,13 @@ package metadata
import (
"context"
"encoding/binary"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/snapshot"
"github.com/pkg/errors"
@ -36,15 +39,32 @@ type DB struct {
db *bolt.DB
ss map[string]snapshot.Snapshotter
cs content.Store
// wlock is used to protect access to the data structures during garbage
// collection. While the wlock is held no writable transactions can be
// opened, preventing changes from occurring between the mark and
// sweep phases without preventing read transactions.
wlock sync.RWMutex
// dirty flags and lock keep track of datastores which have had deletions
// since the last garbage collection. These datastores will be garbage
// collected during the next garbage collection.
dirtyL sync.Mutex
dirtySS map[string]struct{}
dirtyCS bool
// TODO: Keep track of stats such as pause time, number of collected objects, errors
lastCollection time.Time
}
// NewDB creates a new metadata database using the provided
// bolt database, content store, and snapshotters.
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
return &DB{
db: db,
ss: ss,
cs: cs,
db: db,
ss: ss,
cs: cs,
dirtySS: map[string]struct{}{},
}
}
@ -158,5 +178,134 @@ func (m *DB) View(fn func(*bolt.Tx) error) error {
// Update runs a writable transation on the metadata store.
func (m *DB) Update(fn func(*bolt.Tx) error) error {
m.wlock.RLock()
defer m.wlock.RUnlock()
return m.db.Update(fn)
}
// GarbageCollect removes metadata objects which are no longer reachable
// from any GC root. The writer lock is held for the entire mark and sweep,
// so no writable transactions can interleave between the two phases.
// Backends (snapshotters, content store) which saw deletions are scheduled
// for asynchronous cleanup afterwards.
func (m *DB) GarbageCollect(ctx context.Context) error {
	start := time.Now()
	m.wlock.Lock()
	defer func() {
		m.wlock.Unlock()
		log.G(ctx).WithField("d", time.Since(start)).Debug("metadata garbage collected")
	}()

	var marked map[gc.Node]struct{}

	// Mark phase: compute the reachable set inside a read transaction.
	if err := m.db.View(func(tx *bolt.Tx) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		roots := make(chan gc.Node)
		// Buffered so the root scanner can record its error and run its
		// deferred close(roots) even while this goroutine is still blocked
		// inside ConcurrentMark; an unbuffered send here would deadlock.
		errChan := make(chan error, 1)
		go func() {
			defer close(errChan)
			defer close(roots)

			// Call roots
			if err := scanRoots(ctx, tx, roots); err != nil {
				cancel()
				errChan <- err
			}
		}()

		refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error {
			return references(ctx, tx, n, fn)
		}

		reachable, err := gc.ConcurrentMark(ctx, roots, refs)
		if rerr := <-errChan; rerr != nil {
			return rerr
		}
		if err != nil {
			return err
		}
		marked = reachable
		return nil
	}); err != nil {
		return err
	}

	m.dirtyL.Lock()
	defer m.dirtyL.Unlock()

	// Sweep phase: delete everything not marked, recording which
	// snapshotters and whether the content store saw deletions.
	if err := m.db.Update(func(tx *bolt.Tx) error {
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		nodeC := make(chan gc.Node)
		var scanErr error
		go func() {
			defer close(nodeC)
			scanErr = scanAll(ctx, tx, nodeC)
		}()

		rm := func(n gc.Node) error {
			if n.Type == ResourceSnapshot {
				// Snapshot keys are "<snapshotter>/<name>"; flag the
				// snapshotter for backend cleanup.
				if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
					m.dirtySS[n.Key[:idx]] = struct{}{}
				}
			} else if n.Type == ResourceContent {
				m.dirtyCS = true
			}
			return remove(ctx, tx, n)
		}

		if err := gc.Sweep(marked, nodeC, rm); err != nil {
			return errors.Wrap(err, "failed to sweep")
		}

		if scanErr != nil {
			return errors.Wrap(scanErr, "failed to scan all")
		}

		return nil
	}); err != nil {
		return err
	}

	m.lastCollection = time.Now()

	// Schedule asynchronous cleanup of backends which had deletions.
	if len(m.dirtySS) > 0 {
		for snapshotterName := range m.dirtySS {
			log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup")
			go m.cleanupSnapshotter(snapshotterName)
		}
		m.dirtySS = map[string]struct{}{}
	}

	if m.dirtyCS {
		log.G(ctx).Debug("scheduling content cleanup")
		go m.cleanupContent()
		m.dirtyCS = false
	}

	return nil
}
// cleanupSnapshotter runs a garbage collection pass over the backend of the
// named snapshotter; failures are logged rather than returned.
func (m *DB) cleanupSnapshotter(name string) {
	ctx := context.Background()

	sn, ok := m.ss[name]
	if !ok {
		return
	}

	if err := newSnapshotter(m, name, sn).garbageCollect(ctx); err != nil {
		log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed")
	}
}
// cleanupContent runs a garbage collection pass over the backing content
// store, if one is configured; failures are logged rather than returned.
func (m *DB) cleanupContent() {
	ctx := context.Background()

	if m.cs == nil {
		return
	}

	if err := newContentStore(m, m.cs).garbageCollect(ctx); err != nil {
		log.G(ctx).WithError(err).Warn("content garbage collection failed")
	}
}

View File

@ -1,11 +1,31 @@
package metadata
import (
"context"
"encoding/binary"
"fmt"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"runtime/pprof"
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/content/local"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/containerd/snapshot/naive"
"github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)
@ -176,3 +196,366 @@ func readDBVersion(db *bolt.DB, schema []byte) (int, error) {
}
return version, nil
}
// TestMetadataCollector creates a mix of referenced and unreferenced
// objects, runs a garbage collection, and verifies that exactly the
// expected survivors remain in the metadata store.
func TestMetadataCollector(t *testing.T) {
	mdb, cs, sn, cleanup := newStores(t)
	defer cleanup()

	var (
		ctx = context.Background()

		// Objects created with removed=true are expected to be collected;
		// create() only records the others in `remaining`.
		objects = []object{
			blob(bytesFor(1), true),
			blob(bytesFor(2), false),
			blob(bytesFor(3), true),
			blob(bytesFor(4), false, "containerd.io/gc.root", time.Now().String()),
			newSnapshot("1", "", false, false),
			newSnapshot("2", "1", false, false),
			newSnapshot("3", "2", false, false),
			newSnapshot("4", "3", false, false),
			newSnapshot("5", "3", false, true),
			container("1", "4"),
			image("image-1", digestFor(2)),
		}

		remaining []gc.Node
	)

	if err := mdb.Update(func(tx *bolt.Tx) error {
		for _, obj := range objects {
			node, err := create(obj, tx, cs, sn)
			if err != nil {
				return err
			}
			if node != nil {
				remaining = append(remaining, *node)
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Creation failed: %+v", err)
	}

	if err := mdb.GarbageCollect(ctx); err != nil {
		t.Fatal(err)
	}

	// Re-scan the database and compare against the expected survivors.
	var actual []gc.Node

	if err := mdb.View(func(tx *bolt.Tx) error {
		nodeC := make(chan gc.Node)
		var scanErr error
		go func() {
			defer close(nodeC)
			scanErr = scanAll(ctx, tx, nodeC)
		}()
		for node := range nodeC {
			actual = append(actual, node)
		}
		return scanErr
	}); err != nil {
		t.Fatal(err)
	}

	checkNodesEqual(t, actual, remaining)
}
// BenchmarkGarbageCollect measures a full collection over databases of
// increasing size, where each "set" is a blob, an image, a snapshot chain,
// and a container (see benchmarkTrigger).
func BenchmarkGarbageCollect(b *testing.B) {
	b.Run("10-Sets", benchmarkTrigger(10))
	b.Run("100-Sets", benchmarkTrigger(100))
	b.Run("1000-Sets", benchmarkTrigger(1000))
	b.Run("10000-Sets", benchmarkTrigger(10000))
}
// benchmarkTrigger returns a benchmark that creates n object sets — each a
// blob, an image referencing it, a 7-deep snapshot chain, and a container
// pinned to the last snapshot — then repeatedly runs GarbageCollect.
func benchmarkTrigger(n int) func(b *testing.B) {
	return func(b *testing.B) {
		mdb, cs, sn, cleanup := newStores(b)
		defer cleanup()

		objects := []object{}

		// TODO: Allow max to be configurable
		for i := 0; i < n; i++ {
			objects = append(objects,
				blob(bytesFor(int64(i)), false),
				image(fmt.Sprintf("image-%d", i), digestFor(int64(i))),
			)
			lastSnapshot := 6
			for j := 0; j <= lastSnapshot; j++ {
				var parent string
				key := fmt.Sprintf("snapshot-%d-%d", i, j)
				if j > 0 {
					parent = fmt.Sprintf("snapshot-%d-%d", i, j-1)
				}
				objects = append(objects, newSnapshot(key, parent, false, false))
			}
			objects = append(objects, container(fmt.Sprintf("container-%d", i), fmt.Sprintf("snapshot-%d-%d", i, lastSnapshot)))
		}

		// TODO: Create set of objects for removal

		var (
			ctx = context.Background()

			remaining []gc.Node
		)

		// Populate the database in a single transaction before timing.
		if err := mdb.Update(func(tx *bolt.Tx) error {
			for _, obj := range objects {
				node, err := create(obj, tx, cs, sn)
				if err != nil {
					return err
				}
				if node != nil {
					remaining = append(remaining, *node)
				}
			}
			return nil
		}); err != nil {
			b.Fatalf("Creation failed: %+v", err)
		}

		// TODO: reset benchmark
		b.ResetTimer()
		//b.StopTimer()

		// Label the GC work for profiler attribution.
		labels := pprof.Labels("worker", "trigger")
		pprof.Do(ctx, labels, func(ctx context.Context) {
			for i := 0; i < b.N; i++ {
				// TODO: Add removal objects
				//b.StartTimer()

				if err := mdb.GarbageCollect(ctx); err != nil {
					b.Fatal(err)
				}

				//b.StopTimer()

				//var actual []gc.Node

				//if err := db.View(func(tx *bolt.Tx) error {
				//	nodeC := make(chan gc.Node)
				//	var scanErr error
				//	go func() {
				//		defer close(nodeC)
				//		scanErr = scanAll(ctx, tx, nodeC)
				//	}()
				//	for node := range nodeC {
				//		actual = append(actual, node)
				//	}
				//	return scanErr
				//}); err != nil {
				//	t.Fatal(err)
				//}

				//checkNodesEqual(t, actual, remaining)
			}
		})
	}
}
// bytesFor returns 256 bytes of deterministic pseudo-random data derived
// from seed i; repeated calls with the same seed yield identical content.
func bytesFor(i int64) []byte {
	out := make([]byte, 256)
	if _, err := rand.New(rand.NewSource(i)).Read(out); err != nil {
		panic(err)
	}
	return out
}
// digestFor returns the SHA256 digest of 256 bytes of deterministic
// pseudo-random data seeded with i. Presumably this matches the stream
// produced by bytesFor(i) — TODO confirm the two stay in sync if either
// changes.
func digestFor(i int64) digest.Digest {
	r := rand.New(rand.NewSource(i))
	dgstr := digest.SHA256.Digester()
	_, err := io.Copy(dgstr.Hash(), io.LimitReader(r, 256))
	if err != nil {
		panic(err)
	}
	return dgstr.Digest()
}
// object describes a test fixture to create: the resource payload
// (testContent, testSnapshot, testImage, or testContainer), whether GC is
// expected to remove it, and labels to attach on creation.
type object struct {
	data    interface{}       // one of the test* payload types
	removed bool              // true when GC should collect this object
	labels  map[string]string // labels applied at creation time
}
// create writes obj into the appropriate store within transaction tx and
// returns the gc.Node expected to survive collection. It returns nil for
// objects expected to be removed and for images/containers, which are GC
// roots rather than collectable nodes here.
func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshot.Snapshotter) (*gc.Node, error) {
	var (
		node      *gc.Node
		namespace = "test"
		ctx       = namespaces.WithNamespace(context.Background(), namespace)
	)

	switch v := obj.data.(type) {
	case testContent:
		// Bind the metadata writes to the caller's transaction.
		ctx := WithTransactionContext(ctx, tx)
		expected := digest.FromBytes(v.data)
		w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create writer")
		}
		if _, err := w.Write(v.data); err != nil {
			return nil, errors.Wrap(err, "write blob failed")
		}
		if err := w.Commit(ctx, int64(len(v.data)), expected, content.WithLabels(obj.labels)); err != nil {
			return nil, errors.Wrap(err, "failed to commit blob")
		}
		if !obj.removed {
			node = &gc.Node{
				Type:      ResourceContent,
				Namespace: namespace,
				Key:       expected.String(),
			}
		}
	case testSnapshot:
		ctx := WithTransactionContext(ctx, tx)
		if v.active {
			_, err := sn.Prepare(ctx, v.key, v.parent, snapshot.WithLabels(obj.labels))
			if err != nil {
				return nil, err
			}
		} else {
			// Committed snapshots are created via a throwaway active key.
			akey := fmt.Sprintf("%s-active", v.key)
			_, err := sn.Prepare(ctx, akey, v.parent)
			if err != nil {
				return nil, err
			}
			if err := sn.Commit(ctx, v.key, akey, snapshot.WithLabels(obj.labels)); err != nil {
				return nil, err
			}
		}
		if !obj.removed {
			node = &gc.Node{
				Type:      ResourceSnapshot,
				Namespace: namespace,
				Key:       fmt.Sprintf("naive/%s", v.key),
			}
		}
	case testImage:
		image := images.Image{
			Name:   v.name,
			Target: v.target,
			Labels: obj.labels,
		}
		_, err := NewImageStore(tx).Create(ctx, image)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create image")
		}
	case testContainer:
		container := containers.Container{
			ID:          v.id,
			SnapshotKey: v.snapshot,
			Snapshotter: "naive",
			Labels:      obj.labels,

			Runtime: containers.RuntimeInfo{
				Name: "testruntime",
			},
			Spec: &types.Any{},
		}
		_, err := NewContainerStore(tx).Create(ctx, container)
		if err != nil {
			return nil, err
		}
	}

	return node, nil
}
// blob builds a content fixture; r marks it as expected-removed by GC.
func blob(b []byte, r bool, l ...string) object {
	return object{
		data: testContent{
			data: b,
		},
		removed: r,
		labels:  labelmap(l...),
	}
}

// image builds an image fixture targeting digest d; images are never
// marked expected-removed.
func image(n string, d digest.Digest, l ...string) object {
	return object{
		data: testImage{
			name: n,
			target: ocispec.Descriptor{
				MediaType: "irrelevant",
				Digest:    d,
				Size:      256,
			},
		},
		removed: false,
		labels:  labelmap(l...),
	}
}

// newSnapshot builds a snapshot fixture with the given key and parent;
// r marks it as expected-removed by GC.
func newSnapshot(key, parent string, active, r bool, l ...string) object {
	return object{
		data: testSnapshot{
			key:    key,
			parent: parent,
			active: active,
		},
		removed: r,
		labels:  labelmap(l...),
	}
}

// container builds a container fixture pinned to snapshot s; containers
// are never marked expected-removed.
func container(id, s string, l ...string) object {
	return object{
		data: testContainer{
			id:       id,
			snapshot: s,
		},
		removed: false,
		labels:  labelmap(l...),
	}
}
// testContent is the payload for a content fixture.
type testContent struct {
	data []byte
}

// testSnapshot is the payload for a snapshot fixture.
type testSnapshot struct {
	key    string
	parent string
	active bool
}

// testImage is the payload for an image fixture.
type testImage struct {
	name   string
	target ocispec.Descriptor
}

// testContainer is the payload for a container fixture.
type testContainer struct {
	id       string
	snapshot string
}
// newStores creates a temporary bolt-backed metadata DB wired to a naive
// snapshotter and a local content store. It returns the DB, the namespaced
// content store and snapshotter views, and a cleanup function that releases
// the database and removes the temporary directory.
func newStores(t testing.TB) (*DB, content.Store, snapshot.Snapshotter, func()) {
	td, err := ioutil.TempDir("", "gc-test-")
	if err != nil {
		t.Fatal(err)
	}
	db, err := bolt.Open(filepath.Join(td, "meta.db"), 0644, nil)
	if err != nil {
		t.Fatal(err)
	}

	nsn, err := naive.NewSnapshotter(filepath.Join(td, "snapshots"))
	if err != nil {
		t.Fatal(err)
	}

	lcs, err := local.NewStore(filepath.Join(td, "content"))
	if err != nil {
		t.Fatal(err)
	}

	mdb := NewDB(db, lcs, map[string]snapshot.Snapshotter{"naive": nsn})

	return mdb, mdb.ContentStore(), mdb.Snapshotter("naive"), func() {
		// Close the bolt database (releasing its file lock) before
		// deleting the backing files.
		db.Close()
		os.RemoveAll(td)
	}
}

343
metadata/gc.go Normal file
View File

@ -0,0 +1,343 @@
package metadata
import (
"context"
"fmt"
"strings"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/log"
"github.com/pkg/errors"
)
const (
ResourceUnknown gc.ResourceType = iota
ResourceContent
ResourceSnapshot
ResourceContainer
ResourceTask
)
var (
labelGCRoot = []byte("containerd.io/gc.root")
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
labelGCContentRef = []byte("containerd.io/gc.ref.content")
)
// scanRoots sends the GC root nodes for every namespace to nc: image
// targets, containers' snapshots, and any content blob or snapshot
// carrying the GC root label.
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
	v1bkt := tx.Bucket(bucketKeyVersion)
	if v1bkt == nil {
		return nil
	}

	// iterate through each namespace
	v1c := v1bkt.Cursor()

	for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
		if v != nil {
			continue
		}
		nbkt := v1bkt.Bucket(k)
		ns := string(k)

		// Image targets (and their label-declared snapshot refs) are roots.
		ibkt := nbkt.Bucket(bucketKeyObjectImages)
		if ibkt != nil {
			if err := ibkt.ForEach(func(k, v []byte) error {
				if v != nil {
					return nil
				}

				target := ibkt.Bucket(k).Bucket(bucketKeyTarget)
				if target != nil {
					contentKey := string(target.Get(bucketKeyDigest))
					select {
					case nc <- gcnode(ResourceContent, ns, contentKey):
					case <-ctx.Done():
						return ctx.Err()
					}
				}

				// NOTE(review): on cancellation this callback silently
				// drops the node (fn has no error return) — confirm the
				// caller observes ctx cancellation through another path.
				return sendSnapshotRefs(ns, ibkt.Bucket(k), func(n gc.Node) {
					select {
					case nc <- n:
					case <-ctx.Done():
					}
				})
			}); err != nil {
				return err
			}
		}

		// Content blobs explicitly labeled as GC roots.
		cbkt := nbkt.Bucket(bucketKeyObjectContent)
		if cbkt != nil {
			cbkt = cbkt.Bucket(bucketKeyObjectBlob)
		}
		if cbkt != nil {
			if err := cbkt.ForEach(func(k, v []byte) error {
				if v != nil {
					return nil
				}
				return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k))
			}); err != nil {
				return err
			}
		}

		// Containers root their snapshot (and labeled snapshot refs).
		cbkt = nbkt.Bucket(bucketKeyObjectContainers)
		if cbkt != nil {
			if err := cbkt.ForEach(func(k, v []byte) error {
				if v != nil {
					return nil
				}

				snapshotter := string(cbkt.Bucket(k).Get(bucketKeySnapshotter))
				if snapshotter != "" {
					ss := string(cbkt.Bucket(k).Get(bucketKeySnapshotKey))
					select {
					case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)):
					case <-ctx.Done():
						return ctx.Err()
					}
				}

				// TODO: Send additional snapshot refs through labels
				return sendSnapshotRefs(ns, cbkt.Bucket(k), func(n gc.Node) {
					select {
					case nc <- n:
					case <-ctx.Done():
					}
				})
			}); err != nil {
				return err
			}
		}

		// Snapshots explicitly labeled as GC roots.
		sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
		if sbkt != nil {
			if err := sbkt.ForEach(func(sk, sv []byte) error {
				if sv != nil {
					return nil
				}
				snbkt := sbkt.Bucket(sk)

				return snbkt.ForEach(func(k, v []byte) error {
					if v != nil {
						return nil
					}
					return sendRootRef(ctx, nc, gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)), snbkt.Bucket(k))
				})
			}); err != nil {
				return err
			}
		}
	}
	return nil
}
// references invokes fn for each node directly referenced by node: content
// may reference snapshots and other content through its labels; snapshots
// reference their parent plus any label-declared snapshot refs. Nodes whose
// backing bucket no longer exists (dead edges) yield no references.
func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
	if node.Type == ResourceContent {
		bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key))
		if bkt == nil {
			// Node may be created from dead edge
			return nil
		}

		if err := sendSnapshotRefs(node.Namespace, bkt, fn); err != nil {
			return err
		}
		return sendContentRefs(node.Namespace, bkt, fn)
	} else if node.Type == ResourceSnapshot {
		// Snapshot keys are "<snapshotter>/<name>".
		parts := strings.SplitN(node.Key, "/", 2)
		if len(parts) != 2 {
			return errors.Errorf("invalid snapshot gc key %s", node.Key)
		}
		ss := parts[0]
		name := parts[1]

		bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots, []byte(ss), []byte(name))
		if bkt == nil {
			// Node may be created from dead edge.
			// (A leftover debug iteration over the snapshots bucket was
			// removed here: it had no effect and dereferenced a possibly
			// nil bucket.)
			return nil
		}

		if pv := bkt.Get(bucketKeyParent); len(pv) > 0 {
			fn(gcnode(ResourceSnapshot, node.Namespace, fmt.Sprintf("%s/%s", ss, pv)))
		}

		return sendSnapshotRefs(node.Namespace, bkt, fn)
	}

	return nil
}
// scanAll sends every snapshot and content node in every namespace to nc.
// The sweep phase uses it to enumerate deletion candidates; anything sent
// here that is not in the marked set gets removed.
func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
	v1bkt := tx.Bucket(bucketKeyVersion)
	if v1bkt == nil {
		return nil
	}

	// iterate through each namespace
	v1c := v1bkt.Cursor()

	for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
		if v != nil {
			continue
		}
		nbkt := v1bkt.Bucket(k)
		ns := string(k)

		// All snapshots, keyed "<snapshotter>/<name>".
		sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
		if sbkt != nil {
			if err := sbkt.ForEach(func(sk, sv []byte) error {
				if sv != nil {
					return nil
				}
				snbkt := sbkt.Bucket(sk)
				return snbkt.ForEach(func(k, v []byte) error {
					if v != nil {
						return nil
					}
					select {
					case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
					case <-ctx.Done():
						return ctx.Err()
					}
					return nil
				})
			}); err != nil {
				return err
			}
		}

		// All content blobs, keyed by digest.
		cbkt := nbkt.Bucket(bucketKeyObjectContent)
		if cbkt != nil {
			cbkt = cbkt.Bucket(bucketKeyObjectBlob)
		}
		if cbkt != nil {
			if err := cbkt.ForEach(func(k, v []byte) error {
				if v != nil {
					return nil
				}
				select {
				case nc <- gcnode(ResourceContent, ns, string(k)):
				case <-ctx.Done():
					return ctx.Err()
				}
				return nil
			}); err != nil {
				return err
			}
		}
	}
	return nil
}
// remove deletes the metadata bucket for node. Backend resources are not
// touched here; the caller flags the affected stores as dirty so backends
// are cleaned up separately. Missing buckets are treated as already gone.
func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
	v1bkt := tx.Bucket(bucketKeyVersion)
	if v1bkt == nil {
		return nil
	}

	nsbkt := v1bkt.Bucket([]byte(node.Namespace))
	if nsbkt == nil {
		return nil
	}

	switch node.Type {
	case ResourceContent:
		cbkt := nsbkt.Bucket(bucketKeyObjectContent)
		if cbkt != nil {
			cbkt = cbkt.Bucket(bucketKeyObjectBlob)
		}
		if cbkt != nil {
			log.G(ctx).WithField("key", node.Key).Debug("delete content")
			return cbkt.DeleteBucket([]byte(node.Key))
		}
	case ResourceSnapshot:
		sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots)
		if sbkt != nil {
			// Snapshot keys are "<snapshotter>/<name>".
			parts := strings.SplitN(node.Key, "/", 2)
			if len(parts) != 2 {
				return errors.Errorf("invalid snapshot gc key %s", node.Key)
			}
			ssbkt := sbkt.Bucket([]byte(parts[0]))
			if ssbkt != nil {
				log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("delete snapshot")
				return ssbkt.DeleteBucket([]byte(parts[1]))
			}
		}
	}

	return nil
}
// sendSnapshotRefs sends all snapshot references referred to by the labels in the bkt.
// Labels are stored sorted, so the cursor scans the contiguous run of keys
// beginning with the snapshot-ref label prefix; the suffix after the prefix
// names the snapshotter and the label value is the snapshot key.
func sendSnapshotRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt != nil {
		lc := lbkt.Cursor()

		for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() {
			snapshotter := string(k[len(labelGCSnapRef):])
			fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
		}
	}
	return nil
}
// sendContentRefs sends all content references referred to by the labels in the bkt.
// Both the exact content-ref label and named variants ("<label>.<name>")
// are accepted; other label keys that merely share the prefix are skipped.
func sendContentRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt != nil {
		lc := lbkt.Cursor()

		labelRef := string(labelGCContentRef)
		for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
			if ks := string(k); ks != labelRef {
				// Allow reference naming, ignore names
				// (index is safe: HasPrefix plus ks != labelRef implies
				// ks is longer than labelRef).
				if ks[len(labelRef)] != '.' {
					continue
				}
			}

			fn(gcnode(ResourceContent, ns, string(v)))
		}
	}
	return nil
}
// isRootRef reports whether bkt carries the GC root label.
func isRootRef(bkt *bolt.Bucket) bool {
	lbkt := bkt.Bucket(bucketKeyObjectLabels)
	if lbkt == nil {
		return false
	}
	// TODO: interpret the label value as a timestamp and skip if expired
	return lbkt.Get(labelGCRoot) != nil
}
// sendRootRef sends n to nc if bkt is labeled as a GC root; it returns the
// context error if cancelled while blocked on the send.
func sendRootRef(ctx context.Context, nc chan<- gc.Node, n gc.Node, bkt *bolt.Bucket) error {
	if isRootRef(bkt) {
		select {
		case nc <- n:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return nil
}
// gcnode constructs a gc.Node from a resource type, namespace, and key.
func gcnode(t gc.ResourceType, ns, key string) gc.Node {
	return gc.Node{
		Type:      t,
		Namespace: ns,
		Key:       key,
	}
}

406
metadata/gc_test.go Normal file
View File

@ -0,0 +1,406 @@
package metadata
import (
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"sort"
"testing"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/gc"
"github.com/containerd/containerd/metadata/boltutil"
digest "github.com/opencontainers/go-digest"
)
// TestGCRoots verifies that scanRoots reports exactly the GC roots:
// content referenced by image targets, snapshots referenced by image
// labels, and any object explicitly labeled as a root.
func TestGCRoots(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Fixture records across two namespaces; some carry root or
	// snapshot-reference labels.
	fixtures := []alterFunc{
		addImage("ns1", "image1", dgst(1), nil),
		addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "", nil),
		addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
	}

	expected := []gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()),
		gcnode(ResourceContent, "ns1", dgst(2).String()),
		gcnode(ResourceContent, "ns2", dgst(2).String()),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
	}

	populate := func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, fixture := range fixtures {
			if err := fixture(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}
	if err := db.Update(populate); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	checkNodes(context.Background(), t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanRoots(ctx, tx, nc)
	})
}
// TestGCRemove checks that remove deletes metadata records: after
// removing half of the nodes reported by scanAll, a second scan must
// report only the remaining half.
func TestGCRemove(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Fixture records: content and snapshots across two namespaces,
	// some labeled as GC roots or snapshot references.
	alters := []alterFunc{
		addImage("ns1", "image1", dgst(1), nil),
		addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "", nil),
		addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
		addSnapshot("ns2", "overlay", "sn1", "", nil),
	}

	// Every collectable node scanAll is expected to report (images are
	// metadata records, not nodes themselves).
	all := []gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()),
		gcnode(ResourceContent, "ns1", dgst(2).String()),
		gcnode(ResourceContent, "ns1", dgst(3).String()),
		gcnode(ResourceContent, "ns2", dgst(1).String()),
		gcnode(ResourceContent, "ns2", dgst(2).String()),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
		gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
	}

	// Split the set: delete the even-indexed nodes, keep the rest.
	var deleted, remaining []gc.Node
	for i, n := range all {
		if i%2 == 0 {
			deleted = append(deleted, n)
		} else {
			remaining = append(remaining, n)
		}
	}

	// Populate the database with the fixture records.
	if err := db.Update(func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, alter := range alters {
			if err := alter(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	ctx := context.Background()

	// Sanity check: scanAll sees the full set before removal.
	checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanAll(ctx, tx, nc)
	})
	if t.Failed() {
		t.Fatal("Scan all failed")
	}

	// Remove the selected nodes in a single write transaction.
	if err := db.Update(func(tx *bolt.Tx) error {
		for _, n := range deleted {
			if err := remove(ctx, tx, n); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	// Only the non-removed nodes should remain.
	checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
		return scanAll(ctx, tx, nc)
	})
}
// TestGCRefs verifies that references reports the outgoing edges of each
// node: content-to-content references via GC labels (bare key or
// "<key>.<name>" form only), snapshot parent links, and snapshot
// references via labels.
func TestGCRefs(t *testing.T) {
	db, cleanup, err := newDatabase()
	if err != nil {
		t.Fatal(err)
	}
	defer cleanup()

	// Fixtures: dgst(4) references dgst(1) via the bare label; dgst(5)
	// references dgst(2) and dgst(3) via named labels; dgst(6) uses a
	// malformed label suffix (no '.') which must be ignored. Snapshots
	// reference parents directly and other snapshotters via labels.
	alters := []alterFunc{
		addContent("ns1", dgst(1), nil),
		addContent("ns1", dgst(2), nil),
		addContent("ns1", dgst(3), nil),
		addContent("ns1", dgst(4), labelmap(string(labelGCContentRef), dgst(1).String())),
		addContent("ns1", dgst(5), labelmap(string(labelGCContentRef)+".anything-1", dgst(2).String(), string(labelGCContentRef)+".anything-2", dgst(3).String())),
		addContent("ns1", dgst(6), labelmap(string(labelGCContentRef)+"bad", dgst(1).String())),
		addContent("ns2", dgst(1), nil),
		addContent("ns2", dgst(2), nil),
		addSnapshot("ns1", "overlay", "sn1", "", nil),
		addSnapshot("ns1", "overlay", "sn2", "sn1", nil),
		addSnapshot("ns1", "overlay", "sn3", "sn2", nil),
		addSnapshot("ns1", "overlay", "sn4", "", labelmap(string(labelGCSnapRef)+"btrfs", "sn1", string(labelGCSnapRef)+"overlay", "sn1")),
		addSnapshot("ns1", "btrfs", "sn1", "", nil),
		addSnapshot("ns2", "overlay", "sn1", "", nil),
		addSnapshot("ns2", "overlay", "sn2", "sn1", nil),
	}

	// Expected outgoing references for every node in the fixture set.
	refs := map[gc.Node][]gc.Node{
		gcnode(ResourceContent, "ns1", dgst(1).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(2).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(3).String()): nil,
		gcnode(ResourceContent, "ns1", dgst(4).String()): {
			gcnode(ResourceContent, "ns1", dgst(1).String()),
		},
		gcnode(ResourceContent, "ns1", dgst(5).String()): {
			gcnode(ResourceContent, "ns1", dgst(2).String()),
			gcnode(ResourceContent, "ns1", dgst(3).String()),
		},
		gcnode(ResourceContent, "ns1", dgst(6).String()): nil,
		gcnode(ResourceContent, "ns2", dgst(1).String()): nil,
		gcnode(ResourceContent, "ns2", dgst(2).String()): nil,
		gcnode(ResourceSnapshot, "ns1", "overlay/sn1"):   nil,
		gcnode(ResourceSnapshot, "ns1", "overlay/sn2"): {
			gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		},
		gcnode(ResourceSnapshot, "ns1", "overlay/sn3"): {
			gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
		},
		gcnode(ResourceSnapshot, "ns1", "overlay/sn4"): {
			gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"),
			gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
		},
		gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"):   nil,
		gcnode(ResourceSnapshot, "ns2", "overlay/sn1"): nil,
		gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): {
			gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
		},
	}

	// Populate the database with the fixture records.
	if err := db.Update(func(tx *bolt.Tx) error {
		v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
		if err != nil {
			return err
		}
		for _, alter := range alters {
			if err := alter(v1bkt); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		t.Fatalf("Update failed: %+v", err)
	}

	ctx := context.Background()

	// Check each node's reference set independently.
	for n, nodes := range refs {
		checkNodes(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
			return references(ctx, tx, n, func(n gc.Node) {
				select {
				case nc <- n:
				case <-ctx.Done():
				}
			})
		})
		if t.Failed() {
			t.Fatalf("Failure scanning %v", n)
		}
	}
}
// newDatabase opens a bolt database backed by a fresh temporary
// directory. The returned cleanup function closes the database and
// removes the directory.
func newDatabase() (*bolt.DB, func(), error) {
	dir, err := ioutil.TempDir("", "gc-roots-")
	if err != nil {
		return nil, nil, err
	}
	db, err := bolt.Open(filepath.Join(dir, "test.db"), 0777, nil)
	if err != nil {
		os.RemoveAll(dir)
		return nil, nil, err
	}
	cleanup := func() {
		db.Close()
		os.RemoveAll(dir)
	}
	return db, cleanup, nil
}
// checkNodes runs fn inside a read transaction, collects every node it
// emits on the channel, and compares the collected set to expected.
// The channel is closed by the View closure so the collector goroutine
// terminates before results are compared.
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
	var (
		collected []gc.Node
		nodeC     = make(chan gc.Node)
		finished  = make(chan struct{})
	)
	go func() {
		for n := range nodeC {
			collected = append(collected, n)
		}
		close(finished)
	}()

	err := db.View(func(tx *bolt.Tx) error {
		defer close(nodeC)
		return fn(ctx, tx, nodeC)
	})
	if err != nil {
		t.Fatal(err)
	}

	<-finished
	checkNodesEqual(t, collected, expected)
}
// checkNodesEqual reports a test error unless the two node slices
// contain the same elements; both are sorted in place before comparing.
func checkNodesEqual(t *testing.T, actual, expected []gc.Node) {
	sort.Sort(nodeList(actual))
	sort.Sort(nodeList(expected))

	if len(actual) != len(expected) {
		t.Errorf("Nodes do not match\n\tExpected:\n\t%v\n\tActual:\n\t%v", expected, actual)
		return
	}
	for i, n := range actual {
		if n != expected[i] {
			t.Errorf("[%d] root does not match expected: expected %v, got %v", i, expected[i], n)
		}
	}
}
// nodeList implements sort.Interface over gc.Node values, ordering by
// type, then namespace, then key.
type nodeList []gc.Node

func (nodes nodeList) Len() int      { return len(nodes) }
func (nodes nodeList) Swap(i, j int) { nodes[i], nodes[j] = nodes[j], nodes[i] }

func (nodes nodeList) Less(i, j int) bool {
	a, b := nodes[i], nodes[j]
	switch {
	case a.Type != b.Type:
		return a.Type < b.Type
	case a.Namespace != b.Namespace:
		return a.Namespace < b.Namespace
	default:
		return a.Key < b.Key
	}
}
// alterFunc mutates the given version bucket to install a test fixture.
type alterFunc func(bkt *bolt.Bucket) error

// addImage returns an alterFunc creating an image record under the
// namespace with the given target digest and labels.
func addImage(ns, name string, dgst digest.Digest, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		imageBkt, err := createBuckets(bkt, ns, string(bucketKeyObjectImages), name)
		if err != nil {
			return err
		}
		targetBkt, err := imageBkt.CreateBucket(bucketKeyTarget)
		if err != nil {
			return err
		}
		if err := targetBkt.Put(bucketKeyDigest, []byte(dgst.String())); err != nil {
			return err
		}
		return boltutil.WriteLabels(imageBkt, labels)
	}
}
// addSnapshot returns an alterFunc creating a snapshot record for the
// snapshotter under the namespace, optionally recording a parent key
// and labels.
func addSnapshot(ns, snapshotter, name, parent string, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		snapBkt, err := createBuckets(bkt, ns, string(bucketKeyObjectSnapshots), snapshotter, name)
		if err != nil {
			return err
		}
		if parent != "" {
			if err := snapBkt.Put(bucketKeyParent, []byte(parent)); err != nil {
				return err
			}
		}
		return boltutil.WriteLabels(snapBkt, labels)
	}
}
// addContent returns an alterFunc creating a content blob record for
// the digest under the namespace with the given labels.
func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		blobBkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectBlob), dgst.String())
		if err != nil {
			return err
		}
		return boltutil.WriteLabels(blobBkt, labels)
	}
}
// addContainer returns an alterFunc creating a container record which
// references the given snapshotter/snapshot pair and carries the given
// labels.
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
	return func(bkt *bolt.Bucket) error {
		ctrBkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)
		if err != nil {
			return err
		}
		if err := ctrBkt.Put(bucketKeySnapshotter, []byte(snapshotter)); err != nil {
			return err
		}
		if err := ctrBkt.Put(bucketKeySnapshotKey, []byte(snapshot)); err != nil {
			return err
		}
		return boltutil.WriteLabels(ctrBkt, labels)
	}
}
// createBuckets descends through names, creating each nested bucket if
// missing, and returns the innermost bucket.
func createBuckets(bkt *bolt.Bucket, names ...string) (*bolt.Bucket, error) {
	cur := bkt
	for _, name := range names {
		next, err := cur.CreateBucketIfNotExists([]byte(name))
		if err != nil {
			return nil, err
		}
		cur = next
	}
	return cur, nil
}
// labelmap builds a map from an even-length list of alternating
// key/value strings, panicking if given an odd number of arguments.
func labelmap(kv ...string) map[string]string {
	if len(kv)%2 != 0 {
		panic("bad labels argument")
	}
	m := make(map[string]string, len(kv)/2)
	for i := 0; i+1 < len(kv); i += 2 {
		m[kv[i]] = kv[i+1]
	}
	return m
}
// dgst returns a deterministic digest derived from seed i by hashing
// 256 bytes of pseudo-random data seeded with i.
func dgst(i int64) digest.Digest {
	src := rand.New(rand.NewSource(i))
	digester := digest.SHA256.Digester()
	if _, err := io.CopyN(digester.Hash(), src, 256); err != nil {
		panic(err)
	}
	return digester.Digest()
}

View File

@ -4,11 +4,13 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
@ -19,12 +21,13 @@ import (
type snapshotter struct {
snapshot.Snapshotter
name string
db transactor
db *DB
l sync.RWMutex
}
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
// using the provided name and database.
func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
func newSnapshotter(db *DB, name string, sn snapshot.Snapshotter) *snapshotter {
return &snapshotter{
Snapshotter: sn,
name: name,
@ -125,6 +128,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro
}
func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return snapshot.Info{}, err
@ -249,6 +255,9 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
}
func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshot.Opt) ([]mount.Mount, error) {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
@ -332,6 +341,9 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
}
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@ -421,6 +433,9 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
}
func (s *snapshotter) Remove(ctx context.Context, key string) error {
s.l.RLock()
defer s.l.RUnlock()
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@ -457,7 +472,16 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
}
}
return bkt.DeleteBucket([]byte(key))
if err := bkt.DeleteBucket([]byte(key)); err != nil {
return err
}
// Mark snapshotter as dirty for triggering garbage collection
s.db.dirtyL.Lock()
s.db.dirtySS[s.name] = struct{}{}
s.db.dirtyL.Unlock()
return nil
})
}
@ -565,3 +589,134 @@ func validateSnapshot(info *snapshot.Info) error {
return nil
}
// garbageCollect removes all snapshots in the backend snapshotter which
// are not referenced by snapshot metadata in any namespace. The
// snapshotter lock is held exclusively for the entire collection so no
// new snapshots can be created or committed while pruning.
func (s *snapshotter) garbageCollect(ctx context.Context) error {
	logger := log.G(ctx).WithField("snapshotter", s.name)

	start := time.Now()
	s.l.Lock()
	defer func() {
		s.l.Unlock()
		// time.Since is the idiomatic form of time.Now().Sub(start).
		logger.WithField("t", time.Since(start)).Debug("garbage collected")
	}()

	// Collect the set of backend snapshot keys still referenced by
	// metadata in any namespace.
	seen := map[string]struct{}{}
	if err := s.db.View(func(tx *bolt.Tx) error {
		v1bkt := tx.Bucket(bucketKeyVersion)
		if v1bkt == nil {
			return nil
		}

		// iterate through each namespace
		v1c := v1bkt.Cursor()
		for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
			if v != nil {
				continue
			}

			sbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectSnapshots)
			if sbkt == nil {
				continue
			}

			// Load the bucket for this specific snapshotter.
			ssbkt := sbkt.Bucket([]byte(s.name))
			if ssbkt == nil {
				continue
			}

			if err := ssbkt.ForEach(func(sk, sv []byte) error {
				// Sub-buckets have a nil value; the backend key is
				// stored under bucketKeyName inside each.
				if sv == nil {
					bkey := ssbkt.Bucket(sk).Get(bucketKeyName)
					if len(bkey) > 0 {
						seen[string(bkey)] = struct{}{}
					}
				}
				return nil
			}); err != nil {
				return err
			}
		}

		return nil
	}); err != nil {
		return err
	}

	roots, err := s.walkTree(ctx, seen)
	if err != nil {
		return err
	}

	// TODO: Unlock before prune (once nodes are fully unavailable)

	// Prune each tree from its root; children are removed before
	// parents inside pruneBranch.
	for _, node := range roots {
		if err := s.pruneBranch(ctx, node); err != nil {
			return err
		}
	}

	return nil
}
// treeNode tracks one backend snapshot during garbage collection,
// linking it to its children so branches can be pruned leaf-first.
type treeNode struct {
	info     snapshot.Info // backend snapshot info reported by Walk
	remove   bool          // true when not referenced by any metadata
	children []*treeNode   // direct child snapshots
}
// walkTree walks every snapshot in the backend snapshotter and builds
// parent/child trees, marking any snapshot absent from seen for
// removal. It returns the roots of all trees (snapshots with no
// parent).
func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([]*treeNode, error) {
	roots := []*treeNode{}
	nodes := map[string]*treeNode{}

	// lookup returns the node for a name, creating a placeholder when
	// the walk has not reached that snapshot yet.
	lookup := func(name string) *treeNode {
		node, ok := nodes[name]
		if !ok {
			node = &treeNode{}
			nodes[name] = node
		}
		return node
	}

	if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
		node := lookup(info.Name)
		node.info = info
		_, referenced := seen[info.Name]
		node.remove = !referenced

		if info.Parent == "" {
			roots = append(roots, node)
		} else {
			parent := lookup(info.Parent)
			parent.children = append(parent.children, node)
		}
		return nil
	}); err != nil {
		return nil, err
	}

	return roots, nil
}
// pruneBranch removes the subtree rooted at node depth-first, deleting
// children before the node itself. Referenced nodes are kept; removal
// failures with a failed-precondition cause (e.g. a still-referenced
// snapshot) are logged and skipped rather than treated as fatal.
func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error {
	for _, child := range node.children {
		if err := s.pruneBranch(ctx, child); err != nil {
			return err
		}
	}

	if !node.remove {
		return nil
	}

	logger := log.G(ctx).WithField("snapshotter", s.name)
	if err := s.Snapshotter.Remove(ctx, node.info.Name); err != nil {
		if !errdefs.IsFailedPrecondition(err) {
			return err
		}
		logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed")
		return nil
	}
	logger.WithField("key", node.info.Name).Debug("removed snapshot")
	return nil
}