content: remove Provider.Reader
After some analysis, it was found that Content.Reader was generally redudant to an io.ReaderAt. This change removes `Content.Reader` in favor of a `Content.ReaderAt`. In general, `ReaderAt` can perform better over interfaces with indeterminant latency because it avoids remote state for reads. Where a reader is required, a helper is provided to convert it into an `io.SectionReader`. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
parent
1f04eddad1
commit
8be340e37b
@ -54,13 +54,13 @@ var (
|
||||
return err
|
||||
}
|
||||
|
||||
rc, err := cs.Reader(ctx, dgst)
|
||||
ra, err := cs.ReaderAt(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
defer ra.Close()
|
||||
|
||||
_, err = io.Copy(os.Stdout, rc)
|
||||
_, err = io.Copy(os.Stdout, content.NewReader(ra))
|
||||
return err
|
||||
},
|
||||
}
|
||||
@ -306,24 +306,24 @@ var (
|
||||
return err
|
||||
}
|
||||
|
||||
content, err := getContentStore(context)
|
||||
cs, err := getContentStore(context)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rc, err := content.Reader(ctx, dgst)
|
||||
ra, err := cs.ReaderAt(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
defer ra.Close()
|
||||
|
||||
nrc, err := edit(rc)
|
||||
nrc, err := edit(content.NewReader(ra))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer nrc.Close()
|
||||
|
||||
wr, err := content.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
|
||||
wr, err := cs.Writer(ctx, "edit-"+object, 0, "") // TODO(stevvooe): Choose a better key?
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -58,11 +58,11 @@ var pushObjectCommand = cli.Command{
|
||||
Size: info.Size,
|
||||
}
|
||||
|
||||
rc, err := cs.Reader(ctx, dgst)
|
||||
ra, err := cs.ReaderAt(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
defer ra.Close()
|
||||
|
||||
cw, err := pusher.Push(ctx, desc)
|
||||
if err != nil {
|
||||
@ -70,7 +70,7 @@ var pushObjectCommand = cli.Command{
|
||||
}
|
||||
|
||||
// TODO: Progress reader
|
||||
if err := content.Copy(cw, rc, desc.Size, desc.Digest); err != nil {
|
||||
if err := content.Copy(cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -6,7 +6,6 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
|
||||
"github.com/containerd/containerd/api/types"
|
||||
"github.com/containerd/containerd/containers"
|
||||
@ -54,12 +53,7 @@ func WithCheckpoint(desc v1.Descriptor, rootfsID string) NewContainerOpts {
|
||||
}
|
||||
c.Image = index.Annotations["image.name"]
|
||||
case images.MediaTypeContainerd1CheckpointConfig:
|
||||
r, err := store.Reader(ctx, m.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
data, err := ioutil.ReadAll(r)
|
||||
r.Close()
|
||||
data, err := content.ReadBlob(ctx, store, m.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -111,11 +105,13 @@ func WithTaskCheckpoint(desc v1.Descriptor) NewTaskOpts {
|
||||
|
||||
func decodeIndex(ctx context.Context, store content.Store, id digest.Digest) (*v1.Index, error) {
|
||||
var index v1.Index
|
||||
r, err := store.Reader(ctx, id)
|
||||
p, err := content.ReadBlob(ctx, store, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = json.NewDecoder(r).Decode(&index)
|
||||
r.Close()
|
||||
return &index, err
|
||||
if err := json.Unmarshal(p, &index); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &index, nil
|
||||
}
|
||||
|
@ -9,9 +9,14 @@ import (
|
||||
"github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type ReaderAt interface {
|
||||
io.ReaderAt
|
||||
io.Closer
|
||||
Size() int64
|
||||
}
|
||||
|
||||
type Provider interface {
|
||||
Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error)
|
||||
ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error)
|
||||
ReaderAt(ctx context.Context, dgst digest.Digest) (ReaderAt, error)
|
||||
}
|
||||
|
||||
type Ingester interface {
|
||||
|
@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"sync"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
@ -20,17 +19,25 @@ var (
|
||||
}
|
||||
)
|
||||
|
||||
func NewReader(ra ReaderAt) io.Reader {
|
||||
rd := io.NewSectionReader(ra, 0, ra.Size())
|
||||
return rd
|
||||
}
|
||||
|
||||
// ReadBlob retrieves the entire contents of the blob from the provider.
|
||||
//
|
||||
// Avoid using this for large blobs, such as layers.
|
||||
func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
|
||||
rc, err := provider.Reader(ctx, dgst)
|
||||
ra, err := provider.ReaderAt(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
defer ra.Close()
|
||||
|
||||
return ioutil.ReadAll(rc)
|
||||
p := make([]byte, ra.Size())
|
||||
|
||||
_, err = ra.ReadAt(p, 0)
|
||||
return p, err
|
||||
}
|
||||
|
||||
// WriteBlob writes data with the expected digest into the content store. If
|
||||
|
@ -1,26 +1,24 @@
|
||||
package local
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
)
|
||||
|
||||
// readerat implements io.ReaderAt in a completely stateless manner by opening
|
||||
// the referenced file for each call to ReadAt.
|
||||
type readerAt struct {
|
||||
f string
|
||||
type sizeReaderAt struct {
|
||||
size int64
|
||||
fp *os.File
|
||||
}
|
||||
|
||||
func (ra readerAt) ReadAt(p []byte, offset int64) (int, error) {
|
||||
fp, err := os.Open(ra.f)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
defer fp.Close()
|
||||
|
||||
if _, err := fp.Seek(offset, io.SeekStart); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return fp.Read(p)
|
||||
func (ra sizeReaderAt) ReadAt(p []byte, offset int64) (int, error) {
|
||||
return ra.fp.ReadAt(p, offset)
|
||||
}
|
||||
|
||||
func (ra sizeReaderAt) Size() int64 {
|
||||
return ra.size
|
||||
}
|
||||
|
||||
func (ra sizeReaderAt) Close() error {
|
||||
return ra.fp.Close()
|
||||
}
|
||||
|
@ -69,22 +69,28 @@ func (s *store) info(dgst digest.Digest, fi os.FileInfo) content.Info {
|
||||
}
|
||||
}
|
||||
|
||||
// Reader returns an io.ReadCloser for the blob.
|
||||
func (s *store) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||
fp, err := os.Open(s.blobPath(dgst))
|
||||
// ReaderAt returns an io.ReaderAt for the blob.
|
||||
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
|
||||
p := s.blobPath(dgst)
|
||||
fi, err := os.Stat(p)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
err = errors.Wrapf(errdefs.ErrNotFound, "content %v", dgst)
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
return nil, err
|
||||
|
||||
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
|
||||
}
|
||||
|
||||
return fp, nil
|
||||
}
|
||||
fp, err := os.Open(p)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ReaderAt returns an io.ReaderAt for the blob.
|
||||
func (s *store) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) {
|
||||
return readerAt{f: s.blobPath(dgst)}, nil
|
||||
return nil, errors.Wrapf(errdefs.ErrNotFound, "blob %s expected at %s", dgst, p)
|
||||
}
|
||||
|
||||
return sizeReaderAt{size: fi.Size(), fp: fp}, nil
|
||||
}
|
||||
|
||||
// Delete removes a blob by its digest.
|
||||
|
@ -67,14 +67,14 @@ func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []
|
||||
}
|
||||
defer mount.Unmount(dir, 0)
|
||||
|
||||
r, err := s.store.Reader(ctx, desc.Digest)
|
||||
r, err := s.store.ReaderAt(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// TODO: only decompress stream if media type is compressed
|
||||
ds, err := compression.DecompressStream(r)
|
||||
ds, err := compression.DecompressStream(content.NewReader(r))
|
||||
if err != nil {
|
||||
return emptyDesc, err
|
||||
}
|
||||
|
@ -43,15 +43,17 @@ func (c *Client) exportToOCITar(ctx context.Context, desc ocispec.Descriptor, wr
|
||||
|
||||
func exportHandler(cs content.Store, img oci.ImageDriver) images.HandlerFunc {
|
||||
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
r, err := cs.Reader(ctx, desc.Digest)
|
||||
r, err := cs.ReaderAt(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
w, err := oci.NewBlobWriter(img, desc.Digest.Algorithm())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if _, err = io.Copy(w, r); err != nil {
|
||||
if _, err = io.Copy(w, content.NewReader(r)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = w.Commit(desc.Size, desc.Digest); err != nil {
|
||||
|
@ -3,7 +3,6 @@ package images
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
@ -73,13 +72,7 @@ func Config(ctx context.Context, provider content.Provider, image ocispec.Descri
|
||||
return configDesc, Walk(ctx, HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
switch image.MediaType {
|
||||
case MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
rc, err := provider.Reader(ctx, image.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rc.Close()
|
||||
|
||||
p, err := ioutil.ReadAll(rc)
|
||||
p, err := content.ReadBlob(ctx, provider, image.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -3,7 +3,6 @@ package metadata
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@ -421,14 +420,7 @@ func (nw *namespacedWriter) Status() (content.Status, error) {
|
||||
return st, err
|
||||
}
|
||||
|
||||
func (cs *contentStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||
if err := cs.checkAccess(ctx, dgst); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cs.Store.Reader(ctx, dgst)
|
||||
}
|
||||
|
||||
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) {
|
||||
func (cs *contentStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
|
||||
if err := cs.checkAccess(ctx, dgst); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -210,13 +210,13 @@ func (c *Converter) fetchBlob(ctx context.Context, desc ocispec.Descriptor) erro
|
||||
// TODO: Check if blob -> diff id mapping already exists
|
||||
// TODO: Check if blob empty label exists
|
||||
|
||||
r, err := c.contentStore.Reader(ctx, desc.Digest)
|
||||
ra, err := c.contentStore.ReaderAt(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer r.Close()
|
||||
defer ra.Close()
|
||||
|
||||
gr, err := gzip.NewReader(r)
|
||||
gr, err := gzip.NewReader(content.NewReader(ra))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3,6 +3,7 @@ package remotes
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
@ -131,11 +132,12 @@ func push(ctx context.Context, provider content.Provider, pusher Pusher, desc oc
|
||||
}
|
||||
defer cw.Close()
|
||||
|
||||
rc, err := provider.Reader(ctx, desc.Digest)
|
||||
ra, err := provider.ReaderAt(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer rc.Close()
|
||||
defer ra.Close()
|
||||
|
||||
return content.Copy(cw, rc, desc.Size, desc.Digest)
|
||||
rd := io.NewSectionReader(ra, 0, desc.Size)
|
||||
return content.Copy(cw, rd, desc.Size, desc.Digest)
|
||||
}
|
||||
|
@ -7,54 +7,17 @@ import (
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type remoteReader struct {
|
||||
client contentapi.Content_ReadClient
|
||||
extra []byte
|
||||
}
|
||||
|
||||
func (rr *remoteReader) Read(p []byte) (n int, err error) {
|
||||
n += copy(p, rr.extra)
|
||||
if n >= len(p) {
|
||||
if n <= len(rr.extra) {
|
||||
rr.extra = rr.extra[n:]
|
||||
} else {
|
||||
rr.extra = rr.extra[:0]
|
||||
}
|
||||
return
|
||||
}
|
||||
rr.extra = rr.extra[:0]
|
||||
|
||||
p = p[n:]
|
||||
for len(p) > 0 {
|
||||
var resp *contentapi.ReadContentResponse
|
||||
// fill our buffer up until we can fill p.
|
||||
resp, err = rr.client.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
copied := copy(p, resp.Data)
|
||||
n += copied
|
||||
p = p[copied:]
|
||||
|
||||
if len(p) == 0 {
|
||||
rr.extra = append(rr.extra, resp.Data[copied:]...)
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (rr *remoteReader) Close() error {
|
||||
return rr.client.CloseSend()
|
||||
}
|
||||
|
||||
type remoteReaderAt struct {
|
||||
ctx context.Context
|
||||
digest digest.Digest
|
||||
size int64
|
||||
client contentapi.ContentClient
|
||||
}
|
||||
|
||||
func (ra *remoteReaderAt) Size() int64 {
|
||||
return ra.size
|
||||
}
|
||||
|
||||
func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
rr := &contentapi.ReadContentRequest{
|
||||
Digest: ra.digest,
|
||||
@ -80,3 +43,7 @@ func (ra *remoteReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (rr *remoteReaderAt) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
@ -168,20 +168,11 @@ func (s *Service) Read(req *api.ReadContentRequest, session api.Content_ReadServ
|
||||
return errdefs.ToGRPC(err)
|
||||
}
|
||||
|
||||
rc, err := s.store.Reader(session.Context(), req.Digest)
|
||||
ra, err := s.store.ReaderAt(session.Context(), req.Digest)
|
||||
if err != nil {
|
||||
return errdefs.ToGRPC(err)
|
||||
}
|
||||
defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
|
||||
|
||||
ra, ok := rc.(io.ReaderAt)
|
||||
if !ok {
|
||||
// TODO(stevvooe): Need to set this up to get correct behavior across
|
||||
// board. May change interface to store to just return ReaderAtCloser.
|
||||
// Possibly, we could just return io.ReaderAt and handle file
|
||||
// descriptors internally.
|
||||
return errors.New("content service only supports content stores that return ReaderAt")
|
||||
}
|
||||
defer ra.Close()
|
||||
|
||||
var (
|
||||
offset = req.Offset
|
||||
|
@ -70,21 +70,16 @@ func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||
client, err := rs.client.Read(ctx, &contentapi.ReadContentRequest{Digest: dgst})
|
||||
func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (content.ReaderAt, error) {
|
||||
i, err := rs.Info(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &remoteReader{
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) ReaderAt(ctx context.Context, dgst digest.Digest) (io.ReaderAt, error) {
|
||||
return &remoteReaderAt{
|
||||
ctx: ctx,
|
||||
digest: dgst,
|
||||
size: i.Size,
|
||||
client: rs.client,
|
||||
}, nil
|
||||
}
|
||||
|
@ -100,11 +100,11 @@ func (s *Service) Create(ctx context.Context, r *api.CreateTaskRequest) (*api.Cr
|
||||
if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint {
|
||||
return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType)
|
||||
}
|
||||
reader, err := s.store.Reader(ctx, r.Checkpoint.Digest)
|
||||
reader, err := s.store.ReaderAt(ctx, r.Checkpoint.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = archive.Apply(ctx, checkpointPath, reader)
|
||||
_, err = archive.Apply(ctx, checkpointPath, content.NewReader(reader))
|
||||
reader.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/typeurl"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
@ -73,15 +74,14 @@ func WithImageConfig(ctx context.Context, i Image) SpecOpts {
|
||||
)
|
||||
switch ic.MediaType {
|
||||
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
|
||||
r, err := store.Reader(ctx, ic.Digest)
|
||||
p, err := content.ReadBlob(ctx, store, ic.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := json.NewDecoder(r).Decode(&ociimage); err != nil {
|
||||
r.Close()
|
||||
|
||||
if err := json.Unmarshal(p, &ociimage); err != nil {
|
||||
return err
|
||||
}
|
||||
r.Close()
|
||||
config = ociimage.Config
|
||||
default:
|
||||
return fmt.Errorf("unknown image config media type %s", ic.MediaType)
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/containerd/containerd/containers"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/typeurl"
|
||||
"github.com/opencontainers/image-spec/specs-go/v1"
|
||||
@ -30,15 +31,13 @@ func WithImageConfig(ctx context.Context, i Image) SpecOpts {
|
||||
)
|
||||
switch ic.MediaType {
|
||||
case v1.MediaTypeImageConfig, images.MediaTypeDockerSchema2Config:
|
||||
r, err := store.Reader(ctx, ic.Digest)
|
||||
p, err := content.ReadBlob(ctx, store, ic.Digest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := json.NewDecoder(r).Decode(&ociimage); err != nil {
|
||||
r.Close()
|
||||
if err := json.Unmarshal(p, &ociimage); err != nil {
|
||||
return err
|
||||
}
|
||||
r.Close()
|
||||
config = ociimage.Config
|
||||
default:
|
||||
return fmt.Errorf("unknown image config media type %s", ic.MediaType)
|
||||
|
Loading…
Reference in New Issue
Block a user