From 3889f7433dc8870ee70477487412744a143ff68f Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Thu, 30 Oct 2025 16:19:45 +0200 Subject: [PATCH 1/6] feat: support for conversational coded agents --- src/uipath_langchain/_cli/_utils/_config.py | 43 +++++++++++++++++++++ src/uipath_langchain/_cli/cli_run.py | 26 +++++++++++++ 2 files changed, 69 insertions(+) create mode 100644 src/uipath_langchain/_cli/_utils/_config.py diff --git a/src/uipath_langchain/_cli/_utils/_config.py b/src/uipath_langchain/_cli/_utils/_config.py new file mode 100644 index 00000000..f41d4cfa --- /dev/null +++ b/src/uipath_langchain/_cli/_utils/_config.py @@ -0,0 +1,43 @@ +import json +import logging +import os +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class UiPathConfig: + """Configuration from uipath.json""" + + def __init__(self, config_path: str = "uipath.json"): + self.config_path = config_path + self._config: Optional[Dict[str, Any]] = None + + @property + def exists(self) -> bool: + """Check if uipath.json exists""" + return os.path.exists(self.config_path) + + def load_config(self) -> Dict[str, Any]: + """Load and validate configuration""" + if not self.exists: + raise FileNotFoundError(f"Config file not found: {self.config_path}") + + try: + with open(self.config_path, "r") as f: + config = json.load(f) + + self._config = config + return config + except Exception as e: + logger.error(f"Failed to load uipath.json: {str(e)}") + raise + + @property + def is_conversational(self) -> bool: + """Check if the agent is conversational""" + if not self._config: + self.load_config() + + # Check isConversational at root level (testing purposes only) + return self._config.get("isConversational", False) if self._config else False diff --git a/src/uipath_langchain/_cli/cli_run.py b/src/uipath_langchain/_cli/cli_run.py index 819c2ae9..9441afa7 100644 --- a/src/uipath_langchain/_cli/cli_run.py +++ b/src/uipath_langchain/_cli/cli_run.py @@ -7,6 +7,8 @@ get_current_span, ) from uipath._cli._debug._bridge import ConsoleDebugBridge, UiPathDebugBridge +from uipath._cli._conversational._bridge import get_conversation_bridge +from uipath._cli._conversational._runtime import UiPathConversationRuntime from uipath._cli._runtime._contracts import ( UiPathRuntimeFactory, UiPathRuntimeResult, @@ -14,6 +16,7 @@ from uipath._cli.middlewares import MiddlewareResult from uipath._events._events import UiPathAgentStateEvent from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter +from uipath._events._events import UiPathAgentMessageEvent from .._tracing import ( _instrument_traceable_attributes, @@ -23,6 +26,7 @@ LangGraphRuntimeContext, LangGraphScriptRuntime, ) +from ._utils._config import UiPathConfig from ._utils._graph import LangGraphConfig @@ -50,6 +54,13 @@ async def execute(): context.execution_id = context.job_id or "default" _instrument_traceable_attributes() + # Check if this is a conversational agent + uipath_config = UiPathConfig() + is_conversational = False + if uipath_config.exists: + is_conversational = uipath_config.is_conversational + context.is_conversational = is_conversational + def generate_runtime( ctx: LangGraphRuntimeContext, ) -> LangGraphScriptRuntime: @@ -64,6 +75,7 @@ def generate_runtime( LangGraphScriptRuntime, LangGraphRuntimeContext, runtime_generator=generate_runtime, + context_generator=lambda: context, ) runtime_factory.add_instrumentor(LangChainInstrumentor, get_current_span) @@ -75,8 +87,22 @@ def generate_runtime( runtime_factory.add_span_exporter( LlmOpsHttpExporter(extra_process_spans=True) ) + + # Handle conversational agents + if is_conversational: + conversation_bridge = get_conversation_bridge(context) + async with UiPathConversationRuntime.from_conversation_context( + context=context, + factory=runtime_factory, + conversation_bridge=conversation_bridge, + ) as conversation_runtime: + await conversation_runtime.execute() + # Handle non-conversational agents + elif context.job_id: + # Cloud execution - direct runtime execution await runtime_factory.execute(context) else: + # Local execution - stream with debug bridge for visibility debug_bridge: UiPathDebugBridge = ConsoleDebugBridge() await debug_bridge.emit_execution_started(context.execution_id) async for event in runtime_factory.stream(context): From adfcca1e2bc341ca85e8673be3262f3c019fcf9f Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Mon, 3 Nov 2025 12:35:50 +0200 Subject: [PATCH 2/6] wip: add mappings for events --- .../_cli/_runtime/_conversation.py | 103 ++++++++++------- .../_cli/_runtime/_runtime.py | 23 +++- src/uipath_langchain/_cli/cli_run.py | 27 ++++- src/uipath_langchain/chat/__init__.py | 2 + src/uipath_langchain/chat/content_blocks.py | 109 ++++++++++++++++++ 5 files changed, 218 insertions(+), 46 deletions(-) create mode 100644 src/uipath_langchain/chat/content_blocks.py diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index 6a588951..0f596e26 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -1,3 +1,4 @@ +import logging import uuid from datetime import datetime from typing import Any, Dict, List, Optional @@ -9,6 +10,7 @@ HumanMessage, ToolMessage, ) +from pydantic import TypeAdapter, ValidationError from uipath.agent.conversation import ( UiPathConversationContentPartChunkEvent, UiPathConversationContentPartEndEvent, @@ -26,6 +28,15 @@ UiPathInlineValue, ) +from uipath_langchain.chat.content_blocks import ( + ContentBlock, + TextContent, + ToolCallChunkContent, + ToolCallContent, +) + +logger = logging.getLogger(__name__) + def _new_id() -> str: return str(uuid.uuid4()) @@ -125,54 +136,68 @@ def map_message( ) elif isinstance(message.content, list) and message.content: - for chunk in message.content: - if not isinstance(chunk, dict): + content_adapter = TypeAdapter(ContentBlock) + + for raw_chunk in message.content: + if not isinstance(raw_chunk, dict): continue - idx = chunk.get("index", 0) - ctype = chunk.get("type") - id = chunk.get("id", f"chunk-{message.id}-{idx}") - - # Start of a tool call - if ctype == "tool_use": - msg_event.tool_call = UiPathConversationToolCallEvent( - tool_call_id=id, - start=UiPathConversationToolCallStartEvent( - tool_name=chunk.get("name") or "", - arguments=UiPathInlineValue(inline=""), - timestamp=timestamp, - ), - ) - # JSON args streaming (content part for tool args) - elif ctype == "input_json_delta": - text = chunk.get("partial_json", "") - # first delta: emit content part start + first chunk - if text == "": + try: + # Parse chunk + chunk = content_adapter.validate_python(raw_chunk) + + if isinstance(chunk, TextContent): + chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - start=UiPathConversationContentPartStartEvent( - mime_type="application/json" + content_part_id=chunk_id, + chunk=UiPathConversationContentPartChunkEvent( + data=chunk.text, + content_part_sequence=0, ), ) - else: - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - chunk=UiPathConversationContentPartChunkEvent( - data=text, - content_part_sequence=idx, + + elif isinstance(chunk, ToolCallContent): + # Complete tool call (non-streaming) + msg_event.tool_call = UiPathConversationToolCallEvent( + tool_call_id=chunk.id, + start=UiPathConversationToolCallStartEvent( + tool_name=chunk.name, + arguments=UiPathInlineValue(inline=str(chunk.args)), + timestamp=timestamp, ), + end=UiPathConversationToolCallEndEvent(timestamp=timestamp), ) - # Plain text from assistant - elif ctype == "text": - text = chunk.get("text", "") - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - chunk=UiPathConversationContentPartChunkEvent( - data=text, - content_part_sequence=idx, - ), + elif isinstance(chunk, ToolCallChunkContent): + # Streaming tool call chunk + chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}" + + if chunk.name and not chunk.args: + # Tool call start + msg_event.tool_call = UiPathConversationToolCallEvent( + tool_call_id=chunk_id, + start=UiPathConversationToolCallStartEvent( + tool_name=chunk.name, + arguments=UiPathInlineValue(inline=""), + timestamp=timestamp, + ), + ) + elif chunk.args: + # Streaming tool arguments + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=chunk_id, + chunk=UiPathConversationContentPartChunkEvent( + data=str(chunk.args), + content_part_sequence=chunk.index or 0, + ), + ) + + except ValidationError as e: + # Log and skip unknown/invalid chunk types + logger.warning( + f"Failed to parse content chunk: {raw_chunk}. Error: {e}" ) + continue elif isinstance(message.content, str) and message.content: msg_event.content_part = UiPathConversationContentPartEvent( content_part_id=f"content-{message.id}", diff --git a/src/uipath_langchain/_cli/_runtime/_runtime.py b/src/uipath_langchain/_cli/_runtime/_runtime.py index 6fc20e78..e4cd642d 100644 --- a/src/uipath_langchain/_cli/_runtime/_runtime.py +++ b/src/uipath_langchain/_cli/_runtime/_runtime.py @@ -25,7 +25,7 @@ UiPathAgentStateEvent, UiPathRuntimeEvent, ) - +from ._conversation import map_message from .._utils._schema import generate_schema_from_graph from ._context import LangGraphRuntimeContext from ._exception import LangGraphErrorCode, LangGraphRuntimeError @@ -159,11 +159,24 @@ async def stream( if chunk_type == "messages": if isinstance(data, tuple): message, _ = data - event = UiPathAgentMessageEvent( - payload=message, - execution_id=self.context.execution_id, + + # Use stored conversation/exchange IDs from input, or fallback to execution_id + conversation_id = getattr(self.context, "conversation_id", None) or self.context.execution_id + exchange_id = getattr(self.context, "exchange_id", None) or self.context.execution_id + + conversation_event = map_message( + message=message, + exchange_id=exchange_id, + conversation_id=conversation_id, ) - yield event + + # Only emit if conversion was successful + if conversation_event: + event = UiPathAgentMessageEvent( + payload=conversation_event, + execution_id=self.context.execution_id, + ) + yield event # Emit UiPathAgentStateEvent for state updates elif chunk_type == "updates": diff --git a/src/uipath_langchain/_cli/cli_run.py b/src/uipath_langchain/_cli/cli_run.py index 9441afa7..34b3d5cf 100644 --- a/src/uipath_langchain/_cli/cli_run.py +++ b/src/uipath_langchain/_cli/cli_run.py @@ -1,5 +1,7 @@ import asyncio +import logging import os +import json from typing import Optional from openinference.instrumentation.langchain import ( @@ -14,9 +16,10 @@ UiPathRuntimeResult, ) from uipath._cli.middlewares import MiddlewareResult -from uipath._events._events import UiPathAgentStateEvent +from uipath._events._events import UiPathAgentStateEvent, UiPathAgentMessageEvent from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter -from uipath._events._events import UiPathAgentMessageEvent +from uipath.agent.conversation import UiPathConversationMessage +from pydantic import TypeAdapter from .._tracing import ( _instrument_traceable_attributes, @@ -30,6 +33,8 @@ from ._utils._graph import LangGraphConfig +logger = logging.getLogger(__name__) + def langgraph_run_middleware( entrypoint: Optional[str], input: Optional[str], @@ -61,6 +66,24 @@ async def execute(): is_conversational = uipath_config.is_conversational context.is_conversational = is_conversational + if is_conversational and context.input: + try: + input_dict = json.loads(context.input) + + conversation_id = input_dict.get("conversation_id") or input_dict.get("conversationId") + exchange_id = input_dict.get("exchange_id") or input_dict.get("exchangeId") + + # Store IDs in context for reuse in output + if conversation_id: + context.conversation_id = conversation_id + if exchange_id: + context.exchange_id = exchange_id + + context.input_message = TypeAdapter(UiPathConversationMessage).validate_python(input_dict) + logger.info(f"Parsed conversational input: message_id={context.input_message.message_id}, conversation_id={conversation_id}, exchange_id={exchange_id}") + except Exception as e: + logger.warning(f"Failed to parse input as UiPathConversationMessage: {e}. Using as plain JSON.") + def generate_runtime( ctx: LangGraphRuntimeContext, ) -> LangGraphScriptRuntime: diff --git a/src/uipath_langchain/chat/__init__.py b/src/uipath_langchain/chat/__init__.py index 78e3305d..ae35872f 100644 --- a/src/uipath_langchain/chat/__init__.py +++ b/src/uipath_langchain/chat/__init__.py @@ -1,6 +1,8 @@ +from .content_blocks import ContentBlock from .models import UiPathAzureChatOpenAI, UiPathChat __all__ = [ "UiPathChat", "UiPathAzureChatOpenAI", + "ContentBlock" ] diff --git a/src/uipath_langchain/chat/content_blocks.py b/src/uipath_langchain/chat/content_blocks.py new file mode 100644 index 00000000..ae0bf7bb --- /dev/null +++ b/src/uipath_langchain/chat/content_blocks.py @@ -0,0 +1,109 @@ +from typing import Literal, Optional, List, Any, Dict, Annotated, Union + +from pydantic import BaseModel, Field + + +class TextContent(BaseModel): + type: Literal["text"] + text: str = Field(alias="text") + annotations: Optional[List[Any]] = Field(default=None, alias="annotations") + extras: Optional[Dict[str, Any]] = Field(default=None, alias="extras") + +class ReasoningContent(BaseModel): + type: Literal["reasoning"] + reasoning: str = Field(alias="reasoning") + extras: Optional[Dict[str, Any]] = Field(default=None, alias="extras") + +class ImageContent(BaseModel): + type: Literal["image"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class AudioContent(BaseModel): + type: Literal["audio"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class VideoContent(BaseModel): + type: Literal["video"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class FileContent(BaseModel): + type: Literal["file"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class PlainTextContent(BaseModel): + type: Literal["text-plain"] + text: Optional[str] = Field(default=None, alias="text-plain") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class ToolCallContent(BaseModel): + type: Literal["tool_call"] + name: str = Field(alias="name") + args: Dict[str, Any] = Field(alias="args") + id: str = Field(alias="id") + +class ToolCallChunkContent(BaseModel): + type: Literal["tool_call_chunk"] + name: Optional[str] = Field(default=None, alias="name") + args: Optional[str] = Field(default=None, alias="args") + id: Optional[str] = Field(default=None, alias="id") + index: Optional[int | str] = Field(default=None, alias="index") + +class InvalidToolCallContent(BaseModel): + type: Literal["invalid_tool_call"] + name: Optional[str] = Field(default=None, alias="name") + args: Optional[Dict[str, Any]] = Field(default=None, alias="args") + error: Optional[str] = Field(default=None, alias="error") + +class ServerToolCallContent(BaseModel): + type: Literal["server_tool_call"] + id: str = Field(alias="id") + name: str = Field(alias="name") + args: Dict[str, Any] = Field(default=None, alias="args") + +class ServerToolCallChunkContent(BaseModel): + type: Literal["server_tool_call_chunk"] + id: Optional[str] = Field(default=None, alias="id") + name: Optional[str] = Field(default=None, alias="name") + args: Optional[Dict[str, Any]] = Field(default=None, alias="args") + index: Optional[int | str] = Field(default=None, alias="index") + +class ServerToolResultContent(BaseModel): + type: Literal["server_tool_result"] + toll_call_id: str = Field(alias="toll_call_id") + id: Optional[str] = Field(default=None, alias="id") + status: str = Field(alias="status") + output: Optional[Any] = Field(default=None, alias="output") + +ContentBlock = Annotated[ + Union[ + TextContent, + ReasoningContent, + ImageContent, + AudioContent, + VideoContent, + FileContent, + PlainTextContent, + ToolCallContent, + ToolCallChunkContent, + InvalidToolCallContent, + ServerToolCallContent, + ServerToolCallChunkContent, + ServerToolResultContent, + InvalidToolCallContent, + ServerToolResultContent + ], + Field(discriminator="type") +] + From 78d8f8cf84a9dc5a080e06351c2c1e794d62bef4 Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Thu, 6 Nov 2025 11:20:41 +0200 Subject: [PATCH 3/6] working E2E flow with plain/text --- pyproject.toml | 9 +- .../_cli/_runtime/_conversation.py | 212 +++++++++--------- .../_cli/_runtime/_runtime.py | 9 +- 3 files changed, 113 insertions(+), 117 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index cd0941cb..99ce3a2c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,13 +5,13 @@ description = "UiPath Langchain" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.10" dependencies = [ - "uipath>=2.1.123, <2.2.0", - "langgraph>=0.5.0, <0.7.0", + "uipath @ file:///C:/Users/gabriel.vasilescu/source/repos/uipath-python", + "langgraph>=1.0.0", "langchain-core>=0.3.34", "langgraph-checkpoint-sqlite>=2.0.3", "langchain-community>=0.3.21", "langchain-openai>=0.3.3", - "langchain>=0.3.4", + "langchain>=1.0.1", "pydantic-settings>=2.6.0", "python-dotenv>=1.0.1", "httpx>=0.27.0", @@ -62,6 +62,9 @@ langchain = [ "uipath-langchain>=0.0.2" ] +[tool.hatch.metadata] +allow-direct-references = true + [tool.hatch.build.targets.wheel] packages = ["src/uipath_langchain"] diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index 0f596e26..a32cc4c8 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -1,6 +1,6 @@ import logging import uuid -from datetime import datetime +from datetime import datetime, timezone from typing import Any, Dict, List, Optional from langchain_core.messages import ( @@ -115,10 +115,12 @@ def map_message( message: BaseMessage, exchange_id: Optional[str] = None, conversation_id: Optional[str] = None, + tool_chunks_dict: Optional[Dict[str, ToolCallChunkContent]] = None, ) -> Optional[UiPathConversationEvent]: """Convert LangGraph BaseMessage (chunk or full) into a UiPathConversationEvent.""" message_id = getattr(message, "id", None) or _new_id() - timestamp = datetime.now().isoformat() + # Format timestamp as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z + timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z') # --- Streaming AIMessageChunk --- if isinstance(message, AIMessageChunk): @@ -126,12 +128,22 @@ def map_message( message_id=message.id or _new_id(), ) + # Check if this is the last chunk by examining chunk_position + chunk = AIMessageChunk(**message.model_dump()) + if hasattr(chunk, "chunk_position") and getattr(chunk, "chunk_position") == "last": + msg_event.end = UiPathConversationMessageEndEvent(timestamp=timestamp) + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=f"chunk-{message.id}-0", + end=UiPathConversationContentPartEndEvent() + ) + return _wrap_in_conversation_event(msg_event, exchange_id, conversation_id) + if message.content == []: msg_event.start = UiPathConversationMessageStartEvent( role="assistant", timestamp=timestamp ) msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=f"chunk-{message.id}-{0}", + content_part_id=f"chunk-{message.id}-0", start=UiPathConversationContentPartStartEvent(mime_type="text/plain"), ) @@ -147,6 +159,7 @@ def map_message( chunk = content_adapter.validate_python(raw_chunk) if isinstance(chunk, TextContent): + logger.warning("MAPPED TO TextContentChunk") chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") msg_event.content_part = UiPathConversationContentPartEvent( content_part_id=chunk_id, @@ -156,41 +169,51 @@ def map_message( ), ) - elif isinstance(chunk, ToolCallContent): - # Complete tool call (non-streaming) - msg_event.tool_call = UiPathConversationToolCallEvent( - tool_call_id=chunk.id, - start=UiPathConversationToolCallStartEvent( - tool_name=chunk.name, - arguments=UiPathInlineValue(inline=str(chunk.args)), - timestamp=timestamp, - ), - end=UiPathConversationToolCallEndEvent(timestamp=timestamp), - ) + #elif isinstance(chunk, ToolCallContent): + # # Complete tool call (non-streaming) + # msg_event.tool_call = UiPathConversationToolCallEvent( + # tool_call_id=chunk.id, + # start=UiPathConversationToolCallStartEvent( + # tool_name=chunk.name, + # arguments=UiPathInlineValue(inline=str(chunk.args)), + # timestamp=timestamp, + # ), + # end=UiPathConversationToolCallEndEvent(timestamp=timestamp), + # ) elif isinstance(chunk, ToolCallChunkContent): - # Streaming tool call chunk - chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}" - - if chunk.name and not chunk.args: - # Tool call start - msg_event.tool_call = UiPathConversationToolCallEvent( - tool_call_id=chunk_id, - start=UiPathConversationToolCallStartEvent( - tool_name=chunk.name, - arguments=UiPathInlineValue(inline=""), - timestamp=timestamp, - ), - ) - elif chunk.args: - # Streaming tool arguments - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=chunk_id, - chunk=UiPathConversationContentPartChunkEvent( - data=str(chunk.args), - content_part_sequence=chunk.index or 0, - ), - ) + # Streaming tool call chunk - accumulate in dictionary instead of emitting + if tool_chunks_dict is not None and chunk.id: + # Add or accumulate chunk using the __add__ operator + if chunk.id in tool_chunks_dict: + tool_chunks_dict[chunk.id] = tool_chunks_dict[chunk.id] + chunk + else: + tool_chunks_dict[chunk.id] = chunk + # Don't emit any events for tool call chunks - they'll be processed when ToolMessage arrives + continue + + # OLD CODE (commented out - now accumulating chunks in dict): + # chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}" + # + # if chunk.name and not chunk.args: + # # Tool call start + # msg_event.tool_call = UiPathConversationToolCallEvent( + # tool_call_id=chunk_id, + # start=UiPathConversationToolCallStartEvent( + # tool_name=chunk.name, + # arguments=UiPathInlineValue(inline=""), + # timestamp=timestamp, + # ), + # ) + # elif chunk.args: + # # Streaming tool arguments + # msg_event.content_part = UiPathConversationContentPartEvent( + # content_part_id=chunk_id, + # chunk=UiPathConversationContentPartChunkEvent( + # data=str(chunk.args), + # content_part_sequence=chunk.index or 0, + # ), + # ) except ValidationError as e: # Log and skip unknown/invalid chunk types @@ -198,6 +221,7 @@ def map_message( f"Failed to parse content chunk: {raw_chunk}. Error: {e}" ) continue + elif isinstance(message.content, str) and message.content: msg_event.content_part = UiPathConversationContentPartEvent( content_part_id=f"content-{message.id}", @@ -207,10 +231,6 @@ def map_message( ), ) - stop_reason = message.response_metadata.get("stop_reason") - if not message.content and stop_reason in ("tool_use", "end_turn"): - msg_event.end = UiPathConversationMessageEndEvent(timestamp=timestamp) - if ( msg_event.start or msg_event.content_part @@ -221,86 +241,31 @@ def map_message( return None - text_content = _extract_text(message.content) - - # --- HumanMessage --- - if isinstance(message, HumanMessage): - return _wrap_in_conversation_event( - UiPathConversationMessageEvent( - message_id=message_id, - start=UiPathConversationMessageStartEvent( - role="user", timestamp=timestamp - ), - content_part=UiPathConversationContentPartEvent( - content_part_id=f"cp-{message_id}", - start=UiPathConversationContentPartStartEvent( - mime_type="text/plain" - ), - chunk=UiPathConversationContentPartChunkEvent(data=text_content), - end=UiPathConversationContentPartEndEvent(), - ), - end=UiPathConversationMessageEndEvent(), - ), - exchange_id, - conversation_id, - ) - - # --- AIMessage --- - if isinstance(message, AIMessage): - # Extract first tool call if present - tool_calls = getattr(message, "tool_calls", []) or [] - first_tc = tool_calls[0] if tool_calls else None - - return _wrap_in_conversation_event( - UiPathConversationMessageEvent( - message_id=message_id, - start=UiPathConversationMessageStartEvent( - role="assistant", timestamp=timestamp - ), - content_part=( - UiPathConversationContentPartEvent( - content_part_id=f"cp-{message_id}", - start=UiPathConversationContentPartStartEvent( - mime_type="text/plain" - ), - chunk=UiPathConversationContentPartChunkEvent( - data=text_content - ), - end=UiPathConversationContentPartEndEvent(), - ) - if text_content - else None - ), - tool_call=( - UiPathConversationToolCallEvent( - tool_call_id=first_tc.get("id") or _new_id(), - start=UiPathConversationToolCallStartEvent( - tool_name=first_tc.get("name"), - arguments=UiPathInlineValue( - inline=str(first_tc.get("args", "")) - ), - timestamp=timestamp, - ), - ) - if first_tc - else None - ), - end=UiPathConversationMessageEndEvent(), - ), - exchange_id, - conversation_id, - ) - # --- ToolMessage --- if isinstance(message, ToolMessage): + tool_name = message.name or "" + arguments = "" + + # Retrieve accumulated chunks if available + if tool_chunks_dict is not None and message.tool_call_id: + accumulated_chunk = tool_chunks_dict.get(message.tool_call_id) + if accumulated_chunk: + # Use the accumulated chunk's name and args + if accumulated_chunk.name: + tool_name = accumulated_chunk.name + if accumulated_chunk.args: + arguments = accumulated_chunk.args + # Delete the entry from the dict after processing + del tool_chunks_dict[message.tool_call_id] + return _wrap_in_conversation_event( UiPathConversationMessageEvent( message_id=message_id, tool_call=UiPathConversationToolCallEvent( tool_call_id=message.tool_call_id, start=UiPathConversationToolCallStartEvent( - tool_name=message.name or "", - arguments=UiPathInlineValue(inline=""), + tool_name=tool_name, + arguments=UiPathInlineValue(inline=arguments), timestamp=timestamp, ), end=UiPathConversationToolCallEndEvent( @@ -313,7 +278,30 @@ def map_message( conversation_id, ) + # OLD CODE (commented out - now using accumulated chunks): + # return _wrap_in_conversation_event( + # UiPathConversationMessageEvent( + # message_id=message_id, + # tool_call=UiPathConversationToolCallEvent( + # tool_call_id=message.tool_call_id, + # start=UiPathConversationToolCallStartEvent( + # tool_name=message.name or "", + # arguments=UiPathInlineValue(inline=""), + # timestamp=timestamp, + # ), + # end=UiPathConversationToolCallEndEvent( + # timestamp=timestamp, + # result=UiPathInlineValue(inline=message.content), + # ), + # ), + # ), + # exchange_id, + # conversation_id, + # ) + + text_content = _extract_text(message.content) # --- Fallback --- + logger.warning(f"MAPPED TO FALLBACK \n") return _wrap_in_conversation_event( UiPathConversationMessageEvent( message_id=message_id, diff --git a/src/uipath_langchain/_cli/_runtime/_runtime.py b/src/uipath_langchain/_cli/_runtime/_runtime.py index e4cd642d..f3d29041 100644 --- a/src/uipath_langchain/_cli/_runtime/_runtime.py +++ b/src/uipath_langchain/_cli/_runtime/_runtime.py @@ -145,6 +145,10 @@ async def stream( # Track final chunk for result creation final_chunk: Optional[dict[Any, Any]] = None + # Dictionary to accumulate tool call chunks across streaming messages + from uipath_langchain.chat.content_blocks import ToolCallChunkContent + tool_chunks_dict: dict[str, ToolCallChunkContent] = {} + # Stream events from graph async for stream_chunk in compiled_graph.astream( graph_input, @@ -161,13 +165,14 @@ async def stream( message, _ = data # Use stored conversation/exchange IDs from input, or fallback to execution_id - conversation_id = getattr(self.context, "conversation_id", None) or self.context.execution_id - exchange_id = getattr(self.context, "exchange_id", None) or self.context.execution_id + conversation_id ="b2c6e7df-41cd-4144-b637-96db39b90e2b" or getattr(self.context, "conversation_id", None) + exchange_id = "9dae98ea-c940-4aa2-9f3c-894584b8f358" or getattr(self.context, "exchange_id", None) conversation_event = map_message( message=message, exchange_id=exchange_id, conversation_id=conversation_id, + tool_chunks_dict=tool_chunks_dict, ) # Only emit if conversion was successful From 0ea5126e52d55d202e5b80bff78bde35e44ac00b Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Thu, 6 Nov 2025 14:18:22 +0200 Subject: [PATCH 4/6] wip toolCalls --- .../_cli/_runtime/_conversation.py | 434 ++++++++---------- .../_cli/_runtime/_runtime.py | 10 +- 2 files changed, 205 insertions(+), 239 deletions(-) diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index a32cc4c8..c5681a00 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -42,32 +42,208 @@ def _new_id() -> str: return str(uuid.uuid4()) -def _wrap_in_conversation_event( - msg_event: UiPathConversationMessageEvent, - exchange_id: Optional[str] = None, - conversation_id: Optional[str] = None, -) -> UiPathConversationEvent: - """Helper to wrap a message event into a conversation-level event.""" - return UiPathConversationEvent( - conversation_id=conversation_id or _new_id(), - exchange=UiPathConversationExchangeEvent( - exchange_id=exchange_id or _new_id(), - message=msg_event, - ), - ) - - -def _extract_text(content) -> str: - """Normalize LangGraph message.content to plain text.""" - if isinstance(content, str): - return content - if isinstance(content, list): - return "".join( - part.get("text", "") - for part in content - if isinstance(part, dict) and part.get("type") == "text" +class MessageMapper: + """Stateful mapper that converts LangChain messages to UiPath conversation events. + + Maintains state across multiple message conversions to properly track: + - Tool call chunks that are accumulated until the ToolMessage arrives + - The last AI message ID for associating tool results with their originating message + """ + + def __init__(self): + """Initialize the mapper with empty state.""" + self.tool_chunks_dict: Dict[str, ToolCallChunkContent] = {} + self.last_ai_message_id: Optional[str] = None + + def _wrap_in_conversation_event( + self, + msg_event: UiPathConversationMessageEvent, + exchange_id: Optional[str] = None, + conversation_id: Optional[str] = None, + ) -> UiPathConversationEvent: + """Helper to wrap a message event into a conversation-level event.""" + return UiPathConversationEvent( + conversation_id=conversation_id or _new_id(), + exchange=UiPathConversationExchangeEvent( + exchange_id=exchange_id or _new_id(), + message=msg_event, + ), + ) + + def _extract_text(self, content) -> str: + """Normalize LangGraph message.content to plain text.""" + if isinstance(content, str): + return content + if isinstance(content, list): + return "".join( + part.get("text", "") + for part in content + if isinstance(part, dict) and part.get("type") == "text" + ) + return str(content or "") + + def map_message( + self, + message: BaseMessage, + exchange_id: Optional[str] = None, + conversation_id: Optional[str] = None, + ) -> Optional[UiPathConversationEvent]: + """Convert LangGraph BaseMessage (chunk or full) into a UiPathConversationEvent. + + Args: + message: The LangChain message to convert + exchange_id: Optional exchange ID for the conversation + conversation_id: Optional conversation ID + + Returns: + A UiPathConversationEvent if the message should be emitted, None otherwise + """ + message_id = message.id or _new_id() + # Format timestamp as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z + timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z') + + # --- Streaming AIMessageChunk --- + if isinstance(message, AIMessageChunk): + # Track this AI message ID for future tool message references + self.last_ai_message_id = message.id or _new_id() + + msg_event = UiPathConversationMessageEvent( + message_id=self.last_ai_message_id, + ) + + # Check if this is the last chunk by examining chunk_position + chunk = AIMessageChunk(**message.model_dump()) + if hasattr(chunk, "chunk_position") and getattr(chunk, "chunk_position") == "last": + msg_event.end = UiPathConversationMessageEndEvent(timestamp=timestamp) + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=f"chunk-{message.id}-0", + end=UiPathConversationContentPartEndEvent() + ) + return self._wrap_in_conversation_event(msg_event, exchange_id, conversation_id) + + if message.content == []: + msg_event.start = UiPathConversationMessageStartEvent( + role="assistant", timestamp=timestamp + ) + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=f"chunk-{message.id}-0", + start=UiPathConversationContentPartStartEvent(mime_type="text/plain"), + ) + + elif isinstance(message.content, list) and message.content: + content_adapter = TypeAdapter(ContentBlock) + + for raw_chunk in message.content: + if not isinstance(raw_chunk, dict): + continue + + try: + # Parse chunk + chunk = content_adapter.validate_python(raw_chunk) + + if isinstance(chunk, TextContent): + chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=chunk_id, + chunk=UiPathConversationContentPartChunkEvent( + data=chunk.text, + content_part_sequence=0, + ), + ) + + elif isinstance(chunk, ToolCallChunkContent): + # Streaming tool call chunk - accumulate in dictionary instead of emitting + if chunk.id: + if chunk.id in self.tool_chunks_dict: + self.tool_chunks_dict[chunk.id] = self.tool_chunks_dict[chunk.id] + chunk + else: + self.tool_chunks_dict[chunk.id] = chunk + continue + + except ValidationError as e: + # Log and skip unknown/invalid chunk types + logger.warning( + f"Failed to parse content chunk: {raw_chunk}. Error: {e}" + ) + continue + + elif isinstance(message.content, str) and message.content: + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=f"content-{message.id}", + chunk=UiPathConversationContentPartChunkEvent( + data=message.content, + content_part_sequence=0, + ), + ) + + if ( + msg_event.start + or msg_event.content_part + or msg_event.tool_call + or msg_event.end + ): + return self._wrap_in_conversation_event(msg_event, exchange_id, conversation_id) + + return None + + # --- ToolMessage --- + if isinstance(message, ToolMessage): + tool_name = message.name or "" + arguments = "" + + # Retrieve accumulated chunks if available + if message.tool_call_id: + accumulated_chunk = self.tool_chunks_dict.get(message.tool_call_id) + if accumulated_chunk: + # Use the accumulated chunk's name and args + if accumulated_chunk.name: + tool_name = accumulated_chunk.name + if accumulated_chunk.args: + arguments = accumulated_chunk.args + # Delete the entry from the dict after processing + del self.tool_chunks_dict[message.tool_call_id] + + # Use the last AI message ID instead of the ToolMessage's own ID + # This associates the tool result with the message that initiated the tool call + result_message_id = self.last_ai_message_id or message_id + + return self._wrap_in_conversation_event( + UiPathConversationMessageEvent( + message_id=result_message_id, + tool_call=UiPathConversationToolCallEvent( + tool_call_id=message.tool_call_id, + start=UiPathConversationToolCallStartEvent( + tool_name=tool_name, + arguments=UiPathInlineValue(inline=arguments), + timestamp=timestamp, + ), + end=UiPathConversationToolCallEndEvent( + timestamp=timestamp, + result=UiPathInlineValue(inline=message.content), + ), + ), + ), + exchange_id, + conversation_id, + ) + + text_content = self._extract_text(message.content) + # --- Fallback --- + return self._wrap_in_conversation_event( + UiPathConversationMessageEvent( + message_id=message_id, + start=UiPathConversationMessageStartEvent( + role="assistant", timestamp=timestamp + ), + content_part=UiPathConversationContentPartEvent( + content_part_id=f"cp-{message_id}", + chunk=UiPathConversationContentPartChunkEvent(data=text_content), + ), + end=UiPathConversationMessageEndEvent(), + ), + exchange_id, + conversation_id, ) - return str(content or "") def uipath_to_human_messages( @@ -109,211 +285,3 @@ def uipath_to_human_messages( human_messages.append(HumanMessage(content="", metadata=metadata)) return human_messages - - -def map_message( - message: BaseMessage, - exchange_id: Optional[str] = None, - conversation_id: Optional[str] = None, - tool_chunks_dict: Optional[Dict[str, ToolCallChunkContent]] = None, -) -> Optional[UiPathConversationEvent]: - """Convert LangGraph BaseMessage (chunk or full) into a UiPathConversationEvent.""" - message_id = getattr(message, "id", None) or _new_id() - # Format timestamp as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z - timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z') - - # --- Streaming AIMessageChunk --- - if isinstance(message, AIMessageChunk): - msg_event = UiPathConversationMessageEvent( - message_id=message.id or _new_id(), - ) - - # Check if this is the last chunk by examining chunk_position - chunk = AIMessageChunk(**message.model_dump()) - if hasattr(chunk, "chunk_position") and getattr(chunk, "chunk_position") == "last": - msg_event.end = UiPathConversationMessageEndEvent(timestamp=timestamp) - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=f"chunk-{message.id}-0", - end=UiPathConversationContentPartEndEvent() - ) - return _wrap_in_conversation_event(msg_event, exchange_id, conversation_id) - - if message.content == []: - msg_event.start = UiPathConversationMessageStartEvent( - role="assistant", timestamp=timestamp - ) - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=f"chunk-{message.id}-0", - start=UiPathConversationContentPartStartEvent(mime_type="text/plain"), - ) - - elif isinstance(message.content, list) and message.content: - content_adapter = TypeAdapter(ContentBlock) - - for raw_chunk in message.content: - if not isinstance(raw_chunk, dict): - continue - - try: - # Parse chunk - chunk = content_adapter.validate_python(raw_chunk) - - if isinstance(chunk, TextContent): - logger.warning("MAPPED TO TextContentChunk") - chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=chunk_id, - chunk=UiPathConversationContentPartChunkEvent( - data=chunk.text, - content_part_sequence=0, - ), - ) - - #elif isinstance(chunk, ToolCallContent): - # # Complete tool call (non-streaming) - # msg_event.tool_call = UiPathConversationToolCallEvent( - # tool_call_id=chunk.id, - # start=UiPathConversationToolCallStartEvent( - # tool_name=chunk.name, - # arguments=UiPathInlineValue(inline=str(chunk.args)), - # timestamp=timestamp, - # ), - # end=UiPathConversationToolCallEndEvent(timestamp=timestamp), - # ) - - elif isinstance(chunk, ToolCallChunkContent): - # Streaming tool call chunk - accumulate in dictionary instead of emitting - if tool_chunks_dict is not None and chunk.id: - # Add or accumulate chunk using the __add__ operator - if chunk.id in tool_chunks_dict: - tool_chunks_dict[chunk.id] = tool_chunks_dict[chunk.id] + chunk - else: - tool_chunks_dict[chunk.id] = chunk - # Don't emit any events for tool call chunks - they'll be processed when ToolMessage arrives - continue - - # OLD CODE (commented out - now accumulating chunks in dict): - # chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}" - # - # if chunk.name and not chunk.args: - # # Tool call start - # msg_event.tool_call = UiPathConversationToolCallEvent( - # tool_call_id=chunk_id, - # start=UiPathConversationToolCallStartEvent( - # tool_name=chunk.name, - # arguments=UiPathInlineValue(inline=""), - # timestamp=timestamp, - # ), - # ) - # elif chunk.args: - # # Streaming tool arguments - # msg_event.content_part = UiPathConversationContentPartEvent( - # content_part_id=chunk_id, - # chunk=UiPathConversationContentPartChunkEvent( - # data=str(chunk.args), - # content_part_sequence=chunk.index or 0, - # ), - # ) - - except ValidationError as e: - # Log and skip unknown/invalid chunk types - logger.warning( - f"Failed to parse content chunk: {raw_chunk}. Error: {e}" - ) - continue - - elif isinstance(message.content, str) and message.content: - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=f"content-{message.id}", - chunk=UiPathConversationContentPartChunkEvent( - data=message.content, - content_part_sequence=0, - ), - ) - - if ( - msg_event.start - or msg_event.content_part - or msg_event.tool_call - or msg_event.end - ): - return _wrap_in_conversation_event(msg_event, exchange_id, conversation_id) - - return None - - # --- ToolMessage --- - if isinstance(message, ToolMessage): - tool_name = message.name or "" - arguments = "" - - # Retrieve accumulated chunks if available - if tool_chunks_dict is not None and message.tool_call_id: - accumulated_chunk = tool_chunks_dict.get(message.tool_call_id) - if accumulated_chunk: - # Use the accumulated chunk's name and args - if accumulated_chunk.name: - tool_name = accumulated_chunk.name - if accumulated_chunk.args: - arguments = accumulated_chunk.args - # Delete the entry from the dict after processing - del tool_chunks_dict[message.tool_call_id] - - return _wrap_in_conversation_event( - UiPathConversationMessageEvent( - message_id=message_id, - tool_call=UiPathConversationToolCallEvent( - tool_call_id=message.tool_call_id, - start=UiPathConversationToolCallStartEvent( - tool_name=tool_name, - arguments=UiPathInlineValue(inline=arguments), - timestamp=timestamp, - ), - end=UiPathConversationToolCallEndEvent( - timestamp=timestamp, - result=UiPathInlineValue(inline=message.content), - ), - ), - ), - exchange_id, - conversation_id, - ) - - # OLD CODE (commented out - now using accumulated chunks): - # return _wrap_in_conversation_event( - # UiPathConversationMessageEvent( - # message_id=message_id, - # tool_call=UiPathConversationToolCallEvent( - # tool_call_id=message.tool_call_id, - # start=UiPathConversationToolCallStartEvent( - # tool_name=message.name or "", - # arguments=UiPathInlineValue(inline=""), - # timestamp=timestamp, - # ), - # end=UiPathConversationToolCallEndEvent( - # timestamp=timestamp, - # result=UiPathInlineValue(inline=message.content), - # ), - # ), - # ), - # exchange_id, - # conversation_id, - # ) - - text_content = _extract_text(message.content) - # --- Fallback --- - logger.warning(f"MAPPED TO FALLBACK \n") - return _wrap_in_conversation_event( - UiPathConversationMessageEvent( - message_id=message_id, - start=UiPathConversationMessageStartEvent( - role="assistant", timestamp=timestamp - ), - content_part=UiPathConversationContentPartEvent( - content_part_id=f"cp-{message_id}", - chunk=UiPathConversationContentPartChunkEvent(data=text_content), - ), - end=UiPathConversationMessageEndEvent(), - ), - exchange_id, - conversation_id, - ) diff --git a/src/uipath_langchain/_cli/_runtime/_runtime.py b/src/uipath_langchain/_cli/_runtime/_runtime.py index f3d29041..d89d2d75 100644 --- a/src/uipath_langchain/_cli/_runtime/_runtime.py +++ b/src/uipath_langchain/_cli/_runtime/_runtime.py @@ -25,7 +25,7 @@ UiPathAgentStateEvent, UiPathRuntimeEvent, ) -from ._conversation import map_message +from ._conversation import MessageMapper from .._utils._schema import generate_schema_from_graph from ._context import LangGraphRuntimeContext from ._exception import LangGraphErrorCode, LangGraphRuntimeError @@ -145,9 +145,8 @@ async def stream( # Track final chunk for result creation final_chunk: Optional[dict[Any, Any]] = None - # Dictionary to accumulate tool call chunks across streaming messages - from uipath_langchain.chat.content_blocks import ToolCallChunkContent - tool_chunks_dict: dict[str, ToolCallChunkContent] = {} + # Create a stateful message mapper to track state across messages + message_mapper = MessageMapper() # Stream events from graph async for stream_chunk in compiled_graph.astream( @@ -168,11 +167,10 @@ async def stream( conversation_id ="b2c6e7df-41cd-4144-b637-96db39b90e2b" or getattr(self.context, "conversation_id", None) exchange_id = "9dae98ea-c940-4aa2-9f3c-894584b8f358" or getattr(self.context, "exchange_id", None) - conversation_event = map_message( + conversation_event = message_mapper.map_message( message=message, exchange_id=exchange_id, conversation_id=conversation_id, - tool_chunks_dict=tool_chunks_dict, ) # Only emit if conversion was successful From 42f5c3ae6161731f84068879e060b05b2297bec6 Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Mon, 10 Nov 2025 09:46:16 +0200 Subject: [PATCH 5/6] mapped tool calls nearly done --- .../_cli/_runtime/_conversation.py | 44 ++++++++++++------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index c5681a00..2ecb976f 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -47,13 +47,12 @@ class MessageMapper: Maintains state across multiple message conversions to properly track: - Tool call chunks that are accumulated until the ToolMessage arrives - - The last AI message ID for associating tool results with their originating message + - The AI message ID associated with each tool call for proper correlation """ def __init__(self): """Initialize the mapper with empty state.""" - self.tool_chunks_dict: Dict[str, ToolCallChunkContent] = {} - self.last_ai_message_id: Optional[str] = None + self.tool_chunks_dict: Dict[str, tuple[str, ToolCallChunkContent]] = {} def _wrap_in_conversation_event( self, @@ -104,11 +103,11 @@ def map_message( # --- Streaming AIMessageChunk --- if isinstance(message, AIMessageChunk): - # Track this AI message ID for future tool message references - self.last_ai_message_id = message.id or _new_id() + # Track this AI message ID for associating tool calls + ai_message_id = message.id or _new_id() msg_event = UiPathConversationMessageEvent( - message_id=self.last_ai_message_id, + message_id=ai_message_id, ) # Check if this is the last chunk by examining chunk_position @@ -152,12 +151,15 @@ def map_message( ) elif isinstance(chunk, ToolCallChunkContent): - # Streaming tool call chunk - accumulate in dictionary instead of emitting + # Streaming tool call chunk - accumulate in dictionary with AI message ID if chunk.id: if chunk.id in self.tool_chunks_dict: - self.tool_chunks_dict[chunk.id] = self.tool_chunks_dict[chunk.id] + chunk + # Accumulate the chunk, keeping the same AI message ID + stored_ai_id, stored_chunk = self.tool_chunks_dict[chunk.id] + self.tool_chunks_dict[chunk.id] = (stored_ai_id, stored_chunk + chunk) else: - self.tool_chunks_dict[chunk.id] = chunk + # Store new chunk with AI message ID + self.tool_chunks_dict[chunk.id] = (ai_message_id, chunk) continue except ValidationError as e: @@ -188,13 +190,18 @@ def map_message( # --- ToolMessage --- if isinstance(message, ToolMessage): - tool_name = message.name or "" - arguments = "" + result_message_id: Optional[str] = None + tool_name = message.name + arguments = None - # Retrieve accumulated chunks if available + # Retrieve accumulated chunks and AI message ID if message.tool_call_id: - accumulated_chunk = self.tool_chunks_dict.get(message.tool_call_id) - if accumulated_chunk: + tool_data = self.tool_chunks_dict.get(message.tool_call_id) + if tool_data: + # Unpack the AI message ID and accumulated chunk + stored_ai_id, accumulated_chunk = tool_data + result_message_id = stored_ai_id + # Use the accumulated chunk's name and args if accumulated_chunk.name: tool_name = accumulated_chunk.name @@ -203,9 +210,12 @@ def map_message( # Delete the entry from the dict after processing del self.tool_chunks_dict[message.tool_call_id] - # Use the last AI message ID instead of the ToolMessage's own ID - # This associates the tool result with the message that initiated the tool call - result_message_id = self.last_ai_message_id or message_id + # If no AI message ID was found, we cannot properly associate this tool result + if not result_message_id: + logger.warning( + f"Tool message {message.tool_call_id} has no associated AI message ID. Skipping." + ) + return None return self._wrap_in_conversation_event( UiPathConversationMessageEvent( From 3937990e407b8a4d2dfa579143288127848e1077 Mon Sep 17 00:00:00 2001 From: GabrielVasilescu04 Date: Mon, 10 Nov 2025 17:54:25 +0200 Subject: [PATCH 6/6] final mappings --- .../_cli/_runtime/_conversation.py | 77 +++++++++---------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index 2ecb976f..e4b9b161 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -1,3 +1,4 @@ +import json import logging import uuid from datetime import datetime, timezone @@ -46,13 +47,13 @@ class MessageMapper: """Stateful mapper that converts LangChain messages to UiPath conversation events. Maintains state across multiple message conversions to properly track: - - Tool call chunks that are accumulated until the ToolMessage arrives - - The AI message ID associated with each tool call for proper correlation + - The AI message ID associated with each tool call for proper correlation with ToolMessage """ def __init__(self): """Initialize the mapper with empty state.""" - self.tool_chunks_dict: Dict[str, tuple[str, ToolCallChunkContent]] = {} + self.tool_call_to_ai_message: Dict[str, str] = {} + self.seen_message_ids: set[str] = set() def _wrap_in_conversation_event( self, @@ -97,14 +98,13 @@ def map_message( Returns: A UiPathConversationEvent if the message should be emitted, None otherwise """ - message_id = message.id or _new_id() # Format timestamp as ISO 8601 UTC with milliseconds: 2025-01-04T10:30:00.123Z timestamp = datetime.now(timezone.utc).isoformat(timespec='milliseconds').replace('+00:00', 'Z') # --- Streaming AIMessageChunk --- if isinstance(message, AIMessageChunk): # Track this AI message ID for associating tool calls - ai_message_id = message.id or _new_id() + ai_message_id = message.id msg_event = UiPathConversationMessageEvent( message_id=ai_message_id, @@ -120,7 +120,9 @@ def map_message( ) return self._wrap_in_conversation_event(msg_event, exchange_id, conversation_id) - if message.content == []: + # For every new message_id, start a new message + if ai_message_id not in self.seen_message_ids: + self.seen_message_ids.add(ai_message_id) msg_event.start = UiPathConversationMessageStartEvent( role="assistant", timestamp=timestamp ) @@ -141,9 +143,8 @@ def map_message( chunk = content_adapter.validate_python(raw_chunk) if isinstance(chunk, TextContent): - chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=chunk_id, + content_part_id=f"chunk-{message.id}-0", chunk=UiPathConversationContentPartChunkEvent( data=chunk.text, content_part_sequence=0, @@ -151,15 +152,17 @@ def map_message( ) elif isinstance(chunk, ToolCallChunkContent): - # Streaming tool call chunk - accumulate in dictionary with AI message ID + # Track tool_call_id -> ai_message_id mapping if chunk.id: - if chunk.id in self.tool_chunks_dict: - # Accumulate the chunk, keeping the same AI message ID - stored_ai_id, stored_chunk = self.tool_chunks_dict[chunk.id] - self.tool_chunks_dict[chunk.id] = (stored_ai_id, stored_chunk + chunk) - else: - # Store new chunk with AI message ID - self.tool_chunks_dict[chunk.id] = (ai_message_id, chunk) + self.tool_call_to_ai_message[chunk.id] = ai_message_id + + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=f"chunk-{message.id}-0", + chunk=UiPathConversationContentPartChunkEvent( + data=chunk.args, + content_part_sequence=0, + ), + ) continue except ValidationError as e: @@ -190,25 +193,8 @@ def map_message( # --- ToolMessage --- if isinstance(message, ToolMessage): - result_message_id: Optional[str] = None - tool_name = message.name - arguments = None - - # Retrieve accumulated chunks and AI message ID - if message.tool_call_id: - tool_data = self.tool_chunks_dict.get(message.tool_call_id) - if tool_data: - # Unpack the AI message ID and accumulated chunk - stored_ai_id, accumulated_chunk = tool_data - result_message_id = stored_ai_id - - # Use the accumulated chunk's name and args - if accumulated_chunk.name: - tool_name = accumulated_chunk.name - if accumulated_chunk.args: - arguments = accumulated_chunk.args - # Delete the entry from the dict after processing - del self.tool_chunks_dict[message.tool_call_id] + # Look up the AI message ID using the tool_call_id + result_message_id = self.tool_call_to_ai_message.get(message.tool_call_id) if message.tool_call_id else None # If no AI message ID was found, we cannot properly associate this tool result if not result_message_id: @@ -217,19 +203,30 @@ def map_message( ) return None + # Clean up the mapping after use + if message.tool_call_id: + del self.tool_call_to_ai_message[message.tool_call_id] + + content_value = message.content + if isinstance(content_value, str): + try: + content_value = json.loads(content_value) + except (json.JSONDecodeError, TypeError): + pass # Keep as string if not valid JSON + return self._wrap_in_conversation_event( UiPathConversationMessageEvent( message_id=result_message_id, tool_call=UiPathConversationToolCallEvent( tool_call_id=message.tool_call_id, start=UiPathConversationToolCallStartEvent( - tool_name=tool_name, - arguments=UiPathInlineValue(inline=arguments), + tool_name=message.name, + arguments=None, timestamp=timestamp, ), end=UiPathConversationToolCallEndEvent( timestamp=timestamp, - result=UiPathInlineValue(inline=message.content), + result=content_value, ), ), ), @@ -241,12 +238,12 @@ def map_message( # --- Fallback --- return self._wrap_in_conversation_event( UiPathConversationMessageEvent( - message_id=message_id, + message_id=message.id, start=UiPathConversationMessageStartEvent( role="assistant", timestamp=timestamp ), content_part=UiPathConversationContentPartEvent( - content_part_id=f"cp-{message_id}", + content_part_id=f"cp-{message.id}", chunk=UiPathConversationContentPartChunkEvent(data=text_content), ), end=UiPathConversationMessageEndEvent(),