Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
strategy:
fail-fast: false
matrix:
node: [8.x, 10.x, 12.x, 14.x, 16.x, 20.x]
node: [10.x, 12.x, 14.x, 16.x, 20.x, 22.x]

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
Expand Down
260 changes: 153 additions & 107 deletions lib/cluster/ClusterSubscriberGroup.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
import { Debug } from "../utils";
import ClusterSubscriber from "./ClusterSubscriber";
import Cluster from "./index";
import ConnectionPool from "./ConnectionPool";
import { getNodeKey } from "./util";
import * as calculateSlot from "cluster-key-slot";
import ShardedSubscriber from "./ShardedSubscriber";
import * as EventEmitter from "events";
const debug = Debug("cluster:subscriberGroup");

/**
* Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one
* ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards m
* messages between shards. However, this has scalability limitations, which is the reason why the sharded
* PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages.
* Given that, we need at least one ClusterSubscriber per master endpoint/node.
* Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature,
* exactly one subscriber exists per cluster instance because the Redis cluster bus forwards
* messages between shards. Sharded PubSub removes this limitation by making each shard
* responsible for its own messages.
*
* This class leverages the previously existing ClusterSubscriber by adding support for multiple such subscribers
* in alignment to the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way
* to support this feature.
* This class coordinates one ShardedSubscriber per master node in the cluster, providing
* sharded PubSub support while keeping the public API backward compatible.
*/
export default class ClusterSubscriberGroup {
private shardedSubscribers: Map<string, ClusterSubscriber> = new Map();
private shardedSubscribers: Map<string, ShardedSubscriber> = new Map();
private clusterSlots: string[][] = [];
//Simple [min, max] slot ranges aren't enough because you can migrate single slots
// Simple [min, max] slot ranges aren't enough because you can migrate single slots
private subscriberToSlotsIndex: Map<string, number[]> = new Map();
private channels: Map<number, Array<string | Buffer>> = new Map();

Expand All @@ -29,32 +26,14 @@
*
* @param cluster
*/
constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) {
cluster.on("+node", (redis) => {
this._addSubscriber(redis);
});

cluster.on("-node", (redis) => {
this._removeSubscriber(redis);
});

cluster.on("refresh", () => {
this._refreshSlots(cluster);
});

cluster.on("forceRefresh", () => {
refreshSlotsCacheCallback();
});
}
constructor(private readonly subscriberGroupEmitter: EventEmitter) {}

/**
* Get the responsible subscriber.
*
* Returns null if no subscriber was found
*
* @param slot
*/
getResponsibleSubscriber(slot: number): ClusterSubscriber {
getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined {
const nodeKey = this.clusterSlots[slot][0];
return this.shardedSubscribers.get(nodeKey);
}
Expand All @@ -67,10 +46,12 @@
addChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot; otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const currChannels = this.channels.get(slot);

Expand All @@ -93,10 +74,12 @@
removeChannels(channels: (string | Buffer)[]): number {
const slot = calculateSlot(channels[0]);

//Check if the all channels belong to the same slot and otherwise reject the operation
channels.forEach((c: string) => {
if (calculateSlot(c) != slot) return -1;
});
// Check that all channels belong to the same slot; otherwise reject the operation
for (const c of channels) {
if (calculateSlot(c) !== slot) {
return -1;
}
}

const slotChannels = this.channels.get(slot);

Expand Down Expand Up @@ -124,96 +107,140 @@
* Start all subscribers that have not been started yet
*/
start() {
const startPromises = [];
for (const s of this.shardedSubscribers.values()) {
if (!s.isStarted()) {
s.start();
startPromises.push(
s.start().catch((err) => {
this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
})
);
}
}
return Promise.all(startPromises);
}

/**
* Add a subscriber to the group of subscribers
*
* @param redis
* Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones.
*/
private _addSubscriber(redis: any): ClusterSubscriber {
const pool: ConnectionPool = new ConnectionPool(redis.options);
public async reset(
clusterSlots: string[][],
clusterNodes: any[]
): Promise<void> {
const hasTopologyChanged = this._refreshSlots(clusterSlots);
const hasFailedSubscribers = this.hasUnhealthySubscribers();

if (!hasTopologyChanged && !hasFailedSubscribers) {
debug(
"No topology change detected or failed subscribers. Skipping reset."
);
return;
}

if (pool.addMasterNode(redis)) {
const sub = new ClusterSubscriber(pool, this.cluster, true);
const nodeKey = getNodeKey(redis.options);
this.shardedSubscribers.set(nodeKey, sub);
sub.start();
// For each of the sharded subscribers
for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) {
if (
// If the subscriber is still responsible for a slot range and is running then keep it
this.subscriberToSlotsIndex.has(nodeKey) &&
shardedSubscriber.isStarted()
) {
debug("Skipping deleting subscriber for %s", nodeKey);
continue;
}

// We need to attempt to resubscribe them in case the new node serves their slot
this._resubscribe();
this.cluster.emit("+subscriber");
return sub;
debug("Removing subscriber for %s", nodeKey);
// Otherwise stop the subscriber and remove it
shardedSubscriber.stop();
this.shardedSubscribers.delete(nodeKey);

this.subscriberGroupEmitter.emit("-subscriber");
}

return null;
}
const startPromises = [];
// For each node in slots cache
for (const [nodeKey, _] of this.subscriberToSlotsIndex) {

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (16.x)

'_' is assigned a value but never used

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (14.x)

'_' is assigned a value but never used

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (10.x)

'_' is assigned a value but never used

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

'_' is assigned a value but never used

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

'_' is assigned a value but never used

Check warning on line 161 in lib/cluster/ClusterSubscriberGroup.ts

View workflow job for this annotation

GitHub Actions / build (12.x)

'_' is assigned a value but never used
// If we already have a subscriber for this node then keep it
if (this.shardedSubscribers.has(nodeKey)) {
debug("Skipping creating new subscriber for %s", nodeKey);
continue;
}

/**
* Removes a subscriber from the group
* @param redis
*/
private _removeSubscriber(redis: any): Map<string, ClusterSubscriber> {
const nodeKey = getNodeKey(redis.options);
const sub = this.shardedSubscribers.get(nodeKey);
debug("Creating new subscriber for %s", nodeKey);
// Otherwise create a new subscriber
const redis = clusterNodes.find((node) => {
return getNodeKey(node.options) === nodeKey;
});

if (sub) {
sub.stop();
this.shardedSubscribers.delete(nodeKey);
if (!redis) {
debug("Failed to find node for key %s", nodeKey);
continue;
}

const sub = new ShardedSubscriber(
this.subscriberGroupEmitter,
redis.options
);

this.shardedSubscribers.set(nodeKey, sub);

// Even though the subscriber to this node is going down, we might have another subscriber
// handling the same slots, so we need to attempt to subscribe the orphaned channels
this._resubscribe();
this.cluster.emit("-subscriber");
startPromises.push(
sub.start().catch((err) => {
this.subscriberGroupEmitter.emit("subscriberConnectFailed", err);
})
);

this.subscriberGroupEmitter.emit("+subscriber");
}

// It's vital to await the start promises before resubscribing
// Otherwise we might try to resubscribe to a subscriber that is not yet connected
// This can cause a race condition
try {
await Promise.all(startPromises);
} catch (err) {
debug("Error while starting subscribers: %s", err);
this.subscriberGroupEmitter.emit("error", err);
}

return this.shardedSubscribers;
this._resubscribe();
this.subscriberGroupEmitter.emit("subscribersReady");
}

/**
* Refreshes the subscriber-related slot ranges
*
* Returns false if no refresh was needed
*
* @param cluster
* @param targetSlots
*/
private _refreshSlots(cluster: Cluster): boolean {
private _refreshSlots(targetSlots: string[][]): boolean {
//If there was an actual change, then reassign the slot ranges
if (this._slotsAreEqual(cluster.slots)) {
if (this._slotsAreEqual(targetSlots)) {
debug(
"Nothing to refresh because the new cluster map is equal to the previous one."
);
} else {
debug("Refreshing the slots of the subscriber group.");

//Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

for (let slot = 0; slot < cluster.slots.length; slot++) {
const node: string = cluster.slots[slot][0];
return false;
}

if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}
debug("Refreshing the slots of the subscriber group.");

//Update the subscribers from the index
this._resubscribe();
//Rebuild the slots index
this.subscriberToSlotsIndex = new Map();

//Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots));
for (let slot = 0; slot < targetSlots.length; slot++) {
const node: string = targetSlots[slot][0];

this.cluster.emit("subscribersReady");
return true;
if (!this.subscriberToSlotsIndex.has(node)) {
this.subscriberToSlotsIndex.set(node, []);
}
this.subscriberToSlotsIndex.get(node).push(Number(slot));
}

return false;
//Update the cached slots map
this.clusterSlots = JSON.parse(JSON.stringify(targetSlots));

return true;
}

/**
Expand All @@ -224,12 +251,9 @@
private _resubscribe() {
if (this.shardedSubscribers) {
this.shardedSubscribers.forEach(
(s: ClusterSubscriber, nodeKey: string) => {
(s: ShardedSubscriber, nodeKey: string) => {
const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey);
if (subscriberSlots) {
//More for debugging purposes
s.associateSlotRange(subscriberSlots);

//Resubscribe on the underlying connection
subscriberSlots.forEach((ss) => {
//Might return null if being disconnected
Expand All @@ -238,12 +262,10 @@

if (channels && channels.length > 0) {
//Try to subscribe now
if (redis) {
redis.ssubscribe(channels);

//If the instance isn't ready yet, then register the re-subscription for later
redis.on("ready", () => {
redis.ssubscribe(channels);
if (redis && redis.status !== "end") {
redis.ssubscribe(channels).catch((err) => {
// TODO: Should we emit an error event here?
debug("Failed to ssubscribe on node %s: %s", nodeKey, err);
});
}
}
Expand All @@ -261,7 +283,31 @@
* @private
*/
private _slotsAreEqual(other: string[][]) {
if (this.clusterSlots === undefined) return false;
else return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
if (this.clusterSlots === undefined) {
return false;
} else {
return JSON.stringify(this.clusterSlots) === JSON.stringify(other);
}
}

/**
* Checks if any subscribers are in an unhealthy state.
*
* A subscriber is considered unhealthy if:
* - It exists but is not started (failed/disconnected)
* - It's missing entirely for a node that should have one
*
* @returns true if any subscribers need to be recreated
*/
private hasUnhealthySubscribers(): boolean {
const hasFailedSubscribers = Array.from(
this.shardedSubscribers.values()
).some((sub) => !sub.isStarted());

const hasMissingSubscribers = Array.from(
this.subscriberToSlotsIndex.keys()
).some((nodeKey) => !this.shardedSubscribers.has(nodeKey));

return hasFailedSubscribers || hasMissingSubscribers;
}
}
Loading
Loading