Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions pkg/model/provider/oaistream/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This is a shared adapter for OpenAI-compatible streams.
*/

import (
"encoding/json"
"io"

"github.com/openai/openai-go/v3"
Expand Down Expand Up @@ -66,13 +67,24 @@ func (a *StreamAdapter) Recv() (chat.MessageStreamResponse, error) {
a.lastFinishReason = finishReason
}

// Extract reasoning_content from ExtraFields since the OpenAI SDK
// does not yet have a dedicated field for it. Providers like DMR
// send reasoning tokens as a "reasoning_content" JSON field in the
// chat completion chunk delta.
var reasoningContent string
if ef, ok := choice.Delta.JSON.ExtraFields["reasoning_content"]; ok && ef.Raw() != "" {
// ef.Raw() returns the raw JSON value (e.g. `"some text"`), so
// we unmarshal it to get the plain Go string.
_ = json.Unmarshal([]byte(ef.Raw()), &reasoningContent)
}

response.Choices[i] = chat.MessageStreamChoice{
Index: int(choice.Index),
FinishReason: finishReason,
Delta: chat.MessageDelta{
Role: choice.Delta.Role,
Content: choice.Delta.Content,
// ReasoningContent not available in this SDK version
Role: choice.Delta.Role,
Content: choice.Delta.Content,
ReasoningContent: reasoningContent,
},
}

Expand Down
103 changes: 103 additions & 0 deletions pkg/model/provider/oaistream/adapter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package oaistream

import (
"io"
"net/http"
"net/http/httptest"
"testing"

"github.com/openai/openai-go/v3"
"github.com/openai/openai-go/v3/packages/ssestream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// newTestStream spins up a throwaway HTTP test server that serves the given
// raw SSE payload and returns a decoded chunk stream backed by its response.
// The response body is owned (and eventually closed) by the returned stream.
func newTestStream(t *testing.T, sseData string) *ssestream.Stream[openai.ChatCompletionChunk] {
	t.Helper()

	handler := func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "text/event-stream")
		_, _ = w.Write([]byte(sseData))
	}
	server := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(server.Close)

	resp, err := http.Get(server.URL) //nolint:gosec,bodyclose // body is closed by the stream
	require.NoError(t, err)

	return ssestream.NewStream[openai.ChatCompletionChunk](ssestream.NewDecoder(resp), nil)
}

func TestStreamAdapter_ReasoningContent(t *testing.T) {
t.Parallel()

// Simulate SSE events with reasoning_content field in the delta,
// as sent by DMR for reasoning models.
sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"Let me think"},"finish_reason":null}]}

data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"reasoning_content":" about this"},"finish_reason":null}]}

data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"content":"Hello!"},"finish_reason":null}]}

data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

`

stream := newTestStream(t, sseData)
adapter := NewStreamAdapter(stream, false)
defer adapter.Close()

// First chunk: reasoning content "Let me think"
resp, err := adapter.Recv()
require.NoError(t, err)
require.Len(t, resp.Choices, 1)
assert.Equal(t, "Let me think", resp.Choices[0].Delta.ReasoningContent)
assert.Empty(t, resp.Choices[0].Delta.Content)

// Second chunk: reasoning content " about this"
resp, err = adapter.Recv()
require.NoError(t, err)
require.Len(t, resp.Choices, 1)
assert.Equal(t, " about this", resp.Choices[0].Delta.ReasoningContent)
assert.Empty(t, resp.Choices[0].Delta.Content)

// Third chunk: regular content "Hello!"
resp, err = adapter.Recv()
require.NoError(t, err)
require.Len(t, resp.Choices, 1)
assert.Equal(t, "Hello!", resp.Choices[0].Delta.Content)
assert.Empty(t, resp.Choices[0].Delta.ReasoningContent)

// Fourth chunk: finish reason stop
resp, err = adapter.Recv()
require.NoError(t, err)
require.Len(t, resp.Choices, 1)
assert.Equal(t, "stop", string(resp.Choices[0].FinishReason))

// Stream done
_, err = adapter.Recv()
assert.ErrorIs(t, err, io.EOF)
}

func TestStreamAdapter_NoReasoningContent(t *testing.T) {
t.Parallel()

// Simulate a normal stream without reasoning_content.
sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}

data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

`

stream := newTestStream(t, sseData)
adapter := NewStreamAdapter(stream, false)
defer adapter.Close()

resp, err := adapter.Recv()
require.NoError(t, err)
require.Len(t, resp.Choices, 1)
assert.Equal(t, "Hi", resp.Choices[0].Delta.Content)
assert.Empty(t, resp.Choices[0].Delta.ReasoningContent)
}
Loading