diff --git a/.gitignore b/.gitignore index 0a24be5..4008828 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ ex/ .idea +.env diff --git a/examples/call_decode_output/src/main.rs b/examples/call_decode_output/src/main.rs index b249ec0..789fe15 100644 --- a/examples/call_decode_output/src/main.rs +++ b/examples/call_decode_output/src/main.rs @@ -1,9 +1,9 @@ use alloy_json_abi::Function; use anyhow::Context; use hypersync_client::{ + arrow_reader::TraceReader, net_types::{Query, TraceField, TraceFilter}, - simple_types::Trace, - ArrowResponseData, CallDecoder, Client, StreamConfig, + CallDecoder, Client, StreamConfig, }; const BALANCE_OF_SIGNATURE: &str = @@ -42,33 +42,26 @@ async fn main() -> anyhow::Result<()> { let mut rx = client.clone().stream_arrow(query, config).await?; - fn convert_traces(arrow_response_data: ArrowResponseData) -> Vec { - arrow_response_data - .traces - .iter() - .flat_map(Trace::from_arrow) - .collect() - } - while let Some(result) = rx.recv().await { match result { Ok(response) => { println!("Received response"); - let traces = convert_traces(response.data); - for trace in traces { - if let (Some(input), Some(output)) = (trace.input, trace.output) { - if let Some(args) = decoder - .decode_input(&input) - .context("Failed to decode input")? - { - let address = args[0].as_address().unwrap(); - if let Some(results) = decoder - .decode_output(&output, BALANCE_OF_SIGNATURE) - .context("Failed to decode output")? + for batch in response.data.traces { + for trace in TraceReader::iter(&batch) { + if let (Some(input), Some(output)) = (trace.input()?, trace.output()?) { + if let Some(args) = decoder + .decode_input(&input) + .context("Failed to decode input")? { - if !results.is_empty() { - let (balance, _) = results[0].as_uint().unwrap(); - println!("ADDRESS {address} : {balance} DAI"); + let address = args[0].as_address().unwrap(); + if let Some(results) = decoder + .decode_output(&output, BALANCE_OF_SIGNATURE) + .context("Failed to decode output")? + { + if !results.is_empty() { + let (balance, _) = results[0].as_uint().unwrap(); + println!("ADDRESS {address} : {balance} DAI"); + } } } } diff --git a/hypersync-client/Cargo.toml b/hypersync-client/Cargo.toml index 33e8cb7..77e4dbd 100644 --- a/hypersync-client/Cargo.toml +++ b/hypersync-client/Cargo.toml @@ -45,6 +45,7 @@ hypersync-format = { path = "../hypersync-format", version = "0.6.0" } hypersync-schema = { path = "../hypersync-schema", version = "0.3.1" } reqwest-eventsource = "0.6" uuid = { version = "1", features = ["v4"] } +thiserror = "2.0.17" [dependencies.reqwest] version = "0.12" @@ -55,3 +56,4 @@ features = ["json", "rustls-tls", "stream"] maplit = "1" hex-literal = "0.4" env_logger = "0.11" +dotenvy = "0.15.7" diff --git a/hypersync-client/src/arrow_reader.rs b/hypersync-client/src/arrow_reader.rs new file mode 100644 index 0000000..74f0791 --- /dev/null +++ b/hypersync-client/src/arrow_reader.rs @@ -0,0 +1,1652 @@ +//! Reader types for reading Arrow record batch data as native Rust types. +//! +//! This module provides zero-copy readers that access Arrow columnar data directly +//! without copying or allocating new memory for individual field access. + +use anyhow::Context; +use arrow::{ + array::{ + Array, ArrayAccessor, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array, + UInt8Array, + }, + datatypes::DataType, +}; +use hypersync_format::{ + AccessList, Address, Authorization, BlockNumber, Data, FixedSizeData, Hash, LogIndex, Quantity, + TransactionIndex, TransactionStatus, TransactionType, Withdrawal, +}; +use hypersync_net_types::{BlockField, LogField, TraceField, TransactionField}; + +type ColResult = std::result::Result; + +/// Error that occurs when trying to access a column in Arrow data. +#[derive(Debug, thiserror::Error)] +#[error("column {col_name} {err}")] +pub struct ColumnError { + /// The name of the column that caused the error. + pub col_name: &'static str, + /// The specific type of column error that occurred. + pub err: ColumnErrorType, +} + +impl ColumnError { + fn not_found(col_name: &'static str) -> Self { + Self { + col_name, + err: ColumnErrorType::NotFound, + } + } + + fn wrong_type( + col_name: &'static str, + expected_type: &'static str, + actual_type: DataType, + ) -> Self { + Self { + col_name, + err: ColumnErrorType::WrongType { + expected_type, + actual_type, + }, + } + } +} + +/// The specific type of error that can occur when accessing a column. +#[derive(Debug, thiserror::Error)] +pub enum ColumnErrorType { + /// The column was not found in the Arrow schema. + #[error("not found")] + NotFound, + /// The column exists but has a different type than expected. + #[error("expected to be of type {expected_type} but found {actual_type}")] + WrongType { + /// The expected Arrow data type. + expected_type: &'static str, + /// The actual Arrow data type found in the schema. + actual_type: DataType, + }, +} + +fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &'static str) -> ColResult<&'a T> { + match batch.column_by_name(col_name) { + None => Err(ColumnError::not_found(col_name)), + Some(c) => { + let Some(val) = c.as_any().downcast_ref::() else { + let expected_type = std::any::type_name::(); + let actual_type = c.data_type().clone(); + return Err(ColumnError::wrong_type( + col_name, + expected_type, + actual_type, + )); + }; + Ok(val) + } + } +} + +/// Error that can occur when reading data from Arrow columns. +#[derive(Debug, thiserror::Error)] +pub enum ReadError { + /// A value was expected to be non-null but was null. + #[error("value was unexpectedly null")] + UnexpectedNull, + /// An error occurred while accessing a column. + #[error(transparent)] + ColumnError(#[from] ColumnError), + /// An error occurred during data type conversion. + #[error("{0:?}")] + ConversionError(anyhow::Error), +} + +/// A reader for accessing individual row data from an Arrow RecordBatch. +/// +/// This struct provides zero-copy access to columnar data in Arrow format, +/// allowing efficient reading of specific fields from a single row. +pub struct ArrowRowReader<'a> { + batch: &'a RecordBatch, + row_idx: usize, +} + +impl<'a> ArrowRowReader<'a> { + /// Safely create a new reader for the given batch at row index and check + /// that row_idx is within the bounds of the batch. + fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result { + let len = if let Some(first_column) = batch.columns().first() { + first_column.len() + } else { + 0 + }; + if row_idx >= len { + anyhow::bail!("row index out of bounds"); + } + + Ok(Self { batch, row_idx }) + } + + /// Read and convert the value at col_name that could be null + fn get_nullable(&self, col_name: &'static str) -> Result, ReadError> + where + Col: 'static, + &'a Col: ArrayAccessor, + + <&'a Col as ArrayAccessor>::Item: TryInto, + <<&'a Col as ArrayAccessor>::Item as TryInto>::Error: + std::error::Error + Send + Sync + 'static, + { + let arr = column_as::(self.batch, col_name)?; + + if arr.is_valid(self.row_idx) { + let value = arr.value(self.row_idx); + let converted: T = value + .try_into() + .map_err(|e| ReadError::ConversionError(anyhow::Error::new(e)))?; + Ok(Some(converted)) + } else { + Ok(None) + } + } + + /// Read and convert the value at col_name where it should not be null + fn get(&self, col_name: &'static str) -> Result + where + Col: 'static, + &'a Col: ArrayAccessor, + + <&'a Col as ArrayAccessor>::Item: TryInto, + <<&'a Col as ArrayAccessor>::Item as TryInto>::Error: + std::error::Error + Send + Sync + 'static, + { + match self.get_nullable::(col_name) { + Ok(Some(val)) => Ok(val), + Ok(None) => Err(ReadError::UnexpectedNull), + Err(e) => Err(e), + } + } +} + +/// Iterator over log rows in an RecordBatch. +pub struct ArrowRowIterator<'a, R> { + batch: &'a RecordBatch, + current_idx: usize, + len: usize, + phantom: std::marker::PhantomData, +} + +impl<'a, R: From>> ArrowRowIterator<'a, R> { + /// Create a new iterator for the given batch. + pub fn new(batch: &'a RecordBatch) -> Self { + let len = if let Some(first_column) = batch.columns().first() { + first_column.len() + } else { + 0 + }; + Self { + batch, + current_idx: 0, + len, + phantom: std::marker::PhantomData, + } + } +} + +impl<'a, R: From>> Iterator for ArrowRowIterator<'a, R> { + type Item = R; + + fn next(&mut self) -> Option { + if self.current_idx < self.len { + let reader = ArrowRowReader { + batch: self.batch, + row_idx: self.current_idx, + }; + self.current_idx += 1; + Some(reader.into()) + } else { + None + } + } + + fn size_hint(&self) -> (usize, Option) { + let remaining = self.len - self.current_idx; + (remaining, Some(remaining)) + } +} + +impl<'a, R: From>> ExactSizeIterator for ArrowRowIterator<'a, R> { + fn len(&self) -> usize { + self.len - self.current_idx + } +} + +/// Reader for log data from Arrow batches. +/// +/// Provides efficient access to log fields without copying data from the underlying +/// Arrow columnar format. Each reader is bound to a specific row in the batch. +pub struct LogReader<'a> { + inner: ArrowRowReader<'a>, +} + +impl<'a> From> for LogReader<'a> { + fn from(inner: ArrowRowReader<'a>) -> Self { + Self { inner } + } +} + +/// Iterator over log rows in an RecordBatch. +pub type LogIterator<'a> = ArrowRowIterator<'a, LogReader<'a>>; + +impl<'a> LogReader<'a> { + /// Safely create a new reader for the given batch at row index and check + /// that row_idx is within the bounds of the batch. + pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result { + let inner = ArrowRowReader::new(batch, row_idx)?; + Ok(Self { inner }) + } + /// Create an iterator over all rows in the batch. + pub fn iter(batch: &'a RecordBatch) -> LogIterator<'a> { + LogIterator::new(batch) + } + + /// The boolean value indicating if the event was removed from the blockchain due + /// to a chain reorganization. True if the log was removed. False if it is a valid log. + pub fn removed(&self) -> Result, ReadError> { + self.inner + .get_nullable::(LogField::Removed.as_ref()) + } + + /// The integer identifying the index of the event within the block's list of events. + pub fn log_index(&self) -> Result { + self.inner + .get::(LogField::LogIndex.as_ref()) + } + + /// The integer index of the transaction within the block's list of transactions. + pub fn transaction_index(&self) -> Result { + self.inner + .get::(LogField::TransactionIndex.as_ref()) + } + + /// The hash of the transaction that triggered the event. + pub fn transaction_hash(&self) -> Result { + self.inner + .get::(LogField::TransactionHash.as_ref()) + } + + /// The hash of the block in which the event was included. + pub fn block_hash(&self) -> Result { + self.inner + .get::(LogField::BlockHash.as_ref()) + } + + /// The block number in which the event was included. + pub fn block_number(&self) -> Result { + self.inner + .get::(LogField::BlockNumber.as_ref()) + } + + /// The contract address from which the event originated. + pub fn address(&self) -> Result { + self.inner + .get::(LogField::Address.as_ref()) + } + + /// The first topic of the event (topic0). + pub fn topic0(&self) -> Result>, ReadError> { + self.inner + .get_nullable::>(LogField::Topic0.as_ref()) + } + + /// The second topic of the event (topic1). + pub fn topic1(&self) -> Result>, ReadError> { + self.inner + .get_nullable::>(LogField::Topic1.as_ref()) + } + + /// The third topic of the event (topic2). + pub fn topic2(&self) -> Result>, ReadError> { + self.inner + .get_nullable::>(LogField::Topic2.as_ref()) + } + + /// The fourth topic of the event (topic3). + pub fn topic3(&self) -> Result>, ReadError> { + self.inner + .get_nullable::>(LogField::Topic3.as_ref()) + } + + /// The non-indexed data that was emitted along with the event. + pub fn data(&self) -> Result { + self.inner.get::(LogField::Data.as_ref()) + } +} + +/// Reader for block data from Arrow batches. +/// +/// Provides efficient access to block fields without copying data from the underlying +/// Arrow columnar format. Each reader is bound to a specific row in the batch. +pub struct BlockReader<'a> { + inner: ArrowRowReader<'a>, +} + +impl<'a> From> for BlockReader<'a> { + fn from(inner: ArrowRowReader<'a>) -> Self { + Self { inner } + } +} + +/// Iterator over block rows in an RecordBatch. +pub type BlockIterator<'a> = ArrowRowIterator<'a, BlockReader<'a>>; + +impl<'a> BlockReader<'a> { + /// Safely create a new reader for the given batch at row index and check + /// that row_idx is within the bounds of the batch. + pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result { + let inner = ArrowRowReader::new(batch, row_idx)?; + Ok(Self { inner }) + } + + /// Create an iterator over all rows in the batch. + pub fn iter(batch: &'a RecordBatch) -> BlockIterator<'a> { + BlockIterator::new(batch) + } + + /// The block number. + pub fn number(&self) -> Result { + self.inner + .get::(BlockField::Number.as_ref()) + } + + /// The block hash. + pub fn hash(&self) -> Result { + self.inner + .get::(BlockField::Hash.as_ref()) + } + + /// The parent block hash. + pub fn parent_hash(&self) -> Result { + self.inner + .get::(BlockField::ParentHash.as_ref()) + } + + /// The block nonce. + pub fn nonce(&self) -> Result>, ReadError> { + self.inner + .get_nullable::>(BlockField::Nonce.as_ref()) + } + + /// The SHA3 hash of the uncles. + pub fn sha3_uncles(&self) -> Result { + self.inner + .get::(BlockField::Sha3Uncles.as_ref()) + } + + /// The Bloom filter for the logs of the block. + pub fn logs_bloom(&self) -> Result { + self.inner + .get::(BlockField::LogsBloom.as_ref()) + } + + /// The root of the transaction trie of the block. + pub fn transactions_root(&self) -> Result { + self.inner + .get::(BlockField::TransactionsRoot.as_ref()) + } + + /// The root of the final state trie of the block. + pub fn state_root(&self) -> Result { + self.inner + .get::(BlockField::StateRoot.as_ref()) + } + + /// The root of the receipts trie of the block. + pub fn receipts_root(&self) -> Result { + self.inner + .get::(BlockField::ReceiptsRoot.as_ref()) + } + + /// The address of the beneficiary to whom the mining rewards were given. + pub fn miner(&self) -> Result { + self.inner + .get::(BlockField::Miner.as_ref()) + } + + /// The difficulty of the block. + pub fn difficulty(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::Difficulty.as_ref()) + } + + /// The total difficulty of the chain until this block. + pub fn total_difficulty(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::TotalDifficulty.as_ref()) + } + + /// The "extra data" field of this block. + pub fn extra_data(&self) -> Result { + self.inner + .get::(BlockField::ExtraData.as_ref()) + } + + /// The size of this block in bytes. + pub fn size(&self) -> Result { + self.inner + .get::(BlockField::Size.as_ref()) + } + + /// The maximum gas allowed in this block. + pub fn gas_limit(&self) -> Result { + self.inner + .get::(BlockField::GasLimit.as_ref()) + } + + /// The total used gas by all transactions in this block. + pub fn gas_used(&self) -> Result { + self.inner + .get::(BlockField::GasUsed.as_ref()) + } + + /// The unix timestamp for when the block was collated. + pub fn timestamp(&self) -> Result { + self.inner + .get::(BlockField::Timestamp.as_ref()) + } + + /// Array of uncle hashes. + pub fn uncles(&self) -> Result>>, ReadError> { + let all = self + .inner + .get_nullable::(BlockField::Uncles.as_ref())?; + let Some(data) = all else { + return Ok(None); + }; + let mut uncles = Vec::new(); + for uncle_bytes in data.chunks(32) { + let uncle = FixedSizeData::<32>::try_from(uncle_bytes) + .context("convert uncle bytes to uncle") + .map_err(ReadError::ConversionError)?; + uncles.push(uncle); + } + Ok(Some(uncles)) + } + + /// The base fee per gas. + pub fn base_fee_per_gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::BaseFeePerGas.as_ref()) + } + + /// The total amount of blob gas consumed by the transactions in the block. + pub fn blob_gas_used(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::BlobGasUsed.as_ref()) + } + + /// A running total of blob gas consumed in excess of the target. + pub fn excess_blob_gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::ExcessBlobGas.as_ref()) + } + + /// The hash of the parent beacon block. + pub fn parent_beacon_block_root(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::ParentBeaconBlockRoot.as_ref()) + } + + /// The root of the withdrawal trie. + pub fn withdrawals_root(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::WithdrawalsRoot.as_ref()) + } + + /// The withdrawals in the block. + pub fn withdrawals(&self) -> Result>, ReadError> { + let withdrawals_bin = self + .inner + .get_nullable::(BlockField::Withdrawals.as_ref())?; + let Some(withdrawals_bin) = withdrawals_bin else { + return Ok(None); + }; + + let deser = bincode::deserialize(&withdrawals_bin) + .context("deserialize withdrawals") + .map_err(ReadError::ConversionError)?; + + Ok(Some(deser)) + } + + /// The L1 block number. + pub fn l1_block_number(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::L1BlockNumber.as_ref()) + } + + /// The send count. + pub fn send_count(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::SendCount.as_ref()) + } + + /// The send root. + pub fn send_root(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::SendRoot.as_ref()) + } + + /// The mix hash. + pub fn mix_hash(&self) -> Result, ReadError> { + self.inner + .get_nullable::(BlockField::MixHash.as_ref()) + } +} + +/// Reader for transaction data from Arrow batches. +/// +/// Provides efficient access to transaction fields without copying data from the underlying +/// Arrow columnar format. Each reader is bound to a specific row in the batch. +pub struct TransactionReader<'a> { + inner: ArrowRowReader<'a>, +} + +impl<'a> From> for TransactionReader<'a> { + fn from(inner: ArrowRowReader<'a>) -> Self { + Self { inner } + } +} + +/// Iterator over transaction rows in an RecordBatch. +pub type TransactionIterator<'a> = ArrowRowIterator<'a, TransactionReader<'a>>; + +impl<'a> TransactionReader<'a> { + /// Safely create a new reader for the given batch at row index and check + /// that row_idx is within the bounds of the batch. + pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result { + let inner = ArrowRowReader::new(batch, row_idx)?; + Ok(Self { inner }) + } + + /// Create an iterator over all rows in the batch. + pub fn iter(batch: &'a RecordBatch) -> TransactionIterator<'a> { + TransactionIterator::new(batch) + } + + /// The hash of the block in which this transaction was included. + pub fn block_hash(&self) -> Result { + self.inner + .get::(TransactionField::BlockHash.as_ref()) + } + + /// The number of the block in which this transaction was included. + pub fn block_number(&self) -> Result { + self.inner + .get::(TransactionField::BlockNumber.as_ref()) + } + + /// The address of the sender. + pub fn from(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::From.as_ref()) + } + + /// The gas limit provided by the sender. + pub fn gas(&self) -> Result { + self.inner + .get::(TransactionField::Gas.as_ref()) + } + + /// The gas price willing to be paid by the sender. + pub fn gas_price(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::GasPrice.as_ref()) + } + + /// The hash of this transaction. + pub fn hash(&self) -> Result { + self.inner + .get::(TransactionField::Hash.as_ref()) + } + + /// The data sent along with the transaction. + pub fn input(&self) -> Result { + self.inner + .get::(TransactionField::Input.as_ref()) + } + + /// The number of transactions made by the sender prior to this one. + pub fn nonce(&self) -> Result { + self.inner + .get::(TransactionField::Nonce.as_ref()) + } + + /// The address of the receiver. + pub fn to(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::To.as_ref()) + } + + /// The index of the transaction in the block. + pub fn transaction_index(&self) -> Result { + self.inner + .get::(TransactionField::TransactionIndex.as_ref()) + } + + /// The value transferred. + pub fn value(&self) -> Result { + self.inner + .get::(TransactionField::Value.as_ref()) + } + + /// ECDSA recovery id. + pub fn v(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::V.as_ref()) + } + + /// ECDSA signature r. + pub fn r(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::R.as_ref()) + } + + /// ECDSA signature s. + pub fn s(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::S.as_ref()) + } + + /// Maximum fee per gas the sender is willing to pay for priority. + pub fn max_priority_fee_per_gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::MaxPriorityFeePerGas.as_ref()) + } + + /// Maximum total fee per gas the sender is willing to pay. + pub fn max_fee_per_gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::MaxFeePerGas.as_ref()) + } + + /// The chain id of the transaction. + pub fn chain_id(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::ChainId.as_ref()) + } + + /// The total amount of gas used when this transaction was executed in the block. + pub fn cumulative_gas_used(&self) -> Result { + self.inner + .get::(TransactionField::CumulativeGasUsed.as_ref()) + } + + /// The sum of the base fee and tip paid per unit of gas. + pub fn effective_gas_price(&self) -> Result { + self.inner + .get::(TransactionField::EffectiveGasPrice.as_ref()) + } + + /// The amount of gas used by this transaction. + pub fn gas_used(&self) -> Result { + self.inner + .get::(TransactionField::GasUsed.as_ref()) + } + + /// The contract address created, if the transaction was a contract creation. + pub fn contract_address(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::ContractAddress.as_ref()) + } + + /// The Bloom filter for the logs of the transaction. + pub fn logs_bloom(&self) -> Result { + self.inner + .get::(TransactionField::LogsBloom.as_ref()) + } + + /// The type of the transaction. + pub fn type_(&self) -> Result, ReadError> { + let type_ = self + .inner + .get_nullable::(TransactionField::Type.as_ref())?; + Ok(type_.map(TransactionType::from)) + } + + /// The post-transaction stateroot. + pub fn root(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::Root.as_ref()) + } + + /// Either 1 (success) or 0 (failure). + pub fn status(&self) -> Result, ReadError> { + let status = self + .inner + .get_nullable::(TransactionField::Status.as_ref())?; + let Some(status) = status else { + return Ok(None); + }; + let status = TransactionStatus::from_u8(status) + .context("convert u8 to transaction status") + .map_err(ReadError::ConversionError)?; + Ok(Some(status)) + } + + /// The first 4 bytes of the transaction input data. + pub fn sighash(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::Sighash.as_ref()) + } + + /// The y parity of the signature. + pub fn y_parity(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::YParity.as_ref()) + } + + /// The access list. + pub fn access_list(&self) -> Result>, ReadError> { + let bin = self + .inner + .get_nullable::(TransactionField::AccessList.as_ref())?; + let Some(bin) = bin else { + return Ok(None); + }; + let deser = bincode::deserialize(&bin) + .context("deserialize access list") + .map_err(ReadError::ConversionError)?; + Ok(Some(deser)) + } + + /// The authorization list. + pub fn authorization_list(&self) -> Result>, ReadError> { + let bin = self + .inner + .get_nullable::(TransactionField::AuthorizationList.as_ref())?; + let Some(bin) = bin else { + return Ok(None); + }; + let deser = bincode::deserialize(&bin) + .context("deserialize authorization list") + .map_err(ReadError::ConversionError)?; + Ok(Some(deser)) + } + + // Additional L1/L2 and blob-related fields would go here... + // For brevity, I'll include a few key ones: + + /// The L1 fee for L2 transactions. + pub fn l1_fee(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1Fee.as_ref()) + } + + /// The maximum fee per blob gas. + pub fn max_fee_per_blob_gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::MaxFeePerBlobGas.as_ref()) + } + + /// The blob versioned hashes. + pub fn blob_versioned_hashes(&self) -> Result>, ReadError> { + let bin = self + .inner + .get_nullable::(TransactionField::BlobVersionedHashes.as_ref())?; + let Some(bin) = bin else { + return Ok(None); + }; + let mut hashes = Vec::new(); + for hash_bytes in bin.chunks(32) { + let hash = Hash::try_from(hash_bytes) + .context("convert blob versioned hash bytes to hash") + .map_err(ReadError::ConversionError)?; + hashes.push(hash); + } + Ok(Some(hashes)) + } + + /// The L1 gas price for L2 transactions. + pub fn l1_gas_price(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1GasPrice.as_ref()) + } + + /// The L1 gas used for L2 transactions. + pub fn l1_gas_used(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1GasUsed.as_ref()) + } + + /// The L1 fee scalar for L2 transactions. + pub fn l1_fee_scalar(&self) -> Result, ReadError> { + let scalar = self + .inner + .get_nullable::(TransactionField::L1FeeScalar.as_ref())?; + let Some(scalar_utf8) = scalar else { + return Ok(None); + }; + // stored as a string of float eg 0.69 (utf8 encoded) + let scalar_str = std::str::from_utf8(&scalar_utf8) + .context("convert l1 fee scalar to string") + .map_err(ReadError::ConversionError)?; + + let scalar_f64: f64 = scalar_str + .parse() + .context("parse l1 fee scalar as f64") + .map_err(ReadError::ConversionError)?; + Ok(Some(scalar_f64)) + } + + /// The gas used for L1 for L2 transactions. + pub fn gas_used_for_l1(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::GasUsedForL1.as_ref()) + } + + /// The blob gas price. + pub fn blob_gas_price(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::BlobGasPrice.as_ref()) + } + + /// The blob gas used. + pub fn blob_gas_used(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::BlobGasUsed.as_ref()) + } + + /// The deposit nonce for deposit transactions. + pub fn deposit_nonce(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::DepositNonce.as_ref()) + } + + /// The deposit receipt version for deposit transactions. + pub fn deposit_receipt_version(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::DepositReceiptVersion.as_ref()) + } + + /// The L1 base fee scalar for L2 transactions. + pub fn l1_base_fee_scalar(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1BaseFeeScalar.as_ref()) + } + + /// The L1 blob base fee for L2 transactions. + pub fn l1_blob_base_fee(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1BlobBaseFee.as_ref()) + } + + /// The L1 blob base fee scalar for L2 transactions. + pub fn l1_blob_base_fee_scalar(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1BlobBaseFeeScalar.as_ref()) + } + + /// The L1 block number for L2 transactions. + pub fn l1_block_number(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::L1BlockNumber.as_ref()) + } + + /// The mint value for deposit transactions. + pub fn mint(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::Mint.as_ref()) + } + + /// The source hash for deposit transactions. + pub fn source_hash(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TransactionField::SourceHash.as_ref()) + } +} + +/// Reader for trace data from Arrow batches. +/// +/// Provides efficient access to trace fields without copying data from the underlying +/// Arrow columnar format. Each reader is bound to a specific row in the batch. +pub struct TraceReader<'a> { + inner: ArrowRowReader<'a>, +} + +impl<'a> From> for TraceReader<'a> { + fn from(inner: ArrowRowReader<'a>) -> Self { + Self { inner } + } +} + +/// Iterator over trace rows in an RecordBatch. +pub type TraceIterator<'a> = ArrowRowIterator<'a, TraceReader<'a>>; + +impl<'a> TraceReader<'a> { + /// Safely create a new reader for the given batch at row index and check + /// that row_idx is within the bounds of the batch. + pub fn new(batch: &'a RecordBatch, row_idx: usize) -> anyhow::Result { + let inner = ArrowRowReader::new(batch, row_idx)?; + Ok(Self { inner }) + } + + /// Create an iterator over all rows in the batch. + pub fn iter(batch: &'a RecordBatch) -> TraceIterator<'a> { + TraceIterator::new(batch) + } + + /// The hash of the block in which this trace occurred. + pub fn block_hash(&self) -> Result { + self.inner + .get::(TraceField::BlockHash.as_ref()) + } + + /// The number of the block in which this trace occurred. + pub fn block_number(&self) -> Result { + self.inner + .get::(TraceField::BlockNumber.as_ref()) + } + + /// The address from which the trace originated. + pub fn from(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::From.as_ref()) + } + + /// The address to which the trace was sent. + pub fn to(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::To.as_ref()) + } + + /// The type of call. + pub fn call_type(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::CallType.as_ref()) + } + + /// The amount of gas provided to the trace. + pub fn gas(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Gas.as_ref()) + } + + /// The input data. + pub fn input(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Input.as_ref()) + } + + /// The init data for contract creation traces. + pub fn init(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Init.as_ref()) + } + + /// The value transferred. + pub fn value(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Value.as_ref()) + } + + /// The address of the author (miner). + pub fn author(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Author.as_ref()) + } + + /// The type of reward. + pub fn reward_type(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::RewardType.as_ref()) + } + + /// The address involved in the trace. + pub fn address(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Address.as_ref()) + } + + /// The bytecode. + pub fn code(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Code.as_ref()) + } + + /// The amount of gas used by the trace. + pub fn gas_used(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::GasUsed.as_ref()) + } + + /// The output data. + pub fn output(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Output.as_ref()) + } + + /// The number of sub-traces. + pub fn subtraces(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Subtraces.as_ref()) + } + + /// The trace address. + pub fn trace_address(&self) -> Result>, ReadError> { + let bin = self + .inner + .get_nullable::(TraceField::TraceAddress.as_ref())?; + let Some(bin) = bin else { + return Ok(None); + }; + let deser = bincode::deserialize(&bin) + .context("deserialize trace address") + .map_err(ReadError::ConversionError)?; + Ok(Some(deser)) + } + + /// The hash of the transaction this trace belongs to. + pub fn transaction_hash(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::TransactionHash.as_ref()) + } + + /// The position of the transaction in the block. + pub fn transaction_position(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::TransactionPosition.as_ref()) + } + + /// The type of trace. + pub fn type_(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Type.as_ref()) + } + + /// The error message, if any. + pub fn error(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Error.as_ref()) + } + + /// The first 4 bytes of the input data. + pub fn sighash(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Sighash.as_ref()) + } + + /// The action address. + pub fn action_address(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::ActionAddress.as_ref()) + } + + /// The balance. + pub fn balance(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::Balance.as_ref()) + } + + /// The refund address. + pub fn refund_address(&self) -> Result, ReadError> { + self.inner + .get_nullable::(TraceField::RefundAddress.as_ref()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Context; + + trait NotOption {} + + impl NotOption for hypersync_format::Quantity {} + impl NotOption for hypersync_format::Data {} + impl NotOption for hypersync_format::UInt {} + impl NotOption for hypersync_format::FixedSizeData {} + impl NotOption for u64 {} + + /// Compile-time tests that ensure the correct return types + #[test] + fn test_nullability_matches_schema() { + fn assert_nullable<'a, T, F>(_: F, log_field: LogField) + where + F: FnOnce(&LogReader<'a>) -> Result, ReadError>, + { + assert!(log_field.is_nullable(), "Optional type should be nullable"); + } + + fn assert_not_nullable<'a, T, F>(_: F, log_field: LogField) + where + F: FnOnce(&LogReader<'a>) -> Result, + // just to make sure its an inner type and not an Option + T: NotOption, + { + assert!(!log_field.is_nullable(), "should not be nullable"); + } + // This test will fail to compile if the return types are wrong + + for field in LogField::all() { + match field { + LogField::Removed => assert_nullable(LogReader::removed, field), + LogField::Topic0 => assert_nullable(LogReader::topic0, field), + LogField::Topic1 => assert_nullable(LogReader::topic1, field), + LogField::Topic2 => assert_nullable(LogReader::topic2, field), + LogField::Topic3 => assert_nullable(LogReader::topic3, field), + LogField::LogIndex => assert_not_nullable(LogReader::log_index, field), + LogField::TransactionIndex => { + assert_not_nullable(LogReader::transaction_index, field) + } + LogField::TransactionHash => { + assert_not_nullable(LogReader::transaction_hash, field) + } + LogField::BlockHash => assert_not_nullable(LogReader::block_hash, field), + LogField::BlockNumber => assert_not_nullable(LogReader::block_number, field), + LogField::Address => assert_not_nullable(LogReader::address, field), + LogField::Data => assert_not_nullable(LogReader::data, field), + } + } + } + + #[test] + fn test_block_nullability_matches_schema() { + fn assert_nullable<'a, T, F>(_: F, block_field: BlockField) + where + F: FnOnce(&BlockReader<'a>) -> Result, ReadError>, + { + assert!( + block_field.is_nullable(), + "Optional type should be nullable" + ); + } + + fn assert_not_nullable<'a, T, F>(_: F, block_field: BlockField) + where + F: FnOnce(&BlockReader<'a>) -> Result, + T: NotOption, + { + assert!(!block_field.is_nullable(), "should not be nullable"); + } + + for field in BlockField::all() { + match field { + // Nullable fields + BlockField::Nonce => assert_nullable(BlockReader::nonce, field), + BlockField::Difficulty => assert_nullable(BlockReader::difficulty, field), + BlockField::TotalDifficulty => { + assert_nullable(BlockReader::total_difficulty, field) + } + BlockField::Uncles => assert_nullable(BlockReader::uncles, field), + BlockField::BaseFeePerGas => assert_nullable(BlockReader::base_fee_per_gas, field), + BlockField::BlobGasUsed => assert_nullable(BlockReader::blob_gas_used, field), + BlockField::ExcessBlobGas => assert_nullable(BlockReader::excess_blob_gas, field), + BlockField::ParentBeaconBlockRoot => { + assert_nullable(BlockReader::parent_beacon_block_root, field) + } + BlockField::WithdrawalsRoot => { + assert_nullable(BlockReader::withdrawals_root, field) + } + BlockField::Withdrawals => assert_nullable(BlockReader::withdrawals, field), + BlockField::L1BlockNumber => assert_nullable(BlockReader::l1_block_number, field), + BlockField::SendCount => assert_nullable(BlockReader::send_count, field), + BlockField::SendRoot => assert_nullable(BlockReader::send_root, field), + BlockField::MixHash => assert_nullable(BlockReader::mix_hash, field), + // Non-nullable fields + BlockField::Number => assert_not_nullable(BlockReader::number, field), + BlockField::Hash => assert_not_nullable(BlockReader::hash, field), + BlockField::ParentHash => assert_not_nullable(BlockReader::parent_hash, field), + BlockField::Sha3Uncles => assert_not_nullable(BlockReader::sha3_uncles, field), + BlockField::LogsBloom => assert_not_nullable(BlockReader::logs_bloom, field), + BlockField::TransactionsRoot => { + assert_not_nullable(BlockReader::transactions_root, field) + } + BlockField::StateRoot => assert_not_nullable(BlockReader::state_root, field), + BlockField::ReceiptsRoot => assert_not_nullable(BlockReader::receipts_root, field), + BlockField::Miner => assert_not_nullable(BlockReader::miner, field), + BlockField::ExtraData => assert_not_nullable(BlockReader::extra_data, field), + BlockField::Size => assert_not_nullable(BlockReader::size, field), + BlockField::GasLimit => assert_not_nullable(BlockReader::gas_limit, field), + BlockField::GasUsed => assert_not_nullable(BlockReader::gas_used, field), + BlockField::Timestamp => assert_not_nullable(BlockReader::timestamp, field), + } + } + } + + #[test] + fn test_transaction_nullability_matches_schema() { + fn assert_nullable<'a, T, F>(_: F, transaction_field: TransactionField) + where + F: FnOnce(&TransactionReader<'a>) -> Result, ReadError>, + { + assert!( + transaction_field.is_nullable(), + "Optional type should be nullable" + ); + } + + fn assert_not_nullable<'a, T, F>(_: F, transaction_field: TransactionField) + where + F: FnOnce(&TransactionReader<'a>) -> Result, + T: NotOption, + { + assert!(!transaction_field.is_nullable(), "should not be nullable"); + } + + for field in TransactionField::all() { + match field { + TransactionField::From => assert_nullable(TransactionReader::from, field), + TransactionField::GasPrice => assert_nullable(TransactionReader::gas_price, field), + TransactionField::To => assert_nullable(TransactionReader::to, field), + TransactionField::V => assert_nullable(TransactionReader::v, field), + TransactionField::R => assert_nullable(TransactionReader::r, field), + TransactionField::S => assert_nullable(TransactionReader::s, field), + TransactionField::MaxPriorityFeePerGas => { + assert_nullable(TransactionReader::max_priority_fee_per_gas, field) + } + TransactionField::MaxFeePerGas => { + assert_nullable(TransactionReader::max_fee_per_gas, field) + } + TransactionField::ChainId => assert_nullable(TransactionReader::chain_id, field), + TransactionField::ContractAddress => { + assert_nullable(TransactionReader::contract_address, field) + } + TransactionField::Type => assert_nullable(TransactionReader::type_, field), + TransactionField::Root => assert_nullable(TransactionReader::root, field), + TransactionField::Status => assert_nullable(TransactionReader::status, field), + TransactionField::Sighash => assert_nullable(TransactionReader::sighash, field), + TransactionField::YParity => assert_nullable(TransactionReader::y_parity, field), + TransactionField::AccessList => { + assert_nullable(TransactionReader::access_list, field) + } + TransactionField::AuthorizationList => { + assert_nullable(TransactionReader::authorization_list, field) + } + TransactionField::L1Fee => assert_nullable(TransactionReader::l1_fee, field), + TransactionField::MaxFeePerBlobGas => { + assert_nullable(TransactionReader::max_fee_per_blob_gas, field) + } + TransactionField::BlobVersionedHashes => { + assert_nullable(TransactionReader::blob_versioned_hashes, field) + } + TransactionField::BlockHash => { + assert_not_nullable(TransactionReader::block_hash, field) + } + TransactionField::BlockNumber => { + assert_not_nullable(TransactionReader::block_number, field) + } + TransactionField::Gas => assert_not_nullable(TransactionReader::gas, field), + TransactionField::Hash => assert_not_nullable(TransactionReader::hash, field), + TransactionField::Input => assert_not_nullable(TransactionReader::input, field), + TransactionField::Nonce => assert_not_nullable(TransactionReader::nonce, field), + TransactionField::TransactionIndex => { + assert_not_nullable(TransactionReader::transaction_index, field) + } + TransactionField::Value => assert_not_nullable(TransactionReader::value, field), + TransactionField::CumulativeGasUsed => { + assert_not_nullable(TransactionReader::cumulative_gas_used, field) + } + TransactionField::EffectiveGasPrice => { + assert_not_nullable(TransactionReader::effective_gas_price, field) + } + TransactionField::GasUsed => { + assert_not_nullable(TransactionReader::gas_used, field) + } + TransactionField::LogsBloom => { + assert_not_nullable(TransactionReader::logs_bloom, field) + } + TransactionField::L1GasPrice => { + assert_nullable(TransactionReader::l1_gas_price, field) + } + TransactionField::L1GasUsed => { + assert_nullable(TransactionReader::l1_gas_used, field) + } + TransactionField::L1FeeScalar => { + assert_nullable(TransactionReader::l1_fee_scalar, field) + } + TransactionField::GasUsedForL1 => { + assert_nullable(TransactionReader::gas_used_for_l1, field) + } + TransactionField::BlobGasPrice => { + assert_nullable(TransactionReader::blob_gas_price, field) + } + TransactionField::BlobGasUsed => { + assert_nullable(TransactionReader::blob_gas_used, field) + } + TransactionField::DepositNonce => { + assert_nullable(TransactionReader::deposit_nonce, field) + } + TransactionField::DepositReceiptVersion => { + assert_nullable(TransactionReader::deposit_receipt_version, field) + } + TransactionField::L1BaseFeeScalar => { + assert_nullable(TransactionReader::l1_base_fee_scalar, field) + } + TransactionField::L1BlobBaseFee => { + assert_nullable(TransactionReader::l1_blob_base_fee, field) + } + TransactionField::L1BlobBaseFeeScalar => { + assert_nullable(TransactionReader::l1_blob_base_fee_scalar, field) + } + + TransactionField::L1BlockNumber => { + assert_nullable(TransactionReader::l1_block_number, field) + } + TransactionField::Mint => assert_nullable(TransactionReader::mint, field), + TransactionField::SourceHash => { + assert_nullable(TransactionReader::source_hash, field) + } + } + } + } + + #[test] + fn test_trace_nullability_matches_schema() { + fn assert_nullable<'a, T, F>(_: F, trace_field: TraceField) + where + F: FnOnce(&TraceReader<'a>) -> Result, ReadError>, + { + assert!( + trace_field.is_nullable(), + "Optional type should be nullable" + ); + } + + fn assert_not_nullable<'a, T, F>(_: F, trace_field: TraceField) + where + F: FnOnce(&TraceReader<'a>) -> Result, + T: NotOption, + { + assert!(!trace_field.is_nullable(), "should not be nullable"); + } + + for field in TraceField::all() { + match field { + // Nullable fields + TraceField::TransactionHash => { + assert_nullable(TraceReader::transaction_hash, field) + } + TraceField::TransactionPosition => { + assert_nullable(TraceReader::transaction_position, field) + } + TraceField::Type => assert_nullable(TraceReader::type_, field), + TraceField::Error => assert_nullable(TraceReader::error, field), + TraceField::From => assert_nullable(TraceReader::from, field), + TraceField::To => assert_nullable(TraceReader::to, field), + TraceField::Author => assert_nullable(TraceReader::author, field), + TraceField::Gas => assert_nullable(TraceReader::gas, field), + TraceField::GasUsed => assert_nullable(TraceReader::gas_used, field), + TraceField::ActionAddress => assert_nullable(TraceReader::action_address, field), + TraceField::Address => assert_nullable(TraceReader::address, field), + TraceField::Balance => assert_nullable(TraceReader::balance, field), + TraceField::CallType => assert_nullable(TraceReader::call_type, field), + TraceField::Code => assert_nullable(TraceReader::code, field), + TraceField::Init => assert_nullable(TraceReader::init, field), + TraceField::Input => assert_nullable(TraceReader::input, field), + TraceField::Output => assert_nullable(TraceReader::output, field), + TraceField::RefundAddress => assert_nullable(TraceReader::refund_address, field), + TraceField::RewardType => assert_nullable(TraceReader::reward_type, field), + TraceField::Sighash => assert_nullable(TraceReader::sighash, field), + TraceField::Subtraces => assert_nullable(TraceReader::subtraces, field), + TraceField::TraceAddress => assert_nullable(TraceReader::trace_address, field), + TraceField::Value => assert_nullable(TraceReader::value, field), + // Non-nullable fields + TraceField::BlockHash => assert_not_nullable(TraceReader::block_hash, field), + TraceField::BlockNumber => assert_not_nullable(TraceReader::block_number, field), + } + } + } + + fn assert_ok(result: Result) { + let _ = result.expect("should be ok"); + } + + #[tokio::test(flavor = "multi_thread")] + #[ignore = "integration test for returned schema"] + async fn test_readers_integration() -> anyhow::Result<()> { + use crate::{ + net_types::{LogField, LogFilter, TransactionField, TransactionFilter}, + Client, Query, + }; + let client = Client::builder() + .url("https://eth-traces.hypersync.xyz") + .api_token(dotenvy::var("HYPERSYNC_API_TOKEN")?) + .build() + .context("Failed to build client")?; + + let query = Query::new() + .from_block(20_000_000) + .to_block_excl(20_000_001) + .include_all_blocks() + .where_logs(LogFilter::all()) + .where_transactions(TransactionFilter::all()) + .select_log_fields(LogField::all()) + .select_block_fields(BlockField::all()) + .select_transaction_fields(TransactionField::all()) + .select_trace_fields(TraceField::all()); + + let res = client.collect_arrow(query, Default::default()).await?; + + let mut num_logs = 0; + + for batch in res.data.logs { + for log_reader in LogReader::iter(&batch) { + num_logs += 1; + + for log_field in LogField::all() { + match log_field { + LogField::Removed => assert_ok(log_reader.removed()), + LogField::LogIndex => assert_ok(log_reader.log_index()), + LogField::TransactionIndex => assert_ok(log_reader.transaction_index()), + LogField::TransactionHash => assert_ok(log_reader.transaction_hash()), + LogField::BlockHash => assert_ok(log_reader.block_hash()), + LogField::BlockNumber => assert_ok(log_reader.block_number()), + LogField::Address => assert_ok(log_reader.address()), + LogField::Data => assert_ok(log_reader.data()), + LogField::Topic0 => assert_ok(log_reader.topic0()), + LogField::Topic1 => assert_ok(log_reader.topic1()), + LogField::Topic2 => assert_ok(log_reader.topic2()), + LogField::Topic3 => assert_ok(log_reader.topic3()), + } + } + } + } + + println!("num_logs: {}", num_logs); + + let mut num_transactions = 0; + + for batch in res.data.transactions { + for transaction_reader in TransactionReader::iter(&batch) { + num_transactions += 1; + + for transaction_field in TransactionField::all() { + match transaction_field { + TransactionField::BlockHash => assert_ok(transaction_reader.block_hash()), + TransactionField::BlockNumber => { + assert_ok(transaction_reader.block_number()) + } + TransactionField::From => assert_ok(transaction_reader.from()), + TransactionField::Gas => assert_ok(transaction_reader.gas()), + TransactionField::GasPrice => assert_ok(transaction_reader.gas_price()), + TransactionField::Hash => assert_ok(transaction_reader.hash()), + TransactionField::Input => assert_ok(transaction_reader.input()), + TransactionField::Nonce => assert_ok(transaction_reader.nonce()), + TransactionField::To => assert_ok(transaction_reader.to()), + TransactionField::TransactionIndex => { + assert_ok(transaction_reader.transaction_index()) + } + TransactionField::Value => assert_ok(transaction_reader.value()), + TransactionField::V => assert_ok(transaction_reader.v()), + TransactionField::R => assert_ok(transaction_reader.r()), + TransactionField::S => assert_ok(transaction_reader.s()), + TransactionField::MaxPriorityFeePerGas => { + assert_ok(transaction_reader.max_priority_fee_per_gas()) + } + TransactionField::MaxFeePerGas => { + assert_ok(transaction_reader.max_fee_per_gas()) + } + TransactionField::ChainId => assert_ok(transaction_reader.chain_id()), + TransactionField::CumulativeGasUsed => { + assert_ok(transaction_reader.cumulative_gas_used()) + } + TransactionField::EffectiveGasPrice => { + assert_ok(transaction_reader.effective_gas_price()) + } + TransactionField::GasUsed => assert_ok(transaction_reader.gas_used()), + TransactionField::ContractAddress => { + assert_ok(transaction_reader.contract_address()) + } + TransactionField::LogsBloom => assert_ok(transaction_reader.logs_bloom()), + TransactionField::Type => assert_ok(transaction_reader.type_()), + TransactionField::Root => assert_ok(transaction_reader.root()), + TransactionField::Status => assert_ok(transaction_reader.status()), + TransactionField::Sighash => assert_ok(transaction_reader.sighash()), + TransactionField::YParity => assert_ok(transaction_reader.y_parity()), + TransactionField::AccessList => assert_ok(transaction_reader.access_list()), + TransactionField::AuthorizationList => { + assert_ok(transaction_reader.authorization_list()) + } + TransactionField::L1Fee => assert_ok(transaction_reader.l1_fee()), + TransactionField::MaxFeePerBlobGas => { + assert_ok(transaction_reader.max_fee_per_blob_gas()) + } + TransactionField::BlobVersionedHashes => { + assert_ok(transaction_reader.blob_versioned_hashes()) + } + TransactionField::L1GasPrice => { + assert_ok(transaction_reader.l1_gas_price()) + } + TransactionField::L1GasUsed => assert_ok(transaction_reader.l1_gas_used()), + TransactionField::L1FeeScalar => { + assert_ok(transaction_reader.l1_fee_scalar()) + } + TransactionField::GasUsedForL1 => { + assert_ok(transaction_reader.gas_used_for_l1()) + } + TransactionField::BlobGasPrice => { + assert_ok(transaction_reader.blob_gas_price()) + } + TransactionField::BlobGasUsed => { + assert_ok(transaction_reader.blob_gas_used()) + } + TransactionField::DepositNonce => { + assert_ok(transaction_reader.deposit_nonce()) + } + TransactionField::DepositReceiptVersion => { + assert_ok(transaction_reader.deposit_receipt_version()) + } + TransactionField::L1BaseFeeScalar => { + assert_ok(transaction_reader.l1_base_fee_scalar()) + } + TransactionField::L1BlobBaseFee => { + assert_ok(transaction_reader.l1_blob_base_fee()) + } + TransactionField::L1BlobBaseFeeScalar => { + assert_ok(transaction_reader.l1_blob_base_fee_scalar()) + } + TransactionField::L1BlockNumber => { + assert_ok(transaction_reader.l1_block_number()) + } + TransactionField::Mint => assert_ok(transaction_reader.mint()), + TransactionField::SourceHash => assert_ok(transaction_reader.source_hash()), + } + } + } + } + + println!("num_transactions: {}", num_transactions); + let mut num_blocks = 0; + + for batch in res.data.blocks { + for block_reader in BlockReader::iter(&batch) { + num_blocks += 1; + + for block_field in BlockField::all() { + match block_field { + BlockField::Number => assert_ok(block_reader.number()), + BlockField::Hash => assert_ok(block_reader.hash()), + BlockField::ParentHash => assert_ok(block_reader.parent_hash()), + BlockField::Nonce => assert_ok(block_reader.nonce()), + BlockField::Sha3Uncles => assert_ok(block_reader.sha3_uncles()), + BlockField::LogsBloom => assert_ok(block_reader.logs_bloom()), + BlockField::TransactionsRoot => assert_ok(block_reader.transactions_root()), + BlockField::StateRoot => assert_ok(block_reader.state_root()), + BlockField::ReceiptsRoot => assert_ok(block_reader.receipts_root()), + BlockField::Miner => assert_ok(block_reader.miner()), + BlockField::Difficulty => assert_ok(block_reader.difficulty()), + BlockField::TotalDifficulty => assert_ok(block_reader.total_difficulty()), + BlockField::ExtraData => assert_ok(block_reader.extra_data()), + BlockField::Size => assert_ok(block_reader.size()), + BlockField::GasLimit => assert_ok(block_reader.gas_limit()), + BlockField::GasUsed => assert_ok(block_reader.gas_used()), + BlockField::Timestamp => assert_ok(block_reader.timestamp()), + BlockField::Uncles => assert_ok(block_reader.uncles()), + BlockField::BaseFeePerGas => assert_ok(block_reader.base_fee_per_gas()), + BlockField::BlobGasUsed => assert_ok(block_reader.blob_gas_used()), + BlockField::ExcessBlobGas => assert_ok(block_reader.excess_blob_gas()), + BlockField::ParentBeaconBlockRoot => { + assert_ok(block_reader.parent_beacon_block_root()) + } + BlockField::WithdrawalsRoot => assert_ok(block_reader.withdrawals_root()), + BlockField::Withdrawals => assert_ok(block_reader.withdrawals()), + BlockField::L1BlockNumber => assert_ok(block_reader.l1_block_number()), + BlockField::SendCount => assert_ok(block_reader.send_count()), + BlockField::SendRoot => assert_ok(block_reader.send_root()), + BlockField::MixHash => assert_ok(block_reader.mix_hash()), + } + } + } + } + + println!("num_blocks: {}", num_blocks); + + let mut num_traces = 0; + + for batch in res.data.traces { + for trace_reader in TraceReader::iter(&batch) { + num_traces += 1; + + for trace_field in TraceField::all() { + match trace_field { + TraceField::BlockHash => assert_ok(trace_reader.block_hash()), + TraceField::BlockNumber => assert_ok(trace_reader.block_number()), + TraceField::From => assert_ok(trace_reader.from()), + TraceField::To => assert_ok(trace_reader.to()), + TraceField::CallType => assert_ok(trace_reader.call_type()), + TraceField::Gas => assert_ok(trace_reader.gas()), + TraceField::Input => assert_ok(trace_reader.input()), + TraceField::Init => assert_ok(trace_reader.init()), + TraceField::Value => assert_ok(trace_reader.value()), + TraceField::Author => assert_ok(trace_reader.author()), + TraceField::RewardType => assert_ok(trace_reader.reward_type()), + TraceField::Address => assert_ok(trace_reader.address()), + TraceField::Code => assert_ok(trace_reader.code()), + TraceField::GasUsed => assert_ok(trace_reader.gas_used()), + TraceField::Output => assert_ok(trace_reader.output()), + TraceField::Subtraces => assert_ok(trace_reader.subtraces()), + TraceField::TraceAddress => assert_ok(trace_reader.trace_address()), + TraceField::TransactionHash => assert_ok(trace_reader.transaction_hash()), + TraceField::TransactionPosition => { + assert_ok(trace_reader.transaction_position()) + } + TraceField::Type => assert_ok(trace_reader.type_()), + TraceField::Error => assert_ok(trace_reader.error()), + TraceField::Sighash => assert_ok(trace_reader.sighash()), + TraceField::ActionAddress => assert_ok(trace_reader.action_address()), + TraceField::Balance => assert_ok(trace_reader.balance()), + TraceField::RefundAddress => assert_ok(trace_reader.refund_address()), + } + } + } + } + + println!("num_traces: {}", num_traces); + + assert!(num_traces > 0, "no traces found"); + assert!(num_logs > 0, "no logs found"); + assert!(num_transactions > 0, "no transactions found"); + assert!(num_blocks > 0, "no blocks found"); + + Ok(()) + } +} diff --git a/hypersync-client/src/from_arrow.rs b/hypersync-client/src/from_arrow.rs index 3ce1f45..a510926 100644 --- a/hypersync-client/src/from_arrow.rs +++ b/hypersync-client/src/from_arrow.rs @@ -1,367 +1,384 @@ -use std::fmt::Debug; - -use arrayvec::ArrayVec; -use arrow::array::{ - Array, BinaryArray, BooleanArray, RecordBatch, StringArray, UInt64Array, UInt8Array, +use crate::{ + arrow_reader::{self, BlockReader, LogReader, TraceReader, TransactionReader}, + simple_types::{Block, Log, Trace, Transaction}, }; -use hypersync_format::{TransactionStatus, TransactionType, UInt}; - -use crate::simple_types::{Block, Log, Trace, Transaction}; +use anyhow::Context; +use arrayvec::ArrayVec; +use arrow::array::RecordBatch; -fn get_str<'a, T: From<&'a str>>(array: Option<&'a StringArray>, index: usize) -> Option { - match array { - None => None, - Some(a) => { - if a.is_valid(index) { - Some(a.value(index).into()) - } else { - None - } - } +fn to_opt(val: Result) -> anyhow::Result> { + match val { + Ok(val) => Ok(Some(val)), + // Only column not found error is valid for flattening to a None as if it + // were null since the user can deselect a the column + Err(arrow_reader::ReadError::ColumnError(arrow_reader::ColumnError { + err: arrow_reader::ColumnErrorType::NotFound, + .. + })) => Ok(None), + // All other errors are unexpected and should be surfaced + Err(e) => Err(anyhow::Error::new(e)), } } -fn get_binary<'a, T: TryFrom<&'a [u8]>>(array: Option<&'a BinaryArray>, index: usize) -> Option -where - >::Error: Debug, -{ - match array { - None => None, - Some(a) => { - if a.is_valid(index) { - Some(a.value(index).try_into().unwrap()) - } else { - None - } - } +fn to_nested_opt(val: Result, arrow_reader::ReadError>) -> anyhow::Result> { + match to_opt(val) { + Ok(Some(Some(val))) => Ok(Some(val)), + Ok(Some(None)) => Ok(None), + Ok(None) => Ok(None), + Err(e) => Err(e), } } -fn get_bool(array: Option<&BooleanArray>, index: usize) -> Option { - match array { - None => None, - Some(a) => { - if a.is_valid(index) { - Some(a.value(index)) - } else { - None - } - } - } -} +impl TryFrom> for Log { + type Error = anyhow::Error; -fn get_u8(array: Option<&UInt8Array>, index: usize) -> Option { - match array { - None => None, - Some(a) => { - if a.is_valid(index) { - Some(a.value(index)) - } else { - None - } - } + fn try_from(reader: LogReader<'_>) -> Result { + let removed = to_nested_opt(reader.removed()).context("read field removed")?; + let log_index = to_opt(reader.log_index()).context("read field log_index")?; + let transaction_index = + to_opt(reader.transaction_index()).context("read field transaction_index")?; + let transaction_hash = + to_opt(reader.transaction_hash()).context("read field transaction_hash")?; + let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?; + let block_number = to_opt(reader.block_number()).context("read field block_number")?; + let address = to_opt(reader.address()).context("read field address")?; + let data = to_opt(reader.data()).context("read field data")?; + let mut topics = ArrayVec::new(); + let topic0 = to_nested_opt(reader.topic0()).context("read field topic0")?; + topics.push(topic0); + let topic1 = to_nested_opt(reader.topic1()).context("read field topic1")?; + topics.push(topic1); + let topic2 = to_nested_opt(reader.topic2()).context("read field topic2")?; + topics.push(topic2); + let topic3 = to_nested_opt(reader.topic3()).context("read field topic3")?; + topics.push(topic3); + Ok(Self { + removed, + log_index, + transaction_index, + transaction_hash, + block_hash, + block_number, + address, + data, + topics, + }) } } -fn get_u64(array: Option<&UInt64Array>, index: usize) -> Option { - match array { - None => None, - Some(a) => { - if a.is_valid(index) { - Some(a.value(index)) - } else { - None - } - } +impl TryFrom> for Block { + type Error = anyhow::Error; + + fn try_from(reader: BlockReader<'_>) -> Result { + let number = to_opt(reader.number()).context("read field number")?; + let hash = to_opt(reader.hash()).context("read field hash")?; + let parent_hash = to_opt(reader.parent_hash()).context("read field parent_hash")?; + let nonce = to_nested_opt(reader.nonce()).context("read field nonce")?; + let sha3_uncles = to_opt(reader.sha3_uncles()).context("read field sha3_uncles")?; + let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?; + let transactions_root = + to_opt(reader.transactions_root()).context("read field transactions_root")?; + let state_root = to_opt(reader.state_root()).context("read field state_root")?; + let receipts_root = to_opt(reader.receipts_root()).context("read field receipts_root")?; + let miner = to_opt(reader.miner()).context("read field miner")?; + let difficulty = to_nested_opt(reader.difficulty()).context("read field difficulty")?; + let total_difficulty = + to_nested_opt(reader.total_difficulty()).context("read field total_difficulty")?; + let extra_data = to_opt(reader.extra_data()).context("read field extra_data")?; + let size = to_opt(reader.size()).context("read field size")?; + let gas_limit = to_opt(reader.gas_limit()).context("read field gas_limit")?; + let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?; + let timestamp = to_opt(reader.timestamp()).context("read field timestamp")?; + let uncles = to_nested_opt(reader.uncles()).context("read field uncles")?; + let base_fee_per_gas = + to_nested_opt(reader.base_fee_per_gas()).context("read field base_fee_per_gas")?; + let blob_gas_used = + to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?; + let excess_blob_gas = + to_nested_opt(reader.excess_blob_gas()).context("read field excess_blob_gas")?; + let parent_beacon_block_root = to_nested_opt(reader.parent_beacon_block_root()) + .context("read field parent_beacon_block_root")?; + let withdrawals_root = + to_nested_opt(reader.withdrawals_root()).context("read field withdrawals_root")?; + let withdrawals = to_nested_opt(reader.withdrawals()).context("read field withdrawals")?; + let l1_block_number = + to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?; + let send_count = to_nested_opt(reader.send_count()).context("read field send_count")?; + let send_root = to_nested_opt(reader.send_root()).context("read field send_root")?; + let mix_hash = to_nested_opt(reader.mix_hash()).context("read field mix_hash")?; + + Ok(Self { + number, + hash, + parent_hash, + nonce, + sha3_uncles, + logs_bloom, + transactions_root, + state_root, + receipts_root, + miner, + difficulty, + total_difficulty, + extra_data, + size, + gas_limit, + gas_used, + timestamp, + uncles, + base_fee_per_gas, + blob_gas_used, + excess_blob_gas, + parent_beacon_block_root, + withdrawals_root, + withdrawals, + l1_block_number, + send_count, + send_root, + mix_hash, + }) } } -fn column_as<'a, T: 'static>(batch: &'a RecordBatch, col_name: &str) -> Option<&'a T> { - match batch.column_by_name(col_name) { - None => None, - Some(c) => c.as_any().downcast_ref::(), +impl TryFrom> for Transaction { + type Error = anyhow::Error; + + fn try_from(reader: TransactionReader<'_>) -> Result { + let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?; + let block_number = to_opt(reader.block_number()).context("read field block_number")?; + let from = to_nested_opt(reader.from()).context("read field from")?; + let gas = to_opt(reader.gas()).context("read field gas")?; + let gas_price = to_nested_opt(reader.gas_price()).context("read field gas_price")?; + let hash = to_opt(reader.hash()).context("read field hash")?; + let input = to_opt(reader.input()).context("read field input")?; + let nonce = to_opt(reader.nonce()).context("read field nonce")?; + let to = to_nested_opt(reader.to()).context("read field to")?; + let transaction_index = + to_opt(reader.transaction_index()).context("read field transaction_index")?; + let value = to_opt(reader.value()).context("read field value")?; + let v = to_nested_opt(reader.v()).context("read field v")?; + let r = to_nested_opt(reader.r()).context("read field r")?; + let s = to_nested_opt(reader.s()).context("read field s")?; + let y_parity = to_nested_opt(reader.y_parity()).context("read field y_parity")?; + let max_priority_fee_per_gas = to_nested_opt(reader.max_priority_fee_per_gas()) + .context("read field max_priority_fee_per_gas")?; + let max_fee_per_gas = + to_nested_opt(reader.max_fee_per_gas()).context("read field max_fee_per_gas")?; + let chain_id = to_nested_opt(reader.chain_id()).context("read field chain_id")?; + let access_list = to_nested_opt(reader.access_list()).context("read field access_list")?; + let authorization_list = + to_nested_opt(reader.authorization_list()).context("read field authorization_list")?; + let max_fee_per_blob_gas = to_nested_opt(reader.max_fee_per_blob_gas()) + .context("read field max_fee_per_blob_gas")?; + let blob_versioned_hashes = to_nested_opt(reader.blob_versioned_hashes()) + .context("read field blob_versioned_hashes")?; + let cumulative_gas_used = + to_opt(reader.cumulative_gas_used()).context("read field cumulative_gas_used")?; + let effective_gas_price = + to_opt(reader.effective_gas_price()).context("read field effective_gas_price")?; + let gas_used = to_opt(reader.gas_used()).context("read field gas_used")?; + let contract_address = + to_nested_opt(reader.contract_address()).context("read field contract_address")?; + let logs_bloom = to_opt(reader.logs_bloom()).context("read field logs_bloom")?; + let type_ = to_nested_opt(reader.type_()).context("read field type_")?; + let root = to_nested_opt(reader.root()).context("read field root")?; + let status = to_nested_opt(reader.status()).context("read field status")?; + let l1_fee = to_nested_opt(reader.l1_fee()).context("read field l1_fee")?; + let l1_gas_price = + to_nested_opt(reader.l1_gas_price()).context("read field l1_gas_price")?; + let l1_gas_used = to_nested_opt(reader.l1_gas_used()).context("read field l1_gas_used")?; + let l1_fee_scalar = + to_nested_opt(reader.l1_fee_scalar()).context("read field l1_fee_scalar")?; + let gas_used_for_l1 = + to_nested_opt(reader.gas_used_for_l1()).context("read field gas_used_for_l1")?; + let blob_gas_price = + to_nested_opt(reader.blob_gas_price()).context("read field blob_gas_price")?; + let blob_gas_used = + to_nested_opt(reader.blob_gas_used()).context("read field blob_gas_used")?; + let deposit_nonce = + to_nested_opt(reader.deposit_nonce()).context("read field deposit_nonce")?; + let deposit_receipt_version = to_nested_opt(reader.deposit_receipt_version()) + .context("read field deposit_receipt_version")?; + let l1_base_fee_scalar = + to_nested_opt(reader.l1_base_fee_scalar()).context("read field l1_base_fee_scalar")?; + let l1_blob_base_fee = + to_nested_opt(reader.l1_blob_base_fee()).context("read field l1_blob_base_fee")?; + let l1_blob_base_fee_scalar = to_nested_opt(reader.l1_blob_base_fee_scalar()) + .context("read field l1_blob_base_fee_scalar")?; + let l1_block_number = + to_nested_opt(reader.l1_block_number()).context("read field l1_block_number")?; + let mint = to_nested_opt(reader.mint()).context("read field mint")?; + let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?; + let source_hash = to_nested_opt(reader.source_hash()).context("read field source_hash")?; + + Ok(Self { + block_hash, + block_number, + from, + gas, + gas_price, + hash, + input, + nonce, + to, + transaction_index, + value, + v, + r, + s, + y_parity, + max_priority_fee_per_gas, + max_fee_per_gas, + chain_id, + access_list, + authorization_list, + max_fee_per_blob_gas, + blob_versioned_hashes, + cumulative_gas_used, + effective_gas_price, + gas_used, + contract_address, + logs_bloom, + type_, + root, + status, + l1_fee, + l1_gas_price, + l1_gas_used, + l1_fee_scalar, + gas_used_for_l1, + blob_gas_price, + blob_gas_used, + deposit_nonce, + deposit_receipt_version, + l1_base_fee_scalar, + l1_blob_base_fee, + l1_blob_base_fee_scalar, + l1_block_number, + mint, + sighash, + source_hash, + }) } } impl Block { /// Convert an arrow RecordBatch into a vector of Block structs - pub fn from_arrow(batch: &RecordBatch) -> Vec { - let number = column_as::(batch, "number"); - let hash = column_as::(batch, "hash"); - let parent_hash = column_as::(batch, "parent_hash"); - let nonce = column_as::(batch, "nonce"); - let sha3_uncles = column_as::(batch, "sha3_uncles"); - let logs_bloom = column_as::(batch, "logs_bloom"); - let transactions_root = column_as::(batch, "transactions_root"); - let state_root = column_as::(batch, "state_root"); - let receipts_root = column_as::(batch, "receipts_root"); - let miner = column_as::(batch, "miner"); - let difficulty = column_as::(batch, "difficulty"); - let total_difficulty = column_as::(batch, "total_difficulty"); - let extra_data = column_as::(batch, "extra_data"); - let size = column_as::(batch, "size"); - let gas_limit = column_as::(batch, "gas_limit"); - let gas_used = column_as::(batch, "gas_used"); - let timestamp = column_as::(batch, "timestamp"); - let uncles = column_as::(batch, "uncles"); - let base_fee_per_gas = column_as::(batch, "base_fee_per_gas"); - let blob_gas_used = column_as::(batch, "blob_gas_used"); - let excess_blob_gas = column_as::(batch, "excess_blob_gas"); - let parent_beacon_block_root = column_as::(batch, "parent_beacon_block_root"); - let withdrawals_root = column_as::(batch, "withdrawals_root"); - let withdrawals = column_as::(batch, "withdrawals"); - let l1_block_number = column_as::(batch, "l1_block_number"); - let send_count = column_as::(batch, "send_count"); - let send_root = column_as::(batch, "send_root"); - let mix_hash = column_as::(batch, "mix_hash"); - - (0..batch.num_rows()) - .map(|idx| Self { - number: get_u64(number, idx), - hash: get_binary(hash, idx), - parent_hash: get_binary(parent_hash, idx), - nonce: get_binary(nonce, idx), - sha3_uncles: get_binary(sha3_uncles, idx), - logs_bloom: get_binary(logs_bloom, idx), - transactions_root: get_binary(transactions_root, idx), - state_root: get_binary(state_root, idx), - receipts_root: get_binary(receipts_root, idx), - miner: get_binary(miner, idx), - difficulty: get_binary(difficulty, idx), - total_difficulty: get_binary(total_difficulty, idx), - extra_data: get_binary(extra_data, idx), - size: get_binary(size, idx), - gas_limit: get_binary(gas_limit, idx), - gas_used: get_binary(gas_used, idx), - timestamp: get_binary(timestamp, idx), - uncles: get_binary(uncles, idx).map(|v: &[u8]| { - v.chunks(32) - .map(|chunk| chunk.try_into().unwrap()) - .collect() - }), - base_fee_per_gas: get_binary(base_fee_per_gas, idx), - blob_gas_used: get_binary(blob_gas_used, idx), - excess_blob_gas: get_binary(excess_blob_gas, idx), - parent_beacon_block_root: get_binary(parent_beacon_block_root, idx), - withdrawals_root: get_binary(withdrawals_root, idx), - withdrawals: get_binary(withdrawals, idx).map(|v| bincode::deserialize(v).unwrap()), - l1_block_number: get_u64(l1_block_number, idx).map(UInt::from), - send_count: get_binary(send_count, idx), - send_root: get_binary(send_root, idx), - mix_hash: get_binary(mix_hash, idx), - }) - .collect() + pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result> { + let mut blocks = Vec::new(); + for block_reader in BlockReader::iter(batch) { + blocks.push( + block_reader + .try_into() + .context("convert block reader to block")?, + ); + } + Ok(blocks) } } impl Transaction { /// Convert an arrow RecordBatch into a vector of Transaction structs - pub fn from_arrow(batch: &RecordBatch) -> Vec { - let block_hash = column_as::(batch, "block_hash"); - let block_number = column_as::(batch, "block_number"); - let from = column_as::(batch, "from"); - let gas = column_as::(batch, "gas"); - let gas_price = column_as::(batch, "gas_price"); - let hash = column_as::(batch, "hash"); - let input = column_as::(batch, "input"); - let nonce = column_as::(batch, "nonce"); - let to = column_as::(batch, "to"); - let transaction_index = column_as::(batch, "transaction_index"); - let value = column_as::(batch, "value"); - let v = column_as::(batch, "v"); - let r = column_as::(batch, "r"); - let s = column_as::(batch, "s"); - let y_parity = column_as::(batch, "y_parity"); - let max_priority_fee_per_gas = column_as::(batch, "max_priority_fee_per_gas"); - let max_fee_per_gas = column_as::(batch, "max_fee_per_gas"); - let chain_id = column_as::(batch, "chain_id"); - let access_list = column_as::(batch, "access_list"); - let authorization_list = column_as::(batch, "authorization_list"); - let max_fee_per_blob_gas = column_as::(batch, "max_fee_per_blob_gas"); - let blob_versioned_hashes = column_as::(batch, "blob_versioned_hashes"); - let cumulative_gas_used = column_as::(batch, "cumulative_gas_used"); - let effective_gas_price = column_as::(batch, "effective_gas_price"); - let gas_used = column_as::(batch, "gas_used"); - let contract_address = column_as::(batch, "contract_address"); - let logs_bloom = column_as::(batch, "logs_bloom"); - let type_ = column_as::(batch, "type"); - let root = column_as::(batch, "root"); - let status = column_as::(batch, "status"); - let l1_fee = column_as::(batch, "l1_fee"); - let l1_gas_price = column_as::(batch, "l1_gas_price"); - let l1_gas_used = column_as::(batch, "l1_gas_used"); - let l1_fee_scalar = column_as::(batch, "l1_fee_scalar"); - let gas_used_for_l1 = column_as::(batch, "gas_used_for_l1"); - let blob_gas_price = column_as::(batch, "blob_gas_price"); - let blob_gas_used = column_as::(batch, "blob_gas_used"); - let deposit_nonce = column_as::(batch, "deposit_nonce"); - let deposit_receipt_version = column_as::(batch, "deposit_receipt_version"); - let l1_base_fee_scalar = column_as::(batch, "l1_base_fee_scalar"); - let l1_blob_base_fee = column_as::(batch, "l1_blob_base_fee"); - let l1_blob_base_fee_scalar = column_as::(batch, "l1_blob_base_fee_scalar"); - let l1_block_number = column_as::(batch, "l1_block_number"); - let mint = column_as::(batch, "mint"); - let sighash = column_as::(batch, "sighash"); - let source_hash = column_as::(batch, "source_hash"); - - (0..batch.num_rows()) - .map(|idx| Self { - block_hash: get_binary(block_hash, idx), - block_number: get_u64(block_number, idx).map(UInt::from), - from: get_binary(from, idx), - gas: get_binary(gas, idx), - gas_price: get_binary(gas_price, idx), - hash: get_binary(hash, idx), - input: get_binary(input, idx), - nonce: get_binary(nonce, idx), - to: get_binary(to, idx), - transaction_index: get_u64(transaction_index, idx).map(UInt::from), - value: get_binary(value, idx), - v: get_binary(v, idx), - r: get_binary(r, idx), - s: get_binary(s, idx), - y_parity: get_binary(y_parity, idx), - max_priority_fee_per_gas: get_binary(max_priority_fee_per_gas, idx), - max_fee_per_gas: get_binary(max_fee_per_gas, idx), - chain_id: get_binary(chain_id, idx), - access_list: get_binary(access_list, idx).map(|v| bincode::deserialize(v).unwrap()), - authorization_list: get_binary(authorization_list, idx) - .map(|v| bincode::deserialize(v).unwrap()), - max_fee_per_blob_gas: get_binary(max_fee_per_blob_gas, idx), - blob_versioned_hashes: get_binary(blob_versioned_hashes, idx).map(|v: &[u8]| { - v.chunks(32) - .map(|chunk| chunk.try_into().unwrap()) - .collect() - }), - cumulative_gas_used: get_binary(cumulative_gas_used, idx), - effective_gas_price: get_binary(effective_gas_price, idx), - gas_used: get_binary(gas_used, idx), - contract_address: get_binary(contract_address, idx), - logs_bloom: get_binary(logs_bloom, idx), - type_: get_u8(type_, idx).map(TransactionType::from), - root: get_binary(root, idx), - status: get_u8(status, idx).map(|v| TransactionStatus::from_u8(v).unwrap()), - l1_fee: get_binary(l1_fee, idx), - l1_gas_price: get_binary(l1_gas_price, idx), - l1_gas_used: get_binary(l1_gas_used, idx), - l1_fee_scalar: get_binary(l1_fee_scalar, idx) - .map(|v| std::str::from_utf8(v).unwrap().parse().unwrap()), - gas_used_for_l1: get_binary(gas_used_for_l1, idx), - blob_gas_price: get_binary(blob_gas_price, idx), - blob_gas_used: get_binary(blob_gas_used, idx), - deposit_nonce: get_binary(deposit_nonce, idx), - deposit_receipt_version: get_binary(deposit_receipt_version, idx), - l1_base_fee_scalar: get_binary(l1_base_fee_scalar, idx), - l1_blob_base_fee: get_binary(l1_blob_base_fee, idx), - l1_blob_base_fee_scalar: get_binary(l1_blob_base_fee_scalar, idx), - l1_block_number: get_binary(l1_block_number, idx), - mint: get_binary(mint, idx), - sighash: get_binary(sighash, idx), - source_hash: get_binary(source_hash, idx), - }) - .collect() + pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result> { + let mut transactions = Vec::new(); + for transaction_reader in TransactionReader::iter(batch) { + transactions.push( + transaction_reader + .try_into() + .context("convert transaction reader to transaction")?, + ); + } + Ok(transactions) } } -impl Log { - /// Convert an arrow RecordBatch into a vector of Log structs - pub fn from_arrow(batch: &RecordBatch) -> Vec { - let removed = column_as::(batch, "removed"); - let log_index = column_as::(batch, "log_index"); - let transaction_index = column_as::(batch, "transaction_index"); - let transaction_hash = column_as::(batch, "transaction_hash"); - let block_hash = column_as::(batch, "block_hash"); - let block_number = column_as::(batch, "block_number"); - let address = column_as::(batch, "address"); - let data = column_as::(batch, "data"); - let topic0 = column_as::(batch, "topic0"); - let topic1 = column_as::(batch, "topic1"); - let topic2 = column_as::(batch, "topic2"); - let topic3 = column_as::(batch, "topic3"); +impl TryFrom> for Trace { + type Error = anyhow::Error; - (0..batch.num_rows()) - .map(|idx| Self { - removed: get_bool(removed, idx), - log_index: get_u64(log_index, idx).map(UInt::from), - transaction_index: get_u64(transaction_index, idx).map(UInt::from), - transaction_hash: get_binary(transaction_hash, idx), - block_hash: get_binary(block_hash, idx), - block_number: get_u64(block_number, idx).map(UInt::from), - address: get_binary(address, idx), - data: get_binary(data, idx), - topics: { - let mut arr = ArrayVec::new(); + fn try_from(reader: TraceReader<'_>) -> Result { + let from = to_nested_opt(reader.from()).context("read field from")?; + let to = to_nested_opt(reader.to()).context("read field to")?; + let call_type = to_nested_opt(reader.call_type()).context("read field call_type")?; + let gas = to_nested_opt(reader.gas()).context("read field gas")?; + let input = to_nested_opt(reader.input()).context("read field input")?; + let init = to_nested_opt(reader.init()).context("read field init")?; + let value = to_nested_opt(reader.value()).context("read field value")?; + let author = to_nested_opt(reader.author()).context("read field author")?; + let reward_type = to_nested_opt(reader.reward_type()).context("read field reward_type")?; + let block_hash = to_opt(reader.block_hash()).context("read field block_hash")?; + let block_number = to_opt(reader.block_number()).context("read field block_number")?; + let address = to_nested_opt(reader.address()).context("read field address")?; + let code = to_nested_opt(reader.code()).context("read field code")?; + let gas_used = to_nested_opt(reader.gas_used()).context("read field gas_used")?; + let output = to_nested_opt(reader.output()).context("read field output")?; + let subtraces = to_nested_opt(reader.subtraces()).context("read field subtraces")?; + let trace_address = + to_nested_opt(reader.trace_address()).context("read field trace_address")?; + let transaction_hash = + to_nested_opt(reader.transaction_hash()).context("read field transaction_hash")?; + let transaction_position = to_nested_opt(reader.transaction_position()) + .context("read field transaction_position")?; + let type_ = to_nested_opt(reader.type_()).context("read field type_")?; + let error = to_nested_opt(reader.error()).context("read field error")?; + let sighash = to_nested_opt(reader.sighash()).context("read field sighash")?; + let action_address = + to_nested_opt(reader.action_address()).context("read field action_address")?; + let balance = to_nested_opt(reader.balance()).context("read field balance")?; + let refund_address = + to_nested_opt(reader.refund_address()).context("read field refund_address")?; - arr.push(get_binary(topic0, idx)); - arr.push(get_binary(topic1, idx)); - arr.push(get_binary(topic2, idx)); - arr.push(get_binary(topic3, idx)); + Ok(Self { + from, + to, + call_type, + gas, + input, + init, + value, + author, + reward_type, + block_hash, + block_number, + address, + code, + gas_used, + output, + subtraces, + trace_address, + transaction_hash, + transaction_position, + type_, + error, + sighash, + action_address, + balance, + refund_address, + }) + } +} - arr - }, - }) - .collect() +impl Log { + /// Convert an arrow RecordBatch into a vector of Log structs + pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result> { + let mut logs = Vec::new(); + for log_reader in LogReader::iter(batch) { + logs.push(log_reader.try_into().context("convert log reader to log")?); + } + Ok(logs) } } impl Trace { /// Convert an arrow RecordBatch into a vector of Trace structs - pub fn from_arrow(batch: &RecordBatch) -> Vec { - let from = column_as::(batch, "from"); - let to = column_as::(batch, "to"); - let call_type = column_as::(batch, "call_type"); - let gas = column_as::(batch, "gas"); - let input = column_as::(batch, "input"); - let init = column_as::(batch, "init"); - let value = column_as::(batch, "value"); - let author = column_as::(batch, "author"); - let reward_type = column_as::(batch, "reward_type"); - let block_hash = column_as::(batch, "block_hash"); - let block_number = column_as::(batch, "block_number"); - let address = column_as::(batch, "address"); - let code = column_as::(batch, "code"); - let gas_used = column_as::(batch, "gas_used"); - let output = column_as::(batch, "output"); - let subtraces = column_as::(batch, "subtraces"); - let trace_address = column_as::(batch, "trace_address"); - let transaction_hash = column_as::(batch, "transaction_hash"); - let transaction_position = column_as::(batch, "transaction_position"); - let type_ = column_as::(batch, "type"); - let error = column_as::(batch, "error"); - let sighash = column_as::(batch, "sighash"); - let action_address = column_as::(batch, "action_address"); - let balance = column_as::(batch, "balance"); - let refund_address = column_as::(batch, "refund_address"); - - (0..batch.num_rows()) - .map(|idx| Self { - from: get_binary(from, idx), - to: get_binary(to, idx), - call_type: get_str(call_type, idx).map(str::to_owned), - gas: get_binary(gas, idx), - input: get_binary(input, idx), - init: get_binary(init, idx), - value: get_binary(value, idx), - author: get_binary(author, idx), - reward_type: get_str(reward_type, idx).map(str::to_owned), - block_hash: get_binary(block_hash, idx), - block_number: get_u64(block_number, idx), - address: get_binary(address, idx), - code: get_binary(code, idx), - gas_used: get_binary(gas_used, idx), - output: get_binary(output, idx), - subtraces: get_u64(subtraces, idx), - trace_address: get_binary(trace_address, idx) - .map(|v| bincode::deserialize(v).unwrap()), - transaction_hash: get_binary(transaction_hash, idx), - transaction_position: get_u64(transaction_position, idx), - type_: get_str(type_, idx).map(str::to_owned), - error: get_str(error, idx).map(str::to_owned), - sighash: get_binary(sighash, idx), - action_address: get_binary(action_address, idx), - balance: get_binary(balance, idx), - refund_address: get_binary(refund_address, idx), - }) - .collect() + pub fn from_arrow(batch: &RecordBatch) -> anyhow::Result> { + let mut traces = Vec::new(); + for trace_reader in TraceReader::iter(batch) { + traces.push( + trace_reader + .try_into() + .context("convert trace reader to trace")?, + ); + } + Ok(traces) } } diff --git a/hypersync-client/src/lib.rs b/hypersync-client/src/lib.rs index 6049236..da607c0 100644 --- a/hypersync-client/src/lib.rs +++ b/hypersync-client/src/lib.rs @@ -81,6 +81,7 @@ use reqwest::{header, Method}; use reqwest_eventsource::retry::ExponentialBackoff; use reqwest_eventsource::{Event, EventSource}; +pub mod arrow_reader; mod column_mapping; mod config; mod decode; @@ -273,7 +274,8 @@ impl Client { while let Some(res) = recv.recv().await { let res = res.context("get response")?; - let res: QueryResponse = QueryResponse::from(&res); + let res: QueryResponse = + QueryResponse::try_from(&res).context("convert arrow response")?; for batch in res.data.blocks { data.blocks.push(batch); @@ -360,7 +362,8 @@ impl Client { while let Some(res) = recv.recv().await { let res = res.context("get response")?; - let res: QueryResponse = QueryResponse::from(&res); + let res: QueryResponse = + QueryResponse::try_from(&res).context("convert arrow response")?; let events = event_join_strategy.join_from_response_data(res.data); data.extend(events); @@ -713,7 +716,9 @@ impl Client { /// ``` pub async fn get(&self, query: &Query) -> Result { let arrow_response = self.get_arrow(query).await.context("get data")?; - Ok(QueryResponse::from(&arrow_response)) + let converted = + QueryResponse::try_from(&arrow_response).context("convert arrow response")?; + Ok(converted) } /// Add block, transaction and log fields selection to the query, executes it with retries @@ -752,10 +757,7 @@ impl Client { let event_join_strategy = InternalEventJoinStrategy::from(&query.field_selection); event_join_strategy.add_join_fields_to_selection(&mut query.field_selection); let arrow_response = self.get_arrow(&query).await.context("get data")?; - Ok(EventResponse::from_arrow_response( - &arrow_response, - &event_join_strategy, - )) + EventResponse::try_from_arrow_response(&arrow_response, &event_join_strategy) } /// Executes query once and returns the result in (Arrow, size) format using JSON serialization. @@ -1011,13 +1013,11 @@ impl Client { tokio::spawn(async move { while let Some(resp) = inner_rx.recv().await { - let is_err = resp.is_err(); - if tx - .send(resp.map(|r| QueryResponse::from(&r))) - .await - .is_err() - || is_err - { + let msg = resp + .context("inner receiver") + .and_then(|r| QueryResponse::try_from(&r)); + let is_err = msg.is_err(); + if tx.send(msg).await.is_err() || is_err { return; } } @@ -1081,15 +1081,11 @@ impl Client { tokio::spawn(async move { while let Some(resp) = inner_rx.recv().await { - let is_err = resp.is_err(); - if tx - .send( - resp.map(|r| EventResponse::from_arrow_response(&r, &event_join_strategy)), - ) - .await - .is_err() - || is_err - { + let msg = resp + .context("inner receiver") + .and_then(|r| EventResponse::try_from_arrow_response(&r, &event_join_strategy)); + let is_err = msg.is_err(); + if tx.send(msg).await.is_err() || is_err { return; } } diff --git a/hypersync-client/src/types.rs b/hypersync-client/src/types.rs index c5f2626..3e5bcaa 100644 --- a/hypersync-client/src/types.rs +++ b/hypersync-client/src/types.rs @@ -1,4 +1,5 @@ use crate::simple_types::{Block, Event, InternalEventJoinStrategy, Log, Trace, Transaction}; +use anyhow::Context; use arrow::array::RecordBatch; use hypersync_net_types::RollbackGuard; @@ -34,49 +35,58 @@ pub struct ResponseData { impl EventResponse { /// Create EventResponse from ArrowResponse with the specified event join strategy - pub(crate) fn from_arrow_response( + pub(crate) fn try_from_arrow_response( arrow_response: &ArrowResponse, event_join_strategy: &InternalEventJoinStrategy, - ) -> Self { - let r: QueryResponse = arrow_response.into(); - Self { + ) -> anyhow::Result { + let r: QueryResponse = arrow_response + .try_into() + .context("convert arrow response")?; + Ok(Self { archive_height: r.archive_height, next_block: r.next_block, total_execution_time: r.total_execution_time, data: event_join_strategy.join_from_response_data(r.data), rollback_guard: r.rollback_guard, - } + }) } } -impl From<&'_ ArrowResponse> for QueryResponse { - fn from(arrow_response: &ArrowResponse) -> Self { +impl TryFrom<&'_ ArrowResponse> for QueryResponse { + type Error = anyhow::Error; + fn try_from(arrow_response: &ArrowResponse) -> Result { let blocks = arrow_response .data .blocks .iter() .map(Block::from_arrow) - .collect(); + .collect::>>() + .context("convert blocks")?; + let transactions = arrow_response .data .transactions .iter() .map(Transaction::from_arrow) - .collect(); + .collect::>>() + .context("convert transactions")?; + let logs = arrow_response .data .logs .iter() .map(Log::from_arrow) - .collect(); + .collect::>>() + .context("convert logs")?; let traces = arrow_response .data .traces .iter() .map(Trace::from_arrow) - .collect(); + .collect::>>() + .context("convert traces")?; - QueryResponse { + Ok(QueryResponse { archive_height: arrow_response.archive_height, next_block: arrow_response.next_block, total_execution_time: arrow_response.total_execution_time, @@ -87,7 +97,7 @@ impl From<&'_ ArrowResponse> for QueryResponse { traces, }, rollback_guard: arrow_response.rollback_guard.clone(), - } + }) } } diff --git a/hypersync-net-types/src/block.rs b/hypersync-net-types/src/block.rs index a49cb4d..4254150 100644 --- a/hypersync-net-types/src/block.rs +++ b/hypersync-net-types/src/block.rs @@ -312,6 +312,39 @@ impl BlockField { Self::iter().collect() } + pub const fn is_nullable(&self) -> bool { + match self { + BlockField::Nonce + | BlockField::Difficulty + | BlockField::TotalDifficulty + | BlockField::Uncles + | BlockField::BaseFeePerGas + | BlockField::BlobGasUsed + | BlockField::ExcessBlobGas + | BlockField::ParentBeaconBlockRoot + | BlockField::WithdrawalsRoot + | BlockField::Withdrawals + | BlockField::L1BlockNumber + | BlockField::SendCount + | BlockField::SendRoot + | BlockField::MixHash => true, + BlockField::Number + | BlockField::Hash + | BlockField::ParentHash + | BlockField::Sha3Uncles + | BlockField::LogsBloom + | BlockField::TransactionsRoot + | BlockField::StateRoot + | BlockField::ReceiptsRoot + | BlockField::Miner + | BlockField::ExtraData + | BlockField::Size + | BlockField::GasLimit + | BlockField::GasUsed + | BlockField::Timestamp => false, + } + } + /// Convert BlockField to Cap'n Proto enum pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::BlockField { match self { @@ -464,4 +497,23 @@ mod tests { let from_str = BlockField::from_str("number").unwrap(); assert_eq!(block_field, from_str); } + + #[test] + fn nullable_fields() { + use std::collections::HashMap; + + let is_nullable_map: HashMap<_, _> = BlockField::all() + .iter() + .map(|f| (f.to_string(), f.is_nullable())) + .collect(); + for field in hypersync_schema::block_header().fields.iter() { + let should_be_nullable = is_nullable_map.get(field.name().as_str()).unwrap(); + assert_eq!( + field.is_nullable(), + *should_be_nullable, + "field {} nullable mismatch", + field.name() + ); + } + } } diff --git a/hypersync-net-types/src/log.rs b/hypersync-net-types/src/log.rs index 2086e44..af7f744 100644 --- a/hypersync-net-types/src/log.rs +++ b/hypersync-net-types/src/log.rs @@ -489,6 +489,23 @@ impl LogField { Self::iter().collect() } + pub const fn is_nullable(&self) -> bool { + match self { + LogField::Removed + | LogField::Topic0 + | LogField::Topic1 + | LogField::Topic2 + | LogField::Topic3 => true, + LogField::TransactionHash + | LogField::BlockHash + | LogField::BlockNumber + | LogField::TransactionIndex + | LogField::LogIndex + | LogField::Address + | LogField::Data => false, + } + } + /// Convert LogField to Cap'n Proto enum pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::LogField { match self { @@ -536,6 +553,8 @@ impl LogField { #[cfg(test)] mod tests { + use std::collections::HashMap; + use super::*; use crate::{query::tests::test_query_serde, Query}; use hypersync_format::Hex; @@ -670,4 +689,21 @@ mod tests { check_log_filter_json(json!({"topics": [[TOPIC], [], [], []]})); check_log_filter_json(json!({"topics": [[], [], [TOPIC]]})); } + + #[test] + fn nullable_fields() { + let is_nullable_map: HashMap<_, _> = LogField::all() + .iter() + .map(|f| (f.to_string(), f.is_nullable())) + .collect(); + for field in hypersync_schema::log().fields.iter() { + let should_be_nullable = is_nullable_map.get(field.name().as_str()).unwrap(); + assert_eq!( + field.is_nullable(), + *should_be_nullable, + "field {} nullable mismatch", + field.name() + ); + } + } } diff --git a/hypersync-net-types/src/trace.rs b/hypersync-net-types/src/trace.rs index b99fa43..9577b20 100644 --- a/hypersync-net-types/src/trace.rs +++ b/hypersync-net-types/src/trace.rs @@ -700,6 +700,35 @@ impl TraceField { Self::iter().collect() } + pub const fn is_nullable(&self) -> bool { + match self { + TraceField::From + | TraceField::To + | TraceField::CallType + | TraceField::Gas + | TraceField::Input + | TraceField::Init + | TraceField::Value + | TraceField::Author + | TraceField::RewardType + | TraceField::Address + | TraceField::Code + | TraceField::GasUsed + | TraceField::Output + | TraceField::Subtraces + | TraceField::TraceAddress + | TraceField::TransactionHash + | TraceField::TransactionPosition + | TraceField::Type + | TraceField::Error + | TraceField::Sighash + | TraceField::ActionAddress + | TraceField::Balance + | TraceField::RefundAddress => true, + TraceField::BlockHash | TraceField::BlockNumber => false, + } + } + /// Convert TraceField to Cap'n Proto enum pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::TraceField { match self { @@ -842,4 +871,23 @@ mod tests { test_query_serde(query, "trace selection with full values"); } + + #[test] + fn nullable_fields() { + use std::collections::HashMap; + + let is_nullable_map: HashMap<_, _> = TraceField::all() + .iter() + .map(|f| (f.to_string(), f.is_nullable())) + .collect(); + for field in hypersync_schema::trace().fields.iter() { + let should_be_nullable = is_nullable_map.get(field.name().as_str()).unwrap(); + assert_eq!( + field.is_nullable(), + *should_be_nullable, + "field {} nullable mismatch", + field.name() + ); + } + } } diff --git a/hypersync-net-types/src/transaction.rs b/hypersync-net-types/src/transaction.rs index 2cec834..5cb7b26 100644 --- a/hypersync-net-types/src/transaction.rs +++ b/hypersync-net-types/src/transaction.rs @@ -983,6 +983,57 @@ impl TransactionField { Self::iter().collect() } + pub const fn is_nullable(&self) -> bool { + match self { + TransactionField::From + | TransactionField::GasPrice + | TransactionField::To + | TransactionField::V + | TransactionField::R + | TransactionField::S + | TransactionField::MaxPriorityFeePerGas + | TransactionField::MaxFeePerGas + | TransactionField::ChainId + | TransactionField::ContractAddress + | TransactionField::Type + | TransactionField::Root + | TransactionField::Status + | TransactionField::Sighash + | TransactionField::YParity + | TransactionField::AccessList + | TransactionField::AuthorizationList + | TransactionField::L1Fee + | TransactionField::L1GasPrice + | TransactionField::L1GasUsed + | TransactionField::L1FeeScalar + | TransactionField::GasUsedForL1 + | TransactionField::MaxFeePerBlobGas + | TransactionField::BlobVersionedHashes + | TransactionField::DepositNonce + | TransactionField::BlobGasPrice + | TransactionField::DepositReceiptVersion + | TransactionField::BlobGasUsed + | TransactionField::L1BaseFeeScalar + | TransactionField::L1BlobBaseFee + | TransactionField::L1BlobBaseFeeScalar + | TransactionField::L1BlockNumber + | TransactionField::Mint + | TransactionField::SourceHash => true, + TransactionField::BlockHash + | TransactionField::BlockNumber + | TransactionField::Gas + | TransactionField::Hash + | TransactionField::Input + | TransactionField::Nonce + | TransactionField::TransactionIndex + | TransactionField::Value + | TransactionField::CumulativeGasUsed + | TransactionField::EffectiveGasPrice + | TransactionField::GasUsed + | TransactionField::LogsBloom => false, + } + } + /// Convert TransactionField to Cap'n Proto enum pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::TransactionField { match self { @@ -1329,4 +1380,23 @@ mod tests { test_query_serde(query, "authorization selection with rest defaults"); } + + #[test] + fn nullable_fields() { + use std::collections::HashMap; + + let is_nullable_map: HashMap<_, _> = TransactionField::all() + .iter() + .map(|f| (f.to_string(), f.is_nullable())) + .collect(); + for field in hypersync_schema::transaction().fields.iter() { + let should_be_nullable = is_nullable_map.get(field.name().as_str()).unwrap(); + assert_eq!( + field.is_nullable(), + *should_be_nullable, + "field {} nullable mismatch", + field.name() + ); + } + } }