feat(wallet): migrate legacy output key-ids to current format on startup (closes #7829) #7859
Code Review
This pull request introduces a background migration task to convert legacy key-id strings in the output table to the current TariKeyId format. The review feedback highlights a critical issue where the migration loop is susceptible to an infinite loop and 100% CPU usage if any row fails to convert or update. To address this, the reviewer suggests implementing keyset pagination (using last_id) across the database backend, SQL queries, and tests. Additionally, a minor recommendation is made to add a short sleep between batch iterations to prevent SQLite database lock contention.
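The forward-progress argument in this summary can be checked with a small in-memory model. This is only a sketch under stated assumptions: `Row`, `is_legacy`, `fetch_page`, `convert`, `migrate` and `sample_table` are hypothetical stand-ins for the `outputs` table, the LIKE filter and the key-id conversion, not code from this PR. A row whose conversion fails stays "legacy", yet the loop still terminates because the cursor moves past it.

```rust
/// Hypothetical in-memory stand-in for one row of the `outputs` table.
#[derive(Clone, Debug)]
struct Row {
    id: i32,
    spending_key: String,
}

/// Stand-in for the LIKE filter: a key is "legacy" while it keeps a legacy prefix.
fn is_legacy(key: &str) -> bool {
    key.starts_with("managed.") || key.starts_with("imported.")
}

/// Keyset page: legacy rows with `id > last_id`, ordered by id, at most `batch_size`.
fn fetch_page(table: &[Row], last_id: i32, batch_size: usize) -> Vec<Row> {
    let mut page: Vec<Row> = table
        .iter()
        .filter(|r| r.id > last_id && is_legacy(&r.spending_key))
        .cloned()
        .collect();
    page.sort_by_key(|r| r.id);
    page.truncate(batch_size);
    page
}

/// Toy conversion (assumption): fails for any key containing "bad".
fn convert(key: &str) -> Option<String> {
    if key.contains("bad") {
        None
    } else {
        Some(format!("current:{key}"))
    }
}

/// Runs the migration loop; returns (rows migrated, batches fetched).
fn migrate(table: &mut [Row], batch_size: usize) -> (usize, usize) {
    let mut migrated: usize = 0;
    let mut batches: usize = 0;
    let mut last_id: i32 = 0;
    loop {
        let page = fetch_page(table, last_id, batch_size);
        if page.is_empty() {
            break;
        }
        batches += 1;
        for row in page {
            // Advance the cursor first, so a failing row is never fetched again.
            last_id = row.id;
            if let Some(new_key) = convert(&row.spending_key) {
                if let Some(r) = table.iter_mut().find(|r| r.id == row.id) {
                    r.spending_key = new_key;
                    migrated += 1;
                }
            }
        }
    }
    (migrated, batches)
}

/// Five legacy rows; row 2 is deliberately unconvertible.
fn sample_table() -> Vec<Row> {
    ["managed.comms.0", "managed.bad.0", "imported.abc", "managed.x.1", "imported.def"]
        .iter()
        .enumerate()
        .map(|(i, k)| Row { id: i as i32 + 1, spending_key: k.to_string() })
        .collect()
}
```

With a batch size of 2, `migrate(&mut sample_table(), 2)` finishes after three non-empty batches and leaves only row 2 in legacy form. Without the `id > last_id` condition, row 2 would be returned at the head of every batch and the loop would never reach an empty page.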
| /// Number of output rows processed per iteration of the legacy-key migration. | ||
| const LEGACY_KEY_MIGRATION_BATCH_SIZE: i64 = 100; | ||
|
|
||
| /// Background task that converts any `spending_key` / `script_private_key` column values stored in the legacy | ||
| /// "managed.<branch>.<index>" or "imported.<pubkey>" format to the current `TariKeyId` encoding. | ||
| /// | ||
| /// The migration runs in a loop, fetching `LEGACY_KEY_MIGRATION_BATCH_SIZE` rows per iteration. Because each row is | ||
| /// updated in-place (so it no longer matches the LIKE filter on the next query), the result set naturally shrinks - | ||
| /// no explicit offset is needed. | ||
| /// | ||
| /// Errors for individual rows are logged and skipped; the migration continues so a single bad row cannot block the | ||
| /// rest. Errors fetching a batch cause the migration to abort early with a warning. | ||
| async fn migrate_legacy_output_keys<TBackend, TWalletConnectivity, TKeyManagerInterface>( | ||
| resources: OutputManagerResources<TBackend, TWalletConnectivity, TKeyManagerInterface>, | ||
| ) where | ||
| TBackend: OutputManagerBackend + 'static, | ||
| TWalletConnectivity: Send, | ||
| TKeyManagerInterface: LegacyTransactionKeyManagerInterface, | ||
| { | ||
| use std::str::FromStr; | ||
|
|
||
| let mut total_migrated: usize = 0; | ||
| loop { | ||
| let batch = match resources | ||
| .db | ||
| .fetch_outputs_with_legacy_key_ids(LEGACY_KEY_MIGRATION_BATCH_SIZE) | ||
| { | ||
| Ok(b) => b, | ||
| Err(e) => { | ||
| warn!( | ||
| target: LOG_TARGET, | ||
| "Legacy key migration: failed to fetch next batch - aborting migration early: {e}" | ||
| ); | ||
| return; | ||
| }, | ||
| }; | ||
|
|
||
| if batch.is_empty() { | ||
| break; | ||
| } | ||
|
|
||
| for (output_id, spending_key_str, script_key_str) in &batch { |
The current migration loop is susceptible to an infinite loop if any row fails to convert or update. Because the query uses a LIKE filter without any paging or offset, any row that fails to convert (and thus still matches the LIKE filter) will be repeatedly fetched at the beginning of the next batch, causing the background task to loop indefinitely and consume 100% CPU.
To prevent this, we should use keyset pagination (cursor-based paging) by tracking the last_id processed and filtering the query with id > last_id. This ensures that we always make forward progress through the table, even if some rows fail to migrate.
/// Number of output rows processed per iteration of the legacy-key migration.
const LEGACY_KEY_MIGRATION_BATCH_SIZE: i64 = 100;
/// Background task that converts any `spending_key` / `script_private_key` column values stored in the legacy
/// "managed.<branch>.<index>" or "imported.<pubkey>" format to the current `TariKeyId` encoding.
///
/// The migration runs in a loop, fetching `LEGACY_KEY_MIGRATION_BATCH_SIZE` rows per iteration. It uses keyset
/// pagination (filtering by `id > last_id`) to avoid infinite loops if some rows fail to convert or update.
///
/// Errors for individual rows are logged and skipped; the migration continues so a single bad row cannot block the
/// rest. Errors fetching a batch cause the migration to abort early with a warning.
async fn migrate_legacy_output_keys<TBackend, TWalletConnectivity, TKeyManagerInterface>(
resources: OutputManagerResources<TBackend, TWalletConnectivity, TKeyManagerInterface>,
) where
TBackend: OutputManagerBackend + 'static,
TWalletConnectivity: Send,
TKeyManagerInterface: LegacyTransactionKeyManagerInterface,
{
use std::str::FromStr;
let mut total_migrated: usize = 0;
let mut last_id = 0;
loop {
let batch = match resources
.db
.fetch_outputs_with_legacy_key_ids(last_id, LEGACY_KEY_MIGRATION_BATCH_SIZE)
{
Ok(b) => b,
Err(e) => {
warn!(
target: LOG_TARGET,
"Legacy key migration: failed to fetch next batch - aborting migration early: {e}"
);
return;
},
};
if batch.is_empty() {
break;
}
for (output_id, spending_key_str, script_key_str) in &batch {
last_id = *output_id;
| /// Fetch a batch of outputs whose `spending_key` or `script_private_key` columns still hold a legacy key-id | ||
| /// string (i.e., one that starts with "managed." or "imported." and therefore cannot be parsed by | ||
| /// `TariKeyId::from_str`). Only the three columns required for migration are returned; no BLOB columns | ||
| /// (rangeproof, encrypted_data, etc.) are loaded. `batch_size` rows are returned per call; callers should | ||
| /// repeat with `offset = 0` after each batch until an empty vec is returned because already-migrated rows no | ||
| /// longer match the filter. | ||
| fn fetch_outputs_with_legacy_key_ids( | ||
| &self, | ||
| batch_size: i64, | ||
| ) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError>; |
Update the trait method signature to accept last_id to support keyset pagination.
/// Fetch a batch of outputs whose `spending_key` or `script_private_key` columns still hold a legacy key-id
/// string (i.e., one that starts with "managed." or "imported." and therefore cannot be parsed by
/// `TariKeyId::from_str`). Only the three columns required for migration are returned; no BLOB columns
/// (rangeproof, encrypted_data, etc.) are loaded. `batch_size` rows are returned per call, starting after `last_id`.
fn fetch_outputs_with_legacy_key_ids(
&self,
last_id: i32,
batch_size: i64,
) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError>;
| /// See `OutputManagerBackend::fetch_outputs_with_legacy_key_ids`. | ||
| pub fn fetch_outputs_with_legacy_key_ids( | ||
| &self, | ||
| batch_size: i64, | ||
| ) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> { | ||
| self.db.fetch_outputs_with_legacy_key_ids(batch_size) | ||
| } |
Update the database wrapper method to pass last_id to the backend.
/// See `OutputManagerBackend::fetch_outputs_with_legacy_key_ids`.
pub fn fetch_outputs_with_legacy_key_ids(
&self,
last_id: i32,
batch_size: i64,
) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> {
self.db.fetch_outputs_with_legacy_key_ids(last_id, batch_size)
}
| fn fetch_outputs_with_legacy_key_ids( | ||
| &self, | ||
| batch_size: i64, | ||
| ) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> { | ||
| let mut conn = self.database_connection.get_pooled_connection()?; | ||
| OutputSql::find_outputs_with_legacy_key_ids(batch_size, &mut conn) | ||
| } |
Update the backend implementation to pass last_id to the SQL query.
fn fetch_outputs_with_legacy_key_ids(
&self,
last_id: i32,
batch_size: i64,
) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> {
let mut conn = self.database_connection.get_pooled_connection()?;
OutputSql::find_outputs_with_legacy_key_ids(last_id, batch_size, &mut conn)
}
| /// Return up to `batch_size` outputs whose `spending_key` or `script_private_key` still contains a legacy key-id | ||
| /// string - i.e., one that begins with `"managed."` or `"imported."` and therefore cannot be parsed by | ||
| /// `TariKeyId::from_str`. Only the three columns needed by the migration are fetched; no BLOB columns are loaded. | ||
| /// | ||
| /// Callers should keep calling with `batch_size` until an empty vec is returned. Because each batch updates the | ||
| /// rows in-place, already-migrated rows stop matching the filter and the result set naturally shrinks. | ||
| pub fn find_outputs_with_legacy_key_ids( | ||
| batch_size: i64, | ||
| conn: &mut SqliteConnection, | ||
| ) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> { | ||
| outputs::table | ||
| .select((outputs::id, outputs::spending_key, outputs::script_private_key)) | ||
| .filter( | ||
| outputs::spending_key | ||
| .like("managed.%") | ||
| .or(outputs::spending_key.like("imported.%")) | ||
| .or(outputs::script_private_key.like("managed.%")) | ||
| .or(outputs::script_private_key.like("imported.%")), | ||
| ) | ||
| .limit(batch_size) | ||
| .load::<(i32, String, String)>(conn) | ||
| .map_err(OutputManagerStorageError::DieselError) | ||
| } |
Update the SQL query to filter by id > last_id and order by id ASC to implement keyset pagination.
/// Return up to `batch_size` outputs whose `spending_key` or `script_private_key` still contains a legacy key-id
/// string - i.e., one that begins with `"managed."` or `"imported."` and therefore cannot be parsed by
/// `TariKeyId::from_str`. Only the three columns needed by the migration are fetched; no BLOB columns are loaded.
///
/// Callers should keep calling with `batch_size` until an empty vec is returned, passing the last processed ID
/// to avoid infinite loops if some rows fail to convert.
pub fn find_outputs_with_legacy_key_ids(
last_id: i32,
batch_size: i64,
conn: &mut SqliteConnection,
) -> Result<Vec<(i32, String, String)>, OutputManagerStorageError> {
outputs::table
.select((outputs::id, outputs::spending_key, outputs::script_private_key))
.filter(outputs::id.gt(last_id))
.filter(
outputs::spending_key
.like("managed.%")
.or(outputs::spending_key.like("imported.%"))
.or(outputs::script_private_key.like("managed.%"))
.or(outputs::script_private_key.like("imported.%")),
)
.order_by(outputs::id.asc())
.limit(batch_size)
.load::<(i32, String, String)>(conn)
.map_err(OutputManagerStorageError::DieselError)
}
| // Sanity check: no legacy keys yet. | ||
| let before = db.fetch_outputs_with_legacy_key_ids(100).unwrap(); | ||
| assert!(before.is_empty(), "expected no legacy keys before injection"); | ||
|
|
||
| // 2. Overwrite kmo1's spending_key with the legacy comms-key string. wallet_types constants: SPEND_KEY_BRANCH = | ||
| // "comms", VIEW_KEY_BRANCH = "data encryption". | ||
| let legacy_key_str = "managed.comms.0"; | ||
| { | ||
| let mut conn = connection.get_pooled_connection().unwrap(); | ||
| diesel::update(outputs::table.filter(outputs::commitment.eq(&kmo1.commitment.to_vec()))) | ||
| .set(outputs::spending_key.eq(legacy_key_str)) | ||
| .execute(&mut conn) | ||
| .unwrap(); | ||
| } | ||
|
|
||
| // 3. Migration query must find exactly the one injected row. | ||
| let found = db.fetch_outputs_with_legacy_key_ids(100).unwrap(); | ||
| assert_eq!(found.len(), 1, "expected exactly 1 output with a legacy spending_key"); | ||
| let (output_id, found_spending, found_script) = found.into_iter().next().unwrap(); | ||
| assert_eq!(found_spending, legacy_key_str); | ||
|
|
||
| // 4. Parse -> convert -> update (mirrors what `migrate_legacy_output_keys` does per row). | ||
| let legacy_id = LegacyTariKeyId::from_str(&found_spending).expect("must parse as LegacyTariKeyId"); | ||
| let current_id = key_manager | ||
| .convert_legacy_tari_key_id_to_current(&legacy_id) | ||
| .expect("conversion must succeed"); | ||
| // "managed.comms.0" must convert to TariKeyId::SpendKey. | ||
| assert_eq!( | ||
| current_id, | ||
| TariKeyId::SpendKey, | ||
| "legacy managed.comms.0 should convert to TariKeyId::SpendKey" | ||
| ); | ||
|
|
||
| let new_spending = current_id.to_string(); | ||
| db.update_output_key_ids(output_id, new_spending, found_script).unwrap(); | ||
|
|
||
| // 5. After the update the legacy filter must return nothing. | ||
| let after = db.fetch_outputs_with_legacy_key_ids(100).unwrap(); | ||
| assert!(after.is_empty(), "expected no legacy keys after migration"); |
Update the test calls to pass 0 as the last_id parameter to match the updated signature.
// Sanity check: no legacy keys yet.
let before = db.fetch_outputs_with_legacy_key_ids(0, 100).unwrap();
assert!(before.is_empty(), "expected no legacy keys before injection");
// 2. Overwrite kmo1's spending_key with the legacy comms-key string. wallet_types constants: SPEND_KEY_BRANCH =
// "comms", VIEW_KEY_BRANCH = "data encryption".
let legacy_key_str = "managed.comms.0";
{
let mut conn = connection.get_pooled_connection().unwrap();
diesel::update(outputs::table.filter(outputs::commitment.eq(&kmo1.commitment.to_vec())))
.set(outputs::spending_key.eq(legacy_key_str))
.execute(&mut conn)
.unwrap();
}
// 3. Migration query must find exactly the one injected row.
let found = db.fetch_outputs_with_legacy_key_ids(0, 100).unwrap();
assert_eq!(found.len(), 1, "expected exactly 1 output with a legacy spending_key");
let (output_id, found_spending, found_script) = found.into_iter().next().unwrap();
assert_eq!(found_spending, legacy_key_str);
// 4. Parse -> convert -> update (mirrors what `migrate_legacy_output_keys` does per row).
let legacy_id = LegacyTariKeyId::from_str(&found_spending).expect("must parse as LegacyTariKeyId");
let current_id = key_manager
.convert_legacy_tari_key_id_to_current(&legacy_id)
.expect("conversion must succeed");
// "managed.comms.0" must convert to TariKeyId::SpendKey.
assert_eq!(
current_id,
TariKeyId::SpendKey,
"legacy managed.comms.0 should convert to TariKeyId::SpendKey"
);
let new_spending = current_id.to_string();
db.update_output_key_ids(output_id, new_spending, found_script).unwrap();
// 5. After the update the legacy filter must return nothing.
let after = db.fetch_outputs_with_legacy_key_ids(0, 100).unwrap();
assert!(after.is_empty(), "expected no legacy keys after migration");
| } | ||
| } | ||
| } |
Since SQLite is a single-writer database, running a tight loop of database updates in a background task can cause database lock contention (SQLITE_BUSY errors) and starve the main event loop. Adding a small sleep at the end of each batch iteration yields control and allows other database connections in the pool to acquire the write lock.
}
}
// Yield to other tasks to prevent SQLite database lock contention
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
…tup (closes tari-project#7829)

On startup the OutputManagerService now spawns a background task that converts any spending_key / script_private_key column values stored in the legacy LegacyTariKeyId encoding (e.g. "managed.<branch>.<index>", "imported.<pubkey>") to the current TariKeyId encoding.

Design notes:
- Targeted LIKE filter covers the legacy variants that have no current TariKeyId equivalent: "managed.%", "imported.%", plus the single-level Derived wrappers "derived.managed.%" / "derived.imported.%". No full-table scan. Only id, spending_key, script_private_key are loaded; no BLOBs.
- Keyset pagination on outputs.id (id > last_id, ORDER BY id ASC) guarantees the migration always makes forward progress. Rows that fail to convert and remain in the LIKE filter cannot cause an infinite loop because the cursor always advances past them.
- 50ms sleep between batches yields to the main service event loop and avoids SQLITE_BUSY contention against the shared connection pool.
- Rows whose conversion fails are left in their original form; the existing on-read fallback in OutputSql::to_db_wallet_output still converts them lazily, so functionality is preserved.
- Launched via tokio::spawn so wallet startup is not blocked.

New surface:
- OutputManagerBackend::fetch_outputs_with_legacy_key_ids(last_id, batch_size)
- OutputManagerBackend::update_output_key_ids(id, spending, script)
- OutputSql::find_outputs_with_legacy_key_ids (LIKE filter + keyset paging)
- OutputSql::update_key_ids
- OutputManagerDatabase wrappers for both
- service::migrate_legacy_output_keys async fn + service::convert_one_key_id helper
- Storage test: test_migrate_legacy_output_key_ids exercises the full round-trip on "managed.comms.0" -> TariKeyId::SpendKey and verifies clean outputs remain loadable.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1045a6e to 74cc1b8
| .like("managed.%") | ||
| .or(outputs::spending_key.like("imported.%")) | ||
| .or(outputs::script_private_key.like("managed.%")) | ||
| .or(outputs::script_private_key.like("imported.%")), |
You are not capturing all legacy types here:
managed.%
imported.%
derived.managed.%
derived.imported.%
etc. You need all of them.
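One way to keep the pattern list in a single place is sketched below. It is an in-memory analogue of the LIKE filter, not the Diesel query from this PR: `LEGACY_PREFIXES` and `has_legacy_key_id` are hypothetical names, and the prefix list contains only the four patterns named in this comment. Because the comment ends with "etc", the list should be treated as an assumption and is probably incomplete.

```rust
/// Prefixes named in the review comment (assumed incomplete: the comment says "etc").
const LEGACY_PREFIXES: [&str; 4] = [
    "managed.",
    "imported.",
    "derived.managed.",
    "derived.imported.",
];

/// Returns true if either key column still starts with a known legacy prefix.
/// Mirrors `LIKE '<prefix>%'` applied to both columns, but is case-sensitive.
fn has_legacy_key_id(spending_key: &str, script_private_key: &str) -> bool {
    [spending_key, script_private_key]
        .iter()
        .any(|key| LEGACY_PREFIXES.iter().any(|prefix| key.starts_with(*prefix)))
}
```

A helper of this shape could also serve in tests as an independent check that every row returned by the SQL query really carries a legacy prefix, and that no row left behind does.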
Summary
Closes #7829.
On startup, the `OutputManagerService` now spawns a background task that converts any `spending_key` / `script_private_key` column values stored in the legacy `"managed.<branch>.<index>"` or `"imported.<pubkey>"` format to the current `TariKeyId` encoding.
Design choices
- `WHERE spending_key LIKE 'managed.%' OR ... LIKE 'imported.%'` targets only rows that need migration. No full-table scan; no BLOB columns loaded (only `id`, `spending_key`, `script_private_key`).
- `tokio::spawn` before the service event loop; wallet startup is not delayed.
Files changed
- `storage/database/backend.rs`: `fetch_outputs_with_legacy_key_ids` / `update_output_key_ids`
- `storage/sqlite_db/output_sql.rs`: `OutputSql::find_outputs_with_legacy_key_ids` (LIKE query, no BLOBs) + `OutputSql::update_key_ids`
- `storage/sqlite_db/mod.rs`: `OutputManagerSqliteDatabase`
- `storage/database/mod.rs`: `OutputManagerDatabase<T>`
- `service.rs`: `migrate_legacy_output_keys` free async fn + `tokio::spawn` call in `start()`
- `tests/.../storage.rs`: `test_migrate_legacy_output_key_ids` - injects `"managed.comms.0"`, verifies LIKE filter detects it, verifies conversion to `TariKeyId::SpendKey`, verifies row is cleared from filter, verifies clean outputs are unaffected
Test
Passes locally.
🤖 Generated with Claude Code