[sandbox] Implement metadata store

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2021-04-19 15:10:50 -07:00
parent 87d4c8923e
commit cab7d5b3d2
4 changed files with 700 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/sandbox"
"github.com/containerd/containerd/snapshots"
)
@ -149,6 +150,23 @@ func adaptSnapshot(info snapshots.Info) filters.Adaptor {
})
}
func adaptSandbox(instance *sandbox.Sandbox) filters.Adaptor {
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "id":
return instance.ID, true
case "labels":
return checkMap(fieldpath[1:], instance.Labels)
default:
return "", false
}
})
}
func checkMap(fieldpath []string, m map[string]string) (string, bool) {
if len(m) == 0 {
return "", false

View File

@ -130,6 +130,7 @@ var (
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngests = []byte("ingests") // stores ingest objects
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyObjectSandboxes = []byte("sandboxes") // stores sandboxes
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
@ -149,6 +150,7 @@ var (
bucketKeyExpected = []byte("expected")
bucketKeyRef = []byte("ref")
bucketKeyExpireAt = []byte("expireat")
bucketKeySandboxID = []byte("sandboxid")
deprecatedBucketKeyObjectIngest = []byte("ingest") // stores ingest links, deprecated in v1.2
)
@ -270,3 +272,19 @@ func createIngestBucket(tx *bolt.Tx, namespace, ref string) (*bolt.Bucket, error
func getIngestBucket(tx *bolt.Tx, namespace, ref string) *bolt.Bucket {
return getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectContent, bucketKeyObjectIngests, []byte(ref))
}
func createSandboxBucket(tx *bolt.Tx, namespace string) (*bolt.Bucket, error) {
return createBucketIfNotExists(
tx,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
}
func getSandboxBucket(tx *bolt.Tx, namespace string) *bolt.Bucket {
return getBucket(
tx,
[]byte(namespace),
bucketKeyObjectSandboxes,
)
}

374
metadata/sandbox.go Normal file
View File

@ -0,0 +1,374 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metadata
import (
"context"
"strings"
"time"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
api "github.com/containerd/containerd/sandbox"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
)
type sandboxStore struct {
db *DB
}
var _ api.Store = (*sandboxStore)(nil)
// NewSandboxStore creates a datababase client for sandboxes
func NewSandboxStore(db *DB) api.Store {
return &sandboxStore{db: db}
}
// Create a sandbox record in the store
func (s *sandboxStore) Create(ctx context.Context, sandbox api.Sandbox) (api.Sandbox, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return api.Sandbox{}, err
}
sandbox.CreatedAt = time.Now().UTC()
sandbox.UpdatedAt = sandbox.CreatedAt
if err := s.validate(&sandbox); err != nil {
return api.Sandbox{}, errors.Wrap(err, "failed to validate sandbox")
}
if err := s.db.Update(func(tx *bbolt.Tx) error {
parent, err := createSandboxBucket(tx, ns)
if err != nil {
return err
}
if err := s.write(parent, &sandbox, false); err != nil {
return err
}
return nil
}); err != nil {
return api.Sandbox{}, err
}
return sandbox, nil
}
// Update the sandbox with the provided sandbox object and fields
func (s *sandboxStore) Update(ctx context.Context, sandbox api.Sandbox, fieldpaths ...string) (api.Sandbox, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return api.Sandbox{}, err
}
ret := api.Sandbox{}
if err := update(ctx, s.db, func(tx *bbolt.Tx) error {
parent := getSandboxBucket(tx, ns)
if parent == nil {
return errors.Wrap(errdefs.ErrNotFound, "no sandbox buckets")
}
updated, err := s.read(parent, []byte(sandbox.ID))
if err != nil {
return err
}
if len(fieldpaths) == 0 {
fieldpaths = []string{"labels", "extensions", "spec", "runtime"}
if updated.Runtime.Name != sandbox.Runtime.Name {
return errors.Wrapf(errdefs.ErrInvalidArgument, "sandbox.Runtime.Name field is immutable")
}
}
for _, path := range fieldpaths {
if strings.HasPrefix(path, "labels.") {
if updated.Labels == nil {
updated.Labels = map[string]string{}
}
key := strings.TrimPrefix(path, "labels.")
updated.Labels[key] = sandbox.Labels[key]
continue
} else if strings.HasPrefix(path, "extensions.") {
if updated.Extensions == nil {
updated.Extensions = map[string]types.Any{}
}
key := strings.TrimPrefix(path, "extensions.")
updated.Extensions[key] = sandbox.Extensions[key]
continue
}
switch path {
case "labels":
updated.Labels = sandbox.Labels
case "extensions":
updated.Extensions = sandbox.Extensions
case "runtime":
updated.Runtime = sandbox.Runtime
case "spec":
updated.Spec = sandbox.Spec
default:
return errors.Wrapf(errdefs.ErrInvalidArgument, "cannot update %q field on sandbox %q", path, sandbox.ID)
}
}
updated.UpdatedAt = time.Now().UTC()
if err := s.validate(&updated); err != nil {
return err
}
if err := s.write(parent, &updated, true); err != nil {
return err
}
ret = updated
return nil
}); err != nil {
return api.Sandbox{}, err
}
return ret, nil
}
// Get sandbox metadata using the id
func (s *sandboxStore) Get(ctx context.Context, id string) (api.Sandbox, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return api.Sandbox{}, err
}
ret := api.Sandbox{}
if err := view(ctx, s.db, func(tx *bbolt.Tx) error {
bucket := getSandboxBucket(tx, ns)
if bucket == nil {
return errors.Wrap(errdefs.ErrNotFound, "no sandbox buckets")
}
out, err := s.read(bucket, []byte(id))
if err != nil {
return err
}
ret = out
return nil
}); err != nil {
return api.Sandbox{}, err
}
return ret, nil
}
// List returns sandboxes that match one or more of the provided filters
func (s *sandboxStore) List(ctx context.Context, fields ...string) ([]api.Sandbox, error) {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
filter, err := filters.ParseAll(fields...)
if err != nil {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, err.Error())
}
var (
list []api.Sandbox
)
if err := view(ctx, s.db, func(tx *bbolt.Tx) error {
bucket := getSandboxBucket(tx, ns)
if bucket == nil {
return errors.Wrap(errdefs.ErrNotFound, "not sandbox buckets")
}
if err := bucket.ForEach(func(k, v []byte) error {
info, err := s.read(bucket, k)
if err != nil {
return errors.Wrapf(err, "failed to read bucket %q", string(k))
}
if filter.Match(adaptSandbox(&info)) {
list = append(list, info)
}
return nil
}); err != nil {
return err
}
return nil
}); err != nil {
return nil, err
}
return list, nil
}
// Delete a sandbox from metadata store using the id
func (s *sandboxStore) Delete(ctx context.Context, id string) error {
ns, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
if err := update(ctx, s.db, func(tx *bbolt.Tx) error {
buckets := getSandboxBucket(tx, ns)
if buckets == nil {
return errors.Wrap(errdefs.ErrNotFound, "no sandbox buckets")
}
if err := buckets.DeleteBucket([]byte(id)); err != nil {
return errors.Wrapf(err, "failed to delete sandbox %q", id)
}
return nil
}); err != nil {
return err
}
return nil
}
func (s *sandboxStore) write(parent *bbolt.Bucket, instance *api.Sandbox, overwrite bool) error {
var (
bucket *bbolt.Bucket
err error
id = []byte(instance.ID)
)
if overwrite {
bucket, err = parent.CreateBucketIfNotExists(id)
if err != nil {
return err
}
} else {
bucket = parent.Bucket(id)
if bucket != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "sandbox bucket %q already exists", instance.ID)
}
bucket, err = parent.CreateBucket(id)
if err != nil {
return err
}
}
if err := boltutil.WriteTimestamps(bucket, instance.CreatedAt, instance.UpdatedAt); err != nil {
return err
}
if err := boltutil.WriteLabels(bucket, instance.Labels); err != nil {
return err
}
if err := boltutil.WriteExtensions(bucket, instance.Extensions); err != nil {
return err
}
if err := boltutil.WriteAny(bucket, bucketKeySpec, instance.Spec); err != nil {
return err
}
runtimeBucket, err := bucket.CreateBucketIfNotExists(bucketKeyRuntime)
if err != nil {
return err
}
if err := runtimeBucket.Put(bucketKeyName, []byte(instance.Runtime.Name)); err != nil {
return err
}
if err := boltutil.WriteAny(runtimeBucket, bucketKeyOptions, instance.Runtime.Options); err != nil {
return err
}
return nil
}
func (s *sandboxStore) read(parent *bbolt.Bucket, id []byte) (api.Sandbox, error) {
var (
inst api.Sandbox
err error
)
bucket := parent.Bucket(id)
if bucket == nil {
return api.Sandbox{}, errors.Wrapf(errdefs.ErrNotFound, "bucket %q not found", id)
}
inst.ID = string(id)
inst.Labels, err = boltutil.ReadLabels(bucket)
if err != nil {
return api.Sandbox{}, err
}
if err := boltutil.ReadTimestamps(bucket, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
return api.Sandbox{}, err
}
inst.Spec, err = boltutil.ReadAny(bucket, bucketKeySpec)
if err != nil {
return api.Sandbox{}, err
}
runtimeBucket := bucket.Bucket(bucketKeyRuntime)
if runtimeBucket == nil {
return api.Sandbox{}, errors.New("no runtime bucket")
}
inst.Runtime.Name = string(runtimeBucket.Get(bucketKeyName))
inst.Runtime.Options, err = boltutil.ReadAny(runtimeBucket, bucketKeyOptions)
if err != nil {
return api.Sandbox{}, err
}
inst.Extensions, err = boltutil.ReadExtensions(bucket)
if err != nil {
return api.Sandbox{}, err
}
return inst, nil
}
func (s *sandboxStore) validate(new *api.Sandbox) error {
if new.ID == "" {
return errors.Wrap(errdefs.ErrInvalidArgument, "instance ID must not be empty")
}
if new.CreatedAt.IsZero() {
return errors.Wrap(errdefs.ErrInvalidArgument, "creation date must not be zero")
}
if new.UpdatedAt.IsZero() {
return errors.Wrap(errdefs.ErrInvalidArgument, "updated date must not be zero")
}
if new.Runtime.Name == "" {
return errors.Wrapf(errdefs.ErrInvalidArgument, "sandbox.Runtime.Name must be set")
}
return nil
}

290
metadata/sandbox_test.go Normal file
View File

@ -0,0 +1,290 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metadata
import (
"reflect"
"testing"
"github.com/containerd/containerd/errdefs"
api "github.com/containerd/containerd/sandbox"
"github.com/gogo/protobuf/types"
)
func TestSandboxCreate(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
in := api.Sandbox{
ID: "1",
Labels: map[string]string{"a": "1", "b": "2"},
Spec: &types.Any{TypeUrl: "1", Value: []byte{1, 2, 3}},
Extensions: map[string]types.Any{
"ext1": {TypeUrl: "url/1", Value: []byte{1, 2, 3}},
"ext2": {TypeUrl: "url/2", Value: []byte{3, 2, 1}},
},
Runtime: api.RuntimeOpts{
Name: "test",
Options: &types.Any{TypeUrl: "url/3", Value: []byte{4, 5, 6}},
},
}
_, err := store.Create(ctx, in)
if err != nil {
t.Fatal(err)
}
out, err := store.Get(ctx, "1")
if err != nil {
t.Fatal(err)
}
assertEqualInstances(t, in, out)
}
func TestSandboxCreateDup(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
in := api.Sandbox{
ID: "1",
Spec: &types.Any{TypeUrl: "1", Value: []byte{1, 2, 3}},
Runtime: api.RuntimeOpts{Name: "test"},
}
_, err := store.Create(ctx, in)
if err != nil {
t.Fatal(err)
}
_, err = store.Create(ctx, in)
if !errdefs.IsAlreadyExists(err) {
t.Fatalf("expected %+v, got %+v", errdefs.ErrAlreadyExists, err)
}
}
func TestSandboxUpdate(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
if _, err := store.Create(ctx, api.Sandbox{
ID: "2",
Labels: map[string]string{"lbl1": "existing"},
Spec: &types.Any{TypeUrl: "1", Value: []byte{1}}, // will replace
Extensions: map[string]types.Any{
"ext2": {TypeUrl: "url2", Value: []byte{4, 5, 6}}, // will append `ext1`
},
Runtime: api.RuntimeOpts{Name: "test"}, // no change
}); err != nil {
t.Fatal(err)
}
expectedSpec := types.Any{TypeUrl: "2", Value: []byte{3, 2, 1}}
out, err := store.Update(ctx, api.Sandbox{
ID: "2",
Labels: map[string]string{"lbl1": "new"},
Spec: &expectedSpec,
Extensions: map[string]types.Any{
"ext1": {TypeUrl: "url1", Value: []byte{1, 2}},
},
}, "labels.lbl1", "extensions.ext1", "spec")
if err != nil {
t.Fatal(err)
}
expected := api.Sandbox{
ID: "2",
Spec: &expectedSpec,
Labels: map[string]string{
"lbl1": "new",
},
Extensions: map[string]types.Any{
"ext1": {TypeUrl: "url1", Value: []byte{1, 2}},
"ext2": {TypeUrl: "url2", Value: []byte{4, 5, 6}},
},
Runtime: api.RuntimeOpts{Name: "test"},
}
assertEqualInstances(t, out, expected)
}
func TestSandboxGetInvalid(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
_, err := store.Get(ctx, "invalid_id")
if err == nil {
t.Fatalf("expected %+v error for invalid ID", errdefs.ErrNotFound)
} else if !errdefs.IsNotFound(err) {
t.Fatalf("unexpected error %T type", err)
}
}
func TestSandboxList(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
in := []api.Sandbox{
{
ID: "1",
Labels: map[string]string{"test": "1"},
Spec: &types.Any{TypeUrl: "1", Value: []byte{1, 2, 3}},
Extensions: map[string]types.Any{"ext": {}},
Runtime: api.RuntimeOpts{Name: "test"},
},
{
ID: "2",
Labels: map[string]string{"test": "2"},
Spec: &types.Any{TypeUrl: "2", Value: []byte{3, 2, 1}},
Extensions: map[string]types.Any{"ext": {
TypeUrl: "test",
Value: []byte{9},
}},
Runtime: api.RuntimeOpts{Name: "test"},
},
}
for _, inst := range in {
_, err := store.Create(ctx, inst)
if err != nil {
t.Fatal(err)
}
}
out, err := store.List(ctx)
if err != nil {
t.Fatal(err)
}
if len(in) != len(out) {
t.Fatalf("expected list size: %d != %d", len(in), len(out))
}
for i := range out {
assertEqualInstances(t, out[i], in[i])
}
}
func TestSandboxListWithFilter(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
in := []api.Sandbox{
{
ID: "1",
Labels: map[string]string{"test": "1"},
Spec: &types.Any{TypeUrl: "1", Value: []byte{1, 2, 3}},
Extensions: map[string]types.Any{"ext": {}},
Runtime: api.RuntimeOpts{Name: "test"},
},
{
ID: "2",
Labels: map[string]string{"test": "2"},
Spec: &types.Any{TypeUrl: "2", Value: []byte{3, 2, 1}},
Extensions: map[string]types.Any{"ext": {
TypeUrl: "test",
Value: []byte{9},
}},
Runtime: api.RuntimeOpts{Name: "test"},
},
}
for _, inst := range in {
_, err := store.Create(ctx, inst)
if err != nil {
t.Fatal(err)
}
}
out, err := store.List(ctx, "id==1")
if err != nil {
t.Fatal(err)
}
if len(out) != 1 {
t.Fatalf("expected list to contain 1 element, got %d", len(out))
}
assertEqualInstances(t, out[0], in[0])
}
func TestSandboxDelete(t *testing.T) {
ctx, db, done := testDB(t)
defer done()
store := NewSandboxStore(db)
in := api.Sandbox{
ID: "2",
Spec: &types.Any{TypeUrl: "1", Value: []byte{1, 2, 3}},
Runtime: api.RuntimeOpts{Name: "test"},
}
_, err := store.Create(ctx, in)
if err != nil {
t.Fatal(err)
}
err = store.Delete(ctx, "2")
if err != nil {
t.Fatalf("deleted failed %+v", err)
}
_, err = store.Get(ctx, "2")
if !errdefs.IsNotFound(err) {
t.Fatalf("unexpected err result: %+v != %+v", err, errdefs.ErrNotFound)
}
}
func assertEqualInstances(t *testing.T, x, y api.Sandbox) {
if x.ID != y.ID {
t.Fatalf("ids are not equal: %q != %q", x.ID, y.ID)
}
if !reflect.DeepEqual(x.Labels, y.Labels) {
t.Fatalf("labels are not equal: %+v != %+v", x.Labels, y.Labels)
}
if !reflect.DeepEqual(x.Spec, y.Spec) {
t.Fatalf("specs are not equal: %+v != %+v", x.Spec, y.Spec)
}
if !reflect.DeepEqual(x.Extensions, y.Extensions) {
t.Fatalf("extensions are not equal: %+v != %+v", x.Extensions, y.Extensions)
}
if x.Runtime.Name != y.Runtime.Name {
t.Fatalf("runtime names are not equal: %q != %q", x.Runtime.Name, y.Runtime.Name)
}
if !reflect.DeepEqual(x.Runtime.Options, y.Runtime.Options) {
t.Fatalf("runtime options are not equal: %+v != %+v", x.Runtime.Options, y.Runtime.Options)
}
}