Support target snapshot references on prepare

Allows backend snapshotters to bring existing snapshots into
a namespace without requiring clients to fully create the snapshots
when the target reference is known. Backend snapshotters must
explicitly implement this functionality; it is equivalent
to sharing across namespaces, and it is up to the backend to
use the label when it is given or ignore it.

This enables remote snapshot functionality for a backend to
query for a target snapshot before a client has performed
any work to create that snapshot.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2019-10-09 17:23:46 -07:00
parent d1261b5087
commit 526c0db693
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
3 changed files with 566 additions and 49 deletions

View File

@ -48,9 +48,31 @@ import (
bolt "go.etcd.io/bbolt"
)
func testDB(t *testing.T) (context.Context, *DB, func()) {
// testOptions collects the optional settings applied by testDB.
type testOptions struct {
// extraSnapshots maps a snapshotter name to its constructor. testDB calls
// each constructor with a directory under its temporary root and registers
// the result under that name next to the default "native" snapshotter.
extraSnapshots map[string]func(string) (snapshots.Snapshotter, error)
}
// testOpt mutates a testOptions value; testDB applies every option it is given.
type testOpt func(*testOptions)
// withSnapshotter returns a testOpt that records fn as the constructor for an
// additional snapshotter registered under name. The constructor map is
// allocated lazily on first use; a later option with the same name replaces
// the earlier constructor.
func withSnapshotter(name string, fn func(string) (snapshots.Snapshotter, error)) testOpt {
	register := func(opts *testOptions) {
		if opts.extraSnapshots == nil {
			opts.extraSnapshots = make(map[string]func(string) (snapshots.Snapshotter, error))
		}
		opts.extraSnapshots[name] = fn
	}
	return register
}
func testDB(t *testing.T, opt ...testOpt) (context.Context, *DB, func()) {
ctx, cancel := context.WithCancel(context.Background())
ctx = namespaces.WithNamespace(ctx, "testing")
ctx = logtest.WithT(ctx, t)
var topts testOptions
for _, o := range opt {
o(&topts)
}
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
if err != nil {
@ -62,6 +84,18 @@ func testDB(t *testing.T) (context.Context, *DB, func()) {
t.Fatal(err)
}
snapshotters := map[string]snapshots.Snapshotter{
"native": snapshotter,
}
for name, fn := range topts.extraSnapshots {
snapshotter, err := fn(filepath.Join(dirname, name))
if err != nil {
t.Fatal(err)
}
snapshotters[name] = snapshotter
}
cs, err := local.NewStore(filepath.Join(dirname, "content"))
if err != nil {
t.Fatal(err)
@ -72,7 +106,7 @@ func testDB(t *testing.T) (context.Context, *DB, func()) {
t.Fatal(err)
}
db := NewDB(bdb, cs, map[string]snapshots.Snapshotter{"native": snapshotter})
db := NewDB(bdb, cs, snapshotters)
if err := db.Init(ctx); err != nil {
t.Fatal(err)
}

View File

@ -38,6 +38,7 @@ import (
const (
inheritedLabelsPrefix = "containerd.io/snapshot/"
labelSnapshotRef = "containerd.io/snapshot.ref"
)
type snapshotter struct {
@ -159,6 +160,7 @@ func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpath
local = snapshots.Info{
Name: info.Name,
}
updated bool
)
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
@ -220,17 +222,19 @@ func (s *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpath
Labels: filterInheritedLabels(local.Labels),
}
if _, err := s.Snapshotter.Update(ctx, inner, fieldpaths...); err != nil {
// NOTE: Perform this inside the transaction to reduce the
// chances of out of sync data. The backend snapshotters
// should perform the Update as fast as possible.
if info, err = s.Snapshotter.Update(ctx, inner, fieldpaths...); err != nil {
return err
}
updated = true
return nil
}); err != nil {
return snapshots.Info{}, err
if updated {
log.G(ctx).WithField("snapshotter", s.name).WithField("key", local.Name).WithError(err).Error("transaction failed after updating snapshot backend")
}
info, err = s.Snapshotter.Stat(ctx, bkey)
if err != nil {
return snapshots.Info{}, err
}
@ -297,31 +301,158 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return nil, err
}
var m []mount.Mount
var (
target = base.Labels[labelSnapshotRef]
bparent string
bkey string
bopts = []snapshots.Opt{
snapshots.WithLabels(filterInheritedLabels(base.Labels)),
}
)
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt, err := createSnapshotterBucket(tx, ns, s.name)
if err != nil {
return err
}
bbkt, err := bkt.CreateBucket([]byte(key))
if err != nil {
if err == bolt.ErrBucketExists {
err = errors.Wrapf(errdefs.ErrAlreadyExists, "snapshot %q", key)
// Check if target exists, if so, return already exists
if target != "" {
if tbkt := bkt.Bucket([]byte(target)); tbkt != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "target snapshot %q", target)
}
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
var bparent string
if bbkt := bkt.Bucket([]byte(key)); bbkt != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "snapshot %q", key)
}
if parent != "" {
pbkt := bkt.Bucket([]byte(parent))
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent)
}
bparent = string(pbkt.Get(bucketKeyName))
}
sid, err := bkt.NextSequence()
if err != nil {
return err
}
bkey = createKey(sid, ns, key)
return err
}); err != nil {
return nil, err
}
var (
m []mount.Mount
created string
rerr error
)
if readonly {
m, err = s.Snapshotter.View(ctx, bkey, bparent, bopts...)
} else {
m, err = s.Snapshotter.Prepare(ctx, bkey, bparent, bopts...)
}
// An already exists error should indicate the backend found a snapshot
// matching a provided target reference.
if errdefs.IsAlreadyExists(err) {
if target != "" {
var tinfo *snapshots.Info
filter := fmt.Sprintf(`labels."containerd.io/snapshot.ref"==%s,parent==%q`, target, bparent)
if err := s.Snapshotter.Walk(ctx, func(ctx context.Context, i snapshots.Info) error {
if tinfo == nil && i.Kind == snapshots.KindCommitted {
if i.Labels["containerd.io/snapshot.ref"] != target {
// Walk did not respect filter
return nil
}
if i.Parent != bparent {
// Walk did not respect filter
return nil
}
tinfo = &i
}
return nil
}, filter); err != nil {
return nil, errors.Wrap(err, "failed walking backend snapshots")
}
if tinfo == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "target snapshot %q in backend", target)
}
key = target
bkey = tinfo.Name
bparent = tinfo.Parent
base.Created = tinfo.Created
base.Updated = tinfo.Updated
if base.Labels == nil {
base.Labels = tinfo.Labels
} else {
for k, v := range tinfo.Labels {
if _, ok := base.Labels[k]; !ok {
base.Labels[k] = v
}
}
}
// Propagate this error after the final update
rerr = errors.Wrapf(errdefs.ErrAlreadyExists, "target snapshot %q from snapshotter", target)
} else {
// This condition is unexpected as the key provided is expected
// to be new and unique, return as unknown response from backend
// to avoid confusing callers handling already exists.
return nil, errors.Wrapf(errdefs.ErrUnknown, "unexpected error from snapshotter: %v", err)
}
} else if err != nil {
return nil, err
} else {
ts := time.Now().UTC()
base.Created = ts
base.Updated = ts
created = bkey
}
if txerr := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "can not find snapshotter %q", s.name)
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
bbkt, err := bkt.CreateBucket([]byte(key))
if err != nil {
if err != bolt.ErrBucketExists {
return err
}
if rerr == nil {
rerr = errors.Wrapf(errdefs.ErrAlreadyExists, "snapshot %q", key)
}
return nil
}
if parent != "" {
pbkt := bkt.Bucket([]byte(parent))
if pbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent)
}
// Ensure the backend's parent matches the metadata store's parent
// If it is mismatched, then a target was provided for a snapshotter
// which has a different parent than requested.
// NOTE: The backend snapshotter is responsible for enforcing the
// uniqueness of the reference relationships, the metadata store
// can only error out to prevent inconsistent data.
if bparent != string(pbkt.Get(bucketKeyName)) {
return errors.Wrapf(errdefs.ErrInvalidArgument, "mismatched parent %s from target %s", parent, target)
}
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
if err != nil {
@ -336,36 +467,28 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
}
}
sid, err := bkt.NextSequence()
if err != nil {
return err
}
bkey := createKey(sid, ns, key)
if err := bbkt.Put(bucketKeyName, []byte(bkey)); err != nil {
return err
}
ts := time.Now().UTC()
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
if err := boltutil.WriteTimestamps(bbkt, base.Created, base.Updated); err != nil {
return err
}
if err := boltutil.WriteLabels(bbkt, base.Labels); err != nil {
return err
}
inheritedOpt := snapshots.WithLabels(filterInheritedLabels(base.Labels))
return bbkt.Put(bucketKeyName, []byte(bkey))
}); txerr != nil {
rerr = txerr
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {
m, err = s.Snapshotter.View(ctx, bkey, bparent, inheritedOpt)
} else {
m, err = s.Snapshotter.Prepare(ctx, bkey, bparent, inheritedOpt)
if rerr != nil {
// If the created reference is not stored, attempt clean up
if created != "" {
if err := s.Snapshotter.Remove(ctx, created); err != nil {
log.G(ctx).WithField("snapshotter", s.name).WithField("key", created).WithError(err).Error("failed to cleanup unreferenced snapshot")
}
return err
}); err != nil {
return nil, err
}
return nil, rerr
}
return m, nil
}
@ -389,7 +512,8 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
return err
}
return update(ctx, s.db, func(tx *bolt.Tx) error {
var bname string
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
bkt := getSnapshotterBucket(tx, ns, s.name)
if bkt == nil {
return errors.Wrapf(errdefs.ErrNotFound,
@ -464,10 +588,38 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
inheritedOpt := snapshots.WithLabels(filterInheritedLabels(base.Labels))
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
return s.Snapshotter.Commit(ctx, nameKey, bkey, inheritedOpt)
})
// NOTE: Backend snapshotters should commit fast and reliably to
// prevent metadata store locking and minimizing rollbacks.
// This operation should be done in the transaction to minimize the
// risk of the committed keys becoming out of sync. If this operation
// succeeds and the overall transaction fails then the risk of out of
// sync data is higher and may require manual cleanup.
if err := s.Snapshotter.Commit(ctx, nameKey, bkey, inheritedOpt); err != nil {
if errdefs.IsNotFound(err) {
log.G(ctx).WithField("snapshotter", s.name).WithField("key", key).WithError(err).Error("uncommittable snapshot: missing in backend, snapshot should be removed")
}
// NOTE: Consider handling already exists here from the backend. Currently
// already exists from the backend may be confusing to the client since it
// may require the client to re-attempt from prepare. However, if handling
// here it is not clear what happened with the existing backend key and
// whether the already prepared snapshot would still be used or must be
// discarded. It is best that all implementations of the snapshotter
// interface behave the same, in which case the backend should handle the
// mapping of duplicates and not error.
return err
}
bname = nameKey
return nil
}); err != nil {
if bname != "" {
log.G(ctx).WithField("snapshotter", s.name).WithField("key", key).WithField("bname", bname).WithError(err).Error("uncommittable snapshot: transaction failed after commit, snapshot should be removed")
}
return err
}
return nil
}
@ -788,8 +940,9 @@ func (s *snapshotter) Close() error {
return s.Snapshotter.Close()
}
// filterInheritedLabels filters the provided labels by removing any key which doesn't have
// a prefix of "containerd.io/snapshot/".
// filterInheritedLabels filters the provided labels by removing any key which
// isn't a snapshot label. Snapshot labels have a prefix of "containerd.io/snapshot/"
// or are the "containerd.io/snapshot.ref" label.
func filterInheritedLabels(labels map[string]string) map[string]string {
if labels == nil {
return nil
@ -797,7 +950,7 @@ func filterInheritedLabels(labels map[string]string) map[string]string {
filtered := make(map[string]string)
for k, v := range labels {
if strings.HasPrefix(k, inheritedLabelsPrefix) {
if k == labelSnapshotRef || strings.HasPrefix(k, inheritedLabelsPrefix) {
filtered[k] = v
}
}

View File

@ -22,12 +22,19 @@ import (
"path/filepath"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/testutil"
"github.com/containerd/containerd/snapshots"
"github.com/containerd/containerd/snapshots/native"
"github.com/containerd/containerd/snapshots/testsuite"
"github.com/pkg/errors"
bolt "go.etcd.io/bbolt"
)
@ -65,6 +72,129 @@ func TestMetadata(t *testing.T) {
testsuite.SnapshotterSuite(t, "Metadata", newTestSnapshotter)
}
// TestSnapshotterWithRef exercises the "containerd.io/snapshot.ref" label
// through the metadata snapshotter, backed by the in-memory tmpSnapshotter.
//
// It checks that preparing a snapshot whose target reference already exists
// in the backend (with a matching parent) returns an already exists error and
// makes the target visible in the calling namespace, and that a parent which
// is missing from the calling namespace yields a not found error.
func TestSnapshotterWithRef(t *testing.T) {
	ctx, db, done := testDB(t, withSnapshotter("tmp", func(string) (snapshots.Snapshotter, error) {
		return NewTmpSnapshotter(), nil
	}))
	defer done()

	sn := db.Snapshotter("tmp")

	test1opt := snapshots.WithLabels(
		map[string]string{
			labelSnapshotRef: "test1",
		},
	)

	// Create and commit test1 in the first namespace so the backend knows
	// about the "test1" reference.
	_, err := sn.Prepare(ctx, "test1-tmp", "", test1opt)
	if err != nil {
		t.Fatal(err)
	}

	err = sn.Commit(ctx, "test1", "test1-tmp", test1opt)
	if err != nil {
		t.Fatal(err)
	}

	ctx2 := namespaces.WithNamespace(ctx, "testing2")

	// The backend already has the target, so prepare reports already exists
	_, err = sn.Prepare(ctx2, "test1-tmp", "", test1opt)
	if err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}

	// test1 should now be in the namespace
	_, err = sn.Stat(ctx2, "test1")
	if err != nil {
		t.Fatal(err)
	}

	test2opt := snapshots.WithLabels(
		map[string]string{
			labelSnapshotRef: "test2",
		},
	)

	_, err = sn.Prepare(ctx2, "test2-tmp", "test1", test2opt)
	if err != nil {
		t.Fatal(err)
	}

	// In original namespace, but not committed
	_, err = sn.Prepare(ctx, "test2-tmp", "test1", test2opt)
	if err != nil {
		t.Fatal(err)
	}

	err = sn.Commit(ctx2, "test2", "test2-tmp", test2opt)
	if err != nil {
		t.Fatal(err)
	}

	// See note in Commit function for why
	// this does not return ErrAlreadyExists
	err = sn.Commit(ctx, "test2", "test2-tmp", test2opt)
	if err != nil {
		t.Fatal(err)
	}

	// This should error out, already exists in namespace
	// despite mismatched parent
	_, err = sn.Prepare(ctx2, "test2-tmp-again", "", test2opt)
	if err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}

	// In original namespace, but already exists
	_, err = sn.Prepare(ctx, "test2-tmp-again", "test1", test2opt)
	if err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}

	// Now try a third namespace
	ctx3 := namespaces.WithNamespace(ctx, "testing3")

	// This should succeed: test2 is not in this namespace and the backend has
	// no test2 target with an empty parent, so a new snapshot is created.
	_, err = sn.Prepare(ctx3, "test2-tmp", "", test2opt)
	if err != nil {
		t.Fatal(err)
	}

	// Remove, not going to use yet
	err = sn.Remove(ctx3, "test2-tmp")
	if err != nil {
		t.Fatal(err)
	}

	// The parent test1 has not been brought into this namespace yet
	_, err = sn.Prepare(ctx3, "test2-tmp", "test1", test2opt)
	if err == nil {
		t.Fatal("expected not found error")
	} else if !errdefs.IsNotFound(err) {
		t.Fatal(err)
	}

	// Bring test1 into the third namespace through its target reference
	_, err = sn.Prepare(ctx3, "test1-tmp", "", test1opt)
	if err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}

	// With the parent present, the test2 target can now be matched
	_, err = sn.Prepare(ctx3, "test2-tmp", "test1", test2opt)
	if err == nil {
		t.Fatal("expected already exists error")
	} else if !errdefs.IsAlreadyExists(err) {
		t.Fatal(err)
	}
}
func TestFilterInheritedLabels(t *testing.T) {
tests := []struct {
labels map[string]string
@ -102,3 +232,203 @@ func TestFilterInheritedLabels(t *testing.T) {
}
}
}
// tmpSnapshotter is an in-memory snapshots.Snapshotter used by tests. It keeps
// only snapshot metadata and honors the snapshot reference label on create.
type tmpSnapshotter struct {
// l is held by every method that reads or writes the maps below.
l sync.Mutex
// snapshots holds the info of each snapshot, indexed by its key or name.
snapshots map[string]snapshots.Info
// targets maps a snapshot reference label value to the names of the
// committed snapshots which carry that reference.
targets map[string][]string
}
// NewTmpSnapshotter returns an empty in-memory snapshotter with both of its
// internal maps allocated.
func NewTmpSnapshotter() snapshots.Snapshotter {
	s := new(tmpSnapshotter)
	s.snapshots = make(map[string]snapshots.Info)
	s.targets = make(map[string][]string)
	return s
}
// Stat returns the stored info for key, or errdefs.ErrNotFound when no
// snapshot is stored under that key.
func (s *tmpSnapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
	s.l.Lock()
	defer s.l.Unlock()

	if info, found := s.snapshots[key]; found {
		return info, nil
	}
	return snapshots.Info{}, errdefs.ErrNotFound
}
// Update merges info.Labels into the labels of the snapshot stored under
// info.Name and returns the resulting info. Existing labels which are not
// named in info.Labels are kept, and fieldpaths is not consulted.
//
// It returns errdefs.ErrNotFound when no snapshot is stored under info.Name.
func (s *tmpSnapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
	s.l.Lock()
	defer s.l.Unlock()

	i, ok := s.snapshots[info.Name]
	if !ok {
		return snapshots.Info{}, errdefs.ErrNotFound
	}

	// A snapshot created without labels is stored with a nil map, and
	// assigning into a nil map panics, so allocate before merging.
	if i.Labels == nil && len(info.Labels) > 0 {
		i.Labels = make(map[string]string, len(info.Labels))
	}
	for k, v := range info.Labels {
		i.Labels[k] = v
	}

	s.snapshots[i.Name] = i

	return i, nil
}
// Usage reports a zero-valued usage for any known key and
// errdefs.ErrNotFound for an unknown one.
func (s *tmpSnapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
	s.l.Lock()
	defer s.l.Unlock()

	if _, known := s.snapshots[key]; known {
		return snapshots.Usage{}, nil
	}
	return snapshots.Usage{}, errdefs.ErrNotFound
}
// Mounts returns an empty, non-nil mount list for any known key and
// errdefs.ErrNotFound for an unknown one.
func (s *tmpSnapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
	s.l.Lock()
	defer s.l.Unlock()

	if _, known := s.snapshots[key]; known {
		return []mount.Mount{}, nil
	}
	return nil, errdefs.ErrNotFound
}
// Prepare records a new snapshot of kind snapshots.KindActive under key by
// delegating to create.
func (s *tmpSnapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
	const kind = snapshots.KindActive
	mounts, err := s.create(ctx, key, parent, kind, opts...)
	return mounts, err
}
// View records a new snapshot of kind snapshots.KindView under key by
// delegating to create.
func (s *tmpSnapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
	const kind = snapshots.KindView
	mounts, err := s.create(ctx, key, parent, kind, opts...)
	return mounts, err
}
// create records a new snapshot of the given kind under key.
//
// When the options carry a snapshot reference label and a committed snapshot
// registered for that reference has the same parent, no snapshot is recorded
// and an already exists error is returned. A non-empty parent must already be
// stored, otherwise errdefs.ErrNotFound is returned. On success the mount
// list is empty but non-nil.
func (s *tmpSnapshotter) create(ctx context.Context, key, parent string, kind snapshots.Kind, opts ...snapshots.Opt) ([]mount.Mount, error) {
	s.l.Lock()
	defer s.l.Unlock()

	info := snapshots.Info{}
	for _, apply := range opts {
		if err := apply(&info); err != nil {
			return nil, err
		}
	}
	// Name and kind always come from the arguments, overriding any option.
	info.Name, info.Kind = key, kind

	if ref := info.Labels[labelSnapshotRef]; ref != "" {
		for _, committed := range s.targets[ref] {
			if s.snapshots[committed].Parent == parent {
				return nil, errors.Wrap(errdefs.ErrAlreadyExists, "found target")
			}
		}
	}

	if parent != "" {
		if _, ok := s.snapshots[parent]; !ok {
			return nil, errdefs.ErrNotFound
		}
		info.Parent = parent
	}

	now := time.Now().UTC()
	info.Created, info.Updated = now, now
	s.snapshots[info.Name] = info

	return []mount.Mount{}, nil
}
// Commit replaces the snapshot stored under key with a committed snapshot
// stored under name, keeping the parent of the original. When the options
// carry a snapshot reference label, name is registered as a target for that
// reference.
//
// It returns an already exists error when name is taken, errdefs.ErrNotFound
// when key is unknown and errdefs.ErrInvalidArgument when key is already
// committed.
func (s *tmpSnapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
	s.l.Lock()
	defer s.l.Unlock()

	committed := snapshots.Info{}
	for _, apply := range opts {
		if err := apply(&committed); err != nil {
			return err
		}
	}
	committed.Name, committed.Kind = name, snapshots.KindCommitted

	if _, taken := s.snapshots[name]; taken {
		return errors.Wrap(errdefs.ErrAlreadyExists, "found name")
	}

	active, found := s.snapshots[key]
	switch {
	case !found:
		return errdefs.ErrNotFound
	case active.Kind == snapshots.KindCommitted:
		return errdefs.ErrInvalidArgument
	}
	committed.Parent = active.Parent

	now := time.Now().UTC()
	committed.Created, committed.Updated = now, now

	s.snapshots[name] = committed
	delete(s.snapshots, key)

	if ref := committed.Labels[labelSnapshotRef]; ref != "" {
		s.targets[ref] = append(s.targets[ref], name)
	}

	return nil
}
// Remove deletes the snapshot stored under key and drops its name from every
// target list which contains it. It returns errdefs.ErrNotFound when key is
// unknown.
func (s *tmpSnapshotter) Remove(ctx context.Context, key string) error {
	s.l.Lock()
	defer s.l.Unlock()

	removed, ok := s.snapshots[key]
	if !ok {
		return errdefs.ErrNotFound
	}
	delete(s.snapshots, key)

	// scan and remove the first instance of the name from each target list
	for ref, names := range s.targets {
		idx := -1
		for i, n := range names {
			if n == removed.Name {
				idx = i
				break
			}
		}

		switch {
		case idx < 0:
			// name is not registered for this reference
		case len(names) == 1:
			delete(s.targets, ref)
		default:
			// shift the tail down in place and shorten the list by one
			s.targets[ref] = append(names[:idx], names[idx+1:]...)
		}
	}

	return nil
}
// Walk calls fn for every stored snapshot which matches the filters in fs,
// in map iteration order. It stops at, and returns, the first error from fn.
// A filter which fails to parse is returned as an error before any call.
func (s *tmpSnapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
	s.l.Lock()
	defer s.l.Unlock()

	matcher, err := filters.ParseAll(fs...)
	if err != nil {
		return err
	}

	for _, info := range s.snapshots {
		if !matcher.Match(adaptSnapshot(info)) {
			continue
		}
		if err := fn(ctx, info); err != nil {
			return err
		}
	}
	return nil
}
// Close does nothing and always returns nil.
func (s *tmpSnapshotter) Close() error {
return nil
}