diff --git a/package-lock.json b/package-lock.json index 3739530f34..d19ddee03a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1428,6 +1428,76 @@ "@octokit/openapi-types": "^25.0.0" } }, + "node_modules/@opentelemetry/api": { + "version": "1.9.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/api/-/api-1.9.0.tgz", + "integrity": "sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=8.0.0" + } + }, + "node_modules/@opentelemetry/core": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/core/-/core-2.2.0.tgz", + "integrity": "sha512-FuabnnUm8LflnieVxs6eP7Z383hgQU4W1e3KJS6aOG3RxWxcHyBxH8fDMHNgu/gFx/M2jvTOW/4/PHhLz6bjWw==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/semantic-conventions": "^1.29.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.0.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/resources": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/resources/-/resources-2.2.0.tgz", + "integrity": "sha512-1pNQf/JazQTMA0BiO5NINUzH0cbLbbl7mntLa4aJNmCCXSj0q03T5ZXXL0zw4G55TjdL9Tz32cznGClf+8zr5A==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "2.2.0", + "@opentelemetry/semantic-conventions": "^1.29.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.3.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/sdk-metrics": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/sdk-metrics/-/sdk-metrics-2.2.0.tgz", + "integrity": "sha512-G5KYP6+VJMZzpGipQw7Giif48h6SGQ2PFKEYCybeXJsOCB4fp8azqMAAzE5lnnHK3ZVwYQrgmFbsUJO/zOnwGw==", + "dev": true, + "license": "Apache-2.0", + "dependencies": { + "@opentelemetry/core": "2.2.0", + "@opentelemetry/resources": "2.2.0" + }, + "engines": { + "node": "^18.19.0 || >=20.6.0" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1.9.0 <1.10.0" + } + }, + "node_modules/@opentelemetry/semantic-conventions": { + "version": "1.37.0", + "resolved": "https://registry.npmjs.org/@opentelemetry/semantic-conventions/-/semantic-conventions-1.37.0.tgz", + "integrity": "sha512-JD6DerIKdJGmRp4jQyX5FlrQjA4tjOw1cvfsPAZXfOOEErMUHjPcPSICS+6WnM0nB0efSFARh0KAZss+bvExOA==", + "dev": true, + "license": "Apache-2.0", + "engines": { + "node": ">=14" + } + }, "node_modules/@phun-ky/typeof": { "version": "1.2.8", "resolved": "https://registry.npmjs.org/@phun-ky/typeof/-/typeof-1.2.8.tgz", @@ -7341,12 +7411,22 @@ "cluster-key-slot": "1.1.2" }, "devDependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/sdk-metrics": "^2.2.0", "@redis/test-utils": "*", "@types/sinon": "^17.0.3", "sinon": "^17.0.1" }, "engines": { "node": ">= 18" + }, + "peerDependencies": { + "@opentelemetry/api": ">=1 <2" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } } }, "packages/entraid": { @@ -7433,6 +7513,18 @@ "node": ">= 18" } }, + "packages/redis/node_modules/@redis/client": { + "version": "5.9.0-beta.2", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-5.9.0-beta.2.tgz", + "integrity": "sha512-Fs3PSC/o6lrfyFPzY6YwBVDH97WZ1Uj5YvE8wyPY+3gRxNd1XV3nVIvmrlWn5pCTt5dIdqSP6h63Qe5MHhNlIQ==", + "license": "MIT", + "dependencies": { + "cluster-key-slot": "1.1.2" + }, + "engines": { + "node": ">= 18" + } + }, "packages/search": { "name": "@redis/search", "version": "5.9.0", diff --git a/packages/client/index.ts b/packages/client/index.ts index 09426cb50a..b3f277e544 100644 --- a/packages/client/index.ts +++ b/packages/client/index.ts @@ -35,3 +35,4 @@ export { GEO_REPLY_WITH, GeoReplyWith } from './lib/commands/GEOSEARCH_WITH'; export { SetOptions, CLIENT_KILL_FILTERS, FAILOVER_MODES, CLUSTER_SLOT_STATES, COMMAND_LIST_FILTER_BY, REDIS_FLUSH_MODES } from './lib/commands' export { BasicClientSideCache, BasicPooledClientSideCache } from './lib/client/cache'; +export { OpenTelemetry } from './lib/opentelemetry/opentelemetry'; diff --git a/packages/client/lib/client/index.ts b/packages/client/lib/client/index.ts index ea2102c37f..fe15fbe16d 100644 --- a/packages/client/lib/client/index.ts +++ b/packages/client/lib/client/index.ts @@ -1,5 +1,5 @@ import COMMANDS from '../commands'; -import RedisSocket, { RedisSocketOptions } from './socket'; +import RedisSocket, { RedisSocketOptions, RedisTcpSocketOptions } from './socket'; import { BasicAuth, CredentialsError, CredentialsProvider, StreamingCredentialsProvider, UnableToObtainNewCredentialsError, Disposable } from '../authx'; import RedisCommandsQueue, { CommandOptions } from './commands-queue'; import { EventEmitter } from 'node:events'; @@ -21,6 +21,7 @@ import { BasicCommandParser, CommandParser } from './parser'; import SingleEntryCache from '../single-entry-cache'; import { version } from '../../package.json' import EnterpriseMaintenanceManager, { MaintenanceUpdate, MovingEndpointType } from './enterprise-maintenance-manager'; +import { OTelMetrics } from '../opentelemetry/metrics'; export interface RedisClientOptions< M extends RedisModules = RedisModules, @@ -1064,21 +1065,47 @@ export default class RedisClient< args: ReadonlyArray, options?: CommandOptions ): Promise { + const recordOperation = OTelMetrics.createRecordOperationDuration(args, { + host: (this._self.#options.socket as RedisTcpSocketOptions)?.host || "", + port: + ( + this._self.#options.socket as RedisTcpSocketOptions + )?.port?.toString() || "", + db: this._self.#selectedDB.toString(), + }); + if (!this._self.#socket.isOpen) { + recordOperation(new ClientClosedError()); return Promise.reject(new ClientClosedError()); - } else if (!this._self.#socket.isReady && this._self.#options.disableOfflineQueue) { + } else if ( + !this._self.#socket.isReady && + this._self.#options.disableOfflineQueue + ) { + recordOperation(new ClientOfflineError()); return Promise.reject(new ClientOfflineError()); } // Merge global options with provided options const opts = { ...this._self._commandOptions, - ...options - } + ...options, + }; const promise = this._self.#queue.addCommand(args, opts); + OTelMetrics.recordPendingRequests(1); + + const trackedPromise = promise.then((reply) => { + recordOperation(); + return reply; + }).catch((err) => { + recordOperation(err); + throw err; + }).finally(() => { + OTelMetrics.recordPendingRequests(-1); + }); + this._self.#scheduleWrite(); - return promise; + return trackedPromise; } async SELECT(db: number): Promise { diff --git a/packages/client/lib/client/socket.ts b/packages/client/lib/client/socket.ts index c5569e8654..416a124fd8 100644 --- a/packages/client/lib/client/socket.ts +++ b/packages/client/lib/client/socket.ts @@ -5,6 +5,7 @@ import { ConnectionTimeoutError, ClientClosedError, SocketClosedUnexpectedlyErro import { setTimeout } from 'node:timers/promises'; import { RedisArgument } from '../RESP/types'; import { dbgMaintenance } from './enterprise-maintenance-manager'; +import { OTelMetrics } from '../opentelemetry/metrics'; type NetOptions = { tls?: false; @@ -215,6 +216,7 @@ export default class RedisSocket extends EventEmitter { let retries = 0; do { try { + const started = performance.now(); this.#socket = await this.#createSocket(); this.emit('connect'); @@ -228,6 +230,8 @@ export default class RedisSocket extends EventEmitter { this.#isReady = true; this.#socketEpoch++; this.emit('ready'); + OTelMetrics.recordConnectionCount(1); + OTelMetrics.recordConnectionCreateTime(performance.now() - started); } catch (err) { const retryIn = this.#shouldReconnect(retries++, err as Error); if (typeof retryIn !== 'number') { @@ -304,6 +308,10 @@ export default class RedisSocket extends EventEmitter { this.#isReady = false; this.emit('error', err); + if (wasReady) { + OTelMetrics.recordConnectionCount(-1); + } + if (!wasReady || !this.#isOpen || typeof this.#shouldReconnect(0, err) !== 'number') return; this.emit('reconnecting'); @@ -362,6 +370,7 @@ export default class RedisSocket extends EventEmitter { this.#socket = undefined; } + OTelMetrics.recordConnectionCount(-1); this.emit('end'); } diff --git a/packages/client/lib/opentelemetry/metrics.spec.ts b/packages/client/lib/opentelemetry/metrics.spec.ts new file mode 100644 index 0000000000..1851365756 --- /dev/null +++ b/packages/client/lib/opentelemetry/metrics.spec.ts @@ -0,0 +1,421 @@ +import { strict as assert } from "node:assert"; + +import * as api from "@opentelemetry/api"; +import { + AggregationTemporality, + DataPoint, + Histogram, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader, +} from "@opentelemetry/sdk-metrics"; +import { spy } from "sinon"; + +import { OTelMetrics } from "./metrics"; +import { METRIC_NAMES, ObservabilityConfig } from "./types"; +import { NOOP_UP_DOWN_COUNTER_METRIC } from "./noop-meter"; +import { noopFunction, waitForMetrics } from "./utils"; +import testUtils, { GLOBAL } from "../test-utils"; + +describe("OTel Metrics Unit Tests", () => { + afterEach(async () => { + OTelMetrics.reset(); + }); + + it("should init only once", () => { + OTelMetrics.init({ + api: undefined, + config: undefined, + }); + + assert.throws(() => { + OTelMetrics.init({ + api: undefined, + config: undefined, + }); + }); + }); + + it("should be noop if not initialized", () => { + const addSpy = spy(NOOP_UP_DOWN_COUNTER_METRIC, "add"); + + OTelMetrics.recordConnectionCount(1); + + assert.equal(addSpy.callCount, 1); + + addSpy.restore(); + }); + + it("should create instance with noop meter when API is null", () => { + const config: ObservabilityConfig = { + metrics: { + enabled: true, + }, + }; + + const addSpy = spy(NOOP_UP_DOWN_COUNTER_METRIC, "add"); + + OTelMetrics.init({ api: undefined, config }); + + OTelMetrics.recordConnectionCount(1); + + assert.equal(addSpy.callCount, 1); + + addSpy.restore(); + }); + + it("should create instance with noop meter when metrics are disabled", () => { + const config: ObservabilityConfig = { + metrics: { + enabled: false, + }, + }; + + const addSpy = spy(NOOP_UP_DOWN_COUNTER_METRIC, "add"); + + OTelMetrics.init({ api: undefined, config }); + + OTelMetrics.recordConnectionCount(1); + + assert.equal(addSpy.callCount, 1); + + addSpy.restore(); + }); + + it("should not record excluded commands", () => { + const config: ObservabilityConfig = { + metrics: { + enabled: true, + excludeCommands: ["GET"], + }, + }; + + OTelMetrics.init({ api: undefined, config }); + + const recordGET = OTelMetrics.createRecordOperationDuration( + ["GET", "key"], + { + host: "localhost", + port: "6379", + db: "0", + } + ); + + assert.strictEqual( + recordGET, + noopFunction, + "expect record to be noop function" + ); + + const recordSET = OTelMetrics.createRecordOperationDuration( + ["SET", "key"], + { + host: "localhost", + port: "6379", + db: "0", + } + ); + + assert.notStrictEqual( + recordSET, + noopFunction, + "expect record to not be noop function" + ); + }); + + it("should only record included commands", () => { + const config: ObservabilityConfig = { + metrics: { + enabled: true, + includeCommands: ["SET"], + }, + }; + + OTelMetrics.init({ api: undefined, config }); + + const recordGET = OTelMetrics.createRecordOperationDuration( + ["GET", "key"], + { + host: "localhost", + port: "6379", + db: "0", + } + ); + + assert.strictEqual( + recordGET, + noopFunction, + "expect record to be noop function" + ); + + const recordSET = OTelMetrics.createRecordOperationDuration( + ["SET", "key"], + { + host: "localhost", + port: "6379", + db: "0", + } + ); + + assert.notStrictEqual( + recordSET, + noopFunction, + "expect record to not be noop function" + ); + }); +}); + +describe("OTel Metrics E2E", function () { + let exporter: InMemoryMetricExporter; + let meterProvider: MeterProvider; + let reader: PeriodicExportingMetricReader; + + this.timeout(5000); + + beforeEach(function () { + exporter = new InMemoryMetricExporter(AggregationTemporality.CUMULATIVE); + reader = new PeriodicExportingMetricReader({ + exporter, + }); + meterProvider = new MeterProvider({ + readers: [reader], + }); + }); + + afterEach(async function () { + OTelMetrics.reset(); + + // Disable global meter provider + api.metrics.disable(); + + // Drain anything pending, then shut everything down + await reader.collect().catch(() => {}); + await meterProvider.shutdown().catch(() => {}); + }); + + it("should work with injected meter provider", async () => { + const config: ObservabilityConfig = { + metrics: { + enabled: true, + meterProvider, + }, + }; + + OTelMetrics.init({ api, config }); + + OTelMetrics.recordConnectionCount(1); + OTelMetrics.recordConnectionCount(2); + OTelMetrics.recordConnectionCount(3); + + await meterProvider.forceFlush(); + + const resourceMetrics = exporter.getMetrics(); + + const metric = resourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find((m) => m.descriptor.name === "db.client.connection.count"); + + assert.ok( + metric, + "expected db.client.connection.count metric to be present" + ); + assert.strictEqual(metric.dataPoints?.[0].value, 6); + }); + + it("should use global meter provider from api.metrics", async () => { + // Register the test's meter provider as the global one + api.metrics.setGlobalMeterProvider(meterProvider); + + const config: ObservabilityConfig = { + metrics: { + enabled: true, + // No meterProvider + }, + }; + + OTelMetrics.init({ api, config }); + + OTelMetrics.recordConnectionCount(5); + + await meterProvider.forceFlush(); + + const resourceMetrics = exporter.getMetrics(); + + const metric = resourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find((m) => m.descriptor.name === "db.client.connection.count"); + + assert.ok( + metric, + "expected db.client.connection.count metric to be present" + ); + assert.strictEqual(metric.dataPoints?.[0].value, 5); + }); + + testUtils.testWithClient( + "should count connections", + async (client) => { + try { + OTelMetrics.init({ + api, + config: { + metrics: { + enabled: true, + meterProvider, + }, + }, + }); + + await client.connect(); + + await meterProvider.forceFlush(); + + const resourceMetrics = exporter.getMetrics(); + const metric = resourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find( + (m) => m.descriptor.name === METRIC_NAMES.dbClientConnectionCount + ); + + assert.ok( + metric, + "expected db.client.connection.count metric to be present" + ); + assert.strictEqual(metric.dataPoints?.[0].value, 1); + } catch (error) { + throw error; + } finally { + await client.destroy(); + } + }, + { + ...GLOBAL.SERVERS.OPEN, + disableClientSetup: true, + } + ); + + testUtils.testAll( + "should record commands", + async (client) => { + OTelMetrics.init({ + api, + config: { + metrics: { + enabled: true, + meterProvider, + }, + }, + }); + + await Promise.all([ + client.set("key", "value"), + client.set("key", "value"), + client.set("key", "value"), + ]); + + await meterProvider.forceFlush(); + + const resourceMetrics = exporter.getMetrics(); + const metric = resourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find( + (m) => m.descriptor.name === METRIC_NAMES.dbClientOperationDuration + ); + + assert.ok( + metric, + "expected db.client.operation.duration metric to be present" + ); + + const dataPoints = metric.dataPoints as DataPoint[]; + + assert.strictEqual(dataPoints[0].value.count, 3); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + "should NOT record commands when disabled", + async (client) => { + OTelMetrics.init({ + api, + config: { + metrics: { + enabled: false, + }, + }, + }); + + await Promise.all([ + client.set("key", "value"), + client.set("key", "value"), + client.set("key", "value"), + ]); + + await meterProvider.forceFlush(); + + const resourceMetrics = exporter.getMetrics(); + const metric = resourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find( + (m) => m.descriptor.name === METRIC_NAMES.dbClientOperationDuration + ); + + assert.strictEqual(metric, undefined); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); + + testUtils.testAll( + "should record pending requests", + async (client) => { + OTelMetrics.init({ + api, + config: { + metrics: { + enabled: true, + meterProvider, + // Make sure connection-advanced is enabled since that's where pending requests is + enabledMetricGroups: ["connection-advanced"], + }, + }, + }); + + const beforeMetric = await waitForMetrics( + meterProvider, + exporter, + METRIC_NAMES.dbClientConnectionPendingRequests + ); + + assert.strictEqual(beforeMetric, undefined); + + const blockingPromise = client.blPop("key${tag}", 1); + + const afterMetric = await waitForMetrics( + meterProvider, + exporter, + METRIC_NAMES.dbClientConnectionPendingRequests + ); + + await blockingPromise; + + assert.ok(afterMetric, "expected pending requests metric to be present"); + assert.strictEqual(afterMetric.dataPoints[0].value, 1); + }, + { + client: GLOBAL.SERVERS.OPEN, + cluster: GLOBAL.CLUSTERS.OPEN, + } + ); +}); diff --git a/packages/client/lib/opentelemetry/metrics.ts b/packages/client/lib/opentelemetry/metrics.ts new file mode 100644 index 0000000000..a5732f8856 --- /dev/null +++ b/packages/client/lib/opentelemetry/metrics.ts @@ -0,0 +1,449 @@ +import { Meter } from "@opentelemetry/api"; +import { RedisArgument } from "../RESP/types"; +import { + DEFAULT_OTEL_ATTRIBUTES, + MetricInstruments, + ObservabilityConfig, + OTEL_ATTRIBUTES, + MetricOptions, + DEFAULT_METRIC_GROUPS, + DEFAULT_HISTOGRAM_BUCKETS, + InstrumentConfig, + MetricGroup, + METRIC_GROUP, + METRIC_NAMES, +} from "./types"; +import { createNoopMeter } from "./noop-meter"; +import { noopFunction } from "./utils"; + +export class OTelMetrics { + // Create a noop instance by default + static #instance: OTelMetrics = new OTelMetrics({ + api: undefined, + config: undefined, + }); + static #initialized = false; + + readonly #meter: Meter; + readonly #instruments: MetricInstruments; + readonly #options: MetricOptions; + + private constructor({ + api, + config, + }: { + api?: typeof import("@opentelemetry/api"); + config?: ObservabilityConfig; + }) { + this.#options = OTelMetrics.parseOptions(config); + this.#meter = OTelMetrics.getMeter(api, this.#options); + this.#instruments = OTelMetrics.registerInstruments( + this.#meter, + this.#options + ); + } + + public static init({ + api, + config, + }: { + api?: typeof import("@opentelemetry/api"); + config?: ObservabilityConfig; + }) { + if (OTelMetrics.#initialized) { + throw new Error("OTelMetrics already initialized"); + } + OTelMetrics.#instance = new OTelMetrics({ api, config }); + OTelMetrics.#initialized = true; + } + + /** + * Reset the instance to noop. Used for testing. + * + * @internal + */ + public static reset() { + OTelMetrics.#instance = new OTelMetrics({ + api: undefined, + config: undefined, + }); + OTelMetrics.#initialized = false; + } + + public static createRecordOperationDuration( + args: ReadonlyArray, + options: { + host: string; + port: string; + db: string; + } + ): (error?: Error) => void { + const commandName = args[0]?.toString() || "UNKNOWN"; + + if ( + OTelMetrics.isCommandExcluded(commandName) || + !OTelMetrics.#instance.#options.enabledMetricGroups.includes( + METRIC_GROUP.COMMAND + ) + ) { + return noopFunction; + } + + const startTime = performance.now(); + + const baseAttributes = { + [OTEL_ATTRIBUTES.dbOperationName]: commandName, + [OTEL_ATTRIBUTES.dbNamespace]: options.db, + [OTEL_ATTRIBUTES.serverAddress]: options.host, + [OTEL_ATTRIBUTES.serverPort]: options.port, + }; + + return (error?: Error) => { + OTelMetrics.#instance.#instruments.dbClientOperationDuration.record( + (performance.now() - startTime) / 1000, // convert to seconds + { + ...OTelMetrics.#instance.#options.attributes, + ...baseAttributes, + // TODO add error types + ...(error ? { [OTEL_ATTRIBUTES.errorType]: error.message } : {}), + } + ); + }; + } + + public static recordConnectionCount(value: number) { + OTelMetrics.#instance.#instruments.dbClientConnectionCount.add( + value, + OTelMetrics.#instance.#options.attributes + ); + } + + public static recordConnectionCreateTime(durationMs: number) { + OTelMetrics.#instance.#instruments.dbClientConnectionCreateTime.record( + durationMs / 1000, // convert to seconds + OTelMetrics.#instance.#options.attributes + ); + } + + public static recordPendingRequests(value: number) { + OTelMetrics.#instance.#instruments.dbClientConnectionPendingRequests.add( + value, + OTelMetrics.#instance.#options.attributes + ); + } + + private static isCommandExcluded(commandName: string) { + const upperCommandName = commandName?.toUpperCase(); + return ( + (OTelMetrics.#instance.#options.includeCommands.length > 0 && + !OTelMetrics.#instance.#options.includeCommands.includes( + upperCommandName + )) || + OTelMetrics.#instance.#options.excludeCommands.includes(upperCommandName) + ); + } + + private static getMeter( + api: typeof import("@opentelemetry/api") | undefined, + options: MetricOptions + ): Meter { + if (!api || !options.enabled) { + return createNoopMeter(); + } + + if (options?.meterProvider) { + return options.meterProvider.getMeter( + options.serviceName ?? + DEFAULT_OTEL_ATTRIBUTES[OTEL_ATTRIBUTES.redisClientLibrary] + ); + } + + return api.metrics.getMeter( + options.serviceName ?? + DEFAULT_OTEL_ATTRIBUTES[OTEL_ATTRIBUTES.redisClientLibrary] + ); + } + + private static parseOptions(config?: ObservabilityConfig) { + return { + enabled: !!config?.metrics?.enabled, + attributes: { + ...DEFAULT_OTEL_ATTRIBUTES, + ...config?.resourceAttributes, + }, + meterProvider: config?.metrics?.meterProvider, + serviceName: config?.serviceName, + includeCommands: + config?.metrics?.includeCommands?.map((c) => c.toUpperCase()) ?? [], + excludeCommands: + config?.metrics?.excludeCommands?.map((c) => c.toUpperCase()) ?? [], + enabledMetricGroups: + config?.metrics?.enabledMetricGroups ?? DEFAULT_METRIC_GROUPS, + hidePubSubChannelNames: config?.metrics?.hidePubSubChannelNames ?? false, + hideStreamNames: config?.metrics?.hideStreamNames ?? false, + histAggregation: + config?.metrics?.histAggregation ?? "explicit_bucket_histogram", + bucketsOperationDuration: + config?.metrics?.bucketsOperationDuration ?? + DEFAULT_HISTOGRAM_BUCKETS.OPERATION_DURATION, + bucketsConnectionCreateTime: + config?.metrics?.bucketsConnectionCreateTime ?? + DEFAULT_HISTOGRAM_BUCKETS.CONNECTION_CREATE_TIME, + bucketsConnectionWaitTime: + config?.metrics?.bucketsConnectionWaitTime ?? + DEFAULT_HISTOGRAM_BUCKETS.CONNECTION_WAIT_TIME, + bucketsConnectionUseTime: + config?.metrics?.bucketsConnectionUseTime ?? + DEFAULT_HISTOGRAM_BUCKETS.CONNECTION_WAIT_TIME, + bucketsPipelineDuration: + config?.metrics?.bucketsPipelineDuration ?? + DEFAULT_HISTOGRAM_BUCKETS.PIPELINE_DURATION, + bucketsHealthcheckDuration: + config?.metrics?.bucketsHealthcheckDuration ?? + DEFAULT_HISTOGRAM_BUCKETS.HEALTHCHECK_DURATION, + bucketsPipelineSize: + config?.metrics?.bucketsPipelineSize ?? + DEFAULT_HISTOGRAM_BUCKETS.PIPELINE_SIZE, + }; + } + + private static createHistorgram( + meter: Meter, + enabledMetricGroups: MetricGroup[], + instrumentConfig: InstrumentConfig + ) { + const isEnabled = enabledMetricGroups.includes( + instrumentConfig.metricGroup + ); + + if (!isEnabled) { + return createNoopMeter().createHistogram(instrumentConfig.name); + } + + return meter.createHistogram(instrumentConfig.name, { + unit: instrumentConfig.unit, + description: instrumentConfig.description, + }); + } + + private static createCounter( + meter: Meter, + enabledMetricGroups: MetricGroup[], + instrumentConfig: InstrumentConfig + ) { + const isEnabled = enabledMetricGroups.includes( + instrumentConfig.metricGroup + ); + + if (!isEnabled) { + return createNoopMeter().createCounter(instrumentConfig.name); + } + + return meter.createCounter(instrumentConfig.name, { + unit: instrumentConfig.unit, + description: instrumentConfig.description, + }); + } + + private static createUpDownCounter( + meter: Meter, + enabledMetricGroups: MetricGroup[], + instrumentConfig: InstrumentConfig + ) { + const isEnabled = enabledMetricGroups.includes( + instrumentConfig.metricGroup + ); + + if (!isEnabled) { + return createNoopMeter().createUpDownCounter(instrumentConfig.name); + } + + return meter.createUpDownCounter(instrumentConfig.name, { + unit: instrumentConfig.unit, + description: instrumentConfig.description, + }); + } + + private static registerInstruments(meter: Meter, options: MetricOptions) { + return { + // Command metrics + dbClientOperationDuration: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientOperationDuration, + unit: "s", + description: + "Duration of a Redis client operation (includes retries)", + metricGroup: METRIC_GROUP.COMMAND, + } + ), + // Connection metrics + dbClientConnectionCreateTime: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionCreateTime, + unit: "s", + description: + "Time taken to create a new connection to the Redis server", + metricGroup: METRIC_GROUP.CONNECTION_BASIC, + } + ), + dbClientConnectionWaitTime: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionWaitTime, + unit: "s", + description: + "Time spent waiting for an available connection from the pool", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + dbClientConnectionUseTime: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionUseTime, + unit: "s", + description: + "Time a connection is actively used for executing operations", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + redisClientPipelineDuration: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientPipelineDuration, + unit: "s", + description: "Duration of pipeline execution", + metricGroup: METRIC_GROUP.PIPELINE, + } + ), + redisClientHealthcheckDuration: OTelMetrics.createHistorgram( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientHealthcheckDuration, + unit: "s", + description: "Duration of health check operations", + metricGroup: METRIC_GROUP.HEALTHCHECK, + } + ), + dbClientConnectionCount: OTelMetrics.createUpDownCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionCount, + unit: "{connection}", + description: "Current number of active connections in the pool", + metricGroup: METRIC_GROUP.CONNECTION_BASIC, + } + ), + dbClientConnectionIdleMax: OTelMetrics.createUpDownCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionIdleMax, + unit: "{connection}", + description: "Maximum number of idle connections allowed in the pool", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + dbClientConnectionIdleMin: OTelMetrics.createUpDownCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionIdleMin, + unit: "{connection}", + description: + "Minimum number of idle connections maintained in the pool", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + dbClientConnectionMax: OTelMetrics.createUpDownCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionMax, + unit: "{connection}", + description: "Maximum number of connections allowed in the pool", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + dbClientConnectionPendingRequests: OTelMetrics.createUpDownCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionPendingRequests, + unit: "{request}", + description: "Number of requests waiting for an available connection", + metricGroup: METRIC_GROUP.CONNECTION_ADVANCED, + } + ), + dbClientConnectionTimeouts: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.dbClientConnectionTimeouts, + unit: "{timeout}", + description: "Number of connection timeout events", + metricGroup: METRIC_GROUP.RESILIENCY, + } + ), + // Redis-specific metrics + redisClientClusterRedirections: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientClusterRedirections, + unit: "{redirection}", + description: "Number of cluster redirection events (MOVED/ASK)", + metricGroup: METRIC_GROUP.RESILIENCY, + } + ), + redisClientErrorsHandled: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientErrorsHandled, + unit: "{error}", + description: "Number of errors handled by the Redis client", + metricGroup: METRIC_GROUP.RESILIENCY, + } + ), + redisClientMaintenanceNotifications: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientMaintenanceNotifications, + unit: "{notification}", + description: "Number of maintenance notifications received", + metricGroup: METRIC_GROUP.MISC, + } + ), + redisClientPubSubMessages: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientPubSubMessages, + unit: "{message}", + description: "Number of pub/sub messages processed", + metricGroup: METRIC_GROUP.PUBSUB, + } + ), + redisClientStreamProduced: OTelMetrics.createCounter( + meter, + options.enabledMetricGroups, + { + name: METRIC_NAMES.redisClientStreamProduced, + unit: "{message}", + description: "Number of messages produced to Redis streams", + metricGroup: METRIC_GROUP.STREAMS, + } + ), + } as const; + } +} diff --git a/packages/client/lib/opentelemetry/noop-meter.ts b/packages/client/lib/opentelemetry/noop-meter.ts new file mode 100644 index 0000000000..cc61f7ca16 --- /dev/null +++ b/packages/client/lib/opentelemetry/noop-meter.ts @@ -0,0 +1,138 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified by PavelPashov, 2025. + */ + +import type { + BatchObservableCallback, + Counter, + Gauge, + Histogram, + Meter, + Attributes, + MetricOptions, + Observable, + ObservableCallback, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +} from "@opentelemetry/api"; + +export class NoopMeter implements Meter { + constructor() {} + + createGauge(_name: string, _options?: MetricOptions): Gauge { + return NOOP_GAUGE_METRIC; + } + + createHistogram(_name: string, _options?: MetricOptions): Histogram { + return NOOP_HISTOGRAM_METRIC; + } + + createCounter(_name: string, _options?: MetricOptions): Counter { + return NOOP_COUNTER_METRIC; + } + + createUpDownCounter(_name: string, _options?: MetricOptions): UpDownCounter { + return NOOP_UP_DOWN_COUNTER_METRIC; + } + + createObservableGauge( + _name: string, + _options?: MetricOptions + ): ObservableGauge { + return NOOP_OBSERVABLE_GAUGE_METRIC; + } + + createObservableCounter( + _name: string, + _options?: MetricOptions + ): ObservableCounter { + return NOOP_OBSERVABLE_COUNTER_METRIC; + } + + createObservableUpDownCounter( + _name: string, + _options?: MetricOptions + ): ObservableUpDownCounter { + return NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC; + } + + addBatchObservableCallback( + _callback: BatchObservableCallback, + _observables: Observable[] + ): void {} + + removeBatchObservableCallback(_callback: BatchObservableCallback): void {} +} + +export class NoopMetric {} + +export class NoopCounterMetric extends NoopMetric implements Counter { + add(_value: number, _attributes: Attributes): void {} +} + +export class NoopUpDownCounterMetric + extends NoopMetric + implements UpDownCounter +{ + add(_value: number, _attributes: Attributes): void {} +} + +export class NoopGaugeMetric extends NoopMetric implements Gauge { + record(_value: number, _attributes: Attributes): void {} +} + +export class NoopHistogramMetric extends NoopMetric implements Histogram { + record(_value: number, _attributes: Attributes): void {} +} + +export class NoopObservableMetric { + addCallback(_callback: ObservableCallback) {} + + removeCallback(_callback: ObservableCallback) {} +} + +export class NoopObservableCounterMetric + extends NoopObservableMetric + implements ObservableCounter {} + +export class NoopObservableGaugeMetric + extends NoopObservableMetric + implements ObservableGauge {} + +export class NoopObservableUpDownCounterMetric + extends NoopObservableMetric + implements ObservableUpDownCounter {} + +export const NOOP_METER = new NoopMeter(); + +// Synchronous instruments +export const NOOP_COUNTER_METRIC = new NoopCounterMetric(); +export const NOOP_GAUGE_METRIC = new NoopGaugeMetric(); +export const NOOP_HISTOGRAM_METRIC = new NoopHistogramMetric(); +export const NOOP_UP_DOWN_COUNTER_METRIC = new NoopUpDownCounterMetric(); + +// Asynchronous instruments +export const NOOP_OBSERVABLE_COUNTER_METRIC = new NoopObservableCounterMetric(); +export const NOOP_OBSERVABLE_GAUGE_METRIC = new NoopObservableGaugeMetric(); +export const NOOP_OBSERVABLE_UP_DOWN_COUNTER_METRIC = + new NoopObservableUpDownCounterMetric(); + +export function createNoopMeter(): Meter { + return NOOP_METER; +} diff --git a/packages/client/lib/opentelemetry/opentelemetry.ts b/packages/client/lib/opentelemetry/opentelemetry.ts new file mode 100644 index 0000000000..b18cca11be --- /dev/null +++ b/packages/client/lib/opentelemetry/opentelemetry.ts @@ -0,0 +1,26 @@ +import { OTelMetrics } from "./metrics"; +import { ObservabilityConfig } from "./types"; + +export class OpenTelemetry { + private static _instance: OpenTelemetry | null = null; + + private constructor() {} + + public static init(config?: ObservabilityConfig) { + if (OpenTelemetry._instance) { + throw new Error("OpenTelemetry already initialized"); + } + + let api: typeof import("@opentelemetry/api") | undefined; + + try { + api = require("@opentelemetry/api"); + } catch (err: unknown) { + // TODO add custom errors + throw new Error("OpenTelemetry not found"); + } + + OpenTelemetry._instance = new OpenTelemetry(); + OTelMetrics.init({ api, config }); + } +} diff --git a/packages/client/lib/opentelemetry/types.ts b/packages/client/lib/opentelemetry/types.ts new file mode 100644 index 0000000000..d2e390937f --- /dev/null +++ b/packages/client/lib/opentelemetry/types.ts @@ -0,0 +1,166 @@ +import { + Attributes, + Counter, + Histogram, + MeterProvider, + UpDownCounter, +} from "@opentelemetry/api"; + +export const METRIC_GROUP = { + COMMAND: "command", + CONNECTION_BASIC: "connection-basic", + CONNECTION_ADVANCED: "connection-advanced", + RESILIENCY: "resiliency", + PUBSUB: "pubsub", + STREAMS: "streams", + PIPELINE: "pipeline", + HEALTHCHECK: "healthcheck", + MISC: "misc", +} as const; + +export type MetricGroup = (typeof METRIC_GROUP)[keyof typeof METRIC_GROUP]; + +export const HISTOGRAM_AGGREGATION = { + EXPLICIT_BUCKET_HISTOGRAM: "explicit_bucket_histogram", + BASE2_EXPONENTIAL_BUCKET_HISTOGRAM: "base2_exponential_bucket_histogram", +} as const; + +export type HistogramAggregation = + (typeof HISTOGRAM_AGGREGATION)[keyof typeof HISTOGRAM_AGGREGATION]; + +export const METRIC_INSTRUMENT_TYPE = { + COUNTER: "counter", + HISTOGRAM: "histogram", + UP_DOWN_COUNTER: "up_down_counter", +}; + +export interface MetricConfig { + enabled?: boolean; + meterProvider?: MeterProvider; + includeCommands?: string[]; + excludeCommands?: string[]; + enabledMetricGroups?: MetricGroup[]; + hidePubSubChannelNames?: boolean; + hideStreamNames?: boolean; + histAggregation?: HistogramAggregation; + bucketsOperationDuration?: number[]; + bucketsConnectionCreateTime?: number[]; + bucketsConnectionWaitTime?: number[]; + bucketsConnectionUseTime?: number[]; + bucketsPipelineDuration?: number[]; + bucketsHealthcheckDuration?: number[]; + bucketsPipelineSize?: number[]; +} + +export interface ObservabilityConfig { + serviceName?: string; + resourceAttributes?: Attributes; + metrics?: MetricConfig; +} + +export interface MetricOptions + extends Required> { + attributes: Attributes; + serviceName?: string; + meterProvider?: MeterProvider; +} + +export type MetricInstruments = Readonly<{ + // Histograms + dbClientOperationDuration: Histogram; + dbClientConnectionCreateTime: Histogram; + dbClientConnectionWaitTime: Histogram; + dbClientConnectionUseTime: Histogram; + redisClientPipelineDuration: Histogram; + redisClientHealthcheckDuration: Histogram; + + // UpDownCounters + dbClientConnectionCount: UpDownCounter; + dbClientConnectionIdleMax: UpDownCounter; + dbClientConnectionIdleMin: UpDownCounter; + dbClientConnectionMax: UpDownCounter; + dbClientConnectionPendingRequests: UpDownCounter; + + // Counters + dbClientConnectionTimeouts: Counter; + redisClientClusterRedirections: Counter; + redisClientErrorsHandled: Counter; + redisClientMaintenanceNotifications: Counter; + redisClientPubSubMessages: Counter; + redisClientStreamProduced: Counter; +}>; + +export const OTEL_ATTRIBUTES = { + // Database & network + dbSystem: "db.system", + dbNamespace: "db.namespace", + dbOperationName: "db.operation.name", + dbResponseStatusCode: "db.response.status_code", + errorType: "error.type", + serverAddress: "server.address", + serverPort: "server.port", + networkPeerAddress: "network.peer.address", + networkPeerPort: "network.peer.port", + dbOperationBatchSize: "db.operation.batch.size", + dbStoredProcedureName: "db.stored_procedure.name", + connectionPoolName: "db.client.connection.pool.name", + connectionState: "db.client.connection.state", + + // Redis-specific extensions + redisClientLibrary: "redis.client.library", + redisRedirectionKind: "redis.client.redirection.kind", + redisClientErrorsHandled: "redis.client.errors.handled", + redisClientPubSubChannel: "redis.client.pubsub.channel", + redisClientPubSubSharded: "redis.client.pubsub.sharded", + redisClientStreamName: "redis.client.stream.name", + redisClientOperationRetryAttempts: "redis.client.operation.retry_attempts", + redisClientOperationBlocking: "redis.client.operation.blocking", +} as const; + +export const DEFAULT_OTEL_ATTRIBUTES = { + [OTEL_ATTRIBUTES.redisClientLibrary]: "node-redis", + [OTEL_ATTRIBUTES.dbSystem]: "redis", +} as const; + +export const METRIC_NAMES = { + dbClientOperationDuration: "db.client.operation.duration", + dbClientConnectionCreateTime: "db.client.connection.create.time", + dbClientConnectionWaitTime: "db.client.connection.wait.time", + dbClientConnectionUseTime: "db.client.connection.use.time", + redisClientPipelineDuration: "redis.client.pipeline.duration", + redisClientHealthcheckDuration: "redis.client.healthcheck.duration", + dbClientConnectionCount: "db.client.connection.count", + dbClientConnectionIdleMax: "db.client.connection.idle.max", + dbClientConnectionIdleMin: "db.client.connection.idle.min", + dbClientConnectionMax: "db.client.connection.max", + dbClientConnectionPendingRequests: "db.client.connection.pending_requests", + dbClientConnectionTimeouts: "db.client.connection.timeouts", + redisClientClusterRedirections: "redis.client.cluster.redirections", + redisClientErrorsHandled: "redis.client.errors.handled", + redisClientMaintenanceNotifications: "redis.client.maintenance.notifications", + redisClientPubSubMessages: "redis.client.pubsub.messages", + redisClientStreamProduced: "redis.client.stream.produced", +} as const; + +export type InstrumentConfig = { + name: string; + unit: string; + description: string; + metricGroup: MetricGroup; +}; + +export const DEFAULT_METRIC_GROUPS: MetricGroup[] = [ + "command", + "connection-basic", + "resiliency", +]; + +export const DEFAULT_HISTOGRAM_BUCKETS = { + OPERATION_DURATION: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10], + CONNECTION_CREATE_TIME: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10], + CONNECTION_WAIT_TIME: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10], + CONNECTION_USE_TIME: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10], + PIPELINE_DURATION: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2.5, 5], + HEALTHCHECK_DURATION: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 2.5], + PIPELINE_SIZE: [1, 2, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000], +}; diff --git a/packages/client/lib/opentelemetry/utils.ts b/packages/client/lib/opentelemetry/utils.ts new file mode 100644 index 0000000000..e42688ac91 --- /dev/null +++ b/packages/client/lib/opentelemetry/utils.ts @@ -0,0 +1,30 @@ +import { + MeterProvider, + InMemoryMetricExporter, +} from "@opentelemetry/sdk-metrics"; + +export function noopFunction() {} + +export const waitForMetrics = async ( + meterProvider: MeterProvider, + exporter: InMemoryMetricExporter, + metricName: string, + timeoutMs = 1000 +) => { + const startTime = performance.now(); + + while (performance.now() - startTime < timeoutMs) { + await meterProvider.forceFlush(); + const beforeResourceMetrics = exporter.getMetrics(); + const beforeMetric = beforeResourceMetrics + .flatMap((rm) => rm.scopeMetrics) + .flatMap((sm) => sm.metrics) + .find((m) => m.descriptor.name === metricName); + + if (beforeMetric) { + return beforeMetric; + } + + await new Promise((resolve) => setTimeout(resolve, 100)); + } +}; diff --git a/packages/client/package.json b/packages/client/package.json index 864953de32..e91fa8dd76 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -15,7 +15,17 @@ "dependencies": { "cluster-key-slot": "1.1.2" }, + "peerDependencies": { + "@opentelemetry/api": ">=1 <2" + }, + "peerDependenciesMeta": { + "@opentelemetry/api": { + "optional": true + } + }, "devDependencies": { + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/sdk-metrics": "^2.2.0", "@redis/test-utils": "*", "@types/sinon": "^17.0.3", "sinon": "^17.0.1"