Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 62 additions & 26 deletions backends/advanced/src/advanced_omi_backend/workers/audio_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import asyncio
import json
import logging
import os
import time
Expand All @@ -30,7 +31,7 @@ async def audio_streaming_persistence_job(
client_id: str,
always_persist: bool = False,
*,
redis_client=None
redis_client=None,
) -> Dict[str, Any]:
"""
Long-running RQ job that stores audio chunks in MongoDB with Opus compression.
Expand All @@ -57,7 +58,9 @@ async def audio_streaming_persistence_job(
cross-process config cache issues.
"""

logger.info(f"🎵 Starting MongoDB audio persistence for session {session_id} (always_persist={always_persist})")
logger.info(
f"🎵 Starting MongoDB audio persistence for session {session_id} (always_persist={always_persist})"
)

# Setup audio persistence consumer group (separate from transcription consumer)
audio_stream_name = f"audio:stream:{client_id}"
Expand All @@ -66,12 +69,11 @@ async def audio_streaming_persistence_job(

try:
await redis_client.xgroup_create(
audio_stream_name,
audio_group_name,
"0",
mkstream=True
audio_stream_name, audio_group_name, "0", mkstream=True
)
logger.info(
f"📦 Created audio persistence consumer group for {audio_stream_name}"
)
logger.info(f"📦 Created audio persistence consumer group for {audio_stream_name}")
except Exception as e:
if "BUSYGROUP" not in str(e):
logger.warning(f"Failed to create audio consumer group: {e}")
Expand All @@ -90,6 +92,7 @@ async def audio_streaming_persistence_job(
if existing_conversation_id:
existing_id_str = existing_conversation_id.decode()
from advanced_omi_backend.models.conversation import Conversation

existing_conv = await Conversation.find_one(
Conversation.conversation_id == existing_id_str
)
Expand Down Expand Up @@ -118,15 +121,13 @@ async def audio_streaming_persistence_job(
transcript_versions=[],
memory_versions=[],
processing_status="pending_transcription",
always_persist=True
always_persist=True,
)
await conversation.insert()

# Set conversation:current Redis key
await redis_client.set(
conversation_key,
conversation.conversation_id,
ex=3600 # 1 hour expiry
conversation_key, conversation.conversation_id, ex=3600 # 1 hour expiry
)

logger.info(
Expand Down Expand Up @@ -165,13 +166,29 @@ async def audio_streaming_persistence_job(
chunk_index = 0 # Sequential chunk counter for current conversation
chunk_start_time = 0.0 # Start time of current buffered chunk

# Chunk configuration
# Read actual sample rate from the session's audio_format stored in Redis
# Same pattern as streaming_consumer.py:634-644
SAMPLE_RATE = 16000
SAMPLE_WIDTH = 2 # 16-bit
CHANNELS = 1 # Mono
try:
audio_format_raw = await redis_client.hget(session_key, "audio_format")
if audio_format_raw:
audio_format = json.loads(audio_format_raw)
SAMPLE_RATE = int(audio_format.get("rate", 16000))
SAMPLE_WIDTH = int(audio_format.get("width", 2))
CHANNELS = int(audio_format.get("channels", 1))
logger.info(
f"🎵 Audio format from Redis: {SAMPLE_RATE}Hz, {SAMPLE_WIDTH*8}-bit, {CHANNELS}ch"
)
except Exception as e:
logger.warning(
f"Failed to read audio_format from Redis for {session_id}, using defaults: {e}"
)

CHUNK_DURATION_SECONDS = 10.0
BYTES_PER_SECOND = SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS # 32,000 bytes/sec
CHUNK_SIZE_BYTES = int(CHUNK_DURATION_SECONDS * BYTES_PER_SECOND) # 320,000 bytes
BYTES_PER_SECOND = SAMPLE_RATE * SAMPLE_WIDTH * CHANNELS
CHUNK_SIZE_BYTES = int(CHUNK_DURATION_SECONDS * BYTES_PER_SECOND)

# Session stats (across all conversations)
total_pcm_bytes = 0
Expand All @@ -185,6 +202,7 @@ async def audio_streaming_persistence_job(
from rq import get_current_job

from advanced_omi_backend.utils.job_utils import check_job_alive

current_job = get_current_job()

async def flush_pcm_buffer() -> bool:
Expand All @@ -207,7 +225,7 @@ async def flush_pcm_buffer() -> bool:
pcm_data=bytes(pcm_buffer),
sample_rate=SAMPLE_RATE,
channels=CHANNELS,
bitrate=24 # 24kbps for speech
bitrate=24, # 24kbps for speech
)

# Calculate chunk metadata
Expand Down Expand Up @@ -247,7 +265,9 @@ async def flush_pcm_buffer() -> bool:
# Calculate running totals
chunk_count = chunk_index + 1
total_duration = end_time
compression_ratio = compressed_size / original_size if original_size > 0 else 0.0
compression_ratio = (
compressed_size / original_size if original_size > 0 else 0.0
)

# Update conversation fields
conversation.audio_chunks_count = chunk_count
Expand All @@ -271,7 +291,9 @@ async def flush_pcm_buffer() -> bool:
return True

except Exception as e:
logger.error(f"❌ Failed to save audio chunk {chunk_index}: {e}", exc_info=True)
logger.error(
f"❌ Failed to save audio chunk {chunk_index}: {e}", exc_info=True
)
return False

while True:
Expand Down Expand Up @@ -303,7 +325,7 @@ async def flush_pcm_buffer() -> bool:
audio_consumer_name,
{audio_stream_name: ">"},
count=50,
block=500
block=500,
)

if final_messages:
Expand All @@ -322,9 +344,13 @@ async def flush_pcm_buffer() -> bool:
chunk_index += 1
chunk_start_time += CHUNK_DURATION_SECONDS

await redis_client.xack(audio_stream_name, audio_group_name, message_id)
await redis_client.xack(
audio_stream_name, audio_group_name, message_id
)

logger.info(f"📦 Final read processed {len(final_messages[0][1])} messages")
logger.info(
f"📦 Final read processed {len(final_messages[0][1])} messages"
)

except Exception as e:
logger.debug(f"Final audio read error (non-fatal): {e}")
Expand Down Expand Up @@ -377,7 +403,11 @@ async def flush_pcm_buffer() -> bool:
if current_conversation_id and len(pcm_buffer) > 0:
# Flush final partial chunk
await flush_pcm_buffer()
duration = (time.time() - conversation_start_time) if conversation_start_time else 0
duration = (
(time.time() - conversation_start_time)
if conversation_start_time
else 0
)
logger.info(
f"✅ Conversation {current_conversation_id[:12]} ended: "
f"{chunk_index + 1} chunks, {duration:.1f}s"
Expand All @@ -399,7 +429,7 @@ async def flush_pcm_buffer() -> bool:
audio_consumer_name,
{audio_stream_name: ">"},
count=20, # Read up to 20 chunks at a time
block=100 # 100ms timeout
block=100, # 100ms timeout
)

if audio_messages:
Expand Down Expand Up @@ -429,13 +459,17 @@ async def flush_pcm_buffer() -> bool:
chunk_start_time += CHUNK_DURATION_SECONDS

# ACK the message
await redis_client.xack(audio_stream_name, audio_group_name, message_id)
await redis_client.xack(
audio_stream_name, audio_group_name, message_id
)

else:
# No new messages
if end_signal_received:
consecutive_empty_reads += 1
logger.info(f"📭 No new messages ({consecutive_empty_reads}/{max_empty_reads})")
logger.info(
f"📭 No new messages ({consecutive_empty_reads}/{max_empty_reads})"
)

if consecutive_empty_reads >= max_empty_reads:
logger.info(f"✅ Stream empty after END signal - stopping")
Expand All @@ -455,7 +489,9 @@ async def flush_pcm_buffer() -> bool:
# Calculate total duration
if total_pcm_bytes > 0:
duration = total_pcm_bytes / BYTES_PER_SECOND
compression_ratio = total_compressed_bytes / total_pcm_bytes if total_pcm_bytes > 0 else 0.0
compression_ratio = (
total_compressed_bytes / total_pcm_bytes if total_pcm_bytes > 0 else 0.0
)
else:
logger.warning(f"⚠️ No audio chunks written for session {session_id}")
duration = 0.0
Expand Down Expand Up @@ -486,7 +522,7 @@ async def flush_pcm_buffer() -> bool:
"total_compressed_bytes": total_compressed_bytes,
"compression_ratio": compression_ratio,
"duration_seconds": duration,
"runtime_seconds": runtime_seconds
"runtime_seconds": runtime_seconds,
}


Expand Down
12 changes: 12 additions & 0 deletions extras/havpe-relay/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# ESPHome build artifacts
firmware/.esphome/

# Firmware secrets (generated by init.py)
firmware/secrets.yaml

# Environment file (generated by init.py, contains credentials)
.env

# Audio recordings (debug mode)
audio_chunks/

# Python
__pycache__/
*.pyc
Comment on lines +1 to +12
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Missing .env from gitignore — secrets will be committed.

init.py generates a .env file containing AUTH_PASSWORD and other credentials. This file should be excluded from version control.

🐛 Proposed fix
 # Firmware secrets (generated by init.py)
 firmware/secrets.yaml
 
+# Environment file (generated by init.py, contains credentials)
+.env
+
 # Audio recordings (debug mode)
 audio_chunks/
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# ESPHome build artifacts
firmware/.esphome/
# Firmware secrets (generated by init.py)
firmware/secrets.yaml
# Audio recordings (debug mode)
audio_chunks/
# Python
__pycache__/
*.pyc
# ESPHome build artifacts
firmware/.esphome/
# Firmware secrets (generated by init.py)
firmware/secrets.yaml
# Environment file (generated by init.py, contains credentials)
.env
# Audio recordings (debug mode)
audio_chunks/
# Python
__pycache__/
*.pyc
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `extras/havpe-relay/.gitignore` around lines 1–12, add the generated `.env`
file to the ignore list so secrets produced by init.py (e.g., AUTH_PASSWORD and
other credentials) are not committed. Edit the .gitignore to include a line
containing ".env" (and consider adding related patterns such as ".env.local" or
"*.env" if appropriate) to ensure the generated environment file is excluded
from version control.

Loading
Loading