Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/changelog/136757.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 136757
summary: Allow dynamic updates to frequency
area: Transform
type: bug
issues:
- 133321
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ public boolean changesDestIndex(TransformConfig config) {
return isNullOrEqual(updatedIndex, config.getDestination().getIndex()) == false;
}

public boolean changesFrequency(TransformConfig config) {
return isNullOrEqual(frequency, config.getFrequency()) == false;
}

private static boolean isNullOrEqual(Object lft, Object rgt) {
return lft == null || lft.equals(rgt);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ protected void createReviewsIndexNano() throws IOException {
protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader) throws IOException {

// Set frequency high for testing
createContinuousPivotReviewsTransform(transformId, transformIndex, authHeader, "1s");
}

protected void createContinuousPivotReviewsTransform(String transformId, String transformIndex, String authHeader, String frequency)
throws IOException {
String config = Strings.format("""
{
"dest": {
Expand All @@ -265,7 +270,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
"delay": "15m"
}
},
"frequency": "1s",
"frequency": "%s",
"pivot": {
"group_by": {
"reviewer": {
Expand All @@ -282,7 +287,7 @@ protected void createContinuousPivotReviewsTransform(String transformId, String
}
}
}
}""", transformIndex, REVIEWS_INDEX_NAME);
}""", transformIndex, REVIEWS_INDEX_NAME, frequency);

createReviewsTransform(transformId, authHeader, null, config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -109,21 +110,54 @@ public void testUpdateTransferRightsSecondaryAuthHeaders() throws Exception {
}

public void testUpdateThatChangesSettingsButNotHeaders() throws Exception {
String transformId = "test_update_that_changes_settings";
String destIndex = transformId + "-dest";
var transformId = "test_update_that_changes_settings";
var destIndex = transformId + "-dest";

// Create the transform
createPivotReviewsTransform(transformId, destIndex, null, null, null);

Request updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
updateTransformRequest.setJsonEntity("""
// Update the transform's settings
var updatedConfig = updateTransform(transformId, """
{ "settings": { "max_page_search_size": 123 } }""");

// Verify that the settings got updated
assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
}

private Map<String, Object> updateTransform(String transformId, String jsonPayload) throws Exception {
var updateTransformRequest = createRequestWithAuth("POST", getTransformEndpoint() + transformId + "/_update", null);
updateTransformRequest.setJsonEntity(jsonPayload);
return entityAsMap(client().performRequest(updateTransformRequest));
}

public void testUpdateFrequency() throws Exception {
var transformId = "test_update_frequency";
var destIndex = transformId + "-dest";

// Create the transform
createContinuousPivotReviewsTransform(transformId, destIndex, null, "1h");
startTransform(transformId);

// wait until it finishes the first checkpoint and check that it hasn't triggered again
assertBusy(() -> {
var statsAndState = getTransformStateAndStats(transformId);
assertThat(XContentMapValues.extractValue("checkpointing.last.checkpoint", statsAndState), equalTo(1));
assertThat(XContentMapValues.extractValue("stats.trigger_count", statsAndState), equalTo(1));
}, 10, TimeUnit.SECONDS);

// Update the transform's settings
Map<String, Object> updatedConfig = entityAsMap(client().performRequest(updateTransformRequest));
var updatedConfig = updateTransform(transformId, """
{ "frequency": "1s" }""");

// Verify that the settings got updated
assertThat(updatedConfig.get("settings"), is(equalTo(Map.of("max_page_search_size", 123))));
assertThat(updatedConfig.get("frequency"), is(equalTo("1s")));
assertBusy(() -> {
var triggerCount = (Integer) XContentMapValues.extractValue("stats.trigger_count", getTransformStateAndStats(transformId));
assertThat(triggerCount, is(greaterThan(1)));
}, 10, TimeUnit.SECONDS);

stopTransform(transformId, true);
deleteTransform(transformId);
}

public void testConcurrentUpdates() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
boolean updateChangesSettings = update.changesSettings(originalConfig);
boolean updateChangesHeaders = update.changesHeaders(originalConfig);
boolean updateChangesDestIndex = update.changesDestIndex(originalConfig);
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex) {
boolean updateFrequency = update.changesFrequency(originalConfig);
if (updateChangesSettings || updateChangesHeaders || updateChangesDestIndex || updateFrequency) {
PersistentTasksCustomMetadata.PersistentTask<?> transformTask = TransformTask.getTransformTask(
request.getId(),
clusterState
Expand Down Expand Up @@ -258,6 +259,7 @@ protected void taskOperation(
transformTask.applyNewSettings(request.getConfig().getSettings());
transformTask.applyNewAuthState(request.getAuthState());
transformTask.checkAndResetDestinationIndexBlock(request.getConfig());
transformTask.applyNewFrequency(request.getConfig());
listener.onResponse(new Response(request.getConfig()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,13 @@ public void checkAndResetDestinationIndexBlock(TransformConfig config) {
}
}

public void applyNewFrequency(TransformConfig config) {
var frequency = config != null ? config.getFrequency() : null;
if (frequency != null) {
transformScheduler.updateFrequency(config.getId(), frequency);
}
}

@Override
protected void init(
PersistentTasksService persistentTasksService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.elasticsearch.core.Strings.format;

/**
* {@link TransformScheduler} class is responsible for scheduling transform tasks according to their configured frequency as well as
* retrying policy.
Expand Down Expand Up @@ -123,9 +121,9 @@ void processScheduledTasks() {
Instant processingFinished = clock.instant();
long tookMs = Duration.between(processingStarted, processingFinished).toMillis();
if (taskWasProcessed) {
logger.trace(format("Processing one scheduled task finished, took %dms", tookMs));
logger.trace("Processing one scheduled task finished, took [{}] ms", tookMs);
} else {
logger.trace(format("Looking for scheduled tasks to process finished, took %dms", tookMs));
logger.trace("Looking for scheduled tasks to process finished, took [{}] ms", tookMs);
}
}
if (taskWasProcessed == false) {
Expand Down Expand Up @@ -161,12 +159,10 @@ private boolean processScheduledTasksInternal() {
scheduledTasks.update(scheduledTask.getTransformId(), task -> {
if (task.equals(scheduledTask) == false) {
logger.debug(
() -> format(
"[%s] task object got modified while processing. Expected: %s, was: %s",
scheduledTask.getTransformId(),
scheduledTask,
task
)
"[{}] task object got modified while processing. Expected: {}, was: {}",
scheduledTask.getTransformId(),
scheduledTask,
task
);
}
return new TransformScheduledTask(
Expand Down Expand Up @@ -200,7 +196,7 @@ public void stop() {
*/
public void registerTransform(TransformTaskParams transformTaskParams, Listener listener) {
String transformId = transformTaskParams.getId();
logger.trace(() -> format("[%s] register the transform", transformId));
logger.trace("[{}] register the transform", transformId);
long currentTimeMillis = clock.millis();
TransformScheduledTask transformScheduledTask = new TransformScheduledTask(
transformId,
Expand All @@ -223,7 +219,7 @@ public void registerTransform(TransformTaskParams transformTaskParams, Listener
* @param failureCount new value of transform task's failure count
*/
public void handleTransformFailureCountChanged(String transformId, int failureCount) {
logger.trace(() -> format("[%s] handle transform failure count change to %d", transformId, failureCount));
logger.trace("[{}] handle transform failure count change to [{}]", transformId, failureCount);
// Update the task's failure count (next_scheduled_time gets automatically re-calculated)
scheduledTasks.update(
transformId,
Expand All @@ -237,14 +233,30 @@ public void handleTransformFailureCountChanged(String transformId, int failureCo
);
}

public void updateFrequency(String transformId, TimeValue frequency) {
logger.trace("[{}] handle transform frequency change to [{}]", transformId, frequency);
scheduledTasks.update(transformId, task -> {
if (task.getFrequency().equals(frequency)) {
return task;
}
return new TransformScheduledTask(
task.getTransformId(),
getFrequency(frequency),
task.getLastTriggeredTimeMillis(),
task.getFailureCount(),
task.getListener()
);
});
}

/**
* Updates the transform task's next_scheduled_time so that it is set to now.
* Doing so may result in the task being processed earlier that it would normally (i.e.: according to its frequency) be.
*
* @param transformId id of the transform to schedule now
*/
public void scheduleNow(String transformId) {
logger.trace(() -> format("[%s] schedule_now transform", transformId));
logger.trace("[{}] schedule_now transform", transformId);
long currentTimeMillis = clock.millis();
// Update the task's next_scheduled_time
scheduledTasks.update(
Expand All @@ -268,7 +280,7 @@ public void scheduleNow(String transformId) {
*/
public void deregisterTransform(String transformId) {
Objects.requireNonNull(transformId);
logger.trace(() -> format("[%s] de-register the transform", transformId));
logger.trace("[{}] de-register the transform", transformId);
scheduledTasks.remove(transformId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,68 @@ public void testScheduleNow() {
transformScheduler.stop();
}

public void testUpdateFrequencyWithChanges() throws Exception {
var transformId = "test-update-frequency-with-changes";
var startingFrequency = TimeValue.timeValueHours(1);
var updatingFrequency = TimeValue.timeValueSeconds(1);

var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false);
var clock = new FakeClock(Instant.ofEpochMilli(0));
var events = new CopyOnWriteArrayList<TransformScheduler.Event>();
TransformScheduler.Listener listener = events::add;

var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
transformScheduler.registerTransform(transformTaskParams, listener);
assertThat(events, hasSize(1));

// Update the frequency
transformScheduler.updateFrequency(transformId, updatingFrequency);
clock.advanceTimeBy(Duration.ofMinutes(2));
transformScheduler.processScheduledTasks();

assertBusy(() -> assertThat(events, hasSize(greaterThanOrEqualTo(2))), 10, TimeUnit.SECONDS);

var actualTask = transformScheduler.getTransformScheduledTasks()
.stream()
.filter(task -> task.getTransformId().equals(transformId))
.findAny()
.orElseThrow();
assertThat(actualTask.getFrequency(), equalTo(updatingFrequency));

transformScheduler.deregisterTransform(transformId);
transformScheduler.stop();
}

public void testUpdateFrequencyWithNoChanges() throws Exception {
var transformId = "test-update-frequency-with-changes";
var startingFrequency = TimeValue.timeValueHours(1);

var transformTaskParams = new TransformTaskParams(transformId, TransformConfigVersion.CURRENT, startingFrequency, false);
var clock = new FakeClock(Instant.ofEpochMilli(0));
var events = new CopyOnWriteArrayList<TransformScheduler.Event>();
TransformScheduler.Listener listener = events::add;

var transformScheduler = new TransformScheduler(clock, threadPool, SETTINGS, TimeValue.ZERO);
transformScheduler.registerTransform(transformTaskParams, listener);
assertThat(events, hasSize(1));

// Update the frequency
transformScheduler.updateFrequency(transformId, startingFrequency);
clock.advanceTimeBy(Duration.ofMinutes(2));
transformScheduler.processScheduledTasks();
assertThat(events, hasSize(1));

var actualTask = transformScheduler.getTransformScheduledTasks()
.stream()
.filter(task -> task.getTransformId().equals(transformId))
.findAny()
.orElseThrow();
assertThat(actualTask.getFrequency(), equalTo(startingFrequency));

transformScheduler.deregisterTransform(transformId);
transformScheduler.stop();
}

public void testConcurrentProcessing() throws Exception {
String transformId = "test-with-fake-clock-concurrent";
int frequencySeconds = 5;
Expand Down