Update metadata interfaces for containers and leases

Add more thorough dirty checking across all types which
may be deleted and hold references.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2019-09-19 16:06:13 -07:00
parent d4802a64f9
commit 0b224ac7d6
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
15 changed files with 539 additions and 606 deletions

View File

@ -19,6 +19,7 @@ package metadata
import (
"context"
"strings"
"sync/atomic"
"time"
"github.com/containerd/containerd/containers"
@ -35,13 +36,13 @@ import (
)
type containerStore struct {
tx *bolt.Tx
db *DB
}
// NewContainerStore returns a Store backed by an underlying bolt DB
func NewContainerStore(tx *bolt.Tx) containers.Store {
func NewContainerStore(db *DB) containers.Store {
return &containerStore{
tx: tx,
db: db,
}
}
@ -51,14 +52,21 @@ func (s *containerStore) Get(ctx context.Context, id string) (containers.Contain
return containers.Container{}, err
}
bkt := getContainerBucket(s.tx, namespace, id)
if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
}
container := containers.Container{ID: id}
if err := readContainer(&container, bkt); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to read container %q", id)
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainerBucket(tx, namespace, id)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "container %q in namespace %q", id, namespace)
}
if err := readContainer(&container, bkt); err != nil {
return errors.Wrapf(err, "failed to read container %q", id)
}
return nil
}); err != nil {
return containers.Container{}, err
}
return container, nil
@ -75,27 +83,30 @@ func (s *containerStore) List(ctx context.Context, fs ...string) ([]containers.C
return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
}
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return nil, nil // empty store
}
var m []containers.Container
if err := bkt.ForEach(func(k, v []byte) error {
cbkt := bkt.Bucket(k)
if cbkt == nil {
if err := view(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil {
return nil // empty store
}
return bkt.ForEach(func(k, v []byte) error {
cbkt := bkt.Bucket(k)
if cbkt == nil {
return nil
}
container := containers.Container{ID: string(k)}
if err := readContainer(&container, cbkt); err != nil {
return errors.Wrapf(err, "failed to read container %q", string(k))
}
if filter.Match(adaptContainer(container)) {
m = append(m, container)
}
return nil
}
container := containers.Container{ID: string(k)}
if err := readContainer(&container, cbkt); err != nil {
return errors.Wrapf(err, "failed to read container %q", string(k))
}
if filter.Match(adaptContainer(container)) {
m = append(m, container)
}
return nil
})
}); err != nil {
return nil, err
}
@ -113,23 +124,29 @@ func (s *containerStore) Create(ctx context.Context, container containers.Contai
return containers.Container{}, errors.Wrap(err, "create container failed validation")
}
bkt, err := createContainersBucket(s.tx, namespace)
if err != nil {
return containers.Container{}, err
}
cbkt, err := bkt.CreateBucket([]byte(container.ID))
if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt, err := createContainersBucket(tx, namespace)
if err != nil {
return err
}
return containers.Container{}, err
}
container.CreatedAt = time.Now().UTC()
container.UpdatedAt = container.CreatedAt
if err := writeContainer(cbkt, &container); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
cbkt, err := bkt.CreateBucket([]byte(container.ID))
if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "container %q", container.ID)
}
return err
}
container.CreatedAt = time.Now().UTC()
container.UpdatedAt = container.CreatedAt
if err := writeContainer(cbkt, &container); err != nil {
return errors.Wrapf(err, "failed to write container %q", container.ID)
}
return nil
}); err != nil {
return containers.Container{}, err
}
return container, nil
@ -145,85 +162,91 @@ func (s *containerStore) Update(ctx context.Context, container containers.Contai
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "must specify a container id")
}
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
}
cbkt := bkt.Bucket([]byte(container.ID))
if cbkt == nil {
return containers.Container{}, errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
}
var updated containers.Container
if err := readContainer(&updated, cbkt); err != nil {
return updated, errors.Wrapf(err, "failed to read container %q", container.ID)
}
createdat := updated.CreatedAt
updated.ID = container.ID
if len(fieldpaths) == 0 {
// only allow updates to these field on full replace.
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
// Fields that are immutable must cause an error when no field paths
// are provided. This allows these fields to become mutable in the
// future.
if updated.Snapshotter != container.Snapshotter {
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "cannot update container %q in namespace %q", container.ID, namespace)
}
if updated.Runtime.Name != container.Runtime.Name {
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
cbkt := bkt.Bucket([]byte(container.ID))
if cbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "container %q", container.ID)
}
}
// apply the field mask. If you update this code, you better follow the
// field mask rules in field_mask.proto. If you don't know what this
// is, do not update this code.
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
if err := readContainer(&updated, cbkt); err != nil {
return errors.Wrapf(err, "failed to read container %q", container.ID)
}
createdat := updated.CreatedAt
updated.ID = container.ID
if len(fieldpaths) == 0 {
// only allow updates to these field on full replace.
fieldpaths = []string{"labels", "spec", "extensions", "image", "snapshotkey"}
// Fields that are immutable must cause an error when no field paths
// are provided. This allows these fields to become mutable in the
// future.
if updated.Snapshotter != container.Snapshotter {
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Snapshotter field is immutable")
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = container.Labels[key]
continue
}
if strings.HasPrefix(path, "extensions.") {
if updated.Extensions == nil {
updated.Extensions = map[string]types.Any{}
if updated.Runtime.Name != container.Runtime.Name {
return errors.Wrapf(errdefs.ErrInvalidArgument, "container.Runtime.Name field is immutable")
}
key := strings.TrimPrefix(path, "extensions.")
updated.Extensions[key] = container.Extensions[key]
continue
}
switch path {
case "labels":
updated.Labels = container.Labels
case "spec":
updated.Spec = container.Spec
case "extensions":
updated.Extensions = container.Extensions
case "image":
updated.Image = container.Image
case "snapshotkey":
updated.SnapshotKey = container.SnapshotKey
default:
return containers.Container{}, errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
// apply the field mask. If you update this code, you better follow the
// field mask rules in field_mask.proto. If you don't know what this
// is, do not update this code.
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = container.Labels[key]
continue
}
if strings.HasPrefix(path, "extensions.") {
if updated.Extensions == nil {
updated.Extensions = map[string]types.Any{}
}
key := strings.TrimPrefix(path, "extensions.")
updated.Extensions[key] = container.Extensions[key]
continue
}
switch path {
case "labels":
updated.Labels = container.Labels
case "spec":
updated.Spec = container.Spec
case "extensions":
updated.Extensions = container.Extensions
case "image":
updated.Image = container.Image
case "snapshotkey":
updated.SnapshotKey = container.SnapshotKey
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on %q", path, container.ID)
}
}
}
if err := validateContainer(&updated); err != nil {
return containers.Container{}, errors.Wrap(err, "update failed validation")
}
if err := validateContainer(&updated); err != nil {
return errors.Wrap(err, "update failed validation")
}
updated.CreatedAt = createdat
updated.UpdatedAt = time.Now().UTC()
if err := writeContainer(cbkt, &updated); err != nil {
return containers.Container{}, errors.Wrapf(err, "failed to write container %q", container.ID)
updated.CreatedAt = createdat
updated.UpdatedAt = time.Now().UTC()
if err := writeContainer(cbkt, &updated); err != nil {
return errors.Wrapf(err, "failed to write container %q", container.ID)
}
return nil
}); err != nil {
return containers.Container{}, err
}
return updated, nil
@ -235,15 +258,23 @@ func (s *containerStore) Delete(ctx context.Context, id string) error {
return err
}
bkt := getContainersBucket(s.tx, namespace)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getContainersBucket(tx, namespace)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "cannot delete container %q in namespace %q", id, namespace)
}
if err := bkt.DeleteBucket([]byte(id)); err == bolt.ErrBucketNotFound {
return errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
}
return err
if err := bkt.DeleteBucket([]byte(id)); err != nil {
if err == bolt.ErrBucketNotFound {
err = errors.Wrapf(errdefs.ErrNotFound, "container %v", id)
}
return err
}
atomic.AddUint32(&s.db.dirty, 1)
return nil
})
}
func validateContainer(container *containers.Container) error {

View File

@ -47,6 +47,8 @@ func TestContainersList(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
store := NewContainerStore(NewDB(db, nil, nil))
spec := &specs.Spec{}
encoded, err := typeurl.MarshalAny(spec)
if err != nil {
@ -73,9 +75,8 @@ func TestContainersList(t *testing.T) {
}
if err := db.Update(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
now := time.Now()
result, err := store.Create(ctx, *testset[id])
result, err := store.Create(WithTransactionContext(ctx, tx), *testset[id])
if err != nil {
return err
}
@ -138,46 +139,35 @@ func TestContainersList(t *testing.T) {
testset = newtestset
}
if err := db.View(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
results, err := store.List(ctx, testcase.filters...)
if err != nil {
t.Fatal(err)
}
if len(results) == 0 { // all tests return a non-empty result set
t.Fatalf("not results returned")
}
if len(results) != len(testset) {
t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset))
}
for _, result := range results {
checkContainersEqual(t, &result, testset[result.ID], "list results did not match")
}
return nil
}); err != nil {
results, err := store.List(ctx, testcase.filters...)
if err != nil {
t.Fatal(err)
}
if len(results) == 0 { // all tests return a non-empty result set
t.Fatalf("not results returned")
}
if len(results) != len(testset) {
t.Fatalf("length of result does not match testset: %v != %v", len(results), len(testset))
}
for _, result := range results {
checkContainersEqual(t, &result, testset[result.ID], "list results did not match")
}
})
}
// delete everything to test it
for id := range testset {
if err := db.Update(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
return store.Delete(ctx, id)
}); err != nil {
if err := store.Delete(ctx, id); err != nil {
t.Fatal(err)
}
// try it again, get NotFound
if err := db.Update(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
return store.Delete(ctx, id)
}); errors.Cause(err) != errdefs.ErrNotFound {
if err := store.Delete(ctx, id); err == nil {
t.Fatalf("expected error deleting non-existent container")
} else if errors.Cause(err) != errdefs.ErrNotFound {
t.Fatalf("unexpected error %v", err)
}
}
@ -188,6 +178,8 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
store := NewContainerStore(NewDB(db, nil, nil))
spec := &specs.Spec{}
encoded, err := typeurl.MarshalAny(spec)
if err != nil {
@ -641,79 +633,51 @@ func TestContainersCreateUpdateDelete(t *testing.T) {
}
testcase.expected.ID = testcase.name
done := errors.New("test complete")
if err := db.Update(func(tx *bolt.Tx) error {
var (
now = time.Now().UTC()
store = NewContainerStore(tx)
)
now := time.Now().UTC()
result, err := store.Create(ctx, testcase.original)
if errors.Cause(err) != testcase.createerr {
if testcase.createerr == nil {
t.Fatalf("unexpected error: %v", err)
} else {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
}
} else if testcase.createerr != nil {
return done
result, err := store.Create(ctx, testcase.original)
if errors.Cause(err) != testcase.createerr {
if testcase.createerr == nil {
t.Fatalf("unexpected error: %v", err)
} else {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.createerr)
}
} else if testcase.createerr != nil {
return
}
checkContainerTimestamps(t, &result, now, true)
checkContainerTimestamps(t, &result, now, true)
// ensure that createdat is never tampered with
testcase.original.CreatedAt = result.CreatedAt
testcase.expected.CreatedAt = result.CreatedAt
testcase.original.UpdatedAt = result.UpdatedAt
testcase.expected.UpdatedAt = result.UpdatedAt
// ensure that createdat is never tampered with
testcase.original.CreatedAt = result.CreatedAt
testcase.expected.CreatedAt = result.CreatedAt
testcase.original.UpdatedAt = result.UpdatedAt
testcase.expected.UpdatedAt = result.UpdatedAt
checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update")
return nil
}); err != nil {
if err == done {
return
checkContainersEqual(t, &result, &testcase.original, "unexpected result on container update")
now = time.Now()
result, err = store.Update(ctx, testcase.input, testcase.fieldpaths...)
if errors.Cause(err) != testcase.cause {
if testcase.cause == nil {
t.Fatalf("unexpected error: %v", err)
} else {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
}
} else if testcase.cause != nil {
return
}
checkContainerTimestamps(t, &result, now, false)
testcase.expected.UpdatedAt = result.UpdatedAt
checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result")
result, err = store.Get(ctx, testcase.original.ID)
if err != nil {
t.Fatal(err)
}
if err := db.Update(func(tx *bolt.Tx) error {
now := time.Now()
store := NewContainerStore(tx)
result, err := store.Update(ctx, testcase.input, testcase.fieldpaths...)
if errors.Cause(err) != testcase.cause {
if testcase.cause == nil {
t.Fatalf("unexpected error: %v", err)
} else {
t.Fatalf("cause of %v (cause: %v) != %v", err, errors.Cause(err), testcase.cause)
}
} else if testcase.cause != nil {
return done
}
checkContainerTimestamps(t, &result, now, false)
testcase.expected.UpdatedAt = result.UpdatedAt
checkContainersEqual(t, &result, &testcase.expected, "updated failed to get expected result")
return nil
}); err != nil {
if err == done {
return
}
t.Fatal(err)
}
if err := db.View(func(tx *bolt.Tx) error {
store := NewContainerStore(tx)
result, err := store.Get(ctx, testcase.original.ID)
if err != nil {
t.Fatal(err)
}
checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result")
return nil
}); err != nil {
t.Fatal(err)
}
checkContainersEqual(t, &result, &testcase.expected, "get after failed to get expected result")
})
}
}

View File

@ -21,6 +21,7 @@ import (
"encoding/binary"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/content"
@ -221,9 +222,8 @@ func (cs *contentStore) Delete(ctx context.Context, dgst digest.Digest) error {
}
// Mark content store as dirty for triggering garbage collection
cs.db.dirtyL.Lock()
atomic.AddUint32(&cs.db.dirty, 1)
cs.db.dirtyCS = true
cs.db.dirtyL.Unlock()
return nil
})

View File

@ -166,17 +166,13 @@ func TestIngestLeased(t *testing.T) {
}
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
if err := db.Update(func(tx *bolt.Tx) error {
_, err := NewLeaseManager(tx).Create(ctx, leases.WithID(name))
return err
}); err != nil {
lm := NewLeaseManager(db)
if _, err := lm.Create(ctx, leases.WithID(name)); err != nil {
return nil, nil, err
}
return leases.WithLease(ctx, name), func() error {
return db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: name,
})
return lm.Delete(ctx, leases.Lease{
ID: name,
})
}, nil
}

View File

@ -21,6 +21,7 @@ import (
"encoding/binary"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/content"
@ -75,10 +76,16 @@ type DB struct {
// sweep phases without preventing read transactions.
wlock sync.RWMutex
// dirty flags and lock keeps track of datastores which have had deletions
// since the last garbage collection. These datastores will will be garbage
// collected during the next garbage collection.
dirtyL sync.Mutex
// dirty flag indicates that refences have been removed which require
// a garbage collection to ensure the database is clean. This tracks
// the number of dirty operations. This should be updated and read
// atomically if outside of wlock.Lock.
dirty uint32
// dirtySS and dirtyCS flags keeps track of datastores which have had
// deletions since the last garbage collection. These datastores will
// be garbage collected during the next garbage collection. These
// should only be updated inside of a write transaction or wlock.Lock.
dirtySS map[string]struct{}
dirtyCS bool
@ -237,12 +244,10 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
defer m.wlock.RUnlock()
err := m.db.Update(fn)
if err == nil {
m.dirtyL.Lock()
dirty := m.dirtyCS || len(m.dirtySS) > 0
dirty := atomic.LoadUint32(&m.dirty) > 0
for _, fn := range m.mutationCallbacks {
fn(dirty)
}
m.dirtyL.Unlock()
}
return err
@ -254,9 +259,9 @@ func (m *DB) Update(fn func(*bolt.Tx) error) error {
// The callback function is an argument for whether a deletion has occurred
// since the last garbage collection.
func (m *DB) RegisterMutationCallback(fn func(bool)) {
m.dirtyL.Lock()
m.wlock.Lock()
m.mutationCallbacks = append(m.mutationCallbacks, fn)
m.dirtyL.Unlock()
m.wlock.Unlock()
}
// GCStats holds the duration for the different phases of the garbage collector
@ -282,8 +287,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
return nil, err
}
m.dirtyL.Lock()
if err := m.db.Update(func(tx *bolt.Tx) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
@ -309,7 +312,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
return nil
}); err != nil {
m.dirtyL.Unlock()
m.wlock.Unlock()
return nil, err
}
@ -317,6 +319,9 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
var stats GCStats
var wg sync.WaitGroup
// reset dirty, no need for atomic inside of wlock.Lock
m.dirty = 0
if len(m.dirtySS) > 0 {
var sl sync.Mutex
stats.SnapshotD = map[string]time.Duration{}
@ -349,8 +354,6 @@ func (m *DB) GarbageCollect(ctx context.Context) (gc.Stats, error) {
m.dirtyCS = false
}
m.dirtyL.Unlock()
stats.MetaD = time.Since(t1)
m.wlock.Unlock()

View File

@ -386,7 +386,7 @@ func TestMetadataCollector(t *testing.T) {
if err := mdb.Update(func(tx *bolt.Tx) error {
for _, obj := range objects {
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
node, err := create(obj, tx, mdb, cs, sn)
if err != nil {
return err
}
@ -461,7 +461,7 @@ func benchmarkTrigger(n int) func(b *testing.B) {
if err := mdb.Update(func(tx *bolt.Tx) error {
for _, obj := range objects {
node, err := create(obj, tx, NewImageStore(mdb), cs, sn)
node, err := create(obj, tx, mdb, cs, sn)
if err != nil {
return err
}
@ -541,16 +541,15 @@ type object struct {
labels map[string]string
}
func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
func create(obj object, tx *bolt.Tx, db *DB, cs content.Store, sn snapshots.Snapshotter) (*gc.Node, error) {
var (
node *gc.Node
namespace = "test"
ctx = namespaces.WithNamespace(context.Background(), namespace)
ctx = WithTransactionContext(namespaces.WithNamespace(context.Background(), namespace), tx)
)
switch v := obj.data.(type) {
case testContent:
ctx := WithTransactionContext(ctx, tx)
expected := digest.FromBytes(v.data)
w, err := cs.Writer(ctx,
content.WithRef("test-ref"),
@ -572,7 +571,6 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
}
}
case testSnapshot:
ctx := WithTransactionContext(ctx, tx)
if v.active {
_, err := sn.Prepare(ctx, v.key, v.parent, snapshots.WithLabels(obj.labels))
if err != nil {
@ -596,14 +594,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
}
}
case testImage:
ctx := WithTransactionContext(ctx, tx)
image := images.Image{
Name: v.name,
Target: v.target,
Labels: obj.labels,
}
_, err := is.Create(ctx, image)
_, err := NewImageStore(db).Create(ctx, image)
if err != nil {
return nil, errors.Wrap(err, "failed to create image")
}
@ -619,12 +616,13 @@ func create(obj object, tx *bolt.Tx, is images.Store, cs content.Store, sn snaps
},
Spec: &types.Any{},
}
_, err := NewContainerStore(tx).Create(ctx, container)
_, err := NewContainerStore(db).Create(ctx, container)
if err != nil {
return nil, err
}
case testLease:
lm := NewLeaseManager(tx)
lm := NewLeaseManager(db)
l, err := lm.Create(ctx, leases.WithID(v.id), leases.WithLabels(obj.labels))
if err != nil {
return nil, err

View File

@ -21,6 +21,7 @@ import (
"encoding/binary"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/containerd/containerd/errdefs"
@ -249,19 +250,16 @@ func (s *imageStore) Delete(ctx context.Context, name string, opts ...images.Del
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
}
err = bkt.DeleteBucket([]byte(name))
if err == bolt.ErrBucketNotFound {
return errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
if err = bkt.DeleteBucket([]byte(name)); err != nil {
if err == bolt.ErrBucketNotFound {
err = errors.Wrapf(errdefs.ErrNotFound, "image %q", name)
}
return err
}
// A reference to a piece of content has been removed,
// mark content store as dirty for triggering garbage
// collection
s.db.dirtyL.Lock()
s.db.dirtyCS = true
s.db.dirtyL.Unlock()
atomic.AddUint32(&s.db.dirty, 1)
return err
return nil
})
}

View File

@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"
"github.com/containerd/containerd/errdefs"
@ -35,14 +36,14 @@ import (
// LeaseManager manages the create/delete lifecycle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
db *DB
}
// NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
func NewLeaseManager(db *DB) *LeaseManager {
return &LeaseManager{
tx: tx,
db: db,
}
}
@ -63,35 +64,40 @@ func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.
return leases.Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return leases.Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(l.ID))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
if err := update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt, err := createBucketIfNotExists(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return err
}
return leases.Lease{}, errors.Wrapf(err, "lease %q", l.ID)
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return leases.Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return leases.Lease{}, err
}
if l.Labels != nil {
if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil {
return leases.Lease{}, err
txbkt, err := topbkt.CreateBucket([]byte(l.ID))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return errors.Wrapf(err, "lease %q", l.ID)
}
}
l.CreatedAt = t
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return err
}
if l.Labels != nil {
if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil {
return err
}
}
l.CreatedAt = t
return nil
}); err != nil {
return leases.Lease{}, err
}
return l, nil
}
@ -102,17 +108,22 @@ func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease, _ ...lea
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil {
if err == bolt.ErrBucketNotFound {
err = errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
return err
}
return nil
if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil {
if err == bolt.ErrBucketNotFound {
err = errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
return err
}
atomic.AddUint32(&lm.db.dirty, 1)
return nil
})
}
// List lists all active leases
@ -129,39 +140,41 @@ func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease,
var ll []leases.Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return ll, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
if v != nil {
if err := view(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return nil
}
txbkt := topbkt.Bucket(k)
l := leases.Lease{
ID: string(k),
}
return topbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
txbkt := topbkt.Bucket(k)
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
l := leases.Lease{
ID: string(k),
}
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
return err
}
}
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
}
l.Labels = labels
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
l.Labels = labels
if filter.Match(adaptLease(l)) {
ll = append(ll, l)
}
if filter.Match(adaptLease(l)) {
ll = append(ll, l)
}
return nil
return nil
})
}); err != nil {
return nil, err
}
@ -176,24 +189,26 @@ func (lm *LeaseManager) AddResource(ctx context.Context, lease leases.Lease, r l
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
bkt := topbkt
for _, key := range keys {
bkt, err = bkt.CreateBucketIfNotExists([]byte(key))
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
}
return bkt.Put([]byte(ref), nil)
bkt := topbkt
for _, key := range keys {
bkt, err = bkt.CreateBucketIfNotExists([]byte(key))
if err != nil {
return err
}
}
return bkt.Put([]byte(ref), nil)
})
}
// DeleteResource dereferences the resource by the provided lease.
@ -203,28 +218,35 @@ func (lm *LeaseManager) DeleteResource(ctx context.Context, lease leases.Lease,
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
bkt := topbkt
for _, key := range keys {
if bkt == nil {
break
return update(ctx, lm.db, func(tx *bolt.Tx) error {
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
bkt = bkt.Bucket([]byte(key))
}
if bkt == nil {
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
bkt := topbkt
for _, key := range keys {
if bkt == nil {
break
}
bkt = bkt.Bucket([]byte(key))
}
if bkt != nil {
if err := bkt.Delete([]byte(ref)); err != nil {
return err
}
}
atomic.AddUint32(&lm.db.dirty, 1)
return nil
}
return bkt.Delete([]byte(ref))
})
}
// ListResources lists all the resources referenced by the lease.
@ -234,59 +256,66 @@ func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) (
return nil, err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
var rs []leases.Resource
rs := make([]leases.Resource, 0)
if err := view(ctx, lm.db, func(tx *bolt.Tx) error {
// content resources
if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil {
if err := cbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: string(bucketKeyObjectContent),
})
return nil
}); err != nil {
return nil, err
topbkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
}
// ingest resources
if lbkt := topbkt.Bucket(bucketKeyObjectIngests); lbkt != nil {
if err := lbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: string(bucketKeyObjectIngests),
})
return nil
}); err != nil {
return nil, err
}
}
// snapshot resources
if sbkt := topbkt.Bucket(bucketKeyObjectSnapshots); sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, _ []byte) error {
// content resources
if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil {
if err := cbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: fmt.Sprintf("%s/%s", bucketKeyObjectSnapshots, sk),
Type: string(bucketKeyObjectContent),
})
return nil
})
}); err != nil {
return nil, err
}); err != nil {
return err
}
}
// ingest resources
if lbkt := topbkt.Bucket(bucketKeyObjectIngests); lbkt != nil {
if err := lbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: string(bucketKeyObjectIngests),
})
return nil
}); err != nil {
return err
}
}
// snapshot resources
if sbkt := topbkt.Bucket(bucketKeyObjectSnapshots); sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: fmt.Sprintf("%s/%s", bucketKeyObjectSnapshots, sk),
})
return nil
})
}); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}
return rs, nil
}

View File

@ -29,6 +29,8 @@ func TestLeases(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
testCases := []struct {
ID string
CreateErr error
@ -51,7 +53,7 @@ func TestLeases(t *testing.T) {
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
lease, err := NewLeaseManager(tx).Create(ctx, leases.WithID(tc.ID))
lease, err := lm.Create(WithTransactionContext(ctx, tx), leases.WithID(tc.ID))
if err != nil {
if tc.CreateErr != nil && errors.Cause(err) == tc.CreateErr {
return nil
@ -65,13 +67,8 @@ func TestLeases(t *testing.T) {
}
}
var listed []leases.Lease
// List leases, check same
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
listed, err := lm.List(ctx)
if err != nil {
t.Fatal(err)
}
@ -88,10 +85,8 @@ func TestLeases(t *testing.T) {
}
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: tc.ID,
})
if err := lm.Delete(ctx, leases.Lease{
ID: tc.ID,
}); err != nil {
if tc.DeleteErr == nil && errors.Cause(err) != tc.DeleteErr {
t.Fatal(err)
@ -100,11 +95,8 @@ func TestLeases(t *testing.T) {
}
}
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
listed, err = lm.List(ctx)
if err != nil {
t.Fatal(err)
}
@ -117,6 +109,8 @@ func TestLeasesList(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
testset := [][]leases.Opt{
{
leases.WithID("lease1"),
@ -143,14 +137,16 @@ func TestLeasesList(t *testing.T) {
}
// Insert all
for _, opts := range testset {
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
_, err := lm.Create(ctx, opts...)
return err
}); err != nil {
t.Fatal(err)
if err := db.Update(func(tx *bolt.Tx) error {
for _, opts := range testset {
_, err := lm.Create(WithTransactionContext(ctx, tx), opts...)
if err != nil {
return err
}
}
return nil
}); err != nil {
t.Fatal(err)
}
for _, testcase := range []struct {
@ -201,39 +197,33 @@ func TestLeasesList(t *testing.T) {
},
} {
t.Run(testcase.name, func(t *testing.T) {
if err := db.View(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
results, err := lm.List(ctx, testcase.filters...)
if err != nil {
return err
}
if len(results) != len(testcase.expected) {
t.Errorf("length of result does not match expected: %v != %v", len(results), len(testcase.expected))
}
expectedMap := map[string]struct{}{}
for _, expected := range testcase.expected {
expectedMap[expected] = struct{}{}
}
for _, result := range results {
if _, ok := expectedMap[result.ID]; !ok {
t.Errorf("unexpected match: %v", result.ID)
} else {
delete(expectedMap, result.ID)
}
}
if len(expectedMap) > 0 {
for match := range expectedMap {
t.Errorf("missing match: %v", match)
}
}
return nil
}); err != nil {
results, err := lm.List(ctx, testcase.filters...)
if err != nil {
t.Fatal(err)
}
if len(results) != len(testcase.expected) {
t.Errorf("length of result does not match expected: %v != %v", len(results), len(testcase.expected))
}
expectedMap := map[string]struct{}{}
for _, expected := range testcase.expected {
expectedMap[expected] = struct{}{}
}
for _, result := range results {
if _, ok := expectedMap[result.ID]; !ok {
t.Errorf("unexpected match: %v", result.ID)
} else {
delete(expectedMap, result.ID)
}
}
if len(expectedMap) > 0 {
for match := range expectedMap {
t.Errorf("missing match: %v", match)
}
}
})
}
@ -246,18 +236,12 @@ func TestLeasesList(t *testing.T) {
}
}
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err != nil {
if err := lm.Delete(ctx, lease); err != nil {
t.Fatal(err)
}
// try it again, get not found
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err == nil {
if err := lm.Delete(ctx, lease); err == nil {
t.Fatalf("expected error deleting non-existent lease")
} else if !errdefs.IsNotFound(err) {
t.Fatalf("unexpected error: %s", err)
@ -269,6 +253,8 @@ func TestLeaseResource(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
lm := NewLeaseManager(NewDB(db, nil, nil))
var (
leaseID = "l1"
@ -280,10 +266,7 @@ func TestLeaseResource(t *testing.T) {
)
// prepare lease
if err := db.Update(func(tx *bolt.Tx) error {
_, err0 := NewLeaseManager(tx).Create(ctx, leases.WithID(leaseID))
return err0
}); err != nil {
if _, err := lm.Create(ctx, leases.WithID(leaseID)); err != nil {
t.Fatal(err)
}
@ -379,7 +362,7 @@ func TestLeaseResource(t *testing.T) {
idxList := make(map[leases.Resource]bool)
for i, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
err0 := NewLeaseManager(tx).AddResource(ctx, tc.lease, tc.resource)
err0 := lm.AddResource(WithTransactionContext(ctx, tx), tc.lease, tc.resource)
if got := errors.Cause(err0); got != tc.err {
return errors.Errorf("expect error (%v), but got (%v)", tc.err, err0)
}
@ -396,11 +379,8 @@ func TestLeaseResource(t *testing.T) {
// check list function
var gotList []leases.Resource
if err := db.View(func(tx *bolt.Tx) error {
var err error
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
gotList, err := lm.ListResources(ctx, lease)
if err != nil {
t.Fatal(err)
}
@ -420,21 +400,16 @@ func TestLeaseResource(t *testing.T) {
}
// remove snapshots
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).DeleteResource(ctx, lease, leases.Resource{
ID: snapshotterKey,
Type: "snapshots/overlayfs",
})
if err := lm.DeleteResource(ctx, lease, leases.Resource{
ID: snapshotterKey,
Type: "snapshots/overlayfs",
}); err != nil {
t.Fatal(err)
}
// check list number
if err := db.View(func(tx *bolt.Tx) error {
var err error
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
gotList, err = lm.ListResources(ctx, lease)
if err != nil {
t.Fatal(err)
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/containerd/containerd/errdefs"
@ -517,9 +518,8 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
}
// Mark snapshotter as dirty for triggering garbage collection
s.db.dirtyL.Lock()
atomic.AddUint32(&s.db.dirty, 1)
s.db.dirtySS[s.name] = struct{}{}
s.db.dirtyL.Unlock()
return nil
})

View File

@ -50,7 +50,6 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"golang.org/x/sys/unix"
)
@ -112,13 +111,13 @@ func New(ic *plugin.InitContext) (interface{}, error) {
}
cfg := ic.Config.(*Config)
r := &Runtime{
root: ic.Root,
state: ic.State,
tasks: runtime.NewTaskList(),
db: m.(*metadata.DB),
address: ic.Address,
events: ic.Events,
config: cfg,
root: ic.Root,
state: ic.State,
tasks: runtime.NewTaskList(),
containers: metadata.NewContainerStore(m.(*metadata.DB)),
address: ic.Address,
events: ic.Events,
config: cfg,
}
tasks, err := r.restoreTasks(ic.Context)
if err != nil {
@ -138,9 +137,9 @@ type Runtime struct {
state string
address string
tasks *runtime.TaskList
db *metadata.DB
events *exchange.Exchange
tasks *runtime.TaskList
containers containers.Store
events *exchange.Exchange
config *Config
}
@ -508,14 +507,8 @@ func (r *Runtime) getRuntime(ctx context.Context, ns, id string) (*runc.Runc, er
}
func (r *Runtime) getRuncOptions(ctx context.Context, id string) (*runctypes.RuncOptions, error) {
var container containers.Container
if err := r.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
container, err := r.containers.Get(ctx, id)
if err != nil {
return nil, err
}

View File

@ -33,7 +33,6 @@ import (
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
bolt "go.etcd.io/bbolt"
)
// Config for the v2 runtime
@ -69,13 +68,15 @@ func init() {
if err != nil {
return nil, err
}
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, m.(*metadata.DB))
cs := metadata.NewContainerStore(m.(*metadata.DB))
return New(ic.Context, ic.Root, ic.State, ic.Address, ic.TTRPCAddress, ic.Events, cs)
},
})
}
// New task manager for v2 shims
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, db *metadata.DB) (*TaskManager, error) {
func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAddress string, events *exchange.Exchange, cs containers.Store) (*TaskManager, error) {
for _, d := range []string{root, state} {
if err := os.MkdirAll(d, 0711); err != nil {
return nil, err
@ -88,7 +89,7 @@ func New(ctx context.Context, root, state, containerdAddress, containerdTTRPCAdd
containerdTTRPCAddress: containerdTTRPCAddress,
tasks: runtime.NewTaskList(),
events: events,
db: db,
containers: cs,
}
if err := m.loadExistingTasks(ctx); err != nil {
return nil, err
@ -103,9 +104,9 @@ type TaskManager struct {
containerdAddress string
containerdTTRPCAddress string
tasks *runtime.TaskList
events *exchange.Exchange
db *metadata.DB
tasks *runtime.TaskList
events *exchange.Exchange
containers containers.Store
}
// ID of the task manager
@ -278,13 +279,8 @@ func (m *TaskManager) loadTasks(ctx context.Context) error {
}
func (m *TaskManager) container(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container
if err := m.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
container, err := m.containers.Get(ctx, id)
if err != nil {
return nil, err
}
return &container, nil

View File

@ -48,8 +48,11 @@ func init() {
if err != nil {
return nil, err
}
db := m.(*metadata.DB)
return &local{
db: m.(*metadata.DB),
Store: metadata.NewContainerStore(db),
db: db,
publisher: ic.Events,
}, nil
},
@ -57,6 +60,7 @@ func init() {
}
type local struct {
containers.Store
db *metadata.DB
publisher events.Publisher
}
@ -66,8 +70,8 @@ var _ api.ContainersClient = &local{}
func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc.CallOption) (*api.GetContainerResponse, error) {
var resp api.GetContainerResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
container, err := store.Get(ctx, req.ID)
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
container, err := l.Store.Get(ctx, req.ID)
if err != nil {
return err
}
@ -80,8 +84,8 @@ func (l *local) Get(ctx context.Context, req *api.GetContainerRequest, _ ...grpc
func (l *local) List(ctx context.Context, req *api.ListContainersRequest, _ ...grpc.CallOption) (*api.ListContainersResponse, error) {
var resp api.ListContainersResponse
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
containers, err := store.List(ctx, req.Filters...)
return &resp, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
containers, err := l.Store.List(ctx, req.Filters...)
if err != nil {
return err
}
@ -94,8 +98,8 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest,
stream := &localStream{
ctx: ctx,
}
return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context, store containers.Store) error {
containers, err := store.List(ctx, req.Filters...)
return stream, errdefs.ToGRPC(l.withStoreView(ctx, func(ctx context.Context) error {
containers, err := l.Store.List(ctx, req.Filters...)
if err != nil {
return err
}
@ -107,10 +111,10 @@ func (l *local) ListStream(ctx context.Context, req *api.ListContainersRequest,
func (l *local) Create(ctx context.Context, req *api.CreateContainerRequest, _ ...grpc.CallOption) (*api.CreateContainerResponse, error) {
var resp api.CreateContainerResponse
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
container := containerFromProto(&req.Container)
created, err := store.Create(ctx, container)
created, err := l.Store.Create(ctx, container)
if err != nil {
return err
}
@ -144,13 +148,13 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ .
container = containerFromProto(&req.Container)
)
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
var fieldpaths []string
if req.UpdateMask != nil && len(req.UpdateMask.Paths) > 0 {
fieldpaths = append(fieldpaths, req.UpdateMask.Paths...)
}
updated, err := store.Update(ctx, container, fieldpaths...)
updated, err := l.Store.Update(ctx, container, fieldpaths...)
if err != nil {
return err
}
@ -174,8 +178,8 @@ func (l *local) Update(ctx context.Context, req *api.UpdateContainerRequest, _ .
}
func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.withStoreUpdate(ctx, func(ctx context.Context, store containers.Store) error {
return store.Delete(ctx, req.ID)
if err := l.withStoreUpdate(ctx, func(ctx context.Context) error {
return l.Store.Delete(ctx, req.ID)
}); err != nil {
return &ptypes.Empty{}, errdefs.ToGRPC(err)
}
@ -189,15 +193,17 @@ func (l *local) Delete(ctx context.Context, req *api.DeleteContainerRequest, _ .
return &ptypes.Empty{}, nil
}
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error { return fn(ctx, metadata.NewContainerStore(tx)) }
func (l *local) withStore(ctx context.Context, fn func(ctx context.Context) error) func(tx *bolt.Tx) error {
return func(tx *bolt.Tx) error {
return fn(metadata.WithTransactionContext(ctx, tx))
}
}
func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
func (l *local) withStoreView(ctx context.Context, fn func(ctx context.Context) error) error {
return l.db.View(l.withStore(ctx, fn))
}
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context, store containers.Store) error) error {
func (l *local) withStoreUpdate(ctx context.Context, fn func(ctx context.Context) error) error {
return l.db.Update(l.withStore(ctx, fn))
}

View File

@ -24,7 +24,6 @@ import (
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
bolt "go.etcd.io/bbolt"
)
func init() {
@ -44,8 +43,8 @@ func init() {
return nil, err
}
return &local{
db: m.(*metadata.DB),
gc: g.(gcScheduler),
Manager: metadata.NewLeaseManager(m.(*metadata.DB)),
gc: g.(gcScheduler),
}, nil
},
})
@ -56,22 +55,10 @@ type gcScheduler interface {
}
type local struct {
db *metadata.DB
leases.Manager
gc gcScheduler
}
func (l *local) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
var lease leases.Lease
if err := l.db.Update(func(tx *bolt.Tx) error {
var err error
lease, err = metadata.NewLeaseManager(tx).Create(ctx, opts...)
return err
}); err != nil {
return leases.Lease{}, err
}
return lease, nil
}
func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.DeleteOpt) error {
var do leases.DeleteOptions
for _, opt := range opts {
@ -80,9 +67,7 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D
}
}
if err := l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, lease)
}); err != nil {
if err := l.Manager.Delete(ctx, lease); err != nil {
return err
}
@ -95,39 +80,3 @@ func (l *local) Delete(ctx context.Context, lease leases.Lease, opts ...leases.D
return nil
}
func (l *local) List(ctx context.Context, filters ...string) ([]leases.Lease, error) {
var ll []leases.Lease
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
ll, err = metadata.NewLeaseManager(tx).List(ctx, filters...)
return err
}); err != nil {
return nil, err
}
return ll, nil
}
func (l *local) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).AddResource(ctx, lease, r)
})
}
func (l *local) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).DeleteResource(ctx, lease, r)
})
}
func (l *local) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
var rs []leases.Resource
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
rs, err = metadata.NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
return nil, err
}
return rs, nil
}

View File

@ -51,7 +51,6 @@ import (
ptypes "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -101,14 +100,14 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
monitor = runtime.NewNoopMonitor()
}
cs := m.(*metadata.DB).ContentStore()
db := m.(*metadata.DB)
l := &local{
runtimes: runtimes,
db: m.(*metadata.DB),
store: cs,
publisher: ic.Events,
monitor: monitor.(runtime.TaskMonitor),
v2Runtime: v2r.(*v2.TaskManager),
runtimes: runtimes,
containers: metadata.NewContainerStore(db),
store: db.ContentStore(),
publisher: ic.Events,
monitor: monitor.(runtime.TaskMonitor),
v2Runtime: v2r.(*v2.TaskManager),
}
for _, r := range runtimes {
tasks, err := r.Tasks(ic.Context, true)
@ -123,10 +122,10 @@ func initFunc(ic *plugin.InitContext) (interface{}, error) {
}
type local struct {
runtimes map[string]runtime.PlatformRuntime
db *metadata.DB
store content.Store
publisher events.Publisher
runtimes map[string]runtime.PlatformRuntime
containers containers.Store
store content.Store
publisher events.Publisher
monitor runtime.TaskMonitor
v2Runtime *v2.TaskManager
@ -647,12 +646,8 @@ func (l *local) writeContent(ctx context.Context, mediaType, ref string, r io.Re
func (l *local) getContainer(ctx context.Context, id string) (*containers.Container, error) {
var container containers.Container
if err := l.db.View(func(tx *bolt.Tx) error {
store := metadata.NewContainerStore(tx)
var err error
container, err = store.Get(ctx, id)
return err
}); err != nil {
container, err := l.containers.Get(ctx, id)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
return &container, nil