remotes: support cross-repo-push
With distribution source label in content store, select the longest common prefix components as condidate mount blob source and try to push with mount blob. Fix #2964 Signed-off-by: Wei Fu <fuweid89@gmail.com>
This commit is contained in:
@@ -44,7 +44,8 @@ type dockerAuthorizer struct {
|
||||
ua string
|
||||
mu sync.Mutex
|
||||
|
||||
auth map[string]string
|
||||
// indexed by host name
|
||||
handlers map[string]*authHandler
|
||||
}
|
||||
|
||||
// NewAuthorizer creates a Docker authorizer using the provided function to
|
||||
@@ -53,116 +54,226 @@ func NewAuthorizer(client *http.Client, f func(string) (string, string, error))
|
||||
if client == nil {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
|
||||
return &dockerAuthorizer{
|
||||
credentials: f,
|
||||
client: client,
|
||||
ua: "containerd/" + version.Version,
|
||||
auth: map[string]string{},
|
||||
handlers: make(map[string]*authHandler),
|
||||
}
|
||||
}
|
||||
|
||||
// Authorize handles auth request.
|
||||
func (a *dockerAuthorizer) Authorize(ctx context.Context, req *http.Request) error {
|
||||
// TODO: Lookup matching challenge and scope rather than just host
|
||||
if auth := a.getAuth(req.URL.Host); auth != "" {
|
||||
req.Header.Set("Authorization", auth)
|
||||
// skip if there is no auth handler
|
||||
ah := a.getAuthHandler(req.URL.Host)
|
||||
if ah == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
auth, err := ah.authorize(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req.Header.Set("Authorization", auth)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) getAuthHandler(host string) *authHandler {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
return a.handlers[host]
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) AddResponses(ctx context.Context, responses []*http.Response) error {
|
||||
last := responses[len(responses)-1]
|
||||
host := last.Request.URL.Host
|
||||
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
for _, c := range parseAuthHeader(last.Header) {
|
||||
if c.scheme == bearerAuth {
|
||||
if err := invalidAuthorization(c, responses); err != nil {
|
||||
// TODO: Clear token
|
||||
a.setAuth(host, "")
|
||||
delete(a.handlers, host)
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO(dmcg): Store challenge, not token
|
||||
// Move token fetching to authorize
|
||||
return a.setTokenAuth(ctx, host, c.parameters)
|
||||
// reuse existing handler
|
||||
//
|
||||
// assume that one registry will return the common
|
||||
// challenge information, including realm and service.
|
||||
// and the resource scope is only different part
|
||||
// which can be provided by each request.
|
||||
if _, ok := a.handlers[host]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
common, err := a.generateTokenOptions(ctx, host, c)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common)
|
||||
return nil
|
||||
} else if c.scheme == basicAuth && a.credentials != nil {
|
||||
// TODO: Resolve credentials on authorize
|
||||
username, secret, err := a.credentials(host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if username != "" && secret != "" {
|
||||
auth := username + ":" + secret
|
||||
a.setAuth(host, fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(auth))))
|
||||
common := tokenOptions{
|
||||
username: username,
|
||||
secret: secret,
|
||||
}
|
||||
|
||||
a.handlers[host] = newAuthHandler(a.client, a.ua, c.scheme, common)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) getAuth(host string) string {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
return a.auth[host]
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) setAuth(host string, auth string) bool {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
|
||||
changed := a.auth[host] != auth
|
||||
a.auth[host] = auth
|
||||
|
||||
return changed
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) setTokenAuth(ctx context.Context, host string, params map[string]string) error {
|
||||
realm, ok := params["realm"]
|
||||
func (a *dockerAuthorizer) generateTokenOptions(ctx context.Context, host string, c challenge) (tokenOptions, error) {
|
||||
realm, ok := c.parameters["realm"]
|
||||
if !ok {
|
||||
return errors.New("no realm specified for token auth challenge")
|
||||
return tokenOptions{}, errors.New("no realm specified for token auth challenge")
|
||||
}
|
||||
|
||||
realmURL, err := url.Parse(realm)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid token auth challenge realm")
|
||||
return tokenOptions{}, errors.Wrap(err, "invalid token auth challenge realm")
|
||||
}
|
||||
|
||||
to := tokenOptions{
|
||||
realm: realmURL.String(),
|
||||
service: params["service"],
|
||||
service: c.parameters["service"],
|
||||
}
|
||||
|
||||
to.scopes = getTokenScopes(ctx, params)
|
||||
if len(to.scopes) == 0 {
|
||||
return errors.Errorf("no scope specified for token auth challenge")
|
||||
scope, ok := c.parameters["scope"]
|
||||
if !ok {
|
||||
return tokenOptions{}, errors.Errorf("no scope specified for token auth challenge")
|
||||
}
|
||||
to.scopes = append(to.scopes, scope)
|
||||
|
||||
if a.credentials != nil {
|
||||
to.username, to.secret, err = a.credentials(host)
|
||||
if err != nil {
|
||||
return err
|
||||
return tokenOptions{}, err
|
||||
}
|
||||
}
|
||||
return to, nil
|
||||
}
|
||||
|
||||
var token string
|
||||
// authResult is used to control limit rate.
|
||||
type authResult struct {
|
||||
sync.WaitGroup
|
||||
token string
|
||||
err error
|
||||
}
|
||||
|
||||
// authHandler is used to handle auth request per registry server.
|
||||
type authHandler struct {
|
||||
sync.Mutex
|
||||
|
||||
ua string
|
||||
|
||||
client *http.Client
|
||||
|
||||
// only support basic and bearer schemes
|
||||
scheme authenticationScheme
|
||||
|
||||
// common contains common challenge answer
|
||||
common tokenOptions
|
||||
|
||||
// scopedTokens caches token indexed by scopes, which used in
|
||||
// bearer auth case
|
||||
scopedTokens map[string]*authResult
|
||||
}
|
||||
|
||||
func newAuthHandler(client *http.Client, ua string, scheme authenticationScheme, opts tokenOptions) *authHandler {
|
||||
if client == nil {
|
||||
client = http.DefaultClient
|
||||
}
|
||||
|
||||
return &authHandler{
|
||||
ua: ua,
|
||||
client: client,
|
||||
scheme: scheme,
|
||||
common: opts,
|
||||
scopedTokens: map[string]*authResult{},
|
||||
}
|
||||
}
|
||||
|
||||
func (ah *authHandler) authorize(ctx context.Context) (string, error) {
|
||||
switch ah.scheme {
|
||||
case basicAuth:
|
||||
return ah.doBasicAuth(ctx)
|
||||
case bearerAuth:
|
||||
return ah.doBearerAuth(ctx)
|
||||
default:
|
||||
return "", errors.Wrap(errdefs.ErrNotImplemented, "failed to find supported auth scheme")
|
||||
}
|
||||
}
|
||||
|
||||
func (ah *authHandler) doBasicAuth(ctx context.Context) (string, error) {
|
||||
username, secret := ah.common.username, ah.common.secret
|
||||
|
||||
if username == "" || secret == "" {
|
||||
return "", fmt.Errorf("failed to handle basic auth because missing username or secret")
|
||||
}
|
||||
|
||||
auth := base64.StdEncoding.EncodeToString([]byte(username + ":" + secret))
|
||||
return fmt.Sprintf("%s %s", "Basic", auth), nil
|
||||
}
|
||||
|
||||
func (ah *authHandler) doBearerAuth(ctx context.Context) (string, error) {
|
||||
// copy common tokenOptions
|
||||
to := ah.common
|
||||
|
||||
to.scopes = getTokenScopes(ctx, to.scopes)
|
||||
if len(to.scopes) == 0 {
|
||||
return "", errors.Errorf("no scope specified for token auth challenge")
|
||||
}
|
||||
|
||||
// Docs: https://docs.docker.com/registry/spec/auth/scope
|
||||
scoped := strings.Join(to.scopes, " ")
|
||||
|
||||
ah.Lock()
|
||||
if r, exist := ah.scopedTokens[scoped]; exist {
|
||||
ah.Unlock()
|
||||
r.Wait()
|
||||
return r.token, r.err
|
||||
}
|
||||
|
||||
// only one fetch token job
|
||||
r := new(authResult)
|
||||
r.Add(1)
|
||||
ah.scopedTokens[scoped] = r
|
||||
ah.Unlock()
|
||||
|
||||
// fetch token for the resource scope
|
||||
var (
|
||||
token string
|
||||
err error
|
||||
)
|
||||
if to.secret != "" {
|
||||
// Credential information is provided, use oauth POST endpoint
|
||||
token, err = a.fetchTokenWithOAuth(ctx, to)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to fetch oauth token")
|
||||
}
|
||||
// credential information is provided, use oauth POST endpoint
|
||||
token, err = ah.fetchTokenWithOAuth(ctx, to)
|
||||
err = errors.Wrap(err, "failed to fetch oauth token")
|
||||
} else {
|
||||
// Do request anonymously
|
||||
token, err = a.fetchToken(ctx, to)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to fetch anonymous token")
|
||||
}
|
||||
// do request anonymously
|
||||
token, err = ah.fetchToken(ctx, to)
|
||||
err = errors.Wrap(err, "failed to fetch anonymous token")
|
||||
}
|
||||
a.setAuth(host, fmt.Sprintf("Bearer %s", token))
|
||||
token = fmt.Sprintf("%s %s", "Bearer", token)
|
||||
|
||||
return nil
|
||||
r.token, r.err = token, err
|
||||
r.Done()
|
||||
return r.token, r.err
|
||||
}
|
||||
|
||||
type tokenOptions struct {
|
||||
@@ -181,7 +292,7 @@ type postTokenResponse struct {
|
||||
Scope string `json:"scope"`
|
||||
}
|
||||
|
||||
func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
|
||||
func (ah *authHandler) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) {
|
||||
form := url.Values{}
|
||||
form.Set("scope", strings.Join(to.scopes, " "))
|
||||
form.Set("service", to.service)
|
||||
@@ -202,11 +313,11 @@ func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOpti
|
||||
return "", err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8")
|
||||
if a.ua != "" {
|
||||
req.Header.Set("User-Agent", a.ua)
|
||||
if ah.ua != "" {
|
||||
req.Header.Set("User-Agent", ah.ua)
|
||||
}
|
||||
|
||||
resp, err := ctxhttp.Do(ctx, a.client, req)
|
||||
resp, err := ctxhttp.Do(ctx, ah.client, req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@@ -216,7 +327,7 @@ func (a *dockerAuthorizer) fetchTokenWithOAuth(ctx context.Context, to tokenOpti
|
||||
// As of September 2017, GCR is known to return 404.
|
||||
// As of February 2018, JFrog Artifactory is known to return 401.
|
||||
if (resp.StatusCode == 405 && to.username != "") || resp.StatusCode == 404 || resp.StatusCode == 401 {
|
||||
return a.fetchToken(ctx, to)
|
||||
return ah.fetchToken(ctx, to)
|
||||
} else if resp.StatusCode < 200 || resp.StatusCode >= 400 {
|
||||
b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB
|
||||
log.G(ctx).WithFields(logrus.Fields{
|
||||
@@ -245,15 +356,15 @@ type getTokenResponse struct {
|
||||
RefreshToken string `json:"refresh_token"`
|
||||
}
|
||||
|
||||
// getToken fetches a token using a GET request
|
||||
func (a *dockerAuthorizer) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
|
||||
// fetchToken fetches a token using a GET request
|
||||
func (ah *authHandler) fetchToken(ctx context.Context, to tokenOptions) (string, error) {
|
||||
req, err := http.NewRequest("GET", to.realm, nil)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if a.ua != "" {
|
||||
req.Header.Set("User-Agent", a.ua)
|
||||
if ah.ua != "" {
|
||||
req.Header.Set("User-Agent", ah.ua)
|
||||
}
|
||||
|
||||
reqParams := req.URL.Query()
|
||||
@@ -272,7 +383,7 @@ func (a *dockerAuthorizer) fetchToken(ctx context.Context, to tokenOptions) (str
|
||||
|
||||
req.URL.RawQuery = reqParams.Encode()
|
||||
|
||||
resp, err := ctxhttp.Do(ctx, a.client, req)
|
||||
resp, err := ctxhttp.Do(ctx, ah.client, req)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
Reference in New Issue
Block a user