@@ -11,6 +11,21 @@ use bitcoin::{Transaction, Txid};
1111type  CanonicalMap < A >  = HashMap < Txid ,  ( Arc < Transaction > ,  CanonicalReason < A > ) > ; 
1212type  NotCanonicalSet  = HashSet < Txid > ; 
1313
14+ /// Represents the current stage of canonicalization processing. 
15+ #[ derive( Debug ,  Clone ,  Copy ,  PartialEq ,  Eq ) ]  
16+ enum  CanonicalStage  { 
17+     /// Processing directly anchored transactions. 
18+      AnchoredTxs , 
19+     /// Processing transactions seen in mempool. 
20+      SeenTxs , 
21+     /// Processing leftover transactions. 
22+      LeftOverTxs , 
23+     /// Processing transitively anchored transactions. 
24+      TransitivelyAnchoredTxs , 
25+     /// All processing is complete. 
26+      Finished , 
27+ } 
28+ 
1429/// Modifies the canonicalization algorithm. 
1530#[ derive( Debug ,  Default ,  Clone ) ]  
1631pub  struct  CanonicalizationParams  { 
@@ -30,86 +45,147 @@ pub struct CanonicalizationTask<'g, A> {
3045    unprocessed_anchored_txs :  VecDeque < ( Txid ,  Arc < Transaction > ,  & ' g  BTreeSet < A > ) > , 
3146    unprocessed_seen_txs :  Box < dyn  Iterator < Item  = ( Txid ,  Arc < Transaction > ,  u64 ) >  + ' g > , 
3247    unprocessed_leftover_txs :  VecDeque < ( Txid ,  Arc < Transaction > ,  u32 ) > , 
48+     unprocessed_transitively_anchored_txs :  VecDeque < ( Txid ,  Arc < Transaction > ,  & ' g  BTreeSet < A > ) > , 
3349
3450    canonical :  CanonicalMap < A > , 
3551    not_canonical :  NotCanonicalSet , 
3652
3753    // Store canonical transactions in order 
3854    canonical_order :  Vec < Txid > , 
3955
40-     // Track which transactions have confirmed anchors 
41-     confirmed_anchors :  HashMap < Txid ,  A > , 
56+     // Track which transactions have direct anchors (not transitive) 
57+     direct_anchors :  HashMap < Txid ,  A > , 
58+ 
59+     // Track the current stage of processing 
60+     current_stage :  CanonicalStage , 
4261} 
4362
4463impl < ' g ,  A :  Anchor >  ChainQuery  for  CanonicalizationTask < ' g ,  A >  { 
4564    type  Output  = CanonicalView < A > ; 
4665
4766    fn  next_query ( & mut  self )  -> Option < ChainRequest >  { 
48-         // Find the next non-canonicalized transaction to query 
49-         if  let  Some ( ( _txid,  _,  anchors) )  = self . unprocessed_anchored_txs . front ( )  { 
50-             // if !self.is_canonicalized(*txid) { 
51-             //     // Build query for this transaction 
52-             //     let block_ids = anchors.iter().map(|anchor| anchor.anchor_block()).collect(); 
53-             //     return Some(ChainRequest { 
54-             //         chain_tip: self.chain_tip, 
55-             //         block_ids, 
56-             //     }); 
57-             // } 
58-             // // Skip already canonicalized transaction 
59-             // self.unprocessed_anchored_txs.pop_front(); 
60-             // Build query for this transaction 
61-             let  block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ; 
62-             return  Some ( ChainRequest  { 
63-                 chain_tip :  self . chain_tip , 
64-                 block_ids, 
65-             } ) ; 
67+         // Try to advance to the next stage if needed 
68+         self . try_advance ( ) ; 
69+ 
70+         match  self . current_stage  { 
71+             CanonicalStage :: AnchoredTxs  => { 
72+                 // Process directly anchored transactions first 
73+                 if  let  Some ( ( _txid,  _,  anchors) )  = self . unprocessed_anchored_txs . front ( )  { 
74+                     let  block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ; 
75+                     return  Some ( ChainRequest  { 
76+                         chain_tip :  self . chain_tip , 
77+                         block_ids, 
78+                     } ) ; 
79+                 } 
80+                 None 
81+             } 
82+             CanonicalStage :: TransitivelyAnchoredTxs  => { 
83+                 // Process transitively anchored transactions last 
84+                 if  let  Some ( ( _txid,  _,  anchors) )  =
85+                     self . unprocessed_transitively_anchored_txs . front ( ) 
86+                 { 
87+                     let  block_ids = anchors. iter ( ) . map ( |anchor| anchor. anchor_block ( ) ) . collect ( ) ; 
88+                     return  Some ( ChainRequest  { 
89+                         chain_tip :  self . chain_tip , 
90+                         block_ids, 
91+                     } ) ; 
92+                 } 
93+                 None 
94+             } 
95+             CanonicalStage :: SeenTxs  | CanonicalStage :: LeftOverTxs  | CanonicalStage :: Finished  => { 
96+                 // These stages don't need queries 
97+                 None 
98+             } 
6699        } 
67-         None 
68100    } 
69101
70102    fn  resolve_query ( & mut  self ,  response :  ChainResponse )  { 
71-         if  let  Some ( ( txid,  tx,  anchors) )  = self . unprocessed_anchored_txs . pop_front ( )  { 
72-             // Find the anchor that matches the confirmed BlockId 
73-             let  best_anchor = response. and_then ( |block_id| { 
74-                 anchors
75-                     . iter ( ) 
76-                     . find ( |anchor| anchor. anchor_block ( )  == block_id) 
77-                     . cloned ( ) 
78-             } ) ; 
79- 
80-             match  best_anchor { 
81-                 Some ( best_anchor)  => { 
82-                     self . confirmed_anchors . insert ( txid,  best_anchor. clone ( ) ) ; 
83-                     if  !self . is_canonicalized ( txid)  { 
84-                         self . mark_canonical ( txid,  tx,  CanonicalReason :: from_anchor ( best_anchor) ) ; 
103+         // Only AnchoredTxs and TransitivelyAnchoredTxs stages should receive query 
104+         // responses Other stages don't generate queries and thus shouldn't call 
105+         // resolve_query 
106+         match  self . current_stage  { 
107+             CanonicalStage :: AnchoredTxs  => { 
108+                 // Process directly anchored transaction response 
109+                 if  let  Some ( ( txid,  tx,  anchors) )  = self . unprocessed_anchored_txs . pop_front ( )  { 
110+                     // Find the anchor that matches the confirmed BlockId 
111+                     let  best_anchor = response. and_then ( |block_id| { 
112+                         anchors
113+                             . iter ( ) 
114+                             . find ( |anchor| anchor. anchor_block ( )  == block_id) 
115+                             . cloned ( ) 
116+                     } ) ; 
117+ 
118+                     match  best_anchor { 
119+                         Some ( best_anchor)  => { 
120+                             // Transaction has a confirmed anchor 
121+                             self . direct_anchors . insert ( txid,  best_anchor. clone ( ) ) ; 
122+                             if  !self . is_canonicalized ( txid)  { 
123+                                 self . mark_canonical ( 
124+                                     txid, 
125+                                     tx, 
126+                                     CanonicalReason :: from_anchor ( best_anchor) , 
127+                                 ) ; 
128+                             } 
129+                         } 
130+                         None  => { 
131+                             // No confirmed anchor found, add to leftover transactions for later 
132+                             // processing 
133+                             self . unprocessed_leftover_txs . push_back ( ( 
134+                                 txid, 
135+                                 tx, 
136+                                 anchors
137+                                     . iter ( ) 
138+                                     . last ( ) 
139+                                     . expect ( 
140+                                         "tx taken from `unprocessed_anchored_txs` so it must have at least one anchor" , 
141+                                     ) 
142+                                     . confirmation_height_upper_bound ( ) , 
143+                             ) ) 
144+                         } 
85145                    } 
86146                } 
87-                 None  => { 
88-                     self . unprocessed_leftover_txs . push_back ( ( 
89-                             txid, 
90-                             tx, 
91-                             anchors
92-                                 . iter ( ) 
93-                                 . last ( ) 
94-                                 . expect ( 
95-                                     "tx taken from `unprocessed_txs_with_anchors` so it must at least have an anchor" , 
96-                                 ) 
97-                                 . confirmation_height_upper_bound ( ) , 
98-                         ) ) 
147+             } 
148+             CanonicalStage :: TransitivelyAnchoredTxs  => { 
149+                 // Process transitively anchored transaction response 
150+                 if  let  Some ( ( txid,  _tx,  anchors) )  =
151+                     self . unprocessed_transitively_anchored_txs . pop_front ( ) 
152+                 { 
153+                     // Find the anchor that matches the confirmed BlockId 
154+                     let  best_anchor = response. and_then ( |block_id| { 
155+                         anchors
156+                             . iter ( ) 
157+                             . find ( |anchor| anchor. anchor_block ( )  == block_id) 
158+                             . cloned ( ) 
159+                     } ) ; 
160+ 
161+                     if  let  Some ( best_anchor)  = best_anchor { 
162+                         // Found a confirmed anchor for this transitively anchored transaction 
163+                         self . direct_anchors . insert ( txid,  best_anchor. clone ( ) ) ; 
164+                         // Note: We don't re-mark as canonical since it's already marked 
165+                         // from being transitively anchored by its descendant 
166+                     } 
167+                     // If no confirmed anchor, we keep the transitive canonicalization status 
99168                } 
100169            } 
170+             CanonicalStage :: SeenTxs  | CanonicalStage :: LeftOverTxs  | CanonicalStage :: Finished  => { 
171+                 // These stages don't generate queries and shouldn't receive responses 
172+                 debug_assert ! ( 
173+                     false , 
174+                     "resolve_query called for stage {:?} which doesn't generate queries" , 
175+                     self . current_stage
176+                 ) ; 
177+             } 
101178        } 
102179    } 
103180
104181    fn  is_finished ( & mut  self )  -> bool  { 
105-         self . unprocessed_anchored_txs . is_empty ( ) 
182+         // Try to advance stages first 
183+         self . try_advance ( ) ; 
184+         // Check if we've reached the Finished stage 
185+         self . current_stage  == CanonicalStage :: Finished 
106186    } 
107187
108-     fn  finish ( mut  self )  -> Self :: Output  { 
109-         // Process remaining transactions (seen and leftover) 
110-         self . process_seen_txs ( ) ; 
111-         self . process_leftover_txs ( ) ; 
112- 
188+     fn  finish ( self )  -> Self :: Output  { 
113189        // Build the canonical view 
114190        let  mut  view_order = Vec :: new ( ) ; 
115191        let  mut  view_txs = HashMap :: new ( ) ; 
@@ -138,7 +214,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
138214                // Determine chain position based on reason 
139215                let  chain_position = match  reason { 
140216                    CanonicalReason :: Assumed  {  descendant }  => match  descendant { 
141-                         Some ( _)  => match  self . confirmed_anchors . get ( txid)  { 
217+                         Some ( _)  => match  self . direct_anchors . get ( txid)  { 
142218                            Some ( confirmed_anchor)  => ChainPosition :: Confirmed  { 
143219                                anchor :  confirmed_anchor, 
144220                                transitively :  None , 
@@ -154,7 +230,7 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
154230                        } , 
155231                    } , 
156232                    CanonicalReason :: Anchor  {  anchor,  descendant }  => match  descendant { 
157-                         Some ( _)  => match  self . confirmed_anchors . get ( txid)  { 
233+                         Some ( _)  => match  self . direct_anchors . get ( txid)  { 
158234                            Some ( confirmed_anchor)  => ChainPosition :: Confirmed  { 
159235                                anchor :  confirmed_anchor, 
160236                                transitively :  None , 
@@ -190,6 +266,49 @@ impl<'g, A: Anchor> ChainQuery for CanonicalizationTask<'g, A> {
190266} 
191267
192268impl < ' g ,  A :  Anchor >  CanonicalizationTask < ' g ,  A >  { 
269+     /// Try to advance to the next stage if the current stage is complete. 
270+      /// The loop continues through stages that process all their transactions at once 
271+      /// (SeenTxs and LeftOverTxs) to avoid needing multiple calls. 
272+      fn  try_advance ( & mut  self )  { 
273+         loop  { 
274+             let  advanced = match  self . current_stage  { 
275+                 CanonicalStage :: AnchoredTxs  => { 
276+                     if  self . unprocessed_anchored_txs . is_empty ( )  { 
277+                         self . current_stage  = CanonicalStage :: SeenTxs ; 
278+                         true  // Continue to process SeenTxs immediately 
279+                     }  else  { 
280+                         false  // Still have work, stop advancing 
281+                     } 
282+                 } 
283+                 CanonicalStage :: SeenTxs  => { 
284+                     // Process all seen transactions at once 
285+                     self . process_seen_txs ( ) ; 
286+                     self . current_stage  = CanonicalStage :: LeftOverTxs ; 
287+                     true  // Continue to process LeftOverTxs immediately 
288+                 } 
289+                 CanonicalStage :: LeftOverTxs  => { 
290+                     // Process all leftover transactions at once 
291+                     self . process_leftover_txs ( ) ; 
292+                     self . current_stage  = CanonicalStage :: TransitivelyAnchoredTxs ; 
293+                     false  // Stop here - TransitivelyAnchoredTxs need queries 
294+                 } 
295+                 CanonicalStage :: TransitivelyAnchoredTxs  => { 
296+                     if  self . unprocessed_transitively_anchored_txs . is_empty ( )  { 
297+                         self . current_stage  = CanonicalStage :: Finished ; 
298+                     } 
299+                     false  // Stop advancing 
300+                 } 
301+                 CanonicalStage :: Finished  => { 
302+                     false  // Already finished, nothing to do 
303+                 } 
304+             } ; 
305+ 
306+             if  !advanced { 
307+                 break ; 
308+             } 
309+         } 
310+     } 
311+ 
193312    /// Creates a new canonicalization task. 
194313     pub  fn  new ( 
195314        tx_graph :  & ' g  TxGraph < A > , 
@@ -222,12 +341,14 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
222341            unprocessed_anchored_txs, 
223342            unprocessed_seen_txs, 
224343            unprocessed_leftover_txs :  VecDeque :: new ( ) , 
344+             unprocessed_transitively_anchored_txs :  VecDeque :: new ( ) , 
225345
226346            canonical :  HashMap :: new ( ) , 
227347            not_canonical :  HashSet :: new ( ) , 
228348
229349            canonical_order :  Vec :: new ( ) , 
230-             confirmed_anchors :  HashMap :: new ( ) , 
350+             direct_anchors :  HashMap :: new ( ) , 
351+             current_stage :  CanonicalStage :: AnchoredTxs , 
231352        } ; 
232353
233354        // process assumed transactions first (they don't need queries) 
@@ -342,30 +463,28 @@ impl<'g, A: Anchor> CanonicalizationTask<'g, A> {
342463            for  txid in  undo_not_canonical { 
343464                self . not_canonical . remove ( & txid) ; 
344465            } 
345-         }  else  { 
346-             // Add to canonical order 
347-             for  ( txid,  tx,  reason)  in  & staged_canonical { 
348-                 self . canonical_order . push ( * txid) ; 
349- 
350-                 // If this was marked transitively, check if it has anchors to verify 
351-                 let  is_transitive = matches ! ( 
352-                     reason, 
353-                     CanonicalReason :: Anchor  { 
354-                         descendant:  Some ( _) , 
355-                         ..
356-                     }  | CanonicalReason :: Assumed  { 
357-                         descendant:  Some ( _) , 
358-                         ..
359-                     } 
360-                 ) ; 
466+             return ; 
467+         } 
361468
362-                 if  is_transitive { 
363-                     if  let  Some ( anchors)  = self . tx_graph . all_anchors ( ) . get ( txid)  { 
364-                         // only check anchors we haven't already confirmed 
365-                         if  !self . confirmed_anchors . contains_key ( txid)  { 
366-                             self . unprocessed_anchored_txs 
367-                                 . push_back ( ( * txid,  tx. clone ( ) ,  anchors) ) ; 
368-                         } 
469+         // Add to canonical order 
470+         for  ( txid,  tx,  reason)  in  & staged_canonical { 
471+             self . canonical_order . push ( * txid) ; 
472+ 
473+             // ObservedIn transactions don't need anchor verification 
474+             if  matches ! ( reason,  CanonicalReason :: ObservedIn  {  .. } )  { 
475+                 continue ; 
476+             } 
477+ 
478+             // Check if this transaction was marked transitively and needs its own anchors verified 
479+             if  reason. is_transitive ( )  { 
480+                 if  let  Some ( anchors)  = self . tx_graph . all_anchors ( ) . get ( txid)  { 
481+                     // only check anchors we haven't already confirmed 
482+                     if  !self . direct_anchors . contains_key ( txid)  { 
483+                         self . unprocessed_transitively_anchored_txs . push_back ( ( 
484+                             * txid, 
485+                             tx. clone ( ) , 
486+                             anchors, 
487+                         ) ) ; 
369488                    } 
370489                } 
371490            } 
@@ -460,6 +579,12 @@ impl<A: Clone> CanonicalReason<A> {
460579            CanonicalReason :: ObservedIn  {  descendant,  .. }  => descendant, 
461580        } 
462581    } 
582+ 
583+     /// Returns true if this reason represents a transitive canonicalization 
584+      /// (i.e., the transaction is canonical because of its descendant). 
585+      pub  fn  is_transitive ( & self )  -> bool  { 
586+         self . descendant ( ) . is_some ( ) 
587+     } 
463588} 
464589
465590#[ cfg( test) ]  
0 commit comments