diff --git a/images/archive/importer.go b/images/archive/importer.go index da83275c3..d42c6dd58 100644 --- a/images/archive/importer.go +++ b/images/archive/importer.go @@ -22,12 +22,14 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "io/ioutil" "path" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" digest "github.com/opencontainers/go-digest" @@ -137,19 +139,23 @@ func ImportIndex(ctx context.Context, store content.Store, reader io.Reader) (oc if !ok { return ocispec.Descriptor{}, errors.Errorf("image config %q not found", mfst.Config) } - config.MediaType = ocispec.MediaTypeImageConfig + config.MediaType = images.MediaTypeDockerSchema2Config layers, err := resolveLayers(ctx, store, mfst.Layers, blobs) if err != nil { return ocispec.Descriptor{}, errors.Wrap(err, "failed to resolve layers") } - manifest := ocispec.Manifest{ - Versioned: specs.Versioned{ - SchemaVersion: 2, - }, - Config: config, - Layers: layers, + manifest := struct { + SchemaVersion int `json:"schemaVersion"` + MediaType string `json:"mediaType"` + Config ocispec.Descriptor `json:"config"` + Layers []ocispec.Descriptor `json:"layers"` + }{ + SchemaVersion: 2, + MediaType: images.MediaTypeDockerSchema2Manifest, + Config: config, + Layers: layers, } desc, err := writeManifest(ctx, store, manifest, ocispec.MediaTypeImageManifest) @@ -214,35 +220,111 @@ func onUntarBlob(ctx context.Context, r io.Reader, store content.Ingester, size } func resolveLayers(ctx context.Context, store content.Store, layerFiles []string, blobs map[string]ocispec.Descriptor) ([]ocispec.Descriptor, error) { - var layers []ocispec.Descriptor - for _, f := range layerFiles { + layers := make([]ocispec.Descriptor, len(layerFiles)) + descs := map[digest.Digest]*ocispec.Descriptor{} + filters := []string{} + for i, f := range layerFiles { desc, ok := blobs[f] if !ok { return nil, errors.Errorf("layer %q not found", f) } + layers[i] = desc + descs[desc.Digest] = &layers[i] + filters = append(filters, "labels.\"containerd.io/uncompressed\"=="+desc.Digest.String()) + } + err := store.Walk(ctx, func(info content.Info) error { + dgst, ok := info.Labels["containerd.io/uncompressed"] + if ok { + desc := descs[digest.Digest(dgst)] + if desc != nil { + desc.MediaType = images.MediaTypeDockerSchema2LayerGzip + desc.Digest = info.Digest + desc.Size = info.Size + } + } + return nil + }, filters...) + if err != nil { + return nil, errors.Wrap(err, "failure checking for compressed blobs") + } + + for i, desc := range layers { + if desc.MediaType != "" { + continue + } // Open blob, resolve media type ra, err := store.ReaderAt(ctx, desc) if err != nil { - return nil, errors.Wrapf(err, "failed to open %q (%s)", f, desc.Digest) + return nil, errors.Wrapf(err, "failed to open %q (%s)", layerFiles[i], desc.Digest) } s, err := compression.DecompressStream(content.NewReader(ra)) if err != nil { - return nil, errors.Wrapf(err, "failed to detect compression for %q", f) + return nil, errors.Wrapf(err, "failed to detect compression for %q", layerFiles[i]) } if s.GetCompression() == compression.Uncompressed { - // TODO: Support compressing and writing back to content store - desc.MediaType = ocispec.MediaTypeImageLayer - } else { - desc.MediaType = ocispec.MediaTypeImageLayerGzip + ref := fmt.Sprintf("compress-blob-%s-%s", desc.Digest.Algorithm().String(), desc.Digest.Encoded()) + labels := map[string]string{ + "containerd.io/uncompressed": desc.Digest.String(), + } + layers[i], err = compressBlob(ctx, store, s, ref, content.WithLabels(labels)) + if err != nil { + s.Close() + return nil, err + } } + layers[i].MediaType = images.MediaTypeDockerSchema2LayerGzip s.Close() - layers = append(layers, desc) } return layers, nil } +func compressBlob(ctx context.Context, cs content.Store, r io.Reader, ref string, opts ...content.Opt) (desc ocispec.Descriptor, err error) { + w, err := content.OpenWriter(ctx, cs, content.WithRef(ref)) + if err != nil { + return ocispec.Descriptor{}, errors.Wrap(err, "failed to open writer") + } + + defer func() { + w.Close() + if err != nil { + cs.Abort(ctx, ref) + } + }() + if err := w.Truncate(0); err != nil { + return ocispec.Descriptor{}, errors.Wrap(err, "failed to truncate writer") + } + + cw, err := compression.CompressStream(w, compression.Gzip) + if err != nil { + return ocispec.Descriptor{}, err + } + + if _, err := io.Copy(cw, r); err != nil { + return ocispec.Descriptor{}, err + } + if err := cw.Close(); err != nil { + return ocispec.Descriptor{}, err + } + + cst, err := w.Status() + if err != nil { + return ocispec.Descriptor{}, errors.Wrap(err, "failed to get writer status") + } + + desc.Digest = w.Digest() + desc.Size = cst.Offset + + if err := w.Commit(ctx, desc.Size, desc.Digest, opts...); err != nil { + if !errdefs.IsAlreadyExists(err) { + return ocispec.Descriptor{}, errors.Wrap(err, "failed to commit") + } + } + + return desc, nil +} + func writeManifest(ctx context.Context, cs content.Ingester, manifest interface{}, mediaType string) (ocispec.Descriptor, error) { manifestBytes, err := json.Marshal(manifest) if err != nil { diff --git a/import_test.go b/import_test.go index c6cea6d54..7322f2f51 100644 --- a/import_test.go +++ b/import_test.go @@ -17,6 +17,7 @@ package containerd import ( + "bytes" "context" "encoding/json" "io" @@ -26,6 +27,7 @@ import ( "runtime" "testing" + "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/archive/tartest" "github.com/containerd/containerd/images" "github.com/containerd/containerd/images/archive" @@ -274,6 +276,16 @@ func createContent(size int64, seed int64) ([]byte, digest.Digest) { if err != nil { panic(err) } + wb := bytes.NewBuffer(nil) + cw, err := compression.CompressStream(wb, compression.Gzip) + if err != nil { + panic(err) + } + + if _, err := cw.Write(b); err != nil { + panic(err) + } + b = wb.Bytes() return b, digest.FromBytes(b) }