Allow passing environent variables to StreamProcessors
Add support for an 'env' field to the StreamProcessor configuration and append the environment variables found there to the os.Environ() array. The env field takes environment variables in the form of key=value. Signed-off-by: Stefan Berger <stefanb@linux.ibm.com>
This commit is contained in:
parent
092f9e607a
commit
1917ca5f79
@ -168,7 +168,7 @@ func (c *compressedProcessor) Close() error {
|
|||||||
return c.rc.Close()
|
return c.rc.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler {
|
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
|
||||||
set := make(map[string]struct{}, len(mediaTypes))
|
set := make(map[string]struct{}, len(mediaTypes))
|
||||||
for _, m := range mediaTypes {
|
for _, m := range mediaTypes {
|
||||||
set[m] = struct{}{}
|
set[m] = struct{}{}
|
||||||
@ -177,7 +177,7 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string
|
|||||||
if _, ok := set[mediaType]; ok {
|
if _, ok := set[mediaType]; ok {
|
||||||
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
||||||
payload := payloads[id]
|
payload := payloads[id]
|
||||||
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload)
|
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
|
||||||
}, true
|
}, true
|
||||||
}
|
}
|
||||||
return nil, false
|
return nil, false
|
||||||
|
@ -33,9 +33,10 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewBinaryProcessor returns a binary processor for use with processing content streams
|
// NewBinaryProcessor returns a binary processor for use with processing content streams
|
||||||
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
|
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload *types.Any) (StreamProcessor, error) {
|
||||||
cmd := exec.CommandContext(ctx, name, args...)
|
cmd := exec.CommandContext(ctx, name, args...)
|
||||||
cmd.Env = os.Environ()
|
cmd.Env = os.Environ()
|
||||||
|
cmd.Env = append(cmd.Env, env...)
|
||||||
|
|
||||||
var payloadC io.Closer
|
var payloadC io.Closer
|
||||||
if payload != nil {
|
if payload != nil {
|
||||||
|
@ -39,9 +39,10 @@ import (
|
|||||||
const processorPipe = "STREAM_PROCESSOR_PIPE"
|
const processorPipe = "STREAM_PROCESSOR_PIPE"
|
||||||
|
|
||||||
// NewBinaryProcessor returns a binary processor for use with processing content streams
|
// NewBinaryProcessor returns a binary processor for use with processing content streams
|
||||||
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
|
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload *types.Any) (StreamProcessor, error) {
|
||||||
cmd := exec.CommandContext(ctx, name, args...)
|
cmd := exec.CommandContext(ctx, name, args...)
|
||||||
cmd.Env = os.Environ()
|
cmd.Env = os.Environ()
|
||||||
|
cmd.Env = append(cmd.Env, env...)
|
||||||
|
|
||||||
if payload != nil {
|
if payload != nil {
|
||||||
data, err := proto.Marshal(payload)
|
data, err := proto.Marshal(payload)
|
||||||
|
@ -80,6 +80,8 @@ type StreamProcessor struct {
|
|||||||
Path string `toml:"path"`
|
Path string `toml:"path"`
|
||||||
// Args to the binary
|
// Args to the binary
|
||||||
Args []string `toml:"args"`
|
Args []string `toml:"args"`
|
||||||
|
// Environment variables for the binary
|
||||||
|
Env []string `toml:"env"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetVersion returns the config file's version
|
// GetVersion returns the config file's version
|
||||||
|
@ -91,7 +91,7 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
for id, p := range config.StreamProcessors {
|
for id, p := range config.StreamProcessors {
|
||||||
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args))
|
diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
|
||||||
}
|
}
|
||||||
|
|
||||||
serverOpts := []grpc.ServerOption{
|
serverOpts := []grpc.ServerOption{
|
||||||
|
Loading…
Reference in New Issue
Block a user