diff --git a/core/rs/bundle/Cargo.toml b/core/rs/bundle/Cargo.toml index a5bcc151..f79cef3b 100644 --- a/core/rs/bundle/Cargo.toml +++ b/core/rs/bundle/Cargo.toml @@ -16,6 +16,7 @@ crate-type = ["rlib"] crsql_fractindex_core = {path="../fractindex-core"} crsql_core = { path="../core" } sqlite_nostd = { path="../sqlite-rs-embedded/sqlite_nostd" } +libc-print = "*" [profile.dev] panic = "abort" diff --git a/core/rs/bundle/src/lib.rs b/core/rs/bundle/src/lib.rs index 4c4a35df..da000046 100644 --- a/core/rs/bundle/src/lib.rs +++ b/core/rs/bundle/src/lib.rs @@ -11,6 +11,8 @@ use crsql_core::sqlite3_crsqlcore_init; #[cfg(feature = "test")] pub use crsql_core::test_exports; use crsql_fractindex_core::sqlite3_crsqlfractionalindex_init; +#[cfg(feature = "test")] +use libc_print::std_name::println; use sqlite_nostd as sqlite; use sqlite_nostd::SQLite3Allocator; @@ -21,11 +23,20 @@ static ALLOCATOR: SQLite3Allocator = SQLite3Allocator {}; // This must be our panic handler for WASM builds. For simplicity, we make it our panic handler for // all builds. Abort is also more portable than unwind, enabling us to go to more embedded use cases. +#[cfg(not(feature = "test"))] #[panic_handler] fn panic(_info: &PanicInfo) -> ! { core::intrinsics::abort() } +// Print panic info for tests +#[cfg(feature = "test")] +#[panic_handler] +fn panic(info: &PanicInfo) -> ! { + println!("PANIC!: {}", info); + core::intrinsics::abort(); +} + #[cfg(not(target_family = "wasm"))] #[lang = "eh_personality"] extern "C" fn eh_personality() {} diff --git a/core/rs/bundle_static/Cargo.lock b/core/rs/bundle_static/Cargo.lock index 909df5a6..55b91c73 100644 --- a/core/rs/bundle_static/Cargo.lock +++ b/core/rs/bundle_static/Cargo.lock @@ -93,6 +93,7 @@ version = "0.1.0" dependencies = [ "crsql_core", "crsql_fractindex_core", + "libc-print", "sqlite_nostd", ] diff --git a/core/rs/core/src/changes_vtab.rs b/core/rs/core/src/changes_vtab.rs index e75b80ab..47e760b7 100644 --- a/core/rs/core/src/changes_vtab.rs +++ b/core/rs/core/src/changes_vtab.rs @@ -586,6 +586,16 @@ pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) - (*(*tab).pExtData).ordinalMap as *mut BTreeMap, i64>, )) }; + + let mut table_infos = unsafe { + mem::ManuallyDrop::new(Box::from_raw( + (*(*tab).pExtData).tableInfos as *mut Vec, + )) + }; + for tbl_info in table_infos.iter_mut() { + tbl_info.clear_cl_cache(); + } + ordinals.clear(); ResultCode::OK as c_int } diff --git a/core/rs/core/src/changes_vtab_write.rs b/core/rs/core/src/changes_vtab_write.rs index 70d94d19..a543f78a 100644 --- a/core/rs/core/src/changes_vtab_write.rs +++ b/core/rs/core/src/changes_vtab_write.rs @@ -380,36 +380,45 @@ pub unsafe extern "C" fn crsql_merge_insert( fn get_local_cl( db: *mut sqlite::sqlite3, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, key: sqlite::int64, ) -> Result { - let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; - let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; - - let rc = local_cl_stmt - .bind_int64(1, key) - .and_then(|_| local_cl_stmt.bind_int64(2, key)); - if let Err(rc) = rc { - reset_cached_stmt(local_cl_stmt.stmt)?; - return Err(rc); + if let Some(cl) = tbl_info.get_cl(key) { + return Ok(*cl); } - let step_result = local_cl_stmt.step(); - match step_result { - Ok(ResultCode::ROW) => { - let ret = local_cl_stmt.column_int64(0); - reset_cached_stmt(local_cl_stmt.stmt)?; - Ok(ret) - } - Ok(ResultCode::DONE) => { + let cl = { + let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?; + let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?; + + let rc = local_cl_stmt + .bind_int64(1, key) + .and_then(|_| local_cl_stmt.bind_int64(2, key)); + if let Err(rc) = rc { reset_cached_stmt(local_cl_stmt.stmt)?; - Ok(0) + return Err(rc); } - Ok(rc) | Err(rc) => { - reset_cached_stmt(local_cl_stmt.stmt)?; - Err(rc) + + let step_result = local_cl_stmt.step(); + match step_result { + Ok(ResultCode::ROW) => { + let ret = local_cl_stmt.column_int64(0); + reset_cached_stmt(local_cl_stmt.stmt)?; + ret + } + Ok(ResultCode::DONE) => { + reset_cached_stmt(local_cl_stmt.stmt)?; + 0 + } + Ok(rc) | Err(rc) => { + reset_cached_stmt(local_cl_stmt.stmt)?; + return Err(rc); + } } - } + }; + + tbl_info.set_cl(key, cl); + Ok(cl) } unsafe fn merge_insert( @@ -463,9 +472,10 @@ unsafe fn merge_insert( let insert_site_id = insert_site_id.blob(); - let tbl_infos = mem::ManuallyDrop::new(Box::from_raw( + let mut tbl_infos = mem::ManuallyDrop::new(Box::from_raw( (*(*tab).pExtData).tableInfos as *mut Vec, )); + // TODO: will this work given `insert_tbl` is null termed? let tbl_info_index = tbl_infos.iter().position(|x| x.tbl_name == insert_tbl); @@ -480,14 +490,14 @@ unsafe fn merge_insert( // TODO: technically safe since we checked `is_none` but this should be more idiomatic let tbl_info_index = tbl_info_index.unwrap(); - let tbl_info = &tbl_infos[tbl_info_index]; + let tbl_info = &mut tbl_infos[tbl_info_index]; let unpacked_pks = unpack_columns(insert_pks.blob())?; // Get or create key as the first thing we do. // We'll need the key for all later operations. let key = tbl_info.get_or_create_key(db, &unpacked_pks)?; - let local_cl = get_local_cl(db, &tbl_info, key)?; + let local_cl = get_local_cl(db, tbl_info, key)?; // We can ignore all updates from older causal lengths. // They won't win at anything. @@ -593,8 +603,6 @@ unsafe fn merge_insert( *rowid = slab_rowid(tbl_info_index as i32, inner_rowid); } return Ok(ResultCode::OK); - - } // we got a causal length which would resurrect the row. @@ -712,6 +720,11 @@ unsafe fn merge_insert( *errmsg = err.into_raw(); return Err(rc); } + + // a bigger cl always wins + if insert_cl > local_cl { + tbl_info.set_cl(key, insert_cl); + } } res diff --git a/core/rs/core/src/commit.rs b/core/rs/core/src/commit.rs index 4c9e4335..096482e9 100644 --- a/core/rs/core/src/commit.rs +++ b/core/rs/core/src/commit.rs @@ -8,6 +8,7 @@ use core::{ use sqlite_nostd::ResultCode; use crate::c::crsql_ExtData; +use crate::tableinfo::TableInfo; #[no_mangle] pub unsafe extern "C" fn crsql_commit_hook(user_data: *mut c_void) -> c_int { @@ -37,5 +38,13 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) { let mut ordinals: mem::ManuallyDrop, i64>>> = mem::ManuallyDrop::new( Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap, i64>), ); + + let mut table_infos = unsafe { + mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)) + }; ordinals.clear(); + + for tbl_info in table_infos.iter_mut() { + tbl_info.clear_cl_cache(); + } } diff --git a/core/rs/core/src/lib.rs b/core/rs/core/src/lib.rs index a8a0554d..368ce6b5 100644 --- a/core/rs/core/src/lib.rs +++ b/core/rs/core/src/lib.rs @@ -73,6 +73,8 @@ use local_writes::after_update::x_crsql_after_update; use sqlite::{Destructor, ResultCode}; use sqlite_nostd as sqlite; use sqlite_nostd::{Connection, Context, Value}; +#[cfg(feature = "test")] +use tableinfo::TableInfo; use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info}; use teardown::*; use triggers::create_triggers; @@ -453,19 +455,31 @@ pub extern "C" fn sqlite3_crsqlcore_init( } #[cfg(feature = "test")] - let rc = db - .create_function_v2( - "crsql_cache_site_ordinal", - 1, - sqlite::UTF8 | sqlite::DETERMINISTIC, - Some(ext_data as *mut c_void), - Some(x_crsql_cache_site_ordinal), - None, - None, - None, - ) - .unwrap_or(ResultCode::ERROR); - if rc != ResultCode::OK { + if let Err(_) = db.create_function_v2( + "crsql_cache_site_ordinal", + 1, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_site_ordinal), + None, + None, + None, + ) { + unsafe { crsql_freeExtData(ext_data) }; + return null_mut(); + } + + #[cfg(feature = "test")] + if let Err(_) = db.create_function_v2( + "crsql_cache_pk_cl", + 2, + sqlite::UTF8 | sqlite::DETERMINISTIC, + Some(ext_data as *mut c_void), + Some(x_crsql_cache_pk_cl), + None, + None, + None, + ) { unsafe { crsql_freeExtData(ext_data) }; return null_mut(); } @@ -975,6 +989,40 @@ unsafe extern "C" fn x_crsql_cache_site_ordinal( sqlite::result_int64(ctx, res); } +/** + * Get the pk cl cached in the ext data for the current transaction. + * only used for test to inspect the cl cache. + */ +#[cfg(feature = "test")] +unsafe extern "C" fn x_crsql_cache_pk_cl( + ctx: *mut sqlite::context, + argc: i32, + argv: *mut *mut sqlite::value, +) { + if argc < 2 { + ctx.result_error( + "Wrong number of args provided to crsql_cache_pk_cl. Provide the table name and pk key.", + ); + return; + } + + let ext_data = ctx.user_data() as *mut c::crsql_ExtData; + let args = sqlite::args!(argc, argv); + let table_name = args[0].text(); + let pk_key = args[1].int64(); + + let table_infos = + mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)); + let table_info = table_infos.iter().find(|t| t.tbl_name == table_name); + + if let Some(table_info) = table_info { + let cl = table_info.get_cl(pk_key).cloned().unwrap_or(-1); + sqlite::result_int64(ctx, cl); + } else { + ctx.result_error("table not found"); + } +} + /** * Return the timestamp for the current transaction. */ diff --git a/core/rs/core/src/local_writes/after_delete.rs b/core/rs/core/src/local_writes/after_delete.rs index 1f5b922e..724a4b38 100644 --- a/core/rs/core/src/local_writes/after_delete.rs +++ b/core/rs/core/src/local_writes/after_delete.rs @@ -10,6 +10,7 @@ use sqlite_nostd as sqlite; use crate::{c::crsql_ExtData, tableinfo::TableInfo}; use super::bump_seq; +use super::mark_locally_deleted; use super::trigger_fn_preamble; /** @@ -37,7 +38,7 @@ pub unsafe extern "C" fn x_crsql_after_delete( fn after_delete( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_old: &[*mut value], ) -> Result { let ts = unsafe { (*ext_data).timestamp.to_string() }; @@ -47,30 +48,24 @@ fn after_delete( .get_or_create_key_via_raw_values(db, pks_old) .map_err(|_| "failed getting or creating lookaside key")?; - let mark_locally_deleted_stmt_ref = tbl_info - .get_mark_locally_deleted_stmt(db) - .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; - let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; - mark_locally_deleted_stmt - .bind_int64(1, key) - .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) - .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) - .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) - .map_err(|_| "failed binding to mark locally deleted stmt")?; - super::step_trigger_stmt(mark_locally_deleted_stmt)?; + let cl = mark_locally_deleted(db, tbl_info, key, db_version, seq, &ts)?; - // now actually delete the row metadata - let drop_clocks_stmt_ref = tbl_info - .get_merge_delete_drop_clocks_stmt(db) - .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; - let drop_clocks_stmt = drop_clocks_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; + { + // now actually delete the row metadata + let drop_clocks_stmt_ref = tbl_info + .get_merge_delete_drop_clocks_stmt(db) + .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; + let drop_clocks_stmt = drop_clocks_stmt_ref + .as_ref() + .ok_or("Failed to deref sentinel stmt")?; - drop_clocks_stmt - .bind_int64(1, key) - .map_err(|_e| "failed to bind pks to drop_clocks_stmt")?; - super::step_trigger_stmt(drop_clocks_stmt) + drop_clocks_stmt + .bind_int64(1, key) + .map_err(|_e| "failed to bind pks to drop_clocks_stmt")?; + super::step_trigger_stmt(drop_clocks_stmt)?; + } + + tbl_info.set_cl(key, cl); + + Ok(ResultCode::OK) } diff --git a/core/rs/core/src/local_writes/after_insert.rs b/core/rs/core/src/local_writes/after_insert.rs index e7a25c7c..2a4cf4e8 100644 --- a/core/rs/core/src/local_writes/after_insert.rs +++ b/core/rs/core/src/local_writes/after_insert.rs @@ -37,7 +37,7 @@ pub unsafe extern "C" fn x_crsql_after_insert( fn after_insert( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_new: &[*mut value], ) -> Result { let ts = unsafe { (*ext_data).timestamp.to_string() }; @@ -46,17 +46,27 @@ fn after_insert( let (create_record_existed, key_new) = tbl_info .get_or_create_key_for_insert(db, pks_new) .map_err(|_| "failed getting or creating lookaside key")?; - if tbl_info.non_pks.is_empty() { + + let cl = if tbl_info.non_pks.is_empty() { let seq = bump_seq(ext_data); // just a sentinel record - return super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts); - } else if create_record_existed { - // update the create record since it already exists. - let seq = bump_seq(ext_data); - update_create_record(db, tbl_info, key_new, db_version, seq, &ts)?; - } + let cl = super::mark_new_pk_row_created(db, tbl_info, key_new, db_version, seq, &ts)?; + Some(cl) + } else { + let cl = if create_record_existed { + // update the create record since it already exists. + let seq = bump_seq(ext_data); + update_create_record(db, tbl_info, key_new, db_version, seq, &ts)? + } else { + None + }; + super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; + cl + }; - super::mark_locally_inserted(db, ext_data, tbl_info, key_new, db_version, &ts)?; + if let Some(cl) = cl { + tbl_info.set_cl(key_new, cl); + } Ok(ResultCode::OK) } @@ -68,7 +78,7 @@ fn update_create_record( db_version: sqlite::int64, seq: i32, ts: &str, -) -> Result { +) -> Result, String> { let update_create_record_stmt_ref = tbl_info .get_maybe_mark_locally_reinserted_stmt(db) .map_err(|_e| "failed to get update_create_record_stmt")?; @@ -90,5 +100,16 @@ fn update_create_record( }) .map_err(|_e| "failed binding to update_create_record_stmt")?; - super::step_trigger_stmt(update_create_record_stmt) + let res = update_create_record_stmt.step(); + let result = match res { + Ok(ResultCode::ROW) => { + let col_version = update_create_record_stmt.column_int64(0); + Ok(Some(col_version)) + } + Ok(ResultCode::DONE) => Ok(None), + _ => Err("failed to step update_create_record_stmt".to_string()), + }; + super::reset_cached_stmt(update_create_record_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + result } diff --git a/core/rs/core/src/local_writes/after_update.rs b/core/rs/core/src/local_writes/after_update.rs index 43f67b5f..b75b55ef 100644 --- a/core/rs/core/src/local_writes/after_update.rs +++ b/core/rs/core/src/local_writes/after_update.rs @@ -66,7 +66,7 @@ fn partition_values( fn after_update( db: *mut sqlite3, ext_data: *mut crsql_ExtData, - tbl_info: &TableInfo, + tbl_info: &mut TableInfo, pks_new: &[*mut value], pks_old: &[*mut value], non_pks_new: &[*mut value], @@ -81,39 +81,37 @@ fn after_update( let mut changed = false; // Changing a primary key column to a new value is the same thing as deleting the row // previously identified by the primary key. - if crate::compare_values::any_value_changed(pks_new, pks_old)? { - let old_key = tbl_info - .get_or_create_key_via_raw_values(db, pks_old) - .map_err(|_| "failed geteting or creating lookaside key")?; - let next_seq = super::bump_seq(ext_data); - changed = true; - // Record the delete of the row identified by the old primary keys - after_update__mark_old_pk_row_deleted( - db, - tbl_info, - old_key, - next_db_version, - next_seq, - &ts, - )?; - let next_seq = super::bump_seq(ext_data); - // todo: we don't need to this, if there's no existing row (cl is assumed to be 1). - super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, &ts)?; - for col in tbl_info.non_pks.iter() { + let cl_info = { + if crate::compare_values::any_value_changed(pks_new, pks_old)? { + let old_key = tbl_info + .get_or_create_key_via_raw_values(db, pks_old) + .map_err(|_| "failed getting or creating lookaside key")?; let next_seq = super::bump_seq(ext_data); - after_update__move_non_pk_col( - db, - tbl_info, - new_key, - old_key, - &col.name, - next_db_version, - &ts, - next_seq, - )?; + changed = true; + // Record the delete of the row identified by the old primary keys + let cl = + super::mark_locally_deleted(db, tbl_info, old_key, next_db_version, next_seq, &ts)?; + let next_seq = super::bump_seq(ext_data); + // todo: we don't need to this, if there's no existing row (cl is assumed to be 1). + super::mark_new_pk_row_created(db, tbl_info, new_key, next_db_version, next_seq, &ts)?; + for col in tbl_info.non_pks.iter() { + let next_seq = super::bump_seq(ext_data); + after_update__move_non_pk_col( + db, + tbl_info, + new_key, + old_key, + &col.name, + next_db_version, + &ts, + next_seq, + )?; + } + Some((old_key, cl)) + } else { + None } - } - + }; // now for each non_pk_col we need to do an insert // where new value is not old value for ((new, old), col_info) in non_pks_new @@ -144,32 +142,11 @@ fn after_update( crate::db_version::next_db_version(db, ext_data)?; } - Ok(ResultCode::OK) -} + if let Some((old_key, cl)) = cl_info { + tbl_info.set_cl(old_key, cl); + } -#[allow(non_snake_case)] -fn after_update__mark_old_pk_row_deleted( - db: *mut sqlite3, - tbl_info: &TableInfo, - old_key: sqlite::int64, - db_version: sqlite::int64, - seq: i32, - ts: &str, -) -> Result { - let mark_locally_deleted_stmt_ref = tbl_info - .get_mark_locally_deleted_stmt(db) - .or_else(|_e| Err("failed to get mark_locally_deleted_stmt"))?; - let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref - .as_ref() - .ok_or("Failed to deref sentinel stmt")?; - mark_locally_deleted_stmt - .bind_int64(1, old_key) - .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) - .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) - .and_then(|_| mark_locally_deleted_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) - // .and_then(|_| mark_locally_deleted_stmt.bind_int64(4, db_version)) - .or_else(|_| Err("failed binding to mark_locally_deleted_stmt"))?; - super::step_trigger_stmt(mark_locally_deleted_stmt) + Ok(ResultCode::OK) } #[allow(non_snake_case)] diff --git a/core/rs/core/src/local_writes/mod.rs b/core/rs/core/src/local_writes/mod.rs index f89483b5..f14100e9 100644 --- a/core/rs/core/src/local_writes/mod.rs +++ b/core/rs/core/src/local_writes/mod.rs @@ -26,7 +26,7 @@ fn trigger_fn_preamble( f: F, ) -> Result where - F: Fn(&TableInfo, &[*mut sqlite::value], *mut crsql_ExtData) -> Result, + F: Fn(&mut TableInfo, &[*mut sqlite::value], *mut crsql_ExtData) -> Result, { if argc < 1 { return Err("expected at least 1 argument".to_string()); @@ -45,10 +45,10 @@ where )); } - let table_infos = + let mut table_infos = unsafe { ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec)) }; let table_name = values[0].text(); - let table_info = match table_infos.iter().find(|t| &(t.tbl_name) == table_name) { + let table_info = match table_infos.iter_mut().find(|t| &(t.tbl_name) == table_name) { Some(t) => t, None => { return Err(format!("table {} not found", table_name)); @@ -84,7 +84,7 @@ fn mark_new_pk_row_created( db_version: i64, seq: i32, ts: &str, -) -> Result { +) -> Result { let mark_locally_created_stmt_ref = tbl_info .get_mark_locally_created_stmt(db) .map_err(|_e| "failed to get mark_locally_created_stmt")?; @@ -98,7 +98,17 @@ fn mark_new_pk_row_created( .and_then(|_| mark_locally_created_stmt.bind_int(3, seq)) .and_then(|_| mark_locally_created_stmt.bind_text(4, ts, sqlite::Destructor::STATIC)) .map_err(|_| "failed binding to mark_locally_created_stmt")?; - step_trigger_stmt(mark_locally_created_stmt) + let stmt_res = mark_locally_created_stmt.step(); + let result = match stmt_res { + Ok(ResultCode::ROW) => { + let cl = mark_locally_created_stmt.column_int64(0); + Ok(cl) + } + _ => Err("failed to step mark locally created stmt".to_string()), + }; + reset_cached_stmt(mark_locally_created_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + result } fn bump_seq(ext_data: *mut crsql_ExtData) -> c_int { @@ -267,3 +277,42 @@ fn mark_locally_updated( Ok(ResultCode::OK) } + +fn mark_locally_deleted( + db: *mut sqlite3, + tbl_info: &TableInfo, + key: sqlite::int64, + db_version: sqlite::int64, + seq: i32, + ts: &str, +) -> Result { + let mark_locally_deleted_stmt_ref = tbl_info + .get_mark_locally_deleted_stmt(db) + .map_err(|_e| "failed to get mark_locally_deleted_stmt")?; + + let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref + .as_ref() + .ok_or("Failed to deref sentinel stmt")?; + + mark_locally_deleted_stmt + .bind_int64(1, key) + .and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version)) + .and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq)) + .and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC)) + .map_err(|_| "failed binding to mark locally deleted stmt")?; + + let stmt_res = mark_locally_deleted_stmt.step(); + + let result = match stmt_res { + Ok(ResultCode::ROW) => { + let cl = mark_locally_deleted_stmt.column_int64(0); + Ok(cl) + } + _ => Err("failed to step mark locally deleted stmt".to_string()), + }; + + reset_cached_stmt(mark_locally_deleted_stmt.stmt) + .map_err(|_e| "failed to reset cached stmt")?; + + result +} diff --git a/core/rs/core/src/tableinfo.rs b/core/rs/core/src/tableinfo.rs index 5a6eb6bc..081d54b3 100644 --- a/core/rs/core/src/tableinfo.rs +++ b/core/rs/core/src/tableinfo.rs @@ -7,6 +7,7 @@ use crate::pack_columns::ColumnValue; use crate::stmt_cache::reset_cached_stmt; use crate::util::Countable; use alloc::boxed::Box; +use alloc::collections::BTreeMap; use alloc::format; use alloc::string::String; use alloc::vec; @@ -27,6 +28,8 @@ use sqlite_nostd::ResultCode; use sqlite_nostd::Stmt; use sqlite_nostd::StrRef; +// TODO: make this configurable with a crsql_config_set. +const MAX_CL_CACHE_SIZE: usize = 1500; pub struct TableInfo { pub tbl_name: String, pub pks: Vec, @@ -66,9 +69,28 @@ pub struct TableInfo { move_non_sentinels_stmt: RefCell>, mark_locally_created_stmt: RefCell>, maybe_mark_locally_reinserted_stmt: RefCell>, + cl_cache: BTreeMap, } impl TableInfo { + pub fn get_cl(&self, key: i64) -> Option<&i64> { + self.cl_cache.get(&key) + } + + pub fn set_cl(&mut self, key: i64, cl: i64) { + // clear the cache if we are over limit + if self.cl_cache.len() >= MAX_CL_CACHE_SIZE { + self.cl_cache.clear(); + } + self.cl_cache.insert(key, cl); + } + + pub fn clear_cl_cache(&mut self) { + if !self.cl_cache.is_empty() { + self.cl_cache.clear(); + } + } + fn find_non_pk_col(&self, col_name: &str) -> Result<&ColumnInfo, ResultCode> { for col in &self.non_pks { if col.name == col_name { @@ -458,7 +480,8 @@ impl TableInfo { db_version = excluded.db_version, seq = excluded.seq, site_id = 0, - ts = excluded.ts", + ts = excluded.ts + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::DELETE_SENTINEL, ); @@ -539,7 +562,8 @@ impl TableInfo { db_version = excluded.db_version, seq = excluded.seq, site_id = 0, - ts = excluded.ts", + ts = excluded.ts + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), sentinel = crate::c::INSERT_SENTINEL, ); @@ -644,7 +668,8 @@ impl TableInfo { seq = ?, site_id = 0, ts = ? - WHERE key = ? AND col_name = ?", + WHERE key = ? AND col_name = ? + RETURNING col_version", table_name = crate::util::escape_ident(&self.tbl_name), ); let ret = db.prepare_v3(&sql, sqlite::PREPARE_PERSISTENT)?; @@ -862,7 +887,8 @@ pub extern "C" fn crsql_ensure_table_infos_are_up_to_date( return ResultCode::ERROR as c_int; } - let mut table_infos = unsafe { Box::from_raw((*ext_data).tableInfos as *mut Vec) }; + let mut table_infos: Box> = + unsafe { Box::from_raw((*ext_data).tableInfos as *mut Vec) }; if schema_changed > 0 || table_infos.len() == 0 { match pull_all_table_infos(db, ext_data, err) { @@ -1006,6 +1032,7 @@ pub fn pull_table_info( select_clock_stmt: RefCell::new(None), insert_clock_stmt: RefCell::new(None), update_clock_stmt: RefCell::new(None), + cl_cache: BTreeMap::new(), }) } diff --git a/core/rs/integration_check/Cargo.lock b/core/rs/integration_check/Cargo.lock index 9ce49607..fd9e431f 100644 --- a/core/rs/integration_check/Cargo.lock +++ b/core/rs/integration_check/Cargo.lock @@ -123,6 +123,7 @@ version = "0.1.0" dependencies = [ "crsql_core", "crsql_fractindex_core", + "libc-print", "sqlite_nostd", ] diff --git a/core/rs/integration_check/src/t/pk_only_tables.rs b/core/rs/integration_check/src/t/pk_only_tables.rs index 8784caf8..3e8135d1 100644 --- a/core/rs/integration_check/src/t/pk_only_tables.rs +++ b/core/rs/integration_check/src/t/pk_only_tables.rs @@ -149,7 +149,8 @@ fn modify_pkonly_row() -> Result<(), ResultCode> { .db .prepare_v2("UPDATE foo SET id = 2 WHERE id = 1;") .expect("prepare set to foo"); - stmt.step().expect("step update to foo"); + let result = stmt.step(); + assert_eq!(result, Ok(ResultCode::DONE), "failed to update foo"); sync_left_to_right(&db_a.db, &db_b.db, -1); diff --git a/core/rs/integration_check/src/t/tableinfo.rs b/core/rs/integration_check/src/t/tableinfo.rs index 1f5574c7..f7b9f982 100644 --- a/core/rs/integration_check/src/t/tableinfo.rs +++ b/core/rs/integration_check/src/t/tableinfo.rs @@ -366,7 +366,9 @@ fn test_site_id_initialization() { let raw_db = db.db.db; let site_id = select_site_id(raw_db).expect("selected site id"); assert_eq!(site_id.len(), 16); - raw_db.exec_safe("DELETE FROM crsql_site_id;").expect("deleted site id"); + raw_db + .exec_safe("DELETE FROM crsql_site_id;") + .expect("deleted site id"); } { @@ -374,7 +376,9 @@ fn test_site_id_initialization() { let raw_db = db.db.db; let site_id = select_site_id(raw_db).expect("selected site id"); assert_eq!(site_id.len(), 16); - raw_db.exec_safe("DROP TABLE crsql_site_id;").expect("dropped crsql_site_id"); + raw_db + .exec_safe("DROP TABLE crsql_site_id;") + .expect("dropped crsql_site_id"); } { diff --git a/core/rs/integration_check/src/t/test_db_version.rs b/core/rs/integration_check/src/t/test_db_version.rs index aacf08ab..6eee9230 100644 --- a/core/rs/integration_check/src/t/test_db_version.rs +++ b/core/rs/integration_check/src/t/test_db_version.rs @@ -3,7 +3,7 @@ use alloc::{ffi::CString, format, string::String}; use core::ffi::c_char; use crsql_bundle::test_exports; use sqlite::{Connection, ResultCode}; -use sqlite_nostd as sqlite; +use sqlite_nostd::{self as sqlite, ManagedStmt}; fn make_site() -> *mut c_char { let inner_ptr: *mut c_char = CString::new("0000000000000000").unwrap().into_raw(); @@ -19,7 +19,8 @@ fn get_site_id(db: *mut sqlite::sqlite3) -> *mut c_char { let blob_ptr = stmt.column_blob(0).expect("failed to get site_id"); - let cstring = CString::new(blob_ptr.to_vec()).expect("failed to create CString from site id"); + // use vec_unchecked because `new` errors if there's a 0 byte in the vec. + let cstring = unsafe { CString::from_vec_unchecked(blob_ptr.to_vec()) }; cstring.into_raw() as *mut c_char } @@ -182,6 +183,178 @@ fn test_get_or_set_site_ordinal() -> Result<(), ResultCode> { Ok(()) } +fn test_get_or_set_pk_cl() -> Result<(), ResultCode> { + let c = crate::opendb().expect("db opened"); + let db = &c.db; + db.db + .exec_safe("CREATE TABLE foo (a primary key not null, b);")?; + + db.db.exec_safe("SELECT crsql_as_crr('foo');")?; + + let insert_foo_stmt = db.db.prepare_v2("INSERT INTO foo VALUES (?, ?);")?; + + let get_pk_key_stmt = db + .db + .prepare_v2("SELECT __crsql_key from foo__crsql_pks where a = ?;")?; + + let get_cache_cl_stmt = db.db.prepare_v2("SELECT crsql_cache_pk_cl(?, ?);")?; + + db.db.exec_safe("BEGIN TRANSACTION;")?; + + // insert row, pl cache doesn't get updated on new insert + + insert_foo_row(&insert_foo_stmt, 1, "b")?; + + insert_foo_row(&insert_foo_stmt, 2, "c")?; + + insert_foo_row(&insert_foo_stmt, 4, "d")?; + + let key1 = get_pk_key(&get_pk_key_stmt, 1).expect("get pk key"); + let key2 = get_pk_key(&get_pk_key_stmt, 2).expect("get pk key"); + + let delete_foo_stmt = db + .db + .prepare_v2("DELETE FROM foo WHERE a = ?;") + .expect("prepare delete foo"); + delete_foo_stmt.bind_int64(1, 1)?; + delete_foo_stmt.step()?; + + // replace pk 2 with pk 3 + let update_foo_stmt = db + .db + .prepare_v2("UPDATE foo SET a = ? WHERE a = ?;") + .expect("prepare update foo"); + update_foo_stmt.bind_int64(1, 3)?; + update_foo_stmt.bind_int64(2, 2)?; + update_foo_stmt.step()?; + + // pk 1 and 2 should have a cl of 2 since they have been deleted + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key1)?); + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + // reinsert pk 2, check cl is 3 + insert_foo_row(&insert_foo_stmt, 2, "d")?; + assert_eq!(3, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + // check insert or replace updates the cache + let insert_or_replace = db + .db + .prepare_v2("INSERT OR REPLACE INTO foo VALUES (?, ?);")?; + insert_or_replace.bind_int64(1, 4)?; + insert_or_replace.bind_text(2, "c", sqlite::Destructor::STATIC)?; + insert_or_replace.step()?; + reset_cached_stmt(&insert_or_replace)?; + + // insert of pk with no clock row gets no update + let key4 = get_pk_key(&get_pk_key_stmt, 4).expect("get pk key"); + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + db.db.exec_safe("COMMIT;")?; + + // commit clears the cache + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key1)?); + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key2)?); + + db.db.exec_safe("BEGIN TRANSACTION;")?; + db.db.exec_safe("SAVEPOINT test;")?; + + // new site_id in crsql_changes table + // pk number is 4 + let pk: [u8; 3] = [1, 9, 4]; + + // insert should update the cache. + insert_crsql_changes_row(db.db, &pk, "b", "e", 1, 1, 1).expect("insert crsql changes row"); + let key4 = get_pk_key(&get_pk_key_stmt, 4).expect("get pk key"); + assert_eq!(1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // a delete should also update the cache. + insert_crsql_changes_row(db.db, &pk, "-1", "", 2, 2, 2)?; + assert_eq!(2, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // test that a resurrected cache would also get updated. + insert_crsql_changes_row(db.db, &pk, "b", "f", 1, 3, 5)?; + assert_eq!(5, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + // a lower cl should not update the cache + insert_crsql_changes_row(db.db, &pk, "b", "e", 1, 3, 3)?; + assert_eq!(5, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + db.db.exec_safe("ROLLBACK TO SAVEPOINT test;")?; + + assert_eq!(-1, get_cache_cl(&get_cache_cl_stmt, "foo", key4)?); + + Ok(()) +} + +fn insert_crsql_changes_row( + db: *mut sqlite::sqlite3, + pk: &[u8], + cid: &str, + val: &str, + col_version: i64, + db_version: i64, + cl: i64, +) -> Result<(), ResultCode> { + let stmt = db.prepare_v2( + "INSERT INTO crsql_changes VALUES ('foo', ?, ?, ?, ?, ?, X'0000000000000000', ?, 0, 0);", + )?; + stmt.bind_blob(1, pk, sqlite::Destructor::STATIC)?; + stmt.bind_text(2, cid, sqlite::Destructor::STATIC)?; + if cid == "-1" { + stmt.bind_null(3)?; + } else { + stmt.bind_text(3, val, sqlite::Destructor::STATIC)?; + } + stmt.bind_int64(4, col_version)?; + stmt.bind_int64(5, db_version)?; + stmt.bind_int64(6, cl)?; + stmt.step()?; + Ok(()) +} + +fn insert_foo_row(stmt: &ManagedStmt, col1: i64, col2: &str) -> Result<(), ResultCode> { + stmt.bind_int64(1, col1)?; + stmt.bind_text(2, col2, sqlite::Destructor::STATIC)?; + stmt.step()?; + + reset_cached_stmt(stmt)?; + Ok(()) +} + +fn get_pk_key(stmt: &ManagedStmt, pk_value: i64) -> Result { + stmt.bind_int64(1, pk_value)?; + let res = stmt.step()?; + match res { + ResultCode::ROW => { + let key = stmt.column_int64(0); + reset_cached_stmt(stmt)?; + Ok(key) + } + _ => { + reset_cached_stmt(stmt)?; + Err(ResultCode::ERROR) + } + } +} + +fn get_cache_cl(stmt: &ManagedStmt, table_name: &str, pk_key: i64) -> Result { + stmt.bind_text(1, table_name, sqlite::Destructor::STATIC) + .expect("bind table name"); + stmt.bind_int64(2, pk_key).expect("bind pk key"); + let res = stmt.step(); + match res { + Ok(ResultCode::ROW) => { + let key = stmt.column_int64(0); + reset_cached_stmt(stmt).expect("reset cached stmt"); + Ok(key) + } + _ => { + reset_cached_stmt(stmt).expect("reset cached stmt"); + Err(ResultCode::ERROR) + } + } +} + fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result { let stmt = db.prepare_v2("SELECT crsql_cache_site_ordinal(?);")?; stmt.bind_blob(1, site_id, sqlite::Destructor::STATIC)?; @@ -192,8 +365,13 @@ fn get_cache_ordinal(db: *mut sqlite::sqlite3, site_id: &[u8]) -> Result Result<(), String> { test_fetch_db_version_from_storage()?; test_next_db_version()?; - if let Err(rc) = test_get_or_set_site_ordinal() { - return Err(format!("test_get_or_set_site_ordinal failed: {:?}", rc)); - } + test_get_or_set_site_ordinal() + .map_err(|e| format!("test_get_or_set_site_ordinal failed: {:?}", e))?; + test_get_or_set_pk_cl().map_err(|e| format!("test_get_or_set_pk_cl failed: {:?}", e))?; Ok(()) } + +pub fn reset_cached_stmt(stmt: &ManagedStmt) -> Result { + stmt.clear_bindings()?; + stmt.reset() +}