Add server config for stream processors
Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
		| @@ -78,7 +78,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ | |||||||
|  |  | ||||||
| 	processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) | 	processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) | ||||||
| 	for { | 	for { | ||||||
| 		if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil { | 		if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { | ||||||
| 			return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) | 			return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) | ||||||
| 		} | 		} | ||||||
| 		if processor.MediaType() == ocispec.MediaTypeImageLayer { | 		if processor.MediaType() == ocispec.MediaTypeImageLayer { | ||||||
|   | |||||||
| @@ -20,6 +20,7 @@ import ( | |||||||
| 	"context" | 	"context" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/mount" | 	"github.com/containerd/containerd/mount" | ||||||
|  | 	"github.com/gogo/protobuf/types" | ||||||
| 	ocispec "github.com/opencontainers/image-spec/specs-go/v1" | 	ocispec "github.com/opencontainers/image-spec/specs-go/v1" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -54,7 +55,7 @@ type Comparer interface { | |||||||
| // ApplyConfig is used to hold parameters needed for a apply operation | // ApplyConfig is used to hold parameters needed for a apply operation | ||||||
| type ApplyConfig struct { | type ApplyConfig struct { | ||||||
| 	// ProcessorPayloads specifies the payload sent to various processors | 	// ProcessorPayloads specifies the payload sent to various processors | ||||||
| 	ProcessorPayloads map[string]interface{} | 	ProcessorPayloads map[string]*types.Any | ||||||
| } | } | ||||||
|  |  | ||||||
| // ApplyOpt is used to configure an Apply operation | // ApplyOpt is used to configure an Apply operation | ||||||
|   | |||||||
| @@ -18,12 +18,15 @@ package diff | |||||||
|  |  | ||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
|  | 	"fmt" | ||||||
| 	"io" | 	"io" | ||||||
| 	"os" | 	"os" | ||||||
| 	"os/exec" | 	"os/exec" | ||||||
|  |  | ||||||
| 	"github.com/containerd/containerd/archive/compression" | 	"github.com/containerd/containerd/archive/compression" | ||||||
| 	"github.com/containerd/containerd/images" | 	"github.com/containerd/containerd/images" | ||||||
|  | 	"github.com/gogo/protobuf/proto" | ||||||
|  | 	"github.com/gogo/protobuf/types" | ||||||
| 	ocispec "github.com/opencontainers/image-spec/specs-go/v1" | 	ocispec "github.com/opencontainers/image-spec/specs-go/v1" | ||||||
| 	"github.com/pkg/errors" | 	"github.com/pkg/errors" | ||||||
| ) | ) | ||||||
| @@ -31,7 +34,7 @@ import ( | |||||||
| var ( | var ( | ||||||
| 	handlers []Handler | 	handlers []Handler | ||||||
|  |  | ||||||
| 	// ErrNoProcessor is returned when no stream processor is avaliable for a media-type | 	// ErrNoProcessor is returned when no stream processor is available for a media-type | ||||||
| 	ErrNoProcessor = errors.New("no processor for media-type") | 	ErrNoProcessor = errors.New("no processor for media-type") | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -46,10 +49,10 @@ func RegisterProcessor(handler Handler) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // GetProcessor returns the processor for a media-type | // GetProcessor returns the processor for a media-type | ||||||
| func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { | func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { | ||||||
| 	// reverse this list so that user configured handlers come up first | 	// reverse this list so that user configured handlers come up first | ||||||
| 	for i := len(handlers); i >= 0; i-- { | 	for i := len(handlers) - 1; i >= 0; i-- { | ||||||
| 		processor, ok := handlers[i](ctx, mediaType) | 		processor, ok := handlers[i](ctx, stream.MediaType()) | ||||||
| 		if ok { | 		if ok { | ||||||
| 			return processor(ctx, stream, payloads) | 			return processor(ctx, stream, payloads) | ||||||
| 		} | 		} | ||||||
| @@ -71,7 +74,7 @@ func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler { | |||||||
| } | } | ||||||
|  |  | ||||||
| // StreamProcessorInit returns the initialized stream processor | // StreamProcessorInit returns the initialized stream processor | ||||||
| type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) | type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) | ||||||
|  |  | ||||||
| // RawProcessor provides access to direct fd for processing | // RawProcessor provides access to direct fd for processing | ||||||
| type RawProcessor interface { | type RawProcessor interface { | ||||||
| @@ -93,7 +96,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn | |||||||
| 		return nil, false | 		return nil, false | ||||||
| 	} | 	} | ||||||
| 	if compressed { | 	if compressed { | ||||||
| 		return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { | 		return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { | ||||||
| 			ds, err := compression.DecompressStream(stream) | 			ds, err := compression.DecompressStream(stream) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| 				return nil, err | 				return nil, err | ||||||
| @@ -104,7 +107,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn | |||||||
| 			}, nil | 			}, nil | ||||||
| 		}, true | 		}, true | ||||||
| 	} | 	} | ||||||
| 	return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { | 	return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { | ||||||
| 		return &stdProcessor{ | 		return &stdProcessor{ | ||||||
| 			rc: stream, | 			rc: stream, | ||||||
| 		}, nil | 		}, nil | ||||||
| @@ -168,24 +171,48 @@ func (c *compressedProcessor) Close() error { | |||||||
| 	return c.rc.Close() | 	return c.rc.Close() | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler { | ||||||
|  | 	set := make(map[string]struct{}, len(mediaTypes)) | ||||||
|  | 	for _, m := range mediaTypes { | ||||||
|  | 		set[m] = struct{}{} | ||||||
|  | 	} | ||||||
|  | 	return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) { | ||||||
|  | 		if _, ok := set[mediaType]; ok { | ||||||
|  | 			return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { | ||||||
|  | 				payload := payloads[id] | ||||||
|  | 				return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload) | ||||||
|  | 			}, true | ||||||
|  | 		} | ||||||
|  | 		return nil, false | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	payloadEnvVar   = "STREAM_PROCESSOR_PAYLOAD" | ||||||
|  | 	mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" | ||||||
|  | ) | ||||||
|  |  | ||||||
| // NewBinaryProcessor returns a binary processor for use with processing content streams | // NewBinaryProcessor returns a binary processor for use with processing content streams | ||||||
| func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) { | func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { | ||||||
| 	cmd := exec.CommandContext(ctx, name, args...) | 	cmd := exec.CommandContext(ctx, name, args...) | ||||||
| 	var ( | 	if payload != nil { | ||||||
| 		stdin *os.File | 		data, err := proto.Marshal(payload) | ||||||
| 		err   error |  | ||||||
| 	) |  | ||||||
| 	if f, ok := stream.(RawProcessor); ok { |  | ||||||
| 		stdin = f.File() |  | ||||||
| 	} else { |  | ||||||
| 		r, w, err := os.Pipe() |  | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 		stdin = r | 		cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", payloadEnvVar, data)) | ||||||
| 		go func() { | 	} | ||||||
| 			io.Copy(w, stream) | 	cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) | ||||||
| 		}() | 	var ( | ||||||
|  | 		stdin  io.Reader | ||||||
|  | 		closer func() error | ||||||
|  | 		err    error | ||||||
|  | 	) | ||||||
|  | 	if f, ok := stream.(RawProcessor); ok { | ||||||
|  | 		stdin = f.File() | ||||||
|  | 		closer = f.File().Close | ||||||
|  | 	} else { | ||||||
|  | 		stdin = stream | ||||||
| 	} | 	} | ||||||
| 	cmd.Stdin = stdin | 	cmd.Stdin = stdin | ||||||
| 	r, w, err := os.Pipe() | 	r, w, err := os.Pipe() | ||||||
| @@ -193,17 +220,22 @@ func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, | |||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	cmd.Stdout = w | 	cmd.Stdout = w | ||||||
|  |  | ||||||
| 	if err := cmd.Start(); err != nil { | 	if err := cmd.Start(); err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	go cmd.Wait() | ||||||
|  |  | ||||||
| 	// close after start and dup | 	// close after start and dup | ||||||
| 	stdin.Close() |  | ||||||
| 	w.Close() | 	w.Close() | ||||||
|  | 	if closer != nil { | ||||||
|  | 		closer() | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	return &binaryProcessor{ | 	return &binaryProcessor{ | ||||||
| 		cmd: cmd, | 		cmd: cmd, | ||||||
| 		r:   r, | 		r:   r, | ||||||
| 		mt:  mt, | 		mt:  rmt, | ||||||
| 	}, nil | 	}, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -56,9 +56,25 @@ type Config struct { | |||||||
| 	// ProxyPlugins configures plugins which are communicated to over GRPC | 	// ProxyPlugins configures plugins which are communicated to over GRPC | ||||||
| 	ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` | 	ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` | ||||||
|  |  | ||||||
|  | 	StreamProcessors []StreamProcessor `toml:"stream_processors"` | ||||||
|  |  | ||||||
| 	md toml.MetaData | 	md toml.MetaData | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // StreamProcessor provides configuration for diff content processors | ||||||
|  | type StreamProcessor struct { | ||||||
|  | 	// ID of the processor, also used to fetch the specific payload | ||||||
|  | 	ID string `toml:"id"` | ||||||
|  | 	// Accepts specific media-types | ||||||
|  | 	Accepts []string `toml:"accepts"` | ||||||
|  | 	// Returns the media-type | ||||||
|  | 	Returns string `toml:"returns"` | ||||||
|  | 	// Path or name of the binary | ||||||
|  | 	Path string `toml:"path"` | ||||||
|  | 	// Args to the binary | ||||||
|  | 	Args []string `toml:"args"` | ||||||
|  | } | ||||||
|  |  | ||||||
| // GetVersion returns the config file's version | // GetVersion returns the config file's version | ||||||
| func (c *Config) GetVersion() int { | func (c *Config) GetVersion() int { | ||||||
| 	if c.Version == 0 { | 	if c.Version == 0 { | ||||||
|   | |||||||
| @@ -35,6 +35,7 @@ import ( | |||||||
| 	"github.com/containerd/containerd/content/local" | 	"github.com/containerd/containerd/content/local" | ||||||
| 	csproxy "github.com/containerd/containerd/content/proxy" | 	csproxy "github.com/containerd/containerd/content/proxy" | ||||||
| 	"github.com/containerd/containerd/defaults" | 	"github.com/containerd/containerd/defaults" | ||||||
|  | 	"github.com/containerd/containerd/diff" | ||||||
| 	"github.com/containerd/containerd/events/exchange" | 	"github.com/containerd/containerd/events/exchange" | ||||||
| 	"github.com/containerd/containerd/log" | 	"github.com/containerd/containerd/log" | ||||||
| 	"github.com/containerd/containerd/metadata" | 	"github.com/containerd/containerd/metadata" | ||||||
| @@ -80,6 +81,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
|  | 	for _, p := range config.StreamProcessors { | ||||||
|  | 		diff.RegisterProcessor(diff.BinaryHandler(p.ID, p.Returns, p.Accepts, p.Path, p.Args)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	serverOpts := []grpc.ServerOption{ | 	serverOpts := []grpc.ServerOption{ | ||||||
| 		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), | 		grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), | ||||||
| 		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), | 		grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Michael Crosby
					Michael Crosby