From f3ccd8589192a7cf66a9abae57c9034325d18e69 Mon Sep 17 00:00:00 2001 From: abhi Date: Fri, 1 Dec 2017 15:56:53 -0800 Subject: [PATCH] Adding Registry Mirror support This commit aims to add registy mirror support similar to docker. The UI is similar to docker where user can provide mirror urls and the image resolves against the provided mirrors before fetching from default docker regitry mirror url. Signed-off-by: abhi --- cmd/cri-containerd/options/options.go | 11 + cmd/cri-containerd/options/registry.go | 88 ++++ pkg/containerd/resolver/auth.go | 202 ++++++++ pkg/containerd/resolver/fetcher.go | 142 +++++ pkg/containerd/resolver/httpreadseeker.go | 148 ++++++ pkg/containerd/resolver/resolver.go | 606 ++++++++++++++++++++++ pkg/containerd/resolver/scope.go | 80 +++ pkg/containerd/resolver/scope_test.go | 58 +++ pkg/containerd/resolver/status.go | 71 +++ pkg/server/helpers.go | 1 - pkg/server/image_pull.go | 15 +- 11 files changed, 1417 insertions(+), 5 deletions(-) create mode 100644 cmd/cri-containerd/options/registry.go create mode 100644 pkg/containerd/resolver/auth.go create mode 100644 pkg/containerd/resolver/fetcher.go create mode 100644 pkg/containerd/resolver/httpreadseeker.go create mode 100644 pkg/containerd/resolver/resolver.go create mode 100644 pkg/containerd/resolver/scope.go create mode 100644 pkg/containerd/resolver/scope_test.go create mode 100644 pkg/containerd/resolver/status.go diff --git a/cmd/cri-containerd/options/options.go b/cmd/cri-containerd/options/options.go index 5599c007b..7c72268f9 100644 --- a/cmd/cri-containerd/options/options.go +++ b/cmd/cri-containerd/options/options.go @@ -72,6 +72,8 @@ type PluginConfig struct { ContainerdConfig `toml:"containerd" json:"containerd,omitempty"` // CniConfig contains config related to cni CniConfig `toml:"cni" json:"cni,omitempty"` + // Registry contains config related to the registry + Registry `toml:"registry" json:"registry,omitempty"` // StreamServerAddress is the ip address streaming server is listening on. StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress,omitempty"` // StreamServerPort is the port streaming server is listening on. @@ -191,6 +193,8 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) { defaults.SkipImageFSUUID, "Skip retrieval of imagefs uuid. When turned on, kubelet will not be able to get imagefs capacity or perform imagefs disk eviction.") fs.BoolVar(&c.EnableIPv6DAD, "enable-ipv6-dad", defaults.EnableIPv6DAD, "Enable IPv6 DAD (duplicate address detection) for pod sandbox network. Enabling this will increase pod sandbox start latency by several seconds.") + fs.Var(&c.Registry, "registry", + "Registry config for image pull eg --registry=myregistry.io=https://mymirror.io/ --registry=myregistry2.io=https://mymirror2.io/") } // InitFlags load configurations from config file, and then overwrite with flags. @@ -259,6 +263,13 @@ func DefaultConfig() Config { SystemdCgroup: false, SkipImageFSUUID: false, EnableIPv6DAD: false, + Registry: Registry{ + Mirrors: map[string]Mirror{ + "docker.io": { + Endpoints: []string{"https://registry-1.docker.io"}, + }, + }, + }, }, ContainerdRootDir: "/var/lib/containerd", ContainerdEndpoint: "/run/containerd/containerd.sock", diff --git a/cmd/cri-containerd/options/registry.go b/cmd/cri-containerd/options/registry.go new file mode 100644 index 000000000..7f057ee4c --- /dev/null +++ b/cmd/cri-containerd/options/registry.go @@ -0,0 +1,88 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "fmt" + "net/url" + "strings" +) + +// Mirror contains the config related to the registry mirror +type Mirror struct { + Endpoints []string `toml:"endpoint" json:"endpoint,omitempty"` + // TODO (Abhi) We might need to add auth per namespace. Looks like + // image auth information is passed by kube itself. +} + +// Registry is registry settings configured +type Registry struct { + Mirrors map[string]Mirror `toml:"mirrors" json:"mirrors,omitempty"` +} + +// String returns the string format of registry type +func (r *Registry) String() string { + // Its not used hence return empty string + return "" +} + +// Set validates and converts into the internal registry struct +func (r *Registry) Set(s string) error { + // --registry docker.io=https://mymirror.io,http://mymirror2.io + // If no option is set then return format error + if len(s) == 0 { + return fmt.Errorf("incomplete registry mirror option") + } + var mirrors []string + host := "docker.io" + opt := strings.Split(s, "=") + if len(opt) > 1 { + // If option is set in the format "mynamespace.io=https://mymirror.io,https://mymirror2.io" + // Then associate the mirror urls for the namespace only" + host = opt[0] + mirrors = strings.Split(opt[1], ",") + } else { + // If option is set in the format "https://mymirror.io,https://mymirror.io" + // Then associate mirror against default docker.io namespace + mirrors = strings.Split(opt[0], ",") + } + + // Validate the format of the urls passed + for _, u := range mirrors { + _, err := url.Parse(u) + if err != nil { + return fmt.Errorf("invalid registry mirror url format %v: %v", u, err) + } + } + + if r.Mirrors == nil { + r.Mirrors = make(map[string]Mirror) + } + if _, ok := r.Mirrors[host]; !ok { + r.Mirrors[host] = Mirror{} + } + m := r.Mirrors[host] + m.Endpoints = append(m.Endpoints, mirrors...) + r.Mirrors[host] = m + + return nil +} + +// Type returns a string name for the option type +func (r *Registry) Type() string { + return "list" +} diff --git a/pkg/containerd/resolver/auth.go b/pkg/containerd/resolver/auth.go new file mode 100644 index 000000000..15d406a93 --- /dev/null +++ b/pkg/containerd/resolver/auth.go @@ -0,0 +1,202 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/ +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "net/http" + "sort" + "strings" +) + +type authenticationScheme byte + +const ( + basicAuth authenticationScheme = 1 << iota // Defined in RFC 7617 + digestAuth // Defined in RFC 7616 + bearerAuth // Defined in RFC 6750 +) + +// challenge carries information from a WWW-Authenticate response header. +// See RFC 2617. +type challenge struct { + // scheme is the auth-scheme according to RFC 2617 + scheme authenticationScheme + + // parameters are the auth-params according to RFC 2617 + parameters map[string]string +} + +type byScheme []challenge + +func (bs byScheme) Len() int { return len(bs) } +func (bs byScheme) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] } + +// Sort in priority order: token > digest > basic +func (bs byScheme) Less(i, j int) bool { return bs[i].scheme > bs[j].scheme } + +// Octet types from RFC 2616. +type octetType byte + +var octetTypes [256]octetType + +const ( + isToken octetType = 1 << iota + isSpace +) + +func init() { + // OCTET = + // CHAR = + // CTL = + // CR = + // LF = + // SP = + // HT = + // <"> = + // CRLF = CR LF + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <"> + // | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT + // token = 1* + // qdtext = > + + for c := 0; c < 256; c++ { + var t octetType + isCtl := c <= 31 || c == 127 + isChar := 0 <= c && c <= 127 + isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) + if strings.ContainsRune(" \t\r\n", rune(c)) { + t |= isSpace + } + if isChar && !isCtl && !isSeparator { + t |= isToken + } + octetTypes[c] = t + } +} + +func parseAuthHeader(header http.Header) []challenge { + challenges := []challenge{} + for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] { + v, p := parseValueAndParams(h) + var s authenticationScheme + switch v { + case "basic": + s = basicAuth + case "digest": + s = digestAuth + case "bearer": + s = bearerAuth + default: + continue + } + challenges = append(challenges, challenge{scheme: s, parameters: p}) + } + sort.Stable(byScheme(challenges)) + return challenges +} + +func parseValueAndParams(header string) (value string, params map[string]string) { + params = make(map[string]string) + value, s := expectToken(header) + if value == "" { + return + } + value = strings.ToLower(value) + for { + var pkey string + pkey, s = expectToken(skipSpace(s)) + if pkey == "" { + return + } + if !strings.HasPrefix(s, "=") { + return + } + var pvalue string + pvalue, s = expectTokenOrQuoted(s[1:]) + if pvalue == "" { + return + } + pkey = strings.ToLower(pkey) + params[pkey] = pvalue + s = skipSpace(s) + if !strings.HasPrefix(s, ",") { + return + } + s = s[1:] + } +} + +func skipSpace(s string) (rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isSpace == 0 { + break + } + } + return s[i:] +} + +func expectToken(s string) (token, rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isToken == 0 { + break + } + } + return s[:i], s[i:] +} + +func expectTokenOrQuoted(s string) (value string, rest string) { + if !strings.HasPrefix(s, "\"") { + return expectToken(s) + } + s = s[1:] + for i := 0; i < len(s); i++ { + switch s[i] { + case '"': + return s[:i], s[i+1:] + case '\\': + p := make([]byte, len(s)-1) + j := copy(p, s[:i]) + escape := true + for i = i + 1; i < len(s); i++ { + b := s[i] + switch { + case escape: + escape = false + p[j] = b + j++ + case b == '\\': + escape = true + case b == '"': + return string(p[:j]), s[i+1:] + default: + p[j] = b + j++ + } + } + return "", "" + } + } + return "", "" +} diff --git a/pkg/containerd/resolver/fetcher.go b/pkg/containerd/resolver/fetcher.go new file mode 100644 index 000000000..45cc605a7 --- /dev/null +++ b/pkg/containerd/resolver/fetcher.go @@ -0,0 +1,142 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/ +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "context" + "fmt" + "io" + "net/http" + "path" + "strings" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type dockerFetcher struct { + *dockerBase +} + +func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields( + logrus.Fields{ + //"base": r.base.String(), + "digest": desc.Digest, + }, + )) + + urls, err := r.getV2URLPaths(ctx, desc) + if err != nil { + return nil, err + } + + ctx, err = contextWithRepositoryScope(ctx, r.refspec, false) + if err != nil { + return nil, err + } + + return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { + for _, u := range urls { + rc, err := r.open(ctx, u, desc.MediaType, offset) + if err != nil { + if errdefs.IsNotFound(err) { + continue // try one of the other urls. + } + + return nil, err + } + + return rc, nil + } + + return nil, errors.Wrapf(errdefs.ErrNotFound, + "could not fetch content descriptor %v (%v) from remote", + desc.Digest, desc.MediaType) + + }) +} + +func (r dockerFetcher) open(ctx context.Context, u, mediatype string, offset int64) (io.ReadCloser, error) { + req, err := http.NewRequest(http.MethodGet, u, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Accept", strings.Join([]string{mediatype, `*`}, ", ")) + + if offset > 0 { + // TODO(stevvooe): Only set this header in response to the + // "Accept-Ranges: bytes" header. + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := r.doRequestWithRetries(ctx, req, nil) + if err != nil { + return nil, err + } + + if resp.StatusCode > 299 { + // TODO(stevvooe): When doing a offset specific request, we should + // really distinguish between a 206 and a 200. In the case of 200, we + // can discard the bytes, hiding the seek behavior from the + // implementation. + + resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", u) + } + return nil, errors.Errorf("unexpected status code %v: %v", u, resp.Status) + } + + return resp.Body, nil +} + +// getV2URLPaths generates the candidate urls paths for the object based on the +// set of hints and the provided object id. URLs are returned in the order of +// most to least likely succeed. +func (r *dockerFetcher) getV2URLPaths(ctx context.Context, desc ocispec.Descriptor) ([]string, error) { + var urls []string + + if len(desc.URLs) > 0 { + // handle fetch via external urls. + for _, u := range desc.URLs { + log.G(ctx).WithField("url", u).Debug("adding alternative url") + urls = append(urls, u) + } + } + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + images.MediaTypeDockerSchema1Manifest, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + urls = append(urls, r.url(path.Join("manifests", desc.Digest.String()))) + } + + // always fallback to attempting to get the object out of the blobs store. + urls = append(urls, r.url(path.Join("blobs", desc.Digest.String()))) + + return urls, nil +} diff --git a/pkg/containerd/resolver/httpreadseeker.go b/pkg/containerd/resolver/httpreadseeker.go new file mode 100644 index 000000000..16af3d7e4 --- /dev/null +++ b/pkg/containerd/resolver/httpreadseeker.go @@ -0,0 +1,148 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "bytes" + "io" + "io/ioutil" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" +) + +type httpReadSeeker struct { + size int64 + offset int64 + rc io.ReadCloser + open func(offset int64) (io.ReadCloser, error) + closed bool +} + +func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) { + return &httpReadSeeker{ + size: size, + open: open, + }, nil +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.closed { + return 0, io.EOF + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + return +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.closed { + return nil + } + hrs.closed = true + if hrs.rc != nil { + return hrs.rc.Close() + } + + return nil +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.closed { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: closed") + } + + abs := hrs.offset + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + abs += offset + case io.SeekEnd: + if hrs.size == -1 { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: unknown size, cannot seek from end") + } + abs = hrs.size + offset + default: + return 0, errors.Wrap(errdefs.ErrInvalidArgument, "Fetcher.Seek: invalid whence") + } + + if abs < 0 { + return 0, errors.Wrapf(errdefs.ErrInvalidArgument, "Fetcher.Seek: negative offset") + } + + if abs != hrs.offset { + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("Fetcher.Seek: failed to close ReadCloser") + } + + hrs.rc = nil + } + + hrs.offset = abs + } + + return hrs.offset, nil +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.rc != nil { + return hrs.rc, nil + } + + if hrs.size == -1 || hrs.offset < hrs.size { + // only try to reopen the body request if we are seeking to a value + // less than the actual size. + if hrs.open == nil { + return nil, errors.Wrapf(errdefs.ErrNotImplemented, "cannot open") + } + + rc, err := hrs.open(hrs.offset) + if err != nil { + return nil, errors.Wrapf(err, "httpReaderSeeker: failed open") + } + + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("httpReadSeeker: failed to close ReadCloser") + } + } + hrs.rc = rc + } else { + // There is an edge case here where offset == size of the content. If + // we seek, we will probably get an error for content that cannot be + // sought (?). In that case, we should err on committing the content, + // as the length is already satisified but we just return the empty + // reader instead. + + hrs.rc = ioutil.NopCloser(bytes.NewReader([]byte{})) + } + + return hrs.rc, nil +} diff --git a/pkg/containerd/resolver/resolver.go b/pkg/containerd/resolver/resolver.go new file mode 100644 index 000000000..4d9195ac7 --- /dev/null +++ b/pkg/containerd/resolver/resolver.go @@ -0,0 +1,606 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/net/context/ctxhttp" +) + +// This file is a modified copy of containerd/remote/docker/resolver.go. +// The changes carried over by this file includes support for image ref +// resolution and fetching image using multiple registry mirror urls. A new +// ResolverOption called Registry is added. Registry will contain a map +// of namespace relevant mirror urls. The client will use the ResolverOptions +// to set the urls associated with the namespace of the image reference. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. The specific changes are made to the base +// function to calculate the base urls for the image location. urls() is +// added to fetch the mirror urls associated with the namespace of the image +// ResolverOptions are changed for client to set the namespace and mirror urls +// for to pull the image. + +var ( + // ErrNoToken is returned if a request is successful but the body does not + // contain an authorization token. + ErrNoToken = errors.New("authorization server did not include a token in the response") + + // ErrInvalidAuthorization is used when credentials are passed to a server but + // those credentials are rejected. + ErrInvalidAuthorization = errors.New("authorization failed") +) + +type containerdResolver struct { + credentials func(string) (string, string, error) + plainHTTP bool + client *http.Client + tracker StatusTracker + registry map[string][]string +} + +// Options are used to configured a new Docker register resolver +type Options struct { + // Credentials provides username and secret given a host. + // If username is empty but a secret is given, that secret + // is interpretted as a long lived token. + Credentials func(string) (string, string, error) + + // PlainHTTP specifies to use plain http and not https + PlainHTTP bool + + // Client is the http client to used when making registry requests + Client *http.Client + + // Tracker is used to track uploads to the registry. This is used + // since the registry does not have upload tracking and the existing + // mechanism for getting blob upload status is expensive. + Tracker StatusTracker + + Registry map[string][]string +} + +// NewResolver returns a new resolver to a Docker registry +func NewResolver(options Options) remotes.Resolver { + tracker := options.Tracker + if tracker == nil { + tracker = NewInMemoryTracker() + } + + return &containerdResolver{ + credentials: options.Credentials, + plainHTTP: options.PlainHTTP, + client: options.Client, + tracker: tracker, + registry: options.Registry, + } +} + +var _ remotes.Resolver = &containerdResolver{} + +func (r *containerdResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + if refspec.Object == "" { + return "", ocispec.Descriptor{}, reference.ErrObjectRequired + } + + base, err := r.base(refspec) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + fetcher := dockerFetcher{ + dockerBase: base, + } + + var ( + urls []string + dgst = refspec.Digest() + ) + + if dgst != "" { + if err := dgst.Validate(); err != nil { + // need to fail here, since we can't actually resolve the invalid + // digest. + return "", ocispec.Descriptor{}, err + } + + // turns out, we have a valid digest, make a url. + urls = append(urls, fetcher.urls("manifests", dgst.String())...) + + // fallback to blobs on not found. + urls = append(urls, fetcher.urls("blobs", dgst.String())...) + } else { + urls = append(urls, fetcher.urls("manifests", refspec.Object)...) + } + + ctx, err = contextWithRepositoryScope(ctx, refspec, false) + if err != nil { + return "", ocispec.Descriptor{}, err + } + for _, u := range urls { + log.G(ctx).WithFields(logrus.Fields{ + "url": u, + }).Debug("Trying to fetch from url") + req, err := http.NewRequest(http.MethodHead, u, nil) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + // set headers for all the types we support for resolution. + req.Header.Set("Accept", strings.Join([]string{ + images.MediaTypeDockerSchema2Manifest, + images.MediaTypeDockerSchema2ManifestList, + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, "*"}, ", ")) + + log.G(ctx).Info("resolving") + resp, err := fetcher.doRequestWithRetries(ctx, req, nil) + if err != nil { + return "", ocispec.Descriptor{}, err + } + resp.Body.Close() // don't care about body contents. + + if resp.StatusCode > 299 { + if resp.StatusCode == http.StatusNotFound { + continue + } + return "", ocispec.Descriptor{}, errors.Errorf("unexpected status code %v: %v", u, resp.Status) + } + + // this is the only point at which we trust the registry. we use the + // content headers to assemble a descriptor for the name. when this becomes + // more robust, we mostly get this information from a secure trust store. + dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest")) + + if dgstHeader != "" { + if err := dgstHeader.Validate(); err != nil { + return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader) + } + dgst = dgstHeader + } + + if dgst == "" { + return "", ocispec.Descriptor{}, errors.Errorf("could not resolve digest for %v", ref) + } + + var ( + size int64 + sizeHeader = resp.Header.Get("Content-Length") + ) + + size, err = strconv.ParseInt(sizeHeader, 10, 64) + if err != nil { + + return "", ocispec.Descriptor{}, errors.Wrapf(err, "invalid size header: %q", sizeHeader) + } + if size < 0 { + return "", ocispec.Descriptor{}, errors.Errorf("%q in header not a valid size", sizeHeader) + } + + desc := ocispec.Descriptor{ + Digest: dgst, + MediaType: resp.Header.Get("Content-Type"), // need to strip disposition? + Size: size, + } + + log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved") + return ref, desc, nil + } + + return "", ocispec.Descriptor{}, errors.Errorf("%v not found", ref) +} + +func (r *containerdResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return nil, err + } + + base, err := r.base(refspec) + if err != nil { + return nil, err + } + + return dockerFetcher{ + dockerBase: base, + }, nil +} + +func (r *containerdResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) { + return nil, nil +} + +type dockerBase struct { + refspec reference.Spec + base []url.URL + token string + + client *http.Client + useBasic bool + username string + secret string +} + +func (r *containerdResolver) base(refspec reference.Spec) (*dockerBase, error) { + var ( + err error + base []url.URL + username, secret string + ) + + host := refspec.Hostname() + prefix := strings.TrimPrefix(refspec.Locator, host+"/") + + if urls, ok := r.registry[host]; ok { + urls, err := r.getV2Urls(urls, prefix) + if err != nil { + return nil, fmt.Errorf("failed to fetch v2 urls: %v", err) + } + base = append(base, urls...) + } else if host == "docker.io" { + base = append(base, []url.URL{{Host: "registry-1.docker.io", Scheme: "https", Path: path.Join("/v2", prefix)}}...) + } else { + scheme := "https" + if r.plainHTTP || strings.HasPrefix(host, "localhost:") { + scheme = "http" + } + base = append(base, []url.URL{{Host: host, Scheme: scheme, Path: path.Join("/v2", prefix)}}...) + } + + if r.credentials != nil { + username, secret, err = r.credentials(base[0].Host) + if err != nil { + return nil, err + } + } + + return &dockerBase{ + refspec: refspec, + base: base, + client: r.client, + username: username, + secret: secret, + }, nil +} + +func (r *dockerBase) url(ps ...string) string { + url := r.base[0] + url.Path = path.Join(url.Path, path.Join(ps...)) + return url.String() +} + +func (r *dockerBase) urls(ps ...string) []string { + urls := []string{} + for _, url := range r.base { + url.Path = path.Join(url.Path, path.Join(ps...)) + urls = append(urls, url.String()) + } + return urls +} + +func (r *dockerBase) authorize(req *http.Request) { + if r.useBasic { + req.SetBasicAuth(r.username, r.secret) + } else if r.token != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.token)) + } +} + +func (r *dockerBase) doRequest(ctx context.Context, req *http.Request) (*http.Response, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String())) + log.G(ctx).WithField("request.headers", req.Header).WithField("request.method", req.Method).Debug("do request") + r.authorize(req) + resp, err := ctxhttp.Do(ctx, r.client, req) + if err != nil { + return nil, errors.Wrap(err, "failed to do request") + } + log.G(ctx).WithFields(logrus.Fields{ + "status": resp.Status, + "response.headers": resp.Header, + }).Debug("fetch response received") + return resp, nil +} + +func (r *dockerBase) doRequestWithRetries(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Response, error) { + resp, err := r.doRequest(ctx, req) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + req, err = r.retryRequest(ctx, req, responses) + if err != nil { + resp.Body.Close() + return nil, err + } + if req != nil { + resp.Body.Close() + return r.doRequestWithRetries(ctx, req, responses) + } + return resp, err +} + +func (r *dockerBase) retryRequest(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Request, error) { + if len(responses) > 5 { + return nil, nil + } + last := responses[len(responses)-1] + if last.StatusCode == http.StatusUnauthorized { + log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") + for _, c := range parseAuthHeader(last.Header) { + if c.scheme == bearerAuth { + if err := invalidAuthorization(c, responses); err != nil { + r.token = "" + return nil, err + } + if err := r.setTokenAuth(ctx, c.parameters); err != nil { + return nil, err + } + return copyRequest(req) + } else if c.scheme == basicAuth { + if r.username != "" && r.secret != "" { + r.useBasic = true + } + return copyRequest(req) + } + } + return nil, nil + } else if last.StatusCode == http.StatusMethodNotAllowed && req.Method == http.MethodHead { + // Support registries which have not properly implemented the HEAD method for + // manifests endpoint + if strings.Contains(req.URL.Path, "/manifests/") { + // TODO: copy request? + req.Method = http.MethodGet + return copyRequest(req) + } + } + + // TODO: Handle 50x errors accounting for attempt history + return nil, nil +} + +func invalidAuthorization(c challenge, responses []*http.Response) error { + errStr := c.parameters["error"] + if errStr == "" { + return nil + } + + n := len(responses) + if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) { + return nil + } + + return errors.Wrapf(ErrInvalidAuthorization, "server message: %s", errStr) +} + +func sameRequest(r1, r2 *http.Request) bool { + if r1.Method != r2.Method { + return false + } + if *r1.URL != *r2.URL { + return false + } + return true +} + +func copyRequest(req *http.Request) (*http.Request, error) { + ireq := *req + if ireq.GetBody != nil { + var err error + ireq.Body, err = ireq.GetBody() + if err != nil { + return nil, err + } + } + return &ireq, nil +} + +func (r *dockerBase) setTokenAuth(ctx context.Context, params map[string]string) error { + realm, ok := params["realm"] + if !ok { + return errors.New("no realm specified for token auth challenge") + } + + realmURL, err := url.Parse(realm) + if err != nil { + return fmt.Errorf("invalid token auth challenge realm: %s", err) + } + + to := tokenOptions{ + realm: realmURL.String(), + service: params["service"], + } + + to.scopes = getTokenScopes(ctx, params) + if len(to.scopes) == 0 { + return errors.Errorf("no scope specified for token auth challenge") + } + if r.secret != "" { + // Credential information is provided, use oauth POST endpoint + r.token, err = r.fetchTokenWithOAuth(ctx, to) + if err != nil { + return errors.Wrap(err, "failed to fetch oauth token") + } + } else { + // Do request anonymously + r.token, err = r.getToken(ctx, to) + if err != nil { + return errors.Wrap(err, "failed to fetch anonymous token") + } + } + + return nil +} + +type tokenOptions struct { + realm string + service string + scopes []string +} + +type postTokenResponse struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int `json:"expires_in"` + IssuedAt time.Time `json:"issued_at"` + Scope string `json:"scope"` +} + +func (r *dockerBase) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) { + form := url.Values{} + form.Set("scope", strings.Join(to.scopes, " ")) + form.Set("service", to.service) + // TODO: Allow setting client_id + form.Set("client_id", "containerd-dist-tool") + + if r.username == "" { + form.Set("grant_type", "refresh_token") + form.Set("refresh_token", r.secret) + } else { + form.Set("grant_type", "password") + form.Set("username", r.username) + form.Set("password", r.secret) + } + + resp, err := ctxhttp.PostForm(ctx, r.client, to.realm, form) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Registries without support for POST may return 404 for POST /v2/token. + // As of September 2017, GCR is known to return 404. + if (resp.StatusCode == 405 && r.username != "") || resp.StatusCode == 404 { + return r.getToken(ctx, to) + } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { + b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB + log.G(ctx).WithFields(logrus.Fields{ + "status": resp.Status, + "body": string(b), + }).Debugf("token request failed") + // TODO: handle error body and write debug output + return "", errors.Errorf("unexpected status: %s", resp.Status) + } + + decoder := json.NewDecoder(resp.Body) + + var tr postTokenResponse + if err = decoder.Decode(&tr); err != nil { + return "", fmt.Errorf("unable to decode token response: %s", err) + } + + return tr.AccessToken, nil +} + +type getTokenResponse struct { + Token string `json:"token"` + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + IssuedAt time.Time `json:"issued_at"` + RefreshToken string `json:"refresh_token"` +} + +// getToken fetches a token using a GET request +func (r *dockerBase) getToken(ctx context.Context, to tokenOptions) (string, error) { + req, err := http.NewRequest("GET", to.realm, nil) + if err != nil { + return "", err + } + + reqParams := req.URL.Query() + + if to.service != "" { + reqParams.Add("service", to.service) + } + + for _, scope := range to.scopes { + reqParams.Add("scope", scope) + } + + if r.secret != "" { + req.SetBasicAuth(r.username, r.secret) + } + + req.URL.RawQuery = reqParams.Encode() + + resp, err := ctxhttp.Do(ctx, r.client, req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + // TODO: handle error body and write debug output + return "", errors.Errorf("unexpected status: %s", resp.Status) + } + + decoder := json.NewDecoder(resp.Body) + + var tr getTokenResponse + if err = decoder.Decode(&tr); err != nil { + return "", fmt.Errorf("unable to decode token response: %s", err) + } + + // `access_token` is equivalent to `token` and if both are specified + // the choice is undefined. Canonicalize `access_token` by sticking + // things in `token`. + if tr.AccessToken != "" { + tr.Token = tr.AccessToken + } + + if tr.Token == "" { + return "", ErrNoToken + } + + return tr.Token, nil +} + +func (r *containerdResolver) getV2Urls(urls []string, imagePath string) ([]url.URL, error) { + v2Urls := []url.URL{} + for _, u := range urls { + v2Url, err := url.Parse(u) + if err != nil { + return nil, fmt.Errorf("Failed to parse url during getv2 urls: %+v, err:%s", u, err) + } + v2Url.Path = path.Join("/v2", imagePath) + v2Urls = append(v2Urls, *v2Url) + } + return v2Urls, nil +} diff --git a/pkg/containerd/resolver/scope.go b/pkg/containerd/resolver/scope.go new file mode 100644 index 000000000..c8ced9179 --- /dev/null +++ b/pkg/containerd/resolver/scope.go @@ -0,0 +1,80 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "context" + "net/url" + "sort" + "strings" + + "github.com/containerd/containerd/reference" +) + +// repositoryScope returns a repository scope string such as "repository:foo/bar:pull" +// for "host/foo/bar:baz". +// When push is true, both pull and push are added to the scope. +func repositoryScope(refspec reference.Spec, push bool) (string, error) { + u, err := url.Parse("dummy://" + refspec.Locator) + if err != nil { + return "", err + } + s := "repository:" + strings.TrimPrefix(u.Path, "/") + ":pull" + if push { + s += ",push" + } + return s, nil +} + +// tokenScopesKey is used for the key for context.WithValue(). +// value: []string (e.g. {"registry:foo/bar:pull"}) +type tokenScopesKey struct{} + +// contextWithRepositoryScope returns a context with tokenScopesKey{} and the repository scope value. +func contextWithRepositoryScope(ctx context.Context, refspec reference.Spec, push bool) (context.Context, error) { + s, err := repositoryScope(refspec, push) + if err != nil { + return nil, err + } + return context.WithValue(ctx, tokenScopesKey{}, []string{s}), nil +} + +// getTokenScopes returns deduplicated and sorted scopes from ctx.Value(tokenScopesKey{}) and params["scope"]. +func getTokenScopes(ctx context.Context, params map[string]string) []string { + var scopes []string + if x := ctx.Value(tokenScopesKey{}); x != nil { + scopes = append(scopes, x.([]string)...) + } + if scope, ok := params["scope"]; ok { + for _, s := range scopes { + // Note: this comparison is unaware of the scope grammar (https://docs.docker.com/registry/spec/auth/scope/) + // So, "repository:foo/bar:pull,push" != "repository:foo/bar:push,pull", although semantically they are equal. + if s == scope { + // already appended + goto Sort + } + } + scopes = append(scopes, scope) + } +Sort: + sort.Strings(scopes) + return scopes +} diff --git a/pkg/containerd/resolver/scope_test.go b/pkg/containerd/resolver/scope_test.go new file mode 100644 index 000000000..f620dd4e6 --- /dev/null +++ b/pkg/containerd/resolver/scope_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "testing" + + "github.com/containerd/containerd/reference" + "github.com/stretchr/testify/assert" +) + +func TestRepositoryScope(t *testing.T) { + testCases := []struct { + refspec reference.Spec + push bool + expected string + }{ + { + refspec: reference.Spec{ + Locator: "host/foo/bar", + Object: "ignored", + }, + push: false, + expected: "repository:foo/bar:pull", + }, + { + refspec: reference.Spec{ + Locator: "host:4242/foo/bar", + Object: "ignored", + }, + push: true, + expected: "repository:foo/bar:pull,push", + }, + } + for _, x := range testCases { + actual, err := repositoryScope(x.refspec, x.push) + assert.NoError(t, err) + assert.Equal(t, x.expected, actual) + } +} diff --git a/pkg/containerd/resolver/status.go b/pkg/containerd/resolver/status.go new file mode 100644 index 000000000..5ffcdaec0 --- /dev/null +++ b/pkg/containerd/resolver/status.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "sync" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" +) + +// Status of a content operation +type Status struct { + content.Status + + // UploadUUID is used by the Docker registry to reference blob uploads + UploadUUID string +} + +// StatusTracker to track status of operations +type StatusTracker interface { + GetStatus(string) (Status, error) + SetStatus(string, Status) +} + +type memoryStatusTracker struct { + statuses map[string]Status + m sync.Mutex +} + +// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory +func NewInMemoryTracker() StatusTracker { + return &memoryStatusTracker{ + statuses: map[string]Status{}, + } +} + +func (t *memoryStatusTracker) GetStatus(ref string) (Status, error) { + t.m.Lock() + defer t.m.Unlock() + status, ok := t.statuses[ref] + if !ok { + return Status{}, errors.Wrapf(errdefs.ErrNotFound, "status for ref %v", ref) + } + return status, nil +} + +func (t *memoryStatusTracker) SetStatus(ref string, status Status) { + t.m.Lock() + t.statuses[ref] = status + t.m.Unlock() +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 48b98f2c7..dbc6fffb1 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -210,7 +210,6 @@ func (c *criContainerdService) localResolve(ctx context.Context, refOrID string) if _, err := imagedigest.Parse(refOrID); err == nil { return refOrID } - return func(ref string) string { // ref is not image id, try to resolve it locally. normalized, err := util.NormalizeImageRef(ref) diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index bf6634fe5..884f0dd7c 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -25,12 +25,12 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" containerdimages "github.com/containerd/containerd/images" - "github.com/containerd/containerd/remotes/docker" imagespec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + containerdresolver "github.com/containerd/cri-containerd/pkg/containerd/resolver" imagestore "github.com/containerd/cri-containerd/pkg/store/image" "github.com/containerd/cri-containerd/pkg/util" ) @@ -83,15 +83,14 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma if err != nil { return nil, fmt.Errorf("failed to parse image reference %q: %v", imageRef, err) } - // TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference. ref := namedRef.String() if ref != imageRef { logrus.Debugf("PullImage using normalized image ref: %q", ref) } - - resolver := docker.NewResolver(docker.ResolverOptions{ + resolver := containerdresolver.NewResolver(containerdresolver.Options{ Credentials: func(string) (string, string, error) { return ParseAuth(r.GetAuth()) }, Client: http.DefaultClient, + Registry: c.getResolverOptions(), }) _, desc, err := resolver.Resolve(ctx, ref) if err != nil { @@ -212,3 +211,11 @@ func (c *criContainerdService) createImageReference(ctx context.Context, name st _, err = c.client.ImageService().Update(ctx, img, "target") return err } + +func (c *criContainerdService) getResolverOptions() map[string][]string { + options := make(map[string][]string) + for ns, mirror := range c.config.Mirrors { + options[ns] = append(options[ns], mirror.Endpoints...) + } + return options +}