Merge pull request #559 from jfckm/pyocf-loading

Make loading in pyocf viable
This commit is contained in:
Adam Rutkowski 2022-02-24 14:08:09 +01:00 committed by GitHub
commit 021e7e87ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 222 additions and 114 deletions

View File

@ -14,6 +14,7 @@
#include "ocf_types.h"
#include "ocf_env_headers.h"
#include "ocf/ocf_err.h"
#include "ocf/ocf_io.h"
struct ocf_io;

View File

@ -38,6 +38,7 @@ from .stats.cache import CacheInfo
from .ioclass import IoClassesInfo, IoClassInfo
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
from .ctx import OcfCtx
from .volume import Volume
class Backfill(Structure):
@ -176,37 +177,53 @@ class Cache:
metadata_volatile: bool = False,
max_queue_size: int = DEFAULT_BACKFILL_QUEUE_SIZE,
queue_unblock_size: int = DEFAULT_BACKFILL_UNBLOCK,
locked: bool = False,
pt_unaligned_io: bool = DEFAULT_PT_UNALIGNED_IO,
use_submit_fast: bool = DEFAULT_USE_SUBMIT_FAST,
):
self.device = None
self.started = False
self.owner = owner
self.cache_line_size = cache_line_size
self.cfg = CacheConfig(
_name=name.encode("ascii"),
_cache_mode=cache_mode,
_promotion_policy=promotion_policy,
_cache_line_size=cache_line_size,
_metadata_layout=metadata_layout,
_metadata_volatile=metadata_volatile,
_backfill=Backfill(
_max_queue_size=max_queue_size, _queue_unblock_size=queue_unblock_size
),
_locked=locked,
_pt_unaligned_io=pt_unaligned_io,
_use_submit_fast=use_submit_fast,
)
self.name = name
self.cache_mode = cache_mode
self.promotion_policy = promotion_policy
self.cache_line_size = cache_line_size
self.metadata_layout = metadata_layout
self.metadata_volatile = metadata_volatile
self.max_queue_size = max_queue_size
self.queue_unblock_size = queue_unblock_size
self.pt_unaligned_io = pt_unaligned_io
self.use_submit_fast = use_submit_fast
self.cache_handle = c_void_p()
self._as_parameter_ = self.cache_handle
self.io_queues = []
self.cores = []
def start_cache(self, default_io_queue: Queue = None, mngt_queue: Queue = None):
def start_cache(
self,
default_io_queue: Queue = None,
mngt_queue: Queue = None,
locked: bool = False,
):
cfg = CacheConfig(
_name=self.name.encode("ascii"),
_cache_mode=self.cache_mode,
_promotion_policy=self.promotion_policy,
_cache_line_size=self.cache_line_size,
_metadata_layout=self.metadata_layout,
_metadata_volatile=self.metadata_volatile,
_backfill=Backfill(
_max_queue_size=self.max_queue_size,
_queue_unblock_size=self.queue_unblock_size,
),
_locked=locked,
_pt_unaligned_io=self.pt_unaligned_io,
_use_submit_fast=self.use_submit_fast,
)
status = self.owner.lib.ocf_mngt_cache_start(
self.owner.ctx_handle, byref(self.cache_handle), byref(self.cfg), None
self.owner.ctx_handle, byref(self.cache_handle), byref(cfg), None
)
if status:
raise OcfError("Creating cache instance failed", status)
@ -427,12 +444,13 @@ class Cache:
if status:
raise OcfError("Error adding partition to cache", status)
def configure_device(
def generate_attach_config(
self,
device,
force=False,
perform_test=True,
cache_line_size=None,
discard=False,
open_cores=True,
):
self.device = device
@ -440,17 +458,15 @@ class Cache:
device_config = CacheDeviceConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device_name.encode("ascii")), c_char_p
),
_size=len(self.device_name) + 1,
_data=cast(create_string_buffer(device.uuid.encode("ascii")), c_char_p),
_size=len(device.uuid) + 1,
),
_volume_type=device.type_id,
_perform_test=perform_test,
_volume_params=None,
)
self.dev_cfg = CacheAttachConfig(
attach_cfg = CacheAttachConfig(
_device=device_config,
_cache_line_size=cache_line_size
if cache_line_size
@ -460,23 +476,28 @@ class Cache:
_discard_on_start=False,
)
return attach_cfg
def attach_device(
self,
device,
force=False,
perform_test=False,
cache_line_size=None,
open_cores=True,
self, device, force=False, perform_test=False, cache_line_size=None, open_cores=True
):
self.configure_device(
device, force, perform_test, cache_line_size, open_cores=open_cores
attach_cfg = self.generate_attach_config(
device,
force,
perform_test,
cache_line_size,
open_cores=open_cores
)
self.device = device
self.write_lock()
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
self.owner.lib.ocf_mngt_cache_attach(
self.cache_handle, byref(self.dev_cfg), c, None
self.cache_handle, byref(attach_cfg), c, None
)
c.wait()
@ -500,10 +521,13 @@ class Cache:
raise OcfError("Attaching cache device failed", c.results["error"])
def load_cache(self, device, open_cores=True):
self.configure_device(device, open_cores=open_cores)
attach_cfg = self.generate_attach_config(device, open_cores=open_cores)
self.device = device
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
self.owner.lib.ocf_mngt_cache_load(
self.cache_handle, byref(self.dev_cfg), c, None
self.cache_handle, byref(attach_cfg), c, None
)
c.wait()
@ -570,7 +594,32 @@ class Cache:
def write_unlock(self):
self.owner.lib.ocf_mngt_cache_unlock(self.cache_handle)
def add_core(self, core: Core):
def get_core_by_name(self, name: str):
core_handle = c_void_p()
result = self.owner.lib.ocf_core_get_by_name(
self.cache_handle,
name.encode("ascii"),
len(name),
byref(core_handle),
)
if result != 0:
raise OcfError("Failed getting core by name", result)
uuid = self.owner.lib.ocf_core_get_uuid_wrapper(core_handle)
device = Volume.get_by_uuid(uuid.contents._data.decode("ascii"))
core = Core(device)
core.cache = self
core.handle = core_handle
self.cores.append(core)
return core
def add_core(self, core: Core, try_add=False):
cfg = core.get_config()
cfg._try_add = try_add
self.write_lock()
c = OcfCompletion(
@ -582,9 +631,7 @@ class Cache:
]
)
self.owner.lib.ocf_mngt_cache_add_core(
self.cache_handle, byref(core.get_cfg()), c, None
)
self.owner.lib.ocf_mngt_cache_add_core(self.cache_handle, byref(cfg), c, None)
c.wait()
if c.results["error"]:
@ -716,6 +763,11 @@ class Cache:
self.mngt_queue.put()
del self.io_queues[:]
self.started = False
for core in self.cores:
core.cache = None
core.handle = None
self.write_unlock()
self.device = None
self.started = False

View File

@ -19,6 +19,7 @@ from ctypes import (
cast,
byref,
create_string_buffer,
POINTER,
)
from datetime import timedelta
@ -58,39 +59,42 @@ class Core:
def __init__(
self,
device: Volume,
try_add: bool,
name: str = "core",
seq_cutoff_threshold: int = DEFAULT_SEQ_CUTOFF_THRESHOLD,
seq_cutoff_promotion_count: int = DEFAULT_SEQ_CUTOFF_PROMOTION_COUNT,
):
self.cache = None
self.device = device
self.device_name = device.uuid
self.name = name
self.seq_cutoff_threshold = seq_cutoff_threshold
self.seq_cutoff_promotion_count = seq_cutoff_promotion_count
self.handle = c_void_p()
self.cfg = CoreConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device_name.encode("ascii")),
c_char_p,
),
_size=len(self.device_name) + 1,
),
_name=name.encode("ascii"),
_volume_type=self.device.type_id,
_try_add=try_add,
_seq_cutoff_threshold=seq_cutoff_threshold,
_seq_cutoff_promotion_count=seq_cutoff_promotion_count,
_user_metadata=UserMetadata(_data=None, _size=0),
)
@classmethod
def using_device(cls, device, **kwargs):
c = cls(device=device, try_add=False, **kwargs)
c = cls(device=device, **kwargs)
return c
def get_cfg(self):
return self.cfg
def get_config(self):
cfg = CoreConfig(
_uuid=Uuid(
_data=cast(
create_string_buffer(self.device.uuid.encode("ascii")),
c_char_p,
),
_size=len(self.device.uuid) + 1,
),
_name=self.name.encode("ascii"),
_volume_type=self.device.type_id,
_try_add=False,
_seq_cutoff_threshold=self.seq_cutoff_threshold,
_seq_cutoff_promotion_count=self.seq_cutoff_promotion_count,
_user_metadata=UserMetadata(_data=None, _size=0),
)
return cfg
def get_handle(self):
return self.handle
@ -218,6 +222,8 @@ class Core:
lib = OcfLib.getInstance()
lib.ocf_core_get_uuid_wrapper.restype = POINTER(Uuid)
lib.ocf_core_get_uuid_wrapper.argtypes = [c_void_p]
lib.ocf_core_get_volume.restype = c_void_p
lib.ocf_volume_new_io.argtypes = [
c_void_p,

View File

@ -0,0 +1,12 @@
/*
* Copyright(c) 2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "ocf/ocf_mngt.h"
void ocf_mngt_cache_config_set_default_wrapper(struct ocf_mngt_cache_config *cfg)
{
ocf_mngt_cache_config_set_default(cfg);
}

View File

@ -84,3 +84,45 @@ def test_load_cache_recovery(pyocf_ctx):
cache.stop()
cache = Cache.load_from_device(device_copy)
@pytest.mark.parametrize("open_cores", [True, False])
def test_load_cache_with_cores(pyocf_ctx, open_cores):
cache_device = Volume(S.from_MiB(40))
core_device = Volume(S.from_MiB(40))
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device, name="test_core")
cache.add_core(core)
write_data = Data.from_string("This is test data")
io = core.new_io(cache.get_default_queue(), S.from_sector(3).B,
write_data.size, IoDir.WRITE, 0, 0)
io.set_data(write_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
cache.stop()
cache = Cache.load_from_device(cache_device, open_cores=open_cores)
if not open_cores:
cache.add_core(core, try_add=True)
else:
core = cache.get_core_by_name("test_core")
read_data = Data(write_data.size)
io = core.new_io(cache.get_default_queue(), S.from_sector(3).B,
read_data.size, IoDir.READ, 0, 0)
io.set_data(read_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
io.submit()
cmpl.wait()
assert read_data.md5() == write_data.md5()
assert core.exp_obj_md5() == core_device.md5()

View File

@ -11,7 +11,15 @@ from itertools import count
import pytest
from pyocf.ocf import OcfLib
from pyocf.types.cache import Cache, CacheMode, MetadataLayout, CleaningPolicy
from pyocf.types.cache import (
Cache,
CacheMode,
MetadataLayout,
CleaningPolicy,
CacheConfig,
PromotionPolicy,
Backfill
)
from pyocf.types.core import Core
from pyocf.types.data import Data
from pyocf.types.io import IoDir
@ -387,12 +395,14 @@ def test_start_too_small_device(pyocf_ctx, mode, cls):
def test_start_stop_noqueue(pyocf_ctx):
# cache object just to construct cfg conveniently
_cache = Cache(pyocf_ctx.ctx_handle)
cfg = CacheConfig()
pyocf_ctx.lib.ocf_mngt_cache_config_set_default_wrapper(byref(cfg))
cfg._metadata_volatile = True
cfg._name = "Test".encode("ascii")
cache_handle = c_void_p()
status = pyocf_ctx.lib.ocf_mngt_cache_start(
pyocf_ctx.ctx_handle, byref(cache_handle), byref(_cache.cfg), None
pyocf_ctx.ctx_handle, byref(cache_handle), byref(cfg), None
)
assert not status, "Failed to start cache: {}".format(status)

View File

@ -102,9 +102,7 @@ def mngmt_op_surprise_shutdown_test(
# disable error injection and load the cache
device.disarm()
# load cache with open_cores = False to allow consistency check to add
# core with WA for pyocf object management
cache = Cache.load_from_device(device, open_cores=False)
cache = Cache.load_from_device(device, open_cores=True)
# run consistency check
if consistency_check_func is not None:
@ -127,7 +125,7 @@ def test_surprise_shutdown_add_core(pyocf_ctx):
assert stats["conf"]["core_count"] == (0 if error_triggered else 1)
def tested_func(cache):
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
cache.add_core(core)
def check_func(cache, error_triggered):
@ -161,7 +159,7 @@ def test_surprise_shutdown_remove_core(pyocf_ctx):
def test_surprise_shutdown_remove_core_with_data(pyocf_ctx):
io_offset = mngmt_op_surprise_shutdown_test_io_offset
core_device = Volume(S.from_MiB(10))
core = Core.using_device(core_device)
core = Core.using_device(core_device, name="core1")
def prepare_func(cache):
cache.add_core(core)
@ -176,8 +174,7 @@ def test_surprise_shutdown_remove_core_with_data(pyocf_ctx):
if stats["conf"]["core_count"] == 0:
assert core_device.get_bytes()[io_offset] == 0xAA
else:
core = Core(device=core_device, try_add=True)
cache.add_core(core)
core = cache.get_core_by_name("core1")
assert ocf_read(cache, core, io_offset) == 0xAA
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare_func, check_func)
@ -204,24 +201,16 @@ def test_surprise_shutdown_swap_core(pyocf_ctx):
def check_func(cache, error_triggered):
stats = cache.get_stats()
assert stats["conf"]["core_count"] == (0 if error_triggered else 1)
core1_ptr = c_void_p()
core2_ptr = c_void_p()
ret1 = OcfLib.getInstance().ocf_core_get_by_name(
cache, "core1".encode("utf-8"), 6, byref(core1_ptr)
)
ret2 = OcfLib.getInstance().ocf_core_get_by_name(
cache, "core2".encode("utf-8"), 6, byref(core2_ptr)
)
assert ret1 != 0
with pytest.raises(OcfError):
core1 = cache.get_core_by_name("core1")
if error_triggered:
assert ret2 != 0
with pytest.raises(OcfError):
core2 = cache.get_core_by_name("core2")
else:
assert ret2 == 0
uuid_ptr = cast(
cache.owner.lib.ocf_core_get_uuid_wrapper(core2_ptr), POINTER(Uuid)
)
uuid = str(uuid_ptr.contents._data, encoding="ascii")
assert uuid == "dev2"
core2 = cache.get_core_by_name("core2")
assert core2.device.uuid == "dev2"
mngmt_op_surprise_shutdown_test(pyocf_ctx, tested_func, prepare, check_func)
@ -248,23 +237,19 @@ def test_surprise_shutdown_swap_core_with_data(pyocf_ctx):
def check_func(cache, error_triggered):
stats = cache.get_stats()
assert stats["conf"]["core_count"] == (0 if error_triggered else 1)
core1_ptr = c_void_p()
core2_ptr = c_void_p()
ret1 = OcfLib.getInstance().ocf_core_get_by_name(
cache, "core1".encode("utf-8"), 6, byref(core1_ptr)
)
ret2 = OcfLib.getInstance().ocf_core_get_by_name(
cache, "core2".encode("utf-8"), 6, byref(core2_ptr)
)
assert ret1 != 0
if ret2 == 0:
uuid_ptr = cast(
cache.owner.lib.ocf_core_get_uuid_wrapper(core2_ptr), POINTER(Uuid)
)
uuid = str(uuid_ptr.contents._data, encoding="ascii")
assert uuid == "dev2"
core2 = Core(device=core_device_2, try_add=True, name="core2")
cache.add_core(core2)
with pytest.raises(OcfError):
core1 = cache.get_core_by_name("core1")
core2 = None
if error_triggered:
with pytest.raises(OcfError):
core2 = cache.get_core_by_name("core2")
else:
core2 = cache.get_core_by_name("core2")
if core2 is not None:
assert core2.device.uuid == "dev2"
assert (
ocf_read(cache, core2, mngmt_op_surprise_shutdown_test_io_offset)
== Volume.VOLUME_POISON
@ -341,7 +326,7 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
# setup cache and insert some data
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
cache.add_core(core)
ocf_write(cache, core, 0xAA, io_offset)
@ -374,8 +359,8 @@ def test_surprise_shutdown_stop_cache(pyocf_ctx):
stats = cache.get_stats()
if stats["conf"]["core_count"] == 1:
assert stats["usage"]["occupancy"]["value"] == 1
core = Core(device=core_device, try_add=True)
cache.add_core(core)
core = Core(device=core_device)
cache.add_core(core, try_add=True)
assert ocf_read(cache, core, io_offset) == 0xAA
cache.stop()
@ -401,7 +386,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_ctx):
# start WB
cache = Cache.start_on_device(device, cache_mode=CacheMode.WB)
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
cache.add_core(core)
# insert dirty cacheline
@ -456,7 +441,7 @@ def test_surprise_shutdown_cache_reinit(pyocf_ctx):
def _test_surprise_shutdown_mngmt_generic(pyocf_ctx, func):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
def prepare(cache):
cache.add_core(core)
@ -480,7 +465,7 @@ def test_surprise_shutdown_change_cache_mode(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_cleaning_policy(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
for c1 in CleaningPolicy:
for c2 in CleaningPolicy:
@ -501,7 +486,7 @@ def test_surprise_shutdown_set_cleaning_policy(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_seq_cut_off_policy(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
for s1 in SeqCutOffPolicy:
for s2 in SeqCutOffPolicy:
@ -538,7 +523,7 @@ def test_surprise_shutdown_set_seq_cut_off_threshold(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
for pol in CleaningPolicy:
if pol == CleaningPolicy.NOP:
@ -590,7 +575,7 @@ def test_surprise_shutdown_set_cleaning_policy_param(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_promotion_policy(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
for pp1 in PromotionPolicy:
for pp2 in PromotionPolicy:
@ -611,7 +596,7 @@ def test_surprise_shutdown_set_promotion_policy(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_promotion_policy_param(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
for pp in PromotionPolicy:
if pp == PromotionPolicy.ALWAYS:
@ -649,7 +634,7 @@ def test_surprise_shutdown_set_promotion_policy_param(pyocf_ctx):
@pytest.mark.long
def test_surprise_shutdown_set_io_class_config(pyocf_ctx):
core_device = Volume(S.from_MiB(10))
core = Core(device=core_device, try_add=False)
core = Core(device=core_device)
class_range = range(0, IoClassesInfo.MAX_IO_CLASSES)
old_ioclass = [