Skip to content

feat: add Inworld TTS transformer#100

Open
cshape wants to merge 17 commits into
rapidaai:mainfrom
cshape:feat/inworld-tts
Open

feat: add Inworld TTS transformer#100
cshape wants to merge 17 commits into
rapidaai:mainfrom
cshape:feat/inworld-tts

Conversation

@cshape

@cshape cshape commented Apr 15, 2026

Copy link
Copy Markdown

Summary

Adds Inworld TTS as a first-class text-to-speech transformer alongside the
existing elevenlabs / cartesia / rime / etc. providers in
api/assistant-api/internal/transformer/.

Inworld exposes an HTTP streaming TTS endpoint at
https://api.inworld.ai/tts/v1/voice:stream that returns NDJSON audio
chunks per request. Rapida's text aggregator already splits LLM deltas at
sentence boundaries before Transform() sees them, so the natural
granularity is one HTTP request per sentence. This shape gets the same
latency as the bidirectional WebSocket alternative — HTTP keep-alive +
HTTP/2 multiplexing on a shared *http.Transport means only the first
sentence pays TCP+TLS cost — while avoiding a significant amount of
per-turn concurrency machinery (no gorilla write locks, no dial
serialization, no per-turn read goroutines, no connection lifecycle).

An earlier revision of this PR used Inworld's bidirectional WebSocket
(wss://.../voice:streamBidirectional). The WS code is still in git
history if you want to compare; the final commit (9ced138) switches
to HTTP streaming and deletes ~130 LOC of concurrency plumbing.

What's in this PR

  • api/assistant-api/internal/transformer/inworld/inworld.go — option
    struct wrapping the vault credential and per-conversation options.
    GetEncoding returns pcm_16000 (byte-for-byte identical to Inworld's
    LINEAR16 @ 16000 Hz); GetVoiceID / GetModelID respect
    speak.voice.id / speak.model with Ashley / inworld-tts-1.5-max
    defaults. Also exports newInworldHTTPClient(), a tuned *http.Client
    with ForceAttemptHTTP2, small idle-conn pool, and 90 s idle timeout.
  • api/assistant-api/internal/transformer/inworld/tts.go — implements
    internal_type.TextToSpeechTransformer. One turnRunner goroutine per
    Rapida context drains a sentence channel and fires one POST /tts/v1/voice:stream per sentence. Interrupt cancels the runner's
    context (which aborts in-flight reads via http.NewRequestWithContext);
    Done closes the sentence channel and the runner emits
    TextToSpeechEndPacket after the final synth drains. tts_latency_ms
    metric is emitted on the first chunk of the first sentence.
  • api/assistant-api/internal/transformer/inworld/internal/type.go
    the StreamRequest / StreamChunk wire types matching Inworld's
    { "result": { "audioContent": "<base64>" } } NDJSON envelope.
  • api/assistant-api/internal/transformer/transformer.go — adds
    INWORLD = "inworld" to AudioTransformer and the
    GetTextToSpeechTransformer dispatch switch. STT dispatch is untouched
    (Inworld STT is a separate follow-up).
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
    unit tests: vault-credential parsing, encoding/voice/model defaults and
    overrides, stream URL, Name() string, plus five streaming-layer tests
    against httptest.Server (see Test plan below).
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
    integration tests behind //go:build integration: lifecycle, streaming
    deltas (one speaking event per delta), interruption + reconnect, and
    two back-to-back sessions. Skipped automatically when
    testdata/integration_config.yaml is missing or inworld.enabled is
    false.
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
    adds a disabled-by-default inworld: stanza so operators can enable the
    test provider with a real key.

Inworld HTTP streaming protocol

POST https://api.inworld.ai/tts/v1/voice:stream
Authorization: Basic ${INWORLD_API_KEY}
Content-Type: application/json

{ "text": "...", "voice_id": "...", "model_id": "...",
  "audio_config": { "audio_encoding": "LINEAR16", "sample_rate_hertz": 16000 } }

# response body: NDJSON, one JSON object per line, ending at EOF
{"result":{"audioContent":"<base64>"}}   # 1..N times
{"error":{"message":"..."}}              # terminal, on failure

Auth header: Authorization: Basic ${INWORLD_API_KEY}. Inworld issues
pre-encoded Basic-auth tokens, so the credential is sent verbatim (no
additional base64 encoding).

Why HTTP instead of WebSocket

The reasoning (captured in 9ced138):

  • Rapida's upstream text aggregator chunks LLM deltas into sentences
    before they reach Transform(). A bidirectional WS that can accept
    incremental characters and return audio on the same socket buys
    nothing in this architecture — the server already has a complete
    sentence every time we talk to it.
  • With HTTP/1.1 keep-alive + HTTP/2 multiplexing on a shared
    *http.Transport, the only request that pays TCP+TLS cost is the
    first one per conversation. Subsequent sentences reuse the same
    connection (asserted by TestInworldTTSKeepAliveReusesConn).
  • The removed machinery — writeMu, initCond/pendingConn,
    per-turn readLoop, handleFlushComplete/handleTurnError,
    bound-ctx-id stale-conn checks — is all about protecting a single
    long-lived WS from concurrent-write panics and cross-turn bleed.
    None of it exists under HTTP streaming: one request per sentence,
    connection pool owns the conn lifecycle, scanner ends when the
    response body closes.

Test plan

go test -race -count=1 ./api/assistant-api/internal/transformer/inworld/...
go test -tags integration -count=1 -v -run TestInworldTTS \
    ./api/assistant-api/internal/transformer/inworld/...

Unit tests (all race-enabled, pure httptest.Server — no network):

  • TestInworldTTSStreamsAudioInOrder — two sentences × two chunks,
    audio packets arrive in submission order
  • TestInworldTTSEmitsEndAfterDone — Delta, Delta, Done →
    TextToSpeechEndPacket fires exactly once, after both drain
  • TestInworldTTSInterruptCancelsInFlight — Interrupt while the
    mock server is still feeding chunks → runner exits, no further
    audio, turn state cleared
  • TestInworldTTSServerErrorSurfaces — NDJSON error line → tts=error
    event AND terminal TextToSpeechEndPacket (callers unblock)
  • TestInworldTTSKeepAliveReusesConn — three sentences, one TCP
    accept (the latency-parity assertion)
  • Constructor/option tests — credential parsing, defaults, overrides,
    stream URL

Integration tests (against the real API, Ashley voice,
inworld-tts-1.5-max):

  • TestInworldTTSLifecycle — audio chunks emitted, TTFB metric
    present, event sequence [initialized, speaking, completed]
  • TestInworldTTSStreamingDeltas — N deltas → N speaking events,
    audio streams through cleanly
  • TestInworldTTSInterruption[initialized, speaking, interrupted, initialized]; runs reconnect cleanly after interrupt
  • TestInworldTTSReconnect — two back-to-back sessions each produce
    audio and a terminal end packet

Follow-ups (not in this PR)

  • UI provider listing (ui/src/providers/inworld/*.json) — adding
    the 21 voices and both models to the UI selector was called out as
    optional in the scoping doc. Happy to stack a follow-up PR once the Go
    transformer lands.
  • Inworld STT — Inworld also exposes stt/v1/transcribe and a
    bidirectional STT WebSocket. Out of scope here; tracked separately.
  • Word timestamps — Inworld supports timestamp_type: "WORD" +
    timestamp_transport_strategy: "ASYNC" for lipsync. Not wired up in
    v1 — add when a downstream consumer needs it.
  • Normalizer — no normalizer.go is included because Inworld accepts
    plain text (no SSML requirements beyond what the pipeline already
    feeds other providers). If the Inworld side later adds phoneme /
    break tags we can follow the elevenlabs / cartesia normalizer pattern.

About

Contributed by Inworld AI (https://inworld.ai). Happy to iterate on style,
naming, or protocol details if anything diverges from Rapida's
conventions.

Summary by CodeRabbit

  • New Features

    • Added Inworld Text-to-Speech provider with HTTP NDJSON streaming: per-sentence streaming audio, configurable voice/model/encoding, interruption/reconnect handling, lifecycle events, connection/latency metrics.
  • Tests

    • Added extensive integration and unit tests for lifecycle, streaming deltas, interruptions, reconnects, error handling, turn isolation, credential validation, and factory routing.
  • Chores

    • Added example integration config for Inworld TTS credentials and options.

Adds Inworld as a first-class text-to-speech provider alongside
elevenlabs / cartesia / rime in api/assistant-api/internal/transformer.

Uses Inworld's bidirectional streaming WebSocket at
wss://api.inworld.ai/tts/v1/voice:streamBidirectional with create /
send_text / close_context framing. End-of-stream is driven by the
server's result.contextClosed frame; audio chunks are delivered as
base64 LINEAR16 @ 16000 Hz (equivalent to pcm_16000).

Lifecycle (init, reconnect-on-interrupt, reconnect-on-drop, per-turn
connection) mirrors the elevenlabs transformer so event and audio-packet
ordering are identical to other streaming providers.

Wires INWORLD into the transformer dispatcher and adds unit + integration
tests covering lifecycle, streaming deltas, interruption, and reconnect.
@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

Adds an Inworld HTTP streaming Text-to-Speech provider: JSON wire types, vault-backed option parsing, an HTTP client, per-turn sentence runners that stream NDJSON audio, factory routing, example config, and unit/integration tests.

Changes

Cohort / File(s) Summary
Protocol Types
api/assistant-api/internal/transformer/inworld/internal/type.go
New Go structs modeling Inworld HTTP/NDJSON wire format: AudioConfig, StreamRequest, StreamChunk, StreamResult, and ErrorBody.
Options & HTTP Client
api/assistant-api/internal/transformer/inworld/inworld.go
New inworldOption with vault credential validation (requires non-empty "key"), getters for auth/encoding/voice/model defaults and overrides, and newInworldHTTPClient creating a tuned http.Client/Transport.
Transformer Implementation
api/assistant-api/internal/transformer/inworld/tts.go
New inworldTTS with constructor and methods (Name/Initialize/Transform/Close): per-Context turn runners queue sentences, POST each sentence to Inworld streaming TTS, parse NDJSON base64 audio chunks, emit audio/end/events/metrics, and handle interruptions and reconnects.
Integration Tests
api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
Integration tests (build tag integration) exercising lifecycle, streaming deltas, interruption, and reconnects against the real service; verify emitted packets, audio, events, and metrics.
Unit & Concurrency Tests
api/assistant-api/internal/transformer/inworld/inworld_test.go
Extensive unit tests using httptest: option validation, defaults/overrides, NDJSON chunk helpers, ordered streaming, interruption and error-frame handling, and TCP keep-alive/connection reuse checks.
Registry & Factory Tests
api/assistant-api/internal/transformer/transformer.go, api/assistant-api/internal/transformer/transformer_test.go
Adds INWORLD provider constant and factory case using NewInworldTextToSpeech; unit test validates routing for valid/empty keys and unknown provider error.
Config Example
api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
Adds tts.inworld example block with enabled, credential.key (pre-encoded Basic-auth token), and options overrides for speak.model and speak.voice.id.
sequenceDiagram
    participant Client
    participant Transformer as InworldTTS<br/>Transformer
    participant Runner as TurnRunner
    participant HTTP as HTTPClient
    participant Inworld as InworldService

    Client->>Transformer: Initialize()
    Transformer->>Transformer: record init timestamp\nemit ConversationEvent "initialized"

    Client->>Transformer: Transform(LLMResponseDelta)
    Transformer->>Runner: enqueue sentence (start runner if absent)
    Transformer->>Client: emit ConversationEvent "speaking"

    Runner->>HTTP: POST /streaming/tts (JSON body)
    HTTP->>Inworld: request
    Inworld-->>HTTP: NDJSON lines (result / error)
    HTTP-->>Runner: stream line
    Runner->>Runner: decode NDJSON\nbase64 -> audio
    Runner->>Transformer: emit TextToSpeechAudioPacket
    Runner->>Transformer: emit metric tts_latency_ms (on first chunk)
    alt server error frame present
        Runner->>Transformer: emit ConversationEvent "error"
    end

    Client->>Transformer: Transform(LLMResponseDone)
    Transformer->>Runner: close sentence channel
    Runner->>Transformer: after drain emit TextToSpeechEndPacket\nemit ConversationEvent "completed"

    Client->>Transformer: Transform(InterruptionDetected)
    Transformer->>Runner: cancel/remove runners
    Transformer->>Transformer: emit ConversationEvent "interrupted" and "initialized"
    Transformer->>HTTP: close idle connections (transport)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Poem

🐰 I hopped through streams of NDJSON so bright,
I unwrapped base64 until sound took flight,
Runners queued sentences, one by one they flew,
Interrupts and reconnects — I watched them through,
A rabbit cheered: "Now hear TTS anew!"

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 68.42% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add Inworld TTS transformer' is clear, specific, and directly summarizes the main change—adding a new Inworld text-to-speech transformer implementation.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/assistant-api/internal/transformer/inworld/inworld.go`:
- Around line 43-57: The parser in NewInworldOption accepts an empty string key
which causes failures later; after extracting key from vaultCredential (in
NewInworldOption, where cx is asserted to string) add a check for key == "" and
return a descriptive error (e.g., "inworld: empty vault key") so the function
fails fast and prevents attempting websocket handshakes with blank auth tokens.

In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 141-144: The current branch that handles payload.Error only logs
it and continues, which swallows provider errors; update the branch that checks
payload.Error (where payload.Error != nil and it.logger.Errorf is called) to
treat the error as terminal: emit a TextToSpeechEndPacket (or the existing
failure/end signal used by the transformer) for the active turn so callers
receive an end/failure, clear any connection/turn state held by the transformer
(reset active turn/session fields), and close the underlying socket/connection
so the transformer stops processing further frames; keep the error log but
perform these shutdown steps immediately after logging.
- Around line 121-132: The turn-scoped state (contextId, ttsStartedAt,
ttsMetricSent, connection) is being shared across turns causing late events from
old websockets to mutate the new turn; modify readLoop() and
handleFlushComplete() to capture the current turn's websocket-specific state at
the start (e.g., copy connection, contextId, ttsStartedAt, ttsMetricSent into
local variables or a small turn struct) and use those locals for reads, error
handling and metric emission so that Transform() switching turns cannot cause
late goroutines to emit under the new ContextID or nil out the fresh connection;
ensure errors that would set connection = nil only operate on the captured
connection identity (or a turn-local connection pointer) rather than the shared
it.connection.

In `@api/assistant-api/internal/transformer/transformer.go`:
- Around line 83-84: Add a unit test that exercises the factory selection path:
call GetTextToSpeechTransformer (or the exported factory function used in
transformer.go) with provider set to "inworld" and the same
ctx/logger/credential/onPacket/opts shape used by other tests, then assert the
returned transformer is non-nil and is created via the INWORLD branch (e.g., by
checking type identity or that the concrete type matches
internal_transformer_inworld.NewInworldTextToSpeech). Also include a
success-path assertion (transformer usable/non-nil) and a fallback/error
assertion for a non-existent provider to satisfy selection/fallback coverage.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4c3daf23-2708-4408-8dc8-3e40713f318e

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 6773bbb.

📒 Files selected for processing (7)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go

Comment thread api/assistant-api/internal/transformer/inworld/inworld.go
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Comment thread api/assistant-api/internal/transformer/transformer.go
Addresses all four actionable CodeRabbit findings from PR rapidaai#100 and wires
Inworld's auto_mode for text streaming.

- inworld.go: reject empty vault keys during option parsing (fail fast
  instead of at websocket handshake).
- tts.go: bind turn state to the websocket that produced it. Each turn's
  (conn, contextCreated, ttsStartedAt, ttsMetricSent) now lives in a
  per-context turnState entry in a map; readLoop and handleFlushComplete
  capture the Inworld context_id at spawn and use only that, so late
  frames from a prior turn cannot be emitted under a new turn's ContextID
  and a stale goroutine can't nil out the fresh connection.
- tts.go: treat server error frames as terminal — emit
  TextToSpeechEndPacket + a "tts:error" event, delete the turn entry,
  close the socket. Previously the branch only logged and callers
  waiting on end-of-turn would hang.
- transformer_test.go: add TestGetTextToSpeechTransformer_InworldRouting
  to assert the INWORLD factory branch resolves to *inworldTTS and that
  unknown providers still hit the factory-error fallback.
- internal/type.go + tts.go: set auto_mode:true in the create frame and
  drop the empty flush_context from every send_text. Under auto_mode the
  server controls flushing for lowest latency while keeping audio
  quality; the explicit close_context at end-of-turn is still sent.

New tests (all race-clean):
- TestNewInworldOption_EmptyKey
- TestInworldTTSTurnsDoNotBleed (regression for the critical bleed bug —
  drives two interleaved turns and verifies a late audio frame on turn
  A's still-open socket is tagged ctx-A even after turn B has started)
- TestInworldTTSCreateFrameEnablesAutoMode (wire-level assertion)
- TestInworldTTSServerErrorTerminatesTurn

in-5rkw
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

CodeRabbit review addressed + Inworld auto_mode enabled

All four actionable items from the CodeRabbit pass are resolved in 4e82a7d. Summary of what changed and where, with the reasoning the reviewer asked for:

1. Reject blank vault keys (minor) — inworld.go

Added an explicit key == "" check in NewInworldOption so misconfiguration surfaces at option-parsing time instead of as an opaque websocket-handshake failure. Covered by TestNewInworldOption_EmptyKey.

2. Bind turn state to its own websocket (CRITICAL) — tts.go

Replaced the shared contextId / contextCreated / connection / ttsStartedAt / ttsMetricSent fields with a per-turn turnState held in map[string]*turnState keyed by the Inworld context id.

  • readLoop(conn, boundCtxID) and handleFlushComplete(conn, boundCtxID) capture the turn's contextID at spawn time and use it (or payload.Result.ContextID when the server echoes it) for packet emission. They never read it.contextId, so late frames from a prior turn cannot be re-tagged with a new turn's ContextID.
  • A fresh websocket is dialed per turn (via acquireConn which consumes the pending conn from Initialize() or dials a new one). handleFlushComplete and the error handler only delete/close their own turn's entry, so a stale goroutine can no longer nil out a live connection.
  • InterruptionDetectedPacket closes all open turn sockets and the pending conn, clears the map, then Initialize() for the next turn.
  • Regression test: TestInworldTTSTurnsDoNotBleed drives two interleaved turns through Transform against an in-process mock websocket server, then fires a late audio frame on turn A's still-open socket after turn B has started. Asserts that late frame is tagged ctx-A — the old shared-state impl would have emitted it as ctx-B.

3. Server error frames are terminal (major) — tts.go

payload.Error != nil now calls a new handleTurnError, which emits a TextToSpeechEndPacket (so callers waiting on end-of-turn unblock), emits a tts:error conversation event for observability, deletes the turn state, and closes the socket. Covered by TestInworldTTSServerErrorTerminatesTurn.

4. Selection test for the INWORLD registry branch (minor) — transformer_test.go

Added TestGetTextToSpeechTransformer_InworldRouting with three subtests:

  • success path: GetTextToSpeechTransformer(ctx, logger, "inworld", <valid cred>, ...) returns a non-nil transformer whose Name() is inworld-text-to-speech, proving it resolved through the INWORLD branch and not another provider.
  • factory rejects empty key: {"key": ""} credential surfaces an error at the factory rather than later.
  • unknown provider: falls through to the existing "illegal text to speech idenitfier" error.

5. Enable Inworld auto_mode (feature ask from Cale) — internal/type.go + tts.go

Extended CreateBody with AutoMode bool \json:"auto_mode,omitempty"`and setAutoMode: trueon the create frame. Since the server now controls flushing under auto_mode, dropped the emptyflush_contextfrom everysend_text(the explicitclose_contextat end-of-turn is still sent). Updated the inline comment that previously explained why we flushed on every send. Wire-level testTestInworldTTSCreateFrameEnablesAutoModeasserts the create frame hasauto_mode:trueand the send_text frame has noflush_context` key at all.

Quality gates

  • go vet ./internal/transformer/inworld/... — clean
  • go build ./internal/transformer/inworld/... — clean
  • go test -race -count=1 ./internal/transformer/inworld/... — all pass, race-clean
  • gofmt — clean
  • The ./internal/transformer/... package (including the new selection test) compiles alongside the rest of assistant-api; it was not exercised directly in my local env because of the pre-existing CGO deps (Microsoft Speech SDK, onnxruntime) that the CI workflow also skips via CGO_ENABLED=0 + selective paths and relies on Docker for validation.

Commit: 4e82a7d

@coderabbitai please re-review.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

@cshape: Thank you for the detailed write-up! The changes in 4e82a7d look well-scoped — the per-turn turnState map addressing the critical shared-state race, the handleTurnError unblocking callers on server errors, the blank-key early rejection, and the auto_mode wire-level test all sound solid. Kicking off a fresh review now.


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
api/assistant-api/internal/transformer/inworld/tts.go (1)

393-439: Wrap returned write errors with operation context.

These branches log the failure and then return the raw err, which drops whether the failure came from create, send_text, or close_context. Wrap the returned error with %w so callers preserve both the cause and the failing operation.
As per coding guidelines, "Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 393 -
439, The error returns from the WriteJSON calls should be wrapped with
contextual messages using fmt.Errorf("%s: %w", ...) so the caller retains both
the operation and original error; update the three places where
conn.WriteJSON(...) is followed by it.logger.Errorf(...) and return err —
specifically the createFrame write, the sendFrame (send_text) write, and the
closeFrame (close_context) write — to return fmt.Errorf("inworld-tts: create
frame write failed: %w", err), fmt.Errorf("inworld-tts: send_text write failed:
%w", err), and fmt.Errorf("inworld-tts: close_context write failed: %w", err)
respectively, and add an import for fmt if missing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 279-299: acquireConn currently races with Initialize over
it.pendingConn because the nil-check and the dial/readback happen without
holding it.mu; fix by serializing connection creation: introduce an initializing
flag (or a sync.Cond) guarded by it.mu so only one goroutine calls
Initialize/dials and others wait for pendingConn to be set (or for Initialize to
return an error). Update acquireConn to hold the lock, if pendingConn==nil and
!initializing set initializing=true, release lock and call Initialize, then
re-acquire lock to set initializing=false and notify waiters; if
pendingConn==nil and initializing==true then wait (via Cond or loop) until
pendingConn is non-nil or Initialize failed. Also ensure Initialize respects and
sets it.pendingConn under the same mu and signals waiters so callers don't hit
"pending connection missing after Initialize" or steal each other's connection.
- Around line 371-439: The code currently unlocks it.mu before calling
conn.WriteJSON, allowing concurrent writers on the same websocket; add a
per-turn write mutex (e.g., add a sync.Mutex field like writeMu to the turnState
struct) and use it to serialize all writes for that turn inside Transform():
acquire ts.writeMu (defer unlock) around each conn.WriteJSON call (for
CreateRequest, SendTextRequest, and CloseContextRequest) so
createFrame/sendFrame/closeFrame cannot interleave, and ensure the turnState is
initialized with that mutex when the turn is created.

---

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 393-439: The error returns from the WriteJSON calls should be
wrapped with contextual messages using fmt.Errorf("%s: %w", ...) so the caller
retains both the operation and original error; update the three places where
conn.WriteJSON(...) is followed by it.logger.Errorf(...) and return err —
specifically the createFrame write, the sendFrame (send_text) write, and the
closeFrame (close_context) write — to return fmt.Errorf("inworld-tts: create
frame write failed: %w", err), fmt.Errorf("inworld-tts: send_text write failed:
%w", err), and fmt.Errorf("inworld-tts: close_context write failed: %w", err)
respectively, and add an import for fmt if missing.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a159d84b-1409-420d-96cb-8d0884c71efa

📥 Commits

Reviewing files that changed from the base of the PR and between 6773bbb and 4e82a7d.

📒 Files selected for processing (5)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/transformer_test.go

Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
api/assistant-api/internal/transformer/inworld/tts.go (2)

279-299: ⚠️ Potential issue | 🟠 Major

Serialize pendingConn initialization and adoption.

If two first-delta calls hit while pendingConn is nil, both can race through the unlocked check, both dial in Initialize(), and one turn can close or steal the other's socket before the loser reaches Line 297. Guard the Initialize/consume path under one condition or singleflight-style state so only one goroutine creates and hands out the pending connection at a time.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 279 -
299, inworldTTS.acquireConn currently races when pendingConn is nil because
multiple goroutines can call Initialize concurrently; serialize creation by
adding a singleflight-style guard or a connecting flag+sync.Cond on the
inworldTTS struct and use it inside acquireConn: under mu, if pendingConn != nil
return it; if connecting is true then wait on cond until pendingConn is set (or
connecting cleared) and loop; if connecting is false set connecting = true,
unlock, call Initialize(), re-lock, set pendingConn from the result, set
connecting = false, cond.Broadcast(), then consume pendingConn as before.
Reference symbols: acquireConn, pendingConn, Initialize, connecting (new bool),
and a sync.Cond on the receiver to ensure only one goroutine creates the
connection and others wait for it.

371-439: ⚠️ Potential issue | 🔴 Critical

Protect each turn socket from concurrent writers.

Transform() can be entered concurrently for the same ContextID, but create, send_text, and close_context all write to ts.conn without a turn-local write lock. Gorilla WebSocket only allows one concurrent writer per connection, and the current needCreate check also lets two goroutines race into duplicate create frames. Add a writeMu to turnState and serialize the full create/send/close path under it. (pkg.go.dev)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 371 -
439, Add a per-turn writer mutex to turnState (e.g., writeMu sync.Mutex) and use
it inside Transform() to serialize all writes to ts.conn (the entire
create/send_text/close_context sequence). Wrap the create/send/close logic with
ts.writeMu.Lock()/Unlock(), and move the needCreate/contextCreated check and any
mutation of ts.contextCreated inside that locked section to prevent the
duplicate-create race; keep other turn-state accesses protected by the existing
it.mu as needed but ensure all conn.WriteJSON calls occur while holding
ts.writeMu. Ensure Transform() uses the new writeMu whenever writing to ts.conn
(including the createFrame, sendFrame, and closeFrame paths).
🧹 Nitpick comments (2)
api/assistant-api/internal/transformer/inworld/inworld_integration_test.go (1)

171-189: Prefer an event wait over the fixed post-interrupt sleep.

Line 176 waits a hard-coded two seconds after sending InterruptionDetectedPacket. That makes the test slower on fast runs and still flaky on slow ones; waiting until the "interrupted" event or the second "initialized" event appears would be deterministic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/inworld_integration_test.go`
around lines 171 - 189, The test currently uses a fixed time.Sleep after calling
tts.Transform with internal_type.InterruptionDetectedPacket, which makes it slow
and flaky; replace the hard-coded sleep with a deterministic wait that polls
collector.EventPackets() (or uses a helper) until either the "interrupted" event
is present or the count of "initialized" events (computed by
ttsEventTypes/iteration) is >= 2, with an overall timeout (e.g., a few seconds)
and a failure assertion if the timeout elapses; keep the existing assertions and
logging (eventTypes, ttsEventTypes, collector.EventPackets(), and the
tts.Transform call) but remove the time.Sleep and use the polling/timeout
approach to make the test reliable.
api/assistant-api/internal/transformer/inworld/tts.go (1)

104-107: Wrap transport errors before returning them.

These branches log the operation name but then return the raw websocket error, which drops the failure site for callers. Returning fmt.Errorf("inworld-tts: create frame write failed: %w", err)-style errors would preserve the chain and the operation context.

As per coding guidelines "Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation".

Also applies to: 393-395, 410-412, 437-439

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 104 -
107, Wrap returned websocket/transport errors with contextual fmt.Errorf calls
using the %w verb instead of returning raw err; e.g., in the block that calls
websocket.DefaultDialer.Dial (and the other occurrences at the ranges mentioned:
393-395, 410-412, 437-439) change the return to something like
fmt.Errorf("inworld-tts: dial failed: %w", err) or fmt.Errorf("inworld-tts:
create frame write failed: %w", err) while keeping the existing it.logger.Errorf
logging, so callers receive error chains that include the operation context;
update each return site that currently does `return err` to return a wrapped
error using fmt.Errorf with %w.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 242-249: The ConversationEventPacket instances for turn-scoped TTS
events are being sent without their ContextID set; update the calls to
it.onPacket that create ConversationEventPacket (e.g., the one paired with
internal_type.TextToSpeechEndPacket and the other packets emitted for
"speaking", "interrupted", "completed", and "error") to set
ConversationEventPacket.ContextID = boundCtxID so the turn ID is attached before
sending; mirror the same fix for the other occurrences referenced (the other
onPacket calls and the async events produced in readLoop()) so concurrent turns
carry unambiguous context.

In
`@api/assistant-api/internal/transformer/testdata/integration_config.yaml.example`:
- Around line 30-32: The sample credential name is misleading: the YAML key
"key" currently uses the placeholder "YOUR_INWORLD_API_KEY" while the comment
says it should be a pre-encoded Basic token; update the placeholder to something
like "YOUR_INWORLD_BASIC_TOKEN" (or similar) so it clearly matches the expected
auth format and reduces copy/paste errors when using the "key" field in the
integration_config.yaml.example.

---

Duplicate comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 279-299: inworldTTS.acquireConn currently races when pendingConn
is nil because multiple goroutines can call Initialize concurrently; serialize
creation by adding a singleflight-style guard or a connecting flag+sync.Cond on
the inworldTTS struct and use it inside acquireConn: under mu, if pendingConn !=
nil return it; if connecting is true then wait on cond until pendingConn is set
(or connecting cleared) and loop; if connecting is false set connecting = true,
unlock, call Initialize(), re-lock, set pendingConn from the result, set
connecting = false, cond.Broadcast(), then consume pendingConn as before.
Reference symbols: acquireConn, pendingConn, Initialize, connecting (new bool),
and a sync.Cond on the receiver to ensure only one goroutine creates the
connection and others wait for it.
- Around line 371-439: Add a per-turn writer mutex to turnState (e.g., writeMu
sync.Mutex) and use it inside Transform() to serialize all writes to ts.conn
(the entire create/send_text/close_context sequence). Wrap the create/send/close
logic with ts.writeMu.Lock()/Unlock(), and move the needCreate/contextCreated
check and any mutation of ts.contextCreated inside that locked section to
prevent the duplicate-create race; keep other turn-state accesses protected by
the existing it.mu as needed but ensure all conn.WriteJSON calls occur while
holding ts.writeMu. Ensure Transform() uses the new writeMu whenever writing to
ts.conn (including the createFrame, sendFrame, and closeFrame paths).

---

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/inworld_integration_test.go`:
- Around line 171-189: The test currently uses a fixed time.Sleep after calling
tts.Transform with internal_type.InterruptionDetectedPacket, which makes it slow
and flaky; replace the hard-coded sleep with a deterministic wait that polls
collector.EventPackets() (or uses a helper) until either the "interrupted" event
is present or the count of "initialized" events (computed by
ttsEventTypes/iteration) is >= 2, with an overall timeout (e.g., a few seconds)
and a failure assertion if the timeout elapses; keep the existing assertions and
logging (eventTypes, ttsEventTypes, collector.EventPackets(), and the
tts.Transform call) but remove the time.Sleep and use the polling/timeout
approach to make the test reliable.

In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 104-107: Wrap returned websocket/transport errors with contextual
fmt.Errorf calls using the %w verb instead of returning raw err; e.g., in the
block that calls websocket.DefaultDialer.Dial (and the other occurrences at the
ranges mentioned: 393-395, 410-412, 437-439) change the return to something like
fmt.Errorf("inworld-tts: dial failed: %w", err) or fmt.Errorf("inworld-tts:
create frame write failed: %w", err) while keeping the existing it.logger.Errorf
logging, so callers receive error chains that include the operation context;
update each return site that currently does `return err` to return a wrapped
error using fmt.Errorf with %w.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 70703a24-b601-4540-b98a-d930e675658a

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 4e82a7d.

📒 Files selected for processing (8)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go
  • api/assistant-api/internal/transformer/transformer_test.go

Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Comment thread api/assistant-api/internal/transformer/testdata/integration_config.yaml.example Outdated
…text-tagged events (in-k62d)

- Serialize all WriteJSON calls against an Inworld turn's *websocket.Conn
  through a per-turn writeMu so concurrent Transform calls for the same
  turn (create frame racing send_text, or two send_text's in flight) can't
  trigger gorilla's "concurrent write to websocket connection" panic.
- Guard pendingConn mutation with a sync.Cond + initializing flag so
  concurrent first-delta callers serialize through a single dial instead
  of racing to replace/close each other's pendingConn (which previously
  surfaced as the "pending connection missing after Initialize" error).
  Introduces an injectable dialer field so tests can count/mock dials
  without hitting the network.
- Populate ContextID on every ConversationEventPacket emitted by the
  TTS transformer (speaking, interrupted, completed, error) so async
  events from readLoop carry an unambiguous turn id.
- Clarify the integration_config.yaml.example comment for the Inworld
  credential so it's clear the value is a pre-encoded Basic token.
  (Placeholder stays YOUR_INWORLD_API_KEY for consistency with Inworld's
  own docs and SDKs.)
- Add two -race regression tests:
  * TestInworldTTSConcurrentWritesNoRace — verifies the per-turn writeMu
    prevents gorilla's concurrent-write panic under N goroutines x M
    WriteJSON calls for the same turn.
  * TestInworldTTSConcurrentInitDialsOnce — verifies pendingConn
    serialization under N concurrent first-delta callers with an
    injected counting dialer: no "pending connection missing" errors,
    exactly one turn installed, dial count bounded.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

@coderabbitai addressed round 2 in 5adba9d. Summary:

1. Serialize websocket writes per turn (critical)

Added a writeMu sync.Mutex field to turnState and wrapped every conn.WriteJSON call (create frame, send_text, close_context) in ts.writeMu.Lock() / Unlock(). it.mu and ts.writeMu are never held at the same time — we take it.mu briefly to look up ts and read contextCreated, release it, then take ts.writeMu for the write. This prevents the gorilla/websocket "concurrent write to websocket connection" panic that could fire when two Transform calls for the same turn interleave (e.g. caller A's create + send_text overlapping caller B's send_text).

Regression test added: TestInworldTTSConcurrentWritesNoRace fires 8 goroutines × 25 writes at the same turn and fails on any concurrent-write panic. Runs clean under -race; manually verified it fails without the writeMu.

2. Serialize pendingConn acquisition (major)

Introduced a sync.Cond (initCond) guarded by it.mu plus an initializing bool flag. Only one goroutine at a time performs a dial; others Wait() on the cond and re-check pendingConn on wake-up. pendingConn is now only ever mutated by the goroutine that holds initializing=true, so the previous race — where a second Initialize() could replace/close an earlier dialer's pending conn, causing a caller to steal the other's connection or hit the "pending connection missing after Initialize" path — is gone. Refactored the actual handshake into a small stateless dial() helper that both Initialize() and acquireConn() route through.

Regression test added: TestInworldTTSConcurrentInitDialsOnce fires 16 concurrent first-delta callers at an empty transformer with an injectable counting dialer; asserts no "pending connection missing" errors, exactly one turnState installed for the shared ctxID, and that the dial count is bounded by the number of callers.

3. Populate ContextID on ConversationEventPackets (major)

Every ConversationEventPacket emitted by the TTS transformer now carries the turn's ContextID:

  • speaking — uses the delta's ctxID in Transform
  • interrupted — uses the inbound InterruptionDetectedPacket.ContextID
  • completed — uses boundCtxID in handleFlushComplete
  • error — uses boundCtxID in handleTurnError

closed (emitted from Close) was already tagged with activeContext; initialized (emitted from Initialize) predates any turn so it stays untagged by design.

4. yaml rename — kept as INWORLD_API_KEY by convention

Keeping the placeholder YOUR_INWORLD_API_KEY rather than renaming to YOUR_INWORLD_BASIC_TOKEN: Inworld's own docs and SDKs all use INWORLD_API_KEY universally, so matching that name is worth more than literal accuracy. I did improve the surrounding comment to call out explicitly that the value is a pre-encoded basic-auth token ("client:secret" already base64-encoded) and must NOT be base64-encoded again:

# Your Inworld API key — from platform.inworld.ai → API Keys →
# "Basic (Base64)". Despite the "API key" name, the value is a
# pre-encoded basic-auth token ("client:secret" already base64-encoded);
# the transformer sends it verbatim in the Authorization: Basic <key>
# header, so do NOT base64-encode it again. Keyed as INWORLD_API_KEY
# for consistency with Inworld's own docs and SDKs, which all use that
# name (see https://platform.inworld.ai).

Test results

go test -race -count=1 ./internal/transformer/inworld/... passes clean (all original + 2 new tests).

Re-review please, @coderabbitai.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

@cshape Thanks for the detailed round 2 summary! The four items all look well-reasoned:

  1. writeMu per turn — the lock-ordering discipline (it.mu briefly for lookup → release → ts.writeMu for the actual write) is the right pattern to prevent the gorilla/websocket concurrent-write panic without introducing deadlock risk.
  2. sync.Cond + initializing flag — the single-dialer gate with Wait()/Broadcast() is the idiomatic Go approach for this kind of "dial once, fan out" initialization problem.
  3. ContextID on ConversationEventPackets — tagging all turn-scoped events (speaking, interrupted, completed, error) while intentionally leaving initialized untagged (since it predates any turn) is a clean, consistent policy.
  4. YAML placeholder naming — keeping INWORLD_API_KEY for consistency with Inworld's own SDK/docs convention, backed by an expanded clarifying comment, is a reasonable trade-off.

Kicking off a fresh review now.


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
api/assistant-api/internal/transformer/inworld/tts.go (1)

484-490: Wrap returned errors with context for chain preservation.

Per coding guidelines, errors should be wrapped using fmt.Errorf with %w to preserve the error chain. These locations return the raw error without additional context, making it harder to trace the failure path in logs.

♻️ Proposed fix to wrap errors with context
 		ts.writeMu.Lock()
 		err := conn.WriteJSON(createFrame)
 		ts.writeMu.Unlock()
 		if err != nil {
-			it.logger.Errorf("inworld-tts: create frame write failed: %v", err)
-			return err
+			return fmt.Errorf("inworld-tts: create frame write failed: %w", err)
 		}
 		ts.writeMu.Lock()
 		err := conn.WriteJSON(sendFrame)
 		ts.writeMu.Unlock()
 		if err != nil {
-			it.logger.Errorf("inworld-tts: send_text write failed: %v", err)
-			return err
+			return fmt.Errorf("inworld-tts: send_text write failed: %w", err)
 		}
 		ts.writeMu.Lock()
 		err := conn.WriteJSON(closeFrame)
 		ts.writeMu.Unlock()
 		if err != nil {
-			it.logger.Errorf("inworld-tts: close_context write failed: %v", err)
-			return err
+			return fmt.Errorf("inworld-tts: close_context write failed: %w", err)
 		}

As per coding guidelines: "Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation".

Also applies to: 504-507, 535-538

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 484 -
490, The returned errors in the inworld TTS code are being returned raw; update
each return to wrap the underlying error with context using fmt.Errorf and %w so
the error chain is preserved—e.g. where the block logs with
it.logger.Errorf("inworld-tts: create frame write failed: %v", err) and then
returns err, change the return to return fmt.Errorf("inworld-tts: create frame
write failed: %w", err); apply the same wrapping pattern to the other
occurrences referenced (around the create frame write/related returns at the
blocks corresponding to the ranges you noted, including the ones at lines
~504-507 and ~535-538) and ensure fmt is imported if not already.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 484-490: The returned errors in the inworld TTS code are being
returned raw; update each return to wrap the underlying error with context using
fmt.Errorf and %w so the error chain is preserved—e.g. where the block logs with
it.logger.Errorf("inworld-tts: create frame write failed: %v", err) and then
returns err, change the return to return fmt.Errorf("inworld-tts: create frame
write failed: %w", err); apply the same wrapping pattern to the other
occurrences referenced (around the create frame write/related returns at the
blocks corresponding to the ranges you noted, including the ones at lines
~504-507 and ~535-538) and ensure fmt is imported if not already.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 642e1368-d0ad-40f3-96a6-764d146d7515

📥 Commits

Reviewing files that changed from the base of the PR and between 4e82a7d and 5adba9d.

📒 Files selected for processing (3)
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
✅ Files skipped from review due to trivial changes (1)
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
api/assistant-api/internal/transformer/inworld/inworld.go (1)

43-61: Option validation is solid; consider nil-safety for credentials.

The empty key check (Lines 53-55) correctly fails fast on misconfiguration. However, vaultCredential.GetValue().AsMap() at Line 45 could panic if vaultCredential is nil or GetValue() returns nil. Consider adding a defensive check.

🛡️ Optional: Add nil-safety for credential access
 func NewInworldOption(logger commons.Logger, vaultCredential *protos.VaultCredential,
 	opts utils.Option) (*inworldOption, error) {
+	if vaultCredential == nil || vaultCredential.GetValue() == nil {
+		return nil, fmt.Errorf("inworld: vault credential is nil or empty")
+	}
 	cx, ok := vaultCredential.GetValue().AsMap()["key"]
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/inworld.go` around lines 43 -
61, The NewInworldOption constructor currently calls
vaultCredential.GetValue().AsMap() which can panic if vaultCredential or its
GetValue() is nil; add defensive nil checks at the start of NewInworldOption:
verify vaultCredential != nil and that v := vaultCredential.GetValue(); v != nil
before calling v.AsMap(), returning a clear error (e.g., "inworld: nil vault
credential" or "inworld: nil vault value") when either is nil, then proceed with
the existing map lookup and the rest of the inworldOption construction.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/inworld.go`:
- Around line 43-61: The NewInworldOption constructor currently calls
vaultCredential.GetValue().AsMap() which can panic if vaultCredential or its
GetValue() is nil; add defensive nil checks at the start of NewInworldOption:
verify vaultCredential != nil and that v := vaultCredential.GetValue(); v != nil
before calling v.AsMap(), returning a clear error (e.g., "inworld: nil vault
credential" or "inworld: nil vault value") when either is nil, then proceed with
the existing map lookup and the rest of the inworldOption construction.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dc37bf0b-d177-4ea2-a3b0-c4395950efa2

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 5adba9d.

📒 Files selected for processing (8)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go
  • api/assistant-api/internal/transformer/transformer_test.go

Addresses two coderabbitai nitpicks on PR rapidaai#100:

- Wrap the three WriteJSON errors in Transform() (create / send_text /
  close_context) with fmt.Errorf(... %w ...) so the underlying error is
  preserved in the chain for errors.Is / errors.As downstream.
- Add nil checks for vaultCredential and its GetValue() at the top of
  NewInworldOption so a missing credential returns a clear error instead
  of panicking inside AsMap().

Two new unit tests: TestNewInworldOption_NilCredential and
TestNewInworldOption_NilVaultValue.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

@coderabbitai addressed both nitpicks in dfffe85.

1. Error-chain preservation with %w

The three WriteJSON errors in Transform() — create frame, send_text, close_context — now wrap the underlying error:

return fmt.Errorf("inworld-tts: create frame write failed: %w", err)
return fmt.Errorf("inworld-tts: send_text write failed: %w", err)
return fmt.Errorf("inworld-tts: close_context write failed: %w", err)

Callers using errors.Is / errors.As (e.g. to detect websocket.CloseError or context cancellation) can now peel the chain.

2. Nil-safety in NewInworldOption

Added defensive nil checks before dereferencing:

if vaultCredential == nil {
    return nil, fmt.Errorf("inworld: nil vault credential")
}
val := vaultCredential.GetValue()
if val == nil {
    return nil, fmt.Errorf("inworld: nil vault value")
}

Two new unit tests cover both paths: TestNewInworldOption_NilCredential, TestNewInworldOption_NilVaultValue.

go test -race -count=1 ./internal/transformer/inworld/... passes cleanly.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

Tip

For best results, initiate chat on the files or code changes.

@cshape: Both fixes look great — the %w wrapping makes the error chain inspectable for callers relying on errors.Is/errors.As (e.g. detecting websocket.CloseError), and the nil guards on vaultCredential and its .GetValue() result are the right defensive posture for a constructor that surfaces errors early. The paired unit tests (TestNewInworldOption_NilCredential, TestNewInworldOption_NilVaultValue) round it off cleanly. Thanks for the thorough follow-through across all three rounds!


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
api/assistant-api/internal/transformer/inworld/tts.go (1)

324-332: Consider consistent packet ordering between success and error paths.

The packet emission order differs between handleFlushComplete (line 301-309: TextToSpeechEndPacket then ConversationEventPacket) and handleTurnError (line 324-332: ConversationEventPacket then TextToSpeechEndPacket). If downstream consumers rely on packet ordering, this inconsistency could cause subtle issues.

♻️ Suggested fix for consistency
 	_ = it.onPacket(
+		internal_type.TextToSpeechEndPacket{ContextID: boundCtxID},
 		internal_type.ConversationEventPacket{
 			ContextID: boundCtxID,
 			Name:      "tts",
 			Data:      map[string]string{"type": "error", "message": message},
 			Time:      time.Now(),
 		},
-		internal_type.TextToSpeechEndPacket{ContextID: boundCtxID},
 	)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 324 -
332, The error path in handleTurnError emits
internal_type.ConversationEventPacket before
internal_type.TextToSpeechEndPacket, which is the reverse of
handleFlushComplete; change handleTurnError so it calls it.onPacket with
internal_type.TextToSpeechEndPacket for boundCtxID first, then emits the
internal_type.ConversationEventPacket (type "error") to match the ordering used
by handleFlushComplete and keep packet ordering consistent for downstream
consumers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 324-332: The error path in handleTurnError emits
internal_type.ConversationEventPacket before
internal_type.TextToSpeechEndPacket, which is the reverse of
handleFlushComplete; change handleTurnError so it calls it.onPacket with
internal_type.TextToSpeechEndPacket for boundCtxID first, then emits the
internal_type.ConversationEventPacket (type "error") to match the ordering used
by handleFlushComplete and keep packet ordering consistent for downstream
consumers.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: e4da5b11-382e-49cd-ab48-307aa1243f38

📥 Commits

Reviewing files that changed from the base of the PR and between 5adba9d and dfffe85.

📒 Files selected for processing (3)
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
✅ Files skipped from review due to trivial changes (1)
  • api/assistant-api/internal/transformer/inworld/inworld_test.go

Rapida's upstream aggregator already splits LLM deltas at sentence
boundaries before calling Transform(), so the bidirectional WebSocket's
character-streaming capability was never being exploited. Replacing it
with one POST per sentence to voice:stream keeps latency competitive
(HTTP keep-alive + HTTP/2 multiplexing on a shared Transport) while
dropping:

- the per-turn writeMu (gorilla concurrent-write protection)
- the initCond / initializing / pendingConn dial-serialization dance
- the readLoop / handleFlushComplete / handleTurnError lifecycle
- the gorilla/websocket dependency and wsDialFunc injection surface
- the stale-conn / bound-ctxID defense-in-depth in the read path

New structure: one turnRunner per Rapida context drains a sentence
channel and fires one synth() HTTP POST per sentence. Interrupt
cancels the runner's context (aborting in-flight reads); Done closes
the channel (runner drains then emits TextToSpeechEndPacket).

tts.go: 595 → 468 LOC (426 → 332 non-blank/comment).

Tests rewritten against httptest:
- TestInworldTTSStreamsAudioInOrder
- TestInworldTTSEmitsEndAfterDone
- TestInworldTTSInterruptCancelsInFlight
- TestInworldTTSServerErrorSurfaces
- TestInworldTTSKeepAliveReusesConn (asserts single TCP accept across
  three sentences — the load-bearing assertion for latency parity)

All constructor/option tests retained unchanged.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

Pushed 9ced138 — switched from the bidirectional WebSocket to HTTP streaming.

Why: Rapida's text aggregator (aggregator/text/internal/default/default_llm_text_aggregator.go) already splits LLM deltas at sentence boundaries before Transform() sees them, so the WS's character-streaming capability was never being exercised in this architecture. HTTP streaming with keep-alive + HTTP/2 on a shared http.Transport matches the WS latency (single accept across N sentences in the test suite) while dropping several classes of review-hot complexity.

What went away:

  • per-turn writeMu (gorilla concurrent-write protection)
  • initCond / initializing / pendingConn dial-serialization dance
  • readLoop / handleFlushComplete / handleTurnError lifecycle
  • gorilla/websocket dependency and the wsDialFunc injection surface
  • stale-conn / bound-ctxID defense-in-depth in the read path

What stayed: inworldOption + constructor nil-safety fixes, voice/model defaults, LINEAR16@16kHz encoding, context-tagged conversation events, %w error wrapping.

Shape: one turnRunner goroutine per Rapida context drains a sentence channel and fires one POST /tts/v1/voice:stream per sentence. Interrupt cancels the runner context (aborts in-flight reads via http.NewRequestWithContext); Done closes the channel and the runner emits TextToSpeechEndPacket after draining.

tts.go: 595 → 468 LOC (426 → 332 non-blank/comment).

Tests rewritten against httptest.Server:

  • TestInworldTTSStreamsAudioInOrder
  • TestInworldTTSEmitsEndAfterDone
  • TestInworldTTSInterruptCancelsInFlight
  • TestInworldTTSServerErrorSurfaces
  • TestInworldTTSKeepAliveReusesConn — the latency-parity assertion: three sentences must reuse a single TCP connection

All pass under -race.

@coderabbitai please re-review.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

@cshape: The HTTP-streaming pivot is a solid architectural call — if the text aggregator already delivers sentence-complete chunks, there's no value in the WS character-streaming path, and trading the gorilla write-serialization + dial-serialization + stale-conn complexity for a shared http.Transport with keep-alive is a net win in both code surface and review risk. The TestInworldTTSKeepAliveReusesConn test as a latency-parity assertion is a particularly clean way to lock in the "still fast" property.

Kicking off a full re-review of the updated implementation now.

Please re-review the Inworld TTS transformer after a significant architectural change in commit 9ced138: switched from bidirectional WebSocket to HTTP streaming (POST /tts/v1/voice:stream per sentence via turnRunner goroutine). Dropped gorilla/websocket, writeMu, initCond/pendingConn, readLoop/handleFlushComplete/handleTurnError. Tests rewritten against httptest.Server. Focus on the new HTTP streaming path, turnRunner lifecycle, interrupt/done handling, connection reuse, and error surfacing.


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: When adding new STT/TTS providers, follow the complete guide in claude-skills/adding-stt-tts-provider.md covering provider registry, UI config components, backend transformer, factory wiring, required events/metrics, and checklist
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (3)
api/assistant-api/internal/transformer/inworld/tts.go (2)

330-333: Consider limiting error body read size.

io.ReadAll(resp.Body) on error responses could read an unbounded amount of data if the server misbehaves. Consider using io.LimitReader:

♻️ Suggested fix
 	if resp.StatusCode != http.StatusOK {
-		respBody, _ := io.ReadAll(resp.Body)
+		respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 4096))
 		return fmt.Errorf("inworld-tts: unexpected status %d: %s", resp.StatusCode, string(respBody))
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 330 -
333, The error path that reads the HTTP response body after a non-200 in the TTS
call uses io.ReadAll(resp.Body) which can exhaust memory; change it to read a
bounded amount using io.LimitReader (e.g., wrap resp.Body with
io.LimitReader(resp.Body, maxErrorBodySize)) and decode that limited reader into
a []byte for the error message; introduce a small constant like maxErrorBodySize
(e.g., 4KB or 16KB) near the function and use it when constructing the formatted
error in the block that checks resp.StatusCode != http.StatusOK so the returned
error includes the truncated body safely.

171-184: Potential blocking on full sentence buffer.

The sentences channel has a buffer of 8 (line 224). If the LLM produces deltas faster than synth() can process them, tr.sentences <- input.Text could block, stalling the caller. This is likely fine in practice since:

  1. Each "sentence" from the aggregator is already a complete phrase
  2. HTTP streaming latency limits throughput

However, consider whether this buffer size is appropriate for your expected delta rate.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 171 -
184, The send to tr.sentences in the LLMResponseDeltaPacket case can block if
the buffer fills (tr.sentences <- input.Text), stalling the caller; change this
to a non-blocking send (use a select with tr.sentences <- input.Text case and a
default case that drops or counts/logs the overflow) or alternatively spawn a
goroutine to enqueue so it never blocks the caller, or increase the channel
buffer size; update code around getOrCreateRunner / tr.sentences and ensure any
dropped-delta behavior is accounted for (and consider logging/metrics when the
default branch is taken) so synth() and onPacket logic remain unchanged.
api/assistant-api/internal/transformer/inworld/inworld_integration_test.go (1)

9-12: Stale comment references WebSocket endpoint.

The comment mentions wss://api.inworld.ai but the implementation now uses HTTP streaming per the architectural change noted in the PR summary. Update to reflect the actual endpoint.

-// Inworld integration tests — verify connection, event sequencing, and
-// audio I/O against the real wss://api.inworld.ai TTS endpoint. Skipped
+// Inworld integration tests — verify connection, event sequencing, and
+// audio I/O against the real https://api.inworld.ai TTS endpoint. Skipped
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/inworld_integration_test.go`
around lines 9 - 12, Update the top-of-file comment in
inworld_integration_test.go to remove the stale WebSocket URL and describe that
tests use the HTTP streaming TTS endpoint (instead of wss://api.inworld.ai) —
keep the rest of the context about verifying connection, event sequencing, audio
I/O, and the automatic skip when integration_config.yaml is missing or the
provider is disabled; edit the file-level comment block near the test header so
it accurately references HTTP streaming/HTTP endpoint rather than WebSocket.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/inworld_integration_test.go`:
- Around line 9-12: Update the top-of-file comment in
inworld_integration_test.go to remove the stale WebSocket URL and describe that
tests use the HTTP streaming TTS endpoint (instead of wss://api.inworld.ai) —
keep the rest of the context about verifying connection, event sequencing, audio
I/O, and the automatic skip when integration_config.yaml is missing or the
provider is disabled; edit the file-level comment block near the test header so
it accurately references HTTP streaming/HTTP endpoint rather than WebSocket.

In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 330-333: The error path that reads the HTTP response body after a
non-200 in the TTS call uses io.ReadAll(resp.Body) which can exhaust memory;
change it to read a bounded amount using io.LimitReader (e.g., wrap resp.Body
with io.LimitReader(resp.Body, maxErrorBodySize)) and decode that limited reader
into a []byte for the error message; introduce a small constant like
maxErrorBodySize (e.g., 4KB or 16KB) near the function and use it when
constructing the formatted error in the block that checks resp.StatusCode !=
http.StatusOK so the returned error includes the truncated body safely.
- Around line 171-184: The send to tr.sentences in the LLMResponseDeltaPacket
case can block if the buffer fills (tr.sentences <- input.Text), stalling the
caller; change this to a non-blocking send (use a select with tr.sentences <-
input.Text case and a default case that drops or counts/logs the overflow) or
alternatively spawn a goroutine to enqueue so it never blocks the caller, or
increase the channel buffer size; update code around getOrCreateRunner /
tr.sentences and ensure any dropped-delta behavior is accounted for (and
consider logging/metrics when the default branch is taken) so synth() and
onPacket logic remain unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: a6f50131-b911-4cdd-b038-ab735fc58e34

📥 Commits

Reviewing files that changed from the base of the PR and between dfffe85 and 9ced138.

📒 Files selected for processing (5)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
✅ Files skipped from review due to trivial changes (1)
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • api/assistant-api/internal/transformer/inworld/internal/type.go

… comment

- synth(): wrap the error-path resp.Body in io.LimitReader(4 KiB) so a
  misbehaving server can't force an unbounded buffer. 4 KiB is plenty
  for any JSON error envelope Inworld actually returns.
- inworld_integration_test.go: replace the stale wss:// endpoint
  reference with the real https://api.inworld.ai HTTP streaming URL.

Declined: the suggestion to drop sentences on a full runner buffer.
The 8-slot channel is backpressure, not a leak — silently dropping
audio would be a correctness regression. Coderabbit's own note
acknowledged the current shape is "likely fine in practice."
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

Thanks for the round 3 pass! Addressed in 496ab60:

  • Error body read cap — fixed. Wrapped the non-200 resp.Body in io.LimitReader(4*1024) so a misbehaving server can't force us to buffer an unbounded response. 4 KiB is plenty for any JSON error envelope Inworld actually returns.
  • Stale WS comment in inworld_integration_test.go — fixed. Now references https://api.inworld.ai + POST /tts/v1/voice:stream.

Declined: sentence-channel drop-on-full. The 8-slot sentences channel is intentional backpressure, not a leak. Silently dropping a sentence on a full buffer would be a correctness regression — the user would hear half the assistant's turn with no indication anything went wrong. Blocking the Transform() caller is the right failure mode here: it applies backpressure up into the LLM pipeline, which is exactly what we want when TTS can't keep up. Your own note acknowledged "likely fine in practice since each sentence is already a complete phrase and HTTP streaming latency limits throughput" — I agree, and leaving the current shape in matches that reasoning.

@coderabbitai ready for re-review.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

@cshape: Both fixes look clean:

  • io.LimitReader(4*1024) on the non-200 body path is the right call — 4 KiB is more than sufficient for any Inworld error envelope, and it closes the unbounded-buffer risk cleanly.
  • Stale WS comment — good housekeeping, aligns the test file with the current HTTP streaming design.

On the declined item: the backpressure argument is correct. Silently dropping a sentence would be a silent correctness regression (partial audio output with no signal to the caller), whereas blocking Transform() propagates pressure up through the LLM pipeline, which is exactly the right behaviour when TTS falls behind. Happy to close that out.

Kicking off the re-review now.

Please re-review the Inworld TTS transformer after round 4 fixes in commit 496ab60: non-200 error body capped with io.LimitReader(4*1024) and stale WebSocket comment in inworld_integration_test.go updated to reference https://api.inworld.ai + POST /tts/v1/voice:stream. The sentence-channel blocking behaviour was intentionally retained as backpressure. Focus on verifying the error-body cap and any remaining issues in the HTTP streaming path.


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go
✅ Actions performed

Full review triggered.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

@cshape: The HTTP-streaming pivot is a solid architectural call — if the text aggregator already delivers sentence-complete chunks, there's no value in the WS character-streaming path, and trading the gorilla write-serialization + dial-serialization + stale-conn complexity for a shared http.Transport with keep-alive is a net win in both code surface and review risk. The TestInworldTTSKeepAliveReusesConn test as a latency-parity assertion is a particularly clean way to lock in the "still fast" property.

Kicking off a full re-review of the updated implementation now.

Please re-review the Inworld TTS transformer after a significant architectural change in commit 9ced138: switched from bidirectional WebSocket to HTTP streaming (POST /tts/v1/voice:stream per sentence via turnRunner goroutine). Dropped gorilla/websocket, writeMu, initCond/pendingConn, readLoop/handleFlushComplete/handleTurnError. Tests rewritten against httptest.Server. Focus on the new HTTP streaming path, turnRunner lifecycle, interrupt/done handling, connection reuse, and error surfacing.


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/type/**/*.go : Define core voice AI pipeline interfaces (Communication, VAD, EndOfSpeech, Denoiser, Resampler, Recorder, Streamer, Aggregator, Normalizer, Transformers) in api/assistant-api/internal/type/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: When adding new STT/TTS providers, follow the complete guide in claude-skills/adding-stt-tts-provider.md covering provider registry, UI config components, backend transformer, factory wiring, required events/metrics, and checklist

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/end_of_speech/**/*.go : End-of-speech provider implementations should be scoped to `api/assistant-api/internal/end_of_speech/internal/<provider>/` with core logic in `api/assistant-api/internal/end_of_speech/end_of_speech.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/denoiser/**/*.go : Noise reduction provider implementations should be scoped to `api/assistant-api/internal/denoiser/internal/<provider>/` with core logic in `api/assistant-api/internal/denoiser/denoiser.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/vad/**/*.go : VAD provider implementations should be scoped to `api/assistant-api/internal/vad/internal/<provider>/` with core logic in `api/assistant-api/internal/vad/vad.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/assistant/**/*.go : Implement assistant, provider, tool, and webhook CRUD handlers in assistant/ package
✅ Actions performed

Full review triggered.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/assistant-api/internal/transformer/inworld/inworld_integration_test.go`:
- Around line 9-12: Update the file header comment in
inworld_integration_test.go so it references the HTTP transport (and the HTTP
TTS endpoint) instead of the old WebSocket path; replace mentions of "WebSocket"
and "wss://api.inworld.ai" with wording like "HTTP transport" and
"https://api.inworld.ai" or "api.inworld.ai HTTP TTS endpoint" so the header
accurately describes the current integration test target.

In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 139-168: The interruption branch currently cancels and deletes
every entry in it.turns regardless of which context was signalled; change it to
only cancel/delete runner(s) whose context id matches input.ContextID (or whose
turn metadata indicates they belong to that ContextID) instead of iterating all
entries. Inside the internal_type.InterruptionDetectedPacket case, acquire
it.mu, collect only matching tr.runCancel funcs and remove only those keys from
it.turns, update it.activeContext only if it.activeContext == input.ContextID,
then unlock and invoke the collected cancels and the same it.onPacket(...) calls
to emit the interrupted/initialized events.
- Around line 257-293: The runner goroutine reading from tr.sentences can stay
blocked forever when the runner is removed (via
InterruptionDetectedPacket/Close) because no one wakes the receive; modify the
shutdown path so removing a runner also signals/unblocks its goroutine by
closing or sending a terminal sentinel on tr.sentences: add a safe-close
mechanism (e.g., a tr.closeSentences() that uses sync.Once) and invoke it from
runCancel/Close/InterruptionDetectedPacket removal code; update the synth loop
(the range over tr.sentences in the goroutine that calls it.synth) to properly
handle the channel close (exit cleanly without emitting duplicate
TextToSpeechEndPacket) so the goroutine always terminates when the runner is
canceled or removed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 1688fd86-f469-49f2-ba61-f899a1960f59

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 9ced138.

📒 Files selected for processing (8)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go
  • api/assistant-api/internal/transformer/transformer_test.go

Comment thread api/assistant-api/internal/transformer/inworld/inworld_integration_test.go Outdated
Comment thread api/assistant-api/internal/transformer/inworld/tts.go
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Two real bugs from the round-4 review:

1. **Idle runner leak.** The per-turn goroutine was ranging over
   tr.sentences. runCancel() cancels the context but does NOT wake a
   goroutine parked on a channel receive — so if an interrupt fired
   while the runner was idle between synths, it stayed parked forever.
   Replace the range with a select on both runCtx.Done() AND the
   sentence channel. The select is the load-bearing change: a channel
   close (Done path) still drains terminally, and a context cancel
   (interrupt path) wakes and returns without a terminal end packet.
   Also guard against the Done-then-Interrupt race in the !ok branch
   so a race can't emit a spurious end packet after an interrupt.

2. **Interrupt scope.** InterruptionDetectedPacket was tearing down
   every live runner regardless of input.ContextID. In Rapida's
   pipeline only one turn is usually active, but if two contexts ever
   synthesize in parallel this would drop audio/end packets for the
   bystander. Scope the cancellation to input.ContextID when set;
   fall through to the original session-wide teardown only if the
   ContextID is empty (defensive — the dispatcher should always set
   it). Only clear activeContext if it matches the interrupted ID.

New regression tests:

- TestInworldTTSInterruptWakesIdleRunner — Delta, wait for audio, fire
  interrupt; runner must be reaped within 2s. Times out without the
  select fix.
- TestInworldTTSInterruptScopedToContext — two turns running in
  parallel (turn B held open by a release channel); interrupt turn A
  and assert turn B still produces its terminal end packet.

All 18 unit tests pass under -race.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

Round 4 addressed in 6a1d977. Two real bugs fixed plus a confirmation on the third.

#1 Idle runner leak (major) — FIXED. Well caught. The runner's `for text := range tr.sentences` parked forever if an interrupt fired while it was idle between synths, because `runCancel()` doesn't wake a channel receive. Replaced with a `select` on both `runCtx.Done()` and the sentence channel:

```go
for {
select {
case <-tr.runCtx.Done():
return // interrupt or Close — no end packet
case text, ok := <-tr.sentences:
if !ok {
if tr.runCtx.Err() != nil { return } // Done-then-Interrupt race guard
// emit terminal end packet
return
}
// synth(); on runCtx.Err() return, on other err log + continue
}
}
```

Regression test `TestInworldTTSInterruptWakesIdleRunner` drives the exact parked-goroutine state — Delta, wait for audio, interrupt — and asserts the runner is reaped within 2s. Times out without the select fix.

#2 Interrupt scope (major) — FIXED. Also well caught. The branch was tearing down every live runner regardless of `input.ContextID`. In Rapida's pipeline only one turn is usually active, but the scope mismatch would drop audio/end packets for a bystander turn if two contexts ever run in parallel. Now:

  • If `input.ContextID` is set → cancel only that runner, clear `activeContext` only if it matches.
  • If empty (defensive — shouldn't happen) → fall through to the old session-wide teardown so an interrupt is never silently ignored.

Regression test `TestInworldTTSInterruptScopedToContext` holds turn B's synth open via a release channel, interrupts turn A, and asserts turn B still produces its terminal end packet.

#3 Stale WS comment in `inworld_integration_test.go` — already fixed in 496ab60 (previous round). Header now references `https://api.inworld.ai\` + `POST /tts/v1/voice:stream`. Looks like the round-4 diff view was reading an older revision.

All 18 unit tests pass under `-race`.
@coderabbitai ready for re-review.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

Tip

For best results, initiate chat on the files or code changes.

@cshape: Both fixes are well-executed — thanks for the thorough round 5 write-up.

#1 — Idle runner leak (select fix): The select { case <-runCtx.Done(): ... case text, ok := <-tr.sentences: ... } shape is the correct Go idiom here — a plain range over a channel can't be unblocked by a context cancellation, so the goroutine would have parked until the channel was closed (which an interrupt never does). The Done-then-Interrupt race guard (if tr.runCtx.Err() != nil { return } after the channel-closed branch) is the right defensive check for the narrow window where Done closes the channel and an Interrupt races in before the runner exits. TestInworldTTSInterruptWakesIdleRunner with the 2s timeout is a clean regression pin for this.

#2 — Interrupt scope: Scoping the cancel to input.ContextID and only clearing activeContext when it matches is the correct fix — the prior session-wide teardown was over-broad. The empty-ContextID fallback to session-wide teardown is the right defensive posture (interrupt must never be silently ignored). TestInworldTTSInterruptScopedToContext holding turn B's synth open via a release channel is a solid concurrent regression test for this.

#3 — Stale comment: Confirmed already handled in 496ab60 — the round 4 diff view was just picking up an older tree revision.

Kicking off a fresh review now.

[full_review]


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (2)
api/assistant-api/internal/transformer/inworld/tts.go (2)

139-168: ⚠️ Potential issue | 🟠 Major

Interrupt should only cancel the matching turn.

Line 141 walks the entire it.turns map, so interrupting one ContextID tears down every other active synthesis on the transformer.

🔧 Suggested fix
 	case internal_type.InterruptionDetectedPacket:
 		it.mu.Lock()
-		cancels := make([]context.CancelFunc, 0, len(it.turns))
-		for id, tr := range it.turns {
-			cancels = append(cancels, tr.runCancel)
-			delete(it.turns, id)
-		}
-		it.activeContext = ""
+		tr, ok := it.turns[input.ContextID]
+		if ok {
+			delete(it.turns, input.ContextID)
+		}
+		if it.activeContext == input.ContextID {
+			it.activeContext = ""
+		}
 		it.mu.Unlock()
-		for _, c := range cancels {
-			c()
+		if ok {
+			tr.runCancel()
 		}
 		_ = it.onPacket(internal_type.ConversationEventPacket{
 			ContextID: input.ContextID,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 139 -
168, The interrupt handler for internal_type.InterruptionDetectedPacket is
cancelling all entries in it.turns instead of only the turn matching the
incoming packet's ContextID; change the logic in that case to locate the
specific turn by input.ContextID (or input.ContextID -> matching key in
it.turns), call its runCancel, remove only that entry from it.turns, and leave
other turns intact; also ensure activeContext and the subsequent onPacket calls
still reflect the interrupted ContextID (use it.activeContext = "" only when the
interrupted turn was the active one) and retain the existing initialized/tts
event emission behavior after cancelling the single matching turn.

257-283: ⚠️ Potential issue | 🟠 Major

Canceled idle runners can leak forever.

runCancel() only aborts an in-flight synth(). When a runner is idle on for text := range tr.sentences, canceling it from InterruptionDetectedPacket or Close() never wakes the goroutine, so it remains parked after being removed from it.turns.

🔧 Suggested fix
-	interrupted := false
-	for text := range tr.sentences {
-		if err := it.synth(tr.runCtx, tr, text); err != nil {
-			if tr.runCtx.Err() != nil {
-				// Canceled by an interrupt — drain any remaining queued
-				// sentences and exit without a terminal end packet.
-				interrupted = true
-				break
-			}
-			it.logger.Errorf("inworld-tts: synth failed for ctx=%s: %v", tr.ctxID, err)
-			// Surface the error to the observability channel but keep
-			// draining — subsequent sentences may succeed and, critically,
-			// callers waiting on TextToSpeechEndPacket still need one.
-			_ = it.onPacket(internal_type.ConversationEventPacket{
-				ContextID: tr.ctxID,
-				Name:      "tts",
-				Data:      map[string]string{"type": "error", "message": err.Error()},
-				Time:      time.Now(),
-			})
-		}
-	}
-
-	if interrupted {
-		// Interrupt path already emitted its own tts=interrupted event on
-		// the Transform side; nothing more to send here.
-		return
-	}
-
-	_ = it.onPacket(
-		internal_type.TextToSpeechEndPacket{ContextID: tr.ctxID},
-		internal_type.ConversationEventPacket{
-			ContextID: tr.ctxID,
-			Name:      "tts",
-			Data:      map[string]string{"type": "completed"},
-			Time:      time.Now(),
-		},
-	)
+	for {
+		select {
+		case <-tr.runCtx.Done():
+			return
+		case text, ok := <-tr.sentences:
+			if !ok {
+				_ = it.onPacket(
+					internal_type.TextToSpeechEndPacket{ContextID: tr.ctxID},
+					internal_type.ConversationEventPacket{
+						ContextID: tr.ctxID,
+						Name:      "tts",
+						Data:      map[string]string{"type": "completed"},
+						Time:      time.Now(),
+					},
+				)
+				return
+			}
+			if err := it.synth(tr.runCtx, tr, text); err != nil {
+				if tr.runCtx.Err() != nil {
+					return
+				}
+				it.logger.Errorf("inworld-tts: synth failed for ctx=%s: %v", tr.ctxID, err)
+				_ = it.onPacket(internal_type.ConversationEventPacket{
+					ContextID: tr.ctxID,
+					Name:      "tts",
+					Data:      map[string]string{"type": "error", "message": err.Error()},
+					Time:      time.Now(),
+				})
+			}
+		}
+	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 257 -
283, The goroutine in the loop "for text := range tr.sentences" can hang if
tr.sentences never closes because runCancel only aborts synth; modify the loop
to select on tr.sentences and tr.runCtx.Done() instead of ranging so
cancellation wakes the goroutine (e.g., replace the range with: for { select {
case text, ok := <-tr.sentences: if !ok { break } if err := it.synth(...); ... }
case <-tr.runCtx.Done(): interrupted = true; break } }), and ensure callers that
signal interruption (InterruptionDetectedPacket handling and Close()) cancel the
runCtx or close the channel consistently so the new select branch unblocks;
update references to synth, runCancel, runCtx, tr.sentences,
InterruptionDetectedPacket and Close accordingly.
🧹 Nitpick comments (1)
api/assistant-api/internal/transformer/inworld/tts.go (1)

82-85: Wrap the constructor error before returning it.

This logs context but returns the naked err, so callers lose the NewInworldTextToSpeech frame in the error chain.

♻️ Suggested change
 	iwOpts, err := NewInworldOption(logger, credential, opts)
 	if err != nil {
 		logger.Errorf("inworld-tts: initializing inworld failed %+v", err)
-		return nil, err
+		return nil, fmt.Errorf("inworld-tts: initialize options: %w", err)
 	}

As per coding guidelines "**/*.go: Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@api/assistant-api/internal/transformer/inworld/tts.go` around lines 82 - 85,
The constructor logs the error from NewInworldOption but returns the raw err
losing call-site context; update the error return in NewInworldTextToSpeech to
wrap err using fmt.Errorf with the %w verb (e.g., fmt.Errorf("inworld-tts:
initializing inworld failed: %w", err)) so the error chain preserves the
NewInworldTextToSpeech frame while keeping the existing logger.Errorf call that
logs the detailed error.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Duplicate comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 139-168: The interrupt handler for
internal_type.InterruptionDetectedPacket is cancelling all entries in it.turns
instead of only the turn matching the incoming packet's ContextID; change the
logic in that case to locate the specific turn by input.ContextID (or
input.ContextID -> matching key in it.turns), call its runCancel, remove only
that entry from it.turns, and leave other turns intact; also ensure
activeContext and the subsequent onPacket calls still reflect the interrupted
ContextID (use it.activeContext = "" only when the interrupted turn was the
active one) and retain the existing initialized/tts event emission behavior
after cancelling the single matching turn.
- Around line 257-283: The goroutine in the loop "for text := range
tr.sentences" can hang if tr.sentences never closes because runCancel only
aborts synth; modify the loop to select on tr.sentences and tr.runCtx.Done()
instead of ranging so cancellation wakes the goroutine (e.g., replace the range
with: for { select { case text, ok := <-tr.sentences: if !ok { break } if err :=
it.synth(...); ... } case <-tr.runCtx.Done(): interrupted = true; break } }),
and ensure callers that signal interruption (InterruptionDetectedPacket handling
and Close()) cancel the runCtx or close the channel consistently so the new
select branch unblocks; update references to synth, runCancel, runCtx,
tr.sentences, InterruptionDetectedPacket and Close accordingly.

---

Nitpick comments:
In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 82-85: The constructor logs the error from NewInworldOption but
returns the raw err losing call-site context; update the error return in
NewInworldTextToSpeech to wrap err using fmt.Errorf with the %w verb (e.g.,
fmt.Errorf("inworld-tts: initializing inworld failed: %w", err)) so the error
chain preserves the NewInworldTextToSpeech frame while keeping the existing
logger.Errorf call that logs the detailed error.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 084ca9b2-b8fe-4230-9d85-33b944e43d0b

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 496ab60.

📒 Files selected for processing (8)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go
  • api/assistant-api/internal/transformer/transformer_test.go

Attach `X-User-Agent: rapida-sdk` on each Inworld synth request so
Inworld can bucket TTS traffic by integration for usage telemetry.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@api/assistant-api/internal/transformer/inworld/inworld_test.go`:
- Around line 531-535: Replace the raw r.Body.Read usage with a JSON decoder:
use json.NewDecoder(r.Body).Decode into a temporary map or struct (same pattern
used in TestInworldTTSStreamsAudioInOrder) instead of reading a 256-byte buffer,
then inspect the decoded field (e.g., the text or payload key) to determine if
it contains "B-text"; update the conditional that currently checks
strings.Contains(body, "B-text") to check the decoded value instead. Ensure you
handle/propagate the decode error in the test to fail on malformed input.

In `@api/assistant-api/internal/transformer/inworld/tts.go`:
- Around line 192-202: The code currently always emits a "tts" speaking event
via it.onPacket even when the sentence was dropped by tr.runCtx.Done(); move the
it.onPacket call into the successful send branch so it only runs when
tr.sentences <- input.Text actually succeeds—i.e., in the case that sends to
tr.sentences (not after the select), call it.onPacket with the
internal_type.ConversationEventPacket containing ContextID: input.ContextID and
Data {"type":"speaking","text":input.Text}; do not emit the event in the
<-tr.runCtx.Done() branch.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c60f8041-5202-4892-963a-fd12ce866cda

📥 Commits

Reviewing files that changed from the base of the PR and between a1aef98 and 6a1d977.

📒 Files selected for processing (8)
  • api/assistant-api/internal/transformer/inworld/internal/type.go
  • api/assistant-api/internal/transformer/inworld/inworld.go
  • api/assistant-api/internal/transformer/inworld/inworld_integration_test.go
  • api/assistant-api/internal/transformer/inworld/inworld_test.go
  • api/assistant-api/internal/transformer/inworld/tts.go
  • api/assistant-api/internal/transformer/testdata/integration_config.yaml.example
  • api/assistant-api/internal/transformer/transformer.go
  • api/assistant-api/internal/transformer/transformer_test.go

Comment thread api/assistant-api/internal/transformer/inworld/inworld_test.go Outdated
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Comment thread api/assistant-api/internal/transformer/inworld/tts.go Outdated
Three issues from round 5:

1. **Channel-close race (major).** close(tr.sentences) from Transform's
   Done path can race catastrophically with a concurrent delta send on
   the same channel (panic: send on closed channel) or with a
   duplicate Done packet (panic: close of closed channel). Rapida's
   dispatcher does not guarantee serial Transform calls per
   ContextID, so both races are reachable.

   Fix: introduce a sentenceItem { text, done } wire type and carry
   the Done signal as an in-band sentinel. Nothing ever closes the
   channel; "send on closed" and "close of closed" become structurally
   impossible. Transform's Done path does a select-send of
   {done: true} guarded by runCtx.Done (so an interrupt that already
   tore down the runner doesn't deadlock). run()'s receive branches
   on item.done to emit the terminal end packet.

2. **Stray speaking event for a dropped delta (minor).** The delta
   path emitted the tts=speaking event unconditionally, even when
   runCtx.Done won the select and the text was discarded. Moved the
   emission into the successful-send branch so we never publish
   "speaking: X" for text the runner will never synthesize.

3. **Unreliable body read in test (minor).** A single r.Body.Read into
   a stack buffer is permitted to short-read under io.Reader. Replace
   with json.NewDecoder — matches the pattern already used in
   TestInworldTTSStreamsAudioInOrder, and removes the now-unused
   strings import as a byproduct.

All 18 unit tests still pass under -race.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

Round 5 addressed in 0e6b252.

#1 Channel-close race (major) — FIXED. Real bug, good catch. `close(tr.sentences)` from Transform's Done path was racy on two dimensions: a concurrent delta send on the same channel panics with send on closed channel, and a duplicate Done panics with close of closed channel. Rapida's dispatcher doesn't guarantee serial Transform calls per ContextID, so both races are reachable in theory.

Fix: introduced a `sentenceItem { text, done }` type and carry the Done signal as an in-band sentinel. Nothing ever closes the channel, so both panics are structurally impossible. Transform's Done path does a select-send of `{done: true}` guarded by `runCtx.Done()` so an interrupt that already reaped the runner doesn't deadlock the send.

```go
// Transform, Done path:
select {
case tr.sentences <- sentenceItem{done: true}:
case <-tr.runCtx.Done():
}

// run() receive branch:
case item := <-tr.sentences:
if item.done {
if tr.runCtx.Err() != nil { return } // Interrupt raced Done
// emit terminal end packet
return
}
synth(item.text)
```

#2 Stray speaking event (minor) — FIXED. The `tts=speaking` event was emitted unconditionally, even when `runCtx.Done` won the select and the text was dropped. Moved the emission into the successful-send branch so we never publish "speaking: X" for text the runner will never synthesize.

#3 `r.Body.Read` in test (minor) — FIXED. Swapped to `json.NewDecoder` to match the pattern in `TestInworldTTSStreamsAudioInOrder`. Removed the now-unused `strings` import as a byproduct.

All 18 unit tests still pass under `-race`.
@coderabbitai ready for re-review.

@coderabbitai

coderabbitai Bot commented Apr 15, 2026

Copy link
Copy Markdown

Tip

For best results, initiate chat on the files or code changes.

@cshape: All three round 5 fixes look correct and well-executed.

#1 — Channel-close race: The sentenceItem sentinel approach is the right structural fix — by never closing the channel, the "send on closed" and "close of closed" panics are made structurally impossible rather than guarded against at runtime. The select { case tr.sentences <- sentenceItem{done: true}: case <-tr.runCtx.Done(): } guard correctly handles the Interrupt-races-Done window, and the symmetric if tr.runCtx.Err() != nil { return } in run() covers the reverse ordering. Clean solution.

#2 — Stray speaking event: Moving the tts=speaking emission into the successful-send branch is the right fix — emitting "speaking: X" for text the runner will never synthesize would give downstream consumers a false signal about what audio to expect.

#3json.NewDecoder in test: Good consistency cleanup; removing the now-unused strings import is a nice byproduct.

Kicking off a fresh review now.

[full_review]


🧠 Learnings used
Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-04-02T09:37:15.419Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT and TTS provider implementations should be scoped to `api/assistant-api/internal/transformer/<provider>/` with core logic in `api/assistant-api/internal/transformer/transformer.go`

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Support voice providers (AssemblyAI, AWS, Azure, Cartesia, Deepgram, ElevenLabs, Google, OpenAI, Resemble, Rev.ai, Sarvam, Speechmatics) with transformers in assistant-api/internal/transformer/

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: .github/copilot-instructions.md:0-0
Timestamp: 2026-04-02T09:36:44.972Z
Learning: Applies to **/transformer/**/*.go : Implement STT/TTS transformers using provider-specific structure under api/assistant-api/internal/transformer/<provider>/ with stt.go, tts.go, normalizer.go, and internal/ subdirectory for SDK wrappers

Learnt from: CR
Repo: rapidaai/voice-ai PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-02T09:37:01.760Z
Learning: Applies to api/assistant-api/internal/transformer/**/*.go : STT/TTS provider implementations under api/assistant-api/internal/transformer/ must implement Transformers[AudioPacket] (STT) and Transformers[TextPacket] (TTS) interfaces, with files following the pattern: <provider>.go, stt.go, tts.go, normalizer.go

cshape added 2 commits April 15, 2026 16:48
…ic doc

Three small cleanups surfaced by a consistency + Go-idiom review:

1. **Drain non-200 response body.** synth() reads up to 4 KiB via
   io.LimitReader on the error path, but leaves the rest unread. Under
   HTTP/2, http.Transport won't return the underlying conn to the idle
   pool until the body is fully consumed — so a larger error response
   silently defeats keep-alive on the very conn the next sentence
   would reuse. Add io.Copy(io.Discard, resp.Body) after the limited
   read.

2. **Clear activeContext in run()'s exit defer.** It's set in
   getOrCreateRunner and cleared on the Interrupt path, but was
   previously left stale on the Done path — so diagnostics kept
   reporting a finished ctx id as "active". Mirror the Interrupt path
   and clear it if it still points at us.

3. **Tighten maybeEmitFirstChunkMetric doc.** The prior comment said
   "runner is the only writer," which is misleading — tr.startedAt is
   set in getOrCreateRunner on the *caller* goroutine. The memory
   ordering is still safe (the write happens-before `go it.run`), but
   the comment should say so rather than overstate single-writer
   ownership.

No behavior change for tr.metricSent; only the docstring is updated.
Two threads of improvement from the round-6 review:

**Test injectability.** The prior tests wrapped their HTTP client with
a rewritingRoundTripper that rewrote every request's host from
api.inworld.ai to the httptest server — ~40 LOC of plumbing to work
around a hard-coded constant in synth(). Move the target URL onto the
inworldTTS struct as a `streamURL` field (initialized to
INWORLD_STREAM_URL by NewInworldTextToSpeech, overridable by
in-package tests). synth() now reads it off the struct. Tests set
streamURL=srv.URL directly and drop the rewriter. No production API
change.

**New regression tests** covering branches the suite previously
skipped:

- TestInworldTTSHTTPErrorStatus — 401 response surfaces as tts=error
  AND still emits a terminal end packet so callers unblock.
- TestInworldTTSMalformedChunkSkipped — one malformed-JSON line and
  one bad-base64 line mid-stream are logged and dropped; valid chunks
  before and after still flow through.
- TestInworldTTSUnsupportedPacketErrors — the default: branch of
  Transform returns a clean error for an unknown packet type, no
  panic, no leaked goroutines.
- TestInworldTTSDoneWithoutDelta — Done fired for a ctx that never
  produced a delta is a no-op (matches the empty-LLM-response case).
- TestInworldTTSDuplicateDone — firing Done twice doesn't panic
  (would have hit close-of-closed-channel before the sentinel-item
  refactor) and emits exactly one terminal end packet.

All 23 unit tests pass under -race.
@cshape

cshape commented Apr 15, 2026

Copy link
Copy Markdown
Author

Round 6 — ran three parallel code-review agents (consistency vs. peers, Go idioms, test quality) and landed their actionable findings in two commits.

ef3b755 — correctness + cleanup.

  • Drain non-200 response body. `synth()` was reading up to 4 KiB via `io.LimitReader` on the error path then returning, leaving the tail of the body on the wire. Under HTTP/2, `http.Transport` won't return the conn to the idle pool until the body is fully consumed, so a larger error response silently defeated keep-alive on the very conn the next sentence would reuse. Added `io.Copy(io.Discard, resp.Body)` after the limited read.
  • Clear `activeContext` on Done path. The Interrupt path already cleared it; the Done path left it stale. Diagnostics-only, but inconsistent — now cleared symmetrically in `run()`'s exit defer.
  • Tighten `maybeEmitFirstChunkMetric` doc comment. Prior comment claimed "runner is the only writer." Misleading: `tr.startedAt` is set in `getOrCreateRunner` on the parent goroutine. The memory ordering is still safe (happens-before `go it.run`), but the comment now says so explicitly.

3d11ec5 — testability + missing coverage.

  • Expose `streamURL` as an injectable field. Previously the test suite worked around the hard-coded `INWORLD_STREAM_URL` constant with a ~40-LOC `rewritingRoundTripper` that rewrote every request's host. Moved the target URL onto `inworldTTS` as a `streamURL` field, initialized by the production constructor. Tests now just set `streamURL=srv.URL` and use a plain `*http.Client`. No production API change; the factory constructor signature is untouched.
  • 5 new regression tests for branches the suite previously didn't exercise:
    • `TestInworldTTSHTTPErrorStatus` — 401 surfaces as `tts=error` AND still produces a terminal end packet.
    • `TestInworldTTSMalformedChunkSkipped` — malformed-JSON and bad-base64 lines mid-stream are dropped; valid chunks before and after still flow.
    • `TestInworldTTSUnsupportedPacketErrors` — the `default:` branch of Transform returns a clean error, no panic.
    • `TestInworldTTSDoneWithoutDelta` — Done for an unknown ctx is a no-op.
    • `TestInworldTTSDuplicateDone` — firing Done twice doesn't panic (would have hit close-of-closed-channel before the round-5 sentinel refactor) and produces exactly one terminal end packet.

Also reviewed and rejected (documenting the reasoning):

  • An alleged `getOrCreateRunner` race (two Transform calls concurrently installing runners for a new ctx id) — false alarm, the map-check and map-write are under a single `it.mu.Lock()`.
  • Divergence from peer transformers on a few axes (custom `*http.Transport`, per-turn goroutine + sentence channel, scoped-to-ContextID interrupt) — all intentional; the transport tuning is load-bearing for the keep-alive reuse assertion, the sentence-channel matches the per-sentence HTTP model, and the scoped interrupt was the explicit round-4 fix.

23 unit tests, all green under `-race`.

cshape and others added 6 commits April 15, 2026 17:03
Inworld's voice:stream endpoint wraps each NDJSON line's LINEAR16
payload in a minimal RIFF/WAVE container — every ~20ms chunk on the
wire is a complete mini-WAV. Concatenating those verbatim (which is
what decodeChunk did before this commit) embeds a 44-byte header deep
inside the audio stream every few milliseconds, causing audible clicks
downstream.

Confirmed against Inworld's own example code: tts_cli.py:163-167
explicitly per-chunk-strips `if chunk[:4]==b'RIFF' then chunk[44:]`,
and the non-streaming example_tts.py:100-101 does the same for the
batch endpoint.

Rapida's pipeline hardcodes LINEAR16 @ 16 kHz mono with no codec
conversion downstream (RAPIDA_INTERNAL_AUDIO_CONFIG at
internal/audio/config.go), so switching to MP3/OGG/MULAW — which
Inworld also offers — is not an option. Stripping is the only path.

New file `wav.go` adds `pcmFromStreamChunk(b)`:
  - If the payload starts with "RIFF...WAVE" (isWAV), walk the RIFF
    chunk list and return the first `data` subchunk's payload. Walking
    handles JUNK/LIST/other padding chunks properly; prior art
    (Inworld's own examples) hardcodes a 44-byte offset which works
    for the simplest case but breaks on wrappers with JUNK alignment.
  - Otherwise pass bytes through unchanged (future-proofs against
    Inworld ever returning bare PCM).
  - In both cases trim a trailing odd byte so we never split a 16-bit
    sample across the chunk boundary.

`decodeChunk` now passes the base64-decoded payload through the helper
before returning.

Tests:

- wav_test.go — unit tests for the helper: raw-PCM passthrough,
  odd-length trim, empty input, WAV data-subchunk extraction,
  odd-data trim, no-data-subchunk returning nil, JUNK before data,
  truncated chunk-size rejection, full isWAV matrix.
- inworld_test.go — TestInworldTTSStripsPerChunkWAVHeader drives two
  WAV-wrapped chunks through the whole decode path and asserts the
  emitted audio packets contain ONLY the data subchunk bytes.

All 31 unit tests pass under -race.
The Go transformer has been live since 9ced138, but the UI dropdown
never learned about it — so users could only configure Inworld by
editing YAML directly. This commit wires the provider into the
settings UI.

- provider.{development,production}.json: register "inworld" with
  featureList: [tts, external] and an API-key configuration. Slot
  alphabetically between elevenlabs and rime.
- ui/src/providers/inworld/tts.json: form config exposing Model and
  Voice dropdowns. No separate Language dropdown — Inworld voices are
  tied to a specific language, so the language ships with each voice
  entry in the dropdown.
- ui/src/providers/inworld/models.json: the two flagship models,
  inworld-tts-1.5-max (quality) and inworld-tts-1.5-mini (latency).
- ui/src/providers/inworld/voices.json: 271 voices across 15
  languages, fetched live from Inworld's GET /tts/v1/voices endpoint
  and stamped into static JSON to match how peer providers
  (elevenlabs, rime, cartesia) ship their catalogs. Each entry carries
  voice_id, a human-readable name with the language suffix (e.g.
  "Ashley — English"), language_id, and the voice description.

Defaults match the Go backend: Ashley / inworld-tts-1.5-max.

Image note: `https://rapida-assets-01.s3.ap-south-1.amazonaws.com/providers/inworld.png`
is referenced but the actual logo needs to be uploaded to the Rapida
S3 bucket by someone with access. A 404 on the image simply falls
back to the alt text in the UI — functional until the asset lands.

Follow-ups not in this PR:
- Dynamic voice fetching via the Inworld API with the user's key
  would require new UI plumbing (no existing provider does this);
  shipping the static list as MVP to unblock configuration now.
- Inworld also offers a PCM encoding that returns raw samples with
  no WAV header. The f292e80 stripper works for LINEAR16 today;
  switching the Go default to PCM would let us drop the stripper.
Inworld's API offers a `PCM` encoding that returns raw little-endian
16-bit samples with no container — the same bytes as LINEAR16 but
without the per-chunk RIFF/WAVE wrapper that was causing clicks. Now
that we know the option exists, switching upstream is cleaner than
shipping a stripper downstream.

Changes:

- inworld.go: INWORLD_AUDIO_ENCODING "LINEAR16" → "PCM" and update
  the surrounding comment to explain why (LINEAR16 adds per-chunk WAV
  headers; PCM is the identical sample stream without the wrapper).
- internal/type.go + GetEncoding doc: refresh LINEAR16 references.
- tts.go: decodeChunk returns the base64-decoded bytes directly; the
  pcmFromStreamChunk hop is no longer load-bearing.
- wav.go, wav_test.go: deleted (130+ LOC removed).
- inworld_test.go: TestInworldTTSStripsPerChunkWAVHeader deleted — it
  exercised the stripper path that no longer exists.

Also drop workspace-scoped custom voices from the UI voices.json —
the initial list pulled everything the key could see including 136
`1framework__*` custom clones. Filter to isCustom:false so the
dropdown only shows the 135 default/stock voices.

All 22 unit tests pass under -race.
Expose audioConfig (speakingRate, sampleRateHertz), temperature,
timestampType, applyTextNormalization, and timestampTransportStrategy
as advanced settings for the Inworld TTS provider.
@iamprashant iamprashant self-requested a review April 16, 2026 09:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant