307 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			307 lines
		
	
	
		
			7.3 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package main
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"io"
 | 
						|
	"os"
 | 
						|
	"sync"
 | 
						|
	"text/tabwriter"
 | 
						|
	"time"
 | 
						|
 | 
						|
	"github.com/containerd/containerd"
 | 
						|
	"github.com/containerd/containerd/content"
 | 
						|
	"github.com/containerd/containerd/images"
 | 
						|
	"github.com/containerd/containerd/log"
 | 
						|
	"github.com/containerd/containerd/progress"
 | 
						|
	"github.com/containerd/containerd/remotes"
 | 
						|
	digest "github.com/opencontainers/go-digest"
 | 
						|
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
 | 
						|
	"github.com/urfave/cli"
 | 
						|
)
 | 
						|
 | 
						|
var fetchCommand = cli.Command{
 | 
						|
	Name:      "fetch",
 | 
						|
	Usage:     "fetch all content for an image into containerd",
 | 
						|
	ArgsUsage: "[flags] <remote> <object>",
 | 
						|
	Description: `Fetch an image into containerd.
 | 
						|
	
 | 
						|
This command ensures that containerd has all the necessary resources to build
 | 
						|
an image's rootfs and convert the configuration to a runtime format supported
 | 
						|
by containerd.
 | 
						|
 | 
						|
This command uses the same syntax, of remote and object, as 'dist
 | 
						|
fetch-object'. We may want to make this nicer, but agnostism is preferred for
 | 
						|
the moment.
 | 
						|
 | 
						|
Right now, the responsibility of the daemon and the cli aren't quite clear. Do
 | 
						|
not use this implementation as a guide. The end goal should be having metadata,
 | 
						|
content and snapshots ready for a direct use via the 'ctr run'.
 | 
						|
 | 
						|
Most of this is experimental and there are few leaps to make this work.`,
 | 
						|
	Flags: registryFlags,
 | 
						|
	Action: func(clicontext *cli.Context) error {
 | 
						|
		var (
 | 
						|
			ref = clicontext.Args().First()
 | 
						|
		)
 | 
						|
		ctx, cancel := appContext(clicontext)
 | 
						|
		defer cancel()
 | 
						|
 | 
						|
		_, err := fetch(ctx, ref, clicontext)
 | 
						|
		return err
 | 
						|
	},
 | 
						|
}
 | 
						|
 | 
						|
func fetch(ctx context.Context, ref string, clicontext *cli.Context) (containerd.Image, error) {
 | 
						|
	client, err := getClient(clicontext)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	resolver, err := getResolver(ctx, clicontext)
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	ongoing := newJobs(ref)
 | 
						|
 | 
						|
	pctx, stopProgress := context.WithCancel(ctx)
 | 
						|
	progress := make(chan struct{})
 | 
						|
 | 
						|
	go func() {
 | 
						|
		showProgress(pctx, ongoing, client.ContentStore(), os.Stdout)
 | 
						|
		close(progress)
 | 
						|
	}()
 | 
						|
 | 
						|
	h := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
 | 
						|
		if desc.MediaType != images.MediaTypeDockerSchema1Manifest {
 | 
						|
			ongoing.add(desc)
 | 
						|
		}
 | 
						|
		return nil, nil
 | 
						|
	})
 | 
						|
 | 
						|
	log.G(pctx).WithField("image", ref).Debug("fetching")
 | 
						|
 | 
						|
	img, err := client.Pull(pctx, ref, containerd.WithResolver(resolver), containerd.WithImageHandler(h), containerd.WithSchema1Conversion)
 | 
						|
	stopProgress()
 | 
						|
	if err != nil {
 | 
						|
		return nil, err
 | 
						|
	}
 | 
						|
 | 
						|
	<-progress
 | 
						|
	return img, nil
 | 
						|
}
 | 
						|
 | 
						|
func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.Writer) {
 | 
						|
	var (
 | 
						|
		ticker   = time.NewTicker(100 * time.Millisecond)
 | 
						|
		fw       = progress.NewWriter(out)
 | 
						|
		start    = time.Now()
 | 
						|
		statuses = map[string]statusInfo{}
 | 
						|
		done     bool
 | 
						|
	)
 | 
						|
	defer ticker.Stop()
 | 
						|
 | 
						|
outer:
 | 
						|
	for {
 | 
						|
		select {
 | 
						|
		case <-ticker.C:
 | 
						|
			fw.Flush()
 | 
						|
 | 
						|
			tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
 | 
						|
 | 
						|
			resolved := "resolved"
 | 
						|
			if !ongoing.isResolved() {
 | 
						|
				resolved = "resolving"
 | 
						|
			}
 | 
						|
			statuses[ongoing.name] = statusInfo{
 | 
						|
				Ref:    ongoing.name,
 | 
						|
				Status: resolved,
 | 
						|
			}
 | 
						|
			keys := []string{ongoing.name}
 | 
						|
 | 
						|
			activeSeen := map[string]struct{}{}
 | 
						|
			if !done {
 | 
						|
				active, err := cs.Status(ctx, "")
 | 
						|
				if err != nil {
 | 
						|
					log.G(ctx).WithError(err).Error("active check failed")
 | 
						|
					continue
 | 
						|
				}
 | 
						|
				// update status of active entries!
 | 
						|
				for _, active := range active {
 | 
						|
					statuses[active.Ref] = statusInfo{
 | 
						|
						Ref:       active.Ref,
 | 
						|
						Status:    "downloading",
 | 
						|
						Offset:    active.Offset,
 | 
						|
						Total:     active.Total,
 | 
						|
						StartedAt: active.StartedAt,
 | 
						|
						UpdatedAt: active.UpdatedAt,
 | 
						|
					}
 | 
						|
					activeSeen[active.Ref] = struct{}{}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// now, update the items in jobs that are not in active
 | 
						|
			for _, j := range ongoing.jobs() {
 | 
						|
				key := remotes.MakeRefKey(ctx, j)
 | 
						|
				keys = append(keys, key)
 | 
						|
				if _, ok := activeSeen[key]; ok {
 | 
						|
					continue
 | 
						|
				}
 | 
						|
 | 
						|
				status, ok := statuses[key]
 | 
						|
				if !done && (!ok || status.Status == "downloading") {
 | 
						|
					info, err := cs.Info(ctx, j.Digest)
 | 
						|
					if err != nil {
 | 
						|
						if !content.IsNotFound(err) {
 | 
						|
							log.G(ctx).WithError(err).Errorf("failed to get content info")
 | 
						|
							continue outer
 | 
						|
						} else {
 | 
						|
							statuses[key] = statusInfo{
 | 
						|
								Ref:    key,
 | 
						|
								Status: "waiting",
 | 
						|
							}
 | 
						|
						}
 | 
						|
					} else if info.CommittedAt.After(start) {
 | 
						|
						statuses[key] = statusInfo{
 | 
						|
							Ref:       key,
 | 
						|
							Status:    "done",
 | 
						|
							Offset:    info.Size,
 | 
						|
							Total:     info.Size,
 | 
						|
							UpdatedAt: info.CommittedAt,
 | 
						|
						}
 | 
						|
					} else {
 | 
						|
						statuses[key] = statusInfo{
 | 
						|
							Ref:    key,
 | 
						|
							Status: "exists",
 | 
						|
						}
 | 
						|
					}
 | 
						|
				} else if done {
 | 
						|
					if ok {
 | 
						|
						if status.Status != "done" && status.Status != "exists" {
 | 
						|
							status.Status = "done"
 | 
						|
							statuses[key] = status
 | 
						|
						}
 | 
						|
					} else {
 | 
						|
						statuses[key] = statusInfo{
 | 
						|
							Ref:    key,
 | 
						|
							Status: "done",
 | 
						|
						}
 | 
						|
					}
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			var ordered []statusInfo
 | 
						|
			for _, key := range keys {
 | 
						|
				ordered = append(ordered, statuses[key])
 | 
						|
			}
 | 
						|
 | 
						|
			display(tw, ordered, start)
 | 
						|
			tw.Flush()
 | 
						|
 | 
						|
			if done {
 | 
						|
				fw.Flush()
 | 
						|
				return
 | 
						|
			}
 | 
						|
		case <-ctx.Done():
 | 
						|
			done = true // allow ui to update once more
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// jobs provides a way of identifying the download keys for a particular task
 | 
						|
// encountering during the pull walk.
 | 
						|
//
 | 
						|
// This is very minimal and will probably be replaced with something more
 | 
						|
// featured.
 | 
						|
type jobs struct {
 | 
						|
	name     string
 | 
						|
	added    map[digest.Digest]struct{}
 | 
						|
	descs    []ocispec.Descriptor
 | 
						|
	mu       sync.Mutex
 | 
						|
	resolved bool
 | 
						|
}
 | 
						|
 | 
						|
func newJobs(name string) *jobs {
 | 
						|
	return &jobs{
 | 
						|
		name:  name,
 | 
						|
		added: map[digest.Digest]struct{}{},
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
func (j *jobs) add(desc ocispec.Descriptor) {
 | 
						|
	j.mu.Lock()
 | 
						|
	defer j.mu.Unlock()
 | 
						|
	j.resolved = true
 | 
						|
 | 
						|
	if _, ok := j.added[desc.Digest]; ok {
 | 
						|
		return
 | 
						|
	}
 | 
						|
	j.descs = append(j.descs, desc)
 | 
						|
	j.added[desc.Digest] = struct{}{}
 | 
						|
}
 | 
						|
 | 
						|
func (j *jobs) jobs() []ocispec.Descriptor {
 | 
						|
	j.mu.Lock()
 | 
						|
	defer j.mu.Unlock()
 | 
						|
 | 
						|
	var descs []ocispec.Descriptor
 | 
						|
	return append(descs, j.descs...)
 | 
						|
}
 | 
						|
 | 
						|
func (j *jobs) isResolved() bool {
 | 
						|
	j.mu.Lock()
 | 
						|
	defer j.mu.Unlock()
 | 
						|
	return j.resolved
 | 
						|
}
 | 
						|
 | 
						|
// statusInfo is one row of the progress display.
type statusInfo struct {
	// Ref is the row label; Status is the state string that display switches
	// on (for example "downloading", "waiting", "done").
	Ref, Status string
	// Offset and Total are byte counts; display draws Offset/Total as the bar
	// fraction for rows that are downloading or uploading.
	Offset, Total int64
	// StartedAt and UpdatedAt are copied from the content store's records.
	StartedAt, UpdatedAt time.Time
}
 | 
						|
 | 
						|
func display(w io.Writer, statuses []statusInfo, start time.Time) {
 | 
						|
	var total int64
 | 
						|
	for _, status := range statuses {
 | 
						|
		total += status.Offset
 | 
						|
		switch status.Status {
 | 
						|
		case "downloading", "uploading":
 | 
						|
			var bar progress.Bar
 | 
						|
			if status.Total > 0.0 {
 | 
						|
				bar = progress.Bar(float64(status.Offset) / float64(status.Total))
 | 
						|
			}
 | 
						|
			fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
 | 
						|
				status.Ref,
 | 
						|
				status.Status,
 | 
						|
				bar,
 | 
						|
				progress.Bytes(status.Offset), progress.Bytes(status.Total))
 | 
						|
		case "resolving", "waiting":
 | 
						|
			bar := progress.Bar(0.0)
 | 
						|
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
 | 
						|
				status.Ref,
 | 
						|
				status.Status,
 | 
						|
				bar)
 | 
						|
		default:
 | 
						|
			bar := progress.Bar(1.0)
 | 
						|
			fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
 | 
						|
				status.Ref,
 | 
						|
				status.Status,
 | 
						|
				bar)
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	fmt.Fprintf(w, "elapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
 | 
						|
		time.Since(start).Seconds(),
 | 
						|
		// TODO(stevvooe): These calculations are actually way off.
 | 
						|
		// Need to account for previously downloaded data. These
 | 
						|
		// will basically be right for a download the first time
 | 
						|
		// but will be skewed if restarting, as it includes the
 | 
						|
		// data into the start time before.
 | 
						|
		progress.Bytes(total),
 | 
						|
		progress.NewBytesPerSecond(total, time.Since(start)))
 | 
						|
}
 |