Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 85 additions & 60 deletions src/persistence/stock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,14 +689,16 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
let mut bundles_with_height = bundles.into_iter().collect::<Vec<_>>();
bundles_with_height.sort_by_key(|(_, (_, num))| *num);

// Dependency violation detection
let mut needs_reordering = false;
let bundle_positions = bundles_with_height
.iter()
.enumerate()
.map(|(i, (bundle_id, (_, _)))| (*bundle_id, i))
.collect::<HashMap<_, _>>();
'outer: for (i, (_, (witness_bundle, _))) in bundles_with_height.iter().enumerate() {
let mut needs_reordering = false;
let mut known_bundle_dependencies: HashMap<BundleId, HashSet<BundleId>> =
HashMap::with_capacity(bundles_len);

for (i, (bundle_id, (witness_bundle, _))) in bundles_with_height.iter().enumerate() {
for KnownTransition { transition, opid } in &witness_bundle.bundle.known_transitions {
if let Some(ref mut dag_info) = dag_info {
dag_info.register_outputs(transition, opid);
Expand All @@ -705,14 +707,27 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
if let Some(ref mut dag_info) = dag_info {
dag_info.connect_input_to_outputs_by_opid(input, opid);
}
if input.op != contract_id {
let input_bundle_id = self.index.bundle_id_for_op(input.op)?;
// ignore missing input bundles (e.g. can happen in case of replace)
if let Some(&input_pos) = bundle_positions.get(&input_bundle_id) {
if input_pos > i {
needs_reordering = true;
break 'outer;
}
if input.op == contract_id {
continue;
}
let input_bundle_id = match self.index.bundle_id_for_op(input.op) {
Ok(bundle_id) => bundle_id,
// Ignore missing input bundles (e.g. can happen in case of replace).
Err(IndexError::Inconsistency(IndexInconsistency::BundleAbsent(_))) => {
continue;
}
Err(err) => return Err(err.into()),
};
if input_bundle_id == *bundle_id {
continue;
}
if let Some(&input_pos) = bundle_positions.get(&input_bundle_id) {
known_bundle_dependencies
.entry(*bundle_id)
.or_default()
.insert(input_bundle_id);
if input_pos > i {
needs_reordering = true;
}
}
}
Expand All @@ -726,62 +741,72 @@ impl<S: StashProvider, H: StateProvider, P: IndexProvider> Stock<S, H, P> {
return Ok((bundles, dag_info.map(|d| d.to_opouts_dag_data())));
}

// Topological sort
let mut known_bundle_dependencies: HashMap<BundleId, HashSet<BundleId>> =
HashMap::with_capacity(bundles_len);
for (bundle_id, (witness_bundle, _)) in &bundles_with_height {
for KnownTransition { transition, opid } in &witness_bundle.bundle.known_transitions {
if let Some(ref mut dag_info) = dag_info {
dag_info.register_outputs(transition, opid);
}
for input in &transition.inputs {
if let Some(ref mut dag_info) = dag_info {
dag_info.connect_input_to_outputs_by_opid(input, opid);
}
if input.op != contract_id {
let input_bundle_id = self.index.bundle_id_for_op(input.op)?;
if bundle_positions.contains_key(&input_bundle_id)
&& input_bundle_id != *bundle_id
{
known_bundle_dependencies
.entry(*bundle_id)
.or_default()
.insert(input_bundle_id);
}
}
}
// Deterministic Kahn topological sort; initial witness-height order is preserved when ties
// are possible.
let ordered_bundle_ids = bundles_with_height
.iter()
.map(|(bundle_id, _)| *bundle_id)
.collect::<Vec<_>>();
let mut bundles_by_id = bundles_with_height
.into_iter()
.map(|(bundle_id, (witness_bundle, _))| (bundle_id, witness_bundle))
.collect::<HashMap<_, _>>();
let mut dependencies_left = ordered_bundle_ids
.iter()
.map(|bundle_id| {
(
*bundle_id,
known_bundle_dependencies
.get(bundle_id)
.map(HashSet::len)
.unwrap_or_default(),
)
})
.collect::<HashMap<_, _>>();
let mut dependents: HashMap<BundleId, Vec<BundleId>> = HashMap::with_capacity(bundles_len);
for (bundle_id, dependencies) in &known_bundle_dependencies {
for dependency in dependencies {
dependents.entry(*dependency).or_default().push(*bundle_id);
}
}
let mut sorted_bundles: Vec<WitnessBundle> = Vec::with_capacity(bundles_len);
let mut remaining = bundles_with_height
.into_iter()
.map(|(id, (wb, _))| (id, wb))
.collect::<Vec<_>>();
while !remaining.is_empty() {
let processed_ids = sorted_bundles
.iter()
.map(|wb| wb.bundle.bundle_id())
.collect::<HashSet<_>>();
let mut found = false;
let mut i = 0;
while i < remaining.len() {
let (bundle_id, _) = &remaining[i];
let dependencies = known_bundle_dependencies
.get(bundle_id)
.cloned()
.unwrap_or_default();
if dependencies.is_subset(&processed_ids) {
let (_, witness_bundle) = remaining.remove(i);
sorted_bundles.push(witness_bundle);
found = true;
break;
let mut ready_positions = BTreeSet::new();
for bundle_id in &ordered_bundle_ids {
if dependencies_left.get(bundle_id).copied().unwrap_or_default() == 0 {
if let Some(position) = bundle_positions.get(bundle_id) {
ready_positions.insert(*position);
}
i += 1;
}
if !found {
}
let mut sorted_bundles = Vec::with_capacity(bundles_len);
while let Some(position) = ready_positions.iter().next().copied() {
ready_positions.remove(&position);
let bundle_id = ordered_bundle_ids[position];
let Some(witness_bundle) = bundles_by_id.remove(&bundle_id) else {
return Err(StockError::BundlesInconsistency);
};
sorted_bundles.push(witness_bundle);

if let Some(children) = dependents.get(&bundle_id) {
for child_id in children {
let Some(left) = dependencies_left.get_mut(child_id) else {
return Err(StockError::BundlesInconsistency);
};
if *left == 0 {
return Err(StockError::BundlesInconsistency);
}
*left -= 1;
if *left == 0 {
let Some(child_position) = bundle_positions.get(child_id) else {
return Err(StockError::BundlesInconsistency);
};
ready_positions.insert(*child_position);
}
}
}
}
if sorted_bundles.len() != bundles_len {
return Err(StockError::BundlesInconsistency);
}
Ok((sorted_bundles, dag_info.map(|d| d.to_opouts_dag_data())))
}

Expand Down