Skip to content

Commit dcfccec

Browse files
committed
feat(core): add migrate
feat(core): add migrate feat(core): add migrate
1 parent 0316100 commit dcfccec

File tree

3 files changed

+275
-27
lines changed

3 files changed

+275
-27
lines changed

core/stores/migrate/migrate.go

Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
package migrate
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"strings"
10+
11+
"github.com/eddieowens/opts"
12+
"github.com/golang-migrate/migrate/v4"
13+
_ "github.com/golang-migrate/migrate/v4/database/mysql"
14+
_ "github.com/golang-migrate/migrate/v4/database/pgx/v5"
15+
"github.com/golang-migrate/migrate/v4/source/file"
16+
"github.com/pkg/errors"
17+
"github.com/zeromicro/go-zero/core/logx"
18+
"github.com/zeromicro/go-zero/core/stores/sqlx"
19+
)
20+
21+
type MigrateOpts struct {
22+
PreProcessSqlFunc func(content string) string
23+
PluginsSort []string // 插件执行 sql_migration 排序
24+
CustomDir []SqlMigrationCustomDir // 自定义目录
25+
}
26+
27+
type SqlMigrationCustomDir struct {
28+
Name string `json:"name"` // 自定义目录名称 e.g. saas 则自动迁移 desc/sql_migration/saas 下的 sql 文件
29+
Enabled bool `json:"enabled"` // 是否启用
30+
}
31+
32+
func (opts MigrateOpts) DefaultOptions() MigrateOpts {
33+
return MigrateOpts{}
34+
}
35+
36+
func WithPluginsSort(pluginsSort []string) opts.Opt[MigrateOpts] {
37+
return func(opts *MigrateOpts) {
38+
opts.PluginsSort = pluginsSort
39+
}
40+
}
41+
42+
func WithCustomDir(customDir []SqlMigrationCustomDir) opts.Opt[MigrateOpts] {
43+
return func(opts *MigrateOpts) {
44+
opts.CustomDir = customDir
45+
}
46+
}
47+
48+
func WithPreProcessSqlFunc(f func(string) string) opts.Opt[MigrateOpts] {
49+
return func(opts *MigrateOpts) {
50+
opts.PreProcessSqlFunc = f
51+
}
52+
}
53+
54+
func Migrate(ctx context.Context, c sqlx.SqlConf, op ...opts.Opt[MigrateOpts]) error {
55+
ops := opts.DefaultApply(op...)
56+
57+
var (
58+
// sql_migration 路径
59+
coreSource, pluginSource, selfSource string
60+
61+
// 数据库连接字符串
62+
databaseUrl string
63+
64+
// 参数连接符号, ? or &
65+
paramConnector string
66+
)
67+
68+
switch c.DriverName {
69+
case "mysql":
70+
coreSource = "file://desc/sql_migration/core"
71+
pluginSource = "file://plugins/%s/desc/sql_migration"
72+
selfSource = "file://desc/sql_migration"
73+
databaseUrl = "mysql://" + c.DataSource
74+
case "pgx":
75+
coreSource = "file://desc/sql_migration/core/postgresql"
76+
pluginSource = "file://plugins/%s/desc/sql_migration/postgresql"
77+
selfSource = "file://desc/sql_migration/postgresql"
78+
databaseUrl = "pgx5://" + strings.TrimPrefix(c.DataSource, "postgres://")
79+
}
80+
81+
// 通过 config.Sqlx.DataSource 判断
82+
if strings.Contains(c.DataSource, "?") {
83+
paramConnector = "&"
84+
} else {
85+
paramConnector = "?"
86+
}
87+
88+
logx.WithContext(ctx).Info("start migrate core")
89+
if err := sqlMigrate(coreSource, fmt.Sprintf("%s%sx-migrations-table=%s", databaseUrl, paramConnector, "schema_migrations_core"), c, ops); err != nil {
90+
return err
91+
}
92+
93+
// 获取所有插件目录
94+
plugins, err := os.ReadDir("plugins")
95+
if err != nil {
96+
return nil
97+
}
98+
99+
// 记录已执行的插件
100+
executedPlugins := make(map[string]bool)
101+
102+
// 先按配置的 plugins_sort 顺序执行插件迁移
103+
if ops.PluginsSort != nil {
104+
for _, pluginName := range ops.PluginsSort {
105+
pluginPath := filepath.Join("plugins", pluginName)
106+
if _, err = os.Stat(pluginPath); err == nil {
107+
logx.WithContext(ctx).Infof("start migrate plugins %s", pluginName)
108+
if err = sqlMigrate(fmt.Sprintf(pluginSource, pluginName), fmt.Sprintf("%s%sx-migrations-table=%s", databaseUrl, paramConnector, "schema_migrations_plugin_"+pluginName), c, ops); err != nil {
109+
return err
110+
}
111+
executedPlugins[pluginName] = true
112+
}
113+
}
114+
}
115+
116+
// 然后执行未在 plugins_sort 中配置的其他插件
117+
for _, plugin := range plugins {
118+
if plugin.IsDir() && !executedPlugins[plugin.Name()] {
119+
logx.WithContext(ctx).Infof("start migrate plugins %s", plugin.Name())
120+
if err = sqlMigrate(fmt.Sprintf(pluginSource, plugin.Name()), fmt.Sprintf("%s%sx-migrations-table=%s", databaseUrl, paramConnector, "schema_migrations_plugin_"+plugin.Name()), c, ops); err != nil {
121+
return err
122+
}
123+
}
124+
}
125+
126+
// 执行自定义目录迁移
127+
if ops.CustomDir != nil {
128+
for _, customDir := range ops.CustomDir {
129+
if customDir.Name == "" {
130+
return errors.New("custom dir name is empty")
131+
}
132+
if customDir.Name == "core" {
133+
return errors.New("custom dir name cannot be core")
134+
}
135+
if customDir.Enabled {
136+
customPath := fmt.Sprintf("file://desc/sql_migration/%s", customDir.Name)
137+
if c.DriverName == "pgx" {
138+
customPath = fmt.Sprintf("file://desc/sql_migration/%s/postgresql", customDir.Name)
139+
}
140+
logx.WithContext(ctx).Infof("start migrate custom dir %s", customDir.Name)
141+
if err = sqlMigrate(customPath, fmt.Sprintf("%s%sx-migrations-table=%s", databaseUrl, paramConnector, "schema_migrations_custom_"+customDir.Name), c, ops); err != nil {
142+
return err
143+
}
144+
}
145+
}
146+
}
147+
148+
logx.WithContext(ctx).Info("start migrate self")
149+
if err = sqlMigrate(selfSource, databaseUrl, c, ops); err != nil {
150+
return err
151+
}
152+
153+
return nil
154+
}
155+
156+
type customFileSource struct {
157+
*file.File
158+
driverName string
159+
preProcessSqlFunc func(content string) string
160+
}
161+
162+
func (c *customFileSource) ReadUp(version uint) (r io.ReadCloser, identifier string, err error) {
163+
rc, id, err := c.File.ReadUp(version)
164+
if err != nil {
165+
return nil, "", err
166+
}
167+
168+
content, err := io.ReadAll(rc)
169+
if err != nil {
170+
return nil, "", err
171+
}
172+
173+
if err = rc.Close(); err != nil {
174+
return nil, "", err
175+
}
176+
return io.NopCloser(strings.NewReader(c.preProcessSqlFunc(string(content)))), id, nil
177+
}
178+
179+
func (c *customFileSource) ReadDown(version uint) (r io.ReadCloser, identifier string, err error) {
180+
rc, id, err := c.File.ReadDown(version)
181+
if err != nil {
182+
return nil, "", err
183+
}
184+
185+
content, err := io.ReadAll(rc)
186+
if err != nil {
187+
return nil, "", err
188+
}
189+
190+
if err = rc.Close(); err != nil {
191+
return nil, "", err
192+
}
193+
194+
modifiedContent := c.preProcessSqlFunc(string(content))
195+
return io.NopCloser(strings.NewReader(modifiedContent)), id, nil
196+
}
197+
198+
func sqlMigrate(sourceUrl, databaseUrl string, c sqlx.SqlConf, ops MigrateOpts) error {
199+
fileDriver := &file.File{}
200+
fileSource, err := fileDriver.Open(sourceUrl)
201+
if err != nil {
202+
if errors.Is(err, os.ErrNotExist) {
203+
return nil
204+
}
205+
return err
206+
}
207+
208+
customSource := &customFileSource{
209+
File: fileSource.(*file.File),
210+
driverName: c.DriverName,
211+
preProcessSqlFunc: ops.PreProcessSqlFunc,
212+
}
213+
214+
m, err := migrate.NewWithSourceInstance("file", customSource, databaseUrl)
215+
if err != nil {
216+
return err
217+
}
218+
219+
if err = m.Up(); err != nil {
220+
if errors.Is(err, migrate.ErrNoChange) {
221+
return nil
222+
}
223+
}
224+
225+
sourceErr, databaseErr := m.Close()
226+
if sourceErr != nil {
227+
return sourceErr
228+
}
229+
if databaseErr != nil {
230+
return databaseErr
231+
}
232+
return nil
233+
}

go.mod

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ require (
99
github.com/bitly/go-simplejson v0.5.1
1010
github.com/eddieowens/opts v0.1.0
1111
github.com/fsnotify/fsnotify v1.9.0
12+
github.com/golang-migrate/migrate/v4 v4.19.0
1213
github.com/gorilla/websocket v1.5.3
1314
github.com/huandu/go-sqlbuilder v1.37.0
1415
github.com/jackc/pgx/v5 v5.7.6
@@ -23,8 +24,8 @@ require (
2324
github.com/spf13/cast v1.10.0
2425
github.com/stretchr/testify v1.11.1
2526
github.com/zeromicro/go-zero v1.9.0
26-
go.opentelemetry.io/otel v1.24.0
27-
go.opentelemetry.io/otel/trace v1.24.0
27+
go.opentelemetry.io/otel v1.37.0
28+
go.opentelemetry.io/otel/trace v1.37.0
2829
golang.org/x/time v0.13.0
2930
google.golang.org/protobuf v1.36.9
3031
)
@@ -43,7 +44,7 @@ require (
4344
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
4445
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
4546
github.com/fatih/color v1.18.0 // indirect
46-
github.com/go-logr/logr v1.4.2 // indirect
47+
github.com/go-logr/logr v1.4.3 // indirect
4748
github.com/go-logr/stdr v1.2.2 // indirect
4849
github.com/go-sql-driver/mysql v1.9.0 // indirect
4950
github.com/gogo/protobuf v1.3.2 // indirect
@@ -54,6 +55,8 @@ require (
5455
github.com/grafana/pyroscope-go v1.2.4 // indirect
5556
github.com/grafana/pyroscope-go/godeltaprof v0.1.8 // indirect
5657
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
58+
github.com/hashicorp/errwrap v1.1.0 // indirect
59+
github.com/hashicorp/go-multierror v1.1.1 // indirect
5760
github.com/huandu/go-clone v1.7.3 // indirect
5861
github.com/huandu/xstrings v1.5.0 // indirect
5962
github.com/jackc/pgpassfile v1.0.0 // indirect
@@ -81,14 +84,15 @@ require (
8184
go.etcd.io/etcd/api/v3 v3.5.15 // indirect
8285
go.etcd.io/etcd/client/pkg/v3 v3.5.15 // indirect
8386
go.etcd.io/etcd/client/v3 v3.5.15 // indirect
87+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
8488
go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect
85-
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.24.0 // indirect
89+
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.29.0 // indirect
8690
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.24.0 // indirect
8791
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.24.0 // indirect
8892
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0 // indirect
8993
go.opentelemetry.io/otel/exporters/zipkin v1.24.0 // indirect
90-
go.opentelemetry.io/otel/metric v1.24.0 // indirect
91-
go.opentelemetry.io/otel/sdk v1.24.0 // indirect
94+
go.opentelemetry.io/otel/metric v1.37.0 // indirect
95+
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
9296
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
9397
go.uber.org/atomic v1.10.0 // indirect
9498
go.uber.org/automaxprocs v1.6.0 // indirect
@@ -100,9 +104,9 @@ require (
100104
golang.org/x/sync v0.14.0 // indirect
101105
golang.org/x/sys v0.33.0 // indirect
102106
golang.org/x/text v0.25.0 // indirect
103-
google.golang.org/genproto/googleapis/api v0.0.0-20240711142825-46eb208f015d // indirect
104-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
105-
google.golang.org/grpc v1.65.0 // indirect
107+
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
108+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect
109+
google.golang.org/grpc v1.67.0 // indirect
106110
gopkg.in/yaml.v2 v2.4.0 // indirect
107111
gopkg.in/yaml.v3 v3.0.1 // indirect
108112
)

0 commit comments

Comments
 (0)