diff --git a/api/types/transfer/importexport.proto b/api/types/transfer/importexport.proto index cb1b0f6fa..54c9db9c0 100644 --- a/api/types/transfer/importexport.proto +++ b/api/types/transfer/importexport.proto @@ -38,4 +38,4 @@ message ImageExportStream { string stream = 1; string media_type = 2; -} \ No newline at end of file +} diff --git a/api/types/transfer/progress.proto b/api/types/transfer/progress.proto index 6cea419b5..3059bcbb1 100644 --- a/api/types/transfer/progress.proto +++ b/api/types/transfer/progress.proto @@ -26,4 +26,4 @@ message Progress { repeated string parents = 3; int64 progress = 4; int64 total = 5; -} \ No newline at end of file +} diff --git a/pkg/transfer/local/import.go b/pkg/transfer/local/import.go index 5518aea0a..676873ced 100644 --- a/pkg/transfer/local/import.go +++ b/pkg/transfer/local/import.go @@ -101,7 +101,6 @@ func (ts *localTransferService) importStream(ctx context.Context, i transfer.Ima tops.Progress(transfer.Progress{ Event: "saved", Name: img.Name, - //Digest: img.Target.Digest.String(), }) } } diff --git a/pkg/transfer/local/progress.go b/pkg/transfer/local/progress.go index a60475747..6762842a8 100644 --- a/pkg/transfer/local/progress.go +++ b/pkg/transfer/local/progress.go @@ -100,10 +100,9 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre if ok { if status.Offset > job.progress { pf(transfer.Progress{ - Event: j.transferState, - Name: job.name, - Parents: job.parents, - //Digest: job.desc.Digest.String(), + Event: j.transferState, + Name: job.name, + Parents: job.parents, Progress: status.Offset, Total: status.Total, }) @@ -117,10 +116,9 @@ func (j *ProgressTracker) HandleProgress(ctx context.Context, pf transfer.Progre log.G(ctx).WithError(err).Error("failed to get statuses for progress") } else if ok { pf(transfer.Progress{ - Event: "complete", - Name: job.name, - Parents: job.parents, - //Digest: job.desc.Digest.String(), + Event: "complete", + Name: job.name, + Parents: job.parents, Progress: job.desc.Size, Total: job.desc.Size, }) diff --git a/pkg/transfer/proxy/transfer.go b/pkg/transfer/proxy/transfer.go index 677ec1a8a..657492821 100644 --- a/pkg/transfer/proxy/transfer.go +++ b/pkg/transfer/proxy/transfer.go @@ -18,6 +18,7 @@ package proxy import ( "context" + "errors" "io" transferapi "github.com/containerd/containerd/api/services/transfer/v1" @@ -61,7 +62,7 @@ func (p *proxyTransferer) Transfer(ctx context.Context, src interface{}, dst int for { a, err := stream.Recv() if err != nil { - if err != io.EOF { + if !errors.Is(err, io.EOF) { log.G(ctx).WithError(err).Error("progress stream failed to recv") } return diff --git a/pkg/transfer/streaming/stream.go b/pkg/transfer/streaming/stream.go index f3bd4bb8e..5b4890cdd 100644 --- a/pkg/transfer/streaming/stream.go +++ b/pkg/transfer/streaming/stream.go @@ -113,7 +113,7 @@ func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) { b := (*buf)[:max] n, err := r.Read(b) if err != nil { - if err != io.EOF { + if !errors.Is(err, io.EOF) { log.G(ctx).WithError(err).Errorf("failed to read stream source") // TODO: Send error message on stream before close to allow remote side to return error } diff --git a/pkg/transfer/transfer.go b/pkg/transfer/transfer.go index f92f4d36b..0d024449b 100644 --- a/pkg/transfer/transfer.go +++ b/pkg/transfer/transfer.go @@ -85,7 +85,7 @@ type ImageExportStreamer interface { } type ImageUnpacker interface { - // TODO: Or unpack options? + // TODO: consider using unpack options UnpackPlatforms() []unpack.Platform } @@ -103,6 +103,10 @@ func WithProgress(f ProgressFunc) Opt { } } +// Progress is used to represent a particular progress event or incremental +// update for the provided named object. The parents represent the names of +// the objects which initiated the progress for the provided named object. +// The name and what object it represents is determined by the implementation. type Progress struct { Event string Name string @@ -111,21 +115,3 @@ type Progress struct { Total int64 // Descriptor? } - -/* -// Distribution options -// Stream handler -// Progress rate -// Unpack options -// Remote options -// Cases: -// Registry -> Content/ImageStore (pull) -// Registry -> Registry -// Content/ImageStore -> Registry (push) -// Content/ImageStore -> Content/ImageStore (tag) -// Common fetch/push interface for registry, content/imagestore, OCI index -// Always starts with string for source and destination, on client side, does not need to resolve -// Higher level implementation just takes strings and options -// Lower level implementation takes pusher/fetcher? - -*/ diff --git a/services/streaming/service.go b/services/streaming/service.go index ffb0ece61..07d0e5771 100644 --- a/services/streaming/service.go +++ b/services/streaming/service.go @@ -17,6 +17,7 @@ package streaming import ( + "errors" "io" api "github.com/containerd/containerd/api/services/streaming/v1" @@ -107,7 +108,7 @@ func (ss *serviceStream) Send(a typeurl.Any) error { func (ss *serviceStream) Recv() (a typeurl.Any, err error) { a, err = ss.s.Recv() - if err != io.EOF { + if !errors.Is(err, io.EOF) { err = errdefs.FromGRPC(err) } return diff --git a/services/transfer/service.go b/services/transfer/service.go index 1e2fa91f9..7cea394f1 100644 --- a/services/transfer/service.go +++ b/services/transfer/service.go @@ -119,7 +119,6 @@ func (s *service) Transfer(ctx context.Context, req *transferapi.TransferRequest return nil, errdefs.ToGRPC(err) } dst, err := s.convertAny(ctx, req.Destination) - plugins.ResolveType(req.Source) if err != nil { return nil, errdefs.ToGRPC(err) }