From 63ed5af721624aafac368deaf8ae5a97419ed76c Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 20 Oct 2025 10:10:33 +0200 Subject: [PATCH 1/5] fix debug output in TransportGetDataFrameAnalyticsStatsAction --- .../ml/action/TransportGetDataFrameAnalyticsStatsAction.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java index 7d918ebeaf5a1..fdd64822a7a02 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetDataFrameAnalyticsStatsAction.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ml.utils.persistence.MlParserUtils; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -278,7 +279,7 @@ private void searchStats(DataFrameAnalyticsConfig config, TaskId parentTaskId, A () -> format( "[%s] Item failure encountered during multi search for request [indices=%s, source=%s]: %s", config.getId(), - itemRequest.indices(), + Arrays.toString(itemRequest.indices()), itemRequest.source(), itemResponse.getFailureMessage() ), From ff1a00b38d8db39a386eb731904976377969a810 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 20 Oct 2025 10:11:40 +0200 Subject: [PATCH 2/5] clear TrainedModelStatsService's queue upon MachineLearning reset --- .../xpack/ml/MachineLearning.java | 28 +++++++++++-------- .../inference/TrainedModelStatsService.java | 4 +++ 2 files changed, 20 insertions(+), 12 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 
72c3b4ba3be01..b35d676c5b95e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -805,7 +805,8 @@ public void loadExtensions(ExtensionLoader loader) { private final SetOnce learningToRankService = new SetOnce<>(); private final SetOnce mlAutoscalingDeciderService = new SetOnce<>(); private final SetOnce deploymentManager = new SetOnce<>(); - private final SetOnce trainedModelAllocationClusterServiceSetOnce = new SetOnce<>(); + private final SetOnce trainedModelAllocationClusterService = new SetOnce<>(); + private final SetOnce trainedModelStatsService = new SetOnce<>(); private final SetOnce machineLearningExtension = new SetOnce<>(); @@ -1164,12 +1165,14 @@ public Collection createComponents(PluginServices services) { this.datafeedRunner.set(datafeedRunner); // Inference components - final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService( - resultsPersisterService, - originSettingClient, - indexNameExpressionResolver, - clusterService, - threadPool + trainedModelStatsService.set( + new TrainedModelStatsService( + resultsPersisterService, + originSettingClient, + indexNameExpressionResolver, + clusterService, + threadPool + ) ); final TrainedModelCacheMetadataService trainedModelCacheMetadataService = new TrainedModelCacheMetadataService( clusterService, @@ -1185,7 +1188,7 @@ public Collection createComponents(PluginServices services) { inferenceAuditor, threadPool, clusterService, - trainedModelStatsService, + trainedModelStatsService.get(), settings, clusterService.getNodeName(), inferenceModelBreaker.get(), @@ -1315,7 +1318,7 @@ public Collection createComponents(PluginServices services) { clusterService, threadPool ); - trainedModelAllocationClusterServiceSetOnce.set( + trainedModelAllocationClusterService.set( new TrainedModelAssignmentClusterService( settings, clusterService, @@ -1391,7 +1394,7 @@ 
public Collection createComponents(PluginServices services) { trainedModelCacheMetadataService, trainedModelProvider, trainedModelAssignmentService, - trainedModelAllocationClusterServiceSetOnce.get(), + trainedModelAllocationClusterService.get(), deploymentManager.get(), nodeAvailabilityZoneMapper, new MachineLearningExtensionHolder(machineLearningExtension.get()), @@ -2153,6 +2156,7 @@ public void cleanUpFeature( ActionListener unsetResetModeListener = ActionListener.wrap(success -> { client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> { + trainedModelStatsService.get().clearQueue(); finalListener.onResponse(success); logger.info("Finished machine learning feature reset"); }, resetFailure -> { @@ -2329,11 +2333,11 @@ public void cleanUpFeature( ); client.execute(CancelJobModelSnapshotUpgradeAction.INSTANCE, cancelSnapshotUpgradesReq, delegate); }).delegateFailureAndWrap((delegate, acknowledgedResponse) -> { - if (trainedModelAllocationClusterServiceSetOnce.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { + if (trainedModelAllocationClusterService.get() == null || machineLearningExtension.get().isNlpEnabled() == false) { delegate.onResponse(AcknowledgedResponse.TRUE); return; } - trainedModelAllocationClusterServiceSetOnce.get().removeAllModelAssignments(delegate); + trainedModelAllocationClusterService.get().removeAllModelAssignments(delegate); }); // validate no pipelines are using machine learning models diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java index 4fc606f1513c2..e8f7fde64640d 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/TrainedModelStatsService.java @@ -295,4 
+295,8 @@ static UpdateRequest buildUpdateRequest(InferenceStats stats) { } return null; } + + public void clearQueue() { + statsQueue.clear(); + } } From 38e12622457ad156f8f2e74d3f9fb1435292866b Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 20 Oct 2025 10:21:52 +0200 Subject: [PATCH 3/5] unmute tests --- muted-tests.yml | 33 ------------------- .../ml/integration/DeleteExpiredDataIT.java | 2 -- 2 files changed, 35 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index a056a183df39c..f211ac8664ad1 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -137,18 +137,12 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=snapshot/10_basic/Create a source only snapshot and then restore it} issue: https://github.com/elastic/elasticsearch/issues/122755 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats given multiple analytics} - issue: https://github.com/elastic/elasticsearch/issues/123034 - class: org.elasticsearch.indices.recovery.IndexRecoveryIT method: testSourceThrottling issue: https://github.com/elastic/elasticsearch/issues/123680 - class: org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start deployment fails while model download in progress} issue: https://github.com/elastic/elasticsearch/issues/120814 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable is missing} - issue: https://github.com/elastic/elasticsearch/issues/124168 - class: org.elasticsearch.smoketest.MlWithSecurityIT method: test {yaml=ml/3rd_party_deployment/Test start and stop multiple deployments} issue: https://github.com/elastic/elasticsearch/issues/124315 @@ -161,15 +155,6 @@ tests: - class: org.elasticsearch.packaging.test.BootstrapCheckTests method: test10Install issue: 
https://github.com/elastic/elasticsearch/issues/124957 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_crud/Test get stats on newly created config} - issue: https://github.com/elastic/elasticsearch/issues/121726 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header and column selection} - issue: https://github.com/elastic/elasticsearch/issues/125641 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/data_frame_analytics_cat_apis/Test cat data frame analytics single job with header} - issue: https://github.com/elastic/elasticsearch/issues/125642 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_start_stop/Test schedule_now on an already started transform} issue: https://github.com/elastic/elasticsearch/issues/120720 @@ -179,9 +164,6 @@ tests: - class: org.elasticsearch.xpack.core.common.notifications.AbstractAuditorTests method: testRecreateTemplateWhenDeleted issue: https://github.com/elastic/elasticsearch/issues/123232 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/start_data_frame_analytics/Test start given dest index is not empty} - issue: https://github.com/elastic/elasticsearch/issues/125909 - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats with timeout} issue: https://github.com/elastic/elasticsearch/issues/125975 @@ -197,15 +179,6 @@ tests: - class: org.elasticsearch.xpack.test.rest.XPackRestIT method: test {p0=transform/transforms_stats/Test get transform stats} issue: https://github.com/elastic/elasticsearch/issues/126270 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: 
https://github.com/elastic/elasticsearch/issues/126299 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/start_data_frame_analytics/Test start classification analysis when the dependent variable cardinality is too low} - issue: https://github.com/elastic/elasticsearch/issues/123200 -- class: org.elasticsearch.smoketest.MlWithSecurityIT - method: test {yaml=ml/trained_model_cat_apis/Test cat trained models} - issue: https://github.com/elastic/elasticsearch/issues/125750 - class: org.elasticsearch.ingest.geoip.EnterpriseGeoIpDownloaderIT method: testEnterpriseDownloaderTask issue: https://github.com/elastic/elasticsearch/issues/126124 @@ -245,9 +218,6 @@ tests: - class: org.elasticsearch.cli.keystore.AddStringKeyStoreCommandTests method: testStdinWithMultipleValues issue: https://github.com/elastic/elasticsearch/issues/126882 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/data_frame_analytics_cat_apis/Test cat data frame analytics all jobs with header} - issue: https://github.com/elastic/elasticsearch/issues/127625 - class: org.elasticsearch.xpack.ccr.action.ShardFollowTaskReplicationTests method: testChangeFollowerHistoryUUID issue: https://github.com/elastic/elasticsearch/issues/127680 @@ -333,9 +303,6 @@ tests: - class: org.elasticsearch.packaging.test.DockerTests method: test171AdditionalCliOptionsAreForwarded issue: https://github.com/elastic/elasticsearch/issues/120925 -- class: org.elasticsearch.xpack.test.rest.XPackRestIT - method: test {p0=ml/delete_expired_data/Test delete expired data with body parameters} - issue: https://github.com/elastic/elasticsearch/issues/131364 - class: org.elasticsearch.packaging.test.DockerTests method: test070BindMountCustomPathConfAndJvmOptions issue: https://github.com/elastic/elasticsearch/issues/131366 diff --git a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java 
b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java index 50b9597bb0326..fa176f2ce92b3 100644 --- a/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java +++ b/x-pack/plugin/ml/qa/native-multi-node-tests/src/javaRestTest/java/org/elasticsearch/xpack/ml/integration/DeleteExpiredDataIT.java @@ -105,7 +105,6 @@ public void testDeleteExpiredData_GivenNothingToDelete() throws Exception { client().execute(DeleteExpiredDataAction.INSTANCE, new DeleteExpiredDataAction.Request()).get(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataNoThrottle() throws Exception { testExpiredDeletion(null, 10010); } @@ -152,7 +151,6 @@ public void testDeleteExpiredDataActionDeletesEmptyStateIndices() throws Excepti ); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/62699") public void testDeleteExpiredDataWithStandardThrottle() throws Exception { testExpiredDeletion(-1.0f, 100); } From 46bc323106083497f2a9644f6e90230283140ddc Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 20 Oct 2025 16:11:25 +0200 Subject: [PATCH 4/5] rename ResetAuditorActions -> ResetMlComponentsAction --- ...tion.java => ResetMlComponentsAction.java} | 6 +-- .../xpack/ml/MachineLearning.java | 16 +++---- ... 
=> TransportResetMlComponentsAction.java} | 48 +++++++++---------- 3 files changed, 35 insertions(+), 35 deletions(-) rename x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/{ResetAuditorAction.java => ResetMlComponentsAction.java} (94%) rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/{TransportResetAuditorAction.java => TransportResetMlComponentsAction.java} (53%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java similarity index 94% rename from x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java rename to x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java index dd5cbaf29bc1f..3ecda2674e36f 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetMlComponentsAction.java @@ -22,12 +22,12 @@ import java.util.List; import java.util.Objects; -public class ResetAuditorAction extends ActionType { +public class ResetMlComponentsAction extends ActionType { - public static final ResetAuditorAction INSTANCE = new ResetAuditorAction(); + public static final ResetMlComponentsAction INSTANCE = new ResetMlComponentsAction(); public static final String NAME = "cluster:internal/xpack/ml/auditor/reset"; - private ResetAuditorAction() { + private ResetMlComponentsAction() { super(NAME); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index b35d676c5b95e..eccd6e0a7cb4c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -160,8 +160,8 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction; -import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; import org.elasticsearch.xpack.core.ml.action.SetUpgradeModeAction; @@ -271,8 +271,8 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction; -import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction; import org.elasticsearch.xpack.ml.action.TransportResetJobAction; +import org.elasticsearch.xpack.ml.action.TransportResetMlComponentsAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; import org.elasticsearch.xpack.ml.action.TransportSetUpgradeModeAction; @@ -1567,7 +1567,7 @@ public List getActions() { actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class)); actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)); actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class)); - actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class)); + actionHandlers.add(new ActionHandler(ResetMlComponentsAction.INSTANCE, 
TransportResetMlComponentsAction.class)); // Included in this section as it's used by MlMemoryAction actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class)); actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class)); @@ -2181,17 +2181,17 @@ public void cleanUpFeature( }); ActionListener resetAuditors = ActionListener.wrap(success -> { - // reset the auditors as aliases used may be removed + // reset components, such as the auditors and the trained model stats queue client.execute( - ResetAuditorAction.INSTANCE, - ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure) ); }, failure -> { logger.error("failed to reset machine learning", failure); client.execute( - ResetAuditorAction.INSTANCE, - ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ResetMlComponentsAction.INSTANCE, + ResetMlComponentsAction.Request.RESET_AUDITOR_REQUEST, ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure) ); }); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java similarity index 53% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java index a8f0daca2274d..1fa43fdb9efac 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java @@ -17,72 +17,72 
@@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; +import org.elasticsearch.xpack.core.ml.action.ResetMlComponentsAction; +import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; -import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; import java.io.IOException; import java.util.List; -public class TransportResetAuditorAction extends TransportNodesAction< - ResetAuditorAction.Request, - ResetAuditorAction.Response, - ResetAuditorAction.NodeRequest, - ResetAuditorAction.Response.ResetResponse, +public class TransportResetMlComponentsAction extends TransportNodesAction< + ResetMlComponentsAction.Request, + ResetMlComponentsAction.Response, + ResetMlComponentsAction.NodeRequest, + ResetMlComponentsAction.Response.ResetResponse, Void> { private final AnomalyDetectionAuditor anomalyDetectionAuditor; private final DataFrameAnalyticsAuditor dfaAuditor; - private final InferenceAuditor inferenceAuditor; + private final TrainedModelStatsService trainedModelStatsService; @Inject - public TransportResetAuditorAction( + public TransportResetMlComponentsAction( ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, AnomalyDetectionAuditor anomalyDetectionAuditor, DataFrameAnalyticsAuditor dfaAuditor, - InferenceAuditor inferenceAuditor + TrainedModelStatsService trainedModelStatsService ) { super( - ResetAuditorAction.NAME, + ResetMlComponentsAction.NAME, clusterService, transportService, actionFilters, - ResetAuditorAction.NodeRequest::new, + ResetMlComponentsAction.NodeRequest::new, threadPool.executor(ThreadPool.Names.MANAGEMENT) ); this.anomalyDetectionAuditor = anomalyDetectionAuditor; 
this.dfaAuditor = dfaAuditor; - this.inferenceAuditor = inferenceAuditor; + this.trainedModelStatsService = trainedModelStatsService; } @Override - protected ResetAuditorAction.Response newResponse( - ResetAuditorAction.Request request, - List resetResponses, + protected ResetMlComponentsAction.Response newResponse( + ResetMlComponentsAction.Request request, + List resetResponses, List failures ) { - return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures); + return new ResetMlComponentsAction.Response(clusterService.getClusterName(), resetResponses, failures); } @Override - protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) { - return new ResetAuditorAction.NodeRequest(); + protected ResetMlComponentsAction.NodeRequest newNodeRequest(ResetMlComponentsAction.Request request) { + return new ResetMlComponentsAction.NodeRequest(); } @Override - protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { - return new ResetAuditorAction.Response.ResetResponse(in); + protected ResetMlComponentsAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new ResetMlComponentsAction.Response.ResetResponse(in); } @Override - protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) { + protected ResetMlComponentsAction.Response.ResetResponse nodeOperation(ResetMlComponentsAction.NodeRequest request, Task task) { anomalyDetectionAuditor.reset(); dfaAuditor.reset(); - inferenceAuditor.reset(); - return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true); + trainedModelStatsService.clearQueue(); + return new ResetMlComponentsAction.Response.ResetResponse(clusterService.localNode(), true); } } From 3a929ab16cddb5d2028340720f020de2312d53a1 Mon Sep 17 00:00:00 2001 From: Jan Kuipers Date: Mon, 20 Oct 
2025 16:23:50 +0200 Subject: [PATCH 5/5] Move clearing stats queue to reset action --- .../xpack/ml/MachineLearning.java | 19 ++++++++----------- .../TransportResetMlComponentsAction.java | 5 +++++ 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index eccd6e0a7cb4c..632c5bc17a611 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -806,7 +806,6 @@ public void loadExtensions(ExtensionLoader loader) { private final SetOnce mlAutoscalingDeciderService = new SetOnce<>(); private final SetOnce deploymentManager = new SetOnce<>(); private final SetOnce trainedModelAllocationClusterService = new SetOnce<>(); - private final SetOnce trainedModelStatsService = new SetOnce<>(); private final SetOnce machineLearningExtension = new SetOnce<>(); @@ -1165,14 +1164,12 @@ public Collection createComponents(PluginServices services) { this.datafeedRunner.set(datafeedRunner); // Inference components - trainedModelStatsService.set( - new TrainedModelStatsService( - resultsPersisterService, - originSettingClient, - indexNameExpressionResolver, - clusterService, - threadPool - ) + final TrainedModelStatsService trainedModelStatsService = new TrainedModelStatsService( + resultsPersisterService, + originSettingClient, + indexNameExpressionResolver, + clusterService, + threadPool ); final TrainedModelCacheMetadataService trainedModelCacheMetadataService = new TrainedModelCacheMetadataService( clusterService, @@ -1188,7 +1185,7 @@ public Collection createComponents(PluginServices services) { inferenceAuditor, threadPool, clusterService, - trainedModelStatsService.get(), + trainedModelStatsService, settings, clusterService.getNodeName(), inferenceModelBreaker.get(), @@ -1395,6 +1392,7 @@ 
public Collection createComponents(PluginServices services) { trainedModelProvider, trainedModelAssignmentService, trainedModelAllocationClusterService.get(), + trainedModelStatsService, deploymentManager.get(), nodeAvailabilityZoneMapper, new MachineLearningExtensionHolder(machineLearningExtension.get()), @@ -2156,7 +2154,6 @@ public void cleanUpFeature( ActionListener unsetResetModeListener = ActionListener.wrap(success -> { client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> { - trainedModelStatsService.get().clearQueue(); finalListener.onResponse(success); logger.info("Finished machine learning feature reset"); }, resetFailure -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java index 1fa43fdb9efac..748de68b8c46c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetMlComponentsAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.inference.TrainedModelStatsService; import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; import java.io.IOException; import java.util.List; @@ -34,6 +35,7 @@ public class TransportResetMlComponentsAction extends TransportNodesAction< private final AnomalyDetectionAuditor anomalyDetectionAuditor; private final DataFrameAnalyticsAuditor dfaAuditor; + private final InferenceAuditor inferenceAuditor; private final TrainedModelStatsService trainedModelStatsService; @Inject @@ -44,6 +46,7 @@ public TransportResetMlComponentsAction( ActionFilters actionFilters, AnomalyDetectionAuditor 
anomalyDetectionAuditor, DataFrameAnalyticsAuditor dfaAuditor, + InferenceAuditor inferenceAuditor, TrainedModelStatsService trainedModelStatsService ) { super( @@ -56,6 +59,7 @@ public TransportResetMlComponentsAction( ); this.anomalyDetectionAuditor = anomalyDetectionAuditor; this.dfaAuditor = dfaAuditor; + this.inferenceAuditor = inferenceAuditor; this.trainedModelStatsService = trainedModelStatsService; } @@ -82,6 +86,7 @@ protected ResetMlComponentsAction.Response.ResetResponse newNodeResponse(StreamI protected ResetMlComponentsAction.Response.ResetResponse nodeOperation(ResetMlComponentsAction.NodeRequest request, Task task) { anomalyDetectionAuditor.reset(); dfaAuditor.reset(); + inferenceAuditor.reset(); trainedModelStatsService.clearQueue(); return new ResetMlComponentsAction.Response.ResetResponse(clusterService.localNode(), true); }