diff --git a/config.py b/config.py index 54196340..843a68d2 100644 --- a/config.py +++ b/config.py @@ -48,6 +48,7 @@ def __init__(self, config_file): self.database_type = "neo4j" self.plugins = [] self.memgraph_snapshot_dir = None + self.empty_cell_null = False else: if os.path.isfile(config_file): with open(config_file) as c_file: @@ -110,6 +111,7 @@ def __init__(self, config_file): self.verbose = config.get('verbose') self.database_type = config.get("database_type") self.memgraph_snapshot_dir = config.get("memgraph_snapshot_dir") + self.empty_cell_null = config.get("empty_cell_null") else: msg = f'Can NOT open configuration file "{config_file}"!' self.log.error(msg) diff --git a/config/props-cds.yml b/config/props-cds.yml index 88f1dc96..9c01546f 100644 --- a/config/props-cds.yml +++ b/config/props-cds.yml @@ -27,7 +27,7 @@ Properties: TBD: String id_fields: - study: phs_accession + study: study_id participant: study_participant_id diagnosis: study_diagnosis_id treatment: treatment_id diff --git a/data_loader.py b/data_loader.py index d6bd0092..61ba2527 100755 --- a/data_loader.py +++ b/data_loader.py @@ -38,7 +38,7 @@ INT_NODE_CREATED = 'int_node_created' PROVIDED_PARENTS = 'provided_parents' RELATIONSHIP_PROPS = 'relationship_properties' -BATCH_SIZE = 10000 +BATCH_SIZE = 2000 OTHER = '__other__' maxInt = sys.maxsize @@ -179,6 +179,7 @@ def __init__(self, driver, schema, config=None, memgraph_snapshot_dir=None, plug self.df_validation_dict = {} self.skip_validation_flag = False self.cheat_mode = True + self.empty_cell_null = False def check_files(self, file_list): if not file_list: @@ -265,7 +266,7 @@ def validate_files(self, cheat_mode, loading_mode, file_list, max_violations, te return True def load(self, file_list, cheat_mode, dry_run, loading_mode, wipe_db, max_violations, temp_folder, verbose, - split=False, no_backup=True, neo4j_uri=None, backup_folder="/", username=None, password=None): + split=False, no_backup=True, neo4j_uri=None, backup_folder="/", username=None, password=None, empty_cell_null=False): if not self.check_files(file_list): return False start = timer() @@ -306,6 +307,7 @@ def load(self, file_list, cheat_mode, dry_run, loading_mode, wipe_db, max_violat self.nodes_deleted_stat = {} self.relationships_deleted_stat = {} self.cheat_mode = True + self.empty_cell_null = empty_cell_null if not self.driver or not isinstance(self.driver, Driver): self.log.error('Invalid Neo4j Python Driver!') return False @@ -393,7 +395,8 @@ def prepare_node(self, node, file_name): search_node_type, search_key = key.split('.') elif self.schema.is_relationship_property(key): search_node_type, search_key = key.split(self.rel_prop_delimiter) - + if self.empty_cell_null and value == '' and not is_parent_pointer(key): + obj[key] = None key_type = self.schema.get_prop_type(search_node_type, search_key) if key_type == 'Boolean': cleaned_value = None diff --git a/icdc_schema.py b/icdc_schema.py index dd4ad794..6701c729 100644 --- a/icdc_schema.py +++ b/icdc_schema.py @@ -152,6 +152,8 @@ def _process_properties(self, desc): return {PROPERTIES: props, REQUIRED: required, PRIVATE: private} def get_list_values(self, list_str): + if list_str is None: + return None return [item.strip() for item in list_str.split(self.delimiter) if item.strip()] def process_node(self, name, desc, is_relationship=False): """ diff --git a/loader.py b/loader.py index c775a9d0..3e1bf3cf 100755 --- a/loader.py +++ b/loader.py @@ -71,9 +71,19 @@ def process_arguments(args, log): if args.s3_folder: config.s3_folder = args.s3_folder - if not config.s3_folder and not os.path.isdir(config.dataset): - log.error('{} is not a directory!'.format(config.dataset)) - sys.exit(1) + multiple_datasets = False + dataset = config.dataset + if isinstance(config.dataset, list): + multiple_datasets = True + if not multiple_datasets: + if not config.s3_folder and not os.path.isdir(dataset): + log.error('{} is not a directory!'.format(config.dataset)) + sys.exit(1) + elif multiple_datasets and not config.s3_folder: + for subfolder in config.dataset: + if not os.path.isdir(subfolder): + log.error('{} is not a directory!'.format(subfolder)) + sys.exit(1) if args.prop_file: config.prop_file = args.prop_file @@ -181,6 +191,12 @@ def process_arguments(args, log): log.error('database_type is neither neo4j nor memgraph, abort loading') sys.exit(1) + if not config.empty_cell_null: + config.empty_cell_null = False + if config.empty_cell_null not in [True, False]: + log.error('empty_cell_null should be a boolean value, abort loading') + sys.exit(1) + if args.database_type: config.database_type = args.database_type # Only applies when running in Prefect via loader_prefect.py, which doesn't have config files and temp_foldetemp_folderr @@ -220,62 +236,92 @@ def main(args): mg_connection = None restore_cmd = '' load_result = None + list_dataset = False + if isinstance(config.dataset, str): + subfolders = [config.dataset] + else: + subfolders = config.dataset + list_dataset = True try: - txt_files = glob.glob('{}/*.txt'.format(config.dataset)) - tsv_files = glob.glob('{}/*.tsv'.format(config.dataset)) - file_list = txt_files + tsv_files - if file_list: - if config.wipe_db and not config.yes: - if not confirm_deletion('Wipe out entire Neo4j database before loading?'): - sys.exit(1) - - if config.loading_mode == DELETE_MODE and not config.yes: - if not confirm_deletion('Delete all nodes and child nodes from data file?'): - sys.exit(1) - - prop_path = os.path.join(config.dataset, config.prop_file) - if os.path.isfile(prop_path): - props = Props(prop_path) - else: - props = Props(config.prop_file) - schema = ICDC_Schema(config.schema_files, props) - if not config.dry_run or config.loading_mode == DELETE_MODE: - driver = GraphDatabase.driver( - config.neo4j_uri, - auth=(config.neo4j_user, config.neo4j_password), - encrypted=False - ) - - plugins = [] - memgraph_snapshot_dir = None - if len(config.plugins) > 0: - for plugin_config in config.plugins: - plugins.append(prepare_plugin(plugin_config, schema)) - if config.memgraph_snapshot_dir: - memgraph_snapshot_dir = config.memgraph_snapshot_dir - loader = DataLoader(driver, schema, config, memgraph_snapshot_dir, plugins) - - load_result = loader.load(file_list, config.cheat_mode, config.dry_run, config.loading_mode, config.wipe_db, - config.max_violations, config.temp_folder, config.verbose, split=config.split_transactions, - no_backup=config.no_backup, neo4j_uri=config.neo4j_uri, backup_folder=config.backup_folder, username=config.neo4j_user, password=config.neo4j_password) - - if load_result == False: - if loader.validation_result_file_key != "": - zip_file_key = loader.validation_result_file_key.replace(".xlsx", ".zip") + for folder in subfolders: + txt_files = glob.glob('{}/*.txt'.format(folder)) + tsv_files = glob.glob('{}/*.tsv'.format(folder)) + file_list = txt_files + tsv_files + if file_list: + if config.wipe_db and not config.yes: + if not confirm_deletion('Wipe out entire Neo4j database before loading?'): + sys.exit(1) + + if config.loading_mode == DELETE_MODE and not config.yes: + if not confirm_deletion('Delete all nodes and child nodes from data file?'): + sys.exit(1) + prop_path = os.path.join(folder, config.prop_file) + if os.path.isfile(prop_path): + props = Props(prop_path) + else: + props = Props(config.prop_file) + schema = ICDC_Schema(config.schema_files, props) + if not config.dry_run or config.loading_mode == DELETE_MODE: + driver = GraphDatabase.driver( + config.neo4j_uri, + auth=(config.neo4j_user, config.neo4j_password), + encrypted=False + ) + + plugins = [] + memgraph_snapshot_dir = None + if len(config.plugins) > 0: + for plugin_config in config.plugins: + plugins.append(prepare_plugin(plugin_config, schema)) + if config.memgraph_snapshot_dir: + memgraph_snapshot_dir = config.memgraph_snapshot_dir + loader = DataLoader(driver, schema, config, memgraph_snapshot_dir, plugins) + + load_result = loader.load(file_list, config.cheat_mode, config.dry_run, config.loading_mode, config.wipe_db, + config.max_violations, config.temp_folder, config.verbose, split=config.split_transactions, + no_backup=config.no_backup, neo4j_uri=config.neo4j_uri, backup_folder=config.backup_folder, username=config.neo4j_user, password=config.neo4j_password, empty_cell_null=config.empty_cell_null) + + if load_result == False: + if loader.validation_result_file_key != "": + zip_file_key = loader.validation_result_file_key.replace(".xlsx", ".zip") + with zipfile.ZipFile(zip_file_key, 'w') as zipf: + zipf.write(loader.validation_result_file_key, os.path.basename(loader.validation_result_file_key)) + zipf.write(log_file, os.path.basename(log_file)) + log.error('Data loading failed, validation result zip file was created at {}'.format(zip_file_key)) + else: + log.error('Data loading failed') + else: + zip_file_key = log_file.replace(".log", ".zip") with zipfile.ZipFile(zip_file_key, 'w') as zipf: - zipf.write(loader.validation_result_file_key, os.path.basename(loader.validation_result_file_key)) zipf.write(log_file, os.path.basename(log_file)) - log.error('Data loading failed, validation result zip file was created at {}'.format(zip_file_key)) - else: - log.error('Data loading failed') - else: - zip_file_key = log_file.replace(".log", ".zip") - with zipfile.ZipFile(zip_file_key, 'w') as zipf: - zipf.write(log_file, os.path.basename(log_file)) - log.info('Data loading succeeded, zip file was created at {}'.format(zip_file_key)) + log.info('Data loading succeeded, zip file was created at {}'.format(zip_file_key)) + log_file = get_log_file() + dest_log_dir = None + #check if uploaded dir is configured + if config.upload_log_dir: + dest_log_dir = config.upload_log_dir + else: + #check if s3 bucket/folder are set. + if config.s3_bucket and config.s3_folder: + dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs' + + if dest_log_dir: + try: + if load_result == False: + if loader.validation_result_file_key != "": + upload_log_file(dest_log_dir, zip_file_key) + log.info(f'Uploading validation result zip file {zip_file_key} succeeded!') + else: + upload_log_file(dest_log_dir, zip_file_key) + log.info(f'Uploading validation result zip file {zip_file_key} succeeded!') + # upload_log_file(dest_log_dir, log_file) + log.info(f'Uploading log file {log_file} succeeded!') + except Exception as e: + log.debug(e) + log.exception('Copy file failed! Check debug log for detailed information') - else: - log.info('No files to load.') + else: + log.info('No files to load.') except ServiceUnavailable: @@ -293,31 +339,6 @@ def main(args): if restore_cmd: log.info(restore_cmd) - log_file = get_log_file() - dest_log_dir = None - #check if uploaded dir is configured - if config.upload_log_dir: - dest_log_dir = config.upload_log_dir - else: - #check if s3 bucket/folder are set. - if config.s3_bucket and config.s3_folder: - dest_log_dir = f's3://{config.s3_bucket}/{config.s3_folder}/logs' - - if dest_log_dir: - try: - if load_result == False: - if loader.validation_result_file_key != "": - upload_log_file(dest_log_dir, zip_file_key) - log.info(f'Uploading validation result zip file {zip_file_key} succeeded!') - else: - upload_log_file(dest_log_dir, zip_file_key) - log.info(f'Uploading validation result zip file {zip_file_key} succeeded!') - # upload_log_file(dest_log_dir, log_file) - log.info(f'Uploading log file {log_file} succeeded!') - except Exception as e: - log.debug(e) - log.exception('Copy file failed! Check debug log for detailed information') - if load_result == False: sys.exit(1)