Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 62 additions & 4 deletions python/natsrpy/_natsrpy_rs/js/counters.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -155,31 +155,89 @@ class CountersConfig:

@final
class CounterEntry:
"""A single counter entry retrieved from a counters stream.

Holds the current aggregated value for a counter subject along
with metadata about cross-stream sources and the last increment.

Attributes:
subject: the subject this counter entry belongs to.
value: the current aggregated counter value.
sources: mapping of source stream names to their per-subject
counter contributions.
increment: the value of the last increment applied, or ``None``
when the entry was retrieved via ``Counters.get``.
"""

subject: str
value: int
sources: dict[str, dict[str, int]]
increment: int | None

@final
class Counters:
"""Handle for a JetStream counters stream.

Provides atomic increment, decrement, and retrieval operations
on CRDT counters backed by a JetStream stream with
``allow_message_counter`` enabled.
"""

async def add(
self,
key: str,
value: int,
timeout: float | timedelta | None = None,
) -> int: ...
) -> int:
"""Add an arbitrary value to a counter.

:param key: subject key identifying the counter.
:param value: integer amount to add (may be negative).
:param timeout: optional operation timeout in seconds or as
a timedelta.
:return: the new counter value after the addition.
"""

async def incr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int: ...
) -> int:
"""Increment a counter by one.

Shorthand for ``add(key, 1)``.

:param key: subject key identifying the counter.
:param timeout: optional operation timeout in seconds or as
a timedelta.
:return: the new counter value after the increment.
"""

async def decr(
self,
key: str,
timeout: float | timedelta | None = None,
) -> int: ...
) -> int:
"""Decrement a counter by one.

Shorthand for ``add(key, -1)``.

:param key: subject key identifying the counter.
:param timeout: optional operation timeout in seconds or as
a timedelta.
:return: the new counter value after the decrement.
"""

async def get(
self,
key: str,
timeout: float | timedelta | None = None,
) -> CounterEntry: ...
) -> CounterEntry:
"""Retrieve the current value of a counter.

:param key: subject key identifying the counter.
:param timeout: optional operation timeout in seconds or as
a timedelta.
:return: counter entry with the current value and metadata.
:raises Exception: if no counter entry exists for the key.
"""
225 changes: 225 additions & 0 deletions python/tests/test_counters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import uuid

import pytest
from natsrpy.js import CounterEntry, Counters, CountersConfig, JetStream


async def test_counters_create(js: JetStream) -> None:
name = f"test-cnt-create-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
assert isinstance(counters, Counters)
finally:
await js.counters.delete(name)


async def test_counters_create_or_update(js: JetStream) -> None:
name = f"test-cnt-cou-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create_or_update(config)
try:
assert isinstance(counters, Counters)
config.description = "updated"
counters2 = await js.counters.create_or_update(config)
assert isinstance(counters2, Counters)
finally:
await js.counters.delete(name)


async def test_counters_get(js: JetStream) -> None:
name = f"test-cnt-get-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
await js.counters.create(config)
try:
counters = await js.counters.get(name)
assert isinstance(counters, Counters)
finally:
await js.counters.delete(name)


async def test_counters_delete(js: JetStream) -> None:
name = f"test-cnt-del-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
await js.counters.create(config)
result = await js.counters.delete(name)
assert result is True


async def test_counters_update(js: JetStream) -> None:
name = f"test-cnt-upd-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
await js.counters.create(config)
try:
update_cfg = CountersConfig(
name=name,
subjects=[f"{name}.>"],
description="updated description",
)
counters = await js.counters.update(update_cfg)
assert isinstance(counters, Counters)
finally:
await js.counters.delete(name)


async def test_counters_incr(js: JetStream) -> None:
name = f"test-cnt-incr-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
value = await counters.incr(subj)
assert value == 1
finally:
await js.counters.delete(name)


async def test_counters_decr(js: JetStream) -> None:
name = f"test-cnt-decr-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
value = await counters.decr(subj)
assert value == -1
finally:
await js.counters.delete(name)


async def test_counters_add(js: JetStream) -> None:
name = f"test-cnt-add-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
value = await counters.add(subj, 10)
assert value == 10
finally:
await js.counters.delete(name)


async def test_counters_add_negative(js: JetStream) -> None:
name = f"test-cnt-addneg-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
value = await counters.add(subj, -5)
assert value == -5
finally:
await js.counters.delete(name)


async def test_counters_get_entry(js: JetStream) -> None:
name = f"test-cnt-gete-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
await counters.incr(subj)
entry = await counters.get(subj)
assert isinstance(entry, CounterEntry)
assert entry.subject == subj
assert entry.value == 1
finally:
await js.counters.delete(name)


async def test_counter_entry_attributes(js: JetStream) -> None:
name = f"test-cnt-attr-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
await counters.add(subj, 5)
entry = await counters.get(subj)
assert isinstance(entry.subject, str)
assert isinstance(entry.value, int)
assert isinstance(entry.sources, dict)
assert entry.increment is None or isinstance(entry.increment, int)
finally:
await js.counters.delete(name)


async def test_counters_multiple_increments(js: JetStream) -> None:
name = f"test-cnt-multi-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
val1 = await counters.incr(subj)
val2 = await counters.incr(subj)
val3 = await counters.incr(subj)
assert val1 == 1
assert val2 == 2
assert val3 == 3
entry = await counters.get(subj)
assert entry.value == 3
finally:
await js.counters.delete(name)


async def test_counters_incr_then_decr(js: JetStream) -> None:
name = f"test-cnt-incdec-{uuid.uuid4().hex[:8]}"
subj = f"{name}.hits"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
await counters.incr(subj)
await counters.incr(subj)
await counters.decr(subj)
entry = await counters.get(subj)
assert entry.value == 1
finally:
await js.counters.delete(name)


async def test_counters_separate_subjects(js: JetStream) -> None:
name = f"test-cnt-sep-{uuid.uuid4().hex[:8]}"
subj_a = f"{name}.a"
subj_b = f"{name}.b"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
await counters.add(subj_a, 10)
await counters.add(subj_b, 20)
entry_a = await counters.get(subj_a)
entry_b = await counters.get(subj_b)
assert entry_a.value == 10
assert entry_b.value == 20
finally:
await js.counters.delete(name)


async def test_counters_get_nonexistent_key(js: JetStream) -> None:
name = f"test-cnt-nokey-{uuid.uuid4().hex[:8]}"
config = CountersConfig(name=name, subjects=[f"{name}.>"])
counters = await js.counters.create(config)
try:
with pytest.raises(Exception):
await counters.get(f"{name}.nonexistent")
finally:
await js.counters.delete(name)


async def test_counters_config_description(js: JetStream) -> None:
name = f"test-cnt-desc-{uuid.uuid4().hex[:8]}"
config = CountersConfig(
name=name,
subjects=[f"{name}.>"],
description="A test counters stream",
)
counters = await js.counters.create(config)
try:
assert isinstance(counters, Counters)
finally:
await js.counters.delete(name)


async def test_counters_config_defaults() -> None:
config = CountersConfig(name="test", subjects=["test.>"])
assert config.name == "test"
assert config.subjects == ["test.>"]
assert config.description is None
assert config.max_bytes is not None
assert config.max_messages is not None
4 changes: 2 additions & 2 deletions src/js/managers/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use pyo3::{Bound, PyAny, Python};
use tokio::sync::RwLock;

use crate::{exceptions::rust_err::NatsrpyResult, js::stream::StreamConfig, utils::natsrpy_future};
use crate::{exceptions::rust_err::NatsrpyResult, utils::natsrpy_future};

#[pyo3::pyclass]
pub struct CountersManager {
Expand Down Expand Up @@ -87,7 +87,7 @@ impl CountersManager {
pub fn update<'py>(
&self,
py: Python<'py>,
config: StreamConfig,
config: CountersConfig,
) -> NatsrpyResult<Bound<'py, PyAny>> {
let ctx = self.ctx.clone();
natsrpy_future(py, async move {
Expand Down
Loading