
Now that we have most of the services required for use with containerd, it was found that common patterns were used throughout services. By defining a central `errdefs` package, we ensure that services will map errors to and from grpc consistently and cleanly. One can decorate an error with as much context as necessary, using `pkg/errors` and still have the error mapped correctly via grpc. We make a few sacrifices. At this point, the common errors we use across the repository all map directly to grpc error codes. While this seems positively crazy, it actually works out quite well. The error conditions that were specific weren't super necessary and the ones that were necessary now simply have better context information. We lose the ability to add new codes, but this constraint may not be a bad thing. Effectively, as long as one uses the errors defined in `errdefs`, the error class will be mapped correctly across the grpc boundary and everything will be good. If you don't use those definitions, the error maps to "unknown" and the error message is preserved. Signed-off-by: Stephen J Day <stephen.day@docker.com>
451 lines
11 KiB
Go
451 lines
11 KiB
Go
package schema1
|
|
|
|
import (
|
|
"bytes"
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/remotes"
|
|
digest "github.com/opencontainers/go-digest"
|
|
specs "github.com/opencontainers/image-spec/specs-go"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
var (
|
|
mediaTypeManifest = "application/vnd.docker.distribution.manifest.v1+json"
|
|
)
|
|
|
|
type blobState struct {
|
|
diffID digest.Digest
|
|
empty bool
|
|
}
|
|
|
|
// Converter converts schema1 manifests to schema2 on fetch
|
|
type Converter struct {
|
|
contentStore content.Store
|
|
fetcher remotes.Fetcher
|
|
|
|
pulledManifest *manifest
|
|
|
|
mu sync.Mutex
|
|
blobMap map[digest.Digest]blobState
|
|
layerBlobs map[digest.Digest]ocispec.Descriptor
|
|
}
|
|
|
|
func NewConverter(contentStore content.Store, fetcher remotes.Fetcher) *Converter {
|
|
return &Converter{
|
|
contentStore: contentStore,
|
|
fetcher: fetcher,
|
|
blobMap: map[digest.Digest]blobState{},
|
|
layerBlobs: map[digest.Digest]ocispec.Descriptor{},
|
|
}
|
|
}
|
|
|
|
func (c *Converter) Handle(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
switch desc.MediaType {
|
|
case images.MediaTypeDockerSchema1Manifest:
|
|
if err := c.fetchManifest(ctx, desc); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
m := c.pulledManifest
|
|
if len(m.FSLayers) != len(m.History) {
|
|
return nil, errors.New("invalid schema 1 manifest, history and layer mismatch")
|
|
}
|
|
descs := make([]ocispec.Descriptor, 0, len(c.pulledManifest.FSLayers))
|
|
|
|
for i := range m.FSLayers {
|
|
if _, ok := c.blobMap[c.pulledManifest.FSLayers[i].BlobSum]; !ok {
|
|
empty, err := isEmptyLayer([]byte(m.History[i].V1Compatibility))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Do no attempt to download a known empty blob
|
|
if !empty {
|
|
descs = append([]ocispec.Descriptor{
|
|
{
|
|
MediaType: images.MediaTypeDockerSchema2LayerGzip,
|
|
Digest: c.pulledManifest.FSLayers[i].BlobSum,
|
|
},
|
|
}, descs...)
|
|
}
|
|
c.blobMap[c.pulledManifest.FSLayers[i].BlobSum] = blobState{
|
|
empty: empty,
|
|
}
|
|
}
|
|
}
|
|
return descs, nil
|
|
case images.MediaTypeDockerSchema2LayerGzip:
|
|
if c.pulledManifest == nil {
|
|
return nil, errors.New("manifest required for schema 1 blob pull")
|
|
}
|
|
return nil, c.fetchBlob(ctx, desc)
|
|
default:
|
|
return nil, fmt.Errorf("%v not support for schema 1 manifests", desc.MediaType)
|
|
}
|
|
}
|
|
|
|
func (c *Converter) Convert(ctx context.Context) (ocispec.Descriptor, error) {
|
|
history, diffIDs, err := c.schema1ManifestHistory()
|
|
if err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "schema 1 conversion failed")
|
|
}
|
|
|
|
var img ocispec.Image
|
|
if err := json.Unmarshal([]byte(c.pulledManifest.History[0].V1Compatibility), &img); err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "failed to unmarshal image from schema 1 history")
|
|
}
|
|
|
|
img.History = history
|
|
img.RootFS = ocispec.RootFS{
|
|
Type: "layers",
|
|
DiffIDs: diffIDs,
|
|
}
|
|
|
|
b, err := json.Marshal(img)
|
|
if err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "failed to marshal image")
|
|
}
|
|
|
|
config := ocispec.Descriptor{
|
|
MediaType: ocispec.MediaTypeImageConfig,
|
|
Digest: digest.Canonical.FromBytes(b),
|
|
Size: int64(len(b)),
|
|
}
|
|
|
|
ref := remotes.MakeRefKey(ctx, config)
|
|
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), config.Size, config.Digest); err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
|
|
}
|
|
|
|
layers := make([]ocispec.Descriptor, len(diffIDs))
|
|
for i, diffID := range diffIDs {
|
|
layers[i] = c.layerBlobs[diffID]
|
|
}
|
|
|
|
manifest := ocispec.Manifest{
|
|
Versioned: specs.Versioned{
|
|
SchemaVersion: 2,
|
|
},
|
|
Config: config,
|
|
Layers: layers,
|
|
}
|
|
|
|
b, err = json.Marshal(manifest)
|
|
if err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "failed to marshal image")
|
|
}
|
|
|
|
desc := ocispec.Descriptor{
|
|
MediaType: ocispec.MediaTypeImageManifest,
|
|
Digest: digest.Canonical.FromBytes(b),
|
|
Size: int64(len(b)),
|
|
}
|
|
|
|
ref = remotes.MakeRefKey(ctx, desc)
|
|
if err := content.WriteBlob(ctx, c.contentStore, ref, bytes.NewReader(b), desc.Size, desc.Digest); err != nil {
|
|
return ocispec.Descriptor{}, errors.Wrap(err, "failed to write config")
|
|
}
|
|
|
|
return desc, nil
|
|
}
|
|
|
|
func (c *Converter) fetchManifest(ctx context.Context, desc ocispec.Descriptor) error {
|
|
log.G(ctx).Debug("fetch schema 1")
|
|
|
|
rc, err := c.fetcher.Fetch(ctx, desc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err := ioutil.ReadAll(rc)
|
|
rc.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
b, err = stripSignature(b)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var m manifest
|
|
if err := json.Unmarshal(b, &m); err != nil {
|
|
return err
|
|
}
|
|
c.pulledManifest = &m
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) error {
|
|
log.G(ctx).Debug("fetch blob")
|
|
|
|
ref := remotes.MakeRefKey(ctx, desc)
|
|
|
|
calc := newBlobStateCalculator()
|
|
|
|
cw, err := c.contentStore.Writer(ctx, ref, desc.Size, desc.Digest)
|
|
if err != nil {
|
|
if !errdefs.IsAlreadyExists(err) {
|
|
return err
|
|
}
|
|
|
|
// TODO: Check if blob -> diff id mapping already exists
|
|
// TODO: Check if blob empty label exists
|
|
|
|
r, err := c.contentStore.Reader(ctx, desc.Digest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Close()
|
|
|
|
gr, err := gzip.NewReader(r)
|
|
defer gr.Close()
|
|
|
|
_, err = io.Copy(calc, gr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
defer cw.Close()
|
|
|
|
rc, err := c.fetcher.Fetch(ctx, desc)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer rc.Close()
|
|
|
|
eg, _ := errgroup.WithContext(ctx)
|
|
pr, pw := io.Pipe()
|
|
|
|
eg.Go(func() error {
|
|
gr, err := gzip.NewReader(pr)
|
|
defer gr.Close()
|
|
|
|
_, err = io.Copy(calc, gr)
|
|
pr.CloseWithError(err)
|
|
return err
|
|
})
|
|
|
|
eg.Go(func() error {
|
|
defer pw.Close()
|
|
return content.Copy(cw, io.TeeReader(rc, pw), desc.Size, desc.Digest)
|
|
})
|
|
|
|
if err := eg.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// TODO: Label blob
|
|
}
|
|
|
|
if desc.Size == 0 {
|
|
info, err := c.contentStore.Info(ctx, desc.Digest)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to get blob info")
|
|
}
|
|
desc.Size = info.Size
|
|
}
|
|
|
|
state := calc.State()
|
|
|
|
c.mu.Lock()
|
|
c.blobMap[desc.Digest] = state
|
|
c.layerBlobs[state.diffID] = desc
|
|
c.mu.Unlock()
|
|
|
|
return nil
|
|
}
|
|
func (c *Converter) schema1ManifestHistory() ([]ocispec.History, []digest.Digest, error) {
|
|
if c.pulledManifest == nil {
|
|
return nil, nil, errors.New("missing schema 1 manifest for conversion")
|
|
}
|
|
m := *c.pulledManifest
|
|
|
|
if len(m.History) == 0 {
|
|
return nil, nil, errors.New("no history")
|
|
}
|
|
|
|
history := make([]ocispec.History, len(m.History))
|
|
diffIDs := []digest.Digest{}
|
|
for i := range m.History {
|
|
var h v1History
|
|
if err := json.Unmarshal([]byte(m.History[i].V1Compatibility), &h); err != nil {
|
|
return nil, nil, errors.Wrap(err, "failed to unmarshal history")
|
|
}
|
|
|
|
blobSum := m.FSLayers[i].BlobSum
|
|
|
|
state := c.blobMap[blobSum]
|
|
|
|
history[len(history)-i-1] = ocispec.History{
|
|
Author: h.Author,
|
|
Comment: h.Comment,
|
|
Created: &h.Created,
|
|
CreatedBy: strings.Join(h.ContainerConfig.Cmd, " "),
|
|
EmptyLayer: state.empty,
|
|
}
|
|
|
|
if !state.empty {
|
|
diffIDs = append([]digest.Digest{state.diffID}, diffIDs...)
|
|
|
|
}
|
|
}
|
|
|
|
return history, diffIDs, nil
|
|
}
|
|
|
|
type fsLayer struct {
|
|
BlobSum digest.Digest `json:"blobSum"`
|
|
}
|
|
|
|
type history struct {
|
|
V1Compatibility string `json:"v1Compatibility"`
|
|
}
|
|
|
|
type manifest struct {
|
|
FSLayers []fsLayer `json:"fsLayers"`
|
|
History []history `json:"history"`
|
|
}
|
|
|
|
type v1History struct {
|
|
Author string `json:"author,omitempty"`
|
|
Created time.Time `json:"created"`
|
|
Comment string `json:"comment,omitempty"`
|
|
ThrowAway *bool `json:"throwaway,omitempty"`
|
|
Size *int `json:"Size,omitempty"` // used before ThrowAway field
|
|
ContainerConfig struct {
|
|
Cmd []string `json:"Cmd,omitempty"`
|
|
} `json:"container_config,omitempty"`
|
|
}
|
|
|
|
// isEmptyLayer returns whether the v1 compability history describes an
|
|
// empty layer. A return value of true indicates the layer is empty,
|
|
// however false does not indicate non-empty.
|
|
func isEmptyLayer(compatHistory []byte) (bool, error) {
|
|
var h v1History
|
|
if err := json.Unmarshal(compatHistory, &h); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if h.ThrowAway != nil {
|
|
return *h.ThrowAway, nil
|
|
}
|
|
if h.Size != nil {
|
|
return *h.Size == 0, nil
|
|
}
|
|
|
|
// If no `Size` or `throwaway` field is given, then
|
|
// it cannot be determined whether the layer is empty
|
|
// from the history, return false
|
|
return false, nil
|
|
}
|
|
|
|
type signature struct {
|
|
Signatures []jsParsedSignature `json:"signatures"`
|
|
}
|
|
|
|
type jsParsedSignature struct {
|
|
Protected string `json:"protected"`
|
|
}
|
|
|
|
type protectedBlock struct {
|
|
Length int `json:"formatLength"`
|
|
Tail string `json:"formatTail"`
|
|
}
|
|
|
|
// joseBase64UrlDecode decodes the given string using the standard base64 url
|
|
// decoder but first adds the appropriate number of trailing '=' characters in
|
|
// accordance with the jose specification.
|
|
// http://tools.ietf.org/html/draft-ietf-jose-json-web-signature-31#section-2
|
|
func joseBase64UrlDecode(s string) ([]byte, error) {
|
|
switch len(s) % 4 {
|
|
case 0:
|
|
case 2:
|
|
s += "=="
|
|
case 3:
|
|
s += "="
|
|
default:
|
|
return nil, errors.New("illegal base64url string")
|
|
}
|
|
return base64.URLEncoding.DecodeString(s)
|
|
}
|
|
|
|
func stripSignature(b []byte) ([]byte, error) {
|
|
var sig signature
|
|
if err := json.Unmarshal(b, &sig); err != nil {
|
|
return nil, err
|
|
}
|
|
if len(sig.Signatures) == 0 {
|
|
return nil, errors.New("no signatures")
|
|
}
|
|
pb, err := joseBase64UrlDecode(sig.Signatures[0].Protected)
|
|
if err != nil {
|
|
return nil, errors.Wrapf(err, "could not decode %s", sig.Signatures[0].Protected)
|
|
}
|
|
|
|
var protected protectedBlock
|
|
if err := json.Unmarshal(pb, &protected); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if protected.Length > len(b) {
|
|
return nil, errors.New("invalid protected length block")
|
|
}
|
|
|
|
tail, err := joseBase64UrlDecode(protected.Tail)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "invalid tail base 64 value")
|
|
}
|
|
|
|
return append(b[:protected.Length], tail...), nil
|
|
}
|
|
|
|
type blobStateCalculator struct {
|
|
empty bool
|
|
digester digest.Digester
|
|
}
|
|
|
|
func newBlobStateCalculator() *blobStateCalculator {
|
|
return &blobStateCalculator{
|
|
empty: true,
|
|
digester: digest.Canonical.Digester(),
|
|
}
|
|
}
|
|
|
|
func (c *blobStateCalculator) Write(p []byte) (int, error) {
|
|
if c.empty {
|
|
for _, b := range p {
|
|
if b != 0x00 {
|
|
c.empty = false
|
|
break
|
|
}
|
|
}
|
|
}
|
|
return c.digester.Hash().Write(p)
|
|
}
|
|
|
|
func (c *blobStateCalculator) State() blobState {
|
|
return blobState{
|
|
empty: c.empty,
|
|
diffID: c.digester.Digest(),
|
|
}
|
|
}
|