diff --git a/content/helpers.go b/content/helpers.go index 5404109a6..0161c8deb 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -148,32 +148,28 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er // // Copy is buffered, so no need to wrap reader in buffered io. func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error { - ws, err := cw.Status() - if err != nil { - return fmt.Errorf("failed to get status: %w", err) - } r := or - if ws.Offset > 0 { - r, err = seekReader(or, ws.Offset, size) - if err != nil { - return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err) - } - } - for i := 0; i < maxResets; i++ { if i >= 1 { log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset") } - copied, err := copyWithBuffer(cw, r) - if errors.Is(err, ErrReset) { - ws, err := cw.Status() - if err != nil { - return fmt.Errorf("failed to get status: %w", err) - } + + ws, err := cw.Status() + if err != nil { + return fmt.Errorf("failed to get status: %w", err) + } + // Reset the original reader if + // 1. there is an offset, or + // 2. this is a retry due to Reset error + if ws.Offset > 0 || i > 0 { r, err = seekReader(or, ws.Offset, size) if err != nil { return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err) } + } + + copied, err := copyWithBuffer(cw, r) + if errors.Is(err, ErrReset) { continue } if err != nil { @@ -185,14 +181,6 @@ func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected dig } if err := cw.Commit(ctx, size, expected, opts...); err != nil { if errors.Is(err, ErrReset) { - ws, err := cw.Status() - if err != nil { - return fmt.Errorf("failed to get status: %w", err) - } - r, err = seekReader(or, ws.Offset, size) - if err != nil { - return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err) - } continue } if !errdefs.IsAlreadyExists(err) { diff --git a/services/content/contentserver/contentserver.go b/services/content/contentserver/contentserver.go index 76a9e6eea..683f883b0 100644 --- a/services/content/contentserver/contentserver.go +++ b/services/content/contentserver/contentserver.go @@ -40,12 +40,15 @@ type service struct { api.UnimplementedContentServer } -var bufPool = sync.Pool{ - New: func() interface{} { - buffer := make([]byte, 1<<20) - return &buffer - }, -} +var ( + empty = &ptypes.Empty{} + bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 1<<20) + return &buffer + }, + } +) // New returns the content GRPC server func New(cs content.Store) api.ContentServer { @@ -101,12 +104,7 @@ func (s *service) List(req *api.ListContentRequest, session api.Content_ListServ ) if err := s.store.Walk(session.Context(), func(info content.Info) error { - buffer = append(buffer, &api.Info{ - Digest: info.Digest.String(), - Size: info.Size, - CreatedAt: protobuf.ToTimestamp(info.CreatedAt), - Labels: info.Labels, - }) + buffer = append(buffer, infoToGRPC(info)) if len(buffer) >= 100 { if err := sendBlock(buffer); err != nil { @@ -142,7 +140,7 @@ func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*p return nil, errdefs.ToGRPC(err) } - return &ptypes.Empty{}, nil + return empty, nil } func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error { @@ -449,7 +447,7 @@ func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*ptypes.Emp return nil, errdefs.ToGRPC(err) } - return &ptypes.Empty{}, nil + return empty, nil } func infoToGRPC(info content.Info) *api.Info {