Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-langchain"
version = "0.1.12"
version = "0.1.13"
description = "UiPath Langchain"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
57 changes: 42 additions & 15 deletions src/uipath_langchain/agent/react/terminate_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from __future__ import annotations

from typing import Any, NoReturn

from langchain_core.messages import AIMessage
from pydantic import BaseModel
from uipath.agent.react import END_EXECUTION_TOOL, RAISE_ERROR_TOOL
Expand All @@ -11,15 +13,51 @@
AgentNodeRoutingException,
AgentTerminationException,
)
from .types import AgentGraphState
from .types import AgentGraphState, AgentTermination


def _handle_end_execution(
args: dict[str, Any], response_schema: type[BaseModel] | None
) -> dict[str, Any]:
"""Handle LLM-initiated termination via END_EXECUTION_TOOL."""
output_schema = response_schema or END_EXECUTION_TOOL.args_schema
validated = output_schema.model_validate(args)
return validated.model_dump()


def _handle_raise_error(args: dict[str, Any]) -> NoReturn:
"""Handle LLM-initiated error via RAISE_ERROR_TOOL."""
error_message = args.get("message", "The LLM did not set the error message")
detail = args.get("details", "")
raise AgentTerminationException(
code=UiPathErrorCode.EXECUTION_ERROR,
title=error_message,
detail=detail,
)


def _handle_agent_termination(termination: AgentTermination) -> NoReturn:
"""Handle Command-based termination."""
raise AgentTerminationException(
code=UiPathErrorCode.EXECUTION_ERROR,
title=termination.title,
detail=termination.detail,
)


def create_terminate_node(
response_schema: type[BaseModel] | None = None,
):
"""Validates and extracts end_execution args to state output field."""
"""Handles Agent Graph termination for multiple sources and output or error propagation to Orchestrator.
Termination scenarios:
1. Command based termination with information in state (e.g: escalation)
2. LLM-initiated termination (END_EXECUTION_TOOL)
3. LLM-initiated error (RAISE_ERROR_TOOL)
"""

def terminate_node(state: AgentGraphState):
if state.termination:
_handle_agent_termination(state.termination)
last_message = state.messages[-1]
if not isinstance(last_message, AIMessage):
raise AgentNodeRoutingException(
Expand All @@ -30,21 +68,10 @@ def terminate_node(state: AgentGraphState):
tool_name = tool_call["name"]

if tool_name == END_EXECUTION_TOOL.name:
args = tool_call["args"]
output_schema = response_schema or END_EXECUTION_TOOL.args_schema
validated = output_schema.model_validate(args)
return validated.model_dump()
return _handle_end_execution(tool_call["args"], response_schema)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make these into tools instead? That way, the LLM gets the tool definitions without additional change.

Copy link
Collaborator Author

@radu-mocanu radu-mocanu Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be better to leave the terminate node as a 'special' tool for separation of concerns.
image


if tool_name == RAISE_ERROR_TOOL.name:
error_message = tool_call["args"].get(
"message", "The LLM did not set the error message"
)
detail = tool_call["args"].get("details", "")
raise AgentTerminationException(
code=UiPathErrorCode.EXECUTION_ERROR,
title=error_message,
detail=detail,
)
_handle_raise_error(tool_call["args"])

raise AgentNodeRoutingException(
"No control flow tool call found in terminate node. Unexpected state."
Expand Down
13 changes: 13 additions & 0 deletions src/uipath_langchain/agent/react/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,23 @@
from pydantic import BaseModel, Field


class AgentTerminationSource(StrEnum):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need more information here. AFAIK escalations states are submit and reject. IMO the model should be the one deciding to terminate. Otherwise, any potential cleanup may get skipped.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the way low code currently works. without this we won t be able to reach parity

ESCALATION = "escalation"


class AgentTermination(BaseModel):
"""Agent Graph Termination model."""

source: AgentTerminationSource
title: str
detail: str = ""


class AgentGraphState(BaseModel):
"""Agent Graph state for standard loop execution."""

messages: Annotated[list[AnyMessage], add_messages] = []
termination: AgentTermination | None = None


class AgentGraphNode(StrEnum):
Expand Down
2 changes: 2 additions & 0 deletions src/uipath_langchain/agent/tools/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Tool creation and management for LowCode agents."""

from .context_tool import create_context_tool
from .escalation_tool import create_escalation_tool
from .integration_tool import create_integration_tool
from .process_tool import create_process_tool
from .tool_factory import (
Expand All @@ -14,4 +15,5 @@
"create_context_tool",
"create_process_tool",
"create_integration_tool",
"create_escalation_tool",
]
119 changes: 119 additions & 0 deletions src/uipath_langchain/agent/tools/escalation_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""Escalation tool creation for Action Center integration."""

from __future__ import annotations

import logging
from enum import Enum
from typing import Annotated, Any

from langchain_core.messages import ToolMessage
from langchain_core.tools import InjectedToolCallId, StructuredTool
from langgraph.types import Command, interrupt
from pydantic import BaseModel, create_model
from uipath.agent.models.agent import (
AgentEscalationChannel,
AgentEscalationRecipientType,
AgentEscalationResourceConfig,
)
from uipath.eval.mocks import mockable
from uipath.platform.common import CreateEscalation
from uipath.utils.dynamic_schema import jsonschema_to_pydantic

from ..react.types import AgentGraphNode, AgentTerminationSource
from .utils import sanitize_tool_name

logger = logging.getLogger(__name__)


class EscalationAction(str, Enum):
"""Actions that can be taken after an escalation completes."""

CONTINUE = "continue"
END = "end"


def create_escalation_tool(resource: AgentEscalationResourceConfig) -> StructuredTool:
"""Uses interrupt() for Action Center human-in-the-loop."""

tool_name: str = f"escalate_{sanitize_tool_name(resource.name)}"
channel: AgentEscalationChannel = resource.channels[0]
base_input_model: type[BaseModel] = jsonschema_to_pydantic(channel.input_schema)
input_model = create_model(
base_input_model.__name__,
__base__=base_input_model,
tool_call_id=(Annotated[str, InjectedToolCallId]),
)
output_model: type[BaseModel] = jsonschema_to_pydantic(channel.output_schema)

# only works for UserEmail type as value is GUID for UserId or GroupId for other types
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# but the API expects strings e.g: [email protected]
# we need to do user resolution via Identity here
assignee: str | None = (
channel.recipients[0].value
if channel.recipients
and channel.recipients[0].type == AgentEscalationRecipientType.USER_EMAIL
else None
)

@mockable(
name=resource.name,
description=resource.description,
input_schema=input_model.model_json_schema(),
output_schema=output_model.model_json_schema(),
)
async def escalation_tool_fn(
tool_call_id: Annotated[str, InjectedToolCallId], **kwargs: Any
) -> Command[Any] | Any:
task_title = channel.task_title or "Escalation Task"
result = interrupt(
CreateEscalation(
title=task_title,
data=kwargs,
assignee=assignee,
app_name=channel.properties.app_name,
app_folder_path=channel.properties.folder_name,
app_version=channel.properties.app_version,
priority=channel.priority,
labels=channel.labels,
is_actionable_message_enabled=channel.properties.is_actionable_message_enabled,
actionable_message_metadata=channel.properties.actionable_message_meta_data,
)
)

escalation_action = getattr(result, "action", None)
escalation_output = getattr(result, "data", {})
validated_result = output_model.model_validate(escalation_output)

outcome = (
channel.outcome_mapping.get(escalation_action)
if channel.outcome_mapping and escalation_action
else None
)
if outcome and outcome == EscalationAction.END:
return Command(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels like an anti-pattern. Tools should ideally not be aware of state.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above. the escalations can be configured to terminate the execution on reject

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In that case, can we port over this logic to ToolNode?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is already being converted to a ToolNode

return {tool.name: ToolNode([tool], handle_tool_errors=False) for tool in tools}

update={
"messages": [
ToolMessage(
content="Terminating the agent execution as configured in the escalation outcome",
tool_call_id=tool_call_id,
)
],
"termination": {
"source": AgentTerminationSource.ESCALATION,
"title": f"Agent run ended based on escalation outcome {outcome} with directive {escalation_action}",
"detail": f"Escalation output: {validated_result.model_dump()}",
},
},
goto=AgentGraphNode.TERMINATE,
)

return validated_result

tool = StructuredTool(
name=tool_name,
description=resource.description,
args_schema=input_model,
coroutine=escalation_tool_fn,
)

return tool
4 changes: 4 additions & 0 deletions src/uipath_langchain/agent/tools/tool_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
from langchain_core.tools import BaseTool, StructuredTool
from uipath.agent.models.agent import (
AgentContextResourceConfig,
AgentEscalationResourceConfig,
AgentIntegrationToolResourceConfig,
AgentProcessToolResourceConfig,
BaseAgentResourceConfig,
LowCodeAgentDefinition,
)

from . import create_escalation_tool
from .context_tool import create_context_tool
from .integration_tool import create_integration_tool
from .process_tool import create_process_tool
Expand Down Expand Up @@ -40,5 +42,7 @@ async def _build_tool_for_resource(

elif isinstance(resource, AgentIntegrationToolResourceConfig):
return create_integration_tool(resource)
elif isinstance(resource, AgentEscalationResourceConfig):
return create_escalation_tool(resource)

return None
Loading