diff --git a/build.gradle b/build.gradle
index 16222eb4c1f2b4..8033b2277daa0f 100644
--- a/build.gradle
+++ b/build.gradle
@@ -61,7 +61,7 @@ buildscript {
   ext.hazelcastVersion = '5.3.6'
   ext.ebeanVersion = '15.5.2'
   ext.googleJavaFormatVersion = '1.18.1'
-  ext.openLineageVersion = '1.33.0'
+  ext.openLineageVersion = '1.38.0'
   ext.logbackClassicJava8 = '1.2.12'
   ext.awsSdk2Version = '2.30.33'
   ext.micrometerVersion = '1.15.1'
diff --git a/metadata-integration/java/acryl-spark-lineage/.gitignore b/metadata-integration/java/acryl-spark-lineage/.gitignore
new file mode 100644
index 00000000000000..5807ae10c04d30
--- /dev/null
+++ b/metadata-integration/java/acryl-spark-lineage/.gitignore
@@ -0,0 +1,7 @@
+# Temporary upstream files - downloaded only when patching
+patches/upstream-*/
+patches/backup-*/
+
+# Build artifacts
+build/
+.gradle/
diff --git a/metadata-integration/java/acryl-spark-lineage/.tool-versions b/metadata-integration/java/acryl-spark-lineage/.tool-versions
new file mode 100644
index 00000000000000..5eed8d4bcfba63
--- /dev/null
+++ b/metadata-integration/java/acryl-spark-lineage/.tool-versions
@@ -0,0 +1 @@
+java liberica-17.0.9+11
diff --git a/metadata-integration/java/acryl-spark-lineage/CLAUDE.md b/metadata-integration/java/acryl-spark-lineage/CLAUDE.md
new file mode 100644
index 00000000000000..79f4ec2b85b5fa
--- /dev/null
+++ b/metadata-integration/java/acryl-spark-lineage/CLAUDE.md
@@ -0,0 +1,240 @@
+# acryl-spark-lineage Module
+
+This module integrates OpenLineage with DataHub's Spark lineage collection. It contains shadowed and modified OpenLineage classes to support custom functionality.
+
+## Architecture
+
+This module:
+
+- Builds shadow JARs for multiple Scala versions (2.12 and 2.13)
+- Contains custom OpenLineage class implementations
+- Depends on `io.openlineage:openlineage-spark` as specified by `ext.openLineageVersion` in the root `build.gradle`
+
+## OpenLineage Version Upgrade Process
+
+### Current Version
+
+The OpenLineage version is defined in the root `build.gradle`:
+
+```gradle
+ext.openLineageVersion = '1.38.0'
+```
+
+**Last upgraded:** October 2, 2025 (from 1.33.0 to 1.38.0)
+
+**Key changes in 1.38.0:**
+
+- ✅ AWS Glue ARN handling now in upstream (was DataHub customization)
+- ✅ `getMetastoreUri()` and `getWarehouseLocation()` now public upstream
+- ✅ SaveIntoDataSourceCommandVisitor Delta table handling adopted upstream
+- ⚡ Enhanced schema handling with nested structs, maps, and arrays
+- ⚡ New UnionRDD and NewHadoopRDD extractors for improved path detection
+
+### Architecture: Patch-Based Customization System
+
+DataHub maintains customizations to OpenLineage through a **version-organized patch-based system**:
+
+```
+patches/
+└── datahub-customizations/      # Versioned patch files (committed to git)
+    ├── v1.38.0/                 # Patches for OpenLineage 1.38.0
+    │   ├── Vendors.patch
+    │   ├── PathUtils.patch
+    │   └── ...
+    └── v1.33.0/                 # Patches for OpenLineage 1.33.0 (historical)
+        └── ...
+
+# Temporary files (NOT committed, downloaded when patching):
+patches/upstream-<version>/      # Original OpenLineage files
+patches/backup-<timestamp>/      # Automatic backups during upgrades
+```
+
+**Important**: Only `patches/datahub-customizations/` is version controlled. Each OpenLineage version has its own subdirectory (e.g., `v1.38.0/`) containing patches specific to that version. Upstream and backup files are temporary and excluded via `.gitignore`.
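+
+A quick way to sanity-check this layout (a minimal sketch; the `upstream-1.38.0` and `backup-20251002-120000` directory names are illustrative, so substitute whatever the scripts actually produced):
+
+```bash
+# Committed patches for the active OpenLineage version
+ls patches/datahub-customizations/v1.38.0/
+
+# The temporary directories should be ignored (prints the .gitignore rule that matches)
+git check-ignore -v patches/upstream-1.38.0/ patches/backup-20251002-120000/
+
+# Only the versioned customization patches should appear among tracked files
+git ls-files patches/
+```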
+
+### Shadowed Classes Location
+
+Custom OpenLineage implementations are in:
+
+```
+src/main/java/io/openlineage/
+```
+
+These files are OpenLineage source files with DataHub-specific modifications applied via patches.
+
+### Known Customizations (v1.38.0)
+
+Tracked via patch files in `patches/datahub-customizations/v1.38.0/`:
+
+1. **`Vendors.patch`**: Adds `RedshiftVendor` to the vendors list
+2. **`PathUtils.patch`**: "Table outside warehouse" symlink handling
+3. **`PlanUtils.patch`**: Custom directory path handling with DataHub's HdfsPathDataset
+4. **`RemovePathPatternUtils.patch`**: DataHub-specific PathSpec transformations
+5. **`StreamingDataSourceV2RelationVisitor.patch`**: File-based streaming source support
+6. **`WriteToDataSourceV2Visitor.patch`**: ForeachBatch streaming write support
+7. **`MergeIntoCommandEdgeInputDatasetBuilder.patch`**: Delta Lake merge command complex subquery handling
+8. **`MergeIntoCommandInputDatasetBuilder.patch`**: Enables recursive traversal for merge command subqueries
+9. **`SparkOpenLineageExtensionVisitorWrapper.patch`**: Extension visitor customizations
+10. **`RddPathUtils.patch`**: Debug log level for noise reduction (3 log statements changed from warn to debug)
+11. **Redshift vendor** (`spark/agent/vendor/redshift/*`): Complete custom implementation (no upstream equivalent)
+
+### Automated Upgrade Process
+
+Use the automated upgrade script:
+
+```bash
+# Quick upgrade (recommended)
+./scripts/upgrade-openlineage.sh 1.33.0 1.38.0
+
+# This will:
+# 1. Fetch new upstream files from GitHub
+# 2. Compare old vs new upstream versions
+# 3. Update shadowed files with new upstream code
+# 4. Apply DataHub customizations via patches
+# 5. Update build.gradle version
+# 6. Report any conflicts requiring manual merge
+```
+
+### Manual Upgrade Steps
+
+If you prefer manual control or need to resolve conflicts:
+
+1. **Fetch upstream files**:
+
+   ```bash
+   ./scripts/fetch-upstream.sh 1.38.0
+   ```
+
+2. **Compare upstream changes** (optional):
+
+   ```bash
+   # See what changed between versions
+   diff -r patches/upstream-1.33.0 patches/upstream-1.38.0
+   ```
+
+3. **Update source files**:
+
+   ```bash
+   # Copy new upstream files
+   cp -r patches/upstream-1.38.0/* src/main/java/io/openlineage/
+   ```
+
+4. **Apply DataHub customizations**:
+
+   ```bash
+   # Apply all patches for the target version
+   for patch in patches/datahub-customizations/v1.38.0/*.patch; do
+     echo "Applying $(basename $patch)..."
+     patch -p0 < "$patch" || echo "Conflict in $patch - manual merge required"
+   done
+   ```
+
+5. **Handle conflicts**:
+
+   - If patches fail, manually merge changes from:
+     - Backup: `patches/backup-<timestamp>/`
+     - New upstream: `patches/upstream-<version>/`
+     - Patches show what to customize: `patches/datahub-customizations/v<version>/`
+
+6. **Regenerate patches** (if you manually merged):
+
+   ```bash
+   ./scripts/generate-patches.sh 1.38.0
+   ```
+
+7. **Update build.gradle**:
+
+   ```gradle
+   ext.openLineageVersion = '1.38.0'
+   ```
+
+8. **Test thoroughly**:
+   ```bash
+   ./gradlew :metadata-integration:java:acryl-spark-lineage:build
+   ./gradlew :metadata-integration:java:acryl-spark-lineage:test
+   ```
+
+### Understanding Patch Files
+
+Patch files show exactly what DataHub customized:
+
+```bash
+# View a specific customization for v1.38.0
+cat patches/datahub-customizations/v1.38.0/Vendors.patch
+
+# Example output shows:
+# - Lines removed from upstream (-)
+# - Lines added by DataHub (+)
+# - Context around changes
+```
+
+### Adding New Customizations
+
+If you need to customize additional files:
+
+1. Make your changes to files in `src/main/java/io/openlineage/`
+2. Regenerate patches:
+   ```bash
+   ./scripts/generate-patches.sh
+   ```
+3. The script will update `patches/datahub-customizations/` with your new changes
+
+### Troubleshooting
+
+**Patch conflicts during upgrade:**
+
+- The upgrade script preserves backups in `patches/backup-<timestamp>/`
+- Manually merge by comparing:
+  - Your backup (shows DataHub customizations)
+  - New upstream (shows OpenLineage changes)
+  - Existing patch (shows what customizations to preserve)
+
+**Files that are entirely DataHub-specific:**
+
+- `FileStreamMicroBatchStreamStrategy.java` - Custom file-based streaming
+- Redshift vendor files - Complete custom Redshift support
+- These have `.note` files instead of `.patch` files
+
+**Debugging patches:**
+
+```bash
+# Dry-run to see if patch will apply cleanly
+patch -p0 --dry-run < patches/datahub-customizations/v1.38.0/Vendors.patch
+
+# See what a patch would change
+patch -p0 --dry-run < patches/datahub-customizations/v1.38.0/Vendors.patch | less
+```
+
+### Debugging
+
+To see resolved dependencies for each Scala version:
+
+```bash
+./gradlew :metadata-integration:java:acryl-spark-lineage:debugDependencies
+```
+
+## Build Tasks
+
+**Building from repository root:**
+
+```bash
+# Build all shadow JARs (both Scala 2.12 and 2.13)
+./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar
+
+# Build specific Scala version
+./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar_2_12
+./gradlew -PjavaClassVersionDefault=8 :metadata-integration:java:acryl-spark-lineage:shadowJar_2_13
+
+# Run tests
+./gradlew :metadata-integration:java:acryl-spark-lineage:test
+
+# Run integration tests
+./gradlew :metadata-integration:java:acryl-spark-lineage:integrationTest
+
+# Full build with tests
+./gradlew :metadata-integration:java:acryl-spark-lineage:build
+```
+
+**Note:** Java 8 is required to build this project, specified via `-PjavaClassVersionDefault=8`. The shadow JARs are output to `build/libs/` with filenames like:
+
+- `acryl-spark-lineage_2.12-<version>.jar`
+- `acryl-spark-lineage_2.13-<version>.jar`
diff --git a/metadata-integration/java/acryl-spark-lineage/README.md b/metadata-integration/java/acryl-spark-lineage/README.md
index 1ea237dfad1154..1c47dac6baa829 100644
--- a/metadata-integration/java/acryl-spark-lineage/README.md
+++ b/metadata-integration/java/acryl-spark-lineage/README.md
@@ -412,6 +412,13 @@ Use Java 8 to build the project. The project uses Gradle as the build tool. To b
 
 ## Changelog
 
+### Next
+
+- _Changes_:
+  - Map the JDBC sqlserver dialect to the mssql platform; otherwise OpenLineage fails to parse the SQL
+- _Fixes_:
+  - **Dependency Relocation Fix** ([#14989](https://github.com/datahub-project/datahub/issues/14989)): Fixed shadow JAR packaging to properly relocate all transitive dependencies, preventing classloading conflicts with other Spark extensions. 
All dependencies except `io.openlineage` (which contains customized classes) and `datahub.spark` (the public API) are now properly relocated under `io.acryl.shaded` namespace. This resolves conflicts with libraries like ANTLR, Apache Avro, and others that could clash with Delta Lake and other Spark components. + ### Version 0.2.18 - _Changes_: diff --git a/metadata-integration/java/acryl-spark-lineage/build.gradle b/metadata-integration/java/acryl-spark-lineage/build.gradle index 5ee8be4f124abd..6346e310697275 100644 --- a/metadata-integration/java/acryl-spark-lineage/build.gradle +++ b/metadata-integration/java/acryl-spark-lineage/build.gradle @@ -210,6 +210,28 @@ scalaVersions.each { sv -> relocate 'com.google.errorprone', 'io.acryl.shaded.com.google.errorprone' relocate 'com.sun.jna', 'io.acryl.shaded.com.sun.jna' + // Additional relocations to prevent classloading conflicts (issue #14989) + relocate 'antlr', 'io.acryl.shaded.antlr' + relocate 'org.antlr', 'io.acryl.shaded.org.antlr' + relocate 'org.apache.avro', 'io.acryl.shaded.org.apache.avro' + relocate 'org.codehaus.jackson', 'io.acryl.shaded.org.codehaus.jackson' + relocate 'org.eclipse.jetty', 'io.acryl.shaded.org.eclipse.jetty' + relocate 'jakarta.json', 'io.acryl.shaded.jakarta.json' + relocate 'org.reactivestreams', 'io.acryl.shaded.org.reactivestreams' + relocate 'net.jpountz', 'io.acryl.shaded.net.jpountz' + relocate 'org.xerial', 'io.acryl.shaded.org.xerial' + relocate 'software.amazon', 'io.acryl.shaded.software.amazon' + relocate 'nonapi.io', 'io.acryl.shaded.nonapi.io' + relocate 'org.publicsuffix', 'io.acryl.shaded.org.publicsuffix' + relocate 'io.swagger', 'io.acryl.shaded.io.swagger' + relocate 'io.micrometer', 'io.acryl.shaded.io.micrometer' + relocate 'io.confluent', 'io.acryl.shaded.io.confluent' + relocate 'io.github', 'io.acryl.shaded.io.github' + relocate 'io.datahubproject', 'io.acryl.shaded.io.datahubproject' + // NOTE: Do NOT relocate io.openlineage - we have customized OpenLineage classes in src/main/java/io/openlineage/ + // that must remain at io.openlineage package to properly override the upstream dependency + relocate 'com.eclipsesource', 'io.acryl.shaded.com.eclipsesource' + // Debug output to verify we're using the right dependency doFirst { println "Building JAR for Scala ${sv}" diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandEdgeInputDatasetBuilder.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandEdgeInputDatasetBuilder.patch new file mode 100644 index 00000000000000..efee9acd7e36f0 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandEdgeInputDatasetBuilder.patch @@ -0,0 +1,59 @@ +# Patch for DataHub customizations in MergeIntoCommandEdgeInputDatasetBuilder.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/MergeIntoCommandEdgeInputDatasetBuilder.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java 2025-10-02 14:47:52.097747891 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java 2025-09-12 19:50:04.457785281 +0200 +@@ 
-48,9 +48,48 @@ + inputs.addAll(delegate((LogicalPlan) o1, event)); + } + if (o2 != null && o2 instanceof LogicalPlan) { +- inputs.addAll(delegate((LogicalPlan) o2, event)); ++ List sourceDatasets = delegate((LogicalPlan) o2, event); ++ inputs.addAll(sourceDatasets); ++ ++ // Handle complex subqueries that aren't captured by standard delegation ++ if (sourceDatasets.isEmpty()) { ++ inputs.addAll(extractInputDatasetsFromComplexSource((LogicalPlan) o2, event)); ++ } + } + + return inputs; + } ++ ++ /** ++ * Extracts input datasets from complex source plans like subqueries with DISTINCT, PROJECT, etc. ++ * This handles cases where the standard delegation doesn't work due to missing builders for ++ * intermediate logical plan nodes. ++ */ ++ private List extractInputDatasetsFromComplexSource( ++ LogicalPlan source, SparkListenerEvent event) { ++ List datasets = new ArrayList<>(); ++ ++ // Use a queue to traverse the logical plan tree depth-first ++ java.util.Queue queue = new java.util.LinkedList<>(); ++ queue.offer(source); ++ ++ while (!queue.isEmpty()) { ++ LogicalPlan current = queue.poll(); ++ ++ // Try to delegate this node directly ++ List currentDatasets = delegate(current, event); ++ datasets.addAll(currentDatasets); ++ ++ // If this node didn't produce any datasets, traverse its children ++ if (currentDatasets.isEmpty()) { ++ // Add all children to the queue for traversal ++ scala.collection.Iterator children = current.children().iterator(); ++ while (children.hasNext()) { ++ queue.offer(children.next()); ++ } ++ } ++ } ++ ++ return datasets; ++ } + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandInputDatasetBuilder.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandInputDatasetBuilder.patch new file mode 100644 index 00000000000000..96e489132111c4 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/MergeIntoCommandInputDatasetBuilder.patch @@ -0,0 +1,75 @@ +# Patch for DataHub customizations in MergeIntoCommandInputDatasetBuilder.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/MergeIntoCommandInputDatasetBuilder.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java 2025-10-02 14:47:52.336608842 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java 2025-09-12 19:50:04.457859239 +0200 +@@ -20,7 +20,7 @@ + extends AbstractQueryPlanInputDatasetBuilder { + + public MergeIntoCommandInputDatasetBuilder(OpenLineageContext context) { +- super(context, false); ++ super(context, true); // FIXED: This enables recursive traversal of subqueries + } + + @Override +@@ -31,8 +31,54 @@ + @Override + protected List apply(SparkListenerEvent event, MergeIntoCommand x) { + List datasets = new ArrayList<>(); +- datasets.addAll(delegate(x.target(), event)); +- datasets.addAll(delegate(x.source(), event)); ++ ++ // Process target table ++ List targetDatasets = delegate(x.target(), event); ++ datasets.addAll(targetDatasets); ++ ++ // Process source - this will recursively process all datasets in the source plan, ++ // including those in subqueries ++ 
List sourceDatasets = delegate(x.source(), event); ++ datasets.addAll(sourceDatasets); ++ ++ // Handle complex subqueries that aren't captured by standard delegation ++ if (sourceDatasets.isEmpty()) { ++ sourceDatasets.addAll(extractInputDatasetsFromComplexSource(x.source(), event)); ++ datasets.addAll(sourceDatasets); ++ } ++ ++ return datasets; ++ } ++ ++ /** ++ * Extracts input datasets from complex source plans like subqueries with DISTINCT, PROJECT, etc. ++ * This handles cases where the standard delegation doesn't work due to missing builders for ++ * intermediate logical plan nodes. ++ */ ++ private List extractInputDatasetsFromComplexSource( ++ LogicalPlan source, SparkListenerEvent event) { ++ List datasets = new ArrayList<>(); ++ ++ // Use a queue to traverse the logical plan tree depth-first ++ java.util.Queue queue = new java.util.LinkedList<>(); ++ queue.offer(source); ++ ++ while (!queue.isEmpty()) { ++ LogicalPlan current = queue.poll(); ++ ++ // Try to delegate this node directly ++ List currentDatasets = delegate(current, event); ++ datasets.addAll(currentDatasets); ++ ++ // If this node didn't produce any datasets, traverse its children ++ if (currentDatasets.isEmpty()) { ++ // Add all children to the queue for traversal ++ scala.collection.Iterator children = current.children().iterator(); ++ while (children.hasNext()) { ++ queue.offer(children.next()); ++ } ++ } ++ } + + return datasets; + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PathUtils.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PathUtils.patch new file mode 100644 index 00000000000000..c4e5a9d4c7beca --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PathUtils.patch @@ -0,0 +1,21 @@ +# Patch for DataHub customizations in PathUtils.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/PathUtils.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/util/PathUtils.java 2025-10-02 14:47:50.071931557 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java 2025-10-02 14:53:54.367355461 +0200 +@@ -96,6 +96,11 @@ + symlinkDataset = + Optional.of( + FilesystemDatasetUtils.fromLocationAndName(warehouseLocation.get(), tableName)); ++ } else { ++ // Table is outside warehouse, but we create symlink to actual location + tableName ++ String tableName = nameFromTableIdentifier(catalogTable.identifier()); ++ symlinkDataset = ++ Optional.of(FilesystemDatasetUtils.fromLocationAndName(locationUri, tableName)); + } + } + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PlanUtils.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PlanUtils.patch new file mode 100644 index 00000000000000..fa56ee93089a60 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/PlanUtils.patch @@ -0,0 +1,75 @@ +# Patch for DataHub customizations in PlanUtils.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/PlanUtils.patch +# +--- 
/Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/util/PlanUtils.java 2025-10-02 14:47:50.292757088 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java 2025-10-02 14:55:47.403520416 +0200 +@@ -7,11 +7,16 @@ + + import static io.openlineage.spark.agent.util.ScalaConversionUtils.asJavaOptional; + ++import com.typesafe.config.Config; ++import com.typesafe.config.ConfigFactory; ++import datahub.spark.conf.SparkLineageConf; ++import io.datahubproject.openlineage.dataset.HdfsPathDataset; + import io.openlineage.client.OpenLineage; + import io.openlineage.spark.agent.Versions; + import io.openlineage.spark.api.naming.NameNormalizer; + import java.io.IOException; + import java.net.URI; ++import java.net.URISyntaxException; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collection; +@@ -25,6 +30,8 @@ + import lombok.extern.slf4j.Slf4j; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.Path; ++import org.apache.spark.SparkConf; ++import org.apache.spark.SparkEnv; + import org.apache.spark.rdd.RDD; + import org.apache.spark.sql.catalyst.expressions.Attribute; + import org.apache.spark.sql.types.ArrayType; +@@ -274,7 +281,7 @@ + return new ArrayList<>(normalizedPaths); + } + +- private static Path getDirectoryPath(Path p, Configuration hadoopConf) { ++ public static Path getDirectoryPathOl(Path p, Configuration hadoopConf) { + try { + if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) { + return p.getParent(); +@@ -282,7 +289,29 @@ + return p; + } + } catch (IOException e) { +- log.warn("Unable to get file system for path: {}", e.getMessage()); ++ log.warn("Unable to get file system for path ", e); ++ return p; ++ } ++ } ++ ++ // This method was replaced to support Datahub PathSpecs ++ public static Path getDirectoryPath(Path p, Configuration hadoopConf) { ++ SparkConf conf = SparkEnv.get().conf(); ++ String propertiesString = ++ Arrays.stream(conf.getAllWithPrefix("spark.datahub.")) ++ .map(tup -> tup._1 + "= \"" + tup._2 + "\"") ++ .collect(Collectors.joining("\n")); ++ Config datahubConfig = ConfigFactory.parseString(propertiesString); ++ SparkLineageConf sparkLineageConf = ++ SparkLineageConf.toSparkLineageConf(datahubConfig, null, null); ++ HdfsPathDataset hdfsPath = null; ++ try { ++ URI uri = new URI(p.toString()); ++ hdfsPath = HdfsPathDataset.create(uri, sparkLineageConf.getOpenLineageConf()); ++ log.debug("Path {} transformed to {}", p, hdfsPath.getDatasetPath()); ++ return new Path(hdfsPath.getDatasetPath()); ++ } catch (InstantiationException | URISyntaxException e) { ++ log.warn("Unable to convert path to hdfs path {} the exception was {}", p, e.getMessage()); + return p; + } + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RddPathUtils.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RddPathUtils.patch new file mode 100644 index 00000000000000..f0d2b9ab750912 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RddPathUtils.patch @@ -0,0 +1,34 @@ +# Patch for DataHub customizations in RddPathUtils.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/RddPathUtils.patch +# +--- 
/Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/util/RddPathUtils.java 2025-10-02 14:47:50.561792227 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java 2025-10-02 14:57:05.423816118 +0200 +@@ -70,7 +70,8 @@ + + @Override + public Stream extract(RDD rdd) { +- log.warn("Unknown RDD class {}", rdd); ++ // Change to debug to silence error ++ log.debug("Unknown RDD class {}", rdd); + return Stream.empty(); + } + } +@@ -197,10 +198,12 @@ + .map(o -> parentOf(o.toString())) + .filter(Objects::nonNull); + } else { +- log.warn("Cannot extract path from ParallelCollectionRDD {}", data); ++ // Changed to debug to silence error ++ log.debug("Cannot extract path from ParallelCollectionRDD {}", data); + } + } catch (IllegalAccessException | IllegalArgumentException e) { +- log.warn("Cannot read data field from ParallelCollectionRDD {}", rdd); ++ // Changed to debug to silence error ++ log.debug("Cannot read data field from ParallelCollectionRDD {}", rdd); + } + return Stream.empty(); + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RemovePathPatternUtils.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RemovePathPatternUtils.patch new file mode 100644 index 00000000000000..1538c910a23465 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/RemovePathPatternUtils.patch @@ -0,0 +1,180 @@ +# Patch for DataHub customizations in RemovePathPatternUtils.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/RemovePathPatternUtils.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/util/RemovePathPatternUtils.java 2025-10-02 14:47:50.871983858 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RemovePathPatternUtils.java 2025-09-12 19:50:04.457325990 +0200 +@@ -5,26 +5,42 @@ + + package io.openlineage.spark.agent.util; + ++import com.typesafe.config.Config; ++import com.typesafe.config.ConfigFactory; ++import datahub.spark.conf.SparkAppContext; ++import datahub.spark.conf.SparkConfigParser; ++import io.datahubproject.openlineage.config.DatahubOpenlineageConfig; ++import io.datahubproject.openlineage.dataset.HdfsPathDataset; + import io.openlineage.client.OpenLineage.InputDataset; + import io.openlineage.client.OpenLineage.OutputDataset; + import io.openlineage.spark.api.OpenLineageContext; ++import java.net.URI; ++import java.net.URISyntaxException; ++import java.util.Arrays; + import java.util.List; ++import java.util.Objects; + import java.util.Optional; + import java.util.regex.Pattern; + import java.util.stream.Collectors; ++import lombok.extern.slf4j.Slf4j; + import org.apache.commons.lang3.StringUtils; ++import org.apache.spark.SparkConf; ++import org.apache.spark.sql.SparkSession; + + /** + * Utility class to handle removing path patterns in dataset names. 
Given a configured regex pattern + * with "remove" group defined, class methods run regex replacements on all the datasets available + * within the event + */ ++@Slf4j + public class RemovePathPatternUtils { + public static final String REMOVE_PATTERN_GROUP = "remove"; + public static final String SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN = + "spark.openlineage.dataset.removePath.pattern"; + +- public static List removeOutputsPathPattern( ++ private static Optional sparkConf = Optional.empty(); ++ ++ public static List removeOutputsPathPattern_ol( + OpenLineageContext context, List outputs) { + return getPattern(context) + .map( +@@ -50,36 +66,55 @@ + .orElse(outputs); + } + ++ // This method was replaced to support Datahub PathSpecs ++ public static List removeOutputsPathPattern( ++ OpenLineageContext context, List outputs) { ++ return outputs.stream() ++ .map( ++ dataset -> { ++ String newName = removePathPattern(dataset.getName()); ++ if (!Objects.equals(newName, dataset.getName())) { ++ return context ++ .getOpenLineage() ++ .newOutputDatasetBuilder() ++ .name(newName) ++ .namespace(dataset.getNamespace()) ++ .facets(dataset.getFacets()) ++ .outputFacets(dataset.getOutputFacets()) ++ .build(); ++ } else { ++ return dataset; ++ } ++ }) ++ .collect(Collectors.toList()); ++ } ++ ++ // This method was replaced to support Datahub PathSpecs + public static List removeInputsPathPattern( +- OpenLineageContext context, List outputs) { +- return getPattern(context) ++ OpenLineageContext context, List inputs) { ++ return inputs.stream() + .map( +- pattern -> +- outputs.stream() +- .map( +- dataset -> { +- String newName = removePath(pattern, dataset.getName()); +- if (newName != dataset.getName()) { +- return context +- .getOpenLineage() +- .newInputDatasetBuilder() +- .name(newName) +- .namespace(dataset.getNamespace()) +- .facets(dataset.getFacets()) +- .inputFacets(dataset.getInputFacets()) +- .build(); +- } else { +- return dataset; +- } +- }) +- .collect(Collectors.toList())) +- .orElse(outputs); ++ dataset -> { ++ String newName = removePathPattern(dataset.getName()); ++ if (!Objects.equals(newName, dataset.getName())) { ++ return context ++ .getOpenLineage() ++ .newInputDatasetBuilder() ++ .name(newName) ++ .namespace(dataset.getNamespace()) ++ .facets(dataset.getFacets()) ++ .inputFacets(dataset.getInputFacets()) ++ .build(); ++ } else { ++ return dataset; ++ } ++ }) ++ .collect(Collectors.toList()); + } + + private static Optional getPattern(OpenLineageContext context) { +- return context +- .getSparkContext() +- .map(sparkContext -> sparkContext.conf()) ++ return Optional.of(context.getSparkContext()) ++ .map(sparkContext -> sparkContext.get().conf()) + .filter(conf -> conf.contains(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN)) + .map(conf -> conf.get(SPARK_OPENLINEAGE_DATASET_REMOVE_PATH_PATTERN)) + .map(pattern -> Pattern.compile(pattern)); +@@ -104,4 +139,45 @@ + + name.substring(matcher.end(REMOVE_PATTERN_GROUP), name.length())) + .orElse(name); + } ++ ++ /** ++ * SparkConf does not change through job lifetime but it can get lost once session is closed. 
It's ++ * good to have it set in case of SPARK-29046 ++ */ ++ private static Optional loadSparkConf() { ++ if (!sparkConf.isPresent() && SparkSession.getDefaultSession().isDefined()) { ++ sparkConf = Optional.of(SparkSession.getDefaultSession().get().sparkContext().getConf()); ++ } ++ return sparkConf; ++ } ++ ++ private static String removePathPattern(String datasetName) { ++ // TODO: The reliance on global-mutable state here should be changed ++ // this led to problems in the PathUtilsTest class, where some tests interfered with others ++ log.info("Removing path pattern from dataset name {}", datasetName); ++ Optional conf = loadSparkConf(); ++ if (!conf.isPresent()) { ++ return datasetName; ++ } ++ try { ++ String propertiesString = ++ Arrays.stream(conf.get().getAllWithPrefix("spark.datahub.")) ++ .map(tup -> tup._1 + "= \"" + tup._2 + "\"") ++ .collect(Collectors.joining("\n")); ++ Config datahubConfig = ConfigFactory.parseString(propertiesString); ++ DatahubOpenlineageConfig datahubOpenlineageConfig = ++ SparkConfigParser.sparkConfigToDatahubOpenlineageConf( ++ datahubConfig, new SparkAppContext()); ++ HdfsPathDataset hdfsPath = ++ HdfsPathDataset.create(new URI(datasetName), datahubOpenlineageConfig); ++ log.debug("Transformed path is {}", hdfsPath.getDatasetPath()); ++ return hdfsPath.getDatasetPath(); ++ } catch (InstantiationException e) { ++ log.warn( ++ "Unable to convert dataset {} to path the exception was {}", datasetName, e.getMessage()); ++ return datasetName; ++ } catch (URISyntaxException e) { ++ throw new RuntimeException(e); ++ } ++ } + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/SparkOpenLineageExtensionVisitorWrapper.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/SparkOpenLineageExtensionVisitorWrapper.patch new file mode 100644 index 00000000000000..2fe754543a8c78 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/SparkOpenLineageExtensionVisitorWrapper.patch @@ -0,0 +1,192 @@ +# Patch for DataHub customizations in SparkOpenLineageExtensionVisitorWrapper.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/SparkOpenLineageExtensionVisitorWrapper.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java 2025-10-02 14:47:51.149945623 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java 2025-09-12 19:50:04.456499783 +0200 +@@ -4,10 +4,6 @@ + */ + package io.openlineage.spark.agent.lifecycle; + +-import com.fasterxml.jackson.annotation.JsonCreator; +-import com.fasterxml.jackson.annotation.JsonProperty; +-import com.fasterxml.jackson.core.type.TypeReference; +-import com.fasterxml.jackson.databind.ObjectMapper; + import io.openlineage.client.OpenLineage; + import io.openlineage.client.OpenLineage.InputDataset; + import io.openlineage.client.OpenLineageClientUtils; +@@ -16,9 +12,14 @@ + import io.openlineage.spark.agent.util.ExtensionClassloader; + import io.openlineage.spark.api.SparkOpenLineageConfig; + import io.openlineage.spark.extension.OpenLineageExtensionProvider; ++import 
io.openlineage.spark.shaded.com.fasterxml.jackson.annotation.JsonCreator; ++import io.openlineage.spark.shaded.com.fasterxml.jackson.annotation.JsonProperty; ++import io.openlineage.spark.shaded.com.fasterxml.jackson.core.type.TypeReference; ++import io.openlineage.spark.shaded.com.fasterxml.jackson.databind.ObjectMapper; ++import io.openlineage.spark.shaded.org.apache.commons.lang3.tuple.ImmutablePair; ++import io.openlineage.spark.shaded.org.apache.commons.lang3.tuple.Pair; + import java.io.IOException; + import java.io.InputStream; +-import java.lang.reflect.InvocationTargetException; + import java.lang.reflect.Method; + import java.nio.ByteBuffer; + import java.util.ArrayList; +@@ -31,9 +32,8 @@ + import java.util.stream.Collectors; + import lombok.extern.slf4j.Slf4j; + import org.apache.commons.io.IOUtils; +-import org.apache.commons.lang3.reflect.MethodUtils; +-import org.apache.commons.lang3.tuple.ImmutablePair; +-import org.apache.commons.lang3.tuple.Pair; ++ ++// We only shadow this jar to silence warnings about illegal reflective access + + /** + * A helper class that uses reflection to call all methods of SparkOpenLineageExtensionVisitor, +@@ -91,11 +91,12 @@ + .anyMatch( + objectAndMethod -> { + try { +- return (boolean) objectAndMethod.right.invoke(objectAndMethod.left, object); ++ return (boolean) ++ objectAndMethod.getRight().invoke(objectAndMethod.getLeft(), object); + } catch (Exception e) { + log.error( + "Can't invoke 'isDefinedAt' method on {} class instance", +- objectAndMethod.left.getClass().getCanonicalName()); ++ objectAndMethod.getLeft().getClass().getCanonicalName()); + } + return false; + }); +@@ -119,19 +120,21 @@ + try { + Map result = + (Map) +- objectAndMethod.right.invoke( +- objectAndMethod.left, +- lineageNode, +- sparkListenerEventName, +- sqlContext, +- parameters); ++ objectAndMethod ++ .getRight() ++ .invoke( ++ objectAndMethod.getLeft(), ++ lineageNode, ++ sparkListenerEventName, ++ sqlContext, ++ parameters); + if (result != null && !result.isEmpty()) { + return objectMapper.convertValue(result, DatasetIdentifier.class); + } + } catch (Exception e) { + log.warn( + "Can't invoke apply method on {} class instance", +- objectAndMethod.left.getClass().getCanonicalName()); ++ objectAndMethod.getLeft().getClass().getCanonicalName()); + } + } + } +@@ -183,15 +186,16 @@ + try { + Map result = + (Map) +- objectAndMethod.right.invoke( +- objectAndMethod.left, lineageNode, sparkListenerEventName); ++ objectAndMethod ++ .getRight() ++ .invoke(objectAndMethod.getLeft(), lineageNode, sparkListenerEventName); + if (result != null && !result.isEmpty()) { + return result; + } + } catch (Exception e) { + log.error( + "Can't invoke apply method on {} class instance", +- objectAndMethod.left.getClass().getCanonicalName()); ++ objectAndMethod.getLeft().getClass().getCanonicalName()); + } + } + } +@@ -251,6 +255,7 @@ + return objects; + } + ++ // FIXED: This method now uses safer classloader handling to avoid illegal reflective access + private static void loadProviderToAvailableClassloaders(List classloaders) + throws IOException { + List filteredClassloaders = +@@ -263,46 +268,41 @@ + + if (!filteredClassloaders.isEmpty()) { + log.warn( +- "An illegal reflective access operation will occur when using the openlineage-spark integration with a " +- + "Spark connector that implements spark-openlineage extension interfaces. \n" +- + "This issue arises when the openlineage-spark integration and the Spark connector are loaded using " +- + "different classloaders. 
For example, if one library is loaded using the --jars parameter while the other " +- + "is placed in the /usr/lib/spark/jars directory. \n" +- + "In this case, the OpenLineageExtensionProvider class will only be accessible to the class loader that " +- + "loaded the openlineage-spark integration. If the other class loader attempts to access this class, it " +- + "will trigger an illegal reflective access operation.\n" +- + "To prevent this, ensure that both the openlineage-spark integration and the Spark connector are loaded " +- + "using the same class loader. This can be achieved by: \n" ++ "Different classloaders detected for openlineage-spark integration and Spark connector. " ++ + "This may cause extension loading issues. " ++ + "For optimal compatibility, ensure both libraries are loaded using the same classloader by: \n" + + "1. Placing both libraries in the /usr/lib/spark/jars directory, or \n" + + "2. Loading both libraries through the --jars parameter."); + } + ++ // FIXED: Instead of using dangerous reflective defineClass, try to load extensions ++ // using available classloaders without illegal reflection + filteredClassloaders.forEach( + cl -> { + try { +- MethodUtils.invokeMethod( +- cl, true, "defineClass", providerCanonicalName, providerClassBytes, null); +- log.trace("{} succeeded to load a class", cl); +- } catch (InvocationTargetException ex) { +- if (!(ex.getCause() instanceof LinkageError)) { +- if (!providerFailWarned) { +- log.error("Failed to load OpenLineageExtensionProvider class", ex.getCause()); +- } +- providerFailWarned = true; ++ // SAFE APPROACH: Try to load the class normally first ++ try { ++ cl.loadClass(providerCanonicalName); ++ log.debug("Provider class already available in classloader: {}", cl); ++ return; ++ } catch (ClassNotFoundException e) { ++ // Class not found, but we won't force-define it using illegal reflection ++ log.debug("Provider class not found in classloader {}, skipping unsafe loading", cl); + } +- } catch (RuntimeException ex) { +- // On Java 17, if add opens is not used, the defineClass method will throw an +- // InaccessibleObjectException +- // Any workaround is TODO, but we should not crash here +- if (!providerFailWarned) { +- log.warn("{}: Failed to load OpenLineageExtensionProvider class ", cl, ex); ++ ++ // ALTERNATIVE SAFE APPROACH: Try using parent classloader delegation ++ ClassLoader parent = cl.getParent(); ++ if (parent != null && parent != currentThreadClassloader) { ++ try { ++ parent.loadClass(providerCanonicalName); ++ log.debug("Provider class found via parent delegation in classloader: {}", cl); ++ } catch (ClassNotFoundException e) { ++ log.trace("Provider class not found via parent delegation either"); ++ } + } +- providerFailWarned = true; ++ + } catch (Exception e) { +- if (!providerFailWarned) { +- log.warn("{}: Failed to load OpenLineageExtensionProvider class ", cl, e); +- } +- providerFailWarned = true; ++ log.debug("Safe provider loading failed for classloader {}: {}", cl, e.getMessage()); + } + }); + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/StreamingDataSourceV2RelationVisitor.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/StreamingDataSourceV2RelationVisitor.patch new file mode 100644 index 00000000000000..3ca22bd5c602af --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/StreamingDataSourceV2RelationVisitor.patch @@ -0,0 +1,58 @@ +# Patch for 
DataHub customizations in StreamingDataSourceV2RelationVisitor.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/StreamingDataSourceV2RelationVisitor.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.java 2025-10-02 14:47:51.648391168 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.java 2025-09-12 19:50:04.456795324 +0200 +@@ -24,6 +24,8 @@ + "org.apache.spark.sql.connector.kinesis.KinesisV2MicrobatchStream"; + private static final String MONGO_MICRO_BATCH_STREAM_CLASS_NAME = + "com.mongodb.spark.sql.connector.read.MongoMicroBatchStream"; ++ private static final String FILE_STREAM_MICRO_BATCH_STREAM_CLASS_NAME = ++ "org.apache.spark.sql.execution.streaming.sources.FileStreamSourceV2"; + + public StreamingDataSourceV2RelationVisitor(@NonNull OpenLineageContext context) { + super(context); +@@ -68,6 +70,9 @@ + streamStrategy = new KinesisMicroBatchStreamStrategy(inputDataset(), relation); + } else if (MONGO_MICRO_BATCH_STREAM_CLASS_NAME.equals(streamClassName)) { + streamStrategy = new MongoMicroBatchStreamStrategy(inputDataset(), relation); ++ } else if (FILE_STREAM_MICRO_BATCH_STREAM_CLASS_NAME.equals(streamClassName) ++ || isFileBasedStreamingSource(streamClassName)) { ++ streamStrategy = new FileStreamMicroBatchStreamStrategy(inputDataset(), relation); + } else { + log.warn( + "The {} has been selected because no rules have matched for the stream class of {}", +@@ -81,7 +86,28 @@ + ScalaConversionUtils.asJavaOptional(relation.startOffset())); + } + +- log.info("Selected this strategy: {}", streamStrategy.getClass().getSimpleName()); ++ log.info( ++ "Selected this strategy: {} for stream class: {}", ++ streamStrategy.getClass().getSimpleName(), ++ streamClassName); + return streamStrategy; + } ++ ++ /** Check if the stream class name indicates a file-based streaming source. 
*/ ++ private boolean isFileBasedStreamingSource(String streamClassName) { ++ if (streamClassName == null) { ++ return false; ++ } ++ ++ return streamClassName.contains("FileStreamSource") ++ || streamClassName.contains("TextFileStreamSource") ++ || streamClassName.contains("FileSource") ++ || streamClassName.contains("ParquetFileSource") ++ || streamClassName.contains("JsonFileSource") ++ || streamClassName.contains("CsvFileSource") ++ || streamClassName.contains("org.apache.spark.sql.execution.streaming.sources") ++ || streamClassName.contains("org.apache.spark.sql.execution.datasources.v2.csv") ++ || streamClassName.contains("org.apache.spark.sql.execution.datasources.v2.json") ++ || streamClassName.contains("org.apache.spark.sql.execution.datasources.v2.parquet"); ++ } + } diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/Vendors.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/Vendors.patch new file mode 100644 index 00000000000000..7e72008cc5d043 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/Vendors.patch @@ -0,0 +1,20 @@ +# Patch for DataHub customizations in Vendors.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/Vendors.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/api/Vendors.java 2025-10-02 14:47:49.324288220 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/api/Vendors.java 2025-09-12 19:50:04.457417323 +0200 +@@ -22,7 +22,9 @@ + // Add vendor classes here + "io.openlineage.spark.agent.vendor.snowflake.SnowflakeVendor", + "io.openlineage.spark.agent.vendor.iceberg.IcebergVendor", +- "io.openlineage.spark.agent.vendor.gcp.GcpVendor"); ++ "io.openlineage.spark.agent.vendor.gcp.GcpVendor", ++ // This is the only chance we have to add the RedshiftVendor to the list of vendors ++ "io.openlineage.spark.agent.vendor.redshift.RedshiftVendor"); + + static Vendors getVendors() { + return getVendors(Collections.emptyList()); diff --git a/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/WriteToDataSourceV2Visitor.patch b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/WriteToDataSourceV2Visitor.patch new file mode 100644 index 00000000000000..0d915b8c00de72 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/patches/datahub-customizations/v1.38.0/WriteToDataSourceV2Visitor.patch @@ -0,0 +1,184 @@ +# Patch for DataHub customizations in WriteToDataSourceV2Visitor.java +# Upstream version: OpenLineage 1.38.0 +# Generated: 2025-10-02 12:57:22 UTC +# +# To apply this patch to a new upstream version: +# patch -p0 < datahub-customizations/WriteToDataSourceV2Visitor.patch +# +--- /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/patches/upstream-1.38.0/spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java 2025-10-02 14:47:51.875538520 +0200 ++++ /Users/treff7es/shadow/datahub/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java 2025-09-12 19:50:04.456873157 +0200 +@@ -2,13 +2,18 @@ + /* Copyright 2018-2025 contributors to the OpenLineage project + /* SPDX-License-Identifier: 
Apache-2.0 + */ +- ++/* ++This class is shadowed from Openlineage to support foreachBatch in streaming ++*/ + package io.openlineage.spark.agent.lifecycle.plan; + + import io.openlineage.client.OpenLineage.OutputDataset; ++import io.openlineage.client.utils.DatasetIdentifier; ++import io.openlineage.spark.agent.util.PathUtils; + import io.openlineage.spark.agent.util.ScalaConversionUtils; + import io.openlineage.spark.api.OpenLineageContext; + import io.openlineage.spark.api.QueryPlanVisitor; ++import java.net.URI; + import java.util.Collections; + import java.util.List; + import java.util.Map; +@@ -30,6 +35,8 @@ + extends QueryPlanVisitor { + private static final String KAFKA_STREAMING_WRITE_CLASS_NAME = + "org.apache.spark.sql.kafka010.KafkaStreamingWrite"; ++ private static final String FOREACH_BATCH_SINK_CLASS_NAME = ++ "org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink"; + + public WriteToDataSourceV2Visitor(@NonNull OpenLineageContext context) { + super(context); +@@ -60,6 +67,13 @@ + String streamingWriteClassName = streamingWriteClass.getCanonicalName(); + if (KAFKA_STREAMING_WRITE_CLASS_NAME.equals(streamingWriteClassName)) { + result = handleKafkaStreamingWrite(streamingWrite); ++ } else if (streamingWriteClassName != null ++ && (streamingWriteClassName.contains("FileStreamSink") ++ || streamingWriteClassName.contains("ForeachBatchSink") ++ || streamingWriteClassName.contains("ConsoleSink") ++ || streamingWriteClassName.contains("DeltaSink") ++ || streamingWriteClassName.contains("ParquetSink"))) { ++ result = handleFileBasedStreamingWrite(streamingWrite, write); + } else { + log.warn( + "The streaming write class '{}' for '{}' is not supported", +@@ -73,6 +87,131 @@ + return result; + } + ++ private @NotNull List handleFileBasedStreamingWrite( ++ StreamingWrite streamingWrite, WriteToDataSourceV2 write) { ++ log.debug( ++ "Handling file-based streaming write: {}", streamingWrite.getClass().getCanonicalName()); ++ ++ try { ++ // Try to extract path from streaming write ++ Optional pathOpt = extractPathFromStreamingWrite(streamingWrite); ++ if (!pathOpt.isPresent()) { ++ log.warn("Could not extract path from file-based streaming write"); ++ return Collections.emptyList(); ++ } ++ ++ String path = pathOpt.get(); ++ log.debug("Found streaming write path: {}", path); ++ ++ // Create dataset from path ++ URI uri = URI.create(path); ++ DatasetIdentifier identifier = PathUtils.fromURI(uri); ++ String namespace = identifier.getNamespace(); ++ String name = identifier.getName(); ++ ++ log.debug("Creating output dataset with namespace: {}, name: {}", namespace, name); ++ ++ // Get schema from the write operation ++ StructType schema = null; ++ if (write.query() != null) { ++ schema = write.query().schema(); ++ } ++ ++ // Use the inherited outputDataset() method to create the dataset ++ OutputDataset dataset = outputDataset().getDataset(name, namespace, schema); ++ return Collections.singletonList(dataset); ++ ++ } catch (Exception e) { ++ log.error("Error extracting output dataset from file-based streaming write", e); ++ return Collections.emptyList(); ++ } ++ } ++ ++ private Optional extractPathFromStreamingWrite(StreamingWrite streamingWrite) { ++ try { ++ // Try to get path using reflection from various sink types ++ String className = streamingWrite.getClass().getCanonicalName(); ++ ++ // For ForeachBatchSink, try to get the underlying sink's path ++ if (className != null && className.contains("ForeachBatchSink")) { ++ // ForeachBatchSink typically wraps another sink or 
has batch function ++ // We need to extract path from the context of how it's used ++ return tryExtractPathFromForeachBatch(streamingWrite); ++ } ++ ++ // For file-based sinks, try standard path extraction ++ if (className != null ++ && (className.contains("FileStreamSink") ++ || className.contains("ParquetSink") ++ || className.contains("DeltaSink"))) { ++ return tryExtractPathFromFileSink(streamingWrite); ++ } ++ ++ // For console sink, return console identifier ++ if (className != null && className.contains("ConsoleSink")) { ++ return Optional.of("console://output"); ++ } ++ ++ } catch (Exception e) { ++ log.debug("Error extracting path from streaming write: {}", e.getMessage()); ++ } ++ ++ return Optional.empty(); ++ } ++ ++ private Optional tryExtractPathFromForeachBatch(StreamingWrite streamingWrite) { ++ try { ++ // ForeachBatchSink doesn't have a direct path since outputs are determined ++ // dynamically by the user's foreachBatch function. The actual lineage ++ // will be captured when the user's function executes batch operations. ++ // ++ // For now, we return empty to indicate that this sink doesn't have ++ // a predetermined output path, and rely on the batch operations ++ // within the foreachBatch function to generate proper lineage events. ++ log.debug("ForeachBatchSink detected - outputs will be tracked from batch operations"); ++ return Optional.empty(); ++ } catch (Exception e) { ++ log.debug("Could not extract path from ForeachBatchSink: {}", e.getMessage()); ++ return Optional.empty(); ++ } ++ } ++ ++ private Optional tryExtractPathFromFileSink(StreamingWrite streamingWrite) { ++ try { ++ // Try to extract path using reflection ++ Optional pathOpt = tryReadField(streamingWrite, "path"); ++ if (pathOpt.isPresent()) { ++ return pathOpt; ++ } ++ ++ // Try alternative field names ++ pathOpt = tryReadField(streamingWrite, "outputPath"); ++ if (pathOpt.isPresent()) { ++ return pathOpt; ++ } ++ ++ pathOpt = tryReadField(streamingWrite, "location"); ++ if (pathOpt.isPresent()) { ++ return pathOpt; ++ } ++ ++ } catch (Exception e) { ++ log.debug("Error extracting path from file sink: {}", e.getMessage()); ++ } ++ ++ return Optional.empty(); ++ } ++ ++ private Optional tryReadField(Object target, String fieldName) { ++ try { ++ T result = (T) FieldUtils.readDeclaredField(target, fieldName, true); ++ return result == null ? 
Optional.empty() : Optional.of(result); ++ } catch (IllegalAccessException e) { ++ log.debug("Could not read field {}: {}", fieldName, e.getMessage()); ++ return Optional.empty(); ++ } ++ } ++ + private @NotNull List handleKafkaStreamingWrite(StreamingWrite streamingWrite) { + KafkaStreamWriteProxy proxy = new KafkaStreamWriteProxy(streamingWrite); + Optional topicOpt = proxy.getTopic(); diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh index 96d124d5de9016..e837e33eedfdb2 100755 --- a/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh +++ b/metadata-integration/java/acryl-spark-lineage/scripts/check_jar.sh @@ -25,28 +25,22 @@ for jarFile in ${jarFiles}; do grep -v "git.properties" |\ grep -v "org/aopalliance" |\ grep -v "javax/" |\ - grep -v "io/swagger" |\ grep -v "JavaSpring" |\ grep -v "java-header-style.xml" |\ grep -v "xml-header-style.xml" |\ grep -v "license.header" |\ grep -v "module-info.class" |\ - grep -v "com/google/" |\ - grep -v "org/codehaus/" |\ grep -v "client.properties" |\ grep -v "kafka" |\ grep -v "win/" |\ grep -v "include/" |\ grep -v "linux/" |\ grep -v "darwin" |\ + grep -v "aix" |\ grep -v "MetadataChangeProposal.avsc" |\ grep -v "io.openlineage" |\ - grep -v "org.apache" |\ - grep -v "aix" |\ - grep -v "io/micrometer/" |\ grep -v "library.properties|rootdoc.txt" \| - grep -v "com/ibm/.*" |\ - grep -v "org/publicsuffix" + grep -v "com/ibm/.*" if [ $? -ne 0 ]; then diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/fetch-upstream.sh b/metadata-integration/java/acryl-spark-lineage/scripts/fetch-upstream.sh new file mode 100755 index 00000000000000..0ebbe402f4bd4c --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/scripts/fetch-upstream.sh @@ -0,0 +1,79 @@ +#!/bin/bash +# +# Fetch upstream OpenLineage files from GitHub +# +# Usage: ./scripts/fetch-upstream.sh +# Example: ./scripts/fetch-upstream.sh 1.38.0 +# + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +MODULE_DIR="$(dirname "$SCRIPT_DIR")" + +VERSION="${1:-1.33.0}" + +if [ -z "$VERSION" ]; then + echo "Usage: $0 " + echo "Example: $0 1.38.0" + exit 1 +fi + +GITHUB_BASE="https://raw.githubusercontent.com/OpenLineage/OpenLineage" +OUTPUT_DIR="$MODULE_DIR/patches/upstream-$VERSION" + +# Files to fetch - array of "path|file" pairs +# Format: "upstream_path|relative_file_path" +FILES=( + "integration/spark/shared/src/main/java/io/openlineage|spark/api/Vendors.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/api/VendorsContext.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/api/VendorsImpl.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/PathUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/PlanUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/RddPathUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/RemovePathPatternUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/FileStreamMicroBatchStreamStrategy.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java" + 
"integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java" + "integration/spark/spark3/src/main/java/io/openlineage|spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java" + "integration/spark/spark3/src/main/java/io/openlineage|spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java" +) + +echo "Fetching OpenLineage $VERSION files from GitHub..." +echo "" + +mkdir -p "$OUTPUT_DIR" + +SUCCESS_COUNT=0 +FAIL_COUNT=0 + +for entry in "${FILES[@]}"; do + # Split on pipe: upstream_path|file_path + IFS='|' read -r upstream_path file_path <<< "$entry" + + URL="$GITHUB_BASE/$VERSION/$upstream_path/$file_path" + OUTPUT="$OUTPUT_DIR/$file_path" + + mkdir -p "$(dirname "$OUTPUT")" + + echo -n "Fetching: $file_path ... " + + if curl -sf "$URL" -o "$OUTPUT" 2>/dev/null; then + echo "✓" + ((SUCCESS_COUNT++)) + else + echo "✗ (not found - may be DataHub-specific)" + rm -f "$OUTPUT" + ((FAIL_COUNT++)) + fi +done + +echo "" +echo "=========================================" +echo "Fetch complete!" +echo "Success: $SUCCESS_COUNT files" +echo "Not found: $FAIL_COUNT files" +echo "Output directory: $OUTPUT_DIR" +echo "=========================================" \ No newline at end of file diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/generate-patches.sh b/metadata-integration/java/acryl-spark-lineage/scripts/generate-patches.sh new file mode 100755 index 00000000000000..87baecf50317db --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/scripts/generate-patches.sh @@ -0,0 +1,96 @@ +#!/bin/bash +# +# Generate patch files showing DataHub customizations vs upstream OpenLineage +# +# Usage: ./scripts/generate-patches.sh +# + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +MODULE_DIR="$(dirname "$SCRIPT_DIR")" +UPSTREAM_VERSION="${1:-1.33.0}" +UPSTREAM_DIR="$MODULE_DIR/patches/upstream-$UPSTREAM_VERSION" +PATCHES_DIR="$MODULE_DIR/patches" +DATAHUB_SRC="$MODULE_DIR/src/main/java/io/openlineage" + +# Files to compare +FILES=( + "spark/api/Vendors.java" + "spark/api/VendorsContext.java" + "spark/api/VendorsImpl.java" + "spark/agent/util/PathUtils.java" + "spark/agent/util/PlanUtils.java" + "spark/agent/util/RddPathUtils.java" + "spark/agent/util/RemovePathPatternUtils.java" + "spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java" + "spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java" + "spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.java" + "spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java" + "spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java" + "spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java" +) + +echo "Generating patches for DataHub customizations vs OpenLineage $UPSTREAM_VERSION..." + +# Check if upstream directory exists +if [ ! -d "$UPSTREAM_DIR" ]; then + echo "Error: Upstream directory not found: $UPSTREAM_DIR" + echo "Please fetch upstream files first using scripts/fetch-upstream.sh" + exit 1 +fi + +# Create output directory for patches +mkdir -p "$PATCHES_DIR/datahub-customizations" + +# Generate patches for each file +for file in "${FILES[@]}"; do + UPSTREAM_FILE="$UPSTREAM_DIR/$file" + DATAHUB_FILE="$DATAHUB_SRC/$file" + PATCH_FILE="$PATCHES_DIR/datahub-customizations/$(basename "$file" .java).patch" + + if [ ! 
-f "$UPSTREAM_FILE" ]; then + echo "Warning: Upstream file not found: $UPSTREAM_FILE (skipping)" + continue + fi + + if [ ! -f "$DATAHUB_FILE" ]; then + echo "Warning: DataHub file not found: $DATAHUB_FILE (DataHub-specific file)" + # Create a note file indicating this is a DataHub-specific addition + echo "# DataHub-specific file - not present in upstream OpenLineage $UPSTREAM_VERSION" > "${PATCH_FILE}.note" + continue + fi + + # Generate unified diff + if diff -u "$UPSTREAM_FILE" "$DATAHUB_FILE" > "$PATCH_FILE" 2>/dev/null; then + # No differences found + echo "No customizations in: $file" + rm -f "$PATCH_FILE" + else + # Differences found + echo "Generated patch: $(basename "$PATCH_FILE")" + + # Add header to patch file + TEMP_PATCH=$(mktemp) + cat > "$TEMP_PATCH" <> "$TEMP_PATCH" + mv "$TEMP_PATCH" "$PATCH_FILE" + fi +done + +echo "" +echo "Patch generation complete!" +echo "Patches stored in: $PATCHES_DIR/datahub-customizations/" +echo "" +echo "To apply these patches to a new upstream version:" +echo " 1. Place new upstream files in patches/upstream-/" +echo " 2. Copy files to src/main/java/io/openlineage/" +echo " 3. Apply patches: for p in patches/datahub-customizations/*.patch; do patch -p0 < \$p; done" \ No newline at end of file diff --git a/metadata-integration/java/acryl-spark-lineage/scripts/upgrade-openlineage.sh b/metadata-integration/java/acryl-spark-lineage/scripts/upgrade-openlineage.sh new file mode 100755 index 00000000000000..d4b1b08e39bb2b --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/scripts/upgrade-openlineage.sh @@ -0,0 +1,257 @@ +#!/bin/bash +# +# Automate OpenLineage version upgrades for acryl-spark-lineage +# +# This script: +# 1. Fetches upstream OpenLineage files at specified versions +# 2. Compares old vs new upstream versions +# 3. Applies DataHub customizations to the new version +# 4. Updates the version in build.gradle +# +# Usage: ./scripts/upgrade-openlineage.sh +# Example: ./scripts/upgrade-openlineage.sh 1.33.0 1.38.0 +# + +set -e + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +MODULE_DIR="$(dirname "$SCRIPT_DIR")" +ROOT_DIR="$(cd "$MODULE_DIR/../../../.." 
&& pwd)" + +OLD_VERSION="$1" +NEW_VERSION="$2" + +if [ -z "$OLD_VERSION" ] || [ -z "$NEW_VERSION" ]; then + echo "Usage: $0 " + echo "Example: $0 1.33.0 1.38.0" + exit 1 +fi + +echo "=========================================" +echo "OpenLineage Upgrade: $OLD_VERSION → $NEW_VERSION" +echo "=========================================" +echo "" + +# Files to upgrade - array of "path|file" pairs +# Format: "upstream_path|relative_file_path" +FILES=( + "integration/spark/shared/src/main/java/io/openlineage|spark/api/Vendors.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/api/VendorsContext.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/api/VendorsImpl.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/PathUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/PlanUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/RddPathUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/util/RemovePathPatternUtils.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/SparkOpenLineageExtensionVisitorWrapper.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/StreamingDataSourceV2RelationVisitor.java" + "integration/spark/shared/src/main/java/io/openlineage|spark/agent/lifecycle/plan/WriteToDataSourceV2Visitor.java" + "integration/spark/spark3/src/main/java/io/openlineage|spark3/agent/lifecycle/plan/MergeIntoCommandEdgeInputDatasetBuilder.java" + "integration/spark/spark3/src/main/java/io/openlineage|spark3/agent/lifecycle/plan/MergeIntoCommandInputDatasetBuilder.java" +) + +GITHUB_BASE="https://raw.githubusercontent.com/OpenLineage/OpenLineage" + +OLD_UPSTREAM_DIR="$MODULE_DIR/patches/upstream-$OLD_VERSION" +NEW_UPSTREAM_DIR="$MODULE_DIR/patches/upstream-$NEW_VERSION" +DATAHUB_SRC="$MODULE_DIR/src/main/java/io/openlineage" +PATCHES_DIR="$MODULE_DIR/patches/datahub-customizations" +BACKUP_DIR="$MODULE_DIR/patches/backup-$(date +%Y%m%d-%H%M%S)" + +echo "Step 1: Fetching upstream files..." +echo "-----------------------------------" + +mkdir -p "$NEW_UPSTREAM_DIR" + +for entry in "${FILES[@]}"; do + # Split on pipe: upstream_path|file_path + IFS='|' read -r upstream_path file_path <<< "$entry" + + URL="$GITHUB_BASE/$NEW_VERSION/$upstream_path/$file_path" + OUTPUT="$NEW_UPSTREAM_DIR/$file_path" + + mkdir -p "$(dirname "$OUTPUT")" + + echo "Fetching: $file_path" + if curl -sf "$URL" -o "$OUTPUT"; then + echo " ✓ Downloaded" + else + echo " ✗ Not found (may be DataHub-specific or moved)" + rm -f "$OUTPUT" + fi +done + +echo "" +echo "Step 2: Comparing upstream changes..." +echo "--------------------------------------" + +CHANGED_FILES=() +UNCHANGED_FILES=() + +for entry in "${FILES[@]}"; do + # Split on pipe: upstream_path|file_path + IFS='|' read -r upstream_path file_path <<< "$entry" + + OLD_FILE="$OLD_UPSTREAM_DIR/$file_path" + NEW_FILE="$NEW_UPSTREAM_DIR/$file_path" + + if [ ! -f "$OLD_FILE" ] && [ ! -f "$NEW_FILE" ]; then + continue + fi + + if [ ! -f "$OLD_FILE" ]; then + echo "NEW: $file_path (added in $NEW_VERSION)" + CHANGED_FILES+=("$file_path") + elif [ ! -f "$NEW_FILE" ]; then + echo "REMOVED: $file_path (removed in $NEW_VERSION)" + CHANGED_FILES+=("$file_path") + elif ! 
diff -q "$OLD_FILE" "$NEW_FILE" > /dev/null 2>&1; then + echo "CHANGED: $file_path" + CHANGED_FILES+=("$file_path") + else + UNCHANGED_FILES+=("$file_path") + fi +done + +echo "" +if [ ${#UNCHANGED_FILES[@]} -gt 0 ]; then + echo "Unchanged files (${#UNCHANGED_FILES[@]}):" + for file in "${UNCHANGED_FILES[@]}"; do + echo " - $file" + done + echo "" +fi + +if [ ${#CHANGED_FILES[@]} -eq 0 ]; then + echo "No upstream changes detected. Skipping file updates." +else + echo "Changed files (${#CHANGED_FILES[@]}):" + for file in "${CHANGED_FILES[@]}"; do + echo " - $file" + done + + echo "" + echo "Step 3: Backing up current files..." + echo "------------------------------------" + + mkdir -p "$BACKUP_DIR" + for file in "${CHANGED_FILES[@]}"; do + DATAHUB_FILE="$DATAHUB_SRC/$file" + if [ -f "$DATAHUB_FILE" ]; then + BACKUP_FILE="$BACKUP_DIR/$file" + mkdir -p "$(dirname "$BACKUP_FILE")" + cp "$DATAHUB_FILE" "$BACKUP_FILE" + echo "Backed up: $file" + fi + done + + echo "" + echo "Step 4: Updating files with new upstream..." + echo "---------------------------------------------" + + for file in "${CHANGED_FILES[@]}"; do + NEW_FILE="$NEW_UPSTREAM_DIR/$file" + DATAHUB_FILE="$DATAHUB_SRC/$file" + + if [ -f "$NEW_FILE" ]; then + mkdir -p "$(dirname "$DATAHUB_FILE")" + cp "$NEW_FILE" "$DATAHUB_FILE" + echo "Updated: $file" + elif [ -f "$DATAHUB_FILE" ]; then + echo "Removed upstream file (keeping DataHub version): $file" + fi + done + + echo "" + echo "Step 5: Applying DataHub customizations..." + echo "--------------------------------------------" + + PATCH_CONFLICTS=() + + for file in "${CHANGED_FILES[@]}"; do + PATCH_FILE="$PATCHES_DIR/$(basename "$file" .java).patch" + DATAHUB_FILE="$DATAHUB_SRC/$file" + + if [ -f "$PATCH_FILE" ] && [ -f "$DATAHUB_FILE" ]; then + echo "Applying patch: $(basename "$PATCH_FILE")" + + if patch -p0 --dry-run < "$PATCH_FILE" > /dev/null 2>&1; then + patch -p0 < "$PATCH_FILE" + echo " ✓ Applied successfully" + else + echo " ✗ Patch conflict detected - manual merge required" + PATCH_CONFLICTS+=("$file") + # Restore backup + BACKUP_FILE="$BACKUP_DIR/$file" + if [ -f "$BACKUP_FILE" ]; then + cp "$BACKUP_FILE" "$DATAHUB_FILE" + echo " ↻ Restored original file" + fi + fi + fi + done + + if [ ${#PATCH_CONFLICTS[@]} -gt 0 ]; then + echo "" + echo "⚠️ WARNING: Patch conflicts detected in:" + for file in "${PATCH_CONFLICTS[@]}"; do + echo " - $file" + done + echo "" + echo "Manual merge required. See upgrade guide in CLAUDE.md" + echo "Backup files available in: $BACKUP_DIR" + fi +fi + +echo "" +echo "Step 6: Updating version in build.gradle..." 
+echo "---------------------------------------------" + +BUILD_GRADLE="$ROOT_DIR/build.gradle" + +if [ -f "$BUILD_GRADLE" ]; then + if grep -q "ext.openLineageVersion = '$OLD_VERSION'" "$BUILD_GRADLE"; then + sed -i.bak "s/ext.openLineageVersion = '$OLD_VERSION'/ext.openLineageVersion = '$NEW_VERSION'/" "$BUILD_GRADLE" + echo "✓ Updated build.gradle: $OLD_VERSION → $NEW_VERSION" + rm -f "$BUILD_GRADLE.bak" + else + echo "⚠️ Warning: Could not find version $OLD_VERSION in build.gradle" + echo " Please update manually: ext.openLineageVersion = '$NEW_VERSION'" + fi +else + echo "⚠️ Warning: build.gradle not found at $BUILD_GRADLE" +fi + +echo "" +echo "=========================================" +echo "Upgrade Summary" +echo "=========================================" +echo "Old version: $OLD_VERSION" +echo "New version: $NEW_VERSION" +echo "Changed files: ${#CHANGED_FILES[@]}" +echo "Patch conflicts: ${#PATCH_CONFLICTS[@]}" +echo "" + +if [ ${#PATCH_CONFLICTS[@]} -eq 0 ]; then + echo "✓ Upgrade completed successfully!" + echo "" + echo "Next steps:" + echo " 1. Review changes: git diff" + echo " 2. Build: ./gradlew :metadata-integration:java:acryl-spark-lineage:build" + echo " 3. Test: ./gradlew :metadata-integration:java:acryl-spark-lineage:test" + echo " 4. Update CLAUDE.md if needed" +else + echo "⚠️ Upgrade completed with conflicts - manual review required" + echo "" + echo "Next steps:" + echo " 1. Review conflicts in files listed above" + echo " 2. Manually merge changes from:" + echo " - Backup: $BACKUP_DIR" + echo " - New upstream: $NEW_UPSTREAM_DIR" + echo " 3. Regenerate patches: ./scripts/generate-patches.sh $NEW_VERSION" + echo " 4. Build and test" +fi + +echo "" +echo "Backup location: $BACKUP_DIR" +echo "=========================================" \ No newline at end of file diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java index 8daa9cbff9ed0d..f0353bc78b86f8 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/datahub/spark/DatahubSparkListener.java @@ -60,7 +60,7 @@ public class DatahubSparkListener extends SparkListener { private static final Logger log = LoggerFactory.getLogger(DatahubSparkListener.class); private final Map batchLastUpdated = new HashMap(); - private final OpenLineageSparkListener listener; + private OpenLineageSparkListener listener; private DatahubEventEmitter emitter; private Config datahubConf = ConfigFactory.empty(); private SparkAppContext appContext; @@ -78,7 +78,8 @@ public class DatahubSparkListener extends SparkListener { public DatahubSparkListener(SparkConf conf) throws URISyntaxException { this.conf = ((SparkConf) Objects.requireNonNull(conf)).clone(); - listener = new OpenLineageSparkListener(conf); + // Listener will be created lazily in initializeContextFactoryIfNotInitialized + // after we can inject our custom DatahubEventEmitter via ContextFactory log.info( "Initializing DatahubSparkListener. 
Version: {} with Spark version: {}", VersionUtil.getVersion(), @@ -103,7 +104,7 @@ public void onApplicationStart(SparkListenerApplicationStart applicationStart) { log.info("Application start called"); this.appContext = getSparkAppContext(applicationStart); - initializeContextFactoryIfNotInitialized(); + initializeContextFactoryIfNotInitialized(applicationStart.appName()); listener.onApplicationStart(applicationStart); long elapsedTime = System.currentTimeMillis() - startTime; log.info("onApplicationStart completed successfully in {} ms", elapsedTime); @@ -307,6 +308,7 @@ public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { public void onTaskEnd(SparkListenerTaskEnd taskEnd) { long startTime = System.currentTimeMillis(); + initializeContextFactoryIfNotInitialized(); log.debug("Task end called"); listener.onTaskEnd(taskEnd); @@ -316,6 +318,7 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { public void onJobEnd(SparkListenerJobEnd jobEnd) { long startTime = System.currentTimeMillis(); + initializeContextFactoryIfNotInitialized(); log.debug("Job end called"); listener.onJobEnd(jobEnd); @@ -335,6 +338,8 @@ public void onJobStart(SparkListenerJobStart jobStart) { public void onOtherEvent(SparkListenerEvent event) { long startTime = System.currentTimeMillis(); + initializeContextFactoryIfNotInitialized(); + log.debug("Other event called {}", event.getClass().getName()); listener.onOtherEvent(event); } @@ -398,7 +403,13 @@ private void initializeContextFactoryIfNotInitialized(SparkConf sparkConf, Strin emitter.setConfig(datahubConfig); contextFactory = new ContextFactory(emitter, meterRegistry, config); circuitBreaker = new CircuitBreakerFactory(config.getCircuitBreaker()).build(); - OpenLineageSparkListener.init(contextFactory); + + // In OpenLineage 1.37.0+, the static init() method was removed. + // Instead, we use overrideDefaultFactoryForTests() to inject our custom ContextFactory + // before creating the listener. The method name says "ForTests" but it's the official + // way to inject a custom emitter - see OpenLineageSparkListener source code. + OpenLineageSparkListener.overrideDefaultFactoryForTests(contextFactory); + listener = new OpenLineageSparkListener(sparkConf); } catch (URISyntaxException e) { log.error("Unable to parse OpenLineage endpoint. Lineage events will not be collected", e); } diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java index 1ac5863b2368bf..a04973e762c1fe 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/lifecycle/plan/SaveIntoDataSourceCommandVisitor.java @@ -16,6 +16,7 @@ import io.openlineage.client.utils.DatasetIdentifier; import io.openlineage.client.utils.jdbc.JdbcDatasetUtils; import io.openlineage.spark.agent.util.DatasetFacetsUtils; +import io.openlineage.spark.agent.util.LogicalRelationFactory; import io.openlineage.spark.agent.util.PathUtils; import io.openlineage.spark.agent.util.PlanUtils; import io.openlineage.spark.agent.util.ScalaConversionUtils; @@ -142,39 +143,11 @@ public List apply(SparkListenerEvent event, SaveIntoDataSourceCom (SaveMode.Overwrite == command.mode()) ? 
OVERWRITE : CREATE; if (command.dataSource().getClass().getName().contains("DeltaDataSource")) { - // Handle path-based Delta tables if (command.options().contains("path")) { URI uri = URI.create(command.options().get("path").get()); return Collections.singletonList( outputDataset().getDataset(PathUtils.fromURI(uri), schema, lifecycleStateChange)); } - - // Handle catalog-based Delta tables (saveAsTable scenarios) - if (command.options().contains("table")) { - String tableName = command.options().get("table").get(); - // For catalog tables, use the default namespace or catalog - String namespace = "spark_catalog"; // Default Spark catalog namespace - DatasetIdentifier identifier = new DatasetIdentifier(tableName, namespace); - return Collections.singletonList( - outputDataset().getDataset(identifier, schema, lifecycleStateChange)); - } - - // Handle saveAsTable without explicit table option - check for table info in query execution - if (context.getQueryExecution().isPresent()) { - QueryExecution qe = context.getQueryExecution().get(); - // Try to extract table name from query execution context - String extractedTableName = extractTableNameFromContext(qe); - if (extractedTableName != null) { - String namespace = "spark_catalog"; - DatasetIdentifier identifier = new DatasetIdentifier(extractedTableName, namespace); - return Collections.singletonList( - outputDataset().getDataset(identifier, schema, lifecycleStateChange)); - } - } - - log.debug( - "Delta table detected but could not determine path or table name from options: {}", - command.options()); } if (command @@ -212,11 +185,12 @@ public List apply(SparkListenerEvent event, SaveIntoDataSourceCom throw ex; } LogicalRelation logicalRelation = - new LogicalRelation( - relation, - ScalaConversionUtils.asScalaSeqEmpty(), - Option.empty(), - command.isStreaming()); + LogicalRelationFactory.create( + relation, + ScalaConversionUtils.asScalaSeqEmpty(), + Option.empty(), + command.isStreaming()) + .orElseThrow(() -> new RuntimeException("Failed to create LogicalRelation")); return delegate( context.getOutputDatasetQueryPlanVisitors(), context.getOutputDatasetBuilders(), event) .applyOrElse( @@ -266,57 +240,6 @@ private StructType getSchema(SaveIntoDataSourceCommand command) { return schema; } - /** - * Attempts to extract table name from QueryExecution context for saveAsTable operations. This - * handles cases where the table name isn't explicitly in the command options. 
- */ - private String extractTableNameFromContext(QueryExecution qe) { - try { - // Try to get table name from SQL text if available - // Note: sqlText() is not available in all Spark versions, use reflection - try { - java.lang.reflect.Method sqlTextMethod = qe.getClass().getMethod("sqlText"); - Object sqlOption = sqlTextMethod.invoke(qe); - if (sqlOption != null && ((Option) sqlOption).isDefined()) { - String sql = (String) ((Option) sqlOption).get(); - log.debug("Attempting to extract table name from SQL: {}", sql); - - // Look for saveAsTable pattern which typically generates CREATE TABLE statements - if (sql.toLowerCase().contains("create table")) { - // Extract table name using regex pattern matching - String[] tokens = sql.split("\\s+"); - for (int i = 0; i < tokens.length - 1; i++) { - if (tokens[i].toLowerCase().equals("table")) { - String candidateTableName = tokens[i + 1]; - // Clean up table name (remove backticks, quotes, database prefix) - candidateTableName = candidateTableName.replaceAll("[`'\"]", ""); - // Handle database.table format by taking just the table name - if (candidateTableName.contains(".")) { - String[] parts = candidateTableName.split("\\."); - candidateTableName = parts[parts.length - 1]; // Take the last part (table name) - } - if (!candidateTableName.isEmpty() - && !candidateTableName.toLowerCase().equals("if")) { - log.debug("Extracted table name from SQL: {}", candidateTableName); - return candidateTableName; - } - } - } - } - } - } catch (Exception reflectionEx) { - log.debug( - "sqlText() method not available in this Spark version: {}", reflectionEx.getMessage()); - } - - log.debug("Could not extract table name from QueryExecution SQL text"); - } catch (Exception e) { - log.debug("Error extracting table name from QueryExecution: {}", e.getMessage()); - } - - return null; - } - @Override public Optional jobNameSuffix(OpenLineageContext context) { return context diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/JdbcSparkUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/JdbcSparkUtils.java new file mode 100644 index 00000000000000..b29873f026baa0 --- /dev/null +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/JdbcSparkUtils.java @@ -0,0 +1,156 @@ +/* +/* Copyright 2018-2025 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.agent.util; + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.utils.DatasetIdentifier; +import io.openlineage.client.utils.jdbc.JdbcDatasetUtils; +import io.openlineage.spark.api.DatasetFactory; +import io.openlineage.sql.ColumnLineage; +import io.openlineage.sql.ColumnMeta; +import io.openlineage.sql.DbTableMeta; +import io.openlineage.sql.ExtractionError; +import io.openlineage.sql.OpenLineageSql; +import io.openlineage.sql.SqlMeta; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions$; +import org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation; +import org.apache.spark.sql.types.StructField; +import 
org.apache.spark.sql.types.StructType; + +@Slf4j +public class JdbcSparkUtils { + + public static List getDatasets( + DatasetFactory datasetFactory, SqlMeta meta, JDBCRelation relation) { + + StructType schema = relation.schema(); + String jdbcUrl = relation.jdbcOptions().url(); + Properties jdbcProperties = relation.jdbcOptions().asConnectionProperties(); + + if (meta.columnLineage().isEmpty()) { + int numberOfTables = meta.inTables().size(); + + return meta.inTables().stream() + .map( + dbtm -> { + DatasetIdentifier di = + JdbcDatasetUtils.getDatasetIdentifier( + jdbcUrl, dbtm.qualifiedName(), jdbcProperties); + + if (numberOfTables > 1) { + return datasetFactory.getDataset(di.getName(), di.getNamespace()); + } + + return datasetFactory.getDataset(di.getName(), di.getNamespace(), schema); + }) + .collect(Collectors.toList()); + } + return meta.inTables().stream() + .map( + dbtm -> { + DatasetIdentifier di = + JdbcDatasetUtils.getDatasetIdentifier( + jdbcUrl, dbtm.qualifiedName(), jdbcProperties); + return datasetFactory.getDataset( + di.getName(), di.getNamespace(), generateSchemaFromSqlMeta(dbtm, schema, meta)); + }) + .collect(Collectors.toList()); + } + + public static StructType generateSchemaFromSqlMeta( + DbTableMeta origin, StructType schema, SqlMeta sqlMeta) { + StructType originSchema = new StructType(); + for (StructField f : schema.fields()) { + List fields = + sqlMeta.columnLineage().stream() + .filter(cl -> cl.descendant().name().equals(f.name())) + .flatMap( + cl -> + cl.lineage().stream() + .filter( + cm -> cm.origin().isPresent() && cm.origin().get().equals(origin))) + .collect(Collectors.toList()); + for (ColumnMeta cm : fields) { + originSchema = originSchema.add(cm.name(), f.dataType()); + } + } + return originSchema; + } + + public static Optional extractQueryFromSpark(JDBCRelation relation) { + Optional table = + ScalaConversionUtils.asJavaOptional( + relation.jdbcOptions().parameters().get(JDBCOptions$.MODULE$.JDBC_TABLE_NAME())); + // in some cases table value can be "(SELECT col1, col2 FROM table_name WHERE some='filter') + // ALIAS" + if (table.isPresent() && !table.get().startsWith("(")) { + DbTableMeta origin = new DbTableMeta(null, null, table.get()); + return Optional.of( + new SqlMeta( + Collections.singletonList(origin), + Collections.emptyList(), + Arrays.stream(relation.schema().fields()) + .map( + field -> + new ColumnLineage( + new ColumnMeta(null, field.name()), + Collections.singletonList(new ColumnMeta(origin, field.name())))) + .collect(Collectors.toList()), + Collections.emptyList())); + } + + String query = queryStringFromJdbcOptions(relation.jdbcOptions()); + + String dialect = extractDialectFromJdbcUrl(relation.jdbcOptions().url()); + Optional sqlMeta = OpenLineageSql.parse(Collections.singletonList(query), dialect); + + if (!sqlMeta.isPresent()) { // missing JNI library + return sqlMeta; + } + if (!sqlMeta.get().errors().isEmpty()) { // error return nothing + log.error( + String.format( + "error while parsing query: %s", + sqlMeta.get().errors().stream() + .map(ExtractionError::toString) + .collect(Collectors.joining(",")))); + return Optional.empty(); + } + if (sqlMeta.get().inTables().isEmpty()) { + log.error("no tables defined in query, this should not happen"); + return Optional.empty(); + } + return sqlMeta; + } + + public static String queryStringFromJdbcOptions(JDBCOptions options) { + String tableOrQuery = options.tableOrQuery(); + return tableOrQuery.substring(0, tableOrQuery.lastIndexOf(")")).replaceFirst("\\(", ""); + } + + private 
static String extractDialectFromJdbcUrl(String jdbcUrl) { + Pattern pattern = Pattern.compile("^jdbc:([^:]+):.*"); + Matcher matcher = pattern.matcher(jdbcUrl); + + if (matcher.find()) { + String dialect = matcher.group(1); + // Added to map SQL Server dialect to the one recognized by OpenLineage + return "sqlserver".equals(dialect) ? "mssql" : dialect; + } else { + return null; + } + } +} diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java index bdc06093de7974..ef15314b00baef 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PathUtils.java @@ -41,11 +41,7 @@ public static DatasetIdentifier fromURI(URI location) { public static DatasetIdentifier fromCatalogTable( CatalogTable catalogTable, SparkSession sparkSession) { URI locationUri; - if (catalogTable.storage() != null && catalogTable.storage().locationUri().isDefined()) { - locationUri = catalogTable.storage().locationUri().get(); - } else { - locationUri = getDefaultLocationUri(sparkSession, catalogTable.identifier()); - } + locationUri = getLocationUri(catalogTable, sparkSession); return fromCatalogTable(catalogTable, sparkSession, locationUri); } @@ -142,13 +138,24 @@ public static Path reconstructDefaultLocation(String warehouse, String[] namespa return new Path(warehouse, database + ".db", name); } + public static Optional getMetastoreUri(SparkContext context) { + // make sure enableHiveSupport is called + Optional setting = + SparkConfUtils.findSparkConfigKey( + context.getConf(), StaticSQLConf.CATALOG_IMPLEMENTATION().key()); + if (!setting.isPresent() || !"hive".equals(setting.get())) { + return Optional.empty(); + } + return SparkConfUtils.getMetastoreUri(context); + } + @SneakyThrows public static URI prepareHiveUri(URI uri) { return new URI("hive", uri.getAuthority(), null, null, null); } @SneakyThrows - private static Optional getWarehouseLocation(SparkConf sparkConf, Configuration hadoopConf) { + public static Optional getWarehouseLocation(SparkConf sparkConf, Configuration hadoopConf) { Optional warehouseLocation = SparkConfUtils.findSparkConfigKey(sparkConf, StaticSQLConf.WAREHOUSE_PATH().key()); if (!warehouseLocation.isPresent()) { @@ -158,15 +165,14 @@ private static Optional getWarehouseLocation(SparkConf sparkConf, Configura return warehouseLocation.map(URI::create); } - private static Optional getMetastoreUri(SparkContext context) { - // make sure enableHiveSupport is called - Optional setting = - SparkConfUtils.findSparkConfigKey( - context.getConf(), StaticSQLConf.CATALOG_IMPLEMENTATION().key()); - if (!setting.isPresent() || !"hive".equals(setting.get())) { - return Optional.empty(); + private static URI getLocationUri(CatalogTable catalogTable, SparkSession sparkSession) { + URI locationUri; + if (catalogTable.storage() != null && catalogTable.storage().locationUri().isDefined()) { + locationUri = catalogTable.storage().locationUri().get(); + } else { + locationUri = getDefaultLocationUri(sparkSession, catalogTable.identifier()); } - return SparkConfUtils.getMetastoreUri(context); + return locationUri; } /** Get DatasetIdentifier name in format database.table or table */ diff --git 
a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java index 4ac3fa002ad177..6eac3a0825ec56 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/PlanUtils.java @@ -5,6 +5,8 @@ package io.openlineage.spark.agent.util; +import static io.openlineage.spark.agent.util.ScalaConversionUtils.asJavaOptional; + import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import datahub.spark.conf.SparkLineageConf; @@ -19,6 +21,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -31,6 +34,9 @@ import org.apache.spark.SparkEnv; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.catalyst.expressions.Attribute; +import org.apache.spark.sql.types.ArrayType; +import org.apache.spark.sql.types.MapType; +import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.PartialFunction; @@ -119,16 +125,65 @@ public static OpenLineage.SchemaDatasetFacet schemaFacet( private static List transformFields( OpenLineage openLineage, StructField... fields) { - List list = new ArrayList<>(); - for (StructField field : fields) { - list.add( - openLineage - .newSchemaDatasetFacetFieldsBuilder() - .name(field.name()) - .type(field.dataType().typeName()) - .build()); + return Arrays.stream(fields) + .map(field -> transformField(openLineage, field)) + .collect(Collectors.toList()); + } + + private static OpenLineage.SchemaDatasetFacetFields transformField( + OpenLineage openLineage, StructField field) { + OpenLineage.SchemaDatasetFacetFieldsBuilder builder = + openLineage + .newSchemaDatasetFacetFieldsBuilder() + .name(field.name()) + .type(field.dataType().typeName()); + + if (field.metadata() != null) { + // field.getComment() actually tries to access field.metadata(), + // and fails with NullPointerException if it is null instead of expected Metadata.empty() + builder = builder.description(asJavaOptional(field.getComment()).orElse(null)); + } + + if (field.dataType() instanceof StructType) { + StructType structField = (StructType) field.dataType(); + return builder + .type("struct") + .fields(transformFields(openLineage, structField.fields())) + .build(); + } + + if (field.dataType() instanceof MapType) { + MapType mapField = (MapType) field.dataType(); + return builder + .type("map") + .fields( + transformFields( + openLineage, + new StructField("key", mapField.keyType(), false, Metadata.empty()), + new StructField( + "value", + mapField.valueType(), + mapField.valueContainsNull(), + Metadata.empty()))) + .build(); + } + + if (field.dataType() instanceof ArrayType) { + ArrayType arrayField = (ArrayType) field.dataType(); + return builder + .type("array") + .fields( + transformFields( + openLineage, + new StructField( + "_element", + arrayField.elementType(), + arrayField.containsNull(), + Metadata.empty()))) + .build(); } - return list; + + return builder.build(); } /** @@ -204,6 +259,28 @@ public static OpenLineage.ParentRunFacet parentRunFacet( .build(); } + /** + * Given a list of paths, it collects list of data location directories. 
For each path, a parent + * directory is taken and list of distinct locations is returned. Operation is optimized to check + * for each path if it was already added to the list of normalized paths. + * + * @param paths + * @param hadoopConf + * @return + */ + public static List getDirectoryPaths(Collection paths, Configuration hadoopConf) { + LinkedHashSet normalizedPaths = new LinkedHashSet<>(); + for (Path path : paths) { + // check if the root path is already contained in normalized Paths + Path parent = path.getParent(); + if (parent != null && !normalizedPaths.contains(parent)) { + // if not, add new path to normalized paths -> call getDirectoryPath + normalizedPaths.add(PlanUtils.getDirectoryPath(path, hadoopConf)); + } + } + return new ArrayList<>(normalizedPaths); + } + public static Path getDirectoryPathOl(Path p, Configuration hadoopConf) { try { if (p.getFileSystem(hadoopConf).getFileStatus(p).isFile()) { @@ -282,6 +359,9 @@ public static boolean safeIsDefinedAt(PartialFunction pfn, Object x) { } catch (ClassCastException e) { // do nothing return false; + } catch (TypeNotPresentException e) { + log.info("isDefinedAt method failed due to missing type: {}", e.getMessage()); + return false; } catch (Exception e) { if (e != null) { log.info("isDefinedAt method failed on {}", e); diff --git a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java index 987da0f6bd06e3..fd0a6d933a9cfa 100644 --- a/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java +++ b/metadata-integration/java/acryl-spark-lineage/src/main/java/io/openlineage/spark/agent/util/RddPathUtils.java @@ -5,6 +5,7 @@ package io.openlineage.spark.agent.util; +import java.io.IOException; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,11 +15,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapreduce.Job; import org.apache.spark.package$; import org.apache.spark.rdd.HadoopRDD; import org.apache.spark.rdd.MapPartitionsRDD; +import org.apache.spark.rdd.NewHadoopRDD; import org.apache.spark.rdd.ParallelCollectionRDD; import org.apache.spark.rdd.RDD; +import org.apache.spark.rdd.UnionRDD; import org.apache.spark.sql.execution.datasources.FilePartition; import org.apache.spark.sql.execution.datasources.FileScanRDD; import scala.Tuple2; @@ -34,6 +38,8 @@ public static Stream findRDDPaths(RDD rdd) { new HadoopRDDExtractor(), new FileScanRDDExtractor(), new MapPartitionsRDDExtractor(), + new UnionRddExctractor(), + new NewHadoopRDDExtractor(), new ParallelCollectionRDDExtractor()) .filter(e -> e.isDefinedAt(rdd)) .findFirst() @@ -42,6 +48,20 @@ public static Stream findRDDPaths(RDD rdd) { .filter(p -> p != null); } + static class UnionRddExctractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof UnionRDD; + } + + @Override + public Stream extract(RDD rdd) { + return ScalaConversionUtils.fromSeq(((UnionRDD) rdd).rdds()).stream() + .map(RDD.class::cast) + .flatMap(r -> findRDDPaths((RDD) r)); + } + } + static class UnknownRDDExtractor implements RddPathExtractor { @Override public boolean isDefinedAt(Object rdd) { @@ -71,7 +91,28 @@ public Stream extract(HadoopRDD rdd) { log.debug("Hadoop 
RDD input paths {}", Arrays.toString(inputPaths)); log.debug("Hadoop RDD job conf {}", rdd.getJobConf()); } - return Arrays.stream(inputPaths).map(p -> PlanUtils.getDirectoryPath(p, hadoopConf)); + return PlanUtils.getDirectoryPaths(Arrays.asList(inputPaths), hadoopConf).stream(); + } + } + + static class NewHadoopRDDExtractor implements RddPathExtractor { + @Override + public boolean isDefinedAt(Object rdd) { + return rdd instanceof NewHadoopRDD; + } + + @Override + public Stream extract(NewHadoopRDD rdd) { + try { + org.apache.hadoop.fs.Path[] inputPaths = + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getInputPaths( + new Job(((NewHadoopRDD) rdd).getConf())); + + return PlanUtils.getDirectoryPaths(Arrays.asList(inputPaths), rdd.getConf()).stream(); + } catch (IOException e) { + log.error("Openlineage spark agent could not get input paths", e); + } + return Stream.empty(); } }