Merge pull request #7460 from AkihiroSuda/fetch-by-digest
remotes: add `FetcherByDigest` for fetching blobs without foreknown descriptors (useful for general-purpose CAS)
This commit is contained in:
commit
31bb8fef7e
@ -29,6 +29,7 @@ import (
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
units "github.com/docker/go-units"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
@ -47,6 +48,7 @@ var (
|
||||
editCommand,
|
||||
fetchCommand,
|
||||
fetchObjectCommand,
|
||||
fetchBlobCommand,
|
||||
getCommand,
|
||||
ingestCommand,
|
||||
listCommand,
|
||||
@ -442,6 +444,60 @@ var (
|
||||
},
|
||||
}
|
||||
|
||||
fetchBlobCommand = cli.Command{
|
||||
Name: "fetch-blob",
|
||||
Usage: "retrieve blobs from a remote",
|
||||
ArgsUsage: "[flags] <remote> [<digest>, ...]",
|
||||
Description: `Fetch blobs by digests from a remote.`,
|
||||
Flags: commands.RegistryFlags,
|
||||
Action: func(context *cli.Context) error {
|
||||
var (
|
||||
ref = context.Args().First()
|
||||
digests = context.Args().Tail()
|
||||
)
|
||||
if len(digests) == 0 {
|
||||
return errors.New("must specify digests")
|
||||
}
|
||||
ctx, cancel := commands.AppContext(context)
|
||||
defer cancel()
|
||||
|
||||
resolver, err := commands.GetResolver(ctx, context)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
|
||||
|
||||
log.G(ctx).Debugf("resolving")
|
||||
fetcher, err := resolver.Fetcher(ctx, ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
fetcherByDigest, ok := fetcher.(remotes.FetcherByDigest)
|
||||
if !ok {
|
||||
return fmt.Errorf("fetcher %T does not implement remotes.FetcherByDigest", fetcher)
|
||||
}
|
||||
|
||||
for _, f := range digests {
|
||||
dgst, err := digest.Parse(f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rc, _, err := fetcherByDigest.FetchByDigest(ctx, dgst)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = io.Copy(os.Stdout, rc)
|
||||
rc.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
pushObjectCommand = cli.Command{
|
||||
Name: "push-object",
|
||||
Usage: "push an object to a remote",
|
||||
|
@ -29,6 +29,7 @@ import (
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
@ -150,8 +151,106 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
|
||||
})
|
||||
}
|
||||
|
||||
func (r dockerFetcher) createGetReq(ctx context.Context, host RegistryHost, ps ...string) (*request, int64, error) {
|
||||
headReq := r.request(host, http.MethodHead, ps...)
|
||||
if err := headReq.addNamespace(r.refspec.Hostname()); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
headResp, err := headReq.doWithRetries(ctx, nil)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
if headResp.Body != nil {
|
||||
headResp.Body.Close()
|
||||
}
|
||||
if headResp.StatusCode > 299 {
|
||||
return nil, 0, fmt.Errorf("unexpected HEAD status code %v: %s", headReq.String(), headResp.Status)
|
||||
}
|
||||
|
||||
getReq := r.request(host, http.MethodGet, ps...)
|
||||
if err := getReq.addNamespace(r.refspec.Hostname()); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return getReq, headResp.ContentLength, nil
|
||||
}
|
||||
|
||||
func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest) (io.ReadCloser, ocispec.Descriptor, error) {
|
||||
var desc ocispec.Descriptor
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", dgst))
|
||||
|
||||
hosts := r.filterHosts(HostCapabilityPull)
|
||||
if len(hosts) == 0 {
|
||||
return nil, desc, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
|
||||
}
|
||||
|
||||
ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false)
|
||||
if err != nil {
|
||||
return nil, desc, err
|
||||
}
|
||||
|
||||
var (
|
||||
getReq *request
|
||||
sz int64
|
||||
firstErr error
|
||||
)
|
||||
|
||||
for _, host := range r.hosts {
|
||||
getReq, sz, err = r.createGetReq(ctx, host, "blobs", dgst.String())
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
// Store the error for referencing later
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
|
||||
if getReq == nil {
|
||||
// Fall back to the "manifests" endpoint
|
||||
for _, host := range r.hosts {
|
||||
getReq, sz, err = r.createGetReq(ctx, host, "manifests", dgst.String())
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
// Store the error for referencing later
|
||||
if firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if getReq == nil {
|
||||
if errdefs.IsNotFound(firstErr) {
|
||||
firstErr = fmt.Errorf("could not fetch content %v from remote: %w", dgst, errdefs.ErrNotFound)
|
||||
}
|
||||
if firstErr == nil {
|
||||
firstErr = fmt.Errorf("could not fetch content %v from remote: (unknown)", dgst)
|
||||
}
|
||||
return nil, desc, firstErr
|
||||
}
|
||||
|
||||
seeker, err := newHTTPReadSeeker(sz, func(offset int64) (io.ReadCloser, error) {
|
||||
return r.open(ctx, getReq, "", offset)
|
||||
})
|
||||
if err != nil {
|
||||
return nil, desc, err
|
||||
}
|
||||
|
||||
desc = ocispec.Descriptor{
|
||||
MediaType: "application/octet-stream",
|
||||
Digest: dgst,
|
||||
Size: sz,
|
||||
}
|
||||
return seeker, desc, nil
|
||||
}
|
||||
|
||||
func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) {
|
||||
if mediatype == "" {
|
||||
req.header.Set("Accept", "*/*")
|
||||
} else {
|
||||
req.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
|
||||
}
|
||||
|
||||
if offset > 0 {
|
||||
// Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints
|
||||
|
@ -591,6 +591,28 @@ func testFetch(ctx context.Context, f remotes.Fetcher, desc ocispec.Descriptor)
|
||||
return fmt.Errorf("content mismatch: %s != %s", dgstr.Digest(), desc.Digest)
|
||||
}
|
||||
|
||||
fByDigest, ok := f.(remotes.FetcherByDigest)
|
||||
if !ok {
|
||||
return fmt.Errorf("fetcher %T does not implement FetcherByDigest", f)
|
||||
}
|
||||
r2, desc2, err := fByDigest.FetchByDigest(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return fmt.Errorf("FetcherByDigest: faild to fetch %v: %w", desc.Digest, err)
|
||||
}
|
||||
if desc2.Size != desc.Size {
|
||||
r2b, err := io.ReadAll(r2)
|
||||
if err != nil {
|
||||
return fmt.Errorf("FetcherByDigest: size mismatch: %d != %d (content: %v)", desc2.Size, desc.Size, err)
|
||||
}
|
||||
return fmt.Errorf("FetcherByDigest: size mismatch: %d != %d (content: %q)", desc2.Size, desc.Size, string(r2b))
|
||||
}
|
||||
dgstr2 := desc.Digest.Algorithm().Digester()
|
||||
if _, err = io.Copy(dgstr2.Hash(), r2); err != nil {
|
||||
return fmt.Errorf("FetcherByDigest: faild to copy: %w", err)
|
||||
}
|
||||
if dgstr2.Digest() != desc.Digest {
|
||||
return fmt.Errorf("FetcherByDigest: content mismatch: %s != %s", dgstr2.Digest(), desc.Digest)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
@ -50,12 +51,23 @@ type Resolver interface {
|
||||
Pusher(ctx context.Context, ref string) (Pusher, error)
|
||||
}
|
||||
|
||||
// Fetcher fetches content
|
||||
// Fetcher fetches content.
|
||||
// A fetcher implementation may implement the FetcherByDigest interface too.
|
||||
type Fetcher interface {
|
||||
// Fetch the resource identified by the descriptor.
|
||||
Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
// FetcherByDigest fetches content by the digest.
|
||||
type FetcherByDigest interface {
|
||||
// FetchByDigest fetches the resource identified by the digest.
|
||||
//
|
||||
// FetcherByDigest usually returns an incomplete descriptor.
|
||||
// Typically, the media type is always set to "application/octet-stream",
|
||||
// and the annotations are unset.
|
||||
FetchByDigest(ctx context.Context, dgst digest.Digest) (io.ReadCloser, ocispec.Descriptor, error)
|
||||
}
|
||||
|
||||
// Pusher pushes content
|
||||
type Pusher interface {
|
||||
// Push returns a content writer for the given resource identified
|
||||
|
Loading…
Reference in New Issue
Block a user