snapshots: refactor metastore transaction
Signed-off-by: Junyu Liu <ljyngup@gmail.com>
This commit is contained in:
@@ -61,13 +61,11 @@ func NewSnapshotter(root string) (snapshots.Snapshotter, error) {
|
||||
//
|
||||
// Should be used for parent resolution, existence checks and to discern
|
||||
// the kind of snapshot.
|
||||
func (o *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, false)
|
||||
if err != nil {
|
||||
return snapshots.Info{}, err
|
||||
}
|
||||
defer t.Rollback()
|
||||
_, info, _, err := storage.GetInfo(ctx, key)
|
||||
func (o *snapshotter) Stat(ctx context.Context, key string) (info snapshots.Info, err error) {
|
||||
err = o.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||
_, info, _, err = storage.GetInfo(ctx, key)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return snapshots.Info{}, err
|
||||
}
|
||||
@@ -75,33 +73,28 @@ func (o *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (o *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, true)
|
||||
func (o *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (_ snapshots.Info, err error) {
|
||||
err = o.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return snapshots.Info{}, err
|
||||
}
|
||||
|
||||
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
|
||||
if err != nil {
|
||||
t.Rollback()
|
||||
return snapshots.Info{}, err
|
||||
}
|
||||
|
||||
if err := t.Commit(); err != nil {
|
||||
return snapshots.Info{}, err
|
||||
}
|
||||
|
||||
return info, nil
|
||||
}
|
||||
|
||||
func (o *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, false)
|
||||
if err != nil {
|
||||
return snapshots.Usage{}, err
|
||||
}
|
||||
defer t.Rollback()
|
||||
func (o *snapshotter) Usage(ctx context.Context, key string) (usage snapshots.Usage, err error) {
|
||||
var (
|
||||
id string
|
||||
info snapshots.Info
|
||||
)
|
||||
|
||||
id, info, usage, err := storage.GetInfo(ctx, key)
|
||||
err = o.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||
id, info, usage, err = storage.GetInfo(ctx, key)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return snapshots.Usage{}, err
|
||||
}
|
||||
@@ -129,89 +122,77 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
|
||||
// called on an read-write or readonly transaction.
|
||||
//
|
||||
// This can be used to recover mounts after calling View or Prepare.
|
||||
func (o *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, false)
|
||||
func (o *snapshotter) Mounts(ctx context.Context, key string) (_ []mount.Mount, err error) {
|
||||
var s storage.Snapshot
|
||||
err = o.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||
s, err = storage.GetSnapshot(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get snapshot mount: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s, err := storage.GetSnapshot(ctx, key)
|
||||
t.Rollback()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get snapshot mount: %w", err)
|
||||
}
|
||||
|
||||
return o.mounts(s), nil
|
||||
}
|
||||
|
||||
func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
id, _, _, err := storage.GetInfo(ctx, key)
|
||||
if err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
return o.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||
id, _, _, err := storage.GetInfo(ctx, key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
usage, err := fs.DiskUsage(ctx, o.getSnapshotDir(id))
|
||||
if err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
usage, err := fs.DiskUsage(ctx, o.getSnapshotDir(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := storage.CommitActive(ctx, key, name, snapshots.Usage(usage), opts...); err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
if _, err = storage.CommitActive(ctx, key, name, snapshots.Usage(usage), opts...); err != nil {
|
||||
return fmt.Errorf("failed to commit snapshot: %w", err)
|
||||
}
|
||||
return fmt.Errorf("failed to commit snapshot: %w", err)
|
||||
}
|
||||
return t.Commit()
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Remove abandons the transaction identified by key. All resources
|
||||
// associated with the key will be removed.
|
||||
func (o *snapshotter) Remove(ctx context.Context, key string) (err error) {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil && t != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
var (
|
||||
renamed, path string
|
||||
restore bool
|
||||
)
|
||||
|
||||
err = o.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||
id, _, err := storage.Remove(ctx, key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove: %w", err)
|
||||
}
|
||||
|
||||
path = o.getSnapshotDir(id)
|
||||
renamed = filepath.Join(o.root, "snapshots", "rm-"+id)
|
||||
if err = os.Rename(path, renamed); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to rename: %w", err)
|
||||
}
|
||||
renamed = ""
|
||||
}
|
||||
}()
|
||||
|
||||
id, _, err := storage.Remove(ctx, key)
|
||||
restore = true
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to remove: %w", err)
|
||||
}
|
||||
|
||||
path := o.getSnapshotDir(id)
|
||||
renamed := filepath.Join(o.root, "snapshots", "rm-"+id)
|
||||
if err := os.Rename(path, renamed); err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to rename: %w", err)
|
||||
}
|
||||
renamed = ""
|
||||
}
|
||||
|
||||
err = t.Commit()
|
||||
t = nil
|
||||
if err != nil {
|
||||
if renamed != "" {
|
||||
if renamed != "" && restore {
|
||||
if err1 := os.Rename(renamed, path); err1 != nil {
|
||||
// May cause inconsistent data on disk
|
||||
log.G(ctx).WithError(err1).WithField("path", renamed).Error("failed to rename after failed commit")
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("failed to commit: %w", err)
|
||||
return err
|
||||
}
|
||||
if renamed != "" {
|
||||
if err := os.RemoveAll(renamed); err != nil {
|
||||
@@ -225,17 +206,15 @@ func (o *snapshotter) Remove(ctx context.Context, key string) (err error) {
|
||||
|
||||
// Walk the committed snapshots.
|
||||
func (o *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer t.Rollback()
|
||||
return storage.WalkInfo(ctx, fn, fs...)
|
||||
return o.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||
return storage.WalkInfo(ctx, fn, fs...)
|
||||
})
|
||||
}
|
||||
|
||||
func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) {
|
||||
var (
|
||||
path, td string
|
||||
s storage.Snapshot
|
||||
)
|
||||
|
||||
if kind == snapshots.KindActive || parent == "" {
|
||||
@@ -262,52 +241,41 @@ func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, k
|
||||
}()
|
||||
}
|
||||
|
||||
ctx, t, err := o.ms.TransactionContext(ctx, true)
|
||||
err = o.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||
s, err = storage.CreateSnapshot(ctx, kind, key, parent, opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create snapshot: %w", err)
|
||||
}
|
||||
|
||||
if td != "" {
|
||||
if len(s.ParentIDs) > 0 {
|
||||
parent := o.getSnapshotDir(s.ParentIDs[0])
|
||||
xattrErrorHandler := func(dst, src, xattrKey string, copyErr error) error {
|
||||
// security.* xattr cannot be copied in most cases (moby/buildkit#1189)
|
||||
log.G(ctx).WithError(copyErr).Debugf("failed to copy xattr %q", xattrKey)
|
||||
return nil
|
||||
}
|
||||
copyDirOpts := []fs.CopyDirOpt{
|
||||
fs.WithXAttrErrorHandler(xattrErrorHandler),
|
||||
}
|
||||
if err = fs.CopyDir(td, parent, copyDirOpts...); err != nil {
|
||||
return fmt.Errorf("copying of parent failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
path = o.getSnapshotDir(s.ID)
|
||||
if err = os.Rename(td, path); err != nil {
|
||||
return fmt.Errorf("failed to rename: %w", err)
|
||||
}
|
||||
td = ""
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s, err := storage.CreateSnapshot(ctx, kind, key, parent, opts...)
|
||||
if err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to create snapshot: %w", err)
|
||||
}
|
||||
|
||||
if td != "" {
|
||||
if len(s.ParentIDs) > 0 {
|
||||
parent := o.getSnapshotDir(s.ParentIDs[0])
|
||||
xattrErrorHandler := func(dst, src, xattrKey string, copyErr error) error {
|
||||
// security.* xattr cannot be copied in most cases (moby/buildkit#1189)
|
||||
log.G(ctx).WithError(copyErr).Debugf("failed to copy xattr %q", xattrKey)
|
||||
return nil
|
||||
}
|
||||
copyDirOpts := []fs.CopyDirOpt{
|
||||
fs.WithXAttrErrorHandler(xattrErrorHandler),
|
||||
}
|
||||
if err := fs.CopyDir(td, parent, copyDirOpts...); err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
}
|
||||
return nil, fmt.Errorf("copying of parent failed: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
path = o.getSnapshotDir(s.ID)
|
||||
if err := os.Rename(td, path); err != nil {
|
||||
if rerr := t.Rollback(); rerr != nil {
|
||||
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to rename: %w", err)
|
||||
}
|
||||
td = ""
|
||||
}
|
||||
|
||||
if err := t.Commit(); err != nil {
|
||||
return nil, fmt.Errorf("commit failed: %w", err)
|
||||
}
|
||||
|
||||
return o.mounts(s), nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user