Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

### 2025-10-13


- Remove explicit cache-related options from RocksDB configuration and reverted optimistic transactions to reduce RAM usage [#4853](https://github.com/lambdaclass/ethrex/pull/4853)
- Remove unnecessary mul in ecpairing [#4843](https://github.com/lambdaclass/ethrex/pull/4843)

### 2025-10-06
Expand Down
76 changes: 39 additions & 37 deletions crates/storage/store_db/rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use ethrex_common::{
};
use ethrex_trie::{Nibbles, Trie};
use rocksdb::{
BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor, MultiThreaded,
OptimisticTransactionDB, Options, WriteBatchWithTransaction,
BlockBasedOptions, BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded,
Options, WriteBatch,
};
use std::{
collections::HashSet,
Expand Down Expand Up @@ -111,7 +111,7 @@ const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";

#[derive(Debug)]
pub struct Store {
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
trie_cache: Arc<RwLock<TrieLayerCache>>,
}

Expand All @@ -121,8 +121,6 @@ impl Store {
db_options.create_if_missing(true);
db_options.create_missing_column_families(true);

let cache = Cache::new_lru_cache(4 * 1024 * 1024 * 1024); // 4GB cache

db_options.set_max_open_files(-1);
db_options.set_max_file_opening_threads(16);

Expand Down Expand Up @@ -175,18 +173,17 @@ impl Store {
];

// Get existing column families to know which ones to drop later
let existing_cfs =
match OptimisticTransactionDB::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};
let existing_cfs = match DBWithThreadMode::<MultiThreaded>::list_cf(&db_options, path) {
Ok(cfs) => {
info!("Found existing column families: {:?}", cfs);
cfs
}
Err(_) => {
// Database doesn't exist yet
info!("Database doesn't exist, will create with expected column families");
vec!["default".to_string()]
}
};

// Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
let mut all_cfs_to_open = HashSet::new();
Expand Down Expand Up @@ -220,9 +217,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB blocks
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
Expand All @@ -232,10 +227,8 @@ impl Store {
cf_opts.set_target_file_size_base(128 * 1024 * 1024); // 128MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_bloom_filter(10.0, false);
block_opts.set_cache_index_and_filter_blocks(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_TRIE_NODES => {
Expand All @@ -248,10 +241,7 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024); // 16KB
block_opts.set_block_cache(&cache);
block_opts.set_bloom_filter(10.0, false); // 10 bits per key
block_opts.set_cache_index_and_filter_blocks(true);
block_opts.set_pin_l0_filter_and_index_blocks_in_cache(true);
cf_opts.set_block_based_table_factory(&block_opts);
}
CF_RECEIPTS | CF_ACCOUNT_CODES => {
Expand All @@ -261,9 +251,7 @@ impl Store {
cf_opts.set_target_file_size_base(256 * 1024 * 1024); // 256MB

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_cache(&cache);
block_opts.set_block_size(32 * 1024); // 32KB
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
_ => {
Expand All @@ -275,15 +263,29 @@ impl Store {

let mut block_opts = BlockBasedOptions::default();
block_opts.set_block_size(16 * 1024);
block_opts.set_block_cache(&cache);
cf_opts.set_block_based_table_factory(&block_opts);
}
}

cf_descriptors.push(ColumnFamilyDescriptor::new(cf_name, cf_opts));
}

let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
// Note: we are not using transactions on our Rocksdb instance.
// This is safe as long as two conditions are met:
// - We never write to the same table from two different places concurrently.
// - We always use batch writes. This guarantees atomicity in rocksdb.
//
// For the first point, we know that all writes to the state and storage tries are
// done through the `apply_updates` function, called only after block execution.
// There is only one other place where we write to the tries, and that's during snap
// sync, through the `write_storage_trie_nodes_batch` function (and similarly for state trie nodes);
// this does not pose a problem because there is no block execution until snap sync is done.
//
// Regardless of transactionality, all writes go through a WAL, which ensures
// we get durability (i.e. crash recovery).
//
// For other less crucial tables refer to the db_safety documentation.
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
path,
cf_descriptors,
Expand Down Expand Up @@ -381,7 +383,7 @@ impl Store {
) -> Result<(), StoreError> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

for (cf_name, key, value) in batch_ops {
let cf = db.cf_handle(&cf_name).ok_or_else(|| {
Expand Down Expand Up @@ -498,7 +500,7 @@ impl StoreEngine for Store {
)?;

let _span = tracing::trace_span!("Block DB update").entered();
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let mut trie = trie_cache.write().map_err(|_| StoreError::LockError)?;
if let Some(root) = trie.get_commitable(parent_state_root, COMMIT_THRESHOLD) {
Expand Down Expand Up @@ -580,7 +582,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs(
&db,
Expand Down Expand Up @@ -689,7 +691,7 @@ impl StoreEngine for Store {
}

async fn remove_block(&self, block_number: BlockNumber) -> Result<(), StoreError> {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let Some(hash) = self.get_canonical_block_hash_sync(block_number)? else {
return Ok(());
Expand Down Expand Up @@ -939,7 +941,7 @@ impl StoreEngine for Store {
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
Expand Down Expand Up @@ -1203,7 +1205,7 @@ impl StoreEngine for Store {
let db = self.db.clone();

tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

let [cf_canonical, cf_chain_data] =
open_cfs(&db, [CF_CANONICAL_BLOCK_HASHES, CF_CHAIN_DATA])?;
Expand Down Expand Up @@ -1448,7 +1450,7 @@ impl StoreEngine for Store {
) -> Result<(), StoreError> {
let db = self.db.clone();
tokio::task::spawn_blocking(move || {
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();
let cf = db.cf_handle(CF_TRIE_NODES).ok_or_else(|| {
StoreError::Custom("Column family not found: CF_TRIE_NODES".to_string())
})?;
Expand Down Expand Up @@ -1525,7 +1527,7 @@ impl StoreEngine for Store {
.ok_or_else(|| StoreError::Custom("Column family not found".to_string()))?;

let mut iter = db.iterator_cf(&cf, rocksdb::IteratorMode::Start);
let mut batch = WriteBatchWithTransaction::default();
let mut batch = WriteBatch::default();

while let Some(Ok((key, _))) = iter.next() {
batch.delete_cf(&cf, key);
Expand All @@ -1541,7 +1543,7 @@ impl StoreEngine for Store {

/// Open column families
fn open_cfs<'a, const N: usize>(
db: &'a Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'a Arc<DBWithThreadMode<MultiThreaded>>,
names: [&str; N],
) -> Result<[Arc<BoundColumnFamily<'a>>; N], StoreError> {
let mut handles = Vec::with_capacity(N);
Expand Down
16 changes: 8 additions & 8 deletions crates/storage/trie_db/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
use ethrex_common::H256;
use ethrex_rlp::encode::RLPEncode;
use ethrex_trie::{Nibbles, Node, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB};
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::sync::Arc;

use crate::trie_db::layering::apply_prefix;

/// RocksDB implementation for the TrieDB trait, with get and put operations.
pub struct RocksDBTrieDB {
/// RocksDB database
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family name
cf_name: String,
/// Storage trie address prefix
Expand All @@ -18,7 +18,7 @@ pub struct RocksDBTrieDB {

impl RocksDBTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -64,7 +64,7 @@ impl TrieDB for RocksDBTrieDB {

fn put_batch(&self, key_values: Vec<(Nibbles, Vec<u8>)>) -> Result<(), TrieError> {
let cf = self.cf_handle()?;
let mut batch = rocksdb::WriteBatchWithTransaction::default();
let mut batch = rocksdb::WriteBatch::default();

for (key, value) in key_values {
let db_key = self.make_key(key);
Expand All @@ -82,7 +82,7 @@ impl TrieDB for RocksDBTrieDB {

fn put_batch_no_alloc(&self, key_values: &[(Nibbles, Node)]) -> Result<(), TrieError> {
let cf = self.cf_handle()?;
let mut batch = rocksdb::WriteBatchWithTransaction::default();
let mut batch = rocksdb::WriteBatch::default();
// 532 is the maximum size of an encoded branch node.
let mut buffer = Vec::with_capacity(532);

Expand Down Expand Up @@ -117,7 +117,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -157,7 +157,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down Expand Up @@ -194,7 +194,7 @@ mod tests {
db_options.create_missing_column_families(true);

let cf_descriptor = ColumnFamilyDescriptor::new("test_cf", Options::default());
let db = OptimisticTransactionDB::<MultiThreaded>::open_cf_descriptors(
let db = DBWithThreadMode::<MultiThreaded>::open_cf_descriptors(
&db_options,
db_path,
vec![cf_descriptor],
Expand Down
12 changes: 6 additions & 6 deletions crates/storage/trie_db/rocksdb_locked.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
use ethrex_common::H256;
use ethrex_trie::{Nibbles, TrieDB, error::TrieError};
use rocksdb::{MultiThreaded, OptimisticTransactionDB, SnapshotWithThreadMode};
use rocksdb::{DBWithThreadMode, MultiThreaded, SnapshotWithThreadMode};
use std::sync::Arc;

use crate::trie_db::layering::apply_prefix;

/// RocksDB locked implementation for the TrieDB trait, read-only with consistent snapshot.
pub struct RocksDBLockedTrieDB {
/// RocksDB database
db: &'static Arc<OptimisticTransactionDB<MultiThreaded>>,
db: &'static Arc<DBWithThreadMode<MultiThreaded>>,
/// Column family handle
cf: std::sync::Arc<rocksdb::BoundColumnFamily<'static>>,
/// Read-only snapshot for consistent reads
snapshot: SnapshotWithThreadMode<'static, OptimisticTransactionDB<MultiThreaded>>,
snapshot: SnapshotWithThreadMode<'static, DBWithThreadMode<MultiThreaded>>,
/// Storage trie address prefix
address_prefix: Option<H256>,
}

impl RocksDBLockedTrieDB {
pub fn new(
db: Arc<OptimisticTransactionDB<MultiThreaded>>,
db: Arc<DBWithThreadMode<MultiThreaded>>,
cf_name: &str,
address_prefix: Option<H256>,
) -> Result<Self, TrieError> {
Expand Down Expand Up @@ -54,8 +54,8 @@ impl Drop for RocksDBLockedTrieDB {
// Restore the leaked database reference
unsafe {
drop(Box::from_raw(
self.db as *const Arc<OptimisticTransactionDB<MultiThreaded>>
as *mut Arc<OptimisticTransactionDB<MultiThreaded>>,
self.db as *const Arc<DBWithThreadMode<MultiThreaded>>
as *mut Arc<DBWithThreadMode<MultiThreaded>>,
));
}
}
Expand Down
56 changes: 56 additions & 0 deletions docs/internal/l1/db_safety.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Database safety without Rocksdb transactions

## Content addressed tables

- (block)`headers`
- (block)`bodies`
- `account_codes`
- `pending_blocks`

These tables are content addressed, which makes them safe: writes to them are atomic,
and because each entry is addressed by its content, any reader either sees the entry's
single possible value or does not see it at all — no intermediate state is observable.

## Other Tables

- `block_numbers`
- `transaction_locations`
These tables are only written to in the `apply_updates` function, which means there are no concurrent writes to them.

### `canonical_block_hashes`

Written to only in `forkchoice_update` and `remove_blocks`, but the last one is used to revert batches from a CLI
option, not in runtime.

## `chain_data`

Written to during ethrex initialization and then read on forkchoice_update.

## `receipts`

Written to only in `apply_updates`.

## `snap_state`

Written to only during snap sync and mostly a legacy table used to signal the rest of the code when snap sync has finished.

## `trie_nodes`

All writes to the state and storage tries are done through the `apply_updates` function,
called only after block execution.
There is only one other place where we write to the tries, and that's during snap
sync, through the `write_storage_trie_nodes_batch` function (and similarly for state trie nodes);
this does not pose a problem because there is no block execution until snap sync is done.

There is also a `put_batch` function for the trie itself, but it is only used during snap sync and
genesis setup, nowhere else.

## `invalid_ancestors`

Written to in `set_latest_valid_ancestor`, called from every engine api endpoint and during full sync.

TODO: check validity of this.

## `full_sync_headers`

Written to and read only sequentially on the same function during full sync.