Skip to content

Commit 23c7971

Browse files
fix: ensure streams are always closed
1 parent 3b5b4a6 commit 23c7971

1 file changed

Lines changed: 50 additions & 48 deletions

File tree

src/writerai/_streaming.py

Lines changed: 50 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -54,30 +54,31 @@ def __stream__(self) -> Iterator[_T]:
5454
process_data = self._client._process_response_data
5555
iterator = self._iter_events()
5656

57-
for sse in iterator:
58-
if sse.data.startswith("[DONE]"):
59-
break
60-
61-
if sse.event is None:
62-
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
63-
64-
if sse.event == "error":
65-
body = sse.data
66-
67-
try:
68-
body = sse.json()
69-
err_msg = f"{body}"
70-
except Exception:
71-
err_msg = sse.data or f"Error code: {response.status_code}"
72-
73-
raise self._client._make_status_error(
74-
err_msg,
75-
body=body,
76-
response=self.response,
77-
)
78-
79-
# As we might not fully consume the response stream, we need to close it explicitly
80-
response.close()
57+
try:
58+
for sse in iterator:
59+
if sse.data.startswith("[DONE]"):
60+
break
61+
62+
if sse.event is None:
63+
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
64+
65+
if sse.event == "error":
66+
body = sse.data
67+
68+
try:
69+
body = sse.json()
70+
err_msg = f"{body}"
71+
except Exception:
72+
err_msg = sse.data or f"Error code: {response.status_code}"
73+
74+
raise self._client._make_status_error(
75+
err_msg,
76+
body=body,
77+
response=self.response,
78+
)
79+
finally:
80+
# Ensure the response is closed even if the consumer doesn't read all data
81+
response.close()
8182

8283
def __enter__(self) -> Self:
8384
return self
@@ -136,30 +137,31 @@ async def __stream__(self) -> AsyncIterator[_T]:
136137
process_data = self._client._process_response_data
137138
iterator = self._iter_events()
138139

139-
async for sse in iterator:
140-
if sse.data.startswith("[DONE]"):
141-
break
142-
143-
if sse.event is None:
144-
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
145-
146-
if sse.event == "error":
147-
body = sse.data
148-
149-
try:
150-
body = sse.json()
151-
err_msg = f"{body}"
152-
except Exception:
153-
err_msg = sse.data or f"Error code: {response.status_code}"
154-
155-
raise self._client._make_status_error(
156-
err_msg,
157-
body=body,
158-
response=self.response,
159-
)
160-
161-
# As we might not fully consume the response stream, we need to close it explicitly
162-
await response.aclose()
140+
try:
141+
async for sse in iterator:
142+
if sse.data.startswith("[DONE]"):
143+
break
144+
145+
if sse.event is None:
146+
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
147+
148+
if sse.event == "error":
149+
body = sse.data
150+
151+
try:
152+
body = sse.json()
153+
err_msg = f"{body}"
154+
except Exception:
155+
err_msg = sse.data or f"Error code: {response.status_code}"
156+
157+
raise self._client._make_status_error(
158+
err_msg,
159+
body=body,
160+
response=self.response,
161+
)
162+
finally:
163+
# Ensure the response is closed even if the consumer doesn't read all data
164+
await response.aclose()
163165

164166
async def __aenter__(self) -> Self:
165167
return self

0 commit comments

Comments
 (0)