diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index d5ff146b..ed8106e8 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -28,7 +28,7 @@ jobs: strategy: fail-fast: false matrix: - node: [8.x, 10.x, 12.x, 14.x, 16.x, 20.x] + node: [10.x, 12.x, 14.x, 16.x, 20.x, 22.x] # Steps represent a sequence of tasks that will be executed as part of the job steps: diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index c0ff76da..53323ed4 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -1,60 +1,49 @@ import { Debug } from "../utils"; -import ClusterSubscriber from "./ClusterSubscriber"; -import Cluster from "./index"; -import ConnectionPool from "./ConnectionPool"; import { getNodeKey } from "./util"; import * as calculateSlot from "cluster-key-slot"; +import ShardedSubscriber from "./ShardedSubscriber"; +import * as EventEmitter from "events"; const debug = Debug("cluster:subscriberGroup"); /** - * Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one - * ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards m - * messages between shards. However, this has scalability limitations, which is the reason why the sharded - * PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages. - * Given that, we need at least one ClusterSubscriber per master endpoint/node. + * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature, + * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards + * messages between shards. Sharded PubSub removes this limitation by making each shard + * responsible for its own messages. * - * This class leverages the previously exising ClusterSubscriber by adding support for multiple such subscribers - * in alignment to the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way - * to support this feature. + * This class coordinates one ShardedSubscriber per master node in the cluster, providing + * sharded PubSub support while keeping the public API backward compatible. */ export default class ClusterSubscriberGroup { - private shardedSubscribers: Map = new Map(); + private shardedSubscribers: Map = new Map(); private clusterSlots: string[][] = []; - //Simple [min, max] slot ranges aren't enough because you can migrate single slots + // Simple [min, max] slot ranges aren't enough because you can migrate single slots private subscriberToSlotsIndex: Map = new Map(); private channels: Map> = new Map(); + private failedAttemptsByNode: Map = new Map(); + + // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay + private isResetting = false; + private pendingReset: { slots: string[][]; nodes: any[] } | null = null; + + // Retry strategy + private static readonly MAX_RETRY_ATTEMPTS = 10; + private static readonly MAX_BACKOFF_MS = 2000; + private static readonly BASE_BACKOFF_MS = 100; /** * Register callbacks * * @param cluster */ - constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) { - cluster.on("+node", (redis) => { - this._addSubscriber(redis); - }); - - cluster.on("-node", (redis) => { - this._removeSubscriber(redis); - }); - - cluster.on("refresh", () => { - this._refreshSlots(cluster); - }); - - cluster.on("forceRefresh", () => { - refreshSlotsCacheCallback(); - }); - } + constructor(private readonly subscriberGroupEmitter: EventEmitter) {} /** * Get the responsible subscriber. * - * Returns null if no subscriber was found - * * @param slot */ - getResponsibleSubscriber(slot: number): ClusterSubscriber { + getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined { const nodeKey = this.clusterSlots[slot][0]; return this.shardedSubscribers.get(nodeKey); } @@ -67,10 +56,12 @@ export default class ClusterSubscriberGroup { addChannels(channels: (string | Buffer)[]): number { const slot = calculateSlot(channels[0]); - //Check if the all channels belong to the same slot and otherwise reject the operation - channels.forEach((c: string) => { - if (calculateSlot(c) != slot) return -1; - }); + // Check if the all channels belong to the same slot and otherwise reject the operation + for (const c of channels) { + if (calculateSlot(c) !== slot) { + return -1; + } + } const currChannels = this.channels.get(slot); @@ -93,10 +84,12 @@ export default class ClusterSubscriberGroup { removeChannels(channels: (string | Buffer)[]): number { const slot = calculateSlot(channels[0]); - //Check if the all channels belong to the same slot and otherwise reject the operation - channels.forEach((c: string) => { - if (calculateSlot(c) != slot) return -1; - }); + // Check if the all channels belong to the same slot and otherwise reject the operation + for (const c of channels) { + if (calculateSlot(c) !== slot) { + return -1; + } + } const slotChannels = this.channels.get(slot); @@ -124,55 +117,124 @@ export default class ClusterSubscriberGroup { * Start all not yet started subscribers */ start() { + const startPromises = []; for (const s of this.shardedSubscribers.values()) { if (!s.isStarted()) { - s.start(); + startPromises.push( + s + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(s.getNodeKey()); + }) + .catch((err) => { + this.handleSubscriberConnectFailed(err, s.getNodeKey()); + }) + ); } } + return Promise.all(startPromises); } /** - * Add a subscriber to the group of subscribers - * - * @param redis + * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones. */ - private _addSubscriber(redis: any): ClusterSubscriber { - const pool: ConnectionPool = new ConnectionPool(redis.options); + public async reset( + clusterSlots: string[][], + clusterNodes: any[] + ): Promise { + if (this.isResetting) { + this.pendingReset = { slots: clusterSlots, nodes: clusterNodes }; + return; + } - if (pool.addMasterNode(redis)) { - const sub = new ClusterSubscriber(pool, this.cluster, true); - const nodeKey = getNodeKey(redis.options); - this.shardedSubscribers.set(nodeKey, sub); - sub.start(); + this.isResetting = true; - // We need to attempt to resubscribe them in case the new node serves their slot - this._resubscribe(); - this.cluster.emit("+subscriber"); - return sub; - } + try { + const hasTopologyChanged = this._refreshSlots(clusterSlots); + const hasFailedSubscribers = this.hasUnhealthySubscribers(); - return null; - } + if (!hasTopologyChanged && !hasFailedSubscribers) { + debug( + "No topology change detected or failed subscribers. Skipping reset." + ); + return; + } - /** - * Removes a subscriber from the group - * @param redis - */ - private _removeSubscriber(redis: any): Map { - const nodeKey = getNodeKey(redis.options); - const sub = this.shardedSubscribers.get(nodeKey); + // For each of the sharded subscribers + for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { + if ( + // If the subscriber is still responsible for a slot range and is running then keep it + this.subscriberToSlotsIndex.has(nodeKey) && + shardedSubscriber.isStarted() + ) { + debug("Skipping deleting subscriber for %s", nodeKey); + continue; + } + + debug("Removing subscriber for %s", nodeKey); + // Otherwise stop the subscriber and remove it + shardedSubscriber.stop(); + this.shardedSubscribers.delete(nodeKey); + + this.subscriberGroupEmitter.emit("-subscriber"); + } + + const startPromises = []; + // For each node in slots cache + for (const [nodeKey, _] of this.subscriberToSlotsIndex) { + // If we already have a subscriber for this node then keep it + if (this.shardedSubscribers.has(nodeKey)) { + debug("Skipping creating new subscriber for %s", nodeKey); + continue; + } - if (sub) { - sub.stop(); - this.shardedSubscribers.delete(nodeKey); + debug("Creating new subscriber for %s", nodeKey); + // Otherwise create a new subscriber + const redis = clusterNodes.find((node) => { + return getNodeKey(node.options) === nodeKey; + }); + + if (!redis) { + debug("Failed to find node for key %s", nodeKey); + continue; + } + + const sub = new ShardedSubscriber( + this.subscriberGroupEmitter, + redis.options + ); + + this.shardedSubscribers.set(nodeKey, sub); + + startPromises.push( + sub + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(nodeKey); + }) + .catch((error) => { + this.handleSubscriberConnectFailed(error, nodeKey); + }) + ); + + this.subscriberGroupEmitter.emit("+subscriber"); + } + + // It's vital to await the start promises before resubscribing + // Otherwise we might try to resubscribe to a subscriber that is not yet connected + // This can cause a race condition + await Promise.all(startPromises); - // Even though the subscriber to this node is going down, we might have another subscriber - // handling the same slots, so we need to attempt to subscribe the orphaned channels this._resubscribe(); - this.cluster.emit("-subscriber"); + this.subscriberGroupEmitter.emit("subscribersReady"); + } finally { + this.isResetting = false; + if (this.pendingReset) { + const { slots, nodes } = this.pendingReset; + this.pendingReset = null; + await this.reset(slots, nodes); + } } - - return this.shardedSubscribers; } /** @@ -180,40 +242,36 @@ export default class ClusterSubscriberGroup { * * Returns false if no refresh was needed * - * @param cluster + * @param targetSlots */ - private _refreshSlots(cluster: Cluster): boolean { + private _refreshSlots(targetSlots: string[][]): boolean { //If there was an actual change, then reassign the slot ranges - if (this._slotsAreEqual(cluster.slots)) { + if (this._slotsAreEqual(targetSlots)) { debug( "Nothing to refresh because the new cluster map is equal to the previous one." ); - } else { - debug("Refreshing the slots of the subscriber group."); - //Rebuild the slots index - this.subscriberToSlotsIndex = new Map(); + return false; + } - for (let slot = 0; slot < cluster.slots.length; slot++) { - const node: string = cluster.slots[slot][0]; + debug("Refreshing the slots of the subscriber group."); - if (!this.subscriberToSlotsIndex.has(node)) { - this.subscriberToSlotsIndex.set(node, []); - } - this.subscriberToSlotsIndex.get(node).push(Number(slot)); - } - - //Update the subscribers from the index - this._resubscribe(); + //Rebuild the slots index + this.subscriberToSlotsIndex = new Map(); - //Update the cached slots map - this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots)); + for (let slot = 0; slot < targetSlots.length; slot++) { + const node: string = targetSlots[slot][0]; - this.cluster.emit("subscribersReady"); - return true; + if (!this.subscriberToSlotsIndex.has(node)) { + this.subscriberToSlotsIndex.set(node, []); + } + this.subscriberToSlotsIndex.get(node).push(Number(slot)); } - return false; + //Update the cached slots map + this.clusterSlots = JSON.parse(JSON.stringify(targetSlots)); + + return true; } /** @@ -224,12 +282,9 @@ export default class ClusterSubscriberGroup { private _resubscribe() { if (this.shardedSubscribers) { this.shardedSubscribers.forEach( - (s: ClusterSubscriber, nodeKey: string) => { + (s: ShardedSubscriber, nodeKey: string) => { const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey); if (subscriberSlots) { - //More for debugging purposes - s.associateSlotRange(subscriberSlots); - //Resubscribe on the underlying connection subscriberSlots.forEach((ss) => { //Might return null if being disconnected @@ -238,12 +293,10 @@ export default class ClusterSubscriberGroup { if (channels && channels.length > 0) { //Try to subscribe now - if (redis) { - redis.ssubscribe(channels); - - //If the instance isn't ready yet, then register the re-subscription for later - redis.on("ready", () => { - redis.ssubscribe(channels); + if (redis && redis.status !== "end") { + redis.ssubscribe(channels).catch((err) => { + // TODO: Should we emit an error event here? + debug("Failed to ssubscribe on node %s: %s", nodeKey, err); }); } } @@ -261,7 +314,75 @@ export default class ClusterSubscriberGroup { * @private */ private _slotsAreEqual(other: string[][]) { - if (this.clusterSlots === undefined) return false; - else return JSON.stringify(this.clusterSlots) === JSON.stringify(other); + if (this.clusterSlots === undefined) { + return false; + } else { + return JSON.stringify(this.clusterSlots) === JSON.stringify(other); + } + } + + /** + * Checks if any subscribers are in an unhealthy state. + * + * A subscriber is considered unhealthy if: + * - It exists but is not started (failed/disconnected) + * - It's missing entirely for a node that should have one + * + * @returns true if any subscribers need to be recreated + */ + private hasUnhealthySubscribers(): boolean { + const hasFailedSubscribers = Array.from( + this.shardedSubscribers.values() + ).some((sub) => !sub.isStarted()); + + const hasMissingSubscribers = Array.from( + this.subscriberToSlotsIndex.keys() + ).some((nodeKey) => !this.shardedSubscribers.has(nodeKey)); + + return hasFailedSubscribers || hasMissingSubscribers; } + + /** + * Handles failed subscriber connections by emitting an event to refresh the slots cache + * after a backoff period. + * + * @param error + * @param nodeKey + */ + private handleSubscriberConnectFailed = (error: Error, nodeKey: string) => { + const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0; + const failedAttempts = currentAttempts + 1; + this.failedAttemptsByNode.set(nodeKey, failedAttempts); + + const attempts = Math.min( + failedAttempts, + ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS + ); + const backoff = Math.min( + ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, + ClusterSubscriberGroup.MAX_BACKOFF_MS + ); + const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5)); + const delay = Math.max(0, backoff + jitter); + + debug( + "Failed to connect subscriber for %s. Refreshing slots in %dms", + nodeKey, + delay + ); + + this.subscriberGroupEmitter.emit("subscriberConnectFailed", { + delay, + error, + }); + }; + + /** + * Handles successful subscriber connections by resetting the failed attempts counter. + * + * @param nodeKey + */ + private handleSubscriberConnectSucceeded = (nodeKey: string) => { + this.failedAttemptsByNode.delete(nodeKey); + }; } diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts new file mode 100644 index 00000000..8b39d0d4 --- /dev/null +++ b/lib/cluster/ShardedSubscriber.ts @@ -0,0 +1,111 @@ +import EventEmitter = require("events"); +import Redis from "../redis"; +import { getConnectionName, getNodeKey, IRedisOptions } from "./util"; +import { Debug } from "../utils"; +const debug = Debug("cluster:subscriberGroup:shardedSubscriber"); + +export default class ShardedSubscriber { + private readonly nodeKey: string; + private started = false; + private instance: any = null; + + // Store listener references for cleanup + private readonly messageListeners: Map void> = + new Map(); + + constructor(private readonly emitter: EventEmitter, options: IRedisOptions) { + this.instance = new Redis({ + port: options.port, + host: options.host, + username: options.username, + password: options.password, + enableReadyCheck: false, + connectionName: getConnectionName("ssubscriber", options.connectionName), + lazyConnect: true, + tls: options.tls, + /** + * Disable auto reconnection for subscribers. + * The ClusterSubscriberGroup will handle the reconnection. + */ + retryStrategy: null, + }); + + this.nodeKey = getNodeKey(options); + + // Register listeners + this.instance.once("end", this.onEnd); + this.instance.on("error", this.onError); + this.instance.on("moved", this.onMoved); + + for (const event of ["smessage", "smessageBuffer"]) { + const listener = (...args: any[]) => { + this.emitter.emit(event, ...args); + }; + this.messageListeners.set(event, listener); + this.instance.on(event, listener); + } + } + + private onEnd = () => { + this.started = false; + this.emitter.emit("-node", this.instance, this.nodeKey); + }; + + private onError = (error: Error) => { + this.emitter.emit("nodeError", error, this.nodeKey); + }; + + private onMoved = () => { + this.emitter.emit("moved"); + }; + + async start(): Promise { + if (this.started) { + debug("already started %s", this.nodeKey); + return; + } + + try { + await this.instance.connect(); + debug("started %s", this.nodeKey); + this.started = true; + } catch (err) { + debug("failed to start %s: %s", this.nodeKey, err); + this.started = false; + throw err; // Re-throw so caller knows it failed + } + } + + stop(): void { + this.started = false; + + if (this.instance) { + this.instance.disconnect(); + this.instance.removeAllListeners(); + this.messageListeners.clear(); + this.instance = null; + } + + debug("stopped %s", this.nodeKey); + } + + /** + * Checks if the subscriber is started and NOT explicitly disconnected. + */ + isStarted(): boolean { + const status = this.instance?.status; + + const isDisconnected = + status === "end" || status === "close" || !this.instance; + + return this.started && !isDisconnected; + } + + getInstance(): any { + return this.instance; + } + + getNodeKey(): string { + return this.nodeKey; + } +} diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 5957f049..fa5b3f76 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -12,7 +12,6 @@ import { groupSrvRecords, weightSrvRecords, getConnectionName, - getNodeKey, } from "./util"; import ClusterSubscriber from "./ClusterSubscriber"; import DelayQueue from "./DelayQueue"; @@ -36,6 +35,7 @@ import Commander from "../commander"; import Deque = require("denque"); import { Pipeline } from ".."; import ClusterSubscriberGroup from "./ClusterSubscriberGroup"; +import ShardedSubscriber from "./ShardedSubscriber"; const debug = Debug("cluster"); @@ -79,6 +79,7 @@ class Cluster extends EventEmitter { private _readyDelayedCallbacks: CallbackFunction[] = []; public _addedScriptHashes: { [key: string]: any } = {}; public _addedScriptHashesCleanInterval: NodeJS.Timeout; + private subscriberGroupEmitter: EventEmitter | null; /** * Every time Cluster#connect() is called, this value will be @@ -109,11 +110,9 @@ class Cluster extends EventEmitter { this.startupNodes = startupNodes; this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); - if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup( - this, - this.refreshSlotsCache.bind(this) - ); + if (this.options.shardedSubscribers) { + this.createShardedSubscriberGroup(); + } // validate options if ( @@ -243,6 +242,15 @@ class Cluster extends EventEmitter { } this.connectionPool.reset(nodes); + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? + debug("Error while starting subscribers: %s", err); + }); + } + function readyHandler() { this.setStatus("ready"); this.retryAttempts = 0; @@ -298,7 +306,10 @@ class Cluster extends EventEmitter { this.subscriber.start(); if (this.options.shardedSubscribers) { - this.shardedSubscribers.start(); + this.shardedSubscribers.start().catch((err) => { + // TODO should we emit an error event here? + debug("Error while starting subscribers: %s", err); + }); } }) .catch((err) => { @@ -347,6 +358,9 @@ class Cluster extends EventEmitter { retryDelay ); } else { + if (this.options.shardedSubscribers) { + this.subscriberGroupEmitter?.removeAllListeners(); + } this.setStatus("end"); this.flushQueue(new Error("None of startup nodes is available")); } @@ -411,6 +425,10 @@ class Cluster extends EventEmitter { this.subscriber.stop(); + if (this.options.shardedSubscribers) { + this.shardedSubscribers.stop(); + } + const Promise = PromiseContainer.get(); if (status === "wait") { const ret = asCallback(Promise.resolve<"OK">("OK"), callback); @@ -703,22 +721,32 @@ class Cluster extends EventEmitter { _this.options.shardedSubscribers == true && (command.name == "ssubscribe" || command.name == "sunsubscribe") ) { - const sub: ClusterSubscriber = + const sub: ShardedSubscriber = _this.shardedSubscribers.getResponsibleSubscriber(targetSlot); + + if (!sub) { + command.reject( + new AbortError(`No sharded subscriber for slot: ${targetSlot}`) + ); + return; + } + let status = -1; - if (command.name == "ssubscribe") + if (command.name == "ssubscribe") { status = _this.shardedSubscribers.addChannels(command.getKeys()); - if (command.name == "sunsubscribe") + } + if (command.name == "sunsubscribe") { status = _this.shardedSubscribers.removeChannels( command.getKeys() ); + } if (status !== -1) { redis = sub.getInstance(); } else { command.reject( new AbortError( - "Can't add or remove the given channels. Are they in the same slot?" + "Possible CROSSSLOT error: All channels must hash to the same slot" ) ); } @@ -939,6 +967,15 @@ class Cluster extends EventEmitter { } this.connectionPool.reset(nodes); + + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? + debug("Error while starting subscribers: %s", err); + }); + } callback(); }, this.options.slotsRefreshTimeout) ); @@ -1088,6 +1125,57 @@ class Cluster extends EventEmitter { }); }); } + + private createShardedSubscriberGroup() { + this.subscriberGroupEmitter = new EventEmitter(); + + this.shardedSubscribers = new ClusterSubscriberGroup( + this.subscriberGroupEmitter + ); + + this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => { + this.emit("-node", redis, nodeKey); + + this.refreshSlotsCache(); + }); + + this.subscriberGroupEmitter.on( + "subscriberConnectFailed", + ({ delay, error }) => { + this.emit("error", error); + + setTimeout(() => { + this.refreshSlotsCache(); + }, delay); + } + ); + + this.subscriberGroupEmitter.on("moved", () => { + this.refreshSlotsCache(); + }); + + this.subscriberGroupEmitter.on("-subscriber", () => { + this.emit("-subscriber"); + }); + + this.subscriberGroupEmitter.on("+subscriber", () => { + this.emit("+subscriber"); + }); + + this.subscriberGroupEmitter.on("nodeError", (error, nodeKey) => { + this.emit("nodeError", error, nodeKey); + }); + + this.subscriberGroupEmitter.on("subscribersReady", () => { + this.emit("subscribersReady"); + }); + + for (const event of ["smessage", "smessageBuffer"]) { + this.subscriberGroupEmitter.on(event, (arg1, arg2, arg3) => { + this.emit(event, arg1, arg2, arg3); + }); + } + } } Object.getOwnPropertyNames(Commander.prototype).forEach((name) => { diff --git a/test/cluster/cluster_subscriber_group.ts b/test/cluster/cluster_subscriber_group.ts index 9e9a3a82..d2776d29 100644 --- a/test/cluster/cluster_subscriber_group.ts +++ b/test/cluster/cluster_subscriber_group.ts @@ -89,7 +89,7 @@ describe("cluster:ClusterSubscriberGroup", () => { expect( err .toString() - .includes("CROSSSLOT Keys in request don't hash to the same slot") + .includes("CROSSSLOT") ).to.be.true; }); diff --git a/test/scenario/sharded-pub-sub.test.ts b/test/scenario/sharded-pub-sub.test.ts index 0a308f97..78476aac 100644 --- a/test/scenario/sharded-pub-sub.test.ts +++ b/test/scenario/sharded-pub-sub.test.ts @@ -1,10 +1,5 @@ import type { TestConfig } from "./utils/test.util"; -import { - createClusterTestClient, - getConfig, - wait, - waitClientReady, -} from "./utils/test.util"; +import { createClusterTestClient, getConfig, wait } from "./utils/test.util"; import { FaultInjectorClient } from "./utils/fault-injector"; import { TestCommandRunner } from "./utils/command-runner"; @@ -12,6 +7,7 @@ import { CHANNELS, CHANNELS_BY_SLOT } from "./utils/test.util"; import { MessageTracker } from "./utils/message-tracker"; import { Cluster } from "../../lib"; import { assert } from "chai"; +import { IClusterOptions } from "../../lib/cluster/ClusterOptions"; describe("Sharded Pub/Sub E2E", () => { let faultInjectorClient: FaultInjectorClient; @@ -24,25 +20,44 @@ describe("Sharded Pub/Sub E2E", () => { }); describe("Single Subscriber", () => { - let subscriber: Cluster; - let publisher: Cluster; - let messageTracker: MessageTracker; - - beforeEach(async () => { - messageTracker = new MessageTracker(CHANNELS); - subscriber = createClusterTestClient(config.clientConfig, { - shardedSubscribers: true, - }); - publisher = createClusterTestClient(config.clientConfig, { + let cleanup: (() => Promise) | null = null; + + const setup = async ( + subscriberOverrides: Partial = {}, + publisherOverrides: Partial = {} + ) => { + const messageTracker = new MessageTracker(CHANNELS); + const subscriber = createClusterTestClient(config.clientConfig, { shardedSubscribers: true, + ...subscriberOverrides, }); - }); + const publisher = createClusterTestClient( + config.clientConfig, + publisherOverrides + ); + + // Return cleanup function along with the resources + cleanup = async () => { + await Promise.all([subscriber.quit(), publisher.quit()]); + }; + + return { subscriber, publisher, messageTracker }; + }; afterEach(async () => { - await Promise.all([subscriber.quit(), publisher.quit()]); + if (cleanup) { + try { + await cleanup(); + } catch { + } finally { + cleanup = null; + } + } }); it("should receive messages published to multiple channels", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -72,6 +87,10 @@ describe("Sharded Pub/Sub E2E", () => { }); it("should resume publishing and receiving after failover", async () => { + const { subscriber, publisher, messageTracker } = await setup({ + slotsRefreshInterval: -1, + }); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -106,15 +125,20 @@ describe("Sharded Pub/Sub E2E", () => { publishAbort.abort(); await publishResult; - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; + const totalSent = CHANNELS.reduce( + (acc, channel) => acc + messageTracker.getChannelStats(channel)!.sent, + 0 + ); + const totalReceived = CHANNELS.reduce( + (acc, channel) => + acc + messageTracker.getChannelStats(channel)!.received, + 0 + ); - assert.ok( - received <= sent, - `Channel ${channel}: received (${received}) should be <= sent (${sent})` - ); - } + assert.ok( + totalReceived <= totalSent, + `Total received (${totalReceived}) should be <= total sent (${totalSent})` + ); // Wait for 2 seconds before resuming publishing await wait(2_000); @@ -152,6 +176,8 @@ describe("Sharded Pub/Sub E2E", () => { }); it("should NOT receive messages after sunsubscribe", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -337,21 +363,22 @@ describe("Sharded Pub/Sub E2E", () => { publishAbort.abort(); await publishResult; - for (const channel of CHANNELS) { - const sent = messageTracker1.getChannelStats(channel)!.sent; - const received1 = messageTracker1.getChannelStats(channel)!.received; - - const received2 = messageTracker2.getChannelStats(channel)!.received; + const totalSent = CHANNELS.reduce( + (acc, channel) => acc + messageTracker1.getChannelStats(channel)!.sent, + 0 + ); + const totalReceived = CHANNELS.reduce( + (acc, channel) => + acc + + messageTracker1.getChannelStats(channel)!.received + + messageTracker2.getChannelStats(channel)!.received, + 0 + ); - assert.ok( - received1 <= sent, - `Channel ${channel}: received (${received1}) should be <= sent (${sent})` - ); - assert.ok( - received2 <= sent, - `Channel ${channel}: received2 (${received2}) should be <= sent (${sent})` - ); - } + assert.ok( + totalReceived <= totalSent * 2, + `Total received (${totalReceived}) should be <= total sent (${totalSent})` + ); // Wait for 2 seconds before resuming publishing await wait(2_000);