content: unify provider and ingester
The split between provider and ingester was a long standing division reflecting the client-side use cases. For the most part, we were differentiating these for the algorithms that operate them, but it made instantation and use of the types challenging. On the server-side, this distinction is generally less important. This change unifies these types and in the process we get a few benefits. The first is that we now completely access the content store over GRPC. This was the initial intent and we have now satisfied this goal completely. There are a few issues around listing content and getting status, but we resolve these with simple streaming and regexp filters. More can probably be done to polish this but the result is clean. Several other content-oriented methods were polished in the process of unification. We have now properly seperated out the `Abort` method to cancel ongoing or stalled ingest processes. We have also replaced the `Active` method with a single status method. The transition went extremely smoothly. Once the clients were updated to use the new methods, every thing worked as expected on the first compile. Signed-off-by: Stephen J Day <stephen.day@docker.com>
This commit is contained in:
@@ -17,3 +17,14 @@ func rewriteGRPCError(err error) error {
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func serverErrorToGRPC(err error, id string) error {
|
||||
switch {
|
||||
case content.IsNotFound(err):
|
||||
return grpc.Errorf(codes.NotFound, "%v: not found", id)
|
||||
case content.IsExists(err):
|
||||
return grpc.Errorf(codes.AlreadyExists, "%v: exists", id)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -1,35 +1,9 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
"github.com/containerd/containerd/content"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
func NewProviderFromClient(client contentapi.ContentClient) content.Provider {
|
||||
return &remoteProvider{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type remoteProvider struct {
|
||||
client contentapi.ContentClient
|
||||
}
|
||||
|
||||
func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||
client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &remoteReader{
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type remoteReader struct {
|
||||
client contentapi.Content_ReadClient
|
||||
extra []byte
|
||||
@@ -18,7 +18,7 @@ import (
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
store *content.Store
|
||||
store content.Store
|
||||
}
|
||||
|
||||
var bufPool = sync.Pool{
|
||||
@@ -52,25 +52,68 @@ func (s *Service) Info(ctx context.Context, req *api.InfoRequest) (*api.InfoResp
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
|
||||
}
|
||||
|
||||
bi, err := s.store.Info(req.Digest)
|
||||
bi, err := s.store.Info(ctx, req.Digest)
|
||||
if err != nil {
|
||||
return nil, maybeNotFoundGRPC(err, req.Digest.String())
|
||||
return nil, serverErrorToGRPC(err, req.Digest.String())
|
||||
}
|
||||
|
||||
return &api.InfoResponse{
|
||||
Digest: req.Digest,
|
||||
Size_: bi.Size,
|
||||
CommittedAt: bi.CommittedAt,
|
||||
Info: api.Info{
|
||||
Digest: bi.Digest,
|
||||
Size_: bi.Size,
|
||||
CommittedAt: bi.CommittedAt,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Service) List(req *api.ListContentRequest, session api.Content_ListServer) error {
|
||||
var (
|
||||
buffer []api.Info
|
||||
sendBlock = func(block []api.Info) error {
|
||||
// send last block
|
||||
return session.Send(&api.ListContentResponse{
|
||||
Info: block,
|
||||
})
|
||||
}
|
||||
)
|
||||
|
||||
if err := s.store.Walk(session.Context(), func(info content.Info) error {
|
||||
buffer = append(buffer, api.Info{
|
||||
Digest: info.Digest,
|
||||
Size_: info.Size,
|
||||
CommittedAt: info.CommittedAt,
|
||||
})
|
||||
|
||||
if len(buffer) >= 100 {
|
||||
if err := sendBlock(buffer); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buffer = buffer[:0]
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(buffer) > 0 {
|
||||
// send last block
|
||||
if err := sendBlock(buffer); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Delete(ctx context.Context, req *api.DeleteContentRequest) (*empty.Empty, error) {
|
||||
if err := req.Digest.Validate(); err != nil {
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err := s.store.Delete(req.Digest); err != nil {
|
||||
return nil, maybeNotFoundGRPC(err, req.Digest.String())
|
||||
if err := s.store.Delete(ctx, req.Digest); err != nil {
|
||||
return nil, serverErrorToGRPC(err, req.Digest.String())
|
||||
}
|
||||
|
||||
return &empty.Empty{}, nil
|
||||
@@ -81,14 +124,14 @@ func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) err
|
||||
return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
|
||||
}
|
||||
|
||||
oi, err := s.store.Info(req.Digest)
|
||||
oi, err := s.store.Info(session.Context(), req.Digest)
|
||||
if err != nil {
|
||||
return maybeNotFoundGRPC(err, req.Digest.String())
|
||||
return serverErrorToGRPC(err, req.Digest.String())
|
||||
}
|
||||
|
||||
rc, err := s.store.Reader(session.Context(), req.Digest)
|
||||
if err != nil {
|
||||
return maybeNotFoundGRPC(err, req.Digest.String())
|
||||
return serverErrorToGRPC(err, req.Digest.String())
|
||||
}
|
||||
defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
|
||||
|
||||
@@ -132,6 +175,10 @@ func (s *Service) Read(req *api.ReadRequest, session api.Content_ReadServer) err
|
||||
return nil
|
||||
}
|
||||
|
||||
// readResponseWriter is a writer that places the output into ReadResponse messages.
|
||||
//
|
||||
// This allows io.CopyBuffer to do the heavy lifting of chunking the responses
|
||||
// into the buffer size.
|
||||
type readResponseWriter struct {
|
||||
offset int64
|
||||
session api.Content_ReadServer
|
||||
@@ -149,6 +196,27 @@ func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
func (s *Service) Status(ctx context.Context, req *api.StatusRequest) (*api.StatusResponse, error) {
|
||||
statuses, err := s.store.Status(ctx, req.Regexp)
|
||||
if err != nil {
|
||||
return nil, serverErrorToGRPC(err, req.Regexp)
|
||||
}
|
||||
|
||||
var resp api.StatusResponse
|
||||
for _, status := range statuses {
|
||||
resp.Statuses = append(resp.Statuses, api.Status{
|
||||
StartedAt: status.StartedAt,
|
||||
UpdatedAt: status.UpdatedAt,
|
||||
Ref: status.Ref,
|
||||
Offset: status.Offset,
|
||||
Total: status.Total,
|
||||
Expected: status.Expected,
|
||||
})
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
var (
|
||||
ctx = session.Context()
|
||||
@@ -243,8 +311,8 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
}
|
||||
expected = req.Expected
|
||||
|
||||
if _, err := s.store.Info(req.Expected); err == nil {
|
||||
if err := s.store.Abort(ref); err != nil {
|
||||
if _, err := s.store.Info(session.Context(), req.Expected); err == nil {
|
||||
if err := s.store.Abort(session.Context(), ref); err != nil {
|
||||
log.G(ctx).WithError(err).Error("failed to abort write")
|
||||
}
|
||||
|
||||
@@ -307,8 +375,6 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
}
|
||||
|
||||
msg.Digest = wr.Digest()
|
||||
case api.WriteActionAbort:
|
||||
return s.store.Abort(ref)
|
||||
}
|
||||
|
||||
if err := session.Send(&msg); err != nil {
|
||||
@@ -326,14 +392,10 @@ func (s *Service) Write(session api.Content_WriteServer) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) Status(*api.StatusRequest, api.Content_StatusServer) error {
|
||||
return grpc.Errorf(codes.Unimplemented, "not implemented")
|
||||
}
|
||||
|
||||
func maybeNotFoundGRPC(err error, id string) error {
|
||||
if content.IsNotFound(err) {
|
||||
return grpc.Errorf(codes.NotFound, "%v: not found", id)
|
||||
func (s *Service) Abort(ctx context.Context, req *api.AbortRequest) (*empty.Empty, error) {
|
||||
if err := s.store.Abort(ctx, req.Ref); err != nil {
|
||||
return nil, serverErrorToGRPC(err, req.Ref)
|
||||
}
|
||||
|
||||
return err
|
||||
return &empty.Empty{}, nil
|
||||
}
|
||||
|
||||
155
services/content/store.go
Normal file
155
services/content/store.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
"github.com/containerd/containerd/content"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type remoteStore struct {
|
||||
client contentapi.ContentClient
|
||||
}
|
||||
|
||||
func NewStoreFromClient(client contentapi.ContentClient) content.Store {
|
||||
return &remoteStore{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
|
||||
resp, err := rs.client.Info(ctx, &contentapi.InfoRequest{
|
||||
Digest: dgst,
|
||||
})
|
||||
if err != nil {
|
||||
return content.Info{}, rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return content.Info{
|
||||
Digest: resp.Info.Digest,
|
||||
Size: resp.Info.Size_,
|
||||
CommittedAt: resp.Info.CommittedAt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Walk(ctx context.Context, fn content.WalkFunc) error {
|
||||
session, err := rs.client.List(ctx, &contentapi.ListContentRequest{})
|
||||
if err != nil {
|
||||
return rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
for {
|
||||
msg, err := session.Recv()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
return rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
for _, info := range msg.Info {
|
||||
if err := fn(content.Info{
|
||||
Digest: info.Digest,
|
||||
Size: info.Size_,
|
||||
CommittedAt: info.CommittedAt,
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Delete(ctx context.Context, dgst digest.Digest) error {
|
||||
if _, err := rs.client.Delete(ctx, &contentapi.DeleteContentRequest{
|
||||
Digest: dgst,
|
||||
}); err != nil {
|
||||
return rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
|
||||
client, err := rs.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &remoteReader{
|
||||
client: client,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Status(ctx context.Context, re string) ([]content.Status, error) {
|
||||
resp, err := rs.client.Status(ctx, &contentapi.StatusRequest{
|
||||
Regexp: re,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
var statuses []content.Status
|
||||
for _, status := range resp.Statuses {
|
||||
statuses = append(statuses, content.Status{
|
||||
Ref: status.Ref,
|
||||
StartedAt: status.StartedAt,
|
||||
UpdatedAt: status.UpdatedAt,
|
||||
Offset: status.Offset,
|
||||
Total: status.Total,
|
||||
Expected: status.Expected,
|
||||
})
|
||||
}
|
||||
|
||||
return statuses, nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||||
wrclient, offset, err := rs.negotiate(ctx, ref, size, expected)
|
||||
if err != nil {
|
||||
return nil, rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return &remoteWriter{
|
||||
client: wrclient,
|
||||
offset: offset,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Abort implements asynchronous abort. It starts a new write session on the ref l
|
||||
func (rs *remoteStore) Abort(ctx context.Context, ref string) error {
|
||||
if _, err := rs.client.Abort(ctx, &contentapi.AbortRequest{
|
||||
Ref: ref,
|
||||
}); err != nil {
|
||||
return rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rs *remoteStore) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
|
||||
wrclient, err := rs.client.Write(ctx)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if err := wrclient.Send(&contentapi.WriteRequest{
|
||||
Action: contentapi.WriteActionStat,
|
||||
Ref: ref,
|
||||
Total: size,
|
||||
Expected: expected,
|
||||
}); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
resp, err := wrclient.Recv()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return wrclient, resp.Offset, nil
|
||||
}
|
||||
@@ -1,61 +1,14 @@
|
||||
package content
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
contentapi "github.com/containerd/containerd/api/services/content"
|
||||
"github.com/containerd/containerd/content"
|
||||
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
func NewIngesterFromClient(client contentapi.ContentClient) content.Ingester {
|
||||
return &remoteIngester{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type remoteIngester struct {
|
||||
client contentapi.ContentClient
|
||||
}
|
||||
|
||||
func (ri *remoteIngester) Writer(ctx context.Context, ref string, size int64, expected digest.Digest) (content.Writer, error) {
|
||||
wrclient, offset, err := ri.negotiate(ctx, ref, size, expected)
|
||||
if err != nil {
|
||||
return nil, rewriteGRPCError(err)
|
||||
}
|
||||
|
||||
return &remoteWriter{
|
||||
client: wrclient,
|
||||
offset: offset,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (ri *remoteIngester) negotiate(ctx context.Context, ref string, size int64, expected digest.Digest) (contentapi.Content_WriteClient, int64, error) {
|
||||
wrclient, err := ri.client.Write(ctx)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
if err := wrclient.Send(&contentapi.WriteRequest{
|
||||
Action: contentapi.WriteActionStat,
|
||||
Ref: ref,
|
||||
Total: size,
|
||||
Expected: expected,
|
||||
}); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
resp, err := wrclient.Recv()
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return wrclient, resp.Offset, nil
|
||||
}
|
||||
|
||||
type remoteWriter struct {
|
||||
ref string
|
||||
client contentapi.Content_WriteClient
|
||||
@@ -26,11 +26,11 @@ func init() {
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
store *content.Store
|
||||
store content.Store
|
||||
snapshotter snapshot.Snapshotter
|
||||
}
|
||||
|
||||
func NewService(store *content.Store, snapshotter snapshot.Snapshotter) (*Service, error) {
|
||||
func NewService(store content.Store, snapshotter snapshot.Snapshotter) (*Service, error) {
|
||||
return &Service{
|
||||
store: store,
|
||||
snapshotter: snapshotter,
|
||||
|
||||
Reference in New Issue
Block a user