diff --git a/cmd/dist/fetch.go b/cmd/dist/fetch.go index f2bef80e1..c2de902c9 100644 --- a/cmd/dist/fetch.go +++ b/cmd/dist/fetch.go @@ -231,14 +231,14 @@ func display(w io.Writer, statuses []statusInfo, start time.Time) { for _, status := range statuses { total += status.Offset switch status.Status { - case "downloading": + case "downloading", "uploading": bar := progress.Bar(float64(status.Offset) / float64(status.Total)) fmt.Fprintf(w, "%s:\t%s\t%40r\t%8.8s/%s\t\n", status.Ref, status.Status, bar, progress.Bytes(status.Offset), progress.Bytes(status.Total)) - case "resolving": + case "resolving", "waiting": bar := progress.Bar(0.0) fmt.Fprintf(w, "%s:\t%s\t%40r\t\n", status.Ref, diff --git a/cmd/dist/main.go b/cmd/dist/main.go index 0faa7801f..adae9ebdf 100644 --- a/cmd/dist/main.go +++ b/cmd/dist/main.go @@ -77,6 +77,7 @@ distribution tool fetchObjectCommand, applyCommand, rootfsCommand, + pushCommand, pushObjectCommand, } app.Before = func(context *cli.Context) error { diff --git a/cmd/dist/push.go b/cmd/dist/push.go new file mode 100644 index 000000000..5133dabed --- /dev/null +++ b/cmd/dist/push.go @@ -0,0 +1,289 @@ +package main + +import ( + "context" + "io" + "os" + "sync" + "text/tabwriter" + "time" + + "github.com/Sirupsen/logrus" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/progress" + "github.com/containerd/containerd/remotes" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/urfave/cli" + "golang.org/x/sync/errgroup" +) + +var pushCommand = cli.Command{ + Name: "push", + Usage: "push an image to a remote", + ArgsUsage: "[flags] []", + Description: `Pushes an image reference from containerd. + + All resources associated with the manifest reference will be pushed. + The ref is used to resolve to a locally existing image manifest. + The image manifest must exist before push. Creating a new image + manifest can be done through calculating the diff for layers, + creating the associated configuration, and creating the manifest + which references those resources. +`, + Flags: append(registryFlags, cli.StringFlag{ + Name: "manifest", + Usage: "Digest of manifest", + }, cli.StringFlag{ + Name: "manifest-type", + Usage: "Media type of manifest digest", + Value: ocispec.MediaTypeImageManifest, + }), + Action: func(clicontext *cli.Context) error { + var ( + ref = clicontext.Args().First() + local = clicontext.Args().Get(1) + desc ocispec.Descriptor + ) + + ctx, cancel := appContext() + defer cancel() + + cs, err := resolveContentStore(clicontext) + if err != nil { + return err + } + + imageStore, err := resolveImageStore(clicontext) + if err != nil { + return err + } + + if manifest := clicontext.String("manifest"); manifest != "" { + desc.Digest, err = digest.Parse(manifest) + if err != nil { + return errors.Wrap(err, "invalid manifest digest") + } + desc.MediaType = clicontext.String("manifest-type") + } else { + if local == "" { + local = ref + } + img, err := imageStore.Get(ctx, local) + if err != nil { + return errors.Wrap(err, "unable to resolve image to manifest") + } + desc = img.Target + } + + resolver, err := getResolver(ctx, clicontext) + if err != nil { + return err + } + ongoing := newPushJobs() + + eg, ctx := errgroup.WithContext(ctx) + + eg.Go(func() error { + pusher, err := resolver.Pusher(ctx, ref) + if err != nil { + return err + } + + log.G(ctx).WithField("image", ref).WithField("digest", desc.Digest).Debug("pushing") + + jobHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ongoing.add(remotes.MakeRefKey(ctx, desc)) + return nil, nil + }) + + pushHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + log.G(ctx).Debug("push") + r, err := cs.Reader(ctx, desc.Digest) + if err != nil { + return nil, err + } + defer r.Close() + + tracker := ongoing.track(remotes.MakeRefKey(ctx, desc), desc.Size) + defer tracker.Close() + tr := io.TeeReader(r, tracker) + + return nil, pusher.Push(ctx, desc, tr) + }) + + var m sync.Mutex + manifestStack := []ocispec.Descriptor{} + + filterHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + switch desc.MediaType { + case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest, + images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex: + m.Lock() + manifestStack = append(manifestStack, desc) + m.Unlock() + return nil, images.StopHandler + default: + return nil, nil + } + }) + + handler := images.Handlers(jobHandler, images.ChildrenHandler(cs), filterHandler, pushHandler) + + if err := images.Dispatch(ctx, handler, desc); err != nil { + return err + } + + // Iterate in reverse order as seen, parent always uploaded after child + for i := len(manifestStack) - 1; i >= 0; i-- { + _, err := pushHandler(ctx, manifestStack[i]) + if err != nil { + return err + } + } + + return nil + }) + + errs := make(chan error) + go func() { + defer close(errs) + errs <- eg.Wait() + }() + + var ( + ticker = time.NewTicker(100 * time.Millisecond) + fw = progress.NewWriter(os.Stdout) + start = time.Now() + done bool + ) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + fw.Flush() + + tw := tabwriter.NewWriter(fw, 1, 8, 1, ' ', 0) + + display(tw, ongoing.status(), start) + tw.Flush() + + if done { + fw.Flush() + return nil + } + case err := <-errs: + if err != nil { + return err + } + done = true + case <-ctx.Done(): + done = true // allow ui to update once more + } + } + }, +} + +type pushTracker struct { + closed bool + started time.Time + updated time.Time + written int64 + total int64 +} + +func (pt *pushTracker) Write(p []byte) (int, error) { + pt.written += int64(len(p)) + pt.updated = time.Now() + return len(p), nil +} + +func (pt *pushTracker) Close() error { + pt.closed = true + pt.updated = time.Now() + return nil +} + +type pushStatus struct { + name string + started bool + written int64 + total int64 +} + +type pushjobs struct { + jobs map[string]*pushTracker + ordered []string + mu sync.Mutex +} + +func newPushJobs() *pushjobs { + return &pushjobs{jobs: make(map[string]*pushTracker)} +} + +func (j *pushjobs) add(ref string) { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.jobs[ref]; ok { + return + } + j.ordered = append(j.ordered, ref) + j.jobs[ref] = nil +} + +func (j *pushjobs) track(ref string, size int64) io.WriteCloser { + j.mu.Lock() + defer j.mu.Unlock() + + if _, ok := j.jobs[ref]; !ok { + j.ordered = append(j.ordered, ref) + } + + pt := &pushTracker{ + started: time.Now(), + total: size, + } + j.jobs[ref] = pt + return pt +} + +func (j *pushjobs) status() []statusInfo { + j.mu.Lock() + defer j.mu.Unlock() + + status := make([]statusInfo, 0, len(j.jobs)) + for _, name := range j.ordered { + tracker := j.jobs[name] + si := statusInfo{ + Ref: name, + } + if tracker != nil { + si.Offset = tracker.written + si.Total = tracker.total + si.StartedAt = tracker.started + si.UpdatedAt = tracker.updated + if tracker.closed { + si.Status = "done" + } else if tracker.written >= tracker.total { + si.Status = "committing" + } else { + si.Status = "uploading" + } + } else { + si.Status = "waiting" + } + status = append(status, si) + } + + return status +} diff --git a/images/handlers.go b/images/handlers.go index bb7bfa70d..e647520ed 100644 --- a/images/handlers.go +++ b/images/handlers.go @@ -13,6 +13,8 @@ import ( var SkipDesc = fmt.Errorf("skip descriptor") +var StopHandler = fmt.Errorf("stop handler") + type Handler interface { Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) } @@ -24,12 +26,17 @@ func (fn HandlerFunc) Handle(ctx context.Context, desc ocispec.Descriptor) (subd } // Handlers returns a handler that will run the handlers in sequence. +// +// A handler may return `StopHandler` to stop calling additional handlers func Handlers(handlers ...Handler) HandlerFunc { return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { var children []ocispec.Descriptor for _, handler := range handlers { ch, err := handler.Handle(ctx, desc) if err != nil { + if errors.Cause(err) == StopHandler { + break + } return nil, err }