@@ -81,6 +81,7 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
   private val indexTtlMs = downsampleTtls.last.toMillis
   private val clusterType = filodbConfig.getString("cluster-type")
   private val deploymentPartitionName = filodbConfig.getString("deployment-partition-name")
+  private val NamespaceCol = "namespace"
 
   private val downsampleStoreConfig = StoreConfig(filodbConfig.getConfig("downsampler.downsample-store-config"))
   private val typeFieldIndexingEnabled = filodbConfig.getBoolean("memstore.type-field-indexing-enabled")
@@ -345,7 +346,11 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
     // is expensive, but we do it to handle data sizing for metrics that have
     // continuous churn. See capDataScannedPerShardCheck method.
     val startNs = Utils.currentThreadCpuTimeNanos
-    val recs = partKeyIndex.partKeyRecordsFromFilters(filters, chunkMethod.startTime, chunkMethod.endTime)
+    val normalizedFilters = normalizeNamespaceFilter(filters)
+    // NEW (minimal): attribute counters per namespace when the filter normalized to In(...)
+    attributePerNamespaceCounters(normalizedFilters, chunkMethod, querySession)
+    val recs = partKeyIndex.partKeyRecordsFromFilters(
+      normalizedFilters, chunkMethod.startTime, chunkMethod.endTime)
     val _schema = recs.headOption.map { pkRec =>
       RecordSchema.schemaID(pkRec.partKey, UnsafeUtils.arayOffset)
     }
@@ -355,10 +360,11 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
     stats.queryTimeRangeMins.record((chunkMethod.endTime - chunkMethod.startTime) / 60000)
     val metricShardKeys = schemas.part.options.shardKeyColumns
     val metricGroupBy = deploymentPartitionName +: clusterType +: metricShardKeys.map { col =>
-      filters.collectFirst {
-        case ColumnFilter(c, Filter.Equals(filtVal: String)) if c == col => filtVal
-      }.getOrElse("multiple")
-    }.toList
+      normalizedFilters.collectFirst {
+        case ColumnFilter(c, Filter.Equals(v)) if c == col => v.toString
+        case ColumnFilter(c, Filter.In(_)) if c == col => "multiple" // came from nsA|nsB|nsC
+      }.getOrElse("multiple")
+    }.toList
     querySession.queryStats.getTimeSeriesScannedCounter(metricGroupBy).addAndGet(recs.length)
     querySession.queryStats.getCpuNanosCounter(metricGroupBy).addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
     val chunksReadCounter = querySession.queryStats.getDataBytesScannedCounter(metricGroupBy)
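
A small sketch of how the normalized filter drives the group-by tags above, with hypothetical filter values, and shard-key columns assumed to be Seq("ws", "namespace"):

  val filters = Seq(
    ColumnFilter("ws", Filter.Equals("demo")),
    ColumnFilter("namespace", Filter.EqualsRegex("nsA|nsB|nsC")))
  // normalizeNamespaceFilter(filters) (defined further down) yields:
  //   Seq(ColumnFilter("ws", Equals("demo")),
  //       ColumnFilter("namespace", In(Set("nsA", "nsB", "nsC"))))
  // and metricGroupBy then evaluates to:
  //   List(deploymentPartitionName, clusterType, "demo", "multiple")  // In(...) => "multiple"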
@@ -372,6 +378,87 @@ class DownsampledTimeSeriesShard(rawDatasetRef: DatasetRef,
     }
   }
 
+  /**
+   * Attributes per-namespace metrics when the namespace filter has been normalized to `In(...)`.
+   *
+   * How it works:
+   * 1) Locate the namespace filter that is an `In(...)`; if none exists, do nothing.
+   * 2) For each namespace value `v` in that `In` set:
+   *    - Replace the `namespace In(...)` with `namespace = v` to form `perNsFilters`.
+   *    - Start a CPU timer and run a part-key lookup
+   *      (`partKeyIndex.partKeyRecordsFromFilters(perNsFilters, startTime, endTime)`)
+   *      purely for accounting (it does not affect query results).
+   *    - If any record exists, infer its schema ID and increment `queriesBySchema(schema=...)`.
+   *    - Record the query time range (minutes) into `queryTimeRangeMins`.
+   *    - Build metric tags (`metricGroupByPerNs`) from: deploymentPartitionName, clusterType,
+   *      and each shard-key column's single `Equals(...)` value, or `"multiple"` if a unique
+   *      value isn't present.
+   *    - Bump per-namespace counters in `querySession.queryStats` with those tags:
+   *      * timeSeriesScanned += number of part-key records
+   *      * cpuNanos += elapsed CPU nanos since the timer started
+   *
+   * Side effects only: this function exists for metering/attribution and does not change
+   * query results.
+   *
+   * @param normalizedFilters Filters expected to already contain `namespace In(...)` from prior normalization.
+   * @param chunkMethod       Provides the time range used for accounting lookups and the range histogram.
+   * @param querySession      Destination for per-namespace counters/histograms tagged by shard keys.
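+   *
+   * Example (illustrative): with `namespace In(Set("nsA", "nsB"))`, two accounting lookups
+   * run, one with `namespace = "nsA"` and one with `namespace = "nsB"`, each bumping its
+   * own tagged counters.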
+   */
+  private def attributePerNamespaceCounters(normalizedFilters: Seq[ColumnFilter],
+                                            chunkMethod: ChunkScanMethod,
+                                            querySession: QuerySession): Unit = {
+    // find the namespace filter if it is an In(...), capturing its index and values
+    val nsInOpt = normalizedFilters.zipWithIndex.collectFirst {
+      case (ColumnFilter(c, Filter.In(values)), idx) if c == NamespaceCol => (idx, values)
+    }
+
+    nsInOpt.foreach { case (nsIdx, nsValues) =>
+      val shardKeys = schemas.part.options.shardKeyColumns
+
+      nsValues.foreach { v =>
+        // replace namespace In(...) with namespace Equals(v) for per-NS attribution
+        val perNsFilters = normalizedFilters.updated(nsIdx, ColumnFilter(NamespaceCol, Filter.Equals(v)))
+        val startNs = Utils.currentThreadCpuTimeNanos
+
+        // per-namespace lookup purely for accounting
+        val recsForNs = partKeyIndex.partKeyRecordsFromFilters(
+          perNsFilters, chunkMethod.startTime, chunkMethod.endTime)

Comment on lines +424 to +426 (Contributor):

This correctly gets the per-ns query stats, but it's an expensive index lookup that we've already done for the full pipe-concatenated set of namespaces (i.e. to serve the query). IIUC this is effectively a second full scan.

Our options might be pretty limited here; we can:

  1. In the SingleClusterPlanner, split LogicalPlans with pipe-concatenated namespaces into one plan per namespace. Materialize each, then concatenate/aggregate. Benefit: filodb clusters can still be sent namespace-regex; total count of remote plans remains low. Cost: risky planner rework.
  2. In the ShardKeyRegexPlanner, set the max count of pipe-concatenated namespaces to 1. Benefit: stupid-simple configuration change. Cost: count of remote plans will increase; query fanout increases.

I'm leaning towards option 2 (at least as a first attempt). The benefit we've seen from reduced fanout has been negligible, so I don't expect query latencies to suddenly increase. Also, we can scale the max count of pipe-concatenated namespaces gradually back to 1 and abort if query latencies spike.

What do you think @amolnayak311?
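
For illustration, a minimal sketch of the option-1 split at the filter level, reusing the ColumnFilter/Filter types from this diff (splitByNamespace is a hypothetical helper, not an existing planner API):

  // Hypothetical sketch: expand a filter set whose namespace filter is In(...)
  // into one filter set per namespace; each could then be materialized as its
  // own plan and the results concatenated/aggregated.
  def splitByNamespace(filters: Seq[ColumnFilter]): Seq[Seq[ColumnFilter]] =
    filters.zipWithIndex.collectFirst {
      case (ColumnFilter(col, Filter.In(values)), idx) if col == "namespace" =>
        values.toSeq.map(v => filters.updated(idx, ColumnFilter(col, Filter.Equals(v))))
    }.getOrElse(Seq(filters))  // no In(...) present: keep the single plan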


+        // schema counter (same as main path)
+        recsForNs.headOption.foreach { pkRec =>
+          val sid = RecordSchema.schemaID(pkRec.partKey, UnsafeUtils.arayOffset)
+          stats.queriesBySchema.withTag("schema", schemas(sid).name).increment()
+        }
+
+        stats.queryTimeRangeMins.record((chunkMethod.endTime - chunkMethod.startTime) / 60000)
+
+        // build metricGroupBy for this specific namespace
+        val metricGroupByPerNs =
+          deploymentPartitionName +: clusterType +: shardKeys.map { col =>
+            perNsFilters.collectFirst {
+              case ColumnFilter(c, Filter.Equals(x)) if c == col => x.toString
+            }.getOrElse("multiple")
+          }.toList
+
+        querySession.queryStats.getTimeSeriesScannedCounter(metricGroupByPerNs).addAndGet(recsForNs.length)
+        querySession.queryStats.getCpuNanosCounter(metricGroupByPerNs)
+          .addAndGet(Utils.currentThreadCpuTimeNanos - startNs)
+      }
+    }
+  }
+
+  // Replace a namespace EqualsRegex containing unescaped pipes with an In(...) of its
+  // alternatives, or an Equals(...) when only one value remains. Assumes the regex is a
+  // pipe-separated list of literal namespaces (e.g. "nsA|nsB|nsC").
+  private def normalizeNamespaceFilter(filters: Seq[ColumnFilter]): Seq[ColumnFilter] = {
+    filters.map {
+      case ColumnFilter(col, Filter.EqualsRegex(re)) if col == NamespaceCol =>
+        val parts = QueryUtils.splitAtUnescapedPipes(re.toString).map(_.trim).filter(_.nonEmpty)
+        if (parts.size > 1) ColumnFilter(col, Filter.In(parts.toSet[Any]))
+        else ColumnFilter(col, Filter.Equals(re.toString))
+      case other => other
+    }
+  }
+
   def shutdown(): Unit = {
     try {
       partKeyIndex.closeIndex()