Update metadata image store to be initialized once
The boltdb image store now manages its own transactions when one is not provided, but allows the caller to pass in a transaction through the context. This makes the image store more similar to the content and snapshot stores. Additionally, use the reference to the metadata database to mark the content store as dirty after an image has been deleted. The deletion of an image means a reference to a piece of content is gone and therefore garbage collection should be run to check if any resources can be cleaned up as a result. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
5a54862ae5
commit
89fa154efd
@ -222,7 +222,7 @@ func TestMetadataCollector(t *testing.T) {
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -297,7 +297,7 @@ func benchmarkTrigger(n int) func(b *testing.B) {
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -377,7 +377,7 @@ type object struct {
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
|
||||
func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
|
||||
var (
|
||||
node *gc.Node
|
||||
namespace = "test"
|
||||
@ -430,12 +430,14 @@ func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter)
|
||||
}
|
||||
}
|
||||
case testImage:
|
||||
ctx := WithTransactionContext(ctx, tx)
|
||||
|
||||
image := images.Image{
|
||||
Name: v.name,
|
||||
Target: v.target,
|
||||
Labels: obj.labels,
|
||||
}
|
||||
_, err := NewImageStore(tx).Create(ctx, image)
|
||||
_, err := is.Create(ctx, image)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create image")
|
||||
}
|
||||
|
@ -20,12 +20,12 @@ import (
|
||||
)
|
||||
|
||||
type imageStore struct {
|
||||
tx *bolt.Tx
|
||||
db *DB
|
||||
}
|
||||
|
||||
// NewImageStore returns a store backed by a bolt DB
|
||||
func NewImageStore(tx *bolt.Tx) images.Store {
|
||||
return &imageStore{tx: tx}
|
||||
func NewImageStore(db *DB) images.Store {
|
||||
return &imageStore{db: db}
|
||||
}
|
||||
|
||||
func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) {
|
||||
@ -36,19 +36,25 @@ func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error)
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
bkt := getImagesBucket(s.tx, namespace)
|
||||
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
ibkt := bkt.Bucket([]byte(name))
|
||||
if ibkt == nil {
|
||||
return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
image.Name = name
|
||||
if err := readImage(&image, ibkt); err != nil {
|
||||
return images.Image{}, errors.Wrapf(err, "image %q", name)
|
||||
return errors.Wrapf(err, "image %q", name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return image, nil
|
||||
@ -65,13 +71,14 @@ func (s *imageStore) List(ctx context.Context, fs ...string) ([]images.Image, er
|
||||
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
bkt := getImagesBucket(s.tx, namespace)
|
||||
var m []images.Image
|
||||
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return nil, nil // empty store
|
||||
return nil // empty store
|
||||
}
|
||||
|
||||
var m []images.Image
|
||||
if err := bkt.ForEach(func(k, v []byte) error {
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var (
|
||||
image = images.Image{
|
||||
Name: string(k),
|
||||
@ -87,6 +94,7 @@ func (s *imageStore) List(ctx context.Context, fs ...string) ([]images.Image, er
|
||||
m = append(m, image)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -100,11 +108,16 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
if err := validateImage(&image); err != nil {
|
||||
return images.Image{}, err
|
||||
return err
|
||||
}
|
||||
|
||||
bkt, err := createImagesBucket(tx, namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return image, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
ibkt, err := bkt.CreateBucket([]byte(image.Name))
|
||||
if err != nil {
|
||||
if err != bolt.ErrBucketExists {
|
||||
@ -117,7 +130,11 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
|
||||
image.CreatedAt = time.Now().UTC()
|
||||
image.UpdatedAt = image.CreatedAt
|
||||
return writeImage(ibkt, &image)
|
||||
})
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return image, nil
|
||||
}
|
||||
|
||||
func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths ...string) (images.Image, error) {
|
||||
@ -131,7 +148,13 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
|
||||
}
|
||||
|
||||
var updated images.Image
|
||||
return updated, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
|
||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt, err := createImagesBucket(tx, namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ibkt := bkt.Bucket([]byte(image.Name))
|
||||
if ibkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", image.Name)
|
||||
@ -180,7 +203,12 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
|
||||
updated.CreatedAt = createdat
|
||||
updated.UpdatedAt = time.Now().UTC()
|
||||
return writeImage(ibkt, &updated)
|
||||
})
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return updated, nil
|
||||
|
||||
}
|
||||
|
||||
func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.DeleteOpt) error {
|
||||
@ -189,11 +217,24 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
|
||||
return err
|
||||
}
|
||||
|
||||
return withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
err := bkt.DeleteBucket([]byte(name))
|
||||
return update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
err = bkt.DeleteBucket([]byte(name))
|
||||
if err == bolt.ErrBucketNotFound {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
// A reference to a piece of content has been removed,
|
||||
// mark content store as dirty for triggering garbage
|
||||
// collection
|
||||
s.db.dirtyL.Lock()
|
||||
s.db.dirtyCS = true
|
||||
s.db.dirtyL.Unlock()
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/filters"
|
||||
"github.com/containerd/containerd/images"
|
||||
@ -18,6 +17,7 @@ import (
|
||||
func TestImagesList(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
store := NewImageStore(NewDB(db, nil, nil))
|
||||
|
||||
testset := map[string]*images.Image{}
|
||||
for i := 0; i < 4; i++ {
|
||||
@ -36,21 +36,15 @@ func TestImagesList(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
now := time.Now()
|
||||
result, err := store.Create(ctx, *testset[id])
|
||||
if err != nil {
|
||||
return err
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &result, now, true)
|
||||
testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt
|
||||
checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list")
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
for _, testcase := range []struct {
|
||||
@ -102,8 +96,6 @@ func TestImagesList(t *testing.T) {
|
||||
testset = newtestset
|
||||
}
|
||||
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
results, err := store.List(ctx, testcase.filters...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -120,28 +112,17 @@ func TestImagesList(t *testing.T) {
|
||||
for _, result := range results {
|
||||
checkImagesEqual(t, &result, testset[result.Name], "list results did not match")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// delete everything to test it
|
||||
for id := range testset {
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
return store.Delete(ctx, id)
|
||||
}); err != nil {
|
||||
if err := store.Delete(ctx, id); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// try it again, get NotFound
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
return store.Delete(ctx, id)
|
||||
}); errors.Cause(err) != errdefs.ErrNotFound {
|
||||
if err := store.Delete(ctx, id); errors.Cause(err) != errdefs.ErrNotFound {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
}
|
||||
@ -149,6 +130,7 @@ func TestImagesList(t *testing.T) {
|
||||
func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
store := NewImageStore(NewDB(db, nil, nil))
|
||||
|
||||
for _, testcase := range []struct {
|
||||
name string
|
||||
@ -383,13 +365,8 @@ func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
testcase.expected.Target.Digest = testcase.original.Target.Digest
|
||||
}
|
||||
|
||||
done := errors.New("test complete")
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
var (
|
||||
store = NewImageStore(tx)
|
||||
now = time.Now()
|
||||
)
|
||||
|
||||
// Create
|
||||
now := time.Now()
|
||||
created, err := store.Create(ctx, testcase.original)
|
||||
if errors.Cause(err) != testcase.createerr {
|
||||
if testcase.createerr == nil {
|
||||
@ -398,7 +375,7 @@ func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
|
||||
}
|
||||
} else if testcase.createerr != nil {
|
||||
return done
|
||||
return
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &created, now, true)
|
||||
@ -409,17 +386,9 @@ func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
testcase.expected.UpdatedAt = created.UpdatedAt
|
||||
|
||||
checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation")
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err == done {
|
||||
return
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
now := time.Now()
|
||||
store := NewImageStore(tx)
|
||||
// Update
|
||||
now = time.Now()
|
||||
updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...)
|
||||
if errors.Cause(err) != testcase.cause {
|
||||
if testcase.cause == nil {
|
||||
@ -428,33 +397,20 @@ func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
|
||||
}
|
||||
} else if testcase.cause != nil {
|
||||
return done
|
||||
return
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &updated, now, false)
|
||||
testcase.expected.UpdatedAt = updated.UpdatedAt
|
||||
checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result")
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err == done {
|
||||
return
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
// Get
|
||||
result, err := store.Get(ctx, testcase.original.Name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result")
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package images
|
||||
import (
|
||||
gocontext "context"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
imagesapi "github.com/containerd/containerd/api/services/images/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -38,7 +37,7 @@ func init() {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewService(m.(*metadata.DB), ic.Events, g.(gcScheduler)), nil
|
||||
return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -48,15 +47,15 @@ type gcScheduler interface {
|
||||
}
|
||||
|
||||
type service struct {
|
||||
db *metadata.DB
|
||||
store images.Store
|
||||
gc gcScheduler
|
||||
publisher events.Publisher
|
||||
}
|
||||
|
||||
// NewService returns the GRPC image server
|
||||
func NewService(db *metadata.DB, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer {
|
||||
func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer {
|
||||
return &service{
|
||||
db: db,
|
||||
store: is,
|
||||
gc: gc,
|
||||
publisher: publisher,
|
||||
}
|
||||
@ -68,31 +67,26 @@ func (s *service) Register(server *grpc.Server) error {
|
||||
}
|
||||
|
||||
func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) {
|
||||
var resp imagesapi.GetImageResponse
|
||||
|
||||
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
|
||||
image, err := store.Get(ctx, req.Name)
|
||||
image, err := s.store.Get(ctx, req.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
imagepb := imageToProto(&image)
|
||||
resp.Image = &imagepb
|
||||
return nil
|
||||
}))
|
||||
return &imagesapi.GetImageResponse{
|
||||
Image: &imagepb,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) {
|
||||
var resp imagesapi.ListImagesResponse
|
||||
|
||||
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
|
||||
images, err := store.List(ctx, req.Filters...)
|
||||
images, err := s.store.List(ctx, req.Filters...)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
resp.Images = imagesToProto(images)
|
||||
return nil
|
||||
}))
|
||||
return &imagesapi.ListImagesResponse{
|
||||
Images: imagesToProto(images),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) {
|
||||
@ -105,17 +99,12 @@ func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest)
|
||||
image = imageFromProto(&req.Image)
|
||||
resp imagesapi.CreateImageResponse
|
||||
)
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
created, err := store.Create(ctx, image)
|
||||
created, err := s.store.Create(ctx, image)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&created)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
|
||||
Name: resp.Image.Name,
|
||||
@ -136,25 +125,21 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
|
||||
var (
|
||||
image = imageFromProto(&req.Image)
|
||||
resp imagesapi.UpdateImageResponse
|
||||
fieldpaths []string
|
||||
)
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
var fieldpaths []string
|
||||
|
||||
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
|
||||
for _, path := range req.UpdateMask.Paths {
|
||||
fieldpaths = append(fieldpaths, path)
|
||||
}
|
||||
}
|
||||
|
||||
updated, err := store.Update(ctx, image, fieldpaths...)
|
||||
updated, err := s.store.Update(ctx, image, fieldpaths...)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&updated)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
|
||||
Name: resp.Image.Name,
|
||||
@ -168,10 +153,9 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
|
||||
|
||||
func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).WithField("name", req.Name).Debugf("delete image")
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
||||
if err := s.store.Delete(ctx, req.Name); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
|
||||
@ -188,15 +172,3 @@ func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest)
|
||||
|
||||
return &ptypes.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
|
||||
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewImageStore(tx)) }
|
||||
}
|
||||
|
||||
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
|
||||
return s.db.View(s.withStore(ctx, fn))
|
||||
}
|
||||
|
||||
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
|
||||
return s.db.Update(s.withStore(ctx, fn))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user