Skip to content

Commit 1ea91ef

Browse files
reuse duplicated code between '_acquire' and '_release'
1 parent 8115b1b commit 1ea91ef

File tree

1 file changed

+55
-85
lines changed

1 file changed

+55
-85
lines changed

ddtrace/profiling/collector/_lock.py

Lines changed: 55 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ def acquire(self, *args: Any, **kwargs: Any) -> Any:
7474

7575
def release(self, *args: Any, **kwargs: Any) -> Any:
7676
return self._release(self.__wrapped__.release, *args, **kwargs)
77-
77+
7878
def __aenter__(self, *args: Any, **kwargs: Any) -> Any:
7979
return self._acquire(self.__wrapped__.__aenter__, *args, **kwargs)
8080

@@ -87,6 +87,51 @@ def __enter__(self, *args: Any, **kwargs: Any) -> Any:
8787
def __exit__(self, *args: Any, **kwargs: Any) -> None:
8888
self._release(self.__wrapped__.__exit__, *args, **kwargs)
8989

90+
def _process_sample(self, start: Optional[int], end: int = time.monotonic_ns()) -> None:
91+
if not start:
92+
return
93+
94+
thread_id: int
95+
thread_name: str
96+
thread_id, thread_name = _current_thread()
97+
98+
task_id: Optional[int]
99+
task_name: Optional[str]
100+
task_frame: Optional[FrameType]
101+
task_id, task_name, task_frame = _task.get_task(thread_id)
102+
103+
lock_name: str = f"{self._self_init_loc}:{self._self_name}" if self._self_name else self._self_init_loc
104+
105+
"""
106+
If we can't get the task frame, we use the caller frame.
107+
We expect the following call combo be on the stack, so we go back 2 frames:
108+
* acquire/release
109+
* __enter__/__exit__
110+
* __aenter__/__aexit__
111+
"""
112+
frame: FrameType = task_frame or sys._getframe(2)
113+
114+
frames: List[DDFrame]
115+
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
116+
117+
thread_native_id: int = _threading.get_thread_native_id(thread_id)
118+
119+
handle: ddup.SampleHandle = ddup.SampleHandle()
120+
handle.push_monotonic_ns(end)
121+
handle.push_lock_name(lock_name)
122+
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
123+
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
124+
handle.push_task_id(task_id)
125+
handle.push_task_name(task_name)
126+
127+
if self._self_tracer:
128+
handle.push_span(self._self_tracer.current_span())
129+
130+
for ddframe in frames:
131+
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
132+
133+
handle.flush_sample()
134+
90135
def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
91136
if not self._self_capture_sampler.capture():
92137
return inner_func(*args, **kwargs)
@@ -95,53 +140,16 @@ def _acquire(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
95140
try:
96141
return inner_func(*args, **kwargs)
97142
finally:
98-
try:
99-
end: int = time.monotonic_ns()
100-
self._self_acquired_at = end
101-
102-
thread_id: int
103-
thread_name: str
104-
thread_id, thread_name = _current_thread()
105-
106-
task_id: Optional[int]
107-
task_name: Optional[str]
108-
task_frame: Optional[FrameType]
109-
task_id, task_name, task_frame = _task.get_task(thread_id)
143+
end: int = time.monotonic_ns()
110144

145+
try:
111146
self._maybe_update_self_name()
112-
lock_name: str = (
113-
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
114-
)
115-
116-
frame: FrameType
117-
if task_frame is None:
118-
# If we can't get the task frame, we use the caller frame. We expect acquire/release or
119-
# __enter__/__exit__ to be on the stack, so we go back 2 frames.
120-
frame = sys._getframe(2)
121-
else:
122-
frame = task_frame
123-
124-
frames: List[DDFrame]
125-
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
126-
127-
thread_native_id: int = _threading.get_thread_native_id(thread_id)
128-
129-
handle: ddup.SampleHandle = ddup.SampleHandle()
130-
handle.push_monotonic_ns(end)
131-
handle.push_lock_name(lock_name)
132-
handle.push_acquire(end - start, 1) # AFAICT, capture_pct does not adjust anything here
133-
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
134-
handle.push_task_id(task_id)
135-
handle.push_task_name(task_name)
136-
137-
if self._self_tracer is not None:
138-
handle.push_span(self._self_tracer.current_span())
139-
for ddframe in frames:
140-
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
141-
handle.flush_sample()
142147
except Exception:
143148
pass # nosec
144149

150+
self._self_acquired_at = end
151+
self._process_sample(start, end)
152+
145153
def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
146154
# The underlying threading.Lock class is implemented using C code, and
147155
# it doesn't have the __dict__ attribute. So we can't do
@@ -155,7 +163,7 @@ def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
155163
# following statement code will raise an AttributeError. This should
156164
# not be propagated to the caller and to the users. The inner_func
157165
# will raise an RuntimeError as the threads are trying to release()
158-
# and unlocked lock, and the expected behavior is to propagate that.
166+
# an unlocked lock, and the expected behavior is to propagate that.
159167
del self._self_acquired_at
160168
except AttributeError:
161169
# We just ignore the error, if the attribute is not found.
@@ -164,47 +172,9 @@ def _release(self, inner_func: Callable[..., Any], *args: Any, **kwargs: Any) ->
164172
try:
165173
return inner_func(*args, **kwargs)
166174
finally:
167-
if start is not None:
168-
end: int = time.monotonic_ns()
169-
170-
thread_id: int
171-
thread_name: str
172-
thread_id, thread_name = _current_thread()
173-
174-
task_id: Optional[int]
175-
task_name: Optional[str]
176-
task_frame: Optional[FrameType]
177-
task_id, task_name, task_frame = _task.get_task(thread_id)
178-
179-
lock_name: str = (
180-
"%s:%s" % (self._self_init_loc, self._self_name) if self._self_name else self._self_init_loc
181-
)
182-
183-
frame: FrameType
184-
if task_frame is None:
185-
# See the comments in _acquire
186-
frame = sys._getframe(2)
187-
else:
188-
frame = task_frame
189-
190-
frames: List[DDFrame]
191-
frames, _ = _traceback.pyframe_to_frames(frame, self._self_max_nframes)
192-
193-
thread_native_id: int = _threading.get_thread_native_id(thread_id)
194-
195-
handle: ddup.SampleHandle = ddup.SampleHandle()
196-
handle.push_monotonic_ns(end)
197-
handle.push_lock_name(lock_name)
198-
handle.push_release(end - start, 1) # AFAICT, capture_pct does not adjust anything here
199-
handle.push_threadinfo(thread_id, thread_native_id, thread_name)
200-
handle.push_task_id(task_id)
201-
handle.push_task_name(task_name)
202-
203-
if self._self_tracer is not None:
204-
handle.push_span(self._self_tracer.current_span())
205-
for ddframe in frames:
206-
handle.push_frame(ddframe.function_name, ddframe.file_name, 0, ddframe.lineno)
207-
handle.flush_sample()
175+
self._process_sample(start)
176+
177+
# TODO: do we need to update the self_name here with _maybe_update_self_name()?
208178

209179
def _find_self_name(self, var_dict: Dict[str, Any]) -> Optional[str]:
210180
for name, value in var_dict.items():

0 commit comments

Comments
 (0)