Support for ingest namespacing
Move content status to list statuses and add single status to interface. Updates API to support list statuses and status Updates snapshot key creation to be generic Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
@@ -34,7 +34,9 @@ var (
|
||||
bucketKeyObjectImages = []byte("images") // stores image objects
|
||||
bucketKeyObjectContainers = []byte("containers") // stores container objects
|
||||
bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references
|
||||
bucketKeyObjectContent = []byte("content") // stores content links
|
||||
bucketKeyObjectContent = []byte("content") // stores content references
|
||||
bucketKeyObjectBlob = []byte("blob") // stores content links
|
||||
bucketKeyObjectIngest = []byte("ingest") // stores content links
|
||||
|
||||
bucketKeyDigest = []byte("digest")
|
||||
bucketKeyMediaType = []byte("mediatype")
|
||||
@@ -142,18 +144,30 @@ func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Buck
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter))
|
||||
}
|
||||
|
||||
func createContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) {
|
||||
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String()))
|
||||
func createBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) {
|
||||
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String()))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
func getAllContentBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent)
|
||||
func getBlobsBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob)
|
||||
}
|
||||
|
||||
func getContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String()))
|
||||
func getBlobBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectBlob, []byte(dgst.String()))
|
||||
}
|
||||
|
||||
func createIngestBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
|
||||
bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
func getIngestBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngest)
|
||||
}
|
||||
|
||||
@@ -4,11 +4,14 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"regexp"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/namespaces"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type contentStore struct {
|
||||
@@ -33,9 +36,9 @@ func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.I
|
||||
|
||||
var info content.Info
|
||||
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getContentBucket(tx, ns, dgst)
|
||||
bkt := getBlobBucket(tx, ns, dgst)
|
||||
if bkt == nil {
|
||||
return content.ErrNotFound("")
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
|
||||
}
|
||||
|
||||
info.Digest = dgst
|
||||
@@ -56,7 +59,7 @@ func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
||||
// TODO: Batch results to keep from reading all info into memory
|
||||
var infos []content.Info
|
||||
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getAllContentBucket(tx, ns)
|
||||
bkt := getBlobsBucket(tx, ns)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
@@ -95,37 +98,121 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
}
|
||||
|
||||
return update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getContentBucket(tx, ns, dgst)
|
||||
bkt := getBlobBucket(tx, ns, dgst)
|
||||
if bkt == nil {
|
||||
return content.ErrNotFound("")
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
|
||||
}
|
||||
|
||||
// Just remove local reference, garbage collector is responsible for
|
||||
// cleaning up on disk content
|
||||
return getAllContentBucket(tx, ns).Delete([]byte(dgst.String()))
|
||||
return getBlobsBucket(tx, ns).Delete([]byte(dgst.String()))
|
||||
})
|
||||
}
|
||||
|
||||
func (cs *contentStore) Status(ctx context.Context, re string) ([]content.Status, error) {
|
||||
_, err := namespaces.NamespaceRequired(ctx)
|
||||
func (cs *contentStore) ListStatuses(ctx context.Context, re string) ([]content.Status, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: Read status keys and match
|
||||
var rec *regexp.Regexp
|
||||
if re != "" {
|
||||
rec, err = regexp.Compile(re)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return cs.Store.Status(ctx, re)
|
||||
var brefs []string
|
||||
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, ns)
|
||||
if bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
if rec == nil || rec.Match(k) {
|
||||
brefs = append(brefs, string(v))
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
statuses := make([]content.Status, 0, len(brefs))
|
||||
for _, bref := range brefs {
|
||||
status, err := cs.Store.Status(ctx, bref)
|
||||
if err != nil {
|
||||
if errdefs.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
status.Ref = trimKey(status.Ref)
|
||||
|
||||
statuses = append(statuses, status)
|
||||
}
|
||||
|
||||
return statuses, nil
|
||||
|
||||
}
|
||||
|
||||
func getRef(tx *bolt.Tx, ns, ref string) string {
|
||||
bkt := getIngestBucket(tx, ns)
|
||||
if bkt == nil {
|
||||
return ""
|
||||
}
|
||||
v := bkt.Get([]byte(ref))
|
||||
if len(v) == 0 {
|
||||
return ""
|
||||
}
|
||||
return string(v)
|
||||
}
|
||||
|
||||
func (cs *contentStore) Status(ctx context.Context, ref string) (content.Status, error) {
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return content.Status{}, err
|
||||
}
|
||||
|
||||
var bref string
|
||||
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bref = getRef(tx, ns, ref)
|
||||
if bref == "" {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return content.Status{}, err
|
||||
}
|
||||
|
||||
return cs.Store.Status(ctx, bref)
|
||||
}
|
||||
|
||||
func (cs *contentStore) Abort(ctx context.Context, ref string) error {
|
||||
_, err := namespaces.NamespaceRequired(ctx)
|
||||
ns, err := namespaces.NamespaceRequired(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: Read status key and delete
|
||||
return update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, ns)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
|
||||
}
|
||||
bref := string(bkt.Get([]byte(ref)))
|
||||
if bref == "" {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "reference %v", ref)
|
||||
}
|
||||
if err := bkt.Delete([]byte(ref)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return cs.Store.Abort(ctx, bref)
|
||||
})
|
||||
|
||||
return cs.Store.Abort(ctx, ref)
|
||||
}
|
||||
|
||||
func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||||
@@ -134,29 +221,44 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: Create ref key
|
||||
|
||||
if expected != "" {
|
||||
if err := view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getContentBucket(tx, ns, expected)
|
||||
if bkt != nil {
|
||||
return content.ErrExists("")
|
||||
var w content.Writer
|
||||
if err := update(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
if expected != "" {
|
||||
cbkt := getBlobBucket(tx, ns, expected)
|
||||
if cbkt != nil {
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", expected)
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Do not use the passed in expected value here since it was
|
||||
// already checked against the user metadata. If the content
|
||||
// store has the content, it must still be written before
|
||||
// linked into the given namespace. It is possible in the future
|
||||
// to allow content which exists in content store but not
|
||||
// namespace to be linked here and returned an exist error, but
|
||||
// this would require more configuration to make secure.
|
||||
w, err := cs.Store.Writer(ctx, ref, size, "")
|
||||
if err != nil {
|
||||
bkt, err := createIngestBucket(tx, ns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(bkt.Get([]byte(ref))) > 0 {
|
||||
return errors.Wrapf(errdefs.ErrUnavailable, "ref %v is currently in use", ref)
|
||||
}
|
||||
|
||||
sid, err := bkt.NextSequence()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bref := createKey(sid, ns, ref)
|
||||
if err := bkt.Put([]byte(ref), []byte(bref)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Do not use the passed in expected value here since it was
|
||||
// already checked against the user metadata. If the content
|
||||
// store has the content, it must still be written before
|
||||
// linked into the given namespace. It is possible in the future
|
||||
// to allow content which exists in content store but not
|
||||
// namespace to be linked here and returned an exist error, but
|
||||
// this would require more configuration to make secure.
|
||||
w, err = cs.Store.Writer(ctx, bref, size, "")
|
||||
return err
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -164,6 +266,7 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
// when no expected is provided there.
|
||||
return &namespacedWriter{
|
||||
Writer: w,
|
||||
ref: ref,
|
||||
namespace: ns,
|
||||
db: cs.db,
|
||||
}, nil
|
||||
@@ -171,22 +274,21 @@ func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expe
|
||||
|
||||
type namespacedWriter struct {
|
||||
content.Writer
|
||||
ref string
|
||||
namespace string
|
||||
db *bolt.DB
|
||||
}
|
||||
|
||||
func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error {
|
||||
tx, err := nw.db.Begin(true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := nw.commit(tx, size, expected); err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
return nw.db.Update(func(tx *bolt.Tx) error {
|
||||
bkt := getIngestBucket(tx, nw.namespace)
|
||||
if bkt != nil {
|
||||
if err := bkt.Delete([]byte(nw.ref)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nw.commit(tx, size, expected)
|
||||
})
|
||||
}
|
||||
|
||||
func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error {
|
||||
@@ -196,20 +298,18 @@ func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Dige
|
||||
}
|
||||
actual := nw.Writer.Digest()
|
||||
|
||||
// TODO: Handle already exists
|
||||
if err := nw.Writer.Commit(size, expected); err != nil {
|
||||
if !content.IsExists(err) {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return err
|
||||
}
|
||||
if getContentBucket(tx, nw.namespace, actual) != nil {
|
||||
return content.ErrExists("")
|
||||
if getBlobBucket(tx, nw.namespace, actual) != nil {
|
||||
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
|
||||
}
|
||||
// Link into this namespace
|
||||
}
|
||||
|
||||
size = status.Total
|
||||
|
||||
bkt, err := createContentBucket(tx, nw.namespace, actual)
|
||||
bkt, err := createBlobBucket(tx, nw.namespace, actual)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -257,9 +357,9 @@ func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) err
|
||||
}
|
||||
|
||||
return view(ctx, cs.db, func(tx *bolt.Tx) error {
|
||||
bkt := getContentBucket(tx, ns, dgst)
|
||||
bkt := getBlobBucket(tx, ns, dgst)
|
||||
if bkt == nil {
|
||||
return content.ErrNotFound("")
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "content digest %v", dgst)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -29,11 +29,11 @@ func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot.
|
||||
}
|
||||
}
|
||||
|
||||
func snapshotKey(id uint64, namespace, key string) string {
|
||||
func createKey(id uint64, namespace, key string) string {
|
||||
return fmt.Sprintf("%s/%d/%s", namespace, id, key)
|
||||
}
|
||||
|
||||
func trimName(key string) string {
|
||||
func trimKey(key string) string {
|
||||
parts := strings.SplitN(key, "/", 3)
|
||||
if len(parts) < 3 {
|
||||
return ""
|
||||
@@ -82,9 +82,9 @@ func (s *snapshotter) Stat(ctx context.Context, key string) (snapshot.Info, erro
|
||||
if err != nil {
|
||||
return snapshot.Info{}, err
|
||||
}
|
||||
info.Name = trimName(info.Name)
|
||||
info.Name = trimKey(info.Name)
|
||||
if info.Parent != "" {
|
||||
info.Parent = trimName(info.Parent)
|
||||
info.Parent = trimKey(info.Parent)
|
||||
}
|
||||
|
||||
return info, nil
|
||||
@@ -143,7 +143,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bkey = snapshotKey(sid, ns, key)
|
||||
bkey = createKey(sid, ns, key)
|
||||
if err := bkt.Put([]byte(key), []byte(bkey)); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -188,7 +188,7 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
nameKey = snapshotKey(sid, ns, name)
|
||||
nameKey = createKey(sid, ns, name)
|
||||
if err := bkt.Put([]byte(name), []byte(nameKey)); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -259,9 +259,9 @@ func (s *snapshotter) Walk(ctx context.Context, fn func(context.Context, snapsho
|
||||
return err
|
||||
}
|
||||
|
||||
info.Name = trimName(info.Name)
|
||||
info.Name = trimKey(info.Name)
|
||||
if info.Parent != "" {
|
||||
info.Parent = trimName(info.Parent)
|
||||
info.Parent = trimKey(info.Parent)
|
||||
}
|
||||
if err := fn(ctx, info); err != nil {
|
||||
return err
|
||||
|
||||
Reference in New Issue
Block a user