diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index ad805e0eb..977d9754e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -1365,6 +1365,12 @@ public void continueAsNew(ContinueAsNewInput input) { && options.getTypedSearchAttributes().size() > 0) { attributes.setSearchAttributes( SearchAttributesUtil.encodeTyped(options.getTypedSearchAttributes())); + } else if (options.getTypedSearchAttributes() == null && searchAttributes == null) { + // Carry over existing search attributes if none are specified. + SearchAttributes existing = replayContext.getSearchAttributes(); + if (existing != null && !existing.getIndexedFieldsMap().isEmpty()) { + attributes.setSearchAttributes(existing); + } } Map memo = options.getMemo(); if (memo != null) { @@ -1379,11 +1385,21 @@ public void continueAsNew(ContinueAsNewInput input) { .determineUseCompatibleFlag( replayContext.getTaskQueue().equals(options.getTaskQueue()))); } - } else if (replayContext.getRetryOptions() != null) { - // Have to copy retry options as server doesn't copy them. + } + + if (options == null && replayContext.getRetryOptions() != null) { + // Have to copy certain options as server doesn't copy them. attributes.setRetryPolicy(toRetryPolicy(replayContext.getRetryOptions())); } + if (options == null && replayContext.getSearchAttributes() != null) { + // Carry over existing search attributes if none are specified. + SearchAttributes existing = replayContext.getSearchAttributes(); + if (existing != null && !existing.getIndexedFieldsMap().isEmpty()) { + attributes.setSearchAttributes(existing); + } + } + List propagators = options != null && options.getContextPropagators() != null ? options.getContextPropagators() diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java index 3afd56333..d30850045 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/ContinueAsNewOptions.java @@ -8,7 +8,7 @@ import javax.annotation.Nullable; /** - * This class contain overrides for continueAsNew call. Every field can be null and it means that + * This class contain overrides for continueAsNew call. Every field can be null, and it means that * the value of the option should be taken from the originating workflow run. */ public final class ContinueAsNewOptions { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java index 345ee546c..d1be871e6 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/ContinueAsNewTest.java @@ -16,12 +16,17 @@ import org.junit.Test; public class ContinueAsNewTest { + static final SearchAttributeKey CUSTOM_KEYWORD_SA = + SearchAttributeKey.forKeyword("CustomKeywordField"); public static final int INITIAL_COUNT = 4; @Rule public SDKTestWorkflowRule testWorkflowRule = - SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestContinueAsNewImpl.class).build(); + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestContinueAsNewImpl.class) + .setUseExternalService(true) + .build(); @Test public void testContinueAsNew() { @@ -30,6 +35,8 @@ public void testContinueAsNew() { options = WorkflowOptions.newBuilder(options) .setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(10).build()) + .setTypedSearchAttributes( + SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, "foo0").build()) .build(); TestContinueAsNew client = testWorkflowRule.getWorkflowClient().newWorkflowStub(TestContinueAsNew.class, options); @@ -68,8 +75,10 @@ public int execute(int count, String continueAsNewTaskQueue) { String taskQueue = Workflow.getInfo().getTaskQueue(); if (count >= INITIAL_COUNT - 2) { assertEquals(10, Workflow.getInfo().getRetryOptions().getMaximumAttempts()); + assertEquals("foo0", Workflow.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA)); } else { assertEquals(5, Workflow.getInfo().getRetryOptions().getMaximumAttempts()); + assertEquals("foo1", Workflow.getTypedSearchAttributes().get(CUSTOM_KEYWORD_SA)); } if (count == 0) { assertEquals(continueAsNewTaskQueue, taskQueue); @@ -78,22 +87,22 @@ public int execute(int count, String continueAsNewTaskQueue) { Map memo = new HashMap<>(); memo.put("myKey", "MyValue"); RetryOptions retryOptions = null; + SearchAttributes searchAttributes = null; // don't specify ContinueAsNewOptions on the first continue-as-new to test that RetryOptions + // and SearchAttributes // are copied from the previous run. if (count == INITIAL_COUNT) { TestContinueAsNew next = Workflow.newContinueAsNewStub(TestContinueAsNew.class); next.execute(count - 1, continueAsNewTaskQueue); throw new RuntimeException("unreachable"); } - // don't specify RetryOptions on the second continue-as-new to test that they are copied from + // don't specify RetryOptions and SearchAttributes on the second continue-as-new to test that + // they are copied from // the previous run. if (count < INITIAL_COUNT - 1) { retryOptions = RetryOptions.newBuilder().setMaximumAttempts(5).build(); + searchAttributes = SearchAttributes.newBuilder().set(CUSTOM_KEYWORD_SA, "foo1").build(); } - SearchAttributes searchAttributes = - SearchAttributes.newBuilder() - .set(SearchAttributeKey.forKeyword("CustomKeywordField"), "foo1") - .build(); ContinueAsNewOptions options = ContinueAsNewOptions.newBuilder() .setTaskQueue(continueAsNewTaskQueue)