From deb04abf73a6592eca04df454c91baf18d61b9a0 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Mon, 25 Aug 2025 18:41:15 +0800 Subject: [PATCH 01/10] [Feature] support clear data --- fastdeploy/engine/engine.py | 4 +++ .../engine/sched/resource_manager_v1.py | 4 +++ fastdeploy/entrypoints/openai/api_server.py | 1 + .../inter_communicator/engine_worker_queue.py | 7 +++++ fastdeploy/output/token_processor.py | 29 +++++++++++++++++++ fastdeploy/rl/dynamic_weight_manager.py | 1 + fastdeploy/worker/gcu_model_runner.py | 9 ++++++ fastdeploy/worker/gpu_model_runner.py | 9 ++++++ fastdeploy/worker/worker_process.py | 4 +-- 9 files changed, 66 insertions(+), 2 deletions(-) diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index d09f02122f..c9bcda85ee 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -1360,6 +1360,10 @@ def detect_thread(): pass return True + def clear_data(self): + self.token_processor.clear_data() + self.engine_worker_queue.clear_data() + def start_queue_service(self): """ start queue service for engine worker communication diff --git a/fastdeploy/engine/sched/resource_manager_v1.py b/fastdeploy/engine/sched/resource_manager_v1.py index 95f2c235d7..a12985ff65 100644 --- a/fastdeploy/engine/sched/resource_manager_v1.py +++ b/fastdeploy/engine/sched/resource_manager_v1.py @@ -443,3 +443,7 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]): del self.requests[req_id] except Exception as e: llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}") + + def clear_data(self): + self.waiting: deque[Request] = deque() + self.to_be_rescheduled_request_id_set = set() diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 6abdcb7684..2bd06689d7 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -449,6 +449,7 @@ def reset_scheduler(): if llm_engine is None: return Response("Engine not loaded", status_code=500) + llm_engine.clear_data() llm_engine.scheduler.reset() return Response("Scheduler Reset Successfully", status_code=200) diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index da88265a26..528b73673f 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -392,6 +392,13 @@ def get_disaggregated_tasks(self): llm_logger.debug("get tasks from queue success") return item + def clear_data(self): + self.lock.acquire() + self.tasks[:] = list() + self.client_read_flag[:] = [0] * self.num_client + self.lock.release() + llm_logger.info("clear data for engine worker queue") + def cleanup(self): """ Exit the worker queue gracefully. 
diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index c72150a284..2762f89314 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -505,6 +505,35 @@ def _record_speculative_decoding_mertics(self, accept_num): single_head_acceptance_rate ) + def clear_data(self): + if envs.ENABLE_V1_KVCACHE_SCHEDULER: + self.resource_manager.clear_data() + batch_result = [] + for i in range(self.cfg.max_num_seqs): + if self.resource_manager.stop_flags[i]: + continue + task = self.resource_manager.tasks_list[i] + result = RequestOutput( + request_id=task.request_id, + outputs=CompletionOutput( + index=i, + send_idx=self.tokens_counter[task.request_id], + token_ids=task.eos_token_ids, + draft_token_ids=[], + ), + finished=True, + metrics=RequestMetrics( + arrival_time=time.time(), + request_start_time=task.arrival_time, + ), + ) + result.error_msg = "Clear weight, stop all inference!" + is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill" + self._recycle_resources(task.request_id, i, task, result, is_prefill) + llm_logger.warning(f"clear data for task {task.request_id}") + batch_result.append(result) + self.postprocess(batch_result) + class WarmUpTokenProcessor(TokenProcessor): """ diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index ad39accdb2..4b75d3529c 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -218,6 +218,7 @@ def check_model_weights_status(model_weights_status, model_runner, pid): model_runner.update_parameters(pid) elif model_weights_status.value[0] == -1: logger.info("infer engine stopped! start to clear checkpoint...") + model_runner.clear_requests() model_runner.clear_parameters(pid) while True: diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py index d1f8f2c689..3844f7fac1 100644 --- a/fastdeploy/worker/gcu_model_runner.py +++ b/fastdeploy/worker/gcu_model_runner.py @@ -1201,6 +1201,15 @@ def clear_parameters(self, pid): paddle.device.cuda.empty_cache() self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") + def clear_requests(self): + """ " Dynamic model loader use to clear requests use for RL""" + self.share_inputs["block_tables"][:, :] = -1 + self.share_inputs["stop_flags"][:] = True + self.seq_lens_this_time_buffer[:] = 0 + self.share_inputs["seq_lens_decoder"][:] = 0 + self.share_inputs["seq_lens_encoder"][:] = 0 + self.share_inputs["is_block_step"][:] = False + def update_parameters(self, pid): """ " Dynamic model loader use to update parameters use for RL""" self.dynamic_weight_manager.update_parameters(pid) diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 7e7165c74a..d3b55a2a71 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1597,6 +1597,15 @@ def clear_parameters(self, pid): paddle.device.cuda.empty_cache() self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") + def clear_requests(self): + """ " Dynamic model loader use to clear requests use for RL""" + self.share_inputs["block_tables"][:, :] = -1 + self.share_inputs["stop_flags"][:] = True + self.seq_lens_this_time_buffer[:] = 0 + self.share_inputs["seq_lens_decoder"][:] = 0 + self.share_inputs["seq_lens_encoder"][:] = 0 + self.share_inputs["is_block_step"][:] = False + def update_parameters(self, pid): """ " Dynamic 
model loader use to update parameters use for RL""" self.dynamic_weight_manager.update_parameters(pid) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 6ac8850819..0c48e68af9 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -279,7 +279,7 @@ def event_loop_normal(self) -> None: req_ids = [] num_running_requests = 0 while True: - if self.local_rank == 0: + if self.local_rank % mp_num_per_node == 0: if self.model_weights_status.value[0] != 0: self.exist_task_signal.value[0] = 2 else: @@ -293,7 +293,7 @@ def event_loop_normal(self) -> None: self.worker_healthy_live_signal.value[self.local_rank % self.max_chips_per_node] = int(time.time()) # The first worker detects whether there are tasks in the task queue - if self.local_rank % mp_num_per_node == 0: + if self.local_rank % mp_num_per_node == 0 and self.exist_task_signal.value[0] != 2: if self.task_queue.num_tasks() > 0: # VL only support 1 batch to prefill if envs.ENABLE_V1_KVCACHE_SCHEDULER or not ( From d93f9b1823db4a3f88824e4f2a6dc85442992472 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Tue, 26 Aug 2025 16:08:00 +0800 Subject: [PATCH 02/10] update --- fastdeploy/entrypoints/openai/serving_chat.py | 5 +++++ fastdeploy/entrypoints/openai/serving_completion.py | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 05bd571835..592d54c107 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -317,6 +317,9 @@ async def chat_completion_stream_generator( if res.get("error_msg") is not None and "Recover" in res["error_msg"]: choice.finish_reason = "recover_stop" + if res.get("error_msg") is not None and "Clear" in res["error_msg"]: + choice.finish_reason = "clear_data" + if request.return_token_ids: choice.delta.completion_token_ids = list(output["token_ids"]) choice.delta.raw_prediction = output.get("raw_prediction") @@ -480,6 +483,8 @@ async def chat_completion_full_generator( if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]: choice.finish_reason = "recover_stop" + if final_res.get("error_msg") is not None and "Clear" in final_res["error_msg"]: + choice.finish_reason = "clear_data" choices.append(choice) num_prompt_tokens = len(prompt_token_ids) diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index 704330373a..d97e5de68d 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -412,6 +412,8 @@ async def completion_stream_generator( choices[-1].finish_reason = self.calc_finish_reason( request.max_tokens, output_tokens[idx], output, tool_called[idx] ) + if res.get("error_msg") is not None and "Clear" in res["error_msg"]: + choices[-1].finish_reason = "clear_data" send_idx = output.get("send_idx") # 只有当 send_idx 明确为 0 时才记录日志 if send_idx == 0 and not request.return_token_ids: @@ -511,6 +513,8 @@ def request_output_to_completion_response( output_text = output["text"] finish_reason = self.calc_finish_reason(request.max_tokens, final_res["output_token_ids"], output, False) + if final_res.get("error_msg") is not None and "Clear" in final_res["error_msg"]: + finish_reason = "clear_data" choice_data = CompletionResponseChoice( token_ids=token_ids, index=len(choices), From b3b77fba6115de923629b8d2cdaa28a8c35c1ac8 Mon Sep 17 00:00:00 2001 From: 
ltd0924 Date: Tue, 16 Sep 2025 14:20:00 +0800 Subject: [PATCH 03/10] fix --- fastdeploy/inter_communicator/engine_worker_queue.py | 2 +- fastdeploy/worker/gcu_model_runner.py | 7 +------ fastdeploy/worker/gpu_model_runner.py | 7 +------ 3 files changed, 3 insertions(+), 13 deletions(-) diff --git a/fastdeploy/inter_communicator/engine_worker_queue.py b/fastdeploy/inter_communicator/engine_worker_queue.py index 528b73673f..9b516013d1 100644 --- a/fastdeploy/inter_communicator/engine_worker_queue.py +++ b/fastdeploy/inter_communicator/engine_worker_queue.py @@ -395,7 +395,7 @@ def get_disaggregated_tasks(self): def clear_data(self): self.lock.acquire() self.tasks[:] = list() - self.client_read_flag[:] = [0] * self.num_client + self.client_read_flag[:] = [1] * self.num_client self.lock.release() llm_logger.info("clear data for engine worker queue") diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py index 282ed4c286..33e3bad57a 100644 --- a/fastdeploy/worker/gcu_model_runner.py +++ b/fastdeploy/worker/gcu_model_runner.py @@ -1231,13 +1231,8 @@ def clear_parameters(self, pid): self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") def clear_requests(self): - """ " Dynamic model loader use to clear requests use for RL""" - self.share_inputs["block_tables"][:, :] = -1 + """ Dynamic model loader use to clear requests use for RL""" self.share_inputs["stop_flags"][:] = True - self.seq_lens_this_time_buffer[:] = 0 - self.share_inputs["seq_lens_decoder"][:] = 0 - self.share_inputs["seq_lens_encoder"][:] = 0 - self.share_inputs["is_block_step"][:] = False def update_parameters(self, pid): """ " Dynamic model loader use to update parameters use for RL""" diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py index 86374ddc70..67c2e10e43 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1912,13 +1912,8 @@ def clear_parameters(self, pid): self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") def clear_requests(self): - """ " Dynamic model loader use to clear requests use for RL""" - self.share_inputs["block_tables"][:, :] = -1 + """ Dynamic model loader use to clear requests use for RL""" self.share_inputs["stop_flags"][:] = True - self.seq_lens_this_time_buffer[:] = 0 - self.share_inputs["seq_lens_decoder"][:] = 0 - self.share_inputs["seq_lens_encoder"][:] = 0 - self.share_inputs["is_block_step"][:] = False def update_parameters(self, pid): """Dynamic model loader use to update parameters use for RL""" From 716f70c86a1a80cbcbc0c28e1d3aae2599ab772b Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Tue, 16 Sep 2025 14:22:41 +0800 Subject: [PATCH 04/10] fix --- fastdeploy/worker/gcu_model_runner.py | 2 +- fastdeploy/worker/gpu_model_runner.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/fastdeploy/worker/gcu_model_runner.py b/fastdeploy/worker/gcu_model_runner.py index 33e3bad57a..660ac21a5b 100644 --- a/fastdeploy/worker/gcu_model_runner.py +++ b/fastdeploy/worker/gcu_model_runner.py @@ -1231,7 +1231,7 @@ def clear_parameters(self, pid): self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") def clear_requests(self): - """ Dynamic model loader use to clear requests use for RL""" + """Dynamic model loader use to clear requests use for RL""" self.share_inputs["stop_flags"][:] = True def update_parameters(self, pid): diff --git a/fastdeploy/worker/gpu_model_runner.py 
b/fastdeploy/worker/gpu_model_runner.py index 67c2e10e43..c7e3d976d8 100644 --- a/fastdeploy/worker/gpu_model_runner.py +++ b/fastdeploy/worker/gpu_model_runner.py @@ -1912,7 +1912,7 @@ def clear_parameters(self, pid): self.dynamic_weight_manager._log_memory("dynamic weight manager clear all memory") def clear_requests(self): - """ Dynamic model loader use to clear requests use for RL""" + """Dynamic model loader use to clear requests use for RL""" self.share_inputs["stop_flags"][:] = True def update_parameters(self, pid): From 813ad4286aa3ca319d2def5cd0267f52c2f60309 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Wed, 17 Sep 2025 14:31:26 +0800 Subject: [PATCH 05/10] fix --- fastdeploy/engine/common_engine.py | 13 +++++++++++++ fastdeploy/engine/engine.py | 4 ---- fastdeploy/entrypoints/openai/api_server.py | 2 +- 3 files changed, 14 insertions(+), 5 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index 4375452b2a..c689537747 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -755,6 +755,19 @@ def start_cache_service(self, device_ids, ipc_signal_suffix): def check_and_free_block_tables(self): self.resource_manager.check_and_free_block_tables() + def clear_data(self): + try: + llm_logger.info("Clear Data: Start") + self.token_processor.clear_data() + self.engine_worker_queue.clear_data() + while self.zmq_server.req_dict: + time.sleep(0.1) + llm_logger.info("Clear Data: Successfully") + return True + except Exception as e: + llm_logger.error(f"Clear data error: {e}") + return False + def _exit_sub_services(self): """ exit sub services diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index 5f92cdfe75..9109cc7b6f 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -733,7 +733,3 @@ def detect_thread(): except Exception: pass return True - - def clear_data(self): - self.token_processor.clear_data() - self.engine_worker_queue.clear_data() diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index bc7214860a..9f90fbf102 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -496,7 +496,7 @@ def reset_scheduler(): if llm_engine is None: return Response("Engine not loaded", status_code=500) - llm_engine.clear_data() + llm_engine.engine.clear_data() llm_engine.engine.scheduler.reset() return Response("Scheduler Reset Successfully", status_code=200) From 60a2dfdf3747b84d3b977ae55edb4bf57958ebf1 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Wed, 17 Sep 2025 19:38:07 +0800 Subject: [PATCH 06/10] fix --- fastdeploy/entrypoints/engine_client.py | 3 +++ fastdeploy/entrypoints/openai/serving_chat.py | 16 +++++++++++----- .../entrypoints/openai/serving_completion.py | 16 +++++++++++----- fastdeploy/output/token_processor.py | 4 ---- 4 files changed, 25 insertions(+), 14 deletions(-) diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index b6c0008c3a..6f93905060 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -343,3 +343,6 @@ def clear_load_weight(self, timeout=300): return False, "clear model weight timeout" time.sleep(1) return True, "" + + def check_model_weight_status(self): + return self.model_weights_status_signal.value[0] < 0 diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index fe16af5072..9f628614be 100644 
--- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -220,6 +220,8 @@ async def chat_completion_stream_generator( decoder_base_url=self.tokenizer_base_url, ) while num_choices > 0: + if self.engine_client.check_model_weight_status(): + raise ValueError("Engine is clearing model weight") try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -353,9 +355,6 @@ async def chat_completion_stream_generator( if res.get("error_msg") is not None and "Recover" in res["error_msg"]: choice.finish_reason = "recover_stop" - if res.get("error_msg") is not None and "Clear" in res["error_msg"]: - choice.finish_reason = "clear_data" - if request.return_token_ids: if response_processor.enable_multimodal_content(): choice.delta.multimodal_content[0]["completion_token_ids"] = list(output["token_ids"]) @@ -438,6 +437,14 @@ async def chat_completion_full_generator( decoder_base_url=self.tokenizer_base_url, ) while True: + if self.engine_client.check_model_weight_status(): + return ErrorResponse( + error=ErrorInfo( + message="Model weight cleared", + code=ErrorCode.INVALID_VALUE, + type=ErrorType.INVALID_REQUEST_ERROR, + ) + ) try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -526,8 +533,7 @@ async def chat_completion_full_generator( if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]: choice.finish_reason = "recover_stop" - if final_res.get("error_msg") is not None and "Clear" in final_res["error_msg"]: - choice.finish_reason = "clear_data" + choices.append(choice) num_prompt_tokens = len(prompt_token_ids) diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index b5476ba9da..75068921ab 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -227,6 +227,14 @@ async def completion_full_generator( completion_batched_token_ids = [[] for _ in range(num_choices)] current_waiting_time = 0 while num_choices > 0: + if self.engine_client.check_model_weight_status(): + return ErrorResponse( + error=ErrorInfo( + message="Model weight cleared", + code=ErrorCode.INVALID_VALUE, + type=ErrorType.INVALID_REQUEST_ERROR, + ) + ) try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -281,7 +289,6 @@ async def completion_full_generator( return res except Exception as e: api_server_logger.error(f"Error in completion_full_generator: {e}", exc_info=True) - raise finally: self.engine_client.semaphore.release() if dealer is not None: @@ -360,6 +367,8 @@ async def completion_stream_generator( ) current_waiting_time = 0 while num_choices > 0: + if self.engine_client.check_model_weight_status(): + raise ValueError("Engine is clearing model weight") try: response = await asyncio.wait_for(response_queue.get(), timeout=10) current_waiting_time = 0 @@ -447,8 +456,7 @@ async def completion_stream_generator( choices[-1].finish_reason = self.calc_finish_reason( request.max_tokens, output_tokens[idx], output, tool_called[idx] ) - if res.get("error_msg") is not None and "Clear" in res["error_msg"]: - choices[-1].finish_reason = "clear_data" + send_idx = output.get("send_idx") # 只有当 send_idx 明确为 0 时才记录日志 if send_idx == 0 and not request.return_token_ids: @@ -533,8 +541,6 @@ def request_output_to_completion_response( output_text = output["text"] finish_reason = 
self.calc_finish_reason(request.max_tokens, final_res["output_token_ids"], output, False) - if final_res.get("error_msg") is not None and "Clear" in final_res["error_msg"]: - finish_reason = "clear_data" choice_data = CompletionResponseChoice( token_ids=token_ids, index=len(choices), diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index 98356e6625..dab46ff516 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -548,7 +548,6 @@ def _record_speculative_decoding_mertics(self, accept_num): def clear_data(self): if envs.ENABLE_V1_KVCACHE_SCHEDULER: self.resource_manager.clear_data() - batch_result = [] for i in range(self.cfg.max_num_seqs): if self.resource_manager.stop_flags[i]: continue @@ -567,12 +566,9 @@ def clear_data(self): request_start_time=task.arrival_time, ), ) - result.error_msg = "Clear weight, stop all inference!" is_prefill = task.disaggregate_info is not None and task.disaggregate_info["role"] == "prefill" self._recycle_resources(task.request_id, i, task, result, is_prefill) llm_logger.warning(f"clear data for task {task.request_id}") - batch_result.append(result) - self.postprocess(batch_result) class WarmUpTokenProcessor(TokenProcessor): From 7ce733e7551a3a114bfca68785d6f7ba5c4af9f0 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Thu, 18 Sep 2025 10:46:27 +0800 Subject: [PATCH 07/10] fix --- fastdeploy/engine/common_engine.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fastdeploy/engine/common_engine.py b/fastdeploy/engine/common_engine.py index c689537747..8ce70657a6 100644 --- a/fastdeploy/engine/common_engine.py +++ b/fastdeploy/engine/common_engine.py @@ -760,8 +760,7 @@ def clear_data(self): llm_logger.info("Clear Data: Start") self.token_processor.clear_data() self.engine_worker_queue.clear_data() - while self.zmq_server.req_dict: - time.sleep(0.1) + self.zmq_server.req_dict.clear() llm_logger.info("Clear Data: Successfully") return True except Exception as e: From b14792e35490b125bd0fc2156665a98f4d5893ec Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Mon, 22 Sep 2025 14:33:11 +0800 Subject: [PATCH 08/10] fix --- tests/entrypoints/openai/test_max_streaming_tokens.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/entrypoints/openai/test_max_streaming_tokens.py b/tests/entrypoints/openai/test_max_streaming_tokens.py index 0b474d332e..9746f02701 100644 --- a/tests/entrypoints/openai/test_max_streaming_tokens.py +++ b/tests/entrypoints/openai/test_max_streaming_tokens.py @@ -24,6 +24,8 @@ async def asyncSetUp(self): self.engine_client.semaphore.release = Mock() self.engine_client.data_processor = Mock() self.engine_client.is_master = True + self.engine_client.check_model_weight_status = Mock(return_value=False) + self.chat_serving = OpenAIServingChat( engine_client=self.engine_client, From 495eb8640a4a63f2af41962c7383ba9ee6b187c2 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Mon, 22 Sep 2025 14:35:03 +0800 Subject: [PATCH 09/10] fix --- tests/entrypoints/openai/test_max_streaming_tokens.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/entrypoints/openai/test_max_streaming_tokens.py b/tests/entrypoints/openai/test_max_streaming_tokens.py index 9746f02701..61d5f88d45 100644 --- a/tests/entrypoints/openai/test_max_streaming_tokens.py +++ b/tests/entrypoints/openai/test_max_streaming_tokens.py @@ -25,7 +25,6 @@ async def asyncSetUp(self): self.engine_client.data_processor = Mock() self.engine_client.is_master = True 
self.engine_client.check_model_weight_status = Mock(return_value=False) - self.chat_serving = OpenAIServingChat( engine_client=self.engine_client, From cee5c31157c47dab0ae763e23e426c843b103604 Mon Sep 17 00:00:00 2001 From: ltd0924 Date: Fri, 26 Sep 2025 11:59:25 +0800 Subject: [PATCH 10/10] [BugFix] fix clear data --- fastdeploy/entrypoints/openai/api_server.py | 1 + fastdeploy/rl/dynamic_weight_manager.py | 1 + fastdeploy/worker/worker_process.py | 2 ++ 3 files changed, 4 insertions(+) diff --git a/fastdeploy/entrypoints/openai/api_server.py b/fastdeploy/entrypoints/openai/api_server.py index 9f90fbf102..29738c2fcf 100644 --- a/fastdeploy/entrypoints/openai/api_server.py +++ b/fastdeploy/entrypoints/openai/api_server.py @@ -516,6 +516,7 @@ def control_scheduler(request: ControlSchedulerRequest): return JSONResponse(content=content.model_dump(), status_code=500) if request.reset: + llm_engine.engine.clear_data() llm_engine.engine.scheduler.reset() if request.load_shards_num or request.reallocate_shard: diff --git a/fastdeploy/rl/dynamic_weight_manager.py b/fastdeploy/rl/dynamic_weight_manager.py index 6d55c3e414..cc714f79dc 100644 --- a/fastdeploy/rl/dynamic_weight_manager.py +++ b/fastdeploy/rl/dynamic_weight_manager.py @@ -256,6 +256,7 @@ def check_model_weights_status(model_weights_status, model_runner, pid): while model_weights_status.value[0] != 0: if model_weights_status.value[0] == 1: logger.info("infer engine stopped! start to load new checkpoint...") + model_runner.clear_requests() model_runner.update_parameters(pid) elif model_weights_status.value[0] == -1: logger.info("infer engine stopped! start to clear checkpoint...") diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 186dd58ea6..eeed9539c4 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -321,6 +321,8 @@ def event_loop_normal(self) -> None: self.worker.model_runner, self.parallel_config.engine_worker_queue_port, ) + logger.info(f"current task queue data: {self.task_queue.num_tasks()}") + self.task_queue.clear_data() self.model_weights_signal[0] = 0 logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.")
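For reference, the series routes request clearing through the existing reset_scheduler handler in api_server.py, so the full path (token_processor.clear_data -> engine_worker_queue.clear_data -> scheduler.reset) can be exercised from a client once a server is running. Below is a minimal sketch of such a call; the route path, HTTP method, and port are not visible in the hunks above, so treat them as assumptions rather than the confirmed API.

import requests

BASE_URL = "http://localhost:8000"  # assumed host/port of a running FastDeploy API server

def reset_engine(base_url: str = BASE_URL) -> None:
    # Hits the reset_scheduler endpoint patched above; after this series the handler
    # calls llm_engine.engine.clear_data() before llm_engine.engine.scheduler.reset().
    resp = requests.post(f"{base_url}/reset_scheduler", timeout=10)  # assumed POST route
    # Per the handler shown in the diff: 500 "Engine not loaded" if the engine is absent,
    # otherwise 200 "Scheduler Reset Successfully".
    print(resp.status_code, resp.text)

if __name__ == "__main__":
    reset_engine()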