Merge pull request #858 from robertbaldyga/attach-fix-race-condition
Fix race condition during cache attach
This commit is contained in:
commit
e630b811ff
@ -162,6 +162,11 @@ void ocf_resolve_effective_cache_mode(ocf_cache_t cache,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (env_atomic_read(&cache->attach_pt)) {
|
||||||
|
req->cache_mode = ocf_req_cache_mode_pt;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (cache->pt_unaligned_io && !ocf_req_is_4k(req->addr, req->bytes)) {
|
if (cache->pt_unaligned_io && !ocf_req_is_4k(req->addr, req->bytes)) {
|
||||||
req->cache_mode = ocf_req_cache_mode_pt;
|
req->cache_mode = ocf_req_cache_mode_pt;
|
||||||
return;
|
return;
|
||||||
|
@ -1901,18 +1901,35 @@ static void _ocf_mngt_attach_shutdown_status(ocf_pipeline_t pipeline,
|
|||||||
_ocf_mngt_attach_shutdown_status_complete, context);
|
_ocf_mngt_attach_shutdown_status_complete, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void _ocf_mngt_attach_post_init_finish(void *priv)
|
||||||
|
{
|
||||||
|
struct ocf_cache_attach_context *context = priv;
|
||||||
|
ocf_cache_t cache = context->cache;
|
||||||
|
|
||||||
|
ocf_refcnt_unfreeze(&cache->refcnt.d2c);
|
||||||
|
|
||||||
|
env_atomic_set(&cache->attach_pt, 0);
|
||||||
|
|
||||||
|
ocf_cache_log(cache, log_debug, "Cache attached\n");
|
||||||
|
|
||||||
|
ocf_pipeline_next(context->pipeline);
|
||||||
|
}
|
||||||
|
|
||||||
static void _ocf_mngt_attach_post_init(ocf_pipeline_t pipeline,
|
static void _ocf_mngt_attach_post_init(ocf_pipeline_t pipeline,
|
||||||
void *priv, ocf_pipeline_arg_t arg)
|
void *priv, ocf_pipeline_arg_t arg)
|
||||||
{
|
{
|
||||||
struct ocf_cache_attach_context *context = priv;
|
struct ocf_cache_attach_context *context = priv;
|
||||||
ocf_cache_t cache = context->cache;
|
ocf_cache_t cache = context->cache;
|
||||||
|
|
||||||
|
env_atomic_set(&cache->attach_pt, 1);
|
||||||
|
|
||||||
ocf_cleaner_refcnt_unfreeze(cache);
|
ocf_cleaner_refcnt_unfreeze(cache);
|
||||||
ocf_refcnt_unfreeze(&cache->refcnt.metadata);
|
ocf_refcnt_unfreeze(&cache->refcnt.metadata);
|
||||||
|
|
||||||
ocf_cache_log(cache, log_debug, "Cache attached\n");
|
ocf_refcnt_freeze(&cache->refcnt.d2c);
|
||||||
|
ocf_refcnt_register_zero_cb(&cache->refcnt.d2c,
|
||||||
ocf_pipeline_next(pipeline);
|
_ocf_mngt_attach_post_init_finish, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void _ocf_mngt_attach_handle_error(
|
static void _ocf_mngt_attach_handle_error(
|
||||||
|
@ -106,6 +106,8 @@ struct ocf_cache {
|
|||||||
env_atomic flush_in_progress;
|
env_atomic flush_in_progress;
|
||||||
env_mutex flush_mutex;
|
env_mutex flush_mutex;
|
||||||
|
|
||||||
|
env_atomic attach_pt;
|
||||||
|
|
||||||
struct ocf_cleaner cleaner;
|
struct ocf_cleaner cleaner;
|
||||||
|
|
||||||
struct list_head io_queues;
|
struct list_head io_queues;
|
||||||
|
@ -569,7 +569,7 @@ class Cache:
|
|||||||
def free_device_config(self, cfg):
|
def free_device_config(self, cfg):
|
||||||
lib = OcfLib.getInstance().ocf_volume_destroy(cfg._volume)
|
lib = OcfLib.getInstance().ocf_volume_destroy(cfg._volume)
|
||||||
|
|
||||||
def attach_device(
|
def attach_device_async(
|
||||||
self,
|
self,
|
||||||
device,
|
device,
|
||||||
force=False,
|
force=False,
|
||||||
@ -593,15 +593,40 @@ class Cache:
|
|||||||
|
|
||||||
self.write_lock()
|
self.write_lock()
|
||||||
|
|
||||||
c = OcfCompletion([("cache", c_void_p), ("priv", c_void_p), ("error", c_int)])
|
def callback(c):
|
||||||
|
self.write_unlock()
|
||||||
|
self.free_device_config(device_config)
|
||||||
|
|
||||||
|
c = OcfCompletion(
|
||||||
|
[("cache", c_void_p), ("priv", c_void_p), ("error", c_int)],
|
||||||
|
callback=callback
|
||||||
|
)
|
||||||
|
|
||||||
self.owner.lib.ocf_mngt_cache_attach(self.cache_handle, byref(attach_cfg), c, None)
|
self.owner.lib.ocf_mngt_cache_attach(self.cache_handle, byref(attach_cfg), c, None)
|
||||||
|
|
||||||
|
return c
|
||||||
|
|
||||||
|
def attach_device(
|
||||||
|
self,
|
||||||
|
device,
|
||||||
|
force=False,
|
||||||
|
perform_test=False,
|
||||||
|
cache_line_size=None,
|
||||||
|
open_cores=False,
|
||||||
|
disable_cleaner=False,
|
||||||
|
):
|
||||||
|
|
||||||
|
c = self.attach_device_async(
|
||||||
|
device,
|
||||||
|
force,
|
||||||
|
perform_test,
|
||||||
|
cache_line_size,
|
||||||
|
open_cores,
|
||||||
|
disable_cleaner
|
||||||
|
)
|
||||||
|
|
||||||
c.wait()
|
c.wait()
|
||||||
|
|
||||||
self.write_unlock()
|
|
||||||
|
|
||||||
self.free_device_config(device_config)
|
|
||||||
|
|
||||||
if c.results["error"]:
|
if c.results["error"]:
|
||||||
raise OcfError(
|
raise OcfError(
|
||||||
f"Attaching cache device failed",
|
f"Attaching cache device failed",
|
||||||
|
@ -83,7 +83,7 @@ class OcfCompletion:
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
raise KeyError(f"No completion argument {key} specified")
|
raise KeyError(f"No completion argument {key} specified")
|
||||||
|
|
||||||
def __init__(self, completion_args: list, context=None):
|
def __init__(self, completion_args: list, context=None, callback=None):
|
||||||
"""
|
"""
|
||||||
Provide ctypes arg list, and optionally index of status argument in
|
Provide ctypes arg list, and optionally index of status argument in
|
||||||
completion function which will be extracted (default - last argument).
|
completion function which will be extracted (default - last argument).
|
||||||
@ -95,6 +95,7 @@ class OcfCompletion:
|
|||||||
self.results = OcfCompletion.CompletionResult(completion_args)
|
self.results = OcfCompletion.CompletionResult(completion_args)
|
||||||
self._as_parameter_ = self.callback
|
self._as_parameter_ = self.callback
|
||||||
self.context = context
|
self.context = context
|
||||||
|
self.user_callback = callback
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def callback(self):
|
def callback(self):
|
||||||
@ -102,6 +103,8 @@ class OcfCompletion:
|
|||||||
def complete(*args):
|
def complete(*args):
|
||||||
self.results.results = args
|
self.results.results = args
|
||||||
self.e.set()
|
self.e.set()
|
||||||
|
if self.user_callback:
|
||||||
|
self.user_callback(self)
|
||||||
|
|
||||||
return complete
|
return complete
|
||||||
|
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
#
|
#
|
||||||
|
|
||||||
|
|
||||||
|
from time import sleep
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
@ -18,7 +19,6 @@ from pyocf.utils import Size
|
|||||||
CORE_SIZE = 4096
|
CORE_SIZE = 4096
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.xfail(reason="Data corruption when switching from D2C")
|
|
||||||
def test_d2c_io(pyocf_ctx):
|
def test_d2c_io(pyocf_ctx):
|
||||||
"""
|
"""
|
||||||
Start cache in D2C
|
Start cache in D2C
|
||||||
@ -46,7 +46,8 @@ def test_d2c_io(pyocf_ctx):
|
|||||||
d2c_data.write(b"a" * CORE_SIZE, CORE_SIZE)
|
d2c_data.write(b"a" * CORE_SIZE, CORE_SIZE)
|
||||||
d2c_io.set_data(d2c_data)
|
d2c_io.set_data(d2c_data)
|
||||||
|
|
||||||
cache.attach_device(cache_device)
|
c = cache.attach_device_async(cache_device)
|
||||||
|
sleep(1)
|
||||||
|
|
||||||
wt_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.WRITE, 0, 0)
|
wt_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.WRITE, 0, 0)
|
||||||
wt_data = Data(CORE_SIZE)
|
wt_data = Data(CORE_SIZE)
|
||||||
@ -55,11 +56,19 @@ def test_d2c_io(pyocf_ctx):
|
|||||||
|
|
||||||
wt_completion = Sync(wt_io).submit()
|
wt_completion = Sync(wt_io).submit()
|
||||||
assert int(wt_completion.results["err"]) == 0
|
assert int(wt_completion.results["err"]) == 0
|
||||||
assert cache.get_stats()["req"]["wr_full_misses"]["value"] == 1
|
|
||||||
|
|
||||||
d2c_completion = Sync(d2c_io).submit()
|
d2c_completion = Sync(d2c_io).submit()
|
||||||
assert int(d2c_completion.results["err"]) == 0
|
assert int(d2c_completion.results["err"]) == 0
|
||||||
assert cache.get_stats()["req"]["wr_pt"]["value"] == 1
|
|
||||||
|
c.wait()
|
||||||
|
|
||||||
|
if c.results["error"]:
|
||||||
|
raise OcfError(
|
||||||
|
f"Attaching cache device failed",
|
||||||
|
c.results["error"],
|
||||||
|
)
|
||||||
|
|
||||||
|
assert cache.get_stats()["req"]["wr_pt"]["value"] == 2
|
||||||
|
|
||||||
read_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.READ, 0, 0)
|
read_io = vol.new_io(queue, 0, CORE_SIZE, IoDir.READ, 0, 0)
|
||||||
read_data = Data(CORE_SIZE)
|
read_data = Data(CORE_SIZE)
|
||||||
@ -67,6 +76,7 @@ def test_d2c_io(pyocf_ctx):
|
|||||||
|
|
||||||
read_completion = Sync(read_io).submit()
|
read_completion = Sync(read_io).submit()
|
||||||
assert int(read_completion.results["err"]) == 0
|
assert int(read_completion.results["err"]) == 0
|
||||||
|
assert cache.get_stats()["req"]["rd_full_misses"]["value"] == 1
|
||||||
|
|
||||||
cache.stop()
|
cache.stop()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user