diff --git a/metadata/db_test.go b/metadata/db_test.go index bc97e6244..6d0c774f3 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -222,7 +222,7 @@ func TestMetadataCollector(t *testing.T) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, cs, sn) + node, err := create(obj, tx, NewImageStore(mdb), cs, sn) if err != nil { return err } @@ -297,7 +297,7 @@ func benchmarkTrigger(n int) func(b *testing.B) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, cs, sn) + node, err := create(obj, tx, NewImageStore(mdb), cs, sn) if err != nil { return err } @@ -377,7 +377,7 @@ type object struct { labels map[string]string } -func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { +func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { var ( node *gc.Node namespace = "test" @@ -430,12 +430,14 @@ func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) } } case testImage: + ctx := WithTransactionContext(ctx, tx) + image := images.Image{ Name: v.name, Target: v.target, Labels: obj.labels, } - _, err := NewImageStore(tx).Create(ctx, image) + _, err := is.Create(ctx, image) if err != nil { return nil, errors.Wrap(err, "failed to create image") } diff --git a/metadata/images.go b/metadata/images.go index 070439a8c..d9105a2d4 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -20,12 +20,12 @@ import ( ) type imageStore struct { - tx *bolt.Tx + db *DB } // NewImageStore returns a store backed by a bolt DB -func NewImageStore(tx *bolt.Tx) images.Store { - return &imageStore{tx: tx} +func NewImageStore(db *DB) images.Store { + return &imageStore{db: db} } func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) { @@ -36,19 +36,25 @@ func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) return images.Image{}, err } - bkt := getImagesBucket(s.tx, namespace) - if bkt == nil { - return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name) - } + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } - ibkt := bkt.Bucket([]byte(name)) - if ibkt == nil { - return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name) - } + ibkt := bkt.Bucket([]byte(name)) + if ibkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } - image.Name = name - if err := readImage(&image, ibkt); err != nil { - return images.Image{}, errors.Wrapf(err, "image %q", name) + image.Name = name + if err := readImage(&image, ibkt); err != nil { + return errors.Wrapf(err, "image %q", name) + } + + return nil + }); err != nil { + return images.Image{}, err } return image, nil @@ -65,28 +71,30 @@ func (s *imageStore) List(ctx context.Context, fs ...string) ([]images.Image, er return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error()) } - bkt := getImagesBucket(s.tx, namespace) - if bkt == nil { - return nil, nil // empty store - } - var m []images.Image - if err := bkt.ForEach(func(k, v []byte) error { - var ( - image = images.Image{ - Name: string(k), + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return nil // empty store + } + + return bkt.ForEach(func(k, v []byte) error { + var ( + image = images.Image{ + Name: string(k), + } + kbkt = bkt.Bucket(k) + ) + + if err := readImage(&image, kbkt); err != nil { + return err } - kbkt = bkt.Bucket(k) - ) - if err := readImage(&image, kbkt); err != nil { - return err - } - - if filter.Match(adaptImage(image)) { - m = append(m, image) - } - return nil + if filter.Match(adaptImage(image)) { + m = append(m, image) + } + return nil + }) }); err != nil { return nil, err } @@ -100,11 +108,16 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } - if err := validateImage(&image); err != nil { - return images.Image{}, err - } + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + if err := validateImage(&image); err != nil { + return err + } + + bkt, err := createImagesBucket(tx, namespace) + if err != nil { + return err + } - return image, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { ibkt, err := bkt.CreateBucket([]byte(image.Name)) if err != nil { if err != bolt.ErrBucketExists { @@ -117,7 +130,11 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima image.CreatedAt = time.Now().UTC() image.UpdatedAt = image.CreatedAt return writeImage(ibkt, &image) - }) + }); err != nil { + return images.Image{}, err + } + + return image, nil } func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths ...string) (images.Image, error) { @@ -131,7 +148,13 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths } var updated images.Image - return updated, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { + + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + bkt, err := createImagesBucket(tx, namespace) + if err != nil { + return err + } + ibkt := bkt.Bucket([]byte(image.Name)) if ibkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "image %q", image.Name) @@ -180,7 +203,12 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths updated.CreatedAt = createdat updated.UpdatedAt = time.Now().UTC() return writeImage(ibkt, &updated) - }) + }); err != nil { + return images.Image{}, err + } + + return updated, nil + } func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.DeleteOpt) error { @@ -189,11 +217,24 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return err } - return withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { - err := bkt.DeleteBucket([]byte(name)) + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } + + err = bkt.DeleteBucket([]byte(name)) if err == bolt.ErrBucketNotFound { return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) } + + // A reference to a piece of content has been removed, + // mark content store as dirty for triggering garbage + // collection + s.db.dirtyL.Lock() + s.db.dirtyCS = true + s.db.dirtyL.Unlock() + return err }) } diff --git a/metadata/images_test.go b/metadata/images_test.go index c90f33082..1cca81809 100644 --- a/metadata/images_test.go +++ b/metadata/images_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/boltdb/bolt" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/images" @@ -18,6 +17,7 @@ import ( func TestImagesList(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewImageStore(NewDB(db, nil, nil)) testset := map[string]*images.Image{} for i := 0; i < 4; i++ { @@ -36,21 +36,15 @@ func TestImagesList(t *testing.T) { }, } - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - now := time.Now() - result, err := store.Create(ctx, *testset[id]) - if err != nil { - return err - } - - checkImageTimestamps(t, &result, now, true) - testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt - checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list") - return nil - }); err != nil { + now := time.Now() + result, err := store.Create(ctx, *testset[id]) + if err != nil { t.Fatal(err) } + + checkImageTimestamps(t, &result, now, true) + testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt + checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list") } for _, testcase := range []struct { @@ -102,46 +96,33 @@ func TestImagesList(t *testing.T) { testset = newtestset } - if err := db.View(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - results, err := store.List(ctx, testcase.filters...) - if err != nil { - t.Fatal(err) - } - - if len(results) == 0 { // all tests return a non-empty result set - t.Fatalf("no results returned") - } - - if len(results) != len(testset) { - t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) - } - - for _, result := range results { - checkImagesEqual(t, &result, testset[result.Name], "list results did not match") - } - - return nil - }); err != nil { + results, err := store.List(ctx, testcase.filters...) + if err != nil { t.Fatal(err) } + + if len(results) == 0 { // all tests return a non-empty result set + t.Fatalf("no results returned") + } + + if len(results) != len(testset) { + t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) + } + + for _, result := range results { + checkImagesEqual(t, &result, testset[result.Name], "list results did not match") + } }) } // delete everything to test it for id := range testset { - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - return store.Delete(ctx, id) - }); err != nil { + if err := store.Delete(ctx, id); err != nil { t.Fatal(err) } // try it again, get NotFound - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - return store.Delete(ctx, id) - }); errors.Cause(err) != errdefs.ErrNotFound { + if err := store.Delete(ctx, id); errors.Cause(err) != errdefs.ErrNotFound { t.Fatalf("unexpected error %v", err) } } @@ -149,6 +130,7 @@ func TestImagesList(t *testing.T) { func TestImagesCreateUpdateDelete(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewImageStore(NewDB(db, nil, nil)) for _, testcase := range []struct { name string @@ -383,78 +365,52 @@ func TestImagesCreateUpdateDelete(t *testing.T) { testcase.expected.Target.Digest = testcase.original.Target.Digest } - done := errors.New("test complete") - if err := db.Update(func(tx *bolt.Tx) error { - var ( - store = NewImageStore(tx) - now = time.Now() - ) - - created, err := store.Create(ctx, testcase.original) - if errors.Cause(err) != testcase.createerr { - if testcase.createerr == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) - } - } else if testcase.createerr != nil { - return done + // Create + now := time.Now() + created, err := store.Create(ctx, testcase.original) + if errors.Cause(err) != testcase.createerr { + if testcase.createerr == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) } + } else if testcase.createerr != nil { + return + } - checkImageTimestamps(t, &created, now, true) + checkImageTimestamps(t, &created, now, true) - testcase.original.CreatedAt = created.CreatedAt - testcase.expected.CreatedAt = created.CreatedAt - testcase.original.UpdatedAt = created.UpdatedAt - testcase.expected.UpdatedAt = created.UpdatedAt + testcase.original.CreatedAt = created.CreatedAt + testcase.expected.CreatedAt = created.CreatedAt + testcase.original.UpdatedAt = created.UpdatedAt + testcase.expected.UpdatedAt = created.UpdatedAt - checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation") - return nil - }); err != nil { - if err == done { - return + checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation") + + // Update + now = time.Now() + updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...) + if errors.Cause(err) != testcase.cause { + if testcase.cause == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) } + } else if testcase.cause != nil { + return + } + + checkImageTimestamps(t, &updated, now, false) + testcase.expected.UpdatedAt = updated.UpdatedAt + checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result") + + // Get + result, err := store.Get(ctx, testcase.original.Name) + if err != nil { t.Fatal(err) } - if err := db.Update(func(tx *bolt.Tx) error { - now := time.Now() - store := NewImageStore(tx) - updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...) - if errors.Cause(err) != testcase.cause { - if testcase.cause == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) - } - } else if testcase.cause != nil { - return done - } - - checkImageTimestamps(t, &updated, now, false) - testcase.expected.UpdatedAt = updated.UpdatedAt - checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result") - return nil - }); err != nil { - if err == done { - return - } - t.Fatal(err) - } - - if err := db.View(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - result, err := store.Get(ctx, testcase.original.Name) - if err != nil { - t.Fatal(err) - } - - checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result") - return nil - }); err != nil { - t.Fatal(err) - } - + checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result") }) } } diff --git a/services/images/service.go b/services/images/service.go index 0a6ccec57..57ab40bff 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -3,7 +3,6 @@ package images import ( gocontext "context" - "github.com/boltdb/bolt" eventstypes "github.com/containerd/containerd/api/events" imagesapi "github.com/containerd/containerd/api/services/images/v1" "github.com/containerd/containerd/errdefs" @@ -38,7 +37,7 @@ func init() { return nil, err } - return NewService(m.(*metadata.DB), ic.Events, g.(gcScheduler)), nil + return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil }, }) } @@ -48,15 +47,15 @@ type gcScheduler interface { } type service struct { - db *metadata.DB + store images.Store gc gcScheduler publisher events.Publisher } // NewService returns the GRPC image server -func NewService(db *metadata.DB, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer { +func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer { return &service{ - db: db, + store: is, gc: gc, publisher: publisher, } @@ -68,31 +67,26 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) { - var resp imagesapi.GetImageResponse + image, err := s.store.Get(ctx, req.Name) + if err != nil { + return nil, errdefs.ToGRPC(err) + } - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error { - image, err := store.Get(ctx, req.Name) - if err != nil { - return err - } - imagepb := imageToProto(&image) - resp.Image = &imagepb - return nil - })) + imagepb := imageToProto(&image) + return &imagesapi.GetImageResponse{ + Image: &imagepb, + }, nil } func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) { - var resp imagesapi.ListImagesResponse + images, err := s.store.List(ctx, req.Filters...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error { - images, err := store.List(ctx, req.Filters...) - if err != nil { - return err - } - - resp.Images = imagesToProto(images) - return nil - })) + return &imagesapi.ListImagesResponse{ + Images: imagesToProto(images), + }, nil } func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) { @@ -105,18 +99,13 @@ func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) image = imageFromProto(&req.Image) resp imagesapi.CreateImageResponse ) - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - created, err := store.Create(ctx, image) - if err != nil { - return err - } - - resp.Image = imageToProto(&created) - return nil - }); err != nil { + created, err := s.store.Create(ctx, image) + if err != nil { return nil, errdefs.ToGRPC(err) } + resp.Image = imageToProto(&created) + if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ Name: resp.Image.Name, Labels: resp.Image.Labels, @@ -134,28 +123,24 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) } var ( - image = imageFromProto(&req.Image) - resp imagesapi.UpdateImageResponse + image = imageFromProto(&req.Image) + resp imagesapi.UpdateImageResponse + fieldpaths []string ) - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - var fieldpaths []string - if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { - for _, path := range req.UpdateMask.Paths { - fieldpaths = append(fieldpaths, path) - } - } - updated, err := store.Update(ctx, image, fieldpaths...) - if err != nil { - return err + if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { + for _, path := range req.UpdateMask.Paths { + fieldpaths = append(fieldpaths, path) } + } - resp.Image = imageToProto(&updated) - return nil - }); err != nil { + updated, err := s.store.Update(ctx, image, fieldpaths...) + if err != nil { return nil, errdefs.ToGRPC(err) } + resp.Image = imageToProto(&updated) + if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ Name: resp.Image.Name, Labels: resp.Image.Labels, @@ -168,10 +153,9 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) { log.G(ctx).WithField("name", req.Name).Debugf("delete image") - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - return errdefs.ToGRPC(store.Delete(ctx, req.Name)) - }); err != nil { - return nil, err + + if err := s.store.Delete(ctx, req.Name); err != nil { + return nil, errdefs.ToGRPC(err) } if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ @@ -188,15 +172,3 @@ func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return &ptypes.Empty{}, nil } - -func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error { - return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewImageStore(tx)) } -} - -func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { - return s.db.View(s.withStore(ctx, fn)) -} - -func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { - return s.db.Update(s.withStore(ctx, fn)) -}