diff --git a/Cargo.lock b/Cargo.lock index 7e565e9..ff63e51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -535,6 +535,8 @@ dependencies = [ "pyo3", "pyo3-async-runtimes", "pyo3-log", + "serde", + "serde_json", "thiserror 2.0.18", "time", "tokio", diff --git a/Cargo.toml b/Cargo.toml index c637053..ad0d018 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,8 @@ log = "0.4.29" pyo3 = { version = "0.28", features = ["abi3", "experimental-inspect"] } pyo3-async-runtimes = { version = "0.28", features = ["tokio-runtime"] } pyo3-log = "0.13.3" +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.149" thiserror = "2.0.18" time = "0.3.47" tokio = { version = "1.50.0", features = ["full"] } diff --git a/python/natsrpy/_natsrpy_rs/js/__init__.pyi b/python/natsrpy/_natsrpy_rs/js/__init__.pyi index 1bccdea..d280a09 100644 --- a/python/natsrpy/_natsrpy_rs/js/__init__.pyi +++ b/python/natsrpy/_natsrpy_rs/js/__init__.pyi @@ -1,14 +1,15 @@ from datetime import datetime, timedelta from typing import Any, Literal, final, overload -from . import consumers, kv, managers, object_store, stream -from .managers import KVManager, ObjectStoreManager, StreamsManager +from . import consumers, counters, kv, managers, object_store, stream +from .managers import CountersManager, KVManager, ObjectStoreManager, StreamsManager __all__ = [ "JetStream", "JetStreamMessage", "Publication", "consumers", + "counters", "kv", "managers", "object_store", @@ -24,14 +25,19 @@ class Publication: sequence: sequence number assigned to the message in the stream. domain: JetStream domain of the stream. duplicate: whether the server detected this as a duplicate message. - value: optional metadata value returned by the server. + value: counter value. Only used if counters are enabled. """ - stream: str - sequence: int - domain: str - duplicate: bool - value: str | None + @property + def stream(self) -> str: ... + @property + def sequence(self) -> int: ... + @property + def domain(self) -> str: ... + @property + def duplicate(self) -> bool: ... + @property + def value(self) -> str | None: ... @final class JetStream: @@ -83,6 +89,10 @@ class JetStream: def object_store(self) -> ObjectStoreManager: """Manager for object store buckets.""" + @property + def counters(self) -> CountersManager: + """Manager for streams with CRDT counter support.""" + @final class JetStreamMessage: """Message received from a JetStream consumer. diff --git a/python/natsrpy/_natsrpy_rs/js/counters.pyi b/python/natsrpy/_natsrpy_rs/js/counters.pyi new file mode 100644 index 0000000..86e13c9 --- /dev/null +++ b/python/natsrpy/_natsrpy_rs/js/counters.pyi @@ -0,0 +1,185 @@ +from datetime import timedelta +from typing import final + +from typing_extensions import Self + +from .stream import ( + Compression, + ConsumerLimits, + DiscardPolicy, + PersistenceMode, + Placement, + Republish, + RetentionPolicy, + Source, + StorageType, + SubjectTransform, +) + +__all__ = ["CounterEntry", "Counters", "CountersConfig"] + +@final +class CountersConfig: + """Configuration for creating or updating a JetStream stream. + + This config is almost the same as `StreamConfig`, + but it has 2 predefined values; + + * allow_message_counter=true + * allow_direct=true + + These two are required for counters API to work. + + Attributes: + name: stream name. + subjects: list of subjects the stream listens on. + max_bytes: maximum total size of the stream in bytes. + max_messages: maximum number of messages in the stream. + max_messages_per_subject: maximum messages per subject. + discard: policy for discarding messages when limits are reached. + discard_new_per_subject: when True, apply discard policy per + subject. + retention: message retention policy. + max_consumers: maximum number of consumers. + max_age: maximum message age. + max_message_size: maximum size of a single message in bytes. + storage: storage backend type. + num_replicas: number of stream replicas. + no_ack: when True, disable publish acknowledgements. + duplicate_window: time window for duplicate detection. + template_owner: name of the owning stream template. + sealed: when True, the stream is read-only. + description: human-readable stream description. + allow_rollup: when True, allow ``Nats-Rollup`` header to purge + subjects. + deny_delete: when True, deny message deletion via the API. + deny_purge: when True, deny stream purge via the API. + republish: configuration for republishing messages. + mirror_direct: when True, enable direct get for mirror streams. + mirror: source configuration when the stream is a mirror. + sources: list of source configurations for aggregate streams. + metadata: custom key-value metadata. + subject_transform: subject transformation rule. + compression: compression algorithm for stored messages. + consumer_limits: default limits applied to new consumers. + first_sequence: initial sequence number for the stream. + placement: cluster and tag placement hints. + persist_mode: write persistence mode. + pause_until: timestamp until which the stream is paused. + allow_message_ttl: when True, allow per-message TTL. + subject_delete_marker_ttl: TTL for subject delete markers. + allow_atomic_publish: when True, enable atomic multi-message + publish. + allow_message_schedules: when True, enable scheduled message + delivery. + """ + + name: str + subjects: list[str] + max_bytes: int | None + max_messages: int | None + max_messages_per_subject: int | None + discard: DiscardPolicy | None + discard_new_per_subject: bool | None + retention: RetentionPolicy | None + max_consumers: int | None + max_age: timedelta | None + max_message_size: int | None + storage: StorageType | None + num_replicas: int | None + no_ack: bool | None + duplicate_window: timedelta | None + template_owner: str | None + sealed: bool | None + description: str | None + allow_rollup: bool | None + deny_delete: bool | None + deny_purge: bool | None + republish: Republish | None + mirror_direct: bool | None + mirror: Source | None + sources: list[Source] | None + metadata: dict[str, str] | None + subject_transform: SubjectTransform | None + compression: Compression | None + consumer_limits: ConsumerLimits | None + first_sequence: int | None + placement: Placement | None + persist_mode: PersistenceMode | None + pause_until: int | None + allow_message_ttl: bool | None + subject_delete_marker_ttl: timedelta | None + allow_atomic_publish: bool | None + allow_message_schedules: bool | None + + def __new__( + cls, + name: str, + subjects: list[str], + max_bytes: int | None = None, + max_messages: int | None = None, + max_messages_per_subject: int | None = None, + discard: DiscardPolicy | None = None, + discard_new_per_subject: bool | None = None, + retention: RetentionPolicy | None = None, + max_consumers: int | None = None, + max_age: float | timedelta | None = None, + max_message_size: int | None = None, + storage: StorageType | None = None, + num_replicas: int | None = None, + no_ack: bool | None = None, + duplicate_window: float | timedelta | None = None, + template_owner: str | None = None, + sealed: bool | None = None, + description: str | None = None, + allow_rollup: bool | None = None, + deny_delete: bool | None = None, + deny_purge: bool | None = None, + republish: Republish | None = None, + mirror_direct: bool | None = None, + mirror: Source | None = None, + sources: list[Source] | None = None, + metadata: dict[str, str] | None = None, + subject_transform: SubjectTransform | None = None, + compression: Compression | None = None, + consumer_limits: ConsumerLimits | None = None, + first_sequence: int | None = None, + placement: Placement | None = None, + persist_mode: PersistenceMode | None = None, + pause_until: int | None = None, + allow_message_ttl: bool | None = None, + subject_delete_marker_ttl: float | timedelta | None = None, + allow_atomic_publish: bool | None = None, + allow_message_schedules: bool | None = None, + ) -> Self: ... + +@final +class CounterEntry: + subject: str + value: int + sources: dict[str, dict[str, int]] + increment: int | None + +@final +class Counters: + async def add( + self, + key: str, + value: int, + timeout: float | timedelta | None = None, + ) -> int: ... + async def incr( + self, + key: str, + timeout: float | timedelta | None = None, + ) -> int: ... + async def decr( + self, + key: str, + timeout: float | timedelta | None = None, + ) -> int: ... + async def get( + self, + key: str, + timeout: float | timedelta | None = None, + ) -> CounterEntry: ... diff --git a/python/natsrpy/_natsrpy_rs/js/managers.pyi b/python/natsrpy/_natsrpy_rs/js/managers.pyi index 1caf8c6..1eca8e1 100644 --- a/python/natsrpy/_natsrpy_rs/js/managers.pyi +++ b/python/natsrpy/_natsrpy_rs/js/managers.pyi @@ -7,12 +7,14 @@ from .consumers import ( PushConsumer, PushConsumerConfig, ) +from .counters import Counters, CountersConfig from .kv import KeyValue, KVConfig from .object_store import ObjectStore, ObjectStoreConfig from .stream import Stream, StreamConfig __all__ = [ "ConsumersManager", + "CountersManager", "KVManager", "ObjectStoreManager", "StreamsManager", @@ -57,6 +59,45 @@ class StreamsManager: :return: the updated stream. """ +@final +class CountersManager: + """Manager for JetStream stream with counters support CRUD operations.""" + + async def create(self, config: CountersConfig) -> Counters: + """Create a new counters stream. + + :param config: stream configuration. + :return: the created stream. + """ + + async def create_or_update(self, config: CountersConfig) -> Counters: + """Create a counters stream or update it if it already exists. + + :param config: stream configuration. + :return: the created or updated stream. + """ + + async def get(self, name: str) -> Counters: + """Get an existing counters stream by name. + + :param name: stream name. + :return: the stream. + """ + + async def delete(self, name: str) -> bool: + """Delete a counters stream. + + :param name: stream name. + :return: True if the stream was deleted. + """ + + async def update(self, config: CountersConfig) -> Counters: + """Update an existing counters stream configuration. + + :param config: new stream configuration. + :return: the updated stream. + """ + @final class KVManager: """Manager for key-value bucket CRUD operations.""" diff --git a/python/natsrpy/js.py b/python/natsrpy/js.py index 9050186..8c09b75 100644 --- a/python/natsrpy/js.py +++ b/python/natsrpy/js.py @@ -10,6 +10,7 @@ PushConsumerConfig, ReplayPolicy, ) +from ._natsrpy_rs.js.counters import CounterEntry, Counters, CountersConfig from ._natsrpy_rs.js.kv import ( KeysIterator, KeyValue, @@ -53,6 +54,9 @@ "ClusterInfo", "Compression", "ConsumerLimits", + "CounterEntry", + "Counters", + "CountersConfig", "DeliverPolicy", "DiscardPolicy", "External", diff --git a/src/exceptions/rust_err.rs b/src/exceptions/rust_err.rs index 673f998..12ccac4 100644 --- a/src/exceptions/rust_err.rs +++ b/src/exceptions/rust_err.rs @@ -9,6 +9,10 @@ pub enum NatsrpyError { #[error(transparent)] StdIOError(#[from] std::io::Error), #[error(transparent)] + StdParseIntError(#[from] std::num::ParseIntError), + #[error(transparent)] + JSONParseError(#[from] serde_json::Error), + #[error(transparent)] UnknownError(#[from] Box), #[error("NATS session error: {0}")] SessionError(String), @@ -79,6 +83,8 @@ pub enum NatsrpyError { #[error(transparent)] StreamPurgeError(#[from] async_nats::jetstream::stream::PurgeError), #[error(transparent)] + StreamLastRawMessageError(#[from] async_nats::jetstream::stream::LastRawMessageError), + #[error(transparent)] PullMessageError(#[from] async_nats::jetstream::consumer::pull::MessagesError), #[error(transparent)] ConsumerError(#[from] async_nats::jetstream::stream::ConsumerError), diff --git a/src/js/counters.rs b/src/js/counters.rs index e69de29..7819cd0 100644 --- a/src/js/counters.rs +++ b/src/js/counters.rs @@ -0,0 +1,421 @@ +use std::{collections::HashMap, sync::Arc, time::Duration}; + +use async_nats::{HeaderMap, jetstream::context::traits::Publisher}; +use pyo3::{Bound, PyAny, Python}; +use tokio::sync::RwLock; + +use crate::{ + exceptions::rust_err::{NatsrpyError, NatsrpyResult}, + js::stream::{ + Compression, ConsumerLimits, DiscardPolicy, PersistenceMode, Placement, Republish, + RetentionPolicy, Source, StorageType, SubjectTransform, + }, + utils::{futures::natsrpy_future_with_timeout, py_types::TimeValue}, +}; + +const COUNTER_INCREMENT_HEADER: &str = "Nats-Incr"; +const COUNTER_SOURCES_HEADER: &str = "Nats-Counter-Sources"; +type CounterSources = HashMap>; + +#[pyo3::pyclass(from_py_object, get_all, set_all)] +#[derive(Debug, Clone, Default)] +#[allow(clippy::struct_excessive_bools)] +pub struct CountersConfig { + pub name: String, + pub subjects: Vec, + pub max_bytes: i64, + pub max_messages: i64, + pub max_messages_per_subject: i64, + pub discard: DiscardPolicy, + pub discard_new_per_subject: bool, + pub retention: RetentionPolicy, + pub max_consumers: i32, + pub max_age: Duration, + pub max_message_size: i32, + pub storage: StorageType, + pub num_replicas: usize, + pub no_ack: bool, + pub duplicate_window: Duration, + pub template_owner: String, + pub sealed: bool, + pub description: Option, + pub allow_rollup: bool, + pub deny_delete: bool, + pub deny_purge: bool, + pub republish: Option, + pub mirror_direct: bool, + pub mirror: Option, + pub sources: Option>, + pub metadata: HashMap, + pub subject_transform: Option, + pub compression: Option, + pub consumer_limits: Option, + pub first_sequence: Option, + pub placement: Option, + pub persist_mode: Option, + pub pause_until: Option, + pub allow_message_ttl: bool, + pub subject_delete_marker_ttl: Option, + pub allow_atomic_publish: bool, + pub allow_message_schedules: bool, +} + +#[pyo3::pymethods] +impl CountersConfig { + #[new] + #[pyo3(signature=( + name, + subjects, + max_bytes=None, + max_messages=None, + max_messages_per_subject=None, + discard=None, + discard_new_per_subject=None, + retention=None, + max_consumers=None, + max_age=None, + max_message_size=None, + storage=None, + num_replicas=None, + no_ack=None, + duplicate_window=None, + template_owner=None, + sealed=None, + description=None, + allow_rollup=None, + deny_delete=None, + deny_purge=None, + republish=None, + mirror_direct=None, + mirror=None, + sources=None, + metadata=None, + subject_transform=None, + compression=None, + consumer_limits=None, + first_sequence=None, + placement=None, + persist_mode=None, + pause_until=None, + allow_message_ttl=None, + subject_delete_marker_ttl=None, + allow_atomic_publish=None, + allow_message_schedules=None, + ))] + pub fn __new__( + name: String, + subjects: Vec, + max_bytes: Option, + max_messages: Option, + max_messages_per_subject: Option, + discard: Option, + discard_new_per_subject: Option, + retention: Option, + max_consumers: Option, + max_age: Option, + max_message_size: Option, + storage: Option, + num_replicas: Option, + no_ack: Option, + duplicate_window: Option, + template_owner: Option, + sealed: Option, + description: Option, + allow_rollup: Option, + deny_delete: Option, + deny_purge: Option, + republish: Option, + mirror_direct: Option, + mirror: Option, + sources: Option>, + metadata: Option>, + subject_transform: Option, + compression: Option, + consumer_limits: Option, + first_sequence: Option, + placement: Option, + persist_mode: Option, + pause_until: Option, + allow_message_ttl: Option, + subject_delete_marker_ttl: Option, + allow_atomic_publish: Option, + allow_message_schedules: Option, + ) -> Self { + Self { + name, + subjects, + description, + republish, + mirror, + sources, + subject_transform, + compression, + consumer_limits, + first_sequence, + placement, + persist_mode, + pause_until, + + subject_delete_marker_ttl: subject_delete_marker_ttl.map(Into::into), + max_bytes: max_bytes.unwrap_or_default(), + max_messages: max_messages.unwrap_or_default(), + max_messages_per_subject: max_messages_per_subject.unwrap_or_default(), + discard: discard.unwrap_or_default(), + discard_new_per_subject: discard_new_per_subject.unwrap_or_default(), + retention: retention.unwrap_or_default(), + max_consumers: max_consumers.unwrap_or_default(), + max_age: max_age.unwrap_or_default().into(), + max_message_size: max_message_size.unwrap_or_default(), + storage: storage.unwrap_or_default(), + num_replicas: num_replicas.unwrap_or_default(), + no_ack: no_ack.unwrap_or_default(), + duplicate_window: duplicate_window.unwrap_or_default().into(), + template_owner: template_owner.unwrap_or_default(), + sealed: sealed.unwrap_or_default(), + allow_rollup: allow_rollup.unwrap_or_default(), + deny_delete: deny_delete.unwrap_or_default(), + deny_purge: deny_purge.unwrap_or_default(), + mirror_direct: mirror_direct.unwrap_or_default(), + metadata: metadata.unwrap_or_default(), + allow_message_ttl: allow_message_ttl.unwrap_or_default(), + allow_atomic_publish: allow_atomic_publish.unwrap_or_default(), + allow_message_schedules: allow_message_schedules.unwrap_or_default(), + } + } +} + +impl TryFrom for async_nats::jetstream::stream::Config { + type Error = NatsrpyError; + + fn try_from(value: CountersConfig) -> Result { + let mut conf = Self { + name: value.name, + subjects: value.subjects, + description: value.description, + first_sequence: value.first_sequence, + subject_delete_marker_ttl: value.subject_delete_marker_ttl, + allow_direct: true, + allow_message_counter: true, + ..Default::default() + }; + + // Optional values that have defaults. + // If the value is not present, we just use the one + // that nats' config defaults to. + conf.max_bytes = value.max_bytes; + conf.max_messages = value.max_messages; + conf.max_messages_per_subject = value.max_messages_per_subject; + conf.discard_new_per_subject = value.discard_new_per_subject; + conf.max_consumers = value.max_consumers; + conf.max_age = value.max_age; + conf.max_message_size = value.max_message_size; + conf.num_replicas = value.num_replicas; + conf.no_ack = value.no_ack; + conf.duplicate_window = value.duplicate_window; + conf.template_owner = value.template_owner; + conf.sealed = value.sealed; + conf.allow_rollup = value.allow_rollup; + conf.deny_delete = value.deny_delete; + conf.deny_purge = value.deny_purge; + conf.mirror_direct = value.mirror_direct; + conf.metadata = value.metadata; + conf.allow_message_ttl = value.allow_message_ttl; + conf.allow_atomic_publish = value.allow_atomic_publish; + conf.allow_message_schedules = value.allow_message_schedules; + + // Values that require conversion between python -> rust types. + conf.republish = value.republish.map(Into::into); + conf.storage = value.storage.into(); + conf.retention = value.retention.into(); + conf.discard = value.discard.into(); + conf.mirror = value.mirror.map(TryInto::try_into).transpose()?; + conf.sources = value + .sources + .map(|sources| { + sources + .into_iter() + .map(TryInto::try_into) + .collect::, _>>() + }) + .transpose()?; + conf.subject_transform = value.subject_transform.map(Into::into); + conf.compression = value.compression.map(Into::into); + conf.consumer_limits = value.consumer_limits.map(Into::into); + conf.placement = value.placement.map(Into::into); + conf.persist_mode = value.persist_mode.map(Into::into); + conf.pause_until = value + .pause_until + .map(time::OffsetDateTime::from_unix_timestamp) + .transpose()?; + + Ok(conf) + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct CounterPayload<'a> { + val: &'a str, +} + +#[pyo3::pyclass(from_py_object, get_all)] +#[derive(Clone)] +pub struct CounterEntry { + pub subject: String, + pub value: i128, + pub sources: CounterSources, + pub increment: Option, +} + +impl TryFrom for CounterEntry { + type Error = NatsrpyError; + + fn try_from(value: async_nats::jetstream::message::StreamMessage) -> Result { + let counter_value = serde_json::from_slice::(&value.payload)? + .val + .parse::()?; + let sources = parse_sources(&value.headers)?; + let increment = parse_increment(&value.headers)?; + Ok(Self { + subject: value.subject.to_string(), + value: counter_value, + sources, + increment, + }) + } +} + +#[pyo3::pymethods] +impl CounterEntry { + pub fn __repr__(&self) -> String { + format!( + "CounterEntry", + self.subject, + self.value, + self.increment + .as_ref() + .map_or_else(|| String::from("None"), ToString::to_string) + ) + } +} + +#[pyo3::pyclass] +#[allow(dead_code)] +pub struct Counters { + stream: Arc>>, + js: Arc>, +} + +impl Counters { + pub fn new( + stream: async_nats::jetstream::stream::Stream, + js: Arc>, + ) -> Self { + Self { + stream: Arc::new(RwLock::new(stream)), + js, + } + } +} + +fn parse_sources(headers: &HeaderMap) -> NatsrpyResult { + let Some(sources) = headers.get(COUNTER_SOURCES_HEADER) else { + return Ok(CounterSources::new()); + }; + let raw_sources = + serde_json::from_str::>>(sources.as_str())?; + let mut sources = CounterSources::new(); + for (source_id, subjects) in raw_sources { + let mut subject_values = HashMap::new(); + for (subject, value_str) in subjects { + subject_values.insert(subject, value_str.parse()?); + } + sources.insert(source_id, subject_values); + } + + Ok(sources) +} + +pub fn parse_increment(headers: &HeaderMap) -> NatsrpyResult> { + let Some(header_value) = headers.get(COUNTER_INCREMENT_HEADER) else { + return Ok(None); + }; + Ok(Some(header_value.as_str().parse()?)) +} + +#[pyo3::pymethods] +impl Counters { + #[pyo3(signature=(key, value, timeout=None))] + pub fn add<'py>( + &self, + py: Python<'py>, + key: String, + value: i128, + timeout: Option, + ) -> NatsrpyResult> { + let js = self.js.clone(); + let mut headers = HeaderMap::new(); + headers.insert(COUNTER_INCREMENT_HEADER, value.to_string()); + natsrpy_future_with_timeout(py, timeout, async move { + let resp = js + .read() + .await + .publish_message(async_nats::jetstream::message::OutboundMessage { + subject: key.into(), + payload: bytes::Bytes::new(), + headers: Some(headers), + }) + .await? + .await?; + match &resp.value { + Some(val) => Ok(val.parse::()?), + None => Err(NatsrpyError::SessionError(String::from( + "Missing counter response value.", + ))), + } + }) + } + + #[pyo3(signature=(key, timeout=None))] + pub fn incr<'py>( + &self, + py: Python<'py>, + key: String, + timeout: Option, + ) -> NatsrpyResult> { + self.add(py, key, 1, timeout) + } + + #[pyo3(signature=(key, timeout=None))] + pub fn decr<'py>( + &self, + py: Python<'py>, + key: String, + timeout: Option, + ) -> NatsrpyResult> { + self.add(py, key, -1, timeout) + } + + #[pyo3(signature=(key, timeout=None))] + pub fn get<'py>( + &self, + py: Python<'py>, + key: String, + timeout: Option, + ) -> NatsrpyResult> { + let stream_guard = self.stream.clone(); + natsrpy_future_with_timeout(py, timeout, async move { + let message = stream_guard + .read() + .await + .direct_get_last_for_subject(key) + .await?; + CounterEntry::try_from(message) + }) + } +} + +#[pyo3::pymodule(submodule, name = "counters")] +pub mod pymod { + #[pymodule_export] + use super::{CounterEntry, Counters, CountersConfig}; +} diff --git a/src/js/jetstream.rs b/src/js/jetstream.rs index 212ad36..480da14 100644 --- a/src/js/jetstream.rs +++ b/src/js/jetstream.rs @@ -6,7 +6,10 @@ use tokio::sync::RwLock; use crate::{ exceptions::rust_err::{NatsrpyError, NatsrpyResult}, - js::managers::{kv::KVManager, object_store::ObjectStoreManager, streams::StreamsManager}, + js::managers::{ + counters::CountersManager, kv::KVManager, object_store::ObjectStoreManager, + streams::StreamsManager, + }, utils::{headers::NatsrpyHeadermapExt, natsrpy_future, py_types::SendableValue}, }; @@ -66,6 +69,12 @@ impl JetStream { ObjectStoreManager::new(self.ctx.clone()) } + #[getter] + #[must_use] + pub fn counters(&self) -> CountersManager { + CountersManager::new(self.ctx.clone()) + } + #[pyo3(signature = ( subject, payload, diff --git a/src/js/managers/counters.rs b/src/js/managers/counters.rs new file mode 100644 index 0000000..498c46f --- /dev/null +++ b/src/js/managers/counters.rs @@ -0,0 +1,105 @@ +use std::sync::Arc; + +use crate::{ + exceptions::rust_err::NatsrpyError, + js::counters::{Counters, CountersConfig}, +}; +use pyo3::{Bound, PyAny, Python}; +use tokio::sync::RwLock; + +use crate::{exceptions::rust_err::NatsrpyResult, js::stream::StreamConfig, utils::natsrpy_future}; + +#[pyo3::pyclass] +pub struct CountersManager { + ctx: Arc>, +} + +impl CountersManager { + pub const fn new(ctx: Arc>) -> Self { + Self { ctx } + } +} + +#[pyo3::pymethods] +impl CountersManager { + pub fn create<'py>( + &self, + py: Python<'py>, + config: CountersConfig, + ) -> NatsrpyResult> { + let ctx = self.ctx.clone(); + natsrpy_future(py, async move { + let js = ctx.read().await; + Ok(Counters::new( + js.create_stream(async_nats::jetstream::stream::Config::try_from(config)?) + .await?, + ctx.clone(), + )) + }) + } + + pub fn create_or_update<'py>( + &self, + py: Python<'py>, + config: CountersConfig, + ) -> NatsrpyResult> { + let ctx = self.ctx.clone(); + natsrpy_future(py, async move { + let info = ctx + .read() + .await + .create_or_update_stream(async_nats::jetstream::stream::Config::try_from(config)?) + .await?; + Ok(Counters::new( + ctx.read().await.get_stream(info.config.name).await?, + ctx.clone(), + )) + }) + } + + pub fn get<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx = self.ctx.clone(); + natsrpy_future(py, async move { + let stream = ctx.read().await.get_stream(&name).await?; + let config = stream.get_info().await?.config; + if !config.allow_direct { + return Err(NatsrpyError::SessionError(format!( + "Stream {name} doesn't allow direct get.", + ))); + } + if !config.allow_message_counter { + return Err(NatsrpyError::SessionError(format!( + "Stream {name} doesn't allow message counters.", + ))); + } + Ok(Counters::new(stream, ctx.clone())) + }) + } + + pub fn delete<'py>(&self, py: Python<'py>, name: String) -> NatsrpyResult> { + let ctx = self.ctx.clone(); + natsrpy_future(py, async move { + let js = ctx.read().await; + Ok(js.delete_stream(name).await?.success) + }) + } + + pub fn update<'py>( + &self, + py: Python<'py>, + config: StreamConfig, + ) -> NatsrpyResult> { + let ctx = self.ctx.clone(); + natsrpy_future(py, async move { + let info = ctx + .read() + .await + .update_stream(async_nats::jetstream::stream::Config::try_from(config)?) + .await?; + Ok(Counters::new( + ctx.read().await.get_stream(info.config.name).await?, + ctx.clone(), + )) + }) + } +} diff --git a/src/js/managers/mod.rs b/src/js/managers/mod.rs index 399b866..671e48e 100644 --- a/src/js/managers/mod.rs +++ b/src/js/managers/mod.rs @@ -1,4 +1,5 @@ pub mod consumers; +pub mod counters; pub mod kv; pub mod object_store; pub mod streams; @@ -8,6 +9,8 @@ pub mod pymod { #[pymodule_export] use super::consumers::ConsumersManager; #[pymodule_export] + use super::counters::CountersManager; + #[pymodule_export] use super::kv::KVManager; #[pymodule_export] use super::object_store::ObjectStoreManager; diff --git a/src/js/mod.rs b/src/js/mod.rs index 011300d..9915d3b 100644 --- a/src/js/mod.rs +++ b/src/js/mod.rs @@ -1,4 +1,5 @@ pub mod consumers; +pub mod counters; pub mod jetstream; pub mod kv; pub mod managers; @@ -19,6 +20,8 @@ pub mod pymod { #[pymodule_export] pub use super::consumers::pymod as consumers; #[pymodule_export] + pub use super::counters::pymod as counters; + #[pymodule_export] pub use super::kv::pymod as kv; #[pymodule_export] pub use super::managers::pymod as managers; diff --git a/src/utils/futures.rs b/src/utils/futures.rs index a6b2a10..d74baf1 100644 --- a/src/utils/futures.rs +++ b/src/utils/futures.rs @@ -14,6 +14,7 @@ where Ok(res) } +#[inline] pub fn natsrpy_future_with_timeout( py: Python, timeout: Option,