Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 32 additions & 21 deletions echo/frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -708,10 +708,12 @@ export const initiateAndUploadConversationChunk = async (payload: {
const fileQueue = [...Array(payload.chunks.length).keys()];
const inProgress = new Set<number>();

// Track conversation ID for finish hook
let conversationId: string | null = null;
// Each uploaded file creates its own conversation.
// We need to remember every conversation ID so we can call finishConversation()
// on each one after uploads complete. Without this, only the first conversation
// would enter the processing pipeline (transcription, merging, duration, etc.).
const conversationIdByFileIndex = new Map<number, string>();

// Process a single file
const processFile = async (i: number) => {
const chunk = payload.chunks[i];
let fileName = "";
Expand All @@ -726,7 +728,6 @@ export const initiateAndUploadConversationChunk = async (payload: {
const source = payload.source || "PORTAL_AUDIO";

try {
// Create the conversation first (one per file in this implementation)
const conversation = await initiateConversation({
email: payload.email,
name: `${payload.namePrefix} - ${fileName}`,
Expand All @@ -736,10 +737,7 @@ export const initiateAndUploadConversationChunk = async (payload: {
tagIdList: payload.tagIdList,
});

// Store conversation ID for finish hook
if (!conversationId) {
conversationId = conversation.id;
}
conversationIdByFileIndex.set(i, conversation.id);

// Upload using new presigned URL method
const uploadResult = await uploadConversationChunkWithPresignedUrl({
Expand Down Expand Up @@ -811,20 +809,33 @@ export const initiateAndUploadConversationChunk = async (payload: {
toast.success(`All ${payload.chunks.length} file(s) uploaded successfully`);
}

// IMPORTANT: Trigger finish hook after all uploads complete
// This triggers: audio merging, ETL pipeline, summarization
if (conversationId && failures.length === 0) {
console.log(
`[Upload] Triggering finish hook for conversation ${conversationId}`,
);
try {
await finishConversation(conversationId);
console.log("[Upload] Finish hook triggered successfully");
} catch (error) {
console.error("[Upload] Failed to trigger finish hook:", error);
// Don't throw - uploads succeeded, this is just post-processing
// Collect conversation IDs for files that uploaded successfully.
// Failed uploads are skipped — they have no conversation to finish.
const succeededConversationIds = results.reduce<string[]>((ids, result, i) => {
const isFailure = result && "error" in result;
const conversationId = conversationIdByFileIndex.get(i);
if (!isFailure && conversationId) {
ids.push(conversationId);
}
}
return ids;
}, []);

// Call finishConversation() for every successful upload, concurrently.
// This triggers the backend pipeline: transcription → merging → duration → summary.
// Each call is independent, so one failure won't block the others.
await Promise.all(
succeededConversationIds.map(async (conversationId) => {
try {
await finishConversation(conversationId);
console.log(`[Upload] Finish hook triggered for conversation ${conversationId}`);
} catch (error) {
console.error(
`[Upload] Failed to finish conversation ${conversationId}:`,
error,
);
}
}),
);

return results;
};
Expand Down
28 changes: 14 additions & 14 deletions echo/server/dembrane/api/conversation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from dembrane.service import project_service, conversation_service, build_conversation_service
from dembrane.directus import directus
from dembrane.audio_utils import (
get_duration_from_s3,
sanitize_filename_component,
merge_multiple_audio_files_and_save_to_s3,
)
Expand Down Expand Up @@ -291,20 +290,14 @@ async def get_conversation_content(
try:
uuid = generate_uuid()

merged_path = await run_in_thread_pool(
merged_path, duration = await run_in_thread_pool(
merge_multiple_audio_files_and_save_to_s3,
file_paths,
f"audio-conversations/merged-{sanitize_filename_component(conversation_id)}-{uuid}.mp3",
"mp3",
)

logger.debug(f"Successfully merged audio to: {merged_path}")

duration = -1.0
try:
duration = await run_in_thread_pool(get_duration_from_s3, merged_path)
except Exception as e:
logger.error(f"Error getting duration from s3: {str(e)}")
logger.debug(f"Successfully merged audio to: {merged_path}, duration: {duration}s")

await run_in_thread_pool(
active_client.update_item,
Expand Down Expand Up @@ -793,11 +786,18 @@ async def retranscribe_conversation(
# because return_url is True
assert isinstance(merged_audio_path, str)

duration = None
try:
duration = await run_in_thread_pool(get_duration_from_s3, merged_audio_path)
except Exception as e:
logger.error(f"Error getting duration from s3: {str(e)}")
# Duration was already computed and saved by get_conversation_content above
updated_conversation = await run_in_thread_pool(
active_client.get_items,
"conversation",
{
"query": {
"filter": {"id": {"_eq": conversation_id}},
"fields": ["duration"],
}
},
)
duration = updated_conversation[0].get("duration") if updated_conversation else None

# Create a new conversation
new_conversation_id = generate_uuid()
Expand Down
Loading
Loading