1919from vllm .v1 .request import Request
2020
2121from ucm .logger import init_logger
22+ from ucm .metrics .ucm_obser import UCMStatsLogger
2223from ucm .metrics .ucmmonitor import UCMStatsMonitor
2324from ucm .store .factory import UcmConnectorFactory
2425from ucm .store .ucmstore import Task , UcmKVStoreBase
25- from ucm .metrics .ucm_obser import UCMStatsLogger
2626from ucm .utils import Config
2727
2828if TYPE_CHECKING :
@@ -131,6 +131,8 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
131131
132132 connector_configs = self .launch_config .get ("ucm_connectors" , [])
133133 assert len (connector_configs ) > 0 , "no storage connector name in config."
134+ self .io_size = config ["io_size" ]
135+ self .num_layers = num_layers
134136
135137 name = connector_configs [0 ].get ("ucm_connector_name" )
136138 config = connector_configs [0 ].get ("ucm_connector_config" ) or {}
@@ -445,17 +447,21 @@ def _broadcast(self, dst_tensor_addr: list[torch.Tensor]):
445447 tensor .copy_ (rec_tensor [i ])
446448
447449 def start_load_kv (self , forward_context : "ForwardContext" , ** kwargs ) -> None :
448-
449450 metadata = self ._get_connector_metadata ()
450451 assert isinstance (metadata , UCMConnectorMetadata )
451452
452453 self ._init_kv_caches_from_forward_context (forward_context )
453454
454455 request_to_task : dict [str , Optional [Task ]] = {}
455456 req_broadcast_addr = {}
457+ is_load = False
458+ num_loaded_block = 0
459+ load_start_time = time .perf_counter () * 1000
456460 for request_id , request in metadata .request_meta .items ():
457461 if len (request .load_block_ids [0 ]) == 0 :
458462 continue
463+ is_load = True
464+ num_loaded_block += len (request .load_block_ids [0 ])
459465
460466 ucm_block_ids , vllm_block_ids = request .load_block_ids
461467 if self .rank != 0 and not self .is_mla :
@@ -479,6 +485,20 @@ def start_load_kv(self, forward_context: "ForwardContext", **kwargs) -> None:
479485 logger .error (f"request { request_id } load kv cache failed." )
480486 if self .load_only_first_rank :
481487 self ._broadcast (req_broadcast_addr [request_id ])
488+ load_end_time = time .perf_counter () * 1000
489+ if is_load :
490+ UCMStatsMonitor .get_instance ().update_stats (
491+ "UCMStats" ,
492+ {
493+ "load_duration" : load_end_time - load_start_time ,
494+ "load_speed" : num_loaded_block
495+ * self .io_size
496+ * self .num_layers
497+ / (load_end_time - load_start_time )
498+ / 1024
499+ / 1024 , # GB/s
500+ },
501+ )
482502
def wait_for_layer_load(self, layer_name: str) -> None:
    """Do nothing for the given layer and return ``None``.

    Args:
        layer_name: Name of the layer the caller is asking about. It is
            accepted but never read.

    NOTE(review): presumably this is a per-layer hook required by the
    connector interface, with the actual loading done in
    ``start_load_kv`` -- confirm against the base class.
    """
    # Intentionally empty: no per-layer work is performed here.
    return None
@@ -493,7 +513,6 @@ def save_kv_layer(
493513 pass
494514
495515 def wait_for_save (self ) -> None :
496-
497516 if self .is_mla and self .rank != 0 :
498517 return
499518
@@ -502,9 +521,14 @@ def wait_for_save(self) -> None:
502521
503522 request_to_task : dict [str , Task ] = {}
504523 request_to_blocks : dict [str , list [str ]] = {}
524+ is_dump = False
525+ dump_start_time = time .perf_counter () * 1000
526+ num_dumped_block = 0
505527 for request_id , request in metadata .request_meta .items ():
506528 if len (request .dump_block_ids [0 ]) == 0 :
507529 continue
530+ is_dump = True
531+ num_dumped_block += len (request .dump_block_ids [0 ])
508532
509533 ucm_block_ids , vllm_block_ids = request .dump_block_ids
510534 if self .rank != 0 :
@@ -539,6 +563,20 @@ def wait_for_save(self) -> None:
539563 else :
540564 logger .error (f"request { request_id } dump kv cache failed." )
541565 self .store .commit (ucm_block_ids , False )
566+ dump_end_time = time .perf_counter () * 1000
567+ if is_dump :
568+ UCMStatsMonitor .get_instance ().update_stats (
569+ "UCMStats" ,
570+ {
571+ "save_duration" : dump_end_time - dump_start_time ,
572+ "save_speed" : num_dumped_block
573+ * self .io_size
574+ * self .num_layers
575+ / (dump_end_time - dump_start_time )
576+ / 1024
577+ / 1024 , # GB/s
578+ },
579+ )
542580
543581 def clear_connector_metadata (self ) -> None :
544582 super ().clear_connector_metadata ()
0 commit comments