Skip to content

Commit a1de809

Browse files
authored
Remove reset sticky call from cache invalidation (#236)
* Remove reset sticky call from cache invalidation * extract method * Avoid doing reset sticky when history if full
1 parent 5993fc1 commit a1de809

File tree

7 files changed

+38
-41
lines changed

7 files changed

+38
-41
lines changed

temporal-sdk/src/main/java/io/temporal/internal/common/WorkflowExecutionUtils.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import io.temporal.api.workflowservice.v1.DescribeWorkflowExecutionResponse;
5757
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryRequest;
5858
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
59+
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
5960
import io.temporal.client.WorkflowFailedException;
6061
import io.temporal.common.converter.DataConverter;
6162
import io.temporal.common.converter.EncodedValues;
@@ -865,4 +866,10 @@ public static WorkflowExecutionHistory readHistory(File historyFile) throws IOEx
865866
return WorkflowExecutionHistory.fromJson(jsonHistory);
866867
}
867868
}
869+
870+
public static boolean isFullHistory(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
871+
return workflowTask.getHistory() != null
872+
&& workflowTask.getHistory().getEventsCount() > 0
873+
&& workflowTask.getHistory().getEvents(0).getEventId() == 1;
874+
}
868875
}

temporal-sdk/src/main/java/io/temporal/internal/replay/ReplayWorkflowTaskHandler.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package io.temporal.internal.replay;
2121

2222
import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue;
23+
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
2324
import static io.temporal.serviceclient.MetricsTag.METRICS_TAGS_CALL_OPTIONS_KEY;
2425

2526
import com.uber.m3.tally.Scope;
@@ -38,6 +39,7 @@
3839
import io.temporal.api.workflowservice.v1.GetWorkflowExecutionHistoryResponse;
3940
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse;
4041
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
42+
import io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest;
4143
import io.temporal.api.workflowservice.v1.RespondQueryTaskCompletedRequest;
4244
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskCompletedRequest;
4345
import io.temporal.api.workflowservice.v1.RespondWorkflowTaskFailedRequest;
@@ -216,6 +218,11 @@ private Result handleWorkflowTaskWithEmbeddedQuery(
216218

217219
if (stickyTaskQueueName != null) {
218220
cache.invalidate(execution, metricsScope);
221+
// If history if full and exception occurred then sticky session hasn't been established
222+
// yet and we can avoid doing a reset.
223+
if (!isFullHistory(workflowTask)) {
224+
resetStickyTaskList(execution);
225+
}
219226
}
220227
throw e;
221228
} finally {
@@ -227,6 +234,16 @@ private Result handleWorkflowTaskWithEmbeddedQuery(
227234
}
228235
}
229236

237+
private void resetStickyTaskList(WorkflowExecution execution) {
238+
service
239+
.futureStub()
240+
.resetStickyTaskQueue(
241+
ResetStickyTaskQueueRequest.newBuilder()
242+
.setNamespace(namespace)
243+
.setExecution(execution)
244+
.build());
245+
}
246+
230247
private Result handleQueryOnlyWorkflowTask(
231248
PollWorkflowTaskQueueResponse.Builder workflowTask, Scope metricsScope) {
232249
RespondQueryTaskCompletedRequest.Builder queryCompletedRequest =

temporal-sdk/src/main/java/io/temporal/internal/replay/WorkflowExecutorCache.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,16 @@
1919

2020
package io.temporal.internal.replay;
2121

22+
import static io.temporal.internal.common.WorkflowExecutionUtils.isFullHistory;
23+
2224
import com.google.common.base.Preconditions;
2325
import com.google.common.cache.CacheBuilder;
2426
import com.google.common.cache.CacheLoader;
2527
import com.google.common.cache.LoadingCache;
2628
import com.uber.m3.tally.Scope;
2729
import io.temporal.api.common.v1.WorkflowExecution;
2830
import io.temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponseOrBuilder;
29-
import io.temporal.api.workflowservice.v1.ResetStickyTaskQueueRequest;
3031
import io.temporal.internal.metrics.MetricsType;
31-
import io.temporal.serviceclient.WorkflowServiceStubs;
3232
import java.util.HashSet;
3333
import java.util.Objects;
3434
import java.util.Set;
@@ -38,17 +38,12 @@
3838
import java.util.concurrent.locks.ReentrantLock;
3939

4040
public final class WorkflowExecutorCache {
41-
private final WorkflowServiceStubs service;
42-
private final String namespace;
4341
private final Scope metricsScope;
4442
private final LoadingCache<String, WorkflowRunTaskHandler> cache;
4543
private final Lock cacheLock = new ReentrantLock();
4644
private final Set<String> inProcessing = new HashSet<>();
4745

48-
public WorkflowExecutorCache(
49-
WorkflowServiceStubs service, String namespace, int workflowCacheSize, Scope scope) {
50-
this.service = service;
51-
this.namespace = namespace;
46+
public WorkflowExecutorCache(int workflowCacheSize, Scope scope) {
5247
Preconditions.checkArgument(workflowCacheSize > 0, "Max cache size must be greater than 0");
5348
this.metricsScope = Objects.requireNonNull(scope);
5449
this.cache =
@@ -145,16 +140,6 @@ void invalidate(WorkflowExecution execution, Scope metricsScope) {
145140
cache.invalidate(runId);
146141
inProcessing.remove(runId);
147142
metricsScope.counter(MetricsType.STICKY_CACHE_TOTAL_FORCED_EVICTION).inc(1);
148-
if (service != null) {
149-
// Execute asynchronously
150-
service
151-
.futureStub()
152-
.resetStickyTaskQueue(
153-
ResetStickyTaskQueueRequest.newBuilder()
154-
.setNamespace(namespace)
155-
.setExecution(execution)
156-
.build());
157-
}
158143
} finally {
159144
cacheLock.unlock();
160145
}
@@ -164,12 +149,6 @@ public long size() {
164149
return cache.size();
165150
}
166151

167-
private boolean isFullHistory(PollWorkflowTaskQueueResponseOrBuilder workflowTask) {
168-
return workflowTask.getHistory() != null
169-
&& workflowTask.getHistory().getEventsCount() > 0
170-
&& workflowTask.getHistory().getEvents(0).getEventId() == 1;
171-
}
172-
173152
public void invalidateAll() {
174153
cache.invalidateAll();
175154
}

temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,7 @@ private WorkerFactory(WorkflowClient workflowClient, WorkerFactoryOptions factor
109109
.tagged(MetricsTag.defaultTags(workflowClient.getOptions().getNamespace()));
110110

111111
this.cache =
112-
new WorkflowExecutorCache(
113-
this.workflowClient.getWorkflowServiceStubs(),
114-
workflowClient.getOptions().getNamespace(),
115-
this.factoryOptions.getWorkflowCacheSize(),
116-
metricsScope);
112+
new WorkflowExecutorCache(this.factoryOptions.getWorkflowCacheSize(), metricsScope);
117113
Scope stickyScope =
118114
metricsScope.tagged(
119115
new ImmutableMap.Builder<String, String>(1)

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerCacheTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void setUp() {
6868
public void whenHistoryIsFullNewWorkflowExecutorIsReturnedAndCached_InitiallyEmpty()
6969
throws Exception {
7070
// Arrange
71-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, new NoopScope());
71+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
7272
PollWorkflowTaskQueueResponse workflowTask =
7373
HistoryUtils.generateWorkflowTaskWithInitialHistory();
7474

@@ -92,7 +92,7 @@ public void whenHistoryIsFullNewWorkflowExecutorIsReturned_InitiallyCached() thr
9292
WorkflowServiceStubs service = testService.newClientStub();
9393

9494
// Arrange
95-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, new NoopScope());
95+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
9696
PollWorkflowTaskQueueResponse workflowTask1 =
9797
HistoryUtils.generateWorkflowTaskWithInitialHistory(
9898
"namespace", "taskQueue", "workflowType", service);
@@ -137,7 +137,7 @@ public void whenHistoryIsPartialCachedEntryIsReturned() throws Exception {
137137
.build();
138138
Scope scope = metricsScope.tagged(tags);
139139

140-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, scope);
140+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, scope);
141141
TestWorkflowService testService = new TestWorkflowService(true);
142142
WorkflowServiceStubs service = testService.newClientStub();
143143
try {
@@ -178,7 +178,7 @@ public void whenHistoryIsPartialAndCacheIsEmptyThenExceptionIsThrown() throws Ex
178178
.put(MetricsTag.TASK_QUEUE, "stickyTaskQueue")
179179
.build();
180180
Scope scope = metricsScope.tagged(tags);
181-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 10, scope);
181+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, scope);
182182

183183
// Act
184184
PollWorkflowTaskQueueResponse workflowTask =
@@ -207,7 +207,7 @@ public void evictAnyWillInvalidateAnEntryRandomlyFromTheCache() throws Exception
207207
Scope scope = metricsScope.tagged(tags);
208208

209209
// Arrange
210-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 50, scope);
210+
WorkflowExecutorCache cache = new WorkflowExecutorCache(50, scope);
211211
PollWorkflowTaskQueueResponse workflowTask1 =
212212
HistoryUtils.generateWorkflowTaskWithInitialHistory();
213213
PollWorkflowTaskQueueResponse workflowTask2 =
@@ -242,7 +242,7 @@ public void evictAnyWillInvalidateAnEntryRandomlyFromTheCache() throws Exception
242242
@Test
243243
public void evictAnyWillNotInvalidateItself() throws Exception {
244244
// Arrange
245-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 50, new NoopScope());
245+
WorkflowExecutorCache cache = new WorkflowExecutorCache(50, new NoopScope());
246246
PollWorkflowTaskQueueResponse workflowTask1 =
247247
HistoryUtils.generateWorkflowTaskWithInitialHistory();
248248

temporal-sdk/src/test/java/io/temporal/internal/replay/ReplayWorkflowRunTaskHandlerTaskHandlerTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,7 @@ public void tearDown() {
6868
@Test
6969
public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() throws Throwable {
7070
// Arrange
71-
WorkflowExecutorCache cache =
72-
new WorkflowExecutorCache(service, "default", 10, new NoopScope());
71+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
7372
WorkflowTaskHandler taskHandler =
7473
new ReplayWorkflowTaskHandler(
7574
"namespace",
@@ -95,8 +94,7 @@ public void ifStickyExecutionAttributesAreNotSetThenWorkflowsAreNotCached() thro
9594
@Test
9695
public void ifStickyExecutionAttributesAreSetThenWorkflowsAreCached() throws Throwable {
9796
// Arrange
98-
WorkflowExecutorCache cache =
99-
new WorkflowExecutorCache(service, "default", 10, new NoopScope());
97+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, new NoopScope());
10098
WorkflowTaskHandler taskHandler =
10199
new ReplayWorkflowTaskHandler(
102100
"namespace",

temporal-sdk/src/test/java/io/temporal/internal/sync/DeterministicRunnerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,7 @@ public void workflowThreadsWillEvictCacheWhenMaxThreadCountIsHit() throws Throwa
637637
new ThreadPoolExecutor(1, 3, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
638638
AtomicReference<String> status = new AtomicReference<>();
639639

640-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 3, scope);
640+
WorkflowExecutorCache cache = new WorkflowExecutorCache(3, scope);
641641
ReplayWorkflowContext replayWorkflowContext = mock(ReplayWorkflowContext.class);
642642
when(replayWorkflowContext.getMetricsScope()).thenReturn(scope);
643643
when(replayWorkflowContext.getWorkflowExecution())
@@ -706,7 +706,7 @@ public void workflowThreadsWillNotEvictCacheWhenMaxThreadCountIsHit() throws Thr
706706
new ThreadPoolExecutor(1, 5, 1, TimeUnit.SECONDS, new SynchronousQueue<>());
707707
AtomicReference<String> status = new AtomicReference<>();
708708

709-
WorkflowExecutorCache cache = new WorkflowExecutorCache(null, "default", 3, new NoopScope());
709+
WorkflowExecutorCache cache = new WorkflowExecutorCache(3, new NoopScope());
710710

711711
DeterministicRunnerImpl d =
712712
new DeterministicRunnerImpl(

0 commit comments

Comments
 (0)