Move push handler from dist to remotes

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2017-05-26 14:33:01 -07:00
parent 36f9605479
commit c7317b2d00
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
3 changed files with 48 additions and 23 deletions

40
cmd/dist/push.go vendored
View File

@ -8,7 +8,6 @@ import (
"text/tabwriter"
"time"
"github.com/Sirupsen/logrus"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/progress"
@ -99,26 +98,7 @@ var pushCommand = cli.Command{
return nil, nil
})
pushHandler := images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
log.G(ctx).Debug("push")
r, err := cs.Reader(ctx, desc.Digest)
if err != nil {
return nil, err
}
defer r.Close()
tracker := ongoing.track(remotes.MakeRefKey(ctx, desc), desc.Size)
defer tracker.Close()
tr := io.TeeReader(r, tracker)
return nil, pusher.Push(ctx, desc, tr)
})
pushHandler := remotes.PushHandler(cs, ongoing.wrapPusher(pusher))
var m sync.Mutex
manifestStack := []ocispec.Descriptor{}
@ -213,6 +193,17 @@ func (pt *pushTracker) Close() error {
return nil
}
type pushWrapper struct {
jobs *pushjobs
pusher remotes.Pusher
}
func (pw pushWrapper) Push(ctx context.Context, desc ocispec.Descriptor, r io.Reader) error {
tr := pw.jobs.track(remotes.MakeRefKey(ctx, desc), desc.Size)
defer tr.Close()
return pw.pusher.Push(ctx, desc, io.TeeReader(r, tr))
}
type pushStatus struct {
name string
started bool
@ -230,6 +221,13 @@ func newPushJobs() *pushjobs {
return &pushjobs{jobs: make(map[string]*pushTracker)}
}
func (j *pushjobs) wrapPusher(p remotes.Pusher) remotes.Pusher {
return pushWrapper{
jobs: j,
pusher: p,
}
}
func (j *pushjobs) add(ref string) {
j.mu.Lock()
defer j.mu.Unlock()

View File

@ -11,9 +11,17 @@ import (
"golang.org/x/sync/errgroup"
)
var SkipDesc = fmt.Errorf("skip descriptor")
var (
// SkipDesc is used to skip processing of a descriptor and
// its descendants.
SkipDesc = fmt.Errorf("skip descriptor")
var StopHandler = fmt.Errorf("stop handler")
// StopHandler is used to signify that the descriptor
// has been handled and should not be handled further.
// This applies only to a single descriptor in a handler
// chain and does not apply to descendant descriptors.
StopHandler = fmt.Errorf("stop handler")
)
type Handler interface {
Handle(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error)

View File

@ -64,3 +64,22 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
return content.WriteBlob(ctx, ingester, ref, rc, desc.Size, desc.Digest)
}
func PushHandler(provider content.Provider, pusher Pusher) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
log.G(ctx).Debug("push")
r, err := provider.Reader(ctx, desc.Digest)
if err != nil {
return nil, err
}
defer r.Close()
return nil, pusher.Push(ctx, desc, r)
}
}