diff --git a/Cargo.lock b/Cargo.lock index 8113613e..e81cb599 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,7 @@ dependencies = [ "caryatid_module_rest_server", "caryatid_sdk", "chrono", + "crc", "cryptoxide 0.5.1", "dashmap", "fraction", @@ -59,6 +60,26 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_address_state" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "async-trait", + "caryatid_sdk", + "config", + "fjall", + "hex", + "imbl", + "minicbor 0.26.5", + "serde_cbor", + "tempfile", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "acropolis_module_assets_state" version = "0.1.0" @@ -366,6 +387,7 @@ version = "0.1.0" dependencies = [ "acropolis_common", "acropolis_module_accounts_state", + "acropolis_module_address_state", "acropolis_module_assets_state", "acropolis_module_block_unpacker", "acropolis_module_drdd_state", diff --git a/common/Cargo.toml b/common/Cargo.toml index c970d5da..3cb24e0f 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -20,6 +20,7 @@ bigdecimal = "0.4.8" bitmask-enum = "2.2" bs58 = "0.5" chrono = { workspace = true } +crc = "3" gcd = "2.3" fraction = "0.15" hex = { workspace = true } diff --git a/common/src/address.rs b/common/src/address.rs index 9d5a17c7..42f8eeda 100644 --- a/common/src/address.rs +++ b/common/src/address.rs @@ -4,15 +4,81 @@ use crate::cip19::{VarIntDecoder, VarIntEncoder}; use crate::types::{KeyHash, ScriptHash}; use anyhow::{anyhow, bail, Result}; +use crc::{Crc, CRC_32_ISO_HDLC}; +use minicbor::data::IanaTag; use serde_with::{hex::Hex, serde_as}; /// a Byron-era address -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub struct ByronAddress { /// Raw payload pub payload: Vec, } +impl ByronAddress { + fn compute_crc32(&self) -> u32 { + const CRC32: Crc = Crc::::new(&CRC_32_ISO_HDLC); + CRC32.checksum(&self.payload) + } + + pub fn to_string(&self) -> Result { + let crc = self.compute_crc32(); + + let mut buf = Vec::new(); + { + let mut enc = minicbor::Encoder::new(&mut buf); + enc.array(2)?; + enc.tag(IanaTag::Cbor)?; + enc.bytes(&self.payload)?; + enc.u32(crc)?; + } + + Ok(bs58::encode(buf).into_string()) + } + + pub fn from_string(s: &str) -> Result { + let bytes = bs58::decode(s).into_vec()?; + let mut dec = minicbor::Decoder::new(&bytes); + + let len = dec.array()?.unwrap_or(0); + if len != 2 { + anyhow::bail!("Invalid Byron address CBOR array length"); + } + + let tag = dec.tag()?; + if tag != IanaTag::Cbor.into() { + anyhow::bail!("Invalid Byron address CBOR tag, expected 24"); + } + + let payload = dec.bytes()?.to_vec(); + let crc = dec.u32()?; + + let address = ByronAddress { payload }; + let computed = address.compute_crc32(); + + if crc != computed { + anyhow::bail!("Byron address CRC mismatch"); + } + + Ok(address) + } + + pub fn to_bytes_key(&self) -> Result> { + let crc = self.compute_crc32(); + + let mut buf = Vec::new(); + { + let mut enc = minicbor::Encoder::new(&mut buf); + enc.array(2)?; + enc.tag(minicbor::data::IanaTag::Cbor)?; + enc.bytes(&self.payload)?; + enc.u32(crc)?; + } + + Ok(buf) + } +} + /// Address network identifier #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum AddressNetwork { @@ -170,11 +236,85 @@ impl ShelleyAddress { data.extend(delegation_hash); Ok(bech32::encode::(hrp, &data)?) } + + pub fn to_bytes_key(&self) -> Result> { + let network_bits = match self.network { + AddressNetwork::Main => 1u8, + AddressNetwork::Test => 0u8, + }; + + let (payment_hash, payment_bits): (&Vec, u8) = match &self.payment { + ShelleyAddressPaymentPart::PaymentKeyHash(data) => (data, 0), + ShelleyAddressPaymentPart::ScriptHash(data) => (data, 1), + }; + + let mut data = Vec::new(); + + match &self.delegation { + ShelleyAddressDelegationPart::None => { + let header = network_bits | (payment_bits << 4) | (3 << 5); + data.push(header); + data.extend(payment_hash); + } + ShelleyAddressDelegationPart::StakeKeyHash(hash) => { + let header = network_bits | (payment_bits << 4) | (0 << 5); + data.push(header); + data.extend(payment_hash); + data.extend(hash); + } + ShelleyAddressDelegationPart::ScriptHash(hash) => { + let header = network_bits | (payment_bits << 4) | (1 << 5); + data.push(header); + data.extend(payment_hash); + data.extend(hash); + } + ShelleyAddressDelegationPart::Pointer(pointer) => { + let header = network_bits | (payment_bits << 4) | (2 << 5); + data.push(header); + data.extend(payment_hash); + + let mut encoder = VarIntEncoder::new(); + encoder.push(pointer.slot); + encoder.push(pointer.tx_index); + encoder.push(pointer.cert_index); + data.extend(encoder.to_vec()); + } + } + + Ok(data) + } + + pub fn stake_address_string(&self) -> Result> { + let network_bit = match self.network { + AddressNetwork::Main => 1, + AddressNetwork::Test => 0, + }; + + match &self.delegation { + ShelleyAddressDelegationPart::StakeKeyHash(key_hash) => { + let mut data = Vec::with_capacity(29); + data.push(network_bit | (0b1110 << 4)); + data.extend_from_slice(key_hash); + let stake = StakeAddress::from_binary(&data)?.to_string()?; + Ok(Some(stake)) + } + ShelleyAddressDelegationPart::ScriptHash(script_hash) => { + let mut data = Vec::with_capacity(29); + data.push(network_bit | (0b1111 << 4)); + data.extend_from_slice(script_hash); + let stake = StakeAddress::from_binary(&data)?.to_string()?; + Ok(Some(stake)) + } + // TODO: Use chain store to resolve pointer delegation addresses + ShelleyAddressDelegationPart::Pointer(_pointer) => Ok(None), + ShelleyAddressDelegationPart::None => Ok(None), + } + } } /// Payload of a stake address #[serde_as] -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum StakeAddressPayload { /// Stake key StakeKeyHash(#[serde_as(as = "Hex")] Vec), @@ -196,7 +336,7 @@ impl StakeAddressPayload { } /// A stake address -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub struct StakeAddress { /// Network id pub network: AddressNetwork, @@ -271,10 +411,28 @@ impl StakeAddress { data.extend(stake_hash); Ok(bech32::encode::(hrp, &data)?) } + + pub fn to_bytes_key(&self) -> Result> { + let mut out = Vec::new(); + let (bits, hash): (u8, &[u8]) = match &self.payload { + StakeAddressPayload::StakeKeyHash(h) => (0b1110, h), + StakeAddressPayload::ScriptHash(h) => (0b1111, h), + }; + + let net_bit = match self.network { + AddressNetwork::Main => 1, + AddressNetwork::Test => 0, + }; + + let header = net_bit | (bits << 4); + out.push(header); + out.extend_from_slice(hash); + Ok(out) + } } /// A Cardano address -#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] pub enum Address { None, Byron(ByronAddress), @@ -306,10 +464,9 @@ impl Address { } else if text.starts_with("stake1") || text.starts_with("stake_test1") { Ok(Self::Stake(StakeAddress::from_string(text)?)) } else { - if let Ok(bytes) = bs58::decode(text).into_vec() { - Ok(Self::Byron(ByronAddress { payload: bytes })) - } else { - Ok(Self::None) + match ByronAddress::from_string(text) { + Ok(byron) => Ok(Self::Byron(byron)), + Err(_) => Ok(Self::None), } } } @@ -318,11 +475,46 @@ impl Address { pub fn to_string(&self) -> Result { match self { Self::None => Err(anyhow!("No address")), - Self::Byron(byron) => Ok(bs58::encode(&byron.payload).into_string()), + Self::Byron(byron) => byron.to_string(), Self::Shelley(shelley) => shelley.to_string(), Self::Stake(stake) => stake.to_string(), } } + + pub fn to_bytes_key(&self) -> Result> { + match self { + Address::Byron(b) => b.to_bytes_key(), + + Address::Shelley(s) => s.to_bytes_key(), + + Address::Stake(stake) => stake.to_bytes_key(), + + Address::None => Err(anyhow!("No address to convert")), + } + } + + pub fn kind(&self) -> &'static str { + match self { + Address::Byron(_) => "byron", + Address::Shelley(_) => "shelley", + Address::Stake(_) => "stake", + Address::None => "none", + } + } + + pub fn is_script(&self) -> bool { + match self { + Address::Shelley(shelley) => match shelley.payment { + ShelleyAddressPaymentPart::PaymentKeyHash(_) => false, + ShelleyAddressPaymentPart::ScriptHash(_) => true, + }, + Address::Stake(stake) => match stake.payload { + StakeAddressPayload::StakeKeyHash(_) => false, + StakeAddressPayload::ScriptHash(_) => true, + }, + Address::Byron(_) | Address::None => false, + } + } } // -- Tests -- @@ -336,7 +528,7 @@ mod tests { let payload = vec![42]; let address = Address::Byron(ByronAddress { payload }); let text = address.to_string().unwrap(); - assert_eq!(text, "j"); + assert_eq!(text, "8MMy4x9jE734Gz"); let unpacked = Address::from_string(&text).unwrap(); assert_eq!(address, unpacked); @@ -546,6 +738,30 @@ mod tests { assert_eq!(address, unpacked); } + #[test] + fn shelley_to_stake_address_string_mainnet() { + let normal_address = ShelleyAddress::from_string("addr1q82peck5fynytkgjsp9vnpul59zswsd4jqnzafd0mfzykma625r684xsx574ltpznecr9cnc7n9e2hfq9lyart3h5hpszffds5").expect("valid normal address"); + let script_address = ShelleyAddress::from_string("addr1zx0whlxaw4ksygvuljw8jxqlw906tlql06ern0gtvvzhh0c6409492020k6xml8uvwn34wrexagjh5fsk5xk96jyxk2qhlj6gf").expect("valid script address"); + + let normal_stake_address = normal_address + .stake_address_string() + .expect("stake_address_string should not fail") + .expect("normal address should have stake credential"); + let script_stake_address = script_address + .stake_address_string() + .expect("stake_address_string should not fail") + .expect("script address should have stake credential"); + + assert_eq!( + normal_stake_address, + "stake1uxa92par6ngr202l4s3fuupjufu0fju4t5szljw34cm6tscq40449" + ); + assert_eq!( + script_stake_address, + "stake1uyd2hj6j4848mdrdln7x8fc6hpunw5ft6yct2rtzafzrt9qh0m28h" + ); + } + #[test] fn stake_address_from_binary_mainnet_stake() { // First withdrawal on Mainnet diff --git a/common/src/messages.rs b/common/src/messages.rs index c8d708df..e345e488 100644 --- a/common/src/messages.rs +++ b/common/src/messages.rs @@ -8,6 +8,7 @@ use crate::ledger_state::SPOState; use crate::protocol_params::{NonceHash, ProtocolParams}; use crate::queries::parameters::{ParametersStateQuery, ParametersStateQueryResponse}; use crate::queries::spdd::{SPDDStateQuery, SPDDStateQueryResponse}; +use crate::queries::utxos::{UTxOStateQuery, UTxOStateQueryResponse}; use crate::queries::{ accounts::{AccountsStateQuery, AccountsStateQueryResponse}, addresses::{AddressStateQuery, AddressStateQueryResponse}, @@ -394,10 +395,11 @@ pub enum StateQuery { Mempool(MempoolStateQuery), Metadata(MetadataStateQuery), Network(NetworkStateQuery), + Parameters(ParametersStateQuery), Pools(PoolsStateQuery), Scripts(ScriptsStateQuery), Transactions(TransactionsStateQuery), - Parameters(ParametersStateQuery), + UTxOs(UTxOStateQuery), SPDD(SPDDStateQuery), } @@ -413,9 +415,10 @@ pub enum StateQueryResponse { Mempool(MempoolStateQueryResponse), Metadata(MetadataStateQueryResponse), Network(NetworkStateQueryResponse), + Parameters(ParametersStateQueryResponse), Pools(PoolsStateQueryResponse), Scripts(ScriptsStateQueryResponse), Transactions(TransactionsStateQueryResponse), - Parameters(ParametersStateQueryResponse), + UTxOs(UTxOStateQueryResponse), SPDD(SPDDStateQueryResponse), } diff --git a/common/src/queries/addresses.rs b/common/src/queries/addresses.rs index 86a3e550..01985834 100644 --- a/common/src/queries/addresses.rs +++ b/common/src/queries/addresses.rs @@ -1,39 +1,20 @@ +use crate::{Address, AddressTotals, TxIdentifier, UTxOIdentifier}; + +pub const DEFAULT_ADDRESS_QUERY_TOPIC: (&str, &str) = + ("address-state-query-topic", "cardano.query.address"); + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AddressStateQuery { - GetAddressInfo { address_key: Vec }, - GetAddressInfoExtended { address_key: Vec }, - GetAddressAssetTotals { address_key: Vec }, - GetAddressUTxOs { address_key: Vec }, - GetAddressAssetUTxOs { address_key: Vec }, - GetAddressTransactions { address_key: Vec }, + GetAddressTotals { address: Address }, + GetAddressUTxOs { address: Address }, + GetAddressTransactions { address: Address }, } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum AddressStateQueryResponse { - AddressInfo(AddressInfo), - AddressInfoExtended(AddressInfoExtended), - AddressAssetTotals(AddressAssetTotals), - AddressUTxOs(AddressUTxOs), - AddressAssetUTxOs(AddressAssetUTxOs), - AddressTransactions(AddressTransactions), + AddressTotals(AddressTotals), + AddressUTxOs(Vec), + AddressTransactions(Vec), NotFound, Error(String), } - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressInfo {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressInfoExtended {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressAssetTotals {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressUTxOs {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressAssetUTxOs {} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddressTransactions {} diff --git a/common/src/queries/mod.rs b/common/src/queries/mod.rs index 8aafd35f..356212ed 100644 --- a/common/src/queries/mod.rs +++ b/common/src/queries/mod.rs @@ -18,6 +18,7 @@ pub mod scripts; pub mod spdd; pub mod transactions; pub mod utils; +pub mod utxos; pub fn get_query_topic(context: Arc>, topic: (&str, &str)) -> String { context.config.get_string(topic.0).unwrap_or_else(|_| topic.1.to_string()) diff --git a/common/src/queries/utxos.rs b/common/src/queries/utxos.rs new file mode 100644 index 00000000..bb75c4c0 --- /dev/null +++ b/common/src/queries/utxos.rs @@ -0,0 +1,18 @@ +use crate::{UTxOIdentifier, Value}; + +pub const DEFAULT_UTXOS_QUERY_TOPIC: (&str, &str) = + ("utxo-state-query-topic", "cardano.query.utxos"); + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum UTxOStateQuery { + GetUTxOsSum { + utxo_identifiers: Vec, + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum UTxOStateQueryResponse { + UTxOsSum(Value), + NotFound, + Error(String), +} diff --git a/common/src/types.rs b/common/src/types.rs index 898ca34a..f6b1e192 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{hex::Hex, serde_as}; use std::collections::{HashMap, HashSet}; use std::fmt::{Display, Formatter}; -use std::ops::Neg; +use std::ops::{AddAssign, Neg}; use std::{cmp::Ordering, fmt}; #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -141,8 +141,11 @@ pub struct AddressDelta { /// Address pub address: Address, + /// UTxO causing address delta + pub utxo: UTxOIdentifier, + /// Balance change - pub delta: ValueDelta, + pub value: ValueDelta, } /// Stake balance change @@ -165,10 +168,24 @@ pub struct StakeRewardDelta { pub type PolicyId = [u8; 28]; pub type NativeAssets = Vec<(PolicyId, Vec)>; pub type NativeAssetsDelta = Vec<(PolicyId, Vec)>; +pub type NativeAssetsMap = HashMap>; -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, + Copy, + Clone, + Eq, + PartialEq, + Hash, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, +)] pub struct AssetName { + #[n(0)] len: u8, + #[n(1)] bytes: [u8; 32], } @@ -194,15 +211,23 @@ impl AssetName { } } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] pub struct NativeAsset { + #[n(0)] pub name: AssetName, + #[n(1)] pub amount: u64, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[derive( + Debug, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] pub struct NativeAssetDelta { + #[n(0)] pub name: AssetName, + #[n(1)] pub amount: i64, } @@ -230,12 +255,57 @@ impl Value { } } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +impl AddAssign<&Value> for Value { + fn add_assign(&mut self, other: &Value) { + self.lovelace += other.lovelace; + + for (policy_id, other_assets) in &other.assets { + if let Some((_, existing_assets)) = + self.assets.iter_mut().find(|(pid, _)| pid == policy_id) + { + for other_asset in other_assets { + if let Some(existing) = + existing_assets.iter_mut().find(|a| a.name == other_asset.name) + { + existing.amount += other_asset.amount; + } else { + existing_assets.push(other_asset.clone()); + } + } + } else { + self.assets.push((*policy_id, other_assets.clone())); + } + } + } +} + +/// Hashmap representation of Value (lovelace + multiasset) +#[derive( + Debug, Default, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] +pub struct ValueMap { + #[n(0)] + pub lovelace: u64, + #[n(1)] + pub assets: NativeAssetsMap, +} + +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)] pub struct ValueDelta { pub lovelace: i64, pub assets: NativeAssetsDelta, } +#[derive( + Debug, Default, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] +pub struct AddressTotalsMap { + #[n(0)] + pub lovelace: i64, + #[n(1)] + pub assets: NativeAssetsMap, +} + impl ValueDelta { pub fn new(lovelace: i64, assets: NativeAssetsDelta) -> Self { Self { lovelace, assets } @@ -334,9 +404,19 @@ pub type TxHash = [u8; 32]; /// Compact transaction identifier (block_number, tx_index). #[derive( - Debug, Default, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize, + Debug, + Default, + Clone, + Copy, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, )] -pub struct TxIdentifier([u8; 6]); +pub struct TxIdentifier(#[n(0)] [u8; 6]); impl TxIdentifier { pub fn new(block_number: u32, tx_index: u16) -> Self { @@ -370,8 +450,19 @@ impl From for TxIdentifier { } // Compact UTxO identifier (block_number, tx_index, output_index) -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)] -pub struct UTxOIdentifier([u8; 8]); +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Hash, + serde::Serialize, + serde::Deserialize, + minicbor::Encode, + minicbor::Decode, +)] +pub struct UTxOIdentifier(#[n(0)] [u8; 8]); impl UTxOIdentifier { pub fn new(block_number: u32, tx_index: u16, output_index: u16) -> Self { @@ -1744,6 +1835,64 @@ pub struct AssetAddressEntry { pub quantity: u64, } +#[derive( + Debug, Default, Clone, serde::Serialize, serde::Deserialize, minicbor::Encode, minicbor::Decode, +)] +pub struct AddressTotals { + #[n(0)] + pub sent: ValueMap, + #[n(1)] + pub received: ValueMap, + #[n(2)] + pub tx_count: u64, +} + +impl AddressTotals { + pub fn apply_delta(&mut self, delta: &ValueDelta) { + if delta.lovelace > 0 { + self.received.lovelace += delta.lovelace as u64; + } else if delta.lovelace < 0 { + self.sent.lovelace += (-delta.lovelace) as u64; + } + + for (policy, assets) in &delta.assets { + for a in assets { + if a.amount > 0 { + Self::apply_asset( + &mut self.received.assets, + *policy, + a.name.clone(), + a.amount as u64, + ); + } else if a.amount < 0 { + Self::apply_asset( + &mut self.sent.assets, + *policy, + a.name.clone(), + a.amount.unsigned_abs(), + ); + } + } + } + + self.tx_count += 1; + } + + fn apply_asset( + target: &mut HashMap<[u8; 28], HashMap>, + policy: [u8; 28], + name: AssetName, + amount: u64, + ) { + target + .entry(policy) + .or_default() + .entry(name) + .and_modify(|v| *v += amount) + .or_insert(amount); + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/modules/address_state/.gitignore b/modules/address_state/.gitignore new file mode 100644 index 00000000..9f4c740d --- /dev/null +++ b/modules/address_state/.gitignore @@ -0,0 +1 @@ +db/ \ No newline at end of file diff --git a/modules/address_state/Cargo.toml b/modules/address_state/Cargo.toml new file mode 100644 index 00000000..081da56a --- /dev/null +++ b/modules/address_state/Cargo.toml @@ -0,0 +1,32 @@ +# Acropolis address state module + +[package] +name = "acropolis_module_address_state" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Address State Tracker" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +async-trait = "0.1" +config = { workspace = true } +fjall = "2.7.0" +hex = { workspace = true } +imbl = { workspace = true } +minicbor = { version = "0.26.0", features = ["std", "derive"] } +serde_cbor = "0.11" +tokio = { workspace = true } +tracing = { workspace = true } + +[dev-dependencies] +tempfile = "3" +tracing-subscriber = { version = "0.3", features = ["fmt"] } + +[lib] +path = "src/address_state.rs" diff --git a/modules/address_state/src/address_state.rs b/modules/address_state/src/address_state.rs new file mode 100644 index 00000000..1215bf87 --- /dev/null +++ b/modules/address_state/src/address_state.rs @@ -0,0 +1,261 @@ +//! Acropolis Address State module for Caryatid. +//! Consumes address delta messages and indexes per-address +//! utxos, transactions, and total sent/received amounts. + +use std::sync::Arc; + +use acropolis_common::{ + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, + queries::addresses::{ + AddressStateQuery, AddressStateQueryResponse, DEFAULT_ADDRESS_QUERY_TOPIC, + }, + BlockInfo, BlockStatus, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module, Subscription}; +use config::Config; +use tokio::sync::{mpsc, Mutex}; +use tracing::{error, info}; + +use crate::{ + immutable_address_store::ImmutableAddressStore, + state::{AddressStorageConfig, State}, +}; +mod immutable_address_store; +mod state; +mod volatile_addresses; + +// Subscription topics +const DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC: (&str, &str) = + ("address-deltas-subscribe-topic", "cardano.address.delta"); +const DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC: (&str, &str) = + ("parameters-subscribe-topic", "cardano.protocol.parameters"); + +// Configuration defaults +const DEFAULT_ADDRESS_DB_PATH: (&str, &str) = ("db-path", "./db"); +const DEFAULT_STORE_INFO: (&str, bool) = ("store-info", false); +const DEFAULT_STORE_TOTALS: (&str, bool) = ("store-totals", false); +const DEFAULT_STORE_TRANSACTIONS: (&str, bool) = ("store-transactions", false); + +/// Address State module +#[module( + message_type(Message), + name = "address-state", + description = "In-memory Address State from address delta events" +)] +pub struct AddressState; + +impl AddressState { + async fn run( + state_mutex: Arc>, + mut address_deltas_subscription: Box>, + mut params_subscription: Box>, + ) -> Result<()> { + let _ = params_subscription.read().await?; + info!("Consumed initial genesis params from params_subscription"); + + // Background task to persist epochs sequentialy + const MAX_PENDING_PERSISTS: usize = 1; + let (persist_tx, mut persist_rx) = + mpsc::channel::<(u64, Arc, AddressStorageConfig)>( + MAX_PENDING_PERSISTS, + ); + tokio::spawn(async move { + while let Some((epoch, store, config)) = persist_rx.recv().await { + if let Err(e) = store.persist_epoch(epoch, &config).await { + error!("failed to persist epoch {epoch}: {e}"); + } + } + }); + + // Main loop of synchronised messages + loop { + // Address deltas are the synchroniser + let (_, deltas_msg) = address_deltas_subscription.read().await?; + let (current_block, new_epoch) = match deltas_msg.as_ref() { + Message::Cardano((info, _)) => (info.clone(), info.new_epoch && info.epoch > 0), + _ => continue, + }; + + if current_block.status == BlockStatus::RolledBack { + let mut state = state_mutex.lock().await; + state.volatile.rollback_before(current_block.number); + state.volatile.next_block(); + } + + // Read params message on epoch bounday to update rollback window + // length if needed and set epoch start block for volatile pruning + if new_epoch { + let (_, message) = params_subscription.read().await?; + if let Message::Cardano((ref block_info, CardanoMessage::ProtocolParams(params))) = + message.as_ref() + { + Self::check_sync(¤t_block, &block_info, "params"); + let mut state = state_mutex.lock().await; + state.volatile.start_new_epoch(block_info.number); + if let Some(shelley) = ¶ms.params.shelley { + state.volatile.update_k(shelley.security_param); + } + } + } + + // Process address deltas into volatile and persist to disk if a full epoch is out of rollback window + match deltas_msg.as_ref() { + Message::Cardano(( + ref block_info, + CardanoMessage::AddressDeltas(address_deltas_msg), + )) => { + let (should_prune, store, config, epoch); + { + let mut state = state_mutex.lock().await; + // Skip processing for epochs already stored to DB + if let Some(min_epoch) = state.config.skip_until { + if block_info.epoch <= min_epoch { + state.volatile.next_block(); + continue; + } + } + + // Add deltas to volatile + if let Err(e) = state.apply_address_deltas(&address_deltas_msg.deltas) { + error!("address deltas handling error: {e:#}"); + } + + store = state.immutable.clone(); + config = state.config.clone(); + epoch = block_info.epoch; + + // Move volatile deltas for an epoch to ImmutableAddressStore if out of rollback window + should_prune = state.ready_to_prune(¤t_block); + if should_prune { + state.prune_volatile().await; + } + } + + if should_prune { + if let Err(e) = + persist_tx.send((epoch, store.clone(), config.clone())).await + { + panic!("persistence worker crashed: {e}"); + } + } + + { + let mut state = state_mutex.lock().await; + state.volatile.next_block(); + } + } + other => error!("Unexpected message on address-deltas subscription: {other:?}"), + } + } + } + + fn check_sync(expected: &BlockInfo, actual: &BlockInfo, source: &str) { + if expected.number != actual.number { + error!( + expected = expected.number, + actual = actual.number, + source = source, + "Messages out of sync (expected deltas block {}, got {} from {})", + expected.number, + actual.number, + source, + ); + panic!( + "Message streams diverged: deltas at {} vs {} from {}", + expected.number, actual.number, source + ); + } + } + + pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { + fn get_bool_flag(config: &Config, key: (&str, bool)) -> bool { + config.get_bool(key.0).unwrap_or(key.1) + } + + fn get_string_flag(config: &Config, key: (&str, &str)) -> String { + config.get_string(key.0).unwrap_or_else(|_| key.1.to_string()) + } + + // Get configuration flags and query topic + let storage_config = AddressStorageConfig { + db_path: get_string_flag(&config, DEFAULT_ADDRESS_DB_PATH), + skip_until: None, + store_info: get_bool_flag(&config, DEFAULT_STORE_INFO), + store_totals: get_bool_flag(&config, DEFAULT_STORE_TOTALS), + store_transactions: get_bool_flag(&config, DEFAULT_STORE_TRANSACTIONS), + }; + + let address_query_topic = get_string_flag(&config, DEFAULT_ADDRESS_QUERY_TOPIC); + info!("Creating asset query handler on '{address_query_topic}'"); + + // Initialize state + let state = State::new(&storage_config).await?; + let state_mutex = Arc::new(Mutex::new(state)); + let state_run = state_mutex.clone(); + + // Query handler + context.handle(&address_query_topic, move |message| { + let state_mutex = state_mutex.clone(); + async move { + let Message::StateQuery(StateQuery::Addresses(query)) = message.as_ref() else { + return Arc::new(Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error( + "Invalid message for address-state".into(), + ), + ))); + }; + + let state = state_mutex.lock().await; + let response = match query { + AddressStateQuery::GetAddressUTxOs { address } => { + match state.get_address_utxos(&address).await { + Ok(Some(utxos)) => AddressStateQueryResponse::AddressUTxOs(utxos), + Ok(None) => AddressStateQueryResponse::NotFound, + Err(e) => AddressStateQueryResponse::Error(e.to_string()), + } + } + AddressStateQuery::GetAddressTransactions { address } => { + match state.get_address_transactions(&address).await { + Ok(Some(txs)) => AddressStateQueryResponse::AddressTransactions(txs), + Ok(None) => AddressStateQueryResponse::NotFound, + Err(e) => AddressStateQueryResponse::Error(e.to_string()), + } + } + AddressStateQuery::GetAddressTotals { address } => { + match state.get_address_totals(&address).await { + Ok(totals) => AddressStateQueryResponse::AddressTotals(totals), + Err(e) => AddressStateQueryResponse::Error(e.to_string()), + } + } + }; + Arc::new(Message::StateQueryResponse(StateQueryResponse::Addresses( + response, + ))) + } + }); + + if storage_config.any_enabled() { + // Get subscribe topics + let address_deltas_subscribe_topic = + get_string_flag(&config, DEFAULT_ADDRESS_DELTAS_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{address_deltas_subscribe_topic}'"); + let params_subscribe_topic = + get_string_flag(&config, DEFAULT_PARAMETERS_SUBSCRIBE_TOPIC); + info!("Creating subscriber on '{params_subscribe_topic}'"); + + // Subscribe to enabled topics + let address_deltas_sub = context.subscribe(&address_deltas_subscribe_topic).await?; + let params_sub = context.subscribe(¶ms_subscribe_topic).await?; + + // Start run task + context.run(async move { + Self::run(state_run, address_deltas_sub, params_sub) + .await + .unwrap_or_else(|e| error!("Failed: {e}")); + }); + } + + Ok(()) + } +} diff --git a/modules/address_state/src/immutable_address_store.rs b/modules/address_state/src/immutable_address_store.rs new file mode 100644 index 00000000..391f2716 --- /dev/null +++ b/modules/address_state/src/immutable_address_store.rs @@ -0,0 +1,314 @@ +use std::{ + collections::{HashMap, HashSet}, + path::Path, +}; + +use crate::state::{AddressEntry, AddressStorageConfig, UtxoDelta}; +use acropolis_common::{Address, AddressTotals, TxIdentifier, UTxOIdentifier}; +use anyhow::Result; +use fjall::{Keyspace, Partition, PartitionCreateOptions}; +use minicbor::{decode, to_vec}; +use tokio::{sync::Mutex, task}; +use tracing::{debug, error, info}; + +// Metadata keys which store the last epoch saved in each partition +const ADDRESS_UTXOS_EPOCH_COUNTER: &[u8] = b"utxos_epoch_last"; +const ADDRESS_TXS_EPOCH_COUNTER: &[u8] = b"txs_epoch_last"; +const ADDRESS_TOTALS_EPOCH_COUNTER: &[u8] = b"totals_epoch_last"; + +pub struct ImmutableAddressStore { + utxos: Partition, + txs: Partition, + totals: Partition, + keyspace: Keyspace, + pub pending: Mutex>>, +} + +impl ImmutableAddressStore { + pub fn new(path: impl AsRef) -> Result { + let cfg = fjall::Config::new(path).max_write_buffer_size(512 * 1024 * 1024); + let keyspace = Keyspace::open(cfg)?; + + let utxos = keyspace.open_partition("address_utxos", PartitionCreateOptions::default())?; + let txs = keyspace.open_partition("address_txs", PartitionCreateOptions::default())?; + let totals = + keyspace.open_partition("address_totals", PartitionCreateOptions::default())?; + + Ok(Self { + utxos, + txs, + totals, + keyspace, + pending: Mutex::new(Vec::new()), + }) + } + + /// Persists volatile UTxOs, transactions, and totals into their respective Fjall partitions for an entire epoch. + /// Skips any partitions that have already stored the given epoch. + /// All writes are batched and committed atomically, preventing on-disk corruption in case of failure. + pub async fn persist_epoch(&self, epoch: u64, config: &AddressStorageConfig) -> Result<()> { + let persist_utxos = config.store_info + && !self.epoch_exists(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER, epoch).await?; + let persist_txs = config.store_transactions + && !self.epoch_exists(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER, epoch).await?; + let persist_totals = config.store_totals + && !self.epoch_exists(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER, epoch).await?; + + if !(persist_utxos || persist_txs || persist_totals) { + debug!("no persistence needed for epoch {epoch} (already persisted or disabled)",); + return Ok(()); + } + + let drained_blocks = { + let mut pending = self.pending.lock().await; + std::mem::take(&mut *pending) + }; + + let mut batch = self.keyspace.batch(); + let mut change_count = 0; + + for block_map in drained_blocks.into_iter() { + if block_map.is_empty() { + continue; + } + + for (addr, entry) in block_map { + change_count += 1; + let addr_key = addr.to_bytes_key()?; + + if persist_utxos { + let mut live: HashSet = self + .utxos + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); + + if let Some(deltas) = &entry.utxos { + for delta in deltas { + match delta { + UtxoDelta::Created(u) => { + live.insert(*u); + } + UtxoDelta::Spent(u) => { + live.remove(u); + } + } + } + } + + batch.insert(&self.utxos, &addr_key, to_vec(&live)?); + } + + if persist_txs { + let mut live: Vec = self + .txs + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); + + if let Some(txs_deltas) = &entry.transactions { + live.extend(txs_deltas.iter().cloned()); + } + + batch.insert(&self.txs, &addr_key, to_vec(&live)?); + } + + if persist_totals { + let mut live: AddressTotals = self + .totals + .get(&addr_key)? + .map(|bytes| decode(&bytes)) + .transpose()? + .unwrap_or_default(); + + if let Some(deltas) = &entry.totals { + for delta in deltas { + live.apply_delta(delta); + } + } + + batch.insert(&self.totals, &addr_key, to_vec(&live)?); + } + } + } + + // Metadata markers + if persist_utxos { + batch.insert( + &self.utxos, + ADDRESS_UTXOS_EPOCH_COUNTER, + &epoch.to_le_bytes(), + ); + } + if persist_txs { + batch.insert(&self.txs, ADDRESS_TXS_EPOCH_COUNTER, &epoch.to_le_bytes()); + } + if persist_totals { + batch.insert( + &self.totals, + ADDRESS_TOTALS_EPOCH_COUNTER, + &epoch.to_le_bytes(), + ); + } + + match batch.commit() { + Ok(_) => { + info!("committed {change_count} address changes for epoch {epoch}"); + Ok(()) + } + Err(e) => { + error!("batch commit failed for epoch {epoch}: {e}"); + Err(e.into()) + } + } + } + + pub async fn update_immutable(&self, drained: Vec>) { + let mut pending = self.pending.lock().await; + pending.extend(drained); + } + + pub async fn get_utxos(&self, address: &Address) -> Result>> { + let key = address.to_bytes_key()?; + + let mut live: HashSet = + self.utxos.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); + + let pending = self.pending.lock().await; + for block_map in pending.iter() { + if let Some(entry) = block_map.get(address) { + if let Some(deltas) = &entry.utxos { + for delta in deltas { + match delta { + UtxoDelta::Created(u) => { + live.insert(*u); + } + UtxoDelta::Spent(u) => { + live.remove(u); + } + } + } + } + } + } + + if live.is_empty() { + Ok(None) + } else { + let vec: Vec<_> = live.into_iter().collect(); + Ok(Some(vec)) + } + } + + pub async fn get_txs(&self, address: &Address) -> Result>> { + let key = address.to_bytes_key()?; + let mut live: Vec = + self.txs.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); + + let pending = self.pending.lock().await; + for block_map in pending.iter() { + if let Some(entry) = block_map.get(address) { + if let Some(txs) = &entry.transactions { + live.extend(txs.iter().cloned()); + } + } + } + + if live.is_empty() { + Ok(None) + } else { + Ok(Some(live)) + } + } + + pub async fn get_totals(&self, address: &Address) -> Result> { + let key = address.to_bytes_key()?; + + let mut live: AddressTotals = + self.totals.get(&key)?.map(|bytes| decode(&bytes)).transpose()?.unwrap_or_default(); + + let pending = self.pending.lock().await; + for block_map in pending.iter() { + if let Some(entry) = block_map.get(address) { + if let Some(deltas) = &entry.totals { + for delta in deltas { + live.apply_delta(delta); + } + } + } + } + + if live.tx_count == 0 { + Ok(None) + } else { + Ok(Some(live)) + } + } + + pub async fn get_last_epoch_stored(&self) -> Result> { + let read_marker = |partition: Partition, key: &'static [u8]| async move { + task::spawn_blocking(move || { + Ok::<_, anyhow::Error>(match partition.get(key)? { + Some(bytes) if bytes.len() == 8 => { + let mut arr = [0u8; 8]; + arr.copy_from_slice(&bytes); + let val = u64::from_le_bytes(arr); + if val == u64::MAX { + None + } else { + Some(val) + } + } + _ => None, + }) + }) + .await? + }; + + let u = read_marker(self.utxos.clone(), ADDRESS_UTXOS_EPOCH_COUNTER).await?; + let t = read_marker(self.txs.clone(), ADDRESS_TXS_EPOCH_COUNTER).await?; + let tot = read_marker(self.totals.clone(), ADDRESS_TOTALS_EPOCH_COUNTER).await?; + + let min_epoch = [u, t, tot].into_iter().flatten().min(); + + if let Some(epoch) = min_epoch { + info!("last epoch already stored across partitions: {epoch}"); + } else { + info!("no epoch markers found across partitions"); + } + + Ok(min_epoch) + } + + async fn epoch_exists( + &self, + partition: Partition, + key: &'static [u8], + epoch: u64, + ) -> Result { + let exists = task::spawn_blocking(move || -> Result { + let bytes = match partition.get(key)? { + Some(b) if b.len() == 8 => b, + _ => return Ok(false), + }; + + let mut arr = [0u8; 8]; + arr.copy_from_slice(&bytes); + let last_epoch = u64::from_le_bytes(arr); + + Ok(epoch <= last_epoch) + }) + .await??; + + if exists { + let key_name = std::str::from_utf8(key) + .map(|s| s.to_string()) + .unwrap_or_else(|_| format!("{:?}", key)); + info!("epoch {epoch} already stored for {key_name}"); + } + + Ok(exists) + } +} diff --git a/modules/address_state/src/state.rs b/modules/address_state/src/state.rs new file mode 100644 index 00000000..d54d9265 --- /dev/null +++ b/modules/address_state/src/state.rs @@ -0,0 +1,383 @@ +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::Arc, +}; + +use acropolis_common::{ + Address, AddressDelta, AddressTotals, BlockInfo, TxIdentifier, UTxOIdentifier, ValueDelta, +}; +use anyhow::Result; + +use crate::{ + immutable_address_store::ImmutableAddressStore, volatile_addresses::VolatileAddresses, +}; + +#[derive(Debug, Default, Clone)] +pub struct AddressStorageConfig { + pub db_path: String, + pub skip_until: Option, + + pub store_info: bool, + pub store_totals: bool, + pub store_transactions: bool, +} + +impl AddressStorageConfig { + pub fn any_enabled(&self) -> bool { + self.store_info || self.store_totals || self.store_transactions + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash, minicbor::Encode, minicbor::Decode)] +pub enum UtxoDelta { + #[n(0)] + Created(#[n(0)] UTxOIdentifier), + #[n(1)] + Spent(#[n(0)] UTxOIdentifier), +} + +#[derive(Debug, Default, Clone)] +pub struct AddressEntry { + pub utxos: Option>, + pub transactions: Option>, + pub totals: Option>, +} + +#[derive(Clone)] +pub struct State { + pub config: AddressStorageConfig, + pub volatile: VolatileAddresses, + pub immutable: Arc, +} + +impl State { + pub async fn new(config: &AddressStorageConfig) -> Result { + let db_path = if Path::new(&config.db_path).is_relative() { + PathBuf::from(env!("CARGO_MANIFEST_DIR")).join(&config.db_path) + } else { + PathBuf::from(&config.db_path) + }; + + let store = Arc::new(ImmutableAddressStore::new(&db_path)?); + + let mut config = config.clone(); + config.skip_until = store.get_last_epoch_stored().await?; + + Ok(Self { + config, + volatile: VolatileAddresses::default(), + immutable: store, + }) + } + + pub async fn get_address_utxos( + &self, + address: &Address, + ) -> Result>> { + if !self.config.store_info { + return Err(anyhow::anyhow!("address info storage disabled in config")); + } + + let store = self.immutable.clone(); + let mut combined: HashSet = match store.get_utxos(address).await? { + Some(db) => db.into_iter().collect(), + None => HashSet::new(), + }; + + for map in self.volatile.window.iter() { + if let Some(entry) = map.get(address) { + if let Some(deltas) = &entry.utxos { + for delta in deltas { + match delta { + UtxoDelta::Created(u) => { + combined.insert(*u); + } + UtxoDelta::Spent(u) => { + combined.remove(u); + } + } + } + } + } + } + + if combined.is_empty() { + Ok(None) + } else { + Ok(Some(combined.into_iter().collect())) + } + } + + pub async fn get_address_transactions( + &self, + address: &Address, + ) -> Result>> { + if !self.config.store_transactions { + return Err(anyhow::anyhow!( + "address transactions storage disabled in config" + )); + } + + let store = self.immutable.clone(); + + let mut combined: Vec = match store.get_txs(address).await? { + Some(db) => db, + None => Vec::new(), + }; + + for map in self.volatile.window.iter() { + if let Some(entry) = map.get(address) { + if let Some(txs) = &entry.transactions { + combined.extend(txs); + } + } + } + + if combined.is_empty() { + Ok(None) + } else { + Ok(Some(combined)) + } + } + + pub async fn get_address_totals(&self, address: &Address) -> Result { + if !self.config.store_totals { + anyhow::bail!("address totals storage disabled in config"); + } + + let store = self.immutable.clone(); + + let mut totals = match store.get_totals(address).await? { + Some(db) => db, + None => AddressTotals::default(), + }; + + for map in self.volatile.window.iter() { + if let Some(entry) = map.get(address) { + if let Some(address_deltas) = &entry.totals { + for delta in address_deltas { + totals.apply_delta(delta); + } + } + } + } + + Ok(totals) + } + + pub async fn prune_volatile(&mut self) { + let drained = self.volatile.prune_volatile(); + self.immutable.update_immutable(drained).await; + } + + pub fn ready_to_prune(&self, block_info: &BlockInfo) -> bool { + block_info.epoch > 0 + && Some(block_info.epoch) != self.volatile.last_persisted_epoch + && block_info.number > self.volatile.epoch_start_block + self.volatile.security_param_k + } + + pub fn apply_address_deltas(&mut self, deltas: &[AddressDelta]) -> Result<()> { + let addresses = self.volatile.window.back_mut().expect("window should never be empty"); + + for delta in deltas { + let entry = addresses.entry(delta.address.clone()).or_default(); + + if self.config.store_info { + let utxos = entry.utxos.get_or_insert(Vec::new()); + if delta.value.lovelace > 0 { + utxos.push(UtxoDelta::Created(delta.utxo)); + } else { + utxos.push(UtxoDelta::Spent(delta.utxo)); + } + } + + if self.config.store_transactions { + let txs = entry.transactions.get_or_insert(Vec::new()); + txs.push(TxIdentifier::from(delta.utxo)) + } + + if self.config.store_totals { + let totals = entry.totals.get_or_insert(Vec::new()); + totals.push(delta.value.clone()); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use acropolis_common::{Address, AddressDelta, UTxOIdentifier, ValueDelta}; + use tempfile::tempdir; + + fn dummy_address() -> Address { + Address::from_string("DdzFFzCqrht7fNAHwdou7iXPJ5NZrssAH53yoRMUtF9t6momHH52EAxM5KmqDwhrjT7QsHjbMPJUBywmzAgmF4hj2h9eKj4U6Ahandyy").unwrap() + } + + fn test_config() -> AddressStorageConfig { + let dir = tempdir().unwrap(); + AddressStorageConfig { + db_path: dir.path().to_string_lossy().into_owned(), + skip_until: None, + store_info: true, + store_transactions: true, + store_totals: true, + } + } + + async fn setup_state_and_store() -> Result { + let config = test_config(); + let mut state = State::new(&config.clone()).await?; + state.volatile.epoch_start_block = 1; + Ok(state) + } + + fn delta(addr: &Address, utxo: &UTxOIdentifier, lovelace: i64) -> AddressDelta { + AddressDelta { + address: addr.clone(), + utxo: utxo.clone(), + value: ValueDelta { + lovelace, + assets: Vec::new(), + }, + } + } + + #[tokio::test] + async fn test_utxo_storage_lifecycle() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let mut state = setup_state_and_store().await?; + + let addr = dummy_address(); + let utxo = UTxOIdentifier::new(0, 0, 0); + let deltas = vec![delta(&addr, &utxo, 1)]; + + // Apply deltas + state.apply_address_deltas(&deltas)?; + + // Verify UTxO is retrievable when in volatile + let utxos = state.get_address_utxos(&addr).await?; + assert!(utxos.is_some()); + assert_eq!(utxos.as_ref().unwrap().len(), 1); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + + // Drain volatile to immutable + state.volatile.epoch_start_block = 1; + state.prune_volatile().await; + + // Verify UTxO is retrievable when in immutable pending + let utxos = state.get_address_utxos(&addr).await?; + assert!(utxos.is_some()); + assert_eq!(utxos.as_ref().unwrap().len(), 1); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + + // Perisist immutable to disk + state.immutable.persist_epoch(0, &state.config).await?; + + // Verify UTxO is retrievable after persisted to disk + let utxos = state.get_address_utxos(&addr).await?; + assert!(utxos.is_some()); + assert_eq!(utxos.as_ref().unwrap().len(), 1); + assert_eq!(utxos.as_ref().unwrap()[0], UTxOIdentifier::new(0, 0, 0)); + + Ok(()) + } + + #[tokio::test] + async fn test_utxo_removed_when_spent() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let mut state = setup_state_and_store().await?; + + let addr = dummy_address(); + let utxo = UTxOIdentifier::new(0, 0, 0); + + let created = vec![delta(&addr, &utxo, 1)]; + + // Apply delta to volatile + state.apply_address_deltas(&created)?; + + // Drain volatile to immutable pending + state.volatile.epoch_start_block = 1; + state.prune_volatile().await; + + // Perisist immutable to disk + state.immutable.persist_epoch(0, &state.config).await?; + + // Verify UTxO was persisted + let after_persist = state.get_address_utxos(&addr).await?; + assert_eq!(after_persist.as_ref().unwrap(), &[utxo]); + + state.volatile.next_block(); + state.apply_address_deltas(&[delta(&addr, &utxo, -1)])?; + + // Verify UTxO was removed while in volatile + let after_spend_volatile = state.get_address_utxos(&addr).await?; + assert!(after_spend_volatile.as_ref().map_or(true, |u| u.is_empty())); + + // Drain volatile to immutable + state.prune_volatile().await; + + // Verify UTxO was removed while in pending immutable + let after_spend_pending = state.get_address_utxos(&addr).await?; + assert!(after_spend_pending.as_ref().map_or(true, |u| u.is_empty())); + + // Perisist immutable to disk + state.immutable.persist_epoch(2, &state.config).await?; + + // Verify UTxO was removed after persisting spend to disk + let after_spend_disk = state.get_address_utxos(&addr).await?; + assert!(after_spend_disk.as_ref().map_or(true, |u| u.is_empty())); + + Ok(()) + } + + #[tokio::test] + async fn test_utxo_spent_and_created_across_blocks_in_volatile() -> Result<()> { + let _ = tracing_subscriber::fmt::try_init(); + + let mut state = setup_state_and_store().await?; + + let addr = dummy_address(); + let utxo_old = UTxOIdentifier::new(0, 0, 0); + let utxo_new = UTxOIdentifier::new(0, 1, 0); + + state.volatile.epoch_start_block = 1; + + state.apply_address_deltas(&[delta(&addr, &utxo_old, 1)])?; + state.volatile.next_block(); + state.apply_address_deltas(&[delta(&addr, &utxo_old, -1), delta(&addr, &utxo_new, 1)])?; + + // Verify Create and spend both in volatile is not included in address utxos + let volatile = state.get_address_utxos(&addr).await?; + assert!( + volatile.as_ref().is_some_and(|u| u.contains(&utxo_new) && !u.contains(&utxo_old)), + "Expected only new UTxO {:?} in volatile view, got {:?}", + utxo_new, + volatile + ); + + // Drain volatile to immutable + state.prune_volatile().await; + + // Perisist immutable to disk + state.immutable.persist_epoch(0, &state.config).await?; + + // UTxO not persisted to disk if created and spent in pruned volatile window + let persisted_view = state.get_address_utxos(&addr).await?; + assert!( + persisted_view + .as_ref() + .is_some_and(|u| u.contains(&utxo_new) && !u.contains(&utxo_old)), + "Expected only new UTxO {:?} after persistence, got {:?}", + utxo_new, + persisted_view + ); + + Ok(()) + } +} diff --git a/modules/address_state/src/volatile_addresses.rs b/modules/address_state/src/volatile_addresses.rs new file mode 100644 index 00000000..f8d4d429 --- /dev/null +++ b/modules/address_state/src/volatile_addresses.rs @@ -0,0 +1,70 @@ +use std::collections::{HashMap, VecDeque}; + +use acropolis_common::Address; + +use crate::state::AddressEntry; + +#[derive(Debug, Clone)] +pub struct VolatileAddresses { + pub window: VecDeque>, + pub start_block: u64, + pub epoch_start_block: u64, + pub last_persisted_epoch: Option, + pub security_param_k: u64, +} + +impl Default for VolatileAddresses { + fn default() -> Self { + Self::new() + } +} + +impl VolatileAddresses { + pub fn new() -> Self { + let mut window = VecDeque::new(); + window.push_back(HashMap::new()); + + VolatileAddresses { + window, + start_block: 0, + epoch_start_block: 0, + last_persisted_epoch: None, + security_param_k: 0, + } + } + + pub fn update_k(&mut self, k: u32) { + self.security_param_k = k as u64; + } + + pub fn next_block(&mut self) { + self.window.push_back(HashMap::new()); + } + + pub fn start_new_epoch(&mut self, block_number: u64) { + self.epoch_start_block = block_number; + } + + pub fn rollback_before(&mut self, block: u64) -> Vec<(Address, AddressEntry)> { + let mut out = Vec::new(); + + while self.start_block + self.window.len() as u64 >= block { + if let Some(map) = self.window.pop_back() { + out.extend(map.into_iter()); + } else { + break; + } + } + out + } + + pub fn prune_volatile(&mut self) -> Vec> { + let epoch = self.last_persisted_epoch.map(|e| e + 1).unwrap_or(0); + let blocks_to_drain = (self.epoch_start_block - self.start_block) as usize; + + self.start_block += blocks_to_drain as u64; + self.last_persisted_epoch = Some(epoch); + + self.window.drain(..blocks_to_drain).collect() + } +} diff --git a/modules/assets_state/src/state.rs b/modules/assets_state/src/state.rs index d9eea542..199f2f96 100644 --- a/modules/assets_state/src/state.rs +++ b/modules/assets_state/src/state.rs @@ -423,7 +423,7 @@ impl State { for address_delta in deltas { if let Address::Shelley(shelley_addr) = &address_delta.address { - for (policy_id, asset_deltas) in &address_delta.delta.assets { + for (policy_id, asset_deltas) in &address_delta.value.assets { for asset_delta in asset_deltas { if let Some(asset_id) = registry.lookup_id(policy_id, &asset_delta.name) { if let Some(holders) = addr_map.get_mut(&asset_id) { @@ -740,7 +740,8 @@ mod tests { fn make_address_delta(policy_id: PolicyId, name: AssetName, amount: i64) -> AddressDelta { AddressDelta { address: dummy_address(), - delta: ValueDelta { + utxo: UTxOIdentifier::new(0, 0, 0), + value: ValueDelta { lovelace: 0, assets: vec![(policy_id, vec![NativeAssetDelta { name, amount }])] .into_iter() diff --git a/modules/rest_blockfrost/src/handlers/addresses.rs b/modules/rest_blockfrost/src/handlers/addresses.rs new file mode 100644 index 00000000..8a7741bd --- /dev/null +++ b/modules/rest_blockfrost/src/handlers/addresses.rs @@ -0,0 +1,195 @@ +use anyhow::Result; +use std::sync::Arc; + +use acropolis_common::{ + messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, + queries::{ + addresses::{AddressStateQuery, AddressStateQueryResponse}, + utils::query_state, + utxos::{UTxOStateQuery, UTxOStateQueryResponse}, + }, + Address, Value, +}; +use caryatid_sdk::Context; + +use crate::{handlers_config::HandlersConfig, types::AddressInfoREST}; + +/// Handle `/addresses/{address}` Blockfrost-compatible endpoint +pub async fn handle_address_single_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let [address_str] = ¶ms[..] else { + return Ok(RESTResponse::with_text(400, "Missing address parameter")); + }; + + let (address, stake_address) = match Address::from_string(address_str) { + Ok(Address::None) | Ok(Address::Stake(_)) => { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid address '{address_str}'"), + )); + } + Ok(Address::Byron(byron)) => (Address::Byron(byron), None), + Ok(Address::Shelley(shelley)) => { + let stake_addr = match shelley.stake_address_string() { + Ok(stake_addr) => stake_addr, + Err(e) => { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid address '{address_str}': {e}"), + )); + } + }; + + (Address::Shelley(shelley), stake_addr) + } + Err(e) => { + return Ok(RESTResponse::with_text( + 400, + &format!("Invalid address '{}': {e}", params[0]), + )); + } + }; + + let address_type = address.kind().to_string(); + let is_script = address.is_script(); + + let address_query_msg = Arc::new(Message::StateQuery(StateQuery::Addresses( + AddressStateQuery::GetAddressUTxOs { address }, + ))); + + let utxo_query_result = query_state( + &context, + &handlers_config.addresses_query_topic, + address_query_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::AddressUTxOs(utxo_identifiers), + )) => Ok(Some(utxo_identifiers)), + + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::NotFound, + )) => Ok(None), + + Message::StateQueryResponse(StateQueryResponse::Addresses( + AddressStateQueryResponse::Error(_), + )) => Err(anyhow::anyhow!("Address info storage disabled")), + + _ => Err(anyhow::anyhow!("Unexpected response")), + }, + ) + .await; + + let utxo_identifiers = match utxo_query_result { + Ok(Some(utxo_identifiers)) => utxo_identifiers, + Ok(None) => { + let rest_response = AddressInfoREST { + address: address_str.to_string(), + amount: Value { + lovelace: 0, + assets: Vec::new(), + } + .into(), + stake_address, + address_type, + script: is_script, + }; + + let json = serde_json::to_string_pretty(&rest_response) + .map_err(|e| anyhow::anyhow!("JSON serialization error: {e}"))?; + + return Ok(RESTResponse::with_json(200, &json)); + } + Err(e) => return Ok(RESTResponse::with_text(500, &format!("Query failed: {e}"))), + }; + + let utxos_query_msg = Arc::new(Message::StateQuery(StateQuery::UTxOs( + UTxOStateQuery::GetUTxOsSum { utxo_identifiers }, + ))); + + let address_balance = match query_state( + &context, + &handlers_config.utxos_query_topic, + utxos_query_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::UTxOsSum(balance), + )) => Ok(balance), + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::NotFound, + )) => Err(anyhow::anyhow!("UTxOs not found")), + Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::Error(e), + )) => Err(anyhow::anyhow!(format!("UTxO query error: {e}"))), + _ => Err(anyhow::anyhow!("Unexpected response")), + }, + ) + .await + { + Ok(address_balance) => address_balance, + Err(e) => return Ok(RESTResponse::with_text(500, &format!("Query failed: {e}"))), + }; + + let rest_response = AddressInfoREST { + address: address_str.to_string(), + amount: address_balance.into(), + stake_address, + address_type, + script: is_script, + }; + + match serde_json::to_string_pretty(&rest_response) { + Ok(json) => Ok(RESTResponse::with_json(200, &json)), + Err(e) => Ok(RESTResponse::with_text( + 500, + &format!("Internal server error while retrieving address info: {e}"), + )), + } +} + +/// Handle `/addresses/{address}/extended` Blockfrost-compatible endpoint +pub async fn handle_address_extended_blockfrost( + _context: Arc>, + _params: Vec, + _handlers_config: Arc, +) -> Result { + Ok(RESTResponse::with_text(501, "Not implemented")) +} + +/// Handle `/addresses/{address}/totals` Blockfrost-compatible endpoint +pub async fn handle_address_totals_blockfrost( + _context: Arc>, + _params: Vec, + _handlers_config: Arc, +) -> Result { + Ok(RESTResponse::with_text(501, "Not implemented")) +} + +/// Handle `/addresses/{address}/utxos` Blockfrost-compatible endpoint +pub async fn handle_address_utxos_blockfrost( + _context: Arc>, + _params: Vec, + _handlers_config: Arc, +) -> Result { + Ok(RESTResponse::with_text(501, "Not implemented")) +} + +/// Handle `/addresses/{address}/utxos/{asset}` Blockfrost-compatible endpoint +pub async fn handle_address_asset_utxos_blockfrost( + _context: Arc>, + _params: Vec, + _handlers_config: Arc, +) -> Result { + Ok(RESTResponse::with_text(501, "Not implemented")) +} + +/// Handle `/addresses/{address}/transactions` Blockfrost-compatible endpoint +pub async fn handle_address_transactions_blockfrost( + _context: Arc>, + _params: Vec, + _handlers_config: Arc, +) -> Result { + Ok(RESTResponse::with_text(501, "Not implemented")) +} diff --git a/modules/rest_blockfrost/src/handlers/mod.rs b/modules/rest_blockfrost/src/handlers/mod.rs index 008683c4..aca50eb5 100644 --- a/modules/rest_blockfrost/src/handlers/mod.rs +++ b/modules/rest_blockfrost/src/handlers/mod.rs @@ -1,4 +1,5 @@ pub mod accounts; +pub mod addresses; pub mod assets; pub mod epochs; pub mod governance; diff --git a/modules/rest_blockfrost/src/handlers_config.rs b/modules/rest_blockfrost/src/handlers_config.rs index 53ea6b79..b281da95 100644 --- a/modules/rest_blockfrost/src/handlers_config.rs +++ b/modules/rest_blockfrost/src/handlers_config.rs @@ -2,12 +2,14 @@ use std::sync::Arc; use acropolis_common::queries::{ accounts::DEFAULT_ACCOUNTS_QUERY_TOPIC, + addresses::DEFAULT_ADDRESS_QUERY_TOPIC, assets::{DEFAULT_ASSETS_QUERY_TOPIC, DEFAULT_OFFCHAIN_TOKEN_REGISTRY_URL}, epochs::DEFAULT_EPOCHS_QUERY_TOPIC, governance::{DEFAULT_DREPS_QUERY_TOPIC, DEFAULT_GOVERNANCE_QUERY_TOPIC}, parameters::DEFAULT_PARAMETERS_QUERY_TOPIC, pools::DEFAULT_POOLS_QUERY_TOPIC, spdd::DEFAULT_SPDD_QUERY_TOPIC, + utxos::DEFAULT_UTXOS_QUERY_TOPIC, }; use config::Config; @@ -16,6 +18,7 @@ const DEFAULT_EXTERNAL_API_TIMEOUT: (&str, i64) = ("external_api_timeout", 3); / #[derive(Clone)] pub struct HandlersConfig { pub accounts_query_topic: String, + pub addresses_query_topic: String, pub assets_query_topic: String, pub pools_query_topic: String, pub dreps_query_topic: String, @@ -23,6 +26,7 @@ pub struct HandlersConfig { pub epochs_query_topic: String, pub spdd_query_topic: String, pub parameters_query_topic: String, + pub utxos_query_topic: String, pub external_api_timeout: u64, pub offchain_token_registry_url: String, } @@ -33,6 +37,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_ACCOUNTS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_ACCOUNTS_QUERY_TOPIC.1.to_string()); + let addresses_query_topic = config + .get_string(DEFAULT_ADDRESS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_ADDRESS_QUERY_TOPIC.1.to_string()); + let assets_query_topic = config .get_string(DEFAULT_ASSETS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_ASSETS_QUERY_TOPIC.1.to_string()); @@ -57,6 +65,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_PARAMETERS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_PARAMETERS_QUERY_TOPIC.1.to_string()); + let utxos_query_topic = config + .get_string(DEFAULT_UTXOS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_UTXOS_QUERY_TOPIC.1.to_string()); + let spdd_query_topic = config .get_string(DEFAULT_SPDD_QUERY_TOPIC.0) .unwrap_or(DEFAULT_SPDD_QUERY_TOPIC.1.to_string()); @@ -71,6 +83,7 @@ impl From> for HandlersConfig { Self { accounts_query_topic, + addresses_query_topic, assets_query_topic, pools_query_topic, dreps_query_topic, @@ -78,6 +91,7 @@ impl From> for HandlersConfig { epochs_query_topic, spdd_query_topic, parameters_query_topic, + utxos_query_topic, external_api_timeout, offchain_token_registry_url, } diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index 745bcaa5..84639b7b 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -17,9 +17,15 @@ mod types; mod utils; use handlers::{ accounts::handle_single_account_blockfrost, + addresses::{ + handle_address_asset_utxos_blockfrost, handle_address_extended_blockfrost, + handle_address_single_blockfrost, handle_address_totals_blockfrost, + handle_address_transactions_blockfrost, handle_address_utxos_blockfrost, + }, assets::{ handle_asset_addresses_blockfrost, handle_asset_history_blockfrost, - handle_asset_transactions_blockfrost, handle_assets_list_blockfrost, + handle_asset_single_blockfrost, handle_asset_transactions_blockfrost, + handle_assets_list_blockfrost, handle_policy_assets_blockfrost, }, epochs::{ handle_epoch_info_blockfrost, handle_epoch_next_blockfrost, handle_epoch_params_blockfrost, @@ -44,10 +50,7 @@ use handlers::{ }, }; -use crate::{ - handlers::assets::{handle_asset_single_blockfrost, handle_policy_assets_blockfrost}, - handlers_config::HandlersConfig, -}; +use crate::handlers_config::HandlersConfig; // Accounts topics const DEFAULT_HANDLE_SINGLE_ACCOUNT_TOPIC: (&str, &str) = @@ -153,10 +156,8 @@ const DEFAULT_HANDLE_EPOCH_POOL_BLOCKS_TOPIC: (&str, &str) = ( // Assets topics const DEFAULT_HANDLE_ASSETS_LIST_TOPIC: (&str, &str) = ("handle-topic-assets-list", "rest.get.assets"); -const DEFAULT_HANDLE_ASSET_SINGLE_TOPIC: (&str, &str) = ( - "handle-topic-policy-assets-asset-single", - "rest.get.assets.*", -); +const DEFAULT_HANDLE_ASSET_SINGLE_TOPIC: (&str, &str) = + ("handle-topic-asset-single", "rest.get.assets.*"); const DEFAULT_HANDLE_ASSET_HISTORY_TOPIC: (&str, &str) = ("handle-topic-asset-history", "rest.get.assets.*.history"); const DEFAULT_HANDLE_ASSET_TRANSACTIONS_TOPIC: (&str, &str) = ( @@ -170,6 +171,27 @@ const DEFAULT_HANDLE_ASSET_ADDRESSES_TOPIC: (&str, &str) = ( const DEFAULT_HANDLE_POLICY_ASSETS_TOPIC: (&str, &str) = ("handle-topic-policy-assets", "rest.get.assets.policy.*"); +// Addresses topics +const DEFAULT_HANDLE_ADDRESS_SINGLE_TOPIC: (&str, &str) = + ("handle-topic-address-single", "rest.get.addresses.*"); + +const DEFAULT_HANDLE_ADDRESS_EXTENDED_TOPIC: (&str, &str) = ( + "handle-topic-address-extended", + "rest.get.addresses.*.extended", +); +const DEFAULT_HANDLE_ADDRESS_TOTALS_TOPIC: (&str, &str) = + ("handle-topic-address-totals", "rest.get.addresses.*.total"); +const DEFAULT_HANDLE_ADDRESS_UTXOS_TOPIC: (&str, &str) = + ("handle-topic-address-utxos", "rest.get.addresses.*.utxos"); +const DEFAULT_HANDLE_ADDRESS_ASSET_UTXOS_TOPIC: (&str, &str) = ( + "handle-topic-address-asset-utxos", + "rest.get.addresses.*.utxos.*", +); +const DEFAULT_HANDLE_ADDRESS_TRANSACTIONS_TOPIC: (&str, &str) = ( + "handle-topic-address-transactions", + "rest.get.addresses.*.transactions", +); + #[module( message_type(Message), name = "rest-blockfrost", @@ -473,6 +495,54 @@ impl BlockfrostREST { handle_policy_assets_blockfrost, ); + // Handler for /addresses/{address} + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_SINGLE_TOPIC, + handlers_config.clone(), + handle_address_single_blockfrost, + ); + + // Handler for /addresses/{address}/extended + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_EXTENDED_TOPIC, + handlers_config.clone(), + handle_address_extended_blockfrost, + ); + + // Handler for /addresses/{address}/total + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_TOTALS_TOPIC, + handlers_config.clone(), + handle_address_totals_blockfrost, + ); + + // Handler for /addresses/{address}/utxos + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_UTXOS_TOPIC, + handlers_config.clone(), + handle_address_utxos_blockfrost, + ); + + // Handler for /addresses/{address}/utxos/{asset} + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_ASSET_UTXOS_TOPIC, + handlers_config.clone(), + handle_address_asset_utxos_blockfrost, + ); + + // Handler for /addresses/{address}/transactions + register_handler( + context.clone(), + DEFAULT_HANDLE_ADDRESS_TRANSACTIONS_TOPIC, + handlers_config.clone(), + handle_address_transactions_blockfrost, + ); + Ok(()) } } diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 6730eaf7..542a5026 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -784,3 +784,48 @@ impl TryFrom<&AssetAddressEntry> for AssetAddressRest { }) } } + +#[derive(Serialize)] +pub struct AddressInfoREST { + pub address: String, + pub amount: AmountList, + pub stake_address: Option, + #[serde(rename = "type")] + pub address_type: String, + pub script: bool, +} + +#[derive(Serialize)] +pub struct AmountEntry { + unit: String, + quantity: String, +} + +#[derive(Serialize)] +pub struct AmountList(pub Vec); + +impl From for AmountList { + fn from(value: acropolis_common::Value) -> Self { + let mut out = Vec::new(); + + out.push(AmountEntry { + unit: "lovelace".to_string(), + quantity: value.coin().to_string(), + }); + + for (policy_id, assets) in value.assets { + for asset in assets { + out.push(AmountEntry { + unit: format!( + "{}{}", + hex::encode(&policy_id), + hex::encode(&asset.name.as_slice()) + ), + quantity: asset.amount.to_string(), + }); + } + } + + Self(out) + } +} diff --git a/modules/stake_delta_filter/src/utils.rs b/modules/stake_delta_filter/src/utils.rs index 334df1cd..8ad6ae24 100644 --- a/modules/stake_delta_filter/src/utils.rs +++ b/modules/stake_delta_filter/src/utils.rs @@ -277,13 +277,13 @@ impl Tracker { .map(|a| a.to_string()) .unwrap_or(Ok("(none)".to_owned())) .unwrap_or("(???)".to_owned()); - delta += event.address_delta.delta.lovelace; + delta += event.address_delta.value.lovelace; chunk.push(format!( " blk {}, {}: {} ({:?}) => {} ({:?})", event.block.number, src_addr, - event.address_delta.delta.lovelace, + event.address_delta.value.lovelace, event.address_delta.address, dst_addr, event.stake_address @@ -388,7 +388,7 @@ pub fn process_message( let stake_delta = StakeAddressDelta { address: stake_address, - delta: d.delta.lovelace, + delta: d.value.lovelace, }; result.deltas.push(stake_delta); } @@ -402,7 +402,7 @@ mod test { use acropolis_common::{ messages::AddressDeltasMessage, Address, AddressDelta, BlockHash, BlockInfo, BlockStatus, ByronAddress, Era, ShelleyAddress, ShelleyAddressDelegationPart, ShelleyAddressPaymentPart, - ShelleyAddressPointer, StakeAddress, StakeAddressPayload, ValueDelta, + ShelleyAddressPointer, StakeAddress, StakeAddressPayload, UTxOIdentifier, ValueDelta, }; use bech32::{Bech32, Hrp}; @@ -410,7 +410,8 @@ mod test { let a = pallas::ledger::addresses::Address::from_bech32(s)?; Ok(AddressDelta { address: map_address(&a)?, - delta: ValueDelta::new(1, Vec::new()), + utxo: UTxOIdentifier::new(0, 0, 0), + value: ValueDelta::new(1, Vec::new()), }) } diff --git a/modules/utxo_state/src/address_delta_publisher.rs b/modules/utxo_state/src/address_delta_publisher.rs index 68a33a1b..29a2b222 100644 --- a/modules/utxo_state/src/address_delta_publisher.rs +++ b/modules/utxo_state/src/address_delta_publisher.rs @@ -1,7 +1,7 @@ //! Address delta publisher for the UTXO state Acropolis module use acropolis_common::{ messages::{AddressDeltasMessage, CardanoMessage, Message}, - Address, AddressDelta, BlockInfo, ValueDelta, + AddressDelta, BlockInfo, }; use async_trait::async_trait; use caryatid_sdk::Context; @@ -44,12 +44,9 @@ impl AddressDeltaObserver for AddressDeltaPublisher { } /// Observe an address delta and publish messages - async fn observe_delta(&self, address: &Address, delta: ValueDelta) { + async fn observe_delta(&self, delta: &AddressDelta) { // Accumulate the delta - self.deltas.lock().await.push(AddressDelta { - address: address.clone(), - delta, - }); + self.deltas.lock().await.push(delta.clone()); } async fn finalise_block(&self, block: &BlockInfo) { diff --git a/modules/utxo_state/src/state.rs b/modules/utxo_state/src/state.rs index b3e03d55..611d43cd 100644 --- a/modules/utxo_state/src/state.rs +++ b/modules/utxo_state/src/state.rs @@ -4,7 +4,7 @@ use acropolis_common::{ messages::UTXODeltasMessage, params::SECURITY_PARAMETER_K, Address, BlockInfo, BlockStatus, Datum, TxInput, TxOutput, UTXODelta, }; -use acropolis_common::{UTxOIdentifier, Value, ValueDelta}; +use acropolis_common::{AddressDelta, UTxOIdentifier, Value, ValueDelta}; use anyhow::Result; use async_trait::async_trait; use std::collections::HashMap; @@ -33,7 +33,7 @@ pub trait AddressDeltaObserver: Send + Sync { async fn start_block(&self, block: &BlockInfo); /// Observe a delta - async fn observe_delta(&self, address: &Address, delta: ValueDelta); + async fn observe_delta(&self, address: &AddressDelta); /// Finalise a block async fn finalise_block(&self, block: &BlockInfo); @@ -94,6 +94,25 @@ impl State { } } + /// Get the total value of multiple utxos + pub async fn get_utxos_sum(&self, utxo_identifiers: &Vec) -> Result { + let mut balance = Value::new(0, Vec::new()); + for identifier in utxo_identifiers { + match self.lookup_utxo(&identifier).await { + Ok(Some(utxo)) => balance += &utxo.value, + Ok(None) => return Err(anyhow::anyhow!("UTxO {} does not exist", identifier)), + Err(e) => { + return Err(anyhow::anyhow!( + "Failed to look up UTxO {}: {}", + identifier, + e + )); + } + } + } + Ok(balance) + } + /// Register the delta observer pub fn register_address_delta_observer(&mut self, observer: Arc) { self.address_delta_observer = Some(observer); @@ -131,7 +150,11 @@ impl State { // Tell the observer to debit it if let Some(observer) = self.address_delta_observer.as_ref() { observer - .observe_delta(&utxo.address, -ValueDelta::from(&utxo.value)) + .observe_delta(&AddressDelta { + address: utxo.address.clone(), + utxo: key.clone(), + value: -ValueDelta::from(&utxo.value), + }) .await; } } @@ -145,7 +168,11 @@ impl State { // Tell the observer to recredit it if let Some(observer) = self.address_delta_observer.as_ref() { observer - .observe_delta(&utxo.address, ValueDelta::from(&utxo.value)) + .observe_delta(&AddressDelta { + address: utxo.address.clone(), + utxo: key.clone(), + value: ValueDelta::from(&utxo.value), + }) .await; } } @@ -194,8 +221,13 @@ impl State { } // Tell the observer it's spent - if let Some(observer) = self.address_delta_observer.as_ref() { - observer.observe_delta(&utxo.address, -ValueDelta::from(&utxo.value)).await; + if let Some(obs) = &self.address_delta_observer { + obs.observe_delta(&AddressDelta { + address: utxo.address.clone(), + utxo: key.clone(), + value: -ValueDelta::from(&utxo.value), + }) + .await; } match block.status { @@ -264,8 +296,13 @@ impl State { }; // Tell the observer - if let Some(observer) = self.address_delta_observer.as_ref() { - observer.observe_delta(&output.address, ValueDelta::from(&output.value)).await; + if let Some(obs) = &self.address_delta_observer { + obs.observe_delta(&AddressDelta { + address: output.address.clone(), + utxo: output.utxo_identifier.clone(), + value: ValueDelta::from(&output.value), + }) + .await; } Ok(()) @@ -738,18 +775,18 @@ mod tests { #[async_trait] impl AddressDeltaObserver for TestDeltaObserver { async fn start_block(&self, _block: &BlockInfo) {} - async fn observe_delta(&self, address: &Address, delta: ValueDelta) { + async fn observe_delta(&self, delta: &AddressDelta) { assert!(matches!( - &address, + &delta.address, Address::Byron(ByronAddress { payload }) if payload[0] == 99 )); - assert!(delta.lovelace == 42 || delta.lovelace == -42); + assert!(delta.value.lovelace == 42 || delta.value.lovelace == -42); let mut balance = self.balance.lock().await; - *balance += delta.lovelace; + *balance += delta.value.lovelace; let mut asset_balances = self.asset_balances.lock().await; - for (policy, assets) in &delta.assets { + for (policy, assets) in &delta.value.assets { assert_eq!([1u8; 28], *policy); for asset in assets { assert!( diff --git a/modules/utxo_state/src/utxo_state.rs b/modules/utxo_state/src/utxo_state.rs index 2c76a7c3..842f2458 100644 --- a/modules/utxo_state/src/utxo_state.rs +++ b/modules/utxo_state/src/utxo_state.rs @@ -1,7 +1,10 @@ //! Acropolis UTXO state module for Caryatid //! Accepts UTXO events and derives the current ledger state in memory -use acropolis_common::messages::{CardanoMessage, Message}; +use acropolis_common::{ + messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, + queries::utxos::{UTxOStateQuery, UTxOStateQueryResponse, DEFAULT_UTXOS_QUERY_TOPIC}, +}; use caryatid_sdk::{module, Context, Module}; use anyhow::{anyhow, Result}; @@ -32,8 +35,6 @@ mod fake_immutable_utxo_store; use fake_immutable_utxo_store::FakeImmutableUTXOStore; const DEFAULT_SUBSCRIBE_TOPIC: &str = "cardano.utxo.deltas"; -const DEFAULT_SINGLE_UTXO_TOPIC: (&str, &str) = ("handle-topic-single-utxo", "rest.get.utxos.*"); - const DEFAULT_STORE: &str = "memory"; /// UTXO state module @@ -52,10 +53,9 @@ impl UTXOState { config.get_string("subscribe-topic").unwrap_or(DEFAULT_SUBSCRIBE_TOPIC.to_string()); info!("Creating subscriber on '{subscribe_topic}'"); - let single_utxo_topic = config - .get_string(DEFAULT_SINGLE_UTXO_TOPIC.0) - .unwrap_or(DEFAULT_SINGLE_UTXO_TOPIC.1.to_string()); - info!("Creating REST handler on '{single_utxo_topic}'"); + let utxos_query_topic = config + .get_string(DEFAULT_UTXOS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_UTXOS_QUERY_TOPIC.1.to_string()); // Create store let store_type = config.get_string("store").unwrap_or(DEFAULT_STORE.to_string()); @@ -105,6 +105,32 @@ impl UTXOState { } }); + // Query handler + let state_query = state.clone(); + context.handle(&utxos_query_topic, move |message| { + let state_mutex = state_query.clone(); + async move { + let Message::StateQuery(StateQuery::UTxOs(query)) = message.as_ref() else { + return Arc::new(Message::StateQueryResponse(StateQueryResponse::UTxOs( + UTxOStateQueryResponse::Error("Invalid message for utxo-state".into()), + ))); + }; + + let state = state_mutex.lock().await; + let response = match query { + UTxOStateQuery::GetUTxOsSum { utxo_identifiers } => { + match state.get_utxos_sum(utxo_identifiers).await { + Ok(balance) => UTxOStateQueryResponse::UTxOsSum(balance), + Err(e) => UTxOStateQueryResponse::Error(e.to_string()), + } + } + }; + Arc::new(Message::StateQueryResponse(StateQueryResponse::UTxOs( + response, + ))) + } + }); + // Ticker to log stats and prune state let state2 = state.clone(); let mut subscription = context.subscribe("clock.tick").await?; diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 52a81cf6..16a6f940 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -26,6 +26,7 @@ acropolis_module_rest_blockfrost = { path = "../../modules/rest_blockfrost" } acropolis_module_spdd_state = { path = "../../modules/spdd_state" } acropolis_module_drdd_state = { path = "../../modules/drdd_state" } acropolis_module_assets_state = { path = "../../modules/assets_state" } +acropolis_module_address_state = { path = "../../modules/address_state" } caryatid_process = { workspace = true } caryatid_sdk = { workspace = true } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 4fa7998e..36780058 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -102,6 +102,15 @@ store-addresses = false # Enables /assets/{asset} endpoint (requires store-assets to be enabled) index-by-policy = false +[module.address-state] +# Enables /addresses/{address}, /addresses/{address}/extended, +# /addresses/{address}/utxos/{asset}, and /addresses/{address}/utxos endpoints +store-info = false +# Enables /addresses/{address}/totals endpoint +store-totals = false +# Enables /addresses/{address}/transactions endpoint +store-transactions = false + [module.clock] [module.rest-server] diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index ecbebffe..e4b856ee 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -10,6 +10,7 @@ use tracing_subscriber; // External modules use acropolis_module_accounts_state::AccountsState; +use acropolis_module_address_state::AddressState; use acropolis_module_assets_state::AssetsState; use acropolis_module_block_unpacker::BlockUnpacker; use acropolis_module_drdd_state::DRDDState; @@ -99,6 +100,7 @@ pub async fn main() -> Result<()> { StakeDeltaFilter::register(&mut process); EpochsState::register(&mut process); AccountsState::register(&mut process); + AddressState::register(&mut process); AssetsState::register(&mut process); BlockfrostREST::register(&mut process); SPDDState::register(&mut process);