Merge pull request #1563 from dmcgowan/gc-alpha
Metadata garbage collection
This commit is contained in:
commit
587f25245a
@ -2,10 +2,12 @@ package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
@ -91,7 +93,11 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts {
|
||||
return err
|
||||
}
|
||||
setSnapshotterIfEmpty(c)
|
||||
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().String(),
|
||||
}
|
||||
parent := identity.ChainID(diffIDs).String()
|
||||
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.SnapshotKey = id
|
||||
@ -120,7 +126,11 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts {
|
||||
return err
|
||||
}
|
||||
setSnapshotterIfEmpty(c)
|
||||
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().String(),
|
||||
}
|
||||
parent := identity.ChainID(diffIDs).String()
|
||||
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.SnapshotKey = id
|
||||
|
@ -69,7 +69,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||
// the size or digest is unknown, these values may be empty.
|
||||
//
|
||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error {
|
||||
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
|
||||
ws, err := cw.Status()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -96,7 +96,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cw.Commit(ctx, size, expected); err != nil {
|
||||
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
|
||||
}
|
||||
|
128
gc/gc.go
128
gc/gc.go
@ -5,8 +5,25 @@
|
||||
// under certain use cases.
|
||||
package gc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Resourcetype represents type of resource at a node
|
||||
type ResourceType uint8
|
||||
|
||||
// Node presents a resource which has a type and key,
|
||||
// this node can be used to lookup other nodes.
|
||||
type Node struct {
|
||||
Type ResourceType
|
||||
Namespace string
|
||||
Key string
|
||||
}
|
||||
|
||||
// Tricolor implements basic, single-thread tri-color GC. Given the roots, the
|
||||
// complete set and a refs function, this returns the unreachable objects.
|
||||
// complete set and a refs function, this function returns a map of all
|
||||
// reachable objects.
|
||||
//
|
||||
// Correct usage requires that the caller not allow the arguments to change
|
||||
// until the result is used to delete objects in the system.
|
||||
@ -15,11 +32,11 @@ package gc
|
||||
//
|
||||
// We can probably use this to inform a design for incremental GC by injecting
|
||||
// callbacks to the set modification algorithms.
|
||||
func Tricolor(roots []string, all []string, refs func(ref string) []string) []string {
|
||||
func Tricolor(roots []Node, refs func(ref Node) ([]Node, error)) (map[Node]struct{}, error) {
|
||||
var (
|
||||
grays []string // maintain a gray "stack"
|
||||
seen = map[string]struct{}{} // or not "white", basically "seen"
|
||||
reachable = map[string]struct{}{} // or "block", in tri-color parlance
|
||||
grays []Node // maintain a gray "stack"
|
||||
seen = map[Node]struct{}{} // or not "white", basically "seen"
|
||||
reachable = map[Node]struct{}{} // or "block", in tri-color parlance
|
||||
)
|
||||
|
||||
grays = append(grays, roots...)
|
||||
@ -29,9 +46,13 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st
|
||||
id := grays[len(grays)-1] // effectively "depth first" because first element
|
||||
grays = grays[:len(grays)-1]
|
||||
seen[id] = struct{}{} // post-mark this as not-white
|
||||
rs, err := refs(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// mark all the referenced objects as gray
|
||||
for _, target := range refs(id) {
|
||||
for _, target := range rs {
|
||||
if _, ok := seen[target]; !ok {
|
||||
grays = append(grays, target)
|
||||
}
|
||||
@ -41,14 +62,99 @@ func Tricolor(roots []string, all []string, refs func(ref string) []string) []st
|
||||
reachable[id] = struct{}{}
|
||||
}
|
||||
|
||||
return reachable, nil
|
||||
}
|
||||
|
||||
// ConcurrentMark implements simple, concurrent GC. All the roots are scanned
|
||||
// and the complete set of references is formed by calling the refs function
|
||||
// for each seen object. This function returns a map of all object reachable
|
||||
// from a root.
|
||||
//
|
||||
// Correct usage requires that the caller not allow the arguments to change
|
||||
// until the result is used to delete objects in the system.
|
||||
//
|
||||
// It will allocate memory proportional to the size of the reachable set.
|
||||
func ConcurrentMark(ctx context.Context, root <-chan Node, refs func(context.Context, Node, func(Node)) error) (map[Node]struct{}, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
var (
|
||||
grays = make(chan Node)
|
||||
seen = map[Node]struct{}{} // or not "white", basically "seen"
|
||||
wg sync.WaitGroup
|
||||
|
||||
errOnce sync.Once
|
||||
refErr error
|
||||
)
|
||||
|
||||
go func() {
|
||||
for gray := range grays {
|
||||
if _, ok := seen[gray]; ok {
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
seen[gray] = struct{}{} // post-mark this as non-white
|
||||
|
||||
go func(gray Node) {
|
||||
defer wg.Done()
|
||||
|
||||
send := func(n Node) {
|
||||
wg.Add(1)
|
||||
select {
|
||||
case grays <- n:
|
||||
case <-ctx.Done():
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
if err := refs(ctx, gray, send); err != nil {
|
||||
errOnce.Do(func() {
|
||||
refErr = err
|
||||
cancel()
|
||||
})
|
||||
}
|
||||
|
||||
}(gray)
|
||||
}
|
||||
}()
|
||||
|
||||
for r := range root {
|
||||
wg.Add(1)
|
||||
select {
|
||||
case grays <- r:
|
||||
case <-ctx.Done():
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Wait for outstanding grays to be processed
|
||||
wg.Wait()
|
||||
|
||||
close(grays)
|
||||
|
||||
if refErr != nil {
|
||||
return nil, refErr
|
||||
}
|
||||
if cErr := ctx.Err(); cErr != nil {
|
||||
return nil, cErr
|
||||
}
|
||||
|
||||
return seen, nil
|
||||
}
|
||||
|
||||
// Sweep removes all nodes returned through the channel which are not in
|
||||
// the reachable set by calling the provided remove function.
|
||||
func Sweep(reachable map[Node]struct{}, all <-chan Node, remove func(Node) error) error {
|
||||
// All black objects are now reachable, and all white objects are
|
||||
// unreachable. Free those that are white!
|
||||
var whites []string
|
||||
for _, obj := range all {
|
||||
if _, ok := reachable[obj]; !ok {
|
||||
whites = append(whites, obj)
|
||||
for node := range all {
|
||||
if _, ok := reachable[node]; !ok {
|
||||
if err := remove(node); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return whites
|
||||
return nil
|
||||
}
|
||||
|
140
gc/gc_test.go
140
gc/gc_test.go
@ -1,30 +1,154 @@
|
||||
package gc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestTricolorBasic(t *testing.T) {
|
||||
roots := []string{"A", "C"}
|
||||
all := []string{"A", "B", "C", "D", "E", "F", "G"}
|
||||
all := []string{"A", "B", "C", "D", "E", "F", "G", "H"}
|
||||
refs := map[string][]string{
|
||||
"A": {"B"},
|
||||
"B": {"A"},
|
||||
"C": {"D", "F", "B"},
|
||||
"E": {"F", "G"},
|
||||
"F": {"H"},
|
||||
}
|
||||
|
||||
unreachable := Tricolor(roots, all, lookup(refs))
|
||||
expected := []string{"E", "G"}
|
||||
expected := toNodes([]string{"A", "B", "C", "D", "F", "H"})
|
||||
|
||||
if !reflect.DeepEqual(unreachable, expected) {
|
||||
t.Fatalf("incorrect unreachable set: %v != %v", unreachable, expected)
|
||||
reachable, err := Tricolor(toNodes(roots), lookup(refs))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var sweeped []Node
|
||||
for _, a := range toNodes(all) {
|
||||
if _, ok := reachable[a]; ok {
|
||||
sweeped = append(sweeped, a)
|
||||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(sweeped, expected) {
|
||||
t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func lookup(refs map[string][]string) func(id string) []string {
|
||||
return func(ref string) []string {
|
||||
return refs[ref]
|
||||
func TestConcurrentBasic(t *testing.T) {
|
||||
roots := []string{"A", "C"}
|
||||
all := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"}
|
||||
refs := map[string][]string{
|
||||
"A": {"B"},
|
||||
"B": {"A"},
|
||||
"C": {"D", "F", "B"},
|
||||
"E": {"F", "G"},
|
||||
"F": {"H"},
|
||||
"G": {"I"},
|
||||
}
|
||||
|
||||
expected := toNodes([]string{"A", "B", "C", "D", "F", "H"})
|
||||
|
||||
ctx := context.Background()
|
||||
rootC := make(chan Node)
|
||||
go func() {
|
||||
writeNodes(ctx, rootC, toNodes(roots))
|
||||
close(rootC)
|
||||
}()
|
||||
|
||||
reachable, err := ConcurrentMark(ctx, rootC, lookupc(refs))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var sweeped []Node
|
||||
for _, a := range toNodes(all) {
|
||||
if _, ok := reachable[a]; ok {
|
||||
sweeped = append(sweeped, a)
|
||||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(sweeped, expected) {
|
||||
t.Fatalf("incorrect unreachable set: %v != %v", sweeped, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func writeNodes(ctx context.Context, nc chan<- Node, nodes []Node) {
|
||||
for _, n := range nodes {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func lookup(refs map[string][]string) func(id Node) ([]Node, error) {
|
||||
return func(ref Node) ([]Node, error) {
|
||||
return toNodes(refs[ref.Key]), nil
|
||||
}
|
||||
}
|
||||
|
||||
func lookupc(refs map[string][]string) func(context.Context, Node, func(Node)) error {
|
||||
return func(ctx context.Context, ref Node, fn func(Node)) error {
|
||||
for _, n := range toNodes(refs[ref.Key]) {
|
||||
fn(n)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func toNodes(s []string) []Node {
|
||||
n := make([]Node, len(s))
|
||||
for i := range s {
|
||||
n[i] = Node{
|
||||
Key: s[i],
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func newScanner(refs []string) *stringScanner {
|
||||
return &stringScanner{
|
||||
i: -1,
|
||||
s: refs,
|
||||
}
|
||||
}
|
||||
|
||||
type stringScanner struct {
|
||||
i int
|
||||
s []string
|
||||
}
|
||||
|
||||
func (ss *stringScanner) Next() bool {
|
||||
ss.i++
|
||||
return ss.i < len(ss.s)
|
||||
}
|
||||
|
||||
func (ss *stringScanner) Node() Node {
|
||||
return Node{
|
||||
Key: ss.s[ss.i],
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *stringScanner) Cleanup() error {
|
||||
ss.s[ss.i] = ""
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *stringScanner) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *stringScanner) All() []Node {
|
||||
remaining := make([]Node, 0, len(ss.s))
|
||||
for _, s := range ss.s {
|
||||
if s != "" {
|
||||
remaining = append(remaining, Node{
|
||||
Key: s,
|
||||
})
|
||||
}
|
||||
}
|
||||
return remaining
|
||||
}
|
||||
|
73
image.go
73
image.go
@ -2,11 +2,16 @@ package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@ -64,36 +69,68 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sn := i.client.SnapshotService(snapshotterName)
|
||||
a := i.client.DiffService()
|
||||
cs := i.client.ContentStore()
|
||||
var (
|
||||
sn = i.client.SnapshotService(snapshotterName)
|
||||
a = i.client.DiffService()
|
||||
cs = i.client.ContentStore()
|
||||
|
||||
var chain []digest.Digest
|
||||
chain []digest.Digest
|
||||
unpacked bool
|
||||
)
|
||||
for _, layer := range layers {
|
||||
unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a)
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
"containerd.io/uncompressed": layer.Diff.Digest.String(),
|
||||
}
|
||||
lastUnpacked := unpacked
|
||||
|
||||
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
|
||||
if err != nil {
|
||||
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
||||
return err
|
||||
}
|
||||
if unpacked {
|
||||
info, err := cs.Info(ctx, layer.Blob.Digest)
|
||||
if err != nil {
|
||||
|
||||
if lastUnpacked {
|
||||
info := snapshot.Info{
|
||||
Name: identity.ChainID(chain).String(),
|
||||
}
|
||||
|
||||
// Remove previously created gc.root label
|
||||
if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Labels == nil {
|
||||
info.Labels = map[string]string{}
|
||||
}
|
||||
if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() {
|
||||
info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String()
|
||||
if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
chain = append(chain, layer.Diff.Digest)
|
||||
}
|
||||
|
||||
if unpacked {
|
||||
desc, err := i.i.Config(ctx, cs, platforms.Default())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootfs := identity.ChainID(chain).String()
|
||||
|
||||
cinfo := content.Info{
|
||||
Digest: desc.Digest,
|
||||
Labels: map[string]string{
|
||||
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotterName): rootfs,
|
||||
},
|
||||
}
|
||||
if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sinfo := snapshot.Info{
|
||||
Name: rootfs,
|
||||
}
|
||||
|
||||
// Config now referenced snapshot, release root reference
|
||||
if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/filters"
|
||||
"github.com/containerd/containerd/labels"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/metadata/boltutil"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
@ -19,12 +21,13 @@ import (
|
||||
|
||||
type contentStore struct {
|
||||
content.Store
|
||||
db transactor
|
||||
db *DB
|
||||
l sync.RWMutex
|
||||
}
|
||||
|
||||
// newContentStore returns a namespaced content store using an existing
|
||||
// content store interface.
|
||||
func newContentStore(db transactor, cs content.Store) content.Store {
|
||||
func newContentStore(db *DB, cs content.Store) *contentStore {
|
||||
return &contentStore{
|
||||
Store: cs,
|
||||
db: db,
|
||||
@ -59,6 +62,9 @@ func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpath
|
||||
return content.Info{}, err
|
||||
}
|
||||
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
updated := content.Info{
|
||||
Digest: info.Digest,
|
||||
}
|
||||
@ -166,15 +172,25 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
return err
|
||||
}
|
||||
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
return update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getBlobBucket(tx, ns, dgst)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
|
||||
}
|
||||
|
||||
// Just remove local reference, garbage collector is responsible for
|
||||
// cleaning up on disk content
|
||||
return getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String()))
|
||||
if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark content store as dirty for triggering garbage collection
|
||||
cs.db.dirtyL.Lock()
|
||||
cs.db.dirtyCS = true
|
||||
cs.db.dirtyL.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -269,6 +285,9 @@ func (cs *contentStore) Abort(ctx context.Context, ref string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
return update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, ns)
|
||||
if bkt == nil {
|
||||
@ -293,6 +312,9 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cs.l.RLock()
|
||||
defer cs.l.RUnlock()
|
||||
|
||||
var w content.Writer
|
||||
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
if expected != "" {
|
||||
@ -346,6 +368,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
ref: ref,
|
||||
namespace: ns,
|
||||
db: cs.db,
|
||||
l: &cs.l,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -354,9 +377,13 @@ type namespacedWriter struct {
|
||||
ref string
|
||||
namespace string
|
||||
db transactor
|
||||
l *sync.RWMutex
|
||||
}
|
||||
|
||||
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||
nw.l.RLock()
|
||||
defer nw.l.RUnlock()
|
||||
|
||||
return update(ctx, nw.db, func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, nw.namespace)
|
||||
if bkt != nil {
|
||||
@ -495,3 +522,61 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
|
||||
|
||||
return bkt.Put(bucketKeySize, sizeEncoded)
|
||||
}
|
||||
|
||||
func (cs *contentStore) garbageCollect(ctx context.Context) error {
|
||||
lt1 := time.Now()
|
||||
cs.l.Lock()
|
||||
defer func() {
|
||||
cs.l.Unlock()
|
||||
log.G(ctx).WithField("t", time.Now().Sub(lt1)).Debugf("content garbage collected")
|
||||
}()
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
if err := cs.db.View(func(tx *bolt.Tx) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
|
||||
if cbkt == nil {
|
||||
continue
|
||||
}
|
||||
bbkt := cbkt.Bucket(bucketKeyObjectBlob)
|
||||
if err := bbkt.ForEach(func(ck, cv []byte) error {
|
||||
if cv == nil {
|
||||
seen[string(ck)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cs.Store.Walk(ctx, func(info content.Info) error {
|
||||
if _, ok := seen[info.Digest.String()]; !ok {
|
||||
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
|
||||
return err
|
||||
}
|
||||
log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
155
metadata/db.go
155
metadata/db.go
@ -3,10 +3,13 @@ package metadata
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
@ -36,15 +39,32 @@ type DB struct {
|
||||
db *bolt.DB
|
||||
ss map[string]snapshot.Snapshotter
|
||||
cs content.Store
|
||||
|
||||
// wlock is used to protect access to the data structures during garbage
|
||||
// collection. While the wlock is held no writable transactions can be
|
||||
// opened, preventing changes from occurring between the mark and
|
||||
// sweep phases without preventing read transactions.
|
||||
wlock sync.RWMutex
|
||||
|
||||
// dirty flags and lock keeps track of datastores which have had deletions
|
||||
// since the last garbage collection. These datastores will will be garbage
|
||||
// collected during the next garbage collection.
|
||||
dirtyL sync.Mutex
|
||||
dirtySS map[string]struct{}
|
||||
dirtyCS bool
|
||||
|
||||
// TODO: Keep track of stats such as pause time, number of collected objects, errors
|
||||
lastCollection time.Time
|
||||
}
|
||||
|
||||
// NewDB creates a new metadata database using the provided
|
||||
// bolt database, content store, and snapshotters.
|
||||
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
|
||||
return &DB{
|
||||
db: db,
|
||||
ss: ss,
|
||||
cs: cs,
|
||||
db: db,
|
||||
ss: ss,
|
||||
cs: cs,
|
||||
dirtySS: map[string]struct{}{},
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,5 +178,134 @@ func (m *DB) View(fn func(*bolt.Tx) error) error {
|
||||
|
||||
// Update runs a writable transation on the metadata store.
|
||||
func (m *DB) Update(fn func(*bolt.Tx) error) error {
|
||||
m.wlock.RLock()
|
||||
defer m.wlock.RUnlock()
|
||||
return m.db.Update(fn)
|
||||
}
|
||||
|
||||
func (m *DB) GarbageCollect(ctx context.Context) error {
|
||||
lt1 := time.Now()
|
||||
m.wlock.Lock()
|
||||
defer func() {
|
||||
m.wlock.Unlock()
|
||||
log.G(ctx).WithField("d", time.Now().Sub(lt1)).Debug("metadata garbage collected")
|
||||
}()
|
||||
|
||||
var marked map[gc.Node]struct{}
|
||||
|
||||
if err := m.db.View(func(tx *bolt.Tx) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
roots := make(chan gc.Node)
|
||||
errChan := make(chan error)
|
||||
go func() {
|
||||
defer close(errChan)
|
||||
defer close(roots)
|
||||
|
||||
// Call roots
|
||||
if err := scanRoots(ctx, tx, roots); err != nil {
|
||||
cancel()
|
||||
errChan <- err
|
||||
}
|
||||
}()
|
||||
|
||||
refs := func(ctx context.Context, n gc.Node, fn func(gc.Node)) error {
|
||||
return references(ctx, tx, n, fn)
|
||||
}
|
||||
|
||||
reachable, err := gc.ConcurrentMark(ctx, roots, refs)
|
||||
if rerr := <-errChan; rerr != nil {
|
||||
return rerr
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
marked = reachable
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.dirtyL.Lock()
|
||||
defer m.dirtyL.Unlock()
|
||||
|
||||
if err := m.db.Update(func(tx *bolt.Tx) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
nodeC := make(chan gc.Node)
|
||||
var scanErr error
|
||||
|
||||
go func() {
|
||||
defer close(nodeC)
|
||||
scanErr = scanAll(ctx, tx, nodeC)
|
||||
}()
|
||||
|
||||
rm := func(n gc.Node) error {
|
||||
if n.Type == ResourceSnapshot {
|
||||
if idx := strings.IndexRune(n.Key, '/'); idx > 0 {
|
||||
m.dirtySS[n.Key[:idx]] = struct{}{}
|
||||
}
|
||||
} else if n.Type == ResourceContent {
|
||||
m.dirtyCS = true
|
||||
}
|
||||
return remove(ctx, tx, n)
|
||||
}
|
||||
|
||||
if err := gc.Sweep(marked, nodeC, rm); err != nil {
|
||||
return errors.Wrap(err, "failed to sweep")
|
||||
}
|
||||
|
||||
if scanErr != nil {
|
||||
return errors.Wrap(scanErr, "failed to scan all")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m.lastCollection = time.Now()
|
||||
|
||||
if len(m.dirtySS) > 0 {
|
||||
for snapshotterName := range m.dirtySS {
|
||||
log.G(ctx).WithField("snapshotter", snapshotterName).Debug("scheduling snapshotter cleanup")
|
||||
go m.cleanupSnapshotter(snapshotterName)
|
||||
}
|
||||
m.dirtySS = map[string]struct{}{}
|
||||
}
|
||||
|
||||
if m.dirtyCS {
|
||||
log.G(ctx).Debug("scheduling content cleanup")
|
||||
go m.cleanupContent()
|
||||
m.dirtyCS = false
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *DB) cleanupSnapshotter(name string) {
|
||||
ctx := context.Background()
|
||||
sn, ok := m.ss[name]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
err := newSnapshotter(m, name, sn).garbageCollect(ctx)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).WithField("snapshotter", name).Warn("garbage collection failed")
|
||||
}
|
||||
}
|
||||
|
||||
func (m *DB) cleanupContent() {
|
||||
ctx := context.Background()
|
||||
if m.cs == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := newContentStore(m, m.cs).garbageCollect(ctx)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Warn("content garbage collection failed")
|
||||
}
|
||||
}
|
||||
|
@ -1,11 +1,31 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/pprof"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/content/local"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/containerd/containerd/snapshot/naive"
|
||||
"github.com/gogo/protobuf/types"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
@ -176,3 +196,366 @@ func readDBVersion(db *bolt.DB, schema []byte) (int, error) {
|
||||
}
|
||||
return version, nil
|
||||
}
|
||||
|
||||
func TestMetadataCollector(t *testing.T) {
|
||||
mdb, cs, sn, cleanup := newStores(t)
|
||||
defer cleanup()
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
|
||||
objects = []object{
|
||||
blob(bytesFor(1), true),
|
||||
blob(bytesFor(2), false),
|
||||
blob(bytesFor(3), true),
|
||||
blob(bytesFor(4), false, "containerd.io/gc.root", time.Now().String()),
|
||||
newSnapshot("1", "", false, false),
|
||||
newSnapshot("2", "1", false, false),
|
||||
newSnapshot("3", "2", false, false),
|
||||
newSnapshot("4", "3", false, false),
|
||||
newSnapshot("5", "3", false, true),
|
||||
container("1", "4"),
|
||||
image("image-1", digestFor(2)),
|
||||
}
|
||||
remaining []gc.Node
|
||||
)
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if node != nil {
|
||||
remaining = append(remaining, *node)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Creation failed: %+v", err)
|
||||
}
|
||||
|
||||
if err := mdb.GarbageCollect(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
var actual []gc.Node
|
||||
|
||||
if err := mdb.View(func(tx *bolt.Tx) error {
|
||||
nodeC := make(chan gc.Node)
|
||||
var scanErr error
|
||||
go func() {
|
||||
defer close(nodeC)
|
||||
scanErr = scanAll(ctx, tx, nodeC)
|
||||
}()
|
||||
for node := range nodeC {
|
||||
actual = append(actual, node)
|
||||
}
|
||||
return scanErr
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkNodesEqual(t, actual, remaining)
|
||||
}
|
||||
|
||||
func BenchmarkGarbageCollect(b *testing.B) {
|
||||
b.Run("10-Sets", benchmarkTrigger(10))
|
||||
b.Run("100-Sets", benchmarkTrigger(100))
|
||||
b.Run("1000-Sets", benchmarkTrigger(1000))
|
||||
b.Run("10000-Sets", benchmarkTrigger(10000))
|
||||
}
|
||||
|
||||
func benchmarkTrigger(n int) func(b *testing.B) {
|
||||
return func(b *testing.B) {
|
||||
mdb, cs, sn, cleanup := newStores(b)
|
||||
defer cleanup()
|
||||
|
||||
objects := []object{}
|
||||
|
||||
// TODO: Allow max to be configurable
|
||||
for i := 0; i < n; i++ {
|
||||
objects = append(objects,
|
||||
blob(bytesFor(int64(i)), false),
|
||||
image(fmt.Sprintf("image-%d", i), digestFor(int64(i))),
|
||||
)
|
||||
lastSnapshot := 6
|
||||
for j := 0; j <= lastSnapshot; j++ {
|
||||
var parent string
|
||||
key := fmt.Sprintf("snapshot-%d-%d", i, j)
|
||||
if j > 0 {
|
||||
parent = fmt.Sprintf("snapshot-%d-%d", i, j-1)
|
||||
}
|
||||
objects = append(objects, newSnapshot(key, parent, false, false))
|
||||
}
|
||||
objects = append(objects, container(fmt.Sprintf("container-%d", i), fmt.Sprintf("snapshot-%d-%d", i, lastSnapshot)))
|
||||
|
||||
}
|
||||
|
||||
// TODO: Create set of objects for removal
|
||||
|
||||
var (
|
||||
ctx = context.Background()
|
||||
|
||||
remaining []gc.Node
|
||||
)
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if node != nil {
|
||||
remaining = append(remaining, *node)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
b.Fatalf("Creation failed: %+v", err)
|
||||
}
|
||||
|
||||
// TODO: reset benchmark
|
||||
b.ResetTimer()
|
||||
//b.StopTimer()
|
||||
|
||||
labels := pprof.Labels("worker", "trigger")
|
||||
pprof.Do(ctx, labels, func(ctx context.Context) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
||||
// TODO: Add removal objects
|
||||
|
||||
//b.StartTimer()
|
||||
|
||||
if err := mdb.GarbageCollect(ctx); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
|
||||
//b.StopTimer()
|
||||
|
||||
//var actual []gc.Node
|
||||
|
||||
//if err := db.View(func(tx *bolt.Tx) error {
|
||||
// nodeC := make(chan gc.Node)
|
||||
// var scanErr error
|
||||
// go func() {
|
||||
// defer close(nodeC)
|
||||
// scanErr = scanAll(ctx, tx, nodeC)
|
||||
// }()
|
||||
// for node := range nodeC {
|
||||
// actual = append(actual, node)
|
||||
// }
|
||||
// return scanErr
|
||||
//}); err != nil {
|
||||
// t.Fatal(err)
|
||||
//}
|
||||
|
||||
//checkNodesEqual(t, actual, remaining)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func bytesFor(i int64) []byte {
|
||||
r := rand.New(rand.NewSource(i))
|
||||
var b [256]byte
|
||||
_, err := r.Read(b[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return b[:]
|
||||
}
|
||||
|
||||
func digestFor(i int64) digest.Digest {
|
||||
r := rand.New(rand.NewSource(i))
|
||||
dgstr := digest.SHA256.Digester()
|
||||
_, err := io.Copy(dgstr.Hash(), io.LimitReader(r, 256))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return dgstr.Digest()
|
||||
}
|
||||
|
||||
type object struct {
|
||||
data interface{}
|
||||
removed bool
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshot.Snapshotter) (*gc.Node, error) {
|
||||
var (
|
||||
node *gc.Node
|
||||
namespace = "test"
|
||||
ctx = namespaces.WithNamespace(context.Background(), namespace)
|
||||
)
|
||||
|
||||
switch v := obj.data.(type) {
|
||||
case testContent:
|
||||
ctx := WithTransactionContext(ctx, tx)
|
||||
expected := digest.FromBytes(v.data)
|
||||
w, err := cs.Writer(ctx, "test-ref", int64(len(v.data)), expected)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create writer")
|
||||
}
|
||||
if _, err := w.Write(v.data); err != nil {
|
||||
return nil, errors.Wrap(err, "write blob failed")
|
||||
}
|
||||
if err := w.Commit(ctx, int64(len(v.data)), expected, content.WithLabels(obj.labels)); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to commit blob")
|
||||
}
|
||||
if !obj.removed {
|
||||
node = &gc.Node{
|
||||
Type: ResourceContent,
|
||||
Namespace: namespace,
|
||||
Key: expected.String(),
|
||||
}
|
||||
}
|
||||
case testSnapshot:
|
||||
ctx := WithTransactionContext(ctx, tx)
|
||||
if v.active {
|
||||
_, err := sn.Prepare(ctx, v.key, v.parent, snapshot.WithLabels(obj.labels))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
akey := fmt.Sprintf("%s-active", v.key)
|
||||
_, err := sn.Prepare(ctx, akey, v.parent)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sn.Commit(ctx, v.key, akey, snapshot.WithLabels(obj.labels)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if !obj.removed {
|
||||
node = &gc.Node{
|
||||
Type: ResourceSnapshot,
|
||||
Namespace: namespace,
|
||||
Key: fmt.Sprintf("naive/%s", v.key),
|
||||
}
|
||||
}
|
||||
case testImage:
|
||||
image := images.Image{
|
||||
Name: v.name,
|
||||
Target: v.target,
|
||||
Labels: obj.labels,
|
||||
}
|
||||
_, err := NewImageStore(tx).Create(ctx, image)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create image")
|
||||
}
|
||||
case testContainer:
|
||||
container := containers.Container{
|
||||
ID: v.id,
|
||||
SnapshotKey: v.snapshot,
|
||||
Snapshotter: "naive",
|
||||
Labels: obj.labels,
|
||||
|
||||
Runtime: containers.RuntimeInfo{
|
||||
Name: "testruntime",
|
||||
},
|
||||
Spec: &types.Any{},
|
||||
}
|
||||
_, err := NewContainerStore(tx).Create(ctx, container)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func blob(b []byte, r bool, l ...string) object {
|
||||
return object{
|
||||
data: testContent{
|
||||
data: b,
|
||||
},
|
||||
removed: r,
|
||||
labels: labelmap(l...),
|
||||
}
|
||||
}
|
||||
|
||||
func image(n string, d digest.Digest, l ...string) object {
|
||||
return object{
|
||||
data: testImage{
|
||||
name: n,
|
||||
target: ocispec.Descriptor{
|
||||
MediaType: "irrelevant",
|
||||
Digest: d,
|
||||
Size: 256,
|
||||
},
|
||||
},
|
||||
removed: false,
|
||||
labels: labelmap(l...),
|
||||
}
|
||||
}
|
||||
|
||||
func newSnapshot(key, parent string, active, r bool, l ...string) object {
|
||||
return object{
|
||||
data: testSnapshot{
|
||||
key: key,
|
||||
parent: parent,
|
||||
active: active,
|
||||
},
|
||||
removed: r,
|
||||
labels: labelmap(l...),
|
||||
}
|
||||
}
|
||||
|
||||
func container(id, s string, l ...string) object {
|
||||
return object{
|
||||
data: testContainer{
|
||||
id: id,
|
||||
snapshot: s,
|
||||
},
|
||||
removed: false,
|
||||
labels: labelmap(l...),
|
||||
}
|
||||
}
|
||||
|
||||
type testContent struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
type testSnapshot struct {
|
||||
key string
|
||||
parent string
|
||||
active bool
|
||||
}
|
||||
|
||||
type testImage struct {
|
||||
name string
|
||||
target ocispec.Descriptor
|
||||
}
|
||||
|
||||
type testContainer struct {
|
||||
id string
|
||||
snapshot string
|
||||
}
|
||||
|
||||
func newStores(t testing.TB) (*DB, content.Store, snapshot.Snapshotter, func()) {
|
||||
td, err := ioutil.TempDir("", "gc-test-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
db, err := bolt.Open(filepath.Join(td, "meta.db"), 0644, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
nsn, err := naive.NewSnapshotter(filepath.Join(td, "snapshots"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
lcs, err := local.NewStore(filepath.Join(td, "content"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
mdb := NewDB(db, lcs, map[string]snapshot.Snapshotter{"naive": nsn})
|
||||
|
||||
return mdb, mdb.ContentStore(), mdb.Snapshotter("naive"), func() {
|
||||
os.RemoveAll(td)
|
||||
}
|
||||
}
|
||||
|
343
metadata/gc.go
Normal file
343
metadata/gc.go
Normal file
@ -0,0 +1,343 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
ResourceUnknown gc.ResourceType = iota
|
||||
ResourceContent
|
||||
ResourceSnapshot
|
||||
ResourceContainer
|
||||
ResourceTask
|
||||
)
|
||||
|
||||
var (
|
||||
labelGCRoot = []byte("containerd.io/gc.root")
|
||||
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
|
||||
labelGCContentRef = []byte("containerd.io/gc.ref.content")
|
||||
)
|
||||
|
||||
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
nbkt := v1bkt.Bucket(k)
|
||||
ns := string(k)
|
||||
|
||||
ibkt := nbkt.Bucket(bucketKeyObjectImages)
|
||||
if ibkt != nil {
|
||||
if err := ibkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
target := ibkt.Bucket(k).Bucket(bucketKeyTarget)
|
||||
if target != nil {
|
||||
contentKey := string(target.Get(bucketKeyDigest))
|
||||
select {
|
||||
case nc <- gcnode(ResourceContent, ns, contentKey):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return sendSnapshotRefs(ns, ibkt.Bucket(k), func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cbkt := nbkt.Bucket(bucketKeyObjectContent)
|
||||
if cbkt != nil {
|
||||
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
|
||||
}
|
||||
if cbkt != nil {
|
||||
if err := cbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
return sendRootRef(ctx, nc, gcnode(ResourceContent, ns, string(k)), cbkt.Bucket(k))
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cbkt = nbkt.Bucket(bucketKeyObjectContainers)
|
||||
if cbkt != nil {
|
||||
if err := cbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
snapshotter := string(cbkt.Bucket(k).Get(bucketKeySnapshotter))
|
||||
if snapshotter != "" {
|
||||
ss := string(cbkt.Bucket(k).Get(bucketKeySnapshotKey))
|
||||
select {
|
||||
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, ss)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Send additional snapshot refs through labels
|
||||
return sendSnapshotRefs(ns, cbkt.Bucket(k), func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
if err := sbkt.ForEach(func(sk, sv []byte) error {
|
||||
if sv != nil {
|
||||
return nil
|
||||
}
|
||||
snbkt := sbkt.Bucket(sk)
|
||||
|
||||
return snbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return sendRootRef(ctx, nc, gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)), snbkt.Bucket(k))
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func references(ctx context.Context, tx *bolt.Tx, node gc.Node, fn func(gc.Node)) error {
|
||||
if node.Type == ResourceContent {
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(node.Key))
|
||||
if bkt == nil {
|
||||
// Node may be created from dead edge
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := sendSnapshotRefs(node.Namespace, bkt, fn); err != nil {
|
||||
return err
|
||||
}
|
||||
return sendContentRefs(node.Namespace, bkt, fn)
|
||||
} else if node.Type == ResourceSnapshot {
|
||||
parts := strings.SplitN(node.Key, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
return errors.Errorf("invalid snapshot gc key %s", node.Key)
|
||||
}
|
||||
ss := parts[0]
|
||||
name := parts[1]
|
||||
|
||||
bkt := getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots, []byte(ss), []byte(name))
|
||||
if bkt == nil {
|
||||
getBucket(tx, bucketKeyVersion, []byte(node.Namespace), bucketKeyObjectSnapshots).ForEach(func(k, v []byte) error {
|
||||
return nil
|
||||
})
|
||||
|
||||
// Node may be created from dead edge
|
||||
return nil
|
||||
}
|
||||
|
||||
if pv := bkt.Get(bucketKeyParent); len(pv) > 0 {
|
||||
fn(gcnode(ResourceSnapshot, node.Namespace, fmt.Sprintf("%s/%s", ss, pv)))
|
||||
}
|
||||
|
||||
return sendSnapshotRefs(node.Namespace, bkt, fn)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func scanAll(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
nbkt := v1bkt.Bucket(k)
|
||||
ns := string(k)
|
||||
|
||||
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
if err := sbkt.ForEach(func(sk, sv []byte) error {
|
||||
if sv != nil {
|
||||
return nil
|
||||
}
|
||||
snbkt := sbkt.Bucket(sk)
|
||||
return snbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
cbkt := nbkt.Bucket(bucketKeyObjectContent)
|
||||
if cbkt != nil {
|
||||
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
|
||||
}
|
||||
if cbkt != nil {
|
||||
if err := cbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
select {
|
||||
case nc <- gcnode(ResourceContent, ns, string(k)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
nsbkt := v1bkt.Bucket([]byte(node.Namespace))
|
||||
if nsbkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch node.Type {
|
||||
case ResourceContent:
|
||||
cbkt := nsbkt.Bucket(bucketKeyObjectContent)
|
||||
if cbkt != nil {
|
||||
cbkt = cbkt.Bucket(bucketKeyObjectBlob)
|
||||
}
|
||||
if cbkt != nil {
|
||||
log.G(ctx).WithField("key", node.Key).Debug("delete content")
|
||||
return cbkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
case ResourceSnapshot:
|
||||
sbkt := nsbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
parts := strings.SplitN(node.Key, "/", 2)
|
||||
if len(parts) != 2 {
|
||||
return errors.Errorf("invalid snapshot gc key %s", node.Key)
|
||||
}
|
||||
ssbkt := sbkt.Bucket([]byte(parts[0]))
|
||||
if ssbkt != nil {
|
||||
log.G(ctx).WithField("key", parts[1]).WithField("snapshotter", parts[0]).Debug("delete snapshot")
|
||||
return ssbkt.DeleteBucket([]byte(parts[1]))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendSnapshotRefs sends all snapshot references referred to by the labels in the bkt
|
||||
func sendSnapshotRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
|
||||
lbkt := bkt.Bucket(bucketKeyObjectLabels)
|
||||
if lbkt != nil {
|
||||
lc := lbkt.Cursor()
|
||||
|
||||
for k, v := lc.Seek(labelGCSnapRef); k != nil && strings.HasPrefix(string(k), string(labelGCSnapRef)); k, v = lc.Next() {
|
||||
snapshotter := string(k[len(labelGCSnapRef):])
|
||||
fn(gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", snapshotter, v)))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendContentRefs sends all content references referred to by the labels in the bkt
|
||||
func sendContentRefs(ns string, bkt *bolt.Bucket, fn func(gc.Node)) error {
|
||||
lbkt := bkt.Bucket(bucketKeyObjectLabels)
|
||||
if lbkt != nil {
|
||||
lc := lbkt.Cursor()
|
||||
|
||||
labelRef := string(labelGCContentRef)
|
||||
for k, v := lc.Seek(labelGCContentRef); k != nil && strings.HasPrefix(string(k), labelRef); k, v = lc.Next() {
|
||||
if ks := string(k); ks != labelRef {
|
||||
// Allow reference naming, ignore names
|
||||
if ks[len(labelRef)] != '.' {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
fn(gcnode(ResourceContent, ns, string(v)))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isRootRef(bkt *bolt.Bucket) bool {
|
||||
lbkt := bkt.Bucket(bucketKeyObjectLabels)
|
||||
if lbkt != nil {
|
||||
rv := lbkt.Get(labelGCRoot)
|
||||
if rv != nil {
|
||||
// TODO: interpret rv as a timestamp and skip if expired
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func sendRootRef(ctx context.Context, nc chan<- gc.Node, n gc.Node, bkt *bolt.Bucket) error {
|
||||
if isRootRef(bkt) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func gcnode(t gc.ResourceType, ns, key string) gc.Node {
|
||||
return gc.Node{
|
||||
Type: t,
|
||||
Namespace: ns,
|
||||
Key: key,
|
||||
}
|
||||
}
|
406
metadata/gc_test.go
Normal file
406
metadata/gc_test.go
Normal file
@ -0,0 +1,406 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/gc"
|
||||
"github.com/containerd/containerd/metadata/boltutil"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
func TestGCRoots(t *testing.T) {
|
||||
db, cleanup, err := newDatabase()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
alters := []alterFunc{
|
||||
addImage("ns1", "image1", dgst(1), nil),
|
||||
addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
|
||||
addContent("ns1", dgst(1), nil),
|
||||
addContent("ns1", dgst(2), nil),
|
||||
addContent("ns1", dgst(3), nil),
|
||||
addContent("ns2", dgst(1), nil),
|
||||
addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
|
||||
addSnapshot("ns1", "overlay", "sn1", "", nil),
|
||||
addSnapshot("ns1", "overlay", "sn2", "", nil),
|
||||
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
|
||||
}
|
||||
|
||||
expected := []gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(2).String()),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, alter := range alters {
|
||||
if err := alter(v1bkt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
checkNodes(ctx, t, db, expected, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanRoots(ctx, tx, nc)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGCRemove(t *testing.T) {
|
||||
db, cleanup, err := newDatabase()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
alters := []alterFunc{
|
||||
addImage("ns1", "image1", dgst(1), nil),
|
||||
addImage("ns1", "image2", dgst(2), labelmap(string(labelGCSnapRef)+"overlay", "sn2")),
|
||||
addContent("ns1", dgst(1), nil),
|
||||
addContent("ns1", dgst(2), nil),
|
||||
addContent("ns1", dgst(3), nil),
|
||||
addContent("ns2", dgst(1), nil),
|
||||
addContent("ns2", dgst(2), labelmap(string(labelGCRoot), "always")),
|
||||
addSnapshot("ns1", "overlay", "sn1", "", nil),
|
||||
addSnapshot("ns1", "overlay", "sn2", "", nil),
|
||||
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
|
||||
addSnapshot("ns2", "overlay", "sn1", "", nil),
|
||||
}
|
||||
|
||||
all := []gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(3).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(1).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(2).String()),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
|
||||
}
|
||||
|
||||
var deleted, remaining []gc.Node
|
||||
for i, n := range all {
|
||||
if i%2 == 0 {
|
||||
deleted = append(deleted, n)
|
||||
} else {
|
||||
remaining = append(remaining, n)
|
||||
}
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, alter := range alters {
|
||||
if err := alter(v1bkt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
checkNodes(ctx, t, db, all, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanAll(ctx, tx, nc)
|
||||
})
|
||||
if t.Failed() {
|
||||
t.Fatal("Scan all failed")
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
for _, n := range deleted {
|
||||
if err := remove(ctx, tx, n); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
checkNodes(ctx, t, db, remaining, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return scanAll(ctx, tx, nc)
|
||||
})
|
||||
}
|
||||
|
||||
func TestGCRefs(t *testing.T) {
|
||||
db, cleanup, err := newDatabase()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
alters := []alterFunc{
|
||||
addContent("ns1", dgst(1), nil),
|
||||
addContent("ns1", dgst(2), nil),
|
||||
addContent("ns1", dgst(3), nil),
|
||||
addContent("ns1", dgst(4), labelmap(string(labelGCContentRef), dgst(1).String())),
|
||||
addContent("ns1", dgst(5), labelmap(string(labelGCContentRef)+".anything-1", dgst(2).String(), string(labelGCContentRef)+".anything-2", dgst(3).String())),
|
||||
addContent("ns1", dgst(6), labelmap(string(labelGCContentRef)+"bad", dgst(1).String())),
|
||||
addContent("ns2", dgst(1), nil),
|
||||
addContent("ns2", dgst(2), nil),
|
||||
addSnapshot("ns1", "overlay", "sn1", "", nil),
|
||||
addSnapshot("ns1", "overlay", "sn2", "sn1", nil),
|
||||
addSnapshot("ns1", "overlay", "sn3", "sn2", nil),
|
||||
addSnapshot("ns1", "overlay", "sn4", "", labelmap(string(labelGCSnapRef)+"btrfs", "sn1", string(labelGCSnapRef)+"overlay", "sn1")),
|
||||
addSnapshot("ns1", "btrfs", "sn1", "", nil),
|
||||
addSnapshot("ns2", "overlay", "sn1", "", nil),
|
||||
addSnapshot("ns2", "overlay", "sn2", "sn1", nil),
|
||||
}
|
||||
|
||||
refs := map[gc.Node][]gc.Node{
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()): nil,
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()): nil,
|
||||
gcnode(ResourceContent, "ns1", dgst(3).String()): nil,
|
||||
gcnode(ResourceContent, "ns1", dgst(4).String()): {
|
||||
gcnode(ResourceContent, "ns1", dgst(1).String()),
|
||||
},
|
||||
gcnode(ResourceContent, "ns1", dgst(5).String()): {
|
||||
gcnode(ResourceContent, "ns1", dgst(2).String()),
|
||||
gcnode(ResourceContent, "ns1", dgst(3).String()),
|
||||
},
|
||||
gcnode(ResourceContent, "ns1", dgst(6).String()): nil,
|
||||
gcnode(ResourceContent, "ns2", dgst(1).String()): nil,
|
||||
gcnode(ResourceContent, "ns2", dgst(2).String()): nil,
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn1"): nil,
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"): {
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
|
||||
},
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"): {
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
|
||||
},
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn4"): {
|
||||
gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn1"),
|
||||
},
|
||||
gcnode(ResourceSnapshot, "ns1", "btrfs/sn1"): nil,
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"): nil,
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn2"): {
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
|
||||
},
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
v1bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, alter := range alters {
|
||||
if err := alter(v1bkt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatalf("Update failed: %+v", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
for n, nodes := range refs {
|
||||
checkNodes(ctx, t, db, nodes, func(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return references(ctx, tx, n, func(n gc.Node) {
|
||||
select {
|
||||
case nc <- n:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
})
|
||||
})
|
||||
if t.Failed() {
|
||||
t.Fatalf("Failure scanning %v", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newDatabase() (*bolt.DB, func(), error) {
|
||||
td, err := ioutil.TempDir("", "gc-roots-")
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
db, err := bolt.Open(filepath.Join(td, "test.db"), 0777, nil)
|
||||
if err != nil {
|
||||
os.RemoveAll(td)
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return db, func() {
|
||||
db.Close()
|
||||
os.RemoveAll(td)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func checkNodes(ctx context.Context, t *testing.T, db *bolt.DB, expected []gc.Node, fn func(context.Context, *bolt.Tx, chan<- gc.Node) error) {
|
||||
var actual []gc.Node
|
||||
nc := make(chan gc.Node)
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for n := range nc {
|
||||
actual = append(actual, n)
|
||||
}
|
||||
}()
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
defer close(nc)
|
||||
return fn(ctx, tx, nc)
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
<-done
|
||||
checkNodesEqual(t, actual, expected)
|
||||
}
|
||||
|
||||
func checkNodesEqual(t *testing.T, n1, n2 []gc.Node) {
|
||||
sort.Sort(nodeList(n1))
|
||||
sort.Sort(nodeList(n2))
|
||||
|
||||
if len(n1) != len(n2) {
|
||||
t.Errorf("Nodes do not match\n\tExpected:\n\t%v\n\tActual:\n\t%v", n2, n1)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range n1 {
|
||||
if n1[i] != n2[i] {
|
||||
t.Errorf("[%d] root does not match expected: expected %v, got %v", i, n2[i], n1[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type nodeList []gc.Node
|
||||
|
||||
func (nodes nodeList) Len() int {
|
||||
return len(nodes)
|
||||
}
|
||||
|
||||
func (nodes nodeList) Less(i, j int) bool {
|
||||
if nodes[i].Type != nodes[j].Type {
|
||||
return nodes[i].Type < nodes[j].Type
|
||||
}
|
||||
if nodes[i].Namespace != nodes[j].Namespace {
|
||||
return nodes[i].Namespace < nodes[j].Namespace
|
||||
}
|
||||
return nodes[i].Key < nodes[j].Key
|
||||
}
|
||||
|
||||
func (nodes nodeList) Swap(i, j int) {
|
||||
nodes[i], nodes[j] = nodes[j], nodes[i]
|
||||
}
|
||||
|
||||
type alterFunc func(bkt *bolt.Bucket) error
|
||||
|
||||
func addImage(ns, name string, dgst digest.Digest, labels map[string]string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
ibkt, err := createBuckets(bkt, ns, string(bucketKeyObjectImages), name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
tbkt, err := ibkt.CreateBucket(bucketKeyTarget)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tbkt.Put(bucketKeyDigest, []byte(dgst.String())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return boltutil.WriteLabels(ibkt, labels)
|
||||
}
|
||||
}
|
||||
|
||||
func addSnapshot(ns, snapshotter, name, parent string, labels map[string]string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectSnapshots), snapshotter, name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if parent != "" {
|
||||
if err := sbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return boltutil.WriteLabels(sbkt, labels)
|
||||
}
|
||||
}
|
||||
|
||||
func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContent), string(bucketKeyObjectBlob), dgst.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return boltutil.WriteLabels(cbkt, labels)
|
||||
}
|
||||
}
|
||||
|
||||
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cbkt.Put(bucketKeySnapshotter, []byte(snapshotter)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cbkt.Put(bucketKeySnapshotKey, []byte(snapshot)); err != nil {
|
||||
return err
|
||||
}
|
||||
return boltutil.WriteLabels(cbkt, labels)
|
||||
}
|
||||
}
|
||||
|
||||
func createBuckets(bkt *bolt.Bucket, names ...string) (*bolt.Bucket, error) {
|
||||
for _, name := range names {
|
||||
nbkt, err := bkt.CreateBucketIfNotExists([]byte(name))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
bkt = nbkt
|
||||
}
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
func labelmap(kv ...string) map[string]string {
|
||||
if len(kv)%2 != 0 {
|
||||
panic("bad labels argument")
|
||||
}
|
||||
l := map[string]string{}
|
||||
for i := 0; i < len(kv); i = i + 2 {
|
||||
l[kv[i]] = kv[i+1]
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func dgst(i int64) digest.Digest {
|
||||
r := rand.New(rand.NewSource(i))
|
||||
dgstr := digest.SHA256.Digester()
|
||||
if _, err := io.CopyN(dgstr.Hash(), r, 256); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return dgstr.Digest()
|
||||
}
|
@ -4,11 +4,13 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/labels"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/metadata/boltutil"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
@ -19,12 +21,13 @@ import (
|
||||
type snapshotter struct {
|
||||
snapshot.Snapshotter
|
||||
name string
|
||||
db transactor
|
||||
db *DB
|
||||
l sync.RWMutex
|
||||
}
|
||||
|
||||
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
|
||||
// using the provided name and database.
|
||||
func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
|
||||
func newSnapshotter(db *DB, name string, sn snapshot.Snapshotter) *snapshotter {
|
||||
return &snapshotter{
|
||||
Snapshotter: sn,
|
||||
name: name,
|
||||
@ -125,6 +128,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro
|
||||
}
|
||||
|
||||
func (s *snapshotter) Update(ctx context.Context, info snapshot.Info, fieldpaths ...string) (snapshot.Info, error) {
|
||||
s.l.RLock()
|
||||
defer s.l.RUnlock()
|
||||
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return snapshot.Info{}, err
|
||||
@ -249,6 +255,9 @@ func (s *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
|
||||
}
|
||||
|
||||
func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, readonly bool, opts []snapshot.Opt) ([]mount.Mount, error) {
|
||||
s.l.RLock()
|
||||
defer s.l.RUnlock()
|
||||
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -332,6 +341,9 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
||||
}
|
||||
|
||||
func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshot.Opt) error {
|
||||
s.l.RLock()
|
||||
defer s.l.RUnlock()
|
||||
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -421,6 +433,9 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
||||
}
|
||||
|
||||
func (s *snapshotter) Remove(ctx context.Context, key string) error {
|
||||
s.l.RLock()
|
||||
defer s.l.RUnlock()
|
||||
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -457,7 +472,16 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
|
||||
}
|
||||
}
|
||||
|
||||
return bkt.DeleteBucket([]byte(key))
|
||||
if err := bkt.DeleteBucket([]byte(key)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark snapshotter as dirty for triggering garbage collection
|
||||
s.db.dirtyL.Lock()
|
||||
s.db.dirtySS[s.name] = struct{}{}
|
||||
s.db.dirtyL.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
@ -565,3 +589,134 @@ func validateSnapshot(info *snapshot.Info) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshotter) garbageCollect(ctx context.Context) error {
|
||||
logger := log.G(ctx).WithField("snapshotter", s.name)
|
||||
lt1 := time.Now()
|
||||
s.l.Lock()
|
||||
defer func() {
|
||||
s.l.Unlock()
|
||||
logger.WithField("t", time.Now().Sub(lt1)).Debugf("garbage collected")
|
||||
}()
|
||||
|
||||
seen := map[string]struct{}{}
|
||||
if err := s.db.View(func(tx *bolt.Tx) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
sbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// Load specific snapshotter
|
||||
ssbkt := sbkt.Bucket([]byte(s.name))
|
||||
if ssbkt == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := ssbkt.ForEach(func(sk, sv []byte) error {
|
||||
if sv == nil {
|
||||
bkey := ssbkt.Bucket(sk).Get(bucketKeyName)
|
||||
if len(bkey) > 0 {
|
||||
seen[string(bkey)] = struct{}{}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
roots, err := s.walkTree(ctx, seen)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Unlock before prune (once nodes are fully unavailable)
|
||||
|
||||
for _, node := range roots {
|
||||
if err := s.pruneBranch(ctx, node); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type treeNode struct {
|
||||
info snapshot.Info
|
||||
remove bool
|
||||
children []*treeNode
|
||||
}
|
||||
|
||||
func (s *snapshotter) walkTree(ctx context.Context, seen map[string]struct{}) ([]*treeNode, error) {
|
||||
roots := []*treeNode{}
|
||||
nodes := map[string]*treeNode{}
|
||||
|
||||
if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, info snapshot.Info) error {
|
||||
_, isSeen := seen[info.Name]
|
||||
node, ok := nodes[info.Name]
|
||||
if !ok {
|
||||
node = &treeNode{}
|
||||
nodes[info.Name] = node
|
||||
}
|
||||
|
||||
node.remove = !isSeen
|
||||
node.info = info
|
||||
|
||||
if info.Parent == "" {
|
||||
roots = append(roots, node)
|
||||
} else {
|
||||
parent, ok := nodes[info.Parent]
|
||||
if !ok {
|
||||
parent = &treeNode{}
|
||||
nodes[info.Parent] = parent
|
||||
}
|
||||
parent.children = append(parent.children, node)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return roots, nil
|
||||
}
|
||||
|
||||
func (s *snapshotter) pruneBranch(ctx context.Context, node *treeNode) error {
|
||||
for _, child := range node.children {
|
||||
if err := s.pruneBranch(ctx, child); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if node.remove {
|
||||
logger := log.G(ctx).WithField("snapshotter", s.name)
|
||||
if err := s.Snapshotter.Remove(ctx, node.info.Name); err != nil {
|
||||
if !errdefs.IsFailedPrecondition(err) {
|
||||
return err
|
||||
}
|
||||
logger.WithError(err).WithField("key", node.info.Name).Warnf("snapshot removal failed")
|
||||
} else {
|
||||
logger.WithField("key", node.info.Name).Debug("removed snapshot")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package remotes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -102,7 +104,76 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
return content.Copy(ctx, cw, rc, desc.Size, desc.Digest)
|
||||
r, opts := commitOpts(desc, rc)
|
||||
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
|
||||
}
|
||||
|
||||
// commitOpts gets the appropriate content options to alter
|
||||
// the content info on commit based on media type.
|
||||
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
|
||||
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
|
||||
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
|
||||
var (
|
||||
manifest ocispec.Manifest
|
||||
decoder = json.NewDecoder(r)
|
||||
)
|
||||
if err := decoder.Decode(&manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil
|
||||
}
|
||||
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
||||
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
|
||||
var (
|
||||
index ocispec.Index
|
||||
decoder = json.NewDecoder(r)
|
||||
)
|
||||
if err := decoder.Decode(&index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return index.Manifests, nil
|
||||
}
|
||||
default:
|
||||
return r, nil
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
var children []ocispec.Descriptor
|
||||
errC := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(errC)
|
||||
ch, err := childrenF(pr)
|
||||
if err != nil {
|
||||
errC <- err
|
||||
}
|
||||
children = ch
|
||||
}()
|
||||
|
||||
opt := func(info *content.Info) error {
|
||||
err := <-errC
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get commit labels")
|
||||
}
|
||||
|
||||
if len(children) > 0 {
|
||||
if info.Labels == nil {
|
||||
info.Labels = map[string]string{}
|
||||
}
|
||||
for i, ch := range children {
|
||||
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return io.TeeReader(r, pw), []content.Opt{opt}
|
||||
}
|
||||
|
||||
// PushHandler returns a handler that will push all content from the provider
|
||||
|
@ -63,8 +63,8 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
||||
|
||||
key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
|
||||
|
||||
// Prepare snapshot with from parent
|
||||
mounts, err := sn.Prepare(ctx, key, parent.String())
|
||||
// Prepare snapshot with from parent, label as root
|
||||
mounts, err := sn.Prepare(ctx, key, parent.String(), opts...)
|
||||
if err != nil {
|
||||
//TODO: If is snapshot exists error, retry
|
||||
return false, errors.Wrap(err, "failed to prepare extraction layer")
|
||||
@ -87,7 +87,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err = sn.Commit(ctx, chainID.String(), key); err != nil {
|
||||
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return false, errors.Wrapf(err, "failed to commit snapshot %s", parent)
|
||||
}
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/containerd/containerd/metadata"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -160,6 +161,10 @@ func (s *Service) Delete(ctx context.Context, req *api.DeleteContainerRequest) (
|
||||
return &empty.Empty{}, err
|
||||
}
|
||||
|
||||
if err := s.db.GarbageCollect(ctx); err != nil {
|
||||
return &empty.Empty{}, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed"))
|
||||
}
|
||||
|
||||
return &empty.Empty{}, nil
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/containerd/containerd/metadata"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -161,6 +162,10 @@ func (s *Service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := s.db.GarbageCollect(ctx); err != nil {
|
||||
return nil, errdefs.ToGRPC(errors.Wrap(err, "garbage collection failed"))
|
||||
}
|
||||
|
||||
return &empty.Empty{}, nil
|
||||
}
|
||||
|
||||
|
@ -567,7 +567,10 @@ func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := writer.Commit(ctx, 0, ""); err != nil {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &types.Descriptor{
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sys/unix"
|
||||
|
||||
@ -20,6 +21,7 @@ import (
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/opencontainers/runc/libcontainer/user"
|
||||
@ -258,16 +260,19 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
|
||||
snapshotter = client.SnapshotService(c.Snapshotter)
|
||||
parent = identity.ChainID(diffIDs).String()
|
||||
usernsID = fmt.Sprintf("%s-%d-%d", parent, uid, gid)
|
||||
opt = snapshot.WithLabels(map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
})
|
||||
)
|
||||
if _, err := snapshotter.Stat(ctx, usernsID); err == nil {
|
||||
if _, err := snapshotter.Prepare(ctx, id, usernsID); err != nil {
|
||||
if _, err := snapshotter.Prepare(ctx, id, usernsID, opt); err != nil {
|
||||
return err
|
||||
}
|
||||
c.SnapshotKey = id
|
||||
c.Image = i.Name()
|
||||
return nil
|
||||
}
|
||||
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent)
|
||||
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent, opt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -275,13 +280,13 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
|
||||
snapshotter.Remove(ctx, usernsID)
|
||||
return err
|
||||
}
|
||||
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap"); err != nil {
|
||||
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap", opt); err != nil {
|
||||
return err
|
||||
}
|
||||
if readonly {
|
||||
_, err = snapshotter.View(ctx, id, usernsID)
|
||||
_, err = snapshotter.View(ctx, id, usernsID, opt)
|
||||
} else {
|
||||
_, err = snapshotter.Prepare(ctx, id, usernsID)
|
||||
_, err = snapshotter.Prepare(ctx, id, usernsID, opt)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
|
35
task.go
35
task.go
@ -17,6 +17,7 @@ import (
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/diff"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
@ -24,6 +25,7 @@ import (
|
||||
google_protobuf "github.com/gogo/protobuf/types"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
specs "github.com/opencontainers/runtime-spec/specs-go"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@ -486,7 +488,13 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas
|
||||
}
|
||||
|
||||
func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, snapshotterName string, id string) error {
|
||||
rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)))
|
||||
opts := []diff.Opt{
|
||||
diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)),
|
||||
diff.WithLabels(map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
}),
|
||||
}
|
||||
rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -510,15 +518,32 @@ func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image strin
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (v1.Descriptor, error) {
|
||||
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
}
|
||||
|
||||
for i, m := range index.Manifests {
|
||||
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
|
||||
defer func(m ocispec.Descriptor) {
|
||||
if err == nil {
|
||||
info := content.Info{Digest: m.Digest}
|
||||
if _, uerr := t.client.ContentStore().Update(ctx, info, "labels.containerd.io/gc.root"); uerr != nil {
|
||||
log.G(ctx).WithError(uerr).WithField("dgst", m.Digest).Warnf("failed to remove root marker")
|
||||
}
|
||||
}
|
||||
}(m)
|
||||
}
|
||||
|
||||
buf := bytes.NewBuffer(nil)
|
||||
if err := json.NewEncoder(buf).Encode(index); err != nil {
|
||||
return v1.Descriptor{}, err
|
||||
}
|
||||
return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.id, buf)
|
||||
|
||||
return writeContent(ctx, t.client.ContentStore(), v1.MediaTypeImageIndex, t.id, buf, content.WithLabels(labels))
|
||||
}
|
||||
|
||||
func writeContent(ctx context.Context, store content.Store, mediaType, ref string, r io.Reader) (d v1.Descriptor, err error) {
|
||||
func writeContent(ctx context.Context, store content.Store, mediaType, ref string, r io.Reader, opts ...content.Opt) (d v1.Descriptor, err error) {
|
||||
writer, err := store.Writer(ctx, ref, 0, "")
|
||||
if err != nil {
|
||||
return d, err
|
||||
@ -528,7 +553,7 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin
|
||||
if err != nil {
|
||||
return d, err
|
||||
}
|
||||
if err := writer.Commit(ctx, size, ""); err != nil {
|
||||
if err := writer.Commit(ctx, size, "", opts...); err != nil {
|
||||
return d, err
|
||||
}
|
||||
return v1.Descriptor{
|
||||
|
Loading…
Reference in New Issue
Block a user