diff --git a/metadata/containers.go b/metadata/containers.go index af8224786..09b0d203d 100644 --- a/metadata/containers.go +++ b/metadata/containers.go @@ -19,6 +19,7 @@ package metadata import ( "context" "strings" + "sync/atomic" "time" "github.com/containerd/containerd/containers" @@ -35,13 +36,13 @@ import ( ) type containerStore struct { - tx *bolt.Tx + db *DB } // NewContainerStore returns a Store backed by an underlying bolt DB -func NewContainerStore(tx *bolt.Tx) containers.Store { +func NewContainerStore(db *DB) containers.Store { return &containerStore{ - tx: tx, + db: db, } } @@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain return containers.Container{}, err } - bkt := getContainerBucket(s.tx, namespace, id) - if bkt == nil { - return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace) - } - container := containers.Container{ID: id} - if err := readContainer(&container, bkt); err != nil { - return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id) + + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getContainerBucket(tx, namespace, id) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace) + } + + if err := readContainer(&container, bkt); err != nil { + return errors.Wrapf(err, "failed to read container %q", id) + } + + return nil + }); err != nil { + return containers.Container{}, err } return container, nil @@ -75,27 +83,30 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error()) } - bkt := getContainersBucket(s.tx, namespace) - if bkt == nil { - return nil, nil // empty store - } - var m []containers.Container - if err := bkt.ForEach(func(k, v []byte) error { - cbkt := bkt.Bucket(k) - if cbkt == nil { + + if err := view(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getContainersBucket(tx, namespace) + if bkt == nil { + return nil // empty store + } + + return bkt.ForEach(func(k, v []byte) error { + cbkt := bkt.Bucket(k) + if cbkt == nil { + return nil + } + container := containers.Container{ID: string(k)} + + if err := readContainer(&container, cbkt); err != nil { + return errors.Wrapf(err, "failed to read container %q", string(k)) + } + + if filter.Match(adaptContainer(container)) { + m = append(m, container) + } return nil - } - container := containers.Container{ID: string(k)} - - if err := readContainer(&container, cbkt); err != nil { - return errors.Wrapf(err, "failed to read container %q", string(k)) - } - - if filter.Match(adaptContainer(container)) { - m = append(m, container) - } - return nil + }) }); err != nil { return nil, err } @@ -113,23 +124,29 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai return containers.Container{}, errors.Wrap(err, "create container failed validation") } - bkt, err := createContainersBucket(s.tx, namespace) - if err != nil { - return containers.Container{}, err - } - - cbkt, err := bkt.CreateBucket([]byte(container.ID)) - if err != nil { - if err == bolt.ErrBucketExists { - err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID) + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + bkt, err := createContainersBucket(tx, namespace) + if err != nil { + return err } - return containers.Container{}, err - } - container.CreatedAt = time.Now().UTC() - container.UpdatedAt = container.CreatedAt - if err := writeContainer(cbkt, &container); err != nil { - return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID) + cbkt, err := bkt.CreateBucket([]byte(container.ID)) + if err != nil { + if err == bolt.ErrBucketExists { + err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID) + } + return err + } + + container.CreatedAt = time.Now().UTC() + container.UpdatedAt = container.CreatedAt + if err := writeContainer(cbkt, &container); err != nil { + return errors.Wrapf(err, "failed to write container %q", container.ID) + } + + return nil + }); err != nil { + return containers.Container{}, err } return container, nil @@ -145,85 +162,91 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id") } - bkt := getContainersBucket(s.tx, namespace) - if bkt == nil { - return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace) - } - - cbkt := bkt.Bucket([]byte(container.ID)) - if cbkt == nil { - return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID) - } - var updated containers.Container - if err := readContainer(&updated, cbkt); err != nil { - return updated, errors.Wrapf(err, "failed to read container %q", container.ID) - } - createdat := updated.CreatedAt - updated.ID = container.ID - - if len(fieldpaths) == 0 { - // only allow updates to these field on full replace. - fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"} - - // Fields that are immutable must cause an error when no field paths - // are provided. This allows these fields to become mutable in the - // future. - if updated.Snapshotter != container.Snapshotter { - return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable") + if err := update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getContainersBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace) } - if updated.Runtime.Name != container.Runtime.Name { - return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable") + cbkt := bkt.Bucket([]byte(container.ID)) + if cbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID) } - } - // apply the field mask. If you update this code, you better follow the - // field mask rules in field_mask.proto. If you don't know what this - // is, do not update this code. - for _, path := range fieldpaths { - if strings.HasPrefix(path, "labels.") { - if updated.Labels == nil { - updated.Labels = map[string]string{} + if err := readContainer(&updated, cbkt); err != nil { + return errors.Wrapf(err, "failed to read container %q", container.ID) + } + createdat := updated.CreatedAt + updated.ID = container.ID + + if len(fieldpaths) == 0 { + // only allow updates to these field on full replace. + fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"} + + // Fields that are immutable must cause an error when no field paths + // are provided. This allows these fields to become mutable in the + // future. + if updated.Snapshotter != container.Snapshotter { + return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable") } - key := strings.TrimPrefix(path, "labels.") - updated.Labels[key] = container.Labels[key] - continue - } - if strings.HasPrefix(path, "extensions.") { - if updated.Extensions == nil { - updated.Extensions = map[string]types.Any{} + if updated.Runtime.Name != container.Runtime.Name { + return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable") } - key := strings.TrimPrefix(path, "extensions.") - updated.Extensions[key] = container.Extensions[key] - continue } - switch path { - case "labels": - updated.Labels = container.Labels - case "spec": - updated.Spec = container.Spec - case "extensions": - updated.Extensions = container.Extensions - case "image": - updated.Image = container.Image - case "snapshotkey": - updated.SnapshotKey = container.SnapshotKey - default: - return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID) + // apply the field mask. If you update this code, you better follow the + // field mask rules in field_mask.proto. If you don't know what this + // is, do not update this code. + for _, path := range fieldpaths { + if strings.HasPrefix(path, "labels.") { + if updated.Labels == nil { + updated.Labels = map[string]string{} + } + key := strings.TrimPrefix(path, "labels.") + updated.Labels[key] = container.Labels[key] + continue + } + + if strings.HasPrefix(path, "extensions.") { + if updated.Extensions == nil { + updated.Extensions = map[string]types.Any{} + } + key := strings.TrimPrefix(path, "extensions.") + updated.Extensions[key] = container.Extensions[key] + continue + } + + switch path { + case "labels": + updated.Labels = container.Labels + case "spec": + updated.Spec = container.Spec + case "extensions": + updated.Extensions = container.Extensions + case "image": + updated.Image = container.Image + case "snapshotkey": + updated.SnapshotKey = container.SnapshotKey + default: + return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID) + } } - } - if err := validateContainer(&updated); err != nil { - return containers.Container{}, errors.Wrap(err, "update failed validation") - } + if err := validateContainer(&updated); err != nil { + return errors.Wrap(err, "update failed validation") + } - updated.CreatedAt = createdat - updated.UpdatedAt = time.Now().UTC() - if err := writeContainer(cbkt, &updated); err != nil { - return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID) + updated.CreatedAt = createdat + updated.UpdatedAt = time.Now().UTC() + if err := writeContainer(cbkt, &updated); err != nil { + return errors.Wrapf(err, "failed to write container %q", container.ID) + } + + return nil + }); err != nil { + return containers.Container{}, err } return updated, nil @@ -235,15 +258,23 @@ func (s *containerStore) Delete(ctx context.Context, id string) error { return err } - bkt := getContainersBucket(s.tx, namespace) - if bkt == nil { - return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace) - } + return update(ctx, s.db, func(tx *bolt.Tx) error { + bkt := getContainersBucket(tx, namespace) + if bkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace) + } - if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound { - return errors.Wrapf(errdefs.ErrNotFound, "container %v", id) - } - return err + if err := bkt.DeleteBucket([]byte(id)); err != nil { + if err == bolt.ErrBucketNotFound { + err = errors.Wrapf(errdefs.ErrNotFound, "container %v", id) + } + return err + } + + atomic.AddUint32(&s.db.dirty, 1) + + return nil + }) } func validateContainer(container *containers.Container) error { diff --git a/metadata/containers_test.go b/metadata/containers_test.go index efa6ce68e..8cf1414ee 100644 --- a/metadata/containers_test.go +++ b/metadata/containers_test.go @@ -47,6 +47,8 @@ func TestContainersList(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewContainerStore(NewDB(db, nil, nil)) + spec := &specs.Spec{} encoded, err := typeurl.MarshalAny(spec) if err != nil { @@ -73,9 +75,8 @@ func TestContainersList(t *testing.T) { } if err := db.Update(func(tx *bolt.Tx) error { - store := NewContainerStore(tx) now := time.Now() - result, err := store.Create(ctx, *testset[id]) + result, err := store.Create(WithTransactionContext(ctx, tx), *testset[id]) if err != nil { return err } @@ -138,46 +139,35 @@ func TestContainersList(t *testing.T) { testset = newtestset } - if err := db.View(func(tx *bolt.Tx) error { - store := NewContainerStore(tx) - results, err := store.List(ctx, testcase.filters...) - if err != nil { - t.Fatal(err) - } - - if len(results) == 0 { // all tests return a non-empty result set - t.Fatalf("not results returned") - } - - if len(results) != len(testset) { - t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) - } - - for _, result := range results { - checkContainersEqual(t, &result, testset[result.ID], "list results did not match") - } - - return nil - }); err != nil { + results, err := store.List(ctx, testcase.filters...) + if err != nil { t.Fatal(err) } + + if len(results) == 0 { // all tests return a non-empty result set + t.Fatalf("not results returned") + } + + if len(results) != len(testset) { + t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset)) + } + + for _, result := range results { + checkContainersEqual(t, &result, testset[result.ID], "list results did not match") + } }) } // delete everything to test it for id := range testset { - if err := db.Update(func(tx *bolt.Tx) error { - store := NewContainerStore(tx) - return store.Delete(ctx, id) - }); err != nil { + if err := store.Delete(ctx, id); err != nil { t.Fatal(err) } // try it again, get NotFound - if err := db.Update(func(tx *bolt.Tx) error { - store := NewContainerStore(tx) - return store.Delete(ctx, id) - }); errors.Cause(err) != errdefs.ErrNotFound { + if err := store.Delete(ctx, id); err == nil { + t.Fatalf("expected error deleting non-existent container") + } else if errors.Cause(err) != errdefs.ErrNotFound { t.Fatalf("unexpected error %v", err) } } @@ -188,6 +178,8 @@ func TestContainersCreateUpdateDelete(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + store := NewContainerStore(NewDB(db, nil, nil)) + spec := &specs.Spec{} encoded, err := typeurl.MarshalAny(spec) if err != nil { @@ -641,79 +633,51 @@ func TestContainersCreateUpdateDelete(t *testing.T) { } testcase.expected.ID = testcase.name - done := errors.New("test complete") - if err := db.Update(func(tx *bolt.Tx) error { - var ( - now = time.Now().UTC() - store = NewContainerStore(tx) - ) + now := time.Now().UTC() - result, err := store.Create(ctx, testcase.original) - if errors.Cause(err) != testcase.createerr { - if testcase.createerr == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) - } - } else if testcase.createerr != nil { - return done + result, err := store.Create(ctx, testcase.original) + if errors.Cause(err) != testcase.createerr { + if testcase.createerr == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) } + } else if testcase.createerr != nil { + return + } - checkContainerTimestamps(t, &result, now, true) + checkContainerTimestamps(t, &result, now, true) - // ensure that createdat is never tampered with - testcase.original.CreatedAt = result.CreatedAt - testcase.expected.CreatedAt = result.CreatedAt - testcase.original.UpdatedAt = result.UpdatedAt - testcase.expected.UpdatedAt = result.UpdatedAt + // ensure that createdat is never tampered with + testcase.original.CreatedAt = result.CreatedAt + testcase.expected.CreatedAt = result.CreatedAt + testcase.original.UpdatedAt = result.UpdatedAt + testcase.expected.UpdatedAt = result.UpdatedAt - checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update") - return nil - }); err != nil { - if err == done { - return + checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update") + + now = time.Now() + result, err = store.Update(ctx, testcase.input, testcase.fieldpaths...) + if errors.Cause(err) != testcase.cause { + if testcase.cause == nil { + t.Fatalf("unexpected error: %v", err) + } else { + t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) } + } else if testcase.cause != nil { + return + } + + checkContainerTimestamps(t, &result, now, false) + testcase.expected.UpdatedAt = result.UpdatedAt + checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result") + + result, err = store.Get(ctx, testcase.original.ID) + if err != nil { t.Fatal(err) } - if err := db.Update(func(tx *bolt.Tx) error { - now := time.Now() - store := NewContainerStore(tx) - result, err := store.Update(ctx, testcase.input, testcase.fieldpaths...) - if errors.Cause(err) != testcase.cause { - if testcase.cause == nil { - t.Fatalf("unexpected error: %v", err) - } else { - t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) - } - } else if testcase.cause != nil { - return done - } - - checkContainerTimestamps(t, &result, now, false) - testcase.expected.UpdatedAt = result.UpdatedAt - checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result") - return nil - }); err != nil { - if err == done { - return - } - t.Fatal(err) - } - - if err := db.View(func(tx *bolt.Tx) error { - store := NewContainerStore(tx) - result, err := store.Get(ctx, testcase.original.ID) - if err != nil { - t.Fatal(err) - } - - checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result") - return nil - }); err != nil { - t.Fatal(err) - } - + checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result") }) } } diff --git a/metadata/content.go b/metadata/content.go index 4a07a256b..268a9b1b7 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "strings" "sync" + "sync/atomic" "time" "github.com/containerd/containerd/content" @@ -221,9 +222,8 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error { } // Mark content store as dirty for triggering garbage collection - cs.db.dirtyL.Lock() + atomic.AddUint32(&cs.db.dirty, 1) cs.db.dirtyCS = true - cs.db.dirtyL.Unlock() return nil }) diff --git a/metadata/content_test.go b/metadata/content_test.go index 3b35b10a3..a97ec4f2e 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -166,17 +166,13 @@ func TestIngestLeased(t *testing.T) { } func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) { - if err := db.Update(func(tx *bolt.Tx) error { - _, err := NewLeaseManager(tx).Create(ctx, leases.WithID(name)) - return err - }); err != nil { + lm := NewLeaseManager(db) + if _, err := lm.Create(ctx, leases.WithID(name)); err != nil { return nil, nil, err } return leases.WithLease(ctx, name), func() error { - return db.Update(func(tx *bolt.Tx) error { - return NewLeaseManager(tx).Delete(ctx, leases.Lease{ - ID: name, - }) + return lm.Delete(ctx, leases.Lease{ + ID: name, }) }, nil } diff --git a/metadata/db.go b/metadata/db.go index 7f1b27b38..0330a1acb 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "strings" "sync" + "sync/atomic" "time" "github.com/containerd/containerd/content" @@ -75,10 +76,16 @@ type DB struct { // sweep phases without preventing read transactions. wlock sync.RWMutex - // dirty flags and lock keeps track of datastores which have had deletions - // since the last garbage collection. These datastores will will be garbage - // collected during the next garbage collection. - dirtyL sync.Mutex + // dirty flag indicates that refences have been removed which require + // a garbage collection to ensure the database is clean. This tracks + // the number of dirty operations. This should be updated and read + // atomically if outside of wlock.Lock. + dirty uint32 + + // dirtySS and dirtyCS flags keeps track of datastores which have had + // deletions since the last garbage collection. These datastores will + // be garbage collected during the next garbage collection. These + // should only be updated inside of a write transaction or wlock.Lock. dirtySS map[string]struct{} dirtyCS bool @@ -237,12 +244,10 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { defer m.wlock.RUnlock() err := m.db.Update(fn) if err == nil { - m.dirtyL.Lock() - dirty := m.dirtyCS || len(m.dirtySS) > 0 + dirty := atomic.LoadUint32(&m.dirty) > 0 for _, fn := range m.mutationCallbacks { fn(dirty) } - m.dirtyL.Unlock() } return err @@ -254,9 +259,9 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error { // The callback function is an argument for whether a deletion has occurred // since the last garbage collection. func (m *DB) RegisterMutationCallback(fn func(bool)) { - m.dirtyL.Lock() + m.wlock.Lock() m.mutationCallbacks = append(m.mutationCallbacks, fn) - m.dirtyL.Unlock() + m.wlock.Unlock() } // GCStats holds the duration for the different phases of the garbage collector @@ -282,8 +287,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { return nil, err } - m.dirtyL.Lock() - if err := m.db.Update(func(tx *bolt.Tx) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -309,7 +312,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { return nil }); err != nil { - m.dirtyL.Unlock() m.wlock.Unlock() return nil, err } @@ -317,6 +319,9 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { var stats GCStats var wg sync.WaitGroup + // reset dirty, no need for atomic inside of wlock.Lock + m.dirty = 0 + if len(m.dirtySS) > 0 { var sl sync.Mutex stats.SnapshotD = map[string]time.Duration{} @@ -349,8 +354,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) { m.dirtyCS = false } - m.dirtyL.Unlock() - stats.MetaD = time.Since(t1) m.wlock.Unlock() diff --git a/metadata/db_test.go b/metadata/db_test.go index a1ae62e69..772139274 100644 --- a/metadata/db_test.go +++ b/metadata/db_test.go @@ -386,7 +386,7 @@ func TestMetadataCollector(t *testing.T) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, NewImageStore(mdb), cs, sn) + node, err := create(obj, tx, mdb, cs, sn) if err != nil { return err } @@ -461,7 +461,7 @@ func benchmarkTrigger(n int) func(b *testing.B) { if err := mdb.Update(func(tx *bolt.Tx) error { for _, obj := range objects { - node, err := create(obj, tx, NewImageStore(mdb), cs, sn) + node, err := create(obj, tx, mdb, cs, sn) if err != nil { return err } @@ -541,16 +541,15 @@ type object struct { labels map[string]string } -func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { +func create(obj object, tx *bolt.Tx, db *DB, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { var ( node *gc.Node namespace = "test" - ctx = namespaces.WithNamespace(context.Background(), namespace) + ctx = WithTransactionContext(namespaces.WithNamespace(context.Background(), namespace), tx) ) switch v := obj.data.(type) { case testContent: - ctx := WithTransactionContext(ctx, tx) expected := digest.FromBytes(v.data) w, err := cs.Writer(ctx, content.WithRef("test-ref"), @@ -572,7 +571,6 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps } } case testSnapshot: - ctx := WithTransactionContext(ctx, tx) if v.active { _, err := sn.Prepare(ctx, v.key, v.parent, snapshots.WithLabels(obj.labels)) if err != nil { @@ -596,14 +594,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps } } case testImage: - ctx := WithTransactionContext(ctx, tx) - image := images.Image{ Name: v.name, Target: v.target, Labels: obj.labels, } - _, err := is.Create(ctx, image) + + _, err := NewImageStore(db).Create(ctx, image) if err != nil { return nil, errors.Wrap(err, "failed to create image") } @@ -619,12 +616,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps }, Spec: &types.Any{}, } - _, err := NewContainerStore(tx).Create(ctx, container) + _, err := NewContainerStore(db).Create(ctx, container) if err != nil { return nil, err } case testLease: - lm := NewLeaseManager(tx) + lm := NewLeaseManager(db) + l, err := lm.Create(ctx, leases.WithID(v.id), leases.WithLabels(obj.labels)) if err != nil { return nil, err diff --git a/metadata/images.go b/metadata/images.go index 1dda753db..cace4e180 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -21,6 +21,7 @@ import ( "encoding/binary" "fmt" "strings" + "sync/atomic" "time" "github.com/containerd/containerd/errdefs" @@ -249,19 +250,16 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) } - err = bkt.DeleteBucket([]byte(name)) - if err == bolt.ErrBucketNotFound { - return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + if err = bkt.DeleteBucket([]byte(name)); err != nil { + if err == bolt.ErrBucketNotFound { + err = errors.Wrapf(errdefs.ErrNotFound, "image %q", name) + } + return err } - // A reference to a piece of content has been removed, - // mark content store as dirty for triggering garbage - // collection - s.db.dirtyL.Lock() - s.db.dirtyCS = true - s.db.dirtyL.Unlock() + atomic.AddUint32(&s.db.dirty, 1) - return err + return nil }) } diff --git a/metadata/leases.go b/metadata/leases.go index cd8809f4c..60da06b0f 100644 --- a/metadata/leases.go +++ b/metadata/leases.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "sync/atomic" "time" "github.com/containerd/containerd/errdefs" @@ -35,14 +36,14 @@ import ( // LeaseManager manages the create/delete lifecycle of leases // and also returns existing leases type LeaseManager struct { - tx *bolt.Tx + db *DB } // NewLeaseManager creates a new lease manager for managing leases using // the provided database transaction. -func NewLeaseManager(tx *bolt.Tx) *LeaseManager { +func NewLeaseManager(db *DB) *LeaseManager { return &LeaseManager{ - tx: tx, + db: db, } } @@ -63,35 +64,40 @@ func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases. return leases.Lease{}, err } - topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) - if err != nil { - return leases.Lease{}, err - } - - txbkt, err := topbkt.CreateBucket([]byte(l.ID)) - if err != nil { - if err == bolt.ErrBucketExists { - err = errdefs.ErrAlreadyExists + if err := update(ctx, lm.db, func(tx *bolt.Tx) error { + topbkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) + if err != nil { + return err } - return leases.Lease{}, errors.Wrapf(err, "lease %q", l.ID) - } - t := time.Now().UTC() - createdAt, err := t.MarshalBinary() - if err != nil { - return leases.Lease{}, err - } - if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil { - return leases.Lease{}, err - } - - if l.Labels != nil { - if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil { - return leases.Lease{}, err + txbkt, err := topbkt.CreateBucket([]byte(l.ID)) + if err != nil { + if err == bolt.ErrBucketExists { + err = errdefs.ErrAlreadyExists + } + return errors.Wrapf(err, "lease %q", l.ID) } - } - l.CreatedAt = t + t := time.Now().UTC() + createdAt, err := t.MarshalBinary() + if err != nil { + return err + } + if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil { + return err + } + + if l.Labels != nil { + if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil { + return err + } + } + l.CreatedAt = t + + return nil + }); err != nil { + return leases.Lease{}, err + } return l, nil } @@ -102,17 +108,22 @@ func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...lea return err } - topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) - if topbkt == nil { - return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) - } - if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil { - if err == bolt.ErrBucketNotFound { - err = errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) + return update(ctx, lm.db, func(tx *bolt.Tx) error { + topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) + if topbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) } - return err - } - return nil + if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil { + if err == bolt.ErrBucketNotFound { + err = errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) + } + return err + } + + atomic.AddUint32(&lm.db.dirty, 1) + + return nil + }) } // List lists all active leases @@ -129,39 +140,41 @@ func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease, var ll []leases.Lease - topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) - if topbkt == nil { - return ll, nil - } - - if err := topbkt.ForEach(func(k, v []byte) error { - if v != nil { + if err := view(ctx, lm.db, func(tx *bolt.Tx) error { + topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) + if topbkt == nil { return nil } - txbkt := topbkt.Bucket(k) - l := leases.Lease{ - ID: string(k), - } + return topbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + txbkt := topbkt.Bucket(k) - if v := txbkt.Get(bucketKeyCreatedAt); v != nil { - t := &l.CreatedAt - if err := t.UnmarshalBinary(v); err != nil { + l := leases.Lease{ + ID: string(k), + } + + if v := txbkt.Get(bucketKeyCreatedAt); v != nil { + t := &l.CreatedAt + if err := t.UnmarshalBinary(v); err != nil { + return err + } + } + + labels, err := boltutil.ReadLabels(txbkt) + if err != nil { return err } - } + l.Labels = labels - labels, err := boltutil.ReadLabels(txbkt) - if err != nil { - return err - } - l.Labels = labels + if filter.Match(adaptLease(l)) { + ll = append(ll, l) + } - if filter.Match(adaptLease(l)) { - ll = append(ll, l) - } - - return nil + return nil + }) }); err != nil { return nil, err } @@ -176,24 +189,26 @@ func (lm *LeaseManager) AddResource(ctx context.Context, lease leases.Lease, r l return err } - topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) - if topbkt == nil { - return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) - } + return update(ctx, lm.db, func(tx *bolt.Tx) error { + topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) + if topbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) + } - keys, ref, err := parseLeaseResource(r) - if err != nil { - return err - } - - bkt := topbkt - for _, key := range keys { - bkt, err = bkt.CreateBucketIfNotExists([]byte(key)) + keys, ref, err := parseLeaseResource(r) if err != nil { return err } - } - return bkt.Put([]byte(ref), nil) + + bkt := topbkt + for _, key := range keys { + bkt, err = bkt.CreateBucketIfNotExists([]byte(key)) + if err != nil { + return err + } + } + return bkt.Put([]byte(ref), nil) + }) } // DeleteResource dereferences the resource by the provided lease. @@ -203,28 +218,35 @@ func (lm *LeaseManager) DeleteResource(ctx context.Context, lease leases.Lease, return err } - topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) - if topbkt == nil { - return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) - } - - keys, ref, err := parseLeaseResource(r) - if err != nil { - return err - } - - bkt := topbkt - for _, key := range keys { - if bkt == nil { - break + return update(ctx, lm.db, func(tx *bolt.Tx) error { + topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) + if topbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) } - bkt = bkt.Bucket([]byte(key)) - } - if bkt == nil { + keys, ref, err := parseLeaseResource(r) + if err != nil { + return err + } + + bkt := topbkt + for _, key := range keys { + if bkt == nil { + break + } + bkt = bkt.Bucket([]byte(key)) + } + + if bkt != nil { + if err := bkt.Delete([]byte(ref)); err != nil { + return err + } + } + + atomic.AddUint32(&lm.db.dirty, 1) + return nil - } - return bkt.Delete([]byte(ref)) + }) } // ListResources lists all the resources referenced by the lease. @@ -234,59 +256,66 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) ( return nil, err } - topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) - if topbkt == nil { - return nil, errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) - } + var rs []leases.Resource - rs := make([]leases.Resource, 0) + if err := view(ctx, lm.db, func(tx *bolt.Tx) error { - // content resources - if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil { - if err := cbkt.ForEach(func(k, _ []byte) error { - rs = append(rs, leases.Resource{ - ID: string(k), - Type: string(bucketKeyObjectContent), - }) - - return nil - }); err != nil { - return nil, err + topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) + if topbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) } - } - // ingest resources - if lbkt := topbkt.Bucket(bucketKeyObjectIngests); lbkt != nil { - if err := lbkt.ForEach(func(k, _ []byte) error { - rs = append(rs, leases.Resource{ - ID: string(k), - Type: string(bucketKeyObjectIngests), - }) - - return nil - }); err != nil { - return nil, err - } - } - - // snapshot resources - if sbkt := topbkt.Bucket(bucketKeyObjectSnapshots); sbkt != nil { - if err := sbkt.ForEach(func(sk, sv []byte) error { - if sv != nil { - return nil - } - - snbkt := sbkt.Bucket(sk) - return snbkt.ForEach(func(k, _ []byte) error { + // content resources + if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil { + if err := cbkt.ForEach(func(k, _ []byte) error { rs = append(rs, leases.Resource{ ID: string(k), - Type: fmt.Sprintf("%s/%s", bucketKeyObjectSnapshots, sk), + Type: string(bucketKeyObjectContent), }) + return nil - }) - }); err != nil { - return nil, err + }); err != nil { + return err + } } + + // ingest resources + if lbkt := topbkt.Bucket(bucketKeyObjectIngests); lbkt != nil { + if err := lbkt.ForEach(func(k, _ []byte) error { + rs = append(rs, leases.Resource{ + ID: string(k), + Type: string(bucketKeyObjectIngests), + }) + + return nil + }); err != nil { + return err + } + } + + // snapshot resources + if sbkt := topbkt.Bucket(bucketKeyObjectSnapshots); sbkt != nil { + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + + snbkt := sbkt.Bucket(sk) + return snbkt.ForEach(func(k, _ []byte) error { + rs = append(rs, leases.Resource{ + ID: string(k), + Type: fmt.Sprintf("%s/%s", bucketKeyObjectSnapshots, sk), + }) + return nil + }) + }); err != nil { + return err + } + } + + return nil + }); err != nil { + return nil, err } return rs, nil } diff --git a/metadata/leases_test.go b/metadata/leases_test.go index 26198ca40..182596f67 100644 --- a/metadata/leases_test.go +++ b/metadata/leases_test.go @@ -29,6 +29,8 @@ func TestLeases(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + lm := NewLeaseManager(NewDB(db, nil, nil)) + testCases := []struct { ID string CreateErr error @@ -51,7 +53,7 @@ func TestLeases(t *testing.T) { for _, tc := range testCases { if err := db.Update(func(tx *bolt.Tx) error { - lease, err := NewLeaseManager(tx).Create(ctx, leases.WithID(tc.ID)) + lease, err := lm.Create(WithTransactionContext(ctx, tx), leases.WithID(tc.ID)) if err != nil { if tc.CreateErr != nil && errors.Cause(err) == tc.CreateErr { return nil @@ -65,13 +67,8 @@ func TestLeases(t *testing.T) { } } - var listed []leases.Lease - // List leases, check same - if err := db.View(func(tx *bolt.Tx) error { - var err error - listed, err = NewLeaseManager(tx).List(ctx) - return err - }); err != nil { + listed, err := lm.List(ctx) + if err != nil { t.Fatal(err) } @@ -88,10 +85,8 @@ func TestLeases(t *testing.T) { } for _, tc := range testCases { - if err := db.Update(func(tx *bolt.Tx) error { - return NewLeaseManager(tx).Delete(ctx, leases.Lease{ - ID: tc.ID, - }) + if err := lm.Delete(ctx, leases.Lease{ + ID: tc.ID, }); err != nil { if tc.DeleteErr == nil && errors.Cause(err) != tc.DeleteErr { t.Fatal(err) @@ -100,11 +95,8 @@ func TestLeases(t *testing.T) { } } - if err := db.View(func(tx *bolt.Tx) error { - var err error - listed, err = NewLeaseManager(tx).List(ctx) - return err - }); err != nil { + listed, err = lm.List(ctx) + if err != nil { t.Fatal(err) } @@ -117,6 +109,8 @@ func TestLeasesList(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + lm := NewLeaseManager(NewDB(db, nil, nil)) + testset := [][]leases.Opt{ { leases.WithID("lease1"), @@ -143,14 +137,16 @@ func TestLeasesList(t *testing.T) { } // Insert all - for _, opts := range testset { - if err := db.Update(func(tx *bolt.Tx) error { - lm := NewLeaseManager(tx) - _, err := lm.Create(ctx, opts...) - return err - }); err != nil { - t.Fatal(err) + if err := db.Update(func(tx *bolt.Tx) error { + for _, opts := range testset { + _, err := lm.Create(WithTransactionContext(ctx, tx), opts...) + if err != nil { + return err + } } + return nil + }); err != nil { + t.Fatal(err) } for _, testcase := range []struct { @@ -201,39 +197,33 @@ func TestLeasesList(t *testing.T) { }, } { t.Run(testcase.name, func(t *testing.T) { - if err := db.View(func(tx *bolt.Tx) error { - lm := NewLeaseManager(tx) - results, err := lm.List(ctx, testcase.filters...) - if err != nil { - return err - } - - if len(results) != len(testcase.expected) { - t.Errorf("length of result does not match expected: %v != %v", len(results), len(testcase.expected)) - } - - expectedMap := map[string]struct{}{} - for _, expected := range testcase.expected { - expectedMap[expected] = struct{}{} - } - - for _, result := range results { - if _, ok := expectedMap[result.ID]; !ok { - t.Errorf("unexpected match: %v", result.ID) - } else { - delete(expectedMap, result.ID) - } - } - if len(expectedMap) > 0 { - for match := range expectedMap { - t.Errorf("missing match: %v", match) - } - } - - return nil - }); err != nil { + results, err := lm.List(ctx, testcase.filters...) + if err != nil { t.Fatal(err) } + + if len(results) != len(testcase.expected) { + t.Errorf("length of result does not match expected: %v != %v", len(results), len(testcase.expected)) + } + + expectedMap := map[string]struct{}{} + for _, expected := range testcase.expected { + expectedMap[expected] = struct{}{} + } + + for _, result := range results { + if _, ok := expectedMap[result.ID]; !ok { + t.Errorf("unexpected match: %v", result.ID) + } else { + delete(expectedMap, result.ID) + } + } + if len(expectedMap) > 0 { + for match := range expectedMap { + t.Errorf("missing match: %v", match) + } + } + }) } @@ -246,18 +236,12 @@ func TestLeasesList(t *testing.T) { } } - if err := db.Update(func(tx *bolt.Tx) error { - lm := NewLeaseManager(tx) - return lm.Delete(ctx, lease) - }); err != nil { + if err := lm.Delete(ctx, lease); err != nil { t.Fatal(err) } // try it again, get not found - if err := db.Update(func(tx *bolt.Tx) error { - lm := NewLeaseManager(tx) - return lm.Delete(ctx, lease) - }); err == nil { + if err := lm.Delete(ctx, lease); err == nil { t.Fatalf("expected error deleting non-existent lease") } else if !errdefs.IsNotFound(err) { t.Fatalf("unexpected error: %s", err) @@ -269,6 +253,8 @@ func TestLeaseResource(t *testing.T) { ctx, db, cancel := testEnv(t) defer cancel() + lm := NewLeaseManager(NewDB(db, nil, nil)) + var ( leaseID = "l1" @@ -280,10 +266,7 @@ func TestLeaseResource(t *testing.T) { ) // prepare lease - if err := db.Update(func(tx *bolt.Tx) error { - _, err0 := NewLeaseManager(tx).Create(ctx, leases.WithID(leaseID)) - return err0 - }); err != nil { + if _, err := lm.Create(ctx, leases.WithID(leaseID)); err != nil { t.Fatal(err) } @@ -379,7 +362,7 @@ func TestLeaseResource(t *testing.T) { idxList := make(map[leases.Resource]bool) for i, tc := range testCases { if err := db.Update(func(tx *bolt.Tx) error { - err0 := NewLeaseManager(tx).AddResource(ctx, tc.lease, tc.resource) + err0 := lm.AddResource(WithTransactionContext(ctx, tx), tc.lease, tc.resource) if got := errors.Cause(err0); got != tc.err { return errors.Errorf("expect error (%v), but got (%v)", tc.err, err0) } @@ -396,11 +379,8 @@ func TestLeaseResource(t *testing.T) { // check list function var gotList []leases.Resource - if err := db.View(func(tx *bolt.Tx) error { - var err error - gotList, err = NewLeaseManager(tx).ListResources(ctx, lease) - return err - }); err != nil { + gotList, err := lm.ListResources(ctx, lease) + if err != nil { t.Fatal(err) } @@ -420,21 +400,16 @@ func TestLeaseResource(t *testing.T) { } // remove snapshots - if err := db.Update(func(tx *bolt.Tx) error { - return NewLeaseManager(tx).DeleteResource(ctx, lease, leases.Resource{ - ID: snapshotterKey, - Type: "snapshots/overlayfs", - }) + if err := lm.DeleteResource(ctx, lease, leases.Resource{ + ID: snapshotterKey, + Type: "snapshots/overlayfs", }); err != nil { t.Fatal(err) } // check list number - if err := db.View(func(tx *bolt.Tx) error { - var err error - gotList, err = NewLeaseManager(tx).ListResources(ctx, lease) - return err - }); err != nil { + gotList, err = lm.ListResources(ctx, lease) + if err != nil { t.Fatal(err) } diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 23976636f..4c38b41d7 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -21,6 +21,7 @@ import ( "fmt" "strings" "sync" + "sync/atomic" "time" "github.com/containerd/containerd/errdefs" @@ -517,9 +518,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { } // Mark snapshotter as dirty for triggering garbage collection - s.db.dirtyL.Lock() + atomic.AddUint32(&s.db.dirty, 1) s.db.dirtySS[s.name] = struct{}{} - s.db.dirtyL.Unlock() return nil }) diff --git a/runtime/v1/linux/runtime.go b/runtime/v1/linux/runtime.go index 0243c3986..fdaff5f9e 100644 --- a/runtime/v1/linux/runtime.go +++ b/runtime/v1/linux/runtime.go @@ -50,7 +50,6 @@ import ( ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" - bolt "go.etcd.io/bbolt" "golang.org/x/sys/unix" ) @@ -112,13 +111,13 @@ func New(ic *plugin.InitContext) (interface{}, error) { } cfg := ic.Config.(*Config) r := &Runtime{ - root: ic.Root, - state: ic.State, - tasks: runtime.NewTaskList(), - db: m.(*metadata.DB), - address: ic.Address, - events: ic.Events, - config: cfg, + root: ic.Root, + state: ic.State, + tasks: runtime.NewTaskList(), + containers: metadata.NewContainerStore(m.(*metadata.DB)), + address: ic.Address, + events: ic.Events, + config: cfg, } tasks, err := r.restoreTasks(ic.Context) if err != nil { @@ -138,9 +137,9 @@ type Runtime struct { state string address string - tasks *runtime.TaskList - db *metadata.DB - events *exchange.Exchange + tasks *runtime.TaskList + containers containers.Store + events *exchange.Exchange config *Config } @@ -508,14 +507,8 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er } func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) { - var container containers.Container - - if err := r.db.View(func(tx *bolt.Tx) error { - store := metadata.NewContainerStore(tx) - var err error - container, err = store.Get(ctx, id) - return err - }); err != nil { + container, err := r.containers.Get(ctx, id) + if err != nil { return nil, err } diff --git a/runtime/v2/manager.go b/runtime/v2/manager.go index 5bd986641..0e110f7bf 100644 --- a/runtime/v2/manager.go +++ b/runtime/v2/manager.go @@ -33,7 +33,6 @@ import ( "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" ocispec "github.com/opencontainers/image-spec/specs-go/v1" - bolt "go.etcd.io/bbolt" ) // Config for the v2 runtime @@ -69,13 +68,15 @@ func init() { if err != nil { return nil, err } - return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB)) + cs := metadata.NewContainerStore(m.(*metadata.DB)) + + return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs) }, }) } // New task manager for v2 shims -func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) { +func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) { for _, d := range []string{root, state} { if err := os.MkdirAll(d, 0711); err != nil { return nil, err @@ -88,7 +89,7 @@ func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAdd containerdTTRPCAddress: containerdTTRPCAddress, tasks: runtime.NewTaskList(), events: events, - db: db, + containers: cs, } if err := m.loadExistingTasks(ctx); err != nil { return nil, err @@ -103,9 +104,9 @@ type TaskManager struct { containerdAddress string containerdTTRPCAddress string - tasks *runtime.TaskList - events *exchange.Exchange - db *metadata.DB + tasks *runtime.TaskList + events *exchange.Exchange + containers containers.Store } // ID of the task manager @@ -278,13 +279,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error { } func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) { - var container containers.Container - if err := m.db.View(func(tx *bolt.Tx) error { - store := metadata.NewContainerStore(tx) - var err error - container, err = store.Get(ctx, id) - return err - }); err != nil { + container, err := m.containers.Get(ctx, id) + if err != nil { return nil, err } return &container, nil diff --git a/services/containers/local.go b/services/containers/local.go index 7b1a24b8f..b1336494c 100644 --- a/services/containers/local.go +++ b/services/containers/local.go @@ -48,8 +48,11 @@ func init() { if err != nil { return nil, err } + + db := m.(*metadata.DB) return &local{ - db: m.(*metadata.DB), + Store: metadata.NewContainerStore(db), + db: db, publisher: ic.Events, }, nil }, @@ -57,6 +60,7 @@ func init() { } type local struct { + containers.Store db *metadata.DB publisher events.Publisher } @@ -66,8 +70,8 @@ var _ api.ContainersClient = &local{} func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) { var resp api.GetContainerResponse - return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { - container, err := store.Get(ctx, req.ID) + return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error { + container, err := l.Store.Get(ctx, req.ID) if err != nil { return err } @@ -80,8 +84,8 @@ func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) { var resp api.ListContainersResponse - return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { - containers, err := store.List(ctx, req.Filters...) + return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error { + containers, err := l.Store.List(ctx, req.Filters...) if err != nil { return err } @@ -94,8 +98,8 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest, stream := &localStream{ ctx: ctx, } - return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { - containers, err := store.List(ctx, req.Filters...) + return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error { + containers, err := l.Store.List(ctx, req.Filters...) if err != nil { return err } @@ -107,10 +111,10 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest, func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) { var resp api.CreateContainerResponse - if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { + if err := l.withStoreUpdate(ctx, func(ctx context.Context) error { container := containerFromProto(&req.Container) - created, err := store.Create(ctx, container) + created, err := l.Store.Create(ctx, container) if err != nil { return err } @@ -144,13 +148,13 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ . container = containerFromProto(&req.Container) ) - if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { + if err := l.withStoreUpdate(ctx, func(ctx context.Context) error { var fieldpaths []string if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { fieldpaths = append(fieldpaths, req.UpdateMask.Paths...) } - updated, err := store.Update(ctx, container, fieldpaths...) + updated, err := l.Store.Update(ctx, container, fieldpaths...) if err != nil { return err } @@ -174,8 +178,8 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ . } func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { - if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { - return store.Delete(ctx, req.ID) + if err := l.withStoreUpdate(ctx, func(ctx context.Context) error { + return l.Store.Delete(ctx, req.ID) }); err != nil { return &ptypes.Empty{}, errdefs.ToGRPC(err) } @@ -189,15 +193,17 @@ func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ . return &ptypes.Empty{}, nil } -func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error { - return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) } +func (l *local) withStore(ctx context.Context, fn func(ctx context.Context) error) func(tx *bolt.Tx) error { + return func(tx *bolt.Tx) error { + return fn(metadata.WithTransactionContext(ctx, tx)) + } } -func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { +func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context) error) error { return l.db.View(l.withStore(ctx, fn)) } -func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { +func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context) error) error { return l.db.Update(l.withStore(ctx, fn)) } diff --git a/services/leases/local.go b/services/leases/local.go index fcc621d4d..f942ba45f 100644 --- a/services/leases/local.go +++ b/services/leases/local.go @@ -24,7 +24,6 @@ import ( "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/services" - bolt "go.etcd.io/bbolt" ) func init() { @@ -44,8 +43,8 @@ func init() { return nil, err } return &local{ - db: m.(*metadata.DB), - gc: g.(gcScheduler), + Manager: metadata.NewLeaseManager(m.(*metadata.DB)), + gc: g.(gcScheduler), }, nil }, }) @@ -56,22 +55,10 @@ type gcScheduler interface { } type local struct { - db *metadata.DB + leases.Manager gc gcScheduler } -func (l *local) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) { - var lease leases.Lease - if err := l.db.Update(func(tx *bolt.Tx) error { - var err error - lease, err = metadata.NewLeaseManager(tx).Create(ctx, opts...) - return err - }); err != nil { - return leases.Lease{}, err - } - return lease, nil -} - func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error { var do leases.DeleteOptions for _, opt := range opts { @@ -80,9 +67,7 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D } } - if err := l.db.Update(func(tx *bolt.Tx) error { - return metadata.NewLeaseManager(tx).Delete(ctx, lease) - }); err != nil { + if err := l.Manager.Delete(ctx, lease); err != nil { return err } @@ -95,39 +80,3 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D return nil } - -func (l *local) List(ctx context.Context, filters ...string) ([]leases.Lease, error) { - var ll []leases.Lease - if err := l.db.View(func(tx *bolt.Tx) error { - var err error - ll, err = metadata.NewLeaseManager(tx).List(ctx, filters...) - return err - }); err != nil { - return nil, err - } - return ll, nil -} - -func (l *local) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error { - return l.db.Update(func(tx *bolt.Tx) error { - return metadata.NewLeaseManager(tx).AddResource(ctx, lease, r) - }) -} - -func (l *local) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error { - return l.db.Update(func(tx *bolt.Tx) error { - return metadata.NewLeaseManager(tx).DeleteResource(ctx, lease, r) - }) -} - -func (l *local) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) { - var rs []leases.Resource - if err := l.db.View(func(tx *bolt.Tx) error { - var err error - rs, err = metadata.NewLeaseManager(tx).ListResources(ctx, lease) - return err - }); err != nil { - return nil, err - } - return rs, nil -} diff --git a/services/tasks/local.go b/services/tasks/local.go index fc59936de..2833cd31b 100644 --- a/services/tasks/local.go +++ b/services/tasks/local.go @@ -51,7 +51,6 @@ import ( ptypes "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -101,14 +100,14 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { monitor = runtime.NewNoopMonitor() } - cs := m.(*metadata.DB).ContentStore() + db := m.(*metadata.DB) l := &local{ - runtimes: runtimes, - db: m.(*metadata.DB), - store: cs, - publisher: ic.Events, - monitor: monitor.(runtime.TaskMonitor), - v2Runtime: v2r.(*v2.TaskManager), + runtimes: runtimes, + containers: metadata.NewContainerStore(db), + store: db.ContentStore(), + publisher: ic.Events, + monitor: monitor.(runtime.TaskMonitor), + v2Runtime: v2r.(*v2.TaskManager), } for _, r := range runtimes { tasks, err := r.Tasks(ic.Context, true) @@ -123,10 +122,10 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) { } type local struct { - runtimes map[string]runtime.PlatformRuntime - db *metadata.DB - store content.Store - publisher events.Publisher + runtimes map[string]runtime.PlatformRuntime + containers containers.Store + store content.Store + publisher events.Publisher monitor runtime.TaskMonitor v2Runtime *v2.TaskManager @@ -647,12 +646,8 @@ func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Re func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) { var container containers.Container - if err := l.db.View(func(tx *bolt.Tx) error { - store := metadata.NewContainerStore(tx) - var err error - container, err = store.Get(ctx, id) - return err - }); err != nil { + container, err := l.containers.Get(ctx, id) + if err != nil { return nil, errdefs.ToGRPC(err) } return &container, nil