-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: Implement an AsyncReader for avro using ObjectStore #8930
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
4ed172b
e5c7f57
f5bfd35
32b0760
2251be5
79af114
4e207ea
e04337c
854cd95
8e0e46e
4f88571
cb19fad
d59a2f5
e57801d
939cf8e
03de289
a071ab8
1cc8d8a
1a2169b
c6634ad
f7cffc9
4361057
94aae91
3216788
138e72d
977f619
3f51768
69b7967
e3d3624
61c44e4
4d15e02
eef1b4d
5c93414
7936756
5bd5241
3f1489e
068128f
863dc17
31ee0eb
ab27d7b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -123,16 +123,56 @@ | |||||
| //! # Ok(()) } | ||||||
| //! ``` | ||||||
| //! | ||||||
| //! ## `async` Reading (`async` feature) | ||||||
| //! | ||||||
| //! The [`reader`] module provides async APIs for reading Avro files when the `async` | ||||||
| //! feature is enabled. | ||||||
| //! | ||||||
| //! [`AsyncAvroFileReader`] implements `Stream<Item = Result<RecordBatch, ArrowError>>`, | ||||||
| //! allowing efficient async streaming of record batches. When the `object_store` feature | ||||||
| //! is enabled, [`AvroObjectReader`] provides integration with object storage services | ||||||
| //! such as S3 via the [object_store] crate. | ||||||
| //! | ||||||
| //! ```ignore | ||||||
EmilyMatt marked this conversation as resolved.
Show resolved
Hide resolved
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's make this runnable
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know how to do it without failing the tests, because all the code here is featuregated, and the doctests also runs without features enabled |
||||||
| //! use std::sync::Arc; | ||||||
| //! use arrow_avro::reader::{AsyncAvroFileReader, AvroObjectReader}; | ||||||
| //! use futures::TryStreamExt; | ||||||
| //! use object_store::ObjectStore; | ||||||
| //! use object_store::local::LocalFileSystem; | ||||||
| //! use object_store::path::Path; | ||||||
| //! | ||||||
| //! # async fn example() -> Result<(), Box<dyn std::error::Error>> { | ||||||
| //! let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new()); | ||||||
| //! let path = Path::from("data/example.avro"); | ||||||
| //! let meta = store.head(&path).await?; | ||||||
| //! | ||||||
| //! let reader = AvroObjectReader::new(store, path); | ||||||
| //! let stream = AsyncAvroFileReader::builder(reader, meta.size, 1024) | ||||||
| //! .try_build() | ||||||
| //! .await?; | ||||||
| //! | ||||||
| //! let batches: Vec<_> = stream.try_collect().await?; | ||||||
| //! # Ok(()) | ||||||
| //! # } | ||||||
| //! ``` | ||||||
| //! | ||||||
| //! [object_store]: https://docs.rs/object_store/latest/object_store/ | ||||||
| //! | ||||||
| //! --- | ||||||
| //! | ||||||
| //! ### Modules | ||||||
| //! | ||||||
| //! - [`reader`]: read Avro (OCF, SOE, Confluent) into Arrow `RecordBatch`es. | ||||||
| //! - With the `async` feature: [`AsyncAvroFileReader`] for async streaming reads. | ||||||
| //! - With the `object_store` feature: [`AvroObjectReader`] for reading from cloud storage. | ||||||
| //! - [`writer`]: write Arrow `RecordBatch`es as Avro (OCF, SOE, Confluent, Apicurio). | ||||||
| //! - [`schema`]: Avro schema parsing / fingerprints / registries. | ||||||
| //! - [`compression`]: codecs used for **OCF block compression** (i.e., Deflate, Snappy, Zstandard, BZip2, and XZ). | ||||||
| //! - [`codec`]: internal Avro-Arrow type conversion and row decode/encode plans. | ||||||
| //! | ||||||
| //! [`AsyncAvroFileReader`]: reader::AsyncAvroFileReader | ||||||
| //! [`AvroObjectReader`]: reader::AvroObjectReader | ||||||
| //! | ||||||
| //! ### Features | ||||||
| //! | ||||||
| //! **OCF compression (enabled by default)** | ||||||
|
|
@@ -142,6 +182,11 @@ | |||||
| //! - `bzip2` — enable BZip2 block compression. | ||||||
| //! - `xz` — enable XZ/LZMA block compression. | ||||||
| //! | ||||||
| //! **Async & Object Store (opt‑in)** | ||||||
| //! - `async` — enable async APIs for reading Avro (`AsyncAvroFileReader`, `AsyncFileReader` trait). | ||||||
| //! - `object_store` — enable integration with the [`object_store`] crate for reading Avro | ||||||
| //! from cloud storage (S3, GCS, Azure Blob, etc.) via `AvroObjectReader`. Implies `async`. | ||||||
| //! | ||||||
| //! **Schema fingerprints & helpers (opt‑in)** | ||||||
| //! - `md5` — enable MD5 writer‑schema fingerprints. | ||||||
| //! - `sha256` — enable SHA‑256 writer‑schema fingerprints. | ||||||
|
|
@@ -156,6 +201,8 @@ | |||||
| //! - OCF compression codecs apply only to **Object Container Files**; they do not affect Avro | ||||||
| //! single object encodings. | ||||||
| //! | ||||||
| //! [`object_store`]: https://docs.rs/object_store/latest/object_store/ | ||||||
| //! | ||||||
| //! [canonical extension types]: https://arrow.apache.org/docs/format/CanonicalExtensions.html | ||||||
| //! | ||||||
| //! [Apache Arrow]: https://arrow.apache.org/ | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| use arrow_schema::ArrowError; | ||
| use bytes::Bytes; | ||
| use futures::FutureExt; | ||
| use futures::future::BoxFuture; | ||
| use std::io::SeekFrom; | ||
| use std::ops::Range; | ||
| use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; | ||
|
|
||
| /// The asynchronous interface used by [`AsyncAvroFileReader`] to read avro files | ||
| /// | ||
| /// Notes: | ||
| /// | ||
| /// 1. There is a default implementation for types that implement [`AsyncRead`] | ||
| /// and [`AsyncSeek`], for example [`tokio::fs::File`]. | ||
| /// | ||
| /// 2. [`AvroObjectReader`], available when the `object_store` crate feature | ||
| /// is enabled, implements this interface for [`ObjectStore`]. | ||
| /// | ||
| /// [`ObjectStore`]: object_store::ObjectStore | ||
| /// | ||
| /// [`tokio::fs::File`]: https://docs.rs/tokio/latest/tokio/fs/struct.File.html | ||
| pub trait AsyncFileReader: Send { | ||
| /// Retrieve the bytes in `range` | ||
| fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>>; | ||
|
|
||
| /// Retrieve multiple byte ranges. The default implementation will call `get_bytes` sequentially | ||
| fn get_byte_ranges( | ||
| &mut self, | ||
| ranges: Vec<Range<u64>>, | ||
| ) -> BoxFuture<'_, Result<Vec<Bytes>, ArrowError>> { | ||
| async move { | ||
| let mut result = Vec::with_capacity(ranges.len()); | ||
|
|
||
| for range in ranges.into_iter() { | ||
| let data = self.get_bytes(range).await?; | ||
| result.push(data); | ||
| } | ||
|
|
||
| Ok(result) | ||
| } | ||
| .boxed() | ||
| } | ||
| } | ||
|
|
||
| /// This allows Box<dyn AsyncFileReader + '_> to be used as an AsyncFileReader, | ||
| impl AsyncFileReader for Box<dyn AsyncFileReader + '_> { | ||
| fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>> { | ||
| self.as_mut().get_bytes(range) | ||
| } | ||
|
|
||
| fn get_byte_ranges( | ||
| &mut self, | ||
| ranges: Vec<Range<u64>>, | ||
| ) -> BoxFuture<'_, Result<Vec<Bytes>, ArrowError>> { | ||
| self.as_mut().get_byte_ranges(ranges) | ||
| } | ||
| } | ||
|
|
||
| impl<T: AsyncRead + AsyncSeek + Unpin + Send> AsyncFileReader for T { | ||
| fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes, ArrowError>> { | ||
| async move { | ||
| self.seek(SeekFrom::Start(range.start)).await?; | ||
|
|
||
| let to_read = range.end - range.start; | ||
| let mut buffer = Vec::with_capacity(to_read as usize); | ||
| let read = self.take(to_read).read_to_end(&mut buffer).await?; | ||
| if read as u64 != to_read { | ||
| return Err(ArrowError::AvroError(format!( | ||
| "expected to read {} bytes, got {}", | ||
| to_read, read | ||
| ))); | ||
| } | ||
|
|
||
| Ok(buffer.into()) | ||
| } | ||
| .boxed() | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.