pyocf: Add tests for detaching core

Signed-off-by: Michal Mielewczyk <michal.mielewczyk@huawei.com>
This commit is contained in:
Michal Mielewczyk 2025-03-26 11:36:15 +01:00
parent 047b276fc8
commit baccc5560b

View File

@ -1,6 +1,6 @@
# #
# Copyright(c) 2019-2022 Intel Corporation # Copyright(c) 2019-2022 Intel Corporation
# Copyright(c) 2024 Huawei Technologies # Copyright(c) 2024-2025 Huawei Technologies
# SPDX-License-Identifier: BSD-3-Clause # SPDX-License-Identifier: BSD-3-Clause
# #
@ -8,7 +8,7 @@ import pytest
from ctypes import c_int from ctypes import c_int
from random import randint from random import randint
from pyocf.types.cache import Cache, CacheMode from pyocf.types.cache import Cache, CacheMode, CleaningPolicy, PromotionPolicy
from pyocf.types.core import Core from pyocf.types.core import Core
from pyocf.types.volume import RamVolume, Volume from pyocf.types.volume import RamVolume, Volume
from pyocf.types.volume_core import CoreVolume from pyocf.types.volume_core import CoreVolume
@ -16,7 +16,7 @@ from pyocf.types.data import Data
from pyocf.types.io import IoDir, Sync from pyocf.types.io import IoDir, Sync
from pyocf.types.queue import Queue from pyocf.types.queue import Queue
from pyocf.utils import Size as S from pyocf.utils import Size as S
from pyocf.types.shared import OcfError, OcfCompletion, CacheLineSize from pyocf.types.shared import OcfError, OcfErrorCode, OcfCompletion, CacheLineSize
@pytest.mark.parametrize("cache_mode", CacheMode) @pytest.mark.parametrize("cache_mode", CacheMode)
@ -89,6 +89,190 @@ def test_remove_dirty_no_flush(pyocf_ctx, cache_mode, cls):
cache.remove_core(core) cache.remove_core(core)
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_detach_core_detach_cache_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
core_1.detach()
with pytest.raises(OcfError, match="OCF_ERR_CACHE_IN_INCOMPLETE_STATE"):
cache.detach_device()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_detach_core_stop_cache_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
core_1.detach()
cache.stop()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_detach_cache_detach_core_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
for core in [core_1, core_2]:
vol = CoreVolume(core)
queue = core.cache.get_default_queue()
core_size = core.get_stats()["size"]
data = Data(core_size.B)
_io_to_core(vol, queue, data)
core_1.detach()
cache.stop()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_detach_cache_retach_core_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
def _write_cores(cores_list):
for core in cores_list:
vol = CoreVolume(core)
queue = core.cache.get_default_queue()
core_size = core.get_stats()["size"]
data = Data(core_size.B)
_io_to_core(vol, queue, data)
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
cache.detach_device()
core_1.detach()
_write_cores([core_2])
cache.attach_device(cache_device)
_write_cores([core_2])
cache.add_core(core_1, try_add=True)
_write_cores([core_1, core_2])
cache.stop()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_reattach_cache_reattach_core_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
def _write_cores(cores_list):
for core in cores_list:
vol = CoreVolume(core)
queue = core.cache.get_default_queue()
core_size = core.get_stats()["size"]
data = Data(core_size.B)
_io_to_core(vol, queue, data)
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
cache.detach_device()
core_1.detach()
cache.add_core(core_1, try_add=True)
cache.attach_device(cache_device)
_write_cores([core_1, core_2])
cache.stop()
@pytest.mark.parametrize("cleaning_policy", CleaningPolicy)
@pytest.mark.parametrize("promotion_policy", PromotionPolicy)
def test_detach_cache_detach_core_load_cleaning(pyocf_ctx, cleaning_policy, promotion_policy):
cache_device = RamVolume(S.from_MiB(100))
core_device_1 = RamVolume(S.from_MiB(10))
core_device_2 = RamVolume(S.from_MiB(10))
cache = Cache.start_on_device(cache_device, cache_mode=CacheMode.WB)
core_1 = Core.using_device(core_device_1, name="core_1")
core_2 = Core.using_device(core_device_2, name="core_2")
cache.add_core(core_1)
cache.add_core(core_2)
cache.set_cleaning_policy(cleaning_policy)
cache.set_promotion_policy(promotion_policy)
core_1.detach()
cache.stop()
cache = Cache.load_from_device(cache_device)
def test_30add_remove(pyocf_ctx): def test_30add_remove(pyocf_ctx):
# Start cache device # Start cache device
cache_device = RamVolume(S.from_MiB(50)) cache_device = RamVolume(S.from_MiB(50))