diff --git a/pkg/transfer/local/progress.go b/pkg/transfer/local/progress.go index e6ad64089..a60475747 100644 --- a/pkg/transfer/local/progress.go +++ b/pkg/transfer/local/progress.go @@ -31,10 +31,10 @@ import ( ) type ProgressTracker struct { - root string - cs content.Store - added chan jobUpdate - waitC chan struct{} + root string + transferState string + added chan jobUpdate + waitC chan struct{} parents map[digest.Digest][]ocispec.Descriptor parentL sync.Mutex @@ -62,22 +62,76 @@ type jobUpdate struct { //children []ocispec.Descriptor } +type ActiveJobs interface { + Status(string) (content.Status, bool) +} + +type StatusTracker interface { + Active(context.Context, ...string) (ActiveJobs, error) + Check(context.Context, digest.Digest) (bool, error) +} + // NewProgressTracker tracks content download progress -func NewProgressTracker(root string, cs content.Store) *ProgressTracker { +func NewProgressTracker(root, transferState string) *ProgressTracker { return &ProgressTracker{ - root: root, - cs: cs, - added: make(chan jobUpdate, 1), - waitC: make(chan struct{}), - parents: map[digest.Digest][]ocispec.Descriptor{}, + root: root, + transferState: transferState, + added: make(chan jobUpdate, 1), + waitC: make(chan struct{}), + parents: map[digest.Digest][]ocispec.Descriptor{}, } } -func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc) { +func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) { defer close(j.waitC) // Instead of ticker, just delay jobs := map[digest.Digest]*jobStatus{} - tc := time.NewTicker(time.Millisecond * 200) + tc := time.NewTicker(time.Millisecond * 300) + + update := func() { + // TODO: Filter by references + active, err := pt.Active(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("failed to get statuses for progress") + } + for dgst, job := range jobs { + if job.state != jobComplete { + status, ok := active.Status(job.name) + if ok { + if status.Offset > job.progress { + pf(transfer.Progress{ + Event: j.transferState, + Name: job.name, + Parents: job.parents, + //Digest: job.desc.Digest.String(), + Progress: status.Offset, + Total: status.Total, + }) + job.progress = status.Offset + job.state = jobInProgress + jobs[dgst] = job + } + } else { + ok, err := pt.Check(ctx, job.desc.Digest) + if err != nil { + log.G(ctx).WithError(err).Error("failed to get statuses for progress") + } else if ok { + pf(transfer.Progress{ + Event: "complete", + Name: job.name, + Parents: job.parents, + //Digest: job.desc.Digest.String(), + Progress: job.desc.Size, + Total: job.desc.Size, + }) + + } + job.state = jobComplete + jobs[dgst] = job + } + } + } + } for { select { case update := <-j.added: @@ -126,52 +180,10 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre } case <-tc.C: - // TODO: Filter by references - active, err := j.cs.ListStatuses(ctx) - if err != nil { - log.G(ctx).WithError(err).Error("failed to list statuses for progress") - } - sort.Slice(active, func(i, j int) bool { - return active[i].Ref < active[j].Ref - }) - - for dgst, job := range jobs { - if job.state != jobComplete { - idx := sort.Search(len(active), func(i int) bool { return active[i].Ref >= job.name }) - if idx < len(active) && active[idx].Ref == job.name { - if active[idx].Offset > job.progress { - pf(transfer.Progress{ - Event: "downloading", - Name: job.name, - Parents: job.parents, - //Digest: job.desc.Digest.String(), - Progress: active[idx].Offset, - Total: active[idx].Total, - }) - job.progress = active[idx].Offset - job.state = jobInProgress - jobs[dgst] = job - } - } else { - _, err := j.cs.Info(ctx, job.desc.Digest) - if err == nil { - pf(transfer.Progress{ - Event: "complete", - Name: job.name, - Parents: job.parents, - //Digest: job.desc.Digest.String(), - Progress: job.desc.Size, - Total: job.desc.Size, - }) - - } - job.state = jobComplete - jobs[dgst] = job - } - } - } + update() // Next timer? case <-ctx.Done(): + update() return } } @@ -219,3 +231,48 @@ func (j *ProgressTracker) Wait() { case <-j.waitC: } } + +type contentActive struct { + active []content.Status +} + +func (c *contentActive) Status(ref string) (content.Status, bool) { + idx := sort.Search(len(c.active), func(i int) bool { return c.active[i].Ref >= ref }) + if idx < len(c.active) && c.active[idx].Ref == ref { + return c.active[idx], true + } + + return content.Status{}, false +} + +type contentStatusTracker struct { + cs content.Store +} + +func NewContentStatusTracker(cs content.Store) StatusTracker { + return &contentStatusTracker{ + cs: cs, + } +} + +func (c *contentStatusTracker) Active(ctx context.Context, _ ...string) (ActiveJobs, error) { + active, err := c.cs.ListStatuses(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("failed to list statuses for progress") + } + sort.Slice(active, func(i, j int) bool { + return active[i].Ref < active[j].Ref + }) + + return &contentActive{ + active: active, + }, nil +} + +func (c *contentStatusTracker) Check(ctx context.Context, dgst digest.Digest) (bool, error) { + _, err := c.cs.Info(ctx, dgst) + if err == nil { + return true, nil + } + return false, nil +} diff --git a/pkg/transfer/local/pull.go b/pkg/transfer/local/pull.go index efa25b582..44cbf30a7 100644 --- a/pkg/transfer/local/pull.go +++ b/pkg/transfer/local/pull.go @@ -83,13 +83,12 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch progressTracker *ProgressTracker ) + ctx, cancel := context.WithCancel(ctx) if tops.Progress != nil { - progressTracker = NewProgressTracker(name, store) //Pass in first name as root - go progressTracker.HandleProgress(ctx, tops.Progress) + progressTracker = NewProgressTracker(name, "downloading") //Pass in first name as root + go progressTracker.HandleProgress(ctx, tops.Progress, NewContentStatusTracker(store)) defer progressTracker.Wait() } - - ctx, cancel := context.WithCancel(ctx) defer cancel() // Get all the children for a descriptor @@ -103,6 +102,7 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetch //if limit > 0 { // childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit) //} + //SetChildrenMappedLabels(manager content.Manager, f HandlerFunc, labelMap func(ocispec.Descriptor) []string) HandlerFunc { checkNeedsFix := images.HandlerFunc( func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { diff --git a/pkg/transfer/local/push.go b/pkg/transfer/local/push.go index 9f4db916f..6e4ca4e8a 100644 --- a/pkg/transfer/local/push.go +++ b/pkg/transfer/local/push.go @@ -18,11 +18,18 @@ package local import ( "context" + "fmt" + "sync" + "time" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGetter, p transfer.ImagePusher, tops *transfer.Config) error { @@ -44,7 +51,6 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette } } */ - matcher := platforms.All // Filter push @@ -53,13 +59,38 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette return err } - pusher, err := p.Pusher(ctx, img.Target) + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Pushing to %s", p), + }) + tops.Progress(transfer.Progress{ + Event: "pushing content", + Name: img.Name, + //Digest: img.Target.Digest.String(), + }) + } + + var pusher remotes.Pusher + pusher, err = p.Pusher(ctx, img.Target) if err != nil { return err } var wrapper func(images.Handler) images.Handler + ctx, cancel := context.WithCancel(ctx) + if tops.Progress != nil { + progressTracker := NewProgressTracker(img.Name, "uploading") //Pass in first name as root + + p := newProgressPusher(pusher, progressTracker) + go progressTracker.HandleProgress(ctx, tops.Progress, p) + defer progressTracker.Wait() + wrapper = p.WrapHandler + pusher = p + } + defer cancel() + + // TODO: Add handler to track parents /* // TODO: Add handlers if len(pushCtx.BaseHandlers) > 0 { @@ -75,5 +106,165 @@ func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGette } */ - return remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper) + if err := remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper); err != nil { + return err + } + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "pushed content", + Name: img.Name, + //Digest: img.Target.Digest.String(), + }) + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Completed push to %s", p), + }) + } + + return nil +} + +type progressPusher struct { + remotes.Pusher + progress *ProgressTracker + + status *pushStatus +} + +type pushStatus struct { + l sync.Mutex + statuses map[string]content.Status + complete map[digest.Digest]struct{} +} + +func newProgressPusher(pusher remotes.Pusher, progress *ProgressTracker) *progressPusher { + return &progressPusher{ + Pusher: pusher, + progress: progress, + status: &pushStatus{ + statuses: map[string]content.Status{}, + complete: map[digest.Digest]struct{}{}, + }, + } + +} + +func (p *progressPusher) WrapHandler(h images.Handler) images.Handler { + return images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + p.progress.Add(desc) + subdescs, err = h.Handle(ctx, desc) + p.progress.AddChildren(desc, subdescs) + return + }) +} + +func (p *progressPusher) Push(ctx context.Context, d ocispec.Descriptor) (content.Writer, error) { + ref := remotes.MakeRefKey(ctx, d) + p.status.add(ref, d) + cw, err := p.Pusher.Push(ctx, d) + if err != nil { + if errdefs.IsAlreadyExists(err) { + p.progress.MarkExists(d) + p.status.markComplete(ref, d) + } + return nil, err + } + + return &progressWriter{ + Writer: cw, + ref: ref, + desc: d, + status: p.status, + progress: p.progress, + }, nil +} + +func (ps *pushStatus) update(ref string, delta int) { + ps.l.Lock() + status, ok := ps.statuses[ref] + if ok { + if delta > 0 { + status.Offset += int64(delta) + } else if delta < 0 { + status.Offset = 0 + } + ps.statuses[ref] = status + } + ps.l.Unlock() +} + +func (ps *pushStatus) add(ref string, d ocispec.Descriptor) { + status := content.Status{ + Ref: ref, + Offset: 0, + Total: d.Size, + StartedAt: time.Now(), + } + ps.l.Lock() + _, ok := ps.statuses[ref] + _, complete := ps.complete[d.Digest] + if !ok && !complete { + ps.statuses[ref] = status + } + ps.l.Unlock() +} +func (ps *pushStatus) markComplete(ref string, d ocispec.Descriptor) { + ps.l.Lock() + _, ok := ps.statuses[ref] + if ok { + delete(ps.statuses, ref) + } + ps.complete[d.Digest] = struct{}{} + ps.l.Unlock() + +} + +func (ps *pushStatus) Status(name string) (content.Status, bool) { + ps.l.Lock() + status, ok := ps.statuses[name] + ps.l.Unlock() + return status, ok +} + +func (ps *pushStatus) Check(ctx context.Context, dgst digest.Digest) (bool, error) { + ps.l.Lock() + _, ok := ps.complete[dgst] + ps.l.Unlock() + return ok, nil +} + +func (p *progressPusher) Active(ctx context.Context, _ ...string) (ActiveJobs, error) { + return p.status, nil +} + +func (p *progressPusher) Check(ctx context.Context, dgst digest.Digest) (bool, error) { + return p.status.Check(ctx, dgst) +} + +type progressWriter struct { + content.Writer + ref string + desc ocispec.Descriptor + status *pushStatus + progress *ProgressTracker +} + +func (pw *progressWriter) Write(p []byte) (n int, err error) { + n, err = pw.Writer.Write(p) + if err != nil { + // TODO: Handle reset error to reset progress + return + } + pw.status.update(pw.ref, n) + return +} +func (pw *progressWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + err := pw.Writer.Commit(ctx, size, expected, opts...) + if err != nil { + if errdefs.IsAlreadyExists(err) { + pw.progress.MarkExists(pw.desc) + } + // TODO: Handle reset error to reset progress + } + pw.status.markComplete(pw.ref, pw.desc) + return err }