Merge pull request #7781 from dcantah/withtransaction
metastore: Add WithTransaction convenience method
This commit is contained in:
		| @@ -42,10 +42,13 @@ import ( | |||||||
| type fsType string | type fsType string | ||||||
|  |  | ||||||
| const ( | const ( | ||||||
| 	metadataFileName               = "metadata.db" |  | ||||||
| 	fsTypeExt4 fsType = "ext4" | 	fsTypeExt4 fsType = "ext4" | ||||||
| 	fsTypeExt2 fsType = "ext2" | 	fsTypeExt2 fsType = "ext2" | ||||||
| 	fsTypeXFS  fsType = "xfs" | 	fsTypeXFS  fsType = "xfs" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	metadataFileName        = "metadata.db" | ||||||
| 	devmapperSnapshotFsType = "containerd.io/snapshot/devmapper/fstype" | 	devmapperSnapshotFsType = "containerd.io/snapshot/devmapper/fstype" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -111,7 +114,7 @@ func (s *Snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err | |||||||
| 		err  error | 		err  error | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	err = s.withTransaction(ctx, false, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 		_, info, _, err = storage.GetInfo(ctx, key) | 		_, info, _, err = storage.GetInfo(ctx, key) | ||||||
| 		return err | 		return err | ||||||
| 	}) | 	}) | ||||||
| @@ -124,7 +127,7 @@ func (s *Snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpath | |||||||
| 	log.G(ctx).Debugf("update: %s", strings.Join(fieldpaths, ", ")) | 	log.G(ctx).Debugf("update: %s", strings.Join(fieldpaths, ", ")) | ||||||
|  |  | ||||||
| 	var err error | 	var err error | ||||||
| 	err = s.withTransaction(ctx, true, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 		info, err = storage.UpdateInfo(ctx, info, fieldpaths...) | 		info, err = storage.UpdateInfo(ctx, info, fieldpaths...) | ||||||
| 		return err | 		return err | ||||||
| 	}) | 	}) | ||||||
| @@ -143,7 +146,7 @@ func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e | |||||||
| 		usage snapshots.Usage | 		usage snapshots.Usage | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	err = s.withTransaction(ctx, false, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 		id, info, usage, err = storage.GetInfo(ctx, key) | 		id, info, usage, err = storage.GetInfo(ctx, key) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -183,7 +186,7 @@ func (s *Snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er | |||||||
| 		err  error | 		err  error | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	err = s.withTransaction(ctx, false, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 		snap, err = storage.GetSnapshot(ctx, key) | 		snap, err = storage.GetSnapshot(ctx, key) | ||||||
| 		return err | 		return err | ||||||
| 	}) | 	}) | ||||||
| @@ -206,7 +209,7 @@ func (s *Snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s | |||||||
| 		err    error | 		err    error | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	err = s.withTransaction(ctx, true, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 		mounts, err = s.createSnapshot(ctx, snapshots.KindActive, key, parent, opts...) | 		mounts, err = s.createSnapshot(ctx, snapshots.KindActive, key, parent, opts...) | ||||||
| 		return err | 		return err | ||||||
| 	}) | 	}) | ||||||
| @@ -223,7 +226,7 @@ func (s *Snapshotter) View(ctx context.Context, key, parent string, opts ...snap | |||||||
| 		err    error | 		err    error | ||||||
| 	) | 	) | ||||||
|  |  | ||||||
| 	err = s.withTransaction(ctx, true, func(ctx context.Context) error { | 	err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 		mounts, err = s.createSnapshot(ctx, snapshots.KindView, key, parent, opts...) | 		mounts, err = s.createSnapshot(ctx, snapshots.KindView, key, parent, opts...) | ||||||
| 		return err | 		return err | ||||||
| 	}) | 	}) | ||||||
| @@ -237,7 +240,7 @@ func (s *Snapshotter) View(ctx context.Context, key, parent string, opts ...snap | |||||||
| func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { | func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { | ||||||
| 	log.G(ctx).WithFields(logrus.Fields{"name": name, "key": key}).Debug("commit") | 	log.G(ctx).WithFields(logrus.Fields{"name": name, "key": key}).Debug("commit") | ||||||
|  |  | ||||||
| 	return s.withTransaction(ctx, true, func(ctx context.Context) error { | 	return s.store.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 		id, snapInfo, _, err := storage.GetInfo(ctx, key) | 		id, snapInfo, _, err := storage.GetInfo(ctx, key) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return err | 			return err | ||||||
| @@ -294,7 +297,7 @@ func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snap | |||||||
| func (s *Snapshotter) Remove(ctx context.Context, key string) error { | func (s *Snapshotter) Remove(ctx context.Context, key string) error { | ||||||
| 	log.G(ctx).WithField("key", key).Debug("remove") | 	log.G(ctx).WithField("key", key).Debug("remove") | ||||||
|  |  | ||||||
| 	return s.withTransaction(ctx, true, func(ctx context.Context) error { | 	return s.store.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 		return s.removeDevice(ctx, key) | 		return s.removeDevice(ctx, key) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| @@ -330,7 +333,7 @@ func (s *Snapshotter) removeDevice(ctx context.Context, key string) error { | |||||||
| // Walk iterates through all metadata Info for the stored snapshots and calls the provided function for each. | // Walk iterates through all metadata Info for the stored snapshots and calls the provided function for each. | ||||||
| func (s *Snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error { | func (s *Snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error { | ||||||
| 	log.G(ctx).Debug("walk") | 	log.G(ctx).Debug("walk") | ||||||
| 	return s.withTransaction(ctx, false, func(ctx context.Context) error { | 	return s.store.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 		return storage.WalkInfo(ctx, fn, fs...) | 		return storage.WalkInfo(ctx, fn, fs...) | ||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
| @@ -530,48 +533,6 @@ func (s *Snapshotter) buildMounts(ctx context.Context, snap storage.Snapshot, fi | |||||||
| 	return mounts | 	return mounts | ||||||
| } | } | ||||||
|  |  | ||||||
| // withTransaction wraps fn callback with containerd's meta store transaction. |  | ||||||
| // If callback returns an error or transaction is not writable, database transaction will be discarded. |  | ||||||
| func (s *Snapshotter) withTransaction(ctx context.Context, writable bool, fn func(ctx context.Context) error) error { |  | ||||||
| 	ctx, trans, err := s.store.TransactionContext(ctx, writable) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	var result *multierror.Error |  | ||||||
|  |  | ||||||
| 	err = fn(ctx) |  | ||||||
| 	if err != nil { |  | ||||||
| 		result = multierror.Append(result, err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Always rollback if transaction is not writable |  | ||||||
| 	if err != nil || !writable { |  | ||||||
| 		if terr := trans.Rollback(); terr != nil { |  | ||||||
| 			log.G(ctx).WithError(terr).Error("failed to rollback transaction") |  | ||||||
| 			result = multierror.Append(result, fmt.Errorf("rollback failed: %w", terr)) |  | ||||||
| 		} |  | ||||||
| 	} else { |  | ||||||
| 		if terr := trans.Commit(); terr != nil { |  | ||||||
| 			log.G(ctx).WithError(terr).Error("failed to commit transaction") |  | ||||||
| 			result = multierror.Append(result, fmt.Errorf("commit failed: %w", terr)) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if err := result.ErrorOrNil(); err != nil { |  | ||||||
| 		log.G(ctx).WithError(err).Debug("snapshotter error") |  | ||||||
|  |  | ||||||
| 		// Unwrap if just one error |  | ||||||
| 		if len(result.Errors) == 1 { |  | ||||||
| 			return result.Errors[0] |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Cleanup cleans up all removed and unused resources | // Cleanup cleans up all removed and unused resources | ||||||
| func (s *Snapshotter) Cleanup(ctx context.Context) error { | func (s *Snapshotter) Cleanup(ctx context.Context) error { | ||||||
| 	log.G(ctx).Debug("cleanup") | 	log.G(ctx).Debug("cleanup") | ||||||
|   | |||||||
| @@ -127,15 +127,13 @@ func NewSnapshotter(root string, opts ...Opt) (snapshots.Snapshotter, error) { | |||||||
| // | // | ||||||
| // Should be used for parent resolution, existence checks and to discern | // Should be used for parent resolution, existence checks and to discern | ||||||
| // the kind of snapshot. | // the kind of snapshot. | ||||||
| func (o *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) { | func (o *snapshotter) Stat(ctx context.Context, key string) (info snapshots.Info, err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, false) | 	var id string | ||||||
| 	if err != nil { | 	if err := o.ms.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 		return snapshots.Info{}, err | 		id, info, _, err = storage.GetInfo(ctx, key) | ||||||
| 	} | 		return err | ||||||
| 	defer t.Rollback() | 	}); err != nil { | ||||||
| 	id, info, _, err := storage.GetInfo(ctx, key) | 		return info, err | ||||||
| 	if err != nil { |  | ||||||
| 		return snapshots.Info{}, err |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if o.upperdirLabel { | 	if o.upperdirLabel { | ||||||
| @@ -144,47 +142,29 @@ func (o *snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err | |||||||
| 		} | 		} | ||||||
| 		info.Labels[upperdirKey] = o.upperPath(id) | 		info.Labels[upperdirKey] = o.upperPath(id) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return info, nil | 	return info, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (o *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) { | func (o *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (newInfo snapshots.Info, err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, true) | 	err = o.ms.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
|  | 		newInfo, err = storage.UpdateInfo(ctx, info, fieldpaths...) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		return snapshots.Info{}, err | 			return err | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	rollback := true |  | ||||||
| 	defer func() { |  | ||||||
| 		if rollback { |  | ||||||
| 			if rerr := t.Rollback(); rerr != nil { |  | ||||||
| 				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction") |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	info, err = storage.UpdateInfo(ctx, info, fieldpaths...) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return snapshots.Info{}, err |  | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if o.upperdirLabel { | 		if o.upperdirLabel { | ||||||
| 		id, _, _, err := storage.GetInfo(ctx, info.Name) | 			id, _, _, err := storage.GetInfo(ctx, newInfo.Name) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 			return snapshots.Info{}, err | 				return err | ||||||
| 			} | 			} | ||||||
| 		if info.Labels == nil { | 			if newInfo.Labels == nil { | ||||||
| 			info.Labels = make(map[string]string) | 				newInfo.Labels = make(map[string]string) | ||||||
| 			} | 			} | ||||||
| 		info.Labels[upperdirKey] = o.upperPath(id) | 			newInfo.Labels[upperdirKey] = o.upperPath(id) | ||||||
| 		} | 		} | ||||||
|  | 		return nil | ||||||
| 	rollback = false | 	}) | ||||||
| 	if err := t.Commit(); err != nil { | 	return newInfo, err | ||||||
| 		return snapshots.Info{}, err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return info, nil |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // Usage returns the resources taken by the snapshot identified by key. | // Usage returns the resources taken by the snapshot identified by key. | ||||||
| @@ -193,16 +173,17 @@ func (o *snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpath | |||||||
| // "upper") directory and may take some time. | // "upper") directory and may take some time. | ||||||
| // | // | ||||||
| // For committed snapshots, the value is returned from the metadata database. | // For committed snapshots, the value is returned from the metadata database. | ||||||
| func (o *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) { | func (o *snapshotter) Usage(ctx context.Context, key string) (_ snapshots.Usage, err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, false) | 	var ( | ||||||
| 	if err != nil { | 		usage snapshots.Usage | ||||||
| 		return snapshots.Usage{}, err | 		info  snapshots.Info | ||||||
| 	} | 		id    string | ||||||
| 	id, info, usage, err := storage.GetInfo(ctx, key) | 	) | ||||||
| 	t.Rollback() // transaction no longer needed at this point. | 	if err := o.ms.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
|  | 		id, info, usage, err = storage.GetInfo(ctx, key) | ||||||
| 	if err != nil { | 		return err | ||||||
| 		return snapshots.Usage{}, err | 	}); err != nil { | ||||||
|  | 		return usage, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if info.Kind == snapshots.KindActive { | 	if info.Kind == snapshots.KindActive { | ||||||
| @@ -212,10 +193,8 @@ func (o *snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e | |||||||
| 			// TODO(stevvooe): Consider not reporting an error in this case. | 			// TODO(stevvooe): Consider not reporting an error in this case. | ||||||
| 			return snapshots.Usage{}, err | 			return snapshots.Usage{}, err | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		usage = snapshots.Usage(du) | 		usage = snapshots.Usage(du) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return usage, nil | 	return usage, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -231,33 +210,22 @@ func (o *snapshotter) View(ctx context.Context, key, parent string, opts ...snap | |||||||
| // called on an read-write or readonly transaction. | // called on an read-write or readonly transaction. | ||||||
| // | // | ||||||
| // This can be used to recover mounts after calling View or Prepare. | // This can be used to recover mounts after calling View or Prepare. | ||||||
| func (o *snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) { | func (o *snapshotter) Mounts(ctx context.Context, key string) (_ []mount.Mount, err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, false) | 	var s storage.Snapshot | ||||||
|  | 	if err := o.ms.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
|  | 		s, err = storage.GetSnapshot(ctx, key) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		return nil, err | 			return fmt.Errorf("failed to get active mount: %w", err) | ||||||
| 		} | 		} | ||||||
| 	s, err := storage.GetSnapshot(ctx, key) | 		return nil | ||||||
| 	t.Rollback() | 	}); err != nil { | ||||||
| 	if err != nil { | 		return nil, err | ||||||
| 		return nil, fmt.Errorf("failed to get active mount: %w", err) |  | ||||||
| 	} | 	} | ||||||
| 	return o.mounts(s), nil | 	return o.mounts(s), nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { | func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, true) | 	return o.ms.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	defer func() { |  | ||||||
| 		if err != nil { |  | ||||||
| 			if rerr := t.Rollback(); rerr != nil { |  | ||||||
| 				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction") |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 		// grab the existing id | 		// grab the existing id | ||||||
| 		id, _, _, err := storage.GetInfo(ctx, key) | 		id, _, _, err := storage.GetInfo(ctx, key) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| @@ -270,39 +238,17 @@ func (o *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap | |||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if _, err = storage.CommitActive(ctx, key, name, snapshots.Usage(usage), opts...); err != nil { | 		if _, err = storage.CommitActive(ctx, key, name, snapshots.Usage(usage), opts...); err != nil { | ||||||
| 		return fmt.Errorf("failed to commit snapshot: %w", err) | 			return fmt.Errorf("failed to commit snapshot %s: %w", key, err) | ||||||
| 		} | 		} | ||||||
| 	return t.Commit() | 		return nil | ||||||
|  | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Remove abandons the snapshot identified by key. The snapshot will | // Remove abandons the snapshot identified by key. The snapshot will | ||||||
| // immediately become unavailable and unrecoverable. Disk space will | // immediately become unavailable and unrecoverable. Disk space will | ||||||
| // be freed up on the next call to `Cleanup`. | // be freed up on the next call to `Cleanup`. | ||||||
| func (o *snapshotter) Remove(ctx context.Context, key string) (err error) { | func (o *snapshotter) Remove(ctx context.Context, key string) (err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, true) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	defer func() { |  | ||||||
| 		if err != nil { |  | ||||||
| 			if rerr := t.Rollback(); rerr != nil { |  | ||||||
| 				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction") |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	_, _, err = storage.Remove(ctx, key) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("failed to remove: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if !o.asyncRemove { |  | ||||||
| 	var removals []string | 	var removals []string | ||||||
| 		removals, err = o.getCleanupDirectories(ctx) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return fmt.Errorf("unable to get directories for removal: %w", err) |  | ||||||
| 		} |  | ||||||
|  |  | ||||||
| 	// Remove directories after the transaction is closed, failures must not | 	// Remove directories after the transaction is closed, failures must not | ||||||
| 	// return error since the transaction is committed with the removal | 	// return error since the transaction is committed with the removal | ||||||
| 	// key no longer available. | 	// key no longer available. | ||||||
| @@ -315,19 +261,25 @@ func (o *snapshotter) Remove(ctx context.Context, key string) (err error) { | |||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  | 	return o.ms.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
|  | 		_, _, err = storage.Remove(ctx, key) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return fmt.Errorf("failed to remove snapshot %s: %w", key, err) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 	return t.Commit() | 		if !o.asyncRemove { | ||||||
|  | 			removals, err = o.getCleanupDirectories(ctx) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return fmt.Errorf("unable to get directories for removal: %w", err) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Walk the snapshots. | // Walk the snapshots. | ||||||
| func (o *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error { | func (o *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, false) | 	return o.ms.WithTransaction(ctx, false, func(ctx context.Context) error { | ||||||
| 	if err != nil { |  | ||||||
| 		return err |  | ||||||
| 	} |  | ||||||
| 	defer t.Rollback() |  | ||||||
| 		if o.upperdirLabel { | 		if o.upperdirLabel { | ||||||
| 			return storage.WalkInfo(ctx, func(ctx context.Context, info snapshots.Info) error { | 			return storage.WalkInfo(ctx, func(ctx context.Context, info snapshots.Info) error { | ||||||
| 				id, _, _, err := storage.GetInfo(ctx, info.Name) | 				id, _, _, err := storage.GetInfo(ctx, info.Name) | ||||||
| @@ -342,6 +294,7 @@ func (o *snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...str | |||||||
| 			}, fs...) | 			}, fs...) | ||||||
| 		} | 		} | ||||||
| 		return storage.WalkInfo(ctx, fn, fs...) | 		return storage.WalkInfo(ctx, fn, fs...) | ||||||
|  | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Cleanup cleans up disk resources from removed or abandoned snapshots | // Cleanup cleans up disk resources from removed or abandoned snapshots | ||||||
| @@ -360,16 +313,17 @@ func (o *snapshotter) Cleanup(ctx context.Context) error { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (o *snapshotter) cleanupDirectories(ctx context.Context) ([]string, error) { | func (o *snapshotter) cleanupDirectories(ctx context.Context) (_ []string, err error) { | ||||||
|  | 	var cleanupDirs []string | ||||||
| 	// Get a write transaction to ensure no other write transaction can be entered | 	// Get a write transaction to ensure no other write transaction can be entered | ||||||
| 	// while the cleanup is scanning. | 	// while the cleanup is scanning. | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, true) | 	if err := o.ms.WithTransaction(ctx, true, func(ctx context.Context) error { | ||||||
| 	if err != nil { | 		cleanupDirs, err = o.getCleanupDirectories(ctx) | ||||||
|  | 		return err | ||||||
|  | 	}); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	return cleanupDirs, nil | ||||||
| 	defer t.Rollback() |  | ||||||
| 	return o.getCleanupDirectories(ctx) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (o *snapshotter) getCleanupDirectories(ctx context.Context) ([]string, error) { | func (o *snapshotter) getCleanupDirectories(ctx context.Context) ([]string, error) { | ||||||
| @@ -402,12 +356,11 @@ func (o *snapshotter) getCleanupDirectories(ctx context.Context) ([]string, erro | |||||||
| } | } | ||||||
|  |  | ||||||
| func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) { | func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts []snapshots.Opt) (_ []mount.Mount, err error) { | ||||||
| 	ctx, t, err := o.ms.TransactionContext(ctx, true) | 	var ( | ||||||
| 	if err != nil { | 		s        storage.Snapshot | ||||||
| 		return nil, err | 		td, path string | ||||||
| 	} | 	) | ||||||
|  |  | ||||||
| 	var td, path string |  | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			if td != "" { | 			if td != "" { | ||||||
| @@ -424,50 +377,39 @@ func (o *snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, k | |||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
|  |  | ||||||
|  | 	if err := o.ms.WithTransaction(ctx, true, func(ctx context.Context) (err error) { | ||||||
| 		snapshotDir := filepath.Join(o.root, "snapshots") | 		snapshotDir := filepath.Join(o.root, "snapshots") | ||||||
| 		td, err = o.prepareDirectory(ctx, snapshotDir, kind) | 		td, err = o.prepareDirectory(ctx, snapshotDir, kind) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		if rerr := t.Rollback(); rerr != nil { | 			return fmt.Errorf("failed to create prepare snapshot dir: %w", err) | ||||||
| 			log.G(ctx).WithError(rerr).Warn("failed to rollback transaction") |  | ||||||
| 		} | 		} | ||||||
| 		return nil, fmt.Errorf("failed to create prepare snapshot dir: %w", err) |  | ||||||
| 	} |  | ||||||
| 	rollback := true |  | ||||||
| 	defer func() { |  | ||||||
| 		if rollback { |  | ||||||
| 			if rerr := t.Rollback(); rerr != nil { |  | ||||||
| 				log.G(ctx).WithError(rerr).Warn("failed to rollback transaction") |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
|  |  | ||||||
| 	s, err := storage.CreateSnapshot(ctx, kind, key, parent, opts...) | 		s, err = storage.CreateSnapshot(ctx, kind, key, parent, opts...) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 		return nil, fmt.Errorf("failed to create snapshot: %w", err) | 			return fmt.Errorf("failed to create snapshot: %w", err) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		if len(s.ParentIDs) > 0 { | 		if len(s.ParentIDs) > 0 { | ||||||
| 			st, err := os.Stat(o.upperPath(s.ParentIDs[0])) | 			st, err := os.Stat(o.upperPath(s.ParentIDs[0])) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 			return nil, fmt.Errorf("failed to stat parent: %w", err) | 				return fmt.Errorf("failed to stat parent: %w", err) | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			stat := st.Sys().(*syscall.Stat_t) | 			stat := st.Sys().(*syscall.Stat_t) | ||||||
|  |  | ||||||
| 			if err := os.Lchown(filepath.Join(td, "fs"), int(stat.Uid), int(stat.Gid)); err != nil { | 			if err := os.Lchown(filepath.Join(td, "fs"), int(stat.Uid), int(stat.Gid)); err != nil { | ||||||
| 			return nil, fmt.Errorf("failed to chown: %w", err) | 				return fmt.Errorf("failed to chown: %w", err) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		path = filepath.Join(snapshotDir, s.ID) | 		path = filepath.Join(snapshotDir, s.ID) | ||||||
| 		if err = os.Rename(td, path); err != nil { | 		if err = os.Rename(td, path); err != nil { | ||||||
| 		return nil, fmt.Errorf("failed to rename: %w", err) | 			return fmt.Errorf("failed to rename: %w", err) | ||||||
| 		} | 		} | ||||||
| 		td = "" | 		td = "" | ||||||
|  |  | ||||||
| 	rollback = false | 		return nil | ||||||
| 	if err = t.Commit(); err != nil { | 	}); err != nil { | ||||||
| 		return nil, fmt.Errorf("commit failed: %w", err) | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return o.mounts(s), nil | 	return o.mounts(s), nil | ||||||
|   | |||||||
| @@ -26,7 +26,9 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"sync" | 	"sync" | ||||||
|  |  | ||||||
|  | 	"github.com/containerd/containerd/log" | ||||||
| 	"github.com/containerd/containerd/snapshots" | 	"github.com/containerd/containerd/snapshots" | ||||||
|  | 	"github.com/hashicorp/go-multierror" | ||||||
| 	bolt "go.etcd.io/bbolt" | 	bolt "go.etcd.io/bbolt" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -104,6 +106,51 @@ func (ms *MetaStore) TransactionContext(ctx context.Context, writable bool) (con | |||||||
| 	return ctx, tx, nil | 	return ctx, tx, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // TransactionCallback represents a callback to be invoked while under a metastore transaction. | ||||||
|  | type TransactionCallback func(ctx context.Context) error | ||||||
|  |  | ||||||
|  | // WithTransaction is a convenience method to run a function `fn` while holding a meta store transaction. | ||||||
|  | // If the callback `fn` returns an error or the transaction is not writable, the database transaction will be discarded. | ||||||
|  | func (ms *MetaStore) WithTransaction(ctx context.Context, writable bool, fn TransactionCallback) error { | ||||||
|  | 	ctx, trans, err := ms.TransactionContext(ctx, writable) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var result *multierror.Error | ||||||
|  | 	err = fn(ctx) | ||||||
|  | 	if err != nil { | ||||||
|  | 		result = multierror.Append(result, err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Always rollback if transaction is not writable | ||||||
|  | 	if err != nil || !writable { | ||||||
|  | 		if terr := trans.Rollback(); terr != nil { | ||||||
|  | 			log.G(ctx).WithError(terr).Error("failed to rollback transaction") | ||||||
|  |  | ||||||
|  | 			result = multierror.Append(result, fmt.Errorf("rollback failed: %w", terr)) | ||||||
|  | 		} | ||||||
|  | 	} else { | ||||||
|  | 		if terr := trans.Commit(); terr != nil { | ||||||
|  | 			log.G(ctx).WithError(terr).Error("failed to commit transaction") | ||||||
|  |  | ||||||
|  | 			result = multierror.Append(result, fmt.Errorf("commit failed: %w", terr)) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if err := result.ErrorOrNil(); err != nil { | ||||||
|  | 		log.G(ctx).WithError(err).Debug("snapshotter error") | ||||||
|  |  | ||||||
|  | 		// Unwrap if just one error | ||||||
|  | 		if len(result.Errors) == 1 { | ||||||
|  | 			return result.Errors[0] | ||||||
|  | 		} | ||||||
|  | 		return err | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  |  | ||||||
| // Close closes the metastore and any underlying database connections | // Close closes the metastore and any underlying database connections | ||||||
| func (ms *MetaStore) Close() error { | func (ms *MetaStore) Close() error { | ||||||
| 	ms.dbL.Lock() | 	ms.dbL.Lock() | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Derek McGowan
					Derek McGowan