diff --git a/gprofiler/metadata/py_module_version.py b/gprofiler/metadata/py_module_version.py index e795f7afe..4b8312f69 100644 --- a/gprofiler/metadata/py_module_version.py +++ b/gprofiler/metadata/py_module_version.py @@ -57,7 +57,11 @@ def _get_packages_dir(file_path: str) -> Optional[str]: def _get_metadata(dist: pkg_resources.Distribution) -> Dict[str, str]: """Based on pip._internal.utils.get_metadata""" metadata_name = "METADATA" - if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(metadata_name): + # Check if this distribution supports modern METADATA format + # Some distributions have METADATA, others only have PKG-INFO + is_dist_info = hasattr(dist, "_path") or dist.__class__.__name__ in ("DistInfoDistribution", "Distribution") + + if is_dist_info and dist.has_metadata(metadata_name): metadata = dist.get_metadata(metadata_name) elif dist.has_metadata("PKG-INFO"): metadata_name = "PKG-INFO" @@ -120,7 +124,7 @@ def _files_from_legacy(dist: pkg_resources.Distribution) -> Optional[Iterator[st return None paths = (p for p in text.splitlines(keepends=False) if p) root = dist.location - info = dist.egg_info + info = getattr(dist, "egg_info", None) if root is None or info is None: return paths try: diff --git a/gprofiler/utils/__init__.py b/gprofiler/utils/__init__.py index 91aaf4eb8..b1a8b3635 100644 --- a/gprofiler/utils/__init__.py +++ b/gprofiler/utils/__init__.py @@ -136,6 +136,7 @@ def start_process( **kwargs, ) + _processes.append(process) return process @@ -190,10 +191,28 @@ def reap_process(process: Popen) -> Tuple[int, bytes, bytes]: (see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait, and see ticket https://github.com/intel/gprofiler/issues/744). """ - stdout, stderr = process.communicate() - returncode = process.poll() - assert returncode is not None # only None if child has not terminated - return returncode, stdout, stderr + # If process is already terminated, don't try to communicate + if process.poll() is not None: + # Process already exited, just collect any remaining output + try: + stdout = process.stdout.read() if process.stdout and not process.stdout.closed else b"" + stderr = process.stderr.read() if process.stderr and not process.stderr.closed else b"" + except Exception: + stdout, stderr = b"", b"" + return process.returncode, stdout, stderr + + # Process still running, try normal communicate + try: + stdout, stderr = process.communicate() + return process.returncode, stdout, stderr + except ValueError as e: + if "flush of closed file" in str(e): + # Handle the race condition gracefully + returncode = process.wait() + stdout, stderr = b"", b"" + return returncode, stdout, stderr + else: + raise def _kill_and_reap_process(process: Popen, kill_signal: signal.Signals) -> Tuple[int, bytes, bytes]: diff --git a/gprofiler/utils/perf_process.py b/gprofiler/utils/perf_process.py index e8cbce27a..ddcadc777 100644 --- a/gprofiler/utils/perf_process.py +++ b/gprofiler/utils/perf_process.py @@ -37,6 +37,8 @@ class PerfProcess: # default number of pages used by "perf record" when perf_event_mlock_kb=516 # we use double for dwarf. _MMAP_SIZES = {"fp": 129, "dwarf": 257} + _RSS_GROWTH_THRESHOLD = 100 * 1024 * 1024 # 100MB in bytes + _BASELINE_COLLECTION_COUNT = 3 # Number of function calls to collect RSS before setting baseline def __init__( self, @@ -65,6 +67,8 @@ def __init__( self._extra_args = extra_args self._switch_timeout_s = switch_timeout_s self._process: Optional[Popen] = None + self._baseline_rss: Optional[int] = None + self._collected_rss_values: List[int] = [] @property def _log_name(self) -> str: @@ -132,6 +136,7 @@ def is_running(self) -> bool: def restart(self) -> None: self.stop() + self._clear_baseline_data() self.start() def restart_if_not_running(self) -> None: @@ -145,18 +150,50 @@ def restart_if_not_running(self) -> None: def restart_if_rss_exceeded(self) -> None: """Checks if perf used memory exceeds threshold, and if it does, restarts perf""" assert self._process is not None - perf_rss = Process(self._process.pid).memory_info().rss - if ( - time.monotonic() - self._start_time >= self._RESTART_AFTER_S - and perf_rss >= self._PERF_MEMORY_USAGE_THRESHOLD - ): + current_rss = Process(self._process.pid).memory_info().rss + + # Collect RSS readings for baseline calculation + if self._baseline_rss is None: + self._collected_rss_values.append(current_rss) + + if len(self._collected_rss_values) < self._BASELINE_COLLECTION_COUNT: + return # Still collecting, don't check thresholds yet + + # Calculate average from collected samples + self._baseline_rss = sum(self._collected_rss_values) // len(self._collected_rss_values) + logger.debug( + f"RSS baseline established for {self._log_name}", + collected_samples=self._collected_rss_values, + calculated_baseline=self._baseline_rss, + ) + + # Now check memory thresholds with established baseline + memory_growth = current_rss - self._baseline_rss + time_elapsed = time.monotonic() - self._start_time + + should_restart_time_based = ( + time_elapsed >= self._RESTART_AFTER_S and current_rss >= self._PERF_MEMORY_USAGE_THRESHOLD + ) + should_restart_growth_based = memory_growth > self._RSS_GROWTH_THRESHOLD + + if should_restart_time_based or should_restart_growth_based: + restart_cause = "time+memory limits" if should_restart_time_based else "memory growth" logger.debug( - f"Restarting {self._log_name} due to memory exceeding limit", - limit_rss=self._PERF_MEMORY_USAGE_THRESHOLD, - perf_rss=perf_rss, + f"Restarting {self._log_name} due to {restart_cause}", + current_rss=current_rss, + baseline_rss=self._baseline_rss, + memory_growth=memory_growth, + time_elapsed=time_elapsed, + threshold_limit=self._PERF_MEMORY_USAGE_THRESHOLD, ) + self._clear_baseline_data() self.restart() + def _clear_baseline_data(self) -> None: + """Reset baseline tracking for next process instance""" + self._baseline_rss = None + self._collected_rss_values = [] + def switch_output(self) -> None: assert self._process is not None, "profiling not started!" # clean stale files (can be emitted by perf timing out and switching output file). @@ -191,6 +228,17 @@ def wait_and_script(self) -> str: # (unlike Popen.communicate()) if self._process is not None and self._process.stderr is not None: logger.debug(f"{self._log_name} run output", perf_stderr=self._process.stderr.read1()) # type: ignore + # Safely drain stdout buffer without interfering with error handling + if self._process is not None and self._process.stdout is not None: + try: + # Use read1() to avoid blocking, but don't necessarily log it + stdout_data = self._process.stdout.read1() # type: ignore + # Only log if there's unexpected stdout data (diagnostic value) + if stdout_data: + logger.debug(f"{self._log_name} unexpected stdout", perf_stdout=stdout_data) + except (OSError, IOError): + # Handle case where stdout is already closed/broken + pass try: inject_data = Path(f"{str(perf_data)}.inject")