diff --git a/cmd/containerd/builtins.go b/cmd/containerd/builtins.go index 5baa9dacf..9386b1c18 100644 --- a/cmd/containerd/builtins.go +++ b/cmd/containerd/builtins.go @@ -2,6 +2,7 @@ package main // register containerd builtins here import ( + _ "github.com/containerd/containerd/differ" _ "github.com/containerd/containerd/services/containers" _ "github.com/containerd/containerd/services/content" _ "github.com/containerd/containerd/services/diff" diff --git a/cmd/containerd/config.go b/cmd/containerd/config.go index d47951f8d..1a451add0 100644 --- a/cmd/containerd/config.go +++ b/cmd/containerd/config.go @@ -49,6 +49,9 @@ type config struct { Metrics metricsConfig `toml:"metrics"` // Snapshotter specifies which snapshot driver to use Snapshotter string `toml:"snapshotter"` + // Differ specifies which differ to use. Differ is tightly coupled with the snapshotter + // so not all combinations may work. + Differ string `toml:"differ"` // Plugins provides plugin specific configuration for the initialization of a plugin Plugins map[string]toml.Primitive `toml:"plugins"` // Enable containerd as a subreaper diff --git a/cmd/containerd/config_linux.go b/cmd/containerd/config_linux.go index 64863fa02..f88ed0266 100644 --- a/cmd/containerd/config_linux.go +++ b/cmd/containerd/config_linux.go @@ -12,5 +12,6 @@ func defaultConfig() *config { Address: "/run/containerd/debug.sock", }, Snapshotter: "overlay", + Differ: "base", } } diff --git a/cmd/containerd/config_unix.go b/cmd/containerd/config_unix.go index 358969bae..e26b5f821 100644 --- a/cmd/containerd/config_unix.go +++ b/cmd/containerd/config_unix.go @@ -14,5 +14,6 @@ func defaultConfig() *config { Address: "/run/containerd/debug.sock", }, Snapshotter: "naive", + Differ: "base", } } diff --git a/cmd/containerd/config_windows.go b/cmd/containerd/config_windows.go index eed74d1c0..dbeabe3fd 100644 --- a/cmd/containerd/config_windows.go +++ b/cmd/containerd/config_windows.go @@ -17,5 +17,6 @@ func defaultConfig() *config { Address: `\\.\pipe\containerd-debug`, }, Snapshotter: "windows", + Differ: "base", } } diff --git a/cmd/containerd/main.go b/cmd/containerd/main.go index b29a5f474..2f82e3f60 100644 --- a/cmd/containerd/main.go +++ b/cmd/containerd/main.go @@ -130,7 +130,13 @@ func main() { if err != nil { return err } - services, err := loadServices(runtimes, store, snapshotter, meta) + + differ, err := loadDiffer(snapshotter, store) + if err != nil { + return err + } + + services, err := loadServices(runtimes, store, snapshotter, meta, differ) if err != nil { return err } @@ -351,6 +357,40 @@ func loadSnapshotter(store content.Store) (snapshot.Snapshotter, error) { return nil, fmt.Errorf("snapshotter not loaded: %v", conf.Snapshotter) } +func loadDiffer(snapshotter snapshot.Snapshotter, store content.Store) (plugin.Differ, error) { + for name, sr := range plugin.Registrations() { + if sr.Type != plugin.DiffPlugin { + continue + } + moduleName := fmt.Sprintf("diff-%s", conf.Differ) + if name != moduleName { + continue + } + + log.G(global).Infof("loading differ plugin %q...", name) + ic := &plugin.InitContext{ + Root: conf.Root, + State: conf.State, + Content: store, + Snapshotter: snapshotter, + Context: log.WithModule(global, moduleName), + } + if sr.Config != nil { + if err := conf.decodePlugin(name, sr.Config); err != nil { + return nil, err + } + ic.Config = sr.Config + } + sn, err := sr.Init(ic) + if err != nil { + return nil, err + } + + return sn.(plugin.Differ), nil + } + return nil, fmt.Errorf("differ not loaded: %v", conf.Differ) +} + func newGRPCServer() *grpc.Server { s := grpc.NewServer( grpc.UnaryInterceptor(interceptor), @@ -359,7 +399,9 @@ func newGRPCServer() *grpc.Server { return s } -func loadServices(runtimes map[string]plugin.Runtime, store content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) { +func loadServices(runtimes map[string]plugin.Runtime, + store content.Store, sn snapshot.Snapshotter, + meta *bolt.DB, differ plugin.Differ) ([]plugin.Service, error) { var o []plugin.Service for name, sr := range plugin.Registrations() { if sr.Type != plugin.GRPCPlugin { @@ -374,6 +416,7 @@ func loadServices(runtimes map[string]plugin.Runtime, store content.Store, sn sn Content: store, Meta: meta, Snapshotter: sn, + Differ: differ, } if sr.Config != nil { if err := conf.decodePlugin(name, sr.Config); err != nil { diff --git a/differ/differ.go b/differ/differ.go new file mode 100644 index 000000000..10883df4e --- /dev/null +++ b/differ/differ.go @@ -0,0 +1,164 @@ +package differ + +import ( + "io" + "io/ioutil" + "os" + + "github.com/containerd/containerd/archive" + "github.com/containerd/containerd/archive/compression" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/mount" + "github.com/containerd/containerd/plugin" + "github.com/containerd/containerd/snapshot" + digest "github.com/opencontainers/go-digest" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +func init() { + plugin.Register("diff-base", &plugin.Registration{ + Type: plugin.DiffPlugin, + Init: func(ic *plugin.InitContext) (interface{}, error) { + return newBaseDiff(ic.Content, ic.Snapshotter) + }, + }) +} + +type BaseDiff struct { + store content.Store + snapshotter snapshot.Snapshotter +} + +var _ plugin.Differ = &BaseDiff{} + +var emptyDesc = ocispec.Descriptor{} + +func newBaseDiff(store content.Store, snapshotter snapshot.Snapshotter) (*BaseDiff, error) { + return &BaseDiff{ + store: store, + snapshotter: snapshotter, + }, nil +} + +func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (ocispec.Descriptor, error) { + // TODO: Check for supported media types + dir, err := ioutil.TempDir("", "extract-") + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to create temporary directory") + } + defer os.RemoveAll(dir) + + if err := mount.MountAll(mounts, dir); err != nil { + return emptyDesc, errors.Wrap(err, "failed to mount") + } + defer mount.Unmount(dir, 0) + + r, err := s.store.Reader(ctx, desc.Digest) + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to get reader from content store") + } + defer r.Close() + + // TODO: only decompress stream if media type is compressed + ds, err := compression.DecompressStream(r) + if err != nil { + return emptyDesc, err + } + defer ds.Close() + + digester := digest.Canonical.Digester() + rc := &readCounter{ + r: io.TeeReader(ds, digester.Hash()), + } + + if _, err := archive.Apply(ctx, dir, rc); err != nil { + return emptyDesc, err + } + + // Read any trailing data + if _, err := io.Copy(ioutil.Discard, rc); err != nil { + return emptyDesc, err + } + + return ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayer, + Size: rc.c, + Digest: digester.Digest(), + }, nil +} + +func (s *BaseDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error) { + + aDir, err := ioutil.TempDir("", "left-") + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to create temporary directory") + } + defer os.RemoveAll(aDir) + + bDir, err := ioutil.TempDir("", "right-") + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to create temporary directory") + } + defer os.RemoveAll(bDir) + + if err := mount.MountAll(lower, aDir); err != nil { + return emptyDesc, errors.Wrap(err, "failed to mount") + } + defer mount.Unmount(aDir, 0) + + if err := mount.MountAll(upper, bDir); err != nil { + return emptyDesc, errors.Wrap(err, "failed to mount") + } + defer mount.Unmount(bDir, 0) + + cw, err := s.store.Writer(ctx, ref, 0, "") + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to open writer") + } + + // TODO: Validate media type + + // TODO: Support compressed media types (link compressed to uncompressed) + //dgstr := digest.SHA256.Digester() + //wc := &writeCounter{} + //compressed, err := compression.CompressStream(cw, compression.Gzip) + //if err != nil { + // return nil, errors.Wrap(err, "failed to get compressed stream") + //} + //err = archive.WriteDiff(ctx, io.MultiWriter(compressed, dgstr.Hash(), wc), lowerDir, upperDir) + //compressed.Close() + + err = archive.WriteDiff(ctx, cw, aDir, bDir) + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to write diff") + } + + dgst := cw.Digest() + if err := cw.Commit(0, dgst); err != nil { + return emptyDesc, errors.Wrap(err, "failed to commit") + } + + info, err := s.store.Info(ctx, dgst) + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to get info from content store") + } + + return ocispec.Descriptor{ + MediaType: media, + Size: info.Size, + Digest: info.Digest, + }, nil +} + +type readCounter struct { + r io.Reader + c int64 +} + +func (rc *readCounter) Read(p []byte) (n int, err error) { + n, err = rc.r.Read(p) + rc.c += int64(n) + return +} diff --git a/plugin/differ.go b/plugin/differ.go new file mode 100644 index 000000000..80c3c6181 --- /dev/null +++ b/plugin/differ.go @@ -0,0 +1,12 @@ +package plugin + +import ( + "github.com/containerd/containerd/mount" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/net/context" +) + +type Differ interface { + Apply(ctx context.Context, desc ocispec.Descriptor, mount []mount.Mount) (ocispec.Descriptor, error) + DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error) +} diff --git a/plugin/plugin.go b/plugin/plugin.go index 6310e9019..8248d6244 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -19,6 +19,7 @@ const ( GRPCPlugin SnapshotPlugin TaskMonitorPlugin + DiffPlugin ) type Registration struct { @@ -35,6 +36,7 @@ type InitContext struct { Content content.Store Meta *bolt.DB Snapshotter snapshot.Snapshotter + Differ Differ Config interface{} Context context.Context Monitor TaskMonitor diff --git a/services/diff/client.go b/services/diff/client.go index 359e9328d..1fe1390fd 100644 --- a/services/diff/client.go +++ b/services/diff/client.go @@ -54,6 +54,14 @@ func (r *remote) DiffMounts(ctx context.Context, a, b []mount.Mount, media, ref return toDescriptor(resp.Diff), nil } +func toDescriptor(d *descriptor.Descriptor) ocispec.Descriptor { + return ocispec.Descriptor{ + MediaType: d.MediaType, + Digest: d.Digest, + Size: d.Size_, + } +} + func fromDescriptor(d ocispec.Descriptor) *descriptor.Descriptor { return &descriptor.Descriptor{ MediaType: d.MediaType, diff --git a/services/diff/service.go b/services/diff/service.go index 165a041f2..79da2ba5c 100644 --- a/services/diff/service.go +++ b/services/diff/service.go @@ -1,22 +1,10 @@ package diff import ( - "io" - "io/ioutil" - "os" - diffapi "github.com/containerd/containerd/api/services/diff" - "github.com/containerd/containerd/api/types/descriptor" mounttypes "github.com/containerd/containerd/api/types/mount" - "github.com/containerd/containerd/archive" - "github.com/containerd/containerd/archive/compression" - "github.com/containerd/containerd/content" "github.com/containerd/containerd/mount" "github.com/containerd/containerd/plugin" - "github.com/containerd/containerd/snapshot" - digest "github.com/opencontainers/go-digest" - ocispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" "golang.org/x/net/context" "google.golang.org/grpc" ) @@ -25,21 +13,15 @@ func init() { plugin.Register("diff-grpc", &plugin.Registration{ Type: plugin.GRPCPlugin, Init: func(ic *plugin.InitContext) (interface{}, error) { - return newService(ic.Content, ic.Snapshotter) + return &service{ + diff: ic.Differ, + }, nil }, }) } type service struct { - store content.Store - snapshotter snapshot.Snapshotter -} - -func newService(store content.Store, snapshotter snapshot.Snapshotter) (*service, error) { - return &service{ - store: store, - snapshotter: snapshotter, - }, nil + diff plugin.Differ } func (s *service) Register(gs *grpc.Server) error { @@ -53,143 +35,31 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi mounts := toMounts(er.Mounts) - dir, err := ioutil.TempDir("", "extract-") - if err != nil { - return nil, errors.Wrap(err, "failed to create temporary directory") - } - defer os.RemoveAll(dir) - - if err := mount.MountAll(mounts, dir); err != nil { - return nil, errors.Wrap(err, "failed to mount") - } - defer mount.Unmount(dir, 0) - - r, err := s.store.Reader(ctx, desc.Digest) - if err != nil { - return nil, errors.Wrap(err, "failed to get reader from content store") - } - defer r.Close() - - // TODO: only decompress stream if media type is compressed - ds, err := compression.DecompressStream(r) + ocidesc, err := s.diff.Apply(ctx, desc, mounts) if err != nil { return nil, err } - defer ds.Close() - digester := digest.Canonical.Digester() - rc := &readCounter{ - r: io.TeeReader(ds, digester.Hash()), - } + return &diffapi.ApplyResponse{ + Applied: fromDescriptor(ocidesc), + }, nil - if _, err := archive.Apply(ctx, dir, rc); err != nil { - return nil, err - } - - // Read any trailing data - if _, err := io.Copy(ioutil.Discard, rc); err != nil { - return nil, err - } - - resp := &diffapi.ApplyResponse{ - Applied: &descriptor.Descriptor{ - MediaType: ocispec.MediaTypeImageLayer, - Digest: digester.Digest(), - Size_: rc.c, - }, - } - - return resp, nil } func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) { aMounts := toMounts(dr.Left) bMounts := toMounts(dr.Right) - aDir, err := ioutil.TempDir("", "left-") + ocidesc, err := s.diff.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref) if err != nil { - return nil, errors.Wrap(err, "failed to create temporary directory") - } - defer os.RemoveAll(aDir) - - bDir, err := ioutil.TempDir("", "right-") - if err != nil { - return nil, errors.Wrap(err, "failed to create temporary directory") - } - defer os.RemoveAll(bDir) - - if err := mount.MountAll(aMounts, aDir); err != nil { - return nil, errors.Wrap(err, "failed to mount") - } - defer mount.Unmount(aDir, 0) - - if err := mount.MountAll(bMounts, bDir); err != nil { - return nil, errors.Wrap(err, "failed to mount") - } - defer mount.Unmount(bDir, 0) - - cw, err := s.store.Writer(ctx, dr.Ref, 0, "") - if err != nil { - return nil, errors.Wrap(err, "failed to open writer") - } - - // TODO: Validate media type - - // TODO: Support compressed media types (link compressed to uncompressed) - //dgstr := digest.SHA256.Digester() - //wc := &writeCounter{} - //compressed, err := compression.CompressStream(cw, compression.Gzip) - //if err != nil { - // return nil, errors.Wrap(err, "failed to get compressed stream") - //} - //err = archive.WriteDiff(ctx, io.MultiWriter(compressed, dgstr.Hash(), wc), lowerDir, upperDir) - //compressed.Close() - - err = archive.WriteDiff(ctx, cw, aDir, bDir) - if err != nil { - return nil, errors.Wrap(err, "failed to write diff") - } - - dgst := cw.Digest() - if err := cw.Commit(0, dgst); err != nil { - return nil, errors.Wrap(err, "failed to commit") - } - - info, err := s.store.Info(ctx, dgst) - if err != nil { - return nil, errors.Wrap(err, "failed to get info from content store") - } - - desc := ocispec.Descriptor{ - MediaType: dr.MediaType, - Digest: info.Digest, - Size: info.Size, + return nil, err } return &diffapi.DiffResponse{ - Diff: fromDescriptor(desc), + Diff: fromDescriptor(ocidesc), }, nil } -type readCounter struct { - r io.Reader - c int64 -} - -func (rc *readCounter) Read(p []byte) (n int, err error) { - n, err = rc.r.Read(p) - rc.c += int64(n) - return -} - -func toDescriptor(d *descriptor.Descriptor) ocispec.Descriptor { - return ocispec.Descriptor{ - MediaType: d.MediaType, - Digest: d.Digest, - Size: d.Size_, - } -} - func toMounts(apim []*mounttypes.Mount) []mount.Mount { mounts := make([]mount.Mount, len(apim)) for i, m := range apim {