Merge pull request #3400 from dmcgowan/registry-configuration

Update resolver to handle endpoint configuration
This commit is contained in:
Michael Crosby 2019-07-17 11:53:45 -04:00 committed by GitHub
commit 1ff1f87fea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 861 additions and 363 deletions

View File

@ -31,7 +31,6 @@ import (
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log" "github.com/containerd/containerd/log"
"github.com/containerd/containerd/version"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/net/context/ctxhttp" "golang.org/x/net/context/ctxhttp"
@ -41,7 +40,7 @@ type dockerAuthorizer struct {
credentials func(string) (string, string, error) credentials func(string) (string, string, error)
client *http.Client client *http.Client
ua string header http.Header
mu sync.Mutex mu sync.Mutex
// indexed by host name // indexed by host name
@ -50,15 +49,58 @@ type dockerAuthorizer struct {
// NewAuthorizer creates a Docker authorizer using the provided function to // NewAuthorizer creates a Docker authorizer using the provided function to
// get credentials for the token server or basic auth. // get credentials for the token server or basic auth.
// Deprecated: Use NewDockerAuthorizer
func NewAuthorizer(client *http.Client, f func(string) (string, string, error)) Authorizer { func NewAuthorizer(client *http.Client, f func(string) (string, string, error)) Authorizer {
if client == nil { return NewDockerAuthorizer(WithAuthClient(client), WithAuthCreds(f))
client = http.DefaultClient }
type authorizerConfig struct {
credentials func(string) (string, string, error)
client *http.Client
header http.Header
}
// AuthorizerOpt configures an authorizer
type AuthorizerOpt func(*authorizerConfig)
// WithAuthClient provides the HTTP client for the authorizer
func WithAuthClient(client *http.Client) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.client = client
}
}
// WithAuthCreds provides a credential function to the authorizer
func WithAuthCreds(creds func(string) (string, string, error)) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.credentials = creds
}
}
// WithAuthHeader provides HTTP headers for authorization
func WithAuthHeader(hdr http.Header) AuthorizerOpt {
return func(opt *authorizerConfig) {
opt.header = hdr
}
}
// NewDockerAuthorizer creates an authorizer using Docker's registry
// authentication spec.
// See https://docs.docker.com/registry/spec/auth/
func NewDockerAuthorizer(opts ...AuthorizerOpt) Authorizer {
var ao authorizerConfig
for _, opt := range opts {
opt(&ao)
}
if ao.client == nil {
ao.client = http.DefaultClient
} }
return &dockerAuthorizer{ return &dockerAuthorizer{
credentials: f, credentials: ao.credentials,
client: client, client: ao.client,
ua: "containerd/" + version.Version, header: ao.header,
handlers: make(map[string]*authHandler), handlers: make(map[string]*authHandler),
} }
} }
@ -115,7 +157,7 @@ func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.R
return err return err
} }
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common) a.handlers[host] = newAuthHandler(a.client, a.header, c.scheme, common)
return nil return nil
} else if c.scheme == basicAuth && a.credentials != nil { } else if c.scheme == basicAuth && a.credentials != nil {
username, secret, err := a.credentials(host) username, secret, err := a.credentials(host)
@ -129,7 +171,7 @@ func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.R
secret: secret, secret: secret,
} }
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common) a.handlers[host] = newAuthHandler(a.client, a.header, c.scheme, common)
return nil return nil
} }
} }
@ -179,7 +221,7 @@ type authResult struct {
type authHandler struct { type authHandler struct {
sync.Mutex sync.Mutex
ua string header http.Header
client *http.Client client *http.Client
@ -194,13 +236,9 @@ type authHandler struct {
scopedTokens map[string]*authResult scopedTokens map[string]*authResult
} }
func newAuthHandler(client *http.Client, ua string, scheme authenticationScheme, opts tokenOptions) *authHandler { func newAuthHandler(client *http.Client, hdr http.Header, scheme authenticationScheme, opts tokenOptions) *authHandler {
if client == nil {
client = http.DefaultClient
}
return &authHandler{ return &authHandler{
ua: ua, header: hdr,
client: client, client: client,
scheme: scheme, scheme: scheme,
common: opts, common: opts,
@ -313,8 +351,10 @@ func (ah *authHandler) fetchTokenWithOAuth(ctx context.Context, to tokenOptions)
return "", err return "", err
} }
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8") req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
if ah.ua != "" { if ah.header != nil {
req.Header.Set("User-Agent", ah.ua) for k, v := range ah.header {
req.Header[k] = append(req.Header[k], v...)
}
} }
resp, err := ctxhttp.Do(ctx, ah.client, req) resp, err := ctxhttp.Do(ctx, ah.client, req)
@ -363,8 +403,10 @@ func (ah *authHandler) fetchToken(ctx context.Context, to tokenOptions) (string,
return "", err return "", err
} }
if ah.ua != "" { if ah.header != nil {
req.Header.Set("User-Agent", ah.ua) for k, v := range ah.header {
req.Header[k] = append(req.Header[k], v...)
}
} }
reqParams := req.URL.Query() reqParams := req.URL.Query()

View File

@ -23,7 +23,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "net/url"
"strings" "strings"
"github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/errdefs"
@ -32,7 +32,6 @@ import (
"github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/errcode"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sirupsen/logrus"
) )
type dockerFetcher struct { type dockerFetcher struct {
@ -40,26 +39,46 @@ type dockerFetcher struct {
} }
func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields( ctx = log.WithLogger(ctx, log.G(ctx).WithField("digest", desc.Digest))
logrus.Fields{
"base": r.base.String(),
"digest": desc.Digest,
},
))
urls, err := r.getV2URLPaths(ctx, desc) hosts := r.filterHosts(HostCapabilityPull)
if err != nil { if len(hosts) == 0 {
return nil, err return nil, errors.Wrap(errdefs.ErrNotFound, "no pull hosts")
} }
ctx, err = contextWithRepositoryScope(ctx, r.refspec, false) ctx, err := contextWithRepositoryScope(ctx, r.refspec, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) {
for _, u := range urls { // firstly try fetch via external urls
rc, err := r.open(ctx, u, desc.MediaType, offset) for _, us := range desc.URLs {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", us))
u, err := url.Parse(us)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to parse")
continue
}
log.G(ctx).Debug("trying alternative url")
// Try this first, parse it
host := RegistryHost{
Client: http.DefaultClient,
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
Capabilities: HostCapabilityPull,
}
req := r.request(host, http.MethodGet)
// Strip namespace from base
req.path = u.Path
if u.RawQuery != "" {
req.path = req.path + "?" + u.RawQuery
}
rc, err := r.open(ctx, req, desc.MediaType, offset)
if err != nil { if err != nil {
if errdefs.IsNotFound(err) { if errdefs.IsNotFound(err) {
continue // try one of the other urls. continue // try one of the other urls.
@ -71,6 +90,44 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
return rc, nil return rc, nil
} }
// Try manifests endpoints for manifests types
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList,
images.MediaTypeDockerSchema1Manifest,
ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex:
for _, host := range r.hosts {
req := r.request(host, http.MethodGet, "manifests", desc.Digest.String())
rc, err := r.open(ctx, req, desc.MediaType, offset)
if err != nil {
if errdefs.IsNotFound(err) {
continue // try another host
}
return nil, err
}
return rc, nil
}
}
// Finally use blobs endpoints
for _, host := range r.hosts {
req := r.request(host, http.MethodGet, "blobs", desc.Digest.String())
rc, err := r.open(ctx, req, desc.MediaType, offset)
if err != nil {
if errdefs.IsNotFound(err) {
continue // try another host
}
return nil, err
}
return rc, nil
}
return nil, errors.Wrapf(errdefs.ErrNotFound, return nil, errors.Wrapf(errdefs.ErrNotFound,
"could not fetch content descriptor %v (%v) from remote", "could not fetch content descriptor %v (%v) from remote",
desc.Digest, desc.MediaType) desc.Digest, desc.MediaType)
@ -78,22 +135,17 @@ func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R
}) })
} }
func (r dockerFetcher) open(ctx context.Context, u, mediatype string, offset int64) (io.ReadCloser, error) { func (r dockerFetcher) open(ctx context.Context, req *request, mediatype string, offset int64) (io.ReadCloser, error) {
req, err := http.NewRequest(http.MethodGet, u, nil) req.header.Set("Accept", strings.Join([]string{mediatype, `*`}, ", "))
if err != nil {
return nil, err
}
req.Header.Set("Accept", strings.Join([]string{mediatype, `*`}, ", "))
if offset > 0 { if offset > 0 {
// Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints // Note: "Accept-Ranges: bytes" cannot be trusted as some endpoints
// will return the header without supporting the range. The content // will return the header without supporting the range. The content
// range must always be checked. // range must always be checked.
req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) req.header.Set("Range", fmt.Sprintf("bytes=%d-", offset))
} }
resp, err := r.doRequestWithRetries(ctx, req, nil) resp, err := req.doWithRetries(ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -106,13 +158,13 @@ func (r dockerFetcher) open(ctx context.Context, u, mediatype string, offset int
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode == http.StatusNotFound { if resp.StatusCode == http.StatusNotFound {
return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", u) return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", req.String())
} }
var registryErr errcode.Errors var registryErr errcode.Errors
if err := json.NewDecoder(resp.Body).Decode(&registryErr); err != nil || registryErr.Len() < 1 { if err := json.NewDecoder(resp.Body).Decode(&registryErr); err != nil || registryErr.Len() < 1 {
return nil, errors.Errorf("unexpected status code %v: %v", u, resp.Status) return nil, errors.Errorf("unexpected status code %v: %v", req.String(), resp.Status)
} }
return nil, errors.Errorf("unexpected status code %v: %s - Server message: %s", u, resp.Status, registryErr.Error()) return nil, errors.Errorf("unexpected status code %v: %s - Server message: %s", req.String(), resp.Status, registryErr.Error())
} }
if offset > 0 { if offset > 0 {
cr := resp.Header.Get("content-range") cr := resp.Header.Get("content-range")
@ -141,30 +193,3 @@ func (r dockerFetcher) open(ctx context.Context, u, mediatype string, offset int
return resp.Body, nil return resp.Body, nil
} }
// getV2URLPaths generates the candidate urls paths for the object based on the
// set of hints and the provided object id. URLs are returned in the order of
// most to least likely succeed.
func (r *dockerFetcher) getV2URLPaths(ctx context.Context, desc ocispec.Descriptor) ([]string, error) {
var urls []string
if len(desc.URLs) > 0 {
// handle fetch via external urls.
for _, u := range desc.URLs {
log.G(ctx).WithField("url", u).Debug("adding alternative url")
urls = append(urls, u)
}
}
switch desc.MediaType {
case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList,
images.MediaTypeDockerSchema1Manifest,
ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex:
urls = append(urls, r.url(path.Join("manifests", desc.Digest.String())))
}
// always fallback to attempting to get the object out of the blobs store.
urls = append(urls, r.url(path.Join("blobs", desc.Digest.String())))
return urls, nil
}

View File

@ -25,6 +25,7 @@ import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"testing" "testing"
"github.com/docker/distribution/registry/api/errcode" "github.com/docker/distribution/registry/api/errcode"
@ -46,14 +47,30 @@ func TestFetcherOpen(t *testing.T) {
})) }))
defer s.Close() defer s.Close()
u, err := url.Parse(s.URL)
if err != nil {
t.Fatal(err)
}
f := dockerFetcher{&dockerBase{ f := dockerFetcher{&dockerBase{
client: s.Client(), namespace: "nonempty",
}} }}
host := RegistryHost{
Client: s.Client(),
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
}
ctx := context.Background() ctx := context.Background()
req := f.request(host, http.MethodGet)
checkReader := func(o int64) { checkReader := func(o int64) {
t.Helper() t.Helper()
rc, err := f.open(ctx, s.URL, "", o)
rc, err := f.open(ctx, req, "", o)
if err != nil { if err != nil {
t.Fatalf("failed to open: %+v", err) t.Fatalf("failed to open: %+v", err)
} }
@ -93,7 +110,7 @@ func TestFetcherOpen(t *testing.T) {
// Check that server returning a different content range // Check that server returning a different content range
// then requested errors // then requested errors
start = 30 start = 30
_, err := f.open(ctx, s.URL, "", 20) _, err = f.open(ctx, req, "", 20)
if err == nil { if err == nil {
t.Fatal("expected error opening with invalid server response") t.Fatal("expected error opening with invalid server response")
} }
@ -160,20 +177,34 @@ func TestDockerFetcherOpen(t *testing.T) {
})) }))
defer s.Close() defer s.Close()
r := dockerFetcher{&dockerBase{ u, err := url.Parse(s.URL)
client: s.Client(), if err != nil {
t.Fatal(err)
}
f := dockerFetcher{&dockerBase{
namespace: "ns",
}} }}
got, err := r.open(context.TODO(), s.URL, "", 0) host := RegistryHost{
Client: s.Client(),
Host: u.Host,
Scheme: u.Scheme,
Path: u.Path,
}
req := f.request(host, http.MethodGet)
got, err := f.open(context.TODO(), req, "", 0)
assert.Equal(t, tt.wantErr, (err != nil)) assert.Equal(t, tt.wantErr, (err != nil))
assert.Equal(t, tt.want, got) assert.Equal(t, tt.want, got)
assert.Equal(t, tt.retries, 0) assert.Equal(t, tt.retries, 0)
if tt.wantErr { if tt.wantErr {
var expectedError error var expectedError error
if tt.wantServerMessageError { if tt.wantServerMessageError {
expectedError = errors.Errorf("unexpected status code %v: %v %s - Server message: %s", s.URL, tt.mockedStatus, http.StatusText(tt.mockedStatus), tt.mockedErr.Error()) expectedError = errors.Errorf("unexpected status code %v/ns: %v %s - Server message: %s", s.URL, tt.mockedStatus, http.StatusText(tt.mockedStatus), tt.mockedErr.Error())
} else if tt.wantPlainError { } else if tt.wantPlainError {
expectedError = errors.Errorf("unexpected status code %v: %v %s", s.URL, tt.mockedStatus, http.StatusText(tt.mockedStatus)) expectedError = errors.Errorf("unexpected status code %v/ns: %v %s", s.URL, tt.mockedStatus, http.StatusText(tt.mockedStatus))
} }
assert.Equal(t, expectedError.Error(), err.Error()) assert.Equal(t, expectedError.Error(), err.Error())

View File

@ -21,7 +21,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"path" "net/url"
"strings" "strings"
"time" "time"
@ -59,9 +59,15 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
return nil, errors.Wrap(err, "failed to get status") return nil, errors.Wrap(err, "failed to get status")
} }
hosts := p.filterHosts(HostCapabilityPush)
if len(hosts) == 0 {
return nil, errors.Wrap(errdefs.ErrNotFound, "no push hosts")
}
var ( var (
isManifest bool isManifest bool
existCheck string existCheck []string
host = hosts[0]
) )
switch desc.MediaType { switch desc.MediaType {
@ -69,21 +75,20 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex:
isManifest = true isManifest = true
if p.tag == "" { if p.tag == "" {
existCheck = path.Join("manifests", desc.Digest.String()) existCheck = []string{"manifests", desc.Digest.String()}
} else { } else {
existCheck = path.Join("manifests", p.tag) existCheck = []string{"manifests", p.tag}
} }
default: default:
existCheck = path.Join("blobs", desc.Digest.String()) existCheck = []string{"blobs", desc.Digest.String()}
} }
req, err := http.NewRequest(http.MethodHead, p.url(existCheck), nil) req := p.request(host, http.MethodHead, existCheck...)
if err != nil { req.header.Set("Accept", strings.Join([]string{desc.MediaType, `*`}, ", "))
return nil, err
}
req.Header.Set("Accept", strings.Join([]string{desc.MediaType, `*`}, ", ")) log.G(ctx).WithField("url", req.String()).Debugf("checking and pushing to")
resp, err := p.doRequestWithRetries(ctx, req, nil)
resp, err := req.doWithRetries(ctx, nil)
if err != nil { if err != nil {
if errors.Cause(err) != ErrInvalidAuthorization { if errors.Cause(err) != ErrInvalidAuthorization {
return nil, err return nil, err
@ -117,28 +122,22 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
} }
if isManifest { if isManifest {
var putPath string var putPath []string
if p.tag != "" { if p.tag != "" {
putPath = path.Join("manifests", p.tag) putPath = []string{"manifests", p.tag}
} else { } else {
putPath = path.Join("manifests", desc.Digest.String()) putPath = []string{"manifests", desc.Digest.String()}
} }
req, err = http.NewRequest(http.MethodPut, p.url(putPath), nil) req = p.request(host, http.MethodPut, putPath...)
if err != nil { req.header.Add("Content-Type", desc.MediaType)
return nil, err
}
req.Header.Add("Content-Type", desc.MediaType)
} else { } else {
// Start upload request // Start upload request
req, err = http.NewRequest(http.MethodPost, p.url("blobs", "uploads")+"/", nil) req = p.request(host, http.MethodPost, "blobs", "uploads/")
if err != nil {
return nil, err
}
var resp *http.Response var resp *http.Response
if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" { if fromRepo := selectRepositoryMountCandidate(p.refspec, desc.Annotations); fromRepo != "" {
req = requestWithMountFrom(req, desc.Digest.String(), fromRepo) preq := requestWithMountFrom(req, desc.Digest.String(), fromRepo)
pctx := contextWithAppendPullRepositoryScope(ctx, fromRepo) pctx := contextWithAppendPullRepositoryScope(ctx, fromRepo)
// NOTE: the fromRepo might be private repo and // NOTE: the fromRepo might be private repo and
@ -147,7 +146,8 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
// //
// for the private repo, we should remove mount-from // for the private repo, we should remove mount-from
// query and send the request again. // query and send the request again.
resp, err = p.doRequest(pctx, req) resp, err = preq.do(pctx)
//resp, err = p.doRequest(pctx, req)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -157,16 +157,11 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
resp.Body.Close() resp.Body.Close()
resp = nil resp = nil
req, err = removeMountFromQuery(req)
if err != nil {
return nil, err
}
} }
} }
if resp == nil { if resp == nil {
resp, err = p.doRequestWithRetries(ctx, req, nil) resp, err = req.doWithRetries(ctx, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -186,31 +181,41 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
return nil, errors.Errorf("unexpected response: %s", resp.Status) return nil, errors.Errorf("unexpected response: %s", resp.Status)
} }
location := resp.Header.Get("Location") var (
location = resp.Header.Get("Location")
lurl *url.URL
lhost = host
)
// Support paths without host in location // Support paths without host in location
if strings.HasPrefix(location, "/") { if strings.HasPrefix(location, "/") {
// Support location string containing path and query lurl, err = url.Parse(lhost.Scheme + "://" + lhost.Host + location)
qmIndex := strings.Index(location, "?") if err != nil {
if qmIndex > 0 { return nil, errors.Wrapf(err, "unable to parse location %v", location)
u := p.base }
u.Path = location[:qmIndex] } else {
u.RawQuery = location[qmIndex+1:] if !strings.Contains(location, "://") {
location = u.String() location = lhost.Scheme + "://" + location
} else { }
u := p.base lurl, err = url.Parse(location)
u.Path = location if err != nil {
location = u.String() return nil, errors.Wrapf(err, "unable to parse location %v", location)
}
if lurl.Host != lhost.Host || lhost.Scheme != lurl.Scheme {
lhost.Scheme = lurl.Scheme
lhost.Host = lurl.Host
log.G(ctx).WithField("host", lhost.Host).WithField("scheme", lhost.Scheme).Debug("upload changed destination")
// Strip authorizer if change to host or scheme
lhost.Authorizer = nil
} }
} }
q := lurl.Query()
req, err = http.NewRequest(http.MethodPut, location, nil)
if err != nil {
return nil, err
}
q := req.URL.Query()
q.Add("digest", desc.Digest.String()) q.Add("digest", desc.Digest.String())
req.URL.RawQuery = q.Encode()
req = p.request(lhost, http.MethodPut)
req.path = lurl.Path + "?" + q.Encode()
} }
p.tracker.SetStatus(ref, Status{ p.tracker.SetStatus(ref, Status{
Status: content.Status{ Status: content.Status{
@ -226,12 +231,14 @@ func (p dockerPusher) Push(ctx context.Context, desc ocispec.Descriptor) (conten
pr, pw := io.Pipe() pr, pw := io.Pipe()
respC := make(chan *http.Response, 1) respC := make(chan *http.Response, 1)
req.Body = ioutil.NopCloser(pr) req.body = func() (io.ReadCloser, error) {
req.ContentLength = desc.Size return ioutil.NopCloser(pr), nil
}
req.size = desc.Size
go func() { go func() {
defer close(respC) defer close(respC)
resp, err = p.doRequest(ctx, req) resp, err = req.do(ctx)
if err != nil { if err != nil {
pr.CloseWithError(err) pr.CloseWithError(err)
return return
@ -355,24 +362,15 @@ func (pw *pushWriter) Truncate(size int64) error {
return errors.New("cannot truncate remote upload") return errors.New("cannot truncate remote upload")
} }
func requestWithMountFrom(req *http.Request, mount, from string) *http.Request { func requestWithMountFrom(req *request, mount, from string) *request {
q := req.URL.Query() creq := *req
q.Set("mount", mount) sep := "?"
q.Set("from", from) if strings.Contains(creq.path, sep) {
req.URL.RawQuery = q.Encode() sep = "&"
return req
}
func removeMountFromQuery(req *http.Request) (*http.Request, error) {
req, err := copyRequest(req)
if err != nil {
return nil, err
} }
q := req.URL.Query() creq.path = creq.path + sep + "mount=" + mount + "&from=" + from
q.Del("mount")
q.Del("from") return &creq
req.URL.RawQuery = q.Encode()
return req, nil
} }

202
remotes/docker/registry.go Normal file
View File

@ -0,0 +1,202 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import (
"net/http"
)
// HostCapabilities represent the capabilities of the registry
// host. This also represents the set of operations for which
// the registry host may be trusted to perform.
//
// For example pushing is a capability which should only be
// performed on an upstream source, not a mirror.
// Resolving (the process of converting a name into a digest)
// must be considered a trusted operation and only done by
// a host which is trusted (or more preferably by secure process
// which can prove the provenance of the mapping). A public
// mirror should never be trusted to do a resolve action.
//
// | Registry Type | Pull | Resolve | Push |
// |------------------|------|---------|------|
// | Public Registry | yes | yes | yes |
// | Private Registry | yes | yes | yes |
// | Public Mirror | yes | no | no |
// | Private Mirror | yes | yes | no |
type HostCapabilities uint8
const (
// HostCapabilityPull represents the capability to fetch manifests
// and blobs by digest
HostCapabilityPull HostCapabilities = 1 << iota
// HostCapabilityResolve represents the capability to fetch manifests
// by name
HostCapabilityResolve
// HostCapabilityPush represents the capability to push blobs and
// manifests
HostCapabilityPush
// Reserved for future capabilities (i.e. search, catalog, remove)
)
func (c HostCapabilities) Has(t HostCapabilities) bool {
return c&t == t
}
// RegistryHost represents a complete configuration for a registry
// host, representing the capabilities, authorizations, connection
// configuration, and location.
type RegistryHost struct {
Client *http.Client
Authorizer Authorizer
Host string
Scheme string
Path string
Capabilities HostCapabilities
}
// RegistryHosts fetches the registry hosts for a given namespace,
// provided by the host component of an distribution image reference.
type RegistryHosts func(string) ([]RegistryHost, error)
// Registries joins multiple registry configuration functions, using the same
// order as provided within the arguments. When an empty registry configuration
// is returned with a nil error, the next function will be called.
// NOTE: This function will not join configurations, as soon as a non-empty
// configuration is returned from a configuration function, it will be returned
// to the caller.
func Registries(registries ...RegistryHosts) RegistryHosts {
return func(host string) ([]RegistryHost, error) {
for _, registry := range registries {
config, err := registry(host)
if err != nil {
return config, err
}
if len(config) > 0 {
return config, nil
}
}
return nil, nil
}
}
type registryOpts struct {
authorizer Authorizer
plainHTTP func(string) (bool, error)
host func(string) (string, error)
client *http.Client
}
// RegistryOpt defines a registry default option
type RegistryOpt func(*registryOpts)
// WithPlainHTTP configures registries to use plaintext http scheme
// for the provided host match function.
func WithPlainHTTP(f func(string) (bool, error)) RegistryOpt {
return func(opts *registryOpts) {
opts.plainHTTP = f
}
}
// WithAuthorizer configures the default authorizer for a registry
func WithAuthorizer(a Authorizer) RegistryOpt {
return func(opts *registryOpts) {
opts.authorizer = a
}
}
// WithHostTranslator defines the default translator to use for registry hosts
func WithHostTranslator(h func(string) (string, error)) RegistryOpt {
return func(opts *registryOpts) {
opts.host = h
}
}
// WithClient configures the default http client for a registry
func WithClient(c *http.Client) RegistryOpt {
return func(opts *registryOpts) {
opts.client = c
}
}
// ConfigureDefaultRegistries is used to create a default configuration for
// registries. For more advanced configurations or per-domain setups,
// the RegistryHosts interface should be used directly.
// NOTE: This function will always return a non-empty value or error
func ConfigureDefaultRegistries(ropts ...RegistryOpt) RegistryHosts {
var opts registryOpts
for _, opt := range ropts {
opt(&opts)
}
return func(host string) ([]RegistryHost, error) {
config := RegistryHost{
Client: opts.client,
Authorizer: opts.authorizer,
Host: host,
Scheme: "https",
Path: "/v2",
Capabilities: HostCapabilityPull | HostCapabilityResolve | HostCapabilityPush,
}
if config.Client == nil {
config.Client = http.DefaultClient
}
if opts.plainHTTP != nil {
match, err := opts.plainHTTP(host)
if err != nil {
return nil, err
}
if match {
config.Scheme = "http"
}
}
if opts.host != nil {
var err error
config.Host, err = opts.host(config.Host)
if err != nil {
return nil, err
}
} else if host == "docker.io" {
config.Host = "registry-1.docker.io"
}
return []RegistryHost{config}, nil
}
}
// MatchAllHosts is a host match function which is always true.
func MatchAllHosts(string) (bool, error) {
return true, nil
}
// MatchLocalhost is a host match function which returns true for
// localhost.
func MatchLocalhost(host string) (bool, error) {
for _, s := range []string{"localhost", "127.0.0.1", "[::1]"} {
if len(host) >= len(s) && host[0:len(s)] == s && (len(host) == len(s) || host[len(s)] == ':') {
return true, nil
}
}
return host == "::1", nil
}

View File

@ -0,0 +1,76 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package docker
import "testing"
func TestHasCapability(t *testing.T) {
var (
pull = HostCapabilityPull
rslv = HostCapabilityResolve
push = HostCapabilityPush
all = pull | rslv | push
)
for i, tc := range []struct {
c HostCapabilities
t HostCapabilities
e bool
}{
{all, pull, true},
{all, pull | rslv, true},
{all, pull | push, true},
{all, all, true},
{pull, all, false},
{pull, push, false},
{rslv, pull, false},
{pull | rslv, push, false},
{pull | rslv, rslv, true},
} {
if a := tc.c.Has(tc.t); a != tc.e {
t.Fatalf("%d: failed, expected %t, got %t", i, tc.e, a)
}
}
}
func TestMatchLocalhost(t *testing.T) {
for _, tc := range []struct {
host string
match bool
}{
{"", false},
{"127.1.1.1", false},
{"127.0.0.1", true},
{"127.0.0.1:5000", true},
{"registry.org", false},
{"localhost", true},
{"localhost:5000", true},
{"[127:0:0:1]", false},
{"[::1]", true},
{"[::1]:5000", true},
{"::1", true},
} {
actual, _ := MatchLocalhost(tc.host)
if actual != tc.match {
if tc.match {
t.Logf("Expected match for %s", tc.host)
} else {
t.Logf("Unexpected match for %s", tc.host)
}
t.Fail()
}
}
}

View File

@ -18,9 +18,10 @@ package docker
import ( import (
"context" "context"
"fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/url"
"path" "path"
"strings" "strings"
@ -46,6 +47,19 @@ var (
// ErrInvalidAuthorization is used when credentials are passed to a server but // ErrInvalidAuthorization is used when credentials are passed to a server but
// those credentials are rejected. // those credentials are rejected.
ErrInvalidAuthorization = errors.New("authorization failed") ErrInvalidAuthorization = errors.New("authorization failed")
// MaxManifestSize represents the largest size accepted from a registry
// during resolution. Larger manifests may be accepted using a
// resolution method other than the registry.
//
// NOTE: The max supported layers by some runtimes is 128 and individual
// layers will not contribute more than 256 bytes, making a
// reasonable limit for a large image manifests of 32K bytes.
// 4M bytes represents a much larger upper bound for images which may
// contain large annotations or be non-images. A proper manifest
// design puts large metadata in subobjects, as is consistent the
// intent of the manifest design.
MaxManifestSize int64 = 4 * 1048 * 1048
) )
// Authorizer is used to authorize HTTP requests based on 401 HTTP responses. // Authorizer is used to authorize HTTP requests based on 401 HTTP responses.
@ -72,31 +86,38 @@ type Authorizer interface {
// ResolverOptions are used to configured a new Docker register resolver // ResolverOptions are used to configured a new Docker register resolver
type ResolverOptions struct { type ResolverOptions struct {
// Authorizer is used to authorize registry requests // Hosts returns registry host configurations for a namespace.
Authorizer Authorizer Hosts RegistryHosts
// Credentials provides username and secret given a host.
// If username is empty but a secret is given, that secret
// is interpreted as a long lived token.
// Deprecated: use Authorizer
Credentials func(string) (string, string, error)
// Host provides the hostname given a namespace.
Host func(string) (string, error)
// Headers are the HTTP request header fields sent by the resolver // Headers are the HTTP request header fields sent by the resolver
Headers http.Header Headers http.Header
// PlainHTTP specifies to use plain http and not https
PlainHTTP bool
// Client is the http client to used when making registry requests
Client *http.Client
// Tracker is used to track uploads to the registry. This is used // Tracker is used to track uploads to the registry. This is used
// since the registry does not have upload tracking and the existing // since the registry does not have upload tracking and the existing
// mechanism for getting blob upload status is expensive. // mechanism for getting blob upload status is expensive.
Tracker StatusTracker Tracker StatusTracker
// Authorizer is used to authorize registry requests
// Deprecated: use Hosts
Authorizer Authorizer
// Credentials provides username and secret given a host.
// If username is empty but a secret is given, that secret
// is interpreted as a long lived token.
// Deprecated: use Hosts
Credentials func(string) (string, string, error)
// Host provides the hostname given a namespace.
// Deprecated: use Hosts
Host func(string) (string, error)
// PlainHTTP specifies to use plain http and not https
// Deprecated: use Hosts
PlainHTTP bool
// Client is the http client to used when making registry requests
// Deprecated: use Hosts
Client *http.Client
} }
// DefaultHost is the default host function. // DefaultHost is the default host function.
@ -108,13 +129,10 @@ func DefaultHost(ns string) (string, error) {
} }
type dockerResolver struct { type dockerResolver struct {
auth Authorizer hosts RegistryHosts
host func(string) (string, error) header http.Header
headers http.Header resolveHeader http.Header
uagent string tracker StatusTracker
plainHTTP bool
client *http.Client
tracker StatusTracker
} }
// NewResolver returns a new resolver to a Docker registry // NewResolver returns a new resolver to a Docker registry
@ -122,39 +140,56 @@ func NewResolver(options ResolverOptions) remotes.Resolver {
if options.Tracker == nil { if options.Tracker == nil {
options.Tracker = NewInMemoryTracker() options.Tracker = NewInMemoryTracker()
} }
if options.Host == nil {
options.Host = DefaultHost
}
if options.Headers == nil { if options.Headers == nil {
options.Headers = make(http.Header) options.Headers = make(http.Header)
} }
if _, ok := options.Headers["User-Agent"]; !ok {
options.Headers.Set("User-Agent", "containerd/"+version.Version)
}
resolveHeader := http.Header{}
if _, ok := options.Headers["Accept"]; !ok { if _, ok := options.Headers["Accept"]; !ok {
// set headers for all the types we support for resolution. // set headers for all the types we support for resolution.
options.Headers.Set("Accept", strings.Join([]string{ resolveHeader.Set("Accept", strings.Join([]string{
images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2Manifest,
images.MediaTypeDockerSchema2ManifestList, images.MediaTypeDockerSchema2ManifestList,
ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageManifest,
ocispec.MediaTypeImageIndex, "*"}, ", ")) ocispec.MediaTypeImageIndex, "*"}, ", "))
}
ua := options.Headers.Get("User-Agent")
if ua != "" {
options.Headers.Del("User-Agent")
} else { } else {
ua = "containerd/" + version.Version resolveHeader["Accept"] = options.Headers["Accept"]
delete(options.Headers, "Accept")
} }
if options.Authorizer == nil { if options.Hosts == nil {
options.Authorizer = NewAuthorizer(options.Client, options.Credentials) opts := []RegistryOpt{}
options.Authorizer.(*dockerAuthorizer).ua = ua if options.Host != nil {
opts = append(opts, WithHostTranslator(options.Host))
}
if options.Authorizer == nil {
options.Authorizer = NewDockerAuthorizer(
WithAuthClient(options.Client),
WithAuthHeader(options.Headers),
WithAuthCreds(options.Credentials))
}
opts = append(opts, WithAuthorizer(options.Authorizer))
if options.Client != nil {
opts = append(opts, WithClient(options.Client))
}
if options.PlainHTTP {
opts = append(opts, WithPlainHTTP(MatchAllHosts))
} else {
opts = append(opts, WithPlainHTTP(MatchLocalhost))
}
options.Hosts = ConfigureDefaultRegistries(opts...)
} }
return &dockerResolver{ return &dockerResolver{
auth: options.Authorizer, hosts: options.Hosts,
host: options.Host, header: options.Headers,
headers: options.Headers, resolveHeader: resolveHeader,
uagent: ua, tracker: options.Tracker,
plainHTTP: options.PlainHTTP,
client: options.Client,
tracker: options.Tracker,
} }
} }
@ -201,13 +236,11 @@ func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocisp
return "", ocispec.Descriptor{}, err return "", ocispec.Descriptor{}, err
} }
fetcher := dockerFetcher{
dockerBase: base,
}
var ( var (
urls []string lastErr error
dgst = refspec.Digest() paths [][]string
dgst = refspec.Digest()
caps = HostCapabilityPull
) )
if dgst != "" { if dgst != "" {
@ -218,100 +251,130 @@ func (r *dockerResolver) Resolve(ctx context.Context, ref string) (string, ocisp
} }
// turns out, we have a valid digest, make a url. // turns out, we have a valid digest, make a url.
urls = append(urls, fetcher.url("manifests", dgst.String())) paths = append(paths, []string{"manifests", dgst.String()})
// fallback to blobs on not found. // fallback to blobs on not found.
urls = append(urls, fetcher.url("blobs", dgst.String())) paths = append(paths, []string{"blobs", dgst.String()})
} else { } else {
urls = append(urls, fetcher.url("manifests", refspec.Object)) // Add
paths = append(paths, []string{"manifests", refspec.Object})
caps |= HostCapabilityResolve
}
hosts := base.filterHosts(caps)
if len(hosts) == 0 {
return "", ocispec.Descriptor{}, errors.Wrap(errdefs.ErrNotFound, "no resolve hosts")
} }
ctx, err = contextWithRepositoryScope(ctx, refspec, false) ctx, err = contextWithRepositoryScope(ctx, refspec, false)
if err != nil { if err != nil {
return "", ocispec.Descriptor{}, err return "", ocispec.Descriptor{}, err
} }
for _, u := range urls {
req, err := http.NewRequest(http.MethodHead, u, nil)
if err != nil {
return "", ocispec.Descriptor{}, err
}
req.Header = r.headers for _, u := range paths {
for _, host := range hosts {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("host", host.Host))
log.G(ctx).Debug("resolving") req := base.request(host, http.MethodHead, u...)
resp, err := fetcher.doRequestWithRetries(ctx, req, nil) for key, value := range r.resolveHeader {
if err != nil { req.header[key] = append(req.header[key], value...)
if errors.Cause(err) == ErrInvalidAuthorization {
err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization")
} }
return "", ocispec.Descriptor{}, err
}
resp.Body.Close() // don't care about body contents.
if resp.StatusCode > 299 { log.G(ctx).Debug("resolving")
if resp.StatusCode == http.StatusNotFound { resp, err := req.doWithRetries(ctx, nil)
if err != nil {
if errors.Cause(err) == ErrInvalidAuthorization {
err = errors.Wrapf(err, "pull access denied, repository does not exist or may require authorization")
}
return "", ocispec.Descriptor{}, err
}
resp.Body.Close() // don't care about body contents.
if resp.StatusCode > 299 {
if resp.StatusCode == http.StatusNotFound {
continue
}
return "", ocispec.Descriptor{}, errors.Errorf("unexpected status code %v: %v", u, resp.Status)
}
size := resp.ContentLength
contentType := getManifestMediaType(resp)
// if no digest was provided, then only a resolve
// trusted registry was contacted, in this case use
// the digest header (or content from GET)
if dgst == "" {
// this is the only point at which we trust the registry. we use the
// content headers to assemble a descriptor for the name. when this becomes
// more robust, we mostly get this information from a secure trust store.
dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest"))
if dgstHeader != "" && size != -1 {
if err := dgstHeader.Validate(); err != nil {
return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader)
}
dgst = dgstHeader
}
}
if dgst == "" || size == -1 {
log.G(ctx).Debug("no Docker-Content-Digest header, fetching manifest instead")
req = base.request(host, http.MethodGet, u...)
for key, value := range r.resolveHeader {
req.header[key] = append(req.header[key], value...)
}
resp, err := req.doWithRetries(ctx, nil)
if err != nil {
return "", ocispec.Descriptor{}, err
}
defer resp.Body.Close()
bodyReader := countingReader{reader: resp.Body}
contentType = getManifestMediaType(resp)
if dgst == "" {
if contentType == images.MediaTypeDockerSchema1Manifest {
b, err := schema1.ReadStripSignature(&bodyReader)
if err != nil {
return "", ocispec.Descriptor{}, err
}
dgst = digest.FromBytes(b)
} else {
dgst, err = digest.FromReader(&bodyReader)
if err != nil {
return "", ocispec.Descriptor{}, err
}
}
} else if _, err := io.Copy(ioutil.Discard, &bodyReader); err != nil {
return "", ocispec.Descriptor{}, err
}
size = bodyReader.bytesRead
}
// Prevent resolving to excessively large manifests
if size > MaxManifestSize {
if lastErr == nil {
lastErr = errors.Wrapf(errdefs.ErrNotFound, "rejecting %d byte manifest for %s", size, ref)
}
continue continue
} }
return "", ocispec.Descriptor{}, errors.Errorf("unexpected status code %v: %v", u, resp.Status)
desc := ocispec.Descriptor{
Digest: dgst,
MediaType: contentType,
Size: size,
}
log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved")
return ref, desc, nil
} }
size := resp.ContentLength
// this is the only point at which we trust the registry. we use the
// content headers to assemble a descriptor for the name. when this becomes
// more robust, we mostly get this information from a secure trust store.
dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest"))
contentType := getManifestMediaType(resp)
if dgstHeader != "" && size != -1 {
if err := dgstHeader.Validate(); err != nil {
return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader)
}
dgst = dgstHeader
} else {
log.G(ctx).Debug("no Docker-Content-Digest header, fetching manifest instead")
req, err := http.NewRequest(http.MethodGet, u, nil)
if err != nil {
return "", ocispec.Descriptor{}, err
}
req.Header = r.headers
resp, err := fetcher.doRequestWithRetries(ctx, req, nil)
if err != nil {
return "", ocispec.Descriptor{}, err
}
defer resp.Body.Close()
bodyReader := countingReader{reader: resp.Body}
contentType = getManifestMediaType(resp)
if contentType == images.MediaTypeDockerSchema1Manifest {
b, err := schema1.ReadStripSignature(&bodyReader)
if err != nil {
return "", ocispec.Descriptor{}, err
}
dgst = digest.FromBytes(b)
} else {
dgst, err = digest.FromReader(&bodyReader)
if err != nil {
return "", ocispec.Descriptor{}, err
}
}
size = bodyReader.bytesRead
}
desc := ocispec.Descriptor{
Digest: dgst,
MediaType: contentType,
Size: size,
}
log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved")
return ref, desc, nil
} }
return "", ocispec.Descriptor{}, errors.Errorf("%v not found", ref) if lastErr == nil {
lastErr = errors.Wrap(errdefs.ErrNotFound, ref)
}
return "", ocispec.Descriptor{}, lastErr
} }
func (r *dockerResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) { func (r *dockerResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) {
@ -356,56 +419,58 @@ func (r *dockerResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher
} }
type dockerBase struct { type dockerBase struct {
refspec reference.Spec refspec reference.Spec
base url.URL namespace string
uagent string hosts []RegistryHost
header http.Header
client *http.Client
auth Authorizer
} }
func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) { func (r *dockerResolver) base(refspec reference.Spec) (*dockerBase, error) {
var (
err error
base url.URL
)
host := refspec.Hostname() host := refspec.Hostname()
base.Host = host hosts, err := r.hosts(host)
if r.host != nil { if err != nil {
base.Host, err = r.host(host) return nil, err
if err != nil {
return nil, err
}
} }
base.Scheme = "https"
if r.plainHTTP || strings.HasPrefix(base.Host, "localhost:") {
base.Scheme = "http"
}
prefix := strings.TrimPrefix(refspec.Locator, host+"/")
base.Path = path.Join("/v2", prefix)
return &dockerBase{ return &dockerBase{
refspec: refspec, refspec: refspec,
base: base, namespace: strings.TrimPrefix(refspec.Locator, host+"/"),
uagent: r.uagent, hosts: hosts,
client: r.client, header: r.header,
auth: r.auth,
}, nil }, nil
} }
func (r *dockerBase) url(ps ...string) string { func (r *dockerBase) filterHosts(caps HostCapabilities) (hosts []RegistryHost) {
url := r.base for _, host := range r.hosts {
url.Path = path.Join(url.Path, path.Join(ps...)) if host.Capabilities.Has(caps) {
return url.String() hosts = append(hosts, host)
}
}
return
} }
func (r *dockerBase) authorize(ctx context.Context, req *http.Request) error { func (r *dockerBase) request(host RegistryHost, method string, ps ...string) *request {
header := http.Header{}
for key, value := range r.header {
header[key] = append(header[key], value...)
}
parts := append([]string{"/", host.Path, r.namespace}, ps...)
p := path.Join(parts...)
// Join strips trailing slash, re-add ending "/" if included
if len(parts) > 0 && strings.HasSuffix(parts[len(parts)-1], "/") {
p = p + "/"
}
return &request{
method: method,
path: p,
header: header,
host: host,
}
}
func (r *request) authorize(ctx context.Context, req *http.Request) error {
// Check if has header for host // Check if has header for host
if r.auth != nil { if r.host.Authorizer != nil {
if err := r.auth.Authorize(ctx, req); err != nil { if err := r.host.Authorizer.Authorize(ctx, req); err != nil {
return err return err
} }
} }
@ -413,83 +478,132 @@ func (r *dockerBase) authorize(ctx context.Context, req *http.Request) error {
return nil return nil
} }
func (r *dockerBase) doRequest(ctx context.Context, req *http.Request) (*http.Response, error) { type request struct {
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String())) method string
log.G(ctx).WithField("request.headers", req.Header).WithField("request.method", req.Method).Debug("do request") path string
req.Header.Set("User-Agent", r.uagent) header http.Header
host RegistryHost
body func() (io.ReadCloser, error)
size int64
}
func (r *request) do(ctx context.Context) (*http.Response, error) {
u := r.host.Scheme + "://" + r.host.Host + r.path
req, err := http.NewRequest(r.method, u, nil)
if err != nil {
return nil, err
}
req.Header = r.header
if r.body != nil {
req.GetBody = r.body
if r.size > 0 {
req.ContentLength = r.size
}
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", u))
log.G(ctx).WithFields(requestFields(req)).Debug("do request")
if err := r.authorize(ctx, req); err != nil { if err := r.authorize(ctx, req); err != nil {
return nil, errors.Wrap(err, "failed to authorize") return nil, errors.Wrap(err, "failed to authorize")
} }
resp, err := ctxhttp.Do(ctx, r.client, req) resp, err := ctxhttp.Do(ctx, r.host.Client, req)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to do request") return nil, errors.Wrap(err, "failed to do request")
} }
log.G(ctx).WithFields(logrus.Fields{ log.G(ctx).WithFields(responseFields(resp)).Debug("fetch response received")
"status": resp.Status,
"response.headers": resp.Header,
}).Debug("fetch response received")
return resp, nil return resp, nil
} }
func (r *dockerBase) doRequestWithRetries(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Response, error) { func (r *request) doWithRetries(ctx context.Context, responses []*http.Response) (*http.Response, error) {
resp, err := r.doRequest(ctx, req) resp, err := r.do(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
responses = append(responses, resp) responses = append(responses, resp)
req, err = r.retryRequest(ctx, req, responses) retry, err := r.retryRequest(ctx, responses)
if err != nil { if err != nil {
resp.Body.Close() resp.Body.Close()
return nil, err return nil, err
} }
if req != nil { if retry {
resp.Body.Close() resp.Body.Close()
return r.doRequestWithRetries(ctx, req, responses) return r.doWithRetries(ctx, responses)
} }
return resp, err return resp, err
} }
func (r *dockerBase) retryRequest(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Request, error) { func (r *request) retryRequest(ctx context.Context, responses []*http.Response) (bool, error) {
if len(responses) > 5 { if len(responses) > 5 {
return nil, nil return false, nil
} }
last := responses[len(responses)-1] last := responses[len(responses)-1]
switch last.StatusCode { switch last.StatusCode {
case http.StatusUnauthorized: case http.StatusUnauthorized:
log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized")
if r.auth != nil { if r.host.Authorizer != nil {
if err := r.auth.AddResponses(ctx, responses); err == nil { if err := r.host.Authorizer.AddResponses(ctx, responses); err == nil {
return copyRequest(req) return true, nil
} else if !errdefs.IsNotImplemented(err) { } else if !errdefs.IsNotImplemented(err) {
return nil, err return false, err
} }
} }
return nil, nil
return false, nil
case http.StatusMethodNotAllowed: case http.StatusMethodNotAllowed:
// Support registries which have not properly implemented the HEAD method for // Support registries which have not properly implemented the HEAD method for
// manifests endpoint // manifests endpoint
if req.Method == http.MethodHead && strings.Contains(req.URL.Path, "/manifests/") { if r.method == http.MethodHead && strings.Contains(r.path, "/manifests/") {
// TODO: copy request? r.method = http.MethodGet
req.Method = http.MethodGet return true, nil
return copyRequest(req)
} }
case http.StatusRequestTimeout, http.StatusTooManyRequests: case http.StatusRequestTimeout, http.StatusTooManyRequests:
return copyRequest(req) return true, nil
} }
// TODO: Handle 50x errors accounting for attempt history // TODO: Handle 50x errors accounting for attempt history
return nil, nil return false, nil
} }
func copyRequest(req *http.Request) (*http.Request, error) { func (r *request) String() string {
ireq := *req return r.host.Scheme + "://" + r.host.Host + r.path
if ireq.GetBody != nil { }
var err error
ireq.Body, err = ireq.GetBody() func requestFields(req *http.Request) logrus.Fields {
if err != nil { fields := map[string]interface{}{
return nil, err "request.method": req.Method,
}
for k, vals := range req.Header {
k = strings.ToLower(k)
if k == "authorization" {
continue
}
for i, v := range vals {
field := "request.header." + k
if i > 0 {
field = fmt.Sprintf("%s.%d", field, i)
}
fields[field] = v
} }
} }
return &ireq, nil
return logrus.Fields(fields)
}
func responseFields(resp *http.Response) logrus.Fields {
fields := map[string]interface{}{
"response.status": resp.Status,
}
for k, vals := range resp.Header {
k = strings.ToLower(k)
for i, v := range vals {
field := "response.header." + k
if i > 0 {
field = fmt.Sprintf("%s.%d", field, i)
}
fields[field] = v
}
}
return logrus.Fields(fields)
} }

View File

@ -41,9 +41,7 @@ func TestHTTPResolver(t *testing.T) {
s := func(h http.Handler) (string, ResolverOptions, func()) { s := func(h http.Handler) (string, ResolverOptions, func()) {
s := httptest.NewServer(h) s := httptest.NewServer(h)
options := ResolverOptions{ options := ResolverOptions{}
PlainHTTP: true,
}
base := s.URL[7:] // strip "http://" base := s.URL[7:] // strip "http://"
return base, options, s.Close return base, options, s.Close
} }
@ -69,9 +67,12 @@ func TestBasicResolver(t *testing.T) {
}) })
base, options, close := tlsServer(wrapped) base, options, close := tlsServer(wrapped)
options.Authorizer = NewAuthorizer(options.Client, func(string) (string, string, error) { options.Hosts = ConfigureDefaultRegistries(
return "user1", "password1", nil WithClient(options.Client),
}) WithAuthorizer(NewAuthorizer(options.Client, func(string) (string, string, error) {
return "user1", "password1", nil
})),
)
return base, options, close return base, options, close
} }
runBasicTest(t, "testname", basicAuth) runBasicTest(t, "testname", basicAuth)
@ -215,7 +216,13 @@ func withTokenServer(th http.Handler, creds func(string) (string, string, error)
}) })
base, options, close := tlsServer(wrapped) base, options, close := tlsServer(wrapped)
options.Authorizer = NewAuthorizer(options.Client, creds) options.Hosts = ConfigureDefaultRegistries(
WithClient(options.Client),
WithAuthorizer(NewDockerAuthorizer(
WithAuthClient(options.Client),
WithAuthCreds(creds),
)),
)
options.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs.AddCert(cert) options.Client.Transport.(*http.Transport).TLSClientConfig.RootCAs.AddCert(cert)
return base, options, func() { return base, options, func() {
s.Close() s.Close()
@ -232,15 +239,18 @@ func tlsServer(h http.Handler) (string, ResolverOptions, func()) {
cert, _ := x509.ParseCertificate(s.TLS.Certificates[0].Certificate[0]) cert, _ := x509.ParseCertificate(s.TLS.Certificates[0].Certificate[0])
capool.AddCert(cert) capool.AddCert(cert)
options := ResolverOptions{ client := &http.Client{
Client: &http.Client{ Transport: &http.Transport{
Transport: &http.Transport{ TLSClientConfig: &tls.Config{
TLSClientConfig: &tls.Config{ RootCAs: capool,
RootCAs: capool,
},
}, },
}, },
} }
options := ResolverOptions{
Hosts: ConfigureDefaultRegistries(WithClient(client)),
// Set deprecated field for tests to use for configuration
Client: client,
}
base := s.URL[8:] // strip "https://" base := s.URL[8:] // strip "https://"
return base, options, s.Close return base, options, s.Close
} }