Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions examples/lead-followup-agent-confluent-cloud/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Lead Follow-Up Agent — Confluent Cloud

Same demo as [`lead-followup-agent`](../lead-followup-agent), but talks to a
**Confluent Cloud** cluster instead of a local Kafka broker. Every service
image is built from local source so you can test code changes against a real
cloud cluster without publishing images.

Use this example to validate that a set of Confluent Cloud API keys works
end-to-end across all FlightDeck processors (Java + Python).

## What's different from `lead-followup-agent`

| | `lead-followup-agent` | this example |
|---|---|---|
| Kafka broker | Local container (`apache/kafka:4.1.1`) | Confluent Cloud (SASL_SSL / PLAIN) |
| Service images | Pulled from `ghcr.io/tsuz/flightdeck/*` | Built from local source |
| Auth env vars | none | `CONFLUENT_*` in `.env` |

## Prerequisites

1. A Confluent Cloud cluster (any type — Basic is fine for testing).
2. A **cluster-scoped API key + secret** (Cloud → Cluster → API Keys → Add Key).
The cluster's Cloud API key will NOT work — it must be scoped to the
specific cluster.
3. A Claude API key.
4. Docker + `docker compose`.

## Pre-create topics in Confluent Cloud

> **Why this is required:** the `processing` service tries to auto-create
> topics with replication factor 1, which Confluent Cloud rejects (CC enforces
> RF ≥ 3). So topics must exist *before* the stack starts.

Topics use the agent name as a prefix. Assuming the default `AGENT_NAME=lead-followup`,
create these topics in the cluster (default partition count is fine, RF = 3):

```
lead-followup-message-input
lead-followup-enriched-message-input
lead-followup-think-request-response
lead-followup-tool-use
lead-followup-tool-use-result
lead-followup-tool-use-dlq
lead-followup-tool-use-all-complete
lead-followup-tool-use-latency
lead-followup-message-output
lead-followup-think-dlq
lead-followup-session-context
```

### Option A — Confluent Cloud Console

UI → Cluster → Topics → Add topic — repeat for each name above.

### Option B — `confluent` CLI

```bash
confluent login
confluent kafka cluster use <CLUSTER_ID>

for t in \
lead-followup-message-input \
lead-followup-enriched-message-input \
lead-followup-think-request-response \
lead-followup-tool-use \
lead-followup-tool-use-result \
lead-followup-tool-use-dlq \
lead-followup-tool-use-all-complete \
lead-followup-tool-use-latency \
lead-followup-message-output \
lead-followup-think-dlq \
lead-followup-session-context; do
confluent kafka topic create "$t"
done
```

If you change `AGENT_NAME`, change the topic prefix to match.

## Configure credentials

```bash
cp .env.example .env
```

Edit `.env` and fill in:

| Variable | Where to get it |
|---|---|
| `CONFLUENT_BOOTSTRAP_SERVERS` | Cluster → Cluster settings → Bootstrap server (e.g. `pkc-921jm.us-east-2.aws.confluent.cloud:9092`) |
| `CONFLUENT_API_KEY` | Cluster → API Keys → your key |
| `CONFLUENT_API_SECRET` | Shown once when the key was created |
| `CLAUDE_API_KEY` | console.anthropic.com → API Keys |

## Run

```bash
docker compose up --build
```

First build takes a few minutes (Maven downloads + multi-stage Java builds).
Subsequent runs are cached.

Open <http://localhost> and try a prompt like:

> "Find all dormant leads we haven't contacted since August 2025"

## How auth is wired

The processors all use the `KafkaEnvProps` pass-through pattern: any
`KAFKA_*` env var becomes a Kafka client config property (`KAFKA_FOO_BAR` →
`foo.bar`). The compose file sets these per service:

**Java services** (`api`, `processing`, `think-consumer`) — Apache Kafka client uses JAAS:
```
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="..." password="...";
```

**Python service** (`lead-tool-service`) — librdkafka takes credentials directly:
```
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=...
KAFKA_SASL_PASSWORD=...
```

`processing` additionally sets `KAFKA_REPLICATION_FACTOR=3` so Kafka Streams
creates its internal/changelog topics with the RF Confluent Cloud requires.

## Verifying the keys work

A successful end-to-end run means every service authenticated. To narrow down
the source of an auth failure:

```bash
# Show only auth/connection errors
docker compose logs api processing think-consumer lead-tool-service | grep -iE 'sasl|auth|disconnect|topicauthorization'
```

Common failure modes:

| Symptom | Likely cause |
|---|---|
| `SaslAuthenticationException: Authentication failed` | API key/secret wrong, or key not scoped to this cluster |
| `TopicAuthorizationException` on a `lead-followup-*` topic | ACL missing for the key — grant the key write/read on the topic prefix |
| `InvalidReplicationFactorException` from `processing` | A topic still missing — re-check the pre-create list above |
| `UnknownTopicOrPartitionException` | Topic name typo, or `AGENT_NAME` changed but topics not renamed |
| Hangs at startup, no errors | Bootstrap server hostname wrong, or outbound 9092 blocked |

## Clean up

```bash
docker compose down
```

Topics, ACLs, and the API key remain in Confluent Cloud — delete those from
the Cloud console if you don't want to be billed for storage.
74 changes: 74 additions & 0 deletions examples/lead-followup-agent-confluent-cloud/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
services:

# ─── Application Services (pulled from ghcr.io) ─────────────────────────────

api:
image: ghcr.io/tsuz/flightdeck/chat-api:0.0.2
ports:
- "8002:8000"
- "8003:8001"
environment:
AGENT_NAME: ${AGENT_NAME:-lead-followup}
PORT: 8000
WS_PORT: 8001
# Confluent Cloud connection (Java client — uses JAAS)
KAFKA_BOOTSTRAP_SERVERS: ${CONFLUENT_BOOTSTRAP_SERVERS}
KAFKA_SECURITY_PROTOCOL: SASL_SSL
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${CONFLUENT_API_KEY}" password="${CONFLUENT_API_SECRET}";'

processing:
image: ghcr.io/tsuz/flightdeck/processing:0.0.2
environment:
AGENT_NAME: ${AGENT_NAME:-lead-followup}
MEMOIR_ENABLED: "false"
# Confluent Cloud connection (Java client — uses JAAS)
KAFKA_BOOTSTRAP_SERVERS: ${CONFLUENT_BOOTSTRAP_SERVERS}
KAFKA_SECURITY_PROTOCOL: SASL_SSL
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${CONFLUENT_API_KEY}" password="${CONFLUENT_API_SECRET}";'
# Confluent Cloud requires RF >= 3 for Streams internal/changelog topics
KAFKA_REPLICATION_FACTOR: "3"

think-consumer:
image: ghcr.io/tsuz/flightdeck/think-consumer:0.0.2
volumes:
- ./tools.json:/app/tools.json:ro
- ./system-prompt.txt:/app/system-prompt.txt:ro
environment:
AGENT_NAME: ${AGENT_NAME:-lead-followup}
CLAUDE_API_KEY: ${CLAUDE_API_KEY}
CLAUDE_MODEL: ${CLAUDE_MODEL:-claude-haiku-4-5-20251001}
CLAUDE_MAX_TOKENS: ${CLAUDE_MAX_TOKENS:-8096}
INPUT_TOKEN_PRICE: ${INPUT_TOKEN_PRICE:-1}
OUTPUT_TOKEN_PRICE: ${OUTPUT_TOKEN_PRICE:-5}
BUDGET_PRICE_PER_SESSION: ${BUDGET_PRICE_PER_SESSION:-0.5}
TOOLS_JSON_FILE: /app/tools.json
SYSTEM_PROMPT_FILE: /app/system-prompt.txt
# Confluent Cloud connection (Java client — uses JAAS)
KAFKA_BOOTSTRAP_SERVERS: ${CONFLUENT_BOOTSTRAP_SERVERS}
KAFKA_SECURITY_PROTOCOL: SASL_SSL
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_JAAS_CONFIG: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="${CONFLUENT_API_KEY}" password="${CONFLUENT_API_SECRET}";'

lead-tool-service:
build:
context: ../..
dockerfile: examples/lead-followup-agent-confluent-cloud/lead-tool-service/Dockerfile
environment:
AGENT_NAME: ${AGENT_NAME:-lead-followup}
# Confluent Cloud connection (Python / librdkafka — uses sasl.username + sasl.password)
KAFKA_BOOTSTRAP_SERVERS: ${CONFLUENT_BOOTSTRAP_SERVERS}
KAFKA_SECURITY_PROTOCOL: SASL_SSL
KAFKA_SASL_MECHANISM: PLAIN
KAFKA_SASL_USERNAME: ${CONFLUENT_API_KEY}
KAFKA_SASL_PASSWORD: ${CONFLUENT_API_SECRET}

# ─── Frontend ────────────────────────────────────────────────────────────────

frontend:
image: ghcr.io/tsuz/flightdeck/frontend:0.0.2
ports:
- "8080:80"
depends_on:
- api
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM python:3.11-slim
WORKDIR /app

# Install SDK
COPY sdk/python /tmp/sdk
RUN pip install --no-cache-dir /tmp/sdk && rm -rf /tmp/sdk

COPY examples/lead-followup-agent-confluent-cloud/lead-tool-service/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY examples/lead-followup-agent-confluent-cloud/lead-tool-service/app.py .

ENV PYTHONUNBUFFERED=1

CMD ["python", "app.py"]
149 changes: 149 additions & 0 deletions examples/lead-followup-agent-confluent-cloud/lead-tool-service/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""
Lead CRM tool consumer — rewritten using the flightdeck SDK.

Compare with app.py to see how the SDK eliminates all Kafka boilerplate.
The developer only writes tool functions and a dispatch handler.
"""

import json
import os
import time
from copy import deepcopy
from datetime import datetime, timezone

from flightdeck_sdk import ToolConsumerRunner, ToolConsumerConfig

# ── Mock CRM Database ────────────────────────────────────────────────────────

LEADS = {
"lead-001": {
"lead_id": "lead-001",
"name": "Jane Chen",
"email": "jane.chen@acmecorp.com",
"company": "Acme Corp",
"industry": "ecommerce",
"deal_value": 45000,
"status": "dormant",
"last_contact": "2025-08-12",
"created": "2025-03-01",
"activities": [
{"date": "2025-03-01", "type": "note", "summary": "Inbound lead from website demo request"},
],
},
"lead-002": {
"lead_id": "lead-002",
"name": "Marcus Johnson",
"email": "m.johnson@brighthealth.io",
"company": "BrightHealth",
"industry": "healthcare",
"deal_value": 120000,
"status": "dormant",
"last_contact": "2025-06-20",
"created": "2025-02-14",
"activities": [],
},
}


# ── Tool implementations ────────────────────────────────────────────────────


def search_leads(input_data):
results = []
status = input_data.get("status")
industry = input_data.get("industry")
limit = input_data.get("limit", 10)

for lead in LEADS.values():
if status and lead["status"] != status:
continue
if industry and lead["industry"] != industry:
continue
results.append({
"lead_id": lead["lead_id"],
"name": lead["name"],
"company": lead["company"],
"status": lead["status"],
})

return {"leads": results[:limit], "count": len(results[:limit])}


def get_lead_details(input_data):
lead = LEADS.get(input_data.get("lead_id", ""))
if not lead:
return {"error": f"Lead '{input_data.get('lead_id')}' not found"}
return deepcopy(lead)


def update_lead_status(input_data):
lead = LEADS.get(input_data.get("lead_id", ""))
if not lead:
return {"error": f"Lead '{input_data.get('lead_id')}' not found"}
old_status = lead["status"]
lead["status"] = input_data.get("status", "")
return {"lead_id": lead["lead_id"], "old_status": old_status, "new_status": lead["status"]}


def log_activity(input_data):
lead = LEADS.get(input_data.get("lead_id", ""))
if not lead:
return {"error": f"Lead '{input_data.get('lead_id')}' not found"}
today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
activity = {"date": today, "type": input_data.get("type", "note"), "summary": input_data.get("summary", "")}
lead["activities"].append(activity)
lead["last_contact"] = today
return {"lead_id": lead["lead_id"], "activity": activity}


def draft_followup_email(input_data):
lead = LEADS.get(input_data.get("lead_id", ""))
if not lead:
return {"error": f"Lead '{input_data.get('lead_id')}' not found"}
return {
"to": lead["email"],
"name": lead["name"],
"company": lead["company"],
"reason": input_data.get("reason", "general follow-up"),
}


TOOLS = {
"search_leads": search_leads,
"get_lead_details": get_lead_details,
"update_lead_status": update_lead_status,
"log_activity": log_activity,
"draft_followup_email": draft_followup_email,
}


# ── Process function (the only thing the SDK needs) ─────────────────────────


def process(key, value, ctx):
request = json.loads(value)
tool_name = request.get("name", "")

handler = TOOLS.get(tool_name)
if handler is None:
ctx.error(f"Unknown tool: {tool_name}")
return

try:
result = handler(request.get("input", {}))
ctx.success(result)
except Exception as e:
ctx.error(str(e))


# ── Entry point ─────────────────────────────────────────────────────────────

if __name__ == "__main__":
runner = ToolConsumerRunner(
ToolConsumerConfig(
agent_name=os.environ["AGENT_NAME"],
brokers=os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:29092"),
process_fn=process,
)
)
runner.start()
Loading
Loading