From e1489f93c3629fc68811d30a1dc3ddf6cb951d0d Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 5 Aug 2019 19:43:17 +0000 Subject: [PATCH] Use named pipes for windows processors Signed-off-by: Michael Crosby --- diff/stream.go | 91 --------------------------- diff/stream_unix.go | 118 +++++++++++++++++++++++++++++++++++ diff/stream_windows.go | 138 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 91 deletions(-) create mode 100644 diff/stream_unix.go create mode 100644 diff/stream_windows.go diff --git a/diff/stream.go b/diff/stream.go index b53b8b4cd..4b8f27f14 100644 --- a/diff/stream.go +++ b/diff/stream.go @@ -17,16 +17,12 @@ package diff import ( - "bytes" "context" - "fmt" "io" "os" - "os/exec" "github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/images" - "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" @@ -189,90 +185,3 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string } const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE" - -// NewBinaryProcessor returns a binary processor for use with processing content streams -func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { - cmd := exec.CommandContext(ctx, name, args...) - - var payloadC io.Closer - if payload != nil { - data, err := proto.Marshal(payload) - if err != nil { - return nil, err - } - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - go func() { - io.Copy(w, bytes.NewReader(data)) - w.Close() - }() - - cmd.ExtraFiles = append(cmd.ExtraFiles, r) - payloadC = r - } - cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) - var ( - stdin io.Reader - closer func() error - err error - ) - if f, ok := stream.(RawProcessor); ok { - stdin = f.File() - closer = f.File().Close - } else { - stdin = stream - } - cmd.Stdin = stdin - r, w, err := os.Pipe() - if err != nil { - return nil, err - } - cmd.Stdout = w - - if err := cmd.Start(); err != nil { - return nil, err - } - go cmd.Wait() - - // close after start and dup - w.Close() - if closer != nil { - closer() - } - if payloadC != nil { - payloadC.Close() - } - return &binaryProcessor{ - cmd: cmd, - r: r, - mt: rmt, - }, nil -} - -type binaryProcessor struct { - cmd *exec.Cmd - r *os.File - mt string -} - -func (c *binaryProcessor) File() *os.File { - return c.r -} - -func (c *binaryProcessor) MediaType() string { - return c.mt -} - -func (c *binaryProcessor) Read(p []byte) (int, error) { - return c.r.Read(p) -} - -func (c *binaryProcessor) Close() error { - err := c.r.Close() - if kerr := c.cmd.Process.Kill(); err == nil { - err = kerr - } - return err -} diff --git a/diff/stream_unix.go b/diff/stream_unix.go new file mode 100644 index 000000000..75d1001f6 --- /dev/null +++ b/diff/stream_unix.go @@ -0,0 +1,118 @@ +// +build !windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" +) + +// NewBinaryProcessor returns a binary processor for use with processing content streams +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { + cmd := exec.CommandContext(ctx, name, args...) + + var payloadC io.Closer + if payload != nil { + data, err := proto.Marshal(payload) + if err != nil { + return nil, err + } + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + go func() { + io.Copy(w, bytes.NewReader(data)) + w.Close() + }() + + cmd.ExtraFiles = append(cmd.ExtraFiles, r) + payloadC = r + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream + } + cmd.Stdin = stdin + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.Stdout = w + + if err := cmd.Start(); err != nil { + return nil, err + } + go cmd.Wait() + + // close after start and dup + w.Close() + if closer != nil { + closer() + } + if payloadC != nil { + payloadC.Close() + } + return &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + }, nil +} + +type binaryProcessor struct { + cmd *exec.Cmd + r *os.File + mt string +} + +func (c *binaryProcessor) File() *os.File { + return c.r +} + +func (c *binaryProcessor) MediaType() string { + return c.mt +} + +func (c *binaryProcessor) Read(p []byte) (int, error) { + return c.r.Read(p) +} + +func (c *binaryProcessor) Close() error { + err := c.r.Close() + if kerr := c.cmd.Process.Kill(); err == nil { + err = kerr + } + return err +} diff --git a/diff/stream_windows.go b/diff/stream_windows.go new file mode 100644 index 000000000..ba139b849 --- /dev/null +++ b/diff/stream_windows.go @@ -0,0 +1,138 @@ +// +build windows + +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package diff + +import ( + "bytes" + "context" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + + winio "github.com/Microsoft/go-winio" + "github.com/gogo/protobuf/proto" + "github.com/gogo/protobuf/types" + "github.com/sirupsen/logrus" +) + +const processorPipe = "STREAM_PROCESSOR_PIPE" + +// NewBinaryProcessor returns a binary processor for use with processing content streams +func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) { + cmd := exec.CommandContext(ctx, name, args...) + + if payload != nil { + data, err := proto.Marshal(payload) + if err != nil { + return nil, err + } + up, err := getUiqPath() + if err != nil { + return nil, err + } + path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up) + l, err := winio.ListenPipe(path, nil) + if err != nil { + return nil, err + } + go func() { + defer l.Close() + conn, err := l.Accept() + if err != nil { + logrus.WithError(err).Error("accept npipe connection") + return + } + io.Copy(conn, bytes.NewReader(data)) + conn.Close() + }() + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path)) + } + cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt)) + var ( + stdin io.Reader + closer func() error + err error + ) + if f, ok := stream.(RawProcessor); ok { + stdin = f.File() + closer = f.File().Close + } else { + stdin = stream + } + cmd.Stdin = stdin + r, w, err := os.Pipe() + if err != nil { + return nil, err + } + cmd.Stdout = w + + if err := cmd.Start(); err != nil { + return nil, err + } + go cmd.Wait() + + // close after start and dup + w.Close() + if closer != nil { + closer() + } + return &binaryProcessor{ + cmd: cmd, + r: r, + mt: rmt, + }, nil +} + +type binaryProcessor struct { + cmd *exec.Cmd + r *os.File + mt string +} + +func (c *binaryProcessor) File() *os.File { + return c.r +} + +func (c *binaryProcessor) MediaType() string { + return c.mt +} + +func (c *binaryProcessor) Read(p []byte) (int, error) { + return c.r.Read(p) +} + +func (c *binaryProcessor) Close() error { + err := c.r.Close() + if kerr := c.cmd.Process.Kill(); err == nil { + err = kerr + } + return err +} + +func getUiqPath() (string, error) { + dir, err := ioutil.TempDir("", "") + if err != nil { + return "", err + } + os.Remove(dir) + return filepath.Base(dir), nil +}