Commit c239e64

chore: update datafusion to 52.0

1 parent: 38fca8b

17 files changed: +623, -603 lines


Cargo.lock

Lines changed: 439 additions & 421 deletions (generated file; diff not rendered)

Cargo.toml

Lines changed: 6 additions & 10 deletions

@@ -21,10 +21,6 @@ members = ["ballista-cli", "ballista/client", "ballista/core", "ballista/executo
 resolver = "3"

 [workspace.package]
-# edition to be changed to 2024 when we update
-# Minimum Supported Rust Version (MSRV) to 1.85.0
-# which is datafusion 49
-#
 edition = "2024"
 # we should try to follow datafusion version
 rust-version = "1.88.0"
@@ -33,11 +29,11 @@ rust-version = "1.88.0"
 arrow = { version = "57", features = ["ipc_compression"] }
 arrow-flight = { version = "57", features = ["flight-sql-experimental"] }
 clap = { version = "4.5", features = ["derive", "cargo"] }
-datafusion = "51.0.0"
-datafusion-cli = "51.0.0"
-datafusion-proto = "51.0.0"
-datafusion-proto-common = "51.0.0"
-datafusion-substrait = "51.0.0"
+datafusion = "52.0.0"
+datafusion-cli = "52.0.0"
+datafusion-proto = "52.0.0"
+datafusion-proto-common = "52.0.0"
+datafusion-substrait = "52.0.0"
 object_store = "0.12"
 prost = "0.14"
 prost-types = "0.14"
@@ -51,7 +47,7 @@ tonic-prost-build = { version = "0.14" }
 tracing = "0.1"
 tracing-appender = "0.2.2"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-ctor = { version = "0.5" }
+ctor = { version = "0.6" }
 mimalloc = { version = "0.1" }

 tokio = { version = "1" }

ballista/client/tests/context_checks.rs

Lines changed: 29 additions & 19 deletions

@@ -28,6 +28,7 @@ mod supported {
     use datafusion::prelude::*;
     use datafusion::{assert_batches_eq, prelude::SessionContext};
     use rstest::*;
+    use std::path::PathBuf;

     #[rstest::fixture]
     fn test_data() -> String {
@@ -714,17 +715,22 @@
         ];

         let write_dir = tempfile::tempdir().expect("temporary directory to be created");
-        let write_dir_path = write_dir
-            .path()
-            .to_str()
-            .expect("path to be converted to str");
+        let write_dir_path = PathBuf::from(
+            write_dir
+                .path()
+                .to_str()
+                .expect("path to be converted to str"),
+        );
+
+        let parquet_file = write_dir_path.join("p_written_table.parquet");
+        let parquet_file = parquet_file.to_str().expect("cannot create csv file");

         ctx.sql("select * from test")
             .await?
-            .write_parquet(write_dir_path, Default::default(), Default::default())
+            .write_parquet(parquet_file, Default::default(), Default::default())
             .await?;

-        ctx.register_parquet("p_written_table", write_dir_path, Default::default())
+        ctx.register_parquet("p_written_table", parquet_file, Default::default())
            .await?;

         let result = ctx
@@ -735,12 +741,15 @@

         assert_batches_eq!(expected, &result);

+        let csv_file = write_dir_path.join("c_written_table.csv");
+        let csv_file = csv_file.to_str().expect("cannot create csv file");
+
         ctx.sql("select * from test")
             .await?
-            .write_csv(write_dir_path, Default::default(), Default::default())
+            .write_csv(csv_file, Default::default(), Default::default())
             .await?;

-        ctx.register_csv("c_written_table", write_dir_path, Default::default())
+        ctx.register_csv("c_written_table", csv_file, Default::default())
             .await?;

         let result = ctx
@@ -751,12 +760,15 @@

         assert_batches_eq!(expected, &result);

+        let json_file = write_dir_path.join("j_written_table.json");
+        let json_file = json_file.to_str().expect("cannot create csv file");
+
         ctx.sql("select * from test")
             .await?
-            .write_json(write_dir_path, Default::default(), Default::default())
+            .write_json(json_file, Default::default(), Default::default())
             .await?;

-        ctx.register_json("j_written_table", write_dir_path, Default::default())
+        ctx.register_json("j_written_table", json_file, Default::default())
             .await?;

         let result = ctx
@@ -1048,13 +1060,12 @@
         "| | EmptyRelation: rows=1 |",
         "| physical_plan | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
         "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-        "| | CoalesceBatchesExec: target_batch_size=8192 |",
-        "| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
-        "| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-        "| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
-        "| | UnnestExec |",
-        "| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
-        "| | PlaceholderRowExec |",
+        "| | RepartitionExec: partitioning=Hash([id@0], 16), input_partitions=1 |",
+        "| | AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[count(Int64(1))] |",
+        "| | ProjectionExec: expr=[__unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)),depth=1)@0 as id] |",
+        "| | UnnestExec |",
+        "| | ProjectionExec: expr=[[1, 2, 3, 4, 5] as __unnest_placeholder(make_array(Int64(1),Int64(2),Int64(3),Int64(4),Int64(5)))] |",
+        "| | PlaceholderRowExec |",
         "| | |",
         "| distributed_plan | =========ResolvedStage[stage_id=1.0, partitions=1]========= |",
         "| | ShuffleWriterExec: partitioning: Hash([id@0], 16) |",
@@ -1069,8 +1080,7 @@
         "| | ShuffleWriterExec: partitioning: None |",
         "| | ProjectionExec: expr=[count(Int64(1))@1 as count(*), id@0 as id] |",
        "| | AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[count(Int64(1))] |",
-        "| | CoalesceBatchesExec: target_batch_size=8192 |",
-        "| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |",
+        "| | UnresolvedShuffleExec: partitioning: Hash([id@0], 16) |",
         "| | |",
         "| | |",
         "+------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+",

ballista/core/proto/ballista.proto

Lines changed: 1 addition & 0 deletions

@@ -292,6 +292,7 @@ message OperatorMetric {
     uint64 output_bytes = 12;
     NamedPruningMetrics pruning_metrics = 13;
     NamedRatio ratio = 14;
+    uint64 output_batches = 15;
   }
 }

ballista/core/proto/datafusion.proto

Lines changed: 35 additions & 1 deletion

@@ -179,6 +179,8 @@ message CreateExternalTableNode {

 message PrepareNode {
   string name = 1;
+  // We serialize both the data types and the fields for compatibility with
+  // older versions (newer versions populate both).
   repeated datafusion_common.ArrowType data_types = 2;
   LogicalPlanNode input = 3;
   repeated datafusion_common.Field fields = 4;
@@ -413,6 +415,8 @@ message Wildcard {

 message PlaceholderNode {
   string id = 1;
+  // We serialize the data type, metadata, and nullability separately to maintain
+  // compatibility with older versions
   datafusion_common.ArrowType data_type = 2;
   optional bool nullable = 3;
   map<string, string> metadata = 4;
@@ -744,6 +748,7 @@
     GenerateSeriesNode generate_series = 33;
     SortMergeJoinExecNode sort_merge_join = 34;
     MemoryScanExecNode memory_scan = 35;
+    AsyncFuncExecNode async_func = 36;
   }
 }

@@ -867,6 +872,8 @@
     PhysicalExtensionExprNode extension = 19;

     UnknownColumn unknown_column = 20;
+
+    PhysicalHashExprNode hash_expr = 21;
   }
 }

@@ -985,6 +992,15 @@ message PhysicalExtensionExprNode {
   repeated PhysicalExprNode inputs = 2;
 }

+message PhysicalHashExprNode {
+  repeated PhysicalExprNode on_columns = 1;
+  uint64 seed0 = 2;
+  uint64 seed1 = 3;
+  uint64 seed2 = 4;
+  uint64 seed3 = 5;
+  string description = 6;
+}
+
 message FilterExecNode {
   PhysicalPlanNode input = 1;
   PhysicalExprNode expr = 2;
@@ -1005,6 +1021,15 @@ message PhysicalSortExprNodeCollection {
   repeated PhysicalSortExprNode physical_sort_expr_nodes = 1;
 }

+message ProjectionExpr {
+  string alias = 1;
+  PhysicalExprNode expr = 2;
+}
+
+message ProjectionExprs {
+  repeated ProjectionExpr projections = 1;
+}
+
 message FileScanExecConf {
   repeated FileGroup file_groups = 1;
   datafusion_common.Schema schema = 2;
@@ -1020,6 +1045,8 @@

   datafusion_common.Constraints constraints = 11;
   optional uint64 batch_size = 12;
+
+  optional ProjectionExprs projection_exprs = 13;
 }

 message ParquetScanExecNode {
@@ -1207,6 +1234,7 @@ message AggregateExecNode {
   repeated bool groups = 9;
   repeated MaybeFilter filter_expr = 10;
   AggLimit limit = 11;
+  bool has_grouping_set = 12;
 }

 message GlobalLimitExecNode {
@@ -1377,4 +1405,10 @@ message SortMergeJoinExecNode {
   JoinFilter filter = 5;
   repeated SortExprNode sort_options = 6;
   datafusion_common.NullEquality null_equality = 7;
-}
+}
+
+message AsyncFuncExecNode {
+  PhysicalPlanNode input = 1;
+  repeated PhysicalExprNode async_exprs = 2;
+  repeated string async_expr_names = 3;
+}
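For orientation, the prost code generated for the new `ProjectionExpr`/`ProjectionExprs` messages should come out roughly as sketched below. This is a hand-written approximation following the codegen conventions visible in the generated ballista.rs further down; the authoritative versions are regenerated at build time (and, since this proto is vendored from upstream, likely also ship as `datafusion_proto::protobuf` types). `PhysicalExprNode` is the existing generated expression message:

use datafusion_proto::protobuf::PhysicalExprNode;

// Approximate shape of the prost-generated structs (a sketch, not real source).
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProjectionExpr {
    /// Output name of the projected column.
    #[prost(string, tag = "1")]
    pub alias: ::prost::alloc::string::String,
    /// Physical expression that computes the column.
    #[prost(message, optional, tag = "2")]
    pub expr: ::core::option::Option<PhysicalExprNode>,
}

#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProjectionExprs {
    #[prost(message, repeated, tag = "1")]
    pub projections: ::prost::alloc::vec::Vec<ProjectionExpr>,
}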

ballista/core/proto/datafusion_common.proto

Lines changed: 12 additions & 6 deletions

(The first hunk is whitespace-only: the removed and added lines differ just in comment alignment.)

@@ -351,11 +351,11 @@ message Decimal256{
 // Serialized data type
 message ArrowType{
   oneof arrow_type_enum {
-    EmptyMessage NONE = 1;  // arrow::Type::NA
-    EmptyMessage BOOL = 2;  // arrow::Type::BOOL
-    EmptyMessage UINT8 = 3;  // arrow::Type::UINT8
-    EmptyMessage INT8 = 4;  // arrow::Type::INT8
-    EmptyMessage UINT16 = 5;  // represents arrow::Type fields in src/arrow/type.h
+    EmptyMessage NONE = 1;   // arrow::Type::NA
+    EmptyMessage BOOL = 2;   // arrow::Type::BOOL
+    EmptyMessage UINT8 = 3;  // arrow::Type::UINT8
+    EmptyMessage INT8 = 4;   // arrow::Type::INT8
+    EmptyMessage UINT16 = 5; // represents arrow::Type fields in src/arrow/type.h
     EmptyMessage INT16 = 6;
     EmptyMessage UINT32 = 7;
     EmptyMessage INT32 = 8;
@@ -461,12 +461,14 @@ message CsvOptions {
   bytes newlines_in_values = 16; // Indicates if newlines are supported in values
   bytes terminator = 17;         // Optional terminator character as a byte
   bytes truncated_rows = 18;     // Indicates if truncated rows are allowed
+  optional uint32 compression_level = 19; // Optional compression level
 }

 // Options controlling CSV format
 message JsonOptions {
   CompressionTypeVariant compression = 1;    // Compression type
   optional uint64 schema_infer_max_rec = 2;  // Optional max records for schema inference
+  optional uint32 compression_level = 3;     // Optional compression level
 }

 message TableParquetOptions {
@@ -519,6 +521,7 @@ message ParquetOptions {
   bool skip_metadata = 3;    // default = true
   bool pushdown_filters = 5; // default = false
   bool reorder_filters = 6;  // default = false
+  bool force_filter_selections = 34; // default = false
   uint64 data_pagesize_limit = 7; // default = 1024 * 1024
   uint64 write_batch_size = 8;    // default = 1024
   string writer_version = 9;      // default = "1.0"
@@ -608,6 +611,8 @@ message Statistics {
   Precision num_rows = 1;
   Precision total_byte_size = 2;
   repeated ColumnStats column_stats = 3;
+  // total_rows was removed - field 4 is reserved
+  reserved 4;
 }

 message ColumnStats {
@@ -616,4 +621,5 @@ message ColumnStats {
   Precision sum_value = 5;
   Precision null_count = 3;
   Precision distinct_count = 4;
-}
+  Precision byte_size = 6;
+}
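The `reserved 4;` line is the standard protobuf recipe for retiring a field: decoders skip unknown fields, so payloads from writers that still populate field 4 (`total_rows`) stay readable, while the reservation prevents the tag from ever being reused with a different meaning. A toy round trip with simplified stand-in messages (the real `Statistics` fields are `Precision` messages, not bare integers):

use prost::Message;

// Old schema: still carries `total_rows` as field 4.
#[derive(Clone, PartialEq, Message)]
struct StatisticsV1 {
    #[prost(uint64, tag = "1")]
    num_rows: u64,
    #[prost(uint64, tag = "4")]
    total_rows: u64,
}

// New schema: field 4 dropped; `reserved 4;` in the .proto keeps it off-limits.
#[derive(Clone, PartialEq, Message)]
struct StatisticsV2 {
    #[prost(uint64, tag = "1")]
    num_rows: u64,
}

fn main() {
    let old = StatisticsV1 { num_rows: 7, total_rows: 7 };
    let bytes = old.encode_to_vec();
    // The unknown field 4 is skipped during decode; nothing breaks.
    let new = StatisticsV2::decode(bytes.as_slice()).unwrap();
    assert_eq!(new.num_rows, 7);
}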

ballista/core/src/execution_plans/shuffle_writer.rs

Lines changed: 4 additions & 3 deletions

@@ -260,10 +260,11 @@ impl ShuffleWriterExec {
             writers.push(None);
         }

-        let mut partitioner = BatchPartitioner::try_new(
-            Partitioning::Hash(exprs, num_output_partitions),
+        let mut partitioner = BatchPartitioner::new_hash_partitioner(
+            exprs,
+            num_output_partitions,
             write_metrics.repart_time.clone(),
-        )?;
+        );

         while let Some(result) = stream.next().await {
             let input_batch = result?;
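The only call-site change here is the constructor: DataFusion 52 replaces the fallible `BatchPartitioner::try_new(Partitioning::Hash(..), ..)` with a dedicated, infallible `new_hash_partitioner`. A sketch of the migration in isolation, assuming `BatchPartitioner` is importable from `datafusion::physical_plan::repartition` as this file uses it:

use std::sync::Arc;

use datafusion::physical_expr::PhysicalExpr;
use datafusion::physical_plan::metrics::Time;
use datafusion::physical_plan::repartition::BatchPartitioner;

fn hash_partitioner(
    exprs: Vec<Arc<dyn PhysicalExpr>>,
    num_output_partitions: usize,
    repart_time: Time,
) -> BatchPartitioner {
    // DataFusion 51: BatchPartitioner::try_new(Partitioning::Hash(exprs, n), time)?
    // DataFusion 52: hash partitioning gets its own constructor, no Result to unwrap.
    BatchPartitioner::new_hash_partitioner(exprs, num_output_partitions, repart_time)
}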

ballista/core/src/serde/generated/ballista.rs

Lines changed: 3 additions & 1 deletion

@@ -409,7 +409,7 @@ pub struct NamedRatio {
 pub struct OperatorMetric {
     #[prost(
         oneof = "operator_metric::Metric",
-        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14"
+        tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15"
     )]
     pub metric: ::core::option::Option<operator_metric::Metric>,
 }
@@ -445,6 +445,8 @@ pub mod operator_metric {
         PruningMetrics(super::NamedPruningMetrics),
         #[prost(message, tag = "14")]
         Ratio(super::NamedRatio),
+        #[prost(uint64, tag = "15")]
+        OutputBatches(u64),
     }
 }
 /// Used by scheduler
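With the new variant in place, a batch count travels as one more oneof arm on `OperatorMetric`. A small construction example, assuming the generated module is reachable at the path implied by the file location above (Ballista may also re-export it elsewhere):

use ballista_core::serde::generated::ballista::{operator_metric, OperatorMetric};

fn output_batches_metric(batches: u64) -> OperatorMetric {
    // Tag 15, matching the `uint64 output_batches = 15;` added to ballista.proto.
    OperatorMetric {
        metric: Some(operator_metric::Metric::OutputBatches(batches)),
    }
}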

ballista/core/src/serde/scheduler/from_proto.rs

Lines changed: 5 additions & 0 deletions

@@ -224,6 +224,11 @@ impl TryInto<MetricValue> for protobuf::OperatorMetric {
                     ratio_metrics,
                 })
             }
+            Some(operator_metric::Metric::OutputBatches(value)) => {
+                let count = Count::new();
+                count.add(value as usize);
+                Ok(MetricValue::OutputBatches(count))
+            }
             None => Err(BallistaError::General(
                 "scheduler::from_proto(OperatorMetric) metric is None.".to_owned(),
             )),

ballista/core/src/serde/scheduler/to_proto.rs

Lines changed: 5 additions & 0 deletions

@@ -205,6 +205,11 @@ impl TryInto<protobuf::OperatorMetric> for &MetricValue {
                     total: ratio_metrics.total() as u64,
                 })),
             }),
+            MetricValue::OutputBatches(count) => Ok(protobuf::OperatorMetric {
+                metric: Some(
+                    operator_metric::Metric::OutputBatches(count.value() as u64),
+                ),
+            }),
             // at the moment there there is no way to serialize custom metrics
             // thus at the moment we can't support it
             MetricValue::Custom { .. } => Err(BallistaError::General(String::from(
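Together, the two impls above give `MetricValue::OutputBatches` a proto round trip. One detail worth noting: DataFusion's `Count` uses interior mutability (an atomic counter), so `from_proto` can call `add` on a non-`mut` binding. A sketch of the round trip, assuming the `Count` API (`new`/`add`/`value`) as used in the diffs:

use datafusion::physical_plan::metrics::{Count, MetricValue};

fn round_trip(wire_value: u64) -> u64 {
    // from_proto direction: rebuild the counter from the wire value.
    let count = Count::new();
    count.add(wire_value as usize); // no `mut` needed: Count is atomic inside
    let metric = MetricValue::OutputBatches(count);

    // to_proto direction: read the counter back out for serialization.
    match metric {
        MetricValue::OutputBatches(c) => c.value() as u64,
        _ => unreachable!(),
    }
}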
