Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion deltachat-rpc-client/src/deltachat_rpc_client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .contact import Contact
from .deltachat import DeltaChat
from .message import Message
from .rpc import Rpc
from .rpc import Rpc, RpcFIFO

__all__ = [
"Account",
Expand All @@ -22,6 +22,7 @@
"Message",
"SpecialContactId",
"Rpc",
"RpcFIFO",
"run_bot_cli",
"run_client_cli",
]
9 changes: 3 additions & 6 deletions deltachat-rpc-client/src/deltachat_rpc_client/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,33 @@ def __setattr__(self, attr, val):
def run_client_cli(
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
"""Run a simple command line app, using the given hooks.

Extra keyword arguments are passed to the internal Rpc object.
"""
from .client import Client

_run_cli(Client, hooks, argv, **kwargs)
_run_cli(Client, hooks, argv)


def run_bot_cli(
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
"""Run a simple bot command line using the given hooks.

Extra keyword arguments are passed to the internal Rpc object.
"""
from .client import Bot

_run_cli(Bot, hooks, argv, **kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't know if it is actually unused e.g. for rpc_server_path, at least the documentation should be updated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rpc_server_path was recently introduced. we can re-introduce **kwargs here if actually needed. Passing **kwargs all around makes reasoning about what happens harder in my experience.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason **kwargs existed in the past was to be able to control the created process, ex. IIRC TUI clients need to tweak it so stderr don't dirty the screen,

I am talking about the removal in the Rpc class, but other approaches like allowing to pass a Popen directly to the start method in RpcProcess would also work I guess

_run_cli(Bot, hooks, argv)


def _run_cli(
client_type: Type["Client"],
hooks: Optional[Iterable[Tuple[Callable, Union[type, "EventFilter"]]]] = None,
argv: Optional[list] = None,
**kwargs,
) -> None:
from .deltachat import DeltaChat
from .rpc import Rpc
Expand All @@ -94,7 +91,7 @@ def _run_cli(
parser.add_argument("--password", action="store", help="password", default=os.getenv("DELTACHAT_PASSWORD"))
args = parser.parse_args(argv[1:])

with Rpc(accounts_dir=args.accounts_dir, **kwargs) as rpc:
with Rpc(accounts_dir=args.accounts_dir) as rpc:
deltachat = DeltaChat(rpc)
core_version = (deltachat.get_system_info()).deltachat_core_version
accounts = deltachat.get_all_accounts()
Expand Down
121 changes: 86 additions & 35 deletions deltachat-rpc-client/src/deltachat_rpc_client/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,24 @@
import sys
from queue import Empty, Queue
from threading import Thread
from typing import Any, Iterator, Optional
from typing import TYPE_CHECKING, Any, Iterator, Optional

if TYPE_CHECKING:
import io


class JsonRpcError(Exception):
"""JSON-RPC error."""


class RpcShutdownError(JsonRpcError):
"""Raised in RPC methods if the connection to server is closing."""


class RpcMethod:
"""RPC method."""

def __init__(self, rpc: "Rpc", name: str):
def __init__(self, rpc: "BaseRpc", name: str):
self.rpc = rpc
self.name = name

Expand All @@ -44,52 +51,35 @@ def future(self, *args) -> Any:
def rpc_future():
"""Wait for the request to receive a result."""
response = queue.get()
if response is None:
raise RpcShutdownError(f"no response for {request_id}/{self.name} while rpc is shutting down")
if "error" in response:
raise JsonRpcError(response["error"])
return response.get("result", None)

return rpc_future


class Rpc:
"""RPC client."""
class BaseRpc:
"""Base Rpc class which requires 'connect_to_server' and 'disconnect_from_server' methods
from subclasses to work concretely."""

def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path="deltachat-rpc-server", **kwargs):
"""Initialize RPC client.

The given arguments will be passed to subprocess.Popen().
"""
if accounts_dir:
kwargs["env"] = {
**kwargs.get("env", os.environ),
"DC_ACCOUNTS_PATH": str(accounts_dir),
}

self._kwargs = kwargs
self.rpc_server_path = rpc_server_path
self.process: subprocess.Popen
def __init__(self):
self.id_iterator: Iterator[int]
self.event_queues: dict[int, Queue]
# Map from request ID to a Queue which provides a single result
self.request_results: dict[int, Queue]
self.request_queue: Queue[Any]
self.server_stdin: io.Writer[bytes]
self.server_stdout: io.Reader[bytes]
self.closing: bool
self.reader_thread: Thread
self.writer_thread: Thread
self.events_thread: Thread

def start(self) -> None:
"""Start RPC server subprocess."""
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509

popen_kwargs.update(self._kwargs)
self.process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
self.server_stdout, self.server_stdin = self.connect_to_server()
self.id_iterator = itertools.count(start=1)
self.event_queues = {}
self.request_results = {}
Expand All @@ -105,10 +95,9 @@ def start(self) -> None:
def close(self) -> None:
"""Terminate RPC server process and wait until the reader loop finishes."""
self.closing = True
self.stop_io_for_all_accounts()
self.events_thread.join()
self.process.stdin.close()
self.disconnect_from_server()
self.reader_thread.join()
self.events_thread.join()
self.request_queue.put(None)
self.writer_thread.join()

Expand All @@ -122,7 +111,7 @@ def __exit__(self, _exc_type, _exc, _tb):
def reader_loop(self) -> None:
"""Process JSON-RPC responses from the RPC server process output."""
try:
while line := self.process.stdout.readline():
while line := self.server_stdout.readline():
response = json.loads(line)
if "id" in response:
response_id = response["id"]
Expand All @@ -133,13 +122,17 @@ def reader_loop(self) -> None:
# Log an exception if the reader loop dies.
logging.exception("Exception in the reader loop")

# terminate pending rpc requests because no responses can arrive anymore
for queue in self.request_results.values():
queue.put(None)

def writer_loop(self) -> None:
"""Writer loop ensuring only a single thread writes requests."""
try:
while request := self.request_queue.get():
data = (json.dumps(request) + "\n").encode()
self.process.stdin.write(data)
self.process.stdin.flush()
self.server_stdin.write(data)
self.server_stdin.flush()

except Exception:
# Log an exception if the writer loop dies.
Expand All @@ -157,7 +150,10 @@ def events_loop(self) -> None:
while True:
if self.closing:
return
event = self.get_next_event()
try:
event = self.get_next_event()
except RpcShutdownError:
return
account_id = event["contextId"]
queue = self.get_queue(account_id)
event = event["event"]
Expand All @@ -183,3 +179,58 @@ def clear_all_events(self, account_id: int):

def __getattr__(self, attr: str):
return RpcMethod(self, attr)


class RpcSubprocess(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server in a subprocess."""

def __init__(self, accounts_dir: Optional[str] = None, rpc_server_path: Optional[str] = "deltachat-rpc-server"):
"""Initialize RPC client.

The given arguments will be passed to subprocess.Popen().
"""
super(RpcSubprocess, self).__init__()
self._accounts_dir = accounts_dir
self.rpc_server_path: str = rpc_server_path

def connect_to_server(self):
popen_kwargs = {"stdin": subprocess.PIPE, "stdout": subprocess.PIPE}
if sys.version_info >= (3, 11):
# Prevent subprocess from capturing SIGINT.
popen_kwargs["process_group"] = 0
else:
# `process_group` is not supported before Python 3.11.
popen_kwargs["preexec_fn"] = os.setpgrp # noqa: PLW1509

if self._accounts_dir:
popen_kwargs["env"] = os.environ.copy()
popen_kwargs["env"]["DC_ACCOUNTS_PATH"] = str(self._accounts_dir)

process = subprocess.Popen(self.rpc_server_path, **popen_kwargs)
return process.stdout, process.stdin

def disconnect_from_server(self):
self.stop_io_for_all_accounts()
self.server_stdin.close()


# backward compatibility
Rpc = RpcSubprocess


class RpcFIFO(BaseRpc):
"""RPC client that runs and connects to a deltachat-rpc-server through FIFO files."""

def __init__(self, fn_request_fifo: str, fn_response_fifo: str):
super(RpcFIFO, self).__init__()
self.fn_request_fifo = fn_request_fifo
self.fn_response_fifo = fn_response_fifo

def connect_to_server(self):
server_stdin = open(self.fn_request_fifo, "wb") # noqa
server_stdout = open(self.fn_response_fifo, "rb") # noqa
return server_stdout, server_stdin

def disconnect_from_server(self):
self.server_stdin.close()
self.server_stdout.close()
22 changes: 22 additions & 0 deletions deltachat-rpc-client/tests/test_rpc_fifo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import os
import platform # noqa
import subprocess

import pytest

from deltachat_rpc_client import DeltaChat, RpcFIFO


@pytest.mark.skipif("platform.system() == 'Windows'")
def test_rpc_fifo(tmp_path):
fn_request_fifo = tmp_path.joinpath("request_fifo")
fn_response_fifo = tmp_path.joinpath("response_fifo")
os.mkfifo(fn_request_fifo)
os.mkfifo(fn_response_fifo)
popen = subprocess.Popen(f"deltachat-rpc-server <{fn_request_fifo} >{fn_response_fifo}", shell=True)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For any real use FIFOs should likely be replaced with the UNIX socket, which exists on all platforms (including Windows) and can handle multiple connections.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened #7545 for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense to rather go for UNIX sockets, but i think it's fine to merge this PR already. Shouldn't be too hard to write another RpcUNIXSocket subclass after #7545 is done.


rpc = RpcFIFO(fn_response_fifo=fn_response_fifo, fn_request_fifo=fn_request_fifo)
with rpc:
dc = DeltaChat(rpc)
assert dc.rpc.get_system_info()["deltachat_core_version"] is not None
popen.wait()
Loading