diff --git a/pkg/transfer/image/local.go b/pkg/transfer/image/local.go index 699b5565c..25f9428fe 100644 --- a/pkg/transfer/image/local.go +++ b/pkg/transfer/image/local.go @@ -19,47 +19,84 @@ package transfer import ( "context" "fmt" + "net/http" - "github.com/containerd/containerd/api/types/transfer" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/pkg/streaming" + "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/containerd/pkg/unpack" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" "github.com/containerd/typeurl" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) // TODO: Should a factory be exposed here as a service?? -func NewOCIRegistryFromProto(p *transfer.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { +/* +func NewOCIRegistryFromProto(p *transferapi.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { //transfer.OCIRegistry // Create resolver // Convert auth stream to credential manager return &OCIRegistry{ reference: p.Reference, resolver: resolver, - streams: sm, + } +} +*/ + +// Initialize with hosts, authorizer callback, and headers +func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry { + // Create an authorizer + var ropts []docker.RegistryOpt + if creds != nil { + // TODO: Support bearer + authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) { + c, err := creds.GetCredentials(context.Background(), ref, host) + if err != nil { + return "", "", err + } + + return c.Username, c.Secret, nil + })) + ropts = append(ropts, docker.WithAuthorizer(authorizer)) + } + + // TODO: Apply local configuration, maybe dynamically create resolver when requested + resolver := docker.NewResolver(docker.ResolverOptions{ + Hosts: docker.ConfigureDefaultRegistries(ropts...), + Headers: headers, + }) + return &OCIRegistry{ + reference: ref, + headers: headers, + creds: creds, + resolver: resolver, } } -func NewOCIRegistry(ref string, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry { - // With options, stream, - // With streams? - return &OCIRegistry{ - reference: ref, - resolver: resolver, - streams: sm, - } +// From stream +type CredentialHelper interface { + GetCredentials(ctx context.Context, ref, host string) (Credentials, error) +} + +type Credentials struct { + Host string + Username string + Secret string + Bearer string } // OCI type OCIRegistry struct { reference string + headers http.Header + creds CredentialHelper + resolver remotes.Resolver - streams streaming.StreamManager // This could be an interface which returns resolver? // Resolver could also be a plug-able interface, to call out to a program to fetch? @@ -73,15 +110,49 @@ func (r *OCIRegistry) Image() string { return r.reference } -func (r *OCIRegistry) Resolver() remotes.Resolver { - return r.resolver +func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) { + return r.resolver.Resolve(ctx, r.reference) } -func (r *OCIRegistry) ToProto() typeurl.Any { - // Might need more context to convert to proto - // Need access to a stream manager - // Service provider - return nil +func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) { + return r.resolver.Fetcher(ctx, ref) +} + +func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) { + if r.creds != nil { + // TODO: Unique stream ID + stream, err := sm.Get(ctx, "") + if err != nil { + return nil, err + } + go func() { + // Check for context cancellation as well + for { + select { + case <-ctx.Done(): + return + default: + } + + _, err := stream.Recv() + if err != nil { + // If not EOF, log error + return + } + // If closed, return + // Call creds helper + // Send response + } + + }() + // link creds to stream + } + + // Create API OCI Registry type + + // Marshal and return + + return nil, nil } type ImageStore struct { @@ -101,9 +172,11 @@ type ImageStore struct { unpacks []unpack.Platform } -func NewImageStore(image string) *ImageStore { +func NewImageStore(image string, cs content.Store, is images.Store) *ImageStore { return &ImageStore{ imageName: image, + images: is, + content: cs, } } diff --git a/pkg/transfer/local/progress.go b/pkg/transfer/local/progress.go new file mode 100644 index 000000000..e6ad64089 --- /dev/null +++ b/pkg/transfer/local/progress.go @@ -0,0 +1,221 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package local + +import ( + "context" + "sort" + "sync" + "time" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/pkg/transfer" + "github.com/containerd/containerd/remotes" + "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" +) + +type ProgressTracker struct { + root string + cs content.Store + added chan jobUpdate + waitC chan struct{} + + parents map[digest.Digest][]ocispec.Descriptor + parentL sync.Mutex +} + +type jobState uint8 + +const ( + jobAdded jobState = iota + jobInProgress + jobComplete +) + +type jobStatus struct { + state jobState + name string + parents []string + progress int64 + desc ocispec.Descriptor +} + +type jobUpdate struct { + desc ocispec.Descriptor + exists bool + //children []ocispec.Descriptor +} + +// NewProgressTracker tracks content download progress +func NewProgressTracker(root string, cs content.Store) *ProgressTracker { + return &ProgressTracker{ + root: root, + cs: cs, + added: make(chan jobUpdate, 1), + waitC: make(chan struct{}), + parents: map[digest.Digest][]ocispec.Descriptor{}, + } +} + +func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.ProgressFunc) { + defer close(j.waitC) + // Instead of ticker, just delay + jobs := map[digest.Digest]*jobStatus{} + tc := time.NewTicker(time.Millisecond * 200) + for { + select { + case update := <-j.added: + job, ok := jobs[update.desc.Digest] + if !ok { + + // Only captures the parents defined before, + // could handle parent updates in same thread + // if there is a synchronization issue + var parents []string + j.parentL.Lock() + for _, parent := range j.parents[update.desc.Digest] { + parents = append(parents, remotes.MakeRefKey(ctx, parent)) + } + j.parentL.Unlock() + if len(parents) == 0 { + parents = []string{j.root} + } + name := remotes.MakeRefKey(ctx, update.desc) + + job = &jobStatus{ + state: jobAdded, + name: name, + parents: parents, + desc: update.desc, + } + jobs[update.desc.Digest] = job + pf(transfer.Progress{ + Event: "waiting", + Name: name, + Parents: parents, + //Digest: desc.Digest.String(), + Progress: 0, + Total: update.desc.Size, + }) + } + if update.exists { + pf(transfer.Progress{ + Event: "already exists", + Name: remotes.MakeRefKey(ctx, update.desc), + Progress: update.desc.Size, + Total: update.desc.Size, + }) + job.state = jobComplete + job.progress = job.desc.Size + } + + case <-tc.C: + // TODO: Filter by references + active, err := j.cs.ListStatuses(ctx) + if err != nil { + log.G(ctx).WithError(err).Error("failed to list statuses for progress") + } + sort.Slice(active, func(i, j int) bool { + return active[i].Ref < active[j].Ref + }) + + for dgst, job := range jobs { + if job.state != jobComplete { + idx := sort.Search(len(active), func(i int) bool { return active[i].Ref >= job.name }) + if idx < len(active) && active[idx].Ref == job.name { + if active[idx].Offset > job.progress { + pf(transfer.Progress{ + Event: "downloading", + Name: job.name, + Parents: job.parents, + //Digest: job.desc.Digest.String(), + Progress: active[idx].Offset, + Total: active[idx].Total, + }) + job.progress = active[idx].Offset + job.state = jobInProgress + jobs[dgst] = job + } + } else { + _, err := j.cs.Info(ctx, job.desc.Digest) + if err == nil { + pf(transfer.Progress{ + Event: "complete", + Name: job.name, + Parents: job.parents, + //Digest: job.desc.Digest.String(), + Progress: job.desc.Size, + Total: job.desc.Size, + }) + + } + job.state = jobComplete + jobs[dgst] = job + } + } + } + // Next timer? + case <-ctx.Done(): + return + } + } +} + +// Add adds a descriptor to be tracked +func (j *ProgressTracker) Add(desc ocispec.Descriptor) { + if j == nil { + return + } + j.added <- jobUpdate{ + desc: desc, + } +} + +func (j *ProgressTracker) MarkExists(desc ocispec.Descriptor) { + if j == nil { + return + } + j.added <- jobUpdate{ + desc: desc, + exists: true, + } + +} + +// Adds hierarchy information +func (j *ProgressTracker) AddChildren(desc ocispec.Descriptor, children []ocispec.Descriptor) { + if j == nil || len(children) == 0 { + return + } + j.parentL.Lock() + defer j.parentL.Unlock() + for _, child := range children { + j.parents[child.Digest] = append(j.parents[child.Digest], desc) + } + +} + +func (j *ProgressTracker) Wait() { + // timeout rather than rely on cancel + timeout := time.After(10 * time.Second) + select { + case <-timeout: + case <-j.waitC: + } +} diff --git a/pkg/transfer/local/transfer.go b/pkg/transfer/local/transfer.go index 2a8fd9e14..cf22f662a 100644 --- a/pkg/transfer/local/transfer.go +++ b/pkg/transfer/local/transfer.go @@ -19,17 +19,20 @@ package local import ( "context" "fmt" + "io" "github.com/containerd/containerd/content" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/containerd/pkg/unpack" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" "github.com/containerd/typeurl" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/sirupsen/logrus" "golang.org/x/sync/semaphore" ) @@ -74,8 +77,16 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d case transfer.ImageStorer: return ts.pull(ctx, s, d, topts) } + case transfer.ImageImportStreamer: + switch d := dest.(type) { + case transfer.ImageExportStreamer: + return ts.echo(ctx, s, d, topts) + + // Image import + // case transfer.ImageStorer + } } - return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) + return fmt.Errorf("unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented) } func name(t interface{}) string { @@ -89,6 +100,25 @@ func name(t interface{}) string { } } +// echo is mostly used for testing, it implements an import->export which is +// a no-op which only roundtrips the bytes. +func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImportStreamer, e transfer.ImageExportStreamer, tops *transfer.TransferOpts) error { + r, err := i.ImportStream(ctx) + if err != nil { + return err + } + wc, err := e.ExportStream(ctx) + if err != nil { + return err + } + + // TODO: Use fixed buffer? Send write progress? + _, err = io.Copy(wc, r) + if werr := wc.Close(); werr != nil && err == nil { + err = werr + } + return err +} func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error { // TODO: Attach lease if doesn't have one @@ -102,6 +132,11 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol // - Platform to Snapshotter // - Child label map // - All metdata? + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Resolving from %s", ir), + }) + } name, desc, err := ir.Resolve(ctx) if err != nil { @@ -112,6 +147,18 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument) } + // TODO: Handle already exists + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Pulling from %s", ir), + }) + tops.Progress(transfer.Progress{ + Event: "fetching image content", + Name: name, + //Digest: img.Target.Digest.String(), + }) + } + fetcher, err := ir.Fetcher(ctx, name) if err != nil { return fmt.Errorf("failed to get fetcher for %q: %w", name, err) @@ -125,8 +172,19 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol // has a config media type bug (distribution#1622) hasMediaTypeBug1622 bool - store = ts.content + store = ts.content + progressTracker *ProgressTracker ) + + if tops.Progress != nil { + progressTracker = NewProgressTracker(name, store) //Pass in first name as root + go progressTracker.HandleProgress(ctx, tops.Progress) + defer progressTracker.Wait() + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + //func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc { //func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) { @@ -158,16 +216,35 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol return err } - // TODO: Support set of base handlers from configuration or image store - // Progress handlers? - handlers := []images.Handler{ - remotes.FetchHandler(store, fetcher), - checkNeedsFix, - childrenHandler, - appendDistSrcLabelHandler, + // TODO: Allow initialization from configuration + baseHandlers := []images.Handler{} + + if tops.Progress != nil { + baseHandlers = append(baseHandlers, images.HandlerFunc( + func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { + progressTracker.Add(desc) + + return []ocispec.Descriptor{}, nil + }, + )) + + baseChildrenHandler := childrenHandler + childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) { + children, err = baseChildrenHandler(ctx, desc) + if err != nil { + return + } + progressTracker.AddChildren(desc, children) + return + }) } - handler = images.Handlers(handlers...) + handler = images.Handlers(append(baseHandlers, + fetchHandler(store, fetcher, progressTracker), + checkNeedsFix, + childrenHandler, // List children to track hierachy + appendDistSrcLabelHandler, + )...) // TODO: Should available platforms be a configuration of the service? // First find suitable platforms to unpack into @@ -189,20 +266,15 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol if err != nil { return fmt.Errorf("unable to initialize unpacker: %w", err) } - defer func() { - // TODO: This needs to be tigher scoped... - if _, err := unpacker.Wait(); err != nil { - //if retErr == nil { - // retErr = fmt.Errorf("unpack: %w", err) - //} - } - }() handler = unpacker.Unpack(handler) } } if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil { - // TODO: Cancel unpack and wait? + if unpacker != nil { + // wait for unpacker to cleanup + unpacker.Wait() + } return err } @@ -222,12 +294,46 @@ func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResol } } - _, err = is.Store(ctx, desc) + img, err := is.Store(ctx, desc) if err != nil { return err } - // TODO: Send status update for created image + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: "saved", + Name: img.Name, + //Digest: img.Target.Digest.String(), + }) + } + + if tops.Progress != nil { + tops.Progress(transfer.Progress{ + Event: fmt.Sprintf("Completed pull from %s", ir), + }) + } return nil } + +func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc { + return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) { + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{ + "digest": desc.Digest, + "mediatype": desc.MediaType, + "size": desc.Size, + })) + + switch desc.MediaType { + case images.MediaTypeDockerSchema1Manifest: + return nil, fmt.Errorf("%v not supported", desc.MediaType) + default: + err := remotes.Fetch(ctx, ingester, fetcher, desc) + if errdefs.IsAlreadyExists(err) { + pt.MarkExists(desc) + return nil, nil + } + return nil, err + } + } +} diff --git a/pkg/transfer/proxy/transfer.go b/pkg/transfer/proxy/transfer.go index fcfd78720..f6e677aad 100644 --- a/pkg/transfer/proxy/transfer.go +++ b/pkg/transfer/proxy/transfer.go @@ -20,29 +20,32 @@ import ( "context" transferapi "github.com/containerd/containerd/api/services/transfer/v1" + "github.com/containerd/containerd/pkg/streaming" "github.com/containerd/containerd/pkg/transfer" "github.com/containerd/typeurl" "google.golang.org/protobuf/types/known/anypb" ) type proxyTransferer struct { - client transferapi.TransferClient + client transferapi.TransferClient + streamManager streaming.StreamManager } // NewTransferer returns a new transferr which communicates over a GRPC // connection using the containerd transfer API -func NewTransferer(client transferapi.TransferClient) transfer.Transferer { +func NewTransferer(client transferapi.TransferClient, sm streaming.StreamManager) transfer.Transferer { return &proxyTransferer{ - client: client, + client: client, + streamManager: sm, } } func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error { - asrc, err := typeurl.MarshalAny(src) + asrc, err := p.marshalAny(ctx, src) if err != nil { return err } - adst, err := typeurl.MarshalAny(dst) + adst, err := p.marshalAny(ctx, dst) if err != nil { return err } @@ -61,3 +64,14 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int _, err = p.client.Transfer(ctx, req) return err } +func (p *proxyTransferer) marshalAny(ctx context.Context, i interface{}) (typeurl.Any, error) { + switch m := i.(type) { + case streamMarshaler: + return m.MarshalAny(ctx, p.streamManager) + } + return typeurl.MarshalAny(i) +} + +type streamMarshaler interface { + MarshalAny(context.Context, streaming.StreamManager) (typeurl.Any, error) +} diff --git a/pkg/transfer/transfer.go b/pkg/transfer/transfer.go index aa5fcdda2..5a8bb6581 100644 --- a/pkg/transfer/transfer.go +++ b/pkg/transfer/transfer.go @@ -44,6 +44,8 @@ type ImageFilterer interface { ImageFilter(images.HandlerFunc) images.HandlerFunc } +// ImageStorer is a type which is capable of storing an image to +// for a provided descriptor type ImageStorer interface { Store(context.Context, ocispec.Descriptor) (images.Image, error) } @@ -53,21 +55,27 @@ type ImageUnpacker interface { UnpackPlatforms() []unpack.Platform } +type ProgressFunc func(Progress) + type TransferOpts struct { + Progress ProgressFunc } type Opt func(*TransferOpts) -func WithProgress() Opt { - return nil +func WithProgress(f ProgressFunc) Opt { + return func(opts *TransferOpts) { + opts.Progress = f + } } type Progress struct { Event string Name string - Digest string + Parents []string Progress int64 Total int64 + // Descriptor? } /* diff --git a/remotes/handlers.go b/remotes/handlers.go index 4d91ed2e5..ec7bbe041 100644 --- a/remotes/handlers.go +++ b/remotes/handlers.go @@ -100,20 +100,21 @@ func FetchHandler(ingester content.Ingester, fetcher Fetcher) images.HandlerFunc case images.MediaTypeDockerSchema1Manifest: return nil, fmt.Errorf("%v not supported", desc.MediaType) default: - err := fetch(ctx, ingester, fetcher, desc) + err := Fetch(ctx, ingester, fetcher, desc) + if errdefs.IsAlreadyExists(err) { + return nil, nil + } return nil, err } } } -func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { +// Fetch fetches the given digest into the provided ingester +func Fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc ocispec.Descriptor) error { log.G(ctx).Debug("fetch") cw, err := content.OpenWriter(ctx, ingester, content.WithRef(MakeRefKey(ctx, desc)), content.WithDescriptor(desc)) if err != nil { - if errdefs.IsAlreadyExists(err) { - return nil - } return err } defer cw.Close() @@ -135,7 +136,7 @@ func fetch(ctx context.Context, ingester content.Ingester, fetcher Fetcher, desc if err != nil && !errdefs.IsAlreadyExists(err) { return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err) } - return nil + return err } rc, err := fetcher.Fetch(ctx, desc)