Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.onesignal.core.internal.operations.impl

import com.onesignal.common.threading.WaiterWithValue
import com.onesignal.common.threading.suspendifyOnIO
import com.onesignal.core.internal.config.ConfigModelStore
import com.onesignal.core.internal.operations.ExecutionResult
import com.onesignal.core.internal.operations.GroupComparisonType
Expand All @@ -14,7 +13,10 @@ import com.onesignal.debug.LogLevel
import com.onesignal.debug.internal.logging.Logging
import com.onesignal.user.internal.operations.impl.states.NewRecordsState
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withTimeoutOrNull
import java.util.UUID
import kotlin.math.max
Expand Down Expand Up @@ -43,6 +45,14 @@ internal class OperationRepo(
val previousWaitedTime: Long = 0,
)

// The order of operation execution is critical to this OperationRepo
// logic, all processing must be done on same thread to ensure this.
// - This result of not following this is flaky tests, which inturn could
// result in bugs in production.
private val scope by lazy {
CoroutineScope(newSingleThreadContext(name = "OSOperationRepoScope"))
}

private val executorsMap: Map<String, IOperationExecutor>
internal val queue = mutableListOf<OperationQueueItem>()
private val waiter = WaiterWithValue<LoopWaiterMessage>()
Expand Down Expand Up @@ -92,7 +102,7 @@ internal class OperationRepo(

override fun start() {
paused = false
suspendifyOnIO {
scope.launch {
// load saved operations first then start processing the queue to ensure correct operation order
loadSavedOperations()
processQueueForever()
Expand All @@ -113,8 +123,7 @@ internal class OperationRepo(
Logging.log(LogLevel.DEBUG, "OperationRepo.enqueue(operation: $operation, flush: $flush)")

operation.id = UUID.randomUUID().toString()
// Use suspendifyOnIO to ensure non-blocking behavior for main thread
suspendifyOnIO {
scope.launch {
internalEnqueue(OperationQueueItem(operation, bucket = enqueueIntoBucket), flush, true)
}
}
Expand All @@ -127,7 +136,9 @@ internal class OperationRepo(

operation.id = UUID.randomUUID().toString()
val waiter = WaiterWithValue<Boolean>()
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
scope.launch {
internalEnqueue(OperationQueueItem(operation, waiter, bucket = enqueueIntoBucket), flush, true)
}
return waiter.waitForWake()
}

Expand Down