Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion python/natsrpy/_natsrpy_rs/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class Nats:
headers: dict[str, Any] | None = None,
inbox: str | None = None,
timeout: float | timedelta | None = None,
) -> None:
) -> Message:
"""Send a request and discard the response.

:param subject: subject to send the request to.
Expand All @@ -185,6 +185,7 @@ class Nats:
if None.
:param timeout: maximum time to wait for a response in seconds
or as a timedelta, defaults to the client request_timeout.
:return: response message.
"""

async def drain(self) -> None:
Expand Down
38 changes: 18 additions & 20 deletions python/tests/test_request_reply.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,71 @@
import asyncio
import uuid

from natsrpy import Nats
from natsrpy import Message, Nats


async def test_request_sends_with_reply(nats: Nats) -> None:
subj = uuid.uuid4().hex

received_payload: list[bytes] = []
received_reply: list[str | None] = []
received_msgs: list[Message] = []

async def responder() -> None:
sub = await nats.subscribe(subject=subj)
msg = await anext(sub)
received_payload.append(msg.payload)
received_reply.append(msg.reply)
received_msgs.append(msg)
if msg.reply:
await nats.publish(msg.reply, b"reply-data")

task = asyncio.create_task(responder())
await asyncio.sleep(0.1)

# request() sends a message and waits for a reply (though the response
# is not returned by the current implementation)
await nats.request(subj, b"request-payload")
response = await nats.request(subj, b"request-payload")
await task

assert received_payload == [b"request-payload"]
# request() should set a reply subject automatically
assert received_reply[0] is not None
assert response.payload == b"reply-data"
assert received_msgs
assert received_msgs[0].payload == b"request-payload"
assert received_msgs[0].reply is not None


async def test_request_with_headers(nats: Nats) -> None:
subj = uuid.uuid4().hex

received_headers: list[dict[str, str]] = []
received_msgs: list[Message] = []

async def responder() -> None:
sub = await nats.subscribe(subject=subj)
msg = await anext(sub)
received_headers.append(msg.headers)
received_msgs.append(msg)
if msg.reply:
await nats.publish(msg.reply, b"reply")

task = asyncio.create_task(responder())
await asyncio.sleep(0.1)

await nats.request(subj, b"data", headers={"x-custom": "value"})
resp = await nats.request(subj, b"data", headers={"x-custom": "value"})
await task

assert received_headers[0] == {"x-custom": "value"}
assert resp.payload == b"reply"
assert received_msgs[0].headers == {"x-custom": "value"}


async def test_request_none_payload(nats: Nats) -> None:
subj = uuid.uuid4().hex

received_payload: list[bytes] = []
received_msgs: list[Message] = []

async def responder() -> None:
sub = await nats.subscribe(subject=subj)
msg = await anext(sub)
received_payload.append(msg.payload)
received_msgs.append(msg)
if msg.reply:
await nats.publish(msg.reply, b"reply")

task = asyncio.create_task(responder())
await asyncio.sleep(0.1)

await nats.request(subj, b"")
response = await nats.request(subj, b"")
await task
assert response.payload == b"reply"

assert received_payload[0] == b""
assert received_msgs[0].payload == b""
3 changes: 1 addition & 2 deletions src/nats_cls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,7 @@ impl NatsCls {
inbox,
timeout: timeout.map(Into::into).map(Some),
};
session.send_request(subject, request).await?;
Ok(())
crate::message::Message::try_from(session.send_request(subject, request).await?)
} else {
Err(NatsrpyError::NotInitialized)
}
Expand Down
Loading