Description
Is your feature request related to a problem or challenge? Please describe what you are trying to do.
`arrow-avro` currently provides synchronous writing APIs (`Writer<W: std::io::Write>` and the aliases `AvroWriter` for OCF and `AvroStreamWriter` for SOE). In async-first applications (Tokio-based services, DataFusion execution, writing to `object_store`, HTTP streaming responses, etc.), these sync APIs force one of the following suboptimal approaches:
- Use blocking I/O (`std::fs::File`) inside an async runtime (risking thread starvation / reduced throughput).
- Use `spawn_blocking` to isolate blocking writes (adds complexity and makes backpressure/error handling harder).
- Buffer the entire output into memory (e.g., write to `Vec<u8>`) before uploading to object storage (bad for large outputs).
In parallel, PR #8930 is adding an arrow-avro AsyncReader for Avro OCF (with object_store integration). To enable end-to-end async pipelines, arrow-avro should also have a corresponding AsyncWriter.
Concrete use cases:
- Write Avro OCF results directly to `tokio::fs::File` without blocking.
- Stream an Avro OCF response body over the network (chunked transfer, back-pressure aware).
- Write Avro OCF to S3/GCS/Azure via `object_store` multipart upload (symmetry with the new AsyncReader).
Describe the solution you'd like
Add a new arrow-avro async writer API that mirrors the patterns used by Parquet's async writer (`parquet/src/arrow/async_writer`) and interoperates with the arrow-avro AsyncReader being introduced.
Proposed high-level design:
- New module and feature gating
  - Add `arrow-avro/src/writer/async_writer.rs` (or `writer/async_writer/mod.rs`) behind `cfg(feature = "async")`
  - Align feature flags with the AsyncReader PR (`async`, and optionally `object_store`)
- Async sink abstraction (match Parquet's pattern)
  - Introduce an `AsyncFileWriter` trait analogous to Parquet's:
    - Accepts `bytes::Bytes` payloads
    - Returns `futures::future::BoxFuture<'_, Result<(), ArrowError>>`
    - Has `write(Bytes)` and `complete()` methods
  - Provide:
    - `impl AsyncFileWriter for Box<dyn AsyncFileWriter + '_>`
    - A blanket impl for Tokio sinks (like Parquet): `impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWriter for T`
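The sink trait above can be sketched in std-only terms. This is an assumed shape, not the final API: `Vec<u8>` stands in for `bytes::Bytes`, `String` for `ArrowError`, and a hand-rolled noop-waker poller replaces a real async runtime so the example runs without Tokio.

```rust
use std::future::Future;
use std::pin::Pin;

// Stand-in for futures::future::BoxFuture.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Assumed shape of the proposed trait (mirroring parquet's AsyncFileWriter).
trait AsyncFileWriter: Send {
    /// Asynchronously push one encoded payload to the sink.
    fn write(&mut self, buf: Vec<u8>) -> BoxFuture<'_, Result<(), String>>;
    /// Flush and finalize the sink (e.g. complete a multipart upload).
    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>>;
}

/// A trivial in-memory sink used to exercise the trait.
struct VecSink {
    data: Vec<u8>,
    completed: bool,
}

impl AsyncFileWriter for VecSink {
    fn write(&mut self, buf: Vec<u8>) -> BoxFuture<'_, Result<(), String>> {
        self.data.extend_from_slice(&buf);
        Box::pin(std::future::ready(Ok(())))
    }
    fn complete(&mut self) -> BoxFuture<'_, Result<(), String>> {
        self.completed = true;
        Box::pin(std::future::ready(Ok(())))
    }
}

/// Poll an already-ready future once, without a runtime (demo only).
fn poll_ready<T>(mut fut: BoxFuture<'_, T>) -> T {
    use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
    fn noop(_: *const ()) {}
    fn clone(p: *const ()) -> RawWaker { RawWaker::new(p, &VTABLE) }
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    let waker = unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) };
    let mut cx = Context::from_waker(&waker);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(v) => v,
        Poll::Pending => panic!("example futures are always ready"),
    }
}
```

With the real trait, the blanket `tokio::io::AsyncWrite` impl would replace `VecSink` for files and sockets.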
- AsyncWriter types and API parity
  - Implement a generic `AsyncWriter<W, F>` where `W: AsyncFileWriter` and `F: AvroFormat`
  - Provide aliases consistent with the existing sync types:
    - `type AsyncAvroWriter<W> = AsyncWriter<W, AvroOcfFormat>`
    - `type AsyncAvroStreamWriter<W> = AsyncWriter<W, AvroSoeFormat>`
  - Public API should be intentionally close to the existing sync `Writer`:
    - `WriterBuilder::build_async::<W, F>(writer)` (preferred: reuses existing builder options)
    - `async fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError>`
    - `async fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError>`
    - `async fn finish(&mut self) -> Result<(), ArrowError>` (flush + `complete()`)
    - (Optional ergonomics) `async fn close(self) -> Result<W, ArrowError>` to consume and return the underlying writer
- Implementation approach (buffer + flush, like Parquet)

  To avoid rewriting the encoder as "async", reuse the existing synchronous encoding logic by staging into an in-memory buffer and then pushing the bytes into the async sink:
  - On construction:
    - Encode the OCF header via the existing `AvroFormat::start_stream`, but write it into a `Vec<u8>` buffer
    - Flush that buffer to `AsyncFileWriter::write(Bytes)`
  - On `write`:
    - For OCF:
      - Encode the batch into a `Vec<u8>` using the existing `RecordEncoder`
      - Apply block-level compression exactly as the sync writer does
      - Append block framing (`count`, `block_size`, `block_bytes`, `sync_marker`) into a staging buffer
      - Flush the staging buffer to the async sink
    - For SOE stream:
      - Encode into a staging buffer and flush (preserving the existing prefix behavior)
  - Reuse `WriterBuilder` settings (`with_compression`, `with_capacity`, `with_row_capacity`, `with_fingerprint_strategy`) to size buffers and control prefixes/compression.
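The block framing step stages plain bytes, which is what makes the buffer-then-flush approach work: nothing in it needs to be async. A std-only sketch, using the Avro specification's long encoding (zig-zag, then little-endian base-128 varint); `encode_long` and `frame_block` are illustrative helpers, not existing arrow-avro functions.

```rust
/// Encode an i64 as an Avro long: zig-zag, then base-128 varint groups
/// emitted little-endian (low 7 bits first), per the Avro spec.
fn encode_long(value: i64, out: &mut Vec<u8>) {
    let mut n = ((value << 1) ^ (value >> 63)) as u64;
    loop {
        let byte = (n & 0x7f) as u8;
        n >>= 7;
        if n == 0 {
            out.push(byte);
            break;
        }
        // More groups follow: set the continuation bit.
        out.push(byte | 0x80);
    }
}

/// Frame one OCF block into `staging`: record count, byte size of the
/// (possibly compressed) block, the block bytes, then the 16-byte sync marker.
fn frame_block(count: i64, block_bytes: &[u8], sync_marker: &[u8; 16], staging: &mut Vec<u8>) {
    encode_long(count, staging);
    encode_long(block_bytes.len() as i64, staging);
    staging.extend_from_slice(block_bytes);
    staging.extend_from_slice(sync_marker);
}
```

The async writer would then hand the filled `staging` buffer to `AsyncFileWriter::write` in one call per block.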
- Interop and test plan (explicit requirement)

  Add tests demonstrating interoperability with the AsyncReader in feat: Implement an AsyncReader for avro using ObjectStore #8930:
  - Roundtrip test: `AsyncAvroWriter` → bytes/file/object_store → `AsyncAvroReader` yields the same `RecordBatch` values
  - Also validate the roundtrip with the existing sync `ReaderBuilder` to ensure backwards compatibility
  - Cover both:
    - uncompressed OCF
    - compressed OCF (as supported by `CompressionCodec`)
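These roundtrip tests could share one harness shape, generic over an encode/decode pair so the same assertions drive sync and async writers, compressed and uncompressed OCF. A minimal sketch with hypothetical names (the closures would wrap the real writer/reader in practice):

```rust
/// Encode a value to bytes, decode it back, and assert the values survived.
/// `encode`/`decode` are stand-ins for (Async)Writer and (Async)Reader runs.
fn roundtrip<T: PartialEq + std::fmt::Debug>(
    input: &T,
    encode: impl Fn(&T) -> Vec<u8>,
    decode: impl Fn(&[u8]) -> T,
) {
    let bytes = encode(input);
    let decoded = decode(&bytes);
    assert_eq!(&decoded, input, "decoded values must match the written input");
}
```

In the real suite, `T` would be `Vec<RecordBatch>` and each codec variant would supply its own `encode` closure.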
- `object_store` writer adapter

  For symmetry with `parquet::arrow::async_writer::ParquetObjectWriter` and the AsyncReader's `object_store` integration, add an `AvroObjectWriter` (behind `feature = "object_store"`) implementing `AsyncFileWriter` and performing multipart upload.
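One possible buffering policy for such an adapter, sketched with std-only types: accumulate encoded bytes and emit a "part" each time the buffer crosses the store's minimum part size (5 MiB for S3; shrunk here so the demo is easy to follow). `MultipartBuffer` is hypothetical, and the `parts` Vec stands in for actual multipart upload calls through `object_store`.

```rust
/// Hypothetical sketch of an AvroObjectWriter's buffering policy.
struct MultipartBuffer {
    part_size: usize,
    buf: Vec<u8>,
    parts: Vec<Vec<u8>>, // stand-in for parts uploaded to object storage
}

impl MultipartBuffer {
    fn new(part_size: usize) -> Self {
        Self { part_size, buf: Vec::new(), parts: Vec::new() }
    }

    /// Buffer incoming bytes, flushing full-size parts as they accumulate.
    fn write(&mut self, data: &[u8]) {
        self.buf.extend_from_slice(data);
        while self.buf.len() >= self.part_size {
            // Cut off exactly one part and keep the remainder buffered.
            let rest = self.buf.split_off(self.part_size);
            self.parts.push(std::mem::replace(&mut self.buf, rest));
        }
    }

    /// Upload the trailing partial part (if any) and finish the upload.
    fn complete(&mut self) {
        if !self.buf.is_empty() {
            self.parts.push(std::mem::take(&mut self.buf));
        }
    }
}
```

The real adapter would issue the part uploads inside `AsyncFileWriter::write` futures and complete the multipart upload in `complete()`.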
Illustrative API sketch:

```rust
use tokio::fs::File;
use arrow_avro::writer::{WriterBuilder /*, AsyncAvroWriter */};

# async fn example(schema: arrow_schema::Schema, batch: arrow_array::RecordBatch) -> Result<(), arrow_schema::ArrowError> {
let file = File::create("out.avro")
    .await
    .map_err(|e| arrow_schema::ArrowError::IoError(e.to_string(), e))?;

// via builder (preferred)
let mut w = WriterBuilder::new(schema)
    .with_compression(None)
    .with_capacity(64 * 1024)
    .build_async::<_, arrow_avro::writer::AvroOcfFormat>(file)?;

// write and finish
w.write(&batch).await?;
w.finish().await?;
# Ok(())
# }
```

Describe alternatives you've considered
- Use the synchronous `AvroWriter` in `spawn_blocking`
  - Avoids blocking the async runtime directly, but complicates control flow and backpressure, and can still become a bottleneck at scale.
- Write to `Vec<u8>` with the sync `AvroWriter`, then upload
  - Requires buffering entire outputs, increasing memory usage and latency.
Additional context
Notes / open questions:
- OCF sync markers are generated randomly today for `AvroOcfFormat`. If we want any "byte-for-byte equality" tests between the sync and async writers, we may need a way to inject a deterministic sync marker in tests, or rely purely on decode/roundtrip validation.
- Naming bikeshed: should the terminal method be `finish().await` (matching the current sync `Writer`) or `close().await` (matching Parquet's async writer)? Either is fine, but consistent naming would be nice.
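On the first open question: any `with_sync_marker`-style injection point would be new API and is purely hypothetical, but producing a stable 16-byte marker for tests is straightforward. A std-only sketch deriving one from a seed (std's `DefaultHasher` uses fixed keys, so the output is reproducible):

```rust
use std::hash::{Hash, Hasher};

/// Derive a deterministic 16-byte OCF sync marker from a test seed by
/// hashing (seed, chunk index) into each 8-byte half of the marker.
fn deterministic_sync_marker(seed: u64) -> [u8; 16] {
    let mut marker = [0u8; 16];
    for (i, chunk) in marker.chunks_mut(8).enumerate() {
        let mut h = std::collections::hash_map::DefaultHasher::new();
        (seed, i as u64).hash(&mut h);
        chunk.copy_from_slice(&h.finish().to_le_bytes());
    }
    marker
}
```

With markers fixed this way (or simply hard-coded) in tests, the sync and async writers could be compared byte-for-byte; otherwise decode/roundtrip validation remains the fallback.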