11import asyncio
22import json
33import os
4+ import shutil
5+ import signal
6+ import socket
47import subprocess
8+ import sys
59import tempfile
10+ import time
611import unittest
712from http import HTTPStatus
813from unittest .mock import AsyncMock , MagicMock , Mock , mock_open , patch
4247 write_local_file ,
4348)
4449
50+ # Read ports from environment variables; use default values if not set
51+ FD_API_PORT = int (os .getenv ("FD_API_PORT" , 8188 ))
52+ FD_ENGINE_QUEUE_PORT = int (os .getenv ("FD_ENGINE_QUEUE_PORT" , 8133 ))
53+ FD_METRICS_PORT = int (os .getenv ("FD_METRICS_PORT" , 8233 ))
54+ FD_CACHE_QUEUE_PORT = int (os .getenv ("FD_CACHE_QUEUE_PORT" , 8333 ))
55+
56+ # List of ports to clean before and after tests
57+ PORTS_TO_CLEAN = [FD_API_PORT , FD_ENGINE_QUEUE_PORT , FD_METRICS_PORT , FD_CACHE_QUEUE_PORT ]
58+
59+
60+ def is_port_open (host : str , port : int , timeout = 1.0 ):
61+ """
62+ Check if a TCP port is open on the given host.
63+ Returns True if connection succeeds, False otherwise.
64+ """
65+ try :
66+ with socket .create_connection ((host , port ), timeout ):
67+ return True
68+ except Exception :
69+ return False
70+
71+
72+ def _clean_cuda_process ():
73+ """
74+ Kill processes that are using CUDA devices.
75+ NOTE: Do not call this function directly, use the `clean` function instead.
76+ """
77+ try :
78+ subprocess .run ("fuser -k /dev/nvidia*" , shell = True , timeout = 5 )
79+ except (subprocess .TimeoutExpired , subprocess .CalledProcessError , FileNotFoundError ):
80+ pass
81+
82+
83+ def kill_process_on_port (port : int ):
84+ """
85+ Kill processes that are listening on the given port.
86+ Uses multiple methods to ensure thorough cleanup.
87+ """
88+ current_pid = os .getpid ()
89+ parent_pid = os .getppid ()
90+
91+ # Method 1: Use lsof to find processes
92+ try :
93+ output = subprocess .check_output (f"lsof -i:{ port } -t" , shell = True ).decode ().strip ()
94+ for pid in output .splitlines ():
95+ pid = int (pid )
96+ if pid in (current_pid , parent_pid ):
97+ print (f"Skip killing current process (pid={ pid } ) on port { port } " )
98+ continue
99+ try :
100+ # First try SIGTERM for graceful shutdown
101+ os .kill (pid , signal .SIGTERM )
102+ time .sleep (1 )
103+ # Then SIGKILL if still running
104+ os .kill (pid , signal .SIGKILL )
105+ print (f"Killed process on port { port } , pid={ pid } " )
106+ except ProcessLookupError :
107+ pass # Process already terminated
108+ except subprocess .CalledProcessError :
109+ pass
110+
111+ # Method 2: Use netstat and fuser as backup
112+ try :
113+ # Find processes using netstat and awk
114+ cmd = f"netstat -tulpn 2>/dev/null | grep :{ port } | awk '{{print $7}}' | cut -d'/' -f1"
115+ output = subprocess .check_output (cmd , shell = True ).decode ().strip ()
116+ for pid in output .splitlines ():
117+ if pid and pid .isdigit ():
118+ pid = int (pid )
119+ if pid in (current_pid , parent_pid ):
120+ continue
121+ try :
122+ os .kill (pid , signal .SIGKILL )
123+ print (f"Killed process (netstat) on port { port } , pid={ pid } " )
124+ except ProcessLookupError :
125+ pass
126+ except (subprocess .CalledProcessError , FileNotFoundError ):
127+ pass
128+
129+ # Method 3: Use fuser if available
130+ try :
131+ subprocess .run (f"fuser -k { port } /tcp" , shell = True , timeout = 5 )
132+ except (subprocess .TimeoutExpired , subprocess .CalledProcessError , FileNotFoundError ):
133+ pass
134+
135+
136+ def clean_ports (ports = None ):
137+ """
138+ Kill all processes occupying the ports
139+ """
140+ if ports is None :
141+ ports = PORTS_TO_CLEAN
142+
143+ print (f"Cleaning ports: { ports } " )
144+ for port in ports :
145+ kill_process_on_port (port )
146+
147+ # Double check and retry if ports are still in use
148+ time .sleep (2 )
149+ for port in ports :
150+ if is_port_open ("127.0.0.1" , port , timeout = 0.1 ):
151+ print (f"Port { port } still in use, retrying cleanup..." )
152+ kill_process_on_port (port )
153+ time .sleep (1 )
154+
155+
156+ def clean (ports = None ):
157+ """
158+ Clean up resources used during testing.
159+ """
160+ clean_ports (ports )
161+
162+ # Clean CUDA devices before and after tests.
163+ # NOTE: It is dangerous to use this flag on development machines, as it may kill other processes
164+ clean_cuda = int (os .getenv ("CLEAN_CUDA" , "0" )) == 1
165+ if clean_cuda :
166+ _clean_cuda_process ()
167+
168+
45169INPUT_BATCH = """
46170{"custom_id": "req-00001", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "Can you write a short poem? (id=1)"}], "temperature": 0.7, "max_tokens": 200}}
47171{"custom_id": "req-00002", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "What can you do? (id=2)"}], "temperature": 0.7, "max_tokens": 200}}
@@ -1249,28 +1373,63 @@ def test_completed_log_interval(self, mock_logger):
12491373 mock_logger .info .assert_called_with (f"Progress: { i } /100 requests completed" )
12501374
12511375
1252- FD_ENGINE_QUEUE_PORT = int (os .getenv ("FD_ENGINE_QUEUE_PORT" , 8133 ))
1253- FD_CACHE_QUEUE_PORT = int (os .getenv ("FD_CACHE_QUEUE_PORT" , 8333 ))
1254-
1255-
12561376class TestFastDeployBatch (unittest .TestCase ):
12571377 """测试 FastDeploy 批处理功能的 unittest 测试类"""
12581378
12591379 def setUp (self ):
12601380 """每个测试方法执行前的准备工作"""
1381+ print ("\n [SetUp] Pre-test cleanup..." )
1382+
1383+ # 1. 清理日志目录
1384+ if os .path .exists ("log" ) and os .path .isdir ("log" ):
1385+ shutil .rmtree ("log" )
1386+
1387+ # 2. 清理端口
1388+ clean_ports ()
1389+
1390+ # 3. 确定模型路径
12611391 self .model_path = "baidu/ERNIE-4.5-0.3B-PT"
1262- self .base_command = ["fastdeploy" , "run-batch" ]
1263- self .run_batch_command = ["python" , "fastdeploy/entrypoints/openai/run_batch.py" ]
1392+
1393+ self .run_batch_command = [sys .executable , "fastdeploy/entrypoints/openai/run_batch.py" ]
1394+
1395+ # 用于追踪所有启动的子进程,以便在 tearDown 中清理
1396+ self .subprocesses = []
1397+
1398+ def tearDown (self ):
1399+ """每个测试方法执行后的清理工作"""
1400+ print ("\n [TearDown] executing cleanup..." )
1401+ for proc in self .subprocesses :
1402+ try :
1403+ # 检查进程是否还在运行
1404+ if proc .poll () is None :
1405+ print (f"Terminating process group (PGID: { proc .pid } )..." )
1406+ # 使用 os.killpg 杀掉整个进程组,确保子进程也被清理
1407+ os .killpg (proc .pid , signal .SIGTERM )
1408+
1409+ # 等待进程退出
1410+ start_wait = time .time ()
1411+ while proc .poll () is None :
1412+ if time .time () - start_wait > 5 :
1413+ print (f"Process group (PGID: { proc .pid } ) timed out, forcing SIGKILL..." )
1414+ os .killpg (proc .pid , signal .SIGKILL )
1415+ break
1416+ time .sleep (0.1 )
1417+ proc .wait ()
1418+ except Exception as e :
1419+ print (f"Error cleaning up process (PID: { proc .pid } ): { e } " )
1420+
1421+ # 再次确保端口释放
1422+ pass
12641423
12651424 def run_fastdeploy_command (self , input_content , port = None ):
12661425 """运行 FastDeploy 命令的辅助方法"""
12671426 if port is None :
12681427 port = str (FD_CACHE_QUEUE_PORT )
12691428
1270- with tempfile . NamedTemporaryFile ( "w" ) as input_file , tempfile . NamedTemporaryFile ( "r" ) as output_file :
1271-
1272- input_file . write ( input_content )
1273- input_file . flush ()
1429+ with (
1430+ tempfile . NamedTemporaryFile ( "w" , delete = False ) as input_file ,
1431+ tempfile . NamedTemporaryFile ( "r" , delete = False ) as output_file ,
1432+ ):
12741433
12751434 param = [
12761435 "-i" ,
@@ -1294,23 +1453,75 @@ def run_fastdeploy_command(self, input_content, port=None):
12941453 "--engine-worker-queue-port" ,
12951454 str (FD_ENGINE_QUEUE_PORT ),
12961455 ]
1456+ input_path = input_file .name
1457+ output_path = output_file .name
1458+
1459+ try :
1460+ input_file .write (input_content )
1461+ input_file .flush ()
1462+
1463+ param = [
1464+ "-i" ,
1465+ input_path ,
1466+ "-o" ,
1467+ output_path ,
1468+ "--model" ,
1469+ self .model_path ,
1470+ "--cache-queue-port" ,
1471+ port ,
1472+ "--tensor-parallel-size" ,
1473+ "1" ,
1474+ "--quantization" ,
1475+ "wint4" ,
1476+ "--max-model-len" ,
1477+ "5120" ,
1478+ "--max-num-seqs" ,
1479+ "64" ,
1480+ "--load-choices" ,
1481+ "default_v1" ,
1482+ "--engine-worker-queue-port" ,
1483+ str (FD_ENGINE_QUEUE_PORT ),
1484+ ]
1485+
1486+ run_batch_command = self .run_batch_command + param
1487+
1488+ print (f"Executing command: { ' ' .join (run_batch_command )} " )
1489+
1490+ proc = subprocess .Popen (
1491+ run_batch_command ,
1492+ # stdout=logfile,
1493+ # stderr=subprocess.STDOUT,
1494+ start_new_session = True ,
1495+ )
1496+
1497+ self .subprocesses .append (proc )
1498+
1499+ try :
1500+ proc .wait (timeout = 300 ) # 等待最多 5 分钟
1501+ except subprocess .TimeoutExpired :
1502+ print ("[TIMEOUT] run_batch command timed out." )
1503+ os .killpg (proc .pid , signal .SIGKILL )
1504+ raise
12971505
1298- # command = self.base_command + param
1299- run_batch_command = self .run_batch_command + param
1506+ return_code = proc .returncode
13001507
1301- proc = subprocess . Popen ( run_batch_command )
1302- proc . communicate ( )
1303- return_code = proc . wait ()
1508+ # 读取输出结果
1509+ output_file . seek ( 0 )
1510+ contents = output_file . read ()
13041511
1305- # 读取输出文件内容
1306- output_file .seek (0 )
1307- contents = output_file .read ()
1512+ return return_code , contents , proc
13081513
1309- return return_code , contents , proc
1514+ finally :
1515+ # 清理临时文件
1516+ if os .path .exists (input_path ):
1517+ os .unlink (input_path )
1518+ if os .path .exists (output_path ):
1519+ os .unlink (output_path )
13101520
13111521 def test_completions (self ):
13121522 """测试正常的批量chat请求"""
13131523 return_code , contents , proc = self .run_fastdeploy_command (INPUT_BATCH , port = "2235" )
1524+ print (f"进程输出: { return_code } " )
13141525
13151526 self .assertEqual (return_code , 0 , f"进程返回非零码: { return_code } , 进程信息: { proc } " )
13161527
0 commit comments