diff --git a/src/api/handlers.rs b/src/api/handlers.rs index 59f6715..e97cd2a 100644 --- a/src/api/handlers.rs +++ b/src/api/handlers.rs @@ -1,5 +1,6 @@ use crate::api::models::{ - AddMessageRequest, DeleteMessagesRequest, GetMessagesRequest, RetryMessagesRequest, + AddMessageRequest, DeleteMessagesRequest, GetMessagesRequest, ProcessingMessagesRequest, + RetryMessagesRequest, }; use crate::services::MessageService; use crate::types::Message; @@ -29,6 +30,16 @@ pub async fn get_messages( } } +pub async fn processing_messages( + State(service): State, + Json(_request): Json, +) -> ApiResponse> { + match service.processing().await { + Ok(messages) => success(messages.values().cloned().collect::>()), + Err(message) => error(ApiError::BadRequest(Some(message))), + } +} + pub async fn delete_messages( State(service): State, Json(request): Json, diff --git a/src/api/mod.rs b/src/api/mod.rs index d51a2d4..54905fd 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -11,6 +11,7 @@ pub fn create_api(service: MessageService) -> Router { .route("/hello", get(health::check)) .route("/add", post(handlers::add_message)) .route("/get", post(handlers::get_messages)) + .route("/processing", post(handlers::processing_messages)) .route("/delete", post(handlers::delete_messages)) .route("/purge", post(handlers::purge_messages)) .route("/retry", post(handlers::retry_messages)) diff --git a/src/api/models.rs b/src/api/models.rs index cf315bb..ce683bc 100644 --- a/src/api/models.rs +++ b/src/api/models.rs @@ -10,6 +10,9 @@ pub struct GetMessagesRequest { pub count: Option, } +#[derive(Serialize, Deserialize, Debug)] +pub struct ProcessingMessagesRequest; + #[derive(Serialize, Deserialize, Debug)] pub struct DeleteMessagesRequest { pub ids: Vec, diff --git a/src/services/mod.rs b/src/services/mod.rs index 48fdbcd..54d068f 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,6 +1,6 @@ use crate::config; use crate::storage::traits::Storage; -use crate::types::Message; +use crate::types::{Message, MessageProcessing}; use std::sync::Arc; #[derive(Clone)] @@ -29,6 +29,10 @@ impl MessageService { self.store.get(count).await } + pub async fn processing(&self) -> Result { + self.store.processing().await + } + pub async fn delete(&self, ids: Vec) -> Result<(), String> { Self::validate_ids(&ids)?; diff --git a/src/storage/memory/base.rs b/src/storage/memory/base.rs index 5c96233..449af6b 100644 --- a/src/storage/memory/base.rs +++ b/src/storage/memory/base.rs @@ -1,16 +1,15 @@ -use crate::types::{Message, MessageState}; -use std::collections::HashMap; +use crate::types::{Message, MessageProcessing, MessageState}; pub struct BaseMemoryStorage { queue: Vec, - processing: HashMap, + processing: MessageProcessing, } impl BaseMemoryStorage { pub(crate) fn new() -> Self { BaseMemoryStorage { queue: Vec::new(), - processing: HashMap::new(), + processing: MessageProcessing::new(), } } @@ -32,6 +31,10 @@ impl BaseMemoryStorage { Ok(messages) } + pub(crate) async fn processing(&self) -> Result { + Ok(self.processing.clone()) + } + pub(crate) async fn delete(&mut self, ids: Vec) -> Result<(), String> { for id in ids { self.processing.remove(&id); @@ -65,6 +68,8 @@ impl BaseMemoryStorage { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; fn setup_storage() -> BaseMemoryStorage { @@ -74,7 +79,7 @@ mod tests { Message::new("Hello Solar System".to_string()), Message::new("Hello Universe".to_string()), ], - processing: HashMap::new(), + processing: MessageProcessing::new(), } } @@ -110,6 +115,26 @@ mod tests { assert_eq!(storage.processing.len(), 2); } + #[tokio::test] + async fn test_base_memory_storage_processing() { + let mut storage = setup_storage(); + + let processing_message = storage.processing().await.unwrap(); + assert_eq!(processing_message.len(), 0); + + let get_messages = storage + .get(2) + .await + .unwrap() + .into_iter() + .collect::>(); + let processing_message = storage.processing().await.unwrap(); + + let messages = processing_message.into_values().collect::>(); + assert_eq!(get_messages.len(), messages.len()); + assert_eq!(get_messages, messages); + } + #[tokio::test] async fn test_base_memory_storage_get_more_than_available() { let mut storage = setup_storage(); diff --git a/src/storage/memory/mod.rs b/src/storage/memory/mod.rs index 20afc09..7aa59d8 100644 --- a/src/storage/memory/mod.rs +++ b/src/storage/memory/mod.rs @@ -1,5 +1,5 @@ use crate::storage::traits::Storage; -use crate::types::Message; +use crate::types::{Message, MessageProcessing}; use async_trait::async_trait; use base::BaseMemoryStorage; use std::sync::Arc; @@ -37,6 +37,11 @@ impl Storage for MemoryStorage { storage.get(count).await } + async fn processing(&self) -> Result { + let storage = self.inner.lock().await; + storage.processing().await + } + async fn delete(&self, ids: Vec) -> Result<(), String> { let mut storage = self.inner.lock().await; storage.delete(ids).await diff --git a/src/storage/traits.rs b/src/storage/traits.rs index e9b929c..8a685b0 100644 --- a/src/storage/traits.rs +++ b/src/storage/traits.rs @@ -1,10 +1,11 @@ -use crate::types::Message; +use crate::types::{Message, MessageProcessing}; use async_trait::async_trait; #[async_trait] pub trait Storage: Send + Sync { async fn add(&self, msg: Message) -> Result<(), String>; async fn get(&self, count: usize) -> Result, String>; + async fn processing(&self) -> Result; async fn delete(&self, ids: Vec) -> Result<(), String>; async fn purge(&self) -> Result<(), String>; async fn retry(&self, ids: Vec) -> Result<(), String>; diff --git a/src/types/mod.rs b/src/types/mod.rs index e6100ad..89b1513 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; /// Represents the current state of a message in the queue -#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize, PartialOrd, Eq, Ord, Hash)] pub enum MessageState { /// Message is available for processing Ready, @@ -14,7 +14,7 @@ pub enum MessageState { /// Represents a message in the queue system. /// Uses UUID v7 for time-ordered message IDs with embedded timestamps. -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Message { /// Unique identifier with embedded timestamp (UUID v7) pub id: Uuid, @@ -61,6 +61,8 @@ impl Message { } } +pub type MessageProcessing = std::collections::HashMap; + #[cfg(test)] mod tests { use super::*; diff --git a/tests/integration/messages/mod.rs b/tests/integration/messages/mod.rs index d02f485..37def6d 100644 --- a/tests/integration/messages/mod.rs +++ b/tests/integration/messages/mod.rs @@ -1,5 +1,6 @@ -pub mod add; +mod add; mod delete; -pub mod get; +mod get; +mod processing; mod purge; mod retry; diff --git a/tests/integration/messages/processing.rs b/tests/integration/messages/processing.rs new file mode 100644 index 0000000..77bc745 --- /dev/null +++ b/tests/integration/messages/processing.rs @@ -0,0 +1,32 @@ +use crate::common::{create_post_request, send_request, setup_test_app}; +use http::StatusCode; +use http_body_util::BodyExt; +use serde_json::json; +use std::collections::HashSet; +use tlq::types::Message; + +#[tokio::test] +async fn test_processing_messages() { + let mut app = setup_test_app().into_service(); + + for i in 1..=5 { + let post_request = create_post_request("/add", json!({"body": format!("message {}", i)})); + send_request(&mut app, post_request).await; + } + + let get_request = create_post_request("/get", json!({"count": 3})); + let response = send_request(&mut app, get_request).await; + assert_eq!(response.status(), StatusCode::OK); + + let get_body = response.into_body().collect().await.unwrap().to_bytes(); + let get_json = serde_json::from_slice::>(&get_body).unwrap(); + assert_eq!(get_json.len(), 3); + + let processing_request = create_post_request("/processing", json!(null)); + let response = send_request(&mut app, processing_request).await; + assert_eq!(response.status(), StatusCode::OK); + + let processing_body = response.into_body().collect().await.unwrap().to_bytes(); + let processing_json = serde_json::from_slice::>(&processing_body).unwrap(); + assert_eq!(get_json, processing_json); +}