Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ dev = [
"pytest-codspeed>=4.1.1",
"pytest-httpbin==2.0.0",
"pytest-trio==0.8.0",
"starlette>=0.49",
"trio==0.31.0",
"trio-typing==0.10.0",
"trustme==1.2.1",
"uvicorn>=0.35",
"websockets>=15",
"wsproto>=1.2",
"werkzeug>=3.1.6",
# Linting
"mypy==1.17.1",
Expand Down
10 changes: 10 additions & 0 deletions src/httpx2/httpx2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@
from ._transports import *
from ._types import *
from ._urls import *
from ._websockets import *

__all__ = [
"__description__",
"__title__",
"__version__",
"ASGITransport",
"ASGIWebSocketTransport",
"AsyncBaseTransport",
"AsyncByteStream",
"AsyncClient",
"AsyncHTTPTransport",
"AsyncWebSocketSession",
"Auth",
"BaseTransport",
"BasicAuth",
Expand Down Expand Up @@ -78,6 +81,13 @@
"UnsupportedProtocol",
"URL",
"USE_CLIENT_DEFAULT",
"websocket",
"WebSocketDisconnect",
"WebSocketException",
"WebSocketInvalidTypeReceived",
"WebSocketNetworkError",
"WebSocketSession",
"WebSocketUpgradeError",
"WriteError",
"WriteTimeout",
"WSGITransport",
Expand Down
63 changes: 63 additions & 0 deletions src/httpx2/httpx2/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@
TimeoutTypes,
)
from ._urls import URL
from ._websockets._session import (
DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
DEFAULT_MAX_MESSAGE_SIZE_BYTES,
DEFAULT_QUEUE_SIZE,
WebSocketSession,
)

if typing.TYPE_CHECKING:
import ssl # pragma: no cover
Expand All @@ -34,6 +41,7 @@
"put",
"request",
"stream",
"websocket",
]


Expand Down Expand Up @@ -159,6 +167,61 @@ def stream(
yield response


@contextmanager
def websocket(
url: URL | str,
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | None = None,
proxy: ProxyTypes | None = None,
timeout: TimeoutTypes = DEFAULT_TIMEOUT_CONFIG,
follow_redirects: bool = False,
verify: ssl.SSLContext | str | bool = True,
trust_env: bool = True,
subprotocols: list[str] | None = None,
max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
queue_size: int = DEFAULT_QUEUE_SIZE,
keepalive_ping_interval_seconds: float | None = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
keepalive_ping_timeout_seconds: float | None = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
) -> Generator[WebSocketSession]:
"""
Open a WebSocket session.

The session is closed automatically when exiting the context manager.

```
>>> import httpx2
>>> with httpx2.websocket("wss://echo.websocket.org") as ws:
... ws.send_text("Hello!")
... message = ws.receive_text()
```

**Parameters**: See `httpx2.request` and `httpx2.Client.websocket`.
"""
with Client(
cookies=cookies,
proxy=proxy,
verify=verify,
timeout=timeout,
trust_env=trust_env,
) as client:
with client.websocket(
url,
params=params,
headers=headers,
auth=auth,
follow_redirects=follow_redirects,
subprotocols=subprotocols,
max_message_size_bytes=max_message_size_bytes,
queue_size=queue_size,
keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
) as session:
yield session


def get(
url: URL | str,
*,
Expand Down
145 changes: 145 additions & 0 deletions src/httpx2/httpx2/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@
)
from ._urls import URL, QueryParams
from ._utils import URLPattern, get_environment_proxies
from ._websockets._session import (
DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
DEFAULT_MAX_MESSAGE_SIZE_BYTES,
DEFAULT_QUEUE_SIZE,
AsyncWebSocketSession,
WebSocketSession,
aconnect_ws,
connect_ws,
)

if typing.TYPE_CHECKING:
import ssl # pragma: no cover
Expand Down Expand Up @@ -845,6 +855,71 @@ def stream(
finally:
response.close()

@contextmanager
def websocket(
self,
url: URL | str,
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
subprotocols: list[str] | None = None,
max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
queue_size: int = DEFAULT_QUEUE_SIZE,
keepalive_ping_interval_seconds: float | None = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
keepalive_ping_timeout_seconds: float | None = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
) -> Generator[WebSocketSession]:
"""
Open a WebSocket session.

The session is closed automatically when exiting the context manager.

```python
with httpx2.Client() as client:
with client.websocket("wss://example.com/ws") as ws:
ws.send_text("Hello!")
message = ws.receive_text()
```

**Parameters**: See `httpx2.request` for the request parameters, plus:

* **subprotocols** - *(optional)* A list of subprotocols to negotiate with the server.
* **max_message_size_bytes** - Message size in bytes to receive from the server. Defaults to 64 KiB.
* **queue_size** - Size of the queue where the received messages will be held
until they are consumed. If the queue is full, the client will stop receiving
messages from the server until the queue has room available. Defaults to 512.
* **keepalive_ping_interval_seconds** - Interval at which the client will automatically
send a Ping event to keep the connection alive. Set it to `None` to disable
this mechanism. Defaults to 20 seconds.
* **keepalive_ping_timeout_seconds** - Maximum delay the client will wait for an answer
to its Ping event. If the delay is exceeded, `httpx2.WebSocketNetworkError` will be
raised and the connection closed. Defaults to 20 seconds.

Raises `httpx2.WebSocketUpgradeError` if the connection didn't correctly
upgrade to a WebSocket session.
"""
with connect_ws(
self,
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
subprotocols=subprotocols,
max_message_size_bytes=max_message_size_bytes,
queue_size=queue_size,
keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
) as session:
yield session

def send(
self,
request: Request,
Expand Down Expand Up @@ -1548,6 +1623,76 @@ async def stream(
finally:
await response.aclose()

@asynccontextmanager
async def websocket(
self,
url: URL | str,
*,
params: QueryParamTypes | None = None,
headers: HeaderTypes | None = None,
cookies: CookieTypes | None = None,
auth: AuthTypes | UseClientDefault | None = USE_CLIENT_DEFAULT,
follow_redirects: bool | UseClientDefault = USE_CLIENT_DEFAULT,
timeout: TimeoutTypes | UseClientDefault = USE_CLIENT_DEFAULT,
extensions: RequestExtensions | None = None,
subprotocols: list[str] | None = None,
max_message_size_bytes: int = DEFAULT_MAX_MESSAGE_SIZE_BYTES,
queue_size: int = DEFAULT_QUEUE_SIZE,
keepalive_ping_interval_seconds: float | None = DEFAULT_KEEPALIVE_PING_INTERVAL_SECONDS,
keepalive_ping_timeout_seconds: float | None = DEFAULT_KEEPALIVE_PING_TIMEOUT_SECONDS,
) -> AsyncGenerator[AsyncWebSocketSession]:
"""
Open a WebSocket session.

The session is closed automatically when exiting the context manager.

```python
async with httpx2.AsyncClient() as client:
async with client.websocket("wss://example.com/ws") as ws:
await ws.send_text("Hello!")
message = await ws.receive_text()
```

Internally, the session uses an anyio task group to manage background tasks.
As a result, exceptions that are not caught inside the context manager and
propagate out of the `async with` block will be wrapped in an `ExceptionGroup`.
Use the `except*` syntax to handle them.

**Parameters**: See `httpx2.request` for the request parameters, plus:

* **subprotocols** - *(optional)* A list of subprotocols to negotiate with the server.
* **max_message_size_bytes** - Message size in bytes to receive from the server. Defaults to 64 KiB.
* **queue_size** - Size of the queue where the received messages will be held
until they are consumed. If the queue is full, the client will stop receiving
messages from the server until the queue has room available. Defaults to 512.
* **keepalive_ping_interval_seconds** - Interval at which the client will automatically
send a Ping event to keep the connection alive. Set it to `None` to disable
this mechanism. Defaults to 20 seconds.
* **keepalive_ping_timeout_seconds** - Maximum delay the client will wait for an answer
to its Ping event. If the delay is exceeded, `httpx2.WebSocketNetworkError` will be
raised and the connection closed. Defaults to 20 seconds.

Raises `httpx2.WebSocketUpgradeError` if the connection didn't correctly
upgrade to a WebSocket session.
"""
async with aconnect_ws(
self,
url,
params=params,
headers=headers,
cookies=cookies,
auth=auth,
follow_redirects=follow_redirects,
timeout=timeout,
extensions=extensions,
subprotocols=subprotocols,
max_message_size_bytes=max_message_size_bytes,
queue_size=queue_size,
keepalive_ping_interval_seconds=keepalive_ping_interval_seconds,
keepalive_ping_timeout_seconds=keepalive_ping_timeout_seconds,
) as session:
yield session

async def send(
self,
request: Request,
Expand Down
20 changes: 20 additions & 0 deletions src/httpx2/httpx2/_websockets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from ._exceptions import (
WebSocketDisconnect,
WebSocketException,
WebSocketInvalidTypeReceived,
WebSocketNetworkError,
WebSocketUpgradeError,
)
from ._session import AsyncWebSocketSession, WebSocketSession
from ._transport import ASGIWebSocketTransport

__all__ = [
"ASGIWebSocketTransport",
"AsyncWebSocketSession",
"WebSocketDisconnect",
"WebSocketException",
"WebSocketInvalidTypeReceived",
"WebSocketNetworkError",
"WebSocketSession",
"WebSocketUpgradeError",
]
64 changes: 64 additions & 0 deletions src/httpx2/httpx2/_websockets/_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
"""
Our exception hierarchy:

* WebSocketException
x WebSocketUpgradeError
x WebSocketDisconnect
x WebSocketInvalidTypeReceived
x WebSocketNetworkError
"""

from __future__ import annotations

import typing

if typing.TYPE_CHECKING:
from .._models import Response # pragma: no cover

__all__ = [
"WebSocketDisconnect",
"WebSocketException",
"WebSocketInvalidTypeReceived",
"WebSocketNetworkError",
"WebSocketUpgradeError",
]


class WebSocketException(Exception):
"""
Base class for all WebSocket exceptions.
"""


class WebSocketUpgradeError(WebSocketException):
"""
The initial connection didn't correctly upgrade to a WebSocket session.
"""

def __init__(self, response: Response) -> None:
self.response = response


class WebSocketDisconnect(WebSocketException):
"""
The server closed the WebSocket session.
"""

def __init__(self, code: int = 1000, reason: str | None = None) -> None:
self.code = code
self.reason = reason or ""


class WebSocketInvalidTypeReceived(WebSocketException):
"""
A received message was not of the expected type.
"""

def __init__(self, message: str | bytes) -> None:
self.message = message


class WebSocketNetworkError(WebSocketException):
"""
A network error occurred, typically because the underlying stream has closed or timed out.
"""
Loading
Loading