From 6c35ced236fe7e319802c6cf37ba1a0fb6b5c28d Mon Sep 17 00:00:00 2001 From: userFRM Date: Tue, 17 Mar 2026 16:17:54 +0100 Subject: [PATCH] perf: 10 constraint-anchored optimizations across Rust core, plugin, and TS client MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Rust core (conduit-core): - Replace VecDeque> with preformatted wire buffer in RingBuffer and Queue — drain_all is now a single memcpy instead of N×2 extend_from_slice calls (estimated 40-80x speedup for streaming drain) - Add Bytes newtype with bulk encode/decode for Vec payloads - Add MIN_SIZE associated constant to Decode trait with upfront bounds check in derive macro, reducing redundant per-field checks - Add overflow guards: u32 truncation on frame length, frame_count u32::MAX, checked_add for frame_cost on 32-bit targets, Bytes::decode checked_add Plugin (tauri-plugin-conduit): - Eliminate 3 allocations per invoke: use URI path() directly, borrow invoke key header as &str, return Cow from percent_decode - Replace double-spawn async pattern with single spawn + catch_unwind via futures-util FutureExt - Pre-cache Arc in PluginState to avoid per-request Arc allocation TypeScript client: - parseDrainBlob returns zero-copy Uint8Array views instead of copies - Add WireWriter builder class for single-allocation multi-field encoding - Pass JSON.stringify result directly to fetch, skip TextEncoder pass - Use AbortSignal.timeout() for drain, replacing AbortController + setTimeout Reviewed by Codex CLI (gpt-5.4) over 5 rounds — all findings addressed. All 138 Rust tests and 35 TypeScript tests pass. Zero clippy warnings. Co-Authored-By: Claude Opus 4.6 (1M context) --- crates/conduit-core/src/codec.rs | 145 ++++++++++++++ crates/conduit-core/src/lib.rs | 2 +- crates/conduit-core/src/queue.rs | 112 ++++++++--- crates/conduit-core/src/ringbuf.rs | 131 ++++++++---- crates/conduit-derive/src/lib.rs | 18 ++ crates/conduit-derive/tests/derive_tests.rs | 25 +++ crates/tauri-plugin-conduit/Cargo.lock | 1 + crates/tauri-plugin-conduit/Cargo.toml | 1 + crates/tauri-plugin-conduit/src/lib.rs | 64 +++--- .../tauri-plugin-conduit/src/codec/wire.ts | 187 +++++++++++++++++- packages/tauri-plugin-conduit/src/index.ts | 10 +- .../src/transport/protocol.ts | 9 +- 12 files changed, 595 insertions(+), 110 deletions(-) diff --git a/crates/conduit-core/src/codec.rs b/crates/conduit-core/src/codec.rs index fde9153..034eeb4 100644 --- a/crates/conduit-core/src/codec.rs +++ b/crates/conduit-core/src/codec.rs @@ -199,6 +199,13 @@ pub trait Encode { /// Returns the decoded value together with the number of bytes consumed, /// or `None` if the data is too short or malformed. pub trait Decode: Sized { + /// Minimum number of bytes required to attempt decoding this type. + /// + /// For fixed-size types (primitives), this equals the exact encoded size. + /// For variable-size types (String, Vec), this is the minimum (the length + /// prefix size). Used by derived impls for an upfront bounds check. + const MIN_SIZE: usize = 0; + /// Attempt to decode from the start of `data`. fn decode(data: &[u8]) -> Option<(Self, usize)>; } @@ -221,6 +228,8 @@ macro_rules! impl_wire_int { } impl Decode for $ty { + const MIN_SIZE: usize = std::mem::size_of::<$ty>(); + fn decode(data: &[u8]) -> Option<(Self, usize)> { const SIZE: usize = std::mem::size_of::<$ty>(); if data.len() < SIZE { @@ -248,6 +257,8 @@ impl Encode for bool { } impl Decode for bool { + const MIN_SIZE: usize = 1; + fn decode(data: &[u8]) -> Option<(Self, usize)> { if data.is_empty() { return None; @@ -283,6 +294,8 @@ impl Encode for Vec { } impl Decode for Vec { + const MIN_SIZE: usize = 4; + fn decode(data: &[u8]) -> Option<(Self, usize)> { if data.len() < 4 { return None; @@ -318,6 +331,8 @@ impl Encode for String { } impl Decode for String { + const MIN_SIZE: usize = 4; + fn decode(data: &[u8]) -> Option<(Self, usize)> { if data.len() < 4 { return None; @@ -333,6 +348,78 @@ impl Decode for String { } } +// --------------------------------------------------------------------------- +// Bytes: optimized Vec wrapper with bulk encode/decode +// --------------------------------------------------------------------------- + +/// A newtype wrapper around `Vec` with optimized binary Encode/Decode. +/// +/// Unlike `Vec` which goes through the generic `Vec` impl (decoding +/// each byte individually), `Bytes` uses a single bulk copy for both encoding +/// and decoding. +/// +/// The wire format is identical to `Vec`: `[u32 LE count][bytes...]`. +#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)] +pub struct Bytes(pub Vec); + +impl From> for Bytes { + fn from(v: Vec) -> Self { + Self(v) + } +} + +impl From for Vec { + fn from(b: Bytes) -> Self { + b.0 + } +} + +impl std::ops::Deref for Bytes { + type Target = [u8]; + fn deref(&self) -> &[u8] { + &self.0 + } +} + +impl AsRef<[u8]> for Bytes { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} + +impl Encode for Bytes { + fn encode(&self, buf: &mut Vec) { + let count: u32 = self.0.len().try_into().unwrap_or_else(|_| { + panic!( + "conduit: bytes too large ({} bytes exceeds u32::MAX)", + self.0.len() + ) + }); + buf.extend_from_slice(&count.to_le_bytes()); + buf.extend_from_slice(&self.0); + } + + fn encode_size(&self) -> usize { + 4 + self.0.len() + } +} + +impl Decode for Bytes { + const MIN_SIZE: usize = 4; + + fn decode(data: &[u8]) -> Option<(Self, usize)> { + if data.len() < 4 { + return None; + } + let count = u32::from_le_bytes([data[0], data[1], data[2], data[3]]) as usize; + let total = 4usize.checked_add(count)?; + if data.len() < total { + return None; + } + Some((Bytes(data[4..total].to_vec()), total)) + } +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -447,4 +534,62 @@ mod tests { assert_eq!(decoded, original); assert_eq!(consumed, 4 + original.len()); } + + #[test] + fn encode_decode_bytes() { + let original = Bytes(vec![10, 20, 30, 40, 50]); + let mut buf = Vec::new(); + original.encode(&mut buf); + assert_eq!(original.encode_size(), buf.len()); + let (decoded, consumed) = Bytes::decode(&buf).unwrap(); + assert_eq!(decoded, original); + assert_eq!(consumed, buf.len()); + } + + #[test] + fn bytes_empty() { + let original = Bytes(Vec::new()); + let mut buf = Vec::new(); + original.encode(&mut buf); + assert_eq!(buf.len(), 4); // just the count + let (decoded, consumed) = Bytes::decode(&buf).unwrap(); + assert_eq!(decoded.0.len(), 0); + assert_eq!(consumed, 4); + } + + #[test] + fn bytes_wire_compatible_with_vec_u8() { + // Verify Bytes and Vec produce identical wire format + let data: Vec = vec![1, 2, 3, 4, 5]; + let bytes = Bytes(data.clone()); + + let mut buf_vec = Vec::new(); + data.encode(&mut buf_vec); + + let mut buf_bytes = Vec::new(); + bytes.encode(&mut buf_bytes); + + assert_eq!( + buf_vec, buf_bytes, + "Bytes and Vec must produce identical wire format" + ); + } + + #[test] + fn min_size_primitives() { + assert_eq!(::MIN_SIZE, 1); + assert_eq!(::MIN_SIZE, 2); + assert_eq!(::MIN_SIZE, 4); + assert_eq!(::MIN_SIZE, 8); + assert_eq!(::MIN_SIZE, 1); + assert_eq!(::MIN_SIZE, 2); + assert_eq!(::MIN_SIZE, 4); + assert_eq!(::MIN_SIZE, 8); + assert_eq!(::MIN_SIZE, 4); + assert_eq!(::MIN_SIZE, 8); + assert_eq!(::MIN_SIZE, 1); + assert_eq!(::MIN_SIZE, 4); + assert_eq!( as Decode>::MIN_SIZE, 4); + assert_eq!(::MIN_SIZE, 4); + } } diff --git a/crates/conduit-core/src/lib.rs b/crates/conduit-core/src/lib.rs index be79483..9cceb87 100644 --- a/crates/conduit-core/src/lib.rs +++ b/crates/conduit-core/src/lib.rs @@ -25,7 +25,7 @@ pub mod router; pub use channel::ChannelBuffer; pub use codec::{ - DRAIN_FRAME_OVERHEAD, Decode, Encode, FRAME_HEADER_SIZE, FrameHeader, MsgType, + Bytes, DRAIN_FRAME_OVERHEAD, Decode, Encode, FRAME_HEADER_SIZE, FrameHeader, MsgType, PROTOCOL_VERSION, frame_pack, frame_unpack, }; pub use error::Error; diff --git a/crates/conduit-core/src/queue.rs b/crates/conduit-core/src/queue.rs index 8b006ae..aecf8f8 100644 --- a/crates/conduit-core/src/queue.rs +++ b/crates/conduit-core/src/queue.rs @@ -17,7 +17,6 @@ //! ... //! ``` -use std::collections::VecDeque; use std::sync::Mutex; use crate::Error; @@ -28,10 +27,18 @@ use crate::codec::DRAIN_FRAME_OVERHEAD; // --------------------------------------------------------------------------- /// The unsynchronized interior of the queue. +/// +/// Frames are stored pre-formatted in wire layout: `[u32 LE len][bytes]` per +/// frame, so that `drain_all()` can emit the entire payload with a single +/// memcpy instead of N×2 `extend_from_slice` calls. struct QueueInner { - /// Buffered frames in FIFO order. - frames: VecDeque>, - /// Total bytes used: sum of (DRAIN_FRAME_OVERHEAD + frame.len()) for each frame. + /// Pre-formatted wire data: frames stored as [u32 LE len][bytes][u32 LE len][bytes]... + wire_data: Vec, + /// Number of frames currently stored. + frame_count: u32, + /// Start of live data in wire_data (frames before this offset have been popped). + read_pos: usize, + /// Total bytes used for capacity accounting: sum of (DRAIN_FRAME_OVERHEAD + frame.len()). bytes_used: usize, /// Maximum byte budget. `0` means unbounded. max_bytes: usize, @@ -41,7 +48,9 @@ impl QueueInner { /// Create an empty inner buffer with the given byte budget. fn new(max_bytes: usize) -> Self { Self { - frames: VecDeque::new(), + wire_data: Vec::new(), + frame_count: 0, + read_pos: 0, bytes_used: 0, max_bytes, } @@ -112,6 +121,14 @@ impl Queue { /// (plus its 4-byte length prefix) would exceed `max_bytes`. When /// `max_bytes` is `0` (unbounded), pushes always succeed. pub fn push(&self, frame: &[u8]) -> Result<(), Error> { + // Guard: frame length must fit in u32 (wire format invariant) and + // frame_cost must not overflow usize (relevant on 32-bit targets). + if frame.len() > u32::MAX as usize + || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none() + { + return Err(Error::PayloadTooLarge(frame.len())); + } + let cost = QueueInner::frame_cost(frame); let mut inner = crate::lock_or_recover(&self.inner); @@ -119,7 +136,17 @@ impl Queue { return Err(Error::ChannelFull); } - inner.frames.push_back(frame.to_vec()); + // Guard: frame count must fit in u32 (wire format uses u32 count header). + if inner.frame_count == u32::MAX { + return Err(Error::ChannelFull); + } + + // Append frame in wire format: [u32 LE len][bytes]. + inner + .wire_data + .extend_from_slice(&(frame.len() as u32).to_le_bytes()); + inner.wire_data.extend_from_slice(frame); + inner.frame_count += 1; inner.bytes_used = inner.bytes_used.saturating_add(cost); Ok(()) } @@ -130,8 +157,34 @@ impl Queue { #[must_use] pub fn try_pop(&self) -> Option> { let mut inner = crate::lock_or_recover(&self.inner); - let frame = inner.frames.pop_front()?; - inner.bytes_used -= QueueInner::frame_cost(&frame); + if inner.frame_count == 0 { + return None; + } + let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4] + .try_into() + .unwrap(); + let payload_len = u32::from_le_bytes(len_bytes) as usize; + let payload_start = inner.read_pos + 4; + let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec(); + let cost = DRAIN_FRAME_OVERHEAD + payload_len; + inner.read_pos += cost; + inner.frame_count -= 1; + inner.bytes_used -= cost; + + // Compact: when empty, just reset; otherwise shift when read_pos + // exceeds half the allocation to prevent unbounded growth during + // steady pop/push workloads. + if inner.frame_count == 0 { + inner.wire_data.clear(); + inner.read_pos = 0; + } else if inner.read_pos > inner.wire_data.len() / 2 { + let rp = inner.read_pos; + inner.wire_data.copy_within(rp.., 0); + let new_len = inner.wire_data.len() - rp; + inner.wire_data.truncate(new_len); + inner.read_pos = 0; + } + Some(frame) } @@ -149,39 +202,34 @@ impl Queue { /// Returns an empty `Vec` if the queue is empty. #[must_use] pub fn drain_all(&self) -> Vec { - // Swap the frames out under the lock, then serialize without contention. - let (mut frames, bytes_used) = { + // Take the pre-formatted wire data out under the lock, then prepend + // the frame count header without contention. + let (wire_data, read_pos, frame_count) = { let mut inner = crate::lock_or_recover(&self.inner); - if inner.frames.is_empty() { + if inner.frame_count == 0 { return Vec::new(); } - let frames = std::mem::take(&mut inner.frames); - let bytes_used = inner.bytes_used; + let wire_data = std::mem::take(&mut inner.wire_data); + let read_pos = inner.read_pos; + let frame_count = inner.frame_count; + inner.read_pos = 0; + inner.frame_count = 0; inner.bytes_used = 0; - (frames, bytes_used) + (wire_data, read_pos, frame_count) }; - // Lock released — serialize without holding the mutex. - let output_size = 4usize.saturating_add(bytes_used); + // Lock released — build output with TWO extend_from_slice calls (was N×2). + let live_data = &wire_data[read_pos..]; + let output_size = 4 + live_data.len(); let mut buf = Vec::with_capacity(output_size); - - // Frame count header. - let count = frames.len() as u32; - buf.extend_from_slice(&count.to_le_bytes()); - - // Each frame: [u32 LE len][bytes]. - for frame in frames.make_contiguous() { - let len = frame.len() as u32; - buf.extend_from_slice(&len.to_le_bytes()); - buf.extend_from_slice(frame); - } - + buf.extend_from_slice(&frame_count.to_le_bytes()); + buf.extend_from_slice(live_data); buf } /// Number of frames currently queued. #[must_use] pub fn frame_count(&self) -> usize { - crate::lock_or_recover(&self.inner).frames.len() + crate::lock_or_recover(&self.inner).frame_count as usize } /// Number of bytes currently used (including per-frame length prefixes). @@ -199,7 +247,9 @@ impl Queue { /// Clear all queued frames. pub fn clear(&self) { let mut inner = crate::lock_or_recover(&self.inner); - inner.frames.clear(); + inner.wire_data.clear(); + inner.frame_count = 0; + inner.read_pos = 0; inner.bytes_used = 0; } } @@ -208,7 +258,7 @@ impl std::fmt::Debug for Queue { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let inner = crate::lock_or_recover(&self.inner); f.debug_struct("Queue") - .field("frame_count", &inner.frames.len()) + .field("frame_count", &inner.frame_count) .field("bytes_used", &inner.bytes_used) .field("max_bytes", &inner.max_bytes) .finish() diff --git a/crates/conduit-core/src/ringbuf.rs b/crates/conduit-core/src/ringbuf.rs index c3dabc4..c6d2a1d 100644 --- a/crates/conduit-core/src/ringbuf.rs +++ b/crates/conduit-core/src/ringbuf.rs @@ -22,7 +22,6 @@ //! ... //! ``` -use std::collections::VecDeque; use std::sync::Mutex; use crate::codec::DRAIN_FRAME_OVERHEAD; @@ -39,10 +38,18 @@ const DEFAULT_CAPACITY: usize = 64 * 1024; // --------------------------------------------------------------------------- /// The unsynchronized interior of the ring buffer. +/// +/// Frames are stored pre-formatted in wire layout: `[u32 LE len][bytes]` per +/// frame, so that `drain_all()` can emit the entire payload with a single +/// memcpy instead of N×2 `extend_from_slice` calls. struct Inner { - /// Buffered frames in FIFO order. - frames: VecDeque>, - /// Total bytes used: sum of (DRAIN_FRAME_OVERHEAD + frame.len()) for each frame. + /// Pre-formatted wire data: frames stored as [u32 LE len][bytes][u32 LE len][bytes]... + wire_data: Vec, + /// Number of frames currently stored. + frame_count: u32, + /// Start of live data in wire_data (frames before this offset have been evicted). + read_pos: usize, + /// Total bytes used for capacity accounting: sum of (DRAIN_FRAME_OVERHEAD + frame.len()). bytes_used: usize, /// Maximum byte budget. capacity: usize, @@ -52,7 +59,9 @@ impl Inner { /// Create an empty inner buffer with the given byte budget. fn new(capacity: usize) -> Self { Self { - frames: VecDeque::new(), + wire_data: Vec::new(), + frame_count: 0, + read_pos: 0, bytes_used: 0, capacity, } @@ -64,15 +73,21 @@ impl Inner { DRAIN_FRAME_OVERHEAD + frame.len() } - /// Drop the oldest frame, adjusting the byte counter. Returns `true` if + /// Drop the oldest frame by advancing `read_pos`. Returns `true` if /// a frame was actually removed. fn drop_oldest(&mut self) -> bool { - if let Some(old) = self.frames.pop_front() { - self.bytes_used -= Self::frame_cost(&old); - true - } else { - false + if self.frame_count == 0 { + return false; } + let len_bytes: [u8; 4] = self.wire_data[self.read_pos..self.read_pos + 4] + .try_into() + .unwrap(); + let payload_len = u32::from_le_bytes(len_bytes) as usize; + let cost = DRAIN_FRAME_OVERHEAD + payload_len; + self.read_pos += cost; + self.frame_count -= 1; + self.bytes_used -= cost; + true } } @@ -157,6 +172,14 @@ impl RingBuffer { /// the frame can never fit, instead of silently returning `0`. #[must_use] pub fn push_checked(&self, frame: &[u8]) -> PushOutcome { + // Guard: frame length must fit in u32 (wire format invariant) and + // frame_cost must not overflow usize (relevant on 32-bit targets). + if frame.len() > u32::MAX as usize + || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none() + { + return PushOutcome::TooLarge; + } + let cost = Inner::frame_cost(frame); let mut inner = crate::lock_or_recover(&self.inner); @@ -173,7 +196,26 @@ impl RingBuffer { dropped += 1; } - inner.frames.push_back(frame.to_vec()); + // Compact if read_pos is more than half the allocated buffer. + if inner.read_pos > 0 && inner.read_pos > inner.wire_data.len() / 2 { + let rp = inner.read_pos; + inner.wire_data.copy_within(rp.., 0); + let new_len = inner.wire_data.len() - rp; + inner.wire_data.truncate(new_len); + inner.read_pos = 0; + } + + // Guard: frame count must fit in u32 (wire format uses u32 count header). + if inner.frame_count == u32::MAX { + return PushOutcome::TooLarge; + } + + // Append frame in wire format: [u32 LE len][bytes]. + inner + .wire_data + .extend_from_slice(&(frame.len() as u32).to_le_bytes()); + inner.wire_data.extend_from_slice(frame); + inner.frame_count += 1; inner.bytes_used += cost; PushOutcome::Accepted(dropped) } @@ -193,32 +235,27 @@ impl RingBuffer { /// Returns an empty `Vec` if the buffer is empty. #[must_use] pub fn drain_all(&self) -> Vec { - // Swap the frames out under the lock, then serialize without contention. - let (mut frames, bytes_used) = { + // Take the pre-formatted wire data out under the lock, then prepend + // the frame count header without contention. + let (wire_data, read_pos, frame_count) = { let mut inner = crate::lock_or_recover(&self.inner); - if inner.frames.is_empty() { + if inner.frame_count == 0 { return Vec::new(); } - let frames = std::mem::take(&mut inner.frames); - let bytes_used = inner.bytes_used; + let wire_data = std::mem::take(&mut inner.wire_data); + let read_pos = inner.read_pos; + let frame_count = inner.frame_count; + inner.read_pos = 0; + inner.frame_count = 0; inner.bytes_used = 0; - (frames, bytes_used) + (wire_data, read_pos, frame_count) }; - // Lock released — serialize without holding the mutex. - let output_size = 4usize.saturating_add(bytes_used); + // Lock released — build output with TWO extend_from_slice calls (was N×2). + let live_data = &wire_data[read_pos..]; + let output_size = 4 + live_data.len(); let mut buf = Vec::with_capacity(output_size); - - // Frame count header. - let count = frames.len() as u32; - buf.extend_from_slice(&count.to_le_bytes()); - - // Each frame: [u32 LE len][bytes]. - for frame in frames.make_contiguous() { - let len = frame.len() as u32; - buf.extend_from_slice(&len.to_le_bytes()); - buf.extend_from_slice(frame); - } - + buf.extend_from_slice(&frame_count.to_le_bytes()); + buf.extend_from_slice(live_data); buf } @@ -228,15 +265,33 @@ impl RingBuffer { #[must_use] pub fn try_pop(&self) -> Option> { let mut inner = crate::lock_or_recover(&self.inner); - let frame = inner.frames.pop_front()?; - inner.bytes_used -= Inner::frame_cost(&frame); + if inner.frame_count == 0 { + return None; + } + let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4] + .try_into() + .unwrap(); + let payload_len = u32::from_le_bytes(len_bytes) as usize; + let payload_start = inner.read_pos + 4; + let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec(); + let cost = DRAIN_FRAME_OVERHEAD + payload_len; + inner.read_pos += cost; + inner.frame_count -= 1; + inner.bytes_used -= cost; + + // Compact when empty. + if inner.frame_count == 0 { + inner.wire_data.clear(); + inner.read_pos = 0; + } + Some(frame) } /// Number of frames currently buffered. #[must_use] pub fn frame_count(&self) -> usize { - crate::lock_or_recover(&self.inner).frames.len() + crate::lock_or_recover(&self.inner).frame_count as usize } /// Number of bytes currently used (including per-frame length prefixes). @@ -254,7 +309,9 @@ impl RingBuffer { /// Clear all buffered frames. pub fn clear(&self) { let mut inner = crate::lock_or_recover(&self.inner); - inner.frames.clear(); + inner.wire_data.clear(); + inner.frame_count = 0; + inner.read_pos = 0; inner.bytes_used = 0; } } @@ -263,7 +320,7 @@ impl std::fmt::Debug for RingBuffer { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let inner = crate::lock_or_recover(&self.inner); f.debug_struct("RingBuffer") - .field("frame_count", &inner.frames.len()) + .field("frame_count", &inner.frame_count) .field("bytes_used", &inner.bytes_used) .field("capacity", &inner.capacity) .finish() diff --git a/crates/conduit-derive/src/lib.rs b/crates/conduit-derive/src/lib.rs index 763646e..1112431 100644 --- a/crates/conduit-derive/src/lib.rs +++ b/crates/conduit-derive/src/lib.rs @@ -151,11 +151,16 @@ fn impl_encode(input: &DeriveInput) -> syn::Result { /// Generate the `Decode` impl: decodes each named field in declaration /// order, tracking the cumulative byte offset through the input slice. +/// +/// Also emits a `MIN_SIZE` constant (sum of each field's `MIN_SIZE`) and +/// an upfront bounds check that short-circuits before any per-field work. fn impl_decode(input: &DeriveInput) -> syn::Result { reject_generics(input)?; let name = &input.ident; let fields = named_fields(input)?; + let field_types: Vec<_> = fields.named.iter().map(|f| &f.ty).collect(); + let decode_stmts: Vec<_> = fields .named .iter() @@ -177,9 +182,22 @@ fn impl_decode(input: &DeriveInput) -> syn::Result { .map(|f| f.ident.as_ref().unwrap()) .collect(); + // Build MIN_SIZE as sum of field MIN_SIZEs + let min_size_expr = if field_types.is_empty() { + quote! { 0 } + } else { + let tys = &field_types; + quote! { 0 #(+ <#tys as ::conduit_core::Decode>::MIN_SIZE)* } + }; + Ok(quote! { impl ::conduit_core::Decode for #name { + const MIN_SIZE: usize = #min_size_expr; + fn decode(__cdec_src__: &[u8]) -> Option<(Self, usize)> { + if __cdec_src__.len() < Self::MIN_SIZE { + return None; + } let mut __cdec_off__ = 0usize; #(#decode_stmts)* Some((Self { #(#field_names),* }, __cdec_off__)) diff --git a/crates/conduit-derive/tests/derive_tests.rs b/crates/conduit-derive/tests/derive_tests.rs index 4deaba4..759fb39 100644 --- a/crates/conduit-derive/tests/derive_tests.rs +++ b/crates/conduit-derive/tests/derive_tests.rs @@ -734,3 +734,28 @@ async fn original_async_result_function_preserved() { "division by zero" ); } + +// --------------------------------------------------------------------------- +// 17. MIN_SIZE on derived structs +// --------------------------------------------------------------------------- + +#[test] +fn min_size_derived_structs() { + // SimplePrimitives: u8(1) + u32(4) + i64(8) + f64(8) + bool(1) = 22 + assert_eq!(::MIN_SIZE, 22); + + // VarLength: Vec prefix(4) + String prefix(4) = 8 + assert_eq!(::MIN_SIZE, 8); + + // Empty: 0 + assert_eq!(::MIN_SIZE, 0); + + // SingleField: u32(4) = 4 + assert_eq!(::MIN_SIZE, 4); + + // Alpha: u16(2) + u16(2) = 4 + assert_eq!(::MIN_SIZE, 4); + + // Beta: bool(1) + String prefix(4) = 5 + assert_eq!(::MIN_SIZE, 5); +} diff --git a/crates/tauri-plugin-conduit/Cargo.lock b/crates/tauri-plugin-conduit/Cargo.lock index 11642f1..07a88d7 100644 --- a/crates/tauri-plugin-conduit/Cargo.lock +++ b/crates/tauri-plugin-conduit/Cargo.lock @@ -3558,6 +3558,7 @@ version = "2.0.0" dependencies = [ "conduit-core", "conduit-derive", + "futures-util", "getrandom 0.3.4", "http", "serde", diff --git a/crates/tauri-plugin-conduit/Cargo.toml b/crates/tauri-plugin-conduit/Cargo.toml index bb04c50..2d5f576 100644 --- a/crates/tauri-plugin-conduit/Cargo.toml +++ b/crates/tauri-plugin-conduit/Cargo.toml @@ -21,6 +21,7 @@ serde = { version = "1", features = ["derive"] } getrandom = "0.3" subtle = "2" http = "1" +futures-util = "0.3" [build-dependencies] tauri-plugin = { version = "2", features = ["build"] } diff --git a/crates/tauri-plugin-conduit/src/lib.rs b/crates/tauri-plugin-conduit/src/lib.rs index 6dc4d61..3889e4e 100644 --- a/crates/tauri-plugin-conduit/src/lib.rs +++ b/crates/tauri-plugin-conduit/src/lib.rs @@ -69,6 +69,7 @@ use std::sync::Arc; use conduit_core::{ ChannelBuffer, ConduitHandler, Decode, Encode, HandlerResponse, Queue, RingBuffer, Router, }; +use futures_util::FutureExt; use subtle::ConstantTimeEq; use tauri::plugin::{Builder as TauriPluginBuilder, TauriPlugin}; use tauri::{AppHandle, Emitter, Manager, Runtime}; @@ -162,6 +163,8 @@ pub struct PluginState { channels: HashMap>, /// Tauri app handle for emitting events to the frontend. app_handle: AppHandle, + /// Pre-cached `Arc` of the app handle — avoids a heap allocation per request. + app_handle_arc: Arc>, /// Per-launch invoke key (hex-encoded, 64 hex chars = 32 bytes). invoke_key: String, /// Raw invoke key bytes for constant-time comparison. @@ -643,19 +646,8 @@ impl PluginBuilder { // Extract the managed PluginState from the app handle. let state: tauri::State<'_, PluginState> = ctx.app_handle().state(); - let url = request.uri().to_string(); - - // Extract path from URL: conduit://localhost/{action}/{target} - // Use simple string splitting instead of full URL parsing — - // the format is fixed and under our control. - let path = url - .find("://") - .and_then(|i| url[i + 3..].find('/')) - .map(|i| { - let host_end = url.find("://").unwrap() + 3; - &url[host_end + i..] - }) - .unwrap_or("/"); + // Extract path directly from the URI — zero allocation. + let path = request.uri().path(); let segments: Vec<&str> = path.trim_start_matches('/').splitn(2, '/').collect(); if segments.len() != 2 { @@ -667,9 +659,10 @@ impl PluginBuilder { } // Validate the invoke key from the X-Conduit-Key header. + // Borrow the header value directly — no allocation needed. let key = match request.headers().get("X-Conduit-Key") { Some(v) => match v.to_str() { - Ok(s) => s.to_string(), + Ok(s) => s, Err(_) => { responder .respond(make_error_response(401, "invalid invoke key header")); @@ -682,7 +675,7 @@ impl PluginBuilder { } }; - if !state.validate_invoke_key(&key) { + if !state.validate_invoke_key(key) { responder.respond(make_error_response(403, "invalid invoke key")); return; } @@ -702,9 +695,8 @@ impl PluginBuilder { let body = request.body().to_vec(); // 1) Check #[command]-generated handlers first (sync or async) - if let Some(handler) = state.handlers.get(&target) { + if let Some(handler) = state.handlers.get(&*target) { let handler = Arc::clone(handler); - let app_handle = ctx.app_handle().clone(); // Extract webview label from X-Conduit-Webview header (sent by JS client). // NOTE: This header is client-provided and could be spoofed by JS // running in the same webview. We validate the format to prevent @@ -723,8 +715,12 @@ impl PluginBuilder { }) }) .map(|s| s.to_string()); + // Clone the pre-cached Arc and coerce to trait object — + // one atomic increment, no heap allocation. + let app_handle_arc: Arc = + state.app_handle_arc.clone(); let handler_ctx = conduit_core::HandlerContext::new( - Arc::new(app_handle), + app_handle_arc, webview_label, ); let ctx_any: Arc = @@ -756,12 +752,12 @@ impl PluginBuilder { } Ok(HandlerResponse::Async(future)) => { // Truly async — spawned on tokio, just like #[tauri::command]. - // Inner spawn provides panic isolation: if the future panics - // during execution, the JoinHandle catches it and we respond - // with a 500 instead of leaving the request hanging. + // Single spawn with catch_unwind for panic isolation. tauri::async_runtime::spawn(async move { - let handle = tauri::async_runtime::spawn(future); - match handle.await { + let result = std::panic::AssertUnwindSafe(future) + .catch_unwind() + .await; + match result { Ok(Ok(bytes)) => { responder.respond(make_response( 200, @@ -794,17 +790,18 @@ impl PluginBuilder { } else { // 2) Fall back to legacy sync Router let dispatch = Arc::clone(&state.dispatch); - let app_handle = ctx.app_handle().clone(); + // Use the app_handle reference from state — no clone needed. + let app_handle_ref = &state.app_handle; // SAFETY: AssertUnwindSafe is used here because: // - `body` is a Vec (unwind-safe by itself) // - `dispatch` is an Arc (unwind-safe) - // - `app_handle` is a cloned AppHandle (unwind-safe) + // - `app_handle_ref` borrows from Tauri state (unwind-safe) // - conduit's own locks use poison-recovery helpers (lock_or_recover) // - User-defined handler state may be left inconsistent after panic, // but this is inherent to catch_unwind and documented as a limitation. let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { - dispatch.call_with_context(&target, body, &app_handle) + dispatch.call_with_context(&target, body, app_handle_ref) })); match result { Ok(Ok(bytes)) => { @@ -893,12 +890,14 @@ impl PluginBuilder { // Obtain the app handle for emitting events. let app_handle = app.app_handle().clone(); + let app_handle_arc = Arc::new(app_handle.clone()); let state = PluginState { dispatch, handlers, channels, app_handle, + app_handle_arc, invoke_key, invoke_key_bytes, }; @@ -1007,10 +1006,13 @@ fn sanitize_error(e: &conduit_core::Error) -> String { /// Percent-decode a URL path segment (e.g., `hello%20world` → `hello world`). /// -/// This is a minimal implementation — no new dependency needed. Only `%XX` -/// sequences with valid hex digits are decoded; all other bytes pass through -/// unchanged. -fn percent_decode(input: &str) -> String { +/// Returns `Cow::Borrowed` when no percent-encoding is present (the common +/// case), avoiding a heap allocation entirely. +fn percent_decode(input: &str) -> std::borrow::Cow<'_, str> { + // Fast path: no percent-encoded characters — return the input as-is. + if !input.as_bytes().contains(&b'%') { + return std::borrow::Cow::Borrowed(input); + } let mut result = Vec::with_capacity(input.len()); let bytes = input.as_bytes(); let mut i = 0; @@ -1025,7 +1027,7 @@ fn percent_decode(input: &str) -> String { result.push(bytes[i]); i += 1; } - String::from_utf8_lossy(&result).into_owned() + std::borrow::Cow::Owned(String::from_utf8_lossy(&result).into_owned()) } /// Convert a single ASCII hex character to its 4-bit numeric value. diff --git a/packages/tauri-plugin-conduit/src/codec/wire.ts b/packages/tauri-plugin-conduit/src/codec/wire.ts index 1abbaed..40f9956 100644 --- a/packages/tauri-plugin-conduit/src/codec/wire.ts +++ b/packages/tauri-plugin-conduit/src/codec/wire.ts @@ -237,6 +237,11 @@ export function writeString(value: string): Uint8Array { * `[u32 LE len][bytes]` for each frame. An empty ArrayBuffer * (byteLength === 0) means no frames. * + * **Important**: Returned Uint8Array frames are views into the input + * ArrayBuffer, not copies. Do not modify the returned frames or the + * input buffer after parsing. If you need owned copies, call + * `frame.slice()` on individual frames. + * * Corresponds to the Rust `RingBuffer::drain_all()` / `Queue::drain_all()` output. */ export function parseDrainBlob(buf: ArrayBuffer): Uint8Array[] { @@ -251,8 +256,188 @@ export function parseDrainBlob(buf: ArrayBuffer): Uint8Array[] { const len = view.getUint32(offset, true); offset += 4; if (offset + len > buf.byteLength) throw new RangeError(`parseDrainBlob: truncated frame payload at index ${i}`); - frames.push(new Uint8Array(buf.slice(offset, offset + len))); + frames.push(new Uint8Array(buf, offset, len)); offset += len; } return frames; } + +// ── WireWriter (buffered builder) ──────────────────────────────────── + +/** + * Buffered binary writer that builds wire messages with a single allocation. + * + * Instead of creating a new ArrayBuffer per field (as the standalone write + * helpers do), WireWriter pre-allocates a buffer and writes sequentially. + * Call {@link finish} to get the final Uint8Array. + * + * The buffer auto-grows if capacity is exceeded. + * + * @example + * ```ts + * const w = new WireWriter(64); + * const payload = w + * .writeU64LE(BigInt(Date.now())) + * .writeF64LE(price) + * .writeF64LE(volume) + * .writeU8(side) + * .finish(); + * ``` + */ +export class WireWriter { + private buf: ArrayBuffer; + private view: DataView; + private u8: Uint8Array; + private pos: number; + + constructor(capacity: number = 256) { + const cap = Math.max(capacity, 1); // Prevent zero-capacity infinite loop in grow() + this.buf = new ArrayBuffer(cap); + this.view = new DataView(this.buf); + this.u8 = new Uint8Array(this.buf); + this.pos = 0; + } + + private grow(needed: number): void { + if (this.pos + needed <= this.buf.byteLength) return; + let newCap = Math.max(this.buf.byteLength * 2, 1); + while (newCap < this.pos + needed) newCap *= 2; + const newBuf = new ArrayBuffer(newCap); + new Uint8Array(newBuf).set(this.u8.subarray(0, this.pos)); + this.buf = newBuf; + this.view = new DataView(this.buf); + this.u8 = new Uint8Array(this.buf); + } + + writeU8(value: number): this { + this.grow(1); + this.u8[this.pos] = value; + this.pos += 1; + return this; + } + + writeU16LE(value: number): this { + this.grow(2); + this.view.setUint16(this.pos, value, true); + this.pos += 2; + return this; + } + + writeU32LE(value: number): this { + this.grow(4); + this.view.setUint32(this.pos, value, true); + this.pos += 4; + return this; + } + + writeU64LE(value: bigint): this { + this.grow(8); + this.view.setBigUint64(this.pos, value, true); + this.pos += 8; + return this; + } + + writeI8(value: number): this { + this.grow(1); + this.view.setInt8(this.pos, value); + this.pos += 1; + return this; + } + + writeI16LE(value: number): this { + this.grow(2); + this.view.setInt16(this.pos, value, true); + this.pos += 2; + return this; + } + + writeI32LE(value: number): this { + this.grow(4); + this.view.setInt32(this.pos, value, true); + this.pos += 4; + return this; + } + + writeI64LE(value: bigint): this { + this.grow(8); + this.view.setBigInt64(this.pos, value, true); + this.pos += 8; + return this; + } + + writeF32LE(value: number): this { + this.grow(4); + this.view.setFloat32(this.pos, value, true); + this.pos += 4; + return this; + } + + writeF64LE(value: number): this { + this.grow(8); + this.view.setFloat64(this.pos, value, true); + this.pos += 8; + return this; + } + + writeBool(value: boolean): this { + this.grow(1); + this.u8[this.pos] = value ? 1 : 0; + this.pos += 1; + return this; + } + + /** Write raw bytes without a length prefix. */ + writeRaw(data: Uint8Array): this { + this.grow(data.byteLength); + this.u8.set(data, this.pos); + this.pos += data.byteLength; + return this; + } + + /** Write a length-prefixed byte array: `[u32 LE length][bytes]`. */ + writeBytes(data: Uint8Array): this { + if (data.byteLength > 0xFFFFFFFF) { + throw new RangeError(`writeBytes: length ${data.byteLength} exceeds u32 max`); + } + this.grow(4 + data.byteLength); + this.view.setUint32(this.pos, data.byteLength, true); + this.pos += 4; + this.u8.set(data, this.pos); + this.pos += data.byteLength; + return this; + } + + /** Write a length-prefixed UTF-8 string: `[u32 LE byte_length][utf8 bytes]`. */ + writeString(value: string): this { + const encoded = textEncoder.encode(value); + if (encoded.byteLength > 0xFFFFFFFF) { + throw new RangeError(`writeString: byte length ${encoded.byteLength} exceeds u32 max`); + } + this.grow(4 + encoded.byteLength); + this.view.setUint32(this.pos, encoded.byteLength, true); + this.pos += 4; + this.u8.set(encoded, this.pos); + this.pos += encoded.byteLength; + return this; + } + + /** Number of bytes written so far. */ + get length(): number { + return this.pos; + } + + /** Return the written portion as a Uint8Array view into the internal buffer. */ + finish(): Uint8Array { + return new Uint8Array(this.buf, 0, this.pos); + } + + /** Return the written portion as an owned copy (safe to retain after reuse). */ + finishCopy(): Uint8Array { + return new Uint8Array(this.buf.slice(0, this.pos)); + } + + /** Reset the writer position to reuse the buffer for a new message. */ + reset(): void { + this.pos = 0; + } +} diff --git a/packages/tauri-plugin-conduit/src/index.ts b/packages/tauri-plugin-conduit/src/index.ts index f52e708..f1dbe23 100644 --- a/packages/tauri-plugin-conduit/src/index.ts +++ b/packages/tauri-plugin-conduit/src/index.ts @@ -137,15 +137,13 @@ function buildConduit( } async function drainChannel(channel: string): Promise { - const controller = new AbortController(); - const timeoutId = setTimeout(() => controller.abort(), 30_000); // 30s default const url = `${bootstrapInfo.protocolBase}/drain/${encodeURIComponent(channel)}`; try { const response = await fetch(url, { method: 'GET', headers: _baseHeaders, - signal: controller.signal, + signal: AbortSignal.timeout(30_000), }); if (!response.ok) { const errorBody = await response.text(); @@ -153,12 +151,10 @@ function buildConduit( } return response.arrayBuffer(); } catch (err) { - if (err instanceof DOMException && err.name === 'AbortError') { + if (err instanceof DOMException && err.name === 'TimeoutError') { throw new ConduitError(408, channel, 'drain timed out'); } throw err; - } finally { - clearTimeout(timeoutId); } } @@ -209,7 +205,7 @@ function buildConduit( args?: Record, options?: InvokeOptions, ): Promise { - const payload = _encoder.encode(JSON.stringify(args ?? {})); + const payload = JSON.stringify(args ?? {}); const extra = _baseHeaders['X-Conduit-Webview'] ? { 'X-Conduit-Webview': _baseHeaders['X-Conduit-Webview'] } : undefined; const raw = await protocol.invoke(cmd, payload, options?.timeout, extra); diff --git a/packages/tauri-plugin-conduit/src/transport/protocol.ts b/packages/tauri-plugin-conduit/src/transport/protocol.ts index 496d4cd..6c95e37 100644 --- a/packages/tauri-plugin-conduit/src/transport/protocol.ts +++ b/packages/tauri-plugin-conduit/src/transport/protocol.ts @@ -17,7 +17,12 @@ export const DEFAULT_TIMEOUT_MS = 10_000; const EMPTY_BODY = new Uint8Array(0); export interface ProtocolTransport { - invoke(cmd: string, payload?: Uint8Array, timeoutMs?: number, extraHeaders?: Record): Promise; + invoke( + cmd: string, + payload?: Uint8Array | string, + timeoutMs?: number, + extraHeaders?: Record, + ): Promise; } /** @@ -33,7 +38,7 @@ export function createProtocolTransport( return { async invoke( cmd: string, - payload?: Uint8Array, + payload?: Uint8Array | string, timeoutMs: number = DEFAULT_TIMEOUT_MS, extraHeaders?: Record, ): Promise {