From e6efb397cfb16c6cd69a026b6bf6e71513c9717b Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Fri, 17 Feb 2017 16:49:59 -0800 Subject: [PATCH] cmd/dist: port commands over to use GRPC content store Following from the rest of the work in this branch, we now are porting the dist command to work directly against the containerd content API. Signed-off-by: Stephen J Day --- cmd/containerd/main.go | 32 ++++-- cmd/dist/get.go | 18 +++- cmd/dist/ingest.go | 7 +- cmd/dist/main.go | 5 + content/client.go | 216 ++++++++++++++++++++++++++++++++++++++ content/service.go | 233 +++++++++++++++++++++++++++++++++++++++++ 6 files changed, 496 insertions(+), 15 deletions(-) create mode 100644 content/client.go create mode 100644 content/service.go diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index 5cdb8c034..9002b5df6 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -7,6 +7,7 @@ import ( _ "net/http/pprof" "os" "os/signal" + "path/filepath" "syscall" "time" @@ -15,7 +16,9 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/containerd" + contentapi "github.com/docker/containerd/api/services/content" api "github.com/docker/containerd/api/services/execution" + "github.com/docker/containerd/content" _ "github.com/docker/containerd/linux" "github.com/docker/containerd/log" "github.com/docker/containerd/services/execution" @@ -55,10 +58,6 @@ func main() { Name: "log-level,l", Usage: "set the logging level [debug, info, warn, error, fatal, panic]", }, - cli.StringFlag{ - Name: "root,r", - Usage: "containerd root directory", - }, cli.StringFlag{ Name: "state", Usage: "containerd state directory", @@ -90,14 +89,27 @@ func main() { return err } serveMetricsAPI() + + contentStore, err := resolveContentStore(context) + if err != nil { + return err + } + contentService := content.NewService(contentStore) + // start the GRPC api with the execution service registered - server := newGRPCServer(execution.New(supervisor)) + server := newGRPCServer() + + api.RegisterContainerServiceServer(server, execution.New(supervisor)) + contentapi.RegisterContentServer(server, contentService) + + // start the GRPC api with registered services if err := serveGRPC(server); err != nil { return err } log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds()) return handleSignals(signals, server) } + if err := app.Run(os.Args); err != nil { fmt.Fprintf(os.Stderr, "containerd: %s\n", err) os.Exit(1) @@ -192,8 +204,13 @@ func serveDebugAPI() error { return nil } +func resolveContentStore(context *cli.Context) (*content.Store, error) { + cp := filepath.Join(conf.Root, "content") + return content.NewStore(cp) +} + func loadRuntimes() (map[string]containerd.Runtime, error) { - o := make(map[string]containerd.Runtime) + o := map[string]containerd.Runtime{} for _, name := range containerd.Runtimes() { r, err := containerd.NewRuntime(name, conf.State) if err != nil { @@ -205,9 +222,8 @@ func loadRuntimes() (map[string]containerd.Runtime, error) { return o, nil } -func newGRPCServer(service api.ContainerServiceServer) *grpc.Server { +func newGRPCServer() *grpc.Server { s := grpc.NewServer(grpc.UnaryInterceptor(interceptor)) - api.RegisterContainerServiceServer(s, service) return s } diff --git a/cmd/dist/get.go b/cmd/dist/get.go index bf35933af..f4bfbb233 100644 --- a/cmd/dist/get.go +++ b/cmd/dist/get.go @@ -4,6 +4,8 @@ import ( "io" "os" + contentapi "github.com/docker/containerd/api/services/content" + "github.com/docker/containerd/content" digest "github.com/opencontainers/go-digest" "github.com/urfave/cli" ) @@ -17,17 +19,23 @@ var getCommand = cli.Command{ Output paths can be used to directly access blobs on disk.`, Flags: []cli.Flag{}, Action: func(context *cli.Context) error { - cs, err := resolveContentStore(context) - if err != nil { - return err - } + var ( + ctx = background + ) dgst, err := digest.Parse(context.Args().First()) if err != nil { return err } - rc, err := cs.Open(dgst) + conn, err := connectGRPC(context) + if err != nil { + return err + } + + cs := content.NewProviderFromClient(contentapi.NewContentClient(conn)) + + rc, err := cs.Reader(ctx, dgst) if err != nil { return err } diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go index e3353cc54..8d8a7cc62 100644 --- a/cmd/dist/ingest.go +++ b/cmd/dist/ingest.go @@ -5,6 +5,7 @@ import ( "fmt" "os" + contentapi "github.com/docker/containerd/api/services/content" "github.com/docker/containerd/content" "github.com/opencontainers/go-digest" "github.com/urfave/cli" @@ -41,7 +42,7 @@ var ingestCommand = cli.Command{ return err } - cs, err := resolveContentStore(context) + conn, err := connectGRPC(context) if err != nil { return err } @@ -50,9 +51,11 @@ var ingestCommand = cli.Command{ return fmt.Errorf("must specify a transaction reference") } + ingester := content.NewIngesterFromClient(contentapi.NewContentClient(conn)) + // TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect // all data to be written in a single invocation. Allow multiple writes // to the same transaction key followed by a commit. - return content.WriteBlob(ctx, cs, os.Stdin, ref, expectedSize, expectedDigest) + return content.WriteBlob(ctx, ingester, os.Stdin, ref, expectedSize, expectedDigest) }, } diff --git a/cmd/dist/main.go b/cmd/dist/main.go index de4cf22fe..bfe6b24e4 100644 --- a/cmd/dist/main.go +++ b/cmd/dist/main.go @@ -42,6 +42,11 @@ distribution tool Usage: "path to content store root", Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content }, + cli.StringFlag{ + Name: "socket, s", + Usage: "socket path for containerd's GRPC server", + Value: "/run/containerd/containerd.sock", + }, } app.Commands = []cli.Command{ fetchCommand, diff --git a/content/client.go b/content/client.go new file mode 100644 index 000000000..92bea3111 --- /dev/null +++ b/content/client.go @@ -0,0 +1,216 @@ +package content + +import ( + "context" + "io" + + contentapi "github.com/docker/containerd/api/services/content" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" +) + +func NewProviderFromClient(client contentapi.ContentClient) Provider { + return &remoteProvider{ + client: client, + } +} + +type remoteProvider struct { + client contentapi.ContentClient +} + +func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) { + client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst}) + if err != nil { + return nil, err + } + + return &remoteReader{ + client: client, + }, nil +} + +type remoteReader struct { + client contentapi.Content_ReadClient + extra []byte +} + +func (rr *remoteReader) Read(p []byte) (n int, err error) { + n += copy(p, rr.extra) + if n >= len(p) { + if n <= len(rr.extra) { + rr.extra = rr.extra[n:] + } else { + rr.extra = rr.extra[:0] + } + return + } + + p = p[n:] + for len(p) > 0 { + var resp *contentapi.ReadResponse + // fill our buffer up until we can fill p. + resp, err = rr.client.Recv() + if err != nil { + return + } + + copied := copy(p, resp.Data) + n += copied + p = p[copied:] + + if copied < len(p) { + continue + } + + rr.extra = append(rr.extra, resp.Data[copied:]...) + } + + return +} + +func (rr *remoteReader) Close() error { + return rr.client.CloseSend() +} + +func NewIngesterFromClient(client contentapi.ContentClient) Ingester { + return &remoteIngester{ + client: client, + } +} + +type remoteIngester struct { + client contentapi.ContentClient +} + +func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error) { + wrclient, offset, err := ri.negotiate(ctx, ref) + if err != nil { + return nil, err + } + + return &remoteWriter{ + client: wrclient, + offset: offset, + }, nil +} + +func (ri *remoteIngester) negotiate(ctx context.Context, ref string) (contentapi.Content_WriteClient, int64, error) { + wrclient, err := ri.client.Write(ctx) + if err != nil { + return nil, 0, err + } + + if err := wrclient.Send(&contentapi.WriteRequest{ + Action: contentapi.WriteActionStat, + Ref: ref, + }); err != nil { + return nil, 0, err + } + + resp, err := wrclient.Recv() + if err != nil { + return nil, 0, err + } + + return wrclient, resp.Offset, nil +} + +type remoteWriter struct { + ref string + client contentapi.Content_WriteClient + offset int64 + digest digest.Digest +} + +func newRemoteWriter(client contentapi.Content_WriteClient, ref string, offset int64) (*remoteWriter, error) { + return &remoteWriter{ + ref: ref, + client: client, + offset: offset, + }, nil +} + +// send performs a synchronous req-resp cycle on the client. +func (rw *remoteWriter) send(req *contentapi.WriteRequest) (*contentapi.WriteResponse, error) { + if err := rw.client.Send(req); err != nil { + return nil, err + } + + resp, err := rw.client.Recv() + + if err == nil { + // try to keep these in sync + if resp.Digest != "" { + rw.digest = resp.Digest + } + } + + return resp, err +} + +func (rw *remoteWriter) Status() (Status, error) { + resp, err := rw.send(&contentapi.WriteRequest{ + Action: contentapi.WriteActionStat, + }) + if err != nil { + return Status{}, err + } + + return Status{ + Ref: rw.ref, + Offset: resp.Offset, + StartedAt: resp.StartedAt, + UpdatedAt: resp.UpdatedAt, + }, nil +} + +func (rw *remoteWriter) Digest() digest.Digest { + return rw.digest +} + +func (rw *remoteWriter) Write(p []byte) (n int, err error) { + offset := rw.offset + + resp, err := rw.send(&contentapi.WriteRequest{ + Action: contentapi.WriteActionWrite, + Offset: offset, + Data: p, + }) + if err != nil { + return 0, err + } + + n = int(resp.Offset - offset) + if n < len(p) { + err = io.ErrShortWrite + } + + rw.offset += int64(n) + return +} + +func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error { + resp, err := rw.send(&contentapi.WriteRequest{ + Action: contentapi.WriteActionCommit, + ExpectedSize: size, + ExpectedDigest: expected, + }) + if err != nil { + return err + } + + if size != 0 && resp.Offset != size { + return errors.Errorf("unexpected size: %v != %v", resp.Offset, size) + } + + if expected != "" && resp.Digest != expected { + return errors.New("unexpected digest") + } + + return nil +} + +func (rw *remoteWriter) Close() error { + return rw.client.CloseSend() +} diff --git a/content/service.go b/content/service.go new file mode 100644 index 000000000..a3c81849f --- /dev/null +++ b/content/service.go @@ -0,0 +1,233 @@ +package content + +import ( + "errors" + "io" + + contentapi "github.com/docker/containerd/api/services/content" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" +) + +type Service struct { + store *Store +} + +var _ contentapi.ContentServer = &Service{} + +func NewService(store *Store) contentapi.ContentServer { + return &Service{store: store} +} + +func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) { + if err := req.Digest.Validate(); err != nil { + return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest) + } + + bi, err := s.store.Info(req.Digest) + if err != nil { + return nil, maybeNotFoundGRPC(err, req.Digest.String()) + } + + return &contentapi.InfoResponse{ + Digest: req.Digest, + Size_: bi.Size, + CommittedAt: bi.CommittedAt, + }, nil +} + +func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_ReadServer) error { + if err := req.Digest.Validate(); err != nil { + return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err) + } + + oi, err := s.store.Info(req.Digest) + if err != nil { + return maybeNotFoundGRPC(err, req.Digest.String()) + } + + rc, err := s.store.Reader(session.Context(), req.Digest) + if err != nil { + return maybeNotFoundGRPC(err, req.Digest.String()) + } + defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance. + + ra, ok := rc.(io.ReaderAt) + if !ok { + // TODO(stevvooe): Need to set this up to get correct behavior across + // board. May change interface to store to just return ReaderAtCloser. + // Possibly, we could just return io.ReaderAt and handle file + // descriptors internally. + return errors.New("content service only supports content stores that return ReaderAt") + } + + var ( + offset = req.Offset + size = req.Size_ + + // TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably + // little inefficient for work over a fast network. We can tune this later. + p = bufPool.Get().([]byte) + ) + defer bufPool.Put(p) + + if offset < 0 { + offset = 0 + } + + if size <= 0 { + size = oi.Size - offset + } + + if offset+size > oi.Size { + return grpc.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size) + } + + if _, err := io.CopyBuffer( + &readResponseWriter{session: session}, + io.NewSectionReader(ra, offset, size), p); err != nil { + return err + } + + return nil +} + +type readResponseWriter struct { + offset int64 + session contentapi.Content_ReadServer +} + +func (rw *readResponseWriter) Write(p []byte) (n int, err error) { + if err := rw.session.Send(&contentapi.ReadResponse{ + Offset: rw.offset, + Data: p, + }); err != nil { + return 0, err + } + + rw.offset += int64(len(p)) + return len(p), nil +} + +func (s *Service) Write(session contentapi.Content_WriteServer) (err error) { + var ( + ref string + msg contentapi.WriteResponse + req *contentapi.WriteRequest + ) + + defer func(msg *contentapi.WriteResponse) { + // pump through the last message if no error was encountered + if err != nil { + return + } + + err = session.Send(msg) + }(&msg) + + // handle the very first request! + req, err = session.Recv() + if err != nil { + return err + } + + ref = req.Ref + if ref == "" { + return grpc.Errorf(codes.InvalidArgument, "first message must have a reference") + } + + // this action locks the writer for the session. + wr, err := s.store.Writer(session.Context(), ref) + if err != nil { + return err + } + defer wr.Close() + + for { + // TODO(stevvooe): We need to study this behavior in containerd a + // little better to decide where to put this. We may be able to make + // this determination elsewhere and avoid even creating the writer. + // + // Ideally, we just use the expected digest on commit to abandon the + // cost of the move when they collide. + if req.ExpectedDigest != "" { + if _, err := s.store.Info(req.ExpectedDigest); err != nil { + if !IsNotFound(err) { + return err + } + + return grpc.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.ExpectedDigest) + } + } + + msg.Action = req.Action + ws, err := wr.Status() + if err != nil { + return err + } + + msg.Offset = ws.Offset + msg.StartedAt = ws.StartedAt + msg.UpdatedAt = ws.UpdatedAt + + switch req.Action { + case contentapi.WriteActionStat: + msg.Digest = wr.Digest() + case contentapi.WriteActionWrite, contentapi.WriteActionCommit: + if req.Offset > 0 { + // validate the offset if provided + if req.Offset != ws.Offset { + return grpc.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset) + } + } + + // issue the write if we actually have data. + if len(req.Data) > 0 { + // While this looks like we could use io.WriterAt here, because we + // maintain the offset as append only, we just issue the write. + n, err := wr.Write(req.Data) + if err != nil { + return err + } + + if n != len(req.Data) { + // TODO(stevvooe): Perhaps, we can recover this by including it + // in the offset on the write return. + return grpc.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data)) + } + + msg.Offset += int64(n) + } + + if req.Action == contentapi.WriteActionCommit { + return wr.Commit(req.ExpectedSize, req.ExpectedDigest) + } + case contentapi.WriteActionAbort: + return s.store.Abort(ref) + } + + if err := session.Send(&msg); err != nil { + return err + } + + req, err = session.Recv() + if err != nil { + return err + } + } + + return nil +} + +func (s *Service) Status(*contentapi.StatusRequest, contentapi.Content_StatusServer) error { + return grpc.Errorf(codes.Unimplemented, "not implemented") +} + +func maybeNotFoundGRPC(err error, id string) error { + if IsNotFound(err) { + return grpc.Errorf(codes.NotFound, "%v: not found", id) + } + + return err +}