Skip to content

Commit f3975d2

Browse files
feat(wren-engine): add Apache Doris connector support (Canner#1430)
Co-authored-by: Jax Liu <liugs963@gmail.com>
1 parent 8eed811 commit f3975d2

23 files changed

Lines changed: 1135 additions & 4 deletions

File tree

ibis-server/README.md

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,75 @@ docker compose down -v
142142
```
143143

144144

145+
### Running Doris Tests Locally
146+
Doris-related tests require a running Apache Doris instance.
147+
Our GitHub CI already handles this automatically, but you must start Doris manually when running tests locally.
148+
149+
Prerequisites
150+
151+
- Docker & Docker Compose
152+
- Python dependencies installed (`just install`)
153+
- `pymysql` installed in the dev environment (already included in dev dependencies)
154+
155+
#### Configure the Doris Cluster
156+
157+
1. Start the Doris Container
158+
159+
From the `ibis-server` directory:
160+
```bash
161+
cd tests/routers/v3/connector/doris
162+
docker compose up -d
163+
```
164+
165+
The container uses `apache/doris:4.0.3-all-slim` (all-in-one image with FE + BE).
166+
167+
> ⚠️ The all-in-one Doris image requires sufficient memory (at least 8 GB recommended).
168+
> If you see `MEM_ALLOC_FAILED` errors, increase Docker's memory limit.
169+
170+
Wait until Doris is healthy. Check the status:
171+
```bash
172+
mysql -h 127.0.0.1 -P 9030 -uroot -e "SHOW BACKENDS\G" | grep "Alive"
173+
# Alive: true
174+
```
175+
176+
2. Update Connection Info (if needed)
177+
178+
The default connection in `tests/routers/v3/connector/doris/conftest.py`:
179+
```python
180+
DORIS_HOST = "127.0.0.1"
181+
DORIS_PORT = 9030
182+
DORIS_USER = "root"
183+
DORIS_PASSWORD = ""
184+
```
185+
186+
187+
Adjust these values if your Doris instance has different credentials.
188+
189+
To run the tests against an existing remote Doris cluster instead of the local container, update the connection constants in `tests/routers/v3/connector/doris/conftest.py`:
190+
```python
191+
DORIS_HOST = "<your-doris-host>"
192+
DORIS_PORT = 9030
193+
DORIS_USER = "<user>"
194+
DORIS_PASSWORD = "<password>"
195+
```
196+
197+
#### Run Doris Tests
198+
199+
Go back to the `ibis-server` directory and run:
200+
```bash
201+
just test doris
202+
```
203+
204+
⚠️ Doris tests will fail if the Doris instance is not reachable.
205+
206+
#### Cleanup (Local Docker)
207+
208+
After tests finish:
209+
```bash
210+
cd tests/routers/v3/connector/doris
211+
docker compose down -v
212+
```
213+
145214

146215
### Start with Python Interactive Mode
147216
Install the dependencies
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
# ruff: noqa: F401
22

3+
from app.custom_sqlglot.dialects.doris import Doris
34
from app.custom_sqlglot.dialects.mysql import MySQL
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from sqlglot import exp
2+
from sqlglot.dialects import Doris as OriginalDoris
3+
4+
5+
class Doris(OriginalDoris):
    """sqlglot Doris dialect with a project-specific type-name override."""

    class Generator(OriginalDoris.Generator):
        """SQL generator that emits ``BINARY`` for sqlglot's VARBINARY type."""

        # Start from the upstream mapping and replace only the VARBINARY entry.
        TYPE_MAPPING = OriginalDoris.Generator.TYPE_MAPPING | {
            exp.DataType.Type.VARBINARY: "BINARY"
        }

ibis-server/app/model/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,10 @@ class QueryMySqlDTO(QueryDTO):
6767
connection_info: ConnectionUrl | MySqlConnectionInfo = connection_info_field
6868

6969

70+
# Query request payload for the Doris data source.
# Unlike the neighbouring QueryMySqlDTO / QueryOracleDTO, this DTO does not
# accept a ConnectionUrl: only the structured DorisConnectionInfo is allowed.
class QueryDorisDTO(QueryDTO):
    connection_info: DorisConnectionInfo = connection_info_field
72+
73+
7074
class QueryOracleDTO(QueryDTO):
7175
connection_info: ConnectionUrl | OracleConnectionInfo = connection_info_field
7276

@@ -340,6 +344,29 @@ class MySqlConnectionInfo(BaseConnectionInfo):
340344
)
341345

342346

347+
# Connection settings for a Doris data source.
# Every connection value is a SecretStr; DataSourceExtension.get_doris_connection
# unwraps them with get_secret_value() when it opens the connection.
class DorisConnectionInfo(BaseConnectionInfo):
    host: SecretStr = Field(
        description="the hostname of your Doris FE", examples=["localhost"]
    )
    # Kept as a secret string; get_doris_connection converts it with int().
    port: SecretStr = Field(
        description="the query port of your Doris FE", examples=["9030"]
    )
    database: SecretStr = Field(
        description="the database name of your Doris database", examples=["default"]
    )
    user: SecretStr = Field(
        description="the username of your Doris database", examples=["root"]
    )
    # Optional: when unset (or empty), get_doris_connection sends "" as the password.
    password: SecretStr | None = Field(
        description="the password of your Doris database",
        examples=["password"],
        default=None,
    )
    # Extra driver options; get_doris_connection merges them over its default
    # charset ("utf8mb4"), so a "charset" entry here overrides that default.
    kwargs: dict[str, str] | None = Field(
        description="Additional keyword arguments to pass to PyMySQL", default=None
    )
368+
369+
343370
class PostgresConnectionInfo(BaseConnectionInfo):
344371
host: SecretStr = Field(
345372
examples=["localhost"], description="the hostname of your database"
@@ -654,6 +681,7 @@ class GcsFileConnectionInfo(BaseConnectionInfo):
654681
| ConnectionUrl
655682
| MSSqlConnectionInfo
656683
| MySqlConnectionInfo
684+
| DorisConnectionInfo
657685
| OracleConnectionInfo
658686
| PostgresConnectionInfo
659687
| RedshiftConnectionInfo

ibis-server/app/model/connector.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ def __init__(self, data_source: DataSource, connection_info: ConnectionInfo):
105105
self._connector = DatabricksConnector(connection_info)
106106
elif data_source == DataSource.mysql:
107107
self._connector = MySqlConnector(connection_info)
108+
elif data_source == DataSource.doris:
109+
self._connector = DorisConnector(connection_info)
108110
else:
109111
self._connector = IbisConnector(data_source, connection_info)
110112

@@ -358,6 +360,43 @@ def _cast_json_columns(self, result_table: Table, col_name: str) -> Table:
358360
return result_table.mutate(**{col_name: casted_col})
359361

360362

363+
class DorisConnector(IbisConnector):
    """Connector for Apache Doris, served through the ibis MySQL backend.

    Doris speaks the MySQL wire protocol, so the connection itself is created
    by ``get_doris_connection()`` using ``ibis.mysql.connect()``. That factory
    also overrides the connection's ``get_autocommit()`` to always report
    ``True``: Doris may not properly reflect the SERVER_STATUS_AUTOCOMMIT
    flag, and without the override ibis's ``raw_sql()`` would wrap every
    query in BEGIN/ROLLBACK.
    """

    def __init__(self, connection_info: ConnectionInfo):
        super().__init__(DataSource.doris, connection_info)

    def _handle_pyarrow_unsupported_type(self, ibis_table: Table, **kwargs) -> Table:
        """Rewrite Decimal, UUID and JSON columns of ``ibis_table``.

        Each column is dispatched on its ibis dtype to the matching helper;
        columns of any other type are left untouched. ``kwargs`` is forwarded
        only to the decimal-rounding helper.
        """
        converted = ibis_table
        for column, column_type in ibis_table.schema().items():
            if isinstance(column_type, Decimal):
                converted = self._round_decimal_columns(
                    result_table=converted, col_name=column, **kwargs
                )
                continue
            if isinstance(column_type, UUID):
                converted = self._cast_uuid_columns(
                    result_table=converted, col_name=column
                )
                continue
            if isinstance(column_type, dt.JSON):
                # Doris JSON columns need the same handling as MySQL
                converted = self._cast_json_columns(
                    result_table=converted, col_name=column
                )
        return converted

    def _cast_json_columns(self, result_table: Table, col_name: str) -> Table:
        """Return ``result_table`` with column ``col_name`` cast to string."""
        return result_table.mutate(**{col_name: result_table[col_name].cast("string")})
398+
399+
361400
class MSSqlConnector(IbisConnector):
362401
def __init__(self, connection_info: ConnectionInfo):
363402
super().__init__(DataSource.mssql, connection_info)

ibis-server/app/model/data_source.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
ConnectionUrl,
2525
DatabricksServicePrincipalConnectionInfo,
2626
DatabricksTokenConnectionInfo,
27+
DorisConnectionInfo,
2728
GcsFileConnectionInfo,
2829
LocalFileConnectionInfo,
2930
MinioFileConnectionInfo,
@@ -36,6 +37,7 @@
3637
QueryCannerDTO,
3738
QueryClickHouseDTO,
3839
QueryDatabricksDTO,
40+
QueryDorisDTO,
3941
QueryDTO,
4042
QueryDuckDBDTO,
4143
QueryGcsFileDTO,
@@ -70,6 +72,7 @@ class DataSource(StrEnum):
7072
clickhouse = auto()
7173
mssql = auto()
7274
mysql = auto()
75+
doris = auto()
7376
oracle = auto()
7477
postgres = auto()
7578
redshift = auto()
@@ -169,6 +172,8 @@ def _build_connection_info(self, data: dict) -> ConnectionInfo:
169172
return MSSqlConnectionInfo.model_validate(data)
170173
case DataSource.mysql:
171174
return MySqlConnectionInfo.model_validate(data)
175+
case DataSource.doris:
176+
return DorisConnectionInfo.model_validate(data)
172177
case DataSource.oracle:
173178
return OracleConnectionInfo.model_validate(data)
174179
case DataSource.postgres:
@@ -240,6 +245,7 @@ class DataSourceExtension(Enum):
240245
clickhouse = QueryClickHouseDTO
241246
mssql = QueryMSSqlDTO
242247
mysql = QueryMySqlDTO
248+
doris = QueryDorisDTO
243249
oracle = QueryOracleDTO
244250
postgres = QueryPostgresDTO
245251
redshift = QueryRedshiftDTO
@@ -402,6 +408,42 @@ def get_mysql_connection(cls, info: MySqlConnectionInfo) -> BaseBackend:
402408
**kwargs,
403409
)
404410

411+
@classmethod
def get_doris_connection(cls, info: DorisConnectionInfo) -> BaseBackend:
    """Open an ibis MySQL-backend connection to a Doris FE.

    Doris is MySQL-protocol compatible, so ``ibis.mysql.connect()`` is reused.
    The charset defaults to ``utf8mb4`` and any entries in ``info.kwargs`` are
    applied on top of that default. A missing or empty password is sent as
    ``""``.

    Returns the ibis backend, with ``get_autocommit`` overridden on its
    underlying driver connection (see the comment in the body).
    """
    # utf8mb4 is the actual charset used by Doris (MySQL-compatible);
    # caller-supplied kwargs are applied afterwards and may replace it.
    connect_options = {"charset": "utf8mb4"}
    if info.kwargs:
        connect_options.update(info.kwargs)

    backend = ibis.mysql.connect(
        host=info.host.get_secret_value(),
        port=int(info.port.get_secret_value()),
        database=info.database.get_secret_value(),
        user=info.user.get_secret_value(),
        password=info.password.get_secret_value() if info.password else "",
        **connect_options,
    )

    # Doris does not properly reflect the SERVER_STATUS_AUTOCOMMIT flag in
    # its MySQL-protocol handshake/OK packets, so the underlying mysqlclient
    # driver's get_autocommit() always returns False, even after explicitly
    # calling autocommit(True).
    #
    # ibis's raw_sql() consults get_autocommit() and, on False, wraps every
    # query in BEGIN/ROLLBACK. Doris (an OLAP engine) rejects SELECT inside
    # BEGIN with: "This is in a transaction, only insert, update, delete,
    # commit, rollback is acceptable."
    #
    # The override below is a per-object attribute on THIS connection
    # instance only; it does NOT affect the MySQLdb class, other MySQL
    # connections, or any other data-source driver.
    backend.con.get_autocommit = lambda: True
    return backend
446+
405447
@staticmethod
406448
def get_postgres_connection(info: PostgresConnectionInfo) -> BaseBackend:
407449
return ibis.postgres.connect(

0 commit comments

Comments
 (0)