diff --git a/apps/frontend/src/components/activity-view/ActivityView.tsx b/apps/frontend/src/components/activity-view/ActivityView.tsx index 846fbf4f..b18ea061 100644 --- a/apps/frontend/src/components/activity-view/ActivityView.tsx +++ b/apps/frontend/src/components/activity-view/ActivityView.tsx @@ -11,6 +11,7 @@ import { TabGroup } from "../ui/tab-group" import { ButtonGroup } from "../ui/button-group" import { HotKeys } from "./hotkeys/hot-keys" import { HotKeysParamsModal } from "./hotkeys/hot-keys-params-modal" +import { BigKeys } from "./bigkeys/big-keys" import { CommandLogTable } from "./command-log-table" import KeyDetails from "../key-browser/key-details/key-details" import RouteContainer from "../ui/route-container" @@ -23,17 +24,21 @@ import { hotKeysRequested, selectHotKeys, selectHotKeysStatus, selectHotKeysError, selectHotKeysNodeErrors, selectHotKeysLastCollectedAt } from "@/state/valkey-features/hotkeys/hotKeysSlice" -import { - monitorRequested, - selectMonitorRunning, - selectClusterMonitorRunning, - selectMonitorError +import { + bigKeysRequested, selectBigKeys, selectBigKeysStatus, selectBigKeysError, + selectBigKeysNodeErrors, selectBigKeysScanned, selectBigKeysTotalKeys +} from "@/state/valkey-features/bigkeys/bigKeysSlice" +import { + monitorRequested, + selectMonitorRunning, + selectClusterMonitorRunning, + selectMonitorError } from "@/state/valkey-features/monitor/monitorSlice" import { selectConnectionDetails, selectClusterAlias } from "@/state/valkey-features/connection/connectionSelectors" import { getKeyTypeRequested } from "@/state/valkey-features/keys/keyBrowserSlice" import { selectKeys } from "@/state/valkey-features/keys/keyBrowserSelectors" -type TabType = "hot-keys" | "command-logs" +type TabType = "hot-keys" | "big-keys" | "command-logs" type CommandLogSubTab = "slow" | "large-request" | "large-reply" interface KeyInfo { @@ -68,6 +73,12 @@ export const ActivityView = () => { const hotKeysErrorMessage = useSelector((state: RootState) => selectHotKeysError(targetId)(state)) const hotKeysNodeErrors = useSelector((state: RootState) => selectHotKeysNodeErrors(targetId)(state)) const hotKeysLastCollectedAt = useSelector((state: RootState) => selectHotKeysLastCollectedAt(targetId)(state)) + const bigKeysData = useSelector((state: RootState) => selectBigKeys(targetId)(state)) + const bigKeysStatus = useSelector((state: RootState) => selectBigKeysStatus(targetId)(state)) + const bigKeysErrorMessage = useSelector((state: RootState) => selectBigKeysError(targetId)(state)) + const bigKeysNodeErrors = useSelector((state: RootState) => selectBigKeysNodeErrors(targetId)(state)) + const bigKeysScanned = useSelector((state: RootState) => selectBigKeysScanned(targetId)(state)) + const bigKeysTotalKeys = useSelector((state: RootState) => selectBigKeysTotalKeys(targetId)(state)) const monitorRunning = useSelector((state: RootState) => clusterId ? selectClusterMonitorRunning(clusterId)(state) : selectMonitorRunning(nodeId)(state), ) @@ -107,6 +118,19 @@ export const ActivityView = () => { } } + // Big keys are scanned on demand - they get fetched when the tab is opened + useEffect(() => { + if (id && activeTab === "big-keys" && bigKeysStatus === undefined) { + dispatch(bigKeysRequested({ connectionId: id, clusterId })) + } + }, [activeTab, bigKeysStatus, id, clusterId, dispatch]) + + const refreshBigKeys = () => { + if (id) { + dispatch(bigKeysRequested({ connectionId: id, clusterId })) + } + } + const getCurrentCommandLogData = () => { switch (commandLogSubTab) { case "slow": @@ -133,8 +157,12 @@ export const ActivityView = () => { ? keys.find((k) => k.name === selectedKey) ?? null : null + // Big keys can exceed the readable size limit, so details are hot-keys only. + const showKeyDetails = activeTab === "hot-keys" && !!selectedKey + const tabs = [ { id: "hot-keys" as TabType, label: "Hot Keys" }, + { id: "big-keys" as TabType, label: "Big Keys" }, { id: "command-logs" as TabType, label: "Command Logs" }, ] @@ -150,7 +178,7 @@ export const ActivityView = () => { - Monitor Hot Keys and Command Logs of{" "} + Monitor Hot Keys, Big Keys and Command Logs of{" "} {clusterId ? ( <> cluster {" "} {truncateText(clusterAlias || clusterId!)} @@ -186,6 +214,24 @@ export const ActivityView = () => { )} + {/* Big Keys Refresh */} + {activeTab === "big-keys" && ( +
+ {bigKeysScanned !== null && bigKeysTotalKeys !== null && ( + + Scanned {bigKeysScanned.toLocaleString()} of {bigKeysTotalKeys.toLocaleString()} keys + + )} + +
+ )} + {/* Command Log Sub-tabs and Refresh */} {activeTab === "command-logs" && (
@@ -208,27 +254,46 @@ export const ActivityView = () => {
{/* Tab Content */} - {activeTab === "hot-keys" ? ( + {activeTab === "command-logs" ? ( +
+ +
+ ) : (
- {/* Hot Keys List */} -
+ {/* Keys List (hot keys or big keys) */} +
- setConfigOpen(true)} - selectedKey={selectedKey} - status={hotKeysStatus} - /> + {activeTab === "hot-keys" ? ( + setConfigOpen(true)} + selectedKey={selectedKey} + status={hotKeysStatus} + /> + ) : ( + + )}
- {/* Key Details Panel */} - {selectedKey && ( + {/* Key Details Panel (hot keys only; big keys can exceed the readable size limit) */} + {showKeyDetails && (
{
)}
- ) : ( -
- -
)} diff --git a/apps/frontend/src/components/activity-view/bigkeys/big-keys-table.tsx b/apps/frontend/src/components/activity-view/bigkeys/big-keys-table.tsx new file mode 100644 index 00000000..5fa1daec --- /dev/null +++ b/apps/frontend/src/components/activity-view/bigkeys/big-keys-table.tsx @@ -0,0 +1,126 @@ +import { Copy, KeyRound } from "lucide-react" +import { toast } from "sonner" +import { convertTTL } from "@common/src/ttl-conversion" +import { formatBytes } from "@common/src/bytes-conversion" +import { truncateText } from "@common/src/truncate-text" +import { TableContainer } from "../../ui/table-container" +import { SortableTableHeader, StaticTableHeader, type SortOrder } from "../../ui/sortable-table-header" +import { Typography } from "../../ui/typography" +import { CustomTooltip } from "../../ui/tooltip" +import type { BigKey } from "@/state/valkey-features/bigkeys/bigKeysSlice" +import { copyToClipboard } from "@/lib/utils" + +interface BigKeyRowProps { + entry: BigKey +} + +function BigKeyRow({ entry }: BigKeyRowProps) { + const { key, sizeBytes, type, ttl, nodeId } = entry + + const handleCopy = () => { + copyToClipboard(key) + toast.success("Key name copied!") + } + + return ( + + +
+ {key} + +
+ + + {type} + + + {formatBytes(sizeBytes)} + + + {convertTTL(ttl)} + + + + {truncateText(nodeId ?? "—")} + + + + ) +} + +interface NoMatchRowProps { + searchQuery: string + selectedNode: string +} + +function NoMatchRow({ searchQuery, selectedNode }: NoMatchRowProps) { + return ( + + + + No keys match + {searchQuery && ( + {searchQuery} + )} + {selectedNode !== "all" && ( + on node {selectedNode} + )} + + + + ) +} + +interface BigKeysTableProps { + rows: BigKey[] + sortOrder: SortOrder + onToggleSort: () => void + searchQuery: string + selectedNode: string +} + +export function BigKeysTable({ + rows, sortOrder, onToggleSort, searchQuery, selectedNode, +}: BigKeysTableProps) { + return ( + + } + label="Key Name" + width="w-1/3" + /> + + + + + + } + > + {rows.length === 0 ? ( + + ) : ( + rows.map((entry, index) => ( + + )) + )} + + ) +} diff --git a/apps/frontend/src/components/activity-view/bigkeys/big-keys.tsx b/apps/frontend/src/components/activity-view/bigkeys/big-keys.tsx new file mode 100644 index 00000000..812e3113 --- /dev/null +++ b/apps/frontend/src/components/activity-view/bigkeys/big-keys.tsx @@ -0,0 +1,100 @@ +import { useState } from "react" +import { AlertCircle, KeyRound } from "lucide-react" +import * as R from "ramda" +import { EmptyState } from "../../ui/empty-state" +import { LoadingState } from "../../ui/loading-state" +import { Typography } from "../../ui/typography" +import { SearchInput } from "../../ui/search-input" +import { type SortOrder } from "../../ui/sortable-table-header" +import { NodeErrorsBanner } from "../hotkeys/hot-keys-banners" +import { NodeFilterDropdown } from "../hotkeys/node-filter-dropdown" +import { BigKeysTable } from "./big-keys-table" +import type { BigKey } from "@/state/valkey-features/bigkeys/bigKeysSlice" + +interface BigKeysProps { + data: BigKey[] | null + errorMessage: string | null + status?: string + nodeErrors?: { nodeId: string; error: string }[] + isCluster?: boolean +} + +export function BigKeys({ + data, errorMessage, status, nodeErrors, isCluster, +}: BigKeysProps) { + const [sortOrder, setSortOrder] = useState("desc") + const [searchQuery, setSearchQuery] = useState("") + const [selectedNode, setSelectedNode] = useState("all") + + if (status === "Pending") return + + const allKeys = R.defaultTo([], data) + + const uniqueNodes = Array.from( + new Set(allKeys.map((k) => k.nodeId).filter(Boolean)), + ) as string[] + + const filtered = allKeys.filter((k) => { + const matchesSearch = !searchQuery || k.key.toLowerCase().includes(searchQuery.toLowerCase()) + const matchesNode = selectedNode === "all" || k.nodeId === selectedNode + return matchesSearch && matchesNode + }) + + const sorted = R.sort( + (sortOrder === "asc" ? R.ascend : R.descend)((k) => k.sizeBytes), + filtered, + ) + + const banner = nodeErrors && nodeErrors.length > 0 && + + if (allKeys.length === 0) { + return ( + <> + {banner} + + + {errorMessage} +
+ ) + } + icon={} + title="No Big Keys Found" + /> + + ) + } + + return ( +
+ {banner} +
+ setSearchQuery(e.target.value)} + onClear={() => setSearchQuery("")} + placeholder="Search keys..." + value={searchQuery} + /> + {isCluster && ( + + )} +
+
+ setSortOrder((prev) => (prev === "asc" ? "desc" : "asc"))} + rows={sorted} + searchQuery={searchQuery} + selectedNode={selectedNode} + sortOrder={sortOrder} + /> +
+
+ ) +} diff --git a/apps/frontend/src/state/epics/rootEpic.ts b/apps/frontend/src/state/epics/rootEpic.ts index 03f68b1e..859e64fc 100644 --- a/apps/frontend/src/state/epics/rootEpic.ts +++ b/apps/frontend/src/state/epics/rootEpic.ts @@ -9,6 +9,7 @@ import { autoReconnectEpic, valkeyRetryEpic, getHotKeysEpic, + getBigKeysEpic, getCommandLogsEpic, updateConfigEpic, getCpuUsageEpic, @@ -31,6 +32,7 @@ export const registerEpics = (store: Store) => { sendRequestEpic(), setDataEpic(store), getHotKeysEpic(store), + getBigKeysEpic(), getCommandLogsEpic(), updateConfigEpic(), keyBrowserEpic(), diff --git a/apps/frontend/src/state/epics/valkeyEpics.ts b/apps/frontend/src/state/epics/valkeyEpics.ts index bda71045..ea5c9217 100644 --- a/apps/frontend/src/state/epics/valkeyEpics.ts +++ b/apps/frontend/src/state/epics/valkeyEpics.ts @@ -32,6 +32,7 @@ import { selectMetricsStarting, selectError } from "../valkey-features/info/info import { action$, select } from "../middleware/rxjsMiddleware/rxjsMiddleware.ts" import { connectFulfilled as wsConnectFulfilled } from "../wsconnection/wsConnectionSlice" import { hotKeysRequested } from "../valkey-features/hotkeys/hotKeysSlice.ts" +import { bigKeysRequested } from "../valkey-features/bigkeys/bigKeysSlice.ts" import { commandLogsRequested } from "../valkey-features/commandlogs/commandLogsSlice.ts" import history from "../../history.ts" import { setClusterData, updateClusterData } from "../valkey-features/cluster/clusterSlice.ts" @@ -456,6 +457,24 @@ export const getCommandLogsEpic = () => ignoreElements(), ) +export const getBigKeysEpic = () => + action$.pipe( + select(bigKeysRequested), + tap((action) => { + try { + const { connectionId, clusterId } = action.payload + const socket = getSocket() + socket.next({ + type: action.type, + payload: { connectionId, clusterId }, + }) + } catch (error) { + console.error("[getBigKeysEpic] Error sending action:", error) + } + }), + ignoreElements(), + ) + export const updateConfigEpic = () => action$.pipe( select(updateConfig), diff --git a/apps/frontend/src/state/valkey-features/bigkeys/bigKeysSlice.ts b/apps/frontend/src/state/valkey-features/bigkeys/bigKeysSlice.ts new file mode 100644 index 00000000..6aa9477f --- /dev/null +++ b/apps/frontend/src/state/valkey-features/bigkeys/bigKeysSlice.ts @@ -0,0 +1,104 @@ +import { createSlice } from "@reduxjs/toolkit" +import { type JSONObject } from "@common/src/json-utils" +import { ERROR, FULFILLED, PENDING, VALKEY } from "@common/src/constants.ts" +import { toNodeId } from "@common/src/connection-id.ts" +import * as R from "ramda" +import type { RootState } from "@/store.ts" + +type BigKeysStatus = typeof PENDING | typeof FULFILLED | typeof ERROR + +export interface BigKey { + key: string + sizeBytes: number + type: string + ttl: number + nodeId?: string +} + +export const selectBigKeys = (targetId: string) => (state: RootState) => + R.pathOr([], [VALKEY.BIGKEYS.name, targetId, "keys"], state) + +export const selectBigKeysStatus = (targetId: string) => (state: RootState) => + R.path([VALKEY.BIGKEYS.name, targetId, "status"], state) + +export const selectBigKeysError = (targetId: string) => (state: RootState) => + R.path([VALKEY.BIGKEYS.name, targetId, "error"], state) + +export const selectBigKeysNodeErrors = (targetId: string) => (state: RootState) => + R.pathOr([], [VALKEY.BIGKEYS.name, targetId, "nodeErrors"], state) + +export const selectBigKeysScanned = (targetId: string) => (state: RootState) => + R.path([VALKEY.BIGKEYS.name, targetId, "scanned"], state) ?? null + +export const selectBigKeysTotalKeys = (targetId: string) => (state: RootState) => + R.path([VALKEY.BIGKEYS.name, targetId, "totalKeys"], state) ?? null + +interface BigKeysState { + // Keyed by `targetId`: `clusterId` (cluster) or db-less `nodeId` (standalone). + [targetId: string]: { + keys: BigKey[] + scanned: number | null + totalKeys: number | null + nodeId: string | null + error?: JSONObject | null + nodeErrors?: { nodeId: string; error: string }[] + status: BigKeysStatus + } +} + +const initialBigKeysState: BigKeysState = {} + +const emptyEntry = (status: BigKeysStatus): BigKeysState[string] => ({ + keys: [], + scanned: null, + totalKeys: null, + nodeId: null, + status, +}) + +const bigKeysSlice = createSlice({ + name: "bigKeys", + initialState: initialBigKeysState, + reducers: { + bigKeysRequested: (state, action) => { + const { connectionId, clusterId } = action.payload + const targetId = clusterId ?? toNodeId(connectionId) + if (!state[targetId]) { + state[targetId] = emptyEntry(PENDING) + } else { + state[targetId].status = PENDING + state[targetId].keys = [] + state[targetId].error = null + } + }, + bigKeysFulfilled: (state, action) => { + const { keys, scanned, totalKeys, nodeId } = action.payload.parsedResponse + const targetId = action.payload.clusterId ?? action.payload.nodeId + const nodeErrors = action.payload.nodeErrors ?? [] + state[targetId] = { + keys, + scanned, + totalKeys, + nodeId, + nodeErrors, + status: FULFILLED, + } + }, + bigKeysError: (state, action) => { + const { error } = action.payload + const targetId = action.payload.clusterId ?? action.payload.nodeId + if (!state[targetId]) { + state[targetId] = emptyEntry(ERROR) + } + state[targetId].error = error + state[targetId].status = ERROR + }, + }, +}) + +export default bigKeysSlice.reducer +export const { + bigKeysRequested, + bigKeysFulfilled, + bigKeysError, +} = bigKeysSlice.actions diff --git a/apps/frontend/src/store.ts b/apps/frontend/src/store.ts index 8c5d41f6..4bea7ca1 100644 --- a/apps/frontend/src/store.ts +++ b/apps/frontend/src/store.ts @@ -9,6 +9,7 @@ import valkeyCommandReducer from "@/state/valkey-features/command/commandSlice.t import valkeyInfoReducer from "@/state/valkey-features/info/infoSlice.ts" import keyBrowserReducer from "@/state/valkey-features/keys/keyBrowserSlice.ts" import hotKeysReducer from "@/state/valkey-features/hotkeys/hotKeysSlice.ts" +import bigKeysReducer from "@/state/valkey-features/bigkeys/bigKeysSlice.ts" import commandLogsReducer from "@/state/valkey-features/commandlogs/commandLogsSlice" import configReducer from "@/state/valkey-features/config/configSlice" import cpuReducer from "@/state/valkey-features/cpu/cpuSlice.ts" @@ -25,6 +26,7 @@ export const store = configureStore({ [VALKEY.KEYS.name]: keyBrowserReducer, [VALKEY.CLUSTER.name]: clusterReducer, [VALKEY.HOTKEYS.name]: hotKeysReducer, + [VALKEY.BIGKEYS.name]: bigKeysReducer, [VALKEY.COMMANDLOGS.name]: commandLogsReducer, [VALKEY.CONFIG.name]: configReducer, [VALKEY.CPU.name]: cpuReducer, diff --git a/apps/metrics/src/analyzers/scan-big-keys.js b/apps/metrics/src/analyzers/scan-big-keys.js new file mode 100644 index 00000000..99933c0e --- /dev/null +++ b/apps/metrics/src/analyzers/scan-big-keys.js @@ -0,0 +1,43 @@ +import { Heap } from "heap-js" + +export const scanBigKeys = async (client, { scanLimit = 10000, topN = 50, batchSize = 100 } = {}) => { + const heap = new Heap((a, b) => a.sizeBytes - b.sizeBytes) + + const totalKeys = Number(await client.customCommand(["DBSIZE"])) + + let cursor = "0" + let scanned = 0 + + do { + const [nextCursor, keys] = await client.customCommand(["SCAN", cursor, "COUNT", batchSize.toString()]) + cursor = nextCursor + + for (const key of keys) { + const [sizeBytes, type, ttl] = await Promise.all([ + // sample 5 elements to estimate size faster on big keys + client.customCommand(["MEMORY", "USAGE", key, "SAMPLES", "5"]), + client.customCommand(["TYPE", key]), + client.customCommand(["TTL", key]), + ]) + + const entry = { key, sizeBytes: Number(sizeBytes), type, ttl: Number(ttl) } + + if (heap.size() < topN) { + heap.push(entry) + } else if (Number(sizeBytes) > heap.peek().sizeBytes) { + heap.pop() + heap.push(entry) + } + + scanned++ + } + // scanLimit controls how many keys are scanned, not how many are returned + } while (cursor !== "0" && scanned < scanLimit) + + return { + // topN keys returned in descending order of sizeBytes + keys: heap.toArray().sort((a, b) => b.sizeBytes - a.sizeBytes), + scanned, + totalKeys, + } +} diff --git a/apps/metrics/src/api-schema.js b/apps/metrics/src/api-schema.js index 864d86e1..a11a40b5 100644 --- a/apps/metrics/src/api-schema.js +++ b/apps/metrics/src/api-schema.js @@ -60,3 +60,10 @@ export const cpuQuerySchema = z.object({ since: optionalInt(), until: optionalInt(), }) + +// absent or invalid values fall back to the defaults defined in scanBigKeys +export const bigKeysQuerySchema = z.object({ + scanLimit: optionalInt({ min: 1, abs: true }), + topN: optionalInt({ min: 1, abs: true }), + batchSize: optionalInt({ min: 1, abs: true }), +}) diff --git a/apps/metrics/src/index.js b/apps/metrics/src/index.js index f600ca42..e7ef5547 100644 --- a/apps/metrics/src/index.js +++ b/apps/metrics/src/index.js @@ -10,11 +10,12 @@ import { calculateHotKeysFromHotSlots } from "./analyzers/calculate-hot-keys.js" import { enrichHotKeys } from "./analyzers/enrich-hot-keys.js" import cpuFold from "./analyzers/calculate-cpu-usage.js" import memoryFold from "./analyzers/memory-metrics.js" -import { cpuQuerySchema, memoryQuerySchema, parseQuery } from "./api-schema.js" +import { bigKeysQuerySchema, cpuQuerySchema, memoryQuerySchema, parseQuery } from "./api-schema.js" import { sanitizeUrl } from "./utils/helpers.js" import { setupNdjsonCleaner, stopNdjsonCleaner } from "./effects/ndjson-cleaner.js" import { createValkeyClient } from "./valkey-client.js" import { ACTION, MONITOR } from "./utils/constants.js" +import { scanBigKeys } from "./analyzers/scan-big-keys.js" async function main() { const cfg = getConfig() @@ -70,6 +71,17 @@ async function main() { } }) + app.get("/big-keys", async (req, res) => { + try { + // scanLimit, topN and batchSize are optional - scanBigKeys applies the defaults + const result = await scanBigKeys(client, parseQuery(bigKeysQuerySchema)(req.query)) + res.json({ ...result, nodeId: ownNodeId }) + } catch (e) { + console.error(e) + res.status(500).json({ error: e.message }) + } + }) + app.get("/commandlog", (req, res) => getCommandLogs(req, res, ownNodeId)) app.get("/slowlog_len", async (req, res) => { diff --git a/apps/server/src/actions/bigkeys.ts b/apps/server/src/actions/bigkeys.ts new file mode 100644 index 00000000..75cb063d --- /dev/null +++ b/apps/server/src/actions/bigkeys.ts @@ -0,0 +1,141 @@ +import { type WebSocket } from "ws" +import { VALKEY, type AggregateReplyId, toNodeId } from "valkey-common" +import * as R from "ramda" +import { withDeps, Deps } from "./utils" + +type BigKey = { + key: string + sizeBytes: number + type: string + ttl: number + nodeId?: string +} + +type BigKeysResponse = { + nodeId: string + keys: BigKey[] + scanned: number + totalKeys: number +} + +type NodeError = { + nodeId: string + error: string +} + +// Scan-policy defaults, always forwarded to the metrics nodes. +// batchSize is left to the metrics layer's own default. +const DEFAULT_TOP_N = 50 +const DEFAULT_SCAN_LIMIT = 10000 + +const sendBigKeysFulfilled = ( + ws: WebSocket, + replyId: AggregateReplyId, + parsedResponse: BigKeysResponse, + nodeErrors?: NodeError[], +) => { + ws.send( + JSON.stringify({ + type: VALKEY.BIGKEYS.bigKeysFulfilled, + payload: { + ...replyId, + parsedResponse, + ...(nodeErrors?.length ? { nodeErrors } : {}), + }, + }), + ) +} + +const sendBigKeysError = ( + ws: WebSocket, + nodeId: string, + error: unknown, +) => { + console.error(error) + ws.send( + JSON.stringify({ + type: VALKEY.BIGKEYS.bigKeysError, + payload: { + nodeId, + error: error instanceof Error ? error.message : String(error), + }, + }), + ) +} + +export const bigKeysRequested = withDeps( + async ({ ws, metricsServerMap, action, clusterNodesRegistry }) => { + const { connectionId, clusterId, scanLimit, topN } = action.payload + + // Resolve once so every node and the merge cap use the same value. + const effectiveTopN = Number(topN) || DEFAULT_TOP_N + const effectiveScanLimit = Number(scanLimit) || DEFAULT_SCAN_LIMIT + + const nodes = + typeof clusterId === "string" + ? clusterNodesRegistry[clusterId] + : undefined + + const nodeIds = nodes ? Object.keys(nodes) : [toNodeId(connectionId)] + + const promises = nodeIds.map(async (nodeId: string) => { + const metricsServerURI = metricsServerMap.get(nodeId)?.metricsURI + if (!metricsServerURI) { + console.warn("Metrics server not started for node: ", nodeId) + return { nodeId, error: "Metrics server not started" } as NodeError + } + const url = new URL("/big-keys", metricsServerURI) + url.searchParams.set("scanLimit", String(effectiveScanLimit)) + url.searchParams.set("topN", String(effectiveTopN)) + try { + console.debug("[Big keys] Fetching from:", url.href) + const response = await fetch(url) + if (!response.ok) { + const errorBody = await response.json() as { error?: string } + return { nodeId, error: errorBody.error ?? `HTTP ${response.status}` } as NodeError + } + return await response.json() as BigKeysResponse + } catch (error) { + return { nodeId, error: error instanceof Error ? error.message : String(error) } as NodeError + } + }) + + const settled = await Promise.all(promises) + const results = settled.filter((r): r is BigKeysResponse => !!r && "keys" in r) + const nodeErrors = settled.filter((r): r is NodeError => !!r && "error" in r) + + if (results.length === 0) { + if (nodes) { + const emptyResponse: BigKeysResponse = { keys: [], scanned: 0, totalKeys: 0, nodeId: "" } + sendBigKeysFulfilled(ws, { clusterId: clusterId as string }, emptyResponse, nodeErrors) + return + } + const nodeId = toNodeId(connectionId) + if (nodeErrors[0]) { + sendBigKeysError(ws, nodeId, nodeErrors[0].error) + } else { + const emptyResponse: BigKeysResponse = { keys: [], scanned: 0, totalKeys: 0, nodeId } + sendBigKeysFulfilled(ws, { nodeId }, emptyResponse) + } + return + } + + if (!nodes) { + // Tag each key with the node it came from so the UI can show which node it belongs to. + const single = results[0] + const keys = single.keys.map((k) => ({ ...k, nodeId: single.nodeId })) + sendBigKeysFulfilled(ws, { nodeId: toNodeId(connectionId) }, { ...single, keys }) + return + } + + // Merge every node's keys, keep the globally largest top N, each key carries its nodeId. + const mergedKeys = R.pipe( + R.chain((res: BigKeysResponse) => res.keys.map((k): BigKey => ({ ...k, nodeId: res.nodeId }))), + R.sort(R.descend((k) => k.sizeBytes)), + R.take(effectiveTopN), + )(results) as BigKey[] + const scanned = R.sum(results.map((r) => r.scanned)) + const totalKeys = R.sum(results.map((r) => r.totalKeys)) + const aggregatedResponse: BigKeysResponse = { keys: mergedKeys, scanned, totalKeys, nodeId: clusterId as string } + sendBigKeysFulfilled(ws, { clusterId: clusterId as string }, aggregatedResponse, nodeErrors) + }) diff --git a/apps/server/src/index.ts b/apps/server/src/index.ts index 6cc0b2db..74dd0dba 100644 --- a/apps/server/src/index.ts +++ b/apps/server/src/index.ts @@ -18,6 +18,7 @@ import { updateKeyRequested } from "./actions/keys" import { hotKeysRequested } from "./actions/hotkeys" +import { bigKeysRequested } from "./actions/bigkeys" import { commandLogsRequested } from "./actions/commandLogs" import { updateConfig, enableClusterSlotStats } from "./actions/config" import { cpuUsageRequested } from "./actions/cpuUsage" @@ -197,6 +198,7 @@ wss.on("connection", (ws: AliveWebSocket) => { [VALKEY.KEYS.addKeyRequested]: addKeyRequested, [VALKEY.KEYS.updateKeyRequested]: updateKeyRequested, [VALKEY.HOTKEYS.hotKeysRequested]: hotKeysRequested, + [VALKEY.BIGKEYS.bigKeysRequested]: bigKeysRequested, [VALKEY.COMMANDLOGS.commandLogsRequested]: commandLogsRequested, [VALKEY.CONFIG.enableClusterSlotStats]: enableClusterSlotStats, [VALKEY.CPU.cpuUsageRequested]: cpuUsageRequested, diff --git a/common/src/constants.ts b/common/src/constants.ts index 0b235d56..8d6cf964 100644 --- a/common/src/constants.ts +++ b/common/src/constants.ts @@ -74,6 +74,11 @@ export const VALKEY = { hotKeysFulfilled: "hotKeysFulfilled", hotKeysError: "hotKeysError", }), + BIGKEYS: makeNamespace( "bigKeys",{ + bigKeysRequested: "bigKeysRequested", + bigKeysFulfilled: "bigKeysFulfilled", + bigKeysError: "bigKeysError", + }), COMMANDLOGS: makeNamespace( "commandLogs",{ commandLogsRequested: "commandLogsRequested", commandLogsFulfilled: "commandLogsFulfilled",