metastore: Add WithTransaction convenience method
Most snapshotters end up manually handling the rollback logic, either by calling `t.Rollback()` in every failure path, setting up a custom defer func to log on certain errors, or just deferring `t.Rollback()` even for `snapshotter.Commit()` which *will* cause `t.Rollback()` to return an error afaict, but it's just never checked and luckily bolt handles this alright... The devmapper snapshotter has a solution to this which is to have a method that starts either a read-only or writable transaction inside the method, and you pass in a callback to do your bidding and any failures are rolled back, and if it's writable will handle the commit for you. This seems like the right model to me, it removes the burden from the snapshot author to remember to either defer/call rollback in every method for every failure case. This change exposes the convenience method from devmapper to the snapshots/storage package as a method off of `storage.MetaStore` and moves over the devmapper snapshotter to use this. Signed-off-by: Danny Canter <danny@dcantah.dev>
This commit is contained in:
parent
50da24c5ec
commit
3b71cfd407
@ -42,10 +42,13 @@ import (
|
|||||||
type fsType string
|
type fsType string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
metadataFileName = "metadata.db"
|
|
||||||
fsTypeExt4 fsType = "ext4"
|
fsTypeExt4 fsType = "ext4"
|
||||||
fsTypeExt2 fsType = "ext2"
|
fsTypeExt2 fsType = "ext2"
|
||||||
fsTypeXFS fsType = "xfs"
|
fsTypeXFS fsType = "xfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
metadataFileName = "metadata.db"
|
||||||
devmapperSnapshotFsType = "containerd.io/snapshot/devmapper/fstype"
|
devmapperSnapshotFsType = "containerd.io/snapshot/devmapper/fstype"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -111,7 +114,7 @@ func (s *Snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, err
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
err = s.withTransaction(ctx, false, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||||
_, info, _, err = storage.GetInfo(ctx, key)
|
_, info, _, err = storage.GetInfo(ctx, key)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -124,7 +127,7 @@ func (s *Snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpath
|
|||||||
log.G(ctx).Debugf("update: %s", strings.Join(fieldpaths, ", "))
|
log.G(ctx).Debugf("update: %s", strings.Join(fieldpaths, ", "))
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
err = s.withTransaction(ctx, true, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||||
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
|
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -143,7 +146,7 @@ func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, e
|
|||||||
usage snapshots.Usage
|
usage snapshots.Usage
|
||||||
)
|
)
|
||||||
|
|
||||||
err = s.withTransaction(ctx, false, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||||
id, info, usage, err = storage.GetInfo(ctx, key)
|
id, info, usage, err = storage.GetInfo(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -183,7 +186,7 @@ func (s *Snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, er
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
err = s.withTransaction(ctx, false, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||||
snap, err = storage.GetSnapshot(ctx, key)
|
snap, err = storage.GetSnapshot(ctx, key)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -206,7 +209,7 @@ func (s *Snapshotter) Prepare(ctx context.Context, key, parent string, opts ...s
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
err = s.withTransaction(ctx, true, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||||
mounts, err = s.createSnapshot(ctx, snapshots.KindActive, key, parent, opts...)
|
mounts, err = s.createSnapshot(ctx, snapshots.KindActive, key, parent, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -223,7 +226,7 @@ func (s *Snapshotter) View(ctx context.Context, key, parent string, opts ...snap
|
|||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
|
|
||||||
err = s.withTransaction(ctx, true, func(ctx context.Context) error {
|
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||||
mounts, err = s.createSnapshot(ctx, snapshots.KindView, key, parent, opts...)
|
mounts, err = s.createSnapshot(ctx, snapshots.KindView, key, parent, opts...)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -237,7 +240,7 @@ func (s *Snapshotter) View(ctx context.Context, key, parent string, opts ...snap
|
|||||||
func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
|
func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
|
||||||
log.G(ctx).WithFields(logrus.Fields{"name": name, "key": key}).Debug("commit")
|
log.G(ctx).WithFields(logrus.Fields{"name": name, "key": key}).Debug("commit")
|
||||||
|
|
||||||
return s.withTransaction(ctx, true, func(ctx context.Context) error {
|
return s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||||
id, snapInfo, _, err := storage.GetInfo(ctx, key)
|
id, snapInfo, _, err := storage.GetInfo(ctx, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -294,7 +297,7 @@ func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
func (s *Snapshotter) Remove(ctx context.Context, key string) error {
|
func (s *Snapshotter) Remove(ctx context.Context, key string) error {
|
||||||
log.G(ctx).WithField("key", key).Debug("remove")
|
log.G(ctx).WithField("key", key).Debug("remove")
|
||||||
|
|
||||||
return s.withTransaction(ctx, true, func(ctx context.Context) error {
|
return s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
|
||||||
return s.removeDevice(ctx, key)
|
return s.removeDevice(ctx, key)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -330,7 +333,7 @@ func (s *Snapshotter) removeDevice(ctx context.Context, key string) error {
|
|||||||
// Walk iterates through all metadata Info for the stored snapshots and calls the provided function for each.
|
// Walk iterates through all metadata Info for the stored snapshots and calls the provided function for each.
|
||||||
func (s *Snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
|
func (s *Snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
|
||||||
log.G(ctx).Debug("walk")
|
log.G(ctx).Debug("walk")
|
||||||
return s.withTransaction(ctx, false, func(ctx context.Context) error {
|
return s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
|
||||||
return storage.WalkInfo(ctx, fn, fs...)
|
return storage.WalkInfo(ctx, fn, fs...)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -530,48 +533,6 @@ func (s *Snapshotter) buildMounts(ctx context.Context, snap storage.Snapshot, fi
|
|||||||
return mounts
|
return mounts
|
||||||
}
|
}
|
||||||
|
|
||||||
// withTransaction wraps fn callback with containerd's meta store transaction.
|
|
||||||
// If callback returns an error or transaction is not writable, database transaction will be discarded.
|
|
||||||
func (s *Snapshotter) withTransaction(ctx context.Context, writable bool, fn func(ctx context.Context) error) error {
|
|
||||||
ctx, trans, err := s.store.TransactionContext(ctx, writable)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var result *multierror.Error
|
|
||||||
|
|
||||||
err = fn(ctx)
|
|
||||||
if err != nil {
|
|
||||||
result = multierror.Append(result, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Always rollback if transaction is not writable
|
|
||||||
if err != nil || !writable {
|
|
||||||
if terr := trans.Rollback(); terr != nil {
|
|
||||||
log.G(ctx).WithError(terr).Error("failed to rollback transaction")
|
|
||||||
result = multierror.Append(result, fmt.Errorf("rollback failed: %w", terr))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if terr := trans.Commit(); terr != nil {
|
|
||||||
log.G(ctx).WithError(terr).Error("failed to commit transaction")
|
|
||||||
result = multierror.Append(result, fmt.Errorf("commit failed: %w", terr))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := result.ErrorOrNil(); err != nil {
|
|
||||||
log.G(ctx).WithError(err).Debug("snapshotter error")
|
|
||||||
|
|
||||||
// Unwrap if just one error
|
|
||||||
if len(result.Errors) == 1 {
|
|
||||||
return result.Errors[0]
|
|
||||||
}
|
|
||||||
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup cleans up all removed and unused resources
|
// Cleanup cleans up all removed and unused resources
|
||||||
func (s *Snapshotter) Cleanup(ctx context.Context) error {
|
func (s *Snapshotter) Cleanup(ctx context.Context) error {
|
||||||
log.G(ctx).Debug("cleanup")
|
log.G(ctx).Debug("cleanup")
|
||||||
|
@ -26,7 +26,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/log"
|
||||||
"github.com/containerd/containerd/snapshots"
|
"github.com/containerd/containerd/snapshots"
|
||||||
|
"github.com/hashicorp/go-multierror"
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -104,6 +106,51 @@ func (ms *MetaStore) TransactionContext(ctx context.Context, writable bool) (con
|
|||||||
return ctx, tx, nil
|
return ctx, tx, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TransactionCallback represents a callback to be invoked while under a metastore transaction.
|
||||||
|
type TransactionCallback func(ctx context.Context) error
|
||||||
|
|
||||||
|
// WithTransaction is a convenience method to run a function `fn` while holding a meta store transaction.
|
||||||
|
// If the callback `fn` returns an error or the transaction is not writable, the database transaction will be discarded.
|
||||||
|
func (ms *MetaStore) WithTransaction(ctx context.Context, writable bool, fn TransactionCallback) error {
|
||||||
|
ctx, trans, err := ms.TransactionContext(ctx, writable)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
var result *multierror.Error
|
||||||
|
err = fn(ctx)
|
||||||
|
if err != nil {
|
||||||
|
result = multierror.Append(result, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Always rollback if transaction is not writable
|
||||||
|
if err != nil || !writable {
|
||||||
|
if terr := trans.Rollback(); terr != nil {
|
||||||
|
log.G(ctx).WithError(terr).Error("failed to rollback transaction")
|
||||||
|
|
||||||
|
result = multierror.Append(result, fmt.Errorf("rollback failed: %w", terr))
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if terr := trans.Commit(); terr != nil {
|
||||||
|
log.G(ctx).WithError(terr).Error("failed to commit transaction")
|
||||||
|
|
||||||
|
result = multierror.Append(result, fmt.Errorf("commit failed: %w", terr))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := result.ErrorOrNil(); err != nil {
|
||||||
|
log.G(ctx).WithError(err).Debug("snapshotter error")
|
||||||
|
|
||||||
|
// Unwrap if just one error
|
||||||
|
if len(result.Errors) == 1 {
|
||||||
|
return result.Errors[0]
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Close closes the metastore and any underlying database connections
|
// Close closes the metastore and any underlying database connections
|
||||||
func (ms *MetaStore) Close() error {
|
func (ms *MetaStore) Close() error {
|
||||||
ms.dbL.Lock()
|
ms.dbL.Lock()
|
||||||
|
Loading…
Reference in New Issue
Block a user