ctr: move fetch,fetch-object,push-object to content

Signed-off-by: Jess Valarezo <valarezo.jessica@gmail.com>
This commit is contained in:
Jess Valarezo 2017-10-27 15:00:35 -04:00
parent 1f704e9862
commit b58e4fc2ed
7 changed files with 142 additions and 159 deletions

View File

@ -16,6 +16,7 @@ import (
"github.com/containerd/containerd/log"
units "github.com/docker/go-units"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/urfave/cli"
)
@ -33,6 +34,9 @@ var (
editCommand,
deleteCommand,
setLabelsCommand,
fetchCommand,
fetchObjectCommand,
pushObjectCommand,
},
}
@ -369,6 +373,120 @@ var (
return exitError
},
}
// TODO(stevvooe): Create "multi-fetch" mode that just takes a remote
// then receives object/hint lines on stdin, returning content as
// needed.
fetchObjectCommand = cli.Command{
Name: "fetch-object",
Usage: "retrieve objects from a remote",
ArgsUsage: "[flags] <remote> <object> [<hint>, ...]",
Description: `Fetch objects by identifier from a remote.`,
Flags: commands.RegistryFlags,
Action: func(context *cli.Context) error {
var (
ref = context.Args().First()
)
ctx, cancel := commands.AppContext(context)
defer cancel()
resolver, err := commands.GetResolver(ctx, context)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
log.G(ctx).Infof("resolving")
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).Infof("fetching")
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
defer rc.Close()
_, err = io.Copy(os.Stdout, rc)
return err
},
}
pushObjectCommand = cli.Command{
Name: "push-object",
Usage: "push an object to a remote",
ArgsUsage: "[flags] <remote> <object> <type>",
Description: `Push objects by identifier to a remote.`,
Flags: commands.RegistryFlags,
Action: func(context *cli.Context) error {
var (
ref = context.Args().Get(0)
object = context.Args().Get(1)
media = context.Args().Get(2)
)
dgst, err := digest.Parse(object)
if err != nil {
return err
}
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
resolver, err := commands.GetResolver(ctx, context)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
log.G(ctx).Infof("resolving")
pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
return err
}
cs := client.ContentStore()
info, err := cs.Info(ctx, dgst)
if err != nil {
return err
}
desc := ocispec.Descriptor{
MediaType: media,
Digest: dgst,
Size: info.Size,
}
ra, err := cs.ReaderAt(ctx, dgst)
if err != nil {
return err
}
defer ra.Close()
cw, err := pusher.Push(ctx, desc)
if err != nil {
return err
}
// TODO: Progress reader
if err := content.Copy(ctx, cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil {
return err
}
fmt.Printf("Pushed %s %s\n", desc.Digest, desc.MediaType)
return nil
},
}
)
func edit(rd io.Reader) (io.ReadCloser, error) {

View File

@ -1,4 +1,4 @@
package main
package content
import (
"context"
@ -45,12 +45,13 @@ Most of this is experimental and there are few leaps to make this work.`,
var (
ref = clicontext.Args().First()
)
_, err := fetch(ref, clicontext)
_, err := Fetch(ref, clicontext)
return err
},
}
func fetch(ref string, cliContext *cli.Context) (containerd.Image, error) {
// Fetch loads all resources into the content store and returns the image
func Fetch(ref string, cliContext *cli.Context) (containerd.Image, error) {
client, ctx, cancel, err := commands.NewClient(cliContext)
if err != nil {
return nil, err
@ -104,7 +105,7 @@ func showProgress(ctx context.Context, ongoing *jobs, cs content.Store, out io.W
ticker = time.NewTicker(100 * time.Millisecond)
fw = progress.NewWriter(out)
start = time.Now()
statuses = map[string]statusInfo{}
statuses = map[string]StatusInfo{}
done bool
)
defer ticker.Stop()
@ -121,7 +122,7 @@ outer:
if !ongoing.isResolved() {
resolved = "resolving"
}
statuses[ongoing.name] = statusInfo{
statuses[ongoing.name] = StatusInfo{
Ref: ongoing.name,
Status: resolved,
}
@ -136,7 +137,7 @@ outer:
}
// update status of active entries!
for _, active := range active {
statuses[active.Ref] = statusInfo{
statuses[active.Ref] = StatusInfo{
Ref: active.Ref,
Status: "downloading",
Offset: active.Offset,
@ -164,13 +165,13 @@ outer:
log.G(ctx).WithError(err).Errorf("failed to get content info")
continue outer
} else {
statuses[key] = statusInfo{
statuses[key] = StatusInfo{
Ref: key,
Status: "waiting",
}
}
} else if info.CreatedAt.After(start) {
statuses[key] = statusInfo{
statuses[key] = StatusInfo{
Ref: key,
Status: "done",
Offset: info.Size,
@ -178,7 +179,7 @@ outer:
UpdatedAt: info.CreatedAt,
}
} else {
statuses[key] = statusInfo{
statuses[key] = StatusInfo{
Ref: key,
Status: "exists",
}
@ -190,7 +191,7 @@ outer:
statuses[key] = status
}
} else {
statuses[key] = statusInfo{
statuses[key] = StatusInfo{
Ref: key,
Status: "done",
}
@ -198,12 +199,12 @@ outer:
}
}
var ordered []statusInfo
var ordered []StatusInfo
for _, key := range keys {
ordered = append(ordered, statuses[key])
}
display(tw, ordered, start)
Display(tw, ordered, start)
tw.Flush()
if done {
@ -262,7 +263,8 @@ func (j *jobs) isResolved() bool {
return j.resolved
}
type statusInfo struct {
// StatusInfo holds the status info for an upload or download
type StatusInfo struct {
Ref string
Status string
Offset int64
@ -271,7 +273,8 @@ type statusInfo struct {
UpdatedAt time.Time
}
func display(w io.Writer, statuses []statusInfo, start time.Time) {
// Display pretty prints out the download or upload progress
func Display(w io.Writer, statuses []StatusInfo, start time.Time) {
var total int64
for _, status := range statuses {
total += status.Offset

View File

@ -1,56 +0,0 @@
package main
import (
"io"
"os"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/log"
"github.com/urfave/cli"
)
// TODO(stevvooe): Create "multi-fetch" mode that just takes a remote
// then receives object/hint lines on stdin, returning content as
// needed.
var fetchObjectCommand = cli.Command{
Name: "fetch-object",
Usage: "retrieve objects from a remote",
ArgsUsage: "[flags] <remote> <object> [<hint>, ...]",
Description: `Fetch objects by identifier from a remote.`,
Flags: commands.RegistryFlags,
Action: func(context *cli.Context) error {
var (
ref = context.Args().First()
)
ctx, cancel := commands.AppContext(context)
defer cancel()
resolver, err := commands.GetResolver(ctx, context)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
log.G(ctx).Infof("resolving")
name, desc, err := resolver.Resolve(ctx, ref)
if err != nil {
return err
}
fetcher, err := resolver.Fetcher(ctx, name)
if err != nil {
return err
}
log.G(ctx).Infof("fetching")
rc, err := fetcher.Fetch(ctx, desc)
if err != nil {
return err
}
defer rc.Close()
_, err = io.Copy(os.Stdout, rc)
return err
},
}

View File

@ -78,14 +78,11 @@ containerd CLI
containers.Command,
content.Command,
events.Command,
fetchCommand,
fetchObjectCommand,
images.Command,
namespacesCmd.Command,
pprofCommand,
pullCommand,
pushCommand,
pushObjectCommand,
rootfsCommand,
runCommand,
snapshot.Command,

View File

@ -4,6 +4,7 @@ import (
"fmt"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/cmd/ctr/commands/content"
"github.com/containerd/containerd/log"
"github.com/urfave/cli"
)
@ -30,7 +31,7 @@ command. As part of this process, we do the following:
ctx, cancel := commands.AppContext(context)
defer cancel()
img, err := fetch(ref, context)
img, err := content.Fetch(ref, context)
if err != nil {
return err
}

View File

@ -9,6 +9,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/cmd/ctr/commands/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
@ -113,7 +114,7 @@ var pushCommand = cli.Command{
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
display(tw, ongoing.status(), start)
content.Display(tw, ongoing.status(), start)
tw.Flush()
if done {
@ -164,13 +165,13 @@ func (j *pushjobs) add(ref string) {
j.jobs[ref] = struct{}{}
}
func (j *pushjobs) status() []statusInfo {
func (j *pushjobs) status() []content.StatusInfo {
j.mu.Lock()
defer j.mu.Unlock()
statuses := make([]statusInfo, 0, len(j.jobs))
statuses := make([]content.StatusInfo, 0, len(j.jobs))
for _, name := range j.ordered {
si := statusInfo{
si := content.StatusInfo{
Ref: name,
}

View File

@ -1,81 +0,0 @@
package main
import (
"fmt"
"github.com/containerd/containerd/cmd/ctr/commands"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/urfave/cli"
)
var pushObjectCommand = cli.Command{
Name: "push-object",
Usage: "push an object to a remote",
ArgsUsage: "[flags] <remote> <object> <type>",
Description: `Push objects by identifier to a remote.`,
Flags: commands.RegistryFlags,
Action: func(context *cli.Context) error {
var (
ref = context.Args().Get(0)
object = context.Args().Get(1)
media = context.Args().Get(2)
)
dgst, err := digest.Parse(object)
if err != nil {
return err
}
client, ctx, cancel, err := commands.NewClient(context)
if err != nil {
return err
}
defer cancel()
resolver, err := commands.GetResolver(ctx, context)
if err != nil {
return err
}
ctx = log.WithLogger(ctx, log.G(ctx).WithField("ref", ref))
log.G(ctx).Infof("resolving")
pusher, err := resolver.Pusher(ctx, ref)
if err != nil {
return err
}
cs := client.ContentStore()
info, err := cs.Info(ctx, dgst)
if err != nil {
return err
}
desc := ocispec.Descriptor{
MediaType: media,
Digest: dgst,
Size: info.Size,
}
ra, err := cs.ReaderAt(ctx, dgst)
if err != nil {
return err
}
defer ra.Close()
cw, err := pusher.Push(ctx, desc)
if err != nil {
return err
}
// TODO: Progress reader
if err := content.Copy(ctx, cw, content.NewReader(ra), desc.Size, desc.Digest); err != nil {
return err
}
fmt.Printf("Pushed %s %s\n", desc.Digest, desc.MediaType)
return nil
},
}