Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 3 additions & 30 deletions packages/server/src/CachePool.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { IActiveCache, MODE } from './Interface'
import Redis from 'ioredis'
import { Redis } from 'ioredis'
import { RedisConnector } from './RedisConnector'

/**
* This pool is to keep track of in-memory cache used for LLM and Embeddings
Expand All @@ -12,35 +13,7 @@ export class CachePool {
ssoTokenCache: { [key: string]: any } = {}

constructor() {
if (process.env.MODE === MODE.QUEUE) {
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL, {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
}
}
this.redisClient = new RedisConnector().getRedisClient()
}

/**
Expand Down
179 changes: 179 additions & 0 deletions packages/server/src/RedisConnector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { InternalFlowiseError } from './errors/internalFlowiseError'
import logger from './utils/logger'
import { MODE } from './Interface'
import { Redis } from 'ioredis'
import { StatusCodes } from 'http-status-codes'

/**
* Class used to initialize and connect to Redis instance.
*
* Sync usage:
* const connector = new RedisConnector()
* const redis = connector.getRedisClient()
*
* Async usage:
* const connector = new RedisConnector()
* await connector.ready() // fully waits for Redis init
* const redis = connector.getRedisClient()
*/
export class RedisConnector {
/**
* @type {Redis}
*/
private redis!: Redis

/**
* @type {Record<string, unknown>}
*/
private connection!: Record<string, unknown>

/**
* @type {Promise<void>}
*/
private initPromise: Promise<void> | null = null

/**
* Sync constructor
*
* @constructor
*/
constructor() {}

/**
* Initializes Redis lazily (runs once).
*
* @returns {Promise<void>}
*/
private async init(): Promise<void> {
if (this.initPromise) return this.initPromise

this.initPromise = (async () => {
const keepAlive =
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: 0

const tlsOptions =
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: {}

switch (process.env.MODE) {
case MODE.QUEUE:
await this.initializeQueueMode(keepAlive, tlsOptions)
break

case MODE.MAIN:
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR,
`[server]: MODE ${process.env.MODE} not implemented`
)

default:
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR,
`Unrecognized MODE - ${process.env.MODE}`
)
}
})()

return this.initPromise
}

/**
* Queue mode initialization.
*
* @param {number} keepAlive - Keep alive in milliseconds (see https://redis.github.io/ioredis/index.html#RedisOptions)
* @param {Record<string, unknown>} tlsOptions - Record with key-value pairs (see https://redis.github.io/ioredis/index.html#RedisOptions)
*/
private async initializeQueueMode(keepAlive: number, tlsOptions: Record<string, unknown>): Promise<void> {
if (process.env.REDIS_URL) {
logger.info('[server] Queue mode using REDIS_URL.')

tlsOptions.rejectUnauthorized =
!(process.env.REDIS_URL.startsWith('rediss://') && process.env.REDIS_TLS !== 'true')

this.connection = {
keepAlive,
tls: tlsOptions,
enableReadyCheck: true,
reconnectOnError: this.connectOnError.bind(this)
}

this.redis = new Redis(process.env.REDIS_URL, this.connection)

} else {
logger.info('[server] Queue mode using HOST or localhost.')

this.connection = {
host: process.env.REDIS_HOST ?? 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
keepAlive,
tls: tlsOptions,
enableReadyCheck: true,
reconnectOnError: this.connectOnError.bind(this)
}

this.redis = new Redis(this.connection)
}

try {
await this.redis.connect()
} catch (err: any) {
logger.error(`[server]: Redis connection failed - ${err.message}`)
throw new InternalFlowiseError(StatusCodes.INTERNAL_SERVER_ERROR, err.message)
}
}

/**
* Function to handle Redis failure, used as callback.
* https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError
* @param {Error} err
* @returns {number} 1 - Always reconnect to Redis in case of errors (does not retry the failed command)
* @see https://redis.github.io/ioredis/interfaces/CommonRedisOptions.html#reconnectOnError
*/
private connectOnError(err: Error): number {
logger.error(`[server]: Redis connection error - ${err.message}`)
return 1
}

/**
* Sync-safe access:
* - If Redis isn't initialized: triggers async initialization.
* - Always returns the Redis instance synchronously.
*
* @returns {Redis}
*/
public getRedisClient(): Redis {
// Trigger async init if not yet started
void this.init()
return this.redis
}

/**
* Fully async safe usage:
* await connector.ready()
*
* @returns {Promise<void>}
*/
public async ready(): Promise<void> {
await this.init()
}

/**
* Sync-safe access
*
* @returns {Record<string, unknown>}
*/
public getRedisConnection(): Record<string, unknown> {
// Trigger async init if not yet started
void this.init()
return this.connection
}
}

export default RedisConnector
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Redis from 'ioredis'
import { RedisConnector } from '../../../RedisConnector'
import { RedisStore } from 'connect-redis'
import { getDatabaseSSLFromEnv } from '../../../DataSource'
import path from 'path'
Expand All @@ -13,24 +14,7 @@ let dbStore: Store | null = null

export const initializeRedisClientAndStore = (): RedisStore => {
if (!redisClient) {
if (process.env.REDIS_URL) {
redisClient = new Redis(process.env.REDIS_URL)
} else {
redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
redisClient = new RedisConnector().getRedisClient()
}
if (!redisStore) {
redisStore = new RedisStore({ client: redisClient })
Expand Down
63 changes: 5 additions & 58 deletions packages/server/src/utils/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { IChatFlow, MODE } from '../Interface'
import { Mutex } from 'async-mutex'
import { RedisStore } from 'rate-limit-redis'
import Redis from 'ioredis'
import { RedisConnector } from '../RedisConnector'
import { QueueEvents, QueueEventsListener, QueueEventsProducer } from 'bullmq'

interface CustomListener extends QueueEventsListener {
Expand All @@ -22,65 +23,11 @@ export class RateLimiterManager {
private queueEvents: QueueEvents

constructor() {
let redisConnector = new RedisConnector()
this.redisClient = redisConnector.getRedisClient()
if (process.env.MODE === MODE.QUEUE) {
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL, {
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
})
}
this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: this.getConnection() })
this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: this.getConnection() })
}
}

getConnection() {
let tlsOpts = undefined
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) {
tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
}
return {
url: process.env.REDIS_URL || undefined,
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls: tlsOpts,
maxRetriesPerRequest: null,
enableReadyCheck: true,
keepAlive:
process.env.REDIS_KEEP_ALIVE && !isNaN(parseInt(process.env.REDIS_KEEP_ALIVE, 10))
? parseInt(process.env.REDIS_KEEP_ALIVE, 10)
: undefined
this.queueEventsProducer = new QueueEventsProducer(QUEUE_NAME, { connection: redisConnector.getRedisConnection() })
this.queueEvents = new QueueEvents(QUEUE_NAME, { connection: redisConnector.getRedisConnection() })
}
}

Expand Down