Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
Expand All @@ -57,9 +56,7 @@
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.ExecutionPlanWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
Expand Down Expand Up @@ -591,10 +588,6 @@ private CompletableFuture<Boolean> isInGloballyTerminalState(JobID jobId) {
}

private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
if (executionPlan instanceof JobGraph) {
applyParallelismOverrides((JobGraph) executionPlan);
}

log.info("Submitting job '{}' ({}).", executionPlan.getName(), executionPlan.getJobID());

// track as an outstanding job
Expand Down Expand Up @@ -1628,25 +1621,6 @@ public CompletableFuture<Void> onRemovedExecutionPlan(JobID jobId) {
return CompletableFuture.runAsync(() -> terminateJob(jobId), getMainThreadExecutor(jobId));
}

private void applyParallelismOverrides(JobGraph jobGraph) {
Map<String, String> overrides = new HashMap<>();
overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
for (JobVertex vertex : jobGraph.getVertices()) {
String override = overrides.get(vertex.getID().toHexString());
if (override != null) {
int currentParallelism = vertex.getParallelism();
int overrideParallelism = Integer.parseInt(override);
log.info(
"Changing job vertex {} parallelism from {} to {}",
vertex.getID(),
currentParallelism,
overrideParallelism);
vertex.setParallelism(overrideParallelism);
}
}
}

private Executor getIoExecutor(JobID jobID) {
// todo: consider caching
return MdcUtils.scopeToJob(jobID, ioExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public SchedulerNG createInstance(
"Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
}

// Apply parallelism overrides after StreamGraph -> JobGraph conversion
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);

final SlotPool slotPool =
slotPoolService
.castInto(SlotPool.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.scheduler;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* Utility class for applying parallelism overrides from configuration to JobGraph vertices.
*
* <p>This utility must be called after converting StreamGraph to JobGraph in all SchedulerNGFactory
* implementations to ensure parallelism overrides are respected in Application Mode (where
* StreamGraph is submitted directly).
*/
@Internal
public class ParallelismOverrideUtil {

private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);

/**
* Applies parallelism overrides from configuration to the JobGraph.
*
* <p>Overrides are taken from two sources (in order of precedence):
*
* <ol>
* <li>JobGraph configuration (higher precedence)
* <li>Job master configuration (lower precedence)
* </ol>
*
* @param jobGraph the JobGraph to modify
* @param jobMasterConfiguration the job master configuration containing potential overrides
*/
public static void applyParallelismOverrides(
JobGraph jobGraph, Configuration jobMasterConfiguration) {
Map<String, String> overrides = new HashMap<>();

// Add overrides from job master configuration
Map<String, String> masterConfigOverrides =
jobMasterConfiguration.get(PipelineOptions.PARALLELISM_OVERRIDES);
overrides.putAll(masterConfigOverrides);

// Add overrides from job configuration (these take precedence)
Map<String, String> jobConfigOverrides =
jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES);
overrides.putAll(jobConfigOverrides);

// Apply overrides to each vertex
for (JobVertex vertex : jobGraph.getVertices()) {
String vertexIdHex = vertex.getID().toHexString();
String override = overrides.get(vertexIdHex);
if (override != null) {
int currentParallelism = vertex.getParallelism();
int overrideParallelism = Integer.parseInt(override);
LOG.info(
"Applying parallelism override for job vertex {} ({}): {} -> {}",
vertex.getName(),
vertex.getID(),
currentParallelism,
overrideParallelism);
vertex.setParallelism(overrideParallelism);
}
}
}

private ParallelismOverrideUtil() {
// Utility class, not meant to be instantiated
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator;
Expand Down Expand Up @@ -99,6 +100,9 @@ public SchedulerNG createInstance(
"Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
}

// Apply parallelism overrides after StreamGraph -> JobGraph conversion
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);

final DeclarativeSlotPool declarativeSlotPool =
slotPoolService
.castInto(DeclarativeSlotPool.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerNGFactory;
import org.apache.flink.runtime.scheduler.SimpleExecutionSlotAllocator;
Expand Down Expand Up @@ -122,18 +123,23 @@ public SchedulerNG createInstance(
Collection<FailureEnricher> failureEnrichers,
BlocklistOperations blocklistOperations)
throws Exception {
ExecutionConfig executionConfig;
final JobGraph jobGraph;
final ExecutionConfig executionConfig;

if (executionPlan instanceof JobGraph) {
jobGraph = (JobGraph) executionPlan;
executionConfig =
executionPlan.getSerializedExecutionConfig().deserializeValue(userCodeLoader);
} else if (executionPlan instanceof StreamGraph) {
jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
executionConfig = ((StreamGraph) executionPlan).getExecutionConfig();
} else {
throw new FlinkException(
"Unsupported execution plan " + executionPlan.getClass().getCanonicalName());
}

ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);

final SlotPool slotPool =
slotPoolService
.castInto(SlotPool.class)
Expand Down Expand Up @@ -175,7 +181,7 @@ public SchedulerNG createInstance(

return createScheduler(
log,
executionPlan,
jobGraph,
executionConfig,
ioExecutor,
jobMasterConfiguration,
Expand Down Expand Up @@ -285,7 +291,8 @@ public static AdaptiveBatchScheduler createScheduler(
executionPlan,
jobRecoveryHandler instanceof DefaultBatchJobRecoveryHandler,
userCodeLoader,
futureExecutor);
futureExecutor,
jobMasterConfiguration);

return new AdaptiveBatchScheduler(
log,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package org.apache.flink.runtime.scheduler.adaptivebatch;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.scheduler.ParallelismOverrideUtil;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.DynamicCodeLoadingException;
Expand Down Expand Up @@ -46,6 +48,8 @@ public class AdaptiveExecutionHandlerFactory {
* @param enableBatchJobRecovery Whether to enable batch job recovery.
* @param userClassLoader The class loader for the user code.
* @param serializationExecutor The executor used for serialization tasks.
* @param jobMasterConfiguration The job master configuration containing potential parallelism
* overrides.
* @return An instance of {@link AdaptiveExecutionHandler}.
* @throws IllegalArgumentException if the execution plan is neither a {@link JobGraph} nor a
* {@link StreamGraph}.
Expand All @@ -54,15 +58,19 @@ public static AdaptiveExecutionHandler create(
ExecutionPlan executionPlan,
boolean enableBatchJobRecovery,
ClassLoader userClassLoader,
Executor serializationExecutor)
Executor serializationExecutor,
Configuration jobMasterConfiguration)
throws DynamicCodeLoadingException {
if (executionPlan instanceof JobGraph) {
return new NonAdaptiveExecutionHandler((JobGraph) executionPlan);
} else {
checkState(executionPlan instanceof StreamGraph, "Unsupported execution plan.");
if (enableBatchJobRecovery) {
StreamGraph streamGraph = (StreamGraph) executionPlan;
return new NonAdaptiveExecutionHandler(streamGraph.getJobGraph(userClassLoader));
JobGraph jobGraph = streamGraph.getJobGraph(userClassLoader);
// Apply parallelism overrides after StreamGraph -> JobGraph conversion
ParallelismOverrideUtil.applyParallelismOverrides(jobGraph, jobMasterConfiguration);
return new NonAdaptiveExecutionHandler(jobGraph);
} else {
return new DefaultAdaptiveExecutionHandler(
userClassLoader, (StreamGraph) executionPlan, serializationExecutor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1248,7 +1248,7 @@ public void testRequestMultipleJobDetails_isSerializable() throws Exception {
}

@Test
public void testOverridingJobVertexParallelisms() throws Exception {
public void testDispatcherDoesNotOverrideJobVertexParallelisms() throws Exception {
JobVertex v1 = new JobVertex("v1");
v1.setParallelism(1);
JobVertex v2 = new JobVertex("v2");
Expand Down Expand Up @@ -1288,9 +1288,11 @@ public void testOverridingJobVertexParallelisms() throws Exception {

dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();

Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 10);
// Verify that Dispatcher does NOT apply parallelism overrides directly
// Overrides are applied in scheduler factories, not in Dispatcher
Assert.assertEquals(jobGraph.findVertexByID(v1.getID()).getParallelism(), 1);
Assert.assertEquals(jobGraph.findVertexByID(v2.getID()).getParallelism(), 2);
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 42);
Assert.assertEquals(jobGraph.findVertexByID(v3.getID()).getParallelism(), 3);
}

private JobManagerRunner runningJobManagerRunnerWithJobStatus(
Expand Down
Loading