diff --git a/snapshot/storage/boltdb/bolt.go b/snapshot/storage/boltdb/bolt.go index 7aeb02584..9ed698ec8 100644 --- a/snapshot/storage/boltdb/bolt.go +++ b/snapshot/storage/boltdb/bolt.go @@ -70,16 +70,16 @@ func (ms *boltMetastore) TransactionContext(ctx context.Context, writable bool) return ctx, t, nil } -func (ms *boltMetastore) withBucket(ctx context.Context, fn func(ctx context.Context, tx *bolt.Bucket) error) error { +func (ms *boltMetastore) withBucket(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error { t, ok := ctx.Value(transactionKey{}).(*boltFileTransactor) if !ok { return errors.Errorf("no transaction in context") } - bkt := t.tx.Bucket(bucketKeyStorageVersion).Bucket(bucketKeySnapshot) - return fn(ctx, bkt) + bkt := t.tx.Bucket(bucketKeyStorageVersion) + return fn(ctx, bkt.Bucket(bucketKeySnapshot), bkt.Bucket(bucketKeyParents)) } -func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(ctx context.Context, tx *bolt.Bucket) error) error { +func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error { t, ok := ctx.Value(transactionKey{}).(*boltFileTransactor) if !ok { return errors.Errorf("no transaction in context") @@ -89,11 +89,15 @@ func (ms *boltMetastore) createBucketIfNotExists(ctx context.Context, fn func(ct if err != nil { return errors.Wrap(err, "failed to create version bucket") } - bkt, err = bkt.CreateBucketIfNotExists(bucketKeySnapshot) + sbkt, err := bkt.CreateBucketIfNotExists(bucketKeySnapshot) if err != nil { return errors.Wrap(err, "failed to create snapshots bucket") } - return fn(ctx, bkt) + pbkt, err := bkt.CreateBucketIfNotExists(bucketKeyParents) + if err != nil { + return errors.Wrap(err, "failed to create snapshots bucket") + } + return fn(ctx, sbkt, pbkt) } func fromProtoKind(k Kind) snapshot.Kind { @@ -114,21 +118,33 @@ func toProtoKind(k snapshot.Kind) Kind { } } +// parentKey returns a composite key of the parent and child identifiers. The +// parts of the key are separated by a zero byte. func parentKey(parent, child uint64) []byte { - b := make([]byte, binary.Size([]uint64{parent, child})) + b := make([]byte, binary.Size([]uint64{parent, child})+1) i := binary.PutUvarint(b, parent) - j := binary.PutUvarint(b[i:], child) - return b[0 : i+j] + j := binary.PutUvarint(b[i+1:], child) + return b[0 : i+j+1] } -func getParent(b []byte) uint64 { +// parentPrefixKey returns the parent part of the composite key with the +// zero byte separator. +func parentPrefixKey(parent uint64) []byte { + b := make([]byte, binary.Size(parent)+1) + i := binary.PutUvarint(b, parent) + return b[0 : i+1] +} + +// getParentPrefix returns the first part of the composite key which +// represents the parent identifier. +func getParentPrefix(b []byte) uint64 { parent, _ := binary.Uvarint(b) return parent } func (ms *boltMetastore) Stat(ctx context.Context, key string) (snapshot.Info, error) { var ss Snapshot - err := ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + err := ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { return getSnapshot(bkt, key, &ss) }) if err != nil { @@ -144,7 +160,7 @@ func (ms *boltMetastore) Stat(ctx context.Context, key string) (snapshot.Info, e } func (ms *boltMetastore) Walk(ctx context.Context, fn func(context.Context, snapshot.Info) error) error { - return ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + return ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { return bkt.ForEach(func(k, v []byte) error { // skip nested buckets if v == nil { @@ -167,7 +183,7 @@ func (ms *boltMetastore) Walk(ctx context.Context, fn func(context.Context, snap } func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, readonly bool) (a storage.Active, err error) { - err = ms.createBucketIfNotExists(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + err = ms.createBucketIfNotExists(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { var ( parentS *Snapshot ) @@ -202,11 +218,9 @@ func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, r } if parentS != nil { - pbkt, err := bkt.CreateBucketIfNotExists(bucketKeyParents) - if err != nil { - return errors.Wrap(err, "failed to create parent bucket") - } - if err := pbkt.Put(parentKey(parentS.ID, ss.ID), nil); err != nil { + // Store a backlink from the key to the parent. Store the snapshot name + // as the value to allow following the backlink to the snapshot value. + if err := pbkt.Put(parentKey(parentS.ID, ss.ID), []byte(key)); err != nil { return errors.Wrap(err, "failed to write parent link") } @@ -229,7 +243,7 @@ func (ms *boltMetastore) CreateActive(ctx context.Context, key, parent string, r } func (ms *boltMetastore) GetActive(ctx context.Context, key string) (a storage.Active, err error) { - err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { b := bkt.Get([]byte(key)) if len(b) == 0 { return errors.Errorf("active not found") @@ -285,7 +299,7 @@ func (ms *boltMetastore) parents(bkt *bolt.Bucket, parent *Snapshot) (parents [] } func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k snapshot.Kind, err error) { - err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { var ss Snapshot b := bkt.Get([]byte(key)) if len(b) == 0 { @@ -296,10 +310,9 @@ func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k s return errors.Wrap(err, "failed to unmarshal snapshot") } - pbkt := bkt.Bucket(bucketKeyParents) if pbkt != nil { - k, _ := pbkt.Cursor().Seek(parentKey(ss.ID, 0)) - if getParent(k) == ss.ID { + k, _ := pbkt.Cursor().Seek(parentPrefixKey(ss.ID)) + if getParentPrefix(k) == ss.ID { return errors.Errorf("cannot remove snapshot with child") } @@ -329,7 +342,7 @@ func (ms *boltMetastore) Remove(ctx context.Context, key string) (id string, k s } func (ms *boltMetastore) Commit(ctx context.Context, key, name string) (id string, err error) { - err = ms.withBucket(ctx, func(ctx context.Context, bkt *bolt.Bucket) error { + err = ms.withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error { b := bkt.Get([]byte(name)) if len(b) != 0 { return errors.Errorf("key already exists")