diff --git a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java index bf5a1f84c6dd..4003f94f10e1 100644 --- a/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java +++ b/core/trino-main/src/main/java/io/trino/execution/SqlTaskExecution.java @@ -105,6 +105,8 @@ public class SqlTaskExecution private final List sourceStartOrder; @GuardedBy("this") private int schedulingPlanNodeOrdinal; + @GuardedBy("this") + private ListenableFuture pipelineDependenciesSatisfied = immediateVoidFuture(); @GuardedBy("this") private final Map pendingSplitsByPlanNode; @@ -309,7 +311,8 @@ private synchronized Set updateSplitAssignments(List s PendingSplitsForPlanNode pendingSplitsForPlanNode = pendingSplitsByPlanNode.get(planNodeId); partitionedDriverFactory.splitsAdded(scheduledSplits.size(), SplitWeight.rawValueSum(scheduledSplits, scheduledSplit -> scheduledSplit.getSplit().getSplitWeight())); - for (ScheduledSplit scheduledSplit : scheduledSplits) { - pendingSplitsForPlanNode.addSplit(scheduledSplit); - } + pendingSplitsForPlanNode.addSplits(scheduledSplits); if (noMoreSplits) { pendingSplitsForPlanNode.setNoMoreSplits(); } } - private synchronized void schedulePartitionedSource(SplitAssignment splitAssignmentUpdate) + private synchronized void scheduleSourcePartitionedSplitsAfterPipelineUnblocked() { - mergeIntoPendingSplits(splitAssignmentUpdate.getPlanNodeId(), splitAssignmentUpdate.getSplits(), splitAssignmentUpdate.isNoMoreSplits()); + try (SetThreadName _ = new SetThreadName("Task-" + taskId)) { + // Enqueue pending splits as split runners after unblocking + schedulePartitionedSourcePendingSplits(); + // Re-check for task completion since we may have just set no more splits + checkTaskCompletion(); + } + } + private synchronized void schedulePartitionedSourcePendingSplits() + { while (schedulingPlanNodeOrdinal < sourceStartOrder.size()) { PlanNodeId schedulingPlanNode = sourceStartOrder.get(schedulingPlanNodeOrdinal); DriverSplitRunnerFactory partitionedDriverRunnerFactory = driverRunnerFactoriesWithSplitLifeCycle.get(schedulingPlanNode); + // Avoid creating split runners for pipelines that are awaiting another pipeline completing (e.g. probe side of a join waiting + // on the broadcast completion). Otherwise, build side pipelines will have reduced concurrency available. + ListenableFuture pipelineDependenciesSatisfied = partitionedDriverRunnerFactory.getPipelineDependenciesSatisfied(); + if (!pipelineDependenciesSatisfied.isDone()) { + // Only register a single re-schedule listener if we're blocked on pipeline dependencies + if (this.pipelineDependenciesSatisfied.isDone()) { + this.pipelineDependenciesSatisfied = pipelineDependenciesSatisfied; + pipelineDependenciesSatisfied.addListener(this::scheduleSourcePartitionedSplitsAfterPipelineUnblocked, notificationExecutor); + } + break; + } PendingSplitsForPlanNode pendingSplits = pendingSplitsByPlanNode.get(schedulingPlanNode); // Enqueue driver runners with split lifecycle for this plan node and driver life cycle combination. @@ -541,10 +561,10 @@ public SplitsState getState() return state; } - public void addSplit(ScheduledSplit scheduledSplit) + public void addSplits(Set scheduledSplits) { checkState(state == ADDING_SPLITS); - splits.add(scheduledSplit); + splits.addAll(scheduledSplits); } public Set removeAllSplits() @@ -602,6 +622,11 @@ private DriverSplitRunnerFactory(DriverFactory driverFactory, Tracer tracer, boo .startSpan(); } + public ListenableFuture getPipelineDependenciesSatisfied() + { + return driverFactory.getPipelineDependenciesSatisfied(); + } + public DriverSplitRunner createPartitionedDriverRunner(ScheduledSplit partitionedSplit) { return createDriverRunner(partitionedSplit, partitionedSplit.getSplit().getSplitWeight().getRawValue()); diff --git a/core/trino-main/src/main/java/io/trino/operator/DriverFactory.java b/core/trino-main/src/main/java/io/trino/operator/DriverFactory.java index 3456c5800097..2321d9efdc13 100644 --- a/core/trino-main/src/main/java/io/trino/operator/DriverFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/DriverFactory.java @@ -14,6 +14,8 @@ package io.trino.operator; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import com.google.errorprone.annotations.concurrent.GuardedBy; import io.trino.sql.planner.plan.PlanNodeId; import jakarta.annotation.Nullable; @@ -26,6 +28,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static java.util.Objects.requireNonNull; public class DriverFactory @@ -35,6 +38,7 @@ public class DriverFactory private final boolean outputDriver; private final Optional sourceId; private final OptionalInt driverInstances; + private final ListenableFuture pipelineDependenciesSatisfied; // must synchronize between createDriver() and noMoreDrivers(), but isNoMoreDrivers() is safe without synchronizing @GuardedBy("this") @@ -57,6 +61,11 @@ public DriverFactory(int pipelineId, boolean inputDriver, boolean outputDriver, .collect(toImmutableList()); checkArgument(sourceIds.size() <= 1, "Expected at most one source operator in driver factory, but found %s", sourceIds); this.sourceId = sourceIds.isEmpty() ? Optional.empty() : Optional.of(sourceIds.get(0)); + List> pipelineDependencies = operatorFactories.stream() + .map(OperatorFactory::pipelineDependenciesSatisfied) + .filter(future -> !future.isDone()) + .collect(toImmutableList()); + this.pipelineDependenciesSatisfied = pipelineDependencies.isEmpty() ? Futures.immediateVoidFuture() : Futures.whenAllComplete(pipelineDependencies).call(() -> null, directExecutor()); } public int getPipelineId() @@ -74,6 +83,11 @@ public boolean isOutputDriver() return outputDriver; } + public ListenableFuture getPipelineDependenciesSatisfied() + { + return pipelineDependenciesSatisfied; + } + /** * return the sourceId of this DriverFactory. * A DriverFactory doesn't always have source node. diff --git a/core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java index 5cd54c846575..7da52388ac44 100644 --- a/core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/OperatorFactory.java @@ -13,6 +13,10 @@ */ package io.trino.operator; +import com.google.common.util.concurrent.ListenableFuture; + +import static com.google.common.util.concurrent.Futures.immediateVoidFuture; + public interface OperatorFactory { Operator createOperator(DriverContext driverContext); @@ -27,4 +31,14 @@ public interface OperatorFactory void noMoreOperators(); OperatorFactory duplicate(); + + /** + * Returns a future indicating that any dependencies operators have on other pipelines has been satisfied and that leaf splits + * should be allowed to start for this operator. This is used to prevent join probe splits from starting before the build side + * of a join is ready when the two are in the same stage (i.e.: broadcast join on top of a table scan). + */ + default ListenableFuture pipelineDependenciesSatisfied() + { + return immediateVoidFuture(); + } } diff --git a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java index 82beb61e881a..999114c606a2 100644 --- a/core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java +++ b/core/trino-main/src/main/java/io/trino/operator/WorkProcessorOperatorAdapter.java @@ -14,6 +14,7 @@ package io.trino.operator; import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import io.trino.memory.context.MemoryTrackingContext; import io.trino.operator.join.JoinOperatorFactory; @@ -86,6 +87,21 @@ public Optional createOuterOperatorFactory() return lookupJoin.createOuterOperatorFactory(); } + @Override + public ListenableFuture buildPipelineReady() + { + if (!(operatorFactory instanceof JoinOperatorFactory lookupJoin)) { + return Futures.immediateVoidFuture(); + } + return lookupJoin.buildPipelineReady(); + } + + @Override + public ListenableFuture pipelineDependenciesSatisfied() + { + return buildPipelineReady(); + } + @VisibleForTesting public WorkProcessorOperatorFactory getWorkProcessorOperatorFactory() { diff --git a/core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java b/core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java index ada26f41a79c..7ec2ca55f5c2 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/JoinBridgeManager.java @@ -43,6 +43,7 @@ public static JoinBridgeManager lookupAllAtOnce( private final List buildOutputTypes; private final boolean buildOuter; private final T joinBridge; + private final ListenableFuture whenBuildFinishes; private final AtomicBoolean initialized = new AtomicBoolean(); private JoinLifecycle joinLifecycle; @@ -57,6 +58,7 @@ public JoinBridgeManager( this.buildOuter = buildOuter; this.joinBridge = requireNonNull(joinBridge, "joinBridge is null"); this.buildOutputTypes = requireNonNull(buildOutputTypes, "buildOutputTypes is null"); + this.whenBuildFinishes = requireNonNull(joinBridge.whenBuildFinishes(), "whenBuildFinishes is null"); } private void initializeIfNecessary() @@ -67,7 +69,7 @@ private void initializeIfNecessary() return; } int finalProbeFactoryCount = probeFactoryCount.get(); - joinLifecycle = new JoinLifecycle(joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0); + joinLifecycle = new JoinLifecycle(whenBuildFinishes, joinBridge, finalProbeFactoryCount, buildOuter ? 1 : 0); initialized.set(true); } } @@ -83,6 +85,11 @@ public void incrementProbeFactoryCount() probeFactoryCount.increment(); } + public ListenableFuture getBuildFinishedFuture() + { + return whenBuildFinishes; + } + public T getJoinBridge() { initializeIfNecessary(); @@ -139,7 +146,7 @@ private static class JoinLifecycle private final ListenableFuture whenBuildAndProbeFinishes; private final ListenableFuture whenAllFinishes; - public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount) + private JoinLifecycle(ListenableFuture whenBuildFinishes, JoinBridge joinBridge, int probeFactoryCount, int outerFactoryCount) { // When all probe and lookup-outer operators finish, destroy the join bridge (freeing the memory) // * Each LookupOuterOperatorFactory count as 1 @@ -152,7 +159,7 @@ public JoinLifecycle(JoinBridge joinBridge, int probeFactoryCount, int outerFact // * Each probe operator count as 1 probeReferenceCount = new ReferenceCount(probeFactoryCount); - whenBuildAndProbeFinishes = Futures.whenAllSucceed(joinBridge.whenBuildFinishes(), probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor()); + whenBuildAndProbeFinishes = Futures.whenAllSucceed(whenBuildFinishes, probeReferenceCount.getFreeFuture()).call(() -> null, directExecutor()); whenAllFinishes = Futures.whenAllSucceed(whenBuildAndProbeFinishes, outerReferenceCount.getFreeFuture()).call(() -> null, directExecutor()); whenAllFinishes.addListener(joinBridge::destroy, directExecutor()); } diff --git a/core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java index 45bae8beedef..e9adcf765160 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/JoinOperatorFactory.java @@ -13,6 +13,7 @@ */ package io.trino.operator.join; +import com.google.common.util.concurrent.ListenableFuture; import io.trino.operator.OperatorFactory; import java.util.Optional; @@ -20,4 +21,10 @@ public interface JoinOperatorFactory { Optional createOuterOperatorFactory(); + + /** + * Future that indicates when the build side of the join has been completed and probe processing + * can begin. Used by {@link OperatorFactory#pipelineDependenciesSatisfied()}. + */ + ListenableFuture buildPipelineReady(); } diff --git a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java index 0b65be198371..9775bbca755d 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/LookupJoinOperatorFactory.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.primitives.Ints; +import com.google.common.util.concurrent.ListenableFuture; import io.trino.operator.HashGenerator; import io.trino.operator.JoinOperatorType; import io.trino.operator.OperatorFactory; @@ -164,6 +165,12 @@ public String getOperatorType() return LookupJoinOperator.class.getSimpleName(); } + @Override + public ListenableFuture buildPipelineReady() + { + return joinBridgeManager.getBuildFinishedFuture(); + } + @Override public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor sourcePages) { diff --git a/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java b/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java index 3a1d3d32b1cd..4862e66958da 100644 --- a/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java +++ b/core/trino-main/src/main/java/io/trino/operator/join/unspilled/LookupJoinOperatorFactory.java @@ -14,6 +14,7 @@ package io.trino.operator.join.unspilled; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; import io.trino.operator.JoinOperatorType; import io.trino.operator.OperatorFactory; import io.trino.operator.ProcessorContext; @@ -133,6 +134,12 @@ public String getOperatorType() return LookupJoinOperator.class.getSimpleName(); } + @Override + public ListenableFuture buildPipelineReady() + { + return joinBridgeManager.getBuildFinishedFuture(); + } + @Override public WorkProcessorOperator create(ProcessorContext processorContext, WorkProcessor sourcePages) {