From 9daa5ee3b5529961ea5448c829cc55ba2214a6f7 Mon Sep 17 00:00:00 2001 From: Shubhra Date: Mon, 5 May 2025 14:21:07 -0600 Subject: [PATCH 01/13] Run tests on dev branch (#382) --- .github/workflows/build.yml | 4 ++-- .github/workflows/test.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f30cda556..8b19f7f7c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,9 +5,9 @@ name: Build on: push: - branches: [next, main] + branches: [next, main, dev-1.0] pull_request: - branches: [next, main] + branches: [next, main, dev-1.0] jobs: reuse: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index df780e4f0..961f77275 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,9 +5,9 @@ name: Test on: push: - branches: [next, main] + branches: [next, main, dev-1.0] pull_request: - branches: [next, main] + branches: [next, main, dev-1.0] jobs: build: From 0cdd55fa14d13030281d5078dc209f6616f432e6 Mon Sep 17 00:00:00 2001 From: Shubhra Date: Mon, 5 May 2025 14:21:07 -0600 Subject: [PATCH 02/13] Run tests on dev branch (#382) --- .github/workflows/build.yml | 4 ++-- .github/workflows/test.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f30cda556..8b19f7f7c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -5,9 +5,9 @@ name: Build on: push: - branches: [next, main] + branches: [next, main, dev-1.0] pull_request: - branches: [next, main] + branches: [next, main, dev-1.0] jobs: reuse: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index df780e4f0..961f77275 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,9 +5,9 @@ name: Test on: push: - branches: [next, main] + branches: [next, main, dev-1.0] pull_request: - branches: [next, main] + branches: [next, main, dev-1.0] jobs: build: From 117ce2edaf74b43de74688482f073269fc8555cd Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Wed, 7 May 2025 13:44:29 -0700 Subject: [PATCH 03/13] update package version --- plugins/cartesia/package.json | 4 ++-- plugins/deepgram/package.json | 4 ++-- plugins/elevenlabs/package.json | 4 ++-- plugins/neuphonic/package.json | 4 ++-- plugins/openai/package.json | 4 ++-- plugins/resemble/package.json | 4 ++-- plugins/silero/package.json | 4 ++-- plugins/test/package.json | 4 ++-- pnpm-lock.yaml | 32 ++++++++++++++++---------------- 9 files changed, 32 insertions(+), 32 deletions(-) diff --git a/plugins/cartesia/package.json b/plugins/cartesia/package.json index 67adcf2b9..38f3ef063 100644 --- a/plugins/cartesia/package.json +++ b/plugins/cartesia/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^x", "@livekit/agents-plugin-openai": "workspace:^x", "@livekit/agents-plugins-test": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/deepgram/package.json b/plugins/deepgram/package.json index 0ef1e09af..0dc9d0aa2 100644 --- a/plugins/deepgram/package.json +++ b/plugins/deepgram/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^x", "@livekit/agents-plugin-silero": "workspace:^x", "@livekit/agents-plugins-test": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/elevenlabs/package.json b/plugins/elevenlabs/package.json index 758b4d750..7520d5b0c 100644 --- a/plugins/elevenlabs/package.json +++ b/plugins/elevenlabs/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^x", "@livekit/agents-plugin-openai": "workspace:^x", "@livekit/agents-plugins-test": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/neuphonic/package.json b/plugins/neuphonic/package.json index 1277156d6..aafe1aa3f 100644 --- a/plugins/neuphonic/package.json +++ b/plugins/neuphonic/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^x", "@livekit/agents-plugin-openai": "workspace:^x", "@livekit/agents-plugins-test": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/openai/package.json b/plugins/openai/package.json index 16cc6a89c..0bc110eff 100644 --- a/plugins/openai/package.json +++ b/plugins/openai/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^x", "@livekit/agents-plugin-silero": "workspace:^x", "@livekit/agents-plugins-test": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -46,6 +46,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/resemble/package.json b/plugins/resemble/package.json index 3bd876b5d..75aa1d28c 100644 --- a/plugins/resemble/package.json +++ b/plugins/resemble/package.json @@ -33,7 +33,7 @@ "@livekit/agents": "workspace:^", "@livekit/agents-plugin-openai": "workspace:^", "@livekit/agents-plugins-test": "workspace:^", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "tsup": "^8.3.5", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/silero/package.json b/plugins/silero/package.json index ad93456bf..f960af4ff 100644 --- a/plugins/silero/package.json +++ b/plugins/silero/package.json @@ -31,7 +31,7 @@ }, "devDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@microsoft/api-extractor": "^7.35.0", "@types/ws": "^8.5.10", "onnxruntime-common": "^1.19.2", @@ -44,6 +44,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/plugins/test/package.json b/plugins/test/package.json index 5437414e9..eae6cf6dd 100644 --- a/plugins/test/package.json +++ b/plugins/test/package.json @@ -28,7 +28,7 @@ }, "devDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11", + "@livekit/rtc-node": "^0.13.12", "@types/node": "^22.5.5", "tsup": "^8.3.5", "typescript": "^5.0.0" @@ -40,6 +40,6 @@ }, "peerDependencies": { "@livekit/agents": "workspace:^x", - "@livekit/rtc-node": "^0.13.11" + "@livekit/rtc-node": "^0.13.12" } } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 24dfbb43f..5128ebd11 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -180,8 +180,8 @@ importers: specifier: workspace:^x version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -211,8 +211,8 @@ importers: specifier: workspace:^x version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -242,8 +242,8 @@ importers: specifier: workspace:^x version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -298,8 +298,8 @@ importers: specifier: workspace:^x version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -335,8 +335,8 @@ importers: specifier: workspace:^x version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -366,8 +366,8 @@ importers: specifier: workspace:^ version: link:../test '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -394,8 +394,8 @@ importers: specifier: workspace:^x version: link:../../agents '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -428,8 +428,8 @@ importers: specifier: workspace:^x version: link:../../agents '@livekit/rtc-node': - specifier: ^0.13.11 - version: 0.13.11 + specifier: ^0.13.12 + version: 0.13.12 '@types/node': specifier: ^22.5.5 version: 22.5.5 From 3672b81d490c27d4e23618744dd371eccb54862a Mon Sep 17 00:00:00 2001 From: Shubhra Date: Thu, 15 May 2025 12:58:51 -0600 Subject: [PATCH 04/13] Use WHATWG streams to forward AudioFrames from Livekit room into VAD (#383) Co-authored-by: lukasIO --- agents/src/deferred_stream.ts | 64 +++++++++++++ agents/src/index.ts | 3 +- agents/src/utils.ts | 12 +-- agents/src/vad.ts | 28 ++++++ agents/src/voice/agent.ts | 93 +++++++++++++++++++ agents/src/voice/agent_activity.ts | 65 +++++++++++++ agents/src/voice/agent_session.ts | 64 +++++++++++++ agents/src/voice/audio_recognition.ts | 67 +++++++++++++ agents/src/voice/index.ts | 5 + agents/src/voice/io.ts | 10 ++ agents/src/voice/room_io.ts | 45 +++++++++ examples/src/basic_agent.ts | 32 +++++++ pnpm-lock.yaml | 129 +++++++------------------- 13 files changed, 514 insertions(+), 103 deletions(-) create mode 100644 agents/src/deferred_stream.ts create mode 100644 agents/src/voice/agent.ts create mode 100644 agents/src/voice/agent_activity.ts create mode 100644 agents/src/voice/agent_session.ts create mode 100644 agents/src/voice/audio_recognition.ts create mode 100644 agents/src/voice/index.ts create mode 100644 agents/src/voice/io.ts create mode 100644 agents/src/voice/room_io.ts create mode 100644 examples/src/basic_agent.ts diff --git a/agents/src/deferred_stream.ts b/agents/src/deferred_stream.ts new file mode 100644 index 000000000..73d28bebf --- /dev/null +++ b/agents/src/deferred_stream.ts @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { ReadableStream } from 'node:stream/web'; +import { Future } from './utils.js'; + +export class DeferredReadableStream { + private _sourceFuture: Future>; + + private _reader?: ReadableStreamDefaultReader; + + public readonly stream: ReadableStream; + + constructor() { + this._sourceFuture = new Future>(); + + this.stream = new ReadableStream({ + start: async (controller) => { + try { + const source = await this._sourceFuture.await; + + this._reader = source.getReader(); + + const pump = async () => { + try { + while (true) { + const { done, value } = await this._reader!.read(); + if (done) break; + controller.enqueue(value); + } + controller.close(); + } catch (err) { + controller.error(err); + } + }; + + pump(); + } catch (err) { + controller.error(err); + } + }, + cancel: async (reason) => { + await this.cancel(reason); + }, + }); + } + + /** + * Call once the actual source is ready. + */ + setSource(source: ReadableStream) { + if (this._sourceFuture.done) { + return; + } + this._sourceFuture.resolve(source); + } + + async cancel(reason?: Error) { + if (!this._sourceFuture.done) { + this._sourceFuture.reject(reason ?? new Error('Stream cancelled without reason')); + } + await this._reader?.cancel(reason); + } +} diff --git a/agents/src/index.ts b/agents/src/index.ts index 131ea0322..1bec48b4b 100644 --- a/agents/src/index.ts +++ b/agents/src/index.ts @@ -18,6 +18,7 @@ import * as pipeline from './pipeline/index.js'; import * as stt from './stt/index.js'; import * as tokenize from './tokenize/index.js'; import * as tts from './tts/index.js'; +import * as voice from './voice/index.js'; export * from './vad.js'; export * from './plugin.js'; @@ -31,4 +32,4 @@ export * from './audio.js'; export * from './transcription.js'; export * from './inference_runner.js'; -export { cli, stt, tts, llm, pipeline, multimodal, tokenize, metrics, ipc }; +export { cli, stt, tts, llm, pipeline, multimodal, tokenize, metrics, ipc, voice }; diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 5ec97e5f9..d81711379 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -117,14 +117,14 @@ export class Queue { } /** @internal */ -export class Future { - #await: Promise; - #resolvePromise!: () => void; +export class Future { + #await: Promise; + #resolvePromise!: (value: T) => void; #rejectPromise!: (error: Error) => void; #done: boolean = false; constructor() { - this.#await = new Promise((resolve, reject) => { + this.#await = new Promise((resolve, reject) => { this.#resolvePromise = resolve; this.#rejectPromise = reject; }); @@ -138,9 +138,9 @@ export class Future { return this.#done; } - resolve() { + resolve(value: T) { this.#done = true; - this.#resolvePromise(); + this.#resolvePromise(value); } reject(error: Error) { diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 766bae8b5..5b517abe5 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -4,6 +4,9 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; +import { DeferredReadableStream } from './deferred_stream.js'; +import { log } from './log.js'; import type { VADMetrics } from './metrics/base.js'; import { AsyncIterableQueue } from './utils.js'; @@ -83,10 +86,31 @@ export abstract class VADStream implements AsyncIterableIterator { protected closed = false; #vad: VAD; #lastActivityTime = BigInt(0); + private logger = log(); + private deferredInputStream: DeferredReadableStream; constructor(vad: VAD) { this.#vad = vad; + this.deferredInputStream = new DeferredReadableStream(); this.monitorMetrics(); + this.mainTask(); + } + + protected async mainTask() { + // This is just a placeholder since VAD isn't implemented with the streams API yet. + try { + const inputStream = this.deferredInputStream.stream; + const reader = inputStream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + this.pushFrame(value); + } + } catch (error) { + this.logger.error('Error in VADStream mainTask:', error); + } } protected async monitorMetrics() { @@ -122,6 +146,10 @@ export abstract class VADStream implements AsyncIterableIterator { this.output.close(); } + updateInputStream(audioStream: ReadableStream) { + this.deferredInputStream.setSource(audioStream); + } + pushFrame(frame: AudioFrame) { if (this.input.closed) { throw new Error('Input is closed'); diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts new file mode 100644 index 000000000..c2bdf3512 --- /dev/null +++ b/agents/src/voice/agent.ts @@ -0,0 +1,93 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/* eslint-disable @typescript-eslint/no-unused-vars */ + +/* eslint-disable @typescript-eslint/no-explicit-any */ +import type { AudioFrame } from '@livekit/rtc-node'; +import type { ChatChunk, ChatMessage, LLM } from '../llm/index.js'; +import { ChatContext } from '../llm/index.js'; +import type { STT, SpeechEvent } from '../stt/index.js'; +import type { TTS } from '../tts/index.js'; +import type { VAD } from '../vad.js'; + +export class Agent { + private instructions: string; + private chatCtx: ChatContext; + private tools: any; // TODO(shubhra): add type + private turnDetection: any; // TODO(shubhra): add type + private stt: STT | undefined; + private vad: VAD | undefined; + private llm: LLM | any; + private tts: TTS | undefined; + private agentActivity: any; // TODO(shubhra): add type + + constructor( + instructions: string, + chatCtx?: ChatContext, + tools?: any, // TODO(shubhra): add type + turnDetection?: any, // TODO(shubhra): add type + stt?: STT, + vad?: VAD, + llm?: LLM | any, + tts?: TTS, + allowInterruptions?: boolean, + ) { + this.instructions = instructions; + this.chatCtx = chatCtx || new ChatContext(); + this.tools = tools; + this.turnDetection = turnDetection; + this.stt = stt; + this.vad = vad; + this.llm = llm; + this.tts = tts; + this.agentActivity = undefined; // TODO(shubhra): add type + } + + async onEnter(): Promise {} + + async onExit(): Promise {} + + async transcriptionNode( + text: ReadableStream, + modelSettings: any, // TODO(shubhra): add type + ): Promise | null> { + return null; + } + + async onUserTurnCompleted(chatCtx: ChatContext, newMessage: ChatMessage): Promise {} + + async sttNode( + audio: ReadableStream, + modelSettings: any, // TODO(shubhra): add type + ): Promise | null> { + return null; + } + + async llmNode( + chatCtx: ChatContext, + tools: Array, // TODO(shubhra): add type + modelSettings: any, // TODO(shubhra): add type + ): Promise | null> { + return null; + } + + async ttsNode( + text: ReadableStream, + modelSettings: any, // TODO(shubhra): add type + ): Promise | null> { + return null; + } + + // realtime_audio_output_node + + static default = { + async sttNode( + audio: ReadableStream, + modelSettings: any, // TODO(shubhra): add type + ): Promise | null> { + return null; + }, + }; +} diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts new file mode 100644 index 000000000..faa8ccf58 --- /dev/null +++ b/agents/src/voice/agent_activity.ts @@ -0,0 +1,65 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; +import { log } from '../log.js'; +import type { SpeechEvent } from '../stt/stt.js'; +import type { VADEvent } from '../vad.js'; +import type { Agent } from './agent.js'; +import type { AgentSession } from './agent_session.js'; +import { + AudioRecognition, + type EndOfTurnInfo, + type RecognitionHooks, +} from './audio_recognition.js'; + +export class AgentActivity implements RecognitionHooks { + private started = false; + private audioRecognition?: AudioRecognition; + private logger = log(); + agent: Agent; + agentSession: AgentSession; + + constructor(agent: Agent, agentSession: AgentSession) { + this.agent = agent; + this.agentSession = agentSession; + } + + async start(): Promise { + if (this.started) { + return; + } + this.audioRecognition = new AudioRecognition(this, this.agentSession.vad); + this.audioRecognition.start(); + this.started = true; + } + + updateAudioInput(audioStream: ReadableStream): void { + this.audioRecognition?.setInputAudioStream(audioStream); + } + + onStartOfSpeech(ev: VADEvent): void { + this.logger.info('Start of speech', ev); + } + + onEndOfSpeech(ev: VADEvent): void { + this.logger.info('End of speech', ev); + } + + onVADInferenceDone(ev: VADEvent): void { + this.logger.info('VAD inference done', ev); + } + + onInterimTranscript(ev: SpeechEvent): void { + this.logger.info('Interim transcript', ev); + } + + onFinalTranscript(ev: SpeechEvent): void { + this.logger.info('Final transcript', ev); + } + + onEndOfTurn(ev: EndOfTurnInfo): void { + this.logger.info('End of turn', ev); + } +} diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts new file mode 100644 index 000000000..0faef10bd --- /dev/null +++ b/agents/src/voice/agent_session.ts @@ -0,0 +1,64 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame, Room } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; +import { log } from '../log.js'; +import type { VAD } from '../vad.js'; +import type { Agent } from './agent.js'; +import { AgentActivity } from './agent_activity.js'; +import { RoomIO } from './room_io.js'; + +export class AgentSession { + vad: VAD; + + private agent?: Agent; + private activity?: AgentActivity; + private nextActivity?: AgentActivity; + private started = false; + + private roomIO?: RoomIO; + private logger = log(); + + /** @internal */ + audioInput?: ReadableStream; + + constructor(vad: VAD) { + this.vad = vad; + } + + async start(agent: Agent, room: Room): Promise { + if (this.started) { + return; + } + + this.agent = agent; + + if (this.agent) { + await this.updateActivity(this.agent); + } + + this.roomIO = new RoomIO(this, room); + this.roomIO.start(); + + if (this.audioInput) { + this.activity?.updateAudioInput(this.audioInput); + } + + this.logger.debug('AgentSession started'); + this.started = true; + } + + private async updateActivity(agent: Agent): Promise { + this.nextActivity = new AgentActivity(agent, this); + + // TODO(shubhra): Drain and close the old activity + + this.activity = this.nextActivity; + this.nextActivity = undefined; + + if (this.activity) { + await this.activity.start(); + } + } +} diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts new file mode 100644 index 000000000..e668b9e12 --- /dev/null +++ b/agents/src/voice/audio_recognition.ts @@ -0,0 +1,67 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; +import { DeferredReadableStream } from '../deferred_stream.js'; +import { log } from '../log.js'; +import type { SpeechEvent } from '../stt/stt.js'; +import { type VAD, type VADEvent, VADEventType } from '../vad.js'; + +export interface EndOfTurnInfo { + newTranscript: string; + transcriptionDelay: number; + endOfUtteranceDelay: number; +} + +export interface RecognitionHooks { + onStartOfSpeech: (ev: VADEvent) => void; + onEndOfSpeech: (ev: VADEvent) => void; + onVADInferenceDone: (ev: VADEvent) => void; + onInterimTranscript: (ev: SpeechEvent) => void; + onFinalTranscript: (ev: SpeechEvent) => void; + onEndOfTurn: (info: EndOfTurnInfo) => void; +} + +export class AudioRecognition { + private deferredInputStream: DeferredReadableStream; + private vadStreamProcessor?: Promise; + private logger = log(); + + constructor( + private hooks: RecognitionHooks, + private vad: VAD, + ) { + this.deferredInputStream = new DeferredReadableStream(); + } + + start() { + this.vadStreamProcessor = this.vadTask().catch((err) => { + this.logger.error('Error in VAD task', err); + }); + } + + private async vadTask() { + const inputStream = this.deferredInputStream.stream; + const vadStream = this.vad.stream(); + vadStream.updateInputStream(inputStream); + + for await (const ev of vadStream) { + switch (ev.type) { + case VADEventType.START_OF_SPEECH: + this.hooks.onStartOfSpeech(ev); + break; + case VADEventType.END_OF_SPEECH: + this.hooks.onEndOfSpeech(ev); + break; + case VADEventType.INFERENCE_DONE: + this.hooks.onVADInferenceDone(ev); + break; + } + } + } + + setInputAudioStream(audioStream: ReadableStream) { + this.deferredInputStream.setSource(audioStream); + } +} diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts new file mode 100644 index 000000000..7e20ed188 --- /dev/null +++ b/agents/src/voice/index.ts @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +export { Agent } from './agent.js'; +export { AgentSession } from './agent_session.js'; diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts new file mode 100644 index 000000000..ca2b8c47e --- /dev/null +++ b/agents/src/voice/io.ts @@ -0,0 +1,10 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame } from '@livekit/rtc-node'; +import type { SpeechEvent } from '../stt/stt.js'; + +export type STTNode = ( + audio: ReadableStream, + modelSettings: any, // TODO(shubhra): add type +) => Promise | null>; diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts new file mode 100644 index 000000000..ba5f8eb10 --- /dev/null +++ b/agents/src/voice/room_io.ts @@ -0,0 +1,45 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import type { AudioFrame, Room } from '@livekit/rtc-node'; +import { AudioStream, type RemoteTrack, RoomEvent, TrackKind } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; +import { DeferredReadableStream } from '../deferred_stream.js'; +import { log } from '../log.js'; +import type { AgentSession } from './agent_session.js'; + +export class RoomIO { + private agentSession: AgentSession; + private participantAudioInputStream: ReadableStream; + private logger = log(); + + private room: Room; + + private _deferredAudioInputStream = new DeferredReadableStream(); + + constructor(agentSession: AgentSession, room: Room) { + this.agentSession = agentSession; + this.room = room; + this.participantAudioInputStream = this._deferredAudioInputStream.stream; + + this.setupEventListeners(); + } + + private setupEventListeners() { + this.room.on(RoomEvent.TrackSubscribed, this.onTrackSubscribed); + } + + private cleanup() { + this.room.off(RoomEvent.TrackSubscribed, this.onTrackSubscribed); + } + + private onTrackSubscribed = (track: RemoteTrack) => { + if (track.kind === TrackKind.KIND_AUDIO) { + this._deferredAudioInputStream.setSource(new AudioStream(track)); + } + }; + + start() { + this.agentSession.audioInput = this.participantAudioInputStream; + } +} diff --git a/examples/src/basic_agent.ts b/examples/src/basic_agent.ts new file mode 100644 index 000000000..410eaef29 --- /dev/null +++ b/examples/src/basic_agent.ts @@ -0,0 +1,32 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { + type JobContext, + type JobProcess, + WorkerOptions, + cli, + defineAgent, + voice, +} from '@livekit/agents'; +import * as silero from '@livekit/agents-plugin-silero'; +import { fileURLToPath } from 'node:url'; + +export default defineAgent({ + prewarm: async (proc: JobProcess) => { + proc.userData.vad = await silero.VAD.load(); + }, + entry: async (ctx: JobContext) => { + const agent = new voice.Agent('test'); + await ctx.connect(); + const participant = await ctx.waitForParticipant(); + console.log('participant joined: ', participant.identity); + + const vad = ctx.proc.userData.vad! as silero.VAD; + + const session = new voice.AgentSession(vad); + session.start(agent, ctx.room); + }, +}); + +cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) })); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5128ebd11..881fedc0a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -107,7 +107,7 @@ importers: devDependencies: '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -146,7 +146,7 @@ importers: version: link:../plugins/silero '@livekit/rtc-node': specifier: ^0.13.11 - version: 0.13.11 + version: 0.13.13 livekit-server-sdk: specifier: ^2.9.2 version: 2.9.2 @@ -181,7 +181,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -212,7 +212,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -243,7 +243,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -299,7 +299,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -336,7 +336,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -367,7 +367,7 @@ importers: version: link:../test '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -395,7 +395,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@microsoft/api-extractor': specifier: ^7.35.0 version: 7.43.7(@types/node@22.5.5) @@ -429,7 +429,7 @@ importers: version: link:../../agents '@livekit/rtc-node': specifier: ^0.13.12 - version: 0.13.12 + version: 0.13.13 '@types/node': specifier: ^22.5.5 version: 22.5.5 @@ -1329,72 +1329,38 @@ packages: '@livekit/protocol@1.29.1': resolution: {integrity: sha512-OhxXTZlyM5f4ydnAq1p5azzzOtKWmIoCSVtyVj9rgE42zQI5JM1rR9pubVRZovouGSvEDSJx9yL4Js2IlIyM1Q==} - '@livekit/rtc-node-darwin-arm64@0.13.11': - resolution: {integrity: sha512-XqbVUW5rVrRdVzxUI3+f8K6A1bnzAXytbCmPx7YiGOXVNRCV1kC84R7fap7OgrgN/rAtObhyYK882xdJVG/BYA==} + '@livekit/rtc-node-darwin-arm64@0.13.13': + resolution: {integrity: sha512-iyjWwgr7JKTHa+YX1aCiKPT91Zk/snnBWOrWIJz9qq9X3cDvKVFjxOpJF1wVKPHBWE1dwDmNpSry/tltiUilZQ==} engines: {node: '>= 10'} cpu: [arm64] os: [darwin] - '@livekit/rtc-node-darwin-arm64@0.13.12': - resolution: {integrity: sha512-R13dfyNc3CwIAmmTo3rMO6d5HRddkzwjZ7RRkak8uYUMORMBcIrNZIOSAdRDo6kylLDvD7dzVXBQeqHVP/XBsg==} - engines: {node: '>= 10'} - cpu: [arm64] - os: [darwin] - - '@livekit/rtc-node-darwin-x64@0.13.11': - resolution: {integrity: sha512-UFe9Lp+7Z8UZcJq2oOH8+6nCKWlX0PVorB4jeCRZuVa4QL2PL1CcGvo9/kNNw5aA25AkPUgDjMXj2WbfEPNMKA==} - engines: {node: '>= 10'} - cpu: [x64] - os: [darwin] - - '@livekit/rtc-node-darwin-x64@0.13.12': - resolution: {integrity: sha512-fStjTQID0N4c6nX3Ii8JuADd1doP8hcXF0kcCQ7yG+yBZQxKpVQ9r66yxplxJSR0bpUd3NPf71GFbAVfWr+7eg==} + '@livekit/rtc-node-darwin-x64@0.13.13': + resolution: {integrity: sha512-MmmDIUO85D4Mj1vQqVhTBXoP64kfd4HMeTGI8mPxsqlzoj4M80/N0um/dW+in5RuwvMZ6AMHgVKuWK4nKxf7CQ==} engines: {node: '>= 10'} cpu: [x64] os: [darwin] - '@livekit/rtc-node-linux-arm64-gnu@0.13.11': - resolution: {integrity: sha512-GuJtl1nJhJnFEMI9plJqlIJ0BJCWuynJzbhhD7Yd/Zuw/NYzzzIf+wQ2mIZ0Zk9/EUV4oMYJqacJiZXvOvUQ3A==} - engines: {node: '>= 10'} - cpu: [arm64] - os: [linux] - - '@livekit/rtc-node-linux-arm64-gnu@0.13.12': - resolution: {integrity: sha512-QZQQk1lXSGSXXR6Bw1W+nbuSJL2m3OkhTQIvpvF0iz12Q5Ck3PVHEIgGoJbPB1qTiIyRDjKTGsZyJt6XJZJtsA==} + '@livekit/rtc-node-linux-arm64-gnu@0.13.13': + resolution: {integrity: sha512-Jl3Wm0qDyiag+20BDIHbV6Gk50VgSffxyTWYxw1Ecns1UZCVlB+1V5U7O6dcnCe/Wk/n/RxR8sFtTpvbNudnbw==} engines: {node: '>= 10'} cpu: [arm64] os: [linux] - '@livekit/rtc-node-linux-x64-gnu@0.13.11': - resolution: {integrity: sha512-Zi7Elg29JSmDzikxL2Q9YAAka2Khi7GwYHYBv69W6XXHqz3MN4wtnUGShclmqC7aITkHF0tVNLHdexFmMc3trA==} - engines: {node: '>= 10'} - cpu: [x64] - os: [linux] - - '@livekit/rtc-node-linux-x64-gnu@0.13.12': - resolution: {integrity: sha512-EoP2EhdASL9ZqpkDzjtrVvae/Jv9ATH4vKHseVFOveHfw5gSOirFFwqeoibERUgAzHaV1qfA3AmZlS/ErG+iBw==} + '@livekit/rtc-node-linux-x64-gnu@0.13.13': + resolution: {integrity: sha512-B/SgbeBRobpA5LqmDEoBJHpRXePpoF4RO4F0zJf9BdkDhOR0j77p6hD0ZiOuPTRoBzUqukpsTszp+lZnHoNmiA==} engines: {node: '>= 10'} cpu: [x64] os: [linux] - '@livekit/rtc-node-win32-x64-msvc@0.13.11': - resolution: {integrity: sha512-NYemYGbc271SFv+ttYaNvEKLWmwkJqn988xOasq+lWd31kuhj0krR2cThM07HKhgoQwOfdyIcSV87b0mjPzb/A==} - engines: {node: '>= 10'} - cpu: [x64] - os: [win32] - - '@livekit/rtc-node-win32-x64-msvc@0.13.12': - resolution: {integrity: sha512-O4M5s1c/DvP9PLc1ac5cDdp9YKZOmnPLHmDG5hmHFzzrnT9YT+1PNXkvl7xDIG7YUHTSlwJyIdMvQ6CGmQkY3w==} + '@livekit/rtc-node-win32-x64-msvc@0.13.13': + resolution: {integrity: sha512-ygVYV4eHczs3QdaW/p0ADhhm7InUDhFaCYk8OzzIn056ZibZPXzvPizCThZqs8VsDniA01MraZF3qhZZb8IyRg==} engines: {node: '>= 10'} cpu: [x64] os: [win32] - '@livekit/rtc-node@0.13.11': - resolution: {integrity: sha512-yq9uNRK+cdee0W6w0HPMSjTHovUteY4t4ZFrTdmpNt7fg/VxaJkdpXaG7cg8t+RX0pBT/NHskSQ4WFrqumezZg==} - engines: {node: '>= 18'} - - '@livekit/rtc-node@0.13.12': - resolution: {integrity: sha512-WjfCrTS2tqQgsP250GzgrSsCXq507p6V9FwXR3djv2AYrBWEXal3FwhXIo5LqKQhL17HP1XxBmupVPbN6HV5nQ==} + '@livekit/rtc-node@0.13.13': + resolution: {integrity: sha512-aB6i46dLWX+nj7VdkirZho2XemcinlJvb0YY1RZj1EZTkNsmKVS6BUDmYk0njLqr5862jazkxGPwG6Zmh3e2kw==} engines: {node: '>= 18'} '@livekit/typed-emitter@3.0.0': @@ -5214,51 +5180,22 @@ snapshots: dependencies: '@bufbuild/protobuf': 1.10.0 - '@livekit/rtc-node-darwin-arm64@0.13.11': - optional: true - - '@livekit/rtc-node-darwin-arm64@0.13.12': - optional: true - - '@livekit/rtc-node-darwin-x64@0.13.11': - optional: true - - '@livekit/rtc-node-darwin-x64@0.13.12': - optional: true - - '@livekit/rtc-node-linux-arm64-gnu@0.13.11': + '@livekit/rtc-node-darwin-arm64@0.13.13': optional: true - '@livekit/rtc-node-linux-arm64-gnu@0.13.12': + '@livekit/rtc-node-darwin-x64@0.13.13': optional: true - '@livekit/rtc-node-linux-x64-gnu@0.13.11': + '@livekit/rtc-node-linux-arm64-gnu@0.13.13': optional: true - '@livekit/rtc-node-linux-x64-gnu@0.13.12': + '@livekit/rtc-node-linux-x64-gnu@0.13.13': optional: true - '@livekit/rtc-node-win32-x64-msvc@0.13.11': + '@livekit/rtc-node-win32-x64-msvc@0.13.13': optional: true - '@livekit/rtc-node-win32-x64-msvc@0.13.12': - optional: true - - '@livekit/rtc-node@0.13.11': - dependencies: - '@bufbuild/protobuf': 1.10.0 - '@livekit/mutex': 1.1.1 - '@livekit/typed-emitter': 3.0.0 - pino: 9.6.0 - pino-pretty: 13.0.0 - optionalDependencies: - '@livekit/rtc-node-darwin-arm64': 0.13.11 - '@livekit/rtc-node-darwin-x64': 0.13.11 - '@livekit/rtc-node-linux-arm64-gnu': 0.13.11 - '@livekit/rtc-node-linux-x64-gnu': 0.13.11 - '@livekit/rtc-node-win32-x64-msvc': 0.13.11 - - '@livekit/rtc-node@0.13.12': + '@livekit/rtc-node@0.13.13': dependencies: '@bufbuild/protobuf': 1.10.0 '@livekit/mutex': 1.1.1 @@ -5266,11 +5203,11 @@ snapshots: pino: 9.6.0 pino-pretty: 13.0.0 optionalDependencies: - '@livekit/rtc-node-darwin-arm64': 0.13.12 - '@livekit/rtc-node-darwin-x64': 0.13.12 - '@livekit/rtc-node-linux-arm64-gnu': 0.13.12 - '@livekit/rtc-node-linux-x64-gnu': 0.13.12 - '@livekit/rtc-node-win32-x64-msvc': 0.13.12 + '@livekit/rtc-node-darwin-arm64': 0.13.13 + '@livekit/rtc-node-darwin-x64': 0.13.13 + '@livekit/rtc-node-linux-arm64-gnu': 0.13.13 + '@livekit/rtc-node-linux-x64-gnu': 0.13.13 + '@livekit/rtc-node-win32-x64-msvc': 0.13.13 '@livekit/typed-emitter@3.0.0': {} From b3d2bc5d2dcc707827065e8924ec955285640389 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Mon, 19 May 2025 10:10:15 -0700 Subject: [PATCH 05/13] update git ignore --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 327fa1ea3..c085243d0 100644 --- a/.gitignore +++ b/.gitignore @@ -185,3 +185,6 @@ docs # direnv .direnv + +# vscode workspace config +agents-js.code-workspace \ No newline at end of file From ef332c549e2f39abf4d479c0c70aa57ed2f37c1c Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 19 May 2025 19:22:11 +0200 Subject: [PATCH 06/13] Simplify deferred stream helper (#392) --- agents/src/deferred_stream.ts | 64 ------------------------- agents/src/stream/deferred_stream.ts | 27 +++++++++++ agents/src/stream/identity_transform.ts | 12 +++++ 3 files changed, 39 insertions(+), 64 deletions(-) delete mode 100644 agents/src/deferred_stream.ts create mode 100644 agents/src/stream/deferred_stream.ts create mode 100644 agents/src/stream/identity_transform.ts diff --git a/agents/src/deferred_stream.ts b/agents/src/deferred_stream.ts deleted file mode 100644 index 73d28bebf..000000000 --- a/agents/src/deferred_stream.ts +++ /dev/null @@ -1,64 +0,0 @@ -// SPDX-FileCopyrightText: 2025 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 -import { ReadableStream } from 'node:stream/web'; -import { Future } from './utils.js'; - -export class DeferredReadableStream { - private _sourceFuture: Future>; - - private _reader?: ReadableStreamDefaultReader; - - public readonly stream: ReadableStream; - - constructor() { - this._sourceFuture = new Future>(); - - this.stream = new ReadableStream({ - start: async (controller) => { - try { - const source = await this._sourceFuture.await; - - this._reader = source.getReader(); - - const pump = async () => { - try { - while (true) { - const { done, value } = await this._reader!.read(); - if (done) break; - controller.enqueue(value); - } - controller.close(); - } catch (err) { - controller.error(err); - } - }; - - pump(); - } catch (err) { - controller.error(err); - } - }, - cancel: async (reason) => { - await this.cancel(reason); - }, - }); - } - - /** - * Call once the actual source is ready. - */ - setSource(source: ReadableStream) { - if (this._sourceFuture.done) { - return; - } - this._sourceFuture.resolve(source); - } - - async cancel(reason?: Error) { - if (!this._sourceFuture.done) { - this._sourceFuture.reject(reason ?? new Error('Stream cancelled without reason')); - } - await this._reader?.cancel(reason); - } -} diff --git a/agents/src/stream/deferred_stream.ts b/agents/src/stream/deferred_stream.ts new file mode 100644 index 000000000..305f5efec --- /dev/null +++ b/agents/src/stream/deferred_stream.ts @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { type ReadableStream } from 'node:stream/web'; +import { IdentityTransform } from './identity_transform.js'; + +export class DeferredReadableStream { + private transform: IdentityTransform; + + get stream() { + return this.transform.readable; + } + + constructor() { + this.transform = new IdentityTransform(); + } + + /** + * Call once the actual source is ready. + */ + setSource(source: ReadableStream) { + if (this.transform.writable.locked) { + throw new Error('Stream is already locked'); + } + source.pipeTo(this.transform.writable); + } +} diff --git a/agents/src/stream/identity_transform.ts b/agents/src/stream/identity_transform.ts new file mode 100644 index 000000000..cb83f091f --- /dev/null +++ b/agents/src/stream/identity_transform.ts @@ -0,0 +1,12 @@ +// SPDX-FileCopyrightText: 2025 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { TransformStream } from 'node:stream/web'; + +export class IdentityTransform extends TransformStream { + constructor() { + super({ + transform: (chunk, controller) => controller.enqueue(chunk), + }); + } +} From 49b16061e1cea7a38a41eba1be6c5fc470071623 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 19 May 2025 19:28:34 +0200 Subject: [PATCH 07/13] fix deferred stream inputs (#393) --- agents/src/vad.ts | 2 +- agents/src/voice/audio_recognition.ts | 2 +- agents/src/voice/room_io.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/agents/src/vad.ts b/agents/src/vad.ts index 5b517abe5..cc1f0fba7 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -5,9 +5,9 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; import type { ReadableStream } from 'node:stream/web'; -import { DeferredReadableStream } from './deferred_stream.js'; import { log } from './log.js'; import type { VADMetrics } from './metrics/base.js'; +import { DeferredReadableStream } from './stream/deferred_stream.js'; import { AsyncIterableQueue } from './utils.js'; export enum VADEventType { diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index e668b9e12..38764861a 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -3,8 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; -import { DeferredReadableStream } from '../deferred_stream.js'; import { log } from '../log.js'; +import { DeferredReadableStream } from '../stream/deferred_stream.js'; import type { SpeechEvent } from '../stt/stt.js'; import { type VAD, type VADEvent, VADEventType } from '../vad.js'; diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index ba5f8eb10..67aa77ddb 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -4,8 +4,8 @@ import type { AudioFrame, Room } from '@livekit/rtc-node'; import { AudioStream, type RemoteTrack, RoomEvent, TrackKind } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; -import { DeferredReadableStream } from '../deferred_stream.js'; import { log } from '../log.js'; +import { DeferredReadableStream } from '../stream/deferred_stream.js'; import type { AgentSession } from './agent_session.js'; export class RoomIO { From ac35f860d49b2012f4f2fc6801640e9930805741 Mon Sep 17 00:00:00 2001 From: Shubhra Date: Mon, 19 May 2025 14:51:39 -0600 Subject: [PATCH 08/13] Shubhra/add stt to audio recognition (#385) Co-authored-by: lukasIO --- agents/src/stt/stt.ts | 27 ++++++- agents/src/voice/agent.ts | 41 +++++++++- agents/src/voice/agent_activity.ts | 28 +++++-- agents/src/voice/agent_session.ts | 5 +- agents/src/voice/audio_recognition.ts | 106 +++++++++++++++++++++++--- agents/src/voice/io.ts | 1 + agents/src/voice/room_io.ts | 8 +- examples/src/basic_agent.ts | 3 +- 8 files changed, 194 insertions(+), 25 deletions(-) diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 42868bfe8..ed8a5a47b 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -4,7 +4,10 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; +import type { ReadableStream } from 'node:stream/web'; +import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; +import { DeferredReadableStream } from '../stream/deferred_stream.js'; import type { AudioBuffer } from '../utils.js'; import { AsyncIterableQueue } from '../utils.js'; @@ -143,10 +146,28 @@ export abstract class SpeechStream implements AsyncIterableIterator abstract label: string; protected closed = false; #stt: STT; - + private deferredInputStream: DeferredReadableStream; + private logger = log(); constructor(stt: STT) { this.#stt = stt; + this.deferredInputStream = new DeferredReadableStream(); this.monitorMetrics(); + this.mainTask(); + } + + protected async mainTask() { + // TODO(AJS-35): Implement STT with webstreams API + try { + const inputStream = this.deferredInputStream.stream; + const reader = inputStream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) break; + this.pushFrame(value); + } + } catch (error) { + this.logger.error('Error in STTStream mainTask:', error); + } } protected async monitorMetrics() { @@ -169,6 +190,10 @@ export abstract class SpeechStream implements AsyncIterableIterator this.output.close(); } + updateInputStream(audioStream: ReadableStream) { + this.deferredInputStream.setSource(audioStream); + } + /** Push an audio frame to the STT */ pushFrame(frame: AudioFrame) { if (this.input.closed) { diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index c2bdf3512..d29d8b35b 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -6,11 +6,14 @@ /* eslint-disable @typescript-eslint/no-explicit-any */ import type { AudioFrame } from '@livekit/rtc-node'; +import { ReadableStream } from 'node:stream/web'; import type { ChatChunk, ChatMessage, LLM } from '../llm/index.js'; import { ChatContext } from '../llm/index.js'; +import { StreamAdapter as STTStreamAdapter } from '../stt/index.js'; import type { STT, SpeechEvent } from '../stt/index.js'; import type { TTS } from '../tts/index.js'; import type { VAD } from '../vad.js'; +import type { AgentActivity } from './agent_activity.js'; export class Agent { private instructions: string; @@ -21,7 +24,9 @@ export class Agent { private vad: VAD | undefined; private llm: LLM | any; private tts: TTS | undefined; - private agentActivity: any; // TODO(shubhra): add type + + /** @internal */ + agentActivity?: AgentActivity; constructor( instructions: string, @@ -62,7 +67,7 @@ export class Agent { audio: ReadableStream, modelSettings: any, // TODO(shubhra): add type ): Promise | null> { - return null; + return Agent.default.sttNode(this, audio, modelSettings); } async llmNode( @@ -82,12 +87,42 @@ export class Agent { // realtime_audio_output_node + getActivityOrThrow(): AgentActivity { + if (!this.agentActivity) { + throw new Error('Agent activity not found'); + } + return this.agentActivity; + } + static default = { async sttNode( + agent: Agent, audio: ReadableStream, modelSettings: any, // TODO(shubhra): add type ): Promise | null> { - return null; + const activity = agent.getActivityOrThrow(); + + let wrapped_stt = activity.stt; + + if (!wrapped_stt.capabilities.streaming) { + if (!agent.vad) { + throw new Error( + 'STT does not support streaming, add a VAD to the AgentTask/VoiceAgent to enable streaming', + ); + } + wrapped_stt = new STTStreamAdapter(wrapped_stt, agent.vad); + } + + const stream = wrapped_stt.stream(); + stream.updateInputStream(audio); + + return new ReadableStream({ + async start(controller) { + for await (const event of stream) { + controller.enqueue(event); + } + }, + }); }, }; } diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index faa8ccf58..89d913ea8 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -4,7 +4,7 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; -import type { SpeechEvent } from '../stt/stt.js'; +import type { STT, SpeechEvent } from '../stt/stt.js'; import type { VADEvent } from '../vad.js'; import type { Agent } from './agent.js'; import type { AgentSession } from './agent_session.js'; @@ -18,6 +18,8 @@ export class AgentActivity implements RecognitionHooks { private started = false; private audioRecognition?: AudioRecognition; private logger = log(); + private turnDetectionMode?: string; + agent: Agent; agentSession: AgentSession; @@ -27,12 +29,23 @@ export class AgentActivity implements RecognitionHooks { } async start(): Promise { - if (this.started) { - return; - } - this.audioRecognition = new AudioRecognition(this, this.agentSession.vad); + this.agent.agentActivity = this; + this.audioRecognition = new AudioRecognition( + this, + this.agentSession.vad, + // Arrow function preserves the Agent context + (...args) => this.agent.sttNode(...args), + this.turnDetectionMode === 'manual', + ); this.audioRecognition.start(); this.started = true; + + // TODO(shubhra): Add turn detection mode + } + + get stt(): STT { + // TODO(shubhra): Allow components to be defined in Agent class + return this.agentSession.stt; } updateAudioInput(audioStream: ReadableStream): void { @@ -47,8 +60,9 @@ export class AgentActivity implements RecognitionHooks { this.logger.info('End of speech', ev); } + // eslint-disable-next-line @typescript-eslint/no-unused-vars onVADInferenceDone(ev: VADEvent): void { - this.logger.info('VAD inference done', ev); + // TODO(AJS-40): Implement this } onInterimTranscript(ev: SpeechEvent): void { @@ -56,7 +70,7 @@ export class AgentActivity implements RecognitionHooks { } onFinalTranscript(ev: SpeechEvent): void { - this.logger.info('Final transcript', ev); + this.logger.info(`Final transcript ${ev.alternatives![0].text}`); } onEndOfTurn(ev: EndOfTurnInfo): void { diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 0faef10bd..1f8bd3063 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -4,6 +4,7 @@ import type { AudioFrame, Room } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; +import type { STT } from '../stt/index.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { AgentActivity } from './agent_activity.js'; @@ -11,6 +12,7 @@ import { RoomIO } from './room_io.js'; export class AgentSession { vad: VAD; + stt: STT; private agent?: Agent; private activity?: AgentActivity; @@ -23,8 +25,9 @@ export class AgentSession { /** @internal */ audioInput?: ReadableStream; - constructor(vad: VAD) { + constructor(vad: VAD, stt: STT) { this.vad = vad; + this.stt = stt; } async start(agent: Agent, room: Room): Promise { diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 38764861a..6a6781217 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -2,11 +2,12 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; -import type { ReadableStream } from 'node:stream/web'; +import { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; -import type { SpeechEvent } from '../stt/stt.js'; +import { type SpeechEvent, SpeechEventType } from '../stt/stt.js'; import { type VAD, type VADEvent, VADEventType } from '../vad.js'; +import type { STTNode } from './io.js'; export interface EndOfTurnInfo { newTranscript: string; @@ -26,23 +27,102 @@ export interface RecognitionHooks { export class AudioRecognition { private deferredInputStream: DeferredReadableStream; private vadStreamProcessor?: Promise; + private sttStreamProcessor?: Promise; private logger = log(); - + private lastLanguage?: string; + private lastFinalTranscriptTime = 0; + private audioTranscript = ''; + private audioInterimTranscript = ''; + private lastSpeakingTime = 0; + private userTurnCommitted = false; + private speaking = false; constructor( private hooks: RecognitionHooks, private vad: VAD, + private stt: STTNode, + private manualTurnDetection = false, ) { this.deferredInputStream = new DeferredReadableStream(); } - start() { - this.vadStreamProcessor = this.vadTask().catch((err) => { - this.logger.error('Error in VAD task', err); + async start() { + const [vadInputStream, sttInputStream] = this.deferredInputStream.stream.tee(); + this.vadStreamProcessor = this.vadTask(vadInputStream).catch((err) => { + throw err; + }); + this.sttStreamProcessor = this.sttTask(sttInputStream).catch((err) => { + throw err; }); } - private async vadTask() { - const inputStream = this.deferredInputStream.stream; + private async onSTTEvent(ev: SpeechEvent) { + // TODO(AJS-30) ignore stt event if user turn already committed and EOU task is done + // or it's an interim transcript + + switch (ev.type) { + case SpeechEventType.FINAL_TRANSCRIPT: + this.hooks.onFinalTranscript(ev); + const transcript = ev.alternatives?.[0]?.text; + this.lastLanguage = ev.alternatives?.[0]?.language; + + if (!transcript) return; + + this.logger.debug('received user transcript', { + user_transcript: transcript, + language: this.lastLanguage, + }); + + this.lastFinalTranscriptTime = Date.now(); + this.audioTranscript += ` ${transcript}`; + this.audioTranscript = this.audioTranscript.trim(); + this.audioInterimTranscript = ''; + + if (!this.speaking) { + if (!this.vad) { + this.lastSpeakingTime = Date.now(); + } + } + + if (!this.manualTurnDetection || this.userTurnCommitted) { + this.hooks.onEndOfTurn({ + newTranscript: transcript, + transcriptionDelay: this.lastFinalTranscriptTime - this.lastSpeakingTime, + endOfUtteranceDelay: this.lastFinalTranscriptTime - Date.now(), + }); + } + + break; + case SpeechEventType.INTERIM_TRANSCRIPT: + this.hooks.onInterimTranscript(ev); + this.audioInterimTranscript = ev.alternatives?.[0]?.text ?? ''; + break; + } + } + + private async sttTask(inputStream: ReadableStream) { + const sttStream = await this.stt(inputStream, {}); + if (sttStream === null) { + return; + } + if (sttStream instanceof ReadableStream) { + const reader = sttStream.getReader(); + while (true) { + const { done, value: ev } = await reader.read(); + if (done) { + break; + } + if (typeof ev === 'string') { + throw new Error('STT node must yield SpeechEvent'); + } else { + await this.onSTTEvent(ev); + } + } + reader.releaseLock(); + sttStream.cancel(); + } + } + + private async vadTask(inputStream: ReadableStream) { const vadStream = this.vad.stream(); vadStream.updateInputStream(inputStream); @@ -50,13 +130,17 @@ export class AudioRecognition { switch (ev.type) { case VADEventType.START_OF_SPEECH: this.hooks.onStartOfSpeech(ev); - break; - case VADEventType.END_OF_SPEECH: - this.hooks.onEndOfSpeech(ev); + this.speaking = true; break; case VADEventType.INFERENCE_DONE: this.hooks.onVADInferenceDone(ev); break; + case VADEventType.END_OF_SPEECH: + this.hooks.onEndOfSpeech(ev); + this.speaking = false; + // when VAD fires END_OF_SPEECH, it already waited for the silence_duration + this.lastSpeakingTime = Date.now() - ev.silenceDuration; + break; } } } diff --git a/agents/src/voice/io.ts b/agents/src/voice/io.ts index ca2b8c47e..0a79bf0ae 100644 --- a/agents/src/voice/io.ts +++ b/agents/src/voice/io.ts @@ -2,6 +2,7 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame } from '@livekit/rtc-node'; +import type { ReadableStream } from 'node:stream/web'; import type { SpeechEvent } from '../stt/stt.js'; export type STTNode = ( diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index 67aa77ddb..9d2761af5 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -35,7 +35,13 @@ export class RoomIO { private onTrackSubscribed = (track: RemoteTrack) => { if (track.kind === TrackKind.KIND_AUDIO) { - this._deferredAudioInputStream.setSource(new AudioStream(track)); + this._deferredAudioInputStream.setSource( + new AudioStream(track, { + // TODO(AJS-41) remove hardcoded sample rate + sampleRate: 16000, + numChannels: 1, + }), + ); } }; diff --git a/examples/src/basic_agent.ts b/examples/src/basic_agent.ts index 410eaef29..12f5c56b9 100644 --- a/examples/src/basic_agent.ts +++ b/examples/src/basic_agent.ts @@ -9,6 +9,7 @@ import { defineAgent, voice, } from '@livekit/agents'; +import * as deepgram from '@livekit/agents-plugin-deepgram'; import * as silero from '@livekit/agents-plugin-silero'; import { fileURLToPath } from 'node:url'; @@ -24,7 +25,7 @@ export default defineAgent({ const vad = ctx.proc.userData.vad! as silero.VAD; - const session = new voice.AgentSession(vad); + const session = new voice.AgentSession(vad, new deepgram.STT()); session.start(agent, ctx.room); }, }); From bcfc134233810cfa44a528b2578d8c38c4063434 Mon Sep 17 00:00:00 2001 From: Shubhra Date: Mon, 19 May 2025 15:26:52 -0600 Subject: [PATCH 09/13] Shubhra/ajs 31 refactor vad with streams (#390) Co-authored-by: lukasIO --- agents/src/vad.ts | 97 ++++++++++++++++++--------- agents/src/voice/audio_recognition.ts | 4 +- plugins/silero/src/vad.ts | 13 ++-- 3 files changed, 76 insertions(+), 38 deletions(-) diff --git a/agents/src/vad.ts b/agents/src/vad.ts index cc1f0fba7..2e135df3a 100644 --- a/agents/src/vad.ts +++ b/agents/src/vad.ts @@ -4,11 +4,15 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import type { ReadableStream } from 'node:stream/web'; +import type { + ReadableStream, + ReadableStreamDefaultReader, + WritableStreamDefaultWriter, +} from 'node:stream/web'; import { log } from './log.js'; import type { VADMetrics } from './metrics/base.js'; import { DeferredReadableStream } from './stream/deferred_stream.js'; -import { AsyncIterableQueue } from './utils.js'; +import { IdentityTransform } from './stream/identity_transform.js'; export enum VADEventType { START_OF_SPEECH, @@ -80,46 +84,70 @@ export abstract class VAD extends (EventEmitter as new () => TypedEmitter { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); - protected input = new AsyncIterableQueue(); - protected queue = new AsyncIterableQueue(); - protected output = new AsyncIterableQueue(); + protected input = new IdentityTransform(); + protected output = new IdentityTransform(); + protected inputWriter: WritableStreamDefaultWriter; + protected inputReader: ReadableStreamDefaultReader; + protected outputWriter: WritableStreamDefaultWriter; + protected outputReader: ReadableStreamDefaultReader; protected closed = false; + protected inputClosed = false; + #vad: VAD; #lastActivityTime = BigInt(0); private logger = log(); private deferredInputStream: DeferredReadableStream; + private metricsStream: ReadableStream; constructor(vad: VAD) { this.#vad = vad; this.deferredInputStream = new DeferredReadableStream(); + + this.inputWriter = this.input.writable.getWriter(); + this.inputReader = this.input.readable.getReader(); + this.outputWriter = this.output.writable.getWriter(); + + const [outputStream, metricsStream] = this.output.readable.tee(); + this.metricsStream = metricsStream; + this.outputReader = outputStream.getReader(); + + this.pumpDeferredStream(); this.monitorMetrics(); - this.mainTask(); } - protected async mainTask() { - // This is just a placeholder since VAD isn't implemented with the streams API yet. + /** + * Reads from the deferred input stream and forwards chunks to the input writer. + * + * Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable) + * because the inputWriter locks the this.input.writable stream. All writes must go through + * the inputWriter. + */ + private async pumpDeferredStream() { + const reader = this.deferredInputStream.stream.getReader(); try { - const inputStream = this.deferredInputStream.stream; - const reader = inputStream.getReader(); while (true) { const { done, value } = await reader.read(); - if (done) { - break; - } - this.pushFrame(value); + if (done) break; + await this.inputWriter.write(value); } - } catch (error) { - this.logger.error('Error in VADStream mainTask:', error); + } catch (e) { + this.logger.error(`Error pumping deferred stream: ${e}`); + throw e; + } finally { + reader.releaseLock(); } } protected async monitorMetrics() { let inferenceDurationTotal = 0; let inferenceCount = 0; - - for await (const event of this.queue) { - this.output.put(event); - switch (event.type) { + const metricsReader = this.metricsStream.getReader(); + while (true) { + const { done, value } = await metricsReader.read(); + if (done) { + break; + } + switch (value.type) { case VADEventType.START_OF_SPEECH: inferenceCount++; if (inferenceCount >= 1 / this.#vad.capabilities.updateInterval) { @@ -143,51 +171,56 @@ export abstract class VADStream implements AsyncIterableIterator { break; } } - this.output.close(); } updateInputStream(audioStream: ReadableStream) { this.deferredInputStream.setSource(audioStream); } + /** @deprecated Use `updateInputStream` instead */ pushFrame(frame: AudioFrame) { - if (this.input.closed) { + // TODO(AJS-395): remove this method + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(frame); + this.inputWriter.write(frame); } flush() { - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(VADStream.FLUSH_SENTINEL); + this.inputWriter.write(VADStream.FLUSH_SENTINEL); } endInput() { - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.close(); + this.inputClosed = true; + this.input.writable.close(); } - next(): Promise> { - return this.output.next(); + async next(): Promise> { + return this.outputReader.read().then(({ done, value }) => { + if (done) { + return { done: true, value: undefined }; + } + return { done: false, value }; + }); } close() { - this.input.close(); - this.queue.close(); - this.output.close(); + this.input.writable.close(); this.closed = true; } diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 6a6781217..88699e6bf 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -48,10 +48,10 @@ export class AudioRecognition { async start() { const [vadInputStream, sttInputStream] = this.deferredInputStream.stream.tee(); this.vadStreamProcessor = this.vadTask(vadInputStream).catch((err) => { - throw err; + this.logger.error(`Error in VAD task: ${err}`); }); this.sttStreamProcessor = this.sttTask(sttInputStream).catch((err) => { - throw err; + this.logger.error(`Error in STT task: ${err}`); }); } diff --git a/plugins/silero/src/vad.ts b/plugins/silero/src/vad.ts index 6d38462b5..1e42aef5e 100644 --- a/plugins/silero/src/vad.ts +++ b/plugins/silero/src/vad.ts @@ -157,7 +157,12 @@ export class VADStream extends baseStream { // used to avoid drift when the sampleRate ratio is not an integer let inputCopyRemainingFrac = 0.0; - for await (const frame of this.input) { + while (true) { + const { done, value: frame } = await this.inputReader.read(); + if (done) { + break; + } + if (typeof frame === 'symbol') { continue; // ignore flush sentinel for now } @@ -255,7 +260,7 @@ export class VADStream extends baseStream { pubSilenceDuration += inferenceDuration; } - this.queue.put({ + this.outputWriter.write({ type: VADEventType.INFERENCE_DONE, samplesIndex: pubCurrentSample, timestamp: pubTimestamp, @@ -309,7 +314,7 @@ export class VADStream extends baseStream { pubSilenceDuration = 0; pubSpeechDuration = speechThresholdDuration; - this.queue.put({ + this.outputWriter.write({ type: VADEventType.START_OF_SPEECH, samplesIndex: pubCurrentSample, timestamp: pubTimestamp, @@ -336,7 +341,7 @@ export class VADStream extends baseStream { pubSpeechDuration = 0; pubSilenceDuration = silenceThresholdDuration; - this.queue.put({ + this.outputWriter.write({ type: VADEventType.END_OF_SPEECH, samplesIndex: pubCurrentSample, timestamp: pubTimestamp, From 37576deb3cf34d582d1bba51586b01738102c27a Mon Sep 17 00:00:00 2001 From: Shubhra Date: Wed, 21 May 2025 17:13:17 -0600 Subject: [PATCH 10/13] Add agent/user state (#397) --- agents/src/voice/agent_session.ts | 14 ++++++++++++++ agents/src/voice/events.ts | 5 +++++ 2 files changed, 19 insertions(+) create mode 100644 agents/src/voice/events.ts diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index 1f8bd3063..b4e2929ea 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -4,10 +4,12 @@ import type { AudioFrame, Room } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; +import type { AgentState } from '../pipeline/index.js'; import type { STT } from '../stt/index.js'; import type { VAD } from '../vad.js'; import type { Agent } from './agent.js'; import { AgentActivity } from './agent_activity.js'; +import type { UserState } from './events.js'; import { RoomIO } from './room_io.js'; export class AgentSession { @@ -18,6 +20,8 @@ export class AgentSession { private activity?: AgentActivity; private nextActivity?: AgentActivity; private started = false; + private userState: UserState = 'listening'; + private agentState: AgentState = 'initializing'; private roomIO?: RoomIO; private logger = log(); @@ -64,4 +68,14 @@ export class AgentSession { await this.activity.start(); } } + + /** @internal */ + _updateAgentState(state: AgentState) { + this.agentState = state; + } + + /** @internal */ + _updateUserState(state: UserState) { + this.userState = state; + } } diff --git a/agents/src/voice/events.ts b/agents/src/voice/events.ts new file mode 100644 index 000000000..4710b5efe --- /dev/null +++ b/agents/src/voice/events.ts @@ -0,0 +1,5 @@ +// SPDX-FileCopyrightText: 2024 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +export type AgentState = 'initializing' | 'thinking' | 'listening' | 'speaking'; +export type UserState = 'idle' | 'thinking' | 'listening' | 'speaking'; From 4cf30596a938bdc4b706e3a6e813a01ede098753 Mon Sep 17 00:00:00 2001 From: Shubhra Date: Wed, 21 May 2025 17:13:30 -0600 Subject: [PATCH 11/13] Add basic audio output support (#396) --- agents/src/voice/agent_session.ts | 7 +++++-- agents/src/voice/room_io.ts | 27 +++++++++++++++++++++++++-- 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/agents/src/voice/agent_session.ts b/agents/src/voice/agent_session.ts index b4e2929ea..b3dc43e7e 100644 --- a/agents/src/voice/agent_session.ts +++ b/agents/src/voice/agent_session.ts @@ -1,7 +1,7 @@ // SPDX-FileCopyrightText: 2024 LiveKit, Inc. // // SPDX-License-Identifier: Apache-2.0 -import type { AudioFrame, Room } from '@livekit/rtc-node'; +import type { AudioFrame, AudioSource, Room } from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import type { AgentState } from '../pipeline/index.js'; @@ -28,6 +28,8 @@ export class AgentSession { /** @internal */ audioInput?: ReadableStream; + /** @internal */ + audioOutput?: AudioSource; constructor(vad: VAD, stt: STT) { this.vad = vad; @@ -45,7 +47,8 @@ export class AgentSession { await this.updateActivity(this.agent); } - this.roomIO = new RoomIO(this, room); + // TODO(AJS-38): update with TTS sample rate and num channels + this.roomIO = new RoomIO(this, room, 0, 0); this.roomIO.start(); if (this.audioInput) { diff --git a/agents/src/voice/room_io.ts b/agents/src/voice/room_io.ts index 9d2761af5..5d60bbe0c 100644 --- a/agents/src/voice/room_io.ts +++ b/agents/src/voice/room_io.ts @@ -2,7 +2,17 @@ // // SPDX-License-Identifier: Apache-2.0 import type { AudioFrame, Room } from '@livekit/rtc-node'; -import { AudioStream, type RemoteTrack, RoomEvent, TrackKind } from '@livekit/rtc-node'; +import { + AudioSource, + AudioStream, + LocalAudioTrack, + type LocalTrackPublication, + type RemoteTrack, + RoomEvent, + TrackKind, + TrackPublishOptions, + TrackSource, +} from '@livekit/rtc-node'; import type { ReadableStream } from 'node:stream/web'; import { log } from '../log.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; @@ -16,11 +26,14 @@ export class RoomIO { private room: Room; private _deferredAudioInputStream = new DeferredReadableStream(); + private audioSource: AudioSource; + private publication?: LocalTrackPublication; - constructor(agentSession: AgentSession, room: Room) { + constructor(agentSession: AgentSession, room: Room, sampleRate: number, numChannels: number) { this.agentSession = agentSession; this.room = room; this.participantAudioInputStream = this._deferredAudioInputStream.stream; + this.audioSource = new AudioSource(sampleRate, numChannels); this.setupEventListeners(); } @@ -45,7 +58,17 @@ export class RoomIO { } }; + private async publishTrack() { + const track = LocalAudioTrack.createAudioTrack('roomio_audio', this.audioSource); + this.publication = await this.room.localParticipant?.publishTrack( + track, + new TrackPublishOptions({ source: TrackSource.SOURCE_MICROPHONE }), + ); + } + start() { + this.publishTrack(); this.agentSession.audioInput = this.participantAudioInputStream; + this.agentSession.audioOutput = this.audioSource; } } From d4cbfcfee80f6c5484d8ece9a3b43fee870e5941 Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Thu, 29 May 2025 16:32:21 -0400 Subject: [PATCH 12/13] init --- agents/src/stt/stt.ts | 105 +++++++++++++++++++++++++++++------------- 1 file changed, 73 insertions(+), 32 deletions(-) diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index ed8a5a47b..3d8642865 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -4,12 +4,16 @@ import type { AudioFrame } from '@livekit/rtc-node'; import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; import { EventEmitter } from 'node:events'; -import type { ReadableStream } from 'node:stream/web'; +import type { + ReadableStream, + ReadableStreamDefaultReader, + WritableStreamDefaultWriter, +} from 'node:stream/web'; import { log } from '../log.js'; import type { STTMetrics } from '../metrics/base.js'; import { DeferredReadableStream } from '../stream/deferred_stream.js'; +import { IdentityTransform } from '../stream/identity_transform.js'; import type { AudioBuffer } from '../utils.js'; -import { AsyncIterableQueue } from '../utils.js'; /** Indicates start/middle/end of speech */ export enum SpeechEventType { @@ -140,102 +144,139 @@ export abstract class STT extends (EventEmitter as new () => TypedEmitter { protected static readonly FLUSH_SENTINEL = Symbol('FLUSH_SENTINEL'); - protected input = new AsyncIterableQueue(); - protected output = new AsyncIterableQueue(); - protected queue = new AsyncIterableQueue(); + protected input = new IdentityTransform(); + protected output = new IdentityTransform(); + + protected inputReader: ReadableStreamDefaultReader< + AudioFrame | typeof SpeechStream.FLUSH_SENTINEL + >; + protected outputWriter: WritableStreamDefaultWriter; + abstract label: string; - protected closed = false; #stt: STT; private deferredInputStream: DeferredReadableStream; private logger = log(); + private inputWriter: WritableStreamDefaultWriter; + private outputReader: ReadableStreamDefaultReader; + private metricsStream: ReadableStream; + private closed = false; + private inputClosed = false; + constructor(stt: STT) { this.#stt = stt; this.deferredInputStream = new DeferredReadableStream(); + + this.inputWriter = this.input.writable.getWriter(); + this.inputReader = this.input.readable.getReader(); + this.outputWriter = this.output.writable.getWriter(); + + const [outputStream, metricsStream] = this.output.readable.tee(); + this.metricsStream = metricsStream; + this.outputReader = outputStream.getReader(); + + this.pumpDeferredStream(); this.monitorMetrics(); - this.mainTask(); } - protected async mainTask() { - // TODO(AJS-35): Implement STT with webstreams API + /** + * Reads from the deferred input stream and forwards chunks to the input writer. + * + * Note: we can't just do this.deferredInputStream.stream.pipeTo(this.input.writable) + * because the inputWriter locks the this.input.writable stream. All writes must go through + * the inputWriter. + */ + private async pumpDeferredStream() { + const reader = this.deferredInputStream.stream.getReader(); try { - const inputStream = this.deferredInputStream.stream; - const reader = inputStream.getReader(); while (true) { const { done, value } = await reader.read(); if (done) break; - this.pushFrame(value); + await this.inputWriter.write(value); } - } catch (error) { - this.logger.error('Error in STTStream mainTask:', error); + } catch (e) { + this.logger.error(`Error pumping deferred stream: ${e}`); + throw e; + } finally { + reader.releaseLock(); } } protected async monitorMetrics() { const startTime = process.hrtime.bigint(); + const metricsReader = this.metricsStream.getReader(); + + while (true) { + const { done, value } = await metricsReader.read(); + if (done) { + break; + } + + if (value.type !== SpeechEventType.RECOGNITION_USAGE) continue; - for await (const event of this.queue) { - this.output.put(event); - if (event.type !== SpeechEventType.RECOGNITION_USAGE) continue; const duration = process.hrtime.bigint() - startTime; const metrics: STTMetrics = { timestamp: Date.now(), - requestId: event.requestId!, + requestId: value.requestId!, duration: Math.trunc(Number(duration / BigInt(1000000))), label: this.label, - audioDuration: event.recognitionUsage!.audioDuration, + audioDuration: value.recognitionUsage!.audioDuration, streamed: true, }; this.#stt.emit(SpeechEventType.METRICS_COLLECTED, metrics); } - this.output.close(); } updateInputStream(audioStream: ReadableStream) { this.deferredInputStream.setSource(audioStream); } - /** Push an audio frame to the STT */ + /** @deprecated Use `updateInputStream` instead */ pushFrame(frame: AudioFrame) { - if (this.input.closed) { + // TODO: remove this method in future version + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(frame); + this.inputWriter.write(frame); } /** Flush the STT, causing it to process all pending text */ flush() { - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.put(SpeechStream.FLUSH_SENTINEL); + this.inputWriter.write(SpeechStream.FLUSH_SENTINEL); } /** Mark the input as ended and forbid additional pushes */ endInput() { - if (this.input.closed) { + if (this.inputClosed) { throw new Error('Input is closed'); } if (this.closed) { throw new Error('Stream is closed'); } - this.input.close(); + this.inputClosed = true; + this.inputWriter.close(); } - next(): Promise> { - return this.output.next(); + async next(): Promise> { + return this.outputReader.read().then(({ done, value }) => { + if (done) { + return { done: true, value: undefined }; + } + return { done: false, value }; + }); } /** Close both the input and output of the STT stream */ close() { - this.input.close(); - this.queue.close(); - this.output.close(); + this.input.writable.close(); this.closed = true; } From 60bc9f19b200a9606437fa13fc13f156b247a21d Mon Sep 17 00:00:00 2001 From: Shubhrakanti Ganguly Date: Thu, 29 May 2025 16:58:52 -0400 Subject: [PATCH 13/13] finalize --- agents/src/stt/stream_adapter.ts | 15 +++++++++------ agents/src/stt/stt.ts | 5 ++--- plugins/deepgram/src/stt.ts | 20 +++++++++++--------- 3 files changed, 22 insertions(+), 18 deletions(-) diff --git a/agents/src/stt/stream_adapter.ts b/agents/src/stt/stream_adapter.ts index 0368ff3b6..279de653f 100644 --- a/agents/src/stt/stream_adapter.ts +++ b/agents/src/stt/stream_adapter.ts @@ -53,11 +53,14 @@ export class StreamAdapterWrapper extends SpeechStream { async #run() { const forwardInput = async () => { - for await (const input of this.input) { - if (input === SpeechStream.FLUSH_SENTINEL) { + while (true) { + const { done, value } = await this.inputReader.read(); + if (done) break; + + if (value === SpeechStream.FLUSH_SENTINEL) { this.#vadStream.flush(); } else { - this.#vadStream.pushFrame(input); + this.#vadStream.pushFrame(value); } } this.#vadStream.endInput(); @@ -67,10 +70,10 @@ export class StreamAdapterWrapper extends SpeechStream { for await (const ev of this.#vadStream) { switch (ev.type) { case VADEventType.START_OF_SPEECH: - this.output.put({ type: SpeechEventType.START_OF_SPEECH }); + this.outputWriter.write({ type: SpeechEventType.START_OF_SPEECH }); break; case VADEventType.END_OF_SPEECH: - this.output.put({ type: SpeechEventType.END_OF_SPEECH }); + this.outputWriter.write({ type: SpeechEventType.END_OF_SPEECH }); try { const event = await this.#stt.recognize(ev.frames); @@ -78,7 +81,7 @@ export class StreamAdapterWrapper extends SpeechStream { continue; } - this.output.put(event); + this.outputWriter.write(event); break; } catch (error) { let logger = log(); diff --git a/agents/src/stt/stt.ts b/agents/src/stt/stt.ts index 3d8642865..ec11f196f 100644 --- a/agents/src/stt/stt.ts +++ b/agents/src/stt/stt.ts @@ -151,7 +151,8 @@ export abstract class SpeechStream implements AsyncIterableIterator AudioFrame | typeof SpeechStream.FLUSH_SENTINEL >; protected outputWriter: WritableStreamDefaultWriter; - + protected closed = false; + protected inputClosed = false; abstract label: string; #stt: STT; private deferredInputStream: DeferredReadableStream; @@ -159,8 +160,6 @@ export abstract class SpeechStream implements AsyncIterableIterator private inputWriter: WritableStreamDefaultWriter; private outputReader: ReadableStreamDefaultReader; private metricsStream: ReadableStream; - private closed = false; - private inputClosed = false; constructor(stt: STT) { this.#stt = stt; diff --git a/plugins/deepgram/src/stt.ts b/plugins/deepgram/src/stt.ts index 560b26a70..5fe838da6 100644 --- a/plugins/deepgram/src/stt.ts +++ b/plugins/deepgram/src/stt.ts @@ -125,7 +125,6 @@ export class SpeechStream extends stt.SpeechStream { constructor(stt: STT, opts: STTOptions) { super(stt); this.#opts = opts; - this.closed = false; this.#audioEnergyFilter = new AudioEnergyFilter(); this.#run(); @@ -134,7 +133,7 @@ export class SpeechStream extends stt.SpeechStream { async #run(maxRetry = 32) { let retries = 0; let ws: WebSocket; - while (!this.input.closed) { + while (!this.inputClosed) { const streamURL = new URL(API_BASE_URL_V1); const params = { model: this.#opts.model, @@ -193,7 +192,7 @@ export class SpeechStream extends stt.SpeechStream { } } - this.closed = true; + this.close(); } updateOptions(opts: Partial) { @@ -222,7 +221,10 @@ export class SpeechStream extends stt.SpeechStream { samples100Ms, ); - for await (const data of this.input) { + while (true) { + const { done, value: data } = await this.inputReader.read(); + if (done) break; + let frames: AudioFrame[]; if (data === SpeechStream.FLUSH_SENTINEL) { frames = stream.flush(); @@ -270,7 +272,7 @@ export class SpeechStream extends stt.SpeechStream { // It's also possible we receive a transcript without a SpeechStarted event. if (this.#speaking) return; this.#speaking = true; - this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); + this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH }); break; } // see this page: @@ -288,16 +290,16 @@ export class SpeechStream extends stt.SpeechStream { if (alternatives[0] && alternatives[0].text) { if (!this.#speaking) { this.#speaking = true; - this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH }); + this.outputWriter.write({ type: stt.SpeechEventType.START_OF_SPEECH }); } if (isFinal) { - this.queue.put({ + this.outputWriter.write({ type: stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives: [alternatives[0], ...alternatives.slice(1)], }); } else { - this.queue.put({ + this.outputWriter.write({ type: stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives: [alternatives[0], ...alternatives.slice(1)], }); @@ -309,7 +311,7 @@ export class SpeechStream extends stt.SpeechStream { // a non-empty transcript (deepgram doesn't have a SpeechEnded event) if (isEndpoint && this.#speaking) { this.#speaking = false; - this.queue.put({ type: stt.SpeechEventType.END_OF_SPEECH }); + this.outputWriter.write({ type: stt.SpeechEventType.END_OF_SPEECH }); } break;