Merge pull request #2474 from dmcgowan/lease-expiration

Improved lease management
This commit is contained in:
Michael Crosby 2018-07-20 16:54:17 -04:00 committed by GitHub
commit 0d52c71c80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 765 additions and 199 deletions

View File

@ -43,6 +43,8 @@ import (
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
leasesproxy "github.com/containerd/containerd/leases/proxy"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/pkg/dialer"
"github.com/containerd/containerd/plugin"
@ -512,11 +514,11 @@ func (c *Client) IntrospectionService() introspectionapi.IntrospectionClient {
}
// LeasesService returns the underlying Leases Client
func (c *Client) LeasesService() leasesapi.LeasesClient {
func (c *Client) LeasesService() leases.Manager {
if c.leasesService != nil {
return c.leasesService
}
return leasesapi.NewLeasesClient(c.conn)
return leasesproxy.NewLeaseManager(leasesapi.NewLeasesClient(c.conn))
}
// HealthService returns the underlying GRPC HealthClient

View File

@ -24,6 +24,7 @@ import (
"github.com/containerd/containerd/cmd/ctr/commands/content"
"github.com/containerd/containerd/cmd/ctr/commands/events"
"github.com/containerd/containerd/cmd/ctr/commands/images"
"github.com/containerd/containerd/cmd/ctr/commands/leases"
namespacesCmd "github.com/containerd/containerd/cmd/ctr/commands/namespaces"
"github.com/containerd/containerd/cmd/ctr/commands/plugins"
"github.com/containerd/containerd/cmd/ctr/commands/pprof"
@ -96,6 +97,7 @@ containerd CLI
content.Command,
events.Command,
images.Command,
leases.Command,
namespacesCmd.Command,
pprof.Command,
run.Command,

View File

@ -0,0 +1,190 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leases
import (
"fmt"
"os"
"sort"
"strings"
"text/tabwriter"
"time"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/leases"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
// Command is the cli command for managing content
var Command = cli.Command{
Name: "leases",
Usage: "manage leases",
Subcommands: cli.Commands{
listCommand,
createCommand,
deleteCommand,
},
}
var listCommand = cli.Command{
Name: "list",
Aliases: []string{"ls"},
Usage: "list all active leases",
ArgsUsage: "[flags] <filter>",
Description: "list active leases by containerd",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "quiet, q",
Usage: "print only the blob digest",
},
},
Action: func(context *cli.Context) error {
var (
filters = context.Args()
quiet = context.Bool("quiet")
)
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
ls := client.LeasesService()
leaseList, err := ls.List(ctx, filters...)
if err != nil {
return errors.Wrap(err, "failed to list leases")
}
if quiet {
for _, l := range leaseList {
fmt.Println(l.ID)
}
return nil
}
tw := tabwriter.NewWriter(os.Stdout, 1, 8, 1, ' ', 0)
fmt.Fprintln(tw, "ID\tCREATED AT\tLABELS\t")
for _, l := range leaseList {
labels := "-"
if len(l.Labels) > 0 {
var pairs []string
for k, v := range l.Labels {
pairs = append(pairs, fmt.Sprintf("%v=%v", k, v))
}
sort.Strings(pairs)
labels = strings.Join(pairs, ",")
}
fmt.Fprintf(tw, "%v\t%v\t%s\t\n",
l.ID,
l.CreatedAt.Local().Format(time.RFC3339),
labels)
}
return tw.Flush()
},
}
var createCommand = cli.Command{
Name: "create",
Usage: "create lease",
ArgsUsage: "[flags] <label>=<value> ...",
Description: "create a new lease",
Flags: []cli.Flag{
cli.StringFlag{
Name: "id",
Usage: "set the id for the lease, will be generated by default",
},
cli.DurationFlag{
Name: "expires, x",
Usage: "expiration of lease (0 value will not expire)",
Value: 24 * 3600 * time.Second,
},
},
Action: func(context *cli.Context) error {
var labelstr = context.Args()
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
ls := client.LeasesService()
opts := []leases.Opt{}
if len(labelstr) > 0 {
labels := map[string]string{}
for _, lstr := range labelstr {
l := strings.SplitN(lstr, "=", 2)
if len(l) == 1 {
labels[l[0]] = ""
} else {
labels[l[0]] = l[1]
}
}
opts = append(opts, leases.WithLabels(labels))
}
if id := context.String("id"); id != "" {
opts = append(opts, leases.WithID(id))
}
if exp := context.Duration("expires"); exp > 0 {
opts = append(opts, leases.WithExpiration(exp))
}
l, err := ls.Create(ctx, opts...)
if err != nil {
return err
}
fmt.Println(l.ID)
return nil
},
}
var deleteCommand = cli.Command{
Name: "delete",
Aliases: []string{"rm"},
Usage: "delete a lease",
ArgsUsage: "[flags] <lease id> ...",
Description: "delete a lease",
Action: func(context *cli.Context) error {
var lids = context.Args()
if len(lids) == 0 {
return cli.ShowSubcommandHelp(context)
}
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
ls := client.LeasesService()
for _, lid := range lids {
lease := leases.Lease{
ID: lid,
}
if err := ls.Delete(ctx, lease); err != nil {
return err
}
fmt.Println(lid)
}
return nil
},
}

View File

@ -20,89 +20,27 @@ import (
"context"
"time"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
)
// Lease is used to hold a reference to active resources which have not been
// referenced by a root resource. This is useful for preventing garbage
// collection of resources while they are actively being updated.
type Lease struct {
id string
createdAt time.Time
client *Client
}
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := c.LeasesService()
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
}
return Lease{
id: resp.Lease.ID,
client: c,
}, nil
}
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := c.LeasesService()
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
}
leases := make([]Lease, len(resp.Leases))
for i := range resp.Leases {
leases[i] = Lease{
id: resp.Leases[i].ID,
createdAt: resp.Leases[i].CreatedAt,
client: c,
}
}
return leases, nil
}
// WithLease attaches a lease on the context
func (c *Client) WithLease(ctx context.Context) (context.Context, func(context.Context) error, error) {
_, ok := leases.Lease(ctx)
_, ok := leases.FromContext(ctx)
if ok {
return ctx, func(context.Context) error {
return nil
}, nil
}
l, err := c.CreateLease(ctx)
ls := c.LeasesService()
l, err := ls.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*3600*time.Second))
if err != nil {
return nil, nil, err
}
ctx = leases.WithLease(ctx, l.ID())
ctx = leases.WithLease(ctx, l.ID)
return ctx, func(ctx context.Context) error {
return l.Delete(ctx)
return ls.Delete(ctx, l)
}, nil
}
// ID returns the lease ID
func (l Lease) ID() string {
return l.id
}
// CreatedAt returns the time at which the lease was created
func (l Lease) CreatedAt() time.Time {
return l.createdAt
}
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := l.client.LeasesService()
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})
return err
}

View File

@ -29,8 +29,8 @@ func WithLease(ctx context.Context, lid string) context.Context {
return withGRPCLeaseHeader(ctx, lid)
}
// Lease returns the lease from the context.
func Lease(ctx context.Context) (string, bool) {
// FromContext returns the lease from the context.
func FromContext(ctx context.Context) (string, bool) {
lid, ok := ctx.Value(leaseKey{}).(string)
if !ok {
return fromGRPCHeader(ctx)

43
leases/id.go Normal file
View File

@ -0,0 +1,43 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leases
import (
"encoding/base64"
"fmt"
"math/rand"
"time"
)
// WithRandomID sets the lease ID to a random unique value
func WithRandomID() Opt {
return func(l *Lease) error {
t := time.Now()
var b [3]byte
rand.Read(b[:])
l.ID = fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
return nil
}
}
// WithID sets the ID for the lease
func WithID(id string) Opt {
return func(l *Lease) error {
l.ID = id
return nil
}
}

60
leases/lease.go Normal file
View File

@ -0,0 +1,60 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package leases
import (
"context"
"time"
)
// Opt is used to set options on a lease
type Opt func(*Lease) error
// Manager is used to create, list, and remove leases
type Manager interface {
Create(context.Context, ...Opt) (Lease, error)
Delete(context.Context, Lease) error
List(context.Context, ...string) ([]Lease, error)
}
// Lease retains resources to prevent cleanup before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
}
// WithLabels sets labels on a lease
func WithLabels(labels map[string]string) Opt {
return func(l *Lease) error {
l.Labels = labels
return nil
}
}
// WithExpiration sets an expiration on the lease
func WithExpiration(d time.Duration) Opt {
return func(l *Lease) error {
if l.Labels == nil {
l.Labels = map[string]string{}
}
l.Labels["containerd.io/gc.expire"] = time.Now().Add(d).Format(time.RFC3339)
return nil
}
}

84
leases/proxy/manager.go Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"context"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
)
type proxyManager struct {
client leasesapi.LeasesClient
}
// NewLeaseManager returns a lease manager which communicates
// through a grpc lease service.
func NewLeaseManager(client leasesapi.LeasesClient) leases.Manager {
return &proxyManager{
client: client,
}
}
func (pm *proxyManager) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
l := leases.Lease{}
for _, opt := range opts {
if err := opt(&l); err != nil {
return leases.Lease{}, err
}
}
resp, err := pm.client.Create(ctx, &leasesapi.CreateRequest{
ID: l.ID,
Labels: l.Labels,
})
if err != nil {
return leases.Lease{}, err
}
return leases.Lease{
ID: resp.Lease.ID,
CreatedAt: resp.Lease.CreatedAt,
Labels: resp.Lease.Labels,
}, nil
}
func (pm *proxyManager) Delete(ctx context.Context, l leases.Lease) error {
_, err := pm.client.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.ID,
})
return err
}
func (pm *proxyManager) List(ctx context.Context, filters ...string) ([]leases.Lease, error) {
resp, err := pm.client.List(ctx, &leasesapi.ListRequest{
Filters: filters,
})
if err != nil {
return nil, err
}
l := make([]leases.Lease, len(resp.Leases))
for i := range resp.Leases {
l[i] = leases.Lease{
ID: resp.Leases[i].ID,
CreatedAt: resp.Leases[i].CreatedAt,
Labels: resp.Leases[i].Labels,
}
}
return l, nil
}

View File

@ -23,6 +23,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
)
func adaptImage(o interface{}) filters.Adaptor {
@ -119,6 +120,23 @@ func adaptContentStatus(status content.Status) filters.Adaptor {
})
}
func adaptLease(lease leases.Lease) filters.Adaptor {
return filters.AdapterFunc(func(fieldpath []string) (string, bool) {
if len(fieldpath) == 0 {
return "", false
}
switch fieldpath[0] {
case "id":
return lease.ID, len(lease.ID) > 0
case "labels":
return checkMap(fieldpath[1:], lease.Labels)
}
return "", false
})
}
func checkMap(fieldpath []string, m map[string]string) (string, bool) {
if len(m) == 0 {
return "", false

View File

@ -109,14 +109,16 @@ func TestContentLeased(t *testing.T) {
func createLease(ctx context.Context, db *DB, name string) (context.Context, func() error, error) {
if err := db.Update(func(tx *bolt.Tx) error {
_, err := NewLeaseManager(tx).Create(ctx, name, nil)
_, err := NewLeaseManager(tx).Create(ctx, leases.WithID(name))
return err
}); err != nil {
return nil, nil, err
}
return leases.WithLease(ctx, name), func() error {
return db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, name)
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: name,
})
})
}, nil
}
@ -126,7 +128,7 @@ func checkContentLeased(ctx context.Context, db *DB, dgst digest.Digest) error {
if !ok {
return errors.New("no namespace in context")
}
lease, ok := leases.Lease(ctx)
lease, ok := leases.FromContext(ctx)
if !ok {
return errors.New("no lease in context")
}

View File

@ -21,6 +21,7 @@ import (
"context"
"fmt"
"strings"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/gc"
@ -39,12 +40,15 @@ const (
ResourceContainer
// ResourceTask specifies a task resource
ResourceTask
// ResourceLease specifies a lease
ResourceLease
)
var (
labelGCRoot = []byte("containerd.io/gc.root")
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
labelGCContentRef = []byte("containerd.io/gc.ref.content")
labelGCExpire = []byte("containerd.io/gc.expire")
)
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
@ -53,6 +57,8 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
return nil
}
expThreshold := time.Now()
// iterate through each namespace
v1c := v1bkt.Cursor()
@ -71,6 +77,30 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
}
libkt := lbkt.Bucket(k)
if lblbkt := libkt.Bucket(bucketKeyObjectLabels); lblbkt != nil {
if expV := lblbkt.Get(labelGCExpire); expV != nil {
exp, err := time.Parse(time.RFC3339, string(expV))
if err != nil {
// label not used, log and continue to use lease
log.G(ctx).WithError(err).WithField("lease", string(k)).Infof("ignoring invalid expiration value %q", string(expV))
} else if expThreshold.After(exp) {
// lease has expired, skip
return nil
}
}
}
select {
case nc <- gcnode(ResourceLease, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
// Emit content and snapshots as roots instead of implementing
// in references. Since leases cannot be referenced there is
// no need to allow the lookup to be recursive, handling here
// therefore reduces the number of database seeks.
cbkt := libkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
@ -261,6 +291,18 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc
nbkt := v1bkt.Bucket(k)
ns := string(k)
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
if err := lbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
return fn(ctx, gcnode(ResourceLease, ns, string(k)))
}); err != nil {
return err
}
}
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
@ -334,6 +376,11 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
return ssbkt.DeleteBucket([]byte(parts[1]))
}
}
case ResourceLease:
lbkt := nsbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
return lbkt.DeleteBucket([]byte(node.Key))
}
}
return nil

View File

@ -25,6 +25,7 @@ import (
"path/filepath"
"sort"
"testing"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/gc"
@ -63,6 +64,12 @@ func TestGCRoots(t *testing.T) {
addLeaseSnapshot("ns2", "l2", "overlay", "sn6"),
addLeaseContent("ns2", "l1", dgst(4)),
addLeaseContent("ns2", "l2", dgst(5)),
addLease("ns2", "l3", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))),
addLeaseContent("ns2", "l3", dgst(6)),
addLeaseSnapshot("ns2", "l3", "overlay", "sn7"),
addLease("ns2", "l4", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))),
addLeaseContent("ns2", "l4", dgst(7)),
addLeaseSnapshot("ns2", "l4", "overlay", "sn8"),
}
expected := []gc.Node{
@ -71,6 +78,7 @@ func TestGCRoots(t *testing.T) {
gcnode(ResourceContent, "ns2", dgst(2).String()),
gcnode(ResourceContent, "ns2", dgst(4).String()),
gcnode(ResourceContent, "ns2", dgst(5).String()),
gcnode(ResourceContent, "ns2", dgst(6).String()),
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
gcnode(ResourceSnapshot, "ns1", "overlay/sn4"),
@ -81,6 +89,10 @@ func TestGCRoots(t *testing.T) {
gcnode(ResourceSnapshot, "ns1", "overlay/sn9"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn5"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn6"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn7"),
gcnode(ResourceLease, "ns2", "l1"),
gcnode(ResourceLease, "ns2", "l2"),
gcnode(ResourceLease, "ns2", "l3"),
}
if err := db.Update(func(tx *bolt.Tx) error {
@ -126,6 +138,8 @@ func TestGCRemove(t *testing.T) {
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
addSnapshot("ns1", "overlay", "sn4", "", nil),
addSnapshot("ns2", "overlay", "sn1", "", nil),
addLease("ns1", "l1", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))),
addLease("ns2", "l2", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))),
}
all := []gc.Node{
@ -139,6 +153,8 @@ func TestGCRemove(t *testing.T) {
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
gcnode(ResourceSnapshot, "ns1", "overlay/sn4"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
gcnode(ResourceLease, "ns1", "l1"),
gcnode(ResourceLease, "ns2", "l2"),
}
var deleted, remaining []gc.Node
@ -425,6 +441,16 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu
}
}
func addLease(ns, lid string, labels map[string]string) alterFunc {
return func(bkt *bolt.Bucket) error {
lbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid)
if err != nil {
return err
}
return boltutil.WriteLabels(lbkt, labels)
}
}
func addLeaseSnapshot(ns, lid, snapshotter, name string) alterFunc {
return func(bkt *bolt.Bucket) error {
sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectSnapshots), snapshotter)

View File

@ -22,6 +22,7 @@ import (
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
@ -29,17 +30,6 @@ import (
"github.com/pkg/errors"
)
// Lease retains resources to prevent garbage collection before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
Content []string
Snapshots map[string][]string
}
// LeaseManager manages the create/delete lifecyle of leases
// and also returns existing leases
type LeaseManager struct {
@ -55,49 +45,56 @@ func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
}
// Create creates a new lease using the provided lease
func (lm *LeaseManager) Create(ctx context.Context, lid string, labels map[string]string) (Lease, error) {
func (lm *LeaseManager) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
var l leases.Lease
for _, opt := range opts {
if err := opt(&l); err != nil {
return leases.Lease{}, err
}
}
if l.ID == "" {
return leases.Lease{}, errors.New("lease id must be provided")
}
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return Lease{}, err
return leases.Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return Lease{}, err
return leases.Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(lid))
txbkt, err := topbkt.CreateBucket([]byte(l.ID))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return Lease{}, errors.Wrapf(err, "lease %q", lid)
return leases.Lease{}, errors.Wrapf(err, "lease %q", l.ID)
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return Lease{}, err
return leases.Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return Lease{}, err
return leases.Lease{}, err
}
if labels != nil {
if err := boltutil.WriteLabels(txbkt, labels); err != nil {
return Lease{}, err
if l.Labels != nil {
if err := boltutil.WriteLabels(txbkt, l.Labels); err != nil {
return leases.Lease{}, err
}
}
l.CreatedAt = t
return Lease{
ID: lid,
CreatedAt: t,
Labels: labels,
}, nil
return l, nil
}
// Delete delets the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
func (lm *LeaseManager) Delete(ctx context.Context, lease leases.Lease) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
@ -107,24 +104,29 @@ func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
if topbkt == nil {
return nil
}
if err := topbkt.DeleteBucket([]byte(lid)); err != nil && err != bolt.ErrBucketNotFound {
if err := topbkt.DeleteBucket([]byte(lease.ID)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
}
// List lists all active leases
func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter ...string) ([]Lease, error) {
func (lm *LeaseManager) List(ctx context.Context, fs ...string) ([]leases.Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var leases []Lease
filter, err := filters.ParseAll(fs...)
if err != nil {
return nil, errors.Wrapf(errdefs.ErrInvalidArgument, err.Error())
}
var ll []leases.Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return leases, nil
return ll, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
@ -133,7 +135,7 @@ func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter
}
txbkt := topbkt.Bucket(k)
l := Lease{
l := leases.Lease{
ID: string(k),
}
@ -150,21 +152,20 @@ func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter
}
l.Labels = labels
// TODO: Read Snapshots
// TODO: Read Content
leases = append(leases, l)
if filter.Match(adaptLease(l)) {
ll = append(ll, l)
}
return nil
}); err != nil {
return nil, err
}
return leases, nil
return ll, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
lid, ok := leases.FromContext(ctx)
if !ok {
return nil
}
@ -193,7 +194,7 @@ func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string)
}
func removeSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
lid, ok := leases.FromContext(ctx)
if !ok {
return nil
}
@ -213,7 +214,7 @@ func removeSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key stri
}
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
lid, ok := leases.FromContext(ctx)
if !ok {
return nil
}
@ -237,7 +238,7 @@ func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error
}
func removeContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
lid, ok := leases.FromContext(ctx)
if !ok {
return nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/pkg/errors"
)
@ -44,49 +45,51 @@ func TestLeases(t *testing.T) {
},
}
var leases []Lease
var ll []leases.Lease
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
lease, err := NewLeaseManager(tx).Create(ctx, tc.ID, nil)
lease, err := NewLeaseManager(tx).Create(ctx, leases.WithID(tc.ID))
if err != nil {
if tc.Cause != nil && errors.Cause(err) == tc.Cause {
return nil
}
return err
}
leases = append(leases, lease)
ll = append(ll, lease)
return nil
}); err != nil {
t.Fatal(err)
}
}
var listed []Lease
var listed []leases.Lease
// List leases, check same
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
t.Fatal(err)
}
if len(listed) != len(leases) {
t.Fatalf("Expected %d lease, got %d", len(leases), len(listed))
if len(listed) != len(ll) {
t.Fatalf("Expected %d lease, got %d", len(ll), len(listed))
}
for i := range listed {
if listed[i].ID != leases[i].ID {
t.Fatalf("Expected lease ID %s, got %s", leases[i].ID, listed[i].ID)
if listed[i].ID != ll[i].ID {
t.Fatalf("Expected lease ID %s, got %s", ll[i].ID, listed[i].ID)
}
if listed[i].CreatedAt != leases[i].CreatedAt {
t.Fatalf("Expected lease created at time %s, got %s", leases[i].CreatedAt, listed[i].CreatedAt)
if listed[i].CreatedAt != ll[i].CreatedAt {
t.Fatalf("Expected lease created at time %s, got %s", ll[i].CreatedAt, listed[i].CreatedAt)
}
}
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, tc.ID)
return NewLeaseManager(tx).Delete(ctx, leases.Lease{
ID: tc.ID,
})
}); err != nil {
t.Fatal(err)
}
@ -94,7 +97,7 @@ func TestLeases(t *testing.T) {
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
listed, err = NewLeaseManager(tx).List(ctx)
return err
}); err != nil {
t.Fatal(err)
@ -104,3 +107,153 @@ func TestLeases(t *testing.T) {
t.Fatalf("Expected no leases, found %d: %v", len(listed), listed)
}
}
func TestLeasesList(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
testset := [][]leases.Opt{
{
leases.WithID("lease1"),
leases.WithLabels(map[string]string{
"label1": "value1",
"label3": "other",
}),
},
{
leases.WithID("lease2"),
leases.WithLabels(map[string]string{
"label1": "value1",
"label2": "",
"label3": "other",
}),
},
{
leases.WithID("lease3"),
leases.WithLabels(map[string]string{
"label1": "value2",
"label2": "something",
}),
},
}
// Insert all
for _, opts := range testset {
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
_, err := lm.Create(ctx, opts...)
return err
}); err != nil {
t.Fatal(err)
}
}
for _, testcase := range []struct {
name string
filters []string
expected []string
}{
{
name: "All",
filters: []string{},
expected: []string{"lease1", "lease2", "lease3"},
},
{
name: "ID",
filters: []string{"id==lease1"},
expected: []string{"lease1"},
},
{
name: "IDx2",
filters: []string{"id==lease1", "id==lease2"},
expected: []string{"lease1", "lease2"},
},
{
name: "Label1",
filters: []string{"labels.label1"},
expected: []string{"lease1", "lease2", "lease3"},
},
{
name: "Label1value1",
filters: []string{"labels.label1==value1"},
expected: []string{"lease1", "lease2"},
},
{
name: "Label1value2",
filters: []string{"labels.label1==value2"},
expected: []string{"lease3"},
},
{
name: "Label2",
filters: []string{"labels.label2"},
expected: []string{"lease3"},
},
{
name: "Label3",
filters: []string{"labels.label2", "labels.label3"},
expected: []string{"lease1", "lease2", "lease3"},
},
} {
t.Run(testcase.name, func(t *testing.T) {
if err := db.View(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
results, err := lm.List(ctx, testcase.filters...)
if err != nil {
return err
}
if len(results) != len(testcase.expected) {
t.Errorf("length of result does not match expected: %v != %v", len(results), len(testcase.expected))
}
expectedMap := map[string]struct{}{}
for _, expected := range testcase.expected {
expectedMap[expected] = struct{}{}
}
for _, result := range results {
if _, ok := expectedMap[result.ID]; !ok {
t.Errorf("unexpected match: %v", result.ID)
} else {
delete(expectedMap, result.ID)
}
}
if len(expectedMap) > 0 {
for match := range expectedMap {
t.Errorf("missing match: %v", match)
}
}
return nil
}); err != nil {
t.Fatal(err)
}
})
}
// delete everything to test it
for _, opts := range testset {
var lease leases.Lease
for _, opt := range opts {
if err := opt(&lease); err != nil {
t.Fatal(err)
}
}
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err != nil {
t.Fatal(err)
}
// try it again, get nil
if err := db.Update(func(tx *bolt.Tx) error {
lm := NewLeaseManager(tx)
return lm.Delete(ctx, lease)
}); err != nil {
t.Fatalf("unexpected error %v", err)
}
}
}

View File

@ -25,10 +25,10 @@ import (
containers "github.com/containerd/containerd/api/services/containers/v1"
diff "github.com/containerd/containerd/api/services/diff/v1"
images "github.com/containerd/containerd/api/services/images/v1"
leases "github.com/containerd/containerd/api/services/leases/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/runtime/restart"
@ -120,7 +120,7 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
return containerd.WithNamespaceService(s.(namespacesapi.NamespacesClient))
},
services.LeasesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(s.(leases.LeasesClient))
return containerd.WithLeasesService(s.(leases.Manager))
},
} {
p := plugins[s]

View File

@ -20,12 +20,12 @@ import (
containersapi "github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
imagesapi "github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/api/services/leases/v1"
namespacesapi "github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/snapshots"
)
@ -39,7 +39,7 @@ type services struct {
taskService tasks.TasksClient
diffService DiffService
eventService EventService
leasesService leases.LeasesClient
leasesService leases.Manager
}
// ServicesOpt allows callers to set options on the services
@ -105,7 +105,7 @@ func WithNamespaceService(namespaceService namespacesapi.NamespacesClient) Servi
}
// WithLeasesService sets the lease service.
func WithLeasesService(leasesService leases.LeasesClient) ServicesOpt {
func WithLeasesService(leasesService leases.Manager) ServicesOpt {
return func(s *services) {
s.leasesService = leasesService
}

View File

@ -18,19 +18,12 @@ package leases
import (
"context"
"encoding/base64"
"fmt"
"math/rand"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
)
func init() {
@ -54,67 +47,32 @@ type local struct {
db *metadata.DB
}
func (l *local) Create(ctx context.Context, r *api.CreateRequest, _ ...grpc.CallOption) (*api.CreateResponse, error) {
lid := r.ID
if lid == "" {
lid = generateLeaseID()
}
var trans metadata.Lease
func (l *local) Create(ctx context.Context, opts ...leases.Opt) (leases.Lease, error) {
var lease leases.Lease
if err := l.db.Update(func(tx *bolt.Tx) error {
var err error
trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels)
lease, err = metadata.NewLeaseManager(tx).Create(ctx, opts...)
return err
}); err != nil {
return nil, err
return leases.Lease{}, err
}
return &api.CreateResponse{
Lease: txToGRPC(trans),
}, nil
return lease, nil
}
func (l *local) Delete(ctx context.Context, r *api.DeleteRequest, _ ...grpc.CallOption) (*ptypes.Empty, error) {
if err := l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, r.ID)
}); err != nil {
return nil, err
}
return &ptypes.Empty{}, nil
func (l *local) Delete(ctx context.Context, lease leases.Lease) error {
return l.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, lease)
})
}
func (l *local) List(ctx context.Context, r *api.ListRequest, _ ...grpc.CallOption) (*api.ListResponse, error) {
var leases []metadata.Lease
func (l *local) List(ctx context.Context, filters ...string) ([]leases.Lease, error) {
var ll []leases.Lease
if err := l.db.View(func(tx *bolt.Tx) error {
var err error
leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...)
ll, err = metadata.NewLeaseManager(tx).List(ctx, filters...)
return err
}); err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(leases))
for i := range leases {
apileases[i] = txToGRPC(leases[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func txToGRPC(tx metadata.Lease) *api.Lease {
return &api.Lease{
ID: tx.ID,
Labels: tx.Labels,
CreatedAt: tx.CreatedAt,
// TODO: Snapshots
// TODO: Content
}
}
func generateLeaseID() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
return ll, nil
}

View File

@ -22,6 +22,7 @@ import (
"google.golang.org/grpc"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/services"
ptypes "github.com/gogo/protobuf/types"
@ -48,13 +49,13 @@ func init() {
if err != nil {
return nil, err
}
return &service{local: i.(api.LeasesClient)}, nil
return &service{lm: i.(leases.Manager)}, nil
},
})
}
type service struct {
local api.LeasesClient
lm leases.Manager
}
func (s *service) Register(server *grpc.Server) error {
@ -63,13 +64,54 @@ func (s *service) Register(server *grpc.Server) error {
}
func (s *service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
return s.local.Create(ctx, r)
opts := []leases.Opt{
leases.WithLabels(r.Labels),
}
if r.ID == "" {
opts = append(opts, leases.WithRandomID())
} else {
opts = append(opts, leases.WithID(r.ID))
}
l, err := s.lm.Create(ctx, opts...)
if err != nil {
return nil, err
}
return &api.CreateResponse{
Lease: leaseToGRPC(l),
}, nil
}
func (s *service) Delete(ctx context.Context, r *api.DeleteRequest) (*ptypes.Empty, error) {
return s.local.Delete(ctx, r)
if err := s.lm.Delete(ctx, leases.Lease{
ID: r.ID,
}); err != nil {
return nil, err
}
return &ptypes.Empty{}, nil
}
func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
return s.local.List(ctx, r)
l, err := s.lm.List(ctx, r.Filters...)
if err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(l))
for i := range l {
apileases[i] = leaseToGRPC(l[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func leaseToGRPC(l leases.Lease) *api.Lease {
return &api.Lease{
ID: l.ID,
Labels: l.Labels,
CreatedAt: l.CreatedAt,
}
}

View File

@ -43,7 +43,7 @@ gotest.tools v2.1.0
github.com/google/go-cmp v0.1.0
# cri dependencies
github.com/containerd/cri v1.11.0
github.com/containerd/cri 661f3b0377db409fe0e5677115f02ce7b89fd17d https://github.com/dmcgowan/cri-containerd
github.com/containerd/go-cni 5882530828ecf62032409b298a3e8b19e08b6534
github.com/blang/semver v3.1.0
github.com/containernetworking/cni v0.6.0

View File

@ -24,10 +24,10 @@ import (
"github.com/containerd/containerd/api/services/containers/v1"
"github.com/containerd/containerd/api/services/diff/v1"
"github.com/containerd/containerd/api/services/images/v1"
"github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/api/services/namespaces/v1"
"github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/plugin"
@ -137,7 +137,7 @@ func getServicesOpts(ic *plugin.InitContext) ([]containerd.ServicesOpt, error) {
return containerd.WithNamespaceService(s.(namespaces.NamespacesClient))
},
services.LeasesService: func(s interface{}) containerd.ServicesOpt {
return containerd.WithLeasesService(s.(leases.LeasesClient))
return containerd.WithLeasesService(s.(leases.Manager))
},
} {
p := plugins[s]

View File

@ -103,7 +103,7 @@ func Import(ctx context.Context, client *containerd.Client, reader io.Reader) (_
defer deferCancel()
if err := done(deferCtx); err != nil {
// Get lease id from context still works after context is done.
leaseID, _ := leases.Lease(ctx)
leaseID, _ := leases.FromContext(ctx)
log.G(ctx).WithError(err).Errorf("Failed to release lease %q", leaseID)
}
}()