209 changes: 180 additions & 29 deletions litellm/integrations/opentelemetry.py
@@ -227,8 +227,9 @@ def _init_metrics(self, meter_provider):
            PeriodicExportingMetricReader,
        )

        normalized_endpoint = self._normalize_otel_endpoint(self.config.endpoint, 'metrics')
        _metric_exporter = OTLPMetricExporter(
            endpoint=self.config.endpoint,
            endpoint=normalized_endpoint,
            headers=OpenTelemetry._get_headers_dictionary(self.config.headers),
            preferred_temporality={Histogram: AggregationTemporality.DELTA},
        )
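For reference, a minimal sketch (not part of this diff) of how a signal-normalized metrics endpoint feeds an OTLP metric pipeline with delta histogram temporality; the collector URL is a placeholder assumption, not LiteLLM's exact initialization code:

from opentelemetry.sdk.metrics import Histogram, MeterProvider
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    PeriodicExportingMetricReader,
)
from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

# e.g. an endpoint originally set to ".../v1/traces", normalized to ".../v1/metrics"
normalized_endpoint = "http://collector:4318/v1/metrics"  # placeholder value

exporter = OTLPMetricExporter(
    endpoint=normalized_endpoint,
    # delta temporality for histograms, matching the configuration in the diff
    preferred_temporality={Histogram: AggregationTemporality.DELTA},
)
reader = PeriodicExportingMetricReader(exporter)
meter_provider = MeterProvider(metric_readers=[reader])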
@@ -268,22 +269,20 @@ def _init_logs(self, logger_provider):
            return

        from opentelemetry._logs import set_logger_provider
        from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
        from opentelemetry.sdk._logs import LoggerProvider as OTLoggerProvider
        from opentelemetry.sdk._logs.export import BatchLogRecordProcessor

        # set up log pipeline
        if logger_provider is None:
            logger_provider = OTLoggerProvider()
            litellm_resource = _get_litellm_resource()
            logger_provider = OTLoggerProvider(resource=litellm_resource)
            # Only add OTLP exporter if we created the logger provider ourselves
            logger_provider.add_log_record_processor(
                BatchLogRecordProcessor(
                    OTLPLogExporter(
                        endpoint=self.config.endpoint,
                        headers=self._get_headers_dictionary(self.config.headers),
                    )
            log_exporter = self._get_log_exporter()
            if log_exporter:
                logger_provider.add_log_record_processor(
                    BatchLogRecordProcessor(log_exporter)  # type: ignore[arg-type]
                )
                )

        set_logger_provider(logger_provider)

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
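As context for the _init_logs change, a minimal sketch of the log pipeline it builds when no logger_provider is supplied; the endpoint and "service.name" below are placeholder assumptions standing in for self.config.endpoint and _get_litellm_resource():

from opentelemetry._logs import set_logger_provider
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.resources import Resource

# stand-in for _get_litellm_resource()
resource = Resource.create({"service.name": "litellm"})
logger_provider = LoggerProvider(resource=resource)

# stand-in for the exporter returned by _get_log_exporter()
exporter = OTLPLogExporter(endpoint="http://collector:4318/v1/logs")
logger_provider.add_log_record_processor(BatchLogRecordProcessor(exporter))

# make the provider discoverable via get_logger() / get_logger_provider()
set_logger_provider(logger_provider)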
Expand Down Expand Up @@ -658,10 +657,15 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
        if not self.config.enable_events:
            return

        from opentelemetry._logs import LogRecord, get_logger
        from opentelemetry._logs import SeverityNumber, get_logger, get_logger_provider
        from opentelemetry.sdk._logs import LogRecord as SdkLogRecord

        otel_logger = get_logger(LITELLM_LOGGER_NAME)

        # Get the resource from the logger provider
        logger_provider = get_logger_provider()
        resource = getattr(logger_provider, '_resource', None) or _get_litellm_resource()

        parent_ctx = span.get_span_context()
        provider = (kwargs.get("litellm_params") or {}).get(
            "custom_llm_provider", "Unknown"
@@ -676,15 +680,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
            if self.message_logging and msg.get("content"):
                attrs["gen_ai.prompt"] = msg["content"]

            otel_logger.emit(
                LogRecord(
                    attributes=attrs,
                    body=msg.copy(),
                    trace_id=parent_ctx.trace_id,
                    span_id=parent_ctx.span_id,
                    trace_flags=parent_ctx.trace_flags,
                )
            log_record = SdkLogRecord(
                timestamp=self._to_ns(datetime.now()),
                trace_id=parent_ctx.trace_id,
                span_id=parent_ctx.span_id,
                trace_flags=parent_ctx.trace_flags,
                severity_number=SeverityNumber.INFO,
                severity_text="INFO",
                body=msg.copy(),
                resource=resource,
                attributes=attrs,
            )
            otel_logger.emit(log_record)

        # per-choice events
        for idx, choice in enumerate(response_obj.get("choices", [])):
Expand All @@ -705,15 +712,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
            if self.message_logging and body_msg.get("content"):
                body["message"]["content"] = body_msg["content"]

            otel_logger.emit(
                LogRecord(
                    attributes=attrs,
                    body=body,
                    trace_id=parent_ctx.trace_id,
                    span_id=parent_ctx.span_id,
                    trace_flags=parent_ctx.trace_flags,
                )
            log_record = SdkLogRecord(
                timestamp=self._to_ns(datetime.now()),
                trace_id=parent_ctx.trace_id,
                span_id=parent_ctx.span_id,
                trace_flags=parent_ctx.trace_flags,
                severity_number=SeverityNumber.INFO,
                severity_text="INFO",
                body=body,
                resource=resource,
                attributes=attrs,
            )
            otel_logger.emit(log_record)

    def _create_guardrail_span(
        self, kwargs: Optional[dict], context: Optional[Context]
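The emit pattern above (used for both prompt and per-choice events) boils down to the sketch below: an SDK LogRecord carrying the active span's trace context, sent through the globally registered logger provider. The logger name, body, and attribute values here are illustrative assumptions:

import time

from opentelemetry import trace
from opentelemetry._logs import SeverityNumber, get_logger, get_logger_provider
from opentelemetry.sdk._logs import LogRecord as SdkLogRecord
from opentelemetry.sdk.resources import Resource

otel_logger = get_logger("litellm")  # stand-in for LITELLM_LOGGER_NAME
resource = getattr(get_logger_provider(), "_resource", None) or Resource.create({})

span_ctx = trace.get_current_span().get_span_context()
record = SdkLogRecord(
    timestamp=time.time_ns(),
    trace_id=span_ctx.trace_id,
    span_id=span_ctx.span_id,
    trace_flags=span_ctx.trace_flags,
    severity_number=SeverityNumber.INFO,
    severity_text="INFO",
    body={"role": "user", "content": "example prompt"},  # illustrative body
    resource=resource,
    attributes={"gen_ai.system": "openai"},  # illustrative attribute
)
otel_logger.emit(record)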
@@ -1292,19 +1302,21 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
"OpenTelemetry: intiializing http exporter. Value of OTEL_EXPORTER: %s",
self.OTEL_EXPORTER,
)
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
return BatchSpanProcessor(
OTLPSpanExporterHTTP(
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
endpoint=normalized_endpoint, headers=_split_otel_headers
),
)
elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
verbose_logger.debug(
"OpenTelemetry: intiializing grpc exporter. Value of OTEL_EXPORTER: %s",
self.OTEL_EXPORTER,
)
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
return BatchSpanProcessor(
OTLPSpanExporterGRPC(
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
endpoint=normalized_endpoint, headers=_split_otel_headers
),
)
else:
Expand All @@ -1314,6 +1326,145 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
            )
            return BatchSpanProcessor(ConsoleSpanExporter())

    def _get_log_exporter(self):
        """
        Get the appropriate log exporter based on the configuration.
        """
        verbose_logger.debug(
            "OpenTelemetry Logger, initializing log exporter \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s",
            self.OTEL_EXPORTER,
            self.OTEL_ENDPOINT,
            self.OTEL_HEADERS,
        )

        _split_otel_headers = OpenTelemetry._get_headers_dictionary(self.OTEL_HEADERS)

        # Normalize endpoint for logs - ensure it points to /v1/logs instead of /v1/traces
        normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'logs')

        verbose_logger.debug(
            "OpenTelemetry: Log endpoint normalized from %s to %s",
            self.OTEL_ENDPOINT,
            normalized_endpoint,
        )

        if hasattr(self.OTEL_EXPORTER, "export"):
            # Custom exporter provided
            verbose_logger.debug(
                "OpenTelemetry: Using custom log exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return self.OTEL_EXPORTER

        if self.OTEL_EXPORTER == "console":
            from opentelemetry.sdk._logs.export import ConsoleLogExporter
            verbose_logger.debug(
                "OpenTelemetry: Using console log exporter. Value of OTEL_EXPORTER: %s",
                self.OTEL_EXPORTER,
            )
            return ConsoleLogExporter()
        elif (
            self.OTEL_EXPORTER == "otlp_http"
            or self.OTEL_EXPORTER == "http/protobuf"
            or self.OTEL_EXPORTER == "http/json"
        ):
            from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
            verbose_logger.debug(
                "OpenTelemetry: Using HTTP log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
                self.OTEL_EXPORTER,
                normalized_endpoint,
            )
            return OTLPLogExporter(
                endpoint=normalized_endpoint, headers=_split_otel_headers
            )
        elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
            from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
            verbose_logger.debug(
                "OpenTelemetry: Using gRPC log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
                self.OTEL_EXPORTER,
                normalized_endpoint,
            )
            return OTLPLogExporter(
                endpoint=normalized_endpoint, headers=_split_otel_headers
            )
        else:
            verbose_logger.warning(
                "OpenTelemetry: Unknown log exporter '%s', defaulting to console. Supported: console, otlp_http, otlp_grpc",
                self.OTEL_EXPORTER,
            )
            from opentelemetry.sdk._logs.export import ConsoleLogExporter
            return ConsoleLogExporter()
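The hasattr(self.OTEL_EXPORTER, "export") branch means any object exposing an export() method is accepted as a custom log exporter. A hypothetical in-memory exporter, purely for illustration:

from typing import List, Sequence

from opentelemetry.sdk._logs import LogData
from opentelemetry.sdk._logs.export import LogExporter, LogExportResult


class InMemoryLogExporter(LogExporter):
    """Collects emitted log records in memory (hypothetical example)."""

    def __init__(self) -> None:
        self.records: List[LogData] = []

    def export(self, batch: Sequence[LogData]) -> LogExportResult:
        self.records.extend(batch)
        return LogExportResult.SUCCESS

    def force_flush(self, timeout_millis: int = 30_000) -> bool:
        return True

    def shutdown(self) -> None:
        pass

Passing such an object as the configured exporter would route log records through it instead of the OTLP or console exporters.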

    def _normalize_otel_endpoint(
        self,
        endpoint: Optional[str],
        signal_type: str
    ) -> Optional[str]:
        """
        Normalize the endpoint URL for a specific OpenTelemetry signal type.

        The OTLP exporters expect endpoints to use signal-specific paths:
        - traces: /v1/traces
        - metrics: /v1/metrics
        - logs: /v1/logs

        This method ensures the endpoint has the correct path for the given signal type.

        Args:
            endpoint: The endpoint URL to normalize
            signal_type: The telemetry signal type ('traces', 'metrics', or 'logs')

        Returns:
            Normalized endpoint URL with the correct signal path

        Examples:
            _normalize_otel_endpoint("http://collector:4318/v1/traces", "logs")
            -> "http://collector:4318/v1/logs"

            _normalize_otel_endpoint("http://collector:4318", "traces")
            -> "http://collector:4318/v1/traces"

            _normalize_otel_endpoint("http://collector:4318/v1/logs", "metrics")
            -> "http://collector:4318/v1/metrics"
        """
        if not endpoint:
            return endpoint

        # Validate signal_type
        valid_signals = {'traces', 'metrics', 'logs'}
        if signal_type not in valid_signals:
            verbose_logger.warning(
                "Invalid signal_type '%s' provided to _normalize_otel_endpoint. "
                "Valid values: %s. Returning endpoint unchanged.",
                signal_type,
                valid_signals
            )
            return endpoint

        # Remove trailing slash
        endpoint = endpoint.rstrip('/')

        # Check if endpoint already ends with the correct signal path
        target_path = f'/v1/{signal_type}'
        if endpoint.endswith(target_path):
            return endpoint

        # Replace existing signal path with the target signal path
        other_signals = valid_signals - {signal_type}
        for other_signal in other_signals:
            other_path = f'/v1/{other_signal}'
            if endpoint.endswith(other_path):
                endpoint = endpoint.rsplit('/', 1)[0] + f'/{signal_type}'
                return endpoint

        # No existing signal path found, append the target path
        if not endpoint.endswith('/v1'):
            endpoint = endpoint + target_path
        else:
            endpoint = endpoint + f'/{signal_type}'

        return endpoint
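A standalone restatement of the normalization rules (assumed to mirror _normalize_otel_endpoint; it is not the method itself), with the docstring examples as executable checks:

def normalize_endpoint(endpoint: str, signal_type: str) -> str:
    endpoint = endpoint.rstrip("/")
    target = f"/v1/{signal_type}"
    if endpoint.endswith(target):
        return endpoint
    for other in {"traces", "metrics", "logs"} - {signal_type}:
        if endpoint.endswith(f"/v1/{other}"):
            return endpoint.rsplit("/", 1)[0] + f"/{signal_type}"
    return endpoint + (f"/{signal_type}" if endpoint.endswith("/v1") else target)


assert normalize_endpoint("http://collector:4318/v1/traces", "logs") == "http://collector:4318/v1/logs"
assert normalize_endpoint("http://collector:4318", "traces") == "http://collector:4318/v1/traces"
assert normalize_endpoint("http://collector:4318/v1/logs", "metrics") == "http://collector:4318/v1/metrics"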

    @staticmethod
    def _get_headers_dictionary(headers: Optional[Union[str, dict]]) -> Dict[str, str]:
        """