From 7884707c2f4c96104f65981e8f2187e2cdbf1675 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Mon, 25 Sep 2017 11:15:06 -0700 Subject: [PATCH] Add reference labels to snapshots and content Ensure all snapshots and content are referenced on commit and protected from cleanup. Signed-off-by: Derek McGowan --- container_opts.go | 14 +++++++-- content/helpers.go | 4 +-- image.go | 73 ++++++++++++++++++++++++++++++++++----------- remotes/handlers.go | 73 ++++++++++++++++++++++++++++++++++++++++++++- rootfs/apply.go | 6 ++-- 5 files changed, 144 insertions(+), 26 deletions(-) diff --git a/container_opts.go b/container_opts.go index b8d334638..96ba75812 100644 --- a/container_opts.go +++ b/container_opts.go @@ -2,10 +2,12 @@ package containerd import ( "context" + "time" "github.com/containerd/containerd/containers" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/snapshot" "github.com/containerd/typeurl" "github.com/gogo/protobuf/types" "github.com/opencontainers/image-spec/identity" @@ -91,7 +93,11 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts { return err } setSnapshotterIfEmpty(c) - if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().String(), + } + parent := identity.ChainID(diffIDs).String() + if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil { return err } c.SnapshotKey = id @@ -120,7 +126,11 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts { return err } setSnapshotterIfEmpty(c) - if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, identity.ChainID(diffIDs).String()); err != nil { + labels := map[string]string{ + "containerd.io/gc.root": time.Now().String(), + } + parent := identity.ChainID(diffIDs).String() + if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil { return err } c.SnapshotKey = id diff --git a/content/helpers.go b/content/helpers.go index 1c1087057..af05d0688 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -69,7 +69,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i // the size or digest is unknown, these values may be empty. // // Copy is buffered, so no need to wrap reader in buffered io. -func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error { +func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { ws, err := cw.Status() if err != nil { return err @@ -96,7 +96,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige return err } - if err := cw.Commit(ctx, size, expected); err != nil { + if err := cw.Commit(ctx, size, expected, opts...); err != nil { if !errdefs.IsAlreadyExists(err) { return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) } diff --git a/image.go b/image.go index d333c44fa..b41037a16 100644 --- a/image.go +++ b/image.go @@ -2,11 +2,16 @@ package containerd import ( "context" + "fmt" + "time" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/snapshot" digest "github.com/opencontainers/go-digest" + "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -64,36 +69,68 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error { return err } - sn := i.client.SnapshotService(snapshotterName) - a := i.client.DiffService() - cs := i.client.ContentStore() + var ( + sn = i.client.SnapshotService(snapshotterName) + a = i.client.DiffService() + cs = i.client.ContentStore() - var chain []digest.Digest + chain []digest.Digest + unpacked bool + ) for _, layer := range layers { - unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a) + labels := map[string]string{ + "containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339), + "containerd.io/uncompressed": layer.Diff.Digest.String(), + } + lastUnpacked := unpacked + + unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels)) if err != nil { - // TODO: possibly wait and retry if extraction of same chain id was in progress return err } - if unpacked { - info, err := cs.Info(ctx, layer.Blob.Digest) - if err != nil { + + if lastUnpacked { + info := snapshot.Info{ + Name: identity.ChainID(chain).String(), + } + + // Remove previously created gc.root label + if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil { return err } - if info.Labels == nil { - info.Labels = map[string]string{} - } - if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() { - info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String() - if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil { - return err - } - } } chain = append(chain, layer.Diff.Digest) } + if unpacked { + desc, err := i.i.Config(ctx, cs, platforms.Default()) + if err != nil { + return err + } + + rootfs := identity.ChainID(chain).String() + + cinfo := content.Info{ + Digest: desc.Digest, + Labels: map[string]string{ + fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotterName): rootfs, + }, + } + if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil { + return err + } + + sinfo := snapshot.Info{ + Name: rootfs, + } + + // Config now referenced snapshot, release root reference + if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil { + return err + } + } + return nil } diff --git a/remotes/handlers.go b/remotes/handlers.go index 0a4db6bce..f92bb469d 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -2,6 +2,7 @@ package remotes import ( "context" + "encoding/json" "fmt" "io" "time" @@ -11,6 +12,7 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -102,7 +104,76 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc } defer rc.Close() - return content.Copy(ctx, cw, rc, desc.Size, desc.Digest) + r, opts := commitOpts(desc, rc) + return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...) +} + +// commitOpts gets the appropriate content options to alter +// the content info on commit based on media type. +func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) { + var childrenF func(r io.Reader) ([]ocispec.Descriptor, error) + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { + var ( + manifest ocispec.Manifest + decoder = json.NewDecoder(r) + ) + if err := decoder.Decode(&manifest); err != nil { + return nil, err + } + + return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil + } + case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) { + var ( + index ocispec.Index + decoder = json.NewDecoder(r) + ) + if err := decoder.Decode(&index); err != nil { + return nil, err + } + + return index.Manifests, nil + } + default: + return r, nil + } + + pr, pw := io.Pipe() + + var children []ocispec.Descriptor + errC := make(chan error) + + go func() { + defer close(errC) + ch, err := childrenF(pr) + if err != nil { + errC <- err + } + children = ch + }() + + opt := func(info *content.Info) error { + err := <-errC + if err != nil { + return errors.Wrap(err, "unable to get commit labels") + } + + if len(children) > 0 { + if info.Labels == nil { + info.Labels = map[string]string{} + } + for i, ch := range children { + info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String() + } + } + return nil + } + + return io.TeeReader(r, pw), []content.Opt{opt} } // PushHandler returns a handler that will push all content from the provider diff --git a/rootfs/apply.go b/rootfs/apply.go index 4e21fd10f..a198c99f9 100644 --- a/rootfs/apply.go +++ b/rootfs/apply.go @@ -63,8 +63,8 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID) - // Prepare snapshot with from parent - mounts, err := sn.Prepare(ctx, key, parent.String()) + // Prepare snapshot with from parent, label as root + mounts, err := sn.Prepare(ctx, key, parent.String(), opts...) if err != nil { //TODO: If is snapshot exists error, retry return false, errors.Wrap(err, "failed to prepare extraction layer") @@ -87,7 +87,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap return false, err } - if err = sn.Commit(ctx, chainID.String(), key); err != nil { + if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil { if !errdefs.IsAlreadyExists(err) { return false, errors.Wrapf(err, "failed to commit snapshot %s", parent) }