diff --git a/config/greeting_local.json5 b/config/greeting_local.json5 index 6e6305ae4..dcb04acb1 100644 --- a/config/greeting_local.json5 +++ b/config/greeting_local.json5 @@ -15,7 +15,7 @@ type: "QwenLLM", config: { agent_name: "${ROBOT_NAME:-Bits}", - history_length: 1, + history_length: 0, base_url: "${QWEN_BASE_URL:-http://omr2.local:8860}/v1", model: "nvidia/nemotron-3-nano", }, @@ -56,6 +56,12 @@ enable_tts_interrupt: false, }, }, + { + type: "ConversationHistoryInput", + config: { + max_rounds: 3, + }, + }, ], action_execution_mode: "concurrent", agent_actions: [ diff --git a/src/fuser/__init__.py b/src/fuser/__init__.py index 9d1fe0d49..839aadd78 100644 --- a/src/fuser/__init__.py +++ b/src/fuser/__init__.py @@ -94,7 +94,7 @@ async def fuse( + f"\n\nToday is {today}.\n" ) - inputs_fused = " ".join([s for s in input_strings if s is not None]) + inputs_fused = "".join([s for s in input_strings if s is not None]) # Query the knowledge base if configured and if there are inputs to query with kb_context = "" diff --git a/src/inputs/plugins/conversation_history_input.py b/src/inputs/plugins/conversation_history_input.py new file mode 100644 index 000000000..f9b51478d --- /dev/null +++ b/src/inputs/plugins/conversation_history_input.py @@ -0,0 +1,133 @@ +import asyncio +import logging +import time +from collections import deque +from typing import Deque, Optional + +from pydantic import Field + +from inputs.base import Message, SensorConfig +from inputs.base.loop import FuserInput +from providers.io_provider import IOProvider + + +class ConversationHistoryConfig(SensorConfig): + """ + Configuration for Conversation History Input. + + Parameters + ---------- + max_rounds : int + Maximum number of voice inputs to keep in history. + """ + + max_rounds: int = Field( + default=3, + description="Maximum number of voice inputs to keep in history", + ) + + +class ConversationHistoryInput(FuserInput[ConversationHistoryConfig, Optional[str]]): + """ + Async input that polls IOProvider for voice inputs and maintains + a sliding window of conversation history for the LLM prompt. + """ + + def __init__(self, config: ConversationHistoryConfig): + super().__init__(config) + + self.io_provider = IOProvider() + self.messages: Deque[Message] = deque(maxlen=config.max_rounds) + self._last_recorded_tick: int = -1 + self.descriptor_for_LLM = "Conversation History" + + # Guard flag: when True, this instance ignores incoming voice inputs + self._stopped = False + + async def _poll(self) -> Optional[str]: + """ + Check IOProvider for new voice input this tick. + + Returns + ------- + Optional[str] + The voice input text if new, None otherwise. + """ + await asyncio.sleep(0.5) + + if self._stopped: + return + + current_tick = self.io_provider.tick_counter + if current_tick <= self._last_recorded_tick: + return None + + voice_input = self.io_provider.get_input("Voice") + if voice_input and voice_input.input and voice_input.tick == current_tick: + text = voice_input.input.strip() + if text: + self._last_recorded_tick = current_tick + return text + + return None + + async def _raw_to_text(self, raw_input: Optional[str]) -> Optional[Message]: + """ + Process raw input to generate a timestamped message. + + Parameters + ---------- + raw_input : Optional[str] + Raw input string to be processed. + + Returns + ------- + Optional[Message] + A timestamped message containing the processed input. + """ + if raw_input is None: + return None + return Message(timestamp=time.time(), message=raw_input) + + async def raw_to_text(self, raw_input: Optional[str]): + """ + Convert raw input to text and update message buffer. + + Parameters + ---------- + raw_input : Optional[str] + Raw input to be processed, or None if no input is available. + """ + if raw_input is None: + return + + message = await self._raw_to_text(raw_input) + if message is not None: + self.messages.append(message) + + def formatted_latest_buffer(self) -> Optional[str]: + """ + Return all recorded voice inputs as a conversation history block. + + Returns + ------- + Optional[str] + A formatted string of the conversation history for LLM input, or None if no history exists. + """ + if len(self.messages) == 0: + return None + + lines = [f"User: {msg.message}" for msg in self.messages] + result = f'{self.descriptor_for_LLM}: "{"; ".join(lines)}"' + + return result + + def stop(self): + """ + Clear message history and reset state when stopping the input. + """ + logging.info("Stopping ConversationHistoryInput") + + self._stopped = True + + self.messages.clear() diff --git a/tests/inputs/plugins/test_conversation_history_input.py b/tests/inputs/plugins/test_conversation_history_input.py new file mode 100644 index 000000000..e71a90703 --- /dev/null +++ b/tests/inputs/plugins/test_conversation_history_input.py @@ -0,0 +1,371 @@ +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from inputs.base import Message +from inputs.plugins.conversation_history_input import ( + ConversationHistoryConfig, + ConversationHistoryInput, +) + + +def test_initialization(): + """Test basic initialization with default config.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + assert len(sensor.messages) == 0 + assert sensor.messages.maxlen == 3 # default max_rounds + assert sensor._last_recorded_tick == -1 + assert sensor.descriptor_for_LLM == "Conversation History" + assert sensor._stopped is False + + +def test_initialization_with_custom_max_rounds(): + """Test initialization with custom max_rounds.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig(max_rounds=5) + sensor = ConversationHistoryInput(config=config) + + assert sensor.messages.maxlen == 5 + + +@pytest.mark.asyncio +async def test_poll_with_new_voice_input(): + """Test _poll when there's new voice input.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + mock_voice_input = MagicMock() + mock_voice_input.input = "Hello, robot!" + mock_voice_input.tick = 5 + mock_provider.get_input.return_value = mock_voice_input + sensor.io_provider = mock_provider + + result = await sensor._poll() + + assert result == "Hello, robot!" + assert sensor._last_recorded_tick == 5 + mock_provider.get_input.assert_called_once_with("Voice") + + +@pytest.mark.asyncio +async def test_poll_with_whitespace_input(): + """Test _poll filters out whitespace-only input.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + mock_voice_input = MagicMock() + mock_voice_input.input = " " # only whitespace + mock_voice_input.tick = 5 + mock_provider.get_input.return_value = mock_voice_input + sensor.io_provider = mock_provider + + result = await sensor._poll() + + assert result is None + assert sensor._last_recorded_tick == -1 # should not update + + +@pytest.mark.asyncio +async def test_poll_with_no_new_voice_input(): + """Test _poll when there's no new voice input.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + mock_provider.get_input.return_value = None + sensor.io_provider = mock_provider + + result = await sensor._poll() + + assert result is None + + +@pytest.mark.asyncio +async def test_poll_with_old_tick(): + """Test _poll when tick hasn't advanced.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + sensor.io_provider = mock_provider + sensor._last_recorded_tick = 5 # already recorded this tick + + result = await sensor._poll() + + assert result is None + mock_provider.get_input.assert_not_called() + + +@pytest.mark.asyncio +async def test_poll_when_stopped(): + """Test _poll returns None when input is stopped.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + sensor._stopped = True + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + sensor.io_provider = mock_provider + + result = await sensor._poll() + + assert result is None + mock_provider.get_input.assert_not_called() + + +@pytest.mark.asyncio +async def test_raw_to_text_with_valid_input(): + """Test _raw_to_text with valid input.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.time.time", return_value=1234.5 + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + result = await sensor._raw_to_text("Test message") + + assert result is not None + assert isinstance(result, Message) + assert result.timestamp == 1234.5 + assert result.message == "Test message" + + +@pytest.mark.asyncio +async def test_raw_to_text_with_none(): + """Test _raw_to_text with None input.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + result = await sensor._raw_to_text(None) + + assert result is None + + +@pytest.mark.asyncio +async def test_raw_to_text_updates_messages(): + """Test raw_to_text updates message buffer.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.time.time", return_value=1234.5 + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + await sensor.raw_to_text("First message") + await sensor.raw_to_text("Second message") + + assert len(sensor.messages) == 2 + assert sensor.messages[0].message == "First message" + assert sensor.messages[1].message == "Second message" + + +@pytest.mark.asyncio +async def test_raw_to_text_with_none_does_not_update(): + """Test raw_to_text with None does not update buffer.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + await sensor.raw_to_text(None) + + assert len(sensor.messages) == 0 + + +def test_formatted_latest_buffer_with_messages(): + """Test formatted_latest_buffer with messages in buffer.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + sensor.messages.append(Message(timestamp=1000.0, message="First message")) + sensor.messages.append(Message(timestamp=1001.0, message="Second message")) + sensor.messages.append(Message(timestamp=1002.0, message="Third message")) + + result = sensor.formatted_latest_buffer() + + assert result is not None + assert "Conversation History:" in result + assert "User: First message" in result + assert "User: Second message" in result + assert "User: Third message" in result + + +def test_formatted_latest_buffer_empty(): + """Test formatted_latest_buffer with empty buffer.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + result = sensor.formatted_latest_buffer() + + assert result is None + + +def test_formatted_latest_buffer_single_message(): + """Test formatted_latest_buffer with single message.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + sensor.messages.append(Message(timestamp=1000.0, message="Only message")) + + result = sensor.formatted_latest_buffer() + + assert result is not None + assert "Conversation History:" in result + assert "User: Only message" in result + + +def test_sliding_window_behavior(): + """Test that messages respect max_rounds sliding window.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.time.time", + side_effect=[1000.0, 1001.0, 1002.0, 1003.0], + ), + ): + config = ConversationHistoryConfig(max_rounds=3) + sensor = ConversationHistoryInput(config=config) + + sensor.messages.append(Message(timestamp=1000.0, message="First")) + sensor.messages.append(Message(timestamp=1001.0, message="Second")) + sensor.messages.append(Message(timestamp=1002.0, message="Third")) + sensor.messages.append(Message(timestamp=1003.0, message="Fourth")) + + assert len(sensor.messages) == 3 # max_rounds + assert sensor.messages[0].message == "Second" + assert sensor.messages[1].message == "Third" + assert sensor.messages[2].message == "Fourth" + + +def test_stop_clears_messages(): + """Test stop method clears messages and sets stopped flag.""" + with patch("inputs.plugins.conversation_history_input.IOProvider"): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + sensor.messages.append(Message(timestamp=1000.0, message="First")) + sensor.messages.append(Message(timestamp=1001.0, message="Second")) + + assert len(sensor.messages) == 2 + assert sensor._stopped is False + + sensor.stop() + + assert len(sensor.messages) == 0 + assert sensor._stopped is True + + +@pytest.mark.asyncio +async def test_full_workflow(): + """Test full workflow: poll -> raw_to_text -> formatted_latest_buffer.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + patch( + "inputs.plugins.conversation_history_input.time.time", + side_effect=[1000.0, 1001.0], + ), + ): + config = ConversationHistoryConfig(max_rounds=2) + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 1 + mock_voice_input1 = MagicMock() + mock_voice_input1.input = "Hello" + mock_voice_input1.tick = 1 + mock_provider.get_input.return_value = mock_voice_input1 + sensor.io_provider = mock_provider + + raw_input1 = await sensor._poll() + await sensor.raw_to_text(raw_input1) + + mock_provider.tick_counter = 2 + mock_voice_input2 = MagicMock() + mock_voice_input2.input = "How are you?" + mock_voice_input2.tick = 2 + mock_provider.get_input.return_value = mock_voice_input2 + + raw_input2 = await sensor._poll() + await sensor.raw_to_text(raw_input2) + + result = sensor.formatted_latest_buffer() + + assert result is not None + assert "User: Hello" in result + assert "User: How are you?" in result + assert len(sensor.messages) == 2 + + +@pytest.mark.asyncio +async def test_poll_with_input_object_no_input_field(): + """Test _poll handles voice input with missing or empty input field.""" + with ( + patch("inputs.plugins.conversation_history_input.IOProvider"), + patch( + "inputs.plugins.conversation_history_input.asyncio.sleep", new=AsyncMock() + ), + ): + config = ConversationHistoryConfig() + sensor = ConversationHistoryInput(config=config) + + mock_provider = MagicMock() + mock_provider.tick_counter = 5 + mock_voice_input = MagicMock() + mock_voice_input.input = None + mock_voice_input.tick = 5 + mock_provider.get_input.return_value = mock_voice_input + sensor.io_provider = mock_provider + + result = await sensor._poll() + + assert result is None