Skip to content

Commit c10fe77

Browse files
Add tests and fixes for schema resolution bug (#9237)
# Which issue does this PR close?

- Closes #9231.

# Rationale for this change

Avro schema resolution allows a reader schema to represent “nullable” values using a two-branch union (`["null", T]` or `[T, "null"]`) while still reading data written with the non-union schema `T` (i.e. without union discriminants in the encoded data). In `arrow-avro`, resolving a non-union writer type against a reader union (notably for array/list item schemas like `items: ["null", "int"]`) could incorrectly treat the encoded stream as a union and attempt to decode a union discriminant. This would misalign decoding and could surface as `ParseError("bad varint")` for certain files (see #9231).

# What changes are included in this PR?

- Fix schema resolution when the *writer* schema is non-union and the *reader* schema is a union:
  - Special-case two-branch unions containing `null` to be treated as “nullable” (capturing whether `null` is first or second), and resolve against the non-null branch.
  - Improve matching for general reader unions by attempting to resolve against each union variant, preferring a direct match, and constructing the appropriate union resolution mapping for the selected branch.
  - Ensure promotions are represented at the union-resolution level (avoiding nested promotion resolution on the selected union child).
- Add regression coverage for the bug and the fixed behavior:
  - `test_resolve_array_writer_nonunion_items_reader_nullable_items` (schema resolution / codec)
  - `test_array_decoding_writer_nonunion_items_reader_nullable_items` (record decoding; ensures correct byte consumption and decoded values)
  - `test_bad_varint_bug_nullable_array_items` (end-to-end reader regression using a small Avro fixture)
- Add a small compressed Avro fixture under `arrow-avro/test/data/bad-varint-bug.avro.gz` used by the regression test.

# Are these changes tested?

Yes.
This PR adds targeted unit/integration tests that reproduce the prior failure mode and validate correct schema resolution and decoding for nullable-union array items.

# Are there any user-facing changes?

Yes (bug fix): reading Avro files with arrays whose element type is represented as a nullable union in the reader schema (e.g. `items: ["null", "int"]`) now succeeds instead of failing with `ParseError("bad varint")`. No public API changes are intended.

---------

Co-authored-by: Mikhail Zabaluev <[email protected]>
1 parent 096751f commit c10fe77

File tree

4 files changed

+191
-17
lines changed

4 files changed

+191
-17
lines changed

arrow-avro/src/codec.rs

Lines changed: 101 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1529,23 +1529,71 @@ impl<'a> Maker<'a> {
15291529
Ok(dt)
15301530
}
15311531
(writer_non_union, Schema::Union(reader_variants)) => {
1532-
let promo = self.find_best_promotion(
1533-
writer_non_union,
1534-
reader_variants.as_slice(),
1535-
namespace,
1536-
);
1537-
let Some((reader_index, promotion)) = promo else {
1538-
return Err(ArrowError::SchemaError(
1539-
"Writer schema does not match any reader union branch".to_string(),
1540-
));
1541-
};
1542-
let mut dt = self.parse_type(reader_schema, namespace)?;
1543-
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1544-
writer_to_reader: Arc::from(vec![Some((reader_index, promotion))]),
1545-
writer_is_union: false,
1546-
reader_is_union: true,
1547-
}));
1548-
Ok(dt)
1532+
if let Some((nullability, non_null_branch)) =
1533+
nullable_union_variants(reader_variants)
1534+
{
1535+
let mut dt = self.resolve_type(writer_non_union, non_null_branch, namespace)?;
1536+
let non_null_idx = match nullability {
1537+
Nullability::NullFirst => 1,
1538+
Nullability::NullSecond => 0,
1539+
};
1540+
#[cfg(feature = "avro_custom_types")]
1541+
Self::propagate_nullability_into_ree(&mut dt, nullability);
1542+
dt.nullability = Some(nullability);
1543+
let promotion = Self::coercion_from(&dt);
1544+
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1545+
writer_to_reader: Arc::from(vec![Some((non_null_idx, promotion))]),
1546+
writer_is_union: false,
1547+
reader_is_union: true,
1548+
}));
1549+
Ok(dt)
1550+
} else {
1551+
let mut best_match: Option<(usize, AvroDataType, Promotion)> = None;
1552+
for (i, variant) in reader_variants.iter().enumerate() {
1553+
if let Ok(resolved_dt) =
1554+
self.resolve_type(writer_non_union, variant, namespace)
1555+
{
1556+
let promotion = Self::coercion_from(&resolved_dt);
1557+
if promotion == Promotion::Direct {
1558+
best_match = Some((i, resolved_dt, promotion));
1559+
break;
1560+
} else if best_match.is_none() {
1561+
best_match = Some((i, resolved_dt, promotion));
1562+
}
1563+
}
1564+
}
1565+
let Some((match_idx, match_dt, promotion)) = best_match else {
1566+
return Err(ArrowError::SchemaError(
1567+
"Writer schema does not match any reader union branch".to_string(),
1568+
));
1569+
};
1570+
let mut children = Vec::with_capacity(reader_variants.len());
1571+
let mut match_dt = Some(match_dt);
1572+
for (i, variant) in reader_variants.iter().enumerate() {
1573+
if i == match_idx {
1574+
if let Some(mut dt) = match_dt.take() {
1575+
if matches!(dt.resolution, Some(ResolutionInfo::Promotion(_))) {
1576+
dt.resolution = None;
1577+
}
1578+
children.push(dt);
1579+
}
1580+
} else {
1581+
children.push(self.parse_type(variant, namespace)?);
1582+
}
1583+
}
1584+
let union_fields = build_union_fields(&children)?;
1585+
let mut dt = AvroDataType::new(
1586+
Codec::Union(children.into(), union_fields, UnionMode::Dense),
1587+
Default::default(),
1588+
None,
1589+
);
1590+
dt.resolution = Some(ResolutionInfo::Union(ResolvedUnion {
1591+
writer_to_reader: Arc::from(vec![Some((match_idx, promotion))]),
1592+
writer_is_union: false,
1593+
reader_is_union: true,
1594+
}));
1595+
Ok(dt)
1596+
}
15491597
}
15501598
(
15511599
Schema::Complex(ComplexType::Array(writer_array)),
@@ -2926,6 +2974,42 @@ mod tests {
29262974
}
29272975
}
29282976

2977+
#[test]
2978+
fn test_resolve_array_writer_nonunion_items_reader_nullable_items() {
2979+
let writer_schema = Schema::Complex(ComplexType::Array(Array {
2980+
items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2981+
attributes: Attributes::default(),
2982+
}));
2983+
let reader_schema = Schema::Complex(ComplexType::Array(Array {
2984+
items: Box::new(mk_union(vec![
2985+
Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2986+
Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2987+
])),
2988+
attributes: Attributes::default(),
2989+
}));
2990+
let mut maker = Maker::new(false, false);
2991+
let dt = maker
2992+
.make_data_type(&writer_schema, Some(&reader_schema), None)
2993+
.unwrap();
2994+
if let Codec::List(inner) = dt.codec() {
2995+
assert_eq!(inner.nullability(), Some(Nullability::NullFirst));
2996+
assert!(matches!(inner.codec(), Codec::Int32));
2997+
match inner.resolution.as_ref() {
2998+
Some(ResolutionInfo::Union(info)) => {
2999+
assert!(!info.writer_is_union, "writer should be non-union");
3000+
assert!(info.reader_is_union, "reader should be union");
3001+
assert_eq!(
3002+
info.writer_to_reader.as_ref(),
3003+
&[Some((1, Promotion::Direct))]
3004+
);
3005+
}
3006+
other => panic!("expected Union resolution, got {other:?}"),
3007+
}
3008+
} else {
3009+
panic!("expected List codec");
3010+
}
3011+
}
3012+
29293013
#[test]
29303014
fn test_resolve_fixed_success_name_and_size_match_and_alias() {
29313015
let writer_schema = Schema::Complex(ComplexType::Fixed(Fixed {

arrow-avro/src/reader/mod.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9081,4 +9081,46 @@ mod test {
90819081
"entire RecordBatch mismatch (schema, all columns, all rows)"
90829082
);
90839083
}
9084+
9085+
#[test]
9086+
fn test_bad_varint_bug_nullable_array_items() {
9087+
use flate2::read::GzDecoder;
9088+
use std::io::Read;
9089+
let manifest_dir = env!("CARGO_MANIFEST_DIR");
9090+
let gz_path = format!("{manifest_dir}/test/data/bad-varint-bug.avro.gz");
9091+
let gz_file = File::open(&gz_path).expect("test file should exist");
9092+
let mut decoder = GzDecoder::new(gz_file);
9093+
let mut avro_bytes = Vec::new();
9094+
decoder
9095+
.read_to_end(&mut avro_bytes)
9096+
.expect("should decompress");
9097+
let reader_arrow_schema = Schema::new(vec![Field::new(
9098+
"int_array",
9099+
DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
9100+
true,
9101+
)])
9102+
.with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
9103+
let reader_schema = AvroSchema::try_from(&reader_arrow_schema)
9104+
.expect("should convert Arrow schema to Avro");
9105+
let mut reader = ReaderBuilder::new()
9106+
.with_reader_schema(reader_schema)
9107+
.build(Cursor::new(avro_bytes))
9108+
.expect("should build reader");
9109+
let batch = reader
9110+
.next()
9111+
.expect("should have one batch")
9112+
.expect("reading should succeed without bad varint error");
9113+
assert_eq!(batch.num_rows(), 1);
9114+
let list_col = batch
9115+
.column(0)
9116+
.as_any()
9117+
.downcast_ref::<ListArray>()
9118+
.expect("should be ListArray");
9119+
assert_eq!(list_col.len(), 1);
9120+
let values = list_col.values();
9121+
let int_values = values.as_primitive::<Int32Type>();
9122+
assert_eq!(int_values.len(), 2);
9123+
assert_eq!(int_values.value(0), 1);
9124+
assert_eq!(int_values.value(1), 2);
9125+
}
90849126
}

arrow-avro/src/reader/record.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2897,6 +2897,54 @@ mod tests {
28972897
assert_eq!(list_arr.value_length(0), 0);
28982898
}
28992899

2900+
#[test]
2901+
fn test_array_decoding_writer_nonunion_items_reader_nullable_items() {
2902+
use crate::schema::Array;
2903+
let writer_schema = Schema::Complex(ComplexType::Array(Array {
2904+
items: Box::new(Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))),
2905+
attributes: Attributes::default(),
2906+
}));
2907+
let reader_schema = Schema::Complex(ComplexType::Array(Array {
2908+
items: Box::new(Schema::Union(vec![
2909+
Schema::TypeName(TypeName::Primitive(PrimitiveType::Null)),
2910+
Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)),
2911+
])),
2912+
attributes: Attributes::default(),
2913+
}));
2914+
let dt = resolved_root_datatype(writer_schema, reader_schema, false, false);
2915+
if let Codec::List(inner) = dt.codec() {
2916+
assert_eq!(
2917+
inner.nullability(),
2918+
Some(Nullability::NullFirst),
2919+
"items should be nullable"
2920+
);
2921+
} else {
2922+
panic!("expected List codec");
2923+
}
2924+
let mut decoder = Decoder::try_new(&dt).unwrap();
2925+
let mut data = encode_avro_long(2);
2926+
data.extend(encode_avro_int(10));
2927+
data.extend(encode_avro_int(20));
2928+
data.extend(encode_avro_long(0));
2929+
let mut cursor = AvroCursor::new(&data);
2930+
decoder.decode(&mut cursor).unwrap();
2931+
assert_eq!(
2932+
cursor.position(),
2933+
data.len(),
2934+
"all bytes should be consumed"
2935+
);
2936+
let array = decoder.flush(None).unwrap();
2937+
let list_arr = array.as_any().downcast_ref::<ListArray>().unwrap();
2938+
assert_eq!(list_arr.len(), 1, "one list/row");
2939+
assert_eq!(list_arr.value_length(0), 2, "two items in the list");
2940+
let values = list_arr.values().as_primitive::<Int32Type>();
2941+
assert_eq!(values.len(), 2);
2942+
assert_eq!(values.value(0), 10);
2943+
assert_eq!(values.value(1), 20);
2944+
assert!(!values.is_null(0));
2945+
assert!(!values.is_null(1));
2946+
}
2947+
29002948
#[test]
29012949
fn test_decimal_decoding_fixed256() {
29022950
let dt = avro_from_codec(Codec::Decimal(50, Some(2), Some(32)));
254 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)