Merged
5 changes: 5 additions & 0 deletions README.md
@@ -44,6 +44,9 @@ Langfuse.configure do |config|
# Optional: for self-hosted instances
config.base_url = ENV.fetch('LANGFUSE_BASE_URL', 'https://cloud.langfuse.com')

# Optional: sample traces and trace-linked scores deterministically
config.sample_rate = 1.0

# Optional: Enable stale-while-revalidate for best performance
config.cache_backend = :rails # or :memory
config.cache_stale_while_revalidate = true
@@ -52,6 +55,8 @@ end

> Langfuse tracing is isolated by default. `Langfuse.configure` stores configuration only; it does not replace `OpenTelemetry.tracer_provider`.

> `sample_rate` is applied to traces and trace-linked scores. Rebuild the client with `Langfuse.reset!` before expecting runtime sampling changes to take effect.

> Fetch and use a prompt

```ruby
1 change: 1 addition & 0 deletions docs/API_REFERENCE.md
@@ -49,6 +49,7 @@ Block receives a configuration object with these properties:
| `cache_refresh_threads` | Integer | No | `5` | Background refresh threads |
| `batch_size` | Integer | No | `50` | Score + trace export batch size |
| `flush_interval` | Integer | No | `10` | Score + trace export interval (s) |
| `sample_rate` | Float | No | `1.0` | Trace + trace-linked score sampling rate (`0.0..1.0`) |
| `logger` | Logger | No | Auto-detected | Logger instance |
| `tracing_async` | Boolean | No | `true` | ⚠️ Experimental (OTel export mode) |
| `job_queue` | Symbol | No | `:default` | ⚠️ Experimental (not implemented) |
21 changes: 21 additions & 0 deletions docs/CONFIGURATION.md
@@ -215,6 +215,22 @@ Used by scoring API and OpenTelemetry tracing export. See [SCORING.md](SCORING.m
config.flush_interval = 5 # Flush more frequently
```

#### `sample_rate`

- **Type:** Float (`0.0..1.0`)
- **Default:** `1.0`
- **Description:** Deterministic sampling rate for traces and trace-linked scores, based on trace ID

```ruby
config.sample_rate = 0.1 # Sample ~10% of traces
```

`0.0` drops all traces; `1.0` preserves the current always-on behavior.
Trace-linked scores use the same `sample_rate` decision so the SDK does not create orphaned scores for sampled-out traces.
Session-only and dataset-run-only scores are still sent because they are not tied to a sampled trace.
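
To illustrate what "deterministic, based on trace ID" means, here is a minimal sketch of a ratio check on a trace ID. The helper name `sampled_by_trace_id?` is hypothetical and not part of the SDK, and the arithmetic is a simplified assumption about how a trace-ID-ratio sampler can work; the OpenTelemetry sampler the SDK builds may differ in detail.

```ruby
# Hypothetical helper for illustration only; not part of the Langfuse SDK.
# Assumes a lowercase 32-character hex trace ID.
def sampled_by_trace_id?(trace_id_hex, sample_rate)
  return true if sample_rate >= 1.0
  return false if sample_rate <= 0.0

  # Read the last 8 bytes of the 16-byte trace ID as an unsigned 64-bit integer.
  low_bits = [trace_id_hex].pack("H*")[8, 8].unpack1("Q>")

  # Keep the trace when that integer falls below the rate-scaled upper bound.
  low_bits < (sample_rate * ((2**64) - 1)).ceil
end

trace_id = "0af7651916cd43dd8448eb211c80319c"
first = sampled_by_trace_id?(trace_id, 0.1)
second = sampled_by_trace_id?(trace_id, 0.1)
puts first == second # the decision depends only on the trace ID and the rate
```

Because the outcome is a pure function of the trace ID and the rate, a trace and any score carrying the same trace ID reach the same decision without shared state.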

For Ruby client instances, `sample_rate` is snapshotted when the client is built. Changing `config.sample_rate` later does not update that client's score sampler or the already-initialized trace sampler. Rebuild the client with `Langfuse.reset!` when changing sampling behavior.

#### `logger`

- **Type:** Logger
@@ -348,13 +364,16 @@ After the first successful tracing initialization, these settings require `Langf
- `base_url`
- `environment`
- `release`
- `sample_rate`
- `should_export_span`
- `tracing_async`
- `batch_size`
- `flush_interval`

That includes processor tuning. Changing `batch_size` or `flush_interval` after tracing is already live will not rebuild the exporter pipeline until reset.

The singleton client follows the same rule for score sampling: once `Langfuse.client` has been built, changing `sample_rate` on `Langfuse.configuration` does not change that client's trace-linked score sampling. Call `Langfuse.reset!`, configure again, and then rebuild the client.
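
A minimal sketch of that sequence, assuming credentials come from the environment variables the SDK reads by default. The `0.25` rate is only an example value.

```ruby
# Reset first: per the rule above, an already-built client keeps its old sample_rate.
Langfuse.reset!

# Configure again with the new sampling rate (example value).
Langfuse.configure do |config|
  config.sample_rate = 0.25
end

# Build the client again; it snapshots sample_rate at this point.
client = Langfuse.client
```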

Performance note for `should_export_span`:

- It runs synchronously on every ended span in the application thread
@@ -369,6 +388,7 @@ The SDK automatically reads these environment variables as defaults when no expl
- `LANGFUSE_BASE_URL` — API endpoint (defaults to `https://cloud.langfuse.com`)
- `LANGFUSE_TRACING_ENVIRONMENT` — default trace environment
- `LANGFUSE_RELEASE` — default release identifier (falls back to common CI commit envs if present)
- `LANGFUSE_SAMPLE_RATE` — trace sampling rate (`0.0..1.0`, defaults to `1.0`)
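
Environment variables arrive as strings, so the value has to be converted and range-checked before use. Below is a minimal sketch of that coercion, following the rules this change applies to `sample_rate`. `coerce_rate` is a hypothetical name, and `ArgumentError` stands in for the SDK's `ConfigurationError`.

```ruby
# Hypothetical stand-alone helper; the SDK raises ConfigurationError instead.
def coerce_rate(value)
  rate =
    case value
    when Numeric then value.to_f
    when String then Float(value) # raises ArgumentError for non-numeric text
    else raise ArgumentError, "sample_rate must be numeric"
    end

  return rate if rate.between?(0.0, 1.0)

  raise ArgumentError, "sample_rate must be between 0.0 and 1.0"
end

puts coerce_rate("0.25") # a string from the environment becomes a Float
puts coerce_rate(1)      # integers are widened to Float
```

Failing at assignment time means a malformed `LANGFUSE_SAMPLE_RATE` surfaces when the configuration is built rather than later during export.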

Explicit configuration always takes precedence:

@@ -386,6 +406,7 @@ end
LANGFUSE_PUBLIC_KEY=pk-lf-...
LANGFUSE_SECRET_KEY=sk-lf-...
LANGFUSE_BASE_URL=https://cloud.langfuse.com # Optional
LANGFUSE_SAMPLE_RATE=0.25 # Optional
```

## Rails-Specific Configuration
17 changes: 17 additions & 0 deletions docs/SCORING.md
@@ -358,6 +358,23 @@ Use before shutdown:
Langfuse.flush_scores
```

## Sampling Behavior

Trace-linked scores follow the same deterministic `sample_rate` decision as traces.

- If a trace is sampled out, scores with that lowercase 32-hex `trace_id` are dropped.
- Scores without `trace_id` (for example, session-only or dataset-run-only) are not sampled out.
- Legacy or otherwise invalid trace IDs are treated as in-sample for backwards compatibility. This matches `langfuse-python`: only lowercase 32-character hex trace IDs participate in sampling, while uppercase or custom IDs are treated as legacy.
- Sampling is decided by the Langfuse client's `sample_rate` alone. An active span's OpenTelemetry trace flags do not override the decision. If you run Langfuse inside a host OTel tracer with its own sampler, that tracer's flags will not steer Langfuse score emission.
- Score sampling is scoped to the client that creates the score. Another Langfuse client or global OpenTelemetry provider in the same process does not change that client's score sampling.
- The client snapshots `sample_rate` when it is built. If you mutate `config.sample_rate` afterward, call `Langfuse.reset!` and rebuild the client before expecting different trace or score sampling.

```ruby
Langfuse.configure do |config|
config.sample_rate = 0.1
end
```
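
The rules listed in this section can be read as a short decision function. The sketch below is a simplified illustration, not the SDK's implementation: `enqueue_score?` is a hypothetical name, and the sampler is modelled as any callable that takes a trace ID and returns true or false.

```ruby
# Hypothetical helper; `sampler` is nil (no sampling) or a callable returning true/false.
def enqueue_score?(trace_id, sampler)
  # Session-only or dataset-run-only scores carry no trace ID and are always sent.
  return true if trace_id.nil?
  # Only lowercase 32-character hex IDs participate; anything else is treated as in-sample.
  return true unless /\A[0-9a-f]{32}\z/.match?(trace_id)
  # No sampler means sample_rate is 1.0, so everything is kept.
  return true if sampler.nil?

  sampler.call(trace_id)
end

drop_all = ->(_trace_id) { false } # stand-in for a sampler at sample_rate 0.0

puts enqueue_score?(nil, drop_all)
puts enqueue_score?("ABCDEF0123456789ABCDEF0123456789", drop_all)
puts enqueue_score?("abcdef0123456789abcdef0123456789", drop_all)
```

Only the last call consults the sampler; the first two bypass it because the score either has no trace ID or has one that does not match the lowercase hex format.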

## Getting Trace/Observation IDs

### From Observation Object
1 change: 1 addition & 0 deletions lib/langfuse.rb
@@ -45,6 +45,7 @@ class UnauthorizedError < ApiError; end
require_relative "langfuse/cache_warmer"
require_relative "langfuse/api_client"
require_relative "langfuse/span_filter"
require_relative "langfuse/sampling"
require_relative "langfuse/otel_setup"
require_relative "langfuse/masking"
require_relative "langfuse/otel_attributes"
55 changes: 47 additions & 8 deletions lib/langfuse/config.rb
@@ -75,6 +75,9 @@ class Config
# @return [String, nil] Default release identifier applied to new traces/observations
attr_accessor :release

# @return [Float] Trace sampling rate from 0.0 to 1.0
attr_reader :sample_rate

# @return [#call, nil] Callback that decides whether an ended span should export to Langfuse.
attr_accessor :should_export_span

@@ -118,6 +121,9 @@ class Config
# @return [Symbol] Default ActiveJob queue name
DEFAULT_JOB_QUEUE = :default

# @return [Float] Default trace sampling rate (sample all traces)
DEFAULT_SAMPLE_RATE = 1.0

# @return [Integer] Number of seconds representing indefinite cache duration (~1000 years)
INDEFINITE_SECONDS = 1000 * 365 * 24 * 60 * 60

@@ -140,7 +146,6 @@ class Config
# @yield [config] Optional block for configuration
# @yieldparam config [Config] The config instance
# @return [Config] a new Config instance
# rubocop:disable Metrics/AbcSize
def initialize
@public_key = ENV.fetch("LANGFUSE_PUBLIC_KEY", nil)
@secret_key = ENV.fetch("LANGFUSE_SECRET_KEY", nil)
@@ -157,15 +162,11 @@ def initialize
@batch_size = DEFAULT_BATCH_SIZE
@flush_interval = DEFAULT_FLUSH_INTERVAL
@job_queue = DEFAULT_JOB_QUEUE
@environment = env_value("LANGFUSE_TRACING_ENVIRONMENT")
@release = env_value("LANGFUSE_RELEASE") || detect_release_from_ci_env
@should_export_span = nil
@mask = nil
initialize_tracing_defaults
@logger = default_logger

yield(self) if block_given?
end
# rubocop:enable Metrics/AbcSize

# Validate the configuration
#
@@ -188,9 +189,8 @@ def validate!
validate_swr_config!

validate_cache_backend!

validate_sample_rate!
validate_should_export_span!

validate_mask!
end
# rubocop:enable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity
@@ -212,6 +212,15 @@ def normalized_stale_ttl
cache_stale_ttl == :indefinite ? INDEFINITE_SECONDS : cache_stale_ttl
end

# Set trace sampling rate.
#
# @param value [Numeric, String] Sampling rate from 0.0 to 1.0
# @raise [ConfigurationError] if value is non-numeric or outside 0.0..1.0
# @return [Float]
def sample_rate=(value)
@sample_rate = coerce_sample_rate(value)
end

private

def default_logger
@@ -222,6 +231,14 @@ def default_logger
end
end

def initialize_tracing_defaults
@environment = env_value("LANGFUSE_TRACING_ENVIRONMENT")
@release = env_value("LANGFUSE_RELEASE") || detect_release_from_ci_env
self.sample_rate = env_value("LANGFUSE_SAMPLE_RATE") || DEFAULT_SAMPLE_RATE
@should_export_span = nil
@mask = nil
end

def validate_cache_backend!
valid_backends = %i[memory rails]
return if valid_backends.include?(cache_backend)
@@ -262,6 +279,12 @@ def validate_refresh_threads!
raise ConfigurationError, "cache_refresh_threads must be positive"
end

def validate_sample_rate!
return if sample_rate.is_a?(Numeric) && sample_rate.between?(0.0, 1.0)

raise ConfigurationError, "sample_rate must be between 0.0 and 1.0"
end

def validate_mask!
return if mask.nil? || mask.respond_to?(:call)

@@ -289,6 +312,22 @@ def env_value(key)

value
end

def coerce_sample_rate(value)
numeric_value = if value.is_a?(Numeric)
value.to_f
elsif value.is_a?(String)
Float(value)
else
raise ConfigurationError, "sample_rate must be numeric"
end

return numeric_value if numeric_value.between?(0.0, 1.0)

raise ConfigurationError, "sample_rate must be between 0.0 and 1.0"
rescue ArgumentError, TypeError
raise ConfigurationError, "sample_rate must be numeric"
end
end
# rubocop:enable Metrics/ClassLength
end
9 changes: 8 additions & 1 deletion lib/langfuse/otel_setup.rb
@@ -14,6 +14,7 @@ module OtelSetup
base_url
environment
release
sample_rate
should_export_span
tracing_async
batch_size
@@ -126,7 +127,9 @@ def rollback_provider(provider)
end

def build_tracer_provider(config)
provider = OpenTelemetry::SDK::Trace::TracerProvider.new
provider = OpenTelemetry::SDK::Trace::TracerProvider.new(
sampler: build_sampler(config.sample_rate)
)
provider.add_span_processor(
SpanProcessor.new(config: config, exporter: build_exporter(config))
)
@@ -172,6 +175,10 @@ def build_headers(public_key, secret_key)
encoded = Base64.strict_encode64(credentials)
{ "Authorization" => "Basic #{encoded}" }
end

def build_sampler(sample_rate)
Sampling.build_sampler(sample_rate) || OpenTelemetry::SDK::Trace::Samplers::ALWAYS_ON
end
end
end
# rubocop:enable Metrics/ModuleLength
20 changes: 20 additions & 0 deletions lib/langfuse/sampling.rb
@@ -0,0 +1,20 @@
# frozen_string_literal: true

module Langfuse
  # Shared sampling helpers for trace and score emission.
  #
  # @api private
  module Sampling
    module_function

    # Build the sampler used by both trace export and trace-linked score emission.
    #
    # @param sample_rate [Float] Sampling rate from 0.0 to 1.0
    # @return [OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased, nil]
    def build_sampler(sample_rate)
      return nil if sample_rate >= 1.0

      OpenTelemetry::SDK::Trace::Samplers::TraceIdRatioBased.new(sample_rate)
    end
  end
end
61 changes: 43 additions & 18 deletions lib/langfuse/score_client.rb
@@ -31,6 +31,8 @@ class ScoreClient
# @return [Logger] Logger instance
attr_reader :logger

HEX_TRACE_ID_PATTERN = /\A[0-9a-f]{32}\z/

# Initialize a new ScoreClient
#
# @param api_client [ApiClient] The API client for sending batches
@@ -43,6 +45,9 @@ def initialize(api_client:, config:)
@mutex = Mutex.new
@flush_thread = nil
@shutdown = false
# Match the immutable tracing setup contract: once this client exists, later config
# mutations must not change score sampling without rebuilding the client.
@score_sampler = Sampling.build_sampler(config.sample_rate)

start_flush_timer
end
@@ -76,28 +81,19 @@ def initialize(api_client:, config:)
def create(name:, value:, id: nil, trace_id: nil, session_id: nil, observation_id: nil, comment: nil,
metadata: nil, environment: nil, data_type: :numeric, dataset_run_id: nil, config_id: nil)
validate_name(name)
# Keep identifier policy server-side to preserve cross-SDK parity and avoid blocking valid future payloads.
normalized_value = normalize_value(value, data_type)
data_type_str = Types::SCORE_DATA_TYPES[data_type] || raise(ArgumentError, "Invalid data_type: #{data_type}")

return unless enqueue_trace_linked_score?(trace_id)

event = build_score_event(
name: name,
value: normalized_value,
id: id,
trace_id: trace_id,
session_id: session_id,
observation_id: observation_id,
comment: comment,
metadata: metadata,
environment: environment,
data_type: data_type_str,
dataset_run_id: dataset_run_id,
config_id: config_id
name: name, value: normalized_value, id: id, trace_id: trace_id,
session_id: session_id, observation_id: observation_id, comment: comment,
metadata: metadata, environment: environment, data_type: data_type_str,
dataset_run_id: dataset_run_id, config_id: config_id
)

@queue << event

# Trigger flush if batch size reached
flush if @queue.size >= config.batch_size
rescue StandardError => e
logger.error("Langfuse score creation failed: #{e.message}")
@@ -294,14 +290,43 @@ def validate_name(name)
# @return [Hash] Hash with :trace_id and :observation_id (may be nil)
def extract_ids_from_active_span
span = OpenTelemetry::Trace.current_span
return { trace_id: nil, observation_id: nil } unless span&.recording?
span_context = span&.context
return { trace_id: nil, observation_id: nil } unless span_context&.valid?

{
trace_id: span.context.trace_id.unpack1("H*"),
observation_id: span.context.span_id.unpack1("H*")
trace_id: span_context.trace_id.unpack1("H*"),
observation_id: span_context.span_id.unpack1("H*")
}
end

# Score sampling is decided purely by the configured sampler on the trace_id hash,
# matching langfuse-python. Non-hex trace ids and session/dataset-only scores bypass sampling.
def enqueue_trace_linked_score?(trace_id)
return true if trace_id.nil?
return true unless HEX_TRACE_ID_PATTERN.match?(trace_id)

sampler = score_sampler
return true if sampler.nil?
return true unless sampler.respond_to?(:should_sample?)

sample_result = sampler.should_sample?(
trace_id: [trace_id].pack("H*"),
parent_context: nil,
links: [],
name: "score",
kind: OpenTelemetry::Trace::SpanKind::INTERNAL,
attributes: {}
)
sample_result.sampled?
rescue StandardError => e
logger.warn("Langfuse score sampling fallback for trace_id=#{trace_id}: #{e.message}")
true
end

# Sampler is pinned at ScoreClient construction to match the "sample_rate requires reset!"
# contract and to keep each client's sampling scoped to its own config.
attr_reader :score_sampler

# Send a batch of events to the API
#
# @param events [Array<Hash>] Array of event hashes