diff --git a/ddtrace/profiling/collector/_lock.py b/ddtrace/profiling/collector/_lock.py index f2a542c19c7..e297b46b07e 100644 --- a/ddtrace/profiling/collector/_lock.py +++ b/ddtrace/profiling/collector/_lock.py @@ -69,12 +69,69 @@ def __init__( self._self_acquired_at: int = 0 self._self_name: Optional[str] = None + def acquire(self, *args: Any, **kwargs: Any) -> Any: + return self._acquire(self.__wrapped__.acquire, *args, **kwargs) + + def release(self, *args: Any, **kwargs: Any) -> Any: + return self._release(self.__wrapped__.release, *args, **kwargs) + def __aenter__(self, *args: Any, **kwargs: Any) -> Any: return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs) def __aexit__(self, *args: Any, **kwargs: Any) -> Any: return self._release(self.__wrapped__.__aexit__, *args, **kwargs) + def __enter__(self, *args: Any, **kwargs: Any) -> Any: + return self._acquire(self.__wrapped__.__enter__, *args, **kwargs) + + def __exit__(self, *args: Any, **kwargs: Any) -> None: + self._release(self.__wrapped__.__exit__, *args, **kwargs) + + def _process_sample(self, start: Optional[int], end: int = time.monotonic_ns()) -> None: + if not start: + return + + thread_id: int + thread_name: str + thread_id, thread_name = _current_thread() + + task_id: Optional[int] + task_name: Optional[str] + task_frame: Optional[FrameType] + task_id, task_name, task_frame = _task.get_task(thread_id) + + lock_name: str = f"{self._self_init_loc}:{self._self_name}" if self._self_name else self._self_init_loc + + """ + If we can't get the task frame, we use the caller frame. + We expect the following call combo be on the stack, so we go back 2 frames: + * acquire/release + * __enter__/__exit__ + * __aenter__/__aexit__ + """ + frame: FrameType = task_frame or sys._getframe(2) + + frames: List[DDFrame] + frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) + + thread_native_id: int = _threading.get_thread_native_id(thread_id) + + handle: ddup.SampleHandle = ddup.SampleHandle() + handle.push_monotonic_ns(end) + handle.push_lock_name(lock_name) + handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here + handle.push_threadinfo(thread_id, thread_native_id, thread_name) + handle.push_task_id(task_id) + handle.push_task_name(task_name) + + if self._self_tracer: + handle.push_span(self._self_tracer.current_span()) + + for ddframe in frames: + handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) + + handle.flush_sample() + def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any: if not self._self_capture_sampler.capture(): return inner_func(*args, **kwargs) @@ -83,55 +140,15 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> try: return inner_func(*args, **kwargs) finally: - try: - end: int = time.monotonic_ns() - self._self_acquired_at = end - - thread_id: int - thread_name: str - thread_id, thread_name = _current_thread() - - task_id: Optional[int] - task_name: Optional[str] - task_frame: Optional[FrameType] - task_id, task_name, task_frame = _task.get_task(thread_id) + end: int = time.monotonic_ns() + try: self._maybe_update_self_name() - lock_name: str = ( - "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc - ) - - frame: FrameType - if task_frame is None: - # If we can't get the task frame, we use the caller frame. We expect acquire/release or - # __enter__/__exit__ to be on the stack, so we go back 2 frames. - frame = sys._getframe(2) - else: - frame = task_frame - - frames: List[DDFrame] - frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - - thread_native_id: int = _threading.get_thread_native_id(thread_id) - - handle: ddup.SampleHandle = ddup.SampleHandle() - handle.push_monotonic_ns(end) - handle.push_lock_name(lock_name) - handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here - handle.push_threadinfo(thread_id, thread_native_id, thread_name) - handle.push_task_id(task_id) - handle.push_task_name(task_name) - - if self._self_tracer is not None: - handle.push_span(self._self_tracer.current_span()) - for ddframe in frames: - handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) - handle.flush_sample() except Exception: pass # nosec - def acquire(self, *args: Any, **kwargs: Any) -> Any: - return self._acquire(self.__wrapped__.acquire, *args, **kwargs) + self._self_acquired_at = end + self._process_sample(start, end) def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None: # The underlying threading.Lock class is implemented using C code, and @@ -146,64 +163,18 @@ def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> # following statement code will raise an AttributeError. This should # not be propagated to the caller and to the users. The inner_func # will raise an RuntimeError as the threads are trying to release() - # and unlocked lock, and the expected behavior is to propagate that. + # an unlocked lock, and the expected behavior is to propagate that. del self._self_acquired_at except AttributeError: # We just ignore the error, if the attribute is not found. pass + try: return inner_func(*args, **kwargs) finally: - if start is not None: - end: int = time.monotonic_ns() - - thread_id: int - thread_name: str - thread_id, thread_name = _current_thread() - - task_id: Optional[int] - task_name: Optional[str] - task_frame: Optional[FrameType] - task_id, task_name, task_frame = _task.get_task(thread_id) - - lock_name: str = ( - "%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc - ) - - frame: FrameType - if task_frame is None: - # See the comments in _acquire - frame = sys._getframe(2) - else: - frame = task_frame - - frames: List[DDFrame] - frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes) - - thread_native_id: int = _threading.get_thread_native_id(thread_id) - - handle: ddup.SampleHandle = ddup.SampleHandle() - handle.push_monotonic_ns(end) - handle.push_lock_name(lock_name) - handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here - handle.push_threadinfo(thread_id, thread_native_id, thread_name) - handle.push_task_id(task_id) - handle.push_task_name(task_name) - - if self._self_tracer is not None: - handle.push_span(self._self_tracer.current_span()) - for ddframe in frames: - handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno) - handle.flush_sample() + self._process_sample(start) - def release(self, *args: Any, **kwargs: Any) -> Any: - return self._release(self.__wrapped__.release, *args, **kwargs) - - def __enter__(self, *args: Any, **kwargs: Any) -> Any: - return self._acquire(self.__wrapped__.__enter__, *args, **kwargs) - - def __exit__(self, *args: Any, **kwargs: Any) -> None: - self._release(self.__wrapped__.__exit__, *args, **kwargs) + # TODO: do we need to update the self_name here with _maybe_update_self_name()? def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]: for name, value in var_dict.items():