From f080298a7e1910eef0d8973b422834603fe30828 Mon Sep 17 00:00:00 2001 From: Nick Van Wiggeren Date: Mon, 15 Jun 2026 17:00:04 +0000 Subject: [PATCH 1/2] Avoid nil cursor on callback errors --- lib/connect_client.go | 4 +- lib/connect_client_test.go | 144 +++++++++++++++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 2 deletions(-) diff --git a/lib/connect_client.go b/lib/connect_client.go index d8b8947..4192a3f 100644 --- a/lib/connect_client.go +++ b/lib/connect_client.go @@ -321,7 +321,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } sqlResult.Rows = append(sqlResult.Rows, row) if err := onResult(sqlResult, OpType_Delete); err != nil { - return nil, status.Error(codes.Internal, "unable to serialize row") + return tc, status.Error(codes.Internal, "unable to serialize row") } } } @@ -334,7 +334,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam After: serializeQueryResult(update.After), } if err := onUpdate(updatedRow); err != nil { - return nil, status.Error(codes.Internal, "unable to serialize update") + return tc, status.Error(codes.Internal, "unable to serialize update") } } } diff --git a/lib/connect_client_test.go b/lib/connect_client_test.go index bb31b4f..8f87d21 100644 --- a/lib/connect_client_test.go +++ b/lib/connect_client_test.go @@ -259,6 +259,150 @@ func TestRead_CanReturnNewCursorIfNewFound(t *testing.T) { assert.Equal(t, 2, cc.syncFnInvokedCount) } +func TestRead_DeleteCallbackErrorReturnsCursor(t *testing.T) { + dbl := &dbLogger{} + ped := connectClient{} + testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary") + tc := &psdbconnect.TableCursor{ + Shard: "-", + Position: "THIS_IS_A_SHARD_GTID", + Keyspace: "connect-test", + } + newTC := &psdbconnect.TableCursor{ + Shard: "-", + Position: "I_AM_FARTHER_IN_THE_BINLOG", + Keyspace: "connect-test", + } + + getCurrentVGtidClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}}, + } + syncClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{ + { + Cursor: newTC, + Deletes: []*psdbconnect.DeletedRow{ + { + Result: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|deleted_monitor")), + }, + }, + }, + }, + } + + cc := clientConnectionMock{ + syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { + if in.Cursor.Position == "current" { + return getCurrentVGtidClient, nil + } + return syncClient, nil + }, + } + ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { + return &cc, nil + } + getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) { + return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil + } + mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc) + ped.Mysql = &mysqlClient + ps := PlanetScaleSource{ + Database: "connect-test", + } + onRow := func(res *sqltypes.Result, op Operation) error { + if op == OpType_Delete { + return errors.New("serialize failed") + } + return nil + } + onCursor := func(*psdbconnect.TableCursor) error { + return nil + } + + var ( + sc *SerializedCursor + err error + ) + assert.NotPanics(t, func() { + sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, onRow, onCursor, nil) + }) + assert.NoError(t, err) + esc, err := TableCursorToSerializedCursor(tc) + assert.NoError(t, err) + assert.Equal(t, esc, sc) +} + +func TestRead_UpdateCallbackErrorReturnsCursor(t *testing.T) { + dbl := &dbLogger{} + ped := connectClient{} + testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary") + tc := &psdbconnect.TableCursor{ + Shard: "-", + Position: "THIS_IS_A_SHARD_GTID", + Keyspace: "connect-test", + } + newTC := &psdbconnect.TableCursor{ + Shard: "-", + Position: "I_AM_FARTHER_IN_THE_BINLOG", + Keyspace: "connect-test", + } + + getCurrentVGtidClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}}, + } + syncClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{ + { + Cursor: newTC, + Updates: []*psdbconnect.UpdatedRow{ + { + Before: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|old_monitor")), + After: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|new_monitor")), + }, + }, + }, + }, + } + + cc := clientConnectionMock{ + syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { + if in.Cursor.Position == "current" { + return getCurrentVGtidClient, nil + } + return syncClient, nil + }, + } + ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { + return &cc, nil + } + getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) { + return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil + } + mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc) + ped.Mysql = &mysqlClient + ps := PlanetScaleSource{ + Database: "connect-test", + } + onCursor := func(*psdbconnect.TableCursor) error { + return nil + } + onUpdate := func(*UpdatedRow) error { + return errors.New("serialize failed") + } + + var ( + sc *SerializedCursor + err error + ) + assert.NotPanics(t, func() { + sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, nil, onCursor, onUpdate) + }) + assert.NoError(t, err) + esc, err := TableCursorToSerializedCursor(tc) + assert.NoError(t, err) + assert.Equal(t, esc, sc) +} + func TestRead_CanStopAtWellKnownCursor(t *testing.T) { dbl := &dbLogger{} ped := connectClient{} From d71f2046c5d78a479c2ade980d85f284789e2869 Mon Sep 17 00:00:00 2001 From: Nick Van Wiggeren Date: Mon, 15 Jun 2026 20:47:44 +0000 Subject: [PATCH 2/2] Avoid advancing cursor on callback errors --- lib/connect_client.go | 15 ++++++-- lib/connect_client_test.go | 75 +++++++++++++++++++++++++++++++++++++- 2 files changed, 85 insertions(+), 5 deletions(-) diff --git a/lib/connect_client.go b/lib/connect_client.go index 4192a3f..ddb8e7b 100644 --- a/lib/connect_client.go +++ b/lib/connect_client.go @@ -18,6 +18,7 @@ import ( clientoptions "github.com/planetscale/psdb/core/pool/options" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/proto" "vitess.io/vitess/go/sqltypes" @@ -271,6 +272,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam if tc.LastKnownPk != nil { tc.Position = "" } + syncStartCursor := cloneTableCursor(tc) logger.Info(fmt.Sprintf("%sSyncing with cursor position : [%v], using last known PK : %v, stop cursor is : [%v]", preamble, tc.Position, tc.LastKnownPk != nil, stopPosition)) @@ -308,7 +310,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } sqlResult.Rows = append(sqlResult.Rows, row) if err := onResult(sqlResult, OpType_Insert); err != nil { - return tc, status.Error(codes.Internal, "unable to serialize row") + return syncStartCursor, status.Error(codes.Internal, "unable to serialize row") } } } @@ -321,7 +323,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } sqlResult.Rows = append(sqlResult.Rows, row) if err := onResult(sqlResult, OpType_Delete); err != nil { - return tc, status.Error(codes.Internal, "unable to serialize row") + return syncStartCursor, status.Error(codes.Internal, "unable to serialize row") } } } @@ -334,7 +336,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam After: serializeQueryResult(update.After), } if err := onUpdate(updatedRow); err != nil { - return tc, status.Error(codes.Internal, "unable to serialize update") + return syncStartCursor, status.Error(codes.Internal, "unable to serialize update") } } } @@ -359,6 +361,13 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam } } +func cloneTableCursor(tc *psdbconnect.TableCursor) *psdbconnect.TableCursor { + if tc == nil { + return nil + } + return proto.Clone(tc).(*psdbconnect.TableCursor) +} + func (p connectClient) filterExistingColumns(ctx context.Context, ps PlanetScaleSource, tableName string, columns []string) ([]string, error) { existingColumns := []string{} results, err := (*p.Mysql).GetKeyspaceTableColumns(ctx, ps.Database, tableName) diff --git a/lib/connect_client_test.go b/lib/connect_client_test.go index 8f87d21..99be106 100644 --- a/lib/connect_client_test.go +++ b/lib/connect_client_test.go @@ -279,8 +279,8 @@ func TestRead_DeleteCallbackErrorReturnsCursor(t *testing.T) { } syncClient := &connectSyncClientMock{ syncResponses: []*psdbconnect.SyncResponse{ + {Cursor: newTC}, { - Cursor: newTC, Deletes: []*psdbconnect.DeletedRow{ { Result: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|deleted_monitor")), @@ -332,6 +332,77 @@ func TestRead_DeleteCallbackErrorReturnsCursor(t *testing.T) { assert.Equal(t, esc, sc) } +func TestRead_InsertCallbackErrorReturnsCursor(t *testing.T) { + dbl := &dbLogger{} + ped := connectClient{} + testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary") + tc := &psdbconnect.TableCursor{ + Shard: "-", + Position: "THIS_IS_A_SHARD_GTID", + Keyspace: "connect-test", + } + newTC := &psdbconnect.TableCursor{ + Shard: "-", + Position: "I_AM_FARTHER_IN_THE_BINLOG", + Keyspace: "connect-test", + } + + getCurrentVGtidClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}}, + } + syncClient := &connectSyncClientMock{ + syncResponses: []*psdbconnect.SyncResponse{ + {Cursor: newTC}, + { + Result: []*query.QueryResult{ + sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|new_monitor")), + }, + }, + }, + } + + cc := clientConnectionMock{ + syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) { + if in.Cursor.Position == "current" { + return getCurrentVGtidClient, nil + } + return syncClient, nil + }, + } + ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) { + return &cc, nil + } + getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) { + return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil + } + mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc) + ped.Mysql = &mysqlClient + ps := PlanetScaleSource{ + Database: "connect-test", + } + onRow := func(res *sqltypes.Result, op Operation) error { + if op == OpType_Insert { + return errors.New("serialize failed") + } + return nil + } + onCursor := func(*psdbconnect.TableCursor) error { + return nil + } + + var ( + sc *SerializedCursor + err error + ) + assert.NotPanics(t, func() { + sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, onRow, onCursor, nil) + }) + assert.NoError(t, err) + esc, err := TableCursorToSerializedCursor(tc) + assert.NoError(t, err) + assert.Equal(t, esc, sc) +} + func TestRead_UpdateCallbackErrorReturnsCursor(t *testing.T) { dbl := &dbLogger{} ped := connectClient{} @@ -352,8 +423,8 @@ func TestRead_UpdateCallbackErrorReturnsCursor(t *testing.T) { } syncClient := &connectSyncClientMock{ syncResponses: []*psdbconnect.SyncResponse{ + {Cursor: newTC}, { - Cursor: newTC, Updates: []*psdbconnect.UpdatedRow{ { Before: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|old_monitor")),