From 046536cfb1c8a813420f3903c95f57987b636ae6 Mon Sep 17 00:00:00 2001 From: frank yang Date: Tue, 3 Apr 2018 18:11:03 +0800 Subject: [PATCH] fixbug: blob for schemav1 could be uncompressed Signed-off-by: frank yang --- archive/compression/compression.go | 20 ++++++++++++++++---- remotes/docker/schema1/converter.go | 27 ++++++++++++++++++--------- 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/archive/compression/compression.go b/archive/compression/compression.go index de85438b9..bd50f083b 100644 --- a/archive/compression/compression.go +++ b/archive/compression/compression.go @@ -43,9 +43,17 @@ var ( } ) +// DecompressReadCloser include the stream after decompress and the compress method detected. +type DecompressReadCloser interface { + io.ReadCloser + // GetCompression returns the compress method which is used before decompressing + GetCompression() Compression +} + type readCloserWrapper struct { io.Reader - closer func() error + compression Compression + closer func() error } func (r *readCloserWrapper) Close() error { @@ -55,6 +63,10 @@ func (r *readCloserWrapper) Close() error { return nil } +func (r *readCloserWrapper) GetCompression() Compression { + return r.compression +} + type writeCloserWrapper struct { io.Writer closer func() error @@ -84,7 +96,7 @@ func DetectCompression(source []byte) Compression { } // DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive. -func DecompressStream(archive io.Reader) (io.ReadCloser, error) { +func DecompressStream(archive io.Reader) (DecompressReadCloser, error) { buf := bufioReader32KPool.Get().(*bufio.Reader) buf.Reset(archive) bs, err := buf.Peek(10) @@ -105,14 +117,14 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) { } switch compression := DetectCompression(bs); compression { case Uncompressed: - readBufWrapper := &readCloserWrapper{buf, closer} + readBufWrapper := &readCloserWrapper{buf, compression, closer} return readBufWrapper, nil case Gzip: gzReader, err := gzip.NewReader(buf) if err != nil { return nil, err } - readBufWrapper := &readCloserWrapper{gzReader, closer} + readBufWrapper := &readCloserWrapper{gzReader, compression, closer} return readBufWrapper, nil default: return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension()) diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index cf1137fc3..3155d6ec3 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -18,7 +18,6 @@ package schema1 import ( "bytes" - "compress/gzip" "context" "encoding/base64" "encoding/json" @@ -31,6 +30,7 @@ import ( "golang.org/x/sync/errgroup" + "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" @@ -255,8 +255,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro log.G(ctx).Debug("fetch blob") var ( - ref = remotes.MakeRefKey(ctx, desc) - calc = newBlobStateCalculator() + ref = remotes.MakeRefKey(ctx, desc) + calc = newBlobStateCalculator() + compressMethod = compression.Gzip ) // size may be unknown, set to zero for content ingest @@ -280,13 +281,14 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro } defer ra.Close() - gr, err := gzip.NewReader(content.NewReader(ra)) + r, err := compression.DecompressStream(content.NewReader(ra)) if err != nil { return err } - defer gr.Close() - _, err = io.Copy(calc, gr) + compressMethod = r.GetCompression() + _, err = io.Copy(calc, r) + r.Close() if err != nil { return err } @@ -303,13 +305,14 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro pr, pw := io.Pipe() eg.Go(func() error { - gr, err := gzip.NewReader(pr) + r, err := compression.DecompressStream(pr) if err != nil { return err } - defer gr.Close() - _, err = io.Copy(calc, gr) + compressMethod = r.GetCompression() + _, err = io.Copy(calc, r) + r.Close() pr.CloseWithError(err) return err }) @@ -333,6 +336,11 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro desc.Size = info.Size } + if compressMethod == compression.Uncompressed { + log.G(ctx).WithField("id", desc.Digest).Debugf("changed media type for uncompressed schema1 layer blob") + desc.MediaType = images.MediaTypeDockerSchema2Layer + } + state := calc.State() c.mu.Lock() @@ -342,6 +350,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro return nil } + func (c *Converter) schema1ManifestHistory() ([]ocispec.History, []digest.Digest, error) { if c.pulledManifest == nil { return nil, nil, errors.New("missing schema 1 manifest for conversion")