Package and function reference for src/.
Public entry point called by python -m src. Configures logging then delegates to _main(). Catches all unhandled exceptions, logs them at ERROR, and exits with code 1.
Parses sys.argv[1:]. Valid commands:
- `securityDigest`: runs `run_security_digest()` and prints the result to stdout.
- `--help` / `-h`: prints usage and exits 0.
- Anything else: prints usage to stderr and exits 1.
Attaches a logging.StreamHandler (stderr) to the root logger with _CloudRunFormatter. Idempotent — no-ops if handlers are already present.
Custom logging.Formatter that emits single-line JSON:
`{"severity": "INFO", "message": "...", "logger": "src.security_digest.pipeline"}`
Maps Python log levels to GCP severity strings: DEBUG → DEBUG, INFO → INFO, WARNING → WARNING, ERROR → ERROR, CRITICAL → CRITICAL.
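A minimal sketch of a formatter with this output shape, assuming only the three keys shown above are emitted. The class name `CloudRunFormatterSketch` is hypothetical and the real `_CloudRunFormatter` may include more fields (for example exception details).

```python
import json
import logging


class CloudRunFormatterSketch(logging.Formatter):
    """Hypothetical stand-in: renders each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            # Python's standard level names coincide with the GCP severity strings.
            "severity": record.levelname,
            "message": record.getMessage(),
            "logger": record.name,
        }
        # json.dumps escapes embedded newlines, so the output stays on one line.
        return json.dumps(payload)
```

Attaching an instance to a `logging.StreamHandler()` (which writes to stderr by default) gives the behaviour described for the logging setup.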
Wraps google-genai's genai.Client. Reads config from AppEnv if not passed explicitly.
Plain text generation. Retries up to max_retries times with retry_delay_ms back-off. Detects spend-cap / quota errors via is_quota_exhausted_error() and raises immediately without retrying.
Returns the response text or raises the last exception.
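The retry behaviour can be sketched as follows. This is an illustration under stated assumptions, not the service's code: `call_with_retries` and `looks_like_quota_error` are hypothetical names, the quota marker strings are guesses, `max_retries` is treated as the total number of attempts, and the delay is a fixed pause rather than a growing back-off.

```python
import asyncio
from typing import Awaitable, Callable, Optional


def looks_like_quota_error(exc: Exception) -> bool:
    # Assumed indicator strings; the real is_quota_exhausted_error() list is not shown.
    text = str(exc).lower()
    return any(marker in text for marker in ("quota", "spend cap", "resource_exhausted"))


async def call_with_retries(
    call: Callable[[], Awaitable[str]],
    max_retries: int = 3,
    retry_delay_ms: int = 500,
) -> str:
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    last_exc: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            return await call()
        except Exception as exc:
            if looks_like_quota_error(exc):
                raise  # fail fast: retrying cannot restore an exhausted quota
            last_exc = exc
            if attempt < max_retries - 1:
                await asyncio.sleep(retry_delay_ms / 1000)
    assert last_exc is not None
    raise last_exc  # every attempt failed: surface the last exception
```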
Structured output generation.
- If `schema` is a `BaseModel` subclass, uses `response_schema=schema` for native structured output (no post-processing).
- Otherwise, calls `generate_text()` and extracts the first JSON block from the response.
- Falls back to `json.loads()` on the full response text if no JSON block is found.
- Validates the result against `schema` and logs a warning on validation errors.
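One way to implement the "first JSON block, else whole text" step is sketched below using the standard library's `json.JSONDecoder.raw_decode`. The function name `extract_first_json` is hypothetical, and the actual extraction strategy used by `generate_json()` is not shown in this reference.

```python
import json
from typing import Any


def extract_first_json(text: str) -> Any:
    """Return the first decodable JSON object or array embedded in text."""
    decoder = json.JSONDecoder()
    for index, char in enumerate(text):
        if char in "{[":
            try:
                value, _end = decoder.raw_decode(text, index)
                return value
            except json.JSONDecodeError:
                continue  # not valid JSON at this position; keep scanning
    # No embedded block found: fall back to parsing the full response text.
    return json.loads(text)
```

Note that this sketch accepts any leading bracketed value, so a response starting with a citation such as `[1]` would be returned as a list.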
Returns True if the exception message contains quota/spend-cap indicators. Used by generate_text() for fail-fast behaviour.
Replaces sequences of 20+ alphanumeric characters in error messages with [REDACTED] before logging. Prevents accidental API key exposure in logs.
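The redaction rule can be illustrated with a single regular expression. This is a sketch of the described behaviour; `redact_secrets` is a hypothetical name and the real helper may differ in detail.

```python
import re

# Any run of 20 or more ASCII letters/digits is treated as a possible secret.
_LONG_TOKEN = re.compile(r"[A-Za-z0-9]{20,}")


def redact_secrets(message: str) -> str:
    return _LONG_TOKEN.sub("[REDACTED]", message)
```

The trade-off is that long harmless identifiers (hashes, request IDs) are redacted as well, which is usually acceptable in error logs.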
AppEnv(BaseSettings) loaded via pydantic-settings. A singleton is instantiated at module level and imported across the codebase.
| Field | Type | Required | Default | Notes |
|---|---|---|---|---|
| `GEMINI_API_KEY` | `SecretStr` | ✓ | - | Redacted in repr |
| `GEMINI_MODEL` | `str` | | `gemini-2.5-flash-lite` | |
| `GEMINI_API_BASE` | `str` | | `https://generativelanguage.googleapis.com/v1beta` | Must be HTTPS + googleapis.com |
| `GITHUB_TOKEN` | `SecretStr` | ✓ | - | |
| `SOURCES_CONFIG` | `str` | | `config/sources.yaml` | Path to feed config file |
| `SECURITY_MONITORED_PACKAGES` | `str` | | `""` | CSV / JSON / newline list |
| `LLM_GLOBAL_CONTEXT` | `str` | | `""` | Injected into every LLM prompt |
| `SECURITY_MAX_TRIAGE_ITEMS` | `int` | | `40` | Intel items passed to LLM for triage |
| `SECURITY_MAX_ALERT_THREATS` | `int` | | `10` | Max threats in final output |
_validate_gemini_base validator rejects non-HTTPS or non-googleapis.com values.
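A sketch of such a check as a plain function, assuming the rule is "scheme must be `https` and the host must be `googleapis.com` or a subdomain of it". The exact matching logic of `_validate_gemini_base` is not shown here, and `validate_gemini_base_sketch` is a hypothetical name.

```python
from urllib.parse import urlparse


def validate_gemini_base_sketch(value: str) -> str:
    parsed = urlparse(value)
    host = (parsed.hostname or "").lower()
    if parsed.scheme != "https":
        raise ValueError("GEMINI_API_BASE must use HTTPS")
    # Compare the parsed hostname, not the raw string, so that a host like
    # "googleapis.com.evil.example" is rejected.
    if host != "googleapis.com" and not host.endswith(".googleapis.com"):
        raise ValueError("GEMINI_API_BASE must point at googleapis.com")
    return value
```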
Typed loader for sources.yaml / sources.json.
Reads the file, detects format by extension, and calls _from_dict().
Constructs typed dataclasses from the raw dict. Calls _parse_rss_feeds() for the rss_feeds.feeds list.
Validates that every RSS feed URL starts with https://. Raises ValueError otherwise.
| Class | Fields |
|---|---|
| `GitHubAdvisoriesConfig` | `enabled`, `ecosystems: list[str]`, `per_page: int` |
| `RedditConfig` | `enabled`, `subreddits: list[str]` |
| `RssFeedEntry` | `url: str` |
| `RssFeedsConfig` | `enabled`, `feeds: list[RssFeedEntry]` |
| `NvdConfig` | `enabled`, `api_url: str` |
| `CisaKevConfig` | `enabled`, `feed_url: str` |
| Field | Type | Description |
|---|---|---|
| `title` | `str` | Short advisory title |
| `ecosystem` | `str` | Package ecosystem (npm, PyPI, etc.) |
| `threat_level` | `str` | CRITICAL / HIGH / MEDIUM |
| `summary` | `str` | 2–3 sentence description |
| `source_url` | `str` | Canonical URL; a `@field_validator` blanks any non-HTTP(S) value |
| `action_required` | `str` | What to do |
| `cve_id` | `str \| None` | Normalised CVE ID |
| `fixed_version` | `str \| None` | Patched version string |
@field_validator("source_url") — returns "" for any value that does not start with http:// or https://. The pipeline's existing if not source_url: continue check then skips these threats. This prevents schema-validated threats with fabricated URLs from reaching the output.
| Field | Type | Default |
|---|---|---|
| `threats` | `list[SecurityThreat]` | `[]` |
Used as the response_schema in generate_json().
| Field | Type | Default | Description |
|---|---|---|---|
| `reddit_title` | `str` | `""` | News-style post title |
| `reddit_body` | `str` | `""` | Full Reddit markdown post body |
| `twitter_thread` | `list[str]` | `[]` | Thread tweets, each ≤280 chars |
| `linkedin_post` | `str` | `""` | Professional narrative ≤1100 chars |
Used as the response_schema in run_social_drafts().
Logs Pydantic ValidationError details as a structured JSON WARNING to stderr. Called when LLM output fails schema validation.
Thin wrapper around asyncio.sleep that accepts milliseconds, e.g. await sleep(500).
Parses any datetime string to UTC ISO-8601. Returns "" and logs a WARNING on parse failure.
Keyword scan of text for ecosystem names. Returns the first matching ecosystem (npm, PyPI, Maven, etc.), "supply-chain" for general security news, or "unknown" when nothing matches.
Upper-cases and strip-normalises a CVE string. Returns "" for malformed input.
Regex scan for CVE-YYYY-NNNN pattern in free text. Returns the first match or "".
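The two CVE helpers above might look like the sketch below. It assumes the standard `CVE-YYYY-NNNN` shape with four or more trailing digits and a case-insensitive scan that returns the upper-cased ID; the function names are hypothetical and the real patterns are not shown.

```python
import re
from typing import Optional

_CVE_PATTERN = re.compile(r"CVE-\d{4}-\d{4,}", re.IGNORECASE)


def normalize_cve_id(value: Optional[str]) -> str:
    """Upper-case and strip a CVE string; return "" if it is malformed."""
    candidate = (value or "").strip().upper()
    return candidate if _CVE_PATTERN.fullmatch(candidate) else ""


def extract_cve_id(text: Optional[str]) -> str:
    """Return the first CVE ID found in free text, or ""."""
    match = _CVE_PATTERN.search(text or "")
    return match.group(0).upper() if match else ""
```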
Returns cve_id if present, else source_url, else title.lower()[:80]. Used for deduplication in _normalize_intel().
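As a sketch, the fallback chain reads naturally as a chain of `or` expressions over a threat dict. The name `dedup_key_sketch` and the dict-based input are assumptions.

```python
def dedup_key_sketch(threat: dict) -> str:
    title = (threat.get("title") or "").lower()[:80]
    # Prefer the CVE ID, then the source URL, then the truncated lower-cased title.
    return threat.get("cve_id") or threat.get("source_url") or title
```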
Strips whitespace; returns None for empty/None values.
Maps raw LLM threat level to one of CRITICAL, HIGH, MEDIUM. Defaults to MEDIUM for unrecognised values.
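A minimal sketch of this mapping, assuming the comparison is case-insensitive and ignores surrounding whitespace (neither detail is confirmed by this reference). `normalize_threat_level` is a hypothetical name.

```python
_ALLOWED_LEVELS = ("CRITICAL", "HIGH", "MEDIUM")


def normalize_threat_level(raw: object) -> str:
    candidate = str(raw or "").strip().upper()
    # Anything outside the allowed set, including None and "", becomes MEDIUM.
    return candidate if candidate in _ALLOWED_LEVELS else "MEDIUM"
```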
Accepts any of:
- JSON array string: `'["react","lodash"]'`
- Comma-separated: `"react,lodash,express"`
- Semicolon-separated: `"react;lodash"`
- Newline-separated
Returns a deduplicated, lowercased list.
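The accepted formats can be handled with one JSON attempt followed by a delimiter split. This is a sketch, not the project's parser: `parse_monitored_packages_sketch` is a hypothetical name, and preserving first-seen order is an assumption.

```python
import json
import re


def parse_monitored_packages_sketch(raw: str) -> list[str]:
    text = (raw or "").strip()
    if not text:
        return []
    items = None
    if text.startswith("["):
        try:
            parsed = json.loads(text)
            if isinstance(parsed, list):
                items = [str(item) for item in parsed]
        except json.JSONDecodeError:
            items = None  # not valid JSON after all; treat as delimited text
    if items is None:
        items = re.split(r"[,;\n]", text)
    seen: dict[str, None] = {}  # dict keys keep insertion order while deduplicating
    for item in items:
        name = item.strip().lower()
        if name:
            seen.setdefault(name, None)
    return list(seen)
```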
Builds word-boundary regex (^|[^a-z0-9@/_.-]){pkg}($|[^a-z0-9@/_.-]) for each package and searches across title + summary + action_required + source_url (all lowercased). Returns the matched package name or None.
Calls _parse_monitored_packages(), then precompiles all regexes once before the threat loop. For each matched threat, sets:
- `threat_level = "CRITICAL"`
- `priority_tag = "TOP_PRIORITY"`
- `matched_package = <matched_name>`
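The matching and tagging steps can be sketched together as below. The boundary character class is taken from the pattern quoted above; the function names are hypothetical, threats are assumed to be plain dicts mutated in place, and `re.escape` is an addition of this sketch to keep package names with regex metacharacters safe.

```python
import re
from typing import Optional

_BOUNDARY = r"[^a-z0-9@/_.-]"
_FIELDS = ("title", "summary", "action_required", "source_url")


def compile_package_patterns(packages: list[str]) -> dict[str, "re.Pattern[str]"]:
    return {
        pkg: re.compile(rf"(^|{_BOUNDARY}){re.escape(pkg)}($|{_BOUNDARY})")
        for pkg in packages
    }


def find_monitored_match(threat: dict, patterns: dict) -> Optional[str]:
    haystack = " ".join(str(threat.get(field) or "") for field in _FIELDS).lower()
    for pkg, pattern in patterns.items():
        if pattern.search(haystack):
            return pkg
    return None


def apply_monitored_priority_sketch(threats: list[dict], packages: list[str]) -> list[dict]:
    patterns = compile_package_patterns(packages)  # compiled once, before the loop
    for threat in threats:
        matched = find_monitored_match(threat, patterns)
        if matched:
            threat["threat_level"] = "CRITICAL"
            threat["priority_tag"] = "TOP_PRIORITY"
            threat["matched_package"] = matched
    return threats
```

Because `-`, `/`, `.`, `_` and `@` are excluded from the boundary class, `react` does not match inside `react-dom`, `preact`, or a URL path segment such as `/react`.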
All fetchers use httpx.AsyncClient with a 30-second timeout and log failures at WARNING or ERROR level. All return list[dict[str, str]].
Queries the GitHub Advisory Database REST API (/advisories?ecosystem={e}&per_page={n}) for each ecosystem in parallel. Extracts fixed_version from vulnerabilities[].first_patched_version.identifier.
On HTTPStatusError / RequestError, logs at ERROR and returns [].
Searches each subreddit for npm OR pypi OR malicious package via r/{sub}/search.json?q=...&sort=new&t=day. Handles 403 gracefully (Reddit blocks bots) — logs WARNING and returns [].
Fetches each RSS/Atom URL and parses with xml.etree.ElementTree. Supports <item> (RSS 2.0) and <entry> (Atom). Logs WARNING on HTTP error or malformed XML; continues with remaining feeds.
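The parsing half of this fetcher (without the HTTP call) could be sketched as follows. The output keys `title` and `url` and the name `parse_feed_sketch` are assumptions; the real fetcher likely extracts more fields and logs a WARNING where this sketch silently returns an empty list.

```python
import xml.etree.ElementTree as ET

_ATOM_NS = "{http://www.w3.org/2005/Atom}"


def parse_feed_sketch(xml_text: str) -> list[dict[str, str]]:
    try:
        root = ET.fromstring(xml_text)
    except ET.ParseError:
        return []  # malformed XML: skip this feed, let the caller continue
    items: list[dict[str, str]] = []
    for item in root.iter("item"):  # RSS 2.0 entries
        items.append({
            "title": (item.findtext("title") or "").strip(),
            "url": (item.findtext("link") or "").strip(),
        })
    for entry in root.iter(f"{_ATOM_NS}entry"):  # Atom entries are namespaced
        link = entry.find(f"{_ATOM_NS}link")
        items.append({
            "title": (entry.findtext(f"{_ATOM_NS}title") or "").strip(),
            "url": (link.get("href", "") if link is not None else "").strip(),
        })
    return items
```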
Queries the NIST NVD CVE 2.0 API for CVEs modified in the last 7 days (lastModStartDate / lastModEndDate params, resultsPerPage=100). Uses _infer_ecosystem_from_nvd() to set ecosystem_hint.
Downloads the CISA KEV catalog (JSON). Skips entries without a cveID field. Returns a flat list with threat_level hint pre-set to CRITICAL.
Inspects cpe criteria strings in the NVD vulnerability object for npm/PyPI indicators. Falls back to _infer_ecosystem(fallback_text).
| Name | Value | Description |
|---|---|---|
| `_MAX_TRIAGE_INTEL_ITEMS` | from env | Items sent to LLM (default 40) |
| `_MAX_ALERT_THREATS` | from env | Max threats in output (default 10) |
Pass 1: Dedup by {url}|{title.lower()}. Drops items with empty URL or title.
Pass 2: For items sharing a cve_id, keep only the highest-authority source: CISA KEV (0) > NIST NVD (1) > GitHub Advisory (2) > others (99).
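The two passes can be sketched as below. The item keys (`source`, `url`, `title`, `cve_id`) and the exact source label strings are assumptions made for illustration; only the ranking order comes from the description above.

```python
_SOURCE_RANK = {"CISA KEV": 0, "NIST NVD": 1, "GitHub Advisory": 2}


def source_rank(item: dict) -> int:
    return _SOURCE_RANK.get(item.get("source", ""), 99)


def normalize_intel_sketch(items: list[dict]) -> list[dict]:
    # Pass 1: drop items missing a URL or title, then dedup on "url|title".
    seen: set[str] = set()
    unique: list[dict] = []
    for item in items:
        url = (item.get("url") or "").strip()
        title = (item.get("title") or "").strip()
        if not url or not title:
            continue
        key = f"{url}|{title.lower()}"
        if key not in seen:
            seen.add(key)
            unique.append(item)
    # Pass 2: per CVE, remember the item from the highest-authority source.
    best_by_cve: dict[str, dict] = {}
    for item in unique:
        cve = item.get("cve_id") or ""
        if cve and (cve not in best_by_cve or source_rank(item) < source_rank(best_by_cve[cve])):
            best_by_cve[cve] = item
    return [
        item for item in unique
        if not item.get("cve_id") or best_by_cve[item["cve_id"]] is item
    ]
```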
Stable sort by (source_rank, ecosystem_relevance, has_cve, recency). Returns top _MAX_TRIAGE_INTEL_ITEMS.
Deterministic extraction: for items with a cve_id, synthesises a SecurityThreat-shaped dict with threat_level="HIGH". No LLM call. Used when Gemini is unavailable.
Caps to _MAX_ALERT_THREATS. Fill order: TOP_PRIORITY → CRITICAL → HIGH → MEDIUM.
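Because Python's `sorted` is stable, the fill order can be expressed as a tier function plus a slice. This is a sketch with hypothetical names; how the real selector orders threats within a tier is not documented here, so the sketch simply keeps input order.

```python
_LEVEL_TIER = {"CRITICAL": 1, "HIGH": 2, "MEDIUM": 3}


def alert_tier(threat: dict) -> int:
    if threat.get("priority_tag") == "TOP_PRIORITY":
        return 0  # monitored-package hits always fill first
    return _LEVEL_TIER.get(threat.get("threat_level", ""), 4)


def select_alert_threats_sketch(threats: list[dict], max_threats: int = 10) -> list[dict]:
    # Stable sort: threats in the same tier keep their original relative order.
    return sorted(threats, key=alert_tier)[:max_threats]
```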
Matches each threat's source_url against original intel items. Backfills cve_id, fixed_version, then adds dedup_key and status: "OPEN".
Builds the full triage prompt. max_threats (default: env.SECURITY_MAX_ALERT_THREATS) is injected into rule 4 so the model caps its response. Structure:
- Role statement + optional `global_context`
- Threat level rubric (CRITICAL / HIGH / MEDIUM definitions)
- Rules: discard Windows/WordPress/hardware, focus on npm/PyPI/supply-chain, max `N` threats, no hallucinated URLs, empty → `{"threats": []}`
- Monitored packages line
- Raw intel JSON
Builds the social formatting prompt. max_tweets defaults to 8. The prompt instructs Gemini to produce a single JSON object with four keys (reddit_title, reddit_body, twitter_thread, linkedin_post) and enforces per-platform constraints:
| Platform | Constraint |
|---|---|
| Reddit | Full markdown post; source URLs from input verbatim |
| Twitter/X | ≤280 chars per tweet, ≤max_tweets total; [REDDIT_LINK] placeholder in closing tweet |
| LinkedIn | 900–1100 chars; no raw CVE IDs; hashtags on final line |
Generates social media drafts for all three platforms in a single Gemini call.
- `threats`: the finalised output threat list (dicts, not Pydantic models)
- `gemini`: a `GeminiService` instance (reused from the pipeline run)
- `run_date`: ISO date string used in post headers; defaults to today UTC
- `max_tweets`: passed through to `get_social_drafts_prompt()`
Uses temperature=0.4 (higher than triage to allow more natural phrasing). Returns None and logs an ERROR if generation fails; the pipeline continues and omits the drafts key from output.
Main entry point. social=True when --social flag is passed. Steps:
- Load `AppEnv` and `SourcesConfig`
- `asyncio.gather()` all five fetch coroutines
- Log fetch counts: `[fetch] github=N reddit=N rss=N nvd=N cisa=N`
- `_normalize_intel()` → `_prioritize_intel_for_triage()`
- Build `intel_url_set` and `intel_by_cve` lookup tables
- Call `GeminiService.generate_json(prompt, SecurityTriageResponse)`; on failure fall back to `_fallback_threats_from_intel()`
- Hallucination guard: check each `source_url`; recover via CVE lookup or drop + log WARNING
- `_enrich_threats()` → `_apply_monitored_priority()` → `_select_alert_threats()`
- If `social=True`: call `run_social_drafts()`; merge result into output envelope as `drafts`
- `print(json.dumps(output, indent=2))`
- Return `{"run_at": "<iso>", "threats": [...]}`; caller prints JSON to stdout
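The hallucination guard in the steps above can be sketched as a pure function over the two lookup tables. The intel item keys (`url`, `cve_id`) and the function name are assumptions; the lookup names mirror `intel_url_set` and `intel_by_cve` from the step list, and the real pipeline may build or use them differently.

```python
import logging

logger = logging.getLogger(__name__)


def guard_source_urls_sketch(threats: list[dict], intel_items: list[dict]) -> list[dict]:
    intel_url_set = {item["url"] for item in intel_items if item.get("url")}
    intel_by_cve = {item["cve_id"]: item for item in intel_items if item.get("cve_id")}
    kept: list[dict] = []
    for threat in threats:
        if threat.get("source_url") in intel_url_set:
            kept.append(threat)  # URL was present in the fetched intel
            continue
        recovered = intel_by_cve.get(threat.get("cve_id") or "")
        if recovered and recovered.get("url"):
            # Replace the unknown URL with the one recorded for the same CVE.
            kept.append({**threat, "source_url": recovered["url"]})
        else:
            logger.warning("dropping threat with unverified source_url: %s", threat.get("title"))
    return kept
```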