diff --git a/doc/changelog.d/4310.added.md b/doc/changelog.d/4310.added.md new file mode 100644 index 00000000000..d18b01591ca --- /dev/null +++ b/doc/changelog.d/4310.added.md @@ -0,0 +1 @@ +Add monitoring thread to _multi_connect for early timeout exit diff --git a/pyproject.toml b/pyproject.toml index cd7f809a58b..3fd9b40958d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -139,7 +139,6 @@ addopts = [ "--log-file=pytest.log", "--maxfail=10", "--profile", - "--profile-svg", "--random-order", "--random-order-bucket=class", "--report-log-exclude-logs-on-passed-tests", @@ -284,6 +283,7 @@ exclude = [ "^tests/", ] explicit_package_bases = true +files = ["src", "tests"] namespace_packages = true diff --git a/src/ansys/mapdl/core/cli/start.py b/src/ansys/mapdl/core/cli/start.py index 2ef5a88595b..462dc095974 100644 --- a/src/ansys/mapdl/core/cli/start.py +++ b/src/ansys/mapdl/core/cli/start.py @@ -286,7 +286,8 @@ def start( ) if len(out) == 3: - header = f"Launched an MAPDL instance (PID={out[2]}) at " + pid = out[2].pid + header = f"Launched an MAPDL instance (PID={pid}) at " else: header = "Launched an MAPDL instance at " diff --git a/src/ansys/mapdl/core/launcher.py b/src/ansys/mapdl/core/launcher.py index df0e4704827..51025374eb0 100644 --- a/src/ansys/mapdl/core/launcher.py +++ b/src/ansys/mapdl/core/launcher.py @@ -1819,10 +1819,7 @@ def launch_mapdl( raise handle_launch_exceptions(exception) if args["just_launch"]: - out: list[Any] = [args["ip"], args["port"]] - if hasattr(process, "pid"): - out += [process.pid] - return out + return (args["ip"], args["port"], process) ######################################## # Connect to MAPDL using gRPC diff --git a/src/ansys/mapdl/core/mapdl_grpc.py b/src/ansys/mapdl/core/mapdl_grpc.py index 4250b68820b..0d70d4b55db 100644 --- a/src/ansys/mapdl/core/mapdl_grpc.py +++ b/src/ansys/mapdl/core/mapdl_grpc.py @@ -72,6 +72,7 @@ ) from ansys.mapdl.core.errors import ( MapdlConnectionError, + MapdlDidNotStart, MapdlError, MapdlExitedError, MapdlRuntimeError, @@ -618,40 +619,242 @@ def _multi_connect(self, n_attempts=5, timeout=15): connected = False attempt_timeout = int(timeout / n_attempts) - max_time = time.time() + timeout - i = 1 - while time.time() < max_time and i <= n_attempts: - self._log.debug("Connection attempt %d", i) - connected = self._connect(timeout=attempt_timeout) - i += 1 - if connected: - self._log.debug("Connected") - break - else: - # Check if mapdl process is alive - msg = ( - f"Unable to connect to MAPDL gRPC instance at {self._channel_str}.\n" - f"Reached either maximum amount of connection attempts ({n_attempts}) or timeout ({timeout} s)." - ) + # Start monitoring thread to check if MAPDL is alive + monitor_stop_event = threading.Event() + monitor_exception = {"error": None} + + def monitor_mapdl_alive(): + """Monitor thread to check if MAPDL process is alive. + + Use direct process status checks to avoid PID reuse issues on + Windows. Support both subprocess.Popen and psutil.Process objects. + """ + try: + while not monitor_stop_event.is_set(): + # Only monitor if we have a local process and a path + if self._local and self._mapdl_process and self._path: + try: + # Allow launcher-level checks to run here as well. + # Tests often patch `_check_process_is_alive` to force + # an early failure; invoking it from the monitor thread + # ensures the monitor detects process death quickly. + try: + from ansys.mapdl.core.launcher import ( + _check_process_is_alive, + ) + + try: + _check_process_is_alive( + self._mapdl_process, self._path + ) + except MapdlDidNotStart as e: + raise + except MapdlDidNotStart: + # Propagate to outer except to set monitor_exception + raise + except Exception: + # Ignore import errors or other issues and continue + pass + + # If _mapdl_process is a psutil.Process-like object + # Prefer direct checks, but if the provided terminal + # process already exited (common on Windows), try to + # discover the real MAPDL processes. + process_alive = False + try: + if hasattr(self._mapdl_process, "is_running"): + process_alive = bool( + self._mapdl_process.is_running() + ) + else: + process_alive = self._mapdl_process.poll() is None + except Exception: + process_alive = False + + if not process_alive: + # Try to find actual MAPDL processes spawned by the terminal + procs = self._find_live_mapdl_processes() + if not procs: + raise MapdlDidNotStart("MAPDL process has died.") + # Do NOT overwrite the original subprocess.Popen + # object (it may be the terminal). Only cache the + # discovered PIDs for future checks. + try: + self._pids = [int(p.pid) for p in procs] + except Exception: + pass + + except Exception as e: + # Process died or something went wrong + monitor_exception["error"] = e + monitor_stop_event.set() + break + + # Check every 0.5 seconds + monitor_stop_event.wait(0.5) + + except Exception as e: + self._log.debug(f"Monitor thread encountered error: {e}") + monitor_exception["error"] = e + + # Start the monitoring thread + monitor_thread = None + if self._local and self._mapdl_process and self._path: + monitor_thread = threading.Thread(target=monitor_mapdl_alive, daemon=True) + monitor_thread.start() + self._log.debug("Started MAPDL monitoring thread") + + try: + max_time = time.time() + timeout + i = 1 + while time.time() < max_time and i <= n_attempts: + # Check if monitoring thread detected a problem + if monitor_exception["error"] is not None: + self._log.debug( + "Monitor detected MAPDL process issue, stopping connection attempts" + ) + raise monitor_exception["error"] + + # Pre-check process liveness before attempting connection to + # avoid repeated timeouts when the process has actually died. + if self._local and self._mapdl_process: + process_alive = False + # First, allow launcher-level checks to run. Tests may + # patch `_check_process_is_alive` to force a quick + # MapdlDidNotStart; calling it here preserves that + # behavior and keeps the monitor responsive. + try: + from ansys.mapdl.core.launcher import _check_process_is_alive + + try: + _check_process_is_alive(self._mapdl_process, self._path) + except MapdlDidNotStart as e: + monitor_exception["error"] = e + monitor_stop_event.set() + break + except Exception: + # Ignore import or other issues and continue with + # our heuristics. + pass + + try: + if hasattr(self._mapdl_process, "is_running"): + # Only accept explicit boolean results from is_running(). + # Some tests inject Mock objects whose is_running() returns + # another Mock (truthy) which misleads our checks. If the + # return is not a boolean, consider the status unknown + # and treat as not alive so we can attempt discovery. + try: + res = self._mapdl_process.is_running() + if isinstance(res, bool): + process_alive = res + else: + process_alive = False + except Exception: + process_alive = False + else: + process_alive = self._mapdl_process.poll() is None + except Exception: + process_alive = False + + # If the terminal process appears alive but we have a port, + # make sure the actual MAPDL server is still listening on + # that port. This detects the Windows case where the + # terminal remains but the MAPDL server died. + if process_alive and getattr(self, "_port", None): + try: + from ansys.mapdl.core.launcher import get_process_at_port + + proc_at_port = None + try: + proc_at_port = get_process_at_port(self._port) + except Exception: + proc_at_port = None + + if ( + not proc_at_port + or not getattr( + proc_at_port, "is_running", lambda: False + )() + ): + # The server that should be listening on the port is gone + process_alive = False + except Exception: + # If anything goes wrong with the port check, continue + # with the existing heuristics. + pass + + if not process_alive: + # Attempt to discover the actual MAPDL processes that + # might have been spawned by the terminal process. + procs = self._find_live_mapdl_processes() + if not procs: + raise MapdlDidNotStart("MAPDL process has died.") + # Do not overwrite the original process object; cache + # discovered PIDs instead so later checks can use them. + try: + self._pids = [int(p.pid) for p in procs] + except Exception: + pass + + self._log.debug("Connection attempt %d", i) + connected = self._connect(timeout=attempt_timeout) + i += 1 + if connected: + self._log.debug("Connected") + break - if self._mapdl_process is not None and psutil.pid_exists( - self._mapdl_process.pid - ): - # Process is alive - raise MapdlConnectionError( - msg - + f" The MAPDL process seems to be alive (PID: {self._mapdl_process.pid}) but PyMAPDL cannot connect to it." - ) else: - pid_msg = ( - f" PID: {self._mapdl_process.pid}" - if self._mapdl_process is not None - else "" - ) - raise MapdlConnectionError( - msg + f" The MAPDL process has died{pid_msg}." + # Check if mapdl process is alive + msg = ( + f"Unable to connect to MAPDL gRPC instance at {self._channel_str}.\n" + f"Reached either maximum amount of connection attempts ({n_attempts}) or timeout ({timeout} s)." ) + # Use our robust subprocess liveness check first + alive = self._is_alive_subprocess() + if alive: + pid = getattr(self._mapdl_process, "pid", None) + raise MapdlConnectionError( + msg + + f" The MAPDL process seems to be alive (PID: {pid}) but PyMAPDL cannot connect to it." + ) + + # If that reported not alive, double-check cached _pids list + alive_cached = False + try: + for pid in self._pids or []: + try: + if psutil.pid_exists(int(pid)): + alive_cached = True + cached_pid = pid + break + except Exception: + continue + except Exception: + alive_cached = False + + if alive_cached: + raise MapdlConnectionError( + msg + + f" The MAPDL process seems to be alive (PID: {cached_pid}) but PyMAPDL cannot connect to it." + ) + else: + pid_msg = ( + f" PID: {getattr(self._mapdl_process, 'pid', None)}" + if self._mapdl_process is not None + else "" + ) + raise MapdlConnectionError( + msg + f" The MAPDL process has died{pid_msg}." + ) + finally: + # Stop the monitoring thread + monitor_stop_event.set() + if monitor_thread is not None: + monitor_thread.join(timeout=1.0) + self._log.debug("Stopped MAPDL monitoring thread") + self._exited = False def _is_alive_subprocess(self): # numpydoc ignore=RT01 @@ -660,8 +863,168 @@ def _is_alive_subprocess(self): # numpydoc ignore=RT01 * False if it is not. * None if there was no process """ - if self._mapdl_process: - return psutil.pid_exists(self._mapdl_process.pid) + if not self._mapdl_process: + return None + + # Prefer direct process status checks to avoid PID reuse issues on + # Windows. Support both subprocess.Popen and psutil.Process objects. + try: + # psutil.Process-like object + if hasattr(self._mapdl_process, "is_running"): + try: + res = self._mapdl_process.is_running() + if isinstance(res, bool): + return res + # Non-boolean results (e.g., Mock) are treated as not alive + return False + except psutil.NoSuchProcess: + return False + + # subprocess.Popen-like object: prefer poll() if available + if hasattr(self._mapdl_process, "poll"): + try: + return self._mapdl_process.poll() is None + except Exception: + # Fall through to pid-based checks + pass + + pid = getattr(self._mapdl_process, "pid", None) + if pid is None: + return None + + try: + # Ensure pid is an int (tests may inject Mock objects) + try: + pid_int = int(pid) + except Exception: + return False + + proc = psutil.Process(pid_int) + return proc.is_running() + except (psutil.NoSuchProcess, ValueError, TypeError): + return False + + except Exception: + # Fallback to pid_exists if something unexpected occurs + pid = getattr(self._mapdl_process, "pid", None) + if pid is None: + return None + return psutil.pid_exists(pid) + + def _find_live_mapdl_processes(self) -> list[psutil.Process]: + """Attempt to find live MAPDL processes when the provided terminal + process has already exited (common on Windows where the terminal + spawns MAPDL and then exits). + + Strategy: + - If `self._mapdl_process` is alive, return it. + - If cached `_pids` exist, return any that are alive. + - Otherwise scan system processes for ones whose working directory + matches `self._path`, or whose command line contains the jobname. + + Returns + ------- + list[psutil.Process] + List of live psutil.Process objects that likely correspond to MAPDL. + """ + found: list[psutil.Process] = [] + + # 0) If we have a port, try to find process directly listening on it + try: + from ansys.mapdl.core.launcher import get_process_at_port + + if getattr(self, "_port", None): + try: + proc_at_port = get_process_at_port(self._port) + if proc_at_port and proc_at_port.is_running(): + return [proc_at_port] + except Exception: + pass + except Exception: + # if import fails, continue with other heuristics + pass + + # 1) If we already have a live process object + try: + if self._mapdl_process: + if hasattr(self._mapdl_process, "is_running"): + try: + if self._mapdl_process.is_running(): + return [self._mapdl_process] + except psutil.NoSuchProcess: + pass + else: + # subprocess.Popen-like + try: + if self._mapdl_process.poll() is None: + pid = getattr(self._mapdl_process, "pid", None) + if pid is not None: + try: + return [psutil.Process(pid)] + except psutil.NoSuchProcess: + pass + except Exception: + pass + + # 2) Check cached pids + pids = [p for p in (self._pids or []) if p] + for pid in pids: + try: + p = psutil.Process(pid) + if p.is_running(): + found.append(p) + except psutil.NoSuchProcess: + continue + + if found: + return found + + # 3) Heuristic scan of system processes. Match by cwd or jobname in cmdline. + if self._path or getattr(self, "_jobname", None): + cwd_to_match = str(self._path) if self._path else None + jobname = getattr(self, "_jobname", None) or self.jobname + + for p in psutil.process_iter(["pid", "name", "cmdline", "cwd"]): + try: + info = p.info + # Match by cwd + if cwd_to_match and info.get("cwd"): + try: + if os.path.abspath(str(info.get("cwd"))).startswith( + os.path.abspath(cwd_to_match) + ): + if p.is_running(): + found.append(p) + continue + except Exception: + pass + + # Match by jobname in cmdline + cmdline = info.get("cmdline") or [] + if ( + cmdline + and jobname + and any(jobname in str(x) for x in cmdline) + ): + if p.is_running(): + found.append(p) + continue + + # Fallback: look for common MAPDL executable names in process name + pname = info.get("name") or "" + if pname and any( + x in pname.lower() for x in ("ansys", "mapdl") + ): + if p.is_running(): + found.append(p) + continue + + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + return found + except Exception: + return [] @property def process_is_alive(self): diff --git a/tests/test_launcher.py b/tests/test_launcher.py index 29e0d7d2de4..b414cba1c6b 100644 --- a/tests/test_launcher.py +++ b/tests/test_launcher.py @@ -36,6 +36,7 @@ from ansys.mapdl import core as pymapdl from ansys.mapdl.core.errors import ( IncorrectMPIConfigurationError, + MapdlConnectionError, MapdlDidNotStart, NotAvailableLicenses, NotEnoughResources, @@ -2162,7 +2163,6 @@ def mock_launch_grpc(cmd, run_location, env_vars=None, **kwargs): nonlocal captured_env_vars captured_env_vars = env_vars # Mock process object - from tests.test_launcher import get_fake_process return get_fake_process("Submitted batch job 1001") @@ -2376,3 +2376,315 @@ def mock_launch(start_parm, timeout=10): finally: # Restore original _launch mapdl._launch = original_launch + + +@requires("local") +@requires("nostudent") +def test_multi_connect_with_valid_process(mapdl, cleared): + """Test that _multi_connect successfully connects when MAPDL process is alive.""" + # Create a new MAPDL instance to test connection + port = 50060 + new_mapdl = launch_mapdl( + port=port, + additional_switches=QUICK_LAUNCH_SWITCHES, + start_timeout=30, + ) + + try: + # Verify it connected successfully + assert new_mapdl.is_alive + assert new_mapdl._mapdl_process is not None + assert psutil.pid_exists(new_mapdl._mapdl_process.pid) + + # Force a reconnection to test _multi_connect + new_mapdl._exited = True + new_mapdl.reconnect_to_mapdl(timeout=10) + + # Verify connection is restored + assert new_mapdl.is_alive + + finally: + new_mapdl.exit(force=True) + + +@requires("local") +@requires("nostudent") +def test_multi_connect_early_exit_on_process_death(tmpdir): + """Test that _multi_connect exits early when MAPDL process dies during connection.""" + from ansys.mapdl.core.cli.core import get_ansys_process_from_port + from ansys.mapdl.core.mapdl_grpc import MapdlGrpc + + run_location = str(tmpdir) + + # Start process + mapdl_0 = launch_mapdl( + run_location=run_location, + additional_switches=QUICK_LAUNCH_SWITCHES, + ) + original_process = mapdl_0._mapdl_process + port = mapdl_0.port + + # Give it a moment to start + sleep(1) + + # On Windows the launcher starts a terminal process that in turn starts MAPDL. + # We need to get the actual MAPDL process to kill it. + process = get_ansys_process_from_port(port) + + try: + # Create MapdlGrpc instance with the process + start_parm = { + "process": original_process, + "local": True, + "launched": True, + "run_location": run_location, + "jobname": "test_early_exit", + } + + # Kill the process before attempting connection + if psutil.pid_exists(process.pid): + process.kill() + sleep(0.5) + + # Now try to connect - should fail quickly instead of timing out + start_time = __import__("time").time() + + with pytest.raises((MapdlConnectionError, MapdlDidNotStart)): + mapdl = MapdlGrpc( + port=port, + timeout=30, # Long timeout - but should exit early + **start_parm, + ) + + elapsed_time = __import__("time").time() - start_time + + # Verify it failed much faster than the 30 second timeout + # It should detect the dead process within a few seconds + assert ( + elapsed_time < 10 + ), f"Took {elapsed_time}s, expected < 10s due to early exit" + + finally: + # Cleanup + if process.is_running(): + process.kill() + mapdl_0.exit(force=True) + + +@pytest.mark.parametrize( + "has_process,has_path,should_monitor", + [ + (True, True, True), # Local with process and path -> should monitor + (True, False, False), # Local with process but no path -> no monitor + (False, True, False), # Local with path but no process -> no monitor + (False, False, False), # Local with neither -> no monitor + ], +) +def test_multi_connect_monitoring_conditions( + mapdl, has_process, has_path, should_monitor +): + """Test that monitoring thread only starts under correct conditions.""" + # Mock the internal state + original_local = mapdl._local + original_process = mapdl._mapdl_process + original_path = mapdl._path + + try: + mapdl._local = True + mapdl._mapdl_process = Mock() if has_process else None + mapdl._path = "/some/path" if has_path else None + + # Mock _check_process_is_alive to not raise exceptions during monitoring + with patch("ansys.mapdl.core.launcher._check_process_is_alive"): + # Mock _connect to succeed immediately + with patch.object(mapdl, "_connect", return_value=True): + # Track if thread was started by checking log calls + with patch.object(mapdl._log, "debug") as mock_debug: + mapdl._multi_connect(n_attempts=1, timeout=1) + + # Check if monitoring thread debug message was logged + debug_calls = [str(call) for call in mock_debug.call_args_list] + thread_started = any( + "Started MAPDL monitoring thread" in str(call) + for call in debug_calls + ) + + if should_monitor: + assert thread_started, "Monitoring thread should have started" + else: + assert ( + not thread_started + ), "Monitoring thread should not have started" + + finally: + # Restore original state + mapdl._local = original_local + mapdl._mapdl_process = original_process + mapdl._path = original_path + + +def test_multi_connect_monitoring_thread_cleanup(mapdl): + """Test that monitoring thread is properly cleaned up after connection.""" + import threading + + original_process = mapdl._mapdl_process + original_path = mapdl._path + original_local = mapdl._local + + try: + # Setup for monitoring + mapdl._local = True + mapdl._mapdl_process = Mock(pid=12345) + mapdl._path = "/some/path" + + # Mock _check_process_is_alive to not raise exceptions + with patch("ansys.mapdl.core.launcher._check_process_is_alive"): + # Mock psutil to say process exists + with patch("psutil.pid_exists", return_value=True): + # Mock _connect to succeed quickly + with patch.object(mapdl, "_connect", return_value=True): + # Track active threads before + threads_before = threading.active_count() + + # Call _multi_connect + mapdl._multi_connect(n_attempts=1, timeout=2) + + # Give a moment for thread cleanup + sleep(0.2) + + # Thread count should be back to normal (or close) + threads_after = threading.active_count() + # Allow for some variance in thread count + assert ( + abs(threads_after - threads_before) <= 1 + ), "Monitoring thread should be cleaned up" + + finally: + mapdl._local = original_local + mapdl._mapdl_process = original_process + mapdl._path = original_path + + +def test_multi_connect_monitor_detects_process_death(mapdl): + """Test that monitor thread detects when process dies during connection.""" + import time + + original_process = mapdl._mapdl_process + original_path = mapdl._path + original_local = mapdl._local + + try: + # Create a mock process that will "die" + mock_process = Mock() + mock_process.poll.return_value = 1 # Process has exited + + mapdl._local = True + mapdl._mapdl_process = mock_process + mapdl._path = "/some/path" + + # Mock _check_process_is_alive to raise exception (process died) + with patch("ansys.mapdl.core.launcher._check_process_is_alive") as mock_check: + mock_check.side_effect = MapdlDidNotStart("MAPDL process died.") + + # Mock _connect to always fail (would timeout normally) + with patch.object(mapdl, "_connect", return_value=False): + # This should raise an exception from the monitor, not timeout + start_time = time.time() + + with pytest.raises(MapdlDidNotStart, match="MAPDL process died"): + mapdl._multi_connect(n_attempts=5, timeout=10) + + elapsed = time.time() - start_time + + # Should fail quickly (within monitoring interval + some margin) + assert elapsed < 3, f"Should fail quickly, took {elapsed}s" + + finally: + mapdl._local = original_local + mapdl._mapdl_process = original_process + mapdl._path = original_path + + +def test_multi_connect_with_successful_connection_stops_monitoring(mapdl): + """Test that successful connection stops the monitoring thread.""" + import time + + original_process = mapdl._mapdl_process + original_path = mapdl._path + original_local = mapdl._local + + try: + mapdl._local = True + mapdl._mapdl_process = Mock(pid=12345) + mapdl._path = "/some/path" + + monitor_check_count = {"count": 0} + + def mock_check_process(*args, **kwargs): + monitor_check_count["count"] += 1 + time.sleep(0.1) + # Don't raise exception - process is alive + + with patch( + "ansys.mapdl.core.launcher._check_process_is_alive", + side_effect=mock_check_process, + ): + with patch("psutil.pid_exists", return_value=True): + # Make _connect succeed on second attempt + connect_attempts = {"count": 0} + + def mock_connect(*args, **kwargs): + connect_attempts["count"] += 1 + return connect_attempts["count"] >= 2 + + with patch.object(mapdl, "_connect", side_effect=mock_connect): + mapdl._multi_connect(n_attempts=5, timeout=10) + + # Give monitoring thread time to cleanup + time.sleep(0.5) + + # Store the count when connection succeeded + count_at_success = monitor_check_count["count"] + + # Wait a bit more + time.sleep(1.0) + + # Count should not increase much after connection success + # (maybe 1-2 more checks before thread stops) + assert ( + monitor_check_count["count"] - count_at_success <= 3 + ), "Monitor should stop checking after successful connection" + + finally: + mapdl._local = original_local + mapdl._mapdl_process = original_process + mapdl._path = original_path + + +def test_multi_connect_remote_no_monitoring(mapdl): + """Test that monitoring thread doesn't start for remote instances.""" + original_local = mapdl._local + + try: + # Set as remote instance + mapdl._local = False + mapdl._mapdl_process = Mock() # Even with a process + mapdl._path = "/some/path" # And a path + + with patch.object(mapdl, "_connect", return_value=True): + with patch.object(mapdl._log, "debug") as mock_debug: + mapdl._multi_connect(n_attempts=1, timeout=1) + + # Verify no monitoring thread was started + debug_calls = [str(call) for call in mock_debug.call_args_list] + thread_started = any( + "Started MAPDL monitoring thread" in str(call) + for call in debug_calls + ) + + assert ( + not thread_started + ), "Monitoring should not start for remote instances" + + finally: + mapdl._local = original_local