Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 65 additions & 94 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,69 @@ def __init__(
self._self_acquired_at: int = 0
self._self_name: Optional[str] = None

def acquire(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)

def release(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.release, *args, **kwargs)

def __aenter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs)

def __aexit__(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.__aexit__, *args, **kwargs)

def __enter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__enter__, *args, **kwargs)

def __exit__(self, *args: Any, **kwargs: Any) -> None:
self._release(self.__wrapped__.__exit__, *args, **kwargs)

def _process_sample(self, start: Optional[int], end: int = time.monotonic_ns()) -> None:
if not start:
return

thread_id: int
thread_name: str
thread_id, thread_name = _current_thread()

task_id: Optional[int]
task_name: Optional[str]
task_frame: Optional[FrameType]
task_id, task_name, task_frame = _task.get_task(thread_id)

lock_name: str = f"{self._self_init_loc}:{self._self_name}" if self._self_name else self._self_init_loc

"""
If we can't get the task frame, we use the caller frame.
We expect the following call combo be on the stack, so we go back 2 frames:
* acquire/release
* __enter__/__exit__
* __aenter__/__aexit__
"""
frame: FrameType = task_frame or sys._getframe(2)

frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id: int = _threading.get_thread_native_id(thread_id)

handle: ddup.SampleHandle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
handle.push_lock_name(lock_name)
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
handle.push_task_id(task_id)
handle.push_task_name(task_name)

if self._self_tracer:
handle.push_span(self._self_tracer.current_span())

for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)

handle.flush_sample()

def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
if not self._self_capture_sampler.capture():
return inner_func(*args, **kwargs)
Expand All @@ -83,55 +140,15 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
try:
return inner_func(*args, **kwargs)
finally:
try:
end: int = time.monotonic_ns()
self._self_acquired_at = end

thread_id: int
thread_name: str
thread_id, thread_name = _current_thread()

task_id: Optional[int]
task_name: Optional[str]
task_frame: Optional[FrameType]
task_id, task_name, task_frame = _task.get_task(thread_id)
end: int = time.monotonic_ns()

try:
self._maybe_update_self_name()
lock_name: str = (
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
)

frame: FrameType
if task_frame is None:
# If we can't get the task frame, we use the caller frame. We expect acquire/release or
# __enter__/__exit__ to be on the stack, so we go back 2 frames.
frame = sys._getframe(2)
else:
frame = task_frame

frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id: int = _threading.get_thread_native_id(thread_id)

handle: ddup.SampleHandle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
handle.push_lock_name(lock_name)
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
handle.push_task_id(task_id)
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span())
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
handle.flush_sample()
except Exception:
pass # nosec

def acquire(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.acquire, *args, **kwargs)
self._self_acquired_at = end
self._process_sample(start, end)

def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
# The underlying threading.Lock class is implemented using C code, and
Expand All @@ -146,64 +163,18 @@ def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
# following statement code will raise an AttributeError. This should
# not be propagated to the caller and to the users. The inner_func
# will raise an RuntimeError as the threads are trying to release()
# and unlocked lock, and the expected behavior is to propagate that.
# an unlocked lock, and the expected behavior is to propagate that.
del self._self_acquired_at
except AttributeError:
# We just ignore the error, if the attribute is not found.
pass

try:
return inner_func(*args, **kwargs)
finally:
if start is not None:
end: int = time.monotonic_ns()

thread_id: int
thread_name: str
thread_id, thread_name = _current_thread()

task_id: Optional[int]
task_name: Optional[str]
task_frame: Optional[FrameType]
task_id, task_name, task_frame = _task.get_task(thread_id)

lock_name: str = (
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
)

frame: FrameType
if task_frame is None:
# See the comments in _acquire
frame = sys._getframe(2)
else:
frame = task_frame

frames: List[DDFrame]
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)

thread_native_id: int = _threading.get_thread_native_id(thread_id)

handle: ddup.SampleHandle = ddup.SampleHandle()
handle.push_monotonic_ns(end)
handle.push_lock_name(lock_name)
handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
handle.push_task_id(task_id)
handle.push_task_name(task_name)

if self._self_tracer is not None:
handle.push_span(self._self_tracer.current_span())
for ddframe in frames:
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
handle.flush_sample()
self._process_sample(start)

def release(self, *args: Any, **kwargs: Any) -> Any:
return self._release(self.__wrapped__.release, *args, **kwargs)

def __enter__(self, *args: Any, **kwargs: Any) -> Any:
return self._acquire(self.__wrapped__.__enter__, *args, **kwargs)

def __exit__(self, *args: Any, **kwargs: Any) -> None:
self._release(self.__wrapped__.__exit__, *args, **kwargs)
# TODO: do we need to update the self_name here with _maybe_update_self_name()?

def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
for name, value in var_dict.items():
Expand Down
Loading