Update metadata snapshotter to lease on exists
Currently the metadata snapshotter is not consistently adding keys to a lease when already exists is returned. When a lease is provided, any already exists errors should add the relevant key to the lease. It is not expected that clients must explicitly lease a key after calling Prepare/Commit. Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
parent
114ef75833
commit
c7fb8a9255
@ -323,6 +323,7 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
|||||||
bopts = []snapshots.Opt{
|
bopts = []snapshots.Opt{
|
||||||
snapshots.WithLabels(snapshots.FilterInheritedLabels(base.Labels)),
|
snapshots.WithLabels(snapshots.FilterInheritedLabels(base.Labels)),
|
||||||
}
|
}
|
||||||
|
rerr error
|
||||||
)
|
)
|
||||||
|
|
||||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||||
@ -334,12 +335,20 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
|||||||
// Check if target exists, if so, return already exists
|
// Check if target exists, if so, return already exists
|
||||||
if target != "" {
|
if target != "" {
|
||||||
if tbkt := bkt.Bucket([]byte(target)); tbkt != nil {
|
if tbkt := bkt.Bucket([]byte(target)); tbkt != nil {
|
||||||
return fmt.Errorf("target snapshot %q: %w", target, errdefs.ErrAlreadyExists)
|
rerr = fmt.Errorf("target snapshot %q: %w", target, errdefs.ErrAlreadyExists)
|
||||||
|
if err := addSnapshotLease(ctx, tx, s.name, target); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if bbkt := bkt.Bucket([]byte(key)); bbkt != nil {
|
if bbkt := bkt.Bucket([]byte(key)); bbkt != nil {
|
||||||
return fmt.Errorf("snapshot %q: %w", key, errdefs.ErrAlreadyExists)
|
rerr = fmt.Errorf("snapshot %q: %w", key, errdefs.ErrAlreadyExists)
|
||||||
|
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if parent != "" {
|
if parent != "" {
|
||||||
@ -360,11 +369,14 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
// Already exists and lease successfully added in transaction
|
||||||
|
if rerr != nil {
|
||||||
|
return nil, rerr
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
m []mount.Mount
|
m []mount.Mount
|
||||||
created string
|
created string
|
||||||
rerr error
|
|
||||||
)
|
)
|
||||||
if readonly {
|
if readonly {
|
||||||
m, err = s.Snapshotter.View(ctx, bkey, bparent, bopts...)
|
m, err = s.Snapshotter.View(ctx, bkey, bparent, bopts...)
|
||||||
@ -527,7 +539,10 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var bname string
|
var (
|
||||||
|
bname string
|
||||||
|
rerr error
|
||||||
|
)
|
||||||
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
if err := update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||||
bkt := getSnapshotterBucket(tx, ns, s.name)
|
bkt := getSnapshotterBucket(tx, ns, s.name)
|
||||||
if bkt == nil {
|
if bkt == nil {
|
||||||
@ -535,16 +550,17 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
s.name, errdefs.ErrNotFound)
|
s.name, errdefs.ErrNotFound)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := addSnapshotLease(ctx, tx, s.name, name); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
bbkt, err := bkt.CreateBucket([]byte(name))
|
bbkt, err := bkt.CreateBucket([]byte(name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err == bolt.ErrBucketExists {
|
if err == bolt.ErrBucketExists {
|
||||||
err = fmt.Errorf("snapshot %q: %w", name, errdefs.ErrAlreadyExists)
|
rerr = fmt.Errorf("snapshot %q: %w", name, errdefs.ErrAlreadyExists)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := addSnapshotLease(ctx, tx, s.name, name); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
obkt := bkt.Bucket([]byte(key))
|
obkt := bkt.Bucket([]byte(key))
|
||||||
if obkt == nil {
|
if obkt == nil {
|
||||||
@ -634,17 +650,19 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if publisher := s.db.Publisher(ctx); publisher != nil {
|
if rerr == nil {
|
||||||
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
|
if publisher := s.db.Publisher(ctx); publisher != nil {
|
||||||
Key: key,
|
if err := publisher.Publish(ctx, "/snapshot/commit", &eventstypes.SnapshotCommit{
|
||||||
Name: name,
|
Key: key,
|
||||||
Snapshotter: s.name,
|
Name: name,
|
||||||
}); err != nil {
|
Snapshotter: s.name,
|
||||||
return err
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return rerr
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user