Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,10 @@
# dataflow_transfer
A tool for transferring sequencing data from NAS to HPC
A tool for transferring sequencing data from NAS to HPC

## Status Files
The logic of the script relies on the following status files
- run.final_file
- The final file written by each sequencing machine. Used to indicate when the sequencing has completed.
- final_rsync_exitcode
- Used to indicate when the final rsync is done, so that the final rsync can be run in the background. This is especially useful for restarts after long pauses of the cronjob.
"""
1 change: 1 addition & 0 deletions dataflow_transfer/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def cli(config_file, run, sequencer):
Args:
config_file (file): Path to the configuration file.
run (str): An identifier, e.g., '20250528_LH00217_0219_A22TT52LT4'.
sequencer (str): Sequencer type, e.g., 'NovaSeqXPlus', 'MiSeq', 'AVITI'.
"""
if sequencer and not run:
raise click.UsageError(
Expand Down
32 changes: 23 additions & 9 deletions dataflow_transfer/dataflow_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,47 @@ def get_run_object(run_dir, sequencer, config):
run_class = RUN_CLASS_REGISTRY.get(sequencer)
if run_class:
return run_class(run_dir, config)
return None
else:
raise ValueError(
f"Unknown sequencer type: {sequencer}. Skipping run: {run_dir}"
)


def process_run(run_dir, sequencer, config):
run = get_run_object(run_dir, sequencer, config)
run.confirm_run_type()
if not run:
logger.warning(f"Unknown sequencer type: {sequencer}. Skipping run: {run_dir}")
if run.get_status("final_transfer_completed"):
logger.info(f"Transfer already completed for {run_dir}. No action needed.")
return
if run.sequencing_ongoing():
run.update_statusdb(status="sequencing_started")
logger.info(
f"Sequencing is ongoing for {run_dir}. Starting background transfer."
)
run.initiate_background_transfer()
run.upload_stats()
run.update_statusdb(status="transfer_started")
return
if not run.transfer_complete():
run.set_status("Sequenced", True)
run.upload_stats()
if run.get_status("sequencing_completed"):
logger.info(
f"Run {run_dir} is already marked as sequenced, but transfer not complete. Will attempt final transfer again."
)
run.update_statusdb(status="sequencing_completed")
run.sync_metadata()
logger.info(f"Sequencing is complete for {run_dir}. Starting final transfer.")
run.do_final_transfer()
run.set_status("Transferred", True)
run.update_statusdb(status="final_transfer_started")
return
else:
logger.info(f"Transfer already completed for {run_dir}. No action needed.")
if run.final_sync_successful():
logger.info(f"Final transfer completed successfully for {run_dir}.")
run.update_statusdb(status="final_transfer_completed")
return
else:
logger.error(f"Final transfer failed for {run_dir}. Please check rsync logs.")
raise RuntimeError(
f"Final transfer failed for {run_dir}."
) # TODO: we could retry? e.g. log the number of retries in the DB and retry N times before sending out an email warning?
# Removing the final transfer indicator should let the run retry next iteration


def get_run_dir(run):
Expand Down
33 changes: 20 additions & 13 deletions dataflow_transfer/run_classes/generic_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ def __init__(self, run_dir, configuration):
self.run_id = os.path.basename(run_dir)
self.configuration = configuration
self.final_file = ""
self.rsync_log = os.path.join(
self.run_dir, "rsync.log"
) # TODO: add timestamp to log filename
self.rsync_log = os.path.join(self.run_dir, "rsync.log")
self.final_rsync_exitcode_file = os.path.join(
self.run_dir, "final_rsync_exitcode"
)
self.miarka_destination = self.configuration.get("miarka_destination").get(
getattr(self, "run_type", None)
)
Expand All @@ -42,7 +43,6 @@ def initiate_background_transfer(self):
run_path=self.run_dir,
destination=self.miarka_destination,
rsync_log=self.rsync_log,
background=True,
transfer_details=self.configuration.get("transfer_details", {}),
)

Expand All @@ -52,22 +52,29 @@ def do_final_transfer(self):
run_path=self.run_dir,
destination=self.miarka_destination,
rsync_log=self.rsync_log,
background=False,
transfer_details=self.configuration.get("transfer_details", {}),
rsync_exit_code_file=self.final_rsync_exitcode_file,
)

def final_sync_successful(self):
    """Report whether the background final rsync finished successfully.

    Reads the exit-code file written by the final rsync command and
    returns True only when it recorded exit code 0. Returns False when
    the file is absent (rsync still running or never started) or when
    it holds any non-zero code.
    """
    exitcode_path = self.final_rsync_exitcode_file
    if os.path.exists(exitcode_path):
        with open(exitcode_path, "r") as handle:
            recorded = handle.read().strip()
        if recorded == "0":
            return True
    return False

def sync_metadata(self):
    """Sync run metadata to the destination. Currently a no-op stub."""
    # TODO: implement actual metadata sync logic. Look at TACA
    pass

def transfer_complete(self):
# TODO: check the status in statusdb
pass
return os.path.exists(self.final_rsync_exitcode_file)

def set_status(self, status, value):
    """Record a run status flag (e.g. "Sequenced", "Transferred").

    Stub: only logs the intended change; no persistence yet.
    """
    # TODO: implement actual status update logic. Look at TACA aviti
    logger.info(f"Setting status {status} to {value} for {self.run_dir}")

def upload_stats(self):
# TODO: implement actual stats upload logic. Each subclass can have a "gather_info" method that is called here
def get_status(self, status_name):
    """Return whether the named status is set for this run.

    Stub: always returns None (falsy) until the statusdb lookup is
    implemented, so callers currently treat every status as unset.
    """
    # TODO: get statuses from view in statusdb and return true or false depending on if the status is set
    pass

def update_statusdb(self, status):
    """Update the run's status in statusdb. Stub: currently only logs."""
    # TODO: implement actual status update logic. Look at TACA aviti. Always fetch the latest doc from statusdb and use this for updating
    logger.info(f"Setting status {status} for {self.run_dir}")
39 changes: 16 additions & 23 deletions dataflow_transfer/utils/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ def sync_to_hpc(
run_path: str,
destination: str,
rsync_log: str,
background: bool,
transfer_details: dict = {},
rsync_exit_code_file: os.PathLike = None,
):
"""Sync the run to storage using rsync.
Skip if rsync is already running on the run."""
Expand All @@ -42,6 +42,14 @@ def sync_to_hpc(
remote_destination,
]

if rsync_exit_code_file:
command.extend(
[
f"; echo $? > {rsync_exit_code_file}",
]
)
command_str = " ".join(command)

if rsync_is_running(
src=run_path, dst=remote_destination
): # TODO: check if this works as intended
Expand All @@ -50,25 +58,10 @@ def sync_to_hpc(
)
return False
else:
if background:
p_background = subprocess.Popen(command)
logger.info(
f"{os.path.basename(run_path)}: Started background rsync to {remote_destination}"
+ f" with PID {p_background.pid} and the following command: '{' '.join(command)}'"
)
else:
logger.info(
f"{os.path.basename(run_path)}: Starting foreground rsync to {remote_destination}"
+ f" with the following command: '{' '.join(command)}'"
)
p_foreground = subprocess.run(command)
if p_foreground.returncode == 0:
logger.info(
f"{os.path.basename(run_path)}: Rsync to {remote_destination} finished successfully."
)
return True
else:
logger.error(
f"{os.path.basename(run_path)}: Rsync to {remote_destination} failed with error code {p_foreground.returncode}."
)
return False
background_process = subprocess.Popen(
command_str, stdout=subprocess.PIPE, shell=True
)
logger.info(
f"{os.path.basename(run_path)}: Started background rsync to {remote_destination}"
+ f" with PID {background_process.pid} and the following command: '{command_str}'"
)