Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,025 changes: 1,025 additions & 0 deletions examples/metrics/grafana.json

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions examples/metrics/metrics_configs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Prometheus Metrics Configuration
# This file defines which metrics should be enabled and their configurations
log_interval: 5 # Interval in seconds for logging metrics

prometheus:
multiproc_dir: "/vllm-workspace" # Directory for Prometheus multiprocess mode

metric_prefix: "ucm:"

# Enable/disable metrics by category
enabled_metrics:
counters: true
gauges: true
histograms: true

# Counter metrics configuration
# counters:
# - name: "received_requests"
# documentation: "Total number of requests sent to ucm"

# Gauge metrics configuration
# gauges:
# - name: "lookup_hit_rate"
# documentation: "Hit rate of ucm lookup requests since last log"
# multiprocess_mode: "livemostrecent"

# Histogram metrics configuration
histograms:
- name: "load_requests_num"
documentation: "Number of requests loaded from ucm"
buckets: [1, 5, 10, 20, 50, 100, 200, 500, 1000]
- name: "load_blocks_num"
documentation: "Number of blocks loaded from ucm"
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
- name: "load_duration"
documentation: "Time to load from ucm (ms)"
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
- name: "load_speed"
documentation: "Speed of loading from ucm (GB/s)"
buckets: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 50, 60, 70, 80, 90, 100]
- name: "save_requests_num"
documentation: "Number of requests saved to ucm"
buckets: [1, 5, 10, 20, 50, 100, 200, 500, 1000]
- name: "save_blocks_num"
documentation: "Number of blocks saved to ucm"
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
- name: "save_duration"
documentation: "Time to save to ucm (ms)"
buckets: [0, 50, 100, 150, 200, 250, 300, 350, 400, 550, 600, 750, 800, 850, 900, 950, 1000]
- name: "save_speed"
documentation: "Speed of saving to ucm (GB/s)"
buckets: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 50, 60, 70, 80, 90, 100]
- name: "interval_lookup_hit_rates"
documentation: "Hit rates of ucm lookup requests"
buckets: [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]

2 changes: 2 additions & 0 deletions examples/ucm_config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ ucm_connectors:

load_only_first_rank: false

metrics_config_path: "/vllm-workspace/metrics_config.yaml" # NOTE: the shipped example is named metrics_configs.yaml — make sure the deployed filename matches this path

# Sparse attention configuration
# Format 1: Dictionary format (for methods like ESA, KvComp)
# ucm_sparse_config:
Expand Down
77 changes: 76 additions & 1 deletion ucm/integration/vllm/ucm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import itertools
import os
import pickle
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Callable, List, Optional

Expand All @@ -18,6 +19,8 @@
from vllm.v1.request import Request

from ucm.logger import init_logger
from ucm.shared.metrics import ucmmonitor
from ucm.shared.metrics.observability import UCMStatsLogger
from ucm.store.factory import UcmConnectorFactory
from ucm.store.ucmstore import Task, UcmKVStoreBase
from ucm.utils import Config
Expand Down Expand Up @@ -127,6 +130,7 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
self.broadcast_fn = self.group_coordinator.broadcast
self.broadcast_stream = torch.cuda.Stream()

logger.info(f"self.launch_config: {self.launch_config}")
connector_configs = self.launch_config.get("ucm_connectors", [])
assert len(connector_configs) > 0, "no storage connector name in config."

Expand All @@ -153,6 +157,7 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
1 if self.is_mla else num_head_per_tp
)
self.store = UcmConnectorFactory.create_connector(name, config)
self.block_data_size = config["kv_block_size"]

logger.info("init UCConnectorImpl, connector: %s", name)
logger.info(
Expand All @@ -161,6 +166,20 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
config["io_size"] / 1024,
)

self.metrics_config = self.launch_config.get("metrics_config_path", "")
if self.metrics_config:
self.stats_logger = UCMStatsLogger(
vllm_config.model_config.served_model_name,
self.rank,
self.metrics_config,
)
self.monitor = ucmmonitor.StatsMonitor.get_instance()
self.synchronize = (
torch.cuda.synchronize
if current_platform.is_cuda_alike()
else torch.npu.synchronize
)

def generate_hash(self, block_size: int, request: "Request") -> list[str]:
token_ids = request.all_token_ids

Expand Down Expand Up @@ -209,6 +228,11 @@ def get_num_new_matched_tokens(
f"hit hbm: {hbm_hit_block_num}, "
f"hit external: {external_hit_blocks}"
)
if self.metrics_config:
self.monitor.update_stats(
"ConnStats",
{"interval_lookup_hit_rates": external_hit_blocks / len(ucm_block_ids)},
)

total_hit_block_num = hbm_hit_block_num + external_hit_blocks

Expand Down Expand Up @@ -452,17 +476,23 @@ def _broadcast(self, dst_tensor_addr: list[torch.Tensor]):
tensor.copy_(rec_tensor[i])

def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:

metadata = self._get_connector_metadata()
assert isinstance(metadata, UCMConnectorMetadata)

self._init_kv_caches_from_forward_context(forward_context)

request_to_task: dict[str, Optional[Task]] = {}
req_broadcast_addr = {}
is_load = False
num_loaded_block = 0
num_loaded_request = 0
load_start_time = time.perf_counter() * 1000
for request_id, request in metadata.request_meta.items():
if len(request.load_block_ids[0]) == 0:
continue
is_load = True
num_loaded_block += len(request.load_block_ids[0])
num_loaded_request += 1

ucm_block_ids, vllm_block_ids = request.load_block_ids
if self.rank != 0 and not self.is_mla:
Expand All @@ -486,6 +516,24 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
logger.error(f"request {request_id} load kv cache failed.")
if self.load_only_first_rank:
self._broadcast(req_broadcast_addr[request_id])
load_end_time = time.perf_counter() * 1000
load_speed = (
num_loaded_block
* self.block_data_size
/ (load_end_time - load_start_time)
/ 1024
/ 1024
) # GB/s
if self.metrics_config and is_load:
self.monitor.update_stats(
"ConnStats",
{
"load_requests_num": num_loaded_request,
"load_blocks_num": num_loaded_block,
"load_duration": load_end_time - load_start_time,
"load_speed": load_speed,
},
)

def wait_for_layer_load(self, layer_name: str) -> None:
pass
Expand All @@ -503,15 +551,24 @@ def wait_for_save(self) -> None:

if (self.is_mla or self.is_dsa) and self.rank != 0:
return
if self.metrics_config:
self.synchronize()

metadata = self._get_connector_metadata()
assert isinstance(metadata, UCMConnectorMetadata)

request_to_task: dict[str, Task] = {}
request_to_blocks: dict[str, list[str]] = {}
is_save = False
num_saved_block = 0
num_saved_request = 0
save_start_time = time.perf_counter() * 1000
for request_id, request in metadata.request_meta.items():
if len(request.dump_block_ids[0]) == 0:
continue
is_save = True
num_saved_block += len(request.dump_block_ids[0])
num_saved_request += 1

ucm_block_ids, vllm_block_ids = request.dump_block_ids
if self.rank != 0:
Expand Down Expand Up @@ -546,6 +603,24 @@ def wait_for_save(self) -> None:
else:
logger.error(f"request {request_id} dump kv cache failed.")
self.store.commit(ucm_block_ids, False)
save_end_time = time.perf_counter() * 1000
save_speed = (
num_saved_block
* self.block_data_size
/ (save_end_time - save_start_time)
/ 1024
/ 1024
) # GB/s
if self.metrics_config and is_save:
self.monitor.update_stats(
"ConnStats",
{
"save_requests_num": num_saved_request,
"save_blocks_num": num_saved_block,
"save_duration": save_end_time - save_start_time,
"save_speed": save_speed,
},
)

def clear_connector_metadata(self) -> None:
super().clear_connector_metadata()
Expand Down
1 change: 1 addition & 0 deletions ucm/shared/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
add_subdirectory(vendor)
add_subdirectory(trans)
add_subdirectory(metrics)
add_subdirectory(test)
15 changes: 15 additions & 0 deletions ucm/shared/metrics/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Build the UCM metrics core as a static library plus a pybind11 Python
# extension module ("ucmmonitor") that wraps it.

# Collect every C++ source under cc/ for the core monitor library.
# NOTE(review): "cc/*.cc" with GLOB_RECURSE already matches files under
# cc/stats/ as well — the first pattern is presumably kept for explicitness;
# confirm CMake de-duplicates the overlapping matches in this setup.
file(GLOB_RECURSE CORE_SRCS CONFIGURE_DEPENDS
"${CMAKE_CURRENT_SOURCE_DIR}/cc/stats/*.cc"
"${CMAKE_CURRENT_SOURCE_DIR}/cc/*.cc")
add_library(monitor_static STATIC ${CORE_SRCS})
# PIC is required because the static archive is later linked into a shared
# Python extension module.
set_property(TARGET monitor_static PROPERTY POSITION_INDEPENDENT_CODE ON)
target_include_directories(monitor_static PUBLIC
$<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cc>
$<INSTALL_INTERFACE:include>)
# Archive is emitted as libmonitor.a despite the target name.
set_target_properties(monitor_static PROPERTIES OUTPUT_NAME monitor)

# Python bindings live under cpy/; pybind11_add_module produces a module
# named "ucmmonitor" importable from Python.
file(GLOB_RECURSE BINDINGS_SRCS CONFIGURE_DEPENDS "${CMAKE_CURRENT_SOURCE_DIR}/cpy/*.cc")
pybind11_add_module(ucmmonitor ${BINDINGS_SRCS})
# --whole-archive forces every object in the static lib into the module so
# that static self-registration objects (e.g. stats Registrar instances) are
# not dropped by the linker as "unused".
target_link_libraries(ucmmonitor PRIVATE -Wl,--whole-archive monitor_static -Wl,--no-whole-archive)
target_include_directories(ucmmonitor PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/cc)
# Drop the built .so next to the sources so Python can import it in place.
set_target_properties(ucmmonitor PROPERTIES LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
79 changes: 79 additions & 0 deletions ucm/shared/metrics/cc/stats/conn_stats.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#include "conn_stats.h"

namespace UC::Metrics {

ConnStats::ConnStats() = default;

std::string ConnStats::Name() const {
    // Identifier under which this collector is registered and looked up
    // (matches the "ConnStats" key used by the Python-side update_stats calls).
    const std::string collectorName{"ConnStats"};
    return collectorName;
}

void ConnStats::Reset() {
    // Discard all buffered samples for every metric series. clear() keeps
    // each vector's capacity, so post-reset updates avoid reallocation.
    for (auto& series : data_) {
        series.clear();
    }
}

void ConnStats::Update(const std::unordered_map<std::string, double>& params) {
for (const auto& [k, v] : params) {
Key id = KeyFromString(k);
if (id == Key::COUNT) continue;
EmplaceBack(id, v);
}
}

std::unordered_map<std::string, std::vector<double>> ConnStats::Data() {
std::unordered_map<std::string, std::vector<double>> result;
result["save_requests_num"] = data_[static_cast<std::size_t>(Key::save_requests_num)];
result["save_blocks_num"] = data_[static_cast<std::size_t>(Key::save_blocks_num)];
result["save_duration"] = data_[static_cast<std::size_t>(Key::save_duration)];
result["save_speed"] = data_[static_cast<std::size_t>(Key::save_speed)];
result["load_requests_num"] = data_[static_cast<std::size_t>(Key::load_requests_num)];
result["load_blocks_num"] = data_[static_cast<std::size_t>(Key::load_blocks_num)];
result["load_duration"] = data_[static_cast<std::size_t>(Key::load_duration)];
result["load_speed"] = data_[static_cast<std::size_t>(Key::load_speed)];
result["interval_lookup_hit_rates"] = data_[static_cast<std::size_t>(Key::interval_lookup_hit_rates)];
return result;
}

Key ConnStats::KeyFromString(const std::string& k) {
    // Translate an external metric name into its internal Key slot.
    // Unknown names map to Key::COUNT, which callers treat as "ignore".
    static const std::unordered_map<std::string, Key> kLookup = {
        {"save_requests_num", Key::save_requests_num},
        {"save_blocks_num", Key::save_blocks_num},
        {"save_duration", Key::save_duration},
        {"save_speed", Key::save_speed},
        {"load_requests_num", Key::load_requests_num},
        {"load_blocks_num", Key::load_blocks_num},
        {"load_duration", Key::load_duration},
        {"load_speed", Key::load_speed},
        {"interval_lookup_hit_rates", Key::interval_lookup_hit_rates},
    };
    const auto it = kLookup.find(k);
    return it != kLookup.end() ? it->second : Key::COUNT;
}

void ConnStats::EmplaceBack(Key id, double value) {
    // Append one sample to the series addressed by `id`. Callers guarantee
    // `id` is a valid slot (never Key::COUNT).
    auto& series = data_[static_cast<std::size_t>(id)];
    series.push_back(value);
}

// File-scope instance whose constructor presumably registers ConnStats with
// the monitor's stats registry during static initialization (self-registration
// pattern; Registrar is declared in conn_stats.h). NOTE(review): this object
// only runs if the translation unit is linked in — the build links the static
// archive with --whole-archive for exactly this reason; confirm.
static Registrar registrar;

} // namespace UC::Metrics
Loading