6 changes: 5 additions & 1 deletion ai/memory/gotchas.md
@@ -3,7 +3,7 @@ id: gotchas-0001
type: gotcha
scope: repo
tags: ["joins", "partitions", "varlena", "signals"]
updated_at: "2025-11-29"
updated_at: "2025-12-07"
importance: 0.7
---

@@ -13,3 +13,7 @@ importance: 0.7
- TOAST/Compressed varlena: in projection → error; when not projected → skip, don’t crash.
- SIGUSR1: requires a valid client PID; not available on non‑Unix.
- SHM races: don’t cache borrowed slices beyond one read cycle; clamp lengths.

## Executor borrow patterns

- Caching tuple `(off,len)` pairs in `PgScanStream`: fill a self-owned `Vec<(u16,u16)>` and create the iterator over it inside a tight scope. Clone needed metadata (`attrs_full`, `proj_indices`) into locals first, and don't touch `self` while the iterator (which borrows the pairs buffer) is alive. This sidesteps the borrow-checker conflict between the iterator's borrow of `self.pairs` and any other use of `self` in the loop body.
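A compilable toy version of the pattern described in that note (all names here — `Page`, `tuples`, `Scanner`, `scan` — are hypothetical stand-ins for `HeapPage::tuples_by_offset` and `PgScanStream`; only the borrow shape is meant to match): the buffer is owned by the struct, the `Arc`'d metadata is cloned into a local before the loop, and the iterator's borrow of `self.pairs` stays inside one scope.

```rust
use std::sync::Arc;

// Stand-in for HeapPage: its iterator holds a borrow of the caller's pairs buffer.
struct Page;

impl Page {
    fn tuples<'a>(&self, pairs: &'a mut Vec<(u16, u16)>) -> impl Iterator<Item = (u16, u16)> + 'a {
        pairs.clear();
        pairs.extend([(0u16, 8u16), (8, 8)]); // pretend these (off, len) pairs were parsed from the page
        pairs.iter().copied()
    }
}

// Stand-in for PgScanStream: `pairs` is self-owned scratch, `proj_indices` is shared metadata.
struct Scanner {
    pairs: Vec<(u16, u16)>,
    proj_indices: Arc<Vec<usize>>,
}

impl Scanner {
    fn scan(&mut self, page: &Page) -> usize {
        // Clone the Arc handle first so the loop below never needs to touch `self` again.
        let proj_indices = Arc::clone(&self.proj_indices);
        let mut cells = 0;
        {
            // The iterator borrows `self.pairs`; confine that borrow to this scope and
            // use only locals (`proj_indices`, `cells`) while it is alive.
            for (_off, _len) in page.tuples(&mut self.pairs) {
                cells += proj_indices.len();
            }
        }
        cells
    }
}

fn main() {
    let mut s = Scanner { pairs: Vec::new(), proj_indices: Arc::new(vec![0, 2]) };
    assert_eq!(s.scan(&Page), 4); // 2 tuples × 2 projected columns
}
```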
2 changes: 1 addition & 1 deletion executor/Cargo.toml
@@ -22,7 +22,7 @@ smallvec = { version = "1.14", features = ["const_generics", "union"] }
smol_str = "0.3"
thiserror = "2.0"
tokio = { version = "1.42", features = ["full"] }
tracing = "0.1"
tracing = { version = "0.1" }
protocol = { path = "../protocol", version = "25.0" }
common = { path = "../common", version = "25.0" }
async-trait = "0.1"
20 changes: 12 additions & 8 deletions executor/src/ipc.rs
@@ -35,19 +35,23 @@ pub async fn signal_listener(state: Arc<SharedState<'_>>) {
if state.flags[i].load(Ordering::Acquire) {
state.wakers[i].wake();
woke += 1;
if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::ipc",
conn_id = i,
"signal_listener: woke socket"
);
}
}
}
if woke == 0 {
if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::ipc",
conn_id = i,
"signal_listener: woke socket"
"signal_listener: signal received but no flags set"
);
}
}
if woke == 0 {
tracing::trace!(
target = "executor::ipc",
"signal_listener: signal received but no flags set"
);
}
}
}

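Both hunks above apply one idiom: guard the `tracing::trace!` call behind `tracing::enabled!` so the whole logging branch is skipped unless `TRACE` is active for the target. A minimal sketch of that idiom follows; the helper function is illustrative and not part of the diff, and it uses `target:`, which sets the event's target, whereas a `target = ...` argument inside the event macro records a field named `target`.

```rust
use tracing::Level;

// Illustrative helper: skip the logging branch entirely unless TRACE is
// currently enabled for the "executor::ipc" target.
fn note_wakeup(conn_id: usize) {
    if tracing::enabled!(target: "executor::ipc", Level::TRACE) {
        tracing::trace!(target: "executor::ipc", conn_id, "signal_listener: woke socket");
    }
}
```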
82 changes: 39 additions & 43 deletions executor/src/pgscan.rs
@@ -347,6 +347,7 @@ pub struct PgScanStream {
proj_indices: Arc<Vec<usize>>,
rx: Option<mpsc::Receiver<HeapBlock>>,
conn_id: usize,
pairs: Vec<(u16, u16)>,
}

impl PgScanStream {
@@ -363,6 +364,7 @@ impl PgScanStream {
proj_indices,
rx,
conn_id,
pairs: Vec::new(),
}
}
}
@@ -392,57 +394,49 @@ impl Stream for PgScanStream {

// Create a HeapPage view and iterate tuples
let hp = unsafe { HeapPage::from_slice(page) };
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
let Ok(hp) = hp else {
// On error decoding page, return empty batch for resilience
let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema));
return Poll::Ready(Some(Ok(batch)));
};

// Prepare per-column builders
let col_count = total_cols;
let mut builders = make_builders(&this.proj_schema, block.num_offsets as usize)
.map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?;
// Use tuples_by_offset to iterate LP_NORMAL tuples in page order
let mut pairs: Vec<(u16, u16)> = Vec::new();
// Pre-scan to populate pairs and log LP_NORMAL count
{
let _ = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
}
let pairs_len = pairs.len();
// Create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs);
tracing::trace!(
target = "executor::server",
blkno = block.blkno,
num_offsets = block.num_offsets,
lp_normal = pairs_len,
"pgscan: tuples_by_offset summary"
);
// Take local clones of shared metadata to avoid borrowing `this` during tuple iteration
let proj_indices = Arc::clone(&this.proj_indices);
let attrs_full = Arc::clone(&this.attrs_full);
let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) }
as *const pg_sys::PageHeaderData;
let mut decoded_rows = 0usize;
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&this.attrs_full,
this.proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
// Limit the borrow of `this.pairs` to this inner scope to satisfy borrow checker
{
// Populate pairs once and create iterator borrowing the filled pairs slice
let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut this.pairs);
for tup in it {
// Decode projected columns for tuple using iterator over requested projection
let iter = unsafe {
decode_tuple_project(
page_hdr,
tup,
&attrs_full,
proj_indices.iter().copied(),
)
};
let Ok(mut iter) = iter else {
continue;
};
// Iterate over projected columns in order
for b in builders.iter_mut().take(total_cols) {
match iter.next() {
Some(Ok(v)) => append_scalar(b, v),
Some(Err(_e)) => append_null(b),
None => append_null(b),
}
}
decoded_rows += 1;
}
decoded_rows += 1;
}

// Build Arrow arrays and RecordBatch
@@ -462,12 +456,14 @@
datafusion::error::DataFusionError::Execution(format!("{e}"))
})?
};
tracing::trace!(
target = "executor::server",
rows = decoded_rows,
blkno = block.blkno,
"pgscan: decoded rows"
);
if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) {
tracing::trace!(
target = "executor::server",
rows = decoded_rows,
blkno = block.blkno,
"pgscan: decoded rows"
);
}
Poll::Ready(Some(Ok(rb)))
}
Poll::Ready(None) => Poll::Ready(None),