
Support CRI configuration to allow for request-time rewrite rules applicable only to the repository portion of resource paths when pulling images. Because the rewrites are applied at request time, images themselves will not be "rewritten" -- images as stored by CRI (and the underlying containerd facility) will continue to present as normal. As an example, if you use the following config for your containerd: ```toml [plugins] [plugins."io.containerd.grpc.v1.cri"] [plugins."io.containerd.grpc.v1.cri".registry] [plugins."io.containerd.grpc.v1.cri".registry.mirrors] [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io"] endpoint = ["https://registry-1.docker.io/v2"] [plugins."io.containerd.grpc.v1.cri".registry.mirrors."docker.io".rewrite] "^library/(.*)" = "my-org/$1" ``` And then subsequently invoke `crictl pull alpine:3.13` it will pull content from `docker.io/my-org/alpine:3.13` but still show up as `docker.io/library/alpine:3.13` in the `crictl images` listing. This commit has been reworked from the original implementation. Rewrites are now done when resolving instead of when building the request, so that auth token scopes stored in the context properly reflect the rewritten repository path. For the original implementation, see 06c4ea9baec2b278b8172a789bf601168292f645. Ref: https://github.com/k3s-io/k3s/issues/11191#issuecomment-2455525773 Signed-off-by: Jacob Blain Christen <jacob@rancher.com> Co-authored-by: Brad Davidson <brad.davidson@rancher.com> Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
372 lines
9.9 KiB
Go
372 lines
9.9 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package docker
|
|
|
|
import (
|
|
"compress/flate"
|
|
"compress/gzip"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"github.com/containerd/containerd/v2/core/images"
|
|
"github.com/containerd/containerd/v2/core/remotes"
|
|
"github.com/containerd/errdefs"
|
|
"github.com/containerd/log"
|
|
"github.com/klauspost/compress/zstd"
|
|
digest "github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
)
|
|
|
|
type dockerFetcher struct {
|
|
*dockerBase
|
|
}
|
|
|
|
func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest))
|
|
|
|
hosts := r.filterHosts(HostCapabilityPull)
|
|
if len(hosts) == 0 {
|
|
return nil, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
|
|
}
|
|
|
|
return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) {
|
|
// firstly try fetch via external urls
|
|
for _, us := range desc.URLs {
|
|
ctx, err := ContextWithRepositoryScope(ctx, r.refspec, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
u, err := url.Parse(us)
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Debugf("failed to parse %q", us)
|
|
continue
|
|
}
|
|
if u.Scheme != "http" && u.Scheme != "https" {
|
|
log.G(ctx).Debug("non-http(s) alternative url is unsupported")
|
|
continue
|
|
}
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
|
|
log.G(ctx).Info("request")
|
|
|
|
// Try this first, parse it
|
|
host := RegistryHost{
|
|
Client: http.DefaultClient,
|
|
Host: u.Host,
|
|
Scheme: u.Scheme,
|
|
Path: u.Path,
|
|
Capabilities: HostCapabilityPull,
|
|
}
|
|
req := r.request(host, http.MethodGet)
|
|
// Strip namespace from base
|
|
req.path = u.Path
|
|
if u.RawQuery != "" {
|
|
req.path = req.path + "?" + u.RawQuery
|
|
}
|
|
|
|
rc, err := r.open(ctx, req, desc.MediaType, offset)
|
|
if err != nil {
|
|
if errdefs.IsNotFound(err) {
|
|
continue // try one of the other urls.
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
return rc, nil
|
|
}
|
|
|
|
// Try manifests endpoints for manifests types
|
|
if images.IsManifestType(desc.MediaType) || images.IsIndexType(desc.MediaType) ||
|
|
desc.MediaType == images.MediaTypeDockerSchema1Manifest {
|
|
|
|
var firstErr error
|
|
for _, host := range r.hosts {
|
|
base := r.withRewritesFromHost(host)
|
|
ctx, err := ContextWithRepositoryScope(ctx, base.refspec, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := base.request(host, http.MethodGet, "manifests", desc.Digest.String())
|
|
if err := req.addNamespace(base.refspec.Hostname()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rc, err := r.open(ctx, req, desc.MediaType, offset)
|
|
if err != nil {
|
|
// Store the error for referencing later
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
continue // try another host
|
|
}
|
|
|
|
return rc, nil
|
|
}
|
|
|
|
return nil, firstErr
|
|
}
|
|
|
|
// Finally use blobs endpoints
|
|
var firstErr error
|
|
for _, host := range r.hosts {
|
|
base := r.withRewritesFromHost(host)
|
|
ctx, err := ContextWithRepositoryScope(ctx, base.refspec, false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := base.request(host, http.MethodGet, "blobs", desc.Digest.String())
|
|
if err := req.addNamespace(base.refspec.Hostname()); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rc, err := r.open(ctx, req, desc.MediaType, offset)
|
|
if err != nil {
|
|
// Store the error for referencing later
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
continue // try another host
|
|
}
|
|
|
|
return rc, nil
|
|
}
|
|
|
|
if errdefs.IsNotFound(firstErr) {
|
|
firstErr = fmt.Errorf("could not fetch content descriptor %v (%v) from remote: %w",
|
|
desc.Digest, desc.MediaType, errdefs.ErrNotFound,
|
|
)
|
|
}
|
|
|
|
return nil, firstErr
|
|
|
|
})
|
|
}
|
|
|
|
func (r dockerFetcher) createGetReq(ctx context.Context, host RegistryHost, mediatype string, ps ...string) (*request, int64, error) {
|
|
base := r.withRewritesFromHost(host)
|
|
ctx, err := ContextWithRepositoryScope(ctx, base.refspec, false)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
headReq := base.request(host, http.MethodHead, ps...)
|
|
if err := headReq.addNamespace(base.refspec.Hostname()); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
if mediatype == "" {
|
|
headReq.header.Set("Accept", "*/*")
|
|
} else {
|
|
headReq.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
|
|
}
|
|
|
|
headResp, err := headReq.doWithRetries(ctx, nil)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
if headResp.Body != nil {
|
|
headResp.Body.Close()
|
|
}
|
|
if headResp.StatusCode > 299 {
|
|
return nil, 0, fmt.Errorf("unexpected HEAD status code %v: %s", headReq.String(), headResp.Status)
|
|
}
|
|
|
|
getReq := base.request(host, http.MethodGet, ps...)
|
|
if err := getReq.addNamespace(base.refspec.Hostname()); err != nil {
|
|
return nil, 0, err
|
|
}
|
|
return getReq, headResp.ContentLength, nil
|
|
}
|
|
|
|
func (r dockerFetcher) FetchByDigest(ctx context.Context, dgst digest.Digest, opts ...remotes.FetchByDigestOpts) (io.ReadCloser, ocispec.Descriptor, error) {
|
|
var desc ocispec.Descriptor
|
|
ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", dgst))
|
|
var config remotes.FetchByDigestConfig
|
|
for _, o := range opts {
|
|
if err := o(ctx, &config); err != nil {
|
|
return nil, desc, err
|
|
}
|
|
}
|
|
|
|
hosts := r.filterHosts(HostCapabilityPull)
|
|
if len(hosts) == 0 {
|
|
return nil, desc, fmt.Errorf("no pull hosts: %w", errdefs.ErrNotFound)
|
|
}
|
|
|
|
var (
|
|
getReq *request
|
|
sz int64
|
|
err, firstErr error
|
|
)
|
|
|
|
for _, host := range r.hosts {
|
|
getReq, sz, err = r.createGetReq(ctx, host, config.Mediatype, "blobs", dgst.String())
|
|
if err == nil {
|
|
break
|
|
}
|
|
// Store the error for referencing later
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
|
|
if getReq == nil {
|
|
// Fall back to the "manifests" endpoint
|
|
for _, host := range r.hosts {
|
|
getReq, sz, err = r.createGetReq(ctx, host, config.Mediatype, "manifests", dgst.String())
|
|
if err == nil {
|
|
break
|
|
}
|
|
// Store the error for referencing later
|
|
if firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
}
|
|
|
|
if getReq == nil {
|
|
if errdefs.IsNotFound(firstErr) {
|
|
firstErr = fmt.Errorf("could not fetch content %v from remote: %w", dgst, errdefs.ErrNotFound)
|
|
}
|
|
if firstErr == nil {
|
|
firstErr = fmt.Errorf("could not fetch content %v from remote: (unknown)", dgst)
|
|
}
|
|
return nil, desc, firstErr
|
|
}
|
|
|
|
seeker, err := newHTTPReadSeeker(sz, func(offset int64) (io.ReadCloser, error) {
|
|
return r.open(ctx, getReq, config.Mediatype, offset)
|
|
})
|
|
if err != nil {
|
|
return nil, desc, err
|
|
}
|
|
|
|
desc = ocispec.Descriptor{
|
|
MediaType: "application/octet-stream",
|
|
Digest: dgst,
|
|
Size: sz,
|
|
}
|
|
if config.Mediatype != "" {
|
|
desc.MediaType = config.Mediatype
|
|
}
|
|
return seeker, desc, nil
|
|
}
|
|
|
|
func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (_ io.ReadCloser, retErr error) {
|
|
if mediatype == "" {
|
|
req.header.Set("Accept", "*/*")
|
|
} else {
|
|
req.header.Set("Accept", strings.Join([]string{mediatype, `*/*`}, ", "))
|
|
}
|
|
req.header.Set("Accept-Encoding", "zstd;q=1.0, gzip;q=0.8, deflate;q=0.5")
|
|
|
|
if offset > 0 {
|
|
// Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints
|
|
// will return the header without supporting the range. The content
|
|
// range must always be checked.
|
|
req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
|
|
}
|
|
|
|
resp, err := req.doWithRetries(ctx, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() {
|
|
if retErr != nil {
|
|
resp.Body.Close()
|
|
}
|
|
}()
|
|
|
|
if resp.StatusCode > 299 {
|
|
// TODO(stevvooe): When doing a offset specific request, we should
|
|
// really distinguish between a 206 and a 200. In the case of 200, we
|
|
// can discard the bytes, hiding the seek behavior from the
|
|
// implementation.
|
|
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return nil, fmt.Errorf("content at %v not found: %w", req.String(), errdefs.ErrNotFound)
|
|
}
|
|
var registryErr Errors
|
|
if err := json.NewDecoder(resp.Body).Decode(®istryErr); err != nil || registryErr.Len() < 1 {
|
|
return nil, fmt.Errorf("unexpected status code %v: %v", req.String(), resp.Status)
|
|
}
|
|
return nil, fmt.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error())
|
|
}
|
|
if offset > 0 {
|
|
cr := resp.Header.Get("content-range")
|
|
if cr != "" {
|
|
if !strings.HasPrefix(cr, fmt.Sprintf("bytes %d-", offset)) {
|
|
return nil, fmt.Errorf("unhandled content range in response: %v", cr)
|
|
|
|
}
|
|
} else {
|
|
// TODO: Should any cases where use of content range
|
|
// without the proper header be considered?
|
|
// 206 responses?
|
|
|
|
// Discard up to offset
|
|
// Could use buffer pool here but this case should be rare
|
|
n, err := io.Copy(io.Discard, io.LimitReader(resp.Body, offset))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to discard to offset: %w", err)
|
|
}
|
|
if n != offset {
|
|
return nil, errors.New("unable to discard to offset")
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
body := resp.Body
|
|
encoding := strings.FieldsFunc(resp.Header.Get("Content-Encoding"), func(r rune) bool {
|
|
return r == ' ' || r == '\t' || r == ','
|
|
})
|
|
for i := len(encoding) - 1; i >= 0; i-- {
|
|
algorithm := strings.ToLower(encoding[i])
|
|
switch algorithm {
|
|
case "zstd":
|
|
r, err := zstd.NewReader(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
body = r.IOReadCloser()
|
|
case "gzip":
|
|
body, err = gzip.NewReader(body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
case "deflate":
|
|
body = flate.NewReader(body)
|
|
case "identity", "":
|
|
// no content-encoding applied, use raw body
|
|
default:
|
|
return nil, errors.New("unsupported Content-Encoding algorithm: " + algorithm)
|
|
}
|
|
}
|
|
|
|
return body, nil
|
|
}
|