Skip to content

Commit 12a6064

Browse files
committed
Defer broadcast join probe leaf splits
Avoids starting join probe leaf splits for tasks that are awaiting the build side completion. Otherwise, build side leaf split concurrency will be starved by probe side splits that start and then immediately block.
1 parent 137e53a commit 12a6064

File tree

8 files changed

+98
-6
lines changed

8 files changed

+98
-6
lines changed

core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ public class SqlTaskExecution
105105
private final List<PlanNodeId> sourceStartOrder;
106106
@GuardedBy("this")
107107
private int schedulingPlanNodeOrdinal;
108+
@GuardedBy("this")
109+
private ListenableFuture<Void> pipelineDependenciesSatisfied = immediateVoidFuture();
108110

109111
@GuardedBy("this")
110112
private final Map<PlanNodeId, PendingSplitsForPlanNode> pendingSplitsByPlanNode;
@@ -309,7 +311,8 @@ private synchronized Set<PlanNodeId> updateSplitAssignments(List<SplitAssignment
309311
// update task with new sources
310312
for (SplitAssignment splitAssignment : unacknowledgedSplitAssignment) {
311313
if (driverRunnerFactoriesWithSplitLifeCycle.containsKey(splitAssignment.getPlanNodeId())) {
312-
schedulePartitionedSource(splitAssignment);
314+
mergeIntoPendingSplits(splitAssignment.getPlanNodeId(), splitAssignment.getSplits(), splitAssignment.isNoMoreSplits());
315+
schedulePartitionedSourcePendingSplits();
313316
}
314317
else {
315318
// tell existing drivers about the new splits
@@ -337,15 +340,34 @@ private void mergeIntoPendingSplits(PlanNodeId planNodeId, Set<ScheduledSplit> s
337340
}
338341
}
339342

340-
private synchronized void schedulePartitionedSource(SplitAssignment splitAssignmentUpdate)
343+
private synchronized void scheduleSourcePartitionedSplitsAfterPipelineUnblocked()
341344
{
342-
mergeIntoPendingSplits(splitAssignmentUpdate.getPlanNodeId(), splitAssignmentUpdate.getSplits(), splitAssignmentUpdate.isNoMoreSplits());
345+
try (SetThreadName _ = new SetThreadName("Task-" + taskId)) {
346+
// Enqueue pending splits as split runners after unblocking
347+
schedulePartitionedSourcePendingSplits();
348+
// Re-check for task completion since we may have just set no more splits
349+
checkTaskCompletion();
350+
}
351+
}
343352

353+
private synchronized void schedulePartitionedSourcePendingSplits()
354+
{
344355
while (schedulingPlanNodeOrdinal < sourceStartOrder.size()) {
345356
PlanNodeId schedulingPlanNode = sourceStartOrder.get(schedulingPlanNodeOrdinal);
346357

347358
DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode);
348359

360+
// Avoid creating split runners for pipelines that are awaiting another pipeline completing (e.g. probe side of a join waiting
361+
// on the broadcast completion). Otherwise, build side pipelines will have reduced concurrency available.
362+
ListenableFuture<Void> pipelineDependenciesSatisfied = partitionedDriverRunnerFactory.getPipelineDependenciesSatisfied();
363+
if (!pipelineDependenciesSatisfied.isDone()) {
364+
// Only register a single re-schedule listener if we're blocked on pipeline dependencies
365+
if (this.pipelineDependenciesSatisfied.isDone()) {
366+
this.pipelineDependenciesSatisfied = pipelineDependenciesSatisfied;
367+
pipelineDependenciesSatisfied.addListener(this::scheduleSourcePartitionedSplitsAfterPipelineUnblocked, notificationExecutor);
368+
}
369+
break;
370+
}
349371
PendingSplitsForPlanNode pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode);
350372

351373
// Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination.
@@ -600,6 +622,11 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, Tracer tracer, boo
600622
.startSpan();
601623
}
602624

625+
public ListenableFuture<Void> getPipelineDependenciesSatisfied()
626+
{
627+
return driverFactory.getPipelineDependenciesSatisfied();
628+
}
629+
603630
public DriverSplitRunner createPartitionedDriverRunner(ScheduledSplit partitionedSplit)
604631
{
605632
return createDriverRunner(partitionedSplit, partitionedSplit.getSplit().getSplitWeight().getRawValue());

core/trino-main/src/main/java/io/trino/operator/DriverFactory.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
package io.trino.operator;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.util.concurrent.Futures;
18+
import com.google.common.util.concurrent.ListenableFuture;
1719
import com.google.errorprone.annotations.concurrent.GuardedBy;
1820
import io.trino.sql.planner.plan.PlanNodeId;
1921
import jakarta.annotation.Nullable;
@@ -26,6 +28,7 @@
2628
import static com.google.common.base.Preconditions.checkArgument;
2729
import static com.google.common.base.Preconditions.checkState;
2830
import static com.google.common.collect.ImmutableList.toImmutableList;
31+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
2932
import static java.util.Objects.requireNonNull;
3033

3134
public class DriverFactory
@@ -35,6 +38,7 @@ public class DriverFactory
3538
private final boolean outputDriver;
3639
private final Optional<PlanNodeId> sourceId;
3740
private final OptionalInt driverInstances;
41+
private final ListenableFuture<Void> pipelineDependenciesSatisfied;
3842

3943
// must synchronize between createDriver() and noMoreDrivers(), but isNoMoreDrivers() is safe without synchronizing
4044
@GuardedBy("this")
@@ -57,6 +61,11 @@ public DriverFactory(int pipelineId, boolean inputDriver, boolean outputDriver,
5761
.collect(toImmutableList());
5862
checkArgument(sourceIds.size() <= 1, "Expected at most one source operator in driver factory, but found %s", sourceIds);
5963
this.sourceId = sourceIds.isEmpty() ? Optional.empty() : Optional.of(sourceIds.get(0));
64+
List<ListenableFuture<Void>> pipelineDependencies = operatorFactories.stream()
65+
.map(OperatorFactory::pipelineDependenciesSatisfied)
66+
.filter(future -> !future.isDone())
67+
.collect(toImmutableList());
68+
this.pipelineDependenciesSatisfied = pipelineDependencies.isEmpty() ? Futures.immediateVoidFuture() : Futures.whenAllComplete(pipelineDependencies).call(() -> null, directExecutor());
6069
}
6170

6271
public int getPipelineId()
@@ -74,6 +83,11 @@ public boolean isOutputDriver()
7483
return outputDriver;
7584
}
7685

86+
public ListenableFuture<Void> getPipelineDependenciesSatisfied()
87+
{
88+
return pipelineDependenciesSatisfied;
89+
}
90+
7791
/**
7892
* return the sourceId of this DriverFactory.
7993
* A DriverFactory doesn't always have source node.

core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313
*/
1414
package io.trino.operator;
1515

16+
import com.google.common.util.concurrent.Futures;
17+
import com.google.common.util.concurrent.ListenableFuture;
18+
1619
public interface OperatorFactory
1720
{
1821
Operator createOperator(DriverContext driverContext);
@@ -27,4 +30,9 @@ public interface OperatorFactory
2730
void noMoreOperators();
2831

2932
OperatorFactory duplicate();
33+
34+
default ListenableFuture<Void> pipelineDependenciesSatisfied()
35+
{
36+
return Futures.immediateVoidFuture();
37+
}
3038
}

core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.operator;
1515

1616
import com.google.common.annotations.VisibleForTesting;
17+
import com.google.common.util.concurrent.Futures;
1718
import com.google.common.util.concurrent.ListenableFuture;
1819
import io.trino.memory.context.MemoryTrackingContext;
1920
import io.trino.operator.join.JoinOperatorFactory;
@@ -86,6 +87,21 @@ public Optional<OperatorFactory> createOuterOperatorFactory()
8687
return lookupJoin.createOuterOperatorFactory();
8788
}
8889

90+
@Override
91+
public ListenableFuture<Void> buildPipelineReady()
92+
{
93+
if (!(operatorFactory instanceof JoinOperatorFactory lookupJoin)) {
94+
return Futures.immediateVoidFuture();
95+
}
96+
return lookupJoin.buildPipelineReady();
97+
}
98+
99+
@Override
100+
public ListenableFuture<Void> pipelineDependenciesSatisfied()
101+
{
102+
return buildPipelineReady();
103+
}
104+
89105
@VisibleForTesting
90106
public WorkProcessorOperatorFactory getWorkProcessorOperatorFactory()
91107
{

core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import com.google.common.annotations.VisibleForTesting;
1818
import com.google.common.util.concurrent.Futures;
1919
import com.google.common.util.concurrent.ListenableFuture;
20+
import com.google.common.util.concurrent.SettableFuture;
2021
import io.trino.operator.ReferenceCount;
2122
import io.trino.spi.type.Type;
2223

@@ -43,6 +44,7 @@ public static JoinBridgeManager<PartitionedLookupSourceFactory> lookupAllAtOnce(
4344
private final List<Type> buildOutputTypes;
4445
private final boolean buildOuter;
4546
private final T joinBridge;
47+
private final SettableFuture<Void> whenBuildFinishes = SettableFuture.create();
4648

4749
private final AtomicBoolean initialized = new AtomicBoolean();
4850
private JoinLifecycle joinLifecycle;
@@ -67,7 +69,7 @@ private void initializeIfNecessary()
6769
return;
6870
}
6971
int finalProbeFactoryCount = probeFactoryCount.get();
70-
joinLifecycle = new JoinLifecycle(joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0);
72+
joinLifecycle = new JoinLifecycle(whenBuildFinishes, joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0);
7173
initialized.set(true);
7274
}
7375
}
@@ -83,6 +85,11 @@ public void incrementProbeFactoryCount()
8385
probeFactoryCount.increment();
8486
}
8587

88+
public ListenableFuture<Void> getBuildFinishedFuture()
89+
{
90+
return whenBuildFinishes;
91+
}
92+
8693
public T getJoinBridge()
8794
{
8895
initializeIfNecessary();
@@ -139,8 +146,9 @@ private static class JoinLifecycle
139146
private final ListenableFuture<Void> whenBuildAndProbeFinishes;
140147
private final ListenableFuture<Void> whenAllFinishes;
141148

142-
public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount)
149+
private JoinLifecycle(SettableFuture<Void> notifyOnBuildFinished, JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount)
143150
{
151+
requireNonNull(notifyOnBuildFinished, "notifyOnBuildFinished is null");
144152
// When all probe and lookup-outer operators finish, destroy the join bridge (freeing the memory)
145153
// * Each LookupOuterOperatorFactory count as 1
146154
// * There is at most 1 LookupOuterOperatorFactory
@@ -152,7 +160,9 @@ public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFact
152160
// * Each probe operator count as 1
153161
probeReferenceCount = new ReferenceCount(probeFactoryCount);
154162

155-
whenBuildAndProbeFinishes = Futures.whenAllSucceed(joinBridge.whenBuildFinishes(), probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
163+
ListenableFuture<Void> whenBuildFinishes = joinBridge.whenBuildFinishes();
164+
notifyOnBuildFinished.setFuture(whenBuildFinishes);
165+
whenBuildAndProbeFinishes = Futures.whenAllSucceed(whenBuildFinishes, probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
156166
whenAllFinishes = Futures.whenAllSucceed(whenBuildAndProbeFinishes, outerReferenceCount.getFreeFuture()).call(() -> null, directExecutor());
157167
whenAllFinishes.addListener(joinBridge::destroy, directExecutor());
158168
}

core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,14 @@
1313
*/
1414
package io.trino.operator.join;
1515

16+
import com.google.common.util.concurrent.ListenableFuture;
1617
import io.trino.operator.OperatorFactory;
1718

1819
import java.util.Optional;
1920

2021
public interface JoinOperatorFactory
2122
{
2223
Optional<OperatorFactory> createOuterOperatorFactory();
24+
25+
ListenableFuture<Void> buildPipelineReady();
2326
}

core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.google.common.collect.ImmutableList;
1717
import com.google.common.primitives.Ints;
18+
import com.google.common.util.concurrent.ListenableFuture;
1819
import io.trino.operator.HashGenerator;
1920
import io.trino.operator.JoinOperatorType;
2021
import io.trino.operator.OperatorFactory;
@@ -164,6 +165,12 @@ public String getOperatorType()
164165
return LookupJoinOperator.class.getSimpleName();
165166
}
166167

168+
@Override
169+
public ListenableFuture<Void> buildPipelineReady()
170+
{
171+
return joinBridgeManager.getBuildFinishedFuture();
172+
}
173+
167174
@Override
168175
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
169176
{

core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.trino.operator.join.unspilled;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import com.google.common.util.concurrent.ListenableFuture;
1718
import io.trino.operator.JoinOperatorType;
1819
import io.trino.operator.OperatorFactory;
1920
import io.trino.operator.ProcessorContext;
@@ -133,6 +134,12 @@ public String getOperatorType()
133134
return LookupJoinOperator.class.getSimpleName();
134135
}
135136

137+
@Override
138+
public ListenableFuture<Void> buildPipelineReady()
139+
{
140+
return joinBridgeManager.getBuildFinishedFuture();
141+
}
142+
136143
@Override
137144
public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor<Page> sourcePages)
138145
{

0 commit comments

Comments
 (0)