Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,7 @@ root/
Cerebrum/*

# ignore Kiro
/.kiro
/.kiro

# mem0
/.mem0
10 changes: 10 additions & 0 deletions aios/config/config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ memory:
log_mode: "console" # choose from [console, file]
provider: "in-house" # Options: in-house, mem0, zep

# Personalization settings (all opt-in, disabled by default)
# auto_extract: saves conversation turns as memories after each chat LLM call
# auto_inject: retrieves relevant memories and prepends them before each chat LLM call
auto_extract: false
auto_inject: false
relevance_threshold: 0.5 # Minimum similarity score for memory injection
max_injected_memories: 5 # Max memories to inject per LLM call
max_memory_tokens: 1500 # Token budget for injected memory block

# Mem0 provider configuration (used when provider: "mem0")
mem0:
api_key: "" # Optional: for Mem0 cloud
Expand All @@ -83,6 +92,7 @@ memory:
provider: "chroma"
config:
collection_name: "mem0_memories"
# path: ".mem0/chroma" # ChromaDB persistence directory (default: .mem0/chroma)

# Zep provider configuration (used when provider: "zep")
zep:
Expand Down
2 changes: 1 addition & 1 deletion aios/hooks/modules/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def useFactory(
thread_pool = ThreadPoolExecutor(max_workers=params.max_workers)
manager = AgentManager('https://app.aios.foundation')

send_request, _ = useSysCall()
send_request, _, _ = useSysCall()

@validate(AgentSubmitDeclaration)
def submitAgent(declaration_params: AgentSubmitDeclaration) -> int:
Expand Down
233 changes: 233 additions & 0 deletions aios/memory/context_injector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
"""
Context Injector for the AIOS personalization pipeline.

Retrieves relevant memories from the configured memory provider and
prepends them as a system message to the LLM query's message list,
enabling personalized agent responses based on prior interactions.
"""
import logging
from typing import TYPE_CHECKING, Optional

from cerebrum.llm.apis import LLMQuery
from cerebrum.memory.apis import MemoryQuery

if TYPE_CHECKING:
from aios.memory.manager import MemoryManager

logger = logging.getLogger(__name__)


class ContextInjector:
"""Retrieves relevant memories and injects them into LLM
query messages.

Uses the memory provider's ``retrieve_memory`` operation to
find memories scoped to the requesting agent, filters by
relevance score, formats them into a delimited system message,
and prepends it at index 0 of the query's message list.
"""

def __init__(
self,
memory_manager: "MemoryManager",
config: dict,
) -> None:
"""
Args:
memory_manager: Initialized MemoryManager instance.
config: Memory config section from config.yaml.
"""
self.memory_manager = memory_manager
self.enabled = config.get("auto_inject", False)
self.max_memories = config.get(
"max_injected_memories", 5
)
self.relevance_threshold = config.get(
"relevance_threshold", 0.5
)
self.max_tokens = config.get("max_memory_tokens", 1500)

# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------

def inject(
self,
agent_name: str,
query: LLMQuery,
) -> LLMQuery:
"""Retrieve relevant memories and prepend as a system
message.

Returns the query unmodified when:
- ``auto_inject`` is disabled
- no user-role messages exist in the query
- no memories survive filtering
- any exception occurs during retrieval or formatting
"""
if not self.enabled:
return query

try:
user_text = self._extract_latest_user_message(
query.messages
)
if user_text is None:
return query

# Retrieve memories scoped to this agent
mem_query = MemoryQuery(
operation_type="retrieve_memory",
params={
"content": user_text,
"k": self.max_memories,
"user_id": agent_name,
},
)
response = (
self.memory_manager.provider.retrieve_memory(
mem_query
)
)

if (
not response.success
or not response.search_results
):
logger.info(
"No memories retrieved for user_id=%s",
agent_name,
)
return query

results = response.search_results
logger.info(
"Retrieved %d memories for user_id=%s",
len(results),
agent_name,
)

# Filter by relevance threshold
filtered = []
for mem in results:
score = mem.get("score")
if score is None:
# No score from provider — include by default
filtered.append(mem)
elif score >= self.relevance_threshold:
filtered.append(mem)
else:
logger.debug(
"Excluded memory (score=%.3f < "
"threshold=%.3f): %s",
score,
self.relevance_threshold,
(mem.get("content", ""))[:60],
)

if not filtered:
logger.info(
"All memories excluded by relevance "
"threshold for user_id=%s",
agent_name,
)
return query

# Sort by score descending (most relevant first)
filtered.sort(
key=lambda m: m.get("score", 0),
reverse=True,
)

# Truncate by token budget, removing least
# relevant first
filtered = self._truncate_by_token_budget(
filtered
)

if not filtered:
return query

# Build and prepend the system message
block = self._format_memory_block(filtered)
system_msg = {
"role": "system",
"content": block,
}
query.messages = [system_msg] + query.messages

logger.info(
"Injected %d memories for user_id=%s",
len(filtered),
agent_name,
)
return query

except Exception:
logger.warning(
"Context injection failed for "
"user_id=%s",
agent_name,
exc_info=True,
)
return query

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

@staticmethod
def _extract_latest_user_message(
messages: list,
) -> Optional[str]:
"""Return the content of the last user-role message,
or ``None`` if none exists."""
for msg in reversed(messages):
if msg.get("role") == "user":
return msg.get("content")
return None

@staticmethod
def _format_memory_block(
memories: list,
) -> str:
"""Format memories into a delimited system message
string."""
lines = [
"===== MEMORY CONTEXT =====",
"The following are relevant memories from prior "
"interactions with this user. Use them to "
"personalize your response:",
"",
]
for mem in memories:
ts = mem.get("timestamp", "unknown")
content = mem.get("content", "")
lines.append(f"- [{ts}] {content}")

lines.append("")
lines.append("===== END MEMORY CONTEXT =====")
return "\n".join(lines)

@staticmethod
def _estimate_tokens(text: str) -> int:
"""Rough token estimate: words * 1.3."""
return int(len(text.split()) * 1.3)

def _truncate_by_token_budget(
self,
memories: list,
) -> list:
"""Remove least-relevant memories until the formatted
block fits within ``max_tokens``.

Memories are assumed to be sorted by score descending.
We remove from the tail (lowest score) first.
"""
while memories:
block = self._format_memory_block(memories)
if self._estimate_tokens(block) <= self.max_tokens:
return memories
# Drop the least relevant (last item)
memories = memories[:-1]
return memories
120 changes: 120 additions & 0 deletions aios/memory/conversation_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""
Conversation Extractor for the AIOS personalization pipeline.

Extracts user+assistant conversation pairs from completed LLM turns
and stores them as memories via the configured memory provider.
Runs asynchronously in a daemon thread so it never blocks LLM responses.
"""
import logging
import threading
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from aios.memory.manager import MemoryManager

logger = logging.getLogger(__name__)


class ConversationExtractor:
"""Extracts conversation turns and stores them as memories.

Uses the memory provider's ``add_memory`` operation (never
``add_agentic_memory``) to persist conversation pairs scoped
to the originating agent name.
"""

def __init__(
self,
memory_manager: "MemoryManager",
config: dict,
) -> None:
"""
Args:
memory_manager: Initialized MemoryManager instance.
config: Memory config section from config.yaml.
"""
self.memory_manager = memory_manager
self.enabled = config.get("auto_extract", False)

# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------

def extract_async(
self,
agent_name: str,
user_message: str,
assistant_message: str,
) -> None:
"""Store a conversation pair in a background daemon thread.

No-op when ``auto_extract`` is disabled. Errors are logged
at WARNING level and never raised.
"""
if not self.enabled:
return

try:
thread = threading.Thread(
target=self._store_conversation,
args=(agent_name, user_message, assistant_message),
daemon=True,
)
thread.start()
except Exception:
logger.warning(
"Failed to spawn extraction thread",
exc_info=True,
)

# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

def _store_conversation(
self,
agent_name: str,
user_message: str,
assistant_message: str,
) -> None:
"""Synchronous storage called from the background thread."""
try:
from aios.memory.note import MemoryNote

content = self._build_conversation_content(
user_message, assistant_message,
)

memory_note = MemoryNote(
content=content,
context="conversation",
category="conversation",
)
# Attach provider-specific metadata so Mem0Provider
# picks up the correct user_id scope.
memory_note.metadata = {"user_id": agent_name}

result = self.memory_manager.provider.add_memory(
memory_note
)

logger.info(
"Stored conversation memory for user_id=%s: %s",
agent_name,
content[:80],
)
except Exception as e:
logger.warning(
"Conversation extraction failed for "
"user_id=%s",
agent_name,
exc_info=True,
)

@staticmethod
def _build_conversation_content(
user_message: str,
assistant_message: str,
) -> str:
"""Format the conversation pair for storage."""
return f"User: {user_message}\nAssistant: {assistant_message}"
Loading
Loading