From 0cb5bb452b45aef2cfefbfd9a4e5bd3878acc119 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Wed, 8 Apr 2026 13:04:48 -0400 Subject: [PATCH 01/11] update to accept multiple s3 folders --- config/prefect-data-hub.yaml | 7 +++--- data_loader.py | 2 +- loader.py | 41 +++++++++++++++++++++++++----------- loader_prefect.py | 25 ++++++++++++++++------ 4 files changed, 53 insertions(+), 22 deletions(-) diff --git a/config/prefect-data-hub.yaml b/config/prefect-data-hub.yaml index 5e8d1198..e4ba107a 100644 --- a/config/prefect-data-hub.yaml +++ b/config/prefect-data-hub.yaml @@ -17,13 +17,13 @@ pull: - prefect.deployments.steps.git_clone: #id: clone-step repository: https://github.com/CBIIT/icdc-dataloader.git - branch: 3.2.0 + branch: 3.2.0_multi_batch include_submodules: True - prefect.deployments.steps.git_clone: repository: https://github.com/CBIIT/icdc-model-tool.git branch: master - prefect.deployments.steps.set_working_directory: - directory: "/opt/prefect/icdc-dataloader-3.2.0" + directory: "/opt/prefect/icdc-dataloader-3.2.0_multi_batch" - prefect.deployments.steps.pip_install_requirements: requirements_file: requirements.txt directory: "{{ clone-step.directory }}" @@ -58,7 +58,7 @@ deployments: flow_name: null entrypoint: loader_prefect:data_hub_loader parameters: - s3_folder: "" + s3_folder: [] s3_bucket: "{{ prefect.variables.gc_metadata_lowertier_bucket }}" cheat_mode: True dry_run: False @@ -68,6 +68,7 @@ deployments: no_parents: True plugins: [] split_transaction: True + empty_cell_null: True work_pool: name: cds-general-mgconsole-prefect-3.4.18-python3.12 work_queue_name: default diff --git a/data_loader.py b/data_loader.py index 61ba2527..80856080 100755 --- a/data_loader.py +++ b/data_loader.py @@ -1126,7 +1126,7 @@ def batch_remove_old_relationship(self, tx, old_rel_statement_dict, old_rel_valu old_parent_list = [] for record in parent_query_results: old_parent_list.append(record[PARENT_ID]) - if len(old_parent_list) > 0: + if len(old_parent_list) > 0 and len(old_parent_list) == len(uploaded_parent_dict[parent_node]): delete_node_id = [] for i in range(0, len(old_parent_list)): old_parent_id = old_parent_list[i] diff --git a/loader.py b/loader.py index 3e1bf3cf..005c11f2 100755 --- a/loader.py +++ b/loader.py @@ -53,6 +53,7 @@ def parse_arguments(args = None): action='store_true') parser.add_argument('--upload-log-dir', help='Upload destination dir for log file, if dir in s3, use the format, s3://[bucket]/[prefix]') parser.add_argument('--database-type', help='The database type, can be either neo4j or memgraph', choices=[NEO4J, MEMGRAPH]) + parser.add_argument('--empty-cell-null', help='Whether to treat empty cell as null value instead of empty string, default is false', action='store_true') return parser.parse_args(args) @@ -124,10 +125,11 @@ def process_arguments(args, log): sys.exit(1) if config.s3_folder: - if not os.path.exists(config.dataset): - os.makedirs(config.dataset) + + if not os.path.exists(config.dataset[0]): + os.makedirs(config.dataset[0]) else: - exist_files = glob.glob('{}/*.txt'.format(config.dataset)) + exist_files = glob.glob('{}/*.txt'.format(config.dataset[0])) if len(exist_files) > 0: log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset)) sys.exit(1) @@ -138,13 +140,22 @@ def process_arguments(args, log): log.error('Please specify S3 bucket name with -b/--bucket argument!') sys.exit(1) bucket = S3Bucket(config.s3_bucket) - if not os.path.isdir(config.dataset): - log.error('{} is not a directory!'.format(config.dataset)) - sys.exit(1) - log.info(f'Loading data from s3://{config.s3_bucket}/{config.s3_folder}') - if not bucket.download_files_in_folder(config.s3_folder, config.dataset): - log.error('Download files from S3 bucket "{}" failed!'.format(config.s3_bucket)) + if not os.path.isdir(config.dataset[0]): + log.error('{} is not a directory!'.format(config.dataset[0])) sys.exit(1) + if isinstance(config.s3_folder, str): + s3_folders = [config.s3_folder] + else: + s3_folders = config.s3_folder + download_folders = [] + for s3f in s3_folders: + log.info(f'Loading data from s3://{config.s3_bucket}/{s3f}') + download_folder = os.path.join(config.dataset[0], os.path.basename(s3f.rstrip('/'))) + download_folders.append(download_folder) + if not bucket.download_files_in_folder(s3f, download_folder): + log.error('Download files from S3 bucket "{}" failed!'.format(config.s3_bucket)) + sys.exit(1) + config.dataset = download_folders # Optional Fields if args.uri: @@ -182,7 +193,10 @@ def process_arguments(args, log): config.max_violations = DEFAULT_MAX_VIOLATIONS if args.upload_log_dir: - config.upload_log_dir = args.upload_log_dir + if isinstance(args.upload_log_dir, str): + config.upload_log_dir = [args.upload_log_dir] + else: + config.upload_log_dir = args.upload_log_dir if not config.database_type: config.database_type = NEO4J @@ -243,6 +257,7 @@ def main(args): subfolders = config.dataset list_dataset = True try: + upload_log_index = 0 for folder in subfolders: txt_files = glob.glob('{}/*.txt'.format(folder)) tsv_files = glob.glob('{}/*.tsv'.format(folder)) @@ -299,11 +314,13 @@ def main(args): dest_log_dir = None #check if uploaded dir is configured if config.upload_log_dir: - dest_log_dir = config.upload_log_dir + dest_log_dir = config.upload_log_dir[upload_log_index] + upload_log_index += 1 else: #check if s3 bucket/folder are set. if config.s3_bucket and config.s3_folder: - dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs' + dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder[upload_log_index]}/logs' + upload_log_index += 1 if dest_log_dir: try: diff --git a/loader_prefect.py b/loader_prefect.py index efa9f319..84bce280 100644 --- a/loader_prefect.py +++ b/loader_prefect.py @@ -96,7 +96,8 @@ def load_data( max_violation = 1000000, mode = "upsert", split_transaction = False, - plugins = [] + plugins = [], + empty_cell_null = True ): params = Config( @@ -122,7 +123,8 @@ def load_data( split_transaction, upload_log_dir, plugins, - temp_folder + temp_folder, + empty_cell_null ) main(params) @@ -151,7 +153,9 @@ def __init__( split_transaction, upload_log_dir, plugins, - temp_folder + temp_folder, + empty_cell_null + ): self.dataset = dataset self.uri = uri @@ -176,6 +180,7 @@ def __init__( self.plugins = [] self.temp_folder = temp_folder self.database_type = database_type + self.empty_cell_null = empty_cell_null for plugin in plugins: self.plugins.append(PluginConfig(plugin)) @@ -196,7 +201,8 @@ def data_hub_loader( prop_file, no_parents=True, plugins=[], - split_transaction=True + split_transaction=True, + empty_cell_null=True ): secret_name = Variable.get(config_drop_list[ENVIRONMENTS][environment]) secret = get_secret(secret_name) @@ -209,12 +215,18 @@ def data_hub_loader( password = secret[MEMGRAPH_PASSWORD] schemas = data_model_download(model_repo_url, model_branch) + if isinstance(s3_folder, str): + s3_folder = [s3_folder] + upload_log_dir = [] + for s3f in s3_folder: + upload_log_dir.append(f's3://{s3_bucket}/{s3f}/logs') + load_data( database_type = database_type, s3_bucket = s3_bucket, s3_folder = s3_folder, - upload_log_dir = f's3://{s3_bucket}/{s3_folder}/logs', # + upload_log_dir = upload_log_dir, uri = uri, user = user, password = password, @@ -227,7 +239,8 @@ def data_hub_loader( max_violation = 1000000, mode = mode, plugins = plugins, - split_transaction = split_transaction + split_transaction = split_transaction, + empty_cell_null = empty_cell_null ) if __name__ == "__main__": From e19ad8474aadd8193a76265686c1baeb268a5b5b Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Wed, 8 Apr 2026 16:12:54 -0400 Subject: [PATCH 02/11] Update data_loader.py --- data_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_loader.py b/data_loader.py index 80856080..09e61a1f 100755 --- a/data_loader.py +++ b/data_loader.py @@ -952,7 +952,7 @@ def load_nodes(self, session, file_name, loading_mode, split=False): raise Exception('Wrong loading_mode: {}'.format(loading_mode)) # commit and restart a transaction when batch size reached - if split and transaction_counter >= BATCH_SIZE: + if split and transaction_counter >= BATCH_SIZE and loading_mode != DELETE_MODE: result = tx.run(statement, batch=batch_obj_list) tx.commit() nodes_created, nodes_updated = self.node_count(result, node_type, nodes_created, nodes_updated, batch_obj_list) From ef94dde00d98eed0ddb3c2077d19ac7f717b0f2e Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Mon, 13 Apr 2026 09:59:21 -0400 Subject: [PATCH 03/11] Update prefect-data-hub.yaml --- config/prefect-data-hub.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/prefect-data-hub.yaml b/config/prefect-data-hub.yaml index e4ba107a..0fe41bb0 100644 --- a/config/prefect-data-hub.yaml +++ b/config/prefect-data-hub.yaml @@ -17,13 +17,13 @@ pull: - prefect.deployments.steps.git_clone: #id: clone-step repository: https://github.com/CBIIT/icdc-dataloader.git - branch: 3.2.0_multi_batch + branch: 3.2.0 include_submodules: True - prefect.deployments.steps.git_clone: repository: https://github.com/CBIIT/icdc-model-tool.git branch: master - prefect.deployments.steps.set_working_directory: - directory: "/opt/prefect/icdc-dataloader-3.2.0_multi_batch" + directory: "/opt/prefect/icdc-dataloader-3.2.0" - prefect.deployments.steps.pip_install_requirements: requirements_file: requirements.txt directory: "{{ clone-step.directory }}" From 0b8632e201084a869b42f0f27cf7b03906c5137f Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:43:02 -0400 Subject: [PATCH 04/11] Update loader.py --- loader.py | 45 ++++++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/loader.py b/loader.py index 005c11f2..89ec8c6a 100755 --- a/loader.py +++ b/loader.py @@ -72,19 +72,14 @@ def process_arguments(args, log): if args.s3_folder: config.s3_folder = args.s3_folder - multiple_datasets = False - dataset = config.dataset - if isinstance(config.dataset, list): - multiple_datasets = True - if not multiple_datasets: - if not config.s3_folder and not os.path.isdir(dataset): - log.error('{} is not a directory!'.format(config.dataset)) + + if isinstance(config.dataset, str): + config.dataset = [config.dataset] + + for subfolder in config.dataset: + if not os.path.isdir(subfolder): + log.error('{} is not a directory!'.format(subfolder)) sys.exit(1) - elif multiple_datasets and not config.s3_folder: - for subfolder in config.dataset: - if not os.path.isdir(subfolder): - log.error('{} is not a directory!'.format(subfolder)) - sys.exit(1) if args.prop_file: config.prop_file = args.prop_file @@ -125,13 +120,17 @@ def process_arguments(args, log): sys.exit(1) if config.s3_folder: - + if len(config.dataset) != 1: + log.error('When using S3 folder, only one dataset can be specified!') + sys.exit(1) if not os.path.exists(config.dataset[0]): os.makedirs(config.dataset[0]) else: - exist_files = glob.glob('{}/*.txt'.format(config.dataset[0])) + txt_files = glob.glob('{}/**/*.txt'.format(config.dataset[0]), recursive=True) + tsv_files = glob.glob('{}/**/*.tsv'.format(config.dataset[0]), recursive=True) + exist_files = txt_files + tsv_files if len(exist_files) > 0: - log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset)) + log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset[0])) sys.exit(1) if args.bucket: @@ -140,9 +139,11 @@ def process_arguments(args, log): log.error('Please specify S3 bucket name with -b/--bucket argument!') sys.exit(1) bucket = S3Bucket(config.s3_bucket) - if not os.path.isdir(config.dataset[0]): - log.error('{} is not a directory!'.format(config.dataset[0])) - sys.exit(1) + + for dataset in config.dataset: + if not os.path.isdir(dataset): + log.error('{} is not a directory!'.format(dataset)) + sys.exit(1) if isinstance(config.s3_folder, str): s3_folders = [config.s3_folder] else: @@ -250,15 +251,9 @@ def main(args): mg_connection = None restore_cmd = '' load_result = None - list_dataset = False - if isinstance(config.dataset, str): - subfolders = [config.dataset] - else: - subfolders = config.dataset - list_dataset = True try: upload_log_index = 0 - for folder in subfolders: + for folder in config.dataset: txt_files = glob.glob('{}/*.txt'.format(folder)) tsv_files = glob.glob('{}/*.tsv'.format(folder)) file_list = txt_files + tsv_files From 980f5bcd26d6af4f213f7d2044e81d91e6375de7 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Thu, 16 Apr 2026 11:01:52 -0400 Subject: [PATCH 05/11] Update loader.py --- loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loader.py b/loader.py index 89ec8c6a..ef82ffb8 100755 --- a/loader.py +++ b/loader.py @@ -77,7 +77,7 @@ def process_arguments(args, log): config.dataset = [config.dataset] for subfolder in config.dataset: - if not os.path.isdir(subfolder): + if not os.path.isdir(subfolder) and not config.s3_folder: log.error('{} is not a directory!'.format(subfolder)) sys.exit(1) From 531af918cc522e8bbce4b2b7623b7cf403a8b837 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:40:25 -0400 Subject: [PATCH 06/11] Revert "Update loader.py" This reverts commit 980f5bcd26d6af4f213f7d2044e81d91e6375de7. --- loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/loader.py b/loader.py index ef82ffb8..89ec8c6a 100755 --- a/loader.py +++ b/loader.py @@ -77,7 +77,7 @@ def process_arguments(args, log): config.dataset = [config.dataset] for subfolder in config.dataset: - if not os.path.isdir(subfolder) and not config.s3_folder: + if not os.path.isdir(subfolder): log.error('{} is not a directory!'.format(subfolder)) sys.exit(1) From 13c6517f499fd80f03e82d77fa992d96eb92d735 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:40:41 -0400 Subject: [PATCH 07/11] Revert "Update loader.py" This reverts commit 0b8632e201084a869b42f0f27cf7b03906c5137f. --- loader.py | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/loader.py b/loader.py index 89ec8c6a..005c11f2 100755 --- a/loader.py +++ b/loader.py @@ -72,14 +72,19 @@ def process_arguments(args, log): if args.s3_folder: config.s3_folder = args.s3_folder - - if isinstance(config.dataset, str): - config.dataset = [config.dataset] - - for subfolder in config.dataset: - if not os.path.isdir(subfolder): - log.error('{} is not a directory!'.format(subfolder)) + multiple_datasets = False + dataset = config.dataset + if isinstance(config.dataset, list): + multiple_datasets = True + if not multiple_datasets: + if not config.s3_folder and not os.path.isdir(dataset): + log.error('{} is not a directory!'.format(config.dataset)) sys.exit(1) + elif multiple_datasets and not config.s3_folder: + for subfolder in config.dataset: + if not os.path.isdir(subfolder): + log.error('{} is not a directory!'.format(subfolder)) + sys.exit(1) if args.prop_file: config.prop_file = args.prop_file @@ -120,17 +125,13 @@ def process_arguments(args, log): sys.exit(1) if config.s3_folder: - if len(config.dataset) != 1: - log.error('When using S3 folder, only one dataset can be specified!') - sys.exit(1) + if not os.path.exists(config.dataset[0]): os.makedirs(config.dataset[0]) else: - txt_files = glob.glob('{}/**/*.txt'.format(config.dataset[0]), recursive=True) - tsv_files = glob.glob('{}/**/*.tsv'.format(config.dataset[0]), recursive=True) - exist_files = txt_files + tsv_files + exist_files = glob.glob('{}/*.txt'.format(config.dataset[0])) if len(exist_files) > 0: - log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset[0])) + log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset)) sys.exit(1) if args.bucket: @@ -139,11 +140,9 @@ def process_arguments(args, log): log.error('Please specify S3 bucket name with -b/--bucket argument!') sys.exit(1) bucket = S3Bucket(config.s3_bucket) - - for dataset in config.dataset: - if not os.path.isdir(dataset): - log.error('{} is not a directory!'.format(dataset)) - sys.exit(1) + if not os.path.isdir(config.dataset[0]): + log.error('{} is not a directory!'.format(config.dataset[0])) + sys.exit(1) if isinstance(config.s3_folder, str): s3_folders = [config.s3_folder] else: @@ -251,9 +250,15 @@ def main(args): mg_connection = None restore_cmd = '' load_result = None + list_dataset = False + if isinstance(config.dataset, str): + subfolders = [config.dataset] + else: + subfolders = config.dataset + list_dataset = True try: upload_log_index = 0 - for folder in config.dataset: + for folder in subfolders: txt_files = glob.glob('{}/*.txt'.format(folder)) tsv_files = glob.glob('{}/*.tsv'.format(folder)) file_list = txt_files + tsv_files From 93fb19e350563a1b8351d454124d23d778d1e8d0 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Thu, 16 Apr 2026 15:47:35 -0400 Subject: [PATCH 08/11] Revert "Update prefect-data-hub.yaml" This reverts commit ef94dde00d98eed0ddb3c2077d19ac7f717b0f2e. --- config/prefect-data-hub.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/prefect-data-hub.yaml b/config/prefect-data-hub.yaml index 0fe41bb0..e4ba107a 100644 --- a/config/prefect-data-hub.yaml +++ b/config/prefect-data-hub.yaml @@ -17,13 +17,13 @@ pull: - prefect.deployments.steps.git_clone: #id: clone-step repository: https://github.com/CBIIT/icdc-dataloader.git - branch: 3.2.0 + branch: 3.2.0_multi_batch include_submodules: True - prefect.deployments.steps.git_clone: repository: https://github.com/CBIIT/icdc-model-tool.git branch: master - prefect.deployments.steps.set_working_directory: - directory: "/opt/prefect/icdc-dataloader-3.2.0" + directory: "/opt/prefect/icdc-dataloader-3.2.0_multi_batch" - prefect.deployments.steps.pip_install_requirements: requirements_file: requirements.txt directory: "{{ clone-step.directory }}" From 68e81200133ba6bb33e535979768f2c88a8f8654 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:39:29 -0400 Subject: [PATCH 09/11] update based on Ming's review --- loader.py | 33 ++++++++++++++------------------- 1 file changed, 14 insertions(+), 19 deletions(-) diff --git a/loader.py b/loader.py index 005c11f2..ceb0d027 100755 --- a/loader.py +++ b/loader.py @@ -5,6 +5,7 @@ import sys import zipfile +from dataclasses_json import config from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable from neo4j.exceptions import AuthError @@ -72,15 +73,12 @@ def process_arguments(args, log): if args.s3_folder: config.s3_folder = args.s3_folder - multiple_datasets = False - dataset = config.dataset - if isinstance(config.dataset, list): - multiple_datasets = True - if not multiple_datasets: - if not config.s3_folder and not os.path.isdir(dataset): - log.error('{} is not a directory!'.format(config.dataset)) - sys.exit(1) - elif multiple_datasets and not config.s3_folder: + if isinstance(config.dataset, str): + config.dataset = [config.dataset] + if not isinstance(config.dataset, list): + log.error('Dataset should be with list or string formats!') + sys.exit(1) + if not config.s3_folder: for subfolder in config.dataset: if not os.path.isdir(subfolder): log.error('{} is not a directory!'.format(subfolder)) @@ -125,13 +123,16 @@ def process_arguments(args, log): sys.exit(1) if config.s3_folder: - + if len(config.dataset) != 1: + log.error('Only one local folder should be specified if given s3_bucket and s3_folder!') + sys.exit(1) if not os.path.exists(config.dataset[0]): os.makedirs(config.dataset[0]) else: - exist_files = glob.glob('{}/*.txt'.format(config.dataset[0])) + exist_files = glob.glob('{}/**/*.txt'.format(config.dataset[0]), recursive=True) + exist_files += glob.glob('{}/**/*.tsv'.format(config.dataset[0]), recursive=True) if len(exist_files) > 0: - log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset)) + log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset[0])) sys.exit(1) if args.bucket: @@ -250,15 +251,9 @@ def main(args): mg_connection = None restore_cmd = '' load_result = None - list_dataset = False - if isinstance(config.dataset, str): - subfolders = [config.dataset] - else: - subfolders = config.dataset - list_dataset = True try: upload_log_index = 0 - for folder in subfolders: + for folder in config.dataset: txt_files = glob.glob('{}/*.txt'.format(folder)) tsv_files = glob.glob('{}/*.tsv'.format(folder)) file_list = txt_files + tsv_files From 028107758a2ff641b060bd5c09e4cc7a3e21aa75 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Mon, 20 Apr 2026 11:54:48 -0400 Subject: [PATCH 10/11] remove useless code --- loader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/loader.py b/loader.py index ceb0d027..b68d33af 100755 --- a/loader.py +++ b/loader.py @@ -5,7 +5,6 @@ import sys import zipfile -from dataclasses_json import config from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable from neo4j.exceptions import AuthError From 09ade4dfc54b1de33c8bdbe165912097947682a5 Mon Sep 17 00:00:00 2001 From: wfy1997 <60631776+wfy1997@users.noreply.github.com> Date: Mon, 20 Apr 2026 12:15:28 -0400 Subject: [PATCH 11/11] Update loader.py add a random uuid to the local folder name to make sure the local folder names are unique if the different s3 folders have the same last subfolders names --- loader.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/loader.py b/loader.py index b68d33af..00fac684 100755 --- a/loader.py +++ b/loader.py @@ -4,6 +4,7 @@ import os import sys import zipfile +import uuid from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable @@ -150,7 +151,10 @@ def process_arguments(args, log): download_folders = [] for s3f in s3_folders: log.info(f'Loading data from s3://{config.s3_bucket}/{s3f}') - download_folder = os.path.join(config.dataset[0], os.path.basename(s3f.rstrip('/'))) + # add a random uuid to the local folder name to make sure the local folder names are unique if the different s3 folders have the same last subfolders names. + folder_uuid = uuid.uuid4() + download_subfolder = os.path.basename(s3f.rstrip('/') + '_' + str(folder_uuid)) + download_folder = os.path.join(config.dataset[0], download_subfolder) download_folders.append(download_folder) if not bucket.download_files_in_folder(s3f, download_folder): log.error('Download files from S3 bucket "{}" failed!'.format(config.s3_bucket))