Skip to content

Commit 7c8399c

Browse files
committed
Refactor database query handling in connectors
- Removed unnecessary transaction handling in ClickHouse, MSSQL, and Snowflake connectors by directly using the database connection for queries.
1 parent b67f1d0 commit 7c8399c

File tree

3 files changed

+8
-75
lines changed

3 files changed

+8
-75
lines changed

connectors/clickhouse/connector.go

Lines changed: 3 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import (
1010

1111
"github.com/centralmind/gateway/connectors"
1212

13-
"database/sql"
14-
1513
_ "github.com/ClickHouse/clickhouse-go/v2"
1614
"github.com/centralmind/gateway/castx"
1715
"github.com/centralmind/gateway/model"
@@ -124,15 +122,7 @@ func (c Connector) Config() connectors.Config {
124122
}
125123

126124
func (c Connector) Sample(ctx context.Context, table model.Table) ([]map[string]any, error) {
127-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
128-
ReadOnly: true,
129-
})
130-
if err != nil {
131-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
132-
}
133-
defer tx.Commit()
134-
135-
rows, err := tx.NamedQuery(fmt.Sprintf("SELECT * FROM %s LIMIT 5", table.Name), map[string]any{})
125+
rows, err := c.db.NamedQuery(fmt.Sprintf("SELECT * FROM %s LIMIT 5", table.Name), map[string]any{})
136126
if err != nil {
137127
return nil, xerrors.Errorf("unable to query db: %w", err)
138128
}
@@ -228,15 +218,7 @@ func (c Connector) Query(ctx context.Context, endpoint model.Endpoint, params ma
228218
return nil, xerrors.Errorf("unable to process params: %w", err)
229219
}
230220

231-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
232-
ReadOnly: c.Config().Readonly(),
233-
})
234-
if err != nil {
235-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
236-
}
237-
defer tx.Commit()
238-
239-
rows, err := tx.NamedQuery(endpoint.Query, processed)
221+
rows, err := c.db.NamedQuery(endpoint.Query, processed)
240222
if err != nil {
241223
return nil, xerrors.Errorf("unable to query db: %w", err)
242224
}
@@ -254,15 +236,7 @@ func (c Connector) Query(ctx context.Context, endpoint model.Endpoint, params ma
254236
}
255237

256238
func (c Connector) LoadsColumns(ctx context.Context, tableName string) ([]model.ColumnSchema, error) {
257-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
258-
ReadOnly: true,
259-
})
260-
if err != nil {
261-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
262-
}
263-
defer tx.Commit()
264-
265-
rows, err := tx.QueryContext(
239+
rows, err := c.db.QueryContext(
266240
ctx,
267241
`SELECT
268242
name,

connectors/mssql/connector.go

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package mssql
22

33
import (
44
"context"
5-
"database/sql"
65
"fmt"
76
"strconv"
87
"strings"
@@ -98,15 +97,7 @@ func (c Connector) Sample(ctx context.Context, table model.Table) ([]map[string]
9897
// Create schema-qualified table name
9998
qualifiedTableName := fmt.Sprintf("[%s].[%s]", schema, table.Name)
10099

101-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
102-
ReadOnly: true,
103-
})
104-
if err != nil {
105-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
106-
}
107-
defer tx.Commit()
108-
109-
rows, err := tx.NamedQuery(fmt.Sprintf("SELECT TOP 5 * FROM %s", qualifiedTableName), map[string]any{})
100+
rows, err := c.db.NamedQuery(fmt.Sprintf("SELECT TOP 5 * FROM %s", qualifiedTableName), map[string]any{})
110101
if err != nil {
111102
return nil, xerrors.Errorf("unable to query db: %w", err)
112103
}
@@ -259,15 +250,7 @@ func (c Connector) Query(ctx context.Context, endpoint model.Endpoint, params ma
259250
}
260251
}
261252

262-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
263-
ReadOnly: c.Config().Readonly(),
264-
})
265-
if err != nil {
266-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
267-
}
268-
defer tx.Commit()
269-
270-
rows, err := tx.NamedQuery(endpoint.Query, processed)
253+
rows, err := c.db.NamedQuery(endpoint.Query, processed)
271254
if err != nil {
272255
return nil, xerrors.Errorf("unable to query db: %w", err)
273256
}
@@ -290,15 +273,7 @@ func (c Connector) LoadsColumns(ctx context.Context, tableName string) ([]model.
290273
schema = c.config.Schema
291274
}
292275

293-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
294-
ReadOnly: true,
295-
})
296-
if err != nil {
297-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
298-
}
299-
defer tx.Commit()
300-
301-
rows, err := tx.QueryContext(
276+
rows, err := c.db.QueryContext(
302277
ctx,
303278
`SELECT
304279
c.COLUMN_NAME,

connectors/snowflake/connector.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -223,15 +223,7 @@ func (c Connector) Query(ctx context.Context, endpoint model.Endpoint, params ma
223223
return nil, xerrors.Errorf("unable to process params: %w", err)
224224
}
225225

226-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
227-
ReadOnly: c.Config().Readonly(),
228-
})
229-
if err != nil {
230-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
231-
}
232-
defer tx.Commit()
233-
234-
rows, err := tx.NamedQuery(endpoint.Query, processed)
226+
rows, err := c.db.NamedQuery(endpoint.Query, processed)
235227
if err != nil {
236228
return nil, xerrors.Errorf("unable to query db: %w", err)
237229
}
@@ -249,15 +241,7 @@ func (c Connector) Query(ctx context.Context, endpoint model.Endpoint, params ma
249241
}
250242

251243
func (c Connector) LoadsColumns(ctx context.Context, tableName string) ([]model.ColumnSchema, error) {
252-
tx, err := c.db.BeginTxx(ctx, &sql.TxOptions{
253-
ReadOnly: true,
254-
})
255-
if err != nil {
256-
return nil, xerrors.Errorf("BeginTx failed with error: %w", err)
257-
}
258-
defer tx.Commit()
259-
260-
rows, err := tx.QueryContext(
244+
rows, err := c.db.QueryContext(
261245
ctx,
262246
`SELECT
263247
c.COLUMN_NAME,

0 commit comments

Comments
 (0)