
Commit 6ae5d68

Revert "remove force_filter_selections"

This reverts commit 1b1f46b.

1 parent 1b1f46b

File tree

17 files changed: +109 −4 lines changed


datafusion/common/src/config.rs

Lines changed: 6 additions & 0 deletions

@@ -694,6 +694,12 @@ config_namespace! {
         /// the filters are applied in the same order as written in the query
         pub reorder_filters: bool, default = false

+        /// (reading) Force the use of RowSelections for filter results, when
+        /// pushdown_filters is enabled. If false, the reader will automatically
+        /// choose between a RowSelection and a Bitmap based on the number and
+        /// pattern of selected rows.
+        pub force_filter_selections: bool, default = false
+
         /// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
         /// and `Binary/BinaryLarge` with `BinaryView`.
         pub schema_force_view_types: bool, default = true
datafusion/common/src/error.rs

Lines changed: 1 addition & 0 deletions

@@ -1233,6 +1233,7 @@ mod test {
     // To pass the test the environment variable RUST_BACKTRACE should be set to 1 to enforce backtrace
     #[cfg(feature = "backtrace")]
     #[test]
+    #[expect(clippy::unnecessary_literal_unwrap)]
     fn test_enabled_backtrace() {
         match std::env::var("RUST_BACKTRACE") {
             Ok(val) if val == "1" => {}

datafusion/common/src/file_options/parquet_writer.rs

Lines changed: 3 additions & 0 deletions

@@ -199,6 +199,7 @@ impl ParquetOptions {
             metadata_size_hint: _,
             pushdown_filters: _,
             reorder_filters: _,
+            force_filter_selections: _, // not used for writer props
             allow_single_file_parallelism: _,
             maximum_parallel_row_group_writers: _,
             maximum_buffered_record_batches_per_stream: _,
@@ -461,6 +462,7 @@ mod tests {
             metadata_size_hint: defaults.metadata_size_hint,
             pushdown_filters: defaults.pushdown_filters,
             reorder_filters: defaults.reorder_filters,
+            force_filter_selections: defaults.force_filter_selections,
             allow_single_file_parallelism: defaults.allow_single_file_parallelism,
             maximum_parallel_row_group_writers: defaults
                 .maximum_parallel_row_group_writers,
@@ -572,6 +574,7 @@ mod tests {
             metadata_size_hint: global_options_defaults.metadata_size_hint,
             pushdown_filters: global_options_defaults.pushdown_filters,
             reorder_filters: global_options_defaults.reorder_filters,
+            force_filter_selections: global_options_defaults.force_filter_selections,
             allow_single_file_parallelism: global_options_defaults
                 .allow_single_file_parallelism,
             maximum_parallel_row_group_writers: global_options_defaults

datafusion/core/tests/parquet/filter_pushdown.rs

Lines changed: 22 additions & 3 deletions

@@ -638,9 +638,28 @@ async fn predicate_cache_pushdown_default() -> datafusion_common::Result<()> {
     // The cache is on by default, and used when filter pushdown is enabled
     PredicateCacheTest {
         expected_inner_records: 8,
-        // reads more than necessary from the cache as then another bitmap is applied
-        // See https://github.com/apache/datafusion/pull/18820 for setting and workaround
-        expected_records: 7,
+        expected_records: 7, // reads more than necessary from the cache as then another bitmap is applied
+    }
+    .run(&ctx)
+    .await
+}
+
+#[tokio::test]
+async fn predicate_cache_pushdown_default_selections_only(
+) -> datafusion_common::Result<()> {
+    let mut config = SessionConfig::new();
+    config.options_mut().execution.parquet.pushdown_filters = true;
+    // forcing filter selections minimizes the number of rows read from the cache
+    config
+        .options_mut()
+        .execution
+        .parquet
+        .force_filter_selections = true;
+    let ctx = SessionContext::new_with_config(config);
+    // The cache is on by default, and used when filter pushdown is enabled
+    PredicateCacheTest {
+        expected_inner_records: 8,
+        expected_records: 4,
     }
     .run(&ctx)
     .await

datafusion/datasource-parquet/src/opener.rs

Lines changed: 16 additions & 1 deletion

@@ -53,7 +53,9 @@ use futures::{ready, Stream, StreamExt, TryStreamExt};
 use itertools::Itertools;
 use log::debug;
 use parquet::arrow::arrow_reader::metrics::ArrowReaderMetrics;
-use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
+use parquet::arrow::arrow_reader::{
+    ArrowReaderMetadata, ArrowReaderOptions, RowSelectionPolicy,
+};
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
 use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
@@ -87,6 +89,8 @@ pub(super) struct ParquetOpener {
     pub pushdown_filters: bool,
     /// Should the filters be reordered to optimize the scan?
     pub reorder_filters: bool,
+    /// Should we force the reader to use RowSelections for filtering
+    pub force_filter_selections: bool,
     /// Should the page index be read from parquet files, if present, to skip
     /// data pages
     pub enable_page_index: bool,
@@ -147,6 +151,7 @@ impl FileOpener for ParquetOpener {
         let partition_fields = self.partition_fields.clone();
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
+        let force_filter_selections = self.force_filter_selections;
         let coerce_int96 = self.coerce_int96;
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
@@ -347,6 +352,10 @@ impl FileOpener for ParquetOpener {
                 }
             };
         };
+        if force_filter_selections {
+            builder =
+                builder.with_row_selection_policy(RowSelectionPolicy::Selectors);
+        }

         // Determine which row groups to actually read. The idea is to skip
         // as many row groups as possible based on the metadata and query
@@ -887,6 +896,7 @@ mod test {
             partition_fields: vec![],
             pushdown_filters: false, // note that this is false!
             reorder_filters: false,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -960,6 +970,7 @@ mod test {
             ))],
             pushdown_filters: false, // note that this is false!
             reorder_filters: false,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1049,6 +1060,7 @@ mod test {
             ))],
             pushdown_filters: false, // note that this is false!
             reorder_filters: false,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1141,6 +1153,7 @@ mod test {
             ))],
             pushdown_filters: true, // note that this is true!
             reorder_filters: true,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1233,6 +1246,7 @@ mod test {
             ))],
             pushdown_filters: false, // note that this is false!
             reorder_filters: false,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
@@ -1383,6 +1397,7 @@ mod test {
             partition_fields: vec![],
             pushdown_filters: true,
             reorder_filters: false,
+            force_filter_selections: false,
             enable_page_index: false,
             enable_bloom_filter: false,
             schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
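
The hunk in `FileOpener::open` above is where the flag changes reader behavior: when set, the builder is pinned to `RowSelectionPolicy::Selectors`; otherwise the parquet crate keeps its automatic choice between selectors and a bitmap. A rough, self-contained sketch of the same builder call outside DataFusion (assumed usage of the parquet crate's async reader, not code from this commit):

use parquet::arrow::arrow_reader::RowSelectionPolicy;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

// Rough sketch (assumed usage): open a parquet file with the async reader
// and pin filter results to RowSelections, mirroring what ParquetOpener
// does when force_filter_selections is true.
async fn open_with_selectors(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open(path).await?;
    let stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_row_selection_policy(RowSelectionPolicy::Selectors)
        .build()?;
    // `stream` yields Arrow RecordBatches; drive it with StreamExt, etc.
    let _ = stream;
    Ok(())
}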

datafusion/datasource-parquet/src/source.rs

Lines changed: 6 additions & 0 deletions

@@ -410,6 +410,11 @@ impl ParquetSource {
         self.table_parquet_options.global.reorder_filters
     }

+    /// Return the value of [`datafusion_common::config::ParquetOptions::force_filter_selections`]
+    fn force_filter_selections(&self) -> bool {
+        self.table_parquet_options.global.force_filter_selections
+    }
+
     /// If enabled, the reader will read the page index
     /// This is used to optimize filter pushdown
     /// via `RowSelector` and `RowFilter` by
@@ -595,6 +600,7 @@ impl FileSource for ParquetSource {
             parquet_file_reader_factory,
             pushdown_filters: self.pushdown_filters(),
             reorder_filters: self.reorder_filters(),
+            force_filter_selections: self.force_filter_selections(),
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,

datafusion/proto-common/proto/datafusion_common.proto

Lines changed: 1 addition & 0 deletions

@@ -519,6 +519,7 @@ message ParquetOptions {
   bool skip_metadata = 3; // default = true
   bool pushdown_filters = 5; // default = false
   bool reorder_filters = 6; // default = false
+  bool force_filter_selections = 34; // default = false
   uint64 data_pagesize_limit = 7; // default = 1024 * 1024
   uint64 write_batch_size = 8; // default = 1024
   string writer_version = 9; // default = "1.0"

datafusion/proto-common/src/from_proto/mod.rs

Lines changed: 1 addition & 0 deletions

@@ -943,6 +943,7 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
                 .unwrap_or(None),
             pushdown_filters: value.pushdown_filters,
             reorder_filters: value.reorder_filters,
+            force_filter_selections: value.force_filter_selections,
             data_pagesize_limit: value.data_pagesize_limit as usize,
             write_batch_size: value.write_batch_size as usize,
             writer_version: value.writer_version.clone(),

datafusion/proto-common/src/generated/pbjson.rs

Lines changed: 18 additions & 0 deletions

@@ -5557,6 +5557,9 @@ impl serde::Serialize for ParquetOptions {
         if self.reorder_filters {
             len += 1;
         }
+        if self.force_filter_selections {
+            len += 1;
+        }
         if self.data_pagesize_limit != 0 {
             len += 1;
         }
@@ -5651,6 +5654,9 @@ impl serde::Serialize for ParquetOptions {
         if self.reorder_filters {
             struct_ser.serialize_field("reorderFilters", &self.reorder_filters)?;
         }
+        if self.force_filter_selections {
+            struct_ser.serialize_field("forceFilterSelections", &self.force_filter_selections)?;
+        }
         if self.data_pagesize_limit != 0 {
             #[allow(clippy::needless_borrow)]
             #[allow(clippy::needless_borrows_for_generic_args)]
@@ -5816,6 +5822,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "pushdownFilters",
             "reorder_filters",
             "reorderFilters",
+            "force_filter_selections",
+            "forceFilterSelections",
             "data_pagesize_limit",
             "dataPagesizeLimit",
             "write_batch_size",
@@ -5875,6 +5883,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             SkipMetadata,
             PushdownFilters,
             ReorderFilters,
+            ForceFilterSelections,
             DataPagesizeLimit,
             WriteBatchSize,
             WriterVersion,
@@ -5927,6 +5936,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             "skipMetadata" | "skip_metadata" => Ok(GeneratedField::SkipMetadata),
             "pushdownFilters" | "pushdown_filters" => Ok(GeneratedField::PushdownFilters),
             "reorderFilters" | "reorder_filters" => Ok(GeneratedField::ReorderFilters),
+            "forceFilterSelections" | "force_filter_selections" => Ok(GeneratedField::ForceFilterSelections),
             "dataPagesizeLimit" | "data_pagesize_limit" => Ok(GeneratedField::DataPagesizeLimit),
             "writeBatchSize" | "write_batch_size" => Ok(GeneratedField::WriteBatchSize),
             "writerVersion" | "writer_version" => Ok(GeneratedField::WriterVersion),
@@ -5977,6 +5987,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             let mut skip_metadata__ = None;
             let mut pushdown_filters__ = None;
             let mut reorder_filters__ = None;
+            let mut force_filter_selections__ = None;
             let mut data_pagesize_limit__ = None;
             let mut write_batch_size__ = None;
             let mut writer_version__ = None;
@@ -6035,6 +6046,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
                 }
                 reorder_filters__ = Some(map_.next_value()?);
             }
+            GeneratedField::ForceFilterSelections => {
+                if force_filter_selections__.is_some() {
+                    return Err(serde::de::Error::duplicate_field("forceFilterSelections"));
+                }
+                force_filter_selections__ = Some(map_.next_value()?);
+            }
             GeneratedField::DataPagesizeLimit => {
                 if data_pagesize_limit__.is_some() {
                     return Err(serde::de::Error::duplicate_field("dataPagesizeLimit"));
@@ -6213,6 +6230,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
             skip_metadata: skip_metadata__.unwrap_or_default(),
             pushdown_filters: pushdown_filters__.unwrap_or_default(),
             reorder_filters: reorder_filters__.unwrap_or_default(),
+            force_filter_selections: force_filter_selections__.unwrap_or_default(),
             data_pagesize_limit: data_pagesize_limit__.unwrap_or_default(),
             write_batch_size: write_batch_size__.unwrap_or_default(),
             writer_version: writer_version__.unwrap_or_default(),

datafusion/proto-common/src/generated/prost.rs

Lines changed: 3 additions & 0 deletions

@@ -763,6 +763,9 @@ pub struct ParquetOptions {
     /// default = false
     #[prost(bool, tag = "6")]
     pub reorder_filters: bool,
+    /// default = false
+    #[prost(bool, tag = "34")]
+    pub force_filter_selections: bool,
     /// default = 1024 * 1024
     #[prost(uint64, tag = "7")]
     pub data_pagesize_limit: u64,
