Merge pull request #911 from dmcgowan/pusher-dispatcher
Add image push to dist tool
This commit is contained in:
commit
36f9605479
4
cmd/dist/fetch.go
vendored
4
cmd/dist/fetch.go
vendored
@ -231,14 +231,14 @@ func display(w io.Writer, statuses []statusInfo, start time.Time) {
|
||||
for _, status := range statuses {
|
||||
total += status.Offset
|
||||
switch status.Status {
|
||||
case "downloading":
|
||||
case "downloading", "uploading":
|
||||
bar := progress.Bar(float64(status.Offset) / float64(status.Total))
|
||||
fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n",
|
||||
status.Ref,
|
||||
status.Status,
|
||||
bar,
|
||||
progress.Bytes(status.Offset), progress.Bytes(status.Total))
|
||||
case "resolving":
|
||||
case "resolving", "waiting":
|
||||
bar := progress.Bar(0.0)
|
||||
fmt.Fprintf(w, "%s:\t%s\t%40r\t\n",
|
||||
status.Ref,
|
||||
|
1
cmd/dist/main.go
vendored
1
cmd/dist/main.go
vendored
@ -77,6 +77,7 @@ distribution tool
|
||||
fetchObjectCommand,
|
||||
applyCommand,
|
||||
rootfsCommand,
|
||||
pushCommand,
|
||||
pushObjectCommand,
|
||||
}
|
||||
app.Before = func(context *cli.Context) error {
|
||||
|
289
cmd/dist/push.go
vendored
Normal file
289
cmd/dist/push.go
vendored
Normal file
@ -0,0 +1,289 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"text/tabwriter"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/log"
|
||||
"github.com/containerd/containerd/progress"
|
||||
"github.com/containerd/containerd/remotes"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/urfave/cli"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var pushCommand = cli.Command{
|
||||
Name: "push",
|
||||
Usage: "push an image to a remote",
|
||||
ArgsUsage: "[flags] <remote> [<local>]",
|
||||
Description: `Pushes an image reference from containerd.
|
||||
|
||||
All resources associated with the manifest reference will be pushed.
|
||||
The ref is used to resolve to a locally existing image manifest.
|
||||
The image manifest must exist before push. Creating a new image
|
||||
manifest can be done through calculating the diff for layers,
|
||||
creating the associated configuration, and creating the manifest
|
||||
which references those resources.
|
||||
`,
|
||||
Flags: append(registryFlags, cli.StringFlag{
|
||||
Name: "manifest",
|
||||
Usage: "Digest of manifest",
|
||||
}, cli.StringFlag{
|
||||
Name: "manifest-type",
|
||||
Usage: "Media type of manifest digest",
|
||||
Value: ocispec.MediaTypeImageManifest,
|
||||
}),
|
||||
Action: func(clicontext *cli.Context) error {
|
||||
var (
|
||||
ref = clicontext.Args().First()
|
||||
local = clicontext.Args().Get(1)
|
||||
desc ocispec.Descriptor
|
||||
)
|
||||
|
||||
ctx, cancel := appContext()
|
||||
defer cancel()
|
||||
|
||||
cs, err := resolveContentStore(clicontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
imageStore, err := resolveImageStore(clicontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if manifest := clicontext.String("manifest"); manifest != "" {
|
||||
desc.Digest, err = digest.Parse(manifest)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "invalid manifest digest")
|
||||
}
|
||||
desc.MediaType = clicontext.String("manifest-type")
|
||||
} else {
|
||||
if local == "" {
|
||||
local = ref
|
||||
}
|
||||
img, err := imageStore.Get(ctx, local)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to resolve image to manifest")
|
||||
}
|
||||
desc = img.Target
|
||||
}
|
||||
|
||||
resolver, err := getResolver(ctx, clicontext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ongoing := newPushJobs()
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
eg.Go(func() error {
|
||||
pusher, err := resolver.Pusher(ctx, ref)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.G(ctx).WithField("image", ref).WithField("digest", desc.Digest).Debug("pushing")
|
||||
|
||||
jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
ongoing.add(remotes.MakeRefKey(ctx, desc))
|
||||
return nil, nil
|
||||
})
|
||||
|
||||
pushHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
|
||||
"digest": desc.Digest,
|
||||
"mediatype": desc.MediaType,
|
||||
"size": desc.Size,
|
||||
}))
|
||||
|
||||
log.G(ctx).Debug("push")
|
||||
r, err := cs.Reader(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
tracker := ongoing.track(remotes.MakeRefKey(ctx, desc), desc.Size)
|
||||
defer tracker.Close()
|
||||
tr := io.TeeReader(r, tracker)
|
||||
|
||||
return nil, pusher.Push(ctx, desc, tr)
|
||||
})
|
||||
|
||||
var m sync.Mutex
|
||||
manifestStack := []ocispec.Descriptor{}
|
||||
|
||||
filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
|
||||
switch desc.MediaType {
|
||||
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest,
|
||||
images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
|
||||
m.Lock()
|
||||
manifestStack = append(manifestStack, desc)
|
||||
m.Unlock()
|
||||
return nil, images.StopHandler
|
||||
default:
|
||||
return nil, nil
|
||||
}
|
||||
})
|
||||
|
||||
handler := images.Handlers(jobHandler, images.ChildrenHandler(cs), filterHandler, pushHandler)
|
||||
|
||||
if err := images.Dispatch(ctx, handler, desc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Iterate in reverse order as seen, parent always uploaded after child
|
||||
for i := len(manifestStack) - 1; i >= 0; i-- {
|
||||
_, err := pushHandler(ctx, manifestStack[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
errs := make(chan error)
|
||||
go func() {
|
||||
defer close(errs)
|
||||
errs <- eg.Wait()
|
||||
}()
|
||||
|
||||
var (
|
||||
ticker = time.NewTicker(100 * time.Millisecond)
|
||||
fw = progress.NewWriter(os.Stdout)
|
||||
start = time.Now()
|
||||
done bool
|
||||
)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
fw.Flush()
|
||||
|
||||
tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0)
|
||||
|
||||
display(tw, ongoing.status(), start)
|
||||
tw.Flush()
|
||||
|
||||
if done {
|
||||
fw.Flush()
|
||||
return nil
|
||||
}
|
||||
case err := <-errs:
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done = true
|
||||
case <-ctx.Done():
|
||||
done = true // allow ui to update once more
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
type pushTracker struct {
|
||||
closed bool
|
||||
started time.Time
|
||||
updated time.Time
|
||||
written int64
|
||||
total int64
|
||||
}
|
||||
|
||||
func (pt *pushTracker) Write(p []byte) (int, error) {
|
||||
pt.written += int64(len(p))
|
||||
pt.updated = time.Now()
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (pt *pushTracker) Close() error {
|
||||
pt.closed = true
|
||||
pt.updated = time.Now()
|
||||
return nil
|
||||
}
|
||||
|
||||
type pushStatus struct {
|
||||
name string
|
||||
started bool
|
||||
written int64
|
||||
total int64
|
||||
}
|
||||
|
||||
type pushjobs struct {
|
||||
jobs map[string]*pushTracker
|
||||
ordered []string
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func newPushJobs() *pushjobs {
|
||||
return &pushjobs{jobs: make(map[string]*pushTracker)}
|
||||
}
|
||||
|
||||
func (j *pushjobs) add(ref string) {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
if _, ok := j.jobs[ref]; ok {
|
||||
return
|
||||
}
|
||||
j.ordered = append(j.ordered, ref)
|
||||
j.jobs[ref] = nil
|
||||
}
|
||||
|
||||
func (j *pushjobs) track(ref string, size int64) io.WriteCloser {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
if _, ok := j.jobs[ref]; !ok {
|
||||
j.ordered = append(j.ordered, ref)
|
||||
}
|
||||
|
||||
pt := &pushTracker{
|
||||
started: time.Now(),
|
||||
total: size,
|
||||
}
|
||||
j.jobs[ref] = pt
|
||||
return pt
|
||||
}
|
||||
|
||||
func (j *pushjobs) status() []statusInfo {
|
||||
j.mu.Lock()
|
||||
defer j.mu.Unlock()
|
||||
|
||||
status := make([]statusInfo, 0, len(j.jobs))
|
||||
for _, name := range j.ordered {
|
||||
tracker := j.jobs[name]
|
||||
si := statusInfo{
|
||||
Ref: name,
|
||||
}
|
||||
if tracker != nil {
|
||||
si.Offset = tracker.written
|
||||
si.Total = tracker.total
|
||||
si.StartedAt = tracker.started
|
||||
si.UpdatedAt = tracker.updated
|
||||
if tracker.closed {
|
||||
si.Status = "done"
|
||||
} else if tracker.written >= tracker.total {
|
||||
si.Status = "committing"
|
||||
} else {
|
||||
si.Status = "uploading"
|
||||
}
|
||||
} else {
|
||||
si.Status = "waiting"
|
||||
}
|
||||
status = append(status, si)
|
||||
}
|
||||
|
||||
return status
|
||||
}
|
@ -13,6 +13,8 @@ import (
|
||||
|
||||
var SkipDesc = fmt.Errorf("skip descriptor")
|
||||
|
||||
var StopHandler = fmt.Errorf("stop handler")
|
||||
|
||||
type Handler interface {
|
||||
Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)
|
||||
}
|
||||
@ -24,12 +26,17 @@ func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subd
|
||||
}
|
||||
|
||||
// Handlers returns a handler that will run the handlers in sequence.
|
||||
//
|
||||
// A handler may return `StopHandler` to stop calling additional handlers
|
||||
func Handlers(handlers ...Handler) HandlerFunc {
|
||||
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
|
||||
var children []ocispec.Descriptor
|
||||
for _, handler := range handlers {
|
||||
ch, err := handler.Handle(ctx, desc)
|
||||
if err != nil {
|
||||
if errors.Cause(err) == StopHandler {
|
||||
break
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user