From 41f9faf29ffa0be9572b5bc0dc3d5ab5be07970e Mon Sep 17 00:00:00 2001 From: ravjotb Date: Tue, 16 Jun 2026 14:24:02 -0700 Subject: [PATCH 01/15] Catch unknown command error in metrics server Signed-off-by: ravjotb --- apps/metrics/src/effects/monitor-stream.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/apps/metrics/src/effects/monitor-stream.js b/apps/metrics/src/effects/monitor-stream.js index 09050f18..32a46b04 100644 --- a/apps/metrics/src/effects/monitor-stream.js +++ b/apps/metrics/src/effects/monitor-stream.js @@ -68,6 +68,9 @@ export const makeMonitorStream = (onLogs = async () => { }, config) => { defer(runMonitorOnce).pipe( catchError((err) => { console.error("Monitor cycle failed", err) + if (err.message?.includes("unknown command")) { + throw err + } return of([]) }), ), From 4dfc26c89d5549b943a621b001e89d0b474e1dbf Mon Sep 17 00:00:00 2001 From: ravjotb Date: Tue, 16 Jun 2026 14:32:20 -0700 Subject: [PATCH 02/15] Handle command log not supported error Signed-off-by: ravjotb --- apps/metrics/src/handlers/commandlog-handler.js | 6 +++++- apps/metrics/src/init-collectors.js | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/apps/metrics/src/handlers/commandlog-handler.js b/apps/metrics/src/handlers/commandlog-handler.js index ffa5eb7c..10ec4419 100644 --- a/apps/metrics/src/handlers/commandlog-handler.js +++ b/apps/metrics/src/handlers/commandlog-handler.js @@ -30,7 +30,11 @@ const getCommandLogRows = async (commandlogType) => { export const getCommandLogs = async (req, res, nodeId) => { try { const commandlogType = req.query.type - const { lastUpdatedAt, nextCycleAt } = getCollectorMeta(commandlogType) || {} + const meta = getCollectorMeta(commandlogType) + if (meta?.error) { + return res.status(503).json({ error: meta.error }) + } + const { lastUpdatedAt, nextCycleAt } = meta || {} if (lastUpdatedAt !== null) { const count = Number(req.query.count) || 100 const rows = await getCommandLogRows(commandlogType) diff --git a/apps/metrics/src/init-collectors.js b/apps/metrics/src/init-collectors.js index b83c0dcc..59ec7824 100644 --- a/apps/metrics/src/init-collectors.js +++ b/apps/metrics/src/init-collectors.js @@ -153,6 +153,7 @@ const setupCollectors = async (client, cfg) => { rows = await fn() } catch (err) { console.warn(`[${f.name}] initial fetch failed, skipping collector:`, err.message) + updateCollectorMeta(f.name, { isRunning: false, error: err.message }) nd.close() return } From 9d3231bbe1f65556625d82e43ed5ec2c19f5ccac Mon Sep 17 00:00:00 2001 From: ravjotb Date: Tue, 16 Jun 2026 15:28:37 -0700 Subject: [PATCH 03/15] Return promise to set monitorRunning to false on error Signed-off-by: ravjotb --- apps/metrics/src/init-collectors.js | 83 +++++++++++++++----------- apps/server/src/actions/commandLogs.ts | 4 ++ 2 files changed, 51 insertions(+), 36 deletions(-) diff --git a/apps/metrics/src/init-collectors.js b/apps/metrics/src/init-collectors.js index 59ec7824..688b5322 100644 --- a/apps/metrics/src/init-collectors.js +++ b/apps/metrics/src/init-collectors.js @@ -59,51 +59,62 @@ const startMonitor = (cfg) => { close: nd.close, } - updateCollectorMeta(monitorEpic.name, { - isRunning: true, - startedAt: Date.now(), - willCompleteAt: Date.now() + monitorEpic.monitoringDuration, - }) - const stream$ = makeMonitorStream(async (logs) => { await sink.appendRows(logs) }, monitorEpic) - const subscription = stream$.subscribe({ - next: (logs) => { - updateCollectorMeta(monitorEpic.name, { - lastUpdatedAt: Date.now(), - }) - console.debug(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`) - }, - error: (err) => { + return new Promise((resolve, reject) => { + let settled = false + + const subscription = stream$.subscribe({ + next: (logs) => { + if (!settled) { + settled = true + updateCollectorMeta(monitorEpic.name, { + isRunning: true, + startedAt: Date.now(), + willCompleteAt: Date.now() + monitorEpic.monitoringDuration, + }) + resolve() + } + updateCollectorMeta(monitorEpic.name, { + lastUpdatedAt: Date.now(), + }) + console.debug(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`) + }, + error: (err) => { + updateCollectorMeta(monitorEpic.name, { + isRunning: false, + lastErrorAt: Date.now(), + lastError: String(err), + willCompleteAt: null, + }) + console.error(`[${monitorEpic.name}] monitor error:`, err) + if (!settled) { + settled = true + reject(err) + } + }, + complete: () => { + updateCollectorMeta(monitorEpic.name, { + completedAt: Date.now(), + isRunning: false, + }) + console.debug(`[${monitorEpic.name}] monitor completed`) + }, + }) + + monitorStopper = async () => { + console.debug(`[${monitorEpic.name}] stopping monitor...`) updateCollectorMeta(monitorEpic.name, { + stoppedAt: Date.now(), isRunning: false, - lastErrorAt: Date.now(), - lastError: String(err), willCompleteAt: null, }) - console.error(`[${monitorEpic.name}] monitor error:`, err) - }, - complete: () => { - updateCollectorMeta(monitorEpic.name, { - completedAt: Date.now(), - isRunning: false, - }) - console.debug(`[${monitorEpic.name}] monitor completed`) - }, + subscription.unsubscribe() + await sink.close() + } }) - - monitorStopper = async () => { - console.debug(`[${monitorEpic.name}] stopping monitor...`) - updateCollectorMeta(monitorEpic.name, { - stoppedAt: Date.now(), - isRunning: false, - willCompleteAt: null, - }) - subscription.unsubscribe() - await sink.close() - } } const stopMonitor = async () => await monitorStopper() diff --git a/apps/server/src/actions/commandLogs.ts b/apps/server/src/actions/commandLogs.ts index 466eefcb..5479a74f 100644 --- a/apps/server/src/actions/commandLogs.ts +++ b/apps/server/src/actions/commandLogs.ts @@ -85,6 +85,10 @@ const fetchCommandLogs = async (metricsServerURI: string, commandLogType: Comman const count = process.env.COMMAND_LOGS_COUNT || "100" const url = `${metricsServerURI}/commandlog?type=${commandLogType}&count=${count}` const initialResponse = await fetch(url) + if (!initialResponse.ok) { + const body = await initialResponse.json().catch(() => ({})) as { error?: string } + throw new Error(body.error ?? `HTTP ${initialResponse.status}`) + } const parsed: CommandLogResponse = await initialResponse.json() as CommandLogResponse if (parsed.checkAt) { const delay = parsed.checkAt - Date.now() From 5418143299b8bdcf14630e6fe1e612d10f43597c Mon Sep 17 00:00:00 2001 From: ravjotb Date: Tue, 16 Jun 2026 15:43:55 -0700 Subject: [PATCH 04/15] Propagate monitor error to the frontend Signed-off-by: ravjotb --- .../components/activity-view/ActivityView.tsx | 4 +++- .../hotkeys/hot-keys-banners.tsx | 23 +++++++++++-------- .../activity-view/hotkeys/hot-keys.tsx | 5 ++-- .../valkey-features/monitor/monitorSlice.ts | 5 ++++ 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/apps/frontend/src/components/activity-view/ActivityView.tsx b/apps/frontend/src/components/activity-view/ActivityView.tsx index 2181ba1a..024dabc0 100644 --- a/apps/frontend/src/components/activity-view/ActivityView.tsx +++ b/apps/frontend/src/components/activity-view/ActivityView.tsx @@ -22,7 +22,7 @@ import { hotKeysRequested, selectHotKeys, selectHotKeysStatus, selectHotKeysError, selectHotKeysNodeErrors, selectHotKeysLastCollectedAt } from "@/state/valkey-features/hotkeys/hotKeysSlice" -import { monitorRequested, selectMonitorRunning } from "@/state/valkey-features/monitor/monitorSlice" +import { monitorRequested, selectMonitorRunning, selectMonitorError } from "@/state/valkey-features/monitor/monitorSlice" import { selectConnectionDetails, selectClusterAlias } from "@/state/valkey-features/connection/connectionSelectors" import { getKeyTypeRequested } from "@/state/valkey-features/keys/keyBrowserSlice" import { selectKeys } from "@/state/valkey-features/keys/keyBrowserSelectors" @@ -60,6 +60,7 @@ export const ActivityView = () => { const hotKeysNodeErrors = useSelector((state: RootState) => selectHotKeysNodeErrors(hotKeysId)(state)) const hotKeysLastCollectedAt = useSelector((state: RootState) => selectHotKeysLastCollectedAt(hotKeysId)(state)) const monitorRunning = useSelector(selectMonitorRunning(id!)) + const monitorError = useSelector(selectMonitorError(id!)) const connectionDetails = useSelector((state: RootState) => selectConnectionDetails(id!)(state)) const clusterAlias = useSelector(selectClusterAlias(id!)) const useHotSlots = connectionDetails?.keyEvictionPolicy?.includes("lfu") && connectionDetails?.clusterSlotStatsEnabled @@ -206,6 +207,7 @@ export const ActivityView = () => { errorMessage={hotKeysErrorMessage as string | null} isCluster={!!clusterId} monitorRunning={monitorRunning} + monitorError={monitorError} nodeErrors={hotKeysNodeErrors} onKeyClick={handleKeyClick} onStartMonitoring={useHotSlots ? undefined : () => setConfigOpen(true)} diff --git a/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx b/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx index 2d02333d..e90e3ab7 100644 --- a/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx +++ b/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx @@ -59,22 +59,27 @@ export function NodeErrorsBanner({ nodeErrors }: NodeErrorsBannerProps) { interface MonitorNotRunningBannerProps { onStartMonitoring: () => void + error?: string | null } -export function MonitorNotRunningBanner({ onStartMonitoring }: MonitorNotRunningBannerProps) { +export function MonitorNotRunningBanner({ onStartMonitoring, error }: MonitorNotRunningBannerProps) { return (
- Monitor is not running. Showing last known data.{" "} - + {error + ? <>Monitor failed: {error} + : <>Monitor is not running. Showing last known data.{" "} + + + }
) diff --git a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx index 4aa31927..f353eba8 100644 --- a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx +++ b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx @@ -17,6 +17,7 @@ interface HotKeysProps { errorMessage: string | null status?: string monitorRunning?: boolean + monitorError?: string | null nodeErrors?: { connectionId: string; error: string }[] isCluster?: boolean onKeyClick?: (keyName: string) => void @@ -25,7 +26,7 @@ interface HotKeysProps { } export function HotKeys({ - data, errorMessage, status, monitorRunning, nodeErrors, + data, errorMessage, status, monitorRunning, monitorError, nodeErrors, isCluster, onKeyClick, onStartMonitoring, selectedKey, }: HotKeysProps) { const [sortOrder, setSortOrder] = useState("desc") @@ -63,7 +64,7 @@ export function HotKeys({ const banners = ( <> {!monitorRunning && onStartMonitoring && ( - + )} {nodeErrors && nodeErrors.length > 0 && ( diff --git a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts index 691d2ac9..e7a72193 100644 --- a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts +++ b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts @@ -13,6 +13,11 @@ export const selectMonitorLoading = (state: RootState) => R.path([VALKEY.MONITOR.name, connectionId, "loading"], state) ?? false +export const selectMonitorError = + (connectionId: string) => + (state: RootState) => + R.path([VALKEY.MONITOR.name, connectionId, "error"], state) ?? null + export const selectRunningMonitorConnections = (state: RootState): { connectionId: string; startedAt: number | null }[] => { const monitorState = R.path([VALKEY.MONITOR.name], state) ?? {} From 3b9e754afb76ee680e48d7b657f10accf0315611 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Tue, 16 Jun 2026 15:55:03 -0700 Subject: [PATCH 05/15] Fix bug with clearing error in slice Signed-off-by: ravjotb --- apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts index e7a72193..140bd8ca 100644 --- a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts +++ b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts @@ -68,7 +68,7 @@ const monitorSlice = createSlice({ state[connectionId].checkAt = parsedResponse.checkAt ?? null state[connectionId].startedAt = parsedResponse.startedAt ?? null state[connectionId].loading = false - state[connectionId].error = null + if (parsedResponse.monitorRunning) state[connectionId].error = null }, // eslint-disable-next-line @typescript-eslint/no-unused-vars saveMonitorSettingsRequested: (state, action) => { From 50748763ed432b5de9de4b5f590d0d76ea3b497c Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 13:09:39 -0700 Subject: [PATCH 06/15] Map command log name correctly Signed-off-by: ravjotb --- apps/metrics/src/handlers/commandlog-handler.js | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/apps/metrics/src/handlers/commandlog-handler.js b/apps/metrics/src/handlers/commandlog-handler.js index 10ec4419..5b61353a 100644 --- a/apps/metrics/src/handlers/commandlog-handler.js +++ b/apps/metrics/src/handlers/commandlog-handler.js @@ -1,7 +1,13 @@ -import { COMMANDLOG_TYPE } from "../utils/constants.js" +import { COMMANDLOG_TYPE, COMMANDLOG_SLOW, COMMANDLOG_LARGE_REPLY, COMMANDLOG_LARGE_REQUEST } from "../utils/constants.js" import { getCollectorMeta } from "../init-collectors.js" import * as Streamer from "../effects/ndjson-streamer.js" +const collectorNameByType = { + [COMMANDLOG_TYPE.SLOW]: COMMANDLOG_SLOW, + [COMMANDLOG_TYPE.LARGE_REQUEST]: COMMANDLOG_LARGE_REQUEST, + [COMMANDLOG_TYPE.LARGE_REPLY]: COMMANDLOG_LARGE_REPLY, +} + const latestByTsFold = () => ({ seed: null, reducer: (acc, curr) => (acc == null || curr.ts > acc.ts ? curr : acc), @@ -30,7 +36,7 @@ const getCommandLogRows = async (commandlogType) => { export const getCommandLogs = async (req, res, nodeId) => { try { const commandlogType = req.query.type - const meta = getCollectorMeta(commandlogType) + const meta = getCollectorMeta(collectorNameByType[commandlogType]) if (meta?.error) { return res.status(503).json({ error: meta.error }) } From 5f6e1e9ea940348fa3642b4475f1d0e5843cde54 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 15:32:31 -0700 Subject: [PATCH 07/15] Detect serverless and add warning Signed-off-by: ravjotb --- apps/frontend/src/state/epics/valkeyEpics.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index d24e8c25..ed8217e8 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -99,7 +99,14 @@ export const connectionEpic = (store: Store) => currentConnections[payload.connectionId] = connectionToSave localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, JSON.stringify(currentConnections)) - toast.success("Connected to server successfully!") + if (payload.connectionId?.includes(".serverless.")) { + toast.warning( + "Connected to an ElastiCache Serverless cache. Some features like Dashboard metrics, Command Logs, and Monitor are not available. Key Browser and Send Command work as expected.", + { duration: 10000 }, + ) + } else { + toast.success("Connected to server successfully!") + } } catch (e) { toast.error("Connection to server failed!") console.error(e) From 8fc028c192fd0547a0f80718c8550c9609ef244c Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 15:38:29 -0700 Subject: [PATCH 08/15] Check base connection details instead of sanitized Signed-off-by: ravjotb --- apps/frontend/src/state/epics/valkeyEpics.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index ed8217e8..335bfa23 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -99,7 +99,7 @@ export const connectionEpic = (store: Store) => currentConnections[payload.connectionId] = connectionToSave localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, JSON.stringify(currentConnections)) - if (payload.connectionId?.includes(".serverless.")) { + if (baseConnectionDetails?.host?.includes(".serverless.")) { toast.warning( "Connected to an ElastiCache Serverless cache. Some features like Dashboard metrics, Command Logs, and Monitor are not available. Key Browser and Send Command work as expected.", { duration: 10000 }, From 0ada6fd9755c57f9725e9b22c87a0a4460a3b232 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 16:07:43 -0700 Subject: [PATCH 09/15] Display key size as '-' when not available Signed-off-by: ravjotb --- apps/frontend/src/components/dashboard/Dashboard.tsx | 2 +- apps/frontend/src/components/ui/donut-chart.tsx | 2 +- apps/server/src/keys-browser.ts | 2 +- common/src/bytes-conversion.ts | 1 + common/src/memory-usage-calculation.ts | 4 +++- 5 files changed, 7 insertions(+), 4 deletions(-) diff --git a/apps/frontend/src/components/dashboard/Dashboard.tsx b/apps/frontend/src/components/dashboard/Dashboard.tsx index b5626dd9..a16fe345 100644 --- a/apps/frontend/src/components/dashboard/Dashboard.tsx +++ b/apps/frontend/src/components/dashboard/Dashboard.tsx @@ -154,7 +154,7 @@ export function Dashboard() { className="flex-1" icon={} label="Used Memory" - value={formatBytes(memoryUsageMetrics.used_memory || 0)} + value={formatBytes(memoryUsageMetrics.used_memory || -1)} /> { (stats[key.type] ??= { count: 0, totalSize: 0 }).count += 1 - stats[key.type].totalSize += key.size + if (key.size > 0) stats[key.type].totalSize += key.size return stats }, {} as Record) diff --git a/apps/server/src/keys-browser.ts b/apps/server/src/keys-browser.ts index 52d4dd8c..14a7964d 100644 --- a/apps/server/src/keys-browser.ts +++ b/apps/server/src/keys-browser.ts @@ -346,7 +346,7 @@ export async function getKeyInfo( name: key, type: keyType, ttl: ttl, - size: memoryUsage || 0, + size: memoryUsage || -1, } // Get collection size and elements for each type diff --git a/common/src/bytes-conversion.ts b/common/src/bytes-conversion.ts index 9caa1f82..61327b76 100644 --- a/common/src/bytes-conversion.ts +++ b/common/src/bytes-conversion.ts @@ -1,4 +1,5 @@ export function formatBytes(bytes: number) { + if (bytes < 0) return "-" if (bytes === 0) return "0 B" const k = 1024 const sizes = ["B", "KB", "MB", "GB"] diff --git a/common/src/memory-usage-calculation.ts b/common/src/memory-usage-calculation.ts index ae3cd56c..4ed57ef2 100644 --- a/common/src/memory-usage-calculation.ts +++ b/common/src/memory-usage-calculation.ts @@ -7,5 +7,7 @@ interface KeyInfo { } export function calculateTotalMemoryUsage (keys: KeyInfo[]) { - return keys.reduce((total, key) => total + (key.size || 0), 0) + const validKeys = keys.filter((key) => key.size > 0) + if (validKeys.length === 0) return -1 + return validKeys.reduce((total, key) => total + key.size, 0) }; From d4f2dfcb7f7dbaad8dfff0f1358df1174a7a137b Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 16:19:59 -0700 Subject: [PATCH 10/15] Fix getKeyInfo default Signed-off-by: ravjotb --- apps/server/src/keys-browser.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/server/src/keys-browser.ts b/apps/server/src/keys-browser.ts index 14a7964d..eebe8925 100644 --- a/apps/server/src/keys-browser.ts +++ b/apps/server/src/keys-browser.ts @@ -401,7 +401,7 @@ export async function getKeyInfo( name: key, type: "unknown", ttl: -1, - size: 0, + size: -1, } } } @@ -517,7 +517,7 @@ export async function getKeys( name: k, type: "unknown", ttl: -1, - size: 0, + size: -1, })), ), ), From acd05ba76295a7984a7b761bc90d3dafcd4efc54 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 16:24:42 -0700 Subject: [PATCH 11/15] Change to em dash Signed-off-by: ravjotb --- common/src/bytes-conversion.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/bytes-conversion.ts b/common/src/bytes-conversion.ts index 61327b76..a13a04fc 100644 --- a/common/src/bytes-conversion.ts +++ b/common/src/bytes-conversion.ts @@ -1,5 +1,5 @@ export function formatBytes(bytes: number) { - if (bytes < 0) return "-" + if (bytes < 0) return "—" if (bytes === 0) return "0 B" const k = 1024 const sizes = ["B", "KB", "MB", "GB"] From 7b49d8c78dca2a197efe2d6c168aa62d24df2ae3 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 16:28:04 -0700 Subject: [PATCH 12/15] Fix lint and tests Signed-off-by: ravjotb --- apps/frontend/src/components/activity-view/ActivityView.tsx | 2 +- .../src/components/activity-view/hotkeys/hot-keys.tsx | 2 +- apps/frontend/src/state/epics/valkeyEpics.ts | 4 +++- apps/server/src/__tests__/keys-browser.test.ts | 6 +++--- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/apps/frontend/src/components/activity-view/ActivityView.tsx b/apps/frontend/src/components/activity-view/ActivityView.tsx index 024dabc0..744cc00b 100644 --- a/apps/frontend/src/components/activity-view/ActivityView.tsx +++ b/apps/frontend/src/components/activity-view/ActivityView.tsx @@ -206,8 +206,8 @@ export const ActivityView = () => { data={hotKeysData} errorMessage={hotKeysErrorMessage as string | null} isCluster={!!clusterId} - monitorRunning={monitorRunning} monitorError={monitorError} + monitorRunning={monitorRunning} nodeErrors={hotKeysNodeErrors} onKeyClick={handleKeyClick} onStartMonitoring={useHotSlots ? undefined : () => setConfigOpen(true)} diff --git a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx index f353eba8..9bf16e60 100644 --- a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx +++ b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx @@ -64,7 +64,7 @@ export function HotKeys({ const banners = ( <> {!monitorRunning && onStartMonitoring && ( - + )} {nodeErrors && nodeErrors.length > 0 && ( diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index 335bfa23..3e097322 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -101,7 +101,9 @@ export const connectionEpic = (store: Store) => if (baseConnectionDetails?.host?.includes(".serverless.")) { toast.warning( - "Connected to an ElastiCache Serverless cache. Some features like Dashboard metrics, Command Logs, and Monitor are not available. Key Browser and Send Command work as expected.", + `Connected to an ElastiCache Serverless cache. + Some features like Dashboard metrics, Command Logs, and Monitor are not available. + Key Browser and Send Command work as expected.`, { duration: 10000 }, ) } else { diff --git a/apps/server/src/__tests__/keys-browser.test.ts b/apps/server/src/__tests__/keys-browser.test.ts index 59e2e5ef..de831a15 100644 --- a/apps/server/src/__tests__/keys-browser.test.ts +++ b/apps/server/src/__tests__/keys-browser.test.ts @@ -56,7 +56,7 @@ describe("getKeyInfo", () => { const result = await getKeyInfo(mockClient as any, "mykey") - assert.strictEqual(result.size, 0) + assert.strictEqual(result.size, -1) }) }) @@ -189,7 +189,7 @@ describe("getKeyInfo", () => { assert.strictEqual(result.name, "failkey") assert.strictEqual(result.type, "unknown") assert.strictEqual(result.ttl, -1) - assert.strictEqual(result.size, 0) + assert.strictEqual(result.size, -1) }) }) }) @@ -724,7 +724,7 @@ describe("getKeyInfoSingle", () => { assert.strictEqual(sentMessage.payload.key, "mykey") assert.strictEqual(sentMessage.payload.type, "unknown") assert.strictEqual(sentMessage.payload.ttl, -1) - assert.strictEqual(sentMessage.payload.size, 0) + assert.strictEqual(sentMessage.payload.size, -1) }) }) }) From 9ca9e6a464f5e8b68becfa9e5611297054159503 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Fri, 19 Jun 2026 16:32:28 -0700 Subject: [PATCH 13/15] Fix metrics tests Signed-off-by: ravjotb --- apps/metrics/src/init-collectors.test.js | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/apps/metrics/src/init-collectors.test.js b/apps/metrics/src/init-collectors.test.js index ad0ac8ba..17e0a846 100644 --- a/apps/metrics/src/init-collectors.test.js +++ b/apps/metrics/src/init-collectors.test.js @@ -276,6 +276,9 @@ describe("init-collectors", () => { }) it("should update metadata when monitor starts", async () => { + const mockSubject = new Subject() + makeMonitorStream.mockReturnValue(mockSubject) + const monitorConfig = createMockConfig({ epics: [ { name: "monitor", monitoringDuration: 5000, data_retention_mb: 10 }, @@ -283,7 +286,11 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + const promise = startMonitor(monitorConfig) + + // Emit first successful cycle to resolve the promise + mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) + await promise const meta = getCollectorMeta("monitor") expect(meta.isRunning).toBe(true) @@ -300,10 +307,11 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + const promise = startMonitor(monitorConfig) // Emit a next event mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) + await promise const meta = getCollectorMeta("monitor") expect(meta.lastUpdatedAt).toBe(Date.now()) @@ -318,11 +326,13 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + const promise = startMonitor(monitorConfig) // Emit an error mockSubject.error(new Error("Monitor error")) + await expect(promise).rejects.toThrow("Monitor error") + const meta = getCollectorMeta("monitor") expect(meta.isRunning).toBe(false) expect(meta.lastErrorAt).toBe(Date.now()) @@ -430,7 +440,10 @@ describe("init-collectors", () => { }) const { startMonitor, stopMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + const promise = startMonitor(monitorConfig) + + mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) + await promise await stopMonitor() From 956e1ef9124fd7546a6168390ce640ee11767233 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Mon, 22 Jun 2026 10:54:10 -0700 Subject: [PATCH 14/15] Refactor startMonitor to use two-phase connect/collect approach Signed-off-by: ravjotb --- apps/metrics/src/effects/monitor-stream.js | 110 ++++++++++++--------- apps/metrics/src/init-collectors.js | 93 +++++++++-------- apps/metrics/src/init-collectors.test.js | 39 +++----- 3 files changed, 120 insertions(+), 122 deletions(-) diff --git a/apps/metrics/src/effects/monitor-stream.js b/apps/metrics/src/effects/monitor-stream.js index 32a46b04..26d5bdd7 100644 --- a/apps/metrics/src/effects/monitor-stream.js +++ b/apps/metrics/src/effects/monitor-stream.js @@ -3,66 +3,87 @@ import { exhaustMap, catchError, map } from "rxjs" import Valkey from "iovalkey" import { ElastiCacheIAMProvider } from "../utils/elasticache-iam-provider.js" -export const makeMonitorStream = (onLogs = async () => { }, config) => { - const { monitoringInterval, monitoringDuration, maxCommandsPerRun: maxLogs } = config - +function getConnectionOptions() { const host = process.env.VALKEY_HOST const port = Number(process.env.VALKEY_PORT) - const username = process.env.VALKEY_USERNAME - const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT + const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT let tls = undefined if (process.env.VALKEY_TLS === "true") { - tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {} + tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {} } + return { host, port, username, tls } +} - const runMonitorOnce = async () => { - const password = process.env.VALKEY_AUTH_TYPE === "iam" - ? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials() - : process.env.VALKEY_PASSWORD - const monitorClient = new Valkey({ - host, - port, - username, - password, - tls, - }) - monitorClient.on("error", (err) => console.error("[monitor] ioredis client error:", err.message)) - const monitor = await monitorClient.monitor() - monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message)) +async function getPassword() { + const username = process.env.VALKEY_USERNAME + return process.env.VALKEY_AUTH_TYPE === "iam" + ? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials() + : process.env.VALKEY_PASSWORD +} + +/** + * Phase 1: Connect and issue MONITOR command. + * Returns the client and monitor handle for use in Phase 2. + * Throws immediately if the command is unsupported. + */ +export const connectMonitor = async () => { + const { host, port, username, tls } = getConnectionOptions() + const password = await getPassword() + const client = new Valkey({ host, port, username, password, tls }) + client.on("error", (err) => console.error("[monitor] ioredis client error:", err.message)) + const monitor = await client.monitor() + monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message)) + return { client, monitor } +} - const rows = [] - const overflow$ = new Subject() +/** + * Phase 2: Collect logs from an active monitor handle. + * Listens for monitoringDuration or until maxLogs is reached, then disconnects. + */ +export const collectLogs = async ({ client, monitor }, { monitoringDuration, maxCommandsPerRun: maxLogs }) => { + const rows = [] + const overflow$ = new Subject() - const processEvent = (time, args) => { - rows.push({ ts: time, command: args.join(" ") }) - if (rows.length >= maxLogs) overflow$.next() - } + const processEvent = (time, args) => { + rows.push({ ts: time, command: args.join(" ") }) + if (rows.length >= maxLogs) overflow$.next() + } - monitor.on("monitor", processEvent) + monitor.on("monitor", processEvent) - let monitorCompletionReason + let monitorCompletionReason - try { - monitorCompletionReason = await firstValueFrom( - race([ - timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")), - overflow$.pipe(map(() => "Max logs read")), - ]), - ) - } finally { - monitor.off("monitor", processEvent) - await Promise.all([ - monitor.disconnect(), - monitorClient.disconnect(), - (async () => { overflow$.complete() })(), - ]) - console.debug(`Monitor run complete (${monitorCompletionReason}).`) - } + try { + monitorCompletionReason = await firstValueFrom( + race([ + timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")), + overflow$.pipe(map(() => "Max logs read")), + ]), + ) + } finally { + monitor.off("monitor", processEvent) + await Promise.all([ + monitor.disconnect(), + client.disconnect(), + (async () => { overflow$.complete() })(), + ]) + console.debug(`Monitor run complete (${monitorCompletionReason}).`) + } + return rows +} + +export const makeMonitorStream = (onLogs = async () => { }, config) => { + const { monitoringInterval } = config + + const runMonitorOnce = async () => { + const handle = await connectMonitor() + const rows = await collectLogs(handle, config) if (rows.length > 0) await onLogs(rows) return rows } + const monitorStream$ = timer(0, monitoringInterval).pipe( exhaustMap(() => defer(runMonitorOnce).pipe( @@ -77,5 +98,4 @@ export const makeMonitorStream = (onLogs = async () => { }, config) => { ), ) return monitorStream$ - } diff --git a/apps/metrics/src/init-collectors.js b/apps/metrics/src/init-collectors.js index 688b5322..c5d7041d 100644 --- a/apps/metrics/src/init-collectors.js +++ b/apps/metrics/src/init-collectors.js @@ -1,5 +1,5 @@ import { makeFetcher } from "./effects/fetchers.js" -import { makeMonitorStream } from "./effects/monitor-stream.js" +import { makeMonitorStream, connectMonitor } from "./effects/monitor-stream.js" import { makeNdjsonWriter } from "./effects/ndjson-writer.js" import { startCollector } from "./epics/collector-rx.js" import { MONITOR } from "./utils/constants.js" @@ -41,8 +41,13 @@ let monitorStopper updateCollectorMeta(MONITOR, { isRunning: false, }) -const startMonitor = (cfg) => { +const startMonitor = async (cfg) => { const monitorEpic = cfg.epics.find((e) => e.name === MONITOR) + + // Phase 1: Verify MONITOR command is supported (fast — throws if not) + await connectMonitor() + + // Command works — set up collection const { maxFiles, maxFileSize } = computeCapacity(monitorEpic.data_retention_mb) const nd = makeNdjsonWriter({ dataDir: cfg.server.data_dir, @@ -59,62 +64,52 @@ const startMonitor = (cfg) => { close: nd.close, } + updateCollectorMeta(monitorEpic.name, { + isRunning: true, + startedAt: Date.now(), + willCompleteAt: Date.now() + monitorEpic.monitoringDuration, + }) + + // Phase 2: Start background collection stream const stream$ = makeMonitorStream(async (logs) => { await sink.appendRows(logs) }, monitorEpic) - return new Promise((resolve, reject) => { - let settled = false - - const subscription = stream$.subscribe({ - next: (logs) => { - if (!settled) { - settled = true - updateCollectorMeta(monitorEpic.name, { - isRunning: true, - startedAt: Date.now(), - willCompleteAt: Date.now() + monitorEpic.monitoringDuration, - }) - resolve() - } - updateCollectorMeta(monitorEpic.name, { - lastUpdatedAt: Date.now(), - }) - console.debug(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`) - }, - error: (err) => { - updateCollectorMeta(monitorEpic.name, { - isRunning: false, - lastErrorAt: Date.now(), - lastError: String(err), - willCompleteAt: null, - }) - console.error(`[${monitorEpic.name}] monitor error:`, err) - if (!settled) { - settled = true - reject(err) - } - }, - complete: () => { - updateCollectorMeta(monitorEpic.name, { - completedAt: Date.now(), - isRunning: false, - }) - console.debug(`[${monitorEpic.name}] monitor completed`) - }, - }) - - monitorStopper = async () => { - console.debug(`[${monitorEpic.name}] stopping monitor...`) + const subscription = stream$.subscribe({ + next: (logs) => { + updateCollectorMeta(monitorEpic.name, { + lastUpdatedAt: Date.now(), + }) + console.debug(`[${monitorEpic.name}] monitor cycle complete (${logs.length} logs)`) + }, + error: (err) => { updateCollectorMeta(monitorEpic.name, { - stoppedAt: Date.now(), isRunning: false, + lastErrorAt: Date.now(), + lastError: String(err), willCompleteAt: null, }) - subscription.unsubscribe() - await sink.close() - } + console.error(`[${monitorEpic.name}] monitor error:`, err) + }, + complete: () => { + updateCollectorMeta(monitorEpic.name, { + completedAt: Date.now(), + isRunning: false, + }) + console.debug(`[${monitorEpic.name}] monitor completed`) + }, }) + + monitorStopper = async () => { + console.debug(`[${monitorEpic.name}] stopping monitor...`) + updateCollectorMeta(monitorEpic.name, { + stoppedAt: Date.now(), + isRunning: false, + willCompleteAt: null, + }) + subscription.unsubscribe() + await sink.close() + } } const stopMonitor = async () => await monitorStopper() diff --git a/apps/metrics/src/init-collectors.test.js b/apps/metrics/src/init-collectors.test.js index 17e0a846..233da9f6 100644 --- a/apps/metrics/src/init-collectors.test.js +++ b/apps/metrics/src/init-collectors.test.js @@ -9,6 +9,7 @@ vi.mock("./effects/fetchers.js", () => ({ vi.mock("./effects/monitor-stream.js", () => ({ makeMonitorStream: vi.fn(), + connectMonitor: vi.fn().mockResolvedValue(undefined), })) vi.mock("./effects/ndjson-writer.js", () => ({ @@ -267,7 +268,7 @@ describe("init-collectors", () => { }) const { startMonitor } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) expect(makeMonitorStream).toHaveBeenCalled() const monitorEpic = makeMonitorStream.mock.calls[0][1] @@ -286,11 +287,7 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - const promise = startMonitor(monitorConfig) - - // Emit first successful cycle to resolve the promise - mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) - await promise + await startMonitor(monitorConfig) const meta = getCollectorMeta("monitor") expect(meta.isRunning).toBe(true) @@ -307,37 +304,26 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - const promise = startMonitor(monitorConfig) + await startMonitor(monitorConfig) // Emit a next event mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) - await promise const meta = getCollectorMeta("monitor") expect(meta.lastUpdatedAt).toBe(Date.now()) }) it("should handle monitor errors", async () => { - const mockSubject = new Subject() - makeMonitorStream.mockReturnValue(mockSubject) + const { connectMonitor } = await import("./effects/monitor-stream.js") + connectMonitor.mockRejectedValueOnce(new Error("Monitor error")) const monitorConfig = createMockConfig({ epics: [{ name: "monitor", monitoringDuration: 5000, data_retention_mb: 10 }], }) - const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - const promise = startMonitor(monitorConfig) - - // Emit an error - mockSubject.error(new Error("Monitor error")) - - await expect(promise).rejects.toThrow("Monitor error") + const { startMonitor } = await import("./init-collectors.js") - const meta = getCollectorMeta("monitor") - expect(meta.isRunning).toBe(false) - expect(meta.lastErrorAt).toBe(Date.now()) - expect(meta.lastError).toContain("Monitor error") - expect(meta.willCompleteAt).toBeNull() + await expect(startMonitor(monitorConfig)).rejects.toThrow("Monitor error") }) it("should update metadata on completion", async () => { @@ -349,7 +335,7 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) // Complete the stream mockSubject.complete() @@ -384,7 +370,7 @@ describe("init-collectors", () => { }) const { startMonitor, stopMonitor } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) await stopMonitor() @@ -440,10 +426,7 @@ describe("init-collectors", () => { }) const { startMonitor, stopMonitor, getCollectorMeta } = await import("./init-collectors.js") - const promise = startMonitor(monitorConfig) - - mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) - await promise + await startMonitor(monitorConfig) await stopMonitor() From f227716c47ea406e6d40b11113be8744704b1cd8 Mon Sep 17 00:00:00 2001 From: ravjotb Date: Wed, 24 Jun 2026 14:35:03 -0700 Subject: [PATCH 15/15] Address review comments Signed-off-by: ravjotb --- .../src/components/activity-view/ActivityView.tsx | 9 +++++++-- .../src/state/valkey-features/monitor/monitorSlice.ts | 2 +- apps/metrics/src/effects/monitor-stream.js | 2 +- package-lock.json | 4 ++-- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/apps/frontend/src/components/activity-view/ActivityView.tsx b/apps/frontend/src/components/activity-view/ActivityView.tsx index db16eb2f..846fbf4f 100644 --- a/apps/frontend/src/components/activity-view/ActivityView.tsx +++ b/apps/frontend/src/components/activity-view/ActivityView.tsx @@ -23,7 +23,12 @@ import { hotKeysRequested, selectHotKeys, selectHotKeysStatus, selectHotKeysError, selectHotKeysNodeErrors, selectHotKeysLastCollectedAt } from "@/state/valkey-features/hotkeys/hotKeysSlice" -import { monitorRequested, selectMonitorRunning, selectClusterMonitorRunning, selectMonitorError } from "@/state/valkey-features/monitor/monitorSlice" +import { + monitorRequested, + selectMonitorRunning, + selectClusterMonitorRunning, + selectMonitorError +} from "@/state/valkey-features/monitor/monitorSlice" import { selectConnectionDetails, selectClusterAlias } from "@/state/valkey-features/connection/connectionSelectors" import { getKeyTypeRequested } from "@/state/valkey-features/keys/keyBrowserSlice" import { selectKeys } from "@/state/valkey-features/keys/keyBrowserSelectors" @@ -66,7 +71,7 @@ export const ActivityView = () => { const monitorRunning = useSelector((state: RootState) => clusterId ? selectClusterMonitorRunning(clusterId)(state) : selectMonitorRunning(nodeId)(state), ) - const monitorError = useSelector(selectMonitorError(id!)) + const monitorError = useSelector(selectMonitorError(nodeId)) const connectionDetails = useSelector((state: RootState) => selectConnectionDetails(id!)(state)) const clusterAlias = useSelector(selectClusterAlias(id!)) const useHotSlots = connectionDetails?.keyEvictionPolicy?.includes("lfu") && connectionDetails?.clusterSlotStatsEnabled diff --git a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts index 32c359c5..b7796297 100644 --- a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts +++ b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts @@ -105,7 +105,7 @@ const monitorSlice = createSlice({ state[nodeId].checkAt = parsedResponse.checkAt ?? null state[nodeId].startedAt = parsedResponse.startedAt ?? null state[nodeId].loading = false - if (parsedResponse.monitorRunning) state[connectionId].error = null + if (parsedResponse.monitorRunning) state[nodeId].error = null }, // eslint-disable-next-line @typescript-eslint/no-unused-vars saveMonitorSettingsRequested: (_state, _action) => { diff --git a/apps/metrics/src/effects/monitor-stream.js b/apps/metrics/src/effects/monitor-stream.js index 26d5bdd7..73f366f0 100644 --- a/apps/metrics/src/effects/monitor-stream.js +++ b/apps/metrics/src/effects/monitor-stream.js @@ -31,7 +31,7 @@ export const connectMonitor = async () => { const { host, port, username, tls } = getConnectionOptions() const password = await getPassword() const client = new Valkey({ host, port, username, password, tls }) - client.on("error", (err) => console.error("[monitor] ioredis client error:", err.message)) + client.on("error", (err) => console.error("[monitor] iovalkey client error:", err.message)) const monitor = await client.monitor() monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message)) return { client, monitor } diff --git a/package-lock.json b/package-lock.json index 18de7cb5..7dcc951b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,10 +8,10 @@ "name": "valkey-admin", "version": "1.0.1", "workspaces": [ + "common", "apps/frontend", "apps/server", - "apps/metrics", - "common" + "apps/metrics" ], "dependencies": { "@types/express": "^5.0.6"