feat(assistant): add packet-controlled input gate#140
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughReplaces the previous input-gating mechanism with a ChangesDispatch policy routing system and greeting interruptibility
Assistant debugger deployment REST API with timeout validation
Observability collector and recorder refactoring for lazy initialization and async operations
Sequence Diagram(s)sequenceDiagram
participant initializeGreeting
participant DispatchRoute
participant RoutePolicy
participant RequestorChannels
initializeGreeting->>DispatchRoute: ApplyPolicy(UserAudioReceived, Ignore)
DispatchRoute->>RoutePolicy: Apply(Ignore)
DispatchRoute->>RequestorChannels: FlushIngressMatching(UserAudioReceived predicate)
initializeGreeting->>DispatchRoute: ApplyPolicy(InterruptionDetected, Ignore)
DispatchRoute->>RoutePolicy: Apply(Ignore)
DispatchRoute->>RequestorChannels: FlushControlMatching(InterruptionDetected predicate)
initializeGreeting->>initializeGreeting: emit InjectMessagePacket (greeting)
Note over DispatchRoute,RoutePolicy: TTS finishes (HandleTextToSpeechEnd)
DispatchRoute->>RoutePolicy: Apply(UserAudioReceived, Passthrough)
DispatchRoute->>RoutePolicy: Apply(InterruptionDetected, Passthrough)
sequenceDiagram
participant Client
participant CreateAssistantDebuggerDeploymentRest as Handler
participant DeploymentService
participant Gin as Response
Client->>Handler: POST /v1/assistant-deployment/create-debugger-deployment
Handler->>Handler: Extract & validate auth
Handler->>Handler: Bind JSON, validate assistantId
Handler->>Handler: Validate idealTimeout (15–120)
Handler->>Handler: Validate idealTimeoutBackoff (0–5)
Handler->>Handler: Validate maxSessionDuration (180–600)
Handler->>Handler: Construct audio providers
Handler->>DeploymentService: CreateDebuggerDeployment(...)
DeploymentService-->>Handler: AssistantDebuggerDeployment
Handler->>Gin: HTTP 200 + deployment response
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Poem
✨ Finishing Touches🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/assistant-api/internal/adapters/router/router.go (1)
29-37:⚠️ Potential issue | 🟠 Major | ⚡ Quick winClassify omits
SpeechToTextStartPacket, which breaks input-gate reroutingAt Line 139,
IsInputOriginatedControl(...)marksSpeechToTextStartPacketas input-originated control, but at Line 29-Line 37Classify(...)does not route it asRouteControl. Inapi/assistant-api/internal/adapters/internal/dispatch.go, rerouting behind the input gate only happens forRouteControl, so this packet falls through toRouteBackgroundand bypasses the gate logic.Proposed fix
func Classify(p internal_type.Packet) Route { switch p.(type) { // Critical — interrupts, tool lifecycle case internal_type.InterruptionDetectedPacket, internal_type.TextToSpeechInterruptPacket, internal_type.LLMInterruptPacket, internal_type.DisableInputPacket, internal_type.EnableInputPacket, + internal_type.SpeechToTextStartPacket, internal_type.SpeechToTextEndPacket, internal_type.EndOfSpeechInterruptionPacket, internal_type.TurnChangePacket: return RouteControlAlso applies to: 132-141
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/internal/adapters/router/router.go` around lines 29 - 37, The Classify function is missing SpeechToTextStartPacket so it returns RouteBackground instead of RouteControl and bypasses the input-gate rerouting; update the Classify switch (the case list that returns RouteControl) to include internal_type.SpeechToTextStartPacket so SpeechToTextStartPacket is classified as RouteControl (consistent with IsInputOriginatedControl) and will be rerouted behind the input gate in dispatch logic.
🧹 Nitpick comments (1)
api/assistant-api/internal/adapters/channel/channel.go (1)
231-246: 💤 Low valueDequeued packet dropped on context cancellation while input blocked.
When a packet is dequeued from
ingressCh(line 239) but input becomes blocked before processing, the secondwaitInputEnabled(line 240) may returnfalseif the context is cancelled. The packet in local variableeis silently dropped.This is likely acceptable since context cancellation implies session termination, but the behavior should be documented.
📝 Suggested documentation
func (c *RequestorChannels) RunIngress(ctx context.Context, onEnvelope func(Envelope)) { + // Note: If input is disabled after dequeueing a packet and the context + // is then cancelled, the dequeued packet will be dropped. This is + // acceptable since context cancellation indicates session termination. for { if !c.waitInputEnabled(ctx) { return🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/internal/adapters/channel/channel.go` around lines 231 - 246, The RunIngress loop can dequeue an Envelope from ingressCh into local variable e and then return without processing it if waitInputEnabled(ctx) becomes false (e.g., ctx cancellation), which silently drops the packet; update the RunIngress function (and/or its doc comment) to explicitly document this behavior—mention that when ingressCh yields an Envelope (variable e) but waitInputEnabled(ctx) becomes false (often due to ctx cancellation/session termination) the envelope will be discarded, and that this is intentional/acceptable for session teardown so callers are aware; reference the RunIngress method, ingressCh, waitInputEnabled, and local variable e in the comment.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@api/assistant-api/internal/adapters/internal/behavior_test.go`:
- Around line 461-495: Add a parallel test to cover the TTS error recovery path:
create TestHandleTextToSpeechError_EnablesBlockedInput that mirrors
TestHandleTextToSpeechEnd_EnablesBlockedInput (use
newBehaviorDisconnectTestRequestor, set
r.messageLifecycle.SetContextID("ctx-tts-error"), call
r.channels.DisableInput()), then invoke the error handler with a
internal_type.TextToSpeechErrorPacket for that context (e.g., call
requestorDispatchHandler.HandleTextToSpeechError or, if only packet routing
exists, call r.OnPacket with
internal_type.TextToSpeechErrorPacket{ContextID:"ctx-tts-error"}), and assert an
internal_type.EnableInputPacket is emitted to r.channels.ControlChannel() with
matching ContextID using requireBehaviorPacketEventually; name the test
TestHandleTextToSpeechError_EnablesBlockedInput.
In `@api/assistant-api/internal/adapters/internal/dispatch_handler.go`:
- Around line 672-674: The unblocking path in dispatch_handler.go currently
calls h.r.OnPacket(internal_type.EnableInputPacket{ContextID: p.ContextId()})
whenever h.r.channels.InputBlocked() is true, which can let a stale
TextToSpeechErrorPacket re-enable input for a different active turn; modify the
code to verify that the packet's context ID matches the current active context
before calling h.r.OnPacket: retrieve the active context ID (the same place that
determines the current turn) and only send EnableInputPacket when p.ContextId()
== activeContextID (or otherwise belongs to the active turn), ensuring
TextToSpeechErrorPacket from prior contexts cannot unblock input for the wrong
context.
In `@api/assistant-api/internal/adapters/router/dispatch_test.go`:
- Around line 36-37: The test suite added stub methods HandleDisableInput and
HandleEnableInput but never asserts that dispatcher selects them; update the
tests in dispatch_test.go to include explicit selection assertions for
DisableInputPacket and EnableInputPacket by sending those packet types through
the dispatcher and asserting the stub's corresponding methods were invoked (or
call-count/last-received payload recorded) to cover the success path and
factory/selection behavior; ensure you also add a negative/fallback assertion
(e.g., other handlers not invoked) to mirror existing tests for other packet
types so HandleDisableInput and HandleEnableInput are fully exercised.
In
`@api/assistant-api/internal/services/assistant/assistant.deployment.impl.service.go`:
- Around line 718-724: Add same-package tests in
api/assistant-api/internal/services/assistant that verify GreetingInterruptible
is propagated into disabled deployment records (RECORD_INACTIVE) by the logic in
assistant.deployment.impl.service.go; create *_test.go files in the same package
(not package _test) and implement tests for each channel path (API, Debugger,
Phone, Webplugin, Whatsapp) covering (1) success case where
current.GreetingInterruptible is copied into the disabled record, (2) error case
by mocking the storage/DB to return an error and asserting the service surface
returns the error, and (3) fallback case where GreetingInterruptible is
absent/zero and the disabled record uses the expected default; reference the
constant RECORD_INACTIVE and the field current.GreetingInterruptible and invoke
the same methods used in the implementation (the deployment disable/update path
in assistant.deployment.impl.service.go) to exercise the propagation.
---
Outside diff comments:
In `@api/assistant-api/internal/adapters/router/router.go`:
- Around line 29-37: The Classify function is missing SpeechToTextStartPacket so
it returns RouteBackground instead of RouteControl and bypasses the input-gate
rerouting; update the Classify switch (the case list that returns RouteControl)
to include internal_type.SpeechToTextStartPacket so SpeechToTextStartPacket is
classified as RouteControl (consistent with IsInputOriginatedControl) and will
be rerouted behind the input gate in dispatch logic.
---
Nitpick comments:
In `@api/assistant-api/internal/adapters/channel/channel.go`:
- Around line 231-246: The RunIngress loop can dequeue an Envelope from
ingressCh into local variable e and then return without processing it if
waitInputEnabled(ctx) becomes false (e.g., ctx cancellation), which silently
drops the packet; update the RunIngress function (and/or its doc comment) to
explicitly document this behavior—mention that when ingressCh yields an Envelope
(variable e) but waitInputEnabled(ctx) becomes false (often due to ctx
cancellation/session termination) the envelope will be discarded, and that this
is intentional/acceptable for session teardown so callers are aware; reference
the RunIngress method, ingressCh, waitInputEnabled, and local variable e in the
comment.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6d69ef3f-0705-4d7b-a006-391570187aae
📒 Files selected for processing (16)
api/assistant-api/internal/adapters/channel/channel.goapi/assistant-api/internal/adapters/channel/channel_test.goapi/assistant-api/internal/adapters/internal/behavior.goapi/assistant-api/internal/adapters/internal/behavior_test.goapi/assistant-api/internal/adapters/internal/dispatch.goapi/assistant-api/internal/adapters/internal/dispatch_handler.goapi/assistant-api/internal/adapters/router/dispatch.goapi/assistant-api/internal/adapters/router/dispatch_test.goapi/assistant-api/internal/adapters/router/router.goapi/assistant-api/internal/adapters/router/router_test.goapi/assistant-api/internal/entity/assistants/deployment.assistant.goapi/assistant-api/internal/services/assistant/assistant.deployment.impl.service.goapi/assistant-api/internal/type/packet.goapi/assistant-api/migrations/000001_initial_schema.up.sqlapi/assistant-api/migrations/000028_add_greeting_interruptible_to_assistant_deployments.down.sqlapi/assistant-api/migrations/000028_add_greeting_interruptible_to_assistant_deployments.up.sql
| func TestHandleTextToSpeechEnd_EnablesBlockedInput(t *testing.T) { | ||
| r := newBehaviorDisconnectTestRequestor(t, &behaviorCapturingStreamer{}) | ||
| r.messageLifecycle.SetContextID("ctx-tts-end") | ||
| r.channels.DisableInput() | ||
| h := requestorDispatchHandler{r: r} | ||
|
|
||
| h.HandleTextToSpeechEnd(context.Background(), internal_type.TextToSpeechEndPacket{ContextID: "ctx-tts-end"}) | ||
|
|
||
| enable := requireBehaviorPacketEventually[internal_type.EnableInputPacket]( | ||
| t, r.channels.ControlChannel(), time.Second, | ||
| ) | ||
| assert.Equal(t, "ctx-tts-end", enable.ContextID) | ||
| } | ||
|
|
||
| func TestOnPacket_HoldsInputOriginatedControlWhileInputBlocked(t *testing.T) { | ||
| r := newBehaviorDisconnectTestRequestor(t, &behaviorCapturingStreamer{}) | ||
| r.channels.DisableInput() | ||
|
|
||
| err := r.OnPacket(context.Background(), internal_type.InterruptionDetectedPacket{ | ||
| ContextID: "ctx-interrupt", | ||
| Source: internal_type.InterruptionSourceWord, | ||
| }) | ||
| require.NoError(t, err) | ||
|
|
||
| select { | ||
| case env := <-r.channels.ControlChannel(): | ||
| t.Fatalf("expected interruption to be held on ingress, got control packet %T", env.Pkt) | ||
| default: | ||
| } | ||
|
|
||
| pkt := requireBehaviorPacketEventually[internal_type.InterruptionDetectedPacket]( | ||
| t, r.channels.IngressChannel(), time.Second, | ||
| ) | ||
| assert.Equal(t, "ctx-interrupt", pkt.ContextID) | ||
| } |
There was a problem hiding this comment.
Add a TTS-error-path unblock test alongside the TTS-end test.
The new suite validates unblock-on-end, but not unblock-on-TextToSpeechErrorPacket (the other added recovery path).
As per coding guidelines, "Include at least a success path, fallback/error path, and factory/selection behavior assertion in backend tests."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/behavior_test.go` around lines
461 - 495, Add a parallel test to cover the TTS error recovery path: create
TestHandleTextToSpeechError_EnablesBlockedInput that mirrors
TestHandleTextToSpeechEnd_EnablesBlockedInput (use
newBehaviorDisconnectTestRequestor, set
r.messageLifecycle.SetContextID("ctx-tts-error"), call
r.channels.DisableInput()), then invoke the error handler with a
internal_type.TextToSpeechErrorPacket for that context (e.g., call
requestorDispatchHandler.HandleTextToSpeechError or, if only packet routing
exists, call r.OnPacket with
internal_type.TextToSpeechErrorPacket{ContextID:"ctx-tts-error"}), and assert an
internal_type.EnableInputPacket is emitted to r.channels.ControlChannel() with
matching ContextID using requireBehaviorPacketEventually; name the test
TestHandleTextToSpeechError_EnablesBlockedInput.
Source: Coding guidelines
| if h.r.channels.InputBlocked() { | ||
| h.r.OnPacket(ctx, internal_type.EnableInputPacket{ContextID: p.ContextId()}) | ||
| } |
There was a problem hiding this comment.
Gate TTS-error unblocking by active context.
A stale TextToSpeechErrorPacket can re-enable input for the current turn because this unblock path checks only InputBlocked() and not whether the error belongs to the active context.
✅ Targeted fix
- if h.r.channels.InputBlocked() {
- h.r.OnPacket(ctx, internal_type.EnableInputPacket{ContextID: p.ContextId()})
- }
+ if p.ContextId() == h.r.GetID() && h.r.channels.InputBlocked() {
+ h.r.OnPacket(ctx, internal_type.EnableInputPacket{ContextID: p.ContextId()})
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/dispatch_handler.go` around
lines 672 - 674, The unblocking path in dispatch_handler.go currently calls
h.r.OnPacket(internal_type.EnableInputPacket{ContextID: p.ContextId()}) whenever
h.r.channels.InputBlocked() is true, which can let a stale
TextToSpeechErrorPacket re-enable input for a different active turn; modify the
code to verify that the packet's context ID matches the current active context
before calling h.r.OnPacket: retrieve the active context ID (the same place that
determines the current turn) and only send EnableInputPacket when p.ContextId()
== activeContextID (or otherwise belongs to the active turn), ensuring
TextToSpeechErrorPacket from prior contexts cannot unblock input for the wrong
context.
| func (s *dispatchHandlerStub) HandleDisableInput(context.Context, internal_type.DisableInputPacket) {} | ||
| func (s *dispatchHandlerStub) HandleEnableInput(context.Context, internal_type.EnableInputPacket) {} |
There was a problem hiding this comment.
Add explicit dispatch assertions for DisableInputPacket and EnableInputPacket.
The stub was extended, but the suite still lacks direct selection assertions for the two new packet mappings.
💡 Suggested test update
type dispatchHandlerStub struct {
calledUserText bool
calledConversationRecordingDone bool
+ calledDisableInput bool
+ calledEnableInput bool
}
@@
-func (s *dispatchHandlerStub) HandleDisableInput(context.Context, internal_type.DisableInputPacket) {}
-func (s *dispatchHandlerStub) HandleEnableInput(context.Context, internal_type.EnableInputPacket) {}
+func (s *dispatchHandlerStub) HandleDisableInput(context.Context, internal_type.DisableInputPacket) {
+ s.calledDisableInput = true
+}
+func (s *dispatchHandlerStub) HandleEnableInput(context.Context, internal_type.EnableInputPacket) {
+ s.calledEnableInput = true
+}
+
+func TestDispatchPacket_DispatchesInputGatePackets(t *testing.T) {
+ handler := &dispatchHandlerStub{}
+ err := DispatchPacket(context.Background(), internal_type.DisableInputPacket{ContextID: "c"}, handler)
+ if err != nil || !handler.calledDisableInput {
+ t.Fatalf("expected HandleDisableInput to be called")
+ }
+
+ err = DispatchPacket(context.Background(), internal_type.EnableInputPacket{ContextID: "c"}, handler)
+ if err != nil || !handler.calledEnableInput {
+ t.Fatalf("expected HandleEnableInput to be called")
+ }
+}As per coding guidelines, "Include at least a success path, fallback/error path, and factory/selection behavior assertion in backend tests."
Also applies to: 171-201
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/router/dispatch_test.go` around lines 36
- 37, The test suite added stub methods HandleDisableInput and HandleEnableInput
but never asserts that dispatcher selects them; update the tests in
dispatch_test.go to include explicit selection assertions for DisableInputPacket
and EnableInputPacket by sending those packet types through the dispatcher and
asserting the stub's corresponding methods were invoked (or
call-count/last-received payload recorded) to cover the success path and
factory/selection behavior; ensure you also add a negative/fallback assertion
(e.g., other handlers not invoked) to mirror existing tests for other packet
types so HandleDisableInput and HandleEnableInput are fully exercised.
Source: Coding guidelines
| Greeting: current.Greeting, | ||
| GreetingInterruptible: current.GreetingInterruptible, | ||
| Mistake: current.Mistake, | ||
| IdleTimeout: current.IdleTimeout, | ||
| IdleTimeoutBackoff: current.IdleTimeoutBackoff, | ||
| IdleTimeoutMessage: current.IdleTimeoutMessage, | ||
| MaxSessionDuration: current.MaxSessionDuration, |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "== Changed service file =="
fd -a "assistant.deployment.impl.service.go" api/assistant-api/internal/services/assistant
echo
echo "== Candidate tests in same package =="
fd -a "_test.go" api/assistant-api/internal/services/assistant
echo
echo "== GreetingInterruptible references in service package =="
rg -n -C2 'GreetingInterruptible|DisableAssistant(Api|Debugger|Phone|Webplugin|Whatsapp)Deployment' api/assistant-api/internal/services/assistant --type go
echo
echo "== Assertions touching disable-flow behavior in tests (if present) =="
rg -n -C2 'DisableAssistant(Api|Debugger|Phone|Webplugin|Whatsapp)Deployment|RECORD_INACTIVE|GreetingInterruptible' api/assistant-api/internal/services/assistant --type goRepository: rapidaai/voice-ai
Length of output: 27607
🏁 Script executed:
#!/bin/bash
set -euo pipefail
PKG_DIR="api/assistant-api/internal/services/assistant"
echo "== List *_test.go under same package dir =="
fd -a --hidden --no-ignore "_test.go" "$PKG_DIR" || true
echo
echo "== Any GreetingInterruptible mentions in tests under internal/services =="
rg -n --glob='**/*_test.go' -C2 'GreetingInterruptible|DisableAssistant(Api|Debugger|Phone|Webplugin|Whatsapp)Deployment' api/assistant-api/internal/services || true
echo
echo "== Any assertions about disable-flow / RECORD_INACTIVE in tests under internal/services =="
rg -n --glob='**/*_test.go' -C2 'DisableAssistant(Api|Debugger|Phone|Webplugin|Whatsapp)Deployment|RECORD_INACTIVE|RECORD_ARCHIEVE' api/assistant-api/internal/services || trueRepository: rapidaai/voice-ai
Length of output: 272
Add same-package tests for GreetingInterruptible propagation in disable deployment flows
assistant.deployment.impl.service.go propagates current.GreetingInterruptible into disabled deployment records (e.g., around lines 718-724) for API/Debugger/Phone/Webplugin/Whatsapp, but there are no *_test.go files in api/assistant-api/internal/services/assistant, and no test assertions referencing this disable-flow behavior (e.g., RECORD_INACTIVE). Add same-package tests covering success/error/fallback cases for this propagation path.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@api/assistant-api/internal/services/assistant/assistant.deployment.impl.service.go`
around lines 718 - 724, Add same-package tests in
api/assistant-api/internal/services/assistant that verify GreetingInterruptible
is propagated into disabled deployment records (RECORD_INACTIVE) by the logic in
assistant.deployment.impl.service.go; create *_test.go files in the same package
(not package _test) and implement tests for each channel path (API, Debugger,
Phone, Webplugin, Whatsapp) covering (1) success case where
current.GreetingInterruptible is copied into the disabled record, (2) error case
by mocking the storage/DB to return an error and asserting the service surface
returns the error, and (3) fallback case where GreetingInterruptible is
absent/zero and the disabled record uses the expected default; reference the
constant RECORD_INACTIVE and the field current.GreetingInterruptible and invoke
the same methods used in the implementation (the deployment disable/update path
in assistant.deployment.impl.service.go) to exercise the propagation.
Source: Coding guidelines
fix: early audio is not droped
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/assistant-api/internal/adapters/channel/channel.go (1)
131-137:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
OnIngresscan still block after entering the “non-blocking” overflow path.Line 136 performs a blocking send after
FlushIngress(). Under concurrent producers, the channel can refill before this send and block request processing.Suggested fix
func (c *RequestorChannels) OnIngress(e Envelope) { - select { - case c.ingressCh <- e: - default: - c.FlushIngress() - c.ingressCh <- e - } + for { + select { + case c.ingressCh <- e: + return + default: + c.FlushIngress() + } + } }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/internal/adapters/channel/channel.go` around lines 131 - 137, The OnIngress method in RequestorChannels has a blocking send on line 136 after calling FlushIngress(), which can still block under concurrent load. After flushing the ingress channel, the channel can be refilled by other goroutines before the blocking send completes, defeating the non-blocking intent. Replace the blocking send after FlushIngress() with another non-blocking select statement (similar to the outer select) that includes a case for sending to c.ingressCh and a default case, so that if the channel fills again before the send succeeds, it calls FlushIngress() again rather than blocking indefinitely.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@api/assistant-api/api/talk/outbound_call_rest.go`:
- Around line 76-77: The observability.AttributeValue(ir) call in the payload
field logs the entire REST request object, which exposes sensitive user data
such as phone numbers and arbitrary metadata/options. Replace this with a
redacted or whitelisted set of safe attributes from the request. Extract only
the necessary non-sensitive fields from the request and log those instead,
ensuring sensitive information like phone data is excluded from the
observability logs.
In `@api/assistant-api/api/talk/outbound_call.go`:
- Around line 46-47: The logging statement at the observability.AttributeValue
call is capturing the entire outbound request payload (ir) which may contain
sensitive phone numbers and metadata. Instead of logging the full payload,
replace the "payload" attribute with a whitelisted, minimal set of non-sensitive
attributes that only capture necessary information for debugging without
exposing PII or secrets. Remove the current observability.AttributeValue(ir)
line and build a curated map of safe, whitelisted fields to include in the
observability logs.
In `@api/assistant-api/internal/adapters/internal/dispatch_handler.go`:
- Around line 355-357: The HandleDispatchPolicy method in
requestorDispatchHandler applies the dispatch policy unconditionally without
verifying that the DispatchPolicyPacket belongs to the active context. Since
route policy is packet-name scoped, stale packets from previous contexts can
incorrectly modify current-turn routing behavior. Check the ContextID field in
the DispatchPolicyPacket parameter against the currently active context ID
before calling h.r.dispatchRoute.ApplyPolicy, and only apply the policy if the
context IDs match.
In `@api/assistant-api/internal/adapters/internal/dispatch_policy_test.go`:
- Around line 100-111: The InterruptionDetectedPacket dispatch in the test is
using default Source and Event values, which means no downstream packets are
produced regardless of whether the ignore policy is applied, making this a false
positive test. To properly validate that the ignore policy skips egress packets,
modify the InterruptionDetectedPacket being dispatched (currently sent with only
ContextID set to "interrupt-dropped") to include appropriate Source and Event
field values that would normally generate downstream egress packets without the
ignore policy, then verify the ignore policy correctly prevents them from being
added to the EgressChannel.
In `@api/assistant-api/internal/adapters/internal/dispatch.go`:
- Around line 89-97: The DispatchPacket method calls in both the AsyncPacket
branch (inside the utils.Go goroutine) and the default branch are silently
discarding their returned errors by assigning to underscore. Replace these error
discards with proper error handling by capturing the error return value from
each DispatchPacket call and either logging the error (if it's non-nil) or
propagating it appropriately. This ensures that routing failures and unknown
packet mappings are visible rather than hidden.
In `@api/assistant-api/internal/channel/telephony/internal/sip/telephony.go`:
- Around line 267-269: The outboundHealthGateEnabled method dereferences
appCfg.SIPConfig without first checking if it is nil, which causes a panic when
called from OutboundCall (at lines 202-205) if the SIP configuration is absent.
Add a nil check for appCfg.SIPConfig at the beginning of the
outboundHealthGateEnabled method, before attempting to access
SIPConfig.OutboundHealthGate, and return a safe default value (such as true)
when the SIP config is nil to allow the call to fail gracefully rather than
panic.
---
Outside diff comments:
In `@api/assistant-api/internal/adapters/channel/channel.go`:
- Around line 131-137: The OnIngress method in RequestorChannels has a blocking
send on line 136 after calling FlushIngress(), which can still block under
concurrent load. After flushing the ingress channel, the channel can be refilled
by other goroutines before the blocking send completes, defeating the
non-blocking intent. Replace the blocking send after FlushIngress() with another
non-blocking select statement (similar to the outer select) that includes a case
for sending to c.ingressCh and a default case, so that if the channel fills
again before the send succeeds, it calls FlushIngress() again rather than
blocking indefinitely.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2f3b545f-de1d-4dd9-a9ab-c2bb758c77fc
📒 Files selected for processing (21)
api/assistant-api/api/talk/outbound_call.goapi/assistant-api/api/talk/outbound_call_rest.goapi/assistant-api/internal/adapters/channel/channel.goapi/assistant-api/internal/adapters/internal/behavior.goapi/assistant-api/internal/adapters/internal/behavior_test.goapi/assistant-api/internal/adapters/internal/dispatch.goapi/assistant-api/internal/adapters/internal/dispatch_handler.goapi/assistant-api/internal/adapters/internal/dispatch_policy_test.goapi/assistant-api/internal/adapters/internal/requestor.goapi/assistant-api/internal/adapters/router/dispatch.goapi/assistant-api/internal/adapters/router/dispatch_test.goapi/assistant-api/internal/adapters/router/policy.goapi/assistant-api/internal/adapters/router/policy_test.goapi/assistant-api/internal/adapters/router/route.goapi/assistant-api/internal/adapters/router/route_test.goapi/assistant-api/internal/adapters/router/router.goapi/assistant-api/internal/adapters/router/router_test.goapi/assistant-api/internal/channel/telephony/internal/sip/telephony.goapi/assistant-api/internal/channel/telephony/internal/sip/telephony_test.goapi/assistant-api/internal/services/assistant/authentication.impl.service.goapi/assistant-api/internal/type/packet.go
💤 Files with no reviewable changes (1)
- api/assistant-api/internal/channel/telephony/internal/sip/telephony_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- api/assistant-api/internal/adapters/router/dispatch_test.go
- api/assistant-api/internal/adapters/internal/behavior.go
| "payload": observability.AttributeValue(ir), | ||
| }, |
There was a problem hiding this comment.
Avoid full REST request payload logging.
observability.AttributeValue(ir) logs the full body, including user phone data and arbitrary metadata/options. Keep logs to a redacted/whitelisted set.
Suggested fix
Attributes: observability.Attributes{
- "payload": observability.AttributeValue(ir),
+ "assistant_id": observability.AttributeValue(assistantID),
+ "assistant_version": observability.AttributeValue(version),
+ "to_present": observability.AttributeValue(validator.NonNil(ir.ToNumber) && validator.NotBlank(*ir.ToNumber)),
+ "from_present": observability.AttributeValue(validator.NonNil(ir.FromNumber) && validator.NotBlank(*ir.FromNumber)),
},🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/api/talk/outbound_call_rest.go` around lines 76 - 77, The
observability.AttributeValue(ir) call in the payload field logs the entire REST
request object, which exposes sensitive user data such as phone numbers and
arbitrary metadata/options. Replace this with a redacted or whitelisted set of
safe attributes from the request. Extract only the necessary non-sensitive
fields from the request and log those instead, ensuring sensitive information
like phone data is excluded from the observability logs.
| "payload": observability.AttributeValue(ir), | ||
| }, |
There was a problem hiding this comment.
Do not log the full outbound request payload.
Logging observability.AttributeValue(ir) can capture sensitive phone and metadata fields wholesale. Prefer a whitelisted, minimal attribute set to avoid PII/secrets overexposure.
Suggested fix
Attributes: observability.Attributes{
- "payload": observability.AttributeValue(ir),
+ "assistant_id": observability.AttributeValue(ir.GetAssistant().GetAssistantId()),
+ "assistant_version": observability.AttributeValue(ir.GetAssistant().GetVersion()),
+ "to_present": observability.AttributeValue(!utils.IsEmpty(ir.GetToNumber())),
+ "from_present": observability.AttributeValue(!utils.IsEmpty(ir.GetFromNumber())),
},📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| "payload": observability.AttributeValue(ir), | |
| }, | |
| "assistant_id": observability.AttributeValue(ir.GetAssistant().GetAssistantId()), | |
| "assistant_version": observability.AttributeValue(ir.GetAssistant().GetVersion()), | |
| "to_present": observability.AttributeValue(!utils.IsEmpty(ir.GetToNumber())), | |
| "from_present": observability.AttributeValue(!utils.IsEmpty(ir.GetFromNumber())), | |
| }, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/api/talk/outbound_call.go` around lines 46 - 47, The
logging statement at the observability.AttributeValue call is capturing the
entire outbound request payload (ir) which may contain sensitive phone numbers
and metadata. Instead of logging the full payload, replace the "payload"
attribute with a whitelisted, minimal set of non-sensitive attributes that only
capture necessary information for debugging without exposing PII or secrets.
Remove the current observability.AttributeValue(ir) line and build a curated map
of safe, whitelisted fields to include in the observability logs.
| func (h requestorDispatchHandler) HandleDispatchPolicy(ctx context.Context, p internal_type.DispatchPolicyPacket) { | ||
| h.r.dispatchRoute.ApplyPolicy(p.Policy) | ||
| } |
There was a problem hiding this comment.
Apply dispatch policy only for the active context.
DispatchPolicyPacket carries ContextID, but Line 356 applies policy unconditionally. Because route policy is packet-name scoped, stale context packets can toggle current-turn routing behavior.
Suggested fix
func (h requestorDispatchHandler) HandleDispatchPolicy(ctx context.Context, p internal_type.DispatchPolicyPacket) {
+ if p.ContextID != "" && p.ContextID != h.r.GetID() {
+ return
+ }
h.r.dispatchRoute.ApplyPolicy(p.Policy)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (h requestorDispatchHandler) HandleDispatchPolicy(ctx context.Context, p internal_type.DispatchPolicyPacket) { | |
| h.r.dispatchRoute.ApplyPolicy(p.Policy) | |
| } | |
| func (h requestorDispatchHandler) HandleDispatchPolicy(ctx context.Context, p internal_type.DispatchPolicyPacket) { | |
| if p.ContextID != "" && p.ContextID != h.r.GetID() { | |
| return | |
| } | |
| h.r.dispatchRoute.ApplyPolicy(p.Policy) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/dispatch_handler.go` around
lines 355 - 357, The HandleDispatchPolicy method in requestorDispatchHandler
applies the dispatch policy unconditionally without verifying that the
DispatchPolicyPacket belongs to the active context. Since route policy is
packet-name scoped, stale packets from previous contexts can incorrectly modify
current-turn routing behavior. Check the ContextID field in the
DispatchPolicyPacket parameter against the currently active context ID before
calling h.r.dispatchRoute.ApplyPolicy, and only apply the policy if the context
IDs match.
| r.dispatch(context.Background(), internal_type.UserAudioReceivedPacket{ContextID: "audio-dropped"}) | ||
| r.dispatch(context.Background(), internal_type.InterruptionDetectedPacket{ContextID: "interrupt-dropped"}) | ||
|
|
||
| if got := len(r.channels.IngressChannel()); got != 0 { | ||
| t.Fatalf("expected ignored user audio to skip downstream ingress packets, got %d", got) | ||
| } | ||
| if got := len(r.channels.ControlChannel()); got != 0 { | ||
| t.Fatalf("expected ignored interruption to skip control packets, got %d", got) | ||
| } | ||
| if got := len(r.channels.EgressChannel()); got != 0 { | ||
| t.Fatalf("expected ignored interruption to skip egress packets, got %d", got) | ||
| } |
There was a problem hiding this comment.
Interruption assertion is currently a false positive.
The test sends InterruptionDetectedPacket with default Source/Event, which produces no downstream packets even without ignore policy, so this check does not validate policy behavior.
Suggested fix
func newDispatchPolicyTestRequestor() *genericRequestor {
channels := adapter_channel.NewRequestorChannels()
+ lifecycle := adapter_lifecycle.NewMessageLifecycle()
+ lifecycle.SetContextID("ctx-test")
return &genericRequestor{
channels: channels,
dispatchRoute: adapter_router.NewDispatchRoute(adapter_router.NewRoutePolicy(), channels),
sessionLifecycle: adapter_lifecycle.NewSessionLifecycleWithState(adapter_lifecycle.StateReady),
+ messageLifecycle: lifecycle,
}
}
@@
- r.dispatch(context.Background(), internal_type.InterruptionDetectedPacket{ContextID: "interrupt-dropped"})
+ r.dispatch(context.Background(), internal_type.InterruptionDetectedPacket{
+ ContextID: "interrupt-dropped",
+ Source: internal_type.InterruptionSourceWord,
+ })🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/dispatch_policy_test.go` around
lines 100 - 111, The InterruptionDetectedPacket dispatch in the test is using
default Source and Event values, which means no downstream packets are produced
regardless of whether the ignore policy is applied, making this a false positive
test. To properly validate that the ignore policy skips egress packets, modify
the InterruptionDetectedPacket being dispatched (currently sent with only
ContextID set to "interrupt-dropped") to include appropriate Source and Event
field values that would normally generate downstream egress packets without the
ignore policy, then verify the ignore policy correctly prevents them from being
added to the EgressChannel.
| r.dispatchRoute.Route(ctx, p, func(ctx context.Context, p internal_type.Packet) { | ||
| switch p.(type) { | ||
| case internal_type.AsyncPacket: | ||
| utils.Go(ctx, func() { | ||
| _ = adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r}) | ||
| }) | ||
| default: | ||
| _ = adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r}) | ||
| } |
There was a problem hiding this comment.
Do not discard DispatchPacket errors silently.
Both branches ignore returned errors. Unknown packet mappings now fail without logs/visibility, which hides broken routing paths.
Suggested fix
func (r *genericRequestor) dispatch(ctx context.Context, p internal_type.Packet) {
r.dispatchRoute.Route(ctx, p, func(ctx context.Context, p internal_type.Packet) {
switch p.(type) {
case internal_type.AsyncPacket:
utils.Go(ctx, func() {
- _ = adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r})
+ if err := adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r}); err != nil {
+ r.logger.Errorw("packet dispatch failed", "packet", p.PacketName(), "error", err)
+ }
})
default:
- _ = adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r})
+ if err := adapter_router.DispatchPacket(ctx, p, requestorDispatchHandler{r: r}); err != nil {
+ r.logger.Errorw("packet dispatch failed", "packet", p.PacketName(), "error", err)
+ }
}
})
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/dispatch.go` around lines 89 -
97, The DispatchPacket method calls in both the AsyncPacket branch (inside the
utils.Go goroutine) and the default branch are silently discarding their
returned errors by assigning to underscore. Replace these error discards with
proper error handling by capturing the error return value from each
DispatchPacket call and either logging the error (if it's non-nil) or
propagating it appropriately. This ensures that routing failures and unknown
packet mappings are visible rather than hidden.
| func (t *sipTelephony) outboundHealthGateEnabled(appCfg *config.AssistantConfig) bool { | ||
| if appCfg.SIPConfig.OutboundHealthGate == nil { | ||
| return true |
There was a problem hiding this comment.
Prevent nil-pointer panic in health-gate evaluation.
outboundHealthGateEnabled dereferences appCfg.SIPConfig without nil checks, and it is called unconditionally in OutboundCall (Line 202 and Line 204). If SIP config is absent, this panics instead of returning a controlled failure.
Suggested fix
func (t *sipTelephony) outboundHealthGateEnabled(appCfg *config.AssistantConfig) bool {
- if appCfg.SIPConfig.OutboundHealthGate == nil {
+ if appCfg == nil || appCfg.SIPConfig == nil || appCfg.SIPConfig.OutboundHealthGate == nil {
return true
}
return *appCfg.SIPConfig.OutboundHealthGate
}Also applies to: 202-205
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/channel/telephony/internal/sip/telephony.go`
around lines 267 - 269, The outboundHealthGateEnabled method dereferences
appCfg.SIPConfig without first checking if it is nil, which causes a panic when
called from OutboundCall (at lines 202-205) if the SIP configuration is absent.
Add a nil check for appCfg.SIPConfig at the beginning of the
outboundHealthGateEnabled method, before attempting to access
SIPConfig.OutboundHealthGate, and return a safe default value (such as true)
when the SIP config is nil to allow the call to fail gracefully rather than
panic.
…ent-rest-api feat: added debugger deployment api
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go (1)
82-85:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winMisleading error message: deployment creation failure incorrectly returns "unauthenticated" message.
When
CreateDebuggerDeploymentfails, the handler returns an error message claiming the request is unauthenticated. This is incorrect and will confuse users debugging deployment creation failures.🐛 Proposed fix
if err != nil { return utils.Error[assistant_api.GetAssistantDebuggerDeploymentResponse]( - errors.New("unauthenticated request for create assistant debugger deployment"), - "Please provider valid service credentials to perfom invoke, read docs @ docs.rapida.ai", + err, + "Unable to create assistant debugger deployment, please try again later.", ) }Also consider wrapping the error for better observability:
if err != nil { + deploymentApi.logger.Errorf("failed to create debugger deployment for assistant %d: %v", + deployment.GetDebugger().GetAssistantId(), err) return utils.Error[assistant_api.GetAssistantDebuggerDeploymentResponse]( - errors.New("unauthenticated request for create assistant debugger deployment"), - "Please provider valid service credentials to perfom invoke, read docs @ docs.rapida.ai", + err, + "Unable to create assistant debugger deployment, please try again later.", ) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go` around lines 82 - 85, The error message returned from the CreateDebuggerDeployment function is misleading because it claims "unauthenticated request" when the actual failure is related to deployment creation, not authentication. Replace the misleading error message with an accurate message that describes the actual deployment creation failure, and wrap the underlying error to provide better observability for debugging. Update the utils.Error call to include a more contextually appropriate message that reflects what actually failed during the deployment creation process.
🧹 Nitpick comments (3)
api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest.go (2)
113-199: ⚡ Quick winExtract audio provider construction into a helper function.
The input audio provider construction (lines 113-155) and output audio provider construction (lines 157-199) are nearly identical, differing only in the variable names. Extracting a helper function would eliminate ~80 lines of duplication and reduce maintenance burden.
♻️ Proposed refactor
+func buildDeploymentAudioProvider( + audioRequest *openapi.DeploymentAudioProviderRequest, + errorCode pkg_errors.PlatformError, +) (*assistant_api.DeploymentAudioProvider, *openapi.ErrorResponse) { + if audioRequest == nil { + return nil, nil + } + + if !validator.NotBlank(audioRequest.AudioProvider) { + return nil, &openapi.ErrorResponse{ + Code: utils.Ptr(errorCode.HTTPStatusCodeInt32()), + Success: utils.Ptr(false), + Error: &openapi.Error{ + ErrorCode: utils.Ptr(openapi.Uint64String(errorCode.CodeString())), + ErrorMessage: utils.Ptr(errorCode.Error), + HumanMessage: utils.Ptr(errorCode.ErrorMessage), + }, + } + } + + audioOptions := []*assistant_api.Metadata{} + if validator.NonNil(audioRequest.AudioOptions) { + for _, audioOption := range *audioRequest.AudioOptions { + key := "" + if validator.NonNil(audioOption.Key) { + key = *audioOption.Key + } + value := "" + if validator.NonNil(audioOption.Value) { + value = *audioOption.Value + } + audioOptions = append(audioOptions, &assistant_api.Metadata{Key: key, Value: value}) + } + } + + audioStatus := "" + if validator.NonNil(audioRequest.Status) { + audioStatus = *audioRequest.Status + } + audioType := "" + if validator.NonNil(audioRequest.AudioType) { + audioType = *audioRequest.AudioType + } + + return &assistant_api.DeploymentAudioProvider{ + AudioProvider: audioRequest.AudioProvider, + AudioOptions: audioOptions, + Status: audioStatus, + AudioType: audioType, + }, nil +}Then use it:
- var inputAudio *assistant_api.DeploymentAudioProvider - if validator.NonNil(request.InputAudio) { - if !validator.NotBlank(request.InputAudio.AudioProvider) { - c.JSON(pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider.HTTPStatusCode, openapi.ErrorResponse{ - Code: utils.Ptr(pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider.HTTPStatusCodeInt32()), - Success: utils.Ptr(false), - Error: &openapi.Error{ - ErrorCode: utils.Ptr(openapi.Uint64String(pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider.CodeString())), - ErrorMessage: utils.Ptr(pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider.Error), - HumanMessage: utils.Ptr(pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider.ErrorMessage), - }, - }) - return - } - // ... rest of construction logic + inputAudio, errResp := buildDeploymentAudioProvider( + request.InputAudio, + pkg_errors.CreateAssistantDebuggerDeploymentInvalidAudioProvider, + ) + if errResp != nil { + c.JSON(errResp.Code, errResp) + return }Apply the same pattern to
outputAudio.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest.go` around lines 113 - 199, The code for constructing inputAudio and outputAudio objects contains significant duplication - both blocks perform nearly identical validation, option processing, and struct construction, differing only in variable names. Create a helper function that takes a request audio configuration object as a parameter and returns a pointer to an assistant_api.DeploymentAudioProvider struct. This function should contain the shared logic for validating the AudioProvider field, processing AudioOptions into Metadata slices, and extracting Status and AudioType fields with nil checks. Then replace both the input audio construction block and the output audio construction block with calls to this helper function, passing request.InputAudio and request.OutputAudio respectively. This will eliminate approximately 80 lines of duplicated code and improve maintainability.
228-276: ⚡ Quick winExtract audio provider response construction into a helper function.
The input audio response construction (lines 232-253) and output audio response construction (lines 255-276) follow an identical pattern. Extracting a helper function would eliminate duplication and improve maintainability.
♻️ Proposed refactor
+func buildAudioProviderResponse( + audio *internal_assistant_entity.AssistantDeploymentAudio, +) *openapi.DeploymentAudioProvider { + if !validator.NonNil(audio) { + return nil + } + + audioId := openapi.Uint64String(strconv.FormatUint(audio.Id, 10)) + audioStatus := audio.Status.String() + audioOptions := []openapi.Metadata{} + for _, audioOption := range audio.AudioOptions { + if !validator.NonNil(audioOption) { + continue + } + audioOptions = append(audioOptions, openapi.Metadata{ + Key: utils.Ptr(audioOption.Key), + Value: utils.Ptr(audioOption.Value), + }) + } + + return &openapi.DeploymentAudioProvider{ + Id: &audioId, + AudioType: &audio.AudioType, + AudioProvider: &audio.AudioProvider, + AudioOptions: &audioOptions, + Status: &audioStatus, + } +}Then use it:
- var responseInputAudio *openapi.DeploymentAudioProvider - if validator.NonNil(deployment.InputAudio) { - inputAudioId := openapi.Uint64String(strconv.FormatUint(deployment.InputAudio.Id, 10)) - // ... rest of construction logic - } + responseInputAudio := buildAudioProviderResponse(deployment.InputAudio) - var responseOutputAudio *openapi.DeploymentAudioProvider - if validator.NonNil(deployment.OutputAudio) { - outputAudioId := openapi.Uint64String(strconv.FormatUint(deployment.OutputAudio.Id, 10)) - // ... rest of construction logic - } + responseOutputAudio := buildAudioProviderResponse(deployment.OutputAudio)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest.go` around lines 228 - 276, Extract the duplicated audio provider response construction logic into a helper function. Both the input audio block (handling deployment.InputAudio) and output audio block (handling deployment.OutputAudio) follow identical patterns: they format the ID using strconv.FormatUint, get the status string, iterate through AudioOptions to build a metadata slice, and construct an openapi.DeploymentAudioProvider struct. Create a single helper function that accepts an audio provider object as a parameter and returns a populated openapi.DeploymentAudioProvider pointer, then replace both the responseInputAudio and responseOutputAudio construction blocks with calls to this helper function, passing deployment.InputAudio and deployment.OutputAudio respectively.api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go (1)
35-67: ⚡ Quick winExtract validation helper to eliminate duplication.
The three timeout validation blocks (IdealTimeout, IdealTimeoutBackoff, MaxSessionDuration) follow an identical pattern: range check → construct error response → return. Extracting a helper function would reduce duplication and make the validation logic easier to maintain.
♻️ Proposed refactor
+func validateTimeoutParameter( + value uint64, + min, max int, + platformError pkg_errors.PlatformError, +) (*assistant_api.GetAssistantDebuggerDeploymentResponse, error) { + if !validator.Between(int(value), min, max) { + return &assistant_api.GetAssistantDebuggerDeploymentResponse{ + Code: platformError.HTTPStatusCodeInt32(), + Success: false, + Error: &assistant_api.Error{ + ErrorCode: uint64(platformError.Code), + ErrorMessage: platformError.Error, + HumanMessage: platformError.ErrorMessage, + }, + }, errors.New(platformError.Error) + } + return nil, nil +}Then use it:
- if !validator.Between(int(deployment.GetDebugger().GetIdealTimeout()), 15, 120) { - return &assistant_api.GetAssistantDebuggerDeploymentResponse{ - Code: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.HTTPStatusCodeInt32(), - Success: false, - Error: &assistant_api.Error{ - ErrorCode: uint64(pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Code), - ErrorMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Error, - HumanMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.ErrorMessage, - }, - }, errors.New(pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Error) + if resp, err := validateTimeoutParameter( + deployment.GetDebugger().GetIdealTimeout(), 15, 120, + pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout, + ); err != nil { + return resp, err }Apply the same pattern to the other two timeout validations.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go` around lines 35 - 67, The three validation blocks for IdealTimeout, IdealTimeoutBackoff, and MaxSessionDuration follow an identical pattern of range checking with validator.Between() followed by constructing and returning an error response. Create a helper function that accepts the value to validate, the minimum and maximum bounds, and the corresponding error constant, then returns both the GetAssistantDebuggerDeploymentResponse and error. Replace each of the three validation blocks (for GetIdealTimeout with bounds 15-120, GetIdealTimeoutBackoff with bounds 0-5, and GetMaxSessionDuration with bounds 180-600) with a single call to this helper function, passing the appropriate getter method, bounds, and error constant for each case.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go`:
- Around line 35-67: The gRPC validation for GetIdealTimeout(),
GetIdealTimeoutBackoff(), and GetMaxSessionDuration() doesn't check if these
fields were actually set by the client before validating them with
validator.Between(). Since protobuf uint64 fields default to 0 when unset,
validation fails even when clients omit these optional fields. Add nil-safe
checks before each validator.Between() call to only validate these parameters
when explicitly provided by the client, making them optional with consistent
behavior across both REST and gRPC handlers.
In `@pkg/errors/create_assistant_deployment.go`:
- Around line 29-40: The CreateAssistantDebuggerDeploymentMissingAuthScope error
definition has an identical ErrorMessage to
CreateAssistantDebuggerDeploymentUnauthenticated, which makes it difficult for
users to distinguish between the two authentication failures. Update the
ErrorMessage field in CreateAssistantDebuggerDeploymentMissingAuthScope to
provide a distinct, user-friendly message that specifically indicates the issue
is related to missing authentication scopes rather than general
unauthentication.
---
Outside diff comments:
In
`@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go`:
- Around line 82-85: The error message returned from the
CreateDebuggerDeployment function is misleading because it claims
"unauthenticated request" when the actual failure is related to deployment
creation, not authentication. Replace the misleading error message with an
accurate message that describes the actual deployment creation failure, and wrap
the underlying error to provide better observability for debugging. Update the
utils.Error call to include a more contextually appropriate message that
reflects what actually failed during the deployment creation process.
---
Nitpick comments:
In
`@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest.go`:
- Around line 113-199: The code for constructing inputAudio and outputAudio
objects contains significant duplication - both blocks perform nearly identical
validation, option processing, and struct construction, differing only in
variable names. Create a helper function that takes a request audio
configuration object as a parameter and returns a pointer to an
assistant_api.DeploymentAudioProvider struct. This function should contain the
shared logic for validating the AudioProvider field, processing AudioOptions
into Metadata slices, and extracting Status and AudioType fields with nil
checks. Then replace both the input audio construction block and the output
audio construction block with calls to this helper function, passing
request.InputAudio and request.OutputAudio respectively. This will eliminate
approximately 80 lines of duplicated code and improve maintainability.
- Around line 228-276: Extract the duplicated audio provider response
construction logic into a helper function. Both the input audio block (handling
deployment.InputAudio) and output audio block (handling deployment.OutputAudio)
follow identical patterns: they format the ID using strconv.FormatUint, get the
status string, iterate through AudioOptions to build a metadata slice, and
construct an openapi.DeploymentAudioProvider struct. Create a single helper
function that accepts an audio provider object as a parameter and returns a
populated openapi.DeploymentAudioProvider pointer, then replace both the
responseInputAudio and responseOutputAudio construction blocks with calls to
this helper function, passing deployment.InputAudio and deployment.OutputAudio
respectively.
In
`@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go`:
- Around line 35-67: The three validation blocks for IdealTimeout,
IdealTimeoutBackoff, and MaxSessionDuration follow an identical pattern of range
checking with validator.Between() followed by constructing and returning an
error response. Create a helper function that accepts the value to validate, the
minimum and maximum bounds, and the corresponding error constant, then returns
both the GetAssistantDebuggerDeploymentResponse and error. Replace each of the
three validation blocks (for GetIdealTimeout with bounds 15-120,
GetIdealTimeoutBackoff with bounds 0-5, and GetMaxSessionDuration with bounds
180-600) with a single call to this helper function, passing the appropriate
getter method, bounds, and error constant for each case.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9c5a6725-71dc-4364-a962-e17dc5215944
📒 Files selected for processing (10)
api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.goapi/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest.goapi/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment_rest_test.goapi/assistant-api/router/assistant.gocmd/assistant/assistant.goopenapi/artifacts/assistant-api.yamlopenapi/artifacts/common.yamlopenapi/assistant.gen.goopenapi/common.gen.gopkg/errors/create_assistant_deployment.go
✅ Files skipped from review due to trivial changes (1)
- openapi/common.gen.go
| if !validator.Between(int(deployment.GetDebugger().GetIdealTimeout()), 15, 120) { | ||
| return &assistant_api.GetAssistantDebuggerDeploymentResponse{ | ||
| Code: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.HTTPStatusCodeInt32(), | ||
| Success: false, | ||
| Error: &assistant_api.Error{ | ||
| ErrorCode: uint64(pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Code), | ||
| ErrorMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Error, | ||
| HumanMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.ErrorMessage, | ||
| }, | ||
| }, errors.New(pkg_errors.CreateAssistantDebuggerDeploymentInvalidIdealTimeout.Error) | ||
| } | ||
| if !validator.Between(int(deployment.GetDebugger().GetIdealTimeoutBackoff()), 0, 5) { | ||
| return &assistant_api.GetAssistantDebuggerDeploymentResponse{ | ||
| Code: pkg_errors.CreateAssistantDebuggerDeploymentInvalidTimeoutBackoff.HTTPStatusCodeInt32(), | ||
| Success: false, | ||
| Error: &assistant_api.Error{ | ||
| ErrorCode: uint64(pkg_errors.CreateAssistantDebuggerDeploymentInvalidTimeoutBackoff.Code), | ||
| ErrorMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidTimeoutBackoff.Error, | ||
| HumanMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidTimeoutBackoff.ErrorMessage, | ||
| }, | ||
| }, errors.New(pkg_errors.CreateAssistantDebuggerDeploymentInvalidTimeoutBackoff.Error) | ||
| } | ||
| if !validator.Between(int(deployment.GetDebugger().GetMaxSessionDuration()), 180, 600) { | ||
| return &assistant_api.GetAssistantDebuggerDeploymentResponse{ | ||
| Code: pkg_errors.CreateAssistantDebuggerDeploymentInvalidSessionDuration.HTTPStatusCodeInt32(), | ||
| Success: false, | ||
| Error: &assistant_api.Error{ | ||
| ErrorCode: uint64(pkg_errors.CreateAssistantDebuggerDeploymentInvalidSessionDuration.Code), | ||
| ErrorMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidSessionDuration.Error, | ||
| HumanMessage: pkg_errors.CreateAssistantDebuggerDeploymentInvalidSessionDuration.ErrorMessage, | ||
| }, | ||
| }, errors.New(pkg_errors.CreateAssistantDebuggerDeploymentInvalidSessionDuration.Error) | ||
| } |
There was a problem hiding this comment.
Inconsistent validation: gRPC requires timeout parameters but REST makes them optional.
The gRPC handler validates timeout values without checking if they were set by the client. When a protobuf uint64 field is unset, GetIdealTimeout() returns 0, and validator.Between(0, 15, 120) fails. This effectively requires clients to provide IdealTimeout, IdealTimeoutBackoff, and MaxSessionDuration.
In contrast, the REST handler (lines 76-111 in create_assistant_debugger_deployment_rest.go) checks validator.NonNil(request.IdealTimeout) before validating, making these fields optional.
This breaks API contract consistency between REST and gRPC clients. Either both should require the fields, or both should treat them as optional with nil-safe validation.
Recommended fix: Add nil checks before validation
If timeout parameters should be optional, check whether the proto field was explicitly set before validating:
- if !validator.Between(int(deployment.GetDebugger().GetIdealTimeout()), 15, 120) {
+ debugger := deployment.GetDebugger()
+ if debugger.IdealTimeout != 0 && !validator.Between(int(debugger.IdealTimeout), 15, 120) {
return &assistant_api.GetAssistantDebuggerDeploymentResponse{Apply the same pattern to IdealTimeoutBackoff and MaxSessionDuration. If they should be required for debugger deployments, update the REST handler to enforce the same requirement.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@api/assistant-api/api/assistant-deployment/create_assistant_debugger_deployment.go`
around lines 35 - 67, The gRPC validation for GetIdealTimeout(),
GetIdealTimeoutBackoff(), and GetMaxSessionDuration() doesn't check if these
fields were actually set by the client before validating them with
validator.Between(). Since protobuf uint64 fields default to 0 when unset,
validation fails even when clients omit these optional fields. Add nil-safe
checks before each validator.Between() call to only validate these parameters
when explicitly provided by the client, making them optional with consistent
behavior across both REST and gRPC handlers.
| CreateAssistantDebuggerDeploymentUnauthenticated = PlatformError{ | ||
| HTTPStatusCode: http.StatusUnauthorized, | ||
| Code: CreateAssistantDebuggerDeploymentUnauthenticatedCode, | ||
| Error: "unauthenticated request", | ||
| ErrorMessage: "Unauthenticated request, please try again with valid authentication.", | ||
| } | ||
| CreateAssistantDebuggerDeploymentMissingAuthScope = PlatformError{ | ||
| HTTPStatusCode: http.StatusForbidden, | ||
| Code: CreateAssistantDebuggerDeploymentMissingAuthScopeCode, | ||
| Error: "missing authentication scope", | ||
| ErrorMessage: "Unauthenticated request, please try again with valid authentication.", | ||
| } |
There was a problem hiding this comment.
Duplicate error messages for Unauthenticated vs MissingAuthScope.
Both CreateAssistantDebuggerDeploymentUnauthenticated (line 33) and CreateAssistantDebuggerDeploymentMissingAuthScope (line 39) return identical ErrorMessage: "Unauthenticated request, please try again with valid authentication."
While users can distinguish these via error codes (1004002 vs 1004003), providing distinct human-readable messages would improve the user experience when debugging authentication issues.
📝 Suggested improvement
CreateAssistantDebuggerDeploymentMissingAuthScope = PlatformError{
HTTPStatusCode: http.StatusForbidden,
Code: CreateAssistantDebuggerDeploymentMissingAuthScopeCode,
Error: "missing authentication scope",
- ErrorMessage: "Unauthenticated request, please try again with valid authentication.",
+ ErrorMessage: "Missing required authentication scope (user, project, or organization). Please verify your credentials include all required scopes.",
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| CreateAssistantDebuggerDeploymentUnauthenticated = PlatformError{ | |
| HTTPStatusCode: http.StatusUnauthorized, | |
| Code: CreateAssistantDebuggerDeploymentUnauthenticatedCode, | |
| Error: "unauthenticated request", | |
| ErrorMessage: "Unauthenticated request, please try again with valid authentication.", | |
| } | |
| CreateAssistantDebuggerDeploymentMissingAuthScope = PlatformError{ | |
| HTTPStatusCode: http.StatusForbidden, | |
| Code: CreateAssistantDebuggerDeploymentMissingAuthScopeCode, | |
| Error: "missing authentication scope", | |
| ErrorMessage: "Unauthenticated request, please try again with valid authentication.", | |
| } | |
| CreateAssistantDebuggerDeploymentUnauthenticated = PlatformError{ | |
| HTTPStatusCode: http.StatusUnauthorized, | |
| Code: CreateAssistantDebuggerDeploymentUnauthenticatedCode, | |
| Error: "unauthenticated request", | |
| ErrorMessage: "Unauthenticated request, please try again with valid authentication.", | |
| } | |
| CreateAssistantDebuggerDeploymentMissingAuthScope = PlatformError{ | |
| HTTPStatusCode: http.StatusForbidden, | |
| Code: CreateAssistantDebuggerDeploymentMissingAuthScopeCode, | |
| Error: "missing authentication scope", | |
| ErrorMessage: "Missing required authentication scope (user, project, or organization). Please verify your credentials include all required scopes.", | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/errors/create_assistant_deployment.go` around lines 29 - 40, The
CreateAssistantDebuggerDeploymentMissingAuthScope error definition has an
identical ErrorMessage to CreateAssistantDebuggerDeploymentUnauthenticated,
which makes it difficult for users to distinguish between the two authentication
failures. Update the ErrorMessage field in
CreateAssistantDebuggerDeploymentMissingAuthScope to provide a distinct,
user-friendly message that specifically indicates the issue is related to
missing authentication scopes rather than general unauthentication.
…ecorder fix: make observability recorder non-blocking
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
api/assistant-api/internal/observability/collectors/telemetry/collector.go (1)
83-86: ⚡ Quick winWrap lazy exporter initialization errors with collector context.
Returning
errdirectly loses which telemetry collector/exporter failed. Wrap it with%wso callers and tests can preserve the original error chain. As per coding guidelines,**/*.go: Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation.Proposed fix
import ( "context" "errors" + "fmt" "strconv" "strings" "sync" "time" @@ exporter, err := newExporter(ctx, c.logger, c.provider) if err != nil { c.mu.Unlock() - return err + return fmt.Errorf("telemetry: initialize exporter %q: %w", c.key, err) }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@api/assistant-api/internal/observability/collectors/telemetry/collector.go` around lines 83 - 86, The error returned from the newExporter function call in the lazy initialization block loses context about which collector/exporter failed. Instead of returning err directly, wrap it with fmt.Errorf using the %w verb to preserve the error chain and provide contextual information about the failure. This will help callers and tests understand where the error originated in the collector initialization process.Source: Coding guidelines
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@api/assistant-api/internal/observability/collectors/telemetry/collector.go`:
- Around line 190-204: The Close method in the Collector struct does not
properly track a terminal closed state, allowing subsequent Collect calls to use
a closed exporter and repeated Close calls to close the exporter multiple times.
Add a closed boolean field to the Collector struct (to be protected by the same
mu mutex), check this flag at the start of the Close method to return early if
already closed, set the closed flag to true after acquiring the lock and before
actually closing the exporter, and clear the exporter reference by setting
c.exporter to nil to prevent further use. This ensures Close becomes a true
terminal operation that prevents any subsequent operations on the closed
collector.
In `@api/assistant-api/internal/observability/recorder.go`:
- Around line 139-145: The current `addCollectorOperation` struct carries only a
single collector, which causes partial batch registration when `ErrBufferFull`
occurs during `AddCollectors` execution. Modify the `addCollectorOperation`
struct to carry a slice of collectors instead of a single collector field, then
update all sites that create and enqueue `addCollectorOperation` instances (at
lines 198-200, 241-247, and 354-355) to pass the entire batch of collectors in a
single enqueue operation, ensuring the whole batch either succeeds or fails
atomically as a single queue operation.
---
Nitpick comments:
In `@api/assistant-api/internal/observability/collectors/telemetry/collector.go`:
- Around line 83-86: The error returned from the newExporter function call in
the lazy initialization block loses context about which collector/exporter
failed. Instead of returning err directly, wrap it with fmt.Errorf using the %w
verb to preserve the error chain and provide contextual information about the
failure. This will help callers and tests understand where the error originated
in the collector initialization process.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 6e814f50-7d2b-42ba-a16f-1d8a237d5f04
📒 Files selected for processing (4)
api/assistant-api/internal/channel/telephony/outbound_test.goapi/assistant-api/internal/observability/collectors/telemetry/collector.goapi/assistant-api/internal/observability/recorder.goapi/assistant-api/internal/observability/recorder_test.go
| func (c *Collector) Close(ctx context.Context) error { | ||
| var errs []error | ||
| if validator.NonNil(c.exporter) { | ||
| if err := c.exporter.Close(ctx); err != nil { | ||
| c.mu.Lock() | ||
| if !c.initialized { | ||
| c.initialized = true | ||
| c.mu.Unlock() | ||
| return nil | ||
| } | ||
| exporter := c.exporter | ||
| c.mu.Unlock() | ||
| if validator.NonNil(exporter) { | ||
| if err := exporter.Close(ctx); err != nil { | ||
| errs = append(errs, err) | ||
| } | ||
| } |
There was a problem hiding this comment.
Make Close a terminal state for initialized collectors.
When Close runs after lazy initialization, it leaves initialized=true and exporter populated, so later Collect calls keep exporting through a closed exporter and repeated Close calls close it again. Track a closed state under the same mutex and clear the exporter before closing it.
Proposed fix
type Collector struct {
logger commons.Logger
provider Provider
exporter telemetry.Exporter
initialized bool
+ closed bool
mu sync.Mutex
key string
}
@@
func (c *Collector) Collect(ctx context.Context, scope observability.Scope, observationContext observability.Context, record observability.Record) error {
c.mu.Lock()
+ if c.closed {
+ c.mu.Unlock()
+ return nil
+ }
if !c.initialized {
exporter, err := newExporter(ctx, c.logger, c.provider)
if err != nil {
@@
func (c *Collector) Close(ctx context.Context) error {
var errs []error
c.mu.Lock()
- if !c.initialized {
- c.initialized = true
+ if c.closed {
c.mu.Unlock()
return nil
}
+ c.closed = true
+ if !c.initialized {
+ c.mu.Unlock()
+ return nil
+ }
exporter := c.exporter
+ c.exporter = nil
c.mu.Unlock()
if validator.NonNil(exporter) {
if err := exporter.Close(ctx); err != nil {
- errs = append(errs, err)
+ errs = append(errs, fmt.Errorf("telemetry: close exporter %q: %w", c.key, err))
}
}
return errors.Join(errs...)
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/observability/collectors/telemetry/collector.go`
around lines 190 - 204, The Close method in the Collector struct does not
properly track a terminal closed state, allowing subsequent Collect calls to use
a closed exporter and repeated Close calls to close the exporter multiple times.
Add a closed boolean field to the Collector struct (to be protected by the same
mu mutex), check this flag at the start of the Close method to return early if
already closed, set the closed flag to true after acquiring the lock and before
actually closing the exporter, and clear the exporter reference by setting
c.exporter to nil to prevent further use. This ensures Close becomes a true
terminal operation that prevents any subsequent operations on the closed
collector.
| type addCollectorOperation struct { | ||
| collector Collector | ||
| } | ||
|
|
||
| type recordOperation struct { | ||
| observation observation | ||
| } |
There was a problem hiding this comment.
Batch collector registration into one queue operation.
AddCollectors can currently enqueue some collectors, hit ErrBufferFull, and return an error after partial registration. Make addCollectorOperation carry the whole batch so the enqueue either succeeds or fails atomically.
Proposed fix
type addCollectorOperation struct {
- collector Collector
+ collectors []Collector
}
@@
- for _, collector := range resolvedOptions.collectors {
- r.operationQueue <- addCollectorOperation{collector: collector}
+ if len(resolvedOptions.collectors) > 0 {
+ r.operationQueue <- addCollectorOperation{
+ collectors: append([]Collector(nil), resolvedOptions.collectors...),
+ }
}
@@
- for _, collector := range collectors {
- select {
- case r.operationQueue <- addCollectorOperation{collector: collector}:
- default:
- return ErrBufferFull
- }
+ if len(collectors) == 0 {
+ return nil
+ }
+ select {
+ case r.operationQueue <- addCollectorOperation{collectors: append([]Collector(nil), collectors...)}:
+ default:
+ return ErrBufferFull
}
return nil
}
@@
switch typedOperation := queuedOperation.(type) {
case addCollectorOperation:
- r.fanout.AddCollectors(typedOperation.collector)
+ r.fanout.AddCollectors(typedOperation.collectors...)Also applies to: 198-200, 241-247, 354-355
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/observability/recorder.go` around lines 139 - 145,
The current `addCollectorOperation` struct carries only a single collector,
which causes partial batch registration when `ErrBufferFull` occurs during
`AddCollectors` execution. Modify the `addCollectorOperation` struct to carry a
slice of collectors instead of a single collector field, then update all sites
that create and enqueue `addCollectorOperation` instances (at lines 198-200,
241-247, and 354-355) to pass the entire batch of collectors in a single enqueue
operation, ensuring the whole batch either succeeds or fails atomically as a
single queue operation.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@api/assistant-api/internal/adapters/internal/stream.go`:
- Around line 182-191: The Notify method has two issues: First, when
t.streamer.Send(actionData) fails at line 183, the error is only logged and
dropped, but callers depend on this error for fallback observability—wrap the
error using fmt.Errorf with %w verb and return it instead of silently
continuing. Second, at line 184, t.logger.Errorf can panic if t.logger is nil,
but there is already a nil-guard for logger use at line 186—add the same
nil-check guard before calling t.logger.Errorf to prevent panic when the error
path is taken.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8110c822-966f-4870-8931-9f2a6dc8f134
📒 Files selected for processing (3)
api/assistant-api/internal/adapters/internal/dispatch_handler.goapi/assistant-api/internal/adapters/internal/stream.goapi/assistant-api/internal/type/packet.go
💤 Files with no reviewable changes (1)
- api/assistant-api/internal/type/packet.go
| start := time.Now() | ||
| if err := t.streamer.Send(actionData); err != nil { | ||
| t.logger.Errorf("error while notifing client %v", err) | ||
| } | ||
| if t.logger != nil { | ||
| t.logger.Benchmark(fmt.Sprintf("requestor.Notify.%T", actionData), time.Since(start)) | ||
| } | ||
|
|
||
| } | ||
| return nil |
There was a problem hiding this comment.
Notify currently hides send failures and has an unsafe nil-logger error path.
At Line 183, send errors are only logged and then dropped, but callers (e.g., configuration notification flow) depend on Notify errors to trigger fallback observability. Also, at Line 184, t.logger.Errorf(...) can panic when t.logger == nil (you already guard logger use at Line 186).
Use contextual wrapped returns (%w) and guard logger access in the error path.
Suggested fix
func (t *genericRequestor) Notify(ctx context.Context, actionDatas ...internal_type.Stream) error {
+ var firstErr error
for _, actionData := range actionDatas {
start := time.Now()
if err := t.streamer.Send(actionData); err != nil {
- t.logger.Errorf("error while notifing client %v", err)
+ if t.logger != nil {
+ t.logger.Errorf("error while notifying client: %v", err)
+ }
+ if firstErr == nil {
+ firstErr = fmt.Errorf("notify send failed for %T: %w", actionData, err)
+ }
}
if t.logger != nil {
t.logger.Benchmark(fmt.Sprintf("requestor.Notify.%T", actionData), time.Since(start))
}
}
- return nil
+ return firstErr
}As per coding guidelines, “Return errors with context wrapping using fmt.Errorf with %w verb for error chain preservation.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@api/assistant-api/internal/adapters/internal/stream.go` around lines 182 -
191, The Notify method has two issues: First, when t.streamer.Send(actionData)
fails at line 183, the error is only logged and dropped, but callers depend on
this error for fallback observability—wrap the error using fmt.Errorf with %w
verb and return it instead of silently continuing. Second, at line 184,
t.logger.Errorf can panic if t.logger is nil, but there is already a nil-guard
for logger use at line 186—add the same nil-check guard before calling
t.logger.Errorf to prevent panic when the error path is taken.
Source: Coding guidelines
Description
Tests
env GOCACHE=/private/tmp/voice-ai-gocache go test -timeout=60s -count=1
./api/assistant-api/internal/adapters/channel
./api/assistant-api/internal/adapters/router
./api/assistant-api/internal/adapters/internal
./api/assistant-api/internal/services/assistant
Type of Change
Related Issues
Fixes #
Checklist
General
Testing
Documentation
Security
Screenshots (if applicable)
Additional Notes
Summary by CodeRabbit
Release Notes
greeting_interruptibleto assistant deployments.POST /v1/assistant-deployment/create-debugger-deploymentwith full request/response support.greeting_interruptiblecolumn to assistant deployment tables.