Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def test_localityupdate_not_exist(self):
self._assertStatusCodeEqual(response, http.HttpResponseNotFound.status_code)
self.assertEqual(response.content.decode(), f"The localityupdate with task id '{task_id}' was not found")

@patch("specifyweb.specify.views.update_locality_task.AsyncResult")
@patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult")
def test_failed(self, AsyncResult: Mock):
mock_result = Mock()
mock_result.state = CELERY_TASK_STATE.FAILURE
Expand Down Expand Up @@ -70,7 +70,7 @@ def test_failed(self, AsyncResult: Mock):
}
)

@patch("specifyweb.specify.views.update_locality_task.AsyncResult")
@patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult")
def test_parse_failed(self, AsyncResult: Mock):
mock_result = Mock()
mock_result.state = CELERY_TASK_STATE.SUCCESS
Expand Down Expand Up @@ -98,7 +98,7 @@ def test_parse_failed(self, AsyncResult: Mock):
}
)

@patch("specifyweb.specify.views.update_locality_task.AsyncResult")
@patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult")
def test_parsed(self, AsyncResult: Mock):
mock_result = Mock()
mock_result.state = CELERY_TASK_STATE.SUCCESS
Expand Down Expand Up @@ -149,7 +149,7 @@ def test_parsed(self, AsyncResult: Mock):
}
)

@patch("specifyweb.specify.views.update_locality_task.AsyncResult")
@patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult")
def test_succeeded(self, AsyncResult: Mock):
mock_result = Mock()
mock_result.state = LocalityUpdateStatus.SUCCEEDED
Expand Down Expand Up @@ -181,7 +181,7 @@ def test_succeeded(self, AsyncResult: Mock):
}
)

@patch("specifyweb.specify.views.update_locality_task.AsyncResult")
@patch("specifyweb.backend.locality_update_tool.views.update_locality_task.AsyncResult")
def test_succeeded_locality_rows(self, AsyncResult: Mock):
mock_result = Mock()
mock_result.state = LocalityUpdateStatus.SUCCEEDED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def test_no_ui_formatter(self):

self.assertEqual(parsed_with_value, parsed_with_value_result)

@patch("specifyweb.specify.update_locality.get_uiformatter")
@patch("specifyweb.backend.locality_update_tool.update_locality.get_uiformatter")
def test_cnn_formatter(self, get_uiformatter: Mock):

get_uiformatter.return_value = UIFormatter(
Expand Down
16 changes: 12 additions & 4 deletions specifyweb/backend/locality_update_tool/update_locality.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,11 +379,19 @@ def parse_locality_set(collection, raw_headers: list[str], data: list[list[str]]
locality_id: int | None = None if len(
locality_query) != 1 else locality_query[0].id

parsed_locality_fields = [parse_field(
collection, 'Locality', dict['field'], dict['value'], locality_id, row_number) for dict in locality_values if dict['value'].strip() != ""]
parsed_locality_fields = [
parse_field(
collection, 'Locality', d['field'], d['value'], locality_id, row_number
)
for d in locality_values
]

parsed_geocoorddetail_fields = [parse_field(
collection, 'Geocoorddetail', dict["field"], dict['value'], locality_id, row_number) for dict in geocoorddetail_values if dict['value'].strip() != ""]
parsed_geocoorddetail_fields = [
parse_field(
collection, 'Geocoorddetail', d['field'], d['value'], locality_id, row_number
)
for d in geocoorddetail_values
]

parsed_row, parsed_errors = merge_parse_results(
[*parsed_locality_fields, *parsed_geocoorddetail_fields], locality_id, row_number)
Expand Down
86 changes: 63 additions & 23 deletions specifyweb/backend/trees/extras.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,25 +207,67 @@ def close_interval(model, node_number, size):
highestchildnodenumber=F('highestchildnodenumber')-size,
)

def adding_node(node,collection=None, user=None):
def _get_collection_prefs_dict(collection, user) -> dict:
import specifyweb.backend.context.app_resource as app_resource
res = app_resource.get_app_resource(collection, user, 'CollectionPreferences')
if not res:
return {}
collection_prefs_json, _, __ = res
if not collection_prefs_json:
return {}
try:
loaded = json.loads(collection_prefs_json)
return loaded if isinstance(loaded, dict) else {}
except Exception:
return {}

def _expand_synonymization_actions_enabled(collection, user, tree_name: str) -> bool:
"""
New CollectionPreferences shape:
treeManagement.expand_synonymization_actions.<tree_name> = true/false

Backward compat with legacy shape:
treeManagement.synonymized["sp7.allow_adding_child_to_synonymized_parent.<tree_name>"] = true/false
"""
prefs = _get_collection_prefs_dict(collection, user)

tm = prefs.get("treeManagement") or {}
if not isinstance(tm, dict):
return False

# New shape
esa = tm.get("expand_synonymization_actions")
if isinstance(esa, dict) and tree_name in esa:
return bool(esa.get(tree_name))

# Legacy shape
syn = tm.get("synonymized")
if isinstance(syn, dict):
legacy_key = f"sp7.allow_adding_child_to_synonymized_parent.{tree_name}"
if legacy_key in syn:
return bool(syn.get(legacy_key))

# Default if nothing set
return False

def adding_node(node, collection=None, user=None):
logger.info('adding node %s', node)
model = type(node)
parent = model.objects.select_for_update().get(id=node.parent.id)

if parent.accepted_id is not None:
collection_prefs_json, _, __ = app_resource.get_app_resource(collection, user, 'CollectionPreferences')
if collection_prefs_json is not None:
collection_prefs_dict = json.loads(collection_prefs_json)
collection_prefs_dict = _get_collection_prefs_dict(collection, user)

treeManagement_pref = collection_prefs_dict.get('treeManagement', {})
treeManagement_pref = treeManagement_pref if isinstance(treeManagement_pref, dict) else {}

synonymized = treeManagement_pref.get('synonymized', {}) \
if isinstance(treeManagement_pref, dict) else {}
synonymized = treeManagement_pref.get('synonymized', {})
synonymized = synonymized if isinstance(synonymized, dict) else {}

add_synonym_enabled = synonymized.get(r'^sp7\.allow_adding_child_to_synonymized_parent\.' + node.specify_model.name + '=(.+)', False) if isinstance(synonymized, dict) else False
tree_name = node.specify_model.name
add_synonym_enabled = _expand_synonymization_actions_enabled(collection, user, tree_name)

if add_synonym_enabled is True:
if add_synonym_enabled is False:
raise TreeBusinessRuleException(
f'Adding node "{node.fullname}" to synonymized parent "{parent.fullname}"',
{"tree" : "Taxon",
Expand All @@ -245,7 +287,6 @@ def adding_node(node,collection=None, user=None):
"parentid": parent.parent.id,
"children": list(parent.children.values('id', 'fullname'))
}})

insertion_point = open_interval(model, parent.nodenumber, 1)
node.highestchildnodenumber = node.nodenumber = insertion_point

Expand Down Expand Up @@ -410,36 +451,35 @@ def synonymize(node, into, agent, user=None, collection=None):
node.save()

# This check can be disabled by a remote pref
import specifyweb.backend.context.app_resource as app_resource
collection_prefs_json, _, __ = app_resource.get_app_resource(collection, user, 'CollectionPreferences')
if collection_prefs_json is not None:
collection_prefs_dict = json.loads(collection_prefs_json)
collection_prefs_dict = _get_collection_prefs_dict(collection, user)

treeManagement_pref = collection_prefs_dict.get('treeManagement', {})
treeManagement_pref = treeManagement_pref if isinstance(treeManagement_pref, dict) else {}

synonymized = treeManagement_pref.get('synonymized', {}) \
if isinstance(treeManagement_pref, dict) else {}
synonymized = treeManagement_pref.get('synonymized', {})
synonymized = synonymized if isinstance(synonymized, dict) else {}

add_synonym_enabled = synonymized.get(r'^sp7\.allow_adding_child_to_synonymized_parent\.' + node.specify_model.name + '=(.+)', False) if isinstance(synonymized, dict) else False
pref_key = f"sp7.allow_adding_child_to_synonymized_parent.{node.specify_model.name}"
add_synonym_enabled = bool(synonymized.get(pref_key, False))

if node.children.count() > 0 and (add_synonym_enabled is True):
if (add_synonym_enabled is False) and node.children.exists():
raise TreeBusinessRuleException(
f'Synonymizing node "{node.fullname}" which has children',
{"tree" : "Taxon",
"localizationKey" : "nodeSynonimizeWithChildren",
"node" : {
"localizationKey" : "nodeSynonimizeWithChildren",
"node" : {
"id" : node.id,
"rankid" : node.rankid,
"fullName" : node.fullname,
"children": list(node.children.values('id', 'fullname'))
},
"parent" : {
},
"parent" : {
"id" : into.id,
"rankid" : into.rankid,
"fullName" : into.fullname,
"parentid": into.parent.id,
"children": list(into.children.values('id', 'fullname'))
}})
}})
node.acceptedchildren.update(**{node.accepted_id_attr().replace('_id', ''): target})
#assuming synonym can't be synonymized
field_change_infos = [
Expand Down Expand Up @@ -840,4 +880,4 @@ def tree_path_expr(tbl: str, d: int) -> str: # replace path_expr if ordering iss
from specifyweb.specify.models import datamodel, Sptasksemaphore
tree_model = datamodel.get_table(table)
tasknames = [name.format(tree_model.name) for name in ("UpdateNodes{}", "BadNodes{}")]
Sptasksemaphore.objects.filter(taskname__in=tasknames).update(islocked=False)
Sptasksemaphore.objects.filter(taskname__in=tasknames).update(islocked=False)