diff --git a/benchmarks/asv_bench/benchmarks/storage.py b/benchmarks/asv_bench/benchmarks/storage.py
index 02e0354636..e5586652e8 100644
--- a/benchmarks/asv_bench/benchmarks/storage.py
+++ b/benchmarks/asv_bench/benchmarks/storage.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import itertools
+from typing import List

 import cloudpickle
 import numpy as np
@@ -24,20 +25,30 @@
 from mars.utils import Timer, readable_size


-def send_1_to_1(n: int = None):
+def send_1_to_1(n: int = None, n_out: int = 1):
     ctx = get_context()
     workers = ctx.get_worker_addresses()
     worker_to_gen_data = {
-        w: mr.spawn(_gen_data, kwargs=dict(n=n, worker=w), expect_worker=w)
+        w: mr.spawn(
+            _gen_data,
+            kwargs=dict(n=n, worker=w, n_out=n_out),
+            expect_worker=w,
+            n_output=n_out,
+        )
         for i, w in enumerate(workers)
     }
-    all_data = mars.execute(list(worker_to_gen_data.values()))
+    all_data = mars.execute(list(itertools.chain(*worker_to_gen_data.values())))
     progress = 0.1
     ctx.set_progress(progress)
-    infos = [d._fetch_infos(fields=["data_key", "store_size"]) for d in all_data]
-    data_size = infos[0]["store_size"][0]
-    worker_to_data_keys = dict(zip(workers, [info["data_key"][0] for info in infos]))
+    infos = np.array(
+        [d._fetch_infos(fields=["data_key", "store_size"]) for d in all_data],
+        dtype=object,
+    )
+    data_size = sum(info["store_size"][0] for info in infos[:n_out])
+    worker_to_data_keys = dict()
+    for worker, worker_infos in zip(workers, np.split(infos, len(infos) // n_out)):
+        worker_to_data_keys[worker] = [info["data_key"][0] for info in worker_infos]

     workers_to_durations = dict()
     size = len(workers) * (len(workers) - 1)
@@ -60,27 +71,31 @@


 def _gen_data(
-    n: int = None, worker: str = None, check_addr: bool = True
-) -> pd.DataFrame:
+    n: int = None, worker: str = None, check_addr: bool = True, n_out: int = 1
+) -> List[pd.DataFrame]:
     if check_addr:
         ctx = get_context()
         assert ctx.worker_address == worker
-    n = n if n is not None else 5_000_000
     rs = np.random.RandomState(123)
-    data = {
-        "a": rs.rand(n),
-        "b": rs.randint(n * 10, size=n),
-        "c": [f"foo{i}" for i in range(n)],
-    }
-    return pd.DataFrame(data)
+    n = n if n is not None else 5_000_000
+    outs = []
+    for _ in range(n_out):
+        data = {
+            "a": rs.rand(n),
+            "b": rs.randint(n * 10, size=n),
+            "c": [f"foo{i}" for i in range(n)],
+        }
+        outs.append(pd.DataFrame(data))
+    return outs


-def _fetch_data(data_key: str, worker: str = None):
+
+def _fetch_data(data_keys: List[str], worker: str = None):
     # do nothing actually
     ctx = get_context()
     assert ctx.worker_address == worker
     with Timer() as timer:
-        ctx.get_chunks_result([data_key], fetch_only=True)
+        ctx.get_chunks_result(data_keys, fetch_only=True)
     return timer.duration
@@ -107,9 +122,15 @@ def teardown(self):

     def time_1_to_1(self):
         return mr.spawn(send_1_to_1).execute().fetch()

+    def time_1_to_1_small_objects(self):
+        return mr.spawn(send_1_to_1, kwargs=dict(n=1_000, n_out=100)).execute().fetch()
+

 if __name__ == "__main__":
     suite = TransferPackageSuite()
     suite.setup()
+    print("- Bench 1 to 1 -")
     print(suite.time_1_to_1())
+    print("- Bench 1 to 1 with small objects -")
+    print(suite.time_1_to_1_small_objects())
     suite.teardown()
diff --git a/mars/services/storage/api/oscar.py b/mars/services/storage/api/oscar.py
index 81277abaa0..a71ed0f6f5 100644
--- a/mars/services/storage/api/oscar.py
+++ b/mars/services/storage/api/oscar.py
@@ -23,9 +23,8 @@
     StorageManagerActor,
     DataManagerActor,
     DataInfo,
-    WrappedStorageFileObject,
 )
-from ..handler import StorageHandlerActor
+from ..handler import StorageHandlerActor, WrappedStorageFileObject
 from .core import AbstractStorageAPI

 _is_windows = sys.platform.lower().startswith("win")
diff --git a/mars/services/storage/core.py b/mars/services/storage/core.py
index eedc375389..6e4d33daf2 100644
--- a/mars/services/storage/core.py
+++ b/mars/services/storage/core.py
@@ -19,12 +19,10 @@
 from typing import Dict, List, Optional, Union, Tuple

 from ... import oscar as mo
-from ...lib.aio import AioFileObject
 from ...oscar.backends.allocate_strategy import IdleLabel, NoIdleSlot
 from ...resource import cuda_card_stats
 from ...storage import StorageLevel, get_storage_backend
-from ...storage.base import ObjectInfo, StorageBackend
-from ...storage.core import StorageFileObject
+from ...storage.base import ObjectInfo
 from ...utils import dataslots
 from .errors import DataNotExist, StorageFull

@@ -44,50 +42,6 @@ def build_data_info(storage_info: ObjectInfo, level, size, band_name=None):
     return DataInfo(storage_info.object_id, level, size, store_size, band_name)


-class WrappedStorageFileObject(AioFileObject):
-    """
-    Wrap to hold ref after write close
-    """
-
-    def __init__(
-        self,
-        file: StorageFileObject,
-        level: StorageLevel,
-        size: int,
-        session_id: str,
-        data_key: str,
-        data_manager: mo.ActorRefType["DataManagerActor"],
-        storage_handler: StorageBackend,
-    ):
-        self._object_id = file.object_id
-        super().__init__(file)
-        self._size = size
-        self._level = level
-        self._session_id = session_id
-        self._data_key = data_key
-        self._data_manager = data_manager
-        self._storage_handler = storage_handler
-
-    def __getattr__(self, item):
-        return getattr(self._file, item)
-
-    async def clean_up(self):
-        self._file.close()
-
-    async def close(self):
-        self._file.close()
-        if self._object_id is None:
-            # for some backends like vineyard,
-            # object id is generated after write close
-            self._object_id = self._file.object_id
-        if "w" in self._file.mode:
-            object_info = await self._storage_handler.object_info(self._object_id)
-            data_info = build_data_info(object_info, self._level, self._size)
-            await self._data_manager.put_data_info(
-                self._session_id, self._data_key, data_info, object_info
-            )
-
-
 class StorageQuotaActor(mo.Actor):
     def __init__(
         self,
diff --git a/mars/services/storage/handler.py b/mars/services/storage/handler.py
index 631d616a6e..13ed7a43d3 100644
--- a/mars/services/storage/handler.py
+++ b/mars/services/storage/handler.py
@@ -18,6 +18,7 @@
 from typing import Any, Dict, List, Union

 from ... import oscar as mo
+from ...lib.aio import AioFileObject
 from ...storage import StorageLevel, get_storage_backend
 from ...storage.core import StorageFileObject
 from ...typing import BandType
@@ -29,7 +30,6 @@
     DataManagerActor,
     DataInfo,
     build_data_info,
-    WrappedStorageFileObject,
 )
 from .errors import DataNotExist, NoDataToSpill

@@ -39,6 +39,54 @@
 logger = logging.getLogger(__name__)


+class WrappedStorageFileObject(AioFileObject):
+    """
+    Wrap to hold ref after write close
+    """
+
+    def __init__(
+        self,
+        file: StorageFileObject,
+        level: StorageLevel,
+        size: int,
+        session_id: str,
+        data_key: str,
+        storage_handler: mo.ActorRefType["StorageHandlerActor"],
+    ):
+        self._object_id = file.object_id
+        super().__init__(file)
+        self._size = size
+        self._level = level
+        self._session_id = session_id
+        self._data_key = data_key
+        self._storage_handler = storage_handler
+
+    def __getattr__(self, item):
+        return getattr(self._file, item)
+
+    @property
+    def file(self):
+        return self._file
+
+    @property
+    def object_id(self):
+        return self._object_id
+
+    @property
+    def level(self):
+        return self._level
+
+    @property
+    def size(self):
+        return self._size
+
+    async def clean_up(self):
+        self._file.close()
+
+    async def close(self):
+        await self._storage_handler.close_writer(self)
+
+
 class StorageHandlerActor(mo.Actor):
     """
     Storage handler actor, provide methods like `get`, `put`, etc.
@@ -82,22 +130,26 @@ async def __post_create__(self):
             if client.level & level:
                 clients[level] = client

-    async def _get_data(self, data_info, conditions):
+    @mo.extensible
+    async def get_data_by_info(self, data_info: DataInfo, conditions: List = None):
         if conditions is None:
-            res = yield self._clients[data_info.level].get(data_info.object_id)
+            res = await self._clients[data_info.level].get(data_info.object_id)
         else:
             try:
-                res = yield self._clients[data_info.level].get(
+                res = await self._clients[data_info.level].get(
                     data_info.object_id, conditions=conditions
                 )
             except NotImplementedError:
-                data = yield self._clients[data_info.level].get(data_info.object_id)
+                data = await self._clients[data_info.level].get(data_info.object_id)
                 try:
                     sliced_value = data.iloc[tuple(conditions)]
                 except AttributeError:
                     sliced_value = data[tuple(conditions)]
                 res = sliced_value
-        raise mo.Return(res)
+        return res
+
+    def get_client(self, level: StorageLevel):
+        return self._clients[level]

     @mo.extensible
     async def get(
@@ -111,7 +163,7 @@ async def get(
             data_info = await self._data_manager_ref.get_data_info(
                 session_id, data_key, self._band_name
             )
-            data = yield self._get_data(data_info, conditions)
+            data = yield self.get_data_by_info(data_info, conditions)
             raise mo.Return(data)
         except DataNotExist:
             if error == "raise":
@@ -143,7 +195,7 @@ async def batch_get(self, args_list, kwargs_list):
             if data_info is None:
                 results.append(None)
             else:
-                result = yield self._get_data(data_info, conditions)
+                result = yield self.get_data_by_info(data_info, conditions)
                 results.append(result)
         raise mo.Return(results)

@@ -314,12 +366,16 @@ async def batch_delete(self, args_list, kwargs_list):
         for level, size in level_sizes.items():
             await self._quota_refs[level].release_quota(size)

+    @mo.extensible
+    async def open_reader_by_info(self, data_info: DataInfo) -> StorageFileObject:
+        return await self._clients[data_info.level].open_reader(data_info.object_id)
+
     @mo.extensible
     async def open_reader(self, session_id: str, data_key: str) -> StorageFileObject:
         data_info = await self._data_manager_ref.get_data_info(
             session_id, data_key, self._band_name
         )
-        reader = await self._clients[data_info.level].open_reader(data_info.object_id)
+        reader = await self.open_reader_by_info(data_info)
         return reader

     @open_reader.batch
@@ -333,10 +389,7 @@ async def batch_open_readers(self, args_list, kwargs_list):
         )
         data_infos = await self._data_manager_ref.get_data_info.batch(*get_data_infos)
         return await asyncio.gather(
-            *[
-                self._clients[data_info.level].open_reader(data_info.object_id)
-                for data_info in data_infos
-            ]
+            *[self.open_reader_by_info(data_info) for data_info in data_infos]
         )

     @mo.extensible
@@ -357,8 +410,7 @@ async def open_writer(
             size,
             session_id,
             data_key,
-            self._data_manager_ref,
-            self._clients[level],
+            self,
         )

     @open_writer.batch
@@ -389,12 +441,48 @@ async def batch_open_writers(self, args_list, kwargs_list):
                     size,
                     session_id,
                     data_key,
-                    self._data_manager_ref,
-                    self._clients[level],
+                    self,
                 )
             )
         return wrapped_writers

+    @mo.extensible
+    async def close_writer(self, writer: WrappedStorageFileObject):
+        writer.file.close()
+        if writer.object_id is None:
+            # for some backends like vineyard,
+            # object id is generated after write close
+            writer._object_id = writer.file.object_id
+        if "w" in writer.file.mode:
+            client = self._clients[writer.level]
+            object_info = await client.object_info(writer.object_id)
+            data_info = build_data_info(object_info, writer.level, writer.size)
+            await self._data_manager_ref.put_data_info(
+                writer._session_id, writer._data_key, data_info, object_info
+            )
+
+    @close_writer.batch
+    async def batch_close_writers(self, args_list, kwargs_list):
+        put_info_tasks = []
+        for args, kwargs in zip(args_list, kwargs_list):
+            (writer,) = self.close_writer.bind(*args, **kwargs)
+            writer.file.close()
+            if writer.object_id is None:
+                # for some backends like vineyard,
+                # object id is generated after write close
+                writer._object_id = writer.file.object_id
+            if "w" in writer.file.mode:
+                client = self._clients[writer.level]
+                object_info = await client.object_info(writer.object_id)
+                data_info = build_data_info(object_info, writer.level, writer.size)
+                put_info_tasks.append(
+                    self._data_manager_ref.put_data_info.delay(
+                        writer._session_id, writer._data_key, data_info, object_info
+                    )
+                )
+        if put_info_tasks:
+            await self._data_manager_ref.put_data_info.batch(*put_info_tasks)
+
     async def _get_meta_api(self, session_id: str):
         if self._supervisor_address is None:
             cluster_api = await ClusterAPI.create(self.address)
diff --git a/mars/services/storage/tests/test_transfer.py b/mars/services/storage/tests/test_transfer.py
index c3228fca16..2a37f56141 100644
--- a/mars/services/storage/tests/test_transfer.py
+++ b/mars/services/storage/tests/test_transfer.py
@@ -102,50 +102,84 @@ async def test_simple_transfer(create_actors):
     storage_handler1 = await mo.actor_ref(
         uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_1
     )
+    data_manager1 = await mo.actor_ref(
+        uid=DataManagerActor.default_uid(), address=worker_address_1
+    )
     storage_handler2 = await mo.actor_ref(
         uid=StorageHandlerActor.gen_uid("numa-0"), address=worker_address_2
     )
+    data_manager2 = await mo.actor_ref(
+        uid=DataManagerActor.default_uid(), address=worker_address_2
+    )

     await storage_handler1.put(session_id, "data_key1", data1, StorageLevel.MEMORY)
     await storage_handler1.put(session_id, "data_key2", data2, StorageLevel.MEMORY)
     await storage_handler2.put(session_id, "data_key3", data2, StorageLevel.MEMORY)

-    sender_actor = await mo.actor_ref(
+    # sender_actor1 uses the default block_size
+    sender_actor1 = await mo.actor_ref(
         address=worker_address_1, uid=SenderManagerActor.gen_uid("numa-0")
     )
-
-    # send data to worker2 from worker1
-    await sender_actor.send_batch_data(
-        session_id,
-        ["data_key1"],
-        worker_address_2,
-        StorageLevel.MEMORY,
-        block_size=1000,
+    # sender_actor2 sets block_size to 0
+    sender_actor2 = await mo.create_actor(
+        SenderManagerActor,
+        "numa-0",
+        0,
+        data_manager1,
+        storage_handler1,
+        uid=SenderManagerActor.gen_uid("mock"),
+        address=worker_address_1,
     )
-    await sender_actor.send_batch_data(
-        session_id,
-        ["data_key2"],
-        worker_address_2,
-        StorageLevel.MEMORY,
-        block_size=1000,
-    )
+    for sender_actor in [sender_actor1, sender_actor2]:
+        # send data to worker2 from worker1
+        await sender_actor.send_batch_data(
+            session_id,
+            ["data_key1"],
+            worker_address_2,
+            StorageLevel.MEMORY,
+            block_size=1000,
+        )

-    get_data1 = await storage_handler2.get(session_id, "data_key1")
-    np.testing.assert_array_equal(data1, get_data1)
+        await sender_actor.send_batch_data(
+            session_id,
+            ["data_key2"],
+            worker_address_2,
+            StorageLevel.MEMORY,
+            block_size=1000,
+        )
+
+        get_data1 = await storage_handler2.get(session_id, "data_key1")
+        np.testing.assert_array_equal(data1, get_data1)

-    get_data2 = await storage_handler2.get(session_id, "data_key2")
-    pd.testing.assert_frame_equal(data2, get_data2)
+        get_data2 = await storage_handler2.get(session_id, "data_key2")
+        pd.testing.assert_frame_equal(data2, get_data2)
+        await storage_handler2.delete(session_id, "data_key1")
+        await storage_handler2.delete(session_id, "data_key2")

     # send data to worker1 from worker2
-    sender_actor = await mo.actor_ref(
+    sender_actor1 = await mo.actor_ref(
         address=worker_address_2, uid=SenderManagerActor.gen_uid("numa-0")
     )
-    await sender_actor.send_batch_data(
-        session_id, ["data_key3"], worker_address_1, StorageLevel.MEMORY
+    # sender_actor2 sets block_size to 0
+    sender_actor2 = await mo.create_actor(
+        SenderManagerActor,
+        "numa-0",
+        0,
+        data_manager2,
+        storage_handler2,
+        uid=SenderManagerActor.gen_uid("mock"),
+        address=worker_address_2,
     )
-    get_data3 = await storage_handler1.get(session_id, "data_key3")
-    pd.testing.assert_frame_equal(data2, get_data3)
+    for sender_actor in [sender_actor1, sender_actor2]:
+        # send data to worker1 from worker2
+        data_key = "data_key3"
+        await sender_actor.send_batch_data(
+            session_id, [data_key], worker_address_1, StorageLevel.MEMORY
+        )
+        get_data3 = await storage_handler1.get(session_id, data_key)
+        pd.testing.assert_frame_equal(data2, get_data3)
+        await storage_handler1.delete(session_id, "data_key3")


 # test for cancelling happens when writing
@@ -232,7 +266,11 @@ async def test_cancel_transfer(create_actors, mock_sender, mock_receiver):

     send_task = asyncio.create_task(
         sender_actor.send_batch_data(
-            "mock", ["data_key1"], worker_address_2, StorageLevel.MEMORY
+            "mock",
+            ["data_key1"],
+            worker_address_2,
+            StorageLevel.MEMORY,
+            is_small_objects=False,
         )
     )

@@ -250,7 +288,11 @@

     send_task = asyncio.create_task(
         sender_actor.send_batch_data(
-            "mock", ["data_key1"], worker_address_2, StorageLevel.MEMORY
+            "mock",
+            ["data_key1"],
+            worker_address_2,
+            StorageLevel.MEMORY,
+            is_small_objects=False,
         )
     )
     await send_task
@@ -261,12 +303,20 @@
     if mock_sender is MockSenderManagerActor:
         send_task1 = asyncio.create_task(
             sender_actor.send_batch_data(
-                "mock", ["data_key2"], worker_address_2, StorageLevel.MEMORY
+                "mock",
["data_key2"], + worker_address_2, + StorageLevel.MEMORY, + is_small_objects=False, ) ) send_task2 = asyncio.create_task( sender_actor.send_batch_data( - "mock", ["data_key2"], worker_address_2, StorageLevel.MEMORY + "mock", + ["data_key2"], + worker_address_2, + StorageLevel.MEMORY, + is_small_objects=False, ) ) await asyncio.sleep(0.5) diff --git a/mars/services/storage/transfer.py b/mars/services/storage/transfer.py index db4f892077..b076ec7e38 100644 --- a/mars/services/storage/transfer.py +++ b/mars/services/storage/transfer.py @@ -15,14 +15,14 @@ import asyncio import logging from dataclasses import dataclass -from typing import Dict, List +from typing import Dict, List, Tuple, Union from ... import oscar as mo from ...lib.aio import alru_cache from ...storage import StorageLevel from ...utils import dataslots -from .core import DataManagerActor, WrappedStorageFileObject -from .handler import StorageHandlerActor +from .core import DataManagerActor, DataInfo +from .handler import StorageHandlerActor, WrappedStorageFileObject DEFAULT_TRANSFER_BLOCK_SIZE = 4 * 1024**2 @@ -60,11 +60,20 @@ async def get_receiver_ref(address: str, band_name: str): address=address, uid=ReceiverManagerActor.gen_uid(band_name) ) + async def _open_readers(self, data_infos: List[DataInfo]): + open_reader_tasks = [] + for info in data_infos: + open_reader_tasks.append( + self._storage_handler.open_reader_by_info.delay(info) + ) + return await self._storage_handler.open_reader_by_info.batch(*open_reader_tasks) + async def _send_data( self, receiver_ref: mo.ActorRefType["ReceiverManagerActor"], session_id: str, data_keys: List[str], + data_infos: List[DataInfo], level: StorageLevel, block_size: int, ): @@ -92,12 +101,7 @@ async def send(self, buffer, eof_mark, key): await self.flush() sender = BufferedSender() - open_reader_tasks = [] - for data_key in data_keys: - open_reader_tasks.append( - self._storage_handler.open_reader.delay(session_id, data_key) - ) - readers = await self._storage_handler.open_reader.batch(*open_reader_tasks) + readers = await self._open_readers(data_infos) for data_key, reader in zip(data_keys, readers): while True: @@ -116,7 +120,59 @@ async def send(self, buffer, eof_mark, key): break await sender.flush() - @mo.extensible + async def _send( + self, + session_id: str, + data_keys: List[Union[str, Tuple]], + data_infos: List[DataInfo], + data_sizes: List[int], + block_size: int, + address: str, + band_name: str, + level: StorageLevel, + ): + receiver_ref: mo.ActorRefType[ + ReceiverManagerActor + ] = await self.get_receiver_ref(address, band_name) + is_transferring_list = await receiver_ref.open_writers( + session_id, data_keys, data_sizes, level + ) + to_send_keys = [] + to_send_infos = [] + to_wait_keys = [] + for data_key, is_transferring, info in zip( + data_keys, is_transferring_list, data_infos + ): + if is_transferring: + to_wait_keys.append(data_key) + else: + to_send_keys.append(data_key) + to_send_infos.append(info) + + if to_send_keys: + await self._send_data( + receiver_ref, session_id, to_send_keys, to_send_infos, level, block_size + ) + if to_wait_keys: + await receiver_ref.wait_transfer_done(session_id, to_wait_keys) + + async def _send_small_objects( + self, + session_id: str, + data_keys: List[Union[str, Tuple]], + data_infos: List[DataInfo], + address: str, + band_name: str, + level: StorageLevel, + ): + # simple get all objects and send them all to receiver + readers = await self._open_readers(data_infos) + data_list = await asyncio.gather(*(reader.read() for reader in 
+        data_list = await asyncio.gather(*(reader.read() for reader in readers))
+        receiver_ref: mo.ActorRefType[
+            ReceiverManagerActor
+        ] = await self.get_receiver_ref(address, band_name)
+        await receiver_ref.put_small_objects(session_id, data_keys, data_list, level)
+
     async def send_batch_data(
         self,
         session_id: str,
@@ -125,15 +181,13 @@
         level: StorageLevel,
         band_name: str = "numa-0",
         block_size: int = None,
+        is_small_objects: bool = None,
         error: str = "raise",
     ):
         logger.debug(
             "Begin to send data (%s, %s) to %s", session_id, data_keys, address
         )
         block_size = block_size or self._transfer_block_size
-        receiver_ref: mo.ActorRefType[
-            ReceiverManagerActor
-        ] = await self.get_receiver_ref(address, band_name)
         get_infos = []
         pin_tasks = []
         for data_key in data_keys:
@@ -162,23 +216,29 @@
         data_sizes = [info.store_size for info in infos]
         if level is None:
             level = infos[0].level
-        is_transferring_list = await receiver_ref.open_writers(
-            session_id, data_keys, data_sizes, level
-        )
-        to_send_keys = []
-        to_wait_keys = []
-        for data_key, is_transferring in zip(data_keys, is_transferring_list):
-            if is_transferring:
-                to_wait_keys.append(data_key)
-            else:
-                to_send_keys.append(data_key)
-
-        if to_send_keys:
-            await self._send_data(
-                receiver_ref, session_id, to_send_keys, level, block_size
+        total_size = sum(data_sizes)
+        if is_small_objects is None:
+            is_small_objects = total_size <= block_size
+        if is_small_objects:
+            logger.debug(
+                "Choose send_small_objects method for sending data of %s bytes",
+                total_size,
+            )
+            await self._send_small_objects(
+                session_id, data_keys, infos, address, band_name, level
+            )
+        else:
+            logger.debug("Choose block method for sending data of %s bytes", total_size)
+            await self._send(
+                session_id,
+                data_keys,
+                infos,
+                data_sizes,
+                block_size,
+                address,
+                band_name,
+                level,
             )
-        if to_wait_keys:
-            await receiver_ref.wait_transfer_done(session_id, to_wait_keys)

         unpin_tasks = []
         for data_key in data_keys:
             unpin_tasks.append(
@@ -232,6 +292,25 @@ def _decref_writing_key(self, session_id: str, data_key: str):
         if self._writing_infos[(session_id, data_key)].ref_counts == 0:
             del self._writing_infos[(session_id, data_key)]

+    async def put_small_objects(
+        self, session_id: str, data_keys: List[str], objects: List, level: StorageLevel
+    ):
+        open_writers = []
+        for data_key, data in zip(data_keys, objects):
+            open_writers.append(
+                self._storage_handler.open_writer.delay(
+                    session_id, data_key, len(data), level
+                )
+            )
+        writers = await self._storage_handler.open_writer.batch(*open_writers)
+        writes = []
+        closes = []
+        for writer, data in zip(writers, objects):
+            writes.append(writer.write(data))
+            closes.append(self._storage_handler.close_writer.delay(writer))
+        await asyncio.gather(*writes)
+        await self._storage_handler.close_writer.batch(*closes)
+
     async def create_writers(
         self,
         session_id: str,
@@ -292,9 +371,9 @@ async def do_write(
                     if data:
                         await writer.write(data)
                     if is_eof:
-                        close_tasks.append(writer.close())
+                        close_tasks.append(self._storage_handler.close_writer.delay(writer))
                         finished_keys.append(data_key)
-            await asyncio.gather(*close_tasks)
+            await self._storage_handler.close_writer.batch(*close_tasks)
             async with self._lock:
                 for data_key in finished_keys:
                     event = self._writing_infos[(session_id, data_key)].event