Add OCI/Image Volume Source support
Signed-off-by: Shiming Zhang <wzshiming@hotmail.com>
This commit is contained in:
@@ -161,18 +161,25 @@ func (c *criService) CreateContainer(ctx context.Context, r *runtime.CreateConta
|
||||
return nil, fmt.Errorf("failed to query sandbox platform: %w", err)
|
||||
}
|
||||
|
||||
ociRuntime, err := c.getPodSandboxRuntime(sandboxID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
|
||||
}
|
||||
|
||||
// mutate the extra CRI volume mounts from the runtime spec to properly specify the OCI image volume mount requests as bind mounts for this container
|
||||
err = c.mutateMounts(ctx, config.GetMounts(), c.RuntimeSnapshotter(ctx, ociRuntime), sandboxID, platform)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to mount image volume: %w", err)
|
||||
}
|
||||
|
||||
var volumeMounts []*runtime.Mount
|
||||
if !c.config.IgnoreImageDefinedVolumes {
|
||||
// Create container image volumes mounts.
|
||||
// create a list of image volume mounts from the image spec that are not also already in the runtime config volume list
|
||||
volumeMounts = c.volumeMounts(platform, containerRootDir, config, &image.ImageSpec.Config)
|
||||
} else if len(image.ImageSpec.Config.Volumes) != 0 {
|
||||
log.G(ctx).Debugf("Ignoring volumes defined in image %v because IgnoreImageDefinedVolumes is set", image.ID)
|
||||
}
|
||||
|
||||
ociRuntime, err := c.config.GetSandboxRuntime(sandboxConfig, sandbox.Metadata.RuntimeHandler)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get sandbox runtime: %w", err)
|
||||
}
|
||||
var runtimeHandler *runtime.RuntimeHandler
|
||||
for _, f := range c.runtimeHandlers {
|
||||
f := f
|
||||
|
||||
196
internal/cri/server/container_image_mount.go
Normal file
196
internal/cri/server/container_image_mount.go
Normal file
@@ -0,0 +1,196 @@
|
||||
/*
|
||||
Copyright The containerd Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/core/leases"
|
||||
"github.com/containerd/containerd/v2/core/mount"
|
||||
"github.com/containerd/errdefs"
|
||||
"github.com/containerd/log"
|
||||
"github.com/containerd/platforms"
|
||||
"github.com/opencontainers/image-spec/identity"
|
||||
imagespec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
)
|
||||
|
||||
func (c *criService) mutateMounts(
|
||||
ctx context.Context,
|
||||
extraMounts []*runtime.Mount,
|
||||
snapshotter string,
|
||||
sandboxID string,
|
||||
platform imagespec.Platform,
|
||||
) error {
|
||||
if err := c.ensureLeaseExist(ctx, sandboxID); err != nil {
|
||||
return fmt.Errorf("failed to ensure lease %v for sandbox: %w", sandboxID, err)
|
||||
}
|
||||
|
||||
ctx = leases.WithLease(ctx, sandboxID)
|
||||
for _, m := range extraMounts {
|
||||
err := c.mutateImageMount(ctx, m, snapshotter, sandboxID, platform)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *criService) ensureLeaseExist(ctx context.Context, sandboxID string) error {
|
||||
leaseSvc := c.client.LeasesService()
|
||||
_, err := leaseSvc.Create(ctx, leases.WithID(sandboxID))
|
||||
if err != nil {
|
||||
if errdefs.IsAlreadyExists(err) {
|
||||
err = nil
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *criService) mutateImageMount(
|
||||
ctx context.Context,
|
||||
extraMount *runtime.Mount,
|
||||
snapshotter string,
|
||||
sandboxID string,
|
||||
platform imagespec.Platform,
|
||||
) (retErr error) {
|
||||
imageSpec := extraMount.GetImage()
|
||||
if imageSpec == nil {
|
||||
return nil
|
||||
}
|
||||
if extraMount.GetHostPath() != "" {
|
||||
return fmt.Errorf("hostpath must be empty while mount image: %+v", extraMount)
|
||||
}
|
||||
if !extraMount.GetReadonly() {
|
||||
return fmt.Errorf("readonly must be true while mount image: %+v", extraMount)
|
||||
}
|
||||
|
||||
ref := imageSpec.GetImage()
|
||||
if ref == "" {
|
||||
return fmt.Errorf("image not specified in: %+v", imageSpec)
|
||||
}
|
||||
image, err := c.LocalResolve(ref)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to resolve image %q: %w", ref, err)
|
||||
}
|
||||
containerdImage, err := c.toContainerdImage(ctx, image)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get image from containerd %q: %w", image.ID, err)
|
||||
}
|
||||
|
||||
// This is a digest of the manifest
|
||||
imageID := containerdImage.Target().Digest.Encoded()
|
||||
|
||||
target := c.getImageVolumeHostPath(sandboxID, imageID)
|
||||
|
||||
// Already mounted in another container on the same pod
|
||||
if stat, err := os.Stat(target); err == nil && stat.IsDir() {
|
||||
extraMount.HostPath = target
|
||||
return nil
|
||||
}
|
||||
|
||||
img, err := c.client.ImageService().Get(ctx, ref)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get image volume ref %q: %w", ref, err)
|
||||
}
|
||||
|
||||
i := containerd.NewImageWithPlatform(c.client, img, platforms.Only(platform))
|
||||
if err := i.Unpack(ctx, snapshotter); err != nil {
|
||||
return fmt.Errorf("failed to unpack image volume: %w", err)
|
||||
}
|
||||
|
||||
diffIDs, err := i.RootFS(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get diff IDs for image volume %q: %w", ref, err)
|
||||
}
|
||||
chainID := identity.ChainID(diffIDs).String()
|
||||
|
||||
s := c.client.SnapshotService(snapshotter)
|
||||
mounts, err := s.Prepare(ctx, target, chainID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to prepare for image volume %q: %w", ref, err)
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
_ = s.Remove(ctx, target)
|
||||
}
|
||||
}()
|
||||
|
||||
err = os.MkdirAll(target, 0755)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create directory to image volume target path %q: %w", target, err)
|
||||
}
|
||||
|
||||
if err := mount.All(mounts, target); err != nil {
|
||||
return fmt.Errorf("failed to mount image volume component %q: %w", target, err)
|
||||
}
|
||||
|
||||
extraMount.HostPath = target
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *criService) cleanupImageMounts(
|
||||
ctx context.Context,
|
||||
sandboxID string,
|
||||
) (retErr error) {
|
||||
// Some checks to avoid affecting old pods.
|
||||
ociRuntime, err := c.getPodSandboxRuntime(sandboxID)
|
||||
if err != nil {
|
||||
log.G(ctx).WithError(err).Errorf("failed to get sandbox runtime handler %q", sandboxID)
|
||||
return nil
|
||||
}
|
||||
snapshotter := c.RuntimeSnapshotter(ctx, ociRuntime)
|
||||
s := c.client.SnapshotService(snapshotter)
|
||||
if s == nil {
|
||||
return nil
|
||||
}
|
||||
targetBase := c.getImageVolumeBaseDir(sandboxID)
|
||||
entries, err := os.ReadDir(targetBase)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("failed to read directory: %w", err)
|
||||
}
|
||||
|
||||
for _, entry := range entries {
|
||||
target := filepath.Join(targetBase, entry.Name())
|
||||
|
||||
err = mount.UnmountAll(target, 0)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmount image volume component %q: %w", target, err)
|
||||
}
|
||||
err = s.Remove(ctx, target)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to removing snapshot: %w", err)
|
||||
}
|
||||
err = os.Remove(target)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to removing mounts directory: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = os.Remove(targetBase)
|
||||
if err != nil && !errdefs.IsNotFound(err) {
|
||||
return fmt.Errorf("failed to remove directory to cleanup image volume mounts: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
|
||||
containerd "github.com/containerd/containerd/v2/client"
|
||||
"github.com/containerd/containerd/v2/core/containers"
|
||||
criconfig "github.com/containerd/containerd/v2/internal/cri/config"
|
||||
containerstore "github.com/containerd/containerd/v2/internal/cri/store/container"
|
||||
imagestore "github.com/containerd/containerd/v2/internal/cri/store/image"
|
||||
"github.com/containerd/errdefs"
|
||||
@@ -61,6 +62,8 @@ const (
|
||||
sandboxesDir = "sandboxes"
|
||||
// containersDir contains all container root.
|
||||
containersDir = "containers"
|
||||
// imageVolumeDir contains all image volume root.
|
||||
imageVolumeDir = "image-volumes"
|
||||
// Delimiter used to construct container/sandbox names.
|
||||
nameDelimiter = "_"
|
||||
|
||||
@@ -139,6 +142,16 @@ func (c *criService) getContainerRootDir(id string) string {
|
||||
return filepath.Join(c.config.RootDir, containersDir, id)
|
||||
}
|
||||
|
||||
// getImageVolumeHostPath returns the image volume directory for share.
|
||||
func (c *criService) getImageVolumeHostPath(podID, imageID string) string {
|
||||
return filepath.Join(c.config.StateDir, imageVolumeDir, podID, imageID)
|
||||
}
|
||||
|
||||
// getImageVolumeBaseDir returns the image volume base directory for cleanup.
|
||||
func (c *criService) getImageVolumeBaseDir(podID string) string {
|
||||
return filepath.Join(c.config.StateDir, imageVolumeDir, podID)
|
||||
}
|
||||
|
||||
// getVolatileContainerRootDir returns the root directory for managing volatile container files,
|
||||
// e.g. named pipes.
|
||||
func (c *criService) getVolatileContainerRootDir(id string) string {
|
||||
@@ -356,6 +369,18 @@ func (c *criService) generateAndSendContainerEvent(ctx context.Context, containe
|
||||
c.containerEventsQ.Send(event)
|
||||
}
|
||||
|
||||
func (c *criService) getPodSandboxRuntime(sandboxID string) (runtime criconfig.Runtime, err error) {
|
||||
sandbox, err := c.sandboxStore.Get(sandboxID)
|
||||
if err != nil {
|
||||
return criconfig.Runtime{}, err
|
||||
}
|
||||
runtime, err = c.config.GetSandboxRuntime(sandbox.Config, sandbox.Metadata.RuntimeHandler)
|
||||
if err != nil {
|
||||
return criconfig.Runtime{}, err
|
||||
}
|
||||
return runtime, nil
|
||||
}
|
||||
|
||||
func (c *criService) getPodSandboxStatus(ctx context.Context, podSandboxID string) (*runtime.PodSandboxStatus, error) {
|
||||
request := &runtime.PodSandboxStatusRequest{PodSandboxId: podSandboxID}
|
||||
response, err := c.PodSandboxStatus(ctx, request)
|
||||
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/containerd/containerd/v2/core/leases"
|
||||
"github.com/containerd/containerd/v2/pkg/tracing"
|
||||
"github.com/containerd/errdefs"
|
||||
"github.com/containerd/log"
|
||||
@@ -59,6 +60,12 @@ func (c *criService) RemovePodSandbox(ctx context.Context, r *runtime.RemovePodS
|
||||
return nil, fmt.Errorf("failed to forcibly stop sandbox %q: %w", id, err)
|
||||
}
|
||||
|
||||
if err := c.client.LeasesService().Delete(ctx, leases.Lease{ID: id}); err != nil {
|
||||
if !errdefs.IsNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to delete lease for sandbox %q: %w", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Return error if sandbox network namespace is not closed yet.
|
||||
if sandbox.NetNS != nil {
|
||||
nsPath := sandbox.NetNS.GetPath()
|
||||
|
||||
@@ -31,6 +31,7 @@ import (
|
||||
"github.com/containerd/typeurl/v2"
|
||||
runtime "k8s.io/cri-api/pkg/apis/runtime/v1"
|
||||
|
||||
"github.com/containerd/containerd/v2/core/leases"
|
||||
sb "github.com/containerd/containerd/v2/core/sandbox"
|
||||
"github.com/containerd/containerd/v2/internal/cri/annotations"
|
||||
"github.com/containerd/containerd/v2/internal/cri/bandwidth"
|
||||
@@ -87,6 +88,22 @@ func (c *criService) RunPodSandbox(ctx context.Context, r *runtime.RunPodSandbox
|
||||
}
|
||||
}()
|
||||
|
||||
leaseSvc := c.client.LeasesService()
|
||||
ls, lerr := leaseSvc.Create(ctx, leases.WithID(id))
|
||||
if lerr != nil {
|
||||
return nil, fmt.Errorf("failed to create lease for sandbox name %q: %w", name, lerr)
|
||||
}
|
||||
defer func() {
|
||||
if retErr != nil {
|
||||
deferCtx, deferCancel := util.DeferContext()
|
||||
defer deferCancel()
|
||||
|
||||
if derr := leaseSvc.Delete(deferCtx, ls); derr != nil {
|
||||
log.G(deferCtx).WithError(derr).Error("failed to delete lease during cleanup")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
var (
|
||||
err error
|
||||
sandboxInfo = sb.Sandbox{ID: id}
|
||||
|
||||
@@ -130,6 +130,11 @@ func (c *criService) stopPodSandbox(ctx context.Context, sandbox sandboxstore.Sa
|
||||
}
|
||||
|
||||
log.G(ctx).Infof("TearDown network for sandbox %q successfully", id)
|
||||
|
||||
err = c.cleanupImageMounts(ctx, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to cleanup image mounts for sandbox %q: %w", id, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user