Skip to content

Commit af4bb49

Browse files
committed
[Feat] UCM supports metrics display online via Grafana and Promethues
1 parent 38f044d commit af4bb49

29 files changed

+1111
-702
lines changed

CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
88

99
option(BUILD_UCM_STORE "build ucm store module." ON)
1010
option(BUILD_UCM_SPARSE "build ucm sparse module." ON)
11-
option(BUILD_UCM_METRICS "build ucm metrics module." ON)
1211
option(BUILD_UNIT_TESTS "build all unit test suits." OFF)
1312
option(BUILD_NUMA "build numactl library." OFF)
1413
option(DOWNLOAD_DEPENDENCE "download dependence by cmake." ON)

ucm/CMakeLists.txt

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,3 @@ endif()
55
if(BUILD_UCM_SPARSE)
66
add_subdirectory(sparse)
77
endif()
8-
if(BUILD_UCM_METRICS)
9-
add_subdirectory(metrics)
10-
endif()

ucm/integration/vllm/ucm_connector.py

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import ctypes
12
import hashlib
23
import itertools
34
import os
@@ -19,8 +20,8 @@
1920
from vllm.v1.request import Request
2021

2122
from ucm.logger import init_logger
22-
from ucm.metrics.ucm_obser import UCMStatsLogger
23-
from ucm.metrics.ucmmonitor import UCMStatsMonitor
23+
from ucm.shared.metrics import monitor
24+
from ucm.shared.metrics.observability import UCMStatsLogger
2425
from ucm.store.factory import UcmConnectorFactory
2526
from ucm.store.ucmstore import Task, UcmKVStoreBase
2627
from ucm.utils import Config
@@ -129,10 +130,9 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
129130
self.broadcast_fn = self.group_coordinator.broadcast
130131
self.broadcast_stream = torch.cuda.Stream()
131132

133+
logger.info(f"self.launch_config: {self.launch_config}")
132134
connector_configs = self.launch_config.get("ucm_connectors", [])
133135
assert len(connector_configs) > 0, "no storage connector name in config."
134-
self.io_size = config["io_size"]
135-
self.num_layers = num_layers
136136

137137
name = connector_configs[0].get("ucm_connector_name")
138138
config = connector_configs[0].get("ucm_connector_config") or {}
@@ -157,6 +157,7 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
157157
1 if self.is_mla else num_head_per_tp
158158
)
159159
self.store = UcmConnectorFactory.create_connector(name, config)
160+
self.block_data_size = config["kv_block_size"]
160161

161162
logger.info("init UCConnectorImpl, connector: %s", name)
162163
logger.info(
@@ -165,6 +166,16 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
165166
config["io_size"] / 1024,
166167
)
167168

169+
self.stats_logger = UCMStatsLogger(
170+
vllm_config.model_config.served_model_name, self.rank
171+
)
172+
self.monitor = monitor.StatsMonitor.get_instance()
173+
self.synchronize = (
174+
torch.cuda.synchronize
175+
if current_platform.is_cuda_alike()
176+
else torch.npu.synchronize
177+
)
178+
168179
def generate_hash(self, block_size: int, request: "Request") -> list[str]:
169180
token_ids = request.all_token_ids
170181

@@ -213,6 +224,10 @@ def get_num_new_matched_tokens(
213224
f"hit hbm: {hbm_hit_block_num}, "
214225
f"hit external: {external_hit_blocks}"
215226
)
227+
self.monitor.update_stats(
228+
"ConnStats",
229+
{"interval_lookup_hit_rates": external_hit_blocks / len(ucm_block_ids)},
230+
)
216231

217232
total_hit_block_num = hbm_hit_block_num + external_hit_blocks
218233

@@ -456,12 +471,14 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
456471
req_broadcast_addr = {}
457472
is_load = False
458473
num_loaded_block = 0
474+
num_loaded_request = 0
459475
load_start_time = time.perf_counter() * 1000
460476
for request_id, request in metadata.request_meta.items():
461477
if len(request.load_block_ids[0]) == 0:
462478
continue
463479
is_load = True
464480
num_loaded_block += len(request.load_block_ids[0])
481+
num_loaded_request += 1
465482

466483
ucm_block_ids, vllm_block_ids = request.load_block_ids
467484
if self.rank != 0 and not self.is_mla:
@@ -486,17 +503,21 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
486503
if self.load_only_first_rank:
487504
self._broadcast(req_broadcast_addr[request_id])
488505
load_end_time = time.perf_counter() * 1000
506+
load_speed = (
507+
num_loaded_block
508+
* self.block_data_size
509+
/ (load_end_time - load_start_time)
510+
/ 1024
511+
/ 1024
512+
) # GB/s
489513
if is_load:
490-
UCMStatsMonitor.get_instance().update_stats(
491-
"UCMStats",
514+
self.monitor.update_stats(
515+
"ConnStats",
492516
{
517+
"load_requests_num": num_loaded_request,
518+
"load_blocks_num": num_loaded_block,
493519
"load_duration": load_end_time - load_start_time,
494-
"load_speed": num_loaded_block
495-
* self.io_size
496-
* self.num_layers
497-
/ (load_end_time - load_start_time)
498-
/ 1024
499-
/ 1024, # GB/s
520+
"load_speed": load_speed,
500521
},
501522
)
502523

@@ -515,20 +536,24 @@ def save_kv_layer(
515536
def wait_for_save(self) -> None:
516537
if self.is_mla and self.rank != 0:
517538
return
539+
if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
540+
self.synchronize()
518541

519542
metadata = self._get_connector_metadata()
520543
assert isinstance(metadata, UCMConnectorMetadata)
521544

522545
request_to_task: dict[str, Task] = {}
523546
request_to_blocks: dict[str, list[str]] = {}
524-
is_dump = False
525-
dump_start_time = time.perf_counter() * 1000
526-
num_dumped_block = 0
547+
is_save = False
548+
num_saved_block = 0
549+
num_saved_request = 0
550+
save_start_time = time.perf_counter() * 1000
527551
for request_id, request in metadata.request_meta.items():
528552
if len(request.dump_block_ids[0]) == 0:
529553
continue
530-
is_dump = True
531-
num_dumped_block += len(request.dump_block_ids[0])
554+
is_save = True
555+
num_saved_block += len(request.dump_block_ids[0])
556+
num_saved_request += 1
532557

533558
ucm_block_ids, vllm_block_ids = request.dump_block_ids
534559
if self.rank != 0:
@@ -563,18 +588,22 @@ def wait_for_save(self) -> None:
563588
else:
564589
logger.error(f"request {request_id} dump kv cache failed.")
565590
self.store.commit(ucm_block_ids, False)
566-
dump_end_time = time.perf_counter() * 1000
567-
if is_dump:
568-
UCMStatsMonitor.get_instance().update_stats(
569-
"UCMStats",
591+
save_end_time = time.perf_counter() * 1000
592+
save_speed = (
593+
num_saved_block
594+
* self.block_data_size
595+
/ (save_end_time - save_start_time)
596+
/ 1024
597+
/ 1024
598+
) # GB/s
599+
if is_save:
600+
self.monitor.update_stats(
601+
"ConnStats",
570602
{
571-
"save_duration": dump_end_time - dump_start_time,
572-
"save_speed": num_dumped_block
573-
* self.io_size
574-
* self.num_layers
575-
/ (dump_end_time - dump_start_time)
576-
/ 1024
577-
/ 1024, # GB/s
603+
"save_requests_num": num_saved_request,
604+
"save_blocks_num": num_saved_block,
605+
"save_duration": save_end_time - save_start_time,
606+
"save_speed": save_speed,
578607
},
579608
)
580609

@@ -699,10 +728,6 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
699728
else:
700729
self.connector = UCMDirectConnector(vllm_config, role)
701730

702-
if role == KVConnectorRole.WORKER:
703-
self.stats_logger = UCMStatsLogger(vllm_config, 10)
704-
self.monitor = UCMStatsMonitor.get_instance()
705-
706731
def get_num_new_matched_tokens(
707732
self,
708733
request: "Request",

ucm/metrics/CMakeLists.txt

Lines changed: 0 additions & 20 deletions
This file was deleted.

ucm/metrics/cc/stats/istats.h

Lines changed: 0 additions & 17 deletions
This file was deleted.

ucm/metrics/cc/stats/ucm_stats.cc

Lines changed: 0 additions & 52 deletions
This file was deleted.

ucm/metrics/cc/stats/ucm_stats.h

Lines changed: 0 additions & 38 deletions
This file was deleted.

ucm/metrics/cc/stats_monitor.cc

Lines changed: 0 additions & 55 deletions
This file was deleted.

0 commit comments

Comments
 (0)