@@ -29,7 +29,6 @@ use databend_common_pipeline_core::processors::InputPort;
2929use databend_common_pipeline_core:: processors:: OutputPort ;
3030use databend_common_pipeline_core:: processors:: Processor ;
3131use log:: debug;
32- use log:: info;
3332use parking_lot:: Mutex ;
3433use tokio:: sync:: Barrier ;
3534
@@ -42,57 +41,6 @@ use crate::pipelines::processors::transforms::aggregator::AggregateMeta;
4241use crate :: pipelines:: processors:: transforms:: aggregator:: AggregatePayload ;
4342use crate :: pipelines:: processors:: transforms:: aggregator:: AggregatorParams ;
4443
45- const MAX_BYTES_PER_FINAL_AGGREGATE_HASH_TABLE : usize = 16 * 1024 * 1024 ; // 16MB
46-
47- #[ allow( clippy:: enum_variant_names) ]
48- enum HashTable {
49- MovedOut ,
50- AggregateHashTable ( AggregateHashTable ) ,
51- }
52-
53- impl HashTable {
54- fn new ( params : & AggregatorParams ) -> Self {
55- HashTable :: AggregateHashTable ( Self :: create_table ( params) )
56- }
57-
58- fn take_or_create ( & mut self , params : & AggregatorParams ) -> ( AggregateHashTable , bool ) {
59- match std:: mem:: replace ( self , HashTable :: MovedOut ) {
60- HashTable :: AggregateHashTable ( ht) => {
61- let allocated = ht. allocated_bytes ( ) ;
62- if allocated > MAX_BYTES_PER_FINAL_AGGREGATE_HASH_TABLE {
63- info ! (
64- "NewFinalAggregateTransform hash table re-created due to memory limit, allocated: {}, max allowed: {}" ,
65- allocated, MAX_BYTES_PER_FINAL_AGGREGATE_HASH_TABLE
66- ) ;
67- ( Self :: create_table ( params) , false )
68- } else {
69- ( ht, true )
70- }
71- }
72- HashTable :: MovedOut => ( Self :: create_table ( params) , false ) ,
73- }
74- }
75-
76- fn set ( & mut self , table : AggregateHashTable ) {
77- * self = HashTable :: AggregateHashTable ( table) ;
78- }
79-
80- fn release ( & mut self ) {
81- if let HashTable :: AggregateHashTable ( ht) = std:: mem:: replace ( self , HashTable :: MovedOut ) {
82- drop ( ht) ;
83- }
84- }
85-
86- fn create_table ( params : & AggregatorParams ) -> AggregateHashTable {
87- AggregateHashTable :: new (
88- params. group_data_types . clone ( ) ,
89- params. aggregate_functions . clone ( ) ,
90- HashTableConfig :: default ( ) . with_initial_radix_bits ( 0 ) ,
91- Arc :: new ( Bump :: new ( ) ) ,
92- )
93- }
94- }
95-
9644pub struct NewFinalAggregateTransform {
9745 input : Arc < InputPort > ,
9846 output : Arc < OutputPort > ,
@@ -102,7 +50,6 @@ pub struct NewFinalAggregateTransform {
10250 /// final aggregate
10351 params : Arc < AggregatorParams > ,
10452 flush_state : PayloadFlushState ,
105- hash_table : HashTable ,
10653
10754 /// storing repartition result
10855 repartitioned_queues : RepartitionedQueues ,
@@ -129,15 +76,13 @@ impl NewFinalAggregateTransform {
12976 max_aggregate_spill_level : usize ,
13077 ) -> Result < Box < dyn Processor > > {
13178 let round_state = LocalRoundState :: new ( max_aggregate_spill_level) ;
132- let hash_table = HashTable :: new ( params. as_ref ( ) ) ;
13379 Ok ( Box :: new ( NewFinalAggregateTransform {
13480 input,
13581 output,
13682 id,
13783 partition_count,
13884 params,
13985 flush_state : PayloadFlushState :: default ( ) ,
140- hash_table,
14186 round_state,
14287 repartitioned_queues : RepartitionedQueues :: create ( partition_count) ,
14388 barrier,
@@ -232,12 +177,6 @@ impl NewFinalAggregateTransform {
232177 self . round_state . current_queue_spill_round < self . round_state . max_aggregate_spill_level ;
233178 let need_spill = self . spiller . memory_settings . check_spill ( ) ;
234179
235- // if memory pressure is high, release hashtable's memory immediately
236- if need_spill {
237- info ! ( "NewFinalAggregateTransform[{}] hash table released memory due to high memory pressure" , self . id) ;
238- self . hash_table . release ( ) ;
239- }
240-
241180 if !can_trigger_spill {
242181 if need_spill {
243182 debug ! (
@@ -281,42 +220,60 @@ impl NewFinalAggregateTransform {
281220 }
282221
283222 fn final_aggregate ( & mut self , mut queue : Vec < AggregateMeta > ) -> Result < ( ) > {
284- let ( mut agg_hashtable, need_clear) = self . hash_table . take_or_create ( self . params . as_ref ( ) ) ;
285-
286- // we will clear the hashtable for reuse, this will not release the payload memory
287- // but only clear the internal state, so that we can avoid re-allocating memory
288- // real memory release will happen when memory pressure is high
289- if need_clear {
290- agg_hashtable. clear_for_reuse ( ) ;
291- }
223+ let mut agg_hashtable: Option < AggregateHashTable > = None ;
292224
293225 while let Some ( meta) = queue. pop ( ) {
294226 match meta {
295- AggregateMeta :: Serialized ( payload) => {
296- let partitioned = payload. convert_to_partitioned_payload (
297- self . params . group_data_types . clone ( ) ,
298- self . params . aggregate_functions . clone ( ) ,
299- self . params . num_states ( ) ,
300- 0 ,
301- Arc :: new ( Bump :: new ( ) ) ,
302- ) ?;
303- agg_hashtable. combine_payloads ( & partitioned, & mut self . flush_state ) ?;
304- }
305- AggregateMeta :: AggregatePayload ( payload) => {
306- agg_hashtable. combine_payload ( & payload. payload , & mut self . flush_state ) ?;
307- }
227+ AggregateMeta :: Serialized ( payload) => match agg_hashtable. as_mut ( ) {
228+ Some ( ht) => {
229+ let payload = payload. convert_to_partitioned_payload (
230+ self . params . group_data_types . clone ( ) ,
231+ self . params . aggregate_functions . clone ( ) ,
232+ self . params . num_states ( ) ,
233+ 0 ,
234+ Arc :: new ( Bump :: new ( ) ) ,
235+ ) ?;
236+ ht. combine_payloads ( & payload, & mut self . flush_state ) ?;
237+ }
238+ None => {
239+ agg_hashtable = Some ( payload. convert_to_aggregate_table (
240+ self . params . group_data_types . clone ( ) ,
241+ self . params . aggregate_functions . clone ( ) ,
242+ self . params . num_states ( ) ,
243+ 0 ,
244+ Arc :: new ( Bump :: new ( ) ) ,
245+ true ,
246+ ) ?) ;
247+ }
248+ } ,
249+ AggregateMeta :: AggregatePayload ( payload) => match agg_hashtable. as_mut ( ) {
250+ Some ( ht) => {
251+ ht. combine_payload ( & payload. payload , & mut self . flush_state ) ?;
252+ }
253+ None => {
254+ let capacity =
255+ AggregateHashTable :: get_capacity_for_count ( payload. payload . len ( ) ) ;
256+ let mut hashtable = AggregateHashTable :: new_with_capacity (
257+ self . params . group_data_types . clone ( ) ,
258+ self . params . aggregate_functions . clone ( ) ,
259+ HashTableConfig :: default ( ) . with_initial_radix_bits ( 0 ) ,
260+ capacity,
261+ Arc :: new ( Bump :: new ( ) ) ,
262+ ) ;
263+ hashtable. combine_payload ( & payload. payload , & mut self . flush_state ) ?;
264+ agg_hashtable = Some ( hashtable) ;
265+ }
266+ } ,
308267 _ => unreachable ! ( ) ,
309268 }
310269 }
311270
312- let output_block = if agg_hashtable. len ( ) == 0 {
313- self . params . empty_result_block ( )
314- } else {
271+ let output_block = if let Some ( mut ht) = agg_hashtable {
315272 let mut blocks = vec ! [ ] ;
316273 self . flush_state . clear ( ) ;
317274
318275 loop {
319- if agg_hashtable . merge_result ( & mut self . flush_state ) ? {
276+ if ht . merge_result ( & mut self . flush_state ) ? {
320277 let mut entries = self . flush_state . take_aggregate_results ( ) ;
321278 let group_columns = self . flush_state . take_group_columns ( ) ;
322279 entries. extend_from_slice ( & group_columns) ;
@@ -332,10 +289,10 @@ impl NewFinalAggregateTransform {
332289 } else {
333290 DataBlock :: concat ( & blocks) ?
334291 }
292+ } else {
293+ self . params . empty_result_block ( )
335294 } ;
336295
337- self . hash_table . set ( agg_hashtable) ;
338-
339296 if output_block. is_empty ( ) {
340297 self . round_state . phase = RoundPhase :: Idle ;
341298 } else {
@@ -374,12 +331,6 @@ impl NewFinalAggregateTransform {
374331
375332 Ok ( ( ) )
376333 }
377-
378- fn on_finish ( & mut self ) -> Result < ( ) > {
379- self . round_state . phase = RoundPhase :: Finish ;
380- self . hash_table . release ( ) ;
381- Ok ( ( ) )
382- }
383334}
384335
385336#[ async_trait:: async_trait]
@@ -394,10 +345,6 @@ impl Processor for NewFinalAggregateTransform {
394345
395346 fn event ( & mut self ) -> Result < Event > {
396347 if self . output . is_finished ( ) {
397- if !matches ! ( self . round_state. phase, RoundPhase :: Finish ) {
398- self . round_state . phase = RoundPhase :: Finish ;
399- return Ok ( Event :: Sync ) ;
400- }
401348 self . input . finish ( ) ;
402349 return Ok ( Event :: Finished ) ;
403350 }
@@ -461,10 +408,6 @@ impl Processor for NewFinalAggregateTransform {
461408 }
462409
463410 if self . input . is_finished ( ) {
464- if !matches ! ( self . round_state. phase, RoundPhase :: Finish ) {
465- self . round_state . phase = RoundPhase :: Finish ;
466- return Ok ( Event :: Sync ) ;
467- }
468411 self . output . finish ( ) ;
469412 return Ok ( Event :: Finished ) ;
470413 }
@@ -494,7 +437,6 @@ impl Processor for NewFinalAggregateTransform {
494437 . take_queue ( self . id ) ;
495438 self . final_aggregate ( queue)
496439 }
497- RoundPhase :: Finish => self . on_finish ( ) ,
498440 _ => Err ( ErrorCode :: Internal ( format ! (
499441 "NewFinalAggregateTransform process called in {} state" ,
500442 phase
0 commit comments