Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -433,146 +433,151 @@ impl TransactionStream {
/// 1. If we lose the connection, we will try reconnecting X times within Y seconds before crashing.
/// 2. If we specified an end version and we hit that, we will stop fetching, but we will make sure that
/// all existing transactions are processed
/// 3. If no transactions are received within the response item timeout (default 60s),
/// we will automatically reconnect and retry.
///
/// Returns
/// - true if should continue fetching
/// - false if we reached the end of the stream or there is an error and the loop should stop
pub async fn get_next_transaction_batch(&mut self) -> Result<TransactionsPBResponse> {
let grpc_channel_recv_latency = std::time::Instant::now();
loop {
let grpc_channel_recv_latency = std::time::Instant::now();

let txn_pb_res = match tokio::time::timeout(
self.transaction_stream_config
.indexer_grpc_response_item_timeout(),
self.stream.next(),
)
.await
{
// Received datastream response
Ok(response) => {
match response {
Some(Ok(r)) => {
self.reconnection_retries = 0;

// The processed range may not exist if using the v1 transaction stream.
// In the case that it doesn't exist, use the previous behavior of using the transaction version of the first and last transactions.
let start_version = match r.processed_range {
Some(range) => range.first_version,
None => r.transactions.as_slice().first().unwrap().version,
};
let end_version = match r.processed_range {
Some(range) => range.last_version,
None => r.transactions.as_slice().last().unwrap().version,
};

// The processed range does not contain a timestamp, so we use the timestamp of the first and last transactions.
let start_txn_timestamp =
r.transactions.as_slice().first().and_then(|t| t.timestamp);
let end_txn_timestamp =
r.transactions.as_slice().last().and_then(|t| t.timestamp);

let size_in_bytes = r.encoded_len() as u64;
let chain_id: u64 = r
.chain_id
.expect("[Transaction Stream] Chain Id doesn't exist.");
let num_txns = r.transactions.len();
let duration_in_secs = grpc_channel_recv_latency.elapsed().as_secs_f64();
self.fetch_ma.tick_now(num_txns as u64);

sample!(
SampleRate::Duration(Duration::from_secs(1)),
info!(
let txn_pb_res = match tokio::time::timeout(
self.transaction_stream_config.indexer_grpc_response_item_timeout(),
self.stream.next(),
)
.await
{
// Received datastream response
Ok(response) => {
match response {
Some(Ok(r)) => {
self.reconnection_retries = 0;

// The processed range may not exist if using the v1 transaction stream.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really but I think it's ok to keep just to be safe

// In the case that it doesn't exist, use the previous behavior of using the transaction version of the first and last transactions.
let start_version = match r.processed_range {
Some(range) => range.first_version,
None => r.transactions.as_slice().first().unwrap().version,
};
let end_version = match r.processed_range {
Some(range) => range.last_version,
None => r.transactions.as_slice().last().unwrap().version,
};

// The processed range does not contain a timestamp, so we use the timestamp of the first and last transactions.
let start_txn_timestamp =
r.transactions.as_slice().first().and_then(|t| t.timestamp);
let end_txn_timestamp =
r.transactions.as_slice().last().and_then(|t| t.timestamp);

let size_in_bytes = r.encoded_len() as u64;
let chain_id: u64 = r
.chain_id
.expect("[Transaction Stream] Chain Id doesn't exist.");
let num_txns = r.transactions.len();
let duration_in_secs =
grpc_channel_recv_latency.elapsed().as_secs_f64();
self.fetch_ma.tick_now(num_txns as u64);

sample!(
SampleRate::Duration(Duration::from_secs(1)),
info!(
stream_address = self
.transaction_stream_config
.indexer_grpc_data_service_address
.to_string(),
connection_id = self.connection_id,
start_version = start_version,
end_version = end_version,
start_txn_timestamp_iso = start_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default(),
end_txn_timestamp_iso = end_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default(),
num_of_transactions = end_version - start_version + 1,
size_in_bytes = size_in_bytes,
duration_in_secs = duration_in_secs,
tps = self.fetch_ma.avg().ceil() as u64,
bytes_per_sec = size_in_bytes as f64 / duration_in_secs,
"[Transaction Stream] Received transactions from GRPC.",
)
);

if let Some(last_fetched_version) = self.last_fetched_version {
if last_fetched_version + 1 != start_version as i64 {
error!(
last_fetched_version = self.last_fetched_version, // last fetched version
expected_start_version =
self.last_fetched_version.map(|v| v + 1), // expected start version
actual_start_version = start_version, // actual start version
"[Transaction Stream] Received batch with gap from GRPC stream"
);
return Err(anyhow!("Received batch with gap from GRPC stream"));
}
}
self.last_fetched_version = Some(end_version as i64);

let txn_pb = TransactionsPBResponse {
transactions: r.transactions,
chain_id,
start_version,
end_version,
start_txn_timestamp,
end_txn_timestamp,
size_in_bytes,
};

return Ok(txn_pb);
},
// Error receiving datastream response
Some(Err(rpc_error)) => {
warn!(
stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
connection_id = self.connection_id,
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
error = ?rpc_error,
"[Transaction Stream] Error receiving datastream response."
);
Err(anyhow!("Error receiving datastream response"))
},
// Stream is finished
None => {
warn!(
stream_address = self
.transaction_stream_config
.indexer_grpc_data_service_address
.to_string(),
connection_id = self.connection_id,
start_version = start_version,
end_version = end_version,
start_txn_timestamp_iso = start_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default(),
end_txn_timestamp_iso = end_txn_timestamp
.as_ref()
.map(timestamp_to_iso)
.unwrap_or_default(),
num_of_transactions = end_version - start_version + 1,
size_in_bytes = size_in_bytes,
duration_in_secs = duration_in_secs,
tps = self.fetch_ma.avg().ceil() as u64,
bytes_per_sec = size_in_bytes as f64 / duration_in_secs,
"[Transaction Stream] Received transactions from GRPC.",
)
);

if let Some(last_fetched_version) = self.last_fetched_version {
if last_fetched_version + 1 != start_version as i64 {
error!(
last_fetched_version = self.last_fetched_version, // last fetched version
expected_start_version =
self.last_fetched_version.map(|v| v + 1), // expected start version
actual_start_version = start_version, // actual start version
"[Transaction Stream] Received batch with gap from GRPC stream"
);
return Err(anyhow!("Received batch with gap from GRPC stream"));
}
}
self.last_fetched_version = Some(end_version as i64);

let txn_pb = TransactionsPBResponse {
transactions: r.transactions,
chain_id,
start_version,
end_version,
start_txn_timestamp,
end_txn_timestamp,
size_in_bytes,
};

Ok(txn_pb)
},
// Error receiving datastream response
Some(Err(rpc_error)) => {
warn!(
stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
connection_id = self.connection_id,
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
error = ?rpc_error,
"[Transaction Stream] Error receiving datastream response."
);
Err(anyhow!("Error receiving datastream response"))
},
// Stream is finished
None => {
warn!(
stream_address = self
.transaction_stream_config
.indexer_grpc_data_service_address
.to_string(),
connection_id = self.connection_id,
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
"[Transaction Stream] Stream ended."
);
Err(anyhow!("Stream ended"))
},
}
},
// Timeout receiving datastream response
Err(e) => {
warn!(
stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
connection_id = self.connection_id,
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
error = ?e,
"[Transaction Stream] Timeout receiving datastream response."
);
Err(anyhow!("Timeout receiving datastream response"))
},
};
txn_pb_res
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
"[Transaction Stream] Stream ended."
);
Err(anyhow!("Stream ended"))
},
}
},
// Timeout receiving datastream response - reconnect and retry
Err(_) => {
warn!(
stream_address = self.transaction_stream_config.indexer_grpc_data_service_address.to_string(),
connection_id = self.connection_id,
start_version = self.transaction_stream_config.starting_version,
end_version = self.transaction_stream_config.request_ending_version,
timeout_secs = self.transaction_stream_config.indexer_grpc_response_item_timeout_secs,
"[Transaction Stream] Response item timeout. Reconnecting..."
);
self.reconnect_to_grpc_with_retries().await?;
continue;
},
};
return txn_pb_res;
}
}

/// Helper function to signal that we've fetched all the transactions up to the ending version that was requested.
Expand Down