Conversation

@danielhumanmod
Contributor

Which issue does this PR close?

Closes #1379.

Rationale for this change

Currently, ShuffleReaderExec passes batches directly from the upstream AbortableReceiverStream to downstream operators. Under high parallelism or network fragmentation, the shuffle reader can receive many small record batches.

This causes performance degradation because:

  1. Downstream operators (aggregates, joins) pay high per-call overhead (poll_next) on every tiny batch.
  2. The benefits of vectorized execution are lost on micro-batches.

What changes are included in this PR?

  1. Core Logic: Introduced CoalescedShuffleReaderStream as a stream adapter to buffer and merge small batches using DataFusion's BatchCoalescer.

  2. Integration: Updated ShuffleReaderExec::execute to apply this wrapper, enabling configurable batch sizing via SessionConfig and preserving existing metrics.
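The buffering idea behind CoalescedShuffleReaderStream can be sketched with a self-contained toy. Here a plain Vec<i32> stands in for an Arrow RecordBatch, and ToyCoalescer is a hypothetical stand-in for DataFusion's BatchCoalescer, not its real API:

```rust
// Toy illustration of the buffering idea: accumulate small upstream
// batches and emit one merged batch once the target size is reached.
struct ToyCoalescer {
    target_batch_size: usize,
    buffer: Vec<i32>,
}

impl ToyCoalescer {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffer: Vec::new() }
    }

    // Push an incoming (possibly tiny) batch; return a merged batch
    // once the buffer reaches the target size, otherwise keep buffering.
    fn push(&mut self, batch: Vec<i32>) -> Option<Vec<i32>> {
        self.buffer.extend(batch);
        if self.buffer.len() >= self.target_batch_size {
            Some(std::mem::take(&mut self.buffer))
        } else {
            None
        }
    }

    // Flush whatever is left when the input stream ends.
    fn finish(&mut self) -> Option<Vec<i32>> {
        if self.buffer.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.buffer))
        }
    }
}

fn main() {
    let mut c = ToyCoalescer::new(4);
    // Three tiny upstream batches become one 4-row batch plus a 2-row tail.
    assert_eq!(c.push(vec![1, 2]), None);
    assert_eq!(c.push(vec![3, 4]), Some(vec![1, 2, 3, 4]));
    assert_eq!(c.push(vec![5, 6]), None);
    assert_eq!(c.finish(), Some(vec![5, 6]));
}
```

The real stream adapter does the same thing inside poll_next, except that merging Arrow arrays is handled by the coalescer and the target size comes from SessionConfig's batch_size.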

Are there any user-facing changes?

No

@danielhumanmod
Contributor Author

cc @milenkovicm

@danielhumanmod
Contributor Author

danielhumanmod commented Jan 16, 2026

One weird thing, though: the performance does not seem to improve based on the benchmark results. I used the Running the Ballista Benchmarks setup. Maybe I didn't use the right one?

After

st --port 60000 --query 10 --path ./data --format tbl --iterations 3`
Running benchmarks with the following options: BallistaBenchmarkOpt { query: 10, debug: false, expected_results: None, iterations: 3, batch_size: 8192, path: "./data", file_format: "tbl", partitions: 2, host: Some("localhost"), port: Some(60000), output_path: None }
Running benchmark with queries 10:
 ["select\n    c_custkey,\n    c_name,\n    sum(l_extendedprice * (1 - l_discount)) as revenue,\n    c_acctbal,\n    n_name,\n    c_address,\n    c_phone,\n    c_comment\nfrom\n    customer,\n    orders,\n    lineitem,\n    nation\nwhere\n        c_custkey = o_custkey\n  and l_orderkey = o_orderkey\n  and o_orderdate >= date '1993-10-01'\n  and o_orderdate < date '1994-01-01'\n  and l_returnflag = 'R'\n  and c_nationkey = n_nationkey\ngroup by\n    c_custkey,\n    c_name,\n    c_acctbal,\n    c_phone,\n    n_name,\n    c_address,\n    c_comment\norder by\n    revenue desc\nlimit 20"]
Query 10 iteration 0 took 2533.0 ms and returned 20 rows
Query 10 iteration 1 took 1853.6 ms and returned 20 rows
Query 10 iteration 2 took 1958.3 ms and returned 20 rows
Query 10 avg time: 2114.96 ms

Before

st --port 60000 --query 10 --path ./data --format tbl --iterations 3`
Running benchmarks with the following options: BallistaBenchmarkOpt { query: 10, debug: false, expected_results: None, iterations: 3, batch_size: 8192, path: "./data", file_format: "tbl", partitions: 2, host: Some("localhost"), port: Some(60000), output_path: None }
Running benchmark with queries 10:
 ["select\n    c_custkey,\n    c_name,\n    sum(l_extendedprice * (1 - l_discount)) as revenue,\n    c_acctbal,\n    n_name,\n    c_address,\n    c_phone,\n    c_comment\nfrom\n    customer,\n    orders,\n    lineitem,\n    nation\nwhere\n        c_custkey = o_custkey\n  and l_orderkey = o_orderkey\n  and o_orderdate >= date '1993-10-01'\n  and o_orderdate < date '1994-01-01'\n  and l_returnflag = 'R'\n  and c_nationkey = n_nationkey\ngroup by\n    c_custkey,\n    c_name,\n    c_acctbal,\n    c_phone,\n    n_name,\n    c_address,\n    c_comment\norder by\n    revenue desc\nlimit 20"]
Query 10 iteration 0 took 2739.5 ms and returned 20 rows
Query 10 iteration 1 took 1855.9 ms and returned 20 rows
Query 10 iteration 2 took 1850.1 ms and returned 20 rows
Query 10 avg time: 2148.49 ms

@milenkovicm
Contributor

thanks @danielhumanmod will have a look asap

Contributor

@milenkovicm milenkovicm left a comment

thanks @danielhumanmod
I think this makes sense; I have just a few minor comments.

I would also propose postponing merge of this for a bit, @andygrove is working on automated benchmark framework, would be good to benchmark overall performance.


  log::debug!(
-     "ShuffleReaderExec::execute({task_id}) max_request_num: {max_request_num}, max_message_size: {max_message_size}"
+     "ShuffleReaderExec::execute({task_id}) max_request_num: {max_request_num}, max_message_size: {max_message_size}, batch_size: {batch_size}"
Contributor

do we need batch size here? it's part of the configuration, we can find it out if needed

Contributor Author

makes sense, will remove

Ok(Box::pin(CoalescedShuffleReaderStream {
    schema: self.schema.clone(),
    input: input_stream,
    coalescer: LimitedBatchCoalescer::new(
Contributor

what is the reason using LimitedBatchCoalescer instead of BatchCoalescer as there is no fetch limit?

would there be a case where we can use limit 🤔?

Contributor

maybe we could add limit to ShuffleReader, defaulting to None, and add a with_limit option, so if we find a use-case we can set it?

Contributor Author

> what is the reason using LimitedBatchCoalescer instead of BatchCoalescer as there is no fetch limit?
>
> would there be a case where we can use limit 🤔?

Regarding LimitedBatchCoalescer, I initially chose it because it offers better encapsulation for the stream state machine compared to the raw BatchCoalescer. Specifically, it manages the finished state and provides PushBatchStatus, which makes the poll_next implementation cleaner and safer, even without a limit.
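That encapsulation argument can be illustrated with a self-contained toy. The PushBatchStatus and finished-flag names below only mirror the idea; they are hypothetical stand-ins, not LimitedBatchCoalescer's actual API:

```rust
// Toy sketch: a coalescer that owns its "finished" state and reports a
// status on each push, so the stream's poll loop only matches on the
// status instead of tracking limit/termination state itself.
#[derive(Debug, PartialEq)]
enum PushBatchStatus {
    Continue,     // caller keeps pulling from upstream
    LimitReached, // fetch limit hit; caller stops pulling and drains
}

struct ToyLimitedCoalescer {
    limit: Option<usize>, // optional row limit (None = unlimited)
    rows_pushed: usize,
    finished: bool,
}

impl ToyLimitedCoalescer {
    fn new(limit: Option<usize>) -> Self {
        Self { limit, rows_pushed: 0, finished: false }
    }

    // Accept a batch of `rows` rows; flip to finished once the limit
    // is hit, keeping that bookkeeping out of the stream adapter.
    fn push_batch(&mut self, rows: usize) -> PushBatchStatus {
        self.rows_pushed += rows;
        if let Some(limit) = self.limit {
            if self.rows_pushed >= limit {
                self.finished = true;
                return PushBatchStatus::LimitReached;
            }
        }
        PushBatchStatus::Continue
    }

    fn is_finished(&self) -> bool {
        self.finished
    }
}

fn main() {
    let mut c = ToyLimitedCoalescer::new(Some(100));
    assert_eq!(c.push_batch(60), PushBatchStatus::Continue);
    assert_eq!(c.push_batch(60), PushBatchStatus::LimitReached);
    assert!(c.is_finished());

    // Without a limit, the status is always Continue.
    let mut unlimited = ToyLimitedCoalescer::new(None);
    assert_eq!(unlimited.push_batch(1_000), PushBatchStatus::Continue);
    assert!(!unlimited.is_finished());
}
```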

Contributor Author

> maybe we could add limit to ShuffleReader, defaulting to None, and add with_limit option, so if we find a use-case we can set it ?

Sounds like a good point. One use case I can think of is an optimization like Limit Pushdown to the shuffle read stage.

let target_batch_size = 10;

// 4. Manually build the CoalescedShuffleReaderStream
let coalesced_stream = CoalescedShuffleReaderStream {
Contributor

i believe adding 'new' method would make sense

Contributor Author

Good point, will add, thanks!

@milenkovicm
Contributor

@andygrove if you need verification for benchmarking framework this PR is a good candidate

@Dandandan
Contributor

Dandandan commented Jan 19, 2026

For benchmarking the performance change for small batches, probably it is better to run against Parquet (to make the scan less of a bottleneck) and with a high number of task slots (to make the batch size small).

@milenkovicm
Contributor

@sqlbenchmark run tpch

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1380 - impl
PR Commit: cf58271
Base Commit: 38fca8b (main)
Scale Factor: SF1
Iterations: 3

Results

Could not parse structured results. Raw output:

Main branch:

ery 12 avg time: 847.1 ms
Query 13 iteration 0 took 813.5 ms and returned 42 rows
Query 13 iteration 1 took 915.0 ms and returned 42 rows
Query 13 iteration 2 took 955.0 ms and returned 42 rows
Query 13 avg time: 894.5 ms
Query 14 iteration 0 took 609.0 ms and returned 1 rows
Query 14 iteration 1 took 552.0 ms and returned 1 rows
Query 14 iteration 2 took 611.7 ms and returned 1 rows
Query 14 avg time: 590.9 ms
Query 15 iteration 0 took 1020.6 ms and returned 0 rows
Query 15 iteration 1 took 917.2 ms and returned 0 rows
Query 15 iteration 2 took 917.6 ms and returned 0 rows
Query 15 avg time: 951.8 ms
Query 16 iteration 0 took 1222.9 ms and returned 18314 rows
Query 16 iteration 1 took 1220.9 ms and returned 18314 rows
Query 16 iteration 2 took 1223.2 ms and returned 18314 rows
Query 16 avg time: 1222.3 ms
Query 17 iteration 0 took 754.9 ms and returned 1 rows
Query 17 iteration 1 took 713.6 ms and returned 1 rows
Query 17 iteration 2 took 816.5 ms and returned 1 rows
Query 17 avg time: 761.6 ms
Query 18 iteration 0 took 1262.5 ms and returned 57 rows
Query 18 iteration 1 took 1321.6 ms and returned 57 rows
Query 18 iteration 2 took 1220.5 ms and returned 57 rows
Query 18 avg time: 1268.2 ms
Query 19 iteration 0 took 714.9 ms and returned 1 rows
Query 19 iteration 1 took 715.1 ms and returned 1 rows
Query 19 iteration 2 took 756.3 ms and returned 1 rows
Query 19 avg time: 728.8 ms
Query 20 iteration 0 took 1059.4 ms and returned 186 rows
Query 20 iteration 1 took 917.3 ms and returned 186 rows
Query 20 iteration 2 took 957.9 ms and returned 186 rows
Query 20 avg time: 978.2 ms
Query 21 iteration 0 took 1769.6 ms and returned 100 rows
Query 21 iteration 1 took 1731.1 ms and returned 100 rows
Query 21 iteration 2 took 1625.9 ms and returned 100 rows
Query 21 avg time: 1708.9 ms
Query 22 iteration 0 took 918.1 ms and returned 7 rows
Query 22 iteration 1 took 918.6 ms and returned 7 rows
Query 22 iteration 2 took 917.2 ms and returned 7 rows
Query 22 avg time: 917.9 ms

PR branch:

    Finished `release` profile [optimized] target(s) in 0.18s
     Running `target/release/tpch benchmark ballista --host localhost --port 50050 --path /app/data --format parquet --iterations 3`
error: The following required arguments were not provided:
    --query <query>

USAGE:
    tpch benchmark ballista --batch-size <batch-size> --format <file-format> --host <host> --iterations <iterations> --partitions <partitions> --path <path> --port <port> --query <query>

For more information try --help


Automated benchmark run by dfbench

@andygrove
Member

@milenkovicm the benchmark script failed because this PR does not contain the changes from #1391

@milenkovicm
Copy link
Contributor

Perhaps @danielhumanmod could rebase the PR

@danielhumanmod danielhumanmod force-pushed the coalescer_in_shuffle_reader branch from cf58271 to 118b6e8 Compare January 21, 2026 04:35
@danielhumanmod
Contributor Author

Rebased; let's try the benchmark again.

@danielhumanmod
Contributor Author

@sqlbenchmark run tpch

@andygrove
Member

@sqlbenchmark run tpch

@milenkovicm
Contributor

@sqlbenchmark run tpch

@andygrove
Member

@sqlbenchmark run tpch

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1380 - fix format and simplify new
PR Commit: b7be1f7
Base Commit: 91558a5 (main)
Scale Factor: SF1
Iterations: 3

Query Comparison

| Query | Main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1 | 817.00 | 783.90 | ⚪ -4.1% |
| Q2 | 1319.70 | 1354.50 | ⚪ +2.6% |
| Q3 | 991.60 | 1101.70 | 🔴 +11.1% |
| Q4 | 831.00 | 877.30 | 🔴 +5.6% |
| Q5 | 1745.30 | 1737.40 | ⚪ -0.5% |
| Q6 | 401.50 | 401.70 | ⚪ +0.0% |
| Q7 | 1825.40 | 1825.70 | ⚪ +0.0% |
| Q8 | 2298.90 | 2153.00 | 🟢 -6.3% |
| Q9 | 1927.00 | 1940.60 | ⚪ +0.7% |
| Q10 | 1237.30 | 1350.00 | 🔴 +9.1% |
| Q11 | 1045.90 | 1025.90 | ⚪ -1.9% |
| Q12 | 863.00 | 841.40 | ⚪ -2.5% |
| Q13 | 917.10 | 928.00 | ⚪ +1.2% |
| Q14 | 617.60 | 618.20 | ⚪ +0.1% |
| Q15 | 964.10 | 958.40 | ⚪ -0.6% |
| Q16 | 1230.30 | 1251.10 | ⚪ +1.7% |
| Q17 | 721.30 | 686.10 | ⚪ -4.9% |
| Q18 | 1270.00 | 1282.10 | ⚪ +1.0% |
| Q19 | 695.90 | 723.10 | ⚪ +3.9% |
| Q20 | 978.60 | 1027.20 | ⚪ +5.0% |
| Q21 | 1703.00 | 1770.10 | ⚪ +3.9% |
| Q22 | 877.40 | 911.90 | ⚪ +3.9% |

Total: Main=25278.90ms, PR=25549.30ms (+1.1%)


Automated benchmark run by dfbench

@andygrove
Member

@sqlbenchmark run tpch

There is currently a list of contributors who have permission to run this. I will open it up to more people once it has been through more testing. It is very experimental at the moment.

@andygrove
Member

@sqlbenchmark run tpch -s 10 -i 3

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1380 - fix format and simplify new
PR Commit: b7be1f7
Base Commit: cd4ead8 (main)
Scale Factor: SF1
Iterations: 3

Query Comparison

| Query | Main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1 | 816.10 | 817.00 | ⚪ +0.1% |
| Q2 | 1351.00 | 1318.40 | ⚪ -2.4% |
| Q3 | 1100.30 | 999.00 | 🟢 -9.2% |
| Q4 | 827.50 | 827.40 | ⚪ -0.0% |
| Q5 | 1655.40 | 1776.20 | 🔴 +7.3% |
| Q6 | 401.20 | 408.50 | ⚪ +1.8% |
| Q7 | 1798.20 | 1898.70 | 🔴 +5.6% |
| Q8 | 2387.20 | 2319.40 | ⚪ -2.8% |
| Q9 | 1856.20 | 1912.10 | ⚪ +3.0% |
| Q10 | 1268.40 | 1350.10 | 🔴 +6.4% |
| Q11 | 1031.60 | 1032.60 | ⚪ +0.1% |
| Q12 | 874.90 | 827.60 | 🟢 -5.4% |
| Q13 | 948.00 | 929.20 | ⚪ -2.0% |
| Q14 | 639.00 | 625.60 | ⚪ -2.1% |
| Q15 | 1087.30 | 984.90 | 🟢 -9.4% |
| Q16 | 1236.80 | 1222.10 | ⚪ -1.2% |
| Q17 | 741.70 | 740.90 | ⚪ -0.1% |
| Q18 | 1371.20 | 1417.30 | ⚪ +3.4% |
| Q19 | 782.60 | 749.00 | ⚪ -4.3% |
| Q20 | 999.10 | 1031.90 | ⚪ +3.3% |
| Q21 | 1771.50 | 1762.80 | ⚪ -0.5% |
| Q22 | 930.30 | 910.20 | ⚪ -2.2% |

Total: Main=25875.50ms, PR=25860.90ms (-0.1%)


Automated benchmark run by dfbench

@sqlbenchmark

Ballista TPC-H Benchmark Results

PR: #1380 - fix format and simplify new
PR Commit: b7be1f7
Base Commit: cd4ead8 (main)
Scale Factor: SF10
Iterations: 3

Query Comparison

| Query | Main (ms) | PR (ms) | Change |
|-------|-----------|---------|--------|
| Q1 | 2621.60 | 2655.50 | ⚪ +1.3% |
| Q2 | 1914.20 | 1877.40 | ⚪ -1.9% |
| Q3 | 2074.00 | 2112.60 | ⚪ +1.9% |
| Q4 | 1248.90 | 1301.00 | ⚪ +4.2% |
| Q5 | 4046.90 | 4199.60 | ⚪ +3.8% |
| Q6 | 943.00 | 996.90 | 🔴 +5.7% |
| Q7 | 5601.90 | 5519.40 | ⚪ -1.5% |
| Q8 | 5155.20 | 5128.40 | ⚪ -0.5% |
| Q9 | 6406.80 | 6414.00 | ⚪ +0.1% |
| Q10 | 2480.70 | 2408.60 | ⚪ -2.9% |
| Q11 | 1484.70 | 1418.60 | ⚪ -4.5% |
| Q12 | 1843.90 | 1769.50 | ⚪ -4.0% |
| Q13 | 2444.90 | 2211.40 | 🟢 -9.6% |
| Q14 | 1166.50 | 1065.30 | 🟢 -8.7% |
| Q15 | 1601.80 | 1596.00 | ⚪ -0.4% |
| Q16 | 1318.30 | 1338.40 | ⚪ +1.5% |
| Q17 | 4877.10 | 4296.30 | 🟢 -11.9% |
| Q18 | 7092.60 | 6496.30 | 🟢 -8.4% |
| Q19 | 1866.20 | 1865.00 | ⚪ -0.1% |
| Q20 | 1831.60 | 1742.50 | ⚪ -4.9% |
| Q21 | 6209.10 | 5693.70 | 🟢 -8.3% |
| Q22 | 1087.00 | 1149.00 | 🔴 +5.7% |

Total: Main=65316.90ms, PR=63255.40ms (-3.2%)


Automated benchmark run by dfbench

Contributor

@milenkovicm milenkovicm left a comment

Benchmarks look OK; I think this change makes sense.
thanks @danielhumanmod

@milenkovicm milenkovicm merged commit bbf9d30 into apache:main Jan 21, 2026
15 checks passed
Successfully merging this pull request may close these issues.

Consider adding BatchCoalescer to ShuffleReader
