feat(kvbm): integrate kvbm-logical into Qwen3-4B KV cache #187

Merged: xiaguan merged 3 commits into main from feat/dynamo-kvbm-port on May 28, 2026

xiaguan (Owner) commented May 28, 2026

Summary

  • Port 11 NVIDIA Dynamo kvbm crates into workspace (Apache-2.0)
  • Add pegainfer-kv-cache crate wrapping kvbm-logical's BlockManager with KvCacheManager / RequestKv / KvView
  • Replace KvPool/KvState with kvbm-logical lifecycle in Qwen3-4B executor

Architecture change

| Before | After |
| --- | --- |
| Qwen3Model owns KvPool | Model is pure weights, executor owns KvCacheManager |
| Per-TP-rank RequestStateStore | Centralized HashMap<RequestId, RequestKv> in executor |
| Workers manage per-request state | Workers stateless, receive pre-computed KvViews |
| ensure_capacity/advance inside forward | Forward functions pure compute: &[KvView] + &CudaSlice<bf16> + &KvLayout |
| Implicit TP page-index consistency | Single BlockManager guarantees identical indices |
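
The ownership change described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `Executor`, `insert`, `views_for`, and `worker_forward` are hypothetical names, and `RequestKv` / `KvView` are reduced to a block-id list plus a sequence length. The point is only that per-request state lives in one map and every TP rank consumes the same pre-computed views.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins; the real types live in pegainfer-kv-cache.
pub type RequestId = u64;

/// Immutable per-request metadata handed to workers for one step.
#[derive(Clone, Debug, PartialEq)]
pub struct KvView {
    pub block_ids: Vec<u32>,
    pub seq_len: usize,
}

/// Per-request KV state, owned only by the executor.
pub struct RequestKv {
    block_ids: Vec<u32>,
    seq_len: usize,
}

impl RequestKv {
    /// Snapshot the current state as a read-only view.
    pub fn view(&self) -> KvView {
        KvView { block_ids: self.block_ids.clone(), seq_len: self.seq_len }
    }
}

/// Central owner of all request state (assumed shape).
#[derive(Default)]
pub struct Executor {
    requests: HashMap<RequestId, RequestKv>,
}

impl Executor {
    pub fn insert(&mut self, id: RequestId, block_ids: Vec<u32>, seq_len: usize) {
        self.requests.insert(id, RequestKv { block_ids, seq_len });
    }

    /// Build the views once per step; unknown ids are skipped.
    pub fn views_for(&self, batch: &[RequestId]) -> Vec<KvView> {
        batch
            .iter()
            .filter_map(|id| self.requests.get(id))
            .map(RequestKv::view)
            .collect()
    }
}

/// A stateless worker: its output depends only on the views it receives.
/// Here it just returns the block ids it would touch.
pub fn worker_forward(_rank: usize, views: &[KvView]) -> Vec<u32> {
    views.iter().flat_map(|v| v.block_ids.iter().copied()).collect()
}
```

Because both ranks read the same `&[KvView]`, they cannot disagree on block indices, which is the property the last table row refers to.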

Block lifecycle

schedule_prefill → prefill_view → batch_prefill (kernel) → apply_prefill
schedule_decode  → decode_view  → batch_decode  (kernel) → apply_decode
drop_request     → RAII reclaims blocks
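
A rough sketch of the schedule → view → apply pattern with RAII reclamation is shown below. It assumes a single-threaded free list behind `Rc<RefCell<..>>` and invents the names `FreeList`, `BlockGuard`, and `Request`; kvbm-logical's BlockManager and the real RequestKv are not reproduced here.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared free list of block ids (hypothetical, single-threaded).
pub type FreeList = Rc<RefCell<Vec<u32>>>;

/// Owns one block and returns it to the free list when dropped.
pub struct BlockGuard {
    id: u32,
    pool: FreeList,
}

impl Drop for BlockGuard {
    fn drop(&mut self) {
        self.pool.borrow_mut().push(self.id);
    }
}

pub struct Request {
    blocks: Vec<BlockGuard>,
    committed: usize, // tokens whose KV has been applied
    pending: usize,   // tokens scheduled but not yet applied
    block_size: usize,
    pool: FreeList,
}

impl Request {
    pub fn new(pool: FreeList, block_size: usize) -> Self {
        assert!(block_size > 0, "block_size must be non-zero");
        Request { blocks: Vec::new(), committed: 0, pending: 0, block_size, pool }
    }

    /// schedule: reserve blocks for `new_tokens`; false if the pool is too small.
    pub fn schedule(&mut self, new_tokens: usize) -> bool {
        let needed = (self.committed + new_tokens).div_ceil(self.block_size);
        let extra = needed.saturating_sub(self.blocks.len());
        if self.pool.borrow().len() < extra {
            return false;
        }
        for _ in 0..extra {
            let id = self.pool.borrow_mut().pop().expect("length checked above");
            self.blocks.push(BlockGuard { id, pool: Rc::clone(&self.pool) });
        }
        self.pending = new_tokens;
        true
    }

    /// view: block ids the kernel would read and write this step.
    pub fn view(&self) -> Vec<u32> {
        self.blocks.iter().map(|b| b.id).collect()
    }

    /// apply: commit the scheduled tokens after the kernel has run.
    pub fn apply(&mut self) {
        self.committed += self.pending;
        self.pending = 0;
    }

    pub fn committed_tokens(&self) -> usize {
        self.committed
    }
}
```

Dropping a `Request` at any point, including between `schedule` and `apply`, returns its blocks because each guard releases itself; no explicit release call is involved.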

Test plan

  • cargo check --tests --bins clean (0 errors, 0 warnings)
  • pegainfer-kv-cache lifecycle tests (10 cases)
  • vllm bench serve 200 requests, block pool stable at ~2440, no leak
  • E2E greedy regression: cargo test -p pegainfer-qwen3-4b --test e2e
  • TPOT bench within ±3% of baseline

🤖 Generated with Claude Code

xiaguan and others added 3 commits May 28, 2026 14:16
Port the KV block manager subsystem from NVIDIA Dynamo into the
pegainfer workspace as a foundation for production-grade KV cache
management. All 11 crates compile and kvbm-logical's 492 unit tests
pass.

Ported crates: dynamo-tokens, dynamo-memory, dynamo-kv-hashing,
dynamo-kv-router, kvbm-common, kvbm-config, kvbm-consolidator,
kvbm-engine, kvbm-kernels, kvbm-logical, kvbm-physical.

Modifications from upstream:
- dynamo-runtime dep removed (not ported; only used by kv-router
  optional feature runtime-protocols)
- rand 0.9→0.10 API fixes in dynamo-kv-router (RngExt, random_range)
- standalone_indexer/runtime submodule decls removed (files depend on
  dynamo-runtime, never existed in upstream either)
- workspace.package added for dynamo crate edition/metadata inheritance

Also adds KV cache design doc and Apache-2.0 license attribution.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

New standalone crate providing the public KV cache management API:
- KvCacheManager: engine-level BlockManager + GPU buffer
- RequestKv: per-request lifecycle (schedule → view → apply)
- KvView/KvViewDesc: immutable kernel metadata for forward paths
- KvBuffer/KvLayout: GPU buffer geometry without allocator

10 integration tests covering lifecycle contracts: block allocation,
page boundary crossing, capacity exhaustion, drop-without-apply
safety, and view invariants.
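
The block-boundary and capacity contracts those tests cover reduce to ceiling division. The helpers below are an illustrative sketch only; `blocks_for`, `extra_blocks_for_decode`, and `fits` are hypothetical names, and the block size of 16 used in the note is an assumption rather than a value taken from this PR.

```rust
/// Number of fixed-size blocks needed to hold `tokens` tokens.
pub fn blocks_for(tokens: usize, block_size: usize) -> usize {
    assert!(block_size > 0, "block_size must be non-zero");
    tokens.div_ceil(block_size)
}

/// Extra blocks a decode step needs when the sequence grows by one token.
/// This is 1 exactly when `seq_len` is a multiple of `block_size`.
pub fn extra_blocks_for_decode(seq_len: usize, block_size: usize) -> usize {
    blocks_for(seq_len + 1, block_size) - blocks_for(seq_len, block_size)
}

/// Whether growing a request to `new_len` tokens fits in the free pool,
/// given how many blocks the request already holds.
pub fn fits(free_blocks: usize, held_blocks: usize, new_len: usize, block_size: usize) -> bool {
    blocks_for(new_len, block_size).saturating_sub(held_blocks) <= free_blocks
}
```

With a block size of 16, a sequence of 16 tokens holds one block and the next decode token crosses a boundary, so `extra_blocks_for_decode(16, 16)` is 1 while `extra_blocks_for_decode(15, 16)` is 0.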

Also adds Qwen3-4B kvbm integration spec doc.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Architecture change: centralize KV block management in Qwen3Executor
via KvCacheManager (wrapping kvbm-logical's BlockManager), replacing
the per-TP-rank RequestStateStore + KvPool design.

Key changes:
- Qwen3Model no longer owns KvPool; it exposes kv_budget() for the
  executor to create KvCacheManager
- Executor owns HashMap<RequestId, RequestKv> centrally; workers are
  stateless (receive pre-computed KvViews via StepCommand)
- Forward functions (batch_prefill, batch_decode, unified_step) are
  pure compute: take &[KvView] + &CudaSlice<bf16> + &KvLayout, no
  ensure_capacity/advance
- Block lifecycle follows schedule → view → forward → apply pattern
- TP consistency guaranteed by single BlockManager (vs implicit
  assumption of identical allocation order across ranks)
- Scheduler renamed page → block terminology
- Drop uses RAII (SchedulableSequence block guards) instead of
  explicit release, safe in any lifecycle state

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
gemini-code-assist (Bot, Contributor) left a comment

Code Review

This pull request integrates the Dynamo KV cache management framework into the codebase, introducing several ported crates (such as dynamo-tokens, dynamo-kv-hashing, and dynamo-kv-router), licensing files, and detailed design documentation. The changes establish a logical/physical split KV cache design, a radix tree indexer, and a lower-tier continuation indexer. The review feedback identifies two critical issues: a potential memory leak and stale worker references in anchor_nodes due to a lack of pruning, and a state desynchronization bug in remove_blocks_impl where an early return aborts the removal of subsequent blocks in a batch.

Comment on lines +87 to +106
    fn apply_anchor(
        &self,
        worker: WorkerWithDpRank,
        anchor: AnchorTask,
    ) -> Result<(), KvCacheEventError> {
        let anchor_node = {
            let entry = self
                .anchor_nodes
                .entry(anchor.anchor_id)
                .or_insert_with(|| {
                    Arc::new(Node::from_anchor(
                        anchor.anchor_local_hash,
                        anchor.anchor_id,
                    ))
                });
            entry.clone()
        };

        anchor_node.promote_worker_to_full_edge(worker);
        Ok(())

Severity: high

The anchor_nodes map is never pruned or cleaned up, leading to unbounded memory growth (memory leak) over time as new anchors are registered. Furthermore, when a worker is removed via RemoveWorker or RemoveWorkerDpRank, the worker is only dropped from nodes present in the thread-local lookup map. Since apply_anchor does not insert the anchor node into lookup, the worker is never removed from the anchor node's full_edge_workers (unless resolve_anchor_lookup happened to be called), leading to stale worker references in the anchor nodes.

Consider implementing a pruning mechanism for anchor_nodes during cleanup (e.g., in sweep_stale_children or run_cleanup_task), removing any anchor nodes that have no active workers and no children. Additionally, ensure that when a worker is removed, they are also cleaned up from the anchor_nodes map.
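
One possible shape for that cleanup is sketched below, under simplifying assumptions: a plain `HashMap` instead of the concurrent map the snippet appears to use, integer ids in place of `WorkerWithDpRank`, and a `child_count` field standing in for the node's children. `AnchorIndex`, `AnchorNode`, `remove_worker`, and `prune_anchors` are hypothetical names, not the crate's API.

```rust
use std::collections::{HashMap, HashSet};

// Simplified stand-ins; the real Node and worker types are richer.
pub type AnchorId = u64;
pub type WorkerId = u64;

#[derive(Default)]
pub struct AnchorNode {
    pub full_edge_workers: HashSet<WorkerId>,
    pub child_count: usize,
}

#[derive(Default)]
pub struct AnchorIndex {
    pub anchor_nodes: HashMap<AnchorId, AnchorNode>,
}

impl AnchorIndex {
    /// Register `worker` on the anchor, creating the node if needed.
    pub fn apply_anchor(&mut self, worker: WorkerId, anchor_id: AnchorId) {
        self.anchor_nodes
            .entry(anchor_id)
            .or_default()
            .full_edge_workers
            .insert(worker);
    }

    /// Remove the worker from every anchor node, not only from nodes
    /// reachable through a separate lookup map, then prune.
    pub fn remove_worker(&mut self, worker: WorkerId) {
        for node in self.anchor_nodes.values_mut() {
            node.full_edge_workers.remove(&worker);
        }
        self.prune_anchors();
    }

    /// Drop anchors that have no active workers and no children.
    pub fn prune_anchors(&mut self) {
        self.anchor_nodes
            .retain(|_, n| !n.full_edge_workers.is_empty() || n.child_count > 0);
    }
}
```

Scanning every anchor on worker removal is linear in the number of anchors; a per-worker reverse index would avoid that at the cost of extra bookkeeping.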

Comment on lines +254 to +274
        block_hashes: &[ExternalSequenceBlockHash],
    ) -> Result<(), KvCacheEventError> {
        let remove_worker_entry = {
            let Some(worker_map) = worker_blocks.get_mut(&worker) else {
                return Err(KvCacheEventError::BlockNotFound);
            };

            for block_hash in block_hashes {
                let Some(key) = worker_map.remove(block_hash) else {
                    return Err(KvCacheEventError::BlockNotFound);
                };

                self.remove_worker_from_edge(key, worker);
            }

            worker_map.is_empty()
        };

        if remove_worker_entry {
            worker_blocks.remove(&worker);
        }

Severity: high

If a batch of block hashes is removed, and one of them is not found in worker_map, remove_blocks_impl immediately returns Err(KvCacheEventError::BlockNotFound) and aborts the loop. This leaves the remaining blocks in the batch registered, leading to state desynchronization between the primary indexer (which continues processing remaining blocks on error) and the lower-tier indexer.

Modify the loop to process all block hashes in the batch, logging a warning or accumulating the error, and only return the error after the loop completes, matching the behavior of the primary indexer.

        let mut kv_cache_err = None;
        let remove_worker_entry = {
            let Some(worker_map) = worker_blocks.get_mut(&worker) else {
                return Err(KvCacheEventError::BlockNotFound);
            };

            for block_hash in block_hashes {
                if let Some(key) = worker_map.remove(block_hash) {
                    self.remove_worker_from_edge(key, worker);
                } else if kv_cache_err.is_none() {
                    kv_cache_err = Some(KvCacheEventError::BlockNotFound);
                }
            }

            worker_map.is_empty()
        };

        if remove_worker_entry {
            worker_blocks.remove(&worker);
        }

        kv_cache_err.map_or(Ok(()), Err)
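
The difference between the two behaviours can be seen in isolation with a plain `HashMap`. The sketch below is a stand-alone illustration, not the indexer code: `RemoveError`, `remove_early_return`, and `remove_all_then_report` are hypothetical names, and the edge bookkeeping done by `remove_worker_from_edge` is omitted.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
pub enum RemoveError {
    BlockNotFound,
}

/// Early-return variant: stops at the first missing hash, so later
/// hashes in the batch stay registered.
pub fn remove_early_return(
    map: &mut HashMap<u64, u32>,
    hashes: &[u64],
) -> Result<(), RemoveError> {
    for h in hashes {
        if map.remove(h).is_none() {
            return Err(RemoveError::BlockNotFound);
        }
    }
    Ok(())
}

/// Process-all variant: removes every hash it can and reports the
/// first error only after the loop finishes.
pub fn remove_all_then_report(
    map: &mut HashMap<u64, u32>,
    hashes: &[u64],
) -> Result<(), RemoveError> {
    let mut first_err = None;
    for h in hashes {
        if map.remove(h).is_none() && first_err.is_none() {
            first_err = Some(RemoveError::BlockNotFound);
        }
    }
    first_err.map_or(Ok(()), Err)
}
```

For a map holding hashes 1 and 3 and a batch of 1, 2, 3, both variants return `BlockNotFound`, but the early-return variant leaves hash 3 registered while the process-all variant leaves the map empty.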

xiaguan merged commit de26afa into main on May 28, 2026
1 check passed