11package com.enginebai.base.extensions
22
3- import kotlinx.coroutines.delay
3+ import kotlinx.coroutines.*
4+ import kotlinx.coroutines.sync.Mutex
5+ import kotlinx.coroutines.sync.withLock
46import timber.log.Timber
7+ import java.util.concurrent.atomic.AtomicReference
58
69/* *
710 * Retry running block with exponential backoff mechanism.
@@ -26,4 +29,100 @@ suspend fun <T> retry(
2629 currentDelay = (currentDelay * delayFactor).toLong()
2730 }
2831 return block() // last attempt
32+ }
33+
34+ /* *
35+ * A helper class that controls the task execution as new task requests to run:
36+ * 1. Cancel previous task and run the new task.
37+ * 2. Execute all task sequentially.
38+ * 3. Continue previous task and don't run the new task.
39+ */
40+ // Source: https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7
41+ class CoroutineRunningController <T > {
42+
43+ // A lock that may only be taken by one coroutine at a time.
44+ private val mutex by lazy { Mutex () }
45+
46+ // The currently running task, this uses atomic reference for thread safety.
47+ private val activeTask by lazy { AtomicReference <Deferred <T >? > (null ) }
48+
49+ /* *
50+ * Cancel all previous tasks before calling block, then run block.
51+ */
52+ suspend fun cancelPreviousThenRun (block : suspend () -> T ): T {
53+ // Cancel previous task if there is.
54+ activeTask.get()?.cancelAndJoin()
55+
56+ return coroutineScope {
57+ // Create a new coroutine for new task and don't start it until it's decided
58+ // that the new task should execute.
59+ val newTask = async(start = CoroutineStart .LAZY ) { block() }
60+
61+ // Reset the currently running task to null as new task completes.
62+ newTask.invokeOnCompletion {
63+ activeTask.compareAndSet(newTask, null )
64+ }
65+
66+ val result: T
67+ // Loop until all previous tasks are canceled and we can run new task.
68+ while (true ) {
69+ // Some other tasks started before the new task got set to running.
70+ // If there is still other tasks running, just cancel.
71+ if (! activeTask.compareAndSet(null , newTask)) {
72+ activeTask.get()?.cancelAndJoin()
73+ yield ()
74+ } else {
75+ result = newTask.await()
76+ break
77+ }
78+ }
79+ result
80+ }
81+ }
82+
83+ /* *
84+ * Ensure to execute the tasks one by one, it will always ensure that all previously tasks
85+ * completes prior to start the current block task. Any future calls to this method while the
86+ * current block task is running will not execute until the current block task completes.
87+ */
88+ suspend fun queueTask (block : suspend () -> T ): T {
89+ mutex.withLock {
90+ return block()
91+ }
92+ }
93+
94+ /* *
95+ * Don't run the new task while a previous task is running, instead wait for the previous task
96+ * and return its result.
97+ */
98+ suspend fun joinPreviousOrRun (block : suspend () -> T ): T {
99+ // If a previous task is running, then wait and return its result.
100+ activeTask.get()?.let { return it.await() }
101+
102+ return coroutineScope {
103+ val newTask = async(start = CoroutineStart .LAZY ) { block() }
104+ newTask.invokeOnCompletion {
105+ activeTask.compareAndSet(newTask, null )
106+ }
107+
108+ val result: T
109+ while (true ) {
110+ // Loop to check if there is running tasks, then join.
111+ if (! activeTask.compareAndSet(null , newTask)) {
112+ val currentTask = activeTask.get()
113+ if (currentTask != null ) {
114+ newTask.cancel()
115+ result = currentTask.await()
116+ break
117+ } else {
118+ yield ()
119+ }
120+ } else {
121+ result = newTask.await()
122+ break
123+ }
124+ }
125+ result
126+ }
127+ }
29128}
0 commit comments