diff --git a/cmd/cri-containerd/options/options.go b/cmd/cri-containerd/options/options.go index 5599c007b..7c72268f9 100644 --- a/cmd/cri-containerd/options/options.go +++ b/cmd/cri-containerd/options/options.go @@ -72,6 +72,8 @@ type PluginConfig struct { ContainerdConfig `toml:"containerd" json:"containerd,omitempty"` // CniConfig contains config related to cni CniConfig `toml:"cni" json:"cni,omitempty"` + // Registry contains config related to the registry + Registry `toml:"registry" json:"registry,omitempty"` // StreamServerAddress is the ip address streaming server is listening on. StreamServerAddress string `toml:"stream_server_address" json:"streamServerAddress,omitempty"` // StreamServerPort is the port streaming server is listening on. @@ -191,6 +193,8 @@ func (c *CRIContainerdOptions) AddFlags(fs *pflag.FlagSet) { defaults.SkipImageFSUUID, "Skip retrieval of imagefs uuid. When turned on, kubelet will not be able to get imagefs capacity or perform imagefs disk eviction.") fs.BoolVar(&c.EnableIPv6DAD, "enable-ipv6-dad", defaults.EnableIPv6DAD, "Enable IPv6 DAD (duplicate address detection) for pod sandbox network. Enabling this will increase pod sandbox start latency by several seconds.") + fs.Var(&c.Registry, "registry", + "Registry config for image pull eg --registry=myregistry.io=https://mymirror.io/ --registry=myregistry2.io=https://mymirror2.io/") } // InitFlags load configurations from config file, and then overwrite with flags. @@ -259,6 +263,13 @@ func DefaultConfig() Config { SystemdCgroup: false, SkipImageFSUUID: false, EnableIPv6DAD: false, + Registry: Registry{ + Mirrors: map[string]Mirror{ + "docker.io": { + Endpoints: []string{"https://registry-1.docker.io"}, + }, + }, + }, }, ContainerdRootDir: "/var/lib/containerd", ContainerdEndpoint: "/run/containerd/containerd.sock", diff --git a/cmd/cri-containerd/options/registry.go b/cmd/cri-containerd/options/registry.go new file mode 100644 index 000000000..7f057ee4c --- /dev/null +++ b/cmd/cri-containerd/options/registry.go @@ -0,0 +1,88 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + "fmt" + "net/url" + "strings" +) + +// Mirror contains the config related to the registry mirror +type Mirror struct { + Endpoints []string `toml:"endpoint" json:"endpoint,omitempty"` + // TODO (Abhi) We might need to add auth per namespace. Looks like + // image auth information is passed by kube itself. +} + +// Registry is registry settings configured +type Registry struct { + Mirrors map[string]Mirror `toml:"mirrors" json:"mirrors,omitempty"` +} + +// String returns the string format of registry type +func (r *Registry) String() string { + // Its not used hence return empty string + return "" +} + +// Set validates and converts into the internal registry struct +func (r *Registry) Set(s string) error { + // --registry docker.io=https://mymirror.io,http://mymirror2.io + // If no option is set then return format error + if len(s) == 0 { + return fmt.Errorf("incomplete registry mirror option") + } + var mirrors []string + host := "docker.io" + opt := strings.Split(s, "=") + if len(opt) > 1 { + // If option is set in the format "mynamespace.io=https://mymirror.io,https://mymirror2.io" + // Then associate the mirror urls for the namespace only" + host = opt[0] + mirrors = strings.Split(opt[1], ",") + } else { + // If option is set in the format "https://mymirror.io,https://mymirror.io" + // Then associate mirror against default docker.io namespace + mirrors = strings.Split(opt[0], ",") + } + + // Validate the format of the urls passed + for _, u := range mirrors { + _, err := url.Parse(u) + if err != nil { + return fmt.Errorf("invalid registry mirror url format %v: %v", u, err) + } + } + + if r.Mirrors == nil { + r.Mirrors = make(map[string]Mirror) + } + if _, ok := r.Mirrors[host]; !ok { + r.Mirrors[host] = Mirror{} + } + m := r.Mirrors[host] + m.Endpoints = append(m.Endpoints, mirrors...) + r.Mirrors[host] = m + + return nil +} + +// Type returns a string name for the option type +func (r *Registry) Type() string { + return "list" +} diff --git a/pkg/containerd/resolver/auth.go b/pkg/containerd/resolver/auth.go new file mode 100644 index 000000000..15d406a93 --- /dev/null +++ b/pkg/containerd/resolver/auth.go @@ -0,0 +1,202 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/ +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "net/http" + "sort" + "strings" +) + +type authenticationScheme byte + +const ( + basicAuth authenticationScheme = 1 << iota // Defined in RFC 7617 + digestAuth // Defined in RFC 7616 + bearerAuth // Defined in RFC 6750 +) + +// challenge carries information from a WWW-Authenticate response header. +// See RFC 2617. +type challenge struct { + // scheme is the auth-scheme according to RFC 2617 + scheme authenticationScheme + + // parameters are the auth-params according to RFC 2617 + parameters map[string]string +} + +type byScheme []challenge + +func (bs byScheme) Len() int { return len(bs) } +func (bs byScheme) Swap(i, j int) { bs[i], bs[j] = bs[j], bs[i] } + +// Sort in priority order: token > digest > basic +func (bs byScheme) Less(i, j int) bool { return bs[i].scheme > bs[j].scheme } + +// Octet types from RFC 2616. +type octetType byte + +var octetTypes [256]octetType + +const ( + isToken octetType = 1 << iota + isSpace +) + +func init() { + // OCTET = + // CHAR = + // CTL = + // CR = + // LF = + // SP = + // HT = + // <"> = + // CRLF = CR LF + // LWS = [CRLF] 1*( SP | HT ) + // TEXT = + // separators = "(" | ")" | "<" | ">" | "@" | "," | ";" | ":" | "\" | <"> + // | "/" | "[" | "]" | "?" | "=" | "{" | "}" | SP | HT + // token = 1* + // qdtext = > + + for c := 0; c < 256; c++ { + var t octetType + isCtl := c <= 31 || c == 127 + isChar := 0 <= c && c <= 127 + isSeparator := strings.ContainsRune(" \t\"(),/:;<=>?@[]\\{}", rune(c)) + if strings.ContainsRune(" \t\r\n", rune(c)) { + t |= isSpace + } + if isChar && !isCtl && !isSeparator { + t |= isToken + } + octetTypes[c] = t + } +} + +func parseAuthHeader(header http.Header) []challenge { + challenges := []challenge{} + for _, h := range header[http.CanonicalHeaderKey("WWW-Authenticate")] { + v, p := parseValueAndParams(h) + var s authenticationScheme + switch v { + case "basic": + s = basicAuth + case "digest": + s = digestAuth + case "bearer": + s = bearerAuth + default: + continue + } + challenges = append(challenges, challenge{scheme: s, parameters: p}) + } + sort.Stable(byScheme(challenges)) + return challenges +} + +func parseValueAndParams(header string) (value string, params map[string]string) { + params = make(map[string]string) + value, s := expectToken(header) + if value == "" { + return + } + value = strings.ToLower(value) + for { + var pkey string + pkey, s = expectToken(skipSpace(s)) + if pkey == "" { + return + } + if !strings.HasPrefix(s, "=") { + return + } + var pvalue string + pvalue, s = expectTokenOrQuoted(s[1:]) + if pvalue == "" { + return + } + pkey = strings.ToLower(pkey) + params[pkey] = pvalue + s = skipSpace(s) + if !strings.HasPrefix(s, ",") { + return + } + s = s[1:] + } +} + +func skipSpace(s string) (rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isSpace == 0 { + break + } + } + return s[i:] +} + +func expectToken(s string) (token, rest string) { + i := 0 + for ; i < len(s); i++ { + if octetTypes[s[i]]&isToken == 0 { + break + } + } + return s[:i], s[i:] +} + +func expectTokenOrQuoted(s string) (value string, rest string) { + if !strings.HasPrefix(s, "\"") { + return expectToken(s) + } + s = s[1:] + for i := 0; i < len(s); i++ { + switch s[i] { + case '"': + return s[:i], s[i+1:] + case '\\': + p := make([]byte, len(s)-1) + j := copy(p, s[:i]) + escape := true + for i = i + 1; i < len(s); i++ { + b := s[i] + switch { + case escape: + escape = false + p[j] = b + j++ + case b == '\\': + escape = true + case b == '"': + return string(p[:j]), s[i+1:] + default: + p[j] = b + j++ + } + } + return "", "" + } + } + return "", "" +} diff --git a/pkg/containerd/resolver/fetcher.go b/pkg/containerd/resolver/fetcher.go new file mode 100644 index 000000000..45cc605a7 --- /dev/null +++ b/pkg/containerd/resolver/fetcher.go @@ -0,0 +1,142 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/ +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "context" + "fmt" + "io" + "net/http" + "path" + "strings" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type dockerFetcher struct { + *dockerBase +} + +func (r dockerFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields( + logrus.Fields{ + //"base": r.base.String(), + "digest": desc.Digest, + }, + )) + + urls, err := r.getV2URLPaths(ctx, desc) + if err != nil { + return nil, err + } + + ctx, err = contextWithRepositoryScope(ctx, r.refspec, false) + if err != nil { + return nil, err + } + + return newHTTPReadSeeker(desc.Size, func(offset int64) (io.ReadCloser, error) { + for _, u := range urls { + rc, err := r.open(ctx, u, desc.MediaType, offset) + if err != nil { + if errdefs.IsNotFound(err) { + continue // try one of the other urls. + } + + return nil, err + } + + return rc, nil + } + + return nil, errors.Wrapf(errdefs.ErrNotFound, + "could not fetch content descriptor %v (%v) from remote", + desc.Digest, desc.MediaType) + + }) +} + +func (r dockerFetcher) open(ctx context.Context, u, mediatype string, offset int64) (io.ReadCloser, error) { + req, err := http.NewRequest(http.MethodGet, u, nil) + if err != nil { + return nil, err + } + + req.Header.Set("Accept", strings.Join([]string{mediatype, `*`}, ", ")) + + if offset > 0 { + // TODO(stevvooe): Only set this header in response to the + // "Accept-Ranges: bytes" header. + req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) + } + + resp, err := r.doRequestWithRetries(ctx, req, nil) + if err != nil { + return nil, err + } + + if resp.StatusCode > 299 { + // TODO(stevvooe): When doing a offset specific request, we should + // really distinguish between a 206 and a 200. In the case of 200, we + // can discard the bytes, hiding the seek behavior from the + // implementation. + + resp.Body.Close() + if resp.StatusCode == http.StatusNotFound { + return nil, errors.Wrapf(errdefs.ErrNotFound, "content at %v not found", u) + } + return nil, errors.Errorf("unexpected status code %v: %v", u, resp.Status) + } + + return resp.Body, nil +} + +// getV2URLPaths generates the candidate urls paths for the object based on the +// set of hints and the provided object id. URLs are returned in the order of +// most to least likely succeed. +func (r *dockerFetcher) getV2URLPaths(ctx context.Context, desc ocispec.Descriptor) ([]string, error) { + var urls []string + + if len(desc.URLs) > 0 { + // handle fetch via external urls. + for _, u := range desc.URLs { + log.G(ctx).WithField("url", u).Debug("adding alternative url") + urls = append(urls, u) + } + } + + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, images.MediaTypeDockerSchema2ManifestList, + images.MediaTypeDockerSchema1Manifest, + ocispec.MediaTypeImageManifest, ocispec.MediaTypeImageIndex: + urls = append(urls, r.url(path.Join("manifests", desc.Digest.String()))) + } + + // always fallback to attempting to get the object out of the blobs store. + urls = append(urls, r.url(path.Join("blobs", desc.Digest.String()))) + + return urls, nil +} diff --git a/pkg/containerd/resolver/httpreadseeker.go b/pkg/containerd/resolver/httpreadseeker.go new file mode 100644 index 000000000..16af3d7e4 --- /dev/null +++ b/pkg/containerd/resolver/httpreadseeker.go @@ -0,0 +1,148 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "bytes" + "io" + "io/ioutil" + + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/pkg/errors" +) + +type httpReadSeeker struct { + size int64 + offset int64 + rc io.ReadCloser + open func(offset int64) (io.ReadCloser, error) + closed bool +} + +func newHTTPReadSeeker(size int64, open func(offset int64) (io.ReadCloser, error)) (io.ReadCloser, error) { + return &httpReadSeeker{ + size: size, + open: open, + }, nil +} + +func (hrs *httpReadSeeker) Read(p []byte) (n int, err error) { + if hrs.closed { + return 0, io.EOF + } + + rd, err := hrs.reader() + if err != nil { + return 0, err + } + + n, err = rd.Read(p) + hrs.offset += int64(n) + return +} + +func (hrs *httpReadSeeker) Close() error { + if hrs.closed { + return nil + } + hrs.closed = true + if hrs.rc != nil { + return hrs.rc.Close() + } + + return nil +} + +func (hrs *httpReadSeeker) Seek(offset int64, whence int) (int64, error) { + if hrs.closed { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: closed") + } + + abs := hrs.offset + switch whence { + case io.SeekStart: + abs = offset + case io.SeekCurrent: + abs += offset + case io.SeekEnd: + if hrs.size == -1 { + return 0, errors.Wrap(errdefs.ErrUnavailable, "Fetcher.Seek: unknown size, cannot seek from end") + } + abs = hrs.size + offset + default: + return 0, errors.Wrap(errdefs.ErrInvalidArgument, "Fetcher.Seek: invalid whence") + } + + if abs < 0 { + return 0, errors.Wrapf(errdefs.ErrInvalidArgument, "Fetcher.Seek: negative offset") + } + + if abs != hrs.offset { + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("Fetcher.Seek: failed to close ReadCloser") + } + + hrs.rc = nil + } + + hrs.offset = abs + } + + return hrs.offset, nil +} + +func (hrs *httpReadSeeker) reader() (io.Reader, error) { + if hrs.rc != nil { + return hrs.rc, nil + } + + if hrs.size == -1 || hrs.offset < hrs.size { + // only try to reopen the body request if we are seeking to a value + // less than the actual size. + if hrs.open == nil { + return nil, errors.Wrapf(errdefs.ErrNotImplemented, "cannot open") + } + + rc, err := hrs.open(hrs.offset) + if err != nil { + return nil, errors.Wrapf(err, "httpReaderSeeker: failed open") + } + + if hrs.rc != nil { + if err := hrs.rc.Close(); err != nil { + log.L.WithError(err).Errorf("httpReadSeeker: failed to close ReadCloser") + } + } + hrs.rc = rc + } else { + // There is an edge case here where offset == size of the content. If + // we seek, we will probably get an error for content that cannot be + // sought (?). In that case, we should err on committing the content, + // as the length is already satisified but we just return the empty + // reader instead. + + hrs.rc = ioutil.NopCloser(bytes.NewReader([]byte{})) + } + + return hrs.rc, nil +} diff --git a/pkg/containerd/resolver/resolver.go b/pkg/containerd/resolver/resolver.go new file mode 100644 index 000000000..4d9195ac7 --- /dev/null +++ b/pkg/containerd/resolver/resolver.go @@ -0,0 +1,606 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resolver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "time" + + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + "github.com/containerd/containerd/remotes" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "golang.org/x/net/context/ctxhttp" +) + +// This file is a modified copy of containerd/remote/docker/resolver.go. +// The changes carried over by this file includes support for image ref +// resolution and fetching image using multiple registry mirror urls. A new +// ResolverOption called Registry is added. Registry will contain a map +// of namespace relevant mirror urls. The client will use the ResolverOptions +// to set the urls associated with the namespace of the image reference. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. The specific changes are made to the base +// function to calculate the base urls for the image location. urls() is +// added to fetch the mirror urls associated with the namespace of the image +// ResolverOptions are changed for client to set the namespace and mirror urls +// for to pull the image. + +var ( + // ErrNoToken is returned if a request is successful but the body does not + // contain an authorization token. + ErrNoToken = errors.New("authorization server did not include a token in the response") + + // ErrInvalidAuthorization is used when credentials are passed to a server but + // those credentials are rejected. + ErrInvalidAuthorization = errors.New("authorization failed") +) + +type containerdResolver struct { + credentials func(string) (string, string, error) + plainHTTP bool + client *http.Client + tracker StatusTracker + registry map[string][]string +} + +// Options are used to configured a new Docker register resolver +type Options struct { + // Credentials provides username and secret given a host. + // If username is empty but a secret is given, that secret + // is interpretted as a long lived token. + Credentials func(string) (string, string, error) + + // PlainHTTP specifies to use plain http and not https + PlainHTTP bool + + // Client is the http client to used when making registry requests + Client *http.Client + + // Tracker is used to track uploads to the registry. This is used + // since the registry does not have upload tracking and the existing + // mechanism for getting blob upload status is expensive. + Tracker StatusTracker + + Registry map[string][]string +} + +// NewResolver returns a new resolver to a Docker registry +func NewResolver(options Options) remotes.Resolver { + tracker := options.Tracker + if tracker == nil { + tracker = NewInMemoryTracker() + } + + return &containerdResolver{ + credentials: options.Credentials, + plainHTTP: options.PlainHTTP, + client: options.Client, + tracker: tracker, + registry: options.Registry, + } +} + +var _ remotes.Resolver = &containerdResolver{} + +func (r *containerdResolver) Resolve(ctx context.Context, ref string) (string, ocispec.Descriptor, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + if refspec.Object == "" { + return "", ocispec.Descriptor{}, reference.ErrObjectRequired + } + + base, err := r.base(refspec) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + fetcher := dockerFetcher{ + dockerBase: base, + } + + var ( + urls []string + dgst = refspec.Digest() + ) + + if dgst != "" { + if err := dgst.Validate(); err != nil { + // need to fail here, since we can't actually resolve the invalid + // digest. + return "", ocispec.Descriptor{}, err + } + + // turns out, we have a valid digest, make a url. + urls = append(urls, fetcher.urls("manifests", dgst.String())...) + + // fallback to blobs on not found. + urls = append(urls, fetcher.urls("blobs", dgst.String())...) + } else { + urls = append(urls, fetcher.urls("manifests", refspec.Object)...) + } + + ctx, err = contextWithRepositoryScope(ctx, refspec, false) + if err != nil { + return "", ocispec.Descriptor{}, err + } + for _, u := range urls { + log.G(ctx).WithFields(logrus.Fields{ + "url": u, + }).Debug("Trying to fetch from url") + req, err := http.NewRequest(http.MethodHead, u, nil) + if err != nil { + return "", ocispec.Descriptor{}, err + } + + // set headers for all the types we support for resolution. + req.Header.Set("Accept", strings.Join([]string{ + images.MediaTypeDockerSchema2Manifest, + images.MediaTypeDockerSchema2ManifestList, + ocispec.MediaTypeImageManifest, + ocispec.MediaTypeImageIndex, "*"}, ", ")) + + log.G(ctx).Info("resolving") + resp, err := fetcher.doRequestWithRetries(ctx, req, nil) + if err != nil { + return "", ocispec.Descriptor{}, err + } + resp.Body.Close() // don't care about body contents. + + if resp.StatusCode > 299 { + if resp.StatusCode == http.StatusNotFound { + continue + } + return "", ocispec.Descriptor{}, errors.Errorf("unexpected status code %v: %v", u, resp.Status) + } + + // this is the only point at which we trust the registry. we use the + // content headers to assemble a descriptor for the name. when this becomes + // more robust, we mostly get this information from a secure trust store. + dgstHeader := digest.Digest(resp.Header.Get("Docker-Content-Digest")) + + if dgstHeader != "" { + if err := dgstHeader.Validate(); err != nil { + return "", ocispec.Descriptor{}, errors.Wrapf(err, "%q in header not a valid digest", dgstHeader) + } + dgst = dgstHeader + } + + if dgst == "" { + return "", ocispec.Descriptor{}, errors.Errorf("could not resolve digest for %v", ref) + } + + var ( + size int64 + sizeHeader = resp.Header.Get("Content-Length") + ) + + size, err = strconv.ParseInt(sizeHeader, 10, 64) + if err != nil { + + return "", ocispec.Descriptor{}, errors.Wrapf(err, "invalid size header: %q", sizeHeader) + } + if size < 0 { + return "", ocispec.Descriptor{}, errors.Errorf("%q in header not a valid size", sizeHeader) + } + + desc := ocispec.Descriptor{ + Digest: dgst, + MediaType: resp.Header.Get("Content-Type"), // need to strip disposition? + Size: size, + } + + log.G(ctx).WithField("desc.digest", desc.Digest).Debug("resolved") + return ref, desc, nil + } + + return "", ocispec.Descriptor{}, errors.Errorf("%v not found", ref) +} + +func (r *containerdResolver) Fetcher(ctx context.Context, ref string) (remotes.Fetcher, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return nil, err + } + + base, err := r.base(refspec) + if err != nil { + return nil, err + } + + return dockerFetcher{ + dockerBase: base, + }, nil +} + +func (r *containerdResolver) Pusher(ctx context.Context, ref string) (remotes.Pusher, error) { + return nil, nil +} + +type dockerBase struct { + refspec reference.Spec + base []url.URL + token string + + client *http.Client + useBasic bool + username string + secret string +} + +func (r *containerdResolver) base(refspec reference.Spec) (*dockerBase, error) { + var ( + err error + base []url.URL + username, secret string + ) + + host := refspec.Hostname() + prefix := strings.TrimPrefix(refspec.Locator, host+"/") + + if urls, ok := r.registry[host]; ok { + urls, err := r.getV2Urls(urls, prefix) + if err != nil { + return nil, fmt.Errorf("failed to fetch v2 urls: %v", err) + } + base = append(base, urls...) + } else if host == "docker.io" { + base = append(base, []url.URL{{Host: "registry-1.docker.io", Scheme: "https", Path: path.Join("/v2", prefix)}}...) + } else { + scheme := "https" + if r.plainHTTP || strings.HasPrefix(host, "localhost:") { + scheme = "http" + } + base = append(base, []url.URL{{Host: host, Scheme: scheme, Path: path.Join("/v2", prefix)}}...) + } + + if r.credentials != nil { + username, secret, err = r.credentials(base[0].Host) + if err != nil { + return nil, err + } + } + + return &dockerBase{ + refspec: refspec, + base: base, + client: r.client, + username: username, + secret: secret, + }, nil +} + +func (r *dockerBase) url(ps ...string) string { + url := r.base[0] + url.Path = path.Join(url.Path, path.Join(ps...)) + return url.String() +} + +func (r *dockerBase) urls(ps ...string) []string { + urls := []string{} + for _, url := range r.base { + url.Path = path.Join(url.Path, path.Join(ps...)) + urls = append(urls, url.String()) + } + return urls +} + +func (r *dockerBase) authorize(req *http.Request) { + if r.useBasic { + req.SetBasicAuth(r.username, r.secret) + } else if r.token != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", r.token)) + } +} + +func (r *dockerBase) doRequest(ctx context.Context, req *http.Request) (*http.Response, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithField("url", req.URL.String())) + log.G(ctx).WithField("request.headers", req.Header).WithField("request.method", req.Method).Debug("do request") + r.authorize(req) + resp, err := ctxhttp.Do(ctx, r.client, req) + if err != nil { + return nil, errors.Wrap(err, "failed to do request") + } + log.G(ctx).WithFields(logrus.Fields{ + "status": resp.Status, + "response.headers": resp.Header, + }).Debug("fetch response received") + return resp, nil +} + +func (r *dockerBase) doRequestWithRetries(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Response, error) { + resp, err := r.doRequest(ctx, req) + if err != nil { + return nil, err + } + + responses = append(responses, resp) + req, err = r.retryRequest(ctx, req, responses) + if err != nil { + resp.Body.Close() + return nil, err + } + if req != nil { + resp.Body.Close() + return r.doRequestWithRetries(ctx, req, responses) + } + return resp, err +} + +func (r *dockerBase) retryRequest(ctx context.Context, req *http.Request, responses []*http.Response) (*http.Request, error) { + if len(responses) > 5 { + return nil, nil + } + last := responses[len(responses)-1] + if last.StatusCode == http.StatusUnauthorized { + log.G(ctx).WithField("header", last.Header.Get("WWW-Authenticate")).Debug("Unauthorized") + for _, c := range parseAuthHeader(last.Header) { + if c.scheme == bearerAuth { + if err := invalidAuthorization(c, responses); err != nil { + r.token = "" + return nil, err + } + if err := r.setTokenAuth(ctx, c.parameters); err != nil { + return nil, err + } + return copyRequest(req) + } else if c.scheme == basicAuth { + if r.username != "" && r.secret != "" { + r.useBasic = true + } + return copyRequest(req) + } + } + return nil, nil + } else if last.StatusCode == http.StatusMethodNotAllowed && req.Method == http.MethodHead { + // Support registries which have not properly implemented the HEAD method for + // manifests endpoint + if strings.Contains(req.URL.Path, "/manifests/") { + // TODO: copy request? + req.Method = http.MethodGet + return copyRequest(req) + } + } + + // TODO: Handle 50x errors accounting for attempt history + return nil, nil +} + +func invalidAuthorization(c challenge, responses []*http.Response) error { + errStr := c.parameters["error"] + if errStr == "" { + return nil + } + + n := len(responses) + if n == 1 || (n > 1 && !sameRequest(responses[n-2].Request, responses[n-1].Request)) { + return nil + } + + return errors.Wrapf(ErrInvalidAuthorization, "server message: %s", errStr) +} + +func sameRequest(r1, r2 *http.Request) bool { + if r1.Method != r2.Method { + return false + } + if *r1.URL != *r2.URL { + return false + } + return true +} + +func copyRequest(req *http.Request) (*http.Request, error) { + ireq := *req + if ireq.GetBody != nil { + var err error + ireq.Body, err = ireq.GetBody() + if err != nil { + return nil, err + } + } + return &ireq, nil +} + +func (r *dockerBase) setTokenAuth(ctx context.Context, params map[string]string) error { + realm, ok := params["realm"] + if !ok { + return errors.New("no realm specified for token auth challenge") + } + + realmURL, err := url.Parse(realm) + if err != nil { + return fmt.Errorf("invalid token auth challenge realm: %s", err) + } + + to := tokenOptions{ + realm: realmURL.String(), + service: params["service"], + } + + to.scopes = getTokenScopes(ctx, params) + if len(to.scopes) == 0 { + return errors.Errorf("no scope specified for token auth challenge") + } + if r.secret != "" { + // Credential information is provided, use oauth POST endpoint + r.token, err = r.fetchTokenWithOAuth(ctx, to) + if err != nil { + return errors.Wrap(err, "failed to fetch oauth token") + } + } else { + // Do request anonymously + r.token, err = r.getToken(ctx, to) + if err != nil { + return errors.Wrap(err, "failed to fetch anonymous token") + } + } + + return nil +} + +type tokenOptions struct { + realm string + service string + scopes []string +} + +type postTokenResponse struct { + AccessToken string `json:"access_token"` + RefreshToken string `json:"refresh_token"` + ExpiresIn int `json:"expires_in"` + IssuedAt time.Time `json:"issued_at"` + Scope string `json:"scope"` +} + +func (r *dockerBase) fetchTokenWithOAuth(ctx context.Context, to tokenOptions) (string, error) { + form := url.Values{} + form.Set("scope", strings.Join(to.scopes, " ")) + form.Set("service", to.service) + // TODO: Allow setting client_id + form.Set("client_id", "containerd-dist-tool") + + if r.username == "" { + form.Set("grant_type", "refresh_token") + form.Set("refresh_token", r.secret) + } else { + form.Set("grant_type", "password") + form.Set("username", r.username) + form.Set("password", r.secret) + } + + resp, err := ctxhttp.PostForm(ctx, r.client, to.realm, form) + if err != nil { + return "", err + } + defer resp.Body.Close() + + // Registries without support for POST may return 404 for POST /v2/token. + // As of September 2017, GCR is known to return 404. + if (resp.StatusCode == 405 && r.username != "") || resp.StatusCode == 404 { + return r.getToken(ctx, to) + } else if resp.StatusCode < 200 || resp.StatusCode >= 400 { + b, _ := ioutil.ReadAll(io.LimitReader(resp.Body, 64000)) // 64KB + log.G(ctx).WithFields(logrus.Fields{ + "status": resp.Status, + "body": string(b), + }).Debugf("token request failed") + // TODO: handle error body and write debug output + return "", errors.Errorf("unexpected status: %s", resp.Status) + } + + decoder := json.NewDecoder(resp.Body) + + var tr postTokenResponse + if err = decoder.Decode(&tr); err != nil { + return "", fmt.Errorf("unable to decode token response: %s", err) + } + + return tr.AccessToken, nil +} + +type getTokenResponse struct { + Token string `json:"token"` + AccessToken string `json:"access_token"` + ExpiresIn int `json:"expires_in"` + IssuedAt time.Time `json:"issued_at"` + RefreshToken string `json:"refresh_token"` +} + +// getToken fetches a token using a GET request +func (r *dockerBase) getToken(ctx context.Context, to tokenOptions) (string, error) { + req, err := http.NewRequest("GET", to.realm, nil) + if err != nil { + return "", err + } + + reqParams := req.URL.Query() + + if to.service != "" { + reqParams.Add("service", to.service) + } + + for _, scope := range to.scopes { + reqParams.Add("scope", scope) + } + + if r.secret != "" { + req.SetBasicAuth(r.username, r.secret) + } + + req.URL.RawQuery = reqParams.Encode() + + resp, err := ctxhttp.Do(ctx, r.client, req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 400 { + // TODO: handle error body and write debug output + return "", errors.Errorf("unexpected status: %s", resp.Status) + } + + decoder := json.NewDecoder(resp.Body) + + var tr getTokenResponse + if err = decoder.Decode(&tr); err != nil { + return "", fmt.Errorf("unable to decode token response: %s", err) + } + + // `access_token` is equivalent to `token` and if both are specified + // the choice is undefined. Canonicalize `access_token` by sticking + // things in `token`. + if tr.AccessToken != "" { + tr.Token = tr.AccessToken + } + + if tr.Token == "" { + return "", ErrNoToken + } + + return tr.Token, nil +} + +func (r *containerdResolver) getV2Urls(urls []string, imagePath string) ([]url.URL, error) { + v2Urls := []url.URL{} + for _, u := range urls { + v2Url, err := url.Parse(u) + if err != nil { + return nil, fmt.Errorf("Failed to parse url during getv2 urls: %+v, err:%s", u, err) + } + v2Url.Path = path.Join("/v2", imagePath) + v2Urls = append(v2Urls, *v2Url) + } + return v2Urls, nil +} diff --git a/pkg/containerd/resolver/scope.go b/pkg/containerd/resolver/scope.go new file mode 100644 index 000000000..c8ced9179 --- /dev/null +++ b/pkg/containerd/resolver/scope.go @@ -0,0 +1,80 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "context" + "net/url" + "sort" + "strings" + + "github.com/containerd/containerd/reference" +) + +// repositoryScope returns a repository scope string such as "repository:foo/bar:pull" +// for "host/foo/bar:baz". +// When push is true, both pull and push are added to the scope. +func repositoryScope(refspec reference.Spec, push bool) (string, error) { + u, err := url.Parse("dummy://" + refspec.Locator) + if err != nil { + return "", err + } + s := "repository:" + strings.TrimPrefix(u.Path, "/") + ":pull" + if push { + s += ",push" + } + return s, nil +} + +// tokenScopesKey is used for the key for context.WithValue(). +// value: []string (e.g. {"registry:foo/bar:pull"}) +type tokenScopesKey struct{} + +// contextWithRepositoryScope returns a context with tokenScopesKey{} and the repository scope value. +func contextWithRepositoryScope(ctx context.Context, refspec reference.Spec, push bool) (context.Context, error) { + s, err := repositoryScope(refspec, push) + if err != nil { + return nil, err + } + return context.WithValue(ctx, tokenScopesKey{}, []string{s}), nil +} + +// getTokenScopes returns deduplicated and sorted scopes from ctx.Value(tokenScopesKey{}) and params["scope"]. +func getTokenScopes(ctx context.Context, params map[string]string) []string { + var scopes []string + if x := ctx.Value(tokenScopesKey{}); x != nil { + scopes = append(scopes, x.([]string)...) + } + if scope, ok := params["scope"]; ok { + for _, s := range scopes { + // Note: this comparison is unaware of the scope grammar (https://docs.docker.com/registry/spec/auth/scope/) + // So, "repository:foo/bar:pull,push" != "repository:foo/bar:push,pull", although semantically they are equal. + if s == scope { + // already appended + goto Sort + } + } + scopes = append(scopes, scope) + } +Sort: + sort.Strings(scopes) + return scopes +} diff --git a/pkg/containerd/resolver/scope_test.go b/pkg/containerd/resolver/scope_test.go new file mode 100644 index 000000000..f620dd4e6 --- /dev/null +++ b/pkg/containerd/resolver/scope_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "testing" + + "github.com/containerd/containerd/reference" + "github.com/stretchr/testify/assert" +) + +func TestRepositoryScope(t *testing.T) { + testCases := []struct { + refspec reference.Spec + push bool + expected string + }{ + { + refspec: reference.Spec{ + Locator: "host/foo/bar", + Object: "ignored", + }, + push: false, + expected: "repository:foo/bar:pull", + }, + { + refspec: reference.Spec{ + Locator: "host:4242/foo/bar", + Object: "ignored", + }, + push: true, + expected: "repository:foo/bar:pull,push", + }, + } + for _, x := range testCases { + actual, err := repositoryScope(x.refspec, x.push) + assert.NoError(t, err) + assert.Equal(t, x.expected, actual) + } +} diff --git a/pkg/containerd/resolver/status.go b/pkg/containerd/resolver/status.go new file mode 100644 index 000000000..5ffcdaec0 --- /dev/null +++ b/pkg/containerd/resolver/status.go @@ -0,0 +1,71 @@ +/* +Copyright 2018 The Containerd Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// The corresponding file is in containerd/remote/docker/. +// This package can be removed once a more feasible and hollistic resolver +// is finalized in containerd. + +package resolver + +import ( + "sync" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/pkg/errors" +) + +// Status of a content operation +type Status struct { + content.Status + + // UploadUUID is used by the Docker registry to reference blob uploads + UploadUUID string +} + +// StatusTracker to track status of operations +type StatusTracker interface { + GetStatus(string) (Status, error) + SetStatus(string, Status) +} + +type memoryStatusTracker struct { + statuses map[string]Status + m sync.Mutex +} + +// NewInMemoryTracker returns a StatusTracker that tracks content status in-memory +func NewInMemoryTracker() StatusTracker { + return &memoryStatusTracker{ + statuses: map[string]Status{}, + } +} + +func (t *memoryStatusTracker) GetStatus(ref string) (Status, error) { + t.m.Lock() + defer t.m.Unlock() + status, ok := t.statuses[ref] + if !ok { + return Status{}, errors.Wrapf(errdefs.ErrNotFound, "status for ref %v", ref) + } + return status, nil +} + +func (t *memoryStatusTracker) SetStatus(ref string, status Status) { + t.m.Lock() + t.statuses[ref] = status + t.m.Unlock() +} diff --git a/pkg/server/helpers.go b/pkg/server/helpers.go index 48b98f2c7..dbc6fffb1 100644 --- a/pkg/server/helpers.go +++ b/pkg/server/helpers.go @@ -210,7 +210,6 @@ func (c *criContainerdService) localResolve(ctx context.Context, refOrID string) if _, err := imagedigest.Parse(refOrID); err == nil { return refOrID } - return func(ref string) string { // ref is not image id, try to resolve it locally. normalized, err := util.NormalizeImageRef(ref) diff --git a/pkg/server/image_pull.go b/pkg/server/image_pull.go index bf6634fe5..884f0dd7c 100644 --- a/pkg/server/image_pull.go +++ b/pkg/server/image_pull.go @@ -25,12 +25,12 @@ import ( "github.com/containerd/containerd" "github.com/containerd/containerd/errdefs" containerdimages "github.com/containerd/containerd/images" - "github.com/containerd/containerd/remotes/docker" imagespec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" "golang.org/x/net/context" "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + containerdresolver "github.com/containerd/cri-containerd/pkg/containerd/resolver" imagestore "github.com/containerd/cri-containerd/pkg/store/image" "github.com/containerd/cri-containerd/pkg/util" ) @@ -83,15 +83,14 @@ func (c *criContainerdService) PullImage(ctx context.Context, r *runtime.PullIma if err != nil { return nil, fmt.Errorf("failed to parse image reference %q: %v", imageRef, err) } - // TODO(random-liu): [P0] Avoid concurrent pulling/removing on the same image reference. ref := namedRef.String() if ref != imageRef { logrus.Debugf("PullImage using normalized image ref: %q", ref) } - - resolver := docker.NewResolver(docker.ResolverOptions{ + resolver := containerdresolver.NewResolver(containerdresolver.Options{ Credentials: func(string) (string, string, error) { return ParseAuth(r.GetAuth()) }, Client: http.DefaultClient, + Registry: c.getResolverOptions(), }) _, desc, err := resolver.Resolve(ctx, ref) if err != nil { @@ -212,3 +211,11 @@ func (c *criContainerdService) createImageReference(ctx context.Context, name st _, err = c.client.ImageService().Update(ctx, img, "target") return err } + +func (c *criContainerdService) getResolverOptions() map[string][]string { + options := make(map[string][]string) + for ns, mirror := range c.config.Mirrors { + options[ns] = append(options[ns], mirror.Endpoints...) + } + return options +}