Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __init__(self, config_file):
self.database_type = "neo4j"
self.plugins = []
self.memgraph_snapshot_dir = None
self.empty_cell_null = False
else:
if os.path.isfile(config_file):
with open(config_file) as c_file:
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(self, config_file):
self.verbose = config.get('verbose')
self.database_type = config.get("database_type")
self.memgraph_snapshot_dir = config.get("memgraph_snapshot_dir")
self.empty_cell_null = config.get("empty_cell_null")
else:
msg = f'Can NOT open configuration file "{config_file}"!'
self.log.error(msg)
Expand Down
2 changes: 1 addition & 1 deletion config/props-cds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Properties:
TBD: String

id_fields:
study: phs_accession
study: study_id
participant: study_participant_id
diagnosis: study_diagnosis_id
treatment: treatment_id
Expand Down
9 changes: 6 additions & 3 deletions data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
INT_NODE_CREATED = 'int_node_created'
PROVIDED_PARENTS = 'provided_parents'
RELATIONSHIP_PROPS = 'relationship_properties'
BATCH_SIZE = 10000
BATCH_SIZE = 2000
OTHER = '__other__'

maxInt = sys.maxsize
Expand Down Expand Up @@ -179,6 +179,7 @@ def __init__(self, driver, schema, config=None, memgraph_snapshot_dir=None, plug
self.df_validation_dict = {}
self.skip_validation_flag = False
self.cheat_mode = True
self.empty_cell_null = False

def check_files(self, file_list):
if not file_list:
Expand Down Expand Up @@ -265,7 +266,7 @@ def validate_files(self, cheat_mode, loading_mode, file_list, max_violations, te
return True

def load(self, file_list, cheat_mode, dry_run, loading_mode, wipe_db, max_violations, temp_folder, verbose,
split=False, no_backup=True, neo4j_uri=None, backup_folder="/", username=None, password=None):
split=False, no_backup=True, neo4j_uri=None, backup_folder="/", username=None, password=None, empty_cell_null=False):
if not self.check_files(file_list):
return False
start = timer()
Expand Down Expand Up @@ -306,6 +307,7 @@ def load(self, file_list, cheat_mode, dry_run, loading_mode, wipe_db, max_violat
self.nodes_deleted_stat = {}
self.relationships_deleted_stat = {}
self.cheat_mode = True
self.empty_cell_null = empty_cell_null
if not self.driver or not isinstance(self.driver, Driver):
self.log.error('Invalid Neo4j Python Driver!')
return False
Expand Down Expand Up @@ -393,7 +395,8 @@ def prepare_node(self, node, file_name):
search_node_type, search_key = key.split('.')
elif self.schema.is_relationship_property(key):
search_node_type, search_key = key.split(self.rel_prop_delimiter)

if self.empty_cell_null and value == '' and not is_parent_pointer(key):
obj[key] = None
key_type = self.schema.get_prop_type(search_node_type, search_key)
if key_type == 'Boolean':
cleaned_value = None
Expand Down
2 changes: 2 additions & 0 deletions icdc_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ def _process_properties(self, desc):
return {PROPERTIES: props, REQUIRED: required, PRIVATE: private}

def get_list_values(self, list_str):
if list_str is None:
return None
return [item.strip() for item in list_str.split(self.delimiter) if item.strip()]
def process_node(self, name, desc, is_relationship=False):
"""
Expand Down
181 changes: 101 additions & 80 deletions loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,19 @@ def process_arguments(args, log):

if args.s3_folder:
config.s3_folder = args.s3_folder
if not config.s3_folder and not os.path.isdir(config.dataset):
log.error('{} is not a directory!'.format(config.dataset))
sys.exit(1)
multiple_datasets = False
dataset = config.dataset
if isinstance(config.dataset, list):
multiple_datasets = True
if not multiple_datasets:
if not config.s3_folder and not os.path.isdir(dataset):
log.error('{} is not a directory!'.format(config.dataset))
sys.exit(1)
elif multiple_datasets and not config.s3_folder:
for subfolder in config.dataset:
if not os.path.isdir(subfolder):
log.error('{} is not a directory!'.format(subfolder))
sys.exit(1)

if args.prop_file:
config.prop_file = args.prop_file
Expand Down Expand Up @@ -181,6 +191,12 @@ def process_arguments(args, log):
log.error('database_type is neither neo4j nor memgraph, abort loading')
sys.exit(1)

if not config.empty_cell_null:
config.empty_cell_null = False
if config.empty_cell_null not in [True, False]:
log.error('empty_cell_null should be a boolean value, abort loading')
sys.exit(1)

if args.database_type:
config.database_type = args.database_type
# Only applies when running in Prefect via loader_prefect.py, which doesn't have config files and temp_foldetemp_folderr
Expand Down Expand Up @@ -220,62 +236,92 @@ def main(args):
mg_connection = None
restore_cmd = ''
load_result = None
list_dataset = False
if isinstance(config.dataset, str):
subfolders = [config.dataset]
else:
subfolders = config.dataset
list_dataset = True
try:
txt_files = glob.glob('{}/*.txt'.format(config.dataset))
tsv_files = glob.glob('{}/*.tsv'.format(config.dataset))
file_list = txt_files + tsv_files
if file_list:
if config.wipe_db and not config.yes:
if not confirm_deletion('Wipe out entire Neo4j database before loading?'):
sys.exit(1)

if config.loading_mode == DELETE_MODE and not config.yes:
if not confirm_deletion('Delete all nodes and child nodes from data file?'):
sys.exit(1)

prop_path = os.path.join(config.dataset, config.prop_file)
if os.path.isfile(prop_path):
props = Props(prop_path)
else:
props = Props(config.prop_file)
schema = ICDC_Schema(config.schema_files, props)
if not config.dry_run or config.loading_mode == DELETE_MODE:
driver = GraphDatabase.driver(
config.neo4j_uri,
auth=(config.neo4j_user, config.neo4j_password),
encrypted=False
)

plugins = []
memgraph_snapshot_dir = None
if len(config.plugins) > 0:
for plugin_config in config.plugins:
plugins.append(prepare_plugin(plugin_config, schema))
if config.memgraph_snapshot_dir:
memgraph_snapshot_dir = config.memgraph_snapshot_dir
loader = DataLoader(driver, schema, config, memgraph_snapshot_dir, plugins)

load_result = loader.load(file_list, config.cheat_mode, config.dry_run, config.loading_mode, config.wipe_db,
config.max_violations, config.temp_folder, config.verbose, split=config.split_transactions,
no_backup=config.no_backup, neo4j_uri=config.neo4j_uri, backup_folder=config.backup_folder, username=config.neo4j_user, password=config.neo4j_password)

if load_result == False:
if loader.validation_result_file_key != "":
zip_file_key = loader.validation_result_file_key.replace(".xlsx", ".zip")
for folder in subfolders:
txt_files = glob.glob('{}/*.txt'.format(folder))
tsv_files = glob.glob('{}/*.tsv'.format(folder))
file_list = txt_files + tsv_files
if file_list:
if config.wipe_db and not config.yes:
if not confirm_deletion('Wipe out entire Neo4j database before loading?'):
sys.exit(1)

if config.loading_mode == DELETE_MODE and not config.yes:
if not confirm_deletion('Delete all nodes and child nodes from data file?'):
sys.exit(1)
prop_path = os.path.join(folder, config.prop_file)
if os.path.isfile(prop_path):
props = Props(prop_path)
else:
props = Props(config.prop_file)
schema = ICDC_Schema(config.schema_files, props)
if not config.dry_run or config.loading_mode == DELETE_MODE:
driver = GraphDatabase.driver(
config.neo4j_uri,
auth=(config.neo4j_user, config.neo4j_password),
encrypted=False
)

plugins = []
memgraph_snapshot_dir = None
if len(config.plugins) > 0:
for plugin_config in config.plugins:
plugins.append(prepare_plugin(plugin_config, schema))
if config.memgraph_snapshot_dir:
memgraph_snapshot_dir = config.memgraph_snapshot_dir
loader = DataLoader(driver, schema, config, memgraph_snapshot_dir, plugins)

load_result = loader.load(file_list, config.cheat_mode, config.dry_run, config.loading_mode, config.wipe_db,
config.max_violations, config.temp_folder, config.verbose, split=config.split_transactions,
no_backup=config.no_backup, neo4j_uri=config.neo4j_uri, backup_folder=config.backup_folder, username=config.neo4j_user, password=config.neo4j_password, empty_cell_null=config.empty_cell_null)

if load_result == False:
if loader.validation_result_file_key != "":
zip_file_key = loader.validation_result_file_key.replace(".xlsx", ".zip")
with zipfile.ZipFile(zip_file_key, 'w') as zipf:
zipf.write(loader.validation_result_file_key, os.path.basename(loader.validation_result_file_key))
zipf.write(log_file, os.path.basename(log_file))
log.error('Data loading failed, validation result zip file was created at {}'.format(zip_file_key))
else:
log.error('Data loading failed')
else:
zip_file_key = log_file.replace(".log", ".zip")
with zipfile.ZipFile(zip_file_key, 'w') as zipf:
zipf.write(loader.validation_result_file_key, os.path.basename(loader.validation_result_file_key))
zipf.write(log_file, os.path.basename(log_file))
log.error('Data loading failed, validation result zip file was created at {}'.format(zip_file_key))
else:
log.error('Data loading failed')
else:
zip_file_key = log_file.replace(".log", ".zip")
with zipfile.ZipFile(zip_file_key, 'w') as zipf:
zipf.write(log_file, os.path.basename(log_file))
log.info('Data loading succeeded, zip file was created at {}'.format(zip_file_key))
log.info('Data loading succeeded, zip file was created at {}'.format(zip_file_key))
log_file = get_log_file()
dest_log_dir = None
#check if uploaded dir is configured
if config.upload_log_dir:
dest_log_dir = config.upload_log_dir
else:
#check if s3 bucket/folder are set.
if config.s3_bucket and config.s3_folder:
dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs'

if dest_log_dir:
try:
if load_result == False:
if loader.validation_result_file_key != "":
upload_log_file(dest_log_dir, zip_file_key)
log.info(f'Uploading validation result zip file {zip_file_key} succeeded!')
else:
upload_log_file(dest_log_dir, zip_file_key)
log.info(f'Uploading validation result zip file {zip_file_key} succeeded!')
# upload_log_file(dest_log_dir, log_file)
log.info(f'Uploading log file {log_file} succeeded!')
except Exception as e:
log.debug(e)
log.exception('Copy file failed! Check debug log for detailed information')

else:
log.info('No files to load.')
else:
log.info('No files to load.')


except ServiceUnavailable:
Expand All @@ -293,31 +339,6 @@ def main(args):
if restore_cmd:
log.info(restore_cmd)

log_file = get_log_file()
dest_log_dir = None
#check if uploaded dir is configured
if config.upload_log_dir:
dest_log_dir = config.upload_log_dir
else:
#check if s3 bucket/folder are set.
if config.s3_bucket and config.s3_folder:
dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs'

if dest_log_dir:
try:
if load_result == False:
if loader.validation_result_file_key != "":
upload_log_file(dest_log_dir, zip_file_key)
log.info(f'Uploading validation result zip file {zip_file_key} succeeded!')
else:
upload_log_file(dest_log_dir, zip_file_key)
log.info(f'Uploading validation result zip file {zip_file_key} succeeded!')
# upload_log_file(dest_log_dir, log_file)
log.info(f'Uploading log file {log_file} succeeded!')
except Exception as e:
log.debug(e)
log.exception('Copy file failed! Check debug log for detailed information')

if load_result == False:
sys.exit(1)

Expand Down
Loading