`orchestrator/search/docs/architecture/indexing-flow.md`
# Indexing

## Overview

The indexer processes entities in chunks, extracts path/value pairs, manages token budgets for embedding calls, and uses content hashing for incremental updates.

1. **Chunking**: entities are read in configurable groups (default: 1000) so large datasets can be processed without exhausting memory.

2. **Field extraction**: entities are traversed into `path:value:type` records that can be indexed.

3. **Change detection**: content hashes are computed for each `path:value:type` and compared against existing records. Only new or changed fields are prepared for indexing, while stale fields are scheduled for deletion.

4. **Per-chunk batching and upsert**: within each chunk, fields are split into two buffers:
    - **Embeddable fields** (semantic strings) are tracked against a token budget. A flush occurs before exceeding the model’s context window or batch size. These fields are converted to vectors via the embedding API.
    - **Non-embeddable fields** (UUIDs, integers, booleans, datetimes) are accumulated in parallel. They don’t influence the token budget but are always flushed together with the embeddable buffer.

    Once flushed, both buffers are merged and written as a single UPSERT batch, after removing stale paths.

## Core Components

### 1. Streaming Entity Processing

The indexer reads entities from the database using `yield_per` to avoid loading large datasets into memory.
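The effect of `yield_per` can be mimicked with a plain generator. This is a sketch of the chunking behaviour only, not the actual database code:

```python
from itertools import islice
from typing import Iterable, Iterator


def chunked(entities: Iterable, size: int = 1000) -> Iterator[list]:
    """Yield successive chunks of up to `size` entities so the full
    dataset is never held in memory, which is what yield_per gives
    the indexer when streaming rows from the database."""
    it = iter(entities)
    while chunk := list(islice(it, size)):
        yield chunk
```

Each yielded chunk then goes through field extraction and change detection as a unit.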

### 2. Field Traversal and Extraction

Each entity is passed to its registered traverser (see `traverse.py`).
Traversers walk the object hierarchy defined by Pydantic models and flatten it into `path:value:type` records that can be indexed.
This makes deeply nested product structures compatible with PostgreSQL’s `LTree` paths.

- **SubscriptionTraverser**: Resolves the root subscription and its linked product.
- **ProductTraverser**: Walks the product schema and its blocks.
- **Process/WorkflowTraversers**: Apply schema validation.

#### Example: Simple Subscription

```python
from uuid import UUID
from orchestrator.domain.base import SubscriptionModel, ProductModel, ProductBlockModel


class BasicBlock(ProductBlockModel):
    name: str
    value: int
    enabled: bool


class SimpleProduct(ProductModel):
    product_id: UUID
    name: str
    basic_block: BasicBlock


class SimpleSubscription(SubscriptionModel, is_base=True):
    subscription_id: UUID
    customer_id: str
    product: SimpleProduct
```

Example instance:

```python
from uuid import UUID

SimpleSubscription(
    subscription_id=UUID("abc12345-6789-0000-0000-000000000000"),
    customer_id="test-customer",
    product=SimpleProduct(
        product_id=UUID("99999999-aaaa-0000-0000-000000000000"),
        name="Simple Product",
        basic_block=BasicBlock(name="SimpleBlock", value=42, enabled=True),
    ),
)
```

Flattened traversal:

```
subscription.subscription_id -> "abc12345-6789-0000-0000-000000000000" (UUID)
subscription.customer_id -> "test-customer" (STRING)
subscription.product.product_id -> "99999999-aaaa-0000-0000-000000000000" (UUID)
subscription.product.name -> "Simple Product" (STRING)
subscription.product.basic_block.name -> "SimpleBlock" (STRING)
subscription.product.basic_block.value -> "42" (INTEGER)
subscription.product.basic_block.enabled -> "True" (BOOLEAN)
```
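The traversal itself can be sketched with plain dictionaries. The real traversers walk Pydantic models and use field metadata; `flatten` and `type_of` here are illustrative names only:

```python
from uuid import UUID


def type_of(value) -> str:
    """Simplified type mapping; the real traversers rely on Pydantic
    field metadata rather than runtime isinstance checks."""
    if isinstance(value, bool):  # check bool before int (bool is an int subclass)
        return "BOOLEAN"
    if isinstance(value, int):
        return "INTEGER"
    if isinstance(value, UUID):
        return "UUID"
    return "STRING"


def flatten(obj: dict, prefix: str = "subscription") -> list[tuple[str, str, str]]:
    """Flatten a nested mapping into LTree-style path/value/type records."""
    records = []
    for key, value in obj.items():
        path = f"{prefix}.{key}"
        if isinstance(value, dict):
            records.extend(flatten(value, path))
        else:
            records.append((path, str(value), type_of(value)))
    return records
```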

### 3. Change Detection

The indexer uses content hashing to avoid reprocessing unchanged data:

```python
from hashlib import sha256

content_hash = sha256(f"{path}:{value}:{value_type}".encode()).hexdigest()
```

- Fetches existing hashes for each LTree record in the entity chunk
- Compares current vs existing hashes per field
- Only processes changed/new fields (unless `force_index=True`)
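Change detection can be sketched as a pure function over the hashes. This is a simplified model; the real indexer reads existing hashes from the LTree records in the chunk:

```python
from hashlib import sha256


def content_hash(path: str, value: str, value_type: str) -> str:
    return sha256(f"{path}:{value}:{value_type}".encode()).hexdigest()


def diff_fields(current, existing, force_index=False):
    """Split current fields into (to_index, unchanged) by comparing hashes.

    current:  list of (path, value, value_type) tuples
    existing: {path: content_hash} from a previous indexing run
    """
    to_index, unchanged = [], []
    for path, value, value_type in current:
        h = content_hash(path, value, value_type)
        if force_index or existing.get(path) != h:
            to_index.append((path, value, value_type, h))
        else:
            unchanged.append(path)
    return to_index, unchanged
```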

### 4. Two-Buffer Batching System

Within each chunk, extracted fields are split into two processing streams: one for fields that require embeddings and one for those that don’t.

#### Determining Embeddable Fields

Not all fields are suitable for embeddings. The indexer applies a **two-stage filter** during traversal to decide which fields to embed:

- **Type-based filter (Pydantic introspection)**: only fields declared as `str` are eligible for embeddings.
- **Value-based filter (semantic check)**: among strings, many values (UUIDs, dates, numeric strings) carry no semantic meaning. Validation helpers such as `is_uuid` and `is_date` are applied to skip these non-semantic values.

This approach ensures embeddings are generated only for text with meaningful semantic content, minimizing storage and improving search relevance.
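A minimal sketch of the value-based stage. The helper names mirror those mentioned above, but the bodies here are illustrative, not the real implementation:

```python
from datetime import datetime
from uuid import UUID


def is_uuid(value: str) -> bool:
    try:
        UUID(value)
        return True
    except ValueError:
        return False


def is_date(value: str) -> bool:
    try:
        datetime.fromisoformat(value)
        return True
    except ValueError:
        return False


def is_embeddable(value: str) -> bool:
    """Keep only semantically meaningful text: skip UUIDs, dates,
    and purely numeric strings."""
    return not (is_uuid(value) or is_date(value) or value.replace(".", "", 1).isdigit())
```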

#### Embeddable Buffer (STRING fields)

- Tracks running token count against the model’s context window
- Flushes when adding another field would exceed the token budget or batch size
- Respects `EMBEDDING_MAX_BATCH_SIZE` for local models
- Embedding text format: `path: value` (e.g. `subscription.product.name: Simple Product`)

#### Non-Embeddable Buffer

- Collects UUID, INTEGER, BOOLEAN, and DATETIME fields in parallel
- Does not use token counting (no embeddings generated)
- Always flushed together with the embeddable buffer as part of the same upsert batch
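How the two buffers could interact is sketched below; `batch_fields`, the toy token counter, and the flush rule are illustrative assumptions, not the real implementation:

```python
def batch_fields(fields, embed_fn, token_budget, max_batch_size, count_tokens=len):
    """Two-buffer flush sketch: STRING fields accumulate against a token
    budget; all other fields ride along and are written in the same batch.

    fields:   list of (path, value, value_type) tuples
    embed_fn: stand-in for the embedding API (list[str] -> list of vectors)
    """
    batches = []
    embeddable, plain, tokens = [], [], 0

    def flush():
        nonlocal embeddable, plain, tokens
        if embeddable or plain:
            # Embedding text format: "path: value"
            vectors = embed_fn([f"{p}: {v}" for p, v, _ in embeddable])
            batches.append((list(zip(embeddable, vectors)), list(plain)))
        embeddable, plain, tokens = [], [], 0

    for path, value, value_type in fields:
        if value_type == "STRING":
            cost = count_tokens(value)  # toy token count; real code uses a tokenizer
            if embeddable and (tokens + cost > token_budget or len(embeddable) >= max_batch_size):
                flush()  # flush BEFORE the budget or batch size is exceeded
            embeddable.append((path, value, value_type))
            tokens += cost
        else:
            plain.append((path, value, value_type))
    flush()  # final flush: both buffers written together
    return batches
```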

## Configuration

| Setting | Description | Default / Source | Scope |
| ------------------------------- | -------------------------------------------- | ---------------- | ----------------- |
| `chunk_size` | Number of entities processed per transaction | `1000` | All models |
| `EMBEDDING_SAFE_MARGIN_PERCENT` | Safety margin applied to max token budget | `10%` / settings | All models |
| `EMBEDDING_MAX_BATCH_SIZE` | Maximum batch size for embedding calls | `32` / settings | Local models only |
| `EMBEDDING_FALLBACK_MAX_TOKENS` | Context window fallback if model unknown | `512` / settings | Local models only |
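Under these settings the effective token budget works out as follows (a sketch of the arithmetic, assuming the safety margin is applied to the model's context window):

```python
def effective_token_budget(context_window: int, safe_margin_percent: float = 10.0) -> int:
    """Apply EMBEDDING_SAFE_MARGIN_PERCENT to the model's context window."""
    return int(context_window * (1 - safe_margin_percent / 100))
```

With the fallback window of 512 tokens and the default 10% margin, this leaves a budget of 460 tokens per embedding batch.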

## Strengths

- Streaming processing in chunks prevents memory exhaustion
- Incremental updates via content hashing
- Batched embedding calls optimize API usage
- Token budget prevents embedding API errors
- The `path:value:type` record format enables precise, field-level searching

## Limitations

- **Record explosion**: Deeply nested models may generate a very large number of index records
- **Path rigidity**: requires reindexing when paths change
`orchestrator/search/docs/guides/running_index.md`
# Search Indexing CLI

Typer-based CLI for maintaining search indexes (subscriptions, products, processes, workflows).

## Usage

Run from project root:

```shell
dotenv run python main.py index [COMMAND] [OPTIONS]
```

### Commands

- `subscriptions` – index `subscription_search_index`
- `products` – index `product_search_index`
- `processes` – index `process_search_index`
- `workflows` – index `workflow_search_index`

### Options

Each command accepts the following options:

- `--subscription-id` / `--product-id` / `--process-id` / `--workflow-id` – UUID string of a specific entity (optional, default: process all entities)
- `--dry-run` – perform indexing operations without writing to database (boolean flag)
- `--force-index` – force re-indexing even if entity hash is unchanged (boolean flag)
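The options above map onto a Typer command signature roughly like the following. This is an illustrative sketch; the names and defaults mirror the options listed here, not the actual CLI module:

```python
from typing import Optional

import typer

app = typer.Typer()


@app.command()
def subscriptions(
    subscription_id: Optional[str] = typer.Option(None, help="Index a single subscription"),
    dry_run: bool = typer.Option(False, help="Run without writing to the database"),
    force_index: bool = typer.Option(False, help="Re-index even if hashes are unchanged"),
) -> None:
    """Index subscription_search_index (sketch: prints instead of indexing)."""
    target = subscription_id or "all subscriptions"
    typer.echo(f"indexing {target} (dry_run={dry_run}, force_index={force_index})")
```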

### Examples

```shell
# Index all subscriptions
dotenv run python main.py index subscriptions

# Re-index all subscriptions
dotenv run python main.py index subscriptions --force-index

# Index a single subscription
dotenv run python main.py index subscriptions --subscription-id=<UUID>
```
`orchestrator/search/docs/index.md`

# Orchestrator Search

## Overview

- [Searching](overview/searching.md): search types (semantic, fuzzy, structured, hybrid), routing rules, scoring.
- [Filters](overview/filters.md): structured filters.

---

## How-to Guides

Step-by-step instructions for various tasks.

- [Run a full index](guides/running_index.md): How to run the indexing script.
- [Run local text embedding inference](guides/running_local_text_embedding_inference.md): How to use a local embedding server.

---

## Architecture

- [Indexing flow](architecture/indexing-flow.md): chunking, change detection, batching, embeddings, upsert.

---

## Reference

Details and definitions.

- **[Filter operators](reference/filter-operators.md)**: Supported operators per UI type.

---
`orchestrator/search/docs/overview/agent.md`
# Agent

The AI Search Agent is an assistant that builds and executes structured database queries to retrieve information across orchestrator entities (subscriptions, workflows, products, and processes).

Built on `pydantic-ai` and exposed via FastAPI using the `ag-ui` protocol, the agent:

- Establishes a search context (entity type and action)

- Optionally builds a validated FilterTree using discovered field paths and type-safe operators

- Executes both broad and filtered searches against the search API

Its structured query building leverages Pydantic validation to keep queries safe and accurate, with feedback loops for iterative refinement.

> The [ag-ui protocol](https://docs.ag-ui.com/introduction) is a lightweight, event-based interface for connecting AI agents to user-facing applications. It enables real-time interactions, state synchronization, and tool execution between the agent and UI environments such as chat consoles or admin panels.

> The protocol is natively supported by `pydantic-ai`, allowing agents to expose tools and state without any custom wiring or logic.

---

## Architecture

### Core Components

**Agent (`agent.py`)**

- Built on `pydantic-ai` framework
- Integrates search toolset and custom instructions

**State Management (`state.py`)**

- `SearchState`: Tracks current search parameters and results
- Maintains context across tool calls
- Stores filter trees and search outcomes

**Tool System (`tools.py`)**

- **`set_search_parameters`**: Initializes search context (entity type, query)
- **`set_filter_tree`**: Builds and validates structured filters
- **`execute_search`**: Runs database queries with current parameters
- **`discover_filter_paths`**: Finds valid paths for field names
- **`get_valid_operators`**: Returns compatible operators per field type

**Instructions (`prompts.py`)**

- Base instructions: Define agent role and workflow
- Dynamic instructions: Provide context-aware guidance on the next step

---

## Workflow

### 1. Context Setting

```
User: "Find active subscriptions for customer SURF"
set_search_parameters(entity_type=SUBSCRIPTION, action=SELECT)
```

### 2. Filter Discovery (if needed)

```
discover_filter_paths(["status", "customer"])
Returns: subscription.status (string), subscription.customer_id (string)

get_valid_operators()
Returns: string fields support [eq, neq, like]
```

### 3. Filter Construction

```json
{
"op": "AND",
"children": [
{
"path": "subscription.status",
"value_kind": "string",
"condition": { "op": "eq", "value": "active" }
},
{
"path": "subscription.customer_id",
"value_kind": "string",
"condition": { "op": "eq", "value": "SURF" }
}
]
}
```

### 4. Execution

```
execute_search(limit=10)

Returns results along with an answer to the user's message.
```

---

## Schema Awareness & Guidance

### Pydantic-Based Validation

All tool inputs are defined using **Pydantic models**, ensuring:

- Structural correctness (e.g., required fields, value types)
- Logical validation (e.g., wildcard enforcement for `LIKE` operations)
- Automatic transformations (e.g., inferred values for `has_component` operations)

For example, the `set_filter_tree()` tool accepts a `FilterTree` model that:

- Enforces maximum nesting depth
- Validates field types against UI type mappings
- Ensures all children are either `FilterTree` or `PathFilter` instances
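The nesting-depth check such a model could enforce can be sketched without any dependencies. `MAX_DEPTH` and the dict shape below are illustrative assumptions, not the real `FilterTree` schema:

```python
MAX_DEPTH = 3  # illustrative limit, not the real value


def check_depth(node: dict, depth: int = 1) -> None:
    """Reject filter trees nested deeper than MAX_DEPTH.

    Branch nodes carry "op"/"children"; leaf nodes carry "path"/"condition".
    """
    if depth > MAX_DEPTH:
        raise ValueError(f"filter tree exceeds max depth of {MAX_DEPTH}")
    for child in node.get("children", []):
        check_depth(child, depth + 1)
```

In the real model this kind of check would run inside a Pydantic validator, so an invalid tree is rejected at tool-call time with an error message the agent can use to correct itself.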

### Auto-Generated Tool Context

When tools are registered with the agent, their input schemas are:

- **Automatically parsed** by `pydantic-ai`
- **Summarized and injected** into the agent’s prompt context
- **Enhanced with examples** from each model’s `json_schema_extra` configuration

This allows the agent to:

- Reason about correct input shapes
- Use schema-defined examples for constructing valid tool calls
- Avoid misuse of operators, paths, or nesting structure
- Safely generate structured queries with full validation and iterative feedback loops

---

## Integration

The agent integrates with the search system through:

- **Search Functions**: Direct calls to `search_subscriptions`, `search_workflows`, etc.
- **Validation**: Uses `validate_filter_tree` for runtime schema and logic checks
- **Path discovery**: Uses the `/paths` endpoint for dynamic field resolution
- **Type system**: Uses the `/definitions` endpoint to map field types to supported operators

---

## See also

- **[Searching](searching.md)**: Search types and routing
- **[Filters](filters.md)**: Filter structure and validation
- **[Filter Operators](../reference/filter-operators.md)**: Available operators