diff --git a/pkg/model/provider/oaistream/adapter.go b/pkg/model/provider/oaistream/adapter.go index c3e12a03d..33c6423d7 100644 --- a/pkg/model/provider/oaistream/adapter.go +++ b/pkg/model/provider/oaistream/adapter.go @@ -5,6 +5,7 @@ This is a shared adapter for OpenAI-compatible streams. */ import ( + "encoding/json" "io" "github.com/openai/openai-go/v3" @@ -66,13 +67,24 @@ func (a *StreamAdapter) Recv() (chat.MessageStreamResponse, error) { a.lastFinishReason = finishReason } + // Extract reasoning_content from ExtraFields since the OpenAI SDK + // does not yet have a dedicated field for it. Providers like DMR + // send reasoning tokens as a "reasoning_content" JSON field in the + // chat completion chunk delta. + var reasoningContent string + if ef, ok := choice.Delta.JSON.ExtraFields["reasoning_content"]; ok && ef.Raw() != "" { + // ef.Raw() returns the raw JSON value (e.g. `"some text"`), so + // we unmarshal it to get the plain Go string. + _ = json.Unmarshal([]byte(ef.Raw()), &reasoningContent) + } + response.Choices[i] = chat.MessageStreamChoice{ Index: int(choice.Index), FinishReason: finishReason, Delta: chat.MessageDelta{ - Role: choice.Delta.Role, - Content: choice.Delta.Content, - // ReasoningContent not available in this SDK version + Role: choice.Delta.Role, + Content: choice.Delta.Content, + ReasoningContent: reasoningContent, }, } diff --git a/pkg/model/provider/oaistream/adapter_test.go b/pkg/model/provider/oaistream/adapter_test.go new file mode 100644 index 000000000..28a74da45 --- /dev/null +++ b/pkg/model/provider/oaistream/adapter_test.go @@ -0,0 +1,103 @@ +package oaistream + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/openai/openai-go/v3" + "github.com/openai/openai-go/v3/packages/ssestream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestStream creates an SSE stream from raw SSE event data served by a test HTTP server. +func newTestStream(t *testing.T, sseData string) *ssestream.Stream[openai.ChatCompletionChunk] { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(sseData)) + })) + t.Cleanup(srv.Close) + + resp, err := http.Get(srv.URL) //nolint:gosec,bodyclose // body is closed by the stream + require.NoError(t, err) + return ssestream.NewStream[openai.ChatCompletionChunk](ssestream.NewDecoder(resp), nil) +} + +func TestStreamAdapter_ReasoningContent(t *testing.T) { + t.Parallel() + + // Simulate SSE events with reasoning_content field in the delta, + // as sent by DMR for reasoning models. + sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","reasoning_content":"Let me think"},"finish_reason":null}]} + +data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"reasoning_content":" about this"},"finish_reason":null}]} + +data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"content":"Hello!"},"finish_reason":null}]} + +data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + +data: [DONE] + +` + + stream := newTestStream(t, sseData) + adapter := NewStreamAdapter(stream, false) + defer adapter.Close() + + // First chunk: reasoning content "Let me think" + resp, err := adapter.Recv() + require.NoError(t, err) + require.Len(t, resp.Choices, 1) + assert.Equal(t, "Let me think", resp.Choices[0].Delta.ReasoningContent) + assert.Empty(t, resp.Choices[0].Delta.Content) + + // Second chunk: reasoning content " about this" + resp, err = adapter.Recv() + require.NoError(t, err) + require.Len(t, resp.Choices, 1) + assert.Equal(t, " about this", resp.Choices[0].Delta.ReasoningContent) + assert.Empty(t, resp.Choices[0].Delta.Content) + + // Third chunk: regular content "Hello!" + resp, err = adapter.Recv() + require.NoError(t, err) + require.Len(t, resp.Choices, 1) + assert.Equal(t, "Hello!", resp.Choices[0].Delta.Content) + assert.Empty(t, resp.Choices[0].Delta.ReasoningContent) + + // Fourth chunk: finish reason stop + resp, err = adapter.Recv() + require.NoError(t, err) + require.Len(t, resp.Choices, 1) + assert.Equal(t, "stop", string(resp.Choices[0].FinishReason)) + + // Stream done + _, err = adapter.Recv() + assert.ErrorIs(t, err, io.EOF) +} + +func TestStreamAdapter_NoReasoningContent(t *testing.T) { + t.Parallel() + + // Simulate a normal stream without reasoning_content. + sseData := `data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]} + +data: {"id":"c1","object":"chat.completion.chunk","created":1,"model":"test","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]} + +data: [DONE] + +` + + stream := newTestStream(t, sseData) + adapter := NewStreamAdapter(stream, false) + defer adapter.Close() + + resp, err := adapter.Recv() + require.NoError(t, err) + require.Len(t, resp.Choices, 1) + assert.Equal(t, "Hi", resp.Choices[0].Delta.Content) + assert.Empty(t, resp.Choices[0].Delta.ReasoningContent) +}