diff --git a/mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java b/mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java index c6a7b02cb..c5e29c76d 100644 --- a/mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java +++ b/mantis-runtime-autoscaler-api/src/main/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandler.java @@ -55,6 +55,8 @@ /* package */ class WorkerMetricHandler { private static final Logger logger = LoggerFactory.getLogger(WorkerMetricHandler.class); + private static final int DEFAULT_METRICS_INTERVAL_SECONDS = 30; + private final PublishSubject metricDataSubject = PublishSubject.create(); private final Observer jobAutoScaleObserver; private final MantisMasterGateway masterClientApi; @@ -62,6 +64,7 @@ private final MetricAggregator metricAggregator; private final Map numWorkersByStage = new HashMap<>(); private final Map> workerHostsByStage = new HashMap<>(); + private final int metricsIntervalSeconds; private final String jobId; private final Func1 lookupNumWorkersByStage = stage -> { @@ -82,12 +85,22 @@ public WorkerMetricHandler(final String jobId, final MantisMasterGateway masterClientApi, final AutoScaleMetricsConfig autoScaleMetricsConfig, final JobAutoscalerManager jobAutoscalerManager) { + this(jobId, jobAutoScaleObserver, masterClientApi, autoScaleMetricsConfig, jobAutoscalerManager, DEFAULT_METRICS_INTERVAL_SECONDS); + } + + public WorkerMetricHandler(final String jobId, + final Observer jobAutoScaleObserver, + final MantisMasterGateway masterClientApi, + final AutoScaleMetricsConfig autoScaleMetricsConfig, + final JobAutoscalerManager jobAutoscalerManager, + final int metricsIntervalSeconds) { this.jobId = jobId; this.jobAutoScaleObserver = jobAutoScaleObserver; this.masterClientApi = masterClientApi; this.autoScaleMetricsConfig = autoScaleMetricsConfig; this.metricAggregator = new MetricAggregator(autoScaleMetricsConfig); this.jobAutoscalerManager = jobAutoscalerManager; + this.metricsIntervalSeconds = metricsIntervalSeconds; } public Observer initAndGetMetricDataObserver() { @@ -129,13 +142,17 @@ private class StageMetricDataOperator implements Observable.Operator workerNumberByIndex = new HashMap<>(); + private final int metricsIntervalSeconds; + public StageMetricDataOperator(final int stage, final Func1 numStageWorkersFn, - final AutoScaleMetricsConfig autoScaleMetricsConfig) { + final AutoScaleMetricsConfig autoScaleMetricsConfig, + final int metricsIntervalSeconds) { logger.debug("setting operator for stage " + stage); this.stage = stage; this.numStageWorkersFn = numStageWorkersFn; this.autoScaleMetricsConfig = autoScaleMetricsConfig; + this.metricsIntervalSeconds = metricsIntervalSeconds; Action1 workerResubmitFunc = workerIndex -> { try { @@ -262,8 +279,6 @@ private void addSourceJobDataPoint(final MetricData datapoint) { } } - private static final int metricsIntervalSeconds = 30; // TODO make it configurable - @Override public Subscriber call(final Subscriber child) { rx.Scheduler.Worker worker = Schedulers.computation().createWorker(); @@ -510,7 +525,7 @@ private void start() { .doOnNext(go -> { final Integer stage = go.getKey(); final Subscription s = go - .lift(new StageMetricDataOperator(stage, lookupNumWorkersByStage, autoScaleMetricsConfig)) + .lift(new StageMetricDataOperator(stage, lookupNumWorkersByStage, autoScaleMetricsConfig, metricsIntervalSeconds)) .subscribe(); logger.info("adding subscription for stage {} StageMetricDataOperator", stage); stageSubscriptions.add(s); diff --git a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java index 91e12c4bf..04d529226 100644 --- a/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java +++ b/mantis-runtime-executor/src/test/java/io/mantisrx/server/worker/jobmaster/WorkerMetricHandlerTest.java @@ -82,6 +82,7 @@ public void testDropDataMetricTriggersAutoScale() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(); + final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30 final List events = new ArrayList<>(); final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { @@ -105,14 +106,14 @@ public void onNext(JobAutoScaler.Event event) { } latch.countDown(); } - }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); // Purposely create a new String for jobId metricDataObserver.onNext(new MetricData(new String(jobId), stage, workerIdx, workerNum, DATA_DROP_METRIC_GROUP, gauges)); - assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS)); } @@ -153,6 +154,7 @@ public void testKafkaLagAndUserDefinedTriggersAutoScale() throws InterruptedExce final CountDownLatch latch = new CountDownLatch(2); final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(Collections.singletonMap(testMetricGroup, Collections.singletonMap(testMetricName, AutoScaleMetricsConfig.AggregationAlgo.AVERAGE))); + final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30 final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { @Override @@ -179,7 +181,7 @@ public void onNext(JobAutoScaler.Event event) { } latch.countDown(); } - }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -188,7 +190,7 @@ public void onNext(JobAutoScaler.Event event) { metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx, workerNum, testMetricGroup, gauges1)); metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx + 1, workerNum + 1, testMetricGroup, gauges2)); - assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS)); } @Test @@ -253,6 +255,7 @@ public Boolean call(Integer integer) { final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(); + final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30 final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { @Override public void onCompleted() { @@ -274,7 +277,7 @@ public void onNext(JobAutoScaler.Event event) { } autoScaleLatch.countDown(); } - }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -288,8 +291,8 @@ public void onNext(JobAutoScaler.Event event) { metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx + 2, workerNum + 2, DATA_DROP_METRIC_GROUP, zeroDropGauges)); } - assertTrue(resubmitLatch.await(30, TimeUnit.SECONDS)); - assertTrue(autoScaleLatch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + assertTrue(resubmitLatch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS)); + assertTrue(autoScaleLatch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS)); } @Test @@ -309,6 +312,7 @@ public void testSourceJobDropMetricTriggersAutoScale() throws InterruptedExcepti final CountDownLatch latch = new CountDownLatch(1); final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(); + final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30 final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer() { @Override @@ -330,7 +334,7 @@ public void onNext(JobAutoScaler.Event event) { latch.countDown(); } } - }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT); + }, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs); final Observer metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver(); @@ -363,6 +367,6 @@ public void onNext(JobAutoScaler.Event event) { new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 5.0)); metricDataObserver.onNext(new MetricData(sourceJobId, stage, 1, 2, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/2.2.2.2", gauges)); - assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS)); + assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS)); } }