Add progress

Signed-off-by: Derek McGowan <derek@mcg.dev>

Update progress to reference parents

Signed-off-by: Derek McGowan <derek@mcg.dev>

Update Progress logic

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-07-19 13:14:37 -07:00
parent 0e4e96544f
commit 81afd9c36e
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
6 changed files with 478 additions and 55 deletions

View File

@ -19,47 +19,84 @@ package transfer
import (
"context"
"fmt"
"net/http"
"github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// TODO: Should a factory be exposed here as a service??
func NewOCIRegistryFromProto(p *transfer.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry {
/*
func NewOCIRegistryFromProto(p *transferapi.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry {
//transfer.OCIRegistry
// Create resolver
// Convert auth stream to credential manager
return &OCIRegistry{
reference: p.Reference,
resolver: resolver,
streams: sm,
}
}
*/
// Initialize with hosts, authorizer callback, and headers
func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry {
// Create an authorizer
var ropts []docker.RegistryOpt
if creds != nil {
// TODO: Support bearer
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) {
c, err := creds.GetCredentials(context.Background(), ref, host)
if err != nil {
return "", "", err
}
return c.Username, c.Secret, nil
}))
ropts = append(ropts, docker.WithAuthorizer(authorizer))
}
// TODO: Apply local configuration, maybe dynamically create resolver when requested
resolver := docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(ropts...),
Headers: headers,
})
return &OCIRegistry{
reference: ref,
headers: headers,
creds: creds,
resolver: resolver,
}
}
func NewOCIRegistry(ref string, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry {
// With options, stream,
// With streams?
return &OCIRegistry{
reference: ref,
resolver: resolver,
streams: sm,
// From stream
type CredentialHelper interface {
GetCredentials(ctx context.Context, ref, host string) (Credentials, error)
}
type Credentials struct {
Host string
Username string
Secret string
Bearer string
}
// OCI
type OCIRegistry struct {
reference string
headers http.Header
creds CredentialHelper
resolver remotes.Resolver
streams streaming.StreamManager
// This could be an interface which returns resolver?
// Resolver could also be a plug-able interface, to call out to a program to fetch?
@ -73,15 +110,49 @@ func (r *OCIRegistry) Image() string {
return r.reference
}
func (r *OCIRegistry) Resolver() remotes.Resolver {
return r.resolver
func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) {
return r.resolver.Resolve(ctx, r.reference)
}
func (r *OCIRegistry) ToProto() typeurl.Any {
// Might need more context to convert to proto
// Need access to a stream manager
// Service provider
return nil
func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) {
return r.resolver.Fetcher(ctx, ref)
}
func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) {
if r.creds != nil {
// TODO: Unique stream ID
stream, err := sm.Get(ctx, "")
if err != nil {
return nil, err
}
go func() {
// Check for context cancellation as well
for {
select {
case <-ctx.Done():
return
default:
}
_, err := stream.Recv()
if err != nil {
// If not EOF, log error
return
}
// If closed, return
// Call creds helper
// Send response
}
}()
// link creds to stream
}
// Create API OCI Registry type
// Marshal and return
return nil, nil
}
type ImageStore struct {
@ -101,9 +172,11 @@ type ImageStore struct {
unpacks []unpack.Platform
}
func NewImageStore(image string) *ImageStore {
func NewImageStore(image string, cs content.Store, is images.Store) *ImageStore {
return &ImageStore{
imageName: image,
images: is,
content: cs,
}
}

View File

@ -0,0 +1,221 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"sort"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/remotes"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
type ProgressTracker struct {
root string
cs content.Store
added chan jobUpdate
waitC chan struct{}
parents map[digest.Digest][]ocispec.Descriptor
parentL sync.Mutex
}
type jobState uint8
const (
jobAdded jobState = iota
jobInProgress
jobComplete
)
type jobStatus struct {
state jobState
name string
parents []string
progress int64
desc ocispec.Descriptor
}
type jobUpdate struct {
desc ocispec.Descriptor
exists bool
//children []ocispec.Descriptor
}
// NewProgressTracker tracks content download progress
func NewProgressTracker(root string, cs content.Store) *ProgressTracker {
return &ProgressTracker{
root: root,
cs: cs,
added: make(chan jobUpdate, 1),
waitC: make(chan struct{}),
parents: map[digest.Digest][]ocispec.Descriptor{},
}
}
func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc) {
defer close(j.waitC)
// Instead of ticker, just delay
jobs := map[digest.Digest]*jobStatus{}
tc := time.NewTicker(time.Millisecond * 200)
for {
select {
case update := <-j.added:
job, ok := jobs[update.desc.Digest]
if !ok {
// Only captures the parents defined before,
// could handle parent updates in same thread
// if there is a synchronization issue
var parents []string
j.parentL.Lock()
for _, parent := range j.parents[update.desc.Digest] {
parents = append(parents, remotes.MakeRefKey(ctx, parent))
}
j.parentL.Unlock()
if len(parents) == 0 {
parents = []string{j.root}
}
name := remotes.MakeRefKey(ctx, update.desc)
job = &jobStatus{
state: jobAdded,
name: name,
parents: parents,
desc: update.desc,
}
jobs[update.desc.Digest] = job
pf(transfer.Progress{
Event: "waiting",
Name: name,
Parents: parents,
//Digest: desc.Digest.String(),
Progress: 0,
Total: update.desc.Size,
})
}
if update.exists {
pf(transfer.Progress{
Event: "already exists",
Name: remotes.MakeRefKey(ctx, update.desc),
Progress: update.desc.Size,
Total: update.desc.Size,
})
job.state = jobComplete
job.progress = job.desc.Size
}
case <-tc.C:
// TODO: Filter by references
active, err := j.cs.ListStatuses(ctx)
if err != nil {
log.G(ctx).WithError(err).Error("failed to list statuses for progress")
}
sort.Slice(active, func(i, j int) bool {
return active[i].Ref < active[j].Ref
})
for dgst, job := range jobs {
if job.state != jobComplete {
idx := sort.Search(len(active), func(i int) bool { return active[i].Ref >= job.name })
if idx < len(active) && active[idx].Ref == job.name {
if active[idx].Offset > job.progress {
pf(transfer.Progress{
Event: "downloading",
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: active[idx].Offset,
Total: active[idx].Total,
})
job.progress = active[idx].Offset
job.state = jobInProgress
jobs[dgst] = job
}
} else {
_, err := j.cs.Info(ctx, job.desc.Digest)
if err == nil {
pf(transfer.Progress{
Event: "complete",
Name: job.name,
Parents: job.parents,
//Digest: job.desc.Digest.String(),
Progress: job.desc.Size,
Total: job.desc.Size,
})
}
job.state = jobComplete
jobs[dgst] = job
}
}
}
// Next timer?
case <-ctx.Done():
return
}
}
}
// Add adds a descriptor to be tracked
func (j *ProgressTracker) Add(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
}
}
func (j *ProgressTracker) MarkExists(desc ocispec.Descriptor) {
if j == nil {
return
}
j.added <- jobUpdate{
desc: desc,
exists: true,
}
}
// Adds hierarchy information
func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispec.Descriptor) {
if j == nil || len(children) == 0 {
return
}
j.parentL.Lock()
defer j.parentL.Unlock()
for _, child := range children {
j.parents[child.Digest] = append(j.parents[child.Digest], desc)
}
}
func (j *ProgressTracker) Wait() {
// timeout rather than rely on cancel
timeout := time.After(10 * time.Second)
select {
case <-timeout:
case <-j.waitC:
}
}

View File

@ -19,17 +19,20 @@ package local
import (
"context"
"fmt"
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
@ -74,8 +77,16 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d
case transfer.ImageStorer:
return ts.pull(ctx, s, d, topts)
}
case transfer.ImageImportStreamer:
switch d := dest.(type) {
case transfer.ImageExportStreamer:
return ts.echo(ctx, s, d, topts)
// Image import
// case transfer.ImageStorer
}
return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented)
}
return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented)
}
func name(t interface{}) string {
@ -89,6 +100,25 @@ func name(t interface{}) string {
}
}
// echo is mostly used for testing, it implements an import->export which is
// a no-op which only roundtrips the bytes.
func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImportStreamer, e transfer.ImageExportStreamer, tops *transfer.TransferOpts) error {
r, err := i.ImportStream(ctx)
if err != nil {
return err
}
wc, err := e.ExportStream(ctx)
if err != nil {
return err
}
// TODO: Use fixed buffer? Send write progress?
_, err = io.Copy(wc, r)
if werr := wc.Close(); werr != nil && err == nil {
err = werr
}
return err
}
func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error {
// TODO: Attach lease if doesn't have one
@ -102,6 +132,11 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
// - Platform to Snapshotter
// - Child label map
// - All metdata?
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Resolving from %s", ir),
})
}
name, desc, err := ir.Resolve(ctx)
if err != nil {
@ -112,6 +147,18 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument)
}
// TODO: Handle already exists
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pulling from %s", ir),
})
tops.Progress(transfer.Progress{
Event: "fetching image content",
Name: name,
//Digest: img.Target.Digest.String(),
})
}
fetcher, err := ir.Fetcher(ctx, name)
if err != nil {
return fmt.Errorf("failed to get fetcher for %q: %w", name, err)
@ -126,7 +173,18 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
hasMediaTypeBug1622 bool
store = ts.content
progressTracker *ProgressTracker
)
if tops.Progress != nil {
progressTracker = NewProgressTracker(name, store) //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress)
defer progressTracker.Wait()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
//func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc {
//func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) {
@ -158,16 +216,35 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
return err
}
// TODO: Support set of base handlers from configuration or image store
// Progress handlers?
handlers := []images.Handler{
remotes.FetchHandler(store, fetcher),
checkNeedsFix,
childrenHandler,
appendDistSrcLabelHandler,
// TODO: Allow initialization from configuration
baseHandlers := []images.Handler{}
if tops.Progress != nil {
baseHandlers = append(baseHandlers, images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
progressTracker.Add(desc)
return []ocispec.Descriptor{}, nil
},
))
baseChildrenHandler := childrenHandler
childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) {
children, err = baseChildrenHandler(ctx, desc)
if err != nil {
return
}
progressTracker.AddChildren(desc, children)
return
})
}
handler = images.Handlers(handlers...)
handler = images.Handlers(append(baseHandlers,
fetchHandler(store, fetcher, progressTracker),
checkNeedsFix,
childrenHandler, // List children to track hierachy
appendDistSrcLabelHandler,
)...)
// TODO: Should available platforms be a configuration of the service?
// First find suitable platforms to unpack into
@ -189,20 +266,15 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
defer func() {
// TODO: This needs to be tigher scoped...
if _, err := unpacker.Wait(); err != nil {
//if retErr == nil {
// retErr = fmt.Errorf("unpack: %w", err)
//}
}
}()
handler = unpacker.Unpack(handler)
}
}
if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil {
// TODO: Cancel unpack and wait?
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
}
return err
}
@ -222,12 +294,46 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol
}
}
_, err = is.Store(ctx, desc)
img, err := is.Store(ctx, desc)
if err != nil {
return err
}
// TODO: Send status update for created image
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed pull from %s", ir),
})
}
return nil
}
func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := remotes.Fetch(ctx, ingester, fetcher, desc)
if errdefs.IsAlreadyExists(err) {
pt.MarkExists(desc)
return nil, nil
}
return nil, err
}
}
}

View File

@ -20,6 +20,7 @@ import (
"context"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/typeurl"
"google.golang.org/protobuf/types/known/anypb"
@ -27,22 +28,24 @@ import (
type proxyTransferer struct {
client transferapi.TransferClient
streamManager streaming.StreamManager
}
// NewTransferer returns a new transferr which communicates over a GRPC
// connection using the containerd transfer API
func NewTransferer(client transferapi.TransferClient) transfer.Transferer {
func NewTransferer(client transferapi.TransferClient, sm streaming.StreamManager) transfer.Transferer {
return &proxyTransferer{
client: client,
streamManager: sm,
}
}
func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
asrc, err := typeurl.MarshalAny(src)
asrc, err := p.marshalAny(ctx, src)
if err != nil {
return err
}
adst, err := typeurl.MarshalAny(dst)
adst, err := p.marshalAny(ctx, dst)
if err != nil {
return err
}
@ -61,3 +64,14 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int
_, err = p.client.Transfer(ctx, req)
return err
}
func (p *proxyTransferer) marshalAny(ctx context.Context, i interface{}) (typeurl.Any, error) {
switch m := i.(type) {
case streamMarshaler:
return m.MarshalAny(ctx, p.streamManager)
}
return typeurl.MarshalAny(i)
}
type streamMarshaler interface {
MarshalAny(context.Context, streaming.StreamManager) (typeurl.Any, error)
}

View File

@ -44,6 +44,8 @@ type ImageFilterer interface {
ImageFilter(images.HandlerFunc) images.HandlerFunc
}
// ImageStorer is a type which is capable of storing an image to
// for a provided descriptor
type ImageStorer interface {
Store(context.Context, ocispec.Descriptor) (images.Image, error)
}
@ -53,21 +55,27 @@ type ImageUnpacker interface {
UnpackPlatforms() []unpack.Platform
}
type ProgressFunc func(Progress)
type TransferOpts struct {
Progress ProgressFunc
}
type Opt func(*TransferOpts)
func WithProgress() Opt {
return nil
func WithProgress(f ProgressFunc) Opt {
return func(opts *TransferOpts) {
opts.Progress = f
}
}
type Progress struct {
Event string
Name string
Digest string
Parents []string
Progress int64
Total int64
// Descriptor?
}
/*

View File

@ -100,20 +100,21 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := fetch(ctx, ingester, fetcher, desc)
err := Fetch(ctx, ingester, fetcher, desc)
if errdefs.IsAlreadyExists(err) {
return nil, nil
}
return nil, err
}
}
}
func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
// Fetch fetches the given digest into the provided ingester
func Fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error {
log.G(ctx).Debug("fetch")
cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc))
if err != nil {
if errdefs.IsAlreadyExists(err) {
return nil
}
return err
}
defer cw.Close()
@ -135,7 +136,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc
if err != nil && !errdefs.IsAlreadyExists(err) {
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
}
return nil
return err
}
rc, err := fetcher.Fetch(ctx, desc)