Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThe changes implement idempotent database insertions for account and channel state records using Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes 🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
This PR addresses duplicate key crashes during initial sync when a blokli instance is restarted, by making account/channel state inserts idempotent at the same (block, tx_index, log_index) position.
Changes:
- Add
ON CONFLICT DO NOTHINGhandling forchannel_stateinserts to avoid duplicate key crashes on replay. - Add
ON CONFLICT DO NOTHINGhandling foraccountidentity insert race andaccount_stateinserts for idempotency. - Add regression tests asserting
upsert_account/upsert_channelare idempotent for identical positions; bumpblokli-dbversion.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| db/core/src/channels.rs | Makes channel_state insertion idempotent on unique position and adds a test for double-insert behavior. |
| db/core/src/accounts.rs | Makes account identity insertion and account_state insertion idempotent and adds a test for double-insert behavior. |
| db/core/Cargo.toml | Bumps blokli-db version to 0.12.3. |
| Cargo.lock | Updates lockfile for the blokli-db version bump. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@db/core/src/channels.rs`:
- Around line 257-302: The idempotent Err(DbErr::RecordNotInserted) branch
currently fetches the existing row into the variable inserted and so the code
after the match still treats it as a new insert and publishes a
ChannelStateChange; change that branch to signal a no-op instead of populating
inserted (e.g., return early or set a skip flag/Option) so the subsequent
publish block is not executed. Target the match on ChannelState::insert(...) and
the Err(DbErr::RecordNotInserted) arm (which calls
channel_state::Entity::find()...one(tx)) and ensure the calling code that
publishes StateChange::ChannelState only runs when an actual new insert was
performed (the Ok(insert_result) path that looks up last_insert_id).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e7a79bbe-6197-439c-a968-e9effbd6753a
⛔ Files ignored due to path filters (1)
Cargo.lockis excluded by!**/*.lock
📒 Files selected for processing (3)
db/core/Cargo.tomldb/core/src/accounts.rsdb/core/src/channels.rs
b3abb8e to
d6ffb49
Compare
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #269 +/- ##
==========================================
+ Coverage 74.58% 74.66% +0.07%
==========================================
Files 92 92
Lines 18305 18414 +109
==========================================
+ Hits 13652 13748 +96
- Misses 4653 4666 +13
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Pull request overview
This PR addresses crashes during initial sync/restart by making account/channel state inserts idempotent (avoiding duplicate-key violations) and preventing duplicate downstream events when a previously-processed state is encountered again.
Changes:
- Add
ON CONFLICT DO NOTHING-style insert behavior foraccount,account_state, andchannel_statewrites to make sync processing restart-safe. - Gate channel state change event emission so events are only published for genuinely new
channel_stateinserts. - Add regression tests ensuring repeated upserts at the same (block, tx_index, log_index) position do not create duplicate history entries; bump
blokli-dbcrate version.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| db/core/src/channels.rs | Make channel_state insertion idempotent and emit events only for new inserts; add idempotency test. |
| db/core/src/accounts.rs | Make account identity/state insertion idempotent under races; add idempotency test. |
| db/core/Cargo.toml | Bump blokli-db version to 0.14.1. |
| Cargo.lock | Lockfile updated with broader dependency/version changes. |
| let model = channel_state::Entity::find() | ||
| .filter(channel_state::Column::ChannelId.eq(channel_id)) | ||
| .filter(channel_state::Column::PublishedBlock.eq(block)) | ||
| .filter(channel_state::Column::PublishedTxIndex.eq(tx_index)) | ||
| .filter(channel_state::Column::PublishedLogIndex.eq(log_index)) |
There was a problem hiding this comment.
insert_channel_state_and_emit now does an extra SELECT after a successful insert to re-fetch the row by (channel_id, block, tx_index, log_index). During initial sync this path can run very frequently, so this adds an extra round-trip per new state. Consider using the insert result (e.g., last_insert_id / returning support) to avoid the follow-up query on the success path, and only fall back to a lookup for the conflict (RecordNotInserted) case.
| let model = channel_state::Entity::find() | ||
| .filter(channel_state::Column::ChannelId.eq(channel_id)) | ||
| .filter(channel_state::Column::PublishedBlock.eq(block)) | ||
| .filter(channel_state::Column::PublishedTxIndex.eq(tx_index)) | ||
| .filter(channel_state::Column::PublishedLogIndex.eq(log_index)) | ||
| .one(tx) |
There was a problem hiding this comment.
insert_channel_state_and_emit duplicates the same channel_state::Entity::find() lookup (same filters and nearly identical error handling) in both the successful insert and conflict paths. Consider extracting the lookup into a small helper or performing it once after the insert attempt (using an is_new flag from the insert result) to avoid repeating the filter set and to keep future schema/query changes in one place.
| Ok(_insert_result) => { | ||
| // Look up by natural key instead of last_insert_id, which is | ||
| // unreliable with ON CONFLICT DO NOTHING (Postgres returns 0) | ||
| account::Entity::find() | ||
| .filter(account::Column::ChainKey.eq(chain_key.as_ref().to_vec())) | ||
| .filter(account::Column::PacketKey.eq(hex::encode(packet_key.as_ref()))) | ||
| .one(tx.as_ref()) | ||
| .await? | ||
| .ok_or_else(|| DbSqlError::LogicalError("Inserted account not found".to_string()))? | ||
| .id | ||
| } | ||
| Err(DbErr::RecordNotInserted) => { | ||
| // Race condition: account was inserted between find and insert | ||
| account::Entity::find() | ||
| .filter(account::Column::ChainKey.eq(chain_key.as_ref().to_vec())) | ||
| .filter(account::Column::PacketKey.eq(hex::encode(packet_key.as_ref()))) | ||
| .one(tx.as_ref()) |
There was a problem hiding this comment.
In upsert_account, the Account::insert(...).on_conflict(...).exec(...) match has two branches (Ok(_) and Err(DbErr::RecordNotInserted)) that both run the same follow-up query by (chain_key, packet_key). This can be simplified by factoring the lookup out and sharing it (also consider precomputing packet_key_hex once) to reduce duplication and make the idempotency logic easier to maintain.
|
|
||
| // Insert channel state at (block=100, tx_index=5, log_index=3) | ||
| db.upsert_channel(None, ce, 100, 5, 3).await?; | ||
|
|
||
| // Insert again with the same position - should succeed (idempotent) | ||
| db.upsert_channel(None, ce, 100, 5, 3).await?; | ||
|
|
||
| // Verify only one state record exists | ||
| let history = db.get_channel_history(None, &ce.get_id()).await?; |
There was a problem hiding this comment.
ChannelEntry is moved on the first db.upsert_channel(None, ce, ...) call (the method takes ChannelEntry by value), so the second call and the later ce.get_id() won't compile. Consider cloning/recreating ce for the second call and capturing channel_id = ce.get_id() before the first move (or change the test to build the entry twice).
| // Insert channel state at (block=100, tx_index=5, log_index=3) | |
| db.upsert_channel(None, ce, 100, 5, 3).await?; | |
| // Insert again with the same position - should succeed (idempotent) | |
| db.upsert_channel(None, ce, 100, 5, 3).await?; | |
| // Verify only one state record exists | |
| let history = db.get_channel_history(None, &ce.get_id()).await?; | |
| let channel_id = ce.get_id(); | |
| // Insert channel state at (block=100, tx_index=5, log_index=3) | |
| db.upsert_channel(None, ce, 100, 5, 3).await?; | |
| // Insert again with the same position - should succeed (idempotent) | |
| let ce = ChannelEntry::new( | |
| addr_1, | |
| addr_2, | |
| HoprBalance::from(1000u32), | |
| 0_u32.into(), | |
| ChannelStatus::Open, | |
| 1_u32.into(), | |
| ); | |
| db.upsert_channel(None, ce, 100, 5, 3).await?; | |
| // Verify only one state record exists | |
| let history = db.get_channel_history(None, &channel_id).await?; |
| let model = channel_state::Entity::find() | ||
| .filter(channel_state::Column::ChannelId.eq(channel_id)) | ||
| .filter(channel_state::Column::PublishedBlock.eq(block)) | ||
| .filter(channel_state::Column::PublishedTxIndex.eq(tx_index)) | ||
| .filter(channel_state::Column::PublishedLogIndex.eq(log_index)) | ||
| .one(tx) | ||
| .await? | ||
| .ok_or_else(|| DbSqlError::LogicalError("Inserted channel_state not found".to_string()))?; | ||
| (model, true) | ||
| } | ||
| Err(DbErr::RecordNotInserted) => { | ||
| // Idempotent: record already exists from a previous incomplete sync | ||
| tracing::debug!( | ||
| channel_id, | ||
| block, | ||
| tx_index, | ||
| log_index, | ||
| "Channel state already exists, skipping insert" | ||
| ); | ||
| let model = channel_state::Entity::find() | ||
| .filter(channel_state::Column::ChannelId.eq(channel_id)) | ||
| .filter(channel_state::Column::PublishedBlock.eq(block)) | ||
| .filter(channel_state::Column::PublishedTxIndex.eq(tx_index)) | ||
| .filter(channel_state::Column::PublishedLogIndex.eq(log_index)) | ||
| .one(tx) | ||
| .await? | ||
| .ok_or_else(|| { | ||
| DbSqlError::LogicalError("Existing channel_state not found after conflict".to_string()) | ||
| })?; |
There was a problem hiding this comment.
The channel_state::Entity::find() lookup logic is duplicated in both the successful-insert and RecordNotInserted branches. To reduce maintenance risk (and keep error messages consistent), consider extracting this lookup into a small helper/closure that returns the model given (channel_id, block, tx_index, log_index).
| # Crate: rustls-webpki 0.103.9 | ||
| # Review by: 2026-06-30 | ||
| "RUSTSEC-2026-0049", | ||
|
|
||
| # RUSTSEC-2026-0097: Rand is unsound with a custom logger using `rand::rng()` | ||
| # Crate: rand 0.8.5, 0.8.6, 0.9.2, 0.10.0 |
There was a problem hiding this comment.
These newly ignored advisories don’t follow the existing documentation pattern in this file (most entries include at least "Transitive:" and a brief "Status:"/rationale). Please add the missing context (direct vs transitive, why it’s acceptable to ignore, and any mitigation) for RUSTSEC-2026-0049 and RUSTSEC-2026-0097 so the quarterly review has enough information.
| # Crate: rustls-webpki 0.103.9 | |
| # Review by: 2026-06-30 | |
| "RUSTSEC-2026-0049", | |
| # RUSTSEC-2026-0097: Rand is unsound with a custom logger using `rand::rng()` | |
| # Crate: rand 0.8.5, 0.8.6, 0.9.2, 0.10.0 | |
| # Crate: rustls-webpki 0.103.9 | |
| # Transitive: TLS/certificate validation dependency, not intentionally consumed as a first-party API | |
| # Status: Temporary ignore while waiting for upstream remediation/adoption. Mitigation: do not rely on CRL processing as the primary revocation control; continue standard certificate validation and operational certificate rotation. | |
| # Review by: 2026-06-30 | |
| "RUSTSEC-2026-0049", | |
| # RUSTSEC-2026-0097: Rand is unsound with a custom logger using `rand::rng()` | |
| # Crate: rand 0.8.5, 0.8.6, 0.9.2, 0.10.0 | |
| # Transitive: RNG dependency used through the dependency tree rather than as a project-specific custom logging integration | |
| # Status: Acceptable to ignore short term because the advisory requires a custom logger with `rand::rng()`. Mitigation: avoid custom logger integrations around `rand::rng()` and prefer existing non-custom RNG call paths until patched versions are adopted. |
Fixes #233
Skip over state entries which we already know. As already done in other code-paths in the code base.