diff --git a/cmd/ctr/content.go b/cmd/ctr/content.go index 1a61abedb..6be04e942 100644 --- a/cmd/ctr/content.go +++ b/cmd/ctr/content.go @@ -54,13 +54,13 @@ var ( return err } - rc, err := cs.Reader(ctx, dgst) + ra, err := cs.ReaderAt(ctx, dgst) if err != nil { return err } - defer rc.Close() + defer ra.Close() - _, err = io.Copy(os.Stdout, rc) + _, err = io.Copy(os.Stdout, content.NewReader(ra)) return err }, } @@ -306,24 +306,24 @@ var ( return err } - content, err := getContentStore(context) + cs, err := getContentStore(context) if err != nil { return err } - rc, err := content.Reader(ctx, dgst) + ra, err := cs.ReaderAt(ctx, dgst) if err != nil { return err } - defer rc.Close() + defer ra.Close() - nrc, err := edit(rc) + nrc, err := edit(content.NewReader(ra)) if err != nil { return err } defer nrc.Close() - wr, err := content.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? + wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key? if err != nil { return err } diff --git a/cmd/ctr/pushobject.go b/cmd/ctr/pushobject.go index 989c6c93c..00baf1894 100644 --- a/cmd/ctr/pushobject.go +++ b/cmd/ctr/pushobject.go @@ -58,11 +58,11 @@ var pushObjectCommand = cli.Command{ Size: info.Size, } - rc, err := cs.Reader(ctx, dgst) + ra, err := cs.ReaderAt(ctx, dgst) if err != nil { return err } - defer rc.Close() + defer ra.Close() cw, err := pusher.Push(ctx, desc) if err != nil { @@ -70,7 +70,7 @@ var pushObjectCommand = cli.Command{ } // TODO: Progress reader - if err := content.Copy(cw, rc, desc.Size, desc.Digest); err != nil { + if err := content.Copy(cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil { return err } diff --git a/container_opts_unix.go b/container_opts_unix.go index d83baf7f5..be69500bd 100644 --- a/container_opts_unix.go +++ b/container_opts_unix.go @@ -6,7 +6,6 @@ import ( "context" "encoding/json" "fmt" - "io/ioutil" "github.com/containerd/containerd/api/types" "github.com/containerd/containerd/containers" @@ -54,12 +53,7 @@ func WithCheckpoint(desc v1.Descriptor, rootfsID string) NewContainerOpts { } c.Image = index.Annotations["image.name"] case images.MediaTypeContainerd1CheckpointConfig: - r, err := store.Reader(ctx, m.Digest) - if err != nil { - return err - } - data, err := ioutil.ReadAll(r) - r.Close() + data, err := content.ReadBlob(ctx, store, m.Digest) if err != nil { return err } @@ -111,11 +105,13 @@ func WithTaskCheckpoint(desc v1.Descriptor) NewTaskOpts { func decodeIndex(ctx context.Context, store content.Store, id digest.Digest) (*v1.Index, error) { var index v1.Index - r, err := store.Reader(ctx, id) + p, err := content.ReadBlob(ctx, store, id) if err != nil { return nil, err } - err = json.NewDecoder(r).Decode(&index) - r.Close() - return &index, err + if err := json.Unmarshal(p, &index); err != nil { + return nil, err + } + + return &index, nil } diff --git a/content/content.go b/content/content.go index 12ed2fe7f..6be2087e5 100644 --- a/content/content.go +++ b/content/content.go @@ -9,9 +9,14 @@ import ( "github.com/opencontainers/go-digest" ) +type ReaderAt interface { + io.ReaderAt + io.Closer + Size() int64 +} + type Provider interface { - Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) - ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) + ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error) } type Ingester interface { diff --git a/content/helpers.go b/content/helpers.go index 7b295be74..cb3ca7507 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "io" - "io/ioutil" "sync" "github.com/containerd/containerd/errdefs" @@ -20,17 +19,25 @@ var ( } ) +func NewReader(ra ReaderAt) io.Reader { + rd := io.NewSectionReader(ra, 0, ra.Size()) + return rd +} + // ReadBlob retrieves the entire contents of the blob from the provider. // // Avoid using this for large blobs, such as layers. func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) { - rc, err := provider.Reader(ctx, dgst) + ra, err := provider.ReaderAt(ctx, dgst) if err != nil { return nil, err } - defer rc.Close() + defer ra.Close() - return ioutil.ReadAll(rc) + p := make([]byte, ra.Size()) + + _, err = ra.ReadAt(p, 0) + return p, err } // WriteBlob writes data with the expected digest into the content store. If diff --git a/content/local/readerat.go b/content/local/readerat.go index 47521bc1a..ae1af5d8d 100644 --- a/content/local/readerat.go +++ b/content/local/readerat.go @@ -1,26 +1,24 @@ package local import ( - "io" "os" ) // readerat implements io.ReaderAt in a completely stateless manner by opening // the referenced file for each call to ReadAt. -type readerAt struct { - f string +type sizeReaderAt struct { + size int64 + fp *os.File } -func (ra readerAt) ReadAt(p []byte, offset int64) (int, error) { - fp, err := os.Open(ra.f) - if err != nil { - return 0, err - } - defer fp.Close() - - if _, err := fp.Seek(offset, io.SeekStart); err != nil { - return 0, err - } - - return fp.Read(p) +func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) { + return ra.fp.ReadAt(p, offset) +} + +func (ra sizeReaderAt) Size() int64 { + return ra.size +} + +func (ra sizeReaderAt) Close() error { + return ra.fp.Close() } diff --git a/content/local/store.go b/content/local/store.go index fb9293034..f977e5ba9 100644 --- a/content/local/store.go +++ b/content/local/store.go @@ -69,22 +69,28 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo) content.Info { } } -// Reader returns an io.ReadCloser for the blob. -func (s *store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { - fp, err := os.Open(s.blobPath(dgst)) +// ReaderAt returns an io.ReaderAt for the blob. +func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { + p := s.blobPath(dgst) + fi, err := os.Stat(p) if err != nil { - if os.IsNotExist(err) { - err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst) + if !os.IsNotExist(err) { + return nil, err } - return nil, err + + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) } - return fp, nil -} + fp, err := os.Open(p) + if err != nil { + if !os.IsNotExist(err) { + return nil, err + } -// ReaderAt returns an io.ReaderAt for the blob. -func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { - return readerAt{f: s.blobPath(dgst)}, nil + return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p) + } + + return sizeReaderAt{size: fi.Size(), fp: fp}, nil } // Delete removes a blob by its digest. diff --git a/differ/differ.go b/differ/differ.go index 76fedcced..9cd525399 100644 --- a/differ/differ.go +++ b/differ/differ.go @@ -67,14 +67,14 @@ func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [] } defer mount.Unmount(dir, 0) - r, err := s.store.Reader(ctx, desc.Digest) + r, err := s.store.ReaderAt(ctx, desc.Digest) if err != nil { return emptyDesc, errors.Wrap(err, "failed to get reader from content store") } defer r.Close() // TODO: only decompress stream if media type is compressed - ds, err := compression.DecompressStream(r) + ds, err := compression.DecompressStream(content.NewReader(r)) if err != nil { return emptyDesc, err } diff --git a/export.go b/export.go index 22e56f0f5..826fc9c2b 100644 --- a/export.go +++ b/export.go @@ -43,15 +43,17 @@ func (c *Client) exportToOCITar(ctx context.Context, desc ocispec.Descriptor, wr func exportHandler(cs content.Store, img oci.ImageDriver) images.HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - r, err := cs.Reader(ctx, desc.Digest) + r, err := cs.ReaderAt(ctx, desc.Digest) if err != nil { return nil, err } + defer r.Close() + w, err := oci.NewBlobWriter(img, desc.Digest.Algorithm()) if err != nil { return nil, err } - if _, err = io.Copy(w, r); err != nil { + if _, err = io.Copy(w, content.NewReader(r)); err != nil { return nil, err } if err = w.Commit(desc.Size, desc.Digest); err != nil { diff --git a/images/image.go b/images/image.go index 6e3bd44ed..4df6e9dd2 100644 --- a/images/image.go +++ b/images/image.go @@ -3,7 +3,6 @@ package images import ( "context" "encoding/json" - "io/ioutil" "time" "github.com/containerd/containerd/content" @@ -73,13 +72,7 @@ func Config(ctx context.Context, provider content.Provider, image ocispec.Descri return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { switch image.MediaType { case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: - rc, err := provider.Reader(ctx, image.Digest) - if err != nil { - return nil, err - } - defer rc.Close() - - p, err := ioutil.ReadAll(rc) + p, err := content.ReadBlob(ctx, provider, image.Digest) if err != nil { return nil, err } diff --git a/metadata/content.go b/metadata/content.go index 811a91912..614d50496 100644 --- a/metadata/content.go +++ b/metadata/content.go @@ -3,7 +3,6 @@ package metadata import ( "context" "encoding/binary" - "io" "strings" "time" @@ -421,14 +420,7 @@ func (nw *namespacedWriter) Status() (content.Status, error) { return st, err } -func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { - if err := cs.checkAccess(ctx, dgst); err != nil { - return nil, err - } - return cs.Store.Reader(ctx, dgst) -} - -func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { +func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { if err := cs.checkAccess(ctx, dgst); err != nil { return nil, err } diff --git a/remotes/docker/schema1/converter.go b/remotes/docker/schema1/converter.go index 7d24d156f..1003ea774 100644 --- a/remotes/docker/schema1/converter.go +++ b/remotes/docker/schema1/converter.go @@ -210,13 +210,13 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro // TODO: Check if blob -> diff id mapping already exists // TODO: Check if blob empty label exists - r, err := c.contentStore.Reader(ctx, desc.Digest) + ra, err := c.contentStore.ReaderAt(ctx, desc.Digest) if err != nil { return err } - defer r.Close() + defer ra.Close() - gr, err := gzip.NewReader(r) + gr, err := gzip.NewReader(content.NewReader(ra)) if err != nil { return err } diff --git a/remotes/handlers.go b/remotes/handlers.go index 9c5f9ca83..d21274b32 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -3,6 +3,7 @@ package remotes import ( "context" "fmt" + "io" "time" "github.com/containerd/containerd/content" @@ -131,11 +132,12 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc } defer cw.Close() - rc, err := provider.Reader(ctx, desc.Digest) + ra, err := provider.ReaderAt(ctx, desc.Digest) if err != nil { return err } - defer rc.Close() + defer ra.Close() - return content.Copy(cw, rc, desc.Size, desc.Digest) + rd := io.NewSectionReader(ra, 0, desc.Size) + return content.Copy(cw, rd, desc.Size, desc.Digest) } diff --git a/services/content/reader.go b/services/content/reader.go index 3a5f6fde3..a8cc55430 100644 --- a/services/content/reader.go +++ b/services/content/reader.go @@ -7,54 +7,17 @@ import ( digest "github.com/opencontainers/go-digest" ) -type remoteReader struct { - client contentapi.Content_ReadClient - extra []byte -} - -func (rr *remoteReader) Read(p []byte) (n int, err error) { - n += copy(p, rr.extra) - if n >= len(p) { - if n <= len(rr.extra) { - rr.extra = rr.extra[n:] - } else { - rr.extra = rr.extra[:0] - } - return - } - rr.extra = rr.extra[:0] - - p = p[n:] - for len(p) > 0 { - var resp *contentapi.ReadContentResponse - // fill our buffer up until we can fill p. - resp, err = rr.client.Recv() - if err != nil { - return - } - - copied := copy(p, resp.Data) - n += copied - p = p[copied:] - - if len(p) == 0 { - rr.extra = append(rr.extra, resp.Data[copied:]...) - } - } - - return -} - -func (rr *remoteReader) Close() error { - return rr.client.CloseSend() -} - type remoteReaderAt struct { ctx context.Context digest digest.Digest + size int64 client contentapi.ContentClient } +func (ra *remoteReaderAt) Size() int64 { + return ra.size +} + func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) { rr := &contentapi.ReadContentRequest{ Digest: ra.digest, @@ -80,3 +43,7 @@ func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) { } return n, nil } + +func (rr *remoteReaderAt) Close() error { + return nil +} diff --git a/services/content/service.go b/services/content/service.go index 3b972e481..f75f89071 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -168,20 +168,11 @@ func (s *Service) Read(req *api.ReadContentRequest, session api.Content_ReadServ return errdefs.ToGRPC(err) } - rc, err := s.store.Reader(session.Context(), req.Digest) + ra, err := s.store.ReaderAt(session.Context(), req.Digest) if err != nil { return errdefs.ToGRPC(err) } - defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance. - - ra, ok := rc.(io.ReaderAt) - if !ok { - // TODO(stevvooe): Need to set this up to get correct behavior across - // board. May change interface to store to just return ReaderAtCloser. - // Possibly, we could just return io.ReaderAt and handle file - // descriptors internally. - return errors.New("content service only supports content stores that return ReaderAt") - } + defer ra.Close() var ( offset = req.Offset diff --git a/services/content/store.go b/services/content/store.go index b65d50797..11fcc0365 100644 --- a/services/content/store.go +++ b/services/content/store.go @@ -70,21 +70,16 @@ func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error { return nil } -func (rs *remoteStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { - client, err := rs.client.Read(ctx, &contentapi.ReadContentRequest{Digest: dgst}) +func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) { + i, err := rs.Info(ctx, dgst) if err != nil { return nil, err } - return &remoteReader{ - client: client, - }, nil -} - -func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) { return &remoteReaderAt{ ctx: ctx, digest: dgst, + size: i.Size, client: rs.client, }, nil } diff --git a/services/tasks/service.go b/services/tasks/service.go index 0d0129398..bb16b2e69 100644 --- a/services/tasks/service.go +++ b/services/tasks/service.go @@ -100,11 +100,11 @@ func (s *Service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.Cr if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) } - reader, err := s.store.Reader(ctx, r.Checkpoint.Digest) + reader, err := s.store.ReaderAt(ctx, r.Checkpoint.Digest) if err != nil { return nil, err } - _, err = archive.Apply(ctx, checkpointPath, reader) + _, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader)) reader.Close() if err != nil { return nil, err diff --git a/spec_opts_unix.go b/spec_opts_unix.go index 7ee366718..ac5d288d0 100644 --- a/spec_opts_unix.go +++ b/spec_opts_unix.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/typeurl" "github.com/opencontainers/image-spec/identity" @@ -73,15 +74,14 @@ func WithImageConfig(ctx context.Context, i Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - r, err := store.Reader(ctx, ic.Digest) + p, err := content.ReadBlob(ctx, store, ic.Digest) if err != nil { return err } - if err := json.NewDecoder(r).Decode(&ociimage); err != nil { - r.Close() + + if err := json.Unmarshal(p, &ociimage); err != nil { return err } - r.Close() config = ociimage.Config default: return fmt.Errorf("unknown image config media type %s", ic.MediaType) diff --git a/spec_opts_windows.go b/spec_opts_windows.go index 861bdb1d8..f4dc8b25b 100644 --- a/spec_opts_windows.go +++ b/spec_opts_windows.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/containerd/containerd/containers" + "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/typeurl" "github.com/opencontainers/image-spec/specs-go/v1" @@ -30,15 +31,13 @@ func WithImageConfig(ctx context.Context, i Image) SpecOpts { ) switch ic.MediaType { case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config: - r, err := store.Reader(ctx, ic.Digest) + p, err := content.ReadBlob(ctx, store, ic.Digest) if err != nil { return err } - if err := json.NewDecoder(r).Decode(&ociimage); err != nil { - r.Close() + if err := json.Unmarshal(p, &ociimage); err != nil { return err } - r.Close() config = ociimage.Config default: return fmt.Errorf("unknown image config media type %s", ic.MediaType)