Skip to content

Commit 1ab8329

Browse files
committed
make fifo's work
1 parent c04f5b8 commit 1ab8329

File tree

3 files changed

+63
-6
lines changed

3 files changed

+63
-6
lines changed

deltachat-rpc-client/src/deltachat_rpc_client/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from .contact import Contact
99
from .deltachat import DeltaChat
1010
from .message import Message
11-
from .rpc import Rpc
11+
from .rpc import Rpc, RpcFIFO
1212

1313
__all__ = [
1414
"Account",
@@ -22,6 +22,7 @@
2222
"Message",
2323
"SpecialContactId",
2424
"Rpc",
25+
"RpcFIFO",
2526
"run_bot_cli",
2627
"run_client_cli",
2728
]

deltachat-rpc-client/src/deltachat_rpc_client/rpc.py

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,20 +47,31 @@ def future(self, *args) -> Any:
4747
def rpc_future():
4848
"""Wait for the request to receive a result."""
4949
response = queue.get()
50+
if response is RpcShutdownError:
51+
raise RpcShutdownError(f"no response for {request_id}/{self.name} but rpc is shutting down")
5052
if "error" in response:
5153
raise JsonRpcError(response["error"])
5254
return response.get("result", None)
5355

5456
return rpc_future
5557

5658

59+
class RpcShutdownError(Exception):
60+
"""Raised in RPC methods if the connection to server is closing."""
61+
62+
5763
class BaseRpc:
64+
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
65+
from subclasses to work concretely."""
66+
5867
def __init__(self):
5968
self.id_iterator: Iterator[int]
6069
self.event_queues: dict[int, Queue]
6170
# Map from request ID to a Queue which provides a single result
6271
self.request_results: dict[int, Queue]
6372
self.request_queue: Queue[Any]
73+
self.server_stdin: io.Writer[bytes]
74+
self.server_stdout: io.Reader[bytes]
6475
self.closing: bool
6576
self.reader_thread: Thread
6677
self.writer_thread: Thread
@@ -85,8 +96,8 @@ def close(self) -> None:
8596
"""Terminate RPC server process and wait until the reader loop finishes."""
8697
self.closing = True
8798
self.disconnect_from_server()
88-
self.events_thread.join()
8999
self.reader_thread.join()
100+
self.events_thread.join()
90101
self.request_queue.put(None)
91102
self.writer_thread.join()
92103

@@ -111,6 +122,10 @@ def reader_loop(self) -> None:
111122
# Log an exception if the reader loop dies.
112123
logging.exception("Exception in the reader loop")
113124

125+
# terminate pending rpc requests because no responses can arrive anymore
126+
for queue in self.request_results.values():
127+
queue.put(RpcShutdownError)
128+
114129
def writer_loop(self) -> None:
115130
"""Writer loop ensuring only a single thread writes requests."""
116131
try:
@@ -135,7 +150,10 @@ def events_loop(self) -> None:
135150
while True:
136151
if self.closing:
137152
return
138-
event = self.get_next_event()
153+
try:
154+
event = self.get_next_event()
155+
except RpcShutdownError:
156+
return
139157
account_id = event["contextId"]
140158
queue = self.get_queue(account_id)
141159
event = event["event"]
@@ -172,10 +190,8 @@ def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional
172190
The given arguments will be passed to subprocess.Popen().
173191
"""
174192
super(Rpc, self).__init__()
175-
self.server_stdout: io.Writer[bytes]
176-
self.server_stdin: io.Reader[bytes]
177193
self._accounts_dir = accounts_dir
178-
self.rpc_server_path = rpc_server_path
194+
self.rpc_server_path: str = rpc_server_path
179195

180196
def connect_to_server(self):
181197
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
@@ -196,3 +212,21 @@ def connect_to_server(self):
196212
def disconnect_from_server(self):
197213
self.stop_io_for_all_accounts()
198214
self.server_stdin.close()
215+
216+
217+
class RpcFIFO(BaseRpc):
218+
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""
219+
220+
def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
221+
super(RpcFIFO, self).__init__()
222+
self.fn_request_fifo = fn_request_fifo
223+
self.fn_response_fifo = fn_response_fifo
224+
225+
def connect_to_server(self):
226+
server_stdin = open(self.fn_request_fifo, "wb") # noqa
227+
server_stdout = open(self.fn_response_fifo, "rb") # noqa
228+
return server_stdout, server_stdin
229+
230+
def disconnect_from_server(self):
231+
self.server_stdin.close()
232+
self.server_stdout.close()
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import os
2+
import platform # noqa
3+
import subprocess
4+
5+
import pytest
6+
7+
from deltachat_rpc_client import DeltaChat, RpcFIFO
8+
9+
10+
@pytest.mark.skipif("platform.system() == 'Windows'")
11+
def test_rpc_fifo(tmp_path):
12+
fn_request_fifo = tmp_path.joinpath("request_fifo")
13+
fn_response_fifo = tmp_path.joinpath("response_fifo")
14+
os.mkfifo(fn_request_fifo)
15+
os.mkfifo(fn_response_fifo)
16+
popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True)
17+
18+
rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo)
19+
with rpc:
20+
dc = DeltaChat(rpc)
21+
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
22+
popen.wait()

0 commit comments

Comments
 (0)