From 34c44c97cfc164071a610f1af9ec277c74643fe7 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 4 Dec 2025 02:33:31 +0100
Subject: [PATCH] [FLINK-38807][state] Add state SQL metadata-based type inference

---
 docs/content/docs/libs/state_processor_api.md |  10 +-
 .../state/api/runtime/SavepointLoader.java    |  80 ++-
 .../SavepointDataStreamScanProvider.java      |  18 +-
 .../SavepointDynamicTableSourceFactory.java   | 350 +++++--------
 .../table/SavepointTypeInfoResolver.java      | 493 ++++++++++++++++++
 .../table/StateValueColumnConfiguration.java  |  22 +-
 ...icAvroSavepointTypeInformationFactory.java |  32 --
 .../SavepointDynamicTableSourceTest.java      |  21 +-
 ...vepointMetadataDynamicTableSourceTest.java |   4 +-
 .../SavepointTypeInformationFactoryTest.java  | 130 +++++
 ...icAvroSavepointTypeInformationFactory.java |  33 --
 .../src/test/resources/table-state/_metadata  | Bin 22390 -> 26034 bytes
 12 files changed, 860 insertions(+), 333 deletions(-)
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java
 delete mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java
 delete mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java

diff --git a/docs/content/docs/libs/state_processor_api.md b/docs/content/docs/libs/state_processor_api.md
index 57f35a72f13b3..82c34c11e2344 100644
--- a/docs/content/docs/libs/state_processor_api.md
+++ b/docs/content/docs/libs/state_processor_api.md
@@ -586,13 +586,6 @@ public class StatefulFunction extends KeyedProcessFunction
-public class AvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory {
-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new AvroTypeInfo<>(AvroRecord.class);
-    }
-}
 ```
 
 Then it can be read by querying a table created using the following SQL statement:
@@ -609,8 +602,7 @@ CREATE TABLE state_table (
   'connector' = 'savepoint',
   'state.backend.type' = 'rocksdb',
   'state.path' = '/root/dir/of/checkpoint-data/chk-1',
-  'operator.uid' = 'my-uid',
-  'fields.MyAvroState.value-type-factory' = 'org.apache.flink.state.table.AvroSavepointTypeInformationFactory'
+  'operator.uid' = 'my-uid'
 );
 ```

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
index 1b2cdedefaf56..e4ea285c0e876 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointLoader.java
@@ -19,15 +19,27 @@
 package org.apache.flink.state.api.runtime;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
+import org.apache.flink.state.api.OperatorIdentifier;
 
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
-/** Utility class for loading {@link CheckpointMetadata} metadata. */
+/** Utility class for loading savepoint metadata and operator state information. */
 @Internal
 public final class SavepointLoader {
     private SavepointLoader() {}
@@ -55,4 +67,70 @@ public static CheckpointMetadata loadSavepointMetadata(String savepointPath)
                     stream, Thread.currentThread().getContextClassLoader(), savepointPath);
         }
     }
+
+    /**
+     * Loads all state metadata for an operator in a single I/O operation.
+     *
+     * @param savepointPath Path to the savepoint directory
+     * @param operatorIdentifier Operator UID or hash
+     * @return Map from state name to {@link StateMetaInfoSnapshot}
+     * @throws IOException If reading fails
+     */
+    public static Map<String, StateMetaInfoSnapshot> loadOperatorStateMetadata(
+            String savepointPath, OperatorIdentifier operatorIdentifier) throws IOException {
+
+        CheckpointMetadata checkpointMetadata = loadSavepointMetadata(savepointPath);
+
+        OperatorState operatorState =
+                checkpointMetadata.getOperatorStates().stream()
+                        .filter(
+                                state ->
+                                        operatorIdentifier
+                                                .getOperatorId()
+                                                .equals(state.getOperatorID()))
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "Operator "
+                                                        + operatorIdentifier
+                                                        + " not found in savepoint"));
+
+        KeyedStateHandle keyedStateHandle =
+                operatorState.getStates().stream()
+                        .flatMap(s -> s.getManagedKeyedState().stream())
+                        .findFirst()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalArgumentException(
+                                                "No keyed state found for operator "
+                                                        + operatorIdentifier));
+
+        KeyedBackendSerializationProxy<?> proxy = readSerializationProxy(keyedStateHandle);
+        return proxy.getStateMetaInfoSnapshots().stream()
+                .collect(Collectors.toMap(StateMetaInfoSnapshot::getName, Function.identity()));
+    }
+
+    private static KeyedBackendSerializationProxy<?> readSerializationProxy(
+            KeyedStateHandle stateHandle) throws IOException {
+
+        StreamStateHandle streamStateHandle;
+        if (stateHandle instanceof KeyGroupsStateHandle) {
+            streamStateHandle = ((KeyGroupsStateHandle) stateHandle).getDelegateStateHandle();
+        } else {
+            throw new IllegalArgumentException(
+                    "Unsupported KeyedStateHandle type: " + stateHandle.getClass());
+        }
+
+        try (FSDataInputStream inputStream = streamStateHandle.openInputStream()) {
+            DataInputViewStreamWrapper inputView = new DataInputViewStreamWrapper(inputStream);
+
+            KeyedBackendSerializationProxy<?> proxy =
+                    new KeyedBackendSerializationProxy<>(
+                            Thread.currentThread().getContextClassLoader());
+            proxy.read(inputView);
+
+            return proxy;
+        }
+    }
 }
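A quick aside before the scan-provider changes: a minimal sketch (not part of the patch) of how the new single-pass metadata loader can be exercised. The savepoint path and operator UID below are placeholders; `OperatorIdentifier.forUid` is existing State Processor API surface.

```java
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.runtime.SavepointLoader;

import java.util.Map;

public class OperatorStateMetadataProbe {
    public static void main(String[] args) throws Exception {
        // One savepoint read returns the metadata of every keyed state of the operator.
        Map<String, StateMetaInfoSnapshot> metadata =
                SavepointLoader.loadOperatorStateMetadata(
                        "/path/to/savepoint", OperatorIdentifier.forUid("my-uid"));

        // The state names are what the table factory later matches column names against.
        metadata.keySet().forEach(System.out::println);
    }
}
```

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
index 5393e4fa01a34..fd24b0f2448a2 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
@@ -22,6 +22,7 @@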
import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.StateBackendOptions; @@ -91,32 +92,33 @@ public DataStream produceDataStream( // Get value state descriptors for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) { - TypeInformation valueTypeInfo = columnConfig.getValueTypeInfo(); + TypeSerializer valueTypeSerializer = columnConfig.getValueTypeSerializer(); switch (columnConfig.getStateType()) { case VALUE: columnConfig.setStateDescriptor( new ValueStateDescriptor<>( - columnConfig.getStateName(), valueTypeInfo)); + columnConfig.getStateName(), valueTypeSerializer)); break; case LIST: columnConfig.setStateDescriptor( new ListStateDescriptor<>( - columnConfig.getStateName(), valueTypeInfo)); + columnConfig.getStateName(), valueTypeSerializer)); break; case MAP: - TypeInformation mapKeyTypeInfo = columnConfig.getMapKeyTypeInfo(); - if (mapKeyTypeInfo == null) { + TypeSerializer mapKeyTypeSerializer = + columnConfig.getMapKeyTypeSerializer(); + if (mapKeyTypeSerializer == null) { throw new ConfigurationException( - "Map key type information is required for map state"); + "Map key type serializer is required for map state"); } columnConfig.setStateDescriptor( new MapStateDescriptor<>( columnConfig.getStateName(), - mapKeyTypeInfo, - valueTypeInfo)); + mapKeyTypeSerializer, + valueTypeSerializer)); break; default: diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java index 7ac206d008110..9c507109c24ea 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java @@ -18,12 +18,16 @@ package org.apache.flink.state.table; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.serialization.SerializerConfigImpl; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeinfo.utils.TypeUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.state.api.OperatorIdentifier; +import org.apache.flink.state.api.runtime.SavepointLoader; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.ResolvedSchema; @@ -31,21 +35,19 @@ import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; -import org.apache.flink.table.types.logical.ArrayType; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; -import 
org.apache.flink.table.types.logical.MapType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.utils.LogicalTypeChecks; import org.apache.flink.util.Preconditions; -import javax.annotation.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.math.BigDecimal; import java.util.Arrays; import java.util.HashSet; import java.util.List; -import java.util.Optional; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -73,15 +75,27 @@ /** Dynamic source factory for {@link SavepointDynamicTableSource}. */ public class SavepointDynamicTableSourceFactory implements DynamicTableSourceFactory { + + private static final Logger LOG = + LoggerFactory.getLogger(SavepointDynamicTableSourceFactory.class); + @Override public DynamicTableSource createDynamicTableSource(Context context) { Configuration options = new Configuration(); context.getCatalogTable().getOptions().forEach(options::setString); + SerializerConfig serializerConfig = new SerializerConfigImpl(options); final String stateBackendType = options.getOptional(STATE_BACKEND_TYPE).orElse(null); final String statePath = options.get(STATE_PATH); final OperatorIdentifier operatorIdentifier = getOperatorIdentifier(options); + final Map preloadedStateMetadata = + preloadStateMetadata(statePath, operatorIdentifier); + + // Create resolver with preloaded metadata + SavepointTypeInfoResolver typeResolver = + new SavepointTypeInfoResolver(preloadedStateMetadata, serializerConfig); + final Tuple2 keyValueProjections = createKeyValueProjections(context.getCatalogTable()); @@ -106,115 +120,21 @@ public DynamicTableSource createDynamicTableSource(Context context) { optionalOptions.add(keyTypeInfoFactoryOption); TypeInformation keyTypeInfo = - getTypeInfo(options, keyFormatOption, keyTypeInfoFactoryOption, keyRowField, true); + typeResolver.resolveKeyType( + options, keyFormatOption, keyTypeInfoFactoryOption, keyRowField); final Tuple2> keyValueConfigProjections = Tuple2.of( keyValueProjections.f0, Arrays.stream(keyValueProjections.f1) .mapToObj( - columnIndex -> { - RowType.RowField valueRowField = - rowType.getFields().get(columnIndex); - - ConfigOption stateNameOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - STATE_NAME)) - .stringType() - .noDefaultValue(); - optionalOptions.add(stateNameOption); - - ConfigOption - stateTypeOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - STATE_TYPE)) - .enumType( - SavepointConnectorOptions - .StateType - .class) - .noDefaultValue(); - optionalOptions.add(stateTypeOption); - - ConfigOption mapKeyFormatOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - KEY_CLASS)) - .stringType() - .noDefaultValue(); - optionalOptions.add(mapKeyFormatOption); - - ConfigOption mapKeyTypeInfoFactoryOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - KEY_TYPE_FACTORY)) - .stringType() - .noDefaultValue(); - optionalOptions.add(mapKeyTypeInfoFactoryOption); - - ConfigOption valueFormatOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - VALUE_CLASS)) - .stringType() - .noDefaultValue(); - optionalOptions.add(valueFormatOption); - - ConfigOption valueTypeInfoFactoryOption = - key(String.format( - "%s.%s.%s", - FIELDS, - valueRowField.getName(), - VALUE_TYPE_FACTORY)) - .stringType() - .noDefaultValue(); - 
optionalOptions.add(valueTypeInfoFactoryOption); - - LogicalType valueLogicalType = valueRowField.getType(); - - SavepointConnectorOptions.StateType stateType = - options.getOptional(stateTypeOption) - .orElseGet( - () -> - inferStateType( - valueLogicalType)); - - TypeInformation mapKeyTypeInfo = - getTypeInfo( - options, - keyFormatOption, - mapKeyTypeInfoFactoryOption, - valueRowField, - stateType.equals( - SavepointConnectorOptions - .StateType.MAP)); - - TypeInformation valueTypeInfo = - getTypeInfo( - options, - valueFormatOption, - valueTypeInfoFactoryOption, - valueRowField, - true); - return new StateValueColumnConfiguration( - columnIndex, - options.getOptional(stateNameOption) - .orElse(valueRowField.getName()), - stateType, - mapKeyTypeInfo, - valueTypeInfo); - }) + columnIndex -> + createStateColumnConfiguration( + columnIndex, + rowType, + options, + optionalOptions, + typeResolver)) .collect(Collectors.toList())); FactoryUtil.validateFactoryOptions(requiredOptions, optionalOptions, options); @@ -234,6 +154,83 @@ public DynamicTableSource createDynamicTableSource(Context context) { rowType); } + private StateValueColumnConfiguration createStateColumnConfiguration( + int columnIndex, + RowType rowType, + Configuration options, + Set> optionalOptions, + SavepointTypeInfoResolver typeResolver) { + + RowType.RowField valueRowField = rowType.getFields().get(columnIndex); + + ConfigOption stateNameOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), STATE_NAME)) + .stringType() + .noDefaultValue(); + optionalOptions.add(stateNameOption); + + ConfigOption stateTypeOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), STATE_TYPE)) + .enumType(SavepointConnectorOptions.StateType.class) + .noDefaultValue(); + optionalOptions.add(stateTypeOption); + + ConfigOption mapKeyFormatOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), KEY_CLASS)) + .stringType() + .noDefaultValue(); + optionalOptions.add(mapKeyFormatOption); + + ConfigOption mapKeyTypeInfoFactoryOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), KEY_TYPE_FACTORY)) + .stringType() + .noDefaultValue(); + optionalOptions.add(mapKeyTypeInfoFactoryOption); + + ConfigOption valueFormatOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), VALUE_CLASS)) + .stringType() + .noDefaultValue(); + optionalOptions.add(valueFormatOption); + + ConfigOption valueTypeInfoFactoryOption = + key(String.format("%s.%s.%s", FIELDS, valueRowField.getName(), VALUE_TYPE_FACTORY)) + .stringType() + .noDefaultValue(); + optionalOptions.add(valueTypeInfoFactoryOption); + + LogicalType valueLogicalType = valueRowField.getType(); + + SavepointConnectorOptions.StateType stateType = + options.getOptional(stateTypeOption) + .orElseGet(() -> inferStateType(valueLogicalType)); + + TypeSerializer mapKeyTypeSerializer = + typeResolver.resolveSerializer( + options, + mapKeyFormatOption, + mapKeyTypeInfoFactoryOption, + valueRowField, + stateType.equals(SavepointConnectorOptions.StateType.MAP), + SavepointTypeInfoResolver.InferenceContext.MAP_KEY); + + TypeSerializer valueTypeSerializer = + typeResolver.resolveSerializer( + options, + valueFormatOption, + valueTypeInfoFactoryOption, + valueRowField, + true, + SavepointTypeInfoResolver.InferenceContext.VALUE); + + return new StateValueColumnConfiguration( + columnIndex, + options.getOptional(stateNameOption).orElse(valueRowField.getName()), + stateType, + mapKeyTypeSerializer, + valueTypeSerializer); + } + private 
Tuple2 createKeyValueProjections(ResolvedCatalogTable catalogTable) { ResolvedSchema schema = catalogTable.getResolvedSchema(); if (schema.getPrimaryKey().isEmpty()) { @@ -271,46 +268,6 @@ private int[] createValueFormatProjection(DataType physicalDataType, int keyProj return physicalFields.filter(pos -> keyProjection != pos).toArray(); } - private TypeInformation getTypeInfo( - Configuration options, - ConfigOption classOption, - ConfigOption typeInfoFactoryOption, - RowType.RowField rowField, - boolean inferStateType) { - Optional clazz = options.getOptional(classOption); - Optional typeInfoFactory = options.getOptional(typeInfoFactoryOption); - if (clazz.isPresent() && typeInfoFactory.isPresent()) { - throw new IllegalArgumentException( - "Either " - + classOption.key() - + " or " - + typeInfoFactoryOption.key() - + " can be specified for column " - + rowField.getName() - + "."); - } - try { - if (clazz.isPresent()) { - return TypeInformation.of(Class.forName(clazz.get())); - } else if (typeInfoFactory.isPresent()) { - SavepointTypeInformationFactory savepointTypeInformationFactory = - (SavepointTypeInformationFactory) - TypeUtils.getInstance(typeInfoFactory.get(), new Object[0]); - return savepointTypeInformationFactory.getTypeInformation(); - } else { - if (inferStateType) { - String inferredValueFormat = - inferStateValueFormat(rowField.getName(), rowField.getType()); - return TypeInformation.of(Class.forName(inferredValueFormat)); - } else { - return null; - } - } - } catch (ReflectiveOperationException e) { - throw new RuntimeException(e); - } - } - private SavepointConnectorOptions.StateType inferStateType(LogicalType logicalType) { switch (logicalType.getTypeRoot()) { case ARRAY: @@ -324,82 +281,25 @@ private SavepointConnectorOptions.StateType inferStateType(LogicalType logicalTy } } - @Nullable - private String inferStateMapKeyFormat(String columnName, LogicalType logicalType) { - return logicalType.is(LogicalTypeRoot.MAP) - ? inferStateValueFormat(columnName, ((MapType) logicalType).getKeyType()) - : null; - } - - private String inferStateValueFormat(String columnName, LogicalType logicalType) { - switch (logicalType.getTypeRoot()) { - case CHAR: - case VARCHAR: - return String.class.getName(); - - case BOOLEAN: - return Boolean.class.getName(); - - case BINARY: - case VARBINARY: - return byte[].class.getName(); - - case DECIMAL: - return BigDecimal.class.getName(); - - case TINYINT: - return Byte.class.getName(); - - case SMALLINT: - return Short.class.getName(); - - case INTEGER: - return Integer.class.getName(); - - case BIGINT: - return Long.class.getName(); - - case FLOAT: - return Float.class.getName(); - - case DOUBLE: - return Double.class.getName(); - - case DATE: - return Integer.class.getName(); - - case INTERVAL_YEAR_MONTH: - case INTERVAL_DAY_TIME: - return Long.class.getName(); - - case ARRAY: - return inferStateValueFormat( - columnName, ((ArrayType) logicalType).getElementType()); - - case MAP: - return inferStateValueFormat(columnName, ((MapType) logicalType).getValueType()); - - case NULL: - return null; - - case ROW: - case MULTISET: - case TIME_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITHOUT_TIME_ZONE: - case TIMESTAMP_WITH_TIME_ZONE: - case TIMESTAMP_WITH_LOCAL_TIME_ZONE: - case DISTINCT_TYPE: - case STRUCTURED_TYPE: - case RAW: - case SYMBOL: - case UNRESOLVED: - case DESCRIPTOR: - default: - throw new UnsupportedOperationException( - String.format( - "Unable to infer state format for SQL type: %s in column: %s. 
" - + "Please override the type with the following config parameter: %s.%s.%s", - logicalType, columnName, FIELDS, columnName, VALUE_CLASS)); + /** + * Preloads all state metadata for an operator in a single I/O operation. + * + * @param savepointPath Path to the savepoint + * @param operatorIdentifier Operator UID or hash + * @return Map from state name to StateMetaInfoSnapshot + */ + private Map preloadStateMetadata( + String savepointPath, OperatorIdentifier operatorIdentifier) { + try { + return SavepointLoader.loadOperatorStateMetadata(savepointPath, operatorIdentifier); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Failed to load state metadata from savepoint '%s' for operator '%s'. " + + "Ensure the savepoint path is valid and the operator exists in the savepoint. " + + "Original error: %s", + savepointPath, operatorIdentifier, e.getMessage()), + e); } } diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java new file mode 100644 index 0000000000000..19832c36d9e65 --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointTypeInfoResolver.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.state.table; + +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeinfo.utils.TypeUtils; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.base.MapSerializer; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; +import org.apache.flink.table.types.logical.ArrayType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.MapType; +import org.apache.flink.table.types.logical.RowType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.state.table.SavepointConnectorOptions.FIELDS; +import static org.apache.flink.state.table.SavepointConnectorOptions.VALUE_CLASS; + +/** Resolver for TypeInformation from savepoint metadata and configuration. 
*/
+public class SavepointTypeInfoResolver {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SavepointTypeInfoResolver.class);
+
+    /** Context for type inference to determine what aspect of the type we need. */
+    public enum InferenceContext {
+        /** Inferring the key type of keyed state (always primitive). */
+        KEY,
+        /** Inferring the key type of a MAP state. */
+        MAP_KEY,
+        /** Inferring the value type (behavior depends on logical type). */
+        VALUE
+    }
+
+    private final Map<String, StateMetaInfoSnapshot> preloadedStateMetadata;
+    private final SerializerConfig serializerConfig;
+
+    public SavepointTypeInfoResolver(
+            Map<String, StateMetaInfoSnapshot> preloadedStateMetadata,
+            SerializerConfig serializerConfig) {
+        this.preloadedStateMetadata = preloadedStateMetadata;
+        this.serializerConfig = serializerConfig;
+    }
+
+    /**
+     * Resolves TypeInformation for keyed state keys (primitive types only).
+     *
+     * <p>This is a simplified version of type resolution specifically for key types, which are
+     * always primitive and don't require complex metadata inference.
+     *
+     * @param options Configuration containing table options
+     * @param classOption Config option for explicit class specification
+     * @param typeInfoFactoryOption Config option for type factory specification
+     * @param rowField The row field containing name and LogicalType
+     * @return The resolved TypeInformation for the key
+     * @throws IllegalArgumentException If both class and factory options are specified
+     * @throws RuntimeException If type instantiation fails
+     */
+    public TypeInformation<?> resolveKeyType(
+            Configuration options,
+            ConfigOption<String> classOption,
+            ConfigOption<String> typeInfoFactoryOption,
+            RowType.RowField rowField) {
+        try {
+            // Priority 1: Explicit configuration (backward compatibility)
+            TypeInformation<?> explicitTypeInfo =
+                    getExplicitTypeInfo(options, classOption, typeInfoFactoryOption);
+            if (explicitTypeInfo != null) {
+                return explicitTypeInfo;
+            }
+
+            // Priority 2: Simple primitive type inference from LogicalType
+            LogicalType logicalType = rowField.getType();
+            String columnName = rowField.getName();
+            return TypeInformation.of(getPrimitiveClass(logicalType, columnName));
+        } catch (ReflectiveOperationException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Resolves TypeSerializer for a table field using a three-tier priority system with direct
+     * serializer extraction for metadata inference.
+     *
+     * <p><b>Three-Tier Priority System (Serializer-First)</b>
+     *
+     * <ol>
+     *   <li><b>Priority 1: Explicit Configuration</b> (highest priority): uses the user-specified
+     *       class name or type factory from the table options, then converts it to a serializer.
+     *   <li><b>Priority 2: Metadata Inference</b>: directly extracts serializers from the
+     *       preloaded savepoint metadata (NO TypeInformation conversion).
+     *   <li><b>Priority 3: LogicalType Fallback</b> (lowest priority): infers TypeInformation
+     *       from the table schema's LogicalType, then converts it to a serializer.
+     * </ol>
+     *
+     * <p>
This approach eliminates TypeInformation extraction complexity for metadata inference, + * making it work with ANY serializer type (Avro, custom types, etc.). + * + * @param options Configuration containing table options + * @param classOption Config option for explicit class specification + * @param typeInfoFactoryOption Config option for type factory specification + * @param rowField The table field containing name and LogicalType + * @param inferStateType Whether to enable automatic type inference. If false, returns null when + * no explicit configuration is provided. + * @param context The inference context determining what type aspect to extract. + * @return The resolved TypeSerializer, or null if inferStateType is false and no explicit + * configuration is provided. + * @throws IllegalArgumentException If both class and factory options are specified + * @throws RuntimeException If serializer creation fails + */ + public TypeSerializer resolveSerializer( + Configuration options, + ConfigOption classOption, + ConfigOption typeInfoFactoryOption, + RowType.RowField rowField, + boolean inferStateType, + InferenceContext context) { + try { + // Priority 1: Explicit configuration (backward compatibility) + TypeInformation explicitTypeInfo = + getExplicitTypeInfo(options, classOption, typeInfoFactoryOption); + if (explicitTypeInfo != null) { + return explicitTypeInfo.createSerializer(serializerConfig); + } + + if (inferStateType) { + // Priority 2: Direct serializer extraction from metadata + Optional> metadataSerializer = + getSerializerFromMetadata(rowField, context); + if (metadataSerializer.isPresent()) { + LOG.info( + "Using serializer directly from metadata for state '{}' with context {}: {}", + rowField.getName(), + context, + metadataSerializer.get().getClass().getSimpleName()); + return metadataSerializer.get(); + } + + // Priority 3: Fallback to LogicalType-based inference + TypeInformation fallbackTypeInfo = inferTypeFromLogicalType(rowField, context); + return fallbackTypeInfo.createSerializer(serializerConfig); + } else { + return null; + } + } catch (Exception e) { + throw new RuntimeException( + "Failed to resolve serializer for field " + rowField.getName(), e); + } + } + + /** + * Extracts explicit TypeInformation from user configuration (Priority 1). 
+ * + * @param options Configuration containing table options + * @param classOption Config option for explicit class specification + * @param typeInfoFactoryOption Config option for type factory specification + * @return The explicit TypeInformation if specified, null otherwise + * @throws IllegalArgumentException If both class and factory options are specified + * @throws ReflectiveOperationException If type instantiation fails + */ + private TypeInformation getExplicitTypeInfo( + Configuration options, + ConfigOption classOption, + ConfigOption typeInfoFactoryOption) + throws ReflectiveOperationException { + + Optional clazz = options.getOptional(classOption); + Optional typeInfoFactory = options.getOptional(typeInfoFactoryOption); + + if (clazz.isPresent() && typeInfoFactory.isPresent()) { + throw new IllegalArgumentException( + "Either " + + classOption.key() + + " or " + + typeInfoFactoryOption.key() + + " can be specified, not both."); + } + + if (clazz.isPresent()) { + return TypeInformation.of(Class.forName(clazz.get())); + } else if (typeInfoFactory.isPresent()) { + SavepointTypeInformationFactory savepointTypeInformationFactory = + (SavepointTypeInformationFactory) + TypeUtils.getInstance(typeInfoFactory.get(), new Object[0]); + return savepointTypeInformationFactory.getTypeInformation(); + } + + return null; + } + + /** + * Directly extracts TypeSerializer from preloaded metadata (Priority 2). + * + *
<p>
This method performs NO I/O and NO TypeInformation conversion. It directly extracts the + * serializer that was used to write the state data. + * + * @param rowField The row field to extract serializer for + * @param context The inference context determining what serializer to extract + * @return The serializer if found in metadata, empty otherwise + */ + private Optional> getSerializerFromMetadata( + RowType.RowField rowField, InferenceContext context) { + try { + // Get state name for this field (defaults to field name) + String stateName = rowField.getName(); + + // Look up from preloaded metadata (NO I/O) + StateMetaInfoSnapshot stateMetaInfo = preloadedStateMetadata.get(stateName); + + if (stateMetaInfo == null) { + LOG.debug("State '{}' not found in preloaded metadata", stateName); + return Optional.empty(); + } + + // Extract appropriate serializer based on context + TypeSerializerSnapshot serializerSnapshot = null; + switch (context) { + case KEY: + serializerSnapshot = + stateMetaInfo.getTypeSerializerSnapshot( + StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER); + break; + case MAP_KEY: + // For MAP_KEY, we need the key serializer from the value serializer + // (which is MapSerializer) + TypeSerializerSnapshot valueSnapshot = + stateMetaInfo.getTypeSerializerSnapshot( + StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER); + if (valueSnapshot != null) { + TypeSerializer valueSerializer = valueSnapshot.restoreSerializer(); + if (valueSerializer instanceof MapSerializer) { + serializerSnapshot = + ((MapSerializer) valueSerializer) + .getKeySerializer() + .snapshotConfiguration(); + } + } + break; + case VALUE: + serializerSnapshot = + stateMetaInfo.getTypeSerializerSnapshot( + StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER); + break; + } + + if (serializerSnapshot == null) { + LOG.debug( + "No serializer snapshot found for state '{}' with context {}", + stateName, + context); + return Optional.empty(); + } + + // Restore serializer from snapshot + TypeSerializer serializer = serializerSnapshot.restoreSerializer(); + + // For VALUE context with complex types, extract the appropriate sub-serializer + if (context == InferenceContext.VALUE) { + return extractValueSerializerForLogicalType(serializer, rowField.getType()); + } + + return Optional.of(serializer); + + } catch (Exception e) { + LOG.warn( + "Failed to extract serializer from metadata for field '{}': {}", + rowField.getName(), + e.getMessage()); + return Optional.empty(); + } + } + + /** + * Extracts the appropriate value serializer based on LogicalType for VALUE context. 
+ * + * @param fullSerializer The complete serializer from metadata + * @param logicalType The LogicalType from the table schema + * @return The appropriate value serializer + */ + private Optional> extractValueSerializerForLogicalType( + TypeSerializer fullSerializer, LogicalType logicalType) { + + switch (logicalType.getTypeRoot()) { + case ARRAY: + // ARRAY logical type → LIST state → extract element serializer + if (fullSerializer + instanceof org.apache.flink.api.common.typeutils.base.ListSerializer) { + org.apache.flink.api.common.typeutils.base.ListSerializer listSerializer = + (org.apache.flink.api.common.typeutils.base.ListSerializer) + fullSerializer; + return Optional.of(listSerializer.getElementSerializer()); + } + LOG.debug( + "Expected ListSerializer for ARRAY logical type but got: {}", + fullSerializer.getClass()); + return Optional.empty(); + + case MAP: + // MAP logical type → MAP state → extract value serializer + if (fullSerializer instanceof MapSerializer) { + return Optional.of(((MapSerializer) fullSerializer).getValueSerializer()); + } + LOG.debug( + "Expected MapSerializer for MAP logical type but got: {}", + fullSerializer.getClass()); + return Optional.empty(); + + default: + // Primitive logical type → VALUE state → use serializer as-is + return Optional.of(fullSerializer); + } + } + + /** + * Fallback inference using LogicalType when metadata extraction fails. + * + * @param rowField The row field to infer type for + * @param context The inference context + * @return The inferred TypeInformation + */ + private TypeInformation inferTypeFromLogicalType( + RowType.RowField rowField, InferenceContext context) { + + LogicalType logicalType = rowField.getType(); + String columnName = rowField.getName(); + + try { + switch (context) { + case KEY: + // Keys are always primitive + return TypeInformation.of(getPrimitiveClass(logicalType, columnName)); + + case MAP_KEY: + // Extract key type from MAP logical type + if (logicalType instanceof MapType) { + LogicalType keyType = ((MapType) logicalType).getKeyType(); + return TypeInformation.of(getPrimitiveClass(keyType, columnName)); + } + throw new UnsupportedOperationException( + "MAP_KEY context requires MAP logical type, but got: " + logicalType); + + case VALUE: + return inferValueTypeFromLogicalType(logicalType, columnName); + + default: + throw new UnsupportedOperationException("Unknown context: " + context); + } + } catch (ClassNotFoundException e) { + throw new RuntimeException("Failed to infer type for context " + context, e); + } + } + + /** + * Infers value type from LogicalType for VALUE context fallback. 
+ * + * @param logicalType The LogicalType + * @param columnName The column name for error messages + * @return The inferred TypeInformation + */ + private TypeInformation inferValueTypeFromLogicalType( + LogicalType logicalType, String columnName) throws ClassNotFoundException { + + switch (logicalType.getTypeRoot()) { + case ARRAY: + // ARRAY logical type → LIST state → return element type + ArrayType arrayType = (ArrayType) logicalType; + return TypeInformation.of( + getPrimitiveClass(arrayType.getElementType(), columnName)); + + case MAP: + // MAP logical type → MAP state → return value type + MapType mapType = (MapType) logicalType; + return TypeInformation.of(getPrimitiveClass(mapType.getValueType(), columnName)); + + default: + // Primitive logical type → VALUE state → return primitive type + return TypeInformation.of(getPrimitiveClass(logicalType, columnName)); + } + } + + /** + * Maps LogicalType to primitive Java class. + * + * @param logicalType The LogicalType to map + * @param columnName The column name for error messages + * @return The corresponding Java class + */ + private Class getPrimitiveClass(LogicalType logicalType, String columnName) + throws ClassNotFoundException { + String className = inferTypeInfoClassFromLogicalType(columnName, logicalType); + return Class.forName(className); + } + + private String inferTypeInfoClassFromLogicalType(String columnName, LogicalType logicalType) { + switch (logicalType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return String.class.getName(); + + case BOOLEAN: + return Boolean.class.getName(); + + case BINARY: + case VARBINARY: + return byte[].class.getName(); + + case DECIMAL: + return BigDecimal.class.getName(); + + case TINYINT: + return Byte.class.getName(); + + case SMALLINT: + return Short.class.getName(); + + case INTEGER: + return Integer.class.getName(); + + case BIGINT: + return Long.class.getName(); + + case FLOAT: + return Float.class.getName(); + + case DOUBLE: + return Double.class.getName(); + + case DATE: + return Integer.class.getName(); + + case INTERVAL_YEAR_MONTH: + case INTERVAL_DAY_TIME: + return Long.class.getName(); + + case ARRAY: + return inferTypeInfoClassFromLogicalType( + columnName, ((ArrayType) logicalType).getElementType()); + + case MAP: + return inferTypeInfoClassFromLogicalType( + columnName, ((MapType) logicalType).getValueType()); + + case NULL: + return null; + + case ROW: + case MULTISET: + case TIME_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITHOUT_TIME_ZONE: + case TIMESTAMP_WITH_TIME_ZONE: + case TIMESTAMP_WITH_LOCAL_TIME_ZONE: + case DISTINCT_TYPE: + case STRUCTURED_TYPE: + case RAW: + case SYMBOL: + case UNRESOLVED: + case DESCRIPTOR: + default: + throw new UnsupportedOperationException( + String.format( + "Unable to infer state format for SQL type: %s in column: %s. 
" + + "Please override the type with the following config parameter: %s.%s.%s", + logicalType, columnName, FIELDS, columnName, VALUE_CLASS)); + } + } +} diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java index fa622bb0a10a9..865077717fc7c 100644 --- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java +++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java @@ -19,7 +19,7 @@ package org.apache.flink.state.table; import org.apache.flink.api.common.state.StateDescriptor; -import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import javax.annotation.Nullable; @@ -31,21 +31,21 @@ public class StateValueColumnConfiguration implements Serializable { private final int columnIndex; private final String stateName; private final SavepointConnectorOptions.StateType stateType; - @Nullable private final TypeInformation mapKeyTypeInfo; - @Nullable private final TypeInformation valueTypeInfo; + @Nullable private final TypeSerializer mapKeyTypeSerializer; + @Nullable private final TypeSerializer valueTypeSerializer; @Nullable private StateDescriptor stateDescriptor; public StateValueColumnConfiguration( int columnIndex, final String stateName, final SavepointConnectorOptions.StateType stateType, - @Nullable final TypeInformation mapKeyTypeInfo, - final TypeInformation valueTypeInfo) { + @Nullable final TypeSerializer mapKeyTypeSerializer, + final TypeSerializer valueTypeSerializer) { this.columnIndex = columnIndex; this.stateName = stateName; this.stateType = stateType; - this.mapKeyTypeInfo = mapKeyTypeInfo; - this.valueTypeInfo = valueTypeInfo; + this.mapKeyTypeSerializer = mapKeyTypeSerializer; + this.valueTypeSerializer = valueTypeSerializer; } public int getColumnIndex() { @@ -61,12 +61,12 @@ public SavepointConnectorOptions.StateType getStateType() { } @Nullable - public TypeInformation getMapKeyTypeInfo() { - return mapKeyTypeInfo; + public TypeSerializer getMapKeyTypeSerializer() { + return mapKeyTypeSerializer; } - public TypeInformation getValueTypeInfo() { - return valueTypeInfo; + public TypeSerializer getValueTypeSerializer() { + return valueTypeSerializer; } public void setStateDescriptor(StateDescriptor stateDescriptor) { diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java deleted file mode 100644 index b73e3273e1f97..0000000000000 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/GenericAvroSavepointTypeInformationFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.table; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo; - -import com.example.state.writer.job.schema.avro.AvroRecord; - -/** {@link SavepointTypeInformationFactory} for generic avro record. */ -public class GenericAvroSavepointTypeInformationFactory implements SavepointTypeInformationFactory { - @Override - public TypeInformation getTypeInformation() { - return new GenericRecordAvroTypeInfo(AvroRecord.getClassSchema()); - } -} diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java index 9afeb85a66040..abbc7653c6f07 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java @@ -53,14 +53,13 @@ public void testReadKeyedState() throws Exception { + " KeyedPrimitiveValue bigint,\n" + " KeyedPojoValue ROW,\n" + " KeyedPrimitiveValueList ARRAY,\n" - + " KeyedPrimitiveValueMap MAP,\n" + + " KeyedPrimitiveValueMap MAP,\n" + " PRIMARY KEY (k) NOT ENFORCED\n" + ")\n" + "with (\n" + " 'connector' = 'savepoint',\n" + " 'state.path' = 'src/test/resources/table-state',\n" - + " 'operator.uid' = 'keyed-state-process-uid',\n" - + " 'fields.KeyedPojoValue.value-class' = 'com.example.state.writer.job.schema.PojoData'\n" + + " 'operator.uid' = 'keyed-state-process-uid'\n" + ")"; tEnv.executeSql(sql); Table table = tEnv.sqlQuery("SELECT * FROM state_table"); @@ -108,20 +107,21 @@ public void testReadKeyedState() throws Exception { } // Check map state - Set>> mapValues = + Set>> mapValues = result.stream() .map( r -> Tuple2.of( (Long) r.getField("k"), - (Map) + (Map) r.getField("KeyedPrimitiveValueMap"))) .flatMap(l -> Set.of(l).stream()) .collect(Collectors.toSet()); assertThat(mapValues.size()).isEqualTo(10); - for (Tuple2> tuple2 : mapValues) { + for (Tuple2> tuple2 : mapValues) { assertThat(tuple2.f1.size()).isEqualTo(1); - assertThat(tuple2.f0).isEqualTo(tuple2.f1.get(tuple2.f0)); + String expectedKey = String.valueOf(tuple2.f0); + assertThat(tuple2.f1.get(expectedKey)).isEqualTo(tuple2.f0); } } @@ -142,8 +142,7 @@ public void testReadKeyedStateWithNullValues() throws Exception { + "with (\n" + " 'connector' = 'savepoint',\n" + " 'state.path' = 'src/test/resources/table-state-nulls',\n" - + " 'operator.uid' = 'keyed-state-process-uid-null',\n" - + " 'fields.total.value-class' = 'com.example.state.writer.job.schema.PojoData'\n" + + " 'operator.uid' = 'keyed-state-process-uid-null'\n" + ")"; tEnv.executeSql(sql); Table table = tEnv.sqlQuery("SELECT * FROM state_table"); @@ -185,9 +184,7 @@ public void testReadAvroKeyedState() throws Exception { + "with (\n" + " 'connector' = 'savepoint',\n" + " 'state.path' = 'src/test/resources/table-state-avro',\n" - + " 
'operator.uid' = 'keyed-state-process-uid',\n" - + " 'fields.KeyedSpecificAvroValue.value-type-factory' = 'org.apache.flink.state.table.SpecificAvroSavepointTypeInformationFactory',\n" - + " 'fields.KeyedGenericAvroValue.value-type-factory' = 'org.apache.flink.state.table.GenericAvroSavepointTypeInformationFactory'\n" + + " 'operator.uid' = 'keyed-state-process-uid'\n" + ")"; tEnv.executeSql(sql); Table table = tEnv.sqlQuery("SELECT * FROM state_table"); diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java index 128166d8f57cc..6a009820ba4df 100644 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointMetadataDynamicTableSourceTest.java @@ -53,9 +53,9 @@ public void testReadMetadata() throws Exception { Iterator it = result.iterator(); assertThat(it.next().toString()) .isEqualTo( - "+I[2, Source: broadcast-source, broadcast-source-uid, 3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]"); + "+I[10, Source: broadcast-source, broadcast-source-uid, 3a6f51704798c4f418be51bfb6813b77, 1, 128, 0, 0, 0]"); assertThat(it.next().toString()) .isEqualTo( - "+I[2, keyed-broadcast-process, keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 4548]"); + "+I[10, keyed-broadcast-process, keyed-broadcast-process-uid, 413c1d6f88ee8627fe4b8bc533b4cf1b, 2, 128, 2, 0, 4548]"); } } diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java new file mode 100644 index 0000000000000..280b0b4c86f4b --- /dev/null +++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointTypeInformationFactoryTest.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.state.table; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.types.Row; + +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for the SavepointTypeInformationFactory. */ +public class SavepointTypeInformationFactoryTest { + + public static class TestLongTypeInformationFactory implements SavepointTypeInformationFactory { + private static volatile boolean wasCalled = false; + + public static boolean wasFactoryCalled() { + return wasCalled; + } + + public static void resetCallTracker() { + wasCalled = false; + } + + @Override + public TypeInformation getTypeInformation() { + wasCalled = true; + return TypeInformation.of(Long.class); + } + } + + public static class TestStringTypeInformationFactory + implements SavepointTypeInformationFactory { + @Override + public TypeInformation getTypeInformation() { + return TypeInformation.of(String.class); + } + } + + @Test + public void testSavepointTypeInformationFactoryEndToEnd() throws Exception { + TestLongTypeInformationFactory.resetCallTracker(); + + Configuration config = new Configuration(); + config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config); + StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + + final String sql = + "CREATE TABLE state_table (\n" + + " k bigint,\n" + + " KeyedPrimitiveValue bigint,\n" + + " PRIMARY KEY (k) NOT ENFORCED\n" + + ")\n" + + "with (\n" + + " 'connector' = 'savepoint',\n" + + " 'state.path' = 'src/test/resources/table-state',\n" + + " 'operator.uid' = 'keyed-state-process-uid',\n" + + " 'fields.KeyedPrimitiveValue.value-type-factory' = '" + + TestLongTypeInformationFactory.class.getName() + + "'\n" + + ")"; + + tEnv.executeSql(sql); + Table table = tEnv.sqlQuery("SELECT k, KeyedPrimitiveValue FROM state_table"); + List result = tEnv.toDataStream(table).executeAndCollect(100); + + assertThat(TestLongTypeInformationFactory.wasFactoryCalled()) + .as( + "Factory getTypeInformation() method must be called - this proves factory is used instead of metadata inference") + .isTrue(); + + assertThat(result.size()).isEqualTo(10); + + Set keys = + result.stream().map(r -> (Long) r.getField("k")).collect(Collectors.toSet()); + assertThat(keys).hasSize(10); + assertThat(keys).containsExactlyInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L); + + Set primitiveValues = + result.stream() + .map(r -> (Long) r.getField("KeyedPrimitiveValue")) + .collect(Collectors.toSet()); + assertThat(primitiveValues).hasSize(1); + assertThat(primitiveValues.iterator().next()).isEqualTo(1L); + } + + @Test + public void testBasicFactoryFunctionality() { + TestLongTypeInformationFactory.resetCallTracker(); + + TestLongTypeInformationFactory longFactory = new TestLongTypeInformationFactory(); + TypeInformation longTypeInfo = longFactory.getTypeInformation(); + + assertThat(longTypeInfo).isEqualTo(TypeInformation.of(Long.class)); + 
assertThat(TestLongTypeInformationFactory.wasFactoryCalled()).isTrue(); + + TestStringTypeInformationFactory stringFactory = new TestStringTypeInformationFactory(); + TypeInformation stringTypeInfo = stringFactory.getTypeInformation(); + + assertThat(stringTypeInfo).isEqualTo(TypeInformation.of(String.class)); + } +} diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java deleted file mode 100644 index 8e9e459aff0a9..0000000000000 --- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SpecificAvroSavepointTypeInformationFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.state.table; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; - -import com.example.state.writer.job.schema.avro.AvroRecord; - -/** {@link SavepointTypeInformationFactory} for specific avro record. 
*/
-public class SpecificAvroSavepointTypeInformationFactory
-        implements SavepointTypeInformationFactory {
-    @Override
-    public TypeInformation<?> getTypeInformation() {
-        return new AvroTypeInfo<>(AvroRecord.class);
-    }
-}
diff --git a/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata b/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
index dc9d5acbbd2ec17b819a7c2f96ab4a258d9f8303..1bff4e03d5f59c3c2fd6048add68f990f1ccafdd 100644
GIT binary patch
[binary payloads omitted (literal 26034 / literal 22390): regenerated test savepoint, Bin 22390 -> 26034 bytes per the diffstat]
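To close the loop, a condensed end-to-end sketch (not part of the patch) of what the metadata-based inference enables. It mirrors `SavepointTypeInformationFactoryTest` and the updated docs above: no `fields.*.value-class` or `fields.*.value-type-factory` options are needed, because serializers are resolved from the savepoint's own metadata (priority 2); the explicit options remain available as overrides (priority 1).

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;

public class InferredStateTableExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // No per-field type options: key and value types are inferred from the
        // serializer snapshots stored in the savepoint metadata (priority 2).
        tEnv.executeSql(
                "CREATE TABLE state_table (\n"
                        + "  k bigint,\n"
                        + "  KeyedPrimitiveValue bigint,\n"
                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'savepoint',\n"
                        + "  'state.path' = 'src/test/resources/table-state',\n"
                        + "  'operator.uid' = 'keyed-state-process-uid'\n"
                        + ")");

        tEnv.sqlQuery("SELECT * FROM state_table").execute().print();
    }
}
```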