Skip to content

Commit 0bb54d7

Browse files
committed
Agg distinct is totally screwed.
Unscrew. Expect tons of bugs ...
1 parent: acf9e49 · commit: 0bb54d7

22 files changed

+407 -238 lines changed

pkg/common/hashmap/inthashmap.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -370,3 +370,7 @@ func (m *IntHashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int
370370

371371
return n, nil
372372
}
373+
374+
func (m *IntHashMap) AllGroupHash() []uint64 {
375+
return m.hashMap.AllGroupHash()
376+
}

pkg/common/hashmap/strhashmap.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -461,3 +461,7 @@ func (m *StrHashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int
461461

462462
return n, nil
463463
}
464+
465+
func (m *StrHashMap) AllGroupHash() []uint64 {
466+
return m.hashMap.AllGroupHash()
467+
}

pkg/common/hashmap/types.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,8 @@ type HashMap interface {
5757
WriteTo(w io.Writer) (int64, error)
5858
// UnmarshalFrom deserializes a byte slice from a reader.
5959
UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int64, error)
60+
// Get all (group, hashCode) pairs
61+
AllGroupHash() []uint64
6062
}
6163

6264
// Iterator allows users to do insert or find operations on hash tables in bulk.

pkg/container/hashtable/int64_hash_map.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -419,3 +419,15 @@ func (ht *Int64HashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (
419419

420420
return
421421
}
422+
423+
func (ht *Int64HashMap) AllGroupHash() []uint64 {
424+
ret := make([]uint64, ht.elemCnt)
425+
for i := range ht.cells {
426+
for _, c := range ht.cells[i] {
427+
if c.Mapped != 0 {
428+
ret[c.Mapped-1] = c.Key
429+
}
430+
}
431+
}
432+
return ret
433+
}

pkg/container/hashtable/string_hash_map.go

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -426,3 +426,15 @@ func (ht *StringHashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator)
426426

427427
return
428428
}
429+
430+
func (ht *StringHashMap) AllGroupHash() []uint64 {
431+
ret := make([]uint64, ht.elemCnt)
432+
for i := range ht.cells {
433+
for _, c := range ht.cells[i] {
434+
if c.Mapped != 0 {
435+
ret[c.Mapped-1] = c.HashState[0]
436+
}
437+
}
438+
}
439+
return ret
440+
}

pkg/container/types/encoding.go

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -553,3 +553,14 @@ func Uint32ToInt32(ux uint32) int32 {
553553
}
554554
return x
555555
}
556+
557+
func WriteSizeBytes(bs []byte, w io.Writer) error {
558+
sz := int32(len(bs))
559+
if _, err := w.Write(EncodeInt32(&sz)); err != nil {
560+
return err
561+
}
562+
if _, err := w.Write(bs); err != nil {
563+
return err
564+
}
565+
return nil
566+
}

pkg/sql/colexec/aggexec/approx_count.go

Lines changed: 18 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@ import (
1818
"bytes"
1919

2020
hll "github.com/axiomhq/hyperloglog"
21+
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2122
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2223
"github.com/matrixorigin/matrixone/pkg/container/types"
2324
"github.com/matrixorigin/matrixone/pkg/container/vector"
@@ -39,10 +40,13 @@ func (exec *approxCountFixedExec[T]) GetOptResult() SplitResult {
3940

4041
func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
4142
d := exec.singleAggInfo.getEncoded()
42-
r, em, err := exec.ret.marshalToBytes()
43+
r, em, dist, err := exec.ret.marshalToBytes()
4344
if err != nil {
4445
return nil, err
4546
}
47+
if dist != nil {
48+
return nil, moerr.NewInternalErrorNoCtx("distinct should have been nil")
49+
}
4650

4751
encoded := EncodedAgg{
4852
Info: d,
@@ -63,19 +67,20 @@ func (exec *approxCountFixedExec[T]) marshal() ([]byte, error) {
6367
}
6468

6569
func (exec *approxCountFixedExec[T]) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
66-
return marshalRetAndGroupsToBuffers(
70+
return marshalRetAndGroupsToBuffer(
6771
bucketIdx, bucket, buf,
6872
&exec.ret.optSplitResult, exec.groups)
6973
}
7074

7175
func (exec *approxCountFixedExec[T]) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
72-
return marshalChunkRetAndGroupsToBuffer(
76+
return marshalChunkToBuffer(
7377
chunk, buf,
7478
&exec.ret.optSplitResult, exec.groups)
7579
}
7680

7781
func (exec *approxCountFixedExec[T]) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
78-
err := exec.ret.unmarshalFromBytes(result, empties)
82+
// distinct is nil
83+
err := exec.ret.unmarshalFromBytes(result, empties, nil)
7984
if err != nil {
8085
return err
8186
}
@@ -106,10 +111,13 @@ func (exec *approxCountVarExec) GetOptResult() SplitResult {
106111

107112
func (exec *approxCountVarExec) marshal() ([]byte, error) {
108113
d := exec.singleAggInfo.getEncoded()
109-
r, em, err := exec.ret.marshalToBytes()
114+
r, em, dist, err := exec.ret.marshalToBytes()
110115
if err != nil {
111116
return nil, err
112117
}
118+
if dist != nil {
119+
return nil, moerr.NewInternalErrorNoCtx("dist should have been nil")
120+
}
113121

114122
encoded := EncodedAgg{
115123
Info: d,
@@ -130,18 +138,18 @@ func (exec *approxCountVarExec) marshal() ([]byte, error) {
130138
}
131139

132140
func (exec *approxCountVarExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
133-
return marshalRetAndGroupsToBuffers(
141+
return marshalRetAndGroupsToBuffer(
134142
bucketIdx, bucket, buf,
135143
&exec.ret.optSplitResult, exec.groups)
136144
}
137145

138146
func (exec *approxCountVarExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
139-
return marshalChunkRetAndGroupsToBuffer(chunk, buf,
147+
return marshalChunkToBuffer(chunk, buf,
140148
&exec.ret.optSplitResult, exec.groups)
141149
}
142150

143151
func (exec *approxCountVarExec) unmarshal(_ *mpool.MPool, result, empties, groups [][]byte) error {
144-
err := exec.ret.unmarshalFromBytes(result, empties)
152+
err := exec.ret.unmarshalFromBytes(result, empties, nil)
145153
if err != nil {
146154
return err
147155
}
@@ -160,7 +168,7 @@ func (exec *approxCountVarExec) unmarshal(_ *mpool.MPool, result, empties, group
160168
func newApproxCountFixedExec[T types.FixedSizeTExceptStrType](mg AggMemoryManager, info singleAggInfo) AggFuncExec {
161169
return &approxCountFixedExec[T]{
162170
singleAggInfo: info,
163-
ret: initAggResultWithFixedTypeResult[uint64](mg, info.retType, false, 0),
171+
ret: initAggResultWithFixedTypeResult[uint64](mg, info.retType, false, 0, false),
164172
}
165173
}
166174

@@ -176,7 +184,7 @@ func makeApproxCount(mg AggMemoryManager, id int64, arg types.Type) AggFuncExec
176184
if info.argType.IsVarlen() {
177185
return &approxCountVarExec{
178186
singleAggInfo: info,
179-
ret: initAggResultWithFixedTypeResult[uint64](mg, info.retType, false, 0),
187+
ret: initAggResultWithFixedTypeResult[uint64](mg, info.retType, false, 0, false),
180188
}
181189
}
182190

pkg/sql/colexec/aggexec/concat.go

Lines changed: 36 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -29,7 +29,6 @@ import (
2929
type groupConcatExec struct {
3030
multiAggInfo
3131
ret aggResultWithBytesType
32-
distinctHash
3332

3433
separator []byte
3534
}
@@ -40,22 +39,20 @@ func (exec *groupConcatExec) GetOptResult() SplitResult {
4039

4140
func (exec *groupConcatExec) marshal() ([]byte, error) {
4241
d := exec.multiAggInfo.getEncoded()
43-
r, em, err := exec.ret.marshalToBytes()
42+
r, em, dist, err := exec.ret.marshalToBytes()
4443
if err != nil {
4544
return nil, err
4645
}
4746
encoded := EncodedAgg{
4847
Info: d,
4948
Result: r,
5049
Empties: em,
51-
Groups: [][]byte{exec.separator},
50+
// Oh, this is so f**ked.
51+
Groups: [][]byte{exec.separator},
5252
}
53-
if exec.IsDistinct() {
54-
data, err := exec.distinctHash.marshal()
55-
if err != nil {
56-
return nil, err
57-
}
58-
encoded.Groups = append(encoded.Groups, data)
53+
54+
if dist != nil {
55+
encoded.Groups = append(encoded.Groups, dist...)
5956
}
6057
return encoded.Marshal()
6158
}
@@ -64,27 +61,34 @@ func (exec *groupConcatExec) unmarshal(_ *mpool.MPool, result, empties, groups [
6461
if err := exec.SetExtraInformation(groups[0], 0); err != nil {
6562
return err
6663
}
67-
if exec.IsDistinct() {
68-
if len(groups) > 1 {
69-
if err := exec.distinctHash.unmarshal(groups[1]); err != nil {
70-
return err
71-
}
72-
}
73-
}
74-
return exec.ret.unmarshalFromBytes(result, empties)
64+
return exec.ret.unmarshalFromBytes(result, empties, groups[1:])
7565
}
7666

7767
func (exec *groupConcatExec) SaveIntermediateResult(bucketIdx []int64, bucket int64, buf *bytes.Buffer) error {
78-
return marshalRetAndGroupsAndDistinctHashToBuffers[dummyBinaryMarshaler](
68+
err := marshalRetAndGroupsToBuffer[dummyBinaryMarshaler](
7969
bucketIdx, bucket, buf,
80-
&exec.ret.optSplitResult, nil,
81-
exec.IsDistinct(), &exec.distinctHash)
70+
&exec.ret.optSplitResult, nil)
71+
if err != nil {
72+
return err
73+
}
74+
75+
if err = types.WriteSizeBytes(exec.separator, buf); err != nil {
76+
return err
77+
}
78+
return nil
8279
}
8380

8481
func (exec *groupConcatExec) SaveIntermediateResultOfChunk(chunk int, buf *bytes.Buffer) error {
85-
return marshalChunkToBuffer[dummyBinaryMarshaler](chunk, buf,
86-
&exec.ret.optSplitResult, nil,
87-
exec.IsDistinct(), &exec.distinctHash)
82+
err := marshalChunkToBuffer[dummyBinaryMarshaler](chunk, buf,
83+
&exec.ret.optSplitResult, nil)
84+
if err != nil {
85+
return err
86+
}
87+
88+
if err = types.WriteSizeBytes(exec.separator, buf); err != nil {
89+
return err
90+
}
91+
return nil
8892
}
8993

9094
func GroupConcatReturnType(args []types.Type) types.Type {
@@ -99,12 +103,9 @@ func GroupConcatReturnType(args []types.Type) types.Type {
99103
func newGroupConcatExec(mg AggMemoryManager, info multiAggInfo, separator string) AggFuncExec {
100104
exec := &groupConcatExec{
101105
multiAggInfo: info,
102-
ret: initAggResultWithBytesTypeResult(mg, info.retType, info.emptyNull, ""),
106+
ret: initAggResultWithBytesTypeResult(mg, info.retType, info.emptyNull, "", info.distinct),
103107
separator: []byte(separator),
104108
}
105-
if info.distinct {
106-
exec.distinctHash = newDistinctHash()
107-
}
108109
return exec
109110
}
110111

@@ -116,11 +117,6 @@ func isValidGroupConcatUnit(value []byte) error {
116117
}
117118

118119
func (exec *groupConcatExec) GroupGrow(more int) error {
119-
if exec.IsDistinct() {
120-
if err := exec.distinctHash.grows(more); err != nil {
121-
return err
122-
}
123-
}
124120
return exec.ret.grows(more)
125121
}
126122

@@ -137,14 +133,13 @@ func (exec *groupConcatExec) Fill(groupIndex int, row int, vectors []*vector.Vec
137133
}
138134
}
139135

140-
if exec.IsDistinct() {
141-
if need, err := exec.distinctHash.fill(groupIndex, vectors, row); err != nil || !need {
142-
return err
143-
}
144-
}
145-
146136
x, y := exec.ret.updateNextAccessIdx(groupIndex)
147137
exec.ret.setGroupNotEmpty(x, y)
138+
139+
if need, err := exec.ret.distinctFill(x, y, vectors, row); err != nil || !need {
140+
return err
141+
}
142+
148143
r := exec.ret.get()
149144
if len(r) > 0 {
150145
r = append(r, exec.separator...)
@@ -189,7 +184,8 @@ func (exec *groupConcatExec) SetExtraInformation(partialResult any, _ int) error
189184
func (exec *groupConcatExec) merge(other *groupConcatExec, idx1, idx2 int) error {
190185
x1, y1 := exec.ret.updateNextAccessIdx(idx1)
191186
x2, y2 := other.ret.updateNextAccessIdx(idx2)
192-
if err := exec.distinctHash.merge(&other.distinctHash); err != nil {
187+
188+
if err := exec.ret.distinctMerge(x1, &other.ret.optSplitResult, x2); err != nil {
193189
return err
194190
}
195191
empty1, empty2 := exec.ret.isGroupEmpty(x1, y1), other.ret.isGroupEmpty(x2, y2)
@@ -230,12 +226,11 @@ func (exec *groupConcatExec) Flush() ([]*vector.Vector, error) {
230226
}
231227

232228
func (exec *groupConcatExec) Free() {
233-
exec.distinctHash.free()
234229
exec.ret.free()
235230
}
236231

237232
func (exec *groupConcatExec) Size() int64 {
238-
return exec.ret.Size() + exec.distinctHash.Size() + int64(cap(exec.separator))
233+
return exec.ret.Size() + int64(cap(exec.separator))
239234
}
240235

241236
var GroupConcatUnsupportedTypes = []types.T{

0 commit comments

Comments
 (0)