|
4 | 4 | import traceback |
5 | 5 | import unittest |
6 | 6 |
|
| 7 | +import numpy as np |
| 8 | + |
7 | 9 | from vllm.distributed.device_communicators.shm_object_storage import ( |
8 | 10 | SingleWriterShmRingBuffer, |
9 | 11 | ) |
@@ -113,6 +115,69 @@ def test_clear_buffer(self): |
113 | 115 | self.assertEqual(self.ring_buffer.data_buffer_start, 0) |
114 | 116 | self.assertEqual(self.ring_buffer.data_buffer_end, 0) |
115 | 117 |
|
def test_allocation_cycles(self):
    """Run allocate/free cycles on a small ring and check regions never overlap.

    A shadow bitmap with one flag per byte of the data buffer mirrors what
    the ring hands out.  Every allocation must land on bytes the bitmap
    considers free, and every id reported by ``free_buf`` must correspond
    to a region the bitmap considers fully allocated.
    """
    buffer_size = 100
    # NOTE(review): this ring is never explicitly released in this test --
    # confirm the class cleans up its shared memory on its own, or release
    # it explicitly here.
    ring = SingleWriterShmRingBuffer(data_buffer_size=buffer_size, create=True)

    # Shadow state used only for assertions.
    allocated_bitmap = np.zeros((buffer_size,), dtype=np.bool_)  # offset -> in use
    allocation_map = {}  # monotonic_id -> (offset, size)

    def count_allocated(bitmap) -> int:
        """Return how many bytes of ``bitmap`` are flagged as in use."""
        return int(np.count_nonzero(bitmap))

    def is_free_fn(a, b) -> bool:
        """Tell the ring that every buffer it asks about may be freed."""
        return True

    def mark_allocated_with_assertion(monotonic_id, addr, size):
        """Record a new region, failing if it overlaps a live one."""
        # Addresses returned by the ring are reduced modulo the buffer size
        # to get the offset inside the data buffer.
        addr = addr % buffer_size
        # NumPy silently truncates a slice that runs past the end of the
        # array, which would hide part of the region from the overlap
        # check below; require the region to fit first.
        self.assertLessEqual(
            addr + size,
            buffer_size,
            "allocation runs past the end of the data buffer",
        )
        self.assertEqual(count_allocated(allocated_bitmap[addr : addr + size]), 0)

        allocated_bitmap[addr : addr + size] = True
        allocation_map[monotonic_id] = (addr, size)

    def mark_freed_with_assertion(monotonic_id):
        """Drop a region, failing if it is unknown or not fully allocated."""
        self.assertIn(monotonic_id, allocation_map)

        addr, size = allocation_map.pop(monotonic_id)
        addr = addr % buffer_size
        self.assertEqual(
            count_allocated(allocated_bitmap[addr : addr + size]), size
        )

        allocated_bitmap[addr : addr + size] = False

    def ring_free(free_size=None):
        """Ask the ring to free buffers and mirror the result in the bitmap."""
        for freed_id in ring.free_buf(is_free_fn, free_size):
            mark_freed_with_assertion(freed_id)

    def ring_allocate(allocate_size):
        """Allocate from the ring, freeing space and retrying once if full."""
        allocate_size_with_md = allocate_size + ring.MD_SIZE
        try:
            addr, monotonic_id = ring.allocate_buf(allocate_size)
        except MemoryError:
            # free 2x size for enough space if wrapping happened
            ring_free(allocate_size_with_md * 2)

            # Retry once; a second MemoryError propagates and fails the test.
            addr, monotonic_id = ring.allocate_buf(allocate_size)
        mark_allocated_with_assertion(monotonic_id, addr, allocate_size_with_md)

    # 1. allocation & free cycles
    for _ in range(33):
        # Each allocation consumes the 2-byte payload plus ring.MD_SIZE bytes
        # of metadata (10 bytes in total if MD_SIZE is 8 -- TODO confirm).
        ring_allocate(2)

    # 2. free all allocations
    ring_free()

    # 3. try to allocate the largest possible buffer
    ring_allocate(buffer_size - ring.MD_SIZE)
| 180 | + |
116 | 181 |
|
117 | 182 | def main(): |
118 | 183 | """Main function demonstrating usage and running tests""" |
|
0 commit comments