Skip to content

Commit 841273e

Browse files
committed
Fix some post-merge conflicts.
1 parent e72c189 commit 841273e

File tree

9 files changed

+27
-31
lines changed

9 files changed

+27
-31
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ export class MongoBucketBatch
146146
return this.last_checkpoint_lsn;
147147
}
148148

149-
get noCheckpointBeforeLsn() {
150-
return this.no_checkpoint_before_lsn;
151-
}
152-
153149
async flush(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
154150
let result: storage.FlushedResult | null = null;
155151
// One flush may be split over multiple transactions.

modules/module-mssql/src/replication/CDCStream.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ export class CDCStream {
304304
const postSnapshotLSN = await getLatestLSN(this.connections);
305305
// Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction.
306306
await transaction.commit();
307-
const [updatedSourceTable] = await batch.markSnapshotDone([table.sourceTable], postSnapshotLSN.toString());
307+
const [updatedSourceTable] = await batch.markTableSnapshotDone([table.sourceTable], postSnapshotLSN.toString());
308308
this.tableCache.updateSourceTable(updatedSourceTable);
309309
} catch (e) {
310310
await transaction.rollback();
@@ -500,11 +500,11 @@ export class CDCStream {
500500

501501
// This will not create a consistent checkpoint yet, but will persist the op.
502502
// Actual checkpoint will be created when streaming replication caught up.
503+
const postSnapshotLSN = await getLatestLSN(this.connections);
504+
await batch.markAllSnapshotDone(postSnapshotLSN.toString());
503505
await batch.commit(snapshotLSN);
504506

505-
this.logger.info(
506-
`Snapshot done. Need to replicate from ${snapshotLSN} to ${batch.noCheckpointBeforeLsn} to be consistent`
507-
);
507+
this.logger.info(`Snapshot done. Need to replicate from ${snapshotLSN} to ${postSnapshotLSN} to be consistent`);
508508
}
509509
);
510510
}

modules/module-mssql/test/src/CDCStream.test.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ describe('CDCStream tests', () => {
1818
describeWithStorage({ timeout: 20_000 }, defineCDCStreamTests);
1919
});
2020

21-
function defineCDCStreamTests(factory: storage.TestStorageFactory) {
21+
function defineCDCStreamTests(config: storage.TestStorageConfig) {
22+
const { factory } = config;
23+
2224
test('Initial snapshot sync', async () => {
2325
await using context = await CDCStreamTestContext.open(factory);
2426
const { connectionManager } = context;

modules/module-mssql/test/src/CDCStream_resumable_snapshot.test.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { describe, expect, test } from 'vitest';
22
import { env } from './env.js';
33
import { createTestTableWithBasicId, describeWithStorage, waitForPendingCDCChanges } from './util.js';
4-
import { TestStorageFactory } from '@powersync/service-core';
4+
import { TestStorageConfig, TestStorageFactory } from '@powersync/service-core';
55
import { METRICS_HELPER } from '@powersync/service-core-tests';
66
import { ReplicationMetric } from '@powersync/service-types';
77
import * as timers from 'node:timers/promises';
@@ -10,19 +10,19 @@ import { CDCStreamTestContext } from './CDCStreamTestContext.js';
1010
import { getLatestLSN } from '@module/utils/mssql.js';
1111

1212
describe.skipIf(!(env.CI || env.SLOW_TESTS))('batch replication', function () {
13-
describeWithStorage({ timeout: 240_000 }, function (factory) {
13+
describeWithStorage({ timeout: 240_000 }, function (config) {
1414
test('resuming initial replication (1)', async () => {
1515
// Stop early - likely to not include deleted row in first replication attempt.
16-
await testResumingReplication(factory, 2000);
16+
await testResumingReplication(config, 2000);
1717
});
1818
test('resuming initial replication (2)', async () => {
1919
// Stop late - likely to include deleted row in first replication attempt.
20-
await testResumingReplication(factory, 8000);
20+
await testResumingReplication(config, 8000);
2121
});
2222
});
2323
});
2424

25-
async function testResumingReplication(factory: TestStorageFactory, stopAfter: number) {
25+
async function testResumingReplication(config: TestStorageConfig, stopAfter: number) {
2626
// This tests interrupting and then resuming initial replication.
2727
// We interrupt replication after test_data1 has fully replicated, and
2828
// test_data2 has partially replicated.
@@ -34,7 +34,9 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n
3434
// have been / have not been replicated at that point is not deterministic.
3535
// We do allow for some variation in the test results to account for this.
3636

37-
await using context = await CDCStreamTestContext.open(factory, { cdcStreamOptions: { snapshotBatchSize: 1000 } });
37+
await using context = await CDCStreamTestContext.open(config.factory, {
38+
cdcStreamOptions: { snapshotBatchSize: 1000 }
39+
});
3840

3941
await context.updateSyncRules(`bucket_definitions:
4042
global:
@@ -80,7 +82,7 @@ async function testResumingReplication(factory: TestStorageFactory, stopAfter: n
8082
}
8183

8284
// Bypass the usual "clear db on factory open" step.
83-
await using context2 = await CDCStreamTestContext.open(factory, {
85+
await using context2 = await CDCStreamTestContext.open(config.factory, {
8486
doNotClear: true,
8587
cdcStreamOptions: { snapshotBatchSize: 1000 }
8688
});

modules/module-mssql/test/src/util.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import * as types from '@module/types/types.js';
22
import { logger } from '@powersync/lib-services-framework';
3-
import { BucketStorageFactory, InternalOpId, ReplicationCheckpoint, TestStorageFactory } from '@powersync/service-core';
3+
import {
4+
BucketStorageFactory,
5+
InternalOpId,
6+
ReplicationCheckpoint,
7+
TestStorageConfig,
8+
TestStorageFactory
9+
} from '@powersync/service-core';
410

511
import * as mongo_storage from '@powersync/service-module-mongodb-storage';
612
import * as postgres_storage from '@powersync/service-module-postgres-storage';
@@ -20,11 +26,11 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY = mongo_storage.test_utils.mongoT
2026
isCI: env.CI
2127
});
2228

23-
export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestStorageFactoryGenerator({
29+
export const INITIALIZED_POSTGRES_STORAGE_FACTORY = postgres_storage.test_utils.postgresTestSetup({
2430
url: env.PG_STORAGE_TEST_URL
2531
});
2632

27-
export function describeWithStorage(options: TestOptions, fn: (factory: TestStorageFactory) => void) {
33+
export function describeWithStorage(options: TestOptions, fn: (config: TestStorageConfig) => void) {
2834
describe.skipIf(!env.TEST_MONGO_STORAGE)(`mongodb storage`, options, function () {
2935
fn(INITIALIZED_MONGO_STORAGE_FACTORY);
3036
});

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,6 @@ export class PostgresBucketBatch
111111
return this.last_checkpoint_lsn;
112112
}
113113

114-
get noCheckpointBeforeLsn() {
115-
return this.no_checkpoint_before_lsn;
116-
}
117-
118114
async [Symbol.asyncDispose]() {
119115
super.clearListeners();
120116
}

modules/module-postgres-storage/src/utils/test-utils.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,3 @@ export function postgresTestSetup(factoryOptions: PostgresTestStorageOptions) {
9494
tableIdStrings: true
9595
};
9696
}
97-
98-
export function postgresTestStorageFactoryGenerator(factoryOptions: PostgresTestStorageOptions) {
99-
return postgresTestSetup(factoryOptions).factory;
100-
}

modules/module-postgres/test/src/storage_combination.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - p
77
test('should allow the same Postgres cluster to be used for data and storage', async () => {
88
// Use the same cluster for the storage as the data source
99
await using context = await WalStreamTestContext.open(
10-
postgres_storage.test_utils.postgresTestStorageFactoryGenerator({
10+
postgres_storage.test_utils.postgresTestSetup({
1111
url: env.PG_TEST_URL
12-
}),
12+
}).factory,
1313
{ doNotClear: false }
1414
);
1515

packages/service-core/src/storage/BucketStorageBatch.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ export interface BucketStorageBatch extends ObserverClient<BucketBatchStorageLis
8787
markTableSnapshotRequired(table: SourceTable): Promise<void>;
8888
markAllSnapshotDone(no_checkpoint_before_lsn: string): Promise<void>;
8989

90-
noCheckpointBeforeLsn: string;
91-
9290
updateTableProgress(table: SourceTable, progress: Partial<TableSnapshotStatus>): Promise<SourceTable>;
9391

9492
/**

0 commit comments

Comments
 (0)