Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 145 additions & 0 deletions access_all_tests/access_all_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
# -*- coding: UTF-8 -*-

import os
import sys
import subprocess
import threading
import queue
import argparse
from pathlib import Path
import random
import psutil
from access_control import (
TestMgr,
BASE_DIR, TEST_DIR, SLOW_TEST_BLOCKLIST, NOT_RUN_DIRECTLY, EXEC_TIMEOUT
)


def exec_ut(files):
"""
执行单元测试文件,其中存在失败,则标识异常并打印相关信息
"""

def get_op_name(ut_file):
op_name = str(ut_file.split('/')[-1].split('.')[0])
return op_name[5:] if op_name.startswith("test_") else op_name

def get_ut_name(ut_file):
return str(Path(ut_file).relative_to(TEST_DIR))[:-3]

def get_ut_cmd(ut_type, ut_file):
cmd = [sys.executable, "run_test.py", "-v"]
if ut_type == "op_ut_files":
# do not skip ops related test entries
return cmd + ["-e"] + SLOW_TEST_BLOCKLIST[1:] + ["-i", "test_ops", "--", "-k", "_" + get_op_name(ut_file)]
return cmd + ["-i", get_ut_name(ut_file)]

def wait_thread(process, event_timer):
process.wait()
event_timer.set()

def enqueue_output(out, log_queue):
for line in iter(out.readline, b''):
log_queue.put(line.decode('utf-8'))
out.close()
return

def start_thread(fn, *args):
stdout_t = threading.Thread(target=fn, args=args)
stdout_t.daemon = True
stdout_t.start()

def print_subprocess_log(log_queue):
while not log_queue.empty():
print((log_queue.get()).strip())

def run_cmd_with_timeout(cmd):
os.chdir(str(TEST_DIR))
stdout_queue = queue.Queue()
event_timer = threading.Event()

p = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
start_thread(wait_thread, p, event_timer)
start_thread(enqueue_output, p.stdout, stdout_queue)

try:
event_timer.wait(EXEC_TIMEOUT)
ret = p.poll()
if ret:
print_subprocess_log(stdout_queue)
if not event_timer.is_set():
ret = 1
parent_process = psutil.Process(p.pid)
for children_process in parent_process.children(recursive=True):
children_process.kill()
p.kill()
p.terminate()
print("Timeout: Command '{}' timed out after {} seconds".format(" ".join(cmd), EXEC_TIMEOUT))
print_subprocess_log(stdout_queue)
except Exception as err:
ret = 1
print(err)
return ret

def run_tests(test_files):
test_infos = []
has_failed = 0
init_method = random.randint(1, 2)
for ut_type, ut_files in test_files.items():
for ut_file in ut_files:
cmd = get_ut_cmd(ut_type, ut_file)
ut_info = str(cmd[-1])
if ut_type == "op_ut_files":
ut_info = "test_ops " + ut_info
else:
cmd = cmd if 'op-plugin' in str(Path(ut_file)) else cmd + ["--init_method={}".format(init_method)]
ret = run_cmd_with_timeout(cmd)
if ret:
has_failed = ret
test_infos.append("exec ut {} failed.".format(ut_info))
else:
test_infos.append("exec ut {} success.".format(ut_info))
init_method = 2 if init_method == 1 else 1
return has_failed, test_infos

ret_status, exec_infos = run_tests(files)

print("***** Total result:")
for exec_info in exec_infos:
print(exec_info)
return ret_status


if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Control needed ut cases')
parser.add_argument('--all', action="store_true", help='Run all testcases')
parser.add_argument('--distributed', action="store_true", help='Run distributed testcases')
parser.add_argument('--rank', default=0, type=int, help='Index of current ut nodes')
parser.add_argument('--world_size', default=0, type=int, help='Number of ut nodes')
parser.add_argument('--network_ops', action="store_true", help='Run network_ops testcases in the op-plugin repo')
options = parser.parse_args()
print(f"options: {options}")
cur_modify_files = str(BASE_DIR / 'modify_files.txt')
test_mgr = TestMgr()

if options.all:
test_mgr.load_all_ut(options.distributed, options.network_ops)
elif options.distributed:
test_mgr.load_distributed_ut()
elif os.path.exists(cur_modify_files):
test_mgr.load(cur_modify_files)
test_mgr.analyze()
else:
test_mgr.load_core_ut()
test_mgr.exclude_test_files(not_run_files=NOT_RUN_DIRECTLY, mode="not_run_directly")

if options.rank > 0 and options.world_size > 0:
test_mgr.split_test_files(options.rank, options.world_size)
cur_test_files = test_mgr.get_test_files()

test_mgr.print_modify_files()
test_mgr.print_ut_files()
test_mgr.print_op_ut_files()

ret_ut = exec_ut(cur_test_files)
sys.exit(ret_ut)
5 changes: 5 additions & 0 deletions access_all_tests/test_units/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .constants import (
BASE_DIR, TEST_DIR, NETWORK_OPS_DIR,
SLOW_TEST_BLOCKLIST, NOT_RUN_DIRECTLY, INCLUDE_FILES, EXEC_TIMEOUT
)
from .test_manager import TestMgr
36 changes: 36 additions & 0 deletions access_all_tests/test_units/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os
from pathlib import Path

BASE_DIR = Path(__file__).absolute().parent.parent.parent
TEST_DIR = BASE_DIR / 'tests'
DEVICE_SPECIFIED_TEST_DIR = TEST_DIR / 'device_specified_tests'
PROCESSED_TEST_DIR = TEST_DIR / 'processed_tests'

# Add slow test cases here (first element must be test_ops)
SLOW_TEST_BLOCKLIST = [
'test_ops',
'test_modules',
'test_binary_ufuncs',
'test_ops_fwd_gradients',
'test_ops_gradients',
'test_reductions',
'test_unary_ufuncs',
'test_ops_jit',
'onnx/test_op_consistency',
'test_foreach',
'onnx/test_pytorch_onnx_onnxruntime'
]

# exclude some not run directly test files
NOT_RUN_DIRECTLY = {
"jit": "test_jit.py",
}

# include some files
INCLUDE_FILES = [
'jit/test_complexity.py',
]

# default ut cmd execution timeout is 2000s
EXEC_TIMEOUT = os.getenv("PTA_UT_EXEC_TIMEOUT", "2000")
EXEC_TIMEOUT = int(EXEC_TIMEOUT) if EXEC_TIMEOUT.isdigit() else 2000
5 changes: 5 additions & 0 deletions access_all_tests/test_units/strategy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .copy_opt import CopyOptStrategy
from .core import CoreTestStrategy
from .directory_mapping import DirectoryMappingStrategy
from .op import OpStrategy
from .test_file import TestFileStrategy
30 changes: 30 additions & 0 deletions access_all_tests/test_units/strategy/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import os
import subprocess
from abc import ABCMeta, abstractmethod
from ..constants import BASE_DIR


class AccurateTest(metaclass=ABCMeta):
base_dir = BASE_DIR
test_dir = base_dir / 'test'

@abstractmethod
def identify(self, modify_file):
"""
该接口提供代码对应的UT的路径信息
"""
raise Exception("abstract method. Subclasses should implement it.")

@classmethod
def find_ut_by_regex(cls, regex):
ut_files = []
cmd = "find {} -name {}".format(str(cls.test_dir), regex)
status, output = subprocess.getstatusoutput(cmd)
# 对于找不到的暂不作处理
if not(status or not output):
files = output.split('\n')
for ut_file in files:
ut_file_basename = os.path.basename(ut_file)
if ut_file_basename.startswith("test") and ut_file.endswith(".py"):
ut_files.append(ut_file)
return ut_files
13 changes: 13 additions & 0 deletions access_all_tests/test_units/strategy/copy_opt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from .base import AccurateTest


class CopyOptStrategy(AccurateTest):
"""
通过识别非连续转连续的测试用例
"""

def identify(self, modify_file):
if modify_file.find('contiguous') > 0:
regex = '*contiguous*'
return AccurateTest.find_ut_by_regex(regex)
return []
18 changes: 18 additions & 0 deletions access_all_tests/test_units/strategy/core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from pathlib import Path
from .base import AccurateTest


class CoreTestStrategy(AccurateTest):
"""
Determine whether the core tests should be runned
"""
def __init__(self):
super().__init__()
self.block_list = ['test', 'docs']
self.core_test_cases = [str(i) for i in (self.base_dir / 'test/npu').rglob('test_*.py')]

def identify(self, modify_file):
modified_module = str(Path(modify_file).parts[0])
if modified_module not in self.block_list:
return self.core_test_cases
return []
55 changes: 55 additions & 0 deletions access_all_tests/test_units/strategy/directory_mapping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from pathlib import Path
from .base import AccurateTest


class DirectoryMappingStrategy(AccurateTest):
"""
Map the modified files to the corresponding test cases
"""
mapping_list = {
'contrib': 'test/contrib',
'cpp_extension': 'test/cpp_extensions',
'distributed': 'test/distributed',
'fx': 'test/test_fx.py',
'optim': 'test/optim',
'profiler': 'test/profiler',
'onnx': 'test/onnx',
'utils': 'test/test_utils.py',
'testing': 'test/test_testing.py',
'jit': 'test/test_jit.py',
'rpc': 'test/distributed/rpc',
'dynamo': 'test/dynamo',
'checkpoint': 'test/distributed/checkpoint',
}

@staticmethod
def get_module_name(modify_file):
module_name = str(Path(modify_file).parts[1])
if module_name == 'csrc':
module_name = str(Path(modify_file).parts[2])
for part in Path(modify_file).parts:
if part == 'rpc' or part == 'checkpoint':
module_name = part
if module_name == 'utils' and Path(modify_file).parts[2] == 'cpp_extension.py':
module_name = 'cpp_extension'
if module_name == 'utils' and 'dynamo' in Path(modify_file).parts[2]:
module_name = 'dynamo'
return module_name

def identify(self, modify_file):
current_all_ut_path = []
if str(Path(modify_file).parts[0]) == 'torch_npu':
mapped_ut_path = []
module_name = self.get_module_name(modify_file)
if module_name in self.mapping_list:
mapped_ut_path.append(self.mapping_list[module_name])
file_name = str(Path(modify_file).stem)
if file_name in self.mapping_list:
mapped_ut_path.append(self.mapping_list[file_name])

for mapped_path in mapped_ut_path:
if Path.is_file(self.base_dir.joinpath(mapped_path)):
current_all_ut_path.append(str(self.base_dir.joinpath(mapped_path)))
else:
current_all_ut_path += [str(i) for i in (self.base_dir.joinpath(mapped_path)).glob('test_*.py')]
return current_all_ut_path
25 changes: 25 additions & 0 deletions access_all_tests/test_units/strategy/op.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import re
from pathlib import Path
from .base import AccurateTest


class OpStrategy(AccurateTest):
"""
通过对适配层的代码的识别
"""

def identify(self, modify_file):
"""
通过对于算子实现文件的文件名解析获取其单元测试的名字,比如:
BinaryCrossEntropyWithLogitsBackwardKernelNpu.cpp
针对这个文件,先识别关键字BinaryCrossEntropyWithLogitsBackward
然后,获取其正则表达式*binary*cross*entropy*with*logits*backward*识别到符合要求的测试用例
具体方法:通过大写字母切分关键字,再识别包含所有这些关键字的测试文件名。
"""
filename = Path(modify_file).name
if filename.find('KernelNpu') >= 0:
feature_line = filename.split('KernelNpu')[0]
features = re.findall('[A-Z][^A-Z]*', feature_line)
regex = '*' + '*'.join([f"{feature.lower()}" for feature in features]) + '*'
return AccurateTest.find_ut_by_regex(regex)
return []
14 changes: 14 additions & 0 deletions access_all_tests/test_units/strategy/test_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import re
from pathlib import Path
from .base import AccurateTest


class TestFileStrategy(AccurateTest):
"""
Determine whether the modified files are test cases
"""

def identify(self, modify_file):
is_test_file = str(Path(modify_file).parts[0]) == "test" \
and re.match("test_(.+).py", Path(modify_file).name)
return [(str(self.base_dir.joinpath(modify_file)))] if is_test_file else []
Loading