7 changes: 1 addition & 6 deletions crates/core/src/lib.rs
@@ -61,10 +61,6 @@
//! in the pipeline. This module allows for the creation of custom data
//! processors that can be integrated into various stages of the pipeline.
//!
//! - **[`schema`]**: Defines transaction schemas, allowing for structured
//! parsing and validation of transaction data based on specified rules.
//! Supports complex nested instruction matching for comprehensive transaction
//! analysis.
//!
//! - **[`transaction`]**: Manages transaction data, including metadata
//! extraction and parsing. This module supports transaction validation and
@@ -96,7 +92,7 @@
//! TestProgramDecoder,
//! TestProgramAccountProcessor
//! )
//! .transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
//! .transaction(TestProgramTransactionProcessor)
//! .account_deletions(TestProgramAccountDeletionProcessor)
//! .build()?
//! .run()
@@ -140,7 +136,6 @@ pub mod pipeline;
#[cfg(feature = "postgres")]
pub mod postgres;
pub mod processor;
pub mod schema;
pub mod transaction;
pub mod transformers;

66 changes: 15 additions & 51 deletions crates/core/src/pipeline.rs
@@ -59,7 +59,6 @@ use {
AccountDecoder, AccountMetadata, AccountPipe, AccountPipes, AccountProcessorInputType,
},
account_deletion::{AccountDeletionPipe, AccountDeletionPipes},
collection::InstructionDecoderCollection,
datasource::{AccountDeletion, Datasource, Update},
error::CarbonResult,
instruction::{
@@ -68,12 +67,10 @@
},
metrics::{Metrics, MetricsCollection},
processor::Processor,
schema::TransactionSchema,
transaction::{TransactionPipe, TransactionPipes, TransactionProcessorInputType},
transformers,
},
core::time,
serde::de::DeserializeOwned,
std::{convert::TryInto, sync::Arc, time::Instant},
tokio_util::sync::CancellationToken,
};
@@ -191,7 +188,7 @@ pub const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 1_000;
/// TestProgramDecoder,
/// TestProgramAccountProcessor
/// )
/// .transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
/// .transaction(TestProgramTransactionProcessor)
/// .account_deletions(TestProgramAccountDeletionProcessor)
/// .channel_buffer_size(1000)
/// .build()?
@@ -1167,47 +1164,27 @@ impl PipelineBuilder {

/// Adds a transaction pipe for processing full transaction data.
///
/// This method requires a transaction schema for decoding and a `Processor`
/// to handle the processed transaction data.
/// This method creates a transaction pipe that processes the full transaction data.
///
/// # Parameters
///
/// - `schema`: A `TransactionSchema` used to match and interpret
/// transaction data.
/// - `processor`: A `Processor` that processes the decoded transaction
/// data.
/// - `processor`: A `Processor` that processes the full transaction data.
///
/// # Example
///
/// ```ignore
/// use carbon_core::pipeline::PipelineBuilder;
///
/// let builder = PipelineBuilder::new()
/// .transaction(MY_SCHEMA.clone(), MyTransactionProcessor);
/// .transaction(MyTransactionProcessor);
/// ```
pub fn transaction<T, U>(
pub fn transaction(
mut self,
processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
+ Send
+ Sync
+ 'static,
schema: Option<TransactionSchema<T>>,
) -> Self
where
T: InstructionDecoderCollection + 'static,
U: DeserializeOwned + Send + Sync + 'static,
{
log::trace!(
"transaction(self, schema: {:?}, processor: {:?})",
stringify!(schema),
stringify!(processor)
);
processor: impl Processor<InputType = TransactionProcessorInputType> + Send + Sync + 'static,
) -> Self {
log::trace!("transaction(self, processor: {:?})", stringify!(processor));
self.transaction_pipes
.push(Box::new(TransactionPipe::<T, U>::new(
schema,
processor,
vec![],
)));
.push(Box::new(TransactionPipe::new(processor, vec![])));
self
}
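
For reference, this is roughly what a processor plugs into the simplified `transaction` method. It is a minimal sketch, not code from this PR: it assumes the `Processor` trait keeps the shape used elsewhere in the crate (an associated `InputType` plus an async `process` method receiving the input and a `MetricsCollection` handle via `async_trait`), and `MyTransactionProcessor` is a hypothetical name.

```rust
use {
    async_trait::async_trait,
    carbon_core::{
        error::CarbonResult, metrics::MetricsCollection, processor::Processor,
        transaction::TransactionProcessorInputType,
    },
    std::sync::Arc,
};

// Hypothetical processor: no schema and no generic parameters are required anymore.
pub struct MyTransactionProcessor;

#[async_trait]
impl Processor for MyTransactionProcessor {
    // The pipe now hands over the full transaction data through this single,
    // non-generic input type.
    type InputType = TransactionProcessorInputType;

    async fn process(
        &mut self,
        _data: Self::InputType,
        _metrics: Arc<MetricsCollection>,
    ) -> CarbonResult<()> {
        // Inspect the transaction metadata and instructions here (assumed signature).
        Ok(())
    }
}
```

It is then registered exactly as in the updated doc example: `PipelineBuilder::new().transaction(MyTransactionProcessor)`.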

@@ -1221,8 +1198,6 @@ impl PipelineBuilder {
/// # Parameters
///
/// - `processor`: A `Processor` that processes the decoded transaction data
/// - `schema`: A `TransactionSchema` used to match and interpret
/// transaction data
/// - `filters`: A collection of filters that determine which transactions
/// should be processed
///
@@ -1240,31 +1215,20 @@
/// let filters = vec![Box::new(filter) as Box<dyn carbon_core::filter::Filter>];
///
/// let builder = PipelineBuilder::new()
/// .transaction_with_filters(MyTransactionProcessor, MY_SCHEMA.clone(), filters);
/// .transaction_with_filters(MyTransactionProcessor, filters);
/// ```
pub fn transaction_with_filters<T, U>(
mut self,
processor: impl Processor<InputType = TransactionProcessorInputType<T, U>>
+ Send
+ Sync
+ 'static,
schema: Option<TransactionSchema<T>>,
processor: impl Processor<InputType = TransactionProcessorInputType> + Send + Sync + 'static,
filters: Vec<Box<dyn Filter + Send + Sync + 'static>>,
) -> Self
where
T: InstructionDecoderCollection + 'static,
U: DeserializeOwned + Send + Sync + 'static,
{
) -> Self {
log::trace!(
"transaction_with_filters(self, schema: {:?}, processor: {:?}, filters: {:?})",
stringify!(schema),
"transaction_with_filters(self, processor: {:?}, filters: {:?})",
stringify!(processor),
stringify!(filters)
);
self.transaction_pipes
.push(Box::new(TransactionPipe::<T, U>::new(
schema, processor, filters,
)));
.push(Box::new(TransactionPipe::new(processor, filters)));
self
}

@@ -1396,7 +1360,7 @@ impl PipelineBuilder {
/// TestProgramDecoder,
/// TestProgramAccountProcessor
/// )
/// .transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
/// .transaction(TestProgramTransactionProcessor)
/// .account_deletions(TestProgramAccountDeletionProcessor)
/// .channel_buffer_size(1000)
/// .build()?;
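
For downstream code, the visible migration in this diff is simply dropping the schema argument from the builder calls. A condensed before/after fragment, reusing the placeholder names from the doc examples above (`TestProgramTransactionProcessor`, `MyTransactionProcessor`, `filters`); it is a fragment in the spirit of those `ignore` examples, not a standalone program:

```rust
// Before this change: a schema was paired with each transaction processor.
// .transaction(TEST_SCHEMA.clone(), TestProgramTransactionProcessor)
// .transaction_with_filters(MyTransactionProcessor, MY_SCHEMA.clone(), filters)

// After this change: only the processor (and, optionally, filters) is passed.
let builder = carbon_core::pipeline::PipelineBuilder::new()
    .transaction(TestProgramTransactionProcessor)
    .transaction_with_filters(MyTransactionProcessor, filters);
```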