Make Diff/Apply plugable
Signed-off-by: Volodymyr Burenin <vburenin@gmail.com>
This commit is contained in:
parent
35d74aa9d8
commit
3c76a667b6
@ -2,6 +2,7 @@ package main
|
||||
|
||||
// register containerd builtins here
|
||||
import (
|
||||
_ "github.com/containerd/containerd/differ"
|
||||
_ "github.com/containerd/containerd/services/containers"
|
||||
_ "github.com/containerd/containerd/services/content"
|
||||
_ "github.com/containerd/containerd/services/diff"
|
||||
|
@ -49,6 +49,9 @@ type config struct {
|
||||
Metrics metricsConfig `toml:"metrics"`
|
||||
// Snapshotter specifies which snapshot driver to use
|
||||
Snapshotter string `toml:"snapshotter"`
|
||||
// Differ specifies which differ to use. Differ is tightly coupled with the snapshotter
|
||||
// so not all combinations may work.
|
||||
Differ string `toml:"differ"`
|
||||
// Plugins provides plugin specific configuration for the initialization of a plugin
|
||||
Plugins map[string]toml.Primitive `toml:"plugins"`
|
||||
// Enable containerd as a subreaper
|
||||
|
@ -12,5 +12,6 @@ func defaultConfig() *config {
|
||||
Address: "/run/containerd/debug.sock",
|
||||
},
|
||||
Snapshotter: "overlay",
|
||||
Differ: "base",
|
||||
}
|
||||
}
|
||||
|
@ -14,5 +14,6 @@ func defaultConfig() *config {
|
||||
Address: "/run/containerd/debug.sock",
|
||||
},
|
||||
Snapshotter: "naive",
|
||||
Differ: "base",
|
||||
}
|
||||
}
|
||||
|
@ -17,5 +17,6 @@ func defaultConfig() *config {
|
||||
Address: `\\.\pipe\containerd-debug`,
|
||||
},
|
||||
Snapshotter: "windows",
|
||||
Differ: "base",
|
||||
}
|
||||
}
|
||||
|
@ -130,7 +130,13 @@ func main() {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
services, err := loadServices(runtimes, store, snapshotter, meta)
|
||||
|
||||
differ, err := loadDiffer(snapshotter, store)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
services, err := loadServices(runtimes, store, snapshotter, meta, differ)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -351,6 +357,40 @@ func loadSnapshotter(store content.Store) (snapshot.Snapshotter, error) {
|
||||
return nil, fmt.Errorf("snapshotter not loaded: %v", conf.Snapshotter)
|
||||
}
|
||||
|
||||
func loadDiffer(snapshotter snapshot.Snapshotter, store content.Store) (plugin.Differ, error) {
|
||||
for name, sr := range plugin.Registrations() {
|
||||
if sr.Type != plugin.DiffPlugin {
|
||||
continue
|
||||
}
|
||||
moduleName := fmt.Sprintf("diff-%s", conf.Differ)
|
||||
if name != moduleName {
|
||||
continue
|
||||
}
|
||||
|
||||
log.G(global).Infof("loading differ plugin %q...", name)
|
||||
ic := &plugin.InitContext{
|
||||
Root: conf.Root,
|
||||
State: conf.State,
|
||||
Content: store,
|
||||
Snapshotter: snapshotter,
|
||||
Context: log.WithModule(global, moduleName),
|
||||
}
|
||||
if sr.Config != nil {
|
||||
if err := conf.decodePlugin(name, sr.Config); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ic.Config = sr.Config
|
||||
}
|
||||
sn, err := sr.Init(ic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sn.(plugin.Differ), nil
|
||||
}
|
||||
return nil, fmt.Errorf("differ not loaded: %v", conf.Differ)
|
||||
}
|
||||
|
||||
func newGRPCServer() *grpc.Server {
|
||||
s := grpc.NewServer(
|
||||
grpc.UnaryInterceptor(interceptor),
|
||||
@ -359,7 +399,9 @@ func newGRPCServer() *grpc.Server {
|
||||
return s
|
||||
}
|
||||
|
||||
func loadServices(runtimes map[string]plugin.Runtime, store content.Store, sn snapshot.Snapshotter, meta *bolt.DB) ([]plugin.Service, error) {
|
||||
func loadServices(runtimes map[string]plugin.Runtime,
|
||||
store content.Store, sn snapshot.Snapshotter,
|
||||
meta *bolt.DB, differ plugin.Differ) ([]plugin.Service, error) {
|
||||
var o []plugin.Service
|
||||
for name, sr := range plugin.Registrations() {
|
||||
if sr.Type != plugin.GRPCPlugin {
|
||||
@ -374,6 +416,7 @@ func loadServices(runtimes map[string]plugin.Runtime, store content.Store, sn sn
|
||||
Content: store,
|
||||
Meta: meta,
|
||||
Snapshotter: sn,
|
||||
Differ: differ,
|
||||
}
|
||||
if sr.Config != nil {
|
||||
if err := conf.decodePlugin(name, sr.Config); err != nil {
|
||||
|
164
differ/differ.go
Normal file
164
differ/differ.go
Normal file
@ -0,0 +1,164 @@
|
||||
package differ
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"github.com/containerd/containerd/archive"
|
||||
"github.com/containerd/containerd/archive/compression"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
func init() {
|
||||
plugin.Register("diff-base", &plugin.Registration{
|
||||
Type: plugin.DiffPlugin,
|
||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return newBaseDiff(ic.Content, ic.Snapshotter)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type BaseDiff struct {
|
||||
store content.Store
|
||||
snapshotter snapshot.Snapshotter
|
||||
}
|
||||
|
||||
var _ plugin.Differ = &BaseDiff{}
|
||||
|
||||
var emptyDesc = ocispec.Descriptor{}
|
||||
|
||||
func newBaseDiff(store content.Store, snapshotter snapshot.Snapshotter) (*BaseDiff, error) {
|
||||
return &BaseDiff{
|
||||
store: store,
|
||||
snapshotter: snapshotter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (ocispec.Descriptor, error) {
|
||||
// TODO: Check for supported media types
|
||||
dir, err := ioutil.TempDir("", "extract-")
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
if err := mount.MountAll(mounts, dir); err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(dir, 0)
|
||||
|
||||
r, err := s.store.Reader(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// TODO: only decompress stream if media type is compressed
|
||||
ds, err := compression.DecompressStream(r)
|
||||
if err != nil {
|
||||
return emptyDesc, err
|
||||
}
|
||||
defer ds.Close()
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
rc := &readCounter{
|
||||
r: io.TeeReader(ds, digester.Hash()),
|
||||
}
|
||||
|
||||
if _, err := archive.Apply(ctx, dir, rc); err != nil {
|
||||
return emptyDesc, err
|
||||
}
|
||||
|
||||
// Read any trailing data
|
||||
if _, err := io.Copy(ioutil.Discard, rc); err != nil {
|
||||
return emptyDesc, err
|
||||
}
|
||||
|
||||
return ocispec.Descriptor{
|
||||
MediaType: ocispec.MediaTypeImageLayer,
|
||||
Size: rc.c,
|
||||
Digest: digester.Digest(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BaseDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error) {
|
||||
|
||||
aDir, err := ioutil.TempDir("", "left-")
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(aDir)
|
||||
|
||||
bDir, err := ioutil.TempDir("", "right-")
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(bDir)
|
||||
|
||||
if err := mount.MountAll(lower, aDir); err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(aDir, 0)
|
||||
|
||||
if err := mount.MountAll(upper, bDir); err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(bDir, 0)
|
||||
|
||||
cw, err := s.store.Writer(ctx, ref, 0, "")
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to open writer")
|
||||
}
|
||||
|
||||
// TODO: Validate media type
|
||||
|
||||
// TODO: Support compressed media types (link compressed to uncompressed)
|
||||
//dgstr := digest.SHA256.Digester()
|
||||
//wc := &writeCounter{}
|
||||
//compressed, err := compression.CompressStream(cw, compression.Gzip)
|
||||
//if err != nil {
|
||||
// return nil, errors.Wrap(err, "failed to get compressed stream")
|
||||
//}
|
||||
//err = archive.WriteDiff(ctx, io.MultiWriter(compressed, dgstr.Hash(), wc), lowerDir, upperDir)
|
||||
//compressed.Close()
|
||||
|
||||
err = archive.WriteDiff(ctx, cw, aDir, bDir)
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to write diff")
|
||||
}
|
||||
|
||||
dgst := cw.Digest()
|
||||
if err := cw.Commit(0, dgst); err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to commit")
|
||||
}
|
||||
|
||||
info, err := s.store.Info(ctx, dgst)
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to get info from content store")
|
||||
}
|
||||
|
||||
return ocispec.Descriptor{
|
||||
MediaType: media,
|
||||
Size: info.Size,
|
||||
Digest: info.Digest,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type readCounter struct {
|
||||
r io.Reader
|
||||
c int64
|
||||
}
|
||||
|
||||
func (rc *readCounter) Read(p []byte) (n int, err error) {
|
||||
n, err = rc.r.Read(p)
|
||||
rc.c += int64(n)
|
||||
return
|
||||
}
|
12
plugin/differ.go
Normal file
12
plugin/differ.go
Normal file
@ -0,0 +1,12 @@
|
||||
package plugin
|
||||
|
||||
import (
|
||||
"github.com/containerd/containerd/mount"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
type Differ interface {
|
||||
Apply(ctx context.Context, desc ocispec.Descriptor, mount []mount.Mount) (ocispec.Descriptor, error)
|
||||
DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error)
|
||||
}
|
@ -19,6 +19,7 @@ const (
|
||||
GRPCPlugin
|
||||
SnapshotPlugin
|
||||
TaskMonitorPlugin
|
||||
DiffPlugin
|
||||
)
|
||||
|
||||
type Registration struct {
|
||||
@ -35,6 +36,7 @@ type InitContext struct {
|
||||
Content content.Store
|
||||
Meta *bolt.DB
|
||||
Snapshotter snapshot.Snapshotter
|
||||
Differ Differ
|
||||
Config interface{}
|
||||
Context context.Context
|
||||
Monitor TaskMonitor
|
||||
|
@ -54,6 +54,14 @@ func (r *remote) DiffMounts(ctx context.Context, a, b []mount.Mount, media, ref
|
||||
return toDescriptor(resp.Diff), nil
|
||||
}
|
||||
|
||||
func toDescriptor(d *descriptor.Descriptor) ocispec.Descriptor {
|
||||
return ocispec.Descriptor{
|
||||
MediaType: d.MediaType,
|
||||
Digest: d.Digest,
|
||||
Size: d.Size_,
|
||||
}
|
||||
}
|
||||
|
||||
func fromDescriptor(d ocispec.Descriptor) *descriptor.Descriptor {
|
||||
return &descriptor.Descriptor{
|
||||
MediaType: d.MediaType,
|
||||
|
@ -1,22 +1,10 @@
|
||||
package diff
|
||||
|
||||
import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
diffapi "github.com/containerd/containerd/api/services/diff"
|
||||
"github.com/containerd/containerd/api/types/descriptor"
|
||||
mounttypes "github.com/containerd/containerd/api/types/mount"
|
||||
"github.com/containerd/containerd/archive"
|
||||
"github.com/containerd/containerd/archive/compression"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
"github.com/containerd/containerd/snapshot"
|
||||
digest "github.com/opencontainers/go-digest"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
@ -25,21 +13,15 @@ func init() {
|
||||
plugin.Register("diff-grpc", &plugin.Registration{
|
||||
Type: plugin.GRPCPlugin,
|
||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
return newService(ic.Content, ic.Snapshotter)
|
||||
return &service{
|
||||
diff: ic.Differ,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type service struct {
|
||||
store content.Store
|
||||
snapshotter snapshot.Snapshotter
|
||||
}
|
||||
|
||||
func newService(store content.Store, snapshotter snapshot.Snapshotter) (*service, error) {
|
||||
return &service{
|
||||
store: store,
|
||||
snapshotter: snapshotter,
|
||||
}, nil
|
||||
diff plugin.Differ
|
||||
}
|
||||
|
||||
func (s *service) Register(gs *grpc.Server) error {
|
||||
@ -53,143 +35,31 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi
|
||||
|
||||
mounts := toMounts(er.Mounts)
|
||||
|
||||
dir, err := ioutil.TempDir("", "extract-")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(dir)
|
||||
|
||||
if err := mount.MountAll(mounts, dir); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(dir, 0)
|
||||
|
||||
r, err := s.store.Reader(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get reader from content store")
|
||||
}
|
||||
defer r.Close()
|
||||
|
||||
// TODO: only decompress stream if media type is compressed
|
||||
ds, err := compression.DecompressStream(r)
|
||||
ocidesc, err := s.diff.Apply(ctx, desc, mounts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer ds.Close()
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
rc := &readCounter{
|
||||
r: io.TeeReader(ds, digester.Hash()),
|
||||
}
|
||||
return &diffapi.ApplyResponse{
|
||||
Applied: fromDescriptor(ocidesc),
|
||||
}, nil
|
||||
|
||||
if _, err := archive.Apply(ctx, dir, rc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Read any trailing data
|
||||
if _, err := io.Copy(ioutil.Discard, rc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp := &diffapi.ApplyResponse{
|
||||
Applied: &descriptor.Descriptor{
|
||||
MediaType: ocispec.MediaTypeImageLayer,
|
||||
Digest: digester.Digest(),
|
||||
Size_: rc.c,
|
||||
},
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) {
|
||||
aMounts := toMounts(dr.Left)
|
||||
bMounts := toMounts(dr.Right)
|
||||
|
||||
aDir, err := ioutil.TempDir("", "left-")
|
||||
ocidesc, err := s.diff.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(aDir)
|
||||
|
||||
bDir, err := ioutil.TempDir("", "right-")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to create temporary directory")
|
||||
}
|
||||
defer os.RemoveAll(bDir)
|
||||
|
||||
if err := mount.MountAll(aMounts, aDir); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(aDir, 0)
|
||||
|
||||
if err := mount.MountAll(bMounts, bDir); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to mount")
|
||||
}
|
||||
defer mount.Unmount(bDir, 0)
|
||||
|
||||
cw, err := s.store.Writer(ctx, dr.Ref, 0, "")
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to open writer")
|
||||
}
|
||||
|
||||
// TODO: Validate media type
|
||||
|
||||
// TODO: Support compressed media types (link compressed to uncompressed)
|
||||
//dgstr := digest.SHA256.Digester()
|
||||
//wc := &writeCounter{}
|
||||
//compressed, err := compression.CompressStream(cw, compression.Gzip)
|
||||
//if err != nil {
|
||||
// return nil, errors.Wrap(err, "failed to get compressed stream")
|
||||
//}
|
||||
//err = archive.WriteDiff(ctx, io.MultiWriter(compressed, dgstr.Hash(), wc), lowerDir, upperDir)
|
||||
//compressed.Close()
|
||||
|
||||
err = archive.WriteDiff(ctx, cw, aDir, bDir)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to write diff")
|
||||
}
|
||||
|
||||
dgst := cw.Digest()
|
||||
if err := cw.Commit(0, dgst); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to commit")
|
||||
}
|
||||
|
||||
info, err := s.store.Info(ctx, dgst)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to get info from content store")
|
||||
}
|
||||
|
||||
desc := ocispec.Descriptor{
|
||||
MediaType: dr.MediaType,
|
||||
Digest: info.Digest,
|
||||
Size: info.Size,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &diffapi.DiffResponse{
|
||||
Diff: fromDescriptor(desc),
|
||||
Diff: fromDescriptor(ocidesc),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type readCounter struct {
|
||||
r io.Reader
|
||||
c int64
|
||||
}
|
||||
|
||||
func (rc *readCounter) Read(p []byte) (n int, err error) {
|
||||
n, err = rc.r.Read(p)
|
||||
rc.c += int64(n)
|
||||
return
|
||||
}
|
||||
|
||||
func toDescriptor(d *descriptor.Descriptor) ocispec.Descriptor {
|
||||
return ocispec.Descriptor{
|
||||
MediaType: d.MediaType,
|
||||
Digest: d.Digest,
|
||||
Size: d.Size_,
|
||||
}
|
||||
}
|
||||
|
||||
func toMounts(apim []*mounttypes.Mount) []mount.Mount {
|
||||
mounts := make([]mount.Mount, len(apim))
|
||||
for i, m := range apim {
|
||||
|
Loading…
Reference in New Issue
Block a user