Skip to content

Commit ca5111d

Browse files
mfateevvitarb
andauthored
Workflow thread deadlock detector (#243)
* Added thread deadlock detector * Fixed stack trace * Added deadlock timeout paratemer * Wired deadlock detection timeout * renamed constant to DEFAULT_DEADLOCK_DETECTION_TIMEOUT Co-authored-by: Vitaly <[email protected]>
1 parent ab8fd40 commit ca5111d

File tree

12 files changed

+213
-70
lines changed

12 files changed

+213
-70
lines changed

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
*/
3333
interface DeterministicRunner {
3434

35+
long DEFAULT_DEADLOCK_DETECTION_TIMEOUT = 1000;
36+
3537
static DeterministicRunner newRunner(Runnable root) {
3638
return new DeterministicRunnerImpl(root);
3739
}
@@ -74,8 +76,9 @@ static DeterministicRunner newRunner(
7476
* completed or blocked.
7577
*
7678
* @throws Throwable if one of the threads didn't handle an exception.
79+
* @param deadlockDetectionTimeout the maximum time a thread can run without calling yield.
7780
*/
78-
void runUntilAllBlocked();
81+
void runUntilAllBlocked(long deadlockDetectionTimeout);
7982

8083
/** IsDone returns true when all of threads are completed */
8184
boolean isDone();

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,7 @@ SyncWorkflowContext getWorkflowContext() {
219219
}
220220

221221
@Override
222-
public void runUntilAllBlocked() {
222+
public void runUntilAllBlocked(long deadlockDetectionTimeout) {
223223
if (rootWorkflowThread == null) {
224224
// TODO: workflow instance specific thread name
225225
rootWorkflowThread =
@@ -276,7 +276,7 @@ public void runUntilAllBlocked() {
276276
Iterator<WorkflowThread> ci = threads.iterator();
277277
while (ci.hasNext()) {
278278
WorkflowThread c = ci.next();
279-
progress = c.runUntilBlocked() || progress;
279+
progress = c.runUntilBlocked(deadlockDetectionTimeout) || progress;
280280
if (exitRequested) {
281281
close();
282282
break outerLoop;

temporal-sdk/src/main/java/io/temporal/internal/sync/DeterministicRunnerWrapper.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
4545
result.completeExceptionally(throwable);
4646
}
4747
});
48-
runner.runUntilAllBlocked();
48+
// Used to execute activities under TestActivityEnvironment
49+
// So it is expected that a workflow thread is blocked for a long time.
50+
runner.runUntilAllBlocked(Long.MAX_VALUE);
4951
try {
5052
return result.get();
5153
} catch (ExecutionException e) {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (C) 2020 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
9+
* use this file except in compliance with the License. A copy of the License is
10+
* located at
11+
*
12+
* http://aws.amazon.com/apache2.0
13+
*
14+
* or in the "license" file accompanying this file. This file is distributed on
15+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
16+
* express or implied. See the License for the specific language governing
17+
* permissions and limitations under the License.
18+
*/
19+
20+
package io.temporal.internal.sync;
21+
22+
class PotentialDeadlockException extends RuntimeException {
23+
24+
PotentialDeadlockException(StackTraceElement[] stackTrace) {
25+
super(
26+
"Potential deadlock detected: workflow thread blocked for over a second", null, true, true);
27+
setStackTrace(stackTrace);
28+
}
29+
}

temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package io.temporal.internal.sync;
2121

22+
import static io.temporal.internal.sync.DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT;
23+
2224
import io.temporal.api.common.v1.Payloads;
2325
import io.temporal.api.common.v1.WorkflowType;
2426
import io.temporal.api.enums.v1.EventType;
@@ -134,7 +136,7 @@ public boolean eventLoop() {
134136
if (runner == null) {
135137
return false;
136138
}
137-
runner.runUntilAllBlocked();
139+
runner.runUntilAllBlocked(DEFAULT_DEADLOCK_DETECTION_TIMEOUT);
138140
return runner.isDone() || workflowProc.isDone(); // Do not wait for all other threads.
139141
}
140142

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ static WorkflowThread newThread(Runnable runnable, boolean detached, String name
8282

8383
SyncWorkflowContext getWorkflowContext();
8484

85-
boolean runUntilBlocked();
85+
boolean runUntilBlocked(long deadlockDetectionTimeout);
8686

8787
Throwable getUnhandledException();
8888

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadContext.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919

2020
package io.temporal.internal.sync;
2121

22+
import static io.temporal.internal.sync.DeterministicRunner.DEFAULT_DEADLOCK_DETECTION_TIMEOUT;
23+
2224
import com.google.common.base.Throwables;
2325
import io.temporal.workflow.Functions;
26+
import java.util.concurrent.TimeUnit;
2427
import java.util.concurrent.locks.Condition;
2528
import java.util.concurrent.locks.Lock;
2629
import java.util.function.Supplier;
@@ -43,6 +46,7 @@ class WorkflowThreadContext {
4346
private boolean remainedBlocked;
4447
private String yieldReason;
4548
private boolean destroyRequested;
49+
private Thread currentThread;
4650

4751
WorkflowThreadContext(Lock lock) {
4852
this.lock = lock;
@@ -194,15 +198,20 @@ public void setUnhandledException(Throwable unhandledException) {
194198
}
195199
}
196200

201+
public void setCurrentThread(Thread currentThread) {
202+
this.currentThread = currentThread;
203+
}
204+
197205
public String getYieldReason() {
198206
return yieldReason;
199207
}
200208

201209
/**
202210
* @return true if thread made some progress. Which is await was unblocked and some code after it
203211
* was executed.
212+
* @param deadlockDetectionTimeout
204213
*/
205-
public boolean runUntilBlocked() {
214+
public boolean runUntilBlocked(long deadlockDetectionTimeout) {
206215
lock.lock();
207216
try {
208217
if (status == Status.DONE) {
@@ -218,7 +227,10 @@ public boolean runUntilBlocked() {
218227
remainedBlocked = true;
219228
yieldCondition.signal();
220229
while (status == Status.RUNNING || status == Status.CREATED) {
221-
runCondition.await();
230+
if (!runCondition.await(deadlockDetectionTimeout, TimeUnit.MILLISECONDS)) {
231+
throw new PotentialDeadlockException(currentThread.getStackTrace());
232+
}
233+
;
222234
if (evaluationFunction != null) {
223235
throw new IllegalStateException("Cannot runUntilBlocked while evaluating");
224236
}
@@ -259,7 +271,7 @@ public void destroy() {
259271
(r) -> {
260272
throw new DestroyWorkflowThreadError();
261273
});
262-
runUntilBlocked();
274+
runUntilBlocked(DEFAULT_DEADLOCK_DETECTION_TIMEOUT);
263275
}
264276

265277
/** To be called only from a workflow thread. */

temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowThreadImpl.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ class RunnableWrapper implements Runnable {
8787
@Override
8888
public void run() {
8989
thread = Thread.currentThread();
90+
threadContext.setCurrentThread(thread);
9091
originalName = thread.getName();
9192
thread.setName(name);
9293
DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
@@ -99,7 +100,6 @@ public void run() {
99100
// Repopulate the context(s)
100101
ContextThreadLocal.setContextPropagators(this.contextPropagators);
101102
ContextThreadLocal.propagateContextToCurrentThread(this.propagatedContexts);
102-
103103
try {
104104
// initialYield blocks thread until the first runUntilBlocked is called.
105105
// Otherwise r starts executing without control of the sync.
@@ -125,6 +125,7 @@ public void run() {
125125
threadContext.setStatus(Status.DONE);
126126
thread.setName(originalName);
127127
thread = null;
128+
threadContext.setCurrentThread(null);
128129
MDC.clear();
129130
}
130131
}
@@ -308,13 +309,16 @@ public int getPriority() {
308309
return priority;
309310
}
310311

311-
/** @return true if coroutine made some progress. */
312+
/**
313+
* @return true if coroutine made some progress.
314+
* @param deadlockDetectionTimeout maximum time the thread can run before calling yield.
315+
*/
312316
@Override
313-
public boolean runUntilBlocked() {
317+
public boolean runUntilBlocked(long deadlockDetectionTimeout) {
314318
if (taskFuture == null) {
315319
start();
316320
}
317-
return context.runUntilBlocked();
321+
return context.runUntilBlocked(deadlockDetectionTimeout);
318322
}
319323

320324
@Override

0 commit comments

Comments
 (0)