Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
9931731
Disable Docker image build workflow
wasamtc Jan 14, 2026
cd8fe0b
Merge branch 'GradientHQ:main' into main
wasamtc Jan 16, 2026
e83ca8c
Merge branch 'GradientHQ:main' into main
wasamtc Jan 17, 2026
47cfe4c
Merge branch 'GradientHQ:main' into main
wasamtc Jan 19, 2026
dd067d6
Update build-spark-image.yaml
wasamtc Jan 19, 2026
89697a7
Update build-images.yaml
wasamtc Jan 19, 2026
5e74a89
Merge branch 'GradientHQ:main' into main
wasamtc Jan 21, 2026
b989af9
Merge branch 'GradientHQ:main' into main
wasamtc Jan 23, 2026
fe7a4c8
base gpu version:single node can run successly
Jan 29, 2026
d266c14
fix[chunked-prefill]:reordered list use origin list
Jan 30, 2026
1ed5d67
fix[chunked-prefill]:transfer all chunks instead of last chunk
Jan 30, 2026
876f131
fix[chunked-prefill]:restore last chunk
Jan 30, 2026
fd63706
feat[chunked-prefill]: base multi-node chunked prefill for gpu
Jan 31, 2026
13c6773
fix[chunked-prefill]: chunked_req should be send to local and next
Jan 31, 2026
3d9f72f
fix[chunked-prefill]: chunked_req should add it is origin req to base…
Jan 31, 2026
b232959
fix[chunked-prefill]: return can_run_list if not reorder reqs
Jan 31, 2026
4a14ecb
fix[chunked-prefill]: add log
Jan 31, 2026
c98594f
fix[chunked-prefill]: add log
Jan 31, 2026
ffce612
fix[chunked-prefill]: delete all chunked_reqs from base_forward becau…
Jan 31, 2026
b1cbd8c
fix[chunked-prefill]: change base_executor run_loop last judge condition
Jan 31, 2026
b6bd4d3
fix[chunked-prefill]: add log
Feb 1, 2026
185c184
fix[chunked-prefill]: change last peer just send to_forward_reqs
Feb 1, 2026
6d8268e
fix[chunked-prefill]: change message type bug
Feb 1, 2026
0d49612
fix[chunked-prefill]: last send forward_reqs when forward_reqs is not…
Feb 1, 2026
5a6936f
fix[chunked-prefill]: last send forward_reqs when forward_reqs is not…
Feb 1, 2026
8318548
fix[chunked-prefill]: handle_reqs should do other if not last_peer
Feb 1, 2026
4153611
fix[chunked-prefill]: add logs
Feb 1, 2026
484d66f
fix[chunked-prefill]: add logs
Feb 1, 2026
ecdbd0a
fix[chunked-prefill]: add logs
Feb 1, 2026
a4ebbd4
fix[chunked-prefill]: add logs
Feb 1, 2026
0d45d66
fix[chunked-prefill]: add logs
Feb 1, 2026
dd45122
fix[chunked-prefill]: add logs
Feb 1, 2026
2b87e5c
fix[chunked-prefill]: add logs
Feb 1, 2026
1f0373b
fix[chunked-prefill]: if req in running_batch, change status
Feb 1, 2026
4de2713
fix[chunked-prefill]: add logs
Feb 1, 2026
8a930a6
fix[chunked-prefill]: admit req update if req exists
Feb 1, 2026
2b5f9ee
fix[chunked-prefill]: add param for handle-input-reqs
Feb 1, 2026
f070057
fix[chunked-prefill]: every req can admit once per admit
Feb 1, 2026
80e80f3
feat[chunked-prefill]: base chunked-prefill for multi-gpu
Feb 2, 2026
8856d5d
Merge branch 'GradientHQ:main' into main
wasamtc Feb 2, 2026
2b83ebc
feat[chunked-prefill]: change for merge
Feb 2, 2026
635ed65
Restore cron schedule for automatic image builds
wasamtc Feb 2, 2026
a1c81ef
Enable scheduled workflow for building Spark image
wasamtc Feb 2, 2026
383e5bc
fix[mlx-executor]: add param from mlx-executor
Feb 2, 2026
5ec82d7
Merge remote-tracking branch 'origin/tangcong/gpu_chunk_v3' into tang…
Feb 2, 2026
3fcd074
fix[admit-req]: reenque already exists reqs
Feb 2, 2026
d2b10c6
fix[mac-test]: reenque already exists reqs
Feb 2, 2026
0fcbc5a
feat[chunked-prefill]: change chunked-prefill for mac
Feb 2, 2026
3b63715
fix[chunked-prefill]:prepare next req use origin_input_ids instead o…
Feb 2, 2026
4fda246
fix[chunked-prefill]:enqueue req use origin_input_ids instead of inp…
Feb 2, 2026
92def22
fix[chunked-prefill]: get matched tokens before chunked
Feb 3, 2026
a16f853
fix[chunked-prefill]: use effective total_len if it exits
Feb 3, 2026
c4de011
fix[chunked-prefill]: use new reqs instead of old chunked_reqs
Feb 3, 2026
719eb96
feat[chunked-prefill]: the max chunked-prefill
Feb 3, 2026
6342697
fix[prefix-cache]: use correct hidden_states
Feb 3, 2026
eac6229
fix[chunked-prefill]: delete kig
Feb 3, 2026
1b99726
Merge branch 'GradientHQ:main' into main
wasamtc Feb 4, 2026
3a2cb3f
Merge branch 'main' into tangcong/mac_chunk_v3
Feb 4, 2026
c2091cf
fix[chunked-prefill]: fix format
Feb 4, 2026
f68fa32
delete comments and add verify for chunked-prfill param
Feb 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 71 additions & 25 deletions src/parallax/server/executor/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ def __init__(
)

@abstractmethod
def handle_input_requests(self, requests: List[Request]):
def handle_input_requests(self, requests: List[Request], from_previous_peer: bool = False):
"""Update requests states and status in scheduler and cache manager."""

@abstractmethod
Expand Down Expand Up @@ -299,6 +299,11 @@ def recv_requests_from_peer(self) -> Tuple[List[Request], str]:
if recv_req is not None and len(recv_req) > 0:
for req in recv_req:
if req.hidden_states is not None:
size_attr = getattr(req.hidden_states, "size", None)
hidden_size = size_attr() if callable(size_attr) else size_attr
logger.debug(
f"recv request {req.request_id} hidden_states.length: {hidden_size}"
)
if req.hidden_states.dtype != self.dtype:
logger.debug(
f"Converting hidden_states dtype from {req.hidden_states.dtype} to {self.dtype} for request {req.request_id}"
Expand Down Expand Up @@ -386,9 +391,15 @@ def prepare_batch_inputs(self, batched_requests: List[Request]) -> Optional[Dict

def prepare_next_batch_requests(
self, requests: List[Request], batch_output: Any, context_lengths: Any
) -> List[Request]:
) -> Tuple[List[Request], List[Request]]:
"""Prepares a batch of requests for the next stage of the pipeline.

Returns two lists:
- chunked_reqs: requests that still have chunks to complete (e.g. chunked prefill);
these should be handled locally via handle_input_requests only, not sent to next peer.
- to_forward_reqs: requests to forward (single node: handle_input_requests;
multi-node and tp_rank==0: request_to_proto and send to next peer).

Args:
requests: List of requests in the batch
batch_output: Output from process_batch. Always a dict with:
Expand All @@ -403,7 +414,8 @@ def prepare_next_batch_requests(
hidden_states = batch_output["hidden_states"]
token_probs = batch_output["probs"]

batched_requests = []
chunked_reqs = []
to_forward_reqs = []
pre_length = 0
for i, src_request in enumerate(requests):
if self.is_last_peer:
Expand Down Expand Up @@ -437,9 +449,9 @@ def prepare_next_batch_requests(
next_req = self._prepare_next_single_request(
src_request, hidden_state_for_req, token_prob
)
batched_requests.append(next_req)
to_forward_reqs.append(next_req)

return batched_requests
return chunked_reqs, to_forward_reqs

def release_and_evict_request(self, rid: str):
"""Release per-request resources and evict from scheduler. Best-effort, never raises."""
Expand Down Expand Up @@ -471,7 +483,7 @@ def run_loop(self):
if self.enable_weight_refit:
self.check_and_refit_weight(refit_weight_path)

self.handle_input_requests(received_requests)
self.handle_input_requests(received_requests, from_previous_peer=True)
# Send abort signals to P2P server to broadcast to all nodes
if len(self.finished_batch) > 0 and self.tp_rank == 0:
self.send_to_peer_socket.send_multipart(
Expand Down Expand Up @@ -547,28 +559,62 @@ def run_loop(self):
except Exception:
pass
# 7. Prepare requests for the next stage in the pipeline
next_batch = self.prepare_next_batch_requests(
chunked_reqs, to_forward_reqs = self.prepare_next_batch_requests(
requests=prepared_inputs["requests"],
batch_output=output,
context_lengths=prepared_inputs.get("context_lengths"),
)

if chunked_reqs:
logger.debug(f"Handle {len(chunked_reqs)} chunked requests.")
self.handle_input_requests(chunked_reqs)
# 8. Dispatch to the appropriate destination
if self.is_last_peer and self.is_first_peer:
# Single node: handle locally
self.handle_input_requests(next_batch)
elif self.tp_rank == 0:
# Send output to next peer
self.send_to_peer_socket.send_multipart(
[
b"forward",
request_to_proto(next_batch, self.device).SerializeToString(),
]
)
logger.debug(
f"Processed batch of type {batch_type} with {len(next_batch)} requests "
f"in {(time.time() - start_time) * 1000:.3f} ms"
)
if to_forward_reqs or chunked_reqs:
logger.debug(f"dispatch to_forward and chunked_reqs to next peer.")
if (
self.is_last_peer
and self.is_first_peer
and (to_forward_reqs is not None and len(to_forward_reqs) > 0)
):
# Single node: handle to_forward locally
logger.debug(f"Handle {len(to_forward_reqs)} to_forward requests.")
self.handle_input_requests(to_forward_reqs)
elif self.tp_rank == 0:
# Send to_forward to next peer (do not send chunked_reqs if self is last_peer)
logger.debug(
f"self is last_peer: {self.is_last_peer}, self is first_peer: {self.is_first_peer}"
)
if not self.is_last_peer:
logger.debug(
f"Send {len(to_forward_reqs + chunked_reqs)} to_forward and chunked_reqs to next peer."
)
self.send_to_peer_socket.send_multipart(
[
b"forward",
request_to_proto(
to_forward_reqs + chunked_reqs, self.device
).SerializeToString(),
]
)
logger.debug(
f"Processed batch of with {len(to_forward_reqs + chunked_reqs)} to_forward and chunked_reqs "
f"in {(time.time() - start_time) * 1000:.3f} ms"
)
elif to_forward_reqs is not None and len(to_forward_reqs) > 0:
logger.debug(
f"Send {len(to_forward_reqs)} to_forward to next peer."
)
self.send_to_peer_socket.send_multipart(
[
b"forward",
request_to_proto(
to_forward_reqs, self.device
).SerializeToString(),
]
)
logger.debug(
f"Processed batch of with {len(to_forward_reqs)} to_forward "
f"in {(time.time() - start_time) * 1000:.3f} ms"
)

except Exception as e:
logger.exception(f"Error processing batch: {e}")
Expand Down Expand Up @@ -744,7 +790,7 @@ def _prepare_next_single_request(
request_id=request.request_id,
status=RequestStatus.DECODING,
current_position=request.total_length + 1,
input_ids=request.input_ids,
input_ids=request.origin_input_ids,
hidden_states=hidden_states,
next_token_id=next_token_id,
routing_table=request.routing_table,
Expand All @@ -763,7 +809,7 @@ def _prepare_next_single_request(
request_id=request.request_id,
status=RequestStatus.DECODING, # Last peer always changes status to DECODING
current_position=request.total_length,
input_ids=request.input_ids,
input_ids=request.origin_input_ids,
hidden_states=hidden_states,
next_token_id=next_token_id,
routing_table=request.routing_table,
Expand Down
1 change: 1 addition & 0 deletions src/parallax/server/executor/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def create_executor_config(args: argparse.Namespace, shared_state=None, conn=Non
"max_loaded_loras": args.max_loaded_loras,
"enable_weight_refit": args.enable_weight_refit,
"weight_refit_mode": args.weight_refit_mode,
"chunked_prefill_size": args.chunked_prefill_size,
}

if args.gpu_backend == "sglang":
Expand Down
Loading