diff --git a/Cargo.lock b/Cargo.lock index 13a4412796c..d8d2558e855 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom 0.2.15", "once_cell", "version_check", "zerocopy 0.7.35", @@ -681,6 +682,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -1937,6 +1947,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -2162,7 +2181,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "18e4fdb82bd54a12e42fb58a800dcae6b9e13982238ce2296dc3570b92148e1f" dependencies = [ "data-encoding", - "syn 1.0.109", + "syn 2.0.100", ] [[package]] @@ -2454,6 +2473,12 @@ dependencies = [ "validator_store", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -2586,6 +2611,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "ekzg-bls12-381" @@ -2821,6 +2849,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + [[package]] name = "eth-keystore" version = "0.5.0" @@ -3661,6 +3700,17 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot 0.12.3", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -4084,6 +4134,9 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +dependencies = [ + "unicode-segmentation", +] [[package]] name = "heck" @@ -5534,6 +5587,7 @@ checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ "bitflags 2.9.0", "libc", + "redox_syscall 0.5.10", ] [[package]] @@ -5894,6 +5948,16 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest 0.10.7", +] + [[package]] name = "mdbx-sys" version = "0.11.6-4" @@ -6561,9 +6625,9 @@ dependencies = [ [[package]] name = "once_cell" -version = "1.21.0" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" dependencies = [ "critical-section", "portable-atomic", @@ -8920,6 +8984,141 @@ dependencies = [ "der 0.7.9", ] +[[package]] +name = "sqlformat" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" +dependencies = [ + "nom", + "unicode_categories", +] + +[[package]] +name = "sqlx" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9a2ccff1a000a5a59cd33da541d9f2fdcd9e6e8229cc200565942bff36d0aaa" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24ba59a9342a3d9bab6c56c118be528b27c9b60e490080e9711a04dccac83ef6" +dependencies = [ + "ahash", + "atoi", + "byteorder", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener 2.5.3", + "futures-channel", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashlink 0.8.4", + "hex", + "indexmap 2.8.0", + "log", + "memchr", + "once_cell", + "paste", + "percent-encoding", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlformat", + "thiserror 1.0.69", + "tokio", + "tokio-stream", + "tracing", + "url", +] + +[[package]] +name = "sqlx-macros" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ea40e2345eb2faa9e1e5e326db8c34711317d2b5e08d0d5741619048a803127" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn 1.0.109", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5833ef53aaa16d860e92123292f1f6a3d53c34ba8b1969f152ef1a7bb803f3c8" +dependencies = [ + "dotenvy", + "either", + "heck 0.4.1", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2 0.10.8", + "sqlx-core", + "sqlx-postgres", + "syn 1.0.109", + "tempfile", + "tokio", + "url", +] + +[[package]] +name = "sqlx-postgres" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c824eb80b894f926f89a0b9da0c7f435d27cdd35b8c655b114e58223918577e" +dependencies = [ + "atoi", + "base64 0.21.7", + "bitflags 2.9.0", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "hex", + "hkdf", + "hmac 0.12.1", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.5", + "serde", + "serde_json", + "sha2 0.10.8", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 1.0.69", + "tracing", + "whoami", +] + [[package]] name = "ssz_types" version = "0.11.0" @@ -8999,22 +9198,26 @@ dependencies = [ "directory", "ethereum_ssz", "ethereum_ssz_derive", + "heck 0.4.1", "itertools 0.10.5", "leveldb", "logging", "lru", "metrics", + "once_cell", "parking_lot 0.12.3", "rand 0.9.0", "redb", "safe_arith", "serde", "smallvec", + "sqlx", "ssz_types", "state_processing", "strum", "superstruct", "tempfile", + "tokio", "tracing", "tracing-subscriber", "types", @@ -9022,6 +9225,17 @@ dependencies = [ "zstd 0.13.3", ] +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.10.0" @@ -9977,6 +10191,12 @@ version = "2.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.18" @@ -9992,6 +10212,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" + [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -10004,6 +10230,12 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "unicode_categories" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" + [[package]] name = "universal-hash" version = "0.5.1" @@ -10405,6 +10637,12 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.100" @@ -10572,6 +10810,16 @@ dependencies = [ "rustix 0.38.44", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", +] + [[package]] name = "widestring" version = "0.4.3" diff --git a/Makefile b/Makefile index b9f93942f6f..5cfc803391a 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ BUILD_PATH_RISCV64 = "target/$(RISCV64_TAG)/release" PINNED_NIGHTLY ?= nightly # List of features to use when cross-compiling. Can be overridden via the environment. -CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,beacon-node-leveldb,beacon-node-redb +CROSS_FEATURES ?= gnosis,slasher-lmdb,slasher-mdbx,slasher-redb,beacon-node-leveldb,beacon-node-redb,beacon-node-postgres # Cargo profile for Cross builds. Default is for local builds, CI uses an override. CROSS_PROFILE ?= release @@ -264,7 +264,7 @@ lint-fix: # Also run the lints on the optimized-only tests lint-full: - TEST_FEATURES="beacon-node-leveldb,beacon-node-redb,${TEST_FEATURES}" RUSTFLAGS="-C debug-assertions=no $(RUSTFLAGS)" $(MAKE) lint + TEST_FEATURES="beacon-node-leveldb,beacon-node-redb,beacon-node-postgres,${TEST_FEATURES}" RUSTFLAGS="-C debug-assertions=no $(RUSTFLAGS)" $(MAKE) lint # Runs the makefile in the `ef_tests` repo. # diff --git a/beacon_node/store/Cargo.toml b/beacon_node/store/Cargo.toml index 61a8474a731..4963d03dd80 100644 --- a/beacon_node/store/Cargo.toml +++ b/beacon_node/store/Cargo.toml @@ -5,9 +5,10 @@ authors = ["Paul Hauner "] edition = { workspace = true } [features] -default = ["leveldb"] +default = ["postgres"] leveldb = ["dep:leveldb"] redb = ["dep:redb"] +postgres = ["dep:sqlx", "dep:tokio"] [dependencies] bls = { workspace = true } @@ -15,13 +16,17 @@ db-key = "0.0.5" directory = { workspace = true } ethereum_ssz = { workspace = true } ethereum_ssz_derive = { workspace = true } +heck = "0.4" itertools = { workspace = true } leveldb = { version = "0.8.6", optional = true, default-features = false } logging = { workspace = true } lru = { workspace = true } metrics = { workspace = true } +once_cell = "1.21.3" parking_lot = { workspace = true } redb = { version = "2.1.3", optional = true } +tokio = { workspace = true, optional = true } +sqlx = { version = "0.7", optional = true, default-features = false, features = ["runtime-tokio", "postgres"] } safe_arith = { workspace = true } serde = { workspace = true } smallvec = { workspace = true } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index c0f15f2417b..3d01ab76106 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -11,10 +11,13 @@ use types::EthSpec; use types::non_zero_usize::new_non_zero_usize; use zstd::{Decoder, Encoder}; -#[cfg(all(feature = "redb", not(feature = "leveldb")))] +#[cfg(all(feature = "redb", not(feature = "leveldb"), not(feature = "postgres")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Redb; -#[cfg(feature = "leveldb")] + +#[cfg(all(feature = "leveldb", not(feature = "postgres"), not(feature = "redb")))] pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::LevelDb; +#[cfg(feature = "postgres")] +pub const DEFAULT_BACKEND: DatabaseBackend = DatabaseBackend::Postgres; pub const PREV_DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 2048; pub const DEFAULT_SLOTS_PER_RESTORE_POINT: u64 = 8192; @@ -275,4 +278,6 @@ pub enum DatabaseBackend { LevelDb, #[cfg(feature = "redb")] Redb, + #[cfg(feature = "postgres")] + Postgres, } diff --git a/beacon_node/store/src/database.rs b/beacon_node/store/src/database.rs index 2232f73c5cc..244ab6fb4a3 100644 --- a/beacon_node/store/src/database.rs +++ b/beacon_node/store/src/database.rs @@ -1,5 +1,7 @@ pub mod interface; #[cfg(feature = "leveldb")] pub mod leveldb_impl; +#[cfg(feature = "postgres")] +pub mod postgres_impl; #[cfg(feature = "redb")] pub mod redb_impl; diff --git a/beacon_node/store/src/database/interface.rs b/beacon_node/store/src/database/interface.rs index 5646f1179c8..2690214d911 100644 --- a/beacon_node/store/src/database/interface.rs +++ b/beacon_node/store/src/database/interface.rs @@ -1,5 +1,7 @@ #[cfg(feature = "leveldb")] use crate::database::leveldb_impl; +#[cfg(feature = "postgres")] +use crate::database::postgres_impl::{self, PostgresDB}; #[cfg(feature = "redb")] use crate::database::redb_impl; use crate::{ColumnIter, ColumnKeyIter, DBColumn, Error, ItemStore, Key, KeyValueStore, metrics}; @@ -13,6 +15,8 @@ pub enum BeaconNodeBackend { LevelDb(leveldb_impl::LevelDB), #[cfg(feature = "redb")] Redb(redb_impl::Redb), + #[cfg(feature = "postgres")] + Postgres(PostgresDB), } impl ItemStore for BeaconNodeBackend {} @@ -24,6 +28,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::get_bytes(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::get_bytes(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::get_bytes(txn, column, key) + } } } @@ -45,6 +53,10 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::put_bytes(txn, column, key, value) + } } } @@ -66,6 +78,10 @@ impl KeyValueStore for BeaconNodeBackend { value, txn.write_options_sync(), ), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::put_bytes(txn, column, key, value) + } } } @@ -75,6 +91,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::sync(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::sync(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(_) => Ok(()), } } @@ -84,6 +102,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_exists(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_exists(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::key_exists(txn, column, key) + } } } @@ -93,6 +115,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::key_delete(txn, column, key), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::key_delete(txn, column, key), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::key_delete(txn, column, key) + } } } @@ -102,6 +128,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::do_atomically(txn, batch), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::do_atomically(txn, batch), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::do_atomically(txn, batch) + } } } @@ -111,6 +141,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact(txn), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::compact(txn), } } @@ -128,6 +160,10 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::Redb(txn) => { redb_impl::Redb::iter_column_keys_from(txn, _column, from) } + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => { + postgres_impl::PostgresDB::iter_column_keys_from(txn, _column, from) + } } } @@ -137,6 +173,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::iter_column_keys(txn, column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_keys(txn, column), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::iter_column_keys(txn, column) } } @@ -148,6 +186,8 @@ impl KeyValueStore for BeaconNodeBackend { } #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::iter_column_from(txn, column, from), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::iter_column_from(txn, column, from) } } @@ -157,6 +197,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::compact_column(txn, _column), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::compact(txn), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::compact_column(txn, _column) } } @@ -166,6 +208,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_batch(txn, col, ops), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_batch(txn, col, ops), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::delete_batch(txn, col, ops) } } @@ -179,6 +223,8 @@ impl KeyValueStore for BeaconNodeBackend { BeaconNodeBackend::LevelDb(txn) => leveldb_impl::LevelDB::delete_if(txn, column, f), #[cfg(feature = "redb")] BeaconNodeBackend::Redb(txn) => redb_impl::Redb::delete_if(txn, column, f), + #[cfg(feature = "postgres")] + BeaconNodeBackend::Postgres(txn) => postgres_impl::PostgresDB::delete_if(txn, column, f) } } } @@ -193,6 +239,10 @@ impl BeaconNodeBackend { } #[cfg(feature = "redb")] DatabaseBackend::Redb => redb_impl::Redb::open(path).map(BeaconNodeBackend::Redb), + #[cfg(feature = "postgres")] + DatabaseBackend::Postgres => { + postgres_impl::PostgresDB::open(path).map(BeaconNodeBackend::Postgres) + } } } } diff --git a/beacon_node/store/src/database/postgres_impl.rs b/beacon_node/store/src/database/postgres_impl.rs new file mode 100644 index 00000000000..cfc80ca60c4 --- /dev/null +++ b/beacon_node/store/src/database/postgres_impl.rs @@ -0,0 +1,371 @@ +use crate::{ColumnIter, ColumnKeyIter, DBColumn, Error, Key, KeyValueStoreOp}; +use heck::ToSnakeCase; +use once_cell::sync::Lazy; +use sqlx::postgres::PgPoolOptions; +use sqlx::{PgPool, Row}; +use std::collections::HashSet; +use std::future::Future; +use std::iter::once; +use std::marker::PhantomData; +use std::path::Path; +use std::time::Duration; +use strum::IntoEnumIterator; +use tokio::runtime::{Handle, Runtime}; +use types::EthSpec; + +static GLOBAL_RT: Lazy = + Lazy::new(|| Runtime::new().expect("Failed to create global tokio runtime for PostgresDB")); + +pub struct PostgresDB { + pool: PgPool, + _phantom: PhantomData, +} + +impl PostgresDB { + pub fn open(_path: &Path) -> Result { + let url = "postgres://postgres:admin@localhost:5432/store"; + + let pool = block_on_in_runtime(async { + PgPoolOptions::new() + .max_connections(50) + .acquire_timeout(Duration::from_secs(30)) + .connect(url) + .await + .map_err(|e| Error::DBError { + message: format!("Failed to connect to Postgres: {:?}", e), + }) + })??; + + Self::create_tables(&pool)?; + Ok(Self { + pool, + _phantom: PhantomData, + }) + } + + fn create_tables(pool: &PgPool) -> Result<(), Error> { + block_on_in_runtime(async { + for column in DBColumn::iter() { + let table = table_name_for_column(column); + let q = format!( + "CREATE TABLE IF NOT EXISTS {} (key BYTEA PRIMARY KEY, value BYTEA NOT NULL)", + table + ); + sqlx::query(&q).execute(pool).await?; + } + Ok::<_, sqlx::Error>(()) + })? + .map_err(|e| Error::DBError { + message: e.to_string(), + }) + } + + pub fn get_bytes(&self, column: DBColumn, key: &[u8]) -> Result>, Error> { + let table = table_name_for_column(column); + let query = format!("SELECT value FROM {} WHERE key = $1", table); + + let row = block_on_in_runtime(async { + sqlx::query(&query) + .bind(key) + .fetch_optional(&self.pool) + .await + })? + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + + Ok(row.map(|r| r.get::, _>("value"))) + } + + pub fn put_bytes(&self, column: DBColumn, key: &[u8], value: &[u8]) -> Result<(), Error> { + let table = table_name_for_column(column); + let query = format!( + "INSERT INTO {} (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", + table + ); + + block_on_in_runtime(async { + sqlx::query(&query) + .bind(key) + .bind(value) + .execute(&self.pool) + .await + })? + .map(|_| ()) + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + }) + } + + pub fn key_exists(&self, column: DBColumn, key: &[u8]) -> Result { + let table = table_name_for_column(column); + let query = format!("SELECT EXISTS(SELECT 1 FROM {} WHERE key = $1)", table); + + block_on_in_runtime(async { + sqlx::query_scalar::<_, bool>(&query) + .bind(key) + .fetch_one(&self.pool) + .await + })? + .map_err(|e| Error::DBError { + message: e.to_string(), + }) + } + + pub fn key_delete(&self, column: DBColumn, key: &[u8]) -> Result<(), Error> { + let table = table_name_for_column(column); + let query = format!("DELETE FROM {} WHERE key = $1", table); + + block_on_in_runtime(async { sqlx::query(&query).bind(key).execute(&self.pool).await }) + .map(|_| ()) + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + }) + } + + pub fn do_atomically(&self, ops: Vec) -> Result<(), Error> { + block_on_in_runtime(async { + let mut tx = self.pool.begin().await.map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + + for op in ops { + match op { + KeyValueStoreOp::PutKeyValue(col, key, value) => { + let table = table_name_for_column(col); + let q = format!( + "INSERT INTO {} (key, value) VALUES ($1, $2) + ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value", + table + ); + sqlx::query(&q) + .bind(&key) + .bind(&value) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + } + KeyValueStoreOp::DeleteKey(col, key) => { + let table = table_name_for_column(col); + let q = format!("DELETE FROM {} WHERE key = $1", table); + sqlx::query(&q) + .bind(&key) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + } + } + } + + tx.commit().await.map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + + Ok::<(), Error>(()) + })? + } + + pub fn iter_column_from( + &self, + column: DBColumn, + from: &[u8] + ) -> ColumnIter<'_, K> { + let table = table_name_for_column(column); + let query = format!("SELECT Key, value FROM {} WHERE >= $1 ORDER BY key ASC", table); + + let row_results: Result, Vec)>, Error> = block_on_in_runtime(async { + let rows = sqlx::query(&query) + .bind(from) + .fetch_all(&self.pool) + .await + .map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + + Ok(rows + .into_iter() + .map(|row| { + let k: Vec = row.get("key"); + let v: Vec = row.get("value"); + Ok((k, v)) + }) + .collect::, Error>>()?) + }).and_then(|r| r); + + match row_results { + Ok(rows) => { + let iter = rows.into_iter().map(|(k_bytes, v)| { + let k = K::from_bytes(&k_bytes)?; + Ok((k, v)) + }); + Box::new(iter) + } + Err(e) => Box::new(once(Err(e))) + } + } + + pub fn iter_column_keys_from( + &self, + column: DBColumn, + from: &[u8], + ) -> ColumnKeyIter<'_, K> { + let table = table_name_for_column(column); + let query = format!("SELECT key FROM {} WHERE key >= $1 ORDER BY key ASC", table); + + let rows_result: Result>, Error> = block_on_in_runtime(async { + let rows = sqlx::query(&query) + .bind(from) + .fetch_all(&self.pool) + .await + .map_err(|e| Error::DBError { message: format!("{:?}", e) })?; + + Ok(rows + .into_iter() + .map(|row| { + let k: Vec = row.get("key"); + Ok(k) + }) + .collect::, Error>>()?) + }).and_then(|r| r); + + match rows_result { + Ok(keys) => { + let iter = keys.into_iter().map(|k_bytes| { + let k = K::from_bytes(&k_bytes)?; + Ok(k) + + }); + Box::new(iter) + } + Err(e) => Box::new(once(Err(e))) + } + } + + pub fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter<'_, K> { + let table = table_name_for_column(column); + let query = format!("SELECT key FROM {} ORDER BY key ASC", table); + + let rows_result: Result>, Error> = block_on_in_runtime(async { + let rows = sqlx::query(&query) + .fetch_all(&self.pool) + .await + .map_err(|e| Error::DBError { message: format!("{:?}", e) })?; + + Ok(rows + .into_iter() + .map(|row| { + let k: Vec = row.get("key"); + Ok(k) + }) + .collect::, Error>>()?) + }).and_then(|r| r); + + match rows_result { + Ok(keys) => { + let iter = keys.into_iter().map(|k_bytes| { + let k = K::from_bytes(&k_bytes)?; + Ok(k) + }); + Box::new(iter) + } + Err(e) => Box::new(once(Err(e))) + } + } + + pub fn compact(&self) -> Result<(), Error> { + block_on_in_runtime(async { + for column in DBColumn::iter() { + let table = table_name_for_column(column); + let query = format!("VACUUM ANALYZE {}", table); + + sqlx::query(&query) + .execute(&self.pool) + .await + .map_err(|e| Error::DBError { + message: format!("Failed to vacuum {}: {:?}", table, e), + })?; + } + + Ok(()) + })? + } + + pub fn compact_column(&self, column: DBColumn) -> Result<(), Error> { + let table = table_name_for_column(column); + let query = format!("VACUUM ANALYZE {}", table); + + block_on_in_runtime(async { + sqlx::query(&query) + .execute(&self.pool) + .await + .map_err(|e| Error::DBError { + message: format!("Failed to vacuum {}: {:?}", table, e), + })?; + Ok(()) + })? + } + + pub fn delete_batch(&self, column: DBColumn, ops: HashSet<&[u8]>) -> Result<(), Error> { + block_on_in_runtime(async { + let mut tx = self.pool.begin().await.map_err(|e| Error::DBError { + message: format!("{:?}", e), + })?; + + let table = table_name_for_column(column); + let q = format!("DELETE FROM {} WHERE key = $1", table); + + for key in ops { + sqlx::query(&q) + .bind(key) + .execute(&mut *tx) + .await + .map_err(|e| Error::DBError { message: format!("{:?}", e) })?; + } + + tx.commit().await.map_err(|e| Error::DBError { message: format!("{:?}", e) }) + })? + } + + pub fn delete_if(&self, column: DBColumn, mut f: impl FnMut(&[u8]) -> Result) -> Result<(), Error> { + let keys: Vec> = { + let table = table_name_for_column(column); + let query = format!("SELECT key FROM {} ORDER BY key ASC", table); + + block_on_in_runtime(async { + let rows = sqlx::query(&query) + .fetch_all(&self.pool) + .await + .map_err(|e| Error::DBError { message: format!("Failed to fetch key: {:?}", e) })?; + + Ok::>, Error>( + rows.into_iter() + .map(|r| r.get("key")) + .collect::>>() + ) + })?? + }; + + for key in keys { + if f(&key)? { + self.key_delete(column, &key)?; + } + } + Ok(()) + } + +} + +fn table_name_for_column(column: DBColumn) -> String { + format!("{:?}", column).to_snake_case() +} + +fn block_on_in_runtime(fut: F) -> Result { + match Handle::try_current() { + Ok(handle) => Ok(tokio::task::block_in_place(|| handle.block_on(fut))), + Err(_) => Ok(GLOBAL_RT.block_on(fut)), + } +} diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index cb332546f94..e266840cb79 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -62,7 +62,7 @@ pub struct DatabaseManager { value_name = "DATABASE", help = "Set the database backend to be used by the beacon node.", display_order = 0, - default_value_t = store::config::DatabaseBackend::LevelDb + default_value_t = store::config::DatabaseBackend::Postgres )] pub backend: store::config::DatabaseBackend, diff --git a/lighthouse/Cargo.toml b/lighthouse/Cargo.toml index ef680c9b969..4d2f50119cf 100644 --- a/lighthouse/Cargo.toml +++ b/lighthouse/Cargo.toml @@ -12,7 +12,7 @@ rust-version = "1.88.0" normal = ["target_check"] [features] -default = ["slasher-lmdb", "beacon-node-leveldb"] +default = ["slasher-lmdb", "beacon-node-postgres"] # Writes debugging .ssz files to /tmp during block processing. write_ssz_files = ["beacon_node/write_ssz_files"] # Compiles the BLS crypto code so that the binary is portable across machines. @@ -33,6 +33,8 @@ slasher-redb = ["slasher/redb"] beacon-node-leveldb = ["store/leveldb"] # Supports beacon node redb backend. beacon-node-redb = ["store/redb"] +# Supports beacon node postgres backend +beacon-node-postgres = ["store/postgres"] # Supports console subscriber for debugging console-subscriber = ["console-subscriber/default"] # Force the use of the system memory allocator rather than jemalloc.