@@ -15,8 +15,8 @@ use ethrex_common::{
1515} ;
1616use ethrex_trie:: { Nibbles , Trie } ;
1717use rocksdb:: {
18- BlockBasedOptions , BoundColumnFamily , Cache , ColumnFamilyDescriptor , MultiThreaded ,
19- OptimisticTransactionDB , Options , WriteBatchWithTransaction ,
18+ BlockBasedOptions , BoundColumnFamily , ColumnFamilyDescriptor , DBWithThreadMode , MultiThreaded ,
19+ Options , WriteBatch ,
2020} ;
2121use std:: {
2222 collections:: HashSet ,
@@ -111,7 +111,7 @@ const CF_FULLSYNC_HEADERS: &str = "fullsync_headers";
111111
112112#[ derive( Debug ) ]
113113pub struct Store {
114- db : Arc < OptimisticTransactionDB < MultiThreaded > > ,
114+ db : Arc < DBWithThreadMode < MultiThreaded > > ,
115115 trie_cache : Arc < RwLock < TrieLayerCache > > ,
116116}
117117
@@ -121,8 +121,6 @@ impl Store {
121121 db_options. create_if_missing ( true ) ;
122122 db_options. create_missing_column_families ( true ) ;
123123
124- let cache = Cache :: new_lru_cache ( 4 * 1024 * 1024 * 1024 ) ; // 4GB cache
125-
126124 db_options. set_max_open_files ( -1 ) ;
127125 db_options. set_max_file_opening_threads ( 16 ) ;
128126
@@ -175,18 +173,17 @@ impl Store {
175173 ] ;
176174
177175 // Get existing column families to know which ones to drop later
178- let existing_cfs =
179- match OptimisticTransactionDB :: < MultiThreaded > :: list_cf ( & db_options, path) {
180- Ok ( cfs) => {
181- info ! ( "Found existing column families: {:?}" , cfs) ;
182- cfs
183- }
184- Err ( _) => {
185- // Database doesn't exist yet
186- info ! ( "Database doesn't exist, will create with expected column families" ) ;
187- vec ! [ "default" . to_string( ) ]
188- }
189- } ;
176+ let existing_cfs = match DBWithThreadMode :: < MultiThreaded > :: list_cf ( & db_options, path) {
177+ Ok ( cfs) => {
178+ info ! ( "Found existing column families: {:?}" , cfs) ;
179+ cfs
180+ }
181+ Err ( _) => {
182+ // Database doesn't exist yet
183+ info ! ( "Database doesn't exist, will create with expected column families" ) ;
184+ vec ! [ "default" . to_string( ) ]
185+ }
186+ } ;
190187
191188 // Create descriptors for ALL existing CFs + expected ones (RocksDB requires opening all existing CFs)
192189 let mut all_cfs_to_open = HashSet :: new ( ) ;
@@ -220,9 +217,7 @@ impl Store {
220217 cf_opts. set_target_file_size_base ( 256 * 1024 * 1024 ) ; // 256MB
221218
222219 let mut block_opts = BlockBasedOptions :: default ( ) ;
223- block_opts. set_block_cache ( & cache) ;
224220 block_opts. set_block_size ( 32 * 1024 ) ; // 32KB blocks
225- block_opts. set_cache_index_and_filter_blocks ( true ) ;
226221 cf_opts. set_block_based_table_factory ( & block_opts) ;
227222 }
228223 CF_CANONICAL_BLOCK_HASHES | CF_BLOCK_NUMBERS => {
@@ -232,10 +227,8 @@ impl Store {
232227 cf_opts. set_target_file_size_base ( 128 * 1024 * 1024 ) ; // 128MB
233228
234229 let mut block_opts = BlockBasedOptions :: default ( ) ;
235- block_opts. set_block_cache ( & cache) ;
236230 block_opts. set_block_size ( 16 * 1024 ) ; // 16KB
237231 block_opts. set_bloom_filter ( 10.0 , false ) ;
238- block_opts. set_cache_index_and_filter_blocks ( true ) ;
239232 cf_opts. set_block_based_table_factory ( & block_opts) ;
240233 }
241234 CF_TRIE_NODES => {
@@ -248,10 +241,7 @@ impl Store {
248241
249242 let mut block_opts = BlockBasedOptions :: default ( ) ;
250243 block_opts. set_block_size ( 16 * 1024 ) ; // 16KB
251- block_opts. set_block_cache ( & cache) ;
252244 block_opts. set_bloom_filter ( 10.0 , false ) ; // 10 bits per key
253- block_opts. set_cache_index_and_filter_blocks ( true ) ;
254- block_opts. set_pin_l0_filter_and_index_blocks_in_cache ( true ) ;
255245 cf_opts. set_block_based_table_factory ( & block_opts) ;
256246 }
257247 CF_RECEIPTS | CF_ACCOUNT_CODES => {
@@ -261,9 +251,7 @@ impl Store {
261251 cf_opts. set_target_file_size_base ( 256 * 1024 * 1024 ) ; // 256MB
262252
263253 let mut block_opts = BlockBasedOptions :: default ( ) ;
264- block_opts. set_block_cache ( & cache) ;
265254 block_opts. set_block_size ( 32 * 1024 ) ; // 32KB
266- block_opts. set_block_cache ( & cache) ;
267255 cf_opts. set_block_based_table_factory ( & block_opts) ;
268256 }
269257 _ => {
@@ -275,15 +263,29 @@ impl Store {
275263
276264 let mut block_opts = BlockBasedOptions :: default ( ) ;
277265 block_opts. set_block_size ( 16 * 1024 ) ;
278- block_opts. set_block_cache ( & cache) ;
279266 cf_opts. set_block_based_table_factory ( & block_opts) ;
280267 }
281268 }
282269
283270 cf_descriptors. push ( ColumnFamilyDescriptor :: new ( cf_name, cf_opts) ) ;
284271 }
285272
286- let db = OptimisticTransactionDB :: < MultiThreaded > :: open_cf_descriptors (
273+ // Note: we are not using transactions on our Rocksdb instance.
274+ // This is safe as long as two conditions are met:
275+ // - We never write to the same table from two different places concurrently.
276+ // - We always use batch writes. This guarantees atomicity in rocksdb.
277+ //
278+ // For the first point, we know that all writes to the state and storage tries are
279+ // done through the `apply_updates` function, called only after block execution.
280+ // There is only one other place where we write to the tries, and that's during snap
281+ // sync, through the `write_storage_trie_nodes_batch` function (and similarly for state trie nodes);
282+ // this does not pose a problem because there is no block execution until snap sync is done.
283+ //
284+ // Regardless of transactionality, all writes go through a WAL, which ensures
285+ // we get durability (i.e. crash recovery).
286+ //
287+ // For other less crucial tables refer to the db_safety documentation.
288+ let db = DBWithThreadMode :: < MultiThreaded > :: open_cf_descriptors (
287289 & db_options,
288290 path,
289291 cf_descriptors,
@@ -381,7 +383,7 @@ impl Store {
381383 ) -> Result < ( ) , StoreError > {
382384 let db = self . db . clone ( ) ;
383385 tokio:: task:: spawn_blocking ( move || {
384- let mut batch = WriteBatchWithTransaction :: default ( ) ;
386+ let mut batch = WriteBatch :: default ( ) ;
385387
386388 for ( cf_name, key, value) in batch_ops {
387389 let cf = db. cf_handle ( & cf_name) . ok_or_else ( || {
@@ -498,7 +500,7 @@ impl StoreEngine for Store {
498500 ) ?;
499501
500502 let _span = tracing:: trace_span!( "Block DB update" ) . entered ( ) ;
501- let mut batch = WriteBatchWithTransaction :: default ( ) ;
503+ let mut batch = WriteBatch :: default ( ) ;
502504
503505 let mut trie = trie_cache. write ( ) . map_err ( |_| StoreError :: LockError ) ?;
504506 if let Some ( root) = trie. get_commitable ( parent_state_root, COMMIT_THRESHOLD ) {
@@ -580,7 +582,7 @@ impl StoreEngine for Store {
580582 let db = self . db . clone ( ) ;
581583
582584 tokio:: task:: spawn_blocking ( move || {
583- let mut batch = WriteBatchWithTransaction :: default ( ) ;
585+ let mut batch = WriteBatch :: default ( ) ;
584586
585587 let [ cf_headers, cf_bodies, cf_block_numbers, cf_tx_locations] = open_cfs (
586588 & db,
@@ -689,7 +691,7 @@ impl StoreEngine for Store {
689691 }
690692
691693 async fn remove_block ( & self , block_number : BlockNumber ) -> Result < ( ) , StoreError > {
692- let mut batch = WriteBatchWithTransaction :: default ( ) ;
694+ let mut batch = WriteBatch :: default ( ) ;
693695
694696 let Some ( hash) = self . get_canonical_block_hash_sync ( block_number) ? else {
695697 return Ok ( ( ) ) ;
@@ -939,7 +941,7 @@ impl StoreEngine for Store {
939941 . ok_or_else ( || StoreError :: Custom ( "Column family not found" . to_string ( ) ) ) ?;
940942
941943 let mut iter = db. iterator_cf ( & cf, rocksdb:: IteratorMode :: Start ) ;
942- let mut batch = WriteBatchWithTransaction :: default ( ) ;
944+ let mut batch = WriteBatch :: default ( ) ;
943945
944946 while let Some ( Ok ( ( key, _) ) ) = iter. next ( ) {
945947 batch. delete_cf ( & cf, key) ;
@@ -1203,7 +1205,7 @@ impl StoreEngine for Store {
12031205 let db = self . db . clone ( ) ;
12041206
12051207 tokio:: task:: spawn_blocking ( move || {
1206- let mut batch = WriteBatchWithTransaction :: default ( ) ;
1208+ let mut batch = WriteBatch :: default ( ) ;
12071209
12081210 let [ cf_canonical, cf_chain_data] =
12091211 open_cfs ( & db, [ CF_CANONICAL_BLOCK_HASHES , CF_CHAIN_DATA ] ) ?;
@@ -1448,7 +1450,7 @@ impl StoreEngine for Store {
14481450 ) -> Result < ( ) , StoreError > {
14491451 let db = self . db . clone ( ) ;
14501452 tokio:: task:: spawn_blocking ( move || {
1451- let mut batch = WriteBatchWithTransaction :: default ( ) ;
1453+ let mut batch = WriteBatch :: default ( ) ;
14521454 let cf = db. cf_handle ( CF_TRIE_NODES ) . ok_or_else ( || {
14531455 StoreError :: Custom ( "Column family not found: CF_TRIE_NODES" . to_string ( ) )
14541456 } ) ?;
@@ -1525,7 +1527,7 @@ impl StoreEngine for Store {
15251527 . ok_or_else ( || StoreError :: Custom ( "Column family not found" . to_string ( ) ) ) ?;
15261528
15271529 let mut iter = db. iterator_cf ( & cf, rocksdb:: IteratorMode :: Start ) ;
1528- let mut batch = WriteBatchWithTransaction :: default ( ) ;
1530+ let mut batch = WriteBatch :: default ( ) ;
15291531
15301532 while let Some ( Ok ( ( key, _) ) ) = iter. next ( ) {
15311533 batch. delete_cf ( & cf, key) ;
@@ -1541,7 +1543,7 @@ impl StoreEngine for Store {
15411543
15421544/// Open column families
15431545fn open_cfs < ' a , const N : usize > (
1544- db : & ' a Arc < OptimisticTransactionDB < MultiThreaded > > ,
1546+ db : & ' a Arc < DBWithThreadMode < MultiThreaded > > ,
15451547 names : [ & str ; N ] ,
15461548) -> Result < [ Arc < BoundColumnFamily < ' a > > ; N ] , StoreError > {
15471549 let mut handles = Vec :: with_capacity ( N ) ;
0 commit comments