Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ log = "0.4.29"
pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] }
pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] }
pyo3-log = "0.13.3"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.149"
thiserror = "2.0.18"
time = "0.3.47"
tokio = { version = "1.50.0", features = ["full"] }
26 changes: 18 additions & 8 deletions python/natsrpy/_natsrpy_rs/js/__init__.pyi
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
from datetime import datetime, timedelta
from typing import Any, Literal, final, overload

from . import consumers, kv, managers, object_store, stream
from .managers import KVManager, ObjectStoreManager, StreamsManager
from . import consumers, counters, kv, managers, object_store, stream
from .managers import CountersManager, KVManager, ObjectStoreManager, StreamsManager

__all__ = [
"JetStream",
"JetStreamMessage",
"Publication",
"consumers",
"counters",
"kv",
"managers",
"object_store",
Expand All @@ -24,14 +25,19 @@ class Publication:
sequence: sequence number assigned to the message in the stream.
domain: JetStream domain of the stream.
duplicate: whether the server detected this as a duplicate message.
value: optional metadata value returned by the server.
value: counter value. Only used if counters are enabled.
"""

stream: str
sequence: int
domain: str
duplicate: bool
value: str | None
@property
def stream(self) -> str: ...
@property
def sequence(self) -> int: ...
@property
def domain(self) -> str: ...
@property
def duplicate(self) -> bool: ...
@property
def value(self) -> str | None: ...

@final
class JetStream:
Expand Down Expand Up @@ -83,6 +89,10 @@ class JetStream:
def object_store(self) -> ObjectStoreManager:
"""Manager for object store buckets."""

@property
def counters(self) -> CountersManager:
"""Manager for streams with CRDT counter support."""

@final
class JetStreamMessage:
"""Message received from a JetStream consumer.
Expand Down
185 changes: 185 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/counters.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
from datetime import timedelta
from typing import final

from typing_extensions import Self

from .stream import (
Compression,
ConsumerLimits,
DiscardPolicy,
PersistenceMode,
Placement,
Republish,
RetentionPolicy,
Source,
StorageType,
SubjectTransform,
)

__all__ = ["CounterEntry", "Counters", "CountersConfig"]

@final
class CountersConfig:
"""Configuration for creating or updating a JetStream stream.

This config is almost the same as `StreamConfig`,
but it has 2 predefined values;

* allow_message_counter=true
* allow_direct=true

These two are required for counters API to work.

Attributes:
name: stream name.
subjects: list of subjects the stream listens on.
max_bytes: maximum total size of the stream in bytes.
max_messages: maximum number of messages in the stream.
max_messages_per_subject: maximum messages per subject.
discard: policy for discarding messages when limits are reached.
discard_new_per_subject: when True, apply discard policy per
subject.
retention: message retention policy.
max_consumers: maximum number of consumers.
max_age: maximum message age.
max_message_size: maximum size of a single message in bytes.
storage: storage backend type.
num_replicas: number of stream replicas.
no_ack: when True, disable publish acknowledgements.
duplicate_window: time window for duplicate detection.
template_owner: name of the owning stream template.
sealed: when True, the stream is read-only.
description: human-readable stream description.
allow_rollup: when True, allow ``Nats-Rollup`` header to purge
subjects.
deny_delete: when True, deny message deletion via the API.
deny_purge: when True, deny stream purge via the API.
republish: configuration for republishing messages.
mirror_direct: when True, enable direct get for mirror streams.
mirror: source configuration when the stream is a mirror.
sources: list of source configurations for aggregate streams.
metadata: custom key-value metadata.
subject_transform: subject transformation rule.
compression: compression algorithm for stored messages.
consumer_limits: default limits applied to new consumers.
first_sequence: initial sequence number for the stream.
placement: cluster and tag placement hints.
persist_mode: write persistence mode.
pause_until: timestamp until which the stream is paused.
allow_message_ttl: when True, allow per-message TTL.
subject_delete_marker_ttl: TTL for subject delete markers.
allow_atomic_publish: when True, enable atomic multi-message
publish.
allow_message_schedules: when True, enable scheduled message
delivery.
"""

name: str
subjects: list[str]
max_bytes: int | None
max_messages: int | None
max_messages_per_subject: int | None
discard: DiscardPolicy | None
discard_new_per_subject: bool | None
retention: RetentionPolicy | None
max_consumers: int | None
max_age: timedelta | None
max_message_size: int | None
storage: StorageType | None
num_replicas: int | None
no_ack: bool | None
duplicate_window: timedelta | None
template_owner: str | None
sealed: bool | None
description: str | None
allow_rollup: bool | None
deny_delete: bool | None
deny_purge: bool | None
republish: Republish | None
mirror_direct: bool | None
mirror: Source | None
sources: list[Source] | None
metadata: dict[str, str] | None
subject_transform: SubjectTransform | None
compression: Compression | None
consumer_limits: ConsumerLimits | None
first_sequence: int | None
placement: Placement | None
persist_mode: PersistenceMode | None
pause_until: int | None
allow_message_ttl: bool | None
subject_delete_marker_ttl: timedelta | None
allow_atomic_publish: bool | None
allow_message_schedules: bool | None

def __new__(
cls,
name: str,
subjects: list[str],
max_bytes: int | None = None,
max_messages: int | None = None,
max_messages_per_subject: int | None = None,
discard: DiscardPolicy | None = None,
discard_new_per_subject: bool | None = None,
retention: RetentionPolicy | None = None,
max_consumers: int | None = None,
max_age: float | timedelta | None = None,
max_message_size: int | None = None,
storage: StorageType | None = None,
num_replicas: int | None = None,
no_ack: bool | None = None,
duplicate_window: float | timedelta | None = None,
template_owner: str | None = None,
sealed: bool | None = None,
description: str | None = None,
allow_rollup: bool | None = None,
deny_delete: bool | None = None,
deny_purge: bool | None = None,
republish: Republish | None = None,
mirror_direct: bool | None = None,
mirror: Source | None = None,
sources: list[Source] | None = None,
metadata: dict[str, str] | None = None,
subject_transform: SubjectTransform | None = None,
compression: Compression | None = None,
consumer_limits: ConsumerLimits | None = None,
first_sequence: int | None = None,
placement: Placement | None = None,
persist_mode: PersistenceMode | None = None,
pause_until: int | None = None,
allow_message_ttl: bool | None = None,
subject_delete_marker_ttl: float | timedelta | None = None,
allow_atomic_publish: bool | None = None,
allow_message_schedules: bool | None = None,
) -> Self: ...

@final
class CounterEntry:
subject: str
value: int
sources: dict[str, dict[str, int]]
increment: int | None

@final
class Counters:
async def add(
self,
key: str,
value: int,
timeout: float | timedelta | None = None,
) -> int: ...
async def incr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int: ...
async def decr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int: ...
async def get(
self,
key: str,
timeout: float | timedelta | None = None,
) -> CounterEntry: ...
41 changes: 41 additions & 0 deletions python/natsrpy/_natsrpy_rs/js/managers.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ from .consumers import (
PushConsumer,
PushConsumerConfig,
)
from .counters import Counters, CountersConfig
from .kv import KeyValue, KVConfig
from .object_store import ObjectStore, ObjectStoreConfig
from .stream import Stream, StreamConfig

__all__ = [
"ConsumersManager",
"CountersManager",
"KVManager",
"ObjectStoreManager",
"StreamsManager",
Expand Down Expand Up @@ -57,6 +59,45 @@ class StreamsManager:
:return: the updated stream.
"""

@final
class CountersManager:
"""Manager for JetStream stream with counters support CRUD operations."""

async def create(self, config: CountersConfig) -> Counters:
"""Create a new counters stream.

:param config: stream configuration.
:return: the created stream.
"""

async def create_or_update(self, config: CountersConfig) -> Counters:
"""Create a counters stream or update it if it already exists.

:param config: stream configuration.
:return: the created or updated stream.
"""

async def get(self, name: str) -> Counters:
"""Get an existing counters stream by name.

:param name: stream name.
:return: the stream.
"""

async def delete(self, name: str) -> bool:
"""Delete a counters stream.

:param name: stream name.
:return: True if the stream was deleted.
"""

async def update(self, config: CountersConfig) -> Counters:
"""Update an existing counters stream configuration.

:param config: new stream configuration.
:return: the updated stream.
"""

@final
class KVManager:
"""Manager for key-value bucket CRUD operations."""
Expand Down
4 changes: 4 additions & 0 deletions python/natsrpy/js.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
PushConsumerConfig,
ReplayPolicy,
)
from ._natsrpy_rs.js.counters import CounterEntry, Counters, CountersConfig
from ._natsrpy_rs.js.kv import (
KeysIterator,
KeyValue,
Expand Down Expand Up @@ -53,6 +54,9 @@
"ClusterInfo",
"Compression",
"ConsumerLimits",
"CounterEntry",
"Counters",
"CountersConfig",
"DeliverPolicy",
"DiscardPolicy",
"External",
Expand Down
6 changes: 6 additions & 0 deletions src/exceptions/rust_err.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ pub enum NatsrpyError {
#[error(transparent)]
StdIOError(#[from] std::io::Error),
#[error(transparent)]
StdParseIntError(#[from] std::num::ParseIntError),
#[error(transparent)]
JSONParseError(#[from] serde_json::Error),
#[error(transparent)]
UnknownError(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("NATS session error: {0}")]
SessionError(String),
Expand Down Expand Up @@ -79,6 +83,8 @@ pub enum NatsrpyError {
#[error(transparent)]
StreamPurgeError(#[from] async_nats::jetstream::stream::PurgeError),
#[error(transparent)]
StreamLastRawMessageError(#[from] async_nats::jetstream::stream::LastRawMessageError),
#[error(transparent)]
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
#[error(transparent)]
ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),
Expand Down
Loading
Loading