
Datasets: First-Class Directory Outputs #184

@daniel-thom


Created in collaboration with Opus 4.6 via Claude Code.

Problem Statement

Torc's current FileModel represents individual files as job inputs/outputs for dependency
tracking. However, many workflows produce directory-based outputs such as:

  • Hive-partitioned Parquet datasets (thousands of files)
  • Sharded model checkpoints
  • Multi-file archives or bundles

These differ from individual files in several ways:

  1. Multiple jobs contribute to one dataset - Parameterized jobs write files to the same
    directory structure, often embedding job IDs in filenames for uniqueness
  2. Completion is aggregate - The dataset isn't "complete" until all contributing jobs finish
  3. File counts are unpredictable - Due to compaction, dynamic partitioning, etc.
  4. Integrity verification is different - Hashing thousands of files individually is impractical;
    a manifest-based approach is preferred

Long-Term Vision

DatasetModel

New database tables to represent directory-based outputs and the jobs that produce and consume them:

CREATE TABLE datasets (
    id INTEGER PRIMARY KEY,
    workflow_id INTEGER NOT NULL REFERENCES workflows(id) ON DELETE CASCADE,
    name TEXT NOT NULL,
    path TEXT NOT NULL,
    description TEXT,

    -- Computed when dataset is finalized:
    file_count INTEGER,
    total_size_bytes INTEGER,
    manifest_hash TEXT,
    hash_mode TEXT,  -- 'manifest', 'content', or 'none'
    finalized_at REAL,

    UNIQUE(workflow_id, name)
);

-- Track which jobs contribute to which datasets
CREATE TABLE job_dataset_outputs (
    job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    dataset_id INTEGER NOT NULL REFERENCES datasets(id) ON DELETE CASCADE,
    PRIMARY KEY (job_id, dataset_id)
);

-- Track which jobs read from which datasets
CREATE TABLE job_dataset_inputs (
    job_id INTEGER NOT NULL REFERENCES jobs(id) ON DELETE CASCADE,
    dataset_id INTEGER NOT NULL REFERENCES datasets(id) ON DELETE CASCADE,
    PRIMARY KEY (job_id, dataset_id)
);
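A minimal sketch of how the `DatasetModel` referenced later might mirror a `datasets` row in Rust. The field types and the `new`, `finalize`, and `is_finalized` helpers are assumptions, not part of the proposal; the nullable finalization columns map naturally to `Option`, and `hash_mode` is kept as a plain string here.

```rust
/// Hypothetical Rust mirror of a `datasets` row. Field names follow the
/// proposed SQL columns; the concrete types are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct DatasetModel {
    pub id: i64,
    pub workflow_id: i64,
    pub name: String,
    pub path: String,
    pub description: Option<String>,
    // Populated only when the dataset is finalized (NULL in SQL until then).
    pub file_count: Option<i64>,
    pub total_size_bytes: Option<i64>,
    pub manifest_hash: Option<String>,
    pub hash_mode: Option<String>,
    pub finalized_at: Option<f64>,
}

impl DatasetModel {
    /// An unfinalized record, as it would exist right after workflow creation.
    pub fn new(id: i64, workflow_id: i64, name: &str, path: &str) -> Self {
        DatasetModel {
            id,
            workflow_id,
            name: name.to_string(),
            path: path.to_string(),
            description: None,
            file_count: None,
            total_size_bytes: None,
            manifest_hash: None,
            hash_mode: None,
            finalized_at: None,
        }
    }

    /// A dataset counts as finalized once `finalized_at` has been recorded.
    pub fn is_finalized(&self) -> bool {
        self.finalized_at.is_some()
    }

    /// Record aggregate statistics after all contributing jobs have finished.
    pub fn finalize(
        &mut self,
        file_count: i64,
        total_size_bytes: i64,
        manifest_hash: Option<String>,
        hash_mode: &str,
        finalized_at: f64,
    ) {
        self.file_count = Some(file_count);
        self.total_size_bytes = Some(total_size_bytes);
        self.manifest_hash = manifest_hash;
        self.hash_mode = Some(hash_mode.to_string());
        self.finalized_at = Some(finalized_at);
    }
}
```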

Workflow Specification Syntax

Datasets would be defined similarly to files, with analogous reference syntax:

name: distributed_training
enable_ro_crate: true
ro_crate_hash_mode: manifest  # default for datasets

datasets:
  - name: training_output
    path: output/training.parquet/
    description: "Hive-partitioned training results"

  - name: checkpoints
    path: output/checkpoints/

files:
  - name: config
    path: input/config.json
    st_mtime: 1709567890

  - name: summary
    path: output/summary.json  # hypothetical path; written by aggregate_results

jobs:
  # Parameterized jobs write to the dataset
  - name: train_chunk_{i}
    command: >
      python train.py
        --config ${files.input.config}
        --chunk {i}
        --output ${datasets.output.training_output}/chunk_{i}/
    parameters:
      i: "0:99"

  # Aggregation job depends on all chunks completing
  - name: aggregate_results
    command: >
      python aggregate.py
        --input ${datasets.input.training_output}
        --output ${files.output.summary}
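The `${datasets.input.X}` / `${datasets.output.X}` references can be pulled out of a job command with plain string scanning. This is a hypothetical helper (`dataset_refs` and `Direction` are illustrative names), not the parser in workflow_spec.rs; it ignores `${files.*}` references and any direction other than `input` or `output`.

```rust
/// Direction of a `${datasets.*}` reference in a job command.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Input,
    Output,
}

/// Extract `(direction, dataset_name)` pairs from a command string.
pub fn dataset_refs(command: &str) -> Vec<(Direction, String)> {
    const PREFIX: &str = "${datasets.";
    let mut refs = Vec::new();
    let mut rest = command;
    while let Some(start) = rest.find(PREFIX) {
        let after = &rest[start + PREFIX.len()..];
        // The reference ends at the first closing brace; stop if it is unterminated.
        let Some(end) = after.find('}') else { break };
        let body = &after[..end]; // e.g. "output.training_output"
        if let Some((dir, name)) = body.split_once('.') {
            let direction = match dir {
                "input" => Some(Direction::Input),
                "output" => Some(Direction::Output),
                _ => None,
            };
            if let Some(d) = direction {
                refs.push((d, name.to_string()));
            }
        }
        // Continue scanning after this reference.
        rest = &after[end + 1..];
    }
    refs
}
```

Each `Output` reference would become a `job_dataset_outputs` row and each `Input` reference a `job_dataset_inputs` row.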

Dependency Semantics

Dataset dependencies follow the same pattern as file dependencies:

  • ${datasets.output.X} - Job writes to dataset X (recorded in job_dataset_outputs)
  • ${datasets.input.X} - Job reads from dataset X (recorded in job_dataset_inputs)

Dependency resolution:

  • A job using ${datasets.input.X} depends on all jobs that use ${datasets.output.X}
  • This naturally handles fan-in from parameterized jobs

Example:

train_chunk_0  ─┐
train_chunk_1  ─┼─→ training_output ─→ aggregate_results
train_chunk_2  ─┤
...            ─┘
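The fan-in rule amounts to a join on dataset between the two association tables. A sketch, assuming jobs and datasets are identified by name (the server would use IDs) and that a job which both reads and writes the same dataset should not depend on itself, a case the proposal does not specify. `dataset_dependencies` and `is_unblocked` are hypothetical names.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// For every job that reads a dataset, the set of jobs that write to it.
/// `outputs` and `inputs` are `(job, dataset)` pairs, like the two tables.
pub fn dataset_dependencies(
    outputs: &[(&str, &str)],
    inputs: &[(&str, &str)],
) -> BTreeMap<String, BTreeSet<String>> {
    // Index writers by dataset so fan-in from parameterized jobs is one lookup.
    let mut writers: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    for &(job, dataset) in outputs {
        writers.entry(dataset).or_default().push(job);
    }
    let mut deps: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
    for &(job, dataset) in inputs {
        let upstream = deps.entry(job.to_string()).or_default();
        // A dataset with no writers (e.g. an external input) adds no dependencies.
        if let Some(jobs) = writers.get(dataset) {
            for &writer in jobs {
                // Skip self-edges (assumption) to avoid a job waiting on itself.
                if writer != job {
                    upstream.insert(writer.to_string());
                }
            }
        }
    }
    deps
}

/// A reader is unblocked only when every upstream writer has completed.
pub fn is_unblocked(
    job: &str,
    deps: &BTreeMap<String, BTreeSet<String>>,
    completed: &BTreeSet<String>,
) -> bool {
    deps.get(job)
        .map_or(true, |upstream| upstream.iter().all(|j| completed.contains(j)))
}
```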

Trait Abstraction

To avoid code duplication, files and datasets should share a common interface:

/// Common behavior for workflow artifacts (files and datasets)
pub trait WorkflowArtifact {
    fn id(&self) -> i64;
    fn workflow_id(&self) -> i64;
    fn name(&self) -> &str;
    fn path(&self) -> &str;

    /// Check if the artifact exists on the filesystem
    fn exists(&self) -> bool;

    /// Get the artifact type for RO-Crate
    fn ro_crate_type(&self) -> &'static str;

    /// Build RO-Crate metadata for this artifact
    fn build_ro_crate_metadata(&self, hash_mode: HashMode) -> serde_json::Value;
}

impl WorkflowArtifact for FileModel { /* ... */ }
impl WorkflowArtifact for DatasetModel { /* ... */ }
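The trait takes a `HashMode` that is not defined above. One plausible shape, assuming it is a plain enum over the three modes described under Hash Modes, round-tripping through the same strings stored in `datasets.hash_mode`, with `Manifest` as the default to match `ro_crate_hash_mode: manifest  # default for datasets`.

```rust
use std::fmt;
use std::str::FromStr;

/// Hypothetical definition of the hash modes for dataset integrity checks.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HashMode {
    /// Hash of sorted (relative_path, size, mtime) records.
    #[default]
    Manifest,
    /// Hash of all file contents.
    Content,
    /// No hash; only file count and total size.
    None,
}

impl HashMode {
    /// String stored in `datasets.hash_mode` and emitted as RO-Crate `hashMode`.
    pub fn as_str(self) -> &'static str {
        match self {
            HashMode::Manifest => "manifest",
            HashMode::Content => "content",
            HashMode::None => "none",
        }
    }
}

impl fmt::Display for HashMode {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl FromStr for HashMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "manifest" => Ok(HashMode::Manifest),
            "content" => Ok(HashMode::Content),
            "none" => Ok(HashMode::None),
            other => Err(format!("unknown hash mode: {other}")),
        }
    }
}
```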

Implementation Scope

This is a significant change touching:

Server:

  • New datasets table and API endpoints
  • Dependency resolution in unblock_jobs_waiting_for must handle dataset dependencies
  • Job initialization must resolve dataset references

Client:

  • workflow_spec.rs - Parse datasets section, handle ${datasets.*} references
  • workflow_manager.rs - Create datasets, track contributions, finalize on completion
  • job_runner.rs - No changes needed (jobs just write to paths)
  • RO-Crate export - Include Dataset entities

Migration:

  • Database migration for new tables
  • Existing workflows unaffected (no datasets defined)

Short-Term Solution

Until the full implementation is complete, a CLI command provides manual dataset support:

torc ro-crate add-dataset \
  --workflow-id 123 \
  --name training_output \
  --path output/training.parquet/ \
  --hash-mode manifest

This allows users to:

  1. Run their workflow (with manual dependency management)
  2. After completion, add Dataset entities to the RO-Crate metadata

See the torc ro-crate add-dataset command for details.

Hash Modes

Three modes for dataset integrity verification:

manifest (recommended for large datasets)

Hash a sorted list of (relative_path, size, mtime) tuples:

file1.parquet|1048576|1709567890.123
file2.parquet|2097152|1709567891.456
...

  • Fast: Only reads filesystem metadata, not file contents
  • Detects: File additions, deletions, size changes, modification time changes
  • Does not detect: Content changes without size/mtime change (rare)
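A std-only sketch of building the manifest text from filesystem metadata. Assumptions: relative paths use `/` separators, mtimes are seconds since the Unix epoch printed with three decimals (matching the example lines), symlinks are skipped, and the final SHA-256 over the manifest bytes is left to a crate such as `sha2`. `build_manifest` and `summarize` are hypothetical names; `summarize` yields the file count and total size that `none` mode records.

```rust
use std::fs;
use std::io;
use std::path::Path;
use std::time::UNIX_EPOCH;

/// Recursively collect `(relative_path, size, mtime)` for regular files under
/// `dir`. Only metadata is read. DirEntry::metadata does not follow symlinks,
/// so a symlink is neither a file nor a directory here and is skipped.
fn collect(root: &Path, dir: &Path, out: &mut Vec<(String, u64, f64)>) -> io::Result<()> {
    for entry in fs::read_dir(dir)? {
        let entry = entry?;
        let meta = entry.metadata()?;
        let path = entry.path();
        if meta.is_dir() {
            collect(root, &path, out)?;
        } else if meta.is_file() {
            // Relative path joined with '/' so manifests compare across OSes.
            let rel = path
                .strip_prefix(root)
                .expect("entry lies under root")
                .components()
                .map(|c| c.as_os_str().to_string_lossy())
                .collect::<Vec<_>>()
                .join("/");
            // Seconds since the Unix epoch; pre-1970 mtimes fall back to 0.0.
            let mtime = meta
                .modified()?
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs_f64())
                .unwrap_or(0.0);
            out.push((rel, meta.len(), mtime));
        }
    }
    Ok(())
}

/// Manifest text: one `relative_path|size|mtime` line per file, sorted by path.
pub fn build_manifest(root: &Path) -> io::Result<String> {
    let mut entries = Vec::new();
    collect(root, root, &mut entries)?;
    entries.sort_by(|a, b| a.0.cmp(&b.0));
    let mut manifest = String::new();
    for (rel, size, mtime) in &entries {
        manifest.push_str(&format!("{rel}|{size}|{mtime:.3}\n"));
    }
    // The manifest hash would be SHA-256 over `manifest.as_bytes()`.
    Ok(manifest)
}

/// File count and total size in bytes (all that `none` mode records).
pub fn summarize(root: &Path) -> io::Result<(usize, u64)> {
    let mut entries = Vec::new();
    collect(root, root, &mut entries)?;
    Ok((entries.len(), entries.iter().map(|e| e.1).sum()))
}
```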

content

Compute SHA256 of all file contents (Merkle tree style):

  • Thorough: Detects any content change
  • Slow: Must read all file contents
  • Use for: Small datasets where integrity is critical
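The two-level ("Merkle tree style") content hash can be sketched as below. The hash function is passed in so the example needs only `std`; in practice it would return a SHA-256 hex digest. The `path|digest` record format and the name `content_root` are assumptions, not the proposal's definition.

```rust
/// Leaf level: hash each file's bytes. Root level: hash the sorted
/// `path|leaf_digest` records, yielding one digest for the whole dataset.
pub fn content_root<F>(files: &[(String, Vec<u8>)], hash: F) -> String
where
    F: Fn(&[u8]) -> String,
{
    let mut leaves: Vec<(&str, String)> = files
        .iter()
        .map(|(path, bytes)| (path.as_str(), hash(bytes.as_slice())))
        .collect();
    // Sort by path so the root does not depend on traversal order.
    leaves.sort();
    let mut combined = String::new();
    for (path, digest) in &leaves {
        combined.push_str(path);
        combined.push('|');
        combined.push_str(digest);
        combined.push('\n');
    }
    hash(combined.as_bytes())
}
```

Sorting the leaves matters because `read_dir` makes no ordering guarantee, so the same directory could otherwise produce different roots on different platforms.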

none

No hash is computed; only the file count and total size are recorded.

  • Fastest: Just stat() calls
  • Use for: Ephemeral datasets, very large datasets where hashing is impractical

RO-Crate Representation

Datasets are represented as schema:Dataset entities:

{
  "@id": "output/training.parquet/",
  "@type": "Dataset",
  "name": "training_output",
  "description": "Hive-partitioned training results",
  "contentSize": 15032385536,
  "fileCount": 2847,
  "sha256": "a1b2c3...",
  "hashMode": "manifest",
  "encodingFormat": "application/vnd.apache.parquet",
  "wasGeneratedBy": [
    {"@id": "#job-1-attempt-1"},
    {"@id": "#job-2-attempt-1"},
    ...
  ]
}

Open Questions

  1. Partial dataset reads - Can a job depend on a subset of a dataset? (Probably not needed
    initially)

  2. Dataset versioning - If a workflow is restarted and jobs re-run, how do we handle the
    dataset? This could be deferred to the user application.

  3. External datasets - Can a dataset be an input that exists before the workflow runs? (Yes,
    similar to input files with st_mtime set)

  4. Nested datasets - Can datasets contain subdatasets? (Probably not needed)

References
