136 changes: 135 additions & 1 deletion parquet/src/arrow/async_reader/mod.rs
@@ -782,7 +782,7 @@ mod tests {
use arrow::error::Result as ArrowResult;
use arrow_array::builder::{Float32Builder, ListBuilder, StringBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::Int32Type;
use arrow_array::types::{Int32Type, TimestampNanosecondType};
use arrow_array::{
Array, ArrayRef, BooleanArray, Int8Array, Int32Array, Int64Array, RecordBatchReader,
Scalar, StringArray, StructArray, UInt64Array,
@@ -2308,4 +2308,138 @@ mod tests {

Ok(())
}

/// Regression test for adaptive predicate pushdown attempting to read skipped pages.
/// Related issue: https://github.com/apache/arrow-rs/issues/9239
#[tokio::test]
async fn test_predicate_pushdown_with_skipped_pages() {
use arrow_array::TimestampNanosecondArray;
use arrow_schema::TimeUnit;

// Time range constants
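// (nanosecond Unix timestamps on 2024-01-01: 07:00, 12:00, and 03:00 UTC, respectively)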
const TIME_IN_RANGE_START: i64 = 1_704_092_400_000_000_000;
const TIME_IN_RANGE_END: i64 = 1_704_110_400_000_000_000;
const TIME_BEFORE_RANGE: i64 = 1_704_078_000_000_000_000;

// Create test data: 2 row groups, 300 rows each
// "tag" column: 'a', 'b', 'c' (100 rows each, sorted)
// "time" column: alternating in-range/out-of-range timestamps
let schema = Arc::new(Schema::new(vec![
Field::new(
"time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("tag", DataType::Utf8, false),
]));

let props = WriterProperties::builder()
.set_max_row_group_size(300)
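// A 33-row page limit yields roughly ten data pages per 300-row row group,
// so page-level pruning has multiple pages to work with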
.set_data_page_row_count_limit(33)
.build();

let mut buffer = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buffer, schema.clone(), Some(props)).unwrap();

// Write 2 row groups
for _ in 0..2 {
for (tag_idx, tag) in ["a", "b", "c"].iter().enumerate() {
let times: Vec<i64> = (0..100)
.map(|j| {
let row_idx = tag_idx * 100 + j;
if row_idx % 2 == 0 {
TIME_IN_RANGE_START + (j as i64 * 1_000_000)
} else {
TIME_BEFORE_RANGE + (j as i64 * 1_000_000)
}
})
.collect();
let tags: Vec<&str> = (0..100).map(|_| *tag).collect();

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(TimestampNanosecondArray::from(times)) as ArrayRef,
Arc::new(StringArray::from(tags)) as ArrayRef,
],
)
.unwrap();
writer.write(&batch).unwrap();
}
writer.flush().unwrap();
}
writer.close().unwrap();
let buffer = Bytes::from(buffer);
// Read back with each page index policy; all of them should yield the same answer
for policy in [
PageIndexPolicy::Skip,
PageIndexPolicy::Optional,
PageIndexPolicy::Required,
] {
println!("Testing with page index policy: {:?}", policy);
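// TestReader serves the in-memory buffer through the async file reader interface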
let reader = TestReader::new(buffer.clone());
let options = ArrowReaderOptions::default().with_page_index_policy(policy);
let builder = ParquetRecordBatchStreamBuilder::new_with_options(reader, options)
.await
.unwrap();

let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
let num_row_groups = builder.metadata().num_row_groups();

// Initial selection: skip middle 100 rows (tag='b') per row group
let mut selectors = Vec::new();
for _ in 0..num_row_groups {
selectors.push(RowSelector::select(100));
selectors.push(RowSelector::skip(100));
selectors.push(RowSelector::select(100));
}
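// Skipping 100 consecutive rows (about 3 pages at 33 rows per page) means
// whole pages are never fetched, which is the situation the linked issue hits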
let selection = RowSelection::from(selectors);

// Predicate 1: time >= START
let time_gte_predicate =
ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
Ok(BooleanArray::from_iter(
col.iter().map(|t| t.map(|v| v >= TIME_IN_RANGE_START)),
))
});

// Predicate 2: time < END
let time_lt_predicate =
ArrowPredicateFn::new(ProjectionMask::roots(&schema_descr, [0]), |batch| {
let col = batch.column(0).as_primitive::<TimestampNanosecondType>();
Ok(BooleanArray::from_iter(
col.iter().map(|t| t.map(|v| v < TIME_IN_RANGE_END)),
))
});

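// Predicates in a RowFilter run in order; each one is evaluated only on
// rows that passed the previous predicate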
let row_filter = RowFilter::new(vec![
Box::new(time_gte_predicate),
Box::new(time_lt_predicate),
]);

// Output projection: only the "tag" column ("time" is filtered on but not output)
let projection = ProjectionMask::roots(&schema_descr, [1]);

let stream = builder
.with_row_filter(row_filter)
.with_row_selection(selection)
.with_projection(projection)
.build()
.unwrap();

// The stream should complete without error and produce the same results under every policy
let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();

let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
assert_eq!(batch.num_columns(), 1);
let expected = StringArray::from_iter_values(
std::iter::repeat_n("a", 50)
.chain(std::iter::repeat_n("c", 50))
.chain(std::iter::repeat_n("a", 50))
.chain(std::iter::repeat_n("c", 50)),
);
assert_eq!(batch.column(0).as_string(), &expected);
}
}
}
10 changes: 10 additions & 0 deletions parquet/src/arrow/push_decoder/reader_builder/mod.rs
@@ -437,6 +437,16 @@ impl RowGroupReaderBuilder {
.with_parquet_metadata(&self.metadata)
.build_array_reader(self.fields.as_deref(), predicate.projection())?;

// Prepare to evaluate the filter.
// Note: first update the selection strategy to properly handle any pages
// pruned during fetch
plan_builder = override_selector_strategy_if_needed(
plan_builder,
predicate.projection(),
self.row_group_offset_index(row_group_idx),
);

// `with_predicate` actually evaluates the filter
plan_builder =
plan_builder.with_predicate(array_reader, filter_info.current_mut())?;
