Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
a1834bd
bugfix in the testing env
etsien Oct 2, 2025
1d79035
update tool descriptions for clarity
etsien Oct 19, 2025
6864fa7
refactor tool names to be class constants instead of disparate strings
etsien Oct 19, 2025
e05ea7a
add initial unit tests
etsien Oct 19, 2025
8893f5c
rename tool names to be more consistent and distinct
etsien Oct 19, 2025
dd18463
update unit tests with tool names and tool constants
etsien Oct 19, 2025
4efffcd
cleanup startup guide notebook
etsien Oct 20, 2025
8f3182e
rework intel source score section
etsien Oct 20, 2025
dd215cf
update agent execution stage prompts and make tool descriptions dynamic
etsien Oct 20, 2025
35ee318
add tests for dynamic tool descriptions
etsien Oct 20, 2025
0af8e7a
revamp the tool description list, as well as the checklist prompt for…
etsien Oct 20, 2025
a882f88
revamp checklist prompt implementation, as well as add in dynamic too…
etsien Oct 20, 2025
26f0d74
update tests for tool descriptions
etsien Oct 20, 2025
186350d
add more detailed agent examples with more useful MRKL-formatted steps
etsien Oct 20, 2025
f71e9f8
update for summary prompt
etsien Oct 20, 2025
9707671
update justification prompt with more logic and explanations on how t…
etsien Oct 20, 2025
faeb811
update CVSS prompts and cleanup examples and guidance
etsien Oct 20, 2025
dcf836f
bugfix on intel source
etsien Oct 20, 2025
efa84ad
bug patch for vdb generation
etsien Oct 21, 2025
be0b27d
bugfix by Tamar
etsien Oct 22, 2025
538257d
update register_function() and transitive_search() descriptions
etsien Oct 28, 2025
36bb6d3
bugfix in the testing env
etsien Oct 2, 2025
7b7695d
update tool descriptions for clarity
etsien Oct 19, 2025
d105a38
refactor tool names to be class constants instead of disparate strings
etsien Oct 19, 2025
09fb531
add initial unit tests
etsien Oct 19, 2025
f475fb2
rename tool names to be more consistent and distinct
etsien Oct 19, 2025
a4ed889
update unit tests with tool names and tool constants
etsien Oct 19, 2025
3a13650
cleanup startup guide notebook
etsien Oct 20, 2025
3b477b0
rework intel source score section
etsien Oct 20, 2025
dcdb5cf
update agent execution stage prompts and make tool descriptions dynamic
etsien Oct 20, 2025
9d81f60
add tests for dynamic tool descriptions
etsien Oct 20, 2025
0df74b9
revamp the tool description list, as well as the checklist prompt for…
etsien Oct 20, 2025
333c1eb
revamp checklist prompt implementation, as well as add in dynamic too…
etsien Oct 20, 2025
29306b3
update tests for tool descriptions
etsien Oct 20, 2025
a5368b6
add more detailed agent examples with more useful MRKL-formatted steps
etsien Oct 20, 2025
dbab156
update for summary prompt
etsien Oct 20, 2025
f585abb
update justification prompt with more logic and explanations on how t…
etsien Oct 20, 2025
b3b53e1
update CVSS prompts and cleanup examples and guidance
etsien Oct 20, 2025
c26936e
bugfix on intel source
etsien Oct 20, 2025
ee0c6af
bug patch for vdb generation
etsien Oct 21, 2025
309a554
bugfix by Tamar
etsien Oct 22, 2025
40de82e
update register_function() and transitive_search() descriptions
etsien Oct 28, 2025
7e0bddb
add function locator descriptions
etsien Nov 5, 2025
eb09d06
add names to configs
etsien Nov 5, 2025
8754762
add local output for local testing
etsien Nov 5, 2025
3449499
Merge branch 'APPENG-3801-B-Agent-performance-fixes-checklist-and-exe…
etsien Nov 5, 2025
ee9fd3c
move all prompts out of other files and into prompting.py
etsien Nov 6, 2025
aded7f2
Update prompts, structure them, and order them based on pipeline stage
etsien Nov 6, 2025
2b282d0
add pydantic fields for pipeline stages
etsien Nov 6, 2025
1503e07
add custom ReAct/thinking aware parser to robustly handle ReAct outputs
etsien Nov 6, 2025
508c829
add pydantic output parsing and structure to pipeline stages, add log…
etsien Nov 6, 2025
d3c9ee5
add pydantic to pipeline checklist
etsien Nov 6, 2025
710743a
bugfix
etsien Nov 6, 2025
5986ced
Update tool_names.py
etsien Nov 6, 2025
26f291a
Merge branch 'rh-aiq-main' into APPENG-3853-prompt-standardization-an…
etsien Nov 10, 2025
317ba90
bugfix for brackets and variable passing issues
etsien Nov 18, 2025
b364dd3
Merge branch 'rh-aiq-main' into APPENG-3853-prompt-standardization-an…
etsien Nov 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 96 additions & 1 deletion src/vuln_analysis/data_models/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,107 @@

import typing

from pydantic import BaseModel
from pydantic import BaseModel, Field
from pydantic import model_validator

from .input import AgentMorpheusEngineInput


# =============================================================================
# STRUCTURED OUTPUT MODELS FOR LLM STAGES
# =============================================================================

class IntelScoringCriteria(BaseModel):
    """Per-criterion integer scores for assessing intel quality.

    Each criterion is bounded to its rubric weight; the upper bounds sum to
    100, so a perfect intel report totals 100 points.
    """

    # Rubric dimensions, each capped at its weight on the 100-point scale.
    technical_specificity: typing.Annotated[int, Field(ge=0, le=20, description="Technical detail precision (max 20)")]
    clarity: typing.Annotated[int, Field(ge=0, le=10, description="Text clarity and structure (max 10)")]
    component_impact: typing.Annotated[int, Field(ge=0, le=15, description="Clarity of affected components (max 15)")]
    reproducibility: typing.Annotated[int, Field(ge=0, le=15, description="Exploit reproducibility details (max 15)")]
    vulnerable_function: typing.Annotated[int, Field(ge=0, le=15, description="Specific function identification (max 15)")]
    mitigation: typing.Annotated[int, Field(ge=0, le=10, description="Mitigation guidance (max 10)")]
    environment: typing.Annotated[int, Field(ge=0, le=10, description="Environment context (max 10)")]
    configuration: typing.Annotated[int, Field(ge=0, le=5, description="Configuration details (max 5)")]


class IntelScoringJustifications(BaseModel):
    """Short free-text justification for each scoring criterion.

    Field names mirror those of IntelScoringCriteria one-for-one; every
    justification is a non-empty string capped at 200 characters.
    """

    # One justification per rubric criterion (1-200 chars each).
    technical_specificity: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    clarity: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    component_impact: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    reproducibility: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    vulnerable_function: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    mitigation: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    environment: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]
    configuration: typing.Annotated[str, Field(min_length=1, max_length=200, description="Why this score")]


class IntelScoringOutput(BaseModel):
    """Structured output for intel quality scoring.

    Pairs the numeric rubric scores with the per-criterion textual
    justifications produced by the scoring LLM stage.
    """

    # Numeric scores per rubric criterion (bounds enforced by IntelScoringCriteria).
    scores: IntelScoringCriteria
    # One short free-text explanation per criterion.
    justifications: IntelScoringJustifications

    def calculate_total(self) -> int:
        """Return the total intel quality score across all criteria (0-100).

        Iterates the field set declared on the scores model rather than a
        hand-written list of attributes, so the total cannot silently drift
        out of sync if criteria are added to or renamed on
        IntelScoringCriteria.
        """
        # model_fields is a pydantic-v2 class attribute mapping field name -> FieldInfo.
        return sum(
            getattr(self.scores, field_name)
            for field_name in type(self.scores).model_fields
        )


class ChecklistGenerationOutput(BaseModel):
    """Structured output for checklist generation.

    Holds the investigation questions produced by the checklist LLM stage;
    the list length is constrained to between 3 and 5 items.
    """

    # min_length/max_length bound the list size (pydantic v2 collection constraints).
    checklist_items: typing.Annotated[
        list[str],
        Field(
            min_length=3,
            max_length=5,
            description="List of 3-5 investigation questions for CVE exploitability assessment",
        ),
    ]


class JustificationStructuredOutput(BaseModel):
    """Structured justification output with a validated category.

    The category is restricted to a closed set of exploitability labels;
    any other string fails validation.
    """

    # Closed vocabulary of exploitability classifications (exact match required).
    category: typing.Annotated[
        typing.Literal[
            "false_positive",
            "code_not_present",
            "code_not_reachable",
            "requires_configuration",
            "requires_dependency",
            "requires_environment",
            "compiler_protected",
            "runtime_protected",
            "perimeter_protected",
            "mitigating_control_protected",
            "uncertain",
            "vulnerable"
        ],
        Field(description="Exploitability classification category (exact match required)"),
    ]
    # Free-text rationale; minimum length guards against empty/one-word answers.
    reasoning: typing.Annotated[
        str,
        Field(
            min_length=20,
            description="Brief explanation citing specific evidence from investigation",
        ),
    ]


class CVSSMetricStructuredOutput(BaseModel):
    """Structured output for a single CVSS metric evaluation.

    Captures one base-metric decision (e.g. Attack Vector) together with the
    definition text it was matched against and the supporting evidence.
    """

    # One of the eight CVSS v3 base-metric abbreviations.
    metric_abbreviation: typing.Literal["AV", "AC", "PR", "UI", "S", "C", "I", "A"]
    # Single uppercase letter code for the chosen value.
    value_abbreviation: typing.Annotated[
        str, Field(pattern=r"^[A-Z]$", description="Single letter value code")
    ]
    # Human-readable name of the chosen value.
    selected_value: typing.Annotated[
        str, Field(description="Full name of selected value (e.g., 'Network', 'Low')")
    ]
    # Definition text the LLM matched when selecting the value.
    definition_matched: typing.Annotated[
        str, Field(description="Definition text that was matched")
    ]
    # Quoted supporting evidence, or the literal string 'None'.
    evidence: typing.Annotated[
        str, Field(description="Supporting evidence quotes or 'None'")
    ]

    def to_cvss_component(self) -> str:
        """Format as CVSS component string (e.g., 'AV:N')."""
        return ":".join((self.metric_abbreviation, self.value_abbreviation))


# =============================================================================
# EXISTING OUTPUT MODELS
# =============================================================================

class AgentIntermediateStep(BaseModel):
"""
Represents info for an intermediate step taken by an agent.
Expand Down
13 changes: 10 additions & 3 deletions src/vuln_analysis/functions/cve_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
from langchain.agents import AgentExecutor
from langchain.agents import create_react_agent
from langchain.agents.agent import RunnableAgent
from langchain.agents.mrkl.output_parser import MRKLOutputParser
#from langchain.agents.mrkl.output_parser import MRKLOutputParser
from langchain_core.exceptions import OutputParserException
from langchain_core.prompts import PromptTemplate
from pydantic import Field
from vuln_analysis.data_models.state import AgentMorpheusEngineState
from vuln_analysis.tools.tool_names import ToolNames
from vuln_analysis.utils.error_handling_decorator import ToolRaisedException
from vuln_analysis.utils.prompting import get_agent_prompt
from vuln_analysis.utils.thinking_aware_parser import ThinkingAwareMRKLParser, create_thinking_aware_error_handler
from vuln_analysis.logging.loggers_factory import LoggingFactory, trace_id

logger = LoggingFactory.get_agent_logger(__name__)
Expand Down Expand Up @@ -105,17 +106,23 @@ async def _create_agent(config: CVEAgentExecutorToolConfig, builder: Builder,
}
)

# Use thinking-aware parser for robust handling of Llama 3.3 thinking mode
output_parser = ThinkingAwareMRKLParser()

# Create thinking-aware error handler
error_handler = create_thinking_aware_error_handler(enabled_tool_names)

agent = create_react_agent(llm=llm,
tools=tools,
prompt=prompt,
output_parser=MRKLOutputParser(),
output_parser=output_parser,
stop_sequence=["\nObservation:", "\n\tObservation:"])

agent_executor = AgentExecutor(
agent=agent,
tools=tools,
early_stopping_method="force",
handle_parsing_errors="Check your output and make sure it conforms, use the Action/Action Input syntax",
handle_parsing_errors=error_handler,
max_iterations=config.max_iterations,
return_intermediate_steps=config.return_intermediate_steps,
verbose=config.verbose)
Expand Down
12 changes: 9 additions & 3 deletions src/vuln_analysis/functions/cve_generate_cvss.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
from langchain_core.prompts import PromptTemplate
from pydantic import Field
from cvss import CVSS3
from langchain.agents.mrkl.output_parser import MRKLOutputParser
#from langchain.agents.mrkl.output_parser import MRKLOutputParser

from vuln_analysis.data_models.state import AgentMorpheusEngineState
from vuln_analysis.tools.tool_names import ToolNames
from vuln_analysis.utils.prompting import get_cvss_prompt
from vuln_analysis.utils.thinking_aware_parser import ThinkingAwareMRKLParser, create_thinking_aware_error_handler

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -196,17 +197,22 @@ async def _create_agent(config: CVEGenerateCvssToolConfig, builder: Builder,
(tool.name == ToolNames.CODE_KEYWORD_SEARCH and state.code_index_path is None))
]

# Get tool names for error handler
enabled_tool_names = [tool.name for tool in tools]

# Get prompt (examples now embedded in template)
prompt = PromptTemplate.from_template(
get_cvss_prompt(config.prompt, config.prompt_examples)
)

error_handler = _make_parse_error_handler(is_openai)
# Use thinking-aware parser and error handler
output_parser = ThinkingAwareMRKLParser()
error_handler = create_thinking_aware_error_handler(enabled_tool_names)

agent = create_react_agent(llm=llm,
tools=tools,
prompt=prompt,
output_parser=MRKLOutputParser())
output_parser=output_parser)

agent_executor = AgentExecutor(
agent=agent,
Expand Down
38 changes: 26 additions & 12 deletions src/vuln_analysis/functions/cve_justify.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,33 +38,47 @@ class CVEJustifyToolConfig(FunctionBaseConfig, name="cve_justify"):
@register_function(config_type=CVEJustifyToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN])
async def cve_justify(config: CVEJustifyToolConfig, builder: Builder):

from langchain_core.prompts import PromptTemplate

from vuln_analysis.data_models.state import AgentMorpheusEngineState
from vuln_analysis.utils.justification_parser import JustificationParser
from vuln_analysis.utils.prompting import get_justification_chat_prompt
from vuln_analysis.data_models.output import JustificationStructuredOutput

jp = JustificationParser()

llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN)

prompt = PromptTemplate(input_variables=["summary"], template=jp.JUSTIFICATION_PROMPT)
chain = prompt | llm
# Bind structured output schema to LLM
structured_llm = llm.with_structured_output(JustificationStructuredOutput)

prompt = get_justification_chat_prompt()
chain = prompt | structured_llm

async def justify_cve(summary):
justification_text = await chain.ainvoke({"summary": summary})
return justification_text.content
result = await chain.ainvoke({"summary": summary}) # type: JustificationStructuredOutput
return result

async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState:
trace_id.set(state.original_input.input.scan.id)
results = await asyncio.gather(*(justify_cve(summary) for summary in state.final_summaries.values()))
parsed_justification = await asyncio.gather(jp._parse_justification(results))
results = await asyncio.gather(
*(justify_cve(summary) for summary in state.final_summaries.values())
)

# format justification output
# Convert structured output to expected format
justifications = {}
for i, vuln_id in enumerate(state.checklist_results.keys()):
justifications[vuln_id] = {}
for key in parsed_justification[0]:
justifications[vuln_id][key] = parsed_justification[0][key][i]
result = results[i]

# Map to final justification label
final_label = jp.RAW_TO_FINAL_JUSTIFICATION_MAP.get(result.category, result.category)

# Map to affected status
affected_status = jp.JUSTIFICATION_TO_AFFECTED_STATUS_MAP.get(final_label, "UNKNOWN")

justifications[vuln_id] = {
jp.JUSTIFICATION_LABEL_COL_NAME: final_label,
jp.JUSTIFICATION_REASON_COL_NAME: result.reasoning,
jp.AFFECTED_STATUS_COL_NAME: affected_status
}

state.justifications = justifications
return state
Expand Down
6 changes: 2 additions & 4 deletions src/vuln_analysis/functions/cve_summarize.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,11 @@ class CVESummarizeToolConfig(FunctionBaseConfig, name="cve_summarize"):
@register_function(config_type=CVESummarizeToolConfig, framework_wrappers=[LLMFrameworkEnum.LANGCHAIN])
async def cve_summarize(config: CVESummarizeToolConfig, builder: Builder):

from langchain_core.prompts import PromptTemplate

from vuln_analysis.data_models.state import AgentMorpheusEngineState
from vuln_analysis.utils.prompting import SUMMARY_PROMPT
from vuln_analysis.utils.prompting import get_summary_chat_prompt

llm = await builder.get_llm(llm_name=config.llm_name, wrapper_type=LLMFrameworkEnum.LANGCHAIN)
prompt = PromptTemplate(input_variables=["response"], template=SUMMARY_PROMPT)
prompt = get_summary_chat_prompt()
chain = prompt | llm

async def summarize_cve(results):
Expand Down
37 changes: 24 additions & 13 deletions src/vuln_analysis/utils/checklist_prompt_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from jinja2 import Template
from langchain_core.language_models.base import BaseLanguageModel

from vuln_analysis.utils.prompting import MOD_FEW_SHOT
from vuln_analysis.utils.prompting import CHECKLIST_USER_PROMPT
from vuln_analysis.utils.prompting import additional_intel_prompting
from vuln_analysis.utils.prompting import get_mod_examples
from vuln_analysis.utils.string_utils import attempt_fix_list_string
Expand All @@ -29,10 +29,10 @@

logger = LoggingFactory.get_agent_logger(__name__)

# Format MOD_FEW_SHOT with examples, preserving {tool_descriptions} for Jinja2 rendering
# Format CHECKLIST_USER_PROMPT with examples, preserving {tool_descriptions} for Jinja2 rendering
# Use double braces for tool_descriptions to escape it during format()
_MOD_FEW_SHOT_ESCAPED = MOD_FEW_SHOT.replace('{tool_descriptions}', '{{tool_descriptions}}')
DEFAULT_CHECKLIST_PROMPT = _MOD_FEW_SHOT_ESCAPED.format(examples=get_mod_examples())
_CHECKLIST_USER_PROMPT_ESCAPED = CHECKLIST_USER_PROMPT.replace('{tool_descriptions}', '{{tool_descriptions}}')
DEFAULT_CHECKLIST_PROMPT = _CHECKLIST_USER_PROMPT_ESCAPED.format(examples=get_mod_examples())

cve_prompt2 = """Parse the following numbered checklist into a python list in the format ["x", "y", "z"], a comma separated list surrounded by square braces: {{template}}"""

Expand Down Expand Up @@ -118,7 +118,9 @@ async def generate_checklist(prompt: str | None,
tool_names: list[str] | None = None,
enable_llm_list_parsing: bool = False) -> str:

from vuln_analysis.utils.prompting import build_tool_descriptions
from langchain_core.prompts import ChatPromptTemplate
from vuln_analysis.utils.prompting import build_tool_descriptions, CHECKLIST_SYS_PROMPT
from vuln_analysis.data_models.output import ChecklistGenerationOutput

if not prompt:
prompt = DEFAULT_CHECKLIST_PROMPT
Expand All @@ -135,7 +137,6 @@ async def generate_checklist(prompt: str | None,
tool_descriptions = "Analysis tools will be used to investigate these questions."

# Add tool_descriptions to input_dict for Jinja2 rendering
# This treats it as a Jinja2 variable, consistent with all CVE fields
input_dict_with_tools = {
**input_dict,
'tool_descriptions': tool_descriptions
Expand All @@ -162,15 +163,25 @@ async def generate_checklist(prompt: str | None,
# Jinja2 renders {tool_descriptions} along with all CVE fields
format_cve_intel = await format_jinja_prompt(cve_prompt1, input_dict_with_tools)

gen_checklist = await llm.ainvoke(format_cve_intel)
# Bind structured output schema to LLM
structured_llm = llm.with_structured_output(ChecklistGenerationOutput)

if enable_llm_list_parsing:
parsing_checklist_template = await format_jinja_prompt(cve_prompt2, {"template": gen_checklist.content})
parsed_checklist = await llm.ainvoke(parsing_checklist_template)
return parsed_checklist.content
# Use ChatPromptTemplate with variable placeholder for rendered message
chat_prompt = ChatPromptTemplate.from_messages([
("system", CHECKLIST_SYS_PROMPT),
("user", "{rendered_message}")
])

# Chain returns validated Pydantic object
chain = chat_prompt | structured_llm
# Pass the rendered message to the LLM as a dictionary
result = await chain.ainvoke({"rendered_message": format_cve_intel}) # type: ignore[assignment]

# Return the checklist items as JSON-formatted list string for backward compatibility
# This matches the expected format: '["item1", "item2", ...]'
import json
return json.dumps(result.checklist_items)

except Exception as e:
logging.error(f" Error in generating checklist : {e}")
raise

return gen_checklist.content
Loading