Move snapshots/devmapper to plugins/snapshots/devmapper

Signed-off-by: Derek McGowan <derek@mcg.dev>
This commit is contained in:
Derek McGowan
2024-01-17 09:53:24 -08:00
parent 7dd96fe346
commit 8473322f0b
15 changed files with 9 additions and 9 deletions

View File

@@ -0,0 +1,41 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package blkdiscard
import "os/exec"
// Version returns the output of "blkdiscard --version"
func Version() (string, error) {
return blkdiscard("--version")
}
// BlkDiscard discards all blocks of a device.
// devicePath is expected to be a fully qualified path.
// BlkDiscard expects the caller to verify that the device is not in use.
func BlkDiscard(devicePath string) (string, error) {
return blkdiscard(devicePath)
}
func blkdiscard(args ...string) (string, error) {
output, err := exec.Command("blkdiscard", args...).CombinedOutput()
if err != nil {
return "", err
}
return string(output), nil
}

View File

@@ -0,0 +1,126 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"errors"
"fmt"
"os"
"github.com/docker/go-units"
"github.com/pelletier/go-toml/v2"
)
// Config represents device mapper configuration loaded from file.
// Size units can be specified in human-readable string format (like "32KIB", "32GB", "32Tb")
type Config struct {
// Device snapshotter root directory for metadata
RootPath string `toml:"root_path"`
// Name for 'thin-pool' device to be used by snapshotter (without /dev/mapper/ prefix)
PoolName string `toml:"pool_name"`
// Defines how much space to allocate when creating base image for container
BaseImageSize string `toml:"base_image_size"`
BaseImageSizeBytes uint64 `toml:"-"`
// Flag to async remove device using Cleanup() callback in snapshots GC
AsyncRemove bool `toml:"async_remove"`
// Whether to discard blocks when removing a thin device.
DiscardBlocks bool `toml:"discard_blocks"`
// Defines file system to use for snapshout device mount. Defaults to "ext4"
FileSystemType fsType `toml:"fs_type"`
// Defines optional file system options passed through config file
FsOptions string `toml:"fs_options"`
}
// LoadConfig reads devmapper configuration file from disk in TOML format
func LoadConfig(path string) (*Config, error) {
f, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, os.ErrNotExist
}
return nil, err
}
defer f.Close()
config := Config{}
if err := toml.NewDecoder(f).Decode(&config); err != nil {
return nil, fmt.Errorf("failed to unmarshal devmapper TOML: %w", err)
}
if err := config.parse(); err != nil {
return nil, err
}
if err := config.Validate(); err != nil {
return nil, err
}
return &config, nil
}
func (c *Config) parse() error {
baseImageSize, err := units.RAMInBytes(c.BaseImageSize)
if err != nil {
return fmt.Errorf("failed to parse base image size: '%s': %w", c.BaseImageSize, err)
}
if c.FileSystemType == "" {
c.FileSystemType = fsTypeExt4
}
c.BaseImageSizeBytes = uint64(baseImageSize)
return nil
}
// Validate makes sure configuration fields are valid
func (c *Config) Validate() error {
var result []error
if c.PoolName == "" {
result = append(result, fmt.Errorf("pool_name is required"))
}
if c.RootPath == "" {
result = append(result, fmt.Errorf("root_path is required"))
}
if c.BaseImageSize == "" {
result = append(result, fmt.Errorf("base_image_size is required"))
}
if c.FileSystemType != "" {
switch c.FileSystemType {
case fsTypeExt4, fsTypeXFS, fsTypeExt2:
default:
result = append(result, fmt.Errorf("unsupported Filesystem Type: %q", c.FileSystemType))
}
} else {
result = append(result, fmt.Errorf("filesystem type cannot be empty"))
}
return errors.Join(result...)
}

View File

@@ -0,0 +1,101 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"os"
"testing"
"github.com/pelletier/go-toml/v2"
"github.com/stretchr/testify/assert"
)
func TestLoadConfig(t *testing.T) {
expected := Config{
RootPath: "/tmp",
PoolName: "test",
BaseImageSize: "128Mb",
}
file, err := os.CreateTemp("", "devmapper-config-")
assert.NoError(t, err)
encoder := toml.NewEncoder(file)
err = encoder.Encode(&expected)
assert.NoError(t, err)
defer func() {
err := file.Close()
assert.NoError(t, err)
err = os.Remove(file.Name())
assert.NoError(t, err)
}()
loaded, err := LoadConfig(file.Name())
assert.NoError(t, err)
assert.Equal(t, loaded.RootPath, expected.RootPath)
assert.Equal(t, loaded.PoolName, expected.PoolName)
assert.Equal(t, loaded.BaseImageSize, expected.BaseImageSize)
assert.True(t, loaded.BaseImageSizeBytes == 128*1024*1024)
}
func TestLoadConfigInvalidPath(t *testing.T) {
_, err := LoadConfig("")
assert.Equal(t, os.ErrNotExist, err)
_, err = LoadConfig("/dev/null")
assert.NotNil(t, err)
}
func TestParseInvalidData(t *testing.T) {
config := Config{
BaseImageSize: "y",
}
err := config.parse()
assert.Error(t, err, "failed to parse base image size: 'y': invalid size: 'y'")
}
func TestFieldValidation(t *testing.T) {
config := &Config{}
err := config.Validate()
assert.NotNil(t, err)
multErr := err.(interface{ Unwrap() []error }).Unwrap()
assert.Len(t, multErr, 4)
assert.NotNil(t, multErr[0], "pool_name is empty")
assert.NotNil(t, multErr[1], "root_path is empty")
assert.NotNil(t, multErr[2], "base_image_size is empty")
assert.NotNil(t, multErr[3], "filesystem type cannot be empty")
}
func TestExistingPoolFieldValidation(t *testing.T) {
config := &Config{
PoolName: "test",
RootPath: "test",
BaseImageSize: "10mb",
FileSystemType: "ext4",
}
err := config.Validate()
assert.NoError(t, err)
}

View File

@@ -0,0 +1,110 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"fmt"
)
const (
maxDeviceID = 0xffffff // Device IDs are 24-bit numbers
)
// DeviceState represents current devmapper device state reflected in meta store
type DeviceState int
const (
// Unknown means that device just allocated and no operations were performed
Unknown DeviceState = iota
// Creating means that device is going to be created
Creating
// Created means that devices successfully created
Created
// Activating means that device is going to be activated
Activating
// Activated means that device successfully activated
Activated
// Suspending means that device is going to be suspended
Suspending
// Suspended means that device successfully suspended
Suspended
// Resuming means that device is going to be resumed from suspended state
Resuming
// Resumed means that device successfully resumed
Resumed
// Deactivating means that device is going to be deactivated
Deactivating
// Deactivated means that device successfully deactivated
Deactivated
// Removing means that device is going to be removed
Removing
// Removed means that device successfully removed but not yet deleted from meta store
Removed
// Faulty means that the device is errored and the snapshotter failed to rollback it
Faulty
)
func (s DeviceState) String() string {
switch s {
case Creating:
return "Creating"
case Created:
return "Created"
case Activating:
return "Activating"
case Activated:
return "Activated"
case Suspending:
return "Suspending"
case Suspended:
return "Suspended"
case Resuming:
return "Resuming"
case Resumed:
return "Resumed"
case Deactivating:
return "Deactivating"
case Deactivated:
return "Deactivated"
case Removing:
return "Removing"
case Removed:
return "Removed"
case Faulty:
return "Faulty"
default:
return fmt.Sprintf("unknown %d", s)
}
}
// DeviceInfo represents metadata for thin device within thin-pool
type DeviceInfo struct {
// DeviceID is a 24-bit number assigned to a device within thin-pool device
DeviceID uint32 `json:"device_id"`
// Size is a thin device size
Size uint64 `json:"size"`
// Name is a device name to be used in /dev/mapper/
Name string `json:"name"`
// ParentName is a name of parent device (if snapshot)
ParentName string `json:"parent_name"`
// State represents current device state
State DeviceState `json:"state"`
// Error details if device state change failed
Error string `json:"error"`
}

View File

@@ -0,0 +1,446 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Copyright 2012-2017 Docker, Inc.
package dmsetup
import (
"errors"
"fmt"
"io"
"os"
"os/exec"
"strconv"
"strings"
blkdiscard "github.com/containerd/containerd/v2/plugins/snapshots/devmapper/blkdiscard"
"golang.org/x/sys/unix"
)
const (
// DevMapperDir represents devmapper devices location
DevMapperDir = "/dev/mapper/"
// SectorSize represents the number of bytes in one sector on devmapper devices
SectorSize = 512
)
// ErrInUse represents an error mutating a device because it is in use elsewhere
var ErrInUse = errors.New("device is in use")
// DeviceInfo represents device info returned by "dmsetup info".
// dmsetup(8) provides more information on each of these fields.
type DeviceInfo struct {
Name string
BlockDeviceName string
TableLive bool
TableInactive bool
Suspended bool
ReadOnly bool
Major uint32
Minor uint32
OpenCount uint32 // Open reference count
TargetCount uint32 // Number of targets in the live table
EventNumber uint32 // Last event sequence number (used by wait)
}
var errTable map[string]unix.Errno
func init() {
// Precompute map of <text>=<errno> for optimal lookup
errTable = make(map[string]unix.Errno)
for errno := unix.EPERM; errno <= unix.EHWPOISON; errno++ {
errTable[errno.Error()] = errno
}
}
// CreatePool creates a device with the given name, data and metadata file and block size (see "dmsetup create")
func CreatePool(poolName, dataFile, metaFile string, blockSizeSectors uint32) error {
thinPool, err := makeThinPoolMapping(dataFile, metaFile, blockSizeSectors)
if err != nil {
return err
}
_, err = dmsetup("create", poolName, "--table", thinPool)
return err
}
// ReloadPool reloads existing thin-pool (see "dmsetup reload")
func ReloadPool(deviceName, dataFile, metaFile string, blockSizeSectors uint32) error {
thinPool, err := makeThinPoolMapping(dataFile, metaFile, blockSizeSectors)
if err != nil {
return err
}
_, err = dmsetup("reload", deviceName, "--table", thinPool)
return err
}
const (
lowWaterMark = 32768 // Picked arbitrary, might need tuning
skipZeroing = "skip_block_zeroing" // Skipping zeroing to reduce latency for device creation
)
// makeThinPoolMapping makes thin-pool table entry
func makeThinPoolMapping(dataFile, metaFile string, blockSizeSectors uint32) (string, error) {
dataDeviceSizeBytes, err := BlockDeviceSize(dataFile)
if err != nil {
return "", fmt.Errorf("failed to get block device size: %s: %w", dataFile, err)
}
// Thin-pool mapping target has the following format:
// start - starting block in virtual device
// length - length of this segment
// metadata_dev - the metadata device
// data_dev - the data device
// data_block_size - the data block size in sectors
// low_water_mark - the low water mark, expressed in blocks of size data_block_size
// feature_args - the number of feature arguments
// args
lengthSectors := dataDeviceSizeBytes / SectorSize
target := fmt.Sprintf("0 %d thin-pool %s %s %d %d 1 %s",
lengthSectors,
metaFile,
dataFile,
blockSizeSectors,
lowWaterMark,
skipZeroing)
return target, nil
}
// CreateDevice sends "create_thin <deviceID>" message to the given thin-pool
func CreateDevice(poolName string, deviceID uint32) error {
_, err := dmsetup("message", poolName, "0", fmt.Sprintf("create_thin %d", deviceID))
return err
}
// ActivateDevice activates the given thin-device using the 'thin' target
func ActivateDevice(poolName string, deviceName string, deviceID uint32, size uint64, external string) error {
mapping := makeThinMapping(poolName, deviceID, size, external)
_, err := dmsetup("create", deviceName, "--table", mapping)
return err
}
// makeThinMapping makes thin target table entry
func makeThinMapping(poolName string, deviceID uint32, sizeBytes uint64, externalOriginDevice string) string {
lengthSectors := sizeBytes / SectorSize
// Thin target has the following format:
// start - starting block in virtual device
// length - length of this segment
// pool_dev - the thin-pool device, can be /dev/mapper/pool_name or 253:0
// dev_id - the internal device id of the device to be activated
// external_origin_dev - an optional block device outside the pool to be treated as a read-only snapshot origin.
target := fmt.Sprintf("0 %d thin %s %d %s", lengthSectors, GetFullDevicePath(poolName), deviceID, externalOriginDevice)
return strings.TrimSpace(target)
}
// SuspendDevice suspends the given device (see "dmsetup suspend")
func SuspendDevice(deviceName string) error {
_, err := dmsetup("suspend", deviceName)
return err
}
// ResumeDevice resumes the given device (see "dmsetup resume")
func ResumeDevice(deviceName string) error {
_, err := dmsetup("resume", deviceName)
return err
}
// Table returns the current table for the device
func Table(deviceName string) (string, error) {
return dmsetup("table", deviceName)
}
// CreateSnapshot sends "create_snap" message to the given thin-pool.
// Caller needs to suspend and resume device if it is active.
func CreateSnapshot(poolName string, deviceID uint32, baseDeviceID uint32) error {
_, err := dmsetup("message", poolName, "0", fmt.Sprintf("create_snap %d %d", deviceID, baseDeviceID))
return err
}
// DeleteDevice sends "delete <deviceID>" message to the given thin-pool
func DeleteDevice(poolName string, deviceID uint32) error {
_, err := dmsetup("message", poolName, "0", fmt.Sprintf("delete %d", deviceID))
return err
}
// RemoveDeviceOpt represents command line arguments for "dmsetup remove" command
type RemoveDeviceOpt string
const (
// RemoveWithForce flag replaces the table with one that fails all I/O if
// open device can't be removed
RemoveWithForce RemoveDeviceOpt = "--force"
// RemoveWithRetries option will cause the operation to be retried
// for a few seconds before failing
RemoveWithRetries RemoveDeviceOpt = "--retry"
// RemoveDeferred flag will enable deferred removal of open devices,
// the device will be removed when the last user closes it
RemoveDeferred RemoveDeviceOpt = "--deferred"
)
// RemoveDevice removes a device (see "dmsetup remove")
func RemoveDevice(deviceName string, opts ...RemoveDeviceOpt) error {
args := []string{
"remove",
}
for _, opt := range opts {
args = append(args, string(opt))
}
args = append(args, GetFullDevicePath(deviceName))
_, err := dmsetup(args...)
if err == unix.ENXIO {
// Ignore "No such device or address" error because we dmsetup
// remove with "deferred" option, there is chance for the device
// having been removed.
return nil
}
return err
}
// Info outputs device information (see "dmsetup info").
// If device name is empty, all device infos will be returned.
func Info(deviceName string) ([]*DeviceInfo, error) {
output, err := dmsetup(
"info",
"--columns",
"--noheadings",
"-o",
"name,blkdevname,attr,major,minor,open,segments,events",
"--separator",
" ",
deviceName)
if err != nil {
return nil, err
}
var (
lines = strings.Split(output, "\n")
devices = make([]*DeviceInfo, len(lines))
)
for i, line := range lines {
var (
attr = ""
info = &DeviceInfo{}
)
_, err := fmt.Sscan(line,
&info.Name,
&info.BlockDeviceName,
&attr,
&info.Major,
&info.Minor,
&info.OpenCount,
&info.TargetCount,
&info.EventNumber)
if err != nil {
return nil, fmt.Errorf("failed to parse line %q: %w", line, err)
}
// Parse attributes (see "man 8 dmsetup" for details)
info.Suspended = strings.Contains(attr, "s")
info.ReadOnly = strings.Contains(attr, "r")
info.TableLive = strings.Contains(attr, "L")
info.TableInactive = strings.Contains(attr, "I")
devices[i] = info
}
return devices, nil
}
// Version returns "dmsetup version" output
func Version() (string, error) {
return dmsetup("version")
}
// DeviceStatus represents devmapper device status information
type DeviceStatus struct {
RawOutput string
Offset int64
Length int64
Target string
Params []string
}
// Status provides status information for devmapper device
func Status(deviceName string) (*DeviceStatus, error) {
var (
err error
status DeviceStatus
)
output, err := dmsetup("status", deviceName)
if err != nil {
return nil, err
}
status.RawOutput = output
// Status output format:
// Offset (int64)
// Length (int64)
// Target type (string)
// Params (Array of strings)
const MinParseCount = 4
parts := strings.Split(output, " ")
if len(parts) < MinParseCount {
return nil, fmt.Errorf("failed to parse output: %q", output)
}
status.Offset, err = strconv.ParseInt(parts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse offset: %q: %w", parts[0], err)
}
status.Length, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse length: %q: %w", parts[1], err)
}
status.Target = parts[2]
status.Params = parts[3:]
return &status, nil
}
// GetFullDevicePath returns full path for the given device name (like "/dev/mapper/name")
func GetFullDevicePath(deviceName string) string {
if strings.HasPrefix(deviceName, DevMapperDir) {
return deviceName
}
return DevMapperDir + deviceName
}
// BlockDeviceSize returns size of block device in bytes
func BlockDeviceSize(path string) (int64, error) {
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer f.Close()
size, err := f.Seek(0, io.SeekEnd)
if err != nil {
return 0, fmt.Errorf("failed to seek on %q: %w", path, err)
}
return size, nil
}
// DiscardBlocks discards all blocks for the given thin device
//
// ported from https://github.com/moby/moby/blob/7b9275c0da707b030e62c96b679a976f31f929d3/pkg/devicemapper/devmapper.go#L416
func DiscardBlocks(deviceName string) error {
inUse, err := isInUse(deviceName)
if err != nil {
return err
}
if inUse {
return ErrInUse
}
path := GetFullDevicePath(deviceName)
_, err = blkdiscard.BlkDiscard(path)
if err != nil {
return err
}
return nil
}
func dmsetup(args ...string) (string, error) {
data, err := exec.Command("dmsetup", args...).CombinedOutput()
output := string(data)
if err != nil {
// Try find Linux error code otherwise return generic error with dmsetup output
if errno, ok := tryGetUnixError(output); ok {
return "", errno
}
return "", fmt.Errorf("dmsetup %s\nerror: %s\n: %w", strings.Join(args, " "), output, err)
}
output = strings.TrimSuffix(output, "\n")
output = strings.TrimSpace(output)
return output, nil
}
// tryGetUnixError tries to find Linux error code from dmsetup output
func tryGetUnixError(output string) (unix.Errno, bool) {
// It's useful to have Linux error codes like EBUSY, EPERM, ..., instead of just text.
// Unfortunately there is no better way than extracting/comparing error text.
text := parseDmsetupError(output)
if text == "" {
return 0, false
}
err, ok := errTable[text]
return err, ok
}
// dmsetup returns error messages in format:
//
// device-mapper: message ioctl on <name> failed: File exists\n
// Command failed\n
//
// parseDmsetupError extracts text between "failed: " and "\n"
func parseDmsetupError(output string) string {
lines := strings.SplitN(output, "\n", 2)
if len(lines) < 2 {
return ""
}
line := lines[0]
// Handle output like "Device /dev/mapper/snapshotter-suite-pool-snap-1 not found"
if strings.HasSuffix(line, "not found") {
return unix.ENXIO.Error()
}
const failedSubstr = "failed: "
idx := strings.LastIndex(line, failedSubstr)
if idx == -1 {
return ""
}
str := line[idx:]
// Strip "failed: " prefix
str = strings.TrimPrefix(str, failedSubstr)
str = strings.ToLower(str)
return str
}
func isInUse(deviceName string) (bool, error) {
info, err := Info(deviceName)
if err != nil {
return true, err
}
if len(info) != 1 {
return true, errors.New("could not get device info")
}
return info[0].OpenCount != 0, nil
}

View File

@@ -0,0 +1,202 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dmsetup
import (
"os"
"strings"
"testing"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/pkg/testutil"
"github.com/docker/go-units"
"github.com/stretchr/testify/assert"
"golang.org/x/sys/unix"
)
const (
testPoolName = "test-pool"
testDeviceName = "test-device"
deviceID = 1
snapshotID = 2
)
func TestDMSetup(t *testing.T) {
testutil.RequiresRoot(t)
tempDir := t.TempDir()
dataImage, loopDataDevice := createLoopbackDevice(t, tempDir)
metaImage, loopMetaDevice := createLoopbackDevice(t, tempDir)
defer func() {
err := mount.DetachLoopDevice(loopDataDevice, loopMetaDevice)
assert.Nil(t, err, "failed to detach loop devices for data image: %s and meta image: %s", dataImage, metaImage)
}()
t.Run("CreatePool", func(t *testing.T) {
err := CreatePool(testPoolName, loopDataDevice, loopMetaDevice, 128)
assert.Nil(t, err, "failed to create thin-pool with %s %s", loopDataDevice, loopMetaDevice)
table, err := Table(testPoolName)
t.Logf("table: %s", table)
assert.NoError(t, err)
assert.True(t, strings.HasPrefix(table, "0 32768 thin-pool"))
assert.True(t, strings.HasSuffix(table, "128 32768 1 skip_block_zeroing"))
})
t.Run("ReloadPool", func(t *testing.T) {
err := ReloadPool(testPoolName, loopDataDevice, loopMetaDevice, 256)
assert.Nil(t, err, "failed to reload thin-pool")
})
t.Run("CreateDevice", testCreateDevice)
t.Run("CreateSnapshot", testCreateSnapshot)
t.Run("DeleteSnapshot", testDeleteSnapshot)
t.Run("ActivateDevice", testActivateDevice)
t.Run("DeviceStatus", testDeviceStatus)
t.Run("SuspendResumeDevice", testSuspendResumeDevice)
t.Run("DiscardBlocks", testDiscardBlocks)
t.Run("RemoveDevice", testRemoveDevice)
t.Run("RemovePool", func(t *testing.T) {
err := RemoveDevice(testPoolName, RemoveWithForce, RemoveWithRetries)
assert.Nil(t, err, "failed to remove thin-pool")
})
t.Run("Version", testVersion)
}
func testCreateDevice(t *testing.T) {
err := CreateDevice(testPoolName, deviceID)
assert.Nil(t, err, "failed to create test device")
err = CreateDevice(testPoolName, deviceID)
assert.True(t, err == unix.EEXIST)
infos, err := Info(testPoolName)
assert.NoError(t, err)
assert.Len(t, infos, 1, "got unexpected number of device infos")
}
func testCreateSnapshot(t *testing.T) {
err := CreateSnapshot(testPoolName, snapshotID, deviceID)
assert.NoError(t, err)
}
func testDeleteSnapshot(t *testing.T) {
err := DeleteDevice(testPoolName, snapshotID)
assert.Nil(t, err, "failed to send delete message")
err = DeleteDevice(testPoolName, snapshotID)
assert.Equal(t, err, unix.ENODATA)
}
func testActivateDevice(t *testing.T) {
err := ActivateDevice(testPoolName, testDeviceName, 1, 1024, "")
assert.Nil(t, err, "failed to activate device")
err = ActivateDevice(testPoolName, testDeviceName, 1, 1024, "")
assert.Equal(t, err, unix.EBUSY)
if _, err := os.Stat("/dev/mapper/" + testDeviceName); err != nil && !os.IsExist(err) {
assert.Nil(t, err, "failed to stat device")
}
list, err := Info(testPoolName)
assert.NoError(t, err)
assert.Len(t, list, 1)
info := list[0]
assert.Equal(t, testPoolName, info.Name)
assert.True(t, info.TableLive)
}
func testDeviceStatus(t *testing.T) {
status, err := Status(testDeviceName)
assert.NoError(t, err)
assert.Equal(t, int64(0), status.Offset)
assert.Equal(t, int64(2), status.Length)
assert.Equal(t, "thin", status.Target)
assert.Equal(t, status.Params, []string{"0", "-"})
}
func testSuspendResumeDevice(t *testing.T) {
err := SuspendDevice(testDeviceName)
assert.NoError(t, err)
err = SuspendDevice(testDeviceName)
assert.NoError(t, err)
list, err := Info(testDeviceName)
assert.NoError(t, err)
assert.Len(t, list, 1)
info := list[0]
assert.True(t, info.Suspended)
err = ResumeDevice(testDeviceName)
assert.NoError(t, err)
err = ResumeDevice(testDeviceName)
assert.NoError(t, err)
}
func testDiscardBlocks(t *testing.T) {
err := DiscardBlocks(testDeviceName)
assert.Nil(t, err, "failed to discard blocks")
}
func testRemoveDevice(t *testing.T) {
err := RemoveDevice(testPoolName)
assert.Equal(t, err, unix.EBUSY, "removing thin-pool with dependencies shouldn't be allowed")
err = RemoveDevice(testDeviceName, RemoveWithRetries)
assert.Nil(t, err, "failed to remove thin-device")
}
func testVersion(t *testing.T) {
version, err := Version()
assert.NoError(t, err)
assert.NotEmpty(t, version)
}
func createLoopbackDevice(t *testing.T, dir string) (string, string) {
file, err := os.CreateTemp(dir, "dmsetup-tests-")
assert.NoError(t, err)
size, err := units.RAMInBytes("16Mb")
assert.NoError(t, err)
err = file.Truncate(size)
assert.NoError(t, err)
err = file.Close()
assert.NoError(t, err)
imagePath := file.Name()
loopDevice, err := mount.AttachLoopDevice(imagePath)
assert.NoError(t, err)
return imagePath, loopDevice
}

View File

@@ -0,0 +1,371 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"github.com/containerd/containerd/v2/errdefs"
bolt "go.etcd.io/bbolt"
)
type (
// DeviceInfoCallback is a callback used for device updates
DeviceInfoCallback func(deviceInfo *DeviceInfo) error
)
type deviceIDState byte
const (
deviceFree deviceIDState = iota
deviceTaken
deviceFaulty
)
// Bucket names
var (
devicesBucketName = []byte("devices") // Contains thin devices metadata <device_name>=<DeviceInfo>
deviceIDBucketName = []byte("device_ids") // Tracks used device ids <device_id_[0..maxDeviceID)>=<byte_[0/1]>
)
var (
// ErrNotFound represents an error returned when object not found in meta store
ErrNotFound = errdefs.ErrNotFound
// ErrAlreadyExists represents an error returned when object can't be duplicated in meta store
ErrAlreadyExists = errdefs.ErrAlreadyExists
)
// PoolMetadata keeps device info for the given thin-pool device, generates next available device ids,
// and tracks devmapper transaction numbers
type PoolMetadata struct {
db *bolt.DB
}
// NewPoolMetadata creates new or opens existing pool metadata database
func NewPoolMetadata(dbfile string) (*PoolMetadata, error) {
db, err := bolt.Open(dbfile, 0600, nil)
if err != nil {
return nil, err
}
metadata := &PoolMetadata{db: db}
if err := metadata.ensureDatabaseInitialized(); err != nil {
return nil, fmt.Errorf("failed to initialize database: %w", err)
}
return metadata, nil
}
// ensureDatabaseInitialized creates buckets required for metadata store in order
// to avoid bucket existence checks across the code
func (m *PoolMetadata) ensureDatabaseInitialized() error {
return m.db.Update(func(tx *bolt.Tx) error {
if _, err := tx.CreateBucketIfNotExists(devicesBucketName); err != nil {
return err
}
if _, err := tx.CreateBucketIfNotExists(deviceIDBucketName); err != nil {
return err
}
return nil
})
}
// AddDevice saves device info to database.
func (m *PoolMetadata) AddDevice(ctx context.Context, info *DeviceInfo) error {
err := m.db.Update(func(tx *bolt.Tx) error {
devicesBucket := tx.Bucket(devicesBucketName)
// Make sure device name is unique. If there is already a device with the same name,
// but in Faulty state, give it a try with another devmapper device ID.
// See https://github.com/containerd/containerd/pull/3436 for more context.
var existing DeviceInfo
if err := getObject(devicesBucket, info.Name, &existing); err == nil && existing.State != Faulty {
return fmt.Errorf("device %q is already there %+v: %w", info.Name, existing, ErrAlreadyExists)
}
// Find next available device ID
deviceID, err := getNextDeviceID(tx)
if err != nil {
return err
}
info.DeviceID = deviceID
return putObject(devicesBucket, info.Name, info, true)
})
if err != nil {
return fmt.Errorf("failed to save metadata for device %q (parent: %q): %w", info.Name, info.ParentName, err)
}
return nil
}
// ChangeDeviceState changes the device state given the device name in devices bucket.
func (m *PoolMetadata) ChangeDeviceState(ctx context.Context, name string, state DeviceState) error {
return m.UpdateDevice(ctx, name, func(deviceInfo *DeviceInfo) error {
deviceInfo.State = state
return nil
})
}
// MarkFaulty marks the given device and corresponding devmapper device ID as faulty.
// The snapshotter might attempt to recreate a device in 'Faulty' state with another devmapper ID in
// subsequent calls, and in case of success its status will be changed to 'Created' or 'Activated'.
// The devmapper dev ID will remain in 'deviceFaulty' state until manually handled by a user.
func (m *PoolMetadata) MarkFaulty(ctx context.Context, name string) error {
return m.db.Update(func(tx *bolt.Tx) error {
var (
device = DeviceInfo{}
devBucket = tx.Bucket(devicesBucketName)
)
if err := getObject(devBucket, name, &device); err != nil {
return err
}
device.State = Faulty
if err := putObject(devBucket, name, &device, true); err != nil {
return err
}
return markDeviceID(tx, device.DeviceID, deviceFaulty)
})
}
// getNextDeviceID finds the next free device ID by taking a cursor
// through the deviceIDBucketName bucket and finding the next sequentially
// unassigned ID. Device ID state is marked by a byte deviceFree or
// deviceTaken. Low device IDs will be reused sooner.
func getNextDeviceID(tx *bolt.Tx) (uint32, error) {
bucket := tx.Bucket(deviceIDBucketName)
cursor := bucket.Cursor()
// Check if any device id can be reused.
// Bolt stores its keys in byte-sorted order within a bucket.
// This makes sequential iteration extremely fast.
for key, taken := cursor.First(); key != nil; key, taken = cursor.Next() {
isFree := taken[0] == byte(deviceFree)
if !isFree {
continue
}
parsedID, err := strconv.ParseUint(string(key), 10, 32)
if err != nil {
return 0, err
}
id := uint32(parsedID)
if err := markDeviceID(tx, id, deviceTaken); err != nil {
return 0, err
}
return id, nil
}
// Try allocate new device ID
seq, err := bucket.NextSequence()
if err != nil {
return 0, err
}
if seq >= maxDeviceID {
return 0, errors.New("dm-meta: couldn't find free device key")
}
id := uint32(seq)
if err := markDeviceID(tx, id, deviceTaken); err != nil {
return 0, err
}
return id, nil
}
// markDeviceID marks a device as deviceFree or deviceTaken
func markDeviceID(tx *bolt.Tx, deviceID uint32, state deviceIDState) error {
var (
bucket = tx.Bucket(deviceIDBucketName)
key = strconv.FormatUint(uint64(deviceID), 10)
value = []byte{byte(state)}
)
if err := bucket.Put([]byte(key), value); err != nil {
return fmt.Errorf("failed to free device id %q: %w", key, err)
}
return nil
}
// UpdateDevice updates device info in metadata store.
// The callback should be used to indicate whether device info update was successful or not.
// An error returned from the callback will rollback the update transaction in the database.
// Name and Device ID are not allowed to change.
func (m *PoolMetadata) UpdateDevice(ctx context.Context, name string, fn DeviceInfoCallback) error {
return m.db.Update(func(tx *bolt.Tx) error {
var (
device = &DeviceInfo{}
bucket = tx.Bucket(devicesBucketName)
)
if err := getObject(bucket, name, device); err != nil {
return err
}
// Don't allow changing these values, keep things in sync with devmapper
name := device.Name
devID := device.DeviceID
if err := fn(device); err != nil {
return err
}
if name != device.Name {
return fmt.Errorf("failed to update device info, name didn't match: %q %q", name, device.Name)
}
if devID != device.DeviceID {
return fmt.Errorf("failed to update device info, device id didn't match: %d %d", devID, device.DeviceID)
}
return putObject(bucket, name, device, true)
})
}
// GetDevice retrieves device info by name from database
func (m *PoolMetadata) GetDevice(ctx context.Context, name string) (*DeviceInfo, error) {
var (
dev DeviceInfo
err error
)
err = m.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(devicesBucketName)
return getObject(bucket, name, &dev)
})
return &dev, err
}
// RemoveDevice removes device info from store.
func (m *PoolMetadata) RemoveDevice(ctx context.Context, name string) error {
return m.db.Update(func(tx *bolt.Tx) error {
var (
device = &DeviceInfo{}
bucket = tx.Bucket(devicesBucketName)
)
if err := getObject(bucket, name, device); err != nil {
return err
}
if err := bucket.Delete([]byte(name)); err != nil {
return fmt.Errorf("failed to delete device info for %q: %w", name, err)
}
return markDeviceID(tx, device.DeviceID, deviceFree)
})
}
// WalkDevices walks all devmapper devices in metadata store and invokes the callback with device info.
// The provided callback function must not modify the bucket.
func (m *PoolMetadata) WalkDevices(ctx context.Context, cb func(info *DeviceInfo) error) error {
return m.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(devicesBucketName)
return bucket.ForEach(func(key, value []byte) error {
device := &DeviceInfo{}
if err := json.Unmarshal(value, device); err != nil {
return fmt.Errorf("failed to unmarshal %s: %w", key, err)
}
return cb(device)
})
})
}
// GetDeviceNames retrieves the list of device names currently stored in database
func (m *PoolMetadata) GetDeviceNames(ctx context.Context) ([]string, error) {
var (
names []string
err error
)
err = m.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(devicesBucketName)
return bucket.ForEach(func(k, _ []byte) error {
names = append(names, string(k))
return nil
})
})
if err != nil {
return nil, err
}
return names, nil
}
// Close closes metadata store
func (m *PoolMetadata) Close() error {
if err := m.db.Close(); err != nil && err != bolt.ErrDatabaseNotOpen {
return err
}
return nil
}
func putObject(bucket *bolt.Bucket, key string, obj interface{}, overwrite bool) error {
keyBytes := []byte(key)
if !overwrite && bucket.Get(keyBytes) != nil {
return fmt.Errorf("object with key %q already exists", key)
}
data, err := json.Marshal(obj)
if err != nil {
return fmt.Errorf("failed to marshal object with key %q: %w", key, err)
}
if err := bucket.Put(keyBytes, data); err != nil {
return fmt.Errorf("failed to insert object with key %q: %w", key, err)
}
return nil
}
func getObject(bucket *bolt.Bucket, key string, obj interface{}) error {
data := bucket.Get([]byte(key))
if data == nil {
return ErrNotFound
}
if obj != nil {
if err := json.Unmarshal(data, obj); err != nil {
return fmt.Errorf("failed to unmarshal object with key %q: %w", key, err)
}
}
return nil
}

View File

@@ -0,0 +1,242 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
"errors"
"path/filepath"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"go.etcd.io/bbolt"
)
var (
testCtx = context.Background()
)
func TestPoolMetadata_AddDevice(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
expected := &DeviceInfo{
Name: "test2",
ParentName: "test1",
Size: 1,
State: Activated,
}
err := store.AddDevice(testCtx, expected)
assert.NoError(t, err)
result, err := store.GetDevice(testCtx, "test2")
assert.NoError(t, err)
assert.Equal(t, expected.Name, result.Name)
assert.Equal(t, expected.ParentName, result.ParentName)
assert.Equal(t, expected.Size, result.Size)
assert.Equal(t, expected.State, result.State)
assert.NotZero(t, result.DeviceID, 0)
assert.Equal(t, expected.DeviceID, result.DeviceID)
}
func TestPoolMetadata_AddDeviceRollback(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: ""})
assert.True(t, err != nil)
_, err = store.GetDevice(testCtx, "")
assert.Equal(t, ErrNotFound, err)
}
func TestPoolMetadata_AddDeviceDuplicate(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: "test"})
assert.NoError(t, err)
err = store.AddDevice(testCtx, &DeviceInfo{Name: "test"})
assert.True(t, errors.Is(err, ErrAlreadyExists))
}
func TestPoolMetadata_ReuseDeviceID(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
info1 := &DeviceInfo{Name: "test1"}
err := store.AddDevice(testCtx, info1)
assert.NoError(t, err)
info2 := &DeviceInfo{Name: "test2"}
err = store.AddDevice(testCtx, info2)
assert.NoError(t, err)
assert.NotEqual(t, info1.DeviceID, info2.DeviceID)
assert.NotZero(t, info1.DeviceID)
err = store.RemoveDevice(testCtx, info2.Name)
assert.NoError(t, err)
info3 := &DeviceInfo{Name: "test3"}
err = store.AddDevice(testCtx, info3)
assert.NoError(t, err)
assert.Equal(t, info2.DeviceID, info3.DeviceID)
}
func TestPoolMetadata_RemoveDevice(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: "test"})
assert.NoError(t, err)
err = store.RemoveDevice(testCtx, "test")
assert.NoError(t, err)
_, err = store.GetDevice(testCtx, "test")
assert.Equal(t, ErrNotFound, err)
}
func TestPoolMetadata_UpdateDevice(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
oldInfo := &DeviceInfo{
Name: "test1",
ParentName: "test2",
Size: 3,
State: Activated,
}
err := store.AddDevice(testCtx, oldInfo)
assert.NoError(t, err)
err = store.UpdateDevice(testCtx, oldInfo.Name, func(info *DeviceInfo) error {
info.ParentName = "test5"
info.Size = 6
info.State = Created
return nil
})
assert.NoError(t, err)
newInfo, err := store.GetDevice(testCtx, "test1")
assert.NoError(t, err)
assert.Equal(t, "test1", newInfo.Name)
assert.Equal(t, "test5", newInfo.ParentName)
assert.EqualValues(t, newInfo.Size, 6)
assert.Equal(t, Created, newInfo.State)
}
func TestPoolMetadata_MarkFaulty(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
info := &DeviceInfo{Name: "test"}
err := store.AddDevice(testCtx, info)
assert.NoError(t, err)
err = store.MarkFaulty(testCtx, "test")
assert.NoError(t, err)
saved, err := store.GetDevice(testCtx, info.Name)
assert.NoError(t, err)
assert.Equal(t, saved.State, Faulty)
assert.True(t, saved.DeviceID > 0)
// Make sure a device ID marked as faulty as well
err = store.db.View(func(tx *bbolt.Tx) error {
bucket := tx.Bucket(deviceIDBucketName)
key := strconv.FormatUint(uint64(saved.DeviceID), 10)
value := bucket.Get([]byte(key))
assert.Equal(t, value[0], byte(deviceFaulty))
return nil
})
assert.NoError(t, err)
}
func TestPoolMetadata_WalkDevices(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: "device1", DeviceID: 1, State: Created})
assert.NoError(t, err)
err = store.AddDevice(testCtx, &DeviceInfo{Name: "device2", DeviceID: 2, State: Faulty})
assert.NoError(t, err)
called := 0
err = store.WalkDevices(testCtx, func(info *DeviceInfo) error {
called++
switch called {
case 1:
assert.Equal(t, "device1", info.Name)
assert.Equal(t, uint32(1), info.DeviceID)
assert.Equal(t, Created, info.State)
case 2:
assert.Equal(t, "device2", info.Name)
assert.Equal(t, uint32(2), info.DeviceID)
assert.Equal(t, Faulty, info.State)
default:
t.Error("unexpected walk call")
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, called, 2)
}
func TestPoolMetadata_GetDeviceNames(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: "test1"})
assert.NoError(t, err)
err = store.AddDevice(testCtx, &DeviceInfo{Name: "test2"})
assert.NoError(t, err)
names, err := store.GetDeviceNames(testCtx)
assert.NoError(t, err)
assert.Len(t, names, 2)
assert.Equal(t, "test1", names[0])
assert.Equal(t, "test2", names[1])
}
func createStore(t *testing.T) (store *PoolMetadata) {
path := filepath.Join(t.TempDir(), "test.db")
metadata, err := NewPoolMetadata(path)
assert.NoError(t, err)
return metadata
}
func cleanupStore(t *testing.T, store *PoolMetadata) {
err := store.Close()
assert.Nil(t, err, "failed to close metadata store")
}

View File

@@ -0,0 +1,56 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugin
import (
"errors"
"fmt"
"github.com/containerd/containerd/v2/platforms"
"github.com/containerd/containerd/v2/plugins"
"github.com/containerd/containerd/v2/plugins/snapshots/devmapper"
"github.com/containerd/plugin"
"github.com/containerd/plugin/registry"
)
func init() {
registry.Register(&plugin.Registration{
Type: plugins.SnapshotPlugin,
ID: "devmapper",
Config: &devmapper.Config{},
InitFn: func(ic *plugin.InitContext) (interface{}, error) {
ic.Meta.Platforms = append(ic.Meta.Platforms, platforms.DefaultSpec())
config, ok := ic.Config.(*devmapper.Config)
if !ok {
return nil, errors.New("invalid devmapper configuration")
}
if config.PoolName == "" {
return nil, fmt.Errorf("devmapper not configured: %w", plugin.ErrSkipPlugin)
}
if config.RootPath == "" {
config.RootPath = ic.Properties[plugins.PropertyRootDir]
}
return devmapper.NewSnapshotter(ic.Context, config)
},
})
}

View File

@@ -0,0 +1,579 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
"errors"
"fmt"
"path/filepath"
"strconv"
"time"
"golang.org/x/sys/unix"
blkdiscard "github.com/containerd/containerd/v2/plugins/snapshots/devmapper/blkdiscard"
"github.com/containerd/containerd/v2/plugins/snapshots/devmapper/dmsetup"
"github.com/containerd/log"
)
// PoolDevice ties together data and metadata volumes, represents thin-pool and manages volumes, snapshots and device ids.
type PoolDevice struct {
poolName string
metadata *PoolMetadata
discardBlocks bool
}
// NewPoolDevice creates new thin-pool from existing data and metadata volumes.
// If pool 'poolName' already exists, it'll be reloaded with new parameters.
func NewPoolDevice(ctx context.Context, config *Config) (*PoolDevice, error) {
log.G(ctx).Infof("initializing pool device %q", config.PoolName)
version, err := dmsetup.Version()
if err != nil {
log.G(ctx).Error("dmsetup not available")
return nil, err
}
log.G(ctx).Infof("using dmsetup:\n%s", version)
if config.DiscardBlocks {
blkdiscardVersion, err := blkdiscard.Version()
if err != nil {
log.G(ctx).Error("blkdiscard is not available")
return nil, err
}
log.G(ctx).Infof("using blkdiscard:\n%s", blkdiscardVersion)
}
dbpath := filepath.Join(config.RootPath, config.PoolName+".db")
poolMetaStore, err := NewPoolMetadata(dbpath)
if err != nil {
return nil, err
}
// Make sure pool exists and available
poolPath := dmsetup.GetFullDevicePath(config.PoolName)
if _, err := dmsetup.Info(poolPath); err != nil {
return nil, fmt.Errorf("failed to query pool %q: %w", poolPath, err)
}
poolDevice := &PoolDevice{
poolName: config.PoolName,
metadata: poolMetaStore,
discardBlocks: config.DiscardBlocks,
}
if err := poolDevice.ensureDeviceStates(ctx); err != nil {
return nil, fmt.Errorf("failed to check devices state: %w", err)
}
return poolDevice, nil
}
func skipRetry(err error) bool {
if err == nil {
return true // skip retry if no error
} else if !errors.Is(err, unix.EBUSY) {
return true // skip retry if error is not due to device or resource busy
}
return false
}
func retry(ctx context.Context, f func() error) error {
var (
maxRetries = 100
retryDelay = 100 * time.Millisecond
retryErr error
)
for attempt := 1; attempt <= maxRetries; attempt++ {
retryErr = f()
if skipRetry(retryErr) {
return retryErr
}
// Don't spam logs
if attempt%10 == 0 {
log.G(ctx).WithError(retryErr).Warnf("retrying... (%d of %d)", attempt, maxRetries)
}
// Devmapper device is busy, give it a bit of time and retry removal
time.Sleep(retryDelay)
}
return retryErr
}
// ensureDeviceStates updates devices to their real state:
// - marks devices with incomplete states (after crash) as 'Faulty'
// - activates devices if they are marked as 'Activated' but the dm
// device is not active, which can happen to a stopped container
// after a reboot
func (p *PoolDevice) ensureDeviceStates(ctx context.Context) error {
var faultyDevices []*DeviceInfo
var activatedDevices []*DeviceInfo
if err := p.WalkDevices(ctx, func(info *DeviceInfo) error {
switch info.State {
case Suspended, Resumed, Deactivated, Removed, Faulty:
case Activated:
activatedDevices = append(activatedDevices, info)
default:
faultyDevices = append(faultyDevices, info)
}
return nil
}); err != nil {
return fmt.Errorf("failed to query devices from metastore: %w", err)
}
var result []error
for _, dev := range activatedDevices {
if p.IsActivated(dev.Name) {
continue
}
log.G(ctx).Warnf("devmapper device %q marked as %q but not active, activating it", dev.Name, dev.State)
if err := p.activateDevice(ctx, dev); err != nil {
result = append(result, fmt.Errorf("devmapper: %w", err))
}
}
for _, dev := range faultyDevices {
log.G(ctx).
WithField("dev_id", dev.DeviceID).
WithField("parent", dev.ParentName).
WithField("error", dev.Error).
Warnf("devmapper device %q has invalid state %q, marking as faulty", dev.Name, dev.State)
if err := p.metadata.MarkFaulty(ctx, dev.Name); err != nil {
result = append(result, fmt.Errorf("devmapper: %w", err))
}
}
return errors.Join(result...)
}
// transition invokes 'updateStateFn' callback to perform devmapper operation and reflects device state changes/errors in meta store.
// 'tryingState' will be set before invoking callback. If callback succeeded 'successState' will be set, otherwise
// error details will be recorded in meta store.
func (p *PoolDevice) transition(ctx context.Context, deviceName string, tryingState DeviceState, successState DeviceState, updateStateFn func() error) error {
// Set device to trying state
uerr := p.metadata.UpdateDevice(ctx, deviceName, func(deviceInfo *DeviceInfo) error {
deviceInfo.State = tryingState
return nil
})
if uerr != nil {
return fmt.Errorf("failed to set device %q state to %q: %w", deviceName, tryingState, uerr)
}
var result []error
// Invoke devmapper operation
err := updateStateFn()
if err != nil {
result = append(result, err)
}
// If operation succeeded transition to success state, otherwise save error details
uerr = p.metadata.UpdateDevice(ctx, deviceName, func(deviceInfo *DeviceInfo) error {
if err == nil {
deviceInfo.State = successState
deviceInfo.Error = ""
} else {
deviceInfo.Error = err.Error()
}
return nil
})
if uerr != nil {
result = append(result, uerr)
}
return errors.Join(result...)
}
// CreateThinDevice creates new devmapper thin-device with given name and size.
// Device ID for thin-device will be allocated from metadata store.
// If allocation successful, device will be activated with /dev/mapper/<deviceName>
func (p *PoolDevice) CreateThinDevice(ctx context.Context, deviceName string, virtualSizeBytes uint64) (retErr error) {
info := &DeviceInfo{
Name: deviceName,
Size: virtualSizeBytes,
State: Unknown,
}
var (
metaErr error
devErr error
activeErr error
)
defer func() {
// We've created a devmapper device, but failed to activate it, try rollback everything
if activeErr != nil {
retErr = p.rollbackActivate(ctx, info, activeErr)
return
}
// We're unable to create the devmapper device, most likely something wrong with the deviceID
if devErr != nil {
retErr = errors.Join(retErr, p.metadata.MarkFaulty(ctx, info.Name))
return
}
}()
// Save initial device metadata and allocate new device ID from store
metaErr = p.metadata.AddDevice(ctx, info)
if metaErr != nil {
return metaErr
}
// Create thin device
devErr = p.createDevice(ctx, info)
if devErr != nil {
return devErr
}
// Activate thin device
activeErr = p.activateDevice(ctx, info)
if activeErr != nil {
return activeErr
}
return nil
}
func (p *PoolDevice) rollbackActivate(ctx context.Context, info *DeviceInfo, activateErr error) error {
// Delete the device first.
delErr := p.deleteDevice(ctx, info)
if delErr != nil {
// Failed to rollback, mark the device as faulty and keep metadata in order to
// preserve the faulty device ID
return errors.Join(activateErr, delErr, p.metadata.MarkFaulty(ctx, info.Name))
}
// The devmapper device has been successfully deleted, deallocate device ID
if err := p.RemoveDevice(ctx, info.Name); err != nil {
return errors.Join(activateErr, err)
}
return activateErr
}
// createDevice creates thin device
func (p *PoolDevice) createDevice(ctx context.Context, info *DeviceInfo) error {
if err := p.transition(ctx, info.Name, Creating, Created, func() error {
return dmsetup.CreateDevice(p.poolName, info.DeviceID)
}); err != nil {
return fmt.Errorf("failed to create new thin device %q (dev: %d): %w", info.Name, info.DeviceID, err)
}
return nil
}
// activateDevice activates thin device
func (p *PoolDevice) activateDevice(ctx context.Context, info *DeviceInfo) error {
if err := p.transition(ctx, info.Name, Activating, Activated, func() error {
return dmsetup.ActivateDevice(p.poolName, info.Name, info.DeviceID, info.Size, "")
}); err != nil {
return fmt.Errorf("failed to activate new thin device %q (dev: %d): %w", info.Name, info.DeviceID, err)
}
return nil
}
// CreateSnapshotDevice creates and activates new thin-device from parent thin-device (makes snapshot)
func (p *PoolDevice) CreateSnapshotDevice(ctx context.Context, deviceName string, snapshotName string, virtualSizeBytes uint64) (retErr error) {
baseInfo, err := p.metadata.GetDevice(ctx, deviceName)
if err != nil {
return fmt.Errorf("failed to query device metadata for %q: %w", deviceName, err)
}
snapInfo := &DeviceInfo{
Name: snapshotName,
Size: virtualSizeBytes,
ParentName: deviceName,
State: Unknown,
}
var (
metaErr error
devErr error
activeErr error
)
defer func() {
// We've created a devmapper device, but failed to activate it, try rollback everything
if activeErr != nil {
retErr = p.rollbackActivate(ctx, snapInfo, activeErr)
return
}
// We're unable to create the devmapper device, most likely something wrong with the deviceID
if devErr != nil {
retErr = errors.Join(retErr, p.metadata.MarkFaulty(ctx, snapInfo.Name))
return
}
}()
// The base device must be suspend before taking a snapshot to
// avoid corruption.
// https://github.com/torvalds/linux/blob/v5.7/Documentation/admin-guide/device-mapper/thin-provisioning.rst#internal-snapshots
if p.IsLoaded(deviceName) {
log.G(ctx).Debugf("suspending %q before taking its snapshot", deviceName)
suspendErr := p.SuspendDevice(ctx, deviceName)
if suspendErr != nil {
return suspendErr
}
defer func() {
err := p.ResumeDevice(ctx, deviceName)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to resume base device %q after taking its snapshot", baseInfo.Name)
}
}()
}
// Save snapshot metadata and allocate new device ID
metaErr = p.metadata.AddDevice(ctx, snapInfo)
if metaErr != nil {
return metaErr
}
// Create thin device snapshot
devErr = p.createSnapshot(ctx, baseInfo, snapInfo)
if devErr != nil {
return devErr
}
// Activate the snapshot device
activeErr = p.activateDevice(ctx, snapInfo)
if activeErr != nil {
return activeErr
}
return nil
}
func (p *PoolDevice) createSnapshot(ctx context.Context, baseInfo, snapInfo *DeviceInfo) error {
if err := p.transition(ctx, snapInfo.Name, Creating, Created, func() error {
return dmsetup.CreateSnapshot(p.poolName, snapInfo.DeviceID, baseInfo.DeviceID)
}); err != nil {
return fmt.Errorf(
"failed to create snapshot %q (dev: %d) from %q (dev: %d): %w",
snapInfo.Name,
snapInfo.DeviceID,
baseInfo.Name,
baseInfo.DeviceID, err,
)
}
return nil
}
// SuspendDevice flushes the outstanding IO and blocks the further IO
func (p *PoolDevice) SuspendDevice(ctx context.Context, deviceName string) error {
if err := p.transition(ctx, deviceName, Suspending, Suspended, func() error {
return dmsetup.SuspendDevice(deviceName)
}); err != nil {
return fmt.Errorf("failed to suspend device %q: %w", deviceName, err)
}
return nil
}
// ResumeDevice resumes IO for the given device
func (p *PoolDevice) ResumeDevice(ctx context.Context, deviceName string) error {
if err := p.transition(ctx, deviceName, Resuming, Resumed, func() error {
return dmsetup.ResumeDevice(deviceName)
}); err != nil {
return fmt.Errorf("failed to resume device %q: %w", deviceName, err)
}
return nil
}
// DeactivateDevice deactivates thin device
func (p *PoolDevice) DeactivateDevice(ctx context.Context, deviceName string, deferred, withForce bool) error {
if !p.IsLoaded(deviceName) {
return nil
}
opts := []dmsetup.RemoveDeviceOpt{dmsetup.RemoveWithRetries}
if deferred {
opts = append(opts, dmsetup.RemoveDeferred)
}
if withForce {
opts = append(opts, dmsetup.RemoveWithForce)
}
if err := p.transition(ctx, deviceName, Deactivating, Deactivated, func() error {
return retry(ctx, func() error {
if !deferred && p.discardBlocks {
err := dmsetup.DiscardBlocks(deviceName)
if err != nil {
if err == dmsetup.ErrInUse {
log.G(ctx).Warnf("device %q is in use, skipping blkdiscard", deviceName)
} else {
return err
}
}
}
if err := dmsetup.RemoveDevice(deviceName, opts...); err != nil {
return fmt.Errorf("failed to deactivate device: %w", err)
}
return nil
})
}); err != nil {
return fmt.Errorf("failed to deactivate device %q: %w", deviceName, err)
}
return nil
}
// IsActivated returns true if thin-device is activated
func (p *PoolDevice) IsActivated(deviceName string) bool {
infos, err := dmsetup.Info(deviceName)
if err != nil || len(infos) != 1 {
// Couldn't query device info, device not active
return false
}
if devInfo := infos[0]; devInfo.TableLive {
return true
}
return false
}
// IsLoaded returns true if thin-device is visible for dmsetup
func (p *PoolDevice) IsLoaded(deviceName string) bool {
_, err := dmsetup.Info(deviceName)
return err == nil
}
// GetUsage reports total size in bytes consumed by a thin-device.
// It relies on the number of used blocks reported by 'dmsetup status'.
// The output looks like:
//
// device2: 0 204800 thin 17280 204799
//
// Where 17280 is the number of used sectors
func (p *PoolDevice) GetUsage(deviceName string) (int64, error) {
status, err := dmsetup.Status(deviceName)
if err != nil {
return 0, fmt.Errorf("can't get status for device %q: %w", deviceName, err)
}
if len(status.Params) == 0 {
return 0, errors.New("failed to get the number of used blocks, unexpected output from dmsetup status")
}
count, err := strconv.ParseInt(status.Params[0], 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse status params: %q: %w", status.Params[0], err)
}
return count * dmsetup.SectorSize, nil
}
// RemoveDevice completely wipes out thin device from thin-pool and frees it's device ID
func (p *PoolDevice) RemoveDevice(ctx context.Context, deviceName string) error {
info, err := p.metadata.GetDevice(ctx, deviceName)
if err != nil {
return fmt.Errorf("can't query metadata for device %q: %w", deviceName, err)
}
if err := p.DeactivateDevice(ctx, deviceName, false, true); err != nil {
return err
}
if err := p.deleteDevice(ctx, info); err != nil {
return err
}
// Remove record from meta store and free device ID
if err := p.metadata.RemoveDevice(ctx, deviceName); err != nil {
return fmt.Errorf("can't remove device %q metadata from store after removal: %w", deviceName, err)
}
return nil
}
func (p *PoolDevice) deleteDevice(ctx context.Context, info *DeviceInfo) error {
if err := p.transition(ctx, info.Name, Removing, Removed, func() error {
return retry(ctx, func() error {
// Send 'delete' message to thin-pool
e := dmsetup.DeleteDevice(p.poolName, info.DeviceID)
// Ignores the error if the device has been deleted already.
if e != nil && !errors.Is(e, unix.ENODATA) {
return e
}
return nil
})
}); err != nil {
return fmt.Errorf("failed to delete device %q (dev id: %d): %w", info.Name, info.DeviceID, err)
}
return nil
}
// RemovePool deactivates all child thin-devices and removes thin-pool device
func (p *PoolDevice) RemovePool(ctx context.Context) error {
deviceNames, err := p.metadata.GetDeviceNames(ctx)
if err != nil {
return fmt.Errorf("can't query device names: %w", err)
}
var result []error
// Deactivate devices if any
for _, name := range deviceNames {
if err := p.DeactivateDevice(ctx, name, true, true); err != nil {
result = append(result, fmt.Errorf("failed to remove %q: %w", name, err))
}
}
if err := dmsetup.RemoveDevice(p.poolName, dmsetup.RemoveWithForce, dmsetup.RemoveWithRetries, dmsetup.RemoveDeferred); err != nil {
result = append(result, fmt.Errorf("failed to remove pool %q: %w", p.poolName, err))
}
return errors.Join(result...)
}
// MarkDeviceState changes the device's state in metastore
func (p *PoolDevice) MarkDeviceState(ctx context.Context, name string, state DeviceState) error {
return p.metadata.ChangeDeviceState(ctx, name, state)
}
// WalkDevices iterates all devices in pool metadata
func (p *PoolDevice) WalkDevices(ctx context.Context, cb func(info *DeviceInfo) error) error {
return p.metadata.WalkDevices(ctx, func(info *DeviceInfo) error {
return cb(info)
})
}
// Close closes pool device (thin-pool will not be removed)
func (p *PoolDevice) Close() error {
return p.metadata.Close()
}

View File

@@ -0,0 +1,308 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"testing"
"time"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/pkg/testutil"
"github.com/containerd/containerd/v2/plugins/snapshots/devmapper/dmsetup"
"github.com/containerd/log"
"github.com/docker/go-units"
"github.com/stretchr/testify/assert"
)
const (
thinDevice1 = "thin-1"
thinDevice2 = "thin-2"
snapDevice1 = "snap-1"
device1Size = 1000000
device2Size = 2000000
testsPrefix = "devmapper-snapshotter-tests-"
)
// TestPoolDevice runs integration tests for pool device.
// The following scenario implemented:
// - Create pool device with name 'test-pool-device'
// - Create two thin volumes 'thin-1' and 'thin-2'
// - Write ext4 file system on 'thin-1' and make sure it'errs moutable
// - Write v1 test file on 'thin-1' volume
// - Take 'thin-1' snapshot 'snap-1'
// - Change v1 file to v2 on 'thin-1'
// - Mount 'snap-1' and make sure test file is v1
// - Unmount volumes and remove all devices
func TestPoolDevice(t *testing.T) {
testutil.RequiresRoot(t)
assert.NoError(t, log.SetLevel("debug"))
ctx := context.Background()
tempDir := t.TempDir()
_, loopDataDevice := createLoopbackDevice(t, tempDir)
_, loopMetaDevice := createLoopbackDevice(t, tempDir)
poolName := fmt.Sprintf("test-pool-device-%d", time.Now().Nanosecond())
err := dmsetup.CreatePool(poolName, loopDataDevice, loopMetaDevice, 64*1024/dmsetup.SectorSize)
assert.Nil(t, err, "failed to create pool %q", poolName)
defer func() {
// Detach loop devices and remove images
err := mount.DetachLoopDevice(loopDataDevice, loopMetaDevice)
assert.NoError(t, err)
}()
config := &Config{
PoolName: poolName,
RootPath: tempDir,
BaseImageSize: "16mb",
BaseImageSizeBytes: 16 * 1024 * 1024,
DiscardBlocks: true,
}
pool, err := NewPoolDevice(ctx, config)
assert.Nil(t, err, "can't create device pool")
assert.True(t, pool != nil)
defer func() {
err := pool.RemovePool(ctx)
assert.Nil(t, err, "can't close device pool")
}()
// Create thin devices
t.Run("CreateThinDevice", func(t *testing.T) {
testCreateThinDevice(t, pool)
})
// Make ext4 filesystem on 'thin-1'
t.Run("MakeFileSystem", func(t *testing.T) {
testMakeFileSystem(t, pool)
})
// Mount 'thin-1' and write v1 test file on 'thin-1' device
err = mount.WithTempMount(ctx, getMounts(thinDevice1), func(thin1MountPath string) error {
// Write v1 test file on 'thin-1' device
thin1TestFilePath := filepath.Join(thin1MountPath, "TEST")
err := os.WriteFile(thin1TestFilePath, []byte("test file (v1)"), 0700)
assert.Nil(t, err, "failed to write test file v1 on '%s' volume", thinDevice1)
return nil
})
// Take snapshot of 'thin-1'
t.Run("CreateSnapshotDevice", func(t *testing.T) {
testCreateSnapshot(t, pool)
})
// Update TEST file on 'thin-1' to v2
err = mount.WithTempMount(ctx, getMounts(thinDevice1), func(thin1MountPath string) error {
thin1TestFilePath := filepath.Join(thin1MountPath, "TEST")
err = os.WriteFile(thin1TestFilePath, []byte("test file (v2)"), 0700)
assert.Nil(t, err, "failed to write test file v2 on 'thin-1' volume after taking snapshot")
return nil
})
assert.NoError(t, err)
// Mount 'snap-1' and make sure TEST file is v1
err = mount.WithTempMount(ctx, getMounts(snapDevice1), func(snap1MountPath string) error {
// Read test file from snapshot device and make sure it's v1
fileData, err := os.ReadFile(filepath.Join(snap1MountPath, "TEST"))
assert.Nil(t, err, "couldn't read test file from '%s' device", snapDevice1)
assert.Equal(t, "test file (v1)", string(fileData), "test file content is invalid on snapshot")
return nil
})
assert.NoError(t, err)
t.Run("DeactivateDevice", func(t *testing.T) {
testDeactivateThinDevice(t, pool)
})
t.Run("RemoveDevice", func(t *testing.T) {
testRemoveThinDevice(t, pool)
})
t.Run("rollbackActivate", func(t *testing.T) {
testCreateThinDevice(t, pool)
ctx := context.Background()
snapDevice := "snap2"
err := pool.CreateSnapshotDevice(ctx, thinDevice1, snapDevice, device1Size)
assert.NoError(t, err)
info, err := pool.metadata.GetDevice(ctx, snapDevice)
assert.NoError(t, err)
// Simulate a case that the device cannot be activated.
err = pool.DeactivateDevice(ctx, info.Name, false, false)
assert.NoError(t, err)
err = pool.rollbackActivate(ctx, info, err)
assert.NoError(t, err)
})
}
func TestPoolDeviceMarkFaulty(t *testing.T) {
store := createStore(t)
defer cleanupStore(t, store)
err := store.AddDevice(testCtx, &DeviceInfo{Name: "1", State: Unknown})
assert.NoError(t, err)
// Note: do not use 'Activated' here because pool.ensureDeviceStates() will
// try to activate the real dm device, which will fail on a faked device.
err = store.AddDevice(testCtx, &DeviceInfo{Name: "2", State: Deactivated})
assert.NoError(t, err)
pool := &PoolDevice{metadata: store}
err = pool.ensureDeviceStates(testCtx)
assert.NoError(t, err)
called := 0
err = pool.metadata.WalkDevices(testCtx, func(info *DeviceInfo) error {
called++
switch called {
case 1:
assert.Equal(t, Faulty, info.State)
assert.Equal(t, "1", info.Name)
case 2:
assert.Equal(t, Deactivated, info.State)
assert.Equal(t, "2", info.Name)
default:
t.Error("unexpected walk call")
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 2, called)
}
func testCreateThinDevice(t *testing.T, pool *PoolDevice) {
ctx := context.Background()
err := pool.CreateThinDevice(ctx, thinDevice1, device1Size)
assert.Nil(t, err, "can't create first thin device")
err = pool.CreateThinDevice(ctx, thinDevice1, device1Size)
assert.True(t, err != nil, "device pool allows duplicated device names")
err = pool.CreateThinDevice(ctx, thinDevice2, device2Size)
assert.Nil(t, err, "can't create second thin device")
deviceInfo1, err := pool.metadata.GetDevice(ctx, thinDevice1)
assert.NoError(t, err)
deviceInfo2, err := pool.metadata.GetDevice(ctx, thinDevice2)
assert.NoError(t, err)
assert.True(t, deviceInfo1.DeviceID != deviceInfo2.DeviceID, "assigned device ids should be different")
usage, err := pool.GetUsage(thinDevice1)
assert.NoError(t, err)
assert.Equal(t, usage, int64(0))
}
func testMakeFileSystem(t *testing.T, pool *PoolDevice) {
devicePath := dmsetup.GetFullDevicePath(thinDevice1)
args := []string{
devicePath,
"-E",
"nodiscard,lazy_itable_init=0,lazy_journal_init=0",
}
output, err := exec.Command("mkfs.ext4", args...).CombinedOutput()
assert.Nil(t, err, "failed to make filesystem on '%s': %s", thinDevice1, string(output))
usage, err := pool.GetUsage(thinDevice1)
assert.NoError(t, err)
assert.True(t, usage > 0)
}
func testCreateSnapshot(t *testing.T, pool *PoolDevice) {
err := pool.CreateSnapshotDevice(context.Background(), thinDevice1, snapDevice1, device1Size)
assert.Nil(t, err, "failed to create snapshot from '%s' volume", thinDevice1)
}
func testDeactivateThinDevice(t *testing.T, pool *PoolDevice) {
deviceList := []string{
thinDevice2,
snapDevice1,
}
for _, deviceName := range deviceList {
assert.True(t, pool.IsActivated(deviceName))
err := pool.DeactivateDevice(context.Background(), deviceName, false, true)
assert.Nil(t, err, "failed to remove '%s'", deviceName)
assert.False(t, pool.IsActivated(deviceName))
}
}
func testRemoveThinDevice(t *testing.T, pool *PoolDevice) {
err := pool.RemoveDevice(testCtx, thinDevice1)
assert.Nil(t, err, "should delete thin device from pool")
err = pool.RemoveDevice(testCtx, thinDevice2)
assert.Nil(t, err, "should delete thin device from pool")
}
func getMounts(thinDeviceName string) []mount.Mount {
return []mount.Mount{
{
Source: dmsetup.GetFullDevicePath(thinDeviceName),
Type: "ext4",
},
}
}
func createLoopbackDevice(t *testing.T, dir string) (string, string) {
file, err := os.CreateTemp(dir, testsPrefix)
assert.NoError(t, err)
size, err := units.RAMInBytes("128Mb")
assert.NoError(t, err)
err = file.Truncate(size)
assert.NoError(t, err)
err = file.Close()
assert.NoError(t, err)
imagePath := file.Name()
loopDevice, err := mount.AttachLoopDevice(imagePath)
assert.NoError(t, err)
return imagePath, loopDevice
}

View File

@@ -0,0 +1,565 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/errdefs"
"github.com/containerd/containerd/v2/plugins/snapshots/devmapper/dmsetup"
"github.com/containerd/containerd/v2/snapshots"
"github.com/containerd/containerd/v2/snapshots/storage"
"github.com/containerd/log"
)
type fsType string
const (
fsTypeExt4 fsType = "ext4"
fsTypeExt2 fsType = "ext2"
fsTypeXFS fsType = "xfs"
)
const (
metadataFileName = "metadata.db"
devmapperSnapshotFsType = "containerd.io/snapshot/devmapper/fstype"
)
type closeFunc func() error
// Snapshotter implements containerd's snapshotter (https://godoc.org/github.com/containerd/containerd/snapshots#Snapshotter)
// based on Linux device-mapper targets.
type Snapshotter struct {
store *storage.MetaStore
pool *PoolDevice
config *Config
cleanupFn []closeFunc
closeOnce sync.Once
}
// NewSnapshotter creates new device mapper snapshotter.
// Internally it creates thin-pool device (or reloads if it's already exists) and
// initializes a database file for metadata.
func NewSnapshotter(ctx context.Context, config *Config) (*Snapshotter, error) {
// Make sure snapshotter configuration valid before running
if err := config.parse(); err != nil {
return nil, err
}
if err := config.Validate(); err != nil {
return nil, err
}
var cleanupFn []closeFunc
if err := os.MkdirAll(config.RootPath, 0750); err != nil && !os.IsExist(err) {
return nil, fmt.Errorf("failed to create root directory: %s: %w", config.RootPath, err)
}
store, err := storage.NewMetaStore(filepath.Join(config.RootPath, metadataFileName))
if err != nil {
return nil, fmt.Errorf("failed to create metastore: %w", err)
}
cleanupFn = append(cleanupFn, store.Close)
poolDevice, err := NewPoolDevice(ctx, config)
if err != nil {
return nil, err
}
cleanupFn = append(cleanupFn, poolDevice.Close)
return &Snapshotter{
store: store,
config: config,
pool: poolDevice,
cleanupFn: cleanupFn,
}, nil
}
// Stat returns the info for an active or committed snapshot from store
func (s *Snapshotter) Stat(ctx context.Context, key string) (snapshots.Info, error) {
log.G(ctx).WithField("key", key).Debug("stat")
var (
info snapshots.Info
err error
)
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
_, info, _, err = storage.GetInfo(ctx, key)
return err
})
return info, err
}
// Update updates an existing snapshot info's data
func (s *Snapshotter) Update(ctx context.Context, info snapshots.Info, fieldpaths ...string) (snapshots.Info, error) {
log.G(ctx).Debugf("update: %s", strings.Join(fieldpaths, ", "))
var err error
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
info, err = storage.UpdateInfo(ctx, info, fieldpaths...)
return err
})
return info, err
}
// Usage returns the resource usage of an active or committed snapshot excluding the usage of parent snapshots.
func (s *Snapshotter) Usage(ctx context.Context, key string) (snapshots.Usage, error) {
log.G(ctx).WithField("key", key).Debug("usage")
var (
id string
err error
info snapshots.Info
usage snapshots.Usage
)
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
id, info, usage, err = storage.GetInfo(ctx, key)
if err != nil {
return err
}
if info.Kind == snapshots.KindActive {
deviceName := s.getDeviceName(id)
usage.Size, err = s.pool.GetUsage(deviceName)
if err != nil {
return err
}
}
if info.Parent != "" {
// GetInfo returns total number of bytes used by a snapshot (including parent).
// So subtract parent usage in order to get delta consumed by layer itself.
_, _, parentUsage, err := storage.GetInfo(ctx, info.Parent)
if err != nil {
return err
}
usage.Size -= parentUsage.Size
}
return err
})
return usage, err
}
// Mounts return the list of mounts for the active or view snapshot
func (s *Snapshotter) Mounts(ctx context.Context, key string) ([]mount.Mount, error) {
log.G(ctx).WithField("key", key).Debug("mounts")
var (
snap storage.Snapshot
err error
)
err = s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
snap, err = storage.GetSnapshot(ctx, key)
return err
})
snapInfo, err := s.Stat(ctx, key)
if err != nil {
log.G(ctx).WithError(err).Errorf("cannot retrieve snapshot info for key %s", key)
return nil, err
}
return s.buildMounts(ctx, snap, fsType(snapInfo.Labels[devmapperSnapshotFsType])), nil
}
// Prepare creates thin device for an active snapshot identified by key
func (s *Snapshotter) Prepare(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
log.G(ctx).WithFields(log.Fields{"key": key, "parent": parent}).Debug("prepare")
var (
mounts []mount.Mount
err error
)
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
mounts, err = s.createSnapshot(ctx, snapshots.KindActive, key, parent, opts...)
return err
})
return mounts, err
}
// View creates readonly thin device for the given snapshot key
func (s *Snapshotter) View(ctx context.Context, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
log.G(ctx).WithFields(log.Fields{"key": key, "parent": parent}).Debug("view")
var (
mounts []mount.Mount
err error
)
err = s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
mounts, err = s.createSnapshot(ctx, snapshots.KindView, key, parent, opts...)
return err
})
return mounts, err
}
// Commit marks an active snapshot as committed in meta store.
// Block device unmount operation captures snapshot changes by itself, so no
// additional actions needed within Commit operation.
func (s *Snapshotter) Commit(ctx context.Context, name, key string, opts ...snapshots.Opt) error {
log.G(ctx).WithFields(log.Fields{"name": name, "key": key}).Debug("commit")
return s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
id, snapInfo, _, err := storage.GetInfo(ctx, key)
if err != nil {
return err
}
deviceName := s.getDeviceName(id)
size, err := s.pool.GetUsage(deviceName)
if err != nil {
return err
}
usage := snapshots.Usage{
Size: size,
}
// Add file system type label if present. In case more than one file system
// type is supported file system type from parent will be used for creating
// snapshot.
fsTypeActive := snapInfo.Labels[devmapperSnapshotFsType]
if fsTypeActive != "" {
fsLabel := make(map[string]string)
fsLabel[devmapperSnapshotFsType] = fsTypeActive
opts = append(opts, snapshots.WithLabels(fsLabel))
}
_, err = storage.CommitActive(ctx, key, name, usage, opts...)
if err != nil {
return err
}
// After committed, the snapshot device will not be directly
// used anymore. We'd better deactivate it to make it *invisible*
// in userspace, so that tools like LVM2 and fdisk cannot touch it,
// and avoid useless IOs on it.
//
// Before deactivation, we need to flush the outstanding IO by suspend.
// Afterward, we resume it again to prevent a race window which may cause
// a process IO hang. See the issue below for details:
// (https://github.com/containerd/containerd/issues/4234)
err = s.pool.SuspendDevice(ctx, deviceName)
if err != nil {
return err
}
err = s.pool.ResumeDevice(ctx, deviceName)
if err != nil {
return err
}
return s.pool.DeactivateDevice(ctx, deviceName, true, false)
})
}
// Remove removes thin device and snapshot metadata by key
func (s *Snapshotter) Remove(ctx context.Context, key string) error {
log.G(ctx).WithField("key", key).Debug("remove")
return s.store.WithTransaction(ctx, true, func(ctx context.Context) error {
return s.removeDevice(ctx, key)
})
}
func (s *Snapshotter) removeDevice(ctx context.Context, key string) error {
snapID, _, err := storage.Remove(ctx, key)
if err != nil {
return err
}
deviceName := s.getDeviceName(snapID)
if !s.config.AsyncRemove {
if err := s.pool.RemoveDevice(ctx, deviceName); err != nil {
log.G(ctx).WithError(err).Error("failed to remove device")
// Tell snapshot GC continue to collect other snapshots.
// Otherwise, one snapshot collection failure will stop
// the GC, and all snapshots won't be collected even though
// having no relationship with the failed one.
return errdefs.ErrFailedPrecondition
}
} else {
// The asynchronous cleanup will do the real device remove work.
log.G(ctx).WithField("device", deviceName).Debug("async remove")
if err := s.pool.MarkDeviceState(ctx, deviceName, Removed); err != nil {
log.G(ctx).WithError(err).Error("failed to mark device as removed")
return err
}
}
return nil
}
// Walk iterates through all metadata Info for the stored snapshots and calls the provided function for each.
func (s *Snapshotter) Walk(ctx context.Context, fn snapshots.WalkFunc, fs ...string) error {
log.G(ctx).Debug("walk")
return s.store.WithTransaction(ctx, false, func(ctx context.Context) error {
return storage.WalkInfo(ctx, fn, fs...)
})
}
// ResetPool deactivates and deletes all thin devices in thin-pool.
// Used for cleaning pool after benchmarking.
func (s *Snapshotter) ResetPool(ctx context.Context) error {
names, err := s.pool.metadata.GetDeviceNames(ctx)
if err != nil {
return err
}
var result []error
for _, name := range names {
if err := s.pool.RemoveDevice(ctx, name); err != nil {
result = append(result, err)
}
}
return errors.Join(result...)
}
// Close releases devmapper snapshotter resources.
// All subsequent Close calls will be ignored.
func (s *Snapshotter) Close() error {
log.L.Debug("close")
var result []error
s.closeOnce.Do(func() {
for _, fn := range s.cleanupFn {
if err := fn(); err != nil {
result = append(result, err)
}
}
})
return errors.Join(result...)
}
func (s *Snapshotter) createSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string, opts ...snapshots.Opt) ([]mount.Mount, error) {
var fileSystemType fsType
// For snapshots with no parents, we use file system type as configured in config.
// For snapshots with parents, we inherit the file system type. We use the same
// file system type derived here for building mount points later.
fsLabel := make(map[string]string)
if len(parent) == 0 {
fileSystemType = s.config.FileSystemType
} else {
_, snapInfo, _, err := storage.GetInfo(ctx, parent)
if err != nil {
log.G(ctx).Errorf("failed to read snapshotInfo for %s", parent)
return nil, err
}
fileSystemType = fsType(snapInfo.Labels[devmapperSnapshotFsType])
if fileSystemType == "" {
// For parent snapshots created without label support, we can assume that
// they are ext4 type. Children of parents with no label for fsType will
// now have correct label and committed snapshots from them will carry fs type
// label. TODO: find out if it is better to update the parent's label with
// fsType as ext4.
fileSystemType = fsTypeExt4
}
}
fsLabel[devmapperSnapshotFsType] = string(fileSystemType)
opts = append(opts, snapshots.WithLabels(fsLabel))
snap, err := storage.CreateSnapshot(ctx, kind, key, parent, opts...)
if err != nil {
return nil, err
}
if len(snap.ParentIDs) == 0 {
fsOptions := ""
deviceName := s.getDeviceName(snap.ID)
log.G(ctx).Debugf("creating new thin device '%s'", deviceName)
err := s.pool.CreateThinDevice(ctx, deviceName, s.config.BaseImageSizeBytes)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to create thin device for snapshot %s", snap.ID)
return nil, err
}
if s.config.FileSystemType == fsTypeExt4 && s.config.FsOptions == "" {
// Explicitly disable lazy_itable_init and lazy_journal_init in order to enable lazy initialization.
fsOptions = "nodiscard,lazy_itable_init=0,lazy_journal_init=0"
} else {
fsOptions = s.config.FsOptions
}
log.G(ctx).Debugf("Creating file system of type: %s with options: %s for thin device %q", s.config.FileSystemType, fsOptions, deviceName)
if err := mkfs(ctx, s.config.FileSystemType, fsOptions, dmsetup.GetFullDevicePath(deviceName)); err != nil {
errs := []error{err}
status, sErr := dmsetup.Status(s.pool.poolName)
if sErr != nil {
errs = append(errs, sErr)
}
// Rollback thin device creation if mkfs failed
log.G(ctx).WithError(errors.Join(errs...)).Errorf("failed to initialize thin device %q for snapshot %s pool status %s", deviceName, snap.ID, status.RawOutput)
return nil, errors.Join(append(errs, s.pool.RemoveDevice(ctx, deviceName))...)
}
} else {
parentDeviceName := s.getDeviceName(snap.ParentIDs[0])
snapDeviceName := s.getDeviceName(snap.ID)
log.G(ctx).Debugf("creating snapshot device '%s' from '%s' with fsType: '%s'", snapDeviceName, parentDeviceName, fileSystemType)
err = s.pool.CreateSnapshotDevice(ctx, parentDeviceName, snapDeviceName, s.config.BaseImageSizeBytes)
if err != nil {
log.G(ctx).WithError(err).Errorf("failed to create snapshot device from parent %s", parentDeviceName)
return nil, err
}
}
mounts := s.buildMounts(ctx, snap, fileSystemType)
// Remove default directories not expected by the container image
_ = mount.WithTempMount(ctx, mounts, func(root string) error {
return os.Remove(filepath.Join(root, "lost+found"))
})
return mounts, nil
}
// mkfs creates filesystem on the given devmapper device based on type
// specified in config.
func mkfs(ctx context.Context, fs fsType, fsOptions string, path string) error {
mkfsCommand := ""
var args []string
switch fs {
case fsTypeExt4:
mkfsCommand = "mkfs.ext4"
args = []string{
"-E",
fsOptions,
path,
}
case fsTypeExt2:
mkfsCommand = "mkfs.ext2"
args = []string{
"-E",
fsOptions,
path,
}
case fsTypeXFS:
mkfsCommand = "mkfs.xfs"
args = []string{
path,
}
default:
return errors.New("file system not supported")
}
log.G(ctx).Debugf("%s %s", mkfsCommand, strings.Join(args, " "))
b, err := exec.Command(mkfsCommand, args...).CombinedOutput()
out := string(b)
if err != nil {
return fmt.Errorf("%s couldn't initialize %q: %s: %w", mkfsCommand, path, out, err)
}
log.G(ctx).Debugf("mkfs:\n%s", out)
return nil
}
func (s *Snapshotter) getDeviceName(snapID string) string {
// Add pool name as prefix to avoid collisions with devices from other pools
return fmt.Sprintf("%s-snap-%s", s.config.PoolName, snapID)
}
func (s *Snapshotter) getDevicePath(snap storage.Snapshot) string {
name := s.getDeviceName(snap.ID)
return dmsetup.GetFullDevicePath(name)
}
func (s *Snapshotter) buildMounts(ctx context.Context, snap storage.Snapshot, fileSystemType fsType) []mount.Mount {
var options []string
if fileSystemType == "" {
log.G(ctx).Error("File system type cannot be empty")
return nil
} else if fileSystemType == fsTypeXFS {
options = append(options, "nouuid")
}
if snap.Kind != snapshots.KindActive {
options = append(options, "ro")
}
mounts := []mount.Mount{
{
Source: s.getDevicePath(snap),
Type: string(fileSystemType),
Options: options,
},
}
return mounts
}
// Cleanup cleans up all removed and unused resources
func (s *Snapshotter) Cleanup(ctx context.Context) error {
log.G(ctx).Debug("cleanup")
var removedDevices []*DeviceInfo
if !s.config.AsyncRemove {
return nil
}
if err := s.pool.WalkDevices(ctx, func(info *DeviceInfo) error {
if info.State == Removed {
removedDevices = append(removedDevices, info)
}
return nil
}); err != nil {
log.G(ctx).WithError(err).Error("failed to query devices from metastore")
return err
}
var result []error
for _, dev := range removedDevices {
log.G(ctx).WithField("device", dev.Name).Debug("cleanup device")
if err := s.pool.RemoveDevice(ctx, dev.Name); err != nil {
log.G(ctx).WithField("device", dev.Name).Error("failed to cleanup device")
result = append(result, err)
} else {
log.G(ctx).WithField("device", dev.Name).Debug("cleanuped device")
}
}
return errors.Join(result...)
}

View File

@@ -0,0 +1,213 @@
//go:build linux
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devmapper
import (
"context"
_ "crypto/sha256"
"errors"
"fmt"
"testing"
"time"
"github.com/containerd/continuity/fs/fstest"
"github.com/stretchr/testify/assert"
"github.com/containerd/containerd/v2/core/mount"
"github.com/containerd/containerd/v2/namespaces"
"github.com/containerd/containerd/v2/pkg/testutil"
"github.com/containerd/containerd/v2/plugins/snapshots/devmapper/dmsetup"
"github.com/containerd/containerd/v2/snapshots"
"github.com/containerd/containerd/v2/snapshots/testsuite"
"github.com/containerd/log"
)
func TestSnapshotterSuite(t *testing.T) {
testutil.RequiresRoot(t)
assert.NoError(t, log.SetLevel("debug"))
snapshotterFn := func(ctx context.Context, root string) (snapshots.Snapshotter, func() error, error) {
poolName := fmt.Sprintf("containerd-snapshotter-suite-pool-%d", time.Now().Nanosecond())
config := &Config{
RootPath: root,
PoolName: poolName,
BaseImageSize: "16Mb",
}
return createSnapshotter(ctx, t, config)
}
testsuite.SnapshotterSuite(t, "devmapper", snapshotterFn)
ctx := context.Background()
ctx = namespaces.WithNamespace(ctx, "testsuite")
t.Run("DevMapperUsage", func(t *testing.T) {
snapshotter, closer, err := snapshotterFn(ctx, t.TempDir())
assert.NoError(t, err)
defer closer()
testUsage(t, snapshotter)
})
}
// testUsage tests devmapper's Usage implementation. This is an approximate test as it's hard to
// predict how many blocks will be consumed under different conditions and parameters.
func testUsage(t *testing.T, snapshotter snapshots.Snapshotter) {
ctx := context.Background()
// Create empty base layer
_, err := snapshotter.Prepare(ctx, "prepare-1", "")
assert.NoError(t, err)
emptyLayerUsage, err := snapshotter.Usage(ctx, "prepare-1")
assert.NoError(t, err)
// Should be > 0 as just written file system also consumes blocks
assert.Greater(t, emptyLayerUsage.Size, int64(0))
err = snapshotter.Commit(ctx, "layer-1", "prepare-1")
assert.NoError(t, err)
// Create child layer with 1MB file
var (
sizeBytes int64 = 1048576 // 1MB
baseApplier = fstest.Apply(fstest.CreateRandomFile("/a", 12345679, sizeBytes, 0777))
)
mounts, err := snapshotter.Prepare(ctx, "prepare-2", "layer-1")
assert.NoError(t, err)
err = mount.WithTempMount(ctx, mounts, baseApplier.Apply)
assert.NoError(t, err)
err = snapshotter.Commit(ctx, "layer-2", "prepare-2")
assert.NoError(t, err)
layer2Usage, err := snapshotter.Usage(ctx, "layer-2")
assert.NoError(t, err)
// Should be at least 1 MB + fs metadata
assert.GreaterOrEqual(t, layer2Usage.Size, sizeBytes,
"%d > %d", layer2Usage.Size, sizeBytes)
}
func TestMkfsExt4(t *testing.T) {
ctx := context.Background()
// We test the default setting which is lazy init is disabled
err := mkfs(ctx, "ext4", "nodiscard,lazy_itable_init=0,lazy_journal_init=0", "")
assert.Contains(t, err.Error(), `mkfs.ext4 couldn't initialize ""`)
}
func TestMkfsExt4NonDefault(t *testing.T) {
ctx := context.Background()
// We test a non default setting where we enable lazy init for ext4
err := mkfs(ctx, "ext4", "nodiscard", "")
assert.Contains(t, err.Error(), `mkfs.ext4 couldn't initialize ""`)
}
func TestMkfsXfs(t *testing.T) {
ctx := context.Background()
err := mkfs(ctx, "xfs", "", "")
assert.Contains(t, err.Error(), `mkfs.xfs couldn't initialize ""`)
}
func TestMkfsXfsNonDefault(t *testing.T) {
ctx := context.Background()
err := mkfs(ctx, "xfs", "noquota", "")
assert.Contains(t, err.Error(), `mkfs.xfs couldn't initialize ""`)
}
func TestMultipleXfsMounts(t *testing.T) {
testutil.RequiresRoot(t)
assert.NoError(t, log.SetLevel("debug"))
ctx := context.Background()
ctx = namespaces.WithNamespace(ctx, "testsuite")
poolName := fmt.Sprintf("containerd-snapshotter-suite-pool-%d", time.Now().Nanosecond())
config := &Config{
RootPath: t.TempDir(),
PoolName: poolName,
BaseImageSize: "16Mb",
FileSystemType: "xfs",
}
snapshotter, closer, err := createSnapshotter(ctx, t, config)
assert.NoError(t, err)
defer closer()
var (
sizeBytes int64 = 1048576 // 1MB
baseApplier = fstest.Apply(fstest.CreateRandomFile("/a", 12345679, sizeBytes, 0777))
)
// Create base layer
mounts, err := snapshotter.Prepare(ctx, "prepare-1", "")
assert.NoError(t, err)
root1 := t.TempDir()
defer func() {
mount.UnmountAll(root1, 0)
}()
err = mount.All(mounts, root1)
assert.NoError(t, err)
baseApplier.Apply(root1)
snapshotter.Commit(ctx, "layer-1", "prepare-1")
// Create one child layer
mounts, err = snapshotter.Prepare(ctx, "prepare-2", "layer-1")
assert.NoError(t, err)
root2 := t.TempDir()
defer func() {
mount.UnmountAll(root2, 0)
}()
err = mount.All(mounts, root2)
assert.NoError(t, err)
}
func createSnapshotter(ctx context.Context, t *testing.T, config *Config) (snapshots.Snapshotter, func() error, error) {
// Create loopback devices for each test case
_, loopDataDevice := createLoopbackDevice(t, config.RootPath)
_, loopMetaDevice := createLoopbackDevice(t, config.RootPath)
err := dmsetup.CreatePool(config.PoolName, loopDataDevice, loopMetaDevice, 64*1024/dmsetup.SectorSize)
assert.Nil(t, err, "failed to create pool %q", config.PoolName)
snap, err := NewSnapshotter(ctx, config)
if err != nil {
return nil, nil, err
}
// Remove device mapper pool and detach loop devices after test completes
removePool := func() error {
result := errors.Join(
snap.pool.RemovePool(ctx),
mount.DetachLoopDevice(loopDataDevice, loopMetaDevice))
return result
}
// Pool cleanup should be called before closing metadata store (as we need to retrieve device names)
snap.cleanupFn = append([]closeFunc{removePool}, snap.cleanupFn...)
return snap, snap.Close, nil
}