diff --git a/specifyweb/backend/locality_update_tool/tests/test_localityupdate_status.py b/specifyweb/backend/locality_update_tool/tests/test_localityupdate_status.py index acad98e50a6..5b6fceb161f 100644 --- a/specifyweb/backend/locality_update_tool/tests/test_localityupdate_status.py +++ b/specifyweb/backend/locality_update_tool/tests/test_localityupdate_status.py @@ -36,7 +36,7 @@ def test_localityupdate_not_exist(self): self._assertStatusCodeEqual(response, http.HttpResponseNotFound.status_code) self.assertEqual(response.content.decode(), f"The localityupdate with task id '{task_id}' was not found") - @patch("specifyweb.specify.views.update_locality_task.AsyncResult") + @patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult") def test_failed(self, AsyncResult: Mock): mock_result = Mock() mock_result.state = CELERY_TASK_STATE.FAILURE @@ -70,7 +70,7 @@ def test_failed(self, AsyncResult: Mock): } ) - @patch("specifyweb.specify.views.update_locality_task.AsyncResult") + @patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult") def test_parse_failed(self, AsyncResult: Mock): mock_result = Mock() mock_result.state = CELERY_TASK_STATE.SUCCESS @@ -98,7 +98,7 @@ def test_parse_failed(self, AsyncResult: Mock): } ) - @patch("specifyweb.specify.views.update_locality_task.AsyncResult") + @patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult") def test_parsed(self, AsyncResult: Mock): mock_result = Mock() mock_result.state = CELERY_TASK_STATE.SUCCESS @@ -149,7 +149,7 @@ def test_parsed(self, AsyncResult: Mock): } ) - @patch("specifyweb.specify.views.update_locality_task.AsyncResult") + @patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult") def test_succeeded(self, AsyncResult: Mock): mock_result = Mock() mock_result.state = LocalityUpdateStatus.SUCCEEDED @@ -181,7 +181,7 @@ def test_succeeded(self, AsyncResult: Mock): } ) - @patch("specifyweb.specify.views.update_locality_task.AsyncResult") + @patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult") def test_succeeded_locality_rows(self, AsyncResult: Mock): mock_result = Mock() mock_result.state = LocalityUpdateStatus.SUCCEEDED diff --git a/specifyweb/backend/locality_update_tool/tests/test_parse_field.py b/specifyweb/backend/locality_update_tool/tests/test_parse_field.py index 02e7f447433..3e512bb41df 100644 --- a/specifyweb/backend/locality_update_tool/tests/test_parse_field.py +++ b/specifyweb/backend/locality_update_tool/tests/test_parse_field.py @@ -33,7 +33,7 @@ def test_no_ui_formatter(self): self.assertEqual(parsed_with_value, parsed_with_value_result) - @patch("specifyweb.specify.update_locality.get_uiformatter") + @patch("specifyweb.backend.locality_update_tool.update_locality.get_uiformatter") def test_cnn_formatter(self, get_uiformatter: Mock): get_uiformatter.return_value = UIFormatter( diff --git a/specifyweb/backend/locality_update_tool/update_locality.py b/specifyweb/backend/locality_update_tool/update_locality.py index ae1873f1631..1fe2985daf8 100644 --- a/specifyweb/backend/locality_update_tool/update_locality.py +++ b/specifyweb/backend/locality_update_tool/update_locality.py @@ -379,11 +379,19 @@ def parse_locality_set(collection, raw_headers: list[str], data: list[list[str]] locality_id: int | None = None if len( locality_query) != 1 else locality_query[0].id - parsed_locality_fields = [parse_field( - collection, 'Locality', dict['field'], dict['value'], locality_id, row_number) for dict in locality_values if dict['value'].strip() != ""] + parsed_locality_fields = [ + parse_field( + collection, 'Locality', d['field'], d['value'], locality_id, row_number + ) + for d in locality_values + ] - parsed_geocoorddetail_fields = [parse_field( - collection, 'Geocoorddetail', dict["field"], dict['value'], locality_id, row_number) for dict in geocoorddetail_values if dict['value'].strip() != ""] + parsed_geocoorddetail_fields = [ + parse_field( + collection, 'Geocoorddetail', d['field'], d['value'], locality_id, row_number + ) + for d in geocoorddetail_values + ] parsed_row, parsed_errors = merge_parse_results( [*parsed_locality_fields, *parsed_geocoorddetail_fields], locality_id, row_number) diff --git a/specifyweb/backend/trees/extras.py b/specifyweb/backend/trees/extras.py index 52d16aec7d2..27a3342dc2e 100644 --- a/specifyweb/backend/trees/extras.py +++ b/specifyweb/backend/trees/extras.py @@ -207,25 +207,67 @@ def close_interval(model, node_number, size): highestchildnodenumber=F('highestchildnodenumber')-size, ) -def adding_node(node,collection=None, user=None): +def _get_collection_prefs_dict(collection, user) -> dict: import specifyweb.backend.context.app_resource as app_resource + res = app_resource.get_app_resource(collection, user, 'CollectionPreferences') + if not res: + return {} + collection_prefs_json, _, __ = res + if not collection_prefs_json: + return {} + try: + loaded = json.loads(collection_prefs_json) + return loaded if isinstance(loaded, dict) else {} + except Exception: + return {} + +def _expand_synonymization_actions_enabled(collection, user, tree_name: str) -> bool: + """ + New CollectionPreferences shape: + treeManagement.expand_synonymization_actions. = true/false + + Backward compat with legacy shape: + treeManagement.synonymized["sp7.allow_adding_child_to_synonymized_parent."] = true/false + """ + prefs = _get_collection_prefs_dict(collection, user) + + tm = prefs.get("treeManagement") or {} + if not isinstance(tm, dict): + return False + + # New shape + esa = tm.get("expand_synonymization_actions") + if isinstance(esa, dict) and tree_name in esa: + return bool(esa.get(tree_name)) + + # Legacy shape + syn = tm.get("synonymized") + if isinstance(syn, dict): + legacy_key = f"sp7.allow_adding_child_to_synonymized_parent.{tree_name}" + if legacy_key in syn: + return bool(syn.get(legacy_key)) + + # Default if nothing set + return False + +def adding_node(node, collection=None, user=None): logger.info('adding node %s', node) model = type(node) parent = model.objects.select_for_update().get(id=node.parent.id) if parent.accepted_id is not None: - collection_prefs_json, _, __ = app_resource.get_app_resource(collection, user, 'CollectionPreferences') - if collection_prefs_json is not None: - collection_prefs_dict = json.loads(collection_prefs_json) + collection_prefs_dict = _get_collection_prefs_dict(collection, user) treeManagement_pref = collection_prefs_dict.get('treeManagement', {}) + treeManagement_pref = treeManagement_pref if isinstance(treeManagement_pref, dict) else {} - synonymized = treeManagement_pref.get('synonymized', {}) \ - if isinstance(treeManagement_pref, dict) else {} + synonymized = treeManagement_pref.get('synonymized', {}) + synonymized = synonymized if isinstance(synonymized, dict) else {} - add_synonym_enabled = synonymized.get(r'^sp7\.allow_adding_child_to_synonymized_parent\.' + node.specify_model.name + '=(.+)', False) if isinstance(synonymized, dict) else False + tree_name = node.specify_model.name + add_synonym_enabled = _expand_synonymization_actions_enabled(collection, user, tree_name) - if add_synonym_enabled is True: + if add_synonym_enabled is False: raise TreeBusinessRuleException( f'Adding node "{node.fullname}" to synonymized parent "{parent.fullname}"', {"tree" : "Taxon", @@ -245,7 +287,6 @@ def adding_node(node,collection=None, user=None): "parentid": parent.parent.id, "children": list(parent.children.values('id', 'fullname')) }}) - insertion_point = open_interval(model, parent.nodenumber, 1) node.highestchildnodenumber = node.nodenumber = insertion_point @@ -410,36 +451,35 @@ def synonymize(node, into, agent, user=None, collection=None): node.save() # This check can be disabled by a remote pref - import specifyweb.backend.context.app_resource as app_resource - collection_prefs_json, _, __ = app_resource.get_app_resource(collection, user, 'CollectionPreferences') - if collection_prefs_json is not None: - collection_prefs_dict = json.loads(collection_prefs_json) + collection_prefs_dict = _get_collection_prefs_dict(collection, user) treeManagement_pref = collection_prefs_dict.get('treeManagement', {}) + treeManagement_pref = treeManagement_pref if isinstance(treeManagement_pref, dict) else {} - synonymized = treeManagement_pref.get('synonymized', {}) \ - if isinstance(treeManagement_pref, dict) else {} + synonymized = treeManagement_pref.get('synonymized', {}) + synonymized = synonymized if isinstance(synonymized, dict) else {} - add_synonym_enabled = synonymized.get(r'^sp7\.allow_adding_child_to_synonymized_parent\.' + node.specify_model.name + '=(.+)', False) if isinstance(synonymized, dict) else False + pref_key = f"sp7.allow_adding_child_to_synonymized_parent.{node.specify_model.name}" + add_synonym_enabled = bool(synonymized.get(pref_key, False)) - if node.children.count() > 0 and (add_synonym_enabled is True): + if (add_synonym_enabled is False) and node.children.exists(): raise TreeBusinessRuleException( f'Synonymizing node "{node.fullname}" which has children', {"tree" : "Taxon", - "localizationKey" : "nodeSynonimizeWithChildren", - "node" : { + "localizationKey" : "nodeSynonimizeWithChildren", + "node" : { "id" : node.id, "rankid" : node.rankid, "fullName" : node.fullname, "children": list(node.children.values('id', 'fullname')) - }, - "parent" : { + }, + "parent" : { "id" : into.id, "rankid" : into.rankid, "fullName" : into.fullname, "parentid": into.parent.id, "children": list(into.children.values('id', 'fullname')) - }}) + }}) node.acceptedchildren.update(**{node.accepted_id_attr().replace('_id', ''): target}) #assuming synonym can't be synonymized field_change_infos = [ @@ -840,4 +880,4 @@ def tree_path_expr(tbl: str, d: int) -> str: # replace path_expr if ordering iss from specifyweb.specify.models import datamodel, Sptasksemaphore tree_model = datamodel.get_table(table) tasknames = [name.format(tree_model.name) for name in ("UpdateNodes{}", "BadNodes{}")] - Sptasksemaphore.objects.filter(taskname__in=tasknames).update(islocked=False) + Sptasksemaphore.objects.filter(taskname__in=tasknames).update(islocked=False) \ No newline at end of file