diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala index 0e459fa657..1d161e113e 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/DefaultPlanner.scala @@ -1,6 +1,5 @@ package filodb.coordinator.queryplanner - import java.util.concurrent.ThreadLocalRandom import akka.serialization.SerializationExtension @@ -27,92 +26,88 @@ import filodb.query.exec.InternalRangeFunction.Last */ case class PlanResult(plans: Seq[ExecPlan], needsStitch: Boolean = false) -trait DefaultPlanner { - def queryConfig: QueryConfig - def dataset: Dataset - def schemas: Schemas - def dsOptions: DatasetOptions - private[queryplanner] val inProcessPlanDispatcher = InProcessPlanDispatcher(queryConfig) - def materializeVectorPlan(qContext: QueryContext, - lp: VectorPlan, - forceInProcess: Boolean = false): PlanResult = { - val vectors = walkLogicalPlanTree(lp.scalars, qContext, forceInProcess) - vectors.plans.foreach(_.addRangeVectorTransformer(VectorFunctionMapper())) - vectors - } +trait DefaultPlanner extends QueryPlanner { + def queryConfig: QueryConfig + def dataset: Dataset + def schemas: Schemas + def dsOptions: DatasetOptions + private[queryplanner] val inProcessPlanDispatcher = InProcessPlanDispatcher(queryConfig) + def materializeVectorPlan(qContext: QueryContext, + lp: VectorPlan, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.scalars, qContext, forceInProcess) + vectors.plans.foreach(_.addRangeVectorTransformer(VectorFunctionMapper())) + vectors + } - def toChunkScanMethod(rangeSelector: RangeSelector): ChunkScanMethod = { - rangeSelector match { - case IntervalSelector(from, to) => TimeRangeChunkScan(from, to) - case AllChunksSelector => AllChunkScan - case WriteBufferSelector => WriteBufferChunkScan - case InMemoryChunksSelector => InMemoryChunkScan - case x@_ => throw new IllegalArgumentException(s"Unsupported range selector '$x' found") - } + def toChunkScanMethod(rangeSelector: RangeSelector): ChunkScanMethod = { + rangeSelector match { + case IntervalSelector(from, to) => TimeRangeChunkScan(from, to) + case AllChunksSelector => AllChunkScan + case WriteBufferSelector => WriteBufferChunkScan + case InMemoryChunksSelector => InMemoryChunkScan + case x@_ => throw new IllegalArgumentException(s"Unsupported range selector '$x' found") } + } - def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan - - - def materializeFunctionArgs(functionParams: Seq[FunctionArgsPlan], - qContext: QueryContext): Seq[FuncArgs] = functionParams map { - case num: ScalarFixedDoublePlan => StaticFuncArgs(num.scalar, num.timeStepParams) - case s: ScalarVaryingDoublePlan => ExecPlanFuncArgs(materialize(s, qContext), - RangeParams(s.startMs, s.stepMs, s.endMs)) - case t: ScalarTimeBasedPlan => TimeFuncArgs(t.rangeParams) - case s: ScalarBinaryOperation => ExecPlanFuncArgs(materialize(s, qContext), - RangeParams(s.startMs, s.stepMs, s.endMs)) - } - /** - * @param logicalPlan The LogicalPlan instance - * @param qContext The QueryContext - * @param forceInProcess if true, all materialized plans for this entire - * logical plan will dispatch via an InProcessDispatcher - * @return The PlanResult containing the ExecPlan - */ - def walkLogicalPlanTree(logicalPlan: LogicalPlan, - qContext: QueryContext, - forceInProcess: Boolean = false): PlanResult - - 
/** - * DefaultPlanner has logic to handle multiple LogicalPlans, classes implementing this trait may choose to override - * the behavior and delegate to the default implementation when no special implementation if required. - * The method is similar to walkLogicalPlanTree but deliberately chosen to have a different name to for the - * classes to implement walkLogicalPlanTree and explicitly delegate to defaultWalkLogicalPlanTree if needed. The - * method essentially pattern matches all LogicalPlans and invoke the default implementation in the - * DefaultPlanner trait - */ - // scalastyle:off cyclomatic.complexity - def defaultWalkLogicalPlanTree(logicalPlan: LogicalPlan, - qContext: QueryContext, - forceInProcess: Boolean = false): PlanResult = logicalPlan match { - - case lp: ApplyInstantFunction => this.materializeApplyInstantFunction(qContext, lp, forceInProcess) - case lp: ApplyInstantFunctionRaw => this.materializeApplyInstantFunctionRaw(qContext, lp, forceInProcess) - case lp: Aggregate => this.materializeAggregate(qContext, lp, forceInProcess) - case lp: BinaryJoin => this.materializeBinaryJoin(qContext, lp, forceInProcess) - case lp: ScalarVectorBinaryOperation => this.materializeScalarVectorBinOp(qContext, lp, forceInProcess) - - case lp: ApplyMiscellaneousFunction => this.materializeApplyMiscellaneousFunction(qContext, lp, forceInProcess) - case lp: ApplySortFunction => this.materializeApplySortFunction(qContext, lp, forceInProcess) - case lp: ScalarVaryingDoublePlan => this.materializeScalarPlan(qContext, lp, forceInProcess) - case lp: ScalarTimeBasedPlan => this.materializeScalarTimeBased(qContext, lp) - case lp: VectorPlan => this.materializeVectorPlan(qContext, lp, forceInProcess) - case lp: ScalarFixedDoublePlan => this.materializeFixedScalar(qContext, lp) - case lp: ApplyAbsentFunction => this.materializeAbsentFunction(qContext, lp, forceInProcess) - case lp: ApplyLimitFunction => this.materializeLimitFunction(qContext, lp, forceInProcess) - case lp: ScalarBinaryOperation => this.materializeScalarBinaryOperation(qContext, lp, forceInProcess) - case lp: SubqueryWithWindowing => this.materializeSubqueryWithWindowing(qContext, lp, forceInProcess) - case lp: TopLevelSubquery => this.materializeTopLevelSubquery(qContext, lp, forceInProcess) - case lp: PeriodicSeries => this.materializePeriodicSeries(qContext, lp, forceInProcess) - case lp: PeriodicSeriesWithWindowing => - this.materializePeriodicSeriesWithWindowing(qContext, lp, forceInProcess) - case _: RawSeries | - _: RawChunkMeta | - _: MetadataQueryPlan | - _: TsCardinalities => throw new IllegalArgumentException("Unsupported operation") - } + def materializeFunctionArgs(functionParams: Seq[FunctionArgsPlan], + qContext: QueryContext): Seq[FuncArgs] = functionParams map { + case num: ScalarFixedDoublePlan => StaticFuncArgs(num.scalar, num.timeStepParams) + case s: ScalarVaryingDoublePlan => ExecPlanFuncArgs(materialize(s, qContext), + RangeParams(s.startMs, s.stepMs, s.endMs)) + case t: ScalarTimeBasedPlan => TimeFuncArgs(t.rangeParams) + case s: ScalarBinaryOperation => ExecPlanFuncArgs(materialize(s, qContext), + RangeParams(s.startMs, s.stepMs, s.endMs)) + } + /** + * @param logicalPlan The LogicalPlan instance + * @param qContext The QueryContext + * @param forceInProcess if true, all materialized plans for this entire + * logical plan will dispatch via an InProcessDispatcher + * @return The PlanResult containing the ExecPlan + */ + def walkLogicalPlanTree(logicalPlan: LogicalPlan, + qContext: QueryContext, + 
forceInProcess: Boolean = false): PlanResult
+ /**
+ * DefaultPlanner has logic to handle multiple LogicalPlans; classes implementing this trait may choose to override
+ * the behavior and delegate to the default implementation when no special implementation is required.
+ * The method is similar to walkLogicalPlanTree but is deliberately given a different name so that
+ * classes implement walkLogicalPlanTree and explicitly delegate to defaultWalkLogicalPlanTree if needed. The
+ * method essentially pattern matches all LogicalPlans and invokes the default implementation in the
+ * DefaultPlanner trait.
+ */
+ // scalastyle:off cyclomatic.complexity
+ def defaultWalkLogicalPlanTree(logicalPlan: LogicalPlan,
+ qContext: QueryContext,
+ forceInProcess: Boolean = false): PlanResult = logicalPlan match {
+
+ case lp: ApplyInstantFunction => this.materializeApplyInstantFunction(qContext, lp, forceInProcess)
+ case lp: ApplyInstantFunctionRaw => this.materializeApplyInstantFunctionRaw(qContext, lp, forceInProcess)
+ case lp: Aggregate => this.materializeAggregate(qContext, lp, forceInProcess)
+ case lp: BinaryJoin => this.materializeBinaryJoin(qContext, lp, forceInProcess)
+ case lp: ScalarVectorBinaryOperation => this.materializeScalarVectorBinOp(qContext, lp, forceInProcess)
+
+ case lp: ApplyMiscellaneousFunction => this.materializeApplyMiscellaneousFunction(qContext, lp, forceInProcess)
+ case lp: ApplySortFunction => this.materializeApplySortFunction(qContext, lp, forceInProcess)
+ case lp: ScalarVaryingDoublePlan => this.materializeScalarPlan(qContext, lp, forceInProcess)
+ case lp: ScalarTimeBasedPlan => this.materializeScalarTimeBased(qContext, lp)
+ case lp: VectorPlan => this.materializeVectorPlan(qContext, lp, forceInProcess)
+ case lp: ScalarFixedDoublePlan => this.materializeFixedScalar(qContext, lp)
+ case lp: ApplyAbsentFunction => this.materializeAbsentFunction(qContext, lp, forceInProcess)
+ case lp: ApplyLimitFunction => this.materializeLimitFunction(qContext, lp, forceInProcess)
+ case lp: ScalarBinaryOperation => this.materializeScalarBinaryOperation(qContext, lp, forceInProcess)
+ case lp: SubqueryWithWindowing => this.materializeSubqueryWithWindowing(qContext, lp, forceInProcess)
+ case lp: TopLevelSubquery => this.materializeTopLevelSubquery(qContext, lp, forceInProcess)
+ case lp: PeriodicSeries => this.materializePeriodicSeries(qContext, lp, forceInProcess)
+ case lp: PeriodicSeriesWithWindowing =>
+ this.materializePeriodicSeriesWithWindowing(qContext, lp, forceInProcess)
+ case _: RawSeries |
+ _: RawChunkMeta |
+ _: MetadataQueryPlan |
+ _: TsCardinalities => throw new IllegalArgumentException("Unsupported operation")
+ }
 private[queryplanner] def materializePeriodicSeriesWithWindowing(qContext: QueryContext,
 lp: PeriodicSeriesWithWindowing,
@@ -221,100 +216,99 @@ trait DefaultPlanner {
 rawSeries
 }
- def materializeApplyInstantFunction(qContext: QueryContext,
- lp: ApplyInstantFunction,
- forceInProcess: Boolean = false): PlanResult = {
- val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
- val paramsExec = materializeFunctionArgs(lp.functionArgs, qContext)
- vectors.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(lp.function, paramsExec)))
- vectors
- }
-
- def materializeApplyMiscellaneousFunction(qContext: QueryContext,
- lp: ApplyMiscellaneousFunction,
- forceInProcess: Boolean = false): PlanResult = {
- val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess)
- if (lp.function ==
MiscellaneousFunctionId.OptimizeWithAgg) { - // Optimize with aggregation is a no-op, doing no transformation. It must pass through - // the execution plan to apply optimization logic correctly during aggregation. - vectors - } else { - if (lp.function == MiscellaneousFunctionId.HistToPromVectors) - vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) - else - vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) - vectors - } - } + def materializeApplyInstantFunction(qContext: QueryContext, + lp: ApplyInstantFunction, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) + val paramsExec = materializeFunctionArgs(lp.functionArgs, qContext) + vectors.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(lp.function, paramsExec))) + vectors + } - def materializeApplyInstantFunctionRaw(qContext: QueryContext, - lp: ApplyInstantFunctionRaw, - forceInProcess: Boolean = false): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - val paramsExec = materializeFunctionArgs(lp.functionArgs, qContext) - vectors.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(lp.function, paramsExec))) + def materializeApplyMiscellaneousFunction(qContext: QueryContext, + lp: ApplyMiscellaneousFunction, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) + if (lp.function == MiscellaneousFunctionId.OptimizeWithAgg) { + // Optimize with aggregation is a no-op, doing no transformation. It must pass through + // the execution plan to apply optimization logic correctly during aggregation. 
vectors - } - - def materializeScalarVectorBinOp(qContext: QueryContext, - lp: ScalarVectorBinaryOperation, - forceInProcess: Boolean = false): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vector, qContext, forceInProcess) - val funcArg = materializeFunctionArgs(Seq(lp.scalarArg), qContext) - vectors.plans.foreach(_.addRangeVectorTransformer(ScalarOperationMapper(lp.operator, lp.scalarIsLhs, funcArg))) + } else { + if (lp.function == MiscellaneousFunctionId.HistToPromVectors) + vectors.plans.foreach(_.addRangeVectorTransformer(HistToPromSeriesMapper(schemas.part))) + else + vectors.plans.foreach(_.addRangeVectorTransformer(MiscellaneousFunctionMapper(lp.function, lp.stringArgs))) vectors } + } - def materializeApplySortFunction(qContext: QueryContext, - lp: ApplySortFunction, - forceInProcess: Boolean = false): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (vectors.plans.length > 1) { - val targetActor = PlannerUtil.pickDispatcher(vectors.plans) - val topPlan = LocalPartitionDistConcatExec(qContext, targetActor, vectors.plans) - topPlan.addRangeVectorTransformer(SortFunctionMapper(lp.function)) - PlanResult(Seq(topPlan), vectors.needsStitch) - } else { - vectors.plans.foreach(_.addRangeVectorTransformer(SortFunctionMapper(lp.function))) - vectors - } - } + def materializeApplyInstantFunctionRaw(qContext: QueryContext, + lp: ApplyInstantFunctionRaw, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) + val paramsExec = materializeFunctionArgs(lp.functionArgs, qContext) + vectors.plans.foreach(_.addRangeVectorTransformer(InstantVectorFunctionMapper(lp.function, paramsExec))) + vectors + } - def materializeScalarPlan(qContext: QueryContext, - lp: ScalarVaryingDoublePlan, - forceInProcess: Boolean = false): PlanResult = { - val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) - if (vectors.plans.length > 1) { - val targetActor = PlannerUtil.pickDispatcher(vectors.plans) - val topPlan = LocalPartitionDistConcatExec(qContext, targetActor, vectors.plans) - topPlan.addRangeVectorTransformer(ScalarFunctionMapper(lp.function, - RangeParams(lp.startMs, lp.stepMs, lp.endMs))) - PlanResult(Seq(topPlan), vectors.needsStitch) - } else { - vectors.plans.foreach(_.addRangeVectorTransformer(ScalarFunctionMapper(lp.function, - RangeParams(lp.startMs, lp.stepMs, lp.endMs)))) - vectors - } + def materializeScalarVectorBinOp(qContext: QueryContext, + lp: ScalarVectorBinaryOperation, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vector, qContext, forceInProcess) + val funcArg = materializeFunctionArgs(Seq(lp.scalarArg), qContext) + vectors.plans.foreach(_.addRangeVectorTransformer(ScalarOperationMapper(lp.operator, lp.scalarIsLhs, funcArg))) + vectors + } + + def materializeApplySortFunction(qContext: QueryContext, + lp: ApplySortFunction, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) + if (vectors.plans.length > 1) { + val targetActor = PlannerUtil.pickDispatcher(vectors.plans) + val topPlan = LocalPartitionDistConcatExec(qContext, targetActor, vectors.plans) + topPlan.addRangeVectorTransformer(SortFunctionMapper(lp.function)) + PlanResult(Seq(topPlan), vectors.needsStitch) + } else { + vectors.plans.foreach(_.addRangeVectorTransformer(SortFunctionMapper(lp.function))) + vectors } + } - def addAbsentFunctionMapper(vectors: PlanResult, - 
columnFilters: Seq[ColumnFilter], - rangeParams: RangeParams, - queryContext: QueryContext): PlanResult = { - vectors.plans.foreach(_.addRangeVectorTransformer(AbsentFunctionMapper(columnFilters, rangeParams, - dsOptions.metricColumn ))) + def materializeScalarPlan(qContext: QueryContext, + lp: ScalarVaryingDoublePlan, + forceInProcess: Boolean = false): PlanResult = { + val vectors = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) + if (vectors.plans.length > 1) { + val targetActor = PlannerUtil.pickDispatcher(vectors.plans) + val topPlan = LocalPartitionDistConcatExec(qContext, targetActor, vectors.plans) + topPlan.addRangeVectorTransformer(ScalarFunctionMapper(lp.function, + RangeParams(lp.startMs, lp.stepMs, lp.endMs))) + PlanResult(Seq(topPlan), vectors.needsStitch) + } else { + vectors.plans.foreach(_.addRangeVectorTransformer(ScalarFunctionMapper(lp.function, + RangeParams(lp.startMs, lp.stepMs, lp.endMs)))) vectors } + } - // scalastyle:off method.length + def addAbsentFunctionMapper(vectors: PlanResult, + columnFilters: Seq[ColumnFilter], + rangeParams: RangeParams, + queryContext: QueryContext): PlanResult = { + vectors.plans.foreach(_.addRangeVectorTransformer(AbsentFunctionMapper(columnFilters, rangeParams, + dsOptions.metricColumn ))) + vectors + } + + // scalastyle:off method.length /** * @param forceRootDispatcher if occupied, the dispatcher used at the root reducer node. */ - def addAggregator(lp: Aggregate, + def addAggregator(lp: Aggregate, qContext: QueryContext, toReduceLevel: PlanResult, - forceRootDispatcher: Option[PlanDispatcher] = None): - LocalPartitionReduceAggregateExec = { + forceRootDispatcher: Option[PlanDispatcher] = None): LocalPartitionReduceAggregateExec = { // Now we have one exec plan per shard /* @@ -400,7 +394,7 @@ trait DefaultPlanner { } } - def materializeAggregate(qContext: QueryContext, + def materializeAggregate(qContext: QueryContext, lp: Aggregate, forceInProcess: Boolean = false): PlanResult = { val toReduceLevel1 = walkLogicalPlanTree(lp.vectors, qContext, forceInProcess) @@ -408,7 +402,7 @@ trait DefaultPlanner { PlanResult(Seq(reducer)) // since we have aggregated, no stitching } - def materializeFixedScalar(qContext: QueryContext, + def materializeFixedScalar(qContext: QueryContext, lp: ScalarFixedDoublePlan): PlanResult = { val scalarFixedDoubleExec = ScalarFixedDoubleExec(qContext, dataset = dataset.ref, lp.timeStepParams, lp.scalar, inProcessPlanDispatcher) @@ -448,7 +442,7 @@ trait DefaultPlanner { // different from start/end of the inner logical plan. This is rather confusing, the intent, however, was to keep // query context "original" without modification, so, as to capture the original intent of the user. This, however, // does not hold true across the entire code base, there are a number of place where we modify query context. 
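The hunk below swaps walkLogicalPlanTree for getRootPlanner().get.materialize on the subquery's inner periodic series: instead of planning it with the local planner, the leaf planner hands it back to the root of the planner tree so the entire hierarchy (shard-key regex, multi-partition, raw/downsample routing) replans it. A minimal sketch of the pattern, assuming the QueryPlanner members added elsewhere in this diff (planSubqueryInner is a hypothetical helper, not part of the change):

import filodb.core.query.QueryContext
import filodb.query.LogicalPlan
import filodb.query.exec.ExecPlan

def planSubqueryInner(planner: QueryPlanner,
                      inner: LogicalPlan,
                      qContext: QueryContext): ExecPlan = {
  // The root reference is populated once initRootPlanner() has run on the
  // outermost planner; materialize() guards this invariant with a require().
  val root = planner.getRootPlanner()
    .getOrElse(throw new IllegalStateException("Root planner not set"))
  // Replanning from the top yields a single ExecPlan (possibly a stitched
  // raw + downsample plan) onto which the subquery's range function is then
  // added as a RangeVectorTransformer.
  root.materialize(inner, qContext)
}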
- val innerExecPlan = walkLogicalPlanTree(sqww.innerPeriodicSeries, qContext, forceInProcess) + val innerExecPlan = getRootPlanner().get.materialize(sqww.innerPeriodicSeries, qContext) if (sqww.functionId != RangeFunctionId.AbsentOverTime) { val rangeFn = InternalRangeFunction.lpToInternalFunc(sqww.functionId) val paramsExec = materializeFunctionArgs(sqww.functionArgs, qContext) @@ -463,19 +457,19 @@ trait DefaultPlanner { rawSource = false, leftInclusiveWindow = true ) - innerExecPlan.plans.foreach { p => { - p.addRangeVectorTransformer(rangeVectorTransformer) - sqww.atMs.map(_ => p.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, sqww.stepMs, sqww.endMs - , p.queryWithPlanName(qContext)))) - }} - innerExecPlan + innerExecPlan.addRangeVectorTransformer(rangeVectorTransformer) + sqww.atMs.foreach { _ => + innerExecPlan.addRangeVectorTransformer(RepeatTransformer(sqww.startMs, + sqww.stepMs, sqww.endMs, innerExecPlan.queryWithPlanName(qContext))) + } + PlanResult(Seq(innerExecPlan)) } else { val innerPlan = sqww.innerPeriodicSeries - createAbsentOverTimePlan(innerExecPlan, innerPlan, qContext, window, sqww.offsetMs, sqww) + createAbsentOverTimePlan(PlanResult(Seq(innerExecPlan)), innerPlan, qContext, window, sqww.offsetMs, sqww) } } - def createAbsentOverTimePlan( innerExecPlan: PlanResult, + def createAbsentOverTimePlan( innerExecPlan: PlanResult, innerPlan: PeriodicSeriesPlan, qContext: QueryContext, window: Option[Long], @@ -533,7 +527,7 @@ trait DefaultPlanner { // is optimal, if there is no overlap and even worse significant gap between the individual subqueries, retrieving // the entire range might be suboptimal, this still might be a better option than issuing and concatenating numerous // subqueries separately - walkLogicalPlanTree(tlsq.innerPeriodicSeries, qContext, forceInProcess) + PlanResult(Seq(getRootPlanner().get.materialize(tlsq.innerPeriodicSeries, qContext))) } def materializeScalarTimeBased(qContext: QueryContext, @@ -543,7 +537,7 @@ trait DefaultPlanner { PlanResult(Seq(scalarTimeBasedExec)) } - def materializeScalarBinaryOperation(qContext: QueryContext, + def materializeScalarBinaryOperation(qContext: QueryContext, lp: ScalarBinaryOperation, forceInProcess: Boolean = false): PlanResult = { val lhs = if (lp.lhs.isRight) { diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/HighAvailabilityPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/HighAvailabilityPlanner.scala index 492d469ce4..5790fb4f93 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/HighAvailabilityPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/HighAvailabilityPlanner.scala @@ -52,6 +52,14 @@ class HighAvailabilityPlanner(dsRef: DatasetRef, import QueryFailureRoutingStrategy._ import LogicalPlan._ + def childPlanners(): Seq[QueryPlanner] = Seq(localPlanner) + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() + // legacy failover counter captures failovers when we send a PromQL to the buddy // cluster val legacyFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala index 1c92b62940..cb64c75b69 100644 --- 
a/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/LongTimeRangePlanner.scala @@ -12,7 +12,6 @@ import filodb.query.exec._ * LongTimeRangePlanner knows about limited retention of raw data, and existence of downsampled data. * For any query that arrives beyond the retention period of raw data, it splits the query into * two time ranges - latest time range from raw data, and old time range from downsampled data. - * It then stitches the subquery results to present top level query results for entire time range. * * @param rawClusterPlanner this planner (typically a SingleClusterPlanner) abstracts planning for raw cluster data * @param downsampleClusterPlanner this planner (typically a SingleClusterPlanner) @@ -38,6 +37,13 @@ import filodb.query.exec._ override val schemas: Schemas = Schemas(dataset.schema) override val dsOptions: DatasetOptions = schemas.part.options + def childPlanners(): Seq[QueryPlanner] = Seq(rawClusterPlanner, downsampleClusterPlanner) + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() private def materializePeriodicSeriesPlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan) = { val execPlan = if (!periodicSeriesPlan.isRoutable) @@ -65,7 +71,6 @@ import filodb.query.exec._ } } - // scalastyle:off method.length private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = { import LogicalPlan._ @@ -114,15 +119,17 @@ import filodb.query.exec._ downsampleClusterPlanner.materialize(periodicSeriesPlan, qContext) } else if (startWithOffsetMs - lookbackMs >= earliestRawTime) { // full time range in raw cluster rawClusterPlanner.materialize(periodicSeriesPlan, qContext) + } else if (LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan)) { + // eventually the subquery will be delegated to the root planner + super.defaultWalkLogicalPlanTree(periodicSeriesPlan, qContext).plans.head + } else if ( // "(endWithOffsetMs - lookbackMs) < earliestRawTime" check is erroneous, we claim that we have // a long lookback only if the last lookback window overlaps with earliestRawTime, however, we // should check for ANY interval overalapping earliestRawTime. We // can happen with ANY lookback interval, not just the last one. - } else if ( - endWithOffsetMs - lookbackMs < earliestRawTime || //TODO lookbacks can overlap in the middle intervals too - LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan) + endWithOffsetMs - lookbackMs < earliestRawTime //TODO lookbacks can overlap in the middle intervals too ) { - // For subqueries and long lookback queries, we keep things simple by routing to + // For long lookback queries, we keep things simple by routing to // downsample cluster since dealing with lookback windows across raw/downsample // clusters is quite complex and is not in scope now. 
We omit recent instants for which downsample // cluster may not have complete data @@ -276,12 +283,17 @@ import filodb.query.exec._ } // scalastyle:off cyclomatic.complexity + // scalastyle:off method.length override def walkLogicalPlanTree(logicalPlan: LogicalPlan, qContext: QueryContext, forceInProcess: Boolean = false): PlanResult = { if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan)) { logicalPlan match { + // for subqueries, we need to delegate to the root planner since the logic resides there already + case t: TopLevelSubquery => super.materializeTopLevelSubquery(qContext, t) + case s: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, s) + case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p) case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext) case ts: TsCardinalities => materializeTSCardinalityPlan(qContext, ts) @@ -314,7 +326,7 @@ import filodb.query.exec._ case lp: ScalarFixedDoublePlan => super.materializeFixedScalar(qContext, lp) case lp: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, lp) case lp: ScalarBinaryOperation => super.materializeScalarBinaryOperation(qContext, lp) - case lp: SubqueryWithWindowing => materializePeriodicSeriesPlan(qContext, lp) + case lp: SubqueryWithWindowing => throw new IllegalStateException("Already handled subquery with windowing") case lp: TopLevelSubquery => super.materializeTopLevelSubquery(qContext, lp) case lp: ApplyLimitFunction => rawClusterMaterialize(qContext, lp) case lp: LabelNames => rawClusterMaterialize(qContext, lp) @@ -324,6 +336,7 @@ import filodb.query.exec._ } override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { + require(getRootPlanner().isDefined, "Root planner not set. Internal error.") walkLogicalPlanTree(logicalPlan, qContext).plans.head } } diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala index bc19476bec..2ff91082bb 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/MultiPartitionPlanner.scala @@ -64,6 +64,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv override val schemas: Schemas = Schemas(dataset.schema) override val dsOptions: DatasetOptions = schemas.part.options + def childPlanners(): Seq[QueryPlanner] = Seq(localPartitionPlanner) + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() + val plannerSelector: String = queryConfig.plannerSelector .getOrElse(throw new IllegalArgumentException("plannerSelector is mandatory")) @@ -96,6 +104,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv // plannerHelper.materializeX(x) // } // } + require(getRootPlanner().isDefined, "Root planner not set. 
Internal error.") val tsdbQueryParams = qContext.origQueryParams if( diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/QueryPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/QueryPlanner.scala index cb062d11b9..36f6de73d3 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/QueryPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/QueryPlanner.scala @@ -1,5 +1,6 @@ package filodb.coordinator.queryplanner +import scala.collection.mutable import scala.concurrent.duration.FiniteDuration import kamon.Kamon @@ -11,6 +12,7 @@ import filodb.core.query.QueryContext import filodb.query.{LogicalPlan, QueryResponse, StreamQueryResponse} import filodb.query.exec.{ClientParams, ExecPlan, ExecPlanWithClientParams, UnsupportedChunkSource} + /** * Abstraction for Query Planning. QueryPlanners can be composed using decorator pattern to add capabilities. */ @@ -56,4 +58,30 @@ trait QueryPlanner { } } + /** + * Returns the child planners of this planner. + */ + def childPlanners(): Seq[QueryPlanner] + + /** + * Returns the root planner of the planner tree. Root planner is needed to plan subqueries from the leaf planners. + */ + def getRootPlanner(): Option[QueryPlanner] + + def setRootPlanner(rootPlanner: QueryPlanner): Unit + + /** + * Uses Depth first traversal to initialize the root planner in the planner tree. + * Must call on the root planner once the planner tree is constructed. + */ + def initRootPlanner(): Unit = { + val q = mutable.Queue[QueryPlanner]() + q.enqueue(this) + while (q.nonEmpty) { + val p = q.dequeue() + p.setRootPlanner(this) + p.childPlanners().foreach(c => q.enqueue(c)) + } + } + } diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala index 01ba33ca4a..0d5a7de54d 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlanner.scala @@ -36,6 +36,14 @@ class ShardKeyRegexPlanner(val dataset: Dataset, _targetSchemaProvider: TargetSchemaProvider = StaticTargetSchemaProvider()) extends PartitionLocationPlanner(dataset, partitionLocationProvider) { + def childPlanners(): Seq[QueryPlanner] = Seq(queryPlanner) + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() + override def queryConfig: QueryConfig = config override val schemas: Schemas = Schemas(dataset.schema) override val dsOptions: DatasetOptions = schemas.part.options @@ -98,6 +106,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset, * @return materialized Execution Plan which can be dispatched */ override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { + require(getRootPlanner().isDefined, "Root planner not set. 
Internal error.") val nonMetricShardKeyFilters = LogicalPlan.getNonMetricShardKeyFilters(logicalPlan, dataset.options.nonMetricShardColumns) if (isMetadataQuery(logicalPlan) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala index 083f9e45cb..7001b05457 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SingleClusterPlanner.scala @@ -65,6 +65,13 @@ class SingleClusterPlanner(val dataset: Dataset, override val dsOptions: DatasetOptions = schemas.part.options private val shardColumns = dsOptions.shardKeyColumns.sorted private val dsRef = dataset.ref + def childPlanners(): Seq[QueryPlanner] = Nil + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] = if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) { @@ -305,6 +312,7 @@ class SingleClusterPlanner(val dataset: Dataset, } def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { + require(getRootPlanner().isDefined, "Root planner not set. Internal error.") val plannerParams = qContext.plannerParams val updatedPlan = updateStartTime(logicalPlan) if (updatedPlan.isEmpty) EmptyResultExec(qContext, dsRef, inProcessPlanDispatcher) diff --git a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SinglePartitionPlanner.scala b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SinglePartitionPlanner.scala index aab22c2ce2..44270f811b 100644 --- a/coordinator/src/main/scala/filodb.coordinator/queryplanner/SinglePartitionPlanner.scala +++ b/coordinator/src/main/scala/filodb.coordinator/queryplanner/SinglePartitionPlanner.scala @@ -20,11 +20,21 @@ class SinglePartitionPlanner(planners: Map[String, QueryPlanner], val queryConfig: QueryConfig) extends QueryPlanner with DefaultPlanner { + def childPlanners(): Seq[QueryPlanner] = planners.values.toSeq + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() + override val schemas: Schemas = Schemas(dataset.schema) override val dsOptions: DatasetOptions = schemas.part.options - def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = + def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { + require(getRootPlanner().isDefined, "Root planner not set. 
Internal error.") walkLogicalPlanTree(logicalPlan, qContext).plans.head + } /** * Returns planner for first metric in logical plan diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala index b582ea71c2..cc881444df 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/LongTimeRangePlannerSpec.scala @@ -37,12 +37,26 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { new MockExecPlan("raw", logicalPlan) } + def childPlanners(): Seq[QueryPlanner] = Nil + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() } val downsamplePlanner = new QueryPlanner { override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = { new MockExecPlan("downsample", logicalPlan) } + def childPlanners(): Seq[QueryPlanner] = Nil + private var rootPlanner: Option[QueryPlanner] = None + def getRootPlanner(): Option[QueryPlanner] = rootPlanner + def setRootPlanner(rootPlanner: QueryPlanner): Unit = { + this.rootPlanner = Some(rootPlanner) + } + initRootPlanner() } val rawRetention = 10.minutes @@ -88,7 +102,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan] ep.name shouldEqual "raw" - ep.lp shouldEqual logicalPlan + ep.lp shouldEqual logicalPlan.asInstanceOf[TopLevelSubquery].innerPeriodicSeries } it("should direct raw-cluster-only range function subqueries to raw planner") { @@ -99,7 +113,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan] ep.name shouldEqual "raw" - ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing] + ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing].innerPeriodicSeries } it("should direct downsample-only queries to downsample planner") { @@ -117,7 +131,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan] ep.name shouldEqual "downsample" - ep.lp shouldEqual logicalPlan + ep.lp shouldEqual logicalPlan.asInstanceOf[TopLevelSubquery].innerPeriodicSeries } it("should directed downsample-only range function subqueries to downsample planner") { @@ -126,7 +140,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan] ep.name shouldEqual "downsample" - ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing] + ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing].innerPeriodicSeries } it("should direct overlapping instant queries correctly to raw or downsample clusters") { @@ -216,9 +230,8 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat val ep = longTermPlanner.materialize(logicalPlan, QueryContext()) val exp = ep.asInstanceOf[MockExecPlan] exp.name shouldEqual 
"downsample" - val downsampleLp = exp.lp.asInstanceOf[PeriodicSeriesPlan] - downsampleLp.startMs shouldEqual logicalPlan.startMs - downsampleLp.endMs shouldEqual logicalPlan.endMs + exp.lp.asInstanceOf[PeriodicSeriesPlan] + // skipping check of startMs and endMs as we are not using a proper root planner to plan the subquery } def getStartForSubquery(startMs: Long, stepMs: Long) : Long = { diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala index f40712935b..892a222d1a 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/MultiPartitionPlannerSpec.scala @@ -475,13 +475,12 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true)) ) val expectedPlan = { - """E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw) - |-T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None) + """T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None) + |-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw) |--T~PeriodicSamplesMapper(start=600000, step=60000, end=1800000, window=None, functionId=None, rawSource=true, offsetMs=None) - |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw) - |-T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw) |--T~PeriodicSamplesMapper(start=600000, step=60000, end=1800000, window=None, functionId=None, rawSource=true, offsetMs=None) - |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw)""".stripMargin + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw)""".stripMargin } validatePlan(execPlan, expectedPlan) } diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala index ad6dd37256..5dcd7b26b4 100644 --- 
a/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/PlannerHierarchySpec.scala @@ -177,10 +177,9 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS private val targetSchemaProvider = StaticTargetSchemaProvider() - val rootPlanner = new ShardKeyRegexPlanner(dataset, oneRemoteMultiPartitionPlanner, shardKeyMatcherFn, oneRemotePartitionLocationProvider, queryConfig) val oneRemoteRootPlanner = new ShardKeyRegexPlanner(dataset, oneRemoteMultiPartitionPlanner, oneRemoteShardKeyMatcherFn, oneRemotePartitionLocationProvider, queryConfig) val twoRemoteRootPlanner = new ShardKeyRegexPlanner(dataset, twoRemoteMultiPartitionPlanner, twoRemoteShardKeyMatcherFn, twoRemotePartitionLocationProvider, queryConfig) - + val rootPlanner = new ShardKeyRegexPlanner(dataset, oneRemoteMultiPartitionPlanner, shardKeyMatcherFn, oneRemotePartitionLocationProvider, queryConfig) private val startSeconds = now / 1000 - 10.days.toSeconds private val endSeconds = now / 1000 @@ -1201,6 +1200,105 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS validatePlan(execPlan, expected) } + it("verification of common subquery cases: should generate plan for subquery spanning both raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """last_over_time_is_mad_outlier(3, 1, sum(rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[2m]))[15m:5m])""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(900000), functionId=Some(LastOverTimeIsMadOutlier), rawSource=false, offsetMs=None) + |-FA1~StaticFuncArgs(3.0,RangeParams(1633913330,300,1634777330)) + | + |-FA2~StaticFuncArgs(1.0,RangeParams(1633913330,300,1634777330)) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634172900,300,1634777100)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, 
chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],raw) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633912500,300,1634172600)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633912500000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912380000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633912500000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912380000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#655288488],downsample)""".stripMargin + + validatePlan(execPlan, expected) + } + + it("verification of common subquery cases: should generate plan with long subquery range spanning both raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """last_over_time_is_mad_outlier(3, 1, sum(rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[2m]))[10d:5m])""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(864000000), functionId=Some(LastOverTimeIsMadOutlier), rawSource=false, offsetMs=None) + |-FA1~StaticFuncArgs(3.0,RangeParams(1633913330,300,1634777330)) + | + |-FA2~StaticFuncArgs(1.0,RangeParams(1633913330,300,1634777330)) + |-E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634172900,300,1634777100)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + 
|------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],raw) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],raw) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633049400,300,1634172600)) + |---E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633049400000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633049280000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],downsample) + |----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-----T~PeriodicSamplesMapper(start=1633049400000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633049280000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#687588407],downsample)""".stripMargin + + validatePlan(execPlan, expected) + } + + it("should generate plan for wrapped windowing subquery spanning both raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """min(last_over_time_is_mad_outlier(3, 1, sum(rate(foo{_ws_ = "demo", _ns_ = "localNs", instance = "Inst-1" }[2m]))[15m:5m]))""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """T~AggregatePresenter(aggrOp=Min, aggrParams=List(), rangeParams=RangeParams(1633913330,300,1634777330)) + |-E~LocalPartitionReduceAggregateExec(aggrOp=Min, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + 
|--T~AggregateMapReduce(aggrOp=Min, aggrParams=List(), without=List(), by=List()) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(900000), functionId=Some(LastOverTimeIsMadOutlier), rawSource=false, offsetMs=None) + |----FA1~StaticFuncArgs(3.0,RangeParams(1633913330,300,1634777330)) + | + |----FA2~StaticFuncArgs(1.0,RangeParams(1633913330,300,1634777330)) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1634172900,300,1634777100)) + |------E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],raw) + |-------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |--------T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],raw) + |-------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |--------T~PeriodicSamplesMapper(start=1634172900000, step=300000, end=1634777100000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172780000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],raw) + |-----T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(1633912500,300,1634172600)) + |------E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],downsample) + |-------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |--------T~PeriodicSamplesMapper(start=1633912500000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912380000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],downsample) + |-------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |--------T~PeriodicSamplesMapper(start=1633912500000, step=300000, end=1634172600000, window=Some(120000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |---------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, 
chunkMethod=TimeRangeChunkScan(1633912380000,1634172600000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#567974005],downsample)""".stripMargin + + validatePlan(execPlan, expected) + } + it("should generate plan for raw query spanning multiple partitions with namespace regex, and push down aggregations") { val query = """sum(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }) @@ -1467,7 +1565,7 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS // TODO max_over_time should have been pushed down for subqueries but ShardKeyRegexPlanner // in materializeOthers() creates MultiPartitionDistConcatExec - it("verification of common subquery cases: nested subquery") { + it("verification of common subquery cases: nested subquery") { val query ="""max_over_time(deriv(rate(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[1m])[5m:1m])[1h:1m])""" val endSecs = 1634775000L val queryParams = PromQlQueryParams(query, endSecs, 0, endSecs) @@ -1488,6 +1586,303 @@ class PlannerHierarchySpec extends AnyFunSpec with Matchers with PlanValidationS validatePlan(execPlan, expected) } + it("verification of common subquery cases: nested subquery over raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """max_over_time(deriv(rate(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" }[1m])[5m:1m])[1h:1m])""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(3600000), functionId=Some(MaxOverTime), rawSource=false, offsetMs=None) + |-T~PeriodicSamplesMapper(start=1633909740000, step=60000, end=1634777280000, window=Some(300000), functionId=Some(Deriv), rawSource=false, offsetMs=None) + |--E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |-----T~PeriodicSamplesMapper(start=1634172600000, step=60000, end=1634777280000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |-----T~PeriodicSamplesMapper(start=1634172600000, step=60000, end=1634777280000, window=Some(60000), functionId=Some(Rate), 
rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample) + |-----T~PeriodicSamplesMapper(start=1633909440000, step=60000, end=1634172540000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633909380000,1634172540000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample) + |-----T~PeriodicSamplesMapper(start=1633909440000, step=60000, end=1634172540000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633909380000,1634172540000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |-----T~PeriodicSamplesMapper(start=1634172600000, step=60000, end=1634777280000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |-----T~PeriodicSamplesMapper(start=1634172600000, step=60000, end=1634777280000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],raw) + |----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample) + |-----T~PeriodicSamplesMapper(start=1633909440000, step=60000, end=1634172540000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, 
shard=0, chunkMethod=TimeRangeChunkScan(1633909380000,1634172540000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample) + |-----T~PeriodicSamplesMapper(start=1633909440000, step=60000, end=1634172540000, window=Some(60000), functionId=Some(Rate), rawSource=true, offsetMs=None) + |------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633909380000,1634172540000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#1676152650],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("TopLevel subquery of a binary join, over raw and downsample with a namespace regex query") { + val lp = Parser.queryRangeToLogicalPlan( + """(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" } + bar{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" })[10d:15m]""", + TimeStepParams(endSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expected = """E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), 
colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, 
end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |-E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on 
ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |--E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |----T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],raw) + |---E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample) + |----T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#598660990],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("TopLevel subquery of a binary join, over raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """(foo{_ws_ = "demo", _ns_ = "remoteNs", instance = "Inst-1" } + bar{_ws_ = "demo", _ns_ = "remoteNs", instance = "Inst-1" })[10d:15m]""", + TimeStepParams(endSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + val expected = """E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 
days,true,300000),CachingConfig(true,2048))) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],raw) + |--T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],raw) + |--T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],raw) + |--T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],raw) + |--T~PeriodicSamplesMapper(start=1634173200000, step=900000, end=1634777100000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172900000,1634777100000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],raw) + |-E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],downsample) + |--T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],downsample) + |--T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],downsample) + 
|--T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],downsample) + |--T~PeriodicSamplesMapper(start=1633914000000, step=900000, end=1634172300000, window=None, functionId=None, rawSource=true, offsetMs=None) + |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913700000,1634172300000), filters=List(ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs)), ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#2005229512],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("Binary join of two subqueries, over raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """max_over_time(sum(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" })[5m:1m]) + max_over_time(sum(bar{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" })[5m:1m])""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(300000), functionId=Some(MaxOverTime), rawSource=false, offsetMs=None) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000)) + |---E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), 
ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, 
end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |-T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(300000), functionId=Some(MaxOverTime), rawSource=false, offsetMs=None) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000)) + |---E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), 
ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + 
|--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-637748980],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("verification of common subquery cases: binary join of subquery and non-subquery, over raw and downsample") { + val lp = Parser.queryRangeToLogicalPlan( + """max_over_time(sum(foo{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" })[5m:1m]) + sum(bar{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1" })""", + TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams)) + + val expected = + """E~BinaryJoinExec(binaryOp=ADD, on=None, ignoring=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(300000), functionId=Some(MaxOverTime), rawSource=false, offsetMs=None) + |--T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000)) + |---E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 
seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on 
ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634777280000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634777280000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |------T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |-------T~PeriodicSamplesMapper(start=1633913040000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633912740000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(foo)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |-T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000)) + |--E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 
days,true,300000),CachingConfig(true,2048))) + |----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(remoteNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |---E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, 
functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1634173130000, step=300000, end=1634777330000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172830000,1634777330000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],raw) + |----E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample) + |-----T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List()) + |------T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634172830000, window=None, functionId=None, rawSource=true, offsetMs=None) + |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633913030000,1634172830000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(bar)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-494454291],downsample)""".stripMargin + validatePlan(execPlan, expected) + } + + it("should generate plan for subquery with absent_over_time and offset") { + val query = """absent_over_time(my_gauge{_ws_ = "demo", _ns_ =~ ".*Ns", instance = "Inst-1"}[20m:1m] offset 1h)""" + val queryParams = PromQlQueryParams(query, startSeconds, step, endSeconds) + val lp = Parser.queryRangeToLogicalPlan(query, TimeStepParams(startSeconds, step, endSeconds), Antlr) + val execPlan = rootPlanner.materialize(lp, QueryContext(origQueryParams = queryParams, + plannerParams = PlannerParams(processMultiPartition = true))) + + val expected = """T~AbsentFunctionMapper(columnFilter=List() rangeParams=RangeParams(1633913330,300,1634777330) metricColumn=_metric_) + |-E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 
65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |--T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List(job)) + |---T~PeriodicSamplesMapper(start=1633913330000, step=300000, end=1634777330000, window=Some(1200000), functionId=Some(Last), rawSource=false, offsetMs=Some(3600000)) + |----E~MultiPartitionDistConcatExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048))) + |-----E~StitchRvsExec() on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,100,false,false,true,Set(),None,Map(filodb-query-exec-aggregate-large-container -> 65536, filodb-query-exec-metadataexec -> 8192),RoutingConfig(false,3 days,true,300000),CachingConfig(true,2048))) + |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],raw) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634773680000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1634172540000,1634773680000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(my_gauge)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],raw) + |-------T~PeriodicSamplesMapper(start=1634172840000, step=60000, end=1634773680000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1634172540000,1634773680000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(my_gauge)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],raw) + |------E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],downsample) + |-------T~PeriodicSamplesMapper(start=1633908540000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(1633908240000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(my_gauge)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],downsample) + |-------T~PeriodicSamplesMapper(start=1633908540000, step=60000, end=1634172780000, window=None, functionId=None, rawSource=true, offsetMs=None) + |--------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=1, chunkMethod=TimeRangeChunkScan(1633908240000,1634172780000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(my_gauge)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(localNs))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1670234237],downsample) + 
|-----E~PromQlRemoteExec(PromQlQueryParams(my_gauge{instance="Inst-1",_ws_="demo",_ns_="remoteNs"},1633908540,60,1634773680,None,false), PlannerParams(filodb,None,None,None,None,60000,PerQueryLimits(1000000,18000000,100000,100000,300000000,1000000,200000000),PerQueryLimits(50000,15000000,50000,50000,150000000,500000,100000000),None,None,None,false,86400000,86400000,true,true,false,false,true,10,false,true,TreeSet(),LegacyFailoverMode,None,None,None,None), queryEndpoint=remotePartition-url, requestTimeoutMs=10000) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,Some(10000),None,None,25,true,false,true,Set(),Some(plannerSelector),Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048)))""".stripMargin + validatePlan(execPlan, expected) + } + // TODO timestamp function does not take a binary expression although it works well with Prometheus // timestamp is often used with subqueries to find a particular event // it("verification of common subquery cases: using timestamp function to find an occurrence of a particular event") { diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlannerSpec.scala index ab7842ac0e..667ca0cfab 100644 --- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlannerSpec.scala +++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/ShardKeyRegexPlannerSpec.scala @@ -188,12 +188,27 @@ class ShardKeyRegexPlannerSpec extends AnyFunSpec with Matchers with ScalaFuture val engine = new ShardKeyRegexPlanner( dataset, localPlanner, shardKeyMatcherFn, simplePartitionLocationProvider, queryConfig) val execPlan = engine.materialize(lp, QueryContext(origQueryParams = PromQlQueryParams("sum(heap_usage)", 100, 1, 1000))) - execPlan.isInstanceOf[MultiPartitionReduceAggregateExec] shouldEqual(true) - execPlan.children(1).children.head.isInstanceOf[MultiSchemaPartitionsExec] - execPlan.children(0).children.head.asInstanceOf[MultiSchemaPartitionsExec].filters. - contains(ColumnFilter("_ns_", Equals("App-1"))) shouldEqual(true) - execPlan.children(1).children.head.asInstanceOf[MultiSchemaPartitionsExec].filters.
-      contains(ColumnFilter("_ns_", Equals("App-2"))) shouldEqual(true)
+    val expected =
+      """T~AggregatePresenter(aggrOp=Sum, aggrParams=List(), rangeParams=RangeParams(100,1,1000))
+        |-E~MultiPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on InProcessPlanDispatcher(QueryConfig(10 seconds,300000,1,50,antlr,true,true,None,None,None,None,25,true,false,true,Set(),None,Map(filodb-query-exec-metadataexec -> 65536, filodb-query-exec-aggregate-large-container -> 65536),RoutingConfig(false,1800000 milliseconds,true,0),CachingConfig(true,2048)))
+        |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
+        |----T~PeriodicSamplesMapper(start=1000000, step=0, end=1000000, window=Some(300000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
+        |-----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |------T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=0, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |------T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=16, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-1))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |--E~LocalPartitionReduceAggregateExec(aggrOp=Sum, aggrParams=List()) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |---T~AggregateMapReduce(aggrOp=Sum, aggrParams=List(), without=List(), by=List())
+        |----T~PeriodicSamplesMapper(start=1000000, step=0, end=1000000, window=Some(300000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
+        |-----E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |------T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=6, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)
+        |------T~PeriodicSamplesMapper(start=720000, step=60000, end=960000, window=None, functionId=None, rawSource=true, offsetMs=None)
+        |-------E~MultiSchemaPartitionsExec(dataset=timeseries, shard=22, chunkMethod=TimeRangeChunkScan(420000,960000), filters=List(ColumnFilter(instance,Equals(Inst-1)), ColumnFilter(_metric_,Equals(test)), ColumnFilter(_ws_,Equals(demo)), ColumnFilter(_ns_,Equals(App-2))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-1164028639],raw)""".stripMargin
+
+    validatePlan(execPlan, expected)
   }

   it("should generate Exec plan for top level subquery") {
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
index 8e0cef9142..257a7f46cb 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SingleClusterPlannerSpec.scala
@@ -304,68 +304,46 @@ class SingleClusterPlannerSpec extends AnyFunSpec
     val lp = Parser.queryRangeToLogicalPlan("""min_over_time(rate(foo{job="bar"}[5m])[3m:1m])""",
       TimeStepParams(20900, 90, 21800))
     val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
-    execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual true
-    execPlan.children should have length (2)
-    execPlan.children(1).isInstanceOf[MultiSchemaPartitionsExec]
-    val partExec = execPlan.children(1).asInstanceOf[MultiSchemaPartitionsExec]
-    partExec.rangeVectorTransformers should have length (2)
-    val topPsm = partExec.rangeVectorTransformers(1).asInstanceOf[PeriodicSamplesMapper]
-    topPsm.startMs shouldEqual 20900000
-    topPsm.endMs shouldEqual 21800000
-    topPsm.stepMs shouldEqual 90000
-    topPsm.window shouldEqual Some(180000)
-    topPsm.functionId shouldEqual Some(InternalRangeFunction.MinOverTime)
-    partExec.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper]
-    val middlePsm = partExec.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper]
+
+    // Middle PSM
     // Notice that the start is not 20 720 000, because 20 720 000 is not divisible by the 60 000 ms step;
     // instead it's 20 760 000, i.e. the next multiple of the step after 20 720 000
-    middlePsm.startMs shouldEqual 20760000
     // Similarly the end is not 21 800 000, because 21 800 000 is not divisible by the 60 000 ms step;
     // instead it's 21 780 000, i.e. the next multiple of the step to the left of 21 800 000
-    middlePsm.endMs shouldEqual 21780000
-    middlePsm.stepMs shouldEqual 60000
-    middlePsm.window shouldEqual Some(300000)
     // 20 460 000 = 20 760 000 - 300 000
-    partExec.chunkMethod.startTime shouldEqual 20460000
-    partExec.chunkMethod.endTime shouldEqual 21780000
+    val expected = """T~PeriodicSamplesMapper(start=20900000, step=90000, end=21800000, window=Some(180000), functionId=Some(MinOverTime), rawSource=false, offsetMs=None)
+                     |-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-121427689],raw)
+                     |--T~PeriodicSamplesMapper(start=20760000, step=60000, end=21780000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+                     |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=5, chunkMethod=TimeRangeChunkScan(20460000,21780000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-121427689],raw)
+                     |--T~PeriodicSamplesMapper(start=20760000, step=60000, end=21780000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+                     |---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=21, chunkMethod=TimeRangeChunkScan(20460000,21780000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-121427689],raw)""".stripMargin
+    validatePlan(execPlan, expected)
+  }
   it("should generate correct plan for nested subqueries") {
     val lp = Parser.queryRangeToLogicalPlan("""avg_over_time(max_over_time(rate(foo{job="bar"}[5m])[5m:1m])[10m:2m])""",
       TimeStepParams(20900, 90, 21800))
     val execPlan = engine.materialize(lp, QueryContext(origQueryParams = promQlQueryParams))
-    execPlan.isInstanceOf[LocalPartitionDistConcatExec] shouldEqual true
-    execPlan.children should have length (2)
-    execPlan.children(1).isInstanceOf[MultiSchemaPartitionsExec]
-    val partExec = execPlan.children(1).asInstanceOf[MultiSchemaPartitionsExec]
-    partExec.rangeVectorTransformers should have length (3)
-    val topPsm = partExec.rangeVectorTransformers(2).asInstanceOf[PeriodicSamplesMapper]
-    topPsm.startMs shouldEqual 20900000
-    topPsm.endMs shouldEqual 21800000
-    topPsm.stepMs shouldEqual 90000
-    topPsm.window shouldEqual Some(600000)
-    topPsm.functionId shouldEqual Some(InternalRangeFunction.AvgOverTime)
-    partExec.rangeVectorTransformers(0).isInstanceOf[PeriodicSamplesMapper]
-    val middlePsm = partExec.rangeVectorTransformers(1).asInstanceOf[PeriodicSamplesMapper]
+
+    // middle PSM
     // 20 900 000 - 600 000 = 20 300 000
     // 20 300 000 rounded down to a multiple of 120 000 = 20 280 000
     // 20 280 000 + 120 000 = 20 400 000
-    middlePsm.startMs shouldEqual 20400000
     // Similarly the end is not 21 800 000, because 21 800 000 is not divisible by the 120 000 ms step;
     // instead it's 21 720 000, i.e. the next multiple of the step to the left of 21 800 000
-    middlePsm.endMs shouldEqual 21720000
-    middlePsm.stepMs shouldEqual 120000
-    middlePsm.window shouldEqual Some(300000)
-    middlePsm.functionId shouldEqual Some(InternalRangeFunction.MaxOverTime)
-    val bottomPsm = partExec.rangeVectorTransformers(0).asInstanceOf[PeriodicSamplesMapper]
+
+    // Bottom PSM
     // 20 400 000 - 300 000 = 20 100 000
-    bottomPsm.startMs shouldEqual 20100000
-    bottomPsm.endMs shouldEqual 21720000
-    bottomPsm.stepMs shouldEqual 60000
-    bottomPsm.window shouldEqual Some(300000)
     // 20 100 000 - 300 000 = 19 800 000
-    partExec.chunkMethod.startTime shouldEqual 19800000
-    partExec.chunkMethod.endTime shouldEqual 21720000
+    val expected = """T~PeriodicSamplesMapper(start=20900000, step=90000, end=21800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
+                     |-T~PeriodicSamplesMapper(start=20400000, step=120000, end=21720000, window=Some(300000), functionId=Some(MaxOverTime), rawSource=false, offsetMs=None)
+                     |--E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-152549677],raw)
+                     |---T~PeriodicSamplesMapper(start=20100000, step=60000, end=21720000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+                     |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=5, chunkMethod=TimeRangeChunkScan(19800000,21720000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-152549677],raw)
+                     |---T~PeriodicSamplesMapper(start=20100000, step=60000, end=21720000, window=Some(300000), functionId=Some(Rate), rawSource=true, offsetMs=None)
+                     |----E~MultiSchemaPartitionsExec(dataset=timeseries, shard=21, chunkMethod=TimeRangeChunkScan(19800000,21720000), filters=List(ColumnFilter(job,Equals(bar)), ColumnFilter(__name__,Equals(foo))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-152549677],raw)""".stripMargin
+    validatePlan(execPlan, expected)
   }

   it("should generate correct plan for top level subqueries") {
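The two subquery tests above assert the same windowing rule through their expected plans: a subquery's evaluation range is snapped to its step, with the start rounded up to the next multiple of the step and the end rounded down to the previous one, and the chunk scan then reaches one lookback window before the aligned start. validatePlan, from the spec's shared test utilities, presumably compares the materialized plan's printed tree against the expected string. The following sketch reproduces the asserted boundaries; SubqueryAlignment and align are invented names for illustration, not FiloDB helpers.

// Minimal sketch of the step-alignment arithmetic described in the comments
// above, assuming plain integer math on epoch-millisecond timestamps.
object SubqueryAlignment {
  // Round startMs up and endMs down to multiples of stepMs so that every
  // subquery sample lands on a step boundary.
  def align(startMs: Long, endMs: Long, stepMs: Long): (Long, Long) =
    (((startMs + stepMs - 1) / stepMs) * stepMs, (endMs / stepMs) * stepMs)

  def main(args: Array[String]): Unit = {
    // [3m:1m] case: outer start 20 900 000 minus the 3m (180 000 ms) window
    // gives 20 720 000, which aligns up to 20 760 000; the outer end
    // 21 800 000 aligns down to 21 780 000, matching the middle mapper above.
    println(align(20900000L - 180000L, 21800000L, 60000L))  // (20760000,21780000)
    // [10m:2m] case: 20 900 000 - 600 000 = 20 300 000 aligns up to
    // 20 400 000; the end aligns down to 21 720 000, as in the nested test.
    println(align(20900000L - 600000L, 21800000L, 120000L)) // (20400000,21720000)
  }
}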
diff --git a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
index 60ec16cd60..e6834aede5 100644
--- a/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
+++ b/coordinator/src/test/scala/filodb.coordinator/queryplanner/SinglePartitionPlannerSpec.scala
@@ -72,12 +72,20 @@ class SinglePartitionPlannerSpec extends AnyFunSpec with Matchers {
     override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
       new MockExecPlan("rules1", logicalPlan)
     }
+    override def childPlanners(): Seq[QueryPlanner] = Nil
+    def getRootPlanner(): Option[QueryPlanner] = None
+    def setRootPlanner(rootPlanner: QueryPlanner): Unit = {}
+    initRootPlanner()
   }

   val rrPlanner2 = new QueryPlanner {
     override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
       new MockExecPlan("rules2", logicalPlan)
     }
+    override def childPlanners(): Seq[QueryPlanner] = Nil
+    def getRootPlanner(): Option[QueryPlanner] = None
+    def setRootPlanner(rootPlanner: QueryPlanner): Unit = {}
+    initRootPlanner()
   }

   val planners = Map("local" -> highAvailabilityPlanner, "rules1" -> rrPlanner1, "rules2" -> rrPlanner2)
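The stubs added to both mock planners show the surface of the reworked QueryPlanner trait that DefaultPlanner now extends: materialize stays abstract, while childPlanners exposes the planner hierarchy and getRootPlanner, setRootPlanner and initRootPlanner wire each planner in a composite tree back to its root. A plausible shape of the trait, reconstructed only from these call sites; the default initRootPlanner body below is an assumption, not the committed implementation.

import filodb.core.query.QueryContext
import filodb.query.LogicalPlan
import filodb.query.exec.ExecPlan

// Hypothetical reconstruction, inferred from the test stubs above; member
// names match the call sites, but the default method body is assumed.
trait QueryPlanner {
  def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan
  def childPlanners(): Seq[QueryPlanner]   // planners this one delegates to
  def getRootPlanner(): Option[QueryPlanner]
  def setRootPlanner(rootPlanner: QueryPlanner): Unit

  // Walk the planner tree and point every descendant at this root, so that
  // nested planners can consult root-level state while materializing plans.
  def initRootPlanner(): Unit = {
    def wire(p: QueryPlanner): Unit = {
      p.setRootPlanner(this)
      p.childPlanners().foreach(wire)
    }
    wire(this)
  }
}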