Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions config/prefect-data-hub.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ pull:
- prefect.deployments.steps.git_clone:
#id: clone-step
repository: https://github.com/CBIIT/icdc-dataloader.git
branch: 3.2.0
branch: 3.2.0_multi_batch
include_submodules: True
- prefect.deployments.steps.git_clone:
repository: https://github.com/CBIIT/icdc-model-tool.git
branch: master
- prefect.deployments.steps.set_working_directory:
directory: "/opt/prefect/icdc-dataloader-3.2.0"
directory: "/opt/prefect/icdc-dataloader-3.2.0_multi_batch"
- prefect.deployments.steps.pip_install_requirements:
requirements_file: requirements.txt
directory: "{{ clone-step.directory }}"
Expand Down Expand Up @@ -58,7 +58,7 @@ deployments:
flow_name: null
entrypoint: loader_prefect:data_hub_loader
parameters:
s3_folder: ""
s3_folder: []
s3_bucket: "{{ prefect.variables.gc_metadata_lowertier_bucket }}"
cheat_mode: True
dry_run: False
Expand All @@ -68,6 +68,7 @@ deployments:
no_parents: True
plugins: []
split_transaction: True
empty_cell_null: True
work_pool:
name: cds-general-mgconsole-prefect-3.4.18-python3.12
work_queue_name: default
Expand Down
4 changes: 2 additions & 2 deletions data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ def load_nodes(self, session, file_name, loading_mode, split=False):
raise Exception('Wrong loading_mode: {}'.format(loading_mode))

# commit and restart the transaction once the batch size is reached
if split and transaction_counter >= BATCH_SIZE:
if split and transaction_counter >= BATCH_SIZE and loading_mode != DELETE_MODE:
result = tx.run(statement, batch=batch_obj_list)
tx.commit()
nodes_created, nodes_updated = self.node_count(result, node_type, nodes_created, nodes_updated, batch_obj_list)
Expand Down Expand Up @@ -1126,7 +1126,7 @@ def batch_remove_old_relationship(self, tx, old_rel_statement_dict, old_rel_valu
old_parent_list = []
for record in parent_query_results:
old_parent_list.append(record[PARENT_ID])
if len(old_parent_list) > 0:
if len(old_parent_list) > 0 and len(old_parent_list) == len(uploaded_parent_dict[parent_node]):
delete_node_id = []
for i in range(0, len(old_parent_list)):
old_parent_id = old_parent_list[i]
Expand Down
73 changes: 44 additions & 29 deletions loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import sys
import zipfile
import uuid

from neo4j import GraphDatabase
from neo4j.exceptions import ServiceUnavailable
Expand Down Expand Up @@ -53,6 +54,7 @@ def parse_arguments(args = None):
action='store_true')
parser.add_argument('--upload-log-dir', help='Upload destination dir for log file, if dir in s3, use the format, s3://[bucket]/[prefix]')
parser.add_argument('--database-type', help='The database type, can be either neo4j or memgraph', choices=[NEO4J, MEMGRAPH])
parser.add_argument('--empty-cell-null', help='Whether to treat empty cell as null value instead of empty string, default is false', action='store_true')
return parser.parse_args(args)


Expand All @@ -71,15 +73,12 @@ def process_arguments(args, log):

if args.s3_folder:
config.s3_folder = args.s3_folder
multiple_datasets = False
dataset = config.dataset
if isinstance(config.dataset, list):
multiple_datasets = True
if not multiple_datasets:
if not config.s3_folder and not os.path.isdir(dataset):
log.error('{} is not a directory!'.format(config.dataset))
sys.exit(1)
elif multiple_datasets and not config.s3_folder:
if isinstance(config.dataset, str):
config.dataset = [config.dataset]
if not isinstance(config.dataset, list):
log.error('Dataset should be with list or string formats!')
sys.exit(1)
if not config.s3_folder:
for subfolder in config.dataset:
if not os.path.isdir(subfolder):
log.error('{} is not a directory!'.format(subfolder))
Expand Down Expand Up @@ -124,12 +123,16 @@ def process_arguments(args, log):
sys.exit(1)

if config.s3_folder:
if not os.path.exists(config.dataset):
os.makedirs(config.dataset)
if len(config.dataset) != 1:
log.error('Only one local folder should be specified if given s3_bucket and s3_folder!')
sys.exit(1)
if not os.path.exists(config.dataset[0]):
os.makedirs(config.dataset[0])
else:
exist_files = glob.glob('{}/*.txt'.format(config.dataset))
exist_files = glob.glob('{}/**/*.txt'.format(config.dataset[0]), recursive=True)
exist_files += glob.glob('{}/**/*.tsv'.format(config.dataset[0]), recursive=True)
if len(exist_files) > 0:
log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset))
log.error('Folder: "{}" is not empty, please empty it first'.format(config.dataset[0]))
sys.exit(1)

if args.bucket:
Expand All @@ -138,13 +141,25 @@ def process_arguments(args, log):
log.error('Please specify S3 bucket name with -b/--bucket argument!')
sys.exit(1)
bucket = S3Bucket(config.s3_bucket)
if not os.path.isdir(config.dataset):
log.error('{} is not a directory!'.format(config.dataset))
sys.exit(1)
log.info(f'Loading data from s3://{config.s3_bucket}/{config.s3_folder}')
if not bucket.download_files_in_folder(config.s3_folder, config.dataset):
log.error('Download files from S3 bucket "{}" failed!'.format(config.s3_bucket))
if not os.path.isdir(config.dataset[0]):
log.error('{} is not a directory!'.format(config.dataset[0]))
sys.exit(1)
if isinstance(config.s3_folder, str):
s3_folders = [config.s3_folder]
else:
s3_folders = config.s3_folder
download_folders = []
for s3f in s3_folders:
log.info(f'Loading data from s3://{config.s3_bucket}/{s3f}')
# Append a random UUID to the local folder name so that local folder names stay unique even when different S3 folders share the same last subfolder name.
folder_uuid = uuid.uuid4()
download_subfolder = os.path.basename(s3f.rstrip('/') + '_' + str(folder_uuid))
download_folder = os.path.join(config.dataset[0], download_subfolder)
download_folders.append(download_folder)
if not bucket.download_files_in_folder(s3f, download_folder):
log.error('Download files from S3 bucket "{}" failed!'.format(config.s3_bucket))
sys.exit(1)
config.dataset = download_folders

# Optional Fields
if args.uri:
Expand Down Expand Up @@ -182,7 +197,10 @@ def process_arguments(args, log):
config.max_violations = DEFAULT_MAX_VIOLATIONS

if args.upload_log_dir:
config.upload_log_dir = args.upload_log_dir
if isinstance(args.upload_log_dir, str):
config.upload_log_dir = [args.upload_log_dir]
else:
config.upload_log_dir = args.upload_log_dir

if not config.database_type:
config.database_type = NEO4J
Expand Down Expand Up @@ -236,14 +254,9 @@ def main(args):
mg_connection = None
restore_cmd = ''
load_result = None
list_dataset = False
if isinstance(config.dataset, str):
subfolders = [config.dataset]
else:
subfolders = config.dataset
list_dataset = True
try:
for folder in subfolders:
upload_log_index = 0
for folder in config.dataset:
txt_files = glob.glob('{}/*.txt'.format(folder))
tsv_files = glob.glob('{}/*.tsv'.format(folder))
file_list = txt_files + tsv_files
Expand Down Expand Up @@ -299,11 +312,13 @@ def main(args):
dest_log_dir = None
# check whether an upload log directory is configured
if config.upload_log_dir:
dest_log_dir = config.upload_log_dir
dest_log_dir = config.upload_log_dir[upload_log_index]
upload_log_index += 1
else:
#check if s3 bucket/folder are set.
if config.s3_bucket and config.s3_folder:
dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs'
dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder[upload_log_index]}/logs'
upload_log_index += 1

if dest_log_dir:
try:
Expand Down
25 changes: 19 additions & 6 deletions loader_prefect.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ def load_data(
max_violation = 1000000,
mode = "upsert",
split_transaction = False,
plugins = []
plugins = [],
empty_cell_null = True
):

params = Config(
Expand All @@ -122,7 +123,8 @@ def load_data(
split_transaction,
upload_log_dir,
plugins,
temp_folder
temp_folder,
empty_cell_null
)
main(params)

Expand Down Expand Up @@ -151,7 +153,9 @@ def __init__(
split_transaction,
upload_log_dir,
plugins,
temp_folder
temp_folder,
empty_cell_null

):
self.dataset = dataset
self.uri = uri
Expand All @@ -176,6 +180,7 @@ def __init__(
self.plugins = []
self.temp_folder = temp_folder
self.database_type = database_type
self.empty_cell_null = empty_cell_null
for plugin in plugins:
self.plugins.append(PluginConfig(plugin))

Expand All @@ -196,7 +201,8 @@ def data_hub_loader(
prop_file,
no_parents=True,
plugins=[],
split_transaction=True
split_transaction=True,
empty_cell_null=True
):
secret_name = Variable.get(config_drop_list[ENVIRONMENTS][environment])
secret = get_secret(secret_name)
Expand All @@ -209,12 +215,18 @@ def data_hub_loader(
password = secret[MEMGRAPH_PASSWORD]

schemas = data_model_download(model_repo_url, model_branch)
if isinstance(s3_folder, str):
s3_folder = [s3_folder]
upload_log_dir = []
for s3f in s3_folder:
upload_log_dir.append(f's3://{s3_bucket}/{s3f}/logs')


load_data(
database_type = database_type,
s3_bucket = s3_bucket,
s3_folder = s3_folder,
upload_log_dir = f's3://{s3_bucket}/{s3_folder}/logs', #
upload_log_dir = upload_log_dir,
uri = uri,
user = user,
password = password,
Expand All @@ -227,7 +239,8 @@ def data_hub_loader(
max_violation = 1000000,
mode = mode,
plugins = plugins,
split_transaction = split_transaction
split_transaction = split_transaction,
empty_cell_null = empty_cell_null
)

if __name__ == "__main__":
Expand Down
Loading