feat: batch job history#150

Open
shydefoo wants to merge 17 commits into main from add-batch-job-history

Conversation


@shydefoo shydefoo commented Jun 6, 2025

Summary

  • This PR adds support for recording each ingestion job as a batch job record in the database.
  • The SDK is modified to return a list of BatchJobRecords so users can inspect the batch ingestion jobs triggered for each feature table.
  • This capability can be extended to support historical retrieval jobs as well.

Implementation

DB Schema changes

  • A new table is added; the changes can be found in db/migration/V3__BatchJobRecord.sql.
  • The difference between a BatchJobRecord and a Job is that a Job reflects real-time data in the Kubernetes cluster, whereas a BatchJobRecord refers to data persisted in the DB.
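To make the Job vs. BatchJobRecord distinction concrete, here is a rough sketch of what a persisted record holds. Field names are taken from the sample SDK response later in this description (nested `batch_ingestion` fields are flattened for brevity); the actual DB columns may differ.

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class BatchJobRecord:
    # Field names mirror the sample SDK response in this PR description;
    # the real schema lives in db/migration/V3__BatchJobRecord.sql.
    id: str                        # value of the caraml.dev/record-id label
    job_id: str                    # SparkApplication name in the cluster
    type: str                      # e.g. "BATCH_INGESTION_JOB"
    status: str                    # e.g. "JOB_STATUS_PENDING"
    job_start_time: Optional[int]  # epoch seconds
    job_end_time: Optional[int]    # None while the job is still running
    project: str
    table_name: str
    spark_app_manifest: str

# A Job, by contrast, is read live from the Kubernetes API and disappears
# with the cluster object; this record persists in the DB.
record = BatchJobRecord(
    id="793df3fe-4c89-49ae-a9db-4b65ee1c8b85-1750832112447993441",
    job_id="caraml-0f5b6f4f4d69b7cb-1750832112447993441",
    type="BATCH_INGESTION_JOB",
    status="JOB_STATUS_PENDING",
    job_start_time=1750832115,
    job_end_time=None,
    project="sample",
    table_name="jaeger_driver_quality_hbase_2",
    spark_app_manifest="....",
)
```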

SparkApplication changes

  • A new label is added to each SparkApplication when it is created or updated: caraml.dev/record-id: <some-uuid>. This UUID is used as the BatchJobRecord ID in the DB.
  • The label is refreshed each time the SparkApplication is created or updated, ensuring that every triggered ingestion job gets a new UUID and therefore a new record in the DB.
  • This also applies to SparkApplications created from a ScheduledSparkApplication.
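The label-stamping step can be sketched as follows. This is a minimal illustration, not the PR's Java implementation; the function name is hypothetical, while the label key matches the RECORD_JOB_LABEL constant quoted below.

```python
import uuid

RECORD_LABEL = "caraml.dev/record-id"  # matches RECORD_JOB_LABEL in the Java code

def stamp_record_label(labels: dict) -> dict:
    """Return a copy of the SparkApplication labels with a fresh record id.

    Intended to run on every create/update, so each ingestion run maps to a
    new BatchJobRecord row. (Illustrative sketch, not the PR's code.)
    """
    stamped = dict(labels)
    stamped[RECORD_LABEL] = str(uuid.uuid4())
    return stamped

first = stamp_record_label({"caraml.dev/project": "sample"})
second = stamp_record_label({"caraml.dev/project": "sample"})
# Two triggers of the same job yield two distinct record ids.
```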

SparkApplication watcher

  • The watcher runs in a separate thread and opens a Watch to receive events from the K8s API server.
  • The watcher only receives events for SparkApplications that have the caraml.dev/record-id label.
  • Based on these events, the watcher updates the DB to reflect the SparkApplication state.
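The event-to-DB mapping can be sketched like this. The handler and in-memory store are illustrative stand-ins (the real implementation is Java talking to the K8s API), and the choice to keep records on DELETED events is an assumption consistent with the history-keeping goal.

```python
def handle_event(event_type: str, app_state: str, record_id: str, db: dict) -> None:
    """Upsert the latest SparkApplication state into the record store."""
    if event_type == "DELETED":
        # Assumption: job history should outlive the cluster object,
        # so the record is kept rather than removed.
        return
    db[record_id] = app_state  # ADDED and MODIFIED both update the record

db = {}
for event_type, state in [("ADDED", "SUBMITTED"),
                          ("MODIFIED", "RUNNING"),
                          ("MODIFIED", "COMPLETED"),
                          ("DELETED", "COMPLETED")]:
    handle_event(event_type, state, "rec-1", db)
```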

Python SDK

  • A new SDK method is added to list BatchJobRecords, with optional start and end parameters to filter records within a given time window.
  • Sample response:
from datetime import datetime, timedelta

start = datetime.now() - timedelta(days=1)
client = Client("localhost:6565")
response = client.list_batch_job_records("sample", "jaeger_driver_quality_hbase_2", start)
>>> response[0]
id: "793df3fe-4c89-49ae-a9db-4b65ee1c8b85-1750832112447993441"
job_id: "caraml-0f5b6f4f4d69b7cb-1750832112447993441"
type: BATCH_INGESTION_JOB
status: JOB_STATUS_PENDING
job_start_time {
  seconds: 1750832115
}
job_end_time {
  seconds: -1
}
batch_ingestion {
  table_name: "jaeger_driver_quality_hbase_2"
  project: "sample"
  start_time_param {
    seconds: 1750659325
  }
  end_time_param {
    seconds: 1750832125
  }
}
spark_app_manifest: "...."

Others

  • I'm no Java expert, so any feedback to improve code structure / best practices is welcome.

@shydefoo shydefoo changed the title Add batch job history feat: batch job history Jun 10, 2025
@shydefoo shydefoo self-assigned this Jun 24, 2025
@shydefoo shydefoo marked this pull request as ready for review June 25, 2025 02:07
static final String FEATURE_TABLE_LABEL = LABEL_PREFIX + "table";
static final String FEATURE_TABLE_HASH_LABEL = LABEL_PREFIX + "hash";
static final String PROJECT_LABEL = LABEL_PREFIX + "project";
static final String RECORD_JOB_LABEL = LABEL_PREFIX + "record-id";

This label stores the batch job record id, which is unique to each job.

.getMetadata()
.getLabels()
.containsKey("sparkoperator.k8s.io/scheduled-app-name")) {
// get sparkapplication name suffix

This step is performed because a SparkApplication created from a ScheduledSparkApplication carries the same set of labels that was passed to the ScheduledSparkApplication. Since the record-id is passed only once, we need another way to differentiate SparkApplications created from the same ScheduledSparkApplication in the DB.

This is where the suffix in the SparkApplication name is used: all SparkApplications created from a ScheduledSparkApplication have names of the form caraml-65d8c6f9340adee8-1750762810975636102, where 1750762810975636102 is unique.
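The disambiguation step can be sketched as below. The function name is hypothetical; the id shape is suggested by the sample response above, where the record id is the label's UUID followed by the SparkApplication name's trailing suffix.

```python
def record_id_for_scheduled_run(base_record_id: str, app_name: str) -> str:
    """Derive a per-run record id for a SparkApplication spawned by a
    ScheduledSparkApplication.

    Names like "caraml-65d8c6f9340adee8-1750762810975636102" end in a unique
    suffix; appending it to the label's UUID disambiguates runs that share
    the same ScheduledSparkApplication labels. (Illustrative sketch.)
    """
    suffix = app_name.rsplit("-", 1)[-1]
    return f"{base_record_id}-{suffix}"

rid = record_id_for_scheduled_run(
    "793df3fe-4c89-49ae-a9db-4b65ee1c8b85",
    "caraml-65d8c6f9340adee8-1750762810975636102",
)
```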

public void watchSparkApplications(String namespace, String labelSelector) {
  Watchable<SparkApplication> watch;
  try {
    watch = sparkOperatorApi.watch(namespace, labelSelector);

The label selector here ensures that only SparkApplications carrying these specific labels are recorded:

  • caraml.dev/type=BATCH_INGESTION_JOB
  • caraml.dev/record-id
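In standard Kubernetes label-selector syntax, those two conditions combine into a single selector string: an equality match on the type label plus a bare key, which means "label exists". The exact string is an assumption; the sketch just shows how it decomposes.

```python
# Equality match on the type label, existence check on the record-id label.
label_selector = "caraml.dev/type=BATCH_INGESTION_JOB,caraml.dev/record-id"

parts = label_selector.split(",")
key, value = parts[0].split("=")
# A bare key with no "=" selects any object that has that label set.
has_record_id = "=" not in parts[1]
```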
