Merge pull request #3668 from dmcgowan/fix-metadata-dirty

Update metadata interfaces for containers and leases
This commit is contained in:
Phil Estes 2019-09-24 09:38:27 -04:00 committed by GitHub
commit 9c10bf89ba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 539 additions and 606 deletions

View File

@ -19,6 +19,7 @@ package metadata
import ( import (
"context" "context"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/containers" "github.com/containerd/containerd/containers"
@ -35,13 +36,13 @@ import (
) )
type containerStore struct { type containerStore struct {
tx *bolt.Tx db *DB
} }
// NewContainerStore returns a Store backed by an underlying bolt DB // NewContainerStore returns a Store backed by an underlying bolt DB
func NewContainerStore(tx *bolt.Tx) containers.Store { func NewContainerStore(db *DB) containers.Store {
return &containerStore{ return &containerStore{
tx: tx, db: db,
} }
} }
@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
return containers.Container{}, err return containers.Container{}, err
} }
bkt := getContainerBucket(s.tx, namespace, id) container := containers.Container{ID: id}
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainerBucket(tx, namespace, id)
if bkt == nil { if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace) return errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
} }
container := containers.Container{ID: id}
if err := readContainer(&container, bkt); err != nil { if err := readContainer(&container, bkt); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id) return errors.Wrapf(err, "failed to read container %q", id)
}
return nil
}); err != nil {
return containers.Container{}, err
} }
return container, nil return container, nil
@ -75,13 +83,15 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error()) return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
} }
bkt := getContainersBucket(s.tx, namespace) var m []containers.Container
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil { if bkt == nil {
return nil, nil // empty store return nil // empty store
} }
var m []containers.Container return bkt.ForEach(func(k, v []byte) error {
if err := bkt.ForEach(func(k, v []byte) error {
cbkt := bkt.Bucket(k) cbkt := bkt.Bucket(k)
if cbkt == nil { if cbkt == nil {
return nil return nil
@ -96,6 +106,7 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
m = append(m, container) m = append(m, container)
} }
return nil return nil
})
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
@ -113,9 +124,10 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
return containers.Container{}, errors.Wrap(err, "create container failed validation") return containers.Container{}, errors.Wrap(err, "create container failed validation")
} }
bkt, err := createContainersBucket(s.tx, namespace) if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt, err := createContainersBucket(tx, namespace)
if err != nil { if err != nil {
return containers.Container{}, err return err
} }
cbkt, err := bkt.CreateBucket([]byte(container.ID)) cbkt, err := bkt.CreateBucket([]byte(container.ID))
@ -123,13 +135,18 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
if err == bolt.ErrBucketExists { if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID) err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
} }
return containers.Container{}, err return err
} }
container.CreatedAt = time.Now().UTC() container.CreatedAt = time.Now().UTC()
container.UpdatedAt = container.CreatedAt container.UpdatedAt = container.CreatedAt
if err := writeContainer(cbkt, &container); err != nil { if err := writeContainer(cbkt, &container); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID) return errors.Wrapf(err, "failed to write container %q", container.ID)
}
return nil
}); err != nil {
return containers.Container{}, err
} }
return container, nil return container, nil
@ -145,19 +162,20 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id") return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id")
} }
bkt := getContainersBucket(s.tx, namespace) var updated containers.Container
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil { if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace) return errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
} }
cbkt := bkt.Bucket([]byte(container.ID)) cbkt := bkt.Bucket([]byte(container.ID))
if cbkt == nil { if cbkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID) return errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
} }
var updated containers.Container
if err := readContainer(&updated, cbkt); err != nil { if err := readContainer(&updated, cbkt); err != nil {
return updated, errors.Wrapf(err, "failed to read container %q", container.ID) return errors.Wrapf(err, "failed to read container %q", container.ID)
} }
createdat := updated.CreatedAt createdat := updated.CreatedAt
updated.ID = container.ID updated.ID = container.ID
@ -170,11 +188,11 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
// are provided. This allows these fields to become mutable in the // are provided. This allows these fields to become mutable in the
// future. // future.
if updated.Snapshotter != container.Snapshotter { if updated.Snapshotter != container.Snapshotter {
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable") return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
} }
if updated.Runtime.Name != container.Runtime.Name { if updated.Runtime.Name != container.Runtime.Name {
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable") return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
} }
} }
@ -212,18 +230,23 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
case "snapshotkey": case "snapshotkey":
updated.SnapshotKey = container.SnapshotKey updated.SnapshotKey = container.SnapshotKey
default: default:
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID) return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
} }
} }
if err := validateContainer(&updated); err != nil { if err := validateContainer(&updated); err != nil {
return containers.Container{}, errors.Wrap(err, "update failed validation") return errors.Wrap(err, "update failed validation")
} }
updated.CreatedAt = createdat updated.CreatedAt = createdat
updated.UpdatedAt = time.Now().UTC() updated.UpdatedAt = time.Now().UTC()
if err := writeContainer(cbkt, &updated); err != nil { if err := writeContainer(cbkt, &updated); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID) return errors.Wrapf(err, "failed to write container %q", container.ID)
}
return nil
}); err != nil {
return containers.Container{}, err
} }
return updated, nil return updated, nil
@ -235,17 +258,25 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
return err return err
} }
bkt := getContainersBucket(s.tx, namespace) return update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil { if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace) return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
} }
if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound { if err := bkt.DeleteBucket([]byte(id)); err != nil {
return errors.Wrapf(errdefs.ErrNotFound, "container %v", id) if err == bolt.ErrBucketNotFound {
err = errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
} }
return err return err
} }
atomic.AddUint32(&s.db.dirty, 1)
return nil
})
}
func validateContainer(container *containers.Container) error { func validateContainer(container *containers.Container) error {
if err := identifiers.Validate(container.ID); err != nil { if err := identifiers.Validate(container.ID); err != nil {
return errors.Wrap(err, "container.ID") return errors.Wrap(err, "container.ID")

View File

@ -47,6 +47,8 @@ func TestContainersList(t *testing.T) {
ctx, db, cancel := testEnv(t) ctx, db, cancel := testEnv(t)
defer cancel() defer cancel()
store := NewContainerStore(NewDB(db, nil, nil))
spec := &specs.Spec{} spec := &specs.Spec{}
encoded, err := typeurl.MarshalAny(spec) encoded, err := typeurl.MarshalAny(spec)
if err != nil { if err != nil {
@ -73,9 +75,8 @@ func TestContainersList(t *testing.T) {
} }
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
now := time.Now() now := time.Now()
result, err := store.Create(ctx, *testset[id]) result, err := store.Create(WithTransactionContext(ctx, tx), *testset[id])
if err != nil { if err != nil {
return err return err
} }
@ -138,8 +139,6 @@ func TestContainersList(t *testing.T) {
testset = newtestset testset = newtestset
} }
if err := db.View(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
results, err := store.List(ctx, testcase.filters...) results, err := store.List(ctx, testcase.filters...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -156,28 +155,19 @@ func TestContainersList(t *testing.T) {
for _, result := range results { for _, result := range results {
checkContainersEqual(t, &result, testset[result.ID], "list results did not match") checkContainersEqual(t, &result, testset[result.ID], "list results did not match")
} }
return nil
}); err != nil {
t.Fatal(err)
}
}) })
} }
// delete everything to test it // delete everything to test it
for id := range testset { for id := range testset {
if err := db.Update(func(tx *bolt.Tx) error { if err := store.Delete(ctx, id); err != nil {
store := NewContainerStore(tx)
return store.Delete(ctx, id)
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// try it again, get NotFound // try it again, get NotFound
if err := db.Update(func(tx *bolt.Tx) error { if err := store.Delete(ctx, id); err == nil {
store := NewContainerStore(tx) t.Fatalf("expected error deleting non-existent container")
return store.Delete(ctx, id) } else if errors.Cause(err) != errdefs.ErrNotFound {
}); errors.Cause(err) != errdefs.ErrNotFound {
t.Fatalf("unexpected error %v", err) t.Fatalf("unexpected error %v", err)
} }
} }
@ -188,6 +178,8 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
ctx, db, cancel := testEnv(t) ctx, db, cancel := testEnv(t)
defer cancel() defer cancel()
store := NewContainerStore(NewDB(db, nil, nil))
spec := &specs.Spec{} spec := &specs.Spec{}
encoded, err := typeurl.MarshalAny(spec) encoded, err := typeurl.MarshalAny(spec)
if err != nil { if err != nil {
@ -641,12 +633,7 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
} }
testcase.expected.ID = testcase.name testcase.expected.ID = testcase.name
done := errors.New("test complete") now := time.Now().UTC()
if err := db.Update(func(tx *bolt.Tx) error {
var (
now = time.Now().UTC()
store = NewContainerStore(tx)
)
result, err := store.Create(ctx, testcase.original) result, err := store.Create(ctx, testcase.original)
if errors.Cause(err) != testcase.createerr { if errors.Cause(err) != testcase.createerr {
@ -656,7 +643,7 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr) t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
} }
} else if testcase.createerr != nil { } else if testcase.createerr != nil {
return done return
} }
checkContainerTimestamps(t, &result, now, true) checkContainerTimestamps(t, &result, now, true)
@ -668,18 +655,9 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
testcase.expected.UpdatedAt = result.UpdatedAt testcase.expected.UpdatedAt = result.UpdatedAt
checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update") checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update")
return nil
}); err != nil {
if err == done {
return
}
t.Fatal(err)
}
if err := db.Update(func(tx *bolt.Tx) error { now = time.Now()
now := time.Now() result, err = store.Update(ctx, testcase.input, testcase.fieldpaths...)
store := NewContainerStore(tx)
result, err := store.Update(ctx, testcase.input, testcase.fieldpaths...)
if errors.Cause(err) != testcase.cause { if errors.Cause(err) != testcase.cause {
if testcase.cause == nil { if testcase.cause == nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
@ -687,33 +665,19 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause) t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
} }
} else if testcase.cause != nil { } else if testcase.cause != nil {
return done return
} }
checkContainerTimestamps(t, &result, now, false) checkContainerTimestamps(t, &result, now, false)
testcase.expected.UpdatedAt = result.UpdatedAt testcase.expected.UpdatedAt = result.UpdatedAt
checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result") checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result")
return nil
}); err != nil {
if err == done {
return
}
t.Fatal(err)
}
if err := db.View(func(tx *bolt.Tx) error { result, err = store.Get(ctx, testcase.original.ID)
store := NewContainerStore(tx)
result, err := store.Get(ctx, testcase.original.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result") checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result")
return nil
}); err != nil {
t.Fatal(err)
}
}) })
} }
} }

View File

@ -21,6 +21,7 @@ import (
"encoding/binary" "encoding/binary"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
@ -221,9 +222,8 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
} }
// Mark content store as dirty for triggering garbage collection // Mark content store as dirty for triggering garbage collection
cs.db.dirtyL.Lock() atomic.AddUint32(&cs.db.dirty, 1)
cs.db.dirtyCS = true cs.db.dirtyCS = true
cs.db.dirtyL.Unlock()
return nil return nil
}) })

View File

@ -166,18 +166,14 @@ func TestIngestLeased(t *testing.T) {
} }
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) { func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
if err := db.Update(func(tx *bolt.Tx) error { lm := NewLeaseManager(db)
_, err := NewLeaseManager(tx).Create(ctx, leases.WithID(name)) if _, err := lm.Create(ctx, leases.WithID(name)); err != nil {
return err
}); err != nil {
return nil, nil, err return nil, nil, err
} }
return leases.WithLease(ctx, name), func() error { return leases.WithLease(ctx, name), func() error {
return db.Update(func(tx *bolt.Tx) error { return lm.Delete(ctx, leases.Lease{
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: name, ID: name,
}) })
})
}, nil }, nil
} }

View File

@ -21,6 +21,7 @@ import (
"encoding/binary" "encoding/binary"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/content" "github.com/containerd/containerd/content"
@ -75,10 +76,16 @@ type DB struct {
// sweep phases without preventing read transactions. // sweep phases without preventing read transactions.
wlock sync.RWMutex wlock sync.RWMutex
// dirty flags and lock keeps track of datastores which have had deletions // dirty flag indicates that refences have been removed which require
// since the last garbage collection. These datastores will will be garbage // a garbage collection to ensure the database is clean. This tracks
// collected during the next garbage collection. // the number of dirty operations. This should be updated and read
dirtyL sync.Mutex // atomically if outside of wlock.Lock.
dirty uint32
// dirtySS and dirtyCS flags keeps track of datastores which have had
// deletions since the last garbage collection. These datastores will
// be garbage collected during the next garbage collection. These
// should only be updated inside of a write transaction or wlock.Lock.
dirtySS map[string]struct{} dirtySS map[string]struct{}
dirtyCS bool dirtyCS bool
@ -237,12 +244,10 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
defer m.wlock.RUnlock() defer m.wlock.RUnlock()
err := m.db.Update(fn) err := m.db.Update(fn)
if err == nil { if err == nil {
m.dirtyL.Lock() dirty := atomic.LoadUint32(&m.dirty) > 0
dirty := m.dirtyCS || len(m.dirtySS) > 0
for _, fn := range m.mutationCallbacks { for _, fn := range m.mutationCallbacks {
fn(dirty) fn(dirty)
} }
m.dirtyL.Unlock()
} }
return err return err
@ -254,9 +259,9 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
// The callback function is an argument for whether a deletion has occurred // The callback function is an argument for whether a deletion has occurred
// since the last garbage collection. // since the last garbage collection.
func (m *DB) RegisterMutationCallback(fn func(bool)) { func (m *DB) RegisterMutationCallback(fn func(bool)) {
m.dirtyL.Lock() m.wlock.Lock()
m.mutationCallbacks = append(m.mutationCallbacks, fn) m.mutationCallbacks = append(m.mutationCallbacks, fn)
m.dirtyL.Unlock() m.wlock.Unlock()
} }
// GCStats holds the duration for the different phases of the garbage collector // GCStats holds the duration for the different phases of the garbage collector
@ -282,8 +287,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
return nil, err return nil, err
} }
m.dirtyL.Lock()
if err := m.db.Update(func(tx *bolt.Tx) error { if err := m.db.Update(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -309,7 +312,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
return nil return nil
}); err != nil { }); err != nil {
m.dirtyL.Unlock()
m.wlock.Unlock() m.wlock.Unlock()
return nil, err return nil, err
} }
@ -317,6 +319,9 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
var stats GCStats var stats GCStats
var wg sync.WaitGroup var wg sync.WaitGroup
// reset dirty, no need for atomic inside of wlock.Lock
m.dirty = 0
if len(m.dirtySS) > 0 { if len(m.dirtySS) > 0 {
var sl sync.Mutex var sl sync.Mutex
stats.SnapshotD = map[string]time.Duration{} stats.SnapshotD = map[string]time.Duration{}
@ -349,8 +354,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
m.dirtyCS = false m.dirtyCS = false
} }
m.dirtyL.Unlock()
stats.MetaD = time.Since(t1) stats.MetaD = time.Since(t1)
m.wlock.Unlock() m.wlock.Unlock()

View File

@ -386,7 +386,7 @@ func TestMetadataCollector(t *testing.T) {
if err := mdb.Update(func(tx *bolt.Tx) error { if err := mdb.Update(func(tx *bolt.Tx) error {
for _, obj := range objects { for _, obj := range objects {
node, err := create(obj, tx, NewImageStore(mdb), cs, sn) node, err := create(obj, tx, mdb, cs, sn)
if err != nil { if err != nil {
return err return err
} }
@ -461,7 +461,7 @@ func benchmarkTrigger(n int) func(b *testing.B) {
if err := mdb.Update(func(tx *bolt.Tx) error { if err := mdb.Update(func(tx *bolt.Tx) error {
for _, obj := range objects { for _, obj := range objects {
node, err := create(obj, tx, NewImageStore(mdb), cs, sn) node, err := create(obj, tx, mdb, cs, sn)
if err != nil { if err != nil {
return err return err
} }
@ -541,16 +541,15 @@ type object struct {
labels map[string]string labels map[string]string
} }
func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) { func create(obj object, tx *bolt.Tx, db *DB, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
var ( var (
node *gc.Node node *gc.Node
namespace = "test" namespace = "test"
ctx = namespaces.WithNamespace(context.Background(), namespace) ctx = WithTransactionContext(namespaces.WithNamespace(context.Background(), namespace), tx)
) )
switch v := obj.data.(type) { switch v := obj.data.(type) {
case testContent: case testContent:
ctx := WithTransactionContext(ctx, tx)
expected := digest.FromBytes(v.data) expected := digest.FromBytes(v.data)
w, err := cs.Writer(ctx, w, err := cs.Writer(ctx,
content.WithRef("test-ref"), content.WithRef("test-ref"),
@ -572,7 +571,6 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
} }
} }
case testSnapshot: case testSnapshot:
ctx := WithTransactionContext(ctx, tx)
if v.active { if v.active {
_, err := sn.Prepare(ctx, v.key, v.parent, snapshots.WithLabels(obj.labels)) _, err := sn.Prepare(ctx, v.key, v.parent, snapshots.WithLabels(obj.labels))
if err != nil { if err != nil {
@ -596,14 +594,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
} }
} }
case testImage: case testImage:
ctx := WithTransactionContext(ctx, tx)
image := images.Image{ image := images.Image{
Name: v.name, Name: v.name,
Target: v.target, Target: v.target,
Labels: obj.labels, Labels: obj.labels,
} }
_, err := is.Create(ctx, image)
_, err := NewImageStore(db).Create(ctx, image)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to create image") return nil, errors.Wrap(err, "failed to create image")
} }
@ -619,12 +616,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
}, },
Spec: &types.Any{}, Spec: &types.Any{},
} }
_, err := NewContainerStore(tx).Create(ctx, container) _, err := NewContainerStore(db).Create(ctx, container)
if err != nil { if err != nil {
return nil, err return nil, err
} }
case testLease: case testLease:
lm := NewLeaseManager(tx) lm := NewLeaseManager(db)
l, err := lm.Create(ctx, leases.WithID(v.id), leases.WithLabels(obj.labels)) l, err := lm.Create(ctx, leases.WithID(v.id), leases.WithLabels(obj.labels))
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -21,6 +21,7 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
@ -249,19 +250,16 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
} }
err = bkt.DeleteBucket([]byte(name)) if err = bkt.DeleteBucket([]byte(name)); err != nil {
if err == bolt.ErrBucketNotFound { if err == bolt.ErrBucketNotFound {
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name) err = errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
}
return err
} }
// A reference to a piece of content has been removed, atomic.AddUint32(&s.db.dirty, 1)
// mark content store as dirty for triggering garbage
// collection
s.db.dirtyL.Lock()
s.db.dirtyCS = true
s.db.dirtyL.Unlock()
return err return nil
}) })
} }

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"strings" "strings"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
@ -35,14 +36,14 @@ import (
// LeaseManager manages the create/delete lifecycle of leases // LeaseManager manages the create/delete lifecycle of leases
// and also returns existing leases // and also returns existing leases
type LeaseManager struct { type LeaseManager struct {
tx *bolt.Tx db *DB
} }
// NewLeaseManager creates a new lease manager for managing leases using // NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction. // the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager { func NewLeaseManager(db *DB) *LeaseManager {
return &LeaseManager{ return &LeaseManager{
tx: tx, db: db,
} }
} }
@ -63,9 +64,10 @@ func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.
return leases.Lease{}, err return leases.Lease{}, err
} }
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) if err := update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil { if err != nil {
return leases.Lease{}, err return err
} }
txbkt, err := topbkt.CreateBucket([]byte(l.ID)) txbkt, err := topbkt.CreateBucket([]byte(l.ID))
@ -73,25 +75,29 @@ func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.
if err == bolt.ErrBucketExists { if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists err = errdefs.ErrAlreadyExists
} }
return leases.Lease{}, errors.Wrapf(err, "lease %q", l.ID) return errors.Wrapf(err, "lease %q", l.ID)
} }
t := time.Now().UTC() t := time.Now().UTC()
createdAt, err := t.MarshalBinary() createdAt, err := t.MarshalBinary()
if err != nil { if err != nil {
return leases.Lease{}, err return err
} }
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil { if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return leases.Lease{}, err return err
} }
if l.Labels != nil { if l.Labels != nil {
if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil { if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil {
return leases.Lease{}, err return err
} }
} }
l.CreatedAt = t l.CreatedAt = t
return nil
}); err != nil {
return leases.Lease{}, err
}
return l, nil return l, nil
} }
@ -102,7 +108,8 @@ func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...lea
return err return err
} }
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil { if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
} }
@ -112,7 +119,11 @@ func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...lea
} }
return err return err
} }
atomic.AddUint32(&lm.db.dirty, 1)
return nil return nil
})
} }
// List lists all active leases // List lists all active leases
@ -129,12 +140,13 @@ func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease,
var ll []leases.Lease var ll []leases.Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases) if err := view(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil { if topbkt == nil {
return ll, nil return nil
} }
if err := topbkt.ForEach(func(k, v []byte) error { return topbkt.ForEach(func(k, v []byte) error {
if v != nil { if v != nil {
return nil return nil
} }
@ -162,6 +174,7 @@ func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease,
} }
return nil return nil
})
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
@ -176,7 +189,8 @@ func (lm *LeaseManager) AddResource(ctx context.Context, lease leases.Lease, r l
return err return err
} }
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil { if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
} }
@ -194,6 +208,7 @@ func (lm *LeaseManager) AddResource(ctx context.Context, lease leases.Lease, r l
} }
} }
return bkt.Put([]byte(ref), nil) return bkt.Put([]byte(ref), nil)
})
} }
// DeleteResource dereferences the resource by the provided lease. // DeleteResource dereferences the resource by the provided lease.
@ -203,7 +218,8 @@ func (lm *LeaseManager) DeleteResource(ctx context.Context, lease leases.Lease,
return err return err
} }
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil { if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID) return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
} }
@ -221,10 +237,16 @@ func (lm *LeaseManager) DeleteResource(ctx context.Context, lease leases.Lease,
bkt = bkt.Bucket([]byte(key)) bkt = bkt.Bucket([]byte(key))
} }
if bkt == nil { if bkt != nil {
return nil if err := bkt.Delete([]byte(ref)); err != nil {
return err
} }
return bkt.Delete([]byte(ref)) }
atomic.AddUint32(&lm.db.dirty, 1)
return nil
})
} }
// ListResources lists all the resources referenced by the lease. // ListResources lists all the resources referenced by the lease.
@ -234,12 +256,14 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) (
return nil, err return nil, err
} }
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID)) var rs []leases.Resource
if topbkt == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
rs := make([]leases.Resource, 0) if err := view(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
// content resources // content resources
if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil { if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil {
@ -251,7 +275,7 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) (
return nil return nil
}); err != nil { }); err != nil {
return nil, err return err
} }
} }
@ -265,7 +289,7 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) (
return nil return nil
}); err != nil { }); err != nil {
return nil, err return err
} }
} }
@ -285,9 +309,14 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) (
return nil return nil
}) })
}); err != nil { }); err != nil {
return nil, err return err
} }
} }
return nil
}); err != nil {
return nil, err
}
return rs, nil return rs, nil
} }

View File

@ -29,6 +29,8 @@ func TestLeases(t *testing.T) {
ctx, db, cancel := testEnv(t) ctx, db, cancel := testEnv(t)
defer cancel() defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
testCases := []struct { testCases := []struct {
ID string ID string
CreateErr error CreateErr error
@ -51,7 +53,7 @@ func TestLeases(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
lease, err := NewLeaseManager(tx).Create(ctx, leases.WithID(tc.ID)) lease, err := lm.Create(WithTransactionContext(ctx, tx), leases.WithID(tc.ID))
if err != nil { if err != nil {
if tc.CreateErr != nil && errors.Cause(err) == tc.CreateErr { if tc.CreateErr != nil && errors.Cause(err) == tc.CreateErr {
return nil return nil
@ -65,13 +67,8 @@ func TestLeases(t *testing.T) {
} }
} }
var listed []leases.Lease listed, err := lm.List(ctx)
// List leases, check same if err != nil {
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -88,10 +85,8 @@ func TestLeases(t *testing.T) {
} }
for _, tc := range testCases { for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error { if err := lm.Delete(ctx, leases.Lease{
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: tc.ID, ID: tc.ID,
})
}); err != nil { }); err != nil {
if tc.DeleteErr == nil && errors.Cause(err) != tc.DeleteErr { if tc.DeleteErr == nil && errors.Cause(err) != tc.DeleteErr {
t.Fatal(err) t.Fatal(err)
@ -100,11 +95,8 @@ func TestLeases(t *testing.T) {
} }
} }
if err := db.View(func(tx *bolt.Tx) error { listed, err = lm.List(ctx)
var err error if err != nil {
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -117,6 +109,8 @@ func TestLeasesList(t *testing.T) {
ctx, db, cancel := testEnv(t) ctx, db, cancel := testEnv(t)
defer cancel() defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
testset := [][]leases.Opt{ testset := [][]leases.Opt{
{ {
leases.WithID("lease1"), leases.WithID("lease1"),
@ -143,15 +137,17 @@ func TestLeasesList(t *testing.T) {
} }
// Insert all // Insert all
for _, opts := range testset {
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx) for _, opts := range testset {
_, err := lm.Create(ctx, opts...) _, err := lm.Create(WithTransactionContext(ctx, tx), opts...)
if err != nil {
return err return err
}
}
return nil
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
}
for _, testcase := range []struct { for _, testcase := range []struct {
name string name string
@ -201,11 +197,9 @@ func TestLeasesList(t *testing.T) {
}, },
} { } {
t.Run(testcase.name, func(t *testing.T) { t.Run(testcase.name, func(t *testing.T) {
if err := db.View(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
results, err := lm.List(ctx, testcase.filters...) results, err := lm.List(ctx, testcase.filters...)
if err != nil { if err != nil {
return err t.Fatal(err)
} }
if len(results) != len(testcase.expected) { if len(results) != len(testcase.expected) {
@ -230,10 +224,6 @@ func TestLeasesList(t *testing.T) {
} }
} }
return nil
}); err != nil {
t.Fatal(err)
}
}) })
} }
@ -246,18 +236,12 @@ func TestLeasesList(t *testing.T) {
} }
} }
if err := db.Update(func(tx *bolt.Tx) error { if err := lm.Delete(ctx, lease); err != nil {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// try it again, get not found // try it again, get not found
if err := db.Update(func(tx *bolt.Tx) error { if err := lm.Delete(ctx, lease); err == nil {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err == nil {
t.Fatalf("expected error deleting non-existent lease") t.Fatalf("expected error deleting non-existent lease")
} else if !errdefs.IsNotFound(err) { } else if !errdefs.IsNotFound(err) {
t.Fatalf("unexpected error: %s", err) t.Fatalf("unexpected error: %s", err)
@ -269,6 +253,8 @@ func TestLeaseResource(t *testing.T) {
ctx, db, cancel := testEnv(t) ctx, db, cancel := testEnv(t)
defer cancel() defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
var ( var (
leaseID = "l1" leaseID = "l1"
@ -280,10 +266,7 @@ func TestLeaseResource(t *testing.T) {
) )
// prepare lease // prepare lease
if err := db.Update(func(tx *bolt.Tx) error { if _, err := lm.Create(ctx, leases.WithID(leaseID)); err != nil {
_, err0 := NewLeaseManager(tx).Create(ctx, leases.WithID(leaseID))
return err0
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -379,7 +362,7 @@ func TestLeaseResource(t *testing.T) {
idxList := make(map[leases.Resource]bool) idxList := make(map[leases.Resource]bool)
for i, tc := range testCases { for i, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error { if err := db.Update(func(tx *bolt.Tx) error {
err0 := NewLeaseManager(tx).AddResource(ctx, tc.lease, tc.resource) err0 := lm.AddResource(WithTransactionContext(ctx, tx), tc.lease, tc.resource)
if got := errors.Cause(err0); got != tc.err { if got := errors.Cause(err0); got != tc.err {
return errors.Errorf("expect error (%v), but got (%v)", tc.err, err0) return errors.Errorf("expect error (%v), but got (%v)", tc.err, err0)
} }
@ -396,11 +379,8 @@ func TestLeaseResource(t *testing.T) {
// check list function // check list function
var gotList []leases.Resource var gotList []leases.Resource
if err := db.View(func(tx *bolt.Tx) error { gotList, err := lm.ListResources(ctx, lease)
var err error if err != nil {
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -420,21 +400,16 @@ func TestLeaseResource(t *testing.T) {
} }
// remove snapshots // remove snapshots
if err := db.Update(func(tx *bolt.Tx) error { if err := lm.DeleteResource(ctx, lease, leases.Resource{
return NewLeaseManager(tx).DeleteResource(ctx, lease, leases.Resource{
ID: snapshotterKey, ID: snapshotterKey,
Type: "snapshots/overlayfs", Type: "snapshots/overlayfs",
})
}); err != nil { }); err != nil {
t.Fatal(err) t.Fatal(err)
} }
// check list number // check list number
if err := db.View(func(tx *bolt.Tx) error { gotList, err = lm.ListResources(ctx, lease)
var err error if err != nil {
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -21,6 +21,7 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
@ -517,9 +518,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
} }
// Mark snapshotter as dirty for triggering garbage collection // Mark snapshotter as dirty for triggering garbage collection
s.db.dirtyL.Lock() atomic.AddUint32(&s.db.dirty, 1)
s.db.dirtySS[s.name] = struct{}{} s.db.dirtySS[s.name] = struct{}{}
s.db.dirtyL.Unlock()
return nil return nil
}) })

View File

@ -50,7 +50,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sys/unix" "golang.org/x/sys/unix"
) )
@ -115,7 +114,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
root: ic.Root, root: ic.Root,
state: ic.State, state: ic.State,
tasks: runtime.NewTaskList(), tasks: runtime.NewTaskList(),
db: m.(*metadata.DB), containers: metadata.NewContainerStore(m.(*metadata.DB)),
address: ic.Address, address: ic.Address,
events: ic.Events, events: ic.Events,
config: cfg, config: cfg,
@ -139,7 +138,7 @@ type Runtime struct {
address string address string
tasks *runtime.TaskList tasks *runtime.TaskList
db *metadata.DB containers containers.Store
events *exchange.Exchange events *exchange.Exchange
config *Config config *Config
@ -508,14 +507,8 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er
} }
func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) { func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
var container containers.Container container, err := r.containers.Get(ctx, id)
if err != nil {
if err := r.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, err return nil, err
} }

View File

@ -33,7 +33,6 @@ import (
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime" "github.com/containerd/containerd/runtime"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
bolt "go.etcd.io/bbolt"
) )
// Config for the v2 runtime // Config for the v2 runtime
@ -69,13 +68,15 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB)) cs := metadata.NewContainerStore(m.(*metadata.DB))
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
}, },
}) })
} }
// New task manager for v2 shims // New task manager for v2 shims
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) { func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) {
for _, d := range []string{root, state} { for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil { if err := os.MkdirAll(d, 0711); err != nil {
return nil, err return nil, err
@ -88,7 +89,7 @@ func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAdd
containerdTTRPCAddress: containerdTTRPCAddress, containerdTTRPCAddress: containerdTTRPCAddress,
tasks: runtime.NewTaskList(), tasks: runtime.NewTaskList(),
events: events, events: events,
db: db, containers: cs,
} }
if err := m.loadExistingTasks(ctx); err != nil { if err := m.loadExistingTasks(ctx); err != nil {
return nil, err return nil, err
@ -105,7 +106,7 @@ type TaskManager struct {
tasks *runtime.TaskList tasks *runtime.TaskList
events *exchange.Exchange events *exchange.Exchange
db *metadata.DB containers containers.Store
} }
// ID of the task manager // ID of the task manager
@ -278,13 +279,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
} }
func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) { func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container container, err := m.containers.Get(ctx, id)
if err := m.db.View(func(tx *bolt.Tx) error { if err != nil {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, err return nil, err
} }
return &container, nil return &container, nil

View File

@ -48,8 +48,11 @@ func init() {
if err != nil { if err != nil {
return nil, err return nil, err
} }
db := m.(*metadata.DB)
return &local{ return &local{
db: m.(*metadata.DB), Store: metadata.NewContainerStore(db),
db: db,
publisher: ic.Events, publisher: ic.Events,
}, nil }, nil
}, },
@ -57,6 +60,7 @@ func init() {
} }
type local struct { type local struct {
containers.Store
db *metadata.DB db *metadata.DB
publisher events.Publisher publisher events.Publisher
} }
@ -66,8 +70,8 @@ var _ api.ContainersClient = &local{}
func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) { func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) {
var resp api.GetContainerResponse var resp api.GetContainerResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
container, err := store.Get(ctx, req.ID) container, err := l.Store.Get(ctx, req.ID)
if err != nil { if err != nil {
return err return err
} }
@ -80,8 +84,8 @@ func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc
func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) { func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) {
var resp api.ListContainersResponse var resp api.ListContainersResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
containers, err := store.List(ctx, req.Filters...) containers, err := l.Store.List(ctx, req.Filters...)
if err != nil { if err != nil {
return err return err
} }
@ -94,8 +98,8 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest,
stream := &localStream{ stream := &localStream{
ctx: ctx, ctx: ctx,
} }
return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error { return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
containers, err := store.List(ctx, req.Filters...) containers, err := l.Store.List(ctx, req.Filters...)
if err != nil { if err != nil {
return err return err
} }
@ -107,10 +111,10 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest,
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) { func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse var resp api.CreateContainerResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
container := containerFromProto(&req.Container) container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container) created, err := l.Store.Create(ctx, container)
if err != nil { if err != nil {
return err return err
} }
@ -144,13 +148,13 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ .
container = containerFromProto(&req.Container) container = containerFromProto(&req.Container)
) )
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
var fieldpaths []string var fieldpaths []string
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 { if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
fieldpaths = append(fieldpaths, req.UpdateMask.Paths...) fieldpaths = append(fieldpaths, req.UpdateMask.Paths...)
} }
updated, err := store.Update(ctx, container, fieldpaths...) updated, err := l.Store.Update(ctx, container, fieldpaths...)
if err != nil { if err != nil {
return err return err
} }
@ -174,8 +178,8 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ .
} }
func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) { func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error { if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
return store.Delete(ctx, req.ID) return l.Store.Delete(ctx, req.ID)
}); err != nil { }); err != nil {
return &ptypes.Empty{}, errdefs.ToGRPC(err) return &ptypes.Empty{}, errdefs.ToGRPC(err)
} }
@ -189,15 +193,17 @@ func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ .
return &ptypes.Empty{}, nil return &ptypes.Empty{}, nil
} }
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error { func (l *local) withStore(ctx context.Context, fn func(ctx context.Context) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) } return func(tx *bolt.Tx) error {
return fn(metadata.WithTransactionContext(ctx, tx))
}
} }
func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context) error) error {
return l.db.View(l.withStore(ctx, fn)) return l.db.View(l.withStore(ctx, fn))
} }
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error { func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context) error) error {
return l.db.Update(l.withStore(ctx, fn)) return l.db.Update(l.withStore(ctx, fn))
} }

View File

@ -24,7 +24,6 @@ import (
"github.com/containerd/containerd/metadata" "github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin" "github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services" "github.com/containerd/containerd/services"
bolt "go.etcd.io/bbolt"
) )
func init() { func init() {
@ -44,7 +43,7 @@ func init() {
return nil, err return nil, err
} }
return &local{ return &local{
db: m.(*metadata.DB), Manager: metadata.NewLeaseManager(m.(*metadata.DB)),
gc: g.(gcScheduler), gc: g.(gcScheduler),
}, nil }, nil
}, },
@ -56,22 +55,10 @@ type gcScheduler interface {
} }
type local struct { type local struct {
db *metadata.DB leases.Manager
gc gcScheduler gc gcScheduler
} }
func (l *local) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
var lease leases.Lease
if err := l.db.Update(func(tx *bolt.Tx) error {
var err error
lease, err = metadata.NewLeaseManager(tx).Create(ctx, opts...)
return err
}); err != nil {
return leases.Lease{}, err
}
return lease, nil
}
func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error { func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error {
var do leases.DeleteOptions var do leases.DeleteOptions
for _, opt := range opts { for _, opt := range opts {
@ -80,9 +67,7 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D
} }
} }
if err := l.db.Update(func(tx *bolt.Tx) error { if err := l.Manager.Delete(ctx, lease); err != nil {
return metadata.NewLeaseManager(tx).Delete(ctx, lease)
}); err != nil {
return err return err
} }
@ -95,39 +80,3 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D
return nil return nil
} }
func (l *local) List(ctx context.Context, filters ...string) ([]leases.Lease, error) {
var ll []leases.Lease
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
ll, err = metadata.NewLeaseManager(tx).List(ctx, filters...)
return err
}); err != nil {
return nil, err
}
return ll, nil
}
func (l *local) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).AddResource(ctx, lease, r)
})
}
func (l *local) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).DeleteResource(ctx, lease, r)
})
}
func (l *local) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
var rs []leases.Resource
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
rs, err = metadata.NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
return nil, err
}
return rs, nil
}

View File

@ -51,7 +51,6 @@ import (
ptypes "github.com/gogo/protobuf/types" ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
@ -101,11 +100,11 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
monitor = runtime.NewNoopMonitor() monitor = runtime.NewNoopMonitor()
} }
cs := m.(*metadata.DB).ContentStore() db := m.(*metadata.DB)
l := &local{ l := &local{
runtimes: runtimes, runtimes: runtimes,
db: m.(*metadata.DB), containers: metadata.NewContainerStore(db),
store: cs, store: db.ContentStore(),
publisher: ic.Events, publisher: ic.Events,
monitor: monitor.(runtime.TaskMonitor), monitor: monitor.(runtime.TaskMonitor),
v2Runtime: v2r.(*v2.TaskManager), v2Runtime: v2r.(*v2.TaskManager),
@ -124,7 +123,7 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
type local struct { type local struct {
runtimes map[string]runtime.PlatformRuntime runtimes map[string]runtime.PlatformRuntime
db *metadata.DB containers containers.Store
store content.Store store content.Store
publisher events.Publisher publisher events.Publisher
@ -647,12 +646,8 @@ func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Re
func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) { func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container var container containers.Container
if err := l.db.View(func(tx *bolt.Tx) error { container, err := l.containers.Get(ctx, id)
store := metadata.NewContainerStore(tx) if err != nil {
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
return nil, errdefs.ToGRPC(err) return nil, errdefs.ToGRPC(err)
} }
return &container, nil return &container, nil