Skip to content

Commit 02af36d

Browse files
imkerodongluwywang96
authored
[Bugfix] Fix allocation & free logic of SingleWriterShmRingBuffer (#27117)
Signed-off-by: Kero Liang <[email protected]> Signed-off-by: Roger Wang <[email protected]> Co-authored-by: donglu <[email protected]> Co-authored-by: Roger Wang <[email protected]>
1 parent e88bdd6 commit 02af36d

File tree

2 files changed

+75
-4
lines changed

2 files changed

+75
-4
lines changed

tests/distributed/test_shm_buffer.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
import traceback
55
import unittest
66

7+
import numpy as np
8+
79
from vllm.distributed.device_communicators.shm_object_storage import (
810
SingleWriterShmRingBuffer,
911
)
@@ -113,6 +115,69 @@ def test_clear_buffer(self):
113115
self.assertEqual(self.ring_buffer.data_buffer_start, 0)
114116
self.assertEqual(self.ring_buffer.data_buffer_end, 0)
115117

118+
def test_allocation_cycles(self):
119+
buffer_size = 100
120+
ring = SingleWriterShmRingBuffer(data_buffer_size=buffer_size, create=True)
121+
122+
# tracking allocations for assertions
123+
allocated_bitmap = np.zeros(
124+
(buffer_size,), dtype=np.bool_
125+
) # addr -> is_allocated
126+
allocation_map = dict() # monotonic_id -> (addr, size)
127+
128+
def count_allocated(bitmap) -> int:
129+
return np.sum(bitmap).item()
130+
131+
def is_free_fn(a, b) -> bool:
132+
return True
133+
134+
def mark_allocated_with_assertion(id, addr, size):
135+
addr = addr % buffer_size
136+
self.assertEqual(count_allocated(allocated_bitmap[addr : addr + size]), 0)
137+
138+
allocated_bitmap[addr : addr + size] = True
139+
allocation_map[id] = (addr, size)
140+
141+
def mark_freed_with_assertion(id):
142+
self.assertTrue(id in allocation_map)
143+
144+
addr, size = allocation_map.pop(id)
145+
addr = addr % buffer_size
146+
self.assertEqual(
147+
count_allocated(allocated_bitmap[addr : addr + size]), size
148+
)
149+
150+
allocated_bitmap[addr : addr + size] = False
151+
152+
def ring_free(free_size=None):
153+
freed_ids = ring.free_buf(is_free_fn, free_size)
154+
for freed_id in freed_ids:
155+
mark_freed_with_assertion(freed_id)
156+
157+
def ring_allocate(allocate_size):
158+
allocate_size_with_md = allocate_size + ring.MD_SIZE
159+
try:
160+
addr, monotonic_id = ring.allocate_buf(allocate_size)
161+
mark_allocated_with_assertion(monotonic_id, addr, allocate_size_with_md)
162+
except MemoryError:
163+
# free 2x size for enough space if wrapping happened
164+
ring_free(allocate_size_with_md * 2)
165+
166+
# retry allocating
167+
addr, monotonic_id = ring.allocate_buf(allocate_size)
168+
mark_allocated_with_assertion(monotonic_id, addr, allocate_size_with_md)
169+
170+
# 1. allocation & free cycles
171+
for _ in range(33):
172+
# will consume 2 + 8 = 10 bytes per allocation
173+
ring_allocate(2)
174+
175+
# 2. free all allocations
176+
ring_free()
177+
178+
# 3. try allocate the largest possible buffer
179+
ring_allocate(buffer_size - ring.MD_SIZE)
180+
116181

117182
def main():
118183
"""Main function demonstrating usage and running tests"""

vllm/distributed/device_communicators/shm_object_storage.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,7 @@ def __init__(
127127

128128
if create:
129129
# we are creating a buffer
130-
self.metadata = {
131-
self.monotonic_id_end: self.data_buffer_end
132-
} # monotonic_id -> start address
130+
self.metadata: dict[int, int] = {} # monotonic_id -> start address
133131
self.shared_memory = shared_memory.SharedMemory(
134132
create=True, size=self.data_buffer_size, name=name
135133
)
@@ -288,7 +286,15 @@ def free_buf(
288286
self.monotonic_id_start = (
289287
self.monotonic_id_start + 1
290288
) % self.ID_MAX
291-
self.data_buffer_start = address
289+
if self.monotonic_id_start in self.metadata:
290+
# pointing to the start addr of next allocation
291+
self.data_buffer_start += (
292+
self.metadata[self.monotonic_id_start]
293+
- self.data_buffer_start
294+
) % self.data_buffer_size
295+
else:
296+
# no remaining allocation, reset to zero
297+
self.data_buffer_start = self.data_buffer_end = 0
292298
freed_bytes += metadata[1]
293299
else:
294300
# there are still readers, we cannot free the buffer

0 commit comments

Comments
 (0)