
Conversation

@JSOD11 (Collaborator) commented Dec 17, 2025

Add new test data and tests showing distributed DataFusion's ability to leverage recent additions to upstream DataFusion to register and use hive-style partitioned data sources, generating optimal distributed plans that avoid data-shuffling repartitions and use partitioned hash joins.

Also adds a tool that uses datafusion-cli to regenerate the Parquet files from easy-to-read CSV files.
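
For reference, here is a rough sketch of the kind of regeneration the tool performs, written against the DataFusion DataFrame API rather than the datafusion-cli invocation the tool actually drives; the paths, table name, and partition column are illustrative only:

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // Read the easy-to-edit CSV source (illustrative path).
    ctx.register_csv("dim_csv", "testdata/join/csv/dim.csv", CsvReadOptions::new())
        .await?;

    // Rewrite it as hive-style partitioned parquet, one directory per d_dkey
    // value, e.g. testdata/join/parquet/dim/d_dkey=A/....
    ctx.table("dim_csv")
        .await?
        .write_parquet(
            "testdata/join/parquet/dim/",
            DataFrameWriteOptions::new().with_partition_by(vec!["d_dkey".to_string()]),
            None,
        )
        .await?;
    Ok(())
}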

This PR is based on the single-node superset-satisfaction DataFusion PR linked above. Once that PR is merged upstream, I'll switch the datafusion base in this PR's Cargo.toml to reference the newer DataFusion version.

@JSOD11 JSOD11 changed the title [QEIO-81] Join testing Join testing Dec 22, 2025
tests/join.rs Outdated
│ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
│ DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/dim/d_dkey=A/data0.parquet], [testdata/join/parquet/dim/d_dkey=B/data0.parquet], [testdata/join/parquet/dim/d_dkey=C/data0.parquet], [testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, host, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
│ PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1]
│ DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/fact/f_dkey=A/data0.parquet], [testdata/join/parquet/fact/f_dkey=B/data2.parquet, testdata/join/parquet/fact/f_dkey=B/data0.parquet, testdata/join/parquet/fact/f_dkey=B/data1.parquet], [testdata/join/parquet/fact/f_dkey=C/data0.parquet, testdata/join/parquet/fact/f_dkey=C/data1.parquet], [testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]
Collaborator:

After you define the sorted columns, you would see output_ordering= .... here

Collaborator Author:

I think there might be something broken here, I don't see output_ordering showing up in the plan. I left a TODO to figure this out.

@JSOD11 JSOD11 changed the title Join testing Distributed hive-style join testing with superset satisfaction Dec 23, 2025
@JSOD11 JSOD11 marked this pull request as ready for review December 23, 2025 16:53
tests/join.rs Outdated
d.host
FROM dim d
INNER JOIN fact f ON d.d_dkey = f.f_dkey
WHERE d.service = 'log'
@NGA-TRAN (Collaborator) commented Dec 23, 2025:

Add ORDER BY f_dkey, timestamp here and I think you will see the sort order in the query plan.

@JSOD11 (Collaborator Author) commented Dec 23, 2025:

Adding ORDER BY to this query seems to produce a plan with a SortExec, and no sort order appears in the plan other than the ordering_mode=PartiallySorted([0, 1]), which I think was already there.

Without ORDER BY:

┌───── DistributedExec ── Tasks: t0:[p0] 
│ CoalescePartitionsExec
│   [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3] 
  │ ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value]
  │   AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1])
  │     ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value]
  │       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4]
  │         FilterExec: service@1 = log, projection=[env@0, d_dkey@2]
  │           PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1] 
  │             DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/dim/d_dkey=A/data0.parquet], [testdata/join/parquet/dim/d_dkey=B/data0.parquet], [testdata/join/parquet/dim/d_dkey=C/data0.parquet], [testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
  │         PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1] 
  │           DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/fact/f_dkey=A/data0.parquet], [testdata/join/parquet/fact/f_dkey=B/data2.parquet, testdata/join/parquet/fact/f_dkey=B/data0.parquet, testdata/join/parquet/fact/f_dkey=B/data1.parquet], [testdata/join/parquet/fact/f_dkey=C/data0.parquet, testdata/join/parquet/fact/f_dkey=C/data1.parquet], [testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]  
  └──────────────────────────────────────────────────

With ORDER BY:

┌───── DistributedExec ── Tasks: t0:[p0] 
│ SortPreservingMergeExec: [f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST]
│   [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
  ┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3] 
  │ SortExec: expr=[f_dkey@0 ASC NULLS LAST, time_bin@1 ASC NULLS LAST], preserve_partitioning=[true]
  │   ProjectionExec: expr=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value]
  │     AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1])
  │       ProjectionExec: expr=[f_dkey@3 as f_dkey, env@0 as env, timestamp@1 as timestamp, value@2 as value]
  │         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@2)], projection=[env@0, timestamp@2, value@3, f_dkey@4]
  │           FilterExec: service@1 = log, projection=[env@0, d_dkey@2]
  │             PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1] 
  │               DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/dim/d_dkey=A/data0.parquet], [testdata/join/parquet/dim/d_dkey=B/data0.parquet], [testdata/join/parquet/dim/d_dkey=C/data0.parquet], [testdata/join/parquet/dim/d_dkey=D/data0.parquet]]}, projection=[env, service, d_dkey], file_type=parquet, predicate=service@1 = log, pruning_predicate=service_null_count@2 != row_count@3 AND service_min@0 <= log AND log <= service_max@1, required_guarantees=[service in (log)]
  │           PartitionIsolatorExec: t0:[p0,p1,__,__] t1:[__,__,p0,p1] 
  │             DataSourceExec: file_groups={4 groups: [[testdata/join/parquet/fact/f_dkey=A/data0.parquet], [testdata/join/parquet/fact/f_dkey=B/data2.parquet, testdata/join/parquet/fact/f_dkey=B/data0.parquet, testdata/join/parquet/fact/f_dkey=B/data1.parquet], [testdata/join/parquet/fact/f_dkey=C/data0.parquet, testdata/join/parquet/fact/f_dkey=C/data1.parquet], [testdata/join/parquet/fact/f_dkey=D/data0.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet, predicate=DynamicFilter [ empty ]
  └──────────────────────────────────────────────────

@NGA-TRAN (Collaborator) left a comment:

LGTM. You just need to add ORDER BY to the first 2 queries and verify there is no SortExec in either case. It is fine if you do not see output_ordering; as long as there is no SortExec, we know the sort order is propagated.

I'll let @gene-bordegaray review and approve this. We need to wait for DF 52 to merge this anyway.

tests/join.rs Outdated
│ RepartitionExec: partitioning=Hash([env@0, time_bin@1], 4), input_partitions=2
│ AggregateExec: mode=Partial, gby=[env@1 as env, time_bin@0 as time_bin], aggr=[avg(a.max_bin_value)]
│ ProjectionExec: expr=[date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp)@1 as time_bin, env@2 as env, max(j.value)@3 as max_bin_value]
│ AggregateExec: mode=SinglePartitioned, gby=[f_dkey@0 as f_dkey, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }, timestamp@2) as date_bin(IntervalMonthDayNano("IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 30000000000 }"),j.timestamp), env@1 as env], aggr=[max(j.value)], ordering_mode=PartiallySorted([0, 1])
@NGA-TRAN (Collaborator) commented Dec 23, 2025:

This ordering_mode=PartiallySorted([0, 1]) tells me the plan knows the data is sorted by f_dkey, date_bin, but that is not shown in the DataSourceExec and HashJoin. Very annoying!

tests/join.rs Outdated
Comment on lines 60 to 63
distributed_ctx
.state_ref()
.write()
.config_mut()
Collaborator:

Why are we forcing partitioned joins over collect-left joins?

Is it because it forces everything into a single stage?

Collaborator:

Many reasons:

  1. Because the streams/partitions on the build and probe sides do not overlap on the join key, a partitioned hash join will be more efficient: none of the hash tables overlap.
  2. We do not need to do a distinct aggregation on the build side.
  3. It would be tricky to support a collect-left join in a distributed plan.
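
For context, a minimal sketch of how this preference can be expressed through DataFusion's session config (an illustration, not the PR's exact code; the test wires an equivalent config into the distributed context):

use datafusion::prelude::*;

fn main() {
    // Setting both hash-join single-partition thresholds to zero keeps the
    // planner from choosing mode=CollectLeft even for small build sides, so
    // joins stay mode=Partitioned as in the plans above.
    let config = SessionConfig::new()
        .set_usize("datafusion.optimizer.hash_join_single_partition_threshold", 0)
        .set_usize("datafusion.optimizer.hash_join_single_partition_threshold_rows", 0);
    let _ctx = SessionContext::new_with_config(config);
}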

@gabotechs (Collaborator) left a comment:

Nice tests!

My only relevant comment is about using insta and the assert_snapshot!() macro for asserting snapshots rather than copy-pasting outputs into code; besides that, these look like pretty good test additions 🚀

About the tests themselves, I'll trust whatever @gene-bordegaray and @NGA-TRAN have to say.

Cargo.toml Outdated
Comment on lines 71 to 78
arrow = "57.0.0"
arrow = "57.0.1"
Collaborator:

arrow was updated in #268. If you update with main you should be fine.

tests/join.rs Outdated
// hive-style partitioning and avoiding data-shuffling repartitions.
// —————————————————————————————————————————————————————————————

let target_plan = r#"┌───── DistributedExec ── Tasks: t0:[p0]
Collaborator:

These assertions should be handled using insta rather than copy-pasting outputs from stdout.

You have examples on how to do that here https://github.com/datafusion-contrib/datafusion-distributed/blob/main/tests/tpch_plans_test.rs#L19-L19.
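
A minimal sketch of the suggested pattern, assuming the test already has the rendered plan available as a string (the helper name here is made up):

// Requires the `insta` dev-dependency used elsewhere in this repo.
use insta::assert_snapshot;

// `rendered_plan` stands in for the ASCII plan string the test currently
// compares against a hard-coded `target_plan` literal.
fn assert_plan_snapshot(rendered_plan: &str) {
    // insta writes the snapshot on the first run and diffs against it on later
    // runs, so plan changes are reviewed with `cargo insta review` instead of
    // hand-editing string literals in the test.
    assert_snapshot!(rendered_plan);
}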

Collaborator Author:

Thanks, updated!

tests/join.rs Outdated
// hive-style partitioning and avoiding data-shuffling repartitions.
// —————————————————————————————————————————————————————————————

let target_plan = r#"┌───── DistributedExec ── Tasks: t0:[p0]
Collaborator:

Same as above, using insta like in the rest of the project will provide a better DX

tests/join.rs Outdated
│ [Stage 1] => NetworkCoalesceExec: output_partitions=4, input_tasks=2
└──────────────────────────────────────────────────
┌───── Stage 1 ── Tasks: t0:[p0..p1] t1:[p2..p3]
│ SortExec: expr=[f_dkey@0 ASC NULLS LAST, timestamp@1 ASC NULLS LAST], preserve_partitioning=[true]
Collaborator:

We need to find the right setting to not see this SortExec here

@gabotechs (Collaborator) commented Dec 30, 2025:

🤔 We might not be propagating sort orders correctly across network boundaries...

Collaborator:

The point is:

  • the data is already sorted and we want to keep that property on the local node
  • we should be able to keep the sort order when collecting the streams at the end in the distributed system. @gene-bordegaray has a proposal (internal doc)

For this PR, in order to avoid the SortExec, we only need to set something like with_output_ordering. Hopefully, @JSOD11 will be able to find it, or we will find it together.

Collaborator Author:

Yeah, I took a look at this yesterday but wasn't able to get it to work. From what I saw it isn't as trivial as flipping a flag to with_output_ordering, since that method requires a different path to register the data, but I'll take another stab at it.

In my investigation I may have identified a bug upstream that causes the sort order not to appear in the plan, which I'm looking into as well. However, from what I can tell, fixing that bug may not also fix the SortExec issue we're seeing; they could be separate issues.

Collaborator:

@gabotechs and @JSOD11: would it be possible to add sqllogictest to this repo yet? If so, we would be able to define an external table with the data sorted the way we want, like Gene did here, and avoid the SortExec.

We do not want to spend a lot of time on these tests. If you can use sqllogictest to avoid the SortExec, let us do so. Otherwise, I am fine with keeping the tests with a SortExec for now.

@gabotechs (Collaborator) commented Dec 30, 2025:

The issue with sqllogictests is that the correct answer is always going to be not to distribute them, because they are very small. They are suitable for testing DataFusion, but I'm afraid that here they would not cover any of this project's capabilities, and we would just end up testing DataFusion itself.

Making a DataSourceExec declare a specific sort ordering is trivial though, we should be able to do it with the current tests.

Collaborator:

Making a DataSourceExec declare a specific sort ordering is trivial though, we should be able to do it with the current tests.

This is all we need. Hopefully, you and Justin can make it work

Collaborator Author:

Took another look; I think I figured it out.

                col("f_dkey").sort(true, true),
                col("timestamp").sort(true, true),

Turns out this true, true caused the ordering to be ASC NULLS FIRST, while true, false produces ASC NULLS LAST, which is what we want. Changing to

                col("f_dkey").sort(true, false),
                col("timestamp").sort(true, false),

seems to have done the trick
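
For anyone landing here later, a rough sketch of the registration path this converged on; the table name, paths, and partition-column type are illustrative rather than the PR's exact code:

use std::sync::Arc;

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::*;

async fn register_sorted_fact(ctx: &SessionContext) -> datafusion::error::Result<()> {
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        // Hive-style partition column taken from the directory names (f_dkey=A, ...).
        .with_table_partition_cols(vec![("f_dkey".to_string(), DataType::Utf8)])
        // Declare the per-file sort order: sort(asc, nulls_first), so
        // (true, false) means ASC NULLS LAST.
        .with_file_sort_order(vec![vec![
            col("f_dkey").sort(true, false),
            col("timestamp").sort(true, false),
        ]]);

    let url = ListingTableUrl::parse("testdata/join/parquet/fact/")?;
    let config = ListingTableConfig::new(url)
        .with_listing_options(options)
        .infer_schema(&ctx.state())
        .await?;
    ctx.register_table("fact", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}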

Collaborator:

Perfect. Thanks @JSOD11

@JSOD11 (Collaborator Author) commented Dec 30, 2025

It is my understanding that we cannot merge this PR until Gene's PR is merged upstream.

If this looks good to everyone now, we will have this PR sit until DF 52. Then I'll update this PR to point to the latest datafusion version, fix any conflicts, and merge it.

@NGA-TRAN (Collaborator):

It is my understanding that we cannot merge this PR until gene-bordegaray/datafusion#3 is merged upstream.

If this looks good to everyone now, we will have this PR sit until DF 52. Then I'll update this PR to point to the latest datafusion version, fix any conflicts, and merge it.

Yes

@JSOD11 JSOD11 merged commit deb53e9 into datafusion-contrib:main Jan 22, 2026
7 checks passed
@gabotechs (Collaborator):

I see that this PR has been merged including several unrelated upgrades to Arrow and other libraries.

In general, I'd be careful with merging this kind of thing in unrelated PRs, as these kinds of upgrades need to be reviewed more carefully. For example, the Arrow upgrade proposed in this PR includes a change that needed to be reverted:

Comment on lines +1 to +2
#[cfg(test)]
mod tests {
Collaborator:

This test seems to be using utils from the integration feature, but it's not hidden behind the integration feature like all the other integration tests. I think we need to gate it like this:

#[cfg(all(feature = "integration", test))]
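
That is, a minimal sketch of the suggested gating applied to the module from the diff above:

// Gate the whole test module on both `test` and the `integration` feature,
// like the other integration tests in this repo.
#[cfg(all(feature = "integration", test))]
mod tests {
    // test bodies that use the `integration` utilities go here
}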

Collaborator:

This seems to be breaking cargo clippy on main

Collaborator Author:

Will resolve in a follow-up PR.

let (state, logical_plan) = df.into_parts();
let physical_plan = state.create_physical_plan(&logical_plan).await?;
let distributed_plan = display_plan_ascii(physical_plan.as_ref(), false);
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{}", distributed_plan);
Collaborator:

This file might be missing a cargo clippy --fix pass; maybe we don't have clippy correctly configured?

This line should look something like:

Suggested change
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{}", distributed_plan);
println!("\n——————— DISTRIBUTED PLAN ———————\n\n{distributed_plan}");

Collaborator Author:

Took a look at this, proposing a fix in #303.

@JSOD11 JSOD11 mentioned this pull request Jan 22, 2026
@JSOD11 (Collaborator Author) commented Jan 22, 2026

Sounds good. Yes, when I upgraded to DF 52 it seems some other changes were made to Cargo.lock. I should have re-requested a review before merging since this PR is older, and I will do so next time.

Regarding the clippy changes, I did some investigation and believe that for these sorts of warnings to be blocked by CI we'll need a change such as the one I've proposed in #303.
