diff --git a/python/natsrpy/_natsrpy_rs/__init__.pyi b/python/natsrpy/_natsrpy_rs/__init__.pyi index 7ecd342..2a00c3f 100644 --- a/python/natsrpy/_natsrpy_rs/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/__init__.pyi @@ -34,19 +34,67 @@ class Message: @final class IteratorSubscription: + """Async iterator subscription for receiving NATS messages. + + Returned by :meth:`Nats.subscribe` when no callback is provided. + Messages can be received using ``async for`` or by calling :meth:`next` + directly. + """ + def __aiter__(self) -> IteratorSubscription: ... async def __anext__(self) -> Message: ... - async def next(self, timeout: float | timedelta | None = None) -> Message: ... - async def unsubscribe(self, limit: int | None = None) -> None: ... - async def drain(self) -> None: ... + async def next(self, timeout: float | timedelta | None = None) -> Message: + """Receive the next message from the subscription. + + :param timeout: maximum time to wait for a message in seconds + or as a timedelta, defaults to None (wait indefinitely). + :return: the next message. + :raises StopAsyncIteration: when the subscription is drained or + unsubscribed. + """ + + async def unsubscribe(self, limit: int | None = None) -> None: + """Unsubscribe from the subject. + + :param limit: if set, automatically unsubscribe after receiving + this many additional messages, defaults to None. + """ + + async def drain(self) -> None: + """Drain the subscription. + + Unsubscribes and flushes any remaining messages before closing. + """ @final class CallbackSubscription: - async def unsubscribe(self, limit: int | None = None) -> None: ... - async def drain(self) -> None: ... + """Callback-based subscription for receiving NATS messages. + + Returned by :meth:`Nats.subscribe` when a callback is provided. + Messages are automatically delivered to the callback in a background task. + """ + + async def unsubscribe(self, limit: int | None = None) -> None: + """Unsubscribe from the subject. 
+ + :param limit: if set, automatically unsubscribe after receiving + this many additional messages, defaults to None. + """ + + async def drain(self) -> None: + """Drain the subscription. + + Unsubscribes and flushes any remaining messages before closing. + """ @final class Nats: + """NATS client. + + Provides publish/subscribe messaging, request-reply, and JetStream + access over a connection to one or more NATS servers. + """ + def __new__( cls, /, @@ -60,9 +108,45 @@ class Nats: max_reconnects: int | None = None, connection_timeout: float | timedelta = ..., # 5 sec request_timeout: float | timedelta = ..., # 10 sec - ) -> Self: ... - async def startup(self) -> None: ... - async def shutdown(self) -> None: ... + ) -> Self: + """Create a new NATS client instance. + + The client is not connected until :meth:`startup` is called. + + :param addrs: list of NATS server URLs, defaults to + ``["nats://localhost:4222"]``. + :param user_and_pass: username and password tuple for authentication. + :param nkey: NKey seed for authentication. + :param token: token string for authentication. + :param custom_inbox_prefix: custom prefix for auto-generated inbox + subjects. + :param read_buffer_capacity: size of the read buffer in bytes, + defaults to 65535. + :param sender_capacity: capacity of the internal send channel, + defaults to 128. + :param max_reconnects: maximum number of reconnection attempts, + None means unlimited. + :param connection_timeout: timeout for establishing a connection + in seconds or as a timedelta, defaults to 5 seconds. + :param request_timeout: default timeout for request-reply operations + in seconds or as a timedelta, defaults to 10 seconds. + """ + + async def startup(self) -> None: + """Connect to the NATS server. + + Establishes the connection using the parameters provided at + construction time. Must be called before any publish, subscribe, + or JetStream operations. + """ + + async def shutdown(self) -> None: + """Close the NATS connection. 
+ + Drains all subscriptions and flushes pending data before + disconnecting. + """ + async def publish( self, subject: str, @@ -71,7 +155,18 @@ class Nats: headers: dict[str, Any] | None = None, reply: str | None = None, err_on_disconnect: bool = False, - ) -> None: ... + ) -> None: + """Publish a message to a subject. + + :param subject: subject to publish the message to. + :param payload: message payload. + :param headers: optional NATS headers dictionary. + :param reply: optional reply-to subject for the request-reply + pattern. + :param err_on_disconnect: when True, raise an error if the client + is disconnected, defaults to False. + """ + async def request( self, subject: str, @@ -80,9 +175,30 @@ class Nats: headers: dict[str, Any] | None = None, inbox: str | None = None, timeout: float | timedelta | None = None, - ) -> None: ... - async def drain(self) -> None: ... - async def flush(self) -> None: ... + ) -> None: + """Send a request and discard the response. + + :param subject: subject to send the request to. + :param payload: request payload. + :param headers: optional NATS headers dictionary. + :param inbox: custom inbox subject for the reply, auto-generated + if None. + :param timeout: maximum time to wait for a response in seconds + or as a timedelta, defaults to the client request_timeout. + """ + + async def drain(self) -> None: + """Drain the connection. + + Gracefully closes all subscriptions and flushes pending messages. + """ + + async def flush(self) -> None: + """Flush the connection. + + Waits until all pending messages have been sent to the server. + """ + @overload async def subscribe( self, @@ -105,7 +221,24 @@ class Nats: concurrency_limit: int | None = None, max_ack_inflight: int | None = None, backpressure_on_inflight: bool | None = None, - ) -> js.JetStream: ... + ) -> js.JetStream: + """Create a JetStream context. + + :param domain: JetStream domain to use. 
+ :param api_prefix: custom JetStream API prefix, cannot be used + together with *domain*. + :param timeout: default request timeout for JetStream operations + in seconds or as a timedelta. + :param ack_timeout: acknowledgement timeout for consumers in seconds + or as a timedelta. + :param concurrency_limit: maximum number of concurrent JetStream + operations. + :param max_ack_inflight: maximum number of unacknowledged messages + in flight. + :param backpressure_on_inflight: when True, apply backpressure when + the in-flight limit is reached. + :return: a JetStream context. + """ __all__ = [ "CallbackSubscription", diff --git a/python/natsrpy/_natsrpy_rs/exceptions.pyi b/python/natsrpy/_natsrpy_rs/exceptions.pyi index 72640a5..251de60 100644 --- a/python/natsrpy/_natsrpy_rs/exceptions.pyi +++ b/python/natsrpy/_natsrpy_rs/exceptions.pyi @@ -1,6 +1,11 @@ -class NatsrpyBaseError(Exception): ... -class NatsrpySessionError(NatsrpyBaseError): ... -class NatsrpyPublishError(NatsrpyBaseError): ... +class NatsrpyBaseError(Exception): + """Base exception for all natsrpy errors.""" + +class NatsrpySessionError(NatsrpyBaseError): + """Raised on connection or session-level errors.""" + +class NatsrpyPublishError(NatsrpyBaseError): + """Raised when a publish operation fails.""" __all__ = [ "NatsrpyBaseError", diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index 58c1250..1bccdea 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -17,6 +17,16 @@ __all__ = [ @final class Publication: + """Result of a JetStream publish with ``wait=True``. + + Attributes: + stream: name of the stream the message was published to. + sequence: sequence number assigned to the message in the stream. + domain: JetStream domain of the stream. + duplicate: whether the server detected this as a duplicate message. + value: optional metadata value returned by the server. 
+ """ + stream: str sequence: int domain: str @@ -25,6 +35,12 @@ class Publication: @final class JetStream: + """JetStream context for persistent messaging. + + Provides access to publish, stream management, key-value stores, + and object stores. + """ + @overload async def publish( self, @@ -56,47 +72,86 @@ class JetStream: wait: bool = False, ) -> Publication | None: ... @property - def kv(self) -> KVManager: ... + def kv(self) -> KVManager: + """Manager for key-value store buckets.""" + @property - def streams(self) -> StreamsManager: ... + def streams(self) -> StreamsManager: + """Manager for JetStream streams.""" + @property - def object_store(self) -> ObjectStoreManager: ... + def object_store(self) -> ObjectStoreManager: + """Manager for object store buckets.""" @final class JetStreamMessage: + """Message received from a JetStream consumer. + + Extends the base message with JetStream-specific metadata + such as stream and consumer sequence numbers, delivery count, + and acknowledgement methods. + """ + @property - def subject(self) -> str: ... + def subject(self) -> str: + """Subject the message was published to.""" + @property - def reply(self) -> str | None: ... + def reply(self) -> str | None: + """Reply-to subject, if any.""" + @property - def payload(self) -> bytes: ... + def payload(self) -> bytes: + """Message payload.""" + @property - def headers(self) -> dict[str, Any]: ... + def headers(self) -> dict[str, Any]: + """Message headers dictionary.""" + @property - def domain(self) -> str | None: ... + def domain(self) -> str | None: + """JetStream domain, if applicable.""" + @property - def acc_hash(self) -> str | None: ... + def acc_hash(self) -> str | None: + """Account hash, if applicable.""" + @property - def stream(self) -> str: ... + def stream(self) -> str: + """Name of the stream the message belongs to.""" + @property - def consumer(self) -> str: ... 
+ def consumer(self) -> str: + """Name of the consumer that received the message.""" + @property - def stream_sequence(self) -> int: ... + def stream_sequence(self) -> int: + """Sequence number of the message in the stream.""" + @property - def consumer_sequence(self) -> int: ... + def consumer_sequence(self) -> int: + """Sequence number of the message in the consumer.""" + @property - def delivered(self) -> int: ... + def delivered(self) -> int: + """Number of times this message has been delivered.""" + @property - def pending(self) -> int: ... + def pending(self) -> int: + """Number of messages pending for the consumer.""" + @property - def published(self) -> datetime: ... + def published(self) -> datetime: + """Timestamp when the message was published.""" + @property - def token(self) -> str | None: ... + def token(self) -> str | None: + """Authentication token, if applicable.""" + async def ack(self, double: bool = False) -> None: - """ - Acknowledge that a message was handled. + """Acknowledge that a message was handled. - :param double: whether to wait for server response, defaults to False + :param double: whether to wait for server response, defaults to False. """ async def nack( @@ -104,45 +159,41 @@ class JetStreamMessage: delay: float | timedelta | None = None, double: bool = False, ) -> None: - """ - Negative acknowledgement. + """Negative acknowledgement. Signals that the message will not be processed now and processing can move onto the next message, NAK'd message will be retried. - :param duration: time, defaults to None - :param double: whether to wait for server response, defaults to False + :param delay: delay before redelivery, defaults to None. + :param double: whether to wait for server response, defaults to False. """ async def progress(self, double: bool = False) -> None: - """ - Progress acknowledgement. + """Progress acknowledgement. - Singnals that the mesasge is being handled right now. 
+ Signals that the message is being handled right now. Sending this request before the AckWait will extend wait period before redelivering a message. - :param double: whether to wait for server response, defaults to False + :param double: whether to wait for server response, defaults to False. """ async def next(self, double: bool = False) -> None: - """ - Next acknowledgement. + """Next acknowledgement. - Only applies to pull consumers! + Only applies to pull consumers. Acknowledges message processing and instructs server to send delivery of the next message to the reply subject. - :param double: whether to wait for server response, defaults to False + :param double: whether to wait for server response, defaults to False. """ async def term(self, double: bool = False) -> None: - """ - Term acknowledgement. + """Term acknowledgement. Instructs server to stop redelivering message. Useful to stop redelivering a message after multiple NACKs. - :param double: whether to wait for server response, defaults to False + :param double: whether to wait for server response, defaults to False. """ diff --git a/python/natsrpy/_natsrpy_rs/js/consumers.pyi b/python/natsrpy/_natsrpy_rs/js/consumers.pyi index ff7408a..0a32338 100644 --- a/python/natsrpy/_natsrpy_rs/js/consumers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/consumers.pyi @@ -18,6 +18,17 @@ __all__ = [ @final class DeliverPolicy: + """Policy controlling which messages a consumer starts receiving. + + Attributes: + ALL: deliver all available messages. + LAST: deliver starting with the last message. + NEW: deliver only new messages. + BY_START_SEQUENCE: deliver from a specific sequence number. + BY_START_TIME: deliver from a specific timestamp. + LAST_PER_SUBJECT: deliver the last message for each subject. + """ + ALL: DeliverPolicy LAST: DeliverPolicy NEW: DeliverPolicy @@ -27,17 +38,43 @@ class DeliverPolicy: @final class AckPolicy: + """Acknowledgement policy for a consumer. 
+ + Attributes: + EXPLICIT: each message must be individually acknowledged. + NONE: no acknowledgement required. + ALL: acknowledging a message implicitly acknowledges all prior + messages. + """ + EXPLICIT: AckPolicy NONE: AckPolicy ALL: AckPolicy @final class ReplayPolicy: + """Replay speed policy for a consumer. + + Attributes: + INSTANT: deliver messages as fast as possible. + ORIGINAL: deliver messages at the rate they were originally + published. + """ + INSTANT: ReplayPolicy ORIGINAL: ReplayPolicy @final class PriorityPolicy: + """Priority dispatch policy for a consumer. + + Attributes: + NONE: no priority dispatching. + OVERFLOW: dispatch to priority groups on overflow. + PINNED_CLIENT: pin messages to a specific client. + PRIORITIZED: dispatch based on message priority. + """ + NONE: PriorityPolicy OVERFLOW: PriorityPolicy PINNED_CLIENT: PriorityPolicy @@ -45,6 +82,42 @@ class PriorityPolicy: @final class PullConsumerConfig: + """Configuration for a pull-based JetStream consumer. + + Attributes: + name: consumer name. + durable_name: durable consumer name for persistence across restarts. + description: human-readable consumer description. + deliver_policy: policy for initial message delivery. + delivery_start_sequence: starting sequence when using + ``BY_START_SEQUENCE`` deliver policy. + delivery_start_time: starting timestamp when using + ``BY_START_TIME`` deliver policy. + ack_policy: acknowledgement policy. + ack_wait: how long the server waits for an acknowledgement. + max_deliver: maximum number of delivery attempts per message. + filter_subject: subject filter for the consumer. + filter_subjects: list of subject filters for the consumer. + replay_policy: message replay speed policy. + rate_limit: rate limit in bits per second. + sample_frequency: percentage of acknowledgements to sample. + max_waiting: maximum pull requests waiting to be fulfilled. + max_ack_pending: maximum outstanding unacknowledged messages. 
+ headers_only: when True, deliver only message headers. + max_batch: maximum messages per pull batch. + max_bytes: maximum bytes per pull batch. + max_expires: maximum pull request expiration duration. + inactive_threshold: duration before an inactive consumer is + removed. + num_replicas: number of consumer replicas. + memory_storage: when True, use in-memory storage. + metadata: custom key-value metadata. + backoff: list of durations for redelivery backoff. + priority_policy: priority dispatch policy. + priority_groups: list of priority group names. + pause_until: timestamp until which the consumer is paused. + """ + name: str | None durable_name: str | None description: str | None @@ -108,6 +181,41 @@ class PullConsumerConfig: @final class PushConsumerConfig: + """Configuration for a push-based JetStream consumer. + + Attributes: + deliver_subject: subject where messages are pushed to. + name: consumer name. + durable_name: durable consumer name for persistence across restarts. + description: human-readable consumer description. + deliver_group: queue group for load-balanced delivery. + deliver_policy: policy for initial message delivery. + delivery_start_sequence: starting sequence when using + ``BY_START_SEQUENCE`` deliver policy. + delivery_start_time: starting timestamp when using + ``BY_START_TIME`` deliver policy. + ack_policy: acknowledgement policy. + ack_wait: how long the server waits for an acknowledgement. + max_deliver: maximum number of delivery attempts per message. + filter_subject: subject filter for the consumer. + filter_subjects: list of subject filters for the consumer. + replay_policy: message replay speed policy. + rate_limit: rate limit in bits per second. + sample_frequency: percentage of acknowledgements to sample. + max_waiting: maximum pull requests waiting to be fulfilled. + max_ack_pending: maximum outstanding unacknowledged messages. + headers_only: when True, deliver only message headers. 
+ flow_control: when True, enable flow control. + idle_heartbeat: interval for idle heartbeat messages. + num_replicas: number of consumer replicas. + memory_storage: when True, use in-memory storage. + metadata: custom key-value metadata. + backoff: list of durations for redelivery backoff. + inactive_threshold: duration before an inactive consumer is + removed. + pause_until: timestamp until which the consumer is paused. + """ + deliver_subject: str name: str | None durable_name: str | None @@ -169,19 +277,41 @@ class PushConsumerConfig: @final class MessagesIterator: + """Async iterator over JetStream consumer messages.""" + def __aiter__(self) -> Self: ... async def __anext__(self) -> JetStreamMessage: ... async def next( self, timeout: float | timedelta | None = None, - ) -> JetStreamMessage: ... + ) -> JetStreamMessage: + """Receive the next message from the consumer. + + :param timeout: maximum time to wait in seconds or as a timedelta, + defaults to None (wait indefinitely). + :return: the next JetStream message. + """ @final class PushConsumer: - async def messages(self) -> MessagesIterator: ... + """A push-based JetStream consumer. + + Messages are delivered by the server to a specified subject. + """ + + async def messages(self) -> MessagesIterator: + """Get an async iterator for consuming messages. + + :return: an async iterator over JetStream messages. + """ @final class PullConsumer: + """A pull-based JetStream consumer. + + Messages are fetched on demand in batches by the client. + """ + async def fetch( self, max_messages: int | None = None, @@ -193,4 +323,21 @@ class PullConsumer: min_pending: int | None = None, min_ack_pending: int | None = None, timeout: float | timedelta | None = None, - ) -> list[JetStreamMessage]: ... + ) -> list[JetStreamMessage]: + """Fetch a batch of messages from the consumer. + + :param max_messages: maximum number of messages to fetch. + :param group: consumer group for priority dispatch. 
+ :param priority: priority level for the fetch request. + :param max_bytes: maximum total bytes to fetch. + :param heartbeat: server heartbeat interval in seconds or as a + timedelta. + :param expires: fetch request expiration in seconds or as a + timedelta. + :param min_pending: minimum pending messages before pausing. + :param min_ack_pending: minimum unacknowledged messages before + pausing. + :param timeout: overall operation timeout in seconds or as a + timedelta. + :return: list of fetched messages. + """ diff --git a/python/natsrpy/_natsrpy_rs/js/kv.pyi b/python/natsrpy/_natsrpy_rs/js/kv.pyi index db65439..f0213fc 100644 --- a/python/natsrpy/_natsrpy_rs/js/kv.pyi +++ b/python/natsrpy/_natsrpy_rs/js/kv.pyi @@ -23,52 +23,115 @@ __all__ = [ @final class KVStatus: + """Status information for a key-value bucket. + + Attributes: + info: underlying stream information. + bucket: name of the key-value bucket. + """ + info: StreamInfo bucket: str @final class KVOperation: + """Type of operation recorded in a key-value entry. + + Attributes: + Put: a value was written or updated. + Delete: the key was deleted. + Purge: the key history was purged. + """ + Put: KVOperation Delete: KVOperation Purge: KVOperation @final class KVEntry: + """A single key-value entry from a watch or history query.""" + @property - def bucket(self) -> str: ... + def bucket(self) -> str: + """Name of the key-value bucket.""" + @property - def key(self) -> str: ... + def key(self) -> str: + """Key of the entry.""" + @property - def value(self) -> bytes: ... + def value(self) -> bytes: + """Value payload.""" + @property - def revision(self) -> int: ... + def revision(self) -> int: + """Revision number of the entry.""" + @property - def delta(self) -> int: ... + def delta(self) -> int: + """Number of pending entries for the watcher.""" + @property - def created(self) -> datetime: ... 
+ def created(self) -> datetime: + """Timestamp when this revision was created.""" + @property - def operation(self) -> KVOperation: ... + def operation(self) -> KVOperation: + """Operation type that produced this entry.""" + @property - def seen_current(self) -> bool: ... + def seen_current(self) -> bool: + """Whether all historical entries have been delivered.""" @final class KVEntryIterator: + """Async iterator over key-value entries.""" + def __aiter__(self) -> Self: ... async def __anext__(self) -> KVEntry: ... - async def next(self, timeout: float | timedelta | None = None) -> KVEntry: ... + async def next(self, timeout: float | timedelta | None = None) -> KVEntry: + """Receive the next key-value entry. + + :param timeout: maximum time to wait in seconds or as a timedelta, + defaults to None (wait indefinitely). + :return: the next entry. + """ @final class KeysIterator: + """Async iterator over key-value bucket keys.""" + def __aiter__(self) -> Self: ... async def __anext__(self) -> str: ... - async def next(self, timeout: float | timedelta | None = None) -> str: ... + async def next(self, timeout: float | timedelta | None = None) -> str: + """Receive the next key. + + :param timeout: maximum time to wait in seconds or as a timedelta, + defaults to None (wait indefinitely). + :return: the next key name. + """ @final class KVConfig: - """ - KV bucket config. + """Configuration for creating or updating a key-value bucket. - Used for creating or updating KV buckets. + Attributes: + bucket: bucket name. + description: human-readable bucket description. + max_value_size: maximum size of a single value in bytes. + history: number of historical revisions to keep per key. + max_age: maximum entry age in seconds. + max_bytes: maximum total bucket size in bytes. + storage: storage backend type. + num_replicas: number of bucket replicas. + republish: configuration for republishing changes. + mirror: source configuration when the bucket is a mirror. 
+ sources: list of source configurations for aggregate buckets. + mirror_direct: when True, enable direct get for mirror buckets. + compression: when True, enable S2 compression. + placement: cluster and tag placement hints. + limit_markers: TTL for delete markers in seconds, also enables + per-message TTL support on the bucket when set. """ bucket: str @@ -108,46 +171,165 @@ class KVConfig: @final class KeyValue: + """Handle for a NATS JetStream key-value bucket. + + Provides CRUD operations, watching for changes, and iteration + over keys and entries. + """ + @property - def stream_name(self) -> str: ... + def stream_name(self) -> str: + """Name of the underlying JetStream stream.""" + @property - def prefix(self) -> str: ... + def prefix(self) -> str: + """Subject prefix used for key-value operations.""" + @property - def put_prefix(self) -> str | None: ... + def put_prefix(self) -> str | None: + """Subject prefix used for put operations, if different.""" + @property - def use_jetstream_prefix(self) -> bool: ... + def use_jetstream_prefix(self) -> bool: + """Whether the JetStream API prefix is used.""" + @property - def name(self) -> str: ... - async def get(self, key: str) -> bytes | None: ... + def name(self) -> str: + """Name of the key-value bucket.""" + + async def get(self, key: str) -> bytes | None: + """Get the current value for a key. + + :param key: key to look up. + :return: value bytes, or None if the key does not exist. + """ + async def delete( self, key: str, expect_revision: int | None = None, - ) -> int: ... - async def update(self, key: str, value: bytes | str, revision: int) -> None: ... + ) -> int: + """Delete a key from the bucket. + + :param key: key to delete. + :param expect_revision: expected current revision for optimistic + concurrency control, defaults to None. + :return: the new revision number. 
+ """ + + async def update(self, key: str, value: bytes | str, revision: int) -> None: + """Update a key only if it matches the expected revision. + + :param key: key to update. + :param value: new value. + :param revision: expected current revision. + """ + async def create( self, key: str, value: bytes | str, ttl: float | timedelta | None = None, - ) -> int: ... - async def put(self, key: str, value: bytes | str) -> int: ... + ) -> int: + """Create a new key. + + Fails if the key already exists. + + :param key: key to create. + :param value: value to store. + :param ttl: optional time-to-live in seconds or as a timedelta. + :return: the initial revision number. + """ + + async def put(self, key: str, value: bytes | str) -> int: + """Put a value for a key, creating or updating as needed. + + :param key: key to set. + :param value: value to store. + :return: the new revision number. + """ + async def purge( self, key: str, ttl: float | timedelta | None = None, expect_revision: int | None = None, - ) -> None: ... - async def history(self, key: str) -> KVEntryIterator: ... - async def entry(self, key: str, revision: int | None = None) -> KVEntry | None: ... + ) -> None: + """Purge all revisions of a key. + + :param key: key to purge. + :param ttl: optional time-to-live for the purge marker in seconds + or as a timedelta. + :param expect_revision: expected current revision for optimistic + concurrency control, defaults to None. + """ + + async def history(self, key: str) -> KVEntryIterator: + """Get the revision history for a key. + + :param key: key to query. + :return: an async iterator over historical entries. + """ + + async def entry(self, key: str, revision: int | None = None) -> KVEntry | None: + """Get a specific entry for a key. + + :param key: key to look up. + :param revision: specific revision to retrieve, defaults to the + latest. + :return: the entry, or None if not found. 
+ """ + async def watch( self, key: str, from_revision: int | None = None, - ) -> KVEntryIterator: ... - async def watch_with_history(self, key: str) -> KVEntryIterator: ... - async def watch_all(self, from_revision: int | None = None) -> KVEntryIterator: ... - async def watch_many(self, keys: list[str]) -> KVEntryIterator: ... - async def watch_many_with_history(self, keys: list[str]) -> KVEntryIterator: ... - async def keys(self) -> KeysIterator: ... - async def status(self) -> KVStatus: ... + ) -> KVEntryIterator: + """Watch a key for changes. + + :param key: key to watch. + :param from_revision: start watching from this revision, + defaults to the latest. + :return: an async iterator over entry changes. + """ + + async def watch_with_history(self, key: str) -> KVEntryIterator: + """Watch a key for changes, delivering all historical entries first. + + :param key: key to watch. + :return: an async iterator starting with history then live changes. + """ + + async def watch_all(self, from_revision: int | None = None) -> KVEntryIterator: + """Watch all keys in the bucket for changes. + + :param from_revision: start watching from this revision, + defaults to the latest. + :return: an async iterator over entry changes. + """ + + async def watch_many(self, keys: list[str]) -> KVEntryIterator: + """Watch multiple keys for changes. + + :param keys: list of keys to watch. + :return: an async iterator over entry changes. + """ + + async def watch_many_with_history(self, keys: list[str]) -> KVEntryIterator: + """Watch multiple keys, delivering all historical entries first. + + :param keys: list of keys to watch. + :return: an async iterator starting with history then live changes. + """ + + async def keys(self) -> KeysIterator: + """List all keys in the bucket. + + :return: an async iterator over key names. + """ + + async def status(self) -> KVStatus: + """Get the status of the key-value bucket. + + :return: bucket status information. 
+ """ diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index a43d212..1caf8c6 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -20,22 +20,86 @@ __all__ = [ @final class StreamsManager: - async def create(self, config: StreamConfig) -> Stream: ... - async def create_or_update(self, config: StreamConfig) -> Stream: ... - async def get(self, name: str) -> Stream: ... - async def delete(self, name: str) -> bool: ... - async def update(self, config: StreamConfig) -> Stream: ... + """Manager for JetStream stream CRUD operations.""" + + async def create(self, config: StreamConfig) -> Stream: + """Create a new stream. + + :param config: stream configuration. + :return: the created stream. + """ + + async def create_or_update(self, config: StreamConfig) -> Stream: + """Create a stream or update it if it already exists. + + :param config: stream configuration. + :return: the created or updated stream. + """ + + async def get(self, name: str) -> Stream: + """Get an existing stream by name. + + :param name: stream name. + :return: the stream. + """ + + async def delete(self, name: str) -> bool: + """Delete a stream. + + :param name: stream name. + :return: True if the stream was deleted. + """ + + async def update(self, config: StreamConfig) -> Stream: + """Update an existing stream configuration. + + :param config: new stream configuration. + :return: the updated stream. + """ @final class KVManager: - async def create(self, config: KVConfig) -> KeyValue: ... - async def create_or_update(self, config: KVConfig) -> KeyValue: ... - async def get(self, bucket: str) -> KeyValue: ... - async def delete(self, bucket: str) -> bool: ... - async def update(self, config: KVConfig) -> KeyValue: ... + """Manager for key-value bucket CRUD operations.""" + + async def create(self, config: KVConfig) -> KeyValue: + """Create a new key-value bucket. 
+ + :param config: bucket configuration. + :return: the created key-value bucket. + """ + + async def create_or_update(self, config: KVConfig) -> KeyValue: + """Create a bucket or update it if it already exists. + + :param config: bucket configuration. + :return: the created or updated key-value bucket. + """ + + async def get(self, bucket: str) -> KeyValue: + """Get an existing key-value bucket by name. + + :param bucket: bucket name. + :return: the key-value bucket. + """ + + async def delete(self, bucket: str) -> bool: + """Delete a key-value bucket. + + :param bucket: bucket name. + :return: True if the bucket was deleted. + """ + + async def update(self, config: KVConfig) -> KeyValue: + """Update an existing key-value bucket configuration. + + :param config: new bucket configuration. + :return: the updated key-value bucket. + """ @final class ConsumersManager: + """Manager for JetStream consumer CRUD operations.""" + @overload async def create(self, config: PullConsumerConfig) -> PullConsumer: ... @overload @@ -44,14 +108,62 @@ class ConsumersManager: async def update(self, config: PullConsumerConfig) -> PullConsumer: ... @overload async def update(self, config: PushConsumerConfig) -> PushConsumer: ... - async def get_pull(self, name: str) -> PullConsumer: ... - async def get_push(self, name: str) -> PushConsumer: ... - async def delete(self, name: str) -> bool: ... - async def pause(self, name: str, delay: float | timedelta) -> bool: ... - async def resume(self, name: str) -> bool: ... + async def get_pull(self, name: str) -> PullConsumer: + """Get an existing pull consumer by name. + + :param name: consumer name. + :return: the pull consumer. + """ + + async def get_push(self, name: str) -> PushConsumer: + """Get an existing push consumer by name. + + :param name: consumer name. + :return: the push consumer. + """ + + async def delete(self, name: str) -> bool: + """Delete a consumer. + + :param name: consumer name. 
+ :return: True if the consumer was deleted. + """ + + async def pause(self, name: str, delay: float | timedelta) -> bool: + """Pause a consumer for a specified duration. + + :param name: consumer name. + :param delay: duration to pause in seconds or as a timedelta. + :return: True if the consumer was paused. + """ + + async def resume(self, name: str) -> bool: + """Resume a paused consumer. + + :param name: consumer name. + :return: True if the consumer was resumed. + """ @final class ObjectStoreManager: - async def create(self, config: ObjectStoreConfig) -> ObjectStore: ... - async def get(self, bucket: str) -> ObjectStore: ... - async def delete(self, bucket: str) -> None: ... + """Manager for object store bucket operations.""" + + async def create(self, config: ObjectStoreConfig) -> ObjectStore: + """Create a new object store bucket. + + :param config: object store configuration. + :return: the created object store. + """ + + async def get(self, bucket: str) -> ObjectStore: + """Get an existing object store bucket by name. + + :param bucket: bucket name. + :return: the object store. + """ + + async def delete(self, bucket: str) -> None: + """Delete an object store bucket. + + :param bucket: bucket name. + """ diff --git a/python/natsrpy/_natsrpy_rs/js/object_store.pyi b/python/natsrpy/_natsrpy_rs/js/object_store.pyi index cc3b3bd..449e8c9 100644 --- a/python/natsrpy/_natsrpy_rs/js/object_store.pyi +++ b/python/natsrpy/_natsrpy_rs/js/object_store.pyi @@ -15,6 +15,19 @@ __all__ = [ @final class ObjectStoreConfig: + """Configuration for creating an object store bucket. + + Attributes: + bucket: bucket name. + description: human-readable bucket description. + max_age: maximum object age. + max_bytes: maximum total bucket size in bytes. + storage: storage backend type. + num_replicas: number of bucket replicas. + compression: whether S2 compression is enabled. + placement: cluster and tag placement hints. 
+ """ + bucket: str description: str | None max_age: timedelta @@ -38,11 +51,36 @@ class ObjectStoreConfig: @final class ObjectLink: + """Reference link to another object or bucket. + + Attributes: + name: name of the linked object, or None for a bucket link. + bucket: name of the linked bucket. + """ + name: str | None bucket: str @final class ObjectInfo: + """Metadata for an object stored in the object store. + + Attributes: + name: object name. + description: human-readable object description. + metadata: custom key-value metadata. + headers: NATS message headers. + bucket: name of the containing bucket. + nuid: unique identifier for the object. + size: total object size in bytes. + chunks: number of chunks the object is split into. + modified: last modification timestamp. + digest: SHA-256 digest of the object content. + deleted: whether the object is deleted. + link: link reference if this is a linked object. + max_chunk_size: maximum chunk size in bytes, if set. + """ + name: str description: str | None metadata: dict[str, str] @@ -59,18 +97,42 @@ class ObjectInfo: @final class ObjectInfoIterator: + """Async iterator over object info entries.""" + def __aiter__(self) -> Self: ... async def __anext__(self) -> ObjectInfo: ... - async def next(self, timeout: float | timedelta | None = None) -> ObjectInfo: ... + async def next(self, timeout: float | timedelta | None = None) -> ObjectInfo: + """Receive the next object info entry. + + :param timeout: maximum time to wait in seconds or as a timedelta, + defaults to None (wait indefinitely). + :return: the next object info. + """ @final class ObjectStore: + """Handle for a NATS JetStream object store bucket. + + Provides methods for storing, retrieving, and managing large + objects that are chunked across NATS messages. + """ + async def get( self, name: str, writer: Writer[bytes], chunk_size: int | None = ..., # 24MB - ) -> None: ... + ) -> None: + """Download an object from the store. 
+ + Writes the object content to the given writer in chunks. + + :param name: name of the object to retrieve. + :param writer: writable buffer that receives the object bytes. + :param chunk_size: size of read chunks in bytes, + defaults to 24 MB. + """ + async def put( self, name: str, @@ -79,14 +141,64 @@ class ObjectStore: description: str | None = None, headers: dict[str, str | list[str]] | None = None, metadata: dict[str, str] | None = None, - ) -> None: ... - async def delete(self, name: str) -> None: ... - async def seal(self) -> None: ... - async def get_info(self, name: str) -> ObjectInfo: ... - async def watch(self, with_history: bool = False) -> ObjectInfoIterator: ... - async def list(self) -> ObjectInfoIterator: ... - async def link_bucket(self, src_bucket: str, dest: str) -> ObjectInfo: ... - async def link_object(self, src: str, dest: str) -> ObjectInfo: ... + ) -> None: + """Upload an object to the store. + + :param name: name for the stored object. + :param value: object content. + :param chunk_size: size of upload chunks in bytes, + defaults to 24 MB. + :param description: human-readable object description. + :param headers: optional NATS headers. + :param metadata: optional custom key-value metadata. + """ + + async def delete(self, name: str) -> None: + """Delete an object from the store. + + :param name: name of the object to delete. + """ + + async def seal(self) -> None: + """Seal the object store, making it read-only.""" + + async def get_info(self, name: str) -> ObjectInfo: + """Get metadata for an object. + + :param name: name of the object. + :return: object metadata. + """ + + async def watch(self, with_history: bool = False) -> ObjectInfoIterator: + """Watch the object store for changes. + + :param with_history: when True, deliver all existing entries + before live updates, defaults to False. + :return: an async iterator over object info changes. + """ + + async def list(self) -> ObjectInfoIterator: + """List all objects in the store. 
+ + :return: an async iterator over object info entries. + """ + + async def link_bucket(self, src_bucket: str, dest: str) -> ObjectInfo: + """Create a link to another object store bucket. + + :param src_bucket: name of the source bucket to link. + :param dest: name for the link in this store. + :return: object info for the created link. + """ + + async def link_object(self, src: str, dest: str) -> ObjectInfo: + """Create a link to another object in the store. + + :param src: name of the source object to link. + :param dest: name for the link in this store. + :return: object info for the created link. + """ + async def update_metadata( self, name: str, @@ -94,4 +206,13 @@ class ObjectStore: description: str | None = None, headers: dict[str, Any] | None = None, metadata: dict[str, str] | None = None, - ) -> ObjectInfo: ... + ) -> ObjectInfo: + """Update the metadata of an existing object. + + :param name: current name of the object. + :param new_name: optional new name for the object. + :param description: new description, if provided. + :param headers: new headers, if provided. + :param metadata: new custom metadata, if provided. + :return: updated object info. + """ diff --git a/python/natsrpy/_natsrpy_rs/js/stream.pyi b/python/natsrpy/_natsrpy_rs/js/stream.pyi index afc0489..36a19a6 100644 --- a/python/natsrpy/_natsrpy_rs/js/stream.pyi +++ b/python/natsrpy/_natsrpy_rs/js/stream.pyi @@ -29,32 +29,77 @@ __all__ = [ @final class StorageType: + """Stream storage backend type. + + Attributes: + FILE: persistent file-based storage. + MEMORY: in-memory storage. + """ + FILE: StorageType MEMORY: StorageType @final class DiscardPolicy: + """Policy for discarding messages when stream limits are reached. + + Attributes: + OLD: discard the oldest messages first. + NEW: reject new messages when the limit is reached. + """ + OLD: DiscardPolicy NEW: DiscardPolicy @final class RetentionPolicy: + """Stream message retention policy. 
+ + Attributes: + LIMITS: retain messages until size or age limits are hit. + INTEREST: retain messages until all known consumers have acknowledged. + WORKQUEUE: retain messages until acknowledged by a single consumer. + """ + LIMITS: RetentionPolicy INTEREST: RetentionPolicy WORKQUEUE: RetentionPolicy @final class Compression: + """Stream data compression algorithm. + + Attributes: + S2: S2 compression. + NONE: no compression. + """ + S2: Compression NONE: Compression @final class PersistenceMode: + """Write persistence mode for stream messages. + + Attributes: + Default: synchronous write to disk. + Async: asynchronous write to disk. + """ + Default: PersistenceMode Async: PersistenceMode @final class ConsumerLimits: + """Default consumer limits applied to consumers on this stream. + + Attributes: + inactive_threshold: duration after which an inactive consumer + is removed. + max_ack_pending: maximum number of outstanding unacknowledged + messages. + """ + inactive_threshold: timedelta max_ack_pending: int @@ -62,6 +107,13 @@ class ConsumerLimits: @final class External: + """External stream reference for cross-account or cross-domain access. + + Attributes: + api_prefix: API prefix for the external stream. + delivery_prefix: optional delivery prefix override. + """ + api_prefix: str delivery_prefix: str | None @@ -69,6 +121,13 @@ class External: @final class SubjectTransform: + """Subject mapping transformation rule. + + Attributes: + source: source subject pattern. + destination: destination subject pattern. + """ + source: str destination: str @@ -76,6 +135,18 @@ class SubjectTransform: @final class Source: + """Configuration for a stream source or mirror origin. + + Attributes: + name: name of the source stream. + filter_subject: optional subject filter applied to the source. + external: optional external stream reference. + start_sequence: optional starting sequence number. + start_time: optional starting timestamp. + domain: optional JetStream domain. 
+ subject_transforms: optional subject transformation rule. + """ + name: str filter_subject: str | None = None external: External | None = None @@ -97,6 +168,13 @@ class Source: @final class Placement: + """Placement hints for stream replicas across clusters and tags. + + Attributes: + cluster: preferred cluster name. + tags: server tags used for placement. + """ + cluster: str | None tags: list[str] | None @@ -108,6 +186,14 @@ class Placement: @final class Republish: + """Republish configuration for echoing stream messages to other subjects. + + Attributes: + source: source subject filter. + destination: destination subject to republish to. + headers_only: when True, only headers are republished. + """ + source: str destination: str headers_only: bool @@ -116,6 +202,54 @@ class Republish: @final class StreamConfig: + """Configuration for creating or updating a JetStream stream. + + Attributes: + name: stream name. + subjects: list of subjects the stream listens on. + max_bytes: maximum total size of the stream in bytes. + max_messages: maximum number of messages in the stream. + max_messages_per_subject: maximum messages per subject. + discard: policy for discarding messages when limits are reached. + discard_new_per_subject: when True, apply discard policy per + subject. + retention: message retention policy. + max_consumers: maximum number of consumers. + max_age: maximum message age. + max_message_size: maximum size of a single message in bytes. + storage: storage backend type. + num_replicas: number of stream replicas. + no_ack: when True, disable publish acknowledgements. + duplicate_window: time window for duplicate detection. + template_owner: name of the owning stream template. + sealed: when True, the stream is read-only. + description: human-readable stream description. + allow_rollup: when True, allow ``Nats-Rollup`` header to purge + subjects. + deny_delete: when True, deny message deletion via the API. 
+ deny_purge: when True, deny stream purge via the API. + republish: configuration for republishing messages. + allow_direct: when True, enable direct get API for the stream. + mirror_direct: when True, enable direct get for mirror streams. + mirror: source configuration when the stream is a mirror. + sources: list of source configurations for aggregate streams. + metadata: custom key-value metadata. + subject_transform: subject transformation rule. + compression: compression algorithm for stored messages. + consumer_limits: default limits applied to new consumers. + first_sequence: initial sequence number for the stream. + placement: cluster and tag placement hints. + persist_mode: write persistence mode. + pause_until: timestamp until which the stream is paused. + allow_message_ttl: when True, allow per-message TTL. + subject_delete_marker_ttl: TTL for subject delete markers. + allow_atomic_publish: when True, enable atomic multi-message + publish. + allow_message_schedules: when True, enable scheduled message + delivery. + allow_message_counter: when True, enable message counter header. + """ + name: str subjects: list[str] max_bytes: int | None @@ -201,6 +335,16 @@ class StreamConfig: @final class StreamMessage: + """A raw message stored in a JetStream stream. + + Attributes: + subject: subject the message was published to. + sequence: sequence number of the message. + headers: message headers dictionary. + payload: message payload. + time: timestamp when the message was stored. + """ + subject: str sequence: int headers: dict[str, Any] @@ -209,6 +353,21 @@ class StreamMessage: @final class StreamState: + """Current state snapshot of a JetStream stream. + + Attributes: + messages: total number of messages in the stream. + bytes: total size of the stream in bytes. + first_sequence: sequence number of the first message. + first_timestamp: timestamp of the first message. + last_sequence: sequence number of the last message. 
+ last_timestamp: timestamp of the last message. + consumer_count: number of consumers bound to the stream. + subjects_count: number of unique subjects in the stream. + deleted_count: number of deleted messages, if available. + deleted: list of deleted sequence numbers, if available. + """ + messages: int bytes: int first_sequence: int @@ -222,6 +381,17 @@ class StreamState: @final class SourceInfo: + """Runtime information about a stream source. + + Attributes: + name: name of the source stream. + lag: number of messages the source is behind. + active: duration since the last activity. + filter_subject: subject filter applied to the source. + subject_transform_dest: destination of the subject transform. + subject_transforms: list of subject transformation rules. + """ + name: str lag: int active: timedelta | None @@ -231,6 +401,16 @@ class SourceInfo: @final class PeerInfo: + """Information about a stream cluster peer. + + Attributes: + name: name of the peer server. + current: whether the peer is up to date. + active: duration since the last peer activity. + offline: whether the peer is offline. + lag: number of messages the peer is behind, if known. + """ + name: str current: bool active: timedelta @@ -239,6 +419,18 @@ class PeerInfo: @final class ClusterInfo: + """Cluster information for a JetStream stream. + + Attributes: + name: cluster name. + raft_group: Raft group name for the stream. + leader: name of the current leader server. + leader_since: timestamp when the current leader was elected. + system_account: whether this belongs to the system account. + traffic_account: traffic accounting identifier. + replicas: list of peer replicas. + """ + name: str | None raft_group: str | None leader: str | None @@ -249,6 +441,17 @@ class ClusterInfo: @final class StreamInfo: + """Detailed information about a JetStream stream. + + Attributes: + config: stream configuration. + created: timestamp when the stream was created. + state: current stream state. 
+ cluster: cluster information, if applicable. + mirror: mirror source info, if the stream is a mirror. + sources: list of source info for aggregate streams. + """ + config: StreamConfig created: float state: StreamState @@ -258,23 +461,29 @@ class StreamInfo: @final class Stream: + """A JetStream stream handle. + + Provides methods for inspecting, purging, and directly + accessing messages in the stream, as well as managing consumers. + """ + async def direct_get( self, sequence: int, timeout: float | datetime | None = None, ) -> StreamMessage: - """ - Get direct message from the stream. + """Get a message directly from the stream by sequence number. :param sequence: sequence number of the message to get. - :return: Message. + :param timeout: operation timeout. + :return: the stream message. """ async def get_info(self, timeout: float | datetime | None = None) -> StreamInfo: - """ - Get information about the stream. + """Get information about the stream. - :return: Stream info. + :param timeout: operation timeout. + :return: stream info. """ async def purge( @@ -284,15 +493,18 @@ class Stream: keep: int | None = None, timeout: float | datetime | None = None, ) -> int: - """ - Purge current stream. - - :param filter: filter of subjects to purge, defaults to None - :param sequence: Message sequence to purge up to (inclusive), defaults to None - :param keep: Message count to keep starting from the end of the stream, - defaults to None - :return: number of messages purged + """Purge messages from the stream. + + :param filter: subject filter for messages to purge, + defaults to None. + :param sequence: purge messages up to this sequence number + inclusive, defaults to None. + :param keep: number of messages to keep starting from the end + of the stream, defaults to None. + :param timeout: operation timeout. + :return: number of messages purged. """ @property - def consumers(self) -> ConsumersManager: ... 
+ def consumers(self) -> ConsumersManager:
+ """Manager for creating, retrieving, deleting, pausing, and resuming this stream's consumers."""