diff --git a/packages/server/src/CachePool.ts b/packages/server/src/CachePool.ts index bacb01a58c1..37028581361 100644 --- a/packages/server/src/CachePool.ts +++ b/packages/server/src/CachePool.ts @@ -1,5 +1,6 @@ import { IActiveCache, MODE } from './Interface' -import Redis from 'ioredis' +import { Redis } from 'ioredis' +import { RedisConnector } from './RedisConnector' /** * This pool is to keep track of in-memory cache used for LLM and Embeddings @@ -12,35 +13,7 @@ export class CachePool { ssoTokenCache: { [key: string]: any } = {} constructor() { - if (process.env.MODE === MODE.QUEUE) { - if (process.env.REDIS_URL) { - this.redisClient = new Redis(process.env.REDIS_URL, { - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined - }) - } else { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined, - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined - }) - } - } + this.redisClient = new RedisConnector().getRedisClient() } /** diff --git a/packages/server/src/RedisConnector.ts b/packages/server/src/RedisConnector.ts new file mode 100644 index 00000000000..c03dae514a0 --- /dev/null +++ b/packages/server/src/RedisConnector.ts @@ -0,0 +1,179 @@ +import { InternalFlowiseError } from './errors/internalFlowiseError' +import logger from './utils/logger' +import { MODE } from './Interface' +import { Redis } from 'ioredis' +import { StatusCodes } from 'http-status-codes' + +/** + * Class used to initialize and connect to Redis instance. + * + * Sync usage: + * const connector = new RedisConnector() + * const redis = connector.getRedisClient() + * + * Async usage: + * const connector = new RedisConnector() + * await connector.ready() // fully waits for Redis init + * const redis = connector.getRedisClient() + */ +export class RedisConnector { + /** + * @type {Redis} + */ + private redis!: Redis + + /** + * @type {Record} + */ + private connection!: Record + + /** + * @type {Promise} + */ + private initPromise: Promise | null = null + + /** + * Sync constructor + * + * @constructor + */ + constructor() {} + + /** + * Initializes Redis lazily (runs once). + * + * @returns {Promise} + */ + private async init(): Promise { + if (this.initPromise) return this.initPromise + + this.initPromise = (async () => { + const keepAlive = + process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) + ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) + : 0 + + const tlsOptions = + process.env.REDIS_TLS === 'true' + ? { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + : {} + + switch (process.env.MODE) { + case MODE.QUEUE: + await this.initializeQueueMode(keepAlive, tlsOptions) + break + + case MODE.MAIN: + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, + `[server]: MODE ${process.env.MODE} not implemented` + ) + + default: + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, + `Unrecognized MODE - ${process.env.MODE}` + ) + } + })() + + return this.initPromise + } + + /** + * Queue mode initialization. + * + * @param {number} keepAlive - Keep alive in milliseconds (see https://redis.github.io/ioredis/index.html#RedisOptions) + * @param {Record} tlsOptions - Record with key-value pairs (see https://redis.github.io/ioredis/index.html#RedisOptions) + */ + private async initializeQueueMode(keepAlive: number, tlsOptions: Record): Promise { + if (process.env.REDIS_URL) { + logger.info('[server] Queue mode using REDIS_URL.') + + tlsOptions.rejectUnauthorized = + !(process.env.REDIS_URL.startsWith('rediss://') && process.env.REDIS_TLS !== 'true') + + this.connection = { + keepAlive, + tls: tlsOptions, + enableReadyCheck: true, + reconnectOnError: this.connectOnError.bind(this) + } + + this.redis = new Redis(process.env.REDIS_URL, this.connection) + + } else { + logger.info('[server] Queue mode using HOST or localhost.') + + this.connection = { + host: process.env.REDIS_HOST ?? 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + keepAlive, + tls: tlsOptions, + enableReadyCheck: true, + reconnectOnError: this.connectOnError.bind(this) + } + + this.redis = new Redis(this.connection) + } + + try { + await this.redis.connect() + } catch (err: any) { + logger.error(`[server]: Redis connection failed - ${err.message}`) + throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, err.message) + } + } + + /** + * Function to handle Redis failure, used as callback. + * https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError + * @param {Error} err + * @returns {number} 1 - Always reconnect to Redis in case of errors (does not retry the failed command) + * @see https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError + */ + private connectOnError(err: Error): number { + logger.error(`[server]: Redis connection error - ${err.message}`) + return 1 + } + + /** + * Sync-safe access: + * - If Redis isn't initialized: triggers async initialization. + * - Always returns the Redis instance synchronously. + * + * @returns {Redis} + */ + public getRedisClient(): Redis { + // Trigger async init if not yet started + void this.init() + return this.redis + } + + /** + * Fully async safe usage: + * await connector.ready() + * + * @returns {Promise} + */ + public async ready(): Promise { + await this.init() + } + + /** + * Sync-safe access + * + * @returns {Record} + */ + public getRedisConnection(): Record { + // Trigger async init if not yet started + void this.init() + return this.connection + } +} + +export default RedisConnector diff --git a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts index afaf3c2f099..09aeb151150 100644 --- a/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts +++ b/packages/server/src/enterprise/middleware/passport/SessionPersistance.ts @@ -1,4 +1,5 @@ import Redis from 'ioredis' +import { RedisConnector } from '../../../RedisConnector' import { RedisStore } from 'connect-redis' import { getDatabaseSSLFromEnv } from '../../../DataSource' import path from 'path' @@ -13,24 +14,7 @@ let dbStore: Store | null = null export const initializeRedisClientAndStore = (): RedisStore => { if (!redisClient) { - if (process.env.REDIS_URL) { - redisClient = new Redis(process.env.REDIS_URL) - } else { - redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined - }) - } + redisClient = new RedisConnector().getRedisClient() } if (!redisStore) { redisStore = new RedisStore({ client: redisClient }) diff --git a/packages/server/src/utils/rateLimit.ts b/packages/server/src/utils/rateLimit.ts index d4dd168a654..5aee9293d37 100644 --- a/packages/server/src/utils/rateLimit.ts +++ b/packages/server/src/utils/rateLimit.ts @@ -4,6 +4,7 @@ import { IChatFlow, MODE } from '../Interface' import { Mutex } from 'async-mutex' import { RedisStore } from 'rate-limit-redis' import Redis from 'ioredis' +import { RedisConnector } from '../RedisConnector' import { QueueEvents, QueueEventsListener, QueueEventsProducer } from 'bullmq' interface CustomListener extends QueueEventsListener { @@ -22,65 +23,11 @@ export class RateLimiterManager { private queueEvents: QueueEvents constructor() { + let redisConnector = new RedisConnector() + this.redisClient = redisConnector.getRedisClient() if (process.env.MODE === MODE.QUEUE) { - if (process.env.REDIS_URL) { - this.redisClient = new Redis(process.env.REDIS_URL, { - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined - }) - } else { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined, - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined - }) - } - this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: this.getConnection() }) - this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: this.getConnection() }) - } - } - - getConnection() { - let tlsOpts = undefined - if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) { - tlsOpts = { - rejectUnauthorized: false - } - } else if (process.env.REDIS_TLS === 'true') { - tlsOpts = { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - } - return { - url: process.env.REDIS_URL || undefined, - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: tlsOpts, - maxRetriesPerRequest: null, - enableReadyCheck: true, - keepAlive: - process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10)) - ? parseInt(process.env.REDIS_KEEP_ALIVE, 10) - : undefined + this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: redisConnector.getRedisConnection() }) + this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: redisConnector.getRedisConnection() }) } }