Conversation

@rkrumins rkrumins commented Oct 12, 2025

As mentioned above, this PR adds support for writing all lineage to a centralised location on HDFS. Some of the key reasons why this is being considered:

  • Very large lineage messages (realistically anything above the default 1 MB Kafka message size) are not easy to publish on messaging systems like Kafka, especially as we often see complex Spark jobs producing lineage messages larger than 100 MB
  • The current HDFS lineage dispatcher implementation works well when a single account runs all the Spark jobs, but not when one needs to gather lineage from many different paths across dozens of service accounts running those jobs

Open to any suggestions; I am planning to add further PRs as I extend the current implementation.
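For reference, a minimal sketch of how a job could opt into the centralised mode, using the Spark property names that appear in this PR's integration tests (the hdfs://cluster/lineage destination is purely illustrative):

import org.apache.spark.sql.SparkSession
import za.co.absa.spline.harvester.dispatcher.HDFSLineageDispatcher

// Illustrative only: property names mirror this PR's integration tests; the destination URI is made up.
val spark = SparkSession.builder()
  .appName("centralised-lineage-example")
  .config("spark.spline.lineageDispatcher", "hdfs")
  .config("spark.spline.lineageDispatcher.hdfs.className", classOf[HDFSLineageDispatcher].getName)
  // Non-blank value => CENTRALIZED mode; leave it unset (or blank) for the DEFAULT alongside-target behaviour.
  .config("spark.spline.lineageDispatcher.hdfs.customLineagePath", "hdfs://cluster/lineage")
  .getOrCreate()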

Summary by CodeRabbit

  • New Features

    • Optional setting to store lineage centrally (when configured) instead of alongside data.
    • Centralized lineage files use unique, timestamped filenames including the application identifier.
    • Parent directories are created with proper permissions for supported filesystems; skipped for object-storage backends.
    • Default behavior unchanged unless the new setting is enabled.
  • Documentation

    • Examples and guidance for Local, S3, GCS, and HDFS; clarified centralized-mode behavior and filename format.
  • Tests

    • Integration tests covering centralized and default modes, empty/whitespace config cases, and content/filename validations.

coderabbitai bot commented Oct 12, 2025

Walkthrough

Adds an optional customLineagePath config to the HDFS lineage dispatcher; dispatcher now resolves between centralized vs alongside-target lineage storage, generates UTC-timestamped unique filenames ({timestamp}_{planId}_{appId}), and conditionally creates parent directories (skipping object-storage backends). Integration tests cover unset/blank and centralized modes.

Changes

Cohort / File(s) and summary:

  • Configuration: HDFS dispatcher option (core/src/main/resources/spline.default.yaml)
    Adds optional customLineagePath under spline.lineageDispatcher.hdfs, documents two operation modes (centralized vs alongside-target), shows filename format and examples for local, S3, GCS, and HDFS destinations.
  • Dispatcher implementation: path resolution & persistence (core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala)
    Class signature extended with customLineagePath: Option[String]; secondary ctor reads trimmed/filtered config key CustomLineagePathKey; adds resolveLineagePath, generateUniqueFilename (UTC timestamp + planId + appId), and ensureParentDirectoriesExist; persistToHadoopFs uses resolved path and conditionally creates parent dirs (skipping object-store backends); replaces direct path concatenation with resolved path usage.
  • Integration tests: centralized vs default behavior (integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala)
    Adds tests for DEFAULT mode (unset/empty/whitespace customLineagePath) and CENTRALIZED mode (non-blank customLineagePath); verifies centralized files in central directory with timestamped filenames containing appId, correct JSON content (executionPlan/executionEvent), and absence of per-destination _LINEAGE files when centralized.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Conf as Config
  participant Disp as HDFSLineageDispatcher
  participant FS as HadoopFS
  participant Tgt as TargetDir

  Conf->>Disp: init(..., customLineagePath?)
  Note over Disp: resolveLineagePath(customLineagePath, targetPath)
  alt Centralized (non-blank)
    Disp->>Disp: generateUniqueFilename(UTC_ts, planId, appId)
    Disp->>FS: ensureParentDirectoriesExist(centralDir)   %%{bgColor:#f3f4f6}%%
    Disp->>FS: write(centralDir/uniqueName.json, bytes)   %%{bgColor:#e6ffed}%%
  else Default (unset/blank)
    Disp->>FS: write(dirname(targetPath)/filename.json, bytes) %%{bgColor:#e6ffed}%%
  end
  FS-->>Disp: write complete

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

I hop where paths and timestamps meet the trail,
Central burrow or beside each file's vale.
I stitch app name, id, and UTC tick—
Skip cloud holes where mkdirs won't stick,
A rabbit's nibble: neat lineage on the trail. 🥕

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
  • Description Check: ✅ Passed. Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title Check: ✅ Passed. The title accurately reflects the primary enhancement — extending the HDFS Lineage Dispatcher to support centralized lineage file storage — which is the central focus of the changeset. It succinctly captures the key functionality added without extraneous details or noise. This phrasing enables teammates to quickly understand the PR’s purpose at a glance.
  • Docstring Coverage: ✅ Passed. No functions found in the changes. Docstring coverage check skipped.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bebc869 and 7bd5b0a.

📒 Files selected for processing (1)
  • core/src/main/resources/spline.default.yaml (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis



wajda commented Oct 12, 2025

Snyk checks have passed. No issues have been found so far.

  • Open Source Security: 0 critical, 0 high, 0 medium, 0 low (0 issues in total)


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (2)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (2)

112-122: Defensive guard for empty paths (optional)

If normalization on read is ever bypassed (programmatic construction), add a defensive filter here to avoid “/” on blank customPath.

Apply:

   private def resolveLineagePath(planId: String): String = {
-    customLineagePath match {
+    customLineagePath.map(_.trim).filter(_.nonEmpty) match {
       case Some(customPath) =>
         val cleanCustomPath = customPath.stripSuffix("/")
         val uniqueFilename = generateUniqueFilename(planId)
         s"$cleanCustomPath/$uniqueFilename"
       case None =>
         s"${this._lastSeenPlan.operations.write.outputSource.stripSuffix("/")}/$filename"
     }
   }

137-143: Minor: reuse DateTimeFormatter; consider including planId

  • Hoist the formatter to a private static val to avoid reallocation per call.
  • Optional: include a short planId prefix/suffix in the filename to aid grepability (JSON still contains IDs).
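A small sketch of what both points above could look like together (illustrative only; imports from the surrounding file such as java.time and SparkContext are assumed, `filename` is the dispatcher's configured file name, and truncating the plan ID is just one option):

// Hoisted once: DateTimeFormatter is immutable and thread-safe, so no per-call allocation.
private val TimestampFormatter =
  DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))

private def generateUniqueFilename(planId: String): String = {
  val appId = SparkContext.getOrCreate().applicationId
  val timestamp = TimestampFormatter.format(Instant.now())
  // A short planId fragment keeps filenames grep-able without opening the JSON.
  s"${timestamp}_${filename}_${planId.take(8)}_$appId"
}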
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 677da45 and 6f0ecdf.

📒 Files selected for processing (3)
  • core/src/main/resources/spline.default.yaml (1 hunks)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/SparkFixture.scala (1)
  • withIsolatedSparkSession (54-67)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala (1)
  • withLineageTracking (23-25)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/LineageCaptor.scala (1)
  • lineageOf (46-56)
core/src/main/scala/za/co/absa/spline/harvester/json/HarvesterJsonSerDe.scala (1)
  • fromJson (68-73)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (4)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (2)

66-108: Centralized mode test looks good

Covers location, filename format, and JSON content coherently.


110-135: No normalization needed: getOptionalString already filters out empty and whitespace-only values, so customLineagePath will be None for blank inputs and DEFAULT mode applies.

Likely an incorrect or invalid review comment.

core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (2)

196-198: Good: directory prep before write

Ensures parent dirs exist (or skipped for object stores) before fs.create.


219-220: Config key addition is fine

Constant aligns with YAML and constructor wiring.

Comment on lines 149 to 169
customLineagePath:
# OPTIONAL: Custom path for centralized lineage storage.
# If left empty, null, or not specified → DEFAULT MODE: lineage written alongside target data files
# If set to a path → CENTRALIZED MODE: all lineage written to this location with unique filenames
#
# CENTRALIZED MODE filename format: {timestamp}_{fileName}_{appId}
# - timestamp: Human-readable UTC timestamp (yyyy-MM-dd_HH-mm-ss-SSS) for natural chronological sorting and easy filtering
# Example: 2025-10-12_14-30-45-123
# - fileName: The configured fileName value (e.g., "my_file.parq_LINEAGE")
# - appId: Spark application ID for traceability to specific runs
# Example: app-20251012143045-0001
#
# More examples:
# - Local: customLineagePath: /my/centralized/lineage
# Output: /my/centralized/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# - S3: customLineagePath: s3://my-bucket/lineage
# Output: s3://my-bucket/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# - GCS: customLineagePath: gs://my-bucket/lineage
# Output: gs://my-bucket/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001
# - HDFS: customLineagePath: hdfs://cluster/lineage
# Output: hdfs://cluster/lineage/2025-10-12_14-30-45-123_my_file.parq_LINEAGE_app-20251012143045-0001

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Docs imply gs/abfs/hdfs scheme support that the code doesn’t yet provide

YAML examples list S3, GCS (gs://), Azure (abfs/wasb), and hdfs://. Current FS resolution only special‑cases S3 and otherwise uses the default FS, which won’t work for gs/abfs/wasb/adl or hdfs:// URIs with authority. Either (a) narrow the examples, or (b) implement generic scheme‑aware FS resolution (recommended). See HDFSLineageDispatcher.pathStringToFsWithPath. Also, ensure blanks/whitespace are treated as default mode per these comments.


🏁 Script executed:

#!/bin/bash
set -e
# Locate the HDFSLineageDispatcher definition
rg -n "HDFSLineageDispatcher" -g '*.scala'
# Inspect the pathStringToFsWithPath method implementation
rg -n "pathStringToFsWithPath" -g '*.scala' -nC5

Length of output: 2044


🏁 Script executed:

#!/bin/bash
set -e
# Inspect pathStringToFsWithPath implementation
rg -nC5 "def pathStringToFsWithPath" -g 'core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala'
# Inspect customLineagePath blank or whitespace handling
rg -nC5 "customLineagePath" -g 'core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala'

Length of output: 10232


Align docs and code for customLineagePath schemes and blank handling

  • Doc examples list gs://, abfs://, wasb://, and hdfs:// URIs, but HDFSLineageDispatcher.pathStringToFsWithPath only special-cases S3 (via toSimpleS3Location) and falls back to the default FS; either narrow the examples to S3/local or implement generic scheme-aware resolution (e.g. URI.getScheme → FileSystem.get)
  • Empty or whitespace customLineagePath currently matches Some("") and enters centralized mode; empty, null, and blank values must map to default mode per the docs
🤖 Prompt for AI Agents
core/src/main/resources/spline.default.yaml lines 149-169: docs and code diverge
— implement scheme-aware resolution and treat blank values as default mode.
Update HDFSLineageDispatcher.pathStringToFsWithPath so that it first trims the
input and returns None/empty (i.e., default mode) for null/empty/whitespace; for
non-empty values, parse the string as a URI, inspect getScheme(), and resolve
the target FileSystem via FileSystem.get(URI, conf) (fallback to local FS when
scheme is null or unsupported), preserving existing s3 handling via
toSimpleS3Location only if still necessary; ensure the returned path preserves
the original path segment. Also update/align the YAML examples to only show
schemes actually supported or confirm support for gs/abfs by adding their scheme
handling in the resolver.
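For illustration, a minimal sketch of the behaviour this comment asks for, i.e. blank values falling back to default mode and the filesystem being resolved from the path's own scheme (the method name is hypothetical, not part of this PR):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Blank or whitespace-only input => None (default mode); otherwise resolve the FS that owns the URI's scheme.
def resolveCustomFsWithPath(raw: Option[String], conf: Configuration): Option[(FileSystem, Path)] =
  raw.map(_.trim).filter(_.nonEmpty).map { pathString =>
    val path = new Path(pathString)        // hdfs://, s3a://, gs://, abfs://, file://, ...
    (path.getFileSystem(conf), path)       // Hadoop picks the backend registered for that scheme
  }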

Comment on lines 155 to 171
private def ensureParentDirectoriesExist(fs: FileSystem, path: Path): Unit = {
  // Object storage systems (S3, GCS, Azure Blob) don't have real directories - they're just key prefixes
  // Skip directory creation for these systems to avoid unnecessary operations
  val fsScheme = fs.getUri.getScheme
  val isObjectStorage = fsScheme != null && (
    fsScheme.startsWith("s3") ||      // S3: s3, s3a, s3n
    fsScheme.startsWith("gs") ||      // Google Cloud Storage: gs
    fsScheme.startsWith("wasb") ||    // Azure Blob Storage: wasb, wasbs
    fsScheme.startsWith("abfs") ||    // Azure Data Lake Storage Gen2: abfs, abfss
    fsScheme.startsWith("adl")        // Azure Data Lake Storage Gen1: adl
  )

  if (isObjectStorage) {
    logDebug(s"Skipping directory creation for object storage filesystem ($fsScheme) - directories are implicit key prefixes")
    return
  }


⚠️ Potential issue | 🟠 Major

Scheme checks should be case-insensitive; and backend support is incomplete

  • Use toLowerCase(Locale.ROOT) on scheme before startsWith to be robust.
  • You skip mkdirs for gs/wasb/abfs/adl, but pathStringToFsWithPath doesn’t actually resolve these schemes; writes will likely fail with Wrong FS. Implement generic FS resolution (see below).

Apply:

-    val fsScheme = fs.getUri.getScheme
-    val isObjectStorage = fsScheme != null && (
-      fsScheme.startsWith("s3") ||
-      fsScheme.startsWith("gs") ||
-      fsScheme.startsWith("wasb") ||
-      fsScheme.startsWith("abfs") ||
-      fsScheme.startsWith("adl")
-    )
+    val fsSchemeRaw = fs.getUri.getScheme
+    val scheme = Option(fsSchemeRaw).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
+    val isObjectStorage = scheme != null && (
+      scheme.startsWith("s3") ||     // s3, s3a, s3n
+      scheme.startsWith("gs") ||     // Google Cloud Storage
+      scheme.startsWith("wasb") ||   // Azure Blob Storage
+      scheme.startsWith("abfs") ||   // Azure Data Lake Gen2
+      scheme.startsWith("adl")       // Azure Data Lake Gen1
+    )
-    if (isObjectStorage) {
-      logDebug(s"Skipping directory creation for object storage filesystem ($fsScheme) - directories are implicit key prefixes")
+    if (isObjectStorage) {
+      logDebug(s"Skipping directory creation for object storage filesystem ($scheme) - directories are implicit key prefixes")
       return
     }

Additionally, replace pathStringToFsWithPath with a scheme‑aware implementation:

// Replace the whole method with this simpler, generic version
def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
  val path = new Path(pathString)
  val fs = path.getFileSystem(HadoopConfiguration)
  (fs, path)
}

This natively supports hdfs://, s3/s3a, gs://, wasb(s)://, abfs(s)://, etc.

🤖 Prompt for AI Agents
In
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala
around lines 155 to 171, the filesystem scheme checks are case-sensitive and the
code skips mkdirs for several object storage schemes while
pathStringToFsWithPath does not resolve those schemes; change the scheme
normalization to use toLowerCase(Locale.ROOT) before the startsWith checks so
comparisons are case-insensitive, and replace the existing
pathStringToFsWithPath implementation with a generic, scheme-aware approach that
creates a Path from the path string and calls
path.getFileSystem(HadoopConfiguration) to obtain the correct FileSystem and
Path tuple (this removes the need to special-case s3/gs/wasb/abfs/adl and
ensures writes resolve to the proper backend).

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (3)

231-244: Incomplete backend support for object storage schemes.

This issue was previously flagged but has not been addressed. The pathStringToFsWithPath method only handles S3 specially; paths using gs://, wasb://, abfs://, or adl:// fall through to the default case, which retrieves the default filesystem instead of the correct one for those schemes. Writes will fail with "Wrong FS".

Replace the entire method with the Hadoop-idiomatic approach that natively supports all schemes:

-  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
-    pathString.toSimpleS3Location match {
-      case Some(s3Location) =>
-        val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
-        val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>
-
-        val fs = FileSystem.get(s3Uri, HadoopConfiguration)
-        (fs, s3Path)
-
-      case None => // local hdfs location
-        val fs = FileSystem.get(HadoopConfiguration)
-        (fs, new Path(pathString))
-    }
-  }
+  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
+    val path = new Path(pathString)
+    val fs = path.getFileSystem(HadoopConfiguration)
+    (fs, path)
+  }

This natively supports hdfs://, s3/s3a/s3n://, gs://, wasb(s)://, abfs(s)://, adl://, and file:// schemes.

Based on past review comments.


71-72: Normalize customLineagePath on read (handle empty/whitespace).

This issue was previously flagged but has not been addressed. Without trimming and filtering, Some("") and Some(" ") are treated as centralized mode, causing paths like "/<filename>".

Apply this diff to normalize the value:

   def this(conf: Configuration) = this(
     filename = conf.getRequiredString(FileNameKey),
     permission = new FsPermission(conf.getRequiredObject(FilePermissionsKey).toString),
     bufferSize = conf.getRequiredInt(BufferSizeKey),
-    customLineagePath = conf.getOptionalString(CustomLineagePathKey)
+    customLineagePath = conf
+      .getOptionalString(CustomLineagePathKey)
+      .map(_.trim)
+      .filter(_.nonEmpty)
   )

Based on past review comments.


158-165: Scheme checks should be case-insensitive.

This issue was previously flagged but has not been addressed. Filesystem schemes can appear in different cases (e.g., "S3" vs "s3"), and the current code only checks lowercase variants.

Apply this diff to normalize the scheme:

     val fsScheme = fs.getUri.getScheme
-    val isObjectStorage = fsScheme != null && (
-      fsScheme.startsWith("s3") ||      // S3: s3, s3a, s3n
-      fsScheme.startsWith("gs") ||      // Google Cloud Storage: gs
-      fsScheme.startsWith("wasb") ||    // Azure Blob Storage: wasb, wasbs
-      fsScheme.startsWith("abfs") ||    // Azure Data Lake Storage Gen2: abfs, abfss
-      fsScheme.startsWith("adl")        // Azure Data Lake Storage Gen1: adl
+    val scheme = Option(fsScheme).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
+    val isObjectStorage = scheme != null && (
+      scheme.startsWith("s3") ||      // S3: s3, s3a, s3n
+      scheme.startsWith("gs") ||      // Google Cloud Storage: gs
+      scheme.startsWith("wasb") ||    // Azure Blob Storage: wasb, wasbs
+      scheme.startsWith("abfs") ||    // Azure Data Lake Storage Gen2: abfs, abfss
+      scheme.startsWith("adl")        // Azure Data Lake Storage Gen1: adl
     )
     
     if (isObjectStorage) {
-      logDebug(s"Skipping directory creation for object storage filesystem ($fsScheme) - directories are implicit key prefixes")
+      logDebug(s"Skipping directory creation for object storage filesystem ($scheme) - directories are implicit key prefixes")
       return
     }

Based on past review comments.

🧹 Nitpick comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

137-143: Consider caching the DateTimeFormatter.

The implementation is correct, but the DateTimeFormatter could be extracted as a private constant since it's immutable and thread-safe, avoiding repeated instantiation on every call.

Apply this diff to cache the formatter:

+  private val TimestampFormatter = DateTimeFormatter
+    .ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
+    .withZone(ZoneId.of("UTC"))
+
   private def generateUniqueFilename(planId: String): String = {
     val sparkContext = SparkContext.getOrCreate()
     val appId = sparkContext.applicationId
-    val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
-    val timestamp = dateFormatter.format(Instant.now())
+    val timestamp = TimestampFormatter.format(Instant.now())
     s"${timestamp}_${filename}_${appId}"
   }
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6f0ecdf and b064041.

📒 Files selected for processing (1)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (5)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5)

33-34: LGTM!

The time-related imports are appropriate for the timestamp-based unique filename generation in centralized mode.


45-62: LGTM!

The documentation clearly explains the two operating modes (default and centralized) with concrete examples and the rationale for the timestamp-first filename format. This will help users understand when and how to use the centralized lineage feature.


103-122: LGTM!

The path resolution logic correctly handles both centralized and default modes. The defensive stripSuffix("/") on line 115 prevents double-slash issues, and using the write operation's outputSource on line 120 maintains backward compatibility with the default mode.


196-197: LGTM!

The directory creation call is correctly placed before the write operation, ensuring parent directories exist with proper permissions for multi-user access in centralized mode.


219-219: LGTM!

The configuration key constant follows the existing pattern and enables reading the centralized lineage path from configuration.

Comment on lines 177 to 180
val created = fs.mkdirs(parentDir, permission)
if (created) {
  logInfo(s"Created parent directories: $parentDir with permissions $permission")
}

⚠️ Potential issue | 🟠 Major

Use directory-appropriate permissions.

Directories created with file permissions (e.g., 0644) lack execute bits and are non-traversable, defeating the "multi-user friendly permissions" goal stated in the comment. Execute permission is required for directory access.

Apply this diff to use directory-appropriate permissions:

       try {
-        // Create directories with multi-user friendly permissions to allow all service accounts to write
-        // This uses the same permission object that's already configured for files
-        val created = fs.mkdirs(parentDir, permission)
+        // Create directories with multi-user friendly permissions (execute bits for traversal)
+        // Add execute bits to the file permissions to make directories traversable
+        val dirPermission = new FsPermission(
+          permission.getUserAction.or(FsAction.EXECUTE),
+          permission.getGroupAction.or(FsAction.EXECUTE),
+          permission.getOtherAction.or(FsAction.EXECUTE)
+        )
+        val created = fs.mkdirs(parentDir, dirPermission)
         if (created) {
-          logInfo(s"Created parent directories: $parentDir with permissions $permission")
+          logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
val created = fs.mkdirs(parentDir, permission)
if (created) {
  logInfo(s"Created parent directories: $parentDir with permissions $permission")
}
try {
  // Create directories with multi-user friendly permissions (execute bits for traversal)
  // Add execute bits to the file permissions to make directories traversable
  val dirPermission = new FsPermission(
    permission.getUserAction.or(FsAction.EXECUTE),
    permission.getGroupAction.or(FsAction.EXECUTE),
    permission.getOtherAction.or(FsAction.EXECUTE)
  )
  val created = fs.mkdirs(parentDir, dirPermission)
  if (created) {
    logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
  }

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (2)

174-179: Ensure directory permissions include execute bits for traversal.

Creating directories with the file permission object (e.g., 0644) omits execute bits, making the directories non-traversable for other users—exactly the scenario centralized mode is intended to support. Please derive a directory permission that ORs in FsAction.EXECUTE for user/group/other before calling mkdirs.

-        val created = fs.mkdirs(parentDir, permission)
-        if (created) {
-          logInfo(s"Created parent directories: $parentDir with permissions $permission")
+        val dirPermission = new FsPermission(
+          permission.getUserAction.or(FsAction.EXECUTE),
+          permission.getGroupAction.or(FsAction.EXECUTE),
+          permission.getOtherAction.or(FsAction.EXECUTE)
+        )
+        val created = fs.mkdirs(parentDir, dirPermission)
+        if (created) {
+          logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")

156-164: Undefined identifier fsSchemeRaw (compile blocker).

fsSchemeRaw is not defined; the code meant to use the scheme extracted from fs.getUri. Rename the binding accordingly to restore compilation.

-    val fsScheme = fs.getUri.getScheme
-    val scheme = Option(fsSchemeRaw).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
+    val fsSchemeRaw = fs.getUri.getScheme
+    val scheme = Option(fsSchemeRaw).map(_.toLowerCase(java.util.Locale.ROOT)).orNull
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b064041 and 578c876.

📒 Files selected for processing (2)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/SparkFixture.scala (1)
  • withIsolatedSparkSession (54-67)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala (1)
  • withLineageTracking (23-25)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/LineageCaptor.scala (1)
  • lineageOf (46-56)
core/src/main/scala/za/co/absa/spline/harvester/json/HarvesterJsonSerDe.scala (1)
  • fromJson (68-73)

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

176-182: Directory permissions still lack execute bits (unaddressed from previous review).

As previously flagged, directories created with file permissions (e.g., 0644) lack execute bits and are non-traversable, preventing the intended "multi-user friendly permissions" behavior. Execute permission is required for directory access.

Apply this diff to use directory-appropriate permissions:

       try {
-        // Create directories with multi-user friendly permissions to allow all service accounts to write
-        // This uses the same permission object that's already configured for files
-        val created = fs.mkdirs(parentDir, permission)
+        // Create directories with multi-user friendly permissions (execute bits for traversal)
+        // Add execute bits to the file permissions to make directories traversable
+        val dirPermission = new FsPermission(
+          permission.getUserAction.or(FsAction.EXECUTE),
+          permission.getGroupAction.or(FsAction.EXECUTE),
+          permission.getOtherAction.or(FsAction.EXECUTE)
+        )
+        val created = fs.mkdirs(parentDir, dirPermission)
         if (created) {
-          logInfo(s"Created parent directories: $parentDir with permissions $permission")
+          logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
         }

Additionally, import FsAction at the top of the file:

import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
🧹 Nitpick comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

233-246: Document pathStringToFsWithPath limitations for centralized mode.

The current implementation only resolves S3 paths and local HDFS, but centralized mode users may specify gs://, wasb://, abfs://, or non-local HDFS paths (hdfs://nameservice:8020/...). These will fall through to the local HDFS case and likely fail with "Wrong FS" errors.

Consider documenting these limitations in the class-level documentation or configuration guide, and optionally implement the generic resolution suggested in past reviews:

def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
  // Generic approach: let Hadoop resolve any scheme
  val path = new Path(pathString)
  val fs = path.getFileSystem(HadoopConfiguration)
  (fs, path)
}

This would natively support all Hadoop-compatible schemes without special-casing.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04eebe3 and 5f63f61.

📒 Files selected for processing (3)
  • core/src/main/resources/spline.default.yaml (1 hunks)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • core/src/main/resources/spline.default.yaml
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (5)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5)

33-34: LGTM!

The DateTime imports support the new timestamp-based filename generation for centralized mode.


46-62: LGTM!

Excellent documentation that clearly explains both operation modes, the centralized filename format, and filesystem handling nuances.


103-122: LGTM!

The path resolution logic correctly implements both DEFAULT and CENTRALIZED modes, properly handling trailing slashes and constructing appropriate paths.


198-199: LGTM!

Correctly ensures parent directories exist before writing the lineage file.


221-221: LGTM!

The new configuration key constant is correctly defined.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

233-246: Use Path.getFileSystem to handle all schemes in pathStringToFsWithPath
pathStringToFsWithPath currently only handles S3 explicitly and defaults everything else to HDFS, causing “Wrong FS” errors for gs://, wasb://, abfs://, adl://, etc. Replace the method body with:

val path = new Path(pathString)
val fs   = path.getFileSystem(HadoopConfiguration)
(fs, path)

This natively resolves any Hadoop-configured scheme.

♻️ Duplicate comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

179-182: Directory permissions still lack execute bits (from previous review).

Directories created with file permissions typically lack execute bits, making them non-traversable. This defeats the "multi-user friendly permissions" goal. The previous review comment on these lines correctly identified this issue, but it was not fully addressed.

Apply this diff to use directory-appropriate permissions:

       try {
-        // Create directories with multi-user friendly permissions to allow all service accounts to write
-        // This uses the same permission object that's already configured for files
-        val created = fs.mkdirs(parentDir, permission)
+        // Create directories with multi-user friendly permissions (add execute bits for traversal)
+        val dirPermission = new FsPermission(
+          permission.getUserAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+          permission.getGroupAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+          permission.getOtherAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE)
+        )
+        val created = fs.mkdirs(parentDir, dirPermission)
         if (created) {
-          logInfo(s"Created parent directories: $parentDir with permissions $permission")
+          logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
         }

Based on previous review comment.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 04eebe3 and 5f63f61.

📒 Files selected for processing (3)
  • core/src/main/resources/spline.default.yaml (1 hunks)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • core/src/main/resources/spline.default.yaml
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (6)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (6)

46-62: LGTM! Clear and comprehensive documentation.

The documentation clearly distinguishes between default and centralized modes, explains the filename format rationale, and includes important operational details about object storage behavior.


64-64: LGTM! Constructor signature properly extended.

The addition of customLineagePath: Option[String] cleanly extends the constructor to support centralized mode while maintaining backward compatibility.


68-73: LGTM! Proper normalization of customLineagePath.

The normalization logic correctly handles empty and whitespace-only strings by trimming and filtering, ensuring that Some("") and Some(" ") are converted to None. This aligns with the intended behavior documented in the class.


103-122: LGTM! Path resolution logic is clear and correct.

The method properly implements the two-mode behavior, with clean path normalization using stripSuffix("/") to handle trailing slashes consistently. The centralized mode correctly generates unique filenames while the default mode preserves existing behavior.
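As a tiny illustration of that trailing-slash handling (plain Scala, values made up):

val base = "hdfs://cluster/lineage/".stripSuffix("/")            // "hdfs://cluster/lineage"
val resolved = s"$base/2025-10-12_14-30-45-123_my_app_app-0001"
// With or without the trailing slash in the configured path, the resolved file path is the same.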


198-200: LGTM! Proper integration of directory creation.

The call to ensureParentDirectoriesExist is correctly placed before file creation, ensuring parent directories are available when needed.


221-221: LGTM! Configuration key follows established patterns.

The CustomLineagePathKey constant follows the same pattern as existing configuration keys in the companion object.

Comment on lines 137 to 144
private def generateUniqueFilename(): String = {
  val sparkContext = SparkContext.getOrCreate()
  val appName = sparkContext.appName
  val appId = sparkContext.applicationId
  val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
  val timestamp = dateFormatter.format(Instant.now())
  s"${timestamp}_${appName}_${appId}"
}

⚠️ Potential issue | 🟠 Major

Sanitize appName and appId to prevent filesystem path issues.

The Spark application name can contain spaces, slashes, or special characters that may cause filesystem path issues. While the filename format is well-designed for sorting, unsanitized names can break path construction.

Apply this diff to sanitize the application metadata:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
-    val appName = sparkContext.appName
-    val appId = sparkContext.applicationId
+    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${appName}_${appId}"
   }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
private def generateUniqueFilename(): String = {
  val sparkContext = SparkContext.getOrCreate()
  val appName = sparkContext.appName
  val appId = sparkContext.applicationId
  val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
  val timestamp = dateFormatter.format(Instant.now())
  s"${timestamp}_${appName}_${appId}"
}
private def generateUniqueFilename(): String = {
  val sparkContext = SparkContext.getOrCreate()
  val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
  val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
  val dateFormatter = DateTimeFormatter
    .ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
    .withZone(ZoneId.of("UTC"))
  val timestamp = dateFormatter.format(Instant.now())
  s"${timestamp}_${appName}_${appId}"
}
🤖 Prompt for AI Agents
In
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala
around lines 137 to 144, the generated filename uses raw sparkContext.appName
and sparkContext.applicationId which may contain spaces, slashes or special
characters that break filesystem paths; sanitize both values before composing
the filename by replacing or removing unsafe characters (e.g., replace
non-alphanumeric, dash, underscore characters with underscore), trim length if
necessary, and then build the filename using the sanitized appName and appId so
the resulting string is safe for use in HDFS paths.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (3)

239-252: Centralised paths still unsupported for most schemes

pathStringToFsWithPath still special-cases only the legacy SimpleS3 helper and otherwise falls back to the cluster-default FS, so values like hdfs://nameservice/..., s3a://..., abfss://..., etc., resolve to the wrong backend in centralized mode. Use new Path(pathString) plus path.getFileSystem(HadoopConfiguration) to obtain the correct FS generically.


176-182: Directory permissions must include execute bits

Creating parent dirs with the file permission (e.g. 0644) omits the execute bit, making the directory non-traversable and preventing lineage writes immediately afterwards. Derive a directory-specific permission (permission.withExe(...), or manual FsPermission with or(FsAction.EXECUTE)) before calling mkdirs.


139-144: Sanitize app metadata before embedding in filenames

appName/applicationId can carry spaces or path separators, producing invalid/ambiguous filenames (e.g. "My Job" or "app-2025/10"). Normalize them (e.g. replace [^A-Za-z0-9._-] with _) before concatenation.

🧹 Nitpick comments (4)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)

40-44: Configuration constants are well-structured.

The constants clearly define configuration keys and make the tests more maintainable. While the names are verbose (e.g., lineageDispatcherConfigCustomLineagePathKeyName), they are descriptive and self-documenting.

If you prefer more concise names, consider abbreviations like:

-  val lineageDispatcherConfigKeyName = "spark.spline.lineageDispatcher"
-  val lineageDispatcherConfigValueName = "hdfs"
-  val lineageDispatcherConfigClassNameKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.className"
-  val lineageDispatcherConfigCustomLineagePathKeyName = s"$lineageDispatcherConfigKeyName.$lineageDispatcherConfigValueName.customLineagePath"
+  val dispatcherKey = "spark.spline.lineageDispatcher"
+  val dispatcherName = "hdfs"
+  val classNameKey = s"$dispatcherKey.$dispatcherName.className"
+  val customPathKey = s"$dispatcherKey.$dispatcherName.customLineagePath"

101-105: Incomplete filename pattern validation.

The comment describes the expected format as {timestamp}_{appName}_{appId}, but the regex on line 105 only validates the timestamp prefix. Consider enhancing the validation to verify the complete filename structure.

Apply this diff to validate the full pattern:

-          // Verify filename format: {timestamp}_{appName}_{appId}
           val filename = lineageFile.getName
-          // Should match pattern: yyyy-MM-dd_HH-mm-ss-SSS_{appName}_app-...
-          // AppName and AppId are part of the filename
-          filename should startWith regex """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}"""
+          // Verify filename format: {timestamp}_{appName}_{appId}
+          // Pattern: yyyy-MM-dd_HH-mm-ss-SSS_{appName}_app-{appId}
+          filename should fullyMatch regex """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_app-.+"""

116-168: Test coverage for edge cases is excellent.

The tests correctly validate that empty string and whitespace-only values for customLineagePath fall back to DEFAULT mode behavior. However, there's significant code duplication between these tests and the first DEFAULT mode test (lines 46-70).

Consider extracting common logic into a helper method to reduce duplication:

private def testDefaultModeLineageWriting(
  additionalConfig: SparkSession.Builder => SparkSession.Builder
)(implicit spark: SparkSession): Future[Assertion] = {
  import spark.implicits._
  val dummyDF = Seq((1, 2)).toDF
  val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()
  
  withLineageTracking { captor =>
    for {
      (_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
    } yield {
      val lineageFile = new File(destPath.asString, "_LINEAGE")
      lineageFile.exists should be(true)
      lineageFile.length should be > 0L
      
      val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
      lineageJson should contain key "executionPlan"
      lineageJson should contain key "executionEvent"
      lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
    }
  }
}

// Then use it in tests:
it should "save lineage file to a filesystem in DEFAULT mode" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
  withIsolatedSparkSession(_
    .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
    .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
  ) { implicit spark =>
    testDefaultModeLineageWriting(identity)
  }
}

72-114: Consider adding a test for multiple writes to verify centralized collection.

While the current tests validate the centralized mode functionality, consider adding a test that performs multiple writes to different destinations and verifies that all lineage files are collected in the single centralized location. This would better validate the "gathering lineage across many service accounts and disparate HDFS paths" use case mentioned in the PR description.

Example test structure:

it should "collect lineage from multiple destinations in centralized location" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
  val centralizedPath = TempDirectory("spline_centralized_", "", pathOnly = true).deleteOnExit()
  
  withIsolatedSparkSession(_
    .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
    .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
    .config(lineageDispatcherConfigCustomLineagePathKeyName, centralizedPath.asString)
  ) { implicit spark =>
    withLineageTracking { captor =>
      import spark.implicits._
      val destPath1 = TempDirectory("spline_1_", destFilePathExtension, pathOnly = true).deleteOnExit()
      val destPath2 = TempDirectory("spline_2_", destFilePathExtension, pathOnly = true).deleteOnExit()
      
      for {
        _ <- captor.lineageOf(Seq((1, 2)).toDF.write.save(destPath1.asString))
        _ <- captor.lineageOf(Seq((3, 4)).toDF.write.save(destPath2.asString))
      } yield {
        val centralizedDir = new File(centralizedPath.asString)
        val lineageFiles = Option(centralizedDir.listFiles())
        lineageFiles.get.length should be(2) // Two lineage files from two writes
      }
    }
  }
}
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5f63f61 and 8bb78a1.

📒 Files selected for processing (2)
  • core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5 hunks)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/SparkFixture.scala (1)
  • withIsolatedSparkSession (54-67)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala (1)
  • withLineageTracking (23-25)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/LineageCaptor.scala (1)
  • lineageOf (46-56)
core/src/main/scala/za/co/absa/spline/harvester/json/HarvesterJsonSerDe.scala (1)
  • fromJson (68-73)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (5)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HttpOpenLineageDispatcher.scala (2)
  • this (37-41)
  • this (43-43)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/KafkaLineageDispatcher.scala (1)
  • this (42-50)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HttpLineageDispatcher.scala (2)
  • this (45-52)
  • this (52-52)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/ConsoleLineageDispatcher.scala (1)
  • this (30-32)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/LoggingLineageDispatcher.scala (1)
  • this (29-31)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Codacy Static Code Analysis
🔇 Additional comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

46-70: DEFAULT mode test is well-structured and comprehensive.

The test correctly validates:

  • Lineage file is written to the destination directory
  • File contains valid JSON with executionPlan and executionEvent
  • Plan and event IDs match as expected

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Nitpick comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

72-124: Reduce code duplication across DEFAULT mode tests.

The three DEFAULT mode tests (lines 46-70, 72-97, 99-124) share nearly identical test logic, differing only in the customLineagePath configuration value. This duplication violates the DRY principle and increases maintenance burden.

Consider refactoring to use a parameterized test approach:

+  Seq(
+    ("without customLineagePath config", None),
+    ("with empty string customLineagePath", Some("")),
+    ("with whitespace-only customLineagePath", Some("   "))
+  ).foreach { case (scenarioDesc, customPathValue) =>
+    it should s"use DEFAULT mode $scenarioDesc" taggedAs ignoreIf(ver"$SPARK_VERSION" < ver"2.3") in {
+      val builder = (b: SparkSession.Builder) => {
+        val configured = b
+          .config(lineageDispatcherConfigKeyName, lineageDispatcherConfigValueName)
+          .config(lineageDispatcherConfigClassNameKeyName, classOf[HDFSLineageDispatcher].getName)
+        customPathValue.fold(configured)(path => configured.config(lineageDispatcherConfigCustomLineagePathKeyName, path))
+      }
+      
+      withIsolatedSparkSession(builder) { implicit spark =>
+        withLineageTracking { captor =>
+          import spark.implicits._
+          val dummyDF = Seq((1, 2)).toDF
+          val destPath = TempDirectory("spline_", destFilePathExtension, pathOnly = true).deleteOnExit()
+
+          for {
+            (_, _) <- captor.lineageOf(dummyDF.write.save(destPath.asString))
+          } yield {
+            val lineageFile = new File(destPath.asString, "_LINEAGE")
+            lineageFile.exists should be(true)
+            lineageFile.length should be > 0L
+
+            val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
+            lineageJson should contain key "executionPlan"
+            lineageJson should contain key "executionEvent"
+            lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
+          }
+        }
+      }
+    }
+  }

Alternatively, extract a shared helper method for the common test logic.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 32db7ac and 30f2f40.

📒 Files selected for processing (1)
  • integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/SparkFixture.scala (1)
  • withIsolatedSparkSession (54-67)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/SplineFixture.scala (1)
  • withLineageTracking (23-25)
integration-tests/src/main/scala/za/co/absa/spline/test/fixture/spline/LineageCaptor.scala (1)
  • lineageOf (46-56)
core/src/main/scala/za/co/absa/spline/harvester/json/HarvesterJsonSerDe.scala (1)
  • fromJson (68-73)
🔇 Additional comments (3)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (3)

40-45: LGTM! Well-structured configuration constants.

The configuration constants are appropriately defined and follow consistent naming conventions. They're used effectively throughout the test suite.


46-70: Test logic is correct and comprehensive.

The test properly validates DEFAULT mode behavior, including filesystem write, JSON structure, and plan-event correlation. The assertions are appropriate and the async test pattern is used correctly.


126-175: Excellent comprehensive test for centralized lineage path.

This test thoroughly validates the centralized path feature:

  • Multiple writes verify unique filename generation
  • Assertions confirm lineage files are written to the centralized location
  • Negative assertions verify lineage files are NOT written to destination directories
  • JSON content validation ensures correctness

The test structure and assertions are well-designed and comprehensive.

@rkrumins rkrumins changed the title Extending HDFS Lineage Dispatcher to support writing lineage files to a centralised location WIP: Extending HDFS Lineage Dispatcher to support writing lineage files to a centralised location Oct 13, 2025
…acters is covered and updated integration tests for HDFSLineageDispatcherSpec
@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (3)

243-256: Fix FS resolution for all schemes (current method breaks non‑S3 URIs)

The current pathStringToFsWithPath only handles S3 and otherwise uses the default FS, causing Wrong FS errors for hdfs://, abfs(s)://, wasb(s)://, gs://, etc. Use Path.getFileSystem(conf) to resolve by scheme.

Apply:

-  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
-    pathString.toSimpleS3Location match {
-      case Some(s3Location) =>
-        val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
-        val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>
-
-        val fs = FileSystem.get(s3Uri, HadoopConfiguration)
-        (fs, s3Path)
-
-      case None => // local hdfs location
-        val fs = FileSystem.get(HadoopConfiguration)
-        (fs, new Path(pathString))
-    }
-  }
+  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
+    val path = new Path(pathString)
+    val fs = path.getFileSystem(HadoopConfiguration)
+    (fs, path)
+  }

Additionally remove now-unused imports:

- import za.co.absa.spline.commons.s3.SimpleS3Location.SimpleS3LocationExt
- import java.net.URI

176-183: Use directory-appropriate perms for mkdirs (add execute bits)

Creating directories with file permissions may omit execute bits, making dirs non-traversable for group/others.

Apply:

-          // Create directories with multi-user friendly permissions to allow all service accounts to write
-          // This uses the same permission object that's already configured for files
-          val created = fs.mkdirs(parentDir, permission)
-          if (created) {
-            logInfo(s"Created parent directories: $parentDir with permissions $permission")
-          }
+          // Ensure execute bits so directories are traversable
+          val dirPermission = new FsPermission(
+            permission.getUserAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getGroupAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getOtherAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE)
+          )
+          val created = fs.mkdirs(parentDir, dirPermission)
+          if (created) {
+            logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
+          }

138-145: Also sanitize appId when generating filenames

appId may contain characters unsafe for filesystems. You already sanitize appName; do the same for appId.

-    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
-    val appId = sparkContext.applicationId
+    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
🧹 Nitpick comments (2)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (2)

215-217: Remove no-op umask code

These lines compute a value and don't use it; safe to delete.

-    val umask = FsPermission.getUMask(fs.getConf)
-    FsPermission.getFileDefault.applyUMask(umask)

218-221: Prefer StandardCharsets.UTF_8

Avoid string-based charset names.

-    using(outputStream) {
-      _.write(content.getBytes("UTF-8"))
-    }
+    using(outputStream) {
+      _.write(content.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+    }
🔇 Additional comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

69-74: LGTM: normalized centralized path on read

Trimming and filtering empty values for customLineagePath is correct and matches config expectations.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (3)

243-256: Make FS resolution scheme-aware (supports hdfs://, s3a, abfs, wasb, gs, etc.)

Current resolver only special-cases s3 and otherwise uses default FS, breaking for hdfs://nameservice, s3a://, abfs://, wasb://, gs://, etc.

Apply:

-  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
-    pathString.toSimpleS3Location match {
-      case Some(s3Location) =>
-        val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
-        val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>
-
-        val fs = FileSystem.get(s3Uri, HadoopConfiguration)
-        (fs, s3Path)
-
-      case None => // local hdfs location
-        val fs = FileSystem.get(HadoopConfiguration)
-        (fs, new Path(pathString))
-    }
-  }
+  def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
+    val path = new Path(pathString)
+    val fs = path.getFileSystem(HadoopConfiguration)
+    (fs, path)
+  }

Also update the Scaladoc above accordingly (remove “non-local HDFS not supported”).


138-145: Sanitize appId too (filesystem-safe filenames)

appId may contain characters unsafe for some backends. Sanitize like appName.

Apply:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
-    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9_-]", "_")
-    val appId = sparkContext.applicationId
+    val appName = sparkContext.appName.replaceAll("[^a-zA-Z0-9._-]", "_")
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9._-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${appName}_${appId}"
   }

176-183: Use directory-appropriate permissions (add execute bits)

Creating dirs with file perms (e.g., 0644) makes them non-traversable, breaking multi-user access.

Apply:

-          // Create directories with multi-user friendly permissions to allow all service accounts to write
-          // This uses the same permission object that's already configured for files
-          val created = fs.mkdirs(parentDir, permission)
+          // Create directories with execute bits for traversal
+          val dirPermission = new FsPermission(
+            permission.getUserAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getGroupAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getOtherAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE)
+          )
+          val created = fs.mkdirs(parentDir, dirPermission)
           if (created) {
-            logInfo(s"Created parent directories: $parentDir with permissions $permission")
+            logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
           }
🧹 Nitpick comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (2)

104-123: Prefer Path-based join for robustness

When composing paths, use Hadoop Path to avoid double slashes and handle URIs correctly.

Apply:

-        val cleanCustomPath = customPath.stripSuffix("/")
-        val uniqueFilename = generateUniqueFilename()
-        s"$cleanCustomPath/$uniqueFilename"
+        val base = new Path(customPath)
+        new Path(base, generateUniqueFilename()).toString

201-222: UMask code is a no-op here

applyUMask result is unused; fs.create already applies server-side umask. Remove or use the result to compute an effective permission.

Apply:

-    val umask = FsPermission.getUMask(fs.getConf)
-    FsPermission.getFileDefault.applyUMask(umask)
+    // fs.create applies server-side umask; no client-side action needed here
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

149-150: ScalaTest matcher composition

Use parentheses for ‘and’ composition.

Apply:

-            name should include (appId) and include (appNameCleaned)
+            name should (include (appId) and include (appNameCleaned))
🔇 Additional comments (3)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (3)

69-74: Good: config normalization for customLineagePath

Trimming and filtering empty values avoids accidental centralized mode. LGTM.


91-99: Good: fixed resolveLineagePath() invocation

Zero-arg call resolves the prior compile issue. LGTM.


160-168: Good: case-insensitive scheme detection

Lowercasing the scheme before checks hardens object-storage detection. LGTM.
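For context, a minimal sketch of what case-insensitive object-storage detection can look like; the helper name and scheme list below are assumptions for illustration, not the PR's exact code:

  import org.apache.hadoop.fs.Path

  object SchemeDetectionSketch {
    // Illustrative scheme list; adjust to whatever backends need to be skipped.
    private val objectStorageSchemes =
      Set("s3", "s3a", "s3n", "gs", "wasb", "wasbs", "abfs", "abfss")

    def isObjectStorage(path: Path): Boolean =
      Option(path.toUri.getScheme)             // scheme is null for relative/local paths
        .map(_.toLowerCase)                    // normalize case before comparing
        .exists(objectStorageSchemes.contains)
  }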

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

141-141: Filename regex never matches (double-escaped backslashes).

The pattern literal is written as """\\d{4}-...""". Triple-quoted strings do not process escape sequences, so both backslashes are kept and the regex engine receives \\d, which it interprets as a literal backslash followed by the letter d rather than the digit class. As written, filenamePattern.matches(name) always returns false, so the assertion never verifies the timestamp format.

Replace with a single backslash version:

-          val filenamePattern = """\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}-\\d{3}_.+_.+""".r
+          val filenamePattern = """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_.+""".r
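To make the difference concrete, here is a tiny self-contained sketch (the sample filename is made up) showing why the double-escaped literal never matches while the single-backslash version does:

  object RegexEscapeDemo extends App {
    // Hypothetical filename shaped like {timestamp}_{planId}_{appId}
    val name = "2025-10-13_12-34-56-789_plan-123_app-456"

    // Double-escaped: a triple-quoted string keeps both backslashes, so the regex
    // sees \\d (a literal backslash then 'd') and never matches a digit.
    val doubleEscaped = """\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}-\\d{3}_.+_.+""".r

    // Single backslash: \d reaches the regex engine as the digit class, as intended.
    val singleEscaped = """\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}-\d{3}_.+_.+""".r

    println(doubleEscaped.pattern.matcher(name).matches()) // false
    println(singleEscaped.pattern.matcher(name).matches()) // true
  }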
🧹 Nitpick comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

138-138: Remove debug println statements.

Debug print statements should be removed before merging to production code.

Apply this diff to remove the debug statements:

           val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File])
-          println(lineageFiles.map(_.getName).mkString(", "))

           // Verify naming convention aligns with centralized lineage pattern (timestamp_appName_appId)
           val filenamePattern = """\\d{4}-\\d{2}-\\d{2}_\\d{2}-\\d{2}-\\d{2}-\\d{3}_.+_.+""".r
           lineageFiles.foreach { file =>
             val name = file.getName
-            println(name)
             withClue(s"Lineage filename '$name' should follow the timestamp_appName_appId pattern") {

Also applies to: 144-144

🔇 Additional comments (4)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (4)

40-44: LGTM!

The configuration constants are well-named and follow a consistent pattern. The new customLineagePath configuration key aligns with the centralized lineage feature.


46-70: LGTM!

The test has been appropriately renamed to clarify it tests DEFAULT mode behavior, and correctly migrated to use the new configuration constants.


72-106: LGTM!

The parameterized test structure effectively covers DEFAULT mode variations. The use of fold to conditionally apply configuration is idiomatic Scala.
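As a rough illustration of that fold-based pattern (the config key and builder helper below are assumptions, not the test's exact code):

  import org.apache.spark.sql.SparkSession

  object OptionalConfigSketch {
    // Hypothetical helper: apply an optional setting only when a value is present.
    def withOptionalCustomLineagePath(builder: SparkSession.Builder,
                                      customPath: Option[String]): SparkSession.Builder =
      customPath.fold(builder) { p =>
        builder.config("spark.spline.lineageDispatcher.hdfs.customLineagePath", p)
      }
  }

When customPath is None the builder is returned untouched, so the dispatcher falls back to DEFAULT mode.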


158-162: LGTM!

The assertions correctly verify that no _LINEAGE files exist in the per-destination directories when centralized mode is enabled, confirming the centralized behavior works as expected.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

138-139: Remove debug println statements.

These debug statements should be removed before merging to production.

Apply this diff to remove the debug output:

           val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File])
-          println(lineageFiles.map(_.getName).mkString(", "))
-
🔇 Additional comments (6)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (6)

29-29: Import added correctly.

The SparkSession import addresses the previous compilation issue. This fix is correct.


40-44: Good practice: centralized configuration keys.

Extracting configuration keys into constants improves maintainability and ensures consistency across tests.


46-70: LGTM!

The test correctly validates DEFAULT mode behavior using the new configuration constants. The assertion logic properly verifies lineage file creation and content.


72-106: LGTM!

The parameterized test structure effectively covers edge cases for customLineagePath (missing, empty, whitespace-only). The test logic correctly validates DEFAULT mode fallback behavior.


135-135: Previous issue resolved.

The replaceAll call now correctly uses a string pattern instead of a regex object. This fix is correct.


140-149: Previous issue resolved.

The regex pattern now correctly uses single backslashes in triple-quoted strings, which will properly match digit characters in filenames. This fix is correct.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (2)

137-149: Add count assertion to verify both writes generated lineage files.

The test writes two datasets but doesn't assert that exactly 2 lineage files exist in the centralized directory. This was flagged in previous reviews.

Add this assertion after line 139:

           val lineageFiles = Option(centralizedDir.listFiles()).getOrElse(Array.empty[File])
           println("lineageFiles: " + lineageFiles.map(_.getName).mkString(", "))
+          lineageFiles should have length 2

152-156: Add planId consistency check to match DEFAULT mode tests.

The DEFAULT mode tests verify that executionPlan.id equals executionEvent.planId. The centralized test should include the same verification for consistency.

Apply this diff:

           lineageFiles.foreach { lineageFile =>
             val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
             lineageJson should contain key "executionPlan"
             lineageJson should contain key "executionEvent"
+            lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
           }
🧹 Nitpick comments (1)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

138-138: Consider removing debug println statements.

The println statements at lines 138 and 144 appear to be debugging artifacts. Consider removing them or replacing with proper test logging if the output is valuable for troubleshooting test failures.

Also applies to: 144-144

🔇 Additional comments (2)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (2)

40-44: LGTM: Configuration constants improve maintainability.

Extracting configuration keys and values into constants is a good refactoring that reduces duplication and makes tests easier to maintain.


72-106: LGTM: Parameterized tests provide thorough DEFAULT mode coverage.

The parameterized test structure effectively validates that empty and whitespace-only customLineagePath values fall back to DEFAULT mode behavior. The builder pattern is clean and the assertions correctly verify planId consistency.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (2)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

187-193: Add execute bits to directory permissions for traversal.

Directories created with file permissions (e.g., 0644) lack execute bits, making them non-traversable. This defeats the "multi-user friendly permissions" goal stated in the comment. Directory traversal requires execute permission.

Apply this diff to use directory-appropriate permissions:

         Try {
-          // Create directories with multi-user friendly permissions to allow all service accounts to write
-          // This uses the same permission object that's already configured for files
-          val created = fs.mkdirs(parentDir, permission)
+          // Create directories with multi-user friendly permissions (execute bits for traversal)
+          // Add execute bits to the file permissions to make directories traversable
+          val dirPermission = new FsPermission(
+            permission.getUserAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getGroupAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getOtherAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE)
+          )
+          val created = fs.mkdirs(parentDir, dirPermission)
           if (created) {
-            logInfo(s"Created parent directories: $parentDir with permissions $permission")
+            logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
           }
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

157-161: Add planId verification for consistency.

The JSON structure validation checks for required keys but doesn't verify that executionPlan.id matches executionEvent.planId, unlike the DEFAULT mode tests (lines 66, 101). This verification ensures data integrity between the plan and event.

Add this assertion inside the foreach block:

           lineageFilesOnly.foreach { lineageFile =>
             val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
             lineageJson should contain key "executionPlan"
             lineageJson should contain key "executionEvent"
+            lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
           }
🧹 Nitpick comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

147-156: Defensive sanitization for appId.

While Spark application IDs are typically filesystem-safe (e.g., app-20251012143045-0001), there's no strict guarantee across all deployment modes and configurations. Consider sanitizing appId as a defensive measure.

Apply this diff:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
     val planId = this._lastSeenPlan.id.getOrElse(
       throw new IllegalStateException("Execution plan ID is missing")
     ).toString
-    val appId = sparkContext.applicationId
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${planId}_${appId}"
   }
🔇 Additional comments (13)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (7)

33-36: LGTM!

The added imports support the new timestamp generation and error handling functionality.


65-74: LGTM!

The constructor properly normalizes customLineagePath by trimming and filtering empty values, ensuring that blank configurations are treated as None (default mode).
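A minimal sketch of that normalization pattern, assuming the Commons Configuration API that the dispatcher constructors receive (the helper name is hypothetical):

  import org.apache.commons.configuration.Configuration

  object ConfigReadSketch {
    // Read an optional string and treat blank values as "not configured".
    def readNonBlank(conf: Configuration, key: String): Option[String] =
      Option(conf.getString(key, null))
        .map(_.trim)          // drop surrounding whitespace
        .filter(_.nonEmpty)   // "" or whitespace-only collapses to None
  }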


111-123: LGTM!

The path resolution logic correctly handles both modes: centralized storage with unique filenames and default alongside-data storage.
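For orientation, a simplified, self-contained sketch of the described two-mode resolution; the parameter names, the "_LINEAGE" constant, and the filename format follow this PR's description, but the code is illustrative rather than the actual implementation:

  import java.time.format.DateTimeFormatter
  import java.time.{Instant, ZoneId}
  import org.apache.hadoop.fs.Path

  object LineagePathSketch {
    // Stand-in for the dispatcher's unique-filename generation: {timestamp}_{planId}_{appId}
    private def uniqueFilename(planId: String, appId: String): String = {
      val ts = DateTimeFormatter
        .ofPattern("yyyy-MM-dd_HH-mm-ss-SSS")
        .withZone(ZoneId.of("UTC"))
        .format(Instant.now())
      s"${ts}_${planId}_${appId}"
    }

    // Centralized mode: timestamped unique file under the configured directory.
    // Default mode: fixed "_LINEAGE" file next to the written data.
    def resolveLineagePath(targetDsPath: String,
                           customLineagePath: Option[String],
                           planId: String,
                           appId: String): String =
      customLineagePath match {
        case Some(base) => new Path(new Path(base), uniqueFilename(planId, appId)).toString
        case None       => new Path(new Path(targetDsPath), "_LINEAGE").toString
      }
  }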


194-207: LGTM!

The error handling comprehensively covers all failure scenarios including the catch-all case, preventing MatchError exceptions.


216-220: LGTM!

The conditional directory creation correctly applies only to centralized mode, where parent directories may not exist.


242-242: LGTM!

The configuration key follows the existing naming pattern.


254-267: Ensure scheme-aware FileSystem resolution for all Hadoop-compatible schemes

  • pathStringToFsWithPath currently only special-cases S3 and falls back to the default FS for other URIs, which may break on gs://, wasb://, abfs://, adl://.
  • No tests or docs reference these schemes—please verify support or add coverage.
  • Consider replacing the special-case logic with:
  val path = new Path(pathString)
  val fs   = path.getFileSystem(HadoopConfiguration)
  (fs, path)
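For reference, a self-contained sketch of how the scheme-aware variant behaves at the call site; which schemes actually resolve depends on the connectors present on the classpath:

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  object FsResolutionSketch extends App {
    val conf = new Configuration()

    // The FileSystem is picked from the path's URI scheme; a path without a scheme
    // falls back to the default filesystem (fs.defaultFS).
    def resolve(pathString: String): (FileSystem, Path) = {
      val path = new Path(pathString)
      (path.getFileSystem(conf), path)
    }

    val (fs, p) = resolve("/tmp/lineage")
    println(s"${fs.getUri} -> $p")   // file:/// with default config; hdfs://, s3a://, gs:// need their connectors
  }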
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (6)

29-44: LGTM!

The SparkSession import and configuration constants are well-structured and follow proper naming conventions.


46-70: LGTM!

The test correctly verifies DEFAULT mode behavior, including proper JSON structure and planId consistency between executionPlan and executionEvent.


72-106: LGTM!

The parameterized tests provide excellent coverage of DEFAULT mode fallback scenarios (unset, empty, whitespace-only customLineagePath). The builder pattern is used correctly to conditionally apply configuration.


108-135: LGTM!

The test setup correctly initializes a centralized directory, writes multiple DataFrames, and prepares for filename validation. Deleting the directory first (line 110) ensures a clean test state.


136-154: LGTM!

The filename validation is thorough and correct:

  • Safely handles null from listFiles()
  • Verifies count matches expected writes
  • Validates timestamp_planId_appId pattern with proper regex
  • Uses withClue for clear error messages

163-169: LGTM!

The verification correctly confirms that lineage files are written only to the centralized location and not alongside the data destinations, validating the expected centralized mode behavior.

@rkrumins changed the title from "WIP: Extending HDFS Lineage Dispatcher to support writing lineage files to a centralised location" to "Extending HDFS Lineage Dispatcher to support writing lineage files to a centralised location" on Oct 13, 2025
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

254-267: Implement generic filesystem resolution for non-S3 object storage.

The current implementation only resolves S3 paths correctly. For GCS (gs://), Azure (wasb://, abfs://), and other schemes, the method falls through to the None case (line 263) and attempts to use them as local HDFS paths via FileSystem.get(HadoopConfiguration) (line 264), which returns the default filesystem instead of the scheme-specific one. This causes write failures when customLineagePath targets these backends, despite the object-storage detection logic (lines 173-179) correctly identifying them.

Replace the method with a simpler, scheme-aware implementation that leverages Hadoop's native filesystem resolution:

   def pathStringToFsWithPath(pathString: String): (FileSystem, Path) = {
-    pathString.toSimpleS3Location match {
-      case Some(s3Location) =>
-        val s3Uri = new URI(s3Location.asSimpleS3LocationString) // s3://<bucket>
-        val s3Path = new Path(s"/${s3Location.path}") // /<text-file-object-path>
-
-        val fs = FileSystem.get(s3Uri, HadoopConfiguration)
-        (fs, s3Path)
-
-      case None => // local hdfs location
-        val fs = FileSystem.get(HadoopConfiguration)
-        (fs, new Path(pathString))
-    }
+    // Generic implementation that automatically handles all filesystem schemes
+    // configured in Hadoop (hdfs://, s3://, s3a://, gs://, wasb://, abfs://, etc.)
+    val path = new Path(pathString)
+    val fs = path.getFileSystem(HadoopConfiguration)
+    (fs, path)
   }

This change removes the need for scheme-specific logic and correctly resolves all filesystem types configured in the Hadoop environment.

♻️ Duplicate comments (1)
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

187-193: Add execute permissions for directory traversal.

Directories created with file permissions (typically 0644) lack execute bits and are non-traversable, breaking the multi-user centralized storage feature. Users need execute permission (x) to traverse directories, even if they only have write permission on files within them.

Apply this diff to add execute bits to directory permissions:

         Try {
-          // Create directories with multi-user friendly permissions to allow all service accounts to write
-          // This uses the same permission object that's already configured for files
-          val created = fs.mkdirs(parentDir, permission)
+          // Create directories with multi-user friendly permissions (execute bits required for traversal)
+          // Add execute bits to the file permissions to make directories traversable
+          val dirPermission = new FsPermission(
+            permission.getUserAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getGroupAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE),
+            permission.getOtherAction.or(org.apache.hadoop.fs.permission.FsAction.EXECUTE)
+          )
+          val created = fs.mkdirs(parentDir, dirPermission)
           if (created) {
-            logInfo(s"Created parent directories: $parentDir with permissions $permission")
+            logInfo(s"Created parent directories: $parentDir with permissions $dirPermission")
           }
🧹 Nitpick comments (2)
integration-tests/src/test/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcherSpec.scala (1)

157-161: Add planId verification for consistency.

The centralized mode test verifies the JSON structure but doesn't confirm that executionPlan.id matches executionEvent.planId, unlike the DEFAULT mode tests (lines 66, 101). Adding this assertion would ensure executionPlan and executionEvent are properly paired.

Apply this diff to add the verification:

           // Verify each file has the correct format and content
           lineageFilesOnly.foreach { lineageFile =>
             val lineageJson = readFileToString(lineageFile, "UTF-8").fromJson[Map[String, Map[String, _]]]
             lineageJson should contain key "executionPlan"
             lineageJson should contain key "executionEvent"
+            lineageJson("executionPlan")("id") should equal(lineageJson("executionEvent")("planId"))
           }
core/src/main/scala/za/co/absa/spline/harvester/dispatcher/HDFSLineageDispatcher.scala (1)

147-156: Consider sanitizing appId defensively.

While standard Spark application IDs (e.g., app-20251012-0001) are filesystem-safe, custom cluster managers might use different formats. Sanitizing appId would provide defense-in-depth against edge cases where special characters appear in application IDs.

Apply this diff to add sanitization:

   private def generateUniqueFilename(): String = {
     val sparkContext = SparkContext.getOrCreate()
     val planId = this._lastSeenPlan.id.getOrElse(
       throw new IllegalStateException("Execution plan ID is missing")
     ).toString
-    val appId = sparkContext.applicationId
+    val appId = sparkContext.applicationId.replaceAll("[^a-zA-Z0-9_-]", "_")
     val dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd_HH-mm-ss-SSS").withZone(ZoneId.of("UTC"))
     val timestamp = dateFormatter.format(Instant.now())
     s"${timestamp}_${planId}_${appId}"
   }


@rkrumins
Author

rkrumins commented Oct 23, 2025

Hi @wajda and @cerveada! Wondering if either of you could take a look at this PR when you have a moment? Any feedback would be much appreciated. Thanks!
