Skip to content

Commit c319016

Browse files
committed
[CI]add clear to run-batch ci
1 parent 6f42c37 commit c319016

File tree

1 file changed

+235
-20
lines changed

1 file changed

+235
-20
lines changed

tests/entrypoints/openai/test_run_batch.py

Lines changed: 235 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import asyncio
22
import json
33
import os
4+
import shutil
5+
import signal
6+
import socket
47
import subprocess
8+
import sys
59
import tempfile
10+
import time
611
import unittest
712
from http import HTTPStatus
813
from unittest.mock import AsyncMock, MagicMock, Mock, mock_open, patch
@@ -42,6 +47,125 @@
4247
write_local_file,
4348
)
4449

50+
# Read ports from environment variables; use default values if not set
51+
FD_API_PORT = int(os.getenv("FD_API_PORT", 8188))
52+
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
53+
FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", 8233))
54+
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
55+
56+
# List of ports to clean before and after tests
57+
PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT, FD_CACHE_QUEUE_PORT]
58+
59+
60+
def is_port_open(host: str, port: int, timeout=1.0):
61+
"""
62+
Check if a TCP port is open on the given host.
63+
Returns True if connection succeeds, False otherwise.
64+
"""
65+
try:
66+
with socket.create_connection((host, port), timeout):
67+
return True
68+
except Exception:
69+
return False
70+
71+
72+
def _clean_cuda_process():
73+
"""
74+
Kill processes that are using CUDA devices.
75+
NOTE: Do not call this function directly, use the `clean` function instead.
76+
"""
77+
try:
78+
subprocess.run("fuser -k /dev/nvidia*", shell=True, timeout=5)
79+
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
80+
pass
81+
82+
83+
def kill_process_on_port(port: int):
84+
"""
85+
Kill processes that are listening on the given port.
86+
Uses multiple methods to ensure thorough cleanup.
87+
"""
88+
current_pid = os.getpid()
89+
parent_pid = os.getppid()
90+
91+
# Method 1: Use lsof to find processes
92+
try:
93+
output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip()
94+
for pid in output.splitlines():
95+
pid = int(pid)
96+
if pid in (current_pid, parent_pid):
97+
print(f"Skip killing current process (pid={pid}) on port {port}")
98+
continue
99+
try:
100+
# First try SIGTERM for graceful shutdown
101+
os.kill(pid, signal.SIGTERM)
102+
time.sleep(1)
103+
# Then SIGKILL if still running
104+
os.kill(pid, signal.SIGKILL)
105+
print(f"Killed process on port {port}, pid={pid}")
106+
except ProcessLookupError:
107+
pass # Process already terminated
108+
except subprocess.CalledProcessError:
109+
pass
110+
111+
# Method 2: Use netstat and fuser as backup
112+
try:
113+
# Find processes using netstat and awk
114+
cmd = f"netstat -tulpn 2>/dev/null | grep :{port} | awk '{{print $7}}' | cut -d'/' -f1"
115+
output = subprocess.check_output(cmd, shell=True).decode().strip()
116+
for pid in output.splitlines():
117+
if pid and pid.isdigit():
118+
pid = int(pid)
119+
if pid in (current_pid, parent_pid):
120+
continue
121+
try:
122+
os.kill(pid, signal.SIGKILL)
123+
print(f"Killed process (netstat) on port {port}, pid={pid}")
124+
except ProcessLookupError:
125+
pass
126+
except (subprocess.CalledProcessError, FileNotFoundError):
127+
pass
128+
129+
# Method 3: Use fuser if available
130+
try:
131+
subprocess.run(f"fuser -k {port}/tcp", shell=True, timeout=5)
132+
except (subprocess.TimeoutExpired, subprocess.CalledProcessError, FileNotFoundError):
133+
pass
134+
135+
136+
def clean_ports(ports=None):
137+
"""
138+
Kill all processes occupying the ports
139+
"""
140+
if ports is None:
141+
ports = PORTS_TO_CLEAN
142+
143+
print(f"Cleaning ports: {ports}")
144+
for port in ports:
145+
kill_process_on_port(port)
146+
147+
# Double check and retry if ports are still in use
148+
time.sleep(2)
149+
for port in ports:
150+
if is_port_open("127.0.0.1", port, timeout=0.1):
151+
print(f"Port {port} still in use, retrying cleanup...")
152+
kill_process_on_port(port)
153+
time.sleep(1)
154+
155+
156+
def clean(ports=None):
157+
"""
158+
Clean up resources used during testing.
159+
"""
160+
clean_ports(ports)
161+
162+
# Clean CUDA devices before and after tests.
163+
# NOTE: It is dangerous to use this flag on development machines, as it may kill other processes
164+
clean_cuda = int(os.getenv("CLEAN_CUDA", "0")) == 1
165+
if clean_cuda:
166+
_clean_cuda_process()
167+
168+
45169
INPUT_BATCH = """
46170
{"custom_id": "req-00001", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "Can you write a short poem? (id=1)"}], "temperature": 0.7, "max_tokens": 200}}
47171
{"custom_id": "req-00002", "method": "POST", "url": "/v1/chat/completions", "body": {"messages": [{"role": "user", "content": "What can you do? (id=2)"}], "temperature": 0.7, "max_tokens": 200}}
@@ -1249,28 +1373,67 @@ def test_completed_log_interval(self, mock_logger):
12491373
mock_logger.info.assert_called_with(f"Progress: {i}/100 requests completed")
12501374

12511375

1252-
FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", 8133))
1253-
FD_CACHE_QUEUE_PORT = int(os.getenv("FD_CACHE_QUEUE_PORT", 8333))
1254-
1255-
12561376
class TestFastDeployBatch(unittest.TestCase):
12571377
"""测试 FastDeploy 批处理功能的 unittest 测试类"""
12581378

12591379
def setUp(self):
12601380
"""每个测试方法执行前的准备工作"""
1261-
self.model_path = "baidu/ERNIE-4.5-0.3B-PT"
1262-
self.base_command = ["fastdeploy", "run-batch"]
1263-
self.run_batch_command = ["python", "fastdeploy/entrypoints/openai/run_batch.py"]
1381+
print("\n[SetUp] Pre-test cleanup...")
1382+
1383+
# 1. 清理日志目录
1384+
if os.path.exists("log") and os.path.isdir("log"):
1385+
shutil.rmtree("log")
1386+
1387+
# 2. 清理端口
1388+
clean_ports()
1389+
1390+
# 3. 确定模型路径
1391+
base_path = os.getenv("MODEL_PATH")
1392+
if base_path:
1393+
self.model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-PT")
1394+
else:
1395+
self.model_path = "baidu/ERNIE-4.5-0.3B-PT"
1396+
1397+
self.run_batch_command = [sys.executable, "fastdeploy/entrypoints/openai/run_batch.py"]
1398+
1399+
# 用于追踪所有启动的子进程,以便在 tearDown 中清理
1400+
self.subprocesses = []
1401+
1402+
def tearDown(self):
1403+
"""每个测试方法执行后的清理工作"""
1404+
print("\n[TearDown] executing cleanup...")
1405+
for proc in self.subprocesses:
1406+
try:
1407+
# 检查进程是否还在运行
1408+
if proc.poll() is None:
1409+
print(f"Terminating process group (PGID: {proc.pid})...")
1410+
# 使用 os.killpg 杀掉整个进程组,确保子进程也被清理
1411+
os.killpg(proc.pid, signal.SIGTERM)
1412+
1413+
# 等待进程退出
1414+
start_wait = time.time()
1415+
while proc.poll() is None:
1416+
if time.time() - start_wait > 5:
1417+
print(f"Process group (PGID: {proc.pid}) timed out, forcing SIGKILL...")
1418+
os.killpg(proc.pid, signal.SIGKILL)
1419+
break
1420+
time.sleep(0.1)
1421+
proc.wait()
1422+
except Exception as e:
1423+
print(f"Error cleaning up process (PID: {proc.pid}): {e}")
1424+
1425+
# 再次确保端口释放
1426+
pass
12641427

12651428
def run_fastdeploy_command(self, input_content, port=None):
12661429
"""运行 FastDeploy 命令的辅助方法"""
12671430
if port is None:
12681431
port = str(FD_CACHE_QUEUE_PORT)
12691432

1270-
with tempfile.NamedTemporaryFile("w") as input_file, tempfile.NamedTemporaryFile("r") as output_file:
1271-
1272-
input_file.write(input_content)
1273-
input_file.flush()
1433+
with (
1434+
tempfile.NamedTemporaryFile("w", delete=False) as input_file,
1435+
tempfile.NamedTemporaryFile("r", delete=False) as output_file,
1436+
):
12741437

12751438
param = [
12761439
"-i",
@@ -1294,23 +1457,75 @@ def run_fastdeploy_command(self, input_content, port=None):
12941457
"--engine-worker-queue-port",
12951458
str(FD_ENGINE_QUEUE_PORT),
12961459
]
1460+
input_path = input_file.name
1461+
output_path = output_file.name
1462+
1463+
try:
1464+
input_file.write(input_content)
1465+
input_file.flush()
1466+
1467+
param = [
1468+
"-i",
1469+
input_path,
1470+
"-o",
1471+
output_path,
1472+
"--model",
1473+
self.model_path,
1474+
"--cache-queue-port",
1475+
port,
1476+
"--tensor-parallel-size",
1477+
"1",
1478+
"--quantization",
1479+
"wint4",
1480+
"--max-model-len",
1481+
"5120",
1482+
"--max-num-seqs",
1483+
"64",
1484+
"--load-choices",
1485+
"default_v1",
1486+
"--engine-worker-queue-port",
1487+
str(FD_ENGINE_QUEUE_PORT),
1488+
]
1489+
1490+
run_batch_command = self.run_batch_command + param
1491+
1492+
print(f"Executing command: {' '.join(run_batch_command)}")
1493+
1494+
proc = subprocess.Popen(
1495+
run_batch_command,
1496+
# stdout=logfile,
1497+
# stderr=subprocess.STDOUT,
1498+
start_new_session=True,
1499+
)
1500+
1501+
self.subprocesses.append(proc)
1502+
1503+
try:
1504+
proc.wait(timeout=300) # 等待最多 5 分钟
1505+
except subprocess.TimeoutExpired:
1506+
print("[TIMEOUT] run_batch command timed out.")
1507+
os.killpg(proc.pid, signal.SIGKILL)
1508+
raise
12971509

1298-
# command = self.base_command + param
1299-
run_batch_command = self.run_batch_command + param
1510+
return_code = proc.returncode
13001511

1301-
proc = subprocess.Popen(run_batch_command)
1302-
proc.communicate()
1303-
return_code = proc.wait()
1512+
# 读取输出结果
1513+
output_file.seek(0)
1514+
contents = output_file.read()
13041515

1305-
# 读取输出文件内容
1306-
output_file.seek(0)
1307-
contents = output_file.read()
1516+
return return_code, contents, proc
13081517

1309-
return return_code, contents, proc
1518+
finally:
1519+
# 清理临时文件
1520+
if os.path.exists(input_path):
1521+
os.unlink(input_path)
1522+
if os.path.exists(output_path):
1523+
os.unlink(output_path)
13101524

13111525
def test_completions(self):
13121526
"""测试正常的批量chat请求"""
13131527
return_code, contents, proc = self.run_fastdeploy_command(INPUT_BATCH, port="2235")
1528+
print(f"进程输出: {return_code}")
13141529

13151530
self.assertEqual(return_code, 0, f"进程返回非零码: {return_code}, 进程信息: {proc}")
13161531

0 commit comments

Comments
 (0)