From 89fa154efdc595a37120ef3ff2c63e0589469c63 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 3 Jan 2018 15:13:05 -0800 Subject: [PATCH] Update metadata image store to be initialized once The boltdb image store now manages its own transactions when one is not provided, but allows the caller to pass in a transaction through the context. This makes the image store more similar to the content and snapshot stores. Additionally, use the reference to the metadata database to mark the content store as dirty after an image has been deleted. The deletion of an image means a reference to a piece of content is gone and therefore garbage collection should be run to check if any resources can be cleaned up as a result. Signed-off-by: Derek McGowan --- metadata/db_test.go | 10 ++- metadata/images.go | 125 ++++++++++++++++++--------- metadata/images_test.go | 170 ++++++++++++++----------------------- services/images/service.go | 102 ++++++++-------------- 4 files changed, 189 insertions(+), 218 deletions(-) diff --git a/metadata/db_test.go b/metadata/db_test.go index bc97e6244..6d0c774f3 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -222,7 +222,7 @@ func TestMetadataCollector(t *testing.T) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, cs, sn) + node, err := create(obj, tx, NewImageStore(mdb), cs, sn) if err != nil { return err } @@ -297,7 +297,7 @@ func benchmarkTrigger(n int) func(b *testing.B) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, cs, sn) + node, err := create(obj, tx, NewImageStore(mdb), cs, sn) if err != nil { return err } @@ -377,7 +377,7 @@ type object struct { labels map[string]string } -func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { +func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { var ( node *gc.Node namespace = "test" @@ -430,12 +430,14 @@ func create(obj object, tx *bolt.Tx, cs content.Store, sn snapshots.Snapshotter) } } case testImage: + ctx := WithTransactionContext(ctx, tx) + image := images.Image{ Name: v.name, Target: v.target, Labels: obj.labels, } - _, err := NewImageStore(tx).Create(ctx, image) + _, err := is.Create(ctx, image) if err != nil { return nil, errors.Wrap(err, "failed to create image") } diff --git a/metadata/images.go b/metadata/images.go index 070439a8c..d9105a2d4 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -20,12 +20,12 @@ import ( ) type imageStore struct { - tx *bolt.Tx + db *DB } // NewImageStore returns a store backed by a bolt DB -func NewImageStore(tx *bolt.Tx) images.Store { - return &imageStore{tx: tx} +func NewImageStore(db *DB) images.Store { + return &imageStore{db: db} } func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) { @@ -36,19 +36,25 @@ func (s *imageStore) Get(ctx context.Context, name string) (images.Image, error) return images.Image{}, err } - bkt := getImagesBucket(s.tx, namespace) - if bkt == nil { - return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name) - } + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } - ibkt := bkt.Bucket([]byte(name)) - if ibkt == nil { - return images.Image{}, errors.Wrapf(errdefs.ErrNotFound, "image %q", name) - } + ibkt := bkt.Bucket([]byte(name)) + if ibkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } - image.Name = name - if err := readImage(&image, ibkt); err != nil { - return images.Image{}, errors.Wrapf(err, "image %q", name) + image.Name = name + if err := readImage(&image, ibkt); err != nil { + return errors.Wrapf(err, "image %q", name) + } + + return nil + }); err != nil { + return images.Image{}, err } return image, nil @@ -65,28 +71,30 @@ func (s *imageStore) List(ctx context.Context, fs ...string) ([]images.Image, er return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error()) } - bkt := getImagesBucket(s.tx, namespace) - if bkt == nil { - return nil, nil // empty store - } - var m []images.Image - if err := bkt.ForEach(func(k, v []byte) error { - var ( - image = images.Image{ - Name: string(k), + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return nil // empty store + } + + return bkt.ForEach(func(k, v []byte) error { + var ( + image = images.Image{ + Name: string(k), + } + kbkt = bkt.Bucket(k) + ) + + if err := readImage(&image, kbkt); err != nil { + return err } - kbkt = bkt.Bucket(k) - ) - if err := readImage(&image, kbkt); err != nil { - return err - } - - if filter.Match(adaptImage(image)) { - m = append(m, image) - } - return nil + if filter.Match(adaptImage(image)) { + m = append(m, image) + } + return nil + }) }); err != nil { return nil, err } @@ -100,11 +108,16 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima return images.Image{}, err } - if err := validateImage(&image); err != nil { - return images.Image{}, err - } + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + if err := validateImage(&image); err != nil { + return err + } + + bkt, err := createImagesBucket(tx, namespace) + if err != nil { + return err + } - return image, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { ibkt, err := bkt.CreateBucket([]byte(image.Name)) if err != nil { if err != bolt.ErrBucketExists { @@ -117,7 +130,11 @@ func (s *imageStore) Create(ctx context.Context, image images.Image) (images.Ima image.CreatedAt = time.Now().UTC() image.UpdatedAt = image.CreatedAt return writeImage(ibkt, &image) - }) + }); err != nil { + return images.Image{}, err + } + + return image, nil } func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths ...string) (images.Image, error) { @@ -131,7 +148,13 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths } var updated images.Image - return updated, withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { + + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + bkt, err := createImagesBucket(tx, namespace) + if err != nil { + return err + } + ibkt := bkt.Bucket([]byte(image.Name)) if ibkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "image %q", image.Name) @@ -180,7 +203,12 @@ func (s *imageStore) Update(ctx context.Context, image images.Image, fieldpaths updated.CreatedAt = createdat updated.UpdatedAt = time.Now().UTC() return writeImage(ibkt, &updated) - }) + }); err != nil { + return images.Image{}, err + } + + return updated, nil + } func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.DeleteOpt) error { @@ -189,11 +217,24 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return err } - return withImagesBucket(s.tx, namespace, func(bkt *bolt.Bucket) error { - err := bkt.DeleteBucket([]byte(name)) + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getImagesBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } + + err = bkt.DeleteBucket([]byte(name)) if err == bolt.ErrBucketNotFound { return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) } + + // A reference to a piece of content has been removed, + // mark content store as dirty for triggering garbage + // collection + s.db.dirtyL.Lock() + s.db.dirtyCS = true + s.db.dirtyL.Unlock() + return err }) } diff --git a/metadata/images_test.go b/metadata/images_test.go index c90f33082..1cca81809 100644 --- a/metadata/images_test.go +++ b/metadata/images_test.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/boltdb/bolt" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/filters" "github.com/containerd/containerd/images" @@ -18,6 +17,7 @@ import ( func TestImagesList(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewImageStore(NewDB(db, nil, nil)) testset := map[string]*images.Image{} for i := 0; i < 4; i++ { @@ -36,21 +36,15 @@ func TestImagesList(t *testing.T) { }, } - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - now := time.Now() - result, err := store.Create(ctx, *testset[id]) - if err != nil { - return err - } - - checkImageTimestamps(t, &result, now, true) - testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt - checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list") - return nil - }); err != nil { + now := time.Now() + result, err := store.Create(ctx, *testset[id]) + if err != nil { t.Fatal(err) } + + checkImageTimestamps(t, &result, now, true) + testset[id].UpdatedAt, testset[id].CreatedAt = result.UpdatedAt, result.CreatedAt + checkImagesEqual(t, &result, testset[id], "ensure that containers were created as expected for list") } for _, testcase := range []struct { @@ -102,46 +96,33 @@ func TestImagesList(t *testing.T) { testset = newtestset } - if err := db.View(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - results, err := store.List(ctx, testcase.filters...) - if err != nil { - t.Fatal(err) - } - - if len(results) == 0 { // all tests return a non-empty result set - t.Fatalf("no results returned") - } - - if len(results) != len(testset) { - t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) - } - - for _, result := range results { - checkImagesEqual(t, &result, testset[result.Name], "list results did not match") - } - - return nil - }); err != nil { + results, err := store.List(ctx, testcase.filters...) + if err != nil { t.Fatal(err) } + + if len(results) == 0 { // all tests return a non-empty result set + t.Fatalf("no results returned") + } + + if len(results) != len(testset) { + t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) + } + + for _, result := range results { + checkImagesEqual(t, &result, testset[result.Name], "list results did not match") + } }) } // delete everything to test it for id := range testset { - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - return store.Delete(ctx, id) - }); err != nil { + if err := store.Delete(ctx, id); err != nil { t.Fatal(err) } // try it again, get NotFound - if err := db.Update(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - return store.Delete(ctx, id) - }); errors.Cause(err) != errdefs.ErrNotFound { + if err := store.Delete(ctx, id); errors.Cause(err) != errdefs.ErrNotFound { t.Fatalf("unexpected error %v", err) } } @@ -149,6 +130,7 @@ func TestImagesList(t *testing.T) { func TestImagesCreateUpdateDelete(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewImageStore(NewDB(db, nil, nil)) for _, testcase := range []struct { name string @@ -383,78 +365,52 @@ func TestImagesCreateUpdateDelete(t *testing.T) { testcase.expected.Target.Digest = testcase.original.Target.Digest } - done := errors.New("test complete") - if err := db.Update(func(tx *bolt.Tx) error { - var ( - store = NewImageStore(tx) - now = time.Now() - ) - - created, err := store.Create(ctx, testcase.original) - if errors.Cause(err) != testcase.createerr { - if testcase.createerr == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) - } - } else if testcase.createerr != nil { - return done + // Create + now := time.Now() + created, err := store.Create(ctx, testcase.original) + if errors.Cause(err) != testcase.createerr { + if testcase.createerr == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) } + } else if testcase.createerr != nil { + return + } - checkImageTimestamps(t, &created, now, true) + checkImageTimestamps(t, &created, now, true) - testcase.original.CreatedAt = created.CreatedAt - testcase.expected.CreatedAt = created.CreatedAt - testcase.original.UpdatedAt = created.UpdatedAt - testcase.expected.UpdatedAt = created.UpdatedAt + testcase.original.CreatedAt = created.CreatedAt + testcase.expected.CreatedAt = created.CreatedAt + testcase.original.UpdatedAt = created.UpdatedAt + testcase.expected.UpdatedAt = created.UpdatedAt - checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation") - return nil - }); err != nil { - if err == done { - return + checkImagesEqual(t, &created, &testcase.original, "unexpected image on creation") + + // Update + now = time.Now() + updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...) + if errors.Cause(err) != testcase.cause { + if testcase.cause == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) } + } else if testcase.cause != nil { + return + } + + checkImageTimestamps(t, &updated, now, false) + testcase.expected.UpdatedAt = updated.UpdatedAt + checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result") + + // Get + result, err := store.Get(ctx, testcase.original.Name) + if err != nil { t.Fatal(err) } - if err := db.Update(func(tx *bolt.Tx) error { - now := time.Now() - store := NewImageStore(tx) - updated, err := store.Update(ctx, testcase.input, testcase.fieldpaths...) - if errors.Cause(err) != testcase.cause { - if testcase.cause == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) - } - } else if testcase.cause != nil { - return done - } - - checkImageTimestamps(t, &updated, now, false) - testcase.expected.UpdatedAt = updated.UpdatedAt - checkImagesEqual(t, &updated, &testcase.expected, "updated failed to get expected result") - return nil - }); err != nil { - if err == done { - return - } - t.Fatal(err) - } - - if err := db.View(func(tx *bolt.Tx) error { - store := NewImageStore(tx) - result, err := store.Get(ctx, testcase.original.Name) - if err != nil { - t.Fatal(err) - } - - checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result") - return nil - }); err != nil { - t.Fatal(err) - } - + checkImagesEqual(t, &result, &testcase.expected, "get after failed to get expected result") }) } } diff --git a/services/images/service.go b/services/images/service.go index 0a6ccec57..57ab40bff 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -3,7 +3,6 @@ package images import ( gocontext "context" - "github.com/boltdb/bolt" eventstypes "github.com/containerd/containerd/api/events" imagesapi "github.com/containerd/containerd/api/services/images/v1" "github.com/containerd/containerd/errdefs" @@ -38,7 +37,7 @@ func init() { return nil, err } - return NewService(m.(*metadata.DB), ic.Events, g.(gcScheduler)), nil + return NewService(metadata.NewImageStore(m.(*metadata.DB)), ic.Events, g.(gcScheduler)), nil }, }) } @@ -48,15 +47,15 @@ type gcScheduler interface { } type service struct { - db *metadata.DB + store images.Store gc gcScheduler publisher events.Publisher } // NewService returns the GRPC image server -func NewService(db *metadata.DB, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer { +func NewService(is images.Store, publisher events.Publisher, gc gcScheduler) imagesapi.ImagesServer { return &service{ - db: db, + store: is, gc: gc, publisher: publisher, } @@ -68,31 +67,26 @@ func (s *service) Register(server *grpc.Server) error { } func (s *service) Get(ctx context.Context, req *imagesapi.GetImageRequest) (*imagesapi.GetImageResponse, error) { - var resp imagesapi.GetImageResponse + image, err := s.store.Get(ctx, req.Name) + if err != nil { + return nil, errdefs.ToGRPC(err) + } - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error { - image, err := store.Get(ctx, req.Name) - if err != nil { - return err - } - imagepb := imageToProto(&image) - resp.Image = &imagepb - return nil - })) + imagepb := imageToProto(&image) + return &imagesapi.GetImageResponse{ + Image: &imagepb, + }, nil } func (s *service) List(ctx context.Context, req *imagesapi.ListImagesRequest) (*imagesapi.ListImagesResponse, error) { - var resp imagesapi.ListImagesResponse + images, err := s.store.List(ctx, req.Filters...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } - return &resp, errdefs.ToGRPC(s.withStoreView(ctx, func(ctx context.Context, store images.Store) error { - images, err := store.List(ctx, req.Filters...) - if err != nil { - return err - } - - resp.Images = imagesToProto(images) - return nil - })) + return &imagesapi.ListImagesResponse{ + Images: imagesToProto(images), + }, nil } func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) (*imagesapi.CreateImageResponse, error) { @@ -105,18 +99,13 @@ func (s *service) Create(ctx context.Context, req *imagesapi.CreateImageRequest) image = imageFromProto(&req.Image) resp imagesapi.CreateImageResponse ) - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - created, err := store.Create(ctx, image) - if err != nil { - return err - } - - resp.Image = imageToProto(&created) - return nil - }); err != nil { + created, err := s.store.Create(ctx, image) + if err != nil { return nil, errdefs.ToGRPC(err) } + resp.Image = imageToProto(&created) + if err := s.publisher.Publish(ctx, "/images/create", &eventstypes.ImageCreate{ Name: resp.Image.Name, Labels: resp.Image.Labels, @@ -134,28 +123,24 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) } var ( - image = imageFromProto(&req.Image) - resp imagesapi.UpdateImageResponse + image = imageFromProto(&req.Image) + resp imagesapi.UpdateImageResponse + fieldpaths []string ) - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - var fieldpaths []string - if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { - for _, path := range req.UpdateMask.Paths { - fieldpaths = append(fieldpaths, path) - } - } - updated, err := store.Update(ctx, image, fieldpaths...) - if err != nil { - return err + if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { + for _, path := range req.UpdateMask.Paths { + fieldpaths = append(fieldpaths, path) } + } - resp.Image = imageToProto(&updated) - return nil - }); err != nil { + updated, err := s.store.Update(ctx, image, fieldpaths...) + if err != nil { return nil, errdefs.ToGRPC(err) } + resp.Image = imageToProto(&updated) + if err := s.publisher.Publish(ctx, "/images/update", &eventstypes.ImageUpdate{ Name: resp.Image.Name, Labels: resp.Image.Labels, @@ -168,10 +153,9 @@ func (s *service) Update(ctx context.Context, req *imagesapi.UpdateImageRequest) func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) (*ptypes.Empty, error) { log.G(ctx).WithField("name", req.Name).Debugf("delete image") - if err := s.withStoreUpdate(ctx, func(ctx context.Context, store images.Store) error { - return errdefs.ToGRPC(store.Delete(ctx, req.Name)) - }); err != nil { - return nil, err + + if err := s.store.Delete(ctx, req.Name); err != nil { + return nil, errdefs.ToGRPC(err) } if err := s.publisher.Publish(ctx, "/images/delete", &eventstypes.ImageDelete{ @@ -188,15 +172,3 @@ func (s *service) Delete(ctx context.Context, req *imagesapi.DeleteImageRequest) return &ptypes.Empty{}, nil } - -func (s *service) withStore(ctx context.Context, fn func(ctx context.Context, store images.Store) error) func(tx *bolt.Tx) error { - return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewImageStore(tx)) } -} - -func (s *service) withStoreView(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { - return s.db.View(s.withStore(ctx, fn)) -} - -func (s *service) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store images.Store) error) error { - return s.db.Update(s.withStore(ctx, fn)) -}