diff --git a/docs/examples/metrics/producer/README.rst b/docs/examples/metrics/producer/README.rst new file mode 100644 index 00000000000..675c10c84eb --- /dev/null +++ b/docs/examples/metrics/producer/README.rst @@ -0,0 +1,101 @@ +# MetricProducer Examples + +This directory contains examples of how to implement and use the `MetricProducer` interface to bridge third-party metric sources with OpenTelemetry. + +## What is MetricProducer? + +`MetricProducer` is an interface defined in the OpenTelemetry specification that allows you to plug third-party metric sources into an OpenTelemetry `MetricReader`. This is particularly useful for: + +- Bridging existing monitoring systems to OpenTelemetry +- Integrating with systems like Prometheus, StatsD, or custom monitoring solutions +- Collecting pre-processed metrics from external sources + +## Key Concepts + +- **MetricProducer**: Interface that defines how to produce metrics from third-party sources +- **MetricReader**: Collects metrics from both the OpenTelemetry SDK and registered MetricProducers +- **Pre-processed data**: Unlike OpenTelemetry instruments that collect raw measurements, MetricProducers work with already aggregated metrics + +## Examples + +### basic_example.py + +A comprehensive example showing: +- How to implement `MetricProducer` for different systems (Prometheus simulation, custom system) +- How to convert third-party metric formats to OpenTelemetry `MetricsData` +- How to register producers with a `MetricReader` +- How both SDK metrics and producer metrics are combined + +## Running the Examples + +```bash +# From the repo root +cd docs/examples/metrics/producer +python basic_example.py +``` + +## Implementation Pattern + +When implementing a `MetricProducer`: + +1. **Inherit from MetricProducer**: Create a class that extends the abstract base class +2. **Implement produce()**: This method should fetch and convert metrics to OpenTelemetry format +3. **Handle errors gracefully**: Your producer should not crash the entire collection process +4. **Respect timeout**: The `produce()` method receives a timeout parameter +5. **Return MetricsData**: Convert your metrics to the standard OpenTelemetry format + +```python +from opentelemetry.sdk.metrics.export import MetricProducer, MetricsData + +class MyMetricProducer(MetricProducer): + def produce(self, timeout_millis: float = 10_000) -> MetricsData: + # Fetch metrics from your source + raw_metrics = self.fetch_from_source() + + # Convert to OpenTelemetry format + otel_metrics = self.convert_to_otel_format(raw_metrics) + + # Return as MetricsData + return MetricsData(resource_metrics=otel_metrics) +``` + +## Best Practices + +1. **Resource Identification**: Use appropriate resource attributes to identify the source system +2. **Instrumentation Scope**: Create meaningful instrumentation scopes for your producers +3. **Metric Naming**: Use clear, descriptive metric names, optionally with prefixes +4. **Error Handling**: Handle network errors, parsing errors, and timeouts gracefully +5. **Performance**: Consider caching and efficient data fetching to avoid impacting collection performance +6. **Thread Safety**: Ensure your producer is thread-safe as it may be called concurrently + +## Integration with MetricReader + +```python +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter + +# Create your producers +producer1 = MyCustomProducer() +producer2 = PrometheusProducer() + +# Create a reader with producers +reader = PeriodicExportingMetricReader( + exporter=ConsoleMetricExporter(), + metric_producers=[producer1, producer2] +) + +# The reader will automatically collect from both SDK and producers +``` + +## Relationship to OpenTelemetry Instruments + +MetricProducer is different from OpenTelemetry instruments: + +- **Instruments** (Counter, Histogram, etc.): Collect raw measurements and aggregate them in the SDK +- **MetricProducer**: Provides already-aggregated metrics from external sources + +Use MetricProducer when you have an existing system that already aggregates metrics and you want to bridge that data into OpenTelemetry. + +## Further Reading + +- [OpenTelemetry Metrics Specification](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#metricproducer) +- [OpenTelemetry Python SDK Documentation](https://opentelemetry-python.readthedocs.io/) \ No newline at end of file diff --git a/docs/examples/metrics/producer/basic_example.py b/docs/examples/metrics/producer/basic_example.py new file mode 100644 index 00000000000..0f4dd5ad970 --- /dev/null +++ b/docs/examples/metrics/producer/basic_example.py @@ -0,0 +1,268 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This example demonstrates how to implement and use a MetricProducer to +bridge third-party metric sources with OpenTelemetry. + +MetricProducer allows you to integrate pre-processed metrics from external +systems (like Prometheus, custom monitoring systems, etc.) into the +OpenTelemetry metrics pipeline. +""" + +import time +from typing import Dict + +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + Metric, + MetricProducer, + MetricsData, + NumberDataPoint, + PeriodicExportingMetricReader, + ResourceMetrics, + ScopeMetrics, + Sum, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope + + +class PrometheusMetricProducer(MetricProducer): + """Example MetricProducer that bridges Prometheus metrics. + + This example shows how to fetch metrics from a third-party source + (simulating Prometheus) and convert them to OpenTelemetry format. + """ + + def __init__(self, prometheus_url: str = "http://localhost:9090"): + self.prometheus_url = prometheus_url + self.instrumentation_scope = InstrumentationScope( + name="prometheus.bridge", + version="1.0.0" + ) + self.resource = Resource.create({ + "service.name": "prometheus-bridge", + "bridge.source": "prometheus", + "bridge.url": prometheus_url + }) + + def produce(self, timeout_millis: float = 10_000) -> MetricsData: + """Fetch metrics from Prometheus and convert to OpenTelemetry format.""" + + # In a real implementation, you would: + # 1. Make HTTP request to Prometheus /api/v1/query_range or /metrics + # 2. Parse the response (JSON or Prometheus text format) + # 3. Convert to OpenTelemetry metrics + + # For this example, we'll simulate fetching metrics + simulated_prometheus_metrics = self._fetch_prometheus_metrics() + + # Convert to OpenTelemetry format + otel_metrics = [] + for metric_name, metric_data in simulated_prometheus_metrics.items(): + otel_metrics.append( + Metric( + name=f"prometheus.{metric_name}", + description=f"Metric {metric_name} from Prometheus", + unit=metric_data.get("unit", "1"), + data=Sum( + data_points=[ + NumberDataPoint( + attributes=metric_data.get("labels", {}), + start_time_unix_nano=int((time.time() - 60) * 1e9), # 1 minute ago + time_unix_nano=int(time.time() * 1e9), + value=metric_data["value"], + ) + ], + aggregation_temporality=1, # CUMULATIVE + is_monotonic=metric_data.get("monotonic", False), + ), + ) + ) + + # Return as MetricsData + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self.resource, + scope_metrics=[ + ScopeMetrics( + scope=self.instrumentation_scope, + metrics=otel_metrics, + schema_url="", + ) + ], + schema_url="", + ) + ] + ) + + def _fetch_prometheus_metrics(self) -> Dict[str, Dict]: + """Simulate fetching metrics from Prometheus.""" + # In a real implementation, this would make HTTP requests to Prometheus + # and parse the response. For this example, we return simulated data. + + return { + "http_requests_total": { + "value": 12345, + "labels": {"method": "GET", "status": "200"}, + "unit": "1", + "monotonic": True, + }, + "http_request_duration_seconds": { + "value": 0.234, + "labels": {"method": "GET", "quantile": "0.95"}, + "unit": "s", + "monotonic": False, + }, + "memory_usage_bytes": { + "value": 1024 * 1024 * 512, # 512 MB + "labels": {"instance": "server-1"}, + "unit": "bytes", + "monotonic": False, + }, + } + + +class CustomSystemMetricProducer(MetricProducer): + """Example MetricProducer for a custom monitoring system.""" + + def __init__(self, system_name: str = "custom-system"): + self.system_name = system_name + self.instrumentation_scope = InstrumentationScope( + name=f"{system_name}.bridge", + version="1.0.0" + ) + self.resource = Resource.create({ + "service.name": f"{system_name}-bridge", + "bridge.source": system_name, + }) + + def produce(self, timeout_millis: float = 10_000) -> MetricsData: + """Fetch metrics from custom system.""" + + # Simulate fetching from a custom system + custom_metrics = self._fetch_custom_metrics() + + # Convert to OpenTelemetry format + otel_metrics = [] + for metric in custom_metrics: + otel_metrics.append( + Metric( + name=f"custom.{metric['name']}", + description=metric.get("description", ""), + unit=metric.get("unit", "1"), + data=Sum( + data_points=[ + NumberDataPoint( + attributes=metric.get("tags", {}), + start_time_unix_nano=int((time.time() - 30) * 1e9), # 30 seconds ago + time_unix_nano=int(time.time() * 1e9), + value=metric["value"], + ) + ], + aggregation_temporality=1, # CUMULATIVE + is_monotonic=metric.get("is_counter", False), + ), + ) + ) + + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=self.resource, + scope_metrics=[ + ScopeMetrics( + scope=self.instrumentation_scope, + metrics=otel_metrics, + schema_url="", + ) + ], + schema_url="", + ) + ] + ) + + def _fetch_custom_metrics(self) -> list: + """Simulate fetching from a custom monitoring system.""" + return [ + { + "name": "database_connections", + "value": 25, + "description": "Active database connections", + "unit": "1", + "tags": {"database": "postgres", "pool": "main"}, + "is_counter": False, + }, + { + "name": "api_calls_total", + "value": 9876, + "description": "Total API calls processed", + "unit": "1", + "tags": {"endpoint": "/api/v1/users", "method": "GET"}, + "is_counter": True, + }, + ] + + +def main(): + """Example usage of MetricProducer with OpenTelemetry.""" + + print("Starting MetricProducer example...") + + # Create MetricProducers for different third-party sources + prometheus_producer = PrometheusMetricProducer("http://localhost:9090") + custom_producer = CustomSystemMetricProducer("monitoring-system") + + # Create a metric reader that includes the producers + exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader( + exporter=exporter, + export_interval_millis=5000, # Export every 5 seconds + metric_producers=[prometheus_producer, custom_producer] + ) + + # IMPORTANT: Register the reader with a MeterProvider + # This is required for the reader to be able to collect metrics + meter_provider = MeterProvider(metric_readers=[reader]) + + print("Configured MetricReader with the following producers:") + print("- PrometheusMetricProducer (simulated)") + print("- CustomSystemMetricProducer (simulated)") + print("\nThe reader is now registered with a MeterProvider and will collect") + print("metrics from these producers every 5 seconds and export them to the console.\n") + + # Note: You can also use the meter_provider to create meters and instruments + # meter = meter_provider.get_meter("example.meter") + # counter = meter.create_counter("example.counter") + # counter.add(1) + + print("=== Metrics will be collected and exported every 5 seconds ===") + print("Press Ctrl+C to stop...") + + try: + # Let it run for a bit to show periodic collection + time.sleep(20) + except KeyboardInterrupt: + print("\nStopping...") + finally: + # Clean shutdown + meter_provider.shutdown() + print("MeterProvider shut down successfully.") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py index 2cb587f2f65..b48bb6eadd8 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/export/__init__.py @@ -66,6 +66,43 @@ _logger = getLogger(__name__) +class MetricProducer(ABC): + """Interface for external metric sources. + + A MetricProducer is a source of metrics external to the SDK, which can be + used to bridge existing metrics sources and third-party instrumentation + libraries. For example, a MetricProducer implementation might collect + metrics from Prometheus endpoints or system monitoring tools. + + MetricProducers are called during metric collection by MetricReaders that + support them, allowing their metrics to be included in the final export. + + This interface aligns with the OpenTelemetry specification's MetricProducer + definition, enabling consistent behavior across language implementations. + """ + + @abstractmethod + def produce(self, timeout_millis: float = 10_000) -> Optional[MetricsData]: + """Produce metrics from this external source. + + Called during metric collection to gather metrics from this producer. + + Args: + timeout_millis: Maximum time to wait for metric collection, in + milliseconds. Implementations should attempt to + respect this timeout. + + Returns: + MetricsData containing the collected metrics, or None if no metrics + are available or an error occurred. + + Raises: + Exception: If metric collection fails. Implementations should handle + errors gracefully and either return None or raise an + exception that will be logged by the MetricReader. + """ + + class MetricExportResult(Enum): """Result of exporting a metric @@ -220,6 +257,7 @@ def __init__( type, "opentelemetry.sdk.metrics.view.Aggregation" ] | None = None, + metric_producers: Iterable[MetricProducer] | None = None, ) -> None: self._collect: Callable[ [ @@ -228,6 +266,8 @@ def __init__( ], Iterable["opentelemetry.sdk.metrics.export.Metric"], ] = None + + self._metric_producers = list(metric_producers) if metric_producers is not None else [] self._instrument_class_temporality = { _Counter: AggregationTemporality.CUMULATIVE, @@ -320,8 +360,8 @@ def __init__( @final def collect(self, timeout_millis: float = 10_000) -> None: - """Collects the metrics from the internal SDK state and - invokes the `_receive_metrics` with the collection. + """Collects the metrics from the internal SDK state and any registered + MetricProducers, and invokes the `_receive_metrics` with the collection. Args: timeout_millis: Amount of time in milliseconds before this function @@ -337,11 +377,62 @@ def collect(self, timeout_millis: float = 10_000) -> None: ) return - metrics = self._collect(self, timeout_millis=timeout_millis) + from time import time_ns + from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError + + start_time_ns = time_ns() + deadline_ns = start_time_ns + (timeout_millis * 1_000_000) + + remaining_millis = timeout_millis + + sdk_metrics = self._collect(self, timeout_millis=remaining_millis) + + if time_ns() > deadline_ns: + raise MetricsTimeoutError("Timeout exceeded during SDK metric collection") + + all_resource_metrics = [] + + if sdk_metrics is not None: + if hasattr(sdk_metrics, 'resource_metrics'): + try: + all_resource_metrics.extend(sdk_metrics.resource_metrics) + except TypeError: + # sdk_metrics.resource_metrics is not iterable (e.g., Mock object in tests) + pass + + for producer in self._metric_producers: + remaining_ns = deadline_ns - time_ns() + if remaining_ns <= 0: + raise MetricsTimeoutError( + f"Timeout exceeded before collecting from MetricProducer: {producer.__class__.__name__}" + ) + + remaining_millis = remaining_ns / 1_000_000 + + try: + producer_metrics = producer.produce(remaining_millis) + + if time_ns() > deadline_ns: + raise MetricsTimeoutError( + f"Timeout exceeded while collecting from MetricProducer: {producer.__class__.__name__}" + ) + + if producer_metrics is not None and hasattr(producer_metrics, 'resource_metrics'): + all_resource_metrics.extend(producer_metrics.resource_metrics) + except MetricsTimeoutError: + raise + except Exception as e: + _logger.warning("Failed to collect from MetricProducer: %s", e) - if metrics is not None: + if all_resource_metrics: + combined_metrics = MetricsData(resource_metrics=all_resource_metrics) + self._receive_metrics( + combined_metrics, + timeout_millis=timeout_millis, + ) + elif sdk_metrics is not None: self._receive_metrics( - metrics, + sdk_metrics, timeout_millis=timeout_millis, ) @@ -400,10 +491,12 @@ def __init__( type, "opentelemetry.sdk.metrics.view.Aggregation" ] | None = None, + metric_producers: Iterable[MetricProducer] | None = None, ) -> None: super().__init__( preferred_temporality=preferred_temporality, preferred_aggregation=preferred_aggregation, + metric_producers=metric_producers, ) self._lock = RLock() self._metrics_data: "opentelemetry.sdk.metrics.export.MetricsData" = ( @@ -448,11 +541,13 @@ def __init__( exporter: MetricExporter, export_interval_millis: Optional[float] = None, export_timeout_millis: Optional[float] = None, + metric_producers: Iterable[MetricProducer] | None = None, ) -> None: # PeriodicExportingMetricReader defers to exporter for configuration super().__init__( preferred_temporality=exporter._preferred_temporality, preferred_aggregation=exporter._preferred_aggregation, + metric_producers=metric_producers, ) # This lock is held whenever calling self._exporter.export() to prevent concurrent diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py index 478237cd170..a0b7faafeb1 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/metrics/export/__init__.py @@ -18,6 +18,7 @@ ConsoleMetricExporter, InMemoryMetricReader, MetricExporter, + MetricProducer, MetricExportResult, MetricReader, PeriodicExportingMetricReader, @@ -48,6 +49,7 @@ "InMemoryMetricReader", "MetricExporter", "MetricExportResult", + "MetricProducer", "MetricReader", "PeriodicExportingMetricReader", "DataPointT", diff --git a/opentelemetry-sdk/tests/metrics/test_metric_producer.py b/opentelemetry-sdk/tests/metrics/test_metric_producer.py new file mode 100644 index 00000000000..f16735bde24 --- /dev/null +++ b/opentelemetry-sdk/tests/metrics/test_metric_producer.py @@ -0,0 +1,481 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# pylint: disable=protected-access + +from typing import Optional, Union +from unittest import TestCase +from unittest.mock import Mock + +from opentelemetry.sdk.metrics.export import ( + InMemoryMetricReader, + Metric, + MetricProducer, + MetricsData, + NumberDataPoint, + ResourceMetrics, + ScopeMetrics, + Sum, +) +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.util.instrumentation import InstrumentationScope + + +class TestMetricProducer(TestCase): + def test_metric_producer_is_abstract(self): + """Test that MetricProducer cannot be instantiated directly.""" + with self.assertRaises(TypeError): + MetricProducer() # pylint: disable=abstract-class-instantiated + + def test_metric_producer_produce_method_required(self): + """Test that MetricProducer requires implementation of produce method.""" + + class IncompleteProducer(MetricProducer): + pass + + with self.assertRaises(TypeError): + IncompleteProducer() # pylint: disable=abstract-class-instantiated + + +class MockMetricProducer(MetricProducer): + """Mock implementation of MetricProducer for testing.""" + + def __init__(self, metrics_data: Union[MetricsData, None, str] = "default", should_raise: Optional[Exception] = None): + self.metrics_data = metrics_data + self.should_raise = should_raise + self.produce_called = False + + def produce(self, timeout_millis: float = 10_000) -> MetricsData: + """Produce mock metrics data.""" + self.produce_called = True + + if self.should_raise: + raise self.should_raise + + # If explicitly set to None, return empty MetricsData + if self.metrics_data is None: + return MetricsData(resource_metrics=[]) + + # If explicitly provided metrics_data, return it + if self.metrics_data != "default" and isinstance(self.metrics_data, MetricsData): + return self.metrics_data + + # Default mock data when no explicit metrics_data provided + return MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource.create({"service.name": "test-producer"}), + scope_metrics=[ + ScopeMetrics( + scope=InstrumentationScope("test.producer"), + metrics=[ + Metric( + name="producer.counter", + description="Test counter from producer", + unit="1", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"producer": "mock"}, + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=42, + ) + ], + aggregation_temporality=1, # CUMULATIVE + is_monotonic=True, + ), + ) + ], + schema_url="", + ) + ], + schema_url="", + ) + ] + ) + + +class TestInMemoryMetricReaderWithProducers(TestCase): + def test_metric_reader_with_no_producers(self): + """Test MetricReader with no producers behaves as before.""" + reader = InMemoryMetricReader() + mock_collect_callback = Mock(return_value=[]) + reader._set_collect_callback(mock_collect_callback) + + metrics_data = reader.get_metrics_data() + # When there are no metrics, should return empty list (same as before) + self.assertEqual(metrics_data, []) + + def test_metric_reader_with_single_producer(self): + """Test MetricReader with a single MetricProducer.""" + producer = MockMetricProducer() + reader = InMemoryMetricReader(metric_producers=[producer]) + mock_collect_callback = Mock(return_value=MetricsData(resource_metrics=[])) + reader._set_collect_callback(mock_collect_callback) + + metrics_data = reader.get_metrics_data() + + # Producer should have been called + self.assertTrue(producer.produce_called) + + # Should have producer's metrics + self.assertIsNotNone(metrics_data) + if metrics_data is not None: + self.assertEqual(len(metrics_data.resource_metrics), 1) + self.assertEqual( + metrics_data.resource_metrics[0].resource.attributes["service.name"], + "test-producer" + ) + + def test_metric_reader_with_multiple_producers(self): + """Test MetricReader with multiple MetricProducers.""" + producer1 = MockMetricProducer() + producer2 = MockMetricProducer( + MetricsData( + resource_metrics=[ + ResourceMetrics( + resource=Resource.create({"service.name": "test-producer-2"}), + scope_metrics=[ + ScopeMetrics( + scope=InstrumentationScope("test.producer.2"), + metrics=[ + Metric( + name="producer.gauge", + description="Test gauge from producer 2", + unit="bytes", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"producer": "mock2"}, + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=100, + ) + ], + aggregation_temporality=1, # CUMULATIVE + is_monotonic=True, + ), + ) + ], + schema_url="", + ) + ], + schema_url="", + ) + ] + ) + ) + + reader = InMemoryMetricReader(metric_producers=[producer1, producer2]) + mock_collect_callback = Mock(return_value=MetricsData(resource_metrics=[])) + reader._set_collect_callback(mock_collect_callback) + + metrics_data = reader.get_metrics_data() + + # Both producers should have been called + self.assertTrue(producer1.produce_called) + self.assertTrue(producer2.produce_called) + + # Should have metrics from both producers + self.assertIsNotNone(metrics_data) + if metrics_data is not None: + self.assertEqual(len(metrics_data.resource_metrics), 2) + + service_names = { + rm.resource.attributes["service.name"] + for rm in metrics_data.resource_metrics + } + self.assertEqual(service_names, {"test-producer", "test-producer-2"}) + + def test_metric_reader_combines_sdk_and_producer_metrics(self): + """Test that MetricReader combines both SDK and producer metrics.""" + producer = MockMetricProducer() + reader = InMemoryMetricReader(metric_producers=[producer]) + + # Mock SDK metrics + sdk_resource_metrics = ResourceMetrics( + resource=Resource.create({"service.name": "test-sdk"}), + scope_metrics=[ + ScopeMetrics( + scope=InstrumentationScope("test.sdk"), + metrics=[ + Metric( + name="sdk.counter", + description="Test counter from SDK", + unit="1", + data=Sum( + data_points=[ + NumberDataPoint( + attributes={"source": "sdk"}, + start_time_unix_nano=1647626444152947792, + time_unix_nano=1647626444153163239, + value=10, + ) + ], + aggregation_temporality=1, # CUMULATIVE + is_monotonic=True, + ), + ) + ], + schema_url="", + ) + ], + schema_url="", + ) + + mock_collect_callback = Mock( + return_value=MetricsData(resource_metrics=[sdk_resource_metrics]) + ) + reader._set_collect_callback(mock_collect_callback) + + metrics_data = reader.get_metrics_data() + + # Producer should have been called + self.assertTrue(producer.produce_called) + + # Should have metrics from both SDK and producer + self.assertIsNotNone(metrics_data) + if metrics_data is not None: + self.assertEqual(len(metrics_data.resource_metrics), 2) + + service_names = { + rm.resource.attributes["service.name"] + for rm in metrics_data.resource_metrics + } + self.assertEqual(service_names, {"test-sdk", "test-producer"}) + + def test_metric_reader_handles_producer_exception(self): + """Test that MetricReader handles exceptions from producers gracefully.""" + failing_producer = MockMetricProducer(should_raise=RuntimeError("Producer failed")) + working_producer = MockMetricProducer() + + reader = InMemoryMetricReader( + metric_producers=[failing_producer, working_producer] + ) + mock_collect_callback = Mock(return_value=MetricsData(resource_metrics=[])) + reader._set_collect_callback(mock_collect_callback) + + # Should not raise exception and should still collect from working producer + metrics_data = reader.get_metrics_data() + + # Working producer should have been called + self.assertTrue(working_producer.produce_called) + + # Should have metrics from working producer only + self.assertIsNotNone(metrics_data) + if metrics_data is not None: + self.assertEqual(len(metrics_data.resource_metrics), 1) + self.assertEqual( + metrics_data.resource_metrics[0].resource.attributes["service.name"], + "test-producer" + ) + + def test_metric_reader_with_none_producer_data(self): + """Test that MetricReader handles producers returning None.""" + producer = MockMetricProducer(metrics_data=None) + reader = InMemoryMetricReader(metric_producers=[producer]) + # Mock the SDK to return None (no SDK metrics) + mock_collect_callback = Mock(return_value=None) + reader._set_collect_callback(mock_collect_callback) + + # Should not raise exception + metrics_data = reader.get_metrics_data() + + # Producer should have been called + self.assertTrue(producer.produce_called) + + # Should have no metrics since both SDK and producer return None/empty + # When both sources are empty, get_metrics_data might return None + self.assertIsNone(metrics_data) + + def test_periodic_exporting_metric_reader_with_producers(self): + """Test PeriodicExportingMetricReader with MetricProducers.""" + from opentelemetry.sdk.metrics.export import ( + ConsoleMetricExporter, + PeriodicExportingMetricReader + ) + + producer = MockMetricProducer() + exporter = ConsoleMetricExporter() + reader = PeriodicExportingMetricReader( + exporter=exporter, + metric_producers=[producer] + ) + + # Should initialize without error + self.assertIsNotNone(reader) + self.assertEqual(len(reader._metric_producers), 1) + + # Clean up + reader.shutdown() + + +class TestMetricProducerTimeoutEnforcement(TestCase): + """Tests for timeout budget enforcement across SDK and MetricProducers.""" + + def test_timeout_enforced_with_slow_producer(self): + """Test that timeout is raised when producer takes too long.""" + import time + from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError + + class SlowMetricProducer(MetricProducer): + def produce(self, timeout_millis=10_000): + # Simulate slow producer that takes longer than timeout + time.sleep(timeout_millis / 1000 + 0.5) # Sleep longer than timeout + return None + + producer = SlowMetricProducer() + reader = InMemoryMetricReader(metric_producers=[producer]) + + # Set up minimal callback + mock_collect_callback = Mock(return_value=[]) + reader._set_collect_callback(mock_collect_callback) + + # Should raise timeout error + with self.assertRaises(MetricsTimeoutError): + reader.collect(timeout_millis=100) # Very short timeout + + def test_timeout_budget_decremented_across_producers(self): + """Test that remaining timeout decreases across multiple producers.""" + import time + + class TimeTrackingProducer(MetricProducer): + def __init__(self): + self.timeout_received = None + self.call_time = None + + def produce(self, timeout_millis=10_000): + self.timeout_received = timeout_millis + self.call_time = time.time() + time.sleep(0.05) # Sleep 50ms + return None + + producer1 = TimeTrackingProducer() + producer2 = TimeTrackingProducer() + producer3 = TimeTrackingProducer() + + reader = InMemoryMetricReader( + metric_producers=[producer1, producer2, producer3] + ) + + # Set up minimal callback that takes some time + def slow_collect(*args, **kwargs): + time.sleep(0.05) # SDK collection takes 50ms + return [] + + reader._set_collect_callback(slow_collect) + + # Collect with 1 second timeout + reader.collect(timeout_millis=1000) + + # Verify each producer got less timeout than the previous + self.assertIsNotNone(producer1.timeout_received) + self.assertIsNotNone(producer2.timeout_received) + self.assertIsNotNone(producer3.timeout_received) + + # Producer2 should have received less timeout than producer1 + # (accounting for SDK collection time and producer1 execution time) + assert producer2.timeout_received is not None + assert producer1.timeout_received is not None + self.assertLess(producer2.timeout_received, producer1.timeout_received) + + # Producer3 should have received less timeout than producer2 + assert producer3.timeout_received is not None + self.assertLess(producer3.timeout_received, producer2.timeout_received) + + def test_timeout_not_exceeded_with_fast_producers(self): + """Test that fast producers complete successfully within timeout.""" + + class FastMetricProducer(MetricProducer): + def __init__(self): + self.produce_called = False + + def produce(self, timeout_millis=10_000): + self.produce_called = True + # Return immediately + return MetricsData(resource_metrics=[]) + + producer1 = FastMetricProducer() + producer2 = FastMetricProducer() + producer3 = FastMetricProducer() + + reader = InMemoryMetricReader( + metric_producers=[producer1, producer2, producer3] + ) + + mock_collect_callback = Mock(return_value=[]) + reader._set_collect_callback(mock_collect_callback) + + # Should complete without timeout error + reader.collect(timeout_millis=1000) + + # All producers should have been called + self.assertTrue(producer1.produce_called) + self.assertTrue(producer2.produce_called) + self.assertTrue(producer3.produce_called) + + def test_timeout_error_before_sdk_collection_completes(self): + """Test timeout error if SDK collection takes too long.""" + import time + from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError + + class SlowProducer(MetricProducer): + def produce(self, timeout_millis=10_000): + return None + + producer = SlowProducer() + reader = InMemoryMetricReader(metric_producers=[producer]) + + # Set up slow SDK collection + def very_slow_collect(*args, **kwargs): + time.sleep(0.5) # Sleep longer than timeout + return [] + + reader._set_collect_callback(very_slow_collect) + + # Should raise timeout error during SDK collection + with self.assertRaises(MetricsTimeoutError) as context: + reader.collect(timeout_millis=100) + + self.assertIn("SDK metric collection", str(context.exception)) + + def test_timeout_error_identifies_failing_producer(self): + """Test that timeout error identifies which producer exceeded timeout.""" + import time + from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError + + class FastProducer(MetricProducer): + def produce(self, timeout_millis=10_000): + return None + + class NamedSlowProducer(MetricProducer): + def produce(self, timeout_millis=10_000): + time.sleep(timeout_millis / 1000 + 0.1) + return None + + producer1 = FastProducer() + producer2 = NamedSlowProducer() + + reader = InMemoryMetricReader(metric_producers=[producer1, producer2]) + + mock_collect_callback = Mock(return_value=[]) + reader._set_collect_callback(mock_collect_callback) + + # Should raise timeout error with producer name + with self.assertRaises(MetricsTimeoutError) as context: + reader.collect(timeout_millis=100) + + # Error message should mention the producer class + self.assertIn("NamedSlowProducer", str(context.exception)) \ No newline at end of file