diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go
index 5cdb8c034..9002b5df6 100644
--- a/cmd/containerd/main.go
+++ b/cmd/containerd/main.go
@@ -7,6 +7,7 @@ import (
 	_ "net/http/pprof"
 	"os"
 	"os/signal"
+	"path/filepath"
 	"syscall"
 	"time"
 
@@ -15,7 +16,9 @@ import (
 
 	"github.com/Sirupsen/logrus"
 	"github.com/docker/containerd"
+	contentapi "github.com/docker/containerd/api/services/content"
 	api "github.com/docker/containerd/api/services/execution"
+	"github.com/docker/containerd/content"
 	_ "github.com/docker/containerd/linux"
 	"github.com/docker/containerd/log"
 	"github.com/docker/containerd/services/execution"
@@ -55,10 +58,6 @@ func main() {
 			Name:  "log-level,l",
 			Usage: "set the logging level [debug, info, warn, error, fatal, panic]",
 		},
-		cli.StringFlag{
-			Name:  "root,r",
-			Usage: "containerd root directory",
-		},
 		cli.StringFlag{
 			Name:  "state",
 			Usage: "containerd state directory",
@@ -90,14 +89,27 @@ func main() {
 			return err
 		}
 		serveMetricsAPI()
+
+		contentStore, err := resolveContentStore(context)
+		if err != nil {
+			return err
+		}
+		contentService := content.NewService(contentStore)
+
 		// start the GRPC api with the execution service registered
-		server := newGRPCServer(execution.New(supervisor))
+		server := newGRPCServer()
+
+		api.RegisterContainerServiceServer(server, execution.New(supervisor))
+		contentapi.RegisterContentServer(server, contentService)
+
+		// start the GRPC api with registered services
 		if err := serveGRPC(server); err != nil {
 			return err
 		}
 		log.G(global).Infof("containerd successfully booted in %fs", time.Now().Sub(start).Seconds())
 		return handleSignals(signals, server)
 	}
+
 	if err := app.Run(os.Args); err != nil {
 		fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
 		os.Exit(1)
@@ -192,8 +204,13 @@ func serveDebugAPI() error {
 	return nil
 }
 
+func resolveContentStore(context *cli.Context) (*content.Store, error) {
+	cp := filepath.Join(conf.Root, "content")
+	return content.NewStore(cp)
+}
+
 func loadRuntimes() (map[string]containerd.Runtime, error) {
-	o := make(map[string]containerd.Runtime)
+	o := map[string]containerd.Runtime{}
 	for _, name := range containerd.Runtimes() {
 		r, err := containerd.NewRuntime(name, conf.State)
 		if err != nil {
@@ -205,9 +222,8 @@ func loadRuntimes() (map[string]containerd.Runtime, error) {
 	return o, nil
 }
 
-func newGRPCServer(service api.ContainerServiceServer) *grpc.Server {
+func newGRPCServer() *grpc.Server {
 	s := grpc.NewServer(grpc.UnaryInterceptor(interceptor))
-	api.RegisterContainerServiceServer(s, service)
 	return s
 }
 
diff --git a/cmd/dist/get.go b/cmd/dist/get.go
index bf35933af..f4bfbb233 100644
--- a/cmd/dist/get.go
+++ b/cmd/dist/get.go
@@ -4,6 +4,8 @@ import (
 	"io"
 	"os"
 
+	contentapi "github.com/docker/containerd/api/services/content"
+	"github.com/docker/containerd/content"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
 )
@@ -17,17 +19,23 @@ var getCommand = cli.Command{
 Output paths can be used to directly access blobs on disk.`,
 	Flags: []cli.Flag{},
 	Action: func(context *cli.Context) error {
-		cs, err := resolveContentStore(context)
-		if err != nil {
-			return err
-		}
+		var (
+			ctx = background
+		)
 
 		dgst, err := digest.Parse(context.Args().First())
 		if err != nil {
 			return err
 		}
 
-		rc, err := cs.Open(dgst)
+		conn, err := connectGRPC(context)
+		if err != nil {
+			return err
+		}
+
+		cs := content.NewProviderFromClient(contentapi.NewContentClient(conn))
+
+		rc, err := cs.Reader(ctx, dgst)
 		if err != nil {
 			return err
 		}
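For orientation, here is the client-side path this change enables, end to end. The sketch is illustrative only (the socket path is just the new --socket default, and error handling is collapsed to log.Fatal): dial containerd's GRPC socket, wrap the generated ContentClient in the new Provider, and stream a blob to stdout, much as `dist get` now does.

```go
package main

import (
	"context"
	"io"
	"log"
	"net"
	"os"
	"time"

	contentapi "github.com/docker/containerd/api/services/content"
	"github.com/docker/containerd/content"
	digest "github.com/opencontainers/go-digest"
	"google.golang.org/grpc"
)

func main() {
	// Dial the default socket from the new --socket flag (assumption: a
	// plaintext GRPC endpoint on a unix socket, as serveGRPC exposes it).
	conn, err := grpc.Dial("/run/containerd/containerd.sock",
		grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Wrap the generated client in the Provider from content/client.go.
	cs := content.NewProviderFromClient(contentapi.NewContentClient(conn))

	dgst, err := digest.Parse(os.Args[1])
	if err != nil {
		log.Fatal(err)
	}

	rc, err := cs.Reader(context.Background(), dgst)
	if err != nil {
		log.Fatal(err)
	}
	defer rc.Close()

	// remoteReader satisfies io.ReadCloser, so a plain io.Copy streams the
	// blob chunk by chunk over the Read RPC.
	if _, err := io.Copy(os.Stdout, rc); err != nil {
		log.Fatal(err)
	}
}
```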
diff --git a/cmd/dist/ingest.go b/cmd/dist/ingest.go
index e3353cc54..8d8a7cc62 100644
--- a/cmd/dist/ingest.go
+++ b/cmd/dist/ingest.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"os"
 
+	contentapi "github.com/docker/containerd/api/services/content"
 	"github.com/docker/containerd/content"
 	"github.com/opencontainers/go-digest"
 	"github.com/urfave/cli"
@@ -41,7 +42,7 @@ var ingestCommand = cli.Command{
 			return err
 		}
 
-		cs, err := resolveContentStore(context)
+		conn, err := connectGRPC(context)
 		if err != nil {
 			return err
 		}
@@ -50,9 +51,11 @@ var ingestCommand = cli.Command{
 			return fmt.Errorf("must specify a transaction reference")
 		}
 
+		ingester := content.NewIngesterFromClient(contentapi.NewContentClient(conn))
+
 		// TODO(stevvooe): Allow ingest to be reentrant. Currently, we expect
 		// all data to be written in a single invocation. Allow multiple writes
 		// to the same transaction key followed by a commit.
-		return content.WriteBlob(ctx, cs, os.Stdin, ref, expectedSize, expectedDigest)
+		return content.WriteBlob(ctx, ingester, os.Stdin, ref, expectedSize, expectedDigest)
 	},
 }
diff --git a/cmd/dist/main.go b/cmd/dist/main.go
index de4cf22fe..bfe6b24e4 100644
--- a/cmd/dist/main.go
+++ b/cmd/dist/main.go
@@ -42,6 +42,11 @@ distribution tool
 			Usage: "path to content store root",
 			Value: "/tmp/content", // TODO(stevvooe): for now, just use the PWD/.content
 		},
+		cli.StringFlag{
+			Name:  "socket, s",
+			Usage: "socket path for containerd's GRPC server",
+			Value: "/run/containerd/containerd.sock",
+		},
 	}
 	app.Commands = []cli.Command{
 		fetchCommand,
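Both `dist get` and `dist ingest` now route through connectGRPC, which is not part of this diff. Presumably it dials the address from the new --socket flag; a minimal hypothetical sketch, assuming a plaintext unix-socket transport:

```go
package main

import (
	"net"
	"time"

	"github.com/urfave/cli"
	"google.golang.org/grpc"
)

// connectGRPC is hypothetical here (referenced but not defined in this
// diff): dial the unix socket named by the global --socket flag, no TLS.
func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) {
	socket := context.GlobalString("socket")
	return grpc.Dial(socket,
		grpc.WithInsecure(),
		grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("unix", addr, timeout)
		}),
	)
}
```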
diff --git a/content/client.go b/content/client.go
new file mode 100644
index 000000000..92bea3111
--- /dev/null
+++ b/content/client.go
@@ -0,0 +1,216 @@
+package content
+
+import (
+	"context"
+	"io"
+
+	contentapi "github.com/docker/containerd/api/services/content"
+	digest "github.com/opencontainers/go-digest"
+	"github.com/pkg/errors"
+)
+
+func NewProviderFromClient(client contentapi.ContentClient) Provider {
+	return &remoteProvider{
+		client: client,
+	}
+}
+
+type remoteProvider struct {
+	client contentapi.ContentClient
+}
+
+func (rp *remoteProvider) Reader(ctx context.Context, dgst digest.Digest) (io.ReadCloser, error) {
+	client, err := rp.client.Read(ctx, &contentapi.ReadRequest{Digest: dgst})
+	if err != nil {
+		return nil, err
+	}
+
+	return &remoteReader{
+		client: client,
+	}, nil
+}
+
+type remoteReader struct {
+	client contentapi.Content_ReadClient
+	extra  []byte
+}
+
+func (rr *remoteReader) Read(p []byte) (n int, err error) {
+	n += copy(p, rr.extra)
+	if n >= len(p) {
+		if n <= len(rr.extra) {
+			rr.extra = rr.extra[n:]
+		} else {
+			rr.extra = rr.extra[:0]
+		}
+		return
+	}
+
+	p = p[n:]
+	for len(p) > 0 {
+		var resp *contentapi.ReadResponse
+		// fill our buffer up until we can fill p.
+		resp, err = rr.client.Recv()
+		if err != nil {
+			return
+		}
+
+		copied := copy(p, resp.Data)
+		n += copied
+		p = p[copied:]
+
+		if copied < len(p) {
+			continue
+		}
+
+		rr.extra = append(rr.extra, resp.Data[copied:]...)
+	}
+
+	return
+}
+
+func (rr *remoteReader) Close() error {
+	return rr.client.CloseSend()
+}
+
+func NewIngesterFromClient(client contentapi.ContentClient) Ingester {
+	return &remoteIngester{
+		client: client,
+	}
+}
+
+type remoteIngester struct {
+	client contentapi.ContentClient
+}
+
+func (ri *remoteIngester) Writer(ctx context.Context, ref string) (Writer, error) {
+	wrclient, offset, err := ri.negotiate(ctx, ref)
+	if err != nil {
+		return nil, err
+	}
+
+	return &remoteWriter{
+		client: wrclient,
+		offset: offset,
+	}, nil
+}
+
+func (ri *remoteIngester) negotiate(ctx context.Context, ref string) (contentapi.Content_WriteClient, int64, error) {
+	wrclient, err := ri.client.Write(ctx)
+	if err != nil {
+		return nil, 0, err
+	}
+
+	if err := wrclient.Send(&contentapi.WriteRequest{
+		Action: contentapi.WriteActionStat,
+		Ref:    ref,
+	}); err != nil {
+		return nil, 0, err
+	}
+
+	resp, err := wrclient.Recv()
+	if err != nil {
+		return nil, 0, err
+	}
+
+	return wrclient, resp.Offset, nil
+}
+
+type remoteWriter struct {
+	ref    string
+	client contentapi.Content_WriteClient
+	offset int64
+	digest digest.Digest
+}
+
+func newRemoteWriter(client contentapi.Content_WriteClient, ref string, offset int64) (*remoteWriter, error) {
+	return &remoteWriter{
+		ref:    ref,
+		client: client,
+		offset: offset,
+	}, nil
+}
+
+// send performs a synchronous req-resp cycle on the client.
+func (rw *remoteWriter) send(req *contentapi.WriteRequest) (*contentapi.WriteResponse, error) {
+	if err := rw.client.Send(req); err != nil {
+		return nil, err
+	}
+
+	resp, err := rw.client.Recv()
+
+	if err == nil {
+		// try to keep these in sync
+		if resp.Digest != "" {
+			rw.digest = resp.Digest
+		}
+	}
+
+	return resp, err
+}
+
+func (rw *remoteWriter) Status() (Status, error) {
+	resp, err := rw.send(&contentapi.WriteRequest{
+		Action: contentapi.WriteActionStat,
+	})
+	if err != nil {
+		return Status{}, err
+	}
+
+	return Status{
+		Ref:       rw.ref,
+		Offset:    resp.Offset,
+		StartedAt: resp.StartedAt,
+		UpdatedAt: resp.UpdatedAt,
+	}, nil
+}
+
+func (rw *remoteWriter) Digest() digest.Digest {
+	return rw.digest
+}
+
+func (rw *remoteWriter) Write(p []byte) (n int, err error) {
+	offset := rw.offset
+
+	resp, err := rw.send(&contentapi.WriteRequest{
+		Action: contentapi.WriteActionWrite,
+		Offset: offset,
+		Data:   p,
+	})
+	if err != nil {
+		return 0, err
+	}
+
+	n = int(resp.Offset - offset)
+	if n < len(p) {
+		err = io.ErrShortWrite
+	}
+
+	rw.offset += int64(n)
+	return
+}
+
+func (rw *remoteWriter) Commit(size int64, expected digest.Digest) error {
+	resp, err := rw.send(&contentapi.WriteRequest{
+		Action:         contentapi.WriteActionCommit,
+		ExpectedSize:   size,
+		ExpectedDigest: expected,
+	})
+	if err != nil {
+		return err
+	}
+
+	if size != 0 && resp.Offset != size {
+		return errors.Errorf("unexpected size: %v != %v", resp.Offset, size)
+	}
+
+	if expected != "" && resp.Digest != expected {
+		return errors.New("unexpected digest")
+	}
+
+	return nil
+}
+
+func (rw *remoteWriter) Close() error {
+	return rw.client.CloseSend()
+}
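The writer returned by NewIngesterFromClient negotiates its starting offset before any data moves, so an interrupted ingest can be resumed. A minimal usage sketch, assuming the Ingester/Writer interfaces behave as the remote implementations above suggest; this is roughly the shape of what content.WriteBlob does for `dist ingest`, and the CopyN skip is just one way to honor a non-zero resume offset (it assumes the caller can replay the same byte stream):

```go
package main

import (
	"context"
	"io"
	"io/ioutil"

	"github.com/docker/containerd/content"
	digest "github.com/opencontainers/go-digest"
)

// ingestBlob streams r into the store under ref and commits it against the
// expected size and digest. Resumption is implicit: the Writer comes back
// positioned at the offset negotiated via WriteActionStat.
func ingestBlob(ctx context.Context, ing content.Ingester, r io.Reader, ref string, size int64, expected digest.Digest) error {
	wr, err := ing.Writer(ctx, ref)
	if err != nil {
		return err
	}
	defer wr.Close()

	// Skip any bytes the server already holds before copying the remainder.
	if status, err := wr.Status(); err == nil && status.Offset > 0 {
		if _, err := io.CopyN(ioutil.Discard, r, status.Offset); err != nil {
			return err
		}
	}

	if _, err := io.Copy(wr, r); err != nil {
		return err
	}

	// Commit validates the total size and digest on the server side.
	return wr.Commit(size, expected)
}
```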
diff --git a/content/service.go b/content/service.go
new file mode 100644
index 000000000..a3c81849f
--- /dev/null
+++ b/content/service.go
@@ -0,0 +1,233 @@
+package content
+
+import (
+	"errors"
+	"io"
+
+	contentapi "github.com/docker/containerd/api/services/content"
+	"golang.org/x/net/context"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+)
+
+type Service struct {
+	store *Store
+}
+
+var _ contentapi.ContentServer = &Service{}
+
+func NewService(store *Store) contentapi.ContentServer {
+	return &Service{store: store}
+}
+
+func (s *Service) Info(ctx context.Context, req *contentapi.InfoRequest) (*contentapi.InfoResponse, error) {
+	if err := req.Digest.Validate(); err != nil {
+		return nil, grpc.Errorf(codes.InvalidArgument, "%q failed validation", req.Digest)
+	}
+
+	bi, err := s.store.Info(req.Digest)
+	if err != nil {
+		return nil, maybeNotFoundGRPC(err, req.Digest.String())
+	}
+
+	return &contentapi.InfoResponse{
+		Digest:      req.Digest,
+		Size_:       bi.Size,
+		CommittedAt: bi.CommittedAt,
+	}, nil
+}
+
+func (s *Service) Read(req *contentapi.ReadRequest, session contentapi.Content_ReadServer) error {
+	if err := req.Digest.Validate(); err != nil {
+		return grpc.Errorf(codes.InvalidArgument, "%v: %v", req.Digest, err)
+	}
+
+	oi, err := s.store.Info(req.Digest)
+	if err != nil {
+		return maybeNotFoundGRPC(err, req.Digest.String())
+	}
+
+	rc, err := s.store.Reader(session.Context(), req.Digest)
+	if err != nil {
+		return maybeNotFoundGRPC(err, req.Digest.String())
+	}
+	defer rc.Close() // TODO(stevvooe): Cache these file descriptors for performance.
+
+	ra, ok := rc.(io.ReaderAt)
+	if !ok {
+		// TODO(stevvooe): Need to set this up to get correct behavior across
+		// board. May change interface to store to just return ReaderAtCloser.
+		// Possibly, we could just return io.ReaderAt and handle file
+		// descriptors internally.
+		return errors.New("content service only supports content stores that return ReaderAt")
+	}
+
+	var (
+		offset = req.Offset
+		size   = req.Size_
+
+		// TODO(stevvooe): Using the global buffer pool. At 32KB, it is probably a
+		// little inefficient for work over a fast network. We can tune this later.
+		p = bufPool.Get().([]byte)
+	)
+	defer bufPool.Put(p)
+
+	if offset < 0 {
+		offset = 0
+	}
+
+	if size <= 0 {
+		size = oi.Size - offset
+	}
+
+	if offset+size > oi.Size {
+		return grpc.Errorf(codes.OutOfRange, "read past object length %v bytes", oi.Size)
+	}
+
+	if _, err := io.CopyBuffer(
+		&readResponseWriter{session: session},
+		io.NewSectionReader(ra, offset, size), p); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+type readResponseWriter struct {
+	offset  int64
+	session contentapi.Content_ReadServer
+}
+
+func (rw *readResponseWriter) Write(p []byte) (n int, err error) {
+	if err := rw.session.Send(&contentapi.ReadResponse{
+		Offset: rw.offset,
+		Data:   p,
+	}); err != nil {
+		return 0, err
+	}
+
+	rw.offset += int64(len(p))
+	return len(p), nil
+}
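Note that Read leans on a package-level bufPool that is defined elsewhere in the content package, not in this diff. Given the bufPool.Get().([]byte) assertion and the 32KB figure in the TODO above, it is presumably shaped like this (an assumption, not part of the change):

```go
package content

import "sync"

// Assumed definition (not in this diff): fixed 32KB byte slices recycled
// across copy operations, matching the bufPool.Get().([]byte) usage above.
var bufPool = sync.Pool{
	New: func() interface{} {
		return make([]byte, 32<<10)
	},
}
```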
+func (s *Service) Write(session contentapi.Content_WriteServer) (err error) {
+	var (
+		ref string
+		msg contentapi.WriteResponse
+		req *contentapi.WriteRequest
+	)
+
+	defer func(msg *contentapi.WriteResponse) {
+		// pump through the last message if no error was encountered
+		if err != nil {
+			return
+		}
+
+		err = session.Send(msg)
+	}(&msg)
+
+	// handle the very first request!
+	req, err = session.Recv()
+	if err != nil {
+		return err
+	}
+
+	ref = req.Ref
+	if ref == "" {
+		return grpc.Errorf(codes.InvalidArgument, "first message must have a reference")
+	}
+
+	// this action locks the writer for the session.
+	wr, err := s.store.Writer(session.Context(), ref)
+	if err != nil {
+		return err
+	}
+	defer wr.Close()
+
+	for {
+		// TODO(stevvooe): We need to study this behavior in containerd a
+		// little better to decide where to put this. We may be able to make
+		// this determination elsewhere and avoid even creating the writer.
+		//
+		// Ideally, we just use the expected digest on commit to abandon the
+		// cost of the move when they collide.
+		if req.ExpectedDigest != "" {
+			if _, err := s.store.Info(req.ExpectedDigest); err != nil {
+				if !IsNotFound(err) {
+					return err
+				}
+			} else {
+				return grpc.Errorf(codes.AlreadyExists, "blob with expected digest %v exists", req.ExpectedDigest)
+			}
+		}
+
+		msg.Action = req.Action
+		ws, err := wr.Status()
+		if err != nil {
+			return err
+		}
+
+		msg.Offset = ws.Offset
+		msg.StartedAt = ws.StartedAt
+		msg.UpdatedAt = ws.UpdatedAt
+
+		switch req.Action {
+		case contentapi.WriteActionStat:
+			msg.Digest = wr.Digest()
+		case contentapi.WriteActionWrite, contentapi.WriteActionCommit:
+			if req.Offset > 0 {
+				// validate the offset if provided
+				if req.Offset != ws.Offset {
+					return grpc.Errorf(codes.OutOfRange, "write @%v must occur at current offset %v", req.Offset, ws.Offset)
+				}
+			}
+
+			// issue the write if we actually have data.
+			if len(req.Data) > 0 {
+				// While this looks like we could use io.WriterAt here, because we
+				// maintain the offset as append only, we just issue the write.
+				n, err := wr.Write(req.Data)
+				if err != nil {
+					return err
+				}
+
+				if n != len(req.Data) {
+					// TODO(stevvooe): Perhaps, we can recover this by including it
+					// in the offset on the write return.
+					return grpc.Errorf(codes.DataLoss, "wrote %v of %v bytes", n, len(req.Data))
+				}
+
+				msg.Offset += int64(n)
+			}
+
+			if req.Action == contentapi.WriteActionCommit {
+				return wr.Commit(req.ExpectedSize, req.ExpectedDigest)
+			}
+		case contentapi.WriteActionAbort:
+			return s.store.Abort(ref)
+		}
+
+		if err := session.Send(&msg); err != nil {
+			return err
+		}
+
+		req, err = session.Recv()
+		if err != nil {
+			return err
+		}
+	}
+}
+
+func (s *Service) Status(*contentapi.StatusRequest, contentapi.Content_StatusServer) error {
+	return grpc.Errorf(codes.Unimplemented, "not implemented")
+}
+
+func maybeNotFoundGRPC(err error, id string) error {
+	if IsNotFound(err) {
+		return grpc.Errorf(codes.NotFound, "%v: not found", id)
+	}
+
+	return err
+}
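Taken together, Write implements a small session protocol: the first message must carry the Ref and locks the writer, a stat returns the current offset, writes append at that offset, and a commit validates size and digest before the blob lands in the store. A hedged sketch of the raw message flow a client performs against this handler (remoteIngester and remoteWriter above wrap exactly this exchange; the single-chunk payload is illustrative):

```go
package main

import (
	"context"
	"log"

	contentapi "github.com/docker/containerd/api/services/content"
	digest "github.com/opencontainers/go-digest"
)

// writeSession walks the stat/write/commit sequence over one Write stream.
func writeSession(ctx context.Context, client contentapi.ContentClient, ref string, data []byte, dgst digest.Digest) error {
	stream, err := client.Write(ctx)
	if err != nil {
		return err
	}

	// 1. The first message locks the session; stat reports the current
	// offset, which is non-zero when resuming a previous ingest.
	if err := stream.Send(&contentapi.WriteRequest{
		Action: contentapi.WriteActionStat,
		Ref:    ref,
	}); err != nil {
		return err
	}
	resp, err := stream.Recv()
	if err != nil {
		return err
	}
	log.Printf("resuming at offset %d", resp.Offset)

	// 2. Append data at the negotiated offset.
	if err := stream.Send(&contentapi.WriteRequest{
		Action: contentapi.WriteActionWrite,
		Offset: resp.Offset,
		Data:   data,
	}); err != nil {
		return err
	}
	if _, err := stream.Recv(); err != nil {
		return err
	}

	// 3. Commit: the service validates the total size and digest before
	// moving the blob into place.
	if err := stream.Send(&contentapi.WriteRequest{
		Action:         contentapi.WriteActionCommit,
		ExpectedSize:   resp.Offset + int64(len(data)),
		ExpectedDigest: dgst,
	}); err != nil {
		return err
	}
	if _, err := stream.Recv(); err != nil {
		return err
	}

	return stream.CloseSend()
}
```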