diff --git a/docs/source/3x/benchmark.md b/docs/source/3x/benchmark.md deleted file mode 100644 index 571e0f83f80..00000000000 --- a/docs/source/3x/benchmark.md +++ /dev/null @@ -1,61 +0,0 @@ -Benchmark ---- - -1. [Introduction](#introduction) - -2. [Supported Matrix](#supported-matrix) - -3. [Usage](#usage) - -## Introduction - -Intel Neural Compressor provides a command `incbench` to launch the Intel CPU performance benchmark. - -To get the peak performance on Intel Xeon CPU, we should avoid crossing NUMA node in one instance. -Therefore, by default, `incbench` will trigger 1 instance on the first NUMA node. - -## Supported Matrix - -| Platform | Status | -|:---:|:---:| -| Linux | ✔ | -| Windows | ✔ | - -## Usage - -| Parameters | Default | comments | -|:----------------------:|:------------------------:|:-------------------------------------:| -| num_instances | 1 | Number of instances | -| num_cores_per_instance | None | Number of cores in each instance | -| C, cores | 0-${num_cores_on_NUMA-1} | decides the visible core range | -| cross_memory | False | whether to allocate memory cross NUMA | - -> Note: cross_memory is set to True only when memory is insufficient. - -### General Use Cases - -1. `incbench main.py`: run 1 instance on NUMA:0. -2. `incbench --num_i 2 main.py`: run 2 instances on NUMA:0. -3. `incbench --num_c 2 main.py`: run multi-instances with 2 cores per instance on NUMA:0. -4. `incbench -C 24-47 main.py`: run 1 instance on COREs:24-47. -5. `incbench -C 24-47 --num_c 4 main.py`: run multi-instances with 4 COREs per instance on COREs:24-47. - -> Note: - > - `num_i` works the same as `num_instances` - > - `num_c` works the same as `num_cores_per_instance` - -### Dump Throughput and Latency Summary - -To merge benchmark results from multi-instances, "incbench" automatically checks log file messages for "throughput" and "latency" information matching the following patterns. - -```python -throughput_pattern = r"[T,t]hroughput:\s*([0-9]*\.?[0-9]+)\s*([a-zA-Z/]*)" -latency_pattern = r"[L,l]atency:\s*([0-9]*\.?[0-9]+)\s*([a-zA-Z/]*)" -``` - -#### Demo usage - -```python -print("Throughput: {:.3f} samples/sec".format(throughput)) -print("Latency: {:.3f} ms".format(latency * 10**3)) -``` diff --git a/docs/source/benchmark.md b/docs/source/benchmark.md deleted file mode 100644 index 6bd2bf1ea7e..00000000000 --- a/docs/source/benchmark.md +++ /dev/null @@ -1,60 +0,0 @@ -Benchmarking (Deprecated) -=============== -> Since version >= 3.3, this functionality has been replaced by [incbench](https://github.com/intel/neural-compressor/blob/master/docs/source/3x/benchmark.md), which is easier to use. ----------------------------- -1. [Introduction](#Introduction) -2. [Benchmark Support Matrix](#Benchmark-Support-Matrix) -3. [Get Started with Benchmark](#Get-Started-with-Benchmark) -4. [Examples](#Examples) - -## Introduction -The benchmarking feature of Neural Compressor is used to measure the model performance with the objective settings. -Users can get the performance of the float32 model and the optimized low precision model in the same scenarios. - -## Benchmark Support Matrix - - - - - - - - - - - - - - - - - - - - - - - - - - -
-| Environment      | Category |
-|:----------------:|:--------:|
-| Operating System | linux    |
-|                  | windows  |
-| Architecture     | x86_64   |
-|                  | aarch64  |
-|                  | gpu      |
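-
-For example, the float32 vs. low-precision comparison described in the introduction can be scripted with the `fit` API covered in the next section. A minimal sketch is shown below; the dummy TensorFlow dataset mirrors the usage in this repository's tests, while the model paths (`./fp32.pb`, `./int8.pb`) are illustrative placeholders:
-
-```python
-from neural_compressor.benchmark import fit
-from neural_compressor.config import BenchmarkConfig
-from neural_compressor.data import Datasets
-from neural_compressor.data.dataloaders.dataloader import DataLoader
-
-# Dummy dataset and dataloader, as used in the benchmark tests; replace with real data.
-dataset = Datasets("tensorflow")["dummy"]((100, 32, 32, 1), label=True)
-b_dataloader = DataLoader(framework="tensorflow", dataset=dataset, batch_size=10)
-
-# Use the same settings for both models so the results are comparable.
-conf = BenchmarkConfig(warmup=5, iteration=10, cores_per_instance=4, num_of_instance=2)
-fit(model="./fp32.pb", conf=conf, b_dataloader=b_dataloader)  # float32 baseline (placeholder path)
-fit(model="./int8.pb", conf=conf, b_dataloader=b_dataloader)  # optimized low-precision model (placeholder path)
-```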
- -## Get Started with Benchmark API - -Benchmark provide capability to automatically run with multiple instance through `cores_per_instance` and `num_of_instance` config (CPU only). -And please make sure `cores_per_instance * num_of_instance` must be less than CPU physical core numbers. -`benchmark.fit` accept `b_dataloader` or `b_func` as input. -`b_func` is customized benchmark function. If user passes the `b_dataloader`, then `b_func` is not required. - -```python -from neural_compressor.config import BenchmarkConfig -from neural_compressor.benchmark import fit - -conf = BenchmarkConfig(warmup=10, iteration=100, cores_per_instance=4, num_of_instance=7) -fit(model="./int8.pb", conf=conf, b_dataloader=eval_dataloader) -``` - -## Examples - -Refer to the [Benchmark example](../../examples/deprecated/helloworld/tf_example5). diff --git a/neural_compressor/benchmark.py b/neural_compressor/benchmark.py deleted file mode 100644 index 9cdbde59401..00000000000 --- a/neural_compressor/benchmark.py +++ /dev/null @@ -1,519 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright (c) 2021 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Benchmark is used for evaluating the model performance.""" -import json -import os -import re -import signal -import subprocess -import sys -from threading import Thread - -import numpy as np -import psutil - -from neural_compressor.profiling.parser.factory import ParserFactory -from neural_compressor.profiling.profiler.factory import ProfilerFactory - -from .adaptor import FRAMEWORKS -from .config import BenchmarkConfig, options -from .data import check_dataloader -from .model import BaseModel, Model -from .objective import MultiObjective -from .profiling.parser.parser import ProfilingParser -from .profiling.profiler.profiler import Profiler -from .utils import OPTIONS, alias_param, logger -from .utils.utility import GLOBAL_STATE, MODE, Statistics, dump_table, print_table - - -def set_env_var(env_var, value, overwrite_existing=False): - """Set the specified environment variable. - - Only set new env in two cases: - 1. env not exists - 2. env already exists but overwrite_existing params set True - """ - if overwrite_existing or not os.environ.get(env_var): - os.environ[env_var] = str(value) - - -def set_all_env_var(conf, overwrite_existing=False): - """Set all the environment variables with the configuration dict. 
- - Neural Compressor only uses physical cores - """ - cpu_counts = psutil.cpu_count(logical=False) - assert isinstance(conf, BenchmarkConfig), "input has to be a Config object" - - if conf.cores_per_instance is not None: - assert ( - conf.cores_per_instance * conf.num_of_instance <= cpu_counts - ), "num_of_instance * cores_per_instance should <= cpu physical cores" - else: - assert conf.num_of_instance <= cpu_counts, "num_of_instance should <= cpu counts" - conf.cores_per_instance = int(cpu_counts / conf.num_of_instance) - for var, value in dict(conf).items(): - set_env_var(var.upper(), value, overwrite_existing) - - -def get_architecture(): - """Get the architecture name of the system.""" - p1 = subprocess.Popen("lscpu", stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p2 = subprocess.Popen(["grep", "Architecture"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["cut", "-d", ":", "-f2"], stdin=p2.stdout, stdout=subprocess.PIPE) - res = None - for line in iter(p3.stdout.readline, b""): - res = line.decode("utf-8").strip() - return res - - -def get_threads_per_core(): - """Get the threads per core.""" - p1 = subprocess.Popen("lscpu", stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p2 = subprocess.Popen(["grep", "Thread(s) per core"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["cut", "-d", ":", "-f2"], stdin=p2.stdout, stdout=subprocess.PIPE) - res = None - for line in iter(p3.stdout.readline, b""): - res = line.decode("utf-8").strip() - return res - - -def get_threads(): - """Get the list of threads.""" - p1 = subprocess.Popen(["cat", "/proc/cpuinfo"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p2 = subprocess.Popen(["grep", "processor"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["cut", "-d", ":", "-f2"], stdin=p2.stdout, stdout=subprocess.PIPE) - res = [] - for line in iter(p3.stdout.readline, b""): - res.append(line.decode("utf-8").strip()) - return res - - -def get_physical_ids(): - """Get the list of sockets.""" - p1 = subprocess.Popen(["cat", "/proc/cpuinfo"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p2 = subprocess.Popen(["grep", "physical id"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["cut", "-d", ":", "-f2"], stdin=p2.stdout, stdout=subprocess.PIPE) - res = [] - for line in iter(p3.stdout.readline, b""): - res.append(line.decode("utf-8").strip()) - return res - - -def get_core_ids(): - """Get the ids list of the cores.""" - p1 = subprocess.Popen(["cat", "/proc/cpuinfo"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - p2 = subprocess.Popen(["grep", "core id"], stdin=p1.stdout, stdout=subprocess.PIPE) - p3 = subprocess.Popen(["cut", "-d", ":", "-f2"], stdin=p2.stdout, stdout=subprocess.PIPE) - res = [] - for line in iter(p3.stdout.readline, b""): - res.append(line.decode("utf-8").strip()) - return res - - -def get_bounded_threads(core_ids, threads, sockets): - """Return the threads id list that we will bind instances to.""" - res = [] - existing_socket_core_list = [] - for idx, x in enumerate(core_ids): - socket_core = sockets[idx] + ":" + x - if socket_core not in existing_socket_core_list: - res.append(int(threads[idx])) - existing_socket_core_list.append(socket_core) - return res - - -def run_instance(model, conf, b_dataloader=None, b_func=None): - """Run the instance with the configuration. - - Args: - model (object): The model to be benchmarked. 
- conf (BenchmarkConfig): The configuration for benchmark containing accuracy goal, - tuning objective and preferred calibration & quantization - tuning space etc. - b_dataloader: The dataloader for frameworks. - b_func: Customized benchmark function. If user passes the dataloader, - then b_func is not needed. - """ - results = {} - if b_func is None: - GLOBAL_STATE.STATE = MODE.BENCHMARK - framework_specific_info = { - "device": conf.device, - "approach": None, - "random_seed": options.random_seed, - "backend": conf.backend if conf.backend is not None else "default", - "format": "default", - } - framework = conf.framework.lower() - if "tensorflow" in framework: - framework_specific_info.update( - {"inputs": conf.inputs, "outputs": conf.outputs, "recipes": {}, "workspace_path": options.workspace} - ) - if framework == "keras": - framework_specific_info.update({"workspace_path": options.workspace}) - if "onnx" in framework: - framework_specific_info.update( - {"workspace_path": options.workspace, "graph_optimization": OPTIONS[framework].graph_optimization} - ) - if framework == "pytorch_ipex" or framework == "pytorch" or framework == "pytorch_fx": - framework_specific_info.update({"workspace_path": options.workspace, "q_dataloader": None}) - - assert isinstance(model, BaseModel), "need set neural_compressor Model for quantization...." - - adaptor = FRAMEWORKS[framework](framework_specific_info) - - assert b_dataloader is not None, "dataloader should not be None" - - from neural_compressor.utils.create_obj_from_config import create_eval_func - - b_func = create_eval_func(conf.framework, b_dataloader, adaptor, None, iteration=conf.iteration) - - objectives = MultiObjective(["performance"], {"relative": 0.1}, is_measure=True) - - val = objectives.evaluate(b_func, model) - # measurer contain info not only performance(eg, memory, model_size) - # also measurer have result list among steps - acc, _ = val - batch_size = b_dataloader.batch_size - warmup = conf.warmup - if len(objectives.objectives[0].result_list()) < warmup: - if len(objectives.objectives[0].result_list()) > 1 and warmup != 0: - warmup = 1 - else: - warmup = 0 - - result_list = objectives.objectives[0].result_list()[warmup:] - latency = np.array(result_list).mean() / batch_size - results["performance"] = acc, batch_size, result_list - - logger.info("\nbenchmark result:") - for i, res in enumerate(result_list): - logger.debug("Iteration {} result {}:".format(i, res)) - logger.info("Batch size = {}".format(batch_size)) - logger.info("Latency: {:.3f} ms".format(latency * 1000)) - logger.info("Throughput: {:.3f} images/sec".format(1.0 / latency)) - return results - else: - b_func(model.model) - - -def generate_prefix(core_list): - """Generate the command prefix with numactl. - - Args: - core_list: a list of core indexes bound with specific instances - """ - if sys.platform in ["linux"] and os.system("numactl --show >/dev/null 2>&1") == 0: - return "OMP_NUM_THREADS={} numactl --localalloc --physcpubind={}".format( - len(core_list), ",".join(core_list.astype(str)) - ) - elif sys.platform in ["win32"]: # pragma: no cover - # (TODO) should we move the hw_info from ux? 
- from neural_compressor.utils.utility import get_number_of_sockets - - num_of_socket = int(get_number_of_sockets()) - cores_per_instance = int(os.environ.get("CORES_PER_INSTANCE")) - cores_per_socket = int(psutil.cpu_count(logical=False)) / num_of_socket - socket_id = int(core_list[0] // cores_per_socket) - # cores per socket should integral multiple of cores per instance, else not bind core - if cores_per_socket % cores_per_instance == 0: - from functools import reduce - - hex_core = hex(reduce(lambda x, y: x | y, [1 << p for p in core_list])) - return "start /b /WAIT /node {} /affinity {} CMD /c".format(socket_id, hex_core) - else: - return "" - - -def call_one(cmd, log_file): - """Execute one command for one instance in one thread and dump the log (for Windows).""" - proc = subprocess.Popen( - cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True - ) # nosec - with open(log_file, "w", 1, encoding="utf-8") as log_file: - log_file.write(f"[ COMMAND ] {cmd} \n") - for line in proc.stdout: - decoded_line = line.decode("utf-8", errors="ignore").strip() - logger.info(decoded_line) # redirect to terminal - log_file.write(decoded_line + "\n") - - -def config_instance(raw_cmd): - """Configure the multi-instance commands and trigger benchmark with sub process. - - Args: - raw_cmd: raw command used for benchmark - """ - multi_instance_cmd = "" - num_of_instance = int(os.environ.get("NUM_OF_INSTANCE")) - cores_per_instance = int(os.environ.get("CORES_PER_INSTANCE")) - - logger.info("num of instance: {}".format(num_of_instance)) - logger.info("cores per instance: {}".format(cores_per_instance)) - - if sys.platform in ["linux"] and get_architecture() == "aarch64" and int(get_threads_per_core()) > 1: - raise OSError("Currently no support on ARM with hyperthreads") - elif sys.platform in ["linux"]: - bounded_threads = get_bounded_threads(get_core_ids(), get_threads(), get_physical_ids()) - - for i in range(0, num_of_instance): - if sys.platform in ["linux"] and get_architecture() == "x86_64": - core_list_idx = np.arange(0, cores_per_instance) + i * cores_per_instance - core_list = np.array(bounded_threads)[core_list_idx] - else: - core_list = np.arange(0, cores_per_instance) + i * cores_per_instance - # bind cores only allowed in linux/mac os with numactl enabled - prefix = generate_prefix(core_list) - instance_cmd = "{} {}".format(prefix, raw_cmd) - if sys.platform in ["linux"]: - instance_log = "{}_{}_{}.log".format(num_of_instance, cores_per_instance, i) - multi_instance_cmd += "{} 2>&1|tee {} & \\\n".format(instance_cmd, instance_log) - else: # pragma: no cover - multi_instance_cmd += "{} \n".format(instance_cmd) - - multi_instance_cmd += "wait" if sys.platform in ["linux"] else "" - logger.info("Running command is\n{}".format(multi_instance_cmd)) - # each instance will execute single instance - set_env_var("NC_ENV_CONF", True, overwrite_existing=True) - if sys.platform in ["linux"]: - p = subprocess.Popen(multi_instance_cmd, preexec_fn=os.setsid, shell=True) # nosec - elif sys.platform in ["win32"]: # pragma: no cover - cmd_list = multi_instance_cmd.split("\n")[:-1] - threads = [] - for idx, cmd in enumerate(cmd_list): - # wrap each execution of windows bat file in one thread - # write the log to the log file of the corresponding instance - logger.info("Will dump to {}_{}_{}.log".format(num_of_instance, cores_per_instance, idx)) - threads.append( - Thread(target=call_one, args=(cmd, "{}_{}_{}.log".format(num_of_instance, cores_per_instance, idx))) - ) - for 
command_thread in threads: - command_thread.start() - logger.info("Worker threads start") - # Wait for all of them to finish - for command_thread in threads: - command_thread.join() - logger.info("Worker threads join") - return - try: - p.communicate() - except KeyboardInterrupt: - os.killpg(os.getpgid(p.pid), signal.SIGKILL) - - -def summary_benchmark(): - """Get the summary of the benchmark.""" - if sys.platform in ["linux"]: - num_of_instance = int(os.environ.get("NUM_OF_INSTANCE")) - cores_per_instance = int(os.environ.get("CORES_PER_INSTANCE")) - latency_l = [] - throughput_l = [] - for i in range(0, num_of_instance): - log = "{}_{}_{}.log".format(num_of_instance, cores_per_instance, i) - with open(log, "r") as f: - for line in f: - latency = re.search(r"[L,l]atency:\s+(\d+(\.\d+)?)", line) - latency_l.append(float(latency.group(1))) if latency and latency.group(1) else None - throughput = re.search(r"[T,t]hroughput:\s+(\d+(\.\d+)?)", line) - throughput_l.append(float(throughput.group(1))) if throughput and throughput.group(1) else None - if throughput_l and latency_l: - assert ( - len(latency_l) == len(throughput_l) == num_of_instance - ), "Multiple instance benchmark failed with some instance!" - - output_data = [ - ["Latency average [second/sample]", "{:.6f}".format((sum(latency_l) / len(latency_l)) / 1000)], - ["Throughput sum [samples/second]", "{:.3f}".format(sum(throughput_l))], - ] - logger.info("********************************************") - Statistics( - output_data, header="Multiple Instance Benchmark Summary", field_names=["Items", "Result"] - ).print_stat() - else: - # (TODO) should add summary after win32 benchmark has log - pass - - -def profile(model, conf, b_dataloader) -> None: - """Execute profiling for benchmark configuration. - - Args: - model: The model to be profiled. - conf: The configuration for benchmark containing accuracy goal, - tuning objective and preferred calibration & quantization - tuning space etc. - b_dataloader: The dataloader for frameworks. - - Returns: - None - """ - intra_num_of_threads = 1 - inter_num_of_threads = 1 - num_warmup = 10 - - intra_num_of_threads_conf = conf.intra_num_of_threads - if intra_num_of_threads_conf is not None: - intra_num_of_threads = intra_num_of_threads_conf - else: - logger.warning( - f"Could not find intra_num_of_threads value in config. Using: {intra_num_of_threads}", - ) - - inter_num_of_threads_conf = conf.inter_num_of_threads - if inter_num_of_threads_conf is not None: - inter_num_of_threads = inter_num_of_threads_conf - else: - logger.warning( - f"Could not find inter_num_of_threads value in config. Using: {inter_num_of_threads}", - ) - - num_warmup_conf = conf.warmup - if num_warmup_conf is not None: - num_warmup = num_warmup_conf - else: - logger.warning( - f"Could not get find num_warmup value in config. 
Using: {num_warmup}", - ) - - profiling_log = os.path.abspath( - os.path.join( - options.workspace, - ), - ) - profiler: Profiler = ProfilerFactory.get_profiler( - model=model, - dataloader=b_dataloader, - log_file=profiling_log, - ) - profiler.profile_model( - intra_num_of_threads=intra_num_of_threads, - inter_num_of_threads=inter_num_of_threads, - num_warmup=num_warmup, - ) - parser: ProfilingParser = ParserFactory.get_parser( - model=model, - logs=[profiling_log], - ) - parsed_results = parser.process() - print_table( - title="Profiling", - column_mapping={ - "Node name": "node_name", - "Total execution time [us]": "total_execution_time", - "Accelerator execution time [us]": "accelerator_execution_time", - "CPU execution time [us]": "cpu_execution_time", - "OP run": "op_run", - "OP defined": "op_defined", - }, - table_entries=parsed_results, - ) - - profiling_table_file = os.path.join( - options.workspace, - "profiling_table.csv", - ) - - dump_table( - filepath=profiling_table_file, - column_mapping={ - "Node name": "node_name", - "Total execution time [us]": "total_execution_time", - "Accelerator execution time [us]": "accelerator_execution_time", - "CPU execution time [us]": "cpu_execution_time", - "OP run": "op_run", - "OP defined": "op_defined", - }, - table_entries=parsed_results, - file_type="csv", - ) - - profiling_data_file = os.path.join( - options.workspace, - "profiling_data.json", - ) - with open(profiling_data_file, "w") as profiling_json: - json.dump(parsed_results, profiling_json, indent=4) - - -def benchmark_with_raw_cmd(raw_cmd, conf=None): - """Benchmark the model performance with the raw command. - - Args: - raw_cmd (string): The command to be benchmarked. - conf (BenchmarkConfig): The configuration for benchmark containing accuracy goal, - tuning objective and preferred calibration & quantization - tuning space etc. - - Example:: - - # Run benchmark according to config - from neural_compressor.benchmark import fit_with_raw_cmd - - conf = BenchmarkConfig(iteration=100, cores_per_instance=4, num_of_instance=7) - fit_with_raw_cmd("test.py", conf) - """ - if conf is not None: - if conf.backend == "ipex": - import intel_extension_for_pytorch - assert sys.platform in ["linux", "win32"], "only support platform windows and linux..." - # disable multi-instance for running benchmark on GPU device - set_all_env_var(conf) - - config_instance(raw_cmd) - summary_benchmark() - - -@alias_param("conf", param_alias="config") -def fit(model, conf, b_dataloader=None, b_func=None): - """Benchmark the model performance with the configure. - - Args: - model (object): The model to be benchmarked. - conf (BenchmarkConfig): The configuration for benchmark containing accuracy goal, - tuning objective and preferred calibration & quantization - tuning space etc. - b_dataloader: The dataloader for frameworks. - b_func: Customized benchmark function. If user passes the dataloader, - then b_func is not needed. - - Example:: - - # Run benchmark according to config - from neural_compressor.benchmark import fit - - conf = BenchmarkConfig(iteration=100, cores_per_instance=4, num_of_instance=7) - fit(model='./int8.pb', conf=conf, b_dataloader=eval_dataloader) - """ - if conf.backend == "ipex": - import intel_extension_for_pytorch - - wrapped_model = Model(model, conf=conf) - - if b_dataloader is not None: - check_dataloader(b_dataloader) - assert sys.platform in ["linux", "win32", "darwin"], "platform not supported..." 
- # disable multi-instance for running benchmark on GPU device - set_all_env_var(conf) - if conf.device == "gpu" or conf.device == "npu" or sys.platform == "darwin": - set_env_var("NC_ENV_CONF", True, overwrite_existing=True) - - logger.info("Start to run Benchmark.") - if os.environ.get("NC_ENV_CONF") == "True": - return run_instance(model=wrapped_model, conf=conf, b_dataloader=b_dataloader, b_func=b_func) - raw_cmd = sys.executable + " " + " ".join(sys.argv) - benchmark_with_raw_cmd(raw_cmd) diff --git a/neural_compressor/common/benchmark.py b/neural_compressor/common/benchmark.py deleted file mode 100644 index 5c3ba3ee765..00000000000 --- a/neural_compressor/common/benchmark.py +++ /dev/null @@ -1,524 +0,0 @@ -# Copyright (c) 2024 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Benchmark API for Intel Neural Compressor.""" - -import argparse -import os -import re -import subprocess -import sys - -import psutil - -from neural_compressor.common.utils import Statistics, get_workspace, logger - -description = """ -################################################################################################################## -This is the command used to launch the Intel CPU performance benchmark, supports both Linux and Windows platform. -To get the peak performance on Intel Xeon CPU, we should avoid crossing NUMA node in one instance. -By default, `incbench` will trigger 1 instance on the first NUMA node. - -Params in `incbench`: - - num_instances Default to 1. - - num_cores_per_instance Default to None. - - C, cores Default to 0-${num_cores_on_NUMA-1}, decides the visible core range. - - cross_memory Default to False, decides whether to allocate memory cross NUMA. - Note: Use it only when memory for instance is not enough. - -# General use cases: -1. `incbench main.py`: run 1 instance on NUMA:0. -2. `incbench --num_i 2 main.py`: run 2 instances on NUMA:0. -3. `incbench --num_c 2 main.py`: run multi-instances with 2 cores per instance on NUMA:0. -4. `incbench -C 24-47 main.py`: run 1 instance on COREs:24-47. -5. `incbench -C 24-47 --num_c 4 main.py`: run multi-instances with 4 COREs per instance on COREs:24-47. - -Note: - - `num_i` works the same as `num_instances` - - `num_c` works the same as `num_cores_per_instance` -################################################################################################################## -""" - - -def get_linux_numa_info(): - """Collect numa/socket information on linux system. - - Returns: - numa_info (dict): demo: {numa_index: {"physical_cpus": "xxx"; "logical_cpus": "xxx"}} - E.g. 
numa_info = { - 0: {"physical_cpus": "0-23", "logical_cpus": "0-23,48-71"}, - 1: {"physical_cpus": "24-47", "logical_cpus": "24-47,72-95"} - } - """ - result = subprocess.run(["lscpu"], capture_output=True, text=True) - output = result.stdout - - numa_info = {} - for line in output.splitlines(): - # demo: "NUMA node0 CPU(s): 0-3" - node_match = re.match(r"^NUMA node(\d+) CPU\(s\):\s+(.*)$", line) - if node_match: - node_id = int(node_match.group(1)) - cpus = node_match.group(2).strip() - numa_info[node_id] = { - "physical_cpus": cpus.split(",")[0], - "logical_cpus": ",".join(cpus.split(",")), - } - - # if numa_info is not collected, we go back to socket_info - if not numa_info: # pragma: no cover - for line in output.splitlines(): - # demo: "Socket(s): 2" - socket_match = re.match(r"^Socket\(s\):\s+(.*)$", line) - if socket_match: - num_socket = int(socket_match.group(1)) - # process big cores (w/ physical cores) and small cores (w/o physical cores) - physical_cpus = psutil.cpu_count(logical=False) - logical_cpus = psutil.cpu_count(logical=True) - physical_cpus_per_socket = physical_cpus // num_socket - logical_cpus_per_socket = logical_cpus // num_socket - for i in range(num_socket): - physical_cpus_str = str(i * physical_cpus_per_socket) + "-" + str((i + 1) * physical_cpus_per_socket - 1) - if num_socket == 1: - logical_cpus_str = str(i * logical_cpus_per_socket) + "-" + str((i + 1) * logical_cpus_per_socket - 1) - else: - remain_cpus = logical_cpus_per_socket - physical_cpus_per_socket - logical_cpus_str = ( - physical_cpus_str - + "," - + str(i * (remain_cpus) + physical_cpus) - + "-" - + str((i + 1) * remain_cpus + physical_cpus - 1) - ) - numa_info[i] = { - "physical_cpus": physical_cpus_str, - "logical_cpus": logical_cpus_str, - } - return numa_info - - -def get_windows_numa_info(): - """Collect socket information on Windows system due to no available numa info. - - Returns: - numa_info (dict): demo: {numa_index: {"physical_cpus": "xxx"; "logical_cpus": "xxx"}} - E.g. numa_info = { - 0: {"physical_cpus": "0-23", "logical_cpus": "0-23,48-71"}, - 1: {"physical_cpus": "24-47", "logical_cpus": "24-47,72-95"} - } - """ - # pylint: disable=import-error - # pragma: no cover - import wmi - - c = wmi.WMI() - processors = c.Win32_Processor() - socket_designations = set() - for processor in processors: - socket_designations.add(processor.SocketDesignation) - num_socket = len(socket_designations) - physical_cpus = sum(processor.NumberOfCores for processor in processors) - logical_cpus = sum(processor.NumberOfLogicalProcessors for processor in processors) - physical_cpus_per_socket = physical_cpus // num_socket - logical_cpus_per_socket = logical_cpus // num_socket - - numa_info = {} - for i in range(num_socket): - physical_cpus_str = str(i * physical_cpus_per_socket) + "-" + str((i + 1) * physical_cpus_per_socket - 1) - if num_socket == 1: - logical_cpus_str = str(i * logical_cpus_per_socket) + "-" + str((i + 1) * logical_cpus_per_socket - 1) - else: - remain_cpus = logical_cpus_per_socket - physical_cpus_per_socket - logical_cpus_str = ( - physical_cpus_str - + "," - + str(i * (remain_cpus) + physical_cpus) - + "-" - + str((i + 1) * remain_cpus + physical_cpus - 1) - ) - numa_info[i] = { - "physical_cpus": physical_cpus_str, - "logical_cpus": logical_cpus_str, - } - return numa_info - - -def dump_numa_info(): - """Fetch NUMA info and dump stats in shell, return numa_info. 
- - Returns: - numa_info (dict): {numa_node_index: list of Physical CPUs in this numa node, ...} - """ - if psutil.WINDOWS: # pragma: no cover - numa_info = get_windows_numa_info() - elif psutil.LINUX: - numa_info = get_linux_numa_info() - else: # pragma: no cover - logger.error(f"Unsupported platform detected: {sys.platform}, only supported on Linux and Windows") - - # dump stats to shell - field_names = ["NUMA node", "Physical CPUs", "Logical CPUs"] - output_data = [] - for op_type in numa_info.keys(): - field_results = [op_type, numa_info[op_type]["physical_cpus"], numa_info[op_type]["logical_cpus"]] - output_data.append(field_results) - Statistics(output_data, header="CPU Information", field_names=field_names).print_stat() - - # parse numa_info for ease-of-use - for n in numa_info: - numa_info[n] = parse_str2list(numa_info[n]["physical_cpus"]) - return numa_info - - -def parse_str2list(cpu_ranges): - """Parse '0-4,7,8' into [0,1,2,3,4,7,8] for machine readable.""" - cpus = [] - ranges = cpu_ranges.split(",") - for r in ranges: - if "-" in r: - try: - start, end = r.split("-") - cpus.extend(range(int(start), int(end) + 1)) - except ValueError: # pragma: no cover - raise ValueError(f"Invalid range: {r}") - else: - try: - cpus.append(int(r)) - except ValueError: # pragma: no cover - raise ValueError(f"Invalid number: {r}") - return cpus - - -def format_list2str(cpus): - """Format [0,1,2,3,4,7,8] back to '0-4,7,8' for human readable.""" - if not cpus: # pragma: no cover - return "" - cpus = sorted(set(cpus)) - ranges = [] - start = cpus[0] - end = start - for i in range(1, len(cpus)): - if cpus[i] == end + 1: - end = cpus[i] - else: - if start == end: - ranges.append(f"{start}") - else: - ranges.append(f"{start}-{end}") - start = cpus[i] - end = start - if start == end: - ranges.append(f"{start}") - else: - ranges.append(f"{start}-{end}") - return ",".join(ranges) - - -def get_reversed_numa_info(numa_info): - """Reverse numa_info.""" - reversed_numa_info = {} - for n, cpu_info in numa_info.items(): - for i in cpu_info: - reversed_numa_info[i] = n - return reversed_numa_info - - -def get_numa_node(core_list, reversed_numa_info): - """Return numa node used in current core_list.""" - numa_set = set() - for c in core_list: - assert c in reversed_numa_info, "Cores should be in physical CPUs" - numa_set.add(reversed_numa_info[c]) - return numa_set - - -def set_cores_for_instance(args, numa_info): - """Set cores for each instance based on the input args. 
- - All use cases are listed below: - Params: a=num_instance; b=num_cores_per_instance; c=cores; - - no a, b, c: a=1, c=numa:0 - - no a, b: a=1, c=c - - no a, c: a=numa:0/b, c=numa:0 - - no b, c: a=a, c=numa:0 - - no a: a=numa:0/b, c=c - - no b: a=a, c=c - - no c: a=a, c=a*b - - a, b, c: a=a, c=a*b - - Args: - args (argparse): arguments for setting different configurations - numa_info (dict): {numa_node_index: list of Physical CPUs in this numa node, ...} - - Returns: - core_list_per_instance (dict): {"instance_index": ["node_index", "cpu_index", num_cpu]} - """ - available_cores_list = [] - for n in numa_info: - available_cores_list.extend(numa_info[n]) - # preprocess args.cores to set default values - if args.cores is None: - if args.num_cores_per_instance and args.num_instances: - target_cores = args.num_instances * args.num_cores_per_instance - assert target_cores <= len(available_cores_list), ( - "Invalid configuration: num_instances * num_cores_per_instance = " - + "{} exceeds the number of physical CPUs = {}.".format(target_cores, len(available_cores_list)) - ) - cores_list = list(range(target_cores)) - # log for cores in use - logger.info("num_instances * num_cores_per_instance = {} cores are used.".format(target_cores)) - else: - # default behavior, only use numa:0 - cores_list = numa_info[0] - # log for cores in use - logger.info("By default, Intel Neural Compressor uses all cores on numa:0.") - else: - cores_list = parse_str2list(args.cores) - # log for cores available - logger.info("{} cores are available.".format(len(cores_list))) - if args.num_cores_per_instance and args.num_instances: - target_cores = args.num_instances * args.num_cores_per_instance - assert target_cores <= len(cores_list), ( - "Invalid configuration: num_instances * num_cores_per_instance = " - + "{} exceeds the number of available CPUs = {}.".format(target_cores, len(cores_list)) - ) - cores_list = cores_list[:target_cores] - - # preprocess args.num_instances to set default values - if args.num_instances is None: - if args.num_cores_per_instance: - assert args.num_cores_per_instance <= len(cores_list), ( - "Invalid configuration: num_cores_per_instance = " - + "{} exceeds the number of available CPUs = {}.".format(args.num_cores_per_instance, len(cores_list)) - ) - args.num_instances = len(cores_list) // args.num_cores_per_instance - target_cores = args.num_instances * args.num_cores_per_instance - cores_list = cores_list[:target_cores] - else: - args.num_instances = 1 - logger.info("By default, Intel Neural Compressor triggers only one instance.") - else: - assert args.num_instances <= len( - cores_list - ), "Invalid configuration: num_instances = " + "{} exceeds the number of available CPUs = {}.".format( - args.num_instances, len(cores_list) - ) - - ### log for instances number and cores in use - if args.num_instances == 1: - logger.info("1 instance is triggered.") - else: - logger.info("{} instances are triggered.".format(args.num_instances)) - if len(cores_list) == 1: - logger.info("1 core is in use.") - else: - logger.info("{} cores are in use.".format(len(cores_list))) - - # only need to process num_cores_per_instance now - core_list_per_instance = {} - # num_cores_per_instance = all_cores / num_instances - num_cores_per_instance = len(cores_list) // args.num_instances - for i in range(args.num_instances): - core_list_per_instance[i] = cores_list[i * num_cores_per_instance : (i + 1) * num_cores_per_instance] - if len(cores_list) % args.num_instances != 0: # pragma: no cover - last_index = 
args.num_instances - 1 - core_list_per_instance[last_index] = cores_list[last_index * num_cores_per_instance :] - - # convert core_list_per_instance = {"instance_index": cpu_index_list} - # -> {"instance_index": ["node_index", "cpu_index", num_cores]} - reversed_numa_info = get_reversed_numa_info(numa_info) - for i, core_list in core_list_per_instance.items(): - core_list_per_instance[i] = [ - format_list2str(get_numa_node(core_list, reversed_numa_info)), - format_list2str(core_list), - len(core_list), - ] - - # dump stats to shell - field_names = ["Instance", "NUMA node", "Physical CPUs", "Number of cores"] - output_data = [] - for i, core_list in core_list_per_instance.items(): - field_results = [i + 1, core_list[0], core_list[1], core_list[2]] - output_data.append(field_results) - Statistics(output_data, header="Instance Binding Information", field_names=field_names).print_stat() - return core_list_per_instance - - -def generate_prefix(args, core_list): - """Generate the command prefix with `numactl` (Linux) or `start` (Windows) command. - - Args: - args (argparse): arguments for setting different configurations - core_list: ["node_index", "cpu_index", num_cpu] - - Returns: - command_prefix (str): command_prefix with specific core list for Linux or Windows. - """ - if sys.platform in ["linux"] and os.system("numactl --show >/dev/null 2>&1") == 0: - if args.cross_memory: - return "OMP_NUM_THREADS={} numactl -l -C {}".format(core_list[2], core_list[1]) - else: - return "OMP_NUM_THREADS={} numactl -m {} -C {}".format(core_list[2], core_list[0], core_list[1]) - elif sys.platform in ["win32"]: # pragma: no cover - socket_id = core_list[0] - from functools import reduce - - hex_core = hex(reduce(lambda x, y: x | y, [1 << p for p in parse_str2list(core_list[1])])) - return "start /B /WAIT /node {} /affinity {}".format(socket_id, hex_core) - else: # pragma: no cover - return "" - - -def run_multi_instance_command(args, core_list_per_instance, raw_cmd): - """Build and trigger commands for multi-instances with subprocess. 
- - Args: - args (argparse): arguments for setting different configurations - core_list_per_instance (dict): {"instance_index": ["node_index", "cpu_index", num_cpu]} - raw_cmd (str): script.py and parameters for this script - """ - instance_cmd = "" - if not os.getenv("PYTHON_PATH"): # pragma: no cover - logger.info("The interpreter path is not set, using string `python` as command.") - logger.info("To replace it, use `export PYTHON_PATH=xxx`.") - interpreter = os.getenv("PYTHON_PATH", "python") - workspace_dir = get_workspace() - logfile_process_map = {} - logfile_dict = {} - for i, core_list in core_list_per_instance.items(): - # build cmd and log file path - prefix = generate_prefix(args, core_list) - instance_cmd = "{} {} {}".format(prefix, interpreter, raw_cmd) - logger.info(f"Instance {i+1}: {instance_cmd}") - instance_log_file = "{}_{}_{}C.log".format(i + 1, len(core_list_per_instance), core_list[2]) - instance_log_file = os.path.join(workspace_dir, instance_log_file) - # trigger subprocess - p = subprocess.Popen( - instance_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True - ) # nosec - # log_file_path: [process_object, instance_command, instance_index] - logfile_process_map[instance_log_file] = [p, instance_cmd, i + 1] - logfile_dict[i + 1] = instance_log_file - - # Dump each instance's standard output to the corresponding log file - for instance_log_file, p_cmd_i in logfile_process_map.items(): - # p.communicate() reads std to avoid dead-lock, p.wait() only return. - stdout, stderr = p_cmd_i[0].communicate() # stderr is merged to stdout, so it's None - with open(instance_log_file, "w", 1, encoding="utf-8") as log_file: - log_file.write(f"[COMMAND]: {p_cmd_i[1]}\n") - log_file.write(stdout.decode()) - logger.info(f"The log of instance {p_cmd_i[2]} is saved to {instance_log_file}") - - return logfile_dict - - -def summary_latency_throughput(logfile_dict): - """Get the summary of the benchmark.""" - throughput_pattern = r"[T,t]hroughput:\s*([0-9]*\.?[0-9]+)\s*([a-zA-Z/]*)" - latency_pattern = r"[L,l]atency:\s*([0-9]*\.?[0-9]+)\s*([a-zA-Z/]*)" - - latency_list = [] - throughput_list = [] - latency_unit_name = "" - throughput_unit_name = "" - for idx, logfile in logfile_dict.items(): - with open(logfile, "r") as f: - for line in f: - re_latency = re.search(latency_pattern, line) - re_throughput = re.search(throughput_pattern, line) - if re_latency: - latency_list.append(float(re_latency.group(1))) - if not latency_unit_name: - latency_unit_name = re_latency.group(2) - if re_throughput: - throughput_list.append(float(re_throughput.group(1))) - if not throughput_unit_name: - throughput_unit_name = re_throughput.group(2) - if throughput_list and latency_list: - assert ( - len(latency_list) == len(throughput_list) == len(logfile_dict) - ), "Multiple instance benchmark failed with some instances!" 
- - # dump collected latency and throughput info - header = "Multiple Instance Benchmark Summary" - field_names = [ - "Instance", - "Latency ({})".format(latency_unit_name), - "Throughput ({})".format(throughput_unit_name), - ] - output_data = [] - for idx, (latency, throughput) in enumerate(zip(latency_list, throughput_list)): - output_data.append([idx + 1, round(latency, 3), round(throughput, 3)]) - Statistics(output_data, header=header, field_names=field_names).print_stat() - # show summary info - logger.info("Average latency: {} {}".format(round(sum(latency_list) / len(latency_list), 3), latency_unit_name)) - logger.info("Total throughput: {} {}".format(round(sum(throughput_list), 3), throughput_unit_name)) - elif throughput_list: - assert len(throughput_list) == len(logfile_dict), "Multiple instance benchmark failed with some instances!" - - # dump collected throughput info - header = "Multiple Instance Benchmark Summary" - field_names = [ - "Instance", - "Throughput ({})".format(throughput_unit_name), - ] - output_data = [] - for idx, throughput in enumerate(throughput_list): - output_data.append([idx + 1, round(throughput, 3)]) - Statistics(output_data, header=header, field_names=field_names).print_stat() - # show summary info - logger.info("Total throughput: {} {}.hdfghdfghs".format(round(sum(throughput_list), 3), throughput_unit_name)) - elif latency_list: - assert len(latency_list) == len(logfile_dict), "Multiple instance benchmark failed with some instances!" - - # dump collected latency info - header = "Multiple Instance Benchmark Summary" - field_names = [ - "Instance", - "Latency ({})".format(latency_unit_name), - ] - output_data = [] - for idx, latency in enumerate(latency_list): - output_data.append([idx + 1, round(latency, 3)]) - Statistics(output_data, header=header, field_names=field_names).print_stat() - # show summary info - logger.info("Average latency: {} {}".format(round(sum(latency_list) / len(latency_list), 3), latency_unit_name)) - - -def benchmark(): - """Benchmark API interface.""" - logger.info("Start benchmark with Intel Neural Compressor.") - logger.info("Intel Neural Compressor only uses physical CPUs for the best performance.") - - parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawTextHelpFormatter) - parser.add_argument("--num_instances", type=int, default=None, help="Determine the number of instances.") - parser.add_argument( - "--num_cores_per_instance", - type=int, - default=None, - help="Determine the number of cores in 1 instance.", - ) - parser.add_argument("-C", "--cores", type=str, default=None, help="Determine the visible core range.") - parser.add_argument("--cross_memory", action="store_true", help="Determine the visible core range.") - parser.add_argument("script", type=str, help="The path to the script to launch.") - parser.add_argument("parameters", nargs=argparse.REMAINDER, help="arguments to the script.") - - args = parser.parse_args() - - assert sys.platform in ["linux", "win32"], "only support platform windows and linux..." 
- - numa_info = dump_numa_info() # show numa info and current usage of cores - core_list_per_instance = set_cores_for_instance(args, numa_info=numa_info) - script_and_parameters = args.script + " " + " ".join(args.parameters) - logfile_dict = run_multi_instance_command(args, core_list_per_instance, raw_cmd=script_and_parameters) - summary_latency_throughput(logfile_dict) diff --git a/setup.py b/setup.py index 77097e18176..4a86d024734 100644 --- a/setup.py +++ b/setup.py @@ -119,11 +119,6 @@ def get_build_version(): package_data = PKG_INSTALL_CFG[cfg_key].get("package_data") or {} install_requires = PKG_INSTALL_CFG[cfg_key].get("install_requires") or [] extras_require = PKG_INSTALL_CFG[cfg_key].get("extras_require") or {} - entry_points = { - "console_scripts": [ - "incbench = neural_compressor.common.benchmark:benchmark", - ] - } setup( name=project_name, diff --git a/test/3x/common/test_benchmark.py b/test/3x/common/test_benchmark.py deleted file mode 100644 index 78e94f70ace..00000000000 --- a/test/3x/common/test_benchmark.py +++ /dev/null @@ -1,171 +0,0 @@ -import os -import re -import shutil -import subprocess - -from neural_compressor.common.utils import DEFAULT_WORKSPACE - -# build files during test process to test benchmark -tmp_file_dict = {} -tmp = """ -print("test benchmark") -""" -tmp_file_dict["./tmp/tmp.py"] = tmp - -tmp = """ -print("test benchmark") -print("Throughput: 1 samples/sec") -print("Latency: 1000 ms") -""" -tmp_file_dict["./tmp/throughput_latency.py"] = tmp - -tmp = """ -print("test benchmark") -print("Throughput: 2 tokens/sec") -""" -tmp_file_dict["./tmp/throughput.py"] = tmp - -tmp = """ -print("test benchmark") -print("Latency: 10 ms") -""" -tmp_file_dict["./tmp/latency.py"] = tmp - - -def build_tmp_file(): - os.makedirs("./tmp") - for tmp_path, tmp in tmp_file_dict.items(): - f = open(tmp_path, "w") - f.write(tmp) - f.close() - - -def trigger_process(cmd): - # trigger subprocess - p = subprocess.Popen( - cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True - ) # nosec - return p - - -def check_main_process(message): - num_i_pattern = r"(.*) (\d+) instance(.*) triggered" - num_c_pattern = r"(.*) (\d+) core(.*) in use" - log_file_pattern = r"(.*) The log of instance 1 is saved to (.*)" - num_i = re.search(num_i_pattern, message, flags=re.DOTALL).group(2) - all_c = re.search(num_c_pattern, message).group(2) - log_file_path = re.search(log_file_pattern, message).group(2) - return int(num_i), int(all_c), log_file_path - - -def check_log_file(log_file_path): - output_pattern = r"(.*)test benchmark(.*)" - with open(log_file_path, "r") as f: - output = f.read() - f.close() - return re.match(output_pattern, output, flags=re.DOTALL) - - -class TestBenchmark: - def setup_class(self): - build_tmp_file() - - def teardown_class(self): - shutil.rmtree("./tmp") - shutil.rmtree("nc_workspace") - - def test_default(self): - cmd = "incbench tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 1, "the number of instance should be 1." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_only_num_i(self): - cmd = "incbench --num_i 2 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert check_log_file(log_file_path), "instance output is not correct." 
- - def test_only_num_c(self): - cmd = "incbench --num_c 1 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == all_c, "the number of instance should equal the number of available cores." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_only_cores(self): - cmd = "incbench -C 0-1 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 1, "the number of instance should be 1." - assert all_c == 2, "the number of available cores should be 2." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_num_i_num_c(self): - cmd = "incbench --num_i 2 --num_c 2 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert all_c == 4, "the number of available cores should be 4." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_num_i_cores(self): - cmd = "incbench --num_i 2 -C 0-2,5,8 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert all_c == 5, "the number of available cores should be 5." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_num_c_cores(self): - cmd = "incbench --num_c 2 -C 0-6 tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 3, "the number of instance should be all_c//num_c=3." - assert all_c == 6, "the number of available cores should be (all_c//num_c)*num_c=6." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_cross_memory(self): - cmd = "incbench --num_c 1 -C 0 --cross_memory tmp/tmp.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 1, "the number of instance should be all_c//num_c=1." - assert all_c == 1, "the number of available cores should be 1." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_throughput_latency(self): - cmd = "incbench --num_i 2 --num_c 2 -C 0-7 tmp/throughput_latency.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert all_c == 4, "the number of available cores should be num_i*num_c=4." - assert check_log_file(log_file_path), "instance output is not correct." - - def test_throughput(self): - cmd = "incbench --num_i 2 --num_c 2 -C 0-7 tmp/throughput.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert all_c == 4, "the number of available cores should be num_i*num_c=4." - assert check_log_file(log_file_path), "instance output is not correct." 
- - def test_latency(self): - cmd = "incbench --num_i 2 --num_c 2 -C 0-7 tmp/latency.py" - p = trigger_process(cmd) - stdout, _ = p.communicate() - num_i, all_c, log_file_path = check_main_process(stdout.decode()) - assert num_i == 2, "the number of instance should be 2." - assert all_c == 4, "the number of available cores should be num_i*num_c=4." - assert check_log_file(log_file_path), "instance output is not correct." diff --git a/test/benchmark/test_benchmark_2.x.py b/test/benchmark/test_benchmark_2.x.py deleted file mode 100644 index 15db0a9c638..00000000000 --- a/test/benchmark/test_benchmark_2.x.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Tests for neural_compressor benchmark.""" - -import os -import re -import shutil -import tempfile -import unittest - -import numpy as np -import psutil -import tensorflow as tf - -from neural_compressor.adaptor.tf_utils.util import write_graph -from neural_compressor.benchmark import benchmark_with_raw_cmd -from neural_compressor.config import BenchmarkConfig - - -def build_benchmark(): - seq = """ -from argparse import ArgumentParser -arg_parser = ArgumentParser(description='Parse args') -arg_parser.add_argument('--input_model', dest='input_model', default='input_model', help='input odel') -args = arg_parser.parse_args() -from neural_compressor.benchmark import fit -from neural_compressor.config import BenchmarkConfig -from neural_compressor.data import Datasets -from neural_compressor.data.dataloaders.dataloader import DataLoader -dataset = Datasets('tensorflow')['dummy']((100, 32, 32, 1), label=True) -b_dataloader = DataLoader(framework="tensorflow", dataset=dataset, batch_size=10) -conf = BenchmarkConfig(warmup=5, iteration=10, cores_per_instance=4, num_of_instance=2) -fit(args.input_model, conf, b_dataloader=b_dataloader) - """ - - seq1 = """ -from argparse import ArgumentParser -arg_parser = ArgumentParser(description='Parse args') -arg_parser.add_argument('--input_model', dest='input_model', default='input_model', help='input odel') -args = arg_parser.parse_args() -from neural_compressor.benchmark import fit -from neural_compressor.config import BenchmarkConfig -from neural_compressor.data import Datasets -dataset = Datasets('tensorflow')['dummy']((100, 32, 32, 1), label=True) -from neural_compressor.data.dataloaders.dataloader import DataLoader -conf = BenchmarkConfig(warmup=5, iteration=10, cores_per_instance=4, num_of_instance=2) -b_dataloader = DataLoader(framework="tensorflow", dataset=dataset, batch_size=10) -fit(args.input_model, conf, b_dataloader=b_dataloader) - """ - - # test normal case - with open("fake.py", "w", encoding="utf-8") as f: - f.writelines(seq) - # test batchsize > len(dataset), use first batch - fake_data_5 = seq.replace("100, 32, 32, 1", "5, 32, 32, 1") - with open("fake_data_5.py", "w", encoding="utf-8") as f: - f.writelines(fake_data_5) - # test batchsize < len(dataset) < 2*batchsize, discard first batch - fake_data_15 = seq1.replace("100, 32, 32, 1", "15, 32, 32, 1") - with open("fake_data_15.py", "w", encoding="utf-8") as f: - f.writelines(fake_data_15) - # test 2*batchsize < len(dataset) < warmup*batchsize, discard last batch - fake_data_25 = seq1.replace("100, 32, 32, 1", "25, 32, 32, 1") - with open("fake_data_25.py", "w", encoding="utf-8") as f: - f.writelines(fake_data_25) - - -def build_benchmark2(): - seq = [ - "from argparse import ArgumentParser\n", - "arg_parser = ArgumentParser(description='Parse args')\n", - "arg_parser.add_argument('--input_model', dest='input_model', default='input_model', help='input 
model')\n", - "args = arg_parser.parse_args()\n", - "import time\n", - "import numpy as np\n", - "from neural_compressor.benchmark import benchmark_with_raw_cmd\n", - "from neural_compressor.data import Datasets\n", - "from neural_compressor.model import Model\n", - "dataset = Datasets('tensorflow')['dummy']((5, 32, 32, 1), label=True)\n", - "from neural_compressor.data.dataloaders.dataloader import DataLoader\n", - "b_dataloader = DataLoader(framework='tensorflow', dataset=dataset)\n", - "model = Model(args.input_model)\n", - "input_tensor = model.input_tensor\n", - "output_tensor = model.output_tensor if len(model.output_tensor)>1 else model.output_tensor[0]\n", - "iteration = 10\n", - "latency_list = []\n", - "for idx, (inputs, labels) in enumerate(b_dataloader):\n", - " inputs = np.array([inputs])\n", - " feed_dict = dict(zip(input_tensor, inputs))\n", - " start = time.time()\n", - " predictions = model.sess.run(output_tensor, feed_dict)\n", - " end = time.time()\n", - " latency_list.append(end-start)\n", - " if idx + 1 == iteration:\n", - " break\n", - "latency = np.array(latency_list).mean()\n", - "print('Latency: {:.3f} ms'.format(latency * 1000))\n", - "print('Throughput: {:.3f} images/sec'.format(1. / latency))\n", - ] - - with open("fake_raw_cmd.py", "w", encoding="utf-8") as f: - f.writelines(seq) - - -def build_fake_model(): - graph_path = tempfile.mkstemp(suffix=".pb")[1] - try: - graph = tf.Graph() - graph_def = tf.GraphDef() - with tf.Session(graph=graph) as sess: - x = tf.placeholder(tf.float64, shape=(None, 32, 32, 1), name="x") - y_1 = tf.constant(np.random.random((3, 3, 1, 1)), name="y_1") - y_2 = tf.constant(np.random.random((3, 3, 1, 1)), name="y_2") - conv1 = tf.nn.conv2d(input=x, filter=y_1, strides=[1, 1, 1, 1], padding="VALID", name="conv1") - op = tf.nn.conv2d(input=conv1, filter=y_2, strides=[1, 1, 1, 1], padding="VALID", name="op_to_store") - - sess.run(tf.global_variables_initializer()) - constant_graph = tf.graph_util.convert_variables_to_constants(sess, sess.graph_def, ["op_to_store"]) - - graph_def.ParseFromString(constant_graph.SerializeToString()) - write_graph(graph_def, graph_path) - except: - graph = tf.Graph() - graph_def = tf.compat.v1.GraphDef() - with tf.compat.v1.Session(graph=graph) as sess: - x = tf.compat.v1.placeholder(tf.float64, shape=(None, 32, 32, 1), name="x") - y_1 = tf.constant(np.random.random((3, 3, 1, 1)), name="y_1") - y_2 = tf.constant(np.random.random((3, 3, 1, 1)), name="y_2") - conv1 = tf.nn.conv2d(input=x, filters=y_1, strides=[1, 1, 1, 1], padding="VALID", name="conv1") - op = tf.nn.conv2d(input=conv1, filters=y_2, strides=[1, 1, 1, 1], padding="VALID", name="op_to_store") - - sess.run(tf.compat.v1.global_variables_initializer()) - constant_graph = tf.compat.v1.graph_util.convert_variables_to_constants( - sess, sess.graph_def, ["op_to_store"] - ) - - graph_def.ParseFromString(constant_graph.SerializeToString()) - write_graph(graph_def, graph_path) - return graph_path - - -class TestObjective(unittest.TestCase): - @classmethod - def setUpClass(self): - self.graph_path = build_fake_model() - build_benchmark() - build_benchmark2() - self.cpu_counts = psutil.cpu_count(logical=False) - - @classmethod - def tearDownClass(self): - if os.path.exists("fake.py"): - os.remove("fake.py") - if os.path.exists("fake_data_5.py"): - os.remove("fake_data_5.py") - if os.path.exists("fake_data_15.py"): - os.remove("fake_data_15.py") - if os.path.exists("fake_data_25.py"): - os.remove("fake_data_25.py") - if os.path.exists("fake_raw_cmd.py"): - 
os.remove("fake_raw_cmd.py") - shutil.rmtree("nc_workspace", ignore_errors=True) - - def test_benchmark(self): - os.system("python fake.py --input_model={}".format(self.graph_path)) - for i in range(2): - with open(f"2_4_{i}.log", "r") as f: - for line in f: - throughput = re.search(r"Throughput:\s+(\d+(\.\d+)?) images/sec", line) - self.assertIsNotNone(throughput) - os.system("rm *.log") - - def test_benchmark_data_5(self): - os.system("python fake_data_5.py --input_model={}".format(self.graph_path)) - for i in range(2): - with open(f"2_4_{i}.log", "r") as f: - for line in f: - throughput = re.search(r"Throughput:\s+(\d+(\.\d+)?) images/sec", line) - self.assertIsNotNone(throughput) - os.system("rm *.log") - - def test_benchmark_data_15(self): - os.system("python fake_data_15.py --input_model={}".format(self.graph_path)) - for i in range(2): - with open(f"2_4_{i}.log", "r") as f: - for line in f: - throughput = re.search(r"Throughput:\s+(\d+(\.\d+)?) images/sec", line) - self.assertIsNotNone(throughput) - os.system("rm *.log") - - def test_benchmark_data_25(self): - os.system("python fake_data_25.py --input_model={}".format(self.graph_path)) - for i in range(2): - with open(f"2_4_{i}.log", "r") as f: - for line in f: - throughput = re.search(r"Throughput:\s+(\d+(\.\d+)?) images/sec", line) - self.assertIsNotNone(throughput) - os.system("rm *.log") - - def test_benchmark_raw_cmd(self): - conf = BenchmarkConfig(warmup=5, iteration=10, cores_per_instance=4, num_of_instance=2) - raw_cmd = "python fake_raw_cmd.py --input_model={}".format(self.graph_path) - benchmark_with_raw_cmd(raw_cmd, conf=conf) - for i in range(2): - with open(f"2_4_{i}.log", "r") as f: - for line in f: - throughput = re.search(r"Throughput:\s+(\d+(\.\d+)?) images/sec", line) - self.assertIsNotNone(throughput) - - -if __name__ == "__main__": - unittest.main()