leases: support resource management

Add three methods for lease service so that the client can use it to
manage the resource by lease, not just gc.root label. With the following
methods, it is easy for client to maintain their own cache system.

```
 - AddResource(context.Context, Lease, Resource) error
 - RemoveResource(context.Context, Lease, Resource) error
 - ListResources(context.Context, Lease) ([]Resource, error)
```

And the resource is to be

```golang
type Resource {
  ID   string
  Type string
}
```

For the snapshots, the Type field will be formatted by
snapshots/%{type}, like snapshots/overlayfs.

fix: #3295

Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
Wei Fu 2019-05-26 23:54:48 +08:00
parent 0e7a3c9e51
commit 8a388d6238
10 changed files with 1930 additions and 34 deletions

View File

@ -2627,6 +2627,89 @@ file {
json_name: "leases" json_name: "leases"
} }
} }
message_type {
name: "Resource"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "type"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "type"
}
}
message_type {
name: "AddResourceRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "resource"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Resource"
options {
65001: 0
}
json_name: "resource"
}
}
message_type {
name: "DeleteResourceRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "resource"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Resource"
options {
65001: 0
}
json_name: "resource"
}
}
message_type {
name: "ListResourcesRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
}
message_type {
name: "ListResourcesResponse"
field {
name: "resources"
number: 1
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Resource"
options {
65001: 0
}
json_name: "resources"
}
}
service { service {
name: "Leases" name: "Leases"
method { method {
@ -2644,6 +2727,21 @@ file {
input_type: ".containerd.services.leases.v1.ListRequest" input_type: ".containerd.services.leases.v1.ListRequest"
output_type: ".containerd.services.leases.v1.ListResponse" output_type: ".containerd.services.leases.v1.ListResponse"
} }
method {
name: "AddResource"
input_type: ".containerd.services.leases.v1.AddResourceRequest"
output_type: ".google.protobuf.Empty"
}
method {
name: "DeleteResource"
input_type: ".containerd.services.leases.v1.DeleteResourceRequest"
output_type: ".google.protobuf.Empty"
}
method {
name: "ListResources"
input_type: ".containerd.services.leases.v1.ListResourcesRequest"
output_type: ".containerd.services.leases.v1.ListResourcesResponse"
}
} }
options { options {
go_package: "github.com/containerd/containerd/api/services/leases/v1;leases" go_package: "github.com/containerd/containerd/api/services/leases/v1;leases"

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,15 @@ service Leases {
// List lists all active leases, returning the full list of // List lists all active leases, returning the full list of
// leases and optionally including the referenced resources. // leases and optionally including the referenced resources.
rpc List(ListRequest) returns (ListResponse); rpc List(ListRequest) returns (ListResponse);
// AddResource references the resource by the provided lease.
rpc AddResource(AddResourceRequest) returns (google.protobuf.Empty);
// DeleteResource dereferences the resource by the provided lease.
rpc DeleteResource(DeleteResourceRequest) returns (google.protobuf.Empty);
// ListResources lists all the resources referenced by the lease.
rpc ListResources(ListResourcesRequest) returns (ListResourcesResponse);
} }
// Lease is an object which retains resources while it exists. // Lease is an object which retains resources while it exists.
@ -62,3 +71,32 @@ message ListRequest {
message ListResponse { message ListResponse {
repeated Lease leases = 1; repeated Lease leases = 1;
} }
message Resource {
string id = 1;
// For snapshotter resource, there are many snapshotter types here, like
// overlayfs, devmapper etc. The type will be formatted with type,
// like "snapshotter/overlayfs".
string type = 2;
}
message AddResourceRequest {
string id = 1;
Resource resource = 2 [(gogoproto.nullable) = false];
}
message DeleteResourceRequest {
string id = 1;
Resource resource = 2 [(gogoproto.nullable) = false];
}
message ListResourcesRequest {
string id = 1;
}
message ListResourcesResponse {
repeated Resource resources = 1 [(gogoproto.nullable) = false];
}

141
lease_test.go Normal file
View File

@ -0,0 +1,141 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containerd
import (
"runtime"
"testing"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/opencontainers/image-spec/identity"
"github.com/pkg/errors"
)
func TestLeaseResources(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip()
}
ctx, cancel := testContext()
defer cancel()
client, err := newClient(t, address)
if err != nil {
t.Fatal(err)
}
defer client.Close()
var (
ls = client.LeasesService()
cs = client.ContentStore()
imgSrv = client.ImageService()
sn = client.SnapshotService("native")
)
l, err := ls.Create(ctx, leases.WithRandomID())
if err != nil {
t.Fatal(err)
}
defer ls.Delete(ctx, l, leases.SynchronousDelete)
// step 1: download image
imageName := "docker.io/library/busybox:1.25"
image, err := client.Pull(ctx, imageName, WithPullUnpack, WithPullSnapshotter("native"))
if err != nil {
t.Fatal(err)
}
defer imgSrv.Delete(ctx, imageName)
// both the config and snapshotter should exist
cfgDesc, err := image.Config(ctx)
if err != nil {
t.Fatal(err)
}
if _, err := cs.Info(ctx, cfgDesc.Digest); err != nil {
t.Fatal(err)
}
dgsts, err := image.RootFS(ctx)
if err != nil {
t.Fatal(err)
}
chainID := identity.ChainID(dgsts)
if _, err := sn.Stat(ctx, chainID.String()); err != nil {
t.Fatal(err)
}
// step 2: reference snapshotter with lease
r := leases.Resource{
ID: chainID.String(),
Type: "snapshots/native",
}
if err := ls.AddResource(ctx, l, r); err != nil {
t.Fatal(err)
}
list, err := ls.ListResources(ctx, l)
if err != nil {
t.Fatal(err)
}
if len(list) != 1 || list[0] != r {
t.Fatalf("expected (%v), but got (%v)", []leases.Resource{r}, list)
}
// step 3: remove image and check the status of snapshotter and content
if err := imgSrv.Delete(ctx, imageName, images.SynchronousDelete()); err != nil {
t.Fatal(err)
}
// config should be removed but the snapshotter should exist
if _, err := cs.Info(ctx, cfgDesc.Digest); errors.Cause(err) != errdefs.ErrNotFound {
t.Fatalf("expected error(%v), but got(%v)", errdefs.ErrNotFound, err)
}
if _, err := sn.Stat(ctx, chainID.String()); err != nil {
t.Fatal(err)
}
// step 4: remove resource from the lease and check the list API
if err := ls.DeleteResource(ctx, l, r); err != nil {
t.Fatal(err)
}
list, err = ls.ListResources(ctx, l)
if err != nil {
t.Fatal(err)
}
if len(list) != 0 {
t.Fatalf("expected nothing, but got (%v)", list)
}
// step 5: remove the lease to check the status of snapshotter
if err := ls.Delete(ctx, l, leases.SynchronousDelete); err != nil {
t.Fatal(err)
}
if _, err := sn.Stat(ctx, chainID.String()); errors.Cause(err) != errdefs.ErrNotFound {
t.Fatalf("expected error(%v), but got(%v)", errdefs.ErrNotFound, err)
}
}

View File

@ -32,6 +32,9 @@ type Manager interface {
Create(context.Context, ...Opt) (Lease, error) Create(context.Context, ...Opt) (Lease, error)
Delete(context.Context, Lease, ...DeleteOpt) error Delete(context.Context, Lease, ...DeleteOpt) error
List(context.Context, ...string) ([]Lease, error) List(context.Context, ...string) ([]Lease, error)
AddResource(context.Context, Lease, Resource) error
DeleteResource(context.Context, Lease, Resource) error
ListResources(context.Context, Lease) ([]Resource, error)
} }
// Lease retains resources to prevent cleanup before // Lease retains resources to prevent cleanup before
@ -42,6 +45,13 @@ type Lease struct {
Labels map[string]string Labels map[string]string
} }
// Resource represents low level resource of image, like content, ingest and
// snapshotter.
type Resource struct {
ID string
Type string
}
// DeleteOptions provide options on image delete // DeleteOptions provide options on image delete
type DeleteOptions struct { type DeleteOptions struct {
Synchronous bool Synchronous bool

View File

@ -91,3 +91,43 @@ func (pm *proxyManager) List(ctx context.Context, filters ...string) ([]leases.L
return l, nil return l, nil
} }
func (pm *proxyManager) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
_, err := pm.client.AddResource(ctx, &leasesapi.AddResourceRequest{
ID: lease.ID,
Resource: leasesapi.Resource{
ID: r.ID,
Type: r.Type,
},
})
return errdefs.FromGRPC(err)
}
func (pm *proxyManager) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
_, err := pm.client.DeleteResource(ctx, &leasesapi.DeleteResourceRequest{
ID: lease.ID,
Resource: leasesapi.Resource{
ID: r.ID,
Type: r.Type,
},
})
return errdefs.FromGRPC(err)
}
func (pm *proxyManager) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
resp, err := pm.client.ListResources(ctx, &leasesapi.ListResourcesRequest{
ID: lease.ID,
})
if err != nil {
return nil, errdefs.FromGRPC(err)
}
rs := make([]leases.Resource, 0, len(resp.Resources))
for _, i := range resp.Resources {
rs = append(rs, leases.Resource{
ID: i.ID,
Type: i.Type,
})
}
return rs, nil
}

View File

@ -18,6 +18,8 @@ package metadata
import ( import (
"context" "context"
"fmt"
"strings"
"time" "time"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
@ -167,6 +169,128 @@ func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease,
return ll, nil return ll, nil
} }
// AddResource references the resource by the provided lease.
func (lm *LeaseManager) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
bkt := topbkt
for _, key := range keys {
bkt, err = bkt.CreateBucketIfNotExists([]byte(key))
if err != nil {
return err
}
}
return bkt.Put([]byte(ref), nil)
}
// DeleteResource dereferences the resource by the provided lease.
func (lm *LeaseManager) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
keys, ref, err := parseLeaseResource(r)
if err != nil {
return err
}
bkt := topbkt
for _, key := range keys {
if bkt == nil {
break
}
bkt = bkt.Bucket([]byte(key))
}
if bkt == nil {
return nil
}
return bkt.Delete([]byte(ref))
}
// ListResources lists all the resources referenced by the lease.
func (lm *LeaseManager) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lease.ID))
if topbkt == nil {
return nil, errors.Wrapf(errdefs.ErrNotFound, "lease %q", lease.ID)
}
rs := make([]leases.Resource, 0)
// content resources
if cbkt := topbkt.Bucket(bucketKeyObjectContent); cbkt != nil {
if err := cbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: string(bucketKeyObjectContent),
})
return nil
}); err != nil {
return nil, err
}
}
// ingest resources
if lbkt := topbkt.Bucket(bucketKeyObjectIngests); lbkt != nil {
if err := lbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: string(bucketKeyObjectIngests),
})
return nil
}); err != nil {
return nil, err
}
}
// snapshot resources
if sbkt := topbkt.Bucket(bucketKeyObjectSnapshots); sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, _ []byte) error {
rs = append(rs, leases.Resource{
ID: string(k),
Type: fmt.Sprintf("%s/%s", bucketKeyObjectSnapshots, sk),
})
return nil
})
}); err != nil {
return nil, err
}
}
return rs, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error { func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.FromContext(ctx) lid, ok := leases.FromContext(ctx)
if !ok { if !ok {
@ -307,3 +431,36 @@ func removeIngestLease(ctx context.Context, tx *bolt.Tx, ref string) error {
return bkt.Delete([]byte(ref)) return bkt.Delete([]byte(ref))
} }
func parseLeaseResource(r leases.Resource) ([]string, string, error) {
var (
ref = r.ID
typ = r.Type
keys = strings.Split(typ, "/")
)
switch k := keys[0]; k {
case string(bucketKeyObjectContent),
string(bucketKeyObjectIngests):
if len(keys) != 1 {
return nil, "", errors.Wrapf(errdefs.ErrInvalidArgument, "invalid resource type %s", typ)
}
if k == string(bucketKeyObjectContent) {
dgst, err := digest.Parse(ref)
if err != nil {
return nil, "", errors.Wrapf(errdefs.ErrInvalidArgument, "invalid content resource id %s: %v", ref, err)
}
ref = dgst.String()
}
case string(bucketKeyObjectSnapshots):
if len(keys) != 2 {
return nil, "", errors.Wrapf(errdefs.ErrInvalidArgument, "invalid snapshot resource type %s", typ)
}
default:
return nil, "", errors.Wrapf(errdefs.ErrNotImplemented, "resource type %s not supported yet", typ)
}
return keys, ref, nil
}

View File

@ -264,3 +264,181 @@ func TestLeasesList(t *testing.T) {
} }
} }
} }
func TestLeaseResource(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
var (
leaseID = "l1"
lease = leases.Lease{
ID: leaseID,
}
snapshotterKey = "RstMI3X8vguKoPFkmIStZ5fQFI7F1L0o"
)
// prepare lease
if err := db.Update(func(tx *bolt.Tx) error {
_, err0 := NewLeaseManager(tx).Create(ctx, leases.WithID(leaseID))
return err0
}); err != nil {
t.Fatal(err)
}
testCases := []struct {
lease leases.Lease
resource leases.Resource
err error
}{
{
lease: lease,
resource: leases.Resource{
ID: "sha256:29f5d56d12684887bdfa50dcd29fc31eea4aaf4ad3bec43daf19026a7ce69912",
Type: "content",
},
},
{
lease: lease,
resource: leases.Resource{
ID: "d2UdcINOwrBTQG9kS8rySAM3eMNBSojH",
Type: "ingests",
},
},
{
// allow to add resource which exists
lease: lease,
resource: leases.Resource{
ID: "d2UdcINOwrBTQG9kS8rySAM3eMNBSojH",
Type: "ingests",
},
},
{
// not allow to reference to lease
lease: lease,
resource: leases.Resource{
ID: "xCAV3F6PddlXitbtby0Vo23Qof6RTWpG",
Type: "leases",
},
err: errdefs.ErrNotImplemented,
},
{
// not allow to reference to container
lease: lease,
resource: leases.Resource{
ID: "05O9ljptPu5Qq9kZGOacEfymBwQFM8ZH",
Type: "containers",
},
err: errdefs.ErrNotImplemented,
},
{
// not allow to reference to image
lease: lease,
resource: leases.Resource{
ID: "qBUHpWBn03YaCt9cL3PPGKWoxBqTlLfu",
Type: "image",
},
err: errdefs.ErrNotImplemented,
},
{
lease: lease,
resource: leases.Resource{
ID: "HMemOhlygombYhkhHhAZj5aRbDy2a3z2",
Type: "snapshots",
},
err: errdefs.ErrInvalidArgument,
},
{
lease: lease,
resource: leases.Resource{
ID: snapshotterKey,
Type: "snapshots/overlayfs",
},
},
{
lease: lease,
resource: leases.Resource{
ID: "HMemOhlygombYhkhHhAZj5aRbDy2a3z2",
Type: "snapshots/overlayfs/type1",
},
err: errdefs.ErrInvalidArgument,
},
{
lease: leases.Lease{
ID: "non-found",
},
resource: leases.Resource{
ID: "HMemOhlygombYhkhHhAZj5aRbDy2a3z2",
Type: "snapshots/overlayfs",
},
err: errdefs.ErrNotFound,
},
}
idxList := make(map[leases.Resource]bool)
for i, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
err0 := NewLeaseManager(tx).AddResource(ctx, tc.lease, tc.resource)
if got := errors.Cause(err0); got != tc.err {
return errors.Errorf("expect error (%v), but got (%v)", tc.err, err0)
}
if err0 == nil {
// not visited yet
idxList[tc.resource] = false
}
return nil
}); err != nil {
t.Fatalf("failed to run case %d with resource: %v", i, err)
}
}
// check list function
var gotList []leases.Resource
if err := db.View(func(tx *bolt.Tx) error {
var err error
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
t.Fatal(err)
}
if len(gotList) != len(idxList) {
t.Fatalf("expected (%d) resources, but got (%d)", len(idxList), len(gotList))
}
for _, r := range gotList {
visited, ok := idxList[r]
if !ok {
t.Fatalf("unexpected resource(%v)", r)
}
if visited {
t.Fatalf("duplicate resource(%v)", r)
}
idxList[r] = true
}
// remove snapshots
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).DeleteResource(ctx, lease, leases.Resource{
ID: snapshotterKey,
Type: "snapshots/overlayfs",
})
}); err != nil {
t.Fatal(err)
}
// check list number
if err := db.View(func(tx *bolt.Tx) error {
var err error
gotList, err = NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
t.Fatal(err)
}
if len(gotList)+1 != len(idxList) {
t.Fatalf("expected (%d) resources, but got (%d)", len(idxList)-1, len(gotList))
}
}

View File

@ -107,3 +107,27 @@ func (l *local) List(ctx context.Context, filters ...string) ([]leases.Lease, er
} }
return ll, nil return ll, nil
} }
func (l *local) AddResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).AddResource(ctx, lease, r)
})
}
func (l *local) DeleteResource(ctx context.Context, lease leases.Lease, r leases.Resource) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).DeleteResource(ctx, lease, r)
})
}
func (l *local) ListResources(ctx context.Context, lease leases.Lease) ([]leases.Resource, error) {
var rs []leases.Resource
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
rs, err = metadata.NewLeaseManager(tx).ListResources(ctx, lease)
return err
}); err != nil {
return nil, err
}
return rs, nil
}

View File

@ -113,6 +113,56 @@ func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListRespon
}, nil }, nil
} }
func (s *service) AddResource(ctx context.Context, r *api.AddResourceRequest) (*ptypes.Empty, error) {
lease := leases.Lease{
ID: r.ID,
}
if err := s.lm.AddResource(ctx, lease, leases.Resource{
ID: r.Resource.ID,
Type: r.Resource.Type,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}
func (s *service) DeleteResource(ctx context.Context, r *api.DeleteResourceRequest) (*ptypes.Empty, error) {
lease := leases.Lease{
ID: r.ID,
}
if err := s.lm.DeleteResource(ctx, lease, leases.Resource{
ID: r.Resource.ID,
Type: r.Resource.Type,
}); err != nil {
return nil, errdefs.ToGRPC(err)
}
return &ptypes.Empty{}, nil
}
func (s *service) ListResources(ctx context.Context, r *api.ListResourcesRequest) (*api.ListResourcesResponse, error) {
lease := leases.Lease{
ID: r.ID,
}
rs, err := s.lm.ListResources(ctx, lease)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
apiResources := make([]api.Resource, 0, len(rs))
for _, i := range rs {
apiResources = append(apiResources, api.Resource{
ID: i.ID,
Type: i.Type,
})
}
return &api.ListResourcesResponse{
Resources: apiResources,
}, nil
}
func leaseToGRPC(l leases.Lease) *api.Lease { func leaseToGRPC(l leases.Lease) *api.Lease {
return &api.Lease{ return &api.Lease{
ID: l.ID, ID: l.ID,