Fengttt spill doit. #22686
base: main
1463d51 to 54c01cb
Unscrew. Expect tons of bugs ...
Now we have something that compiles, but there will be tons of bugs ...
And fix a few bugs.
yeah, I know ...
b05a25f to fb96506
User description
What type of PR is this?
Which issue(s) this PR fixes:
issue #3433
What this PR does / why we need it:
HashAgg spill. Refactors the group and merge group code. Now, when the memory used by
aggregation groups reaches a threshold, we spill to disk and read the data back.
Merge group simply spills to the network and reads the data back from the network.
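The spill-and-read-back flow described above can be sketched with a toy SUM-by-key aggregator. This is a minimal illustration under stated assumptions, not the PR's code: the `spillAgg` type, the group-count limit standing in for a memory threshold, the fixed bucket count, the FNV hash, and the in-memory buffers standing in for spill files are all hypothetical.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/fnv"
	"io"
)

const numBuckets = 4 // hypothetical bucket count

// spillAgg sums values per key and spills partial sums once too many
// groups are held in memory.
type spillAgg struct {
	maxGroups int                      // stand-in for a memory threshold
	groups    map[string]int64         // in-memory partial sums
	buckets   [numBuckets]bytes.Buffer // stand-ins for spill files
	spilled   bool
}

func newSpillAgg(maxGroups int) *spillAgg {
	return &spillAgg{maxGroups: maxGroups, groups: make(map[string]int64)}
}

// bucketOf maps a key to a spill bucket by hashing it.
func bucketOf(key string) int {
	h := fnv.New64a()
	h.Write([]byte(key))
	return int(h.Sum64() % numBuckets)
}

// add folds one value into its group and spills when over the limit.
func (a *spillAgg) add(key string, v int64) error {
	a.groups[key] += v
	if len(a.groups) > a.maxGroups {
		return a.spill()
	}
	return nil
}

// spill writes every in-memory group to its bucket as
// (key length, key bytes, partial sum) and clears the map.
func (a *spillAgg) spill() error {
	for k, v := range a.groups {
		w := &a.buckets[bucketOf(k)]
		if err := binary.Write(w, binary.LittleEndian, int32(len(k))); err != nil {
			return err
		}
		w.WriteString(k)
		if err := binary.Write(w, binary.LittleEndian, v); err != nil {
			return err
		}
	}
	a.groups = make(map[string]int64)
	a.spilled = true
	return nil
}

// result flushes what is left, then reads every bucket back and merges
// the partial sums. All buckets are merged into one map here for brevity.
func (a *spillAgg) result() (map[string]int64, error) {
	if !a.spilled {
		return a.groups, nil
	}
	if err := a.spill(); err != nil {
		return nil, err
	}
	out := make(map[string]int64)
	for i := range a.buckets {
		r := &a.buckets[i]
		for {
			var n int32
			err := binary.Read(r, binary.LittleEndian, &n)
			if err == io.EOF {
				break // bucket fully consumed
			}
			if err != nil {
				return nil, err
			}
			key := make([]byte, n)
			if _, err := io.ReadFull(r, key); err != nil {
				return nil, err
			}
			var v int64
			if err := binary.Read(r, binary.LittleEndian, &v); err != nil {
				return nil, err
			}
			out[string(key)] += v
		}
	}
	return out, nil
}

func main() {
	a := newSpillAgg(2)
	a.add("a", 1)
	a.add("b", 2)
	a.add("c", 5) // third group exceeds the limit and triggers a spill
	a.add("a", 3)
	out, err := a.result()
	fmt.Println(out, err)
}
```

Because a key always hashes to the same bucket, all partial results for one group end up in the same bucket, which is what makes a bucket-at-a-time merge possible.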
PR Type
Bug fix, Enhancement, Tests
Description
Implements comprehensive HashAgg spill-to-disk functionality with memory management and multi-pass support
Adds new Group and MergeGroup operators with state machine architecture (Build/Eval/End states) for handling group-by and aggregation operations
Implements bucket-based partitioning and spill file I/O with serialization/deserialization of aggregation state
Adds distinct value tracking and serialization support across all aggregation functions (COUNT, SUM, AVG, MEDIAN, GROUP_CONCAT, APPROX_COUNT, window functions, etc.)
Refactors batch serialization to use flexible ExtraBuf1 and ExtraBuf2 fields instead of direct Aggs field storage
Migrates MergeGroup operator from separate mergegroup package to unified group package
Adds reader-based vector and batch deserialization for streaming I/O support
Implements AllGroupHash() method across all hash map implementations for spill bucket computation
Adds utility functions for primitive type serialization/deserialization in encoding module
Fixes hash code slice allocation logic and spill file service type handling
Updates window operator to use colexec.ExprEvalVector instead of group.ExprEvalVector
Adds MarshalBinary() methods to aggregation context types for serialization support
Diagram Walkthrough
File Walkthrough
4 files
exec2.go
New HashAgg spill implementation with state machine
pkg/sql/colexec/group/exec2.go
disk spillover
Prepare() and Call() methods for group operator state machine (Build/Eval/End states)
filling
multi-pass support
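The Build/Eval/End state machine mentioned for the group operator can be pictured with a small sketch. It is only an illustration of the pattern: the `groupOp` type, its `call` method, and the string-slice input are hypothetical and do not reflect the real operator's signatures.

```go
package main

import "fmt"

type state int

const (
	stBuild state = iota // consume all input and build groups
	stEval               // emit one result per call
	stEnd                // nothing left to emit
)

// groupOp is a toy COUNT(*) GROUP BY key operator.
type groupOp struct {
	st     state
	input  [][]string     // pending input batches (stand-in for a child operator)
	counts map[string]int // per-group row counts
	keys   []string       // groups in first-seen order
	pos    int            // next group to emit
}

func newGroupOp(input [][]string) *groupOp {
	return &groupOp{input: input, counts: make(map[string]int)}
}

// call advances the state machine and returns one output row,
// or ok=false once the operator has reached the End state.
func (g *groupOp) call() (key string, n int, ok bool) {
	for {
		switch g.st {
		case stBuild:
			for _, batch := range g.input {
				for _, k := range batch {
					if _, seen := g.counts[k]; !seen {
						g.keys = append(g.keys, k)
					}
					g.counts[k]++
				}
			}
			g.input = nil
			g.st = stEval
		case stEval:
			if g.pos < len(g.keys) {
				k := g.keys[g.pos]
				g.pos++
				return k, g.counts[k], true
			}
			g.st = stEnd
		case stEnd:
			return "", 0, false
		}
	}
}

func main() {
	g := newGroupOp([][]string{{"x", "y"}, {"x"}})
	for {
		k, n, ok := g.call()
		if !ok {
			break
		}
		fmt.Println(k, n)
	}
}
```

Keeping the state in the operator lets each call resume where the previous one stopped, which is also a natural place to hook in a spill step between Build and Eval.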
helper.go
Spill file I/O and hash table helper functions
pkg/sql/colexec/group/helper.go
ResHashRelated struct for hash table management with insert tracking
spillDataToDisk() to serialize group-by batches and aggregates to spill files
loadSpilledData() to deserialize and merge spilled data back into memory
traversal
types2.go
Group operator type definitions and container management
pkg/sql/colexec/group/types2.go
Group and MergeGroup operator types with spill memory configuration
spill bucket management
lifecycle
mergeGroup.go
MergeGroup operator for partial result merging with spill
pkg/sql/colexec/group/mergeGroup.go
MergeGroup operator for merging partial aggregation results
Prepare() and Call() methods with spill support during merge phase
merge
table insertion
29 files
result.go
Add distinct tracking and spill serialization to aggregation results
pkg/sql/colexec/aggexec/result.go
hasDistinct parameter to result initialization functions for distinct value tracking
marshalToBuffers() and marshalChunkToBuffer() for spill support
unmarshalFromReader() for loading spilled data
distinctFill() and distinctMerge() methods to handle distinct aggregation during merge
batch.go
Batch serialization refactoring for spill support
pkg/container/batch/batch.go
Aggs field serialization with ExtraBuf1 and ExtraBuf2 for flexible metadata storage
UnmarshalFromReader() method for streaming deserialization from I/O readers
MarshalBinary() to delegate to MarshalBinaryWithBuffer()
IsEmpty() check to only consider row count, not aggregates
types.go
Aggregation function serialization interface and helpers
pkg/sql/colexec/aggexec/types.go
SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and UnmarshalFromReader() to AggFuncExec
marshalRetAndGroupsToBuffer() and marshalChunkToBuffer() for spill serialization
AggFuncExecExpression serialization via MarshalToBuffer() and UnmarshalFromReader()
unmarshalFromReader() for deserialization
concat.go
GROUP_CONCAT distinct handling refactoring for spill
pkg/sql/colexec/aggexec/concat.go
groupConcatExec to result object via distinctFill() and distinctMerge()
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for spill serialization
UnmarshalFromReader() for loading spilled group_concat state
hasDistinct flag to result object
approx_count.go
APPROX_COUNT spill serialization support
pkg/sql/colexec/aggexec/approx_count.go
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for HyperLogLog sketch serialization
UnmarshalFromReader() to deserialize sketch groups from spilled data
hasDistinct flag (false) to result object
count.go
COUNT aggregation distinct and spill serialization
pkg/sql/colexec/aggexec/count.go
countColumnExec to result object via distinctFill()
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for spill serialization
UnmarshalFromReader() for loading spilled count state
countStarExec with spill serialization methods and validation
window.go
Window function spill serialization support
pkg/sql/colexec/aggexec/window.go
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for rank/dense_rank/row_number serialization
UnmarshalFromReader() to deserialize window function groups from spilled data
i64Slice type with MarshalBinary() for int64 slice serialization
hasDistinct=false to result object
median.go
MEDIAN aggregation spill serialization support
pkg/sql/colexec/aggexec/median.go
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for median value serialization
UnmarshalFromReader() to deserialize median groups from spilled data
hasDistinct parameter
distinctHash field from median executor
fromFixedRetFixed.go
Generic fixed-to-fixed aggregation spill support
pkg/sql/colexec/aggexec/fromFixedRetFixed.go
SaveIntermediateResult() and SaveIntermediateResultOfChunk() for generic fixed-to-fixed aggregation serialization
UnmarshalFromReader() to deserialize group contexts from spilled data
hasDistinct flag to result object
fromFixedRetBytes.go
Add spill support and distinct handling to fixed-to-bytes aggregator
pkg/sql/colexec/aggexec/fromFixedRetBytes.go
bytes, io, and moerr packages
marshal() to handle a new dist return value from marshalToBytes() with validation
SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and UnmarshalFromReader() for spill support
unmarshal() to pass nil for distinct parameter
init() to pass info.IsDistinct() flag to initAggResultWithBytesTypeResult() and removed separate distinct hash initialization
fromBytesRetFixed.go
Add spill support to bytes-to-fixed aggregator
pkg/sql/colexec/aggexec/fromBytesRetFixed.go
bytes, io, and moerr packages
marshal() to handle new dist return value with validation
SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and UnmarshalFromReader() for spill support
unmarshal() to pass nil for distinct parameter
init() to pass false for distinct flag to initAggResultWithFixedTypeResult()
fromBytesRetBytes.go
Add spill support to bytes-to-bytes aggregator
pkg/sql/colexec/aggexec/fromBytesRetBytes.go
bytes, io, and moerr packages
marshal() to handle new dist return value with validation
unmarshal() to pass nil for distinct parameter
SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and UnmarshalFromReader() for spill support
init() to pass false for distinct flag to initAggResultWithBytesTypeResult()
encoding.go
Add I/O utility functions for type serialization
pkg/container/types/encoding.go
mpool package
WriteSizeBytes(), ReadInt64(), WriteInt64(), ReadBool(), ReadInt32(), WriteInt32(), ReadInt32AsInt(), ReadByte(), ReadByteAsInt(), ReadType(), ReadSizeBytes()
memory pool allocation
vector.go
Add reader-based vector deserialization method
pkg/container/vector/vector.go
io import
UnmarshalWithReader() that deserializes a vector from an io.Reader with support for memory pool allocation
area, nsp) sequentially from reader
var_pop.go
Add MarshalBinary methods to var_pop aggregation contexts
pkg/sql/plan/function/agg/var_pop.go
math before other imports
MarshalBinary() method to aggVarPopGroupContext, aggVarPopOfDecimalGroupContext, and aggVarPopOfDecimalCommonContext types
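For context, a `MarshalBinary()` method on a small aggregation context typically packs its fields into a fixed-size byte slice. The sketch below assumes a hypothetical `varPopCtx` with a running sum and a count; the real context types named above may hold different fields and use a different layout.

```go
package main

import (
	"encoding"
	"encoding/binary"
	"errors"
	"fmt"
	"math"
)

// varPopCtx is a hypothetical per-group context: a running sum and a row count.
type varPopCtx struct {
	sum   float64
	count int64
}

// Compile-time check that the type satisfies the standard interface.
var _ encoding.BinaryMarshaler = (*varPopCtx)(nil)

// MarshalBinary packs the context into 16 little-endian bytes.
func (c *varPopCtx) MarshalBinary() ([]byte, error) {
	buf := make([]byte, 16)
	binary.LittleEndian.PutUint64(buf[0:8], math.Float64bits(c.sum))
	binary.LittleEndian.PutUint64(buf[8:16], uint64(c.count))
	return buf, nil
}

// UnmarshalBinary restores the context from the same 16-byte layout.
func (c *varPopCtx) UnmarshalBinary(data []byte) error {
	if len(data) != 16 {
		return errors.New("varPopCtx: expected 16 bytes")
	}
	c.sum = math.Float64frombits(binary.LittleEndian.Uint64(data[0:8]))
	c.count = int64(binary.LittleEndian.Uint64(data[8:16]))
	return nil
}

func main() {
	in := &varPopCtx{sum: 12.5, count: 4}
	data, _ := in.MarshalBinary()

	var out varPopCtx
	err := out.UnmarshalBinary(data)
	fmt.Println(out.sum, out.count, err)
}
```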
evalExpression.go
Add ExprEvalVector type for expression evaluation
pkg/sql/colexec/evalExpression.go
ExprEvalVector struct type with fields for executors, vectors, and types
MakeEvalVector() function to create and initialize ExprEvalVector from plan expressions
Free() and ResetForNextQuery() methods to ExprEvalVector for resource management
avg.go
Add MarshalBinary methods to avg aggregation contexts
pkg/sql/plan/function/agg/avg.go
MarshalBinary() method to aggAvgContext type
MarshalBinary() method to aggAvgDecimalCommonCtx type
distinct.go
Add reader-based deserialization for distinct hash
pkg/sql/colexec/aggexec/distinct.go
marshalToBuffers() method to serialize distinct hash maps based on flags
unmarshal() to delegate to new unmarshalFromReader() method
unmarshalFromReader() method to deserialize from io.Reader instead of byte slice
avg_tw_result.go
Add MarshalBinary methods to avg_tw_result contexts
pkg/sql/plan/function/agg/avg_tw_result.go
MarshalBinary() method to AvgTwResultContext type
MarshalBinary() method to AvgTwResultDecimalContext type
avg_tw_cache.go
Add MarshalBinary methods to avg_tw_cache contexts
pkg/sql/plan/function/agg/avg_tw_cache.go
MarshalBinary() method to AvgTwCacheContext type
MarshalBinary() method to AvgTwCacheDecimalContext type
sum.go
Add MarshalBinary method to sum aggregation context
pkg/sql/plan/function/agg/sum.go
MarshalBinary() method to aggSumDecimal type
types.go
Add AllGroupHash method to HashMap interface
pkg/common/hashmap/types.go
AllGroupHash() method signature to HashMap interface
string_hash_map.go
Implement AllGroupHash for StringHashMap
pkg/container/hashtable/string_hash_map.go
AllGroupHash() method implementation that returns hash codes for all mapped groups
int64_hash_map.go
Implement AllGroupHash for Int64HashMap
pkg/container/hashtable/int64_hash_map.go
AllGroupHash() method implementation that returns hash codes for all mapped groups
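One way to picture how per-group hash codes feed spill bucket computation is shown below. It is a sketch under assumptions: `toyMap`, `allGroupHash`, `spillBuckets`, the splitmix64-style mixer, and the modulo bucket rule are hypothetical stand-ins, not the hash maps or the bucket formula used in the PR.

```go
package main

import "fmt"

// mix applies the splitmix64 finalizer steps; used here as a stand-in hash.
func mix(k int64) uint64 {
	x := uint64(k)
	x ^= x >> 30
	x *= 0xbf58476d1ce4e5b9
	x ^= x >> 27
	x *= 0x94d049bb133111eb
	x ^= x >> 31
	return x
}

// toyMap assigns dense group ids to keys and remembers each group's hash.
type toyMap struct {
	groupOf map[int64]int
	hashes  []uint64 // hashes[g] is the hash code of group g
}

func newToyMap() *toyMap {
	return &toyMap{groupOf: make(map[int64]int)}
}

// insert returns the group id for k, creating a new group if needed.
func (m *toyMap) insert(k int64) int {
	if g, ok := m.groupOf[k]; ok {
		return g
	}
	g := len(m.hashes)
	m.groupOf[k] = g
	m.hashes = append(m.hashes, mix(k))
	return g
}

// allGroupHash returns the hash code of every group, indexed by group id.
func (m *toyMap) allGroupHash() []uint64 {
	return m.hashes
}

// spillBuckets maps each group's hash code to one of n buckets.
func spillBuckets(hashes []uint64, n int) []int {
	out := make([]int, len(hashes))
	for g, h := range hashes {
		out[g] = int(h % uint64(n))
	}
	return out
}

func main() {
	m := newToyMap()
	for _, k := range []int64{10, 20, 10, 30} {
		m.insert(k)
	}
	fmt.Println(spillBuckets(m.allGroupHash(), 4))
}
```

Reusing the hash codes the map already computed avoids rehashing group keys when deciding where each group's state should be spilled.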
aggContext.go
Add method to retrieve group contexts for marshalling
pkg/sql/colexec/aggexec/aggContext.go
getGroupContextBinaryMarshaller() method to retrieve group contexts for binary marshalling
aggMethod.go
Add MarshalBinary to AggCanMarshal interface
pkg/sql/colexec/aggexec/aggMethod.go
MarshalBinary() ([]byte, error) method signature to AggCanMarshal interface
inthashmap.go
Implement AllGroupHash for IntHashMap
pkg/common/hashmap/inthashmap.go
AllGroupHash() method that delegates to underlying hash map implementation
strhashmap.go
Implement AllGroupHash for StrHashMap
pkg/common/hashmap/strhashmap.go
AllGroupHash() method that delegates to underlying hash map implementation
bitmap.go
Add MarshalBinary method to bitmap aggregation context
pkg/sql/plan/function/agg/bitmap.go
MarshalBinary() method to aggBitmapGroupContext type
4 files
result_test.go
Test updates for distinct parameter in result functions
pkg/sql/colexec/aggexec/result_test.go
hasDistinct parameter to result initialization functions
marshalToBytes() calls to handle third return value for distinct data
unmarshalFromBytes() calls to pass nil for distinct data parameter
remoterun_test.go
Update test to use group.MergeGroup
pkg/sql/compile/remoterun_test.go
mergegroup package
&mergegroup.MergeGroup{} to &group.MergeGroup{} in test data
batch_test.go
Update batch tests for extra buffer fields
pkg/container/batch/batch_test.go
testing before other imports
ExtraBuf1 and ExtraBuf2 fields are preserved during marshal/unmarshal
newBatch() helper
ExtraBuf1 and ExtraBuf2 fields in test batch
aggFrame_test.go
Add MarshalBinary method to test context
pkg/sql/colexec/aggexec/aggFrame_test.go
MarshalBinary() method to avgDemoCtx test type
7 files
window.go
Window operator refactoring for expression evaluation
pkg/sql/colexec/window/window.go
aggVecs and orderVecs from group.ExprEvalVector to colexec.ExprEvalVector
ctr.bat.Aggs with ctr.batAggs to separate batch data from aggregation state
MakeEvalVector() calls to use colexec package instead of group package
remoterun.go
Migrate MergeGroup from mergegroup to group package
pkg/sql/compile/remoterun.go
mergegroup package
*mergegroup.MergeGroup references to *group.MergeGroup
mergegroup.NewArgument() call to group.NewArgumentMergeGroup()
EncodeMergeGroup() and DecodeMergeGroup() to use *group.MergeGroup
scope.go
Migrate MergeGroup references and simplify optimization logic
pkg/sql/compile/scope.go
mergegroup package
findMergeGroup() return type from *mergegroup.MergeGroup to *group.MergeGroup
*mergegroup.MergeGroup to *group.MergeGroup
aggOptimize() logic by removing the assignment of PartialResults and PartialResultTypes to mergeGroup
types.go
Refactor window container to use colexec.ExprEvalVector
pkg/sql/colexec/window/types.go
colexec before aggexec
group package
batAggs field to container struct for storing aggregation functions
orderVecs and aggVecs types from []group.ExprEvalVector to []colexec.ExprEvalVector
freeAggFun() to use ctr.batAggs instead of ctr.bat.Aggs
operator.go
Migrate MergeGroup construction to group package
pkg/sql/compile/operator.go
mergegroup package
constructMergeGroup() return type from *mergegroup.MergeGroup to *group.MergeGroup
mergegroup.NewArgument() call to group.NewArgumentMergeGroup()
compile.go
Update MergeGroup type reference in compile
pkg/sql/compile/compile.go
mergegroup package
*mergegroup.MergeGroup type assertion to *group.MergeGroup in compileProjection() method
types.go
Replace Aggs field with extra buffer fields in Batch
pkg/container/batch/types.go
aggexec package
Aggs []aggexec.AggFuncExec field with ExtraBuf1 []byte and ExtraBuf2 []byte fields
2 files
buffer.go
Reorder imports for consistency
pkg/container/pSpool/buffer.go
sync before other imports
copy.go
Reorder imports for consistency
pkg/container/pSpool/copy.go
math before other imports
2 files
process.go
Update spill file service to return LocalFS type
pkg/vm/process/process.go
moerr package
GetSpillFileService() return type from fileservice.MutableFileService to *fileservice.LocalFS
*fileservice.LocalFS instance
index_util.go
Fix hash code slice allocation logic
pkg/sql/util/index_util.go
rowCount > cap(hashCode) to rowCount > len(hashCode)
hashCode to exact rowCount when capacity is sufficient
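The distinction between `len` and `cap` that this fix turns on can be illustrated with a small sizing helper. This is a sketch of the general idiom only: `sizedHashCodes` is a hypothetical function, and the actual control flow in index_util.go may differ.

```go
package main

import "fmt"

// sizedHashCodes returns a slice of exactly rowCount elements, reusing the
// backing array of hashCode when its capacity is sufficient. Reused elements
// may hold stale values, so the caller is expected to overwrite all of them.
func sizedHashCodes(hashCode []uint64, rowCount int) []uint64 {
	if rowCount > len(hashCode) {
		if rowCount <= cap(hashCode) {
			// Enough capacity: extend the length without allocating.
			return hashCode[:rowCount]
		}
		// Not enough capacity: allocate a fresh slice.
		return make([]uint64, rowCount)
	}
	// Already long enough: trim to the exact row count.
	return hashCode[:rowCount]
}

func main() {
	buf := make([]uint64, 2, 8)
	fmt.Println(len(sizedHashCodes(buf, 5)))  // grows within capacity
	fmt.Println(len(sizedHashCodes(buf, 1)))  // trims
	fmt.Println(len(sizedHashCodes(buf, 20))) // reallocates
}
```

A check against `cap` alone says nothing about how many elements are currently addressable, so code that then indexes or ranges over the slice can see fewer than rowCount entries; comparing against `len` and reslicing keeps the two in agreement.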
9 files