
Schema 1 has been substantially deprecated since around 2017 in favor of Schema 2, introduced in Docker 1.10 (February 2016), and its successor, the OCI Image Spec v1, but containerd has never officially deprecated it. One reason was that Quay did not support Schema 2 for a long time; however, Quay has reportedly supported Schema 2 since February 2020 (moby/buildkit issue 409).

This PR deprecates pulling Schema 1 images, but the feature will not be removed before containerd 2.0. Pushing Schema 1 images was never implemented in containerd (or in its consumers such as BuildKit).

Docker/Moby already disabled pushing Schema 1 images in Docker 20.10 (moby/moby PR 41295), but it has not yet disabled pulling Schema 1 because containerd had not yet deprecated it (see the comments in moby/moby PR 42300). Docker/Moby is expected to disable pulling Schema 1 images after this deprecation.

Signed-off-by: Akihiro Suda <akihiro.suda.cz@hco.ntt.co.jp>
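For illustration only (this sketch is not the code added by the PR, and the package and function names are made up), a client could detect a Schema 1 image by its manifest media type and log a deprecation warning along these lines:

package schema1check

import (
	"context"

	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/log"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// WarnIfSchema1 is a hypothetical helper: it logs a deprecation warning when
// a resolved descriptor turns out to be a Docker Schema 1 manifest. The real
// deprecation mechanism lives in containerd's pull path, not here.
func WarnIfSchema1(ctx context.Context, ref string, desc ocispec.Descriptor) {
	if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
		log.G(ctx).WithField("ref", ref).
			Warn("pulling Docker Schema 1 images is deprecated; the feature will not be removed before containerd 2.0")
	}
}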
439 lines
12 KiB
Go
/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package content

import (
	"context"
	"fmt"
	"io"
	"net/http/httptrace"
	"os"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cmd/ctr/commands"
	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/pkg/progress"
	"github.com/containerd/containerd/platforms"
	"github.com/containerd/containerd/remotes"
	"github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/urfave/cli"
)

var fetchCommand = cli.Command{
	Name:      "fetch",
	Usage:     "fetch all content for an image into containerd",
	ArgsUsage: "[flags] <remote> <object>",
	Description: `Fetch an image into containerd.

This command ensures that containerd has all the necessary resources to build
an image's rootfs and convert the configuration to a runtime format supported
by containerd.

This command uses the same syntax for remote and object as 'ctr fetch-object'.
We may want to make this nicer, but agnosticism is preferred for the moment.

Right now, the responsibilities of the daemon and the CLI aren't quite clear. Do
not use this implementation as a guide. The end goal is to have metadata,
content and snapshots ready for direct use via 'ctr run'.

Most of this is experimental, and there are still a few leaps required to make this work.`,
	Flags: append(commands.RegistryFlags, commands.LabelFlag,
		cli.StringSliceFlag{
			Name:  "platform",
			Usage: "Pull content from a specific platform",
		},
		cli.BoolFlag{
			Name:  "all-platforms",
			Usage: "Pull content from all platforms",
		},
		cli.BoolFlag{
			Name:  "all-metadata",
			Usage: "Pull metadata for all platforms",
		},
		cli.BoolFlag{
			Name:  "metadata-only",
			Usage: "Pull all metadata including manifests and configs",
		},
	),
	Action: func(clicontext *cli.Context) error {
		var (
			ref = clicontext.Args().First()
		)
		client, ctx, cancel, err := commands.NewClient(clicontext)
		if err != nil {
			return err
		}
		defer cancel()
		config, err := NewFetchConfig(ctx, clicontext)
		if err != nil {
			return err
		}

		_, err = Fetch(ctx, client, ref, config)
		return err
	},
}

// FetchConfig for content fetch
type FetchConfig struct {
	// Resolver
	Resolver remotes.Resolver
	// ProgressOutput to display progress
	ProgressOutput io.Writer
	// Labels to set on the content
	Labels []string
	// PlatformMatcher matches platforms, supersedes Platforms
	PlatformMatcher platforms.MatchComparer
	// Platforms to fetch
	Platforms []string
	// Whether or not to download all metadata
	AllMetadata bool
	// RemoteOpts to configure object resolutions and transfers with remote content providers
	RemoteOpts []containerd.RemoteOpt
	// TraceHTTP writes DNS and connection information to the log when dealing with a container registry
	TraceHTTP bool
}

// NewFetchConfig returns the default FetchConfig from cli flags
func NewFetchConfig(ctx context.Context, clicontext *cli.Context) (*FetchConfig, error) {
	resolver, err := commands.GetResolver(ctx, clicontext)
	if err != nil {
		return nil, err
	}
	config := &FetchConfig{
		Resolver:  resolver,
		Labels:    clicontext.StringSlice("label"),
		TraceHTTP: clicontext.Bool("http-trace"),
	}
	if !clicontext.GlobalBool("debug") {
		config.ProgressOutput = os.Stdout
	}
	if !clicontext.Bool("all-platforms") {
		p := clicontext.StringSlice("platform")
		if len(p) == 0 {
			p = append(p, platforms.DefaultString())
		}
		config.Platforms = p
	}

	if clicontext.Bool("metadata-only") {
		config.AllMetadata = true
		// Any with an empty set is None
		config.PlatformMatcher = platforms.Any()
	} else if clicontext.Bool("all-metadata") {
		config.AllMetadata = true
	}

	if clicontext.IsSet("max-concurrent-downloads") {
		mcd := clicontext.Int("max-concurrent-downloads")
		config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentDownloads(mcd))
	}

	if clicontext.IsSet("max-concurrent-uploaded-layers") {
		mcu := clicontext.Int("max-concurrent-uploaded-layers")
		config.RemoteOpts = append(config.RemoteOpts, containerd.WithMaxConcurrentUploadedLayers(mcu))
	}

	return config, nil
}

// Fetch loads all resources into the content store and returns the image
func Fetch(ctx context.Context, client *containerd.Client, ref string, config *FetchConfig) (images.Image, error) {
	ongoing := NewJobs(ref)

	if config.TraceHTTP {
		ctx = httptrace.WithClientTrace(ctx, commands.NewDebugClientTrace(ctx))
	}

	pctx, stopProgress := context.WithCancel(ctx)
	progress := make(chan struct{})

	go func() {
		if config.ProgressOutput != nil {
			// no progress bar, because it hides some debug logs
			ShowProgress(pctx, ongoing, client.ContentStore(), config.ProgressOutput)
		}
		close(progress)
	}()

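	// Descriptors with the (deprecated) Docker Schema 1 manifest media type
	// are not added to the job list, so they do not show up in the progress
	// output below.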
	h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
		if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
			ongoing.Add(desc)
		}
		return nil, nil
	})

	log.G(pctx).WithField("image", ref).Debug("fetching")
	labels := commands.LabelArgs(config.Labels)
	opts := []containerd.RemoteOpt{
		containerd.WithPullLabels(labels),
		containerd.WithResolver(config.Resolver),
		containerd.WithImageHandler(h),
		containerd.WithSchema1Conversion, //nolint:staticcheck // Ignore SA1019. Need to keep deprecated package for compatibility.
	}
	opts = append(opts, config.RemoteOpts...)

	if config.AllMetadata {
		opts = append(opts, containerd.WithAllMetadata())
	}

	if config.PlatformMatcher != nil {
		opts = append(opts, containerd.WithPlatformMatcher(config.PlatformMatcher))
	} else {
		for _, platform := range config.Platforms {
			opts = append(opts, containerd.WithPlatform(platform))
		}
	}

	img, err := client.Fetch(pctx, ref, opts...)
	stopProgress()
	if err != nil {
		return images.Image{}, err
	}

	<-progress
	return img, nil
}

// ShowProgress continuously updates the output with job progress
// by checking status in the content store.
func ShowProgress(ctx context.Context, ongoing *Jobs, cs content.Store, out io.Writer) {
	var (
		ticker   = time.NewTicker(100 * time.Millisecond)
		fw       = progress.NewWriter(out)
		start    = time.Now()
		statuses = map[string]StatusInfo{}
		done     bool
	)
	defer ticker.Stop()

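	// Each tracked ref moves through a simple status lifecycle: "waiting"
	// (content not yet in the store), "downloading" (an active transfer is
	// reported by the content store), and finally "done" (fetched during
	// this run) or "exists" (content was already present before we started).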
outer:
	for {
		select {
		case <-ticker.C:
			fw.Flush()

			tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)

			resolved := "resolved"
			if !ongoing.IsResolved() {
				resolved = "resolving"
			}
			statuses[ongoing.name] = StatusInfo{
				Ref:    ongoing.name,
				Status: resolved,
			}
			keys := []string{ongoing.name}

			activeSeen := map[string]struct{}{}
			if !done {
				active, err := cs.ListStatuses(ctx, "")
				if err != nil {
					log.G(ctx).WithError(err).Error("active check failed")
					continue
				}
				// update status of active entries!
				for _, active := range active {
					statuses[active.Ref] = StatusInfo{
						Ref:       active.Ref,
						Status:    "downloading",
						Offset:    active.Offset,
						Total:     active.Total,
						StartedAt: active.StartedAt,
						UpdatedAt: active.UpdatedAt,
					}
					activeSeen[active.Ref] = struct{}{}
				}
			}

			// now, update the items in jobs that are not in active
			for _, j := range ongoing.Jobs() {
				key := remotes.MakeRefKey(ctx, j)
				keys = append(keys, key)
				if _, ok := activeSeen[key]; ok {
					continue
				}

				status, ok := statuses[key]
				if !done && (!ok || status.Status == "downloading") {
					info, err := cs.Info(ctx, j.Digest)
					if err != nil {
						if !errdefs.IsNotFound(err) {
							log.G(ctx).WithError(err).Error("failed to get content info")
							continue outer
						} else {
							statuses[key] = StatusInfo{
								Ref:    key,
								Status: "waiting",
							}
						}
					} else if info.CreatedAt.After(start) {
						statuses[key] = StatusInfo{
							Ref:       key,
							Status:    "done",
							Offset:    info.Size,
							Total:     info.Size,
							UpdatedAt: info.CreatedAt,
						}
					} else {
						statuses[key] = StatusInfo{
							Ref:    key,
							Status: "exists",
						}
					}
				} else if done {
					if ok {
						if status.Status != "done" && status.Status != "exists" {
							status.Status = "done"
							statuses[key] = status
						}
					} else {
						statuses[key] = StatusInfo{
							Ref:    key,
							Status: "done",
						}
					}
				}
			}

			var ordered []StatusInfo
			for _, key := range keys {
				ordered = append(ordered, statuses[key])
			}

			Display(tw, ordered, start)
			tw.Flush()

			if done {
				fw.Flush()
				return
			}
		case <-ctx.Done():
			done = true // allow ui to update once more
		}
	}
}

// Jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type Jobs struct {
	name     string
	added    map[digest.Digest]struct{}
	descs    []ocispec.Descriptor
	mu       sync.Mutex
	resolved bool
}

// NewJobs creates a new instance of the job status tracker
func NewJobs(name string) *Jobs {
	return &Jobs{
		name:  name,
		added: map[digest.Digest]struct{}{},
	}
}

// Add adds a descriptor to be tracked
func (j *Jobs) Add(desc ocispec.Descriptor) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.resolved = true

	if _, ok := j.added[desc.Digest]; ok {
		return
	}
	j.descs = append(j.descs, desc)
	j.added[desc.Digest] = struct{}{}
}

// Jobs returns a list of all tracked descriptors
func (j *Jobs) Jobs() []ocispec.Descriptor {
	j.mu.Lock()
	defer j.mu.Unlock()

	var descs []ocispec.Descriptor
	return append(descs, j.descs...)
}

// IsResolved checks whether a descriptor has been resolved
func (j *Jobs) IsResolved() bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.resolved
}

// StatusInfo holds the status info for an upload or download
type StatusInfo struct {
	Ref       string
	Status    string
	Offset    int64
	Total     int64
	StartedAt time.Time
	UpdatedAt time.Time
}

// Display pretty prints out the download or upload progress
func Display(w io.Writer, statuses []StatusInfo, start time.Time) {
	var total int64
	for _, status := range statuses {
		total += status.Offset
		switch status.Status {
		case "downloading", "uploading":
			var bar progress.Bar
			if status.Total > 0.0 {
				bar = progress.Bar(float64(status.Offset) / float64(status.Total))
			}
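			// progress.Bar implements fmt.Formatter; the custom 'r' verb
			// renders the bar at the requested field width (40 columns here).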
			fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
				status.Ref,
				status.Status,
				bar,
				progress.Bytes(status.Offset), progress.Bytes(status.Total))
		case "resolving", "waiting":
			bar := progress.Bar(0.0)
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
				status.Ref,
				status.Status,
				bar)
		default:
			bar := progress.Bar(1.0)
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
				status.Ref,
				status.Status,
				bar)
		}
	}

	fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
		time.Since(start).Seconds(),
		// TODO(stevvooe): These calculations are actually way off.
		// Need to account for previously downloaded data. These
		// will basically be right for a download the first time
		// but will be skewed if restarting, as it includes the
		// data into the start time before.
		progress.Bytes(total),
		progress.NewBytesPerSecond(total, time.Since(start)))
}