diff --git a/differ/differ.go b/differ/differ.go index 436e33b44..8d7cd19d8 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -6,7 +6,6 @@ import ( "os" "strings" - "github.com/boltdb/bolt" "github.com/containerd/containerd/archive" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" @@ -26,19 +25,14 @@ func init() { Type: plugin.DiffPlugin, ID: "walking", Requires: []plugin.Type{ - plugin.ContentPlugin, plugin.MetadataPlugin, }, Init: func(ic *plugin.InitContext) (interface{}, error) { - c, err := ic.Get(plugin.ContentPlugin) - if err != nil { - return nil, err - } md, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - return NewWalkingDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store))) + return NewWalkingDiff(md.(*metadata.DB).ContentStore()) }, }) } diff --git a/linux/runtime.go b/linux/runtime.go index 91d94be2f..f7b014644 100644 --- a/linux/runtime.go +++ b/linux/runtime.go @@ -112,7 +112,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { state: ic.State, monitor: monitor.(runtime.TaskMonitor), tasks: runtime.NewTaskList(), - db: m.(*bolt.DB), + db: m.(*metadata.DB), address: ic.Address, events: ic.Events, config: cfg, @@ -138,7 +138,7 @@ type Runtime struct { monitor runtime.TaskMonitor tasks *runtime.TaskList - db *bolt.DB + db *metadata.DB events *events.Exchange config *Config diff --git a/metadata/bolt.go b/metadata/bolt.go index 221dba5a2..2e4c35270 100644 --- a/metadata/bolt.go +++ b/metadata/bolt.go @@ -17,9 +17,14 @@ func WithTransactionContext(ctx context.Context, tx *bolt.Tx) context.Context { return context.WithValue(ctx, transactionKey{}, tx) } +type transactor interface { + View(fn func(*bolt.Tx) error) error + Update(fn func(*bolt.Tx) error) error +} + // view gets a bolt db transaction either from the context // or starts a new one with the provided bolt database. -func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error { +func view(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error { tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx) if !ok { return db.View(fn) @@ -29,7 +34,7 @@ func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error { // update gets a writable bolt db transaction either from the context // or starts a new one with the provided bolt database. -func update(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error { +func update(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error { tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx) if !ok { return db.Update(fn) diff --git a/metadata/buckets.go b/metadata/buckets.go index 6097cbf8f..43849e080 100644 --- a/metadata/buckets.go +++ b/metadata/buckets.go @@ -28,7 +28,8 @@ import ( // key: object-specific key identifying the storage bucket for the objects // contents. var ( - bucketKeyVersion = []byte("v1") + bucketKeyVersion = []byte(schemaVersion) + bucketKeyDBVersion = []byte("version") // stores the version of the schema bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace. bucketKeyObjectIndexes = []byte("indexes") // reserved bucketKeyObjectImages = []byte("images") // stores image objects @@ -45,6 +46,7 @@ var ( bucketKeyRuntime = []byte("runtime") bucketKeyName = []byte("name") bucketKeyParent = []byte("parent") + bucketKeyChildren = []byte("children") bucketKeyOptions = []byte("options") bucketKeySpec = []byte("spec") bucketKeySnapshotKey = []byte("snapshotKey") diff --git a/metadata/containers_test.go b/metadata/containers_test.go index 781dc861c..273094d63 100644 --- a/metadata/containers_test.go +++ b/metadata/containers_test.go @@ -7,6 +7,7 @@ import ( "os" "path/filepath" "reflect" + "strings" "testing" "time" @@ -701,7 +702,7 @@ func testEnv(t *testing.T) (context.Context, *bolt.DB, func()) { ctx, cancel := context.WithCancel(context.Background()) ctx = namespaces.WithNamespace(ctx, "testing") - dirname, err := ioutil.TempDir("", t.Name()+"-") + dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-") if err != nil { t.Fatal(err) } diff --git a/metadata/content.go b/metadata/content.go index a76e5289a..023d5d215 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -19,12 +19,12 @@ import ( type contentStore struct { content.Store - db *bolt.DB + db transactor } -// NewContentStore returns a namespaced content store using an existing +// newContentStore returns a namespaced content store using an existing // content store interface. -func NewContentStore(db *bolt.DB, cs content.Store) content.Store { +func newContentStore(db transactor, cs content.Store) content.Store { return &contentStore{ Store: cs, db: db, @@ -353,7 +353,7 @@ type namespacedWriter struct { content.Writer ref string namespace string - db *bolt.DB + db transactor } func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { @@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, commitTime := time.Now().UTC() - sizeEncoded, err := encodeSize(size) + sizeEncoded, err := encodeInt(size) if err != nil { return err } @@ -488,7 +488,7 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error { } // Write size - sizeEncoded, err := encodeSize(info.Size) + sizeEncoded, err := encodeInt(info.Size) if err != nil { return err } diff --git a/metadata/content_test.go b/metadata/content_test.go index 8d23c33f9..2e4d833ab 100644 --- a/metadata/content_test.go +++ b/metadata/content_test.go @@ -23,7 +23,7 @@ func createContentStore(ctx context.Context, root string) (content.Store, func() return nil, nil, err } - return NewContentStore(db, cs), func() error { + return NewDB(db, cs, nil).ContentStore(), func() error { return db.Close() }, nil } diff --git a/metadata/db.go b/metadata/db.go new file mode 100644 index 000000000..08a0dbd04 --- /dev/null +++ b/metadata/db.go @@ -0,0 +1,140 @@ +package metadata + +import ( + "context" + "encoding/binary" + "time" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/snapshot" + "github.com/pkg/errors" +) + +const ( + // schemaVersion represents the schema version of + // the database. This schema version represents the + // structure of the data in the database. The schema + // can envolve at any time but any backwards + // incompatible changes or structural changes require + // bumping the schema version. + schemaVersion = "v1" + + // dbVersion represents updates to the schema + // version which are additions and compatible with + // prior version of the same schema. + dbVersion = 1 +) + +type DB struct { + db *bolt.DB + ss map[string]snapshot.Snapshotter + cs content.Store +} + +func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB { + return &DB{ + db: db, + ss: ss, + cs: cs, + } +} + +func (m *DB) Init(ctx context.Context) error { + // errSkip is used when no migration or version needs to be written + // to the database and the transaction can be immediately rolled + // back rather than performing a much slower and unnecessary commit. + var errSkip = errors.New("skip update") + + err := m.db.Update(func(tx *bolt.Tx) error { + var ( + // current schema and version + schema = "v0" + version = 0 + ) + + i := len(migrations) + for ; i > 0; i-- { + migration := migrations[i-1] + + bkt := tx.Bucket([]byte(migration.schema)) + if bkt == nil { + // Hasn't encountered another schema, go to next migration + if schema == "v0" { + continue + } + break + } + if schema == "v0" { + schema = migration.schema + vb := bkt.Get(bucketKeyDBVersion) + if vb != nil { + v, _ := binary.Varint(vb) + version = int(v) + } + } + + if version >= migration.version { + break + } + } + + // Previous version fo database found + if schema != "v0" { + updates := migrations[i:] + + // No migration updates, return immediately + if len(updates) == 0 { + return errSkip + } + + for _, m := range updates { + t0 := time.Now() + if err := m.migrate(tx); err != nil { + return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version) + } + log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version) + } + } + + bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion) + if err != nil { + return err + } + + versionEncoded, err := encodeInt(dbVersion) + if err != nil { + return err + } + + return bkt.Put(bucketKeyDBVersion, versionEncoded) + }) + if err == errSkip { + err = nil + } + return err +} + +func (m *DB) ContentStore() content.Store { + if m.cs == nil { + return nil + } + return newContentStore(m, m.cs) +} + +func (m *DB) Snapshotter(name string) snapshot.Snapshotter { + sn, ok := m.ss[name] + if !ok { + return nil + } + return newSnapshotter(m, name, sn) +} + +func (m *DB) View(fn func(*bolt.Tx) error) error { + return m.db.View(fn) +} + +func (m *DB) Update(fn func(*bolt.Tx) error) error { + return m.db.Update(fn) +} diff --git a/metadata/db_test.go b/metadata/db_test.go new file mode 100644 index 000000000..e14242765 --- /dev/null +++ b/metadata/db_test.go @@ -0,0 +1,178 @@ +package metadata + +import ( + "encoding/binary" + "testing" + + "github.com/boltdb/bolt" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" +) + +func TestInit(t *testing.T) { + ctx, db, cancel := testEnv(t) + defer cancel() + + if err := NewDB(db, nil, nil).Init(ctx); err != nil { + t.Fatal(err) + } + + version, err := readDBVersion(db, bucketKeyVersion) + if err != nil { + t.Fatal(err) + } + if version != dbVersion { + t.Fatalf("Unexpected version %d, expected %d", version, dbVersion) + } +} + +func TestMigrations(t *testing.T) { + migrationTests := []struct { + name string + init func(*bolt.Tx) error + check func(*bolt.Tx) error + }{ + { + name: "ChildrenKey", + init: func(tx *bolt.Tx) error { + bkt, err := createSnapshotterBucket(tx, "testing", "testing") + if err != nil { + return err + } + + snapshots := []struct { + key string + parent string + }{ + { + key: "k1", + parent: "", + }, + { + key: "k2", + parent: "k1", + }, + { + key: "k2a", + parent: "k1", + }, + { + key: "a1", + parent: "k2", + }, + } + + for _, s := range snapshots { + sbkt, err := bkt.CreateBucket([]byte(s.key)) + if err != nil { + return err + } + if err := sbkt.Put(bucketKeyParent, []byte(s.parent)); err != nil { + return err + } + } + + return nil + }, + check: func(tx *bolt.Tx) error { + bkt := getSnapshotterBucket(tx, "testing", "testing") + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "snapshots bucket not found") + } + snapshots := []struct { + key string + children []string + }{ + { + key: "k1", + children: []string{"k2", "k2a"}, + }, + { + key: "k2", + children: []string{"a1"}, + }, + { + key: "k2a", + children: []string{}, + }, + { + key: "a1", + children: []string{}, + }, + } + + for _, s := range snapshots { + sbkt := bkt.Bucket([]byte(s.key)) + if sbkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "key does not exist") + } + + cbkt := sbkt.Bucket(bucketKeyChildren) + var cn int + if cbkt != nil { + cn = cbkt.Stats().KeyN + } + + if cn != len(s.children) { + return errors.Errorf("unexpected number of children %d, expected %d", cn, len(s.children)) + } + + for _, ch := range s.children { + if v := cbkt.Get([]byte(ch)); v == nil { + return errors.Errorf("missing child record for %s", ch) + } + } + } + + return nil + }, + }, + } + + if len(migrationTests) != len(migrations) { + t.Fatal("Each migration must have a test case") + } + + for i, mt := range migrationTests { + t.Run(mt.name, runMigrationTest(i, mt.init, mt.check)) + } +} + +func runMigrationTest(i int, init, check func(*bolt.Tx) error) func(t *testing.T) { + return func(t *testing.T) { + _, db, cancel := testEnv(t) + defer cancel() + + if err := db.Update(init); err != nil { + t.Fatal(err) + } + + if err := db.Update(migrations[i].migrate); err != nil { + t.Fatal(err) + } + + if err := db.View(check); err != nil { + t.Fatal(err) + } + } +} + +func readDBVersion(db *bolt.DB, schema []byte) (int, error) { + var version int + if err := db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(schema) + if bkt == nil { + return errors.Wrap(errdefs.ErrNotFound, "no version bucket") + } + vb := bkt.Get(bucketKeyDBVersion) + if vb == nil { + return errors.Wrap(errdefs.ErrNotFound, "no version value") + } + v, _ := binary.Varint(vb) + version = int(v) + return nil + }); err != nil { + return 0, err + } + return version, nil +} diff --git a/metadata/images.go b/metadata/images.go index 8720244df..7e5e3c76e 100644 --- a/metadata/images.go +++ b/metadata/images.go @@ -284,7 +284,7 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return err } - sizeEncoded, err := encodeSize(image.Target.Size) + sizeEncoded, err := encodeInt(image.Target.Size) if err != nil { return err } @@ -302,15 +302,15 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error { return nil } -func encodeSize(size int64) ([]byte, error) { +func encodeInt(i int64) ([]byte, error) { var ( - buf [binary.MaxVarintLen64]byte - sizeEncoded = buf[:] + buf [binary.MaxVarintLen64]byte + iEncoded = buf[:] ) - sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)] + iEncoded = iEncoded[:binary.PutVarint(iEncoded, i)] - if len(sizeEncoded) == 0 { - return nil, fmt.Errorf("failed encoding size = %v", size) + if len(iEncoded) == 0 { + return nil, fmt.Errorf("failed encoding integer = %v", i) } - return sizeEncoded, nil + return iEncoded, nil } diff --git a/metadata/migrations.go b/metadata/migrations.go new file mode 100644 index 000000000..bc1761f01 --- /dev/null +++ b/metadata/migrations.go @@ -0,0 +1,75 @@ +package metadata + +import "github.com/boltdb/bolt" + +type migration struct { + schema string + version int + migrate func(*bolt.Tx) error +} + +var migrations = []migration{ + { + schema: "v1", + version: 1, + migrate: addChildLinks, + }, +} + +// addChildLinks Adds children key to the snapshotters to enforce snapshot +// entries cannot be removed which have children +func addChildLinks(tx *bolt.Tx) error { + v1bkt := tx.Bucket(bucketKeyVersion) + if v1bkt == nil { + return nil + } + + // iterate through each namespace + v1c := v1bkt.Cursor() + + for k, v := v1c.First(); k != nil; k, v = v1c.Next() { + if v != nil { + continue + } + nbkt := v1bkt.Bucket(k) + + sbkt := nbkt.Bucket(bucketKeyObjectSnapshots) + if sbkt != nil { + // Iterate through each snapshotter + if err := sbkt.ForEach(func(sk, sv []byte) error { + if sv != nil { + return nil + } + snbkt := sbkt.Bucket(sk) + + // Iterate through each snapshot + return snbkt.ForEach(func(k, v []byte) error { + if v != nil { + return nil + } + parent := snbkt.Bucket(k).Get(bucketKeyParent) + if len(parent) > 0 { + pbkt := snbkt.Bucket(parent) + if pbkt == nil { + // Not enforcing consistency during migration, skip + return nil + } + cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren) + if err != nil { + return err + } + if err := cbkt.Put(k, nil); err != nil { + return err + } + } + + return nil + }) + }); err != nil { + return err + } + } + } + + return nil +} diff --git a/metadata/snapshot.go b/metadata/snapshot.go index 254bc1f0f..4103827a4 100644 --- a/metadata/snapshot.go +++ b/metadata/snapshot.go @@ -19,12 +19,12 @@ import ( type snapshotter struct { snapshot.Snapshotter name string - db *bolt.DB + db transactor } -// NewSnapshotter returns a new Snapshotter which namespaces the given snapshot -// using the provided name and metadata store. -func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot.Snapshotter { +// newSnapshotter returns a new Snapshotter which namespaces the given snapshot +// using the provided name and database. +func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter { return &snapshotter{ Snapshotter: sn, name: name, @@ -283,10 +283,18 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re if parent != "" { pbkt := bkt.Bucket([]byte(parent)) if pbkt == nil { - return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", parent) + return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent) } bparent = string(pbkt.Get(bucketKeyName)) + cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren) + if err != nil { + return err + } + if err := cbkt.Put([]byte(key), nil); err != nil { + return err + } + if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil { return err } @@ -360,7 +368,6 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap } bkey := string(obkt.Get(bucketKeyName)) - parent := string(obkt.Get(bucketKeyParent)) sid, err := bkt.NextSequence() if err != nil { @@ -372,8 +379,28 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap if err := bbkt.Put(bucketKeyName, []byte(nameKey)); err != nil { return err } - if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil { - return err + + parent := obkt.Get(bucketKeyParent) + if len(parent) > 0 { + pbkt := bkt.Bucket(parent) + if pbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent)) + } + + cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren) + if err != nil { + return err + } + if err := cbkt.Delete([]byte(key)); err != nil { + return err + } + if err := cbkt.Put([]byte(name), nil); err != nil { + return err + } + + if err := bbkt.Put(bucketKeyParent, parent); err != nil { + return err + } } ts := time.Now().UTC() if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil { @@ -400,23 +427,37 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error { } return update(ctx, s.db, func(tx *bolt.Tx) error { - var bkey string + var sbkt *bolt.Bucket bkt := getSnapshotterBucket(tx, ns, s.name) if bkt != nil { - sbkt := bkt.Bucket([]byte(key)) - if sbkt != nil { - bkey = string(sbkt.Get(bucketKeyName)) - } + sbkt = bkt.Bucket([]byte(key)) } - if bkey == "" { + if sbkt == nil { return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key) } - if err := bkt.DeleteBucket([]byte(key)); err != nil { - return err + cbkt := sbkt.Bucket(bucketKeyChildren) + if cbkt != nil { + if child, _ := cbkt.Cursor().First(); child != nil { + return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child") + } } - return s.Snapshotter.Remove(ctx, bkey) + parent := sbkt.Get(bucketKeyParent) + if len(parent) > 0 { + pbkt := bkt.Bucket(parent) + if pbkt == nil { + return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent)) + } + cbkt := pbkt.Bucket(bucketKeyChildren) + if cbkt != nil { + if err := cbkt.Delete([]byte(key)); err != nil { + return errors.Wrap(err, "failed to remove child link") + } + } + } + + return bkt.DeleteBucket([]byte(key)) }) } diff --git a/metadata/snapshot_test.go b/metadata/snapshot_test.go index 37800dc40..5111c36b8 100644 --- a/metadata/snapshot_test.go +++ b/metadata/snapshot_test.go @@ -13,7 +13,7 @@ import ( "github.com/containerd/containerd/testutil" ) -func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, func() error, error) { +func newTestSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, func() error, error) { naiveRoot := filepath.Join(root, "naive") if err := os.Mkdir(naiveRoot, 0770); err != nil { return nil, nil, err @@ -28,7 +28,7 @@ func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, fun return nil, nil, err } - sn := NewSnapshotter(db, "naive", snapshotter) + sn := NewDB(db, nil, map[string]snapshot.Snapshotter{"naive": snapshotter}).Snapshotter("naive") return sn, func() error { return db.Close() @@ -38,5 +38,5 @@ func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, fun func TestMetadata(t *testing.T) { // Snapshot tests require mounting, still requires root testutil.RequiresRoot(t) - testsuite.SnapshotterSuite(t, "Metadata", newSnapshotter) + testsuite.SnapshotterSuite(t, "Metadata", newTestSnapshotter) } diff --git a/server/server.go b/server/server.go index 7d4ef704c..f26736357 100644 --- a/server/server.go +++ b/server/server.go @@ -12,18 +12,21 @@ import ( "github.com/boltdb/bolt" containers "github.com/containerd/containerd/api/services/containers/v1" - content "github.com/containerd/containerd/api/services/content/v1" + contentapi "github.com/containerd/containerd/api/services/content/v1" diff "github.com/containerd/containerd/api/services/diff/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" images "github.com/containerd/containerd/api/services/images/v1" namespaces "github.com/containerd/containerd/api/services/namespaces/v1" - snapshot "github.com/containerd/containerd/api/services/snapshot/v1" + snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1" tasks "github.com/containerd/containerd/api/services/tasks/v1" version "github.com/containerd/containerd/api/services/version/v1" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/content/local" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/snapshot" metrics "github.com/docker/go-metrics" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "golang.org/x/net/context" @@ -175,11 +178,38 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) { plugin.Register(&plugin.Registration{ Type: plugin.MetadataPlugin, ID: "bolt", + Requires: []plugin.Type{ + plugin.ContentPlugin, + plugin.SnapshotPlugin, + }, Init: func(ic *plugin.InitContext) (interface{}, error) { if err := os.MkdirAll(ic.Root, 0711); err != nil { return nil, err } - return bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil) + cs, err := ic.Get(plugin.ContentPlugin) + if err != nil { + return nil, err + } + + rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin) + if err != nil { + return nil, err + } + + snapshotters := make(map[string]snapshot.Snapshotter) + for name, sn := range rawSnapshotters { + snapshotters[name] = sn.(snapshot.Snapshotter) + } + + db, err := bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil) + if err != nil { + return nil, err + } + mdb := metadata.NewDB(db, cs.(content.Store), snapshotters) + if err := mdb.Init(ic.Context); err != nil { + return nil, err + } + return mdb, nil }, }) @@ -199,7 +229,7 @@ func interceptor( ctx = log.WithModule(ctx, "tasks") case containers.ContainersServer: ctx = log.WithModule(ctx, "containers") - case content.ContentServer: + case contentapi.ContentServer: ctx = log.WithModule(ctx, "content") case images.ImagesServer: ctx = log.WithModule(ctx, "images") @@ -207,7 +237,7 @@ func interceptor( // No need to change the context case version.VersionServer: ctx = log.WithModule(ctx, "version") - case snapshot.SnapshotsServer: + case snapshotapi.SnapshotsServer: ctx = log.WithModule(ctx, "snapshot") case diff.DiffServer: ctx = log.WithModule(ctx, "diff") diff --git a/services/containers/service.go b/services/containers/service.go index d6aeedcba..4fcb8dfeb 100644 --- a/services/containers/service.go +++ b/services/containers/service.go @@ -28,17 +28,17 @@ func init() { if err != nil { return nil, err } - return NewService(m.(*bolt.DB), ic.Events), nil + return NewService(m.(*metadata.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB + db *metadata.DB publisher events.Publisher } -func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer { +func NewService(db *metadata.DB, publisher events.Publisher) api.ContainersServer { return &Service{db: db, publisher: publisher} } diff --git a/services/content/service.go b/services/content/service.go index cd2369925..6040793c2 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -4,7 +4,6 @@ import ( "io" "sync" - "github.com/boltdb/bolt" api "github.com/containerd/containerd/api/services/content/v1" eventsapi "github.com/containerd/containerd/api/services/events/v1" "github.com/containerd/containerd/content" @@ -40,7 +39,6 @@ func init() { Type: plugin.GRPCPlugin, ID: "content", Requires: []plugin.Type{ - plugin.ContentPlugin, plugin.MetadataPlugin, }, Init: NewService, @@ -48,17 +46,13 @@ func init() { } func NewService(ic *plugin.InitContext) (interface{}, error) { - c, err := ic.Get(plugin.ContentPlugin) - if err != nil { - return nil, err - } m, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store)) + return &Service{ - store: cs, + store: m.(*metadata.DB).ContentStore(), publisher: ic.Events, }, nil } diff --git a/services/images/service.go b/services/images/service.go index 975ba0389..5e2697427 100644 --- a/services/images/service.go +++ b/services/images/service.go @@ -28,17 +28,17 @@ func init() { if err != nil { return nil, err } - return NewService(m.(*bolt.DB), ic.Events), nil + return NewService(m.(*metadata.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB + db *metadata.DB publisher events.Publisher } -func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer { +func NewService(db *metadata.DB, publisher events.Publisher) imagesapi.ImagesServer { return &Service{ db: db, publisher: publisher, diff --git a/services/namespaces/service.go b/services/namespaces/service.go index 61e0eeba8..e7f7a15ac 100644 --- a/services/namespaces/service.go +++ b/services/namespaces/service.go @@ -29,19 +29,19 @@ func init() { if err != nil { return nil, err } - return NewService(m.(*bolt.DB), ic.Events), nil + return NewService(m.(*metadata.DB), ic.Events), nil }, }) } type Service struct { - db *bolt.DB + db *metadata.DB publisher events.Publisher } var _ api.NamespacesServer = &Service{} -func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer { +func NewService(db *metadata.DB, publisher events.Publisher) api.NamespacesServer { return &Service{ db: db, publisher: publisher, diff --git a/services/snapshot/service.go b/services/snapshot/service.go index 502958e04..9d8b4cd79 100644 --- a/services/snapshot/service.go +++ b/services/snapshot/service.go @@ -3,7 +3,6 @@ package snapshot import ( gocontext "context" - "github.com/boltdb/bolt" eventsapi "github.com/containerd/containerd/api/services/events/v1" snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1" "github.com/containerd/containerd/api/types" @@ -15,7 +14,6 @@ import ( "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/snapshot" protoempty "github.com/golang/protobuf/ptypes/empty" - "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -25,7 +23,6 @@ func init() { Type: plugin.GRPCPlugin, ID: "snapshots", Requires: []plugin.Type{ - plugin.SnapshotPlugin, plugin.MetadataPlugin, }, Init: newService, @@ -35,31 +32,19 @@ func init() { var empty = &protoempty.Empty{} type service struct { - snapshotters map[string]snapshot.Snapshotter - publisher events.Publisher + db *metadata.DB + publisher events.Publisher } func newService(ic *plugin.InitContext) (interface{}, error) { - rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin) - if err != nil { - return nil, err - } md, err := ic.Get(plugin.MetadataPlugin) if err != nil { return nil, err } - snapshotters := make(map[string]snapshot.Snapshotter) - for name, sn := range rawSnapshotters { - snapshotters[name] = metadata.NewSnapshotter(md.(*bolt.DB), name, sn.(snapshot.Snapshotter)) - } - - if len(snapshotters) == 0 { - return nil, errors.Errorf("failed to create snapshotter service: no snapshotters loaded") - } return &service{ - snapshotters: snapshotters, - publisher: ic.Events, + db: md.(*metadata.DB), + publisher: ic.Events, }, nil } @@ -68,8 +53,8 @@ func (s *service) getSnapshotter(name string) (snapshot.Snapshotter, error) { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing") } - sn, ok := s.snapshotters[name] - if !ok { + sn := s.db.Snapshotter(name) + if sn == nil { return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name) } return sn, nil diff --git a/services/tasks/service.go b/services/tasks/service.go index cc126e73b..d619c6d65 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -44,7 +44,6 @@ func init() { Requires: []plugin.Type{ plugin.RuntimePlugin, plugin.MetadataPlugin, - plugin.ContentPlugin, }, Init: New, }) @@ -59,11 +58,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { if err != nil { return nil, err } - ct, err := ic.Get(plugin.ContentPlugin) - if err != nil { - return nil, err - } - cs := metadata.NewContentStore(m.(*bolt.DB), ct.(content.Store)) + cs := m.(*metadata.DB).ContentStore() runtimes := make(map[string]runtime.Runtime) for _, rr := range rt { r := rr.(runtime.Runtime) @@ -71,7 +66,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { } return &Service{ runtimes: runtimes, - db: m.(*bolt.DB), + db: m.(*metadata.DB), store: cs, publisher: ic.Events, }, nil @@ -79,7 +74,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { type Service struct { runtimes map[string]runtime.Runtime - db *bolt.DB + db *metadata.DB store content.Store publisher events.Publisher } diff --git a/snapshot/storage/bolt.go b/snapshot/storage/bolt.go index 4f0c677ce..3ca3b879c 100644 --- a/snapshot/storage/bolt.go +++ b/snapshot/storage/bolt.go @@ -305,7 +305,7 @@ func Remove(ctx context.Context, key string) (string, snapshot.Kind, error) { if pbkt != nil { k, _ := pbkt.Cursor().Seek(parentPrefixKey(id)) if getParentPrefix(k) == id { - return errors.Errorf("cannot remove snapshot with child") + return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child") } if si.Parent != "" { diff --git a/windows/runtime.go b/windows/runtime.go index e16c531a1..4c0afb4cc 100644 --- a/windows/runtime.go +++ b/windows/runtime.go @@ -16,6 +16,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/events" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/namespaces" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/runtime" @@ -68,7 +69,7 @@ func New(ic *plugin.InitContext) (interface{}, error) { // TODO(mlaventure): windows needs a stat monitor monitor: nil, tasks: runtime.NewTaskList(), - db: m.(*bolt.DB), + db: m.(*metadata.DB), } // Load our existing containers and kill/delete them. We don't support @@ -89,7 +90,7 @@ type windowsRuntime struct { monitor runtime.TaskMonitor tasks *runtime.TaskList - db *bolt.DB + db *metadata.DB } func (r *windowsRuntime) ID() string {