metadata: merge storage into package
The implementations for the storage of metadata have been merged into a single metadata package where they can share storage primitives and techniques. The is a requisite for the addition of namespaces, which will require a coordinated layout for records to be organized by namespace. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
107
metadata/buckets.go
Normal file
107
metadata/buckets.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/log"
|
||||
)
|
||||
|
||||
var (
|
||||
bucketKeyStorageVersion = []byte("v1")
|
||||
bucketKeyImages = []byte("images")
|
||||
bucketKeyContainers = []byte("containers")
|
||||
|
||||
bucketKeyDigest = []byte("digest")
|
||||
bucketKeyMediaType = []byte("mediatype")
|
||||
bucketKeySize = []byte("size")
|
||||
bucketKeyLabels = []byte("labels")
|
||||
bucketKeyImage = []byte("image")
|
||||
bucketKeyRuntime = []byte("runtime")
|
||||
bucketKeySpec = []byte("spec")
|
||||
bucketKeyRootFS = []byte("rootfs")
|
||||
)
|
||||
|
||||
// InitDB will initialize the database for use. The database must be opened for
|
||||
// write and the caller must not be holding an open transaction.
|
||||
func InitDB(db *bolt.DB) error {
|
||||
log.L.Debug("init db")
|
||||
return db.Update(func(tx *bolt.Tx) error {
|
||||
if _, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyImages); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := createBucketIfNotExists(tx, bucketKeyStorageVersion, bucketKeyContainers); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
|
||||
bkt := tx.Bucket(keys[0])
|
||||
|
||||
for _, key := range keys[1:] {
|
||||
if bkt == nil {
|
||||
break
|
||||
}
|
||||
bkt = bkt.Bucket(key)
|
||||
}
|
||||
|
||||
return bkt
|
||||
}
|
||||
|
||||
func createBucketIfNotExists(tx *bolt.Tx, keys ...[]byte) (*bolt.Bucket, error) {
|
||||
bkt, err := tx.CreateBucketIfNotExists(keys[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, key := range keys[1:] {
|
||||
bkt, err = bkt.CreateBucketIfNotExists(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
func withImagesBucket(tx *bolt.Tx, fn func(bkt *bolt.Bucket) error) error {
|
||||
bkt := getImagesBucket(tx)
|
||||
if bkt == nil {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
return fn(bkt)
|
||||
}
|
||||
|
||||
func withImageBucket(tx *bolt.Tx, name string, fn func(bkt *bolt.Bucket) error) error {
|
||||
bkt := getImageBucket(tx, name)
|
||||
if bkt == nil {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
return fn(bkt)
|
||||
}
|
||||
|
||||
func getImagesBucket(tx *bolt.Tx) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages)
|
||||
}
|
||||
|
||||
func getImageBucket(tx *bolt.Tx, name string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyStorageVersion, bucketKeyImages, []byte(name))
|
||||
}
|
||||
|
||||
func createContainersBucket(tx *bolt.Tx) (*bolt.Bucket, error) {
|
||||
bkt, err := tx.CreateBucketIfNotExists(bucketKeyStorageVersion)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bkt.CreateBucketIfNotExists(bucketKeyContainers)
|
||||
}
|
||||
|
||||
func getContainersBucket(tx *bolt.Tx) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyStorageVersion, bucketKeyContainers)
|
||||
}
|
||||
|
||||
func getContainerBucket(tx *bolt.Tx, id string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyStorageVersion, bucketKeyContainers, []byte(id))
|
||||
}
|
||||
171
metadata/containers.go
Normal file
171
metadata/containers.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type containerStore struct {
|
||||
tx *bolt.Tx
|
||||
}
|
||||
|
||||
func NewContainerStore(tx *bolt.Tx) containers.Store {
|
||||
return &containerStore{
|
||||
tx: tx,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *containerStore) Get(ctx context.Context, id string) (containers.Container, error) {
|
||||
bkt := getContainerBucket(s.tx, id)
|
||||
if bkt == nil {
|
||||
return containers.Container{}, errors.Wrap(ErrNotFound, "bucket does not exist")
|
||||
}
|
||||
|
||||
container := containers.Container{ID: id}
|
||||
if err := readContainer(&container, bkt); err != nil {
|
||||
return containers.Container{}, errors.Wrap(err, "failed to read container")
|
||||
}
|
||||
|
||||
return container, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) List(ctx context.Context, filter string) ([]containers.Container, error) {
|
||||
var (
|
||||
m = []containers.Container{}
|
||||
bkt = getContainersBucket(s.tx)
|
||||
)
|
||||
if bkt == nil {
|
||||
return m, nil
|
||||
}
|
||||
err := bkt.ForEach(func(k, v []byte) error {
|
||||
cbkt := bkt.Bucket(k)
|
||||
if cbkt == nil {
|
||||
return nil
|
||||
}
|
||||
container := containers.Container{ID: string(k)}
|
||||
|
||||
if err := readContainer(&container, cbkt); err != nil {
|
||||
return errors.Wrap(err, "failed to read container")
|
||||
}
|
||||
m = append(m, container)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) Create(ctx context.Context, container containers.Container) (containers.Container, error) {
|
||||
bkt, err := createContainersBucket(s.tx)
|
||||
if err != nil {
|
||||
return containers.Container{}, err
|
||||
}
|
||||
|
||||
cbkt, err := bkt.CreateBucket([]byte(container.ID))
|
||||
if err != nil {
|
||||
if err == bolt.ErrBucketExists {
|
||||
err = errors.Wrap(ErrExists, "content for id already exists")
|
||||
}
|
||||
return containers.Container{}, err
|
||||
}
|
||||
|
||||
if err := writeContainer(&container, cbkt); err != nil {
|
||||
return containers.Container{}, errors.Wrap(err, "failed to write container")
|
||||
}
|
||||
|
||||
return container, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) Update(ctx context.Context, container containers.Container) (containers.Container, error) {
|
||||
bkt := getContainersBucket(s.tx)
|
||||
if bkt == nil {
|
||||
return containers.Container{}, errors.Wrap(ErrNotFound, "no containers")
|
||||
}
|
||||
|
||||
cbkt := bkt.Bucket([]byte(container.ID))
|
||||
if cbkt == nil {
|
||||
return containers.Container{}, errors.Wrap(ErrNotFound, "no content for id")
|
||||
}
|
||||
|
||||
if err := writeContainer(&container, cbkt); err != nil {
|
||||
return containers.Container{}, errors.Wrap(err, "failed to write container")
|
||||
}
|
||||
|
||||
return container, nil
|
||||
}
|
||||
|
||||
func (s *containerStore) Delete(ctx context.Context, id string) error {
|
||||
bkt := getContainersBucket(s.tx)
|
||||
if bkt == nil {
|
||||
return errors.Wrap(ErrNotFound, "no containers")
|
||||
}
|
||||
|
||||
err := bkt.DeleteBucket([]byte(id))
|
||||
if err == bolt.ErrBucketNotFound {
|
||||
return errors.Wrap(ErrNotFound, "no content for id")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func readContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
switch string(k) {
|
||||
case string(bucketKeyImage):
|
||||
container.Image = string(v)
|
||||
case string(bucketKeyRuntime):
|
||||
container.Runtime = string(v)
|
||||
case string(bucketKeySpec):
|
||||
container.Spec = v
|
||||
case string(bucketKeyRootFS):
|
||||
container.RootFS = string(v)
|
||||
case string(bucketKeyLabels):
|
||||
lbkt := bkt.Bucket(bucketKeyLabels)
|
||||
if lbkt == nil {
|
||||
return nil
|
||||
}
|
||||
container.Labels = map[string]string{}
|
||||
if err := lbkt.ForEach(func(k, v []byte) error {
|
||||
container.Labels[string(k)] = string(v)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func writeContainer(container *containers.Container, bkt *bolt.Bucket) error {
|
||||
for _, v := range [][2][]byte{
|
||||
{bucketKeyImage, []byte(container.Image)},
|
||||
{bucketKeyRuntime, []byte(container.Runtime)},
|
||||
{bucketKeySpec, container.Spec},
|
||||
{bucketKeyRootFS, []byte(container.RootFS)},
|
||||
} {
|
||||
if err := bkt.Put(v[0], v[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
// Remove existing labels to keep from merging
|
||||
if lbkt := bkt.Bucket(bucketKeyLabels); lbkt != nil {
|
||||
if err := bkt.DeleteBucket(bucketKeyLabels); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for k, v := range container.Labels {
|
||||
if err := lbkt.Put([]byte(k), []byte(v)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
17
metadata/errors.go
Normal file
17
metadata/errors.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package metadata
|
||||
|
||||
import "github.com/pkg/errors"
|
||||
|
||||
var (
|
||||
ErrExists = errors.New("metadata: exists")
|
||||
ErrNotFound = errors.New("metadata: not found")
|
||||
)
|
||||
|
||||
// IsNotFound returns true if the error is due to a missing image.
|
||||
func IsNotFound(err error) bool {
|
||||
return errors.Cause(err) == ErrNotFound
|
||||
}
|
||||
|
||||
func IsExists(err error) bool {
|
||||
return errors.Cause(err) == ErrExists
|
||||
}
|
||||
120
metadata/images.go
Normal file
120
metadata/images.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/images"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
type imageStore struct {
|
||||
tx *bolt.Tx
|
||||
}
|
||||
|
||||
func NewImageStore(tx *bolt.Tx) images.Store {
|
||||
return &imageStore{tx: tx}
|
||||
}
|
||||
|
||||
func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) {
|
||||
var image images.Image
|
||||
if err := withImageBucket(s.tx, name, func(bkt *bolt.Bucket) error {
|
||||
image.Name = name
|
||||
return readImage(&image, bkt)
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return image, nil
|
||||
}
|
||||
|
||||
func (s *imageStore) Put(ctx context.Context, name string, desc ocispec.Descriptor) error {
|
||||
return withImagesBucket(s.tx, func(bkt *bolt.Bucket) error {
|
||||
ibkt, err := bkt.CreateBucketIfNotExists([]byte(name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
buf [binary.MaxVarintLen64]byte
|
||||
sizeEncoded []byte = buf[:]
|
||||
)
|
||||
sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, desc.Size)]
|
||||
|
||||
if len(sizeEncoded) == 0 {
|
||||
return fmt.Errorf("failed encoding size = %v", desc.Size)
|
||||
}
|
||||
|
||||
for _, v := range [][2][]byte{
|
||||
{bucketKeyDigest, []byte(desc.Digest)},
|
||||
{bucketKeyMediaType, []byte(desc.MediaType)},
|
||||
{bucketKeySize, sizeEncoded},
|
||||
} {
|
||||
if err := ibkt.Put(v[0], v[1]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *imageStore) List(ctx context.Context) ([]images.Image, error) {
|
||||
var m []images.Image
|
||||
|
||||
if err := withImagesBucket(s.tx, func(bkt *bolt.Bucket) error {
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var (
|
||||
image = images.Image{
|
||||
Name: string(k),
|
||||
}
|
||||
kbkt = bkt.Bucket(k)
|
||||
)
|
||||
|
||||
if err := readImage(&image, kbkt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
m = append(m, image)
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (s *imageStore) Delete(ctx context.Context, name string) error {
|
||||
return withImagesBucket(s.tx, func(bkt *bolt.Bucket) error {
|
||||
err := bkt.DeleteBucket([]byte(name))
|
||||
if err == bolt.ErrBucketNotFound {
|
||||
return ErrNotFound
|
||||
}
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func readImage(image *images.Image, bkt *bolt.Bucket) error {
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
if v == nil {
|
||||
return nil // skip it? a bkt maybe?
|
||||
}
|
||||
|
||||
// TODO(stevvooe): This is why we need to use byte values for
|
||||
// keys, rather than full arrays.
|
||||
switch string(k) {
|
||||
case string(bucketKeyDigest):
|
||||
image.Target.Digest = digest.Digest(v)
|
||||
case string(bucketKeyMediaType):
|
||||
image.Target.MediaType = string(v)
|
||||
case string(bucketKeySize):
|
||||
image.Target.Size, _ = binary.Varint(v)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user