From 4e0195b05c16f8c06e35892b68c15852595cf29f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Mon, 18 Aug 2025 11:10:41 +0000 Subject: [PATCH 1/5] Protect shutdown with lock. Allow shutdown more than once. --- src/qasync/__init__.py | 48 +++++++++++++++++++-------------------- tests/test_qthreadexec.py | 26 ++++++++++++++++----- 2 files changed, 43 insertions(+), 31 deletions(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index d35a5ee..b5ae07b 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -22,6 +22,7 @@ import time from concurrent.futures import Future from queue import Queue +from threading import Lock from typing import TYPE_CHECKING, Literal, Tuple, cast, get_args logger = logging.getLogger(__name__) @@ -172,45 +173,42 @@ def __init__(self, max_workers=10, stack_size=None): self.__workers = [ _QThreadWorker(self.__queue, i + 1, stack_size) for i in range(max_workers) ] + self.__shutdown_lock = Lock() self.__been_shutdown = False for w in self.__workers: w.start() def submit(self, callback, *args, **kwargs): - if self.__been_shutdown: - raise RuntimeError("QThreadExecutor has been shutdown") + with self.__shutdown_lock: + if self.__been_shutdown: + raise RuntimeError("QThreadExecutor has been shutdown") - future = Future() - self._logger.debug( - "Submitting callback %s with args %s and kwargs %s to thread worker queue", - callback, - args, - kwargs, - ) - self.__queue.put((future, callback, args, kwargs)) - return future + future = Future() + self._logger.debug( + "Submitting callback %s with args %s and kwargs %s to thread worker queue", + callback, + args, + kwargs, + ) + self.__queue.put((future, callback, args, kwargs)) + return future def map(self, func, *iterables, timeout=None): raise NotImplementedError("use as_completed on the event loop") def shutdown(self, wait=True): - if self.__been_shutdown: - raise RuntimeError("QThreadExecutor has been shutdown") - - self.__been_shutdown = True - - self._logger.debug("Shutting down") - for i in range(len(self.__workers)): - # Signal workers to stop - self.__queue.put(None) - if wait: - for w in self.__workers: - w.wait() + with self.__shutdown_lock: + self.__been_shutdown = True + self._logger.debug("Shutting down") + for i in range(len(self.__workers)): + # Signal workers to stop + self.__queue.put(None) + if wait: + for w in self.__workers: + w.wait() def __enter__(self, *args): - if self.__been_shutdown: - raise RuntimeError("QThreadExecutor has been shutdown") return self def __exit__(self, *args): diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index 67c1833..b311f41 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -44,15 +44,16 @@ def shutdown_executor(): return exe -def test_shutdown_after_shutdown(shutdown_executor): - with pytest.raises(RuntimeError): - shutdown_executor.shutdown() +@pytest.mark.parametrize("wait", [True, False]) +def test_shutdown_after_shutdown(shutdown_executor, wait): + # it is safe to shutdown twice + shutdown_executor.shutdown(wait=wait) def test_ctx_after_shutdown(shutdown_executor): - with pytest.raises(RuntimeError): - with shutdown_executor: - pass + # it is safe to enter and exit the context after shutdown + with shutdown_executor: + pass def test_submit_after_shutdown(shutdown_executor): @@ -104,3 +105,16 @@ def test_no_stale_reference_as_result(executor, disable_executor_logging): assert collected is True, ( "Stale reference to executor result not collected within timeout." ) + + +def test_context(executor): + """Test that the context manager will shutdown executor""" + with executor: + f = executor.submit(lambda: 42) + assert f.result() == 42 + + # it can be entered again + with executor: + # but will fail when we submit + with pytest.raises(RuntimeError): + executor.submit(lambda: 42) From 4e0ef196616c4dac9d321402184a4d2e5607305b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Sat, 16 Aug 2025 16:06:04 +0000 Subject: [PATCH 2/5] Support the `cancel_futures` parameter to executor.shutdown() --- src/qasync/__init__.py | 9 ++++++++- tests/test_qthreadexec.py | 27 +++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index b5ae07b..e3d0279 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -197,10 +197,17 @@ def submit(self, callback, *args, **kwargs): def map(self, func, *iterables, timeout=None): raise NotImplementedError("use as_completed on the event loop") - def shutdown(self, wait=True): + def shutdown(self, wait=True, *, cancel_futures=False): with self.__shutdown_lock: self.__been_shutdown = True self._logger.debug("Shutting down") + if cancel_futures: + # pop all the futures and cancel them + while not self.__queue.empty(): + item = self.__queue.get_nowait() + if item is not None: + future, _, _, _ = item + future.cancel() for i in range(len(self.__workers)): # Signal workers to stop self.__queue.put(None) diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index b311f41..bffc969 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -4,7 +4,9 @@ # BSD License import logging import threading +import time import weakref +from concurrent.futures import CancelledError import pytest @@ -118,3 +120,28 @@ def test_context(executor): # but will fail when we submit with pytest.raises(RuntimeError): executor.submit(lambda: 42) + + +@pytest.mark.parametrize("cancel", [True, False]) +def test_shutdown_cancel_futures(executor, cancel): + """Test that shutdown with cancel_futures=True cancels all remaining futures in the queue.""" + + def task(): + time.sleep(0.01) + + # Submit ten tasks to the executor + futures = [executor.submit(task) for _ in range(10)] + # shut it down + executor.shutdown(cancel_futures=cancel) + + cancels = 0 + for future in futures: + try: + future.result(timeout=0.01) + except CancelledError: + cancels += 1 + + if cancel: + assert cancels > 0 + else: + assert cancels == 0 From 3ba10cdcffd685207b33224f2a84df1c52186d34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Sat, 16 Aug 2025 17:13:29 +0000 Subject: [PATCH 3/5] add executor.map() functionality --- src/qasync/__init__.py | 30 ++++++++++- tests/test_qthreadexec.py | 103 +++++++++++++++++++++++++++++++++++++- 2 files changed, 131 insertions(+), 2 deletions(-) diff --git a/src/qasync/__init__.py b/src/qasync/__init__.py index e3d0279..21f5a30 100644 --- a/src/qasync/__init__.py +++ b/src/qasync/__init__.py @@ -195,7 +195,25 @@ def submit(self, callback, *args, **kwargs): return future def map(self, func, *iterables, timeout=None): - raise NotImplementedError("use as_completed on the event loop") + deadline = time.monotonic() + timeout if timeout is not None else None + futures = [self.submit(func, *args) for args in zip(*iterables)] + + # must have generator as a closure so that the submit occurs before first iteration + def generator(): + try: + futures.reverse() + while futures: + if deadline is not None: + yield _result_or_cancel( + futures.pop(), timeout=deadline - time.monotonic() + ) + else: + yield _result_or_cancel(futures.pop()) + finally: + for future in futures: + future.cancel() + + return generator() def shutdown(self, wait=True, *, cancel_futures=False): with self.__shutdown_lock: @@ -222,6 +240,16 @@ def __exit__(self, *args): self.shutdown() +def _result_or_cancel(fut, timeout=None): + try: + try: + return fut.result(timeout) + finally: + fut.cancel() + finally: + del fut # break reference cycle in exceptions + + def _format_handle(handle: asyncio.Handle): cb = getattr(handle, "_callback", None) if isinstance(getattr(cb, "__self__", None), asyncio.tasks.Task): diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index bffc969..30eff1e 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -6,7 +6,8 @@ import threading import time import weakref -from concurrent.futures import CancelledError +from concurrent.futures import CancelledError, TimeoutError +from itertools import islice import pytest @@ -145,3 +146,103 @@ def task(): assert cancels > 0 else: assert cancels == 0 + + +def test_map(executor): + """Basic test of executor map functionality""" + results = list(executor.map(lambda x: x + 1, range(10))) + assert results == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] + + results = list(executor.map(lambda x, y: x + y, range(10), range(9))) + assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16] + + +def test_map_timeout(executor): + """Test that map with timeout raises TimeoutError and cancels futures""" + results = [] + + def func(x): + nonlocal results + time.sleep(0.05) + results.append(x) + return x + + start = time.monotonic() + with pytest.raises(TimeoutError): + list(executor.map(func, range(10), timeout=0.01)) + duration = time.monotonic() - start + # this test is flaky on some platforms, so we give it a wide bearth. + assert duration < 0.1 + + executor.shutdown(wait=True) + # only about half of the tasks should have completed + # because the max number of workers is 5 and the rest of + # the tasks were not started at the time of the cancel. + assert set(results) != {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + + +def test_map_error(executor): + """Test that map with an exception will raise, and remaining tasks are cancelled""" + results = [] + + def func(x): + nonlocal results + time.sleep(0.05) + if len(results) == 5: + raise ValueError("Test error") + results.append(x) + return x + + with pytest.raises(ValueError): + list(executor.map(func, range(15))) + + executor.shutdown(wait=True, cancel_futures=False) + assert len(results) <= 10, "Final 5 at least should have been cancelled" + + +@pytest.mark.parametrize("cancel", [True, False]) +def test_map_shutdown(executor, cancel): + results = [] + + def func(x): + nonlocal results + time.sleep(0.05) + results.append(x) + return x + + # Get the first few results. + # Keep the iterator alive so that it isn't closed when its reference is dropped. + m = executor.map(func, range(15)) + values = list(islice(m, 5)) + assert values == [0, 1, 2, 3, 4] + + executor.shutdown(wait=True, cancel_futures=cancel) + if cancel: + assert len(results) < 15, "Some tasks should have been cancelled" + else: + assert len(results) == 15, "All tasks should have been completed" + m.close() + + +def test_map_start(executor): + """Test that map starts tasks immediately, before iterating""" + e = threading.Event() + m = executor.map(lambda x: (e.set(), x), range(1)) + e.wait(timeout=0.1) + assert list(m) == [(None, 0)] + + +def test_map_close(executor): + """Test that closing a running map cancels all remaining tasks.""" + results = [] + def func(x): + nonlocal results + time.sleep(0.05) + results.append(x) + return x + m = executor.map(func, range(10)) + # must start the generator so that close() has any effect + assert next(m) == 0 + m.close() + executor.shutdown(wait=True, cancel_futures=False) + assert len(results) < 10, "Some tasks should have been cancelled" From 6ec607704ee1223995f2dedca3735f9bb3ffa903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 10 Oct 2025 15:28:18 +0000 Subject: [PATCH 4/5] use mocking for cancellation tests to avoid timing issues. --- tests/test_qthreadexec.py | 74 ++++++++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 28 deletions(-) diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index 30eff1e..9d5c95e 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -6,8 +6,9 @@ import threading import time import weakref -from concurrent.futures import CancelledError, TimeoutError +from concurrent.futures import Future, TimeoutError from itertools import islice +from unittest.mock import Mock, patch import pytest @@ -124,28 +125,31 @@ def test_context(executor): @pytest.mark.parametrize("cancel", [True, False]) -def test_shutdown_cancel_futures(executor, cancel): +def test_shutdown_cancel_futures(cancel): """Test that shutdown with cancel_futures=True cancels all remaining futures in the queue.""" - def task(): - time.sleep(0.01) + # Create an executor with no workers so futures stay queued and never execute + executor = qasync.QThreadExecutor(max_workers=0) - # Submit ten tasks to the executor - futures = [executor.submit(task) for _ in range(10)] - # shut it down - executor.shutdown(cancel_futures=cancel) + futures = [executor.submit(lambda: None) for _ in range(10)] - cancels = 0 - for future in futures: - try: - future.result(timeout=0.01) - except CancelledError: - cancels += 1 + # Shutdown with cancel_futures parameter + executor.shutdown(wait=False, cancel_futures=cancel) if cancel: - assert cancels > 0 + # All futures should be cancelled since no workers consumed them + cancelled_count = sum(1 for f in futures if f.cancelled()) + assert cancelled_count == 10, ( + f"Expected all 10 futures to be cancelled, got {cancelled_count}" + ) else: - assert cancels == 0 + # No futures should be cancelled, they should still be pending + cancelled_count = sum(1 for f in futures if f.cancelled()) + assert cancelled_count == 0, ( + f"Expected no futures to be cancelled, got {cancelled_count}" + ) + + executor.shutdown(wait=True, cancel_futures=False) def test_map(executor): @@ -232,17 +236,31 @@ def test_map_start(executor): assert list(m) == [(None, 0)] -def test_map_close(executor): +def test_map_close(): """Test that closing a running map cancels all remaining tasks.""" - results = [] - def func(x): - nonlocal results - time.sleep(0.05) - results.append(x) - return x - m = executor.map(func, range(10)) - # must start the generator so that close() has any effect - assert next(m) == 0 - m.close() + + # Create an executor with no workers so we have full control + executor = qasync.QThreadExecutor(max_workers=0) + + # Create mock futures with proper result() method + mock_futures = [] + for i in range(10): + mock_future = Mock(spec=Future) + mock_future.cancel = Mock(return_value=True) + mock_future.result = Mock(return_value=i) + mock_futures.append(mock_future) + + # Mock submit to return our pre-created futures + with patch.object(executor, "submit", side_effect=mock_futures): + m = executor.map(lambda x: x, range(10)) + # must start the generator so that close() has any effect + assert next(m) == 0 + m.close() + + # All futures should have cancel() called: + # - The first one via _result_or_cancel after next() consumed it + # - The rest via the finally block when the generator is closed + for i, f in enumerate(mock_futures): + assert f.cancel.called, f"Future {i} should have been cancelled" + executor.shutdown(wait=True, cancel_futures=False) - assert len(results) < 10, "Some tasks should have been cancelled" From 8378c01aba540d795a9c9174c711a3ac7579c639 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 10 Oct 2025 15:48:50 +0000 Subject: [PATCH 5/5] more mocking for test_map --- tests/test_qthreadexec.py | 149 ++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 77 deletions(-) diff --git a/tests/test_qthreadexec.py b/tests/test_qthreadexec.py index 9d5c95e..973bddb 100644 --- a/tests/test_qthreadexec.py +++ b/tests/test_qthreadexec.py @@ -4,10 +4,8 @@ # BSD License import logging import threading -import time import weakref from concurrent.futures import Future, TimeoutError -from itertools import islice from unittest.mock import Mock, patch import pytest @@ -48,6 +46,18 @@ def shutdown_executor(): return exe +@pytest.fixture +def executor0(): + """ + Provides a QThreadExecutor with max_workers=0 for deterministic testing. + """ + executor = qasync.QThreadExecutor(max_workers=0) + try: + yield executor + finally: + executor.shutdown(wait=True, cancel_futures=False) + + @pytest.mark.parametrize("wait", [True, False]) def test_shutdown_after_shutdown(shutdown_executor, wait): # it is safe to shutdown twice @@ -125,16 +135,13 @@ def test_context(executor): @pytest.mark.parametrize("cancel", [True, False]) -def test_shutdown_cancel_futures(cancel): +def test_shutdown_cancel_futures(executor0, cancel): """Test that shutdown with cancel_futures=True cancels all remaining futures in the queue.""" - # Create an executor with no workers so futures stay queued and never execute - executor = qasync.QThreadExecutor(max_workers=0) - - futures = [executor.submit(lambda: None) for _ in range(10)] + futures = [executor0.submit(lambda: None) for _ in range(10)] # Shutdown with cancel_futures parameter - executor.shutdown(wait=False, cancel_futures=cancel) + executor0.shutdown(wait=False, cancel_futures=cancel) if cancel: # All futures should be cancelled since no workers consumed them @@ -149,8 +156,6 @@ def test_shutdown_cancel_futures(cancel): f"Expected no futures to be cancelled, got {cancelled_count}" ) - executor.shutdown(wait=True, cancel_futures=False) - def test_map(executor): """Basic test of executor map functionality""" @@ -161,86 +166,78 @@ def test_map(executor): assert results == [0, 2, 4, 6, 8, 10, 12, 14, 16] -def test_map_timeout(executor): - """Test that map with timeout raises TimeoutError and cancels futures""" - results = [] +def test_map_timeout(executor0): + """Test that map with timeout propagates the timeout parameter to future.result()""" - def func(x): - nonlocal results - time.sleep(0.05) - results.append(x) - return x + f = Mock(spec=Future) + f.result = Mock(side_effect=TimeoutError("Timeout")) + f.cancel = Mock(return_value=True) - start = time.monotonic() - with pytest.raises(TimeoutError): - list(executor.map(func, range(10), timeout=0.01)) - duration = time.monotonic() - start - # this test is flaky on some platforms, so we give it a wide bearth. - assert duration < 0.1 + with patch.object(executor0, "submit", return_value=f): + with pytest.raises(TimeoutError, match="Timeout"): + list(executor0.map(lambda x: x, [1], timeout=0.5)) - executor.shutdown(wait=True) - # only about half of the tasks should have completed - # because the max number of workers is 5 and the rest of - # the tasks were not started at the time of the cancel. - assert set(results) != {0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + # Verify the timeout parameter was passed to result() (not None) + # Note: The timeout is calculated as (deadline - time.monotonic()), so it will be + # slightly less than 0.5 due to the time taken to submit futures and start iteration + assert f.result.called + f_timeout = f.result.call_args[0][0] if f.result.call_args[0] else None + assert f_timeout is not None + assert f_timeout <= 0.5 -def test_map_error(executor): +def test_map_error(executor0): """Test that map with an exception will raise, and remaining tasks are cancelled""" - results = [] - def func(x): - nonlocal results - time.sleep(0.05) - if len(results) == 5: - raise ValueError("Test error") - results.append(x) - return x + # Create 3 futures: one success, one exception, one to be cancelled + mock_futures = [] - with pytest.raises(ValueError): - list(executor.map(func, range(15))) + # First future succeeds + f0 = Mock(spec=Future) + f0.result = Mock(return_value=0) + f0.cancel = Mock(return_value=True) + mock_futures.append(f0) - executor.shutdown(wait=True, cancel_futures=False) - assert len(results) <= 10, "Final 5 at least should have been cancelled" + # Second future raises an exception + f1 = Future() + f1.set_exception(ValueError("Test error")) + mock_futures.append(f1) + # Third future should be cancelled + f2 = Mock(spec=Future) + f2.result = Mock(return_value=2) + f2.cancel = Mock(return_value=True) + mock_futures.append(f2) -@pytest.mark.parametrize("cancel", [True, False]) -def test_map_shutdown(executor, cancel): - results = [] - - def func(x): - nonlocal results - time.sleep(0.05) - results.append(x) - return x - - # Get the first few results. - # Keep the iterator alive so that it isn't closed when its reference is dropped. - m = executor.map(func, range(15)) - values = list(islice(m, 5)) - assert values == [0, 1, 2, 3, 4] - - executor.shutdown(wait=True, cancel_futures=cancel) - if cancel: - assert len(results) < 15, "Some tasks should have been cancelled" - else: - assert len(results) == 15, "All tasks should have been completed" - m.close() + with patch.object(executor0, "submit", side_effect=mock_futures): + with pytest.raises(ValueError, match="Test error"): + list(executor0.map(lambda x: x, range(3))) + + # Verify the third future was cancelled when the exception occurred + assert f2.cancel.called, "Future after exception should have been cancelled" -def test_map_start(executor): +def test_map_start(executor0): """Test that map starts tasks immediately, before iterating""" - e = threading.Event() - m = executor.map(lambda x: (e.set(), x), range(1)) - e.wait(timeout=0.1) - assert list(m) == [(None, 0)] + # Mock future that returns immediately + mock_future = Mock(spec=Future) + mock_future.result = Mock(return_value=0) + mock_future.cancel = Mock(return_value=True) -def test_map_close(): - """Test that closing a running map cancels all remaining tasks.""" + with patch.object(executor0, "submit", return_value=mock_future) as mock_submit: + # Create the map - submit should be called immediately + m = executor0.map(lambda x: x, range(1)) - # Create an executor with no workers so we have full control - executor = qasync.QThreadExecutor(max_workers=0) + # Verify submit was called before we start iterating + mock_submit.assert_called_once() + + # Now iterate to verify the result + assert list(m) == [0] + + +def test_map_close(executor0): + """Test that closing a running map cancels all remaining tasks.""" # Create mock futures with proper result() method mock_futures = [] @@ -251,8 +248,8 @@ def test_map_close(): mock_futures.append(mock_future) # Mock submit to return our pre-created futures - with patch.object(executor, "submit", side_effect=mock_futures): - m = executor.map(lambda x: x, range(10)) + with patch.object(executor0, "submit", side_effect=mock_futures): + m = executor0.map(lambda x: x, range(10)) # must start the generator so that close() has any effect assert next(m) == 0 m.close() @@ -262,5 +259,3 @@ def test_map_close(): # - The rest via the finally block when the generator is closed for i, f in enumerate(mock_futures): assert f.cancel.called, f"Future {i} should have been cancelled" - - executor.shutdown(wait=True, cancel_futures=False)