From f3ff95ab72796b2aec334ec9287c73ebc716996b Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 19 Mar 2019 16:04:06 +0900 Subject: [PATCH] decouple api.ContentServer implementation package from bbolt dependency For moby/buildkit#886 Signed-off-by: Akihiro Suda --- .../content/contentserver/contentserver.go | 463 ++++++++++++++++++ services/content/service.go | 446 +---------------- 2 files changed, 465 insertions(+), 444 deletions(-) create mode 100644 services/content/contentserver/contentserver.go diff --git a/services/content/contentserver/contentserver.go b/services/content/contentserver/contentserver.go new file mode 100644 index 000000000..33707303e --- /dev/null +++ b/services/content/contentserver/contentserver.go @@ -0,0 +1,463 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package contentserver + +import ( + "context" + "io" + "sync" + + api "github.com/containerd/containerd/api/services/content/v1" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + ptypes "github.com/gogo/protobuf/types" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type service struct { + store content.Store +} + +var bufPool = sync.Pool{ + New: func() interface{} { + buffer := make([]byte, 1<<20) + return &buffer + }, +} + +// New returns the content GRPC server +func New(cs content.Store) api.ContentServer { + return &service{store: cs} +} + +func (s *service) Register(server *grpc.Server) error { + api.RegisterContentServer(server, s) + return nil +} + +func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) { + if err := req.Digest.Validate(); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest) + } + + bi, err := s.store.Info(ctx, req.Digest) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &api.InfoResponse{ + Info: infoToGRPC(bi), + }, nil +} + +func (s *service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) { + if err := req.Info.Digest.Validate(); err != nil { + return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest) + } + + info, err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &api.UpdateResponse{ + Info: infoToGRPC(info), + }, nil +} + +func (s *service) List(req *api.ListContentRequest, session api.Content_ListServer) error { + var ( + buffer []api.Info + sendBlock = func(block []api.Info) error { + // send last block + return session.Send(&api.ListContentResponse{ + Info: block, + }) + } + ) + + if err := s.store.Walk(session.Context(), func(info content.Info) error { + buffer = append(buffer, api.Info{ + Digest: info.Digest, + Size_: info.Size, + CreatedAt: info.CreatedAt, + Labels: info.Labels, + }) + + if len(buffer) >= 100 { + if err := sendBlock(buffer); err != nil { + return err + } + + buffer = buffer[:0] + } + + return nil + }, req.Filters...); err != nil { + return err + } + + if len(buffer) > 0 { + // send last block + if err := sendBlock(buffer); err != nil { + return err + } + } + + return nil +} + +func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*ptypes.Empty, error) { + log.G(ctx).WithField("digest", req.Digest).Debugf("delete content") + if err := req.Digest.Validate(); err != nil { + return nil, status.Errorf(codes.InvalidArgument, err.Error()) + } + + if err := s.store.Delete(ctx, req.Digest); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error { + if err := req.Digest.Validate(); err != nil { + return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err) + } + + oi, err := s.store.Info(session.Context(), req.Digest) + if err != nil { + return errdefs.ToGRPC(err) + } + + ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest}) + if err != nil { + return errdefs.ToGRPC(err) + } + defer ra.Close() + + var ( + offset = req.Offset + // size is read size, not the expected size of the blob (oi.Size), which the caller might not be aware of. + // offset+size can be larger than oi.Size. + size = req.Size_ + + // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably + // little inefficient for work over a fast network. We can tune this later. + p = bufPool.Get().(*[]byte) + ) + defer bufPool.Put(p) + + if offset < 0 { + offset = 0 + } + + if offset > oi.Size { + return status.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size) + } + + if size <= 0 || offset+size > oi.Size { + size = oi.Size - offset + } + + _, err = io.CopyBuffer( + &readResponseWriter{session: session}, + io.NewSectionReader(ra, offset, size), *p) + return errdefs.ToGRPC(err) +} + +// readResponseWriter is a writer that places the output into ReadContentRequest messages. +// +// This allows io.CopyBuffer to do the heavy lifting of chunking the responses +// into the buffer size. +type readResponseWriter struct { + offset int64 + session api.Content_ReadServer +} + +func (rw *readResponseWriter) Write(p []byte) (n int, err error) { + if err := rw.session.Send(&api.ReadContentResponse{ + Offset: rw.offset, + Data: p, + }); err != nil { + return 0, err + } + + rw.offset += int64(len(p)) + return len(p), nil +} + +func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) { + status, err := s.store.Status(ctx, req.Ref) + if err != nil { + return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref) + } + + var resp api.StatusResponse + resp.Status = &api.Status{ + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Ref: status.Ref, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + } + + return &resp, nil +} + +func (s *service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) { + statuses, err := s.store.ListStatuses(ctx, req.Filters...) + if err != nil { + return nil, errdefs.ToGRPC(err) + } + + var resp api.ListStatusesResponse + for _, status := range statuses { + resp.Statuses = append(resp.Statuses, api.Status{ + StartedAt: status.StartedAt, + UpdatedAt: status.UpdatedAt, + Ref: status.Ref, + Offset: status.Offset, + Total: status.Total, + Expected: status.Expected, + }) + } + + return &resp, nil +} + +func (s *service) Write(session api.Content_WriteServer) (err error) { + var ( + ctx = session.Context() + msg api.WriteContentResponse + req *api.WriteContentRequest + ref string + total int64 + expected digest.Digest + ) + + defer func(msg *api.WriteContentResponse) { + // pump through the last message if no error was encountered + if err != nil { + if s, ok := status.FromError(err); ok && s.Code() != codes.AlreadyExists { + // TODO(stevvooe): Really need a log line here to track which + // errors are actually causing failure on the server side. May want + // to configure the service with an interceptor to make this work + // identically across all GRPC methods. + // + // This is pretty noisy, so we can remove it but leave it for now. + log.G(ctx).WithError(err).Error("(*service).Write failed") + } + + return + } + + err = session.Send(msg) + }(&msg) + + // handle the very first request! + req, err = session.Recv() + if err != nil { + return err + } + + ref = req.Ref + + if ref == "" { + return status.Errorf(codes.InvalidArgument, "first message must have a reference") + } + + fields := logrus.Fields{ + "ref": ref, + } + total = req.Total + expected = req.Expected + if total > 0 { + fields["total"] = total + } + + if expected != "" { + fields["expected"] = expected + } + + ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields)) + + log.G(ctx).Debug("(*service).Write started") + // this action locks the writer for the session. + wr, err := s.store.Writer(ctx, + content.WithRef(ref), + content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected})) + if err != nil { + return errdefs.ToGRPC(err) + } + defer wr.Close() + + for { + msg.Action = req.Action + ws, err := wr.Status() + if err != nil { + return errdefs.ToGRPC(err) + } + + msg.Offset = ws.Offset // always set the offset. + + // NOTE(stevvooe): In general, there are two cases underwhich a remote + // writer is used. + // + // For pull, we almost always have this before fetching large content, + // through descriptors. We allow predeclaration of the expected size + // and digest. + // + // For push, it is more complex. If we want to cut through content into + // storage, we may have no expectation until we are done processing the + // content. The case here is the following: + // + // 1. Start writing content. + // 2. Compress inline. + // 3. Validate digest and size (maybe). + // + // Supporting these two paths is quite awkward but it lets both API + // users use the same writer style for each with a minimum of overhead. + if req.Expected != "" { + if expected != "" && expected != req.Expected { + log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected) + } + expected = req.Expected + + if _, err := s.store.Info(session.Context(), req.Expected); err == nil { + if err := wr.Close(); err != nil { + log.G(ctx).WithError(err).Error("failed to close writer") + } + if err := s.store.Abort(session.Context(), ref); err != nil { + log.G(ctx).WithError(err).Error("failed to abort write") + } + + return status.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected) + } + } + + if req.Total > 0 { + // Update the expected total. Typically, this could be seen at + // negotiation time or on a commit message. + if total > 0 && req.Total != total { + log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total) + } + total = req.Total + } + + switch req.Action { + case api.WriteActionStat: + msg.Digest = wr.Digest() + msg.StartedAt = ws.StartedAt + msg.UpdatedAt = ws.UpdatedAt + msg.Total = total + case api.WriteActionWrite, api.WriteActionCommit: + if req.Offset > 0 { + // validate the offset if provided + if req.Offset != ws.Offset { + return status.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset) + } + } + + if req.Offset == 0 && ws.Offset > 0 { + if err := wr.Truncate(req.Offset); err != nil { + return errors.Wrapf(err, "truncate failed") + } + msg.Offset = req.Offset + } + + // issue the write if we actually have data. + if len(req.Data) > 0 { + // While this looks like we could use io.WriterAt here, because we + // maintain the offset as append only, we just issue the write. + n, err := wr.Write(req.Data) + if err != nil { + return errdefs.ToGRPC(err) + } + + if n != len(req.Data) { + // TODO(stevvooe): Perhaps, we can recover this by including it + // in the offset on the write return. + return status.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data)) + } + + msg.Offset += int64(n) + } + + if req.Action == api.WriteActionCommit { + var opts []content.Opt + if req.Labels != nil { + opts = append(opts, content.WithLabels(req.Labels)) + } + if err := wr.Commit(ctx, total, expected, opts...); err != nil { + return errdefs.ToGRPC(err) + } + } + + msg.Digest = wr.Digest() + } + + if err := session.Send(&msg); err != nil { + return err + } + + req, err = session.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + } +} + +func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*ptypes.Empty, error) { + if err := s.store.Abort(ctx, req.Ref); err != nil { + return nil, errdefs.ToGRPC(err) + } + + return &ptypes.Empty{}, nil +} + +func infoToGRPC(info content.Info) api.Info { + return api.Info{ + Digest: info.Digest, + Size_: info.Size, + CreatedAt: info.CreatedAt, + UpdatedAt: info.UpdatedAt, + Labels: info.Labels, + } +} + +func infoFromGRPC(info api.Info) content.Info { + return content.Info{ + Digest: info.Digest, + Size: info.Size_, + CreatedAt: info.CreatedAt, + UpdatedAt: info.UpdatedAt, + Labels: info.Labels, + } +} diff --git a/services/content/service.go b/services/content/service.go index d7e666053..43320d54d 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -17,39 +17,13 @@ package content import ( - "context" - "io" - "sync" - - api "github.com/containerd/containerd/api/services/content/v1" "github.com/containerd/containerd/content" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/services" - ptypes "github.com/gogo/protobuf/types" - digest "github.com/opencontainers/go-digest" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/containerd/containerd/services/content/contentserver" "github.com/pkg/errors" - "github.com/sirupsen/logrus" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" ) -type service struct { - store content.Store -} - -var bufPool = sync.Pool{ - New: func() interface{} { - buffer := make([]byte, 1<<20) - return &buffer - }, -} - -var _ api.ContentServer = &service{} - func init() { plugin.Register(&plugin.Registration{ Type: plugin.GRPCPlugin, @@ -70,423 +44,7 @@ func init() { if err != nil { return nil, err } - return NewService(cs.(content.Store)), nil + return contentserver.New(cs.(content.Store)), nil }, }) } - -// NewService returns the content GRPC server -func NewService(cs content.Store) api.ContentServer { - return &service{store: cs} -} - -func (s *service) Register(server *grpc.Server) error { - api.RegisterContentServer(server, s) - return nil -} - -func (s *service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResponse, error) { - if err := req.Digest.Validate(); err != nil { - return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest) - } - - bi, err := s.store.Info(ctx, req.Digest) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &api.InfoResponse{ - Info: infoToGRPC(bi), - }, nil -} - -func (s *service) Update(ctx context.Context, req *api.UpdateRequest) (*api.UpdateResponse, error) { - if err := req.Info.Digest.Validate(); err != nil { - return nil, status.Errorf(codes.InvalidArgument, "%q failed validation", req.Info.Digest) - } - - info, err := s.store.Update(ctx, infoFromGRPC(req.Info), req.UpdateMask.GetPaths()...) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &api.UpdateResponse{ - Info: infoToGRPC(info), - }, nil -} - -func (s *service) List(req *api.ListContentRequest, session api.Content_ListServer) error { - var ( - buffer []api.Info - sendBlock = func(block []api.Info) error { - // send last block - return session.Send(&api.ListContentResponse{ - Info: block, - }) - } - ) - - if err := s.store.Walk(session.Context(), func(info content.Info) error { - buffer = append(buffer, api.Info{ - Digest: info.Digest, - Size_: info.Size, - CreatedAt: info.CreatedAt, - Labels: info.Labels, - }) - - if len(buffer) >= 100 { - if err := sendBlock(buffer); err != nil { - return err - } - - buffer = buffer[:0] - } - - return nil - }, req.Filters...); err != nil { - return err - } - - if len(buffer) > 0 { - // send last block - if err := sendBlock(buffer); err != nil { - return err - } - } - - return nil -} - -func (s *service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*ptypes.Empty, error) { - log.G(ctx).WithField("digest", req.Digest).Debugf("delete content") - if err := req.Digest.Validate(); err != nil { - return nil, status.Errorf(codes.InvalidArgument, err.Error()) - } - - if err := s.store.Delete(ctx, req.Digest); err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &ptypes.Empty{}, nil -} - -func (s *service) Read(req *api.ReadContentRequest, session api.Content_ReadServer) error { - if err := req.Digest.Validate(); err != nil { - return status.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err) - } - - oi, err := s.store.Info(session.Context(), req.Digest) - if err != nil { - return errdefs.ToGRPC(err) - } - - ra, err := s.store.ReaderAt(session.Context(), ocispec.Descriptor{Digest: req.Digest}) - if err != nil { - return errdefs.ToGRPC(err) - } - defer ra.Close() - - var ( - offset = req.Offset - // size is read size, not the expected size of the blob (oi.Size), which the caller might not be aware of. - // offset+size can be larger than oi.Size. - size = req.Size_ - - // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably - // little inefficient for work over a fast network. We can tune this later. - p = bufPool.Get().(*[]byte) - ) - defer bufPool.Put(p) - - if offset < 0 { - offset = 0 - } - - if offset > oi.Size { - return status.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size) - } - - if size <= 0 || offset+size > oi.Size { - size = oi.Size - offset - } - - _, err = io.CopyBuffer( - &readResponseWriter{session: session}, - io.NewSectionReader(ra, offset, size), *p) - return errdefs.ToGRPC(err) -} - -// readResponseWriter is a writer that places the output into ReadContentRequest messages. -// -// This allows io.CopyBuffer to do the heavy lifting of chunking the responses -// into the buffer size. -type readResponseWriter struct { - offset int64 - session api.Content_ReadServer -} - -func (rw *readResponseWriter) Write(p []byte) (n int, err error) { - if err := rw.session.Send(&api.ReadContentResponse{ - Offset: rw.offset, - Data: p, - }); err != nil { - return 0, err - } - - rw.offset += int64(len(p)) - return len(p), nil -} - -func (s *service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) { - status, err := s.store.Status(ctx, req.Ref) - if err != nil { - return nil, errdefs.ToGRPCf(err, "could not get status for ref %q", req.Ref) - } - - var resp api.StatusResponse - resp.Status = &api.Status{ - StartedAt: status.StartedAt, - UpdatedAt: status.UpdatedAt, - Ref: status.Ref, - Offset: status.Offset, - Total: status.Total, - Expected: status.Expected, - } - - return &resp, nil -} - -func (s *service) ListStatuses(ctx context.Context, req *api.ListStatusesRequest) (*api.ListStatusesResponse, error) { - statuses, err := s.store.ListStatuses(ctx, req.Filters...) - if err != nil { - return nil, errdefs.ToGRPC(err) - } - - var resp api.ListStatusesResponse - for _, status := range statuses { - resp.Statuses = append(resp.Statuses, api.Status{ - StartedAt: status.StartedAt, - UpdatedAt: status.UpdatedAt, - Ref: status.Ref, - Offset: status.Offset, - Total: status.Total, - Expected: status.Expected, - }) - } - - return &resp, nil -} - -func (s *service) Write(session api.Content_WriteServer) (err error) { - var ( - ctx = session.Context() - msg api.WriteContentResponse - req *api.WriteContentRequest - ref string - total int64 - expected digest.Digest - ) - - defer func(msg *api.WriteContentResponse) { - // pump through the last message if no error was encountered - if err != nil { - if s, ok := status.FromError(err); ok && s.Code() != codes.AlreadyExists { - // TODO(stevvooe): Really need a log line here to track which - // errors are actually causing failure on the server side. May want - // to configure the service with an interceptor to make this work - // identically across all GRPC methods. - // - // This is pretty noisy, so we can remove it but leave it for now. - log.G(ctx).WithError(err).Error("(*service).Write failed") - } - - return - } - - err = session.Send(msg) - }(&msg) - - // handle the very first request! - req, err = session.Recv() - if err != nil { - return err - } - - ref = req.Ref - - if ref == "" { - return status.Errorf(codes.InvalidArgument, "first message must have a reference") - } - - fields := logrus.Fields{ - "ref": ref, - } - total = req.Total - expected = req.Expected - if total > 0 { - fields["total"] = total - } - - if expected != "" { - fields["expected"] = expected - } - - ctx = log.WithLogger(ctx, log.G(ctx).WithFields(fields)) - - log.G(ctx).Debug("(*service).Write started") - // this action locks the writer for the session. - wr, err := s.store.Writer(ctx, - content.WithRef(ref), - content.WithDescriptor(ocispec.Descriptor{Size: total, Digest: expected})) - if err != nil { - return errdefs.ToGRPC(err) - } - defer wr.Close() - - for { - msg.Action = req.Action - ws, err := wr.Status() - if err != nil { - return errdefs.ToGRPC(err) - } - - msg.Offset = ws.Offset // always set the offset. - - // NOTE(stevvooe): In general, there are two cases underwhich a remote - // writer is used. - // - // For pull, we almost always have this before fetching large content, - // through descriptors. We allow predeclaration of the expected size - // and digest. - // - // For push, it is more complex. If we want to cut through content into - // storage, we may have no expectation until we are done processing the - // content. The case here is the following: - // - // 1. Start writing content. - // 2. Compress inline. - // 3. Validate digest and size (maybe). - // - // Supporting these two paths is quite awkward but it lets both API - // users use the same writer style for each with a minimum of overhead. - if req.Expected != "" { - if expected != "" && expected != req.Expected { - log.G(ctx).Debugf("commit digest differs from writer digest: %v != %v", req.Expected, expected) - } - expected = req.Expected - - if _, err := s.store.Info(session.Context(), req.Expected); err == nil { - if err := wr.Close(); err != nil { - log.G(ctx).WithError(err).Error("failed to close writer") - } - if err := s.store.Abort(session.Context(), ref); err != nil { - log.G(ctx).WithError(err).Error("failed to abort write") - } - - return status.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.Expected) - } - } - - if req.Total > 0 { - // Update the expected total. Typically, this could be seen at - // negotiation time or on a commit message. - if total > 0 && req.Total != total { - log.G(ctx).Debugf("commit size differs from writer size: %v != %v", req.Total, total) - } - total = req.Total - } - - switch req.Action { - case api.WriteActionStat: - msg.Digest = wr.Digest() - msg.StartedAt = ws.StartedAt - msg.UpdatedAt = ws.UpdatedAt - msg.Total = total - case api.WriteActionWrite, api.WriteActionCommit: - if req.Offset > 0 { - // validate the offset if provided - if req.Offset != ws.Offset { - return status.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset) - } - } - - if req.Offset == 0 && ws.Offset > 0 { - if err := wr.Truncate(req.Offset); err != nil { - return errors.Wrapf(err, "truncate failed") - } - msg.Offset = req.Offset - } - - // issue the write if we actually have data. - if len(req.Data) > 0 { - // While this looks like we could use io.WriterAt here, because we - // maintain the offset as append only, we just issue the write. - n, err := wr.Write(req.Data) - if err != nil { - return errdefs.ToGRPC(err) - } - - if n != len(req.Data) { - // TODO(stevvooe): Perhaps, we can recover this by including it - // in the offset on the write return. - return status.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data)) - } - - msg.Offset += int64(n) - } - - if req.Action == api.WriteActionCommit { - var opts []content.Opt - if req.Labels != nil { - opts = append(opts, content.WithLabels(req.Labels)) - } - if err := wr.Commit(ctx, total, expected, opts...); err != nil { - return errdefs.ToGRPC(err) - } - } - - msg.Digest = wr.Digest() - } - - if err := session.Send(&msg); err != nil { - return err - } - - req, err = session.Recv() - if err != nil { - if err == io.EOF { - return nil - } - - return err - } - } -} - -func (s *service) Abort(ctx context.Context, req *api.AbortRequest) (*ptypes.Empty, error) { - if err := s.store.Abort(ctx, req.Ref); err != nil { - return nil, errdefs.ToGRPC(err) - } - - return &ptypes.Empty{}, nil -} - -func infoToGRPC(info content.Info) api.Info { - return api.Info{ - Digest: info.Digest, - Size_: info.Size, - CreatedAt: info.CreatedAt, - UpdatedAt: info.UpdatedAt, - Labels: info.Labels, - } -} - -func infoFromGRPC(info api.Info) content.Info { - return content.Info{ - Digest: info.Digest, - Size: info.Size_, - CreatedAt: info.CreatedAt, - UpdatedAt: info.UpdatedAt, - Labels: info.Labels, - } -}