Skip to content

Commit 7b8fbf7

Browse files
lvan100lianghuan
authored andcommitted
111
1 parent 5560d59 commit 7b8fbf7

File tree

3 files changed

+66
-70
lines changed

3 files changed

+66
-70
lines changed

gen/generator/golang/client.go

Lines changed: 45 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/httputil/httputil.go

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -42,55 +42,51 @@ type Message struct {
4242
Err error
4343
}
4444

45+
type IStream interface {
46+
Send(msg Message) bool
47+
}
48+
4549
// SSEEvent represents an SSE event.
46-
type SSEEvent[T any] struct {
50+
type SSEEvent struct {
4751
ID string
48-
Data T
52+
Data string
4953
Event string
5054
Retry int
5155
}
5256

53-
type IStream interface {
54-
Send(msg Message) bool
55-
}
56-
5757
// Stream manages streaming data asynchronously from an HTTP response.
5858
// It supports safe concurrent use and can be closed idempotently.
59-
type Stream[T any] struct {
59+
type Stream struct {
6060
msgs chan Message
6161
curr Message
6262
closed atomic.Bool
6363
done chan struct{}
6464
}
6565

6666
// NewStream creates and initializes a new Stream instance.
67-
func NewStream[T any]() *Stream[T] {
68-
return &Stream[T]{
67+
func NewStream() *Stream {
68+
return &Stream{
6969
msgs: make(chan Message),
7070
done: make(chan struct{}),
7171
}
7272
}
7373

7474
// Event unmarshals the current data item into an SSEEvent.
75-
func (s *Stream[T]) Event() (SSEEvent[T], error) {
76-
var e SSEEvent[T]
77-
err := json.Unmarshal([]byte(s.curr.Data), &e)
78-
if err != nil {
79-
return SSEEvent[T]{}, err
80-
}
75+
func (s *Stream) Event() (SSEEvent, error) {
76+
var e SSEEvent
8177
return e, nil
8278
}
8379

8480
// Error returns the last error encountered by the stream.
85-
func (s *Stream[T]) Error() error {
81+
func (s *Stream) Error() error {
8682
return s.curr.Err
8783
}
8884

8985
// Next waits for the next data item from the stream,
9086
// honoring the provided context and optional timeout.
9187
// Returns true if a new data item is successfully received,
9288
// or false if the stream is closed, the context is done, or an error occurs.
93-
func (s *Stream[T]) Next(ctx context.Context, timeout time.Duration) bool {
89+
func (s *Stream) Next(ctx context.Context, timeout time.Duration) bool {
9490
if s.closed.Load() {
9591
return false
9692
}
@@ -125,7 +121,7 @@ func (s *Stream[T]) Next(ctx context.Context, timeout time.Duration) bool {
125121

126122
// Send pushes a Message into the internal channel.
127123
// Returns false if the stream is closed or already done.
128-
func (s *Stream[T]) Send(msg Message) bool {
124+
func (s *Stream) Send(msg Message) bool {
129125
if s.closed.Load() {
130126
return false
131127
}
@@ -139,7 +135,7 @@ func (s *Stream[T]) Send(msg Message) bool {
139135

140136
// Close closes the Stream safely (idempotent).
141137
// It ensures that the internal channels are closed only once.
142-
func (s *Stream[T]) Close() {
138+
func (s *Stream) Close() {
143139
if s.closed.CompareAndSwap(false, true) {
144140
close(s.done)
145141
close(s.msgs)
@@ -315,8 +311,8 @@ func JSONResponse[RespType any](r *http.Request, opts ...RequestOption) (*http.R
315311

316312
// StreamResponse executes the given HTTP request using the provided HTTPClient,
317313
// and returns a Stream instance for streaming the response body.
318-
func StreamResponse[T any](r *http.Request, opts ...RequestOption) (*http.Response, *Stream[T], error) {
319-
s := NewStream[T]()
314+
func StreamResponse(r *http.Request, opts ...RequestOption) (*http.Response, *Stream, error) {
315+
s := NewStream()
320316
resp, err := DefaultClient.Stream(r, buildMeta(opts), s)
321317
if err != nil {
322318
return nil, nil, err

lib/httputil/httputil_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ type StreamResponse struct {
295295

296296
// Stream sends a POST request to the /v1/stream endpoint with the given request body.
297297
func (c *HelloClient) Stream(ctx context.Context, req *StreamRequest, opts ...httputil.RequestOption) (
298-
*http.Response, *httputil.Stream[*StreamResponse], error) {
298+
*http.Response, *httputil.Stream, error) {
299299

300300
path := "/v1/stream"
301301
if s, err := req.QueryString(); err != nil {
@@ -321,7 +321,7 @@ func (c *HelloClient) Stream(ctx context.Context, req *StreamRequest, opts ...ht
321321
opts = append(opts, httputil.WithTarget(c.ServiceName))
322322
opts = append(opts, httputil.WithPath("/v1/hello"))
323323
opts = append(opts, httputil.WithSchema("http"))
324-
return httputil.StreamResponse[*StreamResponse](r, opts...)
324+
return httputil.StreamResponse(r, opts...)
325325
}
326326

327327
func TestHello(t *testing.T) {
@@ -394,8 +394,8 @@ func TestStream(t *testing.T) {
394394
_ = r.Header.Write(os.Stdout)
395395
fmt.Println()
396396
for i := range 5 {
397-
_, _ = w.Write([]byte(fmt.Sprintf("%d: ", i)))
398-
_, _ = w.Write([]byte(`{"message": "hello world"}`))
397+
_, _ = w.Write([]byte(fmt.Sprintf("id: %v\n", i)))
398+
_, _ = w.Write([]byte(fmt.Sprintf("data: %s\n", `{"message": "hello world"}`)))
399399
_, _ = w.Write([]byte("\n\n"))
400400
w.(http.Flusher).Flush()
401401
time.Sleep(time.Millisecond * 500)

0 commit comments

Comments
 (0)