Refactor metastore transaction

Signed-off-by: Junyu Liu <ljyngup@gmail.com>
This commit is contained in:
iyear 2022-10-14 20:00:59 +08:00
parent 84b81a89ff
commit 1d0619bc0c

View File

@ -102,13 +102,12 @@ func NewSnapshotter(root string) (snapshots.Snapshotter, error) {
//
// Should be used for parent resolution, existence checks and to discern
// the kind of snapshot.
func (b *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
ctx, t, err := b.ms.TransactionContext(ctx, false)
if err != nil {
return snapshots.Info{}, err
}
defer t.Rollback()
_, info, _, err := storage.GetInfo(ctx, key)
func (b *snapshotter) Stat(ctx context.Context, key string) (info snapshots.Info, err error) {
err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
_, info, _, err = storage.GetInfo(ctx, key)
return err
})
if err != nil {
return snapshots.Info{}, err
}
@ -116,19 +115,13 @@ func (b *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err
return info, nil
}
func (b *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
ctx, t, err := b.ms.TransactionContext(ctx, true)
if err != nil {
return snapshots.Info{}, err
}
func (b *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (_ snapshots.Info, err error) {
err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
return err
})
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
if err != nil {
t.Rollback()
return snapshots.Info{}, err
}
if err := t.Commit(); err != nil {
return snapshots.Info{}, err
}
@ -140,18 +133,20 @@ func (b *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e
return b.usage(ctx, key)
}
func (b *snapshotter) usage(ctx context.Context, key string) (snapshots.Usage, error) {
ctx, t, err := b.ms.TransactionContext(ctx, false)
if err != nil {
return snapshots.Usage{}, err
}
id, info, usage, err := storage.GetInfo(ctx, key)
var parentID string
if err == nil && info.Kind == snapshots.KindActive && info.Parent != "" {
parentID, _, _, err = storage.GetInfo(ctx, info.Parent)
func (b *snapshotter) usage(ctx context.Context, key string) (usage snapshots.Usage, err error) {
var (
id, parentID string
info snapshots.Info
)
}
t.Rollback() // transaction no longer needed at this point.
err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
id, info, usage, err = storage.GetInfo(ctx, key)
if err == nil && info.Kind == snapshots.KindActive && info.Parent != "" {
parentID, _, _, err = storage.GetInfo(ctx, info.Parent)
}
return err
})
if err != nil {
return snapshots.Usage{}, err
@ -177,13 +172,10 @@ func (b *snapshotter) usage(ctx context.Context, key string) (snapshots.Usage, e
}
// Walk the committed snapshots.
func (b *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
ctx, t, err := b.ms.TransactionContext(ctx, false)
if err != nil {
return err
}
defer t.Rollback()
return storage.WalkInfo(ctx, fn, fs...)
func (b *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) (err error) {
return b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
return storage.WalkInfo(ctx, fn, fs...)
})
}
func (b *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
@ -194,51 +186,39 @@ func (b *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
return b.makeSnapshot(ctx, snapshots.KindView, key, parent, opts)
}
func (b *snapshotter) makeSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) ([]mount.Mount, error) {
ctx, t, err := b.ms.TransactionContext(ctx, true)
if err != nil {
return nil, err
}
defer func() {
if err != nil && t != nil {
if rerr := t.Rollback(); rerr != nil {
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
}
func (b *snapshotter) makeSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) {
var (
target string
s storage.Snapshot
)
err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
s, err = storage.CreateSnapshot(ctx, kind, key, parent, opts...)
if err != nil {
return err
}
}()
s, err := storage.CreateSnapshot(ctx, kind, key, parent, opts...)
if err != nil {
return nil, err
}
target = filepath.Join(b.root, strings.ToLower(s.Kind.String()), s.ID)
target := filepath.Join(b.root, strings.ToLower(s.Kind.String()), s.ID)
if len(s.ParentIDs) == 0 {
// create new subvolume
// btrfs subvolume create /dir
if err = btrfs.SubvolCreate(target); err != nil {
return nil, err
if len(s.ParentIDs) == 0 {
// create new subvolume
// btrfs subvolume create /dir
return btrfs.SubvolCreate(target)
}
} else {
parentp := filepath.Join(b.root, "snapshots", s.ParentIDs[0])
var readonly bool
if kind == snapshots.KindView {
readonly = true
// btrfs subvolume snapshot /parent /subvol
readOnly := kind == snapshots.KindView
return btrfs.SubvolSnapshot(target, parentp, readOnly)
})
if err != nil {
if target != "" {
if derr := btrfs.SubvolDelete(target); derr != nil {
log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
}
}
// btrfs subvolume snapshot /parent /subvol
if err = btrfs.SubvolSnapshot(target, parentp, readonly); err != nil {
return nil, err
}
}
err = t.Commit()
t = nil
if err != nil {
if derr := btrfs.SubvolDelete(target); derr != nil {
log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
}
return nil, err
}
@ -272,47 +252,40 @@ func (b *snapshotter) mounts(dir string, s storage.Snapshot) ([]mount.Mount, err
}
func (b *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) (err error) {
usage, err := b.usage(ctx, key)
var usage snapshots.Usage
usage, err = b.usage(ctx, key)
if err != nil {
return fmt.Errorf("failed to compute usage: %w", err)
}
ctx, t, err := b.ms.TransactionContext(ctx, true)
var source, target string
err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
id, err := storage.CommitActive(ctx, key, name, usage, opts...) // TODO(stevvooe): Resolve a usage value for btrfs
if err != nil {
return fmt.Errorf("failed to commit: %w", err)
}
source = filepath.Join(b.root, "active", id)
target = filepath.Join(b.root, "snapshots", id)
return btrfs.SubvolSnapshot(target, source, true)
})
if err != nil {
return err
}
defer func() {
if err != nil && t != nil {
if rerr := t.Rollback(); rerr != nil {
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
if target != "" {
if derr := btrfs.SubvolDelete(target); derr != nil {
log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
}
}
}()
id, err := storage.CommitActive(ctx, key, name, usage, opts...) // TODO(stevvooe): Resolve a usage value for btrfs
if err != nil {
return fmt.Errorf("failed to commit: %w", err)
}
source := filepath.Join(b.root, "active", id)
target := filepath.Join(b.root, "snapshots", id)
if err := btrfs.SubvolSnapshot(target, source, true); err != nil {
return err
}
err = t.Commit()
t = nil
if err != nil {
if derr := btrfs.SubvolDelete(target); derr != nil {
log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
if source != "" {
if derr := btrfs.SubvolDelete(source); derr != nil {
// Log as warning, only needed for cleanup, will not cause name collision
log.G(ctx).WithError(derr).WithField("subvolume", source).Warn("failed to delete subvolume")
}
return err
}
if derr := btrfs.SubvolDelete(source); derr != nil {
// Log as warning, only needed for cleanup, will not cause name collision
log.G(ctx).WithError(derr).WithField("subvolume", source).Warn("failed to delete subvolume")
}
return nil
@ -322,13 +295,14 @@ func (b *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
// called on an read-write or readonly transaction.
//
// This can be used to recover mounts after calling View or Prepare.
func (b *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
ctx, t, err := b.ms.TransactionContext(ctx, false)
if err != nil {
return nil, err
}
s, err := storage.GetSnapshot(ctx, key)
t.Rollback()
func (b *snapshotter) Mounts(ctx context.Context, key string) (_ []mount.Mount, err error) {
var s storage.Snapshot
err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
s, err = storage.GetSnapshot(ctx, key)
return err
})
if err != nil {
return nil, fmt.Errorf("failed to get active snapshot: %w", err)
}
@ -341,21 +315,11 @@ func (b *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er
// associated with the key will be removed.
func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
var (
source, removed string
readonly bool
source, removed string
readonly, restore bool
)
ctx, t, err := b.ms.TransactionContext(ctx, true)
if err != nil {
return err
}
defer func() {
if err != nil && t != nil {
if rerr := t.Rollback(); rerr != nil {
log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
}
}
if removed != "" {
if derr := btrfs.SubvolDelete(removed); derr != nil {
log.G(ctx).WithError(derr).WithField("subvolume", removed).Warn("failed to delete subvolume")
@ -363,47 +327,53 @@ func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
}
}()
id, k, err := storage.Remove(ctx, key)
if err != nil {
return fmt.Errorf("failed to remove snapshot: %w", err)
}
switch k {
case snapshots.KindView:
source = filepath.Join(b.root, "view", id)
removed = filepath.Join(b.root, "view", "rm-"+id)
readonly = true
case snapshots.KindActive:
source = filepath.Join(b.root, "active", id)
removed = filepath.Join(b.root, "active", "rm-"+id)
case snapshots.KindCommitted:
source = filepath.Join(b.root, "snapshots", id)
removed = filepath.Join(b.root, "snapshots", "rm-"+id)
readonly = true
}
if err := btrfs.SubvolSnapshot(removed, source, readonly); err != nil {
removed = ""
return err
}
if err := btrfs.SubvolDelete(source); err != nil {
return fmt.Errorf("failed to remove snapshot %v: %w", source, err)
}
err = t.Commit()
t = nil
if err != nil {
// Attempt to restore source
if err1 := btrfs.SubvolSnapshot(source, removed, readonly); err1 != nil {
log.G(ctx).WithFields(logrus.Fields{
logrus.ErrorKey: err1,
"subvolume": source,
"renamed": removed,
}).Error("failed to restore subvolume from renamed")
// Keep removed to allow for manual restore
removed = ""
err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
id, k, err := storage.Remove(ctx, key)
if err != nil {
return fmt.Errorf("failed to remove snapshot: %w", err)
}
switch k {
case snapshots.KindView:
source = filepath.Join(b.root, "view", id)
removed = filepath.Join(b.root, "view", "rm-"+id)
readonly = true
case snapshots.KindActive:
source = filepath.Join(b.root, "active", id)
removed = filepath.Join(b.root, "active", "rm-"+id)
case snapshots.KindCommitted:
source = filepath.Join(b.root, "snapshots", id)
removed = filepath.Join(b.root, "snapshots", "rm-"+id)
readonly = true
}
if err = btrfs.SubvolSnapshot(removed, source, readonly); err != nil {
removed = ""
return err
}
if err = btrfs.SubvolDelete(source); err != nil {
return fmt.Errorf("failed to remove snapshot %v: %w", source, err)
}
restore = true
return nil
})
if err != nil {
if restore { // means failed to commit transaction
// Attempt to restore source
if err1 := btrfs.SubvolSnapshot(source, removed, readonly); err1 != nil {
log.G(ctx).WithFields(logrus.Fields{
logrus.ErrorKey: err1,
"subvolume": source,
"renamed": removed,
}).Error("failed to restore subvolume from renamed")
// Keep removed to allow for manual restore
removed = ""
}
}
return err
}