From 8c74da3983116f7dc84283f171c7910612473437 Mon Sep 17 00:00:00 2001 From: Stephen J Day Date: Mon, 3 Apr 2017 18:40:59 -0700 Subject: [PATCH] cmd/dist, cmd/ctr: move image store access to GRPC With this changeset, image store access is now moved to completely accessible over GRPC. No clients manipulate the image store database directly and the GRPC client is fully featured. The metadata database is now managed by the daemon and access coordinated via services. Signed-off-by: Stephen J Day --- cmd/containerd/builtins.go | 1 + cmd/containerd/main.go | 35 +++++++++++++++++++++++++++++++---- cmd/ctr/run.go | 15 +++------------ cmd/ctr/utils.go | 22 +++++----------------- cmd/dist/common.go | 32 ++++++++++---------------------- cmd/dist/images.go | 27 ++++++++------------------- cmd/dist/pull.go | 27 ++++++--------------------- plugin/plugin.go | 4 +++- services/content/service.go | 2 +- services/rootfs/service.go | 2 +- 10 files changed, 69 insertions(+), 98 deletions(-) diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index e9ee8ade1..793e4460d 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -7,6 +7,7 @@ import ( _ "github.com/containerd/containerd/services/content" _ "github.com/containerd/containerd/services/execution" _ "github.com/containerd/containerd/services/healthcheck" + _ "github.com/containerd/containerd/services/images" _ "github.com/containerd/containerd/services/metrics" _ "github.com/containerd/containerd/services/rootfs" _ "github.com/containerd/containerd/snapshot/btrfs" diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index fa0bffd41..2c0227a62 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -11,6 +11,7 @@ import ( "syscall" "time" + "github.com/boltdb/bolt" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" gocontext "golang.org/x/net/context" "google.golang.org/grpc" @@ -19,8 +20,10 @@ import ( "github.com/containerd/containerd" contentapi "github.com/containerd/containerd/api/services/content" api "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" rootfsapi "github.com/containerd/containerd/api/services/rootfs" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/plugin" "github.com/containerd/containerd/reaper" @@ -115,11 +118,16 @@ func main() { if err != nil { return err } + meta, err := resolveMetaDB(context) + if err != nil { + return err + } + defer meta.Close() snapshotter, err := loadSnapshotter(store) if err != nil { return err } - services, err := loadServices(runtimes, store, snapshotter) + services, err := loadServices(runtimes, store, snapshotter, meta) if err != nil { return err } @@ -240,6 +248,22 @@ func resolveContentStore() (*content.Store, error) { return content.NewStore(cp) } +func resolveMetaDB(ctx *cli.Context) (*bolt.DB, error) { + path := filepath.Join(conf.Root, "meta.db") + + db, err := bolt.Open(path, 0644, nil) + if err != nil { + return nil, err + } + + // TODO(stevvooe): Break these down into components to be initialized. + if err := images.InitDB(db); err != nil { + return nil, err + } + + return db, nil +} + func loadRuntimes(monitor plugin.ContainerMonitor) (map[string]containerd.Runtime, error) { o := make(map[string]containerd.Runtime) for name, rr := range plugin.Registrations() { @@ -306,7 +330,7 @@ func loadSnapshotter(store *content.Store) (snapshot.Snapshotter, error) { ic := &plugin.InitContext{ Root: conf.Root, State: conf.State, - Store: store, + Content: store, Context: log.WithModule(global, moduleName), } if sr.Config != nil { @@ -333,7 +357,7 @@ func newGRPCServer() *grpc.Server { return s } -func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter) ([]plugin.Service, error) { +func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) { var o []plugin.Service for name, sr := range plugin.Registrations() { if sr.Type != plugin.GRPCPlugin { @@ -345,7 +369,8 @@ func loadServices(runtimes map[string]containerd.Runtime, store *content.Store, State: conf.State, Context: log.WithModule(global, fmt.Sprintf("service-%s", name)), Runtimes: runtimes, - Store: store, + Content: store, + Meta: meta, Snapshotter: sn, } if sr.Config != nil { @@ -397,6 +422,8 @@ func interceptor(ctx gocontext.Context, ctx = log.WithModule(ctx, "content") case rootfsapi.RootFSServer: ctx = log.WithModule(ctx, "rootfs") + case imagesapi.ImagesServer: + ctx = log.WithModule(ctx, "images") default: fmt.Printf("unknown GRPC server type: %#v\n", info.Server) } diff --git a/cmd/ctr/run.go b/cmd/ctr/run.go index d1a4dd868..1152c3677 100644 --- a/cmd/ctr/run.go +++ b/cmd/ctr/run.go @@ -277,27 +277,18 @@ var runCommand = cli.Command{ return err } - db, err := getDB(context, false) + imageStore, err := getImageStore(context) if err != nil { - return errors.Wrap(err, "failed opening database") + return errors.Wrap(err, "failed resolving image store") } - defer db.Close() - - tx, err := db.Begin(false) - if err != nil { - return err - } - defer tx.Rollback() ref := context.Args().First() - image, err := images.Get(tx, ref) + image, err := imageStore.Get(ctx, ref) if err != nil { return errors.Wrapf(err, "could not resolve %q", ref) } // let's close out our db and tx so we don't hold the lock whilst running. - tx.Rollback() - db.Close() diffIDs, err := image.RootFS(ctx, provider) if err != nil { diff --git a/cmd/ctr/utils.go b/cmd/ctr/utils.go index e727cbf98..7dc36287c 100644 --- a/cmd/ctr/utils.go +++ b/cmd/ctr/utils.go @@ -14,14 +14,15 @@ import ( gocontext "context" - "github.com/boltdb/bolt" contentapi "github.com/containerd/containerd/api/services/content" "github.com/containerd/containerd/api/services/execution" + imagesapi "github.com/containerd/containerd/api/services/images" rootfsapi "github.com/containerd/containerd/api/services/rootfs" "github.com/containerd/containerd/api/types/container" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" contentservice "github.com/containerd/containerd/services/content" + imagesservice "github.com/containerd/containerd/services/images" "github.com/pkg/errors" "github.com/tonistiigi/fifo" "github.com/urfave/cli" @@ -134,25 +135,12 @@ func getRootFSService(context *cli.Context) (rootfsapi.RootFSClient, error) { return rootfsapi.NewRootFSClient(conn), nil } -func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) { - // TODO(stevvooe): For now, we operate directly on the database. We will - // replace this with a GRPC service when the details are more concrete. - path := filepath.Join(ctx.GlobalString("root"), "meta.db") - - db, err := bolt.Open(path, 0644, &bolt.Options{ - ReadOnly: readonly, - }) +func getImageStore(clicontext *cli.Context) (images.Store, error) { + conn, err := getGRPCConnection(clicontext) if err != nil { return nil, err } - - if !readonly { - if err := images.InitDB(db); err != nil { - return nil, err - } - } - - return db, nil + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil } func getTempDir(id string) (string, error) { diff --git a/cmd/dist/common.go b/cmd/dist/common.go index ab5281692..1c819d011 100644 --- a/cmd/dist/common.go +++ b/cmd/dist/common.go @@ -6,11 +6,12 @@ import ( "path/filepath" "time" - "github.com/boltdb/bolt" + imagesapi "github.com/containerd/containerd/api/services/images" "github.com/containerd/containerd/content" "github.com/containerd/containerd/images" "github.com/containerd/containerd/remotes" "github.com/containerd/containerd/remotes/docker" + imagesservice "github.com/containerd/containerd/services/images" "github.com/urfave/cli" "google.golang.org/grpc" ) @@ -27,6 +28,14 @@ func resolveContentStore(context *cli.Context) (*content.Store, error) { return content.NewStore(root) } +func resolveImageStore(clicontext *cli.Context) (images.Store, error) { + conn, err := connectGRPC(clicontext) + if err != nil { + return nil, err + } + return imagesservice.NewStoreFromClient(imagesapi.NewImagesClient(conn)), nil +} + func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) { socket := context.GlobalString("socket") timeout := context.GlobalDuration("connect-timeout") @@ -40,27 +49,6 @@ func connectGRPC(context *cli.Context) (*grpc.ClientConn, error) { ) } -func getDB(ctx *cli.Context, readonly bool) (*bolt.DB, error) { - // TODO(stevvooe): For now, we operate directly on the database. We will - // replace this with a GRPC service when the details are more concrete. - path := filepath.Join(ctx.GlobalString("root"), "meta.db") - - db, err := bolt.Open(path, 0644, &bolt.Options{ - ReadOnly: readonly, - }) - if err != nil { - return nil, err - } - - if !readonly { - if err := images.InitDB(db); err != nil { - return nil, err - } - } - - return db, nil -} - // getResolver prepares the resolver from the environment and options. func getResolver(ctx context.Context) (remotes.Resolver, error) { return docker.NewResolver(), nil diff --git a/cmd/dist/images.go b/cmd/dist/images.go index 164785590..984ae820d 100644 --- a/cmd/dist/images.go +++ b/cmd/dist/images.go @@ -6,7 +6,6 @@ import ( "text/tabwriter" contentapi "github.com/containerd/containerd/api/services/content" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/progress" contentservice "github.com/containerd/containerd/services/content" @@ -25,23 +24,19 @@ var imagesCommand = cli.Command{ ctx = background ) - db, err := getDB(clicontext, true) + imageStore, err := resolveImageStore(clicontext) if err != nil { - return errors.Wrap(err, "failed to open database") + return err } - tx, err := db.Begin(false) - if err != nil { - return errors.Wrap(err, "could not start transaction") - } - defer tx.Rollback() conn, err := connectGRPC(clicontext) if err != nil { return err } + provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) - images, err := images.List(tx) + images, err := imageStore.List(ctx) if err != nil { return errors.Wrap(err, "failed to list images") } @@ -54,7 +49,7 @@ var imagesCommand = cli.Command{ log.G(ctx).WithError(err).Errorf("failed calculating size for image %s", image.Name) } - fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Descriptor.MediaType, image.Descriptor.Digest, progress.Bytes(size)) + fmt.Fprintf(tw, "%v\t%v\t%v\t%v\t\n", image.Name, image.Target.MediaType, image.Target.Digest, progress.Bytes(size)) } tw.Flush() @@ -74,19 +69,13 @@ var rmiCommand = cli.Command{ exitErr error ) - db, err := getDB(clicontext, false) + imageStore, err := resolveImageStore(clicontext) if err != nil { - return errors.Wrap(err, "failed to open database") + return err } - tx, err := db.Begin(true) - if err != nil { - return errors.Wrap(err, "could not start transaction") - } - defer tx.Rollback() - for _, target := range clicontext.Args() { - if err := images.Delete(tx, target); err != nil { + if err := imageStore.Delete(ctx, target); err != nil { if exitErr == nil { exitErr = errors.Wrapf(err, "unable to delete %v", target) } diff --git a/cmd/dist/pull.go b/cmd/dist/pull.go index f00ca695e..84840342c 100644 --- a/cmd/dist/pull.go +++ b/cmd/dist/pull.go @@ -47,17 +47,10 @@ command. As part of this process, we do the following: return err } - db, err := getDB(clicontext, false) + imageStore, err := resolveImageStore(clicontext) if err != nil { return err } - defer db.Close() - - tx, err := db.Begin(true) - if err != nil { - return err - } - defer tx.Rollback() resolver, err := getResolver(ctx) if err != nil { @@ -65,6 +58,7 @@ command. As part of this process, we do the following: } ongoing := newJobs() + // TODO(stevvooe): Must unify this type. ingester := contentservice.NewIngesterFromClient(contentapi.NewContentClient(conn)) provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) @@ -88,13 +82,8 @@ command. As part of this process, we do the following: close(resolved) eg.Go(func() error { - return images.Register(tx, name, desc) + return imageStore.Put(ctx, name, desc) }) - defer func() { - if err := tx.Commit(); err != nil { - log.G(ctx).WithError(err).Error("commit failed") - } - }() return images.Dispatch(ctx, images.Handlers(images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) { @@ -114,24 +103,20 @@ command. As part of this process, we do the following: }() defer func() { - ctx := context.Background() - tx, err := db.Begin(false) - if err != nil { - log.G(ctx).Fatal(err) - } + ctx := background // TODO(stevvooe): This section unpacks the layers and resolves the // root filesystem chainid for the image. For now, we just print // it, but we should keep track of this in the metadata storage. - image, err := images.Get(tx, resolvedImageName) + image, err := imageStore.Get(ctx, resolvedImageName) if err != nil { log.G(ctx).Fatal(err) } provider := contentservice.NewProviderFromClient(contentapi.NewContentClient(conn)) - p, err := content.ReadBlob(ctx, provider, image.Descriptor.Digest) + p, err := content.ReadBlob(ctx, provider, image.Target.Digest) if err != nil { log.G(ctx).Fatal(err) } diff --git a/plugin/plugin.go b/plugin/plugin.go index 96a8c0757..095b9e8f6 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -4,6 +4,7 @@ import ( "fmt" "sync" + "github.com/boltdb/bolt" "github.com/containerd/containerd" "github.com/containerd/containerd/content" "github.com/containerd/containerd/snapshot" @@ -32,7 +33,8 @@ type InitContext struct { Root string State string Runtimes map[string]containerd.Runtime - Store *content.Store + Content *content.Store + Meta *bolt.DB Snapshotter snapshot.Snapshotter Config interface{} Context context.Context diff --git a/services/content/service.go b/services/content/service.go index c31112403..128104fa0 100644 --- a/services/content/service.go +++ b/services/content/service.go @@ -38,7 +38,7 @@ func init() { func NewService(ic *plugin.InitContext) (interface{}, error) { return &Service{ - store: ic.Store, + store: ic.Content, }, nil } diff --git a/services/rootfs/service.go b/services/rootfs/service.go index 146c13cd6..539d36e52 100644 --- a/services/rootfs/service.go +++ b/services/rootfs/service.go @@ -22,7 +22,7 @@ func init() { plugin.Register("rootfs-grpc", &plugin.Registration{ Type: plugin.GRPCPlugin, Init: func(ic *plugin.InitContext) (interface{}, error) { - return NewService(ic.Store, ic.Snapshotter) + return NewService(ic.Content, ic.Snapshotter) }, }) }