From 506b8154838c0561ec51c497c34e7f1c611bb356 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Wed, 6 Mar 2019 12:12:33 +0800 Subject: [PATCH] remotes: add distribution labels to blob data We can use the cross repository push feature to reuse the existing blobs in the same registry. To make push fast, we need to know where the blob comes from. Use `containerd.io/distribution.source.<registry> = <repo>[,<repo>]` as the label format. For example, if the blob is downloaded from docker.io/library/busybox:latest, the label will be containerd.io/distribution.source.docker.io = library/busybox If the blob is shared by different repos in the same registry, the repo name will be appended, like: containerd.io/distribution.source.docker.io = library/busybox,x/y NOTE: 1. this does not apply to the legacy docker image schema1. 2. concurrent fetch actions might miss some repo names in the label, but that is ok. 3. it is optional. There is no need to add the label if the engine only uses images and never pushes them. Signed-off-by: Wei Fu --- client.go | 4 ++ client_opts.go | 9 +++ client_test.go | 6 +- cmd/ctr/commands/content/fetch.go | 3 + image_test.go | 63 +++++++++++++++++ pull.go | 22 ++++-- remotes/docker/handler.go | 112 ++++++++++++++++++++++++++++++ remotes/docker/handler_test.go | 67 ++++++++++++++++++ remotes/handlers.go | 35 ++++++++++ 9 files changed, 315 insertions(+), 6 deletions(-) create mode 100644 remotes/docker/handler.go create mode 100644 remotes/docker/handler_test.go diff --git a/client.go b/client.go index 63e8c193a..39adfd9e7 100644 --- a/client.go +++ b/client.go @@ -300,6 +300,10 @@ type RemoteContext struct { // MaxConcurrentDownloads is the max concurrent content downloads for each pull. MaxConcurrentDownloads int + + // AppendDistributionSourceLabel allows fetcher to add distribute source + // label for each blob content, which doesn't work for legacy schema1. 
+ AppendDistributionSourceLabel bool } func defaultRemoteContext() *RemoteContext { diff --git a/client_opts.go b/client_opts.go index 669829751..ed2ff05d5 100644 --- a/client_opts.go +++ b/client_opts.go @@ -194,3 +194,12 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt { return nil } } + +// WithAppendDistributionSourceLabel allows fetcher to add distribute source +// label for each blob content, which doesn't work for legacy schema1. +func WithAppendDistributionSourceLabel() RemoteOpt { + return func(_ *Client, c *RemoteContext) error { + c.AppendDistributionSourceLabel = true + return nil + } +} diff --git a/client_test.go b/client_test.go index f50020fa4..821517e43 100644 --- a/client_test.go +++ b/client_test.go @@ -281,6 +281,10 @@ func TestImagePullSomePlatforms(t *testing.T) { count := 0 for _, manifest := range manifests { children, err := images.Children(ctx, cs, manifest) + if err != nil { + t.Fatal(err) + } + found := false for _, matcher := range m { if matcher.Match(*manifest.Platform) { @@ -302,8 +306,6 @@ func TestImagePullSomePlatforms(t *testing.T) { } ra.Close() } - } else if !found && err == nil { - t.Fatal("manifest should not have pulled children content") } } diff --git a/cmd/ctr/commands/content/fetch.go b/cmd/ctr/commands/content/fetch.go index a9b70026c..99d2206b9 100644 --- a/cmd/ctr/commands/content/fetch.go +++ b/cmd/ctr/commands/content/fetch.go @@ -149,10 +149,13 @@ func Fetch(ctx context.Context, client *containerd.Client, ref string, config *F containerd.WithResolver(config.Resolver), containerd.WithImageHandler(h), containerd.WithSchema1Conversion, + containerd.WithAppendDistributionSourceLabel(), } + for _, platform := range config.Platforms { opts = append(opts, containerd.WithPlatform(platform)) } + img, err := client.Fetch(pctx, ref, opts...) 
stopProgress() if err != nil { diff --git a/image_test.go b/image_test.go index d95f5105b..beb55fa14 100644 --- a/image_test.go +++ b/image_test.go @@ -17,10 +17,16 @@ package containerd import ( + "context" + "fmt" "runtime" + "strings" "testing" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) func TestImageIsUnpacked(t *testing.T) { @@ -72,3 +78,60 @@ func TestImageIsUnpacked(t *testing.T) { t.Fatalf("image should be unpacked") } } + +func TestImagePullWithDistSourceLabel(t *testing.T) { + var ( + source = "docker.io" + repoName = "library/busybox" + tag = "latest" + ) + + ctx, cancel := testContext() + defer cancel() + + client, err := newClient(t, address) + if err != nil { + t.Fatal(err) + } + defer client.Close() + + imageName := fmt.Sprintf("%s/%s:%s", source, repoName, tag) + pMatcher := platforms.Default() + + // pull content without unpack and add distribution source label + image, err := client.Pull(ctx, imageName, + WithPlatformMatcher(pMatcher), + WithAppendDistributionSourceLabel()) + if err != nil { + t.Fatal(err) + } + defer client.ImageService().Delete(ctx, imageName) + + cs := client.ContentStore() + key := fmt.Sprintf("containerd.io/distribution.source.%s", source) + + // only check the target platform + childrenHandler := images.FilterPlatforms(images.ChildrenHandler(cs), pMatcher) + + checkLabelHandler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := childrenHandler(ctx, desc) + if err != nil { + return nil, err + } + + info, err := cs.Info(ctx, desc.Digest) + if err != nil { + return nil, err + } + + // check the label + if got := info.Labels[key]; !strings.Contains(got, repoName) { + return nil, fmt.Errorf("expected to have %s repo name in label, but got %s", repoName, got) + } + return children, nil + } + + if err := images.Dispatch(ctx, 
images.HandlerFunc(checkLabelHandler), nil, image.Target()); err != nil { + t.Fatal(err) + } +} diff --git a/pull.go b/pull.go index f938ccc3b..693dcafe1 100644 --- a/pull.go +++ b/pull.go @@ -112,8 +112,9 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim childrenHandler := images.ChildrenHandler(store) // Set any children labels for that content childrenHandler = images.SetChildrenLabels(store, childrenHandler) - // Filter children by platforms - childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher) + // Filter manifests by platforms but allow to handle manifest + // and configuration for not-target platforms + childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher) // Sort and limit manifests if a finite number is needed if limit > 0 { childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) @@ -130,11 +131,23 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim }, ) - handler = images.Handlers(append(rCtx.BaseHandlers, + handlers := append(rCtx.BaseHandlers, remotes.FetchHandler(store, fetcher), convertibleHandler, childrenHandler, - )...) + ) + + // append distribution source label to blob data + if rCtx.AppendDistributionSourceLabel { + appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref) + if err != nil { + return images.Image{}, err + } + + handlers = append(handlers, appendDistSrcLabelHandler) + } + + handler = images.Handlers(handlers...) 
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) { return docker.ConvertManifest(ctx, store, desc) @@ -148,6 +161,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim if rCtx.MaxConcurrentDownloads > 0 { limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads)) } + if err := images.Dispatch(ctx, handler, limiter, desc); err != nil { return images.Image{}, err } diff --git a/remotes/docker/handler.go b/remotes/docker/handler.go new file mode 100644 index 000000000..21872b5f6 --- /dev/null +++ b/remotes/docker/handler.go @@ -0,0 +1,112 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package docker + +import ( + "context" + "fmt" + "net/url" + "strings" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/labels" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/reference" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +var ( + // labelDistributionSource describes the source blob comes from. + labelDistributionSource = "containerd.io/distribution.source" +) + +// AppendDistributionSourceLabel updates the label of blob with distribution source. 
+func AppendDistributionSourceLabel(manager content.Manager, ref string) (images.HandlerFunc, error) { + refspec, err := reference.Parse(ref) + if err != nil { + return nil, err + } + + u, err := url.Parse("dummy://" + refspec.Locator) + if err != nil { + return nil, err + } + + source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/") + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + info, err := manager.Info(ctx, desc.Digest) + if err != nil { + return nil, err + } + + key := distributionSourceLabelKey(source) + + originLabel := "" + if info.Labels != nil { + originLabel = info.Labels[key] + } + value := appendDistributionSourceLabel(originLabel, repo) + + // The repo name has been limited under 256 and the distribution + // label might hit the limitation of label size, when blob data + // is used as the very, very common layer. + if err := labels.Validate(key, value); err != nil { + log.G(ctx).Warnf("skip to append distribution label: %s", err) + return nil, nil + } + + info = content.Info{ + Digest: desc.Digest, + Labels: map[string]string{ + key: value, + }, + } + _, err = manager.Update(ctx, info, fmt.Sprintf("labels.%s", key)) + return nil, err + }, nil +} + +func appendDistributionSourceLabel(originLabel, repo string) string { + repos := []string{} + if originLabel != "" { + repos = strings.Split(originLabel, ",") + } + repos = append(repos, repo) + + // use emtpy string to present duplicate items + for i := 1; i < len(repos); i++ { + tmp, j := repos[i], i-1 + for ; j >= 0 && repos[j] >= tmp; j-- { + if repos[j] == tmp { + tmp = "" + } + repos[j+1] = repos[j] + } + repos[j+1] = tmp + } + + i := 0 + for ; i < len(repos) && repos[i] == ""; i++ { + } + + return strings.Join(repos[i:], ",") +} + +func distributionSourceLabelKey(source string) string { + return fmt.Sprintf("%s.%s", labelDistributionSource, source) +} diff --git a/remotes/docker/handler_test.go b/remotes/docker/handler_test.go new file mode 100644 
index 000000000..e0864123c --- /dev/null +++ b/remotes/docker/handler_test.go @@ -0,0 +1,67 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package docker + +import ( + "reflect" + "testing" +) + +func TestAppendDistributionLabel(t *testing.T) { + for _, tc := range []struct { + originLabel string + repo string + expected string + }{ + { + originLabel: "", + repo: "", + expected: "", + }, + { + originLabel: "", + repo: "library/busybox", + expected: "library/busybox", + }, + { + originLabel: "library/busybox", + repo: "library/busybox", + expected: "library/busybox", + }, + // remove the duplicate one in origin + { + originLabel: "library/busybox,library/redis,library/busybox", + repo: "library/alpine", + expected: "library/alpine,library/busybox,library/redis", + }, + // remove the empty repo + { + originLabel: "library/busybox,library/redis,library/busybox", + repo: "", + expected: "library/busybox,library/redis", + }, + { + originLabel: "library/busybox,library/redis,library/busybox", + repo: "library/redis", + expected: "library/busybox,library/redis", + }, + } { + if got := appendDistributionSourceLabel(tc.originLabel, tc.repo); !reflect.DeepEqual(got, tc.expected) { + t.Fatalf("expected %v, but got %v", tc.expected, got) + } + } +} diff --git a/remotes/handlers.go b/remotes/handlers.go index 56d4c5081..0ee56c887 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -206,3 +206,38 @@ func PushContent(ctx 
context.Context, pusher Pusher, desc ocispec.Descriptor, pr return nil } + +// FilterManifestByPlatformHandler allows Handler to handle non-target +// platform's manifest and configuration data. +func FilterManifestByPlatformHandler(f images.HandlerFunc, m platforms.Matcher) images.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + children, err := f(ctx, desc) + if err != nil { + return nil, err + } + + // no platform information + if desc.Platform == nil || m == nil { + return children, nil + } + + var descs []ocispec.Descriptor + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest: + if m.Match(*desc.Platform) { + descs = children + } else { + for _, child := range children { + if child.MediaType == images.MediaTypeDockerSchema2Config || + child.MediaType == ocispec.MediaTypeImageConfig { + + descs = append(descs, child) + } + } + } + default: + descs = children + } + return descs, nil + } +}