diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
index c9694e5ceb..400fad8e47 100644
--- a/.github/workflows/build.yaml
+++ b/.github/workflows/build.yaml
@@ -5,6 +5,7 @@ on:
push:
branches:
- main
+ - '*_rc'
pull_request:
branches:
- "**"
@@ -32,9 +33,26 @@ jobs:
- name: Install development dependencies
run: uv sync --group dev
-
+
+ - name: Get package version
+ id: get_version
+ run: |
+ release=$(cat RELEASE)
+ hash=$(echo "${{ github.event.pull_request.head.sha || github.sha }}" | cut -c1-7)
+ echo "Package version: ${release}+${hash}"
+ echo "package-version=${release}+${hash}" >> $GITHUB_OUTPUT
+
- name: Build project
- run: uv build
+ run: |
+ sed -i 's/^dynamic = \["version"\]/version = "'"${{ steps.get_version.outputs.package-version }}"'"/' pyproject.toml
+ uv build
- name: Install project in editable mode
- run: uv pip install --editable .
\ No newline at end of file
+ run: uv pip install --editable .
+
+ - name: Upload wheel
+ uses: actions/upload-artifact@v4
+ with:
+ name: pipecat_ai_dist_files_${{ steps.get_version.outputs.package-version }}
+ path: dist/*
+      retention-days: 30
diff --git a/.github/workflows/secret_scan.yml b/.github/workflows/secret_scan.yml
new file mode 100644
index 0000000000..c665ab1b80
--- /dev/null
+++ b/.github/workflows/secret_scan.yml
@@ -0,0 +1,21 @@
+name: secret_scan
+on:
+ pull_request:
+ branches:
+ - 'main'
+ push:
+ branches:
+ - 'main'
+
+permissions:
+ contents: read
+ issues: write
+
+jobs:
+ scan_secrets_on_pull_request:
+ if: github.event_name == 'pull_request' && github.event.pull_request.base.ref == github.event.repository.default_branch
+ uses: opentok/application-security-secret-scanner/.github/workflows/secret_scanner_on_pr.yml@main
+
+ scan_secrets_on_push:
+ if: github.event_name == 'push' && github.ref_name == github.event.repository.default_branch
+ uses: opentok/application-security-secret-scanner/.github/workflows/secret_scanner_on_push.yaml@main
diff --git a/README.md b/README.md
index 3eb9bf346f..e32f1992a8 100644
--- a/README.md
+++ b/README.md
@@ -1,3 +1,9 @@
+
(Vonage) Opentok integrations implemented on copy of pipecat
+
+Original sources can be found on: [PipecatSource](https://github.com/pipecat-ai/pipecat/)
+
+
+
diff --git a/RELEASE b/RELEASE
new file mode 100644
index 0000000000..ef0f38abe1
--- /dev/null
+++ b/RELEASE
@@ -0,0 +1 @@
+2.19.0
diff --git a/examples/foundational/40a-aws-nova-sonic-vonage-video-webrtc.py b/examples/foundational/40a-aws-nova-sonic-vonage-video-webrtc.py
new file mode 100644
index 0000000000..d5c596b95c
--- /dev/null
+++ b/examples/foundational/40a-aws-nova-sonic-vonage-video-webrtc.py
@@ -0,0 +1,130 @@
+# Copyright 2025 Vonage
+"""Example of using AWS Nova Sonic LLM service with Vonage Video WebRTC transport."""
+
+import asyncio
+import json
+import os
+import sys
+
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.frames.frames import LLMRunFrame
+from pipecat.observers.loggers.transcription_log_observer import TranscriptionLogObserver
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.services import aws_nova_sonic
+from pipecat.services.aws_nova_sonic.aws import AWSNovaSonicLLMService
+from pipecat.transports.vonage.video_webrtc import (
+ VonageVideoWebrtcTransport,
+ VonageVideoWebrtcTransportParams,
+)
+
+logger.remove(0)
+logger.add(sys.stderr, level="DEBUG")
+
+
+async def main(session_str: str):
+ """Main entry point for the nova sonic vonage video webrtc example."""
+ system_instruction = (
+ "You are a friendly assistant. The user and you will engage in a spoken dialog exchanging "
+ "the transcripts of a natural real-time conversation. Keep your responses short, generally "
+ "two or three sentences for chatty scenarios. "
+ f"{AWSNovaSonicLLMService.AWAIT_TRIGGER_ASSISTANT_RESPONSE_INSTRUCTION}"
+ )
+ chans = 1
+ in_sr = 16000
+ out_sr = 24000
+
+ session_obj = json.loads(session_str)
+ application_id = session_obj.get("apiKey", "")
+ session_id = session_obj.get("sessionId", "")
+ token = session_obj.get("token", "")
+
+ transport = VonageVideoWebrtcTransport(
+ application_id,
+ session_id,
+ token,
+ VonageVideoWebrtcTransportParams(
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ vad_analyzer=SileroVADAnalyzer(),
+ publisher_name="TTS bot",
+ audio_in_sample_rate=in_sr,
+ audio_in_channels=chans,
+ audio_out_sample_rate=out_sr,
+ audio_out_channels=chans,
+ ),
+ )
+
+ ns_params = aws_nova_sonic.aws.Params()
+ ns_params.input_sample_rate = in_sr
+ ns_params.output_sample_rate = out_sr
+ ns_params.input_channel_count = chans
+ ns_params.output_channel_count = chans
+
+ llm = AWSNovaSonicLLMService(
+ secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY", ""),
+ access_key_id=os.getenv("AWS_ACCESS_KEY_ID", ""),
+ region=os.getenv("AWS_REGION", ""),
+ session_token=os.getenv("AWS_SESSION_TOKEN", ""),
+ voice_id="tiffany",
+ params=ns_params,
+ )
+ context = OpenAILLMContext(
+ messages=[
+ {"role": "system", "content": f"{system_instruction}"},
+ {
+ "role": "user",
+ "content": "Tell me a fun fact!",
+ },
+ ],
+ )
+ context_aggregator = llm.create_context_aggregator(context)
+
+ pipeline = Pipeline(
+ [
+ transport.input(),
+ context_aggregator.user(),
+ llm,
+ transport.output(),
+ ]
+ )
+
+ task = PipelineTask(pipeline, observers=[TranscriptionLogObserver()])
+
+ # Handle client connection event
+ @transport.event_handler("on_client_connected")
+ async def on_client_connected(transport, client):
+ logger.info(f"Client connected")
+ await task.queue_frames([LLMRunFrame()])
+ # HACK: for now, we need this special way of triggering the first assistant response in AWS
+ # Nova Sonic. Note that this trigger requires a special corresponding bit of text in the
+ # system instruction. In the future, simply queueing the context frame should be sufficient.
+ await llm.trigger_assistant_response()
+
+ runner = PipelineRunner()
+
+ await asyncio.gather(runner.run(task))
+
+
+def cli_main():
+ """Console script entry point for the nova sonic vonage video webrtc example."""
+ if len(sys.argv) > 1:
+ session_str = sys.argv[1]
+ logger.info(f"Session str: {session_str}")
+ else:
+ logger.error(f"Usage: {sys.argv[0]} ")
+ logger.error("VONAGE_SESSION_STR should be a JSON string with the following format:")
+ logger.error(
+ '{"apiKey": "your_api_key", "sessionId": "your_session_id", "token": "your_token"}'
+ )
+ sys.exit(1)
+
+ asyncio.run(main(session_str))
+
+
+if __name__ == "__main__":
+ cli_main()
diff --git a/examples/vonage-chatbot/Dockerfile b/examples/vonage-chatbot/Dockerfile
new file mode 100644
index 0000000000..daef112269
--- /dev/null
+++ b/examples/vonage-chatbot/Dockerfile
@@ -0,0 +1,30 @@
+# Use an official Python runtime as a parent image
+FROM python:3.12-bullseye
+
+# Set the working directory in the container (repo root inside the image)
+WORKDIR /vonage-chatbot
+
+# Install ffmpeg for pydub at runtime
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends ffmpeg && \
+ rm -rf /var/lib/apt/lists/*
+
+# Copy the example's requirements file into the container (for layer caching)
+COPY examples/vonage-chatbot/requirements.txt ./requirements.txt
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --upgrade pip && \
+ pip install --no-cache-dir -r requirements.txt
+
+# Copy the entire repo so local src/pipecat/* is available
+COPY . .
+
+# Install the local pipecat package (so imports like pipecat.serializers.vonage work)
+RUN pip install -e ".[openai,websocket,vonage,silero,runner]"
+
+# Expose the desired port (WebSocket server)
+EXPOSE 8005
+
+# Run the application from the example directory
+WORKDIR /vonage-chatbot/examples/vonage-chatbot
+CMD ["python", "server.py"]
diff --git a/examples/vonage-chatbot/README.md b/examples/vonage-chatbot/README.md
new file mode 100644
index 0000000000..3c9fe3ec37
--- /dev/null
+++ b/examples/vonage-chatbot/README.md
@@ -0,0 +1,157 @@
+# Vonage Chatbot (Pipecat)
+
+A real-time voice chatbot built using **Pipecat AI** with **Vonage Audio Connector** over **WebSocket**.
+This project streams caller audio to **OpenAI STT**, processes the conversation using an LLM, converts the AI's response to speech via **OpenAI TTS**, and streams it back to the caller in real time. The server exposes a WebSocket endpoint (via **VonageAudioConnectorTransport**) that the Vonage **/connect API** connects to, bridging a live session into the **OpenAI STT → LLM → TTS** pipeline.
+
+
+## Table of Contents
+
+- [Features](#features)
+- [Requirements](#requirements)
+- [Installation](#installation)
+- [Expose Local Server with ngrok](#expose-local-server-with-ngrok)
+- [Configure Vonage Voice](#configure-vonage-voice)
+- [Running the Application](#running-the-application)
+- [Testing the Chatbot](#testing-the-chatbot)
+
+## Features
+
+- **Real-time WebSocket audio** to/from Vonage over WebSocket
+- **OpenAI-powered pipeline** STT → LLM → TTS pipeline
+- **Silero VAD** for accurate talk-pause detection
+- **Dockerized** for easy deployment
+
+## Requirements
+
+- Python **3.10+**
+- A **Vonage account**
+- An **OpenAI API key**
+- **ngrok** (or any HTTPS tunnel) for local testing
+- Docker (optional)
+
+## Installation
+
+1. **Clone the repo and enter it**
+
+ ```sh
+ git clone https://github.com/opentok/vonage-pipecat.git
+ cd vonage-pipecat/
+ ```
+
+2. **Set up a virtual environment** (recommended):
+
+ ```sh
+ python -m venv .venv
+ source .venv/bin/activate # Windows: .venv\Scripts\activate
+ ```
+
+3. **Install Pipecat AI (editable mode)**:
+
+ ```sh
+ pip install -e ".[openai,websocket,vonage,silero,runner]"
+ ```
+
+4. **Install example dependencies**:
+
+ ```sh
+ cd examples/vonage-chatbot
+ pip install -r requirements.txt
+ ```
+
+5. **Create .env file**:
+
+ Copy the example environment file and update with your settings:
+
+ ```sh
+ cp env.example .env
+ ```
+
+6. **Add your OpenAI Key to .env**:
+
+ ```sh
+ OPENAI_API_KEY=sk-proj-xxxxxxxxxxxxxxxxxxxxxxxx
+ # Do not include quotes ("")
+ ```
+
+7. **Install ngrok**:
+
+ Follow the instructions on the [ngrok website](https://ngrok.com/download) to download and install ngrok. You’ll use this to securely expose your local WebSocket server for testing.
+
+## Expose Local Server with ngrok
+
+1. **Start ngrok**:
+
+ In a new terminal, start ngrok to tunnel the local server:
+
+ ```sh
+ ngrok http 8005
+ #Copy the wss URL, e.g. "uri": "wss://",
+ ```
+
+ You’ll see output like:
+
+ ```sh
+ Forwarding https://a5db22f57efa.ngrok-free.app -> http://localhost:8005
+ ```
+
+ The https:// address is your public ngrok domain. To create the WebSocket Secure (WSS) URL for Vonage, simply replace https:// with wss://.
+
+ Example:
+
+ ```sh
+ "websocket": {
+ "uri": "wss://a5db22f57efa.ngrok-free.app",
+ "audioRate": 16000,
+ "bidirectional": true
+ }
+ ```
+
+## Configure Vonage Voice
+1. Open the **Vonage Video API Playground** (or your own application).
+2. Create a new session and publish the stream.
+3. Make a POST request to:
+ ```sh
+ /v2/project/{apiKey}/connect
+ ```
+4. Include the following in the JSON body:
+ - sessionId
+ - token
+ - The WebSocket URI from ngrok (e.g. "wss://a5db22f57efa.ngrok-free.app")
+ - "audioRate": 16000
+ - "bidirectional": true
+5. This connects your Vonage session to your locally running Pipecat WebSocket server through ngrok.
+6. For a working example of the /connect API request, see [Testing the Chatbot](#testing-the-chatbot)
+
+## Running the Application
+
+Choose one of the following methods to start the chatbot server.
+
+### Option 1: Run with Python
+
+**Run the Server application**:
+
+ ```sh
+ # Ensure you're in the example directory (examples/vonage-chatbot) and your virtual environment is active
+ python server.py
+ ```
+
+### Option 2: Run with Docker
+
+1. **Build the Docker image**:
+
+ ```sh
+ docker build -f examples/vonage-chatbot/Dockerfile -t vonage-chatbot .
+ ```
+
+2. **Run the Docker container**:
+ ```sh
+ docker run -it --rm -p 8005:8005 --env-file examples/vonage-chatbot/.env vonage-chatbot
+ ```
+
+The server will start on port 8005. Keep this running while you test with Vonage.
+
+## Testing the Chatbot
+
+1. Start publishing audio in the Vonage Playground
+2. Follow the examples/vonage-chatbot/client/README.md and run the connect_and_stream.py.
+Once the connection is established, speak. Your audio will go through the STT → LLM → TTS pipeline and you’ll hear an AI-generated voice reply.
diff --git a/examples/vonage-chatbot/client/README.md b/examples/vonage-chatbot/client/README.md
new file mode 100644
index 0000000000..d9397df646
--- /dev/null
+++ b/examples/vonage-chatbot/client/README.md
@@ -0,0 +1,113 @@
+# Python Client for Server Testing
+
+This Python client enables automated testing of the **Vonage Pipecat WebSocket server**. It opens a WS connection to your Pipecat endpoint, streams test audio (microphone), and plays back the audio received from the server.
+
+## Setup Instructions
+
+1. **Clone the repo and enter it**
+ ```sh
+ git clone https://github.com/opentok/vonage-pipecat.git
+ cd vonage-pipecat/examples/vonage-chatbot/client
+ ```
+
+2. **Set up a virtual environment** (optional but recommended):
+ ```sh
+ python -m venv .venv-client
+ source .venv-client/bin/activate # Windows: .venv-client\Scripts\activate
+ ```
+
+3. **Install dependencies**:
+ ```sh
+ pip install -r requirements.txt
+ ```
+
+4. **Create .env**:
+ Copy the example environment file and update with your settings:
+
+ ```sh
+ cp env.example .env
+ ```
+
+5. **Start an Opentok Session and Publish a stream**
+ The Session ID is required.
+ Note: You can use either opentok or vonage platform to create the session. Open the Playground (or your own app) to create a session and publish a stream.
+ Copy the Session ID and set it in `.env` file:
+ ```sh
+ VONAGE_SESSION_ID=
+ ```
+
+ If you are using Opentok platform, set OPENTOK_API_URL in your .env:
+ ```sh
+ OPENTOK_API_URL=https://api.opentok.com
+ ```
+ If you are using Vonage platform, set VONAGE_API_URL in your .env:
+ ```sh
+ VONAGE_API_URL=api.vonage.com
+ ```
+
+ Use the **Credentials** from the **same project** that created the `sessionId`.
+
+6. **Set the Keys in .env**
+ If you created the session in Opentok platform, set the following in your `.env`:
+
+ ```sh
+ # Vonage (OpenTok) credentials
+ VONAGE_API_KEY=YOUR_API_KEY
+ VONAGE_API_SECRET=YOUR_API_SECRET
+
+ # Your Pipecat WebSocket endpoint (ngrok or prod)
+ WS_URI=wss://
+
+ # Put existing session from playground or app which you want to connect pipecat-ai
+ VONAGE_SESSION_ID=1_MX4....
+
+ # API base
+ OPENTOK_API_URL=https://api.opentok.com
+
+ # Keep rest as same.
+ ```
+ If you created the session in Vonage platform, set the following in your `.env`:
+
+ ```sh
+ # Vonage (OpenTok) credentials
+ VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID
+ VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH
+
+ # Your Pipecat WebSocket endpoint (ngrok or prod)
+ WS_URI=wss://
+
+ # Put existing session from playground or app which you want to connect pipecat-ai
+ VONAGE_SESSION_ID=1_MX4....
+
+ # API base
+ VONAGE_API_URL=api.vonage.com
+
+ # Keep rest as same.
+ ```
+
+7. **Start your Pipecat WS server**:
+ Make sure the Vonage Pipecat server is running locally and exposes a WS endpoint via ngrok
+
+8. **Running the Client**:
+ The program below will connect the OpenTok session created above to the pipecat-ai pipeline.
+ If you are using the opentok platform, run:
+ ```sh
+ python connect_and_stream.py
+ ```
+
+ If you are using the Vonage platform, run:
+ ```sh
+ python connect_and_stream_vonage.py
+ ```
+
+**Note**
+The script reads everything from .env via os.getenv().
+You can still override via flags if you want, e.g.:
+
+ ```sh
+ # Example
+ python connect_and_stream.py --ws-uri wss://my-ngrok/ws --audio-rate 16000
+
+ # OR
+ python connect_and_stream_vonage.py --ws-uri wss://my-ngrok/ws --audio-rate 16000
+ ```
diff --git a/examples/vonage-chatbot/client/connect_and_stream.py b/examples/vonage-chatbot/client/connect_and_stream.py
new file mode 100644
index 0000000000..31c7b82c22
--- /dev/null
+++ b/examples/vonage-chatbot/client/connect_and_stream.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python3
+"""
+Use a Vonage (OpenTok) Video API existing session, generate a token,
+and connect its audio to your Pipecat WebSocket endpoint.
+"""
+
+import argparse
+import json
+import os
+from typing import Dict, List
+
+from dotenv import load_dotenv
+from opentok import Client # SDK 3.x
+
+# ---- helpers ----------------------------------------------------------------
+
+
+def parse_kv_pairs(items: List[str]) -> Dict[str, str]:
+ """
+ Parse CLI --header/--param entries like "Key=Value" or "Key:Value".
+ """
+ out: Dict[str, str] = {}
+ for raw in items or []:
+ sep = "=" if "=" in raw else (":" if ":" in raw else None)
+ if not sep:
+ raise ValueError(f"Invalid header/param format: {raw!r}. Use Key=Value")
+ k, v = raw.split(sep, 1)
+ out[k.strip()] = v.strip()
+ return out
+
+
+def comma_list(s: str | None) -> List[str]:
+ return [x.strip() for x in s.split(",")] if s else []
+
+
+# ---- main -------------------------------------------------------------------
+
+
+def main() -> None:
+ load_dotenv()
+
+ p = argparse.ArgumentParser(
+ description="Create a session and connect its audio to a WebSocket (Pipecat)."
+ )
+ # Auth
+ p.add_argument("--api-key", default=os.getenv("VONAGE_API_KEY"), required=False)
+ p.add_argument("--api-secret", default=os.getenv("VONAGE_API_SECRET"), required=False)
+
+ # Where to connect
+ p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False)
+ p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000")))
+ p.add_argument("--bidirectional", action="store_true", default=True)
+
+ # An existing session which needs to be connected to pipecat-ai
+ p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID"))
+
+ # Optional streams and headers (to pass to the WS)
+ p.add_argument(
+ "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs"
+ )
+ p.add_argument(
+ "--header",
+ action="append",
+ help="Extra header(s) for WS, e.g. --header X-Foo=bar (repeatable)",
+ )
+
+ # Optional: choose API base. If your SDK doesn’t accept api_url, set OPENTOK_API_URL env before run.
+ p.add_argument("--api-base", default=os.getenv("OPENTOK_API_URL", "https://api.opentok.com"))
+
+ args = p.parse_args()
+
+ # Validate inputs
+ missing = [
+ k
+ for k, v in {
+ "api-key": args.api_key,
+ "api-secret": args.api_secret,
+ "ws-uri": args.ws_uri,
+ }.items()
+ if not v
+ ]
+ if missing:
+ raise SystemExit(f"Missing required args/env: {', '.join(missing)}")
+
+ # Init client (SDK 3.x supports api_url kw; if yours doesn’t, remove it and use OPENTOK_API_URL env)
+ try:
+ ot = Client(args.api_key, args.api_secret, api_url=args.api_base)
+ except TypeError:
+ # Fallback for older SDKs that don't accept api_url
+ ot = Client(args.api_key, args.api_secret)
+
+ session_id = args.session_id
+ print(f"Using existing session: {session_id}")
+
+ # Token: generate a fresh one tied to this session
+ token = ot.generate_token(session_id)
+ print(f"Generated token: {token[:32]}...") # don’t print full token in logs
+
+ # Build websocket options (mirrors your Postman body)
+ ws_opts = {
+ "uri": args.ws_uri,
+ "audioRate": args.audio_rate,
+ "bidirectional": bool(args.bidirectional),
+ }
+
+ # Optional stream filtering
+ stream_list = comma_list(args.streams)
+ if stream_list:
+ ws_opts["streams"] = stream_list
+
+ # Optional headers passed to your WS server
+ headers = parse_kv_pairs(args.header or [])
+ if headers:
+ ws_opts["headers"] = headers
+
+ print("Connecting audio to WebSocket with options:")
+ print(json.dumps(ws_opts, indent=2))
+
+ # Call the Audio Connector (this is equivalent to POST /v2/project/{apiKey}/connect)
+ resp = ot.connect_audio_to_websocket(session_id, token, ws_opts)
+
+ # The SDK returns a small object/dict; print it for visibility
+ try:
+ print("Connect response:", json.dumps(resp, indent=2))
+ except TypeError:
+ # Not JSON-serializable; just repr it
+ print("Connect response:", resp)
+
+ print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/vonage-chatbot/client/connect_and_stream_vonage.py b/examples/vonage-chatbot/client/connect_and_stream_vonage.py
new file mode 100644
index 0000000000..ddd6dda5a5
--- /dev/null
+++ b/examples/vonage-chatbot/client/connect_and_stream_vonage.py
@@ -0,0 +1,154 @@
+#!/usr/bin/env python3
+"""
+Use a Vonage (OpenTok) Video API existing session, generate a token,
+and connect its audio to your Pipecat WebSocket endpoint.
+"""
+
+import argparse
+import json
+import os
+from typing import Dict, List
+
+from dotenv import load_dotenv
+from vonage import Auth, HttpClientOptions, Vonage
+from vonage_video import AudioConnectorOptions, TokenOptions
+
+# ---- helpers ----------------------------------------------------------------
+
+
+def parse_kv_pairs(items: List[str]) -> Dict[str, str]:
+ """
+ Parse CLI --header/--param entries like "Key=Value" or "Key:Value".
+ """
+ out: Dict[str, str] = {}
+ for raw in items or []:
+ sep = "=" if "=" in raw else (":" if ":" in raw else None)
+ if not sep:
+ raise ValueError(f"Invalid header/param format: {raw!r}. Use Key=Value")
+ k, v = raw.split(sep, 1)
+ out[k.strip()] = v.strip()
+ return out
+
+
+def comma_list(s: str | None) -> List[str]:
+ return [x.strip() for x in s.split(",")] if s else []
+
+
+# ---- main -------------------------------------------------------------------
+
+
+def main() -> None:
+ load_dotenv()
+
+ p = argparse.ArgumentParser(
+ description="Create a session and connect its audio to a WebSocket (Pipecat)."
+ )
+ # Auth
+ p.add_argument("--application-id", default=os.getenv("VONAGE_APPLICATION_ID"), required=False)
+ p.add_argument("--private-key", default=os.getenv("VONAGE_PRIVATE_KEY"), required=False)
+
+ # Where to connect
+ p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False)
+ p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000")))
+
+ bidirectional_env = os.getenv("VONAGE_BIDIRECTIONAL")
+ if bidirectional_env is not None:
+ if bidirectional_env.lower() not in ("true", "false"):
+ raise SystemExit("VONAGE_BIDIRECTIONAL must be 'true' or 'false'")
+ bidirectional_default = bidirectional_env.lower() == "true"
+ else:
+ bidirectional_default = True
+
+ p.add_argument("--bidirectional", action="store_true", default=bidirectional_default)
+
+ # An existing session which needs to be connected to pipecat-ai
+ p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID"))
+
+ # Optional streams and headers (to pass to the WS)
+ p.add_argument(
+ "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs"
+ )
+ p.add_argument(
+ "--header",
+ action="append",
+ help="Extra header(s) for WS, e.g. --header X-Foo=bar (repeatable)",
+ )
+
+ # Optional: choose API base. If your SDK doesn’t accept api_url, set VONAGE_API_URL env before run.
+ p.add_argument("--api-base", default=os.getenv("VONAGE_API_URL", "api.vonage.com"))
+
+ args = p.parse_args()
+
+ # Validate inputs
+ missing = [
+ k
+ for k, v in {
+ "application-id": args.application_id,
+ "private-key": args.private_key,
+ "ws-uri": args.ws_uri,
+ "session-id": args.session_id,
+ }.items()
+ if not v
+ ]
+ if missing:
+ raise SystemExit(f"Missing required args/env: {', '.join(missing)}")
+
+ # Create an Auth instance
+ auth = Auth(
+ application_id=args.application_id,
+ private_key=args.private_key,
+ )
+
+ # Create HttpClientOptions instance
+ # (not required unless you want to change options from the defaults)
+ options = HttpClientOptions(video_host="video." + args.api_base, timeout=30)
+
+ # Create a Vonage instance
+ vonage = Vonage(auth=auth, http_client_options=options)
+
+ session_id = args.session_id
+ print(f"Using existing session: {session_id}")
+
+ # Token: generate a fresh one tied to this session
+ token_options = TokenOptions(session_id=session_id, role="publisher")
+ token = vonage.video.generate_client_token(token_options)
+ print(f"Generated token: {token[:32]}...") # don’t print full token in logs
+
+ # Build websocket options (mirrors your Postman body)
+ ws_opts = {
+ "uri": args.ws_uri,
+ "audioRate": args.audio_rate,
+ "bidirectional": bool(args.bidirectional),
+ }
+
+ # Optional stream filtering
+ stream_list = comma_list(args.streams)
+ if stream_list:
+ ws_opts["streams"] = stream_list
+
+ # Optional headers passed to your WS server
+ headers = parse_kv_pairs(args.header or [])
+ if headers:
+ ws_opts["headers"] = headers
+
+ print("Connecting audio to WebSocket with options:")
+ print(json.dumps(ws_opts, indent=2))
+
+ # Call the Audio Connector (this is equivalent to POST /v2/project/{apiKey}/connect)
+ audio_connector_options = AudioConnectorOptions(
+ session_id=session_id, token=token, websocket=ws_opts
+ )
+ resp = vonage.video.start_audio_connector(audio_connector_options)
+
+ # The SDK returns a small object/dict; print it for visibility
+ try:
+ print("Connect response:", json.dumps(resp, indent=2))
+ except TypeError:
+ # Not JSON-serializable; just repr it
+ print("Connect response:", resp)
+
+ print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/vonage-chatbot/client/env.example b/examples/vonage-chatbot/client/env.example
new file mode 100644
index 0000000000..05fce761c2
--- /dev/null
+++ b/examples/vonage-chatbot/client/env.example
@@ -0,0 +1,27 @@
+# Vonage (OpenTok sdk) credentials
+VONAGE_API_KEY=YOUR_API_KEY
+VONAGE_API_SECRET=YOUR_API_SECRET
+
+# API base: uses the prod endpoint by default
+OPENTOK_API_URL=https://api.opentok.com
+
+# Or if you are using Vonage sdk
+
+# Vonage (Vonage sdk) credentials
+VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID
+VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH
+
+# API base: uses the prod endpoint by default
+VONAGE_API_URL=api.vonage.com
+
+# Your Pipecat WebSocket endpoint (ngrok or prod)
+WS_URI=wss://
+
+# Put existing session from playground or app which you want to connect pipecat-ai
+VONAGE_SESSION_ID=1_MX4....
+
+# Audio settings for the Audio Connector
+VONAGE_AUDIO_RATE=16000
+
+# Optional: override bidirectional (defaults to true in the script)
+# VONAGE_BIDIRECTIONAL=true
diff --git a/examples/vonage-chatbot/client/requirements.txt b/examples/vonage-chatbot/client/requirements.txt
new file mode 100644
index 0000000000..cd1bd9c0f4
--- /dev/null
+++ b/examples/vonage-chatbot/client/requirements.txt
@@ -0,0 +1,6 @@
+opentok>=3
+vonage>=3.3.1
+python-dotenv
+websockets>=12.0
+numpy>=1.26
+sounddevice>=0.4
diff --git a/examples/vonage-chatbot/env.example b/examples/vonage-chatbot/env.example
new file mode 100644
index 0000000000..e570b8b559
--- /dev/null
+++ b/examples/vonage-chatbot/env.example
@@ -0,0 +1 @@
+OPENAI_API_KEY=
diff --git a/examples/vonage-chatbot/requirements.txt b/examples/vonage-chatbot/requirements.txt
new file mode 100644
index 0000000000..a0bc9b0583
--- /dev/null
+++ b/examples/vonage-chatbot/requirements.txt
@@ -0,0 +1,3 @@
+python-dotenv
+loguru
+pydub>=0.25
diff --git a/examples/vonage-chatbot/server.py b/examples/vonage-chatbot/server.py
new file mode 100644
index 0000000000..636eee54d0
--- /dev/null
+++ b/examples/vonage-chatbot/server.py
@@ -0,0 +1,114 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Example: Vonage serializer + custom WS transport + OpenAI STT/LLM/TTS."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
+from pipecat.serializers.vonage import VonageFrameSerializer
+from pipecat.services.openai import OpenAILLMService, OpenAISTTService, OpenAITTSService
+from pipecat.transports.network.websocket_server import WebsocketServerParams
+from pipecat.transports.vonage.audio_connector import VonageAudioConnectorTransport
+
+# ---- Constants ---------------------------------------------------------------
+
+WS_HOST: str = "0.0.0.0"
+WS_PORT: int = 8005
+SESSION_TIMEOUT_SECONDS: int = 60 * 3 # 3 minutes
+AUDIO_OUT_SAMPLE_RATE: int = 24_000
+
+SYSTEM_INSTRUCTION: str = (
+ "You are OpenAI Chatbot, a friendly, helpful robot. "
+ "Your output will be converted to audio, so avoid special characters. "
+ "Respond to the user in a creative, helpful way. Keep responses brief—"
+ "one or two sentences."
+)
+
+# Load environment variables from .env
+load_dotenv()
+
+
+async def run_bot_websocket_server() -> None:
+ serializer = VonageFrameSerializer()
+
+ ws_transport = VonageAudioConnectorTransport(
+ host=WS_HOST,
+ port=WS_PORT,
+ params=WebsocketServerParams(
+ serializer=serializer,
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ add_wav_header=True,
+ vad_analyzer=SileroVADAnalyzer(),
+ session_timeout=SESSION_TIMEOUT_SECONDS,
+ ),
+ )
+
+ stt = OpenAISTTService(
+ api_key=os.getenv("OPENAI_API_KEY"),
+ model="gpt-4o-transcribe",
+ prompt=("Expect words based on questions across technology, science, and culture."),
+ )
+
+ tts = OpenAITTSService(
+ api_key=os.getenv("OPENAI_API_KEY"),
+ voice="coral",
+ instructions="There may be literal '\\n' characters; ignore them when speaking.",
+ )
+
+ llm = OpenAILLMService(api_key=os.getenv("OPENAI_API_KEY"))
+
+ messages = [{"role": "system", "content": SYSTEM_INSTRUCTION}]
+ context = OpenAILLMContext(messages)
+ context_aggregator = llm.create_context_aggregator(context)
+
+ pipeline = Pipeline(
+ [
+ ws_transport.input(),
+ stt,
+ context_aggregator.user(),
+ llm,
+ tts,
+ ws_transport.output(),
+ ]
+ )
+
+ task = PipelineTask(
+ pipeline,
+ params=PipelineParams(
+ audio_out_sample_rate=AUDIO_OUT_SAMPLE_RATE,
+ enable_metrics=True,
+ enable_usage_metrics=True,
+ ),
+ )
+
+ @ws_transport.event_handler("on_client_connected")
+ async def on_client_connected(_transport, _client) -> None:
+ logger.info("Client connected")
+ messages.append({"role": "system", "content": "Please introduce yourself to the user."})
+ await task.queue_frames([context_aggregator.user().get_context_frame()])
+
+ @ws_transport.event_handler("on_client_disconnected")
+ async def on_client_disconnected(_transport, _client) -> None:
+ logger.info("Client disconnected")
+ await task.cancel()
+
+ @ws_transport.event_handler("on_websocket_ready")
+ async def on_websocket_ready(_client) -> None:
+ logger.info("Server WebSocket ready")
+
+ runner = PipelineRunner(handle_sigint=False)
+ await runner.run(task)
+
+
+if __name__ == "__main__":
+ asyncio.run(run_bot_websocket_server())
diff --git a/examples/vonage-speech-to-speech/Dockerfile b/examples/vonage-speech-to-speech/Dockerfile
new file mode 100644
index 0000000000..3a8297ba35
--- /dev/null
+++ b/examples/vonage-speech-to-speech/Dockerfile
@@ -0,0 +1,30 @@
+# Use an official Python runtime as a parent image
+FROM python:3.12-bullseye
+
+# Set the working directory in the container (repo root inside the image)
+WORKDIR /vonage-speech-to-speech
+
+# Install ffmpeg for pydub at runtime
+RUN apt-get update && \
+ apt-get install -y --no-install-recommends ffmpeg && \
+ rm -rf /var/lib/apt/lists/*
+
+# Copy the example's requirements file into the container (for layer caching)
+COPY examples/vonage-speech-to-speech/requirements.txt ./requirements.txt
+
+# Install any needed packages specified in requirements.txt
+RUN pip install --upgrade pip && \
+ pip install --no-cache-dir -r requirements.txt
+
+# Copy the entire repo so local src/pipecat/* is available
+COPY . .
+
+# Install the local pipecat package (so imports like pipecat.serializers.vonage work)
+RUN pip install -e ".[openai,websocket,vonage,silero,runner]"
+
+# Expose the desired port (WebSocket server)
+EXPOSE 8005
+
+# Run the application from the example directory
+WORKDIR /vonage-speech-to-speech/examples/vonage-speech-to-speech
+CMD ["python", "server.py"]
diff --git a/examples/vonage-speech-to-speech/README.md b/examples/vonage-speech-to-speech/README.md
new file mode 100644
index 0000000000..04624f0bdc
--- /dev/null
+++ b/examples/vonage-speech-to-speech/README.md
@@ -0,0 +1,157 @@
+# Vonage Speech-to-Speech Bot (Pipecat)
+
+A real-time voice chatbot using **Pipecat AI** with **Vonage Audio Connector** over **WebSocket**.
+This example uses OpenAI Realtime for speech-in → speech-out (no separate STT/TTS services). The server exposes a WS endpoint (via **VonageAudioConnectorTransport**) that the Vonage **/connect API** connects to, bridging the live session into an OpenAI Realtime speech↔speech pipeline.
+
+
+## Table of Contents
+
+- [Features](#features)
+- [Requirements](#requirements)
+- [Installation](#installation)
+- [Expose Local Server with ngrok](#expose-local-server-with-ngrok)
+- [Configure Vonage Voice](#configure-vonage-voice)
+- [Running the Application](#running-the-application)
+- [Testing the Speech-to-Speech Bot](#testing-the-speech-to-speech-bot)
+
+## Features
+
+- **Real-time WebSocket audio** streaming between Vonage ↔ OpenAI Realtime
+- **OpenAI Realtime** native speech↔speech (no separate STT/TTS)
+- **Silero VAD** for accurate talk-pause detection
+- **Dockerized** for easy deployment
+
+## Requirements
+
+- Python **3.12+**
+- A **Vonage account**
+- An **OpenAI API key**
+- **ngrok** (or any HTTPS tunnel) for local testing
+- Docker (optional)
+
+## Installation
+
+1. **Clone the repo and enter it**
+
+ ```sh
+ git clone https://github.com/opentok/vonage-pipecat.git
+ cd vonage-pipecat/
+ ```
+
+2. **Set up a virtual environment** (optional but recommended):
+
+ ```sh
+ python -m venv .venv
+ source .venv/bin/activate # Windows: .venv\Scripts\activate
+ ```
+
+3. **Install Pipecat AI (editable mode)**:
+
+ ```sh
+ pip install -e ".[openai,websocket,vonage,silero,runner]"
+ ```
+
+4. **Install example dependencies**:
+
+ ```sh
+ cd examples/vonage-speech-to-speech
+ pip install -r requirements.txt
+ ```
+
+5. **Create .env file**:
+
+ Copy the example environment file and update with your settings:
+
+ ```sh
+ cp env.example .env
+ ```
+
+6. **Add your OpenAI Key to .env**:
+
+ ```sh
+ OPENAI_API_KEY=sk-proj-xxxxxxxxxxxxxxxxxxxxxxxx
+ # Do not include quotes ("")
+ ```
+
+7. **Install ngrok**:
+
+ Follow the instructions on the [ngrok website](https://ngrok.com/download) to download and install ngrok. You’ll use this to securely expose your local WebSocket server for testing.
+
+## Expose Local Server with ngrok
+
+1. **Start ngrok**:
+
+ In a new terminal, start ngrok to tunnel the local server:
+
+ ```sh
+ ngrok http 8005
+ # Copy the wss URL, e.g. "uri": "wss://<your-ngrok-domain>"
+ ```
+
+ You’ll see output like:
+
+ ```sh
+ Forwarding https://a5db22f57efa.ngrok-free.app -> http://localhost:8005
+ ```
+
+ The https:// address is your public ngrok domain. To create the WebSocket Secure (WSS) URL for Vonage, simply replace https:// with wss://.
+
+ Example:
+
+ ```sh
+ "websocket": {
+ "uri": "wss://a5db22f57efa.ngrok-free.app",
+ "audioRate": 16000,
+ "bidirectional": true
+ }
+ ```
+
+## Configure Vonage Voice
+1. Open the **Vonage Video API Playground** (or your own application).
+2. Create a new session and publish the stream.
+3. Make a POST request to:
+ ```sh
+ /v2/project/{apiKey}/connect
+ ```
+4. Include the following in the JSON body:
+ - sessionId
+ - token
+ - The WebSocket URI from ngrok (e.g. "wss://a5db22f57efa.ngrok-free.app")
+ - "audioRate": 16000
+ - "bidirectional": true
+5. This connects your Vonage session to your locally running Pipecat WebSocket server through ngrok.
+6. For a working example of the /connect API request, see [Testing the Speech-to-Speech Bot](#testing-the-speech-to-speech-bot)
+
+## Running the Application
+
+Choose one of the following methods to start the chatbot server.
+
+### Option 1: Run with Python
+
+**Run the Server application**:
+
+ ```sh
+ # Ensure you're in the example directory (examples/vonage-speech-to-speech) and your virtual environment is active
+ python server.py
+ ```
+
+### Option 2: Run with Docker
+
+1. **Build the Docker image**:
+
+ ```sh
+ docker build -f examples/vonage-speech-to-speech/Dockerfile -t vonage-speech-to-speech .
+ ```
+
+2. **Run the Docker container**:
+ ```sh
+ docker run -it --rm -p 8005:8005 --env-file examples/vonage-speech-to-speech/.env vonage-speech-to-speech
+ ```
+
+The server will start on port 8005. Keep this running while you test with Vonage.
+
+## Testing the Speech-to-Speech Bot
+
+1. Start publishing audio in the Vonage Playground
+2. Follow the instructions in examples/vonage-speech-to-speech/client/README.md and run connect_and_stream.py.
+Once established, speak into the session and you’ll hear the AI’s response streamed back instantly via the OpenAI Realtime speech↔speech model. Voice Input → Realtime LLM → Voice Reply.
diff --git a/examples/vonage-speech-to-speech/client/README.md b/examples/vonage-speech-to-speech/client/README.md
new file mode 100644
index 0000000000..215f57c4a5
--- /dev/null
+++ b/examples/vonage-speech-to-speech/client/README.md
@@ -0,0 +1,108 @@
+# Python Client for Server Testing
+
+This Python client enables automated testing of the **Vonage Pipecat WebSocket server**. It opens a WS connection to your Pipecat endpoint, streams test audio (microphone), and plays back the audio received from the server.
+
+## Setup Instructions
+
+1. **Clone the repo and enter it**
+ ```sh
+ git clone https://github.com/opentok/vonage-pipecat.git
+ cd vonage-pipecat/examples/vonage-speech-to-speech/client
+ ```
+
+2. **Set up a virtual environment** (optional but recommended):
+ ```sh
+ python -m venv .venv-client
+ source .venv-client/bin/activate # Windows: .venv-client\Scripts\activate
+ ```
+
+3. **Install dependencies**:
+ ```sh
+ pip install -r requirements.txt
+ ```
+
+4. **Create .env**:
+ Copy the example environment file and update with your settings:
+
+ ```sh
+ cp env.example .env
+ ```
+
+5. **Start an OpenTok session and publish a stream**
+ The Session ID is required.
+ Note: You can use either the OpenTok or Vonage platform to create the session. Open the Playground (or your own app) to create a session and publish a stream.
+ Copy the Session ID and set it in `.env` file:
+ ```sh
+ VONAGE_SESSION_ID=
+ ```
+
+ If you are using the OpenTok platform, set OPENTOK_API_URL in your .env:
+ ```sh
+ OPENTOK_API_URL=https://api.opentok.com
+ ```
+
+ Use the **API key** and **secret** from the **same project** that created the `sessionId`.
+
+6. **Set the Keys in .env**:
+ ```sh
+ # Vonage (OpenTok) credentials
+ VONAGE_API_KEY=YOUR_API_KEY
+ VONAGE_API_SECRET=YOUR_API_SECRET
+
+ # Your Pipecat WebSocket endpoint (ngrok or prod)
+ WS_URI=wss://
+
+ # Put existing session from playground or app which you want to connect pipecat-ai
+ VONAGE_SESSION_ID=1_MX4....
+
+ # API base
+ OPENTOK_API_URL=https://api.opentok.com
+
+ # Keep the remaining values unchanged.
+ ```
+ If you created the session in Vonage platform, set the following in your `.env`:
+
+ ```sh
+ # Vonage (OpenTok) credentials
+ VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID
+ VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH
+
+ # Your Pipecat WebSocket endpoint (ngrok or prod)
+ WS_URI=wss://
+
+ # Put existing session from playground or app which you want to connect pipecat-ai
+ VONAGE_SESSION_ID=1_MX4....
+
+ # API base
+ VONAGE_API_URL=api.vonage.com
+
+ # Keep the remaining values unchanged.
+ ```
+
+7. **Start your Pipecat WS server**:
+ Make sure the Vonage Pipecat server is running locally and exposes a WS endpoint via ngrok
+
+8. **Running the Client**:
+ The program below connects the OpenTok session created above to the pipecat-ai pipeline.
+
+ If you are using the OpenTok platform, run:
+ ```sh
+ python connect_and_stream.py
+ ```
+
+ If you are using the Vonage platform, run:
+ ```sh
+ python connect_and_stream_vonage.py
+ ```
+
+**Note**
+The script reads everything from .env via os.getenv().
+You can still override via flags if you want, e.g.:
+
+ ```sh
+ # Example
+ python connect_and_stream.py --ws-uri wss://my-ngrok/ws --audio-rate 16000
+
+ # OR
+ python connect_and_stream_vonage.py --ws-uri wss://my-ngrok/ws --audio-rate 16000
+ ```
diff --git a/examples/vonage-speech-to-speech/client/connect_and_stream.py b/examples/vonage-speech-to-speech/client/connect_and_stream.py
new file mode 100644
index 0000000000..31c7b82c22
--- /dev/null
+++ b/examples/vonage-speech-to-speech/client/connect_and_stream.py
@@ -0,0 +1,133 @@
+#!/usr/bin/env python3
+"""
+Use a Vonage (OpenTok) Video API existing session, generate a token,
+and connect its audio to your Pipecat WebSocket endpoint.
+"""
+
+import argparse
+import json
+import os
+from typing import Dict, List
+
+from dotenv import load_dotenv
+from opentok import Client # SDK 3.x
+
+# ---- helpers ----------------------------------------------------------------
+
+
+def parse_kv_pairs(items: List[str]) -> Dict[str, str]:
+ """
+ Parse CLI --header/--param entries like "Key=Value" or "Key:Value".
+ """
+ out: Dict[str, str] = {}
+ for raw in items or []:
+ sep = "=" if "=" in raw else (":" if ":" in raw else None)
+ if not sep:
+ raise ValueError(f"Invalid header/param format: {raw!r}. Use Key=Value")
+ k, v = raw.split(sep, 1)
+ out[k.strip()] = v.strip()
+ return out
+
+
+def comma_list(s: str | None) -> List[str]:
+ return [x.strip() for x in s.split(",")] if s else []
+
+
+# ---- main -------------------------------------------------------------------
+
+
+def main() -> None:
+ load_dotenv()
+
+ p = argparse.ArgumentParser(
+ description="Create a session and connect its audio to a WebSocket (Pipecat)."
+ )
+ # Auth
+ p.add_argument("--api-key", default=os.getenv("VONAGE_API_KEY"), required=False)
+ p.add_argument("--api-secret", default=os.getenv("VONAGE_API_SECRET"), required=False)
+
+ # Where to connect
+ p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False)
+ p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000")))
+ p.add_argument("--bidirectional", action="store_true", default=True)
+
+ # An existing session which needs to be connected to pipecat-ai
+ p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID"))
+
+ # Optional streams and headers (to pass to the WS)
+ p.add_argument(
+ "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs"
+ )
+ p.add_argument(
+ "--header",
+ action="append",
+ help="Extra header(s) for WS, e.g. --header X-Foo=bar (repeatable)",
+ )
+
+ # Optional: choose API base. If your SDK doesn’t accept api_url, set OPENTOK_API_URL env before run.
+ p.add_argument("--api-base", default=os.getenv("OPENTOK_API_URL", "https://api.opentok.com"))
+
+ args = p.parse_args()
+
+ # Validate inputs
+ missing = [
+ k
+ for k, v in {
+ "api-key": args.api_key,
+ "api-secret": args.api_secret,
+ "ws-uri": args.ws_uri,
+ }.items()
+ if not v
+ ]
+ if missing:
+ raise SystemExit(f"Missing required args/env: {', '.join(missing)}")
+
+ # Init client (SDK 3.x supports api_url kw; if yours doesn’t, remove it and use OPENTOK_API_URL env)
+ try:
+ ot = Client(args.api_key, args.api_secret, api_url=args.api_base)
+ except TypeError:
+ # Fallback for older SDKs that don't accept api_url
+ ot = Client(args.api_key, args.api_secret)
+
+ session_id = args.session_id
+ print(f"Using existing session: {session_id}")
+
+ # Token: generate a fresh one tied to this session
+ token = ot.generate_token(session_id)
+ print(f"Generated token: {token[:32]}...") # don’t print full token in logs
+
+ # Build websocket options (mirrors your Postman body)
+ ws_opts = {
+ "uri": args.ws_uri,
+ "audioRate": args.audio_rate,
+ "bidirectional": bool(args.bidirectional),
+ }
+
+ # Optional stream filtering
+ stream_list = comma_list(args.streams)
+ if stream_list:
+ ws_opts["streams"] = stream_list
+
+ # Optional headers passed to your WS server
+ headers = parse_kv_pairs(args.header or [])
+ if headers:
+ ws_opts["headers"] = headers
+
+ print("Connecting audio to WebSocket with options:")
+ print(json.dumps(ws_opts, indent=2))
+
+ # Call the Audio Connector (this is equivalent to POST /v2/project/{apiKey}/connect)
+ resp = ot.connect_audio_to_websocket(session_id, token, ws_opts)
+
+ # The SDK returns a small object/dict; print it for visibility
+ try:
+ print("Connect response:", json.dumps(resp, indent=2))
+ except TypeError:
+ # Not JSON-serializable; just repr it
+ print("Connect response:", resp)
+
+ print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/vonage-speech-to-speech/client/connect_and_stream_vonage.py b/examples/vonage-speech-to-speech/client/connect_and_stream_vonage.py
new file mode 100644
index 0000000000..3248f4eeab
--- /dev/null
+++ b/examples/vonage-speech-to-speech/client/connect_and_stream_vonage.py
@@ -0,0 +1,153 @@
+#!/usr/bin/env python3
+"""
+Use a Vonage (OpenTok) Video API existing session, generate a token,
+and connect its audio to your Pipecat WebSocket endpoint.
+"""
+
+import argparse
+import json
+import os
+from typing import Dict, List
+
+from dotenv import load_dotenv
+from vonage import Auth, HttpClientOptions, Vonage
+from vonage_video import AudioConnectorOptions, TokenOptions
+
+# ---- helpers ----------------------------------------------------------------
+
+
+def parse_kv_pairs(items: List[str]) -> Dict[str, str]:
+ """
+ Parse CLI --header/--param entries like "Key=Value" or "Key:Value".
+ """
+ out: Dict[str, str] = {}
+ for raw in items or []:
+ sep = "=" if "=" in raw else (":" if ":" in raw else None)
+ if not sep:
+ raise ValueError(f"Invalid header/param format: {raw!r}. Use Key=Value")
+ k, v = raw.split(sep, 1)
+ out[k.strip()] = v.strip()
+ return out
+
+
+def comma_list(s: str | None) -> List[str]:
+ return [x.strip() for x in s.split(",")] if s else []
+
+
+# ---- main -------------------------------------------------------------------
+
+
+def main() -> None:
+ load_dotenv()
+
+ p = argparse.ArgumentParser(
+ description="Create a session and connect its audio to a WebSocket (Pipecat)."
+ )
+ # Auth
+ p.add_argument("--application-id", default=os.getenv("VONAGE_APPLICATION_ID"), required=False)
+ p.add_argument("--private-key", default=os.getenv("VONAGE_PRIVATE_KEY"), required=False)
+
+ # Where to connect
+ p.add_argument("--ws-uri", default=os.getenv("WS_URI"), help="wss://...", required=False)
+ p.add_argument("--audio-rate", type=int, default=int(os.getenv("VONAGE_AUDIO_RATE", "16000")))
+ bidirectional_env = os.getenv("VONAGE_BIDIRECTIONAL")
+ if bidirectional_env is not None:
+ if bidirectional_env.lower() not in ("true", "false"):
+ raise SystemExit("VONAGE_BIDIRECTIONAL must be 'true' or 'false'")
+ bidirectional_default = bidirectional_env.lower() == "true"
+ else:
+ bidirectional_default = True
+
+ p.add_argument("--bidirectional", action="store_true", default=bidirectional_default)
+
+ # An existing session which needs to be connected to pipecat-ai
+ p.add_argument("--session-id", default=os.getenv("VONAGE_SESSION_ID"))
+
+ # Optional streams and headers (to pass to the WS)
+ p.add_argument(
+ "--streams", default=os.getenv("VONAGE_STREAMS"), help="Comma-separated stream IDs"
+ )
+ p.add_argument(
+ "--header",
+ action="append",
+ help="Extra header(s) for WS, e.g. --header X-Foo=bar (repeatable)",
+ )
+
+ # Optional: choose API base. If your SDK doesn’t accept api_url, set VONAGE_API_URL env before run.
+ p.add_argument("--api-base", default=os.getenv("VONAGE_API_URL", "api.vonage.com"))
+
+ args = p.parse_args()
+
+ # Validate inputs
+ missing = [
+ k
+ for k, v in {
+ "application-id": args.application_id,
+ "private-key": args.private_key,
+ "ws-uri": args.ws_uri,
+ "session-id": args.session_id,
+ }.items()
+ if not v
+ ]
+ if missing:
+ raise SystemExit(f"Missing required args/env: {', '.join(missing)}")
+
+ # Create an Auth instance
+ auth = Auth(
+ application_id=args.application_id,
+ private_key=args.private_key,
+ )
+
+ # Create HttpClientOptions instance
+ # (not required unless you want to change options from the defaults)
+ options = HttpClientOptions(video_host="video." + args.api_base, timeout=30)
+
+ # Create a Vonage instance
+ vonage = Vonage(auth=auth, http_client_options=options)
+
+ session_id = args.session_id
+ print(f"Using existing session: {session_id}")
+
+ # Token: generate a fresh one tied to this session
+ token_options = TokenOptions(session_id=session_id, role="publisher")
+ token = vonage.video.generate_client_token(token_options)
+ print(f"Generated token: {token[:32]}...") # don’t print full token in logs
+
+ # Build websocket options (mirrors your Postman body)
+ ws_opts = {
+ "uri": args.ws_uri,
+ "audioRate": args.audio_rate,
+ "bidirectional": bool(args.bidirectional),
+ }
+
+ # Optional stream filtering
+ stream_list = comma_list(args.streams)
+ if stream_list:
+ ws_opts["streams"] = stream_list
+
+ # Optional headers passed to your WS server
+ headers = parse_kv_pairs(args.header or [])
+ if headers:
+ ws_opts["headers"] = headers
+
+ print("Connecting audio to WebSocket with options:")
+ print(json.dumps(ws_opts, indent=2))
+
+ # Call the Audio Connector (this is equivalent to POST /v2/project/{apiKey}/connect)
+ audio_connector_options = AudioConnectorOptions(
+ session_id=session_id, token=token, websocket=ws_opts
+ )
+ resp = vonage.video.start_audio_connector(audio_connector_options)
+
+ # The SDK returns a small object/dict; print it for visibility
+ try:
+ print("Connect response:", json.dumps(resp, indent=2))
+ except TypeError:
+ # Not JSON-serializable; just repr it
+ print("Connect response:", resp)
+
+ print("\nSuccess! Your Video session should now stream audio to/from:", args.ws_uri)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/examples/vonage-speech-to-speech/client/env.example b/examples/vonage-speech-to-speech/client/env.example
new file mode 100644
index 0000000000..05fce761c2
--- /dev/null
+++ b/examples/vonage-speech-to-speech/client/env.example
@@ -0,0 +1,27 @@
+# Vonage (OpenTok sdk) credentials
+VONAGE_API_KEY=YOUR_API_KEY
+VONAGE_API_SECRET=YOUR_API_SECRET
+
+# API base: uses the prod endpoint by default
+OPENTOK_API_URL=https://api.opentok.com
+
+# Or if you are using Vonage sdk
+
+# Vonage (Vonage sdk) credentials
+VONAGE_APPLICATION_ID=YOUR_APPLICATION_ID
+VONAGE_PRIVATE_KEY=YOUR_PRIVATE_KEY_PATH
+
+# API base: uses the prod endpoint by default
+VONAGE_API_URL=api.vonage.com
+
+# Your Pipecat WebSocket endpoint (ngrok or prod)
+WS_URI=wss://
+
+# Put existing session from playground or app which you want to connect pipecat-ai
+VONAGE_SESSION_ID=1_MX4....
+
+# Audio settings for the Audio Connector
+VONAGE_AUDIO_RATE=16000
+
+# Optional: override bidirectional (defaults to true in the script)
+# VONAGE_BIDIRECTIONAL=true
diff --git a/examples/vonage-speech-to-speech/client/requirements.txt b/examples/vonage-speech-to-speech/client/requirements.txt
new file mode 100644
index 0000000000..cd1bd9c0f4
--- /dev/null
+++ b/examples/vonage-speech-to-speech/client/requirements.txt
@@ -0,0 +1,6 @@
+opentok>=3
+vonage>=3.3.1
+python-dotenv
+websockets>=12.0
+numpy>=1.26
+sounddevice>=0.4
diff --git a/examples/vonage-speech-to-speech/env.example b/examples/vonage-speech-to-speech/env.example
new file mode 100644
index 0000000000..e570b8b559
--- /dev/null
+++ b/examples/vonage-speech-to-speech/env.example
@@ -0,0 +1 @@
+OPENAI_API_KEY=
diff --git a/examples/vonage-speech-to-speech/requirements.txt b/examples/vonage-speech-to-speech/requirements.txt
new file mode 100644
index 0000000000..a0bc9b0583
--- /dev/null
+++ b/examples/vonage-speech-to-speech/requirements.txt
@@ -0,0 +1,3 @@
+python-dotenv
+loguru
+pydub>=0.25
diff --git a/examples/vonage-speech-to-speech/server.py b/examples/vonage-speech-to-speech/server.py
new file mode 100644
index 0000000000..991135044d
--- /dev/null
+++ b/examples/vonage-speech-to-speech/server.py
@@ -0,0 +1,158 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Speech↔Speech via OpenAI Realtime (no separate STT/TTS)."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+
+from dotenv import load_dotenv
+from loguru import logger
+
+from pipecat.audio.vad.silero import SileroVADAnalyzer
+from pipecat.audio.vad.vad_analyzer import VADParams
+from pipecat.frames.frames import Frame, StartInterruptionFrame
+from pipecat.pipeline.pipeline import Pipeline
+from pipecat.pipeline.runner import PipelineRunner
+from pipecat.pipeline.task import PipelineParams, PipelineTask
+from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
+from pipecat.serializers.vonage import VonageFrameSerializer
+
+# Realtime S2S (audio-in/audio-out) service
+from pipecat.services.openai_realtime_beta.context import OpenAIRealtimeLLMContext
+from pipecat.services.openai_realtime_beta.openai import OpenAIRealtimeBetaLLMService
+from pipecat.transports.network.websocket_server import WebsocketServerParams
+from pipecat.transports.vonage.audio_connector import VonageAudioConnectorTransport
+
+WS_HOST = "0.0.0.0"
+WS_PORT = 8005
+SESSION_TIMEOUT_SECONDS = 60 * 3
+AUDIO_OUT_SAMPLE_RATE = 16_000 # telephony-friendly
+
+SYSTEM_INSTRUCTION = (
+ "You are a concise, friendly voice assistant. "
+ "You will receive spoken input and respond with speech. "
+ "Always respond in ENGLISH only, even if the user speaks another language. "
+ "Keep replies to one or two sentences and avoid special characters."
+)
+
+load_dotenv()
+
+
+# Cancels the Realtime model when user starts speaking (barge-in).
+class RealtimeBargeInCanceler(FrameProcessor):
+ def __init__(self, realtime_service):
+ super().__init__()
+ self._realtime = realtime_service
+
+ # Direction-aware forwarding to avoid feedback loops.
+ async def queue_frame(self, frame: Frame, direction):
+ # Only cancel on *downstream* interruption (from mic/user)
+ if direction == FrameDirection.DOWNSTREAM and isinstance(frame, StartInterruptionFrame):
+ cancelled = False
+ for method_name in (
+ "cancel_current_response",
+ "cancel_response",
+ "stop_current_response",
+ ):
+ try:
+ method = getattr(self._realtime, method_name, None)
+ if method:
+ await method()
+ cancelled = True
+ break
+ except Exception as e:
+ logger.warning(f"Realtime cancel via {method_name} failed: {e}")
+ if not cancelled:
+ logger.warning(
+ "Realtime cancel method not found; barge-in will rely on VAD + clearAudio only."
+ )
+
+ # Forward respecting direction to prevent recursion
+ if direction == FrameDirection.DOWNSTREAM:
+ if self._next:
+ await self._next.queue_frame(frame, direction)
+ else: # UPSTREAM
+ if self._prev:
+ await self._prev.queue_frame(frame, direction)
+
+
+async def run_bot_websocket_server() -> None:
+ serializer = VonageFrameSerializer()
+
+ # VAD tuned for barge-in (times in seconds)
+ vad = SileroVADAnalyzer(
+ sample_rate=AUDIO_OUT_SAMPLE_RATE,
+ params=VADParams(
+ confidence=0.7,
+ start_secs=0.12, # ~120 ms to declare speaking
+ stop_secs=0.25, # ~250 ms silence to stop
+ min_volume=0.6,
+ ),
+ )
+
+ ws_transport = VonageAudioConnectorTransport(
+ host=WS_HOST,
+ port=WS_PORT,
+ params=WebsocketServerParams(
+ serializer=serializer,
+ audio_in_enabled=True,
+ audio_out_enabled=True,
+ add_wav_header=True,
+ vad_analyzer=vad,
+ session_timeout=SESSION_TIMEOUT_SECONDS,
+ ),
+ )
+
+ realtime = OpenAIRealtimeBetaLLMService(
+ api_key=os.getenv("OPENAI_API_KEY"),
+ model="gpt-4o-realtime-preview-2025-06-03",
+ send_transcription_frames=False,
+ # Optional knobs if supported:
+ # transcription_language="en",
+ # enable_server_vad=True,
+ # max_output_chunk_ms=200,
+ )
+
+ canceler = RealtimeBargeInCanceler(realtime)
+
+ messages = [{"role": "system", "content": SYSTEM_INSTRUCTION}]
+ context = OpenAIRealtimeLLMContext(messages)
+ context_agg = realtime.create_context_aggregator(context)
+
+ pipeline = Pipeline(
+ [
+ ws_transport.input(), # audio from Vonage over WS
+ canceler, # cancel model on StartInterruptionFrame (direction-aware)
+ context_agg.user(), # seed system context once
+ realtime, # audio-in/audio-out model
+ ws_transport.output(), # audio back to Vonage
+ ]
+ )
+
+ task = PipelineTask(
+ pipeline,
+ params=PipelineParams(
+ audio_out_sample_rate=AUDIO_OUT_SAMPLE_RATE,
+ enable_metrics=True,
+ enable_usage_metrics=True,
+ ),
+ )
+
+ @ws_transport.event_handler("on_client_connected")
+ async def on_client_connected(_t, _c):
+ logger.info("Client connected")
+ # Send the system context after everything is linked and running
+ await task.queue_frames([context_agg.user().get_context_frame()])
+
+ @ws_transport.event_handler("on_client_disconnected")
+ async def on_client_disconnected(_t, _c):
+ logger.info("Client disconnected")
+ await task.cancel()
+
+ runner = PipelineRunner(handle_sigint=False)
+ await runner.run(task)
+
+
+if __name__ == "__main__":
+ asyncio.run(run_bot_websocket_server())
diff --git a/pyproject.toml b/pyproject.toml
index c04dd24338..fbd2a2bd62 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -108,6 +108,7 @@ tavus=[]
together = []
tracing = [ "opentelemetry-sdk>=1.33.0", "opentelemetry-api>=1.33.0", "opentelemetry-instrumentation>=0.54b0" ]
ultravox = [ "transformers>=4.48.0", "vllm>=0.9.0" ]
+vonage = [ "pipecat-ai[websockets-base]" ]
webrtc = [ "aiortc>=1.13.0,<2", "opencv-python>=4.11.0.86,<5" ]
websocket = [ "pipecat-ai[websockets-base]", "fastapi>=0.115.6,<0.117.0" ]
websockets-base = [ "websockets>=13.1,<16.0" ]
diff --git a/src/pipecat/serializers/vonage.py b/src/pipecat/serializers/vonage.py
new file mode 100644
index 0000000000..7628b81a7c
--- /dev/null
+++ b/src/pipecat/serializers/vonage.py
@@ -0,0 +1,176 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Vonage WebSocket serializer (WAV+pydub resample, fixed-size chunking)."""
+
+from __future__ import annotations
+
+import io
+import json
+import wave
+from typing import List, Optional, Union
+
+from loguru import logger
+from pydantic import BaseModel
+from pydub import AudioSegment
+
+from pipecat.audio.utils import create_stream_resampler
+from pipecat.frames.frames import (
+ CancelFrame,
+ EndFrame,
+ Frame,
+ InputAudioRawFrame,
+ OutputAudioRawFrame,
+ StartFrame,
+ StartInterruptionFrame,
+)
+from pipecat.serializers.base_serializer import FrameSerializer, FrameSerializerType
+
+# ---- Audio/timing constants --------------------------------------------------
+
+AUDIO_TARGET_RATE_HZ: int = 16_000 # 16 kHz target
+AUDIO_CHANNELS_MONO: int = 1 # mono
+PCM16_SAMPLE_WIDTH_BYTES: int = 2 # 16-bit PCM
+CHUNK_DURATION_MS: int = 20 # telephony frame
+SECONDS_PER_MS: float = 1.0 / 1_000.0
+CHUNK_PERIOD_SECONDS: float = CHUNK_DURATION_MS * SECONDS_PER_MS
+SLEEP_INTERVAL_PER_CHUNK: float = 0.01
+
+BYTES_PER_SAMPLE_MONO: int = AUDIO_CHANNELS_MONO * PCM16_SAMPLE_WIDTH_BYTES
+BYTES_PER_CHUNK: int = int(AUDIO_TARGET_RATE_HZ * CHUNK_PERIOD_SECONDS) * BYTES_PER_SAMPLE_MONO
+
+
+class VonageFrameSerializer(FrameSerializer):
+ """Produces 16 kHz mono PCM chunks; resamples using WAV+pydub path."""
+
+ class InputParams(BaseModel):
+ """Configuration options for the Vonage frame serializer.
+
+ Controls whether to send a clear-audio event and whether
+ to auto-hang-up on End/Cancel frames.
+ """
+
+ auto_hang_up: bool = True
+ send_clear_audio_event: bool = True
+
+ def __init__(self, params: Optional[InputParams] = None) -> None:
+ """Initialize the VonageFrameSerializer.
+
+ Args:
+ params: Optional configuration parameters for serialization.
+ """
+ self._params: VonageFrameSerializer.InputParams = (
+ params or VonageFrameSerializer.InputParams()
+ )
+ self._sample_rate_hz: int = AUDIO_TARGET_RATE_HZ
+ self._in_resampler = create_stream_resampler()
+ self._out_resampler = create_stream_resampler()
+
+ # Transport reads this for pacing (one sleep per chunk).
+ self.sleep_interval: float = SLEEP_INTERVAL_PER_CHUNK
+
+ # Serializer-side audio format assumptions for pydub path:
+ self._channels: int = AUDIO_CHANNELS_MONO
+ self._sample_width_bytes: int = PCM16_SAMPLE_WIDTH_BYTES
+
+ @property
+ def type(self) -> FrameSerializerType:
+ """Return the serializer type (binary frames)."""
+ return FrameSerializerType.BINARY
+
+ async def setup(self, frame: StartFrame) -> None:
+ """Prepare the serializer for a new session.
+
+ Sets the sample rate and sleep interval for chunk pacing.
+ """
+ self._sample_rate_hz = AUDIO_TARGET_RATE_HZ
+ self.sleep_interval = SLEEP_INTERVAL_PER_CHUNK
+
+ # --- helpers --------------------------------------------------------------
+
+ @staticmethod
+ def _resample_audio_with_pydub(
+ data: bytes,
+ src_rate_hz: int,
+ num_channels: int,
+ sample_width_bytes: int,
+ target_rate_hz: int,
+ ) -> bytes:
+ """Resample via WAV header + pydub.
+
+ NOTE: This assumes `data` contains a WAV header. If your pipeline disables
+ WAV headers, switch to a raw-PCM resampler instead.
+ """
+ with wave.open(io.BytesIO(data), "rb") as wf:
+ num_frames = wf.getnframes()
+ pcm_data = wf.readframes(num_frames)
+
+ segment = AudioSegment.from_raw(
+ io.BytesIO(pcm_data),
+ sample_width=sample_width_bytes,
+ frame_rate=src_rate_hz,
+ channels=num_channels,
+ )
+ resampled = (
+ segment.set_channels(num_channels)
+ .set_sample_width(sample_width_bytes)
+ .set_frame_rate(target_rate_hz)
+ )
+ return resampled.raw_data
+
+ @staticmethod
+ def _split_into_chunks(audio16: bytes) -> List[bytes]:
+ return [audio16[i : i + BYTES_PER_CHUNK] for i in range(0, len(audio16), BYTES_PER_CHUNK)]
+
+ # --- API ------------------------------------------------------------------
+
+ async def serialize(self, frame: Frame) -> Optional[Union[str, bytes, list[bytes]]]:
+ """Convert a Frame into one or more serialized payloads.
+
+ Args:
+ frame: The frame to serialize.
+
+ Returns:
+ The serialized data as a string, bytes, or list of bytes.
+ """
+ if self._params.auto_hang_up and isinstance(frame, (EndFrame, CancelFrame)):
+ logger.debug(
+ "VonageFrameSerializer: End/Cancel observed (auto-hang-up not implemented)."
+ )
+ return None
+
+ if isinstance(frame, StartInterruptionFrame) and self._params.send_clear_audio_event:
+ return json.dumps({"event": "clearAudio"})
+
+ if isinstance(frame, OutputAudioRawFrame):
+ audio16 = self._resample_audio_with_pydub(
+ data=frame.audio,
+ src_rate_hz=frame.sample_rate,
+ num_channels=self._channels,
+ sample_width_bytes=self._sample_width_bytes,
+ target_rate_hz=self._sample_rate_hz,
+ )
+ return self._split_into_chunks(audio16)
+
+ logger.debug(f"VonageFrameSerializer: ignoring frame type {type(frame).__name__}.")
+ return None
+
+ async def deserialize(self, data: Union[str, bytes]) -> Optional[Frame]:
+ """Convert serialized input data into a Frame.
+
+ Args:
+ data: The raw audio or frame payload.
+
+ Returns:
+ The corresponding Frame instance, or None if parsing fails.
+ """
+ if isinstance(data, (bytes, bytearray)):
+ audio = await self._in_resampler.resample(
+ bytes(data), self._sample_rate_hz, self._sample_rate_hz
+ )
+ return InputAudioRawFrame(
+ audio=audio,
+ num_channels=AUDIO_CHANNELS_MONO,
+ sample_rate=self._sample_rate_hz,
+ )
+
+ logger.info("VonageFrameSerializer: ignoring non-binary inbound data.")
+ return None
diff --git a/src/pipecat/transports/vonage/__init__.py b/src/pipecat/transports/vonage/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/src/pipecat/transports/vonage/audio_connector.py b/src/pipecat/transports/vonage/audio_connector.py
new file mode 100644
index 0000000000..e261d31fce
--- /dev/null
+++ b/src/pipecat/transports/vonage/audio_connector.py
@@ -0,0 +1,113 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Vonage WebSocket transport (chunk iterator + sleep-per-chunk pacing)."""
+
+from __future__ import annotations
+
+import asyncio
+import io
+import wave
+from typing import Optional
+
+from loguru import logger
+
+from pipecat.frames.frames import Frame, OutputAudioRawFrame
+from pipecat.transports.base_transport import BaseTransport
+from pipecat.transports.network.websocket_server import (
+ WebsocketServerOutputTransport,
+ WebsocketServerParams,
+ WebsocketServerTransport,
+)
+
# ---- Constants ---------------------------------------------------------------

DEFAULT_WS_HOST: str = "localhost"
DEFAULT_WS_PORT: int = 8765
PCM16_SAMPLE_WIDTH_BYTES: int = 2  # 16-bit PCM


class VonageAudioConnectorTransport(WebsocketServerTransport):
    """WebSocket server transport that paces output by sleeping once per audio chunk."""

    def __init__(
        self,
        params: WebsocketServerParams,
        host: str = DEFAULT_WS_HOST,
        port: int = DEFAULT_WS_PORT,
        input_name: Optional[str] = None,
        output_name: Optional[str] = None,
    ) -> None:
        """Initialize the Vonage WebSocket server transport.

        Args:
            params: WebSocket server parameters including serializer and audio options.
            host: Host address for the WebSocket server.
            port: Port number for the WebSocket server.
            input_name: Optional name for the input transport.
            output_name: Optional name for the output transport.
        """
        super().__init__(params, host, port, input_name, output_name)
        # Kept locally so output() can build the Vonage-specific output transport.
        self._params = params

    def output(self) -> WebsocketServerOutputTransport:
        """Return (lazily creating) the output transport used to send data to clients."""
        if self._output is None:
            self._output = VonageAudioConnectorOutputTransport(self, self._params)
        return self._output
+
+
class VonageAudioConnectorOutputTransport(WebsocketServerOutputTransport):
    """Output transport that sends serializer-produced chunks with a sleep between sends."""

    def __init__(self, transport: BaseTransport, params: WebsocketServerParams, **kwargs) -> None:
        """Initialize the Vonage WebSocket output transport.

        Args:
            transport: The base transport instance to wrap.
            params: WebSocket server parameters.
            **kwargs: Additional keyword arguments for the base class.
        """
        super().__init__(transport, params, **kwargs)

    async def write_audio_frame(self, frame: OutputAudioRawFrame) -> None:
        """Send an audio frame to the connected client, pacing per chunk."""
        if not self._websocket:
            # No client connected yet: still sleep so pipeline timing stays consistent.
            await self._write_audio_sleep()
            return

        out_frame = OutputAudioRawFrame(
            audio=frame.audio,
            sample_rate=self.sample_rate,
            num_channels=self._params.audio_out_channels,
        )

        if self._params.add_wav_header:
            # Wrap the raw PCM in a WAV container before serialization.
            with io.BytesIO() as buffer:
                with wave.open(buffer, "wb") as wf:
                    wf.setsampwidth(PCM16_SAMPLE_WIDTH_BYTES)
                    wf.setnchannels(out_frame.num_channels)
                    wf.setframerate(out_frame.sample_rate)
                    wf.writeframes(out_frame.audio)
                out_frame = OutputAudioRawFrame(
                    audio=buffer.getvalue(),
                    sample_rate=out_frame.sample_rate,
                    num_channels=out_frame.num_channels,
                )

        await self._write_frame(out_frame)

    async def _write_frame(self, frame: Frame) -> None:
        """Serialize a frame and send each resulting chunk with a sleep in between."""
        serializer = self._params.serializer
        if not serializer:
            return

        try:
            payload = await serializer.serialize(frame)
            if payload and self._websocket:
                # NOTE(review): this assumes the serializer returned a list[bytes]
                # of audio chunks; a plain str payload (e.g. a control event)
                # would be iterated character-by-character here — confirm that
                # only audio frames reach this path.
                # NOTE(review): `sleep_interval` is assumed to exist on the
                # configured serializer; it is not part of the generic
                # serializer interface.
                for chunk in payload:
                    await self._websocket.send(chunk)
                    await asyncio.sleep(serializer.sleep_interval)
        except Exception as exc:
            logger.error(f"{self} exception sending data: {exc.__class__.__name__} ({exc})")
diff --git a/src/pipecat/transports/vonage/video_webrtc.py b/src/pipecat/transports/vonage/video_webrtc.py
new file mode 100644
index 0000000000..97f5a9f480
--- /dev/null
+++ b/src/pipecat/transports/vonage/video_webrtc.py
@@ -0,0 +1,839 @@
+# SPDX-License-Identifier: BSD-2-Clause
+"""Vonage WebRTC transport."""
+
+import asyncio
+import itertools
+from dataclasses import dataclass, replace
+from typing import Awaitable, Callable, Optional
+
+import numpy as np
+from loguru import logger
+
+from pipecat.audio.resamplers.base_audio_resampler import BaseAudioResampler
+from pipecat.audio.utils import create_stream_resampler
+from pipecat.frames.frames import (
+ CancelFrame,
+ EndFrame,
+ InputAudioRawFrame,
+ OutputAudioRawFrame,
+ StartFrame,
+ UserAudioRawFrame,
+)
+from pipecat.processors.frame_processor import FrameProcessor, FrameProcessorSetup
+from pipecat.transports.base_input import BaseInputTransport
+from pipecat.transports.base_output import BaseOutputTransport
+from pipecat.transports.base_transport import BaseTransport, TransportParams
+
try:
    import vonage_video_connector as vonage_video
    from vonage_video_connector.models import (
        AudioData,
        LoggingSettings,
        Publisher,
        PublisherAudioSettings,
        PublisherSettings,
        Session,
        SessionAudioSettings,
        SessionSettings,
        Stream,
        Subscriber,
    )
except ModuleNotFoundError as e:
    # Surface a clear, actionable message when the native SDK wrapper is absent.
    logger.error(f"Exception: {e}")
    logger.error(
        "In order to use Vonage, you need Vonage's native SDK wrapper for Python installed."
    )
    raise Exception(f"Missing module: {e}")
+
+
class VonageVideoWebrtcTransportParams(TransportParams):
    """Configuration for the Vonage WebRTC transport.

    Parameters:
        publisher_name: Name under which the publisher stream appears.
        publisher_enable_opus_dtx: Enable OPUS DTX on published audio.
        session_enable_migration: Enable session migration support.
    """

    publisher_name: str = ""
    publisher_enable_opus_dtx: bool = False
    session_enable_migration: bool = False
+
+
class VonageException(Exception):
    """Raised when a Vonage transport operation fails or encounters an error."""


async def async_noop(*args, **kwargs):
    """Asynchronous no-op used as the default listener callback."""
    return None
+
+
@dataclass
class VonageClientListener:
    """Callbacks for Vonage client events; every field defaults to a no-op.

    Parameters:
        on_connected: Awaited when the session connects.
        on_disconnected: Awaited when the session disconnects.
        on_error: Awaited for session errors.
        on_audio_in: Synchronous callback for incoming audio data.
        on_stream_received: Awaited when a remote stream appears.
        on_stream_dropped: Awaited when a remote stream goes away.
        on_subscriber_connected: Awaited when a subscriber connects.
        on_subscriber_disconnected: Awaited when a subscriber disconnects.
    """

    on_connected: Callable[[Session], Awaitable[None]] = async_noop
    on_disconnected: Callable[[Session], Awaitable[None]] = async_noop
    on_error: Callable[[Session, str, int], Awaitable[None]] = async_noop
    on_audio_in: Callable[[Session, AudioData], None] = lambda _session, _audio: None
    on_stream_received: Callable[[Session, Stream], Awaitable[None]] = async_noop
    on_stream_dropped: Callable[[Session, Stream], Awaitable[None]] = async_noop
    on_subscriber_connected: Callable[[Subscriber], Awaitable[None]] = async_noop
    on_subscriber_disconnected: Callable[[Subscriber], Awaitable[None]] = async_noop
+
+
@dataclass
class VonageClientParams:
    """Audio and migration settings for a VonageClient.

    Parameters:
        audio_in_sample_rate: Sample rate for incoming audio (Hz).
        audio_in_channels: Number of channels for incoming audio.
        audio_out_sample_rate: Sample rate for outgoing audio (Hz).
        audio_out_channels: Number of channels for outgoing audio.
        enable_migration: Whether to enable session migration.
    """

    audio_in_sample_rate: int = 48000
    audio_in_channels: int = 2
    audio_out_sample_rate: int = 48000
    audio_out_channels: int = 2
    enable_migration: bool = False
+
+
class VonageClient:
    """Client for managing a Vonage Video session.

    Handles connection, publishing, subscribing, and event callbacks for a
    Vonage Video session. Several transports may share one client; the session
    is reference-counted and torn down only when the last user disconnects.

    Supported features:

    - Connects to a Vonage Video session using provided credentials
    - Publishes audio streams with configurable settings
    - Subscribes to remote streams and handles audio data
    - Manages event listeners for session and stream events
    - Supports session migration and advanced audio options
    """

    def __init__(
        self,
        application_id: str,
        session_id: str,
        token: str,
        params: VonageClientParams,
        publisher_settings: Optional[PublisherSettings] = None,
    ):
        """Initialize the Vonage client.

        Args:
            application_id: The Vonage Video application ID.
            session_id: The session ID to connect to.
            token: The authentication token for the session.
            params: Parameters for audio and migration settings.
            publisher_settings: Optional publisher settings for the audio stream.
        """
        self._client = vonage_video.VonageVideoClient()
        self._application_id: str = application_id
        self._session_id: str = session_id
        self._token: str = token
        self._params = params
        self._connected: bool = False
        # Number of transports currently sharing this client's session.
        self._connection_counter: int = 0
        self._listener_id_gen: itertools.count = itertools.count()
        self._listeners: dict[int, VonageClientListener] = {}
        self._publish_ready: Optional[asyncio.Future] = None
        self._publisher_settings: Optional[PublisherSettings] = publisher_settings
        self._publisher: Optional[Publisher] = None
        # Event loop used to hop from SDK callback threads back into asyncio.
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._session = Session(id=session_id)

    def get_params(self) -> VonageClientParams:
        """Return the VonageClientParams this client was configured with."""
        return self._params

    def add_listener(self, listener: VonageClientListener) -> int:
        """Register a listener and return its unique ID.

        Args:
            listener: The VonageClientListener to add.

        Returns:
            The unique ID assigned to the listener.
        """
        listener_id = next(self._listener_id_gen)
        self._listeners[listener_id] = listener
        return listener_id

    def remove_listener(self, listener_id: int):
        """Remove a previously registered listener (no-op if unknown).

        Args:
            listener_id: The ID of the listener to remove.
        """
        self._listeners.pop(listener_id, None)

    async def connect(self, listener: VonageClientListener) -> int:
        """Connect to the Vonage session (reference-counted).

        Args:
            listener: Listener for session events.

        Returns:
            The unique ID assigned to the listener.

        Raises:
            VonageException: If the underlying SDK connect call fails.
        """
        logger.info(f"Connecting with session string {self._session_id}")

        listener_id: int = self.add_listener(listener)
        if self._connected:
            logger.info(f"Already connected to {self._session_id}")

            # Already connected: just refcount this additional user.
            self._connection_counter += 1
            await listener.on_connected(self._session)
            return listener_id

        if self._publish_ready is not None:
            logger.info(f"Already connecting to {self._session_id}")

            # A connection attempt is in flight; wait for it and refcount
            # this user as well (the initiator notifies all listeners).
            await self._publish_ready
            self._connection_counter += 1
            return listener_id

        # The SDK callbacks below fire on native threads and need the loop to
        # get back into asyncio, so capture it unconditionally (previously it
        # was only set when publisher settings existed).
        loop = asyncio.get_running_loop()
        self._loop = loop
        if self._publisher_settings:
            self._publish_ready = loop.create_future()

        if not self._client.connect(
            application_id=self._application_id,
            session_id=self._session_id,
            token=self._token,
            session_settings=SessionSettings(
                audio=SessionAudioSettings(
                    sample_rate=self._params.audio_out_sample_rate,
                    number_of_channels=self._params.audio_out_channels,
                ),
                enable_migration=self._params.enable_migration,
                logging=LoggingSettings(level="INFO"),
            ),
            on_error_cb=self._on_session_error_cb,
            on_connected_cb=self._on_session_connected_cb,
            on_disconnected_cb=self._on_session_disconnected_cb,
            on_stream_received_cb=self._on_stream_received_cb,
            on_stream_dropped_cb=self._on_stream_dropped_cb,
            on_audio_data_cb=self._on_session_audio_data_cb,
            on_ready_for_audio_cb=self._on_session_ready_for_audio_cb,
        ):
            logger.error(f"Could not connect to {self._session_id}")
            # Don't leak the listener registered for this failed attempt.
            self._listeners.pop(listener_id, None)
            raise VonageException("Could not connect to session")

        logger.info(f"Connected to {self._session_id}")

        if self._publish_ready:
            await self._publish_ready

        self._connected = True
        # Count the initiating user too; previously the first connect never
        # incremented the counter, so a single user could never disconnect.
        self._connection_counter += 1
        await self._on_session_connected()
        return listener_id

    async def disconnect(self, listener_id: int):
        """Disconnect one user; tear the session down when the last one leaves.

        Args:
            listener_id: The ID of the listener to disconnect.
        """
        if not self._connected:
            logger.info(f"Already disconnected from {self._session_id}")
            self._listeners.pop(listener_id, None)
            return

        self._connection_counter -= 1
        if self._connection_counter > 0:
            # Other users still share the session: just drop this listener.
            self._listeners.pop(listener_id, None)
            return

        logger.info(f"Disconnecting from {self._session_id}")

        if self._publisher:
            self._client.unpublish()
            self._publisher = None

        self._client.disconnect()
        self._connected = False

        for listener in self._listeners.values():
            await listener.on_disconnected(self._session)

        self._listeners.pop(listener_id, None)

        logger.info(f"Disconnected from {self._session_id}")

    async def write_audio(self, raw_audio_frame: bytes):
        """Inject raw PCM audio into the Vonage session.

        Args:
            raw_audio_frame: Raw 16-bit PCM audio data to inject.
        """
        # 2 bytes per sample per channel (16-bit PCM).
        frame_count = len(raw_audio_frame) // (self._params.audio_out_channels * 2)
        self._client.inject_audio(
            AudioData(
                sample_buffer=memoryview(raw_audio_frame).cast("h"),
                number_of_frames=frame_count,
                number_of_channels=self._params.audio_out_channels,
                sample_rate=self._params.audio_out_sample_rate,
            )
        )

    async def _on_session_connected(self):
        """Notify every registered listener that the session is connected."""
        for listener in self._listeners.values():
            await listener.on_connected(self._session)

    def _on_session_ready_for_audio_cb(self, session: Session):
        # SDK thread: resolve the publish-ready future on the asyncio loop.
        logger.info(f"Session {session.id} ready to publish")
        if self._publish_ready and self._loop:
            future = self._publish_ready
            self._publish_ready = None
            self._loop.call_soon_threadsafe(future.set_result, None)

    def _on_session_error_cb(self, session: Session, description: str, code: int):
        # SDK thread: schedule the async fan-out on the asyncio loop.
        logger.warning(f"Session error {session.id} code={code} description={description}")
        self._loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self._on_session_error_async_cb(session, description, code))
        )

    async def _on_session_error_async_cb(self, session: Session, description: str, code: int):
        # Pass the Session itself: listeners declare on_error(Session, str, int)
        # (previously session.id was passed, breaking that contract).
        for listener in self._listeners.values():
            await listener.on_error(session, description, code)

    def _on_session_connected_cb(self, session: Session):
        logger.info(f"Session connected {session.id}")
        self._session = session
        # Only publish when this client was configured for publishing
        # (previously publish() was attempted even with settings=None).
        if self._publisher_settings:
            self._client.publish(
                settings=self._publisher_settings,
                on_error_cb=self._on_publisher_error_cb,
                on_stream_created_cb=self._on_publisher_stream_created_cb,
                on_stream_destroyed_cb=self._on_publisher_stream_destroyed_cb,
            )

    def _on_session_disconnected_cb(self, session: Session):
        logger.info(f"Session disconnected {session.id}")
        self._connected = False

    def _on_publisher_error_cb(self, publisher: Publisher, description: str, code: int):
        logger.warning(
            f"Publisher error session={self._session_id} publisher={publisher.stream.id} "
            f"code={code} description={description}"
        )

    def _on_publisher_stream_created_cb(self, publisher: Publisher):
        logger.info(
            f"Publisher stream created session={self._session_id} publisher={publisher.stream.id}"
        )
        self._publisher = publisher

    def _on_publisher_stream_destroyed_cb(self, publisher: Publisher):
        logger.info(
            f"Publisher stream destroyed session={self._session_id} publisher={publisher.stream.id}"
        )

    def _on_session_audio_data_cb(self, session: Session, audio_data: AudioData):
        # Hot path (SDK audio thread): fan out synchronously, no scheduling.
        for listener in self._listeners.values():
            if listener.on_audio_in:
                listener.on_audio_in(session, audio_data)

    def _on_stream_received_cb(self, session: Session, stream: Stream):
        logger.info(f"Stream received session={session.id} stream={stream.id}")
        self._client.subscribe(
            stream,
            on_error_cb=self._on_subscriber_error_cb,
            on_connected_cb=self._on_subscriber_connected_cb,
            on_disconnected_cb=self._on_subscriber_disconnected_cb,
        )
        self._loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self._on_stream_received_async_cb(session, stream))
        )

    async def _on_stream_received_async_cb(self, session: Session, stream: Stream):
        for listener in self._listeners.values():
            await listener.on_stream_received(session, stream)

    def _on_stream_dropped_cb(self, session: Session, stream: Stream):
        logger.info(f"Stream dropped session={session.id} stream={stream.id}")
        self._client.unsubscribe(stream)
        self._loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self._on_stream_dropped_async_cb(session, stream))
        )

    async def _on_stream_dropped_async_cb(self, session: Session, stream: Stream):
        for listener in self._listeners.values():
            await listener.on_stream_dropped(session, stream)

    def _on_subscriber_error_cb(self, subscriber: Subscriber, description: str, code: int):
        logger.info(
            f"Subscriber error session={self._session_id} subscriber={subscriber.stream.id} "
            f"code={code} description={description}"
        )

    def _on_subscriber_connected_cb(self, subscriber: Subscriber):
        logger.info(
            f"Subscriber connected session={self._session_id} subscriber={subscriber.stream.id} "
        )
        self._loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self._on_subscriber_connected_async_cb(subscriber))
        )

    async def _on_subscriber_connected_async_cb(self, subscriber: Subscriber):
        for listener in self._listeners.values():
            await listener.on_subscriber_connected(subscriber)

    def _on_subscriber_disconnected_cb(self, subscriber: Subscriber):
        logger.info(
            f"Subscriber disconnected session={self._session_id} subscriber={subscriber.stream.id} "
        )
        self._loop.call_soon_threadsafe(
            lambda: asyncio.create_task(self._on_subscriber_disconnected_async_cb(subscriber))
        )

    async def _on_subscriber_disconnected_async_cb(self, subscriber: Subscriber):
        for listener in self._listeners.values():
            await listener.on_subscriber_disconnected(subscriber)
+
+
class VonageVideoWebrtcInputTransport(BaseInputTransport):
    """Input transport for Vonage, handling audio input from the Vonage session.

    Audio delivered by the SDK (on a native thread) is normalized and pushed
    into the pipeline as InputAudioRawFrame objects.
    """

    _params: VonageVideoWebrtcTransportParams

    def __init__(self, client: VonageClient, params: VonageVideoWebrtcTransportParams):
        """Initialize the Vonage input transport.

        Args:
            client: The shared VonageClient instance to use.
            params: Transport parameters for input configuration.
        """
        super().__init__(params)
        self._initialized: bool = False
        self._client: VonageClient = client
        self._listener_id: Optional[int] = None
        self._resampler = create_stream_resampler()

    async def start(self, frame: StartFrame):
        """Start the transport, connecting to the session exactly once.

        Args:
            frame: The StartFrame to initiate the transport.
        """
        await super().start(frame)

        if self._initialized:
            return

        self._initialized = True

        if self._params.audio_in_enabled:
            self._listener_id = await self._client.connect(
                VonageClientListener(on_audio_in=self._audio_in_cb)
            )

        await self.set_transport_ready(frame)

    def _audio_in_cb(self, _session: Session, audio: AudioData):
        """Handle incoming audio from the SDK thread."""
        if self._listener_id is None or not self._params.audio_in_enabled:
            return

        check_audio_data(audio.sample_buffer, audio.number_of_frames, audio.number_of_channels)

        src_sample_rate = audio.sample_rate
        src_channels = audio.number_of_channels

        # np.frombuffer only creates a *view* of the SDK's memoryview, which is
        # invalidated once this callback returns while push_frame() runs later
        # on the event loop — so an explicit copy is required here.
        samples = np.frombuffer(audio.sample_buffer, dtype=np.int16).copy()

        async def push_frame():
            # TODO(Toni S): this normalization won't be necessary once VIDMP-1393 is done
            processed = await process_audio(
                self._resampler,
                samples,
                AudioProps(
                    sample_rate=src_sample_rate,
                    is_stereo=src_channels == 2,
                ),
                AudioProps(
                    sample_rate=self.sample_rate,
                    is_stereo=self._params.audio_in_channels == 2,
                ),
            )

            frame = InputAudioRawFrame(
                audio=processed.tobytes(),
                sample_rate=self.sample_rate,
                num_channels=self._params.audio_in_channels,
            )

            await self.push_audio_frame(frame)

        # Called from an SDK thread: hop onto the transport's event loop.
        asyncio.run_coroutine_threadsafe(push_frame(), self.get_event_loop())

    async def _teardown(self):
        # Disconnect exactly once, even if both stop() and cancel() run.
        if self._listener_id is not None and self._params.audio_in_enabled:
            listener_id, self._listener_id = self._listener_id, None
            await self._client.disconnect(listener_id)

    async def stop(self, frame: EndFrame):
        """Stop the transport and release the session connection.

        Args:
            frame: The EndFrame to stop the transport.
        """
        await super().stop(frame)
        await self._teardown()

    async def cancel(self, frame: CancelFrame):
        """Cancel the transport and release the session connection.

        Args:
            frame: The CancelFrame to cancel the transport.
        """
        await super().cancel(frame)
        await self._teardown()
+
+
class VonageVideoWebrtcOutputTransport(BaseOutputTransport):
    """Output transport for Vonage, sending pipeline audio into the session."""

    _params: VonageVideoWebrtcTransportParams

    def __init__(self, client: VonageClient, params: VonageVideoWebrtcTransportParams):
        """Initialize the Vonage output transport.

        Args:
            client: The shared VonageClient instance to use.
            params: Transport parameters for output configuration.
        """
        super().__init__(params)
        self._initialized: bool = False
        self._resampler = create_stream_resampler()
        self._client = client
        self._listener_id: Optional[int] = None

    async def start(self, frame: StartFrame):
        """Start the transport, connecting to the session exactly once.

        Args:
            frame: The StartFrame to initiate the transport.
        """
        await super().start(frame)

        if self._initialized:
            return

        self._initialized = True

        if self._params.audio_out_enabled:
            self._listener_id = await self._client.connect(VonageClientListener())

        await self.set_transport_ready(frame)

    async def write_audio_frame(self, frame: OutputAudioRawFrame):
        """Normalize and write an audio frame to the Vonage session.

        Args:
            frame: The OutputAudioRawFrame to send.
        """
        if self._listener_id is None or not self._params.audio_out_enabled:
            return

        check_audio_data(frame.audio, frame.num_frames, frame.num_channels)

        client_params: VonageClientParams = self._client.get_params()
        samples = np.frombuffer(frame.audio, dtype=np.int16)

        # TODO(Toni S): this normalization won't be necessary once VIDMP-1393 is done
        processed = await process_audio(
            self._resampler,
            samples,
            AudioProps(
                sample_rate=frame.sample_rate,
                is_stereo=frame.num_channels == 2,
            ),
            AudioProps(
                sample_rate=client_params.audio_out_sample_rate,
                is_stereo=client_params.audio_out_channels == 2,
            ),
        )

        await self._client.write_audio(processed.tobytes())

    async def _teardown(self):
        # Disconnect exactly once, even if both stop() and cancel() run.
        if self._listener_id is not None and self._params.audio_out_enabled:
            listener_id, self._listener_id = self._listener_id, None
            await self._client.disconnect(listener_id)

    async def stop(self, frame: EndFrame):
        """Stop the transport and release the session connection.

        Args:
            frame: The EndFrame to stop the transport.
        """
        await super().stop(frame)
        await self._teardown()

    async def cancel(self, frame: CancelFrame):
        """Cancel the transport and release the session connection.

        Args:
            frame: The CancelFrame to cancel the transport.
        """
        await super().cancel(frame)
        await self._teardown()
+
+
class VonageVideoWebrtcTransport(BaseTransport):
    """Vonage WebRTC transport implementation for Pipecat.

    Wires a shared VonageClient into Pipecat input/output transports and
    translates Vonage session/stream events into Pipecat event handlers.

    Supported features:

    - Audio input and output transport for Vonage Video sessions
    - Event handler registration for session and participant events
    - Publisher and subscriber management
    - Configurable audio and migration parameters
    """

    _params: VonageVideoWebrtcTransportParams

    def __init__(
        self,
        application_id: str,
        session_id: str,
        token: str,
        params: VonageVideoWebrtcTransportParams,
    ):
        """Initialize the Vonage WebRTC transport.

        Args:
            application_id: The Vonage Video application ID.
            session_id: The session ID to connect to.
            token: The authentication token for the session.
            params: Transport parameters for input/output configuration.
        """
        super().__init__()
        # Default the output rate to 48 kHz when unset.
        params.audio_out_sample_rate = params.audio_out_sample_rate or 48000
        self._params = params

        client_params = VonageClientParams(
            audio_in_sample_rate=params.audio_in_sample_rate,
            audio_in_channels=params.audio_in_channels,
            audio_out_sample_rate=params.audio_out_sample_rate,
            audio_out_channels=params.audio_out_channels,
            enable_migration=params.session_enable_migration,
        )
        # Publisher settings exist only when audio output is enabled.
        publisher_settings = None
        if params.audio_out_enabled:
            publisher_settings = PublisherSettings(
                name=params.publisher_name,
                audio_settings=PublisherAudioSettings(
                    enable_stereo_mode=params.audio_out_channels == 2,
                    enable_opus_dtx=params.publisher_enable_opus_dtx,
                ),
            )
        self._client = VonageClient(
            application_id, session_id, token, client_params, publisher_settings
        )

        # Register supported handlers.
        for event_name in (
            "on_joined",
            "on_left",
            "on_error",
            "on_client_connected",
            "on_client_disconnected",
            "on_first_participant_joined",
            "on_participant_joined",
            "on_participant_left",
        ):
            self._register_event_handler(event_name)

        self._client.add_listener(
            VonageClientListener(
                on_connected=self._on_connected,
                on_disconnected=self._on_disconnected,
                on_error=self._on_error,
                on_stream_received=self._on_stream_received,
                on_stream_dropped=self._on_stream_dropped,
                on_subscriber_connected=self._on_subscriber_connected,
                on_subscriber_disconnected=self._on_subscriber_disconnected,
            )
        )

        self._input: Optional[VonageVideoWebrtcInputTransport] = None
        self._output: Optional[VonageVideoWebrtcOutputTransport] = None
        self._one_stream_received: bool = False

    def input(self) -> FrameProcessor:
        """Return (lazily creating) the input transport for Vonage.

        Returns:
            The VonageVideoWebrtcInputTransport instance.
        """
        if self._input is None:
            self._input = VonageVideoWebrtcInputTransport(self._client, self._params)
        return self._input

    def output(self) -> FrameProcessor:
        """Return (lazily creating) the output transport for Vonage.

        Returns:
            The VonageVideoWebrtcOutputTransport instance.
        """
        if self._output is None:
            self._output = VonageVideoWebrtcOutputTransport(self._client, self._params)
        return self._output

    async def _on_connected(self, session: Session):
        """Forward session connect to the "on_joined" handler."""
        await self._call_event_handler("on_joined", {"sessionId": session.id})

    async def _on_disconnected(self, _session: Session):
        """Forward session disconnect to the "on_left" handler."""
        await self._call_event_handler("on_left")

    async def _on_error(self, _session: Session, description: str, _code: int):
        """Forward session errors to the "on_error" handler."""
        await self._call_event_handler("on_error", description)

    async def _on_stream_received(self, session: Session, stream: Stream):
        """Report a participant joining; the first one also fires its own event."""
        if not self._one_stream_received:
            self._one_stream_received = True
            await self._call_event_handler(
                "on_first_participant_joined", {"sessionId": session.id, "streamId": stream.id}
            )

        await self._call_event_handler(
            "on_participant_joined", {"sessionId": session.id, "streamId": stream.id}
        )

    async def _on_stream_dropped(self, session: Session, stream: Stream):
        """Report a participant leaving."""
        await self._call_event_handler(
            "on_participant_left", {"sessionId": session.id, "streamId": stream.id}
        )

    async def _on_subscriber_connected(self, subscriber: Subscriber):
        """Report a subscriber connecting."""
        await self._call_event_handler(
            "on_client_connected", {"subscriberId": subscriber.stream.id}
        )

    async def _on_subscriber_disconnected(self, subscriber: Subscriber):
        """Report a subscriber disconnecting."""
        await self._call_event_handler(
            "on_client_disconnected", {"subscriberId": subscriber.stream.id}
        )
+
+
+def check_audio_data(buffer: bytes | memoryview, number_of_frames: int, number_of_channels):
+ """Check the audio sample width based on buffer size, number of frames and channels."""
+ if number_of_channels not in (1, 2):
+ raise ValueError(f"We only accept mono or stereo audio, got {number_of_channels}")
+
+ if isinstance(buffer, memoryview):
+ bytes_per_sample = buffer.itemsize
+ else:
+ bytes_per_sample = len(buffer) // (number_of_frames * number_of_channels)
+
+ if bytes_per_sample != 2:
+ raise ValueError(f"We only accept 16 bit PCM audio, got {bytes_per_sample * 8} bit")
+
+
@dataclass
class AudioProps:
    """Audio properties used when normalizing buffers.

    Parameters:
        sample_rate: The sample rate of the audio in Hz.
        is_stereo: True for interleaved stereo audio, False for mono.
    """

    sample_rate: int
    is_stereo: bool


def process_audio_channels(
    audio: np.ndarray, current: AudioProps, target: AudioProps
) -> np.ndarray:
    """Convert between mono and interleaved stereo; pass through when layouts match."""
    if current.is_stereo == target.is_stereo:
        return audio
    if target.is_stereo:
        # Mono -> stereo: duplicate every sample into interleaved L/R pairs.
        return np.repeat(audio, 2)
    # Stereo -> mono: average each interleaved L/R pair.
    return audio.reshape(-1, 2).mean(axis=1).astype(np.int16)
+
+
async def process_audio(
    resampler: BaseAudioResampler, audio: np.ndarray, current: AudioProps, target: AudioProps
) -> np.ndarray:
    """Normalize audio to the target sample rate and channel layout.

    When the rates differ, the audio is first downmixed to mono, resampled,
    and only then converted to the target channel layout.

    Args:
        resampler: Stream resampler used for rate conversion.
        audio: Interleaved int16 samples.
        current: Properties describing `audio`.
        target: Desired output properties.

    Returns:
        The normalized int16 samples as a numpy array.
    """
    result = audio
    if current.sample_rate != target.sample_rate:
        # Resample in mono; the final channel conversion happens below.
        result = process_audio_channels(result, current, replace(current, is_stereo=False))
        current = replace(current, is_stereo=False)

        raw = await resampler.resample(
            result.tobytes(), current.sample_rate, target.sample_rate
        )
        result = np.frombuffer(raw, dtype=np.int16)

    return process_audio_channels(result, current, target)
diff --git a/tests/test_vonage_video_webrtc.py b/tests/test_vonage_video_webrtc.py
new file mode 100644
index 0000000000..41dd6924ba
--- /dev/null
+++ b/tests/test_vonage_video_webrtc.py
@@ -0,0 +1,847 @@
+# SPDX-License-Identifier: BSD 2-Clause License
+
+import asyncio
+import sys
+import unittest
+from unittest.mock import AsyncMock, MagicMock, Mock, call, patch
+
+import numpy as np
+
+from pipecat.frames.frames import (
+ CancelFrame,
+ EndFrame,
+ InputAudioRawFrame,
+ OutputAudioRawFrame,
+ StartFrame,
+)
+
# Build a stand-in for the vonage_video package since the real SDK is not
# available in the test environment.
vonage_video_mock = MagicMock()
vonage_video_mock.models = MagicMock()
vonage_video_mock.VonageVideoClient = MagicMock()
+
+
+# Create mock classes that match the expected interface
class MockAudioData:
    """Stand-in for the SDK AudioData carrying a raw PCM payload."""

    def __init__(self, sample_buffer, number_of_frames, number_of_channels, sample_rate):
        # Keep the exact attribute names the transport reads from AudioData.
        (
            self.sample_buffer,
            self.number_of_frames,
            self.number_of_channels,
            self.sample_rate,
        ) = (sample_buffer, number_of_frames, number_of_channels, sample_rate)
+
+
class MockSession:
    """Minimal session double exposing only an ``id`` attribute."""

    def __init__(self, id="test_session"):
        self.id = id
+
+
class MockStream:
    """Minimal stream double exposing only an ``id`` attribute."""

    def __init__(self, id="test_stream"):
        self.id = id
+
+
class MockPublisher:
    """Publisher double holding the stream it publishes."""

    def __init__(self, stream=None):
        # Any falsy value falls back to a fresh MockStream, as before.
        self.stream = MockStream() if not stream else stream
+
+
class MockSubscriber:
    """Subscriber double holding the stream it subscribes to."""

    def __init__(self, stream=None):
        # Any falsy value falls back to a fresh MockStream, as before.
        self.stream = MockStream() if not stream else stream
+
+
# Wire the mock classes into the fake module structure the transport imports.
_MOCK_MODELS = {
    "AudioData": MockAudioData,
    "Session": MockSession,
    "Stream": MockStream,
    "Publisher": MockPublisher,
    "Subscriber": MockSubscriber,
    "LoggingSettings": MagicMock,
    "PublisherSettings": MagicMock,
    "PublisherAudioSettings": MagicMock,
    "SessionSettings": MagicMock,
    "SessionAudioSettings": MagicMock,
}
for _attr, _value in _MOCK_MODELS.items():
    setattr(vonage_video_mock.models, _attr, _value)

# Register the fake package in sys.modules so imports resolve to our mocks.
sys.modules["vonage_video_connector"] = vonage_video_mock
sys.modules["vonage_video_connector.models"] = vonage_video_mock.models
+
+
+# Now we can import the transport classes since the vonage_video module is mocked
+from pipecat.transports.vonage.video_webrtc import (
+ AudioProps,
+ VonageClient,
+ VonageClientListener,
+ VonageClientParams,
+ VonageVideoWebrtcInputTransport,
+ VonageVideoWebrtcOutputTransport,
+ VonageVideoWebrtcTransport,
+ VonageVideoWebrtcTransportParams,
+ check_audio_data,
+ process_audio,
+ process_audio_channels,
+)
+
+
class TestVonageVideoWebrtcTransport(unittest.IsolatedAsyncioTestCase):
    """Test cases for Vonage Video WebRTC transport classes."""

    def setUp(self):
        """Set up test fixtures."""
        self.VonageClient = VonageClient
        self.VonageClientListener = VonageClientListener
        self.VonageClientParams = VonageClientParams
        self.VonageVideoWebrtcInputTransport = VonageVideoWebrtcInputTransport
        self.VonageVideoWebrtcOutputTransport = VonageVideoWebrtcOutputTransport
        self.VonageVideoWebrtcTransport = VonageVideoWebrtcTransport
        self.VonageVideoWebrtcTransportParams = VonageVideoWebrtcTransportParams

        # Mock client instance
        self.mock_client_instance = Mock()
        vonage_video_mock.VonageVideoClient.return_value = self.mock_client_instance

        # Common test data
        self.application_id = "test-app-id"
        self.session_id = "test-session-id"
        self.token = "test-token"

    def tearDown(self):
        """Clean up after tests."""
        pass

    def test_vonage_client_params_defaults(self):
        """Test VonageClientParams default values."""
        params = self.VonageClientParams()
        self.assertEqual(params.audio_in_sample_rate, 48000)
        self.assertEqual(params.audio_in_channels, 2)
        self.assertFalse(params.enable_migration)

    def test_vonage_client_params_custom_values(self):
        """Test VonageClientParams with custom values."""
        params = self.VonageClientParams(
            audio_in_sample_rate=16000,
            audio_in_channels=1,
            audio_out_sample_rate=22050,
            audio_out_channels=1,
            enable_migration=True,
        )
        self.assertEqual(params.audio_in_sample_rate, 16000)
        self.assertEqual(params.audio_in_channels, 1)
        self.assertEqual(params.audio_out_sample_rate, 22050)
        self.assertEqual(params.audio_out_channels, 1)
        self.assertTrue(params.enable_migration)

    def test_vonage_client_listener_defaults(self):
        """Test VonageClientListener default values."""
        listener = self.VonageClientListener()
        self.assertIsNotNone(listener.on_connected)
        self.assertIsNotNone(listener.on_disconnected)
        self.assertIsNotNone(listener.on_error)
        self.assertIsNotNone(listener.on_audio_in)
        self.assertIsNotNone(listener.on_stream_received)
        self.assertIsNotNone(listener.on_stream_dropped)
        self.assertIsNotNone(listener.on_subscriber_connected)
        self.assertIsNotNone(listener.on_subscriber_disconnected)

    def test_vonage_transport_params_defaults(self):
        """Test VonageVideoWebrtcTransportParams default values."""
        params = self.VonageVideoWebrtcTransportParams()
        self.assertEqual(params.publisher_name, "")
        self.assertFalse(params.publisher_enable_opus_dtx)
        self.assertFalse(params.session_enable_migration)

    def test_vonage_client_initialization(self):
        """Test VonageClient initialization."""
        # Reset the mock for this specific test
        vonage_video_mock.VonageVideoClient.reset_mock()

        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        self.assertEqual(client._application_id, self.application_id)
        self.assertEqual(client._session_id, self.session_id)
        self.assertEqual(client._token, self.token)
        self.assertEqual(client._params, params)
        self.assertFalse(client._connected)
        self.assertEqual(client._connection_counter, 0)
        vonage_video_mock.VonageVideoClient.assert_called_once()

    def test_vonage_client_add_remove_listener(self):
        """Test adding and removing listeners from VonageClient."""
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        listener = self.VonageClientListener()
        listener_id = client.add_listener(listener)

        self.assertIsInstance(listener_id, int)
        self.assertIn(listener_id, client._listeners)
        self.assertEqual(client._listeners[listener_id], listener)

        client.remove_listener(listener_id)
        self.assertNotIn(listener_id, client._listeners)

    async def test_vonage_client_connect_first_time(self):
        """Test VonageClient connect method for first connection."""
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        # Mock the connect method to return True
        self.mock_client_instance.connect.return_value = True

        listener = self.VonageClientListener()
        listener_id = await client.connect(listener)

        self.assertIsInstance(listener_id, int)
        self.mock_client_instance.connect.assert_called_once()

        # Verify connect was called with correct parameters
        call_args = self.mock_client_instance.connect.call_args
        self.assertEqual(call_args[1]["application_id"], self.application_id)
        self.assertEqual(call_args[1]["session_id"], self.session_id)
        self.assertEqual(call_args[1]["token"], self.token)

    async def test_vonage_client_connect_already_connected(self):
        """Test VonageClient connect when already connected."""
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        # Mock the connect method to return True
        self.mock_client_instance.connect.return_value = True

        # First connection
        listener1 = self.VonageClientListener()
        listener1.on_connected = AsyncMock()
        await client.connect(listener1)

        listener1.on_connected.assert_called_once()

        # Set connected state manually since we're mocking
        client._connected = True
        client._connection_counter = 1

        # Second connection
        listener2 = self.VonageClientListener()
        listener2.on_connected = AsyncMock()
        listener_id2 = await client.connect(listener2)

        self.assertIsInstance(listener_id2, int)
        self.assertEqual(client._connection_counter, 2)
        listener2.on_connected.assert_called_once()

        # The first listener must NOT be re-notified on the second connect.
        listener1.on_connected.assert_called_once()

    async def test_vonage_client_connect_failure(self):
        """Test VonageClient connect method when connection fails."""
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        # Mock the connect method to return False
        self.mock_client_instance.connect.return_value = False

        listener = self.VonageClientListener()

        with self.assertRaises(Exception) as context:
            await client.connect(listener)

        self.assertIn("Could not connect to session", str(context.exception))

    async def test_vonage_client_disconnect(self):
        """Test VonageClient disconnect method."""
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        # Mock connected state
        client._connected = True
        client._connection_counter = 1

        listener = self.VonageClientListener()
        listener.on_disconnected = AsyncMock()
        listener_id = client.add_listener(listener)

        await client.disconnect(listener_id)

        self.mock_client_instance.disconnect.assert_called_once()
        listener.on_disconnected.assert_called_once()

    async def test_vonage_client_write_audio(self):
        """Test VonageClient write_audio method."""
        params = self.VonageClientParams(audio_out_channels=2, audio_out_sample_rate=48000)
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        # Create mock audio data
        audio_data = b"\x00\x01\x02\x03\x04\x05\x06\x07"  # 2 frames of 2-channel 16-bit audio

        await client.write_audio(audio_data)

        self.mock_client_instance.inject_audio.assert_called_once()
        call_args = self.mock_client_instance.inject_audio.call_args[0][0]
        self.assertEqual(call_args.number_of_frames, 2)  # 8 bytes / (2 channels * 2 bytes)
        self.assertEqual(call_args.number_of_channels, 2)
        self.assertEqual(call_args.sample_rate, 48000)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_input_transport_initialization(self, mock_resampler):
        """Test VonageVideoWebrtcInputTransport initialization."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_in_enabled=True)
        transport = self.VonageVideoWebrtcInputTransport(client, transport_params)

        self.assertEqual(transport._client, client)
        self.assertFalse(transport._initialized)
        mock_resampler.assert_called_once()

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_input_transport_start(self, mock_resampler):
        """Test VonageVideoWebrtcInputTransport start method."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_in_enabled=True)
        transport = self.VonageVideoWebrtcInputTransport(client, transport_params)

        # Mock the client connect method
        client.connect = AsyncMock(return_value=1)
        transport.set_transport_ready = AsyncMock()

        start_frame = StartFrame()
        await transport.start(start_frame)

        self.assertTrue(transport._initialized)
        client.connect.assert_called_once()
        transport.set_transport_ready.assert_called_once_with(start_frame)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_input_transport_stop(self, mock_resampler):
        """Test VonageVideoWebrtcInputTransport stop method."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_in_enabled=True)
        transport = self.VonageVideoWebrtcInputTransport(client, transport_params)
        transport._listener_id = 1

        # Mock the client disconnect method
        client.disconnect = AsyncMock()

        end_frame = EndFrame()
        await transport.stop(end_frame)

        client.disconnect.assert_called_once_with(1)
        self.assertIsNone(transport._listener_id)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_input_transport_cancel(self, mock_resampler):
        """Test VonageVideoWebrtcInputTransport cancel method."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_in_enabled=True)
        transport = self.VonageVideoWebrtcInputTransport(client, transport_params)
        transport._listener_id = 1

        # Mock the client disconnect method
        client.disconnect = AsyncMock()

        cancel_frame = CancelFrame()
        await transport.cancel(cancel_frame)

        client.disconnect.assert_called_once_with(1)
        self.assertIsNone(transport._listener_id)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_output_transport_initialization(self, mock_resampler):
        """Test VonageVideoWebrtcOutputTransport initialization."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_out_enabled=True)
        transport = self.VonageVideoWebrtcOutputTransport(client, transport_params)

        self.assertEqual(transport._client, client)
        self.assertFalse(transport._initialized)
        mock_resampler.assert_called_once()

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_output_transport_start(self, mock_resampler):
        """Test VonageVideoWebrtcOutputTransport start method."""
        mock_resampler.return_value = Mock()
        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_out_enabled=True)
        transport = self.VonageVideoWebrtcOutputTransport(client, transport_params)

        # Mock the client connect method
        client.connect = AsyncMock(return_value=1)
        transport.set_transport_ready = AsyncMock()

        start_frame = StartFrame()
        await transport.start(start_frame)

        self.assertTrue(transport._initialized)
        client.connect.assert_called_once()
        transport.set_transport_ready.assert_called_once_with(start_frame)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_output_transport_write_audio_frame(self, mock_resampler):
        """Test VonageVideoWebrtcOutputTransport write_audio_frame method."""
        mock_resampler_instance = Mock()
        mock_resampler_instance.resample = AsyncMock(return_value=b"\x00\x01\x02\x03")
        mock_resampler.return_value = mock_resampler_instance

        params = self.VonageClientParams(audio_out_sample_rate=48000, audio_out_channels=2)
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)
        client.write_audio = AsyncMock()
        client.get_params = Mock(return_value=params)

        transport_params = self.VonageVideoWebrtcTransportParams(audio_out_enabled=True)
        transport = self.VonageVideoWebrtcOutputTransport(client, transport_params)
        transport._listener_id = 1

        # Create a mock audio frame
        audio_frame = OutputAudioRawFrame(
            audio=b"\x00\x01\x02\x03", sample_rate=16000, num_channels=1
        )

        await transport.write_audio_frame(audio_frame)

        # Verify resampling was called
        mock_resampler_instance.resample.assert_called_once_with(audio_frame.audio, 16000, 48000)
        # Verify audio was written to client
        client.write_audio.assert_called_once()

    async def test_vonage_transport_initialization(self):
        """Test VonageVideoWebrtcTransport initialization."""
        params = self.VonageVideoWebrtcTransportParams(
            audio_out_sample_rate=48000,
            audio_out_channels=2,
            audio_out_enabled=True,
            session_enable_migration=True,
            publisher_name="test-publisher",
            publisher_enable_opus_dtx=True,
        )

        transport = self.VonageVideoWebrtcTransport(
            self.application_id, self.session_id, self.token, params
        )

        self.assertIsNotNone(transport._client)
        self.assertFalse(transport._one_stream_received)

        # Verify vonage client was initialized with correct parameters
        client_params = transport._client._params
        self.assertEqual(client_params.audio_out_sample_rate, 48000)
        self.assertEqual(client_params.audio_out_channels, 2)
        self.assertTrue(client_params.enable_migration)

    async def test_vonage_transport_input_output_methods(self):
        """Test VonageVideoWebrtcTransport input and output methods."""
        params = self.VonageVideoWebrtcTransportParams()
        transport = self.VonageVideoWebrtcTransport(
            self.application_id, self.session_id, self.token, params
        )

        # Test input method
        input_transport = transport.input()
        self.assertIsInstance(input_transport, self.VonageVideoWebrtcInputTransport)

        # Test output method
        output_transport = transport.output()
        self.assertIsInstance(output_transport, self.VonageVideoWebrtcOutputTransport)

        # Verify they return the same instances on subsequent calls
        self.assertIs(transport.input(), input_transport)
        self.assertIs(transport.output(), output_transport)

    @patch("pipecat.transports.vonage.video_webrtc.asyncio.run_coroutine_threadsafe")
    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_vonage_input_audio_callback(self, mock_resampler, mock_run_coroutine):
        """Test audio input callback processing."""
        resampled_audio = b"\x00\x01\x02\x03"
        resampled_bitrate = 26000
        mock_resampler_instance = Mock()
        mock_resampler_instance.resample = AsyncMock(return_value=resampled_audio)
        mock_resampler.return_value = mock_resampler_instance

        push_frame_coroutine = None

        # Mock the run_coroutine_threadsafe to capture the coroutine
        def mock_run_coro(coro, loop):
            nonlocal push_frame_coroutine
            push_frame_coroutine = coro
            # Return a mock task
            task = Mock()
            task.result.return_value = None
            return task

        mock_run_coroutine.side_effect = mock_run_coro

        params = self.VonageClientParams()
        client = self.VonageClient(self.application_id, self.session_id, self.token, params)

        transport_params = self.VonageVideoWebrtcTransportParams(
            audio_in_enabled=True,
            audio_in_sample_rate=resampled_bitrate,
        )
        transport = self.VonageVideoWebrtcInputTransport(client, transport_params)
        transport._listener_id = 1
        transport.push_audio_frame = AsyncMock()
        # get_running_loop() is the non-deprecated way to fetch the loop from
        # inside a coroutine (asyncio.get_event_loop() is deprecated here).
        transport.get_event_loop = Mock(return_value=asyncio.get_running_loop())

        # Mock the client connect method
        client.connect = AsyncMock(return_value=1)
        transport.set_transport_ready = AsyncMock()
        start_frame = StartFrame()
        await transport.start(start_frame)

        # Create mock audio data
        audio_buffer = np.array([100, 200, 300, 400], dtype=np.int16)
        mock_audio_data = Mock()
        mock_audio_data.sample_buffer = audio_buffer.tobytes()
        mock_audio_data.number_of_frames = 2
        mock_audio_data.number_of_channels = 2
        mock_audio_data.sample_rate = 48000

        # Create mock session
        mock_session = Mock()

        # Call the audio callback
        transport._audio_in_cb(mock_session, mock_audio_data)

        # Execute the captured coroutine and check it does what we expect
        self.assertIsNotNone(push_frame_coroutine)
        await push_frame_coroutine

        transport.push_audio_frame.assert_called_once()
        # Verify run_coroutine_threadsafe was called
        mock_run_coroutine.assert_called_once()
        arg = transport.push_audio_frame.call_args[0][0]
        self.assertIsInstance(arg, InputAudioRawFrame)
        self.assertEqual(arg.audio, resampled_audio)
        self.assertEqual(arg.sample_rate, resampled_bitrate)
        self.assertEqual(arg.num_channels, 1)

    async def test_vonage_transport_event_handlers(self):
        """Test VonageVideoWebrtcTransport event handlers."""
        params = self.VonageVideoWebrtcTransportParams()
        transport = self.VonageVideoWebrtcTransport(
            self.application_id, self.session_id, self.token, params
        )

        # Mock the event handler calling mechanism
        transport._call_event_handler = AsyncMock()

        # Test session events
        mock_session = Mock()
        mock_session.id = "session-123"

        await transport._on_connected(mock_session)
        transport._call_event_handler.assert_called_with("on_joined", {"sessionId": "session-123"})

        await transport._on_disconnected(mock_session)
        transport._call_event_handler.assert_called_with("on_left")

        await transport._on_error(mock_session, "test error", 500)
        transport._call_event_handler.assert_called_with("on_error", "test error")

        # Test stream events
        mock_stream = Mock()
        mock_stream.id = "stream-456"

        await transport._on_stream_received(mock_session, mock_stream)
        # Should call both first participant and participant joined events
        expected_calls = [
            call(
                "on_first_participant_joined",
                {"sessionId": "session-123", "streamId": "stream-456"},
            ),
            call("on_participant_joined", {"sessionId": "session-123", "streamId": "stream-456"}),
        ]
        transport._call_event_handler.assert_has_calls(expected_calls)

        await transport._on_stream_dropped(mock_session, mock_stream)
        transport._call_event_handler.assert_called_with(
            "on_participant_left", {"sessionId": "session-123", "streamId": "stream-456"}
        )

        # Test subscriber events
        mock_subscriber = Mock()
        mock_subscriber.stream.id = "subscriber-789"

        await transport._on_subscriber_connected(mock_subscriber)
        transport._call_event_handler.assert_called_with(
            "on_client_connected", {"subscriberId": "subscriber-789"}
        )

        await transport._on_subscriber_disconnected(mock_subscriber)
        transport._call_event_handler.assert_called_with(
            "on_client_disconnected", {"subscriberId": "subscriber-789"}
        )

    async def test_vonage_transport_first_participant_flag(self):
        """Test that first participant event is only called once."""
        params = self.VonageVideoWebrtcTransportParams()
        transport = self.VonageVideoWebrtcTransport(
            self.application_id, self.session_id, self.token, params
        )

        transport._call_event_handler = AsyncMock()

        mock_session = Mock()
        mock_session.id = "session-123"
        mock_stream1 = Mock()
        mock_stream1.id = "stream-456"
        mock_stream2 = Mock()
        mock_stream2.id = "stream-789"

        # First stream should trigger first participant event
        await transport._on_stream_received(mock_session, mock_stream1)
        self.assertTrue(transport._one_stream_received)

        # Reset mock to check second stream
        transport._call_event_handler.reset_mock()

        # Second stream should not trigger first participant event
        await transport._on_stream_received(mock_session, mock_stream2)
        transport._call_event_handler.assert_called_once_with(
            "on_participant_joined", {"sessionId": "session-123", "streamId": "stream-789"}
        )
+
+
class TestAudioNormalization(unittest.IsolatedAsyncioTestCase):
    """Test cases for audio normalization functions.

    All PCM byte literals below are little-endian signed 16-bit samples.
    """

    def setUp(self):
        """Set up test fixtures."""
        self.AudioProps = AudioProps
        self.process_audio_channels = process_audio_channels
        self.process_audio = process_audio
        self.check_audio_data = check_audio_data

    def test_audio_props_creation(self):
        """Test AudioProps dataclass creation."""
        props = self.AudioProps(sample_rate=48000, is_stereo=True)
        self.assertEqual(props.sample_rate, 48000)
        self.assertTrue(props.is_stereo)

        props_mono = self.AudioProps(sample_rate=16000, is_stereo=False)
        self.assertEqual(props_mono.sample_rate, 16000)
        self.assertFalse(props_mono.is_stereo)

    def test_process_audio_channels_mono_to_stereo(self):
        """Test converting mono audio to stereo."""
        # Create mono audio (4 samples)
        mono_audio = np.array([100, 200, 300, 400], dtype=np.int16)

        current = self.AudioProps(sample_rate=48000, is_stereo=False)
        target = self.AudioProps(sample_rate=48000, is_stereo=True)

        result = self.process_audio_channels(mono_audio, current, target)

        # Should duplicate each sample
        expected = np.array([100, 100, 200, 200, 300, 300, 400, 400], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

    def test_process_audio_channels_stereo_to_mono(self):
        """Test converting stereo audio to mono."""
        # Create stereo audio (2 frames, 4 samples total)
        stereo_audio = np.array([100, 200, 300, 400], dtype=np.int16)

        current = self.AudioProps(sample_rate=48000, is_stereo=True)
        target = self.AudioProps(sample_rate=48000, is_stereo=False)

        result = self.process_audio_channels(stereo_audio, current, target)

        # Should average each stereo pair: (100+200)/2=150, (300+400)/2=350
        expected = np.array([150, 350], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

    def test_process_audio_channels_same_format(self):
        """Test when source and target have the same channel format."""
        audio = np.array([100, 200, 300, 400], dtype=np.int16)

        # Test mono to mono
        current = self.AudioProps(sample_rate=48000, is_stereo=False)
        target = self.AudioProps(sample_rate=48000, is_stereo=False)
        result = self.process_audio_channels(audio, current, target)
        np.testing.assert_array_equal(result, audio)

        # Test stereo to stereo
        current = self.AudioProps(sample_rate=48000, is_stereo=True)
        target = self.AudioProps(sample_rate=48000, is_stereo=True)
        result = self.process_audio_channels(audio, current, target)
        np.testing.assert_array_equal(result, audio)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_process_audio_same_sample_rate(self, mock_resampler):
        """Test process_audio when sample rates are the same."""
        mock_resampler_instance = Mock()
        mock_resampler.return_value = mock_resampler_instance

        audio = np.array([100, 200, 300, 400], dtype=np.int16)
        current = self.AudioProps(sample_rate=48000, is_stereo=False)
        target = self.AudioProps(sample_rate=48000, is_stereo=True)

        result = await self.process_audio(mock_resampler_instance, audio, current, target)

        # Should only do channel conversion, no resampling
        expected = np.array([100, 100, 200, 200, 300, 300, 400, 400], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

        # Resampler should not be called
        mock_resampler_instance.resample.assert_not_called()

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_process_audio_different_sample_rate_mono(self, mock_resampler):
        """Test process_audio with different sample rates (mono)."""
        mock_resampler_instance = Mock()
        mock_resampler_instance.resample = AsyncMock(
            return_value=b"\x64\x00\xc8\x00"
        )  # 100, 200 in bytes
        mock_resampler.return_value = mock_resampler_instance

        audio = np.array([150, 250, 350, 450], dtype=np.int16)
        current = self.AudioProps(sample_rate=48000, is_stereo=False)
        target = self.AudioProps(sample_rate=16000, is_stereo=False)

        result = await self.process_audio(mock_resampler_instance, audio, current, target)

        # Should resample the audio
        expected = np.array([100, 200], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

        # Resampler should be called with correct parameters
        mock_resampler_instance.resample.assert_called_once_with(audio.tobytes(), 48000, 16000)

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_process_audio_different_sample_rate_stereo_to_mono(self, mock_resampler):
        """Test process_audio with different sample rates and channel conversion."""
        mock_resampler_instance = Mock()
        # Return resampled mono data
        mock_resampler_instance.resample = AsyncMock(
            return_value=b"\x64\x00\xc8\x00"
        )  # 100, 200 in bytes
        mock_resampler.return_value = mock_resampler_instance

        # Stereo audio: 2 frames with left/right channels
        audio = np.array([100, 200, 300, 400], dtype=np.int16)  # L1=100, R1=200, L2=300, R2=400
        current = self.AudioProps(sample_rate=48000, is_stereo=True)
        target = self.AudioProps(sample_rate=16000, is_stereo=False)

        result = await self.process_audio(mock_resampler_instance, audio, current, target)

        # Should convert to mono first, then resample
        expected = np.array([100, 200], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

        # Resampler should be called with mono audio
        expected_mono = np.array([150, 350], dtype=np.int16)  # (100+200)/2, (300+400)/2
        mock_resampler_instance.resample.assert_called_once_with(
            expected_mono.tobytes(), 48000, 16000
        )

    @patch("pipecat.transports.vonage.video_webrtc.create_stream_resampler")
    async def test_process_audio_different_sample_rate_mono_to_stereo(self, mock_resampler):
        """Test process_audio with different sample rates converting mono to stereo."""
        mock_resampler_instance = Mock()
        # Return resampled mono data
        mock_resampler_instance.resample = AsyncMock(
            return_value=b"\x64\x00\xc8\x00"
        )  # 100, 200 in bytes
        mock_resampler.return_value = mock_resampler_instance

        audio = np.array([150, 250], dtype=np.int16)
        current = self.AudioProps(sample_rate=48000, is_stereo=False)
        target = self.AudioProps(sample_rate=16000, is_stereo=True)

        result = await self.process_audio(mock_resampler_instance, audio, current, target)

        # Should resample first (mono), then convert to stereo
        expected = np.array([100, 100, 200, 200], dtype=np.int16)
        np.testing.assert_array_equal(result, expected)

        # Resampler should be called with mono audio
        mock_resampler_instance.resample.assert_called_once_with(audio.tobytes(), 48000, 16000)

    def test_check_audio_data_valid_mono_bytes(self):
        """Test check_audio_data with valid mono audio as bytes."""
        # 4 frames of mono 16-bit audio (8 bytes total)
        buffer = b"\x00\x01\x02\x03\x04\x05\x06\x07"

        # Should not raise any exception
        self.check_audio_data(buffer, 4, 1)

    def test_check_audio_data_valid_stereo_bytes(self):
        """Test check_audio_data with valid stereo audio as bytes."""
        # 2 frames of stereo 16-bit audio (8 bytes total)
        buffer = b"\x00\x01\x02\x03\x04\x05\x06\x07"

        # Should not raise any exception
        self.check_audio_data(buffer, 2, 2)

    def test_check_audio_data_valid_memoryview(self):
        """Test check_audio_data with valid audio as memoryview."""
        # Create int16 memoryview (2 bytes per sample)
        array = np.array([100, 200, 300, 400], dtype=np.int16)
        buffer = memoryview(array)

        # Should not raise any exception
        self.check_audio_data(buffer, 4, 1)  # 4 mono frames
        self.check_audio_data(buffer, 2, 2)  # 2 stereo frames

    def test_check_audio_data_invalid_channels(self):
        """Test check_audio_data with invalid number of channels."""
        buffer = b"\x00\x01\x02\x03"

        # Should raise ValueError for invalid channel counts
        with self.assertRaises(ValueError) as context:
            self.check_audio_data(buffer, 2, 3)  # 3 channels not supported
        self.assertIn("mono or stereo", str(context.exception))

        with self.assertRaises(ValueError) as context:
            self.check_audio_data(buffer, 2, 0)  # 0 channels not supported
        self.assertIn("mono or stereo", str(context.exception))

    def test_check_audio_data_invalid_bit_depth_bytes(self):
        """Test check_audio_data with invalid bit depth using bytes."""
        # 2 frames of mono audio with 1 byte per sample (8-bit)
        buffer = b"\x00\x01"

        with self.assertRaises(ValueError) as context:
            self.check_audio_data(buffer, 2, 1)
        self.assertIn("16 bit PCM", str(context.exception))
        self.assertIn("got 8 bit", str(context.exception))

    def test_check_audio_data_invalid_bit_depth_memoryview(self):
        """Test check_audio_data with invalid bit depth using memoryview."""
        # Create uint8 memoryview (1 byte per sample)
        array = np.array([100, 200], dtype=np.uint8)
        buffer = memoryview(array)

        with self.assertRaises(ValueError) as context:
            self.check_audio_data(buffer, 2, 1)
        self.assertIn("16 bit PCM", str(context.exception))
        self.assertIn("got 8 bit", str(context.exception))

    def test_check_audio_data_buffer_size_mismatch(self):
        """Test check_audio_data with buffer size that doesn't match expected size."""
        # 3 bytes total, but expecting 2 frames of mono 16-bit (should be 4 bytes)
        buffer = b"\x00\x01\x02"

        with self.assertRaises(ValueError) as context:
            self.check_audio_data(buffer, 2, 1)
        # Should detect that 3 bytes / (2 frames * 1 channel) = 1.5 bytes per sample
        # which gets truncated to 1 byte per sample = 8 bit
        self.assertIn("16 bit PCM", str(context.exception))
+
+
+if __name__ == "__main__":
+ unittest.main()