diff --git a/client/image.go b/client/image.go index 75522f5d5..10f30eacb 100644 --- a/client/image.go +++ b/client/image.go @@ -279,6 +279,14 @@ func WithUnpackDuplicationSuppressor(suppressor kmutex.KeyedLocker) UnpackOpt { } } +// WithUnpackApplyOpts appends new apply options on the UnpackConfig. +func WithUnpackApplyOpts(opts ...diff.ApplyOpt) UnpackOpt { + return func(ctx context.Context, uc *UnpackConfig) error { + uc.ApplyOpts = append(uc.ApplyOpts, opts...) + return nil + } +} + func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error { ctx, done, err := i.client.WithLease(ctx) if err != nil { diff --git a/contrib/diffservice/service.go b/contrib/diffservice/service.go index b9840594b..a75a4725d 100644 --- a/contrib/diffservice/service.go +++ b/contrib/diffservice/service.go @@ -60,6 +60,7 @@ func (s *service) Apply(ctx context.Context, er *diffapi.ApplyRequest) (*diffapi } opts = append(opts, diff.WithPayloads(payloads)) } + opts = append(opts, diff.WithSyncFs(er.SyncFs)) ocidesc, err = s.applier.Apply(ctx, desc, mounts, opts...) if err != nil { diff --git a/diff/apply/apply.go b/diff/apply/apply.go index 9167e3ee7..fb83dafaa 100644 --- a/diff/apply/apply.go +++ b/diff/apply/apply.go @@ -92,7 +92,7 @@ func (s *fsApplier) Apply(ctx context.Context, desc ocispec.Descriptor, mounts [ r: io.TeeReader(processor, digester.Hash()), } - if err := apply(ctx, mounts, rc); err != nil { + if err := apply(ctx, mounts, rc, config.SyncFs); err != nil { return emptyDesc, err } diff --git a/diff/apply/apply_darwin.go b/diff/apply/apply_darwin.go index 9cc362e49..ac8beb6ec 100644 --- a/diff/apply/apply_darwin.go +++ b/diff/apply/apply_darwin.go @@ -25,7 +25,7 @@ import ( "github.com/containerd/containerd/v2/mount" ) -func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error { +func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error { // We currently do not support mounts nor bind mounts on MacOS in the containerd daemon. // Using this as an exception to enable native snapshotter and allow further research. if len(mounts) == 1 && mounts[0].Type == "bind" { @@ -38,6 +38,8 @@ func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error { path := mounts[0].Source _, err := archive.Apply(ctx, path, r, opts...) return err + + // TODO: Do we need to sync all the filesystems? } return mount.WithTempMount(ctx, mounts, func(root string) error { diff --git a/diff/apply/apply_linux.go b/diff/apply/apply_linux.go index d00af68bc..d2dac89f8 100644 --- a/diff/apply/apply_linux.go +++ b/diff/apply/apply_linux.go @@ -20,15 +20,18 @@ import ( "context" "fmt" "io" + "os" "strings" "github.com/containerd/containerd/v2/archive" "github.com/containerd/containerd/v2/errdefs" "github.com/containerd/containerd/v2/mount" "github.com/containerd/containerd/v2/pkg/userns" + + "golang.org/x/sys/unix" ) -func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error { +func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, sync bool) (retErr error) { switch { case len(mounts) == 1 && mounts[0].Type == "overlay": // OverlayConvertWhiteout (mknod c 0 0) doesn't work in userns. @@ -50,7 +53,18 @@ func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error { opts = append(opts, archive.WithParents(parents)) } _, err = archive.Apply(ctx, path, r, opts...) + if err == nil && sync { + err = doSyncFs(path) + } return err + case sync && len(mounts) == 1 && mounts[0].Type == "bind": + defer func() { + if retErr != nil { + return + } + + retErr = doSyncFs(mounts[0].Source) + }() } return mount.WithTempMount(ctx, mounts, func(root string) error { _, err := archive.Apply(ctx, root, r) @@ -75,3 +89,17 @@ func getOverlayPath(options []string) (upper string, lower []string, err error) return } + +func doSyncFs(file string) error { + fd, err := os.Open(file) + if err != nil { + return fmt.Errorf("failed to open %s: %w", file, err) + } + defer fd.Close() + + _, _, errno := unix.Syscall(unix.SYS_SYNCFS, fd.Fd(), 0, 0) + if errno != 0 { + return fmt.Errorf("failed to syncfs for %s: %w", file, errno) + } + return nil +} diff --git a/diff/apply/apply_other.go b/diff/apply/apply_other.go index 74902bf6a..2531e13b1 100644 --- a/diff/apply/apply_other.go +++ b/diff/apply/apply_other.go @@ -26,7 +26,8 @@ import ( "github.com/containerd/containerd/v2/mount" ) -func apply(ctx context.Context, mounts []mount.Mount, r io.Reader) error { +func apply(ctx context.Context, mounts []mount.Mount, r io.Reader, _sync bool) error { + // TODO: for windows, how to sync? return mount.WithTempMount(ctx, mounts, func(root string) error { _, err := archive.Apply(ctx, root, r) return err diff --git a/diff/diff.go b/diff/diff.go index d4462faba..5aef42c29 100644 --- a/diff/diff.go +++ b/diff/diff.go @@ -67,6 +67,8 @@ type Comparer interface { type ApplyConfig struct { // ProcessorPayloads specifies the payload sent to various processors ProcessorPayloads map[string]typeurl.Any + // SyncFs is to synchronize the underlying filesystem containing files + SyncFs bool } // ApplyOpt is used to configure an Apply operation @@ -125,6 +127,14 @@ func WithPayloads(payloads map[string]typeurl.Any) ApplyOpt { } } +// WithSyncFs sets sync flag to the config. +func WithSyncFs(sync bool) ApplyOpt { + return func(_ context.Context, _ ocispec.Descriptor, c *ApplyConfig) error { + c.SyncFs = sync + return nil + } +} + // WithSourceDateEpoch specifies the timestamp used to provide control for reproducibility. // See also https://reproducible-builds.org/docs/source-date-epoch/ . // diff --git a/diff/proxy/differ.go b/diff/proxy/differ.go index 84e62145d..5b930dec3 100644 --- a/diff/proxy/differ.go +++ b/diff/proxy/differ.go @@ -61,6 +61,7 @@ func (r *diffRemote) Apply(ctx context.Context, desc ocispec.Descriptor, mounts Diff: oci.DescriptorToProto(desc), Mounts: mount.ToProto(mounts), Payloads: payloads, + SyncFs: config.SyncFs, } resp, err := r.client.Apply(ctx, req) if err != nil { diff --git a/pkg/cri/config/config.go b/pkg/cri/config/config.go index fb526ecd9..3307026d3 100644 --- a/pkg/cri/config/config.go +++ b/pkg/cri/config/config.go @@ -357,6 +357,9 @@ type PluginConfig struct { // // For example, the value can be '5h', '2h30m', '10s'. DrainExecSyncIOTimeout string `toml:"drain_exec_sync_io_timeout" json:"drainExecSyncIOTimeout"` + // ImagePullWithSyncFs is an experimental setting. It's to force sync + // filesystem during unpacking to ensure that data integrity. + ImagePullWithSyncFs bool `toml:"image_pull_with_sync_fs" json:"imagePullWithSyncFs"` } // X509KeyPairStreaming contains the x509 configuration for streaming diff --git a/pkg/cri/config/config_unix.go b/pkg/cri/config/config_unix.go index d3fe60e8e..05e90e914 100644 --- a/pkg/cri/config/config_unix.go +++ b/pkg/cri/config/config_unix.go @@ -100,6 +100,7 @@ func DefaultConfig() PluginConfig { CDISpecDirs: []string{"/etc/cdi", "/var/run/cdi"}, ImagePullProgressTimeout: defaultImagePullProgressTimeoutDuration.String(), DrainExecSyncIOTimeout: "0s", + ImagePullWithSyncFs: false, EnableUnprivilegedPorts: true, EnableUnprivilegedICMP: true, } diff --git a/pkg/cri/server/images/image_pull.go b/pkg/cri/server/images/image_pull.go index 8de15f902..b77db49bb 100644 --- a/pkg/cri/server/images/image_pull.go +++ b/pkg/cri/server/images/image_pull.go @@ -39,6 +39,7 @@ import ( runtime "k8s.io/cri-api/pkg/apis/runtime/v1" containerd "github.com/containerd/containerd/v2/client" + "github.com/containerd/containerd/v2/diff" "github.com/containerd/containerd/v2/errdefs" containerdimages "github.com/containerd/containerd/v2/images" "github.com/containerd/containerd/v2/pkg/cri/annotations" @@ -165,6 +166,7 @@ func (c *CRIImageService) PullImage(ctx context.Context, r *runtime.PullImageReq containerd.WithImageHandler(imageHandler), containerd.WithUnpackOpts([]containerd.UnpackOpt{ containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor), + containerd.WithUnpackApplyOpts(diff.WithSyncFs(c.config.ImagePullWithSyncFs)), }), } diff --git a/services/diff/local.go b/services/diff/local.go index 8fa3a43d7..af58dedd5 100644 --- a/services/diff/local.go +++ b/services/diff/local.go @@ -106,6 +106,7 @@ func (l *local) Apply(ctx context.Context, er *diffapi.ApplyRequest, _ ...grpc.C } opts = append(opts, diff.WithPayloads(payloads)) } + opts = append(opts, diff.WithSyncFs(er.SyncFs)) for _, differ := range l.differs { ocidesc, err = differ.Apply(ctx, desc, mounts, opts...)