@@ -1,6 +1,5 @@
 package filodb.coordinator.queryplanner
 
-
 import java.util.concurrent.ThreadLocalRandom
 
 import akka.serialization.SerializationExtension
@@ -19,6 +19,7 @@ import filodb.core.query.Filter.{Equals, EqualsRegex}
 import filodb.grpc.GrpcCommonUtils
 import filodb.query._
 import filodb.query.LogicalPlan._
+import filodb.query.RangeFunctionId.AbsentOverTime
 import filodb.query.exec._
 
 //scalastyle:off file.size.limit
@@ -517,6 +518,22 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
         materializeForAggregateAssignment(aggregate, assignment, queryContext, timeRangeOverride)
       case binaryJoin: BinaryJoin =>
         materializeForBinaryJoinAssignment(binaryJoin, assignment, queryContext, timeRangeOverride)
+      case psw: PeriodicSeriesWithWindowing if psw.function == AbsentOverTime =>
+        val plans = proportionMap.map(entry => {
+          val partitionDetails = entry._2
+          materializeForPartition(logicalPlan, partitionDetails.partitionName,
+            partitionDetails.grpcEndPoint, partitionDetails.httpEndPoint, queryContext, timeRangeOverride)
+        }).toSeq
+        val dispatcher = PlannerUtil.pickDispatcher(plans)
+        // Encode present as 0 and absent as 1: combinations 01/10/00 reduce to present; only 11 reduces to absent.
+        val reducer = MultiPartitionReduceAggregateExec(queryContext, dispatcher,
+          plans.sortWith((x, _) => !x.isInstanceOf[PromQlRemoteExec]), AggregationOperator.Absent, Nil)
+        if (!queryContext.plannerParams.skipAggregatePresent) {
+          val promQlQueryParams = queryContext.origQueryParams.asInstanceOf[PromQlQueryParams]
+          reducer.addRangeVectorTransformer(AggregatePresenter(AggregationOperator.Absent, Nil,
+            RangeParams(promQlQueryParams.startSecs, promQlQueryParams.stepSecs, promQlQueryParams.endSecs)))
+        }
+        reducer
       case _ =>
         val plans = proportionMap.map(entry => {
           val partitionDetails = entry._2
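To make the reduction comment concrete: a per-partition absent_over_time emits 1.0 where the series is absent and NaN where it is present, so the cross-partition reduce stays "absent" only when every partition reports absent. A minimal standalone sketch of that rule (plain Scala, not part of the diff; reduceAbsent is a hypothetical stand-in for the logic inside AbsentRowAggregator further down):

object AbsentReductionSketch extends App {
  // Start the accumulator at 1.0 (absent); any present (NaN) input wins.
  def reduceAbsent(acc: Double, value: Double): Double =
    if (value.isNaN) Double.NaN else acc

  println(Seq(1.0, 1.0).foldLeft(1.0)(reduceAbsent))        // 1.0 -> absent everywhere ("11")
  println(Seq(1.0, Double.NaN).foldLeft(1.0)(reduceAbsent)) // NaN -> present somewhere ("10")
}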
@@ -698,11 +715,13 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
    * @param logicalPlan the logical plan.
    * @return true if the plan contains a binary join with on/ignoring clauses or an absent_over_time.
    */
-  private def hasJoinClause(logicalPlan: LogicalPlan): Boolean = {
+  private def hasJoinOrAggClause(logicalPlan: LogicalPlan): Boolean = {
     logicalPlan match {
       case binaryJoin: BinaryJoin => binaryJoin.on.nonEmpty || binaryJoin.ignoring.nonEmpty
-      case aggregate: Aggregate => hasJoinClause(aggregate.vectors)
-      case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasJoinClause)
+      case aggregate: Aggregate => hasJoinOrAggClause(aggregate.vectors)
+      // AbsentOverTime is a special case that is converted to an aggregation.
+      case psw: PeriodicSeriesWithWindowing if psw.function == AbsentOverTime => true
+      case nonLeaf: NonLeafLogicalPlan => nonLeaf.children.exists(hasJoinOrAggClause)
       case _ => false
     }
   }
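The renamed predicate is easiest to see on a toy plan tree. The types below are invented stand-ins for the real LogicalPlan hierarchy, only to illustrate which plan shapes make the check return true:

sealed trait Plan
case class Join(on: Seq[String], ignoring: Seq[String]) extends Plan
case class Agg(child: Plan) extends Plan
case class AbsentWindow(child: Plan) extends Plan    // absent_over_time
case class Wrapper(children: Seq[Plan]) extends Plan // any other non-leaf
case object Leaf extends Plan

def check(p: Plan): Boolean = p match {
  case Join(on, ignoring) => on.nonEmpty || ignoring.nonEmpty
  case Agg(child)         => check(child)
  case AbsentWindow(_)    => true // always treated as an aggregation after conversion
  case Wrapper(children)  => children.exists(check)
  case Leaf               => false
}

// check(AbsentWindow(Leaf))    => true
// check(Join(Seq("job"), Nil)) => true
// check(Agg(Leaf))             => false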
@@ -714,7 +733,7 @@ class MultiPartitionPlanner(val partitionLocationProvider: PartitionLocationProv
     val tschemaLabels = getTschemaLabelsIfCanPushdown(aggregate.vectors, queryContext)
     // TODO have a more accurate pushdown after the location rule is defined.
     // For now, do not push down any multi-partition namespace plans when an on clause exists.
-    val canPushdown = !(hasMultiPartitionNamespace && hasJoinClause(aggregate)) &&
+    val canPushdown = !(hasMultiPartitionNamespace && hasJoinOrAggClause(aggregate)) &&
       canPushdownAggregate(aggregate, tschemaLabels, queryContext)
     val plan = if (!canPushdown) {
       val childPlanRes = walkLogicalPlanTree(aggregate.vectors, queryContext.copy(
@@ -1434,6 +1434,7 @@ object ProtoConverters {
       case AggregationOperator.Stdvar => GrpcMultiPartitionQueryService.AggregationOperator.STDVAR
       case AggregationOperator.Quantile => GrpcMultiPartitionQueryService.AggregationOperator.QUANTILE
       case AggregationOperator.Max => GrpcMultiPartitionQueryService.AggregationOperator.MAX
+      case AggregationOperator.Absent => throw new UnsupportedOperationException("Absent is not supported")
     }
   }
 }
1 change: 1 addition & 0 deletions query/src/main/scala/filodb/query/PlanEnums.scala
@@ -110,6 +110,7 @@ object AggregationOperator extends Enum[AggregationOperator] {
   case object BottomK extends AggregationOperator("bottomk")
   case object CountValues extends AggregationOperator("count_values")
   case object Quantile extends AggregationOperator("quantile")
+  case object Absent extends AggregationOperator("absent")
 }
 
 sealed abstract class BinaryOperator extends EnumEntry {
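Assuming the string argument becomes the enumeratum entryName, as the existing entries suggest, the new value should round-trip by name. A hedged one-liner:

import filodb.query.AggregationOperator

// Hypothetical check, assuming entryName == "absent" as for the other operators:
assert(AggregationOperator.withName("absent") == AggregationOperator.Absent)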
@@ -0,0 +1,34 @@
+package filodb.query.exec.aggregator
+
+import filodb.core.query._
+import filodb.memory.format.RowReader
+
+/**
+ * Map: Every sample is mapped to itself
+ * ReduceMappedRow: Same as ReduceAggregate since every row is mapped into an aggregate
+ * ReduceAggregate: Accumulator starts at 1.0 (absent); any NaN input marks the series present, so the value becomes NaN
+ * Present: The absent indicator is directly presented
+ */
+object AbsentRowAggregator extends RowAggregator {
+  class AbsentHolder(var timestamp: Long = 0L, var value: Double = 1.0) extends AggregateHolder {
+    val row = new TransientRow()
+    def toRowReader: MutableRowReader = { row.setValues(timestamp, value); row }
+    def resetToZero(): Unit = value = 1.0
+  }
+  type AggHolderType = AbsentHolder
+  def zero: AbsentHolder = new AbsentHolder()
+  def newRowToMapInto: MutableRowReader = new TransientRow()
+  def map(rvk: RangeVectorKey, item: RowReader, mapInto: MutableRowReader): RowReader = item
+  def reduceAggregate(acc: AbsentHolder, aggRes: RowReader): AbsentHolder = {
+    acc.timestamp = aggRes.getLong(0)
+    // NaN means the time series is present; 1.0 means it is absent.
+    if (aggRes.getDouble(1).isNaN) {
+      acc.value = Double.NaN
+    }
+    acc
+  }
+  def present(aggRangeVector: RangeVector, limit: Int,
+              rangeParams: RangeParams, queryStats: QueryStats): Seq[RangeVector] = Seq(aggRangeVector)
+  def reductionSchema(source: ResultSchema): ResultSchema = source
+  def presentationSchema(reductionSchema: ResultSchema): ResultSchema = reductionSchema
+}
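A short usage sketch of the new aggregator's reduce step (a standalone illustration, not a test from this PR; it assumes the two-column timestamp/value TransientRow behaves exactly as the diff above uses it):

import filodb.core.query.TransientRow
import filodb.query.exec.aggregator.AbsentRowAggregator

object AbsentReduceExample extends App {
  def rowOf(ts: Long, v: Double): TransientRow = {
    val r = new TransientRow(); r.setValues(ts, v); r
  }
  var acc = AbsentRowAggregator.zero
  acc = AbsentRowAggregator.reduceAggregate(acc, rowOf(1000L, 1.0))        // absent so far => 1.0
  acc = AbsentRowAggregator.reduceAggregate(acc, rowOf(1000L, Double.NaN)) // present wins  => NaN
  println(acc.toRowReader.getDouble(1).isNaN) // true: series present in at least one partition
}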
@@ -123,6 +123,7 @@ object RowAggregator {
   def apply(aggrOp: AggregationOperator, params: Seq[Any], schema: ResultSchema): RowAggregator = {
     val valColType = ResultSchema.valueColumnType(schema)
     aggrOp match {
+      case Absent if valColType != ColumnType.HistogramColumn => AbsentRowAggregator
       case Min if valColType != ColumnType.HistogramColumn => MinRowAggregator
       case Max if valColType != ColumnType.HistogramColumn => MaxRowAggregator
       case Sum if valColType == ColumnType.DoubleColumn => SumRowAggregator