diff --git a/examples/stress/benchmark/large_io_comparison.py b/examples/stress/benchmark/large_io_comparison.py index eceacf10e..873981ef4 100644 --- a/examples/stress/benchmark/large_io_comparison.py +++ b/examples/stress/benchmark/large_io_comparison.py @@ -13,6 +13,9 @@ import flyte.storage from flyte.extras import ContainerTask +CPU = 2 +MEMORY = "2Gi" + @dataclass class BenchmarkResult: @@ -26,8 +29,8 @@ class BenchmarkResult: s3fs_env = flyte.TaskEnvironment( "s3fs_benchmark", resources=flyte.Resources( - cpu=8, - memory="23Gi", + cpu=CPU, + memory=MEMORY, ), image=flyte.Image.from_debian_base(name="s3fs-benchmarker").with_pip_packages("s3fs", "fsspec"), ) @@ -51,8 +54,8 @@ class BenchmarkResult: name="s5cmd_download_file", image=s5cmd_image, resources=flyte.Resources( - cpu=8, - memory="23Gi", + cpu=CPU, + memory=MEMORY, ), inputs={"remote_path": str, "file_size_mb": int}, outputs={"duration": float, "throughput_mbps": float}, @@ -82,8 +85,8 @@ class BenchmarkResult: name="s5cmd_download_dir", image=s5cmd_image, resources=flyte.Resources( - cpu=8, - memory="23Gi", + cpu=CPU, + memory=MEMORY, ), inputs={"remote_path": str, "expected_files": int}, outputs={"duration": float, "throughput_mbps": float}, @@ -117,8 +120,8 @@ class BenchmarkResult: env = flyte.TaskEnvironment( "file_io_benchmark", resources=flyte.Resources( - cpu=8, - memory="23Gi", + cpu=CPU, + memory=MEMORY, ), depends_on=[s5cmd_env, s3fs_env], image=flyte.Image.from_debian_base(name="io-benchmarker"), @@ -316,12 +319,13 @@ async def read_dir_s3fs(d: flyte.io.Dir, iterations: int = 10) -> List[Benchmark def generate_benchmark_report( - file_results_new: List[BenchmarkResult], - file_results_s3fs: List[BenchmarkResult], - file_results_s5cmd: List[BenchmarkResult], - dir_results_new: List[BenchmarkResult], - dir_results_s3fs: List[BenchmarkResult], - dir_results_s5cmd: List[BenchmarkResult], + file_results_new: List[BenchmarkResult] | None, + file_results_s3fs: List[BenchmarkResult] | None, + file_results_s5cmd: List[BenchmarkResult] | None, + dir_results_new: List[BenchmarkResult] | None, + dir_results_s3fs: List[BenchmarkResult] | None, + dir_results_s5cmd: List[BenchmarkResult] | None, + iterations: int = 10, ) -> str: """Generate HTML report with benchmark results""" @@ -336,46 +340,21 @@ def calculate_stats(results: List[BenchmarkResult]): avg_throughput = avg_bytes / avg_time / (1024**2) return avg_bytes, avg_time, min_time, max_time, avg_throughput, times - # Calculate stats for file benchmarks - file_new_bytes, file_new_time, file_new_min, file_new_max, file_new_tput, file_new_times = calculate_stats( - file_results_new - ) - file_s3fs_bytes, file_s3fs_time, file_s3fs_min, file_s3fs_max, file_s3fs_tput, file_s3fs_times = calculate_stats( - file_results_s3fs - ) - _, file_s5cmd_time, file_s5cmd_min, file_s5cmd_max, file_s5cmd_tput, file_s5cmd_times = calculate_stats( - file_results_s5cmd - ) - - # Calculate stats for directory benchmarks - dir_new_bytes, dir_new_time, dir_new_min, dir_new_max, dir_new_tput, dir_new_times = calculate_stats( - dir_results_new - ) - dir_s3fs_bytes, dir_s3fs_time, dir_s3fs_min, dir_s3fs_max, dir_s3fs_tput, dir_s3fs_times = calculate_stats( - dir_results_s3fs - ) - _, dir_s5cmd_time, dir_s5cmd_min, dir_s5cmd_max, dir_s5cmd_tput, dir_s5cmd_times = calculate_stats( - dir_results_s5cmd - ) - - html = f""" - -
- - - -| New SDK (Flyte v2) | {file_new_bytes / (1024**3):.2f} | {file_new_time:.2f} | -{file_new_min:.2f} / {file_new_max:.2f} | +{f"{file_new_min:.2f} / {file_new_max:.2f}"} | {file_new_tput:.2f} | {", ".join([f"{t:.2f}" for t in file_new_times])} | s3fs + fsspec (v1 style) | {file_s3fs_bytes / (1024**3):.2f} | {file_s3fs_time:.2f} | -{file_s3fs_min:.2f} / {file_s3fs_max:.2f} | +{f"{file_s3fs_min:.2f} / {file_s3fs_max:.2f}"} | {file_s3fs_tput:.2f} | {", ".join([f"{t:.2f}" for t in file_s3fs_times])} | @@ -406,12 +385,25 @@ def calculate_stats(results: List[BenchmarkResult]):s5cmd (ContainerTask) | ~5.00 | {file_s5cmd_time:.2f} | -{file_s5cmd_min:.2f} / {file_s5cmd_max:.2f} | +{f"{file_s5cmd_min:.2f} / {file_s5cmd_max:.2f}"} | {file_s5cmd_tput:.2f} | {", ".join([f"{t:.2f}" for t in file_s5cmd_times])} |
| New SDK (Flyte v2) | {dir_new_bytes / (1024**3):.2f} | {dir_new_time:.2f} | -{dir_new_min:.2f} / {dir_new_max:.2f} | +{f"{dir_new_min:.2f} / {dir_new_max:.2f}"} | {dir_new_tput:.2f} | {", ".join([f"{t:.2f}" for t in dir_new_times])} | s3fs + fsspec (v1 style) | {dir_s3fs_bytes / (1024**3):.2f} | {dir_s3fs_time:.2f} | -{dir_s3fs_min:.2f} / {dir_s3fs_max:.2f} | +{f"{dir_s3fs_min:.2f} / {dir_s3fs_max:.2f}"} | {dir_s3fs_tput:.2f} | {", ".join([f"{t:.2f}" for t in dir_s3fs_times])} | @@ -442,13 +434,33 @@ def calculate_stats(results: List[BenchmarkResult]):s5cmd (ContainerTask) | ~4.88 | {dir_s5cmd_time:.2f} | -{dir_s5cmd_min:.2f} / {dir_s5cmd_max:.2f} | +{f"{dir_s5cmd_min:.2f} / {dir_s5cmd_max:.2f}"} | {dir_s5cmd_tput:.2f} | {", ".join([f"{t:.2f}" for t in dir_s5cmd_times])} |
All tests run 10 times each in parallel on nodes with 8 CPU, 23Gi memory
+ html = f""" + + + + + +All tests run {iterations} times each in parallel on nodes with {CPU} CPU, {MEMORY} memory
""" @@ -456,76 +468,85 @@ def calculate_stats(results: List[BenchmarkResult]): @env.task(report=True) -async def benchmark_all(): +async def benchmark_all(test_file: bool = True, test_dir: bool = True): """Comprehensive benchmark comparing all download methods""" iterations = 2 print("=" * 80) print(f"Starting comprehensive I/O benchmarks ({iterations} runs each)") print("=" * 80) - # Test 1: Large single file (5GB) - print("\n--- Test 1: Single 5GB file (10 runs) ---") - large_file = await create_file(5120) - - print("Running all downloads in parallel (each method runs 10 iterations)...") - # Run the native tasks (they handle 10 iterations internally) - t1 = asyncio.create_task(read_large_file_new(large_file, iterations=iterations)) - t2 = asyncio.create_task(read_large_file_s3fs(large_file, iterations=iterations)) - - # Run s5cmd 10 times in parallel - async def run_s5cmd_file_multiple(): - print("[s5cmd File] Starting 10 parallel runs...", flush=True) - tasks = [s5cmd_file_task(remote_path=large_file.path, file_size_mb=5120) for _ in range(iterations)] - results = await asyncio.gather(*tasks) - # Convert s5cmd results (duration, throughput) to BenchmarkResult - file_size_bytes = 5120 * 1024 * 1024 - return [BenchmarkResult(bytes=file_size_bytes, duration=r[0]) for r in results] - - t3 = asyncio.create_task(run_s5cmd_file_multiple()) - - file_results_new, file_results_s3fs, file_results_s5cmd = await asyncio.gather(t1, t2, t3) - - # Print summary - avg_time_new = sum(r.duration for r in file_results_new) / len(file_results_new) - avg_time_s3fs = sum(r.duration for r in file_results_s3fs) / len(file_results_s3fs) - avg_time_s5cmd = sum(r.duration for r in file_results_s5cmd) / len(file_results_s5cmd) - - print("\n5GB file average results:") - print(f" New SDK: {avg_time_new:.2f}s avg") - print(f" s3fs: {avg_time_s3fs:.2f}s avg") - print(f" s5cmd: {avg_time_s5cmd:.2f}s avg") - - # Test 2: Directory with 1000 5MB files - print("\n--- Test 2: Directory with 1000 x 5MB files (10 runs) ---") - file_dir = await create_dir_with_files(num_files=1000, size_megabytes=5) - - print("Running directory downloads in parallel (each method runs 10 iterations)...") - # Run the native tasks (they handle 10 iterations internally) - td1 = asyncio.create_task(read_dir_new(file_dir, iterations=iterations)) - td2 = asyncio.create_task(read_dir_s3fs(file_dir, iterations=iterations)) - - # Run s5cmd 10 times in parallel - async def run_s5cmd_dir_multiple(): - print("[s5cmd Dir] Starting 10 parallel runs...", flush=True) - tasks = [s5cmd_dir_task(remote_path=file_dir.path, expected_files=1000) for _ in range(iterations)] - results = await asyncio.gather(*tasks) - # Convert s5cmd results (duration, throughput) to BenchmarkResult - dir_size_bytes = 1000 * 5 * 1024 * 1024 - return [BenchmarkResult(bytes=dir_size_bytes, duration=r[0]) for r in results] - - td3 = asyncio.create_task(run_s5cmd_dir_multiple()) - - dir_results_new, dir_results_s3fs, dir_results_s5cmd = await asyncio.gather(td1, td2, td3) - - # Print summary - avg_time_new_dir = sum(r.duration for r in dir_results_new) / len(dir_results_new) - avg_time_s3fs_dir = sum(r.duration for r in dir_results_s3fs) / len(dir_results_s3fs) - avg_time_s5cmd_dir = sum(r.duration for r in dir_results_s5cmd) / len(dir_results_s5cmd) - - print("\nDirectory average results:") - print(f" New SDK: {avg_time_new_dir:.2f}s avg") - print(f" s3fs: {avg_time_s3fs_dir:.2f}s avg") - print(f" s5cmd: {avg_time_s5cmd_dir:.2f}s avg") + file_results_new = None + file_results_s3fs = None + file_results_s5cmd = None + dir_results_new = None + dir_results_s3fs = None + dir_results_s5cmd = None + + if test_file: + # Test 1: Large single file (5GB) + print("\n--- Test 1: Single 5GB file (10 runs) ---") + large_file = await create_file(5120) + + print("Running all downloads in parallel (each method runs 10 iterations)...") + # Run the native tasks (they handle 10 iterations internally) + t1 = asyncio.create_task(read_large_file_new(large_file, iterations=iterations)) + t2 = asyncio.create_task(read_large_file_s3fs(large_file, iterations=iterations)) + + # Run s5cmd 10 times in parallel + async def run_s5cmd_file_multiple(): + print("[s5cmd File] Starting 10 parallel runs...", flush=True) + tasks = [s5cmd_file_task(remote_path=large_file.path, file_size_mb=5120) for _ in range(iterations)] + results = await asyncio.gather(*tasks) + # Convert s5cmd results (duration, throughput) to BenchmarkResult + file_size_bytes = 5120 * 1024 * 1024 + return [BenchmarkResult(bytes=file_size_bytes, duration=r[0]) for r in results] + + t3 = asyncio.create_task(run_s5cmd_file_multiple()) + + file_results_new, file_results_s3fs, file_results_s5cmd = await asyncio.gather(t1, t2, t3) + + # Print summary + avg_time_new = sum(r.duration for r in file_results_new) / len(file_results_new) + avg_time_s3fs = sum(r.duration for r in file_results_s3fs) / len(file_results_s3fs) + avg_time_s5cmd = sum(r.duration for r in file_results_s5cmd) / len(file_results_s5cmd) + # + print("\n5GB file average results:") + print(f" New SDK: {avg_time_new:.2f}s avg") + print(f" s3fs: {avg_time_s3fs:.2f}s avg") + print(f" s5cmd: {avg_time_s5cmd:.2f}s avg") + + if test_dir: + # Test 2: Directory with 1000 5MB files + print("\n--- Test 2: Directory with 1000 x 5MB files (10 runs) ---") + file_dir = await create_dir_with_files(num_files=1000, size_megabytes=5) + + print("Running directory downloads in parallel (each method runs 10 iterations)...") + # Run the native tasks (they handle 10 iterations internally) + td1 = asyncio.create_task(read_dir_new(file_dir, iterations=iterations)) + td2 = asyncio.create_task(read_dir_s3fs(file_dir, iterations=iterations)) + + # Run s5cmd 10 times in parallel + async def run_s5cmd_dir_multiple(): + print("[s5cmd Dir] Starting 10 parallel runs...", flush=True) + tasks = [s5cmd_dir_task(remote_path=file_dir.path, expected_files=1000) for _ in range(iterations)] + results = await asyncio.gather(*tasks) + # Convert s5cmd results (duration, throughput) to BenchmarkResult + dir_size_bytes = 1000 * 5 * 1024 * 1024 + return [BenchmarkResult(bytes=dir_size_bytes, duration=r[0]) for r in results] + + td3 = asyncio.create_task(run_s5cmd_dir_multiple()) + + dir_results_new, dir_results_s3fs, dir_results_s5cmd = await asyncio.gather(td1, td2, td3) + + # Print summary + avg_time_new_dir = sum(r.duration for r in dir_results_new) / len(dir_results_new) + avg_time_s3fs_dir = sum(r.duration for r in dir_results_s3fs) / len(dir_results_s3fs) + avg_time_s5cmd_dir = sum(r.duration for r in dir_results_s5cmd) / len(dir_results_s5cmd) + + print("\nDirectory average results:") + print(f" New SDK: {avg_time_new_dir:.2f}s avg") + print(f" s3fs: {avg_time_s3fs_dir:.2f}s avg") + print(f" s5cmd: {avg_time_s5cmd_dir:.2f}s avg") print("\n" + "=" * 80) print("Benchmark complete!") @@ -539,6 +560,7 @@ async def run_s5cmd_dir_multiple(): dir_results_new=dir_results_new, dir_results_s3fs=dir_results_s3fs, dir_results_s5cmd=dir_results_s5cmd, + iterations=iterations, ) await flyte.report.replace.aio(html) @@ -549,5 +571,5 @@ async def run_s5cmd_dir_multiple(): # $ flyte -c ~/.flyte/builder.remote.demo.yaml run -p flytesnacks -d development stress/benchmark/large_io_comparison.py benchmark_all # noqa: E501 if __name__ == "__main__": flyte.init_from_config() - r = flyte.run(benchmark_all) + r = flyte.run(benchmark_all, test_file=True, test_dir=True) print(r.url) diff --git a/examples/stress/stress_fanout.py b/examples/stress/stress_fanout.py index 777c0fe87..69144de2f 100644 --- a/examples/stress/stress_fanout.py +++ b/examples/stress/stress_fanout.py @@ -115,6 +115,6 @@ async def main( if __name__ == "__main__": flyte.init_from_config() - run = flyte.with_runcontext("local").run(main, fanout_per_layer=[70, 70, 1], sleep_sec=1.0, jitter_sec=0.5) + run = flyte.with_runcontext("remote").run(main, fanout_per_layer=[5, 5, 1], sleep_sec=1.0, jitter_sec=0.5) print(run.outputs) # print(run.url) diff --git a/src/flyte/storage/_storage.py b/src/flyte/storage/_storage.py index e6f4785db..41ff990a6 100644 --- a/src/flyte/storage/_storage.py +++ b/src/flyte/storage/_storage.py @@ -223,7 +223,6 @@ async def get(from_path: str, to_path: Optional[str | pathlib.Path] = None, recu _is_obstore_supported_protocol(file_system.protocol) and hasattr(file_system, "_split_path") and hasattr(file_system, "_construct_store") - and recursive ): return await _get_obstore_bypass(from_path, to_path, recursive, **kwargs)