Add reference labels to snapshots and content
Ensure all snapshots and content are referenced on commit and protected from cleanup. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
551579b316
commit
7884707c2f
@ -2,10 +2,12 @@ package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
"github.com/containerd/typeurl"
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
@ -91,7 +93,11 @@ func WithNewSnapshot(id string, i Image) NewContainerOpts {
|
||||
return err
|
||||
}
|
||||
setSnapshotterIfEmpty(c)
|
||||
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().String(),
|
||||
}
|
||||
parent := identity.ChainID(diffIDs).String()
|
||||
if _, err := client.SnapshotService(c.Snapshotter).Prepare(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.SnapshotKey = id
|
||||
@ -120,7 +126,11 @@ func WithNewSnapshotView(id string, i Image) NewContainerOpts {
|
||||
return err
|
||||
}
|
||||
setSnapshotterIfEmpty(c)
|
||||
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, identity.ChainID(diffIDs).String()); err != nil {
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().String(),
|
||||
}
|
||||
parent := identity.ChainID(diffIDs).String()
|
||||
if _, err := client.SnapshotService(c.Snapshotter).View(ctx, id, parent, snapshot.WithLabels(labels)); err != nil {
|
||||
return err
|
||||
}
|
||||
c.SnapshotKey = id
|
||||
|
@ -69,7 +69,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size i
|
||||
// the size or digest is unknown, these values may be empty.
|
||||
//
|
||||
// Copy is buffered, so no need to wrap reader in buffered io.
|
||||
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest) error {
|
||||
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
|
||||
ws, err := cw.Status()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -96,7 +96,7 @@ func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected dige
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cw.Commit(ctx, size, expected); err != nil {
|
||||
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
|
||||
}
|
||||
|
69
image.go
69
image.go
@ -2,11 +2,16 @@ package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
"github.com/containerd/containerd/rootfs"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
@ -64,36 +69,68 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
sn := i.client.SnapshotService(snapshotterName)
|
||||
a := i.client.DiffService()
|
||||
cs := i.client.ContentStore()
|
||||
var (
|
||||
sn = i.client.SnapshotService(snapshotterName)
|
||||
a = i.client.DiffService()
|
||||
cs = i.client.ContentStore()
|
||||
|
||||
var chain []digest.Digest
|
||||
chain []digest.Digest
|
||||
unpacked bool
|
||||
)
|
||||
for _, layer := range layers {
|
||||
unpacked, err := rootfs.ApplyLayer(ctx, layer, chain, sn, a)
|
||||
if err != nil {
|
||||
// TODO: possibly wait and retry if extraction of same chain id was in progress
|
||||
return err
|
||||
labels := map[string]string{
|
||||
"containerd.io/gc.root": time.Now().UTC().Format(time.RFC3339),
|
||||
"containerd.io/uncompressed": layer.Diff.Digest.String(),
|
||||
}
|
||||
if unpacked {
|
||||
info, err := cs.Info(ctx, layer.Blob.Digest)
|
||||
lastUnpacked := unpacked
|
||||
|
||||
unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a, snapshot.WithLabels(labels))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if info.Labels == nil {
|
||||
info.Labels = map[string]string{}
|
||||
|
||||
if lastUnpacked {
|
||||
info := snapshot.Info{
|
||||
Name: identity.ChainID(chain).String(),
|
||||
}
|
||||
if info.Labels["containerd.io/uncompressed"] != layer.Diff.Digest.String() {
|
||||
info.Labels["containerd.io/uncompressed"] = layer.Diff.Digest.String()
|
||||
if _, err := cs.Update(ctx, info, "labels.containerd.io/uncompressed"); err != nil {
|
||||
|
||||
// Remove previously created gc.root label
|
||||
if _, err := sn.Update(ctx, info, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
chain = append(chain, layer.Diff.Digest)
|
||||
}
|
||||
|
||||
if unpacked {
|
||||
desc, err := i.i.Config(ctx, cs, platforms.Default())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rootfs := identity.ChainID(chain).String()
|
||||
|
||||
cinfo := content.Info{
|
||||
Digest: desc.Digest,
|
||||
Labels: map[string]string{
|
||||
fmt.Sprintf("containerd.io/gc.ref.snapshot.%s", snapshotterName): rootfs,
|
||||
},
|
||||
}
|
||||
if _, err := cs.Update(ctx, cinfo, fmt.Sprintf("labels.containerd.io/gc.ref.snapshot.%s", snapshotterName)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sinfo := snapshot.Info{
|
||||
Name: rootfs,
|
||||
}
|
||||
|
||||
// Config now referenced snapshot, release root reference
|
||||
if _, err := sn.Update(ctx, sinfo, "labels.containerd.io/gc.root"); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@ package remotes
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
@ -11,6 +12,7 @@ import (
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -102,7 +104,76 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
return content.Copy(ctx, cw, rc, desc.Size, desc.Digest)
|
||||
r, opts := commitOpts(desc, rc)
|
||||
return content.Copy(ctx, cw, r, desc.Size, desc.Digest, opts...)
|
||||
}
|
||||
|
||||
// commitOpts gets the appropriate content options to alter
|
||||
// the content info on commit based on media type.
|
||||
func commitOpts(desc ocispec.Descriptor, r io.Reader) (io.Reader, []content.Opt) {
|
||||
var childrenF func(r io.Reader) ([]ocispec.Descriptor, error)
|
||||
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
|
||||
var (
|
||||
manifest ocispec.Manifest
|
||||
decoder = json.NewDecoder(r)
|
||||
)
|
||||
if err := decoder.Decode(&manifest); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return append([]ocispec.Descriptor{manifest.Config}, manifest.Layers...), nil
|
||||
}
|
||||
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
||||
childrenF = func(r io.Reader) ([]ocispec.Descriptor, error) {
|
||||
var (
|
||||
index ocispec.Index
|
||||
decoder = json.NewDecoder(r)
|
||||
)
|
||||
if err := decoder.Decode(&index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return index.Manifests, nil
|
||||
}
|
||||
default:
|
||||
return r, nil
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
var children []ocispec.Descriptor
|
||||
errC := make(chan error)
|
||||
|
||||
go func() {
|
||||
defer close(errC)
|
||||
ch, err := childrenF(pr)
|
||||
if err != nil {
|
||||
errC <- err
|
||||
}
|
||||
children = ch
|
||||
}()
|
||||
|
||||
opt := func(info *content.Info) error {
|
||||
err := <-errC
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to get commit labels")
|
||||
}
|
||||
|
||||
if len(children) > 0 {
|
||||
if info.Labels == nil {
|
||||
info.Labels = map[string]string{}
|
||||
}
|
||||
for i, ch := range children {
|
||||
info.Labels[fmt.Sprintf("containerd.io/gc.ref.content.%d", i)] = ch.Digest.String()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return io.TeeReader(r, pw), []content.Opt{opt}
|
||||
}
|
||||
|
||||
// PushHandler returns a handler that will push all content from the provider
|
||||
|
@ -63,8 +63,8 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
||||
|
||||
key := fmt.Sprintf("extract-%s %s", uniquePart(), chainID)
|
||||
|
||||
// Prepare snapshot with from parent
|
||||
mounts, err := sn.Prepare(ctx, key, parent.String())
|
||||
// Prepare snapshot with from parent, label as root
|
||||
mounts, err := sn.Prepare(ctx, key, parent.String(), opts...)
|
||||
if err != nil {
|
||||
//TODO: If is snapshot exists error, retry
|
||||
return false, errors.Wrap(err, "failed to prepare extraction layer")
|
||||
@ -87,7 +87,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap
|
||||
return false, err
|
||||
}
|
||||
|
||||
if err = sn.Commit(ctx, chainID.String(), key); err != nil {
|
||||
if err = sn.Commit(ctx, chainID.String(), key, opts...); err != nil {
|
||||
if !errdefs.IsAlreadyExists(err) {
|
||||
return false, errors.Wrapf(err, "failed to commit snapshot %s", parent)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user