Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/flashblocks-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub(crate) use metrics::Metrics;

mod pending_blocks;

pub mod pubsub;

pub mod rpc;

pub mod state;
Expand Down
150 changes: 150 additions & 0 deletions crates/flashblocks-rpc/src/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
//! `base_` PubSub RPC implementation for flashblocks subscriptions

use std::sync::Arc;

use jsonrpsee::{
PendingSubscriptionSink, SubscriptionSink,
core::{SubscriptionResult, async_trait},
proc_macros::rpc,
server::SubscriptionMessage,
};
use op_alloy_network::Optimism;
use reth_rpc_eth_api::RpcBlock;
use serde::{Deserialize, Serialize};
use tokio_stream::{Stream, StreamExt, wrappers::BroadcastStream};
use tracing::error;

use crate::rpc::FlashblocksAPI;

/// Subscription kind for Base-specific subscriptions
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BaseSubscriptionKind {
/// New flashblocks subscription.
///
/// Fires a notification each time a new flashblock is processed, providing the current
/// pending block state. Each flashblock represents an incremental update to the pending
/// block, so multiple notifications may be emitted for the same block height as new
/// flashblocks arrive.
NewFlashblocks,
}

/// Base pub-sub RPC interface for flashblocks subscriptions.
#[rpc(server, namespace = "base")]
pub trait BasePubSubApi {
/// Create a Base subscription for the given kind
#[subscription(
name = "subscribe" => "subscription",
unsubscribe = "unsubscribe",
item = RpcBlock<Optimism>
)]
async fn subscribe(&self, kind: BaseSubscriptionKind) -> SubscriptionResult;
}

/// `Base` pubsub RPC implementation.
///
/// This handles `base_subscribe` RPC calls for flashblocks-specific subscriptions.
#[derive(Clone, Debug)]
pub struct BasePubSub<FB> {
/// Flashblocks state for accessing pending blocks stream
flashblocks_state: Arc<FB>,
}

impl<FB> BasePubSub<FB> {
/// Creates a new instance with the given flashblocks state
pub const fn new(flashblocks_state: Arc<FB>) -> Self {
Self { flashblocks_state }
}

/// Returns a stream that yields all new flashblocks as RPC blocks
fn new_flashblocks_stream(flashblocks_state: Arc<FB>) -> impl Stream<Item = RpcBlock<Optimism>>
where
FB: FlashblocksAPI + Send + Sync + 'static,
{
BroadcastStream::new(flashblocks_state.subscribe_to_flashblocks()).filter_map(|result| {
let pending_blocks = match result {
Ok(blocks) => blocks,
Err(err) => {
error!(
message = "Error in flashblocks stream",
error = %err
);
return None;
}
};
Some(pending_blocks.get_latest_block(true))
})
}
}

#[async_trait]
impl<FB> BasePubSubApiServer for BasePubSub<FB>
where
FB: FlashblocksAPI + Send + Sync + 'static,
{
/// Handler for `base_subscribe`
async fn subscribe(
&self,
pending: PendingSubscriptionSink,
kind: BaseSubscriptionKind,
) -> SubscriptionResult {
let sink = pending.accept().await?;

match kind {
BaseSubscriptionKind::NewFlashblocks => {
let stream = Self::new_flashblocks_stream(Arc::clone(&self.flashblocks_state));

tokio::spawn(async move {
pipe_from_stream(sink, stream).await;
});
}
}

Ok(())
}
}

/// Pipes all stream items to the subscription sink.
///
/// This function runs until the stream ends, the client disconnects, or a serialization error occurs.
/// All exit conditions result in graceful termination.
async fn pipe_from_stream<T, St>(sink: SubscriptionSink, mut stream: St)
where
St: Stream<Item = T> + Unpin,
T: Serialize,
{
loop {
tokio::select! {
// dropped by client
_ = sink.closed() => return,

maybe_item = stream.next() => {
// stream ended
let Some(item) = maybe_item else {
return;
};

let msg = match SubscriptionMessage::new(
sink.method_name(),
sink.subscription_id(),
&item
) {
Ok(msg) => msg,
Err(err) => {
error!(
target: "flashblocks_rpc::pubsub",
%err,
"Failed to serialize subscription message"
);
return;
}
};

// if it fails, client disconnected
if sink.send(msg).await.is_err() {
return;
}
}
}
}
}
184 changes: 183 additions & 1 deletion crates/flashblocks-rpc/tests/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@ use alloy_rpc_client::RpcClient;
use alloy_rpc_types::simulate::{SimBlock, SimulatePayload};
use alloy_rpc_types_engine::PayloadId;
use alloy_rpc_types_eth::{TransactionInput, error::EthRpcErrorCode};
use base_reth_flashblocks_rpc::subscription::{Flashblock, Metadata};
use base_reth_flashblocks_rpc::{
pubsub::{BasePubSub, BasePubSubApiServer},
subscription::{Flashblock, Metadata},
};
use base_reth_test_utils::flashblocks_harness::FlashblocksHarness;
use common::{BLOCK_INFO_TXN, BLOCK_INFO_TXN_HASH};
use eyre::Result;
use futures_util::{SinkExt, StreamExt};
use op_alloy_consensus::OpDepositReceipt;
use op_alloy_network::{Optimism, ReceiptResponse, TransactionResponse};
use op_alloy_rpc_types::OpTransactionRequest;
use reth_optimism_primitives::OpReceipt;
use reth_rpc_eth_api::RpcReceipt;
use rollup_boost::{ExecutionPayloadBaseV1, ExecutionPayloadFlashblockDeltaV1};
use serde_json::json;
use tokio_tungstenite::{connect_async, tungstenite::Message};

struct TestSetup {
harness: FlashblocksHarness,
Expand Down Expand Up @@ -731,3 +737,179 @@ async fn test_get_logs_mixed_block_ranges() -> Result<()> {

Ok(())
}

// base_ methods
#[tokio::test]
async fn test_base_subscribe_new_flashblocks() -> eyre::Result<()> {
let setup = TestSetup::new().await?;
let provider = setup.harness.provider();
let ws_url = setup.harness.ws_url();
let (mut ws_stream, _) = connect_async(&ws_url).await?;

ws_stream
.send(Message::Text(
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "base_subscribe",
"params": ["newFlashblocks"]
})
.to_string()
.into(),
))
.await?;

let response = ws_stream.next().await.unwrap()?;
let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?;
assert_eq!(sub["jsonrpc"], "2.0");
assert_eq!(sub["id"], 1);
let subscription_id = sub["result"].as_str().expect("subscription id expected");

setup.send_flashblock(create_first_payload()).await?;

let notification = ws_stream.next().await.unwrap()?;
let notif: serde_json::Value = serde_json::from_str(notification.to_text()?)?;
assert_eq!(notif["method"], "base_subscription");
assert_eq!(notif["params"]["subscription"], subscription_id);

let block = &notif["params"]["result"];
assert_eq!(block["number"], "0x1");
assert!(block["hash"].is_string());
assert!(block["parentHash"].is_string());
assert!(block["transactions"].is_array());
assert_eq!(block["transactions"].as_array().unwrap().len(), 1);

Ok(())
}

#[tokio::test]
async fn test_base_subscribe_multiple_flashblocks() -> eyre::Result<()> {
let setup = TestSetup::new().await?;
let provider = setup.harness.provider();
let ws_url = setup.harness.ws_url();
let (mut ws_stream, _) = connect_async(&ws_url).await?;

ws_stream
.send(Message::Text(
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "base_subscribe",
"params": ["newFlashblocks"]
})
.to_string()
.into(),
))
.await?;

let response = ws_stream.next().await.unwrap()?;
let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?;
let subscription_id = sub["result"].as_str().expect("subscription id expected");

setup.send_flashblock(create_first_payload()).await?;

let notif1 = ws_stream.next().await.unwrap()?;
let notif1: serde_json::Value = serde_json::from_str(notif1.to_text()?)?;
assert_eq!(notif1["params"]["subscription"], subscription_id);

let block1 = &notif1["params"]["result"];
assert_eq!(block1["number"], "0x1");
assert_eq!(block1["transactions"].as_array().unwrap().len(), 1);

setup.send_flashblock(create_second_payload()).await?;

let notif2 = ws_stream.next().await.unwrap()?;
let notif2: serde_json::Value = serde_json::from_str(notif2.to_text()?)?;
assert_eq!(notif2["params"]["subscription"], subscription_id);

let block2 = &notif2["params"]["result"];
assert_eq!(block1["number"], block2["number"]); // Same block, incremental updates
assert_eq!(block2["transactions"].as_array().unwrap().len(), 6);

Ok(())
}

#[tokio::test]
async fn test_base_unsubscribe() -> eyre::Result<()> {
let setup = TestSetup::new().await?;
let provider = setup.harness.provider();
let ws_url = setup.harness.ws_url();
let (mut ws_stream, _) = connect_async(&ws_url).await?;

ws_stream
.send(Message::Text(
json!({
"jsonrpc": "2.0",
"id": 1,
"method": "base_subscribe",
"params": ["newFlashblocks"]
})
.to_string()
.into(),
))
.await?;

let response = ws_stream.next().await.unwrap()?;
let sub: serde_json::Value = serde_json::from_str(response.to_text()?)?;
let subscription_id = sub["result"].as_str().expect("subscription id expected");

ws_stream
.send(Message::Text(
json!({
"jsonrpc": "2.0",
"id": 2,
"method": "base_unsubscribe",
"params": [subscription_id]
})
.to_string()
.into(),
))
.await?;

let unsub = ws_stream.next().await.unwrap()?;
let unsub: serde_json::Value = serde_json::from_str(unsub.to_text()?)?;
assert_eq!(unsub["jsonrpc"], "2.0");
assert_eq!(unsub["id"], 2);
assert_eq!(unsub["result"], true);

Ok(())
}

#[tokio::test]
async fn test_base_subscribe_multiple_clients() -> eyre::Result<()> {
let setup = TestSetup::new().await?;
let provider = setup.harness.provider();
let ws_url = setup.harness.ws_url();
let (mut ws1, _) = connect_async(&ws_url).await?;
let (mut ws2, _) = connect_async(&ws_url).await?;

let req = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "base_subscribe",
"params": ["newFlashblocks"]
});
ws1.send(Message::Text(req.to_string().into())).await?;
ws2.send(Message::Text(req.to_string().into())).await?;

let _sub1 = ws1.next().await.unwrap()?;
let _sub2 = ws2.next().await.unwrap()?;

setup.send_flashblock(create_first_payload()).await?;

let notif1 = ws1.next().await.unwrap()?;
let notif1: serde_json::Value = serde_json::from_str(notif1.to_text()?)?;
let notif2 = ws2.next().await.unwrap()?;
let notif2: serde_json::Value = serde_json::from_str(notif2.to_text()?)?;

assert_eq!(notif1["method"], "base_subscription");
assert_eq!(notif2["method"], "base_subscription");

let block1 = &notif1["params"]["result"];
let block2 = &notif2["params"]["result"];
assert_eq!(block1["number"], "0x1");
assert_eq!(block1["number"], block2["number"]);
assert_eq!(block1["hash"], block2["hash"]);

Ok(())
}
7 changes: 6 additions & 1 deletion crates/runner/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::Arc;

use base_reth_flashblocks_rpc::{
pubsub::{BasePubSub, BasePubSubApiServer},
rpc::{EthApiExt, EthApiOverrideServer},
state::FlashblocksState,
subscription::FlashblocksSubscriber,
Expand Down Expand Up @@ -129,9 +130,13 @@ impl BaseNodeLauncher {
let api_ext = EthApiExt::new(
ctx.registry.eth_api().clone(),
ctx.registry.eth_handlers().filter.clone(),
fb,
fb.clone(),
);
ctx.modules.replace_configured(api_ext.into_rpc())?;

// Register the base_subscribe subscription endpoint
let base_pubsub = BasePubSub::new(fb);
ctx.modules.merge_configured(base_pubsub.into_rpc())?;
} else {
info!(message = "flashblocks integration is disabled");
}
Expand Down
4 changes: 4 additions & 0 deletions crates/test-utils/src/harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl TestHarness {
format!("http://{}", self.node.http_api_addr)
}

pub fn ws_url(&self) -> String {
format!("ws://{}", self.node.ws_api_addr)
}

pub async fn build_block_from_transactions(&self, mut transactions: Vec<Bytes>) -> Result<()> {
// Ensure the block always starts with the required L1 block info deposit.
if transactions.first().is_none_or(|tx| tx != &L1_BLOCK_INFO_DEPOSIT_TX) {
Expand Down
Loading