Merge pull request #7529 from iyear/refactor-metastore-tx
Refactor metastore transaction
This commit is contained in:
		@@ -102,13 +102,12 @@ func NewSnapshotter(root string) (snapshots.Snapshotter, error) {
 | 
			
		||||
//
 | 
			
		||||
// Should be used for parent resolution, existence checks and to discern
 | 
			
		||||
// the kind of snapshot.
 | 
			
		||||
func (b *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
	defer t.Rollback()
 | 
			
		||||
	_, info, _, err := storage.GetInfo(ctx, key)
 | 
			
		||||
func (b *snapshotter) Stat(ctx context.Context, key string) (info snapshots.Info, err error) {
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
 | 
			
		||||
		_, info, _, err = storage.GetInfo(ctx, key)
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
@@ -116,19 +115,13 @@ func (b *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err
 | 
			
		||||
	return info, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (_ snapshots.Info, err error) {
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
 | 
			
		||||
		info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Rollback()
 | 
			
		||||
		return snapshots.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err := t.Commit(); err != nil {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Info{}, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -140,18 +133,20 @@ func (b *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e
 | 
			
		||||
	return b.usage(ctx, key)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) usage(ctx context.Context, key string) (snapshots.Usage, error) {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Usage{}, err
 | 
			
		||||
	}
 | 
			
		||||
	id, info, usage, err := storage.GetInfo(ctx, key)
 | 
			
		||||
	var parentID string
 | 
			
		||||
func (b *snapshotter) usage(ctx context.Context, key string) (usage snapshots.Usage, err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		id, parentID string
 | 
			
		||||
		info         snapshots.Info
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
 | 
			
		||||
		id, info, usage, err = storage.GetInfo(ctx, key)
 | 
			
		||||
 | 
			
		||||
		if err == nil && info.Kind == snapshots.KindActive && info.Parent != "" {
 | 
			
		||||
			parentID, _, _, err = storage.GetInfo(ctx, info.Parent)
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	t.Rollback() // transaction no longer needed at this point.
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return snapshots.Usage{}, err
 | 
			
		||||
@@ -177,13 +172,10 @@ func (b *snapshotter) usage(ctx context.Context, key string) (snapshots.Usage, e
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Walk the committed snapshots.
 | 
			
		||||
func (b *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer t.Rollback()
 | 
			
		||||
func (b *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) (err error) {
 | 
			
		||||
	return b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
 | 
			
		||||
		return storage.WalkInfo(ctx, fn, fs...)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
 | 
			
		||||
@@ -194,51 +186,39 @@ func (b *snapshotter) View(ctx context.Context, key, parent string, opts ...snap
 | 
			
		||||
	return b.makeSnapshot(ctx, snapshots.KindView, key, parent, opts)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) makeSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) ([]mount.Mount, error) {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil && t != nil {
 | 
			
		||||
			if rerr := t.Rollback(); rerr != nil {
 | 
			
		||||
				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
func (b *snapshotter) makeSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		target string
 | 
			
		||||
		s      storage.Snapshot
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	s, err := storage.CreateSnapshot(ctx, kind, key, parent, opts...)
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
 | 
			
		||||
		s, err = storage.CreateSnapshot(ctx, kind, key, parent, opts...)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	target := filepath.Join(b.root, strings.ToLower(s.Kind.String()), s.ID)
 | 
			
		||||
		target = filepath.Join(b.root, strings.ToLower(s.Kind.String()), s.ID)
 | 
			
		||||
 | 
			
		||||
		if len(s.ParentIDs) == 0 {
 | 
			
		||||
			// create new subvolume
 | 
			
		||||
			// btrfs subvolume create /dir
 | 
			
		||||
		if err = btrfs.SubvolCreate(target); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
			return btrfs.SubvolCreate(target)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		parentp := filepath.Join(b.root, "snapshots", s.ParentIDs[0])
 | 
			
		||||
 | 
			
		||||
		var readonly bool
 | 
			
		||||
		if kind == snapshots.KindView {
 | 
			
		||||
			readonly = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		// btrfs subvolume snapshot /parent /subvol
 | 
			
		||||
		if err = btrfs.SubvolSnapshot(target, parentp, readonly); err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	err = t.Commit()
 | 
			
		||||
	t = nil
 | 
			
		||||
		readOnly := kind == snapshots.KindView
 | 
			
		||||
		return btrfs.SubvolSnapshot(target, parentp, readOnly)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if target != "" {
 | 
			
		||||
			if derr := btrfs.SubvolDelete(target); derr != nil {
 | 
			
		||||
				log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -272,48 +252,41 @@ func (b *snapshotter) mounts(dir string, s storage.Snapshot) ([]mount.Mount, err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) (err error) {
 | 
			
		||||
	usage, err := b.usage(ctx, key)
 | 
			
		||||
	var usage snapshots.Usage
 | 
			
		||||
	usage, err = b.usage(ctx, key)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to compute usage: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil && t != nil {
 | 
			
		||||
			if rerr := t.Rollback(); rerr != nil {
 | 
			
		||||
				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	var source, target string
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
 | 
			
		||||
		id, err := storage.CommitActive(ctx, key, name, usage, opts...) // TODO(stevvooe): Resolve a usage value for btrfs
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to commit: %w", err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	source := filepath.Join(b.root, "active", id)
 | 
			
		||||
	target := filepath.Join(b.root, "snapshots", id)
 | 
			
		||||
		source = filepath.Join(b.root, "active", id)
 | 
			
		||||
		target = filepath.Join(b.root, "snapshots", id)
 | 
			
		||||
 | 
			
		||||
	if err := btrfs.SubvolSnapshot(target, source, true); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
		return btrfs.SubvolSnapshot(target, source, true)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	err = t.Commit()
 | 
			
		||||
	t = nil
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if target != "" {
 | 
			
		||||
			if derr := btrfs.SubvolDelete(target); derr != nil {
 | 
			
		||||
				log.G(ctx).WithError(derr).WithField("subvolume", target).Error("failed to delete subvolume")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if source != "" {
 | 
			
		||||
		if derr := btrfs.SubvolDelete(source); derr != nil {
 | 
			
		||||
			// Log as warning, only needed for cleanup, will not cause name collision
 | 
			
		||||
			log.G(ctx).WithError(derr).WithField("subvolume", source).Warn("failed to delete subvolume")
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
@@ -322,13 +295,14 @@ func (b *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
 | 
			
		||||
// called on an read-write or readonly transaction.
 | 
			
		||||
//
 | 
			
		||||
// This can be used to recover mounts after calling View or Prepare.
 | 
			
		||||
func (b *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, false)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	s, err := storage.GetSnapshot(ctx, key)
 | 
			
		||||
	t.Rollback()
 | 
			
		||||
func (b *snapshotter) Mounts(ctx context.Context, key string) (_ []mount.Mount, err error) {
 | 
			
		||||
	var s storage.Snapshot
 | 
			
		||||
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, false, func(ctx context.Context) error {
 | 
			
		||||
		s, err = storage.GetSnapshot(ctx, key)
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, fmt.Errorf("failed to get active snapshot: %w", err)
 | 
			
		||||
	}
 | 
			
		||||
@@ -342,20 +316,10 @@ func (b *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er
 | 
			
		||||
func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
 | 
			
		||||
	var (
 | 
			
		||||
		source, removed   string
 | 
			
		||||
		readonly        bool
 | 
			
		||||
		readonly, restore bool
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	ctx, t, err := b.ms.TransactionContext(ctx, true)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	defer func() {
 | 
			
		||||
		if err != nil && t != nil {
 | 
			
		||||
			if rerr := t.Rollback(); rerr != nil {
 | 
			
		||||
				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if removed != "" {
 | 
			
		||||
			if derr := btrfs.SubvolDelete(removed); derr != nil {
 | 
			
		||||
				log.G(ctx).WithError(derr).WithField("subvolume", removed).Warn("failed to delete subvolume")
 | 
			
		||||
@@ -363,6 +327,7 @@ func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	err = b.ms.WithTransaction(ctx, true, func(ctx context.Context) error {
 | 
			
		||||
		id, k, err := storage.Remove(ctx, key)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to remove snapshot: %w", err)
 | 
			
		||||
@@ -382,18 +347,21 @@ func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
 | 
			
		||||
			readonly = true
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	if err := btrfs.SubvolSnapshot(removed, source, readonly); err != nil {
 | 
			
		||||
		if err = btrfs.SubvolSnapshot(removed, source, readonly); err != nil {
 | 
			
		||||
			removed = ""
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	if err := btrfs.SubvolDelete(source); err != nil {
 | 
			
		||||
		if err = btrfs.SubvolDelete(source); err != nil {
 | 
			
		||||
			return fmt.Errorf("failed to remove snapshot %v: %w", source, err)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	err = t.Commit()
 | 
			
		||||
	t = nil
 | 
			
		||||
		restore = true
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		if restore { // means failed to commit transaction
 | 
			
		||||
			// Attempt to restore source
 | 
			
		||||
			if err1 := btrfs.SubvolSnapshot(source, removed, readonly); err1 != nil {
 | 
			
		||||
				log.G(ctx).WithFields(logrus.Fields{
 | 
			
		||||
@@ -404,6 +372,8 @@ func (b *snapshotter) Remove(ctx context.Context, key string) (err error) {
 | 
			
		||||
				// Keep removed to allow for manual restore
 | 
			
		||||
				removed = ""
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user