diff --git a/.env.example b/.env.example index 2cc30d6a..9bff9ec0 100644 --- a/.env.example +++ b/.env.example @@ -34,3 +34,7 @@ TIPS_UI_S3_CONFIG_TYPE=manual TIPS_UI_S3_ENDPOINT=http://localhost:7000 TIPS_UI_S3_ACCESS_KEY_ID=minioadmin TIPS_UI_S3_SECRET_ACCESS_KEY=minioadmin + +# User Operations (EIP-4337) +TIPS_INGRESS_KAFKA_USER_OPS_PROPERTIES_FILE=/app/docker/user-operations-kafka-properties +TIPS_INGRESS_KAFKA_USER_OPS_TOPIC=tips-user-operations diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index a319dd08..d0bb456b 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -1,6 +1,7 @@ pub mod kafka; pub mod logger; pub mod types; +pub mod user_operation; #[cfg(any(test, feature = "test-utils"))] pub mod test_utils; @@ -9,3 +10,4 @@ pub use types::{ AcceptedBundle, Bundle, BundleExtensions, BundleHash, BundleTxs, CancelBundle, MeterBundleResponse, }; +pub use user_operation::{UserOperation, UserOperationWithMetadata}; diff --git a/crates/core/src/user_operation.rs b/crates/core/src/user_operation.rs new file mode 100644 index 00000000..31cbf7a3 --- /dev/null +++ b/crates/core/src/user_operation.rs @@ -0,0 +1,144 @@ +//! EIP-4337 Account Abstraction User Operation types +use alloy_primitives::{Address, Bytes, U256, B256, keccak256}; +use serde::{Deserialize, Serialize}; + +/// User Operation as defined by EIP-4337 v0.6 +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UserOperationV06 { + pub sender: Address, + pub nonce: U256, + pub init_code: Bytes, + pub call_data: Bytes, + pub call_gas_limit: U256, + pub verification_gas_limit: U256, + pub pre_verification_gas: U256, + pub max_fee_per_gas: U256, + pub max_priority_fee_per_gas: U256, + pub paymaster_and_data: Bytes, + pub signature: Bytes, +} + +/// User Operation as defined by EIP-4337 v0.7+ +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UserOperationV07 { + pub sender: Address, + pub nonce: U256, + pub factory: Address, + pub factory_data: Bytes, + pub call_data: Bytes, + pub call_gas_limit: U256, + pub verification_gas_limit: U256, + pub pre_verification_gas: U256, + pub max_fee_per_gas: U256, + pub max_priority_fee_per_gas: U256, + pub paymaster: Address, + pub paymaster_verification_gas_limit: U256, + pub paymaster_post_op_gas_limit: U256, + pub paymaster_data: Bytes, + pub signature: Bytes, +} + +/// User Operation that can be either v0.6 or v0.7+ +/// Automatically deserializes based on fields present +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum UserOperation { + V06(UserOperationV06), + V07(UserOperationV07), +} + +impl UserOperation { + /// Get the sender address + pub fn sender(&self) -> Address { + match self { + UserOperation::V06(op) => op.sender, + UserOperation::V07(op) => op.sender, + } + } + + /// Get the nonce + pub fn nonce(&self) -> U256 { + match self { + UserOperation::V06(op) => op.nonce, + UserOperation::V07(op) => op.nonce, + } + } + + /// Calculate the user operation hash (for use as Kafka key and tracking) + /// This is a simplified hash - for production, use the full EIP-4337 hash algorithm + pub fn user_op_hash(&self, entry_point: &Address, chain_id: u64) -> B256 { + let mut data = Vec::new(); + + // Include chain ID and entry point + data.extend_from_slice(&chain_id.to_be_bytes()); + data.extend_from_slice(entry_point.as_slice()); + + match self { + UserOperation::V06(op) => { + data.extend_from_slice(op.sender.as_slice()); + 
data.extend_from_slice(&op.nonce.to_be_bytes::<32>()); + data.extend_from_slice(&keccak256(&op.init_code).0); + data.extend_from_slice(&keccak256(&op.call_data).0); + data.extend_from_slice(&op.call_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&op.verification_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&op.pre_verification_gas.to_be_bytes::<32>()); + data.extend_from_slice(&op.max_fee_per_gas.to_be_bytes::<32>()); + data.extend_from_slice(&op.max_priority_fee_per_gas.to_be_bytes::<32>()); + data.extend_from_slice(&keccak256(&op.paymaster_and_data).0); + } + UserOperation::V07(op) => { + data.extend_from_slice(op.sender.as_slice()); + data.extend_from_slice(&op.nonce.to_be_bytes::<32>()); + data.extend_from_slice(op.factory.as_slice()); + data.extend_from_slice(&keccak256(&op.factory_data).0); + data.extend_from_slice(&keccak256(&op.call_data).0); + data.extend_from_slice(&op.call_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&op.verification_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&op.pre_verification_gas.to_be_bytes::<32>()); + data.extend_from_slice(&op.max_fee_per_gas.to_be_bytes::<32>()); + data.extend_from_slice(&op.max_priority_fee_per_gas.to_be_bytes::<32>()); + data.extend_from_slice(op.paymaster.as_slice()); + data.extend_from_slice(&op.paymaster_verification_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&op.paymaster_post_op_gas_limit.to_be_bytes::<32>()); + data.extend_from_slice(&keccak256(&op.paymaster_data).0); + } + } + + keccak256(&data) + } +} + +/// Wrapper for UserOperation with metadata for Kafka queue +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UserOperationWithMetadata { + pub user_operation: UserOperation, + pub entry_point: Address, + pub user_op_hash: B256, + pub received_at: u64, // Unix timestamp + pub chain_id: u64, +} + +impl UserOperationWithMetadata { + pub fn new( + user_operation: UserOperation, + entry_point: Address, + chain_id: u64, + ) -> Self { + let user_op_hash = user_operation.user_op_hash(&entry_point, chain_id); + let received_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); + + Self { + user_operation, + entry_point, + user_op_hash, + received_at, + chain_id, + } + } +} \ No newline at end of file diff --git a/crates/ingress-rpc/src/bin/main.rs b/crates/ingress-rpc/src/bin/main.rs index 83aa8a05..ed226e1d 100644 --- a/crates/ingress-rpc/src/bin/main.rs +++ b/crates/ingress-rpc/src/bin/main.rs @@ -11,7 +11,7 @@ use tips_core::logger::init_logger; use tips_ingress_rpc::Config; use tips_ingress_rpc::connect_ingress_to_builder; use tips_ingress_rpc::metrics::init_prometheus_exporter; -use tips_ingress_rpc::queue::KafkaQueuePublisher; +use tips_ingress_rpc::queue::{BundleQueuePublisher, UserOperationQueuePublisher}; use tips_ingress_rpc::service::{IngressApiServer, IngressService}; use tokio::sync::{broadcast, mpsc}; use tracing::info; @@ -47,19 +47,31 @@ async fn main() -> anyhow::Result<()> { .network::() .connect_http(config.simulation_rpc); + // Bundle queue setup let ingress_client_config = ClientConfig::from_iter(load_kafka_config_from_file( &config.ingress_kafka_properties, )?); - - let queue_producer: FutureProducer = ingress_client_config.create()?; - - let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); - + let bundle_producer: FutureProducer = ingress_client_config.create()?; + let bundle_queue = 
BundleQueuePublisher::new(bundle_producer, config.ingress_topic.clone()); + + // User operation queue setup + let user_op_queue = if let Some(user_ops_props) = &config.user_ops_kafka_properties { + // Use dedicated user ops configuration if provided + let user_ops_config = ClientConfig::from_iter( + load_kafka_config_from_file(user_ops_props)? + ); + let user_ops_producer: FutureProducer = user_ops_config.create()?; + UserOperationQueuePublisher::new(user_ops_producer, config.user_ops_topic.clone()) + } else { + // Fall back to using the same config as bundles + let user_ops_producer: FutureProducer = ingress_client_config.create()?; + UserOperationQueuePublisher::new(user_ops_producer, config.user_ops_topic.clone()) + }; + + // Audit setup let audit_client_config = ClientConfig::from_iter(load_kafka_config_from_file(&config.audit_kafka_properties)?); - let audit_producer: FutureProducer = audit_client_config.create()?; - let audit_publisher = KafkaBundleEventPublisher::new(audit_producer, config.audit_topic); let (audit_tx, audit_rx) = mpsc::unbounded_channel::(); connect_audit_to_publisher(audit_rx, audit_publisher); @@ -74,7 +86,8 @@ async fn main() -> anyhow::Result<()> { let service = IngressService::new( provider, simulation_provider, - queue, + bundle_queue, + user_op_queue, audit_tx, builder_tx, cfg, diff --git a/crates/ingress-rpc/src/lib.rs b/crates/ingress-rpc/src/lib.rs index 970ab769..208cd903 100644 --- a/crates/ingress-rpc/src/lib.rs +++ b/crates/ingress-rpc/src/lib.rs @@ -83,6 +83,18 @@ pub struct Config { )] pub audit_topic: String, + /// Kafka properties file for user operations (optional, defaults to ingress properties) + #[arg(long, env = "TIPS_INGRESS_KAFKA_USER_OPS_PROPERTIES_FILE")] + pub user_ops_kafka_properties: Option, + + /// Kafka topic for user operations + #[arg( + long, + env = "TIPS_INGRESS_KAFKA_USER_OPS_TOPIC", + default_value = "tips-user-operations" + )] + pub user_ops_topic: String, + #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")] pub log_level: String, diff --git a/crates/ingress-rpc/src/queue.rs b/crates/ingress-rpc/src/queue.rs index a13ad4fe..61ca9f6c 100644 --- a/crates/ingress-rpc/src/queue.rs +++ b/crates/ingress-rpc/src/queue.rs @@ -1,58 +1,55 @@ use alloy_primitives::B256; use anyhow::Result; -use async_trait::async_trait; use backon::{ExponentialBuilder, Retryable}; use rdkafka::producer::{FutureProducer, FutureRecord}; -use tips_core::AcceptedBundle; +use tips_core::{AcceptedBundle, UserOperationWithMetadata}; use tokio::time::Duration; use tracing::{error, info}; -/// A queue to buffer transactions -#[async_trait] -pub trait QueuePublisher: Send + Sync { - async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()>; -} - -/// A queue to buffer transactions -pub struct KafkaQueuePublisher { +/// Internal Kafka implementation - handles the low-level Kafka publishing with retry logic +struct KafkaPublisher { producer: FutureProducer, topic: String, } -impl KafkaQueuePublisher { - pub fn new(producer: FutureProducer, topic: String) -> Self { +impl KafkaPublisher { + fn new(producer: FutureProducer, topic: String) -> Self { Self { producer, topic } } -} - -#[async_trait] -impl QueuePublisher for KafkaQueuePublisher { - async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> { - let key = bundle_hash.to_string(); - let payload = serde_json::to_vec(&bundle)?; + /// Publish any message with a key to Kafka with automatic retry + async fn publish( + &self, + key: &str, + 
payload_bytes: Vec, + entity_type: &str, + ) -> Result<()> { let enqueue = || async { - let record = FutureRecord::to(&self.topic).key(&key).payload(&payload); + let record = FutureRecord::to(&self.topic) + .key(key) + .payload(&payload_bytes); match self.producer.send(record, Duration::from_secs(5)).await { Ok((partition, offset)) => { info!( - bundle_hash = %bundle_hash, + key = %key, partition = partition, offset = offset, topic = %self.topic, - "Successfully enqueued bundle" + entity_type = entity_type, + "Successfully published to Kafka" ); Ok(()) } Err((err, _)) => { error!( - bundle_hash = %bundle_hash, + key = %key, error = %err, topic = %self.topic, - "Failed to enqueue bundle" + entity_type = entity_type, + "Failed to publish to Kafka" ); - Err(anyhow::anyhow!("Failed to enqueue bundle: {err}")) + Err(anyhow::anyhow!("Failed to publish: {err}")) } } }; @@ -65,12 +62,53 @@ impl QueuePublisher for KafkaQueuePublisher { .with_max_times(3), ) .notify(|err: &anyhow::Error, dur: Duration| { - info!("retrying to enqueue bundle {:?} after {:?}", err, dur); + info!("Retrying Kafka publish {:?} after {:?}", err, dur); }) .await } } +/// Publisher for bundle queues - handles bundle-specific publishing logic +pub struct BundleQueuePublisher { + kafka: KafkaPublisher, +} + +impl BundleQueuePublisher { + pub fn new(producer: FutureProducer, topic: String) -> Self { + Self { + kafka: KafkaPublisher::new(producer, topic), + } + } + + pub async fn publish(&self, bundle: &AcceptedBundle, bundle_hash: &B256) -> Result<()> { + let payload_bytes = serde_json::to_vec(bundle)?; + self.kafka + .publish(&bundle_hash.to_string(), payload_bytes, "bundle") + .await + } +} + +/// Publisher for user operations - handles user operation-specific publishing logic +pub struct UserOperationQueuePublisher { + kafka: KafkaPublisher, +} + +impl UserOperationQueuePublisher { + pub fn new(producer: FutureProducer, topic: String) -> Self { + Self { + kafka: KafkaPublisher::new(producer, topic), + } + } + + pub async fn publish(&self, user_op: &UserOperationWithMetadata) -> Result<()> { + let key = user_op.user_op_hash.to_string(); + let payload_bytes = serde_json::to_vec(user_op)?; + self.kafka + .publish(&key, payload_bytes, "user_operation") + .await + } +} + #[cfg(test)] mod tests { use super::*; @@ -93,7 +131,7 @@ mod tests { .create() .expect("Producer creation failed"); - let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); + let publisher = BundleQueuePublisher::new(producer, "tips-ingress-rpc".to_string()); let bundle = create_test_bundle(); let accepted_bundle = AcceptedBundle::new( bundle.try_into().unwrap(), diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index c0862a50..99530a8f 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,6 +1,6 @@ use alloy_consensus::transaction::Recovered; use alloy_consensus::{Transaction, transaction::SignerRecoverable}; -use alloy_primitives::{B256, Bytes}; +use alloy_primitives::{B256, Bytes, Address}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use jsonrpsee::{ core::{RpcResult, async_trait}, @@ -14,13 +14,14 @@ use tips_audit::BundleEvent; use tips_core::types::ParsedBundle; use tips_core::{ AcceptedBundle, Bundle, BundleExtensions, BundleHash, CancelBundle, MeterBundleResponse, + UserOperation, UserOperationWithMetadata, }; use tokio::sync::{broadcast, mpsc}; use tokio::time::{Duration, Instant, timeout}; use tracing::{info, warn}; 
use crate::metrics::{Metrics, record_histogram}; -use crate::queue::QueuePublisher; +use crate::queue::{BundleQueuePublisher, UserOperationQueuePublisher}; use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_bundle, validate_tx}; use crate::{Config, TxSubmissionMethod}; @@ -37,26 +38,36 @@ pub trait IngressApi { /// Handler for: `eth_sendRawTransaction` #[method(name = "sendRawTransaction")] async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult; + + #[method(name = "sendUserOperation")] + async fn send_user_operation( + &self, + user_operation: UserOperation, + entry_point: Address, + ) -> RpcResult; } -pub struct IngressService { +pub struct IngressService { provider: RootProvider, simulation_provider: RootProvider, tx_submission_method: TxSubmissionMethod, - bundle_queue: Queue, + bundle_queue: BundleQueuePublisher, + user_op_queue: UserOperationQueuePublisher, audit_channel: mpsc::UnboundedSender, send_transaction_default_lifetime_seconds: u64, metrics: Metrics, block_time_milliseconds: u64, meter_bundle_timeout_ms: u64, builder_tx: broadcast::Sender, + chain_id: u64, } -impl IngressService { +impl IngressService { pub fn new( provider: RootProvider, simulation_provider: RootProvider, - queue: Queue, + bundle_queue: BundleQueuePublisher, + user_op_queue: UserOperationQueuePublisher, audit_channel: mpsc::UnboundedSender, builder_tx: broadcast::Sender, config: Config, @@ -65,7 +76,8 @@ impl IngressService { provider, simulation_provider, tx_submission_method: config.tx_submission_method, - bundle_queue: queue, + bundle_queue, + user_op_queue, audit_channel, send_transaction_default_lifetime_seconds: config .send_transaction_default_lifetime_seconds, @@ -73,15 +85,13 @@ impl IngressService { block_time_milliseconds: config.block_time_milliseconds, meter_bundle_timeout_ms: config.meter_bundle_timeout_ms, builder_tx, + chain_id: config.chain_id, } } } #[async_trait] -impl IngressApiServer for IngressService -where - Queue: QueuePublisher + Sync + Send + 'static, -{ +impl IngressApiServer for IngressService { async fn send_bundle(&self, bundle: Bundle) -> RpcResult { // validate the bundle and consume the `bundle` to get an `AcceptedBundle` self.validate_bundle(&bundle).await?; @@ -138,6 +148,39 @@ where todo!("implement cancel_bundle") } + + async fn send_user_operation(&self, user_operation: UserOperation, entry_point: Address) -> RpcResult { + let user_op_with_meta = UserOperationWithMetadata::new( + user_operation.clone(), + entry_point, + self.chain_id, + ); + + info!( + sender = %user_operation.sender(), + entry_point = %entry_point, + nonce = %user_operation.nonce(), + user_op_hash = %user_op_with_meta.user_op_hash, + "Received sendUserOperation request" + ); + + // Publish to Kafka + if let Err(e) = self.user_op_queue.publish(&user_op_with_meta).await { + warn!( + message = "Failed to publish user operation to queue", + user_op_hash = %user_op_with_meta.user_op_hash, + error = %e + ); + return Err(EthApiError::InvalidParams("Failed to queue user operation".into()).into_rpc_err()); + } + + info!( + message = "Queued user operation", + user_op_hash = %user_op_with_meta.user_op_hash + ); + + Ok(user_op_with_meta.user_op_hash) + } async fn send_raw_transaction(&self, data: Bytes) -> RpcResult { let start = Instant::now(); let transaction = self.validate_tx(&data).await?; @@ -220,10 +263,7 @@ where } } -impl IngressService -where - Queue: QueuePublisher + Sync + Send + 'static, -{ +impl IngressService { async fn validate_tx(&self, data: &Bytes) -> RpcResult> { let 
start = Instant::now(); if data.is_empty() {
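
Reviewer note, not part of the patch: a minimal usage sketch of the new EIP-4337 types introduced in crates/core/src/user_operation.rs and re-exported from tips_core in this diff. It assumes serde_json and anyhow (both already used elsewhere in the patch) and the alloy_primitives address! helper; the entry-point address and chain id below are illustrative only.

// Sketch only: shows how the #[serde(untagged)] UserOperation enum picks the
// v0.6 variant when the v0.6 camelCase fields are present, and how
// UserOperationWithMetadata::new derives the user_op_hash that the ingress
// service uses as the Kafka message key.
use alloy_primitives::{Address, address};
use tips_core::{UserOperation, UserOperationWithMetadata};

fn main() -> anyhow::Result<()> {
    // v0.6-shaped payload: initCode / paymasterAndData present, so serde
    // resolves the untagged enum to UserOperation::V06.
    let raw = r#"{
        "sender": "0x0000000000000000000000000000000000000001",
        "nonce": "0x0",
        "initCode": "0x",
        "callData": "0x",
        "callGasLimit": "0x5208",
        "verificationGasLimit": "0x5208",
        "preVerificationGas": "0x5208",
        "maxFeePerGas": "0x3b9aca00",
        "maxPriorityFeePerGas": "0x3b9aca00",
        "paymasterAndData": "0x",
        "signature": "0x"
    }"#;
    let user_op: UserOperation = serde_json::from_str(raw)?;

    // Wrapping attaches the entry point, chain id, receive timestamp and the
    // simplified hash; the hash is what ends up as the Kafka key.
    // Illustrative values: canonical v0.6 EntryPoint address, chain id 10.
    let entry_point: Address = address!("5FF137D4b0FDCD49DcA30c7CF57E578a026d2789");
    let with_meta = UserOperationWithMetadata::new(user_op, entry_point, 10);
    println!("kafka key = {}", with_meta.user_op_hash);
    Ok(())
}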