Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion src/api/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::api::models::{
AddMessageRequest, DeleteMessagesRequest, GetMessagesRequest, RetryMessagesRequest,
AddMessageRequest, DeleteMessagesRequest, GetMessagesRequest, ProcessingMessagesRequest,
RetryMessagesRequest,
};
use crate::services::MessageService;
use crate::types::Message;
Expand Down Expand Up @@ -29,6 +30,16 @@ pub async fn get_messages(
}
}

pub async fn processing_messages(
State(service): State<MessageService>,
Json(_request): Json<ProcessingMessagesRequest>,
) -> ApiResponse<Vec<Message>> {
match service.processing().await {
Ok(messages) => success(messages.values().cloned().collect::<Vec<Message>>()),
Err(message) => error(ApiError::BadRequest(Some(message))),
}
}

pub async fn delete_messages(
State(service): State<MessageService>,
Json(request): Json<DeleteMessagesRequest>,
Expand Down
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub fn create_api(service: MessageService) -> Router {
.route("/hello", get(health::check))
.route("/add", post(handlers::add_message))
.route("/get", post(handlers::get_messages))
.route("/processing", post(handlers::processing_messages))
.route("/delete", post(handlers::delete_messages))
.route("/purge", post(handlers::purge_messages))
.route("/retry", post(handlers::retry_messages))
Expand Down
3 changes: 3 additions & 0 deletions src/api/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ pub struct GetMessagesRequest {
pub count: Option<usize>,
}

#[derive(Serialize, Deserialize, Debug)]
pub struct ProcessingMessagesRequest;

#[derive(Serialize, Deserialize, Debug)]
pub struct DeleteMessagesRequest {
pub ids: Vec<String>,
Expand Down
6 changes: 5 additions & 1 deletion src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::config;
use crate::storage::traits::Storage;
use crate::types::Message;
use crate::types::{Message, MessageProcessing};
use std::sync::Arc;

#[derive(Clone)]
Expand Down Expand Up @@ -29,6 +29,10 @@ impl MessageService {
self.store.get(count).await
}

pub async fn processing(&self) -> Result<MessageProcessing, String> {
self.store.processing().await
}

pub async fn delete(&self, ids: Vec<String>) -> Result<(), String> {
Self::validate_ids(&ids)?;

Expand Down
35 changes: 30 additions & 5 deletions src/storage/memory/base.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use crate::types::{Message, MessageState};
use std::collections::HashMap;
use crate::types::{Message, MessageProcessing, MessageState};

pub struct BaseMemoryStorage {
queue: Vec<Message>,
processing: HashMap<String, Message>,
processing: MessageProcessing,
}

impl BaseMemoryStorage {
pub(crate) fn new() -> Self {
BaseMemoryStorage {
queue: Vec::new(),
processing: HashMap::new(),
processing: MessageProcessing::new(),
}
}

Expand All @@ -32,6 +31,10 @@ impl BaseMemoryStorage {
Ok(messages)
}

pub(crate) async fn processing(&self) -> Result<MessageProcessing, String> {
Ok(self.processing.clone())
}

pub(crate) async fn delete(&mut self, ids: Vec<String>) -> Result<(), String> {
for id in ids {
self.processing.remove(&id);
Expand Down Expand Up @@ -65,6 +68,8 @@ impl BaseMemoryStorage {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use super::*;

fn setup_storage() -> BaseMemoryStorage {
Expand All @@ -74,7 +79,7 @@ mod tests {
Message::new("Hello Solar System".to_string()),
Message::new("Hello Universe".to_string()),
],
processing: HashMap::new(),
processing: MessageProcessing::new(),
}
}

Expand Down Expand Up @@ -110,6 +115,26 @@ mod tests {
assert_eq!(storage.processing.len(), 2);
}

#[tokio::test]
async fn test_base_memory_storage_processing() {
let mut storage = setup_storage();

let processing_message = storage.processing().await.unwrap();
assert_eq!(processing_message.len(), 0);

let get_messages = storage
.get(2)
.await
.unwrap()
.into_iter()
.collect::<HashSet<_>>();
let processing_message = storage.processing().await.unwrap();

let messages = processing_message.into_values().collect::<HashSet<_>>();
assert_eq!(get_messages.len(), messages.len());
assert_eq!(get_messages, messages);
}

#[tokio::test]
async fn test_base_memory_storage_get_more_than_available() {
let mut storage = setup_storage();
Expand Down
7 changes: 6 additions & 1 deletion src/storage/memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::storage::traits::Storage;
use crate::types::Message;
use crate::types::{Message, MessageProcessing};
use async_trait::async_trait;
use base::BaseMemoryStorage;
use std::sync::Arc;
Expand Down Expand Up @@ -37,6 +37,11 @@ impl Storage for MemoryStorage {
storage.get(count).await
}

async fn processing(&self) -> Result<MessageProcessing, String> {
let storage = self.inner.lock().await;
storage.processing().await
}

async fn delete(&self, ids: Vec<String>) -> Result<(), String> {
let mut storage = self.inner.lock().await;
storage.delete(ids).await
Expand Down
3 changes: 2 additions & 1 deletion src/storage/traits.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::types::Message;
use crate::types::{Message, MessageProcessing};
use async_trait::async_trait;

#[async_trait]
pub trait Storage: Send + Sync {
async fn add(&self, msg: Message) -> Result<(), String>;
async fn get(&self, count: usize) -> Result<Vec<Message>, String>;
async fn processing(&self) -> Result<MessageProcessing, String>;
async fn delete(&self, ids: Vec<String>) -> Result<(), String>;
async fn purge(&self) -> Result<(), String>;
async fn retry(&self, ids: Vec<String>) -> Result<(), String>;
Expand Down
6 changes: 4 additions & 2 deletions src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Represents the current state of a message in the queue
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, PartialOrd, Eq, Ord, Hash)]
pub enum MessageState {
/// Message is available for processing
Ready,
Expand All @@ -14,7 +14,7 @@ pub enum MessageState {

/// Represents a message in the queue system.
/// Uses UUID v7 for time-ordered message IDs with embedded timestamps.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Message {
/// Unique identifier with embedded timestamp (UUID v7)
pub id: Uuid,
Expand Down Expand Up @@ -61,6 +61,8 @@ impl Message {
}
}

pub type MessageProcessing = std::collections::HashMap<String, Message>;

#[cfg(test)]
mod tests {
use super::*;
Expand Down
5 changes: 3 additions & 2 deletions tests/integration/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod add;
mod add;
mod delete;
pub mod get;
mod get;
mod processing;
mod purge;
mod retry;
32 changes: 32 additions & 0 deletions tests/integration/messages/processing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use crate::common::{create_post_request, send_request, setup_test_app};
use http::StatusCode;
use http_body_util::BodyExt;
use serde_json::json;
use std::collections::HashSet;
use tlq::types::Message;

#[tokio::test]
async fn test_processing_messages() {
let mut app = setup_test_app().into_service();

for i in 1..=5 {
let post_request = create_post_request("/add", json!({"body": format!("message {}", i)}));
send_request(&mut app, post_request).await;
}

let get_request = create_post_request("/get", json!({"count": 3}));
let response = send_request(&mut app, get_request).await;
assert_eq!(response.status(), StatusCode::OK);

let get_body = response.into_body().collect().await.unwrap().to_bytes();
let get_json = serde_json::from_slice::<HashSet<Message>>(&get_body).unwrap();
assert_eq!(get_json.len(), 3);

let processing_request = create_post_request("/processing", json!(null));
let response = send_request(&mut app, processing_request).await;
assert_eq!(response.status(), StatusCode::OK);

let processing_body = response.into_body().collect().await.unwrap().to_bytes();
let processing_json = serde_json::from_slice::<HashSet<Message>>(&processing_body).unwrap();
assert_eq!(get_json, processing_json);
}