From 7f657ce3dedfa68631865b5b4a3ac218b7af9a03 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Wed, 4 Oct 2017 18:34:32 -0700 Subject: [PATCH] Add database migrations Signed-off-by: Derek McGowan --- metadata/buckets.go | 3 +- metadata/containers_test.go | 3 +- metadata/content.go | 4 +- metadata/db.go | 96 +++++++++++++++++++ metadata/db_test.go | 178 ++++++++++++++++++++++++++++++++++++ metadata/images.go | 16 ++-- metadata/migrations.go | 75 +++++++++++++++ server/server.go | 6 +- 8 files changed, 368 insertions(+), 13 deletions(-) create mode 100644 metadata/db_test.go create mode 100644 metadata/migrations.go diff --git a/metadata/buckets.go b/metadata/buckets.go index b6e3500e7..43849e080 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -28,7 +28,8 @@ import ( // key: object-specific key identifying the storage bucket for the objects // contents. var ( - bucketKeyVersion = []byte("v1") + bucketKeyVersion = []byte(schemaVersion) + bucketKeyDBVersion = []byte("version") // stores the version of the schema bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace. bucketKeyObjectIndexes = []byte("indexes") // reserved bucketKeyObjectImages = []byte("images") // stores image objects diff --git a/metadata/containers_test.go b/metadata/containers_test.go index 781dc861c..273094d63 100644 --- a/metadata/containers_test.go +++ b/metadata/containers_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" @@ -701,7 +702,7 @@ func testEnv(t *testing.T) (context.Context, *bolt.DB, func()) { ctx, cancel := context.WithCancel(context.Background()) ctx = namespaces.WithNamespace(ctx, "testing") - dirname, err := ioutil.TempDir("", t.Name()+"-") + dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-") if err != nil { t.Fatal(err) } diff --git a/metadata/content.go b/metadata/content.go index 50c98b984..023d5d215 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, commitTime := time.Now().UTC() - sizeEncoded, err := encodeSize(size) + sizeEncoded, err := encodeInt(size) if err != nil { return err } @@ -488,7 +488,7 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { } // Write size - sizeEncoded, err := encodeSize(info.Size) + sizeEncoded, err := encodeInt(info.Size) if err != nil { return err } diff --git a/metadata/db.go b/metadata/db.go index 17027f12b..08a0dbd04 100644 --- a/metadata/db.go +++ b/metadata/db.go @@ -1,9 +1,30 @@ package metadata import ( + "context" + "encoding/binary" + "time" + "github.com/boltdb/bolt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/snapshot" + "github.com/pkg/errors" +) + +const ( + // schemaVersion represents the schema version of + // the database. This schema version represents the + // structure of the data in the database. The schema + // can envolve at any time but any backwards + // incompatible changes or structural changes require + // bumping the schema version. + schemaVersion = "v1" + + // dbVersion represents updates to the schema + // version which are additions and compatible with + // prior version of the same schema. + dbVersion = 1 ) type DB struct { @@ -20,6 +41,81 @@ func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *D } } +func (m *DB) Init(ctx context.Context) error { + // errSkip is used when no migration or version needs to be written + // to the database and the transaction can be immediately rolled + // back rather than performing a much slower and unnecessary commit. + var errSkip = errors.New("skip update") + + err := m.db.Update(func(tx *bolt.Tx) error { + var ( + // current schema and version + schema = "v0" + version = 0 + ) + + i := len(migrations) + for ; i > 0; i-- { + migration := migrations[i-1] + + bkt := tx.Bucket([]byte(migration.schema)) + if bkt == nil { + // Hasn't encountered another schema, go to next migration + if schema == "v0" { + continue + } + break + } + if schema == "v0" { + schema = migration.schema + vb := bkt.Get(bucketKeyDBVersion) + if vb != nil { + v, _ := binary.Varint(vb) + version = int(v) + } + } + + if version >= migration.version { + break + } + } + + // Previous version fo database found + if schema != "v0" { + updates := migrations[i:] + + // No migration updates, return immediately + if len(updates) == 0 { + return errSkip + } + + for _, m := range updates { + t0 := time.Now() + if err := m.migrate(tx); err != nil { + return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version) + } + log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version) + } + } + + bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + + versionEncoded, err := encodeInt(dbVersion) + if err != nil { + return err + } + + return bkt.Put(bucketKeyDBVersion, versionEncoded) + }) + if err == errSkip { + err = nil + } + return err +} + func (m *DB) ContentStore() content.Store { if m.cs == nil { return nil diff --git a/metadata/db_test.go b/metadata/db_test.go new file mode 100644 index 000000000..e14242765 --- /dev/null +++ b/metadata/db_test.go @@ -0,0 +1,178 @@ +package metadata + +import ( + "encoding/binary" + "testing" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" +) + +func TestInit(t *testing.T) { + ctx, db, cancel := testEnv(t) + defer cancel() + + if err := NewDB(db, nil, nil).Init(ctx); err != nil { + t.Fatal(err) + } + + version, err := readDBVersion(db, bucketKeyVersion) + if err != nil { + t.Fatal(err) + } + if version != dbVersion { + t.Fatalf("Unexpected version %d, expected %d", version, dbVersion) + } +} + +func TestMigrations(t *testing.T) { + migrationTests := []struct { + name string + init func(*bolt.Tx) error + check func(*bolt.Tx) error + }{ + { + name: "ChildrenKey", + init: func(tx *bolt.Tx) error { + bkt, err := createSnapshotterBucket(tx, "testing", "testing") + if err != nil { + return err + } + + snapshots := []struct { + key string + parent string + }{ + { + key: "k1", + parent: "", + }, + { + key: "k2", + parent: "k1", + }, + { + key: "k2a", + parent: "k1", + }, + { + key: "a1", + parent: "k2", + }, + } + + for _, s := range snapshots { + sbkt, err := bkt.CreateBucket([]byte(s.key)) + if err != nil { + return err + } + if err := sbkt.Put(bucketKeyParent, []byte(s.parent)); err != nil { + return err + } + } + + return nil + }, + check: func(tx *bolt.Tx) error { + bkt := getSnapshotterBucket(tx, "testing", "testing") + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "snapshots bucket not found") + } + snapshots := []struct { + key string + children []string + }{ + { + key: "k1", + children: []string{"k2", "k2a"}, + }, + { + key: "k2", + children: []string{"a1"}, + }, + { + key: "k2a", + children: []string{}, + }, + { + key: "a1", + children: []string{}, + }, + } + + for _, s := range snapshots { + sbkt := bkt.Bucket([]byte(s.key)) + if sbkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "key does not exist") + } + + cbkt := sbkt.Bucket(bucketKeyChildren) + var cn int + if cbkt != nil { + cn = cbkt.Stats().KeyN + } + + if cn != len(s.children) { + return errors.Errorf("unexpected number of children %d, expected %d", cn, len(s.children)) + } + + for _, ch := range s.children { + if v := cbkt.Get([]byte(ch)); v == nil { + return errors.Errorf("missing child record for %s", ch) + } + } + } + + return nil + }, + }, + } + + if len(migrationTests) != len(migrations) { + t.Fatal("Each migration must have a test case") + } + + for i, mt := range migrationTests { + t.Run(mt.name, runMigrationTest(i, mt.init, mt.check)) + } +} + +func runMigrationTest(i int, init, check func(*bolt.Tx) error) func(t *testing.T) { + return func(t *testing.T) { + _, db, cancel := testEnv(t) + defer cancel() + + if err := db.Update(init); err != nil { + t.Fatal(err) + } + + if err := db.Update(migrations[i].migrate); err != nil { + t.Fatal(err) + } + + if err := db.View(check); err != nil { + t.Fatal(err) + } + } +} + +func readDBVersion(db *bolt.DB, schema []byte) (int, error) { + var version int + if err := db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(schema) + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "no version bucket") + } + vb := bkt.Get(bucketKeyDBVersion) + if vb == nil { + return errors.Wrap(errdefs.ErrNotFound, "no version value") + } + v, _ := binary.Varint(vb) + version = int(v) + return nil + }); err != nil { + return 0, err + } + return version, nil +} diff --git a/metadata/images.go b/metadata/images.go index 8720244df..7e5e3c76e 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -284,7 +284,7 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return err } - sizeEncoded, err := encodeSize(image.Target.Size) + sizeEncoded, err := encodeInt(image.Target.Size) if err != nil { return err } @@ -302,15 +302,15 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return nil } -func encodeSize(size int64) ([]byte, error) { +func encodeInt(i int64) ([]byte, error) { var ( - buf [binary.MaxVarintLen64]byte - sizeEncoded = buf[:] + buf [binary.MaxVarintLen64]byte + iEncoded = buf[:] ) - sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)] + iEncoded = iEncoded[:binary.PutVarint(iEncoded, i)] - if len(sizeEncoded) == 0 { - return nil, fmt.Errorf("failed encoding size = %v", size) + if len(iEncoded) == 0 { + return nil, fmt.Errorf("failed encoding integer = %v", i) } - return sizeEncoded, nil + return iEncoded, nil } diff --git a/metadata/migrations.go b/metadata/migrations.go new file mode 100644 index 000000000..bc1761f01 --- /dev/null +++ b/metadata/migrations.go @@ -0,0 +1,75 @@ +package metadata + +import "github.com/boltdb/bolt" + +type migration struct { + schema string + version int + migrate func(*bolt.Tx) error +} + +var migrations = []migration{ + { + schema: "v1", + version: 1, + migrate: addChildLinks, + }, +} + +// addChildLinks Adds children key to the snapshotters to enforce snapshot +// entries cannot be removed which have children +func addChildLinks(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + nbkt := v1bkt.Bucket(k) + + sbkt := nbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + // Iterate through each snapshotter + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + snbkt := sbkt.Bucket(sk) + + // Iterate through each snapshot + return snbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + parent := snbkt.Bucket(k).Get(bucketKeyParent) + if len(parent) > 0 { + pbkt := snbkt.Bucket(parent) + if pbkt == nil { + // Not enforcing consistency during migration, skip + return nil + } + cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren) + if err != nil { + return err + } + if err := cbkt.Put(k, nil); err != nil { + return err + } + } + + return nil + }) + }); err != nil { + return err + } + } + } + + return nil +} diff --git a/server/server.go b/server/server.go index 8329adc67..f26736357 100644 --- a/server/server.go +++ b/server/server.go @@ -205,7 +205,11 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) { if err != nil { return nil, err } - return metadata.NewDB(db, cs.(content.Store), snapshotters), nil + mdb := metadata.NewDB(db, cs.(content.Store), snapshotters) + if err := mdb.Init(ic.Context); err != nil { + return nil, err + } + return mdb, nil }, })