| @@ -280,6 +280,22 @@ func (m *PoolMetadata) RemoveDevice(ctx context.Context, name string) error { | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // WalkDevices walks all devmapper devices in metadata store and invokes the callback with device info. | ||||||
|  | // The provided callback function must not modify the bucket. | ||||||
|  | func (m *PoolMetadata) WalkDevices(ctx context.Context, cb func(info *DeviceInfo) error) error { | ||||||
|  | 	return m.db.View(func(tx *bolt.Tx) error { | ||||||
|  | 		bucket := tx.Bucket(devicesBucketName) | ||||||
|  | 		return bucket.ForEach(func(key, value []byte) error { | ||||||
|  | 			device := &DeviceInfo{} | ||||||
|  | 			if err := json.Unmarshal(value, device); err != nil { | ||||||
|  | 				return errors.Wrapf(err, "failed to unmarshal %s", key) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			return cb(device) | ||||||
|  | 		}) | ||||||
|  | 	}) | ||||||
|  | } | ||||||
|  |  | ||||||
| // GetDeviceNames retrieves the list of device names currently stored in database | // GetDeviceNames retrieves the list of device names currently stored in database | ||||||
| func (m *PoolMetadata) GetDeviceNames(ctx context.Context) ([]string, error) { | func (m *PoolMetadata) GetDeviceNames(ctx context.Context) ([]string, error) { | ||||||
| 	var ( | 	var ( | ||||||
|   | |||||||
| @@ -181,6 +181,38 @@ func TestPoolMetadata_MarkFaulty(t *testing.T) { | |||||||
| 	assert.NilError(t, err) | 	assert.NilError(t, err) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestPoolMetadata_WalkDevices(t *testing.T) { | ||||||
|  | 	tempDir, store := createStore(t) | ||||||
|  | 	defer cleanupStore(t, tempDir, store) | ||||||
|  |  | ||||||
|  | 	err := store.AddDevice(testCtx, &DeviceInfo{Name: "device1", DeviceID: 1, State: Created}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  |  | ||||||
|  | 	err = store.AddDevice(testCtx, &DeviceInfo{Name: "device2", DeviceID: 2, State: Faulty}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  |  | ||||||
|  | 	called := 0 | ||||||
|  | 	err = store.WalkDevices(testCtx, func(info *DeviceInfo) error { | ||||||
|  | 		called++ | ||||||
|  | 		switch called { | ||||||
|  | 		case 1: | ||||||
|  | 			assert.Equal(t, "device1", info.Name) | ||||||
|  | 			assert.Equal(t, uint32(1), info.DeviceID) | ||||||
|  | 			assert.Equal(t, Created, info.State) | ||||||
|  | 		case 2: | ||||||
|  | 			assert.Equal(t, "device2", info.Name) | ||||||
|  | 			assert.Equal(t, uint32(2), info.DeviceID) | ||||||
|  | 			assert.Equal(t, Faulty, info.State) | ||||||
|  | 		default: | ||||||
|  | 			t.Error("unexpected walk call") | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  | 	assert.Equal(t, called, 2) | ||||||
|  | } | ||||||
|  |  | ||||||
| func TestPoolMetadata_GetDeviceNames(t *testing.T) { | func TestPoolMetadata_GetDeviceNames(t *testing.T) { | ||||||
| 	tempDir, store := createStore(t) | 	tempDir, store := createStore(t) | ||||||
| 	defer cleanupStore(t, tempDir, store) | 	defer cleanupStore(t, tempDir, store) | ||||||
|   | |||||||
| @@ -61,10 +61,47 @@ func NewPoolDevice(ctx context.Context, config *Config) (*PoolDevice, error) { | |||||||
| 		return nil, errors.Wrapf(err, "failed to query pool %q", poolPath) | 		return nil, errors.Wrapf(err, "failed to query pool %q", poolPath) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return &PoolDevice{ | 	poolDevice := &PoolDevice{ | ||||||
| 		poolName: config.PoolName, | 		poolName: config.PoolName, | ||||||
| 		metadata: poolMetaStore, | 		metadata: poolMetaStore, | ||||||
| 	}, nil | 	} | ||||||
|  |  | ||||||
|  | 	if err := poolDevice.ensureDeviceStates(ctx); err != nil { | ||||||
|  | 		return nil, errors.Wrap(err, "failed to check devices state") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return poolDevice, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // ensureDeviceStates marks devices with incomplete states (after crash) as 'Faulty' | ||||||
|  | func (p *PoolDevice) ensureDeviceStates(ctx context.Context) error { | ||||||
|  | 	var devices []*DeviceInfo | ||||||
|  |  | ||||||
|  | 	if err := p.metadata.WalkDevices(ctx, func(info *DeviceInfo) error { | ||||||
|  | 		switch info.State { | ||||||
|  | 		case Activated, Suspended, Resumed, Deactivated, Removed, Faulty: | ||||||
|  | 			return nil | ||||||
|  | 		} | ||||||
|  | 		devices = append(devices, info) | ||||||
|  | 		return nil | ||||||
|  | 	}); err != nil { | ||||||
|  | 		return errors.Wrap(err, "failed to query devices from metastore") | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var result *multierror.Error | ||||||
|  | 	for _, dev := range devices { | ||||||
|  | 		log.G(ctx). | ||||||
|  | 			WithField("dev_id", dev.DeviceID). | ||||||
|  | 			WithField("parent", dev.ParentName). | ||||||
|  | 			WithField("error", dev.Error). | ||||||
|  | 			Warnf("devmapper device %q has invalid state %q, marking as faulty", dev.Name, dev.State) | ||||||
|  |  | ||||||
|  | 		if err := p.metadata.MarkFaulty(ctx, dev.Name); err != nil { | ||||||
|  | 			result = multierror.Append(result, err) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return multierror.Prefix(result.ErrorOrNil(), "devmapper:") | ||||||
| } | } | ||||||
|  |  | ||||||
| // transition invokes 'updateStateFn' callback to perform devmapper operation and reflects device state changes/errors in meta store. | // transition invokes 'updateStateFn' callback to perform devmapper operation and reflects device state changes/errors in meta store. | ||||||
|   | |||||||
| @@ -154,6 +154,41 @@ func TestPoolDevice(t *testing.T) { | |||||||
| 	}) | 	}) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestPoolDeviceMarkFaulty(t *testing.T) { | ||||||
|  | 	tempDir, store := createStore(t) | ||||||
|  | 	defer cleanupStore(t, tempDir, store) | ||||||
|  |  | ||||||
|  | 	err := store.AddDevice(testCtx, &DeviceInfo{Name: "1", State: Unknown}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  |  | ||||||
|  | 	err = store.AddDevice(testCtx, &DeviceInfo{Name: "2", State: Activated}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  |  | ||||||
|  | 	pool := &PoolDevice{metadata: store} | ||||||
|  | 	err = pool.ensureDeviceStates(testCtx) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  |  | ||||||
|  | 	called := 0 | ||||||
|  | 	err = pool.metadata.WalkDevices(testCtx, func(info *DeviceInfo) error { | ||||||
|  | 		called++ | ||||||
|  |  | ||||||
|  | 		switch called { | ||||||
|  | 		case 1: | ||||||
|  | 			assert.Equal(t, Faulty, info.State) | ||||||
|  | 			assert.Equal(t, "1", info.Name) | ||||||
|  | 		case 2: | ||||||
|  | 			assert.Equal(t, Activated, info.State) | ||||||
|  | 			assert.Equal(t, "2", info.Name) | ||||||
|  | 		default: | ||||||
|  | 			t.Error("unexpected walk call") | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		return nil | ||||||
|  | 	}) | ||||||
|  | 	assert.NilError(t, err) | ||||||
|  | 	assert.Equal(t, 2, called) | ||||||
|  | } | ||||||
|  |  | ||||||
| func testCreateThinDevice(t *testing.T, pool *PoolDevice) { | func testCreateThinDevice(t *testing.T, pool *PoolDevice) { | ||||||
| 	ctx := context.Background() | 	ctx := context.Background() | ||||||
|  |  | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Maksym Pavlenko
					Maksym Pavlenko