diff --git a/.prettierrc b/.prettierrc new file mode 100644 index 0000000..feb658b --- /dev/null +++ b/.prettierrc @@ -0,0 +1 @@ +{ "tabWidth": 2, "useTabs": false } diff --git a/package-lock.json b/package-lock.json index 48fec9a..c728ab7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8521,6 +8521,11 @@ "url": "https://github.com/chalk/supports-color?sponsor=1" } }, + "node_modules/jmuxer": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/jmuxer/-/jmuxer-2.1.0.tgz", + "integrity": "sha512-iizwBTIV11RFKrOp0s/SKrb00yz2epwSOdWxdphSfV7gWlAi9ZXpDdNk/m67Dp0M3+4uGL0AcBQmhB2THxABpQ==" + }, "node_modules/js-tokens": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/js-tokens/-/js-tokens-4.0.0.tgz", @@ -13693,6 +13698,7 @@ "license": "MIT", "dependencies": { "@caplaz/eufy-security-client": "^0.3.0", + "jmuxer": "^2.1.0", "tslog": "^4.9.2" }, "devDependencies": { diff --git a/packages/eufy-security-client/src/api-manager-commands.ts b/packages/eufy-security-client/src/api-manager-commands.ts index f739ea5..7a258cf 100644 --- a/packages/eufy-security-client/src/api-manager-commands.ts +++ b/packages/eufy-security-client/src/api-manager-commands.ts @@ -424,6 +424,16 @@ export class DeviceCommandBuilder { }); } + /** + * Send raw audio data to the device during a talkback session + */ + async talkbackAudioData(buffer: Buffer) { + return this.api.command(DEVICE_COMMANDS.TALKBACK_AUDIO_DATA, { + serialNumber: this.serialNumber, + buffer: { type: "Buffer" as const, data: Array.from(buffer) }, + }); + } + // Lock operations /** * Calibrate the lock mechanism diff --git a/packages/eufy-security-client/src/device/commands.ts b/packages/eufy-security-client/src/device/commands.ts index 20a4d34..7fd5a08 100644 --- a/packages/eufy-security-client/src/device/commands.ts +++ b/packages/eufy-security-client/src/device/commands.ts @@ -151,6 +151,15 @@ export interface DeviceStopTalkbackCommand extends BaseDeviceCommand< typeof 
DEVICE_COMMANDS.STOP_TALKBACK > {} +/** + * Send audio data during talkback session + */ +export interface DeviceTalkbackAudioDataCommand extends BaseDeviceCommand< + typeof DEVICE_COMMANDS.TALKBACK_AUDIO_DATA +> { + buffer: { type: "Buffer"; data: number[] }; +} + /** * Unlock device (for locks) */ @@ -195,6 +204,7 @@ export type DeviceCommand = | DeviceGetVoicesCommand | DeviceStartTalkbackCommand | DeviceStopTalkbackCommand + | DeviceTalkbackAudioDataCommand | DeviceUnlockCommand | DeviceTriggerAlarmCommand | DeviceResetAlarmCommand; diff --git a/packages/eufy-security-scrypted/src/eufy-device.ts b/packages/eufy-security-scrypted/src/eufy-device.ts index 2cf9048..18344ad 100644 --- a/packages/eufy-security-scrypted/src/eufy-device.ts +++ b/packages/eufy-security-scrypted/src/eufy-device.ts @@ -32,6 +32,8 @@ import { Brightness, Camera, Charger, + FFmpegInput, + Intercom, MediaObject, MotionSensor, OnOff, @@ -44,6 +46,7 @@ import { ResponsePictureOptions, ScryptedDeviceBase, ScryptedInterface, + ScryptedMimeTypes, Sensors, Setting, SettingValue, @@ -54,6 +57,7 @@ import { VideoClipThumbnailOptions, VideoClips, } from "@scrypted/sdk"; +import sdk from "@scrypted/sdk"; import { DEVICE_EVENTS, @@ -69,6 +73,7 @@ import { } from "@caplaz/eufy-security-client"; import { Logger, ILogObj } from "tslog"; +import { ChildProcess, spawn } from "child_process"; import { StreamServer } from "@caplaz/eufy-stream-server"; // Device Services @@ -126,10 +131,14 @@ export class EufyDevice Brightness, Sensors, Settings, - Refresh + Refresh, + Intercom { private wsClient: EufyWebSocketClient; private logger: Logger; + private talkbackProcess?: ChildProcess; + private talkbackActive = false; + private intercomStartedLivestream = false; // Device info and state private latestProperties?: DeviceProperties; @@ -632,6 +641,178 @@ export class EufyDevice ); } + // =================== INTERCOM INTERFACE =================== + + /** + * Wait for a device event for this device's serial number. 
The listener + * self-removes on first match or on timeout. + */ + private waitForDeviceEvent( + eventType: T, + timeoutMs: number, + ): Promise { + return new Promise((resolve, reject) => { + let remove: (() => boolean) | undefined; + const timeout = setTimeout(() => { + remove?.(); + reject(new Error(`Timed out waiting for "${eventType}"`)); + }, timeoutMs); + // The waiter is a fail-safe — don't keep the event loop alive on + // its own. If the wait promise is abandoned (e.g. caller threw + // before awaiting it), we don't want to delay process exit. + timeout.unref?.(); + const callback: EventCallbackForType = () => { + clearTimeout(timeout); + remove?.(); + resolve(); + }; + remove = this.wsClient.addEventListener(eventType, callback, { + source: EVENT_SOURCES.DEVICE, + serialNumber: this.serialNumber, + }); + }); + } + + async startIntercom(media: MediaObject): Promise { + // Scrypted can call startIntercom mid-session. Re-entering is fine as + // long as we tear the previous session down cleanly first. + if (this.talkbackActive) { + await this.stopIntercom(); + } + + // Talkback on Eufy requires an active livestream owned by our ws + // session. Start it ourselves if not already running — the camera + // returns "device_livestream_not_running" otherwise. + let livestreaming = false; + try { + const status = await this.api.isLivestreaming(); + livestreaming = status.livestreaming; + } catch (e) { + throw new Error( + `Failed to query livestream status before starting talkback: ${e}`, + ); + } + if (!livestreaming) { + this.logger.info("Starting livestream to host talkback session"); + const livestreamStarted = this.waitForDeviceEvent( + DEVICE_EVENTS.LIVESTREAM_STARTED, + 10000, + ); + await this.api.startLivestream(); + await livestreamStarted; + this.intercomStartedLivestream = true; + } + + // Start talkback and wait for confirmation. 
bropat's client emits + // "talkback started" once the camera has opened its receive channel; + // writing before that event silently drops the audio. + this.logger.info("Starting talkback session on device"); + const talkbackStarted = this.waitForDeviceEvent( + DEVICE_EVENTS.TALKBACK_STARTED, + 10000, + ); + // Always attach a handler — the promise has its own 10s timeout, and + // if startTalkback throws below we'd leak an unhandled rejection + // when that timeout eventually fires. + talkbackStarted.catch(() => {}); + try { + await this.api.startTalkback(); + } catch (e) { + this.logger.warn(`Failed to start talkback: ${e}`); + if (this.intercomStartedLivestream) { + this.intercomStartedLivestream = false; + await this.api.stopLivestream().catch(() => {}); + } + throw e; + } + await talkbackStarted; + this.talkbackActive = true; + this.logger.info("Talkback ready — forwarding audio"); + + // Transcode the incoming intercom audio to AAC-LC/ADTS at 16 kHz mono + // 16 kbps — the exact format bropat's eufy-security-client expects on + // the talkback channel. + const ffmpegInput = + await sdk.mediaManager.convertMediaObjectToJSON( + media, + ScryptedMimeTypes.FFmpegInput, + ); + + const args = [ + ...(ffmpegInput.inputArguments ?? 
[]), + "-vn", + "-acodec", + "aac", + "-ar", + "16000", + "-ac", + "1", + "-b:a", + "16k", + "-f", + "adts", + "pipe:1", + ]; + + this.talkbackProcess = spawn("ffmpeg", args); + + this.talkbackProcess.stdout?.on("data", async (chunk: Buffer) => { + if (!this.talkbackActive) return; + try { + await this.api.talkbackAudioData(chunk); + } catch (e) { + this.logger.warn(`Failed to send talkback audio chunk: ${e}`); + } + }); + + this.talkbackProcess.stderr?.on("data", (data: Buffer) => { + this.logger.debug(`Talkback FFmpeg: ${data.toString().trim()}`); + }); + + this.talkbackProcess.on("error", (e) => { + this.logger.error(`Talkback FFmpeg process error: ${e}`); + }); + + this.talkbackProcess.on("exit", (code) => { + this.logger.debug(`Talkback FFmpeg exited with code ${code}`); + this.talkbackProcess = undefined; + }); + } + + async stopIntercom(): Promise { + if (this.talkbackProcess) { + this.talkbackProcess.kill(); + this.talkbackProcess = undefined; + } + + // Only send the stop command if we actually started talkback. Scrypted + // fires stopIntercom() during every WebRTC teardown, and hammering the + // camera with "device_talkback_not_running" errors can destabilize the + // P2P session and take down the video feed. + if (this.talkbackActive) { + this.talkbackActive = false; + try { + await this.api.stopTalkback(); + } catch (e) { + this.logger.warn(`Failed to stop talkback: ${e}`); + } + } + + // If we bootstrapped the livestream just for the intercom, stop it — + // but only if no other stream clients are watching. 
+ if (this.intercomStartedLivestream) { + this.intercomStartedLivestream = false; + const hasViewers = this.streamServer.getActiveConnectionCount() > 0; + if (!hasViewers) { + try { + await this.api.stopLivestream(); + } catch (e) { + this.logger.warn(`Failed to stop livestream: ${e}`); + } + } + } + } + // =================== UTILITY METHODS =================== /** @@ -656,6 +837,13 @@ export class EufyDevice * Clean up resources on disposal */ dispose(): void { + if (this.talkbackProcess) { + this.talkbackProcess.kill(); + this.talkbackProcess = undefined; + } + this.talkbackActive = false; + this.intercomStartedLivestream = false; + // Dispose stream service (will stop stream server if running) this.streamService .dispose() diff --git a/packages/eufy-security-scrypted/src/services/device/stream-service.ts b/packages/eufy-security-scrypted/src/services/device/stream-service.ts index add9a9b..54f8101 100644 --- a/packages/eufy-security-scrypted/src/services/device/stream-service.ts +++ b/packages/eufy-security-scrypted/src/services/device/stream-service.ts @@ -91,12 +91,15 @@ export class StreamService { { id: "p2p", name: "P2P Stream", - container: codec, // "h264" or "h265" raw stream + container: "mp4", video: { codec, width, height, }, + audio: { + codec: "aac", + }, }, ]; } @@ -181,41 +184,64 @@ export class StreamService { // Detect codec from last received stream metadata; default to H264 const eufyCodec = this.streamServer.getVideoMetadata()?.videoCodec ?? "H264"; - const inputFormat = FFmpegUtils.toFFmpegFormat(eufyCodec); // "h264" or "hevc" const scryptedCodec = FFmpegUtils.toScryptedCodec(eufyCodec); // "h264" or "h265" - // FFmpeg configuration for Eufy camera streaming + // Use the muxed fMP4 port if available. 
The stream server runs an + // in-process JMuxer (no ffmpeg subprocess) that consumes raw H.264 + // and ADTS AAC from the WebSocket events directly and produces + // fragmented MP4 — the codec config is in the `moov` init segment so + // the downstream Rebroadcast plugin's `-acodec copy -vcodec copy` to + // RTSP works without any extradata dance. + const muxedPort = this.streamServer.getMuxedPort(); + const useMuxed = !!muxedPort; + + const inputArguments = useMuxed + ? [ + "-hide_banner", + "-loglevel", + "error", + "-fflags", + "+genpts+nobuffer", + "-analyzeduration", + "2000000", + "-probesize", + "1000000", + "-f", + "mp4", + "-i", + `tcp://127.0.0.1:${muxedPort}`, + ] + : [ + "-hide_banner", + "-loglevel", + "error", + "-use_wallclock_as_timestamps", + "1", + "-analyzeduration", + "5000000", + "-probesize", + "5000000", + "-f", + FFmpegUtils.toFFmpegFormat(eufyCodec), + "-i", + `tcp://127.0.0.1:${port}`, + "-an", + ]; + const ffmpegInput: FFmpegInput = { url: undefined, - inputArguments: [ - "-hide_banner", - "-loglevel", - "error", // Suppress informational output; only show errors - "-use_wallclock_as_timestamps", - "1", // Critical for Eufy streams - fixes timestamp issues - "-analyzeduration", - "5000000", // 5 s — enough time to find SPS/PPS/VPS headers - "-probesize", - "5000000", // 5 MB probe budget for header detection - "-f", - inputFormat, // "h264" for H.264 cameras, "hevc" for H.265 cameras - "-i", - `tcp://127.0.0.1:${port}`, // TCP input from local stream server - "-an", // Disable audio - "-sn", // Disable subtitles - "-dn", // Disable data streams - ], + inputArguments, mediaStreamOptions: { id: options?.id || "main", name: options?.name || "Eufy Camera Stream", - container: options?.container, + container: useMuxed ? 
"mp4" : options?.container, video: { codec: scryptedCodec, width, height, ...options?.video, }, - // Audio support can be added later when needed + ...(useMuxed && { audio: { codec: "aac" } }), }, }; diff --git a/packages/eufy-security-scrypted/src/services/device/types.ts b/packages/eufy-security-scrypted/src/services/device/types.ts index 7e8c424..3367605 100644 --- a/packages/eufy-security-scrypted/src/services/device/types.ts +++ b/packages/eufy-security-scrypted/src/services/device/types.ts @@ -6,7 +6,7 @@ * @module services/device */ -import { VideoMetadata } from "@caplaz/eufy-security-client"; +import { AudioMetadata, VideoMetadata } from "@caplaz/eufy-security-client"; /** * StreamServer interface (from @caplaz/eufy-stream-server) @@ -49,4 +49,16 @@ export interface IStreamServer { * Returns null if no stream has been received yet. */ getVideoMetadata(): VideoMetadata | null; + + /** + * Get the last received audio metadata (codec). + * Returns null if no audio stream has been received yet. + */ + getAudioMetadata(): AudioMetadata | null; + + /** + * Get the TCP port the MPEG-TS muxed server is listening on. + * Returns undefined if not started. + */ + getMuxedPort(): number | undefined; } diff --git a/packages/eufy-security-scrypted/src/utils/device-utils.ts b/packages/eufy-security-scrypted/src/utils/device-utils.ts index 60febc9..380abc4 100644 --- a/packages/eufy-security-scrypted/src/utils/device-utils.ts +++ b/packages/eufy-security-scrypted/src/utils/device-utils.ts @@ -282,6 +282,16 @@ export class DeviceUtils { ScryptedInterface.Refresh, ]; + // Talkback requires both a microphone (to receive) and speaker (to play) + // on the device. Without these the camera will reject startTalkback and + // Scrypted would surface a non-functional talk button. 
+ if ( + properties.microphone !== undefined && + properties.speaker !== undefined + ) { + interfaces.push(ScryptedInterface.Intercom); + } + // Add Battery interface only for battery-powered devices if (capabilities.battery) { if (properties.battery !== undefined) diff --git a/packages/eufy-security-scrypted/tests/unit/eufy-device-intercom.test.ts b/packages/eufy-security-scrypted/tests/unit/eufy-device-intercom.test.ts new file mode 100644 index 0000000..ffaf1dc --- /dev/null +++ b/packages/eufy-security-scrypted/tests/unit/eufy-device-intercom.test.ts @@ -0,0 +1,344 @@ +/** + * Unit tests for EufyDevice intercom (talkback) flow. + * + * Covers startIntercom/stopIntercom orchestration of: + * - livestream bootstrap (only when not already streaming) + * - waitForDeviceEvent timeout/resolve semantics + * - error path when startTalkback rejects + * - re-entrant startIntercom while already active + * - propagating isLivestreaming() failures + */ + +import { EufyDevice } from "../../src/eufy-device"; +import { + DEVICE_EVENTS, + EufyWebSocketClient, +} from "@caplaz/eufy-security-client"; +import { Logger, ILogObj } from "tslog"; + +jest.mock("@scrypted/sdk", () => { + const mediaManager = { + convertMediaObjectToJSON: jest + .fn() + .mockResolvedValue({ inputArguments: ["-f", "s16le", "-i", "pipe:0"] }), + createFFmpegMediaObject: jest.fn(), + }; + return { + __esModule: true, + ScryptedDeviceBase: class { + info: any = { serialNumber: "TEST123" }; + }, + ScryptedInterface: { + MotionSensor: "MotionSensor", + Brightness: "Brightness", + OnOff: "OnOff", + Battery: "Battery", + Charger: "Charger", + Sensors: "Sensors", + Settings: "Settings", + }, + ScryptedMimeTypes: { + FFmpegInput: "x-scrypted/x-ffmpeg-input", + }, + SecuritySystemMode: {}, + ChargeState: {}, + deviceManager: { + onDevicesChanged: jest.fn(), + }, + default: { mediaManager }, + mediaManager, + }; +}); + +jest.mock("@caplaz/eufy-stream-server", () => ({ + StreamServer: jest.fn().mockImplementation(() 
=> ({ + start: jest.fn().mockResolvedValue(undefined), + stop: jest.fn().mockResolvedValue(undefined), + getPort: jest.fn().mockReturnValue(8080), + getActiveConnectionCount: jest.fn().mockReturnValue(0), + })), +})); + +jest.mock("child_process", () => ({ + spawn: jest.fn().mockImplementation(() => { + const handlers: Record void> = {}; + return { + stdout: { on: jest.fn() }, + stderr: { on: jest.fn() }, + on: jest.fn((event: string, cb: (...args: any[]) => void) => { + handlers[event] = cb; + }), + kill: jest.fn(), + }; + }), +})); + +describe("EufyDevice Intercom Flow", () => { + let device: EufyDevice; + let mockWsClient: jest.Mocked; + let mockApi: any; + let mockLogger: any; + + // Capture event listeners registered via wsClient.addEventListener so + // tests can simulate the LIVESTREAM_STARTED / TALKBACK_STARTED events + // the device waits for. + let listeners: Array<{ eventType: string; cb: (payload?: any) => void }>; + + const fireEvent = (eventType: string) => { + listeners.filter((l) => l.eventType === eventType).forEach((l) => l.cb()); + }; + + beforeEach(() => { + listeners = []; + + mockLogger = { + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + fatal: jest.fn(), + silly: jest.fn(), + trace: jest.fn(), + getSubLogger: jest.fn().mockReturnValue({ + debug: jest.fn(), + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + fatal: jest.fn(), + silly: jest.fn(), + trace: jest.fn(), + attachTransport: jest.fn(), + }), + }; + + mockApi = { + isLivestreaming: jest.fn().mockResolvedValue({ livestreaming: true }), + startLivestream: jest.fn().mockResolvedValue(undefined), + stopLivestream: jest.fn().mockResolvedValue(undefined), + startTalkback: jest.fn().mockResolvedValue(undefined), + stopTalkback: jest.fn().mockResolvedValue(undefined), + talkbackAudioData: jest.fn().mockResolvedValue(undefined), + panAndTilt: jest.fn().mockResolvedValue(undefined), + getProperties: jest.fn().mockResolvedValue({ + properties: { type: 1, 
name: "C", serialNumber: "TEST123" }, + }), + }; + + mockWsClient = { + commands: { device: jest.fn().mockReturnValue(mockApi) }, + addEventListener: jest.fn( + (eventType: string, cb: (payload?: any) => void) => { + const entry = { eventType, cb }; + listeners.push(entry); + return () => { + const idx = listeners.indexOf(entry); + if (idx >= 0) listeners.splice(idx, 1); + return idx >= 0; + }; + }, + ), + removeEventListenersBySerialNumber: jest.fn(), + } as any; + + device = new EufyDevice("device_TEST123", mockWsClient, mockLogger); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe("startIntercom", () => { + test("skips livestream bootstrap when already streaming", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: true }); + + const promise = device.startIntercom({} as any); + // Allow the await chain to register the talkback-started listener. + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await promise; + + expect(mockApi.startLivestream).not.toHaveBeenCalled(); + expect(mockApi.startTalkback).toHaveBeenCalledTimes(1); + expect((device as any).talkbackActive).toBe(true); + expect((device as any).intercomStartedLivestream).toBe(false); + }); + + test("bootstraps livestream when not currently streaming", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: false }); + + const promise = device.startIntercom({} as any); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.LIVESTREAM_STARTED); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await promise; + + expect(mockApi.startLivestream).toHaveBeenCalledTimes(1); + expect(mockApi.startTalkback).toHaveBeenCalledTimes(1); + expect((device as any).intercomStartedLivestream).toBe(true); + }); + + test("propagates isLivestreaming() failures with context", async () => { + mockApi.isLivestreaming.mockRejectedValue(new 
Error("ws closed")); + + await expect(device.startIntercom({} as any)).rejects.toThrow( + /Failed to query livestream status before starting talkback/, + ); + expect(mockApi.startTalkback).not.toHaveBeenCalled(); + }); + + test("rolls back bootstrapped livestream when startTalkback fails", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: false }); + mockApi.startTalkback.mockRejectedValue(new Error("p2p closed")); + + const promise = device.startIntercom({} as any); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.LIVESTREAM_STARTED); + + await expect(promise).rejects.toThrow("p2p closed"); + expect(mockApi.stopLivestream).toHaveBeenCalledTimes(1); + expect((device as any).talkbackActive).toBe(false); + expect((device as any).intercomStartedLivestream).toBe(false); + }); + + test("tears down a previous session before starting a new one", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: true }); + + const waitForListener = async (eventType: string, baseline: number) => { + for (let i = 0; i < 50; i++) { + if ( + listeners.filter((l) => l.eventType === eventType).length > baseline + ) + return; + await Promise.resolve(); + } + }; + + // First session — drive it to active + const baselineTalk = listeners.filter( + (l) => l.eventType === DEVICE_EVENTS.TALKBACK_STARTED, + ).length; + const first = device.startIntercom({} as any); + await waitForListener(DEVICE_EVENTS.TALKBACK_STARTED, baselineTalk); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await first; + expect((device as any).talkbackActive).toBe(true); + + // Second call must stop the active talkback before re-arming. 
+ const second = device.startIntercom({} as any); + await waitForListener(DEVICE_EVENTS.TALKBACK_STARTED, baselineTalk); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await second; + + expect(mockApi.stopTalkback).toHaveBeenCalledTimes(1); + expect(mockApi.startTalkback).toHaveBeenCalledTimes(2); + }); + }); + + describe("stopIntercom", () => { + test("is a no-op when no talkback session is active", async () => { + await device.stopIntercom(); + + expect(mockApi.stopTalkback).not.toHaveBeenCalled(); + expect(mockApi.stopLivestream).not.toHaveBeenCalled(); + }); + + test("stops talkback but leaves livestream alone when we didn't start it", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: true }); + + const start = device.startIntercom({} as any); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await start; + + await device.stopIntercom(); + + expect(mockApi.stopTalkback).toHaveBeenCalledTimes(1); + expect(mockApi.stopLivestream).not.toHaveBeenCalled(); + }); + + test("stops livestream we bootstrapped when no other viewers remain", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: false }); + (device as any).streamServer.getActiveConnectionCount.mockReturnValue(0); + + const start = device.startIntercom({} as any); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.LIVESTREAM_STARTED); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await start; + + await device.stopIntercom(); + + expect(mockApi.stopTalkback).toHaveBeenCalledTimes(1); + expect(mockApi.stopLivestream).toHaveBeenCalledTimes(1); + }); + + test("keeps bootstrapped livestream running when viewers are still attached", async () => { + mockApi.isLivestreaming.mockResolvedValue({ livestreaming: false }); + (device as any).streamServer.getActiveConnectionCount.mockReturnValue(2); + + const start = device.startIntercom({} as 
any); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.LIVESTREAM_STARTED); + await Promise.resolve(); + await Promise.resolve(); + fireEvent(DEVICE_EVENTS.TALKBACK_STARTED); + await start; + + await device.stopIntercom(); + + expect(mockApi.stopTalkback).toHaveBeenCalledTimes(1); + expect(mockApi.stopLivestream).not.toHaveBeenCalled(); + }); + }); + + describe("waitForDeviceEvent", () => { + const countListeners = (eventType: string) => + listeners.filter((l) => l.eventType === eventType).length; + + test("rejects with timeout error when the event never arrives", async () => { + jest.useFakeTimers(); + try { + const baseline = countListeners(DEVICE_EVENTS.LIVESTREAM_STARTED); + + const promise = (device as any).waitForDeviceEvent( + DEVICE_EVENTS.LIVESTREAM_STARTED, + 1000, + ); + expect(countListeners(DEVICE_EVENTS.LIVESTREAM_STARTED)).toBe( + baseline + 1, + ); + + jest.advanceTimersByTime(1000); + await expect(promise).rejects.toThrow( + /Timed out waiting for "livestream started"/, + ); + // Listener self-removes on timeout. 
+ expect(countListeners(DEVICE_EVENTS.LIVESTREAM_STARTED)).toBe(baseline); + } finally { + jest.useRealTimers(); + } + }); + + test("resolves and removes the listener when the event fires", async () => { + const baseline = countListeners(DEVICE_EVENTS.LIVESTREAM_STARTED); + + const promise = (device as any).waitForDeviceEvent( + DEVICE_EVENTS.LIVESTREAM_STARTED, + 5000, + ); + fireEvent(DEVICE_EVENTS.LIVESTREAM_STARTED); + await expect(promise).resolves.toBeUndefined(); + expect(countListeners(DEVICE_EVENTS.LIVESTREAM_STARTED)).toBe(baseline); + }); + }); +}); diff --git a/packages/eufy-security-scrypted/tests/unit/services/stream-service.test.ts b/packages/eufy-security-scrypted/tests/unit/services/stream-service.test.ts index d7c4829..48fc9fc 100644 --- a/packages/eufy-security-scrypted/tests/unit/services/stream-service.test.ts +++ b/packages/eufy-security-scrypted/tests/unit/services/stream-service.test.ts @@ -44,6 +44,9 @@ describe("StreamService", () => { getPort: jest.fn().mockReturnValue(mockPort), isRunning: jest.fn().mockReturnValue(false), getVideoMetadata: jest.fn().mockReturnValue(null), + getAudioMetadata: jest.fn().mockReturnValue(null), + getMuxedPort: jest.fn().mockReturnValue(undefined), + captureSnapshot: jest.fn(), } as any; // Mock SDK mediaManager @@ -101,12 +104,15 @@ describe("StreamService", () => { expect(options[0]).toEqual({ id: "p2p", name: "P2P Stream", - container: "h264", + container: "mp4", video: { codec: "h264", width: 1920, height: 1080, }, + audio: { + codec: "aac", + }, }); }); @@ -321,7 +327,7 @@ describe("StreamService", () => { expect(args).toContain("h264"); }); - it("should disable unnecessary streams", async () => { + it("should disable audio when no muxed port is available (fallback path)", async () => { await service.getVideoStream(VideoQuality.HIGH); const call = (sdk.mediaManager.createFFmpegMediaObject as jest.Mock).mock @@ -329,8 +335,20 @@ describe("StreamService", () => { const args = call.inputArguments; 
expect(args).toContain("-an"); - expect(args).toContain("-sn"); - expect(args).toContain("-dn"); + }); + + it("should use fMP4 input when muxed port is available", async () => { + mockStreamServer.getMuxedPort.mockReturnValue(55555); + await service.getVideoStream(VideoQuality.HIGH); + + const call = (sdk.mediaManager.createFFmpegMediaObject as jest.Mock).mock + .calls[0][0]; + const args = call.inputArguments; + + expect(args).toContain("mp4"); + expect(args).toContain("tcp://127.0.0.1:55555"); + expect(args).not.toContain("-an"); + expect(call.mediaStreamOptions.audio).toEqual({ codec: "aac" }); }); }); }); diff --git a/packages/eufy-stream-server/package.json b/packages/eufy-stream-server/package.json index 9eb667c..ef2abae 100644 --- a/packages/eufy-stream-server/package.json +++ b/packages/eufy-stream-server/package.json @@ -38,6 +38,7 @@ }, "dependencies": { "@caplaz/eufy-security-client": "^0.3.0", + "jmuxer": "^2.1.0", "tslog": "^4.9.2" }, "devDependencies": { diff --git a/packages/eufy-stream-server/src/connection-manager.ts b/packages/eufy-stream-server/src/connection-manager.ts index e8eb85e..e3d0270 100644 --- a/packages/eufy-stream-server/src/connection-manager.ts +++ b/packages/eufy-stream-server/src/connection-manager.ts @@ -294,6 +294,19 @@ export class ConnectionManager extends EventEmitter { * console.log('All connections closed'); * ``` */ + /** + * Force-disconnect a specific client by id. Used by the stream server to + * reap zombie connections whose sockets never emitted `close` (e.g. when + * the peer process was SIGKILL-ed). 
+ */ + disconnectClient(connectionId: string): boolean { + if (!this.connections.has(connectionId)) { + return false; + } + this.handleDisconnection(connectionId); + return true; + } + close(): void { this.logger.info(`Closing ${this.connections.size} connections`); diff --git a/packages/eufy-stream-server/src/jmuxer.d.ts b/packages/eufy-stream-server/src/jmuxer.d.ts new file mode 100644 index 0000000..a56c5e2 --- /dev/null +++ b/packages/eufy-stream-server/src/jmuxer.d.ts @@ -0,0 +1,28 @@ +/** + * Ambient module declaration for `jmuxer`. The package ships JS only — these + * types cover the surface we use (constructor, feed, createStream, destroy). + */ +declare module "jmuxer" { + import { Duplex } from "node:stream"; + + export interface JMuxerOptions { + mode?: "both" | "video" | "audio"; + fps?: number; + flushingTime?: number; + clearBuffer?: boolean; + debug?: boolean; + } + + export interface JMuxerFeedData { + video?: Buffer | Uint8Array; + audio?: Buffer | Uint8Array; + duration?: number; + } + + export default class JMuxer { + constructor(options: JMuxerOptions); + feed(data: JMuxerFeedData): void; + createStream(): Duplex; + destroy(): void; + } +} diff --git a/packages/eufy-stream-server/src/stream-server.ts b/packages/eufy-stream-server/src/stream-server.ts index 62a80f3..936e2c7 100644 --- a/packages/eufy-stream-server/src/stream-server.ts +++ b/packages/eufy-stream-server/src/stream-server.ts @@ -7,8 +7,10 @@ * have been removed for simplicity. 
*/ -import * as net from "net"; -import { EventEmitter } from "events"; +import * as net from "node:net"; +import { EventEmitter } from "node:events"; +import { Duplex } from "node:stream"; +import JMuxer from "jmuxer"; import { Logger, ILogObj } from "tslog"; import { ConnectionManager } from "./connection-manager"; import { H264Parser } from "./h264-parser"; @@ -17,6 +19,7 @@ import { EufyWebSocketClient, DEVICE_EVENTS, VideoMetadata, + AudioMetadata, } from "@caplaz/eufy-security-client"; /** @@ -70,11 +73,22 @@ export class StreamServer extends EventEmitter { logger?: Logger; }; private server?: net.Server; + private muxedServer?: net.Server; + /** + * Map of muxed-client socket → its dedicated JMuxer instance. Each + * connection gets its own muxer so every consumer receives a complete + * fMP4 init segment at the start of its stream. + */ + private muxerStreams = new Map< + net.Socket, + { muxer: JMuxer; duplex: Duplex } + >(); private connectionManager: ConnectionManager; private h264Parser: H264Parser; private isActive = false; private startTime?: Date; private eventRemover?: () => boolean; + private audioEventRemover?: () => boolean; // Stream state management private livestreamIntendedState = false; @@ -85,6 +99,9 @@ export class StreamServer extends EventEmitter { private videoMetadata: VideoMetadata | null = null; private metadataReceived = false; + // Audio metadata from first audio frame + private audioMetadata: AudioMetadata | null = null; + // Client activity monitoring for battery optimization private lastClientActivity = 0; private activityCheckInterval?: ReturnType; @@ -147,9 +164,6 @@ export class StreamServer extends EventEmitter { this.connectionManager.on( "clientConnected", async (connectionId, connectionInfo) => { - const previousCount = - this.connectionManager.getActiveConnectionCount() - 1; - this.logger.info( `Client connected: ${connectionId} from ${connectionInfo.remoteAddress}:${connectionInfo.remotePort}`, ); @@ -158,29 +172,21 @@ 
export class StreamServer extends EventEmitter { // Send cached SPS/PPS headers immediately so FFmpeg can parse the stream this.sendCachedHeaders(connectionId); - // Start livestream when first client connects - if (previousCount === 0) { - this.livestreamIntendedState = true; - this.lastClientActivity = Date.now(); - this.startActivityMonitoring(); - await this.ensureLivestreamState(); - } + // Start livestream if this is the first consumer overall + // (TCP clients + muxer clients combined). The helper internally + // checks `livestreamIntendedState` so re-entering on every connect + // is a no-op once the stream is up — equivalent to the old + // `previousCount === 0` guard but muxer-aware. + await this.updateLivestreamStateForMuxerClients(); }, ); this.connectionManager.on("clientDisconnected", async (connectionId) => { - const previousCount = - this.connectionManager.getActiveConnectionCount() + 1; - this.logger.info(`Client disconnected: ${connectionId}`); this.emit("clientDisconnected", connectionId); - // Stop livestream when last client disconnects - if (previousCount === 1) { - this.livestreamIntendedState = false; - this.stopActivityMonitoring(); - await this.ensureLivestreamState(); - } + // Does not stop the livestream here: the helper only ever starts it, and the activity monitor stops it once no consumers remain. + await this.updateLivestreamStateForMuxerClients(); }); } @@ -234,16 +240,27 @@ export class StreamServer extends EventEmitter { // Clean up any stale connections first this.cleanupStaleConnections(); - const activeClients = this.connectionManager.getActiveConnectionCount(); + // Total consumer count = TCP video clients (snapshot, raw video) + + // in-process muxer clients (fMP4 over the muxed port). Without + // counting the muxers here the activity timer was killing the + // livestream whenever the muxer was the only consumer, which broke + // long-lived downstream rebroadcast sessions.
+ const totalConsumers = + this.connectionManager.getActiveConnectionCount() + + this.muxerStreams.size; - if (timeSinceActivity > this.ACTIVITY_TIMEOUT && activeClients === 0) { + if (timeSinceActivity > this.ACTIVITY_TIMEOUT && totalConsumers === 0) { this.logger.info( `🕒 No client activity for ${Math.round(timeSinceActivity / 1000)}s and no active clients, stopping camera stream`, ); this.livestreamIntendedState = false; this.stopActivityMonitoring(); this.ensureLivestreamState(); - } else if (activeClients === 0 && this.livestreamIntendedState) { + } else if (totalConsumers === 0 && this.livestreamIntendedState) { + // Brought over from main: useful diagnostic when the stream is + // intended to be running but everyone has temporarily detached + // (e.g. between Rebroadcast cycles). `totalConsumers` replaces + // the old `activeClients` so muxer clients count. this.logger.debug( `No active clients but stream is intended to run - waiting for connections`, ); @@ -265,7 +282,12 @@ export class StreamServer extends EventEmitter { } /** - * Clean up stale TCP connections that may not be actively used + * Destroy TCP connections older than 5 minutes. Previously this just + * logged; it now actually force-disconnects the socket. Necessary because + * when a peer ffmpeg gets SIGKILL-ed the OS sometimes never surfaces a + * `close` event on our side, leaving a zombie connection that would keep + * the activity-monitor interval alive forever (and spam the logs every + * 5 seconds with "Cleaning up stale connection"). 
*/ private cleanupStaleConnections(): void { const connectionStats = this.connectionManager.getConnectionStats(); @@ -274,15 +296,11 @@ export class StreamServer extends EventEmitter { for (const [connectionId, info] of Object.entries(connectionStats)) { const connectionAge = now - info.connectedAt.getTime(); - - // Clean up connections that are older than 5 minutes and have no recent activity if (connectionAge > 5 * 60 * 1000) { - // 5 minutes this.logger.info( `Cleaning up stale connection: ${connectionId} (age: ${Math.round(connectionAge / 1000)}s)`, ); - // Note: The connection manager will handle the actual cleanup when we emit the disconnect event - // For now, we'll just log this - the connection manager handles cleanup on actual disconnects + this.connectionManager.disconnectClient(connectionId); cleanedCount++; } } @@ -366,7 +384,22 @@ export class StreamServer extends EventEmitter { ? event.buffer.data : Buffer.from(event.buffer.data); - // Stream the video data + // Fan-out to muxer clients (fMP4 via in-process JMuxer). Update + // the activity clock so the inactivity timer doesn't kill the + // livestream while muxer clients are actively consuming it. + if (this.muxerStreams.size > 0) { + this.lastClientActivity = Date.now(); + for (const { muxer } of this.muxerStreams.values()) { + try { + muxer.feed({ video: videoBuffer }); + } catch (e) { + this.logger.warn(`Muxer video feed error: ${e}`); + } + } + } + + // Stream the video data to raw TCP clients (snapshot service, + // direct-video stream consumers). 
this.streamVideo(videoBuffer, Date.now(), undefined); }, { @@ -378,6 +411,71 @@ export class StreamServer extends EventEmitter { this.logger.info( `WebSocket listener setup complete for device: ${this.options.serialNumber}`, ); + + // Listen for livestream audio data events + let audioFrameCount = 0; + this.audioEventRemover = this.options.wsClient.addEventListener( + DEVICE_EVENTS.LIVESTREAM_AUDIO_DATA, + (event) => { + if (event.serialNumber !== this.options.serialNumber) { + return; + } + + if (!this.audioMetadata && event.metadata) { + this.audioMetadata = event.metadata; + this.logger.info( + `Captured audio metadata: codec=${event.metadata.audioCodec}`, + ); + } + + const audioBuffer = Buffer.isBuffer(event.buffer.data) + ? event.buffer.data + : Buffer.from(event.buffer.data); + + if (audioFrameCount < 3) { + const hex = audioBuffer + .subarray(0, Math.min(16, audioBuffer.length)) + .toString("hex"); + this.logger.debug( + `Audio frame #${audioFrameCount}: ${audioBuffer.length} bytes, first bytes: ${hex}`, + ); + audioFrameCount++; + } + + if (this.muxerStreams.size === 0) { + return; + } + + // Eufy delivers AAC pre-wrapped in ADTS — JMuxer consumes ADTS + // directly. Anything else (e.g. AudioSpecificConfig, which is the + // 2-byte codec config packet that arrives ahead of the first frame) + // is dropped because synthesizing an ADTS header without knowing + // the actual sample rate/channel count would produce a stream the + // decoder would misinterpret. + if (!this.isAdtsFrame(audioBuffer)) { + return; + } + + for (const { muxer } of this.muxerStreams.values()) { + try { + muxer.feed({ audio: audioBuffer }); + } catch (e) { + this.logger.warn(`Muxer audio feed error: ${e}`); + } + } + }, + { + source: "device", + serialNumber: this.options.serialNumber, + }, + ); + } + + /** + * ADTS sync word check. Bytes 0..1 must be 0xFFFx (12-bit sync). 
+ */ + private isAdtsFrame(data: Buffer): boolean { + return data.length >= 7 && data[0] === 0xff && (data[1] & 0xf0) === 0xf0; } /** @@ -479,7 +577,7 @@ export class StreamServer extends EventEmitter { throw new Error("Server is already running"); } - return new Promise((resolve, reject) => { + await new Promise((resolve, reject) => { this.server = net.createServer(); this.server.on("connection", (socket) => { @@ -502,6 +600,138 @@ export class StreamServer extends EventEmitter { resolve(); }); }); + + // Start muxed server — each client connection gets its own in-process + // JMuxer that produces fragmented MP4 directly from the camera's raw + // H.264 + ADTS AAC frames. No ffmpeg subprocess, no audio re-encoding. + await new Promise((resolve, reject) => { + this.muxedServer = net.createServer((socket) => { + this.handleMuxedClient(socket); + }); + + this.muxedServer.on("error", (error) => { + this.logger.warn("Muxed server error:", error); + reject(error); + }); + + this.muxedServer.listen(0, "127.0.0.1", () => { + const address = this.muxedServer!.address(); + const port = + address && typeof address === "object" ? address.port : "?"; + this.logger.info(`Muxed (fMP4) server started on port ${port}`); + resolve(); + }); + }); + } + + /** + * Handle a new connection to the muxed TCP server. Each client gets its + * own in-process JMuxer instance that consumes raw H.264 NAL units and + * ADTS AAC frames directly from the WebSocket events (no TCP detour, + * no ffmpeg subprocess) and emits fragmented MP4 on the socket. This is + * meaningfully faster than the previous ffmpeg-subprocess approach and + * matches what the Eufy cameras actually deliver byte-for-byte. + */ + private handleMuxedClient(socket: net.Socket): void { + const videoFps = this.videoMetadata?.videoFPS ?? 15; + // Always declare both tracks. 
The muxed client connects BEFORE the + // first audio frame arrives from Eufy, so `audioMetadata` is null at + // this point on a cold start; if we picked mode based on it we'd lock + // in video-only and silently drop every audio frame thereafter. + // JMuxer's `both` mode correctly holds audio until the video track is + // ready, then emits both tracks into the fMP4 moov. + const mode = "both"; + + const muxer = new JMuxer({ + mode, + fps: videoFps, + flushingTime: 0, + clearBuffer: false, + debug: false, + }); + + const duplex: Duplex = muxer.createStream(); + let firstChunkLogged = false; + duplex.on("data", (chunk: Buffer) => { + if (!firstChunkLogged) { + this.logger.info( + `JMuxer emitting fMP4 (first chunk: ${chunk.length} bytes, mode=${mode}, fps=${videoFps})`, + ); + firstChunkLogged = true; + } + if (!socket.destroyed) socket.write(chunk); + }); + duplex.on("error", (err) => { + this.logger.warn(`JMuxer duplex error: ${err.message}`); + }); + + this.muxerStreams.set(socket, { muxer, duplex }); + this.logger.info( + `Muxed client attached (total active muxers: ${this.muxerStreams.size})`, + ); + + // If this is the first consumer of the stream, bring up the livestream + // (a no-op when the stream is already intended to be running). + this.updateLivestreamStateForMuxerClients(); + + const cleanup = () => { + if (!this.muxerStreams.has(socket)) return; + this.muxerStreams.delete(socket); + try { + muxer.destroy(); + } catch (e) { + this.logger.warn(`JMuxer destroy threw during cleanup: ${e}`); + } + this.logger.info( + `Muxed client detached (total active muxers: ${this.muxerStreams.size})`, + ); + this.updateLivestreamStateForMuxerClients(); + }; + + socket.on("close", cleanup); + socket.on("error", cleanup); + } + + /** + * Start the upstream livestream once the total consumer count (TCP + * video clients + in-process muxer clients) is non-zero; it never stops + * the stream. Called on every TCP- and muxer-client attach/detach.
+ */ + private async updateLivestreamStateForMuxerClients(): Promise<void> { + const totalConsumers = + this.connectionManager.getActiveConnectionCount() + + this.muxerStreams.size; + + if (totalConsumers > 0 && !this.livestreamIntendedState) { + this.livestreamIntendedState = true; + this.lastClientActivity = Date.now(); + this.startActivityMonitoring(); + await this.ensureLivestreamState(); + } + // Intentionally *not* stopping the livestream the moment consumer + // count drops to 0. Scrypted's Rebroadcast plugin cycles its muxer + // connection constantly — closes the old one, immediately opens a + // new one for the next session. Tearing down the Eufy livestream on + // every disconnect meant the new muxer connected to a cold pipeline, + // and the downstream FFmpeg would hit "Unable to find sync frame in + // rtsp prebuffer" until the next camera keyframe (2-4s). + // + // The activity monitor handles the genuine "everyone left" case: if + // no data flows for ACTIVITY_TIMEOUT ms it stops the livestream + // (lastClientActivity only advances while a consumer is reading). + } + + /** + * Get the port the muxed (fMP4) server is listening on.
+ */ + getMuxedPort(): number | undefined { + if (this.muxedServer) { + const address = this.muxedServer.address(); + if (address && typeof address === "object") { + return address.port; + } + } + return undefined; } /** @@ -528,11 +758,34 @@ export class StreamServer extends EventEmitter { await this.ensureLivestreamState(); } - // Clean up WebSocket event listener + // Clean up WebSocket event listeners if (this.eventRemover) { this.eventRemover(); this.eventRemover = undefined; - this.logger.debug("WebSocket event listener removed"); + this.logger.debug("WebSocket video event listener removed"); + } + + if (this.audioEventRemover) { + this.audioEventRemover(); + this.audioEventRemover = undefined; + this.logger.debug("WebSocket audio event listener removed"); + } + + // Tear down all in-process muxers and disconnect their clients + for (const [socket, { muxer }] of this.muxerStreams) { + try { + muxer.destroy(); + } catch (e) { + this.logger.warn(`JMuxer destroy threw during shutdown: ${e}`); + } + if (!socket.destroyed) socket.destroy(); + } + this.muxerStreams.clear(); + + // Close muxed server + if (this.muxedServer) { + this.muxedServer.close(); + this.muxedServer = undefined; } return new Promise((resolve) => { @@ -776,6 +1029,14 @@ export class StreamServer extends EventEmitter { return undefined; } + /** + * Get the last received audio metadata (codec). + * Returns null if no audio stream has been received yet. 
+ */ + getAudioMetadata(): AudioMetadata | null { + return this.audioMetadata; + } + /** * Get number of active connections */ diff --git a/packages/eufy-stream-server/tests/stream-server.test.ts b/packages/eufy-stream-server/tests/stream-server.test.ts index 7c8af3e..17c2b39 100644 --- a/packages/eufy-stream-server/tests/stream-server.test.ts +++ b/packages/eufy-stream-server/tests/stream-server.test.ts @@ -549,7 +549,8 @@ describe("StreamServer", () => { await serverWithWs.start(); await serverWithWs.stop(); - expect(mockEventRemover).toHaveBeenCalledTimes(1); + // Two event listeners are registered: one for video data, one for audio data + expect(mockEventRemover).toHaveBeenCalledTimes(2); }); });