Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,16 @@
/* package */ class WorkerMetricHandler {

private static final Logger logger = LoggerFactory.getLogger(WorkerMetricHandler.class);
private static final int DEFAULT_METRICS_INTERVAL_SECONDS = 30;

private final PublishSubject<MetricData> metricDataSubject = PublishSubject.create();
private final Observer<JobAutoScaler.Event> jobAutoScaleObserver;
private final MantisMasterGateway masterClientApi;
private final AutoScaleMetricsConfig autoScaleMetricsConfig;
private final MetricAggregator metricAggregator;
private final Map<Integer, Integer> numWorkersByStage = new HashMap<>();
private final Map<Integer, List<WorkerHost>> workerHostsByStage = new HashMap<>();
private final int metricsIntervalSeconds;

private final String jobId;
private final Func1<Integer, Integer> lookupNumWorkersByStage = stage -> {
Expand All @@ -82,12 +85,22 @@ public WorkerMetricHandler(final String jobId,
final MantisMasterGateway masterClientApi,
final AutoScaleMetricsConfig autoScaleMetricsConfig,
final JobAutoscalerManager jobAutoscalerManager) {
this(jobId, jobAutoScaleObserver, masterClientApi, autoScaleMetricsConfig, jobAutoscalerManager, DEFAULT_METRICS_INTERVAL_SECONDS);
}

public WorkerMetricHandler(final String jobId,
final Observer<JobAutoScaler.Event> jobAutoScaleObserver,
final MantisMasterGateway masterClientApi,
final AutoScaleMetricsConfig autoScaleMetricsConfig,
final JobAutoscalerManager jobAutoscalerManager,
final int metricsIntervalSeconds) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this need to be on WorkerMetricHandler? It looks like it is only used in StageMetricDataOperator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thought process was that WorkerMetricHandler is responsible for instantiating the StageMetricDataOperator, so the handler needs to hold the metrics interval in order to pass it to the operator. The alternative would be to pass the interval in directly via the initAndGetMetricDataObserver method.

this.jobId = jobId;
this.jobAutoScaleObserver = jobAutoScaleObserver;
this.masterClientApi = masterClientApi;
this.autoScaleMetricsConfig = autoScaleMetricsConfig;
this.metricAggregator = new MetricAggregator(autoScaleMetricsConfig);
this.jobAutoscalerManager = jobAutoscalerManager;
this.metricsIntervalSeconds = metricsIntervalSeconds;
}

public Observer<MetricData> initAndGetMetricDataObserver() {
Expand Down Expand Up @@ -129,13 +142,17 @@ private class StageMetricDataOperator implements Observable.Operator<Object, Met

private final Map<Integer, Integer> workerNumberByIndex = new HashMap<>();

private final int metricsIntervalSeconds;

public StageMetricDataOperator(final int stage,
final Func1<Integer, Integer> numStageWorkersFn,
final AutoScaleMetricsConfig autoScaleMetricsConfig) {
final AutoScaleMetricsConfig autoScaleMetricsConfig,
final int metricsIntervalSeconds) {
logger.debug("setting operator for stage " + stage);
this.stage = stage;
this.numStageWorkersFn = numStageWorkersFn;
this.autoScaleMetricsConfig = autoScaleMetricsConfig;
this.metricsIntervalSeconds = metricsIntervalSeconds;

Action1<Integer> workerResubmitFunc = workerIndex -> {
try {
Expand Down Expand Up @@ -262,8 +279,6 @@ private void addSourceJobDataPoint(final MetricData datapoint) {
}
}

private static final int metricsIntervalSeconds = 30; // TODO make it configurable

@Override
public Subscriber<? super MetricData> call(final Subscriber<? super Object> child) {
rx.Scheduler.Worker worker = Schedulers.computation().createWorker();
Expand Down Expand Up @@ -510,7 +525,7 @@ private void start() {
.doOnNext(go -> {
final Integer stage = go.getKey();
final Subscription s = go
.lift(new StageMetricDataOperator(stage, lookupNumWorkersByStage, autoScaleMetricsConfig))
.lift(new StageMetricDataOperator(stage, lookupNumWorkersByStage, autoScaleMetricsConfig, metricsIntervalSeconds))
.subscribe();
logger.info("adding subscription for stage {} StageMetricDataOperator", stage);
stageSubscriptions.add(s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public void testDropDataMetricTriggersAutoScale() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);

final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig();
final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30

final List<JobAutoScaler.Event> events = new ArrayList<>();
final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer<JobAutoScaler.Event>() {
Expand All @@ -105,14 +106,14 @@ public void onNext(JobAutoScaler.Event event) {
}
latch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs);

final Observer<MetricData> metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver();

// Purposely create a new String for jobId
metricDataObserver.onNext(new MetricData(new String(jobId), stage, workerIdx, workerNum, DATA_DROP_METRIC_GROUP, gauges));

assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS));
assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS));

}

Expand Down Expand Up @@ -153,6 +154,7 @@ public void testKafkaLagAndUserDefinedTriggersAutoScale() throws InterruptedExce
final CountDownLatch latch = new CountDownLatch(2);

final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig(Collections.singletonMap(testMetricGroup, Collections.singletonMap(testMetricName, AutoScaleMetricsConfig.AggregationAlgo.AVERAGE)));
final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30

final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer<JobAutoScaler.Event>() {
@Override
Expand All @@ -179,7 +181,7 @@ public void onNext(JobAutoScaler.Event event) {
}
latch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs);

final Observer<MetricData> metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver();

Expand All @@ -188,7 +190,7 @@ public void onNext(JobAutoScaler.Event event) {
metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx, workerNum, testMetricGroup, gauges1));
metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx + 1, workerNum + 1, testMetricGroup, gauges2));

assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS));
assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS));
}

@Test
Expand Down Expand Up @@ -253,6 +255,7 @@ public Boolean call(Integer integer) {


final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig();
final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30
final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer<JobAutoScaler.Event>() {
@Override
public void onCompleted() {
Expand All @@ -274,7 +277,7 @@ public void onNext(JobAutoScaler.Event event) {
}
autoScaleLatch.countDown();
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs);


final Observer<MetricData> metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver();
Expand All @@ -288,8 +291,8 @@ public void onNext(JobAutoScaler.Event event) {
metricDataObserver.onNext(new MetricData(jobId, stage, workerIdx + 2, workerNum + 2,
DATA_DROP_METRIC_GROUP, zeroDropGauges));
}
assertTrue(resubmitLatch.await(30, TimeUnit.SECONDS));
assertTrue(autoScaleLatch.await(30 + 5/* leeway */, TimeUnit.SECONDS));
assertTrue(resubmitLatch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS));
assertTrue(autoScaleLatch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS));
}

@Test
Expand All @@ -309,6 +312,7 @@ public void testSourceJobDropMetricTriggersAutoScale() throws InterruptedExcepti
final CountDownLatch latch = new CountDownLatch(1);

final AutoScaleMetricsConfig aggregationConfig = new AutoScaleMetricsConfig();
final int testMetricsIntervalSecs = 1; // Use 1 second for tests instead of default 30

final WorkerMetricHandler workerMetricHandler = new WorkerMetricHandler(jobId, new Observer<JobAutoScaler.Event>() {
@Override
Expand All @@ -330,7 +334,7 @@ public void onNext(JobAutoScaler.Event event) {
latch.countDown();
}
}
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT);
}, mockMasterClientApi, aggregationConfig, JobAutoscalerManager.DEFAULT, testMetricsIntervalSecs);

final Observer<MetricData> metricDataObserver = workerMetricHandler.initAndGetMetricDataObserver();

Expand Down Expand Up @@ -363,6 +367,6 @@ public void onNext(JobAutoScaler.Event event) {
new GaugeMeasurement(DROPPED_COUNTER_METRIC_NAME, 5.0));
metricDataObserver.onNext(new MetricData(sourceJobId, stage, 1, 2, "ServerSentEventRequestHandler:clientId=" + jobId + ":sockAddr=/2.2.2.2", gauges));

assertTrue(latch.await(30 + 5/* leeway */, TimeUnit.SECONDS));
assertTrue(latch.await(testMetricsIntervalSecs + 2/* leeway */, TimeUnit.SECONDS));
}
}
Loading