Store container metadata in boltdb buckets

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-05-23 17:11:29 -07:00
parent e1ed4a2ea4
commit f98a4f10df
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB

View File

@ -2,7 +2,6 @@ package containers
import ( import (
"context" "context"
"encoding/json"
"github.com/boltdb/bolt" "github.com/boltdb/bolt"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -25,6 +24,11 @@ func IsExists(err error) bool {
var ( var (
bucketKeyStorageVersion = []byte("v1") bucketKeyStorageVersion = []byte("v1")
bucketKeyContainers = []byte("containers") bucketKeyContainers = []byte("containers")
bucketKeyLabels = []byte("labels")
bucketKeyImage = []byte("image")
bucketKeyRuntime = []byte("runtime")
bucketKeySpec = []byte("spec")
bucketKeyRootFS = []byte("rootfs")
) )
type storage struct { type storage struct {
@ -38,19 +42,14 @@ func NewStore(tx *bolt.Tx) Store {
} }
func (s *storage) Get(ctx context.Context, id string) (Container, error) { func (s *storage) Get(ctx context.Context, id string) (Container, error) {
bkt := s.bucket() bkt := getContainerBucket(s.tx, id)
if bkt == nil { if bkt == nil {
return Container{}, errors.Wrap(ErrNotFound, "bucket does not exist") return Container{}, errors.Wrap(ErrNotFound, "bucket does not exist")
} }
cb := bkt.Get([]byte(id)) container := Container{ID: id}
if len(cb) == 0 { if err := readContainer(&container, bkt); err != nil {
return Container{}, errors.Wrap(ErrNotFound, "no content for id") return Container{}, errors.Wrap(err, "failed to read container")
}
var container Container
if err := json.Unmarshal(cb, &container); err != nil {
return Container{}, errors.Wrap(err, "failed to unmarshal container")
} }
return container, nil return container, nil
@ -58,14 +57,19 @@ func (s *storage) Get(ctx context.Context, id string) (Container, error) {
func (s *storage) List(ctx context.Context, filter string) ([]Container, error) { func (s *storage) List(ctx context.Context, filter string) ([]Container, error) {
containers := []Container{} containers := []Container{}
bkt := s.bucket() bkt := getContainersBucket(s.tx)
if bkt == nil { if bkt == nil {
return containers, nil return containers, nil
} }
err := bkt.ForEach(func(k, v []byte) error { err := bkt.ForEach(func(k, v []byte) error {
cbkt := bkt.Bucket(k)
if cbkt == nil {
return nil
}
container := Container{ID: string(k)} container := Container{ID: string(k)}
if err := json.Unmarshal(v, &container); err != nil {
return errors.Wrap(err, "failed to unmarshal container") if err := readContainer(&container, cbkt); err != nil {
return errors.Wrap(err, "failed to read container")
} }
containers = append(containers, container) containers = append(containers, container)
return nil return nil
@ -78,69 +82,140 @@ func (s *storage) List(ctx context.Context, filter string) ([]Container, error)
} }
func (s *storage) Create(ctx context.Context, container Container) (Container, error) { func (s *storage) Create(ctx context.Context, container Container) (Container, error) {
bkt, err := s.createBucket() bkt, err := createContainersBucket(s.tx)
if err != nil { if err != nil {
return Container{}, err return Container{}, err
} }
b := bkt.Get([]byte(container.ID)) cbkt, err := bkt.CreateBucket([]byte(container.ID))
if len(b) > 0 {
return Container{}, errors.Wrap(ErrExists, "content for id already exists")
}
b, err = json.Marshal(container)
if err != nil { if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrap(ErrExists, "content for id already exists")
}
return Container{}, err return Container{}, err
} }
return container, bkt.Put([]byte(container.ID), b) if err := writeContainer(&container, cbkt); err != nil {
return Container{}, errors.Wrap(err, "failed to write container")
}
return container, nil
} }
func (s *storage) Update(ctx context.Context, container Container) (Container, error) { func (s *storage) Update(ctx context.Context, container Container) (Container, error) {
bkt, err := s.createBucket() bkt := getContainersBucket(s.tx)
if err != nil { if bkt == nil {
return Container{}, err return Container{}, errors.Wrap(ErrNotFound, "no containers")
} }
b := bkt.Get([]byte(container.ID)) cbkt := bkt.Bucket([]byte(container.ID))
if len(b) == 0 { if cbkt == nil {
return Container{}, errors.Wrap(ErrNotFound, "no content for id") return Container{}, errors.Wrap(ErrNotFound, "no content for id")
} }
b, err = json.Marshal(container) if err := writeContainer(&container, cbkt); err != nil {
if err != nil { return Container{}, errors.Wrap(err, "failed to write container")
return Container{}, err
} }
return container, bkt.Put([]byte(container.ID), b) return container, nil
} }
func (s *storage) Delete(ctx context.Context, id string) error { func (s *storage) Delete(ctx context.Context, id string) error {
bkt, err := s.createBucket() bkt := getContainersBucket(s.tx)
if bkt == nil {
return errors.Wrap(ErrNotFound, "no containers")
}
err := bkt.DeleteBucket([]byte(id))
if err == bolt.ErrBucketNotFound {
return errors.Wrap(ErrNotFound, "no content for id")
}
return err
}
func readContainer(container *Container, bkt *bolt.Bucket) error {
return bkt.ForEach(func(k, v []byte) error {
switch string(k) {
case string(bucketKeyImage):
container.Image = string(v)
case string(bucketKeyRuntime):
container.Runtime = string(v)
case string(bucketKeySpec):
container.Spec = v
case string(bucketKeyRootFS):
container.RootFS = string(v)
case string(bucketKeyLabels):
lbkt := bkt.Bucket(bucketKeyLabels)
if lbkt == nil {
return nil
}
container.Labels = map[string]string{}
if err := lbkt.ForEach(func(k, v []byte) error {
container.Labels[string(k)] = string(v)
return nil
}); err != nil {
return err
}
}
return nil
})
}
func writeContainer(container *Container, bkt *bolt.Bucket) error {
for _, v := range [][2][]byte{
{bucketKeyImage, []byte(container.Image)},
{bucketKeyRuntime, []byte(container.Runtime)},
{bucketKeySpec, container.Spec},
{bucketKeyRootFS, []byte(container.RootFS)},
} {
if err := bkt.Put(v[0], v[1]); err != nil {
return err
}
}
// Remove existing labels to keep from merging
if lbkt := bkt.Bucket(bucketKeyLabels); lbkt != nil {
if err := bkt.DeleteBucket(bucketKeyLabels); err != nil {
return err
}
}
lbkt, err := bkt.CreateBucket(bucketKeyLabels)
if err != nil { if err != nil {
return err return err
} }
for k, v := range container.Labels {
b := bkt.Get([]byte(id)) if err := lbkt.Put([]byte(k), []byte(v)); err != nil {
if len(b) == 0 { return err
return errors.Wrap(ErrNotFound, "no content for id") }
} }
return bkt.Delete([]byte(id))
}
func (s *storage) bucket() *bolt.Bucket {
bkt := s.tx.Bucket(bucketKeyStorageVersion)
if bkt == nil {
return nil return nil
}
return bkt.Bucket(bucketKeyContainers)
} }
func (s *storage) createBucket() (*bolt.Bucket, error) { func createContainersBucket(tx *bolt.Tx) (*bolt.Bucket, error) {
bkt, err := s.tx.CreateBucketIfNotExists(bucketKeyStorageVersion) bkt, err := tx.CreateBucketIfNotExists(bucketKeyStorageVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return bkt.CreateBucketIfNotExists(bucketKeyContainers) return bkt.CreateBucketIfNotExists(bucketKeyContainers)
} }
func getContainersBucket(tx *bolt.Tx) *bolt.Bucket {
return getBucket(tx, bucketKeyStorageVersion, bucketKeyContainers)
}
func getContainerBucket(tx *bolt.Tx, id string) *bolt.Bucket {
return getBucket(tx, bucketKeyStorageVersion, bucketKeyContainers, []byte(id))
}
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {
bkt := tx.Bucket(keys[0])
for _, key := range keys[1:] {
if bkt == nil {
break
}
bkt = bkt.Bucket(key)
}
return bkt
}