From 2c9004d431df0e02eabbb293d8c2be819869ad48 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 23 Jun 2017 13:04:06 -0700 Subject: [PATCH] Add namespace content store in metadata Add a metadata store for content which enforces content is only visible inside a given namespace. Signed-off-by: Derek McGowan --- differ/differ.go | 9 +- metadata/buckets.go | 18 +++ metadata/content.go | 281 ++++++++++++++++++++++++++++++++++++ metadata/images.go | 24 ++- services/content/service.go | 10 +- 5 files changed, 332 insertions(+), 10 deletions(-) create mode 100644 metadata/content.go diff --git a/differ/differ.go b/differ/differ.go index a39cb0375..76fedcced 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -5,9 +5,11 @@ import ( "io/ioutil" "os" + "github.com/boltdb/bolt" "github.com/containerd/containerd/archive" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" digest "github.com/opencontainers/go-digest" @@ -22,13 +24,18 @@ func init() { ID: "base-diff", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { c, err := ic.Get(plugin.ContentPlugin) if err != nil { return nil, err } - return NewBaseDiff(c.(content.Store)) + md, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store))) }, }) } diff --git a/metadata/buckets.go b/metadata/buckets.go index ca555f395..d4be495c1 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -2,6 +2,7 @@ package metadata import ( "github.com/boltdb/bolt" + digest "github.com/opencontainers/go-digest" ) // The layout where a "/" delineates a bucket is desribed in the following @@ -33,6 +34,7 @@ var ( bucketKeyObjectImages = []byte("images") // stores image objects bucketKeyObjectContainers = []byte("containers") // stores container objects bucketKeyObjectSnapshots = []byte("snapshots") // stores snapshot references + bucketKeyObjectContent = []byte("content") // stores content links bucketKeyDigest = []byte("digest") bucketKeyMediaType = []byte("mediatype") @@ -139,3 +141,19 @@ func createSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) (*bolt. func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Bucket { return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter)) } + +func createContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) (*bolt.Bucket, error) { + bkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) + if err != nil { + return nil, err + } + return bkt, nil +} + +func getAllContentBucket(tx *bolt.Tx, namespace string) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent) +} + +func getContentBucket(tx *bolt.Tx, namespace string, dgst digest.Digest) *bolt.Bucket { + return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, []byte(dgst.String())) +} diff --git a/metadata/content.go b/metadata/content.go new file mode 100644 index 000000000..e53e01457 --- /dev/null +++ b/metadata/content.go @@ -0,0 +1,281 @@ +package metadata + +import ( + "context" + "encoding/binary" + "io" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/namespaces" + digest "github.com/opencontainers/go-digest" +) + +type contentStore struct { + content.Store + db *bolt.DB +} + +// NewContentStore returns a namespaced content store using an existing +// content store interface. +func NewContentStore(db *bolt.DB, cs content.Store) content.Store { + return &contentStore{ + Store: cs, + db: db, + } +} + +func (cs *contentStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return content.Info{}, err + } + + var info content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + + info.Digest = dgst + return readInfo(&info, bkt) + }); err != nil { + return content.Info{}, err + } + + return info, nil +} + +func (cs *contentStore) Walk(ctx context.Context, fn content.WalkFunc) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + // TODO: Batch results to keep from reading all info into memory + var infos []content.Info + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getAllContentBucket(tx, ns) + if bkt == nil { + return nil + } + + return bkt.ForEach(func(k, v []byte) error { + dgst, err := digest.Parse(string(k)) + if err != nil { + return nil + } + info := content.Info{ + Digest: dgst, + } + if err := readInfo(&info, bkt.Bucket(k)); err != nil { + return err + } + infos = append(infos, info) + return nil + }) + }); err != nil { + return err + } + + for _, info := range infos { + if err := fn(info); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return update(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + + // Just remove local reference, garbage collector is responsible for + // cleaning up on disk content + return getAllContentBucket(tx, ns).Delete([]byte(dgst.String())) + }) +} + +func (cs *contentStore) Status(ctx context.Context, re string) ([]content.Status, error) { + _, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + // TODO: Read status keys and match + + return cs.Store.Status(ctx, re) +} + +func (cs *contentStore) Abort(ctx context.Context, ref string) error { + _, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + // TODO: Read status key and delete + + return cs.Store.Abort(ctx, ref) +} + +func (cs *contentStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return nil, err + } + + // TODO: Create ref key + + if expected != "" { + if err := view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, expected) + if bkt != nil { + return content.ErrExists("") + } + return nil + }); err != nil { + return nil, err + } + } + + // Do not use the passed in expected value here since it was + // already checked against the user metadata. If the content + // store has the content, it must still be written before + // linked into the given namespace. It is possible in the future + // to allow content which exists in content store but not + // namespace to be linked here and returned an exist error, but + // this would require more configuration to make secure. + w, err := cs.Store.Writer(ctx, ref, size, "") + if err != nil { + return nil, err + } + + // TODO: keep the expected in the writer to use on commit + // when no expected is provided there. + return &namespacedWriter{ + Writer: w, + namespace: ns, + db: cs.db, + }, nil +} + +type namespacedWriter struct { + content.Writer + namespace string + db *bolt.DB +} + +func (nw *namespacedWriter) Commit(size int64, expected digest.Digest) error { + tx, err := nw.db.Begin(true) + if err != nil { + return err + } + + if err := nw.commit(tx, size, expected); err != nil { + tx.Rollback() + return err + } + + return tx.Commit() +} + +func (nw *namespacedWriter) commit(tx *bolt.Tx, size int64, expected digest.Digest) error { + status, err := nw.Writer.Status() + if err != nil { + return err + } + actual := nw.Writer.Digest() + + // TODO: Handle already exists + if err := nw.Writer.Commit(size, expected); err != nil { + if !content.IsExists(err) { + return err + } + if getContentBucket(tx, nw.namespace, actual) != nil { + return content.ErrExists("") + } + // Link into this namespace + } + + size = status.Total + + bkt, err := createContentBucket(tx, nw.namespace, actual) + if err != nil { + return err + } + + sizeEncoded, err := encodeSize(size) + if err != nil { + return err + } + + timeEncoded, err := status.UpdatedAt.MarshalBinary() + if err != nil { + return err + } + + for _, v := range [][2][]byte{ + {bucketKeyCreatedAt, timeEncoded}, + {bucketKeySize, sizeEncoded}, + } { + if err := bkt.Put(v[0], v[1]); err != nil { + return err + } + } + + return nil +} + +func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.Reader(ctx, dgst) +} + +func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { + if err := cs.checkAccess(ctx, dgst); err != nil { + return nil, err + } + return cs.Store.ReaderAt(ctx, dgst) +} + +func (cs *contentStore) checkAccess(ctx context.Context, dgst digest.Digest) error { + ns, err := namespaces.NamespaceRequired(ctx) + if err != nil { + return err + } + + return view(ctx, cs.db, func(tx *bolt.Tx) error { + bkt := getContentBucket(tx, ns, dgst) + if bkt == nil { + return content.ErrNotFound("") + } + return nil + }) +} + +func readInfo(info *content.Info, bkt *bolt.Bucket) error { + return bkt.ForEach(func(k, v []byte) error { + switch string(k) { + case string(bucketKeyCreatedAt): + if err := info.CommittedAt.UnmarshalBinary(v); err != nil { + return err + } + case string(bucketKeySize): + info.Size, _ = binary.Varint(v) + } + // TODO: Read labels + return nil + }) +} diff --git a/metadata/images.go b/metadata/images.go index 47fc4c167..adb4eac59 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -243,14 +243,9 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return err } - var ( - buf [binary.MaxVarintLen64]byte - sizeEncoded []byte = buf[:] - ) - sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, image.Target.Size)] - - if len(sizeEncoded) == 0 { - return fmt.Errorf("failed encoding size = %v", image.Target.Size) + sizeEncoded, err := encodeSize(image.Target.Size) + if err != nil { + return err } for _, v := range [][2][]byte{ @@ -265,3 +260,16 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return nil } + +func encodeSize(size int64) ([]byte, error) { + var ( + buf [binary.MaxVarintLen64]byte + sizeEncoded []byte = buf[:] + ) + sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)] + + if len(sizeEncoded) == 0 { + return nil, fmt.Errorf("failed encoding size = %v", size) + } + return sizeEncoded, nil +} diff --git a/services/content/service.go b/services/content/service.go index 322d956da..62f1997e7 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -5,12 +5,14 @@ import ( "sync" "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/content/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/golang/protobuf/ptypes/empty" digest "github.com/opencontainers/go-digest" @@ -39,6 +41,7 @@ func init() { ID: "content", Requires: []plugin.PluginType{ plugin.ContentPlugin, + plugin.MetadataPlugin, }, Init: NewService, }) @@ -49,8 +52,13 @@ func NewService(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } + m, err := ic.Get(plugin.MetadataPlugin) + if err != nil { + return nil, err + } + cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) return &Service{ - store: c.(content.Store), + store: cs, emitter: events.GetPoster(ic.Context), }, nil }