Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/rs/bundle/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ crate-type = ["rlib"]
crsql_fractindex_core = {path="../fractindex-core"}
crsql_core = { path="../core" }
sqlite_nostd = { path="../sqlite-rs-embedded/sqlite_nostd" }
libc-print = "*"

[profile.dev]
panic = "abort"
Expand Down
11 changes: 11 additions & 0 deletions core/rs/bundle/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use crsql_core::sqlite3_crsqlcore_init;
#[cfg(feature = "test")]
pub use crsql_core::test_exports;
use crsql_fractindex_core::sqlite3_crsqlfractionalindex_init;
#[cfg(feature = "test")]
use libc_print::std_name::println;
use sqlite_nostd as sqlite;
use sqlite_nostd::SQLite3Allocator;

Expand All @@ -21,11 +23,20 @@ static ALLOCATOR: SQLite3Allocator = SQLite3Allocator {};

// This must be our panic handler for WASM builds. For simplicity, we make it our panic handler for
// all builds. Abort is also more portable than unwind, enabling us to go to more embedded use cases.
#[cfg(not(feature = "test"))]
#[panic_handler]
fn panic(_info: &PanicInfo) -> ! {
core::intrinsics::abort()
}

// Print panic info for tests
#[cfg(feature = "test")]
#[panic_handler]
fn panic(info: &PanicInfo) -> ! {
println!("PANIC!: {}", info);
core::intrinsics::abort();
}

#[cfg(not(target_family = "wasm"))]
#[lang = "eh_personality"]
extern "C" fn eh_personality() {}
Expand Down
1 change: 1 addition & 0 deletions core/rs/bundle_static/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions core/rs/core/src/changes_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,16 @@ pub extern "C" fn crsql_changes_rollback_to(vtab: *mut sqlite::vtab, _: c_int) -
(*(*tab).pExtData).ordinalMap as *mut BTreeMap<Vec<u8>, i64>,
))
};

let mut table_infos = unsafe {
mem::ManuallyDrop::new(Box::from_raw(
(*(*tab).pExtData).tableInfos as *mut Vec<TableInfo>,
))
};
for tbl_info in table_infos.iter_mut() {
tbl_info.clear_cl_cache();
}

ordinals.clear();
ResultCode::OK as c_int
}
69 changes: 41 additions & 28 deletions core/rs/core/src/changes_vtab_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,36 +380,45 @@ pub unsafe extern "C" fn crsql_merge_insert(

fn get_local_cl(
db: *mut sqlite::sqlite3,
tbl_info: &TableInfo,
tbl_info: &mut TableInfo,
key: sqlite::int64,
) -> Result<sqlite::int64, ResultCode> {
let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?;
let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

let rc = local_cl_stmt
.bind_int64(1, key)
.and_then(|_| local_cl_stmt.bind_int64(2, key));
if let Err(rc) = rc {
reset_cached_stmt(local_cl_stmt.stmt)?;
return Err(rc);
if let Some(cl) = tbl_info.get_cl(key) {
return Ok(*cl);
}

let step_result = local_cl_stmt.step();
match step_result {
Ok(ResultCode::ROW) => {
let ret = local_cl_stmt.column_int64(0);
reset_cached_stmt(local_cl_stmt.stmt)?;
Ok(ret)
}
Ok(ResultCode::DONE) => {
let cl = {
let local_cl_stmt_ref = tbl_info.get_local_cl_stmt(db)?;
let local_cl_stmt = local_cl_stmt_ref.as_ref().ok_or(ResultCode::ERROR)?;

let rc = local_cl_stmt
.bind_int64(1, key)
.and_then(|_| local_cl_stmt.bind_int64(2, key));
if let Err(rc) = rc {
reset_cached_stmt(local_cl_stmt.stmt)?;
Ok(0)
return Err(rc);
}
Ok(rc) | Err(rc) => {
reset_cached_stmt(local_cl_stmt.stmt)?;
Err(rc)

let step_result = local_cl_stmt.step();
match step_result {
Ok(ResultCode::ROW) => {
let ret = local_cl_stmt.column_int64(0);
reset_cached_stmt(local_cl_stmt.stmt)?;
ret
}
Ok(ResultCode::DONE) => {
reset_cached_stmt(local_cl_stmt.stmt)?;
0
}
Ok(rc) | Err(rc) => {
reset_cached_stmt(local_cl_stmt.stmt)?;
return Err(rc);
}
}
}
};

tbl_info.set_cl(key, cl);
Ok(cl)
}

unsafe fn merge_insert(
Expand Down Expand Up @@ -463,9 +472,10 @@ unsafe fn merge_insert(

let insert_site_id = insert_site_id.blob();

let tbl_infos = mem::ManuallyDrop::new(Box::from_raw(
let mut tbl_infos = mem::ManuallyDrop::new(Box::from_raw(
(*(*tab).pExtData).tableInfos as *mut Vec<TableInfo>,
));

// TODO: will this work given `insert_tbl` is null termed?
let tbl_info_index = tbl_infos.iter().position(|x| x.tbl_name == insert_tbl);

Expand All @@ -480,14 +490,14 @@ unsafe fn merge_insert(
// TODO: technically safe since we checked `is_none` but this should be more idiomatic
let tbl_info_index = tbl_info_index.unwrap();

let tbl_info = &tbl_infos[tbl_info_index];
let tbl_info = &mut tbl_infos[tbl_info_index];
let unpacked_pks = unpack_columns(insert_pks.blob())?;

// Get or create key as the first thing we do.
// We'll need the key for all later operations.
let key = tbl_info.get_or_create_key(db, &unpacked_pks)?;

let local_cl = get_local_cl(db, &tbl_info, key)?;
let local_cl = get_local_cl(db, tbl_info, key)?;

// We can ignore all updates from older causal lengths.
// They won't win at anything.
Expand Down Expand Up @@ -593,8 +603,6 @@ unsafe fn merge_insert(
*rowid = slab_rowid(tbl_info_index as i32, inner_rowid);
}
return Ok(ResultCode::OK);


}

// we got a causal length which would resurrect the row.
Expand Down Expand Up @@ -712,6 +720,11 @@ unsafe fn merge_insert(
*errmsg = err.into_raw();
return Err(rc);
}

// a bigger cl always wins
if insert_cl > local_cl {
tbl_info.set_cl(key, insert_cl);
}
}

res
Expand Down
9 changes: 9 additions & 0 deletions core/rs/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use core::{
use sqlite_nostd::ResultCode;

use crate::c::crsql_ExtData;
use crate::tableinfo::TableInfo;

#[no_mangle]
pub unsafe extern "C" fn crsql_commit_hook(user_data: *mut c_void) -> c_int {
Expand Down Expand Up @@ -37,5 +38,13 @@ pub unsafe fn commit_or_rollback_reset(ext_data: *mut crsql_ExtData) {
let mut ordinals: mem::ManuallyDrop<Box<BTreeMap<Vec<u8>, i64>>> = mem::ManuallyDrop::new(
Box::from_raw((*ext_data).ordinalMap as *mut BTreeMap<Vec<u8>, i64>),
);

let mut table_infos = unsafe {
mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec<TableInfo>))
};
ordinals.clear();

for tbl_info in table_infos.iter_mut() {
tbl_info.clear_cl_cache();
}
}
74 changes: 61 additions & 13 deletions core/rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ use local_writes::after_update::x_crsql_after_update;
use sqlite::{Destructor, ResultCode};
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context, Value};
#[cfg(feature = "test")]
use tableinfo::TableInfo;
use tableinfo::{crsql_ensure_table_infos_are_up_to_date, is_table_compatible, pull_table_info};
use teardown::*;
use triggers::create_triggers;
Expand Down Expand Up @@ -453,19 +455,31 @@ pub extern "C" fn sqlite3_crsqlcore_init(
}

#[cfg(feature = "test")]
let rc = db
.create_function_v2(
"crsql_cache_site_ordinal",
1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_cache_site_ordinal),
None,
None,
None,
)
.unwrap_or(ResultCode::ERROR);
if rc != ResultCode::OK {
if let Err(_) = db.create_function_v2(
"crsql_cache_site_ordinal",
1,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_cache_site_ordinal),
None,
None,
None,
) {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}

#[cfg(feature = "test")]
if let Err(_) = db.create_function_v2(
"crsql_cache_pk_cl",
2,
sqlite::UTF8 | sqlite::DETERMINISTIC,
Some(ext_data as *mut c_void),
Some(x_crsql_cache_pk_cl),
None,
None,
None,
) {
unsafe { crsql_freeExtData(ext_data) };
return null_mut();
}
Expand Down Expand Up @@ -975,6 +989,40 @@ unsafe extern "C" fn x_crsql_cache_site_ordinal(
sqlite::result_int64(ctx, res);
}

/**
* Get the pk cl cached in the ext data for the current transaction.
* only used for test to inspect the cl cache.
*/
#[cfg(feature = "test")]
unsafe extern "C" fn x_crsql_cache_pk_cl(
ctx: *mut sqlite::context,
argc: i32,
argv: *mut *mut sqlite::value,
) {
if argc < 2 {
ctx.result_error(
"Wrong number of args provided to crsql_cache_pk_cl. Provide the table name and pk key.",
);
return;
}

let ext_data = ctx.user_data() as *mut c::crsql_ExtData;
let args = sqlite::args!(argc, argv);
let table_name = args[0].text();
let pk_key = args[1].int64();

let table_infos =
mem::ManuallyDrop::new(Box::from_raw((*ext_data).tableInfos as *mut Vec<TableInfo>));
let table_info = table_infos.iter().find(|t| t.tbl_name == table_name);

if let Some(table_info) = table_info {
let cl = table_info.get_cl(pk_key).cloned().unwrap_or(-1);
sqlite::result_int64(ctx, cl);
} else {
ctx.result_error("table not found");
}
}

/**
* Return the timestamp for the current transaction.
*/
Expand Down
45 changes: 20 additions & 25 deletions core/rs/core/src/local_writes/after_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use sqlite_nostd as sqlite;
use crate::{c::crsql_ExtData, tableinfo::TableInfo};

use super::bump_seq;
use super::mark_locally_deleted;
use super::trigger_fn_preamble;

/**
Expand Down Expand Up @@ -37,7 +38,7 @@ pub unsafe extern "C" fn x_crsql_after_delete(
fn after_delete(
db: *mut sqlite3,
ext_data: *mut crsql_ExtData,
tbl_info: &TableInfo,
tbl_info: &mut TableInfo,
pks_old: &[*mut value],
) -> Result<ResultCode, String> {
let ts = unsafe { (*ext_data).timestamp.to_string() };
Expand All @@ -47,30 +48,24 @@ fn after_delete(
.get_or_create_key_via_raw_values(db, pks_old)
.map_err(|_| "failed getting or creating lookaside key")?;

let mark_locally_deleted_stmt_ref = tbl_info
.get_mark_locally_deleted_stmt(db)
.map_err(|_e| "failed to get mark_locally_deleted_stmt")?;
let mark_locally_deleted_stmt = mark_locally_deleted_stmt_ref
.as_ref()
.ok_or("Failed to deref sentinel stmt")?;
mark_locally_deleted_stmt
.bind_int64(1, key)
.and_then(|_| mark_locally_deleted_stmt.bind_int64(2, db_version))
.and_then(|_| mark_locally_deleted_stmt.bind_int(3, seq))
.and_then(|_| mark_locally_deleted_stmt.bind_text(4, &ts, sqlite::Destructor::STATIC))
.map_err(|_| "failed binding to mark locally deleted stmt")?;
super::step_trigger_stmt(mark_locally_deleted_stmt)?;
let cl = mark_locally_deleted(db, tbl_info, key, db_version, seq, &ts)?;

// now actually delete the row metadata
let drop_clocks_stmt_ref = tbl_info
.get_merge_delete_drop_clocks_stmt(db)
.map_err(|_e| "failed to get mark_locally_deleted_stmt")?;
let drop_clocks_stmt = drop_clocks_stmt_ref
.as_ref()
.ok_or("Failed to deref sentinel stmt")?;
{
// now actually delete the row metadata
let drop_clocks_stmt_ref = tbl_info
.get_merge_delete_drop_clocks_stmt(db)
.map_err(|_e| "failed to get mark_locally_deleted_stmt")?;
let drop_clocks_stmt = drop_clocks_stmt_ref
.as_ref()
.ok_or("Failed to deref sentinel stmt")?;

drop_clocks_stmt
.bind_int64(1, key)
.map_err(|_e| "failed to bind pks to drop_clocks_stmt")?;
super::step_trigger_stmt(drop_clocks_stmt)
drop_clocks_stmt
.bind_int64(1, key)
.map_err(|_e| "failed to bind pks to drop_clocks_stmt")?;
super::step_trigger_stmt(drop_clocks_stmt)?;
}

tbl_info.set_cl(key, cl);

Ok(ResultCode::OK)
}
Loading
Loading