From 97a98773cf0bdb489744ce22997e77a8b47b77fb Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 2 Aug 2019 15:02:16 +0000 Subject: [PATCH 1/8] Add StreamProcessor for apply Signed-off-by: Michael Crosby --- diff/apply/apply.go | 73 +++++++------- diff/diff.go | 2 + diff/stream.go | 234 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 269 insertions(+), 40 deletions(-) create mode 100644 diff/stream.go diff --git a/diff/apply/apply.go b/diff/apply/apply.go index e4251026c..db077bccd 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -23,11 +23,8 @@ import ( "time" "github.com/containerd/containerd/archive" - "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" - "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/mount" digest "github.com/opencontainers/go-digest" @@ -66,54 +63,50 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ } }() - isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType) - if err != nil { - return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) + var config diff.ApplyConfig + for _, o := range opts { + if err := o(&config); err != nil { + return emptyDesc, errors.Wrap(err, "failed to apply config opt") + } } - var ocidesc ocispec.Descriptor + ra, err := s.store.ReaderAt(ctx, desc) + if err != nil { + return emptyDesc, errors.Wrap(err, "failed to get reader from content store") + } + defer ra.Close() + + processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + for { + if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil { + return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) + } + if processor.MediaType() == ocispec.MediaTypeImageLayer { + break + } + } + defer processor.Close() + + digester := digest.Canonical.Digester() + rc := &readCounter{ + r: io.TeeReader(processor, digester.Hash()), + } if err := mount.WithTempMount(ctx, mounts, func(root string) error { - ra, err := s.store.ReaderAt(ctx, desc) - if err != nil { - return errors.Wrap(err, "failed to get reader from content store") - } - defer ra.Close() - - r := content.NewReader(ra) - if isCompressed { - ds, err := compression.DecompressStream(r) - if err != nil { - return err - } - defer ds.Close() - r = ds - } - - digester := digest.Canonical.Digester() - rc := &readCounter{ - r: io.TeeReader(r, digester.Hash()), - } - if _, err := archive.Apply(ctx, root, rc); err != nil { return err } // Read any trailing data - if _, err := io.Copy(ioutil.Discard, rc); err != nil { - return err - } - - ocidesc = ocispec.Descriptor{ - MediaType: ocispec.MediaTypeImageLayer, - Size: rc.c, - Digest: digester.Digest(), - } - return nil - + _, err := io.Copy(ioutil.Discard, rc) + return err }); err != nil { return emptyDesc, err } - return ocidesc, nil + return ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageLayer, + Size: rc.c, + Digest: digester.Digest(), + }, nil } type readCounter struct { diff --git a/diff/diff.go b/diff/diff.go index a62a4671a..4a5193741 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -53,6 +53,8 @@ type Comparer interface { // ApplyConfig is used to hold parameters needed for a apply operation type ApplyConfig struct { + // ProcessorPayloads specifies the payload sent to various processors + ProcessorPayloads map[string]interface{} } // ApplyOpt is used to configure an Apply operation diff --git a/diff/stream.go b/diff/stream.go new file mode 100644 index 000000000..1a336fd17 --- /dev/null +++ b/diff/stream.go @@ -0,0 +1,234 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + "context" + "io" + "os" + "os/exec" + + "github.com/containerd/containerd/archive/compression" + "github.com/containerd/containerd/images" + ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +var ( + handlers []Handler + + // ErrNoProcessor is returned when no stream processor is avaliable for a media-type + ErrNoProcessor = errors.New("no processor for media-type") +) + +func init() { + // register the default compression handler + RegisterProcessor(compressedHandler) +} + +// RegisterProcessor registers a stream processor for media-types +func RegisterProcessor(handler Handler) { + handlers = append(handlers, handler) +} + +// GetProcessor returns the processor for a media-type +func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + // reverse this list so that user configured handlers come up first + for i := len(handlers); i >= 0; i-- { + processor, ok := handlers[i](ctx, mediaType) + if ok { + return processor(ctx, stream, payloads) + } + } + return nil, ErrNoProcessor +} + +// Handler checks a media-type and initializes the processor +type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) + +// StaticHandler returns the processor init func for a static media-type +func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler { + return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) { + if mediaType == expectedMediaType { + return fn, true + } + return nil, false + } +} + +// StreamProcessorInit returns the initialized stream processor +type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) + +// RawProcessor provides access to direct fd for processing +type RawProcessor interface { + // File returns the fd for the read stream of the underlying processor + File() *os.File +} + +// StreamProcessor handles processing a content stream and transforming it into a different media-type +type StreamProcessor interface { + io.ReadCloser + + // MediaType is the resulting media-type that the processor processes the stream into + MediaType() string +} + +func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) { + compressed, err := images.IsCompressedDiff(ctx, mediaType) + if err != nil { + return nil, false + } + if compressed { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + ds, err := compression.DecompressStream(stream) + if err != nil { + return nil, err + } + + return &compressedProcessor{ + rc: ds, + }, nil + }, true + } + return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + return &stdProcessor{ + rc: stream, + }, nil + }, true +} + +// NewProcessorChain initialized the root StreamProcessor +func NewProcessorChain(mt string, r io.Reader) StreamProcessor { + return &processorChain{ + mt: mt, + rc: r, + } +} + +type processorChain struct { + mt string + rc io.Reader +} + +func (c *processorChain) MediaType() string { + return c.mt +} + +func (c *processorChain) Read(p []byte) (int, error) { + return c.rc.Read(p) +} + +func (c *processorChain) Close() error { + return nil +} + +type stdProcessor struct { + rc StreamProcessor +} + +func (c *stdProcessor) MediaType() string { + return ocispec.MediaTypeImageLayer +} + +func (c *stdProcessor) Read(p []byte) (int, error) { + return c.rc.Read(p) +} + +func (c *stdProcessor) Close() error { + return nil +} + +type compressedProcessor struct { + rc io.ReadCloser +} + +func (c *compressedProcessor) MediaType() string { + return ocispec.MediaTypeImageLayer +} + +func (c *compressedProcessor) Read(p []byte) (int, error) { + return c.rc.Read(p) +} + +func (c *compressedProcessor) Close() error { + return c.rc.Close() +} + +// NewBinaryProcessor returns a binary processor for use with processing content streams +func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) { + cmd := exec.CommandContext(ctx, name, args...) + var ( + stdin *os.File + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + } else { + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + stdin = r + go func() { + io.Copy(w, stream) + }() + } + cmd.Stdin = stdin + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.Stdout = w + if err := cmd.Start(); err != nil { + return nil, err + } + // close after start and dup + stdin.Close() + w.Close() + + return &binaryProcessor{ + cmd: cmd, + r: r, + mt: mt, + }, nil +} + +type binaryProcessor struct { + cmd *exec.Cmd + r *os.File + mt string +} + +func (c *binaryProcessor) File() *os.File { + return c.r +} + +func (c *binaryProcessor) MediaType() string { + return c.mt +} + +func (c *binaryProcessor) Read(p []byte) (int, error) { + return c.r.Read(p) +} + +func (c *binaryProcessor) Close() error { + err := c.r.Close() + if kerr := c.cmd.Process.Kill(); err == nil { + err = kerr + } + return err +} From 366823727f38fe412d5143de5cdbf27f2986cf27 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Fri, 2 Aug 2019 17:43:22 +0000 Subject: [PATCH 2/8] Add server config for stream processors Signed-off-by: Michael Crosby --- diff/apply/apply.go | 2 +- diff/diff.go | 3 +- diff/stream.go | 76 +++++++++++++++++++++++--------- services/server/config/config.go | 16 +++++++ services/server/server.go | 5 +++ 5 files changed, 78 insertions(+), 24 deletions(-) diff --git a/diff/apply/apply.go b/diff/apply/apply.go index db077bccd..99a6b802e 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -78,7 +78,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) for { - if processor, err = diff.GetProcessor(ctx, desc.MediaType, processor, config.ProcessorPayloads); err != nil { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) } if processor.MediaType() == ocispec.MediaTypeImageLayer { diff --git a/diff/diff.go b/diff/diff.go index 4a5193741..6dcef6ccb 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -20,6 +20,7 @@ import ( "context" "github.com/containerd/containerd/mount" + "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -54,7 +55,7 @@ type Comparer interface { // ApplyConfig is used to hold parameters needed for a apply operation type ApplyConfig struct { // ProcessorPayloads specifies the payload sent to various processors - ProcessorPayloads map[string]interface{} + ProcessorPayloads map[string]*types.Any } // ApplyOpt is used to configure an Apply operation diff --git a/diff/stream.go b/diff/stream.go index 1a336fd17..cb60ef61e 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -18,12 +18,15 @@ package diff import ( "context" + "fmt" "io" "os" "os/exec" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/images" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) @@ -31,7 +34,7 @@ import ( var ( handlers []Handler - // ErrNoProcessor is returned when no stream processor is avaliable for a media-type + // ErrNoProcessor is returned when no stream processor is available for a media-type ErrNoProcessor = errors.New("no processor for media-type") ) @@ -46,10 +49,10 @@ func RegisterProcessor(handler Handler) { } // GetProcessor returns the processor for a media-type -func GetProcessor(ctx context.Context, mediaType string, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { +func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { // reverse this list so that user configured handlers come up first - for i := len(handlers); i >= 0; i-- { - processor, ok := handlers[i](ctx, mediaType) + for i := len(handlers) - 1; i >= 0; i-- { + processor, ok := handlers[i](ctx, stream.MediaType()) if ok { return processor(ctx, stream, payloads) } @@ -71,7 +74,7 @@ func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler { } // StreamProcessorInit returns the initialized stream processor -type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) +type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) // RawProcessor provides access to direct fd for processing type RawProcessor interface { @@ -93,7 +96,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn return nil, false } if compressed { - return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { ds, err := compression.DecompressStream(stream) if err != nil { return nil, err @@ -104,7 +107,7 @@ func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorIn }, nil }, true } - return func(ctx context.Context, stream StreamProcessor, payloads map[string]interface{}) (StreamProcessor, error) { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { return &stdProcessor{ rc: stream, }, nil @@ -168,24 +171,48 @@ func (c *compressedProcessor) Close() error { return c.rc.Close() } +func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args []string) Handler { + set := make(map[string]struct{}, len(mediaTypes)) + for _, m := range mediaTypes { + set[m] = struct{}{} + } + return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) { + if _, ok := set[mediaType]; ok { + return func(ctx context.Context, stream StreamProcessor, payloads map[string]*types.Any) (StreamProcessor, error) { + payload := payloads[id] + return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, payload) + }, true + } + return nil, false + } +} + +const ( + payloadEnvVar = "STREAM_PROCESSOR_PAYLOAD" + mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" +) + // NewBinaryProcessor returns a binary processor for use with processing content streams -func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, name string, args ...string) (StreamProcessor, error) { +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { cmd := exec.CommandContext(ctx, name, args...) - var ( - stdin *os.File - err error - ) - if f, ok := stream.(RawProcessor); ok { - stdin = f.File() - } else { - r, w, err := os.Pipe() + if payload != nil { + data, err := proto.Marshal(payload) if err != nil { return nil, err } - stdin = r - go func() { - io.Copy(w, stream) - }() + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", payloadEnvVar, data)) + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream } cmd.Stdin = stdin r, w, err := os.Pipe() @@ -193,17 +220,22 @@ func NewBinaryProcessor(ctx context.Context, mt string, stream StreamProcessor, return nil, err } cmd.Stdout = w + if err := cmd.Start(); err != nil { return nil, err } + go cmd.Wait() + // close after start and dup - stdin.Close() w.Close() + if closer != nil { + closer() + } return &binaryProcessor{ cmd: cmd, r: r, - mt: mt, + mt: rmt, }, nil } diff --git a/services/server/config/config.go b/services/server/config/config.go index 13ccd5b6f..365dfa0fd 100644 --- a/services/server/config/config.go +++ b/services/server/config/config.go @@ -56,9 +56,25 @@ type Config struct { // ProxyPlugins configures plugins which are communicated to over GRPC ProxyPlugins map[string]ProxyPlugin `toml:"proxy_plugins"` + StreamProcessors []StreamProcessor `toml:"stream_processors"` + md toml.MetaData } +// StreamProcessor provides configuration for diff content processors +type StreamProcessor struct { + // ID of the processor, also used to fetch the specific payload + ID string `toml:"id"` + // Accepts specific media-types + Accepts []string `toml:"accepts"` + // Returns the media-type + Returns string `toml:"returns"` + // Path or name of the binary + Path string `toml:"path"` + // Args to the binary + Args []string `toml:"args"` +} + // GetVersion returns the config file's version func (c *Config) GetVersion() int { if c.Version == 0 { diff --git a/services/server/server.go b/services/server/server.go index 3c85b4b7c..0e6923918 100644 --- a/services/server/server.go +++ b/services/server/server.go @@ -35,6 +35,7 @@ import ( "github.com/containerd/containerd/content/local" csproxy "github.com/containerd/containerd/content/proxy" "github.com/containerd/containerd/defaults" + "github.com/containerd/containerd/diff" "github.com/containerd/containerd/events/exchange" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" @@ -80,6 +81,10 @@ func New(ctx context.Context, config *srvconfig.Config) (*Server, error) { if err != nil { return nil, err } + for _, p := range config.StreamProcessors { + diff.RegisterProcessor(diff.BinaryHandler(p.ID, p.Returns, p.Accepts, p.Path, p.Args)) + } + serverOpts := []grpc.ServerOption{ grpc.UnaryInterceptor(grpc_prometheus.UnaryServerInterceptor), grpc.StreamInterceptor(grpc_prometheus.StreamServerInterceptor), From f867401c69d40f75ada6e3f1101cb5f2310792d0 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 5 Aug 2019 14:33:04 +0000 Subject: [PATCH 3/8] Use fds and pass Payloads over diff api Signed-off-by: Michael Crosby --- api/next.pb.txt | 30 ++++ api/services/diff/v1/diff.pb.go | 266 +++++++++++++++++++++++++++----- api/services/diff/v1/diff.proto | 3 + diff/diff.go | 8 + diff/stream.go | 20 ++- services/diff/local.go | 3 + 6 files changed, 288 insertions(+), 42 deletions(-) diff --git a/api/next.pb.txt b/api/next.pb.txt index b52350941..b4a06c11b 100755 --- a/api/next.pb.txt +++ b/api/next.pb.txt @@ -1811,6 +1811,7 @@ file { name: "github.com/containerd/containerd/api/services/diff/v1/diff.proto" package: "containerd.services.diff.v1" dependency: "gogoproto/gogo.proto" + dependency: "google/protobuf/any.proto" dependency: "github.com/containerd/containerd/api/types/mount.proto" dependency: "github.com/containerd/containerd/api/types/descriptor.proto" message_type { @@ -1831,6 +1832,35 @@ file { type_name: ".containerd.types.Mount" json_name: "mounts" } + field { + name: "payloads" + number: 3 + label: LABEL_REPEATED + type: TYPE_MESSAGE + type_name: ".containerd.services.diff.v1.ApplyRequest.PayloadsEntry" + json_name: "payloads" + } + nested_type { + name: "PayloadsEntry" + field { + name: "key" + number: 1 + label: LABEL_OPTIONAL + type: TYPE_STRING + json_name: "key" + } + field { + name: "value" + number: 2 + label: LABEL_OPTIONAL + type: TYPE_MESSAGE + type_name: ".google.protobuf.Any" + json_name: "value" + } + options { + map_entry: true + } + } } message_type { name: "ApplyResponse" diff --git a/api/services/diff/v1/diff.pb.go b/api/services/diff/v1/diff.pb.go index 9ada87346..6c7920004 100644 --- a/api/services/diff/v1/diff.pb.go +++ b/api/services/diff/v1/diff.pb.go @@ -9,6 +9,7 @@ import ( types "github.com/containerd/containerd/api/types" proto "github.com/gogo/protobuf/proto" github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" + types1 "github.com/gogo/protobuf/types" grpc "google.golang.org/grpc" io "io" math "math" @@ -29,11 +30,12 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package type ApplyRequest struct { // Diff is the descriptor of the diff to be extracted - Diff *types.Descriptor `protobuf:"bytes,1,opt,name=diff,proto3" json:"diff,omitempty"` - Mounts []*types.Mount `protobuf:"bytes,2,rep,name=mounts,proto3" json:"mounts,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + Diff *types.Descriptor `protobuf:"bytes,1,opt,name=diff,proto3" json:"diff,omitempty"` + Mounts []*types.Mount `protobuf:"bytes,2,rep,name=mounts,proto3" json:"mounts,omitempty"` + Payloads map[string]*types1.Any `protobuf:"bytes,3,rep,name=payloads,proto3" json:"payloads,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *ApplyRequest) Reset() { *m = ApplyRequest{} } @@ -205,6 +207,7 @@ var xxx_messageInfo_DiffResponse proto.InternalMessageInfo func init() { proto.RegisterType((*ApplyRequest)(nil), "containerd.services.diff.v1.ApplyRequest") + proto.RegisterMapType((map[string]*types1.Any)(nil), "containerd.services.diff.v1.ApplyRequest.PayloadsEntry") proto.RegisterType((*ApplyResponse)(nil), "containerd.services.diff.v1.ApplyResponse") proto.RegisterType((*DiffRequest)(nil), "containerd.services.diff.v1.DiffRequest") proto.RegisterMapType((map[string]string)(nil), "containerd.services.diff.v1.DiffRequest.LabelsEntry") @@ -216,36 +219,40 @@ func init() { } var fileDescriptor_3b36a99e6faaa935 = []byte{ - // 457 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0x4f, 0x6f, 0xd3, 0x30, - 0x14, 0xaf, 0xfb, 0x0f, 0xf5, 0x75, 0x48, 0xc8, 0x9a, 0x44, 0x14, 0x20, 0xaa, 0x7a, 0xea, 0x40, - 0x38, 0xac, 0xa0, 0x09, 0xb6, 0xcb, 0x40, 0x43, 0x5c, 0xc6, 0x25, 0xda, 0x01, 0x81, 0x04, 0x4a, - 0x9b, 0x97, 0xce, 0x22, 0x8d, 0xbd, 0xd8, 0xad, 0x94, 0x1b, 0xdf, 0x85, 0x8f, 0xc2, 0x65, 0x47, - 0x8e, 0x1c, 0x69, 0x3f, 0x09, 0xb2, 0x93, 0x40, 0x24, 0xa4, 0x12, 0x76, 0xca, 0xcb, 0xf3, 0xef, - 0x9f, 0xfd, 0x6c, 0x38, 0x5d, 0x70, 0x7d, 0xb9, 0x9a, 0xb1, 0xb9, 0x58, 0xfa, 0x73, 0x91, 0xea, - 0x90, 0xa7, 0x98, 0x45, 0xf5, 0x32, 0x94, 0xdc, 0x57, 0x98, 0xad, 0xf9, 0x1c, 0x95, 0x1f, 0xf1, - 0x38, 0xf6, 0xd7, 0x87, 0xf6, 0xcb, 0x64, 0x26, 0xb4, 0xa0, 0xf7, 0xfe, 0x60, 0x59, 0x85, 0x63, - 0x76, 0x7d, 0x7d, 0xe8, 0xee, 0x2f, 0xc4, 0x42, 0x58, 0x9c, 0x6f, 0xaa, 0x82, 0xe2, 0x1e, 0x35, - 0x32, 0xd5, 0xb9, 0x44, 0xe5, 0x2f, 0xc5, 0x2a, 0xd5, 0x25, 0xef, 0xe4, 0x3f, 0x78, 0x11, 0xaa, - 0x79, 0xc6, 0xa5, 0x16, 0x59, 0x41, 0x1e, 0x5f, 0xc1, 0xde, 0x4b, 0x29, 0x93, 0x3c, 0xc0, 0xab, - 0x15, 0x2a, 0x4d, 0x9f, 0x40, 0xd7, 0xa4, 0x74, 0xc8, 0x88, 0x4c, 0x86, 0xd3, 0xfb, 0xac, 0xb6, - 0x0d, 0xab, 0xc0, 0xce, 0x7e, 0x2b, 0x04, 0x16, 0x49, 0x7d, 0xe8, 0xdb, 0x34, 0xca, 0x69, 0x8f, - 0x3a, 0x93, 0xe1, 0xf4, 0xee, 0xdf, 0x9c, 0xb7, 0x66, 0x3d, 0x28, 0x61, 0xe3, 0x37, 0x70, 0xbb, - 0xb4, 0x54, 0x52, 0xa4, 0x0a, 0xe9, 0x11, 0xdc, 0x0a, 0xa5, 0x4c, 0x38, 0x46, 0x8d, 0x6c, 0x2b, - 0xf0, 0xf8, 0x6b, 0x1b, 0x86, 0x67, 0x3c, 0x8e, 0xab, 0xec, 0x8f, 0xa0, 0x9b, 0x60, 0xac, 0x1d, - 0xb2, 0x3b, 0x87, 0x05, 0xd1, 0xc7, 0xd0, 0xcb, 0xf8, 0xe2, 0x52, 0xff, 0x2b, 0x75, 0x81, 0xa2, - 0x0f, 0x00, 0x96, 0x18, 0xf1, 0xf0, 0x93, 0x59, 0x73, 0x3a, 0x23, 0x32, 0x19, 0x04, 0x03, 0xdb, - 0xb9, 0xc8, 0x25, 0xd2, 0x3b, 0xd0, 0xc9, 0x30, 0x76, 0xba, 0xb6, 0x6f, 0x4a, 0x7a, 0x0e, 0xfd, - 0x24, 0x9c, 0x61, 0xa2, 0x9c, 0x9e, 0x35, 0x78, 0xc6, 0x76, 0xdc, 0x08, 0x56, 0xdb, 0x06, 0x3b, - 0xb7, 0xb4, 0xd7, 0xa9, 0xce, 0xf2, 0xa0, 0xd4, 0x70, 0x5f, 0xc0, 0xb0, 0xd6, 0x36, 0x76, 0x9f, - 0x31, 0xb7, 0xa7, 0x35, 0x08, 0x4c, 0x49, 0xf7, 0xa1, 0xb7, 0x0e, 0x93, 0x15, 0x3a, 0x6d, 0xdb, - 0x2b, 0x7e, 0x8e, 0xdb, 0xcf, 0xc9, 0xf8, 0x14, 0xf6, 0x0a, 0xf5, 0xf2, 0xb4, 0xab, 0x09, 0x77, - 0x9a, 0x4e, 0x78, 0xfa, 0x8d, 0x40, 0xd7, 0x48, 0xd0, 0x8f, 0xd0, 0xb3, 0x93, 0xa3, 0x07, 0x3b, - 0x37, 0x53, 0xbf, 0x50, 0xee, 0xc3, 0x26, 0xd0, 0x32, 0xda, 0x87, 0xd2, 0x67, 0xd2, 0xf4, 0xac, - 0xdc, 0x83, 0x06, 0xc8, 0x42, 0xfc, 0xd5, 0xc5, 0xf5, 0xc6, 0x6b, 0xfd, 0xd8, 0x78, 0xad, 0x2f, - 0x5b, 0x8f, 0x5c, 0x6f, 0x3d, 0xf2, 0x7d, 0xeb, 0x91, 0x9f, 0x5b, 0x8f, 0xbc, 0x3f, 0xbe, 0xd1, - 0x6b, 0x3f, 0x31, 0xdf, 0x77, 0xad, 0x59, 0xdf, 0x3e, 0xa4, 0xa7, 0xbf, 0x02, 0x00, 0x00, 0xff, - 0xff, 0x61, 0xd1, 0x6e, 0x9e, 0x34, 0x04, 0x00, 0x00, + // 526 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x54, 0x41, 0x6f, 0xd3, 0x4c, + 0x10, 0x8d, 0xed, 0x24, 0xdf, 0x97, 0x49, 0x2b, 0xa1, 0x55, 0x24, 0x8c, 0x01, 0xab, 0xca, 0x29, + 0x2d, 0x62, 0x4d, 0x03, 0x2a, 0xd0, 0x5e, 0x5a, 0x54, 0xc4, 0xa5, 0x48, 0x60, 0x7a, 0x40, 0x20, + 0x81, 0x9c, 0x78, 0xed, 0xae, 0x70, 0xbc, 0x8b, 0x77, 0x1d, 0xc9, 0x37, 0xfe, 0x06, 0x67, 0x7e, + 0x0a, 0x97, 0x1e, 0x39, 0x72, 0xa4, 0xf9, 0x25, 0xc8, 0xeb, 0x75, 0x31, 0x02, 0x05, 0xc3, 0xc9, + 0x9b, 0x9d, 0xf7, 0xde, 0xce, 0xbc, 0x37, 0x0a, 0x1c, 0xc6, 0x54, 0x9e, 0xe5, 0x33, 0x3c, 0x67, + 0x0b, 0x6f, 0xce, 0x52, 0x19, 0xd0, 0x94, 0x64, 0x61, 0xf3, 0x18, 0x70, 0xea, 0x09, 0x92, 0x2d, + 0xe9, 0x9c, 0x08, 0x2f, 0xa4, 0x51, 0xe4, 0x2d, 0x77, 0xd5, 0x17, 0xf3, 0x8c, 0x49, 0x86, 0xae, + 0xff, 0xc0, 0xe2, 0x1a, 0x87, 0x55, 0x7d, 0xb9, 0xeb, 0x8c, 0x62, 0x16, 0x33, 0x85, 0xf3, 0xca, + 0x53, 0x45, 0x71, 0xae, 0xc5, 0x8c, 0xc5, 0x09, 0xf1, 0xd4, 0xaf, 0x59, 0x1e, 0x79, 0x41, 0x5a, + 0xe8, 0xd2, 0x5e, 0xab, 0x7e, 0x64, 0xc1, 0x89, 0xf0, 0x16, 0x2c, 0x4f, 0xa5, 0xe6, 0x1d, 0xfc, + 0x05, 0x2f, 0x24, 0x62, 0x9e, 0x51, 0x2e, 0x59, 0x56, 0x91, 0xc7, 0x1f, 0x4d, 0xd8, 0x38, 0xe2, + 0x3c, 0x29, 0x7c, 0xf2, 0x3e, 0x27, 0x42, 0xa2, 0x3b, 0xd0, 0x2d, 0x27, 0xb0, 0x8d, 0x2d, 0x63, + 0x32, 0x9c, 0xde, 0xc0, 0x8d, 0x11, 0x95, 0x04, 0x3e, 0xbe, 0x94, 0xf0, 0x15, 0x12, 0x79, 0xd0, + 0x57, 0xed, 0x08, 0xdb, 0xdc, 0xb2, 0x26, 0xc3, 0xe9, 0xd5, 0x5f, 0x39, 0x4f, 0xcb, 0xba, 0xaf, + 0x61, 0xe8, 0x05, 0xfc, 0xcf, 0x83, 0x22, 0x61, 0x41, 0x28, 0x6c, 0x4b, 0x51, 0xee, 0xe3, 0x35, + 0x4e, 0xe2, 0x66, 0x7f, 0xf8, 0x99, 0x66, 0x3e, 0x4e, 0x65, 0x56, 0xf8, 0x97, 0x42, 0xce, 0x73, + 0xd8, 0xfc, 0xa9, 0x84, 0xae, 0x80, 0xf5, 0x8e, 0x14, 0x6a, 0x8e, 0x81, 0x5f, 0x1e, 0xd1, 0x0e, + 0xf4, 0x96, 0x41, 0x92, 0x13, 0xdb, 0x54, 0xb3, 0x8d, 0x70, 0x95, 0x05, 0xae, 0xb3, 0xc0, 0x47, + 0x69, 0xe1, 0x57, 0x90, 0x7d, 0xf3, 0x81, 0x31, 0x7e, 0x02, 0x9b, 0xfa, 0x69, 0xc1, 0x59, 0x2a, + 0x08, 0xda, 0x83, 0xff, 0x02, 0xce, 0x13, 0x4a, 0xc2, 0x56, 0xf6, 0xd4, 0xe0, 0xf1, 0x27, 0x13, + 0x86, 0xc7, 0x34, 0x8a, 0x6a, 0x8f, 0x6f, 0x41, 0x37, 0x21, 0x91, 0xb4, 0x8d, 0xf5, 0x7e, 0x29, + 0x10, 0xba, 0x0d, 0xbd, 0x8c, 0xc6, 0x67, 0xf2, 0x4f, 0xee, 0x56, 0x28, 0x74, 0x13, 0x60, 0x41, + 0x42, 0x1a, 0xbc, 0x2d, 0x6b, 0xb6, 0xa5, 0xa6, 0x1f, 0xa8, 0x9b, 0xd3, 0x82, 0x93, 0xd2, 0x95, + 0x8c, 0x44, 0x76, 0xb7, 0x72, 0x25, 0x23, 0x11, 0x3a, 0x81, 0x7e, 0x12, 0xcc, 0x48, 0x22, 0xec, + 0x9e, 0x7a, 0xe0, 0xde, 0xda, 0x2c, 0x1a, 0x63, 0xe0, 0x13, 0x45, 0xab, 0x82, 0xd0, 0x1a, 0xce, + 0x43, 0x18, 0x36, 0xae, 0x7f, 0x13, 0xc2, 0xa8, 0x19, 0xc2, 0xa0, 0x69, 0xf7, 0x21, 0x6c, 0x54, + 0xea, 0xda, 0xed, 0x7a, 0x13, 0xad, 0xb6, 0x9b, 0x38, 0xfd, 0x6c, 0x40, 0xb7, 0x94, 0x40, 0x6f, + 0xa0, 0xa7, 0x92, 0x43, 0xdb, 0xad, 0x17, 0xcb, 0xd9, 0x69, 0x03, 0xd5, 0xad, 0xbd, 0xd6, 0xef, + 0x4c, 0xda, 0x7a, 0xe5, 0x6c, 0xb7, 0x40, 0x56, 0xe2, 0x8f, 0x4e, 0xcf, 0x2f, 0xdc, 0xce, 0xd7, + 0x0b, 0xb7, 0xf3, 0x61, 0xe5, 0x1a, 0xe7, 0x2b, 0xd7, 0xf8, 0xb2, 0x72, 0x8d, 0x6f, 0x2b, 0xd7, + 0x78, 0xb5, 0xff, 0x4f, 0xff, 0x58, 0x07, 0xe5, 0xf7, 0x65, 0x67, 0xd6, 0x57, 0x7b, 0x7e, 0xf7, + 0x7b, 0x00, 0x00, 0x00, 0xff, 0xff, 0xf7, 0x85, 0x25, 0xb8, 0xf8, 0x04, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -400,6 +407,34 @@ func (m *ApplyRequest) MarshalTo(dAtA []byte) (int, error) { i += n } } + if len(m.Payloads) > 0 { + for k, _ := range m.Payloads { + dAtA[i] = 0x1a + i++ + v := m.Payloads[k] + msgSize := 0 + if v != nil { + msgSize = v.Size() + msgSize += 1 + sovDiff(uint64(msgSize)) + } + mapSize := 1 + len(k) + sovDiff(uint64(len(k))) + msgSize + i = encodeVarintDiff(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintDiff(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + if v != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintDiff(dAtA, i, uint64(v.Size())) + n2, err := v.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + } + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -425,11 +460,11 @@ func (m *ApplyResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0xa i++ i = encodeVarintDiff(dAtA, i, uint64(m.Applied.Size())) - n2, err := m.Applied.MarshalTo(dAtA[i:]) + n3, err := m.Applied.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n2 + i += n3 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -530,11 +565,11 @@ func (m *DiffResponse) MarshalTo(dAtA []byte) (int, error) { dAtA[i] = 0x1a i++ i = encodeVarintDiff(dAtA, i, uint64(m.Diff.Size())) - n3, err := m.Diff.MarshalTo(dAtA[i:]) + n4, err := m.Diff.MarshalTo(dAtA[i:]) if err != nil { return 0, err } - i += n3 + i += n4 } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) @@ -567,6 +602,19 @@ func (m *ApplyRequest) Size() (n int) { n += 1 + l + sovDiff(uint64(l)) } } + if len(m.Payloads) > 0 { + for k, v := range m.Payloads { + _ = k + _ = v + l = 0 + if v != nil { + l = v.Size() + l += 1 + sovDiff(uint64(l)) + } + mapEntrySize := 1 + len(k) + sovDiff(uint64(len(k))) + l + n += mapEntrySize + 1 + sovDiff(uint64(mapEntrySize)) + } + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -662,9 +710,20 @@ func (this *ApplyRequest) String() string { if this == nil { return "nil" } + keysForPayloads := make([]string, 0, len(this.Payloads)) + for k, _ := range this.Payloads { + keysForPayloads = append(keysForPayloads, k) + } + github_com_gogo_protobuf_sortkeys.Strings(keysForPayloads) + mapStringForPayloads := "map[string]*types1.Any{" + for _, k := range keysForPayloads { + mapStringForPayloads += fmt.Sprintf("%v: %v,", k, this.Payloads[k]) + } + mapStringForPayloads += "}" s := strings.Join([]string{`&ApplyRequest{`, `Diff:` + strings.Replace(fmt.Sprintf("%v", this.Diff), "Descriptor", "types.Descriptor", 1) + `,`, `Mounts:` + strings.Replace(fmt.Sprintf("%v", this.Mounts), "Mount", "types.Mount", 1) + `,`, + `Payloads:` + mapStringForPayloads + `,`, `XXX_unrecognized:` + fmt.Sprintf("%v", this.XXX_unrecognized) + `,`, `}`, }, "") @@ -824,6 +883,135 @@ func (m *ApplyRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Payloads", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDiff + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDiff + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthDiff + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Payloads == nil { + m.Payloads = make(map[string]*types1.Any) + } + var mapkey string + var mapvalue *types1.Any + for iNdEx < postIndex { + entryPreIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDiff + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + if fieldNum == 1 { + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDiff + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthDiff + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey < 0 { + return ErrInvalidLengthDiff + } + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey = string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + } else if fieldNum == 2 { + var mapmsglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDiff + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + mapmsglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if mapmsglen < 0 { + return ErrInvalidLengthDiff + } + postmsgIndex := iNdEx + mapmsglen + if postmsgIndex < 0 { + return ErrInvalidLengthDiff + } + if postmsgIndex > l { + return io.ErrUnexpectedEOF + } + mapvalue = &types1.Any{} + if err := mapvalue.Unmarshal(dAtA[iNdEx:postmsgIndex]); err != nil { + return err + } + iNdEx = postmsgIndex + } else { + iNdEx = entryPreIndex + skippy, err := skipDiff(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDiff + } + if (iNdEx + skippy) > postIndex { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + m.Payloads[mapkey] = mapvalue + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipDiff(dAtA[iNdEx:]) diff --git a/api/services/diff/v1/diff.proto b/api/services/diff/v1/diff.proto index 66d7ecb19..ae2707a25 100644 --- a/api/services/diff/v1/diff.proto +++ b/api/services/diff/v1/diff.proto @@ -3,6 +3,7 @@ syntax = "proto3"; package containerd.services.diff.v1; import weak "gogoproto/gogo.proto"; +import "google/protobuf/any.proto"; import "github.com/containerd/containerd/api/types/mount.proto"; import "github.com/containerd/containerd/api/types/descriptor.proto"; @@ -25,6 +26,8 @@ message ApplyRequest { containerd.types.Descriptor diff = 1; repeated containerd.types.Mount mounts = 2; + + map payloads = 3; } message ApplyResponse { diff --git a/diff/diff.go b/diff/diff.go index 6dcef6ccb..bf3c514d2 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -97,3 +97,11 @@ func WithLabels(labels map[string]string) Opt { return nil } } + +// WithPayloads sets the apply processor payloads to the config +func WithPayloads(payloads map[string]*types.Any) ApplyOpt { + return func(c *ApplyConfig) error { + c.ProcessorPayloads = payloads + return nil + } +} diff --git a/diff/stream.go b/diff/stream.go index cb60ef61e..8a3571098 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -17,6 +17,7 @@ package diff import ( + "bytes" "context" "fmt" "io" @@ -188,19 +189,30 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string } const ( - payloadEnvVar = "STREAM_PROCESSOR_PAYLOAD" mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" ) // NewBinaryProcessor returns a binary processor for use with processing content streams func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { cmd := exec.CommandContext(ctx, name, args...) + + var payloadC io.Closer if payload != nil { data, err := proto.Marshal(payload) if err != nil { return nil, err } - cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", payloadEnvVar, data)) + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + go func() { + io.Copy(w, bytes.NewReader(data)) + w.Close() + }() + + cmd.ExtraFiles = append(cmd.ExtraFiles, r) + payloadC = r } cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) var ( @@ -231,7 +243,9 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce if closer != nil { closer() } - + if payloadC != nil { + payloadC.Close() + } return &binaryProcessor{ cmd: cmd, r: r, diff --git a/services/diff/local.go b/services/diff/local.go index f60e1c658..f05b222db 100644 --- a/services/diff/local.go +++ b/services/diff/local.go @@ -100,6 +100,9 @@ func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.C ) var opts []diff.ApplyOpt + if er.Payloads != nil { + opts = append(opts, diff.WithPayloads(er.Payloads)) + } for _, differ := range l.differs { ocidesc, err = differ.Apply(ctx, desc, mounts, opts...) From 134d3c81597ccfae2d03d52f0772d06fd09a86c1 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 5 Aug 2019 15:05:22 +0000 Subject: [PATCH 4/8] Add windows apply code Signed-off-by: Michael Crosby --- diff/lcow/lcow.go | 33 ++++++++++++++++++--------------- diff/stream.go | 4 +--- diff/windows/windows.go | 27 ++++++++++++++------------- 3 files changed, 33 insertions(+), 31 deletions(-) diff --git a/diff/lcow/lcow.go b/diff/lcow/lcow.go index ad68d1e1a..59b812cfc 100644 --- a/diff/lcow/lcow.go +++ b/diff/lcow/lcow.go @@ -27,11 +27,9 @@ import ( "github.com/Microsoft/go-winio/pkg/security" "github.com/Microsoft/hcsshim/ext4/tar2ext4" - "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" @@ -108,34 +106,39 @@ func (s windowsLcowDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mou } }() + var config diff.ApplyConfig + for _, o := range opts { + if err := o(&config); err != nil { + return emptyDesc, errors.Wrap(err, "failed to apply config opt") + } + } + layer, _, err := mountsToLayerAndParents(mounts) if err != nil { return emptyDesc, err } - isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType) - if err != nil { - return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) - } - ra, err := s.store.ReaderAt(ctx, desc) if err != nil { return emptyDesc, errors.Wrap(err, "failed to get reader from content store") } defer ra.Close() - rdr := content.NewReader(ra) - if isCompressed { - ds, err := compression.DecompressStream(rdr) - if err != nil { - return emptyDesc, err + + processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + for { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { + return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) + } + if processor.MediaType() == ocispec.MediaTypeImageLayer { + break } - defer ds.Close() - rdr = ds } + defer processor.Close() + // Calculate the Digest as we go digester := digest.Canonical.Digester() rc := &readCounter{ - r: io.TeeReader(rdr, digester.Hash()), + r: io.TeeReader(processor, digester.Hash()), } layerPath := path.Join(layer, "layer.vhd") diff --git a/diff/stream.go b/diff/stream.go index 8a3571098..b53b8b4cd 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -188,9 +188,7 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string } } -const ( - mediaTypeEnvVar = "STEAM_PROCESSOR_MEDIATYPE" -) +const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE" // NewBinaryProcessor returns a binary processor for use with processing content streams func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { diff --git a/diff/windows/windows.go b/diff/windows/windows.go index bf2887ad0..4ad3b875e 100644 --- a/diff/windows/windows.go +++ b/diff/windows/windows.go @@ -26,11 +26,9 @@ import ( winio "github.com/Microsoft/go-winio" "github.com/containerd/containerd/archive" - "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/content" "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" - "github.com/containerd/containerd/images" "github.com/containerd/containerd/log" "github.com/containerd/containerd/metadata" "github.com/containerd/containerd/mount" @@ -100,9 +98,11 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts } }() - isCompressed, err := images.IsCompressedDiff(ctx, desc.MediaType) - if err != nil { - return emptyDesc, errors.Wrapf(errdefs.ErrNotImplemented, "unsupported diff media type: %v", desc.MediaType) + var config diff.ApplyConfig + for _, o := range opts { + if err := o(&config); err != nil { + return emptyDesc, errors.Wrap(err, "failed to apply config opt") + } } ra, err := s.store.ReaderAt(ctx, desc) @@ -111,19 +111,20 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts } defer ra.Close() - r := content.NewReader(ra) - if isCompressed { - ds, err := compression.DecompressStream(r) - if err != nil { - return emptyDesc, err + processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + for { + if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { + return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) + } + if processor.MediaType() == ocispec.MediaTypeImageLayer { + break } - defer ds.Close() - r = ds } + defer processor.Close() digester := digest.Canonical.Digester() rc := &readCounter{ - r: io.TeeReader(r, digester.Hash()), + r: io.TeeReader(processor, digester.Hash()), } layer, parentLayerPaths, err := mountsToLayerAndParents(mounts) From e1489f93c3629fc68811d30a1dc3ddf6cb951d0d Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 5 Aug 2019 19:43:17 +0000 Subject: [PATCH 5/8] Use named pipes for windows processors Signed-off-by: Michael Crosby --- diff/stream.go | 91 --------------------------- diff/stream_unix.go | 118 +++++++++++++++++++++++++++++++++++ diff/stream_windows.go | 138 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 91 deletions(-) create mode 100644 diff/stream_unix.go create mode 100644 diff/stream_windows.go diff --git a/diff/stream.go b/diff/stream.go index b53b8b4cd..4b8f27f14 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -17,16 +17,12 @@ package diff import ( - "bytes" "context" - "fmt" "io" "os" - "os/exec" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/images" - "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -189,90 +185,3 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string } const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE" - -// NewBinaryProcessor returns a binary processor for use with processing content streams -func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { - cmd := exec.CommandContext(ctx, name, args...) - - var payloadC io.Closer - if payload != nil { - data, err := proto.Marshal(payload) - if err != nil { - return nil, err - } - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - go func() { - io.Copy(w, bytes.NewReader(data)) - w.Close() - }() - - cmd.ExtraFiles = append(cmd.ExtraFiles, r) - payloadC = r - } - cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) - var ( - stdin io.Reader - closer func() error - err error - ) - if f, ok := stream.(RawProcessor); ok { - stdin = f.File() - closer = f.File().Close - } else { - stdin = stream - } - cmd.Stdin = stdin - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - cmd.Stdout = w - - if err := cmd.Start(); err != nil { - return nil, err - } - go cmd.Wait() - - // close after start and dup - w.Close() - if closer != nil { - closer() - } - if payloadC != nil { - payloadC.Close() - } - return &binaryProcessor{ - cmd: cmd, - r: r, - mt: rmt, - }, nil -} - -type binaryProcessor struct { - cmd *exec.Cmd - r *os.File - mt string -} - -func (c *binaryProcessor) File() *os.File { - return c.r -} - -func (c *binaryProcessor) MediaType() string { - return c.mt -} - -func (c *binaryProcessor) Read(p []byte) (int, error) { - return c.r.Read(p) -} - -func (c *binaryProcessor) Close() error { - err := c.r.Close() - if kerr := c.cmd.Process.Kill(); err == nil { - err = kerr - } - return err -} diff --git a/diff/stream_unix.go b/diff/stream_unix.go new file mode 100644 index 000000000..75d1001f6 --- /dev/null +++ b/diff/stream_unix.go @@ -0,0 +1,118 @@ +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" +) + +// NewBinaryProcessor returns a binary processor for use with processing content streams +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { + cmd := exec.CommandContext(ctx, name, args...) + + var payloadC io.Closer + if payload != nil { + data, err := proto.Marshal(payload) + if err != nil { + return nil, err + } + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + go func() { + io.Copy(w, bytes.NewReader(data)) + w.Close() + }() + + cmd.ExtraFiles = append(cmd.ExtraFiles, r) + payloadC = r + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream + } + cmd.Stdin = stdin + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.Stdout = w + + if err := cmd.Start(); err != nil { + return nil, err + } + go cmd.Wait() + + // close after start and dup + w.Close() + if closer != nil { + closer() + } + if payloadC != nil { + payloadC.Close() + } + return &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + }, nil +} + +type binaryProcessor struct { + cmd *exec.Cmd + r *os.File + mt string +} + +func (c *binaryProcessor) File() *os.File { + return c.r +} + +func (c *binaryProcessor) MediaType() string { + return c.mt +} + +func (c *binaryProcessor) Read(p []byte) (int, error) { + return c.r.Read(p) +} + +func (c *binaryProcessor) Close() error { + err := c.r.Close() + if kerr := c.cmd.Process.Kill(); err == nil { + err = kerr + } + return err +} diff --git a/diff/stream_windows.go b/diff/stream_windows.go new file mode 100644 index 000000000..ba139b849 --- /dev/null +++ b/diff/stream_windows.go @@ -0,0 +1,138 @@ +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + + winio "github.com/Microsoft/go-winio" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" + "github.com/sirupsen/logrus" +) + +const processorPipe = "STREAM_PROCESSOR_PIPE" + +// NewBinaryProcessor returns a binary processor for use with processing content streams +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { + cmd := exec.CommandContext(ctx, name, args...) + + if payload != nil { + data, err := proto.Marshal(payload) + if err != nil { + return nil, err + } + up, err := getUiqPath() + if err != nil { + return nil, err + } + path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up) + l, err := winio.ListenPipe(path, nil) + if err != nil { + return nil, err + } + go func() { + defer l.Close() + conn, err := l.Accept() + if err != nil { + logrus.WithError(err).Error("accept npipe connection") + return + } + io.Copy(conn, bytes.NewReader(data)) + conn.Close() + }() + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path)) + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream + } + cmd.Stdin = stdin + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.Stdout = w + + if err := cmd.Start(); err != nil { + return nil, err + } + go cmd.Wait() + + // close after start and dup + w.Close() + if closer != nil { + closer() + } + return &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + }, nil +} + +type binaryProcessor struct { + cmd *exec.Cmd + r *os.File + mt string +} + +func (c *binaryProcessor) File() *os.File { + return c.r +} + +func (c *binaryProcessor) MediaType() string { + return c.mt +} + +func (c *binaryProcessor) Read(p []byte) (int, error) { + return c.r.Read(p) +} + +func (c *binaryProcessor) Close() error { + err := c.r.Close() + if kerr := c.cmd.Process.Kill(); err == nil { + err = kerr + } + return err +} + +func getUiqPath() (string, error) { + dir, err := ioutil.TempDir("", "") + if err != nil { + return "", err + } + os.Remove(dir) + return filepath.Base(dir), nil +} From 26b90619e2de608d433351c65578180932ffda9c Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 5 Aug 2019 20:40:52 +0000 Subject: [PATCH 6/8] Pass apply opts through rootfs/* code Signed-off-by: Michael Crosby --- rootfs/apply.go | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/rootfs/apply.go b/rootfs/apply.go index 3ea830f6b..73d4ccca5 100644 --- a/rootfs/apply.go +++ b/rootfs/apply.go @@ -48,6 +48,14 @@ type Layer struct { // Layers are applied in order they are given, making the first layer the // bottom-most layer in the layer chain. func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier) (digest.Digest, error) { + return ApplyLayersWithOpts(ctx, layers, sn, a, nil) +} + +// ApplyLayersWithOpts applies all the layers using the given snapshotter, applier, and apply opts. +// The returned result is a chain id digest representing all the applied layers. +// Layers are applied in order they are given, making the first layer the +// bottom-most layer in the layer chain. +func ApplyLayersWithOpts(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, a diff.Applier, applyOpts []diff.ApplyOpt) (digest.Digest, error) { chain := make([]digest.Digest, len(layers)) for i, layer := range layers { chain[i] = layer.Diff.Digest @@ -63,7 +71,7 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, return "", errors.Wrapf(err, "failed to stat snapshot %s", chainID) } - if err := applyLayers(ctx, layers, chain, sn, a); err != nil && !errdefs.IsAlreadyExists(err) { + if err := applyLayers(ctx, layers, chain, sn, a, nil, applyOpts); err != nil && !errdefs.IsAlreadyExists(err) { return "", err } } @@ -75,6 +83,13 @@ func ApplyLayers(ctx context.Context, layers []Layer, sn snapshots.Snapshotter, // using the provided snapshotter and applier. If the layer was unpacked true // is returned, if the layer already exists false is returned. func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) (bool, error) { + return ApplyLayerWithOpts(ctx, layer, chain, sn, a, opts, nil) +} + +// ApplyLayerWithOpts applies a single layer on top of the given provided layer chain, +// using the provided snapshotter, applier, and apply opts. If the layer was unpacked true +// is returned, if the layer already exists false is returned. +func ApplyLayerWithOpts(ctx context.Context, layer Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts []snapshots.Opt, applyOpts []diff.ApplyOpt) (bool, error) { var ( chainID = identity.ChainID(append(chain, layer.Diff.Digest)).String() applied bool @@ -84,7 +99,7 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap return false, errors.Wrapf(err, "failed to stat snapshot %s", chainID) } - if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts...); err != nil { + if err := applyLayers(ctx, []Layer{layer}, append(chain, layer.Diff.Digest), sn, a, opts, applyOpts); err != nil { if !errdefs.IsAlreadyExists(err) { return false, err } @@ -93,9 +108,10 @@ func ApplyLayer(ctx context.Context, layer Layer, chain []digest.Digest, sn snap } } return applied, nil + } -func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts ...snapshots.Opt) error { +func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn snapshots.Snapshotter, a diff.Applier, opts []snapshots.Opt, applyOpts []diff.ApplyOpt) error { var ( parent = identity.ChainID(chain[:len(chain)-1]) chainID = identity.ChainID(chain) @@ -113,7 +129,7 @@ func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn mounts, err = sn.Prepare(ctx, key, parent.String(), opts...) if err != nil { if errdefs.IsNotFound(err) && len(layers) > 1 { - if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a); err != nil { + if err := applyLayers(ctx, layers[:len(layers)-1], chain[:len(chain)-1], sn, a, nil, applyOpts); err != nil { if !errdefs.IsAlreadyExists(err) { return err } @@ -144,7 +160,7 @@ func applyLayers(ctx context.Context, layers []Layer, chain []digest.Digest, sn } }() - diff, err = a.Apply(ctx, layer.Blob, mounts) + diff, err = a.Apply(ctx, layer.Blob, mounts, applyOpts...) if err != nil { err = errors.Wrapf(err, "failed to extract layer %s", layer.Diff.Digest) return err From 3fded74bc73a7824af1388400940207207902273 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 6 Aug 2019 18:15:53 +0000 Subject: [PATCH 7/8] Add unpack opts Signed-off-by: Michael Crosby --- diff.go | 7 ++++--- diff/apply/apply.go | 2 +- diff/diff.go | 4 ++-- diff/lcow/lcow.go | 2 +- diff/windows/windows.go | 2 +- image.go | 26 +++++++++++++++++++++++--- 6 files changed, 32 insertions(+), 11 deletions(-) diff --git a/diff.go b/diff.go index 182362941..445df0192 100644 --- a/diff.go +++ b/diff.go @@ -48,13 +48,14 @@ type diffRemote struct { func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (ocispec.Descriptor, error) { var config diff.ApplyConfig for _, opt := range opts { - if err := opt(&config); err != nil { + if err := opt(ctx, desc, &config); err != nil { return ocispec.Descriptor{}, err } } req := &diffapi.ApplyRequest{ - Diff: fromDescriptor(desc), - Mounts: fromMounts(mounts), + Diff: fromDescriptor(desc), + Mounts: fromMounts(mounts), + Payloads: config.ProcessorPayloads, } resp, err := r.client.Apply(ctx, req) if err != nil { diff --git a/diff/apply/apply.go b/diff/apply/apply.go index 99a6b802e..50ee057f9 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -65,7 +65,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ var config diff.ApplyConfig for _, o := range opts { - if err := o(&config); err != nil { + if err := o(ctx, desc, &config); err != nil { return emptyDesc, errors.Wrap(err, "failed to apply config opt") } } diff --git a/diff/diff.go b/diff/diff.go index bf3c514d2..17aab616e 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -59,7 +59,7 @@ type ApplyConfig struct { } // ApplyOpt is used to configure an Apply operation -type ApplyOpt func(*ApplyConfig) error +type ApplyOpt func(context.Context, ocispec.Descriptor, *ApplyConfig) error // Applier allows applying diffs between mounts type Applier interface { @@ -100,7 +100,7 @@ func WithLabels(labels map[string]string) Opt { // WithPayloads sets the apply processor payloads to the config func WithPayloads(payloads map[string]*types.Any) ApplyOpt { - return func(c *ApplyConfig) error { + return func(_ context.Context, _ ocispec.Descriptor, c *ApplyConfig) error { c.ProcessorPayloads = payloads return nil } diff --git a/diff/lcow/lcow.go b/diff/lcow/lcow.go index 59b812cfc..05ce3fb21 100644 --- a/diff/lcow/lcow.go +++ b/diff/lcow/lcow.go @@ -108,7 +108,7 @@ func (s windowsLcowDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mou var config diff.ApplyConfig for _, o := range opts { - if err := o(&config); err != nil { + if err := o(ctx, desc, &config); err != nil { return emptyDesc, errors.Wrap(err, "failed to apply config opt") } } diff --git a/diff/windows/windows.go b/diff/windows/windows.go index 4ad3b875e..ce584dc27 100644 --- a/diff/windows/windows.go +++ b/diff/windows/windows.go @@ -100,7 +100,7 @@ func (s windowsDiff) Apply(ctx context.Context, desc ocispec.Descriptor, mounts var config diff.ApplyConfig for _, o := range opts { - if err := o(&config); err != nil { + if err := o(ctx, desc, &config); err != nil { return emptyDesc, errors.Wrap(err, "failed to apply config opt") } } diff --git a/image.go b/image.go index 77c95eaa4..9cfc03a30 100644 --- a/image.go +++ b/image.go @@ -21,10 +21,12 @@ import ( "fmt" "github.com/containerd/containerd/content" + "github.com/containerd/containerd/diff" "github.com/containerd/containerd/errdefs" "github.com/containerd/containerd/images" "github.com/containerd/containerd/platforms" "github.com/containerd/containerd/rootfs" + "github.com/containerd/containerd/snapshots" "github.com/opencontainers/go-digest" "github.com/opencontainers/image-spec/identity" ocispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -40,7 +42,7 @@ type Image interface { // Labels of the image Labels() map[string]string // Unpack unpacks the image's content into a snapshot - Unpack(context.Context, string) error + Unpack(context.Context, string, ...UnpackOpt) error // RootFS returns the unpacked diffids that make up images rootfs. RootFS(ctx context.Context) ([]digest.Digest, error) // Size returns the total size of the image's packed resources. @@ -130,13 +132,31 @@ func (i *image) IsUnpacked(ctx context.Context, snapshotterName string) (bool, e return false, nil } -func (i *image) Unpack(ctx context.Context, snapshotterName string) error { +// UnpackConfig provides configuration for the unpack of an image +type UnpackConfig struct { + // ApplyOpts for applying a diff to a snapshotter + ApplyOpts []diff.ApplyOpt + // SnapshotOpts for configuring a snapshotter + SnapshotOpts []snapshots.Opt +} + +// UnpackOpt provides configuration for unpack +type UnpackOpt func(context.Context, *UnpackConfig) error + +func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error { ctx, done, err := i.client.WithLease(ctx) if err != nil { return err } defer done(ctx) + var config UnpackConfig + for _, o := range opts { + if err := o(ctx, &config); err != nil { + return err + } + } + layers, err := i.getLayers(ctx, i.platform) if err != nil { return err @@ -158,7 +178,7 @@ func (i *image) Unpack(ctx context.Context, snapshotterName string) error { return err } for _, layer := range layers { - unpacked, err = rootfs.ApplyLayer(ctx, layer, chain, sn, a) + unpacked, err = rootfs.ApplyLayerWithOpts(ctx, layer, chain, sn, a, config.SnapshotOpts, config.ApplyOpts) if err != nil { return err } From 552a0b1be5e196c3d468ab6509bf9b849328b625 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 6 Aug 2019 21:06:18 +0000 Subject: [PATCH 8/8] Handle stderr in stream processors Signed-off-by: Michael Crosby --- diff/apply/apply.go | 13 ++++++++++++ diff/stream_unix.go | 46 +++++++++++++++++++++++++++++++++--------- diff/stream_windows.go | 45 ++++++++++++++++++++++++++++++++--------- 3 files changed, 86 insertions(+), 18 deletions(-) diff --git a/diff/apply/apply.go b/diff/apply/apply.go index 50ee057f9..7a6b65c3e 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -76,11 +76,14 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ } defer ra.Close() + var processors []diff.StreamProcessor processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra)) + processors = append(processors, processor) for { if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil { return emptyDesc, errors.Wrapf(err, "failed to get stream processor for %s", desc.MediaType) } + processors = append(processors, processor) if processor.MediaType() == ocispec.MediaTypeImageLayer { break } @@ -102,6 +105,16 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ }); err != nil { return emptyDesc, err } + + for _, p := range processors { + if ep, ok := p.(interface { + Err() error + }); ok { + if err := ep.Err(); err != nil { + return emptyDesc, err + } + } + } return ocispec.Descriptor{ MediaType: ocispec.MediaTypeImageLayer, Size: rc.c, diff --git a/diff/stream_unix.go b/diff/stream_unix.go index 75d1001f6..28f38d998 100644 --- a/diff/stream_unix.go +++ b/diff/stream_unix.go @@ -25,14 +25,17 @@ import ( "io" "os" "os/exec" + "sync" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" ) // NewBinaryProcessor returns a binary processor for use with processing content streams func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { cmd := exec.CommandContext(ctx, name, args...) + cmd.Env = os.Environ() var payloadC io.Closer if payload != nil { @@ -71,10 +74,19 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce } cmd.Stdout = w + stderr := bytes.NewBuffer(nil) + cmd.Stderr = stderr + if err := cmd.Start(); err != nil { return nil, err } - go cmd.Wait() + p := &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + stderr: stderr, + } + go p.wait() // close after start and dup w.Close() @@ -84,17 +96,33 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce if payloadC != nil { payloadC.Close() } - return &binaryProcessor{ - cmd: cmd, - r: r, - mt: rmt, - }, nil + return p, nil } type binaryProcessor struct { - cmd *exec.Cmd - r *os.File - mt string + cmd *exec.Cmd + r *os.File + mt string + stderr *bytes.Buffer + + mu sync.Mutex + err error +} + +func (c *binaryProcessor) Err() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.err +} + +func (c *binaryProcessor) wait() { + if err := c.cmd.Wait(); err != nil { + if _, ok := err.(*exec.ExitError); ok { + c.mu.Lock() + c.err = errors.New(c.stderr.String()) + c.mu.Unlock() + } + } } func (c *binaryProcessor) File() *os.File { diff --git a/diff/stream_windows.go b/diff/stream_windows.go index ba139b849..8dadd72c9 100644 --- a/diff/stream_windows.go +++ b/diff/stream_windows.go @@ -27,10 +27,12 @@ import ( "os" "os/exec" "path/filepath" + "sync" winio "github.com/Microsoft/go-winio" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) @@ -39,6 +41,7 @@ const processorPipe = "STREAM_PROCESSOR_PIPE" // NewBinaryProcessor returns a binary processor for use with processing content streams func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { cmd := exec.CommandContext(ctx, name, args...) + cmd.Env = os.Environ() if payload != nil { data, err := proto.Marshal(payload) @@ -84,28 +87,52 @@ func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProce return nil, err } cmd.Stdout = w + stderr := bytes.NewBuffer(nil) + cmd.Stderr = stderr if err := cmd.Start(); err != nil { return nil, err } - go cmd.Wait() + p := &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + stderr: stderr, + } + go p.wait() // close after start and dup w.Close() if closer != nil { closer() } - return &binaryProcessor{ - cmd: cmd, - r: r, - mt: rmt, - }, nil + return p, nil } type binaryProcessor struct { - cmd *exec.Cmd - r *os.File - mt string + cmd *exec.Cmd + r *os.File + mt string + stderr *bytes.Buffer + + mu sync.Mutex + err error +} + +func (c *binaryProcessor) Err() error { + c.mu.Lock() + defer c.mu.Unlock() + return c.err +} + +func (c *binaryProcessor) wait() { + if err := c.cmd.Wait(); err != nil { + if _, ok := err.(*exec.ExitError); ok { + c.mu.Lock() + c.err = errors.New(c.stderr.String()) + c.mu.Unlock() + } + } } func (c *binaryProcessor) File() *os.File {