
Add support for an 'env' field to the StreamProcessor configuration and append the environment variables found there to the os.Environ() array. The env field takes environment variables in the form of key=value. Signed-off-by: Stefan Berger <stefanb@linux.ibm.com>
188 lines
5.0 KiB
Go
188 lines
5.0 KiB
Go
/*
|
|
Copyright The containerd Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package diff
|
|
|
|
import (
|
|
"context"
|
|
"io"
|
|
"os"
|
|
|
|
"github.com/containerd/containerd/archive/compression"
|
|
"github.com/containerd/containerd/images"
|
|
"github.com/gogo/protobuf/types"
|
|
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
|
|
"github.com/pkg/errors"
|
|
)
|
|
|
|
var (
|
|
handlers []Handler
|
|
|
|
// ErrNoProcessor is returned when no stream processor is available for a media-type
|
|
ErrNoProcessor = errors.New("no processor for media-type")
|
|
)
|
|
|
|
func init() {
|
|
// register the default compression handler
|
|
RegisterProcessor(compressedHandler)
|
|
}
|
|
|
|
// RegisterProcessor registers a stream processor for media-types
|
|
func RegisterProcessor(handler Handler) {
|
|
handlers = append(handlers, handler)
|
|
}
|
|
|
|
// GetProcessor returns the processor for a media-type
|
|
func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
|
// reverse this list so that user configured handlers come up first
|
|
for i := len(handlers) - 1; i >= 0; i-- {
|
|
processor, ok := handlers[i](ctx, stream.MediaType())
|
|
if ok {
|
|
return processor(ctx, stream, payloads)
|
|
}
|
|
}
|
|
return nil, ErrNoProcessor
|
|
}
|
|
|
|
// Handler checks a media-type and initializes the processor
|
|
type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool)
|
|
|
|
// StaticHandler returns the processor init func for a static media-type
|
|
func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
|
|
return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
|
|
if mediaType == expectedMediaType {
|
|
return fn, true
|
|
}
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// StreamProcessorInit returns the initialized stream processor
|
|
type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error)
|
|
|
|
// RawProcessor provides access to direct fd for processing
type RawProcessor interface {
	// File returns the fd for the read stream of the underlying processor
	File() *os.File
}
|
|
|
|
// StreamProcessor handles processing a content stream and transforming it into a different media-type
type StreamProcessor interface {
	io.ReadCloser

	// MediaType is the resulting media-type that the processor processes the stream into
	MediaType() string
}
|
|
|
|
func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
|
|
compressed, err := images.DiffCompression(ctx, mediaType)
|
|
if err != nil {
|
|
return nil, false
|
|
}
|
|
if compressed != "" {
|
|
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
|
ds, err := compression.DecompressStream(stream)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &compressedProcessor{
|
|
rc: ds,
|
|
}, nil
|
|
}, true
|
|
}
|
|
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
|
return &stdProcessor{
|
|
rc: stream,
|
|
}, nil
|
|
}, true
|
|
}
|
|
|
|
// NewProcessorChain initialized the root StreamProcessor
|
|
func NewProcessorChain(mt string, r io.Reader) StreamProcessor {
|
|
return &processorChain{
|
|
mt: mt,
|
|
rc: r,
|
|
}
|
|
}
|
|
|
|
// processorChain is the root of a processor chain: it pairs a plain reader
// with the media-type of the content it yields.
type processorChain struct {
	mt string    // media-type reported by MediaType
	rc io.Reader // source of the content stream
}

// MediaType returns the media-type the chain was created with.
func (c *processorChain) MediaType() string {
	return c.mt
}

// Read reads from the underlying reader.
func (c *processorChain) Read(p []byte) (int, error) {
	return c.rc.Read(p)
}

// Close is a no-op; the underlying io.Reader has no Close method to call.
func (c *processorChain) Close() error {
	return nil
}
|
|
|
|
type stdProcessor struct {
|
|
rc StreamProcessor
|
|
}
|
|
|
|
func (c *stdProcessor) MediaType() string {
|
|
return ocispec.MediaTypeImageLayer
|
|
}
|
|
|
|
func (c *stdProcessor) Read(p []byte) (int, error) {
|
|
return c.rc.Read(p)
|
|
}
|
|
|
|
func (c *stdProcessor) Close() error {
|
|
return nil
|
|
}
|
|
|
|
type compressedProcessor struct {
|
|
rc io.ReadCloser
|
|
}
|
|
|
|
func (c *compressedProcessor) MediaType() string {
|
|
return ocispec.MediaTypeImageLayer
|
|
}
|
|
|
|
func (c *compressedProcessor) Read(p []byte) (int, error) {
|
|
return c.rc.Read(p)
|
|
}
|
|
|
|
func (c *compressedProcessor) Close() error {
|
|
return c.rc.Close()
|
|
}
|
|
|
|
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
|
|
set := make(map[string]struct{}, len(mediaTypes))
|
|
for _, m := range mediaTypes {
|
|
set[m] = struct{}{}
|
|
}
|
|
return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) {
|
|
if _, ok := set[mediaType]; ok {
|
|
return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) {
|
|
payload := payloads[id]
|
|
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
|
|
}, true
|
|
}
|
|
return nil, false
|
|
}
|
|
}
|
|
|
|
// mediaTypeEnvVar is the name of an environment variable. Judging by the name
// it presumably carries a stream's media-type to a processor binary; its use
// is not visible in this part of the file — TODO confirm.
const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"
|