Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion apps/frontend/src/components/activity-view/ActivityView.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,12 @@ import {
hotKeysRequested, selectHotKeys, selectHotKeysStatus, selectHotKeysError,
selectHotKeysNodeErrors, selectHotKeysLastCollectedAt
} from "@/state/valkey-features/hotkeys/hotKeysSlice"
import { monitorRequested, selectMonitorRunning, selectClusterMonitorRunning } from "@/state/valkey-features/monitor/monitorSlice"
import {
monitorRequested,
selectMonitorRunning,
selectClusterMonitorRunning,
selectMonitorError
} from "@/state/valkey-features/monitor/monitorSlice"
import { selectConnectionDetails, selectClusterAlias } from "@/state/valkey-features/connection/connectionSelectors"
import { getKeyTypeRequested } from "@/state/valkey-features/keys/keyBrowserSlice"
import { selectKeys } from "@/state/valkey-features/keys/keyBrowserSelectors"
Expand Down Expand Up @@ -66,6 +71,7 @@ export const ActivityView = () => {
const monitorRunning = useSelector((state: RootState) =>
clusterId ? selectClusterMonitorRunning(clusterId)(state) : selectMonitorRunning(nodeId)(state),
)
const monitorError = useSelector(selectMonitorError(nodeId))
const connectionDetails = useSelector((state: RootState) => selectConnectionDetails(id!)(state))
const clusterAlias = useSelector(selectClusterAlias(id!))
const useHotSlots = connectionDetails?.keyEvictionPolicy?.includes("lfu") && connectionDetails?.clusterSlotStatsEnabled
Expand Down Expand Up @@ -211,6 +217,7 @@ export const ActivityView = () => {
data={hotKeysData}
errorMessage={hotKeysErrorMessage as string | null}
isCluster={!!clusterId}
monitorError={monitorError}
monitorRunning={monitorRunning}
nodeErrors={hotKeysNodeErrors}
onKeyClick={handleKeyClick}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,22 +59,27 @@ export function NodeErrorsBanner({ nodeErrors }: NodeErrorsBannerProps) {

interface MonitorNotRunningBannerProps {
onStartMonitoring: () => void
error?: string | null
}

export function MonitorNotRunningBanner({ onStartMonitoring }: MonitorNotRunningBannerProps) {
export function MonitorNotRunningBanner({ onStartMonitoring, error }: MonitorNotRunningBannerProps) {
return (
<div className="m-3 p-3 bg-red-50 dark:bg-red-900/20 rounded-md border
border-red-200 dark:border-red-700 flex items-start gap-2">
<AlertCircle className="w-4 h-4 text-red-500 mt-0.5 shrink-0" />
<Typography variant="bodySm">
Monitor is not running. Showing last known data.{" "}
<button
className="text-primary underline hover:opacity-80"
onClick={onStartMonitoring}
type="button"
>
Start MONITOR
</button>
{error
? <>Monitor failed: {error}</>
: <>Monitor is not running. Showing last known data.{" "}
<button
className="text-primary underline hover:opacity-80"
onClick={onStartMonitoring}
type="button"
>
Start MONITOR
</button>
</>
}
</Typography>
</div>
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ interface HotKeysProps {
errorMessage: string | null
status?: string
monitorRunning?: boolean
monitorError?: string | null
nodeErrors?: { nodeId: string; error: string }[]
isCluster?: boolean
onKeyClick?: (keyName: string) => void
Expand All @@ -25,7 +26,7 @@ interface HotKeysProps {
}

export function HotKeys({
data, errorMessage, status, monitorRunning, nodeErrors,
data, errorMessage, status, monitorRunning, monitorError, nodeErrors,
isCluster, onKeyClick, onStartMonitoring, selectedKey,
}: HotKeysProps) {
const [sortOrder, setSortOrder] = useState<SortOrder>("desc")
Expand Down Expand Up @@ -63,7 +64,7 @@ export function HotKeys({
const banners = (
<>
{!monitorRunning && onStartMonitoring && (
<MonitorNotRunningBanner onStartMonitoring={onStartMonitoring} />
<MonitorNotRunningBanner error={monitorError} onStartMonitoring={onStartMonitoring} />
)}
{nodeErrors && nodeErrors.length > 0 && (
<NodeErrorsBanner nodeErrors={nodeErrors} />
Expand Down
2 changes: 1 addition & 1 deletion apps/frontend/src/components/dashboard/Dashboard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ export function Dashboard() {
className="flex-1"
icon={<Database className="text-primary" size={24} />}
label="Used Memory"
value={formatBytes(memoryUsageMetrics.used_memory || 0)}
value={formatBytes(memoryUsageMetrics.used_memory || -1)}
/>
<StatCard
className="flex-1"
Expand Down
2 changes: 1 addition & 1 deletion apps/frontend/src/components/ui/donut-chart.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export default function DonutChart() {
// count and memory usage per each key type
const keyTypeInfo = keys.reduce((stats, key) => {
(stats[key.type] ??= { count: 0, totalSize: 0 }).count += 1
stats[key.type].totalSize += key.size
if (key.size > 0) stats[key.type].totalSize += key.size
Comment thread
ArgusLi marked this conversation as resolved.
return stats
}, {} as Record<string, { count: number; totalSize: number }>)

Expand Down
11 changes: 10 additions & 1 deletion apps/frontend/src/state/epics/valkeyEpics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,16 @@ export const connectionEpic = (store: Store) =>
currentConnections[payload.connectionId] = connectionToSave
localStorage.setItem(LOCAL_STORAGE.VALKEY_CONNECTIONS, JSON.stringify(currentConnections))

toast.success("Connected to server successfully!")
if (baseConnectionDetails?.host?.includes(".serverless.")) {
toast.warning(
`Connected to an ElastiCache Serverless cache.
Some features like Dashboard metrics, Command Logs, and Monitor are not available.
Key Browser and Send Command work as expected.`,
{ duration: 10000 },
)
} else {
toast.success("Connected to server successfully!")
}
} catch (e) {
toast.error("Connection to server failed!")
console.error(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ export const selectMonitorLoading =
(state: RootState) =>
R.path<boolean>([VALKEY.MONITOR.name, nodeId, "loading"], state) ?? false

export const selectMonitorError =
(connectionId: string) =>
(state: RootState) =>
R.path<string | null>([VALKEY.MONITOR.name, connectionId, "error"], state) ?? null

export const selectRunningMonitorConnections =
(state: RootState): { nodeId: string; clusterId?: string; startedAt: number | null }[] => {
const monitorState = R.path<MonitorState>([VALKEY.MONITOR.name], state) ?? {}
Expand Down Expand Up @@ -100,7 +105,7 @@ const monitorSlice = createSlice({
state[nodeId].checkAt = parsedResponse.checkAt ?? null
state[nodeId].startedAt = parsedResponse.startedAt ?? null
state[nodeId].loading = false
state[nodeId].error = null
if (parsedResponse.monitorRunning) state[nodeId].error = null
},
// eslint-disable-next-line @typescript-eslint/no-unused-vars
saveMonitorSettingsRequested: (_state, _action) => {
Expand Down
113 changes: 68 additions & 45 deletions apps/metrics/src/effects/monitor-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,76 +3,99 @@ import { exhaustMap, catchError, map } from "rxjs"
import Valkey from "iovalkey"
import { ElastiCacheIAMProvider } from "../utils/elasticache-iam-provider.js"

export const makeMonitorStream = (onLogs = async () => { }, config) => {
const { monitoringInterval, monitoringDuration, maxCommandsPerRun: maxLogs } = config

function getConnectionOptions() {
const host = process.env.VALKEY_HOST
const port = Number(process.env.VALKEY_PORT)

const username = process.env.VALKEY_USERNAME
const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT
const verifyTlsCertificate = process.env.VALKEY_VERIFY_CERT
let tls = undefined
if (process.env.VALKEY_TLS === "true") {
tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {}
tls = verifyTlsCertificate === "false" ? { rejectUnauthorized: false } : {}
}
return { host, port, username, tls }
}

const runMonitorOnce = async () => {
const password = process.env.VALKEY_AUTH_TYPE === "iam"
? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials()
: process.env.VALKEY_PASSWORD
const monitorClient = new Valkey({
host,
port,
username,
password,
tls,
})
monitorClient.on("error", (err) => console.error("[monitor] ioredis client error:", err.message))
const monitor = await monitorClient.monitor()
monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message))
async function getPassword() {
const username = process.env.VALKEY_USERNAME
return process.env.VALKEY_AUTH_TYPE === "iam"
? await new ElastiCacheIAMProvider(username, process.env.VALKEY_REPLICATION_GROUP_ID, process.env.VALKEY_AWS_REGION).getCredentials()
: process.env.VALKEY_PASSWORD
}

/**
* Phase 1: Connect and issue MONITOR command.
* Returns the client and monitor handle for use in Phase 2.
* Throws immediately if the command is unsupported.
*/
export const connectMonitor = async () => {
const { host, port, username, tls } = getConnectionOptions()
const password = await getPassword()
const client = new Valkey({ host, port, username, password, tls })
client.on("error", (err) => console.error("[monitor] iovalkey client error:", err.message))
const monitor = await client.monitor()
monitor.on("error", (err) => console.error("[monitor] monitor stream error:", err.message))
return { client, monitor }
}

const rows = []
const overflow$ = new Subject()
/**
* Phase 2: Collect logs from an active monitor handle.
* Listens for monitoringDuration or until maxLogs is reached, then disconnects.
*/
export const collectLogs = async ({ client, monitor }, { monitoringDuration, maxCommandsPerRun: maxLogs }) => {
const rows = []
const overflow$ = new Subject()

const processEvent = (time, args) => {
rows.push({ ts: time, command: args.join(" ") })
if (rows.length >= maxLogs) overflow$.next()
}
const processEvent = (time, args) => {
rows.push({ ts: time, command: args.join(" ") })
if (rows.length >= maxLogs) overflow$.next()
}

monitor.on("monitor", processEvent)
monitor.on("monitor", processEvent)

let monitorCompletionReason
let monitorCompletionReason

try {
monitorCompletionReason = await firstValueFrom(
race([
timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")),
overflow$.pipe(map(() => "Max logs read")),
]),
)
} finally {
monitor.off("monitor", processEvent)
await Promise.all([
monitor.disconnect(),
monitorClient.disconnect(),
(async () => { overflow$.complete() })(),
])
console.debug(`Monitor run complete (${monitorCompletionReason}).`)
}
try {
monitorCompletionReason = await firstValueFrom(
race([
timer(monitoringDuration).pipe(map(() => "Monitor duration completed.")),
overflow$.pipe(map(() => "Max logs read")),
]),
)
} finally {
monitor.off("monitor", processEvent)
await Promise.all([
monitor.disconnect(),
client.disconnect(),
(async () => { overflow$.complete() })(),
])
console.debug(`Monitor run complete (${monitorCompletionReason}).`)
}

return rows
}

export const makeMonitorStream = (onLogs = async () => { }, config) => {
const { monitoringInterval } = config

const runMonitorOnce = async () => {
const handle = await connectMonitor()
const rows = await collectLogs(handle, config)
if (rows.length > 0) await onLogs(rows)
return rows
}

const monitorStream$ = timer(0, monitoringInterval).pipe(
exhaustMap(() =>
defer(runMonitorOnce).pipe(
catchError((err) => {
console.error("Monitor cycle failed", err)
if (err.message?.includes("unknown command")) {
throw err
}
return of([])
}),
),
),
)
return monitorStream$

}
14 changes: 12 additions & 2 deletions apps/metrics/src/handlers/commandlog-handler.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import { COMMANDLOG_TYPE } from "../utils/constants.js"
import { COMMANDLOG_TYPE, COMMANDLOG_SLOW, COMMANDLOG_LARGE_REPLY, COMMANDLOG_LARGE_REQUEST } from "../utils/constants.js"
import { getCollectorMeta } from "../init-collectors.js"
import * as Streamer from "../effects/ndjson-streamer.js"

const collectorNameByType = {
[COMMANDLOG_TYPE.SLOW]: COMMANDLOG_SLOW,
[COMMANDLOG_TYPE.LARGE_REQUEST]: COMMANDLOG_LARGE_REQUEST,
[COMMANDLOG_TYPE.LARGE_REPLY]: COMMANDLOG_LARGE_REPLY,
}

const latestByTsFold = () => ({
seed: null,
reducer: (acc, curr) => (acc == null || curr.ts > acc.ts ? curr : acc),
Expand Down Expand Up @@ -30,7 +36,11 @@ const getCommandLogRows = async (commandlogType) => {
export const getCommandLogs = async (req, res, nodeId) => {
try {
const commandlogType = req.query.type
const { lastUpdatedAt, nextCycleAt } = getCollectorMeta(commandlogType) || {}
const meta = getCollectorMeta(collectorNameByType[commandlogType])
if (meta?.error) {
return res.status(503).json({ error: meta.error })
}
const { lastUpdatedAt, nextCycleAt } = meta || {}
if (lastUpdatedAt !== null) {
const count = Number(req.query.count) || 100
const rows = await getCommandLogRows(commandlogType)
Expand Down
11 changes: 9 additions & 2 deletions apps/metrics/src/init-collectors.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { makeFetcher } from "./effects/fetchers.js"
import { makeMonitorStream } from "./effects/monitor-stream.js"
import { makeMonitorStream, connectMonitor } from "./effects/monitor-stream.js"
import { makeNdjsonWriter } from "./effects/ndjson-writer.js"
import { startCollector } from "./epics/collector-rx.js"
import { MONITOR } from "./utils/constants.js"
Expand Down Expand Up @@ -41,8 +41,13 @@ let monitorStopper
updateCollectorMeta(MONITOR, {
isRunning: false,
})
const startMonitor = (cfg) => {
const startMonitor = async (cfg) => {
const monitorEpic = cfg.epics.find((e) => e.name === MONITOR)

// Phase 1: Verify MONITOR command is supported (fast — throws if not)
await connectMonitor()

// Command works — set up collection
const { maxFiles, maxFileSize } = computeCapacity(monitorEpic.data_retention_mb)
const nd = makeNdjsonWriter({
dataDir: cfg.server.data_dir,
Expand All @@ -65,6 +70,7 @@ const startMonitor = (cfg) => {
willCompleteAt: Date.now() + monitorEpic.monitoringDuration,
})

// Phase 2: Start background collection stream
const stream$ = makeMonitorStream(async (logs) => {
await sink.appendRows(logs)
}, monitorEpic)
Expand Down Expand Up @@ -153,6 +159,7 @@ const setupCollectors = async (client, cfg) => {
rows = await fn()
} catch (err) {
console.warn(`[${f.name}] initial fetch failed, skipping collector:`, err.message)
updateCollectorMeta(f.name, { isRunning: false, error: err.message })
nd.close()
return
}
Expand Down
Loading
Loading