diff --git a/client.go b/client.go
index 98c2bdd04..7447ce28d 100644
--- a/client.go
+++ b/client.go
@@ -312,6 +312,11 @@ type RemoteContext struct {
 	// afterwards. Unpacking is required to run an image.
 	Unpack bool
 
+	// DiscardContent is a boolean flag to specify whether to allow GC to clean
+	// layers up from the content store after successfully unpacking these
+	// contents to the snapshotter.
+	DiscardContent bool
+
 	// UnpackOpts handles options to the unpack call.
 	UnpackOpts []UnpackOpt
 
diff --git a/client_opts.go b/client_opts.go
index d0f884f83..784cbc209 100644
--- a/client_opts.go
+++ b/client_opts.go
@@ -132,6 +132,14 @@ func WithPullUnpack(_ *Client, c *RemoteContext) error {
 	return nil
 }
 
+// WithDiscardContent is used to allow GC to clean layers up from
+// the content store after successfully unpacking these contents to
+// the snapshotter.
+func WithDiscardContent(_ *Client, c *RemoteContext) error {
+	c.DiscardContent = true
+	return nil
+}
+
 // WithUnpackOpts is used to add unpack options to the unpacker.
 func WithUnpackOpts(opts []UnpackOpt) RemoteOpt {
 	return func(_ *Client, c *RemoteContext) error {
diff --git a/client_test.go b/client_test.go
index b706167e3..bf8f5dbe6 100644
--- a/client_test.go
+++ b/client_test.go
@@ -29,13 +29,17 @@ import (
 	"time"
 
 	"github.com/containerd/containerd/defaults"
+	"github.com/containerd/containerd/errdefs"
 	"github.com/containerd/containerd/images"
+	"github.com/containerd/containerd/leases"
 	"github.com/containerd/containerd/log"
 	"github.com/containerd/containerd/log/logtest"
 	"github.com/containerd/containerd/namespaces"
 	"github.com/containerd/containerd/pkg/testutil"
 	"github.com/containerd/containerd/platforms"
 	"github.com/containerd/containerd/sys"
+	"github.com/opencontainers/go-digest"
+	"github.com/opencontainers/image-spec/identity"
 	"github.com/sirupsen/logrus"
 )
 
@@ -215,6 +219,80 @@ func TestImagePull(t *testing.T) {
 	}
 }
 
+func TestImagePullWithDiscardContent(t *testing.T) {
+	client, err := newClient(t, address)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	ctx, cancel := testContext(t)
+	defer cancel()
+
+	ls := client.LeasesService()
+	l, err := ls.Create(ctx, leases.WithRandomID(), leases.WithExpiration(24*time.Hour))
+	if err != nil {
+		t.Fatal(err)
+	}
+	ctx = leases.WithLease(ctx, l.ID)
+	img, err := client.Pull(ctx, testImage,
+		WithPlatformMatcher(platforms.Default()),
+		WithPullUnpack,
+		WithDiscardContent,
+	)
+	// Synchronously garbage collect contents
+	if errL := ls.Delete(ctx, l, leases.SynchronousDelete); errL != nil {
+		t.Fatal(errL)
+	}
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Check if all layer contents have been unpacked and aren't preserved
+	var (
+		diffIDs []digest.Digest
+		layers  []digest.Digest
+	)
+	cs := client.ContentStore()
+	manifest, err := images.Manifest(ctx, cs, img.Target(), platforms.Default())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(manifest.Layers) == 0 {
+		t.Fatalf("failed to get children from %v", img.Target())
+	}
+	for _, l := range manifest.Layers {
+		layers = append(layers, l.Digest)
+	}
+	config, err := images.Config(ctx, cs, img.Target(), platforms.Default())
+	if err != nil {
+		t.Fatal(err)
+	}
+	diffIDs, err = images.RootFS(ctx, cs, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(layers) != len(diffIDs) {
+		t.Fatalf("number of layers and diffIDs don't match: %d != %d", len(layers), len(diffIDs))
+	} else if len(layers) == 0 {
+		t.Fatalf("there is no layers in the target image(parent: %v)", img.Target())
+	}
+	var (
+		sn    = client.SnapshotService("")
+		chain []digest.Digest
+	)
+	for i, dgst := range layers {
+		chain = append(chain, diffIDs[i])
+		chainID := identity.ChainID(chain).String()
+		if _, err := sn.Stat(ctx, chainID); err != nil {
+			t.Errorf("snapshot %v must exist: %v", chainID, err)
+		}
+		if _, err := cs.Info(ctx, dgst); err == nil || !errdefs.IsNotFound(err) {
+			t.Errorf("content %v must be garbage collected: %v", dgst, err)
+		}
+	}
+}
+
 func TestImagePullAllPlatforms(t *testing.T) {
 	client, err := newClient(t, address)
 	if err != nil {
diff --git a/unpacker.go b/unpacker.go
index 11f7b8ddb..523f7a3e2 100644
--- a/unpacker.go
+++ b/unpacker.go
@@ -22,6 +22,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math/rand"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -77,6 +78,7 @@ func (u *unpacker) unpack(
 	rCtx *RemoteContext,
 	h images.Handler,
 	config ocispec.Descriptor,
+	parentDesc ocispec.Descriptor,
 	layers []ocispec.Descriptor,
 ) error {
 	p, err := content.ReadBlob(ctx, u.c.ContentStore(), config)
@@ -245,6 +247,31 @@ EachLayer:
 		"chainID": chainID,
 	}).Debug("image unpacked")
 
+	if rCtx.DiscardContent {
+		// delete references to successfully unpacked layers
+		layersMap := map[string]struct{}{}
+		for _, desc := range layers {
+			layersMap[desc.Digest.String()] = struct{}{}
+		}
+		pinfo, err := cs.Info(ctx, parentDesc.Digest)
+		if err != nil {
+			return err
+		}
+		fields := []string{}
+		for k, v := range pinfo.Labels {
+			if strings.HasPrefix(k, "containerd.io/gc.ref.content.") {
+				if _, ok := layersMap[v]; ok {
+					// We've already unpacked this layer content
+					pinfo.Labels[k] = ""
+					fields = append(fields, "labels."+k)
+				}
+			}
+		}
+		if _, err := cs.Update(ctx, pinfo, fields...); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -287,6 +314,7 @@ func (u *unpacker) handlerWrapper(
 	var (
 		lock   sync.Mutex
 		layers = map[digest.Digest][]ocispec.Descriptor{}
+		parent = map[digest.Digest]ocispec.Descriptor{}
 	)
 	return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
 		children, err := f.Handle(ctx, desc)
@@ -312,6 +340,7 @@ func (u *unpacker) handlerWrapper(
 			lock.Lock()
 			for _, nl := range nonLayers {
 				layers[nl.Digest] = manifestLayers
+				parent[nl.Digest] = desc
 			}
 			lock.Unlock()
 
@@ -319,11 +348,12 @@ func (u *unpacker) handlerWrapper(
 		case images.MediaTypeDockerSchema2Config, ocispec.MediaTypeImageConfig:
 			lock.Lock()
 			l := layers[desc.Digest]
+			p := parent[desc.Digest]
 			lock.Unlock()
 			if len(l) > 0 {
 				atomic.AddInt32(unpacks, 1)
 				eg.Go(func() error {
-					return u.unpack(uctx, rCtx, f, desc, l)
+					return u.unpack(uctx, rCtx, f, desc, p, l)
 				})
 			}
 		}
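
For reference, a minimal client-side sketch of how the new option could be used, mirroring the test above. This is not part of the patch; the socket path, namespace, image reference, and lease lifetime are illustrative assumptions. WithDiscardContent is combined with WithPullUnpack, since layer blobs are only safe to discard once they have been applied to the snapshotter.

package main

import (
	"context"
	"log"
	"time"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/leases"
	"github.com/containerd/containerd/namespaces"
	"github.com/containerd/containerd/platforms"
)

func main() {
	client, err := containerd.New("/run/containerd/containerd.sock")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := namespaces.WithNamespace(context.Background(), "default")

	// Hold a lease while pulling so that intermediate content is not
	// collected before unpacking finishes.
	ls := client.LeasesService()
	l, err := ls.Create(ctx, leases.WithRandomID(), leases.WithExpiration(1*time.Hour))
	if err != nil {
		log.Fatal(err)
	}
	ctx = leases.WithLease(ctx, l.ID)

	// Pull and unpack the image, marking the unpacked layer blobs as
	// discardable. The image reference is only an example.
	img, err := client.Pull(ctx, "docker.io/library/alpine:latest",
		containerd.WithPlatformMatcher(platforms.Default()),
		containerd.WithPullUnpack,
		containerd.WithDiscardContent,
	)
	if err != nil {
		log.Fatal(err)
	}

	// Releasing the lease synchronously triggers GC, which can now remove
	// the unpacked layer blobs from the content store.
	if err := ls.Delete(ctx, l, leases.SynchronousDelete); err != nil {
		log.Fatal(err)
	}
	log.Printf("pulled %s; layer content is now eligible for GC", img.Name())
}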
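
Design note: the unpacker change relies on containerd's label-based garbage collection. A manifest blob normally pins its layer blobs through "containerd.io/gc.ref.content.*" labels whose values are the referenced digests; after a layer has been successfully applied to the snapshotter, clearing the corresponding label on the manifest (via a content store Update with the matching "labels." field path, as in the unpacker.go hunk above) drops that reference. Once nothing else refers to the layer blob, the next GC pass removes it, for example when the pull lease is released synchronously as in the test.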