Add database migrations
Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
8d892a651b
commit
7f657ce3de
@ -28,7 +28,8 @@ import (
|
||||
// key: object-specific key identifying the storage bucket for the objects
|
||||
// contents.
|
||||
var (
|
||||
bucketKeyVersion = []byte("v1")
|
||||
bucketKeyVersion = []byte(schemaVersion)
|
||||
bucketKeyDBVersion = []byte("version") // stores the version of the schema
|
||||
bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace.
|
||||
bucketKeyObjectIndexes = []byte("indexes") // reserved
|
||||
bucketKeyObjectImages = []byte("images") // stores image objects
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -701,7 +702,7 @@ func testEnv(t *testing.T) (context.Context, *bolt.DB, func()) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ctx = namespaces.WithNamespace(ctx, "testing")
|
||||
|
||||
dirname, err := ioutil.TempDir("", t.Name()+"-")
|
||||
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
|
||||
|
||||
commitTime := time.Now().UTC()
|
||||
|
||||
sizeEncoded, err := encodeSize(size)
|
||||
sizeEncoded, err := encodeInt(size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -488,7 +488,7 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
|
||||
}
|
||||
|
||||
// Write size
|
||||
sizeEncoded, err := encodeSize(info.Size)
|
||||
sizeEncoded, err := encodeInt(info.Size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1,9 +1,30 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
// schemaVersion represents the schema version of
|
||||
// the database. This schema version represents the
|
||||
// structure of the data in the database. The schema
|
||||
// can envolve at any time but any backwards
|
||||
// incompatible changes or structural changes require
|
||||
// bumping the schema version.
|
||||
schemaVersion = "v1"
|
||||
|
||||
// dbVersion represents updates to the schema
|
||||
// version which are additions and compatible with
|
||||
// prior version of the same schema.
|
||||
dbVersion = 1
|
||||
)
|
||||
|
||||
type DB struct {
|
||||
@ -20,6 +41,81 @@ func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *D
|
||||
}
|
||||
}
|
||||
|
||||
func (m *DB) Init(ctx context.Context) error {
|
||||
// errSkip is used when no migration or version needs to be written
|
||||
// to the database and the transaction can be immediately rolled
|
||||
// back rather than performing a much slower and unnecessary commit.
|
||||
var errSkip = errors.New("skip update")
|
||||
|
||||
err := m.db.Update(func(tx *bolt.Tx) error {
|
||||
var (
|
||||
// current schema and version
|
||||
schema = "v0"
|
||||
version = 0
|
||||
)
|
||||
|
||||
i := len(migrations)
|
||||
for ; i > 0; i-- {
|
||||
migration := migrations[i-1]
|
||||
|
||||
bkt := tx.Bucket([]byte(migration.schema))
|
||||
if bkt == nil {
|
||||
// Hasn't encountered another schema, go to next migration
|
||||
if schema == "v0" {
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
if schema == "v0" {
|
||||
schema = migration.schema
|
||||
vb := bkt.Get(bucketKeyDBVersion)
|
||||
if vb != nil {
|
||||
v, _ := binary.Varint(vb)
|
||||
version = int(v)
|
||||
}
|
||||
}
|
||||
|
||||
if version >= migration.version {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Previous version fo database found
|
||||
if schema != "v0" {
|
||||
updates := migrations[i:]
|
||||
|
||||
// No migration updates, return immediately
|
||||
if len(updates) == 0 {
|
||||
return errSkip
|
||||
}
|
||||
|
||||
for _, m := range updates {
|
||||
t0 := time.Now()
|
||||
if err := m.migrate(tx); err != nil {
|
||||
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
|
||||
}
|
||||
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
|
||||
}
|
||||
}
|
||||
|
||||
bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
versionEncoded, err := encodeInt(dbVersion)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bkt.Put(bucketKeyDBVersion, versionEncoded)
|
||||
})
|
||||
if err == errSkip {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *DB) ContentStore() content.Store {
|
||||
if m.cs == nil {
|
||||
return nil
|
||||
|
178
metadata/db_test.go
Normal file
178
metadata/db_test.go
Normal file
@ -0,0 +1,178 @@
|
||||
package metadata
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"testing"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
ctx, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
|
||||
if err := NewDB(db, nil, nil).Init(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
version, err := readDBVersion(db, bucketKeyVersion)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if version != dbVersion {
|
||||
t.Fatalf("Unexpected version %d, expected %d", version, dbVersion)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMigrations(t *testing.T) {
|
||||
migrationTests := []struct {
|
||||
name string
|
||||
init func(*bolt.Tx) error
|
||||
check func(*bolt.Tx) error
|
||||
}{
|
||||
{
|
||||
name: "ChildrenKey",
|
||||
init: func(tx *bolt.Tx) error {
|
||||
bkt, err := createSnapshotterBucket(tx, "testing", "testing")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
snapshots := []struct {
|
||||
key string
|
||||
parent string
|
||||
}{
|
||||
{
|
||||
key: "k1",
|
||||
parent: "",
|
||||
},
|
||||
{
|
||||
key: "k2",
|
||||
parent: "k1",
|
||||
},
|
||||
{
|
||||
key: "k2a",
|
||||
parent: "k1",
|
||||
},
|
||||
{
|
||||
key: "a1",
|
||||
parent: "k2",
|
||||
},
|
||||
}
|
||||
|
||||
for _, s := range snapshots {
|
||||
sbkt, err := bkt.CreateBucket([]byte(s.key))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := sbkt.Put(bucketKeyParent, []byte(s.parent)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
check: func(tx *bolt.Tx) error {
|
||||
bkt := getSnapshotterBucket(tx, "testing", "testing")
|
||||
if bkt == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "snapshots bucket not found")
|
||||
}
|
||||
snapshots := []struct {
|
||||
key string
|
||||
children []string
|
||||
}{
|
||||
{
|
||||
key: "k1",
|
||||
children: []string{"k2", "k2a"},
|
||||
},
|
||||
{
|
||||
key: "k2",
|
||||
children: []string{"a1"},
|
||||
},
|
||||
{
|
||||
key: "k2a",
|
||||
children: []string{},
|
||||
},
|
||||
{
|
||||
key: "a1",
|
||||
children: []string{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, s := range snapshots {
|
||||
sbkt := bkt.Bucket([]byte(s.key))
|
||||
if sbkt == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "key does not exist")
|
||||
}
|
||||
|
||||
cbkt := sbkt.Bucket(bucketKeyChildren)
|
||||
var cn int
|
||||
if cbkt != nil {
|
||||
cn = cbkt.Stats().KeyN
|
||||
}
|
||||
|
||||
if cn != len(s.children) {
|
||||
return errors.Errorf("unexpected number of children %d, expected %d", cn, len(s.children))
|
||||
}
|
||||
|
||||
for _, ch := range s.children {
|
||||
if v := cbkt.Get([]byte(ch)); v == nil {
|
||||
return errors.Errorf("missing child record for %s", ch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if len(migrationTests) != len(migrations) {
|
||||
t.Fatal("Each migration must have a test case")
|
||||
}
|
||||
|
||||
for i, mt := range migrationTests {
|
||||
t.Run(mt.name, runMigrationTest(i, mt.init, mt.check))
|
||||
}
|
||||
}
|
||||
|
||||
func runMigrationTest(i int, init, check func(*bolt.Tx) error) func(t *testing.T) {
|
||||
return func(t *testing.T) {
|
||||
_, db, cancel := testEnv(t)
|
||||
defer cancel()
|
||||
|
||||
if err := db.Update(init); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.Update(migrations[i].migrate); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := db.View(check); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func readDBVersion(db *bolt.DB, schema []byte) (int, error) {
|
||||
var version int
|
||||
if err := db.View(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(schema)
|
||||
if bkt == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "no version bucket")
|
||||
}
|
||||
vb := bkt.Get(bucketKeyDBVersion)
|
||||
if vb == nil {
|
||||
return errors.Wrap(errdefs.ErrNotFound, "no version value")
|
||||
}
|
||||
v, _ := binary.Varint(vb)
|
||||
version = int(v)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return version, nil
|
||||
}
|
@ -284,7 +284,7 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sizeEncoded, err := encodeSize(image.Target.Size)
|
||||
sizeEncoded, err := encodeInt(image.Target.Size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -302,15 +302,15 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func encodeSize(size int64) ([]byte, error) {
|
||||
func encodeInt(i int64) ([]byte, error) {
|
||||
var (
|
||||
buf [binary.MaxVarintLen64]byte
|
||||
sizeEncoded = buf[:]
|
||||
iEncoded = buf[:]
|
||||
)
|
||||
sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)]
|
||||
iEncoded = iEncoded[:binary.PutVarint(iEncoded, i)]
|
||||
|
||||
if len(sizeEncoded) == 0 {
|
||||
return nil, fmt.Errorf("failed encoding size = %v", size)
|
||||
if len(iEncoded) == 0 {
|
||||
return nil, fmt.Errorf("failed encoding integer = %v", i)
|
||||
}
|
||||
return sizeEncoded, nil
|
||||
return iEncoded, nil
|
||||
}
|
||||
|
75
metadata/migrations.go
Normal file
75
metadata/migrations.go
Normal file
@ -0,0 +1,75 @@
|
||||
package metadata
|
||||
|
||||
import "github.com/boltdb/bolt"
|
||||
|
||||
type migration struct {
|
||||
schema string
|
||||
version int
|
||||
migrate func(*bolt.Tx) error
|
||||
}
|
||||
|
||||
var migrations = []migration{
|
||||
{
|
||||
schema: "v1",
|
||||
version: 1,
|
||||
migrate: addChildLinks,
|
||||
},
|
||||
}
|
||||
|
||||
// addChildLinks Adds children key to the snapshotters to enforce snapshot
|
||||
// entries cannot be removed which have children
|
||||
func addChildLinks(tx *bolt.Tx) error {
|
||||
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||
if v1bkt == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||
if v != nil {
|
||||
continue
|
||||
}
|
||||
nbkt := v1bkt.Bucket(k)
|
||||
|
||||
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
// Iterate through each snapshotter
|
||||
if err := sbkt.ForEach(func(sk, sv []byte) error {
|
||||
if sv != nil {
|
||||
return nil
|
||||
}
|
||||
snbkt := sbkt.Bucket(sk)
|
||||
|
||||
// Iterate through each snapshot
|
||||
return snbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
parent := snbkt.Bucket(k).Get(bucketKeyParent)
|
||||
if len(parent) > 0 {
|
||||
pbkt := snbkt.Bucket(parent)
|
||||
if pbkt == nil {
|
||||
// Not enforcing consistency during migration, skip
|
||||
return nil
|
||||
}
|
||||
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := cbkt.Put(k, nil); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
@ -205,7 +205,11 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return metadata.NewDB(db, cs.(content.Store), snapshotters), nil
|
||||
mdb := metadata.NewDB(db, cs.(content.Store), snapshotters)
|
||||
if err := mdb.Init(ic.Context); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mdb, nil
|
||||
},
|
||||
})
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user