diff --git a/echo/frontend/src/lib/api.ts b/echo/frontend/src/lib/api.ts index e0e312b6..1ce02760 100644 --- a/echo/frontend/src/lib/api.ts +++ b/echo/frontend/src/lib/api.ts @@ -708,10 +708,12 @@ export const initiateAndUploadConversationChunk = async (payload: { const fileQueue = [...Array(payload.chunks.length).keys()]; const inProgress = new Set(); - // Track conversation ID for finish hook - let conversationId: string | null = null; + // Each uploaded file creates its own conversation. + // We need to remember every conversation ID so we can call finishConversation() + // on each one after uploads complete. Without this, only the first conversation + // would enter the processing pipeline (transcription, merging, duration, etc.). + const conversationIdByFileIndex = new Map(); - // Process a single file const processFile = async (i: number) => { const chunk = payload.chunks[i]; let fileName = ""; @@ -726,7 +728,6 @@ export const initiateAndUploadConversationChunk = async (payload: { const source = payload.source || "PORTAL_AUDIO"; try { - // Create the conversation first (one per file in this implementation) const conversation = await initiateConversation({ email: payload.email, name: `${payload.namePrefix} - ${fileName}`, @@ -736,10 +737,7 @@ export const initiateAndUploadConversationChunk = async (payload: { tagIdList: payload.tagIdList, }); - // Store conversation ID for finish hook - if (!conversationId) { - conversationId = conversation.id; - } + conversationIdByFileIndex.set(i, conversation.id); // Upload using new presigned URL method const uploadResult = await uploadConversationChunkWithPresignedUrl({ @@ -811,20 +809,33 @@ export const initiateAndUploadConversationChunk = async (payload: { toast.success(`All ${payload.chunks.length} file(s) uploaded successfully`); } - // IMPORTANT: Trigger finish hook after all uploads complete - // This triggers: audio merging, ETL pipeline, summarization - if (conversationId && failures.length === 0) { - console.log( - `[Upload] Triggering finish hook for conversation ${conversationId}`, - ); - try { - await finishConversation(conversationId); - console.log("[Upload] Finish hook triggered successfully"); - } catch (error) { - console.error("[Upload] Failed to trigger finish hook:", error); - // Don't throw - uploads succeeded, this is just post-processing + // Collect conversation IDs for files that uploaded successfully. + // Failed uploads are skipped — they have no conversation to finish. + const succeededConversationIds = results.reduce((ids, result, i) => { + const isFailure = result && "error" in result; + const conversationId = conversationIdByFileIndex.get(i); + if (!isFailure && conversationId) { + ids.push(conversationId); } - } + return ids; + }, []); + + // Call finishConversation() for every successful upload, concurrently. + // This triggers the backend pipeline: transcription → merging → duration → summary. + // Each call is independent, so one failure won't block the others. + await Promise.all( + succeededConversationIds.map(async (conversationId) => { + try { + await finishConversation(conversationId); + console.log(`[Upload] Finish hook triggered for conversation ${conversationId}`); + } catch (error) { + console.error( + `[Upload] Failed to finish conversation ${conversationId}:`, + error, + ); + } + }), + ); return results; }; diff --git a/echo/server/dembrane/api/conversation.py b/echo/server/dembrane/api/conversation.py index 7797a86e..8fec22bf 100644 --- a/echo/server/dembrane/api/conversation.py +++ b/echo/server/dembrane/api/conversation.py @@ -16,7 +16,6 @@ from dembrane.service import project_service, conversation_service, build_conversation_service from dembrane.directus import directus from dembrane.audio_utils import ( - get_duration_from_s3, sanitize_filename_component, merge_multiple_audio_files_and_save_to_s3, ) @@ -291,20 +290,14 @@ async def get_conversation_content( try: uuid = generate_uuid() - merged_path = await run_in_thread_pool( + merged_path, duration = await run_in_thread_pool( merge_multiple_audio_files_and_save_to_s3, file_paths, f"audio-conversations/merged-{sanitize_filename_component(conversation_id)}-{uuid}.mp3", "mp3", ) - logger.debug(f"Successfully merged audio to: {merged_path}") - - duration = -1.0 - try: - duration = await run_in_thread_pool(get_duration_from_s3, merged_path) - except Exception as e: - logger.error(f"Error getting duration from s3: {str(e)}") + logger.debug(f"Successfully merged audio to: {merged_path}, duration: {duration}s") await run_in_thread_pool( active_client.update_item, @@ -793,11 +786,18 @@ async def retranscribe_conversation( # because return_url is True assert isinstance(merged_audio_path, str) - duration = None - try: - duration = await run_in_thread_pool(get_duration_from_s3, merged_audio_path) - except Exception as e: - logger.error(f"Error getting duration from s3: {str(e)}") + # Duration was already computed and saved by get_conversation_content above + updated_conversation = await run_in_thread_pool( + active_client.get_items, + "conversation", + { + "query": { + "filter": {"id": {"_eq": conversation_id}}, + "fields": ["duration"], + } + }, + ) + duration = updated_conversation[0].get("duration") if updated_conversation else None # Create a new conversation new_conversation_id = generate_uuid() diff --git a/echo/server/dembrane/audio_utils.py b/echo/server/dembrane/audio_utils.py index 5b7e17fb..e590aca8 100644 --- a/echo/server/dembrane/audio_utils.py +++ b/echo/server/dembrane/audio_utils.py @@ -176,17 +176,23 @@ def convert_and_save_to_s3( if b"ftypM4A" in input_data[:50] or b"moov" in input_data[:200]: logger.info("Detected possible Apple Voice Memo signature") - # Process through ffmpeg - with tempfile.NamedTemporaryFile(suffix=f".{file_format}") as input_temp_file: - input_temp_file.write(input_data) - input_temp_file.flush() + # Write to a temp file (not pipe) so ffmpeg can seek back to write + # proper VBR headers (e.g. Xing for MP3), which are required for + # accurate duration metadata. + with tempfile.TemporaryDirectory() as tmpdir: + input_path = os.path.join(tmpdir, f"input.{file_format}") + output_path = os.path.join(tmpdir, f"output.{output_format}") + + with open(input_path, "wb") as f: + f.write(input_data) + if output_format == "ogg": if file_format.lower() in ["m4a", "mp4"]: logger.debug("Special handling for M4A files") process = ( - ffmpeg.input(input_temp_file.name, f=file_format) + ffmpeg.input(input_path, f=file_format) .output( - "pipe:1", + output_path, f="ogg", acodec="libvorbis", q="5", @@ -201,26 +207,25 @@ def convert_and_save_to_s3( "ignore_err", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: process = ( - ffmpeg.input(input_temp_file.name, f=file_format) - .output("pipe:1", f="ogg", acodec="libvorbis", q="5") + ffmpeg.input(input_path, f=file_format) + .output(output_path, f="ogg", acodec="libvorbis", q="5") .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) elif output_format == "mp3": process = ( - ffmpeg.input(input_temp_file.name, f=file_format) + ffmpeg.input(input_path, f=file_format) .output( - "pipe:1", + output_path, f="mp3", acodec="libmp3lame", q="5", strict="-2", - preset="veryfast", ) .global_args( "-hide_banner", @@ -230,33 +235,38 @@ def convert_and_save_to_s3( "ignore_err", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: raise ValueError(f"Not implemented for file format: {output_format}") - output, err = process.communicate(input=None) + _, err = process.communicate() + + # Log the stderr output for debugging + err_text = err.decode() if err else "" + if err_text: + logger.debug(f"FFmpeg stderr: {err_text}") + + if process.returncode != 0: + error_message = err_text or "Unknown FFmpeg error" + if "No such file or directory" in error_message: + raise FFmpegError(f"Input file not found: {input_file_name}") + elif "Invalid data found when processing input" in error_message: + raise FFmpegError("Invalid or corrupted input file") + elif "Memory allocation error" in error_message: + raise FFmpegError( + f"Memory allocation failed - file too large. " + f"Required memory: {estimated_memory_mb:.1f}MB" + ) + else: + raise FFmpegError(f"FFmpeg processing failed: {error_message}") - # Log the stderr output for debugging - err_text = err.decode() if err else "" - if err_text: - logger.debug(f"FFmpeg stderr: {err_text}") + if not os.path.exists(output_path): + raise ConversionError("FFmpeg produced no output file") - if process.returncode != 0: - error_message = err_text or "Unknown FFmpeg error" - if "No such file or directory" in error_message: - raise FFmpegError(f"Input file not found: {input_file_name}") - elif "Invalid data found when processing input" in error_message: - raise FFmpegError("Invalid or corrupted input file") - elif "Memory allocation error" in error_message: - raise FFmpegError( - f"Memory allocation failed - file too large. " - f"Required memory: {estimated_memory_mb:.1f}MB" - ) - else: - raise FFmpegError(f"FFmpeg processing failed: {error_message}") + with open(output_path, "rb") as f: + output = f.read() - # Verify we got valid output if not output: raise ConversionError("FFmpeg produced empty output") @@ -296,7 +306,7 @@ def merge_multiple_audio_files_and_save_to_s3( input_file_names: List[str], output_file_name: str, output_format: str, -) -> str: +) -> tuple[str, float]: """Merge multiple audio files and save the result back to S3. Args: @@ -305,7 +315,7 @@ def merge_multiple_audio_files_and_save_to_s3( output_format: Format to convert to Returns: - str: Public URL of the processed file + tuple of (public_url, duration_seconds). Duration is -1.0 if probing fails. Raises: FFmpegError: For FFmpeg-specific errors @@ -361,67 +371,86 @@ def merge_multiple_audio_files_and_save_to_s3( if not processed_data_streams: raise ValueError("No processed data streams") - with tempfile.NamedTemporaryFile(suffix=f".{output_format}") as temp_file: - for data_stream in processed_data_streams: - temp_file.write(data_stream.read()) + with tempfile.TemporaryDirectory() as tmpdir: + chunk_paths = [] + for i, data_stream in enumerate(processed_data_streams): + chunk_path = os.path.join(tmpdir, f"chunk_{i}.{output_format}") + with open(chunk_path, "wb") as f: + f.write(data_stream.read()) + chunk_paths.append(chunk_path) - temp_file.flush() + concat_list_path = os.path.join(tmpdir, "concat_list.txt") + with open(concat_list_path, "w") as f: + for p in chunk_paths: + f.write(f"file '{p}'\n") + + merged_path = os.path.join(tmpdir, f"merged.{output_format}") if output_format == "ogg": - # Final processing to ensure consistent output process = ( - ffmpeg.input(temp_file.name, format=output_format) - .output("pipe:1", f="ogg", acodec="libvorbis", q="5") + ffmpeg.input(concat_list_path, f="concat", safe=0) + .output(merged_path, f="ogg", acodec="libvorbis", q="5") .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) elif output_format == "mp3": process = ( - ffmpeg.input(temp_file.name, format=output_format) + ffmpeg.input(concat_list_path, f="concat", safe=0) .output( - "pipe:1", + merged_path, f="mp3", acodec="libmp3lame", q="5", - preset="veryfast", ) .global_args("-hide_banner", "-loglevel", "warning") .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) else: raise ValueError(f"Not implemented for file format: {output_format}") - output, err = process.communicate(input=None) + _, err = process.communicate() - if process.returncode != 0: - error_message = err.decode() if err else "Unknown FFmpeg error" - raise FFmpegError(f"FFmpeg final processing failed: {error_message}") + if process.returncode != 0: + error_message = err.decode() if err else "Unknown FFmpeg error" + raise FFmpegError(f"FFmpeg final processing failed: {error_message}") - # Save to S3 - logger.info(f"Saving merged audio to S3 as {output_file_name}") - s3_client.put_object( - Bucket=STORAGE_S3_BUCKET, - Key=get_sanitized_s3_key(output_file_name), - Body=output, - ACL="private", - ) + # Probe duration from the local temp file before cleanup (no S3 round-trip) + audio_duration = -1.0 + try: + probe_data = probe_from_file(merged_path) + if "format" in probe_data and "duration" in probe_data["format"]: + audio_duration = float(probe_data["format"]["duration"]) + else: + logger.error("Duration not found in ffprobe output for merged file") + except Exception as e: + logger.error(f"Error probing duration from local merged file: {str(e)}") + + # Stream-upload to S3 from disk (never loads full file into memory) + s3_key = get_sanitized_s3_key(output_file_name) + logger.info(f"Saving merged audio to S3 as {output_file_name}") + s3_client.upload_file( + merged_path, + STORAGE_S3_BUCKET, + s3_key, + ExtraArgs={"ACL": "private"}, + ) info = s3_client.head_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(output_file_name) ) logger.debug(f"Head object from S3: {info}") - duration = time.time() - start_time + elapsed = time.time() - start_time logger.info( - f"Completed merging {len(input_file_names)} files in {duration:.2f}s. " - f"Total input size: {total_size_mb:.1f}MB" + f"Completed merging {len(input_file_names)} files in {elapsed:.2f}s. " + f"Total input size: {total_size_mb:.1f}MB, duration: {audio_duration:.1f}s" ) public_url = f"{STORAGE_S3_ENDPOINT}/{STORAGE_S3_BUCKET}/{output_file_name}" - return public_url + return public_url, audio_duration def probe_from_bytes(file_bytes: bytes, input_format: str) -> dict: @@ -559,6 +588,39 @@ def probe_from_bytes(file_bytes: bytes, input_format: str) -> dict: logger.warning(f"Failed to delete temporary file {temp_file_path}: {e}") +def probe_from_file(file_path: str) -> dict: + """Run ffprobe directly on a local file path. Avoids loading the file into memory.""" + if not os.path.exists(file_path): + raise ValueError(f"File not found: {file_path}") + + cmd = [ + "ffprobe", + "-hide_banner", + "-loglevel", + "warning", + "-print_format", + "json", + "-show_format", + "-show_streams", + file_path, + ] + + process = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + stderr_output = process.stderr.decode().strip() + if stderr_output: + logger.debug(f"ffprobe stderr: {stderr_output}") + + if process.returncode != 0: + raise ValueError(f"ffprobe failed on {file_path}: {stderr_output or 'Unknown error'}") + + output = process.stdout.decode() + if not output: + raise ValueError("ffprobe returned empty output") + + return json.loads(output) + + def probe_from_s3(file_name: str, input_format: str) -> dict: return probe_from_bytes(get_stream_from_s3(file_name).read(), input_format) @@ -655,9 +717,10 @@ def split_audio_chunk( s3_keys_created = [] # Track S3 keys for cleanup on failure try: - with tempfile.NamedTemporaryFile(suffix=f".{output_format}") as temp_file: - temp_file.write(get_stream_from_s3(updated_chunk_path).read()) - temp_file.flush() + with tempfile.TemporaryDirectory() as tmpdir: + source_path = os.path.join(tmpdir, f"source.{output_format}") + with open(source_path, "wb") as f: + f.write(get_stream_from_s3(updated_chunk_path).read()) # Phase 1: Split and upload all chunks to S3 for i in range(number_chunks): @@ -670,24 +733,27 @@ def split_audio_chunk( ) logger.debug(f"Extracting chunk {i + 1}/{number_chunks} starting at {start_time}s") + split_out_path = os.path.join(tmpdir, f"split_{i}.{output_format}") process = ( - ffmpeg.input(temp_file.name) + ffmpeg.input(source_path) .output( - "pipe:1", + split_out_path, ss=start_time, t=chunk_duration, f=output_format, - preset="veryfast", ) .overwrite_output() - .run_async(pipe_stdin=True, pipe_stdout=True, pipe_stderr=True) + .run_async(pipe_stdout=True, pipe_stderr=True) ) - chunk_output, err = process.communicate(input=None) + _, err = process.communicate() if process.returncode != 0: raise FFmpegError(f"ffmpeg splitting failed: {err.decode().strip()}") + with open(split_out_path, "rb") as f: + chunk_output = f.read() + s3_client.put_object( Bucket=STORAGE_S3_BUCKET, Key=s3_chunk_path, diff --git a/echo/server/tests/test_audio_utils.py b/echo/server/tests/test_audio_utils.py index 9989cdd8..2875fda5 100644 --- a/echo/server/tests/test_audio_utils.py +++ b/echo/server/tests/test_audio_utils.py @@ -67,13 +67,13 @@ def test_convert_and_save_to_s3(file_name, output_format): output_url = convert_and_save_to_s3(input_file_key, output_file_key, output_format) assert output_url is not None, "output_url is None" - assert output_url.startswith( - STORAGE_S3_ENDPOINT - ), "output_url does not start with STORAGE_S3_ENDPOINT" + assert output_url.startswith(STORAGE_S3_ENDPOINT), ( + "output_url does not start with STORAGE_S3_ENDPOINT" + ) assert output_url.startswith("http"), "output_url does not start with http" - assert output_url.endswith( - f".{output_format}" - ), f"output_url does not end with .{output_format}" + assert output_url.endswith(f".{output_format}"), ( + f"output_url does not end with .{output_format}" + ) # Verify the file exists and has content head_response = s3_client.head_object( @@ -135,17 +135,18 @@ def test_merge_multiple_audio_files_and_save_to_s3(output_format): # merge files merged_file_key = "tests/" + generate_uuid() + "." + output_format - merged_file_url = merge_multiple_audio_files_and_save_to_s3( + merged_file_url, duration = merge_multiple_audio_files_and_save_to_s3( new_file_names, merged_file_key, output_format ) assert merged_file_url is not None, "merged_file_url is None" - assert merged_file_url.startswith( - STORAGE_S3_ENDPOINT - ), "merged_file_url does not start with STORAGE_S3_ENDPOINT" - assert merged_file_url.endswith( - f".{output_format}" - ), f"merged_file_url does not end with .{output_format}" + assert merged_file_url.startswith(STORAGE_S3_ENDPOINT), ( + "merged_file_url does not start with STORAGE_S3_ENDPOINT" + ) + assert merged_file_url.endswith(f".{output_format}"), ( + f"merged_file_url does not end with .{output_format}" + ) + assert duration > 0, f"Duration should be positive, got {duration}" response = s3_client.get_object( Bucket=STORAGE_S3_BUCKET, Key=get_sanitized_s3_key(merged_file_key) @@ -245,12 +246,12 @@ def test_split_audio_chunk(file_name, output_format): split_chunk, ) assert item is not None, f"Failed to get split chunk {item['id']}" - assert item["path"].startswith( - "http" - ), f"Split chunk {item['path']} does not start with http" - assert item["path"].startswith( - STORAGE_S3_ENDPOINT - ), f"Split chunk {item['path']} does not start with STORAGE_S3_ENDPOINT" + assert item["path"].startswith("http"), ( + f"Split chunk {item['path']} does not start with http" + ) + assert item["path"].startswith(STORAGE_S3_ENDPOINT), ( + f"Split chunk {item['path']} does not start with STORAGE_S3_ENDPOINT" + ) data = s3_client.get_object( Bucket=STORAGE_S3_BUCKET, @@ -263,12 +264,12 @@ def test_split_audio_chunk(file_name, output_format): assert probe is not None, f"Failed to probe split chunk {item['path']}" assert probe["streams"] is not None, f"Probe result for {item['path']} has no streams" assert len(probe["streams"]) > 0, f"Probe result for {item['path']} has no streams" - assert ( - probe["streams"][0]["codec_type"] == "audio" - ), f"Probe result for {item['path']} is not an audio stream" - assert ( - float(probe["streams"][0]["duration"]) > 0 - ), f"Probe result for {item['path']} has no duration" + assert probe["streams"][0]["codec_type"] == "audio", ( + f"Probe result for {item['path']} is not an audio stream" + ) + assert float(probe["streams"][0]["duration"]) > 0, ( + f"Probe result for {item['path']} has no duration" + ) # delete the conversation chunk directus.delete_item("conversation_chunk", chunk_id) @@ -326,14 +327,14 @@ def test_probe_from_bytes(file_name: str): assert probe_result is not None, f"Failed to probe {file_name}" assert "streams" in probe_result, f"No streams in probe result for {file_name}" assert len(probe_result["streams"]) > 0, f"No streams in probe result for {file_name}" - assert ( - probe_result["streams"][0]["codec_type"] == "audio" - ), f"Not an audio stream for {file_name}" + assert probe_result["streams"][0]["codec_type"] == "audio", ( + f"Not an audio stream for {file_name}" + ) assert "format" in probe_result, f"No format information in probe result for {file_name}" assert "duration" in probe_result["format"], f"No duration in probe result for {file_name}" - assert ( - float(probe_result["format"]["duration"]) > 0 - ), f"Duration not positive for {file_name}" + assert float(probe_result["format"]["duration"]) > 0, ( + f"Duration not positive for {file_name}" + ) logger.info(f"Successfully probed {file_name}") @@ -367,14 +368,14 @@ def test_probe_from_s3(file_name: str): assert probe_result is not None, f"Failed to probe {s3_key}" assert "streams" in probe_result, f"No streams in probe result for {s3_key}" assert len(probe_result["streams"]) > 0, f"No streams in probe result for {s3_key}" - assert ( - probe_result["streams"][0]["codec_type"] == "audio" - ), f"Not an audio stream for {s3_key}" + assert probe_result["streams"][0]["codec_type"] == "audio", ( + f"Not an audio stream for {s3_key}" + ) assert "format" in probe_result, f"No format information in probe result for {s3_key}" assert "duration" in probe_result["format"], f"No duration in probe result for {s3_key}" - assert ( - float(probe_result["format"]["duration"]) > 0 - ), f"Duration not positive for {s3_key}" + assert float(probe_result["format"]["duration"]) > 0, ( + f"Duration not positive for {s3_key}" + ) logger.info(f"Successfully probed {s3_key} from S3") @@ -452,17 +453,18 @@ def test_merge_specific_format_pairs(file_formats, output_format): merged_file_key = ( f"tests/{generate_uuid()}-{format1}_{format2}_to_{output_format}.{output_format}" ) - merged_file_url = merge_multiple_audio_files_and_save_to_s3( + merged_file_url, duration = merge_multiple_audio_files_and_save_to_s3( new_file_names, merged_file_key, output_format ) assert merged_file_url is not None, "merged_file_url is None" - assert merged_file_url.startswith( - STORAGE_S3_ENDPOINT - ), "merged_file_url does not start with STORAGE_S3_ENDPOINT" - assert merged_file_url.endswith( - f".{output_format}" - ), f"merged_file_url does not end with .{output_format}" + assert merged_file_url.startswith(STORAGE_S3_ENDPOINT), ( + "merged_file_url does not start with STORAGE_S3_ENDPOINT" + ) + assert merged_file_url.endswith(f".{output_format}"), ( + f"merged_file_url does not end with .{output_format}" + ) + assert duration > 0, f"Duration should be positive, got {duration}" # Verify the merged file response = s3_client.get_object(