@@ -20,6 +20,7 @@ interface ConnectionInfo {
2020interface ActiveConnection extends ConnectionInfo {
2121 readonly clientSocket : net . Socket ;
2222 readonly serverSocket : net . Socket ;
23+ inflightRequestsCount : number
2324}
2425
2526type SendResult =
@@ -49,11 +50,16 @@ interface ProxyEvents {
4950 'close' : ( ) => void ;
5051}
5152
53+ export type Interceptor = ( data : Buffer ) => Promise < Buffer > ;
54+ export type InterceptorFunction = ( data : Buffer , next : Interceptor ) => Promise < Buffer > ;
55+ type InterceptorInitializer = ( init : Interceptor ) => Interceptor ;
56+
5257export class RedisProxy extends EventEmitter {
5358 private readonly server : net . Server ;
5459 public readonly config : Required < ProxyConfig > ;
5560 private readonly connections : Map < string , ActiveConnection > ;
5661 private isRunning : boolean ;
62+ private interceptorInitializer : InterceptorInitializer = ( init ) => init ;
5763
5864 constructor ( config : ProxyConfig ) {
5965 super ( ) ;
@@ -113,6 +119,13 @@ export class RedisProxy extends EventEmitter {
113119 } ) ;
114120 }
115121
122+ public setInterceptors ( interceptors : Array < InterceptorFunction > ) {
123+ this . interceptorInitializer = ( init ) => interceptors . reduceRight < Interceptor > (
124+ ( next , mw ) => ( data ) => mw ( data , next ) ,
125+ init
126+ ) ;
127+ }
128+
116129 public getStats ( ) : ProxyStats {
117130 const connections = Array . from ( this . connections . values ( ) ) ;
118131
@@ -218,19 +231,22 @@ export class RedisProxy extends EventEmitter {
218231 }
219232
220233 private handleClientConnection ( clientSocket : net . Socket ) : void {
221- const connectionId = this . generateConnectionId ( ) ;
234+ clientSocket . pause ( ) ;
222235 const serverSocket = net . createConnection ( {
223236 host : this . config . targetHost ,
224237 port : this . config . targetPort
225238 } ) ;
239+ serverSocket . once ( 'connect' , clientSocket . resume . bind ( clientSocket ) ) ;
226240
241+ const connectionId = this . generateConnectionId ( ) ;
227242 const connectionInfo : ActiveConnection = {
228243 id : connectionId ,
229244 clientAddress : clientSocket . remoteAddress || 'unknown' ,
230245 clientPort : clientSocket . remotePort || 0 ,
231246 connectedAt : new Date ( ) ,
232247 clientSocket,
233- serverSocket
248+ serverSocket,
249+ inflightRequestsCount : 0
234250 } ;
235251
236252 this . connections . set ( connectionId , connectionInfo ) ;
@@ -243,12 +259,33 @@ export class RedisProxy extends EventEmitter {
243259 this . emit ( 'connection' , connectionInfo ) ;
244260 } ) ;
245261
246- clientSocket . on ( 'data' , ( data ) => {
262+ clientSocket . on ( 'data' , async ( data ) => {
247263 this . emit ( 'data' , connectionId , 'client->server' , data ) ;
248- serverSocket . write ( data ) ;
264+
265+ connectionInfo . inflightRequestsCount ++ ;
266+
267+ // next1 -> next2 -> ... -> last -> server
268+ // next1 <- next2 <- ... <- last <- server
269+ const last = ( data : Buffer ) : Promise < Buffer > => {
270+ return new Promise ( ( resolve , reject ) => {
271+ serverSocket . write ( data ) ;
272+ serverSocket . once ( 'data' , ( data ) => {
273+ connectionInfo . inflightRequestsCount -- ;
274+ assert ( connectionInfo . inflightRequestsCount >= 0 , `inflightRequestsCount for connection ${ connectionId } went below zero` ) ;
275+ this . emit ( 'data' , connectionId , 'server->client' , data ) ;
276+ resolve ( data ) ;
277+ } ) ;
278+ serverSocket . once ( 'error' , reject ) ;
279+ } ) ;
280+ } ;
281+
282+ const interceptorChain = this . interceptorInitializer ( last ) ;
283+ const response = await interceptorChain ( data ) ;
284+ clientSocket . write ( response ) ;
249285 } ) ;
250286
251287 serverSocket . on ( 'data' , ( data ) => {
288+ if ( connectionInfo . inflightRequestsCount > 0 ) return ;
252289 this . emit ( 'data' , connectionId , 'server->client' , data ) ;
253290 clientSocket . write ( data ) ;
254291 } ) ;
@@ -273,6 +310,7 @@ export class RedisProxy extends EventEmitter {
273310 } ) ;
274311
275312 serverSocket . on ( 'error' , ( error ) => {
313+ if ( connectionInfo . inflightRequestsCount > 0 ) return ;
276314 this . log ( `Server error for connection ${ connectionId } : ${ error . message } ` ) ;
277315 this . emit ( 'error' , error , connectionId ) ;
278316 clientSocket . destroy ( ) ;
@@ -306,6 +344,7 @@ export class RedisProxy extends EventEmitter {
306344 }
307345}
308346import { createServer } from 'net' ;
347+ import assert from 'node:assert' ;
309348
310349export function getFreePortNumber ( ) : Promise < number > {
311350 return new Promise ( ( resolve , reject ) => {
@@ -326,4 +365,3 @@ export function getFreePortNumber(): Promise<number> {
326365
327366export { RedisProxy as RedisTransparentProxy } ;
328367export type { ProxyConfig , ConnectionInfo , ProxyEvents , SendResult , DataDirection , ProxyStats } ;
329-
0 commit comments