diff --git a/conn.go b/conn.go index d36bc82d..d1e80dc1 100644 --- a/conn.go +++ b/conn.go @@ -683,6 +683,7 @@ func (c *conn) ResetSession(_ context.Context) error { c.readOnlyStaleness = spanner.TimestampBound{} c.execOptions = ExecOptions{ DecodeToNativeArrays: c.connector.connectorConfig.DecodeToNativeArrays, + PreloadRowsInMemory: c.connector.connectorConfig.PreloadRowsInMemory, } return nil } @@ -816,6 +817,9 @@ func (c *conn) queryContext(ctx context.Context, query string, execOptions ExecO return nil, err } } + if execOptions.PreloadRowsInMemory { + iter = newPrefetchRowIterator(ctx, iter) + } res := createRows(iter, execOptions) if execOptions.DirectExecuteQuery { // This call to res.getColumns() triggers the execution of the statement, as it needs to fetch the metadata. diff --git a/driver.go b/driver.go index 559d23a6..725b6da9 100644 --- a/driver.go +++ b/driver.go @@ -207,6 +207,13 @@ type ExecOptions struct { // the execution. Set this flag to true to execute the query directly when // [sql.DB.QueryContext] is called. DirectExecuteQuery bool + + // PreloadRowsInMemory determines whether result rows should be prefetched + // into memory asynchronously upon query start, instead of waiting for calls + // to [sql.Rows.Next]. This can reduce latency when iterating rows later at the + // cost of increased memory usage. Enable via connection params or set on the + // connection before executing a query. + PreloadRowsInMemory bool } type DecodeOption int @@ -321,6 +328,10 @@ type ConnectorConfig struct { // See ExecOptions.DecodeToNativeArrays for more information. DecodeToNativeArrays bool + // PreloadRowsInMemory determines whether connections should, by default, + // preload query rows in memory. Can be overridden per statement via ExecOptions. + PreloadRowsInMemory bool + logger *slog.Logger name string @@ -549,6 +560,11 @@ func createConnector(d *Driver, connectorConfig ConnectorConfig) (*connector, er connectorConfig.DecodeToNativeArrays = val } } + if strval, ok := connectorConfig.Params[strings.ToLower("PreloadRowsInMemory")]; ok { + if val, err := strconv.ParseBool(strval); err == nil { + connectorConfig.PreloadRowsInMemory = val + } + } if strval, ok := connectorConfig.Params[strings.ToLower("AutoConfigEmulator")]; ok { if val, err := strconv.ParseBool(strval); err == nil { connectorConfig.AutoConfigEmulator = val diff --git a/prefetch_row_iterator.go b/prefetch_row_iterator.go new file mode 100644 index 00000000..df45d2e3 --- /dev/null +++ b/prefetch_row_iterator.go @@ -0,0 +1,100 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package spannerdriver + +import ( + "context" + + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "google.golang.org/api/iterator" +) + +// prefetchRowIterator wraps a rowIterator and eagerly fetches rows in a +// background goroutine, buffering them in memory so calls to Next() do not +// block on I/O. +type prefetchRowIterator struct { + u rowIterator + + items chan prefetchItem + done chan struct{} +} + +type prefetchItem struct { + row *spanner.Row + err error +} + +func newPrefetchRowIterator(ctx context.Context, u rowIterator) rowIterator { + it := &prefetchRowIterator{ + u: u, + items: make(chan prefetchItem, 64), + done: make(chan struct{}), + } + go it.runPrefetch(ctx) + return it +} + +func (p *prefetchRowIterator) runPrefetch(ctx context.Context) { + defer close(p.items) + for { + select { + case <-p.done: + return + default: + } + + row, err := p.u.Next() + + select { + case <-p.done: + return + case p.items <- prefetchItem{row: row, err: err}: + } + + if err != nil { + return + } + } +} + +func (p *prefetchRowIterator) Next() (*spanner.Row, error) { + it, ok := <-p.items + if !ok { + return nil, iterator.Done + } + if it.err != nil { + return nil, it.err + } + return it.row, nil +} + +func (p *prefetchRowIterator) Stop() { + select { + case <-p.done: + // already stopped + default: + close(p.done) + } + p.u.Stop() +} + +func (p *prefetchRowIterator) Metadata() (*sppb.ResultSetMetadata, error) { + return p.u.Metadata() +} + +func (p *prefetchRowIterator) ResultSetStats() *sppb.ResultSetStats { + return p.u.ResultSetStats() +}