diff --git a/diff/lcow/lcow.go b/diff/lcow/lcow.go index ad68d1e1a..59b812cfc 100644 --- a/diff/lcow/lcow.go +++ b/diff/lcow/lcow.go @@ -27,11 +27,9 @@ import ( "github.com/Microsoft/go-winio/pkg/security" "github.com/Microsoft/hcsshim/ext4/tar2ext4" - "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" @@ -108,34 +106,39 @@ func (s windowsLcowDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mou } }() + var config diff.ApplyConfig + for _, o := range opts { + if err := o(&config); err != nil { + return emptyDesc, errors.Wrap(err, "failed to apply config opt") + } + } + layer, _, err := mountsToLayerAndParents(mounts) if err != nil { return emptyDesc, err } - isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType) - if err != nil { - return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) - } - ra, err := s.store.ReaderAt(ctx, desc) if err != nil { return emptyDesc, errors.Wrap(err, "failed to get reader from content store") } defer ra.Close() - rdr := content.NewReader(ra) - if isCompressed { - ds, err := compression.DecompressStream(rdr) - if err != nil { - return emptyDesc, err + + processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + for { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { + return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) + } + if processor.MediaType() == ocispec.MediaTypeImageLayer { + break } - defer ds.Close() - rdr = ds } + defer processor.Close() + // Calculate the Digest as we go digester := digest.Canonical.Digester() rc := &readCounter{ - r: io.TeeReader(rdr, digester.Hash()), + r: io.TeeReader(processor, digester.Hash()), } layerPath := path.Join(layer, "layer.vhd") diff --git a/diff/stream.go b/diff/stream.go index 8a3571098..b53b8b4cd 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -188,9 +188,7 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string } } -const ( - mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" -) +const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE" // NewBinaryProcessor returns a binary processor for use with processing content streams func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { diff --git a/diff/windows/windows.go b/diff/windows/windows.go index bf2887ad0..4ad3b875e 100644 --- a/diff/windows/windows.go +++ b/diff/windows/windows.go @@ -26,11 +26,9 @@ import ( winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/archive" - "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" @@ -100,9 +98,11 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts } }() - isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType) - if err != nil { - return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) + var config diff.ApplyConfig + for _, o := range opts { + if err := o(&config); err != nil { + return emptyDesc, errors.Wrap(err, "failed to apply config opt") + } } ra, err := s.store.ReaderAt(ctx, desc) @@ -111,19 +111,20 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts } defer ra.Close() - r := content.NewReader(ra) - if isCompressed { - ds, err := compression.DecompressStream(r) - if err != nil { - return emptyDesc, err + processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + for { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { + return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) + } + if processor.MediaType() == ocispec.MediaTypeImageLayer { + break } - defer ds.Close() - r = ds } + defer processor.Close() digester := digest.Canonical.Digester() rc := &readCounter{ - r: io.TeeReader(r, digester.Hash()), + r: io.TeeReader(processor, digester.Hash()), } layer, parentLayerPaths, err := mountsToLayerAndParents(mounts)