 bbe46b8c43
			
		
	
	bbe46b8c43
	
	
	
		
			
			Signed-off-by: haoyun <yun.hao@daocloud.io> Co-authored-by: zounengren <zouyee1989@gmail.com>
		
			
				
	
	
		
			372 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			372 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| //go:build linux
 | |
| // +build linux
 | |
| 
 | |
| /*
 | |
|    Copyright The containerd Authors.
 | |
| 
 | |
|    Licensed under the Apache License, Version 2.0 (the "License");
 | |
|    you may not use this file except in compliance with the License.
 | |
|    You may obtain a copy of the License at
 | |
| 
 | |
|        http://www.apache.org/licenses/LICENSE-2.0
 | |
| 
 | |
|    Unless required by applicable law or agreed to in writing, software
 | |
|    distributed under the License is distributed on an "AS IS" BASIS,
 | |
|    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | |
|    See the License for the specific language governing permissions and
 | |
|    limitations under the License.
 | |
| */
 | |
| 
 | |
| package devmapper
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"errors"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 
 | |
| 	bolt "go.etcd.io/bbolt"
 | |
| )
 | |
| 
 | |
| type (
 | |
| 	// DeviceInfoCallback is a callback used for device updates
 | |
| 	DeviceInfoCallback func(deviceInfo *DeviceInfo) error
 | |
| )
 | |
| 
 | |
| type deviceIDState byte
 | |
| 
 | |
| const (
 | |
| 	deviceFree deviceIDState = iota
 | |
| 	deviceTaken
 | |
| 	deviceFaulty
 | |
| )
 | |
| 
 | |
| // Bucket names
 | |
| var (
 | |
| 	devicesBucketName  = []byte("devices")    // Contains thin devices metadata <device_name>=<DeviceInfo>
 | |
| 	deviceIDBucketName = []byte("device_ids") // Tracks used device ids <device_id_[0..maxDeviceID)>=<byte_[0/1]>
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	// ErrNotFound represents an error returned when object not found in meta store
 | |
| 	ErrNotFound = errors.New("not found")
 | |
| 	// ErrAlreadyExists represents an error returned when object can't be duplicated in meta store
 | |
| 	ErrAlreadyExists = errors.New("object already exists")
 | |
| )
 | |
| 
 | |
| // PoolMetadata keeps device info for the given thin-pool device, it also responsible for
 | |
| // generating next available device ids and tracking devmapper transaction numbers
 | |
| type PoolMetadata struct {
 | |
| 	db *bolt.DB
 | |
| }
 | |
| 
 | |
| // NewPoolMetadata creates new or open existing pool metadata database
 | |
| func NewPoolMetadata(dbfile string) (*PoolMetadata, error) {
 | |
| 	db, err := bolt.Open(dbfile, 0600, nil)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	metadata := &PoolMetadata{db: db}
 | |
| 	if err := metadata.ensureDatabaseInitialized(); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to initialize database: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return metadata, nil
 | |
| }
 | |
| 
 | |
| // ensureDatabaseInitialized creates buckets required for metadata store in order
 | |
| // to avoid bucket existence checks across the code
 | |
| func (m *PoolMetadata) ensureDatabaseInitialized() error {
 | |
| 	return m.db.Update(func(tx *bolt.Tx) error {
 | |
| 		if _, err := tx.CreateBucketIfNotExists(devicesBucketName); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if _, err := tx.CreateBucketIfNotExists(deviceIDBucketName); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // AddDevice saves device info to database.
 | |
| func (m *PoolMetadata) AddDevice(ctx context.Context, info *DeviceInfo) error {
 | |
| 	err := m.db.Update(func(tx *bolt.Tx) error {
 | |
| 		devicesBucket := tx.Bucket(devicesBucketName)
 | |
| 
 | |
| 		// Make sure device name is unique. If there is already a device with the same name,
 | |
| 		// but in Faulty state, give it a try with another devmapper device ID.
 | |
| 		// See https://github.com/containerd/containerd/pull/3436 for more context.
 | |
| 		var existing DeviceInfo
 | |
| 		if err := getObject(devicesBucket, info.Name, &existing); err == nil && existing.State != Faulty {
 | |
| 			return fmt.Errorf("device %q is already there %+v: %w", info.Name, existing, ErrAlreadyExists)
 | |
| 		}
 | |
| 
 | |
| 		// Find next available device ID
 | |
| 		deviceID, err := getNextDeviceID(tx)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		info.DeviceID = deviceID
 | |
| 
 | |
| 		return putObject(devicesBucket, info.Name, info, true)
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to save metadata for device %q (parent: %q): %w", info.Name, info.ParentName, err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ChangeDeviceState changes the device state given the device name in devices bucket.
 | |
| func (m *PoolMetadata) ChangeDeviceState(ctx context.Context, name string, state DeviceState) error {
 | |
| 	return m.UpdateDevice(ctx, name, func(deviceInfo *DeviceInfo) error {
 | |
| 		deviceInfo.State = state
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // MarkFaulty marks the given device and corresponding devmapper device ID as faulty.
 | |
| // The snapshotter might attempt to recreate a device in 'Faulty' state with another devmapper ID in
 | |
| // subsequent calls, and in case of success it's status will be changed to 'Created' or 'Activated'.
 | |
| // The devmapper dev ID will remain in 'deviceFaulty' state until manually handled by a user.
 | |
| func (m *PoolMetadata) MarkFaulty(ctx context.Context, name string) error {
 | |
| 	return m.db.Update(func(tx *bolt.Tx) error {
 | |
| 		var (
 | |
| 			device    = DeviceInfo{}
 | |
| 			devBucket = tx.Bucket(devicesBucketName)
 | |
| 		)
 | |
| 
 | |
| 		if err := getObject(devBucket, name, &device); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		device.State = Faulty
 | |
| 
 | |
| 		if err := putObject(devBucket, name, &device, true); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		return markDeviceID(tx, device.DeviceID, deviceFaulty)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // getNextDeviceID finds the next free device ID by taking a cursor
 | |
| // through the deviceIDBucketName bucket and finding the next sequentially
 | |
| // unassigned ID. Device ID state is marked by a byte deviceFree or
 | |
| // deviceTaken. Low device IDs will be reused sooner.
 | |
| func getNextDeviceID(tx *bolt.Tx) (uint32, error) {
 | |
| 	bucket := tx.Bucket(deviceIDBucketName)
 | |
| 	cursor := bucket.Cursor()
 | |
| 
 | |
| 	// Check if any device id can be reused.
 | |
| 	// Bolt stores its keys in byte-sorted order within a bucket.
 | |
| 	// This makes sequential iteration extremely fast.
 | |
| 	for key, taken := cursor.First(); key != nil; key, taken = cursor.Next() {
 | |
| 		isFree := taken[0] == byte(deviceFree)
 | |
| 		if !isFree {
 | |
| 			continue
 | |
| 		}
 | |
| 
 | |
| 		parsedID, err := strconv.ParseUint(string(key), 10, 32)
 | |
| 		if err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 
 | |
| 		id := uint32(parsedID)
 | |
| 		if err := markDeviceID(tx, id, deviceTaken); err != nil {
 | |
| 			return 0, err
 | |
| 		}
 | |
| 
 | |
| 		return id, nil
 | |
| 	}
 | |
| 
 | |
| 	// Try allocate new device ID
 | |
| 	seq, err := bucket.NextSequence()
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	if seq >= maxDeviceID {
 | |
| 		return 0, errors.New("dm-meta: couldn't find free device key")
 | |
| 	}
 | |
| 
 | |
| 	id := uint32(seq)
 | |
| 	if err := markDeviceID(tx, id, deviceTaken); err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 
 | |
| 	return id, nil
 | |
| }
 | |
| 
 | |
| // markDeviceID marks a device as deviceFree or deviceTaken
 | |
| func markDeviceID(tx *bolt.Tx, deviceID uint32, state deviceIDState) error {
 | |
| 	var (
 | |
| 		bucket = tx.Bucket(deviceIDBucketName)
 | |
| 		key    = strconv.FormatUint(uint64(deviceID), 10)
 | |
| 		value  = []byte{byte(state)}
 | |
| 	)
 | |
| 
 | |
| 	if err := bucket.Put([]byte(key), value); err != nil {
 | |
| 		return fmt.Errorf("failed to free device id %q: %w", key, err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // UpdateDevice updates device info in metadata store.
 | |
| // The callback should be used to indicate whether device info update was successful or not.
 | |
| // An error returned from the callback will rollback the update transaction in the database.
 | |
| // Name and Device ID are not allowed to change.
 | |
| func (m *PoolMetadata) UpdateDevice(ctx context.Context, name string, fn DeviceInfoCallback) error {
 | |
| 	return m.db.Update(func(tx *bolt.Tx) error {
 | |
| 		var (
 | |
| 			device = &DeviceInfo{}
 | |
| 			bucket = tx.Bucket(devicesBucketName)
 | |
| 		)
 | |
| 
 | |
| 		if err := getObject(bucket, name, device); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// Don't allow changing these values, keep things in sync with devmapper
 | |
| 		name := device.Name
 | |
| 		devID := device.DeviceID
 | |
| 
 | |
| 		if err := fn(device); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if name != device.Name {
 | |
| 			return fmt.Errorf("failed to update device info, name didn't match: %q %q", name, device.Name)
 | |
| 		}
 | |
| 
 | |
| 		if devID != device.DeviceID {
 | |
| 			return fmt.Errorf("failed to update device info, device id didn't match: %d %d", devID, device.DeviceID)
 | |
| 		}
 | |
| 
 | |
| 		return putObject(bucket, name, device, true)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // GetDevice retrieves device info by name from database
 | |
| func (m *PoolMetadata) GetDevice(ctx context.Context, name string) (*DeviceInfo, error) {
 | |
| 	var (
 | |
| 		dev DeviceInfo
 | |
| 		err error
 | |
| 	)
 | |
| 
 | |
| 	err = m.db.View(func(tx *bolt.Tx) error {
 | |
| 		bucket := tx.Bucket(devicesBucketName)
 | |
| 		return getObject(bucket, name, &dev)
 | |
| 	})
 | |
| 
 | |
| 	return &dev, err
 | |
| }
 | |
| 
 | |
| // RemoveDevice removes device info from store.
 | |
| func (m *PoolMetadata) RemoveDevice(ctx context.Context, name string) error {
 | |
| 	return m.db.Update(func(tx *bolt.Tx) error {
 | |
| 		var (
 | |
| 			device = &DeviceInfo{}
 | |
| 			bucket = tx.Bucket(devicesBucketName)
 | |
| 		)
 | |
| 
 | |
| 		if err := getObject(bucket, name, device); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		if err := bucket.Delete([]byte(name)); err != nil {
 | |
| 			return fmt.Errorf("failed to delete device info for %q: %w", name, err)
 | |
| 		}
 | |
| 
 | |
| 		return markDeviceID(tx, device.DeviceID, deviceFree)
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // WalkDevices walks all devmapper devices in metadata store and invokes the callback with device info.
 | |
| // The provided callback function must not modify the bucket.
 | |
| func (m *PoolMetadata) WalkDevices(ctx context.Context, cb func(info *DeviceInfo) error) error {
 | |
| 	return m.db.View(func(tx *bolt.Tx) error {
 | |
| 		bucket := tx.Bucket(devicesBucketName)
 | |
| 		return bucket.ForEach(func(key, value []byte) error {
 | |
| 			device := &DeviceInfo{}
 | |
| 			if err := json.Unmarshal(value, device); err != nil {
 | |
| 				return fmt.Errorf("failed to unmarshal %s: %w", key, err)
 | |
| 			}
 | |
| 
 | |
| 			return cb(device)
 | |
| 		})
 | |
| 	})
 | |
| }
 | |
| 
 | |
| // GetDeviceNames retrieves the list of device names currently stored in database
 | |
| func (m *PoolMetadata) GetDeviceNames(ctx context.Context) ([]string, error) {
 | |
| 	var (
 | |
| 		names []string
 | |
| 		err   error
 | |
| 	)
 | |
| 
 | |
| 	err = m.db.View(func(tx *bolt.Tx) error {
 | |
| 		bucket := tx.Bucket(devicesBucketName)
 | |
| 		return bucket.ForEach(func(k, _ []byte) error {
 | |
| 			names = append(names, string(k))
 | |
| 			return nil
 | |
| 		})
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return names, nil
 | |
| }
 | |
| 
 | |
| // Close closes metadata store
 | |
| func (m *PoolMetadata) Close() error {
 | |
| 	if err := m.db.Close(); err != nil && err != bolt.ErrDatabaseNotOpen {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func putObject(bucket *bolt.Bucket, key string, obj interface{}, overwrite bool) error {
 | |
| 	keyBytes := []byte(key)
 | |
| 
 | |
| 	if !overwrite && bucket.Get(keyBytes) != nil {
 | |
| 		return fmt.Errorf("object with key %q already exists", key)
 | |
| 	}
 | |
| 
 | |
| 	data, err := json.Marshal(obj)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("failed to marshal object with key %q: %w", key, err)
 | |
| 	}
 | |
| 
 | |
| 	if err := bucket.Put(keyBytes, data); err != nil {
 | |
| 		return fmt.Errorf("failed to insert object with key %q: %w", key, err)
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func getObject(bucket *bolt.Bucket, key string, obj interface{}) error {
 | |
| 	data := bucket.Get([]byte(key))
 | |
| 	if data == nil {
 | |
| 		return ErrNotFound
 | |
| 	}
 | |
| 
 | |
| 	if obj != nil {
 | |
| 		if err := json.Unmarshal(data, obj); err != nil {
 | |
| 			return fmt.Errorf("failed to unmarshal object with key %q: %w", key, err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |