|  | 
|  | 1 | +use crate::Chain; | 
|  | 2 | +use alloy::{consensus::BlockHeader, primitives::B256}; | 
|  | 3 | +use eyre::ContextCompat; | 
|  | 4 | +use init4_bin_base::utils::calc::SlotCalculator; | 
|  | 5 | +use reth::{ | 
|  | 6 | +    primitives::EthPrimitives, | 
|  | 7 | +    providers::{BlockNumReader, BlockReader, ExecutionOutcome, HeaderProvider, ProviderFactory}, | 
|  | 8 | +    revm::{database::StateProviderDatabase, db::StateBuilder}, | 
|  | 9 | +}; | 
|  | 10 | +use reth_chainspec::{ChainSpec, EthereumHardforks}; | 
|  | 11 | +use reth_node_api::{FullNodeComponents, NodeTypes}; | 
|  | 12 | +use signet_blobber::{CacheHandle, ExtractableChainShim}; | 
|  | 13 | +use signet_constants::SignetSystemConstants; | 
|  | 14 | +use signet_db::{DataCompat, DbProviderExt, RuChain, RuRevmState, RuWriter}; | 
|  | 15 | +use signet_evm::{BlockResult, EvmNeedsCfg, SignetDriver}; | 
|  | 16 | +use signet_extract::{Extractor, Extracts}; | 
|  | 17 | +use signet_journal::HostJournal; | 
|  | 18 | +use signet_node_types::{NodeTypesDbTrait, SignetNodeTypes}; | 
|  | 19 | +use std::collections::VecDeque; | 
|  | 20 | +use std::sync::Arc; | 
|  | 21 | +use tracing::{Instrument, debug, error, info, info_span, instrument}; | 
|  | 22 | +use trevm::revm::primitives::hardfork::SpecId; | 
|  | 23 | + | 
|  | 24 | +/// A block processor that listens to host chain commits and processes | 
|  | 25 | +/// Signet blocks accordingly. | 
|  | 26 | +#[derive(Debug)] | 
|  | 27 | +pub struct SignetBlockProcessor<Db> | 
|  | 28 | +where | 
|  | 29 | +    Db: NodeTypesDbTrait, | 
|  | 30 | +{ | 
|  | 31 | +    /// Signet System Constants | 
|  | 32 | +    constants: SignetSystemConstants, | 
|  | 33 | + | 
|  | 34 | +    /// The chain specification, used to determine active hardforks. | 
|  | 35 | +    chain_spec: Arc<ChainSpec>, | 
|  | 36 | + | 
|  | 37 | +    /// A [`ProviderFactory`] instance to allow RU database access. | 
|  | 38 | +    ru_provider: ProviderFactory<SignetNodeTypes<Db>>, | 
|  | 39 | + | 
|  | 40 | +    /// The slot calculator. | 
|  | 41 | +    slot_calculator: SlotCalculator, | 
|  | 42 | + | 
|  | 43 | +    /// A handle to the blob cacher. | 
|  | 44 | +    blob_cacher: CacheHandle, | 
|  | 45 | +} | 
|  | 46 | + | 
|  | 47 | +impl<Db> SignetBlockProcessor<Db> | 
|  | 48 | +where | 
|  | 49 | +    Db: NodeTypesDbTrait, | 
|  | 50 | +{ | 
|  | 51 | +    /// Create a new [`SignetBlockProcessor`]. | 
|  | 52 | +    pub const fn new( | 
|  | 53 | +        constants: SignetSystemConstants, | 
|  | 54 | +        chain_spec: Arc<ChainSpec>, | 
|  | 55 | +        ru_provider: ProviderFactory<SignetNodeTypes<Db>>, | 
|  | 56 | +        slot_calculator: SlotCalculator, | 
|  | 57 | +        blob_cacher: CacheHandle, | 
|  | 58 | +    ) -> Self { | 
|  | 59 | +        Self { constants, chain_spec, ru_provider, slot_calculator, blob_cacher } | 
|  | 60 | +    } | 
|  | 61 | + | 
|  | 62 | +    /// Get the active spec id at the given timestamp. | 
|  | 63 | +    fn spec_id(&self, timestamp: u64) -> SpecId { | 
|  | 64 | +        if self.chain_spec.is_prague_active_at_timestamp(timestamp) { | 
|  | 65 | +            SpecId::PRAGUE | 
|  | 66 | +        } else { | 
|  | 67 | +            SpecId::CANCUN | 
|  | 68 | +        } | 
|  | 69 | +    } | 
|  | 70 | + | 
|  | 71 | +    /// Make a [`StateProviderDatabase`] from the read-write provider, suitable | 
|  | 72 | +    /// for use with Trevm. | 
|  | 73 | +    fn state_provider_database(&self, height: u64) -> eyre::Result<RuRevmState> { | 
|  | 74 | +        // Get the state provider for the block number | 
|  | 75 | +        let sp = self.ru_provider.history_by_block_number(height)?; | 
|  | 76 | + | 
|  | 77 | +        // Wrap in Revm comatibility layer | 
|  | 78 | +        let spd = StateProviderDatabase::new(sp); | 
|  | 79 | +        let builder = StateBuilder::new_with_database(spd); | 
|  | 80 | + | 
|  | 81 | +        Ok(builder.with_bundle_update().build()) | 
|  | 82 | +    } | 
|  | 83 | + | 
|  | 84 | +    /// Make a new Trevm instance, building on the given height. | 
|  | 85 | +    fn trevm(&self, parent_height: u64, spec_id: SpecId) -> eyre::Result<EvmNeedsCfg<RuRevmState>> { | 
|  | 86 | +        let db = self.state_provider_database(parent_height)?; | 
|  | 87 | + | 
|  | 88 | +        let mut trevm = signet_evm::signet_evm(db, self.constants.clone()); | 
|  | 89 | + | 
|  | 90 | +        trevm.set_spec_id(spec_id); | 
|  | 91 | + | 
|  | 92 | +        Ok(trevm) | 
|  | 93 | +    } | 
|  | 94 | + | 
|  | 95 | +    /// Called when the host chain has committed a block or set of blocks. | 
|  | 96 | +    #[instrument(skip_all, fields(count = chain.len(), first = chain.first().number(), tip = chain.tip().number()))] | 
|  | 97 | +    pub async fn on_host_commit<Host>(&self, chain: &Chain<Host>) -> eyre::Result<Option<RuChain>> | 
|  | 98 | +    where | 
|  | 99 | +        Host: FullNodeComponents, | 
|  | 100 | +        Host::Types: NodeTypes<Primitives = EthPrimitives>, | 
|  | 101 | +    { | 
|  | 102 | +        let highest = chain.tip().number(); | 
|  | 103 | +        if highest < self.constants.host_deploy_height() { | 
|  | 104 | +            return Ok(None); | 
|  | 105 | +        } | 
|  | 106 | + | 
|  | 107 | +        // this should never happen but we want to handle it anyway | 
|  | 108 | +        if chain.is_empty() { | 
|  | 109 | +            return Ok(None); | 
|  | 110 | +        } | 
|  | 111 | + | 
|  | 112 | +        let extractor = Extractor::new(self.constants.clone()); | 
|  | 113 | +        let shim = ExtractableChainShim::new(chain); | 
|  | 114 | +        let outputs = extractor.extract_signet(&shim); | 
|  | 115 | + | 
|  | 116 | +        // TODO: ENG-481 Inherit prune modes from Reth configuration. | 
|  | 117 | +        // https://linear.app/initiates/issue/ENG-481/inherit-prune-modes-from-reth-node | 
|  | 118 | + | 
|  | 119 | +        // The extractor will filter out blocks at or before the deployment | 
|  | 120 | +        // height, so we don't need compute the start from the notification. | 
|  | 121 | +        let mut start = None; | 
|  | 122 | +        let mut current = 0; | 
|  | 123 | +        let mut prev_block_journal = self.ru_provider.provider_rw()?.latest_journal_hash()?; | 
|  | 124 | + | 
|  | 125 | +        let mut net_outcome = ExecutionOutcome::default(); | 
|  | 126 | +        let last_ru_height = self.ru_provider.last_block_number()?; | 
|  | 127 | + | 
|  | 128 | +        // There might be a case where we can get a notification that starts | 
|  | 129 | +        // "lower" than our last processed block, | 
|  | 130 | +        // but contains new information beyond one point. In this case, we | 
|  | 131 | +        // should simply skip the block. | 
|  | 132 | +        for block_extracts in outputs.skip_while(|extract| extract.ru_height <= last_ru_height) { | 
|  | 133 | +            // If we haven't set the start yet, set it to the first block. | 
|  | 134 | +            if start.is_none() { | 
|  | 135 | +                let new_ru_height = block_extracts.ru_height; | 
|  | 136 | + | 
|  | 137 | +                // If the above condition passes, we should always be | 
|  | 138 | +                // committing without skipping a range of blocks. | 
|  | 139 | +                if new_ru_height != last_ru_height + 1 { | 
|  | 140 | +                    error!( | 
|  | 141 | +                        %new_ru_height, | 
|  | 142 | +                        %last_ru_height, | 
|  | 143 | +                        "missing range of DB blocks" | 
|  | 144 | +                    ); | 
|  | 145 | +                    eyre::bail!("missing range of DB blocks"); | 
|  | 146 | +                } | 
|  | 147 | +                start = Some(new_ru_height); | 
|  | 148 | +            } | 
|  | 149 | +            current = block_extracts.ru_height; | 
|  | 150 | +            let spec_id = self.spec_id(block_extracts.host_block.timestamp()); | 
|  | 151 | + | 
|  | 152 | +            let span = info_span!( | 
|  | 153 | +                "signet::handle_zenith_outputs::block_processing", | 
|  | 154 | +                start = start.unwrap(), | 
|  | 155 | +                ru_height = block_extracts.ru_height, | 
|  | 156 | +                host_height = block_extracts.host_block.number(), | 
|  | 157 | +                has_ru_block = block_extracts.submitted.is_some(), | 
|  | 158 | +            ); | 
|  | 159 | + | 
|  | 160 | +            tracing::trace!("Running EVM"); | 
|  | 161 | +            let block_result = self.run_evm(&block_extracts, spec_id).instrument(span).await?; | 
|  | 162 | +            tracing::trace!("Committing EVM results"); | 
|  | 163 | +            let journal = | 
|  | 164 | +                self.commit_evm_results(&block_extracts, &block_result, prev_block_journal)?; | 
|  | 165 | + | 
|  | 166 | +            prev_block_journal = journal.journal_hash(); | 
|  | 167 | +            net_outcome.extend(block_result.execution_outcome.convert()); | 
|  | 168 | +        } | 
|  | 169 | +        info!("committed blocks"); | 
|  | 170 | + | 
|  | 171 | +        // If we didn't process any blocks, we don't need to return anything. | 
|  | 172 | +        // In practice, this should never happen, as we should always have at | 
|  | 173 | +        // least one block to process. | 
|  | 174 | +        if start.is_none() { | 
|  | 175 | +            return Ok(None); | 
|  | 176 | +        } | 
|  | 177 | +        let start = start.expect("checked by early return"); | 
|  | 178 | + | 
|  | 179 | +        // Return the range of blocks we processed | 
|  | 180 | +        let provider = self.ru_provider.provider_rw()?; | 
|  | 181 | + | 
|  | 182 | +        let ru_info = provider.get_extraction_results(start..=current)?; | 
|  | 183 | + | 
|  | 184 | +        let inner = | 
|  | 185 | +            Chain::<Host>::new(provider.recovered_block_range(start..=current)?, net_outcome, None); | 
|  | 186 | + | 
|  | 187 | +        Ok(Some(RuChain { inner, ru_info })) | 
|  | 188 | +    } | 
|  | 189 | + | 
|  | 190 | +    /// ========================== | 
|  | 191 | +    /// ========================== | 
|  | 192 | +    /// ██████  ██    ██ ███    ██ | 
|  | 193 | +    /// ██   ██ ██    ██ ████   ██ | 
|  | 194 | +    /// ██████  ██    ██ ██ ██  ██ | 
|  | 195 | +    /// ██   ██ ██    ██ ██  ██ ██ | 
|  | 196 | +    /// ██   ██  ██████  ██   ████ | 
|  | 197 | +    /// | 
|  | 198 | +    /// | 
|  | 199 | +    /// ███████ ██    ██ ███    ███ | 
|  | 200 | +    /// ██      ██    ██ ████  ████ | 
|  | 201 | +    /// █████   ██    ██ ██ ████ ██ | 
|  | 202 | +    /// ██       ██  ██  ██  ██  ██ | 
|  | 203 | +    /// ███████   ████   ██      ██ | 
|  | 204 | +    /// =========================== | 
|  | 205 | +    /// =========================== | 
|  | 206 | +    async fn run_evm( | 
|  | 207 | +        &self, | 
|  | 208 | +        block_extracts: &Extracts<'_, ExtractableChainShim<'_>>, | 
|  | 209 | +        spec_id: SpecId, | 
|  | 210 | +    ) -> eyre::Result<BlockResult> { | 
|  | 211 | +        let ru_height = block_extracts.ru_height; | 
|  | 212 | +        let host_height = block_extracts.host_block.number(); | 
|  | 213 | +        let timestamp = block_extracts.host_block.timestamp(); | 
|  | 214 | + | 
|  | 215 | +        let parent_header = self | 
|  | 216 | +            .ru_provider | 
|  | 217 | +            .sealed_header(block_extracts.ru_height.saturating_sub(1))? | 
|  | 218 | +            .wrap_err("parent ru block not present in DB") | 
|  | 219 | +            .inspect_err(|e| error!(%e))?; | 
|  | 220 | + | 
|  | 221 | +        let slot = self.slot_calculator.slot_ending_at(timestamp).expect("host chain has started"); | 
|  | 222 | + | 
|  | 223 | +        let txns = match &block_extracts.submitted { | 
|  | 224 | +            Some(submitted) => { | 
|  | 225 | +                self.blob_cacher | 
|  | 226 | +                    .signet_block(block_extracts.host_block.number(), slot, submitted) | 
|  | 227 | +                    .await | 
|  | 228 | +                    .map(|block| block.into_parts().1) | 
|  | 229 | +                    .unwrap_or_default() | 
|  | 230 | +                    .into_iter() | 
|  | 231 | +                    .filter(|tx| !tx.is_eip4844()) // redundant, but let's be sure | 
|  | 232 | +                    .map(|tx| tx.into()) | 
|  | 233 | +                    .collect::<VecDeque<_>>() | 
|  | 234 | +            } | 
|  | 235 | +            None => VecDeque::new(), | 
|  | 236 | +        }; | 
|  | 237 | + | 
|  | 238 | +        let mut driver = SignetDriver::new( | 
|  | 239 | +            block_extracts, | 
|  | 240 | +            txns, | 
|  | 241 | +            parent_header.convert(), | 
|  | 242 | +            self.constants.clone(), | 
|  | 243 | +        ); | 
|  | 244 | + | 
|  | 245 | +        let trevm = self.trevm(driver.parent().number(), spec_id)?.fill_cfg(&driver); | 
|  | 246 | + | 
|  | 247 | +        let trevm = match trevm.drive_block(&mut driver) { | 
|  | 248 | +            Ok(t) => t, | 
|  | 249 | +            Err(e) => return Err(e.into_error().into()), | 
|  | 250 | +        }; | 
|  | 251 | + | 
|  | 252 | +        let (sealed_block, receipts) = driver.finish(); | 
|  | 253 | +        let bundle = trevm.finish(); | 
|  | 254 | + | 
|  | 255 | +        Ok(BlockResult { | 
|  | 256 | +            sealed_block, | 
|  | 257 | +            execution_outcome: signet_evm::ExecutionOutcome::new(bundle, vec![receipts], ru_height), | 
|  | 258 | +            host_height, | 
|  | 259 | +        }) | 
|  | 260 | +    } | 
|  | 261 | + | 
|  | 262 | +    /// Commit the outputs of a zenith block to the database. | 
|  | 263 | +    #[instrument(skip_all)] | 
|  | 264 | +    fn commit_evm_results<'a>( | 
|  | 265 | +        &self, | 
|  | 266 | +        extracts: &Extracts<'_, ExtractableChainShim<'_>>, | 
|  | 267 | +        block_result: &'a BlockResult, | 
|  | 268 | +        prev_block_journal: B256, | 
|  | 269 | +    ) -> eyre::Result<HostJournal<'a>> { | 
|  | 270 | +        let journal = block_result.make_host_journal(prev_block_journal); | 
|  | 271 | +        let time = std::time::Instant::now(); | 
|  | 272 | +        let jh = journal.journal_hash(); | 
|  | 273 | + | 
|  | 274 | +        debug!( | 
|  | 275 | +            target: "signet::journal::serialize", | 
|  | 276 | +            bytes = journal.serialized().len(), | 
|  | 277 | +            hash = %jh, | 
|  | 278 | +            elapsed_micros = %time.elapsed().as_micros(), | 
|  | 279 | +            "journal produced" | 
|  | 280 | +        ); | 
|  | 281 | + | 
|  | 282 | +        self.ru_provider.provider_rw()?.update(|writer| { | 
|  | 283 | +            // add execution results to database | 
|  | 284 | +            writer.append_host_block( | 
|  | 285 | +                extracts.ru_header(), | 
|  | 286 | +                extracts.transacts().cloned(), | 
|  | 287 | +                extracts.enters(), | 
|  | 288 | +                extracts.enter_tokens(), | 
|  | 289 | +                block_result, | 
|  | 290 | +                jh, | 
|  | 291 | +            )?; | 
|  | 292 | +            Ok(()) | 
|  | 293 | +        })?; | 
|  | 294 | +        Ok(journal) | 
|  | 295 | +    } | 
|  | 296 | +} | 
0 commit comments