diff --git a/ddtrace/_trace/tracer.py b/ddtrace/_trace/tracer.py index 7ea9be3b4df..d6877c5d1a7 100644 --- a/ddtrace/_trace/tracer.py +++ b/ddtrace/_trace/tracer.py @@ -203,8 +203,6 @@ def sample(self, span): self._sampler.sample(span) def _sample_before_fork(self) -> None: - if isinstance(self._span_aggregator.writer, AgentWriterInterface): - self._span_aggregator.writer.before_fork() span = self.current_root_span() if span is not None and span.context.sampling_priority is None: self.sample(span) diff --git a/ddtrace/internal/writer/writer.py b/ddtrace/internal/writer/writer.py index cd049c4e00c..8d0f849d4f6 100644 --- a/ddtrace/internal/writer/writer.py +++ b/ddtrace/internal/writer/writer.py @@ -1,6 +1,7 @@ import abc import binascii from collections import defaultdict +import functools import gzip import logging import os @@ -12,8 +13,10 @@ from typing import List from typing import Optional from typing import TextIO +import weakref from ddtrace import config +from ddtrace.internal import forksafe from ddtrace.internal.dist_computing.utils import in_ray_job from ddtrace.internal.hostname import get_hostname import ddtrace.internal.native as native @@ -77,6 +80,29 @@ class NoEncodableSpansError(Exception): DEFAULT_SMA_WINDOW = 10 +def make_weak_method_hook(bound_method): + """ + Wrap a bound method so that it is called via a weakref to its instance. + If the instance has been garbage-collected, the hook is a no-op. + """ + if not hasattr(bound_method, "__self__") or bound_method.__self__ is None: + raise TypeError("make_weak_method_hook expects a bound method") + + instance = bound_method.__self__ + func = bound_method.__func__ + instance_ref = weakref.ref(instance) + + @functools.wraps(func) + def hook(*args, **kwargs): + inst = instance_ref() + if inst is None: + # The instance was garbage-collected + return + return func(inst, *args, **kwargs) + + return hook + + def _human_size(nbytes: float) -> str: """Return a human-readable size.""" i = 0 @@ -777,6 +803,11 @@ def __init__( self._max_payload_size = max_payload_size self._test_session_token = test_session_token + fork_hook = make_weak_method_hook(self.before_fork) + + forksafe.register_before_fork(fork_hook) + self._fork_hook = fork_hook + self._clients = [client] self.dogstatsd = dogstatsd self._metrics: Dict[str, int] = defaultdict(int) @@ -1064,7 +1095,9 @@ def _stop_service( ) -> None: # FIXME: don't join() on stop(), let the caller handle this super(NativeWriter, self)._stop_service() + forksafe.unregister_before_fork(self._fork_hook) self.join(timeout=timeout) + self.before_fork() def before_fork(self) -> None: self._exporter.stop_worker()