Skip to content

Commit 24d14ba

Browse files
committed
Added CRDT Counters support.
1 parent 7d5289a commit 24d14ba

File tree

10 files changed

+690
-9
lines changed

10 files changed

+690
-9
lines changed

python/natsrpy/_natsrpy_rs/js/__init__.pyi

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
from datetime import datetime, timedelta
22
from typing import Any, Literal, final, overload
33

4-
from . import consumers, kv, managers, object_store, stream
5-
from .managers import KVManager, ObjectStoreManager, StreamsManager
4+
from . import consumers, counters, kv, managers, object_store, stream
5+
from .managers import CountersManager, KVManager, ObjectStoreManager, StreamsManager
66

77
__all__ = [
88
"JetStream",
99
"JetStreamMessage",
1010
"Publication",
1111
"consumers",
12+
"counters",
1213
"kv",
1314
"managers",
1415
"object_store",
@@ -24,14 +25,19 @@ class Publication:
2425
sequence: sequence number assigned to the message in the stream.
2526
domain: JetStream domain of the stream.
2627
duplicate: whether the server detected this as a duplicate message.
27-
value: optional metadata value returned by the server.
28+
value: counter value. Only used if counters are enabled.
2829
"""
2930

30-
stream: str
31-
sequence: int
32-
domain: str
33-
duplicate: bool
34-
value: str | None
31+
@property
32+
def stream(self) -> str: ...
33+
@property
34+
def sequence(self) -> int: ...
35+
@property
36+
def domain(self) -> str: ...
37+
@property
38+
def duplicate(self) -> bool: ...
39+
@property
40+
def value(self) -> str | None: ...
3541

3642
@final
3743
class JetStream:
@@ -83,6 +89,10 @@ class JetStream:
8389
def object_store(self) -> ObjectStoreManager:
8490
"""Manager for object store buckets."""
8591

92+
@property
93+
def counters(self) -> CountersManager:
94+
"""Manager for streams with CRDT counter support."""
95+
8696
@final
8797
class JetStreamMessage:
8898
"""Message received from a JetStream consumer.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
from datetime import timedelta
2+
from typing import final
3+
4+
from typing_extensions import Self
5+
6+
from .stream import (
7+
Compression,
8+
ConsumerLimits,
9+
DiscardPolicy,
10+
PersistenceMode,
11+
Placement,
12+
Republish,
13+
RetentionPolicy,
14+
Source,
15+
StorageType,
16+
SubjectTransform,
17+
)
18+
19+
__all__ = ["Counters", "CountersConfig"]
20+
21+
@final
22+
class CountersConfig:
23+
"""Configuration for creating or updating a JetStream stream.
24+
25+
This config is almost the same as `StreamConfig`,
26+
but it has 2 predefined values;
27+
28+
* allow_message_counter=true
29+
* allow_direct=true
30+
31+
These two are required for counters API to work.
32+
33+
Attributes:
34+
name: stream name.
35+
subjects: list of subjects the stream listens on.
36+
max_bytes: maximum total size of the stream in bytes.
37+
max_messages: maximum number of messages in the stream.
38+
max_messages_per_subject: maximum messages per subject.
39+
discard: policy for discarding messages when limits are reached.
40+
discard_new_per_subject: when True, apply discard policy per
41+
subject.
42+
retention: message retention policy.
43+
max_consumers: maximum number of consumers.
44+
max_age: maximum message age.
45+
max_message_size: maximum size of a single message in bytes.
46+
storage: storage backend type.
47+
num_replicas: number of stream replicas.
48+
no_ack: when True, disable publish acknowledgements.
49+
duplicate_window: time window for duplicate detection.
50+
template_owner: name of the owning stream template.
51+
sealed: when True, the stream is read-only.
52+
description: human-readable stream description.
53+
allow_rollup: when True, allow ``Nats-Rollup`` header to purge
54+
subjects.
55+
deny_delete: when True, deny message deletion via the API.
56+
deny_purge: when True, deny stream purge via the API.
57+
republish: configuration for republishing messages.
58+
mirror_direct: when True, enable direct get for mirror streams.
59+
mirror: source configuration when the stream is a mirror.
60+
sources: list of source configurations for aggregate streams.
61+
metadata: custom key-value metadata.
62+
subject_transform: subject transformation rule.
63+
compression: compression algorithm for stored messages.
64+
consumer_limits: default limits applied to new consumers.
65+
first_sequence: initial sequence number for the stream.
66+
placement: cluster and tag placement hints.
67+
persist_mode: write persistence mode.
68+
pause_until: timestamp until which the stream is paused.
69+
allow_message_ttl: when True, allow per-message TTL.
70+
subject_delete_marker_ttl: TTL for subject delete markers.
71+
allow_atomic_publish: when True, enable atomic multi-message
72+
publish.
73+
allow_message_schedules: when True, enable scheduled message
74+
delivery.
75+
"""
76+
77+
name: str
78+
subjects: list[str]
79+
max_bytes: int | None
80+
max_messages: int | None
81+
max_messages_per_subject: int | None
82+
discard: DiscardPolicy | None
83+
discard_new_per_subject: bool | None
84+
retention: RetentionPolicy | None
85+
max_consumers: int | None
86+
max_age: timedelta | None
87+
max_message_size: int | None
88+
storage: StorageType | None
89+
num_replicas: int | None
90+
no_ack: bool | None
91+
duplicate_window: timedelta | None
92+
template_owner: str | None
93+
sealed: bool | None
94+
description: str | None
95+
allow_rollup: bool | None
96+
deny_delete: bool | None
97+
deny_purge: bool | None
98+
republish: Republish | None
99+
mirror_direct: bool | None
100+
mirror: Source | None
101+
sources: list[Source] | None
102+
metadata: dict[str, str] | None
103+
subject_transform: SubjectTransform | None
104+
compression: Compression | None
105+
consumer_limits: ConsumerLimits | None
106+
first_sequence: int | None
107+
placement: Placement | None
108+
persist_mode: PersistenceMode | None
109+
pause_until: int | None
110+
allow_message_ttl: bool | None
111+
subject_delete_marker_ttl: timedelta | None
112+
allow_atomic_publish: bool | None
113+
allow_message_schedules: bool | None
114+
115+
def __new__(
116+
cls,
117+
name: str,
118+
subjects: list[str],
119+
max_bytes: int | None = None,
120+
max_messages: int | None = None,
121+
max_messages_per_subject: int | None = None,
122+
discard: DiscardPolicy | None = None,
123+
discard_new_per_subject: bool | None = None,
124+
retention: RetentionPolicy | None = None,
125+
max_consumers: int | None = None,
126+
max_age: float | timedelta | None = None,
127+
max_message_size: int | None = None,
128+
storage: StorageType | None = None,
129+
num_replicas: int | None = None,
130+
no_ack: bool | None = None,
131+
duplicate_window: float | timedelta | None = None,
132+
template_owner: str | None = None,
133+
sealed: bool | None = None,
134+
description: str | None = None,
135+
allow_rollup: bool | None = None,
136+
deny_delete: bool | None = None,
137+
deny_purge: bool | None = None,
138+
republish: Republish | None = None,
139+
mirror_direct: bool | None = None,
140+
mirror: Source | None = None,
141+
sources: list[Source] | None = None,
142+
metadata: dict[str, str] | None = None,
143+
subject_transform: SubjectTransform | None = None,
144+
compression: Compression | None = None,
145+
consumer_limits: ConsumerLimits | None = None,
146+
first_sequence: int | None = None,
147+
placement: Placement | None = None,
148+
persist_mode: PersistenceMode | None = None,
149+
pause_until: int | None = None,
150+
allow_message_ttl: bool | None = None,
151+
subject_delete_marker_ttl: float | timedelta | None = None,
152+
allow_atomic_publish: bool | None = None,
153+
allow_message_schedules: bool | None = None,
154+
) -> Self: ...
155+
156+
@final
157+
class Counters:
158+
async def add(
159+
self,
160+
key: str,
161+
value: int,
162+
timeout: float | timedelta | None = None,
163+
) -> int: ...
164+
async def incr(
165+
self,
166+
key: str,
167+
timeout: float | timedelta | None = None,
168+
) -> int: ...
169+
async def decr(
170+
self,
171+
key: str,
172+
timeout: float | timedelta | None = None,
173+
) -> int: ...

python/natsrpy/_natsrpy_rs/js/managers.pyi

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ from .consumers import (
77
PushConsumer,
88
PushConsumerConfig,
99
)
10+
from .counters import Counters, CountersConfig
1011
from .kv import KeyValue, KVConfig
1112
from .object_store import ObjectStore, ObjectStoreConfig
1213
from .stream import Stream, StreamConfig
1314

1415
__all__ = [
1516
"ConsumersManager",
17+
"CountersManager",
1618
"KVManager",
1719
"ObjectStoreManager",
1820
"StreamsManager",
@@ -57,6 +59,45 @@ class StreamsManager:
5759
:return: the updated stream.
5860
"""
5961

62+
@final
63+
class CountersManager:
64+
"""Manager for JetStream stream with counters support CRUD operations."""
65+
66+
async def create(self, config: CountersConfig) -> Counters:
67+
"""Create a new counters stream.
68+
69+
:param config: stream configuration.
70+
:return: the created stream.
71+
"""
72+
73+
async def create_or_update(self, config: CountersConfig) -> Counters:
74+
"""Create a counters stream or update it if it already exists.
75+
76+
:param config: stream configuration.
77+
:return: the created or updated stream.
78+
"""
79+
80+
async def get(self, name: str) -> Counters:
81+
"""Get an existing counters stream by name.
82+
83+
:param name: stream name.
84+
:return: the stream.
85+
"""
86+
87+
async def delete(self, name: str) -> bool:
88+
"""Delete a counters stream.
89+
90+
:param name: stream name.
91+
:return: True if the stream was deleted.
92+
"""
93+
94+
async def update(self, config: CountersConfig) -> Counters:
95+
"""Update an existing counters stream configuration.
96+
97+
:param config: new stream configuration.
98+
:return: the updated stream.
99+
"""
100+
60101
@final
61102
class KVManager:
62103
"""Manager for key-value bucket CRUD operations."""

python/natsrpy/js.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
PushConsumerConfig,
1111
ReplayPolicy,
1212
)
13+
from ._natsrpy_rs.js.counters import Counters, CountersConfig
1314
from ._natsrpy_rs.js.kv import (
1415
KeysIterator,
1516
KeyValue,
@@ -53,6 +54,8 @@
5354
"ClusterInfo",
5455
"Compression",
5556
"ConsumerLimits",
57+
"Counters",
58+
"CountersConfig",
5659
"DeliverPolicy",
5760
"DiscardPolicy",
5861
"External",

src/exceptions/rust_err.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ pub enum NatsrpyError {
99
#[error(transparent)]
1010
StdIOError(#[from] std::io::Error),
1111
#[error(transparent)]
12+
StdParseIntError(#[from] std::num::ParseIntError),
13+
#[error(transparent)]
1214
UnknownError(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
1315
#[error("NATS session error: {0}")]
1416
SessionError(String),
@@ -79,6 +81,8 @@ pub enum NatsrpyError {
7981
#[error(transparent)]
8082
StreamPurgeError(#[from] async_nats::jetstream::stream::PurgeError),
8183
#[error(transparent)]
84+
StreamLastRawMessageError(#[from] async_nats::jetstream::stream::LastRawMessageError),
85+
#[error(transparent)]
8286
PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError),
8387
#[error(transparent)]
8488
ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError),

0 commit comments

Comments
 (0)