Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ yarn-debug.log*
yarn-error.log*
lerna-debug.log*
.pnpm-debug.log*
examples/jupiter-swap-postgres/logs

# Runtime data
pids
Expand Down
57 changes: 57 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 75 additions & 39 deletions datasources/rpc-block-crawler-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,19 @@ const BLOCK_INTERVAL: Duration = Duration::from_millis(100);
/// It uses a channel to send blocks to the task processor.
pub struct RpcBlockCrawler {
pub rpc_url: String,
pub start_slot: u64,
pub start_slot: Option<u64>,
pub end_slot: Option<u64>,
pub block_interval: Duration,
pub block_config: RpcBlockConfig,
pub max_concurrent_requests: usize,
pub channel_buffer_size: usize,
pub request_throttle: Option<Duration>,
}

impl RpcBlockCrawler {
pub fn new(
rpc_url: String,
start_slot: u64,
start_slot: Option<u64>,
end_slot: Option<u64>,
block_interval: Option<Duration>,
block_config: RpcBlockConfig,
Expand All @@ -59,6 +60,7 @@ impl RpcBlockCrawler {
block_interval: block_interval.unwrap_or(BLOCK_INTERVAL),
max_concurrent_requests: max_concurrent_requests.unwrap_or(MAX_CONCURRENT_REQUESTS),
channel_buffer_size: channel_buffer_size.unwrap_or(CHANNEL_BUFFER_SIZE),
request_throttle: None,
}
}
}
Expand All @@ -80,16 +82,29 @@ impl Datasource for RpcBlockCrawler {
));
let (block_sender, block_receiver) = mpsc::channel(self.channel_buffer_size);

let start_slot = match self.start_slot {
Some(start_slot) => start_slot,
None => {
log::info!("RpcBlockCrawler start_slot not provided; defaulting to latest slot");
rpc_client.get_slot().await.map_err(|err| {
carbon_core::error::Error::FailedToConsumeDatasource(format!(
"Failed to determine latest slot: {err}"
))
})?
}
};

let block_fetcher = block_fetcher(
rpc_client,
self.start_slot,
start_slot,
self.end_slot,
self.block_interval,
self.block_config,
block_sender,
self.max_concurrent_requests,
cancellation_token.clone(),
metrics.clone(),
self.request_throttle,
);

let task_processor = task_processor(
Expand Down Expand Up @@ -126,6 +141,7 @@ fn block_fetcher(
max_concurrent_requests: usize,
cancellation_token: CancellationToken,
metrics: Arc<MetricsCollection>,
request_throttle: Option<Duration>,
) -> JoinHandle<()> {
let rpc_client_clone = rpc_client.clone();
tokio::spawn(async move {
Expand Down Expand Up @@ -174,55 +190,73 @@ fn block_fetcher(
}
};

let retry_delay = block_interval;
fetch_stream
.map(|slot| {
let rpc_client = Arc::clone(&rpc_client);
let metrics = metrics.clone();
let block_config = block_config.clone();
let request_throttle = request_throttle;

async move {
let start = Instant::now();
match rpc_client.get_block_with_config(slot, block_config).await {
Ok(block) => {
let time_taken = start.elapsed().as_millis();
metrics
.record_histogram(
"block_crawler_blocks_fetch_times_milliseconds",
time_taken as f64,
)
.await
.unwrap_or_else(|value| {
log::error!("Error recording metric: {}", value)
});

metrics
.increment_counter("block_crawler_blocks_fetched", 1)
.await
.unwrap_or_else(|value| {
log::error!("Error recording metric: {}", value)
});

Some((slot, block))
loop {
if let Some(throttle) = request_throttle {
tokio::time::sleep(throttle).await;
}
Err(e) => {
// https://support.quicknode.com/hc/en-us/articles/16459608696721-Solana-RPC-Error-Code-Reference
// solana skippable errors
// -32004, // Block not available for slot x
// -32007, // Slot {} was skipped, or missing due to ledger jump to recent snapshot
// -32009, // Slot {} was skipped, or missing in long-term storage
if e.to_string().contains("-32009")
|| e.to_string().contains("-32004")
|| e.to_string().contains("-32007")
{
let start = Instant::now();
match rpc_client
.get_block_with_config(slot, block_config.clone())
.await
{
Ok(block) => {
let time_taken = start.elapsed().as_millis();
metrics
.increment_counter("block_crawler_blocks_skipped", 1)
.record_histogram(
"block_crawler_blocks_fetch_times_milliseconds",
time_taken as f64,
)
.await
.unwrap_or_else(|value| {
log::error!("Error recording metric: {}", value)
});
} else {
log::error!("Error fetching block at slot {}: {:?}", slot, e);

metrics
.increment_counter("block_crawler_blocks_fetched", 1)
.await
.unwrap_or_else(|value| {
log::error!("Error recording metric: {}", value)
});

break Some((slot, block));
}
Err(e) => {
let error_string = e.to_string();
// https://support.quicknode.com/hc/en-us/articles/16459608696721-Solana-RPC-Error-Code-Reference
// solana skippable errors
// -32004, // Block not available for slot x
// -32007, // Slot {} was skipped, or missing due to ledger jump to recent snapshot
// -32009, // Slot {} was skipped, or missing in long-term storage
if error_string.contains("-32009")
|| error_string.contains("-32004")
|| error_string.contains("-32007")
|| error_string.contains("429")
{
log::debug!(
"Block at slot {} not ready yet ({}); retrying...",
slot,
error_string
);
tokio::time::sleep(retry_delay).await;
continue;
} else {
log::error!(
"Error fetching block at slot {}: {:?}",
slot,
e
);
break None;
}
}
None
}
}
}
Expand Down Expand Up @@ -381,6 +415,7 @@ mod tests {
1,
cancellation_token.clone(),
Arc::new(MetricsCollection::new(vec![])),
None,
);

// Create a task to receive blocks
Expand Down Expand Up @@ -462,6 +497,7 @@ mod tests {
2,
cancellation_token.clone(),
Arc::new(MetricsCollection::new(vec![])),
None,
);

// Create a task to receive blocks
Expand Down
56 changes: 55 additions & 1 deletion datasources/rpc-transaction-crawler-datasource/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ use {
},
std::{collections::HashSet, str::FromStr, sync::Arc, time::Duration},
tokio::{
sync::mpsc::{self, Receiver, Sender},
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
task::JoinHandle,
time::Instant,
},
Expand Down Expand Up @@ -98,6 +101,7 @@ pub struct ConnectionConfig {
pub max_transaction_channel_size: Option<usize>,
pub retry_config: RetryConfig,
pub blocking_send: bool,
pub rate_limiter: Option<RateLimiter>,
}

impl ConnectionConfig {
Expand All @@ -118,6 +122,7 @@ impl ConnectionConfig {
max_signature_channel_size,
max_transaction_channel_size,
blocking_send,
rate_limiter: None,
}
}

Expand All @@ -130,6 +135,47 @@ impl ConnectionConfig {
max_signature_channel_size: None,
max_transaction_channel_size: None,
blocking_send: false,
rate_limiter: None,
}
}

pub fn with_rate_limit(mut self, requests_per_second: u32) -> Self {
if requests_per_second == 0 {
return self;
}
self.rate_limiter = Some(RateLimiter::new(requests_per_second));
self
}
}

#[derive(Debug, Clone)]
pub struct RateLimiter {
interval: Duration,
next_allowed: Arc<Mutex<Instant>>,
}

impl RateLimiter {
pub fn new(requests_per_second: u32) -> Self {
assert!(requests_per_second > 0, "requests_per_second must be > 0");
Self {
interval: Duration::from_secs_f64(1.0 / requests_per_second as f64),
next_allowed: Arc::new(Mutex::new(Instant::now())),
}
}

pub async fn acquire(&self) {
loop {
let wait_duration = {
let mut guard = self.next_allowed.lock().await;
let now = Instant::now();
if *guard <= now {
*guard = now + self.interval;
return;
}
*guard - now
};

tokio::time::sleep(wait_duration).await;
}
}
}
Expand Down Expand Up @@ -266,6 +312,10 @@ fn signature_fetcher(
let mut backoff = connection_config.retry_config.initial_backoff_ms;

loop {
if let Some(rate_limiter) = connection_config.rate_limiter.as_ref() {
rate_limiter.acquire().await;
}

match rpc_client.get_signatures_for_address_with_config(
&account,
GetConfirmedSignaturesForAddress2Config {
Expand Down Expand Up @@ -395,6 +445,10 @@ fn transaction_fetcher(
let mut backoff = connection_config.retry_config.initial_backoff_ms;

loop {
if let Some(rate_limiter) = connection_config.rate_limiter.as_ref() {
rate_limiter.acquire().await;
}

match rpc_client.get_transaction_with_config(
&signature,
RpcTransactionConfig {
Expand Down
Loading
Loading