[FEATURE] Add queue wrapper +semver: minor#545
Conversation
Reviewer's GuideIntroduces a new AMQP Queue abstraction with retry and dead-letter support, wires in the php-amqplib dependency, documents the feature, and adds both unit and integration tests plus a RabbitMQ service for local testing. Sequence diagram for Queue.publish with optional DLX topologysequenceDiagram
actor Producer
participant Queue
participant AMQPStreamConnection
participant AMQPChannel
participant RabbitMQ
Producer->>Queue: publish(queueName, message, withDlx)
Queue->>Queue: getPublishConnection()
Queue->>Queue: getServers()
Queue->>Queue: shuffle(servers)
Queue->>AMQPStreamConnection: create_connection(shuffledServers, options)
AMQPStreamConnection-->>Queue: connection
Queue->>AMQPStreamConnection: channel()
AMQPStreamConnection-->>Queue: AMQPChannel
alt withDlx is true
Queue->>AMQPChannel: declareQueueWithDLX(queueName)
AMQPChannel->>RabbitMQ: queue_declare(main, retry, failed)
else withDlx is false
Queue->>AMQPChannel: declareQueueWithoutDLX(queueName)
AMQPChannel->>RabbitMQ: queue_declare(main)
end
Queue->>AMQPMessage: __construct(message, properties)
Queue->>AMQPChannel: basic_publish(msg, "", queueName)
AMQPChannel->>RabbitMQ: publish message
Queue->>AMQPChannel: close()
Queue->>AMQPStreamConnection: close()
Queue-->>Producer: return
Sequence diagram for Queue.consume with retry and DLXsequenceDiagram
actor Consumer
participant Queue
participant AMQPStreamConnection
participant AMQPChannel
participant RabbitMQ
participant Callback
Consumer->>Queue: consume(timeout, queueName, callback, withDlx, resetTimeoutOnReceive, qosCount)
Queue->>Queue: getServers()
loop for each server
Queue->>AMQPStreamConnection: create_connection([server], options)
alt connection fails
AMQPStreamConnection-->>Queue: throws QueueException
Queue->>Queue: try next server
else connection ok
AMQPStreamConnection-->>Queue: connection
Queue->>AMQPStreamConnection: channel()
AMQPStreamConnection-->>Queue: AMQPChannel
alt withDlx is true
Queue->>AMQPChannel: declareQueueWithDLX(queueName)
AMQPChannel->>RabbitMQ: queue_declare(main, retry, failed)
else withDlx is false
Queue->>AMQPChannel: declareQueueWithoutDLX(queueName)
AMQPChannel->>RabbitMQ: queue_declare(main)
end
Queue->>AMQPChannel: basic_qos(null, qosCount, null)
Queue->>AMQPChannel: basic_consume(queueName, callbackWrapper)
loop while is_consuming and before timeout
AMQPChannel->>RabbitMQ: wait for message
RabbitMQ-->>AMQPChannel: deliver AMQPMessage
AMQPChannel-->>Queue: invoke callbackWrapper(msg)
alt resetTimeoutOnReceive is true
Queue->>Queue: reset startTime
end
Queue->>Callback: callback(msg)
alt callback returns false or throws
Queue->>Queue: handleRetry(channel, msg, queueName)
Queue->>AMQPMessage: ack()
Queue->>AMQPMessage: __construct(body, propertiesWithRetryCount)
Queue->>AMQPChannel: basic_publish(msg, "", retryOrFailedQueue)
AMQPChannel->>RabbitMQ: route to retry or failed queue
else callback succeeds
Queue->>AMQPMessage: ack()
end
Queue->>Queue: check timeout
end
Queue->>AMQPChannel: close()
Queue->>AMQPStreamConnection: close()
end
end
Queue-->>Consumer: return
Class diagram for new AMQP Queue abstractionclassDiagram
direction LR
class IQueue {
<<interface>>
+publish(queueName, message, withDlx)
+consume(timeout, queueName, callback, withDlx, resetTimeoutOnReceive, qosCount)
}
class Queue {
-string[] connectionStrings
-int[] retryDelaysMs
-const DEFAULT_PORT
-const DEFAULT_QOS_COUNT
-const CONNECTION_TIMEOUT
-const RETRY_COUNT_HEADER
-const DEFAULT_RETRY_DELAYS_MS
+__construct(connectionStrings, retryDelaysMs)
-parseServer(connectionString)
-getServers()
-createConnection(servers)
-getPublishConnection()
-declareQueueWithoutDLX(channel, queueName)
-declareQueueWithDLX(channel, queueName)
-getRetryCount(msg)
-handleRetry(channel, msg, queueName)
+publish(queueName, message, withDlx)
+consume(timeout, queueName, callback, withDlx, resetTimeoutOnReceive, qosCount)
}
class QueueException {
-string operation
+__construct(message, operation, code, previous)
+getOperation()
}
class AMQPStreamConnection {
+create_connection(servers, options)
+channel()
+close()
}
class AMQPChannel {
+queue_declare(queueName, passive, durable, exclusive, autoDelete, nowait, arguments)
+basic_publish(msg, exchange, routingKey)
+basic_qos(prefetchSize, prefetchCount, global)
+basic_consume(queue, consumerTag, noLocal, noAck, exclusive, nowait, callback)
+is_consuming()
+wait(callback, nonBlocking)
+close()
}
class AMQPMessage {
+string body
+const DELIVERY_MODE_PERSISTENT
+__construct(body, properties)
+ack()
+get(name)
}
class AMQPTable {
+__construct(data)
+getNativeData()
}
Queue ..|> IQueue
QueueException --|> Exception
Queue --> QueueException
Queue --> AMQPStreamConnection
Queue --> AMQPChannel
Queue --> AMQPMessage
Queue --> AMQPTable
AMQPStreamConnection --> AMQPChannel
AMQPChannel --> AMQPMessage
AMQPMessage --> AMQPTable
File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 20 minutes and 21 seconds. β How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. π¦ How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. βΉοΈ Review infoβοΈ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: π Files selected for processing (2)
WalkthroughAdds a new AMQP Queue feature: interface, implementation, custom exception, docs, tests, composer dependency on php-amqplib, and a RabbitMQ service in docker-compose; enables DLX-based retry topology with exponential backoff and multi-server handling. (β€50 words) Changes
Sequence Diagram(s)sequenceDiagram
participant Publisher as Publisher
participant Queue as Queue Wrapper
participant Broker as RabbitMQ
participant Consumer as Consumer Callback
Publisher->>Queue: publish(queueName, message, withDlx=true)
Queue->>Broker: connect & declare topology (main, retry queues, failed)
Queue->>Broker: publish persistent JSON (x-pancake-retry-count: 0)
Queue->>Broker: close connection
Consumer->>Queue: consume(timeout, queueName, callback, withDlx=true)
Queue->>Broker: connect & declare topology
Queue->>Broker: set QoS prefetch
Broker-->>Queue: deliver message (headers + body)
Queue->>Consumer: invoke callback(message)
alt callback returns false or throws
Consumer-->>Queue: false / Exception
Queue->>Queue: handleRetry() read/increment x-pancake-retry-count
Queue->>Broker: ack original message
alt retry count < limit
Queue->>Broker: publish to queue-retry-N (incremented header)
else retries exhausted
Queue->>Broker: publish to queue-failed
end
else callback succeeds
Consumer-->>Queue: success
Queue->>Broker: ack message
end
Queue->>Broker: close connection
Estimated Code Review Effortπ― 4 (Complex) | β±οΈ ~60 minutes Possibly Related Issues
Poem
π₯ Pre-merge checks | β 2 | β 1β Failed checks (1 warning)
β Passed checks (2 passed)
βοΈ Tip: You can configure your own custom pre-merge checks in the settings. β¨ Finishing Touchesπ§ͺ Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
Failed to generate code suggestions for PR |
|
|
Overall GradeΒ Β |
SecurityΒ Β ReliabilityΒ Β ComplexityΒ Β HygieneΒ Β |
Code Review Summary
| Analyzer | Status | Updated (UTC) | Details |
|---|---|---|---|
| Code coverage | Apr 14, 2026 10:16p.m. | ReviewΒ β | |
| SQL | Apr 14, 2026 10:16p.m. | ReviewΒ β | |
| Secrets | Apr 14, 2026 10:16p.m. | ReviewΒ β | |
| PHP | Apr 14, 2026 10:16p.m. | ReviewΒ β | |
| Docker | Apr 14, 2026 10:16p.m. | ReviewΒ β |
Important
AI Review is run only on demand for your team. We're only showing results of static analysis review right now. To trigger AI Review, comment @deepsourcebot review on this thread.
There was a problem hiding this comment.
Hey - I've found 5 issues, and left some high level feedback:
- In
Queue::consume, the timeout is measured per-server rather than across the whole consume call, so when multiple servers are configured the overall runtime can be up totimeout * serverCount; consider enforcing a single global timeout if the intention is to cap total processing time. - Constructor and public methods on
Queue/IQueueaccept untyped parameters (e.g.$connectionStrings,$retryDelaysMs,$timeout), which can easily mask invalid usage; adding scalar/array type hints would make the API safer and align better with the documented types.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `Queue::consume`, the timeout is measured per-server rather than across the whole consume call, so when multiple servers are configured the overall runtime can be up to `timeout * serverCount`; consider enforcing a single global timeout if the intention is to cap total processing time.
- Constructor and public methods on `Queue`/`IQueue` accept untyped parameters (e.g. `$connectionStrings`, `$retryDelaysMs`, `$timeout`), which can easily mask invalid usage; adding scalar/array type hints would make the API safer and align better with the documented types.
## Individual Comments
### Comment 1
<location path="src/Queue/Queue.php" line_range="297-306" />
<code_context>
+ *
+ * @throws QueueException On connection or publish failure.
+ */
+ public function publish($queueName, $message, $withDlx = true)
+ {
+ $connection = $this->getPublishConnection();
+ $channel = $connection->channel();
+
+ if ($withDlx) {
+ $this->declareQueueWithDLX($channel, $queueName);
+ } else {
+ $this->declareQueueWithoutDLX($channel, $queueName);
+ }
+
+ $msg = new AMQPMessage($message, [
+ 'content_type' => 'application/json',
+ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+ ]);
+
+ $channel->basic_publish($msg, '', $queueName);
+
+ $channel->close();
+ $connection->close();
+ }
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Publish does not wrap channel-level exceptions into QueueException as documented.
Since the docblock promises `QueueException` on publish failure, any errors after establishing the connection (`channel()`, `queue_declare`, `basic_publish`, `close`) should also be wrapped. Please wrap the publish flow in a try/catch and rethrow as `QueueException` (e.g., with operation `'publish'`), mirroring `createConnection`, so callers reliably get the documented exception type.
</issue_to_address>
### Comment 2
<location path="src/Queue/Queue.php" line_range="350-300" />
<code_context>
+ ) {
+ $servers = $this->getServers();
+
+ foreach ($servers as $server) {
+ try {
+ $connection = $this->createConnection([$server]);
+ } catch (QueueException $e) {
+ continue; // Server unreachable β try the next one.
+ }
+
+ $channel = $connection->channel();
+
+ if ($withDlx) {
</code_context>
<issue_to_address>
**question (bug_risk):** Timeout semantics are per-server, not global, which may surprise callers.
The `consume()` docblock implies a single overall `$timeout`, but the timer is effectively reset for each server in the `foreach`. If you intend a global timeout across all servers, consider capturing a single `$globalStartTime` before the loop and computing remaining time per server. Otherwise, clarify in the docs (or enforce in code) that the timeout is per server to avoid much longer runtimes when multiple servers are configured.
</issue_to_address>
### Comment 3
<location path="src/Queue/Queue.php" line_range="395-398" />
<code_context>
+ $channel->basic_qos(null, $qosCount, null);
+ $channel->basic_consume($queueName, '', false, false, false, false, $fn);
+
+ while ($channel->is_consuming()) {
+ $channel->wait(null, true);
+
+ if ($startTime + $timeout < time()) {
+ break;
+ }
</code_context>
<issue_to_address>
**issue (bug_risk):** Timeout is not enforced when no messages arrive because `wait()` is called without a timeout.
Because `wait(null, true)` can block indefinitely when there are no messages or heartbeats, the timeout condition may never be checked and this method may never return. To ensure `$timeout` is respected even when idle, pass a finite timeout to `wait()` (e.g. remaining time or a small fixed interval) and handle `AMQPTimeoutException` so the loop can exit cleanly.
</issue_to_address>
### Comment 4
<location path="src/Queue/Queue.php" line_range="259-268" />
<code_context>
+ * @param AMQPMessage $msg The message that failed processing.
+ * @param string $queueName Base queue name (used to derive retry/failed queue names).
+ */
+ private function handleRetry($channel, $msg, $queueName)
+ {
+ $retryCount = $this->getRetryCount($msg);
+ $maxRetries = count($this->retryDelaysMs);
+
+ $msg->ack();
+
+ $targetQueue = $retryCount < $maxRetries
+ ? $queueName . '-retry-' . ($retryCount + 1)
+ : $queueName . '-failed';
+
+ $newMsg = new AMQPMessage(
+ $msg->body,
+ [
+ 'content_type' => 'application/json',
+ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
+ 'application_headers' => new AMQPTable([
+ self::RETRY_COUNT_HEADER => $retryCount + 1,
+ ]),
+ ]
+ );
+
+ $channel->basic_publish($newMsg, '', $targetQueue);
+ }
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Acknowledging before publishing to the retry/failed queue can cause message loss on publish failures.
Because `ack()` happens before `basic_publish()`, any exception during `basic_publish()` (e.g. connection issues) will permanently drop the message. Consider publishing first and only acking on success, or catching failures from `basic_publish()` and requeueing or otherwise persisting the original message for later recovery, depending on your loss/duplication tolerance.
Suggested implementation:
```
* Handles a failed message by routing it to the appropriate retry queue or
* the failed queue when all retry levels are exhausted.
*
* The original message is acknowledged only after the new message is
* successfully published, to avoid message loss on publish failures.
*
* @param AMQPChannel $channel Active AMQP channel.
* @param AMQPMessage $msg The message that failed processing.
* @param string $queueName Base queue name (used to derive retry/failed queue names).
*/
```
```
private function handleRetry($channel, $msg, $queueName)
{
$retryCount = $this->getRetryCount($msg);
$maxRetries = count($this->retryDelaysMs);
$targetQueue = $retryCount < $maxRetries
? $queueName . '-retry-' . ($retryCount + 1)
: $queueName . '-failed';
$newMsg = new AMQPMessage(
$msg->body,
[
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
'application_headers' => new AMQPTable([
self::RETRY_COUNT_HEADER => $retryCount + 1,
]),
]
);
try {
$channel->basic_publish($newMsg, '', $targetQueue);
$msg->ack();
} catch (\Throwable $e) {
// Requeue the original message so it is not lost if publish fails.
if (method_exists($msg, 'get')) {
$deliveryTag = $msg->get('delivery_tag');
$channel->basic_nack($deliveryTag, false, true);
}
throw $e;
}
}
```
</issue_to_address>
### Comment 5
<location path="docs/user-guide/queue.md" line_range="3-6" />
<code_context>
+# Queue
+
+## Table of content
+
+- [Queue](#queue)
+ - [Table of content](#table-of-content)
+ - [About](#about)
+ - [Requirements](#requirements)
</code_context>
<issue_to_address>
**suggestion (typo):** Use "Table of contents" instead of "Table of content" in the heading and link.
Also update the link text and its anchor to match the corrected heading.
```suggestion
## Table of contents
- [Queue](#queue)
- [Table of contents](#table-of-contents)
```
</issue_to_address>Help me be more useful! Please click π or π on each comment and I'll use the feedback to improve your reviews.
Co-authored-by: sourcery-ai[bot] <58596630+sourcery-ai[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Actionable comments posted: 9
π€ Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docker-compose.yml`:
- Around line 49-51: The docker-compose RabbitMQ environment uses
RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS which differ from the
integration test expectations (RABBITMQ_USER and RABBITMQ_PASS); update the
environment keys in docker-compose.yml to match the test variables (use
RABBITMQ_USER and RABBITMQ_PASS) or set both pairs so tests and runtime read the
same creds, ensuring you change the entries that reference
RABBITMQ_DEFAULT_USER/RABBITMQ_DEFAULT_PASS to the matching
RABBITMQ_USER/RABBITMQ_PASS names (or add duplicates) so authentication remains
consistent with the tests.
In `@src/Queue/Queue.php`:
- Around line 352-357: The consume() loop over $servers swallows all
QueueException errors from createConnection() and returns normally when every
server fails, making an all-servers-down situation look like βno messagesβ;
update the logic in consume() (the foreach that calls
createConnection([$server])) to detect when no connection could be established
and throw a QueueException instead of silently continuingβcollect or track
failures while iterating, and after the loop throw a new QueueException (or
rethrow an aggregated/last exception) indicating connection/consume failure;
apply the same change to the other identical block around the createConnection
calls at the 405-407 area so callers receive an error per the IQueue contract.
- Around line 352-357: In consume() the overall timeout is broken because
$startTime is reset inside the foreach ($servers as $server) loop and wait() is
used in non-blocking mode without catching
AMQPTimeoutException/AMQPRuntimeException; fix by moving the $startTime
initialization outside the servers loop (so the total elapsed time across
servers is measured), compute remaining timeout per-server before calling
$channel->wait(null, true), and wrap the $channel->wait(null, true) call in a
try/catch that handles AMQPTimeoutException and AMQPRuntimeException (and any
socket-related exceptions) to break/continue gracefully and respect the overall
timeout; ensure createConnection([$server]) behavior remains unchanged and that
consume() returns or stops when the overall timeout is exceeded.
- Around line 381-390: The try/catch currently always calls
handleRetry($channel, $msg, $queueName) which republishes to -retry and -failed
queues even when DLX is disabled, causing silent drops; update the failure path
in the block around the callback invocation (the code that calls handleRetry and
$msg->ack()) to check the DLX flag (e.g. $this->withDlx or the local $withDlx)
and only call handleRetry when DLX is enabled; when DLX is disabled, instead
nack/reject the message using the channel/message API to either requeue
(basic_nack/basic_reject with requeue=true) or reject without publishing to
non-existent queues (choose requeue behavior consistent with your retry policy),
and ensure the same change is applied in both the result===false branch and the
catch (\Throwable $e) branch so failed messages are not published to undeclared
-retry/-failed queues.
- Around line 91-107: The vhost handling must decode percent-encodings and
normalize double-slash cases: after parse_url($connectionString) compute the raw
path (e.g. $rawPath = $url['path'] ?? '') then derive $vhostCandidate =
rawurldecode(ltrim($rawPath, '/')); if $vhostCandidate === '' set 'vhost' => '/'
else set 'vhost' => $vhostCandidate; keep the rest of the returned array (host,
port using self::DEFAULT_PORT, user, password) unchanged so amqp://host//,
amqp://host/, amqp://host and amqp://host/%2F all resolve correctly.
- Around line 266-283: Wrap the ack+publish in an AMQP transaction so both
succeed or both fail: call $channel->tx_select() before performing $msg->ack(),
then publish the new AMQPMessage (constructed with self::RETRY_COUNT_HEADER,
$msg->body, delivery options) to $targetQueue via $channel->basic_publish(), and
finally call $channel->tx_commit(); on any exception, call
$channel->tx_rollback() and rethrow or handle the error (e.g., do not ack or
requeue) so you avoid losing the message when $channel->basic_publish() fails.
In `@tests/Integration/QueueTest.php`:
- Around line 257-263: The test
testPublishThrowsQueueExceptionWhenAllServersDown uses a loopback IP (127.0.0.2)
that can be reachable and makes the failure-path flaky; update the Queue
instantiation in that test to use a guaranteed-invalid endpoint (e.g., an
.invalid hostname) instead of 'amqp://guest:guest@127.0.0.2:5672/' so the call
to Queue->publish('irrelevant', '{}', false) reliably fails and triggers the
expected QueueException.
- Around line 139-147: The tests are redeclaring retry queues without DLX which
causes RabbitMQ PRECONDITION_FAILED; update the consume calls in tests (the
Queue::consume(...) invocation that currently passes false) to pass true for the
last parameter when the queue name matches the retry pattern (e.g., contains
'-retry-') or otherwise make the $withDlx argument conditional so retry queues
are declared with declareQueueWithDLX() instead of declareQueueWithoutDLX();
reference Queue::consume, declareQueueWithDLX and declareQueueWithoutDLX to
locate and change the call sites (also apply the same fix for the other
occurrence around lines 213-221).
- Around line 38-45: The DSN composition uses the raw $vhost which causes a
double-slash and mis-parsing for the default '/' vhost; encode the vhost before
injecting it into self::$dsn (e.g. use rawurlencode on the value returned by
getenv('RABBITMQ_VHOST') or on $vhost) and then build self::$dsn with the
encoded vhost so parse_url() and Queue::... logic receive a correctly encoded
path.
πͺ Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
βΉοΈ Review info
βοΈ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2ad22aeb-ef63-466b-86a6-9ea651b2a26f
π Files selected for processing (11)
composer.jsondocker-compose.ymldocs/changelog.mddocs/user-guide/queue.mdmkdocs.ymlsrc/Queue/IQueue.phpsrc/Queue/Queue.phpsrc/Queue/QueueException.phptests/Integration/QueueTest.phptests/Unit/Queue/QueueExceptionTest.phptests/Unit/Queue/QueueTest.php
|
@gstraccini composer update lock |
|
Updating |
|
Workflow update-composer-lock.yml failed: β |
|
@gstraccini composer update lock |
|
Updating |
|
Workflow update-composer-lock.yml failed: β |
|
@gstraccini composer update lock |
|
Updating |
|
β Update composer lock result: |
|
@gstraccini composer update lock |
|
Updating |
|
β Update composer lock result: |
|
Review the following changes in direct dependencies. Learn more about Socket for GitHub.
|
|
Infisical secrets check: β No secrets leaked! π» Scan logs2026-04-14T22:15:49Z INF scanning for exposed secrets...
10:15PM INF 445 commits scanned.
2026-04-14T22:15:50Z INF scan completed in 549ms
2026-04-14T22:15:50Z INF no leaks found
|
π Description
Add queue wrapper
β Checks
β’οΈ Does this introduce a breaking change?
Summary by Sourcery
Introduce an AMQP-based Queue abstraction with retry and dead-letter support, along with corresponding tests, documentation, and infrastructure updates.
New Features:
Build:
Documentation:
Tests:
Summary by CodeRabbit
New Features
Documentation
Platform
Tests