Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ We structure this changelog in accordance with [Keep a Changelog](https://keepac
- ETL `FastAPIServer`: stream no-FQN `hpush` PUTs in constant memory,
mirroring the `hpull` GET change. Transient direct-put errors surface to AIS
to retry the whole PUT (request body is one-shot).
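- ETL `HTTPMultiThreadedServer`: stream no-FQN `hpush` PUTs in constant
memory. Previously the full request body was read into a `BytesIO`
before being handed to `transform_stream`. Because the request body is now
one-shot, local direct-put retries are skipped on this path and a transient
direct-put error surfaces as a transform failure.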
- **ETL direct-put retry**: added exponential-backoff retry for transient connection
errors in Flask and HTTP multi-threaded ETL servers for parity with FastAPI.
`ConnectionRefused` is now treated as a permanent error that returns HTTP 502
Expand Down
26 changes: 26 additions & 0 deletions python/aistore/sdk/etl/webserver/base_etl_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,32 @@ def _handle_direct_put_transient_error(
raise ETLDirectPutTransientError(direct_put_url, exc) from exc


def _compute_replayable_retries(
fqn: str, is_get: bool, direct_put_retries: int
) -> Tuple[bool, int]:
"""
Determine whether the streaming source is replayable and the effective
retry budget.

Sources backed by a local FQN file or a GET stream can be reopened on
retry. No-FQN PUT bodies are one-shot (consumed from the request socket)
and cannot be replayed locally, so the retry budget is forced to zero.

Args:
fqn: Local FQN of the source object; empty for streaming PUT.
is_get: ``True`` for hpull GET, ``False`` for hpush PUT.
direct_put_retries: Configured retry count.

Returns:
``(replayable, effective_retries)`` — `replayable` is ``True`` when
the source can be reopened; `effective_retries` equals
`direct_put_retries` when replayable, else ``0``.
"""
replayable = bool(fqn) or is_get
effective_retries = direct_put_retries if replayable else 0
return replayable, effective_retries


class ETLServer(ABC): # pylint: disable=too-many-instance-attributes
"""
Abstract base class for all ETL servers.
Expand Down
9 changes: 6 additions & 3 deletions python/aistore/sdk/etl/webserver/fastapi_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
CountingIterator,
RETRY_BACKOFF_BASE,
RETRY_BACKOFF_MAX,
_compute_replayable_retries,
)
from aistore.sdk.session_manager import resolve_ssl_config
from aistore.sdk.etl.webserver.fastapi_streaming import (
Expand Down Expand Up @@ -375,12 +376,14 @@ async def _direct_put_stream_with_retry( # pylint: disable=too-many-arguments,t
Raises:
ETLDirectPutTransientError: if all retry attempts are exhausted.
"""
replayable = bool(fqn) or is_get
effective_retries = self.direct_put_retries if replayable else 0
replayable, effective_retries = _compute_replayable_retries(
fqn, is_get, self.direct_put_retries
)
if not replayable and self.direct_put_retries:
self.logger.debug(
"no-FQN PUT: source not replayable; "
"local retries skipped, AIS will retry"
"local retries skipped; transient direct-put error "
"will surface as transform failure"
)

reader = await self._get_stream_reader(fqn, path, request, is_get)
Expand Down
93 changes: 75 additions & 18 deletions python/aistore/sdk/etl/webserver/http_multi_threaded_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
#

import io
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from io import BytesIO
from socketserver import ThreadingMixIn
from typing import Iterator, Type, Tuple
from typing import BinaryIO, Iterator, Type, Tuple
import signal
import threading
from urllib.parse import urlparse, parse_qs
Expand All @@ -20,6 +20,7 @@
RETRY_BACKOFF_BASE,
RETRY_BACKOFF_MAX,
_handle_direct_put_transient_error,
_compute_replayable_retries,
)
from aistore.sdk.etl.webserver.utils import (
compose_etl_direct_put_url,
Expand All @@ -40,6 +41,57 @@
)


class _RFileLimitedReader(io.RawIOBase):
"""Bound `BaseHTTPRequestHandler.rfile` to the current PUT body length.

`self.rfile` is the raw connection stream; it has no intrinsic EOF at the
end of this request body. Passing it directly to `transform_stream` would
cause any transform that calls `reader.read()` with no size argument to
block indefinitely waiting for the client to close the connection.

This wrapper tracks `Content-Length` remaining bytes and clamps every
`read()` call accordingly, giving transforms the same EOF semantics they
get from a `BytesIO` — without buffering the full body upfront.

The request body is one-shot; `_direct_put_stream_with_retry` sets
`effective_retries=0` on this path. `close()` drains any unread bytes
from the request body so a transform that exits early does not leave
residual data on a keep-alive connection.
"""

def __init__(self, rfile: BinaryIO, content_length: int) -> None:
self._rfile = rfile
self._remaining = content_length

def readable(self) -> bool:
return True

def read(self, size: int = -1) -> bytes:
if self._remaining == 0:
return b""
if size is None or size < 0:
data = self._rfile.read(self._remaining)
self._remaining = 0
return data
to_read = min(size, self._remaining)
data = self._rfile.read(to_read)
self._remaining -= len(data)
return data

Comment thread
chanu1406 marked this conversation as resolved.
def close(self) -> None:
try:
while self._remaining > 0:
try:
chunk = self._rfile.read(min(self._remaining, 65536))
except (OSError, ValueError):
break
if not chunk:
break
self._remaining -= len(chunk)
finally:
super().close()


class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""
Multi-threaded HTTP server that delegates ETL logic to a provided ETLServer instance.
Expand Down Expand Up @@ -161,11 +213,10 @@ def _get_stream_reader(self, fqn, raw_path, is_get):
resp.close()
raise
return _ResponseRawReader(resp)
# TODO: non-FQN PUT still buffers the full request body into BytesIO so
# retries can replay it; true streaming for this path still needs to be
# implemented.
# Request body is one-shot; local retries are skipped for this path
# (see _direct_put_stream_with_retry).
content_length = int(self.headers.get(HEADER_CONTENT_LENGTH, 0))
return BytesIO(self.rfile.read(content_length))
return _RFileLimitedReader(self.rfile, content_length)

def _direct_put_with_retry(
self,
Expand Down Expand Up @@ -208,15 +259,18 @@ def _direct_put_stream_with_retry( # pylint: disable=too-many-arguments,too-man
"""
Streaming direct-put with exponential-backoff retry on transient errors.

For FQN and GET sources, the reader is closed and reopened on each retry.
For PUT requests without FQN, the body is buffered in a BytesIO by
_get_stream_reader and seeked back to 0 on retry so the same bytes are
replayed (mirrors FastAPIServer._direct_put_stream_with_retry).
Replayable sources (FQN-backed or GET) close and reopen the reader on
each retry. No-FQN PUT bodies are one-shot (request body is consumed
from the socket); effective_retries is forced to 0 and a transient
direct-put error surfaces to AIS as a transform failure.
"""
etl = self.server.etl_server
replayable, effective_retries = _compute_replayable_retries(
fqn, is_get, etl.direct_put_retries
)
reader = self._get_stream_reader(fqn, raw_path, is_get)
try:
for attempt in range(etl.direct_put_retries + 1):
for attempt in range(effective_retries + 1):
try:
return self._direct_put_stream(
direct_put_url,
Expand All @@ -225,22 +279,25 @@ def _direct_put_stream_with_retry( # pylint: disable=too-many-arguments,too-man
raw_path,
)
except ETLDirectPutTransientError as exc:
if attempt >= etl.direct_put_retries:
if attempt >= effective_retries:
if not replayable and etl.direct_put_retries:
Comment thread
chanu1406 marked this conversation as resolved.
etl.logger.debug(
"no-FQN PUT: source not replayable; "
"local retries skipped; transient direct-put error "
"will surface as transform failure"
)
raise
delay = min(RETRY_BACKOFF_BASE**attempt, RETRY_BACKOFF_MAX)
etl.logger.warning(
"direct_put_stream attempt %d/%d failed, retrying in %.1fs: %s",
attempt + 1,
etl.direct_put_retries + 1,
effective_retries + 1,
delay,
exc,
exc_info=True,
)
if isinstance(reader, BytesIO):
reader.seek(0)
else:
etl.close_reader(reader)
reader = self._get_stream_reader(fqn, raw_path, is_get)
etl.close_reader(reader)
reader = self._get_stream_reader(fqn, raw_path, is_get)
time.sleep(delay)
finally:
etl.close_reader(reader)
Expand Down
Loading
Loading