Move diff to core/diff

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:51:17 -08:00
parent d14350376e
commit 913edcd489
29 changed files with 24 additions and 24 deletions

131
core/diff/apply/apply.go Normal file
View File

@@ -0,0 +1,131 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"context"
"fmt"
"io"
"time"
"github.com/containerd/containerd/v2/core/content"
"github.com/containerd/containerd/v2/core/diff"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/log"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// NewFileSystemApplier returns an applier which simply mounts
// and applies diff onto the mounted filesystem.
func NewFileSystemApplier(cs content.Provider) diff.Applier {
return &fsApplier{
store: cs,
}
}
type fsApplier struct {
store content.Provider
}
var emptyDesc = ocispec.Descriptor{}
// Apply applies the content associated with the provided digests onto the
// provided mounts. Archive content will be extracted and decompressed if
// necessary.
func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (d ocispec.Descriptor, err error) {
t1 := time.Now()
defer func() {
if err == nil {
log.G(ctx).WithFields(log.Fields{
"d": time.Since(t1),
"digest": desc.Digest,
"size": desc.Size,
"media": desc.MediaType,
}).Debugf("diff applied")
}
}()
var config diff.ApplyConfig
for _, o := range opts {
if err := o(ctx, desc, &config); err != nil {
return emptyDesc, fmt.Errorf("failed to apply config opt: %w", err)
}
}
ra, err := s.store.ReaderAt(ctx, desc)
if err != nil {
return emptyDesc, fmt.Errorf("failed to get reader from content store: %w", err)
}
defer ra.Close()
var processors []diff.StreamProcessor
processor := diff.NewProcessorChain(desc.MediaType, content.NewReader(ra))
processors = append(processors, processor)
for {
if processor, err = diff.GetProcessor(ctx, processor, config.ProcessorPayloads); err != nil {
return emptyDesc, fmt.Errorf("failed to get stream processor for %s: %w", desc.MediaType, err)
}
processors = append(processors, processor)
if processor.MediaType() == ocispec.MediaTypeImageLayer {
break
}
}
defer processor.Close()
digester := digest.Canonical.Digester()
rc := &readCounter{
r: io.TeeReader(processor, digester.Hash()),
}
if err := apply(ctx, mounts, rc, config.SyncFs); err != nil {
return emptyDesc, err
}
// Read any trailing data
if _, err := io.Copy(io.Discard, rc); err != nil {
return emptyDesc, err
}
for _, p := range processors {
if ep, ok := p.(interface {
Err() error
}); ok {
if err := ep.Err(); err != nil {
return emptyDesc, err
}
}
}
return ocispec.Descriptor{
MediaType: ocispec.MediaTypeImageLayer,
Size: rc.c,
Digest: digester.Digest(),
}, nil
}
type readCounter struct {
r io.Reader
c int64
}
func (rc *readCounter) Read(p []byte) (n int, err error) {
n, err = rc.r.Read(p)
if n > 0 {
rc.c += int64(n)
}
return
}

View File

@@ -0,0 +1,49 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"context"
"io"
"os"
"github.com/containerd/containerd/v2/archive"
"github.com/containerd/containerd/v2/mount"
)
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error {
// We currently do not support mounts nor bind mounts on MacOS in the containerd daemon.
// Using this as an exception to enable native snapshotter and allow further research.
if len(mounts) == 1 && mounts[0].Type == "bind" {
opts := []archive.ApplyOpt{}
if os.Getuid() != 0 {
opts = append(opts, archive.WithNoSameOwner())
}
path := mounts[0].Source
_, err := archive.Apply(ctx, path, r, opts...)
return err
// TODO: Do we need to sync all the filesystems?
}
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
})
}

View File

@@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"context"
"fmt"
"io"
"os"
"strings"
"github.com/containerd/containerd/v2/archive"
"github.com/containerd/containerd/v2/errdefs"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/containerd/v2/pkg/userns"
"golang.org/x/sys/unix"
)
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, sync bool) (retErr error) {
switch {
case len(mounts) == 1 && mounts[0].Type == "overlay":
// OverlayConvertWhiteout (mknod c 0 0) doesn't work in userns.
// https://github.com/containerd/containerd/issues/3762
if userns.RunningInUserNS() {
break
}
path, parents, err := getOverlayPath(mounts[0].Options)
if err != nil {
if errdefs.IsInvalidArgument(err) {
break
}
return err
}
opts := []archive.ApplyOpt{
archive.WithConvertWhiteout(archive.OverlayConvertWhiteout),
}
if len(parents) > 0 {
opts = append(opts, archive.WithParents(parents))
}
_, err = archive.Apply(ctx, path, r, opts...)
if err == nil && sync {
err = doSyncFs(path)
}
return err
case sync && len(mounts) == 1 && mounts[0].Type == "bind":
defer func() {
if retErr != nil {
return
}
retErr = doSyncFs(mounts[0].Source)
}()
}
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
})
}
func getOverlayPath(options []string) (upper string, lower []string, err error) {
const upperdirPrefix = "upperdir="
const lowerdirPrefix = "lowerdir="
for _, o := range options {
if strings.HasPrefix(o, upperdirPrefix) {
upper = strings.TrimPrefix(o, upperdirPrefix)
} else if strings.HasPrefix(o, lowerdirPrefix) {
lower = strings.Split(strings.TrimPrefix(o, lowerdirPrefix), ":")
}
}
if upper == "" {
return "", nil, fmt.Errorf("upperdir not found: %w", errdefs.ErrInvalidArgument)
}
return
}
func doSyncFs(file string) error {
fd, err := os.Open(file)
if err != nil {
return fmt.Errorf("failed to open %s: %w", file, err)
}
defer fd.Close()
_, _, errno := unix.Syscall(unix.SYS_SYNCFS, fd.Fd(), 0, 0)
if errno != 0 {
return fmt.Errorf("failed to syncfs for %s: %w", file, errno)
}
return nil
}

View File

@@ -0,0 +1,41 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"testing"
)
func TestGetOverlayPath(t *testing.T) {
good := []string{"upperdir=/test/upper", "lowerdir=/test/lower1:/test/lower2", "workdir=/test/work"}
path, parents, err := getOverlayPath(good)
if err != nil {
t.Fatalf("Get overlay path failed: %v", err)
}
if path != "/test/upper" {
t.Fatalf("Unexpected upperdir: %q", path)
}
if len(parents) != 2 || parents[0] != "/test/lower1" || parents[1] != "/test/lower2" {
t.Fatalf("Unexpected parents: %v", parents)
}
bad := []string{"lowerdir=/test/lower"}
_, _, err = getOverlayPath(bad)
if err == nil {
t.Fatalf("An error is expected")
}
}

View File

@@ -0,0 +1,35 @@
//go:build !linux && !darwin
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package apply
import (
"context"
"io"
"github.com/containerd/containerd/v2/archive"
"github.com/containerd/containerd/v2/mount"
)
func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error {
// TODO: for windows, how to sync?
return mount.WithTempMount(ctx, mounts, func(root string) error {
_, err := archive.Apply(ctx, root, r)
return err
})
}

148
core/diff/diff.go Normal file
View File

@@ -0,0 +1,148 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"context"
"io"
"time"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
// Config is used to hold parameters needed for a diff operation
type Config struct {
// MediaType is the type of diff to generate
// Default depends on the differ,
// i.e. application/vnd.oci.image.layer.v1.tar+gzip
MediaType string
// Reference is the content upload reference
// Default will use a random reference string
Reference string
// Labels are the labels to apply to the generated content
Labels map[string]string
// Compressor is a function to compress the diff stream
// instead of the default gzip compressor. Differ passes
// the MediaType of the target diff content to the compressor.
// When using this config, MediaType must be specified as well.
Compressor func(dest io.Writer, mediaType string) (io.WriteCloser, error)
// SourceDateEpoch specifies the SOURCE_DATE_EPOCH without touching the env vars.
SourceDateEpoch *time.Time
}
// Opt is used to configure a diff operation
type Opt func(*Config) error
// Comparer allows creation of filesystem diffs between mounts
type Comparer interface {
// Compare computes the difference between two mounts and returns a
// descriptor for the computed diff. The options can provide
// a ref which can be used to track the content creation of the diff.
// The media type which is used to determine the format of the created
// content can also be provided as an option.
Compare(ctx context.Context, lower, upper []mount.Mount, opts ...Opt) (ocispec.Descriptor, error)
}
// ApplyConfig is used to hold parameters needed for a apply operation
type ApplyConfig struct {
// ProcessorPayloads specifies the payload sent to various processors
ProcessorPayloads map[string]typeurl.Any
// SyncFs is to synchronize the underlying filesystem containing files
SyncFs bool
}
// ApplyOpt is used to configure an Apply operation
type ApplyOpt func(context.Context, ocispec.Descriptor, *ApplyConfig) error
// Applier allows applying diffs between mounts
type Applier interface {
// Apply applies the content referred to by the given descriptor to
// the provided mount. The method of applying is based on the
// implementation and content descriptor. For example, in the common
// case the descriptor is a file system difference in tar format,
// that tar would be applied on top of the mounts.
Apply(ctx context.Context, desc ocispec.Descriptor, mount []mount.Mount, opts ...ApplyOpt) (ocispec.Descriptor, error)
}
// WithCompressor sets the function to be used to compress the diff stream.
func WithCompressor(f func(dest io.Writer, mediaType string) (io.WriteCloser, error)) Opt {
return func(c *Config) error {
c.Compressor = f
return nil
}
}
// WithMediaType sets the media type to use for creating the diff, without
// specifying the differ will choose a default.
func WithMediaType(m string) Opt {
return func(c *Config) error {
c.MediaType = m
return nil
}
}
// WithReference is used to set the content upload reference used by
// the diff operation. This allows the caller to track the upload through
// the content store.
func WithReference(ref string) Opt {
return func(c *Config) error {
c.Reference = ref
return nil
}
}
// WithLabels is used to set content labels on the created diff content.
func WithLabels(labels map[string]string) Opt {
return func(c *Config) error {
c.Labels = labels
return nil
}
}
// WithPayloads sets the apply processor payloads to the config
func WithPayloads(payloads map[string]typeurl.Any) ApplyOpt {
return func(_ context.Context, _ ocispec.Descriptor, c *ApplyConfig) error {
c.ProcessorPayloads = payloads
return nil
}
}
// WithSyncFs sets sync flag to the config.
func WithSyncFs(sync bool) ApplyOpt {
return func(_ context.Context, _ ocispec.Descriptor, c *ApplyConfig) error {
c.SyncFs = sync
return nil
}
}
// WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility.
// See also https://reproducible-builds.org/docs/source-date-epoch/ .
//
// Since containerd v2.0, the whiteout timestamps are set to zero (1970-01-01),
// not to the source date epoch.
func WithSourceDateEpoch(tm *time.Time) Opt {
return func(c *Config) error {
c.SourceDateEpoch = tm
return nil
}
}

100
core/diff/proxy/differ.go Normal file
View File

@@ -0,0 +1,100 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package proxy
import (
"context"
diffapi "github.com/containerd/containerd/v2/api/services/diff/v1"
"github.com/containerd/containerd/v2/core/diff"
"github.com/containerd/containerd/v2/errdefs"
"github.com/containerd/containerd/v2/mount"
"github.com/containerd/containerd/v2/oci"
"github.com/containerd/containerd/v2/pkg/epoch"
"github.com/containerd/containerd/v2/protobuf"
ptypes "github.com/containerd/containerd/v2/protobuf/types"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)
// NewDiffApplier returns a new comparer and applier which communicates
// over a GRPC connection.
func NewDiffApplier(client diffapi.DiffClient) interface{} {
return &diffRemote{
client: client,
}
}
type diffRemote struct {
client diffapi.DiffClient
}
func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts []mount.Mount, opts ...diff.ApplyOpt) (ocispec.Descriptor, error) {
var config diff.ApplyConfig
for _, opt := range opts {
if err := opt(ctx, desc, &config); err != nil {
return ocispec.Descriptor{}, err
}
}
payloads := make(map[string]*ptypes.Any)
for k, v := range config.ProcessorPayloads {
payloads[k] = protobuf.FromAny(v)
}
req := &diffapi.ApplyRequest{
Diff: oci.DescriptorToProto(desc),
Mounts: mount.ToProto(mounts),
Payloads: payloads,
SyncFs: config.SyncFs,
}
resp, err := r.client.Apply(ctx, req)
if err != nil {
return ocispec.Descriptor{}, errdefs.FromGRPC(err)
}
return oci.DescriptorFromProto(resp.Applied), nil
}
func (r *diffRemote) Compare(ctx context.Context, a, b []mount.Mount, opts ...diff.Opt) (ocispec.Descriptor, error) {
var config diff.Config
for _, opt := range opts {
if err := opt(&config); err != nil {
return ocispec.Descriptor{}, err
}
}
if tm := epoch.FromContext(ctx); tm != nil && config.SourceDateEpoch == nil {
config.SourceDateEpoch = tm
}
var sourceDateEpoch *timestamppb.Timestamp
if config.SourceDateEpoch != nil {
sourceDateEpoch = timestamppb.New(*config.SourceDateEpoch)
}
req := &diffapi.DiffRequest{
Left: mount.ToProto(a),
Right: mount.ToProto(b),
MediaType: config.MediaType,
Ref: config.Reference,
Labels: config.Labels,
SourceDateEpoch: sourceDateEpoch,
}
resp, err := r.client.Diff(ctx, req)
if err != nil {
return ocispec.Descriptor{}, errdefs.FromGRPC(err)
}
return oci.DescriptorFromProto(resp.Diff), nil
}

191
core/diff/stream.go Normal file
View File

@@ -0,0 +1,191 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"context"
"errors"
"io"
"os"
"github.com/containerd/containerd/v2/archive/compression"
"github.com/containerd/containerd/v2/images"
"github.com/containerd/typeurl/v2"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
var (
handlers []Handler
// ErrNoProcessor is returned when no stream processor is available for a media-type
ErrNoProcessor = errors.New("no processor for media-type")
)
func init() {
// register the default compression handler
RegisterProcessor(compressedHandler)
}
// RegisterProcessor registers a stream processor for media-types
func RegisterProcessor(handler Handler) {
handlers = append(handlers, handler)
}
// GetProcessor returns the processor for a media-type
func GetProcessor(ctx context.Context, stream StreamProcessor, payloads map[string]typeurl.Any) (StreamProcessor, error) {
// reverse this list so that user configured handlers come up first
for i := len(handlers) - 1; i >= 0; i-- {
processor, ok := handlers[i](ctx, stream.MediaType())
if ok {
return processor(ctx, stream, payloads)
}
}
return nil, ErrNoProcessor
}
// Handler checks a media-type and initializes the processor
type Handler func(ctx context.Context, mediaType string) (StreamProcessorInit, bool)
// StaticHandler returns the processor init func for a static media-type
func StaticHandler(expectedMediaType string, fn StreamProcessorInit) Handler {
return func(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
if mediaType == expectedMediaType {
return fn, true
}
return nil, false
}
}
// StreamProcessorInit returns the initialized stream processor
type StreamProcessorInit func(ctx context.Context, stream StreamProcessor, payloads map[string]typeurl.Any) (StreamProcessor, error)
// RawProcessor provides access to direct fd for processing
type RawProcessor interface {
// File returns the fd for the read stream of the underlying processor
File() *os.File
}
// StreamProcessor handles processing a content stream and transforming it into a different media-type
type StreamProcessor interface {
io.ReadCloser
// MediaType is the resulting media-type that the processor processes the stream into
MediaType() string
}
func compressedHandler(ctx context.Context, mediaType string) (StreamProcessorInit, bool) {
compressed, err := images.DiffCompression(ctx, mediaType)
if err != nil {
return nil, false
}
if compressed != "" {
return func(ctx context.Context, stream StreamProcessor, payloads map[string]typeurl.Any) (StreamProcessor, error) {
ds, err := compression.DecompressStream(stream)
if err != nil {
return nil, err
}
return &compressedProcessor{
rc: ds,
}, nil
}, true
}
return func(ctx context.Context, stream StreamProcessor, payloads map[string]typeurl.Any) (StreamProcessor, error) {
return &stdProcessor{
rc: stream,
}, nil
}, true
}
// NewProcessorChain initialized the root StreamProcessor
func NewProcessorChain(mt string, r io.Reader) StreamProcessor {
return &processorChain{
mt: mt,
rc: r,
}
}
type processorChain struct {
mt string
rc io.Reader
}
func (c *processorChain) MediaType() string {
return c.mt
}
func (c *processorChain) Read(p []byte) (int, error) {
return c.rc.Read(p)
}
func (c *processorChain) Close() error {
return nil
}
type stdProcessor struct {
rc StreamProcessor
}
func (c *stdProcessor) MediaType() string {
return ocispec.MediaTypeImageLayer
}
func (c *stdProcessor) Read(p []byte) (int, error) {
return c.rc.Read(p)
}
func (c *stdProcessor) Close() error {
return nil
}
type compressedProcessor struct {
rc io.ReadCloser
}
func (c *compressedProcessor) MediaType() string {
return ocispec.MediaTypeImageLayer
}
func (c *compressedProcessor) Read(p []byte) (int, error) {
return c.rc.Read(p)
}
func (c *compressedProcessor) Close() error {
return c.rc.Close()
}
// BinaryHandler creates a new stream processor handler which calls out to the given binary.
// The id is used to identify the stream processor and allows the caller to send
// payloads specific for that stream processor (i.e. decryption keys for decrypt stream processor).
// The binary will be called for the provided mediaTypes and return the given media type.
func BinaryHandler(id, returnsMediaType string, mediaTypes []string, path string, args, env []string) Handler {
set := make(map[string]struct{}, len(mediaTypes))
for _, m := range mediaTypes {
set[m] = struct{}{}
}
return func(_ context.Context, mediaType string) (StreamProcessorInit, bool) {
if _, ok := set[mediaType]; ok {
return func(ctx context.Context, stream StreamProcessor, payloads map[string]typeurl.Any) (StreamProcessor, error) {
payload := payloads[id]
return NewBinaryProcessor(ctx, mediaType, returnsMediaType, stream, path, args, env, payload)
}, true
}
return nil, false
}
}
const mediaTypeEnvVar = "STREAM_PROCESSOR_MEDIATYPE"

165
core/diff/stream_unix.go Normal file
View File

@@ -0,0 +1,165 @@
//go:build !windows
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"sync"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/containerd/v2/protobuf/proto"
"github.com/containerd/typeurl/v2"
)
// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload typeurl.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, env...)
var payloadC io.Closer
if payload != nil {
pb := protobuf.FromAny(payload)
data, err := proto.Marshal(pb)
if err != nil {
return nil, err
}
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
go func() {
io.Copy(w, bytes.NewReader(data))
w.Close()
}()
cmd.ExtraFiles = append(cmd.ExtraFiles, r)
payloadC = r
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w
stderr := bytes.NewBuffer(nil)
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {
return nil, err
}
p := &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
stderr: stderr,
done: make(chan struct{}),
}
go p.wait()
// close after start and dup
w.Close()
if closer != nil {
closer()
}
if payloadC != nil {
payloadC.Close()
}
return p, nil
}
type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
stderr *bytes.Buffer
mu sync.Mutex
err error
// There is a race condition between waiting on c.cmd.Wait() and setting c.err within
// c.wait(), and reading that value from c.Err().
// Use done to wait for the returned error to be captured and set.
done chan struct{}
}
func (c *binaryProcessor) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *binaryProcessor) wait() {
if err := c.cmd.Wait(); err != nil {
if _, ok := err.(*exec.ExitError); ok {
c.mu.Lock()
c.err = errors.New(c.stderr.String())
c.mu.Unlock()
}
}
close(c.done)
}
func (c *binaryProcessor) Wait(ctx context.Context) error {
select {
case <-c.done:
return c.Err()
case <-ctx.Done():
return ctx.Err()
}
}
func (c *binaryProcessor) File() *os.File {
return c.r
}
func (c *binaryProcessor) MediaType() string {
return c.mt
}
func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}

182
core/diff/stream_windows.go Normal file
View File

@@ -0,0 +1,182 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diff
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"sync"
"github.com/Microsoft/go-winio"
"github.com/containerd/containerd/v2/protobuf"
"github.com/containerd/containerd/v2/protobuf/proto"
"github.com/containerd/log"
"github.com/containerd/typeurl/v2"
)
const processorPipe = "STREAM_PROCESSOR_PIPE"
// NewBinaryProcessor returns a binary processor for use with processing content streams
func NewBinaryProcessor(ctx context.Context, imt, rmt string, stream StreamProcessor, name string, args, env []string, payload typeurl.Any) (StreamProcessor, error) {
cmd := exec.CommandContext(ctx, name, args...)
cmd.Env = os.Environ()
cmd.Env = append(cmd.Env, env...)
if payload != nil {
pb := protobuf.FromAny(payload)
data, err := proto.Marshal(pb)
if err != nil {
return nil, err
}
up, err := getUiqPath()
if err != nil {
return nil, err
}
path := fmt.Sprintf("\\\\.\\pipe\\containerd-processor-%s-pipe", up)
l, err := winio.ListenPipe(path, nil)
if err != nil {
return nil, err
}
go func() {
defer l.Close()
conn, err := l.Accept()
if err != nil {
log.G(ctx).WithError(err).Error("accept npipe connection")
return
}
io.Copy(conn, bytes.NewReader(data))
conn.Close()
}()
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", processorPipe, path))
}
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", mediaTypeEnvVar, imt))
var (
stdin io.Reader
closer func() error
err error
)
if f, ok := stream.(RawProcessor); ok {
stdin = f.File()
closer = f.File().Close
} else {
stdin = stream
}
cmd.Stdin = stdin
r, w, err := os.Pipe()
if err != nil {
return nil, err
}
cmd.Stdout = w
stderr := bytes.NewBuffer(nil)
cmd.Stderr = stderr
if err := cmd.Start(); err != nil {
return nil, err
}
p := &binaryProcessor{
cmd: cmd,
r: r,
mt: rmt,
stderr: stderr,
done: make(chan struct{}),
}
go p.wait()
// close after start and dup
w.Close()
if closer != nil {
closer()
}
return p, nil
}
type binaryProcessor struct {
cmd *exec.Cmd
r *os.File
mt string
stderr *bytes.Buffer
mu sync.Mutex
err error
// There is a race condition between waiting on c.cmd.Wait() and setting c.err within
// c.wait(), and reading that value from c.Err().
// Use done to wait for the returned error to be captured and set.
done chan struct{}
}
func (c *binaryProcessor) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *binaryProcessor) wait() {
if err := c.cmd.Wait(); err != nil {
if _, ok := err.(*exec.ExitError); ok {
c.mu.Lock()
c.err = errors.New(c.stderr.String())
c.mu.Unlock()
}
}
close(c.done)
}
func (c *binaryProcessor) Wait(ctx context.Context) error {
select {
case <-c.done:
return c.Err()
case <-ctx.Done():
return ctx.Err()
}
}
func (c *binaryProcessor) File() *os.File {
return c.r
}
func (c *binaryProcessor) MediaType() string {
return c.mt
}
func (c *binaryProcessor) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *binaryProcessor) Close() error {
err := c.r.Close()
if kerr := c.cmd.Process.Kill(); err == nil {
err = kerr
}
return err
}
func getUiqPath() (string, error) {
dir, err := os.MkdirTemp("", "")
if err != nil {
return "", err
}
os.Remove(dir)
return filepath.Base(dir), nil
}