diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java index 547b6e4aa6..d16fc6bd3e 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java @@ -306,7 +306,7 @@ public void setUpBase() throws ExecutionException { } } - private String getSpecPath( + protected String getSpecPath( Class dataflowTemplateClass, Template templateMetadata, String pomPath) throws ExecutionException { if (TestProperties.specPath() != null && !TestProperties.specPath().isEmpty()) { @@ -334,7 +334,7 @@ private String getSpecPath( String.format("%s/%s%s", STAGING_PREFIX, flex ? "flex/" : "", templateMetadata.name()); String stagePath = String.format("gs://%s/%s", bucketName, blobPath); - String identifier = flex ? template.flexContainerName() : templateMetadata.name(); + String identifier = flex ? 
templateMetadata.flexContainerName() : templateMetadata.name(); stagedTemplates.get( identifier, @@ -359,7 +359,8 @@ private String getSpecPath( } } - String[] mavenCmd = buildMavenStageCommand(STAGING_PREFIX, pom, bucketName, template); + String[] mavenCmd = + buildMavenStageCommand(STAGING_PREFIX, pom, bucketName, templateMetadata); LOG.info("Running command to stage templates: {}", String.join(" ", mavenCmd)); try { diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java index c05dd3c666..0fdffeddff 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java @@ -202,7 +202,16 @@ private synchronized void maybeCreateInstance() { try { InstanceInfo instanceInfo = InstanceInfo.newBuilder(InstanceId.of(projectId, instanceId)) - .setInstanceConfigId(InstanceConfigId.of(projectId, "regional-" + region)) + .setInstanceConfigId( + InstanceConfigId.of( + projectId, + // Multi-region configs (nam*, or eur/asia followed by digits, e.g. eur3, asia1) are used + // verbatim; single regions such as europe-west1 or asia-east1 need the "regional-" prefix. + (region.startsWith("nam") + || region.matches("eur\\d+") + || region.matches("asia\\d+")) + ? region + : "regional-" + region)) .setDisplayName(instanceId) .setEdition(Edition.ENTERPRISE_PLUS) // Needed by Full Text Search. .setNodeCount(nodeCount) @@ -639,6 +648,15 @@ public synchronized ImmutableList readTableRecords( } } + /** + * Returns the DDL that the database admin client reports for this manager's database. + * + * @return the result of {@code getDatabaseDdl(instanceId, databaseId)} on the admin client + */ + public List<String> getDatabaseDdl() { + return databaseAdminClient.getDatabaseDdl(instanceId, databaseId); + } + /** * Deletes all created resources (instance, database, and tables) and cleans up all Spanner * sessions, making the manager object unusable. 
diff --git a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java index e7e2006ed5..4efd0169f5 100644 --- a/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java +++ b/v1/src/main/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScanner.java @@ -1287,12 +1287,14 @@ private void listPropertyGraphLabels(Ddl.Builder builder) { for (int i = 0; i < labelsArray.length(); i++) { JSONObject label = labelsArray.getJSONObject(i); String name = label.getString("name"); - JSONArray propertyDeclarationNamesArray = label.getJSONArray("propertyDeclarationNames"); + JSONArray propertyDeclarationNamesArray = label.optJSONArray("propertyDeclarationNames"); List propertyNames = new ArrayList<>(); - for (int j = 0; j < propertyDeclarationNamesArray.length(); j++) { - String propertyName = propertyDeclarationNamesArray.getString(j); - propertyNames.add(propertyName); + if (propertyDeclarationNamesArray != null) { + for (int j = 0; j < propertyDeclarationNamesArray.length(); j++) { + String propertyName = propertyDeclarationNamesArray.getString(j); + propertyNames.add(propertyName); + } } ImmutableList immutablePropertyNames = ImmutableList.copyOf(propertyNames); @@ -1361,7 +1363,7 @@ private void listPropertyGraphTables(Ddl.Builder builder, String tableType) { String kind = table.getString("kind"); JSONArray labelNamesArray = table.getJSONArray("labelNames"); String name = table.getString("name"); - JSONArray propertyDefinitionsArray = table.getJSONArray("propertyDefinitions"); + JSONArray propertyDefinitionsArray = table.optJSONArray("propertyDefinitions"); ImmutableList.Builder keyColumnsBuilder = ImmutableList.builder(); for (int j = 0; j < keyColumnsArray.length(); j++) { @@ -1427,19 +1429,21 @@ private void listPropertyGraphTables(Ddl.Builder builder, String tableType) { propertyDefinitionsBuilder = 
ImmutableList.builder(); for (String propertyName : propertyGraphLabel.properties) { - for (int k = 0; k < propertyDefinitionsArray.length(); k++) { - JSONObject propertyDefinition = propertyDefinitionsArray.getJSONObject(k); - String propertyDeclarationName = - propertyDefinition.getString("propertyDeclarationName"); - - if (propertyName.equals(propertyDeclarationName)) { - PropertyGraph.PropertyDeclaration propertyDeclaration = - propertyGraph.getPropertyDeclaration(propertyDeclarationName); - propertyDefinitionsBuilder.add( - new GraphElementTable.PropertyDefinition( - propertyDeclaration.name, - propertyDefinition.getString("valueExpressionSql"))); - break; + if (propertyDefinitionsArray != null) { + for (int k = 0; k < propertyDefinitionsArray.length(); k++) { + JSONObject propertyDefinition = propertyDefinitionsArray.getJSONObject(k); + String propertyDeclarationName = + propertyDefinition.getString("propertyDeclarationName"); + + if (propertyName.equals(propertyDeclarationName)) { + PropertyGraph.PropertyDeclaration propertyDeclaration = + propertyGraph.getPropertyDeclaration(propertyDeclarationName); + propertyDefinitionsBuilder.add( + new GraphElementTable.PropertyDefinition( + propertyDeclaration.name, + propertyDefinition.getString("valueExpressionSql"))); + break; + } } } } diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java new file mode 100644 index 0000000000..eefc3438d7 --- /dev/null +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbIT.java @@ -0,0 +1,326 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.spanner; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Dialect; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.ReadOnlyTransaction; +import com.google.cloud.spanner.Struct; +import com.google.cloud.teleport.metadata.Template; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.spanner.ddl.Column; +import com.google.cloud.teleport.spanner.ddl.Ddl; +import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner; +import com.google.cloud.teleport.spanner.ddl.RandomDdlGenerator; +import com.google.cloud.teleport.spanner.ddl.RandomInsertMutationGenerator; +import com.google.cloud.teleport.spanner.ddl.Table; +import com.google.cloud.teleport.spanner.spannerio.MutationGroup; +import com.google.common.collect.ImmutableList; +import com.google.common.io.Resources; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; +import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; +import org.apache.beam.it.common.PipelineOperator.Result; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.TemplateTestBase; +import 
org.apache.beam.it.gcp.artifacts.Artifact; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Integration test for {@link ExportPipeline} and {@link ImportPipeline}. + * + * <p>
This test completely validates the entire lifecycle of exporting a Spanner database to GCS + * using Avro/JSON format, and subsequently importing that exact data into a fresh Spanner database. + * It natively supports testing both Google Standard SQL (GSQL) and PostgreSQL dialects with various + * complex schema combinations (e.g., interleaved tables, foreign keys, arrays) and random data. + */ +@Category(IntegrationTest.class) +@TemplateIntegrationTest(ExportPipeline.class) +@RunWith(JUnit4.class) +public class CopyDbIT extends TemplateTestBase { + + // Resource managers for the source database (exported) and destination database (imported). + private SpannerResourceManager sourceResourceManager; + private SpannerResourceManager destResourceManager; + + @Before + public void setup() { + // We intentionally do NOT initialize the SpannerResourceManagers here. + // They are instantiated dynamically in the `createAndPopulate` method to + // support setting the correct Spanner Dialect (GSQL vs PG) for each specific test case. + } + + @After + public void teardown() { + ResourceManagerUtils.cleanResources(sourceResourceManager, destResourceManager); + } + + /** + * Initializes the source and destination Spanner databases, applies the generated DDL to the + * source database, and randomly populates it with data. + * + * @param ddl The Schema configuration to apply to the source database. + * @param numBatches The number of mutation batches to write to the source database. Set to 0 for + * an empty database test. + */ + private void createAndPopulate(Ddl ddl, int numBatches) { + // Initialize the databases with the appropriate dialect dynamically. + sourceResourceManager = + SpannerResourceManager.builder(testName + "-source", PROJECT, "nam3", ddl.dialect()) + .build(); + destResourceManager = + SpannerResourceManager.builder(testName + "-dest", PROJECT, "nam3", ddl.dialect()).build(); + + // Execute the schema statements on the source database. 
+ // The destination database is intentionally left entirely empty (no tables) so the + // Import pipeline can completely reconstruct the schema on its own. + sourceResourceManager.executeDdlStatements(ddl.statements()); + destResourceManager.executeDdlStatements(Collections.emptyList()); + + if (numBatches > 0) { + final Iterator mutations = + new RandomInsertMutationGenerator(ddl).stream().iterator(); + + for (int i = 0; i < numBatches; i++) { + // We chunk the mutations into small batches of 10. + // This is strictly required to prevent exceeding Spanner's 100MB / 20k mutation limits, + // which would cause the integration test to crash on large random inserts. + List batchMutations = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + MutationGroup m = mutations.next(); + m.forEach(batchMutations::add); + } + sourceResourceManager.write(batchMutations); + } + } + } + + private void runTest() throws Exception { + runTest(Dialect.GOOGLE_STANDARD_SQL); + } + + /** + * Executes the end-to-end flow: 1. Launches Export Pipeline to write source DB to GCS. 2. + * Dynamically resolves the generated GCS path. 3. Launches Import Pipeline to read from GCS into + * the destination DB. 4. Validates that the schema and data match exactly. + * + * @param dialect The Spanner dialect being tested. + */ + private void runTest(Dialect dialect) throws Exception { + String outputDir = getGcsPath("output_" + testName + "/"); + + // ---------------------------------------------------------------------- + // 1. 
Run Export Pipeline + // ---------------------------------------------------------------------- + LaunchConfig.Builder exportConfig = + LaunchConfig.builder(testName, specPath) + .addParameter("instanceId", sourceResourceManager.getInstanceId()) + .addParameter("databaseId", sourceResourceManager.getDatabaseId()) + .addParameter("spannerProjectId", PROJECT) + .addParameter("outputDir", outputDir); + + LaunchInfo exportInfo = launchTemplate(exportConfig); + Result exportResult = pipelineOperator().waitUntilDone(createConfig(exportInfo)); + assertThat(exportResult).isEqualTo(Result.LAUNCH_FINISHED); + + // ---------------------------------------------------------------------- + // 2. Run Import Pipeline + // ---------------------------------------------------------------------- + Template importTemplate = ImportPipeline.class.getAnnotation(Template.class); + String importSpecPath = getSpecPath(ImportPipeline.class, importTemplate, "pom.xml"); + + // The Export pipeline generates a dynamic subdirectory name inside the outputDir. + // Instead of guessing this path (which can cause FileNotFound failures if the internal Dataflow + // Job IDs differ), we use the Artifact framework to fetch the exact Spanner JSON file. 
+ List artifacts = + gcsClient.listArtifacts("output_" + testName, Pattern.compile(".*spanner-export\\.json$")); + if (artifacts.isEmpty()) { + throw new IllegalStateException("No spanner-export.json found under " + outputDir); + } + String spannerExportPath = artifacts.get(0).name(); + + // We strip the filename itself off the artifact path to resolve the true target directory + String importInputDir = + "gs://" + + gcsClient.getBucket() + + "/" + + spannerExportPath.substring(0, spannerExportPath.indexOf("spanner-export.json")); + + LaunchConfig.Builder importConfig = + LaunchConfig.builder(testName, importSpecPath) + .addParameter("instanceId", destResourceManager.getInstanceId()) + .addParameter("databaseId", destResourceManager.getDatabaseId()) + .addParameter("spannerProjectId", PROJECT) + .addParameter("inputDir", importInputDir) + .addParameter("waitForIndexes", "true") + .addParameter("waitForForeignKeys", "true") + .addParameter("waitForChangeStreams", "true") + .addParameter("waitForSequences", "true"); + + LaunchInfo importInfo = launchTemplate(importConfig, importTemplate); + Result importResult = pipelineOperator().waitUntilDone(createConfig(importInfo)); + assertThat(importResult).isEqualTo(Result.LAUNCH_FINISHED); + + // ---------------------------------------------------------------------- + // 3. Schema & Data Assertions + // ---------------------------------------------------------------------- + // We are asserting that the schema of the source database matches the destination database + // exactly. To do this, we read the parsed, tabular metadata from both databases using + // InformationSchemaScanner and reconstruct them into Ddl object models. We then print these + // models canonically and compare their strings. + // + // IMPORTANT: We cannot use SpannerResourceManager.getDatabaseDdl() directly for this assertion. + // getDatabaseDdl() returns the raw strings used to create the tables. 
For randomly generated + // schemas, these strings contain un-normalized formatting (e.g., randomized WHERE column + // order). + // The Dataflow Export pipeline normalizes the schema (alphabetizes columns, etc.) when writing + // it to spanner-export.json, which the Import pipeline then executes. As a result, the source + // and destination would have functionally identical but textually mismatched DDL strings. + // + // The InformationSchemaScanner guarantees a true structural comparison without arbitrary + // text-formatting false positives. + Ddl destinationDdl = readDdl(destResourceManager, dialect); + Ddl sourceDdl = readDdl(sourceResourceManager, dialect); + + // Ensure the entire structural representation of the schemas (types, lengths, keys) is + // identical. + assertThat(destinationDdl.prettyPrint()).isEqualTo(sourceDdl.prettyPrint()); + + // Iterate through every single table and systematically ensure every row and column is + // identical. + for (Table table : destinationDdl.allTables()) { + List columnNames = + table.columns().stream() + .filter( + c -> + !(c.isGenerated() && !c.isStored()) + && !c.typeString().toUpperCase().contains("TOKENLIST")) + .map(Column::name) + .collect(Collectors.toList()); + + List sourceRecords = + sourceResourceManager.readTableRecords(table.name(), columnNames); + List destRecords = destResourceManager.readTableRecords(table.name(), columnNames); + + // assertThat(...).containsExactlyElementsIn ignores absolute ordering, + // which is required since distributed Spanner queries do not guarantee return order. 
+ assertThat(destRecords).containsExactlyElementsIn(sourceRecords); + } + } + + private Ddl readDdl(SpannerResourceManager resourceManager, Dialect dialect) { + DatabaseClient dbClient = resourceManager.getDatabaseClient(); + Ddl ddl; + try (ReadOnlyTransaction ctx = dbClient.readOnlyTransaction()) { + ddl = new InformationSchemaScanner(ctx, dialect).scan(); + } + return ddl; + } + + private void createAndPopulate(String sqlFile, Dialect dialect, int numBatches) throws Exception { + sourceResourceManager = + SpannerResourceManager.builder(testName + "-source", PROJECT, "nam3", dialect).build(); + destResourceManager = + SpannerResourceManager.builder(testName + "-dest", PROJECT, "nam3", dialect).build(); + + // Read the SQL statements from the static file + String ddlString = + String.join( + " ", + Resources.readLines(Resources.getResource(sqlFile), StandardCharsets.UTF_8).stream() + .map(line -> line.replaceAll("\\s*--.*$", "")) + .collect(ImmutableList.toImmutableList())); + ddlString = + ddlString + .trim() + .replaceAll("%PROJECT_ID%", PROJECT) + .replaceAll("%DATABASE_NAME%", sourceResourceManager.getDatabaseId()); + List ddlStatements = + Arrays.stream(ddlString.split(";")).filter(d -> !d.isBlank()).collect(Collectors.toList()); + + // Execute the schema statements on the source database. + sourceResourceManager.executeDdlStatements(ddlStatements); + destResourceManager.executeDdlStatements(Collections.emptyList()); + + if (numBatches > 0) { + // Use InformationSchemaScanner to dynamically extract the loaded schema into our Ddl object + Ddl ddl = readDdl(sourceResourceManager, dialect); + final Iterator mutations = + new RandomInsertMutationGenerator(ddl).stream().iterator(); + + for (int i = 0; i < numBatches; i++) { + // We chunk the mutations into small batches of 10. + // This is strictly required to prevent exceeding Spanner's 100MB / 20k mutation limits, + // which would cause the integration test to crash on large random inserts. 
+ List batchMutations = new ArrayList<>(); + for (int j = 0; j < 10; j++) { + MutationGroup m = mutations.next(); + m.forEach(batchMutations::add); + } + sourceResourceManager.write(batchMutations); + } + } + } + + @Test + public void testAllSchemaAndDataGsql() throws Exception { + createAndPopulate("CopyDbIT-gsql.sql", Dialect.GOOGLE_STANDARD_SQL, 100); + runTest(Dialect.GOOGLE_STANDARD_SQL); + } + + @Test + public void testAllSchemaAndDataPg() throws Exception { + createAndPopulate("CopyDbIT-pg.sql", Dialect.POSTGRESQL, 100); + runTest(Dialect.POSTGRESQL); + } + + @Test + public void testRandomSchemaAndDataGsql() throws Exception { + Ddl ddl = RandomDdlGenerator.builder().build().generate(); + createAndPopulate(ddl, 100); + runTest(Dialect.GOOGLE_STANDARD_SQL); + } + + @Test + public void testRandomSchemaAndDataPg() throws Exception { + Ddl ddl = RandomDdlGenerator.builder(Dialect.POSTGRESQL).build().generate(); + createAndPopulate(ddl, 100); + runTest(Dialect.POSTGRESQL); + } + + @Test + public void testEmptyDbGsql() throws Exception { + Ddl ddl = Ddl.builder(Dialect.GOOGLE_STANDARD_SQL).build(); + createAndPopulate(ddl, 0); + runTest(Dialect.GOOGLE_STANDARD_SQL); + } +} diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java b/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java deleted file mode 100644 index c1a64702e0..0000000000 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/CopyDbTest.java +++ /dev/null @@ -1,1356 +0,0 @@ -/* - * Copyright (C) 2018 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.spanner; - -import static org.hamcrest.Matchers.is; -import static org.hamcrest.text.IsEqualCompressingWhiteSpace.equalToCompressingWhiteSpace; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; - -import com.google.cloud.spanner.DatabaseClient; -import com.google.cloud.spanner.Dialect; -import com.google.cloud.spanner.ReadOnlyTransaction; -import com.google.cloud.teleport.spanner.common.Type; -import com.google.cloud.teleport.spanner.common.Type.StructField; -import com.google.cloud.teleport.spanner.ddl.Ddl; -import com.google.cloud.teleport.spanner.ddl.InformationSchemaScanner; -import com.google.cloud.teleport.spanner.ddl.RandomDdlGenerator; -import com.google.cloud.teleport.spanner.ddl.Udf.SqlSecurity; -import com.google.cloud.teleport.spanner.ddl.UdfParameter; -import com.google.cloud.teleport.spanner.proto.ExportProtos.Export; -import com.google.cloud.teleport.spanner.spannerio.SpannerConfig; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.ValueProvider; -import org.apache.beam.sdk.testing.PAssert; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.values.PCollection; -import org.junit.After; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TemporaryFolder; - -/** - * An end to end test that exports and imports a database and verifies that the content is - * identical. Additionally, this test verifies the behavior of table level export. This requires an - * active GCP project with a Spanner instance. 
Hence this test can only be run locally with a - * project set up using 'gcloud config'. - */ -@Category(IntegrationTest.class) -public class CopyDbTest { - private final Timestamp timestamp = new Timestamp(System.currentTimeMillis()); - private final long numericTime = timestamp.getTime(); - private final String sourceDb = "copydb-source" + Long.toString(numericTime); - private final String destinationDb = "copydb-dest" + Long.toString(numericTime); - private final String destDbPrefix = "import"; - - @Rule public final transient TestPipeline exportPipeline = TestPipeline.create(); - @Rule public final transient TestPipeline importPipeline = TestPipeline.create(); - @Rule public final transient TestPipeline comparePipeline = TestPipeline.create(); - @Rule public final TemporaryFolder tmpDir = new TemporaryFolder(); - @Rule public final SpannerServerResource spannerServer = new SpannerServerResource(); - - @After - public void teardown() { - spannerServer.dropDatabase(sourceDb); - spannerServer.dropDatabase(destinationDb); - } - - private void createAndPopulate(Ddl ddl, int numBatches) throws Exception { - switch (ddl.dialect()) { - case GOOGLE_STANDARD_SQL: - spannerServer.createDatabase(sourceDb, ddl.statements()); - spannerServer.createDatabase(destinationDb, Collections.emptyList()); - break; - case POSTGRESQL: - spannerServer.createPgDatabase(sourceDb, ddl.statements()); - spannerServer.createPgDatabase(destinationDb, Collections.emptyList()); - break; - default: - throw new IllegalArgumentException("Unrecognized dialect: " + ddl.dialect()); - } - spannerServer.populateRandomData(sourceDb, ddl, numBatches); - } - - @Test - public void allTypesSchema() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - 
.createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable() - .build(); - // spotless:on - createAndPopulate(ddl, 100); - runTest(); - } - - @Test - public void allPgTypesSchema() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("Users") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name") - .pgVarchar() - .size(5) - .endColumn() - .column("age") - .pgInt8() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .end() - .endTable() - .createTable("AllTYPES") - .column("id") - .pgInt8() - .notNull() - .endColumn() - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name") - .pgVarchar() - .size(5) - .endColumn() - 
.column("bool_field") - .pgBool() - .endColumn() - .column("int_field") - .pgInt8() - .endColumn() - .column("float32_field") - .pgFloat4() - .endColumn() - .column("float64_field") - .pgFloat8() - .endColumn() - .column("string_field") - .pgText() - .endColumn() - .column("bytes_field") - .pgBytea() - .endColumn() - .column("timestamp_field") - .pgTimestamptz() - .endColumn() - .column("numeric_field") - .pgNumeric() - .endColumn() - .column("date_field") - .pgDate() - .endColumn() - .column("arr_bool_field") - .type(Type.pgArray(Type.pgBool())) - .endColumn() - .column("arr_int_field") - .type(Type.pgArray(Type.pgInt8())) - .endColumn() - .column("arr_float32_field") - .type(Type.pgArray(Type.pgFloat4())) - .endColumn() - .column("arr_float64_field") - .type(Type.pgArray(Type.pgFloat8())) - .endColumn() - .column("arr_string_field") - .type(Type.pgArray(Type.pgVarchar())) - .max() - .endColumn() - .column("arr_bytes_field") - .type(Type.pgArray(Type.pgBytea())) - .max() - .endColumn() - .column("arr_timestamp_field") - .type(Type.pgArray(Type.pgTimestamptz())) - .endColumn() - .column("arr_date_field") - .type(Type.pgArray(Type.pgDate())) - .endColumn() - .column("arr_numeric_field") - .type(Type.pgArray(Type.pgNumeric())) - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .asc("id") - .asc("float64_field") - .end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable() - .build(); - // spotless:on - createAndPopulate(ddl, 100); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void emptyTables() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - 
.column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - .column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .endTable() - .build(); - createAndPopulate(ddl, 10); - - // Add empty tables. 
- Ddl emptyTables = Ddl.builder() - .createTable("empty_one") - .column("first").string().max().endColumn() - .column("second").string().size(5).endColumn() - .column("value").int64().endColumn() - .primaryKey().asc("first").desc("second").end() - .endTable() - .createTable("empty_two") - .column("first").string().max().endColumn() - .column("second").string().size(5).endColumn() - .column("value").int64().endColumn() - .column("another_value").int64().endColumn() - .primaryKey().asc("first").end() - .endTable() - .build(); - // spotless:on - spannerServer.updateDatabase(sourceDb, emptyTables.createTableStatements()); - runTest(); - } - - @Test - public void emptyPgTables() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("Users") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name").pgVarchar().size(5).endColumn() - .column("age") - .pgInt8() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .end() - .endTable() - .createTable("AllTYPES") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name").pgVarchar().size(5).endColumn() - .column("id") - .pgInt8() - .notNull() - .endColumn() - .column("bool_field") - .pgBool() - .endColumn() - .column("int_field") - .pgInt8() - .endColumn() - .column("float32_field") - .pgFloat4() - .endColumn() - .column("float64_field") - .pgFloat8() - .endColumn() - .column("string_field") - .pgText() - .endColumn() - .column("bytes_field") - .pgBytea() - .endColumn() - .column("timestamp_field") - .pgTimestamptz() - .endColumn() - .column("numeric_field") - .pgNumeric() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .asc("id") - .end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable() - .build(); - createAndPopulate(ddl, 10); - - // Add empty tables. 
- Ddl emptyTables = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("empty_one") - .column("first") - .pgVarchar() - .max() - .endColumn() - .column("second").pgVarchar().size(5).endColumn() - .column("value") - .pgInt8() - .endColumn() - .primaryKey() - .asc("first") - .asc("second") - .end() - .endTable() - .createTable("empty_two") - .column("first") - .pgVarchar() - .max() - .endColumn() - .column("second").pgVarchar().size(5).endColumn() - .column("value") - .pgInt8() - .endColumn() - .column("another_value") - .pgInt8() - .endColumn() - .primaryKey() - .asc("first") - .end() - .endTable() - .build(); - // spotless:on - spannerServer.updateDatabase(sourceDb, emptyTables.createTableStatements()); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void allEmptyTables() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("Users") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("AllTYPES") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("bool_field").bool().endColumn() - .column("int64_field").int64().endColumn() - .column("float32_field").float32().endColumn() - .column("float64_field").float64().endColumn() - .column("string_field").string().max().endColumn() - .column("bytes_field").bytes().max().endColumn() - .column("timestamp_field").timestamp().endColumn() - .column("date_field").date().endColumn() - .column("arr_bool_field").type(Type.array(Type.bool())).endColumn() - .column("arr_int64_field").type(Type.array(Type.int64())).endColumn() - .column("arr_float32_field").type(Type.array(Type.float32())).endColumn() - .column("arr_float64_field").type(Type.array(Type.float64())).endColumn() - 
.column("arr_string_field").type(Type.array(Type.string())).max().endColumn() - .column("arr_bytes_field").type(Type.array(Type.bytes())).max().endColumn() - .column("arr_timestamp_field").type(Type.array(Type.timestamp())).endColumn() - .column("arr_date_field").type(Type.array(Type.date())).endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .endTable() - .build(); - // spotless:on - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void allEmptyPgTables() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("Users") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name") - .pgVarchar() - .size(5) - .endColumn() - .column("age") - .pgInt8() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .end() - .endTable() - .createTable("AllTYPES") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name") - .pgVarchar() - .size(5) - .endColumn() - .column("id") - .pgInt8() - .notNull() - .endColumn() - .column("bool_field") - .pgBool() - .endColumn() - .column("int_field") - .pgInt8() - .endColumn() - .column("float32_field") - .pgFloat4() - .endColumn() - .column("float64_field") - .pgFloat8() - .endColumn() - .column("string_field") - .pgText() - .endColumn() - .column("bytes_field") - .pgBytea() - .endColumn() - .column("timestamp_field") - .pgTimestamptz() - .endColumn() - .column("numeric_field") - .pgNumeric() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .asc("id") - .end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable() - .build(); - // spotless:on - createAndPopulate(ddl, 0); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void databaseOptions() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(); - // Table Content - // spotless:off - ddlBuilder.createTable("Users") - 
.column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("age").int64().endColumn() - .primaryKey().asc("first_name").desc("last_name").end() - .endTable() - .createTable("EmploymentData") - .column("first_name").string().max().endColumn() - .column("last_name").string().size(5).endColumn() - .column("id").int64().notNull().endColumn() - .column("age").int64().endColumn() - .column("address").string().max().endColumn() - .primaryKey().asc("first_name").desc("last_name").asc("id").end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable(); - // spotless:on - // Allowed and well-formed database option - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("version_retention_period") - .setOptionValue("\"6d\"") - .build()); - // Disallowed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("optimizer_version") - .setOptionValue("1") - .build()); - // Malformed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("123version") - .setOptionValue("xyz") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder.build(); - createAndPopulate(ddl, 100); - runTest(); - Ddl destinationDdl = readDdl(destinationDb, Dialect.GOOGLE_STANDARD_SQL); - List destDbOptions = destinationDdl.setOptionsStatements(destinationDb); - assertThat(destDbOptions.size(), is(1)); - assertThat( - destDbOptions.get(0), - is( - "ALTER DATABASE `" - + destinationDb - + "` SET OPTIONS ( version_retention_period = \"6d\" )")); - } - - @Test - public void pgDatabaseOptions() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); - // Table Content - // spotless:off - ddlBuilder - .createTable("Users") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name").pgVarchar().size(5).endColumn() - .column("age") - .pgInt8() - 
.endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .end() - .endTable() - .createTable("EmploymentData") - .column("first_name") - .pgVarchar() - .max() - .endColumn() - .column("last_name").pgVarchar().size(5).endColumn() - .column("id") - .pgInt8() - .notNull() - .endColumn() - .column("age") - .pgInt8() - .endColumn() - .column("address") - .pgVarchar() - .max() - .endColumn() - .primaryKey() - .asc("first_name") - .asc("last_name") - .asc("id") - .end() - .interleaveInParent("Users") - .onDeleteCascade() - .endTable(); - // spotless:on - // Allowed and well-formed database option - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("version_retention_period") - .setOptionValue("'6d'") - .build()); - // Disallowed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("optimizer_version") - .setOptionValue("1") - .build()); - // Malformed database option - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("123version") - .setOptionValue("xyz") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder.build(); - createAndPopulate(ddl, 100); - runTest(Dialect.POSTGRESQL); - Ddl destinationDdl = readDdl(destinationDb, Dialect.POSTGRESQL); - List destDbOptions = destinationDdl.setOptionsStatements(destinationDb); - assertThat(destDbOptions.size(), is(1)); - assertThat( - destDbOptions.get(0), - is("ALTER DATABASE \"" + destinationDb + "\" SET spanner.version_retention_period = '6d'")); - } - - @Test - public void emptyDb() throws Exception { - Ddl ddl = Ddl.builder().build(); - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void emptyPgDb() throws Exception { - Ddl ddl = Ddl.builder(Dialect.POSTGRESQL).build(); - createAndPopulate(ddl, 0); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void foreignKeys() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - 
.createTable("Ref") - .column("id1").int64().endColumn() - .column("id2").int64().endColumn() - .primaryKey().asc("id1").asc("id2").end() - .endTable() - .createTable("Child") - .column("id1").int64().endColumn() - .column("id2").int64().endColumn() - .column("id3").int64().endColumn() - .primaryKey().asc("id1").asc("id2").asc("id3").end() - .interleaveInParent("Ref") - // Add some foreign keys that are guaranteed to be satisfied due to interleaving - .foreignKeys(ImmutableList.of( - "ALTER TABLE `Child` ADD CONSTRAINT `fk1` FOREIGN KEY (`id1`) REFERENCES `Ref` (`id1`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk2` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk3` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk4` FOREIGN KEY (`id2`, `id1`) REFERENCES `Ref` (`id2`, `id1`)", - "ALTER TABLE `Child` ADD CONSTRAINT `fk5` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) NOT ENFORCED", - "ALTER TABLE `Child` ADD CONSTRAINT `fk6` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) ENFORCED")) - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 100); - runTest(); - } - - @Test - public void pgForeignKeys() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("Ref") - .column("id1") - .pgInt8() - .endColumn() - .column("id2") - .pgInt8() - .endColumn() - .primaryKey() - .asc("id1") - .asc("id2") - .end() - .endTable() - .createTable("Child") - .column("id1") - .pgInt8() - .endColumn() - .column("id2") - .pgInt8() - .endColumn() - .column("id3") - .pgInt8() - .endColumn() - .primaryKey() - .asc("id1") - .asc("id2") - .asc("id3") - .end() - .interleaveInParent("Ref") - // Add some foreign keys that are guaranteed to be satisfied due to interleaving - .foreignKeys( - ImmutableList.of( - "ALTER TABLE \"Child\" ADD CONSTRAINT \"fk1\" FOREIGN KEY (\"id1\") REFERENCES" - + " \"Ref\" (\"id1\")", - "ALTER TABLE \"Child\" ADD CONSTRAINT \"fk2\" FOREIGN 
KEY (\"id2\") REFERENCES" - + " \"Ref\" (\"id2\")", - "ALTER TABLE \"Child\" ADD CONSTRAINT \"fk3\" FOREIGN KEY (\"id2\") REFERENCES" - + " \"Ref\" (\"id2\")", - "ALTER TABLE \"Child\" ADD CONSTRAINT \"fk4\" FOREIGN KEY (\"id2\", \"id1\") " - + "REFERENCES \"Ref\" (\"id2\", \"id1\")")) - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 100); - runTest(Dialect.POSTGRESQL); - } - - // TODO: enable this test once CHECK constraints are enabled - // @Test - public void checkConstraints() throws Exception { - // spotless:off - Ddl ddl = Ddl.builder() - .createTable("T") - .column("id").int64().endColumn() - .column("A").int64().endColumn() - .primaryKey().asc("id").end() - .checkConstraints(ImmutableList.of( - "CONSTRAINT `ck` CHECK(TO_HEX(SHA1(CAST(A AS STRING))) <= '~')")) - .endTable().build(); - // spotless:on - - createAndPopulate(ddl, 100); - runTest(); - } - - @Test - public void pgCheckConstraints() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("T") - .column("id") - .pgInt8() - .endColumn() - .column("A") - .pgInt8() - .endColumn() - .primaryKey() - .asc("id") - .end() - .checkConstraints( - ImmutableList.of( - "CONSTRAINT \"ck\" CHECK(LENGTH(CAST(\"A\" AS VARCHAR)) >= '0'::bigint)")) - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 100); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void models() throws Exception { - // spotless:off - Ddl ddl = - Ddl.builder() - .createModel("Iris") - .remote(true) - .options(ImmutableList.of( - "endpoint=\"//aiplatform.googleapis.com/projects/span-cloud-testing/locations/us-central1/endpoints/4608339105032437760\"")) - .inputColumn("f1").type(Type.float64()).size(-1).endInputColumn() - .inputColumn("f2").type(Type.float64()).size(-1).endInputColumn() - .inputColumn("f3").type(Type.float64()).size(-1).endInputColumn() - .inputColumn("f4").type(Type.float64()).size(-1).endInputColumn() - 
.outputColumn("classes").type(Type.array(Type.string())).size(-1).endOutputColumn() - .outputColumn("scores").type(Type.array(Type.float64())).size(-1).endOutputColumn() - .endModel() - .createModel("TextEmbeddingGecko") - .remote(true) - .options(ImmutableList.of( - "endpoint=\"//aiplatform.googleapis.com/projects/span-cloud-testing/locations/us-central1/publishers/google/models/textembedding-gecko\"")) - .inputColumn("content").type(Type.string()).size(-1).endInputColumn() - .outputColumn("embeddings").type(Type.struct( - StructField.of("statistics", Type.struct(StructField.of("truncated", Type.bool()), - StructField.of("token_count", Type.float64()))), - StructField.of("values", Type.array(Type.float64())))).size(-1).endOutputColumn() - .endModel() - .build(); - // spotless:on - - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void changeStreams() throws Exception { - Ddl ddl = - Ddl.builder() - .createTable("T1") - .endTable() - .createTable("T2") - .column("key") - .int64() - .endColumn() - .column("c1") - .int64() - .endColumn() - .column("c2") - .string() - .max() - .endColumn() - .primaryKey() - .asc("key") - .end() - .endTable() - .createTable("T3") - .endTable() - .createChangeStream("ChangeStreamAll") - .forClause("FOR ALL") - .options( - ImmutableList.of( - "retention_period=\"7d\"", "value_capture_type=\"OLD_AND_NEW_VALUES\"")) - .endChangeStream() - .createChangeStream("ChangeStreamEmpty") - .endChangeStream() - .createChangeStream("ChangeStreamTableColumns") - .forClause("FOR `T1`, `T2`(`c1`, `c2`), `T3`()") - .endChangeStream() - .build(); - createAndPopulate(ddl, 0); - runTest(); - } - - // TODO: Enable the test once change streams are supported in PG. 
- // @Test - public void pgChangeStreams() throws Exception { - Ddl ddl = - Ddl.builder(Dialect.POSTGRESQL) - .createTable("T1") - .column("key") - .pgInt8() - .endColumn() - .primaryKey() - .asc("key") - .end() - .endTable() - .createTable("T2") - .column("key") - .pgInt8() - .endColumn() - .column("c1") - .pgInt8() - .endColumn() - .column("c2") - .pgVarchar() - .max() - .endColumn() - .primaryKey() - .asc("key") - .end() - .endTable() - .createTable("T3") - .column("key") - .pgInt8() - .endColumn() - .primaryKey() - .asc("key") - .end() - .endTable() - .createChangeStream("ChangeStreamAll") - .forClause("FOR ALL") - .options( - ImmutableList.of( - "retention_period='7d'", "value_capture_type='OLD_AND_NEW_VALUES'")) - .endChangeStream() - .createChangeStream("ChangeStreamEmpty") - .endChangeStream() - .createChangeStream("ChangeStreamTableColumns") - .forClause("FOR \"T1\", \"T2\"(\"c1\", \"c2\"), \"T3\"()") - .endChangeStream() - .build(); - createAndPopulate(ddl, 0); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void identityColumn() throws Exception { - // spotless:off - Ddl.Builder ddlBuilder = Ddl.builder(); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder - .createTable("IdentityTable") - .column("id") - .int64() - .isIdentityColumn(true) - .sequenceKind("bit_reversed_positive") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - .endColumn() - .column("non_key_column") - .int64() - .isIdentityColumn(true) - .sequenceKind("bit_reversed_positive") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - .endColumn() - .column("no_sequence_kind_column") - .int64() - .isIdentityColumn(true) - .sequenceKind("default") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - 
.endColumn() - .column("value").int64().endColumn() - .primaryKey().asc("id").end() - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 10); - runTest(); - } - - @Test - public void pgIdentityColumn() throws Exception { - // spotless:off - Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder - .createTable("IdentityTable") - .column("id") - .int64() - .isIdentityColumn(true) - .sequenceKind("bit_reversed_positive") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - .endColumn() - .column("non_key_column") - .int64() - .isIdentityColumn(true) - .sequenceKind("bit_reversed_positive") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - .endColumn() - .column("no_sequence_kind_column") - .int64() - .isIdentityColumn(true) - .sequenceKind("default") - .counterStartValue(1000L) - .skipRangeMin(2000L) - .skipRangeMax(3000L) - .endColumn() - .column("value").int64().endColumn() - .primaryKey().asc("id").end() - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 10); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void commitTimestampColumns() throws Exception { - // spotless:off - Ddl.Builder ddlBuilder = Ddl.builder(); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder - .createTable("CommitTimestampTable") - .column("id") - .int64() - .endColumn() - .column("default_commit_ts") - .type(Type.timestamp()) - .defaultExpression("PENDING_COMMIT_TIMESTAMP()") - 
.columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE")) - .endColumn() - .column("on_update_ts") - .type(Type.timestamp()) - .defaultExpression("PENDING_COMMIT_TIMESTAMP()") - .onUpdateExpression("PENDING_COMMIT_TIMESTAMP()") - .columnOptions(ImmutableList.of("allow_commit_timestamp=TRUE")) - .endColumn() - .primaryKey().asc("id").end() - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 10); - runTest(); - } - - @Test - public void pgCommitTimestampColumns() throws Exception { - // spotless:off - Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = ddlBuilder - .createTable("CommitTimestampTable") - .column("id") - .int64() - .endColumn() - .column("default_commit_ts") - .pgSpannerCommitTimestamp() - .defaultExpression("spanner.pending_commit_timestamp()") - .endColumn() - .column("on_update_ts") - .pgSpannerCommitTimestamp() - .defaultExpression("spanner.pending_commit_timestamp()") - .onUpdateExpression("spanner.pending_commit_timestamp()") - .endColumn() - .primaryKey().asc("id").end() - .endTable() - .build(); - // spotless:on - - createAndPopulate(ddl, 10); - runTest(); - } - - @Test - public void udfs() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = - ddlBuilder - .createSchema("s1") - .endNamedSchema() - .createUdf("s1.Foo1") - .dialect(Dialect.GOOGLE_STANDARD_SQL) - .name("s1.Foo1") - .definition("(SELECT 'bar')") - .endUdf() - .createUdf("s1.Foo2") - .dialect(Dialect.GOOGLE_STANDARD_SQL) - 
.name("s1.Foo2") - .definition("(SELECT 'bar')") - .security(SqlSecurity.INVOKER) - .type("STRING") - .addParameter(UdfParameter.parse("arg0 STRING", "s1.Foo2", Dialect.GOOGLE_STANDARD_SQL)) - .addParameter( - UdfParameter.parse( - "arg1 STRING DEFAULT 'bar'", "s1.Foo2", Dialect.GOOGLE_STANDARD_SQL)) - .endUdf() - .createUdf("s1.Foo3") - .dialect(Dialect.GOOGLE_STANDARD_SQL) - .name("s1.Foo3") - .language("REMOTE") - .type("INT64") - .addParameter(UdfParameter.parse("arg0 INT64", "s1.Foo3", Dialect.GOOGLE_STANDARD_SQL)) - .options( - ImmutableList.of( - "endpoint=\"https://us-central1-myproject.cloudfunctions.net/myfunc\"")) - .endUdf() - .build(); - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void pgUdfs() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = - ddlBuilder - .createSchema("s1") - .endNamedSchema() - .createUdf("s1.Foo1") - .dialect(Dialect.POSTGRESQL) - .name("s1.Foo1") - .definition("(SELECT 'bar')") - .endUdf() - .createUdf("s1.Foo2") - .dialect(Dialect.POSTGRESQL) - .name("s1.Foo2") - .definition("(SELECT 'bar')") - .security(SqlSecurity.INVOKER) - .type("TEXT") - .addParameter(UdfParameter.parse("arg0 TEXT", "s1.Foo2", Dialect.POSTGRESQL)) - .addParameter( - UdfParameter.parse("arg1 TEXT DEFAULT 'bar'", "s1.Foo2", Dialect.POSTGRESQL)) - .endUdf() - .createUdf("s1.Foo3") - .dialect(Dialect.POSTGRESQL) - .name("s1.Foo2") - .language("REMOTE") - .type("BIGINT") - .addParameter(UdfParameter.parse("arg0 BIGINT", "s1.Foo3", Dialect.POSTGRESQL)) - .definition( - "{\"endpoint\": \"https://us-central1-myproject.cloudfunctions.net/myfunc\"}") - .endUdf() - .build(); - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void sequences() throws 
Exception { - Ddl.Builder ddlBuilder = Ddl.builder(); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = - ddlBuilder - .createSequence("Sequence1") - .options( - ImmutableList.of( - "sequence_kind=\"bit_reversed_positive\"", - "skip_range_min=0", - "skip_range_max=1000", - "start_with_counter=50")) - .endSequence() - .createSequence("Sequence2") - .options( - ImmutableList.of( - "sequence_kind=\"bit_reversed_positive\"", "start_with_counter=9999")) - .endSequence() - .createSequence("Sequence3") - .options(ImmutableList.of("sequence_kind=\"bit_reversed_positive\"")) - .endSequence() - .createSequence("Sequence4") - .options( - ImmutableList.of( - "sequence_kind=\"default\"", - "skip_range_min=0", - "skip_range_max=1000", - "start_with_counter=50")) - .endSequence() - .createTable("UsersWithSequenceId") - .column("id") - .int64() - .notNull() - .defaultExpression("GET_NEXT_SEQUENCE_VALUE(SEQUENCE Sequence3)") - .endColumn() - .column("first_name") - .string() - .size(10) - .endColumn() - .primaryKey() - .asc("id") - .end() - .endTable() - .build(); - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void pgSequences() throws Exception { - Ddl.Builder ddlBuilder = Ddl.builder(Dialect.POSTGRESQL); - List dbOptionList = new ArrayList<>(); - dbOptionList.add( - Export.DatabaseOption.newBuilder() - .setOptionName("default_sequence_kind") - .setOptionValue("\"bit_reversed_positive\"") - .build()); - ddlBuilder.mergeDatabaseOptions(dbOptionList); - Ddl ddl = - ddlBuilder - .createSequence("PGSequence1") - .sequenceKind("bit_reversed_positive") - .counterStartValue(Long.valueOf(50)) - .skipRangeMin(Long.valueOf(0)) - .skipRangeMax(Long.valueOf(1000)) - .endSequence() - .createSequence("PGSequence2") - .sequenceKind("bit_reversed_positive") - 
.counterStartValue(Long.valueOf(9999)) - .endSequence() - .createSequence("PGSequence3") - .sequenceKind("bit_reversed_positive") - .endSequence() - .createSequence("PGSequence4") - .sequenceKind("default") - .counterStartValue(Long.valueOf(50)) - .skipRangeMin(Long.valueOf(0)) - .skipRangeMax(Long.valueOf(1000)) - .endSequence() - .createTable("PGUsersWithSequenceId") - .column("id") - .pgInt8() - .notNull() - .defaultExpression("nextval('\"PGSequence3\"')") - .endColumn() - .column("first_name") - .pgVarchar() - .size(10) - .endColumn() - .primaryKey() - .asc("id") - .end() - .endTable() - .build(); - - createAndPopulate(ddl, 0); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void randomSchema() throws Exception { - Ddl ddl = RandomDdlGenerator.builder().build().generate(); - createAndPopulate(ddl, 100); - runTest(); - } - - @Test - public void randomPgSchema() throws Exception { - Ddl ddl = RandomDdlGenerator.builder(Dialect.POSTGRESQL).setMaxViews(2).build().generate(); - System.out.println(ddl.prettyPrint()); - createAndPopulate(ddl, 100); - runTest(Dialect.POSTGRESQL); - } - - @Test - public void randomSchemaNoData() throws Exception { - Ddl ddl = RandomDdlGenerator.builder().build().generate(); - createAndPopulate(ddl, 0); - runTest(); - } - - @Test - public void randomPgSchemaNoData() throws Exception { - Ddl ddl = RandomDdlGenerator.builder(Dialect.POSTGRESQL).setMaxViews(2).build().generate(); - createAndPopulate(ddl, 0); - runTest(Dialect.POSTGRESQL); - } - - private void runTest() { - runTest(Dialect.GOOGLE_STANDARD_SQL); - } - - private void runTest(Dialect dialect) { - String tmpDirPath = tmpDir.getRoot().getAbsolutePath(); - ValueProvider.StaticValueProvider destination = - ValueProvider.StaticValueProvider.of(tmpDirPath); - ValueProvider.StaticValueProvider jobId = ValueProvider.StaticValueProvider.of("jobid"); - ValueProvider.StaticValueProvider source = - ValueProvider.StaticValueProvider.of(tmpDirPath + "/jobid"); - - SpannerConfig 
sourceConfig = spannerServer.getSpannerConfig(sourceDb); - exportPipeline.apply("Export", new ExportTransform(sourceConfig, destination, jobId)); - PipelineResult exportResult = exportPipeline.run(); - exportResult.waitUntilFinish(); - - SpannerConfig destConfig = spannerServer.getSpannerConfig(destinationDb); - importPipeline.apply( - "Import", - new ImportTransform( - destConfig, - source, - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(true), - ValueProvider.StaticValueProvider.of(30), - ValueProvider.StaticValueProvider.of(40))); - PipelineResult importResult = importPipeline.run(); - importResult.waitUntilFinish(); - - PCollection mismatchCount = - comparePipeline.apply("Compare", new CompareDatabases(sourceConfig, destConfig)); - PAssert.that(mismatchCount) - .satisfies( - (x) -> { - assertEquals(Lists.newArrayList(x), Lists.newArrayList(0L)); - return null; - }); - PipelineResult compareResult = comparePipeline.run(); - compareResult.waitUntilFinish(); - - Ddl sourceDdl = readDdl(sourceDb, dialect); - Ddl destinationDdl = readDdl(destinationDb, dialect); - - assertThat(sourceDdl.prettyPrint(), equalToCompressingWhiteSpace(destinationDdl.prettyPrint())); - } - - /* Returns the Ddl representing a Spanner database for given a String for the database name */ - private Ddl readDdl(String db, Dialect dialect) { - DatabaseClient dbClient = spannerServer.getDbClient(db); - Ddl ddl; - try (ReadOnlyTransaction ctx = dbClient.readOnlyTransaction()) { - ddl = new InformationSchemaScanner(ctx, dialect).scan(); - } - return ddl; - } -} diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java index 92ee8da0e5..dc8cd8a68f 100644 --- 
a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java @@ -64,6 +64,11 @@ * Test coverage for {@link InformationSchemaScanner}. This requires an active GCP project with a * Spanner instance. Hence this test can only be run locally with a project set up using 'gcloud * config'. + * + *

Note: {@link InformationSchemaScanner} and the resulting {@link Ddl} objects are actively used + * for canonical schema assertions in the end-to-end {@link CopyDbIT} integration test. Testing it + * comprehensively here is extremely important to complete the loop of testing the Export and Import + * Pipelines. */ @Category(IntegrationTest.class) public class InformationSchemaScannerIT { diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java index 0e9ab7bd52..a63e5ef59e 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomDdlGenerator.java @@ -158,7 +158,7 @@ public static Builder builder(Dialect dialect) { .setMaxViews(0) .setMaxIndex(2) .setMaxForeignKeys(2) - .setEnableCheckConstraints(true) + .setEnableCheckConstraints(false) .setMaxColumns(8) .setMaxIdLength(11) .setEnableGeneratedColumns(true) @@ -472,7 +472,8 @@ private void generateTable(Ddl.Builder builder, Table parent, int level) { if (interleaved) { columns.set(pk); if (rnd.nextBoolean()) { - filters.add("\"" + pk.name() + "\" IS NOT NULL"); + String q = getDialect() == Dialect.POSTGRESQL ? "\"" : "`"; + filters.add(q + pk.name() + q + " IS NOT NULL"); } } pks.add(pk.name()); @@ -534,7 +535,8 @@ private void generateTable(Ddl.Builder builder, Table parent, int level) { } columns.endIndexColumn(); if (rnd.nextBoolean()) { - filters.add("\"" + columnName + "\" IS NOT NULL"); + String q = (getDialect() == Dialect.POSTGRESQL) ? 
"\"" : "`"; + filters.add(q + columnName + q + " IS NOT NULL"); } } } @@ -562,11 +564,17 @@ private void generateTable(Ddl.Builder builder, Table parent, int level) { foreignKeyBuilder.columnsBuilder().add(pk.name()); foreignKeyBuilder.referencedColumnsBuilder().add(pk.name()); } + boolean isEnforced = true; if (rnd.nextBoolean()) { - foreignKeyBuilder.referentialAction(Optional.of(generateRandomReferentialAction(rnd))); + isEnforced = rnd.nextBoolean(); + foreignKeyBuilder.isEnforced(isEnforced); } if (rnd.nextBoolean()) { - foreignKeyBuilder.isEnforced(rnd.nextBoolean()); + ReferentialAction action = generateRandomReferentialAction(rnd); + if (!isEnforced && action == ReferentialAction.ON_DELETE_CASCADE) { + action = ReferentialAction.ON_DELETE_NO_ACTION; + } + foreignKeyBuilder.referentialAction(Optional.of(action)); } ForeignKey foreignKey = foreignKeyBuilder.build(); if (foreignKey.columns().size() > 0) { diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java index 978af58e44..15b3cfc23e 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomInsertMutationGenerator.java @@ -38,6 +38,8 @@ import java.util.Random; import java.util.Set; import java.util.function.Supplier; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Stream; /** @@ -148,19 +150,106 @@ public TableSupplier(Table table) { this.table = checkNotNull(table); RandomValueGenerator randomValueGenerator = RandomValueGenerator.defaultInstance(); Dialect dialect = table.dialect(); - Set primaryKeyNameSet = new HashSet<>(); - if (dialect == Dialect.POSTGRESQL) { - for (IndexColumn primaryKey : table.primaryKeys()) { - primaryKeyNameSet.add(primaryKey.name()); + Set pkNameSet = new HashSet<>(); + Set notNullNameSet 
= new HashSet<>(); + for (IndexColumn primaryKey : table.primaryKeys()) { + pkNameSet.add(primaryKey.name().toLowerCase()); + notNullNameSet.add(primaryKey.name().toLowerCase()); + } + + // Infer unique indexes to avoid null duplicates + for (String indexStr : table.indexes()) { + if (indexStr.toLowerCase().contains("unique ")) { + for (Column column : table.columns()) { + if (indexStr.toLowerCase().contains(column.name().toLowerCase())) { + notNullNameSet.add(column.name().toLowerCase()); + } + } + } + } + + // Infer check constraints to pick from allowed IN clause values + Map> checkConstraintValues = new HashMap<>(); + for (String checkStr : table.checkConstraints()) { + for (Column column : table.columns()) { + if (checkStr.toLowerCase().contains(column.name().toLowerCase())) { + Matcher matcher = Pattern.compile("'([^']*)'").matcher(checkStr); + List values = new ArrayList<>(); + while (matcher.find()) { + values.add(matcher.group(1)); + } + if (!values.isEmpty()) { + checkConstraintValues.put(column.name().toLowerCase(), values); + } + } } } + + Random rand = new Random(); for (Column column : table.columns()) { if (!column.isGenerated() && !column.isIdentityColumn()) { - valueGenerators.put( - column.name(), - randomValueGenerator - .valueStream(column, primaryKeyNameSet.contains(column.name())) - .iterator()); + Iterator iterator; + String colNameLower = column.name().toLowerCase(); + + if (checkConstraintValues.containsKey(colNameLower)) { + List allowed = checkConstraintValues.get(colNameLower); + if (pkNameSet.contains(colNameLower)) { + iterator = + Stream.generate(() -> Value.string(allowed.get(rand.nextInt(allowed.size())))) + .iterator(); + } else if (notNullNameSet.contains(colNameLower)) { + iterator = + new Iterator() { + int index = 0; + + @Override + public boolean hasNext() { + return true; + } + + @Override + public Value next() { + if (index < allowed.size()) { + return Value.string(allowed.get(index++)); + } + // Return null when exhausted 
to avoid unique index collision + return Value.string(null); + } + }; + } else { + iterator = + Stream.generate(() -> Value.string(allowed.get(rand.nextInt(allowed.size())))) + .iterator(); + } + } else if (notNullNameSet.contains(colNameLower) + && (column.type().getCode() + == com.google.cloud.teleport.spanner.common.Type.Code.STRING + || column.type().getCode() + == com.google.cloud.teleport.spanner.common.Type.Code.PG_VARCHAR + || column.type().getCode() + == com.google.cloud.teleport.spanner.common.Type.Code.PG_TEXT)) { + // Ensure unique strings for unique indexes to avoid collision + iterator = + Stream.generate( + () -> { + String base = + column.name() + "_" + rand.nextInt(100000) + rand.nextInt(100000); + if (column.size() != null + && column.size() > 0 + && base.length() > column.size()) { + base = base.substring(0, column.size()); + } + return Value.string(base); + }) + .iterator(); + } else { + iterator = + randomValueGenerator + .valueStream(column, notNullNameSet.contains(colNameLower)) + .iterator(); + } + + valueGenerators.put(column.name(), iterator); } } } @@ -204,6 +293,18 @@ public Mutation generateMutation(Map overrides) { String string = value.isNull() ? null : value.getString(); builder.set(columnName).to(string); break; + case JSON: + case PG_JSONB: + String json = value.isNull() ? null : value.getString(); + if (value.getType().getCode() == com.google.cloud.spanner.Type.Code.JSON) { + builder.set(columnName).to(com.google.cloud.spanner.Value.json(json)); + } else { + builder.set(columnName).to(com.google.cloud.spanner.Value.pgJsonb(json)); + } + break; + case UUID: + builder.set(columnName).to(value); + break; case TIMESTAMP: Timestamp timestamp = value.isNull() ? null : value.getTimestamp(); builder.set(columnName).to(timestamp); @@ -258,6 +359,17 @@ public Mutation generateMutation(Map overrides) { List pgNumerics = value.isNull() ? 
null : value.getStringArray(); builder.set(columnName).toPgNumericArray(pgNumerics); break; + case JSON: + List jsons = value.isNull() ? null : value.getStringArray(); + builder.set(columnName).toJsonArray(jsons); + break; + case PG_JSONB: + List pgJsonbs = value.isNull() ? null : value.getStringArray(); + builder.set(columnName).toPgJsonbArray(pgJsonbs); + break; + case UUID: + builder.set(columnName).to(value); + break; } break; default: diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomValueGenerator.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomValueGenerator.java index f1c6e151a9..5654005f5a 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomValueGenerator.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/RandomValueGenerator.java @@ -98,6 +98,13 @@ private Value generateNullOrNaNValue(Type type) { return Value.pgNumeric("NaN"); } return Value.pgNumeric(null); + case JSON: + return Value.json((String) null); + case PG_JSONB: + return Value.pgJsonb((String) null); + case UUID: + case PG_UUID: + return Value.string(null); case ARRAY: case PG_ARRAY: switch (type.getArrayElementType().getCode()) { @@ -120,6 +127,13 @@ private Value generateNullOrNaNValue(Type type) { case PG_TEXT: case PG_VARCHAR: return Value.stringArray(null); + case JSON: + return Value.jsonArray(null); + case PG_JSONB: + return Value.pgJsonbArray(null); + case UUID: + case PG_UUID: + return Value.uuidArray(null); case DATE: case PG_DATE: return Value.dateArray(null); @@ -145,40 +159,54 @@ private Value generate(Column column) { switch (type.getArrayElementType().getCode()) { case BOOL: case PG_BOOL: - return Value.boolArray(generateList(random::nextBoolean)); + return Value.boolArray(generateList(random::nextBoolean, column)); case INT64: case PG_INT8: - return Value.int64Array(generateList(random::nextLong)); + return Value.int64Array(generateList(random::nextLong, column)); case FLOAT32: case PG_FLOAT4: - 
return Value.float32Array(generateList(random::nextFloat)); + return Value.float32Array(generateList(random::nextFloat, column)); case FLOAT64: case PG_FLOAT8: - return Value.float64Array(generateList(random::nextDouble)); + return Value.float64Array(generateList(random::nextDouble, column)); case BYTES: case PG_BYTEA: - return Value.bytesArray(generateList(() -> randomByteArray(column.size()))); + return Value.bytesArray(generateList(() -> randomByteArray(column.size()), column)); case STRING: case PG_VARCHAR: case PG_TEXT: - return Value.stringArray(generateList(() -> randomString(column.size()))); + return Value.stringArray(generateList(() -> randomString(column.size()), column)); + case JSON: + return Value.jsonArray(generateList(() -> "{\"key\":\"value\"}", column)); + case PG_JSONB: + return Value.pgJsonbArray(generateList(() -> "{\"key\":\"value\"}", column)); + case UUID: + case PG_UUID: + return Value.uuidArray(generateList(() -> java.util.UUID.randomUUID(), column)); case DATE: case PG_DATE: - return Value.dateArray(generateList(this::randomDate)); + return Value.dateArray(generateList(this::randomDate, column)); case TIMESTAMP: case PG_TIMESTAMPTZ: - return Value.timestampArray(generateList(this::randomTimestamp)); + return Value.timestampArray(generateList(this::randomTimestamp, column)); + case NUMERIC: + return Value.numericArray(generateList(this::randomNumeric, column)); case PG_NUMERIC: - return Value.pgNumericArray(generateList(this::randomPgNumeric)); + return Value.pgNumericArray(generateList(this::randomPgNumeric, column)); } throw new IllegalArgumentException("Unexpected type " + type); } - private List generateList(Supplier v) { + private List generateList(Supplier v, Column column) { int size = random.nextInt(10); + boolean allowNull = true; + if (column != null && column.arrayLength() != null) { + size = column.arrayLength(); + allowNull = false; + } List result = new ArrayList<>(); for (int i = 0; i < size; i++) { - T value = 
random.nextInt(100) < arrayNullThreshold ? null : v.get(); + T value = allowNull && random.nextInt(100) < arrayNullThreshold ? null : v.get(); result.add(value); } return result; @@ -220,12 +248,42 @@ private Value generateScalar(Column column) { { return Value.timestamp(randomTimestamp()); } + case JSON: + return Value.json("{\"key\":\"value\"}"); + case PG_JSONB: + return Value.pgJsonb("{\"key\":\"value\"}"); + case UUID: + case PG_UUID: + return Value.uuid(java.util.UUID.randomUUID()); + case NUMERIC: + return Value.numeric(randomNumeric()); case PG_NUMERIC: return Value.pgNumeric(randomPgNumeric()); } throw new IllegalArgumentException("Unexpected type " + type); } + private java.math.BigDecimal randomNumeric() { + int leftSize = random.nextInt(NumericUtils.PRECISION - NumericUtils.SCALE) + 1; + int rightSize = random.nextInt(NumericUtils.SCALE + 1); + StringBuilder sb = new StringBuilder(); + if (leftSize == 1) { + sb.append(0); + } else { + sb.append(random.nextInt(9) + 1); + } + for (int i = 1; i < leftSize; i++) { + sb.append(random.nextInt(10)); + } + if (rightSize > 0) { + sb.append("."); + for (int i = 0; i < rightSize; i++) { + sb.append(random.nextInt(10)); + } + } + return new java.math.BigDecimal(sb.toString()); + } + private String randomPgNumeric() { int leftSize = random.nextInt(NumericUtils.PG_MAX_PRECISION - NumericUtils.PG_MAX_SCALE) + 1; int rightSize = random.nextInt(NumericUtils.PG_MAX_SCALE + 1); diff --git a/v1/src/test/resources/CopyDbIT-gsql.sql b/v1/src/test/resources/CopyDbIT-gsql.sql new file mode 100644 index 0000000000..70fe2685d2 --- /dev/null +++ b/v1/src/test/resources/CopyDbIT-gsql.sql @@ -0,0 +1,206 @@ +ALTER DATABASE `%DATABASE_NAME%` SET OPTIONS ( optimizer_version = 1, default_leader = 'us-east4' ); + +CREATE PLACEMENT `my_placement` OPTIONS(instance_partition='default'); + +-- With Named Schema +CREATE SCHEMA `my_schema`; + +CREATE SEQUENCE `my_schema`.`MySequence` OPTIONS (sequence_kind = 'bit_reversed_positive'); + 
+CREATE TABLE `my_schema`.`MyTable` ( + `id` INT64 NOT NULL, + `data` STRING(MAX), +) PRIMARY KEY (`id` ASC); + +CREATE INDEX `my_schema`.`MyTable_data_idx` ON `my_schema`.`MyTable`(`data` ASC); + +CREATE VIEW `my_schema`.`MyView` SQL SECURITY INVOKER AS SELECT t.id FROM `my_schema`.`MyTable` t; + +ALTER TABLE `my_schema`.`MyTable` ADD COLUMN `tokens` TOKENLIST AS (TOKENIZE_SUBSTRING(data)) HIDDEN; + +CREATE PROPERTY GRAPH `my_schema`.`MyGraph` + NODE TABLES (`my_schema`.`MyTable`); + +CREATE SEARCH INDEX `my_schema`.`MySearchIdx` ON `my_schema`.`MyTable`(`tokens`); + +-- Without Named Schema + +-- Roles +CREATE ROLE `hr_manager`; +CREATE ROLE `sales_manager`; + +-- Tables with interleaving +CREATE TABLE `Users` ( + `first_name` STRING(MAX), + `last_name` STRING(5), + `age` INT64, +) PRIMARY KEY (`first_name` ASC, `last_name` DESC); + +-- All Datatypes +CREATE TABLE `AllTYPES` ( + `first_name` STRING(MAX), + `last_name` STRING(5), + `id` INT64 NOT NULL, + `bool_field` BOOL, + `int64_field` INT64, + `float32_field` FLOAT32, + `float64_field` FLOAT64, + `numeric_field` NUMERIC, + `string_field` STRING(MAX), + `bytes_field` BYTES(MAX), + `timestamp_field` TIMESTAMP, + `date_field` DATE, + `json_field` JSON, + `uuid_field` UUID, + + `arr_bool_field` ARRAY, + `arr_int64_field` ARRAY, + `arr_float32_field` ARRAY, + `arr_float64_field` ARRAY, + `arr_numeric_field` ARRAY, + `arr_string_field` ARRAY, + `arr_bytes_field` ARRAY, + `arr_timestamp_field` ARRAY, + `arr_date_field` ARRAY, + `arr_json_field` ARRAY, + `arr_uuid_field` ARRAY, +) PRIMARY KEY (`first_name` ASC, `last_name` DESC, `id` ASC), +INTERLEAVE IN PARENT `Users` ON DELETE CASCADE; + +-- Comprehensive Primary Key Types +CREATE TABLE `PK_Bool` ( `key` BOOL, `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Int64` ( `key` INT64, `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Float64` ( `key` FLOAT64, `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Numeric` ( `key` NUMERIC, `val` INT64 ) 
PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_String` ( `key` STRING(MAX), `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Bytes` ( `key` BYTES(MAX), `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Date` ( `key` DATE, `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Timestamp` ( `key` TIMESTAMP, `val` INT64 ) PRIMARY KEY (`key` ASC); +CREATE TABLE `PK_Uuid` ( `key` UUID, `val` INT64 ) PRIMARY KEY (`key` ASC); + +-- Generated and Identity Primary Keys +CREATE TABLE `PK_AutoUuid` ( + `id` UUID DEFAULT (NEW_UUID()), + `val` INT64 +) PRIMARY KEY (`id` ASC); + +CREATE TABLE `PK_Generated` ( + `id` INT64 NOT NULL, + `gen_id` INT64 AS (`id` + 1) STORED, + `val` INT64 +) PRIMARY KEY (`gen_id` ASC); + +CREATE TABLE `PK_Identity` ( + `id` INT64 GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), + `val` INT64 +) PRIMARY KEY (`id` ASC); + +-- Foreign Keys +CREATE TABLE `Ref` ( + `id1` INT64, + `id2` INT64, +) PRIMARY KEY (`id1` ASC, `id2` ASC); + +CREATE TABLE `Child` ( + `id1` INT64, + `id2` INT64, + `id3` INT64, +) PRIMARY KEY (`id1` ASC, `id2` ASC, `id3` ASC), +INTERLEAVE IN PARENT `Ref`; + +ALTER TABLE `Child` ADD CONSTRAINT `fk1` FOREIGN KEY (`id1`) REFERENCES `Ref` (`id1`); +ALTER TABLE `Child` ADD CONSTRAINT `fk2` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`); + +-- Sequences +CREATE SEQUENCE `MySequence` OPTIONS (sequence_kind='bit_reversed_positive'); +CREATE SEQUENCE `CounterSequence` OPTIONS (sequence_kind='bit_reversed_positive', start_with_counter=100); +CREATE SEQUENCE `SkipRangeSequence` OPTIONS (sequence_kind='bit_reversed_positive', skip_range_min=1, skip_range_max=1000); + +-- Comprehensive Schema Features +CREATE TABLE `Authors` ( + `author_id` INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE MySequence)), + `name` STRING(MAX), + `author_type` STRING(20) DEFAULT ('USER'), + `placement_key` STRING(MAX), + `hidden_col` STRING(MAX) HIDDEN, + `score` INT64 GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE SKIP RANGE 1, 
1000 START COUNTER WITH 100), +) PRIMARY KEY (`author_id` ASC); + +CREATE TABLE `Articles` ( + `author_id` INT64 NOT NULL, + `article_id` INT64 NOT NULL DEFAULT (GET_NEXT_SEQUENCE_VALUE(SEQUENCE CounterSequence)), + `title` STRING(MAX), + `content` STRING(MAX), + `tags` ARRAY, + `embeddings` ARRAY(vector_length=>3), + `publish_date` TIMESTAMP OPTIONS (allow_commit_timestamp=TRUE), + `status` STRING(20), + `search_tokens` TOKENLIST AS (TOKENIZE_FULLTEXT(`content`)) HIDDEN, + CONSTRAINT `status_check` CHECK (`status` IN ('DRAFT', 'PUBLISHED')), +) PRIMARY KEY (`author_id` ASC, `article_id` ASC), +INTERLEAVE IN PARENT `Authors` ON DELETE CASCADE; + +CREATE TABLE `Comments` ( + `author_id` INT64 NOT NULL, + `article_id` INT64 NOT NULL, + `comment_id` INT64 NOT NULL, + `text` STRING(MAX), +) PRIMARY KEY (`author_id` ASC, `article_id` ASC, `comment_id` ASC), +INTERLEAVE IN PARENT `Articles`; + +CREATE TABLE `GraphNodes` ( + `node_key` INT64, + `dynamic_label_col` STRING(MAX), +) PRIMARY KEY (`node_key` ASC); + +CREATE TABLE `GraphEdges` ( + `edge_key` INT64, + `source_node` INT64, + `dest_node` INT64, +) PRIMARY KEY (`edge_key` ASC); + +-- Indexes +CREATE INDEX `AuthorNameIdx` ON `Authors`(`name` DESC); +CREATE UNIQUE INDEX `UniqueArticleTitle` ON `Articles`(`title` ASC) STORING (`publish_date`); +CREATE UNIQUE NULL_FILTERED INDEX `NullFilteredIdx` ON `Articles`(`status` ASC); +CREATE VECTOR INDEX `ArticlesEmbeddingIdx` ON `Articles`(`embeddings`) WHERE `embeddings` IS NOT NULL OPTIONS (distance_type='COSINE'); +CREATE SEARCH INDEX `ArticlesSearchIdx` ON `Articles`(`search_tokens`); + +-- Grants +GRANT SELECT ON TABLE `Authors` TO ROLE `hr_manager`; +GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE `Articles` TO ROLE `sales_manager`; + +-- Views +CREATE VIEW `AuthorStatsView` SQL SECURITY INVOKER AS SELECT `Articles`.`author_id`, COUNT(*) as `num_articles` FROM `Articles` GROUP BY `Articles`.`author_id`; +CREATE VIEW `DefinerView` SQL SECURITY DEFINER AS SELECT 
`Articles`.`title` FROM `Articles` WHERE `Articles`.`status` = 'PUBLISHED'; + +-- Change Streams +CREATE CHANGE STREAM `AllStream` FOR ALL OPTIONS (retention_period="7d", value_capture_type="OLD_AND_NEW_VALUES"); +CREATE CHANGE STREAM `TableStream` FOR `Authors`, `Articles`(`title`, `content`), `Comments`(); + +-- Property Graphs +CREATE PROPERTY GRAPH `DynamicGraph` +NODE TABLES( +`GraphNodes` AS `node_alias` + KEY (`node_key`) +LABEL `dummy_label` PROPERTIES(`node_key`) +LABEL `dynamic_label` NO PROPERTIES +DYNAMIC LABEL(`dynamic_label_col`) +) +EDGE TABLES( +`GraphEdges` AS `edge_alias` + KEY (`edge_key`) +SOURCE KEY(`source_node`) REFERENCES `node_alias`(`node_key`) DESTINATION KEY(`dest_node`) REFERENCES `node_alias`(`node_key`) +LABEL `edge_label` NO PROPERTIES +); + +-- Models +CREATE MODEL `m_simpleModel_text_bison` +INPUT ( `prompt` STRING(MAX) ) +OUTPUT ( `content` STRING(MAX) ) +REMOTE OPTIONS (endpoint='//aiplatform.googleapis.com/projects/%PROJECT_ID%/locations/us-central1/publishers/google/models/text-bison'); + +-- Functions (UDFs) +CREATE FUNCTION `my_schema`.`MyCustomFunction`(`input_val` INT64) RETURNS INT64 AS (`input_val` * 2); + + diff --git a/v1/src/test/resources/CopyDbIT-pg.sql b/v1/src/test/resources/CopyDbIT-pg.sql new file mode 100644 index 0000000000..d4dbc3e6b6 --- /dev/null +++ b/v1/src/test/resources/CopyDbIT-pg.sql @@ -0,0 +1,142 @@ +CREATE SCHEMA "my_schema"; + +CREATE SEQUENCE my_schema."MySequence" BIT_REVERSED_POSITIVE; + +CREATE TABLE my_schema."MyTable" ( + "id" bigint NOT NULL, + "data" text, + PRIMARY KEY ("id") +); + +CREATE INDEX "my_schema_MyTable_data_idx" ON my_schema."MyTable"("data"); + +CREATE VIEW my_schema."MyView" SQL SECURITY INVOKER AS SELECT t.id FROM my_schema."MyTable" t; + +ALTER TABLE my_schema."MyTable" ADD COLUMN "tokens" spanner.tokenlist HIDDEN GENERATED ALWAYS AS (spanner.tokenize_substring("data")) STORED; + +CREATE SEARCH INDEX "MySearchIdx" ON my_schema."MyTable"("tokens"); + +ALTER DATABASE 
"%DATABASE_NAME%" SET spanner.optimizer_version = 1; + +-- Roles +CREATE ROLE "hr_manager"; +CREATE ROLE "sales_manager"; + +CREATE TABLE "Users" ( + "first_name" character varying, + "last_name" character varying(5), + "age" bigint, + PRIMARY KEY ("first_name", "last_name") +); + +-- All Datatypes +CREATE TABLE "AllTYPES" ( + "id" bigint NOT NULL, + "first_name" character varying, + "last_name" character varying(5), + "bool_field" boolean, + "int_field" bigint, + "float32_field" real, + "float64_field" double precision, + "string_field" text, + "bytes_field" bytea, + "timestamp_field" timestamp with time zone, + "numeric_field" numeric, + "date_field" date, + "json_field" jsonb, + "uuid_field" uuid, + + "arr_bool_field" boolean[], + "arr_int_field" bigint[], + "arr_float32_field" real[], + "arr_float64_field" double precision[], + "arr_string_field" character varying[], + "arr_bytes_field" bytea[], + "arr_timestamp_field" timestamp with time zone[], + "arr_date_field" date[], + "arr_numeric_field" numeric[], + "arr_json_field" jsonb[], + "arr_uuid_field" uuid[], + + PRIMARY KEY ("first_name", "last_name", "id") +) +INTERLEAVE IN PARENT "Users" ON DELETE CASCADE; + +CREATE TABLE "PK_Float64" ( "key" double precision, "val" bigint, PRIMARY KEY ("key") ); +CREATE TABLE "PK_Uuid" ( "key" uuid, "val" bigint, PRIMARY KEY ("key") ); + +-- Generated and Identity Primary Keys +CREATE TABLE "PK_AutoUuid" ( + "id" uuid DEFAULT gen_random_uuid(), + "val" bigint, + PRIMARY KEY ("id") +); + +CREATE TABLE "PK_Generated" ( + "id" bigint NOT NULL, + "gen_id" bigint GENERATED ALWAYS AS ("id" + 1) STORED, + "val" bigint, + PRIMARY KEY ("gen_id") +); + +CREATE TABLE "PK_Identity" ( + "id" bigint GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE), + "val" bigint, + PRIMARY KEY ("id") +); + +-- Foreign Keys +CREATE TABLE "Ref" ( + "id1" bigint, + "id2" bigint, + PRIMARY KEY ("id1", "id2") +); +CREATE TABLE "Child" ( + "id1" bigint, + "id2" bigint, + "id3" bigint, + PRIMARY KEY 
("id1", "id2", "id3") +) +INTERLEAVE IN PARENT "Ref"; +ALTER TABLE "Child" ADD CONSTRAINT "fk1" FOREIGN KEY ("id1") REFERENCES "Ref" ("id1"); + +-- Sequences +CREATE SEQUENCE mysequence BIT_REVERSED_POSITIVE; +CREATE SEQUENCE countersequence BIT_REVERSED_POSITIVE START COUNTER WITH 100; + +CREATE TABLE "Authors" ( + "author_id" bigint NOT NULL DEFAULT nextval('mysequence'), + "name" text, + "author_type" character varying(20) DEFAULT 'USER', + "placement_key" text, + "hidden_col" text HIDDEN, + "score" bigint GENERATED BY DEFAULT AS IDENTITY (BIT_REVERSED_POSITIVE START COUNTER WITH 100), + PRIMARY KEY ("author_id") +); + +CREATE TABLE "Articles" ( + "author_id" bigint NOT NULL, + "article_id" bigint NOT NULL DEFAULT nextval('countersequence'), + "title" text, + "content" text, + "tags" character varying[], + "publish_date" timestamp with time zone, + "status" character varying(20), + PRIMARY KEY ("author_id", "article_id") +) +INTERLEAVE IN PARENT "Authors" ON DELETE CASCADE; + +-- Indexes +CREATE INDEX "AuthorNameIdx" ON "Authors"("name" DESC NULLS FIRST); +CREATE UNIQUE INDEX "UniqueArticleTitle" ON "Articles"("title" ASC NULLS LAST) INCLUDE ("publish_date"); +CREATE UNIQUE INDEX "NullFilteredIdxPG" ON "Articles"("status") WHERE "status" IS NOT NULL; + +-- Grants +GRANT SELECT ON TABLE "Authors" TO "hr_manager"; +GRANT SELECT, INSERT, UPDATE, DELETE ON TABLE "Articles" TO "sales_manager"; + +-- Views +CREATE VIEW "AuthorStatsView" SQL SECURITY INVOKER AS SELECT "Articles"."author_id", COUNT(*) as "num_articles" FROM "Articles" GROUP BY "Articles"."author_id"; + +-- Change Streams +CREATE CHANGE STREAM "AllStream" FOR ALL WITH (retention_period='7d', value_capture_type='OLD_AND_NEW_VALUES');