Skip to content
9 changes: 5 additions & 4 deletions compass/scripts/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,13 +687,14 @@ async def filter_ordinance_docs(
model_configs[LLMTasks.DEFAULT],
),
)
sources_as_str = "\n\t- ".join(
[doc.attrs.get("source", "Unknown source") for doc in docs]
)
logger.info(
"%d document(s) remaining after jurisdiction filter for %s\n\t- %s",
"%d document(s) remaining after jurisdiction filter for %s %s",
len(docs),
jurisdiction.full_name,
"\n\t- ".join(
[doc.attrs.get("source", "Unknown source") for doc in docs]
),
f"\n\t- {sources_as_str}" if sources_as_str else "",
)

COMPASS_PB.update_jurisdiction_task(
Expand Down
100 changes: 98 additions & 2 deletions compass/utilities/jurisdictions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import logging
from warnings import warn
import importlib.resources
from functools import cached_property
from functools import cached_property, lru_cache
import unicodedata

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -125,6 +126,28 @@ def full_name_the_prefixed(self):

return self.full_name

@cached_property
def short_name_with_state(self):
    """str: Short jurisdiction phrase followed by the state name

    The subdivision phrase is used when the jurisdiction has a
    subdivision name; otherwise the county phrase is used. Empty
    components are dropped and the rest are joined with ``", "``.
    """
    if self.subdivision_name:
        local_phrase = self.full_subdivision_phrase
    else:
        local_phrase = self.full_county_phrase

    # Skip falsy pieces so no leading/trailing comma is produced
    return ", ".join(part for part in (local_phrase, self.state) if part)

@cached_property
def short_name_with_state_the_prefixed(self):
    """str: Short name with state, prefixed with ``the`` if needed

    The ``the`` prefix is only added for jurisdictions that have a
    subdivision name and whose casefolded type is listed in
    ``_JURISDICTION_TYPES_AS_PREFIXES``.
    """
    # Short-circuit so the type is only inspected for subdivisions
    needs_article = bool(self.subdivision_name) and (
        self.type.casefold() in _JURISDICTION_TYPES_AS_PREFIXES
    )
    if needs_article:
        return f"the {self.short_name_with_state}"

    return self.short_name_with_state

@cached_property
def full_subdivision_phrase(self):
"""str: Subdivision phrase for the jurisdiction or empty str"""
Expand Down Expand Up @@ -189,8 +212,16 @@ def load_all_jurisdiction_info():
Notes
-----
Missing values are normalized to ``None`` to simplify downstream
serialization.
serialization. A shallow copy is returned on each call so callers
can safely mutate their local view without affecting the cached
source data.
"""
return _load_all_jurisdiction_info_cached().copy()


@lru_cache(maxsize=1)
def _load_all_jurisdiction_info_cached():
"""Load and cache canonical jurisdiction metadata"""
return pd.concat(
pd.read_csv(fp).replace({np.nan: None})
for fp in KNOWN_JURISDICTIONS_REGISTRY
Expand Down Expand Up @@ -227,6 +258,62 @@ def jurisdiction_websites(jurisdiction_info=None):
}


def load_jurisdictions_from_subdivision_names(
subdivision_names, state=None, jurisdiction_info=None
):
"""Load known jurisdictions matching subdivision names

Parameters
----------
subdivision_names : str or iterable of str
One or more subdivision names to match against the canonical
jurisdiction data.
state : str, optional
Optional state name used to filter matching jurisdictions.
Matching uses the same normalized comparison applied to the
subdivision names. By default, ``None``.
jurisdiction_info : pandas.DataFrame, optional
DataFrame containing jurisdiction info. If ``None``, this info
is loaded using :func:`load_all_jurisdiction_info`. By default,
``None``.

Returns
-------
pandas.DataFrame
Rows from the canonical jurisdiction data whose subdivision
names match the requested names.
"""
if jurisdiction_info is None:
jurisdiction_info = load_all_jurisdiction_info()

if subdivision_names is None:
subdivision_names = []
elif isinstance(subdivision_names, str):
subdivision_names = [subdivision_names]

normalized_names = {
normalized_name
for name in subdivision_names
if (normalized_name := _normalize_jurisdiction_name(name))
}
if not normalized_names:
return jurisdiction_info.iloc[0:0].copy() # empty df

subdivision_mask = (
jurisdiction_info["Subdivision"]
.map(_normalize_jurisdiction_name)
.isin(normalized_names)
)
if state is not None:
normalized_state = _normalize_jurisdiction_name(state)
subdivision_mask &= (
jurisdiction_info["State"].map(_normalize_jurisdiction_name)
== normalized_state
)

return jurisdiction_info[subdivision_mask].reset_index(drop=True)


def load_jurisdictions_from_fp(jurisdiction_fp):
"""Load jurisdiction metadata for entries listed in a CSV file

Expand Down Expand Up @@ -395,3 +482,12 @@ def _format_jurisdiction_df_for_output(df):
def _build_merge_col(row, merge_cols):
"""Build column to merge jurisdiction DataFrames on"""
return " ".join(str(row[c]).casefold() for c in merge_cols)


def _normalize_jurisdiction_name(name):
"""Normalize jurisdiction names for resilient comparisons"""
if name is None or pd.isna(name):
return ""

normalized_name = unicodedata.normalize("NFKD", str(name).casefold())
return "".join(char for char in normalized_name if char.isalnum())
51 changes: 35 additions & 16 deletions compass/validation/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
llm_response_starts_with_yes,
llm_response_starts_with_no,
)
from compass.utilities.jurisdictions import (
load_jurisdictions_from_subdivision_names,
)


def setup_graph_correct_document_type(**kwargs):
Expand Down Expand Up @@ -193,6 +196,10 @@ def setup_graph_correct_document_type(**kwargs):
"(e.g., 'Appendix,' 'Form,' or 'Application Template') as indicators "
"of an unfinished draft. Many finalized ordinances and regulations "
"include such templates for public or administrative use.\n"
"* Do **not** treat blank fields, sequences of underscores, blank "
"lines, or other similar placeholders for **dates** or **signatures** "
"as the sole indicator of draft status. Other signals must be present "
"in order to treat the document as a draft.\n"
"\nFocus instead on signs of incompleteness or active "
"editing, such as (but not limited to):\n"
'* explicit labels: "DRAFT", "DRAFT VERSION", "NOT FINAL", "FOR '
Expand Down Expand Up @@ -314,6 +321,7 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
),
)

jur_name = jurisdiction.full_name_the_prefixed
names_we_want = _jurisdiction_names_to_extract(jurisdiction)

G.add_edge("init", "has_name", condition=llm_response_starts_with_yes)
Expand Down Expand Up @@ -355,15 +363,14 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
prompt=(
"Based on the legal text, is there clear and specific "
"evidence that the ordinance applies specifically to "
f"**{jurisdiction.full_name_the_prefixed}**? This could "
f"**{jur_name}**? This could "
f"include a direct mention of **{jurisdiction.state}**, a "
"title, heading, or citation indicating it's an ordinance for "
f"{jurisdiction.state} state, or other language that "
f"reasonably ties the text to {jurisdiction.full_name} "
"specifically. Generic references such as 'the state' or "
"'State Zoning Administrator' are not sufficient on their own "
"unless clearly linked to "
f"{jurisdiction.full_name_the_prefixed}. "
f"unless clearly linked to {jur_name}. "
"{YES_NO_PROMPT}"
),
)
Expand Down Expand Up @@ -409,7 +416,7 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
prompt=(
"Based on the legal text, is there clear and specific "
"evidence that the ordinance applies specifically to "
f"**{jurisdiction.full_name_the_prefixed}**? This could "
f"**{jur_name}**? This could "
f"include a direct mention of **{jurisdiction.county}**, "
"a title, heading, or citation indicating it's an "
f"ordinance for {jurisdiction.county} "
Expand All @@ -419,7 +426,7 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
f"{jurisdiction.type.casefold()}' or "
f"'{jurisdiction.type} Zoning Administrator' are not "
"sufficient on their own unless clearly linked to "
f"{jurisdiction.full_name_the_prefixed}. "
f"{jur_name}. "
"{YES_NO_PROMPT}"
),
)
Expand All @@ -431,9 +438,6 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
node_to_connect = "is_county"

if jurisdiction.subdivision_name:
# TODO: check known jurisdictions to see if duplicate names
# exist in the same state. If not, don't include county name in
# phrase
G.add_edge(
node_to_connect,
"is_subdivision",
Expand Down Expand Up @@ -466,23 +470,30 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
"has_subdivision_name",
condition=llm_response_starts_with_yes,
)

num_other_jurisdictions_with_same_name = len(
load_jurisdictions_from_subdivision_names(
jurisdiction.subdivision_name, state=jurisdiction.state
)
)
if num_other_jurisdictions_with_same_name > 1:
jur_name = jurisdiction.short_name_with_state_the_prefixed

Comment thread
ppinchuk marked this conversation as resolved.
G.add_node(
"has_subdivision_name",
prompt=(
"Based on the legal text, is there clear and specific "
"evidence that the ordinance applies specifically to "
f"**{jurisdiction.full_name_the_prefixed}**? This could "
"include a direct mention of "
f"**{jur_name}**? This could include a direct mention of "
f"**{jurisdiction.subdivision_name}**, "
"a title, heading, or citation indicating it's an ordinance "
f"for {jurisdiction.full_subdivision_phrase_the_prefixed}, "
"or other language that reasonably ties the text to "
f"{jurisdiction.full_name_the_prefixed} specifically. "
"Generic references such as 'the "
f"{jur_name} specifically. Generic references such as 'the "
f"{jurisdiction.type.casefold()}' or "
f"'{jurisdiction.type} Zoning Administrator' are not "
"sufficient on their own unless clearly linked to "
f"{jurisdiction.full_name_the_prefixed}. "
f"{jur_name}. "
"{YES_NO_PROMPT}"
),
)
Expand All @@ -498,8 +509,7 @@ def setup_graph_correct_jurisdiction_type(jurisdiction, **kwargs):
"'correct_jurisdiction' key should be a boolean that is set to "
"`true` **only if** it is reasonable to conclude that the "
"provisions within apply to the entire area (i.e. "
f"{jurisdiction.type.casefold()}-wide) governed by "
f"**{jurisdiction.full_name_the_prefixed}** "
f"{jurisdiction.type.casefold()}-wide) governed by **{jur_name}** "
"(`false` otherwise). The value of the 'explanation' key should "
"be a string containing a brief explanation for your choice. "
),
Expand Down Expand Up @@ -551,7 +561,16 @@ def setup_graph_correct_jurisdiction_from_url(jurisdiction, **kwargs):
node_to_connect = "init"
keys_to_collect = {"correct_state": f"{jurisdiction.state} state"}

if jurisdiction.county:
should_check_county = bool(jurisdiction.county)
if should_check_county and jurisdiction.subdivision_name:
num_other_jurisdictions_with_same_name = len(
load_jurisdictions_from_subdivision_names(
jurisdiction.subdivision_name, state=jurisdiction.state
)
)
should_check_county &= num_other_jurisdictions_with_same_name > 1

if should_check_county:
G.add_edge(
node_to_connect,
"mentions_county",
Expand Down
43 changes: 42 additions & 1 deletion compass/validation/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@

import asyncio
import logging

from urllib.parse import urlsplit

from compass.llm.calling import BaseLLMCaller, ChatLLMCaller, LLMCaller
from compass.utilities.jurisdictions import jurisdiction_websites
from compass.common import setup_async_decision_tree, run_async_tree
from compass.validation.graphs import (
setup_graph_correct_jurisdiction_type,
Expand Down Expand Up @@ -86,6 +87,15 @@ async def check(self, url):
if not url:
return False

if _url_matches_known_jurisdiction_website(url, self.jurisdiction):
logger.debug(
"Skipping URL jurisdiction LLM check for %s because its "
"domain matches the canonical website for %s",
url,
self.jurisdiction,
)
return True

chat_llm_caller = ChatLLMCaller(
llm_service=self.llm_service,
system_message=self.SYSTEM_MESSAGE,
Expand Down Expand Up @@ -459,3 +469,34 @@ def _weighted_vote(out, raw_pages, doc_source):

weights = max(weights, 1)
return total / weights, num_verdicts


def _url_matches_known_jurisdiction_website(url, jurisdiction):
    """Return whether URL domain matches canonical website

    The match succeeds when the normalized domain of `url` equals the
    normalized domain of the jurisdiction's known website or is a
    subdomain of it. ``False`` is returned when no website is known or
    when either domain cannot be determined.
    """
    canonical_url = _known_jurisdiction_website(jurisdiction)
    if not canonical_url:
        return False

    candidate = _normalize_domain(url)
    canonical = _normalize_domain(canonical_url)
    if not (candidate and canonical):
        return False

    if candidate == canonical:
        return True

    # The leading dot keeps "notexample.com" from matching "example.com"
    return candidate.endswith(f".{canonical}")


def _known_jurisdiction_website(jurisdiction):
"""Return a canonical website URL for a jurisdiction if available"""
if jurisdiction.website_url:
return jurisdiction.website_url
return jurisdiction_websites().get(jurisdiction.code)


def _normalize_domain(url):
"""Return a comparable domain string for a URL or empty string"""
domain = urlsplit(url).netloc.partition(":")[0].casefold().strip()
if domain.startswith("www."):
return domain[4:]
return domain
Loading
Loading