Add support for multiple differs
Updates the differ service to support calling and configuring multiple differs. The differs are configured as an ordered list of differs which will each be attempting until a supported differ is called. Additionally a not supported error type was added to allow differs to be selective of whether the differ arguments are supported by the differ. This error type corresponds to the GRPC unimplemented error. Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
parent
b2ee0ab34e
commit
2fa366d6af
@ -4,11 +4,14 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/containerd/containerd/archive"
|
||||
"github.com/containerd/containerd/archive/compression"
|
||||
"github.com/containerd/containerd/content"
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/images"
|
||||
"github.com/containerd/containerd/metadata"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
@ -21,7 +24,7 @@ import (
|
||||
func init() {
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.DiffPlugin,
|
||||
ID: "base-diff",
|
||||
ID: "walking",
|
||||
Requires: []plugin.PluginType{
|
||||
plugin.ContentPlugin,
|
||||
plugin.MetadataPlugin,
|
||||
@ -35,27 +38,38 @@ func init() {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return NewBaseDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
|
||||
return newWalkingDiff(metadata.NewContentStore(md.(*bolt.DB), c.(content.Store)))
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type BaseDiff struct {
|
||||
type walkingDiff struct {
|
||||
store content.Store
|
||||
}
|
||||
|
||||
var _ plugin.Differ = &BaseDiff{}
|
||||
|
||||
var emptyDesc = ocispec.Descriptor{}
|
||||
|
||||
func NewBaseDiff(store content.Store) (*BaseDiff, error) {
|
||||
return &BaseDiff{
|
||||
func newWalkingDiff(store content.Store) (plugin.Differ, error) {
|
||||
return &walkingDiff{
|
||||
store: store,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (ocispec.Descriptor, error) {
|
||||
// TODO: Check for supported media types
|
||||
func (s *walkingDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount) (ocispec.Descriptor, error) {
|
||||
var isCompressed bool
|
||||
switch desc.MediaType {
|
||||
case ocispec.MediaTypeImageLayer, images.MediaTypeDockerSchema2Layer:
|
||||
case ocispec.MediaTypeImageLayerGzip, images.MediaTypeDockerSchema2LayerGzip:
|
||||
isCompressed = true
|
||||
default:
|
||||
// Still apply all generic media types *.tar[.+]gzip and *.tar
|
||||
if strings.HasSuffix(desc.MediaType, ".tar.gzip") || strings.HasSuffix(desc.MediaType, ".tar+gzip") {
|
||||
isCompressed = true
|
||||
} else if !strings.HasSuffix(desc.MediaType, ".tar") {
|
||||
return emptyDesc, errors.Wrapf(errdefs.ErrNotSupported, "unsupported diff media type: %v", desc.MediaType)
|
||||
}
|
||||
}
|
||||
|
||||
dir, err := ioutil.TempDir("", "extract-")
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to create temporary directory")
|
||||
@ -67,22 +81,25 @@ func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []
|
||||
}
|
||||
defer mount.Unmount(dir, 0)
|
||||
|
||||
r, err := s.store.ReaderAt(ctx, desc.Digest)
|
||||
ra, err := s.store.ReaderAt(ctx, desc.Digest)
|
||||
if err != nil {
|
||||
return emptyDesc, errors.Wrap(err, "failed to get reader from content store")
|
||||
}
|
||||
defer r.Close()
|
||||
defer ra.Close()
|
||||
|
||||
// TODO: only decompress stream if media type is compressed
|
||||
ds, err := compression.DecompressStream(content.NewReader(r))
|
||||
r := content.NewReader(ra)
|
||||
if isCompressed {
|
||||
ds, err := compression.DecompressStream(r)
|
||||
if err != nil {
|
||||
return emptyDesc, err
|
||||
}
|
||||
defer ds.Close()
|
||||
r = ds
|
||||
}
|
||||
|
||||
digester := digest.Canonical.Digester()
|
||||
rc := &readCounter{
|
||||
r: io.TeeReader(ds, digester.Hash()),
|
||||
r: io.TeeReader(r, digester.Hash()),
|
||||
}
|
||||
|
||||
if _, err := archive.Apply(ctx, dir, rc); err != nil {
|
||||
@ -101,7 +118,7 @@ func (s *BaseDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *BaseDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error) {
|
||||
func (s *walkingDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount, media, ref string) (ocispec.Descriptor, error) {
|
||||
var isCompressed bool
|
||||
switch media {
|
||||
case ocispec.MediaTypeImageLayer:
|
||||
@ -111,7 +128,7 @@ func (s *BaseDiff) DiffMounts(ctx context.Context, lower, upper []mount.Mount, m
|
||||
media = ocispec.MediaTypeImageLayerGzip
|
||||
isCompressed = true
|
||||
default:
|
||||
return emptyDesc, errors.Errorf("unsupported diff media type: %v", media)
|
||||
return emptyDesc, errors.Wrapf(errdefs.ErrNotSupported, "unsupported diff media type: %v", media)
|
||||
}
|
||||
aDir, err := ioutil.TempDir("", "left-")
|
||||
if err != nil {
|
||||
|
@ -26,6 +26,7 @@ var (
|
||||
ErrAlreadyExists = errors.New("already exists")
|
||||
ErrFailedPrecondition = errors.New("failed precondition")
|
||||
ErrUnavailable = errors.New("unavailable")
|
||||
ErrNotSupported = errors.New("not supported") // represents not supported and unimplemented
|
||||
)
|
||||
|
||||
func IsInvalidArgument(err error) bool {
|
||||
@ -52,3 +53,7 @@ func IsFailedPrecondition(err error) bool {
|
||||
func IsUnavailable(err error) bool {
|
||||
return errors.Cause(err) == ErrUnavailable
|
||||
}
|
||||
|
||||
func IsNotSupported(err error) bool {
|
||||
return errors.Cause(err) == ErrNotSupported
|
||||
}
|
||||
|
@ -38,6 +38,8 @@ func ToGRPC(err error) error {
|
||||
return grpc.Errorf(codes.FailedPrecondition, err.Error())
|
||||
case IsUnavailable(err):
|
||||
return grpc.Errorf(codes.Unavailable, err.Error())
|
||||
case IsNotSupported(err):
|
||||
return grpc.Errorf(codes.Unimplemented, err.Error())
|
||||
}
|
||||
|
||||
return err
|
||||
@ -69,6 +71,8 @@ func FromGRPC(err error) error {
|
||||
cls = ErrUnavailable
|
||||
case codes.FailedPrecondition:
|
||||
cls = ErrFailedPrecondition
|
||||
case codes.Unimplemented:
|
||||
cls = ErrNotSupported
|
||||
default:
|
||||
cls = ErrUnknown
|
||||
}
|
||||
|
@ -6,10 +6,22 @@ import (
|
||||
"github.com/containerd/containerd/errdefs"
|
||||
"github.com/containerd/containerd/mount"
|
||||
"github.com/containerd/containerd/plugin"
|
||||
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type config struct {
|
||||
// Order is the order of preference in which to try diff algorithms, the
|
||||
// first differ which is supported is used.
|
||||
// Note when multiple differs may be supported, this order will be
|
||||
// respected for which is choosen. Each differ should return the same
|
||||
// correct output, allowing any ordering to be used to prefer
|
||||
// more optimimal implementations.
|
||||
Order []string `toml:"default,omitempty"`
|
||||
}
|
||||
|
||||
func init() {
|
||||
plugin.Register(&plugin.Registration{
|
||||
Type: plugin.GRPCPlugin,
|
||||
@ -17,20 +29,34 @@ func init() {
|
||||
Requires: []plugin.PluginType{
|
||||
plugin.DiffPlugin,
|
||||
},
|
||||
Config: &config{
|
||||
Order: []string{"walking"},
|
||||
},
|
||||
Init: func(ic *plugin.InitContext) (interface{}, error) {
|
||||
d, err := ic.Get(plugin.DiffPlugin)
|
||||
differs, err := ic.GetAll(plugin.DiffPlugin)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
orderedNames := ic.Config.(*config).Order
|
||||
ordered := make([]plugin.Differ, len(orderedNames))
|
||||
for i, n := range orderedNames {
|
||||
differ, ok := differs[n]
|
||||
if !ok {
|
||||
return nil, errors.Errorf("needed differ not loaded: %s", n)
|
||||
}
|
||||
ordered[i] = differ.(plugin.Differ)
|
||||
}
|
||||
|
||||
return &service{
|
||||
diff: d.(plugin.Differ),
|
||||
differs: ordered,
|
||||
}, nil
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
type service struct {
|
||||
diff plugin.Differ
|
||||
differs []plugin.Differ
|
||||
}
|
||||
|
||||
func (s *service) Register(gs *grpc.Server) error {
|
||||
@ -39,12 +65,20 @@ func (s *service) Register(gs *grpc.Server) error {
|
||||
}
|
||||
|
||||
func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi.ApplyResponse, error) {
|
||||
desc := toDescriptor(er.Diff)
|
||||
// TODO: Check for supported media types
|
||||
var (
|
||||
ocidesc ocispec.Descriptor
|
||||
err error
|
||||
desc = toDescriptor(er.Diff)
|
||||
mounts = toMounts(er.Mounts)
|
||||
)
|
||||
|
||||
mounts := toMounts(er.Mounts)
|
||||
for _, differ := range s.differs {
|
||||
ocidesc, err = differ.Apply(ctx, desc, mounts)
|
||||
if !errdefs.IsNotSupported(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
ocidesc, err := s.diff.Apply(ctx, desc, mounts)
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
@ -56,10 +90,19 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi
|
||||
}
|
||||
|
||||
func (s *service) Diff(ctx context.Context, dr *diffapi.DiffRequest) (*diffapi.DiffResponse, error) {
|
||||
aMounts := toMounts(dr.Left)
|
||||
bMounts := toMounts(dr.Right)
|
||||
var (
|
||||
ocidesc ocispec.Descriptor
|
||||
err error
|
||||
aMounts = toMounts(dr.Left)
|
||||
bMounts = toMounts(dr.Right)
|
||||
)
|
||||
|
||||
ocidesc, err := s.diff.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref)
|
||||
for _, differ := range s.differs {
|
||||
ocidesc, err = differ.DiffMounts(ctx, aMounts, bMounts, dr.MediaType, dr.Ref)
|
||||
if !errdefs.IsNotSupported(err) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errdefs.ToGRPC(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user