From a70f4a869da31bae52398db7bc86af2aee7e1b0d Mon Sep 17 00:00:00 2001 From: Florian Date: Thu, 7 Aug 2025 13:22:05 +0100 Subject: [PATCH 1/2] WIP --- packages/client/lib/client/commands-queue.ts | 47 +++++++++++++------- packages/client/lib/client/index.spec.ts | 2 + 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/packages/client/lib/client/commands-queue.ts b/packages/client/lib/client/commands-queue.ts index 52a07a7e3b..68d76a66d1 100644 --- a/packages/client/lib/client/commands-queue.ts +++ b/packages/client/lib/client/commands-queue.ts @@ -27,10 +27,6 @@ export interface CommandToWrite extends CommandWaitingForReply { signal: AbortSignal; listener: () => unknown; } | undefined; - timeout: { - signal: AbortSignal; - listener: () => unknown; - } | undefined; } interface CommandWaitingForReply { @@ -38,6 +34,12 @@ interface CommandWaitingForReply { reject(err: unknown): void; channelsCounter: number | undefined; typeMapping: TypeMapping | undefined; + node?: DoublyLinkedNode + timeout: { + signal: AbortSignal; + toBeSent: boolean; + listener: () => unknown; + } | undefined; } export type OnShardedChannelMoved = (channel: string, listeners: ChannelListeners) => void; @@ -54,7 +56,7 @@ export default class RedisCommandsQueue { readonly #respVersion; readonly #maxLength; readonly #toWrite = new DoublyLinkedList(); - readonly #waitingForReply = new SinglyLinkedList(); + readonly #waitingForReply = new DoublyLinkedList(); readonly #onShardedChannelMoved; #chainInExecution: symbol | undefined; readonly decoder; @@ -78,11 +80,21 @@ export default class RedisCommandsQueue { } #onReply(reply: ReplyUnion) { - this.#waitingForReply.shift()!.resolve(reply); + const node = this.#waitingForReply.shift()!; + if (node.timeout) { + RedisCommandsQueue.#removeTimeoutListener(node) + node.timeout = undefined; + } + node.resolve(reply); } #onErrorReply(err: ErrorReply) { - this.#waitingForReply.shift()!.reject(err); + const node = this.#waitingForReply.shift()!; + if (node.timeout) { + RedisCommandsQueue.#removeTimeoutListener(node) + node.timeout = undefined; + } + node.reject(err); } #onPush(push: Array) { @@ -156,7 +168,6 @@ export default class RedisCommandsQueue { } return new Promise((resolve, reject) => { - let node: DoublyLinkedNode; const value: CommandToWrite = { args, chainId: options?.chainId, @@ -173,9 +184,16 @@ export default class RedisCommandsQueue { const signal = AbortSignal.timeout(timeout); value.timeout = { signal, + toBeSent: true, listener: () => { - this.#toWrite.remove(node); - value.reject(new TimeoutError()); + const reject = value.reject; + if (value.timeout!.toBeSent) { + this.#toWrite.remove(value.node as DoublyLinkedNode); + } else { + value.resolve = () => {}; + value.reject = () => {}; + } + reject(new TimeoutError()); } }; signal.addEventListener('abort', value.timeout.listener, { once: true }); @@ -186,14 +204,14 @@ export default class RedisCommandsQueue { value.abort = { signal, listener: () => { - this.#toWrite.remove(node); + this.#toWrite.remove(value.node as DoublyLinkedNode); value.reject(new AbortError()); } }; signal.addEventListener('abort', value.abort.listener, { once: true }); } - node = this.#toWrite.add(value, options?.asap); + value.node = this.#toWrite.add(value, options?.asap); }); } @@ -408,8 +426,7 @@ export default class RedisCommandsQueue { toSend.abort = undefined; } if (toSend.timeout) { - RedisCommandsQueue.#removeTimeoutListener(toSend); - toSend.timeout = undefined; + toSend.timeout.toBeSent = false; } this.#chainInExecution = toSend.chainId; toSend.chainId = undefined; @@ -431,7 +448,7 @@ export default class RedisCommandsQueue { command.abort!.signal.removeEventListener('abort', command.abort!.listener); } - static #removeTimeoutListener(command: CommandToWrite) { + static #removeTimeoutListener(command: CommandToWrite | CommandWaitingForReply) { command.timeout!.signal.removeEventListener('abort', command.timeout!.listener); } diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index 0aed98450d..c3793418e3 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -352,6 +352,7 @@ describe('Client', () => { } }); + /* testUtils.testWithClientSentinel('Timeout with global timeout config (sentinel)', async sentinel => { await blockSetImmediate(async () => { await assert.rejects(sentinel.HSET('key', 'foo', 'value'), TimeoutError); @@ -365,6 +366,7 @@ describe('Client', () => { } } }); + */ testUtils.testWithClient('undefined and null should not break the client', async client => { await assert.rejects( From 91198483e15a2b70dae392e0e03435dcefe36283 Mon Sep 17 00:00:00 2001 From: florian-schunk <149071178+florian-schunk@users.noreply.github.com> Date: Mon, 25 Aug 2025 17:28:28 +0100 Subject: [PATCH 2/2] un-comment-out test --- packages/client/lib/client/index.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/client/lib/client/index.spec.ts b/packages/client/lib/client/index.spec.ts index c3793418e3..0aed98450d 100644 --- a/packages/client/lib/client/index.spec.ts +++ b/packages/client/lib/client/index.spec.ts @@ -352,7 +352,6 @@ describe('Client', () => { } }); - /* testUtils.testWithClientSentinel('Timeout with global timeout config (sentinel)', async sentinel => { await blockSetImmediate(async () => { await assert.rejects(sentinel.HSET('key', 'foo', 'value'), TimeoutError); @@ -366,7 +365,6 @@ describe('Client', () => { } } }); - */ testUtils.testWithClient('undefined and null should not break the client', async client => { await assert.rejects(