diff --git a/Cargo.lock b/Cargo.lock index 421ae65f53f..25cd950dc2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,21 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "addr2line" -version = "0.24.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" -dependencies = [ - "gimli", -] - -[[package]] -name = "adler2" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" - [[package]] name = "aead" version = "0.5.2" @@ -407,21 +392,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "backtrace" -version = "0.3.74" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" -dependencies = [ - "addr2line", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", - "windows-targets 0.52.6", -] - [[package]] name = "base-x" version = "0.2.11" @@ -469,9 +439,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.9.0" +version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" +checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" [[package]] name = "blake2" @@ -500,6 +470,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c132eebf10f5cad5289222520a4a058514204aed6d791f1cf4fe8088b82d15f" +dependencies = [ + "objc2", +] + [[package]] name = "browser-webrtc-example" version = "0.1.0" @@ -735,6 +714,26 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f46ad14479a25103f283c0f10005961cf086d8dc42205bb44c46ac563475dca6" +[[package]] +name = "cmake" +version = "0.1.54" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7caa3f9de89ddbe2c607f4101924c5abec803763ae9534e4f4d7d8f84aa81f0" +dependencies = [ + "cc", +] + +[[package]] +name = "codespan-reporting" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af491d569909a7e4dee0ad7db7f5341fef5c614d5b8ec8cf765732aba3cff681" +dependencies = [ + "serde", + "termcolor", + "unicode-width", +] + [[package]] name = "colorchoice" version = "1.0.3" @@ -983,6 +982,68 @@ dependencies = [ "syn", ] +[[package]] +name = "cxx" +version = "1.0.187" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8465678d499296e2cbf9d3acf14307458fd69b471a31b65b3c519efe8b5e187" +dependencies = [ + "cc", + "cxx-build", + "cxxbridge-cmd", + "cxxbridge-flags", + "cxxbridge-macro", + "foldhash 0.2.0", + "link-cplusplus", +] + +[[package]] +name = "cxx-build" +version = "1.0.187" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d74b6bcf49ebbd91f1b1875b706ea46545032a14003b5557b7dfa4bbeba6766e" +dependencies = [ + "cc", + "codespan-reporting", + "indexmap 2.9.0", + "proc-macro2", + "quote", + "scratch", + "syn", +] + +[[package]] +name = "cxxbridge-cmd" +version = "1.0.187" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94ca2ad69673c4b35585edfa379617ac364bccd0ba0adf319811ba3a74ffa48a" +dependencies = [ + "clap", + "codespan-reporting", + "indexmap 2.9.0", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "cxxbridge-flags" +version = "1.0.187" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29b52102aa395386d77d322b3a0522f2035e716171c2c60aa87cc5e9466e523" + +[[package]] +name = "cxxbridge-macro" +version = "1.0.187" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8ebf0b6138325af3ec73324cb3a48b64d57721f17291b151206782e61f66cd" +dependencies = [ + "indexmap 2.9.0", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "data-encoding" version = "2.8.0" @@ -1332,6 +1393,12 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foldhash" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1571,12 +1638,6 @@ dependencies = [ "polyval", ] -[[package]] -name = "gimli" -version = "0.31.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" - [[package]] name = "glob" version = "0.3.2" @@ -1660,7 +1721,7 @@ version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ - "foldhash", + "foldhash 0.1.5", ] [[package]] @@ -1803,7 +1864,7 @@ checksum = "a56f203cd1c76362b69e3863fd987520ac36cf70a8c92627449b2f64a8cf7d65" dependencies = [ "cfg-if", "libc", - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -2423,6 +2484,7 @@ dependencies = [ "getrandom 0.2.15", "libp2p-allow-block-list", "libp2p-autonat", + "libp2p-bluetooth", "libp2p-connection-limits", "libp2p-core", "libp2p-dcutr", @@ -2501,6 +2563,30 @@ dependencies = [ "web-time 1.1.0", ] +[[package]] +name = "libp2p-bluetooth" +version = "0.1.0" +dependencies = [ + "async-trait", + "block2", + "bytes", + "fnv", + "futures", + "libp2p-core", + "log", + "multiaddr", + "objc2", + "objc2-core-bluetooth", + "objc2-foundation", + "parking_lot", + "rand 0.8.5", + "rw-stream-sink", + "simplersble", + "tokio", + "tokio-stream", + "uuid", +] + [[package]] name = "libp2p-connection-limits" version = "0.6.0" @@ -3332,10 +3418,19 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "libc", ] +[[package]] +name = "link-cplusplus" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f78c730aaa7d0b9336a299029ea49f9ee53b0ed06e9202e8cb7db9bae7b8c82" +dependencies = [ + "cc", +] + [[package]] name = "linux-raw-sys" version = "0.4.15" @@ -3482,15 +3577,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" -[[package]] -name = "miniz_oxide" -version = "0.8.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3be647b768db090acb35d5ec5db2b0e1f1de11133ca123b9eacf5137868f892a" -dependencies = [ - "adler2", -] - [[package]] name = "mio" version = "1.0.3" @@ -3756,12 +3842,48 @@ dependencies = [ ] [[package]] -name = "object" -version = "0.36.7" +name = "objc-sys" +version = "0.3.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +checksum = "cdb91bdd390c7ce1a8607f35f3ca7151b65afc0ff5ff3b34fa350f7d7c7e4310" + +[[package]] +name = "objc2" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46a785d4eeff09c14c487497c162e92766fbb3e4059a71840cecc03d9a50b804" dependencies = [ - "memchr", + "objc-sys", + "objc2-encode", +] + +[[package]] +name = "objc2-core-bluetooth" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a644b62ffb826a5277f536cf0f701493de420b13d40e700c452c36567771111" +dependencies = [ + "bitflags 2.10.0", + "objc2", + "objc2-foundation", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + +[[package]] +name = "objc2-foundation" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee638a5da3799329310ad4cfa62fbf045d5f56e3ef5ba4149e7452dcf89d5a8" +dependencies = [ + "bitflags 2.10.0", + "block2", + "libc", + "objc2", ] [[package]] @@ -3810,7 +3932,7 @@ version = "0.10.72" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fedfea7d58a1f73118430a55da6a286e7b044961736ce96a16a17068ea25e5da" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "cfg-if", "foreign-types", "libc", @@ -4575,7 +4697,7 @@ version = "0.5.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2f103c6d277498fbceb16e84d317e2a400f160f46904d5f5410848c829511a3" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", ] [[package]] @@ -4833,12 +4955,6 @@ dependencies = [ "walkdir", ] -[[package]] -name = "rustc-demangle" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" - [[package]] name = "rustc-hash" version = "2.1.1" @@ -4869,7 +4985,7 @@ version = "0.38.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.4.15", @@ -4882,7 +4998,7 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d97817398dd4bb2e6da002002db259209759911da105da92bec29ccb12cf58bf" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "errno", "libc", "linux-raw-sys 0.9.4", @@ -4993,6 +5109,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "scratch" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d68f2ec51b097e4c1a75b681a8bec621909b5e91f15bb7b840c4f2f7b01148b2" + [[package]] name = "sdp" version = "0.7.0" @@ -5025,7 +5147,7 @@ version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "core-foundation", "core-foundation-sys", "libc", @@ -5213,6 +5335,20 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simplersble" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbdafe1041b27f1051721d53034bf2a5c1b2f83acb2975f307e4e66f84d87937" +dependencies = [ + "cmake", + "cxx", + "cxx-build", + "futures", + "tokio", + "tokio-stream", +] + [[package]] name = "slab" version = "0.4.9" @@ -5447,7 +5583,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c879d448e9d986b661742763247d3693ed13609438cf3d006f51f5368a5ba6b" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "core-foundation", "system-configuration-sys", ] @@ -5673,27 +5809,26 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.2" +version = "1.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b88822cbe49de4185e3a4cbf8321dd487cf5fe0c5c65695fef6346371e9c48" +checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" dependencies = [ - "backtrace", "bytes", "libc", "mio", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.9", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] name = "tokio-macros" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +checksum = "af407857209536a95c8e56f8231ef2c2e2aff839b22e07a1ffcbc617e9db9fa5" dependencies = [ "proc-macro2", "quote", @@ -5729,6 +5864,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -5851,7 +5987,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", "bytes", "futures-util", "http 1.3.1", @@ -6063,6 +6199,12 @@ version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" +[[package]] +name = "unicode-width" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" + [[package]] name = "universal-hash" version = "0.5.1" @@ -6736,13 +6878,19 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" +[[package]] +name = "windows-link" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" + [[package]] name = "windows-registry" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4286ad90ddb45071efd1a66dfa43eb02dd0dfbae1545ad6cc3c51cf34d7e8ba3" dependencies = [ - "windows-result 0.3.2", + "windows-result 0.3.4", "windows-strings 0.3.1", "windows-targets 0.53.0", ] @@ -6767,11 +6915,11 @@ dependencies = [ [[package]] name = "windows-result" -version = "0.3.2" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c64fd11a4fd95df68efcfee5f44a294fe71b8bc6a91993e2791938abcc712252" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -6790,7 +6938,7 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "87fa48cc5d406560701792be122a10132491cff9d0aeb23583cc2dcafc847319" dependencies = [ - "windows-link", + "windows-link 0.1.1", ] [[package]] @@ -6820,6 +6968,15 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link 0.2.1", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -7030,7 +7187,7 @@ version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" dependencies = [ - "bitflags 2.9.0", + "bitflags 2.10.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 36c148cffee..316ec3b2cae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ members = [ "swarm-test", "swarm", "transports/dns", + "transports/bluetooth", "transports/noise", "transports/plaintext", "transports/pnet", @@ -81,6 +82,7 @@ libp2p-connection-limits = { version = "0.6.0", path = "misc/connection-limits" libp2p-core = { version = "0.43.1", path = "core" } libp2p-dcutr = { version = "0.14.0", path = "protocols/dcutr" } libp2p-dns = { version = "0.44.0", path = "transports/dns" } +libp2p-bluetooth = { version = "0.1.0", path = "transports/bluetooth" } libp2p-floodsub = { version = "0.47.0", path = "protocols/floodsub" } libp2p-gossipsub = { version = "0.50.0", path = "protocols/gossipsub" } libp2p-identify = { version = "0.47.0", path = "protocols/identify" } @@ -127,7 +129,7 @@ getrandom = "0.2" if-watch = "3.2.1" hickory-proto = { version = "0.25.2", default-features = false } hickory-resolver = { version = "0.25.2", default-features = false } -multiaddr = "0.18.1" +multiaddr = "0.18.2" multihash = "0.19.1" multistream-select = { version = "0.13.0", path = "misc/multistream-select" } prometheus-client = "0.24" diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index dd1952fb93e..c29b0d9b411 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -13,6 +13,7 @@ categories = ["network-programming", "asynchronous"] [features] full = [ "autonat", + "bluetooth", "cbor", "dcutr", "dns", @@ -52,6 +53,7 @@ full = [ ] autonat = ["dep:libp2p-autonat"] +bluetooth = ["dep:libp2p-bluetooth"] cbor = ["libp2p-request-response?/cbor"] dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"] dns = ["dep:libp2p-dns"] @@ -128,6 +130,7 @@ libp2p-webtransport-websys = { workspace = true, optional = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] libp2p-dns = { workspace = true, optional = true } +libp2p-bluetooth = { workspace = true, optional = true } libp2p-mdns = { workspace = true, optional = true } libp2p-memory-connection-limits = { workspace = true, optional = true } libp2p-quic = { workspace = true, optional = true } diff --git a/libp2p/src/builder.rs b/libp2p/src/builder.rs index 95166bd34d4..17637c3ead3 100644 --- a/libp2p/src/builder.rs +++ b/libp2p/src/builder.rs @@ -102,6 +102,23 @@ mod tests { .build(); } + #[test] + #[cfg(all( + feature = "tokio", + feature = "bluetooth", + feature = "noise", + feature = "yamux", + ))] + fn bluetooth() { + let _ = SwarmBuilder::with_new_identity() + .with_tokio() + .with_bluetooth(libp2p_noise::Config::new, libp2p_yamux::Config::default) + .unwrap() + .with_behaviour(|_| libp2p_swarm::dummy::Behaviour) + .unwrap() + .build(); + } + #[test] #[cfg(all(feature = "tokio", feature = "quic"))] fn quic() { diff --git a/libp2p/src/builder/phase.rs b/libp2p/src/builder/phase.rs index fa378273630..76790c8d747 100644 --- a/libp2p/src/builder/phase.rs +++ b/libp2p/src/builder/phase.rs @@ -2,6 +2,7 @@ mod bandwidth_metrics; mod behaviour; +mod bluetooth; mod build; mod dns; mod identity; @@ -16,6 +17,7 @@ mod websocket; use bandwidth_metrics::*; pub use behaviour::BehaviourError; use behaviour::*; +use bluetooth::*; use build::*; use dns::*; use libp2p_core::{muxing::StreamMuxerBox, Transport}; diff --git a/libp2p/src/builder/phase/bluetooth.rs b/libp2p/src/builder/phase/bluetooth.rs new file mode 100644 index 00000000000..32f67dcbd8f --- /dev/null +++ b/libp2p/src/builder/phase/bluetooth.rs @@ -0,0 +1,86 @@ +use std::marker::PhantomData; + +use super::*; +use crate::SwarmBuilder; + +use libp2p_core::upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade}; +use libp2p_core::{Negotiated, UpgradeInfo}; + +pub struct BluetoothPhase { + pub(crate) transport: T, +} + +impl SwarmBuilder> { + pub(crate) fn without_bluetooth(self) -> SwarmBuilder> { + SwarmBuilder { + keypair: self.keypair, + phantom: PhantomData, + phase: OtherTransportPhase { + transport: self.phase.transport, + }, + } + } +} + +impl SwarmBuilder> { + pub fn with_other_transport< + Muxer: libp2p_core::muxing::StreamMuxer + Send + 'static, + OtherTransport: Transport + Send + Unpin + 'static, + R: TryIntoTransport, + >( + self, + constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, + ) -> Result< + SwarmBuilder>, + R::Error, + > + where + ::Error: Send + Sync + 'static, + ::Dial: Send, + ::ListenerUpgrade: Send, + ::Substream: Send, + ::Error: Send + Sync, + { + self.without_bluetooth().with_other_transport(constructor) + } + + pub fn with_behaviour>( + self, + constructor: impl FnOnce(&libp2p_identity::Keypair) -> R, + ) -> Result>, R::Error> { + self.without_bluetooth() + .without_any_other_transports() + .without_dns() + .without_websocket() + .without_relay() + .with_behaviour(constructor) + } +} + +#[cfg(all(not(target_arch = "wasm32"), feature = "tokio", feature = "dns"))] +impl SwarmBuilder> { + pub fn with_dns( + self, + ) -> Result< + SwarmBuilder< + super::provider::Tokio, + WebsocketPhase, + >, + std::io::Error, + > { + self.without_bluetooth() + .without_any_other_transports() + .with_dns() + } + + pub fn with_dns_config( + self, + cfg: libp2p_dns::ResolverConfig, + opts: libp2p_dns::ResolverOpts, + ) -> SwarmBuilder> + { + self.without_bluetooth() + .without_any_other_transports() + .with_dns_config(cfg, opts) + } +} diff --git a/libp2p/src/builder/phase/tcp.rs b/libp2p/src/builder/phase/tcp.rs index f21ae109300..1427a1f9972 100644 --- a/libp2p/src/builder/phase/tcp.rs +++ b/libp2p/src/builder/phase/tcp.rs @@ -2,14 +2,14 @@ use std::marker::PhantomData; #[cfg(all( not(target_arch = "wasm32"), - any(feature = "tcp", feature = "websocket") + any(feature = "tcp", feature = "websocket", feature = "bluetooth") ))] use libp2p_core::muxing::{StreamMuxer, StreamMuxerBox}; #[cfg(all(feature = "websocket", not(target_arch = "wasm32")))] use libp2p_core::Transport; #[cfg(all( not(target_arch = "wasm32"), - any(feature = "tcp", feature = "websocket") + any(feature = "tcp", feature = "websocket", feature = "bluetooth") ))] use libp2p_core::{ upgrade::InboundConnectionUpgrade, upgrade::OutboundConnectionUpgrade, Negotiated, UpgradeInfo, @@ -111,6 +111,71 @@ impl SwarmBuilder { }, } } + + #[cfg(all(not(target_arch = "wasm32"), feature = "bluetooth"))] + #[allow(clippy::too_many_arguments)] + pub fn with_bluetooth( + self, + security_upgrade: SecUpgrade, + multiplexer_upgrade: MuxUpgrade, + ) -> Result< + SwarmBuilder>, + SecUpgrade::Error, + > + where + SecStream: futures::AsyncRead + futures::AsyncWrite + Unpin + Send + 'static, + SecError: std::error::Error + Send + Sync + 'static, + SecUpgrade: IntoSecurityUpgrade>>, + SecUpgrade::Upgrade: InboundConnectionUpgrade< + Negotiated>>, + Output = (libp2p_identity::PeerId, SecStream), + Error = SecError, + > + OutboundConnectionUpgrade< + Negotiated>>, + Output = (libp2p_identity::PeerId, SecStream), + Error = SecError, + > + Clone + + Send + + 'static, + >>>>::Future: Send, + >>>>::Future: Send, + <<>>>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, + <>>>::Upgrade as UpgradeInfo>::Info: Send, + MuxStream: StreamMuxer + Send + 'static, + MuxStream::Substream: Send + 'static, + MuxStream::Error: Send + Sync + 'static, + MuxUpgrade: IntoMultiplexerUpgrade, + MuxUpgrade::Upgrade: InboundConnectionUpgrade< + Negotiated, + Output = MuxStream, + Error = MuxError, + > + OutboundConnectionUpgrade< + Negotiated, + Output = MuxStream, + Error = MuxError, + > + Clone + + Send + + 'static, + >>::Future: Send, + >>::Future: Send, + MuxError: std::error::Error + Send + Sync + 'static, + <<>::Upgrade as UpgradeInfo>::InfoIter as IntoIterator>::IntoIter: Send, + <>::Upgrade as UpgradeInfo>::Info: Send, + { + Ok(SwarmBuilder { + phase: BluetoothPhase { + transport: libp2p_bluetooth::BluetoothTransport::new() + .upgrade(libp2p_core::upgrade::Version::V1Lazy) + .authenticate(security_upgrade.into_security_upgrade(&self.keypair)?) + .multiplex(multiplexer_upgrade.into_multiplexer_upgrade()) + .map(|(peer, muxer), _| (peer, StreamMuxerBox::new(muxer))), + }, + keypair: self.keypair, + phantom: PhantomData, + }) + } } #[cfg(all(not(target_arch = "wasm32"), feature = "quic", feature = "tokio"))] diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 42461f8ef8e..31369edf018 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -39,6 +39,9 @@ pub use libp2p_allow_block_list as allow_block_list; #[cfg(feature = "autonat")] #[doc(inline)] pub use libp2p_autonat as autonat; +#[cfg(all(feature = "bluetooth", not(target_arch = "wasm32")))] +#[doc(inline)] +pub use libp2p_bluetooth as bluetooth; #[doc(inline)] pub use libp2p_connection_limits as connection_limits; #[doc(inline)] diff --git a/transports/bluetooth/Cargo.toml b/transports/bluetooth/Cargo.toml new file mode 100644 index 00000000000..c1462cc563e --- /dev/null +++ b/transports/bluetooth/Cargo.toml @@ -0,0 +1,39 @@ +[package] +name = "libp2p-bluetooth" +edition.workspace = true +rust-version = { workspace = true } +description = "Bluetooth transport protocol for libp2p" +version = "0.1.0" +authors = ["Parity Technologies "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[features] +default = [] + +[dependencies] +fnv = "1.0" +futures = { workspace = true } +libp2p-core = { workspace = true } +multiaddr = { workspace = true } +parking_lot = "0.12.3" +rand = "0.8" +rw-stream-sink = { workspace = true } +log = "0.4" +uuid = "1.0" +simplersble = "0.10" +tokio = { version = "1", features = ["sync", "rt", "time"] } +tokio-stream = "0.1" +bytes = "1.0" +async-trait = "0.1" + +[target.'cfg(target_os = "macos")'.dependencies] +objc2 = "0.5" +objc2-core-bluetooth = { version = "0.2", features = ["all"] } +objc2-foundation = { version = "0.2", features = ["all"] } +block2 = "0.5" + +[lints] +workspace = true diff --git a/transports/bluetooth/README.md b/transports/bluetooth/README.md new file mode 100644 index 00000000000..6f77e5b62f8 --- /dev/null +++ b/transports/bluetooth/README.md @@ -0,0 +1,189 @@ +# libp2p Bluetooth Transport + +A Bluetooth Low Energy (BLE) transport implementation for libp2p with dual-role support. + +## Features + +- **Central (Client) Role**: Full support for scanning and connecting to BLE peripherals via SimpleBLE +- **Peripheral (Server) Role**: Full support for advertising and accepting connections (macOS via CoreBluetooth, Linux/Windows in development) +- **Dual-Role Support**: Can run both central and peripheral simultaneously for peer-to-peer connectivity +- **Cross-platform Central**: Works on macOS, Linux, and Windows via SimpleBLE +- **Frame-based Communication**: Automatic message framing for reliable data transfer +- **Integrated with libp2p**: Full support for Noise encryption, Yamux multiplexing, and all libp2p protocols + +## Current Status + +✅ **Central Mode**: Fully working on all platforms +✅ **Peripheral Mode**: Fully working on macOS (95% complete) +🔧 **Peripheral Mode**: Linux/Windows support in development +✅ **Dual-Role**: Fully working on macOS + +## Testing with Two Machines + +The transport supports dual-role operation - each peer can simultaneously act as both central and peripheral. This allows for true peer-to-peer connectivity over Bluetooth. + +**On macOS**, both machines can run the same application and discover each other automatically. + +### Service UUIDs + +The transport uses these UUIDs: + +- **Service UUID**: `00001234-0000-1000-8000-00805f9b34fb` +- **RX Characteristic**: `00001235-0000-1000-8000-00805f9b34fb` (receives data from central) +- **TX Characteristic**: `00001236-0000-1000-8000-00805f9b34fb` (sends data to central) + +### Setup Instructions (macOS Dual-Role) + +#### Machine A + +```bash +cd /path/to/rust-libp2p-bluetooth-test +RUST_LOG=info,libp2p_bluetooth=debug cargo run --release +``` + +The application will: +- Start advertising as a peripheral +- Start scanning as a central +- Display its peer address to share with peers + +#### Machine B + +```bash +cd /path/to/rust-libp2p-bluetooth-test +# Use the peer address from Machine A if you want to dial explicitly +RUST_LOG=info,libp2p_bluetooth=debug cargo run --release -- \ + --peer /bluetooth//p2p/ +``` + +The application will: +- Start advertising as a peripheral +- Start scanning as a central +- Discover and connect to Machine A + +### Testing Flow + +1. Start both applications +2. Watch the logs for discovery and connection: + ``` + INFO Local peer id: 12D3KooW... + INFO Listening on /bluetooth/... (share this with peers: /bluetooth/.../p2p/12D3KooW...) + INFO Started advertising successfully + INFO Peripheral manager powered on, starting setup + INFO Service added successfully: 00001234-0000-1000-8000-00805f9b34fb + INFO Found peripheral: + INFO Connection established with 12D3KooW... + INFO Peer 12D3KooW... subscribed to bluetooth-chat + ``` + +3. Type messages and press Enter to send them via Gossipsub +4. See messages from the peer displayed in the terminal + +### Troubleshooting + +**"No BLE adapters found"** +- Ensure your machine has Bluetooth capability +- On Linux, check `bluetoothctl` is working +- On macOS, check Bluetooth is enabled in System Settings +- Grant Bluetooth permissions to your terminal application + +**"No peripherals found"** +- Ensure the other peer is running with Bluetooth enabled +- Check both peers are using the same service UUID +- Verify Bluetooth permissions are granted on both machines +- Try restarting both peers +- Increase the scan window if needed by setting `LIBP2P_BLUETOOTH_SCAN_TIMEOUT_SECS` (seconds) or `LIBP2P_BLUETOOTH_SCAN_COLLECTION_MS` (milliseconds) + +**"Connection failed"** +- Check Bluetooth signal strength (peers need to be within range) +- Verify the service UUID matches on both peers +- Ensure characteristics have correct properties (read, write, notify) +- Check system Bluetooth is not busy with other connections + +## Architecture + +### Dual-Role Design + +``` +┌─────────────────────────────────────┐ +│ BluetoothTransport (Dual-Role) │ +├─────────────────────────────────────┤ +│ │ +│ ┌────────────┐ ┌──────────────┐ │ +│ │ Central │ │ Peripheral │ │ +│ │ (SimpleBLE)│ │(CoreBluetooth)│ │ +│ └──────┬─────┘ └──────┬───────┘ │ +│ │ │ │ +│ Scan/Connect Advertise/Accept│ +│ │ │ │ +└─────────┼────────────────┼─────────┘ + │ │ + ▼ ▼ + libp2p Swarm + Protocols +``` + +Each peer runs both roles simultaneously: +- **Central role** (SimpleBLE): Scans for and connects to other BLE peripherals +- **Peripheral role** (CoreBluetooth on macOS): Advertises and accepts incoming connections +- **libp2p layer**: Handles peer identity, security (Noise), multiplexing (Yamux), and application protocols + +### Framing Layer + +BLE has limited MTU sizes (typically 23-512 bytes). This transport uses a simple length-prefix framing protocol: + +- 4-byte big-endian length prefix +- Maximum frame size: 1MB +- Automatic frame assembly from BLE notification chunks +- Transparent fragmentation and reassembly + +### Connection Flow (Dual-Role) + +``` + Peer A Peer B +(Central+Peripheral) (Central+Peripheral) + | | + |-- Advertise --------> | + | <-------- Advertise --| + | | + |-- Scan discovers B --> | + | <-- Scan discovers A -| + | | + |-- Connect as Central -------->| + | (B acts as Peripheral)| + |<- Connection Established -----| + | | + |-- GATT Service Discovery ---->| + |<- Characteristics Info -------| + | | + |-- Subscribe to notifications->| + | | + |<==== Bidirectional Data =====>| + | (libp2p protocols over BLE) | +``` + +## Implementation Details + +- **Transport Type**: Implements `libp2p_core::Transport` +- **Channel Type**: `RwStreamSink>>` +- **Central Library**: SimpleBLE (simplersble v0.10) - cross-platform +- **Peripheral Library**: CoreBluetooth (objc2-core-bluetooth v0.2) - macOS only +- **Async Runtime**: Tokio +- **Security**: Noise protocol (XX handshake) +- **Multiplexing**: Yamux + +## Current Limitations + +1. **Peripheral mode**: macOS only (CoreBluetooth implementation) +2. **Threading**: Peripheral operations must run on main thread (CoreBluetooth requirement) +3. **MTU**: Fixed at 512 bytes, no automatic negotiation +4. **Reconnection**: Manual restart required on disconnect +5. **Multiplexing**: Single connection per peripheral instance + +## Future Enhancements + +- [ ] Automatic reconnection logic +- [ ] MTU negotiation for better throughput +- [ ] Linux peripheral support (BlueZ D-Bus API) +- [ ] Windows peripheral support (Windows.Devices.Bluetooth) +- [ ] Multi-connection support for peripherals +- [ ] Proper dispatch queue integration for peripheral operations +- [ ] Background operation support diff --git a/transports/bluetooth/src/common.rs b/transports/bluetooth/src/common.rs new file mode 100644 index 00000000000..2a9a1b191e2 --- /dev/null +++ b/transports/bluetooth/src/common.rs @@ -0,0 +1,102 @@ +use std::{error, fmt, str::FromStr}; + +use libp2p_core::multiaddr::{Multiaddr, Protocol}; + +/// Bluetooth MAC address wrapper used by the in-memory mock transport as well as +/// platform specific implementations. +#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)] +pub struct BluetoothAddr(pub(crate) [u8; 6]); + +impl BluetoothAddr { + pub fn new(bytes: [u8; 6]) -> Self { + Self(bytes) + } + + pub(crate) fn into_u64(self) -> u64 { + let mut bytes = [0u8; 8]; + bytes[2..].copy_from_slice(&self.0); + u64::from_be_bytes(bytes) + } + + pub fn to_multiaddr(self) -> Multiaddr { + Protocol::Memory(self.into_u64()).into() + } +} + +impl fmt::Display for BluetoothAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{:02x}:{:02x}:{:02x}:{:02x}:{:02x}:{:02x}", + self.0[0], self.0[1], self.0[2], self.0[3], self.0[4], self.0[5] + ) + } +} + +impl FromStr for BluetoothAddr { + type Err = BluetoothAddrParseError; + + fn from_str(s: &str) -> Result { + let mut parts = s.split(':'); + let mut bytes = [0u8; 6]; + for byte in bytes.iter_mut() { + let part = parts.next().ok_or(BluetoothAddrParseError::InvalidFormat)?; + if part.len() != 2 { + return Err(BluetoothAddrParseError::InvalidFormat); + } + *byte = + u8::from_str_radix(part, 16).map_err(|_| BluetoothAddrParseError::InvalidFormat)?; + } + if parts.next().is_some() { + return Err(BluetoothAddrParseError::InvalidFormat); + } + Ok(Self(bytes)) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BluetoothAddrParseError { + InvalidFormat, +} + +impl fmt::Display for BluetoothAddrParseError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "invalid bluetooth address format") + } +} + +impl error::Error for BluetoothAddrParseError {} + +/// Error that can be produced from the `BluetoothTransport`. +#[derive(Debug, Copy, Clone)] +pub enum BluetoothTransportError { + /// There's no listener for the requested address. + Unreachable, + /// Tried to listen on an address that is already registered. + AlreadyInUse, + /// The current platform does not provide a Bluetooth transport implementation. + Unsupported, + /// Failed to establish a connection to a remote peer. + ConnectionFailed, +} + +impl fmt::Display for BluetoothTransportError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + BluetoothTransportError::Unreachable => { + write!(f, "No listener for the given bluetooth address.") + } + BluetoothTransportError::AlreadyInUse => { + write!(f, "Bluetooth address already in use.") + } + BluetoothTransportError::Unsupported => { + write!(f, "Bluetooth transport not supported on this platform.") + } + BluetoothTransportError::ConnectionFailed => { + write!(f, "Failed to establish Bluetooth connection.") + } + } + } +} + +impl error::Error for BluetoothTransportError {} diff --git a/transports/bluetooth/src/framing.rs b/transports/bluetooth/src/framing.rs new file mode 100644 index 00000000000..d949bd07819 --- /dev/null +++ b/transports/bluetooth/src/framing.rs @@ -0,0 +1,153 @@ +//! Simple framing layer for sending length-prefixed messages over BLE characteristics. +//! +//! BLE characteristics have limited MTU sizes (typically 20-512 bytes), so we need +//! to frame our messages properly. This module provides a simple length-prefix framing +//! where each message is prefixed with a 4-byte big-endian length. + +use bytes::{Buf, BufMut, BytesMut}; +use std::io; + +/// Maximum frame size (1MB - prevents DoS) +const MAX_FRAME_SIZE: usize = 1024 * 1024; + +/// Frame encoder/decoder for length-prefixed messages +pub(crate) struct FrameCodec { + buffer: BytesMut, +} + +impl FrameCodec { + pub(crate) fn new() -> Self { + Self { + buffer: BytesMut::with_capacity(4096), + } + } + + /// Encode a message with length prefix + pub(crate) fn encode(&self, data: &[u8]) -> Result, io::Error> { + if data.len() > MAX_FRAME_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "frame too large", + )); + } + + let mut buf = Vec::with_capacity(4 + data.len()); + buf.put_u32(data.len() as u32); + buf.extend_from_slice(data); + Ok(buf) + } + + /// Add incoming data to the buffer + pub(crate) fn push_data(&mut self, data: &[u8]) { + self.buffer.extend_from_slice(data); + } + + /// Try to decode the next complete frame from the buffer + pub(crate) fn decode_next(&mut self) -> Result>, io::Error> { + // Need at least 4 bytes for length prefix + if self.buffer.len() < 4 { + return Ok(None); + } + + // Peek at the length without consuming + let mut length_bytes = &self.buffer[..4]; + let frame_len = length_bytes.get_u32() as usize; + + // Validate frame size + if frame_len > MAX_FRAME_SIZE { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + format!( + "frame size {} exceeds maximum {}", + frame_len, MAX_FRAME_SIZE + ), + )); + } + + // Check if we have the complete frame + let total_len = 4 + frame_len; + if self.buffer.len() < total_len { + return Ok(None); // Need more data + } + + // Consume the length prefix + self.buffer.advance(4); + + // Extract the frame data + let frame = self.buffer.split_to(frame_len).to_vec(); + + Ok(Some(frame)) + } +} + +impl Default for FrameCodec { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_encode_decode() { + let codec = FrameCodec::new(); + let data = b"hello world"; + + // Encode + let encoded = codec.encode(data).unwrap(); + assert_eq!(encoded.len(), 4 + data.len()); + + // Decode + let mut decoder = FrameCodec::new(); + decoder.push_data(&encoded); + let decoded = decoder.decode_next().unwrap().unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn test_partial_frames() { + let codec = FrameCodec::new(); + let data = b"hello world"; + let encoded = codec.encode(data).unwrap(); + + let mut decoder = FrameCodec::new(); + + // Push only part of the data + decoder.push_data(&encoded[..5]); + assert!(decoder.decode_next().unwrap().is_none()); + + // Push the rest + decoder.push_data(&encoded[5..]); + let decoded = decoder.decode_next().unwrap().unwrap(); + assert_eq!(decoded, data); + } + + #[test] + fn test_multiple_frames() { + let codec = FrameCodec::new(); + let data1 = b"hello"; + let data2 = b"world"; + + let encoded1 = codec.encode(data1).unwrap(); + let encoded2 = codec.encode(data2).unwrap(); + + let mut decoder = FrameCodec::new(); + decoder.push_data(&encoded1); + decoder.push_data(&encoded2); + + let decoded1 = decoder.decode_next().unwrap().unwrap(); + assert_eq!(decoded1, data1); + + let decoded2 = decoder.decode_next().unwrap().unwrap(); + assert_eq!(decoded2, data2); + } + + #[test] + fn test_max_frame_size() { + let codec = FrameCodec::new(); + let data = vec![0u8; MAX_FRAME_SIZE + 1]; + assert!(codec.encode(&data).is_err()); + } +} diff --git a/transports/bluetooth/src/lib.rs b/transports/bluetooth/src/lib.rs new file mode 100644 index 00000000000..842c29fd110 --- /dev/null +++ b/transports/bluetooth/src/lib.rs @@ -0,0 +1,26 @@ +//! Bluetooth transport implementation for libp2p. +//! +//! This transport uses btleplug for cross-platform BLE support, providing +//! Central (dialing) and Peripheral (listening) roles. + +mod common; +mod framing; + +// Platform-specific peripheral implementations +#[cfg(target_os = "macos")] +mod peripheral_macos; + +#[cfg(not(test))] +mod platform; + +// Use mock for tests +#[cfg(test)] +mod mock; + +pub use common::{BluetoothAddr, BluetoothAddrParseError, BluetoothTransportError}; + +#[cfg(not(test))] +pub use platform::*; + +#[cfg(test)] +pub use mock::*; diff --git a/transports/bluetooth/src/mock.rs b/transports/bluetooth/src/mock.rs new file mode 100644 index 00000000000..799954fee15 --- /dev/null +++ b/transports/bluetooth/src/mock.rs @@ -0,0 +1,415 @@ +use std::{ + collections::{hash_map::Entry, VecDeque}, + future::Future, + pin::Pin, + sync::LazyLock, + task::{Context, Poll}, +}; + +use fnv::FnvHashMap; +use futures::{ + channel::mpsc, + future::{self, Ready}, + prelude::*, +}; +use libp2p_core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent}, +}; +use parking_lot::Mutex; +use rand::random; +use rw_stream_sink::RwStreamSink; + +use crate::common::{BluetoothAddr, BluetoothTransportError}; + +type ChannelSender = mpsc::Sender<(Channel>, BluetoothAddr)>; +type ChannelReceiver = mpsc::Receiver<(Channel>, BluetoothAddr)>; + +static HUB: LazyLock = LazyLock::new(|| Hub(Mutex::new(FnvHashMap::default()))); + +struct Hub(Mutex>); + +impl Hub { + fn register_addr( + &self, + requested: Option, + ) -> Option<(ChannelReceiver, BluetoothAddr)> { + let mut hub = self.0.lock(); + + let addr = if let Some(addr) = requested { + if hub.contains_key(&addr) { + return None; + } + addr + } else { + loop { + let candidate = random_local_addr(); + if !hub.contains_key(&candidate) { + break candidate; + } + } + }; + + let (tx, rx) = mpsc::channel(2); + match hub.entry(addr) { + Entry::Occupied(_) => return None, + Entry::Vacant(entry) => { + entry.insert(tx); + } + } + + Some((rx, addr)) + } + + fn unregister_addr(&self, addr: &BluetoothAddr) -> Option { + self.0.lock().remove(addr) + } + + fn get(&self, addr: &BluetoothAddr) -> Option { + self.0.lock().get(addr).cloned() + } +} + +#[derive(Default)] +pub struct BluetoothTransport { + listeners: VecDeque>>, +} + +impl BluetoothTransport { + pub fn new() -> Self { + Self::default() + } +} + +pub type Channel = RwStreamSink>; + +pub struct Chan> { + incoming: mpsc::Receiver, + outgoing: mpsc::Sender, + dial_addr: Option, +} + +impl Unpin for Chan {} + +impl Stream for Chan { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Stream::poll_next(Pin::new(&mut self.incoming), cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))), + Poll::Ready(None) => Poll::Ready(None), + } + } +} + +impl Sink for Chan { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_ready(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Sink::start_send(Pin::new(&mut self.outgoing), item).map_err(map_channel_err) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_flush(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_close(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } +} + +fn map_channel_err(error: mpsc::SendError) -> std::io::Error { + if error.is_full() { + std::io::Error::new(std::io::ErrorKind::WouldBlock, error) + } else { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed") + } +} + +impl Drop for Chan { + fn drop(&mut self) { + if let Some(addr) = self.dial_addr { + let channel_sender = HUB.unregister_addr(&addr); + debug_assert!(channel_sender.is_some()); + } + } +} + +pub struct DialFuture { + dial_addr: BluetoothAddr, + sender: ChannelSender, + channel_to_send: Option>>, + channel_to_return: Option>>, +} + +impl DialFuture { + fn new(remote: BluetoothAddr) -> Option { + let sender = HUB.get(&remote)?; + + let (_dial_receiver, dial_addr) = HUB + .register_addr(None) + .expect("random bluetooth address generation to succeed"); + + let (a_tx, a_rx) = mpsc::channel(4096); + let (b_tx, b_rx) = mpsc::channel(4096); + + Some(DialFuture { + dial_addr, + sender, + channel_to_send: Some(RwStreamSink::new(Chan { + incoming: a_rx, + outgoing: b_tx, + dial_addr: None, + })), + channel_to_return: Some(RwStreamSink::new(Chan { + incoming: b_rx, + outgoing: a_tx, + dial_addr: Some(dial_addr), + })), + }) + } +} + +impl Future for DialFuture { + type Output = Result>, BluetoothTransportError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.sender.poll_ready(cx) { + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(())) => {} + Poll::Ready(Err(_)) => return Poll::Ready(Err(BluetoothTransportError::Unreachable)), + } + + let channel_to_send = self + .channel_to_send + .take() + .expect("Future should not be polled after completion"); + let dial_addr = self.dial_addr; + if self + .sender + .start_send((channel_to_send, dial_addr)) + .is_err() + { + return Poll::Ready(Err(BluetoothTransportError::Unreachable)); + } + + Poll::Ready(Ok(self + .channel_to_return + .take() + .expect("Future should not be polled after completion"))) + } +} + +impl Transport for BluetoothTransport { + type Output = Channel>; + type Error = BluetoothTransportError; + type ListenerUpgrade = Ready>; + type Dial = DialFuture; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + let requested_addr = + parse_bluetooth_addr(&addr).map_err(|_| TransportError::MultiaddrNotSupported(addr))?; + + let (receiver, actual_addr) = match requested_addr { + Some(addr) => HUB + .register_addr(Some(addr)) + .ok_or(TransportError::Other(BluetoothTransportError::AlreadyInUse))?, + None => HUB + .register_addr(None) + .ok_or(TransportError::Other(BluetoothTransportError::Unreachable))?, + }; + + let listen_addr = actual_addr.to_multiaddr(); + let listener = Listener { + id, + addr: listen_addr.clone(), + receiver, + tell_listen_addr: true, + registered_addr: actual_addr, + }; + + self.listeners.push_back(Box::pin(listener)); + + Ok(()) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + if let Some(index) = self.listeners.iter().position(|listener| listener.id == id) { + let listener = self.listeners.get_mut(index).expect("index valid"); + let val_in = HUB.unregister_addr(&listener.registered_addr); + debug_assert!(val_in.is_some()); + listener.receiver.close(); + true + } else { + false + } + } + + fn dial( + &mut self, + addr: Multiaddr, + _opts: DialOpts, + ) -> Result> { + let remote = match parse_bluetooth_addr(&addr) { + Ok(Some(addr)) => addr, + _ => return Err(TransportError::MultiaddrNotSupported(addr)), + }; + + DialFuture::new(remote).ok_or(TransportError::Other(BluetoothTransportError::Unreachable)) + } + + fn poll( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let mut remaining = self.listeners.len(); + while let Some(mut listener) = self.listeners.pop_back() { + if listener.tell_listen_addr { + listener.tell_listen_addr = false; + let listen_addr = listener.addr.clone(); + let listener_id = listener.id; + self.listeners.push_front(listener); + return Poll::Ready(TransportEvent::NewAddress { + listen_addr, + listener_id, + }); + } + + let event = match Stream::poll_next(Pin::new(&mut listener.receiver), cx) { + Poll::Pending => None, + Poll::Ready(Some((channel, dial_addr))) => Some(TransportEvent::Incoming { + listener_id: listener.id, + upgrade: future::ready(Ok(channel)), + local_addr: listener.addr.clone(), + send_back_addr: dial_addr.to_multiaddr(), + }), + Poll::Ready(None) => { + return Poll::Ready(TransportEvent::ListenerClosed { + listener_id: listener.id, + reason: Ok(()), + }); + } + }; + + self.listeners.push_front(listener); + if let Some(event) = event { + return Poll::Ready(event); + } + + remaining -= 1; + if remaining == 0 { + break; + } + } + + Poll::Pending + } +} + +pub struct Listener { + id: ListenerId, + addr: Multiaddr, + receiver: ChannelReceiver, + tell_listen_addr: bool, + registered_addr: BluetoothAddr, +} + +impl Drop for Listener { + fn drop(&mut self) { + let _ = HUB.unregister_addr(&self.registered_addr); + } +} + +fn parse_bluetooth_addr(addr: &Multiaddr) -> Result, ()> { + let mut protocols = addr.iter(); + match protocols.next() { + Some(Protocol::Memory(value)) => match protocols.next() { + None | Some(Protocol::P2p(_)) => { + if value == 0 { + Ok(None) + } else { + BluetoothAddr::from_u64(value).map(Some).ok_or(()) + } + } + _ => Err(()), + }, + _ => Err(()), + } +} + +fn random_local_addr() -> BluetoothAddr { + loop { + let raw: u64 = random(); + let bytes = raw.to_be_bytes(); + let mut addr = [0u8; 6]; + addr.copy_from_slice(&bytes[2..]); + addr[0] |= 0x02; // locally administered + addr[0] &= 0xfe; // unicast + if addr.iter().any(|b| *b != 0) { + return BluetoothAddr::new(addr); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{ + executor::block_on, + io::{AsyncReadExt, AsyncWriteExt}, + }; + use libp2p_core::transport::{DialOpts, ListenerId, TransportEvent}; + use libp2p_core::{transport::Transport, Multiaddr}; + use std::str::FromStr; + + #[test] + fn dial_and_accept() { + let mut listener = BluetoothTransport::new(); + let listen_addr: Multiaddr = BluetoothAddr::from_str("02:00:00:00:00:01") + .unwrap() + .to_multiaddr(); + let listener_id = ListenerId::next(); + listener + .listen_on(listener_id, listen_addr.clone()) + .unwrap(); + + // Consume the initial NewAddress event. + let event = block_on(futures::future::poll_fn(|cx| { + Pin::new(&mut listener).poll(cx) + })); + matches!(event, TransportEvent::NewAddress { .. }) + .then_some(()) + .expect("listener announces new address"); + + let mut dialer = BluetoothTransport::new(); + let dial_opts = DialOpts::unknown_peer_id() + .address(listen_addr.clone()) + .build(); + let dial_future = dialer.dial(listen_addr.clone(), dial_opts).unwrap(); + + let mut dial_conn = block_on(dial_future).unwrap(); + + let mut listener_conn = loop { + let event = block_on(futures::future::poll_fn(|cx| { + Pin::new(&mut listener).poll(cx) + })); + if let TransportEvent::Incoming { upgrade, .. } = event { + break block_on(upgrade).unwrap(); + } + }; + + block_on(dial_conn.write_all(b"ping")).unwrap(); + let mut buf = [0u8; 4]; + block_on(listener_conn.read_exact(&mut buf)).unwrap(); + assert_eq!(&buf, b"ping"); + } +} diff --git a/transports/bluetooth/src/peripheral_macos.rs b/transports/bluetooth/src/peripheral_macos.rs new file mode 100644 index 00000000000..2be63c38193 --- /dev/null +++ b/transports/bluetooth/src/peripheral_macos.rs @@ -0,0 +1,608 @@ +//! macOS BLE peripheral implementation using CoreBluetooth. +//! +//! This provides the peripheral (server) role for BLE, allowing the app to advertise +//! and accept incoming connections from centrals. + +use std::sync::Arc; + +use futures::channel::mpsc; +use objc2::rc::Retained; +use objc2::runtime::ProtocolObject; +use objc2::{declare_class, msg_send_id, mutability, ClassType, DeclaredClass}; +use objc2_core_bluetooth::{ + CBATTError, CBATTRequest, CBAdvertisementDataServiceUUIDsKey, CBCentral, CBCharacteristic, + CBCharacteristicProperties, CBManagerState, CBMutableCharacteristic, CBMutableService, + CBPeripheralManager, CBPeripheralManagerDelegate, CBUUID, +}; +use objc2_foundation::{ + NSArray, NSData, NSDictionary, NSError, NSObject, NSObjectProtocol, NSString, +}; +use parking_lot::Mutex; +use uuid::Uuid; + +use crate::framing::FrameCodec; + +/// libp2p BLE service UUID +const LIBP2P_SERVICE_UUID: Uuid = Uuid::from_u128(0x00001234_0000_1000_8000_00805f9b34fb); + +/// Characteristic UUID for RX (receiving data from central - they write) +const RX_CHARACTERISTIC_UUID: Uuid = Uuid::from_u128(0x00001235_0000_1000_8000_00805f9b34fb); + +/// Characteristic UUID for TX (transmitting data to central - they read/subscribe) +const TX_CHARACTERISTIC_UUID: Uuid = Uuid::from_u128(0x00001236_0000_1000_8000_00805f9b34fb); + +/// Shared state for the peripheral manager +struct PeripheralState { + incoming_tx: mpsc::Sender>, + outgoing_queue: Vec>, + outgoing_data_rx: Option>>, + subscribed_centrals: Vec>, + tx_characteristic: Option>, + rx_characteristic: Option>, + ready_to_send: bool, + frame_codec: FrameCodec, + peripheral_manager: Option>, +} + +pub(crate) struct PeripheralManagerDelegateIvars { + state: Mutex, +} + +declare_class!( + pub(crate) struct PeripheralManagerDelegate; + + unsafe impl ClassType for PeripheralManagerDelegate { + type Super = NSObject; + type Mutability = mutability::InteriorMutable; + const NAME: &'static str = "PeripheralManagerDelegate"; + } + + impl DeclaredClass for PeripheralManagerDelegate { + type Ivars = PeripheralManagerDelegateIvars; + } + + unsafe impl NSObjectProtocol for PeripheralManagerDelegate {} + + unsafe impl CBPeripheralManagerDelegate for PeripheralManagerDelegate { + #[method(peripheralManagerDidUpdateState:)] + fn peripheral_manager_did_update_state(&self, peripheral: &CBPeripheralManager) { + unsafe { + let state = peripheral.state(); + log::info!("Peripheral manager state changed: {:?}", state); + + if state == CBManagerState::PoweredOn { + log::info!("Peripheral manager powered on, starting setup"); + self.setup_service(peripheral); + } + } + } + + #[method(peripheralManager:willRestoreState:)] + fn peripheral_manager_will_restore_state( + &self, + _peripheral: &CBPeripheralManager, + _dict: &objc2_foundation::NSDictionary, + ) { + log::info!("Peripheral manager will restore state"); + } + + #[method(peripheralManager:didAddService:error:)] + fn peripheral_manager_did_add_service( + &self, + _peripheral: &CBPeripheralManager, + service: &objc2_core_bluetooth::CBService, + error: Option<&NSError>, + ) { + unsafe { + if let Some(error) = error { + log::error!("Failed to add service: {}", error.localizedDescription()); + } else { + log::info!("Service added successfully: {}", service.UUID().UUIDString()); + } + } + } + + #[method(peripheralManagerDidStartAdvertising:error:)] + fn peripheral_manager_did_start_advertising( + &self, + _peripheral: &CBPeripheralManager, + error: Option<&NSError>, + ) { + if let Some(error) = error { + log::error!("Failed to start advertising: {}", error.localizedDescription()); + } else { + log::info!("Started advertising successfully"); + } + } + + #[method(peripheralManager:central:didSubscribeToCharacteristic:)] + fn peripheral_manager_central_did_subscribe_to_characteristic( + &self, + peripheral: &CBPeripheralManager, + central: &CBCentral, + characteristic: &CBCharacteristic, + ) { + unsafe { + log::info!( + "Central {} subscribed to characteristic {}", + central.identifier().UUIDString(), + characteristic.UUID().UUIDString() + ); + + let mut state = self.ivars().state.lock(); + if !state.subscribed_centrals.iter().any(|c| c.identifier() == central.identifier()) { + state.subscribed_centrals.push(central.retain()); + } + state.ready_to_send = true; + drop(state); + + // Check for new data and try to send any queued data + self.check_and_send_data(); + } + } + + #[method(peripheralManager:central:didUnsubscribeFromCharacteristic:)] + fn peripheral_manager_central_did_unsubscribe_from_characteristic( + &self, + _peripheral: &CBPeripheralManager, + central: &CBCentral, + characteristic: &CBCharacteristic, + ) { + unsafe { + log::info!( + "Central {} unsubscribed from characteristic {}", + central.identifier().UUIDString(), + characteristic.UUID().UUIDString() + ); + + let mut state = self.ivars().state.lock(); + state.subscribed_centrals.retain(|c| c.identifier() != central.identifier()); + } + } + + #[method(peripheralManager:didReceiveReadRequest:)] + fn peripheral_manager_did_receive_read_request( + &self, + peripheral: &CBPeripheralManager, + request: &CBATTRequest, + ) { + unsafe { + log::debug!("Received read request for characteristic {}", + request.characteristic().UUID().UUIDString()); + + // For now, respond with empty data + let data = NSData::new(); + request.setValue(Some(&data)); + peripheral.respondToRequest_withResult(request, CBATTError::Success); + } + } + + #[method(peripheralManager:didReceiveWriteRequests:)] + fn peripheral_manager_did_receive_write_requests( + &self, + peripheral: &CBPeripheralManager, + requests: &NSArray, + ) { + unsafe { + log::info!("🔵 Peripheral received {} write request(s)", requests.count()); + + for i in 0..requests.count() { + let request = requests.objectAtIndex(i); + log::info!(" Request {} - characteristic: {}", i, request.characteristic().UUID().UUIDString()); + + if let Some(value) = request.value() { + let bytes: &[u8] = value.bytes(); + + log::info!(" Request {} - received {} bytes", i, bytes.len()); + + // Process the data through frame codec + let mut state = self.ivars().state.lock(); + state.frame_codec.push_data(bytes); + + while let Ok(Some(frame)) = state.frame_codec.decode_next() { + log::info!(" ✓ Decoded frame: {} bytes", frame.len()); + let _ = state.incoming_tx.try_send(frame); + } + } else { + log::warn!(" Request {} has no value", i); + } + } + + // Respond success to all requests + if requests.count() > 0 { + if let Some(first_request) = requests.firstObject() { + log::info!(" Responding with Success to request"); + peripheral.respondToRequest_withResult(first_request.as_ref(), CBATTError::Success); + } + } else { + log::warn!(" No requests to respond to!"); + } + + // Check for and send any pending outgoing data + // We do this after receiving data since it indicates an active connection + self.check_and_send_data(); + } + } + + #[method(peripheralManagerIsReadyToUpdateSubscribers:)] + fn peripheral_manager_is_ready_to_update_subscribers( + &self, + peripheral: &CBPeripheralManager, + ) { + log::debug!("Peripheral manager ready to update subscribers"); + let mut state = self.ivars().state.lock(); + state.ready_to_send = true; + drop(state); + + // Check for new data and send + self.check_and_send_data(); + } + } +); + +impl PeripheralManagerDelegate { + pub(crate) fn new( + incoming_tx: mpsc::Sender>, + outgoing_data_rx: mpsc::Receiver>, + ) -> Retained { + let this = Self::alloc().set_ivars(PeripheralManagerDelegateIvars { + state: Mutex::new(PeripheralState { + incoming_tx, + outgoing_queue: Vec::new(), + outgoing_data_rx: Some(outgoing_data_rx), + subscribed_centrals: Vec::new(), + tx_characteristic: None, + rx_characteristic: None, + ready_to_send: false, + frame_codec: FrameCodec::new(), + peripheral_manager: None, + }), + }); + unsafe { msg_send_id![super(this), init] } + } + + pub(crate) fn set_peripheral_manager(&self, peripheral: Retained) { + let mut state = self.ivars().state.lock(); + state.peripheral_manager = Some(peripheral); + } + + fn setup_service(&self, peripheral: &CBPeripheralManager) { + log::info!("Setting up BLE service and characteristics"); + + // Create service UUID + let service_uuid = uuid_to_cbuuid(&LIBP2P_SERVICE_UUID); + let service = unsafe { + CBMutableService::initWithType_primary(CBMutableService::alloc(), &service_uuid, true) + }; + + // Create RX characteristic (central writes to this) + // Use ONLY Write property (with response) to trigger didReceiveWriteRequests + // WriteWithoutResponse does NOT trigger callbacks in peripheral mode + let rx_uuid = uuid_to_cbuuid(&RX_CHARACTERISTIC_UUID); + let rx_properties = CBCharacteristicProperties::CBCharacteristicPropertyWrite; + let rx_char = unsafe { + CBMutableCharacteristic::initWithType_properties_value_permissions( + CBMutableCharacteristic::alloc(), + &rx_uuid, + rx_properties, + None, + objc2_core_bluetooth::CBAttributePermissions::Writeable, + ) + }; + + // Create TX characteristic (central subscribes to this) + let tx_uuid = uuid_to_cbuuid(&TX_CHARACTERISTIC_UUID); + let tx_properties = CBCharacteristicProperties::CBCharacteristicPropertyNotify + | CBCharacteristicProperties::CBCharacteristicPropertyRead; + let tx_char = unsafe { + CBMutableCharacteristic::initWithType_properties_value_permissions( + CBMutableCharacteristic::alloc(), + &tx_uuid, + tx_properties, + None, + objc2_core_bluetooth::CBAttributePermissions::Readable, + ) + }; + + // Store characteristics + { + let mut state = self.ivars().state.lock(); + state.tx_characteristic = Some(tx_char.clone()); + state.rx_characteristic = Some(rx_char.clone()); + } + + // Add characteristics to service + unsafe { + // Cast CBMutableCharacteristic to CBCharacteristic for the array + let tx_char_base: Retained = std::mem::transmute(tx_char.clone()); + let rx_char_base: Retained = std::mem::transmute(rx_char.clone()); + let characteristics = NSArray::from_vec(vec![tx_char_base, rx_char_base]); + service.setCharacteristics(Some(&*characteristics)); + + // Add service to peripheral manager + peripheral.addService(&service); + } + + // Start advertising + self.start_advertising(peripheral); + } + + fn start_advertising(&self, peripheral: &CBPeripheralManager) { + log::info!( + "Starting BLE advertising with service UUID: {}", + LIBP2P_SERVICE_UUID + ); + + unsafe { + let service_uuid = uuid_to_cbuuid(&LIBP2P_SERVICE_UUID); + let service_uuids = NSArray::from_vec(vec![service_uuid]); + + // Create a simple advertisement with just the service UUID + // We use msg_send to construct the dictionary manually + let adv_data: Retained> = msg_send_id![ + NSDictionary::alloc(), + initWithObjects: &[&*service_uuids as &objc2::runtime::AnyObject], + forKeys: &[CBAdvertisementDataServiceUUIDsKey as &objc2::runtime::AnyObject], + count: 1usize + ]; + + peripheral.startAdvertising(Some(&*adv_data)); + } + } + + /// Process any pending outgoing data from the channel + fn process_outgoing_channel(&self) -> bool { + // Collect all pending data from the channel first + let mut pending_data = Vec::new(); + { + let mut state = self.ivars().state.lock(); + let Some(outgoing_rx) = state.outgoing_data_rx.as_mut() else { + return false; + }; + + // Drain all available data from the channel without blocking + while let Ok(Some(data)) = outgoing_rx.try_next() { + pending_data.push(data); + } + } + + // Now process and encode the data + if !pending_data.is_empty() { + let mut state = self.ivars().state.lock(); + + for data in pending_data { + log::debug!( + "Processing outgoing data from channel: {} bytes", + data.len() + ); + + // Encode the data into a frame with length prefix + match state.frame_codec.encode(&data) { + Ok(encoded_frame) => { + log::debug!("Queueing encoded frame: {} bytes", encoded_frame.len()); + state.outgoing_queue.push(encoded_frame); + } + Err(e) => { + log::error!("Failed to encode outgoing data: {}", e); + } + } + } + true + } else { + false + } + } + + /// Check for and send any pending outgoing data + /// This should be called periodically to ensure data is sent + pub(crate) fn check_and_send_data(&self) { + // Process any pending data from the channel + self.process_outgoing_channel(); + + // Always try to send queued data, not just when there's new data + // This ensures we retry sending when the peripheral manager becomes ready + let peripheral = { + let state = self.ivars().state.lock(); + state.peripheral_manager.clone() + }; + + if let Some(peripheral) = peripheral { + self.send_queued_data(&peripheral); + } + } + + pub(crate) fn send_queued_data(&self, peripheral: &CBPeripheralManager) { + // First, process any new data from the channel + self.process_outgoing_channel(); + + let mut state = self.ivars().state.lock(); + + if !state.ready_to_send || state.subscribed_centrals.is_empty() { + // Only log if there's actually data waiting to be sent + if !state.outgoing_queue.is_empty() { + log::debug!( + "Cannot send: ready={}, subscribers={}, queue={}", + state.ready_to_send, + state.subscribed_centrals.len(), + state.outgoing_queue.len() + ); + } + return; + } + + let Some(tx_char) = state.tx_characteristic.clone() else { + if !state.outgoing_queue.is_empty() { + log::debug!( + "No TX characteristic available, {} items queued", + state.outgoing_queue.len() + ); + } + return; + }; + + if !state.outgoing_queue.is_empty() { + log::debug!( + "Attempting to send {} queued items", + state.outgoing_queue.len() + ); + } + + while let Some(data) = state.outgoing_queue.first() { + let ns_data = NSData::from_vec(data.clone()); + + let success = unsafe { + peripheral.updateValue_forCharacteristic_onSubscribedCentrals( + &ns_data, &tx_char, None, // Send to all subscribed centrals + ) + }; + + if success { + log::debug!("Sent {} bytes via notification", data.len()); + state.outgoing_queue.remove(0); + } else { + log::debug!( + "Failed to send, queue has {} items", + state.outgoing_queue.len() + ); + state.ready_to_send = false; + break; + } + } + } +} + +/// BLE Peripheral manager for macOS +pub(crate) struct BlePeripheralManager { + /// Keep peripheral alive - needed to maintain the BLE connection + #[allow(dead_code)] + peripheral: Retained, + /// Keep delegate alive - needed to receive callbacks + #[allow(dead_code)] + delegate: Retained, +} + +impl BlePeripheralManager { + /// Create a new BLE peripheral manager + pub(crate) async fn new( + incoming_tx: mpsc::Sender>, + outgoing_rx: mpsc::Receiver>, + ) -> Result, String> { + log::info!("Creating BLE peripheral manager"); + + let delegate = PeripheralManagerDelegate::new(incoming_tx, outgoing_rx); + + // Use dispatch_get_global_queue to get a concurrent queue for CoreBluetooth + // Using nil queue would use the main thread, which doesn't work well with Tokio + let queue: *mut objc2::runtime::AnyObject = unsafe { + use std::ffi::c_long; + extern "C" { + fn dispatch_get_global_queue( + identifier: c_long, + flags: usize, + ) -> *mut objc2::runtime::AnyObject; + } + // QOS_CLASS_USER_INTERACTIVE = 0x21 + dispatch_get_global_queue(0x21, 0) + }; + + let peripheral: Retained = unsafe { + msg_send_id![ + CBPeripheralManager::alloc(), + initWithDelegate: Some(ProtocolObject::::from_ref(&*delegate)), + queue: queue + ] + }; + + // Set the peripheral manager reference in the delegate so it can trigger sends + delegate.set_peripheral_manager(peripheral.clone()); + + let manager = Arc::new(Self { + peripheral, + delegate: delegate.clone(), + }); + + // Set up a GCD timer to periodically check for outgoing data + // This ensures we don't miss data due to timing issues + unsafe { + use std::ffi::c_void; + extern "C" { + fn dispatch_source_create( + type_: *const c_void, + handle: usize, + mask: usize, + queue: *mut objc2::runtime::AnyObject, + ) -> *mut c_void; + fn dispatch_source_set_timer( + source: *mut c_void, + start: u64, + interval: u64, + leeway: u64, + ); + fn dispatch_source_set_event_handler_f( + source: *mut c_void, + handler: extern "C" fn(*mut c_void), + ); + fn dispatch_set_context(object: *mut c_void, context: *mut c_void); + fn dispatch_resume(object: *mut c_void); + fn dispatch_get_global_queue( + identifier: i64, + flags: usize, + ) -> *mut objc2::runtime::AnyObject; + + static _dispatch_source_type_timer: c_void; + } + + // Create a timer on a global queue + let timer_queue = dispatch_get_global_queue(0, 0); + let timer = + dispatch_source_create(&_dispatch_source_type_timer as *const _, 0, 0, timer_queue); + + // Set timer to fire every 10ms + let start = 0u64; // DISPATCH_TIME_NOW + let interval = 10_000_000u64; // 10ms in nanoseconds + let leeway = 1_000_000u64; // 1ms leeway + dispatch_source_set_timer(timer, start, interval, leeway); + + // Store delegate as context + let delegate_ptr = Box::into_raw(Box::new(delegate.clone())) as *mut c_void; + dispatch_set_context(timer, delegate_ptr); + + // Set event handler + extern "C" fn timer_handler(context: *mut c_void) { + unsafe { + let delegate_ptr = context as *const Retained; + if !delegate_ptr.is_null() { + (*delegate_ptr).check_and_send_data(); + } + } + } + dispatch_source_set_event_handler_f(timer, timer_handler); + + // Start the timer + dispatch_resume(timer); + + // Note: We intentionally don't store the timer - it will run for the lifetime of the program + } + + log::info!("Peripheral manager created with outgoing data handling"); + log::info!("Note: Outgoing data is sent reactively and polled every 10ms"); + + Ok(manager) + } +} + +impl Drop for BlePeripheralManager { + fn drop(&mut self) { + log::info!("Dropping BLE peripheral manager"); + // Note: CoreBluetooth will clean up advertising when the peripheral manager is deallocated + } +} + +unsafe impl Send for BlePeripheralManager {} +unsafe impl Sync for BlePeripheralManager {} + +/// Convert a UUID to CBUUID +fn uuid_to_cbuuid(uuid: &Uuid) -> Retained { + let uuid_str = NSString::from_str(&uuid.to_string()); + unsafe { CBUUID::UUIDWithString(&uuid_str) } +} diff --git a/transports/bluetooth/src/platform.rs b/transports/bluetooth/src/platform.rs new file mode 100644 index 00000000000..4f55388377e --- /dev/null +++ b/transports/bluetooth/src/platform.rs @@ -0,0 +1,804 @@ +//! Cross-platform Bluetooth transport implementation using SimpleBLE + CoreBluetooth. +//! +//! This module provides a dual-role BLE transport for libp2p: +//! - SimpleBLE for the central (dialing) role +//! - CoreBluetooth for the peripheral (listening) role on macOS + +use std::{ + collections::{HashMap, VecDeque}, + pin::Pin, + sync::Arc, + task::{Context, Poll, Waker}, + time::Duration, +}; + +use futures::{ + channel::mpsc, + future::{self, Ready}, + prelude::*, + StreamExt, +}; +use libp2p_core::{ + multiaddr::Multiaddr, + transport::{DialOpts, ListenerId, Transport, TransportError, TransportEvent}, +}; +use parking_lot::Mutex; +use rw_stream_sink::RwStreamSink; +use simplersble::{Adapter, Peripheral, ScanEvent}; + +use crate::common::BluetoothTransportError; +use crate::framing::FrameCodec; + +#[cfg(target_os = "macos")] +use crate::peripheral_macos::BlePeripheralManager; + +/// libp2p BLE service UUID - this identifies our service +const LIBP2P_SERVICE_UUID: &str = "00001234-0000-1000-8000-00805f9b34fb"; + +/// Characteristic UUID for RX (receiving data from peer - we read/subscribe) +const RX_CHARACTERISTIC_UUID: &str = "00001235-0000-1000-8000-00805f9b34fb"; + +/// Characteristic UUID for TX (transmitting data to peer - we write) +const TX_CHARACTERISTIC_UUID: &str = "00001236-0000-1000-8000-00805f9b34fb"; + +pub type Channel = RwStreamSink>; + +fn scan_timeout_duration() -> Duration { + const ENV_VAR: &str = "LIBP2P_BLUETOOTH_SCAN_TIMEOUT_SECS"; + match std::env::var(ENV_VAR) { + Ok(value) => match value.parse::() { + Ok(secs) => Duration::from_secs(secs), + Err(err) => { + log::warn!( + "Failed to parse {}='{}' as seconds: {}. Falling back to default.", + ENV_VAR, + value, + err + ); + Duration::from_secs(20) + } + }, + Err(std::env::VarError::NotPresent) => Duration::from_secs(20), + Err(err) => { + log::warn!( + "Could not read {}: {}. Falling back to default.", + ENV_VAR, + err + ); + Duration::from_secs(20) + } + } +} + +fn scan_collection_duration() -> Duration { + const ENV_VAR: &str = "LIBP2P_BLUETOOTH_SCAN_COLLECTION_MS"; + match std::env::var(ENV_VAR) { + Ok(value) => match value.parse::() { + Ok(ms) => Duration::from_millis(ms), + Err(err) => { + log::warn!( + "Failed to parse {}='{}' as milliseconds: {}. Falling back to default.", + ENV_VAR, + value, + err + ); + Duration::from_millis(5_000) + } + }, + Err(std::env::VarError::NotPresent) => Duration::from_millis(5_000), + Err(err) => { + log::warn!( + "Could not read {}: {}. Falling back to default.", + ENV_VAR, + err + ); + Duration::from_millis(5_000) + } + } +} + +/// Channel implementation for BLE connections +pub struct Chan> { + incoming: mpsc::Receiver, + outgoing: mpsc::Sender, +} + +impl Unpin for Chan {} + +impl Stream for Chan { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Stream::poll_next(Pin::new(&mut self.incoming), cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Some(item)) => Poll::Ready(Some(Ok(item))), + Poll::Ready(None) => Poll::Ready(None), + } + } +} + +impl Sink for Chan { + type Error = std::io::Error; + + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_ready(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } + + fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + Sink::start_send(Pin::new(&mut self.outgoing), item).map_err(map_channel_err) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_flush(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Sink::poll_close(Pin::new(&mut self.outgoing), cx).map_err(map_channel_err) + } +} + +fn map_channel_err(error: mpsc::SendError) -> std::io::Error { + if error.is_full() { + std::io::Error::new(std::io::ErrorKind::WouldBlock, error) + } else { + std::io::Error::new(std::io::ErrorKind::BrokenPipe, "channel closed") + } +} + +/// Bluetooth transport using SimpleBLE +pub struct BluetoothTransport { + inner: Arc>, + waker: Arc>>, +} + +struct TransportState { + adapter: Option, + #[cfg(target_os = "macos")] + peripheral_manager: Option>, + listeners: HashMap, + connections: HashMap, +} + +struct Listener { + id: ListenerId, + addr: Multiaddr, + incoming: VecDeque<(Channel>, Multiaddr)>, + peripheral_incoming_rx: Option>>, + peripheral_outgoing_tx: Option>>, + announced: bool, + // Active connection's incoming channel for forwarding peripheral data + active_connection_tx: Option>>, +} + +struct Connection { + _peripheral: Peripheral, + _service_uuid: String, + _tx_char_uuid: String, + _rx_char_uuid: String, +} + +impl BluetoothTransport { + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(TransportState { + adapter: None, + #[cfg(target_os = "macos")] + peripheral_manager: None, + listeners: HashMap::new(), + connections: HashMap::new(), + })), + waker: Arc::new(Mutex::new(None)), + } + } + + /// Initialize the BLE adapter (lazy initialization) + async fn ensure_adapter( + inner: &Arc>, + ) -> Result { + // Check if we already have an adapter + { + let state = inner.lock(); + if let Some(adapter) = state.adapter.as_ref() { + return Ok(adapter.clone()); + } + } + + // Get available adapters + let adapters = Adapter::get_adapters().map_err(|e| { + log::error!("Failed to get BLE adapters: {:?}", e); + BluetoothTransportError::Unsupported + })?; + + if adapters.is_empty() { + log::error!("No BLE adapters found"); + return Err(BluetoothTransportError::Unsupported); + } + + let adapter = adapters.into_iter().next().unwrap(); + log::info!("Initialized BLE adapter"); + + // Store the adapter + { + let mut state = inner.lock(); + state.adapter = Some(adapter.clone()); + } + + Ok(adapter) + } +} + +impl Clone for BluetoothTransport { + fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + waker: Arc::clone(&self.waker), + } + } +} + +impl Transport for BluetoothTransport { + type Output = Channel>; + type Error = BluetoothTransportError; + type ListenerUpgrade = Ready>; + type Dial = Pin> + Send>>; + + fn listen_on( + &mut self, + id: ListenerId, + addr: Multiaddr, + ) -> Result<(), TransportError> { + log::info!("Starting BLE peripheral (listening) on {}", addr); + + let inner = Arc::clone(&self.inner); + let waker = Arc::clone(&self.waker); + + // Spawn task to start peripheral + tokio::spawn(async move { + // Create channels for incoming/outgoing data from peripheral + let (peripheral_incoming_tx, peripheral_incoming_rx) = mpsc::channel(32); + let (peripheral_outgoing_tx, peripheral_outgoing_rx) = mpsc::channel(32); + + // Start peripheral manager on macOS + #[cfg(target_os = "macos")] + { + let should_start = { + let state = inner.lock(); + state.peripheral_manager.is_none() + }; + + if should_start { + match BlePeripheralManager::new( + peripheral_incoming_tx.clone(), + peripheral_outgoing_rx, + ) + .await + { + Ok(peripheral) => { + { + let mut state = inner.lock(); + state.peripheral_manager = Some(peripheral); + log::info!("Started BLE peripheral manager"); + } + + // Give CoreBluetooth time to initialize and start advertising + // The peripheral manager needs its dispatch queue to process callbacks + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + } + Err(e) => { + log::error!("Failed to start peripheral: {:?}", e); + return; + } + } + } + } + + #[cfg(not(target_os = "macos"))] + { + log::warn!("BLE peripheral mode not supported on this platform"); + } + + // Update listener with peripheral channels + let listener = Listener { + id, + addr: addr.clone(), + incoming: VecDeque::new(), + peripheral_incoming_rx: Some(peripheral_incoming_rx), + peripheral_outgoing_tx: Some(peripheral_outgoing_tx), + announced: false, + active_connection_tx: None, + }; + + let mut state = inner.lock(); + state.listeners.insert(id, listener); + drop(state); + + // Wake the transport to announce the new address + if let Some(waker) = waker.lock().as_ref() { + waker.wake_by_ref(); + } + + log::info!("Peripheral listening on {}", addr); + }); + + Ok(()) + } + + fn remove_listener(&mut self, id: ListenerId) -> bool { + let mut state = self.inner.lock(); + state.listeners.remove(&id).is_some() + } + + fn dial( + &mut self, + addr: Multiaddr, + _opts: DialOpts, + ) -> Result> { + let inner = Arc::clone(&self.inner); + + log::info!("Dialing Bluetooth address: {}", addr); + + Ok(Box::pin(async move { + // Ensure we have an adapter + let adapter = Self::ensure_adapter(&inner).await?; + + // Start scanning + log::info!("Starting BLE scan..."); + + adapter.scan_start().map_err(|e| { + log::error!("Failed to start scan: {:?}", e); + BluetoothTransportError::ConnectionFailed + })?; + + // Get scan events stream + let mut scan_stream = adapter.on_scan_event(); + + let scan_timeout = scan_timeout_duration(); + let collection_window = scan_collection_duration(); + + // Wait for peripherals advertising our service + log::info!( + "Scanning for peripherals with service {} (timeout: {:?}, warmup: {:?})...", + LIBP2P_SERVICE_UUID, + scan_timeout, + collection_window + ); + + let timeout_sleep = tokio::time::sleep(scan_timeout); + tokio::pin!(timeout_sleep); + + let mut discovered_peripherals: Vec = Vec::new(); + let mut found_peripheral: Option = None; + + // Collect peripherals for a bit to avoid connecting to ourselves + let collection_sleep = tokio::time::sleep(collection_window); + tokio::pin!(collection_sleep); + + // First, collect all peripherals for the warmup duration + loop { + tokio::select! { + Some(event) = scan_stream.next() => { + match event { + Ok(ScanEvent::Found(peripheral)) => { + let id = peripheral.identifier().unwrap_or_else(|_| "unknown".to_string()); + let address = peripheral.address().unwrap_or_else(|_| "unknown".to_string()); + + log::debug!("Found peripheral - ID: '{}', Address: '{}'", id, address); + discovered_peripherals.push(peripheral); + } + Ok(ScanEvent::Updated(_)) => { + // Ignore updates + } + Ok(ScanEvent::Start) => { + log::debug!("Scan started"); + } + Ok(ScanEvent::Stop) => { + log::debug!("Scan stopped"); + } + Err(e) => { + log::error!("Scan error: {:?}", e); + } + } + } + _ = &mut collection_sleep => { + log::info!("Collected {} peripherals", discovered_peripherals.len()); + break; + } + _ = &mut timeout_sleep => { + adapter.scan_stop().ok(); + log::error!("Scan timeout - no peripherals found"); + return Err(BluetoothTransportError::ConnectionFailed); + } + } + } + + // Try to connect to each peripheral until we find one with our service + for peripheral in discovered_peripherals { + let id = peripheral + .identifier() + .unwrap_or_else(|_| "unknown".to_string()); + let address = peripheral + .address() + .unwrap_or_else(|_| "unknown".to_string()); + + log::info!("Trying peripheral - ID: '{}', Address: '{}'", id, address); + + // Check if peripheral is already connected + if peripheral.is_connected().unwrap_or(false) { + log::warn!("Peripheral is already connected, skipping..."); + continue; + } + + // Try to connect + match peripheral.connect() { + Ok(_) => { + log::info!("Successfully connected to peripheral! Checking services..."); + + // Get services to verify this peripheral has our service + match peripheral.services() { + Ok(services) => { + log::info!( + "Found {} service(s) on peripheral {}", + services.len(), + address + ); + + // Log all services for debugging + for s in &services { + log::info!(" Service on {}: {}", address, s.uuid()); + } + + // Check if this peripheral has our libp2p service + let has_libp2p_service = services.iter().any(|s| { + let uuid = s.uuid().to_lowercase(); + uuid == LIBP2P_SERVICE_UUID.to_lowercase() + }); + + if has_libp2p_service { + log::info!( + "✓ Peripheral {} has the libp2p service {}!", + address, + LIBP2P_SERVICE_UUID + ); + found_peripheral = Some(peripheral); + break; + } else { + log::info!("✗ Peripheral {} doesn't have libp2p service (expected {}), disconnecting...", address, LIBP2P_SERVICE_UUID); + peripheral.disconnect().ok(); + continue; + } + } + Err(e) => { + log::warn!("Failed to get services from peripheral {}: {:?}, disconnecting...", address, e); + peripheral.disconnect().ok(); + continue; + } + } + } + Err(e) => { + log::warn!( + "Failed to connect to peripheral {}: {:?}, trying next...", + address, + e + ); + continue; + } + } + } + + let peripheral = found_peripheral.ok_or_else(|| { + log::error!("Could not find any peripherals with the libp2p service"); + BluetoothTransportError::ConnectionFailed + })?; + + // Stop scanning after successful connection + adapter.scan_stop().ok(); + + log::info!( + "Connected to peripheral with libp2p service! Discovering characteristics..." + ); + + // Get services (we already checked it has our service above) + let services = peripheral.services().map_err(|e| { + log::error!("Failed to get services: {:?}", e); + BluetoothTransportError::ConnectionFailed + })?; + + // Find our service + let service = services + .iter() + .find(|s| { + let uuid = s.uuid().to_lowercase(); + uuid == LIBP2P_SERVICE_UUID.to_lowercase() + }) + .ok_or_else(|| { + log::error!( + "Service {} not found (this shouldn't happen)", + LIBP2P_SERVICE_UUID + ); + BluetoothTransportError::ConnectionFailed + })?; + + log::info!("Found libp2p service"); + + // Find our characteristics + let characteristics = service.characteristics(); + log::info!("Found {} characteristics", characteristics.len()); + + let rx_char = characteristics + .iter() + .find(|c| { + let uuid = c.uuid().to_lowercase(); + uuid == RX_CHARACTERISTIC_UUID.to_lowercase() + }) + .ok_or_else(|| { + log::error!("RX characteristic not found"); + log::info!("Available characteristics:"); + for c in &characteristics { + log::info!(" - {}", c.uuid()); + } + BluetoothTransportError::ConnectionFailed + })?; + + let tx_char = characteristics + .iter() + .find(|c| { + let uuid = c.uuid().to_lowercase(); + uuid == TX_CHARACTERISTIC_UUID.to_lowercase() + }) + .ok_or_else(|| { + log::error!("TX characteristic not found"); + BluetoothTransportError::ConnectionFailed + })?; + + log::info!("Found RX and TX characteristics"); + + // Create channels for this connection + let (in_tx, in_rx) = mpsc::channel::>(32); + let (out_tx, mut out_rx) = mpsc::channel::>(32); + + // Subscribe to notifications on TX characteristic (peripheral transmits, we receive) + let service_uuid = service.uuid(); + let rx_char_uuid = rx_char.uuid(); + let tx_char_uuid = tx_char.uuid(); + + // Clone peripheral for the notification task + let peripheral_for_notify = peripheral.clone(); + let in_tx_clone = in_tx.clone(); + let service_uuid_for_notify = service_uuid.clone(); + let tx_char_uuid_for_notify = tx_char_uuid.clone(); + + tokio::spawn(async move { + match peripheral_for_notify + .notify(&service_uuid_for_notify, &tx_char_uuid_for_notify) + { + Ok(mut notification_stream) => { + let mut frame_codec = FrameCodec::new(); + + while let Some(event) = notification_stream.next().await { + match event { + Ok(simplersble::ValueChangedEvent::ValueUpdated(data)) => { + log::debug!("Received notification: {} bytes", data.len()); + frame_codec.push_data(&data); + + while let Ok(Some(frame)) = frame_codec.decode_next() { + log::debug!("Decoded frame: {} bytes", frame.len()); + let _ = in_tx_clone.clone().try_send(frame); + } + } + Err(e) => { + log::error!("Notification error: {:?}", e); + break; + } + } + } + + log::info!("Notification stream ended"); + } + Err(e) => { + log::error!("Failed to subscribe to notifications: {:?}", e); + } + } + }); + + // Spawn task to handle outgoing writes to RX characteristic (peripheral receives, we write) + let peripheral_for_write = peripheral.clone(); + let service_uuid_for_write = service_uuid.clone(); + let rx_char_uuid_for_write = rx_char_uuid.clone(); + + tokio::spawn(async move { + // Longer delay to ensure connection and services are fully ready + // SimpleBLE write_request needs everything to be stable + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + while let Some(data) = out_rx.next().await { + log::debug!("Sending {} bytes", data.len()); + + // Encode the frame + let encoded = match FrameCodec::new().encode(&data) { + Ok(e) => e, + Err(e) => { + log::error!("Failed to encode frame: {:?}", e); + continue; + } + }; + + // Write the data to RX characteristic (peripheral receives) + // Use write_request (with response) to trigger didReceiveWriteRequests on peripheral + let mut retry_count = 0; + let max_retries = 3; + + loop { + match peripheral_for_write.write_request( + &service_uuid_for_write, + &rx_char_uuid_for_write, + &encoded, + ) { + Ok(_) => { + log::debug!("Wrote {} bytes", encoded.len()); + break; + } + Err(e) => { + retry_count += 1; + if retry_count >= max_retries { + log::error!( + "Failed to write after {} attempts: {:?}", + max_retries, + e + ); + break; + } + log::warn!( + "Write attempt {} failed, retrying: {:?}", + retry_count, + e + ); + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + } + } + } + } + + log::info!("Outgoing write task ended"); + }); + + // Store connection + let peripheral_id = peripheral + .identifier() + .unwrap_or_else(|_| "unknown".to_string()); + { + let mut state = inner.lock(); + state.connections.insert( + peripheral_id.clone(), + Connection { + _peripheral: peripheral.clone(), + _service_uuid: service_uuid, + _tx_char_uuid: tx_char_uuid, + _rx_char_uuid: rx_char_uuid, + }, + ); + } + + log::info!("Connection established to {}", peripheral_id); + + // Create the channel + let channel = Channel::new(Chan { + incoming: in_rx, + outgoing: out_tx, + }); + + Ok(channel) + })) + } + + fn poll( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // Store the waker + *self.waker.lock() = Some(cx.waker().clone()); + + let mut state = self.inner.lock(); + + // Check if any listeners need to announce their address + for listener in state.listeners.values_mut() { + if !listener.announced { + listener.announced = true; + let listen_addr = listener.addr.clone(); + let listener_id = listener.id; + return Poll::Ready(TransportEvent::NewAddress { + listen_addr, + listener_id, + }); + } + + // Check for incoming data from peripheral side + // Process ALL available data, not just one packet + if let Some(ref mut peripheral_rx) = listener.peripheral_incoming_rx { + loop { + match peripheral_rx.poll_next_unpin(cx) { + Poll::Ready(Some(data)) => { + // Check if we have an active connection + if let Some(ref active_tx) = listener.active_connection_tx { + // Forward data to existing connection + log::debug!("Forwarding {} bytes to active connection", data.len()); + if active_tx.clone().try_send(data).is_err() { + log::warn!("Active connection closed, removing it"); + listener.active_connection_tx = None; + } + } else { + // No active connection, this is the first data - create a new connection + log::info!( + "Received incoming peripheral connection with {} bytes", + data.len() + ); + + // Get the outgoing sender for this listener + let outgoing_tx = listener.peripheral_outgoing_tx.clone().unwrap(); + + // Create new channels for libp2p + let (in_tx, in_rx) = mpsc::channel::>(32); + let (out_tx, out_rx) = mpsc::channel::>(32); + + // Forward the first data + let _ = in_tx.clone().try_send(data); + + // Store the connection's incoming sender for future data forwarding + listener.active_connection_tx = Some(in_tx); + + // Spawn task to forward outgoing data from connection to peripheral + let mut out_rx_stream = out_rx; + tokio::spawn(async move { + while let Some(data) = out_rx_stream.next().await { + let _ = outgoing_tx.clone().try_send(data); + } + log::info!("Peripheral outgoing data forwarding task ended"); + }); + + // Create the channel + let channel = Channel::new(Chan { + incoming: in_rx, + outgoing: out_tx, + }); + + // Create a fake send_back_addr (peripheral connections don't have addresses) + let send_back_addr = listener.addr.clone(); + + listener.incoming.push_back((channel, send_back_addr)); + + // After creating connection, continue to process any buffered data + continue; + } + } + Poll::Ready(None) => { + // Peripheral channel closed + log::warn!("Peripheral incoming channel closed"); + break; + } + Poll::Pending => { + // No more data available right now + break; + } + } + } + } + + // Check for queued incoming connections + if let Some((channel, send_back_addr)) = listener.incoming.pop_front() { + return Poll::Ready(TransportEvent::Incoming { + listener_id: listener.id, + upgrade: future::ready(Ok(channel)), + local_addr: listener.addr.clone(), + send_back_addr, + }); + } + } + + Poll::Pending + } +} + +impl Default for BluetoothTransport { + fn default() -> Self { + Self::new() + } +}