Skip to content

Commit 85f082a

Browse files
committed
use correct otel logger, and normalise otel paths
1 parent bb3b77c commit 85f082a

File tree

2 files changed

+654
-31
lines changed

2 files changed

+654
-31
lines changed

litellm/integrations/opentelemetry.py

Lines changed: 182 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,9 @@ def _init_metrics(self, meter_provider):
227227
PeriodicExportingMetricReader,
228228
)
229229

230+
normalized_endpoint = self._normalize_otel_endpoint(self.config.endpoint, 'metrics')
230231
_metric_exporter = OTLPMetricExporter(
231-
endpoint=self.config.endpoint,
232+
endpoint=normalized_endpoint,
232233
headers=OpenTelemetry._get_headers_dictionary(self.config.headers),
233234
preferred_temporality={Histogram: AggregationTemporality.DELTA},
234235
)
@@ -268,22 +269,20 @@ def _init_logs(self, logger_provider):
268269
return
269270

270271
from opentelemetry._logs import set_logger_provider
271-
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
272272
from opentelemetry.sdk._logs import LoggerProvider as OTLoggerProvider
273273
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
274274

275275
# set up log pipeline
276276
if logger_provider is None:
277-
logger_provider = OTLoggerProvider()
277+
litellm_resource = _get_litellm_resource()
278+
logger_provider = OTLoggerProvider(resource=litellm_resource)
278279
# Only add OTLP exporter if we created the logger provider ourselves
279-
logger_provider.add_log_record_processor(
280-
BatchLogRecordProcessor(
281-
OTLPLogExporter(
282-
endpoint=self.config.endpoint,
283-
headers=self._get_headers_dictionary(self.config.headers),
284-
)
280+
log_exporter = self._get_log_exporter()
281+
if log_exporter:
282+
logger_provider.add_log_record_processor(
283+
BatchLogRecordProcessor(log_exporter) # type: ignore[arg-type]
285284
)
286-
)
285+
287286
set_logger_provider(logger_provider)
288287

289288
def log_success_event(self, kwargs, response_obj, start_time, end_time):
@@ -543,7 +542,7 @@ def _handle_success(self, kwargs, response_obj, start_time, end_time):
543542
# 4. Metrics & cost recording
544543
self._record_metrics(kwargs, response_obj, start_time, end_time)
545544

546-
# 5. Semantic logs.
545+
# 5. Semantic logs.
547546
if self.config.enable_events:
548547
self._emit_semantic_logs(kwargs, response_obj, span)
549548

@@ -652,9 +651,15 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
652651
if not self.config.enable_events:
653652
return
654653

655-
from opentelemetry._logs import LogRecord, get_logger
654+
from opentelemetry._logs import SeverityNumber, get_logger, get_logger_provider
655+
from opentelemetry.sdk._logs import LogRecord as SdkLogRecord
656+
656657
otel_logger = get_logger(LITELLM_LOGGER_NAME)
657658

659+
# Get the resource from the logger provider
660+
logger_provider = get_logger_provider()
661+
resource = getattr(logger_provider, '_resource', None) or _get_litellm_resource()
662+
658663
parent_ctx = span.get_span_context()
659664
provider = (kwargs.get("litellm_params") or {}).get(
660665
"custom_llm_provider", "Unknown"
@@ -669,15 +674,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
669674
if self.message_logging and msg.get("content"):
670675
attrs["gen_ai.prompt"] = msg["content"]
671676

672-
otel_logger.emit(
673-
LogRecord(
674-
attributes=attrs,
675-
body=msg.copy(),
676-
trace_id=parent_ctx.trace_id,
677-
span_id=parent_ctx.span_id,
678-
trace_flags=parent_ctx.trace_flags,
679-
)
677+
log_record = SdkLogRecord(
678+
timestamp=self._to_ns(datetime.now()),
679+
trace_id=parent_ctx.trace_id,
680+
span_id=parent_ctx.span_id,
681+
trace_flags=parent_ctx.trace_flags,
682+
severity_number=SeverityNumber.INFO,
683+
severity_text="INFO",
684+
body=msg.copy(),
685+
resource=resource,
686+
attributes=attrs,
680687
)
688+
otel_logger.emit(log_record)
681689

682690
# per-choice events
683691
for idx, choice in enumerate(response_obj.get("choices", [])):
@@ -698,15 +706,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
698706
if self.message_logging and body_msg.get("content"):
699707
body["message"]["content"] = body_msg["content"]
700708

701-
otel_logger.emit(
702-
LogRecord(
703-
attributes=attrs,
704-
body=body,
705-
trace_id=parent_ctx.trace_id,
706-
span_id=parent_ctx.span_id,
707-
trace_flags=parent_ctx.trace_flags,
708-
)
709+
log_record = SdkLogRecord(
710+
timestamp=self._to_ns(datetime.now()),
711+
trace_id=parent_ctx.trace_id,
712+
span_id=parent_ctx.span_id,
713+
trace_flags=parent_ctx.trace_flags,
714+
severity_number=SeverityNumber.INFO,
715+
severity_text="INFO",
716+
body=body,
717+
resource=resource,
718+
attributes=attrs,
709719
)
720+
otel_logger.emit(log_record)
710721

711722

712723
def _create_guardrail_span(
@@ -1278,19 +1289,21 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
12781289
"OpenTelemetry: intiializing http exporter. Value of OTEL_EXPORTER: %s",
12791290
self.OTEL_EXPORTER,
12801291
)
1292+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
12811293
return BatchSpanProcessor(
12821294
OTLPSpanExporterHTTP(
1283-
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
1295+
endpoint=normalized_endpoint, headers=_split_otel_headers
12841296
),
12851297
)
12861298
elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
12871299
verbose_logger.debug(
12881300
"OpenTelemetry: intiializing grpc exporter. Value of OTEL_EXPORTER: %s",
12891301
self.OTEL_EXPORTER,
12901302
)
1303+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
12911304
return BatchSpanProcessor(
12921305
OTLPSpanExporterGRPC(
1293-
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
1306+
endpoint=normalized_endpoint, headers=_split_otel_headers
12941307
),
12951308
)
12961309
else:
@@ -1300,6 +1313,145 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
13001313
)
13011314
return BatchSpanProcessor(ConsoleSpanExporter())
13021315

1316+
def _get_log_exporter(self):
1317+
"""
1318+
Get the appropriate log exporter based on the configuration.
1319+
"""
1320+
verbose_logger.debug(
1321+
"OpenTelemetry Logger, initializing log exporter \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s",
1322+
self.OTEL_EXPORTER,
1323+
self.OTEL_ENDPOINT,
1324+
self.OTEL_HEADERS,
1325+
)
1326+
1327+
_split_otel_headers = OpenTelemetry._get_headers_dictionary(self.OTEL_HEADERS)
1328+
1329+
# Normalize endpoint for logs - ensure it points to /v1/logs instead of /v1/traces
1330+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'logs')
1331+
1332+
verbose_logger.debug(
1333+
"OpenTelemetry: Log endpoint normalized from %s to %s",
1334+
self.OTEL_ENDPOINT,
1335+
normalized_endpoint,
1336+
)
1337+
1338+
if hasattr(self.OTEL_EXPORTER, "export"):
1339+
# Custom exporter provided
1340+
verbose_logger.debug(
1341+
"OpenTelemetry: Using custom log exporter. Value of OTEL_EXPORTER: %s",
1342+
self.OTEL_EXPORTER,
1343+
)
1344+
return self.OTEL_EXPORTER
1345+
1346+
if self.OTEL_EXPORTER == "console":
1347+
from opentelemetry.sdk._logs.export import ConsoleLogExporter
1348+
verbose_logger.debug(
1349+
"OpenTelemetry: Using console log exporter. Value of OTEL_EXPORTER: %s",
1350+
self.OTEL_EXPORTER,
1351+
)
1352+
return ConsoleLogExporter()
1353+
elif (
1354+
self.OTEL_EXPORTER == "otlp_http"
1355+
or self.OTEL_EXPORTER == "http/protobuf"
1356+
or self.OTEL_EXPORTER == "http/json"
1357+
):
1358+
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
1359+
verbose_logger.debug(
1360+
"OpenTelemetry: Using HTTP log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
1361+
self.OTEL_EXPORTER,
1362+
normalized_endpoint,
1363+
)
1364+
return OTLPLogExporter(
1365+
endpoint=normalized_endpoint, headers=_split_otel_headers
1366+
)
1367+
elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
1368+
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
1369+
verbose_logger.debug(
1370+
"OpenTelemetry: Using gRPC log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
1371+
self.OTEL_EXPORTER,
1372+
normalized_endpoint,
1373+
)
1374+
return OTLPLogExporter(
1375+
endpoint=normalized_endpoint, headers=_split_otel_headers
1376+
)
1377+
else:
1378+
verbose_logger.warning(
1379+
"OpenTelemetry: Unknown log exporter '%s', defaulting to console. Supported: console, otlp_http, otlp_grpc",
1380+
self.OTEL_EXPORTER,
1381+
)
1382+
from opentelemetry.sdk._logs.export import ConsoleLogExporter
1383+
return ConsoleLogExporter()
1384+
1385+
def _normalize_otel_endpoint(
1386+
self,
1387+
endpoint: Optional[str],
1388+
signal_type: str
1389+
) -> Optional[str]:
1390+
"""
1391+
Normalize the endpoint URL for a specific OpenTelemetry signal type.
1392+
1393+
The OTLP exporters expect endpoints to use signal-specific paths:
1394+
- traces: /v1/traces
1395+
- metrics: /v1/metrics
1396+
- logs: /v1/logs
1397+
1398+
This method ensures the endpoint has the correct path for the given signal type.
1399+
1400+
Args:
1401+
endpoint: The endpoint URL to normalize
1402+
signal_type: The telemetry signal type ('traces', 'metrics', or 'logs')
1403+
1404+
Returns:
1405+
Normalized endpoint URL with the correct signal path
1406+
1407+
Examples:
1408+
_normalize_otel_endpoint("http://collector:4318/v1/traces", "logs")
1409+
-> "http://collector:4318/v1/logs"
1410+
1411+
_normalize_otel_endpoint("http://collector:4318", "traces")
1412+
-> "http://collector:4318/v1/traces"
1413+
1414+
_normalize_otel_endpoint("http://collector:4318/v1/logs", "metrics")
1415+
-> "http://collector:4318/v1/metrics"
1416+
"""
1417+
if not endpoint:
1418+
return endpoint
1419+
1420+
# Validate signal_type
1421+
valid_signals = {'traces', 'metrics', 'logs'}
1422+
if signal_type not in valid_signals:
1423+
verbose_logger.warning(
1424+
"Invalid signal_type '%s' provided to _normalize_otel_endpoint. "
1425+
"Valid values: %s. Returning endpoint unchanged.",
1426+
signal_type,
1427+
valid_signals
1428+
)
1429+
return endpoint
1430+
1431+
# Remove trailing slash
1432+
endpoint = endpoint.rstrip('/')
1433+
1434+
# Check if endpoint already ends with the correct signal path
1435+
target_path = f'/v1/{signal_type}'
1436+
if endpoint.endswith(target_path):
1437+
return endpoint
1438+
1439+
# Replace existing signal path with the target signal path
1440+
other_signals = valid_signals - {signal_type}
1441+
for other_signal in other_signals:
1442+
other_path = f'/v1/{other_signal}'
1443+
if endpoint.endswith(other_path):
1444+
endpoint = endpoint.rsplit('/', 1)[0] + f'/{signal_type}'
1445+
return endpoint
1446+
1447+
# No existing signal path found, append the target path
1448+
if not endpoint.endswith('/v1'):
1449+
endpoint = endpoint + target_path
1450+
else:
1451+
endpoint = endpoint + f'/{signal_type}'
1452+
1453+
return endpoint
1454+
13031455
@staticmethod
13041456
def _get_headers_dictionary(headers: Optional[Union[str, dict]]) -> Dict[str, str]:
13051457
"""

0 commit comments

Comments
 (0)