Merge pull request #2263 from alibaba/fix_no_gzip
fixbug: blob for schemav1 could be uncompressed
This commit is contained in:
commit
c55b9636f7
@ -43,9 +43,17 @@ var (
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DecompressReadCloser include the stream after decompress and the compress method detected.
|
||||||
|
type DecompressReadCloser interface {
|
||||||
|
io.ReadCloser
|
||||||
|
// GetCompression returns the compress method which is used before decompressing
|
||||||
|
GetCompression() Compression
|
||||||
|
}
|
||||||
|
|
||||||
type readCloserWrapper struct {
|
type readCloserWrapper struct {
|
||||||
io.Reader
|
io.Reader
|
||||||
closer func() error
|
compression Compression
|
||||||
|
closer func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *readCloserWrapper) Close() error {
|
func (r *readCloserWrapper) Close() error {
|
||||||
@ -55,6 +63,10 @@ func (r *readCloserWrapper) Close() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *readCloserWrapper) GetCompression() Compression {
|
||||||
|
return r.compression
|
||||||
|
}
|
||||||
|
|
||||||
type writeCloserWrapper struct {
|
type writeCloserWrapper struct {
|
||||||
io.Writer
|
io.Writer
|
||||||
closer func() error
|
closer func() error
|
||||||
@ -84,7 +96,7 @@ func DetectCompression(source []byte) Compression {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
|
// DecompressStream decompresses the archive and returns a ReaderCloser with the decompressed archive.
|
||||||
func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
|
func DecompressStream(archive io.Reader) (DecompressReadCloser, error) {
|
||||||
buf := bufioReader32KPool.Get().(*bufio.Reader)
|
buf := bufioReader32KPool.Get().(*bufio.Reader)
|
||||||
buf.Reset(archive)
|
buf.Reset(archive)
|
||||||
bs, err := buf.Peek(10)
|
bs, err := buf.Peek(10)
|
||||||
@ -105,14 +117,14 @@ func DecompressStream(archive io.Reader) (io.ReadCloser, error) {
|
|||||||
}
|
}
|
||||||
switch compression := DetectCompression(bs); compression {
|
switch compression := DetectCompression(bs); compression {
|
||||||
case Uncompressed:
|
case Uncompressed:
|
||||||
readBufWrapper := &readCloserWrapper{buf, closer}
|
readBufWrapper := &readCloserWrapper{buf, compression, closer}
|
||||||
return readBufWrapper, nil
|
return readBufWrapper, nil
|
||||||
case Gzip:
|
case Gzip:
|
||||||
gzReader, err := gzip.NewReader(buf)
|
gzReader, err := gzip.NewReader(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
readBufWrapper := &readCloserWrapper{gzReader, closer}
|
readBufWrapper := &readCloserWrapper{gzReader, compression, closer}
|
||||||
return readBufWrapper, nil
|
return readBufWrapper, nil
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
|
return nil, fmt.Errorf("unsupported compression format %s", (&compression).Extension())
|
||||||
|
@ -18,7 +18,6 @@ package schema1
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
@ -31,6 +30,7 @@ import (
|
|||||||
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
|
||||||
|
"github.com/containerd/containerd/archive/compression"
|
||||||
"github.com/containerd/containerd/content"
|
"github.com/containerd/containerd/content"
|
||||||
"github.com/containerd/containerd/errdefs"
|
"github.com/containerd/containerd/errdefs"
|
||||||
"github.com/containerd/containerd/images"
|
"github.com/containerd/containerd/images"
|
||||||
@ -255,8 +255,9 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
|||||||
log.G(ctx).Debug("fetch blob")
|
log.G(ctx).Debug("fetch blob")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ref = remotes.MakeRefKey(ctx, desc)
|
ref = remotes.MakeRefKey(ctx, desc)
|
||||||
calc = newBlobStateCalculator()
|
calc = newBlobStateCalculator()
|
||||||
|
compressMethod = compression.Gzip
|
||||||
)
|
)
|
||||||
|
|
||||||
// size may be unknown, set to zero for content ingest
|
// size may be unknown, set to zero for content ingest
|
||||||
@ -280,13 +281,14 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
|||||||
}
|
}
|
||||||
defer ra.Close()
|
defer ra.Close()
|
||||||
|
|
||||||
gr, err := gzip.NewReader(content.NewReader(ra))
|
r, err := compression.DecompressStream(content.NewReader(ra))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer gr.Close()
|
|
||||||
|
|
||||||
_, err = io.Copy(calc, gr)
|
compressMethod = r.GetCompression()
|
||||||
|
_, err = io.Copy(calc, r)
|
||||||
|
r.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -303,13 +305,14 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
|||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
|
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
gr, err := gzip.NewReader(pr)
|
r, err := compression.DecompressStream(pr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer gr.Close()
|
|
||||||
|
|
||||||
_, err = io.Copy(calc, gr)
|
compressMethod = r.GetCompression()
|
||||||
|
_, err = io.Copy(calc, r)
|
||||||
|
r.Close()
|
||||||
pr.CloseWithError(err)
|
pr.CloseWithError(err)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -333,6 +336,11 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
|||||||
desc.Size = info.Size
|
desc.Size = info.Size
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if compressMethod == compression.Uncompressed {
|
||||||
|
log.G(ctx).WithField("id", desc.Digest).Debugf("changed media type for uncompressed schema1 layer blob")
|
||||||
|
desc.MediaType = images.MediaTypeDockerSchema2Layer
|
||||||
|
}
|
||||||
|
|
||||||
state := calc.State()
|
state := calc.State()
|
||||||
|
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
@ -342,6 +350,7 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Converter) schema1ManifestHistory() ([]ocispec.History, []digest.Digest, error) {
|
func (c *Converter) schema1ManifestHistory() ([]ocispec.History, []digest.Digest, error) {
|
||||||
if c.pulledManifest == nil {
|
if c.pulledManifest == nil {
|
||||||
return nil, nil, errors.New("missing schema 1 manifest for conversion")
|
return nil, nil, errors.New("missing schema 1 manifest for conversion")
|
||||||
|
Loading…
Reference in New Issue
Block a user