diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java index c31e8fe55..f32ef0dd7 100644 --- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CassandraTypes.java @@ -37,6 +37,7 @@ public abstract class CassandraTypes { public static final Pattern COLLECTION_PATTERN = Pattern.compile("^(set|list|map|tuple)<(.+)>$", Pattern.CASE_INSENSITIVE); + public static final Pattern VECTOR_PATTERN = Pattern.compile("^(vector)<(.+),(.*)>$", Pattern.CASE_INSENSITIVE); public static final Pattern FROZEN_PATTERN = Pattern.compile("^frozen<(.*)>$", Pattern.CASE_INSENSITIVE); private final UDTs udts = new UDTs(); @@ -133,6 +134,8 @@ public List supportedTypes() public abstract CqlField.CqlList list(CqlField.CqlType type); + public abstract CqlField.CqlVector vector(CqlField.CqlType type, int dimensions); + public abstract CqlField.CqlSet set(CqlField.CqlType type); public abstract CqlField.CqlMap map(CqlField.CqlType keyType, CqlField.CqlType valueType); @@ -189,6 +192,14 @@ public CqlField.CqlType parseType(String type, Map udts .map(collectionType -> parseType(collectionType, udts)) .toArray(CqlField.CqlType[]::new)); } + Matcher vectorMatcher = VECTOR_PATTERN.matcher(type); + if (vectorMatcher.find()) + { + // CQL vector + String subType = vectorMatcher.group(2); + int dimensions = Integer.parseInt(vectorMatcher.group(3).trim()); + return vector(parseType(subType, udts), dimensions); + } Matcher frozenMatcher = FROZEN_PATTERN.matcher(type); if (frozenMatcher.find()) { diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java index 1c15fad2d..b228e36e0 100644 ---
a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java +++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/data/CqlField.java @@ -67,7 +67,7 @@ public interface CqlType extends Serializable { enum InternalType { - NativeCql, Set, List, Map, Frozen, Udt, Tuple; + NativeCql, Set, List, Map, Frozen, Udt, Tuple, Vector; public static InternalType fromString(String name) { @@ -77,6 +77,8 @@ public static InternalType fromString(String name) return Set; case "list": return List; + case "vector": + return Vector; case "map": return Map; case "tuple": @@ -237,6 +239,10 @@ public interface CqlList extends CqlCollection { } + public interface CqlVector extends CqlCollection + { + } + public interface CqlTuple extends CqlCollection { ByteBuffer serializeTuple(Object[] values); diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java index 210549f28..eea5cdd1e 100644 --- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java +++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/endtoend/DataTypeTests.java @@ -33,17 +33,21 @@ import org.junit.jupiter.params.provider.MethodSource; import org.apache.cassandra.bridge.CassandraBridge; +import org.apache.cassandra.bridge.CassandraVersion; import org.apache.cassandra.spark.TestUtils; import org.apache.cassandra.spark.Tester; import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.utils.RandomUtils; import org.apache.cassandra.spark.utils.test.TestSchema; import org.apache.spark.sql.Row; +import org.quicktheories.core.Gen; import scala.collection.mutable.AbstractSeq; import static org.apache.cassandra.spark.utils.ScalaConversionUtils.mutableSeqAsJavaList; import static org.assertj.core.api.Assertions.assertThat; +import static 
org.assertj.core.api.Assumptions.assumeThat; import static org.quicktheories.QuickTheory.qt; +import static org.quicktheories.generators.SourceDSL.arbitrary; @Tag("Sequential") public class DataTypeTests @@ -103,6 +107,104 @@ public void testSet(CassandraBridge bridge) ); } + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVector(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(vectorSupportedTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(type, 10))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorVector(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(vectorSupportedTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.vector(type, 2), 5))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorList(CassandraBridge bridge) + { + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().forAll(vectorSupportedTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.list(type), 3))) + .withExpectedRowCountPerSSTable(Tester.DEFAULT_NUM_ROWS) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest 
+ @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorUDT(CassandraBridge bridge) + { + // pk -> a vector>, 10> + // Test vector of UDTs + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().withExamples(10) + .forAll(vectorSupportedTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector( + bridge.udt("keyspace", "nested_udt") + .withField("x", bridge.aInt()) + .withField("y", type) + .withField("z", bridge.aInt()) + .build().frozen(), + 10))) + .run(bridge.getVersion()) + ); + } + + @ParameterizedTest + @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") + public void testVectorTuple(CassandraBridge bridge) + { + // pk -> a vector>, 7> + // Test tuple nested within vector + assumeThat(bridge.getVersion().versionNumber()).isGreaterThanOrEqualTo(CassandraVersion.FIVEZERO.versionNumber()); + qt().withExamples(10) + .forAll(vectorSupportedTypes(bridge)) + .checkAssert(type -> + Tester.builder(TestSchema.builder(bridge) + .withPartitionKey("pk", bridge.uuid()) + .withColumn("a", bridge.vector(bridge.tuple(type, + bridge.aFloat(), + bridge.text()).frozen(), 7))) + .run(bridge.getVersion()) + ); + } + + private static Gen vectorSupportedTypes(CassandraBridge bridge) + { + // TODO: Vector of durations fail, because we cannot replace DurationSerializer with + // AnalyticsDurationSerializer across all serializers used by VectorType. + // TODO: Fix for CASSANDRA-20979 required. 
+ List supportedTypes = bridge.supportedTypes().stream() + .filter(t -> !t.equals(bridge.date()) && !t.equals(bridge.time()) && !t.equals(bridge.duration())) + .collect(Collectors.toList()); + return arbitrary().pick(supportedTypes); + } + @ParameterizedTest @MethodSource("org.apache.cassandra.bridge.VersionRunner#bridges") public void testList(CassandraBridge bridge) diff --git a/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java b/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java index 5c4b9352d..6610a2c39 100644 --- a/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java +++ b/cassandra-analytics-spark-four-zero-converter/src/main/java/org/apache/cassandra/spark/data/converter/SparkSqlTypeConverterImplementation.java @@ -166,6 +166,10 @@ protected static SparkType getOrThrow(CqlField.CqlType cqlType) { return new SparkSet(INSTANCE, (CqlField.CqlSet) cqlType); } + else if (cqlType instanceof CqlField.CqlVector) + { + return new SparkList(INSTANCE, (CqlField.CqlVector) cqlType); + } else if (cqlType instanceof CqlField.CqlList) { return new SparkList(INSTANCE, (CqlField.CqlList) cqlType); diff --git a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java index 0701ae280..ca34ea49b 100644 --- a/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java +++ b/cassandra-bridge/src/main/java/org/apache/cassandra/bridge/CassandraBridge.java @@ -316,6 +316,11 @@ public CqlField.CqlList list(CqlField.CqlType type) return cassandraTypes().list(type); } + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + return cassandraTypes().vector(type, 
dimensions); + } + public CqlField.CqlSet set(CqlField.CqlType type) { return cassandraTypes().set(type); diff --git a/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java new file mode 100644 index 000000000..fc44485c2 --- /dev/null +++ b/cassandra-five-zero-bridge/src/test/java/org/apache/cassandra/spark/data/converter/types/VectorTypeTests.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.cassandra.spark.data.converter.types; + +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import org.apache.cassandra.bridge.CassandraBridgeImplementation; +import org.apache.cassandra.spark.data.complex.CqlList; +import org.apache.cassandra.spark.data.complex.CqlVector; + +import static org.assertj.core.api.Assertions.assertThat; + +public class VectorTypeTests +{ + private static final CassandraBridgeImplementation BRIDGE = new CassandraBridgeImplementation(); + + @Test + public void testSimpleTypeConversion() + { + CqlVector cqlVector = new CqlVector(org.apache.cassandra.spark.data.types.Float.INSTANCE, 3); + Object cqlWriterObj = cqlVector.convertForCqlWriter(List.of(3.14f, 0.0f, -1f), BRIDGE.getVersion(), false); + assertThat(cqlWriterObj).isInstanceOf(List.class); + List cqlWriterList = (List) cqlWriterObj; + assertThat(cqlWriterList).containsExactly(3.14f, 0.0f, -1f); + } + + @Test + public void testComplexTypeConversion() + { + CqlVector cqlVector = new CqlVector(CqlList.set(org.apache.cassandra.spark.data.types.Float.INSTANCE), 3); + Object cqlWriterObj = cqlVector.convertForCqlWriter(List.of(Set.of(3.14f, 0f), Set.of(1f), Set.of()), BRIDGE.getVersion(), false); + assertThat(cqlWriterObj).isInstanceOf(List.class); + List> cqlWriterList = (List>) cqlWriterObj; + assertThat(cqlWriterList).hasSize(3); + assertThat(cqlWriterList.get(0)).containsExactlyInAnyOrder(3.14f, 0f); + assertThat(cqlWriterList.get(1)).containsExactly(1f); + assertThat(cqlWriterList.get(2)).isEmpty(); + } +} diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java index a13071c11..f520ddc4a 100644 --- a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java +++ 
b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/bridge/CassandraTypesImplementation.java @@ -32,6 +32,8 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.locator.SimpleSnitch; import org.apache.cassandra.security.EncryptionContext; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.complex.CqlVector; public class CassandraTypesImplementation extends AbstractCassandraTypes { @@ -88,4 +90,10 @@ protected static void setupCommitLogConfigs(Path path) DatabaseDescriptor.getRawConfig().commitlog_total_space = new DataStorageSpec.IntMebibytesBound(1024); DatabaseDescriptor.setCommitLogSegmentMgrProvider(commitLog -> new CommitLogSegmentManagerStandard(commitLog, commitLogPath.toString())); } + + @Override + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + return new CqlVector(type, dimensions); + } } diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java new file mode 100644 index 000000000..df9c6aec6 --- /dev/null +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/data/complex/CqlVector.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.data.complex; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.cassandra.bridge.CassandraVersion; +import org.apache.cassandra.cql3.functions.types.SettableByIndexData; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.serializers.TypeSerializer; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlType; +import org.apache.cassandra.utils.TimeUUID; + +import static org.apache.cassandra.spark.data.CqlField.NO_TTL; + +public class CqlVector extends CqlCollection implements CqlField.CqlVector +{ + private final int dimensions; + + public CqlVector(CqlField.CqlType type, int dimensions) + { + super(type); + this.dimensions = dimensions; + } + + @Override + public AbstractType dataType(boolean isMultiCell) + { + return VectorType.getInstance(((CqlType) type()).dataType(), dimensions); + } + + @Override + public InternalType internalType() + { + return InternalType.Vector; + } + + @Override + @SuppressWarnings("unchecked") + public TypeSerializer serializer() + { + return (TypeSerializer) dataType(false).getSerializer(); + } + + @Override + public String name() + { + return "vector"; + } + + @Override + public String cqlName() + { + return String.format("%s<%s, %d>", + 
internalType().name().toLowerCase(), + types.get(0).cqlName(), + dimensions); + } + + @Override + protected void setInnerValueInternal(SettableByIndexData udtValue, int position, Object value) + { + udtValue.setVector(position, (List) value); + } + + @Override + public Object randomValue(int minCollectionSize) + { + return IntStream.range(0, dimensions) + .mapToObj(element -> type().randomValue(minCollectionSize)) + .collect(Collectors.toList()); + } + + @Override + public org.apache.cassandra.cql3.functions.types.DataType driverDataType(boolean isFrozen) + { + return org.apache.cassandra.cql3.functions.types.DataType.vector(((CqlType) type()).driverDataType(isFrozen), dimensions); + } + + @Override + public Object convertForCqlWriter(Object value, CassandraVersion version, boolean isCollectionElement) + { + return ((List) value).stream() + .map(element -> type().convertForCqlWriter(element, version, true)) + .collect(Collectors.toList()); + } + + @Override + public void addCell(final org.apache.cassandra.db.rows.Row.Builder rowBuilder, + ColumnMetadata cd, + long timestamp, + int ttl, + int now, + Object value) + { + for (Object o : (List) value) + { + if (ttl != NO_TTL) + { + rowBuilder.addCell(BufferCell.expiring(cd, timestamp, ttl, now, type().serialize(o), + CellPath.create(TimeUUID.Generator.nextTimeUUID().toBytes()))); + } + else + { + rowBuilder.addCell(BufferCell.live(cd, timestamp, type().serialize(o), randomCellPath())); + } + } + } + + protected CellPath randomCellPath() + { + return CellPath.create(TimeUUID.Generator.nextTimeUUID().toBytes()); + } +} diff --git a/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java new file mode 100644 index 000000000..b98c52d31 --- /dev/null +++ b/cassandra-five-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -0,0 +1,105 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.Collections; +import java.util.Set; +import java.util.UUID; +import java.util.function.Function; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.VectorType; +import org.apache.cassandra.spark.data.CassandraTypes; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.jetbrains.annotations.Nullable; + +public class SchemaBuilder extends AbstractSchemaBuilder +{ + public SchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) + { + this(table, partitioner, null, enableCdc); + } + + public SchemaBuilder(CqlTable table, Partitioner partitioner) + { + this(table, partitioner, null, false); + } + + public SchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId, boolean enableCdc) + { + this(table.createStatement(), + table.keyspace(), + table.replicationFactor(), + partitioner, + table::udtCreateStmts, + tableId, + 0, + 
enableCdc); + } + + @VisibleForTesting + public SchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor) + { + this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + @VisibleForTesting + public SchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner) + { + this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + public SchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner, + Function> udtStatementsProvider, + @Nullable UUID tableId, + int indexCount, + boolean enableCdc) + { + super(createStmt, keyspace, replicationFactor, partitioner, udtStatementsProvider, + tableId, indexCount, enableCdc); + } + + @Override + protected void validateType(CQL3Type cqlType) + { + if (cqlType instanceof CQL3Type.Vector) + { + CQL3Type.Vector vector = (CQL3Type.Vector) cqlType; + VectorType vectorType = vector.getType(); + for (AbstractType subType : vectorType.subTypes()) + { + validateType(subType); + } + return; + } + super.validateType(cqlType); + } +} diff --git a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java index 05d9d45ec..202fe89f5 100644 --- a/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java +++ b/cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/spark/reader/AbstractStreamScanner.java @@ -383,7 +383,7 @@ public void consume() { boolean isStatic = cell.column().isStatic(); rowData.setColumnNameCopy(ReaderUtils.encodeCellName(metadata, - isStatic ? Clustering.STATIC_CLUSTERING : clustering, + isStatic ? 
Clustering.STATIC_CLUSTERING : clustering, cell.column().name.bytes, null)); if (cell.isTombstone()) diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java index 8c5537e02..6dfeea89d 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/bridge/AbstractCassandraTypes.java @@ -253,6 +253,12 @@ public CqlField.CqlList list(CqlField.CqlType type) return CqlCollection.list(type); } + @Override + public CqlField.CqlVector vector(CqlField.CqlType type, int dimensions) + { + throw new UnsupportedOperationException("Vector data type is available in C* 5.x."); + } + @Override public CqlField.CqlSet set(CqlField.CqlType type) { diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java new file mode 100644 index 000000000..f023012a4 --- /dev/null +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/AbstractSchemaBuilder.java @@ -0,0 +1,593 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cassandra.spark.reader; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.antlr.runtime.RecognitionException; +import org.apache.cassandra.bridge.CassandraSchema; +import org.apache.cassandra.bridge.CassandraTypesImplementation; +import org.apache.cassandra.bridge.SchemaUpdater; +import org.apache.cassandra.cql3.CQL3Type; +import org.apache.cassandra.cql3.CQLFragmentParser; +import org.apache.cassandra.cql3.CqlParser; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.CollectionType; +import org.apache.cassandra.db.marshal.ListType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.SetType; +import org.apache.cassandra.db.marshal.TupleType; +import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.schema.ColumnMetadata; 
+import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.schema.TableMetadataRef; +import org.apache.cassandra.schema.Types; +import org.apache.cassandra.spark.data.CassandraTypes; +import org.apache.cassandra.spark.data.CqlField; +import org.apache.cassandra.spark.data.CqlTable; +import org.apache.cassandra.spark.data.ReplicationFactor; +import org.apache.cassandra.spark.data.complex.CqlFrozen; +import org.apache.cassandra.spark.data.complex.CqlUdt; +import org.apache.cassandra.spark.data.partitioner.Partitioner; +import org.apache.cassandra.utils.Pair; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public abstract class AbstractSchemaBuilder +{ + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSchemaBuilder.class); + + protected final TableMetadata metadata; + protected final KeyspaceMetadata keyspaceMetadata; + protected final String createStmt; + protected final String keyspace; + protected final ReplicationFactor replicationFactor; + protected final CassandraTypes cassandraTypes; + protected final int indexCount; + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) + { + this(table, partitioner, null, enableCdc); + } + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner) + { + this(table, partitioner, null, false); + } + + public AbstractSchemaBuilder(CqlTable table, Partitioner partitioner, UUID tableId, boolean enableCdc) + { + this(table.createStatement(), + table.keyspace(), + table.replicationFactor(), + partitioner, + table::udtCreateStmts, + tableId, + 0, + enableCdc); + } + + @VisibleForTesting + public AbstractSchemaBuilder(String createStmt, String keyspace, ReplicationFactor replicationFactor) + { + 
this(createStmt, keyspace, replicationFactor, Partitioner.Murmur3Partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + @VisibleForTesting + public AbstractSchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner) + { + this(createStmt, keyspace, replicationFactor, partitioner, bridge -> Collections.emptySet(), null, 0, false); + } + + public AbstractSchemaBuilder(String createStmt, + String keyspace, + ReplicationFactor replicationFactor, + Partitioner partitioner, + Function> udtStatementsProvider, + @Nullable UUID tableId, + int indexCount, + boolean enableCdc) + { + this.createStmt = createStmt; + this.keyspace = keyspace; + this.replicationFactor = replicationFactor; + this.cassandraTypes = new CassandraTypesImplementation(); + this.indexCount = indexCount; + + Pair updated = CassandraSchema.apply(schema -> + updateSchema(schema, + this.keyspace, + udtStatementsProvider.apply(cassandraTypes), + this.createStmt, + partitioner, + this.replicationFactor, + tableId, enableCdc, + this::validateColumnMetaData)); + this.keyspaceMetadata = updated.left; + this.metadata = updated.right; + } + + // Update schema with the given keyspace, table and udt. + // It creates the corresponding metadata and opens instances for keyspace and table, if needed. + // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. 
+ private static Pair updateSchema(Schema schema, + String keyspace, + Set udtStatements, + String createStatement, + Partitioner partitioner, + ReplicationFactor replicationFactor, + UUID tableId, + boolean enableCdc, + Consumer columnValidator) + { + // Set up and open keyspace if needed + IPartitioner cassPartitioner = CassandraTypesImplementation.getPartitioner(partitioner); + setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner); + + // Set up and open table if needed, parse UDTs and include when parsing table schema + List typeStatements = new ArrayList<>(udtStatements.size()); + for (String udt : udtStatements) + { + try + { + typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser + .parseAnyUnhandled(CqlParser::query, udt)); + } + catch (RecognitionException exception) + { + LOGGER.error("Failed to parse type expression '{}'", udt); + throw new IllegalStateException(exception); + } + } + Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace); + for (CreateTypeStatement.Raw st : typeStatements) + { + st.addToRawBuilder(typesBuilder); + } + Types types = typesBuilder.build(); + CreateTableStatement.Raw createTable = CQLFragmentParser.parseAny(CqlParser::createTableStatement, + createStatement, + "CREATE TABLE"); + // If the table already exists, the tableId should remain the same, unless a non-null tableId is supplied + TableMetadata maybeExistingTableMetadata = schema.getTableMetadata(keyspace, createTable.table()); + if (maybeExistingTableMetadata != null && tableId == null) + { + tableId = maybeExistingTableMetadata.id.asUUID(); + } + + TableMetadata.Builder builder = createTable + .keyspace(keyspace) + .prepare(null) + .builder(types) + .partitioner(cassPartitioner); + + if (tableId != null) + { + builder.id(TableId.fromUUID(tableId)); + } + + TableMetadata tableMetadata = builder.build(); + + if (tableMetadata.params.cdc != enableCdc) + { + tableMetadata = tableMetadata.unbuild() + .params(tableMetadata.params.unbuild() + 
.cdc(enableCdc) + .build()) + .build(); + } + + tableMetadata.columns().forEach(columnValidator); + setupTableAndUdt(schema, keyspace, tableMetadata, types); + + return validateKeyspaceTable(schema, keyspace, tableMetadata.name); + } + + private void validateColumnMetaData(@NotNull ColumnMetadata column) + { + validateType(column.type); + } + + protected void validateType(AbstractType type) + { + validateType(type.asCQL3Type()); + } + + protected void validateType(CQL3Type cqlType) + { + if (!(cqlType instanceof CQL3Type.Native) + && !(cqlType instanceof CQL3Type.Collection) + && !(cqlType instanceof CQL3Type.UserDefined) + && !(cqlType instanceof CQL3Type.Tuple)) + { + throw new UnsupportedOperationException("Only native, collection, tuples or UDT data types are supported, " + + "unsupported data type: " + cqlType.toString()); + } + + if (cqlType instanceof CQL3Type.Native) + { + CqlField.CqlType type = cassandraTypes.parseType(cqlType.toString()); + if (!type.isSupported()) + { + throw new UnsupportedOperationException(type.name() + " data type is not supported"); + } + } + else if (cqlType instanceof CQL3Type.Collection) + { + // Validate collection inner types + CQL3Type.Collection collection = (CQL3Type.Collection) cqlType; + CollectionType type = (CollectionType) collection.getType(); + switch (type.kind) + { + case LIST: + validateType(((ListType) type).getElementsType()); + return; + case SET: + validateType(((SetType) type).getElementsType()); + return; + case MAP: + validateType(((MapType) type).getKeysType()); + validateType(((MapType) type).getValuesType()); + return; + default: + // Do nothing + } + } + else if (cqlType instanceof CQL3Type.Tuple) + { + CQL3Type.Tuple tuple = (CQL3Type.Tuple) cqlType; + TupleType tupleType = (TupleType) tuple.getType(); + for (AbstractType subType : tupleType.allTypes()) + { + validateType(subType); + } + } + else + { + // Validate UDT inner types + UserType userType = (UserType) ((CQL3Type.UserDefined) 
cqlType).getType(); + for (AbstractType innerType : userType.fieldTypes()) + { + validateType(innerType); + } + } + } + + private static boolean keyspaceMetadataExists(Schema schema, String keyspaceName) + { + return schema.getKeyspaceMetadata(keyspaceName) != null; + } + + private static boolean tableMetadataExists(Schema schema, String keyspaceName, String tableName) + { + KeyspaceMetadata ksMetadata = schema.getKeyspaceMetadata(keyspaceName); + if (ksMetadata == null) + { + return false; + } + + return ksMetadata.hasTable(tableName); + } + + private static boolean keyspaceInstanceExists(Schema schema, String keyspaceName) + { + return schema.getKeyspaceInstance(keyspaceName) != null; + } + + private static boolean tableInstanceExists(Schema schema, String keyspaceName, String tableName) + { + Keyspace keyspace = schema.getKeyspaceInstance(keyspaceName); + if (keyspace == null) + { + return false; + } + + try + { + keyspace.getColumnFamilyStore(tableName); + } + catch (IllegalArgumentException exception) + { + LOGGER.info("Table instance does not exist. keyspace={} table={} existingCFS={}", + keyspace, tableName, keyspace.getColumnFamilyStores()); + return false; + } + return true; + } + + // Check whether keyspace metadata exists. Create keyspace metadata, if not. + // Check whether keyspace instance is opened. Open the keyspace, if not. 
+ // NOTE: It is possible that external code just creates metadata, but does not open the keyspace + private static void setupKeyspace(Schema schema, + String keyspaceName, + ReplicationFactor replicationFactor, + IPartitioner partitioner) + { + if (!keyspaceMetadataExists(schema, keyspaceName)) + { + LOGGER.info("Setting up keyspace metadata in schema keyspace={} rfStrategy={} partitioner={}", + keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); + KeyspaceMetadata keyspaceMetadata = + KeyspaceMetadata.create(keyspaceName, KeyspaceParams.create(true, rfToMap(replicationFactor))); + SchemaUpdater.load(schema, keyspaceMetadata); + } + + if (!keyspaceInstanceExists(schema, keyspaceName)) + { + LOGGER.info("Setting up keyspace instance in schema keyspace={} rfStrategy={} partitioner={}", + keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); + // Create keyspace instance and also initCf (cfs) for the table + Keyspace.openWithoutSSTables(keyspaceName); + } + } + + // Check whether table metadata exists. Create table metadata, if not. + // Check whether table instance is opened. Open/init the table, if not. + // NOTE: It is possible that external code just creates metadata, but does not open the table + private static void setupTableAndUdt(Schema schema, + String keyspaceName, + TableMetadata tableMetadata, + Types userTypes) + { + String tableName = tableMetadata.name; + KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); + if (keyspaceMetadata == null) + { + LOGGER.error("Keyspace metadata does not exist. 
keyspace={}", keyspaceName); + throw new IllegalStateException("Keyspace metadata null for '" + keyspaceName + + "' when it should have been initialized already"); + } + + if (!tableMetadataExists(schema, keyspaceName, tableName)) + { + LOGGER.info("Setting up table metadata in schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + keyspaceMetadata = keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(tableMetadata)); + SchemaUpdater.load(schema, keyspaceMetadata, tableMetadata); + } + + if (!tableMetadata.equals(schema.getTableMetadata(keyspaceName, tableMetadata.name))) + { + // Schema of the table has changed so update it in the schema + updateTableMetaData(schema, keyspaceName, tableMetadata); + LOGGER.info("Table metadata changed schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + } + + // The metadata of the table might not be the input tableMetadata. Fetch the current to be safe. 
+ TableMetadata currentTable = schema.getTableMetadata(keyspaceName, tableName); + if (!tableInstanceExists(schema, keyspaceName, tableName)) + { + LOGGER.info("Setting up table instance in schema keyspace={} table={} partitioner={}", + keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); + if (keyspaceInstanceExists(schema, keyspaceName)) + { + // initCf (cfs) in the opened keyspace + schema.getKeyspaceInstance(keyspaceName) + .initCf(TableMetadataRef.forOfflineTools(currentTable), false); + } + else + { + // The keyspace has not yet opened, create/open keyspace instance and also initCf (cfs) for the table + Keyspace.openWithoutSSTables(keyspaceName); + } + } + + if (!userTypes.equals(Types.none())) + { + LOGGER.info("Setting up user types in schema keyspace={} types={}", + keyspaceName, userTypes); + // Update Schema instance with any user-defined types built + keyspaceMetadata = keyspaceMetadata.withSwapped(userTypes); + SchemaUpdater.load(schema, keyspaceMetadata, userTypes); + } + } + + private static void updateTableMetaData(Schema schema, String keyspace, TableMetadata tableMetadata) + { + KeyspaceMetadata ks = schema.getKeyspaceMetadata(keyspace); + ks = ks.withSwapped(ks.tables.withSwapped(tableMetadata)); + SchemaUpdater.load(schema, ks, tableMetadata); + } + + private static Pair validateKeyspaceTable(Schema schema, + String keyspaceName, + String tableName) + { + Preconditions.checkState(keyspaceMetadataExists(schema, keyspaceName), + "Keyspace metadata does not exist after building schema. keyspace=%s", + keyspaceName); + Preconditions.checkState(keyspaceInstanceExists(schema, keyspaceName), + "Keyspace instance is not opened after building schema. keyspace=%s", + keyspaceName); + Preconditions.checkState(tableMetadataExists(schema, keyspaceName, tableName), + "Table metadata does not exist after building schema. 
keyspace=%s table=%s", + keyspaceName, tableName); + Preconditions.checkState(tableInstanceExists(schema, keyspaceName, tableName), + "Table instance is not opened after building schema. keyspace=%s table=%s", + keyspaceName, tableName); + + // Validated above that keyspace and table, both exist and are opened + KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); + TableMetadata tableMetadata = schema.getTableMetadata(keyspaceName, tableName); + return Pair.create(keyspaceMetadata, tableMetadata); + } + + public TableMetadata tableMetaData() + { + return metadata; + } + + public String createStatement() + { + return createStmt; + } + + public CqlTable build() + { + Map udts = buildsUdts(keyspaceMetadata); + List fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); + return new CqlTable(keyspace, + metadata.name, + createStmt, + replicationFactor, + fields, + new HashSet<>(udts.values()), + indexCount); + } + + private Map buildsUdts(KeyspaceMetadata keyspaceMetadata) + { + List userTypes = new ArrayList<>(); + keyspaceMetadata.types.forEach(userTypes::add); + Map udts = new HashMap<>(userTypes.size()); + while (!userTypes.isEmpty()) + { + UserType userType = userTypes.remove(0); + if (!AbstractSchemaBuilder.nestedUdts(userType).stream().allMatch(udts::containsKey)) + { + // This UDT contains a nested user-defined type that has not been parsed yet + // so re-add to the queue and parse later + userTypes.add(userType); + continue; + } + String name = userType.getNameAsString(); + CqlUdt.Builder builder = CqlUdt.builder(keyspaceMetadata.name, name); + for (int field = 0; field < userType.size(); field++) + { + builder.withField(userType.fieldName(field).toString(), + cassandraTypes.parseType(userType.fieldType(field).asCQL3Type().toString(), udts)); + } + udts.put(name, builder.build()); + } + + return udts; + } + + /** + * @param type an abstract type + * @return a set of UDTs nested within the type parameter + 
*/ + private static Set nestedUdts(AbstractType type) + { + Set result = new HashSet<>(); + nestedUdts(type, result, false); + return result; + } + + private static void nestedUdts(AbstractType type, Set udts, boolean isNested) + { + if (type instanceof UserType) + { + if (isNested) + { + udts.add(((UserType) type).getNameAsString()); + } + for (AbstractType nestedType : ((UserType) type).fieldTypes()) + { + nestedUdts(nestedType, udts, true); + } + } + else if (type instanceof TupleType) + { + for (AbstractType nestedType : ((TupleType) type).allTypes()) + { + nestedUdts(nestedType, udts, true); + } + } + else if (type instanceof SetType) + { + nestedUdts(((SetType) type).getElementsType(), udts, true); + } + else if (type instanceof ListType) + { + nestedUdts(((ListType) type).getElementsType(), udts, true); + } + else if (type instanceof MapType) + { + nestedUdts(((MapType) type).getKeysType(), udts, true); + nestedUdts(((MapType) type).getValuesType(), udts, true); + } + } + + private List buildFields(TableMetadata metadata, Map udts) + { + Iterator it = metadata.allColumnsInSelectOrder(); + List result = new ArrayList<>(); + int position = 0; + while (it.hasNext()) + { + ColumnMetadata col = it.next(); + boolean isPartitionKey = col.isPartitionKey(); + boolean isClusteringColumn = col.isClusteringColumn(); + boolean isStatic = col.isStatic(); + String name = col.name.toString(); + CqlField.CqlType type = col.type.isUDT() ? udts.get(((UserType) col.type).getNameAsString()) + : cassandraTypes.parseType(col.type.asCQL3Type().toString(), udts); + boolean isFrozen = col.type.isFreezable() && !col.type.isMultiCell(); + result.add(new CqlField(isPartitionKey, + isClusteringColumn, + isStatic, + name, + !(type instanceof CqlFrozen) && isFrozen ? 
CqlFrozen.build(type) : type, + position)); + position++; + } + return result; + } + + static Map rfToMap(ReplicationFactor replicationFactor) + { + Map result = new HashMap<>(replicationFactor.getOptions().size() + 1); + result.put("class", "org.apache.cassandra.locator." + replicationFactor.getReplicationStrategy().name()); + for (Map.Entry entry : replicationFactor.getOptions().entrySet()) + { + result.put(entry.getKey(), Integer.toString(entry.getValue())); + } + return result; + } +} diff --git a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java index 5d0493a1f..6dd0111cb 100644 --- a/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java +++ b/cassandra-four-zero-types/src/main/java/org/apache/cassandra/spark/reader/SchemaBuilder.java @@ -19,73 +19,21 @@ package org.apache.cassandra.spark.reader; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.antlr.runtime.RecognitionException; -import org.apache.cassandra.bridge.CassandraSchema; -import org.apache.cassandra.bridge.CassandraTypesImplementation; -import org.apache.cassandra.bridge.SchemaUpdater; -import org.apache.cassandra.cql3.CQL3Type; -import org.apache.cassandra.cql3.CQLFragmentParser; -import org.apache.cassandra.cql3.CqlParser; -import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; -import 
org.apache.cassandra.cql3.statements.schema.CreateTypeStatement; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.CollectionType; -import org.apache.cassandra.db.marshal.ListType; -import org.apache.cassandra.db.marshal.MapType; -import org.apache.cassandra.db.marshal.SetType; -import org.apache.cassandra.db.marshal.TupleType; -import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.schema.ColumnMetadata; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.schema.TableMetadataRef; -import org.apache.cassandra.schema.Types; import org.apache.cassandra.spark.data.CassandraTypes; -import org.apache.cassandra.spark.data.CqlField; import org.apache.cassandra.spark.data.CqlTable; import org.apache.cassandra.spark.data.ReplicationFactor; -import org.apache.cassandra.spark.data.complex.CqlFrozen; -import org.apache.cassandra.spark.data.complex.CqlUdt; import org.apache.cassandra.spark.data.partitioner.Partitioner; -import org.apache.cassandra.utils.Pair; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -public class SchemaBuilder +public class SchemaBuilder extends AbstractSchemaBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(SchemaBuilder.class); - - private final TableMetadata metadata; - private final KeyspaceMetadata keyspaceMetadata; - private final String createStmt; - private final String keyspace; - private final ReplicationFactor replicationFactor; - private final CassandraTypes cassandraTypes; - private final int indexCount; - public SchemaBuilder(CqlTable table, Partitioner partitioner, boolean enableCdc) { this(table, 
partitioner, null, enableCdc); @@ -132,462 +80,7 @@ public SchemaBuilder(String createStmt, int indexCount, boolean enableCdc) { - this.createStmt = createStmt; - this.keyspace = keyspace; - this.replicationFactor = replicationFactor; - this.cassandraTypes = new CassandraTypesImplementation(); - this.indexCount = indexCount; - - Pair updated = CassandraSchema.apply(schema -> - updateSchema(schema, - this.keyspace, - udtStatementsProvider.apply(cassandraTypes), - this.createStmt, - partitioner, - this.replicationFactor, - tableId, enableCdc, - this::validateColumnMetaData)); - this.keyspaceMetadata = updated.left; - this.metadata = updated.right; - } - - // Update schema with the given keyspace, table and udt. - // It creates the corresponding metadata and opens instances for keyspace and table, if needed. - // At the end, it validates that the input keyspace and table both should have metadata exist and instance opened. - private static Pair updateSchema(Schema schema, - String keyspace, - Set udtStatements, - String createStatement, - Partitioner partitioner, - ReplicationFactor replicationFactor, - UUID tableId, - boolean enableCdc, - Consumer columnValidator) - { - // Set up and open keyspace if needed - IPartitioner cassPartitioner = CassandraTypesImplementation.getPartitioner(partitioner); - setupKeyspace(schema, keyspace, replicationFactor, cassPartitioner); - - // Set up and open table if needed, parse UDTs and include when parsing table schema - List typeStatements = new ArrayList<>(udtStatements.size()); - for (String udt : udtStatements) - { - try - { - typeStatements.add((CreateTypeStatement.Raw) CQLFragmentParser - .parseAnyUnhandled(CqlParser::query, udt)); - } - catch (RecognitionException exception) - { - LOGGER.error("Failed to parse type expression '{}'", udt); - throw new IllegalStateException(exception); - } - } - Types.RawBuilder typesBuilder = Types.rawBuilder(keyspace); - for (CreateTypeStatement.Raw st : typeStatements) - { - 
st.addToRawBuilder(typesBuilder); - } - Types types = typesBuilder.build(); - CreateTableStatement.Raw createTable = CQLFragmentParser.parseAny(CqlParser::createTableStatement, - createStatement, - "CREATE TABLE"); - // If the table already exists, the tableId should remain the same, unless a non-null tableId is supplied - TableMetadata maybeExistingTableMetadata = schema.getTableMetadata(keyspace, createTable.table()); - if (maybeExistingTableMetadata != null && tableId == null) - { - tableId = maybeExistingTableMetadata.id.asUUID(); - } - - TableMetadata.Builder builder = createTable - .keyspace(keyspace) - .prepare(null) - .builder(types) - .partitioner(cassPartitioner); - - if (tableId != null) - { - builder.id(TableId.fromUUID(tableId)); - } - - TableMetadata tableMetadata = builder.build(); - - if (tableMetadata.params.cdc != enableCdc) - { - tableMetadata = tableMetadata.unbuild() - .params(tableMetadata.params.unbuild() - .cdc(enableCdc) - .build()) - .build(); - } - - tableMetadata.columns().forEach(columnValidator); - setupTableAndUdt(schema, keyspace, tableMetadata, types); - - return validateKeyspaceTable(schema, keyspace, tableMetadata.name); - } - - private void validateColumnMetaData(@NotNull ColumnMetadata column) - { - validateType(column.type); - } - - private void validateType(AbstractType type) - { - validateType(type.asCQL3Type()); - } - - private void validateType(CQL3Type cqlType) - { - if (!(cqlType instanceof CQL3Type.Native) - && !(cqlType instanceof CQL3Type.Collection) - && !(cqlType instanceof CQL3Type.UserDefined) - && !(cqlType instanceof CQL3Type.Tuple)) - { - throw new UnsupportedOperationException("Only native, collection, tuples or UDT data types are supported, " - + "unsupported data type: " + cqlType.toString()); - } - - if (cqlType instanceof CQL3Type.Native) - { - CqlField.CqlType type = cassandraTypes.parseType(cqlType.toString()); - if (!type.isSupported()) - { - throw new UnsupportedOperationException(type.name() + " data 
type is not supported"); - } - } - else if (cqlType instanceof CQL3Type.Collection) - { - // Validate collection inner types - CQL3Type.Collection collection = (CQL3Type.Collection) cqlType; - CollectionType type = (CollectionType) collection.getType(); - switch (type.kind) - { - case LIST: - validateType(((ListType) type).getElementsType()); - return; - case SET: - validateType(((SetType) type).getElementsType()); - return; - case MAP: - validateType(((MapType) type).getKeysType()); - validateType(((MapType) type).getValuesType()); - return; - default: - // Do nothing - } - } - else if (cqlType instanceof CQL3Type.Tuple) - { - CQL3Type.Tuple tuple = (CQL3Type.Tuple) cqlType; - TupleType tupleType = (TupleType) tuple.getType(); - for (AbstractType subType : tupleType.allTypes()) - { - validateType(subType); - } - } - else - { - // Validate UDT inner types - UserType userType = (UserType) ((CQL3Type.UserDefined) cqlType).getType(); - for (AbstractType innerType : userType.fieldTypes()) - { - validateType(innerType); - } - } - } - - private static boolean keyspaceMetadataExists(Schema schema, String keyspaceName) - { - return schema.getKeyspaceMetadata(keyspaceName) != null; - } - - private static boolean tableMetadataExists(Schema schema, String keyspaceName, String tableName) - { - KeyspaceMetadata ksMetadata = schema.getKeyspaceMetadata(keyspaceName); - if (ksMetadata == null) - { - return false; - } - - return ksMetadata.hasTable(tableName); - } - - private static boolean keyspaceInstanceExists(Schema schema, String keyspaceName) - { - return schema.getKeyspaceInstance(keyspaceName) != null; - } - - private static boolean tableInstanceExists(Schema schema, String keyspaceName, String tableName) - { - Keyspace keyspace = schema.getKeyspaceInstance(keyspaceName); - if (keyspace == null) - { - return false; - } - - try - { - keyspace.getColumnFamilyStore(tableName); - } - catch (IllegalArgumentException exception) - { - LOGGER.info("Table instance does not exist. 
keyspace={} table={} existingCFS={}", - keyspace, tableName, keyspace.getColumnFamilyStores()); - return false; - } - return true; - } - - // Check whether keyspace metadata exists. Create keyspace metadata, if not. - // Check whether keyspace instance is opened. Open the keyspace, if not. - // NOTE: It is possible that external code that just creates metadata, but does not open the keyspace - private static void setupKeyspace(Schema schema, - String keyspaceName, - ReplicationFactor replicationFactor, - IPartitioner partitioner) - { - if (!keyspaceMetadataExists(schema, keyspaceName)) - { - LOGGER.info("Setting up keyspace metadata in schema keyspace={} rfStrategy={} partitioner={}", - keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); - KeyspaceMetadata keyspaceMetadata = - KeyspaceMetadata.create(keyspaceName, KeyspaceParams.create(true, rfToMap(replicationFactor))); - SchemaUpdater.load(schema, keyspaceMetadata); - } - - if (!keyspaceInstanceExists(schema, keyspaceName)) - { - LOGGER.info("Setting up keyspace instance in schema keyspace={} rfStrategy={} partitioner={}", - keyspaceName, replicationFactor.getReplicationStrategy().name(), partitioner); - // Create keyspace instance and also initCf (cfs) for the table - Keyspace.openWithoutSSTables(keyspaceName); - } - } - - // Check whether table metadata exists. Create table metadata, if not. - // Check whether table instance is opened. Open/init the table, if not. - // NOTE: It is possible that external code that just creates metadata, but does not open the table - private static void setupTableAndUdt(Schema schema, - String keyspaceName, - TableMetadata tableMetadata, - Types userTypes) - { - String tableName = tableMetadata.name; - KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); - if (keyspaceMetadata == null) - { - LOGGER.error("Keyspace metadata does not exist. 
keyspace={}", keyspaceName); - throw new IllegalStateException("Keyspace metadata null for '" + keyspaceName - + "' when it should have been initialized already"); - } - - if (!tableMetadataExists(schema, keyspaceName, tableName)) - { - LOGGER.info("Setting up table metadata in schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - keyspaceMetadata = keyspaceMetadata.withSwapped(keyspaceMetadata.tables.with(tableMetadata)); - SchemaUpdater.load(schema, keyspaceMetadata, tableMetadata); - } - - if (!tableMetadata.equals(schema.getTableMetadata(keyspaceName, tableMetadata.name))) - { - // Schema of the table has changed so update it in the schema - updateTableMetaData(schema, keyspaceName, tableMetadata); - LOGGER.info("Table metadata changed schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - } - - // The metadata of the table might not be the input tableMetadata. Fetch the current to be safe. 
- TableMetadata currentTable = schema.getTableMetadata(keyspaceName, tableName); - if (!tableInstanceExists(schema, keyspaceName, tableName)) - { - LOGGER.info("Setting up table instance in schema keyspace={} table={} partitioner={}", - keyspaceName, tableName, tableMetadata.partitioner.getClass().getName()); - if (keyspaceInstanceExists(schema, keyspaceName)) - { - // initCf (cfs) in the opened keyspace - schema.getKeyspaceInstance(keyspaceName) - .initCf(TableMetadataRef.forOfflineTools(currentTable), false); - } - else - { - // The keyspace has not yet opened, create/open keyspace instance and also initCf (cfs) for the table - Keyspace.openWithoutSSTables(keyspaceName); - } - } - - if (!userTypes.equals(Types.none())) - { - LOGGER.info("Setting up user types in schema keyspace={} types={}", - keyspaceName, userTypes); - // Update Schema instance with any user-defined types built - keyspaceMetadata = keyspaceMetadata.withSwapped(userTypes); - SchemaUpdater.load(schema, keyspaceMetadata, userTypes); - } - } - - private static void updateTableMetaData(Schema schema, String keyspace, TableMetadata tableMetadata) - { - KeyspaceMetadata ks = schema.getKeyspaceMetadata(keyspace); - ks = ks.withSwapped(ks.tables.withSwapped(tableMetadata)); - SchemaUpdater.load(schema, ks, tableMetadata); - } - - private static Pair validateKeyspaceTable(Schema schema, - String keyspaceName, - String tableName) - { - Preconditions.checkState(keyspaceMetadataExists(schema, keyspaceName), - "Keyspace metadata does not exist after building schema. keyspace=%s", - keyspaceName); - Preconditions.checkState(keyspaceInstanceExists(schema, keyspaceName), - "Keyspace instance is not opened after building schema. keyspace=%s", - keyspaceName); - Preconditions.checkState(tableMetadataExists(schema, keyspaceName, tableName), - "Table metadata does not exist after building schema. 
keyspace=%s table=%s", - keyspaceName, tableName); - Preconditions.checkState(tableInstanceExists(schema, keyspaceName, tableName), - "Table instance is not opened after building schema. keyspace=%s table=%s", - keyspaceName, tableName); - - // Validated above that keyspace and table, both exist and are opened - KeyspaceMetadata keyspaceMetadata = schema.getKeyspaceMetadata(keyspaceName); - TableMetadata tableMetadata = schema.getTableMetadata(keyspaceName, tableName); - return Pair.create(keyspaceMetadata, tableMetadata); - } - - public TableMetadata tableMetaData() - { - return metadata; - } - - public String createStatement() - { - return createStmt; - } - - public CqlTable build() - { - Map udts = buildsUdts(keyspaceMetadata); - List fields = buildFields(metadata, udts).stream().sorted().collect(Collectors.toList()); - return new CqlTable(keyspace, - metadata.name, - createStmt, - replicationFactor, - fields, - new HashSet<>(udts.values()), - indexCount); - } - - private Map buildsUdts(KeyspaceMetadata keyspaceMetadata) - { - List userTypes = new ArrayList<>(); - keyspaceMetadata.types.forEach(userTypes::add); - Map udts = new HashMap<>(userTypes.size()); - while (!userTypes.isEmpty()) - { - UserType userType = userTypes.remove(0); - if (!SchemaBuilder.nestedUdts(userType).stream().allMatch(udts::containsKey)) - { - // This UDT contains a nested user-defined type that has not been parsed yet - // so re-add to the queue and parse later - userTypes.add(userType); - continue; - } - String name = userType.getNameAsString(); - CqlUdt.Builder builder = CqlUdt.builder(keyspaceMetadata.name, name); - for (int field = 0; field < userType.size(); field++) - { - builder.withField(userType.fieldName(field).toString(), - cassandraTypes.parseType(userType.fieldType(field).asCQL3Type().toString(), udts)); - } - udts.put(name, builder.build()); - } - - return udts; - } - - /** - * @param type an abstract type - * @return a set of UDTs nested within the type parameter - */ - 
private static Set nestedUdts(AbstractType type) - { - Set result = new HashSet<>(); - nestedUdts(type, result, false); - return result; - } - - private static void nestedUdts(AbstractType type, Set udts, boolean isNested) - { - if (type instanceof UserType) - { - if (isNested) - { - udts.add(((UserType) type).getNameAsString()); - } - for (AbstractType nestedType : ((UserType) type).fieldTypes()) - { - nestedUdts(nestedType, udts, true); - } - } - else if (type instanceof TupleType) - { - for (AbstractType nestedType : ((TupleType) type).allTypes()) - { - nestedUdts(nestedType, udts, true); - } - } - else if (type instanceof SetType) - { - nestedUdts(((SetType) type).getElementsType(), udts, true); - } - else if (type instanceof ListType) - { - nestedUdts(((ListType) type).getElementsType(), udts, true); - } - else if (type instanceof MapType) - { - nestedUdts(((MapType) type).getKeysType(), udts, true); - nestedUdts(((MapType) type).getValuesType(), udts, true); - } - } - - private List buildFields(TableMetadata metadata, Map udts) - { - Iterator it = metadata.allColumnsInSelectOrder(); - List result = new ArrayList<>(); - int position = 0; - while (it.hasNext()) - { - ColumnMetadata col = it.next(); - boolean isPartitionKey = col.isPartitionKey(); - boolean isClusteringColumn = col.isClusteringColumn(); - boolean isStatic = col.isStatic(); - String name = col.name.toString(); - CqlField.CqlType type = col.type.isUDT() ? udts.get(((UserType) col.type).getNameAsString()) - : cassandraTypes.parseType(col.type.asCQL3Type().toString(), udts); - boolean isFrozen = col.type.isFreezable() && !col.type.isMultiCell(); - result.add(new CqlField(isPartitionKey, - isClusteringColumn, - isStatic, - name, - !(type instanceof CqlFrozen) && isFrozen ? 
CqlFrozen.build(type) : type, - position)); - position++; - } - return result; - } - - static Map rfToMap(ReplicationFactor replicationFactor) - { - Map result = new HashMap<>(replicationFactor.getOptions().size() + 1); - result.put("class", "org.apache.cassandra.locator." + replicationFactor.getReplicationStrategy().name()); - for (Map.Entry entry : replicationFactor.getOptions().entrySet()) - { - result.put(entry.getKey(), Integer.toString(entry.getValue())); - } - return result; + super(createStmt, keyspace, replicationFactor, partitioner, udtStatementsProvider, + tableId, indexCount, enableCdc); } }