Add lease expiration to garbage collection
Allow setting an expiration label to have the garbage collector remove an item after the specified time. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
02579c8c3f
commit
c77c89b3d1
13
lease.go
13
lease.go
@ -35,16 +35,22 @@ type Lease struct {
|
||||
}
|
||||
|
||||
// CreateLease creates a new lease
|
||||
// TODO: Add variadic lease opt
|
||||
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
|
||||
lapi := c.LeasesService()
|
||||
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.expire": time.Now().Add(24 * 3600 * time.Second).Format(time.RFC3339),
|
||||
}
|
||||
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{labels})
|
||||
if err != nil {
|
||||
return Lease{}, err
|
||||
}
|
||||
|
||||
return Lease{
|
||||
id: resp.Lease.ID,
|
||||
client: c,
|
||||
id: resp.Lease.ID,
|
||||
createdAt: resp.Lease.CreatedAt,
|
||||
labels: labels,
|
||||
client: c,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -60,6 +66,7 @@ func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
|
||||
leases[i] = Lease{
|
||||
id: resp.Leases[i].ID,
|
||||
createdAt: resp.Leases[i].CreatedAt,
|
||||
labels: resp.Leases[i].Labels,
|
||||
client: c,
|
||||
}
|
||||
}
|
||||
|
17
leases/lease.go
Normal file
17
leases/lease.go
Normal file
@ -0,0 +1,17 @@
|
||||
package leases
|
||||
|
||||
import "time"
|
||||
|
||||
type LeaseOpt func(*Lease)
|
||||
|
||||
type LeaseManager interface {
|
||||
Create(...LeaseOpt) (Lease, error)
|
||||
Delete(Lease) error
|
||||
List(...string) ([]Lease, error)
|
||||
}
|
||||
|
||||
type Lease struct {
|
||||
ID string
|
||||
CreatedAt time.Time
|
||||
Labels map[string]string
|
||||
}
|
@ -21,6 +21,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/gc"
|
||||
@ -39,12 +40,15 @@ const (
|
||||
ResourceContainer
|
||||
// ResourceTask specifies a task resource
|
||||
ResourceTask
|
||||
// ResourceLease specifies a lease
|
||||
ResourceLease
|
||||
)
|
||||
|
||||
var (
|
||||
labelGCRoot = []byte("containerd.io/gc.root")
|
||||
labelGCSnapRef = []byte("containerd.io/gc.ref.snapshot.")
|
||||
labelGCContentRef = []byte("containerd.io/gc.ref.content")
|
||||
labelGCExpire = []byte("containerd.io/gc.expire")
|
||||
)
|
||||
|
||||
func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
@ -53,6 +57,8 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
expThreshold := time.Now()
|
||||
|
||||
// iterate through each namespace
|
||||
v1c := v1bkt.Cursor()
|
||||
|
||||
@ -71,6 +77,30 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
|
||||
}
|
||||
libkt := lbkt.Bucket(k)
|
||||
|
||||
if lblbkt := libkt.Bucket(bucketKeyObjectLabels); lblbkt != nil {
|
||||
if expV := lblbkt.Get(labelGCExpire); expV != nil {
|
||||
exp, err := time.Parse(time.RFC3339, string(expV))
|
||||
if err != nil {
|
||||
// label not used, log and continue to use lease
|
||||
log.G(ctx).WithError(err).WithField("lease", string(k)).Infof("ignoring invalid expiration value %q", string(expV))
|
||||
} else if expThreshold.After(exp) {
|
||||
// lease has expired, skip
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case nc <- gcnode(ResourceLease, ns, string(k)):
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Emit content and snapshots as roots instead of implementing
|
||||
// in references. Since leases cannot be referenced there is
|
||||
// no need to allow the lookup to be recursive, handling here
|
||||
// therefore reduces the number of database seeks.
|
||||
|
||||
cbkt := libkt.Bucket(bucketKeyObjectContent)
|
||||
if cbkt != nil {
|
||||
if err := cbkt.ForEach(func(k, v []byte) error {
|
||||
@ -261,6 +291,18 @@ func scanAll(ctx context.Context, tx *bolt.Tx, fn func(ctx context.Context, n gc
|
||||
nbkt := v1bkt.Bucket(k)
|
||||
ns := string(k)
|
||||
|
||||
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
|
||||
if lbkt != nil {
|
||||
if err := lbkt.ForEach(func(k, v []byte) error {
|
||||
if v != nil {
|
||||
return nil
|
||||
}
|
||||
return fn(ctx, gcnode(ResourceLease, ns, string(k)))
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
sbkt := nbkt.Bucket(bucketKeyObjectSnapshots)
|
||||
if sbkt != nil {
|
||||
if err := sbkt.ForEach(func(sk, sv []byte) error {
|
||||
@ -334,6 +376,11 @@ func remove(ctx context.Context, tx *bolt.Tx, node gc.Node) error {
|
||||
return ssbkt.DeleteBucket([]byte(parts[1]))
|
||||
}
|
||||
}
|
||||
case ResourceLease:
|
||||
lbkt := nsbkt.Bucket(bucketKeyObjectLeases)
|
||||
if lbkt != nil {
|
||||
return lbkt.DeleteBucket([]byte(node.Key))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/gc"
|
||||
@ -63,6 +64,12 @@ func TestGCRoots(t *testing.T) {
|
||||
addLeaseSnapshot("ns2", "l2", "overlay", "sn6"),
|
||||
addLeaseContent("ns2", "l1", dgst(4)),
|
||||
addLeaseContent("ns2", "l2", dgst(5)),
|
||||
addLease("ns2", "l3", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))),
|
||||
addLeaseContent("ns2", "l3", dgst(6)),
|
||||
addLeaseSnapshot("ns2", "l3", "overlay", "sn7"),
|
||||
addLease("ns2", "l4", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))),
|
||||
addLeaseContent("ns2", "l4", dgst(7)),
|
||||
addLeaseSnapshot("ns2", "l4", "overlay", "sn8"),
|
||||
}
|
||||
|
||||
expected := []gc.Node{
|
||||
@ -71,6 +78,7 @@ func TestGCRoots(t *testing.T) {
|
||||
gcnode(ResourceContent, "ns2", dgst(2).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(4).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(5).String()),
|
||||
gcnode(ResourceContent, "ns2", dgst(6).String()),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn4"),
|
||||
@ -81,6 +89,10 @@ func TestGCRoots(t *testing.T) {
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn9"),
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn5"),
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn6"),
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn7"),
|
||||
gcnode(ResourceLease, "ns2", "l1"),
|
||||
gcnode(ResourceLease, "ns2", "l2"),
|
||||
gcnode(ResourceLease, "ns2", "l3"),
|
||||
}
|
||||
|
||||
if err := db.Update(func(tx *bolt.Tx) error {
|
||||
@ -126,6 +138,8 @@ func TestGCRemove(t *testing.T) {
|
||||
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
|
||||
addSnapshot("ns1", "overlay", "sn4", "", nil),
|
||||
addSnapshot("ns2", "overlay", "sn1", "", nil),
|
||||
addLease("ns1", "l1", labelmap(string(labelGCExpire), time.Now().Add(3600*time.Second).Format(time.RFC3339))),
|
||||
addLease("ns2", "l2", labelmap(string(labelGCExpire), time.Now().Format(time.RFC3339))),
|
||||
}
|
||||
|
||||
all := []gc.Node{
|
||||
@ -139,6 +153,8 @@ func TestGCRemove(t *testing.T) {
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
|
||||
gcnode(ResourceSnapshot, "ns1", "overlay/sn4"),
|
||||
gcnode(ResourceSnapshot, "ns2", "overlay/sn1"),
|
||||
gcnode(ResourceLease, "ns1", "l1"),
|
||||
gcnode(ResourceLease, "ns2", "l2"),
|
||||
}
|
||||
|
||||
var deleted, remaining []gc.Node
|
||||
@ -425,6 +441,16 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu
|
||||
}
|
||||
}
|
||||
|
||||
func addLease(ns, lid string, labels map[string]string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
lbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return boltutil.WriteLabels(lbkt, labels)
|
||||
}
|
||||
}
|
||||
|
||||
func addLeaseSnapshot(ns, lid, snapshotter, name string) alterFunc {
|
||||
return func(bkt *bolt.Bucket) error {
|
||||
sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectSnapshots), snapshotter)
|
||||
|
Loading…
Reference in New Issue
Block a user