Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ddtrace/_trace/tracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ def sample(self, span):
self._sampler.sample(span)

def _sample_before_fork(self) -> None:
if isinstance(self._span_aggregator.writer, AgentWriterInterface):
self._span_aggregator.writer.before_fork()
span = self.current_root_span()
if span is not None and span.context.sampling_priority is None:
self.sample(span)
Expand Down
33 changes: 33 additions & 0 deletions ddtrace/internal/writer/writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import abc
import binascii
from collections import defaultdict
import functools
import gzip
import logging
import os
Expand All @@ -12,8 +13,10 @@
from typing import List
from typing import Optional
from typing import TextIO
import weakref

from ddtrace import config
from ddtrace.internal import forksafe
from ddtrace.internal.dist_computing.utils import in_ray_job
from ddtrace.internal.hostname import get_hostname
import ddtrace.internal.native as native
Expand Down Expand Up @@ -77,6 +80,29 @@ class NoEncodableSpansError(Exception):
DEFAULT_SMA_WINDOW = 10


def make_weak_method_hook(bound_method):
"""
Wrap a bound method so that it is called via a weakref to its instance.
If the instance has been garbage-collected, the hook is a no-op.
"""
if not hasattr(bound_method, "__self__") or bound_method.__self__ is None:
raise TypeError("make_weak_method_hook expects a bound method")

instance = bound_method.__self__
func = bound_method.__func__
instance_ref = weakref.ref(instance)

@functools.wraps(func)
def hook(*args, **kwargs):
inst = instance_ref()
if inst is None:
# The instance was garbage-collected
return
return func(inst, *args, **kwargs)

return hook


def _human_size(nbytes: float) -> str:
"""Return a human-readable size."""
i = 0
Expand Down Expand Up @@ -777,6 +803,11 @@ def __init__(
self._max_payload_size = max_payload_size
self._test_session_token = test_session_token

fork_hook = make_weak_method_hook(self.before_fork)

forksafe.register_before_fork(fork_hook)
self._fork_hook = fork_hook

self._clients = [client]
self.dogstatsd = dogstatsd
self._metrics: Dict[str, int] = defaultdict(int)
Expand Down Expand Up @@ -1064,7 +1095,9 @@ def _stop_service(
) -> None:
# FIXME: don't join() on stop(), let the caller handle this
super(NativeWriter, self)._stop_service()
forksafe.unregister_before_fork(self._fork_hook)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to unregister if the writer is GC'd, but not due to a fork? Or does it get automatically cleaned up?

self.join(timeout=timeout)
self.before_fork()

def before_fork(self) -> None:
self._exporter.stop_worker()
Expand Down
Loading