Merge pull request #216 from robertbaldyga/io-and-req-in-single-allocation

Allocate io and req in single allocation
This commit is contained in:
Jan Musiał
2019-07-23 11:40:30 +02:00
committed by GitHub
50 changed files with 655 additions and 521 deletions

View File

@@ -11,9 +11,10 @@ from ctypes import (
c_int,
c_uint8,
c_uint16,
c_uint32,
c_uint64,
c_char_p,
c_bool,
c_uint32,
cast,
byref,
create_string_buffer,
@@ -22,6 +23,7 @@ from datetime import timedelta
from .data import Data
from .io import Io, IoDir
from .queue import Queue
from .shared import Uuid, OcfCompletion, OcfError, SeqCutOffPolicy
from .stats.core import CoreStats
from .stats.shared import UsageStats, RequestsStats, BlocksStats, ErrorsStats
@@ -92,17 +94,25 @@ class Core:
def get_handle(self):
return self.handle
def new_io(self):
def new_io(
self, queue: Queue, addr: int, length: int, direction: IoDir,
io_class: int, flags: int
):
if not self.cache:
raise Exception("Core isn't attached to any cache")
io = OcfLib.getInstance().ocf_core_new_io_wrapper(self.handle)
io = OcfLib.getInstance().ocf_core_new_io_wrapper(
self.handle, queue.handle, addr, length, direction, io_class, flags)
return Io.from_pointer(io)
def new_core_io(self):
def new_core_io(
self, queue: Queue, addr: int, length: int, direction: IoDir,
io_class: int, flags: int
):
lib = OcfLib.getInstance()
core = lib.ocf_core_get_volume(self.handle)
io = lib.ocf_volume_new_io(core)
volume = lib.ocf_core_get_volume(self.handle)
io = lib.ocf_volume_new_io(
volume, queue.handle, addr, length, direction, io_class, flags)
return Io.from_pointer(io)
def get_stats(self):
@@ -166,10 +176,9 @@ class Core:
position = 0
while position < read_buffer_all.size:
io = self.new_io()
io.configure(position, cache_line_size, IoDir.READ, 0, 0)
io = self.new_io(self.cache.get_default_queue(), position,
cache_line_size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
io.set_queue(self.cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
@@ -187,7 +196,15 @@ class Core:
lib = OcfLib.getInstance()
lib.ocf_core_get_volume.restype = c_void_p
lib.ocf_volume_new_io.argtypes = [c_void_p]
lib.ocf_volume_new_io.argtypes = [
c_void_p,
c_void_p,
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_volume_new_io.restype = c_void_p
lib.ocf_core_get_volume.argtypes = [c_void_p]
lib.ocf_core_get_volume.restype = c_void_p
@@ -197,3 +214,13 @@ lib.ocf_stats_collect_core.argtypes = [c_void_p, c_void_p, c_void_p, c_void_p, c
lib.ocf_stats_collect_core.restype = c_int
lib.ocf_core_get_stats.argtypes = [c_void_p, c_void_p]
lib.ocf_core_get_stats.restype = c_int
lib.ocf_core_new_io_wrapper.argtypes = [
c_void_p,
c_void_p,
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_core_new_io_wrapper.restype = c_void_p

View File

@@ -18,7 +18,6 @@ from enum import IntEnum
from ..ocf import OcfLib
from .data import Data
from .queue import Queue
class IoDir(IntEnum):
@@ -37,8 +36,6 @@ class Io(Structure):
_instances_ = {}
_fields_ = [
("_volume", c_void_p),
("_ops", POINTER(IoOps)),
("_addr", c_uint64),
("_flags", c_uint64),
("_bytes", c_uint32),
@@ -101,19 +98,9 @@ class Io(Structure):
def submit(self):
return OcfLib.getInstance().ocf_core_submit_io_wrapper(byref(self))
def configure(
self, addr: int, length: int, direction: IoDir, io_class: int, flags: int
):
OcfLib.getInstance().ocf_io_configure_wrapper(
byref(self), addr, length, direction, io_class, flags
)
def set_data(self, data: Data, offset: int = 0):
self.data = data
OcfLib.getInstance().ocf_io_set_data_wrapper(byref(self), data, offset)
def set_queue(self, queue: Queue):
OcfLib.getInstance().ocf_io_set_queue_wrapper(byref(self), queue.handle)
OcfLib.getInstance().ocf_io_set_data(byref(self), data, offset)
IoOps.SET_DATA = CFUNCTYPE(c_int, POINTER(Io), c_void_p, c_uint32)
@@ -122,22 +109,10 @@ IoOps.GET_DATA = CFUNCTYPE(c_void_p, POINTER(Io))
IoOps._fields_ = [("_set_data", IoOps.SET_DATA), ("_get_data", IoOps.GET_DATA)]
lib = OcfLib.getInstance()
lib.ocf_core_new_io_wrapper.restype = POINTER(Io)
lib.ocf_io_set_cmpl_wrapper.argtypes = [POINTER(Io), c_void_p, c_void_p, Io.END]
lib.ocf_io_configure_wrapper.argtypes = [
POINTER(Io),
c_uint64,
c_uint32,
c_uint32,
c_uint32,
c_uint64,
]
lib.ocf_io_set_queue_wrapper.argtypes = [POINTER(Io), c_uint32]
lib.ocf_core_new_io_wrapper.argtypes = [c_void_p]
lib.ocf_core_new_io_wrapper.restype = c_void_p
lib.ocf_io_set_data_wrapper.argtypes = [POINTER(Io), c_void_p, c_uint32]
lib.ocf_io_set_data_wrapper.restype = c_int
lib.ocf_io_set_queue_wrapper.argtypes = [POINTER(Io), c_void_p]
lib.ocf_io_set_data.argtypes = [POINTER(Io), c_void_p, c_uint32]
lib.ocf_io_set_data.restype = c_int

View File

@@ -145,7 +145,9 @@ class Volume(Structure):
@VolumeOps.SUBMIT_IO
def _submit_io(io):
io_structure = cast(io, POINTER(Io))
volume = Volume.get_instance(io_structure.contents._volume)
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_io(io_structure)
@@ -153,7 +155,9 @@ class Volume(Structure):
@VolumeOps.SUBMIT_FLUSH
def _submit_flush(flush):
io_structure = cast(flush, POINTER(Io))
volume = Volume.get_instance(io_structure.contents._volume)
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_flush(io_structure)
@@ -166,7 +170,9 @@ class Volume(Structure):
@VolumeOps.SUBMIT_DISCARD
def _submit_discard(discard):
io_structure = cast(discard, POINTER(Io))
volume = Volume.get_instance(io_structure.contents._volume)
volume = Volume.get_instance(
OcfLib.getInstance().ocf_io_get_volume(io_structure)
)
volume.submit_discard(io_structure)
@@ -273,11 +279,11 @@ class Volume(Structure):
offset = io_priv.contents._offset
if io.contents._dir == IoDir.WRITE:
src_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p)
src_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
src = Data.get_instance(src_ptr.value).handle.value + offset
dst = self._storage + io.contents._addr
elif io.contents._dir == IoDir.READ:
dst_ptr = cast(io.contents._ops.contents._get_data(io), c_void_p)
dst_ptr = cast(OcfLib.getInstance().ocf_io_get_data(io), c_void_p)
dst = Data.get_instance(dst_ptr.value).handle.value + offset
src = self._storage + io.contents._addr
@@ -342,3 +348,7 @@ class TraceDevice(Volume):
lib = OcfLib.getInstance()
lib.ocf_io_get_priv.restype = POINTER(VolumeIoPriv)
lib.ocf_io_get_volume.argtypes = [c_void_p]
lib.ocf_io_get_volume.restype = c_void_p
lib.ocf_io_get_data.argtypes = [c_void_p]
lib.ocf_io_get_data.restype = c_void_p

View File

@@ -6,15 +6,11 @@
#include "ocf/ocf_io.h"
#include "ocf/ocf_core.h"
struct ocf_io *ocf_core_new_io_wrapper(ocf_core_t core)
struct ocf_io *ocf_core_new_io_wrapper(ocf_core_t core, ocf_queue_t queue,
uint64_t addr, uint32_t bytes, uint32_t dir,
uint32_t io_class, uint64_t flags)
{
return ocf_core_new_io(core);
}
void ocf_io_configure_wrapper(struct ocf_io *io, uint64_t addr,
uint32_t bytes, uint32_t dir, uint32_t class, uint64_t flags)
{
ocf_io_configure(io, addr, bytes, dir, class, flags);
return ocf_core_new_io(core, queue, addr, bytes, dir, io_class, flags);
}
void ocf_io_set_cmpl_wrapper(struct ocf_io *io, void *context,
@@ -33,22 +29,6 @@ void ocf_io_set_handle_wrapper(struct ocf_io *io, ocf_handle_io_t fn)
ocf_io_set_handle(io, fn);
}
int ocf_io_set_data_wrapper(struct ocf_io *io, ctx_data_t *data,
uint32_t offset)
{
return ocf_io_set_data(io, data, offset);
}
ctx_data_t *ocf_io_get_data_wrapper(struct ocf_io *io)
{
return ocf_io_get_data(io);
}
void ocf_io_set_queue_wrapper(struct ocf_io *io, ocf_queue_t queue)
{
ocf_io_set_queue(io, queue);
}
void ocf_core_submit_io_wrapper(struct ocf_io *io)
{
ocf_core_submit_io(io);

View File

@@ -32,10 +32,9 @@ def test_simple_wt_write(pyocf_ctx):
core_device.reset_stats()
write_data = Data.from_string("This is test data")
io = core.new_io()
io = core.new_io(cache.get_default_queue(), 20, write_data.size,
IoDir.WRITE, 0, 0)
io.set_data(write_data)
io.configure(20, write_data.size, IoDir.WRITE, 0, 0)
io.set_queue(cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback

View File

@@ -19,8 +19,6 @@ from pyocf.types.shared import OcfCompletion
def __io(io, queue, address, size, data, direction):
io.set_data(data, 0)
io.configure(address, size, direction, 0, 0)
io.set_queue(queue)
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
io.submit()
@@ -28,7 +26,8 @@ def __io(io, queue, address, size, data, direction):
return int(completion.results['err'])
def _io(io, queue, address, size, data, offset, direction):
def _io(new_io, queue, address, size, data, offset, direction):
io = new_io(queue, address, size, direction, 0, 0)
if direction == IoDir.READ:
_data = Data.from_bytes(bytes(size))
else:
@@ -40,12 +39,12 @@ def _io(io, queue, address, size, data, offset, direction):
def io_to_core(core, address, size, data, offset, direction):
return _io(core.new_core_io(), core.cache.get_default_queue(), address, size,
return _io(core.new_core_io, core.cache.get_default_queue(), address, size,
data, offset, direction)
def io_to_exp_obj(core, address, size, data, offset, direction):
return _io(core.new_io(), core.cache.get_default_queue(), address, size, data,
return _io(core.new_io, core.cache.get_default_queue(), address, size, data,
offset, direction)

View File

@@ -101,10 +101,10 @@ def test_10add_remove_with_io(pyocf_ctx):
assert stats["conf"]["core_count"] == 1
write_data = Data.from_string("Test data")
io = core.new_io()
io = core.new_io(
cache.get_default_queue(), 20, write_data.size, IoDir.WRITE, 0, 0
)
io.set_data(write_data)
io.configure(20, write_data.size, IoDir.WRITE, 0, 0)
io.set_queue(cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback

View File

@@ -372,10 +372,10 @@ def run_io_and_cache_data_if_possible(exported_obj, mode, cls, cls_no):
def io_to_core(exported_obj: Core, data: Data, offset: int, to_core_device=False):
io = exported_obj.new_core_io() if to_core_device else exported_obj.new_io()
new_io = exported_obj.new_core_io if to_core_device else exported_obj.new_io
io = new_io(exported_obj.cache.get_default_queue(), offset, data.size,
IoDir.WRITE, 0, 0)
io.set_data(data)
io.configure(offset, data.size, IoDir.WRITE, 0, 0)
io.set_queue(exported_obj.cache.get_default_queue())
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback
@@ -387,10 +387,9 @@ def io_to_core(exported_obj: Core, data: Data, offset: int, to_core_device=False
def io_from_exported_object(exported_obj: Core, buffer_size: int, offset: int):
read_buffer = Data(buffer_size)
io = exported_obj.new_io()
io.configure(offset, read_buffer.size, IoDir.READ, 0, 0)
io = exported_obj.new_io(exported_obj.cache.get_default_queue(), offset,
read_buffer.size, IoDir.READ, 0, 0)
io.set_data(read_buffer)
io.set_queue(exported_obj.cache.get_default_queue())
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback

View File

@@ -164,10 +164,9 @@ def prepare_cache_and_core(core_size: Size, cache_size: Size = Size.from_MiB(20)
def io_operation(core: Core, data: Data, io_direction: int, offset: int = 0, io_class: int = 0):
io = core.new_io()
io = core.new_io(core.cache.get_default_queue(), offset, data.size,
io_direction, io_class, 0)
io.set_data(data)
io.configure(offset, data.size, io_direction, io_class, 0)
io.set_queue(core.cache.get_default_queue())
completion = OcfCompletion([("err", c_int)])
io.callback = completion.callback

View File

@@ -87,10 +87,10 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
cache.add_core(core)
write_data = Data.from_string("This is test data")
io = core.new_io()
io = core.new_io(
cache.get_default_queue(), 20, write_data.size, IoDir.WRITE, 0, 0
)
io.set_data(write_data)
io.configure(20, write_data.size, IoDir.WRITE, 0, 0)
io.set_queue(cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
@@ -100,12 +100,11 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
cmpls = []
for i in range(100):
read_data = Data(500)
io = core.new_io()
io.set_data(read_data)
io.configure(
(i * 1259) % int(core_device.size), read_data.size, IoDir.READ, 0, 0
io = core.new_io(
cache.get_default_queue(), (i * 1259) % int(core_device.size),
read_data.size, IoDir.READ, 0, 0
)
io.set_queue(cache.get_default_queue())
io.set_data(read_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
@@ -116,10 +115,10 @@ def test_secure_erase_simple_io_read_misses(cache_mode):
c.wait()
write_data = Data.from_string("TEST DATA" * 100)
io = core.new_io()
io = core.new_io(
cache.get_default_queue(), 500, write_data.size, IoDir.WRITE, 0, 0
)
io.set_data(write_data)
io.configure(500, write_data.size, IoDir.WRITE, 0, 0)
io.set_queue(cache.get_default_queue())
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback
@@ -169,12 +168,11 @@ def test_secure_erase_simple_io_cleaning():
cmpls = []
for i in range(10000):
read_data = Data(S.from_KiB(120))
io = core.new_io()
io.set_data(read_data)
io.configure(
(i * 1259) % int(core_device.size), read_data.size, IoDir.WRITE, 0, 0
io = core.new_io(
cache.get_default_queue(), (i * 1259) % int(core_device.size),
read_data.size, IoDir.WRITE, 0, 0
)
io.set_queue(cache.get_default_queue())
io.set_data(read_data)
cmpl = OcfCompletion([("err", c_int)])
io.callback = cmpl.callback