diff --git a/docs/TelemetryDevelopment.md b/docs/TelemetryDevelopment.md new file mode 100644 index 00000000000..cd6fee4ddb5 --- /dev/null +++ b/docs/TelemetryDevelopment.md @@ -0,0 +1,66 @@ +# Telemetry Development + +## Telemetry Presence + +`dd-trace-rb` is written to assume that the telemetry component is always +present. If telemetry is disabled, the component is still created but does +nothing. + +Most components call methods on `telemetry` unconditionally. There are two +exceptions: DI and Data Streams are written to assume that `telemetry` may be nil. +However, this assumption is not necessary and these components may be +changed in the future to assume that `telemetry` is always present. + +## Event Submission Prior To Start + +Telemetry is unique among other components in that it permits events to be +submitted to it prior to its worker starting. This is done so that errors +during `Datadog.configure` processing can be reported via telemetry, because +the errors can be produced prior to the telemetry worker starting. The telemetry +component keeps the events and sends them after the worker starts. + +## Initial Event + +`dd-trace-rb` can be initialized multiple times during application boot. +For example, if customers follow our documentation and require +`datadog/auto_instrument`, and call `Datadog.configure`, they would get +`Datadog.configure` invoked two times total (the first time by `auto_instrument`) +and thus the telemetry instance would be created twice. This happens in the +applications used with system tests. + +System tests, on the other hand, require that there is only one `app-started` +event emitted, because they think the application is launched once. +To deal with this we have a hack in the telemetry code to send an +`app-client-configuration-change` event instead of the second `app-started` +event. This is implemented via the `SynthAppClientConfigurationChange` class. + +## Fork Handling + +We must send telemetry data from forked children.
+ +Telemetry started out as a diagnostic tool used during application boot, +but is now used for reporting application liveness (and settings/state) +throughout the application lifetime. Live Debugger / Dynamic Instrumentation, +for example, require ongoing `app-heartbeat` events emitted via telemetry +to provide a working UI to customers. + +It is somewhat common for customers to preload the application in the parent +web server process and process requests from children. This means telemetry +is initialized from the parent process, and it must emit events in the +forked children. + +We use the standard worker `after_fork` handler to recreate the worker +thread in forked children. However, there are two caveats to keep in mind +which are specific to telemetry: + +1. Due to telemetry permitting event submission prior to its start, it is +not sufficient to simply reset the state from the worker's `perform` method, +as is done in other components. We must only reset the state when we are +in the forked child, otherwise we'll trash any events submitted to telemetry +prior to its worker starting. + +2. The child process is a brand new application as far as the backend/UI is +concerned, having a new runtime ID, and therefore the initial event in the +forked child must always be `app-started`. Since we track the initial event +in the telemetry component, this event must be changed to `app-started` in +forked children regardless of what it was in the parent. diff --git a/lib/datadog/core/telemetry/component.rb b/lib/datadog/core/telemetry/component.rb index d29dfa6f654..e67c15f4dd6 100644 --- a/lib/datadog/core/telemetry/component.rb +++ b/lib/datadog/core/telemetry/component.rb @@ -14,14 +14,23 @@ module Datadog module Core module Telemetry - # Telemetry entrypoint, coordinates sending telemetry events at various points in app lifecycle. - # Note: Telemetry does not spawn its worker thread in fork processes, thus no telemetry is sent in forked processes.
+ # Telemetry entry point, coordinates sending telemetry events at + # various points in application lifecycle. # # @api private class Component ENDPOINT_COLLECTION_MESSAGE_LIMIT = 300 - attr_reader :enabled, :logger, :transport, :worker + attr_reader :enabled + attr_reader :logger + attr_reader :transport + attr_reader :worker + attr_reader :settings + attr_reader :agent_settings + + # Alias for consistency with other components. + # TODO Remove +enabled+ method + alias_method :enabled?, :enabled include Core::Utils::Forking include Telemetry::Logging @@ -110,7 +119,7 @@ def disable! end def start(initial_event_is_change = false, components:) - return if !@enabled + return unless enabled? initial_event = if initial_event_is_change Event::SynthAppClientConfigurationChange.new( @@ -136,19 +145,19 @@ def shutdown! end def emit_closing! - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppClosing.new) end def integrations_change! - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppIntegrationsChange.new) end def log!(event) - return if !@enabled || forked? || !@log_collection_enabled + return unless enabled? && @log_collection_enabled @worker.enqueue(event) end @@ -159,21 +168,21 @@ def log!(event) # # @api private def flush - return if !@enabled || forked? + return unless enabled? @worker.flush end # Report configuration changes caused by Remote Configuration. def client_configuration_change!(changes) - return if !@enabled || forked? + return unless enabled? @worker.enqueue(Event::AppClientConfigurationChange.new(changes, 'remote_config')) end # Report application endpoints def app_endpoints_loaded(endpoints, page_size: ENDPOINT_COLLECTION_MESSAGE_LIMIT) - return if !@enabled || forked? + return unless enabled? 
endpoints.each_slice(page_size).with_index do |endpoints_slice, i| @worker.enqueue(Event::AppEndpointsLoaded.new(endpoints_slice, is_first: i.zero?)) diff --git a/lib/datadog/core/telemetry/event/app_started.rb b/lib/datadog/core/telemetry/event/app_started.rb index 13cd2bda259..dd4c4ed6e83 100644 --- a/lib/datadog/core/telemetry/event/app_started.rb +++ b/lib/datadog/core/telemetry/event/app_started.rb @@ -11,6 +11,12 @@ class AppStarted < Base def initialize(components:) # To not hold a reference to the component tree, generate # the event payload here in the constructor. + # + # Important: do not store data that contains (or is derived from) + # the runtime_id or sequence numbers. + # This event is reused when a process forks, but in the + # child process the runtime_id would be different and sequence + # number is reset. @configuration = configuration(components.settings, components.agent_settings) @install_signature = install_signature(components.settings) @products = products(components) @@ -30,6 +36,15 @@ def payload } end + # Whether the event is actually the app-started event. + # For the app-started event we follow up by sending + # app-dependencies-loaded, if the event is + # app-client-configuration-change we don't send + # app-dependencies-loaded. + def app_started? + true + end + private def products(components) diff --git a/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb b/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb index 6ddbe9cffeb..de9e9bd7d18 100644 --- a/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb +++ b/lib/datadog/core/telemetry/event/synth_app_client_configuration_change.rb @@ -28,13 +28,36 @@ module Event # and app-closing events. class SynthAppClientConfigurationChange < AppStarted def type - 'app-client-configuration-change' + if reset? 
+ super + else + 'app-client-configuration-change' + end end def payload - { - configuration: @configuration, - } + if reset? + super + else + { + configuration: @configuration, + } + end + end + + def app_started? + reset? + end + + # Revert this event to a "regular" AppStarted event. + # + # Used in after_fork to send the AppStarted event in child processes. + def reset! + @reset = true + end + + def reset? + !!@reset end end end diff --git a/lib/datadog/core/telemetry/worker.rb b/lib/datadog/core/telemetry/worker.rb index 2a0e3a0b915..5f6120234fe 100644 --- a/lib/datadog/core/telemetry/worker.rb +++ b/lib/datadog/core/telemetry/worker.rb @@ -9,7 +9,10 @@ module Datadog module Core module Telemetry - # Accumulates events and sends them to the API at a regular interval, including heartbeat event. + # Accumulates events and sends them to the API at a regular interval, + # including heartbeat event. + # + # @api private class Worker include Core::Workers::Queue include Core::Workers::Polling @@ -40,11 +43,15 @@ def initialize( self.enabled = enabled # Workers::IntervalLoop settings self.loop_base_interval = metrics_aggregation_interval_seconds - self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_STOP + self.fork_policy = Core::Workers::Async::Thread::FORK_POLICY_RESTART @shutdown_timeout = shutdown_timeout @buffer_size = buffer_size + initialize_state + end + + private def initialize_state self.buffer = buffer_klass.new(@buffer_size) @initial_event_once = Utils::OnlyOnceSuccessful.new(APP_STARTED_EVENT_RETRIES) @@ -53,12 +60,13 @@ def initialize( attr_reader :logger attr_reader :initial_event_once attr_reader :initial_event + attr_reader :emitter # Returns true if worker thread is successfully started, # false if worker thread was not started but telemetry is enabled, # nil if telemetry is disabled. def start(initial_event) - return if !enabled? || forked? + return unless enabled? 
@initial_event = initial_event @@ -79,7 +87,21 @@ def stop(force_stop = false, timeout = @shutdown_timeout) # for not enqueueing event (presently) is that telemetry is disabled # altogether, and in this case other methods return nil. def enqueue(event) - return if !enabled? || forked? + return unless enabled? + + # Start the worker if needed, including in forked children. + # Needs to be done before pushing to buffer since perform + # may invoke after_fork handler which resets the buffer. + # + # Telemetry is special in that it permits events to be submitted + # to the worker with the worker not running, and the worker is + # explicitly started later (to maintain proper initialization order). + # Thus here we can't just call perform unconditionally and must + # check if the worker is supposed to be running, and only call + # perform in that case. + if worker && !worker.alive? + perform + end buffer.push(event) true @@ -133,7 +155,7 @@ def flush private def perform(*events) - return if !enabled? || forked? + return unless enabled? if need_initial_event? started! @@ -189,7 +211,9 @@ def started! # dependencies and send the new ones. # System tests demand only one instance of this event per # dependency. - send_event(Event::AppDependenciesLoaded.new) if @dependency_collection && initial_event.class.eql?(Telemetry::Event::AppStarted) # standard:disable Style/ClassEqualityComparison: + if @dependency_collection && initial_event.app_started? + send_event(Event::AppDependenciesLoaded.new) + end true else @@ -240,6 +264,27 @@ def disable_on_not_found!(response) disable! end + # Stop the worker after fork without sending closing event. + # The closing event will be (or should be) sent by the worker + # in the parent process. + # Also, discard any accumulated events since they will be sent by + # the parent. + def after_fork + # If telemetry is disabled, we still reset the state to avoid + # having wrong state. 
It is possible that in the future telemetry + # will be re-enabled after errors. + initialize_state + # In the child process, we get a new runtime_id. + # As such we need to send AppStarted event. + # In the parent process, the event may have been the + # SynthAppClientConfigurationChange instead of AppStarted, + # and in that case we need to convert it to the "regular" + # AppStarted event. + if @initial_event.is_a?(Event::SynthAppClientConfigurationChange) + @initial_event.reset! # steep:ignore + end + end + # Deduplicate logs by counting the number of repeated occurrences of the same log # entry and replacing them with a single entry with the calculated `count` value. # Non-log events are unchanged. diff --git a/sig/datadog/core/telemetry/component.rbs b/sig/datadog/core/telemetry/component.rbs index 0d59476e33f..ef1e2ced28c 100644 --- a/sig/datadog/core/telemetry/component.rbs +++ b/sig/datadog/core/telemetry/component.rbs @@ -23,6 +23,8 @@ module Datadog def self.build: (untyped settings, Datadog::Core::Configuration::AgentSettings agent_settings, Datadog::Core::Logger logger) -> Component def initialize: (logger: Core::Logger, settings: untyped, agent_settings: Datadog::Core::Configuration::AgentSettings, enabled: true | false) -> void + + def enabled?: -> bool def disable!: () -> void diff --git a/sig/datadog/core/telemetry/event/app_started.rbs b/sig/datadog/core/telemetry/event/app_started.rbs index d0b7871149e..68bfba41755 100644 --- a/sig/datadog/core/telemetry/event/app_started.rbs +++ b/sig/datadog/core/telemetry/event/app_started.rbs @@ -8,6 +8,8 @@ module Datadog def type: () -> "app-started" def payload: () -> { products: untyped, configuration: untyped, install_signature: untyped } + + def app_started?: -> bool private diff --git a/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs b/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs index 24331290384..188d7bcc9b1 100644 --- 
a/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs +++ b/sig/datadog/core/telemetry/event/synth_app_client_configuration_change.rbs @@ -3,9 +3,13 @@ module Datadog module Telemetry module Event class SynthAppClientConfigurationChange < AppStarted - def type: () -> "app-client-configuration-change" + def type: -> ("app-client-configuration-change" | "app-started") - def payload: () -> { configuration: untyped } + def payload: () -> { ?products: untyped, configuration: untyped, ?install_signature: untyped } + + def reset?: -> bool + + def reset!: -> void end end end diff --git a/sig/datadog/core/telemetry/worker.rbs b/sig/datadog/core/telemetry/worker.rbs index 4db805e7221..06b869a3d1b 100644 --- a/sig/datadog/core/telemetry/worker.rbs +++ b/sig/datadog/core/telemetry/worker.rbs @@ -22,10 +22,11 @@ module Datadog @logger: ::Logger attr_reader logger: ::Logger - attr_reader initial_event: Telemetry::Event::Base + attr_reader initial_event: Telemetry::Event::AppStarted attr_reader initial_event_once: Datadog::Core::Utils::OnlyOnceSuccessful def initialize: (?enabled: bool, heartbeat_interval_seconds: Float, metrics_aggregation_interval_seconds: Float, emitter: Emitter, metrics_manager: MetricsManager, ?shutdown_timeout: Float | Integer, ?buffer_size: Integer, dependency_collection: bool, logger: ::Logger) -> void + def initialize_state: -> void def start: (Telemetry::Event::Base initial_event) -> void diff --git a/sig/datadog/core/utils/only_once_successful.rbs b/sig/datadog/core/utils/only_once_successful.rbs index 2236b5a66b5..bf43c172d05 100644 --- a/sig/datadog/core/utils/only_once_successful.rbs +++ b/sig/datadog/core/utils/only_once_successful.rbs @@ -8,15 +8,15 @@ module Datadog def initialize: (?Integer limit) -> void - def success?: () -> bool + def success?: -> bool - def failed?: () -> bool + def failed?: -> bool private - def check_limit!: () -> void + def check_limit!: -> void - def limited?: () -> bool + def limited?: -> bool end 
end end diff --git a/spec/datadog/core/telemetry/component_spec.rb b/spec/datadog/core/telemetry/component_spec.rb index b32c5ab9a7a..e2e65f112f2 100644 --- a/spec/datadog/core/telemetry/component_spec.rb +++ b/spec/datadog/core/telemetry/component_spec.rb @@ -365,7 +365,7 @@ event = instance_double(Datadog::Core::Telemetry::Event::Log) telemetry.log!(event) - expect(worker).not_to have_received(:enqueue) + expect(worker).to have_received(:enqueue).with(event) end end end diff --git a/spec/datadog/core/telemetry/integration/full_integration_spec.rb b/spec/datadog/core/telemetry/integration/full_integration_spec.rb index 2b1fbb54830..8bd6f55bef6 100644 --- a/spec/datadog/core/telemetry/integration/full_integration_spec.rb +++ b/spec/datadog/core/telemetry/integration/full_integration_spec.rb @@ -5,6 +5,8 @@ require 'datadog/core/telemetry/component' RSpec.describe 'Telemetry full integration tests' do + skip_unless_integration_testing_enabled + context 'when Datadog.configure is used' do let(:worker1) do double(Datadog::Core::Telemetry::Worker) diff --git a/spec/datadog/core/telemetry/integration/telemetry_spec.rb b/spec/datadog/core/telemetry/integration/telemetry_spec.rb index fdc640605fe..1793b4760ae 100644 --- a/spec/datadog/core/telemetry/integration/telemetry_spec.rb +++ b/spec/datadog/core/telemetry/integration/telemetry_spec.rb @@ -5,6 +5,8 @@ require 'datadog/core/telemetry/component' RSpec.describe 'Telemetry integration tests' do + skip_unless_integration_testing_enabled + # Although the tests override the environment variables, if any, # with programmatic configuration, that may produce warnings from the # configuration code. Remove environment variables to suppress the warnings. @@ -467,4 +469,152 @@ # Network I/O is mocked end end + + context 'when process forks' do + skip_unless_fork_supported + + # The mode is irrelevant but we need settings from the context. 
+ include_context 'agent mode' + + context 'when telemetry is disabled' do + before do + settings.telemetry.enabled = false + end + + it 'stays disabled in child process' do + expect(component.enabled?).to be false + expect(component.worker).to be nil + + expect_in_fork do + expect(component.enabled?).to be false + expect(component.worker).to be nil + end + end + end + + context 'when telemetry is enabled' do + before do + settings.telemetry.enabled = true + end + + it 'stays enabled in child process' do + expect(component.enabled?).to be true + expect(component.worker).to be_a(Datadog::Core::Telemetry::Worker) + expect(component.worker.enabled?).to be true + + expect_in_fork do + expect(component.enabled?).to be true + expect(component.worker.enabled?).to be true + end + end + + context 'when worker is running' do + let(:initial_event) do + Datadog::Core::Telemetry::Event::AppStarted.new(components: Datadog.send(:components)) + end + + before do + component.worker.start(initial_event) + end + + it 'restarts worker when event is enqueued' do + expect(component.enabled?).to be true + expect(component.worker).to be_a(Datadog::Core::Telemetry::Worker) + expect(component.worker.enabled?).to be true + expect(component.worker.running?).to be true + + expect_in_fork do + expect(component.enabled?).to be true + expect(component.worker.enabled?).to be true + expect(component.worker.running?).to be false + + # Queueing an event will restart the worker in the forked child. + component.worker.enqueue(Datadog::Core::Telemetry::Event::AppHeartbeat.new) + + expect(component.worker.running?).to be true + end + end + end + + describe 'events generated in forked child' do + # Behavior in the child should be the same regardless of what + # was sent in the parent, because the child is a new application + # (process) from the backend's perspective. 
+ def fork_and_assert + sent_payloads.clear + + expect_in_fork do + component.worker.enqueue(Datadog::Core::Telemetry::Event::AppHeartbeat.new) + + component.flush + end + + expect(sent_payloads.length).to eq 3 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-started', + ) + payload = sent_payloads[1].fetch(:payload) + # The app-dependencies-loaded assertion is also critical here, + # since there is no other test coverage for the + # app-dependencies-loaded event being sent in the forked child. + expect(payload).to include( + 'request_type' => 'app-dependencies-loaded', + ) + payload = sent_payloads[2].fetch(:payload) + expect(payload).to include( + 'request_type' => 'message-batch', + ) + expect(payload.fetch('payload').first).to include( + 'request_type' => 'app-heartbeat', + ) + end + + context 'when initial event is SynthAppClientConfigurationChange' do + let(:initial_event) do + Datadog::Core::Telemetry::Event::SynthAppClientConfigurationChange.new(components: Datadog.send(:components)) + end + + it 'produces correct events in the child' do + component.worker.start(initial_event) + component.flush + + expect(sent_payloads.length).to eq 1 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-client-configuration-change', + ) + + fork_and_assert + end + end + + context 'when initial event is AppStarted' do + let(:initial_event) do + Datadog::Core::Telemetry::Event::AppStarted.new(components: Datadog.send(:components)) + end + + it 'produces correct events in the child' do + component.worker.start(initial_event) + component.flush + + expect(sent_payloads.length).to eq 2 + + payload = sent_payloads[0].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-started', + ) + payload = sent_payloads[1].fetch(:payload) + expect(payload).to include( + 'request_type' => 'app-dependencies-loaded', + ) + + fork_and_assert + end + end + end + end + end end 
diff --git a/spec/support/core_helpers.rb b/spec/support/core_helpers.rb index 5caca383079..1757d031f2d 100644 --- a/spec/support/core_helpers.rb +++ b/spec/support/core_helpers.rb @@ -71,6 +71,14 @@ def skip_unless_integration_testing_enabled end end + def skip_unless_fork_supported + unless Process.respond_to?(:fork) + before(:all) do + skip 'Fork is not supported on current platform' + end + end + end + # Positional and keyword arguments are both accepted to make the method # work on Ruby 2.5/2.6 and 2.7+. In practice only one type of arguments # should be used in any given call.