Conversation

@EmilyMatt

Which issue does this PR close?

Rationale for this change

Allows for proper file splitting within an asynchronous context.

What changes are included in this PR?

The raw implementation, allowing for file splitting: starting mid-block (reading until a sync marker is found) and then reading further until the end of the block is found.
This reader currently requires that a reader_schema be provided if type promotion, schema evolution, or projection is desired.
This is because #8928 currently blocks proper parsing from an ArrowSchema.
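For intuition, the mid-block start amounts to scanning the fetched chunk for the file's 16-byte sync marker and beginning to decode just past it. A minimal sketch (not the PR's exact code, which does this inside the reader's state machine):

/// Minimal sketch: find where decoding can begin when a split starts
/// mid-block, by locating the file's 16-byte sync marker and skipping it.
fn first_block_offset(chunk: &[u8], sync_marker: &[u8; 16]) -> Option<usize> {
    chunk
        .windows(16)
        .position(|window| window == sync_marker.as_slice())
        .map(|pos| pos + 16) // start just past the marker
}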

Are these changes tested?

Yes

Are there any user-facing changes?

Only the addition of the new reader.
Other changes are internal to the crate (namely the way Decoder is created from parts).

@jecsand838 (Contributor) left a comment

Flushing a partial review with some high level thoughts.

I'll wait for you to finish before resuming.

@EmilyMatt (Author)

> Flushing a partial review with some high level thoughts.
>
> I'll wait for you to finish before resuming.

Honestly, I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved, as it's a public API and I don't want it to be volatile.

@jecsand838 (Contributor) commented Nov 26, 2025

> > Flushing a partial review with some high level thoughts.
> > I'll wait for you to finish before resuming.
>
> Honestly, I think my main blocker is the schema thing here. I don't want to commit to the constructor before it is resolved, as it's a public API and I don't want it to be volatile.

100%. I'm working on that right now and won't stop until I have a PR. That was a solid catch.

The schema logic is an area of the code I mean to fully refactor (or would welcome a refactor of). I knew it would eventually come back.

@EmilyMatt (Author)

Sorry, I haven't dropped it, I just found myself in a really busy week! The generic reader support does not seem too hard to implement from the dabbling I've done, and I still need to get to the builder pattern change.

…, separate object store file reader into a feature-gated struct and use a generic async file reader trait
@EmilyMatt (Author)

@jecsand838 I believe this is now ready for a proper review^

@jecsand838 (Contributor) left a comment

@EmilyMatt Thank you so much for getting these changes up!

I left a few comments. Let me know what you think.

EDIT: Should have mentioned that this is looking really good overall and I'm very excited for the AsyncReader!

@alamb (Contributor) commented Jan 10, 2026

@jecsand838 and @EmilyMatt -- how is this PR looking?

@EmilyMatt (Author)

> @jecsand838 and @EmilyMatt -- how is this PR looking?

I had actually just returned to work on it 2 days ago. I'm still having some issues with the schema now being provided, due to the problems I've described; @jecsand838 suggested removing the Arrow schema and I'm starting to think that is the only viable way for now.
Making the fetch API a bit closer to the one parquet uses is the smaller issue. I do wish to keep the separate semantics for the original fetch and the extra fetch (for parquet, for example, that would be the row group ranges and the footer range); I'll try a couple of ways to do this.
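For reference, a hedged sketch of the generic async file reader surface being discussed (the trait name and get_bytes mirror the reader code quoted later in this thread; the exact signature in the PR may differ):

use std::ops::Range;

use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::future::BoxFuture;

/// Sketch of the generic async file reader trait; `get_bytes` mirrors the
/// `reader.get_bytes(range).await` calls in the state machine below.
trait AsyncFileReader: Send + 'static {
    /// Fetch the given byte range from the underlying storage.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'static, Result<Bytes, ArrowError>>;
}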

@EmilyMatt (Author)

Hope to push another version today and address some of the things above

@EmilyMatt (Author)

@jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's OK IMO; it lays the foundations for a common API in a few versions.
I believe I've addressed everything; let me know if anything pops to mind.

@jecsand838 (Contributor)

> @jecsand838 I've shamelessly plagiarized the API for the object reader from the parquet crate, but that's OK IMO; it lays the foundations for a common API in a few versions. I believe I've addressed everything; let me know if anything pops to mind.

@EmilyMatt Absolutely, I'll have time to give this a solid review tonight. Ty for getting these changes in!

@jecsand838 (Contributor) left a comment

@EmilyMatt Flushing a partial review here. Looking really good overall. I know this is a huge PR and your high-level changes look great.

I left more code-level comments with one architectural call-out you may want to consider. Overall this is looking solid and I'm super stoked about this async reader for arrow-avro.

Comment on lines 156 to 158
ReaderState::Limbo => {
    unreachable!("ReaderState::Limbo should never be observed");
}
Contributor:

Is the ReaderState::Limbo variant really necessary? Could we use Finished instead, so that if a bug causes an early return without setting state, the stream just ends (which is safer than panicking)?

Author:

A user will never know they did not actually finish reading the file; they will think they've just reached the end. This is, in my opinion, orders of magnitude more severe than crashing.

Contributor:

I mean, in the event of that occurring, an ArrowError should be passed back, which would alert the user.

Author:

I might be misunderstanding, but you are suggesting two opposing things at the same time.

I can replace the panic with returning an error, but Limbo should never exist. Programmatically, there should never be any code path where the state can be Limbo, and that is the purpose of the unreachable!() macro.

On the other hand, setting it to Finished means adding the risk of returning a Ready(None) by accident.

Contributor:

@EmilyMatt

> I might be misunderstanding, but you are suggesting two opposing things at the same time.
>
> I can replace the panic with returning an error, but Limbo should never exist. Programmatically, there should never be any code path where the state can be Limbo, and that is the purpose of the unreachable!() macro.
>
> On the other hand, setting it to Finished means adding the risk of returning a Ready(None) by accident.

Sorry, I should have explained this better!

My thinking was it may be better to simply catch the error and terminate early in a manner that bubbles the error back up to the caller (via ReaderState::Finished or even a new ReaderState::Errored(ArrowError) variant) instead of assuming Limbo is and will always be unreachable. Granted, you raise a fair point about returning a Ready(None) by accident, so that's something to be cautious of.

There are two big reasons this caught my attention. The first is that, despite unreachable! being a solid idiomatic pattern in Rust, using it risks turning an internal bug into a process-aborting panic for downstream users. This is compounded by arrow-rs generally operating rather low in many stacks.

The second reason this caught my attention is that there seem to be multiple early-return paths (including ?, etc.) that can (either now or in the future) leave self.reader_state stuck as Limbo and potentially cause a panic on the next poll_next.

As an aside, here are two additional options to consider (if we want to continue going after getting an error); a sketch of the first follows below:

  1. Keeping Limbo and eliminating ? inside the replaced-state match, with manual error handling that restores a real state.
  2. Refactoring to not mem::replace the entire enum.
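A minimal sketch of option 1 under illustrative names (not the PR's types): take the state out with mem::replace, but handle errors manually so every path writes a concrete state back before returning:

use std::mem;

enum State {
    /// Sentinel observed only transiently while a poll is in progress.
    Sentinel,
    Running { step: u32 },
    Finished,
}

struct Machine {
    state: State,
}

impl Machine {
    /// Advance the machine; no `?` inside the replaced-state match, so no
    /// early return can leave `state` stuck as `Sentinel`.
    fn next_item(&mut self) -> Option<Result<u32, String>> {
        match mem::replace(&mut self.state, State::Sentinel) {
            State::Running { step } => match do_work(step) {
                Ok(value) => {
                    self.state = State::Running { step: step + 1 };
                    Some(Ok(value))
                }
                Err(e) => {
                    // Restore a real state before surfacing the error item.
                    self.state = State::Finished;
                    Some(Err(e))
                }
            },
            State::Sentinel => {
                // Internal bug: surface an error item instead of panicking.
                self.state = State::Finished;
                Some(Err("observed sentinel state".to_string()))
            }
            State::Finished => None,
        }
    }
}

fn do_work(step: u32) -> Result<u32, String> {
    if step < 3 { Ok(step * 10) } else { Err("boom".to_string()) }
}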

Author:

I believe 2 is impossible due to the ownership semantics of async behaviour; this can be observed in the other async readers as well: they all usually do something similar when implementing streams manually.

Regarding 1, I was sure polling again after an error violates the stream contract, but perhaps I was mistaken, in which case I apologize for speaking too confidently about there being no such code paths^^

Do you think an implementation such as the one I've pushed now is viable? Or perhaps it is best to merge the Error state and the invalid state?
I feel like using Finished is too risky for the reasons I've previously explained.

Contributor:

@EmilyMatt

> Regarding 1, I was sure polling again after an error violates the stream contract, but perhaps I was mistaken, in which case I apologize for speaking too confidently about there being no such code paths^^

All good! Polling again after an error item is not a Stream contract violation that I'm aware of. The Stream contract only explicitly warns about polling again after the stream has terminated (i.e. after Ready(None)). With Stream<Item = Result<_, _>>, an Err is just another yielded item, and it’s valid for callers to keep polling unless the stream returns None.
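To illustrate with a standalone sketch (futures + tokio, not the PR's reader): an Err item does not terminate the stream, and the consumer can keep polling until it sees None:

use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // The error item is just another yielded value; `Ok(2)` still arrives.
    let mut s = stream::iter(vec![Ok(1), Err("transient"), Ok(2)]);
    while let Some(item) = s.next().await {
        match item {
            Ok(v) => println!("value: {v}"),
            Err(e) => println!("error item: {e}"),
        }
    }
    // The loop exits only once the stream yields None.
}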

> I believe 2 is impossible due to the ownership semantics of async behaviour; this can be observed in the other async readers as well: they all usually do something similar when implementing streams manually.

I agree that removing mem::replace on the entire enum isn’t always practical in these manual state machines -- you can do it, but I get that it will require a large refactor (pin-projection / splitting state / Options, etc.). The mem::replace pattern is totally reasonable here; I just wanted to make sure we're careful about panics in the hot path of such an upstream project.

> Do you think an implementation such as the one I've pushed now is viable? Or perhaps it is best to merge the Error state and the invalid state?
> I feel like using Finished is too risky for the reasons I've previously explained.

On the current implementation: I glanced over the latest changes and I think the updated InvalidState sentinel approach + removing ?/early returns inside the replaced-state match is viable. I'll conduct a follow-up code review right now, but I'm pretty much inclined to approve this.

@mzabaluev (Contributor) commented Jan 19, 2026

I get a ParseError("bad varint") on this test:

    fn get_int_array_schema() -> SchemaRef {
        let schema = Schema::new(vec![Field::new(
            "int_array",
            DataType::List(Arc::new(Field::new("element", DataType::Int32, true))),
            true,
        )])
        .with_metadata(HashMap::from([("avro.name".into(), "table".into())]));
        Arc::new(schema)
    }

    #[tokio::test]
    async fn test_bad_varint_bug() {
        let file = arrow_test_data("avro/bad-varint-bug.avro");

        let schema = get_int_array_schema();
        let batches = read_async_file(&file, 1024, None, schema).await.unwrap();
        let _batch = &batches[0];
    }

The Avro file, readable by Spark: bad-varint-bug.avro.gz

@mzabaluev-flarion

> The Avro file, readable by Spark: bad-varint-bug.avro.gz

I have checked that the Avro file is readable with Python avro 1.12.1:

>>> from avro.datafile import DataFileReader
>>> from avro.io import DatumReader
>>> reader = DataFileReader(open("testing/data/avro/bad-varint-bug.avro", "rb"), DatumReader())
>>> for rec in reader:
...     print(rec)
... 
{'int_array': [1, 2]}

@EmilyMatt (Author) commented Jan 20, 2026

> > The Avro file, readable by Spark: bad-varint-bug.avro.gz
>
> I have checked that the Avro file is readable with Python avro 1.12.1:
>
> >>> from avro.datafile import DataFileReader
> >>> from avro.io import DatumReader
> >>> reader = DataFileReader(open("testing/data/avro/bad-varint-bug.avro", "rb"), DatumReader())
> >>> for rec in reader:
> ...     print(rec)
> ...
> {'int_array': [1, 2]}

I don't think this is a bug in the async reader.
You are using a testing infrastructure built around Arrow schemas that have the reader schema in the metadata, but you did not provide the schema in yours.

I can confirm the following test passes:

#[tokio::test]
async fn test_bad_varint_bug() {
    let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
    let location =
        Path::from_filesystem_path("/home/emily/Downloads/bad-varint-bug.avro").unwrap();

    let file_size = store.head(&location).await.unwrap().size;

    let file_reader = AvroObjectReader::new(store, location);
    let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
        .try_build()
        .await
        .unwrap();

    let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
    let batch = &batches[0];
    let int_list_col = batch.column(0).as_list::<i32>();

    let first_list = int_list_col.value(0);
    let expected_result = Arc::new(Int32Array::from_iter_values(vec![1i32, 2])) as _;
    assert_eq!(first_list, expected_result)
}

The issue is probably in AvroSchema::from; it has various bugs I've also encountered.

@mzabaluev (Contributor)

> I don't think this is a bug in the async reader. You are using a testing infrastructure built around Arrow schemas that have the reader schema in the metadata, but you did not provide the schema in yours.

My test provides the Arrow reader schema and the top-level Avro record name in the metadata, which should be sufficient.
The problem was a schema mismatch: in the file, the array elements are not nullable.
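For context on why the mismatch surfaced as ParseError("bad varint"): Avro block counts, sizes, and lengths are zigzag-encoded varints, so a reader misaligned by schema resolution ends up decoding arbitrary bytes as a varint. A minimal sketch of the decoding step (standard Avro encoding, not the crate's internal implementation):

/// Decode an Avro zigzag-encoded long, returning the value and the number
/// of bytes consumed. A misaligned reader feeds unrelated bytes into this
/// step, which is what produces "bad varint"-style errors.
fn decode_zigzag_long(buf: &[u8]) -> Option<(i64, usize)> {
    let mut value: u64 = 0;
    for (i, &byte) in buf.iter().enumerate().take(10) {
        value |= u64::from(byte & 0x7F) << (7 * i);
        if byte & 0x80 == 0 {
            // Map the unsigned zigzag value back to a signed long.
            let decoded = ((value >> 1) as i64) ^ -((value & 1) as i64);
            return Some((decoded, i + 1));
        }
    }
    None // more than 10 continuation bytes: invalid varint
}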

@EmilyMatt (Author)

> > I don't think this is a bug in the async reader. You are using a testing infrastructure built around Arrow schemas that have the reader schema in the metadata, but you did not provide the schema in yours.
>
> My test provides the Arrow reader schema and the top-level Avro record name in the metadata, which should be sufficient. The problem was a schema mismatch: in the file, the array elements are not nullable.

It is not necessarily sufficient; see #8928.

But you should open a bug for this, since a reader schema with nullable fields and a writer schema with non-nullable fields should be compatible.

@jecsand838 (Contributor) commented Jan 21, 2026

@mzabaluev

> > I don't think this is a bug in the async reader. You are using a testing infrastructure built around Arrow schemas that have the reader schema in the metadata, but you did not provide the schema in yours.
>
> My test provides the Arrow reader schema and the top-level Avro record name in the metadata, which should be sufficient. The problem was a schema mismatch: in the file, the array elements are not nullable.

I think this is a schema resolution bug, based on a quick glance over the details you provided.

That being said, there are limitations with using AvroSchema::try_from to create a reader schema. For now my recommendation for creating a reader schema (especially more complicated ones) is one of the following (a sketch of the first two follows the list):

  1. Modify the writer schema's JSON.
  2. Manually craft the JSON for an AvroSchema.
  3. Use AvroSchema::try_from, but sanitize the output and embed it into a pre-defined JSON wrapper if needed.
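A hedged sketch of options 1-2 (the serde_json edit and field path are illustrative assumptions; the resulting JSON string would then be handed to AvroSchema::new as seen in the builder code quoted later in this thread):

use serde_json::{json, Value};

/// Derive a reader schema by editing the writer schema's JSON directly,
/// rather than converting from an Arrow schema. The tweak mirrors the
/// mismatch discussed above: forcing the array items to be non-nullable.
fn reader_schema_json(writer_json: &str) -> serde_json::Result<String> {
    let mut schema: Value = serde_json::from_str(writer_json)?;
    if let Some(fields) = schema["fields"].as_array_mut() {
        for field in fields.iter_mut() {
            // Hypothetical field name, taken from the test earlier in this thread.
            if field["name"] == "int_array" {
                field["type"] = json!({ "type": "array", "items": "int" });
            }
        }
    }
    serde_json::to_string(&schema)
}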

Originally the AvroSchema::try_from method was built for the Writer so that a correct AvroSchema is inferred from an Arrow Schema in the absence of a provided AvroSchema.

The biggest challenge to overcome relates to the lossy behavior inherent to Arrow -> Avro schema conversion, i.e. Arrow not having the concept of named types, etc.

@EmilyMatt

> The issue is probably in AvroSchema::from; it has various bugs I've also encountered.

100%, it's absolutely not related to this PR. Sorry about not jumping in sooner to call that out.


As an aside, I just created #9233 which proposes an approach for modularizing schema.rs, adding an ArrowToAvroSchemaBuilder, and enhancing the overall AvroSchema conversion functionality. I'd love to get some feedback if either of you get an opportunity!

@jecsand838 (Contributor) left a comment

@EmilyMatt This looks good! I left some final feedback and recommendations, but I think it's at a place to re-run the CI/CD jobs if you wanted to follow-up on these. Once pipelines pass, I'll approve.

CC: @alamb

Comment on lines +141 to +165
// If a projection exists, project the reader schema;
// if no reader schema is provided, parse it from the header (get the raw writer schema) and project that.
// This projected schema will be the schema used for reading.
let projected_reader_schema = self
    .projection
    .as_deref()
    .map(|projection| {
        let base_schema = if let Some(reader_schema) = &self.reader_schema {
            reader_schema.clone()
        } else {
            let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
                ArrowError::ParseError("No Avro schema present in file header".to_string())
            })?;
            let json_string = std::str::from_utf8(raw)
                .map_err(|e| {
                    ArrowError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
                })?
                .to_string();
            AvroSchema::new(json_string)
        };
        base_schema.project(projection)
    })
    .transpose()?;
Contributor:

We should probably add more test coverage to the arrow-avro/src/reader/async_reader/builder.rs file.

[Screenshot: test coverage report for builder.rs]

Author:

Added tests; not all the error cases are covered in the builder, but it looks better now.

Comment on lines 35 to 58
enum ReaderState<R: AsyncFileReader> {
    /// Intermediate state to fix ownership issues
    InvalidState,
    /// Initial state, fetch initial range
    Idle { reader: R },
    /// Fetching data from the reader
    FetchingData {
        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
        next_behaviour: FetchNextBehaviour,
    },
    /// Decode a block in a loop until completion
    DecodingBlock { data: Bytes, reader: R },
    /// Output batches from a decoded block
    ReadingBatches {
        data: Bytes,
        block_data: Bytes,
        remaining_in_block: usize,
        reader: R,
    },
    /// An error occurred, should not have been polled again.
    Error,
    /// Done, flush decoder and return
    Finished,
}
Contributor:

After looking through the code, I strongly recommend making the stream "fused" so that it terminates after an error, rather than yielding infinite errors.

My concern is that currently, if the reader enters ReaderState::Error (or hits InvalidState), it returns Some(Err(...)) indefinitely. This causes consumers like StreamExt::collect to hang forever.

I recommend fixing this by strictly transitioning to ReaderState::Finished immediately upon encountering any error. This is similar to the Parquet Async Reader.

What do you think about these updates below?

Suggested change
enum ReaderState<R: AsyncFileReader> {
    /// Intermediate state to fix ownership issues
    InvalidState,
    /// Initial state, fetch initial range
    Idle { reader: R },
    /// Fetching data from the reader
    FetchingData {
        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
        next_behaviour: FetchNextBehaviour,
    },
    /// Decode a block in a loop until completion
    DecodingBlock { data: Bytes, reader: R },
    /// Output batches from a decoded block
    ReadingBatches {
        data: Bytes,
        block_data: Bytes,
        remaining_in_block: usize,
        reader: R,
    },
    /// An error occurred, should not have been polled again.
    Error,
    /// Done, flush decoder and return
    Finished,
}

enum ReaderState<R: AsyncFileReader> {
    /// Intermediate state to fix ownership issues
    InvalidState,
    /// Initial state, fetch initial range
    Idle { reader: R },
    /// Fetching data from the reader
    FetchingData {
        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
        next_behaviour: FetchNextBehaviour,
    },
    /// Decode a block in a loop until completion
    DecodingBlock { data: Bytes, reader: R },
    /// Output batches from a decoded block
    ReadingBatches {
        data: Bytes,
        block_data: Bytes,
        remaining_in_block: usize,
        reader: R,
    },
    /// Successfully finished reading file contents; drain any remaining buffered records
    /// from the decoder into (possibly partial) output batches.
    Flushing,
    /// Terminal state. Always yields `None` and never returns items again.
    Finished,
}

Then we can do something like this for AsyncAvroFileReader::read_next:

    /// Terminate the stream after returning this error once.
    #[inline]
    fn finish_with_error(
        &mut self,
        error: ArrowError,
    ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
        self.reader_state = ReaderState::Finished;
        Poll::Ready(Some(Err(error)))
    }

    #[inline]
    fn start_flushing(&mut self) {
        self.reader_state = ReaderState::Flushing;
    }

    /// Drain any remaining buffered records from the decoder.
    #[inline]
    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
        match self.decoder.flush() {
            Ok(Some(batch)) => {
                self.reader_state = ReaderState::Flushing;
                Poll::Ready(Some(Ok(batch)))
            }
            Ok(None) => {
                self.reader_state = ReaderState::Finished;
                Poll::Ready(None)
            }
            Err(e) => self.finish_with_error(e),
        }
    }

    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
        loop {
            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
                ReaderState::Idle { mut reader } => {
                    let range = self.range.clone();
                    if range.start >= range.end {
                        return self.finish_with_error(ArrowError::AvroError(format!(
                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
                            range.start, range.end, self.file_size
                        )));
                    }
                    let future = async move {
                        let data = reader.get_bytes(range).await?;
                        Ok((reader, data))
                    }
                    .boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                    };
                }
                ReaderState::FetchingData {
                    mut future,
                    next_behaviour,
                } => {
                    let (reader, data_chunk) = match future.poll_unpin(cx) {
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
                        Poll::Pending => {
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour,
                            };
                            return Poll::Pending;
                        }
                    };
                    match next_behaviour {
                        FetchNextBehaviour::ReadSyncMarker => {
                            let sync_marker_pos = data_chunk
                                .windows(16)
                                .position(|slice| slice == self.sync_marker);
                            let block_start = match sync_marker_pos {
                                Some(pos) => pos + 16, // Move past the sync marker
                                None => {
                                    // Sync marker not found, valid if we arbitrarily split the file at its end.
                                    self.reader_state = ReaderState::Finished;
                                    return Poll::Ready(None);
                                }
                            };
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk.slice(block_start..),
                            };
                        }
                        FetchNextBehaviour::DecodeVLQHeader => {
                            let mut reader = reader;
                            let mut data = data_chunk;
                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
                                if data.is_empty() {
                                    return self.finish_with_error(ArrowError::AvroError(
                                        "Unexpected EOF while reading Avro block header".into(),
                                    ));
                                }
                                let consumed = match self.block_decoder.decode(&data[..1]) {
                                    Ok(consumed) => consumed,
                                    Err(e) => return self.finish_with_error(e),
                                };
                                if consumed == 0 {
                                    return self.finish_with_error(ArrowError::AvroError(
                                        "BlockDecoder failed to consume byte during VLQ header parsing"
                                            .into(),
                                    ));
                                }
                                data = data.slice(consumed..);
                            }
                            // Now we know the block size. Slice remaining data to what we need.
                            let bytes_remaining = self.block_decoder.bytes_remaining();
                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
                            let consumed = match self.block_decoder.decode(&data_to_use) {
                                Ok(consumed) => consumed,
                                Err(e) => return self.finish_with_error(e),
                            };
                            if consumed != data_to_use.len() {
                                return self.finish_with_error(ArrowError::AvroError(
                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
                                        .into(),
                                ));
                            }
                            // May need more data to finish the block.
                            let range_to_fetch = match self.remaining_block_range() {
                                Ok(range) if range.is_empty() => {
                                    // All bytes fetched, move to decoding block directly
                                    self.reader_state = ReaderState::DecodingBlock {
                                        reader,
                                        data: Bytes::new(),
                                    };
                                    continue;
                                }
                                Ok(range) => range,
                                Err(e) => return self.finish_with_error(e),
                            };
                            let future = async move {
                                let data = reader.get_bytes(range_to_fetch).await?;
                                Ok((reader, data))
                            }
                            .boxed();
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
                            };
                            continue;
                        }
                        FetchNextBehaviour::ContinueDecoding => {
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk,
                            };
                        }
                    }
                }
                ReaderState::InvalidState => {
                    return self.finish_with_error(ArrowError::AvroError(
                        "AsyncAvroFileReader in invalid state".into(),
                    ));
                }
                ReaderState::DecodingBlock {
                    mut reader,
                    mut data,
                } => {
                    // Try to decode another block from the buffered reader.
                    let consumed = match self.block_decoder.decode(&data) {
                        Ok(consumed) => consumed,
                        Err(e) => return self.finish_with_error(e),
                    };
                    data = data.slice(consumed..);
                    // If we reached the end of the block, flush it, and move to read batches.
                    if let Some(block) = self.block_decoder.flush() {
                        // Successfully decoded a block.
                        let block_count = block.count;
                        // We completed (or resumed and completed) a block successfully.
                        self.finishing_partial_block = false;
                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
                            match codec.decompress(&block.data) {
                                Ok(decompressed) => decompressed,
                                Err(e) => {
                                    return self.finish_with_error(ArrowError::AvroError(format!(
                                        "Error decompressing Avro block with codec {codec:?}: {e}"
                                    )));
                                }
                            }
                        } else {
                            block.data
                        });
                        // Since we have an active block, move to reading batches
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block: block_count,
                        };
                        continue;
                    }
                    // data should always be consumed unless Finished, if it wasn't, something went wrong
                    if !data.is_empty() {
                        return self.finish_with_error(ArrowError::AvroError(
                            "Unable to make progress decoding Avro block, data may be corrupted"
                                .into(),
                        ));
                    }
                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        // We've already flushed, so if no batch was produced, we are simply done.
                        self.finishing_partial_block = false;
                        self.start_flushing();
                        continue;
                    }
                    // If we've tried the following stage before, and still can't decode,
                    // this means the file is truncated or corrupted.
                    if self.finishing_partial_block {
                        return self.finish_with_error(ArrowError::AvroError(
                            "Unexpected EOF while reading last Avro block".into(),
                        ));
                    }
                    // Avro splitting case: block is incomplete, we need to:
                    // 1. Parse the length so we know how much to read
                    // 2. Fetch more data from the object store
                    // 3. Create a new block data from the remaining slice and the newly fetched data
                    // 4. Continue decoding until end of block
                    self.finishing_partial_block = true;
                    // Mid-block, but we don't know how many bytes are missing yet
                    if matches!(
                        self.block_decoder.state(),
                        BlockDecoderState::Count | BlockDecoderState::Size
                    ) {
                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
                        // Fetch just enough to complete it.
                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);
                        // If there is nothing more to fetch, error out
                        if fetch_end == self.range.end {
                            return self.finish_with_error(ArrowError::AvroError(
                                "Unexpected EOF while reading Avro block header".into(),
                            ));
                        }
                        let range_to_fetch = self.range.end..fetch_end;
                        self.range.end = fetch_end; // Track that we've fetched these bytes
                        let future = async move {
                            let data = reader.get_bytes(range_to_fetch).await?;
                            Ok((reader, data))
                        }
                        .boxed();
                        self.reader_state = ReaderState::FetchingData {
                            future,
                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
                        };
                        continue;
                    }
                    // Otherwise, we're mid-block but know how many bytes are remaining to fetch.
                    let range_to_fetch = match self.remaining_block_range() {
                        Ok(range) => range,
                        Err(e) => return self.finish_with_error(e),
                    };
                    let future = async move {
                        let data = reader.get_bytes(range_to_fetch).await?;
                        Ok((reader, data))
                    }
                    .boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
                    };
                    continue;
                }
                ReaderState::ReadingBatches {
                    reader,
                    data,
                    mut block_data,
                    mut remaining_in_block,
                } => {
                    let (consumed, records_decoded) =
                        match self.decoder.decode_block(&block_data, remaining_in_block) {
                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
                            Err(e) => return self.finish_with_error(e),
                        };
                    remaining_in_block -= records_decoded;
                    if remaining_in_block == 0 {
                        if data.is_empty() {
                            // No more data to read, drain remaining buffered records
                            self.start_flushing();
                        } else {
                            // Finished this block, move to decode next block in the next iteration
                            self.reader_state = ReaderState::DecodingBlock { reader, data };
                        }
                    } else {
                        // Still more records to decode in this block, slice the already-read data and stay in this state
                        block_data = block_data.slice(consumed..);
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block,
                        };
                    }
                    // We have a full batch ready, emit it
                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
                    if self.decoder.batch_is_full() {
                        match self.decoder.flush() {
                            Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
                            Ok(None) => {
                                return self.finish_with_error(ArrowError::AvroError(
                                    "Decoder reported a full batch, but flush returned None".into(),
                                ));
                            }
                            Err(e) => return self.finish_with_error(e),
                        }
                    }
                }
                ReaderState::Flushing => {
                    return self.poll_flush();
                }
                ReaderState::Finished => {
                    // Terminal: once finished (including after an error), always yield None
                    self.reader_state = ReaderState::Finished;
                    return Poll::Ready(None);
                }
            }
        }
    }

Author:

Addressed this, and also added a lint to prevent any accidental "?" in that function.
I will take a look at coverage later today, as it requires setting up some infra to test locally.


Labels

arrow (Changes to the arrow crate), arrow-avro (arrow-avro crate), parquet (Changes to the parquet crate)

Successfully merging this pull request may close these issues:

Implement an async AvroReader