Update transfer packages

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan 2022-08-19 15:34:08 -07:00
parent 2a8d7a744b
commit 6b5df1ee16
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
13 changed files with 1007 additions and 890 deletions

View File

@ -20,13 +20,20 @@ import (
"context"
"io"
transferapi "github.com/containerd/containerd/api/types/transfer"
transfertypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer/plugins"
tstreaming "github.com/containerd/containerd/pkg/transfer/streaming"
"github.com/containerd/typeurl"
)
func init() {
// TODO: Move this to seperate package?
plugins.Register(&transfertypes.ImageExportStream{}, &ImageExportStream{})
plugins.Register(&transfertypes.ImageImportStream{}, &ImageImportStream{})
}
// NewImageImportStream returns a image importer via tar stream
// TODO: Add import options
func NewImageExportStream(stream io.WriteCloser) *ImageExportStream {
@ -43,11 +50,9 @@ func (iis *ImageExportStream) ExportStream(context.Context) (io.WriteCloser, err
return iis.stream, nil
}
func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) {
// TODO: Unique stream ID
sid := "randomid"
// TODO: Should this API be create instead of get
stream, err := sm.Get(ctx, sid)
func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) {
sid := tstreaming.GenerateID("export")
stream, err := sm.Create(ctx, sid)
if err != nil {
return nil, err
}
@ -60,28 +65,26 @@ func (iis *ImageExportStream) MarshalAny(ctx context.Context, sm streaming.Strea
iis.stream.Close()
}()
s := &transferapi.ImageExportStream{
s := &transfertypes.ImageExportStream{
Stream: sid,
}
return typeurl.MarshalAny(s)
}
func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamManager, any typeurl.Any) error {
var s transferapi.ImageExportStream
func (iis *ImageExportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error {
var s transfertypes.ImageExportStream
if err := typeurl.UnmarshalTo(any, &s); err != nil {
return err
}
stream, err := sm.Get(ctx, s.Stream)
if err != nil {
log.G(ctx).WithError(err).WithField("stream", s.Stream).Debug("failed to get export stream")
return err
}
r, w := io.Pipe()
tstreaming.SendStream(ctx, r, stream)
iis.stream = w
iis.stream = tstreaming.WriteByteStream(ctx, stream)
return nil
}

View File

@ -21,6 +21,7 @@ import (
"io"
transferapi "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
tstreaming "github.com/containerd/containerd/pkg/transfer/streaming"
"github.com/containerd/typeurl"
@ -42,11 +43,9 @@ func (iis *ImageImportStream) ImportStream(context.Context) (io.Reader, error) {
return iis.stream, nil
}
func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) {
// TODO: Unique stream ID
sid := "randomid"
// TODO: Should this API be create instead of get
stream, err := sm.Get(ctx, sid)
func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) {
sid := tstreaming.GenerateID("import")
stream, err := sm.Create(ctx, sid)
if err != nil {
return nil, err
}
@ -59,7 +58,7 @@ func (iis *ImageImportStream) MarshalAny(ctx context.Context, sm streaming.Strea
return typeurl.MarshalAny(s)
}
func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamManager, any typeurl.Any) error {
func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, any typeurl.Any) error {
var s transferapi.ImageImportStream
if err := typeurl.UnmarshalTo(any, &s); err != nil {
return err
@ -67,6 +66,7 @@ func (iis *ImageImportStream) UnmarshalAny(ctx context.Context, sm streaming.Str
stream, err := sm.Get(ctx, s.Stream)
if err != nil {
log.G(ctx).WithError(err).WithField("stream", s.Stream).Debug("failed to get import stream")
return err
}

View File

@ -0,0 +1,140 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package image
import (
"context"
"fmt"
transfertypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer/plugins"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func init() {
// TODO: Move this to seperate package?
plugins.Register(&transfertypes.ImageStore{}, &ImageStore{}) // TODO: Rename ImageStoreDestination
}
type ImageStore struct {
// TODO: Put these configurations in object which can convert to/from any
// Embed generated type
imageName string
imageLabels map[string]string
platforms platforms.MatchComparer
allMetadata bool
labelMap func(ocispec.Descriptor) []string
manifestLimit int
// TODO: Convert these to unpack platforms
unpacks []unpack.Platform
}
func NewImageStore(image string) *ImageStore {
return &ImageStore{
imageName: image,
}
}
func (is *ImageStore) String() string {
return fmt.Sprintf("Local Image Store (%s)", is.imageName)
}
func (is *ImageStore) FilterHandler(h images.HandlerFunc, cs content.Store) images.HandlerFunc {
h = images.SetChildrenMappedLabels(cs, h, is.labelMap)
if is.allMetadata {
// Filter manifests by platforms but allow to handle manifest
// and configuration for not-target platforms
h = remotes.FilterManifestByPlatformHandler(h, is.platforms)
} else {
// Filter children by platforms if specified.
h = images.FilterPlatforms(h, is.platforms)
}
// Sort and limit manifests if a finite number is needed
if is.manifestLimit > 0 {
h = images.LimitManifests(h, is.platforms, is.manifestLimit)
}
return h
}
func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor, store images.Store) (images.Image, error) {
img := images.Image{
Name: is.imageName,
Target: desc,
Labels: is.imageLabels,
}
for {
if created, err := store.Create(ctx, img); err != nil {
if !errdefs.IsAlreadyExists(err) {
return images.Image{}, err
}
updated, err := store.Update(ctx, img)
if err != nil {
// if image was removed, try create again
if errdefs.IsNotFound(err) {
continue
}
return images.Image{}, err
}
img = updated
} else {
img = created
}
return img, nil
}
}
func (is *ImageStore) Get(ctx context.Context, store images.Store) (images.Image, error) {
return store.Get(ctx, is.imageName)
}
func (is *ImageStore) UnpackPlatforms() []unpack.Platform {
return is.unpacks
}
func (is *ImageStore) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) {
s := &transfertypes.ImageStore{
Name: is.imageName,
// TODO: Support other fields
}
return typeurl.MarshalAny(s)
}
func (is *ImageStore) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a typeurl.Any) error {
var s transfertypes.ImageStore
if err := typeurl.UnmarshalTo(a, &s); err != nil {
return err
}
is.imageName = s.Name
// TODO: Support other fields
return nil
}

View File

@ -1,322 +0,0 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package transfer
import (
"context"
"fmt"
"net/http"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// TODO: Should a factory be exposed here as a service??
/*
func NewOCIRegistryFromProto(p *transferapi.OCIRegistry, resolver remotes.Resolver, sm streaming.StreamManager) *OCIRegistry {
//transfer.OCIRegistry
// Create resolver
// Convert auth stream to credential manager
return &OCIRegistry{
reference: p.Reference,
resolver: resolver,
}
}
*/
// Initialize with hosts, authorizer callback, and headers
func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry {
// Create an authorizer
var ropts []docker.RegistryOpt
if creds != nil {
// TODO: Support bearer
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) {
c, err := creds.GetCredentials(context.Background(), ref, host)
if err != nil {
return "", "", err
}
return c.Username, c.Secret, nil
}))
ropts = append(ropts, docker.WithAuthorizer(authorizer))
}
// TODO: Apply local configuration, maybe dynamically create resolver when requested
resolver := docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(ropts...),
Headers: headers,
})
return &OCIRegistry{
reference: ref,
headers: headers,
creds: creds,
resolver: resolver,
}
}
// From stream
type CredentialHelper interface {
GetCredentials(ctx context.Context, ref, host string) (Credentials, error)
}
type Credentials struct {
Host string
Username string
Secret string
Bearer string
}
// OCI
type OCIRegistry struct {
reference string
headers http.Header
creds CredentialHelper
resolver remotes.Resolver
// This could be an interface which returns resolver?
// Resolver could also be a plug-able interface, to call out to a program to fetch?
}
func (r *OCIRegistry) String() string {
return fmt.Sprintf("OCI Registry (%s)", r.reference)
}
func (r *OCIRegistry) Image() string {
return r.reference
}
func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) {
return r.resolver.Resolve(ctx, r.reference)
}
func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) {
return r.resolver.Fetcher(ctx, ref)
}
func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamManager) (typeurl.Any, error) {
if r.creds != nil {
// TODO: Unique stream ID
stream, err := sm.Get(ctx, "")
if err != nil {
return nil, err
}
go func() {
// Check for context cancellation as well
for {
select {
case <-ctx.Done():
return
default:
}
_, err := stream.Recv()
if err != nil {
// If not EOF, log error
return
}
// If closed, return
// Call creds helper
// Send response
}
}()
// link creds to stream
}
// Create API OCI Registry type
// Marshal and return
return nil, nil
}
type ImageStore struct {
// TODO: Put these configurations in object which can convert to/from any
// Embed generated type
imageName string
imageLabels map[string]string
platforms platforms.MatchComparer
allMetadata bool
labelMap func(ocispec.Descriptor) []string
manifestLimit int
images images.Store
content content.Store
// TODO: Convert these to unpack platforms
unpacks []unpack.Platform
}
func NewImageStore(image string, cs content.Store, is images.Store) *ImageStore {
return &ImageStore{
imageName: image,
images: is,
content: cs,
}
}
func (is *ImageStore) String() string {
return fmt.Sprintf("Local Image Store (%s)", is.imageName)
}
func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc {
h = images.SetChildrenMappedLabels(is.content, h, is.labelMap)
if is.allMetadata {
// Filter manifests by platforms but allow to handle manifest
// and configuration for not-target platforms
h = remotes.FilterManifestByPlatformHandler(h, is.platforms)
} else {
// Filter children by platforms if specified.
h = images.FilterPlatforms(h, is.platforms)
}
// Sort and limit manifests if a finite number is needed
if is.manifestLimit > 0 {
h = images.LimitManifests(h, is.platforms, is.manifestLimit)
}
return h
}
func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) {
img := images.Image{
Name: is.imageName,
Target: desc,
Labels: is.imageLabels,
}
for {
if created, err := is.images.Create(ctx, img); err != nil {
if !errdefs.IsAlreadyExists(err) {
return images.Image{}, err
}
updated, err := is.images.Update(ctx, img)
if err != nil {
// if image was removed, try create again
if errdefs.IsNotFound(err) {
continue
}
return images.Image{}, err
}
img = updated
} else {
img = created
}
return img, nil
}
}
func (is *ImageStore) UnpackPlatforms() []unpack.Platform {
return is.unpacks
}
/*
type RemoteContext struct {
// Resolver is used to resolve names to objects, fetchers, and pushers.
// If no resolver is provided, defaults to Docker registry resolver.
Resolver remotes.Resolver
// PlatformMatcher is used to match the platforms for an image
// operation and define the preference when a single match is required
// from multiple platforms.
PlatformMatcher platforms.MatchComparer
// Unpack is done after an image is pulled to extract into a snapshotter.
// It is done simultaneously for schema 2 images when they are pulled.
// If an image is not unpacked on pull, it can be unpacked any time
// afterwards. Unpacking is required to run an image.
Unpack bool
// UnpackOpts handles options to the unpack call.
UnpackOpts []UnpackOpt
// Snapshotter used for unpacking
Snapshotter string
// SnapshotterOpts are additional options to be passed to a snapshotter during pull
SnapshotterOpts []snapshots.Opt
// Labels to be applied to the created image
Labels map[string]string
// BaseHandlers are a set of handlers which get are called on dispatch.
// These handlers always get called before any operation specific
// handlers.
BaseHandlers []images.Handler
// HandlerWrapper wraps the handler which gets sent to dispatch.
// Unlike BaseHandlers, this can run before and after the built
// in handlers, allowing operations to run on the descriptor
// after it has completed transferring.
HandlerWrapper func(images.Handler) images.Handler
// Platforms defines which platforms to handle when doing the image operation.
// Platforms is ignored when a PlatformMatcher is set, otherwise the
// platforms will be used to create a PlatformMatcher with no ordering
// preference.
Platforms []string
// MaxConcurrentDownloads is the max concurrent content downloads for each pull.
MaxConcurrentDownloads int
// MaxConcurrentUploadedLayers is the max concurrent uploaded layers for each push.
MaxConcurrentUploadedLayers int
// AllMetadata downloads all manifests and known-configuration files
AllMetadata bool
// ChildLabelMap sets the labels used to reference child objects in the content
// store. By default, all GC reference labels will be set for all fetched content.
ChildLabelMap func(ocispec.Descriptor) []string
}
*/
/*
// What should streamhandler look like?
type StreamHandler interface {
Authorize() error
Progress(key string, int64)
}
// Distribution options
// Stream handler
// Progress rate
// Unpack options
// Remote options
// Cases:
// Registry -> Content/ImageStore (pull)
// Registry -> Registry
// Content/ImageStore -> Registry (push)
// Content/ImageStore -> Content/ImageStore (tag)
// Common fetch/push interface for registry, content/imagestore, OCI index
// Always starts with string for source and destination, on client side, does not need to resolve
// Higher level implementation just takes strings and options
// Lower level implementation takes pusher/fetcher?
*/

View File

@ -0,0 +1,290 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package image
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"strings"
"sync"
transfertypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/plugins"
tstreaming "github.com/containerd/containerd/pkg/transfer/streaming"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
func init() {
// TODO: Move this to seperate package?
plugins.Register(&transfertypes.OCIRegistry{}, &OCIRegistry{})
}
// Initialize with hosts, authorizer callback, and headers
func NewOCIRegistry(ref string, headers http.Header, creds CredentialHelper) *OCIRegistry {
// Create an authorizer
var ropts []docker.RegistryOpt
if creds != nil {
// TODO: Support bearer
authorizer := docker.NewDockerAuthorizer(docker.WithAuthCreds(func(host string) (string, string, error) {
c, err := creds.GetCredentials(context.Background(), ref, host)
if err != nil {
return "", "", err
}
return c.Username, c.Secret, nil
}))
ropts = append(ropts, docker.WithAuthorizer(authorizer))
}
// TODO: Apply local configuration, maybe dynamically create resolver when requested
resolver := docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(ropts...),
Headers: headers,
})
return &OCIRegistry{
reference: ref,
headers: headers,
creds: creds,
resolver: resolver,
}
}
// From stream
type CredentialHelper interface {
GetCredentials(ctx context.Context, ref, host string) (Credentials, error)
}
type Credentials struct {
Host string
Username string
Secret string
Header string
}
// OCI
type OCIRegistry struct {
reference string
headers http.Header
creds CredentialHelper
resolver remotes.Resolver
// This could be an interface which returns resolver?
// Resolver could also be a plug-able interface, to call out to a program to fetch?
}
func (r *OCIRegistry) String() string {
return fmt.Sprintf("OCI Registry (%s)", r.reference)
}
func (r *OCIRegistry) Image() string {
return r.reference
}
func (r *OCIRegistry) Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error) {
return r.resolver.Resolve(ctx, r.reference)
}
func (r *OCIRegistry) Fetcher(ctx context.Context, ref string) (transfer.Fetcher, error) {
return r.resolver.Fetcher(ctx, ref)
}
func (r *OCIRegistry) Pusher(ctx context.Context, desc ocispec.Descriptor) (transfer.Pusher, error) {
var ref = r.reference
// Annotate ref with digest to push only push tag for single digest
if !strings.Contains(ref, "@") {
ref = ref + "@" + desc.Digest.String()
}
return r.resolver.Pusher(ctx, ref)
}
func (r *OCIRegistry) MarshalAny(ctx context.Context, sm streaming.StreamCreator) (typeurl.Any, error) {
res := &transfertypes.RegistryResolver{}
if r.headers != nil {
res.Headers = map[string]string{}
for k := range r.headers {
res.Headers[k] = r.headers.Get(k)
}
}
if r.creds != nil {
sid := tstreaming.GenerateID("creds")
stream, err := sm.Create(ctx, sid)
if err != nil {
return nil, err
}
go func() {
// Check for context cancellation as well
for {
select {
case <-ctx.Done():
return
default:
}
req, err := stream.Recv()
if err != nil {
// If not EOF, log error
return
}
var s transfertypes.AuthRequest
if err := typeurl.UnmarshalTo(req, &s); err != nil {
log.G(ctx).WithError(err).Error("failed to unmarshal credential request")
continue
}
creds, err := r.creds.GetCredentials(ctx, s.Reference, s.Host)
if err != nil {
log.G(ctx).WithError(err).Error("failed to get credentials")
continue
}
var resp transfertypes.AuthResponse
if creds.Header != "" {
resp.AuthType = transfertypes.AuthType_HEADER
resp.Secret = creds.Header
} else if creds.Username != "" {
resp.AuthType = transfertypes.AuthType_CREDENTIALS
resp.Username = creds.Username
resp.Secret = creds.Secret
} else {
resp.AuthType = transfertypes.AuthType_REFRESH
resp.Secret = creds.Secret
}
a, err := typeurl.MarshalAny(&resp)
if err != nil {
log.G(ctx).WithError(err).Error("failed to marshal credential response")
continue
}
if err := stream.Send(a); err != nil {
if !errors.Is(err, io.EOF) {
log.G(ctx).WithError(err).Error("unexpected send failure")
}
return
}
}
}()
res.AuthStream = sid
}
s := &transfertypes.OCIRegistry{
Reference: r.reference,
Resolver: res,
}
return typeurl.MarshalAny(s)
}
func (r *OCIRegistry) UnmarshalAny(ctx context.Context, sm streaming.StreamGetter, a typeurl.Any) error {
var (
s transfertypes.OCIRegistry
ropts []docker.RegistryOpt
aopts []docker.AuthorizerOpt
)
if err := typeurl.UnmarshalTo(a, &s); err != nil {
return err
}
if s.Resolver != nil {
if sid := s.Resolver.AuthStream; sid != "" {
stream, err := sm.Get(ctx, sid)
if err != nil {
log.G(ctx).WithError(err).WithField("stream", sid).Debug("failed to get auth stream")
return err
}
r.creds = &credCallback{
stream: stream,
}
aopts = append(aopts, docker.WithAuthCreds(func(host string) (string, string, error) {
c, err := r.creds.GetCredentials(context.Background(), s.Reference, host)
if err != nil {
return "", "", err
}
return c.Username, c.Secret, nil
}))
}
r.headers = http.Header{}
for k, v := range s.Resolver.Headers {
r.headers.Add(k, v)
}
}
authorizer := docker.NewDockerAuthorizer(aopts...)
ropts = append(ropts, docker.WithAuthorizer(authorizer))
r.reference = s.Reference
r.resolver = docker.NewResolver(docker.ResolverOptions{
Hosts: docker.ConfigureDefaultRegistries(ropts...),
Headers: r.headers,
})
return nil
}
type credCallback struct {
sync.Mutex
stream streaming.Stream
}
func (cc *credCallback) GetCredentials(ctx context.Context, ref, host string) (Credentials, error) {
cc.Lock()
defer cc.Unlock()
ar := &transfertypes.AuthRequest{
Host: host,
Reference: ref,
}
any, err := typeurl.MarshalAny(ar)
if err != nil {
return Credentials{}, err
}
if err := cc.stream.Send(any); err != nil {
return Credentials{}, err
}
resp, err := cc.stream.Recv()
if err != nil {
return Credentials{}, err
}
var s transfertypes.AuthResponse
if err := typeurl.UnmarshalTo(resp, &s); err != nil {
return Credentials{}, err
}
creds := Credentials{
Host: host,
}
switch s.AuthType {
case transfertypes.AuthType_CREDENTIALS:
creds.Username = s.Username
creds.Secret = s.Secret
case transfertypes.AuthType_REFRESH:
creds.Secret = s.Secret
case transfertypes.AuthType_HEADER:
creds.Header = s.Secret
}
return creds, nil
}

243
pkg/transfer/local/pull.go Normal file
View File

@ -0,0 +1,243 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"fmt"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageFetcher, is transfer.ImageStorer, tops *transfer.TransferOpts) error {
ctx, done, err := ts.withLease(ctx)
if err != nil {
return err
}
defer done(ctx)
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Resolving from %s", ir),
})
}
name, desc, err := ir.Resolve(ctx)
if err != nil {
return fmt.Errorf("failed to resolve image: %w", err)
}
if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
// Explicitly call out schema 1 as deprecated and not supported
return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument)
}
// TODO: Handle already exists
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pulling from %s", ir),
})
tops.Progress(transfer.Progress{
Event: "fetching image content",
Name: name,
//Digest: img.Target.Digest.String(),
})
}
fetcher, err := ir.Fetcher(ctx, name)
if err != nil {
return fmt.Errorf("failed to get fetcher for %q: %w", name, err)
}
var (
handler images.Handler
unpacker *unpack.Unpacker
// has a config media type bug (distribution#1622)
hasMediaTypeBug1622 bool
store = ts.content
progressTracker *ProgressTracker
)
if tops.Progress != nil {
progressTracker = NewProgressTracker(name, store) //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress)
defer progressTracker.Wait()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(store)
if f, ok := is.(transfer.ImageFilterer); ok {
childrenHandler = f.ImageFilter(childrenHandler, store)
}
// Sort and limit manifests if a finite number is needed
//if limit > 0 {
// childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
//}
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// set to true if there is application/octet-stream media type
if desc.MediaType == docker.LegacyConfigMediaType {
hasMediaTypeBug1622 = true
}
return []ocispec.Descriptor{}, nil
},
)
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name)
if err != nil {
return err
}
// TODO: Allow initialization from configuration
baseHandlers := []images.Handler{}
if tops.Progress != nil {
baseHandlers = append(baseHandlers, images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
progressTracker.Add(desc)
return []ocispec.Descriptor{}, nil
},
))
baseChildrenHandler := childrenHandler
childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) {
children, err = baseChildrenHandler(ctx, desc)
if err != nil {
return
}
progressTracker.AddChildren(desc, children)
return
})
}
handler = images.Handlers(append(baseHandlers,
fetchHandler(store, fetcher, progressTracker),
checkNeedsFix,
childrenHandler, // List children to track hierachy
appendDistSrcLabelHandler,
)...)
// TODO: Should available platforms be a configuration of the service?
// First find suitable platforms to unpack into
//if unpacker, ok := is.
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
for _, u := range unpacks {
uopts = append(uopts, unpack.WithUnpackPlatform(u))
}
if ts.limiter != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiter))
}
//if uconfig.DuplicationSuppressor != nil {
// uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor))
//}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
handler = unpacker.Unpack(handler)
}
}
if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil {
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
}
return err
}
// NOTE(fuweid): unpacker defers blobs download. before create image
// record in ImageService, should wait for unpacking(including blobs
// download).
if unpacker != nil {
if _, err = unpacker.Wait(); err != nil {
return err
}
// TODO: Check results to make sure unpack was successful
}
if hasMediaTypeBug1622 {
if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil {
return err
}
}
img, err := is.Store(ctx, desc, ts.images)
if err != nil {
return err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed pull from %s", ir),
})
}
return nil
}
func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := remotes.Fetch(ctx, ingester, fetcher, desc)
if errdefs.IsAlreadyExists(err) {
pt.MarkExists(desc)
return nil, nil
}
return nil, err
}
}
}

View File

@ -0,0 +1,79 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"context"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/remotes"
)
func (ts *localTransferService) push(ctx context.Context, ig transfer.ImageGetter, p transfer.ImagePusher, tops *transfer.TransferOpts) error {
/*
// TODO: Platform matching
if pushCtx.PlatformMatcher == nil {
if len(pushCtx.Platforms) > 0 {
var ps []ocispec.Platform
for _, platform := range pushCtx.Platforms {
p, err := platforms.Parse(platform)
if err != nil {
return fmt.Errorf("invalid platform %s: %w", platform, err)
}
ps = append(ps, p)
}
pushCtx.PlatformMatcher = platforms.Any(ps...)
} else {
pushCtx.PlatformMatcher = platforms.All
}
}
*/
matcher := platforms.All
// Filter push
img, err := ig.Get(ctx, ts.images)
if err != nil {
return err
}
pusher, err := p.Pusher(ctx, img.Target)
if err != nil {
return err
}
var wrapper func(images.Handler) images.Handler
/*
// TODO: Add handlers
if len(pushCtx.BaseHandlers) > 0 {
wrapper = func(h images.Handler) images.Handler {
h = images.Handlers(append(pushCtx.BaseHandlers, h)...)
if pushCtx.HandlerWrapper != nil {
h = pushCtx.HandlerWrapper(h)
}
return h
}
} else if pushCtx.HandlerWrapper != nil {
wrapper = pushCtx.HandlerWrapper
}
*/
return remotes.PushContent(ctx, pusher, img.Target, ts.content, ts.limiter, matcher, wrapper)
}

View File

@ -20,32 +20,26 @@ import (
"context"
"fmt"
"io"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)
type localTransferService struct {
leases leases.Manager
content content.Store
images images.Store
// semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
limiter *semaphore.Weighted
// TODO: Duplication suppressor
// Metadata service (Or snapshotters, image, content)
// Diff
// Configuration
// - Max downloads
@ -55,10 +49,11 @@ type localTransferService struct {
// - Platform -> snapshotter defaults?
}
func NewTransferService(lm leases.Manager, cs content.Store) transfer.Transferer {
func NewTransferService(lm leases.Manager, cs content.Store, is images.Store) transfer.Transferer {
return &localTransferService{
leases: lm,
content: cs,
images: is,
}
}
@ -68,15 +63,18 @@ func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, d
opt(topts)
}
// Convert Any to real source/destination
// Figure out matrix of whether source destination combination is supported
switch s := src.(type) {
case transfer.ImageResolver:
case transfer.ImageFetcher:
switch d := dest.(type) {
case transfer.ImageStorer:
return ts.pull(ctx, s, d, topts)
}
case transfer.ImageGetter:
switch d := dest.(type) {
case transfer.ImagePusher:
return ts.push(ctx, s, d, topts)
}
case transfer.ImageImportStreamer:
switch d := dest.(type) {
case transfer.ImageExportStreamer:
@ -119,221 +117,33 @@ func (ts *localTransferService) echo(ctx context.Context, i transfer.ImageImport
}
return err
}
func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error {
// TODO: Attach lease if doesn't have one
// From source, need
// - resolver
// - image name
// WithLease attaches a lease on the context
func (ts *localTransferService) withLease(ctx context.Context, opts ...leases.Opt) (context.Context, func(context.Context) error, error) {
nop := func(context.Context) error { return nil }
// From destination, need
// - Image labels
// - Unpack information
// - Platform to Snapshotter
// - Child label map
// - All metdata?
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Resolving from %s", ir),
})
_, ok := leases.FromContext(ctx)
if ok {
return ctx, nop, nil
}
name, desc, err := ir.Resolve(ctx)
if err != nil {
return fmt.Errorf("failed to resolve image: %w", err)
}
if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
// Explicitly call out schema 1 as deprecated and not supported
return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument)
}
ls := ts.leases
// TODO: Handle already exists
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Pulling from %s", ir),
})
tops.Progress(transfer.Progress{
Event: "fetching image content",
Name: name,
//Digest: img.Target.Digest.String(),
})
}
fetcher, err := ir.Fetcher(ctx, name)
if err != nil {
return fmt.Errorf("failed to get fetcher for %q: %w", name, err)
}
var (
handler images.Handler
unpacker *unpack.Unpacker
// has a config media type bug (distribution#1622)
hasMediaTypeBug1622 bool
store = ts.content
progressTracker *ProgressTracker
)
if tops.Progress != nil {
progressTracker = NewProgressTracker(name, store) //Pass in first name as root
go progressTracker.HandleProgress(ctx, tops.Progress)
defer progressTracker.Wait()
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
//func (is *ImageStore) FilterHandler(h images.HandlerFunc) images.HandlerFunc {
//func (is *ImageStore) Store(ctx context.Context, desc ocispec.Descriptor) (images.Image, error) {
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(store)
if f, ok := is.(transfer.ImageFilterer); ok {
childrenHandler = f.ImageFilter(childrenHandler)
}
// Sort and limit manifests if a finite number is needed
//if limit > 0 {
// childrenHandler = images.LimitManifests(childrenHandler, rCtx.PlatformMatcher, limit)
//}
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// set to true if there is application/octet-stream media type
if desc.MediaType == docker.LegacyConfigMediaType {
hasMediaTypeBug1622 = true
}
return []ocispec.Descriptor{}, nil
},
)
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name)
if err != nil {
return err
}
// TODO: Allow initialization from configuration
baseHandlers := []images.Handler{}
if tops.Progress != nil {
baseHandlers = append(baseHandlers, images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
progressTracker.Add(desc)
return []ocispec.Descriptor{}, nil
},
))
baseChildrenHandler := childrenHandler
childrenHandler = images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) (children []ocispec.Descriptor, err error) {
children, err = baseChildrenHandler(ctx, desc)
if err != nil {
return
}
progressTracker.AddChildren(desc, children)
return
})
}
handler = images.Handlers(append(baseHandlers,
fetchHandler(store, fetcher, progressTracker),
checkNeedsFix,
childrenHandler, // List children to track hierachy
appendDistSrcLabelHandler,
)...)
// TODO: Should available platforms be a configuration of the service?
// First find suitable platforms to unpack into
//if unpacker, ok := is.
if iu, ok := is.(transfer.ImageUnpacker); ok {
unpacks := iu.UnpackPlatforms()
if len(unpacks) > 0 {
uopts := []unpack.UnpackerOpt{}
for _, u := range unpacks {
uopts = append(uopts, unpack.WithUnpackPlatform(u))
}
if ts.limiter != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiter))
}
//if uconfig.DuplicationSuppressor != nil {
// uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor))
//}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
handler = unpacker.Unpack(handler)
if len(opts) == 0 {
// Use default lease configuration if no options provided
opts = []leases.Opt{
leases.WithRandomID(),
leases.WithExpiration(24 * time.Hour),
}
}
if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil {
if unpacker != nil {
// wait for unpacker to cleanup
unpacker.Wait()
}
return err
}
// NOTE(fuweid): unpacker defers blobs download. before create image
// record in ImageService, should wait for unpacking(including blobs
// download).
if unpacker != nil {
if _, err = unpacker.Wait(); err != nil {
return err
}
// TODO: Check results to make sure unpack was successful
}
if hasMediaTypeBug1622 {
if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil {
return err
}
}
img, err := is.Store(ctx, desc)
l, err := ls.Create(ctx, opts...)
if err != nil {
return err
return ctx, nop, err
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: "saved",
Name: img.Name,
//Digest: img.Target.Digest.String(),
})
}
if tops.Progress != nil {
tops.Progress(transfer.Progress{
Event: fmt.Sprintf("Completed pull from %s", ir),
})
}
return nil
}
func fetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, pt *ProgressTracker) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) (subdescs []ocispec.Descriptor, err error) {
ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
"digest": desc.Digest,
"mediatype": desc.MediaType,
"size": desc.Size,
}))
switch desc.MediaType {
case images.MediaTypeDockerSchema1Manifest:
return nil, fmt.Errorf("%v not supported", desc.MediaType)
default:
err := remotes.Fetch(ctx, ingester, fetcher, desc)
if errdefs.IsAlreadyExists(err) {
pt.MarkExists(desc)
return nil, nil
}
return nil, err
}
}
ctx = leases.WithLease(ctx, l.ID)
return ctx, func(ctx context.Context) error {
return ls.Delete(ctx, l)
}, nil
}

View File

@ -18,29 +18,73 @@ package proxy
import (
"context"
"io"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
transfertypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
tstreaming "github.com/containerd/containerd/pkg/transfer/streaming"
"github.com/containerd/typeurl"
"google.golang.org/protobuf/types/known/anypb"
)
type proxyTransferer struct {
client transferapi.TransferClient
streamManager streaming.StreamManager
streamCreator streaming.StreamCreator
}
// NewTransferer returns a new transferr which communicates over a GRPC
// connection using the containerd transfer API
func NewTransferer(client transferapi.TransferClient, sm streaming.StreamManager) transfer.Transferer {
func NewTransferer(client transferapi.TransferClient, sc streaming.StreamCreator) transfer.Transferer {
return &proxyTransferer{
client: client,
streamManager: sm,
streamCreator: sc,
}
}
func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
o := &transfer.TransferOpts{}
for _, opt := range opts {
opt(o)
}
apiOpts := &transferapi.TransferOptions{}
if o.Progress != nil {
sid := tstreaming.GenerateID("progress")
stream, err := p.streamCreator.Create(ctx, sid)
if err != nil {
return err
}
apiOpts.ProgressStream = sid
go func() {
for {
a, err := stream.Recv()
if err != nil {
if err != io.EOF {
log.G(ctx).WithError(err).Error("progress stream failed to recv")
}
return
}
i, err := typeurl.UnmarshalAny(a)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to unmarshal progress object: %v", a.GetTypeUrl())
}
switch v := i.(type) {
case *transfertypes.Progress:
o.Progress(transfer.Progress{
Event: v.Event,
Name: v.Name,
Parents: v.Parents,
Progress: v.Progress,
Total: v.Total,
})
default:
log.G(ctx).Warnf("unhandled progress object %T: %v", i, a.GetTypeUrl())
}
}
}()
}
asrc, err := p.marshalAny(ctx, src)
if err != nil {
return err
@ -49,7 +93,6 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int
if err != nil {
return err
}
// Resolve opts to
req := &transferapi.TransferRequest{
Source: &anypb.Any{
TypeUrl: asrc.GetTypeUrl(),
@ -59,7 +102,7 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int
TypeUrl: adst.GetTypeUrl(),
Value: adst.GetValue(),
},
// TODO: Options
Options: apiOpts,
}
_, err = p.client.Transfer(ctx, req)
return err
@ -67,11 +110,11 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int
func (p *proxyTransferer) marshalAny(ctx context.Context, i interface{}) (typeurl.Any, error) {
switch m := i.(type) {
case streamMarshaler:
return m.MarshalAny(ctx, p.streamManager)
return m.MarshalAny(ctx, p.streamCreator)
}
return typeurl.MarshalAny(i)
}
type streamMarshaler interface {
MarshalAny(context.Context, streaming.StreamManager) (typeurl.Any, error)
MarshalAny(context.Context, streaming.StreamCreator) (typeurl.Any, error)
}

View File

@ -20,6 +20,7 @@ import (
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/unpack"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -31,23 +32,40 @@ type Transferer interface {
type ImageResolver interface {
Resolve(ctx context.Context) (name string, desc ocispec.Descriptor, err error)
}
type ImageFetcher interface {
ImageResolver
Fetcher(ctx context.Context, ref string) (Fetcher, error)
}
type ImagePusher interface {
Pusher(context.Context, ocispec.Descriptor) (Pusher, error)
}
type Fetcher interface {
Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
Fetch(context.Context, ocispec.Descriptor) (io.ReadCloser, error)
}
type Pusher interface {
Push(context.Context, ocispec.Descriptor) (content.Writer, error)
}
// ImageFilterer is used to filter out child objects of an image
type ImageFilterer interface {
ImageFilter(images.HandlerFunc) images.HandlerFunc
ImageFilter(images.HandlerFunc, content.Store) images.HandlerFunc
}
// ImageStorer is a type which is capable of storing an image to
// for a provided descriptor
type ImageStorer interface {
Store(context.Context, ocispec.Descriptor) (images.Image, error)
Store(context.Context, ocispec.Descriptor, images.Store) (images.Image, error)
}
// ImageGetter is type which returns an image from an image store
type ImageGetter interface {
Get(context.Context, images.Store) (images.Image, error)
}
// ImageImportStreamer returns an import streamer based on OCI or

View File

@ -17,27 +17,16 @@
package transfer
import (
"context"
"fmt"
ttypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/metadata"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/unpack"
"github.com/containerd/containerd/pkg/transfer/local"
"github.com/containerd/containerd/plugin"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/containerd/typeurl"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"golang.org/x/sync/semaphore"
_ "github.com/containerd/containerd/pkg/transfer/archive"
_ "github.com/containerd/containerd/pkg/transfer/image"
)
func init() {
plugin.Register(&plugin.Registration{
Type: plugin.TransferPlugin,
ID: "image",
@ -57,274 +46,12 @@ func init() {
return nil, err
}
// Map to url instance handler (typeurl.Any) interface{}
return &localTransferService{
leases: l.(leases.Manager),
content: ms.ContentStore(),
conversions: map[string]func(typeurl.Any) (interface{}, error){},
// // semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
// limiter *semaphore.Weighted
}, nil
return local.NewTransferService(l.(leases.Manager), ms.ContentStore(), metadata.NewImageStore(ms)), nil
},
})
}
type transferConfig struct {
// Max concurrent downloads
// Snapshotter platforms
}
// TODO: Move this to a local package with constructor arguments...?
type localTransferService struct {
leases leases.Manager
content content.Store
// semaphore.NewWeighted(int64(rCtx.MaxConcurrentDownloads))
limiter *semaphore.Weighted
conversions map[string]func(typeurl.Any) (interface{}, error)
// TODO: Duplication suppressor
// Metadata service (Or snapshotters, image, content)
// Diff
// Configuration
// - Max downloads
// - Max uploads
// Supported platforms
// - Platform -> snapshotter defaults?
// Type Resolver, support registration... For Any Type URL -> Constructor
}
// populatedConversions is used to map the typeurls to instance converstion functions,
// since the typeurls are derived from instances rather than types, they are
// calculated at runtime and populate the converstion map.
// Static mapping or offloading conversion to another plugin is probably a more
// ideal long term solution.
func (ts *localTransferService) populateConversions() error {
for _, c := range []struct {
instance interface{}
conversion func(typeurl.Any) (interface{}, error)
}{
{ttypes.ImageStoreDestination{}, ts.convertImageStoreDestination},
{ttypes.OCIRegistry{}, ts.convertOCIRegistry},
} {
u, err := typeurl.TypeURL(c.instance)
if err != nil {
return fmt.Errorf("unable to get type %T: %w", c.instance, err)
}
if _, ok := ts.conversions[u]; ok {
return fmt.Errorf("duplicate typeurl mapping: %s for %T", u, c.instance)
}
}
return nil
}
func (ts *localTransferService) convertImageStoreDestination(a typeurl.Any) (interface{}, error) {
var dest ttypes.ImageStoreDestination
if err := typeurl.UnmarshalTo(a, &dest); err != nil {
return nil, err
}
return nil, nil
}
func (ts *localTransferService) convertOCIRegistry(a typeurl.Any) (interface{}, error) {
var dest ttypes.OCIRegistry
if err := typeurl.UnmarshalTo(a, &dest); err != nil {
return nil, err
}
// TODO: Create credential callback
return nil, nil
}
func (ts *localTransferService) resolveType(a typeurl.Any) (interface{}, error) {
c, ok := ts.conversions[a.GetTypeUrl()]
if !ok {
return nil, fmt.Errorf("type %q not supported: %w", a.GetTypeUrl(), errdefs.ErrNotImplemented)
}
return c(a)
}
func (ts *localTransferService) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
topts := &transfer.TransferOpts{}
for _, opt := range opts {
opt(topts)
}
if a, ok := src.(typeurl.Any); ok {
r, err := ts.resolveType(a)
if err != nil {
return err
}
src = r
}
if a, ok := dest.(typeurl.Any); ok {
r, err := ts.resolveType(a)
if err != nil {
return err
}
dest = r
}
// Figure out matrix of whether source destination combination is supported
switch s := src.(type) {
case transfer.ImageResolver:
switch d := dest.(type) {
case transfer.ImageStorer:
return ts.pull(ctx, s, d, topts)
}
}
return fmt.Errorf("Unable to transfer from %s to %s: %w", name(src), name(dest), errdefs.ErrNotImplemented)
}
func name(t interface{}) string {
switch s := t.(type) {
case fmt.Stringer:
return s.String()
case typeurl.Any:
return s.GetTypeUrl()
default:
return fmt.Sprintf("%T", t)
}
}
func (ts *localTransferService) pull(ctx context.Context, ir transfer.ImageResolver, is transfer.ImageStorer, tops *transfer.TransferOpts) error {
// TODO: Attach lease if doesn't have one
// From source, need
// - resolver
// - image name
// From destination, need
// - Image labels
// - Unpack information
// - Platform to Snapshotter
// - Child label map
// - All metdata?
name, desc, err := ir.Resolve(ctx)
if err != nil {
return fmt.Errorf("failed to resolve image: %w", err)
}
if desc.MediaType == images.MediaTypeDockerSchema1Manifest {
// Explicitly call out schema 1 as deprecated and not supported
return fmt.Errorf("schema 1 image manifests are no longer supported: %w", errdefs.ErrInvalidArgument)
}
fetcher, err := ir.Fetcher(ctx, name)
if err != nil {
return fmt.Errorf("failed to get fetcher for %q: %w", name, err)
}
var (
handler images.Handler
unpacker *unpack.Unpacker
// has a config media type bug (distribution#1622)
hasMediaTypeBug1622 bool
store = ts.content
)
// Get all the children for a descriptor
childrenHandler := images.ChildrenHandler(store)
if hw, ok := is.(transfer.ImageFilterer); ok {
childrenHandler = hw.ImageFilter(childrenHandler)
}
// TODO: Move these to image store
//// TODO: This could easily be handled by having an ImageHandlerWrapper()
//// Set any children labels for that content
checkNeedsFix := images.HandlerFunc(
func(_ context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
// set to true if there is application/octet-stream media type
if desc.MediaType == docker.LegacyConfigMediaType {
hasMediaTypeBug1622 = true
}
return []ocispec.Descriptor{}, nil
},
)
appendDistSrcLabelHandler, err := docker.AppendDistributionSourceLabel(store, name)
if err != nil {
return err
}
// TODO: Support set of base handlers from configuration
// Progress handlers?
handlers := []images.Handler{
remotes.FetchHandler(store, fetcher),
checkNeedsFix,
childrenHandler,
appendDistSrcLabelHandler,
}
handler = images.Handlers(handlers...)
// TODO: Should available platforms be a configuration of the service?
// First find suitable platforms to unpack into
if u, ok := is.(transfer.ImageUnpacker); ok {
uopts := []unpack.UnpackerOpt{}
for _, u := range u.UnpackPlatforms() {
uopts = append(uopts, unpack.WithUnpackPlatform(u))
}
if ts.limiter != nil {
uopts = append(uopts, unpack.WithLimiter(ts.limiter))
}
//if uconfig.DuplicationSuppressor != nil {
// uopts = append(uopts, unpack.WithDuplicationSuppressor(uconfig.DuplicationSuppressor))
//}
unpacker, err = unpack.NewUnpacker(ctx, ts.content, uopts...)
if err != nil {
return fmt.Errorf("unable to initialize unpacker: %w", err)
}
defer func() {
// TODO: This needs to be tigher scoped...
if _, err := unpacker.Wait(); err != nil {
//if retErr == nil {
// retErr = fmt.Errorf("unpack: %w", err)
//}
}
}()
handler = unpacker.Unpack(handler)
}
if err := images.Dispatch(ctx, handler, ts.limiter, desc); err != nil {
// TODO: Cancel unpack and wait?
return err
}
// NOTE(fuweid): unpacker defers blobs download. before create image
// record in ImageService, should wait for unpacking(including blobs
// download).
if unpacker != nil {
if _, err = unpacker.Wait(); err != nil {
return err
}
// TODO: Check results to make sure unpack was successful
}
if hasMediaTypeBug1622 {
if desc, err = docker.ConvertManifest(ctx, store, desc); err != nil {
return err
}
}
_, err = is.Store(ctx, desc)
if err != nil {
return err
}
// TODO: Send status update
return nil
}

View File

@ -20,9 +20,15 @@ import (
"context"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
transferTypes "github.com/containerd/containerd/api/types/transfer"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/plugins"
"github.com/containerd/containerd/plugin"
ptypes "github.com/containerd/containerd/protobuf/types"
"github.com/containerd/typeurl"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@ -35,13 +41,15 @@ func init() {
ID: "transfer",
Requires: []plugin.Type{
plugin.TransferPlugin,
plugin.StreamingPlugin,
},
InitFn: newService,
})
}
type service struct {
transferers []transfer.Transferer
transferers []transfer.Transferer
streamManager streaming.StreamManager
transferapi.UnimplementedTransferServer
}
@ -50,6 +58,7 @@ func newService(ic *plugin.InitContext) (interface{}, error) {
if err != nil {
return nil, err
}
// TODO: how to determine order?
t := make([]transfer.Transferer, 0, len(plugins))
for _, p := range plugins {
@ -59,8 +68,13 @@ func newService(ic *plugin.InitContext) (interface{}, error) {
}
t = append(t, i.(transfer.Transferer))
}
sp, err := ic.GetByID(plugin.StreamingPlugin, "manager")
if err != nil {
return nil, err
}
return &service{
transferers: t,
transferers: t,
streamManager: sp.(streaming.StreamManager),
}, nil
}
@ -70,15 +84,75 @@ func (s *service) Register(gs *grpc.Server) error {
}
func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest) (*emptypb.Empty, error) {
// TODO: Optionally proxy
var transferOpts []transfer.Opt
if req.Options != nil {
if req.Options.ProgressStream != "" {
stream, err := s.streamManager.Get(ctx, req.Options.ProgressStream)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
defer stream.Close()
pf := func(p transfer.Progress) {
any, err := typeurl.MarshalAny(&transferTypes.Progress{
Event: p.Event,
Name: p.Name,
Parents: p.Parents,
Progress: p.Progress,
Total: p.Total,
})
if err != nil {
log.G(ctx).WithError(err).Warnf("event could not be marshaled: %v/%v", p.Event, p.Name)
return
}
if err := stream.Send(any); err != nil {
log.G(ctx).WithError(err).Warnf("event not sent: %v/%v", p.Event, p.Name)
return
}
}
transferOpts = append(transferOpts, transfer.WithProgress(pf))
}
}
src, err := s.convertAny(ctx, req.Source)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
dst, err := s.convertAny(ctx, req.Destination)
plugins.ResolveType(req.Source)
if err != nil {
return nil, errdefs.ToGRPC(err)
}
// TODO: Convert options
for _, t := range s.transferers {
if err := t.Transfer(ctx, req.Source, req.Destination); err == nil {
return nil, nil
if err := t.Transfer(ctx, src, dst, transferOpts...); err == nil {
return &ptypes.Empty{}, nil
} else if !errdefs.IsNotImplemented(err) {
return nil, errdefs.ToGRPC(err)
}
}
return nil, status.Errorf(codes.Unimplemented, "method Transfer not implemented for %s to %s", req.Source.GetTypeUrl(), req.Destination.GetTypeUrl())
}
func (s *service) convertAny(ctx context.Context, a typeurl.Any) (interface{}, error) {
obj, err := plugins.ResolveType(a)
if err != nil {
if errdefs.IsNotFound(err) {
return typeurl.UnmarshalAny(a)
}
return nil, err
}
switch v := obj.(type) {
case streamUnmarshaler:
err = v.UnmarshalAny(ctx, s.streamManager, a)
return obj, err
default:
log.G(ctx).Debug("unmarshling to..")
err = typeurl.UnmarshalTo(a, obj)
return obj, err
}
}
type streamUnmarshaler interface {
UnmarshalAny(context.Context, streaming.StreamGetter, typeurl.Any) error
}

View File

@ -19,56 +19,68 @@ package containerd
import (
"context"
streamingapi "github.com/containerd/containerd/api/services/streaming/v1"
transferapi "github.com/containerd/containerd/api/services/transfer/v1"
"github.com/containerd/containerd/pkg/streaming"
"github.com/containerd/containerd/pkg/transfer"
"github.com/containerd/containerd/pkg/transfer/proxy"
"github.com/containerd/containerd/protobuf"
"github.com/containerd/typeurl"
"google.golang.org/protobuf/types/known/anypb"
)
func (c *Client) Transfer(ctx context.Context, src interface{}, dst interface{}, opts ...transfer.Opt) error {
// Conver Options
// Convert Source
// Convert Destinations
// Get Stream Manager
func (c *Client) Transfer(ctx context.Context, src interface{}, dest interface{}, opts ...transfer.Opt) error {
return proxy.NewTransferer(transferapi.NewTransferClient(c.conn), c.streamCreator()).Transfer(ctx, src, dest, opts...)
}
asrc, err := c.toAny(ctx, src)
func (c *Client) streamCreator() streaming.StreamCreator {
return &streamCreator{
client: streamingapi.NewStreamingClient(c.conn),
}
}
type streamCreator struct {
client streamingapi.StreamingClient
}
func (sc *streamCreator) Create(ctx context.Context, id string) (streaming.Stream, error) {
stream, err := sc.client.Stream(ctx)
if err != nil {
return err
return nil, err
}
adst, err := c.toAny(ctx, dst)
if err != nil {
return err
}
_, err = transferapi.NewTransferClient(c.conn).Transfer(ctx, &transferapi.TransferRequest{
Source: &anypb.Any{
TypeUrl: asrc.GetTypeUrl(),
Value: asrc.GetValue(),
},
Destination: &anypb.Any{
TypeUrl: adst.GetTypeUrl(),
Value: adst.GetValue(),
},
a, err := typeurl.MarshalAny(&streamingapi.StreamInit{
ID: id,
})
return err
}
func (c *Client) toAny(ctx context.Context, i interface{}) (a typeurl.Any, err error) {
switch v := i.(type) {
case toAny:
//Get stream manager
a, err = v.ToAny(ctx, nil)
case typeurl.Any:
a = v
default:
a, err = typeurl.MarshalAny(i)
if err != nil {
return nil, err
}
err = stream.Send(protobuf.FromAny(a))
if err != nil {
return nil, err
}
return
// Receive an ack that stream is init and ready
if _, err = stream.Recv(); err != nil {
return nil, err
}
return &clientStream{
s: stream,
}, nil
}
type toAny interface {
ToAny(context.Context, streaming.StreamManager) (typeurl.Any, error)
type clientStream struct {
s streamingapi.Streaming_StreamClient
}
func (cs *clientStream) Send(a typeurl.Any) error {
return cs.s.Send(protobuf.FromAny(a))
}
func (cs *clientStream) Recv() (typeurl.Any, error) {
return cs.s.Recv()
}
func (cs *clientStream) Close() error {
return cs.s.CloseSend()
}