Skip to content

Commit f0fe17d

Browse files
feat(audit): add UserOpEventPublisher for Kafka
1 parent 4a657a9 commit f0fe17d

File tree

2 files changed

+117
-1
lines changed

2 files changed

+117
-1
lines changed

crates/audit/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,19 @@ where
2626
}
2727
});
2828
}
29+
30+
pub fn connect_userop_audit_to_publisher<P>(
31+
event_rx: mpsc::UnboundedReceiver<UserOpEvent>,
32+
publisher: P,
33+
) where
34+
P: UserOpEventPublisher + 'static,
35+
{
36+
tokio::spawn(async move {
37+
let mut event_rx = event_rx;
38+
while let Some(event) = event_rx.recv().await {
39+
if let Err(e) = publisher.publish(event).await {
40+
error!(error = %e, "Failed to publish user op event");
41+
}
42+
}
43+
});
44+
}

crates/audit/src/publisher.rs

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::types::BundleEvent;
1+
use crate::types::{BundleEvent, UserOpEvent};
22
use anyhow::Result;
33
use async_trait::async_trait;
44
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -104,3 +104,103 @@ impl BundleEventPublisher for LoggingBundleEventPublisher {
104104
Ok(())
105105
}
106106
}
107+
108+
#[async_trait]
109+
pub trait UserOpEventPublisher: Send + Sync {
110+
async fn publish(&self, event: UserOpEvent) -> Result<()>;
111+
112+
async fn publish_all(&self, events: Vec<UserOpEvent>) -> Result<()>;
113+
}
114+
115+
#[derive(Clone)]
116+
pub struct KafkaUserOpEventPublisher {
117+
producer: FutureProducer,
118+
topic: String,
119+
}
120+
121+
impl KafkaUserOpEventPublisher {
122+
pub fn new(producer: FutureProducer, topic: String) -> Self {
123+
Self { producer, topic }
124+
}
125+
126+
async fn send_event(&self, event: &UserOpEvent) -> Result<()> {
127+
let user_op_hash = event.user_op_hash();
128+
let key = event.generate_event_key();
129+
let payload = serde_json::to_vec(event)?;
130+
131+
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
132+
133+
match self
134+
.producer
135+
.send(record, tokio::time::Duration::from_secs(5))
136+
.await
137+
{
138+
Ok(_) => {
139+
debug!(
140+
user_op_hash = %user_op_hash,
141+
topic = %self.topic,
142+
payload_size = payload.len(),
143+
"Successfully published user op event"
144+
);
145+
Ok(())
146+
}
147+
Err((err, _)) => {
148+
error!(
149+
user_op_hash = %user_op_hash,
150+
topic = %self.topic,
151+
error = %err,
152+
"Failed to publish user op event"
153+
);
154+
Err(anyhow::anyhow!("Failed to publish user op event: {err}"))
155+
}
156+
}
157+
}
158+
}
159+
160+
#[async_trait]
161+
impl UserOpEventPublisher for KafkaUserOpEventPublisher {
162+
async fn publish(&self, event: UserOpEvent) -> Result<()> {
163+
self.send_event(&event).await
164+
}
165+
166+
async fn publish_all(&self, events: Vec<UserOpEvent>) -> Result<()> {
167+
for event in events {
168+
self.send_event(&event).await?;
169+
}
170+
Ok(())
171+
}
172+
}
173+
174+
#[derive(Clone)]
175+
pub struct LoggingUserOpEventPublisher;
176+
177+
impl LoggingUserOpEventPublisher {
178+
pub fn new() -> Self {
179+
Self
180+
}
181+
}
182+
183+
impl Default for LoggingUserOpEventPublisher {
184+
fn default() -> Self {
185+
Self::new()
186+
}
187+
}
188+
189+
#[async_trait]
190+
impl UserOpEventPublisher for LoggingUserOpEventPublisher {
191+
async fn publish(&self, event: UserOpEvent) -> Result<()> {
192+
info!(
193+
user_op_hash = %event.user_op_hash(),
194+
event = ?event,
195+
"Received user op event"
196+
);
197+
Ok(())
198+
}
199+
200+
async fn publish_all(&self, events: Vec<UserOpEvent>) -> Result<()> {
201+
for event in events {
202+
self.publish(event).await?;
203+
}
204+
Ok(())
205+
}
206+
}

0 commit comments

Comments
 (0)