@@ -28,28 +28,38 @@ import Musl
2828import Glibc
2929#endif
3030
31- final class FilesystemEventWorker : @ unchecked Sendable {
31+ final class FilesystemEventWorker : Sendable {
3232 private static let handshakeReady : UInt8 = 0xAA
3333 private static let handshakeFailure : UInt8 = 0xFF
3434
3535 private let containerID : String
3636 private let containerPID : Int32
37- private var childPID : Int32 ?
38- private var parentSocket : Int32 ?
39- private var channel : Channel ?
4037 private let eventLoop : EventLoop
41- private var eventIDCounter : UInt32 = 0
42- private let pendingEvents : Mutex < [ UInt32 : CheckedContinuation < Void , Error > ] > = Mutex ( [ : ] )
4338 private let shouldStop : Atomic < Bool > = Atomic ( false )
4439
40+ // Cross-thread state (accessed from any thread)
41+ private struct State {
42+ var childPID : Int32 ?
43+ var parentSocket : Int32 ?
44+ }
45+ private let state : Mutex < State > = Mutex ( State ( childPID: nil , parentSocket: nil ) )
46+
47+ // Event-loop confined state (only accessed on channel.eventLoop)
48+ private final class ELState : @unchecked Sendable {
49+ var channel : Channel ?
50+ var eventIDCounter : UInt32 = 0
51+ var pendingEvents : [ UInt32 : CheckedContinuation < Void , Error > ] = [ : ]
52+ }
53+ private let elState = ELState ( )
54+
4555 init ( containerID: String , containerPID: Int32 , eventLoop: EventLoop ) {
4656 self . containerID = containerID
4757 self . containerPID = containerPID
4858 self . eventLoop = eventLoop
4959 }
5060
5161 func start( ) throws {
52- guard childPID == nil else {
62+ guard state . withLock ( { $0 . childPID } ) == nil else {
5363 throw ContainerizationError ( . invalidState, message: " FilesystemEventWorker already started " )
5464 }
5565
@@ -94,25 +104,27 @@ final class FilesystemEventWorker: @unchecked Sendable {
94104 let pid = command. pid
95105 close ( childSocket)
96106 close ( errorWriteFD) // Close write end in parent
97- self . childPID = pid
98- self . parentSocket = parentSocket
107+ state. withLock {
108+ $0. childPID = pid
109+ $0. parentSocket = parentSocket
110+ }
99111
100112 var handshake : UInt8 = 0
101113 let readResult = read ( parentSocket, & handshake, 1 )
102114
103115 if readResult != 1 {
104116 close ( parentSocket)
105- self . parentSocket = nil
117+ state . withLock { $0 . parentSocket = nil }
106118 close ( errorReadFD)
107119 var status : Int32 = 0
108120 waitpid ( pid, & status, 0 )
109- self . childPID = nil
121+ state . withLock { $0 . childPID = nil }
110122 throw ContainerizationError ( . internalError, message: " Child process failed to start " )
111123 }
112124
113125 if handshake == Self . handshakeFailure {
114126 close ( parentSocket)
115- self . parentSocket = nil
127+ state . withLock { $0 . parentSocket = nil }
116128
117129 // Read error message from child
118130 var errorBuffer = [ UInt8] ( repeating: 0 , count: 1024 )
@@ -121,7 +133,7 @@ final class FilesystemEventWorker: @unchecked Sendable {
121133
122134 var status : Int32 = 0
123135 waitpid ( pid, & status, 0 )
124- self . childPID = nil
136+ state . withLock { $0 . childPID = nil }
125137
126138 let errorMsg =
127139 bytesRead > 0
@@ -132,11 +144,11 @@ final class FilesystemEventWorker: @unchecked Sendable {
132144
133145 if handshake != Self . handshakeReady {
134146 close ( parentSocket)
135- self . parentSocket = nil
147+ state . withLock { $0 . parentSocket = nil }
136148 close ( errorReadFD)
137149 var status : Int32 = 0
138150 waitpid ( pid, & status, 0 )
139- self . childPID = nil
151+ state . withLock { $0 . childPID = nil }
140152 throw ContainerizationError ( . internalError, message: " Child process sent unexpected handshake: \( handshake) " )
141153 }
142154
@@ -149,68 +161,73 @@ final class FilesystemEventWorker: @unchecked Sendable {
149161 let handler = ResponseHandler ( worker: self )
150162 return channel. pipeline. addHandler ( handler)
151163 }
152- self . channel = try bootstrap. takingOwnershipOfDescriptor ( inputOutput: parentSocket) . wait ( )
164+ self . elState . channel = try bootstrap. takingOwnershipOfDescriptor ( inputOutput: parentSocket) . wait ( )
153165 } catch {
154166 close ( parentSocket)
155- self . parentSocket = nil
167+ state . withLock { $0 . parentSocket = nil }
156168 var status : Int32 = 0
157169 waitpid ( pid, & status, 0 )
158- self . childPID = nil
170+ state . withLock { $0 . childPID = nil }
159171 throw ContainerizationError ( . internalError, message: " Failed to setup NIO channel: \( error) " )
160172 }
161173 }
162174
163175 func enqueueEvent( path: String , eventType: Com_Apple_Containerization_Sandbox_V3_FileSystemEventType ) async throws {
164- guard let socket = parentSocket, !shouldStop. load ( ordering: . relaxed) else {
165- throw ContainerizationError ( . invalidState, message: " FilesystemEventWorker not running " )
176+ let socket = try state. withLock { state throws -> Int32 in
177+ guard let socket = state. parentSocket, !shouldStop. load ( ordering: . relaxed) else {
178+ throw ContainerizationError ( . invalidState, message: " FilesystemEventWorker not running " )
179+ }
180+ return socket
166181 }
167182
168- let eventID = eventIDCounter
169- eventIDCounter += 1
170-
183+ // Use continuation to bridge between async and event loop
171184 try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Void , Error > ) in
172- pendingEvents. withLock { events in
173- events [ eventID] = continuation
174- }
175-
176- do {
177- try sendEventToChild ( socket: socket, eventID: eventID, path: path, eventType: eventType)
178- } catch {
179- _ = pendingEvents. withLock { events in
180- events. removeValue ( forKey: eventID)
185+ // Hop to event loop to access event ID counter and store continuation
186+ eventLoop. execute { [ elState] in
187+ let eventID = elState. eventIDCounter
188+ elState. eventIDCounter += 1
189+ elState. pendingEvents [ eventID] = continuation
190+
191+ do {
192+ try self . sendEventToChild ( socket: socket, eventID: eventID, path: path, eventType: eventType)
193+ } catch {
194+ // Remove continuation and resume with error on send failure
195+ _ = elState. pendingEvents. removeValue ( forKey: eventID)
196+ continuation. resume ( throwing: error)
181197 }
182- continuation. resume ( throwing: error)
183198 }
184199 }
185200 }
186201
187202 func stop( ) {
188203 shouldStop. store ( true , ordering: . relaxed)
189204
190- if let channel = self . channel {
191- try ? channel. close ( ) . wait ( )
192- self . channel = nil
205+ // Close channel and clean up pending events on event loop
206+ eventLoop. execute { [ elState] in
207+ elState. channel? . close ( promise: nil )
208+ elState. channel = nil
209+
210+ for (_, continuation) in elState. pendingEvents {
211+ continuation. resume ( throwing: ContainerizationError ( . cancelled, message: " FilesystemEventWorker stopped " ) )
212+ }
213+ elState. pendingEvents. removeAll ( )
193214 }
194215
195- self . parentSocket = nil
216+ // Kill child process
217+ state. withLock { state in
218+ state. parentSocket = nil
196219
197- if let pid = childPID {
198- #if canImport(Musl)
199- Musl . kill ( pid, SIGTERM)
200- #elseif canImport(Glibc)
201- Glibc . kill ( pid, SIGTERM)
202- #endif
220+ if let pid = state . childPID {
221+ #if canImport(Musl)
222+ Musl . kill ( pid, SIGTERM)
223+ #elseif canImport(Glibc)
224+ Glibc . kill ( pid, SIGTERM)
225+ #endif
203226
204- var status : Int32 = 0
205- waitpid ( pid, & status, 0 )
206- childPID = nil
207- }
208-
209- pendingEvents. withLock { events in
210- for (_, continuation) in events {
211- continuation. resume ( throwing: ContainerizationError ( . cancelled, message: " FilesystemEventWorker stopped " ) )
227+ var status : Int32 = 0
228+ waitpid ( pid, & status, 0 )
229+ state. childPID = nil
212230 }
213- events. removeAll ( )
214231 }
215232 }
216233
@@ -254,25 +271,23 @@ final class FilesystemEventWorker: @unchecked Sendable {
254271 break
255272 }
256273
257- worker. pendingEvents. withLock { events in
258- if let continuation = events. removeValue ( forKey: eventID) {
259- if success == 1 {
260- continuation. resume ( )
261- } else {
262- continuation. resume ( throwing: ContainerizationError ( . internalError, message: " Child process failed to process filesystem event " ) )
263- }
274+ // ResponseHandler runs on event loop, so can access elState.pendingEvents directly
275+ if let continuation = worker. elState. pendingEvents. removeValue ( forKey: eventID) {
276+ if success == 1 {
277+ continuation. resume ( )
278+ } else {
279+ continuation. resume ( throwing: ContainerizationError ( . internalError, message: " Child process failed to process filesystem event " ) )
264280 }
265281 }
266282 }
267283 }
268284
269285 func errorCaught( context: ChannelHandlerContext , error: Error ) {
270- worker. pendingEvents. withLock { events in
271- for (_, continuation) in events {
272- continuation. resume ( throwing: error)
273- }
274- events. removeAll ( )
286+ // ResponseHandler runs on event loop, so can access elState.pendingEvents directly
287+ for (_, continuation) in worker. elState. pendingEvents {
288+ continuation. resume ( throwing: error)
275289 }
290+ worker. elState. pendingEvents. removeAll ( )
276291 }
277292 }
278293}
0 commit comments