/* Copyright The containerd Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package local import ( "context" "sort" "sync" "time" "github.com/containerd/containerd/content" "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/containerd/remotes" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) type ProgressTracker struct { root string transferState string added chan jobUpdate waitC chan struct{} parents map[digest.Digest][]ocispec.Descriptor parentL sync.Mutex } type jobState uint8 const ( jobAdded jobState = iota jobInProgress jobComplete ) type jobStatus struct { state jobState name string parents []string progress int64 desc ocispec.Descriptor } type jobUpdate struct { desc ocispec.Descriptor exists bool //children []ocispec.Descriptor } type ActiveJobs interface { Status(string) (content.Status, bool) } type StatusTracker interface { Active(context.Context, ...string) (ActiveJobs, error) Check(context.Context, digest.Digest) (bool, error) } // NewProgressTracker tracks content download progress func NewProgressTracker(root, transferState string) *ProgressTracker { return &ProgressTracker{ root: root, transferState: transferState, added: make(chan jobUpdate, 1), waitC: make(chan struct{}), parents: map[digest.Digest][]ocispec.Descriptor{}, } } func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) { defer close(j.waitC) // Instead of ticker, just delay jobs := map[digest.Digest]*jobStatus{} tc := time.NewTicker(time.Millisecond * 300) update := func() { // TODO: Filter by references active, err := pt.Active(ctx) if err != nil { log.G(ctx).WithError(err).Error("failed to get statuses for progress") } for dgst, job := range jobs { if job.state != jobComplete { status, ok := active.Status(job.name) if ok { if status.Offset > job.progress { pf(transfer.Progress{ Event: j.transferState, Name: job.name, Parents: job.parents, Progress: status.Offset, Total: status.Total, }) job.progress = status.Offset job.state = jobInProgress jobs[dgst] = job } } else { ok, err := pt.Check(ctx, job.desc.Digest) if err != nil { log.G(ctx).WithError(err).Error("failed to get statuses for progress") } else if ok { pf(transfer.Progress{ Event: "complete", Name: job.name, Parents: job.parents, Progress: job.desc.Size, Total: job.desc.Size, }) } job.state = jobComplete jobs[dgst] = job } } } } for { select { case update := <-j.added: job, ok := jobs[update.desc.Digest] if !ok { // Only captures the parents defined before, // could handle parent updates in same thread // if there is a synchronization issue var parents []string j.parentL.Lock() for _, parent := range j.parents[update.desc.Digest] { parents = append(parents, remotes.MakeRefKey(ctx, parent)) } j.parentL.Unlock() if len(parents) == 0 { parents = []string{j.root} } name := remotes.MakeRefKey(ctx, update.desc) job = &jobStatus{ state: jobAdded, name: name, parents: parents, desc: update.desc, } jobs[update.desc.Digest] = job pf(transfer.Progress{ Event: "waiting", Name: name, Parents: parents, //Digest: desc.Digest.String(), Progress: 0, Total: update.desc.Size, }) } if update.exists { pf(transfer.Progress{ Event: "already exists", Name: remotes.MakeRefKey(ctx, update.desc), Progress: update.desc.Size, Total: update.desc.Size, }) job.state = jobComplete job.progress = job.desc.Size } case <-tc.C: update() // Next timer? case <-ctx.Done(): update() return } } } // Add adds a descriptor to be tracked func (j *ProgressTracker) Add(desc ocispec.Descriptor) { if j == nil { return } j.added <- jobUpdate{ desc: desc, } } func (j *ProgressTracker) MarkExists(desc ocispec.Descriptor) { if j == nil { return } j.added <- jobUpdate{ desc: desc, exists: true, } } // Adds hierarchy information func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispec.Descriptor) { if j == nil || len(children) == 0 { return } j.parentL.Lock() defer j.parentL.Unlock() for _, child := range children { j.parents[child.Digest] = append(j.parents[child.Digest], desc) } } func (j *ProgressTracker) Wait() { // timeout rather than rely on cancel timeout := time.After(10 * time.Second) select { case <-timeout: case <-j.waitC: } } type contentActive struct { active []content.Status } func (c *contentActive) Status(ref string) (content.Status, bool) { idx := sort.Search(len(c.active), func(i int) bool { return c.active[i].Ref >= ref }) if idx < len(c.active) && c.active[idx].Ref == ref { return c.active[idx], true } return content.Status{}, false } type contentStatusTracker struct { cs content.Store } func NewContentStatusTracker(cs content.Store) StatusTracker { return &contentStatusTracker{ cs: cs, } } func (c *contentStatusTracker) Active(ctx context.Context, _ ...string) (ActiveJobs, error) { active, err := c.cs.ListStatuses(ctx) if err != nil { log.G(ctx).WithError(err).Error("failed to list statuses for progress") } sort.Slice(active, func(i, j int) bool { return active[i].Ref < active[j].Ref }) return &contentActive{ active: active, }, nil } func (c *contentStatusTracker) Check(ctx context.Context, dgst digest.Digest) (bool, error) { _, err := c.cs.Info(ctx, dgst) if err == nil { return true, nil } return false, nil }