基于 Daft 引擎构建的多模态数据处理低代码平台,提供 Web SQL 工作台、Schema 浏览器、函数文档、AI UDF 等功能。
| 模块 | 说明 |
|---|---|
| SQL 工作台 | Web 端 SQL 编辑器,支持自动补全、语法高亮、执行历史 |
| Schema 浏览器 | Catalog → Schema → Table 三级树形元数据浏览 |
| 函数文档 | 219+ 内置函数分类查看、搜索、自动补全 |
| AI UDF | 16 个预置 AI 函数:图像识别、OCR、情感分析、PII 脱敏等 |
| 元数据服务 | 支持 Apache Gravitino(分布式)和 SQLite(本地降级)双后端 |
| DAG 编排 | 可视化数据处理流程编排(实验性) |
daft-platform/ # 项目根目录
├── README.md
├── requirements.txt
├── Makefile # make dev / make test / make run
├── .gitignore
└── daft_platform/ # Python 包
├── app.py # FastAPI 应用入口
├── config.py # 配置管理(环境变量)
├── models/ # Pydantic 请求/响应模型
│ ├── sql.py # SQL 执行相关
│ ├── datasource.py # 数据源/元数据相关
│ └── function.py # 函数注册表相关
├── routers/ # API 路由层
│ ├── sql.py # /api/sql/*
│ ├── datasource.py # /api/datasource/*
│ ├── function.py # /api/function/*
│ └── settings.py # /api/settings/*
├── services/ # 业务逻辑层
│ ├── session_manager.py # Daft Session 单例管理
│ ├── sql_executor.py # SQL 执行 + 错误翻译
│ ├── sql_history.py # 执行历史(SQLite)
│ ├── function_registry.py # 函数注册表
│ ├── builtin_udfs.py # 16 个预置 AI UDF
│ ├── metadata_service.py # 元数据抽象接口 + 工厂函数
│ ├── gravitino_metadata.py # Gravitino REST API 实现
│ └── local_metadata.py # SQLite 本地降级实现
├── templates/ # Jinja2 前端页面
│ ├── index.html # SQL 工作台主页
│ └── dag.html # DAG 编排页
├── static/ # 静态资源(JS/CSS)
├── tests/ # 单元测试(pytest)
├── test_data/ # 测试用图片(dog_*.jpg)
├── test_data.csv # 测试用 CSV
├── deploy/ # 部署配置
│ └── gravitino-pod.yaml # Gravitino K8s 部署
└── data/ # 运行时数据(.gitignore)
- 后端: Python 3.10+, FastAPI, Uvicorn, Pydantic v2
- 计算引擎: Daft (Rust + Python)
- 元数据: Apache Gravitino / SQLite (降级)
- 前端: Jinja2 模板 + 原生 JavaScript
- 测试: pytest
- Python 3.10+
- Daft 引擎(需从源码编译)
# 编译安装 Daft 引擎
git clone https://github.com/Eventual-Inc/Daft.git && cd Daft
make .venv && make build
source .venv/bin/activategit clone https://github.com/fightBoxing/daft-platform.git
cd daft-platform
pip install -r requirements.txt
# 可选:AI UDF 额外依赖
pip install easyocr # OCR 文字识别
pip install imagehash # 图像感知哈希
pip install face_recognition # 人脸检测(或 opencv-python)# 开发模式(自动重载)
make dev
# 生产模式
make run
# 或直接用 uvicorn
uvicorn daft_platform.app:app --host 0.0.0.0 --port 8080启动后访问:
- SQL 工作台: http://localhost:8080
- DAG 编排: http://localhost:8080/dag
- 健康检查: http://localhost:8080/health
- API 文档: http://localhost:8080/docs
所有配置通过环境变量管理,前缀 DAFT_PLATFORM_:
| 环境变量 | 默认值 | 说明 |
|---|---|---|
DAFT_PLATFORM_DEBUG |
true |
调试模式 |
DAFT_PLATFORM_MAX_PREVIEW_ROWS |
50 |
预览最大行数 |
DAFT_PLATFORM_MAX_RESULT_ROWS |
10000 |
查询结果最大行数 |
DAFT_PLATFORM_SQL_TIMEOUT_SECONDS |
300 |
SQL 执行超时(秒) |
DAFT_PLATFORM_GRAVITINO_ENABLED |
true |
是否启用 Gravitino 元数据服务 |
DAFT_PLATFORM_GRAVITINO_URI |
http://localhost:30090 |
Gravitino 服务地址 |
DAFT_PLATFORM_GRAVITINO_METALAKE |
daft_platform |
Gravitino metalake 名称 |
DAFT_PLATFORM_GRAVITINO_TIMEOUT |
3 |
Gravitino 连接超时(秒) |
AI UDF(如 classify_image、text_summary)需要 LLM API:
| 环境变量 | 说明 |
|---|---|
OPENAI_API_KEY |
OpenAI / 兼容 API 密钥 |
OPENAI_BASE_URL |
API 地址(默认 https://api.openai.com/v1) |
OPENAI_MODEL |
模型名(默认 gpt-4o-mini) |
也可在 Web 界面右上角 ⚙️ 设置中配置,保存到本地数据库。
启动时自动检测 Gravitino 可用性:
- Gravitino 可用 → 使用 Gravitino REST API 管理元数据
- Gravitino 不可用 → 自动降级到本地 SQLite(零外部依赖)
禁用 Gravitino 直接使用 SQLite:
export DAFT_PLATFORM_GRAVITINO_ENABLED=false# K8s 部署
kubectl apply -f daft_platform/deploy/gravitino-pod.yaml
# Docker 部署
docker run -d --name gravitino -p 30090:8090 apache/gravitino:latest# 执行 SQL
curl -X POST http://localhost:8080/api/sql/execute \
-H 'Content-Type: application/json' \
-d '{"sql": "SELECT * FROM my_table LIMIT 10", "limit": 1000}'
# SQL 安全验证
curl -X POST http://localhost:8080/api/sql/validate \
-H 'Content-Type: application/json' \
-d '{"sql": "SELECT 1"}'
# 查看执行历史
curl http://localhost:8080/api/sql/history?limit=10# 注册 JSON 数据表
curl -X POST http://localhost:8080/api/datasource/register \
-H 'Content-Type: application/json' \
-d '{
"name": "users",
"source_type": "json",
"data": {"id": [1, 2, 3], "name": ["Alice", "Bob", "Carol"]}
}'
# 注册文件表(CSV/Parquet/JSON)
curl -X POST http://localhost:8080/api/datasource/register \
-H 'Content-Type: application/json' \
-d '{"name": "sales", "source_type": "file", "path": "/data/sales.parquet"}'
# 注册 HuggingFace 数据集
curl -X POST http://localhost:8080/api/datasource/register \
-H 'Content-Type: application/json' \
-d '{"name": "mnist", "source_type": "huggingface", "dataset": "mnist", "limit": 1000}'
# 注册 Glob 路径(批量文件)
curl -X POST http://localhost:8080/api/datasource/register \
-H 'Content-Type: application/json' \
-d '{"name": "images", "source_type": "glob", "glob_pattern": "/data/photos/**/*.jpg"}'
# 查看元数据树
curl http://localhost:8080/api/datasource/tree# 列出所有函数
curl http://localhost:8080/api/function/list
# 按分类筛选
curl http://localhost:8080/api/function/list?category=ai
# 搜索函数
curl http://localhost:8080/api/function/search?q=image
# SQL 自动补全
curl http://localhost:8080/api/function/completions?prefix=image_Daft SQL 兼容标准 SQL 语法,额外支持多模态数据处理函数。
-- 注册测试数据后查询
SELECT * FROM test_data WHERE score > 85 ORDER BY score DESC;
-- 聚合统计
SELECT category, COUNT(*) AS cnt, AVG(score) AS avg_score
FROM test_data
GROUP BY category
ORDER BY avg_score DESC;
-- CTE(公用表表达式)
WITH top_students AS (
SELECT * FROM test_data WHERE score > 90
)
SELECT category, COUNT(*) FROM top_students GROUP BY category;
-- 窗口函数
SELECT name, score,
RANK() OVER (ORDER BY score DESC) AS rank,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY score DESC) AS cat_rank
FROM test_data;SELECT lower(name), upper(city) FROM test_data;
SELECT * FROM test_data WHERE contains(city, '京');
SELECT * FROM test_data WHERE starts_with(name, '王');
SELECT length(name) AS name_len FROM test_data;
SELECT replace(city, '北京', 'Beijing') FROM test_data;
SELECT split(name, '') FROM test_data; -- 按字符拆分SELECT abs(score - 85) AS diff FROM test_data;
SELECT round(score, 0) AS rounded FROM test_data;
SELECT ceil(score), floor(score) FROM test_data;
SELECT clip(score, 80, 95) AS clipped FROM test_data;SELECT current_date(), current_timestamp();
SELECT year(to_date('2024-03-15', '%Y-%m-%d'));
SELECT date_trunc('month', current_timestamp());SELECT explode(split(name, '')) FROM test_data; -- 展开为多行
SELECT list_count(split(city, '')) FROM test_data;以下为 Daft 特色的多模态数据处理能力,可在 SQL 中直接使用。
-- 1. 注册图片数据集(Glob 方式扫描目录)
-- 在 Web 界面中:数据源 → 注册 → Glob → 路径填 /path/to/images/**/*.jpg
-- 2. 解码图片并调整尺寸
SELECT path, image_decode(path) AS img FROM images;
SELECT path, image_resize(image_decode(path), 224, 224) AS resized FROM images;
-- 3. 图像裁剪
SELECT path, image_crop(image_decode(path), 0, 0, 100, 100) AS cropped FROM images;-- 图像内容识别:调用 Vision API 返回中文描述
SELECT path, classify_image(path) AS description FROM images;
-- 图像元信息:宽高、格式、文件大小
SELECT path, image_info(path) AS info FROM images;
-- 主色调提取:返回 HEX 颜色值
SELECT path, dominant_color(path) AS color FROM images;
-- 图像感知哈希:用于去重/相似度比较
SELECT path, image_phash(path) AS phash FROM images;
-- 人脸检测:返回人脸数量
SELECT path, face_count(path) AS faces FROM images;
-- EXIF 元数据:相机型号、GPS、拍摄时间等
SELECT path, exif_extract(path) AS exif FROM images;
-- OCR 文字识别(支持中英文)
SELECT path, ocr_extract(path) AS text FROM document_images;-- 假设已注册 images 表(Glob: /data/photos/**/*.jpg)
-- 分析每张图片:识别内容 + 提取信息 + 主色调 + 人脸
SELECT
path,
classify_image(path) AS description,
image_info(path) AS info,
dominant_color(path) AS color,
face_count(path) AS faces,
image_phash(path) AS phash
FROM images
LIMIT 20;
-- 筛选包含人脸的图片
SELECT path, face_count(path) AS faces
FROM images
WHERE face_count(path) > 0;
-- 图像去重(基于感知哈希)
SELECT phash, COUNT(*) AS cnt, MIN(path) AS sample
FROM (SELECT path, image_phash(path) AS phash FROM images)
GROUP BY phash
HAVING cnt > 1;-- 情感分析:返回 positive / negative / neutral
SELECT name, sentiment(name) AS mood FROM test_data;
-- 语言检测:返回 zh / en
SELECT name, detect_language(name) AS lang FROM test_data;
-- 关键词提取(TF 频率统计)
SELECT extract_keywords(description) AS keywords FROM articles;
-- 文本摘要(调用 LLM,50 字以内)
SELECT text_summary(content) AS summary FROM articles;
-- PII 脱敏(自动识别手机号/身份证/邮箱/银行卡)
SELECT mask_pii('联系方式: 13812345678, 邮箱: test@example.com') AS masked;
-- 输出: "联系方式: 138****5678, 邮箱: te***@example.com"
-- 文本相似度(Jaccard bigram)
SELECT text_similarity('你好世界', '你好中国') AS similarity;-- 音频时长(秒,支持 mp3/wav/flac)
SELECT path, audio_duration(path) AS duration_sec FROM audio_files;
-- 文件 MD5 哈希
SELECT path, file_md5(path) AS md5 FROM files;
-- 文件转 Base64
SELECT path, to_base64(path) AS b64 FROM files;-- 余弦相似度(用于 Embedding 比较)
SELECT cosine_similarity(emb1, emb2) AS sim FROM embedding_pairs;
-- 欧氏距离
SELECT euclidean_distance(emb1, emb2) AS dist FROM embedding_pairs;
-- 点积
SELECT dot_product(emb1, emb2) AS dp FROM embedding_pairs;-- 从 URL 下载内容
SELECT url_download(url_col) AS content FROM web_sources;
-- 解析 URL 各部分
SELECT url_parse(url_col) AS parsed FROM web_sources;
-- 猜测文件 MIME 类型
SELECT guess_mime_type(path) AS mime FROM files;| 函数名 | 输入 | 输出 | 说明 | 额外依赖 |
|---|---|---|---|---|
classify_image(path) |
文件路径 | String | LLM Vision API 图像识别 | LLM API Key |
image_info(path) |
文件路径 | Struct | 宽高/格式/文件大小 | Pillow |
dominant_color(path) |
文件路径 | String | 主色调 HEX 值 | Pillow |
image_phash(path) |
文件路径 | String | 感知哈希(去重用) | imagehash / 降级 MD5 |
face_count(path) |
文件路径 | Int32 | 人脸数量 | face_recognition / OpenCV |
exif_extract(path) |
文件路径 | String | EXIF 元数据 JSON | Pillow |
ocr_extract(path) |
文件路径 | String | OCR 文字识别 | easyocr / pytesseract |
sentiment(text) |
文本 | String | 情感分析 | 无 |
detect_language(text) |
文本 | String | 语言检测 (zh/en) | 无 |
extract_keywords(text) |
文本 | String | 关键词提取 | 无 |
text_summary(text) |
文本 | String | LLM 文本摘要 | LLM API Key |
mask_pii(text) |
文本 | String | PII 脱敏 | 无 |
text_similarity(a, b) |
两段文本 | Float64 | Jaccard 相似度 | 无 |
audio_duration(path) |
文件路径 | Float64 | 音频时长(秒) | mutagen / wave |
file_md5(path) |
文件路径 | String | 文件 MD5 哈希 | 无 |
to_base64(path) |
文件路径 | String | 文件转 Base64 | 无 |
标记 "无" 的函数无需额外安装,开箱即用。
# 运行全部测试
make test
# 运行单个测试文件
python -m pytest daft_platform/tests/test_sql_executor.py -v
# 运行单个测试方法
python -m pytest daft_platform/tests/test_routers.py::test_register_and_list_table -v测试覆盖:
| 测试文件 | 覆盖模块 |
|---|---|
test_models.py |
Pydantic 模型验证、health 端点 |
test_sql_executor.py |
SQL 安全验证、错误翻译 |
test_sql_history.py |
执行历史 CRUD |
test_function_registry.py |
函数列表/搜索/补全 |
test_metadata_service.py |
LocalMetadataService 完整 CRUD |
test_session_manager.py |
Session 单例、表注册 |
test_routers.py |
API 路由集成测试 |
执行前自动拦截危险操作:
DROP TABLE → ❌ 拦截
DELETE FROM → ❌ 拦截
TRUNCATE → ❌ 拦截
ALTER TABLE → ❌ 拦截
SELECT / WITH → ✅ 允许
Daft 引擎的 Rust 层错误自动翻译为中文友好提示:
| 原始错误 | 翻译 | 建议 |
|---|---|---|
column 'foo' not found |
列 'foo' 不存在于当前表中 | 请检查列名拼写 |
table 'bar' not found |
表 'bar' 不存在 | 请先注册该表 |
Type mismatch |
数据类型不匹配 | 使用 CAST() 转换 |
Division by zero |
除零错误 | 添加 CASE WHEN 判断 |
Out of memory |
内存不足 | 添加 LIMIT 限制 |
详见 KNOWN_ISSUES.md,包含开发过程中的踩坑记录和解决方案。
本项目基于 Daft 引擎开发,遵循 Apache 2.0 License。