From a34b5f3d53f675867d2b13335aefa7626ff43583 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Sat, 29 Nov 2025 17:09:13 +0700 Subject: [PATCH 1/5] fix: float encoding --- executor/src/server.rs | 18 +++++++++++++++++- storage/src/heap.rs | 23 ++++++++++++++++++++++- 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/executor/src/server.rs b/executor/src/server.rs index 43d3028..0ef552e 100644 --- a/executor/src/server.rs +++ b/executor/src/server.rs @@ -7,7 +7,7 @@ use crate::sql::Catalog; use anyhow::Error; use anyhow::{bail, Result}; use common::FusionError; -use datafusion::arrow::array::{Array, Int32Array, Int64Array, StringArray}; +use datafusion::arrow::array::{Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::config::ConfigOptions; use datafusion::execution::SessionStateBuilder; @@ -518,6 +518,22 @@ fn encode_and_write_rows( let v = a.value(row); fields.push(F::ByVal8(v.to_le_bytes())); } + datafusion::arrow::datatypes::DataType::Float32 => { + let v = array + .as_any() + .downcast_ref::<Float32Array>() + .expect("float32 array") + .value(row); + fields.push(F::ByVal4(v.to_le_bytes())); + } + datafusion::arrow::datatypes::DataType::Float64 => { + let v = array + .as_any() + .downcast_ref::<Float64Array>() + .expect("float64 array") + .value(row); + fields.push(F::ByVal8(v.to_le_bytes())); + } datafusion::arrow::datatypes::DataType::Utf8 => { if let Some(idx) = idx_ref { fields.push(F::ByRef(owned[*idx].as_slice())); diff --git a/storage/src/heap.rs b/storage/src/heap.rs index 5a993ab..7e41735 100644 --- a/storage/src/heap.rs +++ b/storage/src/heap.rs @@ -287,6 +287,11 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<ScalarValue>> x if x == BOOLOID => ScalarValue::Boolean(Some(bytes[0] != 0)), + // PostgreSQL internal single-byte "char" type (not BPCHAR) + x if x == CHAROID => { + let ch = bytes[0] as char; + ScalarValue::Utf8(Some(ch.to_string())) + } x if x == INT2OID => { let mut a = [0u8; 2]; a.copy_from_slice(bytes); @@ -297,6 +302,13 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<ScalarValue>> ScalarValue::Int32(Some(i32::from_ne_bytes(a))) } + // OID maps to Int32 for now (DataFusion UInt32 available but keep consistent with common integer use) + x if x == OIDOID => { + let mut a = [0u8; 4]; + a.copy_from_slice(bytes); + let v = u32::from_ne_bytes(a) as i32; + ScalarValue::Int32(Some(v)) + } x if x == INT8OID => { let mut a = [0u8; 8]; a.copy_from_slice(bytes); @@ -358,6 +370,13 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<ScalarValue>> + x if x == NAMEOID => { + let end = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len()); + let s = std::str::from_utf8(&bytes[..end]) + .map_err(|e| anyhow::anyhow!("invalid utf8 in name: {}", e))?; + ScalarValue::Utf8(Some(s.to_string())) + } _ => return Ok(None), }; Ok(Some(v)) @@ -388,12 +407,14 @@ fn typed_null_for(atttypid: pg_sys::Oid) -> ScalarValue { use pg_sys::*; match atttypid { x if x == BOOLOID => ScalarValue::Boolean(None), + x if x == CHAROID => ScalarValue::Utf8(None), x if x == INT2OID => ScalarValue::Int16(None), x if x == INT4OID => ScalarValue::Int32(None), + x if x == OIDOID => ScalarValue::Int32(None), x if x == INT8OID => ScalarValue::Int64(None), x if x == FLOAT4OID => ScalarValue::Float32(None), x if x == FLOAT8OID => ScalarValue::Float64(None), - x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID => ScalarValue::Utf8(None), + x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID || x == NAMEOID => ScalarValue::Utf8(None), x if x == DATEOID => ScalarValue::Date32(None), x if x == TIMEOID => ScalarValue::Time64Microsecond(None), x if x == TIMESTAMPOID || x == TIMESTAMPTZOID => { From
8468c173d47782474d409259cef6dc1c4bc92408 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Sat, 29 Nov 2025 17:26:22 +0700 Subject: [PATCH 2/5] feat: execute only selects via pg_fusion runtime --- postgres/src/planner_hook.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/postgres/src/planner_hook.rs b/postgres/src/planner_hook.rs index 502c6a8..f3495c9 100644 --- a/postgres/src/planner_hook.rs +++ b/postgres/src/planner_hook.rs @@ -35,7 +35,15 @@ extern "C-unwind" fn datafusion_planner_hook( boundparams: ParamListInfo, ) -> *mut PlannedStmt { if ENABLE_DATAFUSION.get() { - return df_planner(query_string, boundparams); + // Only intercept plain SELECT statements; let DML/DDL/ACL/utility statements go through the standard planner + unsafe { + if !parse.is_null() { + if (*parse).commandType == CMD_SELECT { + return df_planner(query_string, boundparams); + } + } + } + // Not a SELECT: fall through to the previous/standard planner } unsafe { if let Some(prev_hook) = PREV_PLANNER_HOOK { From b899e6ea1236f5d03ae7201a3e40c260bcfdb023 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Sat, 29 Nov 2025 22:20:53 +0700 Subject: [PATCH 3/5] feat: reduce backend log level --- postgres/src/backend.rs | 24 ++++++++++++------------ postgres/src/ipc.rs | 10 +++++----- postgres/src/worker.rs | 16 ++++++++-------- 3 files changed, 25 insertions(+), 25 deletions(-) diff --git a/postgres/src/backend.rs b/postgres/src/backend.rs index f6c053e..304d9a6 100644 --- a/postgres/src/backend.rs +++ b/postgres/src/backend.rs @@ -426,7 +426,7 @@ unsafe extern "C-unwind" fn begin_df_scan( Ok(s) => s, Err(e) => error!("Failed to map shared connection: {}", e), }; - pgrx::info!("begin_df_scan: requesting BeginScan (conn={})", id); + pgrx::debug1!("begin_df_scan: requesting BeginScan (conn={})", id); if let Err(err) = request_begin_scan(&mut shared.recv) { let _ = request_failure(&mut shared.recv); let _ = shared.signal_server(); @@ -488,7 +488,7 @@ unsafe extern "C-unwind" fn exec_df_scan( }; EXEC_SCAN_STARTED.with(|started| { if !started.get() { - pgrx::info!("exec_df_scan: requesting ExecScan (conn={})", id); + pgrx::debug1!("exec_df_scan: requesting ExecScan (conn={})", id); if let Err(err) = request_exec_scan(&mut shared.recv) { let _ = request_failure(&mut shared.recv); let _ = shared.signal_server(); @@ -526,7 +526,7 @@ unsafe extern "C-unwind" fn exec_df_scan( } match ControlPacket::try_from(header.tag) { Ok(ControlPacket::ExecReady) => { - pgrx::info!("exec_df_scan: ExecReady received (conn={})", id); + pgrx::debug1!("exec_df_scan: ExecReady received (conn={})", id); EXEC_READY_SEEN.with(|seen| seen.set(true)); break; } @@ -603,7 +603,7 @@ fn process_pending_heap_request(shared: &mut ConnectionShared) { if let Ok((scan_id, table_oid, slot_id)) = read_heap_block_request(&mut shared.send) { - pgrx::info!( + pgrx::debug1!( "process_pending_heap_request: heap request received scan_id={} table_oid={} slot_id={}", scan_id, table_oid, slot_id ); @@ -617,7 +617,7 @@ fn process_pending_heap_request(shared: &mut ConnectionShared) { *e = e.saturating_add(1); cur }; - pgrx::info!( + pgrx::debug1!( "process_pending_heap_request: computed blkno={} for scan_id={}", blkno, scan_id ); @@ -647,7 +647,7 @@ fn process_pending_heap_request(shared: &mut ConnectionShared) { } as u32; if blkno >= nblocks { // End of relation reached: send EOF for this scan - pgrx::info!( + pgrx::debug1!( "process_pending_heap_request: EOF scan_id={} slot_id={} (blkno {} >= nblocks {})", scan_id, slot_id, blkno, nblocks ); @@ -714,7 +714,7 @@ fn
process_pending_heap_request(shared: &mut ConnectionShared) { } } // Publish to shared memory + notify executor - pgrx::info!( + pgrx::debug1!( "process_pending_heap_request: publishing page scan_id={} slot_id={} table_oid={} blkno={} num_offsets={} vis_bytes={}", scan_id, slot_id, table_oid, blkno, num_offsets, vis.len() ); @@ -768,10 +768,10 @@ unsafe fn try_store_wire_tuple_from_result( let mut ring = LockFreeBuffer::from_layout(base, layout); let avail = ring.len(); if avail == 0 { - pgrx::info!("result_ring: empty (conn={})", conn_id); + pgrx::debug1!("result_ring: empty (conn={})", conn_id); return None; } - pgrx::info!("result_ring: bytes available={} (conn={})", avail, conn_id); + pgrx::debug1!("result_ring: bytes available={} (conn={})", avail, conn_id); use std::io::Read; // Read fixed 4-byte little-endian row_len let row_len = match protocol::result::read_frame_len(&mut ring) { @@ -785,10 +785,10 @@ unsafe fn try_store_wire_tuple_from_result( return None; } }; - pgrx::info!("result_ring: frame_len={} (conn={})", row_len, conn_id); + pgrx::debug1!("result_ring: frame_len={} (conn={})", row_len, conn_id); if row_len == 0 { // EOF sentinel - pgrx::info!("result_ring: EOF sentinel (conn={})", conn_id); + pgrx::debug1!("result_ring: EOF sentinel (conn={})", conn_id); RESULT_RING_EOF.with(|f| f.set(true)); return None; } @@ -1018,7 +1018,7 @@ fn wait_latch(timeout: Option) { check_for_interrupts!(); if rc & WL_TIMEOUT as i32 != 0 { // Timeout is expected in non-blocking polling; do not raise ERROR. - pgrx::info!("wait_latch: timeout (non-fatal)"); + pgrx::debug1!("wait_latch: timeout (non-fatal)"); } else if rc & WL_POSTMASTER_DEATH as i32 != 0 { panic!("Postmaster is dead"); } diff --git a/postgres/src/ipc.rs b/postgres/src/ipc.rs index 59fa580..040cb41 100644 --- a/postgres/src/ipc.rs +++ b/postgres/src/ipc.rs @@ -76,9 +76,9 @@ impl ConnectionShared<'_> { self.flag.store(true, Ordering::Release); // Best-effort diagnostics if let Ok(id) = crate::ipc::connection_id() { - pgrx::info!("signal_server: flag set (conn={})", id); + pgrx::debug1!("signal_server: flag set (conn={})", id); } else { - pgrx::info!("signal_server: flag set (conn=?), pid unknown"); + pgrx::debug1!("signal_server: flag set (conn=?), pid unknown"); } // Best-effort: wait briefly for the worker to publish PID. 
@@ -104,7 +104,7 @@ impl ConnectionShared<'_> { return Err(std::io::Error::last_os_error().into()); } if let Ok(id) = crate::ipc::connection_id() { - pgrx::info!("signal_server: SIGUSR1 sent (conn={} pid={})", id, pid); + pgrx::debug1!("signal_server: SIGUSR1 sent (conn={} pid={})", id, pid); } Ok(()) } @@ -142,8 +142,8 @@ pub(crate) fn connection_shared(id: u32) -> AnyResult> let client_pid_ref: &'static AtomicI32 = unsafe { &*client_ptr }; let server_pid_ref: &'static AtomicI32 = crate::worker::server_pid_atomic(); - // Diagnostics: log addresses once per call (cheap and helps correlate with worker) - pgrx::info!( + // Diagnostics: lower level to avoid noisy INFO in production + pgrx::debug1!( "connection_shared: id={} recv_base={:?} recv_buf={:?} send_base={:?}", id, recv_base, diff --git a/postgres/src/worker.rs b/postgres/src/worker.rs index 7ee628b..49fe898 100644 --- a/postgres/src/worker.rs +++ b/postgres/src/worker.rs @@ -24,7 +24,7 @@ pub(crate) const RESULT_RING_CAP: usize = 64 * 1024; // bytes per-connection for #[pg_guard] pub(crate) unsafe extern "C-unwind" fn init_datafusion_worker() { - info!("Registering DataFusion background worker"); + debug1!("Registering DataFusion background worker"); BackgroundWorkerBuilder::new("datafusion") .set_function("worker_main") .set_library("pg_fusion") @@ -50,7 +50,7 @@ pub unsafe extern "C-unwind" fn init_shmem() { if !found { std::ptr::write_bytes(base, 0, shared.layout.size()); } - info!( + debug1!( "init_shmem: flags region ready: bytes={} count={} found={}", shared.layout.size(), num, @@ -76,7 +76,7 @@ pub unsafe extern "C-unwind" fn init_shmem() { unsafe { (*client_ptr).store(i32::MAX, Ordering::Relaxed) }; } } - info!( + debug1!( "init_shmem: connections region ready: bytes_per_conn={} total_bytes={} recv_cap={} send_cap={} count={} found={}", layout.layout.size(), total, RECV_CAP, SEND_CAP, num, found ); @@ -98,7 +98,7 @@ pub unsafe extern "C-unwind" fn init_shmem() { } // Publish base and layout to executor module for in-process access executor::shm::set_slot_blocks(base, layout); - info!( + debug1!( "init_shmem: slot blocks ready: bytes_per_conn={} total_bytes={} slots_per_conn={} blocks_per_slot={} blksz={} count={} found={}", layout.layout.size(), total, SLOTS_PER_CONN, BLOCKS_PER_SLOT, blksz, num, found ); @@ -189,7 +189,7 @@ pub extern "C-unwind" fn worker_main(_arg: pg_sys::Datum) { let num = crate::max_backends() as usize; init_tracing_file_logger(); let pid = unsafe { libc::getpid() }; - info!( + debug1!( "worker_main: starting DataFusion worker pid={} max_backends={}", pid, num ); @@ -207,7 +207,7 @@ pub extern "C-unwind" fn worker_main(_arg: pg_sys::Datum) { slice::from_raw_parts(flags_ptr, num) }; let state = Arc::new(executor::ipc::SharedState::new(flags_slice)); - info!( + debug1!( "worker_main: SharedState ready (flags={})", flags_slice.len() ); @@ -224,7 +224,7 @@ pub extern "C-unwind" fn worker_main(_arg: pg_sys::Datum) { let pid_ptr = executor::layout::server_pid_ptr(base, layout); let pid = libc::getpid(); (*pid_ptr).store(pid as i32, Ordering::Relaxed); - info!("worker_main: published server pid={}", pid); + debug1!("worker_main: published server pid={}", pid); } // Build runtime @@ -233,7 +233,7 @@ pub extern "C-unwind" fn worker_main(_arg: pg_sys::Datum) { .enable_all() .build() .unwrap(); - info!( + debug1!( "worker_main: tokio runtime built, threads={}", TOKIO_THREAD_NUMBER ); From 8b209431d4f88d81bc64ccc0f45871f7b84e5634 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Mon, 1 Dec 2025 20:53:19 +0700 
Subject: [PATCH 4/5] perf: improve scan hot path --- executor/Cargo.toml | 2 +- executor/src/ipc.rs | 20 +- executor/src/pgscan.rs | 28 +-- executor/src/server.rs | 400 ++++++++++++++++++++++++++++------------ postgres/src/backend.rs | 2 +- postgres/src/worker.rs | 9 +- storage/src/heap.rs | 185 ++++++++++++------- 7 files changed, 430 insertions(+), 216 deletions(-) diff --git a/executor/Cargo.toml b/executor/Cargo.toml index 2afb8f5..3d47eaa 100644 --- a/executor/Cargo.toml +++ b/executor/Cargo.toml @@ -22,7 +22,7 @@ smallvec = { version = "1.14", features = ["const_generics", "union"] } smol_str = "0.3" thiserror = "2.0" tokio = { version = "1.42", features = ["full"] } -tracing = "0.1" +tracing = { version = "0.1" } protocol = { path = "../protocol", version = "25.0" } common = { path = "../common", version = "25.0" } async-trait = "0.1" diff --git a/executor/src/ipc.rs b/executor/src/ipc.rs index 8a61e14..d50cfff 100644 --- a/executor/src/ipc.rs +++ b/executor/src/ipc.rs @@ -35,19 +35,23 @@ pub async fn signal_listener(state: Arc>) { if state.flags[i].load(Ordering::Acquire) { state.wakers[i].wake(); woke += 1; + if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) { + tracing::trace!( + target = "executor::ipc", + conn_id = i, + "signal_listener: woke socket" + ); + } + } + } + if woke == 0 { + if tracing::enabled!(target: "executor::ipc", tracing::Level::TRACE) { tracing::trace!( target = "executor::ipc", - conn_id = i, - "signal_listener: woke socket" + "signal_listener: signal received but no flags set" ); } } - if woke == 0 { - tracing::trace!( - target = "executor::ipc", - "signal_listener: signal received but no flags set" - ); - } } } diff --git a/executor/src/pgscan.rs b/executor/src/pgscan.rs index f544655..55b872b 100644 --- a/executor/src/pgscan.rs +++ b/executor/src/pgscan.rs @@ -404,20 +404,8 @@ impl Stream for PgScanStream { .map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?; // Use tuples_by_offset to iterate LP_NORMAL tuples in page order let mut pairs: Vec<(u16, u16)> = Vec::new(); - // Pre-scan to populate pairs and log LP_NORMAL count - { - let _ = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs); - } - let pairs_len = pairs.len(); - // Create iterator borrowing the filled pairs slice + // Populate pairs once and create iterator borrowing the filled pairs slice let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs); - tracing::trace!( - target = "executor::server", - blkno = block.blkno, - num_offsets = block.num_offsets, - lp_normal = pairs_len, - "pgscan: tuples_by_offset summary" - ); let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) } as *const pg_sys::PageHeaderData; let mut decoded_rows = 0usize; @@ -462,12 +450,14 @@ impl Stream for PgScanStream { datafusion::error::DataFusionError::Execution(format!("{e}")) })? 
}; - tracing::trace!( - target = "executor::server", - rows = decoded_rows, - blkno = block.blkno, - "pgscan: decoded rows" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + tracing::trace!( + target = "executor::server", + rows = decoded_rows, + blkno = block.blkno, + "pgscan: decoded rows" + ); + } Poll::Ready(Some(Ok(rb))) } Poll::Ready(None) => Poll::Ready(None), diff --git a/executor/src/server.rs b/executor/src/server.rs index 0ef552e..1015b3c 100644 --- a/executor/src/server.rs +++ b/executor/src/server.rs @@ -7,7 +7,9 @@ use crate::sql::Catalog; use anyhow::Error; use anyhow::{bail, Result}; use common::FusionError; -use datafusion::arrow::array::{Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray}; +use datafusion::arrow::array::{ + Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray, +}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::config::ConfigOptions; use datafusion::execution::SessionStateBuilder; @@ -94,15 +96,29 @@ impl<'bytes> Connection<'bytes> { pub async fn poll(&mut self) -> Result<()> { let pending = self.recv_socket.buffer.len(); if pending > 0 { - trace!("poll: data already available: {} bytes", pending); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "poll: data already available: {} bytes", + pending + ); + } return Ok(()); } - trace!("poll: waiting for socket signal"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "poll: waiting for socket signal" + ); + } (&mut self.recv_socket).await?; - trace!( - "poll: socket signaled (data available: {} bytes)", - self.recv_socket.buffer.len() - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "poll: socket signaled (data available: {} bytes)", + self.recv_socket.buffer.len() + ); + } Ok(()) } @@ -112,7 +128,13 @@ impl<'bytes> Connection<'bytes> { #[cfg(unix)] { let pid = self.client_pid.load(Ordering::Relaxed); - trace!(client_pid = pid, "signal_client: about to send SIGUSR1"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + client_pid = pid, + "signal_client: about to send SIGUSR1" + ); + } if pid <= 0 || pid == i32::MAX { bail!(FusionError::FailedTo( "send SIGUSR1".into(), @@ -127,7 +149,13 @@ impl<'bytes> Connection<'bytes> { format_smolstr!("{err}") )); } - trace!(client_pid = pid, "signal_client: SIGUSR1 sent"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + client_pid = pid, + "signal_client: SIGUSR1 sent" + ); + } Ok(()) } #[cfg(not(unix))] @@ -149,11 +177,18 @@ impl<'bytes> Connection<'bytes> { let pending = self.recv_socket.buffer.len(); if pending < 6 { // Header is at least 6 bytes (3 fixints + u16) - trace!("process_message: woke but no header bytes available",); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: woke but no header bytes available" + ); + } return Ok(()); } let header = consume_header(&mut self.recv_socket.buffer)?; - debug!( + if tracing::enabled!(target: "executor::server", tracing::Level::DEBUG) { + debug!( + target = "executor::server", direction = ?header.direction, tag = header.tag, flag = ?header.flag, @@ -161,20 +196,28 @@ impl<'bytes> Connection<'bytes> { 
recv_unread = self.recv_socket.buffer.len(), send_unread = self.send_buffer.len(), "process_message: header received" - ); + ); + } if header.direction == Direction::ToClient { - trace!("process_message: header direction ToClient, ignoring"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: header direction ToClient, ignoring" + ); + } return Ok(()); } // Column layout arrives during planning/prepare; store it for later encoding use. if header.tag == ControlPacket::ColumnLayout as u8 { let attrs = consume_column_layout(&mut self.recv_socket.buffer)?; let len = attrs.len(); - tracing::trace!( - target = "executor::server", - pg_attrs = len, - "column layout received" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + tracing::trace!( + target = "executor::server", + pg_attrs = len, + "column layout received" + ); + } storage.pg_attrs = Some(attrs); return Ok(()); } @@ -185,16 +228,18 @@ impl<'bytes> Connection<'bytes> { // Read metadata; we expect the visibility bitmap to be stored in shared memory. let meta = read_heap_block_bitmap_meta(&mut self.recv_socket.buffer, header.length)?; - trace!( - target = "executor::server", - scan_id = meta.scan_id, - slot_id = meta.slot_id, - table_oid = meta.table_oid, - blkno = meta.blkno, - num_offsets = meta.num_offsets, - bitmap_inline_len = meta.bitmap_len, - "heap bitmap meta received" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + scan_id = meta.scan_id, + slot_id = meta.slot_id, + table_oid = meta.table_oid, + blkno = meta.blkno, + num_offsets = meta.num_offsets, + bitmap_inline_len = meta.bitmap_len, + "heap bitmap meta received" + ); + } // If payload included inline bitmap bytes (legacy), drain them without allocation. 
let mut remaining = meta.bitmap_len as usize; while remaining > 0 { @@ -205,13 +250,15 @@ impl<'bytes> Connection<'bytes> { } // Trailing u16 carries the visibility bitmap length in shared memory let vis_len = read_u16_msgpack(&mut self.recv_socket.buffer)?; - trace!( - target = "executor::server", - scan_id = meta.scan_id, - slot_id = meta.slot_id, - vis_len, - "heap bitmap vis_len (shm) received" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + scan_id = meta.scan_id, + slot_id = meta.slot_id, + vis_len, + "heap bitmap vis_len (shm) received" + ); + } if let Some(tx) = storage.registry.sender(meta.scan_id as u64) { let block = crate::pgscan::HeapBlock { scan_id: meta.scan_id as u64, @@ -230,13 +277,15 @@ impl<'bytes> Connection<'bytes> { ); } // Request the next heap block for this scan using the same slot - trace!( - target = "executor::server", - scan_id = meta.scan_id, - table_oid = meta.table_oid, - slot_id = meta.slot_id, - "requesting next heap block" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + scan_id = meta.scan_id, + table_oid = meta.table_oid, + slot_id = meta.slot_id, + "requesting next heap block" + ); + } if let Err(e) = request_heap_block( &mut self.send_buffer, meta.scan_id, @@ -250,11 +299,13 @@ impl<'bytes> Connection<'bytes> { ); } } else { - trace!( - target = "pg_fusion::server", - scan_id = meta.scan_id, - "received heap block for unknown scan_id; dropping" - ); + if tracing::enabled!(target: "pg_fusion::server", tracing::Level::TRACE) { + trace!( + target = "pg_fusion::server", + scan_id = meta.scan_id, + "received heap block for unknown scan_id; dropping" + ); + } // Drop the payload (already consumed above) } return Ok(()); @@ -266,7 +317,12 @@ impl<'bytes> Connection<'bytes> { return Ok(()); } _ => { - trace!("process_message: unrecognized data packet, ignoring"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: unrecognized data packet, ignoring" + ); + } return Ok(()); } } @@ -275,7 +331,9 @@ impl<'bytes> Connection<'bytes> { let mut skip_metadata = false; loop { let action = storage.state.consume(&packet)?; - trace!(current_packet = ?packet, action = ?action, "process_message: state consumed"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!(target = "executor::server", current_packet = ?packet, action = ?action, "process_message: state consumed"); + } let result = match action { Some(Action::Bind) => { let Some(plan) = std::mem::take(&mut storage.logical_plan) else { @@ -285,7 +343,13 @@ impl<'bytes> Connection<'bytes> { )); }; let params = read_params(&mut self.recv_socket.buffer)?; - trace!("process_message: Action::Bind with {} params", params.len()); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::Bind with {} params", + params.len() + ); + } bind(plan, params) } Some(Action::Compile) => { @@ -303,15 +367,22 @@ impl<'bytes> Connection<'bytes> { Arc::clone(&storage.registry), )? 
}; - trace!( - skip_metadata, - "process_message: Action::Compile (catalog {}loaded)", - if skip_metadata { "not " } else { "" } - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + skip_metadata, + "process_message: Action::Compile (catalog {}loaded)", + if skip_metadata { "not " } else { "" } + ); + } compile(stmt, &catalog) } Some(Action::Explain) => { - trace!("process_message: Action::Explain"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::Explain" + ); + } if let Some(phys) = storage.physical_plan.as_ref() { explain_physical(phys) } else { @@ -328,17 +399,29 @@ impl<'bytes> Connection<'bytes> { "while processing optimize message".into(), )); }; - trace!("process_message: Action::Optimize"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::Optimize" + ); + } optimize(plan) } Some(Action::Flush) => { - trace!("process_message: Action::Flush (reset state)"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::Flush (reset state)" + ); + } storage.flush(); return Ok(()); } Some(Action::Parse) => { let query = read_query(&mut self.recv_socket.buffer)?; - trace!(query = %query, "process_message: Action::Parse"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!(target = "executor::server", query = %query, "process_message: Action::Parse"); + } parse(query.into()) } Some(Action::Translate) => { @@ -348,19 +431,39 @@ impl<'bytes> Connection<'bytes> { "while processing translate message".into(), )); }; - trace!("process_message: Action::Translate"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::Translate" + ); + } translate(plan).await } Some(Action::OpenDataFlow) => { - trace!("process_message: Action::OpenDataFlow"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::OpenDataFlow" + ); + } open_data_flow(self, storage) } Some(Action::StartDataFlow) => { - trace!("process_message: Action::StartDataFlow"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::StartDataFlow" + ); + } start_data_flow(self, storage) } Some(Action::EndDataFlow) => { - trace!("process_message: Action::EndDataFlow"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: Action::EndDataFlow" + ); + } end_data_flow(storage) } None => { @@ -373,37 +476,61 @@ impl<'bytes> Connection<'bytes> { match result { TaskResult::Bind(plan) => { prepare_columns(&mut self.send_buffer, plan.schema().fields())?; - trace!( - columns = plan.schema().fields().len(), - "process_message: prepared columns for Bind" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + columns = plan.schema().fields().len(), + "process_message: prepared columns for Bind" + ); + } storage.logical_plan = Some(plan); // Immediately schedule an Optimize pass for the next loop iteration. 
packet = ControlPacket::Optimize; - trace!("process_message: scheduling Optimize after Bind"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: scheduling Optimize after Bind" + ); + } continue; } TaskResult::Compilation(plan) => { storage.logical_plan = Some(plan); request_params(&mut self.send_buffer)?; - trace!("process_message: requested params after Compilation"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: requested params after Compilation" + ); + } } TaskResult::Optimized(plan) => { storage.logical_plan = Some(plan); packet = ControlPacket::Translate; - trace!("process_message: scheduling Translate after Optimize"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: scheduling Translate after Optimize" + ); + } continue; } TaskResult::Translated(phys) => { storage.physical_plan = phys; - trace!("process_message: transitioned to PhysicalPlan (built physical plan)"); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + "process_message: transitioned to PhysicalPlan (built physical plan)" + ); + } } TaskResult::Explain(explain) => { prepare_explain(&mut self.send_buffer, explain.as_str())?; - trace!( - explain_len = explain.len(), - "process_message: prepared Explain" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + explain_len = explain.len(), + "process_message: prepared Explain" + ); + } } TaskResult::Noop => { // Intentionally no-op (e.g., heap packet consumed elsewhere) @@ -428,11 +555,14 @@ impl<'bytes> Connection<'bytes> { } break; } - debug!( - send_unread = self.send_buffer.len(), - client_pid = self.client_pid.load(Ordering::Relaxed), - "process_message: response buffered, ready to signal client" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::DEBUG) { + debug!( + target = "executor::server", + send_unread = self.send_buffer.len(), + client_pid = self.client_pid.load(Ordering::Relaxed), + "process_message: response buffered, ready to signal client" + ); + } Ok(()) } @@ -458,13 +588,15 @@ fn encode_and_write_rows( let rows = batch.num_rows(); let cols = std::cmp::min(batch.num_columns(), attrs.len()); if cols == 0 || rows == 0 { - tracing::trace!( - target = "executor::server", - rows, - cols, - attrs = attrs.len(), - "execution: empty batch or no attrs; skipping write" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + tracing::trace!( + target = "executor::server", + rows, + cols, + attrs = attrs.len(), + "execution: empty batch or no attrs; skipping write" + ); + } return 0; } // Working buffers reused across rows to minimize allocations @@ -639,18 +771,24 @@ fn open_data_flow(_conn: &mut Connection, storage: &mut Storage) -> Result(phys, |id, table_oid| { - trace!( - scan_id = id, - table_oid, - "open_data_flow: registering scan channel" - ); + if tracing::enabled!(target: "executor::server", tracing::Level::TRACE) { + trace!( + target = "executor::server", + scan_id = id, + table_oid, + "open_data_flow: registering scan channel" + ); + } let _ = storage.registry.register(id, 16); Ok(()) }); @@ -666,17 +804,21 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result Result { @@ -712,13 +862,16 @@ fn start_data_flow(conn: &mut 
Connection, storage: &mut Storage) -> Result 0 { let pid = client_pid_val; if pid > 0 && pid != i32::MAX { @@ -737,10 +890,12 @@ fn start_data_flow(conn: &mut Connection, storage: &mut Storage) -> Result Result HeapPage<'bytes> { let mut cur = unsafe { self.data.as_ptr().add(size_of::()) as *const ItemIdData }; let end = unsafe { cur.add(cnt) }; + // ItemIdData array is 4-byte aligned on PostgreSQL pages; prefer aligned load while cur < end { - let item = unsafe { ptr::read(cur) }; + let raw: u32 = unsafe { ptr::read(cur as *const u32) }; cur = unsafe { cur.add(1) }; + + #[inline(always)] + fn unpack_itemid(raw: u32) -> (u16, u8, u16) { + if cfg!(target_endian = "little") { + // [ len:15 ][ flags:2 ][ off:15 ] (the first-declared field, lp_off, occupies the low bits on little-endian) + let off = (raw & 0x7FFF) as u16; + let flags = ((raw >> 15) & 0x03) as u8; + let len = ((raw >> 17) & 0x7FFF) as u16; + (off, flags, len) + } else { + // In big-endian, the first-declared field occupies the high bits + let off = ((raw >> 17) & 0x7FFF) as u16; + let flags = ((raw >> 15) & 0x03) as u8; + let len = (raw & 0x7FFF) as u16; + (off, flags, len) + } + } + + let (off_u16, flags_u8, len_u16) = unpack_itemid(raw); + if (flags_u8 as u32) != pg_sys::LP_NORMAL { continue; } - let off = item.lp_off() as usize; - let len = item.lp_len() as usize; + + let off = off_u16 as usize; + let len = len_u16 as usize; if len == 0 { continue; } @@ -123,7 +144,9 @@ impl<'bytes> HeapPage<'bytes> { continue; } if let Some(f) = filter { - if !f(&item, ctx) { + // Pass a reference to the in-page ItemIdData when a filter is present + let item_ref: &ItemIdData = unsafe { &*cur.offset(-1) }; + if !f(item_ref, ctx) { continue; } } @@ -170,19 +193,34 @@ impl<'block> Iterator for TupleSliceIter<'block> { fn next(&mut self) -> Option<Self::Item> { while self.cur < self.end { - // Safety: constructed within `line_pointers`, `cur < end` and points to valid item - let item = unsafe { ptr::read(self.cur) }; + // Aligned read of ItemIdData as u32 + let raw: u32 = unsafe { ptr::read(self.cur as *const u32) }; self.cur = unsafe { self.cur.add(1) }; + #[inline(always)] + fn unpack_itemid(raw: u32) -> (u16, u8, u16) { + if cfg!(target_endian = "little") { + let off = (raw & 0x7FFF) as u16; + let flags = ((raw >> 15) & 0x03) as u8; + let len = ((raw >> 17) & 0x7FFF) as u16; + (off, flags, len) + } else { + let off = ((raw >> 17) & 0x7FFF) as u16; + let flags = ((raw >> 15) & 0x03) as u8; + let len = (raw & 0x7FFF) as u16; + (off, flags, len) + } + } + + let (off_u16, flags_u8, len_u16) = unpack_itemid(raw); // Filter only live tuples - let flags = item.lp_flags() as u32; - if flags != pg_sys::LP_NORMAL { + if (flags_u8 as u32) != pg_sys::LP_NORMAL { continue; } // Derive offset and length; validate within data region - let off = item.lp_off() as usize; - let len = item.lp_len() as usize; + let off = off_u16 as usize; + let len = len_u16 as usize; // basic sanity: nonzero length and within page bounds if len == 0 { continue; } @@ -203,7 +241,8 @@ impl<'block> Iterator for TupleSliceIter<'block> { // User filter (if any) if let Some(f) = self.filter { - if !f(&item, self.ctx) { + let item_ref: &ItemIdData = unsafe { &*self.cur.offset(-1) }; + if !f(item_ref, self.ctx) { continue; } } @@ -279,55 +318,74 @@ fn align_up(off: usize, align_char: u8) -> usize { (off + mask) & !mask } -#[inline] +#[inline(always)] fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<ScalarValue>> { use pg_sys::*; const UNIX_EPOCH_USEC_FROM_PG: i64 = 946_684_800_i64
* 1_000_000; // 1970-01-01 - 2000-01-01 const UNIX_EPOCH_DAYS_FROM_PG: i32 = 10_957; // days between 1970-01-01 and 2000-01-01 let v = match atttypid { - x if x == BOOLOID => ScalarValue::Boolean(Some(bytes[0] != 0)), + x if x == BOOLOID => ScalarValue::Boolean(Some(unsafe { *bytes.get_unchecked(0) } != 0)), // PostgreSQL internal single-byte "char" type (not BPCHAR) x if x == CHAROID => { - let ch = bytes[0] as char; + let ch = unsafe { *bytes.get_unchecked(0) } as char; ScalarValue::Utf8(Some(ch.to_string())) } x if x == INT2OID => { - let mut a = [0u8; 2]; - a.copy_from_slice(bytes); - ScalarValue::Int16(Some(i16::from_ne_bytes(a))) + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i16>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const i16) }; + ScalarValue::Int16(Some(v)) } x if x == INT4OID => { - let mut a = [0u8; 4]; - a.copy_from_slice(bytes); - ScalarValue::Int32(Some(i32::from_ne_bytes(a))) + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i32>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const i32) }; + ScalarValue::Int32(Some(v)) } // OID maps to Int32 for now (DataFusion UInt32 available but keep consistent with common integer use) x if x == OIDOID => { - let mut a = [0u8; 4]; - a.copy_from_slice(bytes); - let v = u32::from_ne_bytes(a) as i32; + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<u32>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const u32) } as i32; ScalarValue::Int32(Some(v)) } x if x == INT8OID => { - let mut a = [0u8; 8]; - a.copy_from_slice(bytes); - ScalarValue::Int64(Some(i64::from_ne_bytes(a))) + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i64>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const i64) }; + ScalarValue::Int64(Some(v)) } x if x == FLOAT4OID => { - let mut a = [0u8; 4]; - a.copy_from_slice(bytes); - ScalarValue::Float32(Some(f32::from_ne_bytes(a))) + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<f32>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const f32) }; + ScalarValue::Float32(Some(v)) } x if x == FLOAT8OID => { - let mut a = [0u8; 8]; - a.copy_from_slice(bytes); - ScalarValue::Float64(Some(f64::from_ne_bytes(a))) + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<f64>() - 1), + 0 + ); + let v = unsafe { core::ptr::read(bytes.as_ptr() as *const f64) }; + ScalarValue::Float64(Some(v)) } x if x == DATEOID => { - let mut a = [0u8; 4]; - a.copy_from_slice(bytes); - let pg_days = i32::from_ne_bytes(a); + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i32>() - 1), + 0 + ); + let pg_days: i32 = unsafe { core::ptr::read(bytes.as_ptr() as *const i32) }; // Preserve infinities as sentinels, otherwise convert from PG epoch (2000-01-01) to UNIX (1970-01-01) let unix_days = if pg_days == i32::MIN || pg_days == i32::MAX { pg_days @@ -337,30 +395,36 @@ fn decode_fixed_width(atttypid: pg_sys::Oid, bytes: &[u8]) -> Result<Option<ScalarValue>> x if x == TIMEOID => { - let mut a = [0u8; 8]; - a.copy_from_slice(bytes); - let usec = i64::from_ne_bytes(a); + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i64>() - 1), + 0 + ); + let usec: i64 = unsafe { core::ptr::read(bytes.as_ptr() as *const i64) }; ScalarValue::Time64Microsecond(Some(usec)) } x if x == TIMESTAMPOID || x == TIMESTAMPTZOID => { - let mut a = [0u8; 8]; - a.copy_from_slice(bytes); - let pg_usec = i64::from_ne_bytes(a); + debug_assert_eq!( + (bytes.as_ptr() as
usize) & (core::mem::align_of::<i64>() - 1), + 0 + ); + let pg_usec: i64 = unsafe { core::ptr::read(bytes.as_ptr() as *const i64) }; let unix_usec = pg_usec.saturating_add(UNIX_EPOCH_USEC_FROM_PG); ScalarValue::TimestampMicrosecond(Some(unix_usec), None) } x if x == INTERVALOID => { // struct Interval { TimeOffset time; int32 day; int32 month; } // time is microseconds - let mut t = [0u8; 8]; - let mut d = [0u8; 4]; - let mut m = [0u8; 4]; - t.copy_from_slice(&bytes[0..8]); - d.copy_from_slice(&bytes[8..12]); - m.copy_from_slice(&bytes[12..16]); - let usec = i64::from_ne_bytes(t); - let day_count = i32::from_ne_bytes(d); - let month_count = i32::from_ne_bytes(m); + debug_assert_eq!( + (bytes.as_ptr() as usize) & (core::mem::align_of::<i64>() - 1), + 0 + ); + let usec: i64 = unsafe { core::ptr::read(bytes.as_ptr() as *const i64) }; + let day_ptr = unsafe { bytes.as_ptr().add(8) }; + let month_ptr = unsafe { bytes.as_ptr().add(12) }; + debug_assert_eq!((day_ptr as usize) & (core::mem::align_of::<i32>() - 1), 0); + debug_assert_eq!((month_ptr as usize) & (core::mem::align_of::<i32>() - 1), 0); + let day_count: i32 = unsafe { core::ptr::read(day_ptr as *const i32) }; + let month_count: i32 = unsafe { core::ptr::read(month_ptr as *const i32) }; let nanos = usec.saturating_mul(1000); ScalarValue::IntervalMonthDayNano(Some( datafusion_common::arrow::array::types::IntervalMonthDayNano { @@ -414,7 +478,9 @@ fn typed_null_for(atttypid: pg_sys::Oid) -> ScalarValue { x if x == INT8OID => ScalarValue::Int64(None), x if x == FLOAT4OID => ScalarValue::Float32(None), x if x == FLOAT8OID => ScalarValue::Float64(None), - x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID || x == NAMEOID => ScalarValue::Utf8(None), + x if x == TEXTOID || x == VARCHAROID || x == BPCHAROID || x == NAMEOID => { + ScalarValue::Utf8(None) + } x if x == DATEOID => ScalarValue::Date32(None), x if x == TIMEOID => ScalarValue::Time64Microsecond(None), x if x == TIMESTAMPOID || x == TIMESTAMPTZOID => { @@ -436,7 +502,6 @@ pub struct DecodedIter<'bytes, I: Iterator> { hasnull: bool, bits_ptr: *const u8, hoff: usize, - pending_err: Option, } impl<'bytes, I: Iterator> DecodedIter<'bytes, I> { @@ -471,7 +536,6 @@ impl<'bytes, I: Iterator> DecodedIter<'bytes, I> { hasnull, bits_ptr, hoff, - pending_err: None, }) } @@ -525,6 +589,7 @@ impl<'bytes, I: Iterator> DecodedIter<'bytes, I> { } } + #[inline(always)] unsafe fn decode_att(&mut self, att_idx: usize) -> Result<ScalarValue> { if att_idx < self.last_attno { self.last_attno = 0; @@ -564,9 +629,8 @@ impl<'bytes, I: Iterator> DecodedIter<'bytes, I> { if self.off + 4 > self.tuple.len() { bail!(FusionError::BufferTooSmall(self.tuple.len())); } - let hdr_u32 = u32::from_ne_bytes( - self.tuple[self.off..self.off + 4].try_into().unwrap(), - ); + let hdr_u32: u32 = + ptr::read(self.tuple.as_ptr().add(self.off) as *const u32); let total_len = Self::varlena_4b_total_len(hdr_u32); if total_len < 4 || self.off + total_len > self.tuple.len() { bail!(FusionError::BufferTooSmall(self.tuple.len())); } @@ -623,8 +687,7 @@ impl<'bytes, I: Iterator> DecodedIter<'bytes, I> { if self.off + 4 > self.tuple.len() { bail!(FusionError::BufferTooSmall(self.tuple.len())); } - let hdr_u32 = - u32::from_ne_bytes(self.tuple[self.off..self.off + 4].try_into().unwrap()); + let hdr_u32: u32 = ptr::read(self.tuple.as_ptr().add(self.off) as *const u32); let total_len = Self::varlena_4b_total_len(hdr_u32); if total_len < 4 || self.off + total_len > self.tuple.len() { bail!(FusionError::BufferTooSmall(self.tuple.len())); } @@ -644,11 +707,9 @@ impl<'bytes,
I: Iterator> DecodedIter<'bytes, I> { impl<I: Iterator<Item = usize>> Iterator for DecodedIter<'_, I> { type Item = Result<ScalarValue>; + #[inline(always)] fn next(&mut self) -> Option<Self::Item> { let att_idx = self.iter.next()?; - if let Some(err) = self.pending_err.take() { - return Some(Err(err)); - } Some(unsafe { self.decode_att(att_idx) }) } } From e9858f97ffc6e36fe7309749b547717db6898c52 Mon Sep 17 00:00:00 2001 From: Denis Smirnov Date: Sun, 7 Dec 2025 14:17:57 +0700 Subject: [PATCH 5/5] perf(heap): reduce allocations on heap repack with sort --- ai/memory/gotchas.md | 6 ++++- executor/src/pgscan.rs | 56 +++++++++++++++++++++++------------------- 2 files changed, 36 insertions(+), 26 deletions(-) diff --git a/ai/memory/gotchas.md b/ai/memory/gotchas.md index f976c60..3bfc442 100644 --- a/ai/memory/gotchas.md +++ b/ai/memory/gotchas.md @@ -3,7 +3,7 @@ id: gotchas-0001 type: gotcha scope: repo tags: ["joins", "partitions", "varlena", "signals"] -updated_at: "2025-11-29" +updated_at: "2025-12-07" importance: 0.7 --- @@ -13,3 +13,7 @@ importance: 0.7 - TOAST/Compressed varlena: in projection → error; when not projected → skip, don’t crash. - SIGUSR1: requires a valid client PID; not available on non‑Unix. - SHM races: don’t cache borrowed slices beyond one read cycle; clamp lengths. + +## Executor borrow patterns + +- Caching tuple `(off,len)` pairs in `PgScanStream`: fill a self-owned `Vec<(u16,u16)>` and create the iterator inside a tight scope. Clone needed metadata (`attrs_full`, `proj_indices`) into locals and avoid using `self` while the iterator (borrowing the pairs slice) is alive. This sidesteps borrow checker conflicts between the mutable borrow of `self.pairs` and other uses of `self`. diff --git a/executor/src/pgscan.rs b/executor/src/pgscan.rs index 55b872b..ab3ac13 100644 --- a/executor/src/pgscan.rs +++ b/executor/src/pgscan.rs @@ -347,6 +347,7 @@ pub struct PgScanStream { proj_indices: Arc<Vec<usize>>, rx: Option>, conn_id: usize, + pairs: Vec<(u16, u16)>, } impl PgScanStream { @@ -363,6 +364,7 @@ impl PgScanStream { proj_indices, rx, conn_id, + pairs: Vec::new(), } } } @@ -392,9 +394,9 @@ impl Stream for PgScanStream { // Create a HeapPage view and iterate tuples let hp = unsafe { HeapPage::from_slice(page) }; + let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema)); let Ok(hp) = hp else { // On error decoding page, return empty batch for resilience - let batch = RecordBatch::new_empty(Arc::clone(&this.proj_schema)); return Poll::Ready(Some(Ok(batch))); }; @@ -402,35 +404,39 @@ impl Stream for PgScanStream { let col_count = total_cols; let mut builders = make_builders(&this.proj_schema, block.num_offsets as usize) .map_err(|e| datafusion::error::DataFusionError::Execution(format!("{e}")))?; - // Use tuples_by_offset to iterate LP_NORMAL tuples in page order - let mut pairs: Vec<(u16, u16)> = Vec::new(); - // Populate pairs once and create iterator borrowing the filled pairs slice - let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut pairs); + // Take local clones of shared metadata to avoid borrowing `this` during tuple iteration + let proj_indices = Arc::clone(&this.proj_indices); + let attrs_full = Arc::clone(&this.attrs_full); let page_hdr = unsafe { &*(page.as_ptr() as *const pg_sys::PageHeaderData) } as *const pg_sys::PageHeaderData; let mut decoded_rows = 0usize; - for tup in it { - // Decode projected columns for tuple using iterator over requested projection - let iter = unsafe { - decode_tuple_project( - page_hdr, - tup, - &this.attrs_full, - this.proj_indices.iter().copied(), - ) - }; - let
Ok(mut iter) = iter else { - continue; - }; - // Iterate over projected columns in order - for b in builders.iter_mut().take(total_cols) { - match iter.next() { - Some(Ok(v)) => append_scalar(b, v), - Some(Err(_e)) => append_null(b), - None => append_null(b), + // Limit the borrow of `this.pairs` to this inner scope to satisfy borrow checker + { + // Populate pairs once and create iterator borrowing the filled pairs slice + let it = hp.tuples_by_offset(None, std::ptr::null_mut(), &mut this.pairs); + for tup in it { + // Decode projected columns for tuple using iterator over requested projection + let iter = unsafe { + decode_tuple_project( + page_hdr, + tup, + &attrs_full, + proj_indices.iter().copied(), + ) + }; + let Ok(mut iter) = iter else { + continue; + }; + // Iterate over projected columns in order + for b in builders.iter_mut().take(total_cols) { + match iter.next() { + Some(Ok(v)) => append_scalar(b, v), + Some(Err(_e)) => append_null(b), + None => append_null(b), + } } + decoded_rows += 1; } - decoded_rows += 1; } // Build Arrow arrays and RecordBatch