diff --git a/diff/apply/apply.go b/diff/apply/apply.go index db077bccd..99a6b802e 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -78,7 +78,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) for { - if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) } if processor.MediaType() == ocispec.MediaTypeImageLayer { diff --git a/diff/diff.go b/diff/diff.go index 4a5193741..6dcef6ccb 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -20,6 +20,7 @@ import ( "context" "github.com/containerd/containerd/mount" + "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -54,7 +55,7 @@ type Comparer interface { // ApplyConfig is used to hold parameters needed for a apply operation type ApplyConfig struct { // ProcessorPayloads specifies the payload sent to various processors - ProcessorPayloads map[string]interface{} + ProcessorPayloads map[string]*types.Any } // ApplyOpt is used to configure an Apply operation diff --git a/diff/stream.go b/diff/stream.go index 1a336fd17..cb60ef61e 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -18,12 +18,15 @@ package diff import ( "context" + "fmt" "io" "os" "os/exec" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/images" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -31,7 +34,7 @@ import ( var ( handlers []Handler - // ErrNoProcessor is returned when no stream processor is avaliable for a media-type + // ErrNoProcessor is returned when no stream processor is available for a media-type ErrNoProcessor = errors.New("no processor for media-type") ) @@ -46,10 +49,10 @@ func RegisterProcessor(handler Handler) { } // GetProcessor returns the processor for a media-type -func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { +func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { // reverse this list so that user configured handlers come up first - for i := len(handlers); i >= 0; i-- { - processor, ok := handlers[i](ctx, mediaType) + for i := len(handlers) - 1; i >= 0; i-- { + processor, ok := handlers[i](ctx, stream.MediaType()) if ok { return processor(ctx, stream, payloads) } @@ -71,7 +74,7 @@ func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler { } // StreamProcessorInit returns the initialized stream processor -type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) +type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) // RawProcessor provides access to direct fd for processing type RawProcessor interface { @@ -93,7 +96,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn return nil, false } if compressed { - return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { ds, err := compression.DecompressStream(stream) if err != nil { return nil, err @@ -104,7 +107,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn }, nil }, true } - return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { return &stdProcessor{ rc: stream, }, nil @@ -168,24 +171,48 @@ func (c *compressedProcessor) Close() error { return c.rc.Close() } +func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler { + set := make(map[string]struct{}, len(mediaTypes)) + for _, m := range mediaTypes { + set[m] = struct{}{} + } + return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) { + if _, ok := set[mediaType]; ok { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { + payload := payloads[id] + return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload) + }, true + } + return nil, false + } +} + +const ( + payloadEnvVar = "STREAM_PROCESSOR_PAYLOAD" + mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" +) + // NewBinaryProcessor returns a binary processor for use with processing content streams -func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) { +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { cmd := exec.CommandContext(ctx, name, args...) - var ( - stdin *os.File - err error - ) - if f, ok := stream.(RawProcessor); ok { - stdin = f.File() - } else { - r, w, err := os.Pipe() + if payload != nil { + data, err := proto.Marshal(payload) if err != nil { return nil, err } - stdin = r - go func() { - io.Copy(w, stream) - }() + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", payloadEnvVar, data)) + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream } cmd.Stdin = stdin r, w, err := os.Pipe() @@ -193,17 +220,22 @@ func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, return nil, err } cmd.Stdout = w + if err := cmd.Start(); err != nil { return nil, err } + go cmd.Wait() + // close after start and dup - stdin.Close() w.Close() + if closer != nil { + closer() + } return &binaryProcessor{ cmd: cmd, r: r, - mt: mt, + mt: rmt, }, nil } diff --git a/services/server/config/config.go b/services/server/config/config.go index 13ccd5b6f..365dfa0fd 100644 --- a/services/server/config/config.go +++ b/services/server/config/config.go @@ -56,9 +56,25 @@ type Config struct { // ProxyPlugins configures plugins which are communicated to over GRPC ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` + StreamProcessors []StreamProcessor `toml:"stream_processors"` + md toml.MetaData } +// StreamProcessor provides configuration for diff content processors +type StreamProcessor struct { + // ID of the processor, also used to fetch the specific payload + ID string `toml:"id"` + // Accepts specific media-types + Accepts []string `toml:"accepts"` + // Returns the media-type + Returns string `toml:"returns"` + // Path or name of the binary + Path string `toml:"path"` + // Args to the binary + Args []string `toml:"args"` +} + // GetVersion returns the config file's version func (c *Config) GetVersion() int { if c.Version == 0 { diff --git a/services/server/server.go b/services/server/server.go index 3c85b4b7c..0e6923918 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/content/local" csproxy "github.com/containerd/containerd/content/proxy" "github.com/containerd/containerd/defaults" + "github.com/containerd/containerd/diff" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" @@ -80,6 +81,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if err != nil { return nil, err } + for _, p := range config.StreamProcessors { + diff.RegisterProcessor(diff.BinaryHandler(p.ID, p.Returns, p.Accepts, p.Path, p.Args)) + } + serverOpts := []grpc.ServerOption{ grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor),