Merge pull request #4916 from stefanberger/streamproc_env_vars

Allow passing environent variables to StreamProcessors
This commit is contained in:
Maksym Pavlenko 2021-01-11 16:34:12 -08:00 committed by GitHub
commit a4f4a43110
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 9 additions and 5 deletions

View File

@ -168,7 +168,7 @@ func (c *compressedProcessor) Close() error {
return c.rc.Close() return c.rc.Close()
} }
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler { func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
set := make(map[string]struct{}, len(mediaTypes)) set := make(map[string]struct{}, len(mediaTypes))
for _, m := range mediaTypes { for _, m := range mediaTypes {
set[m] = struct{}{} set[m] = struct{}{}
@ -177,7 +177,7 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string
if _, ok := set[mediaType]; ok { if _, ok := set[mediaType]; ok {
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
payload := payloads[id] payload := payloads[id]
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload) return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
}, true }, true
} }
return nil, false return nil, false

View File

@ -33,9 +33,10 @@ import (
) )
// NewBinaryProcessor returns a binary processor for use with processing content streams // NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...) cmd := exec.CommandContext(ctx, name, args...)
cmd.Env = os.Environ() cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, env...)
var payloadC io.Closer var payloadC io.Closer
if payload != nil { if payload != nil {

View File

@ -39,9 +39,10 @@ import (
const processorPipe = "STREAM_PROCESSOR_PIPE" const processorPipe = "STREAM_PROCESSOR_PIPE"
// NewBinaryProcessor returns a binary processor for use with processing content streams // NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...) cmd := exec.CommandContext(ctx, name, args...)
cmd.Env = os.Environ() cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, env...)
if payload != nil { if payload != nil {
data, err := proto.Marshal(payload) data, err := proto.Marshal(payload)

View File

@ -80,6 +80,8 @@ type StreamProcessor struct {
Path string `toml:"path"` Path string `toml:"path"`
// Args to the binary // Args to the binary
Args []string `toml:"args"` Args []string `toml:"args"`
// Environment variables for the binary
Env []string `toml:"env"`
} }
// GetVersion returns the config file's version // GetVersion returns the config file's version

View File

@ -91,7 +91,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
return nil, err return nil, err
} }
for id, p := range config.StreamProcessors { for id, p := range config.StreamProcessors {
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args)) diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
} }
serverOpts := []grpc.ServerOption{ serverOpts := []grpc.ServerOption{