Move transfer and unpack packages

Packages related to transfer and unpacking provide core interfaces which
use other core interfaces and part of common functionality.

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-02-07 22:40:15 -08:00
parent f5ed7b84e9
commit f46aea6187
35 changed files with 62 additions and 62 deletions

View File

@@ -0,0 +1,64 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/transfer"
)
func (ts *localTransferService) exportStream(ctx context.Context, ig transfer.ImageGetter, is transfer.ImageExporter, tops *transfer.Config) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Exporting",
})
}
var imgs []images.Image
if il, ok := ig.(transfer.ImageLookup); ok {
imgs, err = il.Lookup(ctx, ts.images)
if err != nil {
return err
}
} else {
img, err := ig.Get(ctx, ts.images)
if err != nil {
return err
}
imgs = append(imgs, img)
}
err = is.Export(ctx, ts.content, imgs)
if err != nil {
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Completed export",
})
}
return nil
}

View File

@@ -0,0 +1,170 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"encoding/json"
"fmt"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/core/unpack"
"github.com/containerd/errdefs"
"github.com/containerd/log"
)
func (ts *localTransferService) importStream(ctx context.Context, i transfer.ImageImporter, is transfer.ImageStorer, tops *transfer.Config) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Importing",
})
}
index, err := i.Import(ctx, ts.content)
if err != nil {
return err
}
var (
descriptors []ocispec.Descriptor
handler images.Handler
unpacker *unpack.Unpacker
)
// If save index, add index
descriptors = append(descriptors, index)
var handlerFunc images.HandlerFunc = func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// Only save images at top level
if desc.Digest != index.Digest {
return images.Children(ctx, ts.content, desc)
}
p, err := content.ReadBlob(ctx, ts.content, desc)
if err != nil {
return nil, err
}
var idx ocispec.Index
if err := json.Unmarshal(p, &idx); err != nil {
return nil, err
}
for _, m := range idx.Manifests {
m.Annotations = mergeMap(m.Annotations, map[string]string{"io.containerd.import.ref-source": "annotation"})
descriptors = append(descriptors, m)
}
return idx.Manifests, nil
}
if f, ok := is.(transfer.ImageFilterer); ok {
handlerFunc = f.ImageFilter(handlerFunc, ts.content)
}
handler = images.Handlers(handlerFunc)
// First find suitable platforms to unpack into
// If image storer is also an unpacker type, i.e implemented UnpackPlatforms() func
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
for _, u := range unpacks {
matched, mu := getSupportedPlatform(u, ts.config.UnpackPlatforms)
if matched {
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
}
}
if ts.config.DuplicationSuppressor != nil {
uopts = append(uopts, unpack.WithDuplicationSuppressor(ts.config.DuplicationSuppressor))
}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
handler = unpacker.Unpack(handler)
}
}
if err := images.WalkNotEmpty(ctx, handler, index); err != nil {
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
}
// TODO: Handle Not Empty as a special case on the input
return err
}
if unpacker != nil {
if _, err = unpacker.Wait(); err != nil {
return err
}
}
for _, desc := range descriptors {
desc := desc
imgs, err := is.Store(ctx, desc, ts.images)
if err != nil {
if errdefs.IsNotFound(err) {
log.G(ctx).Infof("No images store for %s", desc.Digest)
continue
}
return err
}
if tops.Progress != nil {
for _, img := range imgs {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
Desc: &desc,
})
}
}
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "Completed import",
})
}
return nil
}
func mergeMap(m1, m2 map[string]string) map[string]string {
merged := make(map[string]string, len(m1)+len(m2))
for k, v := range m1 {
merged[k] = v
}
for k, v := range m2 {
merged[k] = v
}
return merged
}

View File

@@ -0,0 +1,281 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"sort"
"sync"
"time"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/log"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
type ProgressTracker struct {
root string
transferState string
added chan jobUpdate
waitC chan struct{}
parents map[digest.Digest][]ocispec.Descriptor
parentL sync.Mutex
}
type jobState uint8
const (
jobAdded jobState = iota
jobInProgress
jobComplete
)
type jobStatus struct {
state jobState
name string
parents []string
progress int64
desc ocispec.Descriptor
}
type jobUpdate struct {
desc ocispec.Descriptor
exists bool
//children []ocispec.Descriptor
}
type ActiveJobs interface {
Status(string) (content.Status, bool)
}
type StatusTracker interface {
Active(context.Context, ...string) (ActiveJobs, error)
Check(context.Context, digest.Digest) (bool, error)
}
// NewProgressTracker tracks content download progress
func NewProgressTracker(root, transferState string) *ProgressTracker {
return &ProgressTracker{
root: root,
transferState: transferState,
added: make(chan jobUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
}
}
func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) {
defer close(j.waitC)
// Instead of ticker, just delay
jobs := map[digest.Digest]*jobStatus{}
tc := time.NewTicker(time.Millisecond * 300)
defer tc.Stop()
update := func() {
// TODO: Filter by references
active, err := pt.Active(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
}
for dgst, job := range jobs {
if job.state != jobComplete {
status, ok := active.Status(job.name)
if ok {
if status.Offset > job.progress {
pf(transfer.Progress{
Event: j.transferState,
Name: job.name,
Parents: job.parents,
Progress: status.Offset,
Total: status.Total,
Desc: &job.desc,
})
job.progress = status.Offset
job.state = jobInProgress
jobs[dgst] = job
}
} else {
ok, err := pt.Check(ctx, job.desc.Digest)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
} else if ok {
pf(transfer.Progress{
Event: "complete",
Name: job.name,
Parents: job.parents,
Progress: job.desc.Size,
Total: job.desc.Size,
Desc: &job.desc,
})
}
job.state = jobComplete
jobs[dgst] = job
}
}
}
}
for {
select {
case update := <-j.added:
job, ok := jobs[update.desc.Digest]
if !ok {
// Only captures the parents defined before,
// could handle parent updates in same thread
// if there is a synchronization issue
var parents []string
j.parentL.Lock()
for _, parent := range j.parents[update.desc.Digest] {
parents = append(parents, remotes.MakeRefKey(ctx, parent))
}
j.parentL.Unlock()
if len(parents) == 0 {
parents = []string{j.root}
}
name := remotes.MakeRefKey(ctx, update.desc)
job = &jobStatus{
state: jobAdded,
name: name,
parents: parents,
desc: update.desc,
}
jobs[update.desc.Digest] = job
pf(transfer.Progress{
Event: "waiting",
Name: name,
Parents: parents,
//Digest: desc.Digest.String(),
Progress: 0,
Total: update.desc.Size,
Desc: &job.desc,
})
}
if update.exists {
pf(transfer.Progress{
Event: "already exists",
Name: remotes.MakeRefKey(ctx, update.desc),
Progress: update.desc.Size,
Total: update.desc.Size,
Desc: &job.desc,
})
job.state = jobComplete
job.progress = job.desc.Size
}
case <-tc.C:
update()
// Next timer?
case <-ctx.Done():
update()
return
}
}
}
// Add adds a descriptor to be tracked
func (j *ProgressTracker) Add(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
}
}
func (j *ProgressTracker) MarkExists(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
exists: true,
}
}
// AddChildren adds hierarchy information
func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispec.Descriptor) {
if j == nil || len(children) == 0 {
return
}
j.parentL.Lock()
defer j.parentL.Unlock()
for _, child := range children {
j.parents[child.Digest] = append(j.parents[child.Digest], desc)
}
}
func (j *ProgressTracker) Wait() {
// timeout rather than rely on cancel
timeout := time.After(10 * time.Second)
select {
case <-timeout:
case <-j.waitC:
}
}
type contentActive struct {
active []content.Status
}
func (c *contentActive) Status(ref string) (content.Status, bool) {
idx := sort.Search(len(c.active), func(i int) bool { return c.active[i].Ref >= ref })
if idx < len(c.active) && c.active[idx].Ref == ref {
return c.active[idx], true
}
return content.Status{}, false
}
type contentStatusTracker struct {
cs content.Store
}
func NewContentStatusTracker(cs content.Store) StatusTracker {
return &contentStatusTracker{
cs: cs,
}
}
func (c *contentStatusTracker) Active(ctx context.Context, _ ...string) (ActiveJobs, error) {
active, err := c.cs.ListStatuses(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to list statuses for progress")
}
sort.Slice(active, func(i, j int) bool {
return active[i].Ref < active[j].Ref
})
return &contentActive{
active: active,
}, nil
}
func (c *contentStatusTracker) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
_, err := c.cs.Info(ctx, dgst)
if err == nil {
return true, nil
}
return false, nil
}

295
core/transfer/local/pull.go Normal file
View File

@@ -0,0 +1,295 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"fmt"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/remotes/docker"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/core/unpack"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/errdefs"
"github.com/containerd/log"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetcher, is transfer.ImageStorer, tops *transfer.Config) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Resolving from %s", ir),
})
}
name, desc, err := ir.Resolve(ctx)
if err != nil {
return fmt.Errorf("failed to resolve image: %w", err)
}
if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
// Explicitly call out schema 1 as deprecated and not supported
return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument)
}
// Verify image before pulling.
for vfName, vf := range ts.verifiers {
log := log.G(ctx).WithFields(logrus.Fields{
"name": name,
"digest": desc.Digest.String(),
"verifier": vfName,
})
log.Debug("Verifying image pull")
jdg, err := vf.VerifyImage(ctx, name, desc)
if err != nil {
log.WithError(err).Error("No judgement received from verifier")
return fmt.Errorf("blocking pull of %v with digest %v: image verifier %v returned error: %w", name, desc.Digest.String(), vfName, err)
}
log = log.WithFields(logrus.Fields{
"ok": jdg.OK,
"reason": jdg.Reason,
})
if !jdg.OK {
log.Warn("Image verifier blocked pull")
return fmt.Errorf("image verifier %s blocked pull of %v with digest %v for reason: %v", vfName, name, desc.Digest.String(), jdg.Reason)
}
log.Debug("Image verifier allowed pull")
}
// TODO: Handle already exists
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pulling from %s", ir),
})
tops.Progress(transfer.Progress{
Event: "fetching image content",
Name: name,
//Digest: img.Target.Digest.String(),
})
}
fetcher, err := ir.Fetcher(ctx, name)
if err != nil {
return fmt.Errorf("failed to get fetcher for %q: %w", name, err)
}
var (
handler images.Handler
baseHandlers []images.Handler
unpacker *unpack.Unpacker
// has a config media type bug (distribution#1622)
hasMediaTypeBug1622 bool
store = ts.content
progressTracker *ProgressTracker
)
ctx, cancel := context.WithCancel(ctx)
if tops.Progress != nil {
progressTracker = NewProgressTracker(name, "downloading") //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress, NewContentStatusTracker(store))
defer progressTracker.Wait()
}
defer cancel()
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(store)
if f, ok := is.(transfer.ImageFilterer); ok {
childrenHandler = f.ImageFilter(childrenHandler, store)
}
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// set to true if there is application/octet-stream media type
if desc.MediaType == docker.LegacyConfigMediaType {
hasMediaTypeBug1622 = true
}
return []ocispec.Descriptor{}, nil
},
)
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name)
if err != nil {
return err
}
// Set up baseHandlers from service configuration if present or create a new one
if ts.config.BaseHandlers != nil {
baseHandlers = ts.config.BaseHandlers
} else {
baseHandlers = []images.Handler{}
}
if tops.Progress != nil {
baseHandlers = append(baseHandlers, images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
progressTracker.Add(desc)
return []ocispec.Descriptor{}, nil
},
))
baseChildrenHandler := childrenHandler
childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) {
children, err = baseChildrenHandler(ctx, desc)
if err != nil {
return
}
progressTracker.AddChildren(desc, children)
return
})
}
handler = images.Handlers(append(baseHandlers,
fetchHandler(store, fetcher, progressTracker),
checkNeedsFix,
childrenHandler, // List children to track hierarchy
appendDistSrcLabelHandler,
)...)
// First find suitable platforms to unpack into
// If image storer is also an unpacker type, i.e implemented UnpackPlatforms() func
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
// Only unpack if requested unpackconfig matches default/supported unpackconfigs
for _, u := range unpacks {
matched, mu := getSupportedPlatform(u, ts.config.UnpackPlatforms)
if matched {
uopts = append(uopts, unpack.WithUnpackPlatform(mu))
}
}
if ts.limiterD != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiterD))
}
if ts.config.DuplicationSuppressor != nil {
uopts = append(uopts, unpack.WithDuplicationSuppressor(ts.config.DuplicationSuppressor))
}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
handler = unpacker.Unpack(handler)
}
}
if err := images.Dispatch(ctx, handler, ts.limiterD, desc); err != nil {
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
}
return err
}
// NOTE(fuweid): unpacker defers blobs download. before create image
// record in ImageService, should wait for unpacking(including blobs
// download).
if unpacker != nil {
if _, err = unpacker.Wait(); err != nil {
return err
}
// TODO: Check results to make sure unpack was successful
}
if hasMediaTypeBug1622 {
if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil {
return err
}
}
imgs, err := is.Store(ctx, desc, ts.images)
if err != nil {
return err
}
if tops.Progress != nil {
for _, img := range imgs {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
})
}
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed pull from %s", ir),
})
}
return nil
}
func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(log.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
return nil, fmt.Errorf("%v not supported", desc.MediaType)
}
err := remotes.Fetch(ctx, ingester, fetcher, desc)
if errdefs.IsAlreadyExists(err) {
pt.MarkExists(desc)
return nil, nil
}
return nil, err
}
}
// getSupportedPlatform returns a matched platform comparing input UnpackConfiguration to the supported platform/snapshotter combinations
// If input platform didn't specify snapshotter, default will be used if there is a match on platform.
func getSupportedPlatform(uc transfer.UnpackConfiguration, supportedPlatforms []unpack.Platform) (bool, unpack.Platform) {
var u unpack.Platform
for _, sp := range supportedPlatforms {
// If both platform and snapshotter match, return the supportPlatform
// If platform matched and SnapshotterKey is empty, we assume client didn't pass SnapshotterKey
// use default Snapshotter
if sp.Platform.Match(uc.Platform) {
// Assume sp.SnapshotterKey is not empty
if uc.Snapshotter == sp.SnapshotterKey {
return true, sp
} else if uc.Snapshotter == "" && sp.SnapshotterKey == defaults.DefaultSnapshotter {
return true, sp
}
}
}
return false, u
}

View File

@@ -0,0 +1,153 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"testing"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/core/unpack"
"github.com/containerd/containerd/v2/defaults"
"github.com/containerd/platforms"
)
func TestGetSupportedPlatform(t *testing.T) {
supportedPlatforms := []unpack.Platform{
{
Platform: platforms.OnlyStrict(platforms.MustParse("linux/amd64")),
SnapshotterKey: "native",
},
{
Platform: platforms.OnlyStrict(platforms.MustParse("linux/amd64")),
SnapshotterKey: "devmapper",
},
{
Platform: platforms.OnlyStrict(platforms.MustParse("linux/arm64")),
SnapshotterKey: "native",
},
{
Platform: platforms.OnlyStrict(platforms.MustParse("linux/arm")),
SnapshotterKey: "native",
},
{
Platform: platforms.DefaultStrict(),
SnapshotterKey: defaults.DefaultSnapshotter,
},
}
for _, testCase := range []struct {
// Name is the name of the test
Name string
// Input
UnpackConfig transfer.UnpackConfiguration
SupportedPlatforms []unpack.Platform
// Expected
Match bool
ExpectedPlatform transfer.UnpackConfiguration
}{
{
Name: "No match on input linux/arm64 and devmapper snapshotter",
UnpackConfig: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/arm64"),
Snapshotter: "devmapper",
},
SupportedPlatforms: supportedPlatforms,
Match: false,
ExpectedPlatform: transfer.UnpackConfiguration{},
},
{
Name: "No match on input linux/386 and native snapshotter",
UnpackConfig: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/386"),
Snapshotter: "native",
},
SupportedPlatforms: supportedPlatforms,
Match: false,
ExpectedPlatform: transfer.UnpackConfiguration{},
},
{
Name: "Match linux/amd64 and native snapshotter",
UnpackConfig: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/amd64"),
Snapshotter: "native",
},
SupportedPlatforms: supportedPlatforms,
Match: true,
ExpectedPlatform: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/amd64"),
Snapshotter: "native",
},
},
{
Name: "Match linux/arm64 and native snapshotter",
UnpackConfig: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/arm64"),
Snapshotter: "native",
},
SupportedPlatforms: supportedPlatforms,
Match: true,
ExpectedPlatform: transfer.UnpackConfiguration{
Platform: platforms.MustParse("linux/arm64"),
Snapshotter: "native",
},
},
{
Name: "Default platform input only match with defaultSnapshotter",
UnpackConfig: transfer.UnpackConfiguration{
Platform: platforms.DefaultSpec(),
},
SupportedPlatforms: supportedPlatforms,
Match: true,
ExpectedPlatform: transfer.UnpackConfiguration{
Platform: platforms.DefaultSpec(),
Snapshotter: defaults.DefaultSnapshotter,
},
},
} {
testCase := testCase
t.Run(testCase.Name, func(t *testing.T) {
m, sp := getSupportedPlatform(testCase.UnpackConfig, testCase.SupportedPlatforms)
// Match result should match expected
if m != testCase.Match {
t.Fatalf("Expect match result %v, but got %v", testCase.Match, m)
}
// If match result is false, the Platform should be nil too
if !m && sp.Platform != nil {
t.Fatalf("Expect nil Platform when we don't have a match")
}
// Snapshotter should match, empty string can be compared too
if sp.SnapshotterKey != testCase.ExpectedPlatform.Snapshotter {
t.Fatalf("Expect SnapshotterKey %v, but got %v", testCase.ExpectedPlatform.Snapshotter, sp.SnapshotterKey)
}
// If the matched Platform is not nil, it should match the expected Platform
if sp.Platform != nil && !sp.Platform.Match(testCase.ExpectedPlatform.Platform) {
t.Fatalf("Expect Platform %v doesn't match", testCase.ExpectedPlatform.Platform)
}
// If the ExectedPlatform is not empty, the matched Platform shoule not be nil either
if sp.Platform == nil && testCase.ExpectedPlatform.Platform.OS != "" {
t.Fatalf("Expect Platform %v doesn't match", testCase.ExpectedPlatform.Platform)
}
})
}
}

268
core/transfer/local/push.go Normal file
View File

@@ -0,0 +1,268 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"fmt"
"sync"
"time"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/remotes"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/errdefs"
"github.com/containerd/platforms"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGetter, p transfer.ImagePusher, tops *transfer.Config) error {
/*
// TODO: Platform matching
if pushCtx.PlatformMatcher == nil {
if len(pushCtx.Platforms) > 0 {
ps, err := platforms.ParseAll(pushCtx.Platforms)
if err != nil {
return err
}
pushCtx.PlatformMatcher = platforms.Any(ps...)
} else {
pushCtx.PlatformMatcher = platforms.All
}
}
*/
matcher := platforms.All
// Filter push
img, err := ig.Get(ctx, ts.images)
if err != nil {
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pushing to %s", p),
})
tops.Progress(transfer.Progress{
Event: "pushing content",
Name: img.Name,
//Digest: img.Target.Digest.String(),
Desc: &img.Target,
})
}
var pusher remotes.Pusher
pusher, err = p.Pusher(ctx, img.Target)
if err != nil {
return err
}
var wrapper func(images.Handler) images.Handler
ctx, cancel := context.WithCancel(ctx)
if tops.Progress != nil {
progressTracker := NewProgressTracker(img.Name, "uploading") //Pass in first name as root
p := newProgressPusher(pusher, progressTracker)
go progressTracker.HandleProgress(ctx, tops.Progress, p)
defer progressTracker.Wait()
wrapper = p.WrapHandler
pusher = p
}
defer cancel()
// TODO: Add handler to track parents
/*
// TODO: Add handlers
if len(pushCtx.BaseHandlers) > 0 {
wrapper = func(h images.Handler) images.Handler {
h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
if pushCtx.HandlerWrapper != nil {
h = pushCtx.HandlerWrapper(h)
}
return h
}
} else if pushCtx.HandlerWrapper != nil {
wrapper = pushCtx.HandlerWrapper
}
*/
if err := remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiterU, matcher, wrapper); err != nil {
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "pushed content",
Name: img.Name,
//Digest: img.Target.Digest.String(),
Desc: &img.Target,
})
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed push to %s", p),
Desc: &img.Target,
})
}
return nil
}
type progressPusher struct {
remotes.Pusher
progress *ProgressTracker
status *pushStatus
}
type pushStatus struct {
l sync.Mutex
statuses map[string]content.Status
complete map[digest.Digest]struct{}
}
func newProgressPusher(pusher remotes.Pusher, progress *ProgressTracker) *progressPusher {
return &progressPusher{
Pusher: pusher,
progress: progress,
status: &pushStatus{
statuses: map[string]content.Status{},
complete: map[digest.Digest]struct{}{},
},
}
}
func (p *progressPusher) WrapHandler(h images.Handler) images.Handler {
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
p.progress.Add(desc)
subdescs, err = h.Handle(ctx, desc)
p.progress.AddChildren(desc, subdescs)
return
})
}
func (p *progressPusher) Push(ctx context.Context, d ocispec.Descriptor) (content.Writer, error) {
ref := remotes.MakeRefKey(ctx, d)
p.status.add(ref, d)
cw, err := p.Pusher.Push(ctx, d)
if err != nil {
if errdefs.IsAlreadyExists(err) {
p.progress.MarkExists(d)
p.status.markComplete(ref, d)
}
return nil, err
}
return &progressWriter{
Writer: cw,
ref: ref,
desc: d,
status: p.status,
progress: p.progress,
}, nil
}
func (ps *pushStatus) update(ref string, delta int) {
ps.l.Lock()
status, ok := ps.statuses[ref]
if ok {
if delta > 0 {
status.Offset += int64(delta)
} else if delta < 0 {
status.Offset = 0
}
ps.statuses[ref] = status
}
ps.l.Unlock()
}
func (ps *pushStatus) add(ref string, d ocispec.Descriptor) {
status := content.Status{
Ref: ref,
Offset: 0,
Total: d.Size,
StartedAt: time.Now(),
}
ps.l.Lock()
_, ok := ps.statuses[ref]
_, complete := ps.complete[d.Digest]
if !ok && !complete {
ps.statuses[ref] = status
}
ps.l.Unlock()
}
func (ps *pushStatus) markComplete(ref string, d ocispec.Descriptor) {
ps.l.Lock()
_, ok := ps.statuses[ref]
if ok {
delete(ps.statuses, ref)
}
ps.complete[d.Digest] = struct{}{}
ps.l.Unlock()
}
func (ps *pushStatus) Status(name string) (content.Status, bool) {
ps.l.Lock()
status, ok := ps.statuses[name]
ps.l.Unlock()
return status, ok
}
func (ps *pushStatus) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
ps.l.Lock()
_, ok := ps.complete[dgst]
ps.l.Unlock()
return ok, nil
}
func (p *progressPusher) Active(ctx context.Context, _ ...string) (ActiveJobs, error) {
return p.status, nil
}
func (p *progressPusher) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
return p.status.Check(ctx, dgst)
}
type progressWriter struct {
content.Writer
ref string
desc ocispec.Descriptor
status *pushStatus
progress *ProgressTracker
}
func (pw *progressWriter) Write(p []byte) (n int, err error) {
n, err = pw.Writer.Write(p)
if err != nil {
// TODO: Handle reset error to reset progress
return
}
pw.status.update(pw.ref, n)
return
}
func (pw *progressWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := pw.Writer.Commit(ctx, size, expected, opts...)
if err != nil {
if errdefs.IsAlreadyExists(err) {
pw.progress.MarkExists(pw.desc)
}
// TODO: Handle reset error to reset progress
}
pw.status.markComplete(pw.ref, pw.desc)
return err
}

View File

@@ -0,0 +1,39 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"github.com/containerd/containerd/v2/core/transfer"
)
func (ts *localTransferService) tag(ctx context.Context, ig transfer.ImageGetter, is transfer.ImageStorer, tops *transfer.Config) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
img, err := ig.Get(ctx, ts.images)
if err != nil {
return err
}
_, err = is.Store(ctx, img.Target, ts.images)
return err
}

View File

@@ -0,0 +1,187 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"fmt"
"io"
"time"
"github.com/containerd/typeurl/v2"
"golang.org/x/sync/semaphore"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/images"
"github.com/containerd/containerd/v2/core/leases"
"github.com/containerd/containerd/v2/core/transfer"
"github.com/containerd/containerd/v2/core/unpack"
"github.com/containerd/containerd/v2/internal/kmutex"
"github.com/containerd/containerd/v2/pkg/imageverifier"
"github.com/containerd/errdefs"
)
type localTransferService struct {
leases leases.Manager
content content.Store
images images.Store
verifiers map[string]imageverifier.ImageVerifier
// limiter for upload
limiterU *semaphore.Weighted
// limiter for download operation
limiterD *semaphore.Weighted
config TransferConfig
}
func NewTransferService(lm leases.Manager, cs content.Store, is images.Store, vfs map[string]imageverifier.ImageVerifier, tc *TransferConfig) transfer.Transferrer {
ts := &localTransferService{
leases: lm,
content: cs,
images: is,
verifiers: vfs,
config: *tc,
}
if tc.MaxConcurrentUploadedLayers > 0 {
ts.limiterU = semaphore.NewWeighted(int64(tc.MaxConcurrentUploadedLayers))
}
if tc.MaxConcurrentDownloads > 0 {
ts.limiterD = semaphore.NewWeighted(int64(tc.MaxConcurrentDownloads))
}
return ts
}
func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
topts := &transfer.Config{}
for _, opt := range opts {
opt(topts)
}
// Figure out matrix of whether source destination combination is supported
switch s := src.(type) {
case transfer.ImageFetcher:
switch d := dest.(type) {
case transfer.ImageStorer:
return ts.pull(ctx, s, d, topts)
}
case transfer.ImageGetter:
switch d := dest.(type) {
case transfer.ImagePusher:
return ts.push(ctx, s, d, topts)
case transfer.ImageExporter:
return ts.exportStream(ctx, s, d, topts)
case transfer.ImageStorer:
return ts.tag(ctx, s, d, topts)
}
case transfer.ImageImporter:
switch d := dest.(type) {
case transfer.ImageExportStreamer:
return ts.echo(ctx, s, d, topts)
case transfer.ImageStorer:
// TODO: verify imports with ImageVerifiers?
return ts.importStream(ctx, s, d, topts)
}
}
return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented)
}
func name(t interface{}) string {
switch s := t.(type) {
case fmt.Stringer:
return s.String()
case typeurl.Any:
return s.GetTypeUrl()
default:
return fmt.Sprintf("%T", t)
}
}
// echo is mostly used for testing, it implements an import->export which is
// a no-op which only roundtrips the bytes.
func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImporter, e transfer.ImageExportStreamer, tops *transfer.Config) error {
iis, ok := i.(transfer.ImageImportStreamer)
if !ok {
return fmt.Errorf("echo requires access to raw stream: %w", errdefs.ErrNotImplemented)
}
r, _, err := iis.ImportStream(ctx)
if err != nil {
return err
}
wc, _, err := e.ExportStream(ctx)
if err != nil {
return err
}
// TODO: Use fixed buffer? Send write progress?
_, err = io.Copy(wc, r)
if werr := wc.Close(); werr != nil && err == nil {
err = werr
}
return err
}
// WithLease attaches a lease on the context
func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Opt) (context.Context, func(context.Context) error, error) {
nop := func(context.Context) error { return nil }
_, ok := leases.FromContext(ctx)
if ok {
return ctx, nop, nil
}
ls := ts.leases
if len(opts) == 0 {
// Use default lease configuration if no options provided
opts = []leases.Opt{
leases.WithRandomID(),
leases.WithExpiration(24 * time.Hour),
}
}
l, err := ls.Create(ctx, opts...)
if err != nil {
return ctx, nop, err
}
ctx = leases.WithLease(ctx, l.ID)
return ctx, func(ctx context.Context) error {
return ls.Delete(ctx, l)
}, nil
}
type TransferConfig struct {
// MaxConcurrentDownloads is the max concurrent content downloads for pull.
MaxConcurrentDownloads int
// MaxConcurrentUploadedLayers is the max concurrent uploads for push
MaxConcurrentUploadedLayers int
// DuplicationSuppressor is used to make sure that there is only one
// in-flight fetch request or unpack handler for a given descriptor's
// digest or chain ID.
DuplicationSuppressor kmutex.KeyedLocker
// BaseHandlers are a set of handlers which get are called on dispatch.
// These handlers always get called before any operation specific
// handlers.
BaseHandlers []images.Handler
// UnpackPlatforms are used to specify supported combination of platforms and snapshotters
UnpackPlatforms []unpack.Platform
// RegistryConfigPath is a path to the root directory containing registry-specific configurations
RegistryConfigPath string
}