Update ctr commands to use transfer interface

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-08-19 15:33:13 -07:00
parent 6b5df1ee16
commit 0aca4bb1f2
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
3 changed files with 407 additions and 47 deletions

View File

@ -17,9 +17,24 @@
package images
import (
"context"
"fmt"
"io"
"os"
"strings"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/cmd/ctr/commands/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/progress"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/image"
"github.com/containerd/containerd/platforms"
"github.com/opencontainers/image-spec/identity"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
)
@ -58,6 +73,10 @@ command. As part of this process, we do the following:
Name: "max-concurrent-downloads",
Usage: "Set the max concurrent downloads for each pull",
},
cli.BoolFlag{
Name: "local",
Usage: "Fetch content from local client rather than using transfer service",
},
),
Action: func(context *cli.Context) error {
var (
@ -73,6 +92,20 @@ command. As part of this process, we do the following:
}
defer cancel()
if !context.Bool("local") {
ch, err := commands.NewStaticCredentials(ctx, context, ref)
if err != nil {
return err
}
reg := image.NewOCIRegistry(ref, nil, ch)
is := image.NewStore(ref)
pf, done := ProgressHandler(ctx, os.Stdout)
defer done()
return client.Transfer(ctx, reg, is, transfer.WithProgress(pf))
}
ctx, done, err := client.WithLease(ctx)
if err != nil {
return err
@ -80,63 +113,319 @@ command. As part of this process, we do the following:
defer done(ctx)
// TODO: Handle this locally via transfer config
//config, err := content.NewFetchConfig(ctx, context)
// if err != nil {
// return err
//}
if err := client.Transfer(ctx, nil, nil); err != nil {
config, err := content.NewFetchConfig(ctx, context)
if err != nil {
return err
}
/*
img, err := content.Fetch(ctx, client, ref, config)
img, err := content.Fetch(ctx, client, ref, config)
if err != nil {
return err
}
log.G(ctx).WithField("image", ref).Debug("unpacking")
// TODO: Show unpack status
var p []ocispec.Platform
if context.Bool("all-platforms") {
p, err = images.Platforms(ctx, client.ContentStore(), img.Target)
if err != nil {
return fmt.Errorf("unable to resolve image platforms: %w", err)
}
} else {
for _, s := range context.StringSlice("platform") {
ps, err := platforms.Parse(s)
if err != nil {
return fmt.Errorf("unable to parse platform %s: %w", s, err)
}
p = append(p, ps)
}
}
if len(p) == 0 {
p = append(p, platforms.DefaultSpec())
}
start := time.Now()
for _, platform := range p {
fmt.Printf("unpacking %s %s...\n", platforms.Format(platform), img.Target.Digest)
i := containerd.NewImageWithPlatform(client, img, platforms.Only(platform))
err = i.Unpack(ctx, context.String("snapshotter"))
if err != nil {
return err
}
log.G(ctx).WithField("image", ref).Debug("unpacking")
// TODO: Show unpack status
var p []ocispec.Platform
if context.Bool("all-platforms") {
p, err = images.Platforms(ctx, client.ContentStore(), img.Target)
if err != nil {
return fmt.Errorf("unable to resolve image platforms: %w", err)
}
} else {
for _, s := range context.StringSlice("platform") {
ps, err := platforms.Parse(s)
if err != nil {
return fmt.Errorf("unable to parse platform %s: %w", s, err)
}
p = append(p, ps)
}
}
if len(p) == 0 {
p = append(p, platforms.DefaultSpec())
}
start := time.Now()
for _, platform := range p {
fmt.Printf("unpacking %s %s...\n", platforms.Format(platform), img.Target.Digest)
i := containerd.NewImageWithPlatform(client, img, platforms.Only(platform))
err = i.Unpack(ctx, context.String("snapshotter"))
if context.Bool("print-chainid") {
diffIDs, err := i.RootFS(ctx)
if err != nil {
return err
}
if context.Bool("print-chainid") {
diffIDs, err := i.RootFS(ctx)
if err != nil {
return err
}
chainID := identity.ChainID(diffIDs).String()
fmt.Printf("image chain ID: %s\n", chainID)
}
chainID := identity.ChainID(diffIDs).String()
fmt.Printf("image chain ID: %s\n", chainID)
}
fmt.Printf("done: %s\t\n", time.Since(start))
*/
}
fmt.Printf("done: %s\t\n", time.Since(start))
return nil
},
}
type progressNode struct {
transfer.Progress
children []*progressNode
root bool
}
// ProgressHandler continuously updates the output with job progress
// by checking status in the content store.
func ProgressHandler(ctx context.Context, out io.Writer) (transfer.ProgressFunc, func()) {
ctx, cancel := context.WithCancel(ctx)
var (
fw = progress.NewWriter(out)
start = time.Now()
statuses = map[string]*progressNode{}
roots = []*progressNode{}
progress transfer.ProgressFunc
pc = make(chan transfer.Progress, 1)
status string
closeC = make(chan struct{})
)
progress = func(p transfer.Progress) {
select {
case pc <- p:
case <-ctx.Done():
}
}
done := func() {
cancel()
<-closeC
}
go func() {
defer close(closeC)
for {
select {
case p := <-pc:
if p.Name == "" {
status = p.Event
continue
}
if node, ok := statuses[p.Name]; !ok {
node = &progressNode{
Progress: p,
root: true,
}
if len(p.Parents) == 0 {
roots = append(roots, node)
} else {
var parents []string
for _, parent := range p.Parents {
pStatus, ok := statuses[parent]
if ok {
parents = append(parents, parent)
pStatus.children = append(pStatus.children, node)
node.root = false
}
}
node.Progress.Parents = parents
if node.root {
roots = append(roots, node)
}
}
statuses[p.Name] = node
} else {
if len(node.Progress.Parents) != len(p.Parents) {
var parents []string
var removeRoot bool
for _, parent := range p.Parents {
pStatus, ok := statuses[parent]
if ok {
parents = append(parents, parent)
var found bool
for _, child := range pStatus.children {
if child.Progress.Name == p.Name {
found = true
break
}
}
if !found {
pStatus.children = append(pStatus.children, node)
}
if node.root {
removeRoot = true
}
node.root = false
}
}
p.Parents = parents
// Check if needs to remove from root
if removeRoot {
for i := range roots {
if roots[i] == node {
roots = append(roots[:i], roots[i+1:]...)
break
}
}
}
}
node.Progress = p
}
/*
all := make([]transfer.Progress, 0, len(statuses))
for _, p := range statuses {
all = append(all, p.Progress)
}
sort.Slice(all, func(i, j int) bool {
return all[i].Name < all[j].Name
})
Display(fw, status, all, start)
*/
DisplayHierarchy(fw, status, roots, start)
fw.Flush()
case <-ctx.Done():
return
}
}
}()
return progress, done
}
func DisplayHierarchy(w io.Writer, status string, roots []*progressNode, start time.Time) {
total := displayNode(w, "", roots)
// Print the Status line
fmt.Fprintf(w, "%s\telapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
status,
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
// will basically be right for a download the first time
// but will be skewed if restarting, as it includes the
// data into the start time before.
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
}
func displayNode(w io.Writer, prefix string, nodes []*progressNode) int64 {
var total int64
for i, node := range nodes {
status := node.Progress
total += status.Progress
pf, cpf := prefixes(i, len(nodes))
if node.root {
pf, cpf = "", ""
}
name := prefix + pf + displayName(status.Name)
switch status.Event {
case "downloading", "uploading":
var bar progress.Bar
if status.Total > 0.0 {
bar = progress.Bar(float64(status.Progress) / float64(status.Total))
}
fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t%8.8s/%s\t\n",
name,
status.Event,
bar,
progress.Bytes(status.Progress), progress.Bytes(status.Total))
case "resolving", "waiting":
bar := progress.Bar(0.0)
fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n",
name,
status.Event,
bar)
case "complete":
bar := progress.Bar(1.0)
fmt.Fprintf(w, "%-40.40s\t%-11s\t%40r\t\n",
name,
status.Event,
bar)
default:
fmt.Fprintf(w, "%-40.40s\t%s\t\n",
name,
status.Event)
}
total += displayNode(w, prefix+cpf, node.children)
}
return total
}
func prefixes(index, length int) (prefix string, childPrefix string) {
if index+1 == length {
prefix = "└──"
childPrefix = " "
} else {
prefix = "├──"
childPrefix = "│ "
}
return
}
func displayName(name string) string {
parts := strings.Split(name, "-")
for i := range parts {
parts[i] = shortenName(parts[i])
}
return strings.Join(parts, " ")
}
func shortenName(name string) string {
if strings.HasPrefix(name, "sha256:") && len(name) == 71 {
return "(" + name[7:19] + ")"
}
return name
}
// Display pretty prints out the download or upload progress
// Status tree
func Display(w io.Writer, status string, statuses []transfer.Progress, start time.Time) {
var total int64
for _, status := range statuses {
total += status.Progress
switch status.Event {
case "downloading", "uploading":
var bar progress.Bar
if status.Total > 0.0 {
bar = progress.Bar(float64(status.Progress) / float64(status.Total))
}
fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
status.Name,
status.Event,
bar,
progress.Bytes(status.Progress), progress.Bytes(status.Total))
case "resolving", "waiting":
bar := progress.Bar(0.0)
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
status.Name,
status.Event,
bar)
case "complete", "done":
bar := progress.Bar(1.0)
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
status.Name,
status.Event,
bar)
default:
fmt.Fprintf(w, "%s:\t%s\t\n",
status.Name,
status.Event)
}
}
// Print the Status line
fmt.Fprintf(w, "%s\telapsed: %-4.1fs\ttotal: %7.6v\t(%v)\t\n",
status,
time.Since(start).Seconds(),
// TODO(stevvooe): These calculations are actually way off.
// Need to account for previously downloaded data. These
// will basically be right for a download the first time
// but will be skewed if restarting, as it includes the
// data into the start time before.
progress.Bytes(total),
progress.NewBytesPerSecond(total, time.Since(start)))
}

View File

@ -32,6 +32,8 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/progress"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/image"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
@ -68,6 +70,9 @@ var pushCommand = cli.Command{
}, cli.IntFlag{
Name: "max-concurrent-uploaded-layers",
Usage: "set the max concurrent uploaded layers for each push",
}, cli.BoolFlag{
Name: "local",
Usage: "Push content from local client rather than using transfer service",
}, cli.BoolFlag{
Name: "allow-non-distributable-blobs",
Usage: "allow pushing blobs that are marked as non-distributable",
@ -89,6 +94,24 @@ var pushCommand = cli.Command{
}
defer cancel()
if !context.Bool("local") {
ch, err := commands.NewStaticCredentials(ctx, context, ref)
if err != nil {
return err
}
if local == "" {
local = ref
}
reg := image.NewOCIRegistry(ref, nil, ch)
is := image.NewStore(local)
pf, done := ProgressHandler(ctx, os.Stdout)
defer done()
return client.Transfer(ctx, is, reg, transfer.WithProgress(pf))
}
if manifest := context.String("manifest"); manifest != "" {
desc.Digest, err = digest.Parse(manifest)
if err != nil {

View File

@ -32,6 +32,7 @@ import (
"github.com/containerd/console"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer/image"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/containerd/remotes/docker/config"
@ -209,3 +210,50 @@ func NewDebugClientTrace(ctx gocontext.Context) *httptrace.ClientTrace {
},
}
}
type staticCredentials struct {
ref string
username string
secret string
}
// NewStaticCredentials gets credentials from passing in cli context
func NewStaticCredentials(ctx gocontext.Context, clicontext *cli.Context, ref string) (image.CredentialHelper, error) {
username := clicontext.String("user")
var secret string
if i := strings.IndexByte(username, ':'); i > 0 {
secret = username[i+1:]
username = username[0:i]
}
if username != "" {
if secret == "" {
fmt.Printf("Password: ")
var err error
secret, err = passwordPrompt()
if err != nil {
return nil, err
}
fmt.Print("\n")
}
} else if rt := clicontext.String("refresh"); rt != "" {
secret = rt
}
return &staticCredentials{
ref: ref,
username: username,
secret: secret,
}, nil
}
func (sc *staticCredentials) GetCredentials(ctx gocontext.Context, ref, host string) (image.Credentials, error) {
if ref == sc.ref {
return image.Credentials{
Username: sc.username,
Secret: sc.secret,
}, nil
}
return image.Credentials{}, nil
}