Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ func (c *conn) ResetSession(_ context.Context) error {
c.readOnlyStaleness = spanner.TimestampBound{}
c.execOptions = ExecOptions{
DecodeToNativeArrays: c.connector.connectorConfig.DecodeToNativeArrays,
PreloadRowsInMemory: c.connector.connectorConfig.PreloadRowsInMemory,
}
return nil
}
Expand Down Expand Up @@ -816,6 +817,9 @@ func (c *conn) queryContext(ctx context.Context, query string, execOptions ExecO
return nil, err
}
}
if execOptions.PreloadRowsInMemory {
iter = newPrefetchRowIterator(ctx, iter)
}
res := createRows(iter, execOptions)
if execOptions.DirectExecuteQuery {
// This call to res.getColumns() triggers the execution of the statement, as it needs to fetch the metadata.
Expand Down
16 changes: 16 additions & 0 deletions driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,13 @@ type ExecOptions struct {
// the execution. Set this flag to true to execute the query directly when
// [sql.DB.QueryContext] is called.
DirectExecuteQuery bool

// PreloadRowsInMemory determines whether result rows should be prefetched
// into memory asynchronously upon query start, instead of waiting for calls
// to [sql.Rows.Next]. This can reduce latency when iterating rows later at the
// cost of increased memory usage. Enable via connection params or set on the
// connection before executing a query.
PreloadRowsInMemory bool
}

type DecodeOption int
Expand Down Expand Up @@ -321,6 +328,10 @@ type ConnectorConfig struct {
// See ExecOptions.DecodeToNativeArrays for more information.
DecodeToNativeArrays bool

// PreloadRowsInMemory determines whether connections should, by default,
// preload query rows in memory. Can be overridden per statement via ExecOptions.
PreloadRowsInMemory bool

logger *slog.Logger
name string

Expand Down Expand Up @@ -549,6 +560,11 @@ func createConnector(d *Driver, connectorConfig ConnectorConfig) (*connector, er
connectorConfig.DecodeToNativeArrays = val
}
}
if strval, ok := connectorConfig.Params[strings.ToLower("PreloadRowsInMemory")]; ok {
if val, err := strconv.ParseBool(strval); err == nil {
connectorConfig.PreloadRowsInMemory = val
}
}
if strval, ok := connectorConfig.Params[strings.ToLower("AutoConfigEmulator")]; ok {
if val, err := strconv.ParseBool(strval); err == nil {
connectorConfig.AutoConfigEmulator = val
Expand Down
100 changes: 100 additions & 0 deletions prefetch_row_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package spannerdriver

import (
"context"

"cloud.google.com/go/spanner"
sppb "cloud.google.com/go/spanner/apiv1/spannerpb"
"google.golang.org/api/iterator"
)

// prefetchRowIterator wraps a rowIterator and eagerly fetches rows in a
// background goroutine, buffering them in memory so calls to Next() do not
// block on I/O.
type prefetchRowIterator struct {
u rowIterator

items chan prefetchItem
done chan struct{}
}

type prefetchItem struct {
row *spanner.Row
err error
}

func newPrefetchRowIterator(ctx context.Context, u rowIterator) rowIterator {
it := &prefetchRowIterator{
u: u,
items: make(chan prefetchItem, 64),
done: make(chan struct{}),
}
go it.runPrefetch(ctx)
return it
}

func (p *prefetchRowIterator) runPrefetch(ctx context.Context) {
defer close(p.items)
for {
select {
case <-p.done:
return
default:
}

row, err := p.u.Next()

select {
case <-p.done:
return
case p.items <- prefetchItem{row: row, err: err}:
}

if err != nil {
return
}
}
}

func (p *prefetchRowIterator) Next() (*spanner.Row, error) {
it, ok := <-p.items
if !ok {
return nil, iterator.Done
}
if it.err != nil {
return nil, it.err
}
return it.row, nil
}

func (p *prefetchRowIterator) Stop() {
select {
case <-p.done:
// already stopped
default:
close(p.done)
}
p.u.Stop()
}

func (p *prefetchRowIterator) Metadata() (*sppb.ResultSetMetadata, error) {
return p.u.Metadata()
}

func (p *prefetchRowIterator) ResultSetStats() *sppb.ResultSetStats {
return p.u.ResultSetStats()
}
Loading