Merge pull request #726 from arutk/fipm

flush handling fixes and enhanced tests
This commit is contained in:
Adam Rutkowski 2022-06-02 10:46:36 +02:00 committed by GitHub
commit 5f767dd618
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 249 additions and 8 deletions

View File

@ -1,5 +1,5 @@
/* /*
* Copyright(c) 2012-2021 Intel Corporation * Copyright(c) 2012-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause * SPDX-License-Identifier: BSD-3-Clause
*/ */
#include "ocf/ocf.h" #include "ocf/ocf.h"
@ -37,10 +37,6 @@ static void _ocf_engine_ops_complete(struct ocf_request *req, int error)
int ocf_engine_ops(struct ocf_request *req) int ocf_engine_ops(struct ocf_request *req)
{ {
struct ocf_cache *cache = req->cache;
OCF_DEBUG_TRACE(req->cache);
/* Get OCF request - increase reference counter */ /* Get OCF request - increase reference counter */
ocf_req_get(req); ocf_req_get(req);
@ -51,8 +47,9 @@ int ocf_engine_ops(struct ocf_request *req)
ocf_submit_volume_req(&req->core->volume, req, ocf_submit_volume_req(&req->core->volume, req,
_ocf_engine_ops_complete); _ocf_engine_ops_complete);
ocf_submit_cache_reqs(cache, req, req->rw, 0, req->byte_length,
1, _ocf_engine_ops_complete); /* submit flush to cache device */
ocf_submit_cache_flush(req, _ocf_engine_ops_complete);
/* Put OCF request - decrease reference counter */ /* Put OCF request - decrease reference counter */
ocf_req_put(req); ocf_req_put(req);

View File

@ -224,6 +224,23 @@ static void ocf_submit_volume_req_cmpl(struct ocf_io *io, int error)
ocf_io_put(io); ocf_io_put(io);
} }
void ocf_submit_cache_flush(struct ocf_request *req, ocf_req_end_t callback)
{
uint64_t flags = req->ioi.io.flags;
struct ocf_io *io;
io = ocf_new_cache_io(req->cache, req->io_queue, 0, 0, OCF_WRITE, 0,
flags);
if (!io) {
callback(req, -OCF_ERR_NO_MEM);
return;
}
ocf_io_set_cmpl(io, req, callback, ocf_submit_volume_req_cmpl);
ocf_volume_submit_flush(io);
}
void ocf_submit_cache_reqs(struct ocf_cache *cache, void ocf_submit_cache_reqs(struct ocf_cache *cache,
struct ocf_request *req, int dir, uint64_t offset, struct ocf_request *req, int dir, uint64_t offset,
uint64_t size, unsigned int reqs, ocf_req_end_t callback) uint64_t size, unsigned int reqs, ocf_req_end_t callback)

View File

@ -1,5 +1,5 @@
/* /*
* Copyright(c) 2012-2021 Intel Corporation * Copyright(c) 2012-2022 Intel Corporation
* SPDX-License-Identifier: BSD-3-Clause * SPDX-License-Identifier: BSD-3-Clause
*/ */
@ -64,6 +64,8 @@ void ocf_submit_cache_reqs(struct ocf_cache *cache,
struct ocf_request *req, int dir, uint64_t offset, struct ocf_request *req, int dir, uint64_t offset,
uint64_t size, unsigned int reqs, ocf_req_end_t callback); uint64_t size, unsigned int reqs, ocf_req_end_t callback);
void ocf_submit_cache_flush(struct ocf_request *req, ocf_req_end_t callback);
static inline struct ocf_io *ocf_new_cache_io(ocf_cache_t cache, static inline struct ocf_io *ocf_new_cache_io(ocf_cache_t cache,
ocf_queue_t queue, uint64_t addr, uint32_t bytes, ocf_queue_t queue, uint64_t addr, uint32_t bytes,
uint32_t dir, uint32_t io_class, uint64_t flags) uint32_t dir, uint32_t io_class, uint64_t flags)

View File

@ -102,6 +102,12 @@ class Io(Structure):
def submit_discard(self): def submit_discard(self):
return OcfLib.getInstance().ocf_volume_submit_discard(byref(self)) return OcfLib.getInstance().ocf_volume_submit_discard(byref(self))
def submit_flush(self):
return OcfLib.getInstance().ocf_volume_submit_flush(byref(self))
def submit_discard(self):
return OcfLib.getInstance().ocf_volume_submit_discard(byref(self))
def set_data(self, data: Data, offset: int = 0): def set_data(self, data: Data, offset: int = 0):
self.data = data self.data = data
OcfLib.getInstance().ocf_io_set_data(byref(self), data, offset) OcfLib.getInstance().ocf_io_set_data(byref(self), data, offset)

View File

@ -22,6 +22,7 @@ from ctypes import (
) )
from hashlib import md5 from hashlib import md5
import weakref import weakref
from enum import IntEnum
from .io import Io, IoOps, IoDir from .io import Io, IoOps, IoDir
from .queue import Queue from .queue import Queue
@ -32,6 +33,10 @@ from .data import Data
from .queue import Queue from .queue import Queue
class IoFlags(IntEnum):
FLUSH = 1
class VolumeCaps(Structure): class VolumeCaps(Structure):
_fields_ = [("_atomic_writes", c_uint32, 1)] _fields_ = [("_atomic_writes", c_uint32, 1)]
@ -350,6 +355,11 @@ class RamVolume(Volume):
discard.contents._end(discard, -OcfErrorCode.OCF_ERR_NOT_SUPP) discard.contents._end(discard, -OcfErrorCode.OCF_ERR_NOT_SUPP)
def do_submit_io(self, io): def do_submit_io(self, io):
flags = int(io.contents._flags)
if flags & IoFlags.FLUSH:
self.do_submit_flush(io)
return
try: try:
io_priv = cast(OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv)) io_priv = cast(OcfLib.getInstance().ocf_io_get_priv(io), POINTER(VolumeIoPriv))
offset = io_priv.contents._offset offset = io_priv.contents._offset
@ -463,6 +473,56 @@ class ErrorDevice(Volume):
return self.vol.get_copy() return self.vol.get_copy()
class TraceDevice(Volume):
class IoType(IntEnum):
Data = 1
Flush = 2
Discard = 3
def __init__(self, vol, trace_fcn=None, uuid=None):
self.vol = vol
super().__init__(uuid)
self.trace_fcn = trace_fcn
def _trace(self, io, io_type):
submit = True
if self.trace_fcn:
submit = self.trace_fcn(self, io, io_type)
return submit
def do_submit_io(self, io):
submit = self._trace(io, TraceDevice.IoType.Data)
if submit:
self.vol.do_submit_io(io)
def do_submit_flush(self, io):
submit = self._trace(io, TraceDevice.IoType.Flush)
if submit:
self.vol.do_submit_flush(io)
def get_length(self):
return self.vol.get_length()
def get_max_io_size(self):
return self.vol.get_max_io_size()
def do_submit_discard(self, discard):
return self.vol.do_submit_discard(discard)
def dump(self, offset=0, size=0, ignore=VOLUME_POISON, **kwargs):
return self.vol.dump(offset, size, ignore=ignore, **kwargs)
def md5(self):
return self.vol.md5()
def get_copy(self):
return self.vol.get_copy()
lib = OcfLib.getInstance() lib = OcfLib.getInstance()
lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv) lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv)
lib.ocf_io_get_volume.argtypes = [c_void_p] lib.ocf_io_get_volume.argtypes = [c_void_p]

View File

@ -0,0 +1,72 @@
#
# Copyright(c) 2022-2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause-Clear
#
from ctypes import c_int
from pyocf.types.cache import Cache
from pyocf.types.data import Data
from pyocf.types.core import Core
from pyocf.types.io import IoDir
from pyocf.types.volume import RamVolume, IoFlags, TraceDevice
from pyocf.types.volume_core import CoreVolume
from pyocf.utils import Size
from pyocf.types.shared import OcfCompletion
def test_flush_propagation(pyocf_ctx):
flushes = {}
pyocf_ctx.register_volume_type(TraceDevice)
def trace_flush(vol, io, io_type):
nonlocal flushes
if io_type == TraceDevice.IoType.Flush or int(io.contents._flags) & IoFlags.FLUSH:
if vol.uuid not in flushes:
flushes[vol.uuid] = []
flushes[vol.uuid].append((io.contents._addr, io.contents._bytes))
return True
cache_device = TraceDevice(RamVolume(Size.from_MiB(50)), trace_fcn=trace_flush)
core_device = TraceDevice(RamVolume(Size.from_MiB(100)), trace_fcn=trace_flush)
addr = Size.from_MiB(2).B
size = Size.from_MiB(1).B
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
flushes = {}
io = vol.new_io(queue, addr, size, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(byte_count=0)
io.set_data(data, 0)
io.submit_flush()
completion.wait()
assert int(completion.results["err"]) == 0
assert cache_device.uuid in flushes
assert core_device.uuid in flushes
cache_flushes = flushes[cache_device.uuid]
core_flushes = flushes[core_device.uuid]
assert len(cache_flushes) == 1
assert len(core_flushes) == 1
assert core_flushes[0] == (addr, size)
# empty flush expected to be sent to cache device
assert cache_flushes[0] == (0, 0)
cache.stop()

View File

@ -0,0 +1,87 @@
#
# Copyright(c) 2022 Intel Corporation
# SPDX-License-Identifier: BSD-3-Clause
#
from ctypes import c_int
from pyocf.types.cache import Cache
from pyocf.types.data import Data
from pyocf.types.core import Core
from pyocf.types.io import IoDir
from pyocf.types.volume import RamVolume, IoFlags
from pyocf.types.volume_core import CoreVolume
from pyocf.utils import Size
from pyocf.types.shared import OcfCompletion
def test_large_flush(pyocf_ctx):
cache_device = RamVolume(Size.from_MiB(50))
core_device = RamVolume(Size.from_MiB(100))
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, IoFlags.FLUSH)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(byte_count=0)
io.set_data(data, 0)
io.submit_flush()
completion.wait()
assert int(completion.results["err"]) == 0
cache.stop()
def test_large_discard(pyocf_ctx):
cache_device = RamVolume(Size.from_MiB(50))
core_device = RamVolume(Size.from_MiB(100))
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(byte_count=0)
io.set_data(data, 0)
io.submit_discard()
completion.wait()
assert int(completion.results["err"]) == 0
cache.stop()
def test_large_io(pyocf_ctx):
cache_device = RamVolume(Size.from_MiB(50))
core_device = RamVolume(Size.from_MiB(100))
cache = Cache.start_on_device(cache_device)
core = Core.using_device(core_device)
cache.add_core(core)
queue = cache.get_default_queue()
vol = CoreVolume(core, open=True)
io = vol.new_io(queue, 0, core_device.size.bytes, IoDir.WRITE, 0, 0)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
data = Data(byte_count=core_device.size.bytes)
io.set_data(data)
io.submit()
completion.wait()
assert int(completion.results["err"]) == 0
cache.stop()