Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .ci/dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ git+https://github.com/eaidova/treon.git@ea/cell_info # test framework for Jupyt
toml # used for validating docker requirements
mistune==2.0.4 # use for parsing README.md
requests==2.32.4 # use for checking links
psutil # process management
pyspelling # automating spell checking
1 change: 1 addition & 0 deletions .ci/precommit_list.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
notebooks/hello-world/hello-world.ipynb
251 changes: 206 additions & 45 deletions .ci/validate_notebooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import json
import shutil
import platform
import psutil
import threading
import queue
import yaml

from argparse import ArgumentParser
Expand Down Expand Up @@ -231,6 +234,145 @@ def print_disk_usage(label: str, notebook_dir: Path):
print(f"Error checking disk usage: {e}")


def read_output_thread(process, output_queue):
    """Thread target: forward each line of ``process.stdout`` to ``output_queue``.

    A ``None`` sentinel is always enqueued last (on normal EOF or on error) so
    the consumer knows that no further output will arrive.
    """
    try:
        while True:
            chunk = process.stdout.readline()
            if chunk == "":  # readline returns "" only at EOF in text mode
                break
            output_queue.put(chunk)
    except Exception as exc:
        print(f"Exception during read_output_thread method: {exc}", flush=True)
    finally:
        output_queue.put(None)  # Signal EOF (or error) to the consumer


def kill_process_tree(pid):
    """Forcefully terminate the process ``pid`` together with its descendants.

    On Windows every child found via psutil is killed individually before the
    parent; on Unix a single SIGKILL is delivered to the whole process group
    (the target must have been started in its own session/group).
    All failures are logged rather than raised.
    """
    try:
        if platform.system() == "Windows":
            # On Windows, kill all children in the process group
            root = psutil.Process(pid)
            descendants = root.children(recursive=True)
            print(f"Killing process tree: parent PID {pid} with {len(descendants)} children", flush=True)
            for proc in descendants:
                try:
                    print(f"Killing child process PID {proc.pid}", flush=True)
                    proc.kill()
                except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
                    print(f"Could not kill child PID {proc.pid}: {err}", flush=True)
            try:
                root.kill()
                print(f"Killed parent process PID {pid}", flush=True)
            except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
                print(f"Could not kill parent PID {pid}: {err}", flush=True)
        else:
            # On Unix, kill the entire process group
            import signal

            os.killpg(pid, signal.SIGKILL)
            print(f"Killed process group PID {pid}", flush=True)
    except Exception as err:
        print(f"Error killing process tree PID {pid}: {err}", flush=True)


def run_subprocess_with_timeout(cmd, timeout, shell=False, description="Process"):
    """
    Run a subprocess with real-time output streaming and hard timeout protection.

    The child is started in its own process group (Windows) or session (Unix)
    so the whole process tree can be killed on timeout. stdout and stderr are
    merged and echoed line-by-line as they are produced.

    Args:
        cmd: Command to run (list or string); Path items are stringified
        timeout: Timeout in seconds; on expiry the process tree is killed
        shell: Whether to use shell=True
        description: Description for logging purposes

    Returns:
        tuple: (return_code, duration). return_code is -42 when the timeout
        fired and -1 when an unexpected error occurred while launching or
        monitoring the process.
    """
    # Convert all Path objects to strings in cmd list
    if isinstance(cmd, list):
        cmd = [str(item) for item in cmd]
    print(f"Running {description}: {' '.join(cmd) if isinstance(cmd, list) else cmd}", flush=True)
    start_time = time.perf_counter()
    process = None
    retcode = None

    # Setup process group creation for proper child process management
    popen_kwargs = {
        "shell": shell,
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,
        "encoding": "utf-8",
        "errors": "replace",
        "bufsize": 1,  # line-buffered so output is forwarded promptly
    }

    if platform.system() == "Windows":
        popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        # Use start_new_session instead of preexec_fn to avoid thread-safety warning
        popen_kwargs["start_new_session"] = True

    try:
        process = subprocess.Popen(cmd, **popen_kwargs)

        # A dedicated reader thread drains the pipe so a full pipe buffer can
        # never block the timeout check in the loop below.
        output_queue = queue.Queue()
        reader_thread = threading.Thread(target=read_output_thread, args=(process, output_queue), daemon=True)
        reader_thread.start()

        deadline = time.perf_counter() + timeout
        timed_out = False
        while True:
            # Check timeout FIRST (before any potentially blocking operations)
            if time.perf_counter() > deadline:
                print(f"\n{description} timeout reached ({timeout}s), killing process...", flush=True)
                kill_process_tree(process.pid)
                timed_out = True
                retcode = -42  # Special timeout exit code
                break

            # Check if process finished
            if process.poll() is not None:
                retcode = process.returncode
                break

            # Try to get output with short timeout (non-blocking check)
            try:
                line = output_queue.get(timeout=0.1)
                if line is None:  # EOF signal
                    break
                print(line, end="", flush=True)
            except queue.Empty:
                # No output available, loop continues to check timeout
                continue

        # Drain any remaining output from the queue
        while not output_queue.empty():
            try:
                line = output_queue.get_nowait()
                if line:
                    print(line, end="", flush=True)
            except queue.Empty:
                break

        if timed_out:
            # Reap the killed child so it does not linger as a zombie.
            try:
                process.wait(timeout=5)
            except Exception:
                pass
        elif retcode is None:
            # EOF was reached but the process has not been observed to exit.
            # Keep honoring the overall deadline instead of waiting forever:
            # the child may have closed stdout while continuing to run.
            remaining = max(deadline - time.perf_counter(), 0.1)
            try:
                process.wait(timeout=remaining)
                retcode = process.returncode
            except subprocess.TimeoutExpired:
                print(f"\n{description} timeout reached ({timeout}s), killing process...", flush=True)
                kill_process_tree(process.pid)
                retcode = -42
                try:
                    process.wait(timeout=5)  # reap after kill
                except Exception:
                    pass

    except Exception as e:
        print(f"\nError running {description}: {e}", flush=True)
        try:
            if process and process.poll() is None:
                kill_process_tree(process.pid)
        except Exception as ex:
            print(f"Error during cleanup: {ex}", flush=True)
        retcode = -1

    duration = time.perf_counter() - start_time
    return retcode, duration


def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, report_dir=".") -> Optional[tuple[str, int, float, str, str]]:
os.environ["HUGGINGFACE_HUB_CACHE"] = str(notebook_path.parent)
os.environ["HF_HUB_CACHE"] = str(notebook_path.parent)
Expand Down Expand Up @@ -260,16 +402,12 @@ def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, repo
collect_python_packages(report_dir / (patched_notebook.stem + "_env_before.txt"))

main_command = [sys.executable, "-m", "treon", "--verbose", str(patched_notebook)]
start = time.perf_counter()
try:
retcode = subprocess.run(
main_command,
shell=(platform.system() == "Windows"),
timeout=timeout,
).returncode
except subprocess.TimeoutExpired:
retcode = -42
duration = time.perf_counter() - start
retcode, duration = run_subprocess_with_timeout(
main_command,
timeout,
shell=(platform.system() == "Windows"),
description=f"Notebook test [{patched_notebook.name}]",
)

ov_version_after = get_pip_package_version("openvino", "OpenVINO after notebook execution", "OpenVINO is missing")
get_pip_package_version("openvino_tokenizers", "OpenVINO Tokenizers after notebook execution", "OpenVINO Tokenizers is missing")
Expand All @@ -285,23 +423,60 @@ def run_test(notebook_path: Path, root, timeout=7200, keep_artifacts=False, repo
return result


def write_csv_report(csv_path, test_report, result_queue):
    """Write the test report rows to ``csv_path`` and signal the outcome.

    Designed to run inside a worker thread: instead of raising, the result is
    communicated through ``result_queue`` as ``("success", None)`` on success
    or ``("error", <message>)`` on any failure.
    """
    columns = ["name", "status", "full_path", "duration"]
    try:
        with csv_path.open("w", newline="", encoding="utf-8") as report_file:
            report_writer = csv.DictWriter(report_file, fieldnames=columns)
            report_writer.writeheader()
            report_writer.writerows(test_report)
        result_queue.put(("success", None))
    except Exception as exc:
        result_queue.put(("error", str(exc)))


def finalize_status(failed_notebooks: list[str], timeout_notebooks: list[str], test_plan: TestPlan, report_dir: Path, root: Path) -> int:
    """Summarize results, write the CSV test report, and compute the exit status.

    Returns 1 when any notebook failed or the report could not be written,
    otherwise 0. Timeouts alone are reported but do not affect the status.
    The CSV is written from a watchdog thread so a hung filesystem cannot
    stall the run for more than 30 seconds.
    """
    exit_code = 0

    if failed_notebooks:
        exit_code = 1
        print("FAILED: \n{}".format("\n".join(failed_notebooks)), flush=True)

    if timeout_notebooks:
        print("FAILED BY TIMEOUT: \n{}".format("\n".join(timeout_notebooks)), flush=True)

    rows = []

    for notebook, report in test_plan.items():
        run_status = report["status"] or NotebookStatus.NOT_RUN
        try:
            relative_path = str(report["path"].relative_to(root))
        except (ValueError, TypeError):
            # Path outside `root` (or not a Path at all): fall back to absolute.
            relative_path = str(report["path"].absolute())

        rows.append({"name": notebook.as_posix(), "status": run_status, "full_path": relative_path, "duration": report["duration"]})
    print(f"Test report built with {len(rows)} entries", flush=True)
    csv_path = report_dir / "test_report.csv"
    print(f"Writing test report to: {csv_path.absolute()}", flush=True)
    outcome_queue = queue.Queue()
    writer_thread = threading.Thread(target=write_csv_report, args=(csv_path, rows, outcome_queue), daemon=True)
    writer_thread.start()
    writer_thread.join(timeout=30)

    if writer_thread.is_alive():
        print("ERROR: CSV write hung after 30s timeout", flush=True)
        exit_code = 1
    else:
        try:
            outcome, error = outcome_queue.get_nowait()
            if outcome == "error":
                print(f"ERROR writing test report: {error}", flush=True)
                exit_code = 1
            else:
                print("Test report written successfully", flush=True)
        except queue.Empty:
            print("ERROR: CSV thread finished but produced no result", flush=True)
            exit_code = 1

    return exit_code


Expand Down Expand Up @@ -350,7 +525,7 @@ def main():
failed_notebooks = []
timeout_notebooks = []
args = parse_arguments()
reports_dir = Path(args.report_dir)
reports_dir = Path(args.report_dir).absolute()
reports_dir.mkdir(exist_ok=True, parents=True)
notebooks_moving_dir = args.move_notebooks_dir
root = ROOT
Expand All @@ -372,7 +547,7 @@ def main():
for notebook, report in test_plan.items():
if report["status"] == NotebookStatus.SKIPPED:
continue
test_result = run_test(report["path"], root, args.timeout, keep_artifacts, reports_dir.absolute())
test_result = run_test(report["path"], root, args.timeout, keep_artifacts, reports_dir)
timing = 0
if not test_result:
print(f'Testing notebooks "{str(notebook)}" is not found.')
Expand Down Expand Up @@ -401,30 +576,16 @@ def main():
)
if args.upload_to_db:
cmd = [sys.executable, args.upload_to_db, report_path]
print(f"\nUploading {report_path} to database. CMD: {cmd}")
dbprocess = None
try:
dbprocess = subprocess.Popen(
cmd, shell=(platform.system() == "Windows"), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True
)
try:
stdout, _ = dbprocess.communicate(timeout=60)
if stdout:
print(stdout, flush=True)
except subprocess.TimeoutExpired:
print("Database upload process timed out after 60 seconds.")
dbprocess.kill()
stdout, _ = dbprocess.communicate()
if stdout:
print(stdout, flush=True)
except Exception as e:
print(f"An error occurred during database upload: {e}")
try:
if dbprocess and dbprocess.poll() is None:
dbprocess.kill()
dbprocess.communicate(timeout=2)
except Exception as cleanup_error:
print(f"Failed to cleanup database upload process: {cleanup_error}")
retcode, duration = run_subprocess_with_timeout(
cmd,
timeout=15,
shell=(platform.system() == "Windows"),
description=f"Upload notebook report to DB [{patched_notebook}]",
)
if retcode != 0:
print(f"Database upload failed with exit code {retcode}, duration: {duration:.2f} seconds", flush=True)
else:
print(f"Database upload succeeded, duration: {duration:.2f} seconds", flush=True)

if args.early_stop:
break
Expand Down
23 changes: 20 additions & 3 deletions .github/workflows/build_treon_reusable.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,31 @@ jobs:
if: ${{ inputs.test_only_changed }}
shell: bash
run: |
touch test_notebooks.txt
changed_files="${{ steps.changed-files.outputs.all_changed_files }}"
changed_files=$(echo $changed_files | tr '\\' '/')

# Check if any notebook files were changed
notebook_changed=false
for file in $changed_files; do
echo "$file was changed"
echo $file >> test_notebooks.txt
if [[ $file == notebooks/* ]]; then
notebook_changed=true
break
fi
done

if [ "$notebook_changed" = true ]; then
# If notebooks were changed, test those notebooks
touch test_notebooks.txt
for file in $changed_files; do
echo "$file was changed"
echo $file >> test_notebooks.txt
done
else
# If no notebooks were changed, use precommit list
echo "No notebook files changed, using precommit list"
cp .ci/precommit_list.txt test_notebooks.txt
fi

- name: Dotenv Action
id: dotenv
uses: xom9ikk/dotenv@ac290ca23a42155a0cba1031d23afa46240116a9 # v2.3.0
Expand Down