|
1 | 1 | use pyo3::{ |
2 | 2 | Bound, Py, PyAny, Python, |
3 | | - types::{PyBytes, PyDict}, |
| 3 | + types::{PyBytes, PyDateTime, PyDict}, |
4 | 4 | }; |
5 | 5 | use std::sync::Arc; |
6 | 6 | use tokio::sync::RwLock; |
7 | 7 |
|
8 | 8 | use crate::{ |
9 | 9 | exceptions::rust_err::{NatsrpyError, NatsrpyResult}, |
10 | | - utils::{natsrpy_future, py_types::TimeValue}, |
| 10 | + utils::{ |
| 11 | + natsrpy_future, |
| 12 | + py_types::{TimeValue, ToPyDate}, |
| 13 | + }, |
11 | 14 | }; |
12 | 15 |
|
| 16 | +#[derive(Debug, Clone)] |
| 17 | +pub struct JSInfo { |
| 18 | + pub domain: Option<String>, |
| 19 | + pub acc_hash: Option<String>, |
| 20 | + pub stream: String, |
| 21 | + pub consumer: String, |
| 22 | + pub stream_sequence: u64, |
| 23 | + pub consumer_sequence: u64, |
| 24 | + pub delivered: i64, |
| 25 | + pub pending: u64, |
| 26 | + pub published: time::OffsetDateTime, |
| 27 | + pub token: Option<String>, |
| 28 | +} |
| 29 | + |
| 30 | +impl From<async_nats::jetstream::message::Info<'_>> for JSInfo { |
| 31 | + fn from(value: async_nats::jetstream::message::Info) -> Self { |
| 32 | + Self { |
| 33 | + domain: value.domain.map(ToString::to_string), |
| 34 | + acc_hash: value.acc_hash.map(ToString::to_string), |
| 35 | + stream: value.stream.to_string(), |
| 36 | + consumer: value.consumer.to_string(), |
| 37 | + stream_sequence: value.stream_sequence, |
| 38 | + consumer_sequence: value.consumer_sequence, |
| 39 | + delivered: value.delivered, |
| 40 | + pending: value.pending, |
| 41 | + published: value.published, |
| 42 | + token: value.token.map(ToString::to_string), |
| 43 | + } |
| 44 | + } |
| 45 | +} |
| 46 | + |
13 | 47 | #[pyo3::pyclass] |
14 | 48 | pub struct JetStreamMessage { |
15 | 49 | message: crate::message::Message, |
| 50 | + info: JSInfo, |
16 | 51 | acker: Arc<RwLock<async_nats::jetstream::message::Acker>>, |
17 | 52 | } |
18 | 53 |
|
19 | 54 | impl TryFrom<async_nats::jetstream::Message> for JetStreamMessage { |
20 | 55 | type Error = NatsrpyError; |
21 | 56 |
|
22 | 57 | fn try_from(value: async_nats::jetstream::Message) -> Result<Self, Self::Error> { |
| 58 | + let js_info = JSInfo::from(value.info()?); |
23 | 59 | let (message, acker) = value.split(); |
24 | 60 | Ok(Self { |
25 | 61 | message: message.try_into()?, |
| 62 | + info: js_info, |
26 | 63 | acker: Arc::new(RwLock::new(acker)), |
27 | 64 | }) |
28 | 65 | } |
@@ -69,6 +106,57 @@ impl JetStreamMessage { |
69 | 106 | &self.message.headers |
70 | 107 | } |
71 | 108 |
|
| 109 | + #[getter] |
| 110 | + pub const fn domain(&mut self) -> &Option<String> { |
| 111 | + &self.info.domain |
| 112 | + } |
| 113 | + |
| 114 | + #[getter] |
| 115 | + #[must_use] |
| 116 | + pub const fn acc_hash(&self) -> &Option<String> { |
| 117 | + &self.info.acc_hash |
| 118 | + } |
| 119 | + #[getter] |
| 120 | + #[must_use] |
| 121 | + pub const fn stream(&self) -> &str { |
| 122 | + self.info.stream.as_str() |
| 123 | + } |
| 124 | + #[getter] |
| 125 | + #[must_use] |
| 126 | + pub const fn consumer(&self) -> &str { |
| 127 | + self.info.consumer.as_str() |
| 128 | + } |
| 129 | + #[getter] |
| 130 | + #[must_use] |
| 131 | + pub const fn stream_sequence(&self) -> u64 { |
| 132 | + self.info.stream_sequence |
| 133 | + } |
| 134 | + #[getter] |
| 135 | + #[must_use] |
| 136 | + pub const fn consumer_sequence(&self) -> u64 { |
| 137 | + self.info.consumer_sequence |
| 138 | + } |
| 139 | + #[getter] |
| 140 | + #[must_use] |
| 141 | + pub const fn delivered(&self) -> i64 { |
| 142 | + self.info.delivered |
| 143 | + } |
| 144 | + #[getter] |
| 145 | + #[must_use] |
| 146 | + pub const fn pending(&self) -> u64 { |
| 147 | + self.info.pending |
| 148 | + } |
| 149 | + #[getter] |
| 150 | + pub fn published<'py>(&self, py: Python<'py>) -> NatsrpyResult<Bound<'py, PyDateTime>> { |
| 151 | + Ok(self.info.published.to_py_date(py)?) |
| 152 | + } |
| 153 | + |
| 154 | + #[getter] |
| 155 | + #[must_use] |
| 156 | + pub const fn token(&self) -> &Option<String> { |
| 157 | + &self.info.token |
| 158 | + } |
| 159 | + |
72 | 160 | #[pyo3(signature=(double=false))] |
73 | 161 | pub fn ack<'py>(&self, py: Python<'py>, double: bool) -> NatsrpyResult<Bound<'py, PyAny>> { |
74 | 162 | self.inner_ack(py, async_nats::jetstream::message::AckKind::Ack, double) |
|
0 commit comments