Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ repos:
# Formatters: hooks that re-write Python and RST files
########################################################################################
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.8.0
rev: v0.8.6
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ dependencies = [
"sqlalchemy>=2,<3",
"timm>0.9,<2", # dependency for Hugging Face computer vision models
"torch>=2.2,<3",
"transformers>=4.42.3,<=4.46.3",
"transformers>=4.42.3,<=4.47.1",
"xhtml2pdf", # Convert html to PDF files
]
classifiers = [
Expand Down Expand Up @@ -91,7 +91,7 @@ dev = [
"black>=22,<25", # A deterministic code formatter
# "label-studio>=1.12,<2.0", # Tool for labeling training data, exclude until pydantic upgrade
"tox>=4,<5", # Python test environment manager
"twine>=4,<6", # Used to make releases to PyPI
"twine>=4,<7", # Used to make releases to PyPI
]
docs = [
"doc8>=1,<2", # Ensures clean documentation formatting
Expand All @@ -105,7 +105,7 @@ tests = [
"coverage>=7,<8", # Lets us track what code is being tested
"exceptiongroup>=1,<2",
"jupyter", # For integration testing Jupyter notebooks
"mypy>=1.0,<1.14", # Static type checking
"mypy>=1.0,<1.15", # Static type checking
"nbconvert>=7,<8",
"nbformat>=5,<6",
"pre-commit>=3,<5", # Allow us to run pre-commit hooks in testing
Expand Down
134 changes: 134 additions & 0 deletions src/mozilla_sec_eia/models/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
"""Define schema for PUDL models."""

import pyarrow as pa

def _described_field(name, dtype, description):
    """Return a pyarrow field carrying ``description`` as its only metadata entry."""
    return pa.field(name, dtype, metadata={"description": description})


def _described_schema(description, fields):
    """Return a pyarrow schema over ``fields`` with a table-level ``description``."""
    return pa.schema(fields, metadata={"description": description})


# Table name -> pyarrow schema. Fields are listed in the order they appear in
# each schema; every field and every schema carries a "description" metadata
# entry.
PUDL_MODELS_SCHEMA = {
    "core_sec10k__filings": _described_schema(
        "Metadata describing all submitted SEC 10k filings.",
        [
            _described_field(
                "filename",
                pa.string(),
                "Name of filing as provided by SEC data portal.",
            ),
            _described_field(
                "cik",
                pa.string(),
                "SEC Central Index Key, which uniquely identifies corporations.",
            ),
            _described_field(
                "company_name",
                pa.string(),
                "Name of company submitting filing.",
            ),
            _described_field(
                "form_type",
                pa.string(),
                "Specific version of SEC 10k filed.",
            ),
            _described_field(
                "date_filed",
                pa.date64(),
                "Date filing was submitted.",
            ),
            _described_field(
                "exhibit_21_version",
                pa.string(),
                "Version of exhibit 21 submitted (if applicable).",
            ),
            _described_field(
                "year_quarter",
                pa.string(),
                "Year quarter filing applies to.",
            ),
        ],
    ),
    "core_sec10k__exhibit_21_company_ownership": _described_schema(
        "Company ownership data extracted from Exhibit 21 attachments to SEC 10k filings.",
        [
            _described_field(
                "filename",
                pa.string(),
                "Name of filing as provided by SEC data portal.",
            ),
            _described_field(
                "subsidiary",
                pa.string(),
                "Name of subsidiary company.",
            ),
            _described_field(
                "location",
                pa.string(),
                "Location of subsidiary company.",
            ),
            _described_field(
                "ownership_percentage",
                pa.string(),
                "Percentage of subsidiary company owned by parent.",
            ),
            _described_field(
                "year_quarter",
                pa.string(),
                "Year quarter filing applies to.",
            ),
        ],
    ),
    "core_sec10k__company_information": _described_schema(
        "Company information extracted from SEC 10k filings.",
        [
            _described_field(
                "filename",
                pa.string(),
                "Name of filing as provided by SEC data portal.",
            ),
            _described_field(
                "filer_count",
                pa.int64(),
                "Index company information as some filings contain information for multiple companies.",
            ),
            _described_field(
                "block",
                pa.string(),
                "Title of block of data.",
            ),
            _described_field(
                "block_count",
                pa.int64(),
                "Some blocks are repeated, `block_count` defines the index of the data block.",
            ),
            _described_field(
                "key",
                pa.string(),
                "Key within block.",
            ),
            _described_field(
                "value",
                pa.string(),
                "String value of data point.",
            ),
            _described_field(
                "year_quarter",
                pa.string(),
                "Year quarter filing applies to.",
            ),
        ],
    ),
}
150 changes: 140 additions & 10 deletions src/mozilla_sec_eia/models/sec10k/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,61 @@
from dagster_pandera import pandera_schema_to_dagster_type
from pandera.typing import Index, Series

# Shared pandera field definitions. Each one is assigned to the same-named
# column in the DataFrameModel classes of this module, so a change to a
# description or to nullability here applies to every table that reuses it.

# --- Identifiers and filing provenance -------------------------------------
sec_company_id = pa.Field(description="Assigned identifier for the company.")
filename = pa.Field(description="Name of extracted filing.")
central_index_key = pa.Field(description="Identifier of the company in SEC database.")
report_date = pa.Field(description="Report date of the record.")
company_name = pa.Field(
    description="Cleaned name of the company with legal terms expanded."
)
utility_id_eia = pa.Field(
    description="EIA utility identifier for the company. Matched via record linkage model.",
    nullable=True,
    coerce=True,
)

# --- Contact and location --------------------------------------------------
street_address = pa.Field(description="Street address of the company.", nullable=True)
street_address_2 = pa.Field(
    description="Secondary street address of the company.", nullable=True
)
phone_number = pa.Field(description="Phone number of company.", nullable=True)
city = pa.Field(description="The city where the company is located.", nullable=True)
state = pa.Field(
    description="Two letter state code where the company is located.", nullable=True
)
# Distinct from ``state``: this is where the company is incorporated, not
# where it is located.
state_of_incorporation = pa.Field(
    description="Two letter state code where the company is incorporated.",
    nullable=True,
)
zip_code = pa.Field(
    description="5 digit zip code where the company is located.", nullable=True
)

# --- Company name variants and descriptive attributes ----------------------
company_name_raw = pa.Field(description="The raw company name.", nullable=True)
date_of_name_change = pa.Field(
    description="Date of last name change of the company.", nullable=True
)
former_conformed_name = pa.Field(
    description="Former name of the company.", nullable=True
)
standard_industrial_classification = pa.Field(
    description="The company's type of business.", nullable=True
)
location_of_inc = pa.Field(
    description="Cleaned location of incorporation of the company.", nullable=True
)
company_name_no_legal = pa.Field(
    description="Company name with legal terms stripped, e.g. LLC", nullable=True
)
company_name_mphone = pa.Field(
    description="Metaphone of the company name, could be used for record linkage."
)
irs_number = pa.Field(description="ID of the company with the IRS.", nullable=True)
files_10k = pa.Field(description="Indicates whether the company files a 10-K.")

# --- Ownership -------------------------------------------------------------
#: Use str to avoid conversion errors
own_per = pa.Field(
    description="Parent company's percent ownership of the company. String extracted from SEC 10-K Ex. 21 attachment.",
    nullable=True,
    coerce=True,
)


class Ex21CompanyOwnership(pa.DataFrameModel):
"""Define table structure for extracted EX 21 data."""
Expand All @@ -14,22 +69,18 @@ class Ex21CompanyOwnership(pa.DataFrameModel):
description="Location of subsidiary company.", nullable=True
)
#: Use str to avoid conversion errors
own_per: Series[str] = pa.Field(
description="Percent ownership of subsidiary company.",
nullable=True,
coerce=True,
)
own_per: Series[str] = own_per


class Basic10kCompanyInfo(pa.DataFrameModel):
"""Define table structure for extracted basic 10k data."""

filename: Index[str] = pa.Field(description="Name of extracted filing.")
filer_count: Index[str] = pa.Field(
filename: Index[str] = filename
filer_count: Index[int] = pa.Field(
description="Some filings have multiple blocks of company data."
)
block: Index[str] = pa.Field(description="Block of company data.")
block_count: Index[str] = pa.Field(description="Some blocks occur multiple times.")
block_count: Index[int] = pa.Field(description="Some blocks occur multiple times.")
key: Index[str] = pa.Field(description="Key within block.")
value: Series[str] = pa.Field(description="Company info fact.")

Expand All @@ -44,7 +95,7 @@ class Config:
class Sec10kExtractionMetadata(pa.DataFrameModel):
"""Define table structure for extraction metadata."""

filename: Index[str] = pa.Field(description="Name of extracted filing.")
filename: Index[str] = filename
success: Series[bool] = pa.Field(
description="Indicates whether filing was successfully extracted.", coerce=True
)
Expand All @@ -56,14 +107,93 @@ class Sec10kExtractionMetadata(pa.DataFrameModel):
class Ex21Layout(pa.DataFrameModel):
"""Define table structure for ex21 layout classification."""

filename: Index[str] = pa.Field(description="Name of extracted filing.")
filename: Index[str] = filename
paragraph: Series[bool] = pa.Field(
description="Indicates whether ex21 is formatted as a paragraph or not.",
coerce=True,
)


class Sec10kCoreTable(pa.DataFrameModel):
    """Define table structure for core SEC companies table.

    Every column reuses the module-level field definition of the same name,
    so its description and nullability come from that shared field.
    """

    # Identifiers and filing provenance.
    sec_company_id: Series[str] = sec_company_id
    company_name: Series[str] = company_name
    filename: Series[str] = filename
    report_date: Series[pa.DateTime] = report_date
    central_index_key: Series[str] = central_index_key
    # NOTE(review): the shared ``utility_id_eia`` field is declared nullable,
    # but this column is typed ``int`` while ``Sec10kOutputTable`` types the
    # same field as ``float``. Presumably nulls never reach this table —
    # confirm, since a plain integer column cannot hold missing values.
    utility_id_eia: Series[int] = utility_id_eia
    # Contact and location.
    street_address: Series[str] = street_address
    street_address_2: Series[str] = street_address_2
    phone_number: Series[str] = phone_number
    city: Series[str] = city
    state: Series[str] = state
    state_of_incorporation: Series[str] = state_of_incorporation
    zip_code: Series[str] = zip_code
    # Company name variants.
    company_name_raw: Series[str] = company_name_raw
    company_name_no_legal: Series[str] = company_name_no_legal
    company_name_mphone: Series[str] = company_name_mphone
    date_of_name_change: Series[pa.DateTime] = date_of_name_change
    former_conformed_name: Series[str] = former_conformed_name
    # Remaining descriptive attributes.
    standard_industrial_classification: Series[str] = standard_industrial_classification
    location_of_inc: Series[str] = location_of_inc
    irs_number: Series[str] = irs_number
    files_10k: Series[bool] = files_10k


class Sec10kOutputTable(pa.DataFrameModel):
    """Define table structure for output parents and subsidiaries table.

    Most columns reuse the module-level field definition of the same name;
    ``central_index_key`` and ``parent_company_cik`` are declared inline.
    """

    sec_company_id: Series[str] = sec_company_id
    company_name: Series[str] = company_name
    filename: Series[str] = filename
    report_date: Series[pa.DateTime] = report_date
    # Declared inline rather than reusing the shared ``central_index_key``
    # field because this table allows the value to be null.
    central_index_key: Series[str] = pa.Field(
        description="Identifier of the company in SEC database.", nullable=True
    )
    # NOTE(review): typed ``float`` here but ``int`` in the other tables;
    # presumably so missing values can be represented — confirm.
    utility_id_eia: Series[float] = utility_id_eia
    street_address: Series[str] = street_address
    street_address_2: Series[str] = street_address_2
    city: Series[str] = city
    state: Series[str] = state
    company_name_raw: Series[str] = company_name_raw
    date_of_name_change: Series[pa.DateTime] = date_of_name_change
    former_conformed_name: Series[str] = former_conformed_name
    standard_industrial_classification: Series[str] = standard_industrial_classification
    state_of_incorporation: Series[str] = state_of_incorporation
    location_of_inc: Series[str] = location_of_inc
    irs_number: Series[str] = irs_number
    files_10k: Series[bool] = files_10k
    # Ownership link from this company to its parent.
    parent_company_cik: Series[str] = pa.Field(
        description="CIK of the company's parent company.", nullable=True
    )
    own_per: Series[str] = own_per


class EiaCompanies(pa.DataFrameModel):
    """Define table structure for EIA owner and operator companies table.

    All columns except ``report_year`` reuse the module-level field
    definition of the same name.
    """

    company_name: Series[str] = company_name
    street_address: Series[str] = street_address
    street_address_2: Series[str] = street_address_2
    # NOTE(review): the shared ``utility_id_eia`` field is declared nullable
    # while this column is typed ``int``; presumably every EIA record has an
    # identifier — confirm, since a plain integer column cannot hold nulls.
    utility_id_eia: Series[int] = utility_id_eia
    company_name_raw: Series[str] = company_name_raw
    report_date: Series[pa.DateTime] = report_date
    # Declared inline; values are coerced to ``int`` during validation.
    report_year: Series[int] = pa.Field(
        description="Report year of the record.", coerce=True
    )
    city: Series[str] = city
    state: Series[str] = state
    zip_code: Series[str] = zip_code
    phone_number: Series[str] = phone_number
    company_name_no_legal: Series[str] = company_name_no_legal
    company_name_mphone: Series[str] = company_name_mphone


# Dagster types built from the pandera models defined in this module via
# ``pandera_schema_to_dagster_type``, one per table model.
ex21_extract_type = pandera_schema_to_dagster_type(Ex21CompanyOwnership)
basic_10k_extract_type = pandera_schema_to_dagster_type(Basic10kCompanyInfo)
sec10k_extract_metadata_type = pandera_schema_to_dagster_type(Sec10kExtractionMetadata)
ex21_layout_type = pandera_schema_to_dagster_type(Ex21Layout)
# Types for the record-linkage input/output tables.
eia_layout_type = pandera_schema_to_dagster_type(EiaCompanies)
sec10k_output_layout_type = pandera_schema_to_dagster_type(Sec10kOutputTable)
sec10k_core_layout_type = pandera_schema_to_dagster_type(Sec10kCoreTable)
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ def clean_extracted_df(extracted_df):
r"[^a-zA-Z&,\s]", "", regex=True
)
if "own_per" in extracted_df.columns:
# enforce single decimal points
extracted_df["own_per"] = extracted_df["own_per"].str.replace(
r"\.+", ".", regex=True
)
# remove special chars and letters
extracted_df["own_per"] = extracted_df["own_per"].str.replace(
r"[^\d.]", "", regex=True
Expand Down
10 changes: 4 additions & 6 deletions src/mozilla_sec_eia/models/sec_eia_record_linkage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
eia_assets = load_assets_from_modules([transform_eia_input])
sec_assets = load_assets_from_modules([transform_sec_input])

eia_input_table_production_job = model_jobs.create_production_model_job(
"eia_input_table_creation", transform_eia_input.production_assets
)
sec_input_table_production_job = model_jobs.create_production_model_job(
"sec_input_table_creation", transform_sec_input.production_assets
record_linkage_job = model_jobs.create_production_model_job(
"sec_eia_record_linkage",
transform_eia_input.production_assets + transform_sec_input.production_assets,
)

# Create year_quarter partitions
Expand Down Expand Up @@ -63,7 +61,7 @@
sec_assets
+ eia_assets
+ [basic_10k_company_info, ex21_company_ownership_info, sec10k_filing_metadata],
jobs=[eia_input_table_production_job, sec_input_table_production_job],
jobs=[record_linkage_job],
resources={
"cloud_interface": cloud_interface_resource,
"mlflow_interface": mlflow_interface_resource,
Expand Down
Loading