containerd/cmd/ctr/commands/content/fetch.go
Wei Fu 506b815483 remotes: add distribution labels to blob data
We can use the cross-repository push feature to reuse existing blobs in the
same registry. To make push fast that way, we first need to record where each
blob came from.

Use `containerd.io/distribution.source.<registry> = <repo name>[,<repo name>]`
as the label format. For example, if the blob is downloaded via
docker.io/library/busybox:latest, the label will be

    containerd.io/distribution.source.docker.io = library/busybox

If the blob is shared by different repos in the same registry, the new repo
name is appended, for example:

    containerd.io/distribution.source.docker.io = library/busybox,x/y
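
For illustration only, here is a minimal sketch of how a new repo name could be
merged into an existing label value. The helper name is hypothetical and not
part of this change; it only assumes the standard strings package:

    // mergeDistributionSource appends repo to a comma-separated label value,
    // skipping duplicates (hypothetical helper, for illustration only).
    func mergeDistributionSource(value, repo string) string {
        if value == "" {
            return repo
        }
        for _, existing := range strings.Split(value, ",") {
            if existing == repo {
                return value // repo already recorded
            }
        }
        return value + "," + repo
    }

With such a merge rule, re-fetching from a known repo leaves the label
unchanged, while a fetch from a new repo in the same registry extends it, as in
the busybox,x/y example above.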

NOTE:
1. There is no need to apply the label for legacy docker image schema 1.
2. Concurrent fetch actions might miss some repo names in the label, but that
is acceptable.
3. The label is optional: there is no need to add it if the engine only pulls
and uses images and never pushes them.

Signed-off-by: Wei Fu <fuweid89@gmail.com>
2019-03-12 13:42:54 +08:00


/*
   Copyright The containerd Authors.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package content

import (
	"context"
	"fmt"
	"io"
	"os"
	"sync"
	"text/tabwriter"
	"time"

	"github.com/containerd/containerd"
	"github.com/containerd/containerd/cmd/ctr/commands"
	"github.com/containerd/containerd/content"
	"github.com/containerd/containerd/errdefs"
	"github.com/containerd/containerd/images"
	"github.com/containerd/containerd/log"
	"github.com/containerd/containerd/pkg/progress"
	"github.com/containerd/containerd/platforms"
	"github.com/containerd/containerd/remotes"
	digest "github.com/opencontainers/go-digest"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
	"github.com/urfave/cli"
)

var fetchCommand = cli.Command{
	Name:      "fetch",
	Usage:     "fetch all content for an image into containerd",
	ArgsUsage: "[flags] <remote> <object>",
	Description: `Fetch an image into containerd.

This command ensures that containerd has all the necessary resources to build
an image's rootfs and convert the configuration to a runtime format supported
by containerd.

This command uses the same syntax for remote and object as 'ctr fetch-object'.
We may want to make this nicer, but agnosticism is preferred for the moment.

Right now, the responsibilities of the daemon and the cli aren't quite clear. Do
not use this implementation as a guide. The end goal should be having metadata,
content and snapshots ready for direct use via 'ctr run'.

Most of this is experimental and there are a few leaps to make this work.`,
	Flags: append(commands.RegistryFlags, commands.LabelFlag,
		cli.StringSliceFlag{
			Name:  "platform",
			Usage: "pull content from a specific platform",
		},
		cli.BoolFlag{
			Name:  "all-platforms",
			Usage: "pull content from all platforms",
		},
	),
	Action: func(clicontext *cli.Context) error {
		var (
			ref = clicontext.Args().First()
		)

		client, ctx, cancel, err := commands.NewClient(clicontext)
		if err != nil {
			return err
		}
		defer cancel()

		config, err := NewFetchConfig(ctx, clicontext)
		if err != nil {
			return err
		}

		_, err = Fetch(ctx, client, ref, config)
		return err
	},
}
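
// For reference, a typical invocation of this subcommand is expected to look
// roughly like the following (assuming it is registered under `ctr content`;
// the image reference is only an example):
//
//	ctr content fetch docker.io/library/busybox:latest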

// FetchConfig for content fetch
type FetchConfig struct {
	// Resolver
	Resolver remotes.Resolver
	// ProgressOutput to display progress
	ProgressOutput io.Writer
	// Labels to set on the content
	Labels []string
	// Platforms to fetch
	Platforms []string
}

// NewFetchConfig returns the default FetchConfig from cli flags
func NewFetchConfig(ctx context.Context, clicontext *cli.Context) (*FetchConfig, error) {
	resolver, err := commands.GetResolver(ctx, clicontext)
	if err != nil {
		return nil, err
	}
	config := &FetchConfig{
		Resolver: resolver,
		Labels:   clicontext.StringSlice("label"),
	}
	if !clicontext.GlobalBool("debug") {
		config.ProgressOutput = os.Stdout
	}
	if !clicontext.Bool("all-platforms") {
		p := clicontext.StringSlice("platform")
		if len(p) == 0 {
			p = append(p, platforms.DefaultString())
		}
		config.Platforms = p
	}
	return config, nil
}
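
// A minimal usage sketch (illustrative only): another command can drive this
// package the same way fetchCommand's Action does above. The ctx, clicontext,
// client and ref values below are assumed to come from such a caller.
//
//	config, err := NewFetchConfig(ctx, clicontext)
//	if err != nil {
//		return err
//	}
//	img, err := Fetch(ctx, client, ref, config)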

// Fetch loads all resources into the content store and returns the image
func Fetch(ctx context.Context, client *containerd.Client, ref string, config *FetchConfig) (images.Image, error) {
	ongoing := newJobs(ref)

	pctx, stopProgress := context.WithCancel(ctx)
	progress := make(chan struct{})

	go func() {
		if config.ProgressOutput != nil {
			// no progress bar, because it hides some debug logs
			showProgress(pctx, ongoing, client.ContentStore(), config.ProgressOutput)
		}
		close(progress)
	}()

	// The image handler records every descriptor encountered during the pull
	// walk (except legacy schema 1 manifests) with the progress tracker.
	h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
		if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
			ongoing.add(desc)
		}
		return nil, nil
	})

	log.G(pctx).WithField("image", ref).Debug("fetching")
	labels := commands.LabelArgs(config.Labels)
	opts := []containerd.RemoteOpt{
		containerd.WithPullLabels(labels),
		containerd.WithResolver(config.Resolver),
		containerd.WithImageHandler(h),
		containerd.WithSchema1Conversion,
		// record the source repository of each blob via the
		// containerd.io/distribution.source.* labels
		containerd.WithAppendDistributionSourceLabel(),
	}
	for _, platform := range config.Platforms {
		opts = append(opts, containerd.WithPlatform(platform))
	}

	img, err := client.Fetch(pctx, ref, opts...)
	stopProgress()
	if err != nil {
		return images.Image{}, err
	}

	<-progress
	return img, nil
}

// showProgress renders progress information for the ongoing jobs to out until
// ctx is cancelled, polling the content store for active transfer statuses.
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) {
	var (
		ticker   = time.NewTicker(100 * time.Millisecond)
		fw       = progress.NewWriter(out)
		start    = time.Now()
		statuses = map[string]StatusInfo{}
		done     bool
	)
	defer ticker.Stop()

outer:
	for {
		select {
		case <-ticker.C:
			fw.Flush()

			tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)

			resolved := "resolved"
			if !ongoing.isResolved() {
				resolved = "resolving"
			}
			statuses[ongoing.name] = StatusInfo{
				Ref:    ongoing.name,
				Status: resolved,
			}
			keys := []string{ongoing.name}

			activeSeen := map[string]struct{}{}
			if !done {
				active, err := cs.ListStatuses(ctx, "")
				if err != nil {
					log.G(ctx).WithError(err).Error("active check failed")
					continue
				}
				// update status of active entries!
				for _, active := range active {
					statuses[active.Ref] = StatusInfo{
						Ref:       active.Ref,
						Status:    "downloading",
						Offset:    active.Offset,
						Total:     active.Total,
						StartedAt: active.StartedAt,
						UpdatedAt: active.UpdatedAt,
					}
					activeSeen[active.Ref] = struct{}{}
				}
			}

			// now, update the items in jobs that are not in active
			for _, j := range ongoing.jobs() {
				key := remotes.MakeRefKey(ctx, j)
				keys = append(keys, key)
				if _, ok := activeSeen[key]; ok {
					continue
				}

				status, ok := statuses[key]
				if !done && (!ok || status.Status == "downloading") {
					info, err := cs.Info(ctx, j.Digest)
					if err != nil {
						if !errdefs.IsNotFound(err) {
							log.G(ctx).WithError(err).Errorf("failed to get content info")
							continue outer
						} else {
							statuses[key] = StatusInfo{
								Ref:    key,
								Status: "waiting",
							}
						}
					} else if info.CreatedAt.After(start) {
						statuses[key] = StatusInfo{
							Ref:       key,
							Status:    "done",
							Offset:    info.Size,
							Total:     info.Size,
							UpdatedAt: info.CreatedAt,
						}
					} else {
						statuses[key] = StatusInfo{
							Ref:    key,
							Status: "exists",
						}
					}
				} else if done {
					if ok {
						if status.Status != "done" && status.Status != "exists" {
							status.Status = "done"
							statuses[key] = status
						}
					} else {
						statuses[key] = StatusInfo{
							Ref:    key,
							Status: "done",
						}
					}
				}
			}

			var ordered []StatusInfo
			for _, key := range keys {
				ordered = append(ordered, statuses[key])
			}

			Display(tw, ordered, start)
			tw.Flush()

			if done {
				fw.Flush()
				return
			}
		case <-ctx.Done():
			done = true // allow ui to update once more
		}
	}
}

// jobs provides a way of identifying the download keys for a particular task
// encountered during the pull walk.
//
// This is very minimal and will probably be replaced with something more
// featured.
type jobs struct {
	name     string
	added    map[digest.Digest]struct{}
	descs    []ocispec.Descriptor
	mu       sync.Mutex
	resolved bool
}

func newJobs(name string) *jobs {
	return &jobs{
		name:  name,
		added: map[digest.Digest]struct{}{},
	}
}

func (j *jobs) add(desc ocispec.Descriptor) {
	j.mu.Lock()
	defer j.mu.Unlock()
	j.resolved = true

	if _, ok := j.added[desc.Digest]; ok {
		return
	}
	j.descs = append(j.descs, desc)
	j.added[desc.Digest] = struct{}{}
}

func (j *jobs) jobs() []ocispec.Descriptor {
	j.mu.Lock()
	defer j.mu.Unlock()

	var descs []ocispec.Descriptor
	return append(descs, j.descs...)
}

func (j *jobs) isResolved() bool {
	j.mu.Lock()
	defer j.mu.Unlock()
	return j.resolved
}

// StatusInfo holds the status info for an upload or download
type StatusInfo struct {
	Ref       string
	Status    string
	Offset    int64
	Total     int64
	StartedAt time.Time
	UpdatedAt time.Time
}

// Display pretty prints out the download or upload progress
func Display(w io.Writer, statuses []StatusInfo, start time.Time) {
	var total int64
	for _, status := range statuses {
		total += status.Offset
		switch status.Status {
		case "downloading", "uploading":
			var bar progress.Bar
			if status.Total > 0.0 {
				bar = progress.Bar(float64(status.Offset) / float64(status.Total))
			}
			fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
				status.Ref,
				status.Status,
				bar,
				progress.Bytes(status.Offset), progress.Bytes(status.Total))
		case "resolving", "waiting":
			bar := progress.Bar(0.0)
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
				status.Ref,
				status.Status,
				bar)
		default:
			bar := progress.Bar(1.0)
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
				status.Ref,
				status.Status,
				bar)
		}
	}

	fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
		time.Since(start).Seconds(),
		// TODO(stevvooe): These calculations are actually way off.
		// Need to account for previously downloaded data. These
		// will basically be right for a download the first time
		// but will be skewed if restarting, as it includes the
		// data into the start time before.
		progress.Bytes(total),
		progress.NewBytesPerSecond(total, time.Since(start)))
}