Skip to content

Commit 98f1d63

Browse files
authored
use correct otel logger, and normalise otel paths (#15645)
1 parent 8b522d8 commit 98f1d63

File tree

2 files changed

+652
-30
lines changed

2 files changed

+652
-30
lines changed

litellm/integrations/opentelemetry.py

Lines changed: 180 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,9 @@ def _init_metrics(self, meter_provider):
227227
PeriodicExportingMetricReader,
228228
)
229229

230+
normalized_endpoint = self._normalize_otel_endpoint(self.config.endpoint, 'metrics')
230231
_metric_exporter = OTLPMetricExporter(
231-
endpoint=self.config.endpoint,
232+
endpoint=normalized_endpoint,
232233
headers=OpenTelemetry._get_headers_dictionary(self.config.headers),
233234
preferred_temporality={Histogram: AggregationTemporality.DELTA},
234235
)
@@ -268,22 +269,20 @@ def _init_logs(self, logger_provider):
268269
return
269270

270271
from opentelemetry._logs import set_logger_provider
271-
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
272272
from opentelemetry.sdk._logs import LoggerProvider as OTLoggerProvider
273273
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
274274

275275
# set up log pipeline
276276
if logger_provider is None:
277-
logger_provider = OTLoggerProvider()
277+
litellm_resource = _get_litellm_resource()
278+
logger_provider = OTLoggerProvider(resource=litellm_resource)
278279
# Only add OTLP exporter if we created the logger provider ourselves
279-
logger_provider.add_log_record_processor(
280-
BatchLogRecordProcessor(
281-
OTLPLogExporter(
282-
endpoint=self.config.endpoint,
283-
headers=self._get_headers_dictionary(self.config.headers),
284-
)
280+
log_exporter = self._get_log_exporter()
281+
if log_exporter:
282+
logger_provider.add_log_record_processor(
283+
BatchLogRecordProcessor(log_exporter) # type: ignore[arg-type]
285284
)
286-
)
285+
287286
set_logger_provider(logger_provider)
288287

289288
def log_success_event(self, kwargs, response_obj, start_time, end_time):
@@ -658,10 +657,15 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
658657
if not self.config.enable_events:
659658
return
660659

661-
from opentelemetry._logs import LogRecord, get_logger
660+
from opentelemetry._logs import SeverityNumber, get_logger, get_logger_provider
661+
from opentelemetry.sdk._logs import LogRecord as SdkLogRecord
662662

663663
otel_logger = get_logger(LITELLM_LOGGER_NAME)
664664

665+
# Get the resource from the logger provider
666+
logger_provider = get_logger_provider()
667+
resource = getattr(logger_provider, '_resource', None) or _get_litellm_resource()
668+
665669
parent_ctx = span.get_span_context()
666670
provider = (kwargs.get("litellm_params") or {}).get(
667671
"custom_llm_provider", "Unknown"
@@ -676,15 +680,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
676680
if self.message_logging and msg.get("content"):
677681
attrs["gen_ai.prompt"] = msg["content"]
678682

679-
otel_logger.emit(
680-
LogRecord(
681-
attributes=attrs,
682-
body=msg.copy(),
683-
trace_id=parent_ctx.trace_id,
684-
span_id=parent_ctx.span_id,
685-
trace_flags=parent_ctx.trace_flags,
686-
)
683+
log_record = SdkLogRecord(
684+
timestamp=self._to_ns(datetime.now()),
685+
trace_id=parent_ctx.trace_id,
686+
span_id=parent_ctx.span_id,
687+
trace_flags=parent_ctx.trace_flags,
688+
severity_number=SeverityNumber.INFO,
689+
severity_text="INFO",
690+
body=msg.copy(),
691+
resource=resource,
692+
attributes=attrs,
687693
)
694+
otel_logger.emit(log_record)
688695

689696
# per-choice events
690697
for idx, choice in enumerate(response_obj.get("choices", [])):
@@ -705,15 +712,18 @@ def _emit_semantic_logs(self, kwargs, response_obj, span: Span):
705712
if self.message_logging and body_msg.get("content"):
706713
body["message"]["content"] = body_msg["content"]
707714

708-
otel_logger.emit(
709-
LogRecord(
710-
attributes=attrs,
711-
body=body,
712-
trace_id=parent_ctx.trace_id,
713-
span_id=parent_ctx.span_id,
714-
trace_flags=parent_ctx.trace_flags,
715-
)
715+
log_record = SdkLogRecord(
716+
timestamp=self._to_ns(datetime.now()),
717+
trace_id=parent_ctx.trace_id,
718+
span_id=parent_ctx.span_id,
719+
trace_flags=parent_ctx.trace_flags,
720+
severity_number=SeverityNumber.INFO,
721+
severity_text="INFO",
722+
body=body,
723+
resource=resource,
724+
attributes=attrs,
716725
)
726+
otel_logger.emit(log_record)
717727

718728
def _create_guardrail_span(
719729
self, kwargs: Optional[dict], context: Optional[Context]
@@ -1292,19 +1302,21 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
12921302
"OpenTelemetry: intiializing http exporter. Value of OTEL_EXPORTER: %s",
12931303
self.OTEL_EXPORTER,
12941304
)
1305+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
12951306
return BatchSpanProcessor(
12961307
OTLPSpanExporterHTTP(
1297-
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
1308+
endpoint=normalized_endpoint, headers=_split_otel_headers
12981309
),
12991310
)
13001311
elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
13011312
verbose_logger.debug(
13021313
"OpenTelemetry: intiializing grpc exporter. Value of OTEL_EXPORTER: %s",
13031314
self.OTEL_EXPORTER,
13041315
)
1316+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'traces')
13051317
return BatchSpanProcessor(
13061318
OTLPSpanExporterGRPC(
1307-
endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
1319+
endpoint=normalized_endpoint, headers=_split_otel_headers
13081320
),
13091321
)
13101322
else:
@@ -1314,6 +1326,145 @@ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
13141326
)
13151327
return BatchSpanProcessor(ConsoleSpanExporter())
13161328

1329+
def _get_log_exporter(self):
1330+
"""
1331+
Get the appropriate log exporter based on the configuration.
1332+
"""
1333+
verbose_logger.debug(
1334+
"OpenTelemetry Logger, initializing log exporter \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s",
1335+
self.OTEL_EXPORTER,
1336+
self.OTEL_ENDPOINT,
1337+
self.OTEL_HEADERS,
1338+
)
1339+
1340+
_split_otel_headers = OpenTelemetry._get_headers_dictionary(self.OTEL_HEADERS)
1341+
1342+
# Normalize endpoint for logs - ensure it points to /v1/logs instead of /v1/traces
1343+
normalized_endpoint = self._normalize_otel_endpoint(self.OTEL_ENDPOINT, 'logs')
1344+
1345+
verbose_logger.debug(
1346+
"OpenTelemetry: Log endpoint normalized from %s to %s",
1347+
self.OTEL_ENDPOINT,
1348+
normalized_endpoint,
1349+
)
1350+
1351+
if hasattr(self.OTEL_EXPORTER, "export"):
1352+
# Custom exporter provided
1353+
verbose_logger.debug(
1354+
"OpenTelemetry: Using custom log exporter. Value of OTEL_EXPORTER: %s",
1355+
self.OTEL_EXPORTER,
1356+
)
1357+
return self.OTEL_EXPORTER
1358+
1359+
if self.OTEL_EXPORTER == "console":
1360+
from opentelemetry.sdk._logs.export import ConsoleLogExporter
1361+
verbose_logger.debug(
1362+
"OpenTelemetry: Using console log exporter. Value of OTEL_EXPORTER: %s",
1363+
self.OTEL_EXPORTER,
1364+
)
1365+
return ConsoleLogExporter()
1366+
elif (
1367+
self.OTEL_EXPORTER == "otlp_http"
1368+
or self.OTEL_EXPORTER == "http/protobuf"
1369+
or self.OTEL_EXPORTER == "http/json"
1370+
):
1371+
from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter
1372+
verbose_logger.debug(
1373+
"OpenTelemetry: Using HTTP log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
1374+
self.OTEL_EXPORTER,
1375+
normalized_endpoint,
1376+
)
1377+
return OTLPLogExporter(
1378+
endpoint=normalized_endpoint, headers=_split_otel_headers
1379+
)
1380+
elif self.OTEL_EXPORTER == "otlp_grpc" or self.OTEL_EXPORTER == "grpc":
1381+
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
1382+
verbose_logger.debug(
1383+
"OpenTelemetry: Using gRPC log exporter. Value of OTEL_EXPORTER: %s, endpoint: %s",
1384+
self.OTEL_EXPORTER,
1385+
normalized_endpoint,
1386+
)
1387+
return OTLPLogExporter(
1388+
endpoint=normalized_endpoint, headers=_split_otel_headers
1389+
)
1390+
else:
1391+
verbose_logger.warning(
1392+
"OpenTelemetry: Unknown log exporter '%s', defaulting to console. Supported: console, otlp_http, otlp_grpc",
1393+
self.OTEL_EXPORTER,
1394+
)
1395+
from opentelemetry.sdk._logs.export import ConsoleLogExporter
1396+
return ConsoleLogExporter()
1397+
1398+
def _normalize_otel_endpoint(
1399+
self,
1400+
endpoint: Optional[str],
1401+
signal_type: str
1402+
) -> Optional[str]:
1403+
"""
1404+
Normalize the endpoint URL for a specific OpenTelemetry signal type.
1405+
1406+
The OTLP exporters expect endpoints to use signal-specific paths:
1407+
- traces: /v1/traces
1408+
- metrics: /v1/metrics
1409+
- logs: /v1/logs
1410+
1411+
This method ensures the endpoint has the correct path for the given signal type.
1412+
1413+
Args:
1414+
endpoint: The endpoint URL to normalize
1415+
signal_type: The telemetry signal type ('traces', 'metrics', or 'logs')
1416+
1417+
Returns:
1418+
Normalized endpoint URL with the correct signal path
1419+
1420+
Examples:
1421+
_normalize_otel_endpoint("http://collector:4318/v1/traces", "logs")
1422+
-> "http://collector:4318/v1/logs"
1423+
1424+
_normalize_otel_endpoint("http://collector:4318", "traces")
1425+
-> "http://collector:4318/v1/traces"
1426+
1427+
_normalize_otel_endpoint("http://collector:4318/v1/logs", "metrics")
1428+
-> "http://collector:4318/v1/metrics"
1429+
"""
1430+
if not endpoint:
1431+
return endpoint
1432+
1433+
# Validate signal_type
1434+
valid_signals = {'traces', 'metrics', 'logs'}
1435+
if signal_type not in valid_signals:
1436+
verbose_logger.warning(
1437+
"Invalid signal_type '%s' provided to _normalize_otel_endpoint. "
1438+
"Valid values: %s. Returning endpoint unchanged.",
1439+
signal_type,
1440+
valid_signals
1441+
)
1442+
return endpoint
1443+
1444+
# Remove trailing slash
1445+
endpoint = endpoint.rstrip('/')
1446+
1447+
# Check if endpoint already ends with the correct signal path
1448+
target_path = f'/v1/{signal_type}'
1449+
if endpoint.endswith(target_path):
1450+
return endpoint
1451+
1452+
# Replace existing signal path with the target signal path
1453+
other_signals = valid_signals - {signal_type}
1454+
for other_signal in other_signals:
1455+
other_path = f'/v1/{other_signal}'
1456+
if endpoint.endswith(other_path):
1457+
endpoint = endpoint.rsplit('/', 1)[0] + f'/{signal_type}'
1458+
return endpoint
1459+
1460+
# No existing signal path found, append the target path
1461+
if not endpoint.endswith('/v1'):
1462+
endpoint = endpoint + target_path
1463+
else:
1464+
endpoint = endpoint + f'/{signal_type}'
1465+
1466+
return endpoint
1467+
13171468
@staticmethod
13181469
def _get_headers_dictionary(headers: Optional[Union[str, dict]]) -> Dict[str, str]:
13191470
"""

0 commit comments

Comments
 (0)