Merge pull request #3135 from dmcgowan/archive-importer-docker-types

Compress import blobs in Docker compatibility code
This commit is contained in:
Phil Estes 2019-07-17 09:25:40 -04:00 committed by GitHub
commit 129942ca4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 109 additions and 16 deletions

View File

@ -22,12 +22,14 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"path"
"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
digest "github.com/opencontainers/go-digest"
@ -137,17 +139,21 @@ func ImportIndex(ctx context.Context, store content.Store, reader io.Reader) (oc
if !ok {
return ocispec.Descriptor{}, errors.Errorf("image config %q not found", mfst.Config)
}
config.MediaType = ocispec.MediaTypeImageConfig
config.MediaType = images.MediaTypeDockerSchema2Config
layers, err := resolveLayers(ctx, store, mfst.Layers, blobs)
if err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to resolve layers")
}
manifest := ocispec.Manifest{
Versioned: specs.Versioned{
manifest := struct {
SchemaVersion int `json:"schemaVersion"`
MediaType string `json:"mediaType"`
Config ocispec.Descriptor `json:"config"`
Layers []ocispec.Descriptor `json:"layers"`
}{
SchemaVersion: 2,
},
MediaType: images.MediaTypeDockerSchema2Manifest,
Config: config,
Layers: layers,
}
@ -212,35 +218,111 @@ func onUntarBlob(ctx context.Context, r io.Reader, store content.Ingester, size
}
func resolveLayers(ctx context.Context, store content.Store, layerFiles []string, blobs map[string]ocispec.Descriptor) ([]ocispec.Descriptor, error) {
var layers []ocispec.Descriptor
for _, f := range layerFiles {
layers := make([]ocispec.Descriptor, len(layerFiles))
descs := map[digest.Digest]*ocispec.Descriptor{}
filters := []string{}
for i, f := range layerFiles {
desc, ok := blobs[f]
if !ok {
return nil, errors.Errorf("layer %q not found", f)
}
layers[i] = desc
descs[desc.Digest] = &layers[i]
filters = append(filters, "labels.\"containerd.io/uncompressed\"=="+desc.Digest.String())
}
err := store.Walk(ctx, func(info content.Info) error {
dgst, ok := info.Labels["containerd.io/uncompressed"]
if ok {
desc := descs[digest.Digest(dgst)]
if desc != nil {
desc.MediaType = images.MediaTypeDockerSchema2LayerGzip
desc.Digest = info.Digest
desc.Size = info.Size
}
}
return nil
}, filters...)
if err != nil {
return nil, errors.Wrap(err, "failure checking for compressed blobs")
}
for i, desc := range layers {
if desc.MediaType != "" {
continue
}
// Open blob, resolve media type
ra, err := store.ReaderAt(ctx, desc)
if err != nil {
return nil, errors.Wrapf(err, "failed to open %q (%s)", f, desc.Digest)
return nil, errors.Wrapf(err, "failed to open %q (%s)", layerFiles[i], desc.Digest)
}
s, err := compression.DecompressStream(content.NewReader(ra))
if err != nil {
return nil, errors.Wrapf(err, "failed to detect compression for %q", f)
return nil, errors.Wrapf(err, "failed to detect compression for %q", layerFiles[i])
}
if s.GetCompression() == compression.Uncompressed {
// TODO: Support compressing and writing back to content store
desc.MediaType = ocispec.MediaTypeImageLayer
} else {
desc.MediaType = ocispec.MediaTypeImageLayerGzip
ref := fmt.Sprintf("compress-blob-%s-%s", desc.Digest.Algorithm().String(), desc.Digest.Encoded())
labels := map[string]string{
"containerd.io/uncompressed": desc.Digest.String(),
}
layers[i], err = compressBlob(ctx, store, s, ref, content.WithLabels(labels))
if err != nil {
s.Close()
return nil, err
}
}
layers[i].MediaType = images.MediaTypeDockerSchema2LayerGzip
s.Close()
layers = append(layers, desc)
}
return layers, nil
}
func compressBlob(ctx context.Context, cs content.Store, r io.Reader, ref string, opts ...content.Opt) (desc ocispec.Descriptor, err error) {
w, err := content.OpenWriter(ctx, cs, content.WithRef(ref))
if err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to open writer")
}
defer func() {
w.Close()
if err != nil {
cs.Abort(ctx, ref)
}
}()
if err := w.Truncate(0); err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to truncate writer")
}
cw, err := compression.CompressStream(w, compression.Gzip)
if err != nil {
return ocispec.Descriptor{}, err
}
if _, err := io.Copy(cw, r); err != nil {
return ocispec.Descriptor{}, err
}
if err := cw.Close(); err != nil {
return ocispec.Descriptor{}, err
}
cst, err := w.Status()
if err != nil {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to get writer status")
}
desc.Digest = w.Digest()
desc.Size = cst.Offset
if err := w.Commit(ctx, desc.Size, desc.Digest, opts...); err != nil {
if !errdefs.IsAlreadyExists(err) {
return ocispec.Descriptor{}, errors.Wrap(err, "failed to commit")
}
}
return desc, nil
}
func writeManifest(ctx context.Context, cs content.Ingester, manifest interface{}, mediaType string) (ocispec.Descriptor, error) {
manifestBytes, err := json.Marshal(manifest)
if err != nil {

View File

@ -28,6 +28,7 @@ import (
"runtime"
"testing"
"github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/archive/tartest"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/images/archive"
@ -289,6 +290,16 @@ func createContent(size int64, seed int64) ([]byte, digest.Digest) {
if err != nil {
panic(err)
}
wb := bytes.NewBuffer(nil)
cw, err := compression.CompressStream(wb, compression.Gzip)
if err != nil {
panic(err)
}
if _, err := cw.Write(b); err != nil {
panic(err)
}
b = wb.Bytes()
return b, digest.FromBytes(b)
}