containerd/pkg/transfer/local/progress.go
Derek McGowan fc2754204f
Cleanup code comments and lint fixes
Signed-off-by: Derek McGowan <derek@mcg.dev>
2022-11-30 16:02:09 -08:00

277 lines
6.3 KiB
Go

/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"sort"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/remotes"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
type ProgressTracker struct {
root string
transferState string
added chan jobUpdate
waitC chan struct{}
parents map[digest.Digest][]ocispec.Descriptor
parentL sync.Mutex
}
type jobState uint8
const (
jobAdded jobState = iota
jobInProgress
jobComplete
)
type jobStatus struct {
state jobState
name string
parents []string
progress int64
desc ocispec.Descriptor
}
type jobUpdate struct {
desc ocispec.Descriptor
exists bool
//children []ocispec.Descriptor
}
type ActiveJobs interface {
Status(string) (content.Status, bool)
}
type StatusTracker interface {
Active(context.Context, ...string) (ActiveJobs, error)
Check(context.Context, digest.Digest) (bool, error)
}
// NewProgressTracker tracks content download progress
func NewProgressTracker(root, transferState string) *ProgressTracker {
return &ProgressTracker{
root: root,
transferState: transferState,
added: make(chan jobUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
}
}
func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc, pt StatusTracker) {
defer close(j.waitC)
// Instead of ticker, just delay
jobs := map[digest.Digest]*jobStatus{}
tc := time.NewTicker(time.Millisecond * 300)
update := func() {
// TODO: Filter by references
active, err := pt.Active(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
}
for dgst, job := range jobs {
if job.state != jobComplete {
status, ok := active.Status(job.name)
if ok {
if status.Offset > job.progress {
pf(transfer.Progress{
Event: j.transferState,
Name: job.name,
Parents: job.parents,
Progress: status.Offset,
Total: status.Total,
})
job.progress = status.Offset
job.state = jobInProgress
jobs[dgst] = job
}
} else {
ok, err := pt.Check(ctx, job.desc.Digest)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get statuses for progress")
} else if ok {
pf(transfer.Progress{
Event: "complete",
Name: job.name,
Parents: job.parents,
Progress: job.desc.Size,
Total: job.desc.Size,
})
}
job.state = jobComplete
jobs[dgst] = job
}
}
}
}
for {
select {
case update := <-j.added:
job, ok := jobs[update.desc.Digest]
if !ok {
// Only captures the parents defined before,
// could handle parent updates in same thread
// if there is a synchronization issue
var parents []string
j.parentL.Lock()
for _, parent := range j.parents[update.desc.Digest] {
parents = append(parents, remotes.MakeRefKey(ctx, parent))
}
j.parentL.Unlock()
if len(parents) == 0 {
parents = []string{j.root}
}
name := remotes.MakeRefKey(ctx, update.desc)
job = &jobStatus{
state: jobAdded,
name: name,
parents: parents,
desc: update.desc,
}
jobs[update.desc.Digest] = job
pf(transfer.Progress{
Event: "waiting",
Name: name,
Parents: parents,
//Digest: desc.Digest.String(),
Progress: 0,
Total: update.desc.Size,
})
}
if update.exists {
pf(transfer.Progress{
Event: "already exists",
Name: remotes.MakeRefKey(ctx, update.desc),
Progress: update.desc.Size,
Total: update.desc.Size,
})
job.state = jobComplete
job.progress = job.desc.Size
}
case <-tc.C:
update()
// Next timer?
case <-ctx.Done():
update()
return
}
}
}
// Add adds a descriptor to be tracked
func (j *ProgressTracker) Add(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
}
}
func (j *ProgressTracker) MarkExists(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
exists: true,
}
}
// Adds hierarchy information
func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispec.Descriptor) {
if j == nil || len(children) == 0 {
return
}
j.parentL.Lock()
defer j.parentL.Unlock()
for _, child := range children {
j.parents[child.Digest] = append(j.parents[child.Digest], desc)
}
}
func (j *ProgressTracker) Wait() {
// timeout rather than rely on cancel
timeout := time.After(10 * time.Second)
select {
case <-timeout:
case <-j.waitC:
}
}
type contentActive struct {
active []content.Status
}
func (c *contentActive) Status(ref string) (content.Status, bool) {
idx := sort.Search(len(c.active), func(i int) bool { return c.active[i].Ref >= ref })
if idx < len(c.active) && c.active[idx].Ref == ref {
return c.active[idx], true
}
return content.Status{}, false
}
type contentStatusTracker struct {
cs content.Store
}
func NewContentStatusTracker(cs content.Store) StatusTracker {
return &contentStatusTracker{
cs: cs,
}
}
func (c *contentStatusTracker) Active(ctx context.Context, _ ...string) (ActiveJobs, error) {
active, err := c.cs.ListStatuses(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to list statuses for progress")
}
sort.Slice(active, func(i, j int) bool {
return active[i].Ref < active[j].Ref
})
return &contentActive{
active: active,
}, nil
}
func (c *contentStatusTracker) Check(ctx context.Context, dgst digest.Digest) (bool, error) {
_, err := c.cs.Info(ctx, dgst)
if err == nil {
return true, nil
}
return false, nil
}