diff --git a/.github/workflows/sofatutor_image.yml b/.github/workflows/sofatutor_image.yml new file mode 100644 index 000000000000..15bcd42f2951 --- /dev/null +++ b/.github/workflows/sofatutor_image.yml @@ -0,0 +1,59 @@ +# This workflow uses actions that are not certified by GitHub. +# They are provided by a third-party and are governed by +# separate terms of service, privacy policy, and support +# documentation. + +# GitHub recommends pinning actions to a commit SHA. +# To get a newer version, you will need to update the SHA. +# You can also reference a tag or branch, but the action may change without warning. +--- +name: Create and publish a Container image + +on: + push: + branches: + - sofatutor-tweaks + tags: + - "v*" + +env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository_owner }}/${{ github.repository }} + +jobs: + build-and-push-image: + runs-on: ubuntu-22.04 + permissions: + contents: read + packages: write + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Log in to the Container registry + uses: sofatutor/docker-login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract metadata (tags, labels) for Docker + id: meta + uses: sofatutor/docker-metadata-action@57396166ad8aefe6098280995947635806a0e6ea + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + tags: | + type=ref,event=branch + type=ref,event=pr + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + + - name: Build and push Docker image + uses: sofatutor/docker-build-push-action@c56af957549030174b10d6867f20e78cfd7debc5 + with: + context: . + push: true + pull: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} diff --git a/docs/feature-cloudwatch-logging-plan.md b/docs/feature-cloudwatch-logging-plan.md new file mode 100644 index 000000000000..ce4bfb918f46 --- /dev/null +++ b/docs/feature-cloudwatch-logging-plan.md @@ -0,0 +1,95 @@ +# Feature: CloudWatch Logging Support + +> **Note:** All relevant patch diffs for this feature are available in `/tmp/patch_*.diff`. Refer to these files for manual or automated patch application as needed. + +## Summary +This document tracks the changes required to add AWS CloudWatch logging support to LiteLLM. The goal is to integrate new logging modules, refactor callback handling, and ensure robust logging for CloudWatch. + +## Checklist +- [x] Identify all relevant code changes from the diff +- [x] Apply new CloudWatch integration module +- [x] Refactor core logging utilities to support CloudWatch +- [x] Update proxy and utility modules for new logging hooks and call types +- [x] Test logging functionality in development/staging +- [x] Document configuration and usage +- [x] Fix tests to properly handle CloudWatch integration + +## Sequential Application Plan + +1. **litellm/integrations/cloud_watch.py** + - [x] Check if file exists (it does not) + - [x] Apply patch directly (new file) + +2. **litellm/litellm_core_utils/litellm_logging.py** + - [x] Attempt to apply patch (conflicts detected) + - [x] Review .rej file and patch contents + - [x] Identify where CloudWatch logging changes need to be integrated + - [x] Manually merge relevant changes + - [x] Test logging functionality + +3. **litellm/proxy/proxy_server.py** + - [x] Review patch and compare with current file + - [x] Manually merge relevant changes + - [x] Test proxy logging hooks + +4. 
**litellm/proxy/utils.py** + - [x] Review patch and compare with current file + - [x] Manually merge relevant changes + - [x] Test call type handling + +5. **litellm/utils.py** + - [x] Review patch and compare with current file + - [x] Manually merge relevant changes + - [x] Test async callback logic + +6. **litellm/__init__.py** + - [x] Add `cloudwatch_callback_params` attribute to fix test errors + +## Changes Applied (from diff) +| File | Change Type | Description | Status | +|------|-------------|-------------|--------| +| litellm/integrations/cloud_watch.py | Add | New integration for logging to AWS CloudWatch | ✅ Complete | +| litellm/litellm_core_utils/litellm_logging.py | Modify | Refactors AWS logging, adds CloudWatch logger, improves callback handling | ✅ Complete | +| litellm/proxy/proxy_server.py | Modify | Adds/updates logging hooks, filters model, adds call IDs, updates headers | ✅ Complete | +| litellm/proxy/utils.py | Modify | Adds new call types to pre_call_hook | ✅ Complete | +| litellm/utils.py | Modify | Adds cloudwatch to async callback logic, supports litellm_metadata | ✅ Complete | +| litellm/__init__.py | Modify | Added `cloudwatch_callback_params` attribute to fix test errors | ✅ Complete | + +## Testing Coverage + +### Existing Tests Updated +- [x] Updated `test_embedding_input_array_of_tokens` in `tests/litellm/proxy/test_proxy_server.py` to handle detailed proxy_server_request logging data +- [x] Verified all tests pass with our changes (432 tests in the proxy module passed) + +### New Tests Implemented + +1. **CloudWatch Logger Tests** + - [x] Created/Updated `tests/litellm/integrations/test_cloud_watch.py` with: + - Unit tests for CloudWatch logger initialization + - Mocked AWS CloudWatch API calls to test log delivery + - Tests for different log event formats + +3. **Test Fixes** + - [x] Fixed test initialization issues with `ProxyLogging` class by properly mocking the required `user_api_key_cache` parameter + - [x] Ensured proper cleanup in test mocks to avoid test interference + - [x] Made sure all tests properly exercise the actual functionality rather than stubbing essential components + +## Configuration Details + +CloudWatch logging can be configured in the LiteLLM config using: + +```yaml +litellm_settings: + telemetry: True + success_callback: ["cloudwatch"] + cloudwatch_callback_params: + log_group_name: /litellm + aws_region: eu-central-1 +``` + +## Next Steps +- Consider additional parameterization for CloudWatch logging (e.g., customizable log streams) +- Monitor production usage for any performance impacts +- Document best practices for log retention and analysis + + \ No newline at end of file diff --git a/docs/my-website/docs/proxy/logging.md b/docs/my-website/docs/proxy/logging.md index cf36963b7e16..fa3a91c15a47 100644 --- a/docs/my-website/docs/proxy/logging.md +++ b/docs/my-website/docs/proxy/logging.md @@ -1316,7 +1316,6 @@ This will log all successful LLM calls to s3 Bucket ```shell AWS_ACCESS_KEY_ID = "" AWS_SECRET_ACCESS_KEY = "" -AWS_REGION_NAME = "" ``` **Step 2**: Create a `config.yaml` file and set `litellm_settings`: `success_callback` @@ -2626,3 +2625,39 @@ litellm_settings: `thresholds` are not required by default, but you can tune the values to your needs. Default values is `4` for all categories ::: --> + +## CloudWatch Logging + +Log LLM input/output to AWS CloudWatch. + +| Property | Details | +|----------|---------| +| Description | Log LLM calls to AWS CloudWatch | + +#### Basic Setup + +1. 
Add `cloudwatch` to your config.yaml +```yaml +litellm_settings: + success_callback: ["cloudwatch"] + cloudwatch_callback_params: + log_group_name: /litellm + aws_region: us-west-2 +``` + +2. Set AWS credentials as environment variables +```shell +AWS_ACCESS_KEY_ID="" +AWS_SECRET_ACCESS_KEY="" +AWS_REGION="us-west-2" +``` + +3. Start the proxy +``` +litellm --config /path/to/config.yaml +``` + +#### Fields Logged to CloudWatch + +- Standard LLM request/response data +- All logs include a unique `litellm_call_id` for tracing diff --git a/docs/runbooks/update-sofatutor-tweaks-to-latest-stable.md b/docs/runbooks/update-sofatutor-tweaks-to-latest-stable.md new file mode 100644 index 000000000000..0b99f97566d7 --- /dev/null +++ b/docs/runbooks/update-sofatutor-tweaks-to-latest-stable.md @@ -0,0 +1,322 @@ +# Runbook: Update sofatutor-tweaks to Latest Stable Tag + +This runbook describes how to update the `sofatutor-tweaks` branch to the latest stable tag from the upstream BerriAI/litellm repository. + +## Prerequisites + +- Git configured with access to both `origin` (sofatutor/litellm) and `upstream` (BerriAI/litellm) remotes +- GitHub CLI (`gh`) installed and authenticated +- Python environment with test dependencies installed + +## Overview + +1. Fetch latest upstream tags +2. Identify the latest stable tag +3. Create a new branch based on that tag +4. Merge sofatutor-tweaks into the new branch (or vice versa) +5. Resolve any conflicts +6. Run tests to verify +7. Update PR #4 with the new base branch +8. Create a new `-sofatutor` tag +9. Draft a new release with all changes + +--- + +## Step 1: Fetch Latest Upstream Tags + +```bash +# Ensure upstream remote is configured +git remote -v | grep upstream || git remote add upstream https://github.com/BerriAI/litellm.git + +# Fetch all tags from upstream +git fetch upstream --tags +``` + +## Step 2: Identify the Latest Stable Tag + +```bash +# List all stable tags, sorted by version +git tag -l "*-stable*" | sort -V | tail -10 + +# The latest stable tag should be something like: v1.XX.Y-stable.Z +# Store it in a variable for later use +LATEST_STABLE_TAG=$(git tag -l "*-stable*" | grep -v sofatutor | sort -V | tail -1) +echo "Latest stable tag: $LATEST_STABLE_TAG" +``` + +## Step 3: Create a New Branch Based on the Stable Tag + +```bash +# Create and checkout a new branch from the stable tag +git checkout -b "stable-update-${LATEST_STABLE_TAG}" "${LATEST_STABLE_TAG}" + +# Push this branch to origin +git push -u origin "stable-update-${LATEST_STABLE_TAG}" +``` + +## Step 4: Merge sofatutor-tweaks into the New Branch + +```bash +# Merge sofatutor-tweaks into the new stable branch +git merge sofatutor-tweaks +``` + +### If there are conflicts: + +1. **Identify conflicting files:** + ```bash + git status + ``` + +2. **Common conflict resolution strategies:** + + - **For our custom files** (e.g., `.github/workflows/sofatutor_image.yml`): + Keep our version (`--ours` from sofatutor-tweaks perspective, but since we're merging INTO stable, use `--theirs`): + ```bash + git checkout --theirs + ``` + + - **For upstream changes we want to keep:** + ```bash + git checkout --ours + ``` + + - **For mixed changes** (manual resolution required): + Open the file, look for conflict markers (`<<<<<<<`, `=======`, `>>>>>>>`), and manually resolve. + +3. **After resolving each file:** + ```bash + git add + ``` + +4. 
**Complete the merge:** + ```bash + git commit -m "Merge sofatutor-tweaks into ${LATEST_STABLE_TAG}" + ``` + +## Step 5: Run Tests + +```bash +# Install dependencies if needed +pip install -e ".[dev]" + +# Run the core test suite +pytest tests/local_testing/ -v --tb=short + +# Run specific tests related to our customizations +pytest tests/llm_responses_api_testing/ -v + +# If you have specific proxy tests +pytest tests/proxy_unit_tests/ -v -k "not slow" +``` + +### If tests fail: + +1. Investigate the failure +2. Fix the issue in the merge branch +3. Commit the fix: + ```bash + git add . + git commit -m "fix: resolve test failures after merge" + ``` + +## Step 6: Update sofatutor-tweaks Branch + +```bash +# Switch to sofatutor-tweaks +git checkout sofatutor-tweaks + +# Fast-forward merge from the stable-update branch +git merge "stable-update-${LATEST_STABLE_TAG}" + +# Push the updated sofatutor-tweaks +git push origin sofatutor-tweaks +``` + +## Step 7: Update PR #4 Base Branch + +PR #4 contains all sofatutor customizations. After updating sofatutor-tweaks, PR #4 should automatically reflect the changes if it's based on sofatutor-tweaks. + +If you need to update the PR base branch: + +```bash +# Via GitHub CLI +gh pr edit 4 --base main # or whatever the target base should be +``` + +Or update via GitHub UI: +1. Go to https://github.com/sofatutor/litellm/pull/4 +2. Click "Edit" next to the base branch +3. Select the appropriate base branch + +## Step 8: Create the New Sofatutor Tag + +```bash +# Create the new tag with -sofatutor suffix +NEW_SOFATUTOR_TAG="${LATEST_STABLE_TAG}-sofatutor" +git tag -a "${NEW_SOFATUTOR_TAG}" -m "Sofatutor release based on ${LATEST_STABLE_TAG}" + +# Push the new tag +git push origin "${NEW_SOFATUTOR_TAG}" +``` + +## Step 9: Draft a New Release + +### Get the previous sofatutor tag: + +```bash +PREVIOUS_SOFATUTOR_TAG=$(git tag -l "*-sofatutor" | sort -V | tail -2 | head -1) +echo "Previous sofatutor tag: $PREVIOUS_SOFATUTOR_TAG" +``` + +### Generate changelog between tags: + +```bash +# Get commits between the two sofatutor tags +git log "${PREVIOUS_SOFATUTOR_TAG}..${NEW_SOFATUTOR_TAG}" --oneline --no-merges + +# Get upstream changes (from previous stable to new stable) +PREVIOUS_STABLE_TAG=$(echo $PREVIOUS_SOFATUTOR_TAG | sed 's/-sofatutor//') +git log "${PREVIOUS_STABLE_TAG}..${LATEST_STABLE_TAG}" --oneline --no-merges | head -50 +``` + +### Create the release: + +```bash +gh release create "${NEW_SOFATUTOR_TAG}" \ + --title "${NEW_SOFATUTOR_TAG}" \ + --notes "## What's Changed + +### Upstream Changes (${PREVIOUS_STABLE_TAG} → ${LATEST_STABLE_TAG}) + +- See full changelog: https://github.com/BerriAI/litellm/compare/${PREVIOUS_STABLE_TAG}...${LATEST_STABLE_TAG} + +### Sofatutor Customizations +- **Proxy**: Centralized error handling with enhanced OpenAI exception mapping and parsing for clearer logs and standardized responses +- **Proxy**: Rename function from \`image_generation\` to \`moderation\` in the moderations endpoint; change call type from \`audio_speech\` to \`pass_through_endpoint\` for accurate logging/metrics +- **OpenAI (audio speech)**: Add streaming support via context managers and implement deferred streaming to avoid prematurely closing upstream streams +- **CloudWatch Logging**: Remove Assistants API references from CloudWatch logging (to reduce log noise) +- **CI**: Add \`.github/workflows/sofatutor_image.yml\` for Sofatutor Docker image builds +- **fix(responses-api)**: Support instructions as list when using prompt objects + +### Based on 
+- LiteLLM ${LATEST_STABLE_TAG} +- Previous Sofatutor release: ${PREVIOUS_SOFATUTOR_TAG} + +**Full Changelog**: https://github.com/sofatutor/litellm/compare/${PREVIOUS_SOFATUTOR_TAG}...${NEW_SOFATUTOR_TAG}" +``` + +### Verify the release: + +1. Go to https://github.com/sofatutor/litellm/releases +2. Verify the release was created correctly +3. Edit if needed with `gh release edit "${NEW_SOFATUTOR_TAG}" --notes "..."` + +--- + +## Quick Reference Script + +Here's a complete script you can run (after setting the variables): + +```bash +#!/bin/bash +set -e + +# Configuration +UPSTREAM_REMOTE="upstream" +ORIGIN_REMOTE="origin" + +# Step 1: Fetch upstream +git fetch $UPSTREAM_REMOTE --tags + +# Step 2: Find latest stable tag +LATEST_STABLE_TAG=$(git tag -l "*-stable*" | grep -v sofatutor | sort -V | tail -1) +echo "Latest stable tag: $LATEST_STABLE_TAG" + +# Step 3: Create update branch +BRANCH_NAME="stable-update-${LATEST_STABLE_TAG}" +git checkout -b "$BRANCH_NAME" "$LATEST_STABLE_TAG" + +# Step 4: Merge sofatutor-tweaks +echo "Merging sofatutor-tweaks..." +if ! git merge sofatutor-tweaks -m "Merge sofatutor-tweaks into ${LATEST_STABLE_TAG}"; then + echo "⚠️ Conflicts detected! Please resolve manually, then run:" + echo " git add . && git commit" + echo " Then re-run this script from step 5" + exit 1 +fi + +# Step 5: Run tests +echo "Running tests..." +pytest tests/local_testing/ -v --tb=short -x + +# Step 6: Update sofatutor-tweaks +git checkout sofatutor-tweaks +git merge "$BRANCH_NAME" +git push $ORIGIN_REMOTE sofatutor-tweaks + +# Step 7: Create new tag +NEW_SOFATUTOR_TAG="${LATEST_STABLE_TAG}-sofatutor" +PREVIOUS_SOFATUTOR_TAG=$(git tag -l "*-sofatutor" | sort -V | tail -1) + +git tag -a "$NEW_SOFATUTOR_TAG" -m "Sofatutor release based on ${LATEST_STABLE_TAG}" +git push $ORIGIN_REMOTE "$NEW_SOFATUTOR_TAG" + +echo "✅ Done! New tag: $NEW_SOFATUTOR_TAG" +echo "📝 Don't forget to create a release at:" +echo " https://github.com/sofatutor/litellm/releases/new?tag=${NEW_SOFATUTOR_TAG}" +``` + +--- + +## Troubleshooting + +### Merge conflicts in specific files + +| File Type | Resolution Strategy | +|-----------|---------------------| +| `.github/workflows/sofatutor_image.yml` | Keep sofatutor version | +| `litellm/proxy/*.py` | Carefully merge, keeping both upstream fixes and our customizations | +| `litellm/llms/openai/*.py` | Review changes, upstream usually takes priority unless we have specific fixes | +| `tests/*` | Usually keep both sets of tests | + +### Tests failing after merge + +1. Check if it's a dependency issue: `pip install -e ".[dev]" --upgrade` +2. Check if upstream changed APIs we depend on +3. Review the failing test to understand what changed + +### Tag already exists + +If the sofatutor tag already exists: +```bash +# Delete local tag +git tag -d "${NEW_SOFATUTOR_TAG}" + +# Delete remote tag (careful!) +git push origin --delete "${NEW_SOFATUTOR_TAG}" + +# Recreate +git tag -a "${NEW_SOFATUTOR_TAG}" -m "..." 
+git push origin "${NEW_SOFATUTOR_TAG}" +``` + +--- + +## Checklist + +- [ ] Fetched latest upstream tags +- [ ] Identified latest stable tag: `_________________` +- [ ] Created update branch +- [ ] Merged sofatutor-tweaks +- [ ] Resolved all conflicts +- [ ] All tests passing +- [ ] Updated sofatutor-tweaks branch +- [ ] Updated PR #4 if needed +- [ ] Created new `-sofatutor` tag +- [ ] Drafted release with changelog +- [ ] Published release + +--- + +*Last updated: November 2025* diff --git a/litellm/__init__.py b/litellm/__init__.py index b47c7b74d338..940de54f8bd7 100644 --- a/litellm/__init__.py +++ b/litellm/__init__.py @@ -361,6 +361,7 @@ datadog_llm_observability_params: Optional[Union[DatadogLLMObsInitParams, Dict]] = None datadog_params: Optional[Union[DatadogInitParams, Dict]] = None aws_sqs_callback_params: Optional[Dict] = None +cloudwatch_callback_params: Optional[Dict] = None generic_logger_headers: Optional[Dict] = None default_key_generate_params: Optional[Dict] = None upperbound_key_generate_params: Optional[LiteLLM_UpperboundKeyGenerateParams] = None diff --git a/litellm/integrations/SlackAlerting/slack_alerting.py b/litellm/integrations/SlackAlerting/slack_alerting.py index 3efe58737865..7c12512f170f 100644 --- a/litellm/integrations/SlackAlerting/slack_alerting.py +++ b/litellm/integrations/SlackAlerting/slack_alerting.py @@ -509,12 +509,19 @@ async def failed_tracking_alert(self, error_message: str, failing_model: str): _cache_key = "budget_alerts:failed_tracking:{}".format(failing_model) result = await _cache.async_get_cache(key=_cache_key) if result is None: - await self.send_alert( - message=message, - level="High", - alert_type=AlertType.failed_tracking_spend, - alerting_metadata={}, - ) + try: + await self.send_alert( + message=message, + level="High", + alert_type=AlertType.failed_tracking_spend, + alerting_metadata={}, + ) + except Exception as e: + # Don't raise if webhook is missing or misconfigured; log and continue + verbose_proxy_logger.error( + "[Non-Blocking Error] Slack failed_tracking_alert: %s", str(e) + ) + return await _cache.async_set_cache( key=_cache_key, value="SENT", diff --git a/litellm/integrations/cloud_watch.py b/litellm/integrations/cloud_watch.py new file mode 100644 index 000000000000..fef0b6621760 --- /dev/null +++ b/litellm/integrations/cloud_watch.py @@ -0,0 +1,120 @@ +import datetime +import os +import boto3 +import json +from typing import Optional +from botocore.exceptions import ClientError + +import litellm +from litellm._logging import print_verbose, verbose_logger +from litellm.types.utils import StandardLoggingPayload + + +class CloudWatchLogger: + def __init__( + self, + log_group_name=None, + log_stream_name=None, + aws_region=None, + ): + try: + verbose_logger.debug( + f"in init cloudwatch logger - cloudwatch_callback_params {litellm.cloudwatch_callback_params}" + ) + + if litellm.cloudwatch_callback_params is not None: + # read in .env variables - example os.environ/CLOUDWATCH_LOG_GROUP_NAME + for key, value in litellm.cloudwatch_callback_params.items(): + if isinstance(value, str) and value.startswith("os.environ/"): + litellm.cloudwatch_callback_params[key] = litellm.get_secret(value) + # now set cloudwatch params from litellm.cloudwatch_callback_params + log_group_name = litellm.cloudwatch_callback_params.get("log_group_name", log_group_name) + log_stream_name = litellm.cloudwatch_callback_params.get("log_stream_name", log_stream_name) + aws_region = litellm.cloudwatch_callback_params.get("aws_region", aws_region) + + 
self.log_group_name = log_group_name or os.getenv("CLOUDWATCH_LOG_GROUP_NAME") + self.log_stream_name = log_stream_name or os.getenv("CLOUDWATCH_LOG_STREAM_NAME") + self.aws_region = aws_region or os.getenv("AWS_REGION") + + if self.log_group_name is None: + raise ValueError("log_group_name must be provided either through parameters, cloudwatch_callback_params, or environment variables.") + + # Initialize CloudWatch Logs client + self.logs_client = boto3.client("logs", region_name=self.aws_region) + + # Ensure the log group exists + self._ensure_log_group() + self.sequence_token = None + + except Exception as e: + print_verbose(f"Got exception while initializing CloudWatch Logs client: {str(e)}") + raise e + + def _ensure_log_group(self): + try: + self.logs_client.create_log_group(logGroupName=self.log_group_name) + print_verbose(f"Created log group: {self.log_group_name}") + except self.logs_client.exceptions.ResourceAlreadyExistsException: + print_verbose(f"Log group already exists: {self.log_group_name}") + + def _ensure_log_stream(self): + try: + self.logs_client.create_log_stream( + logGroupName=self.log_group_name, logStreamName=self.log_stream_name + ) + print_verbose(f"Created log stream: {self.log_stream_name}") + except self.logs_client.exceptions.ResourceAlreadyExistsException: + print_verbose(f"Log stream already exists: {self.log_stream_name}") + + async def _async_log_event( + self, kwargs, response_obj, start_time, end_time, print_verbose + ): + self.log_event(kwargs, response_obj, start_time, end_time, print_verbose) + + def log_event(self, kwargs, response_obj, start_time, end_time, print_verbose): + try: + # Ensure log group and stream exist before logging + self._ensure_log_group() + + verbose_logger.debug( + f"CloudWatch Logging - Enters logging function for model {kwargs}" + ) + + + # Construct payload + payload: Optional[StandardLoggingPayload] = kwargs.get("standard_logging_object", None) + if payload is None: + litellm_params = kwargs.get("litellm_params", {}) + metadata = litellm_params.get("metadata", {}) or {} + payload = {key: value for key, value in metadata.items() if key not in ["headers", "endpoint", "caching_groups", "previous_models"]} + + log_event_message = json.dumps(payload) + + timestamp = int(datetime.datetime.now().timestamp() * 1000) + + if self.log_stream_name is None: + self.log_stream_name = payload["id"] + self._ensure_log_stream() + + # Prepare the log event parameters + log_event_params = { + 'logGroupName': self.log_group_name, + 'logStreamName': self.log_stream_name, + 'logEvents': [ + { + 'timestamp': timestamp, + 'message': log_event_message + } + ] + } + + if self.sequence_token: + log_event_params['sequenceToken'] = self.sequence_token + + response = self.logs_client.put_log_events(**log_event_params) + + self.sequence_token = response['nextSequenceToken'] + + print_verbose(f"Logged to CloudWatch: {log_event_message}") + except Exception as e: + verbose_logger.exception(f"CloudWatch Logs Error: {str(e)}") diff --git a/litellm/litellm_core_utils/litellm_logging.py b/litellm/litellm_core_utils/litellm_logging.py index 6bad7ee29e2d..7f32f563f54c 100644 --- a/litellm/litellm_core_utils/litellm_logging.py +++ b/litellm/litellm_core_utils/litellm_logging.py @@ -126,6 +126,8 @@ from ..integrations.arize.arize_phoenix import ArizePhoenixLogger from ..integrations.athina import AthinaLogger from ..integrations.azure_storage.azure_storage import AzureBlobStorageLogger +from ..integrations.braintrust_logging import BraintrustLogger +from 
..integrations.cloud_watch import CloudWatchLogger from ..integrations.custom_prompt_management import CustomPromptManagement from ..integrations.datadog.datadog import DataDogLogger from ..integrations.datadog.datadog_llm_obs import DataDogLLMObsLogger @@ -226,6 +228,9 @@ local_cache: Optional[Dict[str, str]] = {} last_fetched_at = None last_fetched_at_keys = None +aws_loggers = {} +cloudWatchLogger = None +genericAPILogger = None #### @@ -1592,10 +1597,20 @@ def _success_handler_helper_fn( ) ) elif standard_logging_object is not None: + self.model_call_details[ + "standard_logging_object" + ] = standard_logging_object + else: # streaming chunks + image gen. self.model_call_details["standard_logging_object"] = ( - standard_logging_object + get_standard_logging_object_payload( + kwargs=self.model_call_details, + init_response_obj=result, + start_time=start_time, + end_time=end_time, + logging_obj=self, + status="success", + ) ) - else: self.model_call_details["response_cost"] = None result = self._transform_usage_objects(result=result) @@ -1628,6 +1643,17 @@ def _is_recognized_call_type_for_logging( """ Returns True if the call type is recognized for logging (eg. ModelResponse, ModelResponseStream, etc.) """ + # Ensure Text-to-Speech calls are recognized for logging even if the + # provider returns a lightweight streaming adapter instead of + # HttpxBinaryResponseContent. This guarantees building a + # standard_logging_object for TTS so downstream proxy callbacks can + # track spend and budgets. + try: + if self.call_type in (CallTypes.speech.value, CallTypes.aspeech.value): + return True + except Exception: + # If call_type is missing for any reason, fallthrough to type checks + pass if ( isinstance(logging_result, ModelResponse) or isinstance(logging_result, ModelResponseStream) @@ -1781,6 +1807,23 @@ def success_handler( # noqa: PLR0915 call_type=self.call_type, ) + # Ensure a standard logging payload exists even if no recognized result + # (e.g., TTS adapters with stream-like behavior). This prevents + # downstream proxy callbacks from failing. 
+ if "standard_logging_object" not in self.model_call_details: + try: + self.model_call_details["standard_logging_object"] = get_standard_logging_object_payload( + kwargs=self.model_call_details, + init_response_obj=result if isinstance(result, (dict, BaseModel)) else {}, + start_time=start_time, + end_time=end_time, + logging_obj=self, + status="success", + standard_built_in_tools_params=self.standard_built_in_tools_params, + ) + except Exception: + pass + self.has_run_logging(event_type="sync_success") for callback in callbacks: try: @@ -2021,16 +2064,13 @@ def success_handler( # noqa: PLR0915 user_id=kwargs.get("user", None), print_verbose=print_verbose, ) - if callback == "s3": - global s3Logger - if s3Logger is None: - s3Logger = S3Logger() + if callback in ["s3", "cloudwatch"]: if self.stream: if "complete_streaming_response" in self.model_call_details: print_verbose( - "S3Logger Logger: Got Stream Event - Completed Stream Response" + f"{callback.capitalize()} Logger: Got Stream Event - Completed Stream Response" ) - s3Logger.log_event( + aws_loggers[callback].log_event( kwargs=self.model_call_details, response_obj=self.model_call_details[ "complete_streaming_response" @@ -2039,12 +2079,20 @@ def success_handler( # noqa: PLR0915 end_time=end_time, print_verbose=print_verbose, ) + elif self.model_call_details.get("log_event_type") == "successful_api_call": + aws_loggers[callback].log_event( + kwargs=self.model_call_details, + response_obj=result, + start_time=start_time, + end_time=end_time, + print_verbose=print_verbose, + ) else: print_verbose( - "S3Logger Logger: Got Stream Event - No complete stream response as yet" + f"{callback.capitalize()} Logger: Got Stream Event - No complete stream response as yet" ) else: - s3Logger.log_event( + aws_loggers[callback].log_event( kwargs=self.model_call_details, response_obj=result, start_time=start_time, @@ -3214,7 +3262,7 @@ def set_callbacks(callback_list, function_id=None): # noqa: PLR0915 """ Globally sets the callback client """ - global sentry_sdk_instance, capture_exception, add_breadcrumb, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger, deepevalLogger + global sentry_sdk_instance, capture_exception, add_breadcrumb, posthog, slack_app, alerts_channel, traceloopLogger, athinaLogger, heliconeLogger, supabaseClient, lunaryLogger, promptLayerLogger, langFuseLogger, customLogger, weightsBiasesLogger, logfireLogger, dynamoLogger, s3Logger, aws_loggers, dataDogLogger, prometheusLogger, greenscaleLogger, openMeterLogger, deepevalLogger try: for callback in callback_list: @@ -3290,8 +3338,10 @@ def set_callbacks(callback_list, function_id=None): # noqa: PLR0915 dataDogLogger = DataDogLogger() elif callback == "dynamodb": dynamoLogger = DyanmoDBLogger() - elif callback == "s3": - s3Logger = S3Logger() + elif callback in ["s3", "cloudwatch"]: + if callback not in aws_loggers: + aws_loggers[callback] = S3Logger() if callback == "s3" else CloudWatchLogger() + print_verbose(f"Initialized {callback.capitalize()} Logger") elif callback == "wandb": from litellm.integrations.weights_biases import WeightsBiasesLogger @@ -4725,14 +4775,37 @@ def get_standard_logging_object_payload( ## Get model cost information ## base_model = _get_base_model_from_metadata(model_call_details=kwargs) - custom_pricing = 
use_custom_pricing_for_model(litellm_params=litellm_params) - - model_cost_information = StandardLoggingPayloadSetup.get_model_cost_information( - base_model=base_model, - custom_pricing=custom_pricing, - custom_llm_provider=kwargs.get("custom_llm_provider"), - init_response_obj=init_response_obj, - ) + if base_model is None: + model_cost_information = None + else: + custom_pricing = use_custom_pricing_for_model(litellm_params=litellm_params) + model_cost_name = _select_model_name_for_cost_calc( + model=None, + completion_response=init_response_obj, # type: ignore + base_model=base_model, + custom_pricing=custom_pricing, + ) + if model_cost_name is None: + model_cost_information = StandardLoggingModelInformation( + model_map_key="", model_map_value=None + ) + else: + custom_llm_provider = kwargs.get("custom_llm_provider", None) + try: + _model_cost_information = litellm.get_model_info( + model=model_cost_name, custom_llm_provider=custom_llm_provider + ) + model_cost_information = StandardLoggingModelInformation( + model_map_key=model_cost_name, + model_map_value=_model_cost_information, + ) + except Exception: + verbose_logger.debug( + f"Model={model_cost_name} is not mapped in model cost map. Defaulting to None model_cost_information for standard_logging_payload" + ) + model_cost_information = StandardLoggingModelInformation( + model_map_key=model_cost_name, model_map_value=None + ) response_cost: float = kwargs.get("response_cost", 0) or 0.0 error_information = StandardLoggingPayloadSetup.get_error_information( diff --git a/litellm/llms/openai/openai.py b/litellm/llms/openai/openai.py index 2949e35e5e79..9c9ee508d19b 100644 --- a/litellm/llms/openai/openai.py +++ b/litellm/llms/openai/openai.py @@ -63,6 +63,26 @@ openAIGPT5Config = OpenAIGPT5Config() +class _DeferredOpenAITTSStream: + """ + Opens the OpenAI streaming context only when aiter_bytes() is consumed, + keeping the upstream stream alive while the proxy yields chunks. + """ + + def __init__(self, client: AsyncOpenAI, request_kwargs: dict): + self._client = client + self._request_kwargs = request_kwargs + self._hidden_params: dict = {} + + async def aiter_bytes(self, chunk_size: int = 1024): + async with self._client.audio.speech.with_streaming_response.create( + **self._request_kwargs + ) as streamed: + async for chunk in streamed.http_response.aiter_bytes( + chunk_size=chunk_size + ): + yield chunk + class MistralEmbeddingConfig: """ Reference: https://docs.mistral.ai/api/#operation/createEmbedding @@ -1439,6 +1459,8 @@ def audio_speech( client=client, ) + # For sync path, fall back to simple non-streaming create (keeps behavior for sync speech()) + # Proxy uses async path; real streaming is handled in async_audio_speech via deferred stream. 
response = cast(OpenAI, openai_client).audio.speech.create( model=model, voice=voice, # type: ignore @@ -1473,14 +1495,25 @@ async def async_audio_speech( ), ) - response = await openai_client.audio.speech.create( - model=model, - voice=voice, # type: ignore - input=input, + # Return a deferred streaming object so proxy can iterate without prematurely closing upstream + request_kwargs = { + "model": model, + "voice": voice, + "input": input, **optional_params, - ) - - return HttpxBinaryResponseContent(response=response.response) + } + deferred = _DeferredOpenAITTSStream(client=openai_client, request_kwargs=request_kwargs) + # Adapt to HttpxBinaryResponseContent interface by exposing aiter_bytes() + class _Adapter: + _hidden_params: dict = {} + + async def aiter_bytes(self, chunk_size: int = 1024): + async def _gen(): + async for b in deferred.aiter_bytes(chunk_size=chunk_size): + yield b + return _gen() + + return _Adapter() # type: ignore class OpenAIFilesAPI(BaseLLM): diff --git a/litellm/proxy/utils.py b/litellm/proxy/utils.py index 015540caafd0..9c30c846f79e 100644 --- a/litellm/proxy/utils.py +++ b/litellm/proxy/utils.py @@ -62,6 +62,7 @@ from litellm.integrations.custom_logger import CustomLogger from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting from litellm.integrations.SlackAlerting.utils import _add_langfuse_trace_id_to_alert +from litellm.integrations.cloud_watch import CloudWatchLogger from litellm.litellm_core_utils.litellm_logging import Logging from litellm.litellm_core_utils.safe_json_dumps import safe_dumps from litellm.litellm_core_utils.safe_json_loads import safe_json_loads diff --git a/litellm/types/llms/openai.py b/litellm/types/llms/openai.py index fd2f9b9d9c82..25eb0d0f679d 100644 --- a/litellm/types/llms/openai.py +++ b/litellm/types/llms/openai.py @@ -43,7 +43,7 @@ # Handle OpenAI SDK version compatibility for Text type try: - from openai.types.responses.response_create_params import ( Text as ResponseText ) # type: ignore[attr-defined] # fmt: skip # isort: skip + from openai.types.responses.response_create_params import Text as ResponseText # type: ignore[attr-defined] # fmt: skip # isort: skip except (ImportError, AttributeError): # Fall back to the concrete config type available in all SDK versions from openai.types.responses.response_text_config_param import ( @@ -849,12 +849,12 @@ def __init__(self, **kwargs): class Hyperparameters(BaseModel): batch_size: Optional[Union[str, int]] = None # "Number of examples in each batch." - learning_rate_multiplier: Optional[Union[str, float]] = ( - None # Scaling factor for the learning rate - ) - n_epochs: Optional[Union[str, int]] = ( - None # "The number of epochs to train the model for" - ) + learning_rate_multiplier: Optional[ + Union[str, float] + ] = None # Scaling factor for the learning rate + n_epochs: Optional[ + Union[str, int] + ] = None # "The number of epochs to train the model for" class FineTuningJobCreate(BaseModel): @@ -881,18 +881,18 @@ class FineTuningJobCreate(BaseModel): model: str # "The name of the model to fine-tune." training_file: str # "The ID of an uploaded file that contains training data." - hyperparameters: Optional[Hyperparameters] = ( - None # "The hyperparameters used for the fine-tuning job." - ) - suffix: Optional[str] = ( - None # "A string of up to 18 characters that will be added to your fine-tuned model name." - ) - validation_file: Optional[str] = ( - None # "The ID of an uploaded file that contains validation data." 
- ) - integrations: Optional[List[str]] = ( - None # "A list of integrations to enable for your fine-tuning job." - ) + hyperparameters: Optional[ + Hyperparameters + ] = None # "The hyperparameters used for the fine-tuning job." + suffix: Optional[ + str + ] = None # "A string of up to 18 characters that will be added to your fine-tuned model name." + validation_file: Optional[ + str + ] = None # "The ID of an uploaded file that contains validation data." + integrations: Optional[ + List[str] + ] = None # "A list of integrations to enable for your fine-tuning job." seed: Optional[int] = None # "The seed controls the reproducibility of the job." @@ -1053,7 +1053,7 @@ class ResponsesAPIResponse(BaseLiteLLMOpenAIResponseObject): created_at: int error: Optional[dict] = None incomplete_details: Optional[IncompleteDetails] = None - instructions: Optional[str] = None + instructions: Optional[Union[str, List[Dict[str, Any]]]] = None metadata: Optional[Dict] = None model: Optional[str] = None object: Optional[str] = None diff --git a/litellm/utils.py b/litellm/utils.py index 783d462a7afe..49d9c62f11ee 100644 --- a/litellm/utils.py +++ b/litellm/utils.py @@ -690,6 +690,7 @@ def function_setup( # noqa: PLR0915 get_coroutine_checker().is_async_callable(callback) or callback == "dynamodb" or callback == "s3" + or callback == "cloudwatch" ): if dynamic_async_success_callbacks is not None and isinstance( dynamic_async_success_callbacks, list @@ -802,6 +803,24 @@ def function_setup( # noqa: PLR0915 call_type == CallTypes.aspeech.value or call_type == CallTypes.speech.value ): messages = kwargs.get("input", "speech") + # Populate input for TTS so cost calculator can count characters + try: + if isinstance(messages, str): + kwargs.setdefault("metadata", {}) + + except Exception: + pass + # Ensure TTS input is recorded on the logging object for character-based + # pricing in the cost calculator. + try: + if isinstance(messages, str): + # This is set later when the Logging object is constructed; we + # also redundantly set it in model_call_details for safety. 
+ kwargs.setdefault("metadata", {}) + # model_call_details populated below will read from kwargs + + except Exception: + pass elif ( call_type == CallTypes.aresponses.value or call_type == CallTypes.responses.value @@ -832,11 +851,20 @@ def function_setup( # noqa: PLR0915 applied_guardrails=applied_guardrails, ) + # For TTS calls, record the raw input text to assist cost calculation + try: + if call_type in (CallTypes.aspeech.value, CallTypes.speech.value): + if isinstance(kwargs.get("input"), str): + logging_obj.model_call_details["input"] = kwargs.get("input") + except Exception: + pass + ## check if metadata is passed in litellm_params: Dict[str, Any] = {"api_base": ""} if "metadata" in kwargs: litellm_params["metadata"] = kwargs["metadata"] - + elif "litellm_metadata" in kwargs: + litellm_params["metadata"] = kwargs["litellm_metadata"] logging_obj.update_environment_variables( model=model, user="", diff --git a/pr-cloudwatch-logging.md b/pr-cloudwatch-logging.md new file mode 100644 index 000000000000..55da5b267080 --- /dev/null +++ b/pr-cloudwatch-logging.md @@ -0,0 +1,44 @@ +# Add CloudWatch Logging Support + +## Relevant issues +Implements enhanced logging capability for AWS CloudWatch + +## Pre-Submission checklist +- [x] I have Added testing in the `tests/litellm/` directory +- [x] My PR passes all unit tests on `make test-unit` +- [x] My PR's scope is as isolated as possible, it only solves 1 specific problem + +## Type +🆕 New Feature +✅ Test + +## Changes +This PR adds support for AWS CloudWatch logging integration to LiteLLM. + +### Summary +Implements complete CloudWatch logging integration. This allows: +1. Logging LiteLLM activity to AWS CloudWatch +2. Standardized log format across all logging destinations + +### Implementation Details +| File | Change Type | Description | +|------|-------------|-------------| +| litellm/integrations/cloud_watch.py | New | CloudWatch integration module | +| litellm/litellm_core_utils/litellm_logging.py | Modified | Refactored AWS logging support | +| litellm/utils.py | Modified | Added CloudWatch to async callback logic | +| litellm/__init__.py | Modified | Added cloudwatch_callback_params attribute | + +### Testing +- Added comprehensive tests for CloudWatch integration in `tests/litellm/integrations/test_cloud_watch.py` +- Fixed test initialization issues with proper DualCache mocking + +### Configuration +CloudWatch logging can be configured in LiteLLM config using: +```yaml +litellm_settings: + telemetry: True + success_callback: ["cloudwatch"] + cloudwatch_callback_params: + log_group_name: /litellm + aws_region: eu-central-1 +``` diff --git a/scripts/verify_tts_streaming.py b/scripts/verify_tts_streaming.py new file mode 100644 index 000000000000..01ee07142085 --- /dev/null +++ b/scripts/verify_tts_streaming.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +import argparse +import contextlib +import os +import sys +import time +from typing import Optional + +import httpx + + +def build_url(base_url: str, endpoint_path: str) -> str: + if base_url.endswith("/"): + base_url = base_url[:-1] + if not endpoint_path.startswith("/"): + endpoint_path = "/" + endpoint_path + return base_url + endpoint_path + + +def main() -> int: + parser = argparse.ArgumentParser(description="Verify TTS streaming via chunked transfer") + parser.add_argument( + "--base-url", + default=os.environ.get("OPENAI_BASE_URL", "http://0.0.0.0:4000"), + help="Base URL for the API (default from OPENAI_BASE_URL or http://0.0.0.0:4000)", + ) + parser.add_argument( + 
"--endpoint-path", + default="/v1/audio/speech", + help="Endpoint path to call (e.g. /v1/audio/speech or /openai/audio/speech)", + ) + parser.add_argument( + "--model", + default="gpt-4o-mini-tts", + help="Model name (default: gpt-4o-mini-tts)", + ) + parser.add_argument( + "--voice", + default="shimmer", + help="Voice to use (default: shimmer)", + ) + parser.add_argument( + "--input", + default=( + "Once upon a time, in a bustling city nestled between rolling hills and a sparkling river, there lived a young inventor named Elara. Elara was known throughout the city for her boundless curiosity and her knack for creating marvelous contraptions from the most ordinary of objects. One day, while exploring the attic of her late grandfather’s house, she stumbled upon a dusty, leather-bound journal filled with cryptic notes and intricate sketches of a mysterious machine. Intrigued, Elara spent days deciphering the journal, piecing together the purpose of the device. It was said to be a portal, capable of bridging worlds and connecting distant realms. Driven by excitement and a sense of adventure, Elara gathered the necessary parts—cogs, wires, crystals, and a peculiar brass key—and began assembling the machine in her workshop. As she tightened the final bolt and inserted the key, the device hummed to life, casting a shimmering blue light across the room. With a deep breath, Elara stepped forward and activated the portal. Instantly, she was enveloped in a whirlwind of colors and sounds, feeling herself transported beyond the boundaries of her world. When the light faded, she found herself standing in a lush, enchanted forest, where trees whispered secrets and fantastical creatures roamed freely. Elara realized she had crossed into a realm of endless possibilities, where her inventions could shape the very fabric of reality. Determined to explore and learn, she set off down a winding path, eager to uncover the wonders and challenges that awaited her in this extraordinary new world. And so began Elara’s greatest adventure, one that would test her ingenuity, courage, and heart, and ultimately reveal the true power of imagination and discovery." + ), + help="Text to synthesize", + ) + parser.add_argument( + "--response-format", + default="mp3", + help="Audio response format (default: mp3)", + ) + parser.add_argument( + "--output", + default=None, + help="Optional path to write audio to (if omitted, data is discarded)", + ) + parser.add_argument( + "--http2", + action="store_true", + help="Enable HTTP/2 (default: off). Leave off to see chunked headers in HTTP/1.1", + ) + args = parser.parse_args() + + api_key = os.environ.get("OPENAI_API_KEY") + if not api_key: + print("ERROR: OPENAI_API_KEY is not set in the environment", file=sys.stderr) + return 2 + + url = build_url(args.base_url, args.endpoint_path) + headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + "Accept": "audio/mpeg", + } + json_body = { + "model": args.model, + "input": args.input, + "voice": args.voice, + "response_format": args.response_format, + } + + print(f"Requesting: {url}") + print(f"HTTP/2: {'on' if args.http2 else 'off'} (HTTP/1.1 if off)") + + # Force HTTP/1.1 by default to make Transfer-Encoding: chunked visible when streaming. + # For HTTP/2, chunked header will not be present even when streaming works. 
+ start_req = time.time() + first_byte_at: Optional[float] = None + total_bytes = 0 + + with httpx.Client(http2=args.http2, timeout=None) as client: + with client.stream("POST", url, headers=headers, json=json_body) as resp: + status = resp.status_code + # Print key headers that indicate buffering vs streaming + cl = resp.headers.get("content-length") + te = resp.headers.get("transfer-encoding") + server = resp.headers.get("server") + print(f"Status: {status}") + print(f"Content-Type: {resp.headers.get('content-type')}") + print(f"Content-Length: {cl}") + print(f"Transfer-Encoding: {te}") + print(f"Server: {server}") + + # Stream body + sink_cm = open(args.output, "wb") if args.output else contextlib.nullcontext() + with sink_cm as sink: + for chunk in resp.iter_bytes(): + if not first_byte_at: + first_byte_at = time.time() + print( + f"First byte after {first_byte_at - start_req:.3f}s" + ) + total_bytes += len(chunk) + if sink and hasattr(sink, "write"): + sink.write(chunk) # type: ignore + + end = time.time() + print(f"Total bytes: {total_bytes}") + print(f"Total time: {end - start_req:.3f}s") + if first_byte_at: + print(f"Time to first byte: {first_byte_at - start_req:.3f}s") + + print() + print("Interpretation:") + print("- If Content-Length is absent and Transfer-Encoding is chunked (HTTP/1.1), it streamed.") + print("- If Content-Length is present, the response was buffered by an intermediary or origin.") + print("- Even with HTTP/2 (no chunked header), early first byte indicates streaming.") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) + + diff --git a/tests/llm_responses_api_testing/test_openai_responses_api.py b/tests/llm_responses_api_testing/test_openai_responses_api.py index 7553c6707743..3024345e8dcc 100644 --- a/tests/llm_responses_api_testing/test_openai_responses_api.py +++ b/tests/llm_responses_api_testing/test_openai_responses_api.py @@ -65,7 +65,10 @@ def validate_standard_logging_payload( assert slp is not None, "Standard logging payload should not be None" # Validate token counts - print("VALIDATING STANDARD LOGGING PAYLOAD. response=", json.dumps(response, indent=4, default=str)) + print( + "VALIDATING STANDARD LOGGING PAYLOAD. response=", + json.dumps(response, indent=4, default=str), + ) print("FIELDS IN SLP=", json.dumps(slp, indent=4, default=str)) print("SLP PROMPT TOKENS=", slp["prompt_tokens"]) print("RESPONSE PROMPT TOKENS=", response["usage"]["input_tokens"]) @@ -856,6 +859,206 @@ def json(self): mock_post.assert_called_once() +@pytest.mark.asyncio +async def test_openai_responses_with_prompt_instructions_as_list(): + """ + Test that ResponsesAPIResponse correctly handles instructions returned as a list + when using prompt objects. This is the format OpenAI returns when a prompt template + is expanded - the instructions become a list of message objects rather than a string. 
+ + Regression test for: https://github.com/BerriAI/litellm/issues/XXXX + """ + from litellm.types.llms.openai import ResponsesAPIResponse + + # Mock response with instructions as a list (as returned by OpenAI when using prompt objects) + mock_response = { + "id": "resp_abc123", + "object": "response", + "created_at": 1741476542, + "status": "completed", + "model": "gpt-4o", + "output": [ + { + "type": "message", + "id": "msg_001", + "status": "completed", + "role": "assistant", + "content": [{"type": "output_text", "text": "The answer is 3."}], + } + ], + "parallel_tool_calls": True, + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + "text": {"format": {"type": "text"}}, + "error": None, + "incomplete_details": None, + # This is the key: instructions as a list of message objects (expanded from prompt template) + "instructions": [ + { + "type": "message", + "content": [ + {"type": "input_text", "text": "You are a helpful math assistant."} + ], + "role": "developer", + }, + { + "type": "message", + "content": [ + {"type": "input_text", "text": "Solve the following problem."} + ], + "role": "assistant", + }, + ], + "metadata": {}, + "temperature": 0.7, + "tool_choice": "auto", + "tools": [], + "top_p": 1.0, + "max_output_tokens": None, + "previous_response_id": None, + "reasoning": None, + "truncation": "disabled", + "user": None, + } + + # This should not raise a ValidationError + response = ResponsesAPIResponse(**mock_response) + + # Verify the response was parsed correctly + assert response.id == "resp_abc123" + assert response.status == "completed" + assert isinstance(response.instructions, list) + assert len(response.instructions) == 2 + assert response.instructions[0]["role"] == "developer" + assert response.instructions[1]["role"] == "assistant" + + +@pytest.mark.asyncio +async def test_openai_responses_with_prompt_instructions_as_string(): + """ + Test that ResponsesAPIResponse still correctly handles instructions as a string + (the traditional format when not using prompt objects). + """ + from litellm.types.llms.openai import ResponsesAPIResponse + + mock_response = { + "id": "resp_xyz789", + "object": "response", + "created_at": 1741476542, + "status": "completed", + "model": "gpt-4o", + "output": [], + "parallel_tool_calls": True, + "usage": {"input_tokens": 10, "output_tokens": 5, "total_tokens": 15}, + "text": {"format": {"type": "text"}}, + "error": None, + "incomplete_details": None, + "instructions": "You are a helpful assistant.", # String format + "metadata": {}, + "temperature": 1.0, + "tool_choice": "auto", + "tools": [], + "top_p": 1.0, + "max_output_tokens": None, + "previous_response_id": None, + "reasoning": None, + "truncation": "disabled", + "user": None, + } + + response = ResponsesAPIResponse(**mock_response) + + assert response.id == "resp_xyz789" + assert response.instructions == "You are a helpful assistant." + assert isinstance(response.instructions, str) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("sync_mode", [True, False]) +async def test_openai_responses_streaming_with_prompt_instructions_as_list(sync_mode): + """ + Test that streaming response event types correctly handle instructions as a list + when using prompt objects. This tests that ResponseCreatedEvent, ResponseInProgressEvent, + and ResponseCompletedEvent all properly parse instructions as a list. + + Regression test for ValidationError when using prompt objects with streaming. 
+ """ + from litellm.types.llms.openai import ( + ResponseCreatedEvent, + ResponseInProgressEvent, + ResponseCompletedEvent, + ) + + # Test data with instructions as a list (as returned by OpenAI when using prompt objects) + response_with_list_instructions = { + "id": "resp_stream123", + "object": "response", + "created_at": 1741476542, + "status": "in_progress", + "model": "gpt-4o", + "output": [], + "parallel_tool_calls": True, + "usage": None, + "text": {"format": {"type": "text"}}, + "error": None, + "incomplete_details": None, + "instructions": [ + { + "type": "message", + "content": [ + {"type": "input_text", "text": "You are a helpful assistant."} + ], + "role": "developer", + } + ], + "metadata": {}, + "temperature": 0.7, + "tool_choice": "auto", + "tools": [], + "top_p": 1.0, + "max_output_tokens": None, + "previous_response_id": None, + "reasoning": None, + "truncation": "disabled", + "user": None, + } + + # Test ResponseCreatedEvent with list instructions + created_event_data = { + "type": "response.created", + "response": response_with_list_instructions, + } + created_event = ResponseCreatedEvent(**created_event_data) + assert created_event.type == "response.created" + assert isinstance(created_event.response.instructions, list) + assert len(created_event.response.instructions) == 1 + assert created_event.response.instructions[0]["role"] == "developer" + + # Test ResponseInProgressEvent with list instructions + in_progress_event_data = { + "type": "response.in_progress", + "response": response_with_list_instructions, + } + in_progress_event = ResponseInProgressEvent(**in_progress_event_data) + assert in_progress_event.type == "response.in_progress" + assert isinstance(in_progress_event.response.instructions, list) + + # Test ResponseCompletedEvent with list instructions + completed_response = response_with_list_instructions.copy() + completed_response["status"] = "completed" + completed_response["usage"] = { + "input_tokens": 10, + "output_tokens": 5, + "total_tokens": 15, + } + completed_event_data = { + "type": "response.completed", + "response": completed_response, + } + completed_event = ResponseCompletedEvent(**completed_event_data) + assert completed_event.type == "response.completed" + assert isinstance(completed_event.response.instructions, list) + + def test_bad_request_bad_param_error(): """Raise a BadRequestError when an invalid parameter value is provided""" try: @@ -1662,8 +1865,12 @@ async def async_log_success_event( ), f"Expected response_obj.usage to be of type Usage or dict, but got {type(response_obj.usage)}" # Verify it has the chat completion format fields if isinstance(response_obj.usage, dict): - assert "prompt_tokens" in response_obj.usage, "Usage dict should have prompt_tokens" - assert "completion_tokens" in response_obj.usage, "Usage dict should have completion_tokens" + assert ( + "prompt_tokens" in response_obj.usage + ), "Usage dict should have prompt_tokens" + assert ( + "completion_tokens" in response_obj.usage + ), "Usage dict should have completion_tokens" print("\n\nVALIDATED USAGE\n\n") self.validate_usage = True diff --git a/tests/test_litellm/integrations/test_cloud_watch.py b/tests/test_litellm/integrations/test_cloud_watch.py new file mode 100644 index 000000000000..db687cfbdb50 --- /dev/null +++ b/tests/test_litellm/integrations/test_cloud_watch.py @@ -0,0 +1,213 @@ +import os +import sys +import unittest +from unittest.mock import MagicMock, patch + +import pytest + +# Add project root to path +sys.path.insert(0, 
os.path.abspath(os.path.join(os.path.dirname(__file__), "../../.."))) + +from litellm.integrations.cloud_watch import CloudWatchLogger + + +class TestCloudWatchLogger(unittest.TestCase): + """ + Tests for the CloudWatch Logger integration + """ + + def setUp(self): + """Set up test fixtures""" + # Set AWS region for testing + self.aws_region = "us-west-2" + + # Create a log group and stream name for testing + self.log_group_name = "test-log-group" + self.log_stream_name = "test-log-stream" + + # Create a patch for litellm module + self.litellm_patcher = patch("litellm.integrations.cloud_watch.litellm") + self.mock_litellm = self.litellm_patcher.start() + self.mock_litellm.cloudwatch_callback_params = None + self.mock_litellm.get_secret = lambda x: x.replace("os.environ/", "") + + def tearDown(self): + """Clean up after tests""" + self.litellm_patcher.stop() + + @patch("litellm.integrations.cloud_watch.boto3") + def test_init(self, mock_boto3): + """Test CloudWatchLogger initialization""" + # Arrange + mock_client = MagicMock() + mock_boto3.client.return_value = mock_client + + # Act + logger = CloudWatchLogger( + log_group_name=self.log_group_name, + log_stream_name=self.log_stream_name, + aws_region=self.aws_region, + ) + + # Assert + mock_boto3.client.assert_called_once_with( + "logs", + region_name=self.aws_region, + ) + + assert logger.log_group_name == self.log_group_name + assert logger.log_stream_name == self.log_stream_name + + @patch("litellm.integrations.cloud_watch.boto3") + def test_init_with_callback_params(self, mock_boto3): + """Test CloudWatchLogger initialization with callback params""" + # Arrange + mock_client = MagicMock() + mock_boto3.client.return_value = mock_client + + # Set callback params + self.mock_litellm.cloudwatch_callback_params = { + "log_group_name": "callback-group", + "log_stream_name": "callback-stream", + "aws_region": "us-east-1", + } + + # Act + logger = CloudWatchLogger() + + # Assert + mock_boto3.client.assert_called_once_with( + "logs", + region_name="us-east-1", + ) + + assert logger.log_group_name == "callback-group" + assert logger.log_stream_name == "callback-stream" + + @patch("litellm.integrations.cloud_watch.boto3") + def test_log_success_event(self, mock_boto3): + """Test logging a successful event to CloudWatch""" + # Arrange + mock_client = MagicMock() + mock_boto3.client.return_value = mock_client + + logger = CloudWatchLogger( + log_group_name=self.log_group_name, + log_stream_name=self.log_stream_name, + aws_region=self.aws_region, + ) + + # Create test data + test_kwargs = { + "model": "gpt-4", + "messages": [{"role": "user", "content": "Hello"}], + "litellm_params": { + "metadata": { + "call_type": "completion", + "litellm_call_id": "test-call-id", + } + } + } + + test_response = { + "model": "gpt-4", + "choices": [ + { + "message": {"role": "assistant", "content": "Hi there!"}, + "finish_reason": "stop", + "index": 0, + } + ], + "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}, + } + + # Act + logger.log_event( + kwargs=test_kwargs, + response_obj=test_response, + start_time=1234567890, + end_time=1234567895, + print_verbose=lambda x: None, + ) + + # Assert + # Verify log events was called + mock_client.put_log_events.assert_called_once() + + # Extract the call arguments + call_args = mock_client.put_log_events.call_args[1] + + # Verify log group and stream + assert call_args["logGroupName"] == self.log_group_name + assert call_args["logStreamName"] == self.log_stream_name + + # Verify log events contains 
+        log_events = call_args["logEvents"]
+        assert len(log_events) == 1
+
+        # Verify timestamp is present
+        assert "timestamp" in log_events[0]
+
+    @patch("litellm.integrations.cloud_watch.boto3")
+    def test_log_failure_event(self, mock_boto3):
+        """Test logging a failure event to CloudWatch"""
+        # Arrange
+        mock_client = MagicMock()
+        mock_boto3.client.return_value = mock_client
+
+        logger = CloudWatchLogger(
+            log_group_name=self.log_group_name,
+            log_stream_name=self.log_stream_name,
+            aws_region=self.aws_region,
+        )
+
+        # Create test data
+        test_kwargs = {
+            "model": "gpt-4",
+            "messages": [{"role": "user", "content": "Hello"}],
+            "litellm_params": {
+                "metadata": {
+                    "call_type": "completion",
+                    "litellm_call_id": "test-call-id",
+                    "error": "API rate limit exceeded"
+                }
+            }
+        }
+
+        # Act
+        logger.log_event(
+            kwargs=test_kwargs,
+            response_obj=None,
+            start_time=1234567890,
+            end_time=1234567895,
+            print_verbose=lambda x: None,
+        )
+
+        # Assert
+        # Verify put_log_events was called
+        mock_client.put_log_events.assert_called_once()
+
+# For pytest compatibility
+def test_cloudwatch_logger_init():
+    """Pytest-compatible test for CloudWatchLogger initialization"""
+    with patch("litellm.integrations.cloud_watch.litellm") as mock_litellm:
+        mock_litellm.cloudwatch_callback_params = None
+        mock_litellm.get_secret = lambda x: x.replace("os.environ/", "")
+
+        with patch("litellm.integrations.cloud_watch.boto3") as mock_boto3:
+            mock_client = MagicMock()
+            mock_boto3.client.return_value = mock_client
+
+            logger = CloudWatchLogger(
+                log_group_name="test-group",
+                log_stream_name="test-stream",
+                aws_region="us-west-2",
+            )
+
+            assert logger.log_group_name == "test-group"
+            assert logger.log_stream_name == "test-stream"
+            mock_boto3.client.assert_called_once()
+
+
+if __name__ == "__main__":
+    unittest.main()
\ No newline at end of file
diff --git a/tests/test_litellm/test_slack_failed_tracking_alert_no_webhook.py b/tests/test_litellm/test_slack_failed_tracking_alert_no_webhook.py
new file mode 100644
index 000000000000..003f1783d9a5
--- /dev/null
+++ b/tests/test_litellm/test_slack_failed_tracking_alert_no_webhook.py
@@ -0,0 +1,17 @@
+import asyncio
+import os
+import pytest
+
+from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
+from litellm.integrations.SlackAlerting.utils import process_slack_alerting_variables
+from litellm.proxy._types import AlertType
+
+@pytest.mark.asyncio
+async def test_failed_tracking_alert_does_not_raise_without_webhook(monkeypatch):
+    # Ensure no webhook in env
+    monkeypatch.delenv("SLACK_WEBHOOK_URL", raising=False)
+
+    sa = SlackAlerting(alerting=["slack"], alert_types=[AlertType.failed_tracking_spend])
+
+    # Should not raise even if webhook is missing
+    await sa.failed_tracking_alert(error_message="test error", failing_model="gpt-x")
diff --git a/tests/test_litellm/test_tts_deferred_streaming.py b/tests/test_litellm/test_tts_deferred_streaming.py
new file mode 100644
index 000000000000..a21c3e5b54f2
--- /dev/null
+++ b/tests/test_litellm/test_tts_deferred_streaming.py
@@ -0,0 +1,73 @@
+import asyncio
+
+from types import SimpleNamespace
+
+from litellm.llms.openai.openai import _DeferredOpenAITTSStream
+
+
+class _FakeHTTPResponse:
+    def __init__(self, chunks):
+        self._chunks = chunks
+
+    async def aiter_bytes(self, chunk_size: int = 1024):
+        for c in self._chunks:
+            await asyncio.sleep(0)
+            yield c
+
+
+class _FakeStreamed:
+    def __init__(self, chunks, enter_counter):
+        self.http_response = _FakeHTTPResponse(chunks)
+        self._enter_counter = enter_counter
+
+    async def __aenter__(self):
+        self._enter_counter["count"] += 1
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        return False
+
+
+class _FakeContextFactory:
+    def __init__(self, chunks, enter_counter):
+        self._chunks = chunks
+        self._enter_counter = enter_counter
+
+    def __call__(self, **kwargs):
+        # Return an async context manager compatible object
+        return _FakeStreamed(self._chunks, self._enter_counter)
+
+
+def _make_fake_client(chunks, enter_counter):
+    client = SimpleNamespace()
+    client.audio = SimpleNamespace()
+    client.audio.speech = SimpleNamespace()
+    client.audio.speech.with_streaming_response = SimpleNamespace()
+    # create(**kwargs) should return an async context manager
+    client.audio.speech.with_streaming_response.create = _FakeContextFactory(chunks, enter_counter)
+    return client
+
+
+def test_deferred_streaming_yields_bytes():
+    chunks = [b"one", b"two", b"three"]
+    enter_counter = {"count": 0}
+    client = _make_fake_client(chunks, enter_counter)
+    stream = _DeferredOpenAITTSStream(
+        client=client,
+        request_kwargs={"model": "x", "voice": "y", "input": "z"},
+    )
+
+    # Ensure stream context is not opened until iteration
+    assert enter_counter["count"] == 0
+
+    async def _collect():
+        out_local = []
+        async for b in stream.aiter_bytes(chunk_size=2):
+            out_local.append(b)
+        return out_local
+
+    out = asyncio.run(_collect())
+    assert out == chunks
+    # Ensure context was opened exactly once during iteration
+    assert enter_counter["count"] == 1
+
diff --git a/tests/test_litellm/test_tts_logging_standard_payload.py b/tests/test_litellm/test_tts_logging_standard_payload.py
new file mode 100644
index 000000000000..5af338f65ef1
--- /dev/null
+++ b/tests/test_litellm/test_tts_logging_standard_payload.py
@@ -0,0 +1,33 @@
+import pytest
+
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLogging
+from litellm.types.utils import CallTypes
+
+class _DeferredTTSAdapter:
+    _hidden_params = {}
+    async def aiter_bytes(self, chunk_size: int = 1024):
+        async def _gen():
+            yield b"bytes"
+        return _gen()
+
+@pytest.mark.asyncio
+async def test_aspeech_logging_builds_standard_payload_for_tts():
+    logging_obj = LiteLLMLogging(
+        model="gpt-4o-mini-tts",
+        messages=[],
+        stream=False,
+        litellm_call_id="test-call",
+        function_id="test-func",
+        call_type=CallTypes.aspeech.value,
+        start_time=None,
+        kwargs={"input": "hello world"},
+    )
+
+    result = _DeferredTTSAdapter()
+    await logging_obj.async_success_handler(result=result)
+
+    assert "standard_logging_object" in logging_obj.model_call_details, (
+        "standard_logging_object should be built for TTS/aspeech responses"
+    )
+    sl = logging_obj.model_call_details["standard_logging_object"]
+    assert sl is None or isinstance(sl, dict)