Merge pull request #1582 from dmcgowan/metadata-db-object
Create metadata db object
This commit is contained in:
commit
8558b98eb1
@ -6,7 +6,6 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
"github.com/containerd/containerd/archive"
|
"github.com/containerd/containerd/archive"
|
||||||
"github.com/containerd/containerd/archive/compression"
|
"github.com/containerd/containerd/archive/compression"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
@ -26,19 +25,14 @@ func init() {
|
|||||||
Type: plugin.DiffPlugin,
|
Type: plugin.DiffPlugin,
|
||||||
ID: "walking",
|
ID: "walking",
|
||||||
Requires: []plugin.Type{
|
Requires: []plugin.Type{
|
||||||
plugin.ContentPlugin,
|
|
||||||
plugin.MetadataPlugin,
|
plugin.MetadataPlugin,
|
||||||
},
|
},
|
||||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
c, err := ic.Get(plugin.ContentPlugin)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
md, err := ic.Get(plugin.MetadataPlugin)
|
md, err := ic.Get(plugin.MetadataPlugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewWalkingDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
|
return NewWalkingDiff(md.(*metadata.DB).ContentStore())
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
state: ic.State,
|
state: ic.State,
|
||||||
monitor: monitor.(runtime.TaskMonitor),
|
monitor: monitor.(runtime.TaskMonitor),
|
||||||
tasks: runtime.NewTaskList(),
|
tasks: runtime.NewTaskList(),
|
||||||
db: m.(*bolt.DB),
|
db: m.(*metadata.DB),
|
||||||
address: ic.Address,
|
address: ic.Address,
|
||||||
events: ic.Events,
|
events: ic.Events,
|
||||||
config: cfg,
|
config: cfg,
|
||||||
@ -138,7 +138,7 @@ type Runtime struct {
|
|||||||
|
|
||||||
monitor runtime.TaskMonitor
|
monitor runtime.TaskMonitor
|
||||||
tasks *runtime.TaskList
|
tasks *runtime.TaskList
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
events *events.Exchange
|
events *events.Exchange
|
||||||
|
|
||||||
config *Config
|
config *Config
|
||||||
|
@ -17,9 +17,14 @@ func WithTransactionContext(ctx context.Context, tx *bolt.Tx) context.Context {
|
|||||||
return context.WithValue(ctx, transactionKey{}, tx)
|
return context.WithValue(ctx, transactionKey{}, tx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type transactor interface {
|
||||||
|
View(fn func(*bolt.Tx) error) error
|
||||||
|
Update(fn func(*bolt.Tx) error) error
|
||||||
|
}
|
||||||
|
|
||||||
// view gets a bolt db transaction either from the context
|
// view gets a bolt db transaction either from the context
|
||||||
// or starts a new one with the provided bolt database.
|
// or starts a new one with the provided bolt database.
|
||||||
func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
|
func view(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
|
||||||
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
|
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
|
||||||
if !ok {
|
if !ok {
|
||||||
return db.View(fn)
|
return db.View(fn)
|
||||||
@ -29,7 +34,7 @@ func view(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
|
|||||||
|
|
||||||
// update gets a writable bolt db transaction either from the context
|
// update gets a writable bolt db transaction either from the context
|
||||||
// or starts a new one with the provided bolt database.
|
// or starts a new one with the provided bolt database.
|
||||||
func update(ctx context.Context, db *bolt.DB, fn func(*bolt.Tx) error) error {
|
func update(ctx context.Context, db transactor, fn func(*bolt.Tx) error) error {
|
||||||
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
|
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
|
||||||
if !ok {
|
if !ok {
|
||||||
return db.Update(fn)
|
return db.Update(fn)
|
||||||
|
@ -28,7 +28,8 @@ import (
|
|||||||
// key: object-specific key identifying the storage bucket for the objects
|
// key: object-specific key identifying the storage bucket for the objects
|
||||||
// contents.
|
// contents.
|
||||||
var (
|
var (
|
||||||
bucketKeyVersion = []byte("v1")
|
bucketKeyVersion = []byte(schemaVersion)
|
||||||
|
bucketKeyDBVersion = []byte("version") // stores the version of the schema
|
||||||
bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace.
|
bucketKeyObjectLabels = []byte("labels") // stores the labels for a namespace.
|
||||||
bucketKeyObjectIndexes = []byte("indexes") // reserved
|
bucketKeyObjectIndexes = []byte("indexes") // reserved
|
||||||
bucketKeyObjectImages = []byte("images") // stores image objects
|
bucketKeyObjectImages = []byte("images") // stores image objects
|
||||||
@ -45,6 +46,7 @@ var (
|
|||||||
bucketKeyRuntime = []byte("runtime")
|
bucketKeyRuntime = []byte("runtime")
|
||||||
bucketKeyName = []byte("name")
|
bucketKeyName = []byte("name")
|
||||||
bucketKeyParent = []byte("parent")
|
bucketKeyParent = []byte("parent")
|
||||||
|
bucketKeyChildren = []byte("children")
|
||||||
bucketKeyOptions = []byte("options")
|
bucketKeyOptions = []byte("options")
|
||||||
bucketKeySpec = []byte("spec")
|
bucketKeySpec = []byte("spec")
|
||||||
bucketKeySnapshotKey = []byte("snapshotKey")
|
bucketKeySnapshotKey = []byte("snapshotKey")
|
||||||
|
@ -7,6 +7,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -701,7 +702,7 @@ func testEnv(t *testing.T) (context.Context, *bolt.DB, func()) {
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
ctx = namespaces.WithNamespace(ctx, "testing")
|
ctx = namespaces.WithNamespace(ctx, "testing")
|
||||||
|
|
||||||
dirname, err := ioutil.TempDir("", t.Name()+"-")
|
dirname, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)+"-")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -19,12 +19,12 @@ import (
|
|||||||
|
|
||||||
type contentStore struct {
|
type contentStore struct {
|
||||||
content.Store
|
content.Store
|
||||||
db *bolt.DB
|
db transactor
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewContentStore returns a namespaced content store using an existing
|
// newContentStore returns a namespaced content store using an existing
|
||||||
// content store interface.
|
// content store interface.
|
||||||
func NewContentStore(db *bolt.DB, cs content.Store) content.Store {
|
func newContentStore(db transactor, cs content.Store) content.Store {
|
||||||
return &contentStore{
|
return &contentStore{
|
||||||
Store: cs,
|
Store: cs,
|
||||||
db: db,
|
db: db,
|
||||||
@ -353,7 +353,7 @@ type namespacedWriter struct {
|
|||||||
content.Writer
|
content.Writer
|
||||||
ref string
|
ref string
|
||||||
namespace string
|
namespace string
|
||||||
db *bolt.DB
|
db transactor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
|
||||||
@ -406,7 +406,7 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
|
|||||||
|
|
||||||
commitTime := time.Now().UTC()
|
commitTime := time.Now().UTC()
|
||||||
|
|
||||||
sizeEncoded, err := encodeSize(size)
|
sizeEncoded, err := encodeInt(size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -488,7 +488,7 @@ func writeInfo(info *content.Info, bkt *bolt.Bucket) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Write size
|
// Write size
|
||||||
sizeEncoded, err := encodeSize(info.Size)
|
sizeEncoded, err := encodeInt(info.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ func createContentStore(ctx context.Context, root string) (content.Store, func()
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return NewContentStore(db, cs), func() error {
|
return NewDB(db, cs, nil).ContentStore(), func() error {
|
||||||
return db.Close()
|
return db.Close()
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
140
metadata/db.go
Normal file
140
metadata/db.go
Normal file
@ -0,0 +1,140 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/binary"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
|
"github.com/containerd/containerd/log"
|
||||||
|
"github.com/containerd/containerd/snapshot"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// schemaVersion represents the schema version of
|
||||||
|
// the database. This schema version represents the
|
||||||
|
// structure of the data in the database. The schema
|
||||||
|
// can envolve at any time but any backwards
|
||||||
|
// incompatible changes or structural changes require
|
||||||
|
// bumping the schema version.
|
||||||
|
schemaVersion = "v1"
|
||||||
|
|
||||||
|
// dbVersion represents updates to the schema
|
||||||
|
// version which are additions and compatible with
|
||||||
|
// prior version of the same schema.
|
||||||
|
dbVersion = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
type DB struct {
|
||||||
|
db *bolt.DB
|
||||||
|
ss map[string]snapshot.Snapshotter
|
||||||
|
cs content.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewDB(db *bolt.DB, cs content.Store, ss map[string]snapshot.Snapshotter) *DB {
|
||||||
|
return &DB{
|
||||||
|
db: db,
|
||||||
|
ss: ss,
|
||||||
|
cs: cs,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DB) Init(ctx context.Context) error {
|
||||||
|
// errSkip is used when no migration or version needs to be written
|
||||||
|
// to the database and the transaction can be immediately rolled
|
||||||
|
// back rather than performing a much slower and unnecessary commit.
|
||||||
|
var errSkip = errors.New("skip update")
|
||||||
|
|
||||||
|
err := m.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
var (
|
||||||
|
// current schema and version
|
||||||
|
schema = "v0"
|
||||||
|
version = 0
|
||||||
|
)
|
||||||
|
|
||||||
|
i := len(migrations)
|
||||||
|
for ; i > 0; i-- {
|
||||||
|
migration := migrations[i-1]
|
||||||
|
|
||||||
|
bkt := tx.Bucket([]byte(migration.schema))
|
||||||
|
if bkt == nil {
|
||||||
|
// Hasn't encountered another schema, go to next migration
|
||||||
|
if schema == "v0" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if schema == "v0" {
|
||||||
|
schema = migration.schema
|
||||||
|
vb := bkt.Get(bucketKeyDBVersion)
|
||||||
|
if vb != nil {
|
||||||
|
v, _ := binary.Varint(vb)
|
||||||
|
version = int(v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if version >= migration.version {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Previous version fo database found
|
||||||
|
if schema != "v0" {
|
||||||
|
updates := migrations[i:]
|
||||||
|
|
||||||
|
// No migration updates, return immediately
|
||||||
|
if len(updates) == 0 {
|
||||||
|
return errSkip
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, m := range updates {
|
||||||
|
t0 := time.Now()
|
||||||
|
if err := m.migrate(tx); err != nil {
|
||||||
|
return errors.Wrapf(err, "failed to migrate to %s.%d", m.schema, m.version)
|
||||||
|
}
|
||||||
|
log.G(ctx).WithField("d", time.Now().Sub(t0)).Debugf("database migration to %s.%d finished", m.schema, m.version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bkt, err := tx.CreateBucketIfNotExists(bucketKeyVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
versionEncoded, err := encodeInt(dbVersion)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bkt.Put(bucketKeyDBVersion, versionEncoded)
|
||||||
|
})
|
||||||
|
if err == errSkip {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DB) ContentStore() content.Store {
|
||||||
|
if m.cs == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return newContentStore(m, m.cs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DB) Snapshotter(name string) snapshot.Snapshotter {
|
||||||
|
sn, ok := m.ss[name]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return newSnapshotter(m, name, sn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DB) View(fn func(*bolt.Tx) error) error {
|
||||||
|
return m.db.View(fn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *DB) Update(fn func(*bolt.Tx) error) error {
|
||||||
|
return m.db.Update(fn)
|
||||||
|
}
|
178
metadata/db_test.go
Normal file
178
metadata/db_test.go
Normal file
@ -0,0 +1,178 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/binary"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/boltdb/bolt"
|
||||||
|
"github.com/containerd/containerd/errdefs"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestInit(t *testing.T) {
|
||||||
|
ctx, db, cancel := testEnv(t)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := NewDB(db, nil, nil).Init(ctx); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
version, err := readDBVersion(db, bucketKeyVersion)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if version != dbVersion {
|
||||||
|
t.Fatalf("Unexpected version %d, expected %d", version, dbVersion)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMigrations(t *testing.T) {
|
||||||
|
migrationTests := []struct {
|
||||||
|
name string
|
||||||
|
init func(*bolt.Tx) error
|
||||||
|
check func(*bolt.Tx) error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ChildrenKey",
|
||||||
|
init: func(tx *bolt.Tx) error {
|
||||||
|
bkt, err := createSnapshotterBucket(tx, "testing", "testing")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshots := []struct {
|
||||||
|
key string
|
||||||
|
parent string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
key: "k1",
|
||||||
|
parent: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "k2",
|
||||||
|
parent: "k1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "k2a",
|
||||||
|
parent: "k1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "a1",
|
||||||
|
parent: "k2",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range snapshots {
|
||||||
|
sbkt, err := bkt.CreateBucket([]byte(s.key))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := sbkt.Put(bucketKeyParent, []byte(s.parent)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
check: func(tx *bolt.Tx) error {
|
||||||
|
bkt := getSnapshotterBucket(tx, "testing", "testing")
|
||||||
|
if bkt == nil {
|
||||||
|
return errors.Wrap(errdefs.ErrNotFound, "snapshots bucket not found")
|
||||||
|
}
|
||||||
|
snapshots := []struct {
|
||||||
|
key string
|
||||||
|
children []string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
key: "k1",
|
||||||
|
children: []string{"k2", "k2a"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "k2",
|
||||||
|
children: []string{"a1"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "k2a",
|
||||||
|
children: []string{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "a1",
|
||||||
|
children: []string{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, s := range snapshots {
|
||||||
|
sbkt := bkt.Bucket([]byte(s.key))
|
||||||
|
if sbkt == nil {
|
||||||
|
return errors.Wrap(errdefs.ErrNotFound, "key does not exist")
|
||||||
|
}
|
||||||
|
|
||||||
|
cbkt := sbkt.Bucket(bucketKeyChildren)
|
||||||
|
var cn int
|
||||||
|
if cbkt != nil {
|
||||||
|
cn = cbkt.Stats().KeyN
|
||||||
|
}
|
||||||
|
|
||||||
|
if cn != len(s.children) {
|
||||||
|
return errors.Errorf("unexpected number of children %d, expected %d", cn, len(s.children))
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ch := range s.children {
|
||||||
|
if v := cbkt.Get([]byte(ch)); v == nil {
|
||||||
|
return errors.Errorf("missing child record for %s", ch)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(migrationTests) != len(migrations) {
|
||||||
|
t.Fatal("Each migration must have a test case")
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, mt := range migrationTests {
|
||||||
|
t.Run(mt.name, runMigrationTest(i, mt.init, mt.check))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func runMigrationTest(i int, init, check func(*bolt.Tx) error) func(t *testing.T) {
|
||||||
|
return func(t *testing.T) {
|
||||||
|
_, db, cancel := testEnv(t)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := db.Update(init); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Update(migrations[i].migrate); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.View(check); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func readDBVersion(db *bolt.DB, schema []byte) (int, error) {
|
||||||
|
var version int
|
||||||
|
if err := db.View(func(tx *bolt.Tx) error {
|
||||||
|
bkt := tx.Bucket(schema)
|
||||||
|
if bkt == nil {
|
||||||
|
return errors.Wrap(errdefs.ErrNotFound, "no version bucket")
|
||||||
|
}
|
||||||
|
vb := bkt.Get(bucketKeyDBVersion)
|
||||||
|
if vb == nil {
|
||||||
|
return errors.Wrap(errdefs.ErrNotFound, "no version value")
|
||||||
|
}
|
||||||
|
v, _ := binary.Varint(vb)
|
||||||
|
version = int(v)
|
||||||
|
return nil
|
||||||
|
}); err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
return version, nil
|
||||||
|
}
|
@ -284,7 +284,7 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
sizeEncoded, err := encodeSize(image.Target.Size)
|
sizeEncoded, err := encodeInt(image.Target.Size)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -302,15 +302,15 @@ func writeImage(bkt *bolt.Bucket, image *images.Image) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeSize(size int64) ([]byte, error) {
|
func encodeInt(i int64) ([]byte, error) {
|
||||||
var (
|
var (
|
||||||
buf [binary.MaxVarintLen64]byte
|
buf [binary.MaxVarintLen64]byte
|
||||||
sizeEncoded = buf[:]
|
iEncoded = buf[:]
|
||||||
)
|
)
|
||||||
sizeEncoded = sizeEncoded[:binary.PutVarint(sizeEncoded, size)]
|
iEncoded = iEncoded[:binary.PutVarint(iEncoded, i)]
|
||||||
|
|
||||||
if len(sizeEncoded) == 0 {
|
if len(iEncoded) == 0 {
|
||||||
return nil, fmt.Errorf("failed encoding size = %v", size)
|
return nil, fmt.Errorf("failed encoding integer = %v", i)
|
||||||
}
|
}
|
||||||
return sizeEncoded, nil
|
return iEncoded, nil
|
||||||
}
|
}
|
||||||
|
75
metadata/migrations.go
Normal file
75
metadata/migrations.go
Normal file
@ -0,0 +1,75 @@
|
|||||||
|
package metadata
|
||||||
|
|
||||||
|
import "github.com/boltdb/bolt"
|
||||||
|
|
||||||
|
type migration struct {
|
||||||
|
schema string
|
||||||
|
version int
|
||||||
|
migrate func(*bolt.Tx) error
|
||||||
|
}
|
||||||
|
|
||||||
|
var migrations = []migration{
|
||||||
|
{
|
||||||
|
schema: "v1",
|
||||||
|
version: 1,
|
||||||
|
migrate: addChildLinks,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// addChildLinks Adds children key to the snapshotters to enforce snapshot
|
||||||
|
// entries cannot be removed which have children
|
||||||
|
func addChildLinks(tx *bolt.Tx) error {
|
||||||
|
v1bkt := tx.Bucket(bucketKeyVersion)
|
||||||
|
if v1bkt == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// iterate through each namespace
|
||||||
|
v1c := v1bkt.Cursor()
|
||||||
|
|
||||||
|
for k, v := v1c.First(); k != nil; k, v = v1c.Next() {
|
||||||
|
if v != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
nbkt := v1bkt.Bucket(k)
|
||||||
|
|
||||||
|
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
|
||||||
|
if sbkt != nil {
|
||||||
|
// Iterate through each snapshotter
|
||||||
|
if err := sbkt.ForEach(func(sk, sv []byte) error {
|
||||||
|
if sv != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
snbkt := sbkt.Bucket(sk)
|
||||||
|
|
||||||
|
// Iterate through each snapshot
|
||||||
|
return snbkt.ForEach(func(k, v []byte) error {
|
||||||
|
if v != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
parent := snbkt.Bucket(k).Get(bucketKeyParent)
|
||||||
|
if len(parent) > 0 {
|
||||||
|
pbkt := snbkt.Bucket(parent)
|
||||||
|
if pbkt == nil {
|
||||||
|
// Not enforcing consistency during migration, skip
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := cbkt.Put(k, nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -19,12 +19,12 @@ import (
|
|||||||
type snapshotter struct {
|
type snapshotter struct {
|
||||||
snapshot.Snapshotter
|
snapshot.Snapshotter
|
||||||
name string
|
name string
|
||||||
db *bolt.DB
|
db transactor
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSnapshotter returns a new Snapshotter which namespaces the given snapshot
|
// newSnapshotter returns a new Snapshotter which namespaces the given snapshot
|
||||||
// using the provided name and metadata store.
|
// using the provided name and database.
|
||||||
func NewSnapshotter(db *bolt.DB, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
|
func newSnapshotter(db transactor, name string, sn snapshot.Snapshotter) snapshot.Snapshotter {
|
||||||
return &snapshotter{
|
return &snapshotter{
|
||||||
Snapshotter: sn,
|
Snapshotter: sn,
|
||||||
name: name,
|
name: name,
|
||||||
@ -283,10 +283,18 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
|
|||||||
if parent != "" {
|
if parent != "" {
|
||||||
pbkt := bkt.Bucket([]byte(parent))
|
pbkt := bkt.Bucket([]byte(parent))
|
||||||
if pbkt == nil {
|
if pbkt == nil {
|
||||||
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", parent)
|
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", parent)
|
||||||
}
|
}
|
||||||
bparent = string(pbkt.Get(bucketKeyName))
|
bparent = string(pbkt.Get(bucketKeyName))
|
||||||
|
|
||||||
|
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := cbkt.Put([]byte(key), nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
|
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -360,7 +368,6 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
}
|
}
|
||||||
|
|
||||||
bkey := string(obkt.Get(bucketKeyName))
|
bkey := string(obkt.Get(bucketKeyName))
|
||||||
parent := string(obkt.Get(bucketKeyParent))
|
|
||||||
|
|
||||||
sid, err := bkt.NextSequence()
|
sid, err := bkt.NextSequence()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -372,8 +379,28 @@ func (s *snapshotter) Commit(ctx context.Context, name, key string, opts ...snap
|
|||||||
if err := bbkt.Put(bucketKeyName, []byte(nameKey)); err != nil {
|
if err := bbkt.Put(bucketKeyName, []byte(nameKey)); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := bbkt.Put(bucketKeyParent, []byte(parent)); err != nil {
|
|
||||||
return err
|
parent := obkt.Get(bucketKeyParent)
|
||||||
|
if len(parent) > 0 {
|
||||||
|
pbkt := bkt.Bucket(parent)
|
||||||
|
if pbkt == nil {
|
||||||
|
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
|
||||||
|
}
|
||||||
|
|
||||||
|
cbkt, err := pbkt.CreateBucketIfNotExists(bucketKeyChildren)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := cbkt.Delete([]byte(key)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := cbkt.Put([]byte(name), nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := bbkt.Put(bucketKeyParent, parent); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ts := time.Now().UTC()
|
ts := time.Now().UTC()
|
||||||
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
|
if err := boltutil.WriteTimestamps(bbkt, ts, ts); err != nil {
|
||||||
@ -400,23 +427,37 @@ func (s *snapshotter) Remove(ctx context.Context, key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return update(ctx, s.db, func(tx *bolt.Tx) error {
|
return update(ctx, s.db, func(tx *bolt.Tx) error {
|
||||||
var bkey string
|
var sbkt *bolt.Bucket
|
||||||
bkt := getSnapshotterBucket(tx, ns, s.name)
|
bkt := getSnapshotterBucket(tx, ns, s.name)
|
||||||
if bkt != nil {
|
if bkt != nil {
|
||||||
sbkt := bkt.Bucket([]byte(key))
|
sbkt = bkt.Bucket([]byte(key))
|
||||||
if sbkt != nil {
|
|
||||||
bkey = string(sbkt.Get(bucketKeyName))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if bkey == "" {
|
if sbkt == nil {
|
||||||
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
|
return errors.Wrapf(errdefs.ErrNotFound, "snapshot %v does not exist", key)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := bkt.DeleteBucket([]byte(key)); err != nil {
|
cbkt := sbkt.Bucket(bucketKeyChildren)
|
||||||
return err
|
if cbkt != nil {
|
||||||
|
if child, _ := cbkt.Cursor().First(); child != nil {
|
||||||
|
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.Snapshotter.Remove(ctx, bkey)
|
parent := sbkt.Get(bucketKeyParent)
|
||||||
|
if len(parent) > 0 {
|
||||||
|
pbkt := bkt.Bucket(parent)
|
||||||
|
if pbkt == nil {
|
||||||
|
return errors.Wrapf(errdefs.ErrNotFound, "parent snapshot %v does not exist", string(parent))
|
||||||
|
}
|
||||||
|
cbkt := pbkt.Bucket(bucketKeyChildren)
|
||||||
|
if cbkt != nil {
|
||||||
|
if err := cbkt.Delete([]byte(key)); err != nil {
|
||||||
|
return errors.Wrap(err, "failed to remove child link")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return bkt.DeleteBucket([]byte(key))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,7 +13,7 @@ import (
|
|||||||
"github.com/containerd/containerd/testutil"
|
"github.com/containerd/containerd/testutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, func() error, error) {
|
func newTestSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, func() error, error) {
|
||||||
naiveRoot := filepath.Join(root, "naive")
|
naiveRoot := filepath.Join(root, "naive")
|
||||||
if err := os.Mkdir(naiveRoot, 0770); err != nil {
|
if err := os.Mkdir(naiveRoot, 0770); err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
@ -28,7 +28,7 @@ func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, fun
|
|||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
sn := NewSnapshotter(db, "naive", snapshotter)
|
sn := NewDB(db, nil, map[string]snapshot.Snapshotter{"naive": snapshotter}).Snapshotter("naive")
|
||||||
|
|
||||||
return sn, func() error {
|
return sn, func() error {
|
||||||
return db.Close()
|
return db.Close()
|
||||||
@ -38,5 +38,5 @@ func newSnapshotter(ctx context.Context, root string) (snapshot.Snapshotter, fun
|
|||||||
func TestMetadata(t *testing.T) {
|
func TestMetadata(t *testing.T) {
|
||||||
// Snapshot tests require mounting, still requires root
|
// Snapshot tests require mounting, still requires root
|
||||||
testutil.RequiresRoot(t)
|
testutil.RequiresRoot(t)
|
||||||
testsuite.SnapshotterSuite(t, "Metadata", newSnapshotter)
|
testsuite.SnapshotterSuite(t, "Metadata", newTestSnapshotter)
|
||||||
}
|
}
|
||||||
|
@ -12,18 +12,21 @@ import (
|
|||||||
|
|
||||||
"github.com/boltdb/bolt"
|
"github.com/boltdb/bolt"
|
||||||
containers "github.com/containerd/containerd/api/services/containers/v1"
|
containers "github.com/containerd/containerd/api/services/containers/v1"
|
||||||
content "github.com/containerd/containerd/api/services/content/v1"
|
contentapi "github.com/containerd/containerd/api/services/content/v1"
|
||||||
diff "github.com/containerd/containerd/api/services/diff/v1"
|
diff "github.com/containerd/containerd/api/services/diff/v1"
|
||||||
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
||||||
images "github.com/containerd/containerd/api/services/images/v1"
|
images "github.com/containerd/containerd/api/services/images/v1"
|
||||||
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
|
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
|
||||||
snapshot "github.com/containerd/containerd/api/services/snapshot/v1"
|
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
|
||||||
tasks "github.com/containerd/containerd/api/services/tasks/v1"
|
tasks "github.com/containerd/containerd/api/services/tasks/v1"
|
||||||
version "github.com/containerd/containerd/api/services/version/v1"
|
version "github.com/containerd/containerd/api/services/version/v1"
|
||||||
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/content/local"
|
"github.com/containerd/containerd/content/local"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
|
"github.com/containerd/containerd/metadata"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
|
"github.com/containerd/containerd/snapshot"
|
||||||
metrics "github.com/docker/go-metrics"
|
metrics "github.com/docker/go-metrics"
|
||||||
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
@ -175,11 +178,38 @@ func loadPlugins(config *Config) ([]*plugin.Registration, error) {
|
|||||||
plugin.Register(&plugin.Registration{
|
plugin.Register(&plugin.Registration{
|
||||||
Type: plugin.MetadataPlugin,
|
Type: plugin.MetadataPlugin,
|
||||||
ID: "bolt",
|
ID: "bolt",
|
||||||
|
Requires: []plugin.Type{
|
||||||
|
plugin.ContentPlugin,
|
||||||
|
plugin.SnapshotPlugin,
|
||||||
|
},
|
||||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||||
if err := os.MkdirAll(ic.Root, 0711); err != nil {
|
if err := os.MkdirAll(ic.Root, 0711); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil)
|
cs, err := ic.Get(plugin.ContentPlugin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotters := make(map[string]snapshot.Snapshotter)
|
||||||
|
for name, sn := range rawSnapshotters {
|
||||||
|
snapshotters[name] = sn.(snapshot.Snapshotter)
|
||||||
|
}
|
||||||
|
|
||||||
|
db, err := bolt.Open(filepath.Join(ic.Root, "meta.db"), 0644, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
mdb := metadata.NewDB(db, cs.(content.Store), snapshotters)
|
||||||
|
if err := mdb.Init(ic.Context); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return mdb, nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -199,7 +229,7 @@ func interceptor(
|
|||||||
ctx = log.WithModule(ctx, "tasks")
|
ctx = log.WithModule(ctx, "tasks")
|
||||||
case containers.ContainersServer:
|
case containers.ContainersServer:
|
||||||
ctx = log.WithModule(ctx, "containers")
|
ctx = log.WithModule(ctx, "containers")
|
||||||
case content.ContentServer:
|
case contentapi.ContentServer:
|
||||||
ctx = log.WithModule(ctx, "content")
|
ctx = log.WithModule(ctx, "content")
|
||||||
case images.ImagesServer:
|
case images.ImagesServer:
|
||||||
ctx = log.WithModule(ctx, "images")
|
ctx = log.WithModule(ctx, "images")
|
||||||
@ -207,7 +237,7 @@ func interceptor(
|
|||||||
// No need to change the context
|
// No need to change the context
|
||||||
case version.VersionServer:
|
case version.VersionServer:
|
||||||
ctx = log.WithModule(ctx, "version")
|
ctx = log.WithModule(ctx, "version")
|
||||||
case snapshot.SnapshotsServer:
|
case snapshotapi.SnapshotsServer:
|
||||||
ctx = log.WithModule(ctx, "snapshot")
|
ctx = log.WithModule(ctx, "snapshot")
|
||||||
case diff.DiffServer:
|
case diff.DiffServer:
|
||||||
ctx = log.WithModule(ctx, "diff")
|
ctx = log.WithModule(ctx, "diff")
|
||||||
|
@ -28,17 +28,17 @@ func init() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewService(m.(*bolt.DB), ic.Events), nil
|
return NewService(m.(*metadata.DB), ic.Events), nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
publisher events.Publisher
|
publisher events.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(db *bolt.DB, publisher events.Publisher) api.ContainersServer {
|
func NewService(db *metadata.DB, publisher events.Publisher) api.ContainersServer {
|
||||||
return &Service{db: db, publisher: publisher}
|
return &Service{db: db, publisher: publisher}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
api "github.com/containerd/containerd/api/services/content/v1"
|
api "github.com/containerd/containerd/api/services/content/v1"
|
||||||
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
@ -40,7 +39,6 @@ func init() {
|
|||||||
Type: plugin.GRPCPlugin,
|
Type: plugin.GRPCPlugin,
|
||||||
ID: "content",
|
ID: "content",
|
||||||
Requires: []plugin.Type{
|
Requires: []plugin.Type{
|
||||||
plugin.ContentPlugin,
|
|
||||||
plugin.MetadataPlugin,
|
plugin.MetadataPlugin,
|
||||||
},
|
},
|
||||||
Init: NewService,
|
Init: NewService,
|
||||||
@ -48,17 +46,13 @@ func init() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewService(ic *plugin.InitContext) (interface{}, error) {
|
func NewService(ic *plugin.InitContext) (interface{}, error) {
|
||||||
c, err := ic.Get(plugin.ContentPlugin)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
m, err := ic.Get(plugin.MetadataPlugin)
|
m, err := ic.Get(plugin.MetadataPlugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
cs := metadata.NewContentStore(m.(*bolt.DB), c.(content.Store))
|
|
||||||
return &Service{
|
return &Service{
|
||||||
store: cs,
|
store: m.(*metadata.DB).ContentStore(),
|
||||||
publisher: ic.Events,
|
publisher: ic.Events,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
@ -28,17 +28,17 @@ func init() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewService(m.(*bolt.DB), ic.Events), nil
|
return NewService(m.(*metadata.DB), ic.Events), nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
publisher events.Publisher
|
publisher events.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewService(db *bolt.DB, publisher events.Publisher) imagesapi.ImagesServer {
|
func NewService(db *metadata.DB, publisher events.Publisher) imagesapi.ImagesServer {
|
||||||
return &Service{
|
return &Service{
|
||||||
db: db,
|
db: db,
|
||||||
publisher: publisher,
|
publisher: publisher,
|
||||||
|
@ -29,19 +29,19 @@ func init() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return NewService(m.(*bolt.DB), ic.Events), nil
|
return NewService(m.(*metadata.DB), ic.Events), nil
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
publisher events.Publisher
|
publisher events.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ api.NamespacesServer = &Service{}
|
var _ api.NamespacesServer = &Service{}
|
||||||
|
|
||||||
func NewService(db *bolt.DB, publisher events.Publisher) api.NamespacesServer {
|
func NewService(db *metadata.DB, publisher events.Publisher) api.NamespacesServer {
|
||||||
return &Service{
|
return &Service{
|
||||||
db: db,
|
db: db,
|
||||||
publisher: publisher,
|
publisher: publisher,
|
||||||
|
@ -3,7 +3,6 @@ package snapshot
|
|||||||
import (
|
import (
|
||||||
gocontext "context"
|
gocontext "context"
|
||||||
|
|
||||||
"github.com/boltdb/bolt"
|
|
||||||
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
eventsapi "github.com/containerd/containerd/api/services/events/v1"
|
||||||
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
|
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
|
||||||
"github.com/containerd/containerd/api/types"
|
"github.com/containerd/containerd/api/types"
|
||||||
@ -15,7 +14,6 @@ import (
|
|||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
"github.com/containerd/containerd/snapshot"
|
"github.com/containerd/containerd/snapshot"
|
||||||
protoempty "github.com/golang/protobuf/ptypes/empty"
|
protoempty "github.com/golang/protobuf/ptypes/empty"
|
||||||
"github.com/pkg/errors"
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
@ -25,7 +23,6 @@ func init() {
|
|||||||
Type: plugin.GRPCPlugin,
|
Type: plugin.GRPCPlugin,
|
||||||
ID: "snapshots",
|
ID: "snapshots",
|
||||||
Requires: []plugin.Type{
|
Requires: []plugin.Type{
|
||||||
plugin.SnapshotPlugin,
|
|
||||||
plugin.MetadataPlugin,
|
plugin.MetadataPlugin,
|
||||||
},
|
},
|
||||||
Init: newService,
|
Init: newService,
|
||||||
@ -35,31 +32,19 @@ func init() {
|
|||||||
var empty = &protoempty.Empty{}
|
var empty = &protoempty.Empty{}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
snapshotters map[string]snapshot.Snapshotter
|
db *metadata.DB
|
||||||
publisher events.Publisher
|
publisher events.Publisher
|
||||||
}
|
}
|
||||||
|
|
||||||
func newService(ic *plugin.InitContext) (interface{}, error) {
|
func newService(ic *plugin.InitContext) (interface{}, error) {
|
||||||
rawSnapshotters, err := ic.GetAll(plugin.SnapshotPlugin)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
md, err := ic.Get(plugin.MetadataPlugin)
|
md, err := ic.Get(plugin.MetadataPlugin)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
snapshotters := make(map[string]snapshot.Snapshotter)
|
|
||||||
for name, sn := range rawSnapshotters {
|
|
||||||
snapshotters[name] = metadata.NewSnapshotter(md.(*bolt.DB), name, sn.(snapshot.Snapshotter))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(snapshotters) == 0 {
|
|
||||||
return nil, errors.Errorf("failed to create snapshotter service: no snapshotters loaded")
|
|
||||||
}
|
|
||||||
|
|
||||||
return &service{
|
return &service{
|
||||||
snapshotters: snapshotters,
|
db: md.(*metadata.DB),
|
||||||
publisher: ic.Events,
|
publisher: ic.Events,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,8 +53,8 @@ func (s *service) getSnapshotter(name string) (snapshot.Snapshotter, error) {
|
|||||||
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing")
|
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter argument missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
sn, ok := s.snapshotters[name]
|
sn := s.db.Snapshotter(name)
|
||||||
if !ok {
|
if sn == nil {
|
||||||
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name)
|
return nil, errdefs.ToGRPCf(errdefs.ErrInvalidArgument, "snapshotter not loaded: %s", name)
|
||||||
}
|
}
|
||||||
return sn, nil
|
return sn, nil
|
||||||
|
@ -44,7 +44,6 @@ func init() {
|
|||||||
Requires: []plugin.Type{
|
Requires: []plugin.Type{
|
||||||
plugin.RuntimePlugin,
|
plugin.RuntimePlugin,
|
||||||
plugin.MetadataPlugin,
|
plugin.MetadataPlugin,
|
||||||
plugin.ContentPlugin,
|
|
||||||
},
|
},
|
||||||
Init: New,
|
Init: New,
|
||||||
})
|
})
|
||||||
@ -59,11 +58,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ct, err := ic.Get(plugin.ContentPlugin)
|
cs := m.(*metadata.DB).ContentStore()
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
cs := metadata.NewContentStore(m.(*bolt.DB), ct.(content.Store))
|
|
||||||
runtimes := make(map[string]runtime.Runtime)
|
runtimes := make(map[string]runtime.Runtime)
|
||||||
for _, rr := range rt {
|
for _, rr := range rt {
|
||||||
r := rr.(runtime.Runtime)
|
r := rr.(runtime.Runtime)
|
||||||
@ -71,7 +66,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
}
|
}
|
||||||
return &Service{
|
return &Service{
|
||||||
runtimes: runtimes,
|
runtimes: runtimes,
|
||||||
db: m.(*bolt.DB),
|
db: m.(*metadata.DB),
|
||||||
store: cs,
|
store: cs,
|
||||||
publisher: ic.Events,
|
publisher: ic.Events,
|
||||||
}, nil
|
}, nil
|
||||||
@ -79,7 +74,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
|
|
||||||
type Service struct {
|
type Service struct {
|
||||||
runtimes map[string]runtime.Runtime
|
runtimes map[string]runtime.Runtime
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
store content.Store
|
store content.Store
|
||||||
publisher events.Publisher
|
publisher events.Publisher
|
||||||
}
|
}
|
||||||
|
@ -305,7 +305,7 @@ func Remove(ctx context.Context, key string) (string, snapshot.Kind, error) {
|
|||||||
if pbkt != nil {
|
if pbkt != nil {
|
||||||
k, _ := pbkt.Cursor().Seek(parentPrefixKey(id))
|
k, _ := pbkt.Cursor().Seek(parentPrefixKey(id))
|
||||||
if getParentPrefix(k) == id {
|
if getParentPrefix(k) == id {
|
||||||
return errors.Errorf("cannot remove snapshot with child")
|
return errors.Wrap(errdefs.ErrFailedPrecondition, "cannot remove snapshot with child")
|
||||||
}
|
}
|
||||||
|
|
||||||
if si.Parent != "" {
|
if si.Parent != "" {
|
||||||
|
@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/events"
|
"github.com/containerd/containerd/events"
|
||||||
"github.com/containerd/containerd/log"
|
"github.com/containerd/containerd/log"
|
||||||
|
"github.com/containerd/containerd/metadata"
|
||||||
"github.com/containerd/containerd/namespaces"
|
"github.com/containerd/containerd/namespaces"
|
||||||
"github.com/containerd/containerd/plugin"
|
"github.com/containerd/containerd/plugin"
|
||||||
"github.com/containerd/containerd/runtime"
|
"github.com/containerd/containerd/runtime"
|
||||||
@ -68,7 +69,7 @@ func New(ic *plugin.InitContext) (interface{}, error) {
|
|||||||
// TODO(mlaventure): windows needs a stat monitor
|
// TODO(mlaventure): windows needs a stat monitor
|
||||||
monitor: nil,
|
monitor: nil,
|
||||||
tasks: runtime.NewTaskList(),
|
tasks: runtime.NewTaskList(),
|
||||||
db: m.(*bolt.DB),
|
db: m.(*metadata.DB),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load our existing containers and kill/delete them. We don't support
|
// Load our existing containers and kill/delete them. We don't support
|
||||||
@ -89,7 +90,7 @@ type windowsRuntime struct {
|
|||||||
|
|
||||||
monitor runtime.TaskMonitor
|
monitor runtime.TaskMonitor
|
||||||
tasks *runtime.TaskList
|
tasks *runtime.TaskList
|
||||||
db *bolt.DB
|
db *metadata.DB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *windowsRuntime) ID() string {
|
func (r *windowsRuntime) ID() string {
|
||||||
|
Loading…
Reference in New Issue
Block a user