Skip to content
235 changes: 231 additions & 4 deletions application/cmd/cre_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@
logger.setLevel(logging.INFO)

app = None
# Base URL of the upstream opencre.org REST API; overridable via the
# CRE_UPSTREAM_API_URL environment variable (see _upstream_api_url).
DEFAULT_UPSTREAM_API_URL = "https://opencre.org/rest/v1"
# Per-request timeout (seconds) applied to every upstream-sync HTTP call.
UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS = 30
# Env var capping how many missing map-analysis pairs a single sync run may attempt.
UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV = "CRE_UPSTREAM_SYNC_MAX_MAP_ANALYSIS_PAIRS"
# Default cap used when the env var above is unset or invalid (0 means "no limit").
DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS = 25


def register_node(node: defs.Node, collection: db.Node_collection) -> db.Node:
Expand Down Expand Up @@ -461,14 +465,224 @@ def review_from_spreadsheet(cache: str, spreadsheet_url: str, share_with: str) -
# logger.info("A spreadsheet view is at %s" % sheet_url)


def _upstream_api_url() -> str:
    """Resolve the upstream API base URL, preferring the CRE_UPSTREAM_API_URL env var."""
    configured = os.environ.get("CRE_UPSTREAM_API_URL", DEFAULT_UPSTREAM_API_URL)
    # Strip trailing slashes so callers can safely append "/endpoint" segments.
    return configured.rstrip("/")


def _fetch_upstream_json(
    url: str,
    *,
    context: str,
    params: List[Tuple[str, str]] | None = None,
    expected_type: type | None = None,
) -> Any | None:
    """Fetch *url* and return its decoded JSON payload, or None on any failure.

    Network errors, non-200 statuses, malformed JSON, and payloads of an
    unexpected type are all logged and swallowed so callers can treat the
    upstream sync as strictly best-effort. *context* is used only in log
    messages to identify what was being fetched.
    """
    try:
        resp = requests.get(
            url,
            params=params,
            timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
        )
    except requests.RequestException as exc:
        logger.warning("Could not fetch %s from upstream: %s", context, exc)
        return None

    if resp.status_code != 200:
        logger.info(
            "Skipping %s from upstream (status=%s)",
            context,
            resp.status_code,
        )
        return None

    try:
        payload = resp.json()
    except ValueError:
        logger.warning("Skipping %s due to invalid JSON payload", context)
        return None

    # No type expectation, or the payload matches it: hand it back as-is.
    if expected_type is None or isinstance(payload, expected_type):
        return payload

    logger.warning(
        "Skipping %s because upstream returned %s instead of %s",
        context,
        type(payload).__name__,
        expected_type.__name__,
    )
    return None


def _progressively_sync_weak_links_for_pair(
    collection: db.Node_collection,
    upstream_api_url: str,
    base_standard: str,
    compare_standard: str,
    result_payload: Dict[str, Any],
) -> Tuple[int, int]:
    """Pull weak-link results for one (base, compare) standard pair from upstream.

    Iterates the pair's map-analysis result payload and, for every entry whose
    "extra" count is positive and whose weak-link result is not already cached
    locally, fetches and stores the upstream weak-link result.

    Returns a (attempted, synced) tuple of counts.
    """
    attempted = 0
    synced = 0

    for key, entry in result_payload.items():
        # Skip malformed entries defensively; upstream payloads are untrusted.
        if not (isinstance(key, str) and isinstance(entry, dict)):
            continue

        raw_extra = entry.get("extra")
        try:
            extra_count = 0 if raw_extra is None else int(raw_extra)
        except (TypeError, ValueError):
            extra_count = 0
        if extra_count <= 0:
            continue

        cache_key = gap_analysis.make_subresources_key(
            standards=[base_standard, compare_standard], key=key
        )
        if collection.gap_analysis_exists(cache_key):
            continue

        attempted += 1
        payload = _fetch_upstream_json(
            f"{upstream_api_url}/map_analysis_weak_links",
            context=f"weak links for {base_standard} >> {compare_standard} (key={key})",
            params=[
                ("standard", base_standard),
                ("standard", compare_standard),
                ("key", key),
            ],
            expected_type=dict,
        )
        if payload is None or payload.get("result") is None:
            continue

        collection.add_gap_analysis_result(
            cache_key=cache_key,
            ga_object=json.dumps({"result": payload.get("result")}),
        )
        synced += 1

    return attempted, synced


def _map_analysis_pair_limit() -> int:
    """Read the per-run cap on missing map-analysis pair attempts from the env.

    Falls back to DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS when the
    variable is unset, non-integer, or negative. A value of 0 means "no limit".
    """
    raw = os.environ.get(
        UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
        str(DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS),
    )
    try:
        limit = int(raw)
    except ValueError:
        logger.warning(
            "%s should be an integer, got '%s'. Falling back to default limit %s.",
            UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
            raw,
            DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS,
        )
        return DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS
    if limit < 0:
        logger.warning(
            "%s should not be negative, got '%s'. Falling back to default limit %s.",
            UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
            raw,
            DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS,
        )
        return DEFAULT_UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS
    return limit


def _progressively_sync_gap_analysis_from_upstream(
    collection: db.Node_collection, upstream_api_url: str
) -> None:
    """Best-effort sync of missing map-analysis results from upstream.

    Fetches the upstream standards list, then for every ordered pair of
    distinct standards whose map-analysis result is not already cached
    locally, downloads and stores the result (plus any associated weak-link
    results). At most _map_analysis_pair_limit() missing pairs are attempted
    per run (0 = unlimited), so repeated runs progressively fill the cache.

    All failures are logged and skipped; this function never raises for
    upstream/network problems.
    """
    max_pairs = _map_analysis_pair_limit()

    standards = _fetch_upstream_json(
        f"{upstream_api_url}/standards",
        context="standards list for progressive map analysis sync",
        expected_type=list,
    )
    if standards is None:
        return
    # Keep only string entries and de-duplicate while preserving order.
    standards = [standard for standard in standards if isinstance(standard, str)]
    standards = list(dict.fromkeys(standards))

    total_pairs = len(standards) * (len(standards) - 1)
    if total_pairs == 0:
        logger.info("No standard pairs found for progressive map analysis sync")
        return

    logger.info(
        "Starting progressive map analysis sync for up to %s missing pair attempt(s) out of %s total",
        max_pairs if max_pairs else "all",
        total_pairs,
    )

    attempted_pairs = 0
    synced_pairs = 0
    weak_links_attempted = 0
    weak_links_synced = 0

    for standard_a in standards:
        for standard_b in standards:
            if standard_a == standard_b:
                continue

            # Pairs already cached locally do not count against the limit.
            cache_key = gap_analysis.make_resources_key([standard_a, standard_b])
            if collection.gap_analysis_exists(cache_key):
                continue

            if max_pairs and attempted_pairs >= max_pairs:
                logger.info(
                    "Reached %s=%s after attempting %s missing pair(s), stopping early",
                    UPSTREAM_SYNC_MAP_ANALYSIS_MAX_PAIRS_ENV,
                    max_pairs,
                    attempted_pairs,
                )
                return

            attempted_pairs += 1
            payload = _fetch_upstream_json(
                f"{upstream_api_url}/map_analysis",
                context=f"map analysis for {standard_a} >> {standard_b}",
                params=[("standard", standard_a), ("standard", standard_b)],
                expected_type=dict,
            )
            if payload is None or payload.get("result") is None:
                continue

            collection.add_gap_analysis_result(
                cache_key=cache_key,
                ga_object=json.dumps({"result": payload.get("result")}),
            )
            synced_pairs += 1

            weak_attempted, weak_synced = _progressively_sync_weak_links_for_pair(
                collection=collection,
                upstream_api_url=upstream_api_url,
                base_standard=standard_a,
                compare_standard=standard_b,
                result_payload=payload.get("result"),
            )
            weak_links_attempted += weak_attempted
            weak_links_synced += weak_synced

            # Periodic progress marker for long-running syncs.
            if synced_pairs % 25 == 0:
                logger.info(
                    "Progressive map analysis sync: synced %s pair(s) so far",
                    synced_pairs,
                )

    logger.info(
        "Progressive map analysis sync complete. Attempted %s missing pair(s), synced %s pair(s), attempted %s weak-link result(s), synced %s weak-link result(s)",
        attempted_pairs,
        synced_pairs,
        weak_links_attempted,
        weak_links_synced,
    )


def download_graph_from_upstream(cache: str) -> None:
imported_cres = {}
collection = db_connect(path=cache).with_graph()
upstream_api_url = _upstream_api_url()

def download_cre_from_upstream(creid: str):
cre_response = requests.get(
os.environ.get("CRE_UPSTREAM_API_URL", "https://opencre.org/rest/v1")
+ f"/id/{creid}"
f"{upstream_api_url}/id/{creid}",
timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
if cre_response.status_code != 200:
raise RuntimeError(
Expand All @@ -487,8 +701,8 @@ def download_cre_from_upstream(creid: str):
download_cre_from_upstream(link.document.id)

root_cres_response = requests.get(
os.environ.get("CRE_UPSTREAM_API_URL", "https://opencre.org/rest/v1")
+ "/root_cres"
f"{upstream_api_url}/root_cres",
timeout=UPSTREAM_SYNC_REQUEST_TIMEOUT_SECONDS,
)
if root_cres_response.status_code != 200:
raise RuntimeError(
Expand All @@ -503,6 +717,19 @@ def download_cre_from_upstream(creid: str):
if link.document.doctype == defs.Credoctypes.CRE:
download_cre_from_upstream(link.document.id)

if not os.environ.get("CRE_NO_NEO4J"):
try:
populate_neo4j_db(cache)
except Exception as exc:
logger.warning(
"Could not populate local neo4j DB during upstream sync: %s", exc
)

_progressively_sync_gap_analysis_from_upstream(
collection=collection,
upstream_api_url=upstream_api_url,
)


# def review_from_disk(cache: str, cre_file_loc: str, share_with: str) -> None:
# """--review --cre_loc <path>
Expand Down
Loading
Loading