diff --git a/specifications/SPEC-UTILS-SERVICE.md b/specifications/SPEC-UTILS-SERVICE.md index 5b8db2d7..544a4b02 100644 --- a/specifications/SPEC-UTILS-SERVICE.md +++ b/specifications/SPEC-UTILS-SERVICE.md @@ -151,12 +151,12 @@ health: properties: status: type: string - enum: [UP, DOWN] + enum: [UP, DEGRADED, DOWN] description: Service health status reason: type: string nullable: true - description: Optional reason for status + description: Required reason for DOWN or DEGRADED status; must be null for UP components: type: object description: Hierarchical component health diff --git a/specifications/SPEC_SYSTEM_SERVICE.md b/specifications/SPEC_SYSTEM_SERVICE.md index ace5b467..115e579d 100644 --- a/specifications/SPEC_SYSTEM_SERVICE.md +++ b/specifications/SPEC_SYSTEM_SERVICE.md @@ -107,27 +107,6 @@ _Note: For detailed implementation, refer to the source code in the `src/aignost ### 3.3 Data Schemas -**Health Status Schema:** - -```yaml -Health: - type: object - properties: - status: - type: string - enum: [UP, DOWN] - description: "Overall system health status" - components: - type: object - description: "Health status of individual components" - additionalProperties: - $ref: "#/definitions/Health" - reason: - type: string - description: "Reason for DOWN status, null for UP" - required: [status] -``` - **System Info Schema:** ```yaml diff --git a/src/aignostics/platform/_service.py b/src/aignostics/platform/_service.py index 505ab18c..b1fb57eb 100644 --- a/src/aignostics/platform/_service.py +++ b/src/aignostics/platform/_service.py @@ -192,10 +192,47 @@ def info(self, mask_secrets: bool = True) -> dict[str, Any]: else None, } + @staticmethod + def _health_from_response(response: urllib3.BaseHTTPResponse) -> Health: + """Map a PAPI health response to a Health status. + + Handles non-200 status codes, unparseable bodies, and the three recognised + ``status`` values (``"UP"``, ``"DEGRADED"``, ``"DOWN"``). + + Args: + response: urllib3 response from the ``/health`` endpoint. + + Returns: + Health: ``UP``, ``DEGRADED``, or ``DOWN`` derived from the response. + """ + if response.status != HTTPStatus.OK: + logger.error("Aignostics Platform API returned '{}'", response.status) + return Health( + status=Health.Code.DOWN, reason=f"Aignostics Platform API returned status '{response.status}'" + ) + + try: + body = json.loads(response.data) + except Exception: + return Health(status=Health.Code.DOWN, reason="Aignostics Platform API returned unparseable response") + + api_status = body.get("status") + if api_status == "UP": + return Health(status=Health.Code.UP) + if api_status == "DEGRADED": + reason = body.get("reason") or "Aignostics Platform API is DEGRADED" + logger.warning("Aignostics Platform API is DEGRADED: {}", reason) + return Health(status=Health.Code.DEGRADED, reason=reason) + return Health( + status=Health.Code.DOWN, + reason=f"Aignostics Platform API returned unknown status '{api_status}'", + ) + def _determine_api_public_health(self) -> Health: """Determine healthiness and reachability of Aignostics Platform API. - Checks if health endpoint is reachable and returns 200 OK + - Parses the response body to detect DEGRADED status - Uses urllib3 for a direct connection check without authentication Returns: @@ -209,23 +246,17 @@ def _determine_api_public_health(self) -> Health: headers={"User-Agent": user_agent()}, timeout=urllib3.Timeout(total=self._settings.health_timeout), ) - - if response.status != HTTPStatus.OK: - logger.error("Aignostics Platform API (public) returned '{}'", response.status) - return Health( - status=Health.Code.DOWN, reason=f"Aignostics Platform API returned status '{response.status}'" - ) + return self._health_from_response(response) except Exception as e: logger.exception("Issue with Aignostics Platform API") return Health(status=Health.Code.DOWN, reason=f"Issue with Aignostics Platform API: '{e}'") - return Health(status=Health.Code.UP) - def _determine_api_authenticated_health(self) -> Health: """Determine healthiness and reachability of Aignostics Platform API via authenticated request. Uses a dedicated HTTP pool (separate from the API client's connection pool) to prevent connection-level cross-contamination between health checks and API calls. + Parses the response body to detect DEGRADED status. Returns: Health: The healthiness of the Aignostics Platform API when trying to reach via authenticated request. @@ -242,14 +273,10 @@ def _determine_api_authenticated_health(self) -> Health: }, timeout=urllib3.Timeout(total=self._settings.health_timeout), ) - - if response.status != HTTPStatus.OK: - logger.error("Aignostics Platform API (authenticated) returned '{}'", response.status) - return Health(status=Health.Code.DOWN, reason=f"Aignostics Platform API returned '{response.status}'") + return self._health_from_response(response) except Exception as e: logger.exception("Issue with Aignostics Platform API") return Health(status=Health.Code.DOWN, reason=f"Issue with Aignostics Platform API: '{e}'") - return Health(status=Health.Code.UP) def health(self) -> Health: """Determine health of this service. diff --git a/src/aignostics/system/CLAUDE.md b/src/aignostics/system/CLAUDE.md index 38adafa2..3b3d0124 100644 --- a/src/aignostics/system/CLAUDE.md +++ b/src/aignostics/system/CLAUDE.md @@ -123,11 +123,9 @@ def health(self) -> Health: ) # Determine overall status based on ALL modules - overall = Health.Code.UP if all( - c.status == Health.Code.UP for c in components.values() - ) else Health.Code.DOWN - - return Health(status=overall, components=components) + # Priority: DOWN > DEGRADED > UP + # compute_health_from_components() handles this automatically + return Health(status=Health.Code.UP, components=components) ``` ### Exception Hierarchy (`_exceptions.py`) @@ -275,7 +273,7 @@ print(f"System status: {health.status}") # Check specific component platform_health = health.components.get("platform") -if platform_health.status != Health.Code.UP: +if not platform_health: # False only when DOWN (DEGRADED and UP are both truthy) print(f"Platform issue: {platform_health.reason}") ``` @@ -453,7 +451,7 @@ def test_health_aggregation(): service = Service() health = service.health() - assert health.status in [Health.Code.UP, Health.Code.DOWN] + assert health.status in [Health.Code.UP, Health.Code.DEGRADED, Health.Code.DOWN] assert "platform" in health.components assert isinstance(health.components, dict) diff --git a/src/aignostics/utils/__init__.py b/src/aignostics/utils/__init__.py index 309e9bb2..7781cbf7 100644 --- a/src/aignostics/utils/__init__.py +++ b/src/aignostics/utils/__init__.py @@ -24,7 +24,7 @@ ) from ._di import discover_plugin_packages, load_modules, locate_implementations, locate_subclasses from ._fs import get_user_data_directory, open_user_data_directory, sanitize_path, sanitize_path_component -from ._health import Health +from ._health import Health, HealthStatus from ._log import LogSettings from ._mcp import MCP_SERVER_NAME, MCP_TRANSPORT, mcp_create_server, mcp_discover_servers, mcp_list_tools, mcp_run from ._nav import BaseNavBuilder, NavGroup, NavItem, gui_get_nav_groups @@ -42,6 +42,7 @@ "BaseNavBuilder", "BaseService", "Health", + "HealthStatus", "LogSettings", "NavGroup", "NavItem", diff --git a/src/aignostics/utils/_health.py b/src/aignostics/utils/_health.py index 84042398..ebc3e5a4 100644 --- a/src/aignostics/utils/_health.py +++ b/src/aignostics/utils/_health.py @@ -1,13 +1,22 @@ """Health models and status definitions for service health checks.""" from enum import StrEnum -from typing import ClassVar, Self +from typing import Any, ClassVar, Self from pydantic import BaseModel, Field, model_validator -class _HealthStatus(StrEnum): +class HealthStatus(StrEnum): + """Health status enumeration for service health checks. + + Values: + UP: Service is operating normally + DEGRADED: Service is operational but with reduced functionality + DOWN: Service is not operational + """ + UP = "UP" + DEGRADED = "DEGRADED" DOWN = "DOWN" @@ -15,20 +24,20 @@ class Health(BaseModel): """Represents the health status of a service with optional components and failure reasons. - A health object can have child components, i.e. health forms a tree. - - Any node in the tree can set itself to DOWN. In this case the node is required - to set the reason attribute. If reason is not set when DOWN, - automatic model validation of the tree will fail. - - DOWN'ness is propagated to parent health objects. I.e. the health of a parent - node is automatically set to DOWN if any of its child components are DOWN. The - child components leading to this will be listed in the reason. - - The root of the health tree is computed in the system module. The health of other - modules is automatically picked up by the system module. + - Any node in the tree can set itself to DOWN or DEGRADED. If DOWN or DEGRADED, the node + is required to set the reason attribute. If reason is not set when DOWN or DEGRADED, + automatic model validation fails. + - DOWN trumps DEGRADED, DEGRADED trumps UP. If any child is DOWN, parent is DOWN. + If none are DOWN but any are DEGRADED, parent is DEGRADED. + - The root of the health tree is computed in the system module. + The health of other modules is automatically picked up by the system module. """ - Code: ClassVar[type[_HealthStatus]] = _HealthStatus - status: _HealthStatus + Code: ClassVar[type[HealthStatus]] = HealthStatus + status: HealthStatus reason: str | None = None components: dict[str, "Health"] = Field(default_factory=dict) + uptime_statistics: dict[str, dict[str, Any]] | None = None # Optional uptime stats def compute_health_from_components(self) -> Self: """Recursively compute health status from components. @@ -36,34 +45,48 @@ def compute_health_from_components(self) -> Self: - If health is already DOWN, it remains DOWN with its original reason. - If health is UP but any component is DOWN, health becomes DOWN with a reason listing all failed components. + - If no components are DOWN but any are DEGRADED, health becomes DEGRADED with a reason. Returns: Self: The updated health instance with computed status. """ # Skip recomputation if already known to be DOWN - if self.status == _HealthStatus.DOWN: + if self.status == HealthStatus.DOWN: return self # No components means we keep the existing status if not self.components: return self - # Find all DOWN components + # Find all DOWN and DEGRADED components down_components = [] + degraded_components = [] for component_name, component in self.components.items(): # Recursively compute health for each component component.compute_health_from_components() - if component.status == _HealthStatus.DOWN: - down_components.append(component_name) + if component.status == HealthStatus.DOWN: + down_components.append((component_name, component.reason)) + elif component.status == HealthStatus.DEGRADED: + degraded_components.append((component_name, component.reason)) # If any components are DOWN, mark the parent as DOWN if down_components: - self.status = _HealthStatus.DOWN + self.status = HealthStatus.DOWN if len(down_components) == 1: - self.reason = f"Component '{down_components[0]}' is DOWN" + component_name, component_reason = down_components[0] + self.reason = f"Component '{component_name}' is DOWN ({component_reason})" + else: + component_list = ", ".join(f"'{name}' ({reason})" for name, reason in down_components) + self.reason = f"Components {component_list} are DOWN" + # If no components are DOWN but any are DEGRADED, mark parent as DEGRADED + elif degraded_components: + self.status = HealthStatus.DEGRADED + if len(degraded_components) == 1: + component_name, component_reason = degraded_components[0] + self.reason = f"Component '{component_name}' is DEGRADED ({component_reason})" else: - component_list = "', '".join(down_components) - self.reason = f"Components '{component_list}' are DOWN" + component_list = ", ".join(f"'{name}' ({reason})" for name, reason in degraded_components) + self.reason = f"Components {component_list} are DEGRADED" return self @@ -73,7 +96,7 @@ def validate_health_state(self) -> Self: - Compute overall health based on component health - Ensure UP status has no associated reason - - Ensure DOWN status always has a reason + - Ensure DOWN and DEGRADED status always have a reason Returns: Self: The validated model instance with correct health status. @@ -85,24 +108,24 @@ def validate_health_state(self) -> Self: self.compute_health_from_components() # Validate that UP status has no reason - if (self.status == _HealthStatus.UP) and self.reason: + if (self.status == HealthStatus.UP) and self.reason: msg = f"Health {self.status} must not have reason" raise ValueError(msg) - # Validate that DOWN status always has a reason - if (self.status == _HealthStatus.DOWN) and not self.reason: - msg = "Health DOWN must have a reason" + # Validate that DOWN and DEGRADED status always have a reason + if (self.status in {HealthStatus.DOWN, HealthStatus.DEGRADED}) and not self.reason: + msg = f"Health {self.status} must have a reason" raise ValueError(msg) return self def __str__(self) -> str: - """Return string representation of health status with optional reason for DOWN state. + """Return string representation of health status with optional reason for DOWN/DEGRADED state. Returns: - str: The health status value, with reason appended if status is DOWN. + str: The health status value, with reason appended if status is DOWN or DEGRADED. """ - if self.status == _HealthStatus.DOWN and self.reason: + if self.status in {HealthStatus.DOWN, HealthStatus.DEGRADED} and self.reason: return f"{self.status.value}: {self.reason}" return self.status.value @@ -110,6 +133,6 @@ def __bool__(self) -> bool: """Convert health status to a boolean value. Returns: - bool: True if the status is UP, False otherwise. + bool: True if the status is UP or DEGRADED, False otherwise. """ - return self.status == _HealthStatus.UP + return self.status in {HealthStatus.UP, HealthStatus.DEGRADED} diff --git a/tests/aignostics/application/cli_test.py b/tests/aignostics/application/cli_test.py index 6e9ef449..8af036a3 100644 --- a/tests/aignostics/application/cli_test.py +++ b/tests/aignostics/application/cli_test.py @@ -1,8 +1,10 @@ """Tests to verify the CLI functionality of the application module.""" +import contextlib import json import platform import re +from collections.abc import Generator from datetime import UTC, datetime, timedelta from pathlib import Path from time import sleep @@ -45,6 +47,96 @@ RUN_CSV_FILENAME = "run.csv" +# Full SPOT_0 CSV - single source of truth for all run submissions in this test file. +CSV_CONTENT_SPOT0 = ( + "external_id;checksum_base64_crc32c;resolution_mpp;width_px;height_px;" + "staining_method;tissue;disease;platform_bucket_url\n" + f"{SPOT_0_FILENAME};{SPOT_0_CRC32C};{SPOT_0_RESOLUTION_MPP};{SPOT_0_WIDTH};{SPOT_0_HEIGHT}" + f";H&E;LUNG;LUNG_CANCER;{SPOT_0_GS_URL}" +) + +# Source directory for `prepare` tests (contains small-pyramidal.dcm). +PREPARE_SOURCE_DIR = Path(__file__).parent.parent.parent / "resources" / "run" + + +@contextlib.contextmanager +def submitted_run( + runner: CliRunner, + tmp_path: Path, + csv_content: str, + application_id: str = HETA_APPLICATION_ID, + extra_args: list[str] | None = None, +) -> Generator[str, None, None]: + """Context manager that submits a run, yields its ID, then cancels it on exit. + + Submits an application run via the CLI, yields the extracted run ID to the caller, + and attempts to cancel the run on exit. Cancellation failures are logged but do not + raise, so test assertions are never masked by cleanup errors. + + A 5-minute deadline is automatically appended unless ``extra_args`` already contains + ``--deadline``. + + Args: + runner: Typer CliRunner to invoke CLI commands. + tmp_path: Temporary directory used to write the CSV file. + csv_content: Full CSV content (header + rows) for the run submission. + application_id: Application ID to submit against. Defaults to HETA_APPLICATION_ID. + extra_args: Additional CLI arguments forwarded to the ``submit`` command, + e.g. ``["--tags", "my-tag", "--deadline", "..."]``. + + Yields: + The run ID string extracted from the submission output. + + Raises: + AssertionError: If the submit command fails or the run ID cannot be extracted. + """ + csv_path = tmp_path / "run.csv" + csv_path.write_text(csv_content) + + extra = list(extra_args or []) + if "--deadline" not in extra: + extra += ["--deadline", (datetime.now(tz=UTC) + timedelta(minutes=5)).isoformat()] + + args = ["application", "run", "submit", application_id, str(csv_path), *extra] + result = runner.invoke(cli, args) + assert result.exit_code == 0, f"Run submission failed (exit {result.exit_code}):\n{result.stdout}" + + output = normalize_output(result.stdout) + run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", output) + assert run_id_match, f"Could not extract run ID from submission output:\n{output}" + run_id = run_id_match.group(1) + + try: + yield run_id + finally: + cancel_result = runner.invoke(cli, ["application", "run", "cancel", run_id]) + if cancel_result.exit_code != 0: + logger.warning( + "Failed to cancel run '{}' during cleanup (exit {}): {}", + run_id, + cancel_result.exit_code, + normalize_output(cancel_result.stdout), + ) + + +def _cancel_run_if_submitted(runner: CliRunner, output: str) -> None: + """Cancel any run that was unexpectedly created, for use in error-path cleanup. + + Args: + runner: Typer CliRunner to invoke CLI commands. + output: stdout from the submit invocation to search for a run ID. + """ + run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", normalize_output(output)) + if run_id_match: + cancel_result = runner.invoke(cli, ["application", "run", "cancel", run_id_match.group(1)]) + if cancel_result.exit_code != 0: + logger.warning( + "Defensive cancel of run '{}' failed (exit {}): {}", + run_id_match.group(1), + cancel_result.exit_code, + normalize_output(cancel_result.stdout), + ) + @pytest.mark.e2e @pytest.mark.timeout(timeout=60) @@ -126,7 +218,7 @@ def test_cli_application_run_prepare_upload_submit_fail_on_mpp( """Check application run prepare command and upload works and submit fails on mpp not supported.""" record_property("tested-item-id", "TC-APPLICATION-CLI-01") # Step 1: Prepare the file, by scanning for wsi and generating metadata - source_directory = Path(__file__).parent.parent.parent / "resources" / "run" + source_directory = PREPARE_SOURCE_DIR metadata_csv = tmp_path / "metadata.csv" result = runner.invoke( cli, ["application", "run", "prepare", HETA_APPLICATION_ID, str(metadata_csv), str(source_directory)] @@ -157,9 +249,12 @@ def test_cli_application_run_prepare_upload_submit_fail_on_mpp( # Step 3: Submit the run from the metadata file result = runner.invoke(cli, ["application", "run", "submit", HETA_APPLICATION_ID, str(metadata_csv), "--force"]) - assert result.exit_code == 2 - assert "Invalid metadata for artifact `whole_slide_image`" in normalize_output(result.stdout) - assert "8.065226874391001 is greater than" in normalize_output(result.stdout) + try: + assert result.exit_code == 2 + assert "Invalid metadata for artifact `whole_slide_image`" in normalize_output(result.stdout) + assert "8.065226874391001 is greater than" in normalize_output(result.stdout) + finally: + _cancel_run_if_submitted(runner, result.stdout) @pytest.mark.integration @@ -180,7 +275,7 @@ def test_cli_application_run_upload_fails_on_missing_source(runner: CliRunner, t assert "Warning: Source file 'missing.file' (row 0) does not exist" in normalize_output(result.stdout) -@pytest.mark.unit +@pytest.mark.e2e @pytest.mark.timeout(timeout=10) @patch("aignostics.application._cli.SystemService.health_static") def test_cli_run_submit_fails_when_system_unhealthy_and_no_force( @@ -207,11 +302,28 @@ def test_cli_run_submit_fails_when_system_unhealthy_and_no_force( str(csv_path), ], ) + try: + assert result.exit_code == 1 + finally: + _cancel_run_if_submitted(runner, result.stdout) - assert result.exit_code == 1 + +@pytest.mark.e2e +@pytest.mark.timeout(timeout=10) +@patch("aignostics.application._cli.SystemService.health_static") +def test_cli_run_submit_succeeds_when_system_degraded_and_no_force( + mock_health: MagicMock, runner: CliRunner, tmp_path: Path +) -> None: + """Check run submit command succeeds when system is degraded and --force is not used.""" + mock_health.return_value = Health( + status=Health.Code.DEGRADED, + reason="Simulated degraded system for testing", + ) + with submitted_run(runner, tmp_path, CSV_CONTENT_SPOT0): + pass # submission success is asserted by the context manager -@pytest.mark.unit +@pytest.mark.e2e @pytest.mark.timeout(timeout=10) @patch("aignostics.application._cli.SystemService.health_static") def test_cli_run_upload_fails_when_system_unhealthy_and_no_force( @@ -242,7 +354,7 @@ def test_cli_run_upload_fails_when_system_unhealthy_and_no_force( assert result.exit_code == 1 -@pytest.mark.unit +@pytest.mark.e2e @pytest.mark.timeout(timeout=10) @patch("aignostics.application._cli.SystemService.health_static") def test_cli_run_execute_fails_when_system_unhealthy_and_no_force( @@ -298,11 +410,13 @@ def test_cli_run_submit_fails_on_application_not_found(runner: CliRunner, tmp_pa "--force", ], ) - - assert result.exit_code == 2 - assert 'HTTP response body: {"detail":"application not found"}' in normalize_output(result.stdout) - assert "Warning: Could not find application" in normalize_output(result.stdout) - assert result.exit_code == 2 + try: + assert result.exit_code == 2 + assert 'HTTP response body: {"detail":"application not found"}' in normalize_output(result.stdout) + assert "Warning: Could not find application" in normalize_output(result.stdout) + assert result.exit_code == 2 + finally: + _cancel_run_if_submitted(runner, result.stdout) @pytest.mark.e2e @@ -329,9 +443,11 @@ def test_cli_run_submit_fails_on_unsupported_cloud(runner: CliRunner, tmp_path: "--force", ], ) - - assert result.exit_code == 2 - assert "Invalid platform bucket URL: 'aws://bucket/test'" in normalize_output(result.stdout) + try: + assert result.exit_code == 2 + assert "Invalid platform bucket URL: 'aws://bucket/test'" in normalize_output(result.stdout) + finally: + _cancel_run_if_submitted(runner, result.stdout) @pytest.mark.e2e @@ -358,9 +474,11 @@ def test_cli_run_submit_fails_on_missing_url(runner: CliRunner, tmp_path: Path, "--force", ], ) - - assert result.exit_code == 2 - assert "Invalid platform bucket URL: ''" in normalize_output(result.stdout) + try: + assert result.exit_code == 2 + assert "Invalid platform bucket URL: ''" in normalize_output(result.stdout) + finally: + _cancel_run_if_submitted(runner, result.stdout) @pytest.mark.e2e @@ -372,22 +490,11 @@ def test_cli_run_submit_and_describe_and_cancel_and_download_and_delete( # noqa ) -> None: """Check run submit command runs successfully.""" record_property("tested-item-id", "TC-APPLICATION-CLI-02") - csv_content = "external_id;checksum_base64_crc32c;resolution_mpp;width_px;height_px;staining_method;tissue;disease;" - csv_content += "platform_bucket_url\n" - csv_content += ( - f"{SPOT_0_FILENAME};{SPOT_0_CRC32C};{SPOT_0_RESOLUTION_MPP};{SPOT_0_WIDTH};{SPOT_0_HEIGHT}" - f";H&E;LUNG;LUNG_CANCER;{SPOT_0_GS_URL}" - ) - csv_path = tmp_path / "dummy.csv" - csv_path.write_text(csv_content) - result = runner.invoke( - cli, - [ - "application", - "run", - "submit", - HETA_APPLICATION_ID, - str(csv_path), + with submitted_run( + runner, + tmp_path, + CSV_CONTENT_SPOT0, + extra_args=[ "--note", "note_of_this_complex_test", "--tags", @@ -399,254 +506,244 @@ def test_cli_run_submit_and_describe_and_cancel_and_download_and_delete( # noqa PIPELINE_GPU_TYPE, "--force", ], - ) - output = normalize_output(result.stdout) - assert re.search( - r"Submitted run with id '[0-9a-f-]+' for '", - output, - ), f"Output '{output}' doesn't match expected pattern" - assert result.exit_code == 0 + ) as run_id: + # Test that we can find this run by it's note via the query parameter + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--query", + "note_of_this_complex_test", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by note via query" - # Extract run ID from the output - run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", output) - assert run_id_match, f"Failed to extract run ID from output '{output}'" - run_id = run_id_match.group(1) + # Test that we can find this run by it's tag via the query parameter + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--query", + "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by tag via query" - # Test that we can find this run by it's note via the query parameter - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--query", - "note_of_this_complex_test", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by note via query" + # Test that we cannot find this run by another tag via the query parameter + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--query", + "another_tag", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by another tag via query" - # Test that we can find this run by it's tag via the query parameter - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--query", - "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by tag via query" + # Test that we can find this run by it's note + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--note-regex", + "note_of_this_complex_test", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by note" - # Test that we cannot find this run by another tag via the query parameter - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--query", - "another_tag", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by another tag via query" + # but not another note + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--note-regex", + "other_note", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by other note" - # Test that we can find this run by it's note - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--note-regex", - "note_of_this_complex_test", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by note" + # Test that we can find this run by one of its tags + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--tags", + "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by one tag" - # but not another note - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--note-regex", - "other_note", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by other note" + # but not another tag + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--tags", + "other-tag", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by other tag" - # Test that we can find this run by one of its tags - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--tags", - "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by one tag" + # Test that we can find this run by two of its tags + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--tags", + "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by two tags" - # but not another tag - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--tags", - "other-tag", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by other tag" + # Test that we can find this run by all of its tags + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--tags", + "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by all tags" - # Test that we can find this run by two of its tags - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--tags", - "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by two tags" + # Test that we cannot find this run by all of its tags and a non-existent tag + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--tags", + "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag,non-existing-tag", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by all tags" - # Test that we can find this run by all of its tags - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--tags", - "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by all tags" + # Test that we can find this run by all of its tags and it's note + list_result = runner.invoke( + cli, + [ + "application", + "run", + "list", + "--note-regex", + "note_of_this_complex_test", + "--tags", + "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag", + ], + ) + assert list_result.exit_code == 0 + list_output = normalize_output(list_result.stdout) + assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by all tags and note" - # Test that we cannot find this run by all of its tags and a non-existent tag - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--tags", - "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag,non-existing-tag", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id not in list_output, f"Run ID '{run_id}' found when filtering by all tags" + # Test the describe command with the extracted run ID + describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) + assert describe_result.exit_code == 0 + assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) + assert "Status (Termination Reason): PENDING" in normalize_output( + describe_result.stdout + ) or "Status (Termination Reason): PROCESSING" in normalize_output(describe_result.stdout) + assert "Queue Position:" in normalize_output(describe_result.stdout) + assert "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete" in normalize_output( + describe_result.stdout + ) - # Test that we can find this run by all of its tags and it's note - list_result = runner.invoke( - cli, - [ - "application", - "run", - "list", - "--note-regex", - "note_of_this_complex_test", - "--tags", - "cli-test,test_cli_run_submit_and_describe_and_cancel_and_download_and_delete,further-tag", - ], - ) - assert list_result.exit_code == 0 - list_output = normalize_output(list_result.stdout) - assert run_id in list_output, f"Run ID '{run_id}' not found when filtering by all tags and note" - - # Test the describe command with the extracted run ID - describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) - assert describe_result.exit_code == 0 - assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) - assert "Status (Termination Reason): PENDING" in normalize_output( - describe_result.stdout - ) or "Status (Termination Reason): PROCESSING" in normalize_output(describe_result.stdout) - assert "Queue Position:" in normalize_output(describe_result.stdout) - assert "test_cli_run_submit_and_describe_and_cancel_and_download_and_delete" in normalize_output( - describe_result.stdout - ) + # Test the download command spots the run is still running + download_result = runner.invoke( + cli, ["application", "run", "result", "download", run_id, str(tmp_path), "--no-wait-for-completion"] + ) + assert download_result.exit_code == 0 + assert f"Downloaded results of run '{run_id}'" in normalize_output(download_result.stdout) - # Test the download command spots the run is still running - download_result = runner.invoke( - cli, ["application", "run", "result", "download", run_id, str(tmp_path), "--no-wait-for-completion"] - ) - assert download_result.exit_code == 0 - assert f"Downloaded results of run '{run_id}'" in normalize_output(download_result.stdout) - - # Test the cancel command with the extracted run ID - cancel_result = runner.invoke(cli, ["application", "run", "cancel", run_id]) - assert cancel_result.exit_code == 0 - assert f"Run with ID '{run_id}' has been canceled." in normalize_output(cancel_result.stdout) - - # Test the describe command with the extracted run ID on canceled run - describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) - assert describe_result.exit_code == 0 - assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) - assert "Status (Termination Reason): TERMINATED (RunTerminationReason.CANCELED_BY_USER)" in normalize_output( - describe_result.stdout - ) + # Test the cancel command with the extracted run ID + cancel_result = runner.invoke(cli, ["application", "run", "cancel", run_id]) + assert cancel_result.exit_code == 0 + assert f"Run with ID '{run_id}' has been canceled." in normalize_output(cancel_result.stdout) - download_result = runner.invoke(cli, ["application", "run", "result", "download", run_id, str(tmp_path)]) - assert download_result.exit_code == 0 + # Test the describe command with the extracted run ID on canceled run + describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) + assert describe_result.exit_code == 0 + assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) + assert "Status (Termination Reason): TERMINATED (RunTerminationReason.CANCELED_BY_USER)" in normalize_output( + describe_result.stdout + ) - # Verify the download message and path - assert f"Downloaded results of run '{run_id}'" in normalize_output(download_result.stdout) - # TODO(andreas): Would also be great to check if it is canceled by user - assert "status: terminated" in normalize_output(download_result.stdout) + download_result = runner.invoke(cli, ["application", "run", "result", "download", run_id, str(tmp_path)]) + assert download_result.exit_code == 0 - # More robust path verification - normalize paths and check if the destination path is mentioned in the output - normalized_tmp_path = str(Path(tmp_path).resolve()) - normalized_output = normalize_output(download_result.stdout).replace(" ", "") - normalized_path_in_output = normalized_tmp_path.replace(" ", "") + # Verify the download message and path + assert f"Downloaded results of run '{run_id}'" in normalize_output(download_result.stdout) + # TODO(andreas): Would also be great to check if it is canceled by user + assert "status: terminated" in normalize_output(download_result.stdout) - assert normalized_path_in_output in normalized_output, ( - f"Expected path '{normalized_tmp_path}' not found in output: '{download_result.output}'" - ) + # More robust path verification - normalize paths and check if destination path is mentioned in output + normalized_tmp_path = str(Path(tmp_path).resolve()) + normalized_output = normalize_output(download_result.stdout).replace(" ", "") + normalized_path_in_output = normalized_tmp_path.replace(" ", "") - download_result = runner.invoke(cli, ["application", "run", "result", "download", run_id, "/4711"]) - if platform.system() == "Windows": - assert download_result.exit_code == 0 - else: - assert download_result.exit_code == 2 - assert f"Failed to create destination directory '/4711/{run_id}'" in normalize_output(download_result.stdout) - - # Test the result delete command with the extracted run ID - delete_result = runner.invoke(cli, ["application", "run", "result", "delete", run_id]) - assert delete_result.exit_code == 0 - assert f"Results for run with ID '{run_id}' have been deleted." in normalize_output(delete_result.stdout) - - # Test the describe command with the extracted run ID on deleted run - describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) - assert describe_result.exit_code == 0 - assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) - assert "Status (Termination Reason): TERMINATED (RunTerminationReason.CANCELED_BY_USER)" in normalize_output( - describe_result.stdout - ) + assert normalized_path_in_output in normalized_output, ( + f"Expected path '{normalized_tmp_path}' not found in output: '{download_result.output}'" + ) + + download_result = runner.invoke(cli, ["application", "run", "result", "download", run_id, "/4711"]) + if platform.system() == "Windows": + assert download_result.exit_code == 0 + else: + assert download_result.exit_code == 2 + assert f"Failed to create destination directory '/4711/{run_id}'" in normalize_output( + download_result.stdout + ) + + # Test the result delete command with the extracted run ID + delete_result = runner.invoke(cli, ["application", "run", "result", "delete", run_id]) + assert delete_result.exit_code == 0 + assert f"Results for run with ID '{run_id}' have been deleted." in normalize_output(delete_result.stdout) + + # Test the describe command with the extracted run ID on deleted run + describe_result = runner.invoke(cli, ["application", "run", "describe", run_id]) + assert describe_result.exit_code == 0 + assert f"Run Details for {run_id}" in normalize_output(describe_result.stdout) + assert "Status (Termination Reason): TERMINATED (RunTerminationReason.CANCELED_BY_USER)" in normalize_output( + describe_result.stdout + ) # TODO(Helmut): Activate when PAPI fixed @@ -1029,40 +1126,13 @@ def test_cli_run_update_item_metadata_not_dict(runner: CliRunner) -> None: @pytest.mark.e2e @pytest.mark.timeout(timeout=180) @pytest.mark.sequential -def test_cli_run_dump_and_update_custom_metadata(runner: CliRunner, tmp_path: Path) -> None: # noqa: PLR0914, PLR0915 +def test_cli_run_dump_and_update_custom_metadata(runner: CliRunner, tmp_path: Path) -> None: """Test dumping and updating custom metadata via CLI commands.""" import json import random - # Submit a dedicated run with a unique tag to avoid shared-state race conditions - csv_content = "external_id;checksum_base64_crc32c;resolution_mpp;width_px;height_px;staining_method;tissue;disease;" - csv_content += "platform_bucket_url\n" - csv_content += ";5onqtA==;0.26268186053789266;7447;7196;H&E;LUNG;LUNG_CANCER;gs://bucket/test" - csv_path = tmp_path / "dummy.csv" - csv_path.write_text(csv_content) - unique_tag = f"test_metadata_{datetime.now(tz=UTC).timestamp()}" - submit_result = runner.invoke( - cli, - [ - "application", - "run", - "submit", - HETA_APPLICATION_ID, - str(csv_path), - "--tags", - unique_tag, - "--deadline", - (datetime.now(tz=UTC) + timedelta(minutes=5)).isoformat(), - ], - ) - assert submit_result.exit_code == 0 - submit_output = normalize_output(submit_result.stdout) - run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", submit_output) - assert run_id_match, f"Failed to extract run ID from submit output '{submit_output}'" - run_id = run_id_match.group(1) - - try: + with submitted_run(runner, tmp_path, CSV_CONTENT_SPOT0, extra_args=["--tags", unique_tag]) as run_id: # Step 1: Dump initial custom metadata of run result = runner.invoke(cli, ["application", "run", "dump-metadata", run_id]) assert result.exit_code == 0 @@ -1144,48 +1214,20 @@ def test_cli_run_dump_and_update_custom_metadata(runner: CliRunner, tmp_path: Pa # when operations are performed. Instead, verify the random field was removed # and the structure remains consistent. assert isinstance(final_metadata, dict), "Final metadata should be a dictionary" - finally: - runner.invoke(cli, ["application", "run", "cancel", run_id]) @pytest.mark.e2e @pytest.mark.timeout(timeout=240) @pytest.mark.sequential -def test_cli_run_dump_and_update_item_custom_metadata(runner: CliRunner, tmp_path: Path) -> None: # noqa: PLR0914, PLR0915 +def test_cli_run_dump_and_update_item_custom_metadata(runner: CliRunner, tmp_path: Path) -> None: # noqa: PLR0915 """Test dumping and updating item custom metadata via CLI commands.""" import json import random - # Submit a dedicated run with a unique tag to avoid shared-state race conditions. - # Use a non-empty external_id so describe output contains a matchable "Item External ID: `...`". - csv_content = "external_id;checksum_base64_crc32c;resolution_mpp;width_px;height_px;staining_method;tissue;disease;" - csv_content += "platform_bucket_url\n" - csv_content += "dummy-item-001;5onqtA==;0.26268186053789266;7447;7196;H&E;LUNG;LUNG_CANCER;gs://bucket/test" - csv_path = tmp_path / "dummy.csv" - csv_path.write_text(csv_content) - unique_tag = f"test_item_metadata_{datetime.now(tz=UTC).timestamp()}" - submit_result = runner.invoke( - cli, - [ - "application", - "run", - "submit", - HETA_APPLICATION_ID, - str(csv_path), - "--tags", - unique_tag, - "--deadline", - (datetime.now(tz=UTC) + timedelta(minutes=5)).isoformat(), - ], - ) - assert submit_result.exit_code == 0 - submit_output = normalize_output(submit_result.stdout) - run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", submit_output) - assert run_id_match, f"Failed to extract run ID from submit output '{submit_output}'" - run_id = run_id_match.group(1) - - try: + # CSV_CONTENT_SPOT0 uses SPOT_0_FILENAME as external_id, which the describe output surfaces + # as "Item External ID: `...`" — the get_external_id() helper below captures it dynamically. + with submitted_run(runner, tmp_path, CSV_CONTENT_SPOT0, extra_args=["--tags", unique_tag]) as run_id: # Wait for items to appear in the run (describe until external_id is available) @retry(wait=wait_exponential(multiplier=1, max=15), stop=stop_after_attempt(8)) def get_external_id() -> str: @@ -1275,8 +1317,6 @@ def get_external_id() -> str: # Note: Similar to run metadata, we verify the structure remains consistent # rather than doing exact equality comparison due to dynamic fields assert isinstance(final_metadata, dict), "Final metadata should be a dictionary" - finally: - runner.invoke(cli, ["application", "run", "cancel", run_id]) @retry(wait=wait_exponential(multiplier=2, max=10), stop=stop_after_attempt(5)) @@ -1378,40 +1418,14 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL assert "description" in app_details # Step 3: Submit a run with custom tag - csv_content = "external_id;checksum_base64_crc32c;resolution_mpp;width_px;height_px;staining_method;tissue;disease;" - csv_content += "platform_bucket_url\n" - csv_content += ";5onqtA==;0.26268186053789266;7447;7196;H&E;LUNG;LUNG_CANCER;gs://bucket/test" - csv_path = tmp_path / "dummy.csv" - csv_path.write_text(csv_content) - unique_tag = f"test_json_format_{datetime.now(tz=UTC).timestamp()}" - result = runner.invoke( - cli, - [ - "application", - "run", - "submit", - HETA_APPLICATION_ID, - str(csv_path), - "--tags", - unique_tag, - "--note", - "Testing JSON format output", - "--gpu-type", - PIPELINE_GPU_TYPE, - ], - ) - output = normalize_output(result.stdout) - assert result.exit_code == 0 - - # Extract run ID from the output - run_id_match = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", output) - assert run_id_match, f"Failed to extract run ID from output '{output}'" - run_id = run_id_match.group(1) - run_id_2: str | None = None # Initialize before try block to avoid UnboundLocalError in finally - - try: + with submitted_run( + runner, + tmp_path, + CSV_CONTENT_SPOT0, + extra_args=["--tags", unique_tag, "--note", "Testing JSON format output", "--gpu-type", PIPELINE_GPU_TYPE], + ) as run_id: # Step 4: List runs with JSON format and filter by tag runs_data = list_runs_by_tag(unique_tag, runner, expected_count=1) @@ -1449,14 +1463,7 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL # Step 7: Test run describe with JSON format describe_result = runner.invoke( cli, - [ - "application", - "run", - "describe", - run_id, - "--format", - "json", - ], + ["application", "run", "describe", run_id, "--format", "json"], ) assert describe_result.exit_code == 0 @@ -1473,15 +1480,7 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL # Step 8: Test empty result with JSON format empty_result = runner.invoke( cli, - [ - "application", - "run", - "list", - "--tags", - "non_existent_tag_12345", - "--format", - "json", - ], + ["application", "run", "list", "--tags", "non_existent_tag_12345", "--format", "json"], ) assert empty_result.exit_code == 0 empty_runs = json.loads(empty_result.stdout) @@ -1489,14 +1488,11 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL assert len(empty_runs) == 0, "Should return empty list for non-existent tag" # Step 9: Submit a second run with same tags for testing cancel-by-filter - submit_result_2 = runner.invoke( - cli, - [ - "application", - "run", - "submit", - HETA_APPLICATION_ID, - str(csv_path), + with submitted_run( + runner, + tmp_path, + CSV_CONTENT_SPOT0, + extra_args=[ "--tags", unique_tag, "--note", @@ -1504,33 +1500,19 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL "--gpu-type", PIPELINE_GPU_TYPE, ], - ) - assert submit_result_2.exit_code == 0 - output_2 = submit_result_2.stdout - run_id_match_2 = re.search(r"Submitted run with id '([0-9a-f-]+)' for '", output_2) - assert run_id_match_2, f"Failed to extract run ID from output '{output_2}'" - run_id_2 = run_id_match_2.group(1) + ) as run_id_2: + # Wait for both runs to appear in the list (handles eventual consistency) + list_runs_by_tag(unique_tag, runner, expected_count=2) - # Wait for both runs to appear in the list (handles eventual consistency) - list_runs_by_tag(unique_tag, runner, expected_count=2) + # Step 10: Test dry-run mode - verify it shows what would be canceled without actually canceling + logger.info("Step 10: Testing dry-run mode for cancel-by-filter") - finally: - # Step 10: Test dry-run mode - verify it shows what would be canceled without actually canceling - logger.info("Step 10: Testing dry-run mode for cancel-by-filter") - - # First get the application version from the first run - app_version_result = runner.invoke( - cli, - [ - "application", - "run", - "describe", - run_id, - "--format", - "json", - ], - ) - if app_version_result.exit_code == 0: + # First get the application version from the first run + app_version_result = runner.invoke( + cli, + ["application", "run", "describe", run_id, "--format", "json"], + ) + assert app_version_result.exit_code == 0, f"Failed to describe run: {app_version_result.stdout}" run_details = json.loads(app_version_result.stdout) app_version = run_details["version_number"] @@ -1556,8 +1538,7 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL # Step 11: Verify runs are NOT canceled after dry-run by describing them logger.info("Step 11: Verifying runs are NOT canceled after dry-run") - runs_to_check = [run_id] + ([run_id_2] if run_id_2 else []) - for idx, rid in enumerate(runs_to_check, 1): + for idx, rid in enumerate([run_id, run_id_2], 1): describe_result = runner.invoke(cli, ["application", "run", "describe", rid, "--format", "json"]) assert describe_result.exit_code == 0, f"Failed to describe run {idx}: {describe_result.stdout}" described_run = json.loads(describe_result.stdout) @@ -1584,7 +1565,6 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL app_version, ], ) - # Should succeed (exit code 0) assert cancel_by_filter_result.exit_code == 0 assert "Successfully canceled 2 run(s)" in cancel_by_filter_result.stdout logger.info("Successfully canceled both runs using cancel-by-filter") @@ -1592,8 +1572,7 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL # Step 13: Verify runs ARE canceled by describing them again # Use retry to handle read-replica lag and slow API responses after cancel. logger.info("Step 13: Verifying runs ARE canceled after actual cancel") - runs_to_verify = [run_id] + ([run_id_2] if run_id_2 else []) - for idx, rid in enumerate(runs_to_verify, 1): + for idx, rid in enumerate([run_id, run_id_2], 1): for attempt in Retrying( wait=wait_exponential(multiplier=2, min=1, max=15), stop=stop_after_attempt(5), @@ -1616,8 +1595,3 @@ def test_cli_json_format_and_cancel_by_filter_with_dry_run( # noqa: PLR0915, PL f"Run {idx} has unexpected termination reason: {described_run.get('termination_reason')}" ) logger.info("Run {} successfully canceled (state: TERMINATED, reason: CANCELED_BY_USER)", idx) - else: - # Fallback: cancel individually if we couldn't get the version - runs_to_cancel = [run_id] + ([run_id_2] if run_id_2 else []) - for rid in runs_to_cancel: - runner.invoke(cli, ["application", "run", "cancel", rid]) diff --git a/tests/aignostics/platform/service_test.py b/tests/aignostics/platform/service_test.py index ce947cfa..33a42792 100644 --- a/tests/aignostics/platform/service_test.py +++ b/tests/aignostics/platform/service_test.py @@ -51,6 +51,7 @@ def test_determine_api_authenticated_health_success() -> None: """Health.UP returned when the dedicated pool responds 200 with auth token.""" mock_response = MagicMock() mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "UP"}' mock_pool = MagicMock() mock_pool.request.return_value = mock_response @@ -94,10 +95,40 @@ def test_determine_api_authenticated_health_handles_exception() -> None: @pytest.mark.unit -def test_determine_api_public_health_success() -> None: - """Health.UP returned when the public pool responds 200.""" +def test_determine_api_public_health_non_200() -> None: + """Health.DOWN returned when the public pool responds with non-200.""" + mock_response = MagicMock() + mock_response.status = HTTPStatus.SERVICE_UNAVAILABLE + + mock_pool = MagicMock() + mock_pool.request.return_value = mock_response + + with patch.object(Service, "_get_http_pool", return_value=mock_pool): + result = Service()._determine_api_public_health() + + assert result.status == Health.Code.DOWN + assert result.reason is not None + + +@pytest.mark.unit +def test_determine_api_public_health_handles_exception() -> None: + """Health.DOWN returned when the public pool raises.""" + mock_pool = MagicMock() + mock_pool.request.side_effect = ConnectionError("unreachable") + + with patch.object(Service, "_get_http_pool", return_value=mock_pool): + result = Service()._determine_api_public_health() + + assert result.status == Health.Code.DOWN + assert result.reason is not None + + +@pytest.mark.unit +def test_determine_api_public_health_up_response() -> None: + """HTTP 200 + {"status": "UP"} body → Health.UP (explicit JSON body check).""" mock_response = MagicMock() mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "UP"}' mock_pool = MagicMock() mock_pool.request.return_value = mock_response @@ -109,10 +140,11 @@ def test_determine_api_public_health_success() -> None: @pytest.mark.unit -def test_determine_api_public_health_non_200() -> None: - """Health.DOWN returned when the public pool responds with non-200.""" +def test_determine_api_public_health_degraded_response() -> None: + """HTTP 200 + {"status": "DEGRADED"} body → Health.DEGRADED with reason set.""" mock_response = MagicMock() - mock_response.status = HTTPStatus.SERVICE_UNAVAILABLE + mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "DEGRADED"}' mock_pool = MagicMock() mock_pool.request.return_value = mock_response @@ -120,15 +152,36 @@ def test_determine_api_public_health_non_200() -> None: with patch.object(Service, "_get_http_pool", return_value=mock_pool): result = Service()._determine_api_public_health() - assert result.status == Health.Code.DOWN + assert result.status == Health.Code.DEGRADED assert result.reason is not None @pytest.mark.unit -def test_determine_api_public_health_handles_exception() -> None: - """Health.DOWN returned when the public pool raises.""" +def test_determine_api_public_health_degraded_response_with_reason() -> None: + """HTTP 200 + {"status": "DEGRADED", "reason": "DB slow"} → reason == "DB slow".""" + mock_response = MagicMock() + mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "DEGRADED", "reason": "DB slow"}' + mock_pool = MagicMock() - mock_pool.request.side_effect = ConnectionError("unreachable") + mock_pool.request.return_value = mock_response + + with patch.object(Service, "_get_http_pool", return_value=mock_pool): + result = Service()._determine_api_public_health() + + assert result.status == Health.Code.DEGRADED + assert result.reason == "DB slow" + + +@pytest.mark.unit +def test_determine_api_public_health_unknown_status_is_down() -> None: + """HTTP 200 + {"status": "UNKNOWN"} body → Health.DOWN.""" + mock_response = MagicMock() + mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "UNKNOWN"}' + + mock_pool = MagicMock() + mock_pool.request.return_value = mock_response with patch.object(Service, "_get_http_pool", return_value=mock_pool): result = Service()._determine_api_public_health() @@ -137,6 +190,26 @@ def test_determine_api_public_health_handles_exception() -> None: assert result.reason is not None +@pytest.mark.unit +def test_determine_api_authenticated_health_degraded_response() -> None: + """HTTP 200 + {"status": "DEGRADED"} body → Health.DEGRADED with reason set.""" + mock_response = MagicMock() + mock_response.status = HTTPStatus.OK + mock_response.data = b'{"status": "DEGRADED"}' + + mock_pool = MagicMock() + mock_pool.request.return_value = mock_response + + with ( + patch.object(Service, "_get_http_pool", return_value=mock_pool), + patch(_PATCH_AUTH_GETTER, return_value="test-token"), + ): + result = Service()._determine_api_authenticated_health() + + assert result.status == Health.Code.DEGRADED + assert result.reason is not None + + @pytest.mark.unit def test_health_returns_both_components() -> None: """health() aggregates api_public and api_authenticated component keys.""" diff --git a/tests/aignostics/utils/health_test.py b/tests/aignostics/utils/health_test.py index 1198b833..2e9b6eb4 100644 --- a/tests/aignostics/utils/health_test.py +++ b/tests/aignostics/utils/health_test.py @@ -5,6 +5,7 @@ from aignostics.utils._health import Health DB_FAILURE = "DB failure" +CACHE_SLOW = "cache slow" @pytest.mark.unit @@ -81,7 +82,7 @@ def test_compute_health_from_components_single_down(record_property) -> None: result = health.compute_health_from_components() assert result.status == Health.Code.DOWN - assert result.reason == "Component 'database' is DOWN" + assert result.reason == f"Component 'database' is DOWN ({DB_FAILURE})" assert result is health # Should return self @@ -185,6 +186,82 @@ def test_validate_health_state_integration(record_property) -> None: assert health.components["monitoring"].status == Health.Code.UP +@pytest.mark.unit +def test_health_degraded_requires_reason(record_property) -> None: + """Test that a DEGRADED status requires a reason.""" + record_property("tested-item-id", "SPEC-UTILS-SERVICE") + with pytest.raises(ValueError, match="Health DEGRADED must have a reason"): + Health(status=Health.Code.DEGRADED) + + health = Health(status=Health.Code.DEGRADED, reason="x") + assert health.status == Health.Code.DEGRADED + assert health.reason == "x" + + +@pytest.mark.unit +def test_str_representation_degraded(record_property) -> None: + """Test string representation of DEGRADED health status.""" + record_property("tested-item-id", "SPEC-UTILS-SERVICE") + health = Health(status=Health.Code.DEGRADED, reason="slow") + assert str(health) == "DEGRADED: slow" + + +@pytest.mark.unit +def test_compute_health_from_components_single_degraded(record_property) -> None: + """Test that parent becomes DEGRADED when one component is DEGRADED.""" + record_property("tested-item-id", "SPEC-UTILS-SERVICE") + health = Health(status=Health.Code.UP) + health.components = { + "cache": Health(status=Health.Code.DEGRADED, reason=CACHE_SLOW), + "api": Health(status=Health.Code.UP), + } + + result = health.compute_health_from_components() + + assert result.status == Health.Code.DEGRADED + assert result.reason is not None + assert "cache" in result.reason + assert "DEGRADED" in result.reason + + +@pytest.mark.unit +def test_compute_health_from_components_multiple_degraded(record_property) -> None: + """Test that parent becomes DEGRADED with reason listing all degraded components.""" + record_property("tested-item-id", "SPEC-UTILS-SERVICE") + health = Health(status=Health.Code.UP) + health.components = { + "cache": Health(status=Health.Code.DEGRADED, reason=CACHE_SLOW), + "queue": Health(status=Health.Code.DEGRADED, reason="queue slow"), + "api": Health(status=Health.Code.UP), + } + + result = health.compute_health_from_components() + + assert result.status == Health.Code.DEGRADED + assert result.reason is not None + assert "cache" in result.reason + assert "queue" in result.reason + assert "DEGRADED" in result.reason + + +@pytest.mark.unit +def test_compute_health_from_components_down_trumps_degraded(record_property) -> None: + """Test that DOWN takes priority over DEGRADED when both are present.""" + record_property("tested-item-id", "SPEC-UTILS-SERVICE") + health = Health(status=Health.Code.UP) + health.components = { + "database": Health(status=Health.Code.DOWN, reason=DB_FAILURE), + "cache": Health(status=Health.Code.DEGRADED, reason=CACHE_SLOW), + } + + result = health.compute_health_from_components() + + assert result.status == Health.Code.DOWN + assert result.reason is not None + assert "database" in result.reason + assert "DOWN" in result.reason + + @pytest.mark.unit def test_health_manually_set_components_validated(record_property) -> None: """Test that manually setting components triggers validation."""