containerd/metadata/content.go
Derek McGowan 00596f400e
Add gc policy plugin
Add garbage collection as a background process and policy
configuration for configuring when to run garbage collection.
By default garbage collection will run when deletion occurs
and no more than 20ms out of every second.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
2017-11-20 16:57:39 -08:00

589 lines
13 KiB
Go

package metadata
import (
"context"
"encoding/binary"
"strings"
"sync"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/labels"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
type contentStore struct {
content.Store
db *DB
l sync.RWMutex
}
// newContentStore returns a namespaced content store using an existing
// content store interface.
func newContentStore(db *DB, cs content.Store) *contentStore {
return &contentStore{
Store: cs,
db: db,
}
}
func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Info{}, err
}
var info content.Info
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
info.Digest = dgst
return readInfo(&info, bkt)
}); err != nil {
return content.Info{}, err
}
return info, nil
}
func (cs *contentStore) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Info{}, err
}
cs.l.RLock()
defer cs.l.RUnlock()
updated := content.Info{
Digest: info.Digest,
}
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, info.Digest)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", info.Digest)
}
if err := readInfo(&updated, bkt); err != nil {
return errors.Wrapf(err, "info %q", info.Digest)
}
if len(fieldpaths) > 0 {
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = info.Labels[key]
continue
}
switch path {
case "labels":
updated.Labels = info.Labels
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on content info %q", path, info.Digest)
}
}
} else {
// Set mutable fields
updated.Labels = info.Labels
}
if err := validateInfo(&updated); err != nil {
return err
}
updated.UpdatedAt = time.Now().UTC()
return writeInfo(&updated, bkt)
}); err != nil {
return content.Info{}, err
}
return updated, nil
}
func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
filter, err := filters.ParseAll(fs...)
if err != nil {
return err
}
// TODO: Batch results to keep from reading all info into memory
var infos []content.Info
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobsBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
dgst, err := digest.Parse(string(k))
if err != nil {
// Not a digest, skip
return nil
}
bbkt := bkt.Bucket(k)
if bbkt == nil {
return nil
}
info := content.Info{
Digest: dgst,
}
if err := readInfo(&info, bkt.Bucket(k)); err != nil {
return err
}
if filter.Match(adaptContentInfo(info)) {
infos = append(infos, info)
}
return nil
})
}); err != nil {
return err
}
for _, info := range infos {
if err := fn(info); err != nil {
return err
}
}
return nil
}
func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
if err := getBlobsBucket(tx, ns).DeleteBucket([]byte(dgst.String())); err != nil {
return err
}
if err := removeContentLease(ctx, tx, dgst); err != nil {
return err
}
// Mark content store as dirty for triggering garbage collection
cs.db.dirtyL.Lock()
cs.db.dirtyCS = true
cs.db.dirtyL.Unlock()
return nil
})
}
func (cs *contentStore) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
filter, err := filters.ParseAll(fs...)
if err != nil {
return nil, err
}
brefs := map[string]string{}
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return nil
}
return bkt.ForEach(func(k, v []byte) error {
// TODO(dmcgowan): match name and potentially labels here
brefs[string(k)] = string(v)
return nil
})
}); err != nil {
return nil, err
}
statuses := make([]content.Status, 0, len(brefs))
for k, bref := range brefs {
status, err := cs.Store.Status(ctx, bref)
if err != nil {
if errdefs.IsNotFound(err) {
continue
}
return nil, err
}
status.Ref = k
if filter.Match(adaptContentStatus(status)) {
statuses = append(statuses, status)
}
}
return statuses, nil
}
func getRef(tx *bolt.Tx, ns, ref string) string {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return ""
}
v := bkt.Get([]byte(ref))
if len(v) == 0 {
return ""
}
return string(v)
}
func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return content.Status{}, err
}
var bref string
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
bref = getRef(tx, ns, ref)
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
return nil
}); err != nil {
return content.Status{}, err
}
st, err := cs.Store.Status(ctx, bref)
if err != nil {
return content.Status{}, err
}
st.Ref = ref
return st, nil
}
func (cs *contentStore) Abort(ctx context.Context, ref string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
cs.l.RLock()
defer cs.l.RUnlock()
return update(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, ns)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
bref := string(bkt.Get([]byte(ref)))
if bref == "" {
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
}
if err := bkt.Delete([]byte(ref)); err != nil {
return err
}
return cs.Store.Abort(ctx, bref)
})
}
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
cs.l.RLock()
defer cs.l.RUnlock()
var w content.Writer
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
if expected != "" {
cbkt := getBlobBucket(tx, ns, expected)
if cbkt != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
}
}
bkt, err := createIngestBucket(tx, ns)
if err != nil {
return err
}
var (
bref string
brefb = bkt.Get([]byte(ref))
)
if brefb == nil {
sid, err := bkt.NextSequence()
if err != nil {
return err
}
bref = createKey(sid, ns, ref)
if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
return err
}
} else {
bref = string(brefb)
}
// Do not use the passed in expected value here since it was
// already checked against the user metadata. If the content
// store has the content, it must still be written before
// linked into the given namespace. It is possible in the future
// to allow content which exists in content store but not
// namespace to be linked here and returned an exist error, but
// this would require more configuration to make secure.
w, err = cs.Store.Writer(ctx, bref, size, "")
return err
}); err != nil {
return nil, err
}
// TODO: keep the expected in the writer to use on commit
// when no expected is provided there.
return &namespacedWriter{
Writer: w,
ref: ref,
namespace: ns,
db: cs.db,
l: &cs.l,
}, nil
}
type namespacedWriter struct {
content.Writer
ref string
namespace string
db transactor
l *sync.RWMutex
}
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
nw.l.RLock()
defer nw.l.RUnlock()
return update(ctx, nw.db, func(tx *bolt.Tx) error {
bkt := getIngestBucket(tx, nw.namespace)
if bkt != nil {
if err := bkt.Delete([]byte(nw.ref)); err != nil {
return err
}
}
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return "", err
}
}
if err := validateInfo(&base); err != nil {
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return "", err
}
if size != 0 && size != status.Offset {
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
actual := nw.Writer.Digest()
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return "", err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return "", err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return "", err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return "", err
}
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {
st, err := nw.Writer.Status()
if err == nil {
st.Ref = nw.ref
}
return st, err
}
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
if err := cs.checkAccess(ctx, dgst); err != nil {
return nil, err
}
return cs.Store.ReaderAt(ctx, dgst)
}
func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
return view(ctx, cs.db, func(tx *bolt.Tx) error {
bkt := getBlobBucket(tx, ns, dgst)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
}
return nil
})
}
func validateInfo(info *content.Info) error {
for k, v := range info.Labels {
if err := labels.Validate(k, v); err == nil {
return errors.Wrapf(err, "info.Labels")
}
}
return nil
}
func readInfo(info *content.Info, bkt *bolt.Bucket) error {
if err := boltutil.ReadTimestamps(bkt, &info.CreatedAt, &info.UpdatedAt); err != nil {
return err
}
labels, err := boltutil.ReadLabels(bkt)
if err != nil {
return err
}
info.Labels = labels
if v := bkt.Get(bucketKeySize); len(v) > 0 {
info.Size, _ = binary.Varint(v)
}
return nil
}
func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
if err := boltutil.WriteTimestamps(bkt, info.CreatedAt, info.UpdatedAt); err != nil {
return err
}
if err := boltutil.WriteLabels(bkt, info.Labels); err != nil {
return errors.Wrapf(err, "writing labels for info %v", info.Digest)
}
// Write size
sizeEncoded, err := encodeInt(info.Size)
if err != nil {
return err
}
return bkt.Put(bucketKeySize, sizeEncoded)
}
func (cs *contentStore) garbageCollect(ctx context.Context) (d time.Duration, err error) {
cs.l.Lock()
t1 := time.Now()
defer func() {
if err == nil {
d = time.Now().Sub(t1)
}
cs.l.Unlock()
}()
seen := map[string]struct{}{}
if err := cs.db.View(func(tx *bolt.Tx) error {
v1bkt := tx.Bucket(bucketKeyVersion)
if v1bkt == nil {
return nil
}
// iterate through each namespace
v1c := v1bkt.Cursor()
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
if v != nil {
continue
}
cbkt := v1bkt.Bucket(k).Bucket(bucketKeyObjectContent)
if cbkt == nil {
continue
}
bbkt := cbkt.Bucket(bucketKeyObjectBlob)
if err := bbkt.ForEach(func(ck, cv []byte) error {
if cv == nil {
seen[string(ck)] = struct{}{}
}
return nil
}); err != nil {
return err
}
}
return nil
}); err != nil {
return 0, err
}
err = cs.Store.Walk(ctx, func(info content.Info) error {
if _, ok := seen[info.Digest.String()]; !ok {
if err := cs.Store.Delete(ctx, info.Digest); err != nil {
return err
}
log.G(ctx).WithField("digest", info.Digest).Debug("removed content")
}
return nil
})
return
}