From 1b232657307ef25458f59d55e09be8e0fea29e6a Mon Sep 17 00:00:00 2001 From: fourbit Date: Tue, 5 Aug 2025 23:42:04 +0200 Subject: [PATCH 01/16] unfuck replies --- katzenpost_thinclient/__init__.py | 270 +++++++++++------------------- 1 file changed, 95 insertions(+), 175 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 4fbd118..776fc46 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -101,9 +101,13 @@ def thin_client_error_to_string(error_code: int) -> str: } return error_messages.get(error_code, f"Unknown thin client error code: {error_code}") +class ThinClientOfflineError(Exception): + pass + # Export public API __all__ = [ 'ThinClient', + 'ThinClientOfflineError', 'Config', 'ServiceDescriptor', 'WriteChannelReply', @@ -468,8 +472,8 @@ def __init__(self, config:Config) -> None: self._send_lock = asyncio.Lock() self._recv_lock = asyncio.Lock() - # Channel API support - individual response queues (simulating Rust's event sinks) - self.channel_response_queues : Dict[str, asyncio.Queue] = {} # reply_type -> Queue + # Letterbox for each response associated (by query_id) with a request. + self.response_queues : Dict[bytes, asyncio.Queue] = {} # query_id -> Queue # Channel query message ID correlation (for send_channel_query_await_reply) self.pending_channel_message_queries : Dict[bytes, asyncio.Event] = {} # message_id -> Event @@ -641,13 +645,15 @@ async def worker_loop(self, loop:asyncio.events.AbstractEventLoop) -> None: self.logger.debug("read loop") try: response = await self.recv(loop) - self.handle_response(response) except asyncio.CancelledError: # Handle cancellation of the read loop break except Exception as e: self.logger.error(f"Error reading from socket: {e}") - break + raise + else: + self.handle_response(response) + def parse_status(self, event: "Dict[str,Any]") -> None: """ @@ -747,6 +753,26 @@ def new_query_id(self) -> bytes: """ return os.urandom(16) + async def _send_and_wait(self, *, query_id:bytes, request: Dict[str, Any]) -> Dict[str, Any]: + cbor_request = cbor2.dumps(request) + length_prefix = struct.pack('>I', len(cbor_request)) + length_prefixed_request = length_prefix + cbor_request + assert query_id not in self.response_queues + self.response_queues[query_id] = asyncio.Queue(maxsize=1) + request_type = list(request.keys())[0] + try: + await self._send_all(length_prefixed_request) + self.logger.info("{request_type} request sent.") + reply = await self.response_queues[query_id].get() + self.logger.info("{request_type} response received.") + # TODO error handling, see _wait_for_channel_reply + return reply + except asyncio.CancelledError: + self.logger.info("{request_type} task cancelled.") + raise + finally: + del self.response_queues[query_id] + async def _wait_for_channel_reply(self, expected_reply_type: str) -> Dict[Any, Any]: """ Wait for a channel API reply using response queues (simulating Rust's event sinks). @@ -807,34 +833,18 @@ def handle_response(self, response: "Dict[str,Any]") -> None: if response.get("message_reply_event") is not None: self.logger.debug("message reply event") reply = response["message_reply_event"] - - self.reply_received_event.set() self.config.handle_message_reply_event(reply) return - # Handle channel API replies using response queues (simulating Rust's event sinks) - channel_reply_types = [ - "create_write_channel_reply", - "create_read_channel_reply", - "write_channel_reply", - "read_channel_reply", - "resume_write_channel_reply", - "resume_read_channel_reply", - "resume_write_channel_query_reply", - "resume_read_channel_query_reply" - ] - - for reply_type in channel_reply_types: - if response.get(reply_type) is not None: - self.logger.debug(f"channel {reply_type} event") - # Put the response in the appropriate queue - if reply_type in self.channel_response_queues: - try: - self.channel_response_queues[reply_type].put_nowait(response[reply_type]) - except asyncio.QueueFull: - self.logger.warning(f"Response queue full for {reply_type}") - return + for reply_type, reply in response: + self.logger.debug(f"channel {reply_type} event") + if not reply_type.endswith("_reply") or not (query_id := reply.get("query_id", None)): + continue + if not (queue := self.response_queues.get(query_id, None)): + continue + # avoid blocking recv loop: + asyncio.create_task(queue.put(reply)) # Handle channel query events (for send_channel_query_await_reply) if response.get("channel_query_sent_event") is not None: @@ -887,11 +897,11 @@ async def send_message_without_reply(self, payload:bytes|str, dest_node:bytes, d dest_queue (bytes): Destination recipient queue ID. Raises: - RuntimeError: If in offline mode (daemon not connected to mixnet). + ThinClientOfflineError: If in offline mode (daemon not connected to mixnet). """ # Check if we're in offline mode if not self._is_connected: - raise RuntimeError("cannot send message in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send message in offline mode - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') # Encoding the string to bytes @@ -932,11 +942,11 @@ async def send_message(self, surb_id:bytes, payload:bytes|str, dest_node:bytes, dest_queue (bytes): Destination recipient queue ID. Raises: - RuntimeError: If in offline mode (daemon not connected to mixnet). + ThinClientOfflineError: If in offline mode (daemon not connected to mixnet). """ # Check if we're in offline mode if not self._is_connected: - raise RuntimeError("cannot send message in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send message in offline mode - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') # Encoding the string to bytes @@ -979,11 +989,11 @@ async def send_reliable_message(self, message_id:bytes, payload:bytes|str, dest_ dest_queue (bytes): Destination recipient queue ID. Raises: - RuntimeError: If in offline mode (daemon not connected to mixnet). + ThinClientOfflineError: If in offline mode (daemon not connected to mixnet). """ # Check if we're in offline mode if not self._is_connected: - raise RuntimeError("cannot send reliable message in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send reliable message in offline mode - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') # Encoding the string to bytes @@ -1076,26 +1086,17 @@ async def create_write_channel(self) -> "Tuple[int, bytes, bytes]": } } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("CreateWriteChannel request sent successfully.") - - # Wait for CreateWriteChannelReply using response queue - reply = await self._wait_for_channel_reply("create_write_channel_reply") - - channel_id = reply["channel_id"] - read_cap = reply["read_cap"] - write_cap = reply["write_cap"] - - return channel_id, read_cap, write_cap - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error creating write channel: {e}") - raise + raise e + + channel_id = reply["channel_id"] + read_cap = reply["read_cap"] + write_cap = reply["write_cap"] + + return channel_id, read_cap, write_cap async def create_read_channel(self, read_cap: bytes) -> int: """ @@ -1119,24 +1120,15 @@ async def create_read_channel(self, read_cap: bytes) -> int: } } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("CreateReadChannel request sent successfully.") - - # Wait for CreateReadChannelReply using response queue - reply = await self._wait_for_channel_reply("create_read_channel_reply") - - channel_id = reply["channel_id"] - return channel_id - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error creating read channel: {e}") raise + channel_id = reply["channel_id"] + return channel_id + async def write_channel(self, channel_id: int, payload: "bytes|str") -> WriteChannelReply: """ Prepares a message for writing to a Pigeonhole channel. @@ -1164,29 +1156,21 @@ async def write_channel(self, channel_id: int, payload: "bytes|str") -> WriteCha } } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("WriteChannel request sent successfully.") - - # Wait for WriteChannelReply using response queue - reply = await self._wait_for_channel_reply("write_channel_reply") - - return WriteChannelReply( - send_message_payload=reply["send_message_payload"], - current_message_index=reply["current_message_index"], - next_message_index=reply["next_message_index"], - envelope_descriptor=reply["envelope_descriptor"], - envelope_hash=reply["envelope_hash"] - ) - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error preparing write to channel: {e}") raise + return WriteChannelReply( + send_message_payload=reply["send_message_payload"], + current_message_index=reply["current_message_index"], + next_message_index=reply["next_message_index"], + envelope_descriptor=reply["envelope_descriptor"], + envelope_hash=reply["envelope_hash"] + ) + + async def read_channel(self, channel_id: int, message_box_index: "bytes|None" = None, reply_index: "int|None" = None) -> ReadChannelReply: """ @@ -1220,30 +1204,22 @@ async def read_channel(self, channel_id: int, message_box_index: "bytes|None" = "read_channel": request_data } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("ReadChannel request sent successfully.") - - # Wait for ReadChannelReply using response queue - reply = await self._wait_for_channel_reply("read_channel_reply") - - return ReadChannelReply( - send_message_payload=reply["send_message_payload"], - current_message_index=reply["current_message_index"], - next_message_index=reply["next_message_index"], - reply_index=reply.get("reply_index"), - envelope_descriptor=reply["envelope_descriptor"], - envelope_hash=reply["envelope_hash"] - ) - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error preparing read from channel: {e}") raise + return ReadChannelReply( + send_message_payload=reply["send_message_payload"], + current_message_index=reply["current_message_index"], + next_message_index=reply["next_message_index"], + reply_index=reply.get("reply_index"), + envelope_descriptor=reply["envelope_descriptor"], + envelope_hash=reply["envelope_hash"] + ) + + async def resume_write_channel(self, write_cap: bytes, message_box_index: "bytes|None" = None) -> int: """ Resumes a write channel from a previous session. @@ -1272,23 +1248,13 @@ async def resume_write_channel(self, write_cap: bytes, message_box_index: "bytes "resume_write_channel": request_data } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("ResumeWriteChannel request sent successfully.") - - # Wait for ResumeWriteChannelReply using response queue - reply = await self._wait_for_channel_reply("resume_write_channel_reply") - - channel_id = reply["channel_id"] - return channel_id - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error resuming write channel: {e}") raise + return reply["channel_id"] + async def resume_read_channel(self, read_cap: bytes, next_message_index: "bytes|None" = None, reply_index: "int|None" = None) -> int: @@ -1323,23 +1289,13 @@ async def resume_read_channel(self, read_cap: bytes, next_message_index: "bytes| "resume_read_channel": request_data } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("ResumeReadChannel request sent successfully.") - - # Wait for ResumeReadChannelReply using response queue - reply = await self._wait_for_channel_reply("resume_read_channel_reply") - - channel_id = reply["channel_id"] - return channel_id - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error resuming read channel: {e}") raise + return reply["channel_id"] + async def resume_write_channel_query(self, write_cap: bytes, message_box_index: bytes, envelope_descriptor: bytes, envelope_hash: bytes) -> int: @@ -1374,23 +1330,13 @@ async def resume_write_channel_query(self, write_cap: bytes, message_box_index: } } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("ResumeWriteChannelQuery request sent successfully.") - - # Wait for ResumeWriteChannelQueryReply using response queue - reply = await self._wait_for_channel_reply("resume_write_channel_query_reply") - - channel_id = reply["channel_id"] - return channel_id - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error resuming write channel query: {e}") raise + return reply["channel_id"] + async def resume_read_channel_query(self, read_cap: bytes, next_message_index: bytes, reply_index: "int|None", envelope_descriptor: bytes, @@ -1432,23 +1378,13 @@ async def resume_read_channel_query(self, read_cap: bytes, next_message_index: b "resume_read_channel_query": request_data } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: - await self._send_all(length_prefixed_request) - self.logger.info("ResumeReadChannelQuery request sent successfully.") - - # Wait for ResumeReadChannelQueryReply using response queue - reply = await self._wait_for_channel_reply("resume_read_channel_query_reply") - - channel_id = reply["channel_id"] - return channel_id - + reply = await self._send_and_wait(query_id=query_id, request=request) except Exception as e: self.logger.error(f"Error resuming read channel query: {e}") raise + return reply["channel_id"] + async def get_courier_destination(self) -> "Tuple[bytes, bytes]": """ @@ -1487,12 +1423,12 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, bytes: The received payload from the channel. Raises: - RuntimeError: If in offline mode (daemon not connected to mixnet). + ThinClientOfflineError: If in offline mode (daemon not connected to mixnet). Exception: If the query fails or times out. """ # Check if we're in offline mode if not self._is_connected: - raise RuntimeError("cannot send channel query in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send channel query in offline mode - daemon not connected to mixnet") # Create an event for this message_id event = asyncio.Event() @@ -1537,11 +1473,11 @@ async def send_channel_query(self, channel_id: int, payload: bytes, dest_node: b message_id: Message ID for reply correlation. Raises: - RuntimeError: If in offline mode (daemon not connected to mixnet). + ThinClientOfflineError: If in offline mode (daemon not connected to mixnet). """ # Check if we're in offline mode if not self._is_connected: - raise RuntimeError("cannot send channel query in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send channel query in offline mode - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') @@ -1583,33 +1519,17 @@ async def close_channel(self, channel_id: int) -> None: Raises: Exception: If the socket send operation fails. """ + request = { "close_channel": { "channel_id": channel_id } } - cbor_request = cbor2.dumps(request) - length_prefix = struct.pack('>I', len(cbor_request)) - length_prefixed_request = length_prefix + cbor_request - try: # CloseChannel is infallible - fire and forget, no reply expected await self._send_all(length_prefixed_request) - self.logger.info(f"CloseChannel request sent for channel {channel_id}.") except Exception as e: self.logger.error(f"Error sending close channel request: {e}") raise - - - - - - - - - - - - - + self.logger.info(f"CloseChannel request sent for channel {channel_id}.") From 1930900f4a664643b9ce49bde09e8fc84963c72d Mon Sep 17 00:00:00 2001 From: fourbit Date: Wed, 13 Aug 2025 11:36:03 +0200 Subject: [PATCH 02/16] async callbacks --- examples/echo_ping.py | 2 +- katzenpost_thinclient/__init__.py | 37 ++++++++++++++++--------------- tests/conftest.py | 2 +- tests/test_core.py | 4 ++-- 4 files changed, 23 insertions(+), 22 deletions(-) diff --git a/examples/echo_ping.py b/examples/echo_ping.py index a62f8f4..edc3b2d 100644 --- a/examples/echo_ping.py +++ b/examples/echo_ping.py @@ -6,7 +6,7 @@ class ClientState: def __init__(self): self.reply_message = None - def save_reply(self, reply): + async def save_reply(self, reply): self.reply_message = reply async def main(): diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 776fc46..56dca47 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -274,7 +274,7 @@ class ServiceDescriptor: is used as the destination address along with the service's queue ID. Attributes: - recipient_queue_id (bytes): The identifier of the recipient's queue on the mixnet. + recipient_queue_id (bytes): The identifier of the recipient's queue on the mixnet. ("Kaetzchen.endpoint" in the PKI) mix_descriptor (dict): A CBOR-decoded dictionary describing the mix node, typically includes the 'IdentityKey' and other metadata. @@ -288,6 +288,7 @@ def __init__(self, recipient_queue_id:bytes, mix_descriptor: "Dict[Any,Any]") -> self.mix_descriptor = mix_descriptor def to_destination(self) -> "Tuple[bytes,bytes]": + "provider identity key hash and queue id" provider_id_hash = blake2_256_sum(self.mix_descriptor['IdentityKey']) return (provider_id_hash, self.recipient_queue_id) @@ -320,7 +321,7 @@ def find_services(capability:str, doc:"Dict[str,Any]") -> "List[ServiceDescripto for cap, details in mynode['Kaetzchen'].items(): if cap == capability: service_desc = ServiceDescriptor( - recipient_queue_id=bytes(details['endpoint'], 'utf-8'), + recipient_queue_id=bytes(details['endpoint'], 'utf-8'), # why is this bytes when it's string in PKI? mix_descriptor=mynode ) services.append(service_desc) @@ -421,21 +422,21 @@ def __init__(self, filepath:str, self.on_message_sent = on_message_sent self.on_message_reply = on_message_reply - def handle_connection_status_event(self, event: asyncio.Event) -> None: + async def handle_connection_status_event(self, event: asyncio.Event) -> None: if self.on_connection_status: - self.on_connection_status(event) + await self.on_connection_status(event) - def handle_new_pki_document_event(self, event: asyncio.Event) -> None: + async def handle_new_pki_document_event(self, event: asyncio.Event) -> None: if self.on_new_pki_document: - self.on_new_pki_document(event) + await self.on_new_pki_document(event) - def handle_message_sent_event(self, event: asyncio.Event) -> None: + async def handle_message_sent_event(self, event: asyncio.Event) -> None: if self.on_message_sent: - self.on_message_sent(event) + await self.on_message_sent(event) - def handle_message_reply_event(self, event: asyncio.Event) -> None: + async def handle_message_reply_event(self, event: asyncio.Event) -> None: if self.on_message_reply: - self.on_message_reply(event) + await self.on_message_reply(event) class ThinClient: @@ -548,13 +549,13 @@ async def start(self, loop:asyncio.AbstractEventLoop) -> None: response = await self.recv(loop) assert response is not None assert response["connection_status_event"] is not None - self.handle_response(response) + await self.handle_response(response) # 2nd message is always a new pki doc event response = await self.recv(loop) assert response is not None assert response["new_pki_document_event"] is not None - self.handle_response(response) + await self.handle_response(response) # Start the read loop as a background task self.logger.debug("starting read loop") @@ -652,7 +653,7 @@ async def worker_loop(self, loop:asyncio.events.AbstractEventLoop) -> None: self.logger.error(f"Error reading from socket: {e}") raise else: - self.handle_response(response) + await self.handle_response(response) def parse_status(self, event: "Dict[str,Any]") -> None: @@ -810,7 +811,7 @@ async def _wait_for_channel_reply(self, expected_reply_type: str) -> Dict[Any, A # Clean up self.channel_response_queues.pop(expected_reply_type, None) - def handle_response(self, response: "Dict[str,Any]") -> None: + async def handle_response(self, response: "Dict[str,Any]") -> None: """ Dispatch a parsed CBOR response to the appropriate handler or callback. """ @@ -819,22 +820,22 @@ def handle_response(self, response: "Dict[str,Any]") -> None: if response.get("connection_status_event") is not None: self.logger.debug("connection status event") self.parse_status(response["connection_status_event"]) - self.config.handle_connection_status_event(response["connection_status_event"]) + await self.config.handle_connection_status_event(response["connection_status_event"]) return if response.get("new_pki_document_event") is not None: self.logger.debug("new pki doc event") self.parse_pki_doc(response["new_pki_document_event"]) - self.config.handle_new_pki_document_event(response["new_pki_document_event"]) + await self.config.handle_new_pki_document_event(response["new_pki_document_event"]) return if response.get("message_sent_event") is not None: self.logger.debug("message sent event") - self.config.handle_message_sent_event(response["message_sent_event"]) + await self.config.handle_message_sent_event(response["message_sent_event"]) return if response.get("message_reply_event") is not None: self.logger.debug("message reply event") reply = response["message_reply_event"] self.reply_received_event.set() - self.config.handle_message_reply_event(reply) + await self.config.handle_message_reply_event(reply) return for reply_type, reply in response: diff --git a/tests/conftest.py b/tests/conftest.py index 669cdb7..d4016ef 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -97,7 +97,7 @@ async def thin_client(config_path, daemon_available): @pytest.fixture -def reply_handler(): +async def reply_handler(): """Provide a reply handler for message tests.""" replies = [] diff --git a/tests/test_core.py b/tests/test_core.py index f587520..070ccaa 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -10,7 +10,7 @@ # Global variable to store reply message reply_message = None -def save_reply(event): +async def save_reply(event): """Callback function to save reply messages.""" global reply_message reply_message = event @@ -67,7 +67,7 @@ async def test_config_validation(): assert cfg is not None, "Config should be created successfully" # Test config with callbacks - def dummy_callback(event): + async def dummy_callback(event): pass cfg_with_callbacks = Config( From b38505da54cf9bfb9748633e7305028f8532f0b4 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sun, 21 Sep 2025 10:40:33 +0200 Subject: [PATCH 03/16] on_connection_status is now async --- katzenpost_thinclient/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 56dca47..7bf5e77 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -424,7 +424,7 @@ def __init__(self, filepath:str, async def handle_connection_status_event(self, event: asyncio.Event) -> None: if self.on_connection_status: - await self.on_connection_status(event) + return await self.on_connection_status(event) async def handle_new_pki_document_event(self, event: asyncio.Event) -> None: if self.on_new_pki_document: From 92ffa361bad97b21b74dc0a2282f8148cf64b155 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sun, 21 Sep 2025 14:05:14 +0200 Subject: [PATCH 04/16] fixes for receiving replies --- katzenpost_thinclient/__init__.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 7bf5e77..03d9705 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -634,7 +634,7 @@ async def recv(self, loop:asyncio.AbstractEventLoop) -> "Dict[Any,Any]": except cbor2.CBORDecodeValueError as e: self.logger.error(f"{e}") raise ValueError(f"{e}") - self.logger.debug(f"Received daemon response: [{len(raw_data)}] {type(response)}") + self.logger.debug(f"Received daemon response: [{len(raw_data)}] {type(response)} {response}") return response async def worker_loop(self, loop:asyncio.events.AbstractEventLoop) -> None: @@ -763,9 +763,9 @@ async def _send_and_wait(self, *, query_id:bytes, request: Dict[str, Any]) -> Di request_type = list(request.keys())[0] try: await self._send_all(length_prefixed_request) - self.logger.info("{request_type} request sent.") + self.logger.info(f"{request_type} request sent.") reply = await self.response_queues[query_id].get() - self.logger.info("{request_type} response received.") + self.logger.info(f"{request_type} response received.") # TODO error handling, see _wait_for_channel_reply return reply except asyncio.CancelledError: @@ -837,12 +837,13 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: self.reply_received_event.set() await self.config.handle_message_reply_event(reply) return - - for reply_type, reply in response: + for reply_type, reply in response.items(): self.logger.debug(f"channel {reply_type} event") if not reply_type.endswith("_reply") or not (query_id := reply.get("query_id", None)): + self.logger.debug(f"{reply_type} is not a reply, or can't get query_id") continue if not (queue := self.response_queues.get(query_id, None)): + self.logger.debug(f"query_id for {reply_type} has no listener") continue # avoid blocking recv loop: asyncio.create_task(queue.put(reply)) @@ -1127,6 +1128,8 @@ async def create_read_channel(self, read_cap: bytes) -> int: self.logger.error(f"Error creating read channel: {e}") raise + # client2/thin/thin_messages.go: ThinClientCapabilityAlreadyInUse uint8 = 21 + channel_id = reply["channel_id"] return channel_id @@ -1309,9 +1312,9 @@ async def resume_write_channel_query(self, write_cap: bytes, message_box_index: Args: write_cap: The write capability bytes. - message_box_index: Message box index for resuming from a specific position. - envelope_descriptor: Envelope descriptor from previous query. - envelope_hash: Envelope hash from previous query. + message_box_index: Message box index for resuming from a specific position (WriteChannelReply.current_message_index). + envelope_descriptor: Envelope descriptor from previous query (WriteChannelReply.envelope_descriptor). + envelope_hash: Envelope hash from previous query (WriteChannelReply.envelope_hash). Returns: int: The channel ID. @@ -1432,8 +1435,9 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, raise ThinClientOfflineError("cannot send channel query in offline mode - daemon not connected to mixnet") # Create an event for this message_id - event = asyncio.Event() - self.pending_channel_message_queries[message_id] = event + if message_id not in self.pending_channel_message_queries: + event = asyncio.Event() + self.pending_channel_message_queries[message_id] = event try: # Send the channel query From 9e31e527716f7e4d0915468ad76980e082e14a76 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sun, 21 Sep 2025 14:05:31 +0200 Subject: [PATCH 05/16] make close_channel work --- katzenpost_thinclient/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 03d9705..10a90f4 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -1531,6 +1531,10 @@ async def close_channel(self, channel_id: int) -> None: } } + cbor_request = cbor2.dumps(request) + length_prefix = struct.pack('>I', len(cbor_request)) + length_prefixed_request = length_prefix + cbor_request + try: # CloseChannel is infallible - fire and forget, no reply expected await self._send_all(length_prefixed_request) From 2ef8e743a5522d725718da7989ef102ee8f79177 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sun, 21 Sep 2025 17:56:35 +0200 Subject: [PATCH 06/16] misc fixes --- katzenpost_thinclient/__init__.py | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 10a90f4..adb8f35 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -838,6 +838,8 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: await self.config.handle_message_reply_event(reply) return for reply_type, reply in response.items(): + if not reply: + continue self.logger.debug(f"channel {reply_type} event") if not reply_type.endswith("_reply") or not (query_id := reply.get("query_id", None)): self.logger.debug(f"{reply_type} is not a reply, or can't get query_id") @@ -903,7 +905,7 @@ async def send_message_without_reply(self, payload:bytes|str, dest_node:bytes, d """ # Check if we're in offline mode if not self._is_connected: - raise ThinClientOfflineError("cannot send message in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send_message_without_reply in offline mode - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') # Encoding the string to bytes @@ -1143,6 +1145,10 @@ async def write_channel(self, channel_id: int, payload: "bytes|str") -> WriteCha Returns: WriteChannelReply: Reply containing send_message_payload and other metadata. + // ThinClientErrorInternalError indicates an internal error occurred within + // the client daemon or thin client that prevented operation completion. + ThinClientErrorInternalError uint8 = 4 + Raises: Exception: If the write preparation fails. @@ -1166,6 +1172,9 @@ async def write_channel(self, channel_id: int, payload: "bytes|str") -> WriteCha self.logger.error(f"Error preparing write to channel: {e}") raise + if reply['error_code'] != 0: + raise Exception(f"write_channel got error from clientd: {reply['error_code']}") + return WriteChannelReply( send_message_payload=reply["send_message_payload"], current_message_index=reply["current_message_index"], @@ -1410,8 +1419,8 @@ async def get_courier_destination(self) -> "Tuple[bytes, bytes]": return dest_node, dest_queue async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, - dest_node: bytes, dest_queue: bytes, - message_id: bytes) -> bytes: + dest_node: bytes, dest_queue: bytes, + message_id: bytes, timeout_seconds=30.0) -> bytes: """ Sends a channel query and waits for the reply. This combines send_channel_query with event handling to wait for the response. @@ -1432,7 +1441,7 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, """ # Check if we're in offline mode if not self._is_connected: - raise ThinClientOfflineError("cannot send channel query in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send_channel_query_await_reply in offline mode - daemon not connected to mixnet") # Create an event for this message_id if message_id not in self.pending_channel_message_queries: @@ -1444,11 +1453,11 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, await self.send_channel_query(channel_id, payload, dest_node, dest_queue, message_id) # Wait for the reply with timeout - await asyncio.wait_for(event.wait(), timeout=30.0) + await asyncio.wait_for(event.wait(), timeout=timeout_seconds) # Get the response payload if message_id not in self.channel_message_query_responses: - raise Exception("No channel query reply received") + raise Exception("No channel query reply received within timeout_seconds") response_payload = self.channel_message_query_responses[message_id] @@ -1482,7 +1491,7 @@ async def send_channel_query(self, channel_id: int, payload: bytes, dest_node: b """ # Check if we're in offline mode if not self._is_connected: - raise ThinClientOfflineError("cannot send channel query in offline mode - daemon not connected to mixnet") + raise ThinClientOfflineError("cannot send_channel_query while not is_connected() - daemon not connected to mixnet") if not isinstance(payload, bytes): payload = payload.encode('utf-8') From 6ee7af93d997899e956572292163136828471572 Mon Sep 17 00:00:00 2001 From: fourbit Date: Mon, 22 Sep 2025 00:01:34 +0200 Subject: [PATCH 07/16] bunch of debugging and some error handling --- katzenpost_thinclient/__init__.py | 73 ++++++++++++++++++------------- 1 file changed, 42 insertions(+), 31 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index adb8f35..b241609 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -132,8 +132,8 @@ def __init__(self, send_message_payload: bytes, current_message_index: bytes, self.send_message_payload = send_message_payload self.current_message_index = current_message_index self.next_message_index = next_message_index - self.envelope_descriptor = envelope_descriptor self.envelope_hash = envelope_hash + self.envelope_descriptor = envelope_descriptor class ReadChannelReply: @@ -474,7 +474,7 @@ def __init__(self, config:Config) -> None: self._recv_lock = asyncio.Lock() # Letterbox for each response associated (by query_id) with a request. - self.response_queues : Dict[bytes, asyncio.Queue] = {} # query_id -> Queue + self.response_queues : Dict[bytes, asyncio.Queue] = {} # (query_id|message_id) -> Queue # Channel query message ID correlation (for send_channel_query_await_reply) self.pending_channel_message_queries : Dict[bytes, asyncio.Event] = {} # message_id -> Event @@ -634,7 +634,9 @@ async def recv(self, loop:asyncio.AbstractEventLoop) -> "Dict[Any,Any]": except cbor2.CBORDecodeValueError as e: self.logger.error(f"{e}") raise ValueError(f"{e}") - self.logger.debug(f"Received daemon response: [{len(raw_data)}] {type(response)} {response}") + response = {k:v for k,v in response.items() if v} # filter empty KV pairs + if not (set(response.keys()) & {'new_pki_document_event'}): + self.logger.debug(f"Received daemon response: [{len(raw_data)}] {type(response)} {response}") return response async def worker_loop(self, loop:asyncio.events.AbstractEventLoop) -> None: @@ -837,19 +839,6 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: self.reply_received_event.set() await self.config.handle_message_reply_event(reply) return - for reply_type, reply in response.items(): - if not reply: - continue - self.logger.debug(f"channel {reply_type} event") - if not reply_type.endswith("_reply") or not (query_id := reply.get("query_id", None)): - self.logger.debug(f"{reply_type} is not a reply, or can't get query_id") - continue - if not (queue := self.response_queues.get(query_id, None)): - self.logger.debug(f"query_id for {reply_type} has no listener") - continue - # avoid blocking recv loop: - asyncio.create_task(queue.put(reply)) - # Handle channel query events (for send_channel_query_await_reply) if response.get("channel_query_sent_event") is not None: self.logger.debug("channel_query_sent_event") @@ -867,26 +856,45 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: return if response.get("channel_query_reply_event") is not None: + # channel_query_sent_event': {'message_id': b'\xb7\xd5\xaeG\x8a\xc4\x96\x99|M\x89c\x90\xc3\xd4\x1f', 'sent_at': 1758485828, 'reply_eta': 1179000000, 'error_code': 0}, self.logger.debug("channel_query_reply_event") event = response["channel_query_reply_event"] message_id = event.get("message_id") - if message_id is not None: - # Check for error code - error_code = event.get("error_code", 0) - if error_code != 0: - error_msg = f"Channel query failed with error code: {error_code}".encode() - self.channel_message_query_responses[message_id] = error_msg - else: - # Extract the payload - payload = event.get("payload", b"") - self.channel_message_query_responses[message_id] = payload - - # Signal the waiting coroutine - if message_id in self.pending_channel_message_queries: - self.pending_channel_message_queries[message_id].set() - return + if message_id is None: + return + # TODO wait why are we storing these indefinitely if we don't really care about them?? + if error_code := event.get("error_code", 0): + error_msg = f"Channel query failed with error code: {error_code}".encode() + self.channel_message_query_responses[message_id] = error_msg + else: + # Extract the payload + payload = event.get("payload", b"") + self.channel_message_query_responses[message_id] = payload + + # Signal the waiting coroutine + if message_id in self.pending_channel_message_queries: + self.pending_channel_message_queries[message_id].set() + return + for reply_type, reply in response.items(): + if not reply: + continue + self.logger.debug(f"channel {reply_type} event") + if not reply_type.endswith("_reply") or not (query_id := reply.get("query_id", None)): + self.logger.debug(f"{reply_type} is not a reply, or can't get query_id") + # 'create_read_channel_reply': {'query_id': None, 'channel_id': 0, 'error_code': 21}, + # DEBUG [thinclient] channel_query_reply_event is not a reply, or can't get query_id + # REPLY {'message_id': b'\xfd\xc0\x9d\xcfh\xa3\x88X[\xab\xa8\xd3\x1b\x8b\x15\xd1', 'payload': b'', 'reply_index': None, 'error_code': 0} + # SELF.RESPONSE_QUEUES {} + print("REPLY", reply) + print('SELF.RESPONSE_QUEUES', self.response_queues) + continue + if not (queue := self.response_queues.get(query_id, None)): + self.logger.debug(f"query_id for {reply_type} has no listener") + continue + # avoid blocking recv loop: + asyncio.create_task(queue.put(reply)) @@ -1307,6 +1315,8 @@ async def resume_read_channel(self, read_cap: bytes, next_message_index: "bytes| except Exception as e: self.logger.error(f"Error resuming read channel: {e}") raise + if not reply["channel_id"]: + raise Exception("TODO resume_read_channel error", reply) return reply["channel_id"] @@ -1431,6 +1441,7 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, dest_node: Destination node identity hash. dest_queue: Destination recipient queue ID. message_id: Message ID for reply correlation. + timeout_seconds: float (seconds to wait), None for indefinite wait Returns: bytes: The received payload from the channel. From a5f87b9623eb349ba376d7e7555c3154302216e8 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sat, 27 Sep 2025 10:15:41 +0200 Subject: [PATCH 08/16] bit of logging and fixes to event handling --- katzenpost_thinclient/__init__.py | 34 +++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index b241609..4cf6b5c 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -474,7 +474,8 @@ def __init__(self, config:Config) -> None: self._recv_lock = asyncio.Lock() # Letterbox for each response associated (by query_id) with a request. - self.response_queues : Dict[bytes, asyncio.Queue] = {} # (query_id|message_id) -> Queue + self.response_queues : Dict[bytes, asyncio.Queue[Dict[str,Any]]] = {} # (query_id|message_id) -> Queue + self.ack_queues : Dict[bytes, asyncio.Queue[Dict[str,Any]]] = {} # (query_id|message_id) -> Queue # Channel query message ID correlation (for send_channel_query_await_reply) self.pending_channel_message_queries : Dict[bytes, asyncio.Event] = {} # message_id -> Event @@ -484,9 +485,10 @@ def __init__(self, config:Config) -> None: self.logger = logging.getLogger('thinclient') self.logger.setLevel(logging.DEBUG) # Only add handler if none exists to avoid duplicate log messages - if not self.logger.handlers: - handler = logging.StreamHandler(sys.stderr) - self.logger.addHandler(handler) + # XXX: commented out because it did in fact log twice: + #if not self.logger.handlers: + # handler = logging.StreamHandler(sys.stderr) + # self.logger.addHandler(handler) if self.config.network is None: raise RuntimeError("config.network is None") @@ -528,6 +530,9 @@ async def start(self, loop:asyncio.AbstractEventLoop) -> None: Args: loop (asyncio.AbstractEventLoop): The running asyncio event loop. + + Exceptions: + BrokenPipeError """ self.logger.debug("connecting to daemon") server_addr : str | Tuple[str,int] = '' @@ -839,8 +844,9 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: self.reply_received_event.set() await self.config.handle_message_reply_event(reply) return - # Handle channel query events (for send_channel_query_await_reply) + # Handle channel query events (for send_channel_query_await_reply), this is the ACK from the local clientd (not courier) if response.get("channel_query_sent_event") is not None: + # channel_query_sent_event': {'message_id': b'\xb7\xd5\xaeG\x8a\xc4\x96\x99|M\x89c\x90\xc3\xd4\x1f', 'sent_at': 1758485828, 'reply_eta': 1179000000, 'error_code': 0}, self.logger.debug("channel_query_sent_event") event = response["channel_query_sent_event"] message_id = event.get("message_id") @@ -855,12 +861,14 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: # Continue waiting for the reply (don't return here) return - if response.get("channel_query_reply_event") is not None: - # channel_query_sent_event': {'message_id': b'\xb7\xd5\xaeG\x8a\xc4\x96\x99|M\x89c\x90\xc3\xd4\x1f', 'sent_at': 1758485828, 'reply_eta': 1179000000, 'error_code': 0}, + if query_ack := response.get("channel_query_reply_event", None): + # this is the ACK from the courier self.logger.debug("channel_query_reply_event") event = response["channel_query_reply_event"] message_id = event.get("message_id") + if message_id is None: + self.logger.error("channel_query_reply_event without message_id") return # TODO wait why are we storing these indefinitely if we don't really care about them?? @@ -872,6 +880,13 @@ async def handle_response(self, response: "Dict[str,Any]") -> None: payload = event.get("payload", b"") self.channel_message_query_responses[message_id] = payload + if (queue := self.ack_queues.get(message_id, None)): + self.logger.debug(f"ack_queues: populated with message_id {message_id.hex()}") + asyncio.create_task(queue.put(query_ack)) + else: + self.logger.error(f"channel_query_reply_event for message_id {message_id.hex()}, but there is no listener") + + # Signal the waiting coroutine if message_id in self.pending_channel_message_queries: self.pending_channel_message_queries[message_id].set() @@ -1461,7 +1476,7 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, try: # Send the channel query - await self.send_channel_query(channel_id, payload, dest_node, dest_queue, message_id) + await self.send_channel_query(channel_id, payload=payload, dest_node=dest_node, dest_queue=dest_queue, message_id=message_id) # Wait for the reply with timeout await asyncio.wait_for(event.wait(), timeout=timeout_seconds) @@ -1485,7 +1500,7 @@ async def send_channel_query_await_reply(self, channel_id: int, payload: bytes, self.pending_channel_message_queries.pop(message_id, None) self.channel_message_query_responses.pop(message_id, None) - async def send_channel_query(self, channel_id: int, payload: bytes, dest_node: bytes, + async def send_channel_query(self, channel_id: int, *, payload: bytes, dest_node: bytes, dest_queue: bytes, message_id: bytes) -> None: """ Sends a prepared channel query to the mixnet without waiting for a reply. @@ -1505,6 +1520,7 @@ async def send_channel_query(self, channel_id: int, payload: bytes, dest_node: b raise ThinClientOfflineError("cannot send_channel_query while not is_connected() - daemon not connected to mixnet") if not isinstance(payload, bytes): + self.logger.error("send_channel_query: type error: payload= must be bytes()") payload = payload.encode('utf-8') # Create the SendChannelQuery structure (matches Rust implementation) From 487d2b8e46440abba5bb4db95a168353de2d45d9 Mon Sep 17 00:00:00 2001 From: fourbit Date: Sun, 12 Oct 2025 02:13:54 +0200 Subject: [PATCH 09/16] block less and report exceptions better --- katzenpost_thinclient/__init__.py | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/katzenpost_thinclient/__init__.py b/katzenpost_thinclient/__init__.py index 4cf6b5c..aa86d8f 100644 --- a/katzenpost_thinclient/__init__.py +++ b/katzenpost_thinclient/__init__.py @@ -565,6 +565,14 @@ async def start(self, loop:asyncio.AbstractEventLoop) -> None: # Start the read loop as a background task self.logger.debug("starting read loop") self.task = loop.create_task(self.worker_loop(loop)) + def handle_loop_err(task): + try: + result = task.result() + except Exception: + import traceback + traceback.print_exc() + raise + self.task.add_done_callback(handle_loop_err) def get_config(self) -> Config: """ @@ -650,18 +658,25 @@ async def worker_loop(self, loop:asyncio.events.AbstractEventLoop) -> None: """ self.logger.debug("read loop start") while True: - self.logger.debug("read loop") try: response = await self.recv(loop) except asyncio.CancelledError: # Handle cancellation of the read loop + self.logger.error(f"worker_loop cancelled") break except Exception as e: self.logger.error(f"Error reading from socket: {e}") raise else: - await self.handle_response(response) - + def handle_response_err(task): + try: + result = task.result() + except Exception: + import traceback + traceback.print_exc() + raise + resp = asyncio.create_task(self.handle_response(response)) + resp.add_done_callback(handle_response_err) def parse_status(self, event: "Dict[str,Any]") -> None: """ @@ -1196,6 +1211,10 @@ async def write_channel(self, channel_id: int, payload: "bytes|str") -> WriteCha raise if reply['error_code'] != 0: + # Examples: + # 12:24:32.206 ERRO katzenpost/client2: writeChannel failure: failed to create write request: pki: replica not found + # - This one will probably never succeed? Why is the client using a bad replica? + # raise Exception(f"write_channel got error from clientd: {reply['error_code']}") return WriteChannelReply( @@ -1331,6 +1350,7 @@ async def resume_read_channel(self, read_cap: bytes, next_message_index: "bytes| self.logger.error(f"Error resuming read channel: {e}") raise if not reply["channel_id"]: + self.logger.error(f"Error resuming read channel: no channel_id") raise Exception("TODO resume_read_channel error", reply) return reply["channel_id"] From bc953ffa8e94363ef6b7a83a8c0510ce95283109 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Tue, 18 Nov 2025 16:13:02 -0800 Subject: [PATCH 10/16] python tests: remove broken tests --- tests/test_channel_api.py | 149 -------------------------------------- 1 file changed, 149 deletions(-) diff --git a/tests/test_channel_api.py b/tests/test_channel_api.py index 0e05ba5..651b568 100644 --- a/tests/test_channel_api.py +++ b/tests/test_channel_api.py @@ -27,155 +27,6 @@ async def setup_thin_client(): return client -@pytest.mark.asyncio -async def test_channel_api_basics(): - """ - Test basic channel API operations - equivalent to TestChannelAPIBasics from Rust. - This test demonstrates the full channel workflow: Alice creates a write channel, - Bob creates a read channel, Alice writes messages, Bob reads them back. - """ - alice_thin_client = await setup_thin_client() - bob_thin_client = await setup_thin_client() - - # Wait for PKI documents to be available and connection to mixnet - print("Waiting for daemon to connect to mixnet...") - attempts = 0 - while not alice_thin_client.is_connected() and attempts < 30: - await asyncio.sleep(1) - attempts += 1 - - if not alice_thin_client.is_connected(): - raise Exception("Daemon failed to connect to mixnet within 30 seconds") - - print("✅ Daemon connected to mixnet, using current PKI document") - - # Alice creates write channel - print("Alice: Creating write channel") - alice_channel_id, read_cap, _write_cap = await alice_thin_client.create_write_channel() - print(f"Alice: Created write channel {alice_channel_id}") - - # Bob creates read channel using the read capability from Alice's write channel - print("Bob: Creating read channel") - bob_channel_id = await bob_thin_client.create_read_channel(read_cap) - print(f"Bob: Created read channel {bob_channel_id}") - - # Alice writes first message - original_message = b"hello1" - print("Alice: Writing first message and waiting for completion") - - write_reply1 = await alice_thin_client.write_channel(alice_channel_id, original_message) - print("Alice: Write operation completed successfully") - - # Get the courier service from PKI - courier_service = alice_thin_client.get_service("courier") - dest_node, dest_queue = courier_service.to_destination() - - alice_message_id1 = ThinClient.new_message_id() - - _reply1 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply1.send_message_payload, - dest_node, - dest_queue, - alice_message_id1 - ) - - # Alice writes a second message - second_message = b"hello2" - print("Alice: Writing second message and waiting for completion") - - write_reply2 = await alice_thin_client.write_channel(alice_channel_id, second_message) - print("Alice: Second write operation completed successfully") - - alice_message_id2 = ThinClient.new_message_id() - - _reply2 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply2.send_message_payload, - dest_node, - dest_queue, - alice_message_id2 - ) - - # Wait for message propagation to storage replicas - print("Waiting for message propagation to storage replicas") - await asyncio.sleep(10) - - # Bob reads first message - print("Bob: Reading first message") - read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id1 = ThinClient.new_message_id() - - # In a real implementation, you'd retry the send_channel_query_await_reply until you get a response - bob_reply_payload1 = b"" - for i in range(10): - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply1.send_message_payload, - dest_node, - dest_queue, - bob_message_id1 - ) - if payload: - bob_reply_payload1 = payload - break - else: - print(f"Bob: Read attempt {i + 1} returned empty payload, retrying...") - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert original_message == bob_reply_payload1, "Bob: Reply payload mismatch" - - # Bob closes and resumes read channel to advance to second message - await bob_thin_client.close_channel(bob_channel_id) - - print("Bob: Resuming read channel to read second message") - bob_channel_id = await bob_thin_client.resume_read_channel( - read_cap, - read_reply1.next_message_index, - read_reply1.reply_index - ) - - # Bob reads second message - print("Bob: Reading second message") - read_reply2 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id2 = ThinClient.new_message_id() - bob_reply_payload2 = b"" - - for i in range(10): - print(f"Bob: second read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply2.send_message_payload, - dest_node, - dest_queue, - bob_message_id2 - ) - if payload: - bob_reply_payload2 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert second_message == bob_reply_payload2, "Bob: Second reply payload mismatch" - - # Clean up channels - await alice_thin_client.close_channel(alice_channel_id) - await bob_thin_client.close_channel(bob_channel_id) - - alice_thin_client.stop() - bob_thin_client.stop() - - print("✅ Channel API basics test completed successfully") - - @pytest.mark.asyncio async def test_resume_write_channel(): """ From 71598a8f27faf5465d32836c9d87808d7e6a07a3 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Tue, 18 Nov 2025 16:36:31 -0800 Subject: [PATCH 11/16] python tests: remove another broken test, test_resume_write_channel_query --- tests/test_channel_api_extended.py | 156 ----------------------------- 1 file changed, 156 deletions(-) diff --git a/tests/test_channel_api_extended.py b/tests/test_channel_api_extended.py index 14b6304..39922fc 100644 --- a/tests/test_channel_api_extended.py +++ b/tests/test_channel_api_extended.py @@ -24,162 +24,6 @@ async def setup_thin_client(): return client - -@pytest.mark.asyncio -async def test_resume_write_channel_query(): - """ - Test resuming a write channel with query state - equivalent to TestResumeWriteChannelQuery from Rust. - This test demonstrates the write channel query resumption workflow: - 1. Create write channel - 2. Create first write query message but do not send to channel yet - 3. Close channel - 4. Resume write channel with query via resume_write_channel_query - 5. Send resumed write query to channel - 6. Send second message to channel - 7. Create read channel - 8. Read both messages from channel - 9. Verify payloads match - """ - alice_thin_client = await setup_thin_client() - bob_thin_client = await setup_thin_client() - - # Wait for PKI documents to be available and connection to mixnet - print("Waiting for daemon to connect to mixnet...") - attempts = 0 - while not alice_thin_client.is_connected() and attempts < 30: - await asyncio.sleep(1) - attempts += 1 - - if not alice_thin_client.is_connected(): - raise Exception("Daemon failed to connect to mixnet within 30 seconds") - - print("✅ Daemon connected to mixnet, using current PKI document") - - # Alice creates write channel - print("Alice: Creating write channel") - alice_channel_id, read_cap, write_cap = await alice_thin_client.create_write_channel() - print(f"Alice: Created write channel {alice_channel_id}") - - # Alice prepares first message but doesn't send it yet - alice_payload1 = b"Hello, Bob!" - write_reply = await alice_thin_client.write_channel(alice_channel_id, alice_payload1) - - # Get courier destination - courier_node, courier_queue_id = await alice_thin_client.get_courier_destination() - alice_message_id1 = ThinClient.new_message_id() - - # Close the channel immediately (like in Rust test - no waiting for propagation) - await alice_thin_client.close_channel(alice_channel_id) - - # Resume the write channel with query state using current_message_index like Rust test - print("Alice: Resuming write channel") - alice_channel_id = await alice_thin_client.resume_write_channel_query( - write_cap, - write_reply.current_message_index, # Use current_message_index like in Rust test - write_reply.envelope_descriptor, - write_reply.envelope_hash - ) - print(f"Alice: Resumed write channel with ID {alice_channel_id}") - - # Send the first message after resume - print("Alice: Writing first message after resume") - _reply1 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply.send_message_payload, - courier_node, - courier_queue_id, - alice_message_id1 - ) - - # Write second message - print("Alice: Writing second message") - alice_payload2 = b"Second message from Alice!" - write_reply2 = await alice_thin_client.write_channel(alice_channel_id, alice_payload2) - - alice_message_id2 = ThinClient.new_message_id() - _reply2 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply2.send_message_payload, - courier_node, - courier_queue_id, - alice_message_id2 - ) - print("Alice: Second write operation completed successfully") - - print("Waiting for second message propagation to storage replicas") - await asyncio.sleep(3) - - # Bob creates read channel - print("Bob: Creating read channel") - bob_channel_id = await bob_thin_client.create_read_channel(read_cap) - print(f"Bob: Created read channel {bob_channel_id}") - - # Bob reads first message - print("Bob: Reading first message") - read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id1 = ThinClient.new_message_id() - bob_reply_payload1 = b"" - - for i in range(10): - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply1.send_message_payload, - courier_node, - courier_queue_id, - bob_message_id1 - ) - if payload: - bob_reply_payload1 = payload - break - else: - print(f"Bob: First read attempt {i + 1} returned empty payload, retrying...") - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert alice_payload1 == bob_reply_payload1, "Bob: First message payload mismatch" - - # Bob reads second message - print("Bob: Reading second message") - read_reply2 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id2 = ThinClient.new_message_id() - bob_reply_payload2 = b"" - - for i in range(10): - print(f"Bob: second message read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply2.send_message_payload, - courier_node, - courier_queue_id, - bob_message_id2 - ) - if payload: - bob_reply_payload2 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - # Verify the second message content matches - assert alice_payload2 == bob_reply_payload2, "Bob: Second message payload mismatch" - print("Bob: Successfully received and verified second message") - - # Clean up channels - await alice_thin_client.close_channel(alice_channel_id) - await bob_thin_client.close_channel(bob_channel_id) - - alice_thin_client.stop() - bob_thin_client.stop() - - print("✅ Resume write channel query test completed successfully") - - @pytest.mark.asyncio async def test_resume_read_channel(): """ From c4cd8920418f9e2cc6d2b6c0002b4314db1a4f35 Mon Sep 17 00:00:00 2001 From: David Stainton Date: Tue, 18 Nov 2025 18:58:44 -0800 Subject: [PATCH 12/16] remove broken python test: test_resume_read_channel --- tests/test_channel_api_extended.py | 154 ----------------------------- 1 file changed, 154 deletions(-) diff --git a/tests/test_channel_api_extended.py b/tests/test_channel_api_extended.py index 39922fc..2a62ab2 100644 --- a/tests/test_channel_api_extended.py +++ b/tests/test_channel_api_extended.py @@ -24,160 +24,6 @@ async def setup_thin_client(): return client -@pytest.mark.asyncio -async def test_resume_read_channel(): - """ - Test resuming a read channel - equivalent to TestResumeReadChannel from Rust. - This test demonstrates the read channel resumption workflow: - 1. Create a write channel - 2. Write two messages to the channel - 3. Create a read channel - 4. Read the first message from the channel - 5. Verify payload matches - 6. Close the read channel - 7. Resume the read channel - 8. Read the second message from the channel - 9. Verify payload matches - """ - alice_thin_client = await setup_thin_client() - bob_thin_client = await setup_thin_client() - - # Wait for PKI documents to be available and connection to mixnet - print("Waiting for daemon to connect to mixnet...") - attempts = 0 - while not alice_thin_client.is_connected() and attempts < 30: - await asyncio.sleep(1) - attempts += 1 - - if not alice_thin_client.is_connected(): - raise Exception("Daemon failed to connect to mixnet within 30 seconds") - - print("✅ Daemon connected to mixnet, using current PKI document") - - # Alice creates write channel - print("Alice: Creating write channel") - alice_channel_id, read_cap, _write_cap = await alice_thin_client.create_write_channel() - print(f"Alice: Created write channel {alice_channel_id}") - - # Alice writes first message - alice_payload1 = b"Hello, Bob!" - write_reply1 = await alice_thin_client.write_channel(alice_channel_id, alice_payload1) - - dest_node, dest_queue = await alice_thin_client.get_courier_destination() - alice_message_id1 = ThinClient.new_message_id() - - _reply1 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply1.send_message_payload, - dest_node, - dest_queue, - alice_message_id1 - ) - - print("Waiting for first message propagation to storage replicas") - await asyncio.sleep(3) - - # Alice writes second message - print("Alice: Writing second message") - alice_payload2 = b"Second message from Alice!" - write_reply2 = await alice_thin_client.write_channel(alice_channel_id, alice_payload2) - - alice_message_id2 = ThinClient.new_message_id() - _reply2 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply2.send_message_payload, - dest_node, - dest_queue, - alice_message_id2 - ) - print("Alice: Second write operation completed successfully") - - print("Waiting for second message propagation to storage replicas") - await asyncio.sleep(3) - - # Bob creates read channel - print("Bob: Creating read channel") - bob_channel_id = await bob_thin_client.create_read_channel(read_cap) - print(f"Bob: Created read channel {bob_channel_id}") - - # Bob reads first message - print("Bob: Reading first message") - read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id1 = ThinClient.new_message_id() - bob_reply_payload1 = b"" - - for i in range(10): - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply1.send_message_payload, - dest_node, - dest_queue, - bob_message_id1 - ) - if payload: - bob_reply_payload1 = payload - break - else: - print(f"Bob: First read attempt {i + 1} returned empty payload, retrying...") - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert alice_payload1 == bob_reply_payload1, "Bob: First message payload mismatch" - - # Close the read channel - await bob_thin_client.close_channel(bob_channel_id) - - # Resume the read channel - print("Bob: Resuming read channel") - bob_channel_id = await bob_thin_client.resume_read_channel( - read_cap, - read_reply1.next_message_index, - read_reply1.reply_index - ) - print(f"Bob: Resumed read channel with ID {bob_channel_id}") - - # Bob reads second message - print("Bob: Reading second message") - read_reply2 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id2 = ThinClient.new_message_id() - bob_reply_payload2 = b"" - - for i in range(10): - print(f"Bob: second message read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply2.send_message_payload, - dest_node, - dest_queue, - bob_message_id2 - ) - if payload: - bob_reply_payload2 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - # Verify the second message content matches - assert alice_payload2 == bob_reply_payload2, "Bob: Second message payload mismatch" - print("Bob: Successfully received and verified second message") - - # Clean up channels - await alice_thin_client.close_channel(alice_channel_id) - await bob_thin_client.close_channel(bob_channel_id) - - alice_thin_client.stop() - bob_thin_client.stop() - - print("✅ Resume read channel test completed successfully") - - @pytest.mark.asyncio async def test_resume_read_channel_query(): """ From a316f50d67e815a3984ba006d5007d4233c3e29a Mon Sep 17 00:00:00 2001 From: David Stainton Date: Tue, 18 Nov 2025 19:16:03 -0800 Subject: [PATCH 13/16] remove broken python test: test_resume_read_channel_query --- tests/test_channel_api_extended.py | 157 ----------------------------- 1 file changed, 157 deletions(-) diff --git a/tests/test_channel_api_extended.py b/tests/test_channel_api_extended.py index 2a62ab2..3ac925a 100644 --- a/tests/test_channel_api_extended.py +++ b/tests/test_channel_api_extended.py @@ -23,160 +23,3 @@ async def setup_thin_client(): await asyncio.sleep(2) return client - -@pytest.mark.asyncio -async def test_resume_read_channel_query(): - """ - Test resuming a read channel with query state - equivalent to TestResumeReadChannelQuery from Rust. - This test demonstrates the read channel query resumption workflow: - 1. Create a write channel - 2. Write two messages to the channel - 3. Create read channel - 4. Make read query but do not send it - 5. Close read channel - 6. Resume read channel query with resume_read_channel_query method - 7. Send previously made read query to channel - 8. Verify received payload matches - 9. Read second message from channel - 10. Verify received payload matches - """ - alice_thin_client = await setup_thin_client() - bob_thin_client = await setup_thin_client() - - # Wait for PKI documents to be available and connection to mixnet - print("Waiting for daemon to connect to mixnet...") - attempts = 0 - while not alice_thin_client.is_connected() and attempts < 30: - await asyncio.sleep(1) - attempts += 1 - - if not alice_thin_client.is_connected(): - raise Exception("Daemon failed to connect to mixnet within 30 seconds") - - print("✅ Daemon connected to mixnet, using current PKI document") - - # Alice creates write channel - print("Alice: Creating write channel") - alice_channel_id, read_cap, _write_cap = await alice_thin_client.create_write_channel() - print(f"Alice: Created write channel {alice_channel_id}") - - # Alice writes first message - alice_payload1 = b"Hello, Bob!" - write_reply1 = await alice_thin_client.write_channel(alice_channel_id, alice_payload1) - - dest_node, dest_queue = await alice_thin_client.get_courier_destination() - alice_message_id1 = ThinClient.new_message_id() - - _reply1 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply1.send_message_payload, - dest_node, - dest_queue, - alice_message_id1 - ) - - print("Waiting for first message propagation to storage replicas") - await asyncio.sleep(3) - - # Alice writes second message - print("Alice: Writing second message") - alice_payload2 = b"Second message from Alice!" - write_reply2 = await alice_thin_client.write_channel(alice_channel_id, alice_payload2) - - alice_message_id2 = ThinClient.new_message_id() - _reply2 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply2.send_message_payload, - dest_node, - dest_queue, - alice_message_id2 - ) - print("Alice: Second write operation completed successfully") - - print("Waiting for second message propagation to storage replicas") - await asyncio.sleep(3) - - # Bob creates read channel - print("Bob: Creating read channel") - bob_channel_id = await bob_thin_client.create_read_channel(read_cap) - print(f"Bob: Created read channel {bob_channel_id}") - - # Bob prepares first read query but doesn't send it yet - print("Bob: Reading first message") - read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - # Close the read channel - await bob_thin_client.close_channel(bob_channel_id) - - # Resume the read channel with query state - print("Bob: Resuming read channel") - bob_channel_id = await bob_thin_client.resume_read_channel_query( - read_cap, - read_reply1.current_message_index, - read_reply1.reply_index, - read_reply1.envelope_descriptor, - read_reply1.envelope_hash - ) - print(f"Bob: Resumed read channel with ID {bob_channel_id}") - - # Send the first read query and get the message payload - bob_message_id1 = ThinClient.new_message_id() - bob_reply_payload1 = b"" - - for i in range(10): - print(f"Bob: first message read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply1.send_message_payload, - dest_node, - dest_queue, - bob_message_id1 - ) - if payload: - bob_reply_payload1 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert alice_payload1 == bob_reply_payload1, "Bob: First message payload mismatch" - - # Bob reads second message - print("Bob: Reading second message") - read_reply2 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id2 = ThinClient.new_message_id() - bob_reply_payload2 = b"" - - for i in range(10): - print(f"Bob: second message read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply2.send_message_payload, - dest_node, - dest_queue, - bob_message_id2 - ) - if payload: - bob_reply_payload2 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - # Verify the second message content matches - assert alice_payload2 == bob_reply_payload2, "Bob: Second message payload mismatch" - print("Bob: Successfully received and verified second message") - - # Clean up channels - await alice_thin_client.close_channel(alice_channel_id) - await bob_thin_client.close_channel(bob_channel_id) - - alice_thin_client.stop() - bob_thin_client.stop() - - print("✅ Resume read channel query test completed successfully") From 8503b9e22e1471883bb686077c364d447dc75d8b Mon Sep 17 00:00:00 2001 From: David Stainton Date: Tue, 18 Nov 2025 20:15:48 -0800 Subject: [PATCH 14/16] remove broken python test: test_resume_write_channel --- tests/test_channel_api.py | 165 -------------------------------------- 1 file changed, 165 deletions(-) diff --git a/tests/test_channel_api.py b/tests/test_channel_api.py index 651b568..afbd3a2 100644 --- a/tests/test_channel_api.py +++ b/tests/test_channel_api.py @@ -27,170 +27,5 @@ async def setup_thin_client(): return client -@pytest.mark.asyncio -async def test_resume_write_channel(): - """ - Test resuming a write channel - equivalent to TestResumeWriteChannel from Rust. - This test demonstrates the write channel resumption workflow: - 1. Create a write channel - 2. Write the first message onto the channel - 3. Close the channel - 4. Resume the channel - 5. Write the second message onto the channel - 6. Create a read channel - 7. Read first and second message from the channel - 8. Verify payloads match - """ - alice_thin_client = await setup_thin_client() - bob_thin_client = await setup_thin_client() - - # Wait for PKI documents to be available and connection to mixnet - print("Waiting for daemon to connect to mixnet...") - attempts = 0 - while not alice_thin_client.is_connected() and attempts < 30: - await asyncio.sleep(1) - attempts += 1 - - if not alice_thin_client.is_connected(): - raise Exception("Daemon failed to connect to mixnet within 30 seconds") - - print("✅ Daemon connected to mixnet, using current PKI document") - - # Alice creates write channel - print("Alice: Creating write channel") - alice_channel_id, read_cap, write_cap = await alice_thin_client.create_write_channel() - print(f"Alice: Created write channel {alice_channel_id}") - - # Alice writes first message - alice_payload1 = b"Hello, Bob!" - print("Alice: Writing first message") - write_reply1 = await alice_thin_client.write_channel(alice_channel_id, alice_payload1) - - # Get courier destination - dest_node, dest_queue = await alice_thin_client.get_courier_destination() - alice_message_id1 = ThinClient.new_message_id() - - # Send first message - _reply1 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply1.send_message_payload, - dest_node, - dest_queue, - alice_message_id1 - ) - - print("Waiting for first message propagation to storage replicas") - await asyncio.sleep(3) - - # Close the channel - await alice_thin_client.close_channel(alice_channel_id) - - # Resume the write channel - print("Alice: Resuming write channel") - alice_channel_id = await alice_thin_client.resume_write_channel( - write_cap, - write_reply1.next_message_index - ) - print(f"Alice: Resumed write channel with ID {alice_channel_id}") - - # Write second message after resume - print("Alice: Writing second message after resume") - alice_payload2 = b"Second message from Alice!" - write_reply2 = await alice_thin_client.write_channel(alice_channel_id, alice_payload2) - - alice_message_id2 = ThinClient.new_message_id() - _reply2 = await alice_thin_client.send_channel_query_await_reply( - alice_channel_id, - write_reply2.send_message_payload, - dest_node, - dest_queue, - alice_message_id2 - ) - print("Alice: Second write operation completed successfully") - - print("Waiting for second message propagation to storage replicas") - await asyncio.sleep(3) - - # Bob creates read channel - print("Bob: Creating read channel") - bob_channel_id = await bob_thin_client.create_read_channel(read_cap) - print(f"Bob: Created read channel {bob_channel_id}") - - # Bob reads first message - print("Bob: Reading first message") - read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id1 = ThinClient.new_message_id() - bob_reply_payload1 = b"" - - for i in range(10): - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply1.send_message_payload, - dest_node, - dest_queue, - bob_message_id1 - ) - if payload: - bob_reply_payload1 = payload - break - else: - print(f"Bob: First read attempt {i + 1} returned empty payload, retrying...") - await asyncio.sleep(0.5) - except Exception as e: - raise e - - assert alice_payload1 == bob_reply_payload1, "Bob: First message payload mismatch" - - # Bob closes and resumes read channel to advance to second message - await bob_thin_client.close_channel(bob_channel_id) - - print("Bob: Resuming read channel to read second message") - bob_channel_id = await bob_thin_client.resume_read_channel( - read_cap, - read_reply1.next_message_index, - read_reply1.reply_index - ) - - # Bob reads second message - print("Bob: Reading second message") - read_reply2 = await bob_thin_client.read_channel(bob_channel_id, None, None) - - bob_message_id2 = ThinClient.new_message_id() - bob_reply_payload2 = b"" - - for i in range(10): - print(f"Bob: second message read attempt {i + 1}") - try: - payload = await alice_thin_client.send_channel_query_await_reply( - bob_channel_id, - read_reply2.send_message_payload, - dest_node, - dest_queue, - bob_message_id2 - ) - if payload: - bob_reply_payload2 = payload - break - else: - await asyncio.sleep(0.5) - except Exception as e: - raise e - - # Verify the second message content matches - assert alice_payload2 == bob_reply_payload2, "Bob: Second message payload mismatch" - print("Bob: Successfully received and verified second message") - - # Clean up channels - await alice_thin_client.close_channel(alice_channel_id) - await bob_thin_client.close_channel(bob_channel_id) - - alice_thin_client.stop() - bob_thin_client.stop() - - print("✅ Resume write channel test completed successfully") - - if __name__ == "__main__": pytest.main([__file__]) From 0f5fb193ab00eb8c64dcc53bea3f75e8c0ffc763 Mon Sep 17 00:00:00 2001 From: Leif Ryge Date: Sat, 22 Nov 2025 19:47:05 +0100 Subject: [PATCH 15/16] update thinclient.toml from version currently generated on katzenpost main --- testdata/thinclient.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testdata/thinclient.toml b/testdata/thinclient.toml index 06bab92..513a2a5 100644 --- a/testdata/thinclient.toml +++ b/testdata/thinclient.toml @@ -18,10 +18,10 @@ Address = "localhost:64331" KEMName = "" [PigeonholeGeometry] - BoxPayloadLength = 1556 - CourierQueryReadLength = 360 + MaxPlaintextPayloadLength = 1553 + CourierQueryReadLength = 359 CourierQueryWriteLength = 2000 - CourierQueryReplyReadLength = 1701 + CourierQueryReplyReadLength = 1698 CourierQueryReplyWriteLength = 50 NIKEName = "CTIDH1024-X25519" SignatureSchemeName = "Ed25519" From ed6c19d01bb023927ad17070a468d29d72c2e440 Mon Sep 17 00:00:00 2001 From: Leif Ryge Date: Sat, 22 Nov 2025 21:24:25 +0100 Subject: [PATCH 16/16] test_channel_api.py: add simple single-message test based on more complicated test which was recently-removed --- tests/test_channel_api.py | 92 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 91 insertions(+), 1 deletion(-) diff --git a/tests/test_channel_api.py b/tests/test_channel_api.py index afbd3a2..26e377d 100644 --- a/tests/test_channel_api.py +++ b/tests/test_channel_api.py @@ -23,9 +23,99 @@ async def setup_thin_client(): loop = asyncio.get_running_loop() await client.start(loop) await asyncio.sleep(2) - return client +@pytest.mark.asyncio +async def test_channel_send_one_receive_one(): + """ + Alice sends a message and Bob receives it. + """ + alice_thin_client = await setup_thin_client() + bob_thin_client = await setup_thin_client() + + # Wait for PKI documents to be available and connection to mixnet + print("Waiting for daemon to connect to mixnet...") + attempts = 0 + while not alice_thin_client.is_connected() and attempts < 30: + await asyncio.sleep(1) + attempts += 1 + + if not alice_thin_client.is_connected(): + raise Exception("Daemon failed to connect to mixnet within 30 seconds") + + print("✅ Daemon connected to mixnet, using current PKI document") + + # Alice creates write channel + print("Alice: Creating write channel") + alice_channel_id, read_cap, _write_cap = await alice_thin_client.create_write_channel() + print(f"Alice: Created write channel {alice_channel_id}") + + # Bob creates read channel using the read capability from Alice's write channel + print("Bob: Creating read channel") + bob_channel_id = await bob_thin_client.create_read_channel(read_cap) + print(f"Bob: Created read channel {bob_channel_id}") + + # Alice writes first message + original_message = b"hello1" + print("Alice: Writing first message and waiting for completion") + + write_reply1 = await alice_thin_client.write_channel(alice_channel_id, original_message) + print("Alice: Write operation completed successfully") + + # Get the courier service from PKI + courier_service = alice_thin_client.get_service("courier") + dest_node, dest_queue = courier_service.to_destination() + + alice_message_id1 = ThinClient.new_message_id() + + _reply1 = await alice_thin_client.send_channel_query_await_reply( + alice_channel_id, + write_reply1.send_message_payload, + dest_node, + dest_queue, + alice_message_id1 + ) + + # Wait for message propagation to storage replicas + print("Waiting for message propagation to storage replicas") + await asyncio.sleep(10) + + # Bob reads first message + print("Bob: Reading first message") + read_reply1 = await bob_thin_client.read_channel(bob_channel_id, None, None) + + bob_message_id1 = ThinClient.new_message_id() + + # In a real implementation, you'd retry the send_channel_query_await_reply until you get a response + bob_reply_payload1 = b"" + for i in range(10): + try: + payload = await alice_thin_client.send_channel_query_await_reply( + bob_channel_id, + read_reply1.send_message_payload, + dest_node, + dest_queue, + bob_message_id1 + ) + if payload: + bob_reply_payload1 = payload + break + else: + print(f"Bob: Read attempt {i + 1} returned empty payload, retrying...") + await asyncio.sleep(0.5) + except Exception as e: + raise e + + assert original_message == bob_reply_payload1, "Bob: Reply payload mismatch" + + # Clean up channels + await alice_thin_client.close_channel(alice_channel_id) + await bob_thin_client.close_channel(bob_channel_id) + + alice_thin_client.stop() + bob_thin_client.stop() + + print("✅ Channel API basics test completed successfully") if __name__ == "__main__": pytest.main([__file__])