Merge pull request #1690 from dmcgowan/metadata-transactions

gc: resource leases
This commit is contained in:
Stephen Day 2017-11-07 16:11:11 -08:00 committed by GitHub
commit 27d450a01b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 2499 additions and 110 deletions

View File

@ -2405,6 +2405,165 @@ file {
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/leases/v1/leases.proto"
package: "containerd.services.leases.v1"
dependency: "gogoproto/gogo.proto"
dependency: "google/protobuf/empty.proto"
dependency: "google/protobuf/timestamp.proto"
message_type {
name: "Lease"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "created_at"
number: 2
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".google.protobuf.Timestamp"
options {
65010: 1
65001: 0
}
json_name: "createdAt"
}
field {
name: "labels"
number: 3
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease.LabelsEntry"
json_name: "labels"
}
nested_type {
name: "LabelsEntry"
field {
name: "key"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "key"
}
field {
name: "value"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "value"
}
options {
map_entry: true
}
}
}
message_type {
name: "CreateRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
field {
name: "labels"
number: 3
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.CreateRequest.LabelsEntry"
json_name: "labels"
}
nested_type {
name: "LabelsEntry"
field {
name: "key"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "key"
}
field {
name: "value"
number: 2
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "value"
}
options {
map_entry: true
}
}
}
message_type {
name: "CreateResponse"
field {
name: "lease"
number: 1
label: LABEL_OPTIONAL
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease"
json_name: "lease"
}
}
message_type {
name: "DeleteRequest"
field {
name: "id"
number: 1
label: LABEL_OPTIONAL
type: TYPE_STRING
json_name: "id"
}
}
message_type {
name: "ListRequest"
field {
name: "filters"
number: 1
label: LABEL_REPEATED
type: TYPE_STRING
json_name: "filters"
}
}
message_type {
name: "ListResponse"
field {
name: "leases"
number: 1
label: LABEL_REPEATED
type: TYPE_MESSAGE
type_name: ".containerd.services.leases.v1.Lease"
json_name: "leases"
}
}
service {
name: "Leases"
method {
name: "Create"
input_type: ".containerd.services.leases.v1.CreateRequest"
output_type: ".containerd.services.leases.v1.CreateResponse"
}
method {
name: "Delete"
input_type: ".containerd.services.leases.v1.DeleteRequest"
output_type: ".google.protobuf.Empty"
}
method {
name: "List"
input_type: ".containerd.services.leases.v1.ListRequest"
output_type: ".containerd.services.leases.v1.ListResponse"
}
}
options {
go_package: "github.com/containerd/containerd/api/services/leases/v1;leases"
}
syntax: "proto3"
}
file {
name: "github.com/containerd/containerd/api/services/namespaces/v1/namespace.proto"
package: "containerd.services.namespaces.v1"

View File

@ -0,0 +1 @@
package leases

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,58 @@
syntax = "proto3";
package containerd.services.leases.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/leases/v1;leases";
// Leases service manages resources leases within the metadata store.
service Leases {
// Create creates a new lease for managing changes to metadata. A lease
// can be used to protect objects from being removed.
rpc Create(CreateRequest) returns (CreateResponse);
// Delete deletes the lease and makes any unreferenced objects created
// during the lease eligible for garbage collection if not referenced
// or retained by other resources during the lease.
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
// ListTransactions lists all active leases, returning the full list of
// leases and optionally including the referenced resources.
rpc List(ListRequest) returns (ListResponse);
}
// Lease is an object which retains resources while it exists.
message Lease {
string id = 1;
google.protobuf.Timestamp created_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
map<string, string> labels = 3;
}
message CreateRequest {
// ID is used to identity the lease, when the id is not set the service
// generates a random identifier for the lease.
string id = 1;
map<string, string> labels = 3;
}
message CreateResponse {
Lease lease = 1;
}
message DeleteRequest {
string id = 1;
}
message ListRequest {
repeated string filters = 1;
}
message ListResponse {
repeated Lease leases = 1;
}

View File

@ -138,6 +138,12 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container
// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
container := containers.Container{
ID: id,
Runtime: containers.RuntimeInfo{
@ -213,6 +219,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
}
store := c.ContentStore()
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
@ -231,7 +243,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
} else {
handler = images.Handlers(append(pullCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher, desc),
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store, platforms.Default()))...,
)
}
@ -268,11 +280,6 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
imgrec = created
}
// Remove root tag from manifest now that image refers to it
if _, err := store.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
return nil, errors.Wrap(err, "failed to remove manifest root tag")
}
img := &image{
client: c,
i: imgrec,
@ -584,6 +591,13 @@ func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts
if err != nil {
return nil, err
}
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
switch iopts.format {
case ociImageFormat:
return c.importFromOCITar(ctx, ref, reader, iopts)

View File

@ -10,6 +10,7 @@ import (
_ "github.com/containerd/containerd/services/healthcheck"
_ "github.com/containerd/containerd/services/images"
_ "github.com/containerd/containerd/services/introspection"
_ "github.com/containerd/containerd/services/leases"
_ "github.com/containerd/containerd/services/namespaces"
_ "github.com/containerd/containerd/services/snapshot"
_ "github.com/containerd/containerd/services/tasks"

View File

@ -2,12 +2,10 @@ package containerd
import (
"context"
"time"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
@ -93,11 +91,8 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id
@ -126,11 +121,8 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id

View File

@ -3,7 +3,6 @@ package containerd
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
@ -102,27 +101,14 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
)
for _, layer := range layers {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
"containerd.io/uncompressed": layer.Diff.Digest.String(),
}
lastUnpacked := unpacked
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
if err != nil {
return err
}
if lastUnpacked {
info := snapshot.Info{
Name: identity.ChainID(chain).String(),
}
// Remove previously created gc.root label
if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
chain = append(chain, layer.Diff.Digest)
}
@ -143,15 +129,6 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil {
return err
}
sinfo := snapshot.Info{
Name: rootfs,
}
// Config now referenced snapshot, release root reference
if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
return nil

91
lease.go Normal file
View File

@ -0,0 +1,91 @@
package containerd
import (
"context"
"time"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
)
// Lease is used to hold a reference to active resources which have not been
// referenced by a root resource. This is useful for preventing garbage
// collection of resources while they are actively being updated.
type Lease struct {
id string
createdAt time.Time
client *Client
}
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
}
return Lease{
id: resp.Lease.ID,
client: c,
}, nil
}
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
}
leases := make([]Lease, len(resp.Leases))
for i := range resp.Leases {
leases[i] = Lease{
id: resp.Leases[i].ID,
createdAt: resp.Leases[i].CreatedAt,
client: c,
}
}
return leases, nil
}
func (c *Client) withLease(ctx context.Context) (context.Context, func() error, error) {
_, ok := leases.Lease(ctx)
if ok {
return ctx, func() error {
return nil
}, nil
}
l, err := c.CreateLease(ctx)
if err != nil {
return nil, nil, err
}
ctx = leases.WithLease(ctx, l.ID())
return ctx, func() error {
return l.Delete(ctx)
}, nil
}
// ID returns the lease ID
func (l Lease) ID() string {
return l.id
}
// CreatedAt returns the time at which the lease was created
func (l Lease) CreatedAt() time.Time {
return l.createdAt
}
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := leasesapi.NewLeasesClient(l.client.conn)
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})
return err
}

24
leases/context.go Normal file
View File

@ -0,0 +1,24 @@
package leases
import "context"
type leaseKey struct{}
// WithLease sets a given lease on the context
func WithLease(ctx context.Context, lid string) context.Context {
ctx = context.WithValue(ctx, leaseKey{}, lid)
// also store on the grpc headers so it gets picked up by any clients that
// are using this.
return withGRPCLeaseHeader(ctx, lid)
}
// Lease returns the lease from the context.
func Lease(ctx context.Context) (string, bool) {
lid, ok := ctx.Value(leaseKey{}).(string)
if !ok {
return fromGRPCHeader(ctx)
}
return lid, ok
}

41
leases/grpc.go Normal file
View File

@ -0,0 +1,41 @@
package leases
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
// GRPCHeader defines the header name for specifying a containerd lease.
GRPCHeader = "containerd-lease"
)
func withGRPCLeaseHeader(ctx context.Context, lid string) context.Context {
// also store on the grpc headers so it gets picked up by any clients
// that are using this.
txheader := metadata.Pairs(GRPCHeader, lid)
md, ok := metadata.FromOutgoingContext(ctx) // merge with outgoing context.
if !ok {
md = txheader
} else {
// order ensures the latest is first in this list.
md = metadata.Join(txheader, md)
}
return metadata.NewOutgoingContext(ctx, md)
}
func fromGRPCHeader(ctx context.Context) (string, bool) {
// try to extract for use in grpc servers.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", false
}
values := md[GRPCHeader]
if len(values) == 0 {
return "", false
}
return values[0], true
}

View File

@ -38,6 +38,7 @@ var (
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
@ -53,6 +54,7 @@ var (
bucketKeySnapshotter = []byte("snapshotter")
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {

View File

@ -391,27 +391,31 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
return err
}
}
return nw.commit(ctx, tx, size, expected, opts...)
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
return "", err
}
}
if err := validateInfo(&base); err != nil {
return err
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return err
return "", err
}
if size != 0 && size != status.Offset {
return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
@ -419,32 +423,32 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return err
return "", err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
return "", err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return err
return "", err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
return "", err
}
return bkt.Put(bucketKeySize, sizeEncoded)
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {

View File

@ -46,6 +46,55 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
nbkt := v1bkt.Bucket(k)
ns := string(k)
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
if err := lbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
libkt := lbkt.Bucket(k)
cbkt := libkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
sbkt := libkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
ibkt := nbkt.Bucket(bucketKeyObjectImages)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {

View File

@ -34,14 +34,22 @@ func TestGCRoots(t *testing.T) {
addSnapshot("ns1", "overlay", "sn1", "", nil),
addSnapshot("ns1", "overlay", "sn2", "", nil),
addSnapshot("ns1", "overlay", "sn3", "", labelmap(string(labelGCRoot), "always")),
addLeaseSnapshot("ns2", "l1", "overlay", "sn5"),
addLeaseSnapshot("ns2", "l2", "overlay", "sn6"),
addLeaseContent("ns2", "l1", dgst(4)),
addLeaseContent("ns2", "l2", dgst(5)),
}
expected := []gc.Node{
gcnode(ResourceContent, "ns1", dgst(1).String()),
gcnode(ResourceContent, "ns1", dgst(2).String()),
gcnode(ResourceContent, "ns2", dgst(2).String()),
gcnode(ResourceContent, "ns2", dgst(4).String()),
gcnode(ResourceContent, "ns2", dgst(5).String()),
gcnode(ResourceSnapshot, "ns1", "overlay/sn2"),
gcnode(ResourceSnapshot, "ns1", "overlay/sn3"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn5"),
gcnode(ResourceSnapshot, "ns2", "overlay/sn6"),
}
if err := db.Update(func(tx *bolt.Tx) error {
@ -374,6 +382,26 @@ func addContent(ns string, dgst digest.Digest, labels map[string]string) alterFu
}
}
func addLeaseSnapshot(ns, lid, snapshotter, name string) alterFunc {
return func(bkt *bolt.Bucket) error {
sbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectSnapshots), snapshotter)
if err != nil {
return err
}
return sbkt.Put([]byte(name), nil)
}
}
func addLeaseContent(ns, lid string, dgst digest.Digest) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectLeases), lid, string(bucketKeyObjectContent))
if err != nil {
return err
}
return cbkt.Put([]byte(dgst.String()), nil)
}
}
func addContainer(ns, name, snapshotter, snapshot string, labels map[string]string) alterFunc {
return func(bkt *bolt.Bucket) error {
cbkt, err := createBuckets(bkt, ns, string(bucketKeyObjectContainers), name)

201
metadata/leases.go Normal file
View File

@ -0,0 +1,201 @@
package metadata
import (
"context"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Lease retains resources to prevent garbage collection before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
Content []string
Snapshots map[string][]string
}
// LeaseManager manages the create/delete lifecyle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
}
// NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
return &LeaseManager{
tx: tx,
}
}
// Create creates a new lease using the provided lease
func (lm *LeaseManager) Create(ctx context.Context, lid string, labels map[string]string) (Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(lid))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return Lease{}, err
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return Lease{}, err
}
if labels != nil {
if err := boltutil.WriteLabels(txbkt, labels); err != nil {
return Lease{}, err
}
}
return Lease{
ID: lid,
CreatedAt: t,
Labels: labels,
}, nil
}
// Delete delets the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return nil
}
if err := topbkt.DeleteBucket([]byte(lid)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
}
// List lists all active leases
func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter ...string) ([]Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var leases []Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return leases, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
txbkt := topbkt.Bucket(k)
l := Lease{
ID: string(k),
}
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
return err
}
}
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
l.Labels = labels
// TODO: Read Snapshots
// TODO: Read Content
leases = append(leases, l)
return nil
}); err != nil {
return nil, err
}
return leases, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
if err != nil {
return err
}
bkt, err = bkt.CreateBucketIfNotExists([]byte(snapshotter))
if err != nil {
return err
}
return bkt.Put([]byte(key), nil)
}
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectContent)
if err != nil {
return err
}
return bkt.Put([]byte(dgst.String()), nil)
}

90
metadata/leases_test.go Normal file
View File

@ -0,0 +1,90 @@
package metadata
import (
"testing"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/pkg/errors"
)
func TestLeases(t *testing.T) {
ctx, db, cancel := testEnv(t)
defer cancel()
testCases := []struct {
ID string
Cause error
}{
{
ID: "tx1",
},
{
ID: "tx1",
Cause: errdefs.ErrAlreadyExists,
},
{
ID: "tx2",
},
}
var leases []Lease
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
lease, err := NewLeaseManager(tx).Create(ctx, tc.ID, nil)
if err != nil {
if tc.Cause != nil && errors.Cause(err) == tc.Cause {
return nil
}
return err
}
leases = append(leases, lease)
return nil
}); err != nil {
t.Fatal(err)
}
}
var listed []Lease
// List leases, check same
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
return err
}); err != nil {
t.Fatal(err)
}
if len(listed) != len(leases) {
t.Fatalf("Expected %d lease, got %d", len(leases), len(listed))
}
for i := range listed {
if listed[i].ID != leases[i].ID {
t.Fatalf("Expected lease ID %s, got %s", leases[i].ID, listed[i].ID)
}
if listed[i].CreatedAt != leases[i].CreatedAt {
t.Fatalf("Expected lease created at time %s, got %s", leases[i].CreatedAt, listed[i].CreatedAt)
}
}
for _, tc := range testCases {
if err := db.Update(func(tx *bolt.Tx) error {
return NewLeaseManager(tx).Delete(ctx, tc.ID)
}); err != nil {
t.Fatal(err)
}
}
if err := db.View(func(tx *bolt.Tx) error {
var err error
listed, err = NewLeaseManager(tx).List(ctx, false)
return err
}); err != nil {
t.Fatal(err)
}
if len(listed) > 0 {
t.Fatalf("Expected no leases, found %d: %v", len(listed), listed)
}
}

View File

@ -326,6 +326,10 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {

View File

@ -160,7 +160,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
}
labels := map[string]string{}
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
@ -176,12 +175,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
}
for _, ch := range manifest.Layers {
if _, err := c.contentStore.Update(ctx, content.Info{Digest: ch.Digest}, "labels.containerd.io/gc.root"); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to remove blob root tag")
}
}
return desc, nil
}
@ -284,10 +277,7 @@ tryit:
eg.Go(func() error {
defer pw.Close()
opt := content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest, opt)
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
})
if err := eg.Wait(); err != nil {

View File

@ -45,7 +45,7 @@ func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {
// FetchHandler returns a handler that will fetch all content into the ingester
// discovered in a call to Dispatch. Use with ChildrenHandler to do a full
// recursive fetch.
func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descriptor) images.HandlerFunc {
func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
@ -57,13 +57,13 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descr
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := fetch(ctx, ingester, fetcher, desc, desc.Digest == root.Digest)
err := fetch(ctx, ingester, fetcher, desc)
return nil, err
}
}
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor, root bool) error {
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
var (
@ -105,13 +105,13 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
}
defer rc.Close()
r, opts := commitOpts(desc, rc, root)
r, opts := commitOpts(desc, rc)
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
}
// commitOpts gets the appropriate content options to alter
// the content info on commit based on media type.
func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []content.Opt) {
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
switch desc.MediaType {
@ -163,13 +163,10 @@ func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []c
return errors.Wrap(err, "unable to get commit labels")
}
if len(children) > 0 || root {
if len(children) > 0 {
if info.Labels == nil {
info.Labels = map[string]string{}
}
if root {
info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
}
for i, ch := range children {
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
}

View File

@ -16,6 +16,7 @@ import (
eventsapi "github.com/containerd/containerd/api/services/events/v1"
images "github.com/containerd/containerd/api/services/images/v1"
introspection "github.com/containerd/containerd/api/services/introspection/v1"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
namespaces "github.com/containerd/containerd/api/services/namespaces/v1"
snapshotapi "github.com/containerd/containerd/api/services/snapshot/v1"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
@ -255,6 +256,8 @@ func interceptor(
ctx = log.WithModule(ctx, "events")
case introspection.IntrospectionServer:
ctx = log.WithModule(ctx, "introspection")
case leasesapi.LeasesServer:
ctx = log.WithModule(ctx, "leases")
default:
log.G(ctx).Warnf("unknown GRPC server type: %#v\n", info.Server)
}

115
services/leases/service.go Normal file
View File

@ -0,0 +1,115 @@
package leases
import (
"crypto/rand"
"encoding/base64"
"fmt"
"time"
"google.golang.org/grpc"
"github.com/boltdb/bolt"
api "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/plugin"
"github.com/golang/protobuf/ptypes/empty"
"golang.org/x/net/context"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.GRPCPlugin,
ID: "leases",
Requires: []plugin.Type{
plugin.MetadataPlugin,
},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
m, err := ic.Get(plugin.MetadataPlugin)
if err != nil {
return nil, err
}
return NewService(m.(*metadata.DB)), nil
},
})
}
type service struct {
db *metadata.DB
}
// NewService returns the GRPC metadata server
func NewService(db *metadata.DB) api.LeasesServer {
return &service{
db: db,
}
}
func (s *service) Register(server *grpc.Server) error {
api.RegisterLeasesServer(server, s)
return nil
}
func (s *service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) {
lid := r.ID
if lid == "" {
lid = generateLeaseID()
}
var trans metadata.Lease
if err := s.db.Update(func(tx *bolt.Tx) error {
var err error
trans, err = metadata.NewLeaseManager(tx).Create(ctx, lid, r.Labels)
return err
}); err != nil {
return nil, err
}
return &api.CreateResponse{
Lease: txToGRPC(trans),
}, nil
}
func (s *service) Delete(ctx context.Context, r *api.DeleteRequest) (*empty.Empty, error) {
if err := s.db.Update(func(tx *bolt.Tx) error {
return metadata.NewLeaseManager(tx).Delete(ctx, r.ID)
}); err != nil {
return nil, err
}
return &empty.Empty{}, nil
}
func (s *service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) {
var leases []metadata.Lease
if err := s.db.View(func(tx *bolt.Tx) error {
var err error
leases, err = metadata.NewLeaseManager(tx).List(ctx, false, r.Filters...)
return err
}); err != nil {
return nil, err
}
apileases := make([]*api.Lease, len(leases))
for i := range leases {
apileases[i] = txToGRPC(leases[i])
}
return &api.ListResponse{
Leases: apileases,
}, nil
}
func txToGRPC(tx metadata.Lease) *api.Lease {
return &api.Lease{
ID: tx.ID,
Labels: tx.Labels,
CreatedAt: tx.CreatedAt,
// TODO: Snapshots
// TODO: Content
}
}
func generateLeaseID() string {
t := time.Now()
var b [3]byte
// Ignore read failures, just decreases uniqueness
rand.Read(b[:])
return fmt.Sprintf("%d-%s", t.Nanosecond(), base64.URLEncoding.EncodeToString(b[:]))
}

View File

@ -567,10 +567,7 @@ func (s *service) writeContent(ctx context.Context, mediaType, ref string, r io.
if err != nil {
return nil, err
}
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}
if err := writer.Commit(ctx, 0, "", content.WithLabels(labels)); err != nil {
if err := writer.Commit(ctx, 0, ""); err != nil {
return nil, err
}
return &types.Descriptor{

View File

@ -11,7 +11,6 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/sys/unix"
@ -21,7 +20,6 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/user"
@ -260,19 +258,16 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter = client.SnapshotService(c.Snapshotter)
parent = identity.ChainID(diffIDs).String()
usernsID = fmt.Sprintf("%s-%d-%d", parent, uid, gid)
opt = snapshot.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
)
if _, err := snapshotter.Stat(ctx, usernsID); err == nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID, opt); err != nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID); err != nil {
return err
}
c.SnapshotKey = id
c.Image = i.Name()
return nil
}
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent, opt)
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent)
if err != nil {
return err
}
@ -280,13 +275,13 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter.Remove(ctx, usernsID)
return err
}
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap", opt); err != nil {
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap"); err != nil {
return err
}
if readonly {
_, err = snapshotter.View(ctx, id, usernsID, opt)
_, err = snapshotter.View(ctx, id, usernsID)
} else {
_, err = snapshotter.Prepare(ctx, id, usernsID, opt)
_, err = snapshotter.Prepare(ctx, id, usernsID)
}
if err != nil {
return err

31
task.go
View File

@ -18,7 +18,6 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/rootfs"
@ -26,7 +25,6 @@ import (
google_protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
@ -359,6 +357,12 @@ func (t *task) Resize(ctx context.Context, w, h uint32) error {
}
func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Image, error) {
ctx, done, err := t.client.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
request := &tasks.CheckpointTaskRequest{
ContainerID: t.id,
}
@ -392,15 +396,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
index := v1.Index{
Annotations: make(map[string]string),
}
// make sure we clear the gc root labels reguardless of success
var clearRoots []ocispec.Descriptor
defer func() {
for _, r := range append(index.Manifests, clearRoots...) {
if err := clearRootGCLabel(ctx, t.client, r); err != nil {
log.G(ctx).WithError(err).WithField("dgst", r.Digest).Warnf("failed to remove root marker")
}
}
}()
if err := t.checkpointTask(ctx, &index, request); err != nil {
return nil, err
}
@ -419,7 +414,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
if err != nil {
return nil, err
}
clearRoots = append(clearRoots, desc)
im := images.Image{
Name: i.Name,
Target: desc,
@ -535,9 +529,6 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas
func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, snapshotterName string, id string) error {
opts := []diff.Opt{
diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)),
diff.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}),
}
rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...)
if err != nil {
@ -564,9 +555,7 @@ func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image strin
}
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}
labels := map[string]string{}
for i, m := range index.Manifests {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
}
@ -596,9 +585,3 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin
Size: size,
}, nil
}
func clearRootGCLabel(ctx context.Context, client *Client, desc ocispec.Descriptor) error {
info := content.Info{Digest: desc.Digest}
_, err := client.ContentStore().Update(ctx, info, "labels.containerd.io/gc.root")
return err
}