Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["crates/*"]
resolver = "2"

[workspace.package]
version = "0.10.3"
version = "0.10.4"
edition = "2024"
rust-version = "1.88"
authors = ["init4"]
Expand Down
78 changes: 50 additions & 28 deletions crates/blobber/src/cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{BlobFetcherError, Blobs, FetchResult};
use crate::{BlobberError, BlobberResult, Blobs, FetchResult};
use alloy::consensus::{SidecarCoder, SimpleCoder, Transaction as _};
use alloy::eips::eip7691::MAX_BLOBS_PER_BLOCK_ELECTRA;
use alloy::eips::merge::EPOCH_SLOTS;
Expand All @@ -13,7 +13,7 @@ use std::{
time::Duration,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{Instrument, debug, error, info, instrument};
use tracing::{Instrument, debug_span, error, info, instrument, trace};

const BLOB_CACHE_SIZE: u32 = (MAX_BLOBS_PER_BLOCK_ELECTRA * EPOCH_SLOTS) as u32;
const CACHE_REQUEST_CHANNEL_SIZE: usize = (MAX_BLOBS_PER_BLOCK_ELECTRA * 2) as usize;
Expand Down Expand Up @@ -54,7 +54,7 @@ impl CacheHandle {
slot: usize,
tx_hash: B256,
version_hashes: Vec<B256>,
) -> FetchResult<Blobs> {
) -> BlobberResult<Blobs> {
let (resp, receiver) = oneshot::channel();

self.send(CacheInst::Retrieve {
Expand All @@ -66,7 +66,7 @@ impl CacheHandle {
})
.await;

receiver.await.map_err(|_| BlobFetcherError::missing_sidecar(tx_hash))
receiver.await.map_err(|_| BlobberError::missing_sidecar(tx_hash))
}

/// Fetch the blobs using [`Self::fetch_blobs`] and decode them to get the
Expand All @@ -76,24 +76,24 @@ impl CacheHandle {
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
mut coder: C,
) -> FetchResult<Bytes> {
) -> BlobberResult<Bytes> {
let tx_hash = extract.tx_hash();
let versioned_hashes = extract
.tx
.as_eip4844()
.ok_or_else(BlobFetcherError::non_4844_transaction)?
.ok_or_else(BlobberError::non_4844_transaction)?
.blob_versioned_hashes()
.expect("tx is eip4844");

let blobs = self.fetch_blobs(slot, tx_hash, versioned_hashes.to_owned()).await?;

coder
.decode_all(blobs.as_ref())
.ok_or_else(BlobFetcherError::blob_decode_error)?
.ok_or_else(BlobberError::blob_decode_error)?
.into_iter()
.find(|data| keccak256(data) == extract.block_data_hash())
.map(Into::into)
.ok_or_else(|| BlobFetcherError::block_data_not_found(tx_hash))
.ok_or_else(|| BlobberError::block_data_not_found(tx_hash))
}

/// Fetch the blobs using [`Self::fetch_blobs`] and decode them using
Expand All @@ -102,12 +102,21 @@ impl CacheHandle {
&self,
slot: usize,
extract: &ExtractedEvent<'_, Receipt, BlockSubmitted>,
) -> FetchResult<Bytes> {
) -> BlobberResult<Bytes> {
self.fetch_and_decode_with_coder(slot, extract, SimpleCoder::default()).await
}

/// Fetch the blobs, decode them using the provided coder, and construct a
/// Zenith block from the header and data.
///
/// # Returns
///
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
/// decoded.
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
/// decoded (e.g., due to a malformatted blob).
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
/// blobs.
pub async fn signet_block_with_coder<C: SidecarCoder>(
&self,
host_block_number: u64,
Expand All @@ -116,13 +125,28 @@ impl CacheHandle {
coder: C,
) -> FetchResult<ZenithBlock> {
let header = extract.ru_header(host_block_number);
self.fetch_and_decode_with_coder(slot, extract, coder)
.await
.map(|buf| ZenithBlock::from_header_and_data(header, buf))
let block_data = match self.fetch_and_decode_with_coder(slot, extract, coder).await {
Ok(buf) => buf,
Err(BlobberError::Decode(_)) => {
trace!("Failed to decode block data");
Bytes::default()
}
Err(BlobberError::Fetch(err)) => return Err(err),
};
Ok(ZenithBlock::from_header_and_data(header, block_data))
}

/// Fetch the blobs, decode them using [`SimpleCoder`], and construct a
/// Zenith block from the header and data.
///
/// # Returns
///
/// - `Ok(ZenithBlock)` if the block was successfully fetched and
/// decoded.
/// - `Ok(ZenithBlock)` with an EMPTY BLOCK if the block_data could not be
/// decoded (e.g., due to a malformatted blob).
/// - `Err(FetchError)` if there was an unrecoverable error fetching the
/// blobs.
pub async fn signet_block(
&self,
host_block_number: u64,
Expand Down Expand Up @@ -159,7 +183,7 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
slot: usize,
tx_hash: B256,
versioned_hashes: Vec<B256>,
) -> FetchResult<Blobs> {
) -> BlobberResult<Blobs> {
// Cache hit
if let Some(blobs) = self.cache.lock().unwrap().get(&(slot, tx_hash)) {
info!(target: "signet_blobber::BlobCacher", "Cache hit");
Expand All @@ -169,23 +193,21 @@ impl<Pool: TransactionPool + 'static> BlobCacher<Pool> {
// Cache miss, use the fetcher to retrieve blobs
// Retry fetching blobs up to `FETCH_RETRIES` times
for attempt in 1..=FETCH_RETRIES {
let blobs = self.fetcher.fetch_blobs(slot, tx_hash, &versioned_hashes).await;

match blobs {
Ok(blobs) => {
self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
return Ok(blobs);
}
Err(BlobFetcherError::Ignorable(e)) => {
debug!(target: "signet_blobber::BlobCacher", attempt, %e, "Blob fetch attempt failed.");
tokio::time::sleep(BETWEEN_RETRIES).await;
continue;
}
Err(e) => return Err(e), // unrecoverable error
}
let Ok(blobs) = self
.fetcher
.fetch_blobs(slot, tx_hash, &versioned_hashes)
.instrument(debug_span!("fetch_blobs_loop", attempt))
.await
else {
tokio::time::sleep(BETWEEN_RETRIES).await;
continue;
};

self.cache.lock().unwrap().insert((slot, tx_hash), blobs.clone());
return Ok(blobs);
}
error!(target: "signet_blobber::BlobCacher", "All fetch attempts failed");
Err(BlobFetcherError::missing_sidecar(tx_hash))
Err(BlobberError::missing_sidecar(tx_hash))
}

/// Processes the cache instructions.
Expand Down
64 changes: 35 additions & 29 deletions crates/blobber/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use alloy::{eips::eip2718::Eip2718Error, primitives::B256};
use reth::transaction_pool::BlobStoreError;

/// Fetch Result
pub type FetchResult<T, E = BlobFetcherError> = std::result::Result<T, E>;
/// Result using [`BlobFetcherError`] as the default error type.
pub type BlobberResult<T, E = BlobberError> = std::result::Result<T, E>;

/// Result using [`FetchError`] as the default error type.
pub type FetchResult<T> = BlobberResult<T, FetchError>;

/// Result using [`DecodeError`] as the default error type.
pub type DecodeResult<T> = BlobberResult<T, DecodeError>;

/// Unrecoverable blob fetching errors. These result in the node shutting
/// down. They occur when the blobstore is down or the sidecar is unretrievable.
#[derive(Debug, thiserror::Error)]
pub enum UnrecoverableBlobError {
pub enum FetchError {
/// Reqwest error
#[error(transparent)]
Reqwest(#[from] reqwest::Error),
Expand All @@ -30,7 +36,7 @@ pub enum UnrecoverableBlobError {

/// Ignorable blob fetching errors. These result in the block being skipped.
#[derive(Debug, thiserror::Error, Copy, Clone)]
pub enum IgnorableBlobError {
pub enum DecodeError {
/// Incorrect transaction type error
#[error("Non-4844 transaction")]
Non4844Transaction,
Expand All @@ -50,86 +56,86 @@ pub enum IgnorableBlobError {

/// Blob fetching errors
#[derive(Debug, thiserror::Error)]
pub enum BlobFetcherError {
pub enum BlobberError {
/// Unrecoverable blob fetching error
#[error(transparent)]
Unrecoverable(#[from] UnrecoverableBlobError),
Fetch(#[from] FetchError),
/// Ignorable blob fetching error
#[error(transparent)]
Ignorable(#[from] IgnorableBlobError),
Decode(#[from] DecodeError),
}

impl BlobFetcherError {
impl BlobberError {
/// Returns true if the error is ignorable
pub const fn is_ignorable(&self) -> bool {
matches!(self, Self::Ignorable(_))
pub const fn is_decode(&self) -> bool {
matches!(self, Self::Decode(_))
}

/// Returns true if the error is unrecoverable
pub const fn is_unrecoverable(&self) -> bool {
matches!(self, Self::Unrecoverable(_))
pub const fn is_fetch(&self) -> bool {
matches!(self, Self::Fetch(_))
}

/// Non-4844 transaction error
pub fn non_4844_transaction() -> Self {
IgnorableBlobError::Non4844Transaction.into()
DecodeError::Non4844Transaction.into()
}

/// Blob decode error
pub fn blob_decode_error() -> Self {
IgnorableBlobError::BlobDecodeError.into()
DecodeError::BlobDecodeError.into()
}

/// Blob decode error
pub fn block_decode_error(err: Eip2718Error) -> Self {
IgnorableBlobError::BlockDecodeError(err).into()
DecodeError::BlockDecodeError(err).into()
}

/// Blob decoded, but expected hash not found
pub fn block_data_not_found(tx: B256) -> Self {
IgnorableBlobError::BlockDataNotFound(tx).into()
DecodeError::BlockDataNotFound(tx).into()
}

/// Missing sidecar error
pub fn missing_sidecar(tx: B256) -> Self {
UnrecoverableBlobError::MissingSidecar(tx).into()
FetchError::MissingSidecar(tx).into()
}

/// Blob store error
pub fn blob_store(err: BlobStoreError) -> Self {
UnrecoverableBlobError::BlobStore(err).into()
FetchError::BlobStore(err).into()
}
}

impl From<BlobStoreError> for UnrecoverableBlobError {
impl From<BlobStoreError> for FetchError {
fn from(err: BlobStoreError) -> Self {
match err {
BlobStoreError::MissingSidecar(tx) => UnrecoverableBlobError::MissingSidecar(tx),
_ => UnrecoverableBlobError::BlobStore(err),
BlobStoreError::MissingSidecar(tx) => FetchError::MissingSidecar(tx),
_ => FetchError::BlobStore(err),
}
}
}

impl From<BlobStoreError> for BlobFetcherError {
impl From<BlobStoreError> for BlobberError {
fn from(err: BlobStoreError) -> Self {
Self::Unrecoverable(err.into())
Self::Fetch(err.into())
}
}

impl From<reqwest::Error> for BlobFetcherError {
impl From<reqwest::Error> for BlobberError {
fn from(err: reqwest::Error) -> Self {
Self::Unrecoverable(err.into())
Self::Fetch(err.into())
}
}

impl From<Eip2718Error> for BlobFetcherError {
impl From<Eip2718Error> for BlobberError {
fn from(err: Eip2718Error) -> Self {
Self::Ignorable(err.into())
Self::Decode(err.into())
}
}

impl From<url::ParseError> for BlobFetcherError {
impl From<url::ParseError> for BlobberError {
fn from(err: url::ParseError) -> Self {
Self::Unrecoverable(UnrecoverableBlobError::UrlParse(err))
Self::Fetch(FetchError::UrlParse(err))
}
}
Loading
Loading