Merge pull request #3067 from fuweid/me-fetch-platforms
remotes: add distribution labels to blob data
This commit is contained in:
commit
8f63d2acdb
@ -300,6 +300,10 @@ type RemoteContext struct {
|
||||
|
||||
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
|
||||
MaxConcurrentDownloads int
|
||||
|
||||
// AppendDistributionSourceLabel allows fetcher to add distribute source
|
||||
// label for each blob content, which doesn't work for legacy schema1.
|
||||
AppendDistributionSourceLabel bool
|
||||
}
|
||||
|
||||
func defaultRemoteContext() *RemoteContext {
|
||||
|
@ -194,3 +194,12 @@ func WithMaxConcurrentDownloads(max int) RemoteOpt {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAppendDistributionSourceLabel allows fetcher to add distribute source
|
||||
// label for each blob content, which doesn't work for legacy schema1.
|
||||
func WithAppendDistributionSourceLabel() RemoteOpt {
|
||||
return func(_ *Client, c *RemoteContext) error {
|
||||
c.AppendDistributionSourceLabel = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -281,6 +281,10 @@ func TestImagePullSomePlatforms(t *testing.T) {
|
||||
count := 0
|
||||
for _, manifest := range manifests {
|
||||
children, err := images.Children(ctx, cs, manifest)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
found := false
|
||||
for _, matcher := range m {
|
||||
if matcher.Match(*manifest.Platform) {
|
||||
@ -302,8 +306,6 @@ func TestImagePullSomePlatforms(t *testing.T) {
|
||||
}
|
||||
ra.Close()
|
||||
}
|
||||
} else if !found && err == nil {
|
||||
t.Fatal("manifest should not have pulled children content")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,10 +149,13 @@ func Fetch(ctx context.Context, client *containerd.Client, ref string, config *F
|
||||
containerd.WithResolver(config.Resolver),
|
||||
containerd.WithImageHandler(h),
|
||||
containerd.WithSchema1Conversion,
|
||||
containerd.WithAppendDistributionSourceLabel(),
|
||||
}
|
||||
|
||||
for _, platform := range config.Platforms {
|
||||
opts = append(opts, containerd.WithPlatform(platform))
|
||||
}
|
||||
|
||||
img, err := client.Fetch(pctx, ref, opts...)
|
||||
stopProgress()
|
||||
if err != nil {
|
||||
|
@ -17,10 +17,16 @@
|
||||
package containerd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/platforms"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
func TestImageIsUnpacked(t *testing.T) {
|
||||
@ -72,3 +78,60 @@ func TestImageIsUnpacked(t *testing.T) {
|
||||
t.Fatalf("image should be unpacked")
|
||||
}
|
||||
}
|
||||
|
||||
func TestImagePullWithDistSourceLabel(t *testing.T) {
|
||||
var (
|
||||
source = "docker.io"
|
||||
repoName = "library/busybox"
|
||||
tag = "latest"
|
||||
)
|
||||
|
||||
ctx, cancel := testContext()
|
||||
defer cancel()
|
||||
|
||||
client, err := newClient(t, address)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
imageName := fmt.Sprintf("%s/%s:%s", source, repoName, tag)
|
||||
pMatcher := platforms.Default()
|
||||
|
||||
// pull content without unpack and add distribution source label
|
||||
image, err := client.Pull(ctx, imageName,
|
||||
WithPlatformMatcher(pMatcher),
|
||||
WithAppendDistributionSourceLabel())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer client.ImageService().Delete(ctx, imageName)
|
||||
|
||||
cs := client.ContentStore()
|
||||
key := fmt.Sprintf("containerd.io/distribution.source.%s", source)
|
||||
|
||||
// only check the target platform
|
||||
childrenHandler := images.FilterPlatforms(images.ChildrenHandler(cs), pMatcher)
|
||||
|
||||
checkLabelHandler := func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
children, err := childrenHandler(ctx, desc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
info, err := cs.Info(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check the label
|
||||
if got := info.Labels[key]; !strings.Contains(got, repoName) {
|
||||
return nil, fmt.Errorf("expected to have %s repo name in label, but got %s", repoName, got)
|
||||
}
|
||||
return children, nil
|
||||
}
|
||||
|
||||
if err := images.Dispatch(ctx, images.HandlerFunc(checkLabelHandler), nil, image.Target()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
22
pull.go
22
pull.go
@ -112,8 +112,9 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
|
||||
childrenHandler := images.ChildrenHandler(store)
|
||||
// Set any children labels for that content
|
||||
childrenHandler = images.SetChildrenLabels(store, childrenHandler)
|
||||
// Filter children by platforms
|
||||
childrenHandler = images.FilterPlatforms(childrenHandler, rCtx.PlatformMatcher)
|
||||
// Filter manifests by platforms but allow to handle manifest
|
||||
// and configuration for not-target platforms
|
||||
childrenHandler = remotes.FilterManifestByPlatformHandler(childrenHandler, rCtx.PlatformMatcher)
|
||||
// Sort and limit manifests if a finite number is needed
|
||||
if limit > 0 {
|
||||
childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
|
||||
@ -130,11 +131,23 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
|
||||
},
|
||||
)
|
||||
|
||||
handler = images.Handlers(append(rCtx.BaseHandlers,
|
||||
handlers := append(rCtx.BaseHandlers,
|
||||
remotes.FetchHandler(store, fetcher),
|
||||
convertibleHandler,
|
||||
childrenHandler,
|
||||
)...)
|
||||
)
|
||||
|
||||
// append distribution source label to blob data
|
||||
if rCtx.AppendDistributionSourceLabel {
|
||||
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, ref)
|
||||
if err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
||||
handlers = append(handlers, appendDistSrcLabelHandler)
|
||||
}
|
||||
|
||||
handler = images.Handlers(handlers...)
|
||||
|
||||
converterFunc = func(ctx context.Context, desc ocispec.Descriptor) (ocispec.Descriptor, error) {
|
||||
return docker.ConvertManifest(ctx, store, desc)
|
||||
@ -148,6 +161,7 @@ func (c *Client) fetch(ctx context.Context, rCtx *RemoteContext, ref string, lim
|
||||
if rCtx.MaxConcurrentDownloads > 0 {
|
||||
limiter = semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
|
||||
}
|
||||
|
||||
if err := images.Dispatch(ctx, handler, limiter, desc); err != nil {
|
||||
return images.Image{}, err
|
||||
}
|
||||
|
112
remotes/docker/handler.go
Normal file
112
remotes/docker/handler.go
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package docker
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/labels"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/reference"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
)
|
||||
|
||||
var (
|
||||
// labelDistributionSource describes the source blob comes from.
|
||||
labelDistributionSource = "containerd.io/distribution.source"
|
||||
)
|
||||
|
||||
// AppendDistributionSourceLabel updates the label of blob with distribution source.
|
||||
func AppendDistributionSourceLabel(manager content.Manager, ref string) (images.HandlerFunc, error) {
|
||||
refspec, err := reference.Parse(ref)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse("dummy://" + refspec.Locator)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
source, repo := u.Hostname(), strings.TrimPrefix(u.Path, "/")
|
||||
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
info, err := manager.Info(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key := distributionSourceLabelKey(source)
|
||||
|
||||
originLabel := ""
|
||||
if info.Labels != nil {
|
||||
originLabel = info.Labels[key]
|
||||
}
|
||||
value := appendDistributionSourceLabel(originLabel, repo)
|
||||
|
||||
// The repo name has been limited under 256 and the distribution
|
||||
// label might hit the limitation of label size, when blob data
|
||||
// is used as the very, very common layer.
|
||||
if err := labels.Validate(key, value); err != nil {
|
||||
log.G(ctx).Warnf("skip to append distribution label: %s", err)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
info = content.Info{
|
||||
Digest: desc.Digest,
|
||||
Labels: map[string]string{
|
||||
key: value,
|
||||
},
|
||||
}
|
||||
_, err = manager.Update(ctx, info, fmt.Sprintf("labels.%s", key))
|
||||
return nil, err
|
||||
}, nil
|
||||
}
|
||||
|
||||
func appendDistributionSourceLabel(originLabel, repo string) string {
|
||||
repos := []string{}
|
||||
if originLabel != "" {
|
||||
repos = strings.Split(originLabel, ",")
|
||||
}
|
||||
repos = append(repos, repo)
|
||||
|
||||
// use emtpy string to present duplicate items
|
||||
for i := 1; i < len(repos); i++ {
|
||||
tmp, j := repos[i], i-1
|
||||
for ; j >= 0 && repos[j] >= tmp; j-- {
|
||||
if repos[j] == tmp {
|
||||
tmp = ""
|
||||
}
|
||||
repos[j+1] = repos[j]
|
||||
}
|
||||
repos[j+1] = tmp
|
||||
}
|
||||
|
||||
i := 0
|
||||
for ; i < len(repos) && repos[i] == ""; i++ {
|
||||
}
|
||||
|
||||
return strings.Join(repos[i:], ",")
|
||||
}
|
||||
|
||||
func distributionSourceLabelKey(source string) string {
|
||||
return fmt.Sprintf("%s.%s", labelDistributionSource, source)
|
||||
}
|
67
remotes/docker/handler_test.go
Normal file
67
remotes/docker/handler_test.go
Normal file
@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package docker
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAppendDistributionLabel(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
originLabel string
|
||||
repo string
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
originLabel: "",
|
||||
repo: "",
|
||||
expected: "",
|
||||
},
|
||||
{
|
||||
originLabel: "",
|
||||
repo: "library/busybox",
|
||||
expected: "library/busybox",
|
||||
},
|
||||
{
|
||||
originLabel: "library/busybox",
|
||||
repo: "library/busybox",
|
||||
expected: "library/busybox",
|
||||
},
|
||||
// remove the duplicate one in origin
|
||||
{
|
||||
originLabel: "library/busybox,library/redis,library/busybox",
|
||||
repo: "library/alpine",
|
||||
expected: "library/alpine,library/busybox,library/redis",
|
||||
},
|
||||
// remove the empty repo
|
||||
{
|
||||
originLabel: "library/busybox,library/redis,library/busybox",
|
||||
repo: "",
|
||||
expected: "library/busybox,library/redis",
|
||||
},
|
||||
{
|
||||
originLabel: "library/busybox,library/redis,library/busybox",
|
||||
repo: "library/redis",
|
||||
expected: "library/busybox,library/redis",
|
||||
},
|
||||
} {
|
||||
if got := appendDistributionSourceLabel(tc.originLabel, tc.repo); !reflect.DeepEqual(got, tc.expected) {
|
||||
t.Fatalf("expected %v, but got %v", tc.expected, got)
|
||||
}
|
||||
}
|
||||
}
|
@ -206,3 +206,38 @@ func PushContent(ctx context.Context, pusher Pusher, desc ocispec.Descriptor, pr
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// FilterManifestByPlatformHandler allows Handler to handle non-target
|
||||
// platform's manifest and configuration data.
|
||||
func FilterManifestByPlatformHandler(f images.HandlerFunc, m platforms.Matcher) images.HandlerFunc {
|
||||
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
children, err := f(ctx, desc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// no platform information
|
||||
if desc.Platform == nil || m == nil {
|
||||
return children, nil
|
||||
}
|
||||
|
||||
var descs []ocispec.Descriptor
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
|
||||
if m.Match(*desc.Platform) {
|
||||
descs = children
|
||||
} else {
|
||||
for _, child := range children {
|
||||
if child.MediaType == images.MediaTypeDockerSchema2Config ||
|
||||
child.MediaType == ocispec.MediaTypeImageConfig {
|
||||
|
||||
descs = append(descs, child)
|
||||
}
|
||||
}
|
||||
}
|
||||
default:
|
||||
descs = children
|
||||
}
|
||||
return descs, nil
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user