Add leases api

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-10-26 16:16:14 -07:00
parent 01cdf330bb
commit e13894bb7a
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
19 changed files with 2671 additions and 13 deletions

View File

@ -2405,6 +2405,182 @@ file {
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/leases/v1/leases.proto"
package: "containerd.services.leases.v1"
dependency: "gogoproto/gogo.proto"
dependency: "google/protobuf/empty.proto"
dependency: "google/protobuf/timestamp.proto"
message_type {
name: "Snapshot"
field {
name: "snapshotter"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "snapshotter"
}
field {
name: "key"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "key"
}
}
message_type {
name: "Lease"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "created_at"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65010: 1
65001: 0
}
json_name: "createdAt"
}
field {
name: "labels"
number: 3
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease.LabelsEntry"
json_name: "labels"
}
nested_type {
name: "LabelsEntry"
field {
name: "key"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "key"
}
field {
name: "value"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "value"
}
options {
map_entry: true
}
}
}
message_type {
name: "CreateRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "labels"
number: 3
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.CreateRequest.LabelsEntry"
json_name: "labels"
}
nested_type {
name: "LabelsEntry"
field {
name: "key"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "key"
}
field {
name: "value"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "value"
}
options {
map_entry: true
}
}
}
message_type {
name: "CreateResponse"
field {
name: "lease"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease"
json_name: "lease"
}
}
message_type {
name: "DeleteRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
}
message_type {
name: "ListRequest"
field {
name: "filters"
number: 1
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "filters"
}
}
message_type {
name: "ListResponse"
field {
name: "leases"
number: 1
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease"
json_name: "leases"
}
}
service {
name: "Leases"
method {
name: "Create"
input_type: ".containerd.services.leases.v1.CreateRequest"
output_type: ".containerd.services.leases.v1.CreateResponse"
}
method {
name: "Delete"
input_type: ".containerd.services.leases.v1.DeleteRequest"
output_type: ".google.protobuf.Empty"
}
method {
name: "List"
input_type: ".containerd.services.leases.v1.ListRequest"
output_type: ".containerd.services.leases.v1.ListResponse"
}
}
options {
go_package: "github.com/containerd/containerd/api/services/leases/v1;leases"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/namespaces/v1/namespace.proto"
package: "containerd.services.namespaces.v1"

View File

@ -0,0 +1 @@
package leases

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,64 @@
syntax = "proto3";
package containerd.services.leases.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/leases/v1;leases";
// Leases service manages resources leases within the metadata store.
service Leases {
// Create creates a new lease for managing changes to metadata. A lease
// can be used to protect objects from being removed.
rpc Create(CreateRequest) returns (CreateResponse);
// Delete deletes the lease and makes any unreferenced objects created
// during the lease eligible for garbage collection if not referenced
// or retained by other resources during the lease.
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
// ListTransactions lists all active leases, returning the full list of
// leases and optionally including the referenced resources.
rpc List(ListRequest) returns (ListResponse);
}
// Snapshot is a snapshot resource reference.
message Snapshot {
string snapshotter = 1;
string key = 2;
}
// Lease is an object which retains resources while it exists.
message Lease {
string id = 1;
google.protobuf.Timestamp created_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
map<string, string> labels = 3;
}
message CreateRequest {
// ID is used to identity the lease, when the id is not set the service
// generates a random identifier for the lease.
string id = 1;
map<string, string> labels = 3;
}
message CreateResponse {
Lease lease = 1;
}
message DeleteRequest {
string id = 1;
}
message ListRequest {
repeated string filters = 1;
}
message ListResponse {
repeated Lease leases = 1;
}

View File

@ -25,6 +25,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
@ -137,6 +138,14 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container
// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
l, err := c.CreateLease(ctx)
if err != nil {
return nil, err
}
defer l.Delete(ctx)
ctx = leases.WithLease(ctx, l.ID())
container := containers.Container{
ID: id,
Runtime: containers.RuntimeInfo{
@ -212,6 +221,14 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
}
store := c.ContentStore()
l, err := c.CreateLease(ctx)
if err != nil {
return nil, err
}
defer l.Delete(ctx)
ctx = leases.WithLease(ctx, l.ID())
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
@ -583,6 +600,15 @@ func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts
if err != nil {
return nil, err
}
l, err := c.CreateLease(ctx)
if err != nil {
return nil, err
}
defer l.Delete(ctx)
ctx = leases.WithLease(ctx, l.ID())
switch iopts.format {
case ociImageFormat:
return c.importFromOCITar(ctx, ref, reader, iopts)

View File

@ -10,6 +10,7 @@ import (
_ "github.com/containerd/containerd/services/healthcheck"
_ "github.com/containerd/containerd/services/images"
_ "github.com/containerd/containerd/services/introspection"
_ "github.com/containerd/containerd/services/leases"
_ "github.com/containerd/containerd/services/namespaces"
_ "github.com/containerd/containerd/services/snapshot"
_ "github.com/containerd/containerd/services/tasks"

71
lease.go Normal file
View File

@ -0,0 +1,71 @@
package containerd
import (
"context"
"time"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
)
// Lease is used to hold a reference to active resources which have not been
// referenced by a root resource. This is useful for preventing garbage
// collection of resources while they are actively being updated.
type Lease struct {
id string
createdAt time.Time
client *Client
}
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
}
return Lease{
id: resp.Lease.ID,
client: c,
}, nil
}
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
}
leases := make([]Lease, len(resp.Leases))
for i := range resp.Leases {
leases[i] = Lease{
id: resp.Leases[i].ID,
createdAt: resp.Leases[i].CreatedAt,
client: c,
}
}
return leases, nil
}
// ID returns the lease ID
func (l Lease) ID() string {
return l.id
}
// CreatedAt returns the time at which the lease was created
func (l Lease) CreatedAt() time.Time {
return l.createdAt
}
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := leasesapi.NewLeasesClient(l.client.conn)
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})
return err
}

24
leases/context.go Normal file
View File

@ -0,0 +1,24 @@
package leases
import "context"
type leaseKey struct{}
// WithLease sets a given lease on the context
func WithLease(ctx context.Context, lid string) context.Context {
ctx = context.WithValue(ctx, leaseKey{}, lid)
// also store on the grpc headers so it gets picked up by any clients that
// are using this.
return withGRPCLeaseHeader(ctx, lid)
}
// Lease returns the lease from the context.
func Lease(ctx context.Context) (string, bool) {
lid, ok := ctx.Value(leaseKey{}).(string)
if !ok {
return fromGRPCHeader(ctx)
}
return lid, ok
}

41
leases/grpc.go Normal file
View File

@ -0,0 +1,41 @@
package leases
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
// GRPCHeader defines the header name for specifying a containerd lease.
GRPCHeader = "containerd-lease"
)
func withGRPCLeaseHeader(ctx context.Context, lid string) context.Context {
// also store on the grpc headers so it gets picked up by any clients
// that are using this.
txheader := metadata.Pairs(GRPCHeader, lid)
md, ok := metadata.FromOutgoingContext(ctx) // merge with outgoing context.
if !ok {
md = txheader
} else {
// order ensures the latest is first in this list.
md = metadata.Join(txheader, md)
}
return metadata.NewOutgoingContext(ctx, md)
}
func fromGRPCHeader(ctx context.Context) (string, bool) {
// try to extract for use in grpc servers.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", false
}
values := md[GRPCHeader]
if len(values) == 0 {
return "", false
}
return values[0], true
}

View File

@ -38,6 +38,7 @@ var (
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
@ -53,6 +54,7 @@ var (
bucketKeySnapshotter = []byte("snapshotter")
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {

View File

@ -391,27 +391,31 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
return err
}
}
return nw.commit(ctx, tx, size, expected, opts...)
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
return "", err
}
}
if err := validateInfo(&base); err != nil {
return err
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return err
return "", err
}
if size != 0 && size != status.Offset {
return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
@ -419,32 +423,32 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return err
return "", err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
return "", err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return err
return "", err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
return "", err
}
return bkt.Put(bucketKeySize, sizeEncoded)
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {

View File

@ -46,6 +46,55 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
nbkt := v1bkt.Bucket(k)
ns := string(k)
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
if err := lbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
libkt := lbkt.Bucket(k)
cbkt := libkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
sbkt := libkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
ibkt := nbkt.Bucket(bucketKeyObjectImages)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {

View File

@ -34,14 +34,22 @@ func TestGCRoots(t *testing.T) {
addSnapshot("ns1", "overlay", "sn1", "", nil),
addSnapshot("ns1", "overlay", "sn2", "", nil),
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
addLeaseSnapshot("ns2", "l1", "overlay", "sn5"),
addLeaseSnapshot("ns2", "l2", "overlay", "sn6"),
addLeaseContent("ns2", "l1", dgst(4)),
addLeaseContent("ns2", "l2", dgst(5)),
}
expected := []gc.Node{
gcnode(ResourceContent, "ns1", dgst(1).String()),
gcnode(ResourceContent, "ns1", dgst(2).String()),
gcnode(ResourceContent, "ns2", dgst(2).String()),
gcnode(ResourceContent, "ns2", dgst(4).String()),
gcnode(ResourceContent, "ns2", dgst(5).String()),
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn5"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn6"),
}
if err := db.Update(func(tx *bolt.Tx) error {
@ -374,6 +382,26 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu
}
}
func addLeaseSnapshot(ns, lid, snapshotter, name string) alterFunc {
return func(bkt *bolt.Bucket) error {
sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectSnapshots), snapshotter)
if err != nil {
return err
}
return sbkt.Put([]byte(name), nil)
}
}
func addLeaseContent(ns, lid string, dgst digest.Digest) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectContent))
if err != nil {
return err
}
return cbkt.Put([]byte(dgst.String()), nil)
}
}
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)

201
metadata/leases.go Normal file
View File

@ -0,0 +1,201 @@
package metadata
import (
"context"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Lease retains resources to prevent garbage collection before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
Content []string
Snapshots map[string][]string
}
// LeaseManager manages the create/delete lifecyle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
}
// NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
return &LeaseManager{
tx: tx,
}
}
// Create creates a new lease using the provided lease
func (lm *LeaseManager) Create(ctx context.Context, lid string, labels map[string]string) (Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(lid))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return Lease{}, err
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return Lease{}, err
}
if labels != nil {
if err := boltutil.WriteLabels(txbkt, labels); err != nil {
return Lease{}, err
}
}
return Lease{
ID: lid,
CreatedAt: t,
Labels: labels,
}, nil
}
// Delete delets the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return nil
}
if err := topbkt.DeleteBucket([]byte(lid)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
}
// List lists all active leases
func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter ...string) ([]Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var leases []Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return leases, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
txbkt := topbkt.Bucket(k)
l := Lease{
ID: string(k),
}
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
return err
}
}
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
l.Labels = labels
// TODO: Read Snapshots
// TODO: Read Content
leases = append(leases, l)
return nil
}); err != nil {
return nil, err
}
return leases, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
if err != nil {
return err
}
bkt, err = bkt.CreateBucketIfNotExists([]byte(snapshotter))
if err != nil {
return err
}
return bkt.Put([]byte(key), nil)
}
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectContent)
if err != nil {
return err
}
return bkt.Put([]byte(dgst.String()), nil)
}

90
metadata/leases_test.go Normal file
View File

@ -0,0 +1,90 @@
package metadata
import (
"testing"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/pkg/errors"
)
func TestLeases(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
testCases := []struct {
ID string
Cause error
}{
{
ID: "tx1",
},
{
ID: "tx1",
Cause: errdefs.ErrAlreadyExists,
},
{
ID: "tx2",
},
}
var leases []Lease
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
lease, err := NewLeaseManager(tx).Create(ctx, tc.ID, nil)
if err != nil {
if tc.Cause != nil && errors.Cause(err) == tc.Cause {
return nil
}
return err
}
leases = append(leases, lease)
return nil
}); err != nil {
t.Fatal(err)
}
}
var listed []Lease
// List leases, check same
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
return err
}); err != nil {
t.Fatal(err)
}
if len(listed) != len(leases) {
t.Fatalf("Expected %d lease, got %d", len(leases), len(listed))
}
for i := range listed {
if listed[i].ID != leases[i].ID {
t.Fatalf("Expected lease ID %s, got %s", leases[i].ID, listed[i].ID)
}
if listed[i].CreatedAt != leases[i].CreatedAt {
t.Fatalf("Expected lease created at time %s, got %s", leases[i].CreatedAt, listed[i].CreatedAt)
}
}
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, tc.ID)
}); err != nil {
t.Fatal(err)
}
}
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
return err
}); err != nil {
t.Fatal(err)
}
if len(listed) > 0 {
t.Fatalf("Expected no leases, found %d: %v", len(listed), listed)
}
}

View File

@ -326,6 +326,10 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {

View File

@ -16,6 +16,7 @@ import (
eventsapi "github.com/containerd/containerd/api/services/events/v1"
images "github.com/containerd/containerd/api/services/images/v1"
introspection "github.com/containerd/containerd/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
@ -255,6 +256,8 @@ func interceptor(
ctx = log.WithModule(ctx, "events")
case introspection.IntrospectionServer:
ctx = log.WithModule(ctx, "introspection")
case leasesapi.LeasesServer:
ctx = log.WithModule(ctx, "leases")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}

115
services/leases/service.go Normal file
View File

@ -0,0 +1,115 @@
package leases
import (
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "leases",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB)), nil
},
})
}
type service struct {
db *metadata.DB
}
// NewService returns the GRPC metadata server
func NewService(db *metadata.DB) api.LeasesServer {
return &service{
db: db,
}
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterLeasesServer(server, s)
return nil
}
func (s *service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
lid := r.ID
if lid == "" {
lid = generateLeaseID()
}
var trans metadata.Lease
if err := s.db.Update(func(tx *bolt.Tx) error {
var err error
trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels)
return err
}); err != nil {
return nil, err
}
return &api.CreateResponse{
Lease: txToGRPC(trans),
}, nil
}
func (s *service) Delete(ctx context.Context, r *api.DeleteRequest) (*empty.Empty, error) {
if err := s.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, r.ID)
}); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}
func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
var leases []metadata.Lease
if err := s.db.View(func(tx *bolt.Tx) error {
var err error
leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...)
return err
}); err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(leases))
for i := range leases {
apileases[i] = txToGRPC(leases[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func txToGRPC(tx metadata.Lease) *api.Lease {
return &api.Lease{
ID: tx.ID,
Labels: tx.Labels,
CreatedAt: tx.CreatedAt,
// TODO: Snapshots
// TODO: Content
}
}
func generateLeaseID() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}

View File

@ -18,6 +18,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
@ -359,6 +360,14 @@ func (t *task) Resize(ctx context.Context, w, h uint32) error {
}
func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Image, error) {
l, err := t.client.CreateLease(ctx)
if err != nil {
return nil, err
}
defer l.Delete(ctx)
ctx = leases.WithLease(ctx, l.ID())
request := &tasks.CheckpointTaskRequest{
ContainerID: t.id,
}