Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions crates/conduit-core/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ pub trait Encode {
/// Returns the decoded value together with the number of bytes consumed,
/// or `None` if the data is too short or malformed.
pub trait Decode: Sized {
/// Minimum number of bytes required to attempt decoding this type.
///
/// For fixed-size types (primitives), this equals the exact encoded size.
/// For variable-size types (String, Vec), this is the minimum (the length
/// prefix size). Used by derived impls for an upfront bounds check.
///
/// Defaults to `0`, so an implementor that does not override it declares
/// no minimum.
const MIN_SIZE: usize = 0;

/// Attempt to decode from the start of `data`.
///
/// On success returns `Some((value, consumed))`, where `consumed` is the
/// number of bytes read from the front of `data`. Returns `None` when
/// `data` is too short or malformed.
fn decode(data: &[u8]) -> Option<(Self, usize)>;
}
Expand All @@ -221,6 +228,8 @@ macro_rules! impl_wire_int {
}

impl Decode for $ty {
const MIN_SIZE: usize = std::mem::size_of::<$ty>();

fn decode(data: &[u8]) -> Option<(Self, usize)> {
const SIZE: usize = std::mem::size_of::<$ty>();
if data.len() < SIZE {
Expand Down Expand Up @@ -248,6 +257,8 @@ impl Encode for bool {
}

impl Decode for bool {
const MIN_SIZE: usize = 1;

fn decode(data: &[u8]) -> Option<(Self, usize)> {
if data.is_empty() {
return None;
Expand Down Expand Up @@ -283,6 +294,8 @@ impl<T: Encode> Encode for Vec<T> {
}

impl<T: Decode> Decode for Vec<T> {
const MIN_SIZE: usize = 4;

fn decode(data: &[u8]) -> Option<(Self, usize)> {
if data.len() < 4 {
return None;
Expand Down Expand Up @@ -318,6 +331,8 @@ impl Encode for String {
}

impl Decode for String {
const MIN_SIZE: usize = 4;

fn decode(data: &[u8]) -> Option<(Self, usize)> {
if data.len() < 4 {
return None;
Expand All @@ -333,6 +348,78 @@ impl Decode for String {
}
}

// ---------------------------------------------------------------------------
// Bytes: optimized Vec<u8> wrapper with bulk encode/decode
// ---------------------------------------------------------------------------

/// Owned byte buffer with a bulk (single-copy) binary `Encode`/`Decode`.
///
/// A plain `Vec<u8>` goes through the generic `Vec<T>` impl, which decodes
/// each byte individually. `Bytes` instead moves the whole payload with one
/// bulk copy, for encoding as well as decoding.
///
/// The wire format is identical to `Vec<u8>`: `[u32 LE count][bytes...]`.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct Bytes(pub Vec<u8>);

/// Wraps an existing vector; the allocation is moved, not copied.
impl From<Vec<u8>> for Bytes {
    fn from(raw: Vec<u8>) -> Self {
        Bytes(raw)
    }
}

/// Unwraps back into the underlying vector; the allocation is moved, not copied.
impl From<Bytes> for Vec<u8> {
    fn from(wrapper: Bytes) -> Self {
        let Bytes(raw) = wrapper;
        raw
    }
}

/// Gives read-only slice access, so slice methods can be called on `Bytes`.
impl std::ops::Deref for Bytes {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.0.as_slice()
    }
}

/// Borrows the contents as a byte slice.
impl AsRef<[u8]> for Bytes {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

impl Encode for Bytes {
    /// Appends `[u32 LE count][bytes...]` to `buf`.
    ///
    /// # Panics
    ///
    /// Panics if the payload length does not fit in a `u32`. The check runs
    /// before anything is written, so `buf` is left untouched in that case.
    fn encode(&self, buf: &mut Vec<u8>) {
        let payload = self.0.as_slice();
        let len = payload.len();
        let count: u32 = match len.try_into() {
            Ok(n) => n,
            Err(_) => panic!(
                "conduit: bytes too large ({} bytes exceeds u32::MAX)",
                len
            ),
        };

        // Length prefix first, then the payload as a single bulk copy.
        buf.extend_from_slice(&count.to_le_bytes());
        buf.extend_from_slice(payload);
    }

    /// Encoded size in bytes: the 4-byte count prefix plus the payload.
    fn encode_size(&self) -> usize {
        self.0.len() + 4
    }
}

impl Decode for Bytes {
    /// Just the 4-byte count prefix; the payload itself may be empty.
    const MIN_SIZE: usize = 4;

    /// Reads `[u32 LE count][bytes...]` from the front of `data`.
    ///
    /// Returns the payload together with the total number of bytes consumed
    /// (prefix + payload), or `None` if the prefix or the payload is
    /// truncated. Trailing bytes after the payload are ignored.
    fn decode(data: &[u8]) -> Option<(Self, usize)> {
        // The first four bytes carry the little-endian payload length.
        let payload_len = match data {
            &[b0, b1, b2, b3, ..] => u32::from_le_bytes([b0, b1, b2, b3]) as usize,
            _ => return None,
        };

        // Checked add: prefix + payload could overflow `usize` on narrow targets.
        let end = payload_len.checked_add(4)?;

        // `get` yields `None` when the declared payload runs past the input.
        let payload = data.get(4..end)?;
        Some((Bytes(payload.to_vec()), end))
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -447,4 +534,62 @@ mod tests {
assert_eq!(decoded, original);
assert_eq!(consumed, 4 + original.len());
}

#[test]
fn encode_decode_bytes() {
    // Round-trip a small payload and check the size bookkeeping on both sides.
    let payload = Bytes(vec![10, 20, 30, 40, 50]);

    let mut wire = Vec::new();
    payload.encode(&mut wire);
    assert_eq!(payload.encode_size(), wire.len());

    let (round_tripped, used) = Bytes::decode(&wire).unwrap();
    assert_eq!(round_tripped, payload);
    assert_eq!(used, wire.len());
}

#[test]
fn bytes_empty() {
    // An empty payload still carries its 4-byte count prefix on the wire.
    let mut wire = Vec::new();
    Bytes(Vec::new()).encode(&mut wire);
    assert_eq!(wire.len(), 4); // just the count

    let (decoded, consumed) = Bytes::decode(&wire).unwrap();
    assert!(decoded.0.is_empty());
    assert_eq!(consumed, 4);
}

#[test]
fn bytes_wire_compatible_with_vec_u8() {
    // `Bytes` documents its wire format as identical to `Vec<u8>`. Verify that
    // in both directions: the two encoders must emit the same bytes, and each
    // decoder must accept what the other type's encoder produced.
    let data: Vec<u8> = vec![1, 2, 3, 4, 5];
    let bytes = Bytes(data.clone());

    let mut buf_vec = Vec::new();
    data.encode(&mut buf_vec);

    let mut buf_bytes = Vec::new();
    bytes.encode(&mut buf_bytes);

    assert_eq!(
        buf_vec, buf_bytes,
        "Bytes and Vec<u8> must produce identical wire format"
    );

    // Vec<u8>-encoded data must decode as Bytes, consuming the whole buffer.
    let (as_bytes, used_by_bytes) =
        Bytes::decode(&buf_vec).expect("Bytes must decode Vec<u8> wire data");
    assert_eq!(as_bytes, bytes);
    assert_eq!(used_by_bytes, buf_vec.len());

    // Bytes-encoded data must decode as Vec<u8>, consuming the whole buffer.
    let (as_vec, used_by_vec) =
        Vec::<u8>::decode(&buf_bytes).expect("Vec<u8> must decode Bytes wire data");
    assert_eq!(as_vec, data);
    assert_eq!(used_by_vec, buf_bytes.len());
}

#[test]
fn min_size_primitives() {
    // (type name, declared MIN_SIZE, expected size in bytes).
    // Fixed-width types expect their exact encoded width; the length-prefixed
    // types (String, Vec<u8>, Bytes) expect the 4-byte prefix.
    let cases: [(&str, usize, usize); 14] = [
        ("u8", <u8 as Decode>::MIN_SIZE, 1),
        ("u16", <u16 as Decode>::MIN_SIZE, 2),
        ("u32", <u32 as Decode>::MIN_SIZE, 4),
        ("u64", <u64 as Decode>::MIN_SIZE, 8),
        ("i8", <i8 as Decode>::MIN_SIZE, 1),
        ("i16", <i16 as Decode>::MIN_SIZE, 2),
        ("i32", <i32 as Decode>::MIN_SIZE, 4),
        ("i64", <i64 as Decode>::MIN_SIZE, 8),
        ("f32", <f32 as Decode>::MIN_SIZE, 4),
        ("f64", <f64 as Decode>::MIN_SIZE, 8),
        ("bool", <bool as Decode>::MIN_SIZE, 1),
        ("String", <String as Decode>::MIN_SIZE, 4),
        ("Vec<u8>", <Vec<u8> as Decode>::MIN_SIZE, 4),
        ("Bytes", <Bytes as Decode>::MIN_SIZE, 4),
    ];

    for &(ty, declared, expected) in cases.iter() {
        assert_eq!(declared, expected, "unexpected MIN_SIZE for {}", ty);
    }
}
}
2 changes: 1 addition & 1 deletion crates/conduit-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod router;

pub use channel::ChannelBuffer;
pub use codec::{
DRAIN_FRAME_OVERHEAD, Decode, Encode, FRAME_HEADER_SIZE, FrameHeader, MsgType,
Bytes, DRAIN_FRAME_OVERHEAD, Decode, Encode, FRAME_HEADER_SIZE, FrameHeader, MsgType,
PROTOCOL_VERSION, frame_pack, frame_unpack,
};
pub use error::Error;
Expand Down
112 changes: 81 additions & 31 deletions crates/conduit-core/src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
//! ...
//! ```

use std::collections::VecDeque;
use std::sync::Mutex;

use crate::Error;
Expand All @@ -28,10 +27,18 @@ use crate::codec::DRAIN_FRAME_OVERHEAD;
// ---------------------------------------------------------------------------

/// The unsynchronized interior of the queue.
///
/// Frames are stored pre-formatted in wire layout: `[u32 LE len][bytes]` per
/// frame, so that `drain_all()` can emit the entire payload with a single
/// memcpy instead of N×2 `extend_from_slice` calls.
struct QueueInner {
/// Buffered frames in FIFO order.
frames: VecDeque<Vec<u8>>,
/// Total bytes used: sum of (DRAIN_FRAME_OVERHEAD + frame.len()) for each frame.
/// Pre-formatted wire data: frames stored as [u32 LE len][bytes][u32 LE len][bytes]...
wire_data: Vec<u8>,
/// Number of frames currently stored.
frame_count: u32,
/// Start of live data in wire_data (frames before this offset have been popped).
read_pos: usize,
/// Total bytes used for capacity accounting: sum of (DRAIN_FRAME_OVERHEAD + frame.len()).
bytes_used: usize,
/// Maximum byte budget. `0` means unbounded.
max_bytes: usize,
Expand All @@ -41,7 +48,9 @@ impl QueueInner {
/// Create an empty inner buffer with the given byte budget.
fn new(max_bytes: usize) -> Self {
Self {
frames: VecDeque::new(),
wire_data: Vec::new(),
frame_count: 0,
read_pos: 0,
bytes_used: 0,
max_bytes,
}
Expand Down Expand Up @@ -112,14 +121,32 @@ impl Queue {
/// (plus its 4-byte length prefix) would exceed `max_bytes`. When
/// `max_bytes` is `0` (unbounded), pushes always succeed.
pub fn push(&self, frame: &[u8]) -> Result<(), Error> {
// Guard: frame length must fit in u32 (wire format invariant) and
// frame_cost must not overflow usize (relevant on 32-bit targets).
if frame.len() > u32::MAX as usize
|| DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none()
{
return Err(Error::PayloadTooLarge(frame.len()));
}

let cost = QueueInner::frame_cost(frame);
let mut inner = crate::lock_or_recover(&self.inner);

if inner.max_bytes > 0 && inner.bytes_used + cost > inner.max_bytes {
return Err(Error::ChannelFull);
}

inner.frames.push_back(frame.to_vec());
// Guard: frame count must fit in u32 (wire format uses u32 count header).
if inner.frame_count == u32::MAX {
return Err(Error::ChannelFull);
}

// Append frame in wire format: [u32 LE len][bytes].
inner
.wire_data
.extend_from_slice(&(frame.len() as u32).to_le_bytes());
inner.wire_data.extend_from_slice(frame);
inner.frame_count += 1;
inner.bytes_used = inner.bytes_used.saturating_add(cost);
Ok(())
}
Expand All @@ -130,8 +157,34 @@ impl Queue {
#[must_use]
pub fn try_pop(&self) -> Option<Vec<u8>> {
let mut inner = crate::lock_or_recover(&self.inner);
let frame = inner.frames.pop_front()?;
inner.bytes_used -= QueueInner::frame_cost(&frame);
if inner.frame_count == 0 {
return None;
}
let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4]
.try_into()
.unwrap();
let payload_len = u32::from_le_bytes(len_bytes) as usize;
let payload_start = inner.read_pos + 4;
let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec();
let cost = DRAIN_FRAME_OVERHEAD + payload_len;
inner.read_pos += cost;
inner.frame_count -= 1;
inner.bytes_used -= cost;

// Compact: when empty, just reset; otherwise shift when read_pos
// exceeds half the allocation to prevent unbounded growth during
// steady pop/push workloads.
if inner.frame_count == 0 {
inner.wire_data.clear();
inner.read_pos = 0;
} else if inner.read_pos > inner.wire_data.len() / 2 {
let rp = inner.read_pos;
inner.wire_data.copy_within(rp.., 0);
let new_len = inner.wire_data.len() - rp;
inner.wire_data.truncate(new_len);
inner.read_pos = 0;
}

Some(frame)
}

Expand All @@ -149,39 +202,34 @@ impl Queue {
/// Returns an empty `Vec` if the queue is empty.
#[must_use]
pub fn drain_all(&self) -> Vec<u8> {
// Swap the frames out under the lock, then serialize without contention.
let (mut frames, bytes_used) = {
// Take the pre-formatted wire data out under the lock, then prepend
// the frame count header without contention.
let (wire_data, read_pos, frame_count) = {
let mut inner = crate::lock_or_recover(&self.inner);
if inner.frames.is_empty() {
if inner.frame_count == 0 {
return Vec::new();
}
let frames = std::mem::take(&mut inner.frames);
let bytes_used = inner.bytes_used;
let wire_data = std::mem::take(&mut inner.wire_data);
let read_pos = inner.read_pos;
let frame_count = inner.frame_count;
inner.read_pos = 0;
inner.frame_count = 0;
inner.bytes_used = 0;
(frames, bytes_used)
(wire_data, read_pos, frame_count)
};
// Lock released — serialize without holding the mutex.
let output_size = 4usize.saturating_add(bytes_used);
// Lock released — build output with TWO extend_from_slice calls (was N×2).
let live_data = &wire_data[read_pos..];
let output_size = 4 + live_data.len();
let mut buf = Vec::with_capacity(output_size);

// Frame count header.
let count = frames.len() as u32;
buf.extend_from_slice(&count.to_le_bytes());

// Each frame: [u32 LE len][bytes].
for frame in frames.make_contiguous() {
let len = frame.len() as u32;
buf.extend_from_slice(&len.to_le_bytes());
buf.extend_from_slice(frame);
}

buf.extend_from_slice(&frame_count.to_le_bytes());
buf.extend_from_slice(live_data);
buf
}

/// Number of frames currently queued.
#[must_use]
pub fn frame_count(&self) -> usize {
crate::lock_or_recover(&self.inner).frames.len()
crate::lock_or_recover(&self.inner).frame_count as usize
}

/// Number of bytes currently used (including per-frame length prefixes).
Expand All @@ -199,7 +247,9 @@ impl Queue {
/// Clear all queued frames.
pub fn clear(&self) {
let mut inner = crate::lock_or_recover(&self.inner);
inner.frames.clear();
inner.wire_data.clear();
inner.frame_count = 0;
inner.read_pos = 0;
inner.bytes_used = 0;
}
}
Expand All @@ -208,7 +258,7 @@ impl std::fmt::Debug for Queue {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let inner = crate::lock_or_recover(&self.inner);
f.debug_struct("Queue")
.field("frame_count", &inner.frames.len())
.field("frame_count", &inner.frame_count)
.field("bytes_used", &inner.bytes_used)
.field("max_bytes", &inner.max_bytes)
.finish()
Expand Down
Loading
Loading