diff --git a/balius-runtime/src/lib.rs b/balius-runtime/src/lib.rs index bcbd089..89bce96 100644 --- a/balius-runtime/src/lib.rs +++ b/balius-runtime/src/lib.rs @@ -6,6 +6,7 @@ use logging::LoggerHost; use router::Router; use sign::SignerHost; use std::{collections::HashMap, io::Read, path::Path, sync::Arc, time::Instant}; +use submit::SubmitHost; use thiserror::Error; use tokio::sync::{Mutex, RwLock}; use tracing::{debug, info, warn}; @@ -64,6 +65,9 @@ pub enum Error { #[error("ledger error: {0}")] Ledger(String), + #[error("submit error: {0}")] + Submit(String), + #[error("config error: {0}")] Config(String), @@ -312,7 +316,7 @@ struct WorkerState { pub logging: Option, pub kv: Option, pub sign: Option, - pub submit: Option, + pub submit: Option, pub http: Option, } @@ -560,7 +564,10 @@ impl Runtime { .sign .as_ref() .map(|s| SignerHost::new(id, s, &self.metrics)), - submit: self.submit.clone(), + submit: self + .submit + .as_ref() + .map(|s| SubmitHost::new(id, s, &self.metrics)), http: self.http.clone(), }, ); diff --git a/balius-runtime/src/metrics.rs b/balius-runtime/src/metrics.rs index 60b3549..33e2ca1 100644 --- a/balius-runtime/src/metrics.rs +++ b/balius-runtime/src/metrics.rs @@ -17,6 +17,7 @@ pub struct Metrics { tx_handled: Counter, undo_utxo_handled: Counter, undo_tx_handled: Counter, + submit_tx: Counter, signer_sign_payload: Counter, ledger_read_utxos: Counter, ledger_search_utxos: Counter, @@ -78,6 +79,11 @@ impl Metrics { .with_description("Amount of undo Tx event handled per worker.") .build(); + let submit_tx = meter + .u64_counter("submit_tx") + .with_description("Amount of submit_tx calls per worker.") + .build(); + let signer_sign_payload = meter .u64_counter("signer_sign_payload") .with_description("Amount of sign payload handled per worker.") @@ -150,6 +156,7 @@ impl Metrics { tx_handled, undo_utxo_handled, undo_tx_handled, + submit_tx, signer_sign_payload, ledger_read_utxos, ledger_search_utxos, @@ -219,6 +226,10 @@ impl Metrics { .add(1, &[KeyValue::new("worker", worker_id.to_owned())]); } + pub fn submit_tx(&self, worker_id: &str) { + self.submit_tx + .add(1, &[KeyValue::new("worker", worker_id.to_owned())]); + } pub fn signer_sign_payload(&self, worker_id: &str) { self.signer_sign_payload .add(1, &[KeyValue::new("worker", worker_id.to_owned())]); diff --git a/balius-runtime/src/submit/mod.rs b/balius-runtime/src/submit/mod.rs index 05999f5..e899e78 100644 --- a/balius-runtime/src/submit/mod.rs +++ b/balius-runtime/src/submit/mod.rs @@ -1,14 +1,52 @@ -use crate::wit::balius::app::submit as wit; +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{metrics::Metrics, wit::balius::app::submit as wit}; + +pub mod u5c; + +#[async_trait::async_trait] +pub trait Submitter: Send + Sync { + async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError>; +} #[derive(Clone)] +#[allow(clippy::large_enum_variant)] pub enum Submit { Mock, + U5C(u5c::Submit), + Custom(Arc>), } -impl wit::Host for Submit { - async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> { - println!("{}", hex::encode(tx)); +pub struct SubmitHost { + worker_id: String, + submit: Submit, + metrics: Arc, +} +impl SubmitHost { + pub fn new(worker_id: &str, submit: &Submit, metrics: &Arc) -> Self { + Self { + worker_id: worker_id.to_string(), + submit: submit.clone(), + metrics: metrics.clone(), + } + } +} - Ok(()) +impl wit::Host for SubmitHost { + async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> { + self.metrics.submit_tx(&self.worker_id); + match &mut self.submit { + Submit::Mock => { + println!("{}", hex::encode(tx)); + Ok(()) + } + Submit::U5C(x) => x.submit_tx(tx).await, + Submit::Custom(x) => { + let mut lock = x.lock().await; + lock.submit_tx(tx).await + } + } } } diff --git a/balius-runtime/src/submit/u5c.rs b/balius-runtime/src/submit/u5c.rs new file mode 100644 index 0000000..683eef6 --- /dev/null +++ b/balius-runtime/src/submit/u5c.rs @@ -0,0 +1,50 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +use crate::wit::balius::app::submit as wit; + +#[derive(Clone, Serialize, Deserialize, Debug)] +pub struct Config { + pub endpoint_url: String, + pub headers: Option>, +} + +#[derive(Clone)] +pub struct Submit { + client: utxorpc::CardanoSubmitClient, +} + +impl Submit { + pub async fn new(config: &Config) -> Result { + let mut builder = utxorpc::ClientBuilder::new().uri(&config.endpoint_url)?; + if let Some(headers) = &config.headers { + for (k, v) in headers.iter() { + builder = builder.metadata(k, v)?; + } + } + + Ok(Self { + client: builder.build().await, + }) + } + + pub async fn submit_tx(&mut self, tx: wit::Cbor) -> Result<(), wit::SubmitError> { + self.client + .submit_tx(vec![tx]) + .await + .map_err(|err| match err { + utxorpc::Error::GrpcError(status) => { + let code: i32 = status.code().into(); + if code == 3 { + wit::SubmitError::Invalid(status.to_string()) + } else { + wit::SubmitError::Internal(status.to_string()) + } + } + utxorpc::Error::TransportError(err) => wit::SubmitError::Internal(err.to_string()), + utxorpc::Error::ParseError(err) => wit::SubmitError::Internal(err.to_string()), + })?; + Ok(()) + } +} diff --git a/balius-sdk/src/qol.rs b/balius-sdk/src/qol.rs index 7676c55..be23e54 100644 --- a/balius-sdk/src/qol.rs +++ b/balius-sdk/src/qol.rs @@ -27,6 +27,8 @@ pub enum Error { Ledger(wit::balius::app::ledger::LedgerError), #[error("sign error: {0}")] Sign(wit::balius::app::sign::SignError), + #[error("submit error: {0}")] + Submit(wit::balius::app::submit::SubmitError), #[error("http error: {0}")] Http(wit::balius::app::http::ErrorCode), } @@ -74,6 +76,10 @@ impl From for wit::HandleError { code: 9, message: format!("event mismatch, expected {x}"), }, + Error::Submit(err) => wit::HandleError { + code: 10, + message: err.to_string(), + }, } } } @@ -102,6 +108,12 @@ impl From for Error { } } +impl From for Error { + fn from(error: wit::balius::app::submit::SubmitError) -> Self { + Error::Submit(error) + } +} + impl From for Error { fn from(error: wit::balius::app::http::ErrorCode) -> Self { Error::Http(error) diff --git a/baliusd/src/config.rs b/baliusd/src/config.rs index 8a5bfda..b0bb857 100644 --- a/baliusd/src/config.rs +++ b/baliusd/src/config.rs @@ -12,7 +12,7 @@ use balius_runtime::{ sign::in_memory::{Ed25519Key, SignerKey}, }; use pallas::crypto::key::ed25519; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DisplayFromStr}; use tokio::sync::{Mutex, RwLock}; @@ -145,6 +145,13 @@ pub enum HttpConfig { Reqwest(ReqwestHttpConfig), } +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(tag = "type")] +#[serde(rename_all = "lowercase")] +pub enum SubmitConfig { + U5c(balius_runtime::submit::u5c::Config), +} + #[derive(Deserialize, Clone, Debug)] pub struct Config { pub rpc: drivers::jsonrpc::Config, @@ -158,6 +165,7 @@ pub struct Config { pub signing: Option, pub store: Option, pub http: Option, + pub submit: Option, } impl From<&Config> for balius_runtime::kv::Kv { @@ -232,3 +240,16 @@ impl From<&Config> for balius_runtime::http::Http { balius_runtime::http::Http::Reqwest(client) } } + +impl Config { + pub async fn into_submit(self) -> balius_runtime::submit::Submit { + match &self.submit { + Some(SubmitConfig::U5c(cfg)) => balius_runtime::submit::Submit::U5C( + balius_runtime::submit::u5c::Submit::new(cfg) + .await + .expect("Failed to convert config into submit interface"), + ), + None => balius_runtime::submit::Submit::Mock, + } + } +} diff --git a/baliusd/src/main.rs b/baliusd/src/main.rs index 3dac126..4be14b8 100644 --- a/baliusd/src/main.rs +++ b/baliusd/src/main.rs @@ -97,6 +97,7 @@ async fn daemon(debug: bool) -> miette::Result<()> { .with_kv(kv) .with_logger((&config).into()) .with_signer((&config).into()) + .with_submit(config.clone().into_submit().await) .with_http((&config).into()) .build() .into_diagnostic() diff --git a/examples/asteria-tracker/src/lib.rs b/examples/asteria-tracker/src/lib.rs index 2067f82..ea25d9e 100644 --- a/examples/asteria-tracker/src/lib.rs +++ b/examples/asteria-tracker/src/lib.rs @@ -144,7 +144,7 @@ fn handle_utxo(config: sdk::Config, utxo: sdk::Utxo) -> sdk::Work worker::logging::log( worker::logging::Level::Debug, &format!("UTxO {}:", &out_ref), - &format!("{:#?} - {pos_x_str} - {pos_y_str} - {fuel_str}", operation), + &format!("{operation:#?} - {pos_x_str} - {pos_y_str} - {fuel_str}"), ); let _ = HttpRequest::post(url).json(&payload)?.send()?; diff --git a/examples/comprehensive/src/lib.rs b/examples/comprehensive/src/lib.rs index c24f41e..bdcdabe 100644 --- a/examples/comprehensive/src/lib.rs +++ b/examples/comprehensive/src/lib.rs @@ -160,6 +160,19 @@ fn signer_sign_payload( #[serde_as] #[derive(Serialize, Deserialize)] +struct SubmitTxParams { + cbor: String, +} + +fn submit_tx(_: Config, request: Params) -> WorkerResult<()> { + let cbor = hex::decode(&request.cbor).map_err(|_| Error::BadParams)?; + balius_sdk::wit::balius::app::submit::submit_tx(&cbor)?; + + Ok(()) +} + +#[derive(Debug, Deserialize)] + struct LedgerSearchUtxosParams { address: String, max_items: u32, @@ -236,6 +249,7 @@ fn main() -> balius_sdk::Worker { FnHandler::from(signer_get_public_key), ) .with_request_handler("signer-sign-payload", FnHandler::from(signer_sign_payload)) + .with_request_handler("submit-tx", FnHandler::from(submit_tx)) .with_request_handler("ledger-search-utxos", FnHandler::from(ledger_search_utxos)) .with_signer("alice", "ed25519") .with_signer("bob", "ed25519") diff --git a/wit/balius.wit b/wit/balius.wit index 8939fd5..c559dd4 100644 --- a/wit/balius.wit +++ b/wit/balius.wit @@ -105,7 +105,10 @@ interface sign { interface submit { type cbor = list; - type submit-error = u32; + variant submit-error { + internal(string), + invalid(string) + } submit-tx: func(tx: cbor) -> result<_, submit-error>; }