Merge pull request #407 from Random-Liu/update-containerd

Update containerd to v1.0.0-beta.3
This commit is contained in:
Lantao Liu 2017-11-09 16:00:25 -08:00 committed by GitHub
commit ac8b0979fa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 2141 additions and 133 deletions

View File

@ -1,5 +1,5 @@
RUNC_VERSION=74a17296470088de3805e138d3d87c62e613dfc4
CNI_VERSION=v0.6.0
CONTAINERD_VERSION=564600ee79aefb0f24cbcecc90d4388bd0ea59de
CONTAINERD_VERSION=v1.0.0-beta.3
CRITOOL_VERSION=4e3c99777477277030734ee2c9253d44a6216955
KUBERNETES_VERSION=b958430ec2654bac10f74abbeab402c71cf5fa3b

View File

@ -2,7 +2,7 @@ github.com/blang/semver v3.1.0
github.com/boltdb/bolt e9cf4fae01b5a8ff89d0ec6b32f0d9c9f79aefdd
github.com/BurntSushi/toml a368813c5e648fee92e5f6c30e3944ff9d5e8895
github.com/containerd/cgroups f7dd103d3e4e696aa67152f6b4ddd1779a3455a9
github.com/containerd/containerd 564600ee79aefb0f24cbcecc90d4388bd0ea59de
github.com/containerd/containerd v1.0.0-beta.3
github.com/containerd/continuity cf279e6ac893682272b4479d4c67fd3abf878b4e
github.com/containerd/fifo fbfb6a11ec671efbe94ad1c12c2e98773f19e1e6
github.com/containerd/typeurl f6943554a7e7e88b3c14aad190bf05932da84788

View File

@ -0,0 +1 @@
package leases

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,58 @@
syntax = "proto3";
package containerd.services.leases.v1;
import "gogoproto/gogo.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
option go_package = "github.com/containerd/containerd/api/services/leases/v1;leases";
// Leases service manages resources leases within the metadata store.
service Leases {
// Create creates a new lease for managing changes to metadata. A lease
// can be used to protect objects from being removed.
rpc Create(CreateRequest) returns (CreateResponse);
// Delete deletes the lease and makes any unreferenced objects created
// during the lease eligible for garbage collection if not referenced
// or retained by other resources during the lease.
rpc Delete(DeleteRequest) returns (google.protobuf.Empty);
// ListTransactions lists all active leases, returning the full list of
// leases and optionally including the referenced resources.
rpc List(ListRequest) returns (ListResponse);
}
// Lease is an object which retains resources while it exists.
message Lease {
string id = 1;
google.protobuf.Timestamp created_at = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
map<string, string> labels = 3;
}
message CreateRequest {
// ID is used to identity the lease, when the id is not set the service
// generates a random identifier for the lease.
string id = 1;
map<string, string> labels = 3;
}
message CreateResponse {
Lease lease = 1;
}
message DeleteRequest {
string id = 1;
}
message ListRequest {
repeated string filters = 1;
}
message ListResponse {
repeated Lease leases = 1;
}

View File

@ -22,6 +22,7 @@ import (
versionservice "github.com/containerd/containerd/api/services/version/v1"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/dialer"
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
@ -72,7 +73,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
grpc.WithTimeout(60 * time.Second),
grpc.FailOnNonTempDialError(true),
grpc.WithBackoffMaxDelay(3 * time.Second),
grpc.WithDialer(Dialer),
grpc.WithDialer(dialer.Dialer),
}
if len(copts.dialOptions) > 0 {
gopts = copts.dialOptions
@ -84,7 +85,7 @@ func New(address string, opts ...ClientOpt) (*Client, error) {
grpc.WithStreamInterceptor(stream),
)
}
conn, err := grpc.Dial(DialAddress(address), gopts...)
conn, err := grpc.Dial(dialer.DialAddress(address), gopts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial %q", address)
}
@ -137,6 +138,12 @@ func (c *Client) Containers(ctx context.Context, filters ...string) ([]Container
// NewContainer will create a new container in container with the provided id
// the id must be unique within the namespace
func (c *Client) NewContainer(ctx context.Context, id string, opts ...NewContainerOpts) (Container, error) {
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
container := containers.Container{
ID: id,
Runtime: containers.RuntimeInfo{
@ -212,6 +219,12 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
}
store := c.ContentStore()
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
name, desc, err := pullCtx.Resolver.Resolve(ctx, ref)
if err != nil {
return nil, err
@ -230,7 +243,7 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
handler = images.Handlers(append(pullCtx.BaseHandlers, schema1Converter)...)
} else {
handler = images.Handlers(append(pullCtx.BaseHandlers,
remotes.FetchHandler(store, fetcher, desc),
remotes.FetchHandler(store, fetcher),
images.ChildrenHandler(store, platforms.Default()))...,
)
}
@ -267,11 +280,6 @@ func (c *Client) Pull(ctx context.Context, ref string, opts ...RemoteOpt) (Image
imgrec = created
}
// Remove root tag from manifest now that image refers to it
if _, err := store.Update(ctx, content.Info{Digest: desc.Digest}, "labels.containerd.io/gc.root"); err != nil {
return nil, errors.Wrap(err, "failed to remove manifest root tag")
}
img := &image{
client: c,
i: imgrec,
@ -583,6 +591,13 @@ func (c *Client) Import(ctx context.Context, ref string, reader io.Reader, opts
if err != nil {
return nil, err
}
ctx, done, err := c.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
switch iopts.format {
case ociImageFormat:
return c.importFromOCITar(ctx, ref, reader, iopts)

View File

@ -2,12 +2,10 @@ package containerd
import (
"context"
"time"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/containerd/typeurl"
"github.com/gogo/protobuf/types"
"github.com/opencontainers/image-spec/identity"
@ -93,11 +91,8 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id
@ -126,11 +121,8 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts {
return err
}
setSnapshotterIfEmpty(c)
labels := map[string]string{
"containerd.io/gc.root": time.Now().String(),
}
parent := identity.ChainID(diffIDs).String()
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent); err != nil {
return err
}
c.SnapshotKey = id

View File

@ -1,4 +1,4 @@
package containerd
package dialer
import (
"net"

View File

@ -1,6 +1,6 @@
// +build !windows
package containerd
package dialer
import (
"fmt"
@ -11,6 +11,12 @@ import (
"time"
)
// DialAddress returns the address with unix:// prepended to the
// provided address
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}
func isNoent(err error) bool {
if err != nil {
if nerr, ok := err.(*net.OpError); ok {
@ -28,9 +34,3 @@ func dialer(address string, timeout time.Duration) (net.Conn, error) {
address = strings.TrimPrefix(address, "unix://")
return net.DialTimeout("unix", address, timeout)
}
// DialAddress returns the address with unix:// prepended to the
// provided address
func DialAddress(address string) string {
return fmt.Sprintf("unix://%s", address)
}

View File

@ -1,4 +1,4 @@
package containerd
package dialer
import (
"net"

View File

@ -1,12 +1,13 @@
package events
package exchange
import (
"context"
"strings"
"time"
events "github.com/containerd/containerd/api/services/events/v1"
v1 "github.com/containerd/containerd/api/services/events/v1"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/filters"
"github.com/containerd/containerd/identifiers"
"github.com/containerd/containerd/log"
@ -34,7 +35,7 @@ func NewExchange() *Exchange {
//
// This is useful when an event is forwaded on behalf of another namespace or
// when the event is propagated on behalf of another publisher.
func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err error) {
func (e *Exchange) Forward(ctx context.Context, envelope *v1.Envelope) (err error) {
if err := validateEnvelope(envelope); err != nil {
return err
}
@ -59,11 +60,11 @@ func (e *Exchange) Forward(ctx context.Context, envelope *events.Envelope) (err
// Publish packages and sends an event. The caller will be considered the
// initial publisher of the event. This means the timestamp will be calculated
// at this point and this method may read from the calling context.
func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err error) {
func (e *Exchange) Publish(ctx context.Context, topic string, event events.Event) (err error) {
var (
namespace string
encoded *types.Any
envelope events.Envelope
envelope v1.Envelope
)
namespace, err = namespaces.NamespaceRequired(ctx)
@ -108,9 +109,9 @@ func (e *Exchange) Publish(ctx context.Context, topic string, event Event) (err
// Zero or more filters may be provided as strings. Only events that match
// *any* of the provided filters will be sent on the channel. The filters use
// the standard containerd filters package syntax.
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *events.Envelope, errs <-chan error) {
func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *v1.Envelope, errs <-chan error) {
var (
evch = make(chan *events.Envelope)
evch = make(chan *v1.Envelope)
errq = make(chan error, 1)
channel = goevents.NewChannel(0)
queue = goevents.NewQueue(channel)
@ -150,7 +151,7 @@ func (e *Exchange) Subscribe(ctx context.Context, fs ...string) (ch <-chan *even
for {
select {
case ev := <-channel.C:
env, ok := ev.(*events.Envelope)
env, ok := ev.(*v1.Envelope)
if !ok {
// TODO(stevvooe): For the most part, we are well protected
// from this condition. Both Forward and Publish protect
@ -204,7 +205,7 @@ func validateTopic(topic string) error {
return nil
}
func validateEnvelope(envelope *events.Envelope) error {
func validateEnvelope(envelope *v1.Envelope) error {
if err := namespaces.Validate(envelope.Namespace); err != nil {
return errors.Wrapf(err, "event envelope has invalid namespace")
}

View File

@ -3,7 +3,6 @@ package containerd
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
@ -102,27 +101,14 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
)
for _, layer := range layers {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
"containerd.io/uncompressed": layer.Diff.Digest.String(),
}
lastUnpacked := unpacked
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
if err != nil {
return err
}
if lastUnpacked {
info := snapshot.Info{
Name: identity.ChainID(chain).String(),
}
// Remove previously created gc.root label
if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
chain = append(chain, layer.Diff.Digest)
}
@ -143,15 +129,6 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil {
return err
}
sinfo := snapshot.Info{
Name: rootfs,
}
// Config now referenced snapshot, release root reference
if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil {
return err
}
}
return nil

91
vendor/github.com/containerd/containerd/lease.go generated vendored Normal file
View File

@ -0,0 +1,91 @@
package containerd
import (
"context"
"time"
leasesapi "github.com/containerd/containerd/api/services/leases/v1"
"github.com/containerd/containerd/leases"
)
// Lease is used to hold a reference to active resources which have not been
// referenced by a root resource. This is useful for preventing garbage
// collection of resources while they are actively being updated.
type Lease struct {
id string
createdAt time.Time
client *Client
}
// CreateLease creates a new lease
func (c *Client) CreateLease(ctx context.Context) (Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.Create(ctx, &leasesapi.CreateRequest{})
if err != nil {
return Lease{}, err
}
return Lease{
id: resp.Lease.ID,
client: c,
}, nil
}
// ListLeases lists active leases
func (c *Client) ListLeases(ctx context.Context) ([]Lease, error) {
lapi := leasesapi.NewLeasesClient(c.conn)
resp, err := lapi.List(ctx, &leasesapi.ListRequest{})
if err != nil {
return nil, err
}
leases := make([]Lease, len(resp.Leases))
for i := range resp.Leases {
leases[i] = Lease{
id: resp.Leases[i].ID,
createdAt: resp.Leases[i].CreatedAt,
client: c,
}
}
return leases, nil
}
func (c *Client) withLease(ctx context.Context) (context.Context, func() error, error) {
_, ok := leases.Lease(ctx)
if ok {
return ctx, func() error {
return nil
}, nil
}
l, err := c.CreateLease(ctx)
if err != nil {
return nil, nil, err
}
ctx = leases.WithLease(ctx, l.ID())
return ctx, func() error {
return l.Delete(ctx)
}, nil
}
// ID returns the lease ID
func (l Lease) ID() string {
return l.id
}
// CreatedAt returns the time at which the lease was created
func (l Lease) CreatedAt() time.Time {
return l.createdAt
}
// Delete deletes the lease, removing the reference to all resources created
// during the lease.
func (l Lease) Delete(ctx context.Context) error {
lapi := leasesapi.NewLeasesClient(l.client.conn)
_, err := lapi.Delete(ctx, &leasesapi.DeleteRequest{
ID: l.id,
})
return err
}

View File

@ -0,0 +1,24 @@
package leases
import "context"
type leaseKey struct{}
// WithLease sets a given lease on the context
func WithLease(ctx context.Context, lid string) context.Context {
ctx = context.WithValue(ctx, leaseKey{}, lid)
// also store on the grpc headers so it gets picked up by any clients that
// are using this.
return withGRPCLeaseHeader(ctx, lid)
}
// Lease returns the lease from the context.
func Lease(ctx context.Context) (string, bool) {
lid, ok := ctx.Value(leaseKey{}).(string)
if !ok {
return fromGRPCHeader(ctx)
}
return lid, ok
}

41
vendor/github.com/containerd/containerd/leases/grpc.go generated vendored Normal file
View File

@ -0,0 +1,41 @@
package leases
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
const (
// GRPCHeader defines the header name for specifying a containerd lease.
GRPCHeader = "containerd-lease"
)
func withGRPCLeaseHeader(ctx context.Context, lid string) context.Context {
// also store on the grpc headers so it gets picked up by any clients
// that are using this.
txheader := metadata.Pairs(GRPCHeader, lid)
md, ok := metadata.FromOutgoingContext(ctx) // merge with outgoing context.
if !ok {
md = txheader
} else {
// order ensures the latest is first in this list.
md = metadata.Join(txheader, md)
}
return metadata.NewOutgoingContext(ctx, md)
}
func fromGRPCHeader(ctx context.Context) (string, bool) {
// try to extract for use in grpc servers.
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", false
}
values := md[GRPCHeader]
if len(values) == 0 {
return "", false
}
return values[0], true
}

View File

@ -38,6 +38,7 @@ var (
bucketKeyObjectContent = []byte("content") // stores content references
bucketKeyObjectBlob = []byte("blob") // stores content links
bucketKeyObjectIngest = []byte("ingest") // stores ingest links
bucketKeyObjectLeases = []byte("leases") // stores leases
bucketKeyDigest = []byte("digest")
bucketKeyMediaType = []byte("mediatype")
@ -53,6 +54,7 @@ var (
bucketKeySnapshotter = []byte("snapshotter")
bucketKeyTarget = []byte("target")
bucketKeyExtensions = []byte("extensions")
bucketKeyCreatedAt = []byte("createdat")
)
func getBucket(tx *bolt.Tx, keys ...[]byte) *bolt.Bucket {

View File

@ -391,27 +391,31 @@ func (nw *namespacedWriter) Commit(ctx context.Context, size int64, expected dig
return err
}
}
return nw.commit(ctx, tx, size, expected, opts...)
dgst, err := nw.commit(ctx, tx, size, expected, opts...)
if err != nil {
return err
}
return addContentLease(ctx, tx, dgst)
})
}
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) error {
func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64, expected digest.Digest, opts ...content.Opt) (digest.Digest, error) {
var base content.Info
for _, opt := range opts {
if err := opt(&base); err != nil {
return err
return "", err
}
}
if err := validateInfo(&base); err != nil {
return err
return "", err
}
status, err := nw.Writer.Status()
if err != nil {
return err
return "", err
}
if size != 0 && size != status.Offset {
return errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
return "", errors.Errorf("%q failed size validation: %v != %v", nw.ref, status.Offset, size)
}
size = status.Offset
@ -419,32 +423,32 @@ func (nw *namespacedWriter) commit(ctx context.Context, tx *bolt.Tx, size int64,
if err := nw.Writer.Commit(ctx, size, expected); err != nil {
if !errdefs.IsAlreadyExists(err) {
return err
return "", err
}
if getBlobBucket(tx, nw.namespace, actual) != nil {
return errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
return "", errors.Wrapf(errdefs.ErrAlreadyExists, "content %v", actual)
}
}
bkt, err := createBlobBucket(tx, nw.namespace, actual)
if err != nil {
return err
return "", err
}
commitTime := time.Now().UTC()
sizeEncoded, err := encodeInt(size)
if err != nil {
return err
return "", err
}
if err := boltutil.WriteTimestamps(bkt, commitTime, commitTime); err != nil {
return err
return "", err
}
if err := boltutil.WriteLabels(bkt, base.Labels); err != nil {
return err
return "", err
}
return bkt.Put(bucketKeySize, sizeEncoded)
return actual, bkt.Put(bucketKeySize, sizeEncoded)
}
func (nw *namespacedWriter) Status() (content.Status, error) {

View File

@ -46,6 +46,55 @@ func scanRoots(ctx context.Context, tx *bolt.Tx, nc chan<- gc.Node) error {
nbkt := v1bkt.Bucket(k)
ns := string(k)
lbkt := nbkt.Bucket(bucketKeyObjectLeases)
if lbkt != nil {
if err := lbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
libkt := lbkt.Bucket(k)
cbkt := libkt.Bucket(bucketKeyObjectContent)
if cbkt != nil {
if err := cbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceContent, ns, string(k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
}); err != nil {
return err
}
}
sbkt := libkt.Bucket(bucketKeyObjectSnapshots)
if sbkt != nil {
if err := sbkt.ForEach(func(sk, sv []byte) error {
if sv != nil {
return nil
}
snbkt := sbkt.Bucket(sk)
return snbkt.ForEach(func(k, v []byte) error {
select {
case nc <- gcnode(ResourceSnapshot, ns, fmt.Sprintf("%s/%s", sk, k)):
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
}
ibkt := nbkt.Bucket(bucketKeyObjectImages)
if ibkt != nil {
if err := ibkt.ForEach(func(k, v []byte) error {

View File

@ -0,0 +1,201 @@
package metadata
import (
"context"
"time"
"github.com/boltdb/bolt"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata/boltutil"
"github.com/containerd/containerd/namespaces"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
// Lease retains resources to prevent garbage collection before
// the resources can be fully referenced.
type Lease struct {
ID string
CreatedAt time.Time
Labels map[string]string
Content []string
Snapshots map[string][]string
}
// LeaseManager manages the create/delete lifecyle of leases
// and also returns existing leases
type LeaseManager struct {
tx *bolt.Tx
}
// NewLeaseManager creates a new lease manager for managing leases using
// the provided database transaction.
func NewLeaseManager(tx *bolt.Tx) *LeaseManager {
return &LeaseManager{
tx: tx,
}
}
// Create creates a new lease using the provided lease
func (lm *LeaseManager) Create(ctx context.Context, lid string, labels map[string]string) (Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return Lease{}, err
}
topbkt, err := createBucketIfNotExists(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if err != nil {
return Lease{}, err
}
txbkt, err := topbkt.CreateBucket([]byte(lid))
if err != nil {
if err == bolt.ErrBucketExists {
err = errdefs.ErrAlreadyExists
}
return Lease{}, err
}
t := time.Now().UTC()
createdAt, err := t.MarshalBinary()
if err != nil {
return Lease{}, err
}
if err := txbkt.Put(bucketKeyCreatedAt, createdAt); err != nil {
return Lease{}, err
}
if labels != nil {
if err := boltutil.WriteLabels(txbkt, labels); err != nil {
return Lease{}, err
}
}
return Lease{
ID: lid,
CreatedAt: t,
Labels: labels,
}, nil
}
// Delete delets the lease with the provided lease ID
func (lm *LeaseManager) Delete(ctx context.Context, lid string) error {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return err
}
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return nil
}
if err := topbkt.DeleteBucket([]byte(lid)); err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil
}
// List lists all active leases
func (lm *LeaseManager) List(ctx context.Context, includeResources bool, filter ...string) ([]Lease, error) {
namespace, err := namespaces.NamespaceRequired(ctx)
if err != nil {
return nil, err
}
var leases []Lease
topbkt := getBucket(lm.tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases)
if topbkt == nil {
return leases, nil
}
if err := topbkt.ForEach(func(k, v []byte) error {
if v != nil {
return nil
}
txbkt := topbkt.Bucket(k)
l := Lease{
ID: string(k),
}
if v := txbkt.Get(bucketKeyCreatedAt); v != nil {
t := &l.CreatedAt
if err := t.UnmarshalBinary(v); err != nil {
return err
}
}
labels, err := boltutil.ReadLabels(txbkt)
if err != nil {
return err
}
l.Labels = labels
// TODO: Read Snapshots
// TODO: Read Content
leases = append(leases, l)
return nil
}); err != nil {
return nil, err
}
return leases, nil
}
func addSnapshotLease(ctx context.Context, tx *bolt.Tx, snapshotter, key string) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectSnapshots)
if err != nil {
return err
}
bkt, err = bkt.CreateBucketIfNotExists([]byte(snapshotter))
if err != nil {
return err
}
return bkt.Put([]byte(key), nil)
}
func addContentLease(ctx context.Context, tx *bolt.Tx, dgst digest.Digest) error {
lid, ok := leases.Lease(ctx)
if !ok {
return nil
}
namespace, ok := namespaces.Namespace(ctx)
if !ok {
panic("namespace must already be required")
}
bkt := getBucket(tx, bucketKeyVersion, []byte(namespace), bucketKeyObjectLeases, []byte(lid))
if bkt == nil {
return errors.Wrap(errdefs.ErrNotFound, "lease does not exist")
}
bkt, err := bkt.CreateBucketIfNotExists(bucketKeyObjectContent)
if err != nil {
return err
}
return bkt.Put([]byte(dgst.String()), nil)
}

View File

@ -326,6 +326,10 @@ func (s *snapshotter) createSnapshot(ctx context.Context, key, parent string, re
return err
}
if err := addSnapshotLease(ctx, tx, s.name, key); err != nil {
return err
}
// TODO: Consider doing this outside of transaction to lessen
// metadata lock time
if readonly {

View File

@ -5,17 +5,21 @@ package mount
import "github.com/pkg/errors"
var (
// ErrNotImplementOnUnix is returned for methods that are not implemented
ErrNotImplementOnUnix = errors.New("not implemented under unix")
)
// Mount is not implemented on this platform
func (m *Mount) Mount(target string) error {
return ErrNotImplementOnUnix
}
// Unmount is not implemented on this platform
func Unmount(mount string, flags int) error {
return ErrNotImplementOnUnix
}
// UnmountAll is not implemented on this platform
func UnmountAll(mount string, flags int) error {
return ErrNotImplementOnUnix
}

View File

@ -5,7 +5,7 @@ import (
"path/filepath"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/events"
"github.com/containerd/containerd/events/exchange"
"github.com/containerd/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
@ -18,7 +18,7 @@ type InitContext struct {
State string
Config interface{}
Address string
Events *events.Exchange
Events *exchange.Exchange
Meta *Meta // plugins can fill in metadata at init.

View File

@ -160,7 +160,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
}
labels := map[string]string{}
labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
labels["containerd.io/gc.ref.content.0"] = manifest.Config.Digest.String()
for i, ch := range manifest.Layers {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i+1)] = ch.Digest.String()
@ -176,12 +175,6 @@ func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
}
for _, ch := range manifest.Layers {
if _, err := c.contentStore.Update(ctx, content.Info{Digest: ch.Digest}, "labels.containerd.io/gc.root"); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to remove blob root tag")
}
}
return desc, nil
}
@ -284,10 +277,7 @@ tryit:
eg.Go(func() error {
defer pw.Close()
opt := content.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest, opt)
return content.Copy(ctx, cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
})
if err := eg.Wait(); err != nil {

View File

@ -45,7 +45,7 @@ func MakeRefKey(ctx context.Context, desc ocispec.Descriptor) string {
// FetchHandler returns a handler that will fetch all content into the ingester
// discovered in a call to Dispatch. Use with ChildrenHandler to do a full
// recursive fetch.
func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descriptor) images.HandlerFunc {
func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
@ -57,13 +57,13 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher, root ocispec.Descr
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := fetch(ctx, ingester, fetcher, desc, desc.Digest == root.Digest)
err := fetch(ctx, ingester, fetcher, desc)
return nil, err
}
}
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor, root bool) error {
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
var (
@ -105,13 +105,13 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
}
defer rc.Close()
r, opts := commitOpts(desc, rc, root)
r, opts := commitOpts(desc, rc)
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
}
// commitOpts gets the appropriate content options to alter
// the content info on commit based on media type.
func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []content.Opt) {
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
switch desc.MediaType {
@ -163,13 +163,10 @@ func commitOpts(desc ocispec.Descriptor, r io.Reader, root bool) (io.Reader, []c
return errors.Wrap(err, "unable to get commit labels")
}
if len(children) > 0 || root {
if len(children) > 0 {
if info.Labels == nil {
info.Labels = map[string]string{}
}
if root {
info.Labels["containerd.io/gc.root"] = time.Now().UTC().Format(time.RFC3339)
}
for i, ch := range children {
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
}

View File

@ -11,17 +11,16 @@ import (
"path/filepath"
"strconv"
"strings"
"time"
"golang.org/x/sys/unix"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/fs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/snapshot"
"github.com/opencontainers/image-spec/identity"
"github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/user"
@ -260,19 +259,17 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter = client.SnapshotService(c.Snapshotter)
parent = identity.ChainID(diffIDs).String()
usernsID = fmt.Sprintf("%s-%d-%d", parent, uid, gid)
opt = snapshot.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
})
)
if _, err := snapshotter.Stat(ctx, usernsID); err == nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID, opt); err != nil {
if _, err := snapshotter.Prepare(ctx, id, usernsID); err == nil {
c.SnapshotKey = id
c.Image = i.Name()
return nil
} else if !errdefs.IsNotFound(err) {
return err
}
c.SnapshotKey = id
c.Image = i.Name()
return nil
}
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent, opt)
mounts, err := snapshotter.Prepare(ctx, usernsID+"-remap", parent)
if err != nil {
return err
}
@ -280,13 +277,13 @@ func withRemappedSnapshotBase(id string, i Image, uid, gid uint32, readonly bool
snapshotter.Remove(ctx, usernsID)
return err
}
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap", opt); err != nil {
if err := snapshotter.Commit(ctx, usernsID, usernsID+"-remap"); err != nil {
return err
}
if readonly {
_, err = snapshotter.View(ctx, id, usernsID, opt)
_, err = snapshotter.View(ctx, id, usernsID)
} else {
_, err = snapshotter.Prepare(ctx, id, usernsID, opt)
_, err = snapshotter.Prepare(ctx, id, usernsID)
}
if err != nil {
return err

View File

@ -151,6 +151,7 @@ func createDefaultSpec(ctx context.Context, id string) (*specs.Spec, error) {
"/proc/timer_stats",
"/proc/sched_debug",
"/sys/firmware",
"/proc/scsi",
},
ReadonlyPaths: []string{
"/proc/asound",

View File

@ -6,14 +6,17 @@ import (
"syscall"
)
// StatAtime returns the access time from a stat struct
func StatAtime(st *syscall.Stat_t) syscall.Timespec {
return st.Atimespec
}
// StatCtime returns the created time from a stat struct
func StatCtime(st *syscall.Stat_t) syscall.Timespec {
return st.Ctimespec
}
// StatMtime returns the modified time from a stat struct
func StatMtime(st *syscall.Stat_t) syscall.Timespec {
return st.Mtimespec
}

View File

@ -18,7 +18,6 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/rootfs"
@ -26,7 +25,6 @@ import (
google_protobuf "github.com/gogo/protobuf/types"
digest "github.com/opencontainers/go-digest"
"github.com/opencontainers/image-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
)
@ -359,6 +357,12 @@ func (t *task) Resize(ctx context.Context, w, h uint32) error {
}
func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Image, error) {
ctx, done, err := t.client.withLease(ctx)
if err != nil {
return nil, err
}
defer done()
request := &tasks.CheckpointTaskRequest{
ContainerID: t.id,
}
@ -392,15 +396,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
index := v1.Index{
Annotations: make(map[string]string),
}
// make sure we clear the gc root labels reguardless of success
var clearRoots []ocispec.Descriptor
defer func() {
for _, r := range append(index.Manifests, clearRoots...) {
if err := clearRootGCLabel(ctx, t.client, r); err != nil {
log.G(ctx).WithError(err).WithField("dgst", r.Digest).Warnf("failed to remove root marker")
}
}
}()
if err := t.checkpointTask(ctx, &index, request); err != nil {
return nil, err
}
@ -419,7 +414,6 @@ func (t *task) Checkpoint(ctx context.Context, opts ...CheckpointTaskOpts) (Imag
if err != nil {
return nil, err
}
clearRoots = append(clearRoots, desc)
im := images.Image{
Name: i.Name,
Target: desc,
@ -535,9 +529,6 @@ func (t *task) checkpointTask(ctx context.Context, index *v1.Index, request *tas
func (t *task) checkpointRWSnapshot(ctx context.Context, index *v1.Index, snapshotterName string, id string) error {
opts := []diff.Opt{
diff.WithReference(fmt.Sprintf("checkpoint-rw-%s", id)),
diff.WithLabels(map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}),
}
rw, err := rootfs.Diff(ctx, id, t.client.SnapshotService(snapshotterName), t.client.DiffService(), opts...)
if err != nil {
@ -564,9 +555,7 @@ func (t *task) checkpointImage(ctx context.Context, index *v1.Index, image strin
}
func (t *task) writeIndex(ctx context.Context, index *v1.Index) (d v1.Descriptor, err error) {
labels := map[string]string{
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
}
labels := map[string]string{}
for i, m := range index.Manifests {
labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = m.Digest.String()
}
@ -596,9 +585,3 @@ func writeContent(ctx context.Context, store content.Store, mediaType, ref strin
Size: size,
}, nil
}
func clearRootGCLabel(ctx context.Context, client *Client, desc ocispec.Descriptor) error {
info := content.Info{Digest: desc.Digest}
_, err := client.ContentStore().Update(ctx, info, "labels.containerd.io/gc.root")
return err
}