diff --git a/docs/changelog/136757.yaml b/docs/changelog/136757.yaml new file mode 100644 index 0000000000000..9c0bf5829c4c6 --- /dev/null +++ b/docs/changelog/136757.yaml @@ -0,0 +1,6 @@ +pr: 136757 +summary: Allow dynamic updates to frequency +area: Transform +type: bug +issues: + - 133321 diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java index 42baa1f769b23..959241284d025 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/transform/transforms/TransformConfigUpdate.java @@ -244,6 +244,10 @@ public boolean changesDestIndex(TransformConfig config) { return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false; } + public boolean changesFrequency(TransformConfig config) { + return isNullOrEqual(frequency, config.getFrequency()) == false; + } + private static boolean isNullOrEqual(Object lft, Object rgt) { return lft == null || lft.equals(rgt); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java index 14438d59626ce..96478f5576107 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformRestTestCase.java @@ -251,6 +251,11 @@ protected void createReviewsIndexNano() throws IOException { protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException { // Set frequency high for testing + createContinuousPivotReviewsTransform(transformId, transformIndex, authHeader, "1s"); + } + + protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader, String frequency) + throws IOException { String config = Strings.format(""" { "dest": { @@ -265,7 +270,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String "delay": "15m" } }, - "frequency": "1s", + "frequency": "%s", "pivot": { "group_by": { "reviewer": { @@ -282,7 +287,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String } } } - }""", transformIndex, REVIEWS_INDEX_NAME); + }""", transformIndex, REVIEWS_INDEX_NAME, frequency); createReviewsTransform(transformId, authHeader, null, config); } diff --git a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java index 6fd5df3a60c74..012d5a0565659 100644 --- a/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java +++ b/x-pack/plugin/transform/qa/single-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/transform/integration/TransformUpdateIT.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -109,21 +110,54 @@ public void testUpdateTransferRightsSecondaryAuthHeaders() throws Exception { } public void testUpdateThatChangesSettingsButNotHeaders() throws Exception { - String transformId = "test_update_that_changes_settings"; - String destIndex = transformId + "-dest"; + var transformId = "test_update_that_changes_settings"; + var destIndex = transformId + "-dest"; // Create the transform createPivotReviewsTransform(transformId, destIndex, null, null, null); - Request updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null); - updateTransformRequest.setJsonEntity(""" + // Update the transform's settings + var updatedConfig = updateTransform(transformId, """ { "settings": { "max_page_search_size": 123 } }"""); + // Verify that the settings got updated + assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123)))); + } + + private Map updateTransform(String transformId, String jsonPayload) throws Exception { + var updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null); + updateTransformRequest.setJsonEntity(jsonPayload); + return entityAsMap(client().performRequest(updateTransformRequest)); + } + + public void testUpdateFrequency() throws Exception { + var transformId = "test_update_frequency"; + var destIndex = transformId + "-dest"; + + // Create the transform + createContinuousPivotReviewsTransform(transformId, destIndex, null, "1h"); + startTransform(transformId); + + // wait until it finishes the first checkpoint and check that it hasn't triggered again + assertBusy(() -> { + var statsAndState = getTransformStateAndStats(transformId); + assertThat(XContentMapValues.extractValue("checkpointing.last.checkpoint", statsAndState), equalTo(1)); + assertThat(XContentMapValues.extractValue("stats.trigger_count", statsAndState), equalTo(1)); + }, 10, TimeUnit.SECONDS); + // Update the transform's settings - Map updatedConfig = entityAsMap(client().performRequest(updateTransformRequest)); + var updatedConfig = updateTransform(transformId, """ + { "frequency": "1s" }"""); // Verify that the settings got updated - assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123)))); + assertThat(updatedConfig.get("frequency"), is(equalTo("1s"))); + assertBusy(() -> { + var triggerCount = (Integer) XContentMapValues.extractValue("stats.trigger_count", getTransformStateAndStats(transformId)); + assertThat(triggerCount, is(greaterThan(1))); + }, 10, TimeUnit.SECONDS); + + stopTransform(transformId, true); + deleteTransform(transformId); } public void testConcurrentUpdates() throws Exception { diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java index 4d5dacde6efcb..ea5a9d4b430fe 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/action/TransportUpdateTransformAction.java @@ -172,7 +172,8 @@ protected void doExecute(Task task, Request request, ActionListener li boolean updateChangesSettings = update.changesSettings(originalConfig); boolean updateChangesHeaders = update.changesHeaders(originalConfig); boolean updateChangesDestIndex = update.changesDestIndex(originalConfig); - if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) { + boolean updateFrequency = update.changesFrequency(originalConfig); + if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex || updateFrequency) { PersistentTasksCustomMetadata.PersistentTask transformTask = TransformTask.getTransformTask( request.getId(), clusterState @@ -258,6 +259,7 @@ protected void taskOperation( transformTask.applyNewSettings(request.getConfig().getSettings()); transformTask.applyNewAuthState(request.getAuthState()); transformTask.checkAndResetDestinationIndexBlock(request.getConfig()); + transformTask.applyNewFrequency(request.getConfig()); listener.onResponse(new Response(request.getConfig())); } diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java index 1d967d6e6774f..e74f76a71f9dc 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/TransformTask.java @@ -421,6 +421,13 @@ public void checkAndResetDestinationIndexBlock(TransformConfig config) { } } + public void applyNewFrequency(TransformConfig config) { + var frequency = config != null ? config.getFrequency() : null; + if (frequency != null) { + transformScheduler.updateFrequency(config.getId(), frequency); + } + } + @Override protected void init( PersistentTasksService persistentTasksService, diff --git a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java index 9c7afa38a5c59..3dd04140fae5c 100644 --- a/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java +++ b/x-pack/plugin/transform/src/main/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformScheduler.java @@ -25,8 +25,6 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.core.Strings.format; - /** * {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as * retrying policy. @@ -123,9 +121,9 @@ void processScheduledTasks() { Instant processingFinished = clock.instant(); long tookMs = Duration.between(processingStarted, processingFinished).toMillis(); if (taskWasProcessed) { - logger.trace(format("Processing one scheduled task finished, took %dms", tookMs)); + logger.trace("Processing one scheduled task finished, took [{}] ms", tookMs); } else { - logger.trace(format("Looking for scheduled tasks to process finished, took %dms", tookMs)); + logger.trace("Looking for scheduled tasks to process finished, took [{}] ms", tookMs); } } if (taskWasProcessed == false) { @@ -161,12 +159,10 @@ private boolean processScheduledTasksInternal() { scheduledTasks.update(scheduledTask.getTransformId(), task -> { if (task.equals(scheduledTask) == false) { logger.debug( - () -> format( - "[%s] task object got modified while processing. Expected: %s, was: %s", - scheduledTask.getTransformId(), - scheduledTask, - task - ) + "[{}] task object got modified while processing. Expected: {}, was: {}", + scheduledTask.getTransformId(), + scheduledTask, + task ); } return new TransformScheduledTask( @@ -200,7 +196,7 @@ public void stop() { */ public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) { String transformId = transformTaskParams.getId(); - logger.trace(() -> format("[%s] register the transform", transformId)); + logger.trace("[{}] register the transform", transformId); long currentTimeMillis = clock.millis(); TransformScheduledTask transformScheduledTask = new TransformScheduledTask( transformId, @@ -223,7 +219,7 @@ public void registerTransform(TransformTaskParams transformTaskParams, Listener * @param failureCount new value of transform task's failure count */ public void handleTransformFailureCountChanged(String transformId, int failureCount) { - logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount)); + logger.trace("[{}] handle transform failure count change to [{}]", transformId, failureCount); // Update the task's failure count (next_scheduled_time gets automatically re-calculated) scheduledTasks.update( transformId, @@ -237,6 +233,22 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo ); } + public void updateFrequency(String transformId, TimeValue frequency) { + logger.trace("[{}] handle transform frequency change to [{}]", transformId, frequency); + scheduledTasks.update(transformId, task -> { + if (task.getFrequency().equals(frequency)) { + return task; + } + return new TransformScheduledTask( + task.getTransformId(), + getFrequency(frequency), + task.getLastTriggeredTimeMillis(), + task.getFailureCount(), + task.getListener() + ); + }); + } + /** * Updates the transform task's next_scheduled_time so that it is set to now. * Doing so may result in the task being processed earlier that it would normally (i.e.: according to its frequency) be. @@ -244,7 +256,7 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo * @param transformId id of the transform to schedule now */ public void scheduleNow(String transformId) { - logger.trace(() -> format("[%s] schedule_now transform", transformId)); + logger.trace("[{}] schedule_now transform", transformId); long currentTimeMillis = clock.millis(); // Update the task's next_scheduled_time scheduledTasks.update( @@ -268,7 +280,7 @@ public void scheduleNow(String transformId) { */ public void deregisterTransform(String transformId) { Objects.requireNonNull(transformId); - logger.trace(() -> format("[%s] de-register the transform", transformId)); + logger.trace("[{}] de-register the transform", transformId); scheduledTasks.remove(transformId); } diff --git a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java index 06fdfd7b538b1..c9e020b33f294 100644 --- a/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java +++ b/x-pack/plugin/transform/src/test/java/org/elasticsearch/xpack/transform/transforms/scheduling/TransformSchedulerTests.java @@ -240,6 +240,68 @@ public void testScheduleNow() { transformScheduler.stop(); } + public void testUpdateFrequencyWithChanges() throws Exception { + var transformId = "test-update-frequency-with-changes"; + var startingFrequency = TimeValue.timeValueHours(1); + var updatingFrequency = TimeValue.timeValueSeconds(1); + + var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false); + var clock = new FakeClock(Instant.ofEpochMilli(0)); + var events = new CopyOnWriteArrayList(); + TransformScheduler.Listener listener = events::add; + + var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat(events, hasSize(1)); + + // Update the frequency + transformScheduler.updateFrequency(transformId, updatingFrequency); + clock.advanceTimeBy(Duration.ofMinutes(2)); + transformScheduler.processScheduledTasks(); + + assertBusy(() -> assertThat(events, hasSize(greaterThanOrEqualTo(2))), 10, TimeUnit.SECONDS); + + var actualTask = transformScheduler.getTransformScheduledTasks() + .stream() + .filter(task -> task.getTransformId().equals(transformId)) + .findAny() + .orElseThrow(); + assertThat(actualTask.getFrequency(), equalTo(updatingFrequency)); + + transformScheduler.deregisterTransform(transformId); + transformScheduler.stop(); + } + + public void testUpdateFrequencyWithNoChanges() throws Exception { + var transformId = "test-update-frequency-with-changes"; + var startingFrequency = TimeValue.timeValueHours(1); + + var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false); + var clock = new FakeClock(Instant.ofEpochMilli(0)); + var events = new CopyOnWriteArrayList(); + TransformScheduler.Listener listener = events::add; + + var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO); + transformScheduler.registerTransform(transformTaskParams, listener); + assertThat(events, hasSize(1)); + + // Update the frequency + transformScheduler.updateFrequency(transformId, startingFrequency); + clock.advanceTimeBy(Duration.ofMinutes(2)); + transformScheduler.processScheduledTasks(); + assertThat(events, hasSize(1)); + + var actualTask = transformScheduler.getTransformScheduledTasks() + .stream() + .filter(task -> task.getTransformId().equals(transformId)) + .findAny() + .orElseThrow(); + assertThat(actualTask.getFrequency(), equalTo(startingFrequency)); + + transformScheduler.deregisterTransform(transformId); + transformScheduler.stop(); + } + public void testConcurrentProcessing() throws Exception { String transformId = "test-with-fake-clock-concurrent"; int frequencySeconds = 5;