diff --git a/Cargo.lock b/Cargo.lock index 4d6f599a22f..05c1b524f74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3356,6 +3356,7 @@ dependencies = [ "ethrex-trie", "ethrex-vm", "hex", + "lru 0.16.2", "rustc-hash 2.1.1", "secp256k1", "serde_json", diff --git a/crates/blockchain/Cargo.toml b/crates/blockchain/Cargo.toml index 9ade036cd49..fca66b20c0a 100644 --- a/crates/blockchain/Cargo.toml +++ b/crates/blockchain/Cargo.toml @@ -26,6 +26,8 @@ tokio-util.workspace = true ethrex-metrics = { path = "./metrics", default-features = false } +lru = "0.16.2" + [dev-dependencies] serde_json.workspace = true hex = "0.4.3" diff --git a/crates/blockchain/blockchain.rs b/crates/blockchain/blockchain.rs index 6ec5951415d..3caf24da298 100644 --- a/crates/blockchain/blockchain.rs +++ b/crates/blockchain/blockchain.rs @@ -36,9 +36,10 @@ use ethrex_storage::{ use ethrex_trie::{Nibbles, Node, NodeRef, Trie}; use ethrex_vm::backends::levm::db::DatabaseLogger; use ethrex_vm::{BlockExecutionResult, DynVmDatabase, Evm, EvmError}; +use lru::LruCache; use mempool::Mempool; use payload::PayloadOrTask; -use rustc_hash::FxHashMap; +use rustc_hash::{FxBuildHasher, FxHashMap}; use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::sync::{ @@ -61,6 +62,8 @@ use ethrex_common::types::BlobsBundle; const MAX_PAYLOADS: usize = 10; const MAX_MEMPOOL_SIZE_DEFAULT: usize = 10_000; +const TX_SENDER_CACHE_MAX_SIZE: usize = 50_000; + type StoreUpdatesMap = FxHashMap, FxHashMap>)>; //TODO: Implement a struct Chain or BlockChain to encapsulate //functionality and canonical chain state and config @@ -90,6 +93,8 @@ pub struct Blockchain { /// Mapping from a payload id to either a complete payload or a payload build task /// We need to keep completed payloads around in case consensus requests them twice pub payloads: Arc>>, + /// Cache for transaction senders, mapping from transaction hash to sender address + tx_sender_cache: Arc>>, } #[derive(Debug, Clone)] @@ -142,6 +147,10 @@ impl Blockchain { is_synced: AtomicBool::new(false), payloads: Arc::new(TokioMutex::new(Vec::new())), options: blockchain_opts, + tx_sender_cache: Arc::new(Mutex::new(LruCache::with_hasher( + TX_SENDER_CACHE_MAX_SIZE.try_into().unwrap(), + FxBuildHasher, + ))), } } @@ -152,6 +161,10 @@ impl Blockchain { is_synced: AtomicBool::new(false), payloads: Arc::new(TokioMutex::new(Vec::new())), options: BlockchainOptions::default(), + tx_sender_cache: Arc::new(Mutex::new(LruCache::with_hasher( + TX_SENDER_CACHE_MAX_SIZE.try_into().unwrap(), + FxBuildHasher, + ))), } } @@ -227,14 +240,35 @@ impl Blockchain { let queue_length = AtomicUsize::new(0); let queue_length_ref = &queue_length; let mut max_queue_length = 0; + let tx_sender_cache = self.tx_sender_cache.clone(); + let (execution_result, account_updates_list) = std::thread::scope(|s| { let max_queue_length_ref = &mut max_queue_length; let (tx, rx) = channel(); let execution_handle = std::thread::Builder::new() .name("block_executor_execution".to_string()) .spawn_scoped(s, move || -> Result<_, ChainError> { + let block_senders = { + let mut tx_sender_cache = tx_sender_cache.lock().unwrap(); + block.body.precompute_tx_hashes(); + block + .body + .transactions + .iter() + .map(|tx| { + tx_sender_cache + .get(&tx.hash()) + .copied() + .unwrap_or(H160::zero()) + }) + .collect() + }; + let block_senders = block + .body + .recover_with_cached_senders(block_senders) + .unwrap(); let execution_result = - vm.execute_block_pipeline(block, tx, queue_length_ref)?; + vm.execute_block_pipeline(block, tx, queue_length_ref, &block_senders)?; // Validate execution went alright validate_gas_used(&execution_result.receipts, &block.header)?; @@ -1400,8 +1434,9 @@ impl Blockchain { // Add transaction and blobs bundle to storage self.mempool - .add_transaction(hash, MempoolTransaction::new(transaction, sender))?; + .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?; self.mempool.add_blobs_bundle(hash, blobs_bundle)?; + self.tx_sender_cache.lock().unwrap().put(hash, sender); Ok(hash) } @@ -1426,7 +1461,8 @@ impl Blockchain { // Add transaction to storage self.mempool - .add_transaction(hash, MempoolTransaction::new(transaction, sender))?; + .add_transaction(hash, sender, MempoolTransaction::new(transaction, sender))?; + self.tx_sender_cache.lock().unwrap().put(hash, sender); Ok(hash) } diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index db14719c79a..247066dbbd1 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -103,6 +103,7 @@ impl Mempool { pub fn add_transaction( &self, hash: H256, + tx_sender: Address, transaction: MempoolTransaction, ) -> Result<(), StoreError> { let mut inner = self.write()?; @@ -119,7 +120,7 @@ impl Mempool { inner.txs_order.push_back(hash); inner .txs_by_sender_nonce - .insert((transaction.sender(), transaction.nonce()), hash); + .insert((tx_sender, transaction.nonce()), hash); inner.transaction_pool.insert(hash, transaction); inner.broadcast_pool.insert(hash); @@ -852,9 +853,11 @@ mod tests { let filter = |tx: &Transaction| -> bool { matches!(tx, Transaction::EIP4844Transaction(_)) }; mempool - .add_transaction(blob_tx_hash, blob_tx.clone()) + .add_transaction(blob_tx_hash, blob_tx_sender, blob_tx.clone()) + .unwrap(); + mempool + .add_transaction(plain_tx_hash, plain_tx_sender, plain_tx) .unwrap(); - mempool.add_transaction(plain_tx_hash, plain_tx).unwrap(); let txs = mempool.filter_transactions_with_filter_fn(&filter).unwrap(); assert_eq!(txs, HashMap::from([(blob_tx.sender(), vec![blob_tx])])); } diff --git a/crates/common/types/block.rs b/crates/common/types/block.rs index e4f5eab06fa..d3a31f40642 100644 --- a/crates/common/types/block.rs +++ b/crates/common/types/block.rs @@ -20,7 +20,7 @@ use ethrex_rlp::{ structs::{Decoder, Encoder}, }; use ethrex_trie::Trie; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; +use rayon::iter::{IndexedParallelIterator, IntoParallelRefIterator, ParallelIterator}; use rkyv::{Archive, Deserialize as RDeserialize, Serialize as RSerialize}; use serde::{Deserialize, Serialize}; @@ -260,6 +260,31 @@ impl BlockBody { .map(|tx| Ok((tx, tx.sender()?))) .collect::, secp256k1::Error>>() } + + pub fn precompute_tx_hashes(&self) { + self.transactions.par_iter().for_each(|tx| { + let _ = tx.hash(); + }); + } + + pub fn recover_with_cached_senders( + &self, + cached_senders: Vec
, + ) -> Result, secp256k1::Error> { + // Recovering addresses is computationally expensive. + // Computing them in parallel greatly reduces execution time. + cached_senders + .par_iter() + .zip(&self.transactions) + .map(|(sender, tx)| { + if sender.is_zero() { + tx.sender() + } else { + Ok(*sender) + } + }) + .collect::, secp256k1::Error>>() + } } pub fn compute_transactions_root(transactions: &[Transaction]) -> H256 { diff --git a/crates/vm/backends/levm/mod.rs b/crates/vm/backends/levm/mod.rs index 5a303f987e4..8d41193fffe 100644 --- a/crates/vm/backends/levm/mod.rs +++ b/crates/vm/backends/levm/mod.rs @@ -101,6 +101,7 @@ impl LEVM { vm_type: VMType, merkleizer: Sender>, queue_length: &AtomicUsize, + block_senders: &[Address], ) -> Result { Self::prepare_block(block, db, vm_type)?; @@ -113,9 +114,7 @@ impl LEVM { // The value itself can be safely changed. let mut tx_since_last_flush = 2; - for (tx, tx_sender) in block.body.get_transactions_with_sender().map_err(|error| { - EvmError::Transaction(format!("Couldn't recover addresses with error: {error}")) - })? { + for (tx, tx_sender) in block.body.transactions.iter().zip(block_senders) { if cumulative_gas_used + tx.gas_limit() > block.header.gas_limit { return Err(EvmError::Transaction(format!( "Gas allowance exceeded. Block gas limit {} can be surpassed by executing transaction with gas limit {}", @@ -126,7 +125,7 @@ impl LEVM { let report = Self::execute_tx_in_block( tx, - tx_sender, + *tx_sender, &block.header, db, vm_type, diff --git a/crates/vm/backends/mod.rs b/crates/vm/backends/mod.rs index 8bbacc2aebd..23d7d984074 100644 --- a/crates/vm/backends/mod.rs +++ b/crates/vm/backends/mod.rs @@ -88,8 +88,16 @@ impl Evm { block: &Block, merkleizer: Sender>, queue_length: &AtomicUsize, + block_senders: &[Address], ) -> Result { - LEVM::execute_block_pipeline(block, &mut self.db, self.vm_type, merkleizer, queue_length) + LEVM::execute_block_pipeline( + block, + &mut self.db, + self.vm_type, + merkleizer, + queue_length, + block_senders, + ) } /// Wraps [LEVM::execute_tx].