From 8b9f5c77954a7912c27df52fe1e26c1e6b38b1d6 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Nov 2025 18:58:10 +0100 Subject: [PATCH 1/6] move subprocess creation into own function --- .../src/deltachat_rpc_client/rpc.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index faf0edaac9..a7ec79dc4b 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -80,16 +80,7 @@ def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltacha def start(self) -> None: """Start RPC server subprocess.""" - popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} - if sys.version_info >= (3, 11): - # Prevent subprocess from capturing SIGINT. - popen_kwargs["process_group"] = 0 - else: - # `process_group` is not supported before Python 3.11. - popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 - - popen_kwargs.update(self._kwargs) - self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + self.server_stdout, self.server_stdin = self.connect_to_subprocess() self.id_iterator = itertools.count(start=1) self.event_queues = {} self.request_results = {} @@ -102,12 +93,25 @@ def start(self) -> None: self.events_thread = Thread(target=self.events_loop) self.events_thread.start() + def connect_to_subprocess(self): + popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} + if sys.version_info >= (3, 11): + # Prevent subprocess from capturing SIGINT. + popen_kwargs["process_group"] = 0 + else: + # `process_group` is not supported before Python 3.11. + popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 + + popen_kwargs.update(self._kwargs) + process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + return process.stdout, process.stdin + def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True self.stop_io_for_all_accounts() self.events_thread.join() - self.process.stdin.close() + self.server_stdin.close() self.reader_thread.join() self.request_queue.put(None) self.writer_thread.join() @@ -122,7 +126,7 @@ def __exit__(self, _exc_type, _exc, _tb): def reader_loop(self) -> None: """Process JSON-RPC responses from the RPC server process output.""" try: - while line := self.process.stdout.readline(): + while line := self.server_stdout.readline(): response = json.loads(line) if "id" in response: response_id = response["id"] @@ -138,8 +142,8 @@ def writer_loop(self) -> None: try: while request := self.request_queue.get(): data = (json.dumps(request) + "\n").encode() - self.process.stdin.write(data) - self.process.stdin.flush() + self.server_stdin.write(data) + self.server_stdin.flush() except Exception: # Log an exception if the writer loop dies. From 7dc717fa6249b34f525e0540e15fc159927f75b3 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Nov 2025 19:10:14 +0100 Subject: [PATCH 2/6] strike unncessary kwargs to Rpc --- .../src/deltachat_rpc_client/_utils.py | 9 +++------ .../src/deltachat_rpc_client/rpc.py | 14 ++++++-------- 2 files changed, 9 insertions(+), 14 deletions(-) diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/_utils.py b/deltachat-rpc-client/src/deltachat_rpc_client/_utils.py index 47d9b878cd..bd9ebdeddd 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/_utils.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/_utils.py @@ -47,7 +47,6 @@ def __setattr__(self, attr, val): def run_client_cli( hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None, argv: Optional[list] = None, - **kwargs, ) -> None: """Run a simple command line app, using the given hooks. @@ -55,13 +54,12 @@ def run_client_cli( """ from .client import Client - _run_cli(Client, hooks, argv, **kwargs) + _run_cli(Client, hooks, argv) def run_bot_cli( hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None, argv: Optional[list] = None, - **kwargs, ) -> None: """Run a simple bot command line using the given hooks. @@ -69,14 +67,13 @@ def run_bot_cli( """ from .client import Bot - _run_cli(Bot, hooks, argv, **kwargs) + _run_cli(Bot, hooks, argv) def _run_cli( client_type: Type["Client"], hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None, argv: Optional[list] = None, - **kwargs, ) -> None: from .deltachat import DeltaChat from .rpc import Rpc @@ -94,7 +91,7 @@ def _run_cli( parser.add_argument("--password", action="store", help="password", default=os.getenv("DELTACHAT_PASSWORD")) args = parser.parse_args(argv[1:]) - with Rpc(accounts_dir=args.accounts_dir, **kwargs) as rpc: + with Rpc(accounts_dir=args.accounts_dir) as rpc: deltachat = DeltaChat(rpc) core_version = (deltachat.get_system_info()).deltachat_core_version accounts = deltachat.get_all_accounts() diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index a7ec79dc4b..76801e8510 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -54,18 +54,13 @@ def rpc_future(): class Rpc: """RPC client.""" - def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs): + def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server"): """Initialize RPC client. The given arguments will be passed to subprocess.Popen(). """ - if accounts_dir: - kwargs["env"] = { - **kwargs.get("env", os.environ), - "DC_ACCOUNTS_PATH": str(accounts_dir), - } + self._accounts_dir = accounts_dir - self._kwargs = kwargs self.rpc_server_path = rpc_server_path self.process: subprocess.Popen self.id_iterator: Iterator[int] @@ -102,7 +97,10 @@ def connect_to_subprocess(self): # `process_group` is not supported before Python 3.11. popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 - popen_kwargs.update(self._kwargs) + if self._accounts_dir: + popen_kwargs["env"] = os.environ.copy() + popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) + process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) return process.stdout, process.stdin From a77e9cb8d541c90849f00b09334fb4018ca7e61e Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Nov 2025 19:49:05 +0100 Subject: [PATCH 3/6] separate out subprocess handling into subclass --- .../src/deltachat_rpc_client/rpc.py | 75 +++++++++++-------- 1 file changed, 43 insertions(+), 32 deletions(-) diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 76801e8510..fe748fdabe 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -10,7 +10,10 @@ import sys from queue import Empty, Queue from threading import Thread -from typing import Any, Iterator, Optional +from typing import TYPE_CHECKING, Any, Iterator, Optional + +if TYPE_CHECKING: + import io class JsonRpcError(Exception): @@ -51,18 +54,8 @@ def rpc_future(): return rpc_future -class Rpc: - """RPC client.""" - - def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server"): - """Initialize RPC client. - - The given arguments will be passed to subprocess.Popen(). - """ - self._accounts_dir = accounts_dir - - self.rpc_server_path = rpc_server_path - self.process: subprocess.Popen +class BaseRpc: + def __init__(self): self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] # Map from request ID to a Queue which provides a single result @@ -75,7 +68,7 @@ def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltacha def start(self) -> None: """Start RPC server subprocess.""" - self.server_stdout, self.server_stdin = self.connect_to_subprocess() + self.server_stdout, self.server_stdin = self.connect_to_server() self.id_iterator = itertools.count(start=1) self.event_queues = {} self.request_results = {} @@ -88,28 +81,11 @@ def start(self) -> None: self.events_thread = Thread(target=self.events_loop) self.events_thread.start() - def connect_to_subprocess(self): - popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} - if sys.version_info >= (3, 11): - # Prevent subprocess from capturing SIGINT. - popen_kwargs["process_group"] = 0 - else: - # `process_group` is not supported before Python 3.11. - popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 - - if self._accounts_dir: - popen_kwargs["env"] = os.environ.copy() - popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) - - process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) - return process.stdout, process.stdin - def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True - self.stop_io_for_all_accounts() + self.disconnect_from_server() self.events_thread.join() - self.server_stdin.close() self.reader_thread.join() self.request_queue.put(None) self.writer_thread.join() @@ -185,3 +161,38 @@ def clear_all_events(self, account_id: int): def __getattr__(self, attr: str): return RpcMethod(self, attr) + + +class Rpc(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server in a subprocess.""" + + def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"): + """Initialize RPC client. + + The given arguments will be passed to subprocess.Popen(). + """ + super(Rpc, self).__init__() + self.server_stdout: io.Writer[bytes] + self.server_stdin: io.Reader[bytes] + self._accounts_dir = accounts_dir + self.rpc_server_path = rpc_server_path + + def connect_to_server(self): + popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} + if sys.version_info >= (3, 11): + # Prevent subprocess from capturing SIGINT. + popen_kwargs["process_group"] = 0 + else: + # `process_group` is not supported before Python 3.11. + popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509 + + if self._accounts_dir: + popen_kwargs["env"] = os.environ.copy() + popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir) + + process = subprocess.Popen(self.rpc_server_path, **popen_kwargs) + return process.stdout, process.stdin + + def disconnect_from_server(self): + self.stop_io_for_all_accounts() + self.server_stdin.close() From 00a6f41441e32028ce780135f0fc54bea7f5df34 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Fri, 28 Nov 2025 21:40:08 +0100 Subject: [PATCH 4/6] make fifo's work --- .../src/deltachat_rpc_client/__init__.py | 3 +- .../src/deltachat_rpc_client/rpc.py | 44 ++++++++++++++++--- deltachat-rpc-client/tests/test_rpc_fifo.py | 22 ++++++++++ 3 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 deltachat-rpc-client/tests/test_rpc_fifo.py diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py index 1a532c7011..7a22548161 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/__init__.py @@ -8,7 +8,7 @@ from .contact import Contact from .deltachat import DeltaChat from .message import Message -from .rpc import Rpc +from .rpc import Rpc, RpcFIFO __all__ = [ "Account", @@ -22,6 +22,7 @@ "Message", "SpecialContactId", "Rpc", + "RpcFIFO", "run_bot_cli", "run_client_cli", ] diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index fe748fdabe..880c7c0af8 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -47,6 +47,8 @@ def future(self, *args) -> Any: def rpc_future(): """Wait for the request to receive a result.""" response = queue.get() + if response is RpcShutdownError: + raise RpcShutdownError(f"no response for {request_id}/{self.name} but rpc is shutting down") if "error" in response: raise JsonRpcError(response["error"]) return response.get("result", None) @@ -54,13 +56,22 @@ def rpc_future(): return rpc_future +class RpcShutdownError(Exception): + """Raised in RPC methods if the connection to server is closing.""" + + class BaseRpc: + """Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods + from subclasses to work concretely.""" + def __init__(self): self.id_iterator: Iterator[int] self.event_queues: dict[int, Queue] # Map from request ID to a Queue which provides a single result self.request_results: dict[int, Queue] self.request_queue: Queue[Any] + self.server_stdin: io.Writer[bytes] + self.server_stdout: io.Reader[bytes] self.closing: bool self.reader_thread: Thread self.writer_thread: Thread @@ -85,8 +96,8 @@ def close(self) -> None: """Terminate RPC server process and wait until the reader loop finishes.""" self.closing = True self.disconnect_from_server() - self.events_thread.join() self.reader_thread.join() + self.events_thread.join() self.request_queue.put(None) self.writer_thread.join() @@ -111,6 +122,10 @@ def reader_loop(self) -> None: # Log an exception if the reader loop dies. logging.exception("Exception in the reader loop") + # terminate pending rpc requests because no responses can arrive anymore + for queue in self.request_results.values(): + queue.put(RpcShutdownError) + def writer_loop(self) -> None: """Writer loop ensuring only a single thread writes requests.""" try: @@ -135,7 +150,10 @@ def events_loop(self) -> None: while True: if self.closing: return - event = self.get_next_event() + try: + event = self.get_next_event() + except RpcShutdownError: + return account_id = event["contextId"] queue = self.get_queue(account_id) event = event["event"] @@ -172,10 +190,8 @@ def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional The given arguments will be passed to subprocess.Popen(). """ super(Rpc, self).__init__() - self.server_stdout: io.Writer[bytes] - self.server_stdin: io.Reader[bytes] self._accounts_dir = accounts_dir - self.rpc_server_path = rpc_server_path + self.rpc_server_path: str = rpc_server_path def connect_to_server(self): popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE} @@ -196,3 +212,21 @@ def connect_to_server(self): def disconnect_from_server(self): self.stop_io_for_all_accounts() self.server_stdin.close() + + +class RpcFIFO(BaseRpc): + """RPC client that runs and connects to a deltachat-rpc-server through FIFO files.""" + + def __init__(self, fn_request_fifo: str, fn_response_fifo: str): + super(RpcFIFO, self).__init__() + self.fn_request_fifo = fn_request_fifo + self.fn_response_fifo = fn_response_fifo + + def connect_to_server(self): + server_stdin = open(self.fn_request_fifo, "wb") # noqa + server_stdout = open(self.fn_response_fifo, "rb") # noqa + return server_stdout, server_stdin + + def disconnect_from_server(self): + self.server_stdin.close() + self.server_stdout.close() diff --git a/deltachat-rpc-client/tests/test_rpc_fifo.py b/deltachat-rpc-client/tests/test_rpc_fifo.py new file mode 100644 index 0000000000..7bf66dd277 --- /dev/null +++ b/deltachat-rpc-client/tests/test_rpc_fifo.py @@ -0,0 +1,22 @@ +import os +import platform # noqa +import subprocess + +import pytest + +from deltachat_rpc_client import DeltaChat, RpcFIFO + + +@pytest.mark.skipif("platform.system() == 'Windows'") +def test_rpc_fifo(tmp_path): + fn_request_fifo = tmp_path.joinpath("request_fifo") + fn_response_fifo = tmp_path.joinpath("response_fifo") + os.mkfifo(fn_request_fifo) + os.mkfifo(fn_response_fifo) + popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True) + + rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo) + with rpc: + dc = DeltaChat(rpc) + assert dc.rpc.get_system_info()["deltachat_core_version"] is not None + popen.wait() From 5542dcfa0af9c7d55c18bbba23a2a592261b9611 Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sun, 30 Nov 2025 14:29:13 +0100 Subject: [PATCH 5/6] address link2xt feedback --- .../src/deltachat_rpc_client/rpc.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 880c7c0af8..194470ea4a 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -20,6 +20,10 @@ class JsonRpcError(Exception): """JSON-RPC error.""" +class RpcShutdownError(JsonRpcError): + """Raised in RPC methods if the connection to server is closing.""" + + class RpcMethod: """RPC method.""" @@ -47,8 +51,8 @@ def future(self, *args) -> Any: def rpc_future(): """Wait for the request to receive a result.""" response = queue.get() - if response is RpcShutdownError: - raise RpcShutdownError(f"no response for {request_id}/{self.name} but rpc is shutting down") + if response is None: + raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down") if "error" in response: raise JsonRpcError(response["error"]) return response.get("result", None) @@ -56,10 +60,6 @@ def rpc_future(): return rpc_future -class RpcShutdownError(Exception): - """Raised in RPC methods if the connection to server is closing.""" - - class BaseRpc: """Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods from subclasses to work concretely.""" @@ -124,7 +124,7 @@ def reader_loop(self) -> None: # terminate pending rpc requests because no responses can arrive anymore for queue in self.request_results.values(): - queue.put(RpcShutdownError) + queue.put(None) def writer_loop(self) -> None: """Writer loop ensuring only a single thread writes requests.""" From 51e58583e0dd18f7632fa318e5071feddc5adbae Mon Sep 17 00:00:00 2001 From: holger krekel Date: Sun, 30 Nov 2025 14:49:08 +0100 Subject: [PATCH 6/6] better naming --- deltachat-rpc-client/src/deltachat_rpc_client/rpc.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py index 194470ea4a..c588d971a1 100644 --- a/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py +++ b/deltachat-rpc-client/src/deltachat_rpc_client/rpc.py @@ -27,7 +27,7 @@ class RpcShutdownError(JsonRpcError): class RpcMethod: """RPC method.""" - def __init__(self, rpc: "Rpc", name: str): + def __init__(self, rpc: "BaseRpc", name: str): self.rpc = rpc self.name = name @@ -181,7 +181,7 @@ def __getattr__(self, attr: str): return RpcMethod(self, attr) -class Rpc(BaseRpc): +class RpcSubprocess(BaseRpc): """RPC client that runs and connects to a deltachat-rpc-server in a subprocess.""" def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"): @@ -189,7 +189,7 @@ def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional The given arguments will be passed to subprocess.Popen(). """ - super(Rpc, self).__init__() + super(RpcSubprocess, self).__init__() self._accounts_dir = accounts_dir self.rpc_server_path: str = rpc_server_path @@ -214,6 +214,10 @@ def disconnect_from_server(self): self.server_stdin.close() +# backward compatibility +Rpc = RpcSubprocess + + class RpcFIFO(BaseRpc): """RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""