1+ //! CPU affinity management for optimal performance.
2+ //!
3+ //! This module provides cross-platform CPU affinity management to bind threads
4+ //! to specific CPU cores for better cache locality and reduced context switching.
5+
6+ use crate :: { Error , Result } ;
7+ use once_cell:: sync:: Lazy ;
8+ use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
9+
10+ /// Global CPU affinity manager
11+ static AFFINITY_MANAGER : Lazy < CpuAffinityManager > = Lazy :: new ( CpuAffinityManager :: new) ;
12+
13+ /// CPU affinity manager for distributing threads across CPU cores.
14+ pub struct CpuAffinityManager {
15+ cpu_count : usize ,
16+ next_cpu : AtomicUsize ,
17+ }
18+
19+ impl CpuAffinityManager {
20+ /// Create a new CPU affinity manager.
21+ fn new ( ) -> Self {
22+ let cpu_count = num_cpus:: get ( ) ;
23+ tracing:: info!( "Detected {} CPU cores" , cpu_count) ;
24+
25+ Self {
26+ cpu_count,
27+ next_cpu : AtomicUsize :: new ( 0 ) ,
28+ }
29+ }
30+
31+ /// Get the number of available CPU cores.
32+ pub fn cpu_count ( & self ) -> usize {
33+ self . cpu_count
34+ }
35+
36+ /// Get the optimal number of worker threads.
37+ /// Uses all available cores but caps at a reasonable maximum.
38+ pub fn optimal_worker_count ( & self ) -> usize {
39+ self . cpu_count . min ( 32 ) . max ( 1 )
40+ }
41+
42+ /// Assign the current thread to the next available CPU core.
43+ pub fn assign_current_thread ( & self ) -> Result < usize > {
44+ let cpu_id = self . next_cpu . fetch_add ( 1 , Ordering :: Relaxed ) % self . cpu_count ;
45+ set_thread_affinity ( cpu_id) ?;
46+ tracing:: debug!( "Assigned current thread to CPU core {}" , cpu_id) ;
47+ Ok ( cpu_id)
48+ }
49+
50+ /// Spawn a new thread and bind it to a specific CPU core.
51+ pub fn spawn_on_cpu < F , T > ( & self , cpu_id : usize , f : F ) -> std:: thread:: JoinHandle < T >
52+ where
53+ F : FnOnce ( ) -> T + Send + ' static ,
54+ T : Send + ' static ,
55+ {
56+ let actual_cpu = cpu_id % self . cpu_count ;
57+ std:: thread:: spawn ( move || {
58+ if let Err ( e) = set_thread_affinity ( actual_cpu) {
59+ tracing:: warn!( "Failed to set CPU affinity for thread: {}" , e) ;
60+ }
61+ f ( )
62+ } )
63+ }
64+ }
65+
66+ /// Set CPU affinity for the current thread.
67+ #[ cfg( target_os = "linux" ) ]
68+ pub fn set_thread_affinity ( cpu_id : usize ) -> Result < ( ) > {
69+ use std:: mem;
70+
71+ unsafe {
72+ let mut set: libc:: cpu_set_t = mem:: zeroed ( ) ;
73+ libc:: CPU_ZERO ( & mut set) ;
74+ libc:: CPU_SET ( cpu_id, & mut set) ;
75+
76+ let result = libc:: sched_setaffinity ( 0 , mem:: size_of :: < libc:: cpu_set_t > ( ) , & set) ;
77+ if result != 0 {
78+ let error = std:: io:: Error :: last_os_error ( ) ;
79+ return Err ( Error :: internal ( format ! ( "Failed to set CPU affinity: {}" , error) ) ) ;
80+ }
81+ }
82+
83+ Ok ( ( ) )
84+ }
85+
86+ /// Set CPU affinity for the current thread (Windows implementation).
87+ #[ cfg( target_os = "windows" ) ]
88+ pub fn set_thread_affinity ( cpu_id : usize ) -> Result < ( ) > {
89+ use std:: ptr;
90+
91+ extern "system" {
92+ fn SetThreadAffinityMask (
93+ h_thread : * mut std:: ffi:: c_void ,
94+ dw_thread_affinity_mask : usize ,
95+ ) -> usize ;
96+ fn GetCurrentThread ( ) -> * mut std:: ffi:: c_void ;
97+ }
98+
99+ unsafe {
100+ let mask = 1usize << cpu_id;
101+ let result = SetThreadAffinityMask ( GetCurrentThread ( ) , mask) ;
102+ if result == 0 {
103+ return Err ( Error :: internal ( "Failed to set CPU affinity on Windows" ) ) ;
104+ }
105+ }
106+
107+ Ok ( ( ) )
108+ }
109+
110+ /// Set CPU affinity for the current thread (no-op on unsupported platforms).
111+ #[ cfg( not( any( target_os = "linux" , target_os = "windows" ) ) ) ]
112+ pub fn set_thread_affinity ( _cpu_id : usize ) -> Result < ( ) > {
113+ // CPU affinity not supported on this platform
114+ Ok ( ( ) )
115+ }
116+
117+ /// Get the global CPU affinity manager.
118+ pub fn global_manager ( ) -> & ' static CpuAffinityManager {
119+ & AFFINITY_MANAGER
120+ }
121+
122+ /// Get the number of available CPU cores.
123+ pub fn cpu_count ( ) -> usize {
124+ global_manager ( ) . cpu_count ( )
125+ }
126+
127+ /// Get the optimal number of worker threads.
128+ pub fn optimal_worker_count ( ) -> usize {
129+ global_manager ( ) . optimal_worker_count ( )
130+ }
131+
132+ /// Assign the current thread to the next available CPU core.
133+ pub fn assign_current_thread ( ) -> Result < usize > {
134+ global_manager ( ) . assign_current_thread ( )
135+ }
136+
137+ /// Create a Tokio runtime with CPU affinity optimization.
138+ pub fn create_optimized_runtime ( ) -> Result < tokio:: runtime:: Runtime > {
139+ let worker_threads = optimal_worker_count ( ) ;
140+
141+ tracing:: info!( "Creating Tokio runtime with {} worker threads and CPU affinity" , worker_threads) ;
142+
143+ tokio:: runtime:: Builder :: new_multi_thread ( )
144+ . worker_threads ( worker_threads)
145+ . thread_name ( "harbr-worker" )
146+ . on_thread_start ( || {
147+ if let Err ( e) = assign_current_thread ( ) {
148+ tracing:: warn!( "Failed to set CPU affinity for worker thread: {}" , e) ;
149+ }
150+ } )
151+ . enable_all ( )
152+ . build ( )
153+ . map_err ( |e| Error :: internal ( format ! ( "Failed to create Tokio runtime: {}" , e) ) )
154+ }
155+
156+ /// Configure the current thread for optimal performance.
157+ pub fn optimize_current_thread ( ) -> Result < ( ) > {
158+ // Set CPU affinity
159+ assign_current_thread ( ) ?;
160+
161+ // Set thread priority on platforms that support it
162+ #[ cfg( unix) ]
163+ {
164+ unsafe {
165+ let result = libc:: setpriority ( libc:: PRIO_PROCESS , 0 , -10 ) ;
166+ if result != 0 {
167+ tracing:: warn!( "Failed to set thread priority: {}" , std:: io:: Error :: last_os_error( ) ) ;
168+ }
169+ }
170+ }
171+
172+ #[ cfg( windows) ]
173+ {
174+ extern "system" {
175+ fn SetThreadPriority ( h_thread : * mut std:: ffi:: c_void , n_priority : i32 ) -> i32 ;
176+ fn GetCurrentThread ( ) -> * mut std:: ffi:: c_void ;
177+ }
178+
179+ const THREAD_PRIORITY_ABOVE_NORMAL : i32 = 1 ;
180+
181+ unsafe {
182+ let result = SetThreadPriority ( GetCurrentThread ( ) , THREAD_PRIORITY_ABOVE_NORMAL ) ;
183+ if result == 0 {
184+ tracing:: warn!( "Failed to set thread priority on Windows" ) ;
185+ }
186+ }
187+ }
188+
189+ Ok ( ( ) )
190+ }
191+
192+ #[ cfg( test) ]
193+ mod tests {
194+ use super :: * ;
195+
196+ #[ test]
197+ fn test_cpu_count ( ) {
198+ let count = cpu_count ( ) ;
199+ assert ! ( count > 0 , "Should detect at least one CPU core" ) ;
200+ assert ! ( count <= 256 , "Unrealistic number of CPU cores" ) ;
201+ }
202+
203+ #[ test]
204+ fn test_optimal_worker_count ( ) {
205+ let count = optimal_worker_count ( ) ;
206+ assert ! ( count > 0 , "Should have at least one worker thread" ) ;
207+ assert ! ( count <= 32 , "Should cap worker threads at reasonable maximum" ) ;
208+ }
209+
210+ #[ test]
211+ fn test_affinity_manager ( ) {
212+ let manager = CpuAffinityManager :: new ( ) ;
213+ assert_eq ! ( manager. cpu_count( ) , num_cpus:: get( ) ) ;
214+
215+ let optimal = manager. optimal_worker_count ( ) ;
216+ assert ! ( optimal > 0 ) ;
217+ assert ! ( optimal <= manager. cpu_count( ) . min( 32 ) ) ;
218+ }
219+ }
0 commit comments