Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4ed172b
feat: Implement an AsyncReader for avro using ObjectStore
EmilyMatt Nov 26, 2025
e5c7f57
Merge branch 'main' into avro-async-reader
EmilyMatt Nov 26, 2025
f5bfd35
feature gate use
EmilyMatt Nov 26, 2025
32b0760
update comments
EmilyMatt Nov 26, 2025
2251be5
file size is mandatory
EmilyMatt Nov 26, 2025
79af114
finish immediately
EmilyMatt Nov 26, 2025
4e207ea
remove object store form default
EmilyMatt Nov 27, 2025
e04337c
remove object store form default
EmilyMatt Nov 27, 2025
854cd95
Merge branch 'main' into avro-async-reader
EmilyMatt Dec 1, 2025
8e0e46e
Use builder pattern, fallback to get the schema from the arrow schema…
EmilyMatt Dec 7, 2025
4f88571
Merge branch 'main' into avro-async-reader
EmilyMatt Dec 7, 2025
cb19fad
remove accidental changes
EmilyMatt Dec 7, 2025
d59a2f5
Merge branch 'apache:main' into avro-async-reader
EmilyMatt Jan 8, 2026
e57801d
Merge branch 'refs/heads/main' into avro-async-reader
EmilyMatt Jan 12, 2026
939cf8e
rebase, address CR
EmilyMatt Jan 12, 2026
03de289
Merge remote-tracking branch 'upstream/avro-async-reader' into avro-a…
EmilyMatt Jan 12, 2026
a071ab8
fix some docs
EmilyMatt Jan 12, 2026
1cc8d8a
update cfg
EmilyMatt Jan 12, 2026
1a2169b
Add some docs
EmilyMatt Jan 12, 2026
c6634ad
Add a basic roundtrip test
EmilyMatt Jan 12, 2026
f7cffc9
Merge branch 'main' into avro-async-reader
EmilyMatt Jan 18, 2026
4361057
address CR, fix some bugs
EmilyMatt Jan 18, 2026
94aae91
use smaller buffer
EmilyMatt Jan 18, 2026
3216788
add error state
EmilyMatt Jan 19, 2026
138e72d
address CR
EmilyMatt Jan 21, 2026
977f619
add tests
EmilyMatt Jan 21, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion arrow-avro/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
small_decimals = []
avro_custom_types = ["dep:arrow-select"]

# Enable async APIs
async = ["futures", "tokio"]
# Enable object_store integration
object_store = ["dep:object_store", "async"]

[dependencies]
arrow-schema = { workspace = true }
arrow-buffer = { workspace = true }
arrow-array = { workspace = true }
arrow-select = { workspace = true, optional = true }

object_store = { version = "0.12", default-features = false, optional = true }

bytes = { version = "1.11.0", default-features = false, features = ["std"] }
serde_json = { version = "1.0", default-features = false, features = ["std"] }
serde = { version = "1.0.188", features = ["derive"] }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
snap = { version = "1.0", default-features = false, optional = true }
zstd = { version = "0.13", default-features = false, optional = true }
bzip2 = { version = "0.6.0", optional = true }
Expand All @@ -66,6 +76,7 @@ indexmap = "2.10"
rand = "0.9"
md5 = { version = "0.8", optional = true }
sha2 = { version = "0.10", optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }

[dev-dependencies]
arrow-data = { workspace = true }
Expand All @@ -78,11 +89,12 @@ criterion = { workspace = true, default-features = false }
tempfile = "3.3"
arrow = { workspace = true }
futures = "0.3.31"
bytes = "1.10.1"
async-stream = "0.3.6"
apache-avro = "0.21.0"
num-bigint = "0.4"
object_store = { version = "0.12", default-features = false, features = ["fs"] }
once_cell = "1.21.3"
tokio = { version = "1.0", default-features = false, features = ["macros", "rt-multi-thread", "io-util", "fs"] }

[[bench]]
name = "avro_reader"
Expand Down
60 changes: 51 additions & 9 deletions arrow-avro/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Disable defaults and pick only what you need (see **Feature Flags**):

```toml
[dependencies]
arrow-avro = { version = "57.0.0", default-features = false, features = ["deflate", "snappy"] }
arrow-avro = { version = "58.0.0", default-features = false, features = ["deflate", "snappy"] }
```

---
Expand Down Expand Up @@ -105,6 +105,36 @@ fn main() -> anyhow::Result<()> {

See the crate docs for runnable SOE and Confluent round‑trip examples.

### Async reading from object stores (`object_store` feature)

```rust,ignore
use std::sync::Arc;
use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
use futures::TryStreamExt;
use object_store::ObjectStore;
use object_store::local::LocalFileSystem;
use object_store::path::Path;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
let path = Path::from("data/example.avro");

let meta = store.head(&path).await?;
let reader = AvroObjectReader::new(store, path);

let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
.try_build()
.await?;

let batches: Vec<_> = stream.try_collect().await?;
for batch in batches {
println!("rows: {}", batch.num_rows());
}
Ok(())
}
```

---

## Feature Flags (what they do and when to use them)
Expand All @@ -128,15 +158,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
* Only **OCF** uses these codecs (they compress per‑block). They do **not** apply to raw Avro frames used by Confluent wire format or SOE. The crate’s `compression` module is specifically for **OCF blocks**.
* `deflate` uses `flate2` with the `rust_backend` (no system zlib required).

### Async & Object Store

| Feature | Default | What it enables | When to use |
|----------------|--------:|-----------------------------------------------------------------------------|-------------------------------------------------------------------------------|
| `async` | ⬜ | Async APIs for reading Avro via `futures` and `tokio` | Enable for non-blocking async Avro reading with `AsyncAvroFileReader`. |
| `object_store` | ⬜ | Integration with `object_store` crate (implies `async`) | Enable for reading Avro from cloud storage (S3, GCS, Azure Blob, etc.). |

### Schema fingerprints & custom logical type helpers

| Feature | Default | What it enables | When to use |
| Feature | Default | What it enables | When to use |
|-----------------------------|--------:|----------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------|
| `md5` | ⬜ | `md5` dep for optional **MD5** schema fingerprints | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation). |
| `sha256` | ⬜ | `sha2` dep for optional **SHA‑256** schema fingerprints | If you prefer longer fingerprints; affects max prefix length (i.e. when framing). |
| `md5` | ⬜ | `md5` dep for optional **MD5** schema fingerprints | If you want to compute MD5 fingerprints of writer schemas (i.e. for custom prefixing/validation). |
| `sha256` | ⬜ | `sha2` dep for optional **SHA‑256** schema fingerprints | If you prefer longer fingerprints; affects max prefix length (i.e. when framing). |
| `small_decimals` | ⬜ | Extra handling for **small decimal** logical types (`Decimal32` and `Decimal64`) | If your Avro `decimal` values are small and you want more compact Arrow representations. |
| `avro_custom_types` | ⬜ | Annotates Avro values using Arrow specific custom logical types | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesnt natively model. |
| `canonical_extension_types` | ⬜ | Re‑exports Arrows canonical extension types support from `arrow-schema` | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them. |
| `avro_custom_types` | ⬜ | Annotates Avro values using Arrow specific custom logical types | Enable when you need arrow-avro to reinterpret certain Avro fields as Arrow types that Avro doesn't natively model. |
| `canonical_extension_types` | ⬜ | Re‑exports Arrow's canonical extension types support from `arrow-schema` | Enable if your workflow uses Arrow [canonical extension types] and you want `arrow-avro` to respect them. |

[canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html

Expand All @@ -149,17 +186,22 @@ See the crate docs for runnable SOE and Confluent round‑trip examples.
* Minimal, fast build (common pipelines):

```toml
arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy"] }
arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy"] }
```
* Include Zstandard too (modern data lakes):

```toml
arrow-avro = { version = "56", default-features = false, features = ["deflate", "snappy", "zstd"] }
arrow-avro = { version = "58", default-features = false, features = ["deflate", "snappy", "zstd"] }
```
* Async reading from object stores (S3, GCS, etc.):

```toml
arrow-avro = { version = "58", features = ["object_store"] }
```
* Fingerprint helpers:

```toml
arrow-avro = { version = "56", features = ["md5", "sha256"] }
arrow-avro = { version = "58", features = ["md5", "sha256"] }
```

---
Expand Down
47 changes: 47 additions & 0 deletions arrow-avro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,56 @@
//! # Ok(()) }
//! ```
//!
//! ## `async` Reading (`async` feature)
//!
//! The [`reader`] module provides async APIs for reading Avro files when the `async`
//! feature is enabled.
//!
//! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, ArrowError>>`,
//! allowing efficient async streaming of record batches. When the `object_store` feature
//! is enabled, [`AvroObjectReader`] provides integration with object storage services
//! such as S3 via the [object_store] crate.
//!
//! ```
//! use std::sync::Arc;
//! use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader};
//! use futures::TryStreamExt;
//! use object_store::ObjectStore;
//! use object_store::local::LocalFileSystem;
//! use object_store::path::Path;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
//! let path = Path::from("data/example.avro");
//! let meta = store.head(&path).await?;
//!
//! let reader = AvroObjectReader::new(store, path);
//! let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024)
//! .try_build()
//! .await?;
//!
//! let batches: Vec<_> = stream.try_collect().await?;
//! # Ok(())
//! # }
//! ```
//!
//! [object_store]: https://docs.rs/object_store/latest/object_store/
//!
//! ---
//!
//! ### Modules
//!
//! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es.
//! - With the `async` feature: [`AsyncAvroFileReader`] for async streaming reads.
//! - With the `object_store` feature: [`AvroObjectReader`] for reading from cloud storage.
//! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio).
//! - [`schema`]: Avro schema parsing / fingerprints / registries.
//! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ).
//! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans.
//!
//! [`AsyncAvroFileReader`]: reader::AsyncAvroFileReader
//! [`AvroObjectReader`]: reader::AvroObjectReader
//!
//! ### Features
//!
//! **OCF compression (enabled by default)**
Expand All @@ -142,6 +182,11 @@
//! - `bzip2` — enable BZip2 block compression.
//! - `xz` — enable XZ/LZMA block compression.
//!
//! **Async & Object Store (opt‑in)**
//! - `async` — enable async APIs for reading Avro (`AsyncAvroFileReader`, `AsyncFileReader` trait).
//! - `object_store` — enable integration with the [`object_store`] crate for reading Avro
//! from cloud storage (S3, GCS, Azure Blob, etc.) via `AvroObjectReader`. Implies `async`.
//!
//! **Schema fingerprints & helpers (opt‑in)**
//! - `md5` — enable MD5 writer‑schema fingerprints.
//! - `sha256` — enable SHA‑256 writer‑schema fingerprints.
Expand All @@ -156,6 +201,8 @@
//! - OCF compression codecs apply only to **Object Container Files**; they do not affect Avro
//! single object encodings.
//!
//! [`object_store`]: https://docs.rs/object_store/latest/object_store/
//!
//! [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html
//!
//! [Apache Arrow]: https://arrow.apache.org/
Expand Down
78 changes: 78 additions & 0 deletions arrow-avro/src/reader/async_reader/async_file_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::FutureExt;
use futures::future::BoxFuture;
use std::io::SeekFrom;
use std::ops::Range;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

/// The asynchronous interface used by [`AsyncAvroFileReader`] to read avro files
///
/// Notes:
///
/// 1. There is a default implementation for types that implement [`AsyncRead`]
/// and [`AsyncSeek`], for example [`tokio::fs::File`].
///
/// 2. [`AvroObjectReader`], available when the `object_store` crate feature
/// is enabled, implements this interface for [`ObjectStore`].
///
/// [`ObjectStore`]: object_store::ObjectStore
///
/// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html
pub trait AsyncFileReader: Send {
/// Retrieve the bytes in `range`
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>>;

/// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially
fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, Result<Vec<Bytes>, ArrowError>> {
async move {
let mut result = Vec::with_capacity(ranges.len());

for range in ranges.into_iter() {
let data = self.get_bytes(range).await?;
result.push(data);
}

Ok(result)
}
.boxed()
}
}

/// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader,
impl AsyncFileReader for Box<dyn AsyncFileReader + '_> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>> {
self.as_mut().get_bytes(range)
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<u64>>,
) -> BoxFuture<'_, Result<Vec<Bytes>, ArrowError>> {
self.as_mut().get_byte_ranges(ranges)
}
}

impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>> {
async move {
self.seek(SeekFrom::Start(range.start)).await?;

let to_read = range.end - range.start;
let mut buffer = Vec::with_capacity(to_read as usize);
let read = self.take(to_read).read_to_end(&mut buffer).await?;
if read as u64 != to_read {
return Err(ArrowError::AvroError(format!(
"expected to read {} bytes, got {}",
to_read, read
)));
}

Ok(buffer.into())
}
.boxed()
}
}
Loading