Skip to content
Open
4 changes: 4 additions & 0 deletions pkg/common/hashmap/inthashmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,3 +370,7 @@ func (m *IntHashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int

return n, nil
}

// AllGroupHash forwards to the AllGroupHash method of the wrapped hash table
// and returns its result unchanged.
func (m *IntHashMap) AllGroupHash() []uint64 {
	hashes := m.hashMap.AllGroupHash()
	return hashes
}
4 changes: 4 additions & 0 deletions pkg/common/hashmap/strhashmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,3 +461,7 @@ func (m *StrHashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int

return n, nil
}

// AllGroupHash forwards to the AllGroupHash method of the wrapped hash table
// and returns its result unchanged.
func (m *StrHashMap) AllGroupHash() []uint64 {
	hashes := m.hashMap.AllGroupHash()
	return hashes
}
2 changes: 2 additions & 0 deletions pkg/common/hashmap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ type HashMap interface {
WriteTo(w io.Writer) (int64, error)
// UnmarshalFrom deserializes a byte slice from a reader.
UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (int64, error)
// AllGroupHash returns the hash codes of all groups stored in the map.
AllGroupHash() []uint64
}

// Iterator allows users to do insert or find operations on hash tables in bulk.
Expand Down
181 changes: 88 additions & 93 deletions pkg/container/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (
"bytes"
"context"
"fmt"
"io"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
)

func New(attrs []string) *Batch {
Expand Down Expand Up @@ -107,63 +107,16 @@ func (bat *Batch) Slice(from, to int) *Batch {
}

func (bat *Batch) MarshalBinary() ([]byte, error) {
// --------------------------------------------------------------------
// | len | Zs... | len | Vecs... | len | Attrs... | len | AggInfos... |
// --------------------------------------------------------------------
var w bytes.Buffer
return bat.MarshalBinaryWithBuffer(&w, false)
}

// row count.
rl := int64(bat.rowCount)
w.Write(types.EncodeInt64(&rl))

// Vecs
l := int32(len(bat.Vecs))
w.Write(types.EncodeInt32(&l))
for i := 0; i < int(l); i++ {
data, err := bat.Vecs[i].MarshalBinary()
if err != nil {
return nil, err
}
size := int32(len(data))
w.Write(types.EncodeInt32(&size))
w.Write(data)
}

// Attrs
l = int32(len(bat.Attrs))
w.Write(types.EncodeInt32(&l))
for i := 0; i < int(l); i++ {
size := int32(len(bat.Attrs[i]))
w.Write(types.EncodeInt32(&size))
w.WriteString(bat.Attrs[i])
}

// AggInfos
aggInfos := make([][]byte, len(bat.Aggs))
for i, exec := range bat.Aggs {
data, err := aggexec.MarshalAggFuncExec(exec)
if err != nil {
return nil, err
}
aggInfos[i] = data
}

l = int32(len(aggInfos))
w.Write(types.EncodeInt32(&l))
for i := 0; i < int(l); i++ {
size := int32(len(aggInfos[i]))
w.Write(types.EncodeInt32(&size))
w.Write(aggInfos[i])
func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer, reset bool) ([]byte, error) {
// reset the buffer if caller wants to.
if reset {
w.Reset()
}

w.Write(types.EncodeInt32(&bat.Recursive))
w.Write(types.EncodeInt32(&bat.ShuffleIDX))

return w.Bytes(), nil
}

func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer) ([]byte, error) {
w.Reset()
// row count.
rl := int64(bat.rowCount)
w.Write(types.EncodeInt64(&rl))
Expand Down Expand Up @@ -196,23 +149,9 @@ func (bat *Batch) MarshalBinaryWithBuffer(w *bytes.Buffer) ([]byte, error) {
}
}

// AggInfos
aggInfos := make([][]byte, len(bat.Aggs))
for i, exec := range bat.Aggs {
data, err := aggexec.MarshalAggFuncExec(exec)
if err != nil {
return nil, err
}
aggInfos[i] = data
}

l = int32(len(aggInfos))
w.Write(types.EncodeInt32(&l))
for i := 0; i < int(l); i++ {
size := int32(len(aggInfos[i]))
w.Write(types.EncodeInt32(&size))
w.Write(aggInfos[i])
}
// ExtraBuf1 and ExtraBuf2
types.WriteSizeBytes(bat.ExtraBuf1, w)
types.WriteSizeBytes(bat.ExtraBuf2, w)

w.Write(types.EncodeInt32(&bat.Recursive))
w.Write(types.EncodeInt32(&bat.ShuffleIDX))
Expand Down Expand Up @@ -270,33 +209,92 @@ func (bat *Batch) UnmarshalBinaryWithAnyMp(data []byte, mp *mpool.MPool) (err er
data = data[size:]
}

// ExtraBuf1
l = types.DecodeInt32(data[:4])
aggs := make([][]byte, l)
data = data[4:]
bat.ExtraBuf1 = nil
bat.ExtraBuf1 = append(bat.ExtraBuf1, data[:l]...)
data = data[l:]

// ExtraBuf2
l = types.DecodeInt32(data[:4])
data = data[4:]
for i := 0; i < int(l); i++ {
size := types.DecodeInt32(data[:4])
data = data[4:]
aggs[i] = data[:size]
data = data[size:]
}
bat.ExtraBuf2 = nil
bat.ExtraBuf2 = append(bat.ExtraBuf2, data[:l]...)
data = data[l:]

bat.Recursive = types.DecodeInt32(data[:4])
data = data[4:]
bat.ShuffleIDX = types.DecodeInt32(data[:4])
return nil
}

if len(aggs) > 0 {
bat.Aggs = make([]aggexec.AggFuncExec, len(aggs))
var aggMemoryManager aggexec.AggMemoryManager = nil
if mp != nil {
aggMemoryManager = aggexec.NewSimpleAggMemoryManager(mp)
func (bat *Batch) UnmarshalFromReader(r io.Reader, mp *mpool.MPool) (err error) {
i64, err := types.ReadInt64(r)
if err != nil {
return err
}
bat.rowCount = int(i64)

l, err := types.ReadInt32AsInt(r)
if err != nil {
return err
}
if l != len(bat.Vecs) {
if len(bat.Vecs) > 0 {
bat.Clean(mp)
}
for i, info := range aggs {
if bat.Aggs[i], err = aggexec.UnmarshalAggFuncExec(aggMemoryManager, info); err != nil {
return err
bat.Vecs = make([]*vector.Vector, l)
for i := range bat.Vecs {
if bat.offHeap {
bat.Vecs[i] = vector.NewOffHeapVec()
} else {
bat.Vecs[i] = vector.NewVecFromReuse()
}
}
}
vecs := bat.Vecs

for i := 0; i < l; i++ {
_, bs, err := types.ReadSizeBytes(r, nil, false)
if err != nil {
return err
}
if err := vecs[i].UnmarshalWithReader(bytes.NewReader(bs), mp); err != nil {
return err
}
}

l, err = types.ReadInt32AsInt(r)
if err != nil {
return err
}
if l != len(bat.Attrs) {
bat.Attrs = make([]string, l)
}

for i := 0; i < int(l); i++ {
_, bs, err := types.ReadSizeBytes(r, nil, false)
if err != nil {
return err
}
bat.Attrs[i] = string(bs)
}

// ExtraBuf1
if _, bat.ExtraBuf1, err = types.ReadSizeBytes(r, nil, false); err != nil {
return err
}
if _, bat.ExtraBuf2, err = types.ReadSizeBytes(r, nil, false); err != nil {
return err
}

if bat.Recursive, err = types.ReadInt32(r); err != nil {
return err
}
if bat.ShuffleIDX, err = types.ReadInt32(r); err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -460,14 +458,11 @@ func (bat *Batch) Clean(m *mpool.MPool) {
vec.Free(m)
}
}
for _, agg := range bat.Aggs {
if agg != nil {
agg.Free()
}
}
bat.Aggs = nil

bat.Vecs = nil
bat.Attrs = nil
bat.ExtraBuf1 = nil
bat.ExtraBuf2 = nil
bat.SetRowCount(0)
}

Expand Down Expand Up @@ -662,7 +657,7 @@ func (bat *Batch) ReplaceVector(oldVec *vector.Vector, newVec *vector.Vector, st
}

func (bat *Batch) IsEmpty() bool {
return bat.rowCount == 0 && len(bat.Aggs) == 0
return bat.rowCount == 0
}

func (bat *Batch) IsDone() bool {
Expand Down
22 changes: 18 additions & 4 deletions pkg/container/batch/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package batch

import (
"bytes"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
"testing"

"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"

"github.com/matrixorigin/matrixone/pkg/common/mpool"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/container/vector"
Expand Down Expand Up @@ -53,14 +54,18 @@ func TestBatchMarshalAndUnmarshal(t *testing.T) {
rbat := new(Batch)
err = rbat.UnmarshalBinary(data)
require.NoError(t, err)

require.Equal(t, tc.bat.ExtraBuf1, rbat.ExtraBuf1)
require.Equal(t, tc.bat.ExtraBuf2, rbat.ExtraBuf2)

for i, vec := range rbat.Vecs {
require.Equal(t, vector.MustFixedColWithTypeCheck[int8](tc.bat.Vecs[i]), vector.MustFixedColWithTypeCheck[int8](vec))
}
}

var buf bytes.Buffer
for _, tc := range tcs {
data, err := tc.bat.MarshalBinaryWithBuffer(&buf)
data, err := tc.bat.MarshalBinaryWithBuffer(&buf, true)
require.NoError(t, err)

rbat := new(Batch)
Expand All @@ -69,6 +74,14 @@ func TestBatchMarshalAndUnmarshal(t *testing.T) {
for i, vec := range rbat.Vecs {
require.Equal(t, vector.MustFixedColWithTypeCheck[int8](tc.bat.Vecs[i]), vector.MustFixedColWithTypeCheck[int8](vec))
}

reader := bytes.NewReader(data)
rbat = new(Batch)
err = rbat.UnmarshalFromReader(reader, nil)
require.NoError(t, err)
for i, vec := range rbat.Vecs {
require.Equal(t, vector.MustFixedColWithTypeCheck[int8](tc.bat.Vecs[i]), vector.MustFixedColWithTypeCheck[int8](vec))
}
}
}

Expand Down Expand Up @@ -140,9 +153,10 @@ func newBatch(ts []types.Type, rows int) *Batch {
}
}

bat.ExtraBuf1 = []byte("extra buf 1")
bat.ExtraBuf2 = []byte("extra buf 2")

aggexec.RegisterGroupConcatAgg(0, ",")
agg0, _ := aggexec.MakeAgg(aggexec.NewSimpleAggMemoryManager(mp), 0, false, []types.Type{types.T_varchar.ToType()}...)
bat.Aggs = []aggexec.AggFuncExec{agg0}
bat.Attrs = []string{"1"}
return bat
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/container/batch/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package batch

import (
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/sql/colexec/aggexec"
)

// special batch that will never be freed.
Expand Down Expand Up @@ -52,10 +51,10 @@ type Batch struct {
// Vecs col data
Vecs []*vector.Vector

Aggs []aggexec.AggFuncExec
ExtraBuf1 []byte
ExtraBuf2 []byte

// row count of the batch; replaces the old len(Zs).
rowCount int

offHeap bool
offHeap bool
}
12 changes: 12 additions & 0 deletions pkg/container/hashtable/int64_hash_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,3 +419,15 @@ func (ht *Int64HashMap) UnmarshalFrom(r io.Reader, allocator malloc.Allocator) (

return
}

// AllGroupHash walks every cell of the table and returns a slice of length
// ht.elemCnt in which each occupied cell has stored its Key at index
// Mapped-1. Cells whose Mapped field is zero are skipped.
func (ht *Int64HashMap) AllGroupHash() []uint64 {
	hashes := make([]uint64, ht.elemCnt)
	for _, block := range ht.cells {
		for j := range block {
			cell := block[j]
			if cell.Mapped == 0 {
				continue
			}
			hashes[cell.Mapped-1] = cell.Key
		}
	}
	return hashes
}
Loading
Loading