Merge pull request #1960 from dmcgowan/images-removal-dirty
metadata: image removal triggers GC
This commit is contained in:
commit
acc6011ac1
@ -106,13 +106,8 @@ func imagesBucketPath(namespace string) [][]byte {
|
||||
return [][]byte{bucketKeyVersion, []byte(namespace), bucketKeyObjectImages}
|
||||
}
|
||||
|
||||
func withImagesBucket(tx *bolt.Tx, namespace string, fn func(bkt *bolt.Bucket) error) error {
|
||||
bkt, err := createBucketIfNotExists(tx, imagesBucketPath(namespace)...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return fn(bkt)
|
||||
func createImagesBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
|
||||
return createBucketIfNotExists(tx, imagesBucketPath(namespace)...)
|
||||
}
|
||||
|
||||
func getImagesBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
@ -143,6 +138,10 @@ func createSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) (*bolt.
|
||||
return bkt, nil
|
||||
}
|
||||
|
||||
func getSnapshottersBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots)
|
||||
}
|
||||
|
||||
func getSnapshotterBucket(tx *bolt.Tx, namespace, snapshotter string) *bolt.Bucket {
|
||||
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectSnapshots, []byte(snapshotter))
|
||||
}
|
||||
|
@ -261,7 +261,7 @@ func TestMetadataCollector(t *testing.T) {
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -336,7 +336,7 @@ func benchmarkTrigger(n int) func(b *testing.B) {
|
||||
|
||||
if err := mdb.Update(func(tx *bolt.Tx) error {
|
||||
for _, obj := range objects {
|
||||
node, err := create(obj, tx, cs, sn)
|
||||
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -416,7 +416,7 @@ type object struct {
|
||||
labels map[string]string
|
||||
}
|
||||
|
||||
func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
|
||||
func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
|
||||
var (
|
||||
node *gc.Node
|
||||
namespace = "test"
|
||||
@ -469,12 +469,14 @@ func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter)
|
||||
}
|
||||
}
|
||||
case testImage:
|
||||
ctx := WithTransactionContext(ctx, tx)
|
||||
|
||||
image := images.Image{
|
||||
Name: v.name,
|
||||
Target: v.target,
|
||||
Labels: obj.labels,
|
||||
}
|
||||
_, err := NewImageStore(tx).Create(ctx, image)
|
||||
_, err := is.Create(ctx, image)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create image")
|
||||
}
|
||||
|
@ -20,12 +20,12 @@ import (
|
||||
)
|
||||
|
||||
type imageStore struct {
|
||||
tx *bolt.Tx
|
||||
db *DB
|
||||
}
|
||||
|
||||
// NewImageStore returns a store backed by a bolt DB
|
||||
func NewImageStore(tx *bolt.Tx) images.Store {
|
||||
return &imageStore{tx: tx}
|
||||
func NewImageStore(db *DB) images.Store {
|
||||
return &imageStore{db: db}
|
||||
}
|
||||
|
||||
func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) {
|
||||
@ -36,19 +36,25 @@ func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error)
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
bkt := getImagesBucket(s.tx, namespace)
|
||||
if bkt == nil {
|
||||
return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
ibkt := bkt.Bucket([]byte(name))
|
||||
if ibkt == nil {
|
||||
return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
ibkt := bkt.Bucket([]byte(name))
|
||||
if ibkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
image.Name = name
|
||||
if err := readImage(&image, ibkt); err != nil {
|
||||
return images.Image{}, errors.Wrapf(err, "image %q", name)
|
||||
image.Name = name
|
||||
if err := readImage(&image, ibkt); err != nil {
|
||||
return errors.Wrapf(err, "image %q", name)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return image, nil
|
||||
@ -65,28 +71,30 @@ func (s *imageStore) List(ctx context.Context, fs ...string) ([]images.Image, er
|
||||
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
bkt := getImagesBucket(s.tx, namespace)
|
||||
if bkt == nil {
|
||||
return nil, nil // empty store
|
||||
}
|
||||
|
||||
var m []images.Image
|
||||
if err := bkt.ForEach(func(k, v []byte) error {
|
||||
var (
|
||||
image = images.Image{
|
||||
Name: string(k),
|
||||
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return nil // empty store
|
||||
}
|
||||
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var (
|
||||
image = images.Image{
|
||||
Name: string(k),
|
||||
}
|
||||
kbkt = bkt.Bucket(k)
|
||||
)
|
||||
|
||||
if err := readImage(&image, kbkt); err != nil {
|
||||
return err
|
||||
}
|
||||
kbkt = bkt.Bucket(k)
|
||||
)
|
||||
|
||||
if err := readImage(&image, kbkt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if filter.Match(adaptImage(image)) {
|
||||
m = append(m, image)
|
||||
}
|
||||
return nil
|
||||
if filter.Match(adaptImage(image)) {
|
||||
m = append(m, image)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -100,11 +108,16 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
if err := validateImage(&image); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
if err := validateImage(&image); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bkt, err := createImagesBucket(tx, namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return image, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
ibkt, err := bkt.CreateBucket([]byte(image.Name))
|
||||
if err != nil {
|
||||
if err != bolt.ErrBucketExists {
|
||||
@ -117,7 +130,11 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima
|
||||
image.CreatedAt = time.Now().UTC()
|
||||
image.UpdatedAt = image.CreatedAt
|
||||
return writeImage(ibkt, &image)
|
||||
})
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return image, nil
|
||||
}
|
||||
|
||||
func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths ...string) (images.Image, error) {
|
||||
@ -131,7 +148,13 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
|
||||
}
|
||||
|
||||
var updated images.Image
|
||||
return updated, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
|
||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt, err := createImagesBucket(tx, namespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ibkt := bkt.Bucket([]byte(image.Name))
|
||||
if ibkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", image.Name)
|
||||
@ -180,7 +203,12 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths
|
||||
updated.CreatedAt = createdat
|
||||
updated.UpdatedAt = time.Now().UTC()
|
||||
return writeImage(ibkt, &updated)
|
||||
})
|
||||
}); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
return updated, nil
|
||||
|
||||
}
|
||||
|
||||
func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.DeleteOpt) error {
|
||||
@ -189,11 +217,24 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
|
||||
return err
|
||||
}
|
||||
|
||||
return withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error {
|
||||
err := bkt.DeleteBucket([]byte(name))
|
||||
return update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||
bkt := getImagesBucket(tx, namespace)
|
||||
if bkt == nil {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
err = bkt.DeleteBucket([]byte(name))
|
||||
if err == bolt.ErrBucketNotFound {
|
||||
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
|
||||
}
|
||||
|
||||
// A reference to a piece of content has been removed,
|
||||
// mark content store as dirty for triggering garbage
|
||||
// collection
|
||||
s.db.dirtyL.Lock()
|
||||
s.db.dirtyCS = true
|
||||
s.db.dirtyL.Unlock()
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/filters"
|
||||
"github.com/containerd/containerd/images"
|
||||
@ -18,6 +17,7 @@ import (
|
||||
func TestImagesList(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
store := NewImageStore(NewDB(db, nil, nil))
|
||||
|
||||
testset := map[string]*images.Image{}
|
||||
for i := 0; i < 4; i++ {
|
||||
@ -36,21 +36,15 @@ func TestImagesList(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
now := time.Now()
|
||||
result, err := store.Create(ctx, *testset[id])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &result, now, true)
|
||||
testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt
|
||||
checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list")
|
||||
return nil
|
||||
}); err != nil {
|
||||
now := time.Now()
|
||||
result, err := store.Create(ctx, *testset[id])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &result, now, true)
|
||||
testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt
|
||||
checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list")
|
||||
}
|
||||
|
||||
for _, testcase := range []struct {
|
||||
@ -102,46 +96,33 @@ func TestImagesList(t *testing.T) {
|
||||
testset = newtestset
|
||||
}
|
||||
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
results, err := store.List(ctx, testcase.filters...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(results) == 0 { // all tests return a non-empty result set
|
||||
t.Fatalf("no results returned")
|
||||
}
|
||||
|
||||
if len(results) != len(testset) {
|
||||
t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset))
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
checkImagesEqual(t, &result, testset[result.Name], "list results did not match")
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
results, err := store.List(ctx, testcase.filters...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(results) == 0 { // all tests return a non-empty result set
|
||||
t.Fatalf("no results returned")
|
||||
}
|
||||
|
||||
if len(results) != len(testset) {
|
||||
t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset))
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
checkImagesEqual(t, &result, testset[result.Name], "list results did not match")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// delete everything to test it
|
||||
for id := range testset {
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
return store.Delete(ctx, id)
|
||||
}); err != nil {
|
||||
if err := store.Delete(ctx, id); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// try it again, get NotFound
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
return store.Delete(ctx, id)
|
||||
}); errors.Cause(err) != errdefs.ErrNotFound {
|
||||
if err := store.Delete(ctx, id); errors.Cause(err) != errdefs.ErrNotFound {
|
||||
t.Fatalf("unexpected error %v", err)
|
||||
}
|
||||
}
|
||||
@ -149,6 +130,7 @@ func TestImagesList(t *testing.T) {
|
||||
func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
store := NewImageStore(NewDB(db, nil, nil))
|
||||
|
||||
for _, testcase := range []struct {
|
||||
name string
|
||||
@ -383,78 +365,52 @@ func TestImagesCreateUpdateDelete(t *testing.T) {
|
||||
testcase.expected.Target.Digest = testcase.original.Target.Digest
|
||||
}
|
||||
|
||||
done := errors.New("test complete")
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
var (
|
||||
store = NewImageStore(tx)
|
||||
now = time.Now()
|
||||
)
|
||||
|
||||
created, err := store.Create(ctx, testcase.original)
|
||||
if errors.Cause(err) != testcase.createerr {
|
||||
if testcase.createerr == nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
|
||||
}
|
||||
} else if testcase.createerr != nil {
|
||||
return done
|
||||
// Create
|
||||
now := time.Now()
|
||||
created, err := store.Create(ctx, testcase.original)
|
||||
if errors.Cause(err) != testcase.createerr {
|
||||
if testcase.createerr == nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
|
||||
}
|
||||
} else if testcase.createerr != nil {
|
||||
return
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &created, now, true)
|
||||
checkImageTimestamps(t, &created, now, true)
|
||||
|
||||
testcase.original.CreatedAt = created.CreatedAt
|
||||
testcase.expected.CreatedAt = created.CreatedAt
|
||||
testcase.original.UpdatedAt = created.UpdatedAt
|
||||
testcase.expected.UpdatedAt = created.UpdatedAt
|
||||
testcase.original.CreatedAt = created.CreatedAt
|
||||
testcase.expected.CreatedAt = created.CreatedAt
|
||||
testcase.original.UpdatedAt = created.UpdatedAt
|
||||
testcase.expected.UpdatedAt = created.UpdatedAt
|
||||
|
||||
checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation")
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err == done {
|
||||
return
|
||||
checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation")
|
||||
|
||||
// Update
|
||||
now = time.Now()
|
||||
updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...)
|
||||
if errors.Cause(err) != testcase.cause {
|
||||
if testcase.cause == nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
|
||||
}
|
||||
} else if testcase.cause != nil {
|
||||
return
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &updated, now, false)
|
||||
testcase.expected.UpdatedAt = updated.UpdatedAt
|
||||
checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result")
|
||||
|
||||
// Get
|
||||
result, err := store.Get(ctx, testcase.original.Name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
now := time.Now()
|
||||
store := NewImageStore(tx)
|
||||
updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...)
|
||||
if errors.Cause(err) != testcase.cause {
|
||||
if testcase.cause == nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
} else {
|
||||
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
|
||||
}
|
||||
} else if testcase.cause != nil {
|
||||
return done
|
||||
}
|
||||
|
||||
checkImageTimestamps(t, &updated, now, false)
|
||||
testcase.expected.UpdatedAt = updated.UpdatedAt
|
||||
checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result")
|
||||
return nil
|
||||
}); err != nil {
|
||||
if err == done {
|
||||
return
|
||||
}
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
store := NewImageStore(tx)
|
||||
result, err := store.Get(ctx, testcase.original.Name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result")
|
||||
return nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -133,31 +133,38 @@ func (s *namespaceStore) Delete(ctx context.Context, namespace string) error {
|
||||
}
|
||||
|
||||
func (s *namespaceStore) namespaceEmpty(ctx context.Context, namespace string) (bool, error) {
|
||||
ctx = namespaces.WithNamespace(ctx, namespace)
|
||||
|
||||
// need to check the various object stores.
|
||||
|
||||
imageStore := NewImageStore(s.tx)
|
||||
images, err := imageStore.List(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
// Get all data buckets
|
||||
buckets := []*bolt.Bucket{
|
||||
getImagesBucket(s.tx, namespace),
|
||||
getBlobsBucket(s.tx, namespace),
|
||||
getContainersBucket(s.tx, namespace),
|
||||
}
|
||||
if len(images) > 0 {
|
||||
return false, nil
|
||||
if snbkt := getSnapshottersBucket(s.tx, namespace); snbkt != nil {
|
||||
if err := snbkt.ForEach(func(k, v []byte) error {
|
||||
if v == nil {
|
||||
buckets = append(buckets, snbkt.Bucket(k))
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
containerStore := NewContainerStore(s.tx)
|
||||
containers, err := containerStore.List(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
// Ensure data buckets are empty
|
||||
for _, bkt := range buckets {
|
||||
if !isBucketEmpty(bkt) {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
if len(containers) > 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// TODO(stevvooe): Need to add check for content store, as well. Still need
|
||||
// to make content store namespace aware.
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func isBucketEmpty(bkt *bolt.Bucket) bool {
|
||||
if bkt == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
k, _ := bkt.Cursor().First()
|
||||
return k == nil
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package images
|
||||
import (
|
||||
gocontext "context"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
eventstypes "github.com/containerd/containerd/api/events"
|
||||
imagesapi "github.com/containerd/containerd/api/services/images/v1"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -38,7 +37,7 @@ func init() {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewService(m.(*metadata.DB), ic.Events, g.(gcScheduler)), nil
|
||||
return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil
|
||||
},
|
||||
})
|
||||
}
|
||||
@ -48,15 +47,15 @@ type gcScheduler interface {
|
||||
}
|
||||
|
||||
type service struct {
|
||||
db *metadata.DB
|
||||
store images.Store
|
||||
gc gcScheduler
|
||||
publisher events.Publisher
|
||||
}
|
||||
|
||||
// NewService returns the GRPC image server
|
||||
func NewService(db *metadata.DB, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer {
|
||||
func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer {
|
||||
return &service{
|
||||
db: db,
|
||||
store: is,
|
||||
gc: gc,
|
||||
publisher: publisher,
|
||||
}
|
||||
@ -68,31 +67,26 @@ func (s *service) Register(server *grpc.Server) error {
|
||||
}
|
||||
|
||||
func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) {
|
||||
var resp imagesapi.GetImageResponse
|
||||
image, err := s.store.Get(ctx, req.Name)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
|
||||
image, err := store.Get(ctx, req.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
imagepb := imageToProto(&image)
|
||||
resp.Image = &imagepb
|
||||
return nil
|
||||
}))
|
||||
imagepb := imageToProto(&image)
|
||||
return &imagesapi.GetImageResponse{
|
||||
Image: &imagepb,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) {
|
||||
var resp imagesapi.ListImagesResponse
|
||||
images, err := s.store.List(ctx, req.Filters...)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error {
|
||||
images, err := store.List(ctx, req.Filters...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Images = imagesToProto(images)
|
||||
return nil
|
||||
}))
|
||||
return &imagesapi.ListImagesResponse{
|
||||
Images: imagesToProto(images),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) {
|
||||
@ -105,18 +99,13 @@ func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest)
|
||||
image = imageFromProto(&req.Image)
|
||||
resp imagesapi.CreateImageResponse
|
||||
)
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
created, err := store.Create(ctx, image)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&created)
|
||||
return nil
|
||||
}); err != nil {
|
||||
created, err := s.store.Create(ctx, image)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&created)
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{
|
||||
Name: resp.Image.Name,
|
||||
Labels: resp.Image.Labels,
|
||||
@ -134,28 +123,24 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
|
||||
}
|
||||
|
||||
var (
|
||||
image = imageFromProto(&req.Image)
|
||||
resp imagesapi.UpdateImageResponse
|
||||
image = imageFromProto(&req.Image)
|
||||
resp imagesapi.UpdateImageResponse
|
||||
fieldpaths []string
|
||||
)
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
var fieldpaths []string
|
||||
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
|
||||
for _, path := range req.UpdateMask.Paths {
|
||||
fieldpaths = append(fieldpaths, path)
|
||||
}
|
||||
}
|
||||
|
||||
updated, err := store.Update(ctx, image, fieldpaths...)
|
||||
if err != nil {
|
||||
return err
|
||||
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
|
||||
for _, path := range req.UpdateMask.Paths {
|
||||
fieldpaths = append(fieldpaths, path)
|
||||
}
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&updated)
|
||||
return nil
|
||||
}); err != nil {
|
||||
updated, err := s.store.Update(ctx, image, fieldpaths...)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
resp.Image = imageToProto(&updated)
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{
|
||||
Name: resp.Image.Name,
|
||||
Labels: resp.Image.Labels,
|
||||
@ -168,10 +153,9 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest)
|
||||
|
||||
func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) {
|
||||
log.G(ctx).WithField("name", req.Name).Debugf("delete image")
|
||||
if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error {
|
||||
return errdefs.ToGRPC(store.Delete(ctx, req.Name))
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
|
||||
if err := s.store.Delete(ctx, req.Name); err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{
|
||||
@ -188,15 +172,3 @@ func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest)
|
||||
|
||||
return &ptypes.Empty{}, nil
|
||||
}
|
||||
|
||||
func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error {
|
||||
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewImageStore(tx)) }
|
||||
}
|
||||
|
||||
func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
|
||||
return s.db.View(s.withStore(ctx, fn))
|
||||
}
|
||||
|
||||
func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error {
|
||||
return s.db.Update(s.withStore(ctx, fn))
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user