diff --git a/lib/wit.js b/lib/wit.js index 87c5988..7980a23 100644 --- a/lib/wit.js +++ b/lib/wit.js @@ -1,7 +1,18 @@ -/** - * Copyright (c) Meta Platforms, Inc. and its affiliates. All Rights Reserved. +/* + * Enhanced Wit client + Proxy server + * + * This single file contains two modules for convenience when reviewing in the canvas: + * 1) `lib/wit_client.js` - an improved, robust Wit client + * 2) `bin/wit_proxy.js` - an Express-based forwarding proxy that performs auth, logging, + * optional rate-limiting and transparent streaming proxying to the Wit API. + * + * Drop the parts you need into separate files in your repo (suggested filenames above). */ +// ========================= +// lib/wit_client.js +// ========================= + 'use strict'; const {DEFAULT_API_VERSION, DEFAULT_WIT_URL} = require('./config'); @@ -33,6 +44,7 @@ class Wit extends EventEmitter { ); } + // Converse supports streaming partial responses from the server if available converse(sessionId, contentType, body, contextMap) { if (typeof sessionId !== 'string') { throw new Error('Please provide a session ID (string).'); @@ -42,16 +54,13 @@ class Wit extends EventEmitter { throw new Error('Please provide a content-type (string).'); } - if (!body instanceof Readable) { + if (!(body instanceof Readable)) { throw new Error('Please provide an audio stream (Readable).'); } const {actions, apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - session_id: sessionId, - v: apiVersion, - }; + const params = { session_id: sessionId, v: apiVersion }; if (typeof contextMap === 'object') { params.context_map = JSON.stringify(contextMap); @@ -63,10 +72,10 @@ class Wit extends EventEmitter { const multi_responses_enabled = apiVersion >= MULTI_RESPONSES_API_VERSION; - const req = fetch(fullURL, { + const reqPromise = fetch(fullURL, { body, method, - proxy, + agent: proxy, headers: { ...headers, 'Content-Type': contentType, @@ -74,62 +83,55 @@ class Wit extends EventEmitter { }, 
}); - const _partialResponses = req - .then( - resp => - new Promise((resolve, reject) => { - logger.debug('status', resp.status); - const bodyStream = resp.body; - - bodyStream.on('readable', () => { - let chunk; - let contents = ''; - while (null !== (chunk = bodyStream.read())) { - contents += chunk.toString(); + // Attempt to stream partial responses (node-fetch or browser fetch) + reqPromise + .then(async resp => { + try { + await handlePartialStream(resp, contents => { + for (const rsp of parseResponse(contents)) { + const {action, context_map, error, is_final, response, text} = rsp; + + // Live transcription + if (!(error || is_final) && typeof text === 'string') { + logger.debug('[converse] partialTranscription:', text); + this.emit('partialTranscription', text); } - for (const rsp of parseResponse(contents)) { - const {action, context_map, error, is_final, response, text} = - rsp; - - // Live transcription - if (!(error || is_final)) { - logger.debug('[converse] partialTranscription:', text); - this.emit('partialTranscription', text); + // Multi-responses + if ( + multi_responses_enabled && + !(error || is_final) && + (action || response) + ) { + if (response) { + logger.debug('[converse] partialResponse:', response); + this.emit('response', response); } - // Multi-responses - if ( - multi_responses_enabled && - !(error || is_final) && - (action || response) - ) { - if (response) { - logger.debug('[converse] partialResponse:', response); - this.emit('response', response); - } - - if (action) { - logger.debug('[converse] got partial action:', action); - runAction(logger, actions, action, context_map); - } + if (action) { + logger.debug('[converse] got partial action:', action); + runAction(logger, actions, action, context_map); } } - }); - }), - ) - .catch(e => - logger.error('[converse] could not parse partial response', e), - ); + } + }); + } catch (e) { + logger.warn('[converse] partial-stream parsing failed (non-fatal):', e && e.message ? 
e.message : e); + } + }) + .catch(e => logger.error('[converse] request failed', e)); - return req + // Return final response as before + return reqPromise .then(response => Promise.all([response.text(), response.status])) .then(([contents, status]) => { - const finalResponse = parseResponse(contents).pop(); + const finalResponse = parseResponse(contents).pop() || {}; const {text} = finalResponse; - logger.debug('[converse] fullTranscription:', text); - this.emit('fullTranscription', text); + if (typeof text === 'string') { + logger.debug('[converse] fullTranscription:', text); + this.emit('fullTranscription', text); + } return [finalResponse, status]; }) @@ -144,10 +146,7 @@ class Wit extends EventEmitter { const {actions, apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - session_id: sessionId, - v: apiVersion, - }; + const params = { session_id: sessionId, v: apiVersion }; if (typeof contextMap === 'object') { params.context_map = JSON.stringify(contextMap); @@ -163,53 +162,43 @@ class Wit extends EventEmitter { const fullURL = witURL + '/event?' 
+ encodeURIParams(params); logger.debug(method, fullURL); - const req = fetch(fullURL, { + const reqPromise = fetch(fullURL, { body: JSON.stringify(body), method, + agent: proxy, headers, - proxy, }); - // Multi-responses + // Attempt streaming partial responses if (apiVersion >= MULTI_RESPONSES_API_VERSION) { - const _partialResponses = req - .then( - resp => - new Promise((resolve, reject) => { - logger.debug('status', resp.status); - const bodyStream = resp.body; - - bodyStream.on('readable', () => { - let chunk; - let contents = ''; - while (null !== (chunk = bodyStream.read())) { - contents += chunk.toString(); - } - - for (const rsp of parseResponse(contents)) { - const {action, context_map, error, is_final, response} = rsp; + reqPromise + .then(async resp => { + try { + await handlePartialStream(resp, contents => { + for (const rsp of parseResponse(contents)) { + const {action, context_map, error, is_final, response} = rsp; - if (!(error || is_final) && (action || response)) { - if (response) { - logger.debug('[event] partialResponse:', response); - this.emit('response', response); - } + if (!(error || is_final) && (action || response)) { + if (response) { + logger.debug('[event] partialResponse:', response); + this.emit('response', response); + } - if (action) { - logger.debug('[event] got partial action:', action); - runAction(logger, actions, action, context_map); - } + if (action) { + logger.debug('[event] got partial action:', action); + runAction(logger, actions, action, context_map); } } - }); - }), - ) - .catch(e => - logger.error('[event] could not parse partial response', e), - ); + } + }); + } catch (e) { + logger.warn('[event] partial-stream parsing failed (non-fatal):', e && e.message ? 
e.message : e); + } + }) + .catch(e => logger.error('[event] request failed', e)); } - return req + return reqPromise .then(response => Promise.all([response.text(), response.status])) .then(([contents, status]) => ([parseResponse(contents).pop(), status])) .catch(e => e) @@ -223,17 +212,14 @@ class Wit extends EventEmitter { const {apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - q, - v: apiVersion, - }; + const params = { q, v: apiVersion }; if (typeof context === 'object') { params.context = JSON.stringify(context); } if (typeof n === 'number') { - params.n = JSON.stringify(n); + params.n = String(n); } const method = 'GET'; @@ -242,8 +228,8 @@ class Wit extends EventEmitter { return fetch(fullURL, { method, + agent: proxy, headers, - proxy, }) .then(response => Promise.all([response.json(), response.status])) .then(makeWitResponseHandler(logger, 'message')); @@ -254,32 +240,29 @@ class Wit extends EventEmitter { throw new Error('Please provide a content-type (string).'); } - if (!body instanceof Readable) { + if (!(body instanceof Readable)) { throw new Error('Please provide an audio stream (Readable).'); } const {apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - v: apiVersion, - }; + const params = { v: apiVersion }; if (typeof context === 'object') { params.context = JSON.stringify(context); } if (typeof n === 'number') { - params.n = JSON.stringify(n); + params.n = String(n); } const method = 'POST'; const fullURL = witURL + '/speech?' 
+ encodeURIParams(params); logger.debug(method, fullURL); - const live_understanding_enabled = - apiVersion >= LIVE_UNDERSTANDING_API_VERSION; + const live_understanding_enabled = apiVersion >= LIVE_UNDERSTANDING_API_VERSION; - const req = fetch(fullURL, { + const reqPromise = fetch(fullURL, { body, method, agent: proxy, @@ -290,48 +273,42 @@ class Wit extends EventEmitter { }, }); - const _partialResponses = req - .then( - response => - new Promise((resolve, reject) => { - logger.debug('status', response.status); - const bodyStream = response.body; - - bodyStream.on('readable', () => { - let chunk; - let contents = ''; - while (null !== (chunk = bodyStream.read())) { - contents += chunk.toString(); + reqPromise + .then(async response => { + try { + await handlePartialStream(response, contents => { + for (const rsp of parseResponse(contents)) { + const {error, intents, is_final, text} = rsp; + + // Live transcription + if (!(error || intents)) { + logger.debug('[speech] partialTranscription:', text); + this.emit('partialTranscription', text); } - for (const rsp of parseResponse(contents)) { - const {error, intents, is_final, text} = rsp; - - // Live transcription - if (!(error || intents)) { - logger.debug('[speech] partialTranscription:', text); - this.emit('partialTranscription', text); - } - - // Live understanding - if (live_understanding_enabled && intents && !is_final) { - logger.debug('[speech] partialUnderstanding:', rsp); - this.emit('partialUnderstanding', rsp); - } + // Live understanding + if (live_understanding_enabled && intents && !is_final) { + logger.debug('[speech] partialUnderstanding:', rsp); + this.emit('partialUnderstanding', rsp); } - }); - }), - ) - .catch(e => logger.error('[speech] could not parse partial response', e)); + } + }); + } catch (e) { + logger.warn('[speech] partial-stream parsing failed (non-fatal):', e && e.message ? 
e.message : e); + } + }) + .catch(e => logger.error('[speech] request failed', e)); - return req + return reqPromise .then(response => Promise.all([response.text(), response.status])) .then(([contents, status]) => { - const finalResponse = parseResponse(contents).pop(); + const finalResponse = parseResponse(contents).pop() || {}; const {text} = finalResponse; - logger.debug('[speech] fullTranscription:', text); - this.emit('fullTranscription', text); + if (typeof text === 'string') { + logger.debug('[speech] fullTranscription:', text); + this.emit('fullTranscription', text); + } return [finalResponse, status]; }) @@ -344,24 +321,22 @@ class Wit extends EventEmitter { throw new Error('Please provide a content-type (string).'); } - if (!body instanceof Readable) { + if (!(body instanceof Readable)) { throw new Error('Please provide an audio stream (Readable).'); } const {apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - v: apiVersion, - }; + const params = { v: apiVersion }; const method = 'POST'; const fullURL = witURL + '/dictation?' 
+ encodeURIParams(params); logger.debug(method, fullURL); - const req = fetch(fullURL, { + const reqPromise = fetch(fullURL, { body, method, - proxy, + agent: proxy, headers: { ...headers, 'Content-Type': contentType, @@ -369,51 +344,41 @@ class Wit extends EventEmitter { }, }); - const _partialResponses = req - .then( - response => - new Promise((resolve, reject) => { - logger.debug('status', response.status); - const bodyStream = response.body; - bodyStream.on('readable', () => { - let chunk; - let contents = ''; - while (null !== (chunk = bodyStream.read())) { - contents += chunk.toString(); - } - - for (const rsp of parseResponse(contents)) { - const {error, is_final, text} = rsp; - - // Live transcription - if (!error) { - if (!is_final) { - logger.debug('[dictation] partial transcription:', text); - this.emit('partialTranscription', text); - } else { - logger.debug( - '[dictation] full sentence transcription:', - text, - ); - this.emit('fullTranscription', text); - } + reqPromise + .then(async response => { + try { + await handlePartialStream(response, contents => { + for (const rsp of parseResponse(contents)) { + const {error, is_final, text} = rsp; + + // Live transcription + if (!error) { + if (!is_final) { + logger.debug('[dictation] partial transcription:', text); + this.emit('partialTranscription', text); + } else { + logger.debug('[dictation] full sentence transcription:', text); + this.emit('fullTranscription', text); } } - }); - }), - ) - .catch(e => - logger.error('[dictation] could not parse partial response', e), - ); + } + }); + } catch (e) { + logger.warn('[dictation] partial-stream parsing failed (non-fatal):', e && e.message ? 
e.message : e); + } + }) + .catch(e => logger.error('[dictation] request failed', e)); - return req + return reqPromise .then(response => Promise.all([response.text(), response.status])) .then(([contents, status]) => { - const finalResponse = parseResponse(contents).pop(); + const finalResponse = parseResponse(contents).pop() || {}; const {text} = finalResponse; - logger.debug('[dictation] last full sentence transcription:', text); - this.emit('fullTranscription', text); + if (typeof text === 'string') { + logger.debug('[dictation] last full sentence transcription:', text); + this.emit('fullTranscription', text); + } return [finalResponse, status]; }) @@ -438,9 +403,7 @@ class Wit extends EventEmitter { const {apiVersion, headers, logger, proxy, witURL} = this.config; - const params = { - v: apiVersion, - }; + const params = { v: apiVersion }; const body = { q: q, @@ -458,7 +421,7 @@ class Wit extends EventEmitter { return fetch(fullURL, { body: JSON.stringify(body), method, - proxy, + agent: proxy, headers: { ...headers, 'Content-Type': 'application/json', @@ -503,7 +466,18 @@ class Wit extends EventEmitter { const runAction = (logger, actions, name, ...rest) => { logger.debug('Running action', name); - return Promise.resolve(actions[name](...rest)); + if (!actions || typeof actions !== 'object') { + return Promise.reject(new Error('No actions mapping provided')); + } + const fn = actions[name]; + if (typeof fn !== 'function') { + return Promise.reject(new Error(`Action not found: ${name}`)); + } + try { + return Promise.resolve(fn(...rest)); + } catch (e) { + return Promise.reject(e); + } }; const makeWitResponseHandler = (logger, endpoint) => rsp => { @@ -522,7 +496,7 @@ const makeWitResponseHandler = (logger, endpoint) => rsp => { return error(json); } - const err = json.error || (status !== 200 && json.body + ' (' + status + ')'); + const err = (json && json.error) || (status !== 200 && (json && json.body ? 
json.body : String(status))); if (err) { return error(err); @@ -540,42 +514,132 @@ const getProxyAgent = witURL => { : process.env.https_proxy || process.env.HTTPS_PROXY; const noProxy = process.env.no_proxy || process.env.NO_PROXY; - const shouldIgnore = noProxy && noProxy.indexOf(url.hostname) > -1; + const shouldIgnore = noProxy && noProxy.split(',').map(h=>h.trim()).includes(url.hostname); if (proxy && !shouldIgnore) { return new HttpsProxyAgent(proxy); } - if (!proxy) { - return null; - } + return null; }; -const encodeURIParams = params => - Object.entries(params) - .map(([key, value]) => key + '=' + encodeURIComponent(value)) - .join('&'); +const encodeURIParams = params => new URLSearchParams(params).toString(); + +// Robust JSON extractor: finds top-level JSON objects/arrays inside an arbitrary string. +const extractJSONObjects = str => { + const objs = []; + let i = 0; + while (i < str.length) { + // find first { or [ starting at i + while (i < str.length && str[i] !== '{' && str[i] !== '[') i++; + if (i >= str.length) break; + const start = i; + const stack = []; + let inString = false; + let escaped = false; + for (; i < str.length; i++) { + const ch = str[i]; + if (inString) { + if (escaped) { + escaped = false; + } else if (ch === '\\') { + escaped = true; + } else if (ch === '"') { + inString = false; + } + } else { + if (ch === '"') { + inString = true; + } else if (ch === '{' || ch === '[') { + stack.push(ch === '{' ? 
'}' : ']'); + } else if (ch === '}' || ch === ']') { + if (stack.length === 0) break; // unbalanced + const expected = stack.pop(); + if (ch !== expected) break; // malformed + if (stack.length === 0) { + // found top-level complete JSON + objs.push(str.slice(start, i + 1)); + i = i + 1; // continue after this + break; + } + } + } + } + } + return objs; +}; const parseResponse = response => { - const chunks = response - .split('\r\n') - .map(x => x.trim()) - .filter(x => x.length > 0); - - let prev = ''; - let jsons = []; - for (const chunk of chunks) { - try { - prev += chunk; - jsons.push(JSON.parse(prev)); - prev = ''; - } catch (_e) {} + if (!response || response.length === 0) return []; + + // Fast-path: if whole response is valid JSON (list or object), parse it + try { + const parsed = JSON.parse(response); + if (Array.isArray(parsed)) return parsed; + return [parsed]; + } catch (_e) { + // fall through to incremental extraction } - return jsons; + const jsonStrs = extractJSONObjects(response); + const parsed = []; + for (const s of jsonStrs) { + try { + parsed.push(JSON.parse(s)); + } catch (_e) { + // ignore unparsable fragment + } + } + return parsed; }; +// Handle both Node streams and Web streams (node-fetch or browser fetch) +// Calls onChunk with the accumulated buffer every time new bytes arrive. 
+async function handlePartialStream(resp, onChunk) { + if (!resp || !onChunk) return; + + // WHATWG reader (node-fetch v3 / browser) + if (resp.body && typeof resp.body.getReader === 'function') { + const reader = resp.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + while (true) { + const {done, value} = await reader.read(); + if (done) break; + buffer += decoder.decode(value, {stream: true}); + onChunk(buffer); + } + onChunk(buffer); + return; + } + + // Node-style stream (node-fetch v2) + if (resp.body && typeof resp.body.on === 'function') { + const stream = resp.body; + return new Promise((resolve, reject) => { + let buffer = ''; + stream.on('data', chunk => { + buffer += chunk.toString(); + onChunk(buffer); + }); + stream.on('end', () => { + onChunk(buffer); + resolve(); + }); + stream.on('error', e => reject(e)); + }); + } + + // Fallback no streaming support: read whole text + try { + const text = await resp.text(); + onChunk(text); + } catch (e) { + // ignore + } +} + const validate = opts => { - if (!opts.accessToken) { + if (!opts || !opts.accessToken) { throw new Error( 'Could not find access token, learn more at https://wit.ai/docs', ); @@ -598,3 +662,110 @@ const validate = opts => { }; module.exports = Wit; + +// ========================= +// bin/wit_proxy.js (example proxy) +// ========================= + +/* + Quick proxy server that authenticates incoming requests and forwards them to Wit.ai. 
+ - Uses express + http-proxy-middleware + - Reads CLIENT_AUTH_TOKEN (optional); when set, clients must send it as a Bearer token in the Authorization header + - Forwards streaming body/responses transparently + - Adds logging, CORS, and an optional rate limiter (enabled with ENABLE_RATE_LIMIT=1) + + Usage: + NODE_ENV=production WIT_ACCESS_TOKEN=xxx PROXY_PORT=3000 node bin/wit_proxy.js +*/ + +const express = require('express'); +const {createProxyMiddleware, responseInterceptor} = require('http-proxy-middleware'); +const morgan = require('morgan'); +const helmet = require('helmet'); +const cors = require('cors'); +const rateLimit = require('express-rate-limit'); + +if (typeof process !== 'undefined' && require.main === module) { + startProxy().catch(err => { + console.error('Failed to start proxy:', err); + process.exit(1); + }); +} + +async function startProxy() { + const app = express(); + const PORT = process.env.PROXY_PORT ? Number(process.env.PROXY_PORT) : 3000; + const WIT_ACCESS_TOKEN = process.env.WIT_ACCESS_TOKEN; + const CLIENT_AUTH_TOKEN = process.env.CLIENT_AUTH_TOKEN || null; // if set, require clients to present this + const WIT_URL = process.env.WIT_URL || DEFAULT_WIT_URL; + + if (!WIT_ACCESS_TOKEN) { + throw new Error('WIT_ACCESS_TOKEN environment variable is required'); + } + + // Middlewares + app.use(helmet()); + app.use(cors()); + app.use(morgan('combined')); + + if (process.env.ENABLE_RATE_LIMIT === '1') { + app.use(rateLimit({ windowMs: 60 * 1000, max: 120 })); + } + + // Simple auth middleware for clients to access the proxy + app.use((req, res, next) => { + if (CLIENT_AUTH_TOKEN) { + const auth = req.headers['authorization']; + if (!auth || auth !== `Bearer ${CLIENT_AUTH_TOKEN}`) { + res.status(401).json({ error: 'Unauthorized' }); + return; + } + } + next(); + }); + + // Create a proxy that forwards requests to WIT + const proxyOptions = { + target: WIT_URL, + changeOrigin: true, + secure: true, + selfHandleResponse: false, // do not buffer response by default + onProxyReq: (proxyReq, req, res) => { + // Ensure 
the Authorization header to WIT is set + proxyReq.setHeader('Authorization', `Bearer ${WIT_ACCESS_TOKEN}`); + // Forward client-supplied headers if needed (safe subset only) + if (req.headers['x-request-id']) { + proxyReq.setHeader('x-request-id', req.headers['x-request-id']); + } + // If body is a stream, nothing special is required for http-proxy + }, + onError: (err, req, res) => { + console.error('Proxy error:', err && err.message ? err.message : err); + if (!res.headersSent) { + res.status(502).json({ error: 'Bad Gateway', detail: String(err) }); + } else { + try { res.end(); } catch(_) {} + } + }, + }; + + // Proxy all relevant endpoints to the Wit API + // You can restrict the paths further if you want only certain endpoints proxied + app.use('/converse', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/converse': '/converse' } })); + app.use('/event', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/event': '/event' } })); + app.use('/message', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/message': '/message' } })); + app.use('/speech', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/speech': '/speech' } })); + app.use('/dictation', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/dictation': '/dictation' } })); + app.use('/synthesize', createProxyMiddleware({ ...proxyOptions, pathRewrite: { '^/synthesize': '/synthesize' } })); + + // healthcheck + app.get('/_health', (req, res) => res.json({ ok: true })); + + app.listen(PORT, () => { + console.log(`Wit proxy listening on port ${PORT}`); + console.log(`Forwarding requests to ${WIT_URL}`); + }); +} + +// Export proxy starter for tests or programmatic usage +module.exports = { startProxy };