diff --git a/README.md b/README.md index 126bebd..16b9e61 100644 --- a/README.md +++ b/README.md @@ -103,7 +103,7 @@ PGVECTOR_DATABASE=satellite If `PGVECTOR_*` environment variables are set, `POST /api/get_transcription` can persist the raw transcription to Postgres when the request includes `persist=true` and a valid `uniqueid`. The database schema is created automatically on first use and includes: -- `transcripts`: stores `uniqueid`, diarized raw transcription (Deepgram paragraphs transcript), `state`, optional cleaned transcription + summary, and `sentiment` (0-10) +- `transcripts`: stores `uniqueid`, optional `linkedid`, and optional `src_number` and `dst_number` participant numbers, diarized raw transcription (Deepgram paragraphs transcript), `state`, optional cleaned transcription + summary, and `sentiment` (0-10). `uniqueid` is indexed but not unique, so transferred calls can persist multiple fragments under the same Asterisk call identifier while Satellite tracks each stored fragment by its internal `id`. 
- `transcript_chunks`: table for storing chunked `text-embedding-3-small` embeddings in a `vector(1536)` column for similarity search `transcripts.state` is DB-only and represents the processing lifecycle: @@ -136,6 +136,8 @@ Optional fields (query string or multipart form fields): - `uniqueid`: Asterisk-style uniqueid like `1234567890.1234` (required only when `persist=true`) - `persist`: `true|false` (default `false`) — persist raw transcript to Postgres (requires `PGVECTOR_*` env vars) - `summary`: `true|false` (default `false`) — run AI enrichment (requires `OPENAI_API_KEY` and also `persist=true` so there is a DB record to update) +- `linkedid`: optional linked Asterisk call id stored with the persisted transcript row when `persist=true` +- `src_number`, `dst_number`: optional participant numbers stored with the persisted transcript row when `persist=true` - `channel0_name`, `channel1_name`: rename diarization labels in the returned transcript (replaces `Channel 0:` / `Channel 1:`) Deepgram parameters: @@ -156,10 +158,12 @@ Authentication: - If `API_TOKEN` is unset/empty, auth is disabled (backwards compatible default). If `persist=true` and `PGVECTOR_*` is configured, the raw transcription is saved to Postgres. +Each persisted request creates or updates its own transcript row by internal `id`; repeated `uniqueid` values are allowed for multi-fragment call recordings. If `summary=true` and `OPENAI_API_KEY` is set, the service also generates a cleaned transcription, summary, and sentiment score (0-10) via a per-request subprocess worker (`call_processor.py`) and stores them in Postgres. If `OPENAI_API_KEY` is missing (or `persist=false`), clean/summary/sentiment are skipped. When `persist=true`, `POST /api/get_transcription` updates `transcripts.state` as it runs: `progress` → (`summarizing` →) `done`, or `failed` on errors. 
+If Deepgram returns `results.channels: []` for zero-duration audio with no transcript, Satellite returns `200` with an empty transcript and, when persistence is enabled, marks the row as `done` so the caller can discard the source audio instead of retrying forever; an empty channels list with a non-zero duration or a non-empty transcript is treated as an invalid Deepgram response (`502`, row marked `failed`). #### `POST /api/get_speech` diff --git a/api.py b/api.py index 7ae0732..41a74eb 100644 --- a/api.py +++ b/api.py @@ -408,7 +408,9 @@ async def get_transcription( logger.debug(f"Params: {input_params}") uniqueid = (input_params.get("uniqueid") or "").strip() - linkedid = (input_params.get("linkedid") or "").strip() + linkedid = (input_params.get("linkedid") or "").strip() or None + src_number = (input_params.get("src_number") or "").strip() or None + dst_number = (input_params.get("dst_number") or "").strip() or None channel0_name = (input_params.get("channel0_name") or "").strip() channel1_name = (input_params.get("channel1_name") or "").strip() # Persist only when explicitly requested. @@ -424,12 +426,14 @@ async def get_transcription( transcript_id = None if db.is_configured() and persist: - # Create/mark a DB row immediately so we can track state even if Deepgram fails. + # Create a DB row immediately so we can track state even if Deepgram fails. try: transcript_id = await run_in_threadpool( db.upsert_transcript_progress, uniqueid=uniqueid, linkedid=linkedid, + src_number=src_number, + dst_number=dst_number, ) except Exception: logger.exception("Failed to initialize transcript row for state tracking") @@ -553,29 +557,41 @@ async def get_transcription( result = response.json() detected_language = None # always define; mocks may omit this field - - # Deepgram returned an explicitly empty channels list — audio was silent or - # zero-duration (e.g. a very short or dropped call). Treat this as "nothing to - # transcribe": mark the row done so the cleanup service does not retry it, and return - # 200 so the bash script deletes the WAV files.
A 500 here would leave the WAV files - # on disk and trigger infinite retries. - # NOTE: we only handle the case where channels is present but empty ([]); if the key is - # absent entirely the response is genuinely malformed and falls through to the error path. - _channels = result.get("results", {}).get("channels") - if _channels is not None and not _channels: - duration = result.get("metadata", {}).get("duration", 0.0) - logger.warning( - "Deepgram returned no channels (duration=%.1f, uniqueid=%s) - audio is silent or empty; skipping", + results = result.get("results", {}) + metadata = result.get("metadata", {}) + channels = results.get("channels") + if channels is not None and not channels: + duration = metadata.get("duration", 0.0) + paragraphs = results.get("paragraphs") + paragraphs_transcript = paragraphs.get("transcript") if isinstance(paragraphs, dict) else None + if duration == 0.0 and ( + paragraphs_transcript is None + or (isinstance(paragraphs_transcript, str) and paragraphs_transcript.strip() == "") + ): + logger.warning( + "Deepgram returned explicit empty transcription (duration=%.1f, uniqueid=%s); skipping empty audio", + duration, + uniqueid, + ) + if transcript_id is not None: + try: + await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="done") + except Exception: + logger.exception("Failed to update transcript state=done after empty audio") + return {"transcript": "", "detected_language": None} + + logger.error( + "Deepgram returned empty channels without explicit empty-audio indicators " + "(duration=%s, uniqueid=%s); treating response as invalid", duration, uniqueid, ) if transcript_id is not None: try: - await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="done") + await run_in_threadpool(db.set_transcript_state, transcript_id=transcript_id, state="failed") except Exception: - logger.exception("Failed to update transcript state=done after silent audio") - return {"transcript": 
"", "detected_language": None} - + logger.exception("Failed to update transcript state=failed after invalid empty channels response") + raise HTTPException(status_code=502, detail="Invalid Deepgram response") try: if "paragraphs" in result["results"] and "transcript" in result["results"]["paragraphs"]: raw_transcription = result["results"]["paragraphs"]["transcript"].strip() @@ -617,8 +633,11 @@ async def get_transcription( try: transcript_id = await run_in_threadpool( db.upsert_transcript_raw, + transcript_id=transcript_id, uniqueid=uniqueid, linkedid=linkedid, + src_number=src_number, + dst_number=dst_number, raw_transcription=raw_transcription, ) except ValueError as e: diff --git a/db.py b/db.py index f5263eb..f27d8c7 100644 --- a/db.py +++ b/db.py @@ -93,8 +93,10 @@ def _ensure_schema() -> None: """ CREATE TABLE IF NOT EXISTS transcripts ( id BIGSERIAL PRIMARY KEY, - uniqueid TEXT NOT NULL UNIQUE, - linkedid TEXT NULL, + uniqueid TEXT NOT NULL, + linkedid TEXT, + src_number TEXT, + dst_number TEXT, raw_transcription TEXT NOT NULL, state TEXT NOT NULL DEFAULT 'done', cleaned_transcription TEXT, @@ -108,6 +110,17 @@ def _ensure_schema() -> None: """ ) + conn.execute( + "ALTER TABLE transcripts DROP CONSTRAINT IF EXISTS transcripts_uniqueid_key" + ) + + # Older deployments may still have a standalone unique index. + conn.execute("DROP INDEX IF EXISTS transcripts_uniqueid_key") + + conn.execute( + "CREATE INDEX IF NOT EXISTS transcripts_uniqueid_idx ON transcripts (uniqueid)" + ) + conn.execute( f""" CREATE TABLE IF NOT EXISTS transcript_chunks ( @@ -129,27 +142,22 @@ def _ensure_schema() -> None: # Commit the core schema changes explicitly for clarity. conn.commit() - # Idempotent migration: ensure `linkedid TEXT NULL` exists on the transcripts table. - # Three cases for existing databases: - # 1. Column missing entirely (DB predates this column) → ADD COLUMN. - # 2. Column exists as NOT NULL (old migration that required it) → DROP NOT NULL. - # 3. 
Column exists as nullable → nothing to do. - linkedid_row = conn.execute( - """ - SELECT is_nullable - FROM information_schema.columns - WHERE table_schema = 'public' - AND table_name = 'transcripts' - AND column_name = 'linkedid' - """ - ).fetchone() - if linkedid_row is None: - conn.execute("ALTER TABLE transcripts ADD COLUMN linkedid TEXT NULL") - conn.commit() - elif linkedid_row[0] == "NO": - conn.execute("ALTER TABLE transcripts ALTER COLUMN linkedid DROP NOT NULL") - conn.commit() - + # Idempotent migration: add participant tracking columns for + # existing databases created before these fields existed. + for col in ("linkedid", "src_number", "dst_number"): + col_exists = conn.execute( + """ + SELECT 1 + FROM information_schema.columns + WHERE table_schema = 'public' + AND table_name = 'transcripts' + AND column_name = %s + """, + (col,), + ).fetchone() + if col_exists is None: + conn.execute(f"ALTER TABLE transcripts ADD COLUMN {col} TEXT NULL") + conn.commit() # "Modern" pgvector index: HNSW (if supported by server pgvector version) try: # Run this in its own transaction so a failure doesn't leave the @@ -184,11 +192,17 @@ def validate_transcript_state(state: str) -> None: raise ValueError(f"Invalid transcript state {state!r}; expected one of {', '.join(TRANSCRIPT_STATES)}") -def upsert_transcript_progress(*, uniqueid: str, linkedid: Optional[str] = None) -> int: - """Ensure a transcript row exists and mark it as 'progress'. Returns transcript id. +def upsert_transcript_progress( + *, + uniqueid: str, + linkedid: Optional[str] = None, + src_number: Optional[str] = None, + dst_number: Optional[str] = None, +) -> int: + """Create a transcript row in 'progress' state. Returns transcript id. This is used to represent a requested transcription before the Deepgram request - completes. + completes, even when multiple transcript rows share the same uniqueid. 
""" validate_uniqueid(uniqueid) @@ -197,16 +211,11 @@ def upsert_transcript_progress(*, uniqueid: str, linkedid: Optional[str] = None) with _connect() as conn: row = conn.execute( """ - INSERT INTO transcripts (uniqueid, linkedid, raw_transcription, state) - VALUES (%s, %s, %s, 'progress') - ON CONFLICT (uniqueid) - DO UPDATE SET - linkedid = COALESCE(EXCLUDED.linkedid, transcripts.linkedid), - state = 'progress', - updated_at = now() + INSERT INTO transcripts (uniqueid, linkedid, src_number, dst_number, raw_transcription, state) + VALUES (%s, %s, %s, %s, %s, 'progress') RETURNING id """, - (uniqueid, linkedid, ""), + (uniqueid, linkedid, src_number, dst_number, ""), ).fetchone() if row is None: @@ -238,40 +247,61 @@ def set_transcript_state_by_uniqueid(*, uniqueid: str, state: str) -> None: with _connect() as conn: conn.execute( """ + WITH latest_transcript AS ( + SELECT id + FROM transcripts + WHERE uniqueid = %s + ORDER BY id DESC + LIMIT 1 + ) UPDATE transcripts SET state = %s, updated_at = now() - WHERE uniqueid = %s + WHERE id IN (SELECT id FROM latest_transcript) """, - (state, uniqueid), + (uniqueid, state), ) def upsert_transcript_raw( *, + transcript_id: Optional[int] = None, uniqueid: str, linkedid: Optional[str] = None, + src_number: Optional[str] = None, + dst_number: Optional[str] = None, raw_transcription: str, ) -> int: - """Insert or update the raw transcript row and return its transcript id.""" + """Persist the raw transcript row and return its transcript id.""" validate_uniqueid(uniqueid) _ensure_schema() with _connect() as conn: - row = conn.execute( - """ - INSERT INTO transcripts (uniqueid, linkedid, raw_transcription) - VALUES (%s, %s, %s) - ON CONFLICT (uniqueid) - DO UPDATE SET - linkedid = COALESCE(EXCLUDED.linkedid, transcripts.linkedid), - raw_transcription = EXCLUDED.raw_transcription, - updated_at = now() - RETURNING id - """, - (uniqueid, linkedid, raw_transcription), - ).fetchone() + if transcript_id is None: + row = conn.execute( + 
""" + INSERT INTO transcripts (uniqueid, linkedid, src_number, dst_number, raw_transcription) + VALUES (%s, %s, %s, %s, %s) + RETURNING id + """, + (uniqueid, linkedid, src_number, dst_number, raw_transcription), + ).fetchone() + else: + row = conn.execute( + """ + UPDATE transcripts + SET linkedid = COALESCE(%s, linkedid), + src_number = COALESCE(%s, src_number), + dst_number = COALESCE(%s, dst_number), + raw_transcription = %s, + updated_at = now() + WHERE id = %s + AND uniqueid = %s + RETURNING id + """, + (linkedid, src_number, dst_number, raw_transcription, transcript_id, uniqueid), + ).fetchone() if row is None: raise RuntimeError("Failed to upsert transcript") diff --git a/tests/test_api.py b/tests/test_api.py index 8c0b0f7..9dd8f95 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -200,18 +200,24 @@ async def fake_run_in_threadpool(func, *args, **kwargs): assert response.status_code == 200 - progress_mock.assert_called_once_with(uniqueid="1234567890.1234", linkedid="") + progress_mock.assert_called_once_with( + uniqueid="1234567890.1234", + linkedid=None, + src_number=None, + dst_number=None, + ) upsert_mock.assert_called_once_with( + transcript_id=123, uniqueid="1234567890.1234", - linkedid="", + linkedid=None, + src_number=None, + dst_number=None, raw_transcription="SPEAKER 1: Hello world", ) state_mock.assert_any_call(transcript_id=123, state="done") @patch('httpx.AsyncClient') def test_persists_linkedid_when_provided(self, mock_client_class, client, valid_wav_content): - """Ensure linkedid from the request is forwarded to the DB persistence layer.""" - mock_response = Mock() mock_response.json.return_value = { "results": { @@ -258,10 +264,77 @@ async def fake_run_in_threadpool(func, *args, **kwargs): progress_mock.assert_called_once_with( uniqueid="1234567890.1234", linkedid="1234567890.1000", + src_number=None, + dst_number=None, ) upsert_mock.assert_called_once_with( + transcript_id=123, uniqueid="1234567890.1234", linkedid="1234567890.1000", 
+ src_number=None, + dst_number=None, + raw_transcription="SPEAKER 1: Hello world", + ) + state_mock.assert_any_call(transcript_id=123, state="done") + + @patch('httpx.AsyncClient') + def test_persists_src_and_dst_numbers_when_provided(self, mock_client_class, client, valid_wav_content): + mock_response = Mock() + mock_response.json.return_value = { + "results": { + "paragraphs": {"transcript": "SPEAKER 1: Hello world"}, + "channels": [ + { + "alternatives": [ + {"transcript": "Hello world"} + ] + } + ] + } + } + mock_response.raise_for_status = Mock() + + mock_client = AsyncMock() + mock_client.post = AsyncMock(return_value=mock_response) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock() + mock_client_class.return_value = mock_client + + async def fake_run_in_threadpool(func, *args, **kwargs): + return func(*args, **kwargs) + + with patch.dict(os.environ, {"OPENAI_API_KEY": ""}), \ + patch("api.db.is_configured", return_value=True), \ + patch("api.db.upsert_transcript_progress", return_value=123) as progress_mock, \ + patch("api.db.upsert_transcript_raw", return_value=123) as upsert_mock, \ + patch("api.db.set_transcript_state") as state_mock, \ + patch("api.run_in_threadpool", new=fake_run_in_threadpool): + response = client.post( + "/api/get_transcription", + files={"file": ("test.wav", valid_wav_content, "audio/wav")}, + data={ + "uniqueid": "1234567890.1234", + "src_number": "+390111111111", + "dst_number": "+390222222222", + "persist": "true", + "multichannel": "true", + }, + ) + + assert response.status_code == 200 + + progress_mock.assert_called_once_with( + uniqueid="1234567890.1234", + linkedid=None, + src_number="+390111111111", + dst_number="+390222222222", + ) + upsert_mock.assert_called_once_with( + transcript_id=123, + uniqueid="1234567890.1234", + linkedid=None, + src_number="+390111111111", + dst_number="+390222222222", raw_transcription="SPEAKER 1: Hello world", ) 
state_mock.assert_any_call(transcript_id=123, state="done") @@ -349,8 +422,7 @@ def test_malformed_deepgram_response(self, mock_client_class, client, valid_wav_ assert "Failed to parse transcription response" in response.json()["detail"] @patch('httpx.AsyncClient') - def test_silent_audio_empty_channels_returns_200_with_empty_transcript(self, mock_client_class, client, valid_wav_content): - """Deepgram returns channels:[] (silent/zero-duration audio) → 200 with empty transcript, no retry storm.""" + def test_silent_audio_marks_persisted_transcript_done_and_returns_200(self, mock_client_class, client, valid_wav_content): mock_response = Mock() mock_response.json.return_value = { "metadata": {"duration": 0.0, "channels": 0}, @@ -364,14 +436,28 @@ def test_silent_audio_empty_channels_returns_200_with_empty_transcript(self, moc mock_client.__aexit__ = AsyncMock() mock_client_class.return_value = mock_client - response = client.post( - "/api/get_transcription", - files={"file": ("test.wav", valid_wav_content, "audio/wav")}, - data={"uniqueid": "1234567890.1234"}, - ) + async def fake_run_in_threadpool(func, *args, **kwargs): + return func(*args, **kwargs) + + with patch("api.db.is_configured", return_value=True), \ + patch("api.db.upsert_transcript_progress", return_value=123) as progress_mock, \ + patch("api.db.set_transcript_state") as state_mock, \ + patch("api.run_in_threadpool", new=fake_run_in_threadpool): + response = client.post( + "/api/get_transcription", + files={"file": ("test.wav", valid_wav_content, "audio/wav")}, + data={"uniqueid": "1234567890.1234", "persist": "true"}, + ) assert response.status_code == 200 - assert response.json()["transcript"] == "" + assert response.json() == {"transcript": "", "detected_language": None} + progress_mock.assert_called_once_with( + uniqueid="1234567890.1234", + linkedid=None, + src_number=None, + dst_number=None, + ) + state_mock.assert_called_once_with(transcript_id=123, state="done") @patch('httpx.AsyncClient') def 
test_missing_paragraphs_transcript_is_error(self, mock_client_class, client, valid_wav_content): diff --git a/tests/test_db.py b/tests/test_db.py index e632474..855b813 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -136,9 +136,63 @@ async def test_ensure_schema_hnsw_success_path(monkeypatch: pytest.MonkeyPatch): # Verify HNSW index attempt was made executed_sql = "\n".join(str(call.args[0]) for call in conn.execute.call_args_list) + assert "uniqueid TEXT NOT NULL UNIQUE" not in executed_sql + assert "DROP CONSTRAINT IF EXISTS transcripts_uniqueid_key" in executed_sql + assert "CREATE INDEX IF NOT EXISTS transcripts_uniqueid_idx" in executed_sql + assert "linkedid TEXT" in executed_sql + assert "src_number TEXT" in executed_sql + assert "dst_number TEXT" in executed_sql assert "USING hnsw" in executed_sql +@pytest.mark.asyncio +async def test_ensure_schema_adds_missing_participant_columns(monkeypatch: pytest.MonkeyPatch): + conn = MagicMock(name="conn") + conn.__enter__.return_value = conn + conn.__exit__.return_value = False + + def execute_side_effect(sql, params=None): + sql_text = str(sql) + cursor = MagicMock(name="cursor") + if "FROM information_schema.columns" in sql_text and params in (("linkedid",), ("src_number",), ("dst_number",)): + cursor.fetchone.return_value = None + else: + cursor.fetchone.return_value = (1,) + return cursor + + conn.execute.side_effect = execute_side_effect + monkeypatch.setattr(db, "_connect_without_pgvector", MagicMock(return_value=conn)) + + await run_in_threadpool(db._ensure_schema) + + executed_sql = "\n".join(str(call.args[0]) for call in conn.execute.call_args_list) + assert "ALTER TABLE transcripts ADD COLUMN linkedid TEXT NULL" in executed_sql + assert "ALTER TABLE transcripts ADD COLUMN src_number TEXT NULL" in executed_sql + assert "ALTER TABLE transcripts ADD COLUMN dst_number TEXT NULL" in executed_sql + + +@pytest.mark.asyncio +async def test_upsert_transcript_progress_inserts_new_row(monkeypatch: 
pytest.MonkeyPatch): + monkeypatch.setattr(db, "_ensure_schema", lambda: None) + + conn = _make_conn(fetchone_result=(51,)) + monkeypatch.setattr(db, "_connect", MagicMock(return_value=conn)) + + transcript_id = await run_in_threadpool( + db.upsert_transcript_progress, + uniqueid="1234567890.1234", + linkedid="1234567890.1000", + src_number="100", + dst_number="200", + ) + + assert transcript_id == 51 + + executed_sql = "\n".join(str(call.args[0]) for call in conn.execute.call_args_list) + assert "INSERT INTO transcripts (uniqueid, linkedid, src_number, dst_number, raw_transcription, state)" in executed_sql + assert "ON CONFLICT" not in executed_sql + + @pytest.mark.asyncio async def test_upsert_transcript_raw_returns_id(monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr(db, "_ensure_schema", lambda: None) @@ -149,11 +203,45 @@ async def test_upsert_transcript_raw_returns_id(monkeypatch: pytest.MonkeyPatch) transcript_id = await run_in_threadpool( db.upsert_transcript_raw, uniqueid="1234567890.1234", + linkedid="1234567890.1000", + src_number="100", + dst_number="200", raw_transcription="hello", ) assert transcript_id == 42 + executed_sql = "\n".join(str(call.args[0]) for call in conn.execute.call_args_list) + assert "INSERT INTO transcripts (uniqueid, linkedid, src_number, dst_number, raw_transcription)" in executed_sql + assert "ON CONFLICT" not in executed_sql + + +@pytest.mark.asyncio +async def test_upsert_transcript_raw_updates_existing_row_by_id(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(db, "_ensure_schema", lambda: None) + + conn = _make_conn(fetchone_result=(42,)) + monkeypatch.setattr(db, "_connect", MagicMock(return_value=conn)) + + transcript_id = await run_in_threadpool( + db.upsert_transcript_raw, + transcript_id=42, + uniqueid="1234567890.1234", + linkedid="1234567890.1000", + src_number="100", + raw_transcription="hello", + ) + + assert transcript_id == 42 + + executed_sql = "\n".join(str(call.args[0]) for call in 
conn.execute.call_args_list) + assert "UPDATE transcripts" in executed_sql + assert "linkedid = COALESCE(%s, linkedid)" in executed_sql + assert "src_number = COALESCE(%s, src_number)" in executed_sql + assert "dst_number = COALESCE(%s, dst_number)" in executed_sql + assert "WHERE id = %s" in executed_sql + assert "AND uniqueid = %s" in executed_sql + @pytest.mark.asyncio async def test_upsert_transcript_raw_raises_when_no_row(monkeypatch: pytest.MonkeyPatch): @@ -190,6 +278,24 @@ async def test_update_transcript_ai_fields_executes_update(monkeypatch: pytest.M assert "UPDATE transcripts" in executed_sql +@pytest.mark.asyncio +async def test_set_transcript_state_by_uniqueid_updates_latest_row(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr(db, "_ensure_schema", lambda: None) + + conn = _make_conn() + monkeypatch.setattr(db, "_connect", MagicMock(return_value=conn)) + + await run_in_threadpool( + db.set_transcript_state_by_uniqueid, + uniqueid="1234567890.1234", + state="done", + ) + + executed_sql = "\n".join(str(call.args[0]) for call in conn.execute.call_args_list) + assert "WITH latest_transcript AS" in executed_sql + assert "ORDER BY id DESC" in executed_sql + + @pytest.mark.asyncio async def test_split_text_for_embedding_filters_empty(monkeypatch: pytest.MonkeyPatch): class StubSplitter: