Skip to content

Commit 54c01cb

Browse files
committed
AggSpill 3: Save
Now we have something that compiles, but there will be tons of bugs ...
1 parent 10134dd commit 54c01cb

39 files changed

+1501
-2757
lines changed

pkg/container/batch/batch.go

Lines changed: 69 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ import (
1818
"bytes"
1919
"context"
2020
"fmt"
21+
"io"
2122

2223
"github.com/matrixorigin/matrixone/pkg/common/bitmap"
2324
"github.com/matrixorigin/matrixone/pkg/common/moerr"
2425
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2526
"github.com/matrixorigin/matrixone/pkg/container/types"
2627
"github.com/matrixorigin/matrixone/pkg/container/vector"
27-
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
2828
)
2929

3030
func New(attrs []string) *Batch {
@@ -145,37 +145,9 @@ func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer) ([]byte, error) {
145145
}
146146
}
147147

148-
// ExtraBuf1
149-
size := int32(len(bat.ExtraBuf1))
150-
w.Write(types.EncodeInt32(&size))
151-
if size > 0 {
152-
w.Write(bat.ExtraBuf1)
153-
}
154-
155-
// ExtraBuf2
156-
size = int32(len(bat.ExtraBuf2))
157-
w.Write(types.EncodeInt32(&size))
158-
if size > 0 {
159-
w.Write(bat.ExtraBuf2)
160-
}
161-
162-
// AggInfos
163-
aggInfos := make([][]byte, len(bat.Aggs))
164-
for i, exec := range bat.Aggs {
165-
data, err := aggexec.MarshalAggFuncExec(exec)
166-
if err != nil {
167-
return nil, err
168-
}
169-
aggInfos[i] = data
170-
}
171-
172-
l = int32(len(aggInfos))
173-
w.Write(types.EncodeInt32(&l))
174-
for i := 0; i < int(l); i++ {
175-
size := int32(len(aggInfos[i]))
176-
w.Write(types.EncodeInt32(&size))
177-
w.Write(aggInfos[i])
178-
}
148+
// ExtraBuf1 and ExtraBuf2
149+
types.WriteSizeBytes(bat.ExtraBuf1, w)
150+
types.WriteSizeBytes(bat.ExtraBuf2, w)
179151

180152
w.Write(types.EncodeInt32(&bat.Recursive))
181153
w.Write(types.EncodeInt32(&bat.ShuffleIDX))
@@ -247,33 +219,75 @@ func (bat *Batch) UnmarshalBinaryWithAnyMp(data []byte, mp *mpool.MPool) (err er
247219
bat.ExtraBuf2 = append(bat.ExtraBuf2, data[:l]...)
248220
data = data[l:]
249221

250-
l = types.DecodeInt32(data[:4])
251-
aggs := make([][]byte, l)
252-
253-
data = data[4:]
254-
for i := 0; i < int(l); i++ {
255-
size := types.DecodeInt32(data[:4])
256-
data = data[4:]
257-
aggs[i] = data[:size]
258-
data = data[size:]
259-
}
260-
261222
bat.Recursive = types.DecodeInt32(data[:4])
262223
data = data[4:]
263224
bat.ShuffleIDX = types.DecodeInt32(data[:4])
225+
return nil
226+
}
264227

265-
if len(aggs) > 0 {
266-
bat.Aggs = make([]aggexec.AggFuncExec, len(aggs))
267-
var aggMemoryManager aggexec.AggMemoryManager = nil
268-
if mp != nil {
269-
aggMemoryManager = aggexec.NewSimpleAggMemoryManager(mp)
228+
func (bat *Batch) UnmarshalFromReader(r io.Reader, mp *mpool.MPool) (err error) {
229+
i64, err := types.ReadInt64(r)
230+
if err != nil {
231+
return err
232+
}
233+
bat.rowCount = int(i64)
234+
235+
l, err := types.ReadInt32AsInt(r)
236+
if l != len(bat.Vecs) {
237+
if len(bat.Vecs) > 0 {
238+
bat.Clean(mp)
270239
}
271-
for i, info := range aggs {
272-
if bat.Aggs[i], err = aggexec.UnmarshalAggFuncExec(aggMemoryManager, info); err != nil {
273-
return err
240+
bat.Vecs = make([]*vector.Vector, l)
241+
for i := range bat.Vecs {
242+
if bat.offHeap {
243+
bat.Vecs[i] = vector.NewOffHeapVec()
244+
} else {
245+
bat.Vecs[i] = vector.NewVecFromReuse()
274246
}
275247
}
276248
}
249+
vecs := bat.Vecs
250+
251+
for i := 0; i < l; i++ {
252+
_, bs, err := types.ReadSizeBytes(r, nil, false)
253+
if err != nil {
254+
return err
255+
}
256+
if err := vecs[i].UnmarshalWithReader(bytes.NewReader(bs), nil); err != nil {
257+
return err
258+
}
259+
}
260+
261+
l, err = types.ReadInt32AsInt(r)
262+
if err != nil {
263+
return err
264+
}
265+
if l != len(bat.Attrs) {
266+
bat.Attrs = make([]string, l)
267+
}
268+
269+
for i := 0; i < int(l); i++ {
270+
_, bs, err := types.ReadSizeBytes(r, nil, false)
271+
if err != nil {
272+
return err
273+
}
274+
bat.Attrs[i] = string(bs)
275+
}
276+
277+
// ExtraBuf1
278+
if _, bat.ExtraBuf1, err = types.ReadSizeBytes(r, nil, false); err != nil {
279+
return err
280+
}
281+
if _, bat.ExtraBuf2, err = types.ReadSizeBytes(r, nil, false); err != nil {
282+
return err
283+
}
284+
285+
if bat.Recursive, err = types.ReadInt32(r); err != nil {
286+
return err
287+
}
288+
if bat.ShuffleIDX, err = types.ReadInt32(r); err != nil {
289+
return err
290+
}
277291
return nil
278292
}
279293

@@ -437,14 +451,11 @@ func (bat *Batch) Clean(m *mpool.MPool) {
437451
vec.Free(m)
438452
}
439453
}
440-
for _, agg := range bat.Aggs {
441-
if agg != nil {
442-
agg.Free()
443-
}
444-
}
445-
bat.Aggs = nil
454+
446455
bat.Vecs = nil
447456
bat.Attrs = nil
457+
bat.ExtraBuf1 = nil
458+
bat.ExtraBuf2 = nil
448459
bat.SetRowCount(0)
449460
}
450461

@@ -639,7 +650,7 @@ func (bat *Batch) ReplaceVector(oldVec *vector.Vector, newVec *vector.Vector, st
639650
}
640651

641652
func (bat *Batch) IsEmpty() bool {
642-
return bat.rowCount == 0 && len(bat.Aggs) == 0
653+
return bat.rowCount == 0
643654
}
644655

645656
func (bat *Batch) IsDone() bool {

pkg/container/batch/batch_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,6 @@ func newBatch(ts []types.Type, rows int) *Batch {
149149
bat.ExtraBuf2 = []byte("extra buf 2")
150150

151151
aggexec.RegisterGroupConcatAgg(0, ",")
152-
agg0, _ := aggexec.MakeAgg(aggexec.NewSimpleAggMemoryManager(mp), 0, false, []types.Type{types.T_varchar.ToType()}...)
153-
bat.Aggs = []aggexec.AggFuncExec{agg0}
154152
bat.Attrs = []string{"1"}
155153
return bat
156154
}

pkg/container/batch/types.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package batch
1616

1717
import (
1818
"github.com/matrixorigin/matrixone/pkg/container/vector"
19-
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
2019
)
2120

2221
// special batch that will never been free.
@@ -52,12 +51,6 @@ type Batch struct {
5251
// Vecs col data
5352
Vecs []*vector.Vector
5453

55-
// We really want to put all data through vectors, but, the
56-
// Aggs is so f**king insane, so keep it, and add two extra
57-
// buffers for sane persons to use.
58-
//
59-
// XXX MUST REMOVE THIS
60-
Aggs []aggexec.AggFuncExec
6154
ExtraBuf1 []byte
6255
ExtraBuf2 []byte
6356

pkg/container/pSpool/buffer.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
package pSpool
1616

1717
import (
18+
"sync"
19+
1820
"github.com/matrixorigin/matrixone/pkg/common/mpool"
1921
"github.com/matrixorigin/matrixone/pkg/container/batch"
2022
"github.com/matrixorigin/matrixone/pkg/container/vector"
21-
"sync"
2223
)
2324

2425
type spoolBuffer struct {
@@ -78,12 +79,6 @@ func (b *spoolBuffer) putCacheID(mp *mpool.MPool, id uint32, bat *batch.Batch) {
7879
bat.Attrs = bat.Attrs[:0]
7980
bat.SetRowCount(0)
8081

81-
// we won't reuse the aggregation's memories now.
82-
for i := range bat.Aggs {
83-
bat.Aggs[i].Free()
84-
}
85-
bat.Aggs = nil
86-
8782
// put id into free list.
8883
b.Lock()
8984
b.readyToUse = b.readyToUse[:len(b.readyToUse)+1]

pkg/container/pSpool/copy.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
package pSpool
1616

1717
import (
18+
"math"
19+
1820
"github.com/matrixorigin/matrixone/pkg/common/mpool"
1921
"github.com/matrixorigin/matrixone/pkg/container/batch"
2022
"github.com/matrixorigin/matrixone/pkg/container/vector"
21-
"math"
2223
)
2324

2425
// cachedBatch is just like the cachedVectorPool in the original code,
@@ -131,9 +132,6 @@ func (cb *cachedBatch) GetCopiedBatch(
131132
}
132133
}
133134

134-
dst.Aggs = src.Aggs
135-
src.Aggs = nil
136-
137135
// set row count.
138136
dst.SetRowCount(src.RowCount())
139137

pkg/container/types/encoding.go

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"unsafe"
2626

2727
"github.com/matrixorigin/matrixone/pkg/common/moerr"
28+
"github.com/matrixorigin/matrixone/pkg/common/mpool"
2829
"github.com/matrixorigin/matrixone/pkg/common/util"
2930
"github.com/matrixorigin/matrixone/pkg/container/bytejson"
3031
)
@@ -559,8 +560,100 @@ func WriteSizeBytes(bs []byte, w io.Writer) error {
559560
if _, err := w.Write(EncodeInt32(&sz)); err != nil {
560561
return err
561562
}
562-
if _, err := w.Write(bs); err != nil {
563-
return err
563+
if sz > 0 {
564+
if _, err := w.Write(bs); err != nil {
565+
return err
566+
}
567+
}
568+
return nil
569+
}
570+
571+
func ReadInt64(r io.Reader) (int64, error) {
572+
buf := make([]byte, 8)
573+
if _, err := io.ReadFull(r, buf); err != nil {
574+
return 0, err
564575
}
576+
return DecodeInt64(buf), nil
577+
}
578+
579+
func WriteInt64(w io.Writer, v int64) error {
580+
w.Write(EncodeInt64(&v))
565581
return nil
566582
}
583+
584+
func ReadBool(r io.Reader) (bool, error) {
585+
buf := make([]byte, 1)
586+
if _, err := io.ReadFull(r, buf); err != nil {
587+
return false, err
588+
}
589+
return DecodeBool(buf), nil
590+
}
591+
592+
func ReadInt32(r io.Reader) (int32, error) {
593+
buf := make([]byte, 4)
594+
if _, err := io.ReadFull(r, buf); err != nil {
595+
return 0, err
596+
}
597+
return DecodeInt32(buf), nil
598+
}
599+
600+
func WriteInt32(w io.Writer, v int32) error {
601+
w.Write(EncodeInt32(&v))
602+
return nil
603+
}
604+
605+
func ReadInt32AsInt(r io.Reader) (int, error) {
606+
buf := make([]byte, 4)
607+
if _, err := io.ReadFull(r, buf); err != nil {
608+
return 0, err
609+
}
610+
return int(DecodeInt32(buf)), nil
611+
}
612+
613+
func ReadByte(r io.Reader) (byte, error) {
614+
buf := make([]byte, 1)
615+
if _, err := io.ReadFull(r, buf); err != nil {
616+
return 0, err
617+
}
618+
return buf[0], nil
619+
}
620+
621+
func ReadByteAsInt(r io.Reader) (int, error) {
622+
buf := make([]byte, 1)
623+
if _, err := io.ReadFull(r, buf); err != nil {
624+
return 0, err
625+
}
626+
return int(buf[0]), nil
627+
}
628+
629+
func ReadType(r io.Reader) (Type, error) {
630+
buf := make([]byte, TSize)
631+
if _, err := io.ReadFull(r, buf); err != nil {
632+
return Type{}, err
633+
}
634+
return DecodeType(buf), nil
635+
}
636+
637+
func ReadSizeBytes(r io.Reader, mp *mpool.MPool, offHeap bool) (int32, []byte, error) {
638+
sz, err := ReadInt32(r)
639+
if err != nil {
640+
return 0, nil, err
641+
}
642+
if sz > 0 {
643+
var bs []byte
644+
if mp != nil {
645+
bs, err = mp.Alloc(int(sz), offHeap)
646+
if err != nil {
647+
return 0, nil, err
648+
}
649+
} else {
650+
bs = make([]byte, sz)
651+
}
652+
653+
if _, err := io.ReadFull(r, bs); err != nil {
654+
return 0, nil, err
655+
}
656+
return sz, bs, nil
657+
}
658+
return sz, nil, nil
659+
}

0 commit comments

Comments
 (0)