Skip to content

Commit 7699ba5

Browse files
committed
Add concurrent result sets in db.Query.Query(...)
1 parent b47b59e commit 7699ba5

File tree

8 files changed

+197
-5
lines changed

8 files changed

+197
-5
lines changed

internal/query/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,11 +401,12 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
401401
if err != nil {
402402
return xerrors.WithStackTrace(err)
403403
}
404+
404405
defer func() {
405406
_ = streamResult.Close(ctx)
406407
}()
407408

408-
r, err = resultToMaterializedResult(ctx, streamResult)
409+
r, err = concurrentResultToMaterializedResult(ctx, streamResult)
409410
if err != nil {
410411
return xerrors.WithStackTrace(err)
411412
}

internal/query/client_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,24 @@ func TestClient(t *testing.T) {
922922
Status: Ydb.StatusIds_SUCCESS,
923923
ResultSetIndex: 0,
924924
ResultSet: &Ydb.ResultSet{
925+
Columns: []*Ydb.Column{
926+
{
927+
Name: "a",
928+
Type: &Ydb.Type{
929+
Type: &Ydb.Type_TypeId{
930+
TypeId: Ydb.Type_UINT64,
931+
},
932+
},
933+
},
934+
{
935+
Name: "b",
936+
Type: &Ydb.Type{
937+
Type: &Ydb.Type_TypeId{
938+
TypeId: Ydb.Type_UTF8,
939+
},
940+
},
941+
},
942+
},
925943
Rows: []*Ydb.Value{
926944
{
927945
Items: []*Ydb.Value{{

internal/query/execute_query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func executeQueryRequest(sessionID, q string, cfg executeSettings) (
9393
},
9494
Parameters: params,
9595
StatsMode: Ydb_Query.StatsMode(cfg.StatsMode()),
96-
ConcurrentResultSets: cfg.ConcurrentResultSets(),
96+
ConcurrentResultSets: false,
9797
PoolId: cfg.ResourcePool(),
9898
ResponsePartLimitBytes: cfg.ResponsePartLimitSizeBytes(),
9999
}

internal/query/result.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stats"
16+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
1617
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
1718
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
1819
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -341,6 +342,33 @@ func (r *streamResult) nextPartFunc(
341342
}
342343
}
343344

345+
func (r *streamResult) NextPart(ctx context.Context) (_ result.Part, err error) {
346+
if r.lastPart == nil {
347+
return nil, xerrors.WithStackTrace(io.EOF)
348+
}
349+
350+
select {
351+
case <-r.closer.Done():
352+
return nil, xerrors.WithStackTrace(r.closer.Err())
353+
case <-ctx.Done():
354+
return nil, xerrors.WithStackTrace(ctx.Err())
355+
default:
356+
part, err := r.nextPart(ctx)
357+
if err != nil && !xerrors.Is(err, io.EOF) {
358+
return nil, xerrors.WithStackTrace(err)
359+
}
360+
if part.GetExecStats() != nil && r.statsCallback != nil {
361+
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
362+
}
363+
defer func() {
364+
r.lastPart = part
365+
r.resultSetIndex = part.GetResultSetIndex()
366+
}()
367+
368+
return newResultPart(r.lastPart), nil
369+
}
370+
}
371+
344372
func (r *streamResult) NextResultSet(ctx context.Context) (_ result.Set, err error) {
345373
if r.trace != nil {
346374
onDone := trace.QueryOnResultNextResultSet(r.trace, &ctx,
@@ -459,3 +487,58 @@ func resultToMaterializedResult(ctx context.Context, r result.Result) (result.Re
459487
resultSets: resultSets,
460488
}, nil
461489
}
490+
491+
func concurrentResultToMaterializedResult(ctx context.Context, r result.ConcurrentResult) (result.Result, error) {
492+
type resultSet struct {
493+
rows []query.Row
494+
columnNames []string
495+
columnTypes []types.Type
496+
}
497+
resultSetByIndex := make(map[int64]resultSet)
498+
499+
for {
500+
if ctx.Err() != nil {
501+
return nil, xerrors.WithStackTrace(ctx.Err())
502+
}
503+
504+
part, err := r.NextPart(ctx)
505+
if err != nil {
506+
if xerrors.Is(err, io.EOF) {
507+
break
508+
}
509+
510+
return nil, xerrors.WithStackTrace(err)
511+
}
512+
513+
rs := resultSetByIndex[part.ResultSetIndex()]
514+
if len(rs.columnNames) == 0 {
515+
rs.columnTypes = part.ColumnTypes()
516+
rs.columnNames = part.ColumnNames()
517+
}
518+
519+
rows := make([]query.Row, 0)
520+
for {
521+
row, err := part.NextRow(ctx)
522+
if err != nil {
523+
if xerrors.Is(err, io.EOF) {
524+
break
525+
}
526+
527+
return nil, xerrors.WithStackTrace(err)
528+
}
529+
rows = append(rows, row)
530+
}
531+
rs.rows = append(rs.rows, rows...)
532+
533+
resultSetByIndex[part.ResultSetIndex()] = rs
534+
}
535+
536+
resultSets := make([]result.Set, len(resultSetByIndex))
537+
for rsIndex, rs := range resultSetByIndex {
538+
resultSets[rsIndex] = MaterializedResultSet(int(rsIndex), rs.columnNames, rs.columnTypes, rs.rows)
539+
}
540+
541+
return &materializedResult{
542+
resultSets: resultSets,
543+
}, nil
544+
}

internal/query/result/result.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ type (
2121
// with Go version 1.23+
2222
ResultSets(ctx context.Context) xiter.Seq2[Set, error]
2323
}
24+
ConcurrentResult interface {
25+
closer.Closer
26+
27+
NextPart(ctx context.Context) (Part, error)
28+
}
2429
Set interface {
2530
Index() int
2631
Columns() []string
@@ -34,6 +39,13 @@ type (
3439
Set
3540
closer.Closer
3641
}
42+
Part interface {
43+
ResultSetIndex() int64
44+
ColumnNames() []string
45+
ColumnTypes() []types.Type
46+
47+
NextRow(ctx context.Context) (Row, error)
48+
}
3749
Row interface {
3850
Values() []value.Value
3951

internal/query/result_part.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package query
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
8+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/types"
10+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
11+
"github.com/ydb-platform/ydb-go-sdk/v3/query"
12+
)
13+
14+
var (
15+
_ query.Part = (*resultPart)(nil)
16+
)
17+
18+
type (
19+
resultPart struct {
20+
resultSetIndex int64
21+
columns []*Ydb.Column
22+
rows []*Ydb.Value
23+
columnNames []string
24+
columnTypes []types.Type
25+
rowIndex int
26+
}
27+
)
28+
29+
func (p *resultPart) ResultSetIndex() int64 {
30+
return p.resultSetIndex
31+
}
32+
33+
func (p *resultPart) ColumnNames() []string {
34+
if len(p.columnNames) != 0 {
35+
return p.columnNames
36+
}
37+
names := make([]string, len(p.columns))
38+
for i, col := range p.columns {
39+
names[i] = col.GetName()
40+
}
41+
p.columnNames = names
42+
return names
43+
}
44+
45+
func (p *resultPart) ColumnTypes() []types.Type {
46+
if len(p.columnTypes) != 0 {
47+
return p.columnTypes
48+
}
49+
colTypes := make([]types.Type, len(p.columns))
50+
for i, col := range p.columns {
51+
colTypes[i] = types.TypeFromYDB(col.GetType())
52+
}
53+
p.columnTypes = colTypes
54+
return colTypes
55+
}
56+
57+
func (p *resultPart) NextRow(ctx context.Context) (query.Row, error) {
58+
if p.rowIndex == len(p.rows) {
59+
return nil, xerrors.WithStackTrace(io.EOF)
60+
}
61+
62+
defer func() {
63+
p.rowIndex++
64+
}()
65+
66+
return NewRow(p.columns, p.rows[p.rowIndex]), nil
67+
}
68+
69+
func newResultPart(part *Ydb_Query.ExecuteQueryResponsePart) *resultPart {
70+
return &resultPart{
71+
resultSetIndex: part.GetResultSetIndex(),
72+
columns: part.GetResultSet().GetColumns(),
73+
rows: part.GetResultSet().GetRows(),
74+
rowIndex: 0,
75+
}
76+
}

query/execute_options.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,9 @@ func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
6363
return options.WithResponsePartLimitSizeBytes(size)
6464
}
6565

66-
//func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
67-
// return options.WithConcurrentResultSets(isEnabled)
68-
//}
66+
func WithConcurrentResultSets(isEnabled bool) ExecuteOption {
67+
return options.WithConcurrentResultSets(isEnabled)
68+
}
6969

7070
func WithCallOptions(opts ...grpc.CallOption) ExecuteOption {
7171
return options.WithCallOptions(opts...)

query/result.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@ import (
88

99
type (
1010
Result = result.Result
11+
ConcurrentResult = result.ConcurrentResult
1112
ResultSet = result.Set
1213
ClosableResultSet = result.ClosableResultSet
14+
Part = result.Part
1315
Row = result.Row
1416
Type = types.Type
1517
NamedDestination = scanner.NamedDestination

0 commit comments

Comments
 (0)