
Now that we have most of the services required for use with containerd, we found that the same patterns were being repeated throughout the services. By defining a central `errdefs` package, we ensure that services map errors to and from grpc consistently and cleanly. One can decorate an error with as much context as necessary, using `pkg/errors`, and still have the error mapped correctly via grpc.

We make a few sacrifices. At this point, the common errors we use across the repository all map directly to grpc error codes. While this seems positively crazy, it actually works out quite well. The more specific error conditions weren't really necessary, and the ones that were necessary now simply carry better context information. We lose the ability to add new codes, but this constraint may not be a bad thing.

Effectively, as long as one uses the errors defined in `errdefs`, the error class will be mapped correctly across the grpc boundary and everything will be good. If you don't use those definitions, the error maps to "unknown" and the error message is preserved.

Signed-off-by: Stephen J Day <stephen.day@docker.com>
308 lines
7.3 KiB
Go
308 lines
7.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/containerd/containerd"
|
|
"github.com/containerd/containerd/content"
|
|
"github.com/containerd/containerd/errdefs"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/containerd/containerd/log"
|
|
"github.com/containerd/containerd/progress"
|
|
"github.com/containerd/containerd/remotes"
|
|
digest "github.com/opencontainers/go-digest"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/urfave/cli"
|
|
)
|
|
|
|
var fetchCommand = cli.Command{
|
|
Name: "fetch",
|
|
Usage: "fetch all content for an image into containerd",
|
|
ArgsUsage: "[flags] <remote> <object>",
|
|
Description: `Fetch an image into containerd.
|
|
|
|
This command ensures that containerd has all the necessary resources to build
|
|
an image's rootfs and convert the configuration to a runtime format supported
|
|
by containerd.
|
|
|
|
This command uses the same syntax, of remote and object, as 'dist
|
|
fetch-object'. We may want to make this nicer, but agnostism is preferred for
|
|
the moment.
|
|
|
|
Right now, the responsibility of the daemon and the cli aren't quite clear. Do
|
|
not use this implementation as a guide. The end goal should be having metadata,
|
|
content and snapshots ready for a direct use via the 'ctr run'.
|
|
|
|
Most of this is experimental and there are few leaps to make this work.`,
|
|
Flags: registryFlags,
|
|
Action: func(clicontext *cli.Context) error {
|
|
var (
|
|
ref = clicontext.Args().First()
|
|
)
|
|
ctx, cancel := appContext(clicontext)
|
|
defer cancel()
|
|
|
|
_, err := fetch(ctx, ref, clicontext)
|
|
return err
|
|
},
|
|
}
|
|
|
|
func fetch(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) {
|
|
client, err := getClient(clicontext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resolver, err := getResolver(ctx, clicontext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ongoing := newJobs(ref)
|
|
|
|
pctx, stopProgress := context.WithCancel(ctx)
|
|
progress := make(chan struct{})
|
|
|
|
go func() {
|
|
showProgress(pctx, ongoing, client.ContentStore(), os.Stdout)
|
|
close(progress)
|
|
}()
|
|
|
|
h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
|
if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
|
|
ongoing.add(desc)
|
|
}
|
|
return nil, nil
|
|
})
|
|
|
|
log.G(pctx).WithField("image", ref).Debug("fetching")
|
|
|
|
img, err := client.Pull(pctx, ref, containerd.WithResolver(resolver), containerd.WithImageHandler(h), containerd.WithSchema1Conversion)
|
|
stopProgress()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
<-progress
|
|
return img, nil
|
|
}
|
|
|
|
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) {
|
|
var (
|
|
ticker = time.NewTicker(100 * time.Millisecond)
|
|
fw = progress.NewWriter(out)
|
|
start = time.Now()
|
|
statuses = map[string]statusInfo{}
|
|
done bool
|
|
)
|
|
defer ticker.Stop()
|
|
|
|
outer:
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
fw.Flush()
|
|
|
|
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
|
|
|
|
resolved := "resolved"
|
|
if !ongoing.isResolved() {
|
|
resolved = "resolving"
|
|
}
|
|
statuses[ongoing.name] = statusInfo{
|
|
Ref: ongoing.name,
|
|
Status: resolved,
|
|
}
|
|
keys := []string{ongoing.name}
|
|
|
|
activeSeen := map[string]struct{}{}
|
|
if !done {
|
|
active, err := cs.Status(ctx, "")
|
|
if err != nil {
|
|
log.G(ctx).WithError(err).Error("active check failed")
|
|
continue
|
|
}
|
|
// update status of active entries!
|
|
for _, active := range active {
|
|
statuses[active.Ref] = statusInfo{
|
|
Ref: active.Ref,
|
|
Status: "downloading",
|
|
Offset: active.Offset,
|
|
Total: active.Total,
|
|
StartedAt: active.StartedAt,
|
|
UpdatedAt: active.UpdatedAt,
|
|
}
|
|
activeSeen[active.Ref] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// now, update the items in jobs that are not in active
|
|
for _, j := range ongoing.jobs() {
|
|
key := remotes.MakeRefKey(ctx, j)
|
|
keys = append(keys, key)
|
|
if _, ok := activeSeen[key]; ok {
|
|
continue
|
|
}
|
|
|
|
status, ok := statuses[key]
|
|
if !done && (!ok || status.Status == "downloading") {
|
|
info, err := cs.Info(ctx, j.Digest)
|
|
if err != nil {
|
|
if !errdefs.IsNotFound(err) {
|
|
log.G(ctx).WithError(err).Errorf("failed to get content info")
|
|
continue outer
|
|
} else {
|
|
statuses[key] = statusInfo{
|
|
Ref: key,
|
|
Status: "waiting",
|
|
}
|
|
}
|
|
} else if info.CommittedAt.After(start) {
|
|
statuses[key] = statusInfo{
|
|
Ref: key,
|
|
Status: "done",
|
|
Offset: info.Size,
|
|
Total: info.Size,
|
|
UpdatedAt: info.CommittedAt,
|
|
}
|
|
} else {
|
|
statuses[key] = statusInfo{
|
|
Ref: key,
|
|
Status: "exists",
|
|
}
|
|
}
|
|
} else if done {
|
|
if ok {
|
|
if status.Status != "done" && status.Status != "exists" {
|
|
status.Status = "done"
|
|
statuses[key] = status
|
|
}
|
|
} else {
|
|
statuses[key] = statusInfo{
|
|
Ref: key,
|
|
Status: "done",
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
var ordered []statusInfo
|
|
for _, key := range keys {
|
|
ordered = append(ordered, statuses[key])
|
|
}
|
|
|
|
display(tw, ordered, start)
|
|
tw.Flush()
|
|
|
|
if done {
|
|
fw.Flush()
|
|
return
|
|
}
|
|
case <-ctx.Done():
|
|
done = true // allow ui to update once more
|
|
}
|
|
}
|
|
}
|
|
|
|
// jobs provides a way of identifying the download keys for a particular task
|
|
// encountering during the pull walk.
|
|
//
|
|
// This is very minimal and will probably be replaced with something more
|
|
// featured.
|
|
type jobs struct {
|
|
name string
|
|
added map[digest.Digest]struct{}
|
|
descs []ocispec.Descriptor
|
|
mu sync.Mutex
|
|
resolved bool
|
|
}
|
|
|
|
func newJobs(name string) *jobs {
|
|
return &jobs{
|
|
name: name,
|
|
added: map[digest.Digest]struct{}{},
|
|
}
|
|
}
|
|
|
|
func (j *jobs) add(desc ocispec.Descriptor) {
|
|
j.mu.Lock()
|
|
defer j.mu.Unlock()
|
|
j.resolved = true
|
|
|
|
if _, ok := j.added[desc.Digest]; ok {
|
|
return
|
|
}
|
|
j.descs = append(j.descs, desc)
|
|
j.added[desc.Digest] = struct{}{}
|
|
}
|
|
|
|
func (j *jobs) jobs() []ocispec.Descriptor {
|
|
j.mu.Lock()
|
|
defer j.mu.Unlock()
|
|
|
|
var descs []ocispec.Descriptor
|
|
return append(descs, j.descs...)
|
|
}
|
|
|
|
func (j *jobs) isResolved() bool {
|
|
j.mu.Lock()
|
|
defer j.mu.Unlock()
|
|
return j.resolved
|
|
}
|
|
|
|
// statusInfo is one row of the progress display.
type statusInfo struct {
	// Ref is the key the row is labelled with.
	Ref string
	// Status is the state label, e.g. "resolving", "resolved", "waiting",
	// "downloading", "done" or "exists".
	Status string
	// Offset and Total are byte counts: how much has been transferred and how
	// much is expected. Both stay zero for rows that carry no size.
	Offset, Total int64
	// StartedAt and UpdatedAt are timestamps copied from the content store
	// where available; otherwise they are left as the zero time.
	StartedAt, UpdatedAt time.Time
}
|
|
|
|
func display(w io.Writer, statuses []statusInfo, start time.Time) {
|
|
var total int64
|
|
for _, status := range statuses {
|
|
total += status.Offset
|
|
switch status.Status {
|
|
case "downloading", "uploading":
|
|
var bar progress.Bar
|
|
if status.Total > 0.0 {
|
|
bar = progress.Bar(float64(status.Offset) / float64(status.Total))
|
|
}
|
|
fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
|
|
status.Ref,
|
|
status.Status,
|
|
bar,
|
|
progress.Bytes(status.Offset), progress.Bytes(status.Total))
|
|
case "resolving", "waiting":
|
|
bar := progress.Bar(0.0)
|
|
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
|
|
status.Ref,
|
|
status.Status,
|
|
bar)
|
|
default:
|
|
bar := progress.Bar(1.0)
|
|
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
|
|
status.Ref,
|
|
status.Status,
|
|
bar)
|
|
}
|
|
}
|
|
|
|
fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
|
|
time.Since(start).Seconds(),
|
|
// TODO(stevvooe): These calculations are actually way off.
|
|
// Need to account for previously downloaded data. These
|
|
// will basically be right for a download the first time
|
|
// but will be skewed if restarting, as it includes the
|
|
// data into the start time before.
|
|
progress.Bytes(total),
|
|
progress.NewBytesPerSecond(total, time.Since(start)))
|
|
}
|