diff --git a/cmd/dist/push.go b/cmd/dist/push.go index 5133dabed..71caa2d2a 100644 --- a/cmd/dist/push.go +++ b/cmd/dist/push.go @@ -8,7 +8,6 @@ import ( "text/tabwriter" "time" - "github.com/Sirupsen/logrus" "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" @@ -99,26 +98,7 @@ var pushCommand = cli.Command{ return nil, nil }) - pushHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { - ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ - "digest": desc.Digest, - "mediatype": desc.MediaType, - "size": desc.Size, - })) - - log.G(ctx).Debug("push") - r, err := cs.Reader(ctx, desc.Digest) - if err != nil { - return nil, err - } - defer r.Close() - - tracker := ongoing.track(remotes.MakeRefKey(ctx, desc), desc.Size) - defer tracker.Close() - tr := io.TeeReader(r, tracker) - - return nil, pusher.Push(ctx, desc, tr) - }) + pushHandler := remotes.PushHandler(cs, ongoing.wrapPusher(pusher)) var m sync.Mutex manifestStack := []ocispec.Descriptor{} @@ -213,6 +193,17 @@ func (pt *pushTracker) Close() error { return nil } +type pushWrapper struct { + jobs *pushjobs + pusher remotes.Pusher +} + +func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error { + tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size) + defer tr.Close() + return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr)) +} + type pushStatus struct { name string started bool @@ -230,6 +221,13 @@ func newPushJobs() *pushjobs { return &pushjobs{jobs: make(map[string]*pushTracker)} } +func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher { + return pushWrapper{ + jobs: j, + pusher: p, + } +} + func (j *pushjobs) add(ref string) { j.mu.Lock() defer j.mu.Unlock() diff --git a/images/handlers.go b/images/handlers.go index e647520ed..bea61f31a 100644 --- a/images/handlers.go +++ b/images/handlers.go @@ -11,9 +11,17 @@ import ( "golang.org/x/sync/errgroup" ) -var SkipDesc = fmt.Errorf("skip descriptor") +var ( + // SkipDesc is used to skip processing of a descriptor and + // its descendants. + SkipDesc = fmt.Errorf("skip descriptor") -var StopHandler = fmt.Errorf("stop handler") + // StopHandler is used to signify that the descriptor + // has been handled and should not be handled further. + // This applies only to a single descriptor in a handler + // chain and does not apply to descendant descriptors. + StopHandler = fmt.Errorf("stop handler") +) type Handler interface { Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) diff --git a/remotes/handlers.go b/remotes/handlers.go index ec9551172..0e4502720 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -64,3 +64,22 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest) } + +func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + log.G(ctx).Debug("push") + r, err := provider.Reader(ctx, desc.Digest) + if err != nil { + return nil, err + } + defer r.Close() + + return nil, pusher.Push(ctx, desc, r) + } +}