Skip to content

jmougeot/Ruby-call

Repository files navigation

Call Pipeline 🎯

Architecture scalable pour le traitement de calls audio : ingestion, transcription ASR, analyse NLP, scoring.

🏗️ Architecture Globale

┌─────────────────────────────────────────────────────────────────────────────┐
│                              INGESTION                                       │
│  ┌─────────┐    ┌─────────────┐    ┌─────────────┐                          │
│  │ Upload  │───▶│  API REST   │───▶│  Storage    │                          │
│  │ / Webhooks   │  (Hono)     │    │  (S3/MinIO) │                          │
│  └─────────┘    └──────┬──────┘    └─────────────┘                          │
└────────────────────────┼────────────────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                           ORCHESTRATION                                      │
│  ┌─────────────┐    ┌─────────────────────────────────────────┐             │
│  │   Queue     │◀──▶│           Orchestrator                  │             │
│  │  (BullMQ)   │    │  (State machine, décide next step)      │             │
│  └──────┬──────┘    └─────────────────────────────────────────┘             │
└─────────┼───────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                              COMPUTE (Workers)                               │
│  ┌───────────┐   ┌───────────┐   ┌───────────┐   ┌───────────┐             │
│  │ Preprocess│──▶│ Transcribe│──▶│  Analyze  │──▶│   Score   │             │
│  │  Worker   │   │  Worker   │   │  Worker   │   │  Worker   │             │
│  └───────────┘   └───────────┘   └───────────┘   └───────────┘             │
└─────────────────────────────────────────────────────────────────────────────┘
          │
          ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│                               STORAGE                                        │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                      │
│  │  Postgres   │    │  Artifacts  │    │   Audio     │                      │
│  │  (Metadata) │    │  (JSON/S3)  │    │   (S3)      │                      │
│  └─────────────┘    └─────────────┘    └─────────────┘                      │
└─────────────────────────────────────────────────────────────────────────────┘

📁 Structure du Projet

call-pipeline/
├── packages/
│   ├── database/          # Schéma Drizzle + types Zod
│   ├── queue/             # BullMQ queues et workers factory
│   ├── storage/           # Abstraction S3/local storage
│   ├── orchestrator/      # Service de coordination pipeline
│   ├── workers/           # Workers de traitement
│   └── api/               # API REST (Hono)
├── docker/                # Dockerfiles
├── docker-compose.yml     # Infrastructure (Postgres, Redis, MinIO)
└── docker-compose.services.yml  # Services applicatifs

📦 MODULES EN DÉTAIL


📊 @call-pipeline/database

Responsabilité : Source de vérité pour les types, le schéma DB et la validation.

packages/database/
├── src/
│   ├── index.ts       # Connexion Drizzle + exports
│   ├── schema.ts      # Tables PostgreSQL (Drizzle)
│   └── types.ts       # Types TypeScript + Schemas Zod

Ce qu'il contient

Export Description
db Instance Drizzle connectée à PostgreSQL
calls, jobs, artifacts, teams, users, modelRuns Tables Drizzle
CallStatus, JobStatus, JobType, ArtifactType Enums TypeScript
TranscriptSchema, AnalysisSchema, ScoresSchema Schemas Zod pour validation
PreprocessJobPayload, TranscribeJobPayload, etc. Types des payloads de jobs
eq, and, or, desc, asc, ... Opérateurs Drizzle ré-exportés

Types de Payloads (définis ici, utilisés partout)

// Chaque worker reçoit un payload typé
type PreprocessJobPayload = {
  callId: string;
  teamId: string;
  pipelineVersion: string;
  audioRef: string;
  targetFormat: 'wav' | 'flac' | 'mp3';
  targetSampleRate: number;
}

type TranscribeJobPayload = {
  callId: string;
  teamId: string;
  pipelineVersion: string;
  audioRef: string;
  model: string;
  language?: string;
}

// etc.

Pourquoi c'est ici ?

  • Single Source of Truth : Les types de payloads sont définis une fois
  • Partagé partout : queue, workers, orchestrator, api importent depuis ici
  • Validation Zod : Les workers valident les données entrantes

🚇 @call-pipeline/queue

Responsabilité : Infrastructure BullMQ - factories, instances de queues, dispatchers typés.

packages/queue/
├── src/
│   ├── index.ts       # Factories (createQueue, createWorker) + connexion Redis
│   └── queues.ts      # Instances de queues + dispatchers typés

Exports principaux

Export Type Description
createQueue<T, N>() Factory Crée une queue BullMQ typée
createWorker<T, R>() Factory Crée un worker avec logging
createQueueEvents() Factory Pour monitoring des queues
QueueName Enum Noms des queues (preprocess, transcribe, etc.)
preprocessQueue, transcribeQueue, analyzeQueue, scoreQueue Instances Queues pré-configurées
orchestratorQueue Instance Queue des événements orchestrateur
dispatchPreprocessJob(), dispatchTranscribeJob(), etc. Functions Dispatchers typés avec idempotence

Pattern de Dispatch (clé du système)

dispatchTranscribeJob(callId, {
  callId,
  teamId,
  pipelineVersion,
  audioRef,
  model: 'whisper-large-v3',
}, { version: pipelineVersion });

// Le dispatcher garantit :
// - jobName = 'TRANSCRIBE_CALL' (type de travail)
// - jobId = `${callId}-transcribe-v1` (instance unique, idempotent)
// - data = TranscribeJobPayload (typé)

Distinction Cruciale

Concept Description Exemple
jobName Ce que le worker sait faire 'TRANSCRIBE_CALL'
jobId Quelle instance précise 'call_123-transcribe-v1'

L'idempotence est garantie par jobId : BullMQ ignore les jobs avec un jobId déjà existant.


🗄️ @call-pipeline/storage

Responsabilité : Abstraction du stockage objet (S3/MinIO).

packages/storage/
├── src/
│   └── index.ts       # Client S3 + helpers

API

import { storage } from '@call-pipeline/storage';

// Upload
await storage.upload('audio/call_123.wav', buffer, 'audio/wav');

// Download
const data = await storage.download('transcripts/call_123.json');

// URL signée (pour téléchargement client)
const url = await storage.getSignedUrl('audio/call_123.wav', 3600);

// Vérifier existence
const exists = await storage.exists('audio/call_123.wav');

// Supprimer
await storage.delete('audio/call_123.wav');

Configuration

Variables d'environnement :

  • S3_ENDPOINT - URL du service S3 (MinIO en dev)
  • S3_ACCESS_KEY / S3_SECRET_KEY - Credentials
  • S3_BUCKET - Nom du bucket
  • S3_REGION - Région (optionnel)

🎭 @call-pipeline/orchestrator

Responsabilité : Cerveau du pipeline - décide quoi faire après chaque étape.

packages/orchestrator/
├── src/
│   ├── index.ts       # Worker qui écoute orchestratorQueue
│   ├── handlers.ts    # Logique de state machine
│   └── logger.ts      # Logger Pino

Comment ça marche

  1. Écoute les événements sur orchestratorQueue
  2. Décide de la prochaine étape selon l'événement
  3. Dispatch le job correspondant via les dispatchers
// Événement reçu d'un worker
{
  type: 'preprocess-completed',
  callId: 'call_123',
  teamId: 'team_abc',
  payload: { normalizedAudioRef: 'audio/call_123_normalized.wav' }
}

// L'orchestrateur décide : "preprocess terminé → lancer transcribe"
await dispatchTranscribeJob(callId, {
  callId,
  teamId,
  pipelineVersion,
  audioRef: payload.normalizedAudioRef,
  model: 'whisper-large-v3',
}, { version: pipelineVersion });

State Machine

CALL_INGESTED
     │
     ▼
PREPROCESS_REQUESTED → PREPROCESS_COMPLETED
                              │
                              ▼
               TRANSCRIBE_REQUESTED → TRANSCRIBE_COMPLETED
                                             │
                                             ▼
                              ANALYZE_REQUESTED → ANALYZE_COMPLETED
                                                        │
                                                        ▼
                                         SCORE_REQUESTED → SCORE_COMPLETED
                                                                  │
                                                                  ▼
                                                        PIPELINE_COMPLETED

Handlers

Événement Action
call-ingested Lance dispatchPreprocessJob
preprocess-completed Lance dispatchTranscribeJob
transcribe-completed Lance dispatchAnalyzeJob
analyze-completed Lance dispatchScoreJob
score-completed Marque le call comme completed
pipeline-failed Marque le call comme failed

⚙️ @call-pipeline/workers

Responsabilité : Exécution du travail lourd (compute).

packages/workers/
├── src/
│   ├── index.ts       # Point d'entrée (lance tous les workers)
│   ├── preprocess.ts  # Worker de normalisation audio
│   ├── transcribe.ts  # Worker ASR (Whisper)
│   ├── analyze.ts     # Worker LLM (extraction)
│   ├── score.ts       # Worker de scoring
│   └── lib/
│       ├── logger.ts  # Logger Pino
│       └── storage.ts # Accès storage

Structure d'un Worker

// transcribe.ts
import { createWorker, QueueName, Job, orchestratorQueue } from '@call-pipeline/queue';
import { db, jobs, artifacts, ... } from '@call-pipeline/database';

async function processTranscribe(job: Job<TranscribeJobPayload>): Promise<void> {
  const { callId, teamId, audioRef, model } = job.data;
  
  // 1. Télécharger l'audio
  const audio = await storage.download(audioRef);
  
  // 2. Appeler le service ASR
  const transcript = await callASR(audio, model);
  
  // 3. Stocker l'artifact
  await storage.upload(`transcripts/${callId}.json`, transcript);
  await db.insert(artifacts).values({ ... });
  
  // 4. Notifier l'orchestrateur
  await orchestratorQueue.add('transcribe-completed', {
    type: 'transcribe-completed',
    callId,
    teamId,
    payload: { transcriptArtifactId: artifact.id },
    timestamp: new Date().toISOString(),
  });
}

// Créer le worker
const worker = createWorker<TranscribeJobPayload, void>(
  QueueName.TRANSCRIBE,
  processTranscribe,
  { concurrency: 5 }
);

Workers disponibles

Worker Input Output Service externe
preprocess Audio brut Audio normalisé (WAV 16kHz) FFmpeg
transcribe Audio normalisé Transcript JSON Whisper API
analyze Transcript Analysis (summary, topics, sentiment) LLM (GPT/Claude)
score Analysis Scores (0-100) LLM ou règles

🌐 @call-pipeline/api

Responsabilité : API REST pour les clients externes.

packages/api/
├── src/
│   ├── index.ts       # Point d'entrée
│   ├── app.ts         # Configuration Hono
│   ├── routes/
│   │   ├── calls.ts     # CRUD calls + upload + reprocess
│   │   ├── artifacts.ts # Récupération artifacts
│   │   ├── jobs.ts      # Status des jobs
│   │   └── health.ts    # Health checks
│   └── lib/
│       └── logger.ts

Endpoints

Calls
Method Endpoint Description
GET /api/v1/calls Liste les calls (pagination, filtres)
POST /api/v1/calls Crée un nouveau call
GET /api/v1/calls/:id Détails d'un call avec artifacts
POST /api/v1/calls/:id/upload Upload de l'audio (multipart)
POST /api/v1/calls/:id/reprocess Relance le pipeline depuis une étape
Artifacts
Method Endpoint Description
GET /api/v1/artifacts/:id Metadata d'un artifact
GET /api/v1/artifacts/:id/content Contenu JSON inline
GET /api/v1/artifacts/:id/download URL signée S3
Jobs
Method Endpoint Description
GET /api/v1/jobs/:id Status d'un job
GET /api/v1/jobs?callId=xxx Jobs d'un call
Health
Method Endpoint Description
GET /health Liveness check
GET /ready Readiness (DB + Redis + S3)

🔄 Flux de Données Complet

1. Client POST /api/v1/calls
   └─▶ API crée entry en DB
   └─▶ API envoie 'call-ingested' → orchestratorQueue

2. Orchestrator reçoit 'call-ingested'
   └─▶ dispatchPreprocessJob() → preprocessQueue

3. Preprocess Worker traite le job
   └─▶ Télécharge audio depuis S3
   └─▶ Normalise (FFmpeg)
   └─▶ Upload audio normalisé → S3
   └─▶ Crée artifact en DB
   └─▶ Envoie 'preprocess-completed' → orchestratorQueue

4. Orchestrator reçoit 'preprocess-completed'
   └─▶ dispatchTranscribeJob() → transcribeQueue

5. Transcribe Worker traite le job
   └─▶ Télécharge audio normalisé
   └─▶ Appelle Whisper API
   └─▶ Stocke transcript → S3 + DB
   └─▶ Envoie 'transcribe-completed' → orchestratorQueue

6. ... (analyze, score)

7. Orchestrator reçoit 'score-completed'
   └─▶ UPDATE calls SET status = 'completed'

🎯 5 Règles de Scalabilité

  1. Asynchrone partout pour le lourd (queue + workers)
  2. Idempotence : un job rejoué ne duplique rien (callId + step + version)
  3. Artefacts immuables : on écrit de nouveaux artifacts, jamais d'écrasement
  4. Versioning : schema/pipeline/model/prompt stockés avec chaque résultat
  5. Découplage storage/compute : audio et JSON hors des workers

🚀 Quick Start

1. Démarrer l'infrastructure

docker compose up -d
docker compose ps

2. Installer et builder

pnpm install
pnpm build

3. Configurer l'environnement

cp .env.example .env

4. Lancer en dev

# API
pnpm --filter @call-pipeline/api dev

# Orchestrator
pnpm --filter @call-pipeline/orchestrator dev

# Workers
pnpm --filter @call-pipeline/workers dev

Scaling

docker compose up --scale workers=5

# Ou par type
WORKER_TYPE=transcribe docker compose up workers

🔧 Technologies

Couche Technologie
Runtime Node.js 20+
Language TypeScript
API Hono
Database PostgreSQL + Drizzle ORM
Queue BullMQ + Redis
Storage S3-compatible (MinIO en dev)
Validation Zod
Monorepo pnpm workspaces + Turborepo

About

Tool too analyse calls

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages