Skip to content

Commit 10134dd

Browse files
committed
AggSpill 2: spill and load.
1 parent 0bb54d7 commit 10134dd

File tree

13 files changed

+175
-84
lines changed

13 files changed

+175
-84
lines changed

pkg/sql/colexec/aggexec/approx_count.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -66,9 +66,9 @@ func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
6666
return encoded.Marshal()
6767
}
6868

69-
func (exec *approxCountFixedExec[T]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
69+
func (exec *approxCountFixedExec[T]) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
7070
return marshalRetAndGroupsToBuffer(
71-
bucketIdx, bucket, buf,
71+
cnt, flags, buf,
7272
&exec.ret.optSplitResult, exec.groups)
7373
}
7474

@@ -137,9 +137,9 @@ func (exec *approxCountVarExec) marshal() ([]byte, error) {
137137
return encoded.Marshal()
138138
}
139139

140-
func (exec *approxCountVarExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
140+
func (exec *approxCountVarExec) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
141141
return marshalRetAndGroupsToBuffer(
142-
bucketIdx, bucket, buf,
142+
cnt, flags, buf,
143143
&exec.ret.optSplitResult, exec.groups)
144144
}
145145

pkg/sql/colexec/aggexec/concat.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -64,9 +64,9 @@ func (exec *groupConcatExec) unmarshal(_ *mpool.MPool, result, empties, groups [
6464
return exec.ret.unmarshalFromBytes(result, empties, groups[1:])
6565
}
6666

67-
func (exec *groupConcatExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
67+
func (exec *groupConcatExec) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
6868
err := marshalRetAndGroupsToBuffer[dummyBinaryMarshaler](
69-
bucketIdx, bucket, buf,
69+
cnt, flags, buf,
7070
&exec.ret.optSplitResult, nil)
7171
if err != nil {
7272
return err

pkg/sql/colexec/aggexec/count.go

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -57,9 +57,9 @@ func (exec *countColumnExec) marshal() ([]byte, error) {
5757
return encoded.Marshal()
5858
}
5959

60-
func (exec *countColumnExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
60+
func (exec *countColumnExec) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
6161
return marshalRetAndGroupsToBuffer[dummyBinaryMarshaler](
62-
bucketIdx, bucket, buf,
62+
cnt, flags, buf,
6363
&exec.ret.optSplitResult, nil)
6464
}
6565

@@ -300,9 +300,9 @@ func (exec *countStarExec) marshal() ([]byte, error) {
300300
return encoded.Marshal()
301301
}
302302

303-
func (exec *countStarExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
303+
func (exec *countStarExec) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
304304
return marshalRetAndGroupsToBuffer[dummyBinaryMarshaler](
305-
bucketIdx, bucket, buf,
305+
cnt, flags, buf,
306306
&exec.ret.optSplitResult, nil)
307307
}
308308
func (exec *countStarExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {

pkg/sql/colexec/aggexec/fromBytesRetBytes.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -127,9 +127,9 @@ func (exec *aggregatorFromBytesToBytes) unmarshal(_ *mpool.MPool, result, emptie
127127
return exec.ret.unmarshalFromBytes(result, empties, nil)
128128
}
129129

130-
func (exec *aggregatorFromBytesToBytes) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
130+
func (exec *aggregatorFromBytesToBytes) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
131131
return marshalRetAndGroupsToBuffer(
132-
bucketIdx, bucket, buf,
132+
cnt, flags, buf,
133133
&exec.ret.optSplitResult, exec.execContext.getGroupContextBinaryMarshaller())
134134
}
135135

pkg/sql/colexec/aggexec/fromBytesRetFixed.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -218,9 +218,9 @@ func (exec *aggregatorFromBytesToFixed[to]) marshal() ([]byte, error) {
218218
return encoded.Marshal()
219219
}
220220

221-
func (exec *aggregatorFromBytesToFixed[to]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
221+
func (exec *aggregatorFromBytesToFixed[to]) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
222222
return marshalRetAndGroupsToBuffer(
223-
bucketIdx, bucket, buf,
223+
cnt, flags, buf,
224224
&exec.ret.optSplitResult, exec.execContext.getGroupContextBinaryMarshaller())
225225
}
226226

pkg/sql/colexec/aggexec/fromFixedRetBytes.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -237,9 +237,9 @@ func (exec *aggregatorFromFixedToBytes[from]) marshal() ([]byte, error) {
237237
return encoded.Marshal()
238238
}
239239

240-
func (exec *aggregatorFromFixedToBytes[from]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
240+
func (exec *aggregatorFromFixedToBytes[from]) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
241241
return marshalRetAndGroupsToBuffer(
242-
bucketIdx, bucket, buf,
242+
cnt, flags, buf,
243243
&exec.ret.optSplitResult, exec.execContext.getGroupContextBinaryMarshaller())
244244
}
245245

pkg/sql/colexec/aggexec/fromFixedRetFixed.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -291,9 +291,9 @@ func (exec *aggregatorFromFixedToFixed[from, to]) marshal() ([]byte, error) {
291291
return encoded.Marshal()
292292
}
293293

294-
func (exec *aggregatorFromFixedToFixed[from, to]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
294+
func (exec *aggregatorFromFixedToFixed[from, to]) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
295295
return marshalRetAndGroupsToBuffer(
296-
bucketIdx, bucket, buf,
296+
cnt, flags, buf,
297297
&exec.ret.optSplitResult, exec.execContext.getGroupContextBinaryMarshaller())
298298
}
299299

pkg/sql/colexec/aggexec/median.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -83,9 +83,9 @@ func (exec *medianColumnExecSelf[T, R]) marshal() ([]byte, error) {
8383
return encoded.Marshal()
8484
}
8585

86-
func (exec *medianColumnExecSelf[T, R]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
86+
func (exec *medianColumnExecSelf[T, R]) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
8787
return marshalRetAndGroupsToBuffer(
88-
bucketIdx, bucket, buf,
88+
cnt, flags, buf,
8989
&exec.ret.optSplitResult, exec.groups)
9090
}
9191

pkg/sql/colexec/aggexec/types.go

Lines changed: 14 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -112,7 +112,7 @@ type AggFuncExec interface {
112112
Flush() ([]*vector.Vector, error)
113113

114114
// Serialize intermediate result to bytes.
115-
SaveIntermediateResult(buketIdx []int64, bucket int64, buf *bytes.Buffer) error
115+
SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error
116116
SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error
117117

118118
Size() int64
@@ -309,36 +309,6 @@ func makeWindowExec(
309309
return makeRankDenseRankRowNumber(mg, info), nil
310310
}
311311

312-
// given buckets, and a specific bucket, compute the flags for vector union.
313-
func computeChunkFlags(bucketIdx []int64, bucket int64, chunkSize int) (int64, [][]uint8) {
314-
// compute the number of chunks,
315-
nChunks := (len(bucketIdx) + chunkSize - 1) / chunkSize
316-
317-
// return values
318-
cnt := int64(0)
319-
flags := make([][]uint8, nChunks)
320-
for i := range flags {
321-
flags[i] = make([]uint8, chunkSize)
322-
}
323-
324-
nextX := 0
325-
nextY := 0
326-
327-
for _, idx := range bucketIdx {
328-
nextY += 1
329-
if nextY == chunkSize {
330-
nextX += 1
331-
nextY = 0
332-
}
333-
334-
if idx == bucket {
335-
flags[nextX][nextY] = 1
336-
cnt += 1
337-
}
338-
}
339-
return cnt, flags
340-
}
341-
342312
type dummyBinaryMarshaler struct {
343313
encoding.BinaryMarshaler
344314
}
@@ -348,9 +318,8 @@ func (d dummyBinaryMarshaler) MarshalBinary() ([]byte, error) {
348318
}
349319

350320
func marshalRetAndGroupsToBuffer[T encoding.BinaryMarshaler](
351-
bucketIdx []int64, bucket int64, buf *bytes.Buffer,
321+
cnt int64, flags [][]uint8, buf *bytes.Buffer,
352322
ret *optSplitResult, groups []T) error {
353-
cnt, flags := computeChunkFlags(bucketIdx, bucket, ret.optInformation.chunkSize)
354323
buf.Write(types.EncodeInt64(&cnt))
355324
if cnt == 0 {
356325
return nil
@@ -359,18 +328,19 @@ func marshalRetAndGroupsToBuffer[T encoding.BinaryMarshaler](
359328
return err
360329
}
361330
if len(groups) > 0 {
362-
if len(groups) != len(bucketIdx) {
363-
return moerr.NewInternalErrorNoCtx("approx_count: the number of groups does not match the number of buckets")
364-
}
365-
for i := range groups {
366-
if bucketIdx[i] == bucket {
367-
bs, err := groups[i].MarshalBinary()
368-
if err != nil {
369-
return err
370-
}
371-
if err = types.WriteSizeBytes(bs, buf); err != nil {
372-
return err
331+
groupIdx := 0
332+
for i := range flags {
333+
for j := range flags[i] {
334+
if flags[i][j] == 1 {
335+
bs, err := groups[groupIdx].MarshalBinary()
336+
if err != nil {
337+
return err
338+
}
339+
if err = types.WriteSizeBytes(bs, buf); err != nil {
340+
return err
341+
}
373342
}
343+
groupIdx += 1
374344
}
375345
}
376346
}

pkg/sql/colexec/aggexec/window.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -93,9 +93,9 @@ func (exec *singleWindowExec) marshal() ([]byte, error) {
9393
return encoded.Marshal()
9494
}
9595

96-
func (exec *singleWindowExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
96+
func (exec *singleWindowExec) SaveIntermediateResult(cnt int64, flags [][]uint8, buf *bytes.Buffer) error {
9797
return marshalRetAndGroupsToBuffer(
98-
bucketIdx, bucket, buf,
98+
cnt, flags, buf,
9999
&exec.ret.optSplitResult, exec.groups)
100100
}
101101

0 commit comments

Comments
 (0)