diff --git a/examples/direct_reply_queue/direct_reply_to.py b/examples/direct_reply_queue/direct_reply_to.py new file mode 100644 index 0000000..53c6924 --- /dev/null +++ b/examples/direct_reply_queue/direct_reply_to.py @@ -0,0 +1,90 @@ +# type: ignore + + +from rabbitmq_amqp_python_client import ( + AMQPMessagingHandler, + Connection, + Converter, + DirectReplyToConsumerOptions, + Environment, + Event, + Message, + OutcomeState, +) + +MESSAGES_TO_PUBLISH = 200 + + +class MyMessageHandler(AMQPMessagingHandler): + + def __init__(self): + super().__init__() + self._count = 0 + + def on_amqp_message(self, event: Event): + print( + "received message: {} ".format( + Converter.bytes_to_string(event.message.body) + ) + ) + + # accepting + self.delivery_context.accept(event) + + self._count = self._count + 1 + print("count " + str(self._count)) + + if self._count == MESSAGES_TO_PUBLISH: + print("received all messages") + + def on_connection_closed(self, event: Event): + # if you want you can add cleanup operations here + print("connection closed") + + def on_link_closed(self, event: Event) -> None: + # if you want you can add cleanup operations here + print("link closed") + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + return connection + + +def main() -> None: + print("connection_consumer to amqp server") + environment = Environment(uri="amqp://guest:guest@localhost:5672/") + connection_consumer = create_connection(environment) + consumer = connection_consumer.consumer( + message_handler=MyMessageHandler(), + consumer_options=DirectReplyToConsumerOptions(), + ) + addr = consumer.address + print("connecting to address: {}".format(addr)) + connection_publisher = create_connection(environment) + publisher = connection_publisher.publisher(addr) + + for i in range(MESSAGES_TO_PUBLISH): + msg = Message(body=Converter.string_to_bytes("test message {} ".format(i))) + status = publisher.publish(msg) + if status.remote_state == OutcomeState.ACCEPTED: + print("message accepted") + elif status.remote_state == OutcomeState.RELEASED: + print("message not routed") + elif status.remote_state == OutcomeState.REJECTED: + print("message rejected") + + try: + consumer.run() + except KeyboardInterrupt: + pass + + consumer.close() + publisher.close() + connection_consumer.close() + connection_publisher.close() + + +if __name__ == "__main__": + main() diff --git a/poetry.lock b/poetry.lock index b3b615c..48cbdf1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.2 and should not be changed by hand. [[package]] name = "backports-asyncio-runner" @@ -602,22 +602,22 @@ testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"] [[package]] name = "python-qpid-proton" -version = "0.39.0" +version = "0.40.0" description = "An AMQP based messaging library." optional = false python-versions = "*" groups = ["main", "dev"] files = [ - {file = "python-qpid-proton-0.39.0.tar.gz", hash = "sha256:362055ae6ab4c7f1437247c602757f30328d55c0a6986d5b68ca9798de9fce02"}, - {file = "python_qpid_proton-0.39.0-cp38-abi3-macosx_11_0_x86_64.whl", hash = "sha256:f69da296ffa9e3b22f88a53fe9e27c4f4844e088a9f041061bd4f75f74f2a0af"}, - {file = "python_qpid_proton-0.39.0-cp38-abi3-win_amd64.whl", hash = "sha256:d052e85ffbc817d4db973fae230d8a80732d444e0abbac55360ad4beb181cb43"}, + {file = "python_qpid_proton-0.40.0-cp312-cp312-macosx_13_0_x86_64.whl", hash = "sha256:fe56211c6dcc7ea1fb9d78a017208a4c08043cd901780b6602a74ff70f38bf1f"}, + {file = "python_qpid_proton-0.40.0-cp313-cp313-win_amd64.whl", hash = "sha256:a19d8c71c908700ceb38f6cbc1eb4a039428570f96bfc2caeeafdfec804fb94f"}, + {file = "python_qpid_proton-0.40.0.tar.gz", hash = "sha256:7680d607cf6e9684f97bf5b2ba16cda7d8512aab9e4ff78f98d44a4644fc819a"}, ] [package.dependencies] cffi = ">=1.0.0" [package.extras] -opentracing = ["jaeger-client", "opentracing"] +opentracing = ["jaeger_client", "opentracing"] [[package]] name = "requests" @@ -729,4 +729,4 @@ zstd = ["zstandard (>=0.18.0)"] [metadata] lock-version = "2.1" python-versions = "^3.9" -content-hash = "6855640542dddf03775cf0ecc647aa2e277b471618471e31a382012117ea76ce" +content-hash = "46cb7621d2b4b109705a4bbfc39c5c06150581644a4d3a480f59f41abd3e928c" diff --git a/pyproject.toml b/pyproject.toml index 3479f1a..16de4ec 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" [tool.poetry.dependencies] python = "^3.9" -python-qpid-proton = "^0.39.0" +python-qpid-proton = "^0.40.0" typing-extensions = "^4.13.0" packaging = "^23.0" @@ -21,7 +21,7 @@ isort = "^5.9.3" mypy = "^0.910" pytest = "^8.3.4" black = "^24.3.0" -python-qpid-proton = "^0.39.0" +python-qpid-proton = "^0.40.0" requests = "^2.31.0" pytest-asyncio = "^1.2.0" diff --git a/rabbitmq_amqp_python_client/__init__.py b/rabbitmq_amqp_python_client/__init__.py index 0ec9cb4..6c1c30e 100644 --- a/rabbitmq_amqp_python_client/__init__.py +++ b/rabbitmq_amqp_python_client/__init__.py @@ -14,6 +14,7 @@ from .consumer import Consumer from .entities import ( ConsumerOptions, + DirectReplyToConsumerOptions, ExchangeCustomSpecification, ExchangeSpecification, ExchangeToExchangeBindingSpecification, @@ -72,6 +73,7 @@ "QuorumQueueSpecification", "ClassicQueueSpecification", "StreamSpecification", + "DirectReplyToConsumerOptions", "ExchangeToQueueBindingSpecification", "ExchangeToExchangeBindingSpecification", "QueueType", diff --git a/rabbitmq_amqp_python_client/connection.py b/rabbitmq_amqp_python_client/connection.py index e5f347d..7fbdbee 100644 --- a/rabbitmq_amqp_python_client/connection.py +++ b/rabbitmq_amqp_python_client/connection.py @@ -379,7 +379,7 @@ def publisher(self, destination: str = "") -> Publisher: def consumer( self, - destination: str, + destination: Optional[str] = None, message_handler: Optional[MessagingHandler] = None, consumer_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, @@ -388,7 +388,7 @@ def consumer( Create a new consumer instance. Args: - destination: The address to consume from + destination: Optional The address to consume from message_handler: Optional handler for processing messages consumer_options: Optional configuration for queue consumption. Each queue has its own consumer options. credit: Optional credit value for flow control @@ -397,9 +397,11 @@ def consumer( Consumer: A new consumer instance Raises: - ArgumentOutOfRangeException: If destination address format is invalid + ArgumentOutOfRangeException: If destination address format is invalid. + Only applies if not using Direct Reply-to. + The server will provide the queue name in that case. """ - if not validate_address(destination): + if destination is not None and not validate_address(destination): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) @@ -438,9 +440,7 @@ def _on_disconnection(self) -> None: time.sleep(delay.total_seconds()) try: - self._open_connections(reconnect_handlers=True) - self._connections.append(self) except ConnectionException as e: diff --git a/rabbitmq_amqp_python_client/consumer.py b/rabbitmq_amqp_python_client/consumer.py index 3efd8f3..f63dd2b 100644 --- a/rabbitmq_amqp_python_client/consumer.py +++ b/rabbitmq_amqp_python_client/consumer.py @@ -2,7 +2,11 @@ from typing import Literal, Optional, Union, cast from .amqp_consumer_handler import AMQPMessagingHandler -from .entities import ConsumerOptions +from .entities import ( + ConsumerOptions, + DirectReplyToConsumerOptions, + StreamConsumerOptions, +) from .options import ( ReceiverOptionUnsettled, ReceiverOptionUnsettledWithFilters, @@ -29,16 +33,16 @@ class Consumer: _conn (BlockingConnection): The underlying connection to RabbitMQ _addr (str): The address to consume from _handler (Optional[MessagingHandler]): Optional message handling callback - _stream_options (Optional[StreamConsumerOptions]): Configuration for stream consumption + _consumer_options (Optional[StreamConsumerOptions]): Configuration for stream consumption _credit (Optional[int]): Flow control credit value """ def __init__( self, conn: BlockingConnection, - addr: str, + addr: Optional[str] = None, handler: Optional[AMQPMessagingHandler] = None, - stream_options: Optional[ConsumerOptions] = None, + consumer_options: Optional[ConsumerOptions] = None, credit: Optional[int] = None, ): """ @@ -48,14 +52,14 @@ def __init__( conn: The blocking connection to use for consuming addr: The address to consume from handler: Optional message handler for processing received messages - stream_options: Optional configuration for stream-based consumption + consumer_options: Optional configuration for stream-based consumption credit: Optional credit value for flow control """ self._receiver: Optional[BlockingReceiver] = None self._conn = conn self._addr = addr self._handler = handler - self._stream_options = stream_options + self._consumer_options = consumer_options self._credit = credit self._consumers: list[Consumer] = [] self._open() @@ -66,21 +70,25 @@ def _open(self) -> None: self._receiver = self._create_receiver(self._addr) def _update_connection(self, conn: BlockingConnection) -> None: + addr = "" + if self._addr is not None: + addr = self._addr + self._conn = conn - if self._stream_options is None: + if self._consumer_options is None: logger.debug("creating new receiver without stream") self._receiver = self._conn.create_receiver( - self._addr, - options=ReceiverOptionUnsettled(self._addr), + addr, + options=ReceiverOptionUnsettled(addr), handler=self._handler, ) else: logger.debug("creating new stream receiver") - self._stream_options.offset(self._handler.offset - 1) # type: ignore + self._consumer_options.offset(self._handler.offset - 1) # type: ignore self._receiver = self._conn.create_receiver( - self._addr, + addr, options=ReceiverOptionUnsettledWithFilters( - self._addr, self._stream_options + addr, self._consumer_options ), handler=self._handler, ) @@ -142,29 +150,54 @@ def stop(self) -> None: self._receiver.container.stop_events() self._receiver.container.stop() - def _create_receiver(self, addr: str) -> BlockingReceiver: - logger.debug("Creating the receiver") - if self._stream_options is None: - receiver = self._conn.create_receiver( - addr, options=ReceiverOptionUnsettled(addr), handler=self._handler - ) + def _create_receiver(self, addr: Optional[str] = None) -> BlockingReceiver: + credit = 10 + if self._credit is not None: + credit = self._credit + if self._consumer_options is not None: + logger.debug( + "Creating the receiver, with options: %s", + type(self._consumer_options).__name__, + ) else: - receiver = self._conn.create_receiver( + logger.debug("Creating the receiver, without options") + + if self._consumer_options is None: + return self._conn.create_receiver( addr, - options=ReceiverOptionUnsettledWithFilters(addr, self._stream_options), + options=ReceiverOptionUnsettled(addr), handler=self._handler, + credit=credit, ) - if self._credit is not None: - receiver.credit = self._credit + if isinstance(self._consumer_options, DirectReplyToConsumerOptions): + logger.debug("Creating dynamic receiver for direct reply-to") + dynamic_receiver = self._conn.create_dynamic_receiver( + credit, handler=self._handler + ) + dynamic_receiver.credit = credit + return dynamic_receiver - return receiver + if isinstance(self._consumer_options, StreamConsumerOptions): + return self._conn.create_receiver( + addr, + options=ReceiverOptionUnsettledWithFilters( + addr, self._consumer_options + ), + handler=self._handler, + ) + + raise Exception( + "Receiver is not initialized. No valid consumer options provided." + ) @property - def address(self) -> str: - """Get the current publisher address.""" - return self._addr + def address(self) -> Optional[str]: + if self._receiver is not None: + return cast(Optional[str], self._receiver.link.remote_source.address) + else: + raise Exception("Receiver is not initialized") @property def handler(self) -> Optional[AMQPMessagingHandler]: diff --git a/rabbitmq_amqp_python_client/entities.py b/rabbitmq_amqp_python_client/entities.py index 8b82e4f..d5f8632 100644 --- a/rabbitmq_amqp_python_client/entities.py +++ b/rabbitmq_amqp_python_client/entities.py @@ -160,6 +160,9 @@ def validate(self, versions: Dict[str, bool]) -> None: def filter_set(self) -> Dict[symbol, Described]: raise NotImplementedError("Subclasses should implement this method") + def direct_reply_to(self) -> bool: + return False + @dataclass class MessageProperties: @@ -400,6 +403,21 @@ def validate(self, versions: Dict[str, bool]) -> None: ) +class DirectReplyToConsumerOptions(ConsumerOptions): + + def validate(self, versions: Dict[str, bool]) -> None: + if not versions.get("4.2.0", False): + raise ValidationCodeException( + "Direct Reply-To requires RabbitMQ 4.2.0 or higher" + ) + + def filter_set(self) -> Dict[symbol, Described]: + return {} + + def direct_reply_to(self) -> bool: + return True + + @dataclass class RecoveryConfiguration: """ diff --git a/rabbitmq_amqp_python_client/options.py b/rabbitmq_amqp_python_client/options.py index 9d49ebe..a8a2134 100644 --- a/rabbitmq_amqp_python_client/options.py +++ b/rabbitmq_amqp_python_client/options.py @@ -1,9 +1,15 @@ +from typing import Optional + from .entities import ConsumerOptions from .qpid.proton._data import ( # noqa: E402 + Data, PropertyDict, symbol, ) -from .qpid.proton._endpoints import Link # noqa: E402 +from .qpid.proton._endpoints import ( # noqa: E402 + Link, + Terminus, +) from .qpid.proton.reactor import ( # noqa: E402 Filter, LinkOption, @@ -52,8 +58,23 @@ def apply(self, link: Link) -> None: link.source.dynamic = False +class DynamicReceiverOption(LinkOption): # type: ignore + + def apply(self, link: Link) -> None: + link.snd_settle_mode = Link.SND_SETTLED + # link.rcv_settle_mode = Link.RCV_FIRST + link.source.expiry_policy = Terminus.EXPIRE_WITH_LINK + link.properties = PropertyDict({symbol("paired"): True}) + link.source.dynamic = True + data = link.source.capabilities + data.put_array(False, Data.SYMBOL) + data.enter() + data.put_string("rabbitmq:volatile-queue") + data.exit() + + class ReceiverOptionUnsettled(LinkOption): # type: ignore - def __init__(self, addr: str): + def __init__(self, addr: Optional[str]): self._addr = addr def apply(self, link: Link) -> None: @@ -68,7 +89,7 @@ def test(self, link: Link) -> bool: class ReceiverOptionUnsettledWithFilters(Filter): # type: ignore - def __init__(self, addr: str, consumer_options: ConsumerOptions): + def __init__(self, addr: Optional[str], consumer_options: ConsumerOptions): super().__init__(consumer_options.filter_set()) self._addr = addr diff --git a/rabbitmq_amqp_python_client/publisher.py b/rabbitmq_amqp_python_client/publisher.py index ccb4486..455d9ed 100644 --- a/rabbitmq_amqp_python_client/publisher.py +++ b/rabbitmq_amqp_python_client/publisher.py @@ -94,7 +94,7 @@ def publish(self, message: Message) -> Delivery: return self._sender.send(message) else: if message.address != "": - if validate_address(message.address) is False: + if not validate_address(message.address): raise ArgumentOutOfRangeException( "destination address must start with /queues or /exchanges" ) diff --git a/rabbitmq_amqp_python_client/qpid/proton/_utils.py b/rabbitmq_amqp_python_client/qpid/proton/_utils.py index 25435a7..b1f1089 100644 --- a/rabbitmq_amqp_python_client/qpid/proton/_utils.py +++ b/rabbitmq_amqp_python_client/qpid/proton/_utils.py @@ -44,6 +44,7 @@ ) from ._reactor import Container from ._url import Url +from ...options import DynamicReceiverOption try: from typing import Literal @@ -505,6 +506,17 @@ def create_sender( ), ) + def create_dynamic_receiver( + self, credit: Optional[int] = None, handler: Optional[Handler] = None + ): + return self.create_receiver( + credit=credit, + dynamic=True, + options=DynamicReceiverOption(), + handler=handler, + name="dynamic-receiver_" + str(id(self)), + ) + def create_receiver( self, address: Optional[str] = None, @@ -514,6 +526,8 @@ def create_receiver( name: Optional[str] = None, options: Optional[ Union[ + "DynamicReceiverOption", + List["DynamicReceiverOption"], "ReceiverOption", List["ReceiverOption"], "LinkOption", diff --git a/tests/direct_reply_to/__init__.py b/tests/direct_reply_to/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/direct_reply_to/test_direct_reply.py b/tests/direct_reply_to/test_direct_reply.py new file mode 100644 index 0000000..a92e8be --- /dev/null +++ b/tests/direct_reply_to/test_direct_reply.py @@ -0,0 +1,57 @@ +from rabbitmq_amqp_python_client import ( + Connection, + Converter, + DirectReplyToConsumerOptions, + Environment, + OutcomeState, +) +from rabbitmq_amqp_python_client.qpid.proton import Message + + +def test_consumer_create_reply_name(connection: Connection) -> None: + consumer = connection.consumer(consumer_options=DirectReplyToConsumerOptions()) + assert "/queues/amq.rabbitmq.reply-to." in consumer.address + + +def create_connection(environment: Environment) -> Connection: + connection = environment.connection() + connection.dial() + return connection + + +def test_direct_reply_to_send_and_receive(environment: Environment) -> None: + """Test that messages can be published to and consumed from a direct reply-to queue.""" + messages_to_send = 100 + + # Create a consumer using DirectReplyToConsumerOptions + consumer = create_connection(environment).consumer( + credit=100, consumer_options=DirectReplyToConsumerOptions() + ) + + # Get the queue address from the consumer + addr = consumer.address + assert addr is not None + assert "/queues/amq.rabbitmq.reply-to." in addr + + # Create a new connection and publisher to publish to the reply-to address + publisher = create_connection(environment).publisher(addr) + + # Publish messages to the direct reply-to queue + for i in range(messages_to_send): + msg = Message(body=Converter.string_to_bytes("test message {}".format(i))) + status = publisher.publish(msg) + assert status.remote_state == OutcomeState.ACCEPTED + + # Consume messages synchronously + consumed = 0 + for i in range(messages_to_send): + message = consumer.consume() + if Converter.bytes_to_string(message.body) == "test message {}".format(i): + consumed = consumed + 1 + + # Clean up + publisher.close() + consumer.close() + + # Verify all messages were received + assert consumed == messages_to_send