Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions lib/connect_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
clientoptions "github.com/planetscale/psdb/core/pool/options"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sqltypes"

Expand Down Expand Up @@ -271,6 +272,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
if tc.LastKnownPk != nil {
tc.Position = ""
}
syncStartCursor := cloneTableCursor(tc)

logger.Info(fmt.Sprintf("%sSyncing with cursor position : [%v], using last known PK : %v, stop cursor is : [%v]", preamble, tc.Position, tc.LastKnownPk != nil, stopPosition))

Expand Down Expand Up @@ -308,7 +310,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
sqlResult.Rows = append(sqlResult.Rows, row)
if err := onResult(sqlResult, OpType_Insert); err != nil {
return tc, status.Error(codes.Internal, "unable to serialize row")
return syncStartCursor, status.Error(codes.Internal, "unable to serialize row")
}
}
}
Expand All @@ -321,7 +323,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
sqlResult.Rows = append(sqlResult.Rows, row)
if err := onResult(sqlResult, OpType_Delete); err != nil {
return nil, status.Error(codes.Internal, "unable to serialize row")
return syncStartCursor, status.Error(codes.Internal, "unable to serialize row")
}
}
}
Expand All @@ -334,7 +336,7 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
After: serializeQueryResult(update.After),
}
if err := onUpdate(updatedRow); err != nil {
return nil, status.Error(codes.Internal, "unable to serialize update")
return syncStartCursor, status.Error(codes.Internal, "unable to serialize update")
}
}
}
Expand All @@ -359,6 +361,13 @@ func (p connectClient) sync(ctx context.Context, logger DatabaseLogger, tableNam
}
}

func cloneTableCursor(tc *psdbconnect.TableCursor) *psdbconnect.TableCursor {
if tc == nil {
return nil
}
return proto.Clone(tc).(*psdbconnect.TableCursor)
}

func (p connectClient) filterExistingColumns(ctx context.Context, ps PlanetScaleSource, tableName string, columns []string) ([]string, error) {
existingColumns := []string{}
results, err := (*p.Mysql).GetKeyspaceTableColumns(ctx, ps.Database, tableName)
Expand Down
215 changes: 215 additions & 0 deletions lib/connect_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,221 @@ func TestRead_CanReturnNewCursorIfNewFound(t *testing.T) {
assert.Equal(t, 2, cc.syncFnInvokedCount)
}

func TestRead_DeleteCallbackErrorReturnsCursor(t *testing.T) {
dbl := &dbLogger{}
ped := connectClient{}
testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary")
tc := &psdbconnect.TableCursor{
Shard: "-",
Position: "THIS_IS_A_SHARD_GTID",
Keyspace: "connect-test",
}
newTC := &psdbconnect.TableCursor{
Shard: "-",
Position: "I_AM_FARTHER_IN_THE_BINLOG",
Keyspace: "connect-test",
}

getCurrentVGtidClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}},
}
syncClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{
{Cursor: newTC},
{
Deletes: []*psdbconnect.DeletedRow{
{
Result: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|deleted_monitor")),
},
},
},
},
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
if in.Cursor.Position == "current" {
return getCurrentVGtidClient, nil
}
return syncClient, nil
},
}
ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) {
return &cc, nil
}
getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) {
return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil
}
mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc)
ped.Mysql = &mysqlClient
ps := PlanetScaleSource{
Database: "connect-test",
}
onRow := func(res *sqltypes.Result, op Operation) error {
if op == OpType_Delete {
return errors.New("serialize failed")
}
return nil
}
onCursor := func(*psdbconnect.TableCursor) error {
return nil
}

var (
sc *SerializedCursor
err error
)
assert.NotPanics(t, func() {
sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, onRow, onCursor, nil)
})
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
}

func TestRead_InsertCallbackErrorReturnsCursor(t *testing.T) {
dbl := &dbLogger{}
ped := connectClient{}
testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary")
tc := &psdbconnect.TableCursor{
Shard: "-",
Position: "THIS_IS_A_SHARD_GTID",
Keyspace: "connect-test",
}
newTC := &psdbconnect.TableCursor{
Shard: "-",
Position: "I_AM_FARTHER_IN_THE_BINLOG",
Keyspace: "connect-test",
}

getCurrentVGtidClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}},
}
syncClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{
{Cursor: newTC},
{
Result: []*query.QueryResult{
sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|new_monitor")),
},
},
},
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
if in.Cursor.Position == "current" {
return getCurrentVGtidClient, nil
}
return syncClient, nil
},
}
ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) {
return &cc, nil
}
getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) {
return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil
}
mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc)
ped.Mysql = &mysqlClient
ps := PlanetScaleSource{
Database: "connect-test",
}
onRow := func(res *sqltypes.Result, op Operation) error {
if op == OpType_Insert {
return errors.New("serialize failed")
}
return nil
}
onCursor := func(*psdbconnect.TableCursor) error {
return nil
}

var (
sc *SerializedCursor
err error
)
assert.NotPanics(t, func() {
sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, onRow, onCursor, nil)
})
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
}

func TestRead_UpdateCallbackErrorReturnsCursor(t *testing.T) {
dbl := &dbLogger{}
ped := connectClient{}
testFields := sqltypes.MakeTestFields("pid|description", "int64|varbinary")
tc := &psdbconnect.TableCursor{
Shard: "-",
Position: "THIS_IS_A_SHARD_GTID",
Keyspace: "connect-test",
}
newTC := &psdbconnect.TableCursor{
Shard: "-",
Position: "I_AM_FARTHER_IN_THE_BINLOG",
Keyspace: "connect-test",
}

getCurrentVGtidClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{{Cursor: newTC}},
}
syncClient := &connectSyncClientMock{
syncResponses: []*psdbconnect.SyncResponse{
{Cursor: newTC},
{
Updates: []*psdbconnect.UpdatedRow{
{
Before: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|old_monitor")),
After: sqltypes.ResultToProto3(sqltypes.MakeTestResult(testFields, "12|new_monitor")),
},
},
},
},
}

cc := clientConnectionMock{
syncFn: func(ctx context.Context, in *psdbconnect.SyncRequest, opts ...grpc.CallOption) (psdbconnect.Connect_SyncClient, error) {
if in.Cursor.Position == "current" {
return getCurrentVGtidClient, nil
}
return syncClient, nil
},
}
ped.clientFn = func(ctx context.Context, ps PlanetScaleSource) (psdbconnect.ConnectClient, error) {
return &cc, nil
}
getKeyspaceTableColumnsFunc := func(ctx context.Context, keyspaceName string, tableName string) ([]MysqlColumn, error) {
return []MysqlColumn{{Name: "id", Type: "bigint", IsPrimaryKey: true}, {Name: "email", Type: "varchar(256)", IsPrimaryKey: false}}, nil
}
mysqlClient := NewTestMysqlClient(getKeyspaceTableColumnsFunc)
ped.Mysql = &mysqlClient
ps := PlanetScaleSource{
Database: "connect-test",
}
onCursor := func(*psdbconnect.TableCursor) error {
return nil
}
onUpdate := func(*UpdatedRow) error {
return errors.New("serialize failed")
}

var (
sc *SerializedCursor
err error
)
assert.NotPanics(t, func() {
sc, err = ped.Read(context.Background(), dbl, ps, "customers", nil, tc, nil, onCursor, onUpdate)
})
assert.NoError(t, err)
esc, err := TableCursorToSerializedCursor(tc)
assert.NoError(t, err)
assert.Equal(t, esc, sc)
}

func TestRead_CanStopAtWellKnownCursor(t *testing.T) {
dbl := &dbLogger{}
ped := connectClient{}
Expand Down