diff --git a/content/content.go b/content/content.go index 6be2087e5..58a2ff436 100644 --- a/content/content.go +++ b/content/content.go @@ -5,7 +5,6 @@ import ( "io" "time" - "github.com/containerd/containerd/oci" "github.com/opencontainers/go-digest" ) @@ -83,8 +82,20 @@ type IngestManager interface { } type Writer interface { - oci.BlobWriter + // Close is expected to be called after Commit() when commission is needed. + io.WriteCloser + + // Digest may return empty digest or panics until committed. + Digest() digest.Digest + + // Commit commits the blob (but no roll-back is guaranteed on an error). + // size and expected can be zero-value when unknown. + Commit(size int64, expected digest.Digest) error + + // Status returns the current state of write Status() (Status, error) + + // Truncate updates the size of the target blob Truncate(size int64) error } diff --git a/content/local/writer.go b/content/local/writer.go index 30a7e7172..dfa8e1290 100644 --- a/content/local/writer.go +++ b/content/local/writer.go @@ -8,7 +8,6 @@ import ( "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/oci" "github.com/opencontainers/go-digest" "github.com/pkg/errors" ) @@ -79,7 +78,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { } if size > 0 && size != fi.Size() { - return oci.ErrUnexpectedSize{Expected: size, Actual: fi.Size()} + return errors.Errorf("unexpected commit size %d, expected %d", fi.Size(), size) } if err := w.fp.Close(); err != nil { @@ -88,7 +87,7 @@ func (w *writer) Commit(size int64, expected digest.Digest) error { dgst := w.digester.Digest() if expected != "" && expected != dgst { - return oci.ErrUnexpectedDigest{Expected: expected, Actual: dgst} + return errors.Errorf("unexpected commit digest %s, expected %s", dgst, expected) } var ( diff --git a/export.go b/export.go index 826fc9c2b..8cafa1675 100644 --- a/export.go +++ b/export.go @@ -3,65 +3,186 @@ package containerd import ( "archive/tar" "context" + "encoding/json" "io" + "sort" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" - "github.com/containerd/containerd/oci" ocispecs "github.com/opencontainers/image-spec/specs-go" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" ) func (c *Client) exportToOCITar(ctx context.Context, desc ocispec.Descriptor, writer io.Writer, eopts exportOpts) error { tw := tar.NewWriter(writer) - img := oci.Tar(tw) + defer tw.Close() - // For tar, we defer creating index until end of the function. - if err := oci.Init(img, oci.InitOpts{SkipCreateIndex: true}); err != nil { - return err + records := []tarRecord{ + ociLayoutFile(""), + ociIndexRecord(desc), } + cs := c.ContentStore() + algorithms := map[string]struct{}{} + exportHandler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + records = append(records, blobRecord(cs, desc)) + algorithms[desc.Digest.Algorithm().String()] = struct{}{} + return nil, nil + } + handlers := images.Handlers( images.ChildrenHandler(cs), - exportHandler(cs, img), + images.HandlerFunc(exportHandler), ) - // For tar, we need to use Walk instead of Dispatch for ensuring sequential write + + // Walk sequentially since the number of fetchs is likely one and doing in + // parallel requires locking the export handler if err := images.Walk(ctx, handlers, desc); err != nil { return err } - // For tar, we don't use oci.PutManifestDescriptorToIndex() which allows appending desc to existing index.json - // but requires img to support random read access so as to read index.json. - return oci.WriteIndex(img, - ocispec.Index{ - Versioned: ocispecs.Versioned{ - SchemaVersion: 2, - }, - Manifests: []ocispec.Descriptor{desc}, - }, - ) + + if len(algorithms) > 0 { + records = append(records, directoryRecord("blobs/", 0755)) + for alg := range algorithms { + records = append(records, directoryRecord("blobs/"+alg+"/", 0755)) + } + } + + return writeTar(ctx, tw, records) } -func exportHandler(cs content.Store, img oci.ImageDriver) images.HandlerFunc { - return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - r, err := cs.ReaderAt(ctx, desc.Digest) - if err != nil { - return nil, err - } - defer r.Close() +type tarRecord struct { + Header *tar.Header + CopyTo func(context.Context, io.Writer) (int64, error) +} - w, err := oci.NewBlobWriter(img, desc.Digest.Algorithm()) - if err != nil { - return nil, err - } - if _, err = io.Copy(w, content.NewReader(r)); err != nil { - return nil, err - } - if err = w.Commit(desc.Size, desc.Digest); err != nil { - return nil, err - } - if err = w.Close(); err != nil { - return nil, err - } - return nil, nil +func blobRecord(cs content.Store, desc ocispec.Descriptor) tarRecord { + path := "blobs/" + desc.Digest.Algorithm().String() + "/" + desc.Digest.Hex() + return tarRecord{ + Header: &tar.Header{ + Name: path, + Mode: 0444, + Size: desc.Size, + Typeflag: tar.TypeReg, + }, + CopyTo: func(ctx context.Context, w io.Writer) (int64, error) { + r, err := cs.ReaderAt(ctx, desc.Digest) + if err != nil { + return 0, err + } + defer r.Close() + + // Verify digest + dgstr := desc.Digest.Algorithm().Digester() + + n, err := io.Copy(io.MultiWriter(w, dgstr.Hash()), content.NewReader(r)) + if err != nil { + return 0, err + } + if dgstr.Digest() != desc.Digest { + return 0, errors.Errorf("unexpected digest %s copied", dgstr.Digest()) + } + return n, nil + }, } } + +func directoryRecord(name string, mode int64) tarRecord { + return tarRecord{ + Header: &tar.Header{ + Name: name, + Mode: mode, + Typeflag: tar.TypeDir, + }, + } +} + +func ociLayoutFile(version string) tarRecord { + if version == "" { + version = ocispec.ImageLayoutVersion + } + layout := ocispec.ImageLayout{ + Version: version, + } + + b, err := json.Marshal(layout) + if err != nil { + panic(err) + } + + return tarRecord{ + Header: &tar.Header{ + Name: ocispec.ImageLayoutFile, + Mode: 0444, + Size: int64(len(b)), + Typeflag: tar.TypeReg, + }, + CopyTo: func(ctx context.Context, w io.Writer) (int64, error) { + n, err := w.Write(b) + return int64(n), err + }, + } + +} + +func ociIndexRecord(manifests ...ocispec.Descriptor) tarRecord { + index := ocispec.Index{ + Versioned: ocispecs.Versioned{ + SchemaVersion: 2, + }, + Manifests: manifests, + } + + b, err := json.Marshal(index) + if err != nil { + panic(err) + } + + return tarRecord{ + Header: &tar.Header{ + Name: "index.json", + Mode: 0644, + Size: int64(len(b)), + Typeflag: tar.TypeReg, + }, + CopyTo: func(ctx context.Context, w io.Writer) (int64, error) { + n, err := w.Write(b) + return int64(n), err + }, + } +} + +func writeTar(ctx context.Context, tw *tar.Writer, records []tarRecord) error { + sort.Sort(tarRecordsByName(records)) + + for _, record := range records { + if err := tw.WriteHeader(record.Header); err != nil { + return err + } + if record.CopyTo != nil { + n, err := record.CopyTo(ctx, tw) + if err != nil { + return err + } + if n != record.Header.Size { + return errors.Errorf("unexpected copy size for %s", record.Header.Name) + } + } else if record.Header.Size > 0 { + return errors.Errorf("no content to write to record with non-zero size for %s", record.Header.Name) + } + } + return nil +} + +type tarRecordsByName []tarRecord + +func (t tarRecordsByName) Len() int { + return len(t) +} +func (t tarRecordsByName) Swap(i, j int) { + t[i], t[j] = t[j], t[i] +} +func (t tarRecordsByName) Less(i, j int) bool { + return t[i].Header.Name < t[j].Header.Name +} diff --git a/oci/oci.go b/oci/oci.go deleted file mode 100644 index 6094d3ea9..000000000 --- a/oci/oci.go +++ /dev/null @@ -1,293 +0,0 @@ -// Package oci provides basic operations for manipulating OCI images. -// This package can be used even outside of containerd, and contains some -// functions not used in containerd itself. -package oci - -import ( - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - - "github.com/opencontainers/go-digest" - "github.com/opencontainers/image-spec/specs-go" - spec "github.com/opencontainers/image-spec/specs-go/v1" -) - -// BlobWriter writes an OCI blob and returns a digest when committed. -type BlobWriter interface { - // Close is expected to be called after Commit() when commission is needed. - io.WriteCloser - // Digest may return empty digest or panics until committed. - Digest() digest.Digest - // Commit commits the blob (but no roll-back is guaranteed on an error). - // size and expected can be zero-value when unknown. - Commit(size int64, expected digest.Digest) error -} - -// ErrUnexpectedSize can be returned from BlobWriter.Commit() -type ErrUnexpectedSize struct { - Expected int64 - Actual int64 -} - -func (e ErrUnexpectedSize) Error() string { - if e.Expected > 0 && e.Expected != e.Actual { - return fmt.Sprintf("unexpected size: %d != %d", e.Expected, e.Actual) - } - return fmt.Sprintf("malformed ErrUnexpectedSize(%+v)", e) -} - -// ErrUnexpectedDigest can be returned from BlobWriter.Commit() -type ErrUnexpectedDigest struct { - Expected digest.Digest - Actual digest.Digest -} - -func (e ErrUnexpectedDigest) Error() string { - if e.Expected.String() != "" && e.Expected.String() != e.Actual.String() { - return fmt.Sprintf("unexpected digest: %v != %v", e.Expected, e.Actual) - } - return fmt.Sprintf("malformed ErrUnexpectedDigest(%+v)", e) -} - -// ImageDriver corresponds to the representation of an image. -// Path uses os.PathSeparator as the separator. -// The methods of ImageDriver should only be called from oci package. -type ImageDriver interface { - Init() error - Remove(path string) error - Reader(path string) (io.ReadCloser, error) - Writer(path string, perm os.FileMode) (io.WriteCloser, error) - BlobWriter(algo digest.Algorithm) (BlobWriter, error) -} - -type InitOpts struct { - // imageLayoutVersion can be an empty string for specifying the default version. - ImageLayoutVersion string - // skip creating oci-layout - SkipCreateImageLayout bool - // skip creating index.json - SkipCreateIndex bool -} - -// Init initializes an OCI image structure. -// Init calls img.Init, creates `oci-layout`(0444), and creates `index.json`(0644). -// -func Init(img ImageDriver, opts InitOpts) error { - if err := img.Init(); err != nil { - return err - } - - // Create oci-layout - if !opts.SkipCreateImageLayout { - imageLayoutVersion := opts.ImageLayoutVersion - if imageLayoutVersion == "" { - imageLayoutVersion = spec.ImageLayoutVersion - } - if err := WriteImageLayout(img, spec.ImageLayout{Version: imageLayoutVersion}); err != nil { - return err - } - } - - // Create index.json - if !opts.SkipCreateIndex { - if err := WriteIndex(img, spec.Index{Versioned: specs.Versioned{SchemaVersion: 2}}); err != nil { - return err - } - } - return nil -} - -func blobPath(d digest.Digest) string { - return filepath.Join("blobs", d.Algorithm().String(), d.Hex()) -} - -const ( - indexPath = "index.json" -) - -// GetBlobReader returns io.ReadCloser for a blob. -func GetBlobReader(img ImageDriver, d digest.Digest) (io.ReadCloser, error) { - // we return a reader rather than the full *os.File here so as to prohibit write operations. - return img.Reader(blobPath(d)) -} - -// ReadBlob reads an OCI blob. -func ReadBlob(img ImageDriver, d digest.Digest) ([]byte, error) { - r, err := GetBlobReader(img, d) - if err != nil { - return nil, err - } - defer r.Close() - return ioutil.ReadAll(r) -} - -// WriteBlob writes bytes as an OCI blob and returns its digest using the canonical digest algorithm. -// If you need to specify certain algorithm, you can use NewBlobWriter(img string, algo digest.Algorithm). -func WriteBlob(img ImageDriver, b []byte) (digest.Digest, error) { - w, err := img.BlobWriter(digest.Canonical) - if err != nil { - return "", err - } - n, err := w.Write(b) - if err != nil { - return "", err - } - if n < len(b) { - return "", io.ErrShortWrite - } - if err := w.Close(); err != nil { - return "", err - } - return w.Digest(), err -} - -// NewBlobWriter returns a BlobWriter. -func NewBlobWriter(img ImageDriver, algo digest.Algorithm) (BlobWriter, error) { - return img.BlobWriter(algo) -} - -// DeleteBlob deletes an OCI blob. -func DeleteBlob(img ImageDriver, d digest.Digest) error { - return img.Remove(blobPath(d)) -} - -// ReadImageLayout returns the image layout. -func ReadImageLayout(img ImageDriver) (spec.ImageLayout, error) { - r, err := img.Reader(spec.ImageLayoutFile) - if err != nil { - return spec.ImageLayout{}, err - } - b, err := ioutil.ReadAll(r) - if err != nil { - return spec.ImageLayout{}, err - } - if err := r.Close(); err != nil { - return spec.ImageLayout{}, err - } - var layout spec.ImageLayout - if err := json.Unmarshal(b, &layout); err != nil { - return spec.ImageLayout{}, err - } - return layout, nil -} - -// WriteImageLayout writes the image layout. -func WriteImageLayout(img ImageDriver, layout spec.ImageLayout) error { - b, err := json.Marshal(layout) - if err != nil { - return err - } - w, err := img.Writer(spec.ImageLayoutFile, 0444) - if err != nil { - return err - } - n, err := w.Write(b) - if err != nil { - return err - } - if n < len(b) { - return io.ErrShortWrite - } - return w.Close() -} - -// ReadIndex returns the index. -func ReadIndex(img ImageDriver) (spec.Index, error) { - r, err := img.Reader(indexPath) - if err != nil { - return spec.Index{}, err - } - b, err := ioutil.ReadAll(r) - if err != nil { - return spec.Index{}, err - } - if err := r.Close(); err != nil { - return spec.Index{}, err - } - var idx spec.Index - if err := json.Unmarshal(b, &idx); err != nil { - return spec.Index{}, err - } - return idx, nil -} - -// WriteIndex writes the index. -func WriteIndex(img ImageDriver, idx spec.Index) error { - b, err := json.Marshal(idx) - if err != nil { - return err - } - w, err := img.Writer(indexPath, 0644) - if err != nil { - return err - } - n, err := w.Write(b) - if err != nil { - return err - } - if n < len(b) { - return io.ErrShortWrite - } - return w.Close() -} - -// RemoveManifestDescriptorFromIndex removes the manifest descriptor from the index. -// Returns nil error when the entry not found. -func RemoveManifestDescriptorFromIndex(img ImageDriver, refName string) error { - if refName == "" { - return errors.New("empty refName specified") - } - src, err := ReadIndex(img) - if err != nil { - return err - } - dst := src - dst.Manifests = nil - for _, m := range src.Manifests { - mRefName, ok := m.Annotations[spec.AnnotationRefName] - if ok && mRefName == refName { - continue - } - dst.Manifests = append(dst.Manifests, m) - } - return WriteIndex(img, dst) -} - -// PutManifestDescriptorToIndex puts a manifest descriptor to the index. -// If ref name is set and conflicts with the existing descriptors, the old ones are removed. -func PutManifestDescriptorToIndex(img ImageDriver, desc spec.Descriptor) error { - refName, ok := desc.Annotations[spec.AnnotationRefName] - if ok && refName != "" { - if err := RemoveManifestDescriptorFromIndex(img, refName); err != nil { - return err - } - } - idx, err := ReadIndex(img) - if err != nil { - return err - } - idx.Manifests = append(idx.Manifests, desc) - return WriteIndex(img, idx) -} - -// WriteJSONBlob is an utility function that writes x as a JSON blob with the specified media type, and returns the descriptor. -func WriteJSONBlob(img ImageDriver, x interface{}, mediaType string) (spec.Descriptor, error) { - b, err := json.Marshal(x) - if err != nil { - return spec.Descriptor{}, err - } - d, err := WriteBlob(img, b) - if err != nil { - return spec.Descriptor{}, err - } - return spec.Descriptor{ - MediaType: mediaType, - Digest: d, - Size: int64(len(b)), - }, nil -} diff --git a/oci/tar.go b/oci/tar.go deleted file mode 100644 index 01614fa53..000000000 --- a/oci/tar.go +++ /dev/null @@ -1,156 +0,0 @@ -package oci - -import ( - "archive/tar" - "bytes" - "errors" - "io" - "os" - "path/filepath" - - "github.com/opencontainers/go-digest" -) - -// TarWriter is an interface that is implemented by archive/tar.Writer. -// (Using an interface allows hooking) -type TarWriter interface { - io.WriteCloser - Flush() error - WriteHeader(hdr *tar.Header) error -} - -// Tar is ImageDriver for TAR representation of an OCI image. -func Tar(w TarWriter) ImageDriver { - return &tarDriver{ - w: w, - } -} - -type tarDriver struct { - w TarWriter -} - -func (d *tarDriver) Init() error { - headers := []tar.Header{ - { - Name: "blobs/", - Mode: 0755, - Typeflag: tar.TypeDir, - }, - { - Name: "blobs/" + string(digest.Canonical) + "/", - Mode: 0755, - Typeflag: tar.TypeDir, - }, - } - for _, h := range headers { - if err := d.w.WriteHeader(&h); err != nil { - return err - } - } - return nil -} - -func (d *tarDriver) Remove(path string) error { - return errors.New("Tar does not support Remove") -} - -func (d *tarDriver) Reader(path string) (io.ReadCloser, error) { - // because tar does not support random access - return nil, errors.New("Tar does not support Reader") -} - -func (d *tarDriver) Writer(path string, perm os.FileMode) (io.WriteCloser, error) { - name := filepath.ToSlash(path) - return &tarDriverWriter{ - w: d.w, - name: name, - mode: int64(perm), - }, nil -} - -// tarDriverWriter is used for writing non-blob files -// (e.g. oci-layout, index.json) -type tarDriverWriter struct { - bytes.Buffer - w TarWriter - name string - mode int64 -} - -func (w *tarDriverWriter) Close() error { - if err := w.w.WriteHeader(&tar.Header{ - Name: w.name, - Mode: w.mode, - Size: int64(w.Len()), - Typeflag: tar.TypeReg, - }); err != nil { - return err - } - n, err := io.Copy(w.w, w) - if err != nil { - return err - } - if n < int64(w.Len()) { - return io.ErrShortWrite - } - return w.w.Flush() -} - -func (d *tarDriver) BlobWriter(algo digest.Algorithm) (BlobWriter, error) { - return &tarBlobWriter{ - w: d.w, - digester: algo.Digester(), - }, nil -} - -// tarBlobWriter implements BlobWriter. -type tarBlobWriter struct { - w TarWriter - digester digest.Digester - buf bytes.Buffer // TODO: use tmp file for large buffer? -} - -// Write implements io.Writer. -func (bw *tarBlobWriter) Write(b []byte) (int, error) { - n, err := bw.buf.Write(b) - if err != nil { - return n, err - } - return bw.digester.Hash().Write(b) -} - -func (bw *tarBlobWriter) Commit(size int64, expected digest.Digest) error { - path := "blobs/" + bw.digester.Digest().Algorithm().String() + "/" + bw.digester.Digest().Hex() - if err := bw.w.WriteHeader(&tar.Header{ - Name: path, - Mode: 0444, - Size: int64(bw.buf.Len()), - Typeflag: tar.TypeReg, - }); err != nil { - return err - } - n, err := io.Copy(bw.w, &bw.buf) - if err != nil { - return err - } - if n < int64(bw.buf.Len()) { - return io.ErrShortWrite - } - if size > 0 && size != n { - return ErrUnexpectedSize{Expected: size, Actual: n} - } - if expected != "" && bw.digester.Digest() != expected { - return ErrUnexpectedDigest{Expected: expected, Actual: bw.digester.Digest()} - } - return bw.w.Flush() -} - -func (bw *tarBlobWriter) Close() error { - // we don't close bw.w (reused for writing another blob) - return bw.w.Flush() -} - -func (bw *tarBlobWriter) Digest() digest.Digest { - return bw.digester.Digest() -}