Skip to content

Conversation

@oliverlambson
Copy link
Contributor

@oliverlambson oliverlambson commented Nov 15, 2025

Stacked on #788

Closes #784

Implements support for ADR-42


This is mostly a port of the implementation in nats.go – but adapted to try be in-line with the way things are done in nats.py.

@oliverlambson oliverlambson marked this pull request as draft November 15, 2025 16:04
@oliverlambson oliverlambson marked this pull request as ready for review November 15, 2025 16:17
@oliverlambson
Copy link
Contributor Author

oliverlambson commented Nov 15, 2025

As far as I can tell, the failing tests are unrelated to this PR and nats.py main will also fail those tests.

The current nats-server@main behaviour seems to be different to the 2.9-2.12 behaviour for StreamConfig: values like allow_direct are expected to be False but are coming back as None

#788 will fix this

@oliverlambson
Copy link
Contributor Author

@wallyqs @caspervonb any thoughts on this?

@oliverlambson oliverlambson force-pushed the js-consumer-groups branch 3 times, most recently from e07767b to 33960e4 Compare November 21, 2025 12:10
@oliverlambson oliverlambson changed the title Add jetstream consumer priority groups Add jetstream consumer priority groups (ADR-42) Nov 21, 2025
@caspervonb
Copy link
Collaborator

Sketched out an implementation as-well, will review shortly.

@oliverlambson
Copy link
Contributor Author

@caspervonb great👍 not wedded to any particular implementation, I’m just keen for the feature so I can do better event sourcing

@caspervonb caspervonb requested a review from Copilot November 27, 2025 11:31
Copilot finished reviewing on behalf of caspervonb November 27, 2025 11:34
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR implements support for JetStream consumer priority groups (ADR-42), enabling flexible failover and priority management for multiple clients pulling from the same consumer. The implementation is ported from nats.go and adapted to fit nats.py conventions.

Key Changes

  • Adds three priority policies (PINNED, OVERFLOW, PRIORITIZED) for managing consumer message delivery
  • Implements pin ID tracking to support pinned consumer behavior
  • Extends fetch API with min_pending, min_ack_pending, and priority parameters for overflow and prioritized policies
  • Adds unpin_consumer API for manual unpinning of pinned consumers

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
nats/src/nats/js/api.py Adds PriorityPolicy enum, PriorityGroupState dataclass, PIN_ID_MISMATCH status code, and extends ConsumerConfig with priority fields
nats/src/nats/js/errors.py Adds PinIdMismatchError exception for handling 423 errors
nats/src/nats/js/client.py Implements priority_group parameter, pin_id tracking, and extends fetch() with priority-related parameters
nats/src/nats/js/manager.py Adds unpin_consumer method for manually unpinning consumers
nats/tests/test_js.py Updates version-specific assertions and adds comprehensive tests for all three priority policies
Comments suppressed due to low confidence (3)

nats/src/nats/js/client.py:664

  • Missing documentation for new parameter: The docstring should document the new priority_group parameter. This is an important parameter for the priority groups feature.

Suggested addition:
:param priority_group: Priority group name for this pull subscription (requires consumer configured with priority_groups).

        """
        pull_subscribe returns a `PullSubscription` that can be delivered messages
        from a JetStream pull based consumer by calling `sub.fetch`.

        ::

            import asyncio
            import nats

            async def main():
                nc = await nats.connect()
                js = nc.jetstream()

                await js.add_stream(name='mystream', subjects=['foo'])
                await js.publish('foo', b'Hello World!')

                msgs = await sub.fetch()
                msg = msgs[0]
                await msg.ack()

                await nc.close()

            if __name__ == '__main__':
                asyncio.run(main())

        """

nats/src/nats/js/client.py:580

  • Missing documentation for new parameter: The docstring should document the new priority_group parameter. This is an important parameter for the priority groups feature.

Suggested addition:
:param priority_group: Priority group name for this pull subscription (requires consumer configured with priority_groups).

        """Create consumer and pull subscription.

        1. Find stream name by subject if `stream` is not passed.
        2. Create consumer with the given `config` if not created.
        3. Call `pull_subscribe_bind`.

        ::

            import asyncio
            import nats

            async def main():
                nc = await nats.connect()
                js = nc.jetstream()

                await js.add_stream(name='mystream', subjects=['foo'])
                await js.publish('foo', b'Hello World!')

                sub = await js.pull_subscribe('foo', stream='mystream')

                msgs = await sub.fetch()
                msg = msgs[0]
                await msg.ack()

                await nc.close()

            if __name__ == '__main__':
                asyncio.run(main())

        """

nats/src/nats/js/client.py:1086

  • Missing documentation for new parameters: The docstring should document the new min_pending, min_ack_pending, and priority parameters that were added to the fetch method. These are important parameters for priority group functionality and should be documented for API consumers.

Suggested additions:

  • :param min_pending: Minimum number of messages pending (OVERFLOW policy).
  • :param min_ack_pending: Minimum number of pending acks (OVERFLOW policy).
  • :param priority: Priority level 0-9 where 0 is highest (PRIORITIZED policy).
            """
            fetch makes a request to JetStream to be delivered a set of messages.

            :param batch: Number of messages to fetch from server.
            :param timeout: Max duration of the fetch request before it expires.
            :param heartbeat: Idle Heartbeat interval in seconds for the fetch request.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

priority_policy: Optional[PriorityPolicy] = None

# The duration (seconds) after which the client will be unpinned if no new
# pull requests are sent.Used with PriorityPolicy.PINNED.
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space in comment. Should be "sent. Used" instead of "sent.Used".

Suggested change
# pull requests are sent.Used with PriorityPolicy.PINNED.
# pull requests are sent. Used with PriorityPolicy.PINNED.

Copilot uses AI. Check for mistakes.
next_req["min_pending"] = min_pending
if min_ack_pending:
next_req["min_ack_pending"] = min_ack_pending
if priority:
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The condition if priority: will not add priority to the request when priority=0, which is the highest priority value. This should be if priority is not None: to match the validation logic and ensure priority=0 is correctly sent to the server.

Suggested change
if priority:
if priority is not None:

Copilot uses AI. Check for mistakes.
next_req["min_pending"] = min_pending
if min_ack_pending:
next_req["min_ack_pending"] = min_ack_pending
if priority:
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The condition if priority: will not add priority to the request when priority=0, which is the highest priority value. This should be if priority is not None: to match the validation logic and ensure priority=0 is correctly sent to the server.

Suggested change
if priority:
if priority is not None:

Copilot uses AI. Check for mistakes.
Comment on lines +499 to +501
pulling from the same consumer
Introduced in nats-server 2.12.0.
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation inconsistency: The class docstring states "Introduced in nats-server 2.12.0" but PINNED and OVERFLOW were actually introduced in nats-server 2.11.0 (as mentioned in the references and test skip messages). Only PRIORITIZED was introduced in 2.12.0. Consider updating the class-level docstring to indicate "Introduced in nats-server 2.11.0" and keep the specific note about PRIORITIZED being from 2.12.0.

Suggested change
pulling from the same consumer
Introduced in nats-server 2.12.0.
pulling from the same consumer.
Introduced in nats-server 2.11.0.
The PRIORITIZED policy was added in nats-server 2.12.0.

Copilot uses AI. Check for mistakes.
nc = await nats.connect()

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 12:
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version check inconsistency: The skip message says "requires nats-server v2.11.0 or later" but the condition checks for < 12. This should be server_version.minor < 11 to match the documented requirement. The OVERFLOW priority policy was introduced in nats-server 2.11.0, not 2.12.0.

Suggested change
if server_version.major == 2 and server_version.minor < 12:
if server_version.major == 2 and server_version.minor < 11:

Copilot uses AI. Check for mistakes.

async def unpin_consumer(self, stream_name: str, consumer_name: str, group: str) -> None:
"""
unpin_consumer unpins a pinned consumer.
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incomplete documentation: The docstring should document the parameters and provide more detail about what unpinning means and when it's used.

Suggested improvement:

"""
Unpin a pinned consumer for a priority group.

Manually unpins a consumer that was pinned by the PINNED priority policy,
allowing other consumers in the same priority group to receive messages
without waiting for the pin timeout to expire.

:param stream_name: Name of the stream.
:param consumer_name: Name of the consumer.
:param group: Priority group name to unpin.
:raises NotFoundError: If the stream or consumer doesn't exist.
"""
Suggested change
unpin_consumer unpins a pinned consumer.
Unpin a pinned consumer for a priority group.
Manually unpins a consumer that was pinned by the PINNED priority policy,
allowing other consumers in the same priority group to receive messages
without waiting for the pin timeout to expire.
:param stream_name: Name of the stream.
:param consumer_name: Name of the consumer.
:param group: Priority group name to unpin.
:raises NotFoundError: If the stream or consumer doesn't exist.

Copilot uses AI. Check for mistakes.
next_req["min_pending"] = min_pending
if min_ack_pending:
next_req["min_ack_pending"] = min_ack_pending
if priority:
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The condition if priority: will not add priority to the request when priority=0, which is the highest priority value. This should be if priority is not None: to match the validation logic at line 1138 and ensure priority=0 is correctly sent to the server.

Suggested change
if priority:
if priority is not None:

Copilot uses AI. Check for mistakes.
nc = await nats.connect()

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 12:
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Version check inconsistency: The skip message says "requires nats-server v2.11.0 or later" but the condition checks for < 12. This should be server_version.minor < 11 to match the documented requirement. The PINNED priority policy was introduced in nats-server 2.11.0, not 2.12.0.

Suggested change
if server_version.major == 2 and server_version.minor < 12:
if server_version.major == 2 and server_version.minor < 11:

Copilot uses AI. Check for mistakes.
for i in range(0, 100):
await js.publish("foo", f"{i}".encode())
with pytest.raises(TimeoutError):
msgs = await psub.fetch(10, timeout=0.5, min_pending=110)
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment to 'msgs' is unnecessary as it is redefined before this value is used.

Suggested change
msgs = await psub.fetch(10, timeout=0.5, min_pending=110)
await psub.fetch(10, timeout=0.5, min_pending=110)

Copilot uses AI. Check for mistakes.
priority_group="A",
)
with pytest.raises(TimeoutError):
msgs = await psub.fetch(10, timeout=0.5, min_ack_pending=10)
Copy link

Copilot AI Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This assignment to 'msgs' is unnecessary as it is redefined before this value is used.

Suggested change
msgs = await psub.fetch(10, timeout=0.5, min_ack_pending=10)
await psub.fetch(10, timeout=0.5, min_ack_pending=10)

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Missing support for jetstream consumer groups

2 participants