From 0aca4bb1f2b91464dd6d2acd35de1f379171ab02 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 19 Aug 2022 15:33:13 -0700 Subject: [PATCH] Update ctr commands to use transfer interface Signed-off-by: Derek McGowan --- cmd/ctr/commands/images/pull.go | 383 ++++++++++++++++++++++++++++---- cmd/ctr/commands/images/push.go | 23 ++ cmd/ctr/commands/resolver.go | 48 ++++ 3 files changed, 407 insertions(+), 47 deletions(-) diff --git a/cmd/ctr/commands/images/pull.go b/cmd/ctr/commands/images/pull.go index 0bfe3f8f4..666aeb69d 100644 --- a/cmd/ctr/commands/images/pull.go +++ b/cmd/ctr/commands/images/pull.go @@ -17,9 +17,24 @@ package images import ( + "context" "fmt" + "io" + "os" + "strings" + "time" + "github.com/containerd/containerd" "github.com/containerd/containerd/cmd/ctr/commands" + "github.com/containerd/containerd/cmd/ctr/commands/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/progress" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/transfer/image" + "github.com/containerd/containerd/platforms" + "github.com/opencontainers/image-spec/identity" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/urfave/cli" ) @@ -58,6 +73,10 @@ command. As part of this process, we do the following: Name: "max-concurrent-downloads", Usage: "Set the max concurrent downloads for each pull", }, + cli.BoolFlag{ + Name: "local", + Usage: "Fetch content from local client rather than using transfer service", + }, ), Action: func(context *cli.Context) error { var ( @@ -73,6 +92,20 @@ command. As part of this process, we do the following: } defer cancel() + if !context.Bool("local") { + ch, err := commands.NewStaticCredentials(ctx, context, ref) + if err != nil { + return err + } + reg := image.NewOCIRegistry(ref, nil, ch) + is := image.NewStore(ref) + + pf, done := ProgressHandler(ctx, os.Stdout) + defer done() + + return client.Transfer(ctx, reg, is, transfer.WithProgress(pf)) + } + ctx, done, err := client.WithLease(ctx) if err != nil { return err @@ -80,63 +113,319 @@ command. As part of this process, we do the following: defer done(ctx) // TODO: Handle this locally via transfer config - //config, err := content.NewFetchConfig(ctx, context) - // if err != nil { - // return err - //} - - if err := client.Transfer(ctx, nil, nil); err != nil { + config, err := content.NewFetchConfig(ctx, context) + if err != nil { return err } - /* - img, err := content.Fetch(ctx, client, ref, config) + img, err := content.Fetch(ctx, client, ref, config) + if err != nil { + return err + } + + log.G(ctx).WithField("image", ref).Debug("unpacking") + + // TODO: Show unpack status + + var p []ocispec.Platform + if context.Bool("all-platforms") { + p, err = images.Platforms(ctx, client.ContentStore(), img.Target) + if err != nil { + return fmt.Errorf("unable to resolve image platforms: %w", err) + } + } else { + for _, s := range context.StringSlice("platform") { + ps, err := platforms.Parse(s) + if err != nil { + return fmt.Errorf("unable to parse platform %s: %w", s, err) + } + p = append(p, ps) + } + } + if len(p) == 0 { + p = append(p, platforms.DefaultSpec()) + } + + start := time.Now() + for _, platform := range p { + fmt.Printf("unpacking %s %s...\n", platforms.Format(platform), img.Target.Digest) + i := containerd.NewImageWithPlatform(client, img, platforms.Only(platform)) + err = i.Unpack(ctx, context.String("snapshotter")) if err != nil { return err } - - log.G(ctx).WithField("image", ref).Debug("unpacking") - - // TODO: Show unpack status - - var p []ocispec.Platform - if context.Bool("all-platforms") { - p, err = images.Platforms(ctx, client.ContentStore(), img.Target) - if err != nil { - return fmt.Errorf("unable to resolve image platforms: %w", err) - } - } else { - for _, s := range context.StringSlice("platform") { - ps, err := platforms.Parse(s) - if err != nil { - return fmt.Errorf("unable to parse platform %s: %w", s, err) - } - p = append(p, ps) - } - } - if len(p) == 0 { - p = append(p, platforms.DefaultSpec()) - } - - start := time.Now() - for _, platform := range p { - fmt.Printf("unpacking %s %s...\n", platforms.Format(platform), img.Target.Digest) - i := containerd.NewImageWithPlatform(client, img, platforms.Only(platform)) - err = i.Unpack(ctx, context.String("snapshotter")) + if context.Bool("print-chainid") { + diffIDs, err := i.RootFS(ctx) if err != nil { return err } - if context.Bool("print-chainid") { - diffIDs, err := i.RootFS(ctx) - if err != nil { - return err - } - chainID := identity.ChainID(diffIDs).String() - fmt.Printf("image chain ID: %s\n", chainID) - } + chainID := identity.ChainID(diffIDs).String() + fmt.Printf("image chain ID: %s\n", chainID) } - fmt.Printf("done: %s\t\n", time.Since(start)) - */ + } + fmt.Printf("done: %s\t\n", time.Since(start)) return nil }, } + +type progressNode struct { + transfer.Progress + children []*progressNode + root bool +} + +// ProgressHandler continuously updates the output with job progress +// by checking status in the content store. +func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc, func()) { + ctx, cancel := context.WithCancel(ctx) + var ( + fw = progress.NewWriter(out) + start = time.Now() + statuses = map[string]*progressNode{} + roots = []*progressNode{} + progress transfer.ProgressFunc + pc = make(chan transfer.Progress, 1) + status string + closeC = make(chan struct{}) + ) + + progress = func(p transfer.Progress) { + select { + case pc <- p: + case <-ctx.Done(): + } + } + + done := func() { + cancel() + <-closeC + } + go func() { + defer close(closeC) + for { + select { + case p := <-pc: + if p.Name == "" { + status = p.Event + continue + } + if node, ok := statuses[p.Name]; !ok { + node = &progressNode{ + Progress: p, + root: true, + } + + if len(p.Parents) == 0 { + roots = append(roots, node) + } else { + var parents []string + for _, parent := range p.Parents { + pStatus, ok := statuses[parent] + if ok { + parents = append(parents, parent) + pStatus.children = append(pStatus.children, node) + node.root = false + } + } + node.Progress.Parents = parents + if node.root { + roots = append(roots, node) + } + } + statuses[p.Name] = node + } else { + if len(node.Progress.Parents) != len(p.Parents) { + var parents []string + var removeRoot bool + for _, parent := range p.Parents { + pStatus, ok := statuses[parent] + if ok { + parents = append(parents, parent) + var found bool + for _, child := range pStatus.children { + + if child.Progress.Name == p.Name { + found = true + break + } + } + if !found { + pStatus.children = append(pStatus.children, node) + + } + if node.root { + removeRoot = true + } + node.root = false + } + } + p.Parents = parents + // Check if needs to remove from root + if removeRoot { + for i := range roots { + if roots[i] == node { + roots = append(roots[:i], roots[i+1:]...) + break + } + } + } + + } + node.Progress = p + } + + /* + all := make([]transfer.Progress, 0, len(statuses)) + for _, p := range statuses { + all = append(all, p.Progress) + } + sort.Slice(all, func(i, j int) bool { + return all[i].Name < all[j].Name + }) + Display(fw, status, all, start) + */ + DisplayHierarchy(fw, status, roots, start) + fw.Flush() + case <-ctx.Done(): + return + } + } + }() + + return progress, done +} + +func DisplayHierarchy(w io.Writer, status string, roots []*progressNode, start time.Time) { + total := displayNode(w, "", roots) + // Print the Status line + fmt.Fprintf(w, "%s\telapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", + status, + time.Since(start).Seconds(), + // TODO(stevvooe): These calculations are actually way off. + // Need to account for previously downloaded data. These + // will basically be right for a download the first time + // but will be skewed if restarting, as it includes the + // data into the start time before. + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) +} + +func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 { + var total int64 + for i, node := range nodes { + status := node.Progress + total += status.Progress + pf, cpf := prefixes(i, len(nodes)) + if node.root { + pf, cpf = "", "" + } + + name := prefix + pf + displayName(status.Name) + + switch status.Event { + case "downloading", "uploading": + var bar progress.Bar + if status.Total > 0.0 { + bar = progress.Bar(float64(status.Progress) / float64(status.Total)) + } + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t%8.8s/%s\t\n", + name, + status.Event, + bar, + progress.Bytes(status.Progress), progress.Bytes(status.Total)) + case "resolving", "waiting": + bar := progress.Bar(0.0) + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n", + name, + status.Event, + bar) + case "complete": + bar := progress.Bar(1.0) + fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n", + name, + status.Event, + bar) + default: + fmt.Fprintf(w, "%-40.40s\t%s\t\n", + name, + status.Event) + } + total += displayNode(w, prefix+cpf, node.children) + } + return total +} + +func prefixes(index, length int) (prefix string, childPrefix string) { + if index+1 == length { + prefix = "└──" + childPrefix = " " + } else { + prefix = "├──" + childPrefix = "│ " + } + return +} + +func displayName(name string) string { + parts := strings.Split(name, "-") + for i := range parts { + parts[i] = shortenName(parts[i]) + } + return strings.Join(parts, " ") +} + +func shortenName(name string) string { + if strings.HasPrefix(name, "sha256:") && len(name) == 71 { + return "(" + name[7:19] + ")" + } + return name +} + +// Display pretty prints out the download or upload progress +// Status tree +func Display(w io.Writer, status string, statuses []transfer.Progress, start time.Time) { + var total int64 + for _, status := range statuses { + total += status.Progress + switch status.Event { + case "downloading", "uploading": + var bar progress.Bar + if status.Total > 0.0 { + bar = progress.Bar(float64(status.Progress) / float64(status.Total)) + } + fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n", + status.Name, + status.Event, + bar, + progress.Bytes(status.Progress), progress.Bytes(status.Total)) + case "resolving", "waiting": + bar := progress.Bar(0.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Name, + status.Event, + bar) + case "complete", "done": + bar := progress.Bar(1.0) + fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", + status.Name, + status.Event, + bar) + default: + fmt.Fprintf(w, "%s:\t%s\t\n", + status.Name, + status.Event) + } + } + + // Print the Status line + fmt.Fprintf(w, "%s\telapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n", + status, + time.Since(start).Seconds(), + // TODO(stevvooe): These calculations are actually way off. + // Need to account for previously downloaded data. These + // will basically be right for a download the first time + // but will be skewed if restarting, as it includes the + // data into the start time before. + progress.Bytes(total), + progress.NewBytesPerSecond(total, time.Since(start))) +} diff --git a/cmd/ctr/commands/images/push.go b/cmd/ctr/commands/images/push.go index 4dc70b693..8937d2930 100644 --- a/cmd/ctr/commands/images/push.go +++ b/cmd/ctr/commands/images/push.go @@ -32,6 +32,8 @@ import ( "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/progress" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/pkg/transfer/image" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" @@ -68,6 +70,9 @@ var pushCommand = cli.Command{ }, cli.IntFlag{ Name: "max-concurrent-uploaded-layers", Usage: "set the max concurrent uploaded layers for each push", + }, cli.BoolFlag{ + Name: "local", + Usage: "Push content from local client rather than using transfer service", }, cli.BoolFlag{ Name: "allow-non-distributable-blobs", Usage: "allow pushing blobs that are marked as non-distributable", @@ -89,6 +94,24 @@ var pushCommand = cli.Command{ } defer cancel() + if !context.Bool("local") { + ch, err := commands.NewStaticCredentials(ctx, context, ref) + if err != nil { + return err + } + + if local == "" { + local = ref + } + reg := image.NewOCIRegistry(ref, nil, ch) + is := image.NewStore(local) + + pf, done := ProgressHandler(ctx, os.Stdout) + defer done() + + return client.Transfer(ctx, is, reg, transfer.WithProgress(pf)) + } + if manifest := context.String("manifest"); manifest != "" { desc.Digest, err = digest.Parse(manifest) if err != nil { diff --git a/cmd/ctr/commands/resolver.go b/cmd/ctr/commands/resolver.go index bd533e171..4b2aa4944 100644 --- a/cmd/ctr/commands/resolver.go +++ b/cmd/ctr/commands/resolver.go @@ -32,6 +32,7 @@ import ( "github.com/containerd/console" "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/transfer/image" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/containerd/remotes/docker/config" @@ -209,3 +210,50 @@ func NewDebugClientTrace(ctx gocontext.Context) *httptrace.ClientTrace { }, } } + +type staticCredentials struct { + ref string + username string + secret string +} + +// NewStaticCredentials gets credentials from passing in cli context +func NewStaticCredentials(ctx gocontext.Context, clicontext *cli.Context, ref string) (image.CredentialHelper, error) { + username := clicontext.String("user") + var secret string + if i := strings.IndexByte(username, ':'); i > 0 { + secret = username[i+1:] + username = username[0:i] + } + if username != "" { + if secret == "" { + fmt.Printf("Password: ") + + var err error + secret, err = passwordPrompt() + if err != nil { + return nil, err + } + + fmt.Print("\n") + } + } else if rt := clicontext.String("refresh"); rt != "" { + secret = rt + } + + return &staticCredentials{ + ref: ref, + username: username, + secret: secret, + }, nil +} + +func (sc *staticCredentials) GetCredentials(ctx gocontext.Context, ref, host string) (image.Credentials, error) { + if ref == sc.ref { + return image.Credentials{ + Username: sc.username, + Secret: sc.secret, + }, nil + } + return image.Credentials{}, nil +}