 a12f493bd3
			
		
	
	a12f493bd3
	
	
	
		
			
			Prevents the copy method from calling discard on the writer when the reader is not seekable. Instead, the copy method will discard up to the offset. Truncate is a more expensive operation since any bytes that are truncated already have their hash calculated and are stored on disk in the backend. Re-writing bytes which were truncated requires transfering the data over GRPC again and re-computing the hash up to the point of truncation. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
		
			
				
	
	
		
			140 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			140 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package content
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"io"
 | |
| 	"io/ioutil"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/containerd/containerd/errdefs"
 | |
| 	"github.com/opencontainers/go-digest"
 | |
| 	"github.com/pkg/errors"
 | |
| )
 | |
| 
 | |
| var bufPool = sync.Pool{
 | |
| 	New: func() interface{} {
 | |
| 		buffer := make([]byte, 1<<20)
 | |
| 		return &buffer
 | |
| 	},
 | |
| }
 | |
| 
 | |
| // NewReader returns a io.Reader from a ReaderAt
 | |
| func NewReader(ra ReaderAt) io.Reader {
 | |
| 	rd := io.NewSectionReader(ra, 0, ra.Size())
 | |
| 	return rd
 | |
| }
 | |
| 
 | |
| // ReadBlob retrieves the entire contents of the blob from the provider.
 | |
| //
 | |
| // Avoid using this for large blobs, such as layers.
 | |
| func ReadBlob(ctx context.Context, provider Provider, dgst digest.Digest) ([]byte, error) {
 | |
| 	ra, err := provider.ReaderAt(ctx, dgst)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	defer ra.Close()
 | |
| 
 | |
| 	p := make([]byte, ra.Size())
 | |
| 
 | |
| 	_, err = ra.ReadAt(p, 0)
 | |
| 	return p, err
 | |
| }
 | |
| 
 | |
| // WriteBlob writes data with the expected digest into the content store. If
 | |
| // expected already exists, the method returns immediately and the reader will
 | |
| // not be consumed.
 | |
| //
 | |
| // This is useful when the digest and size are known beforehand.
 | |
| //
 | |
| // Copy is buffered, so no need to wrap reader in buffered io.
 | |
| func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
 | |
| 	cw, err := cs.Writer(ctx, ref, size, expected)
 | |
| 	if err != nil {
 | |
| 		if !errdefs.IsAlreadyExists(err) {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return nil // all ready present
 | |
| 	}
 | |
| 	defer cw.Close()
 | |
| 
 | |
| 	return Copy(ctx, cw, r, size, expected, opts...)
 | |
| }
 | |
| 
 | |
| // Copy copies data with the expected digest from the reader into the
 | |
| // provided content store writer.
 | |
| //
 | |
| // This is useful when the digest and size are known beforehand. When
 | |
| // the size or digest is unknown, these values may be empty.
 | |
| //
 | |
| // Copy is buffered, so no need to wrap reader in buffered io.
 | |
| func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
 | |
| 	ws, err := cw.Status()
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if ws.Offset > 0 {
 | |
| 		r, err = seekReader(r, ws.Offset, size)
 | |
| 		if err != nil {
 | |
| 			return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	buf := bufPool.Get().(*[]byte)
 | |
| 	defer bufPool.Put(buf)
 | |
| 
 | |
| 	if _, err := io.CopyBuffer(cw, r, *buf); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	if err := cw.Commit(ctx, size, expected, opts...); err != nil {
 | |
| 		if !errdefs.IsAlreadyExists(err) {
 | |
| 			return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // seekReader attempts to seek the reader to the given offset, either by
 | |
| // resolving `io.Seeker`, by detecting `io.ReaderAt`, or discarding
 | |
| // up to the given offset.
 | |
| func seekReader(r io.Reader, offset, size int64) (io.Reader, error) {
 | |
| 	// attempt to resolve r as a seeker and setup the offset.
 | |
| 	seeker, ok := r.(io.Seeker)
 | |
| 	if ok {
 | |
| 		nn, err := seeker.Seek(offset, io.SeekStart)
 | |
| 		if nn != offset {
 | |
| 			return nil, errors.Wrapf(err, "failed to seek to offset %v", offset)
 | |
| 		}
 | |
| 
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		return r, nil
 | |
| 	}
 | |
| 
 | |
| 	// ok, let's try io.ReaderAt!
 | |
| 	readerAt, ok := r.(io.ReaderAt)
 | |
| 	if ok && size > offset {
 | |
| 		sr := io.NewSectionReader(readerAt, offset, size)
 | |
| 		return sr, nil
 | |
| 	}
 | |
| 
 | |
| 	// well then, let's just discard up to the offset
 | |
| 	buf := bufPool.Get().(*[]byte)
 | |
| 	defer bufPool.Put(buf)
 | |
| 
 | |
| 	n, err := io.CopyBuffer(ioutil.Discard, io.LimitReader(r, offset), *buf)
 | |
| 	if err != nil {
 | |
| 		return nil, errors.Wrap(err, "failed to discard to offset")
 | |
| 	}
 | |
| 	if n != offset {
 | |
| 		return nil, errors.Errorf("unable to discard to offset")
 | |
| 	}
 | |
| 
 | |
| 	return r, nil
 | |
| }
 |