diff --git a/apps/frontend/src/components/activity-view/ActivityView.tsx b/apps/frontend/src/components/activity-view/ActivityView.tsx index cc160fab..846fbf4f 100644 --- a/apps/frontend/src/components/activity-view/ActivityView.tsx +++ b/apps/frontend/src/components/activity-view/ActivityView.tsx @@ -23,7 +23,12 @@ import { hotKeysRequested, selectHotKeys, selectHotKeysStatus, selectHotKeysError, selectHotKeysNodeErrors, selectHotKeysLastCollectedAt } from "@/state/valkey-features/hotkeys/hotKeysSlice" -import { monitorRequested, selectMonitorRunning, selectClusterMonitorRunning } from "@/state/valkey-features/monitor/monitorSlice" +import { + monitorRequested, + selectMonitorRunning, + selectClusterMonitorRunning, + selectMonitorError +} from "@/state/valkey-features/monitor/monitorSlice" import { selectConnectionDetails, selectClusterAlias } from "@/state/valkey-features/connection/connectionSelectors" import { getKeyTypeRequested } from "@/state/valkey-features/keys/keyBrowserSlice" import { selectKeys } from "@/state/valkey-features/keys/keyBrowserSelectors" @@ -66,6 +71,7 @@ export const ActivityView = () => { const monitorRunning = useSelector((state: RootState) => clusterId ? selectClusterMonitorRunning(clusterId)(state) : selectMonitorRunning(nodeId)(state), ) + const monitorError = useSelector(selectMonitorError(nodeId)) const connectionDetails = useSelector((state: RootState) => selectConnectionDetails(id!)(state)) const clusterAlias = useSelector(selectClusterAlias(id!)) const useHotSlots = connectionDetails?.keyEvictionPolicy?.includes("lfu") && connectionDetails?.clusterSlotStatsEnabled @@ -211,6 +217,7 @@ export const ActivityView = () => { data={hotKeysData} errorMessage={hotKeysErrorMessage as string | null} isCluster={!!clusterId} + monitorError={monitorError} monitorRunning={monitorRunning} nodeErrors={hotKeysNodeErrors} onKeyClick={handleKeyClick} diff --git a/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx b/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx index b43f66a3..7a221e68 100644 --- a/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx +++ b/apps/frontend/src/components/activity-view/hotkeys/hot-keys-banners.tsx @@ -59,22 +59,27 @@ export function NodeErrorsBanner({ nodeErrors }: NodeErrorsBannerProps) { interface MonitorNotRunningBannerProps { onStartMonitoring: () => void + error?: string | null } -export function MonitorNotRunningBanner({ onStartMonitoring }: MonitorNotRunningBannerProps) { +export function MonitorNotRunningBanner({ onStartMonitoring, error }: MonitorNotRunningBannerProps) { return (
- Monitor is not running. Showing last known data.{" "} - + {error + ? <>Monitor failed: {error} + : <>Monitor is not running. Showing last known data.{" "} + + + }
) diff --git a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx index 9c3ce129..1bc4cfaf 100644 --- a/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx +++ b/apps/frontend/src/components/activity-view/hotkeys/hot-keys.tsx @@ -17,6 +17,7 @@ interface HotKeysProps { errorMessage: string | null status?: string monitorRunning?: boolean + monitorError?: string | null nodeErrors?: { nodeId: string; error: string }[] isCluster?: boolean onKeyClick?: (keyName: string) => void @@ -25,7 +26,7 @@ interface HotKeysProps { } export function HotKeys({ - data, errorMessage, status, monitorRunning, nodeErrors, + data, errorMessage, status, monitorRunning, monitorError, nodeErrors, isCluster, onKeyClick, onStartMonitoring, selectedKey, }: HotKeysProps) { const [sortOrder, setSortOrder] = useState("desc") @@ -63,7 +64,7 @@ export function HotKeys({ const banners = ( <> {!monitorRunning && onStartMonitoring && ( - + )} {nodeErrors && nodeErrors.length > 0 && ( diff --git a/apps/frontend/src/components/dashboard/Dashboard.tsx b/apps/frontend/src/components/dashboard/Dashboard.tsx index b5626dd9..a16fe345 100644 --- a/apps/frontend/src/components/dashboard/Dashboard.tsx +++ b/apps/frontend/src/components/dashboard/Dashboard.tsx @@ -154,7 +154,7 @@ export function Dashboard() { className="flex-1" icon={} label="Used Memory" - value={formatBytes(memoryUsageMetrics.used_memory || 0)} + value={formatBytes(memoryUsageMetrics.used_memory || -1)} /> { (stats[key.type] ??= { count: 0, totalSize: 0 }).count += 1 - stats[key.type].totalSize += key.size + if (key.size > 0) stats[key.type].totalSize += key.size return stats }, {} as Record) diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index 31fe4c72..bda71045 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -99,7 +99,16 @@ export const connectionEpic = (store: Store) => currentConnections[payload.connectionId] = connectionToSave localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, JSON.stringify(currentConnections)) - toast.success("Connected to server successfully!") + if (baseConnectionDetails?.host?.includes(".serverless.")) { + toast.warning( + `Connected to an ElastiCache Serverless cache. + Some features like Dashboard metrics, Command Logs, and Monitor are not available. + Key Browser and Send Command work as expected.`, + { duration: 10000 }, + ) + } else { + toast.success("Connected to server successfully!") + } } catch (e) { toast.error("Connection to server failed!") console.error(e) diff --git a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts index a76ea397..b7796297 100644 --- a/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts +++ b/apps/frontend/src/state/valkey-features/monitor/monitorSlice.ts @@ -14,6 +14,11 @@ export const selectMonitorLoading = (state: RootState) => R.path([VALKEY.MONITOR.name, nodeId, "loading"], state) ?? false +export const selectMonitorError = + (connectionId: string) => + (state: RootState) => + R.path([VALKEY.MONITOR.name, connectionId, "error"], state) ?? null + export const selectRunningMonitorConnections = (state: RootState): { nodeId: string; clusterId?: string; startedAt: number | null }[] => { const monitorState = R.path([VALKEY.MONITOR.name], state) ?? {} @@ -100,7 +105,7 @@ const monitorSlice = createSlice({ state[nodeId].checkAt = parsedResponse.checkAt ?? null state[nodeId].startedAt = parsedResponse.startedAt ?? null state[nodeId].loading = false - state[nodeId].error = null + if (parsedResponse.monitorRunning) state[nodeId].error = null }, // eslint-disable-next-line @typescript-eslint/no-unused-vars saveMonitorSettingsRequested: (_state, _action) => { diff --git a/apps/metrics/src/effects/monitor-stream.js b/apps/metrics/src/effects/monitor-stream.js index 09050f18..73f366f0 100644 --- a/apps/metrics/src/effects/monitor-stream.js +++ b/apps/metrics/src/effects/monitor-stream.js @@ -3,76 +3,99 @@ import { exhaustMap, catchError, map } from "rxjs" import Valkey from "iovalkey" import { ElastiCacheIAMProvider } from "../utils/elasticache-iam-provider.js" -export const makeMonitorStream = (onLogs = async () => { }, config) => { - const { monitoringInterval, monitoringDuration, maxCommandsPerRun: maxLogs } = config - +function getConnectionOptions() { const host = process.env.VALKEY_HOST const port = Number(process.env.VALKEY_PORT) - const username = process.env.VALKEY_USERNAME - const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT + const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT let tls = undefined if (process.env.VALKEY_TLS === "true") { - tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {} + tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {} } + return { host, port, username, tls } +} - const runMonitorOnce = async () => { - const password = process.env.VALKEY_AUTH_TYPE === "iam" - ? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials() - : process.env.VALKEY_PASSWORD - const monitorClient = new Valkey({ - host, - port, - username, - password, - tls, - }) - monitorClient.on("error", (err) => console.error("[monitor] ioredis client error:", err.message)) - const monitor = await monitorClient.monitor() - monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message)) +async function getPassword() { + const username = process.env.VALKEY_USERNAME + return process.env.VALKEY_AUTH_TYPE === "iam" + ? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials() + : process.env.VALKEY_PASSWORD +} + +/** + * Phase 1: Connect and issue MONITOR command. + * Returns the client and monitor handle for use in Phase 2. + * Throws immediately if the command is unsupported. + */ +export const connectMonitor = async () => { + const { host, port, username, tls } = getConnectionOptions() + const password = await getPassword() + const client = new Valkey({ host, port, username, password, tls }) + client.on("error", (err) => console.error("[monitor] iovalkey client error:", err.message)) + const monitor = await client.monitor() + monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message)) + return { client, monitor } +} - const rows = [] - const overflow$ = new Subject() +/** + * Phase 2: Collect logs from an active monitor handle. + * Listens for monitoringDuration or until maxLogs is reached, then disconnects. + */ +export const collectLogs = async ({ client, monitor }, { monitoringDuration, maxCommandsPerRun: maxLogs }) => { + const rows = [] + const overflow$ = new Subject() - const processEvent = (time, args) => { - rows.push({ ts: time, command: args.join(" ") }) - if (rows.length >= maxLogs) overflow$.next() - } + const processEvent = (time, args) => { + rows.push({ ts: time, command: args.join(" ") }) + if (rows.length >= maxLogs) overflow$.next() + } - monitor.on("monitor", processEvent) + monitor.on("monitor", processEvent) - let monitorCompletionReason + let monitorCompletionReason - try { - monitorCompletionReason = await firstValueFrom( - race([ - timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")), - overflow$.pipe(map(() => "Max logs read")), - ]), - ) - } finally { - monitor.off("monitor", processEvent) - await Promise.all([ - monitor.disconnect(), - monitorClient.disconnect(), - (async () => { overflow$.complete() })(), - ]) - console.debug(`Monitor run complete (${monitorCompletionReason}).`) - } + try { + monitorCompletionReason = await firstValueFrom( + race([ + timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")), + overflow$.pipe(map(() => "Max logs read")), + ]), + ) + } finally { + monitor.off("monitor", processEvent) + await Promise.all([ + monitor.disconnect(), + client.disconnect(), + (async () => { overflow$.complete() })(), + ]) + console.debug(`Monitor run complete (${monitorCompletionReason}).`) + } + return rows +} + +export const makeMonitorStream = (onLogs = async () => { }, config) => { + const { monitoringInterval } = config + + const runMonitorOnce = async () => { + const handle = await connectMonitor() + const rows = await collectLogs(handle, config) if (rows.length > 0) await onLogs(rows) return rows } + const monitorStream$ = timer(0, monitoringInterval).pipe( exhaustMap(() => defer(runMonitorOnce).pipe( catchError((err) => { console.error("Monitor cycle failed", err) + if (err.message?.includes("unknown command")) { + throw err + } return of([]) }), ), ), ) return monitorStream$ - } diff --git a/apps/metrics/src/handlers/commandlog-handler.js b/apps/metrics/src/handlers/commandlog-handler.js index ffa5eb7c..5b61353a 100644 --- a/apps/metrics/src/handlers/commandlog-handler.js +++ b/apps/metrics/src/handlers/commandlog-handler.js @@ -1,7 +1,13 @@ -import { COMMANDLOG_TYPE } from "../utils/constants.js" +import { COMMANDLOG_TYPE, COMMANDLOG_SLOW, COMMANDLOG_LARGE_REPLY, COMMANDLOG_LARGE_REQUEST } from "../utils/constants.js" import { getCollectorMeta } from "../init-collectors.js" import * as Streamer from "../effects/ndjson-streamer.js" +const collectorNameByType = { + [COMMANDLOG_TYPE.SLOW]: COMMANDLOG_SLOW, + [COMMANDLOG_TYPE.LARGE_REQUEST]: COMMANDLOG_LARGE_REQUEST, + [COMMANDLOG_TYPE.LARGE_REPLY]: COMMANDLOG_LARGE_REPLY, +} + const latestByTsFold = () => ({ seed: null, reducer: (acc, curr) => (acc == null || curr.ts > acc.ts ? curr : acc), @@ -30,7 +36,11 @@ const getCommandLogRows = async (commandlogType) => { export const getCommandLogs = async (req, res, nodeId) => { try { const commandlogType = req.query.type - const { lastUpdatedAt, nextCycleAt } = getCollectorMeta(commandlogType) || {} + const meta = getCollectorMeta(collectorNameByType[commandlogType]) + if (meta?.error) { + return res.status(503).json({ error: meta.error }) + } + const { lastUpdatedAt, nextCycleAt } = meta || {} if (lastUpdatedAt !== null) { const count = Number(req.query.count) || 100 const rows = await getCommandLogRows(commandlogType) diff --git a/apps/metrics/src/init-collectors.js b/apps/metrics/src/init-collectors.js index b83c0dcc..c5d7041d 100644 --- a/apps/metrics/src/init-collectors.js +++ b/apps/metrics/src/init-collectors.js @@ -1,5 +1,5 @@ import { makeFetcher } from "./effects/fetchers.js" -import { makeMonitorStream } from "./effects/monitor-stream.js" +import { makeMonitorStream, connectMonitor } from "./effects/monitor-stream.js" import { makeNdjsonWriter } from "./effects/ndjson-writer.js" import { startCollector } from "./epics/collector-rx.js" import { MONITOR } from "./utils/constants.js" @@ -41,8 +41,13 @@ let monitorStopper updateCollectorMeta(MONITOR, { isRunning: false, }) -const startMonitor = (cfg) => { +const startMonitor = async (cfg) => { const monitorEpic = cfg.epics.find((e) => e.name === MONITOR) + + // Phase 1: Verify MONITOR command is supported (fast — throws if not) + await connectMonitor() + + // Command works — set up collection const { maxFiles, maxFileSize } = computeCapacity(monitorEpic.data_retention_mb) const nd = makeNdjsonWriter({ dataDir: cfg.server.data_dir, @@ -65,6 +70,7 @@ const startMonitor = (cfg) => { willCompleteAt: Date.now() + monitorEpic.monitoringDuration, }) + // Phase 2: Start background collection stream const stream$ = makeMonitorStream(async (logs) => { await sink.appendRows(logs) }, monitorEpic) @@ -153,6 +159,7 @@ const setupCollectors = async (client, cfg) => { rows = await fn() } catch (err) { console.warn(`[${f.name}] initial fetch failed, skipping collector:`, err.message) + updateCollectorMeta(f.name, { isRunning: false, error: err.message }) nd.close() return } diff --git a/apps/metrics/src/init-collectors.test.js b/apps/metrics/src/init-collectors.test.js index ad0ac8ba..233da9f6 100644 --- a/apps/metrics/src/init-collectors.test.js +++ b/apps/metrics/src/init-collectors.test.js @@ -9,6 +9,7 @@ vi.mock("./effects/fetchers.js", () => ({ vi.mock("./effects/monitor-stream.js", () => ({ makeMonitorStream: vi.fn(), + connectMonitor: vi.fn().mockResolvedValue(undefined), })) vi.mock("./effects/ndjson-writer.js", () => ({ @@ -267,7 +268,7 @@ describe("init-collectors", () => { }) const { startMonitor } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) expect(makeMonitorStream).toHaveBeenCalled() const monitorEpic = makeMonitorStream.mock.calls[0][1] @@ -276,6 +277,9 @@ describe("init-collectors", () => { }) it("should update metadata when monitor starts", async () => { + const mockSubject = new Subject() + makeMonitorStream.mockReturnValue(mockSubject) + const monitorConfig = createMockConfig({ epics: [ { name: "monitor", monitoringDuration: 5000, data_retention_mb: 10 }, @@ -283,7 +287,7 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) const meta = getCollectorMeta("monitor") expect(meta.isRunning).toBe(true) @@ -300,7 +304,7 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) // Emit a next event mockSubject.next([{ ts: Date.now(), cmd: "GET key" }]) @@ -310,24 +314,16 @@ describe("init-collectors", () => { }) it("should handle monitor errors", async () => { - const mockSubject = new Subject() - makeMonitorStream.mockReturnValue(mockSubject) + const { connectMonitor } = await import("./effects/monitor-stream.js") + connectMonitor.mockRejectedValueOnce(new Error("Monitor error")) const monitorConfig = createMockConfig({ epics: [{ name: "monitor", monitoringDuration: 5000, data_retention_mb: 10 }], }) - const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) - - // Emit an error - mockSubject.error(new Error("Monitor error")) + const { startMonitor } = await import("./init-collectors.js") - const meta = getCollectorMeta("monitor") - expect(meta.isRunning).toBe(false) - expect(meta.lastErrorAt).toBe(Date.now()) - expect(meta.lastError).toContain("Monitor error") - expect(meta.willCompleteAt).toBeNull() + await expect(startMonitor(monitorConfig)).rejects.toThrow("Monitor error") }) it("should update metadata on completion", async () => { @@ -339,7 +335,7 @@ describe("init-collectors", () => { }) const { startMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) // Complete the stream mockSubject.complete() @@ -374,7 +370,7 @@ describe("init-collectors", () => { }) const { startMonitor, stopMonitor } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) await stopMonitor() @@ -430,7 +426,7 @@ describe("init-collectors", () => { }) const { startMonitor, stopMonitor, getCollectorMeta } = await import("./init-collectors.js") - startMonitor(monitorConfig) + await startMonitor(monitorConfig) await stopMonitor() diff --git a/apps/server/src/__tests__/keys-browser.test.ts b/apps/server/src/__tests__/keys-browser.test.ts index 59e2e5ef..de831a15 100644 --- a/apps/server/src/__tests__/keys-browser.test.ts +++ b/apps/server/src/__tests__/keys-browser.test.ts @@ -56,7 +56,7 @@ describe("getKeyInfo", () => { const result = await getKeyInfo(mockClient as any, "mykey") - assert.strictEqual(result.size, 0) + assert.strictEqual(result.size, -1) }) }) @@ -189,7 +189,7 @@ describe("getKeyInfo", () => { assert.strictEqual(result.name, "failkey") assert.strictEqual(result.type, "unknown") assert.strictEqual(result.ttl, -1) - assert.strictEqual(result.size, 0) + assert.strictEqual(result.size, -1) }) }) }) @@ -724,7 +724,7 @@ describe("getKeyInfoSingle", () => { assert.strictEqual(sentMessage.payload.key, "mykey") assert.strictEqual(sentMessage.payload.type, "unknown") assert.strictEqual(sentMessage.payload.ttl, -1) - assert.strictEqual(sentMessage.payload.size, 0) + assert.strictEqual(sentMessage.payload.size, -1) }) }) }) diff --git a/apps/server/src/actions/commandLogs.ts b/apps/server/src/actions/commandLogs.ts index 44fb13f0..28b54bd5 100644 --- a/apps/server/src/actions/commandLogs.ts +++ b/apps/server/src/actions/commandLogs.ts @@ -84,6 +84,10 @@ const fetchCommandLogs = async (metricsServerURI: string, commandLogType: Comman const count = process.env.COMMAND_LOGS_COUNT || "100" const url = `${metricsServerURI}/commandlog?type=${commandLogType}&count=${count}` const initialResponse = await fetch(url) + if (!initialResponse.ok) { + const body = await initialResponse.json().catch(() => ({})) as { error?: string } + throw new Error(body.error ?? `HTTP ${initialResponse.status}`) + } const parsed: CommandLogResponse = await initialResponse.json() as CommandLogResponse if (parsed.checkAt) { const delay = parsed.checkAt - Date.now() diff --git a/apps/server/src/keys-browser.ts b/apps/server/src/keys-browser.ts index 52d4dd8c..eebe8925 100644 --- a/apps/server/src/keys-browser.ts +++ b/apps/server/src/keys-browser.ts @@ -346,7 +346,7 @@ export async function getKeyInfo( name: key, type: keyType, ttl: ttl, - size: memoryUsage || 0, + size: memoryUsage || -1, } // Get collection size and elements for each type @@ -401,7 +401,7 @@ export async function getKeyInfo( name: key, type: "unknown", ttl: -1, - size: 0, + size: -1, } } } @@ -517,7 +517,7 @@ export async function getKeys( name: k, type: "unknown", ttl: -1, - size: 0, + size: -1, })), ), ), diff --git a/common/src/bytes-conversion.ts b/common/src/bytes-conversion.ts index 9caa1f82..a13a04fc 100644 --- a/common/src/bytes-conversion.ts +++ b/common/src/bytes-conversion.ts @@ -1,4 +1,5 @@ export function formatBytes(bytes: number) { + if (bytes < 0) return "—" if (bytes === 0) return "0 B" const k = 1024 const sizes = ["B", "KB", "MB", "GB"] diff --git a/common/src/memory-usage-calculation.ts b/common/src/memory-usage-calculation.ts index ae3cd56c..4ed57ef2 100644 --- a/common/src/memory-usage-calculation.ts +++ b/common/src/memory-usage-calculation.ts @@ -7,5 +7,7 @@ interface KeyInfo { } export function calculateTotalMemoryUsage (keys: KeyInfo[]) { - return keys.reduce((total, key) => total + (key.size || 0), 0) + const validKeys = keys.filter((key) => key.size > 0) + if (validKeys.length === 0) return -1 + return validKeys.reduce((total, key) => total + key.size, 0) }; diff --git a/package-lock.json b/package-lock.json index 18de7cb5..7dcc951b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,10 +8,10 @@ "name": "valkey-admin", "version": "1.0.1", "workspaces": [ + "common", "apps/frontend", "apps/server", - "apps/metrics", - "common" + "apps/metrics" ], "dependencies": { "@types/express": "^5.0.6"