Skip to content

Commit 43d170a

Browse files
committed
fix: make CallbackManager isolated to MainActor
1 parent 49509de commit 43d170a

File tree

4 files changed

+34
-71
lines changed

4 files changed

+34
-71
lines changed

Sources/Helpers/EventEmitter.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import Foundation
1313
/// When this token gets deallocated it cancels the observation it was associated with. Store this token in another object to keep the observation alive.
1414
public final class ObservationToken: @unchecked Sendable, Hashable {
1515
private let _isCancelled = LockIsolated(false)
16-
package var onCancel: @Sendable () -> Void
16+
package var onCancel: () -> Void
1717

1818
public var isCancelled: Bool {
1919
_isCancelled.withValue { $0 }
2020
}
2121

22-
package init(onCancel: @escaping @Sendable () -> Void = {}) {
22+
package init(onCancel: @escaping () -> Void = {}) {
2323
self.onCancel = onCancel
2424
}
2525

Sources/Realtime/CallbackManager.swift

Lines changed: 31 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,105 +1,75 @@
11
import ConcurrencyExtras
22
import Foundation
33

4-
final class CallbackManager: Sendable {
5-
struct MutableState {
6-
var id = 0
7-
var serverChanges: [PostgresJoinConfig] = []
8-
var callbacks: [RealtimeCallback] = []
9-
}
10-
11-
private let mutableState = LockIsolated(MutableState())
12-
13-
var serverChanges: [PostgresJoinConfig] {
14-
mutableState.serverChanges
15-
}
16-
17-
var callbacks: [RealtimeCallback] {
18-
mutableState.callbacks
19-
}
20-
21-
deinit {
22-
reset()
23-
}
4+
@MainActor
5+
final class CallbackManager {
6+
var id = 0
7+
var serverChanges: [PostgresJoinConfig] = []
8+
var callbacks: [RealtimeCallback] = []
249

2510
@discardableResult
2611
func addBroadcastCallback(
2712
event: String,
2813
callback: @escaping @Sendable (JSONObject) -> Void
2914
) -> Int {
30-
mutableState.withValue {
31-
$0.id += 1
32-
$0.callbacks.append(
33-
.broadcast(
34-
BroadcastCallback(
35-
id: $0.id,
36-
event: event,
37-
callback: callback
38-
)
15+
self.id += 1
16+
self.callbacks.append(
17+
.broadcast(
18+
BroadcastCallback(
19+
id: self.id,
20+
event: event,
21+
callback: callback
3922
)
4023
)
41-
return $0.id
42-
}
24+
)
25+
return self.id
4326
}
4427

4528
@discardableResult
4629
func addPostgresCallback(
4730
filter: PostgresJoinConfig,
4831
callback: @escaping @Sendable (AnyAction) -> Void
4932
) -> Int {
50-
mutableState.withValue {
51-
$0.id += 1
52-
$0.callbacks.append(
33+
self.id += 1
34+
self.callbacks.append(
5335
.postgres(
5436
PostgresCallback(
55-
id: $0.id,
37+
id: self.id,
5638
filter: filter,
5739
callback: callback
5840
)
5941
)
6042
)
61-
return $0.id
62-
}
43+
return self.id
6344
}
6445

6546
@discardableResult
6647
func addPresenceCallback(callback: @escaping @Sendable (any PresenceAction) -> Void) -> Int {
67-
mutableState.withValue {
68-
$0.id += 1
69-
$0.callbacks.append(.presence(PresenceCallback(id: $0.id, callback: callback)))
70-
return $0.id
71-
}
48+
self.id += 1
49+
self.callbacks.append(.presence(PresenceCallback(id: self.id, callback: callback)))
50+
return self.id
7251
}
7352

7453
@discardableResult
7554
func addSystemCallback(callback: @escaping @Sendable (RealtimeMessageV2) -> Void) -> Int {
76-
mutableState.withValue {
77-
$0.id += 1
78-
$0.callbacks.append(.system(SystemCallback(id: $0.id, callback: callback)))
79-
return $0.id
80-
}
55+
self.id += 1
56+
self.callbacks.append(.system(SystemCallback(id: self.id, callback: callback)))
57+
return self.id
8158
}
8259

8360
func setServerChanges(changes: [PostgresJoinConfig]) {
84-
mutableState.withValue {
85-
$0.serverChanges = changes
86-
}
61+
self.serverChanges = changes
8762
}
8863

8964
func removeCallback(id: Int) {
90-
mutableState.withValue {
91-
$0.callbacks.removeAll { $0.id == id }
92-
}
65+
self.callbacks.removeAll { $0.id == id }
9366
}
9467

9568
func triggerPostgresChanges(ids: [Int], data: AnyAction) {
96-
// Read mutableState at start to acquire lock once.
97-
let mutableState = mutableState.value
98-
99-
let filters = mutableState.serverChanges.filter {
69+
let filters = serverChanges.filter {
10070
ids.contains($0.id)
10171
}
102-
let postgresCallbacks = mutableState.callbacks.compactMap {
72+
let postgresCallbacks = callbacks.compactMap {
10373
if case let .postgres(callback) = $0 {
10474
return callback
10575
}
@@ -118,7 +88,7 @@ final class CallbackManager: Sendable {
11888
}
11989

12090
func triggerBroadcast(event: String, json: JSONObject) {
121-
let broadcastCallbacks = mutableState.callbacks.compactMap {
91+
let broadcastCallbacks = callbacks.compactMap {
12292
if case let .broadcast(callback) = $0 {
12393
return callback
12494
}
@@ -133,7 +103,7 @@ final class CallbackManager: Sendable {
133103
leaves: [String: PresenceV2],
134104
rawMessage: RealtimeMessageV2
135105
) {
136-
let presenceCallbacks = mutableState.callbacks.compactMap {
106+
let presenceCallbacks = callbacks.compactMap {
137107
if case let .presence(callback) = $0 {
138108
return callback
139109
}
@@ -151,7 +121,7 @@ final class CallbackManager: Sendable {
151121
}
152122

153123
func triggerSystem(message: RealtimeMessageV2) {
154-
let systemCallbacks = mutableState.callbacks.compactMap {
124+
let systemCallbacks = callbacks.compactMap {
155125
if case .system(let callback) = $0 {
156126
return callback
157127
}
@@ -162,10 +132,6 @@ final class CallbackManager: Sendable {
162132
systemCallback.callback(message)
163133
}
164134
}
165-
166-
func reset() {
167-
mutableState.setValue(MutableState())
168-
}
169135
}
170136

171137
struct PostgresCallback {

Sources/Realtime/RealtimeChannelV2.swift

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,6 @@ public final class RealtimeChannelV2: Sendable, RealtimeChannelProtocol {
8888
self.socket = socket
8989
}
9090

91-
deinit {
92-
callbackManager.reset()
93-
}
94-
9591
/// Subscribes to the channel.
9692
public func subscribeWithError() async throws {
9793
logger?.debug(

Tests/RealtimeTests/CallbackManagerTests.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import XCTest
1111

1212
@testable import Realtime
1313

14+
@MainActor
1415
final class CallbackManagerTests: XCTestCase {
1516
func testIntegration() {
1617
let callbackManager = CallbackManager()

0 commit comments

Comments
 (0)