@@ -52,6 +52,14 @@ class HighAvailabilityPlanner(dsRef: DatasetRef,
import QueryFailureRoutingStrategy._
import LogicalPlan._

def childPlanners(): Seq[QueryPlanner] = Seq(localPlanner)
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

// legacy failover counter captures failovers when we send a PromQL to the buddy
// cluster
val legacyFailoverCounter = Kamon.counter(HighAvailabilityPlanner.FailoverCounterName)

@@ -12,7 +12,6 @@ import filodb.query.exec._
* LongTimeRangePlanner knows about limited retention of raw data, and existence of downsampled data.
* For any query that arrives beyond the retention period of raw data, it splits the query into
* two time ranges - latest time range from raw data, and old time range from downsampled data.
* It then stitches the subquery results to present top level query results for entire time range.
*
* @param rawClusterPlanner this planner (typically a SingleClusterPlanner) abstracts planning for raw cluster data
* @param downsampleClusterPlanner this planner (typically a SingleClusterPlanner)
@@ -38,6 +37,13 @@ import filodb.query.exec._
override val schemas: Schemas = Schemas(dataset.schema)
override val dsOptions: DatasetOptions = schemas.part.options

def childPlanners(): Seq[QueryPlanner] = Seq(rawClusterPlanner, downsampleClusterPlanner)
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

private def materializePeriodicSeriesPlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan) = {
val execPlan = if (!periodicSeriesPlan.isRoutable)
@@ -65,7 +71,6 @@ import filodb.query.exec._
}
}


// scalastyle:off method.length
private def materializeRoutablePlan(qContext: QueryContext, periodicSeriesPlan: PeriodicSeriesPlan): ExecPlan = {
import LogicalPlan._
@@ -114,15 +119,17 @@ import filodb.query.exec._
downsampleClusterPlanner.materialize(periodicSeriesPlan, qContext)
} else if (startWithOffsetMs - lookbackMs >= earliestRawTime) { // full time range in raw cluster
rawClusterPlanner.materialize(periodicSeriesPlan, qContext)
} else if (LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan)) {
// eventually the subquery will be delegated to the root planner
super.defaultWalkLogicalPlanTree(periodicSeriesPlan, qContext).plans.head
} else if (
// "(endWithOffsetMs - lookbackMs) < earliestRawTime" check is erroneous, we claim that we have
// a long lookback only if the last lookback window overlaps with earliestRawTime, however, we
// should check for ANY interval overalapping earliestRawTime. We
// can happen with ANY lookback interval, not just the last one.
} else if (
endWithOffsetMs - lookbackMs < earliestRawTime || //TODO lookbacks can overlap in the middle intervals too
LogicalPlan.hasSubqueryWithWindowing(periodicSeriesPlan)
endWithOffsetMs - lookbackMs < earliestRawTime //TODO lookbacks can overlap in the middle intervals too
) {
// For subqueries and long lookback queries, we keep things simple by routing to
// For long lookback queries, we keep things simple by routing to
// downsample cluster since dealing with lookback windows across raw/downsample
// clusters is quite complex and is not in scope now. We omit recent instants for which downsample
// cluster may not have complete data
@@ -276,12 +283,17 @@ import filodb.query.exec._
}

// scalastyle:off cyclomatic.complexity
// scalastyle:off method.length
override def walkLogicalPlanTree(logicalPlan: LogicalPlan,
qContext: QueryContext,
forceInProcess: Boolean = false): PlanResult = {

if (!LogicalPlanUtils.hasBinaryJoin(logicalPlan)) {
logicalPlan match {
// for subqueries, we need to delegate to the root planner since the logic resides there already
case t: TopLevelSubquery => super.materializeTopLevelSubquery(qContext, t)
case s: SubqueryWithWindowing => super.materializeSubqueryWithWindowing(qContext, s)

case p: PeriodicSeriesPlan => materializePeriodicSeriesPlan(qContext, p)
case lc: LabelCardinality => materializeLabelCardinalityPlan(lc, qContext)
case ts: TsCardinalities => materializeTSCardinalityPlan(qContext, ts)
@@ -314,7 +326,7 @@ import filodb.query.exec._
case lp: ScalarFixedDoublePlan => super.materializeFixedScalar(qContext, lp)
case lp: ApplyAbsentFunction => super.materializeAbsentFunction(qContext, lp)
case lp: ScalarBinaryOperation => super.materializeScalarBinaryOperation(qContext, lp)
case lp: SubqueryWithWindowing => materializePeriodicSeriesPlan(qContext, lp)
case lp: SubqueryWithWindowing => throw new IllegalStateException("Already handled subquery with windowing")
case lp: TopLevelSubquery => super.materializeTopLevelSubquery(qContext, lp)
case lp: ApplyLimitFunction => rawClusterMaterialize(qContext, lp)
case lp: LabelNames => rawClusterMaterialize(qContext, lp)
@@ -324,6 +336,7 @@ import filodb.query.exec._
}

override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
require(getRootPlanner().isDefined, "Root planner not set. Internal error.")
walkLogicalPlanTree(logicalPlan, qContext).plans.head
}
}
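
The branch reordering above is the crux of this file's change: subqueries with windowing are now peeled off before the long-lookback check and routed back through the root planner instead of being forced onto the downsample cluster. A condensed, hedged sketch of the resulting decision ladder follows; variable names match the diff, the first branch's condition sits above the visible hunk and is assumed here, and the real planner calls are replaced by labels:

```scala
// Sketch only, not the FiloDB implementation: branch order of materializeRoutablePlan after this change.
def route(startWithOffsetMs: Long, endWithOffsetMs: Long, lookbackMs: Long,
          earliestRawTime: Long, hasSubqueryWithWindowing: Boolean): String = {
  if (endWithOffsetMs < earliestRawTime) {                          // assumed condition (hidden above the hunk)
    "downsample"                                                    // whole range precedes raw retention
  } else if (startWithOffsetMs - lookbackMs >= earliestRawTime) {
    "raw"                                                           // full range, including lookback, in raw cluster
  } else if (hasSubqueryWithWindowing) {
    "delegate to root planner"                                      // new: subqueries handled by the root planner
  } else if (endWithOffsetMs - lookbackMs < earliestRawTime) {
    "downsample"                                                    // long lookback; see the TODO about middle intervals
  } else {
    "split and stitch raw + downsample"                             // remaining overlap case
  }
}
```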

@@ -64,6 +64,14 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
override val schemas: Schemas = Schemas(dataset.schema)
override val dsOptions: DatasetOptions = schemas.part.options

def childPlanners(): Seq[QueryPlanner] = Seq(localPartitionPlanner)
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

val plannerSelector: String = queryConfig.plannerSelector
.getOrElse(throw new IllegalArgumentException("plannerSelector is mandatory"))

@@ -96,6 +104,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
// plannerHelper.materializeX(x)
// }
// }
require(getRootPlanner().isDefined, "Root planner not set. Internal error.")
val tsdbQueryParams = qContext.origQueryParams

if(

@@ -1,5 +1,6 @@
package filodb.coordinator.queryplanner

import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

import kamon.Kamon
@@ -11,6 +12,7 @@ import filodb.core.query.QueryContext
import filodb.query.{LogicalPlan, QueryResponse, StreamQueryResponse}
import filodb.query.exec.{ClientParams, ExecPlan, ExecPlanWithClientParams, UnsupportedChunkSource}


/**
* Abstraction for Query Planning. QueryPlanners can be composed using decorator pattern to add capabilities.
*/
@@ -56,4 +58,30 @@ trait QueryPlanner {
}
}

/**
* Returns the child planners of this planner.
*/
def childPlanners(): Seq[QueryPlanner]

/**
* Returns the root planner of the planner tree. The root planner is needed to plan subqueries from the leaf planners.
*/
def getRootPlanner(): Option[QueryPlanner]

def setRootPlanner(rootPlanner: QueryPlanner): Unit

/**
* Uses breadth-first traversal to initialize the root planner reference in every planner of the tree.
* Must be called on the root planner once the planner tree is fully constructed.
*/
def initRootPlanner(): Unit = {
val q = mutable.Queue[QueryPlanner]()
q.enqueue(this)
while (q.nonEmpty) {
val p = q.dequeue()
p.setRootPlanner(this)
p.childPlanners().foreach(c => q.enqueue(c))
}
}

}
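
Since the new trait members above are implemented with near-identical boilerplate in every concrete planner, a small self-contained sketch may help reviewers see the intended wiring. The `SimplePlanner` class and the names in it are hypothetical stand-ins, not FiloDB planners; only the trait shape and the breadth-first `initRootPlanner` mirror this PR:

```scala
import scala.collection.mutable

// Simplified stand-in for the QueryPlanner trait in this PR (materialize omitted).
trait PlannerNode {
  def childPlanners(): Seq[PlannerNode]
  def getRootPlanner(): Option[PlannerNode]
  def setRootPlanner(root: PlannerNode): Unit

  // Breadth-first traversal, as in this PR: every planner reachable from the root
  // gets a reference back to the root so leaf planners can delegate subqueries.
  def initRootPlanner(): Unit = {
    val q = mutable.Queue[PlannerNode](this)
    while (q.nonEmpty) {
      val p = q.dequeue()
      p.setRootPlanner(this)
      p.childPlanners().foreach(q.enqueue(_))
    }
  }
}

// Hypothetical planner with an arbitrary list of children.
class SimplePlanner(name: String, children: Seq[PlannerNode] = Nil) extends PlannerNode {
  private var root: Option[PlannerNode] = None
  def childPlanners(): Seq[PlannerNode] = children
  def getRootPlanner(): Option[PlannerNode] = root
  def setRootPlanner(r: PlannerNode): Unit = { root = Some(r) }
  override def toString: String = name
}

object RootPlannerWiringExample extends App {
  val raw        = new SimplePlanner("raw")
  val downsample = new SimplePlanner("downsample")
  val longRange  = new SimplePlanner("longTimeRange", Seq(raw, downsample))
  val multiPart  = new SimplePlanner("multiPartition", Seq(longRange))

  // Called on the root once the tree is fully constructed; otherwise the
  // require(getRootPlanner().isDefined, ...) guards in materialize() would fail.
  multiPart.initRootPlanner()

  assert(raw.getRootPlanner().contains(multiPart))        // leaf sees the root
  assert(downsample.getRootPlanner().contains(multiPart))
}
```

In this PR each concrete planner also calls `initRootPlanner()` in its own constructor; since children are always constructed before their parent, the outermost planner's call runs last and ends up installing itself as the root for the whole tree.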

@@ -36,6 +36,14 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
_targetSchemaProvider: TargetSchemaProvider = StaticTargetSchemaProvider())
extends PartitionLocationPlanner(dataset, partitionLocationProvider) {

def childPlanners(): Seq[QueryPlanner] = Seq(queryPlanner)
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

override def queryConfig: QueryConfig = config
override val schemas: Schemas = Schemas(dataset.schema)
override val dsOptions: DatasetOptions = schemas.part.options
@@ -98,6 +106,7 @@ class ShardKeyRegexPlanner(val dataset: Dataset,
* @return materialized Execution Plan which can be dispatched
*/
override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
require(getRootPlanner().isDefined, "Root planner not set. Internal error.")
val nonMetricShardKeyFilters =
LogicalPlan.getNonMetricShardKeyFilters(logicalPlan, dataset.options.nonMetricShardColumns)
if (isMetadataQuery(logicalPlan)

@@ -65,6 +65,13 @@ class SingleClusterPlanner(val dataset: Dataset,
override val dsOptions: DatasetOptions = schemas.part.options
private val shardColumns = dsOptions.shardKeyColumns.sorted
private val dsRef = dataset.ref
def childPlanners(): Seq[QueryPlanner] = Nil
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

private val shardPushdownCache: Option[Cache[(LogicalPlan, Option[Seq[Int]]), Option[Set[Int]]]] =
if (queryConfig.cachingConfig.singleClusterPlannerCachingEnabled) {
@@ -305,6 +312,7 @@ class SingleClusterPlanner(val dataset: Dataset,
}

def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
require(getRootPlanner().isDefined, "Root planner not set. Internal error.")
val plannerParams = qContext.plannerParams
val updatedPlan = updateStartTime(logicalPlan)
if (updatedPlan.isEmpty) EmptyResultExec(qContext, dsRef, inProcessPlanDispatcher)

@@ -20,11 +20,21 @@ class SinglePartitionPlanner(planners: Map[String, QueryPlanner],
val queryConfig: QueryConfig)
extends QueryPlanner with DefaultPlanner {

def childPlanners(): Seq[QueryPlanner] = planners.values.toSeq
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()

override val schemas: Schemas = Schemas(dataset.schema)
override val dsOptions: DatasetOptions = schemas.part.options

def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan =
def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
require(getRootPlanner().isDefined, "Root planner not set. Internal error.")
walkLogicalPlanTree(logicalPlan, qContext).plans.head
}

/**
* Returns planner for first metric in logical plan

@@ -37,12 +37,26 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
new MockExecPlan("raw", logicalPlan)
}
def childPlanners(): Seq[QueryPlanner] = Nil
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()
}

val downsamplePlanner = new QueryPlanner {
override def materialize(logicalPlan: LogicalPlan, qContext: QueryContext): ExecPlan = {
new MockExecPlan("downsample", logicalPlan)
}
def childPlanners(): Seq[QueryPlanner] = Nil
private var rootPlanner: Option[QueryPlanner] = None
def getRootPlanner(): Option[QueryPlanner] = rootPlanner
def setRootPlanner(rootPlanner: QueryPlanner): Unit = {
this.rootPlanner = Some(rootPlanner)
}
initRootPlanner()
}

val rawRetention = 10.minutes
@@ -88,7 +102,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat

val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan]
ep.name shouldEqual "raw"
ep.lp shouldEqual logicalPlan
ep.lp shouldEqual logicalPlan.asInstanceOf[TopLevelSubquery].innerPeriodicSeries
}

it("should direct raw-cluster-only range function subqueries to raw planner") {
@@ -99,7 +113,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat

val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan]
ep.name shouldEqual "raw"
ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing]
ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing].innerPeriodicSeries
}

it("should direct downsample-only queries to downsample planner") {
@@ -117,7 +131,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat

val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan]
ep.name shouldEqual "downsample"
ep.lp shouldEqual logicalPlan
ep.lp shouldEqual logicalPlan.asInstanceOf[TopLevelSubquery].innerPeriodicSeries
}

it("should directed downsample-only range function subqueries to downsample planner") {
@@ -126,7 +140,7 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat

val ep = longTermPlanner.materialize(logicalPlan, QueryContext()).asInstanceOf[MockExecPlan]
ep.name shouldEqual "downsample"
ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing]
ep.lp shouldEqual logicalPlan.asInstanceOf[SubqueryWithWindowing].innerPeriodicSeries
}

it("should direct overlapping instant queries correctly to raw or downsample clusters") {
@@ -216,9 +230,8 @@ class LongTimeRangePlannerSpec extends AnyFunSpec with Matchers with PlanValidat
val ep = longTermPlanner.materialize(logicalPlan, QueryContext())
val exp = ep.asInstanceOf[MockExecPlan]
exp.name shouldEqual "downsample"
val downsampleLp = exp.lp.asInstanceOf[PeriodicSeriesPlan]
downsampleLp.startMs shouldEqual logicalPlan.startMs
downsampleLp.endMs shouldEqual logicalPlan.endMs
exp.lp.asInstanceOf[PeriodicSeriesPlan]
// skipping check of startMs and endMs as we are not using a proper root planner to plan the subquery
}

def getStartForSubquery(startMs: Long, stepMs: Long) : Long = {

@@ -475,13 +475,12 @@ class MultiPartitionPlannerSpec extends AnyFunSpec with Matchers with PlanValida
QueryContext(origQueryParams = promQlQueryParams, plannerParams = PlannerParams(processMultiPartition = true))
)
val expectedPlan = {
"""E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw)
|-T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
"""T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
|-E~LocalPartitionDistConcatExec() on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw)
|--T~PeriodicSamplesMapper(start=600000, step=60000, end=1800000, window=None, functionId=None, rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw)
|-T~PeriodicSamplesMapper(start=1200000, step=120000, end=1800000, window=Some(600000), functionId=Some(AvgOverTime), rawSource=false, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=3, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw)
|--T~PeriodicSamplesMapper(start=600000, step=60000, end=1800000, window=None, functionId=None, rawSource=true, offsetMs=None)
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#64113238],raw)""".stripMargin
|---E~MultiSchemaPartitionsExec(dataset=timeseries, shard=19, chunkMethod=TimeRangeChunkScan(300000,1800000), filters=List(ColumnFilter(job,Equals(app)), ColumnFilter(__name__,Equals(test))), colName=None, schema=None) on ActorPlanDispatcher(Actor[akka://default/system/testProbe-1#-742845126],raw)""".stripMargin
}
validatePlan(execPlan, expectedPlan)
}