Use named pipes for windows processors

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
Michael Crosby 2019-08-05 19:43:17 +00:00
parent 134d3c8159
commit e1489f93c3
3 changed files with 256 additions and 91 deletions

View File

@ -17,16 +17,12 @@
package diff package diff
import ( import (
"bytes"
"context" "context"
"fmt"
"io" "io"
"os" "os"
"os/exec"
"github.com/containerd/containerd/archive/compression" "github.com/containerd/containerd/archive/compression"
"github.com/containerd/containerd/images" "github.com/containerd/containerd/images"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types" "github.com/gogo/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -189,90 +185,3 @@ func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string
} }
const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE" const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"
// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)
var payloadC io.Closer
if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
io.Copy(w, bytes.NewReader(data))
w.Close()
}()
cmd.ExtraFiles = append(cmd.ExtraFiles, r)
payloadC = r
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w
if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()
// close after start and dup
w.Close()
if closer != nil {
closer()
}
if payloadC != nil {
payloadC.Close()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}
type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}
func (c *binaryProcessor) File() *os.File {
return c.r
}
func (c *binaryProcessor) MediaType() string {
return c.mt
}
func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}

118
diff/stream_unix.go Normal file
View File

@ -0,0 +1,118 @@
// +build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
)
// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)
var payloadC io.Closer
if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
io.Copy(w, bytes.NewReader(data))
w.Close()
}()
cmd.ExtraFiles = append(cmd.ExtraFiles, r)
payloadC = r
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w
if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()
// close after start and dup
w.Close()
if closer != nil {
closer()
}
if payloadC != nil {
payloadC.Close()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}
type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}
func (c *binaryProcessor) File() *os.File {
return c.r
}
func (c *binaryProcessor) MediaType() string {
return c.mt
}
func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}

138
diff/stream_windows.go Normal file
View File

@ -0,0 +1,138 @@
// +build windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
winio "github.com/Microsoft/go-winio"
"github.com/gogo/protobuf/proto"
"github.com/gogo/protobuf/types"
"github.com/sirupsen/logrus"
)
const processorPipe = "STREAM_PROCESSOR_PIPE"
// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args []string, payload *types.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)
if payload != nil {
data, err := proto.Marshal(payload)
if err != nil {
return nil, err
}
up, err := getUiqPath()
if err != nil {
return nil, err
}
path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up)
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}
go func() {
defer l.Close()
conn, err := l.Accept()
if err != nil {
logrus.WithError(err).Error("accept npipe connection")
return
}
io.Copy(conn, bytes.NewReader(data))
conn.Close()
}()
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path))
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w
if err := cmd.Start(); err != nil {
return nil, err
}
go cmd.Wait()
// close after start and dup
w.Close()
if closer != nil {
closer()
}
return &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
}, nil
}
type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
}
func (c *binaryProcessor) File() *os.File {
return c.r
}
func (c *binaryProcessor) MediaType() string {
return c.mt
}
func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}
func getUiqPath() (string, error) {
dir, err := ioutil.TempDir("", "")
if err != nil {
return "", err
}
os.Remove(dir)
return filepath.Base(dir), nil
}