Add push progress

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-09-12 13:15:14 -07:00
parent e88baa0873
commit 737257bb48
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
3 changed files with 311 additions and 63 deletions

View File

@ -31,10 +31,10 @@ import (
)
type ProgressTracker struct {
root string
cs content.Store
added chan jobUpdate
waitC chan struct{}
root string
transferState string
added chan jobUpdate
waitC chan struct{}
parents map[digest.Digest][]ocispec.Descriptor
parentL sync.Mutex
@ -62,22 +62,76 @@ type jobUpdate struct {
//children []ocispec.Descriptor
}
type ActiveJobs interface {
Status(string) (content.Status, bool)
}
type StatusTracker interface {
Active(context.Context, ...string) (ActiveJobs, error)
Check(context.Context, digest.Digest) (bool, error)
}
// NewProgressTracker tracks content download progress
func NewProgressTracker(root string, cs content.Store) *ProgressTracker {
func NewProgressTracker(root, transferState string) *ProgressTracker {
return &ProgressTracker{
root: root,
cs: cs,
added: make(chan jobUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
root: root,
transferState: transferState,
added: make(chan jobUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
}
}
func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc) {
func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) {
defer close(j.waitC)
// Instead of ticker, just delay
jobs := map[digest.Digest]*jobStatus{}
tc := time.NewTicker(time.Millisecond * 200)
tc := time.NewTicker(time.Millisecond * 300)
update := func() {
// TODO: Filter by references
active, err := pt.Active(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
}
for dgst, job := range jobs {
if job.state != jobComplete {
status, ok := active.Status(job.name)
if ok {
if status.Offset > job.progress {
pf(transfer.Progress{
Event: j.transferState,
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: status.Offset,
Total: status.Total,
})
job.progress = status.Offset
job.state = jobInProgress
jobs[dgst] = job
}
} else {
ok, err := pt.Check(ctx, job.desc.Digest)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
} else if ok {
pf(transfer.Progress{
Event: "complete",
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: job.desc.Size,
Total: job.desc.Size,
})
}
job.state = jobComplete
jobs[dgst] = job
}
}
}
}
for {
select {
case update := <-j.added:
@ -126,52 +180,10 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre
}
case <-tc.C:
// TODO: Filter by references
active, err := j.cs.ListStatuses(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to list statuses for progress")
}
sort.Slice(active, func(i, j int) bool {
return active[i].Ref < active[j].Ref
})
for dgst, job := range jobs {
if job.state != jobComplete {
idx := sort.Search(len(active), func(i int) bool { return active[i].Ref >= job.name })
if idx < len(active) && active[idx].Ref == job.name {
if active[idx].Offset > job.progress {
pf(transfer.Progress{
Event: "downloading",
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: active[idx].Offset,
Total: active[idx].Total,
})
job.progress = active[idx].Offset
job.state = jobInProgress
jobs[dgst] = job
}
} else {
_, err := j.cs.Info(ctx, job.desc.Digest)
if err == nil {
pf(transfer.Progress{
Event: "complete",
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: job.desc.Size,
Total: job.desc.Size,
})
}
job.state = jobComplete
jobs[dgst] = job
}
}
}
update()
// Next timer?
case <-ctx.Done():
update()
return
}
}
@ -219,3 +231,48 @@ func (j *ProgressTracker) Wait() {
case <-j.waitC:
}
}
type contentActive struct {
active []content.Status
}
func (c *contentActive) Status(ref string) (content.Status, bool) {
idx := sort.Search(len(c.active), func(i int) bool { return c.active[i].Ref >= ref })
if idx < len(c.active) && c.active[idx].Ref == ref {
return c.active[idx], true
}
return content.Status{}, false
}
type contentStatusTracker struct {
cs content.Store
}
func NewContentStatusTracker(cs content.Store) StatusTracker {
return &contentStatusTracker{
cs: cs,
}
}
func (c *contentStatusTracker) Active(ctx context.Context, _ ...string) (ActiveJobs, error) {
active, err := c.cs.ListStatuses(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to list statuses for progress")
}
sort.Slice(active, func(i, j int) bool {
return active[i].Ref < active[j].Ref
})
return &contentActive{
active: active,
}, nil
}
func (c *contentStatusTracker) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
_, err := c.cs.Info(ctx, dgst)
if err == nil {
return true, nil
}
return false, nil
}

View File

@ -83,13 +83,12 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
progressTracker *ProgressTracker
)
ctx, cancel := context.WithCancel(ctx)
if tops.Progress != nil {
progressTracker = NewProgressTracker(name, store) //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress)
progressTracker = NewProgressTracker(name, "downloading") //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress, NewContentStatusTracker(store))
defer progressTracker.Wait()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Get all the children for a descriptor
@ -103,6 +102,7 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch
//if limit > 0 {
// childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
//}
//SetChildrenMappedLabels(manager content.Manager, f HandlerFunc, labelMap func(ocispec.Descriptor) []string) HandlerFunc {
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {

View File

@ -18,11 +18,18 @@ package local
import (
"context"
"fmt"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGetter, p transfer.ImagePusher, tops *transfer.Config) error {
@ -44,7 +51,6 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette
}
}
*/
matcher := platforms.All
// Filter push
@ -53,13 +59,38 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette
return err
}
pusher, err := p.Pusher(ctx, img.Target)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pushing to %s", p),
})
tops.Progress(transfer.Progress{
Event: "pushing content",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
}
var pusher remotes.Pusher
pusher, err = p.Pusher(ctx, img.Target)
if err != nil {
return err
}
var wrapper func(images.Handler) images.Handler
ctx, cancel := context.WithCancel(ctx)
if tops.Progress != nil {
progressTracker := NewProgressTracker(img.Name, "uploading") //Pass in first name as root
p := newProgressPusher(pusher, progressTracker)
go progressTracker.HandleProgress(ctx, tops.Progress, p)
defer progressTracker.Wait()
wrapper = p.WrapHandler
pusher = p
}
defer cancel()
// TODO: Add handler to track parents
/*
// TODO: Add handlers
if len(pushCtx.BaseHandlers) > 0 {
@ -75,5 +106,165 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette
}
*/
return remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper)
if err := remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper); err != nil {
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "pushed content",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed push to %s", p),
})
}
return nil
}
type progressPusher struct {
remotes.Pusher
progress *ProgressTracker
status *pushStatus
}
type pushStatus struct {
l sync.Mutex
statuses map[string]content.Status
complete map[digest.Digest]struct{}
}
func newProgressPusher(pusher remotes.Pusher, progress *ProgressTracker) *progressPusher {
return &progressPusher{
Pusher: pusher,
progress: progress,
status: &pushStatus{
statuses: map[string]content.Status{},
complete: map[digest.Digest]struct{}{},
},
}
}
func (p *progressPusher) WrapHandler(h images.Handler) images.Handler {
return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
p.progress.Add(desc)
subdescs, err = h.Handle(ctx, desc)
p.progress.AddChildren(desc, subdescs)
return
})
}
func (p *progressPusher) Push(ctx context.Context, d ocispec.Descriptor) (content.Writer, error) {
ref := remotes.MakeRefKey(ctx, d)
p.status.add(ref, d)
cw, err := p.Pusher.Push(ctx, d)
if err != nil {
if errdefs.IsAlreadyExists(err) {
p.progress.MarkExists(d)
p.status.markComplete(ref, d)
}
return nil, err
}
return &progressWriter{
Writer: cw,
ref: ref,
desc: d,
status: p.status,
progress: p.progress,
}, nil
}
func (ps *pushStatus) update(ref string, delta int) {
ps.l.Lock()
status, ok := ps.statuses[ref]
if ok {
if delta > 0 {
status.Offset += int64(delta)
} else if delta < 0 {
status.Offset = 0
}
ps.statuses[ref] = status
}
ps.l.Unlock()
}
func (ps *pushStatus) add(ref string, d ocispec.Descriptor) {
status := content.Status{
Ref: ref,
Offset: 0,
Total: d.Size,
StartedAt: time.Now(),
}
ps.l.Lock()
_, ok := ps.statuses[ref]
_, complete := ps.complete[d.Digest]
if !ok && !complete {
ps.statuses[ref] = status
}
ps.l.Unlock()
}
func (ps *pushStatus) markComplete(ref string, d ocispec.Descriptor) {
ps.l.Lock()
_, ok := ps.statuses[ref]
if ok {
delete(ps.statuses, ref)
}
ps.complete[d.Digest] = struct{}{}
ps.l.Unlock()
}
func (ps *pushStatus) Status(name string) (content.Status, bool) {
ps.l.Lock()
status, ok := ps.statuses[name]
ps.l.Unlock()
return status, ok
}
func (ps *pushStatus) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
ps.l.Lock()
_, ok := ps.complete[dgst]
ps.l.Unlock()
return ok, nil
}
func (p *progressPusher) Active(ctx context.Context, _ ...string) (ActiveJobs, error) {
return p.status, nil
}
func (p *progressPusher) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
return p.status.Check(ctx, dgst)
}
type progressWriter struct {
content.Writer
ref string
desc ocispec.Descriptor
status *pushStatus
progress *ProgressTracker
}
func (pw *progressWriter) Write(p []byte) (n int, err error) {
n, err = pw.Writer.Write(p)
if err != nil {
// TODO: Handle reset error to reset progress
return
}
pw.status.update(pw.ref, n)
return
}
func (pw *progressWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := pw.Writer.Commit(ctx, size, expected, opts...)
if err != nil {
if errdefs.IsAlreadyExists(err) {
pw.progress.MarkExists(pw.desc)
}
// TODO: Handle reset error to reset progress
}
pw.status.markComplete(pw.ref, pw.desc)
return err
}