Skip to content

Commit 02ffe83

Browse files
Add tests for transaction stream item timeout (#167)
1 parent cdc45e3 commit 02ffe83

File tree

3 files changed

+221
-0
lines changed

3 files changed

+221
-0
lines changed

aptos-indexer-processors-sdk/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aptos-indexer-processors-sdk/transaction-stream/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,7 @@ tokio = { workspace = true }
2828
tonic = { workspace = true }
2929
tracing = { workspace = true }
3030
url = { workspace = true }
31+
32+
[dev-dependencies]
33+
futures = { workspace = true }
34+
tokio-stream = { workspace = true }
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
use aptos_indexer_transaction_stream::{
2+
config::TransactionStreamConfig, transaction_stream::TransactionStream,
3+
};
4+
use aptos_protos::indexer::v1::{
5+
raw_data_server::{RawData, RawDataServer},
6+
GetTransactionsRequest, TransactionsResponse,
7+
};
8+
use futures::{Future, Stream};
9+
use std::{
10+
pin::Pin,
11+
sync::{
12+
atomic::{AtomicU64, Ordering},
13+
Arc,
14+
},
15+
task::{Context, Poll},
16+
time::Duration,
17+
};
18+
use tokio_stream::wrappers::TcpListenerStream;
19+
use tonic::{transport::Server, Request, Response, Status};
20+
use url::Url;
21+
22+
/// A stream that stalls for a specified duration before yielding a response
23+
struct StallingResponseStream {
24+
stall_duration: Duration,
25+
sleep: Option<Pin<Box<tokio::time::Sleep>>>,
26+
done: bool,
27+
}
28+
29+
impl StallingResponseStream {
30+
fn new(stall_duration: Duration) -> Self {
31+
Self {
32+
stall_duration,
33+
sleep: None,
34+
done: false,
35+
}
36+
}
37+
}
38+
39+
impl Stream for StallingResponseStream {
40+
type Item = Result<TransactionsResponse, Status>;
41+
42+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
if self.done {
44+
return Poll::Ready(None);
45+
}
46+
47+
// Initialize sleep if not already done
48+
if self.sleep.is_none() {
49+
self.sleep = Some(Box::pin(tokio::time::sleep(self.stall_duration)));
50+
}
51+
52+
// Poll the sleep future
53+
if let Some(sleep) = self.sleep.as_mut() {
54+
match sleep.as_mut().poll(cx) {
55+
Poll::Pending => return Poll::Pending,
56+
Poll::Ready(()) => {
57+
self.done = true;
58+
return Poll::Ready(Some(Ok(TransactionsResponse {
59+
transactions: vec![],
60+
chain_id: Some(1),
61+
processed_range: Some(aptos_protos::indexer::v1::ProcessedRange {
62+
first_version: 0,
63+
last_version: 0,
64+
}),
65+
})));
66+
}
67+
}
68+
}
69+
70+
Poll::Pending
71+
}
72+
}
73+
74+
/// A mock gRPC server that stalls for a specified duration before returning a response.
75+
/// After max_connections is reached, it rejects new connections with an error.
76+
pub struct StallingMockGrpcServer {
77+
stall_duration: Duration,
78+
connection_count: Arc<AtomicU64>,
79+
max_connections: u64,
80+
}
81+
82+
type ResponseStream = Pin<Box<dyn Stream<Item = Result<TransactionsResponse, Status>> + Send>>;
83+
84+
#[tonic::async_trait]
85+
impl RawData for StallingMockGrpcServer {
86+
type GetTransactionsStream = ResponseStream;
87+
88+
async fn get_transactions(
89+
&self,
90+
_req: Request<GetTransactionsRequest>,
91+
) -> Result<Response<Self::GetTransactionsStream>, Status> {
92+
let count = self.connection_count.fetch_add(1, Ordering::SeqCst) + 1;
93+
94+
// Reject connections after max_connections
95+
if count > self.max_connections {
96+
return Err(Status::unavailable("Server is no longer accepting connections"));
97+
}
98+
99+
let stream = StallingResponseStream::new(self.stall_duration);
100+
Ok(Response::new(Box::pin(stream)))
101+
}
102+
}
103+
104+
impl StallingMockGrpcServer {
105+
pub fn new(
106+
stall_duration: Duration,
107+
connection_count: Arc<AtomicU64>,
108+
max_connections: u64,
109+
) -> Self {
110+
Self {
111+
stall_duration,
112+
connection_count,
113+
max_connections,
114+
}
115+
}
116+
117+
pub async fn run(self) -> anyhow::Result<u16> {
118+
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
119+
let bound_addr = listener.local_addr()?;
120+
let stream = TcpListenerStream::new(listener);
121+
122+
let server = Server::builder().add_service(
123+
RawDataServer::new(self)
124+
.accept_compressed(tonic::codec::CompressionEncoding::Zstd)
125+
.send_compressed(tonic::codec::CompressionEncoding::Zstd),
126+
);
127+
128+
tokio::spawn(async move {
129+
let _ = server.serve_with_incoming(stream).await;
130+
});
131+
132+
Ok(bound_addr.port())
133+
}
134+
}
135+
136+
#[tokio::test]
137+
async fn test_transaction_stream_reconnects_on_timeout() {
138+
let connection_count = Arc::new(AtomicU64::new(0));
139+
let stall_duration = Duration::from_secs(6);
140+
let timeout_duration_secs = 5;
141+
142+
// Server accepts 2 connections:
143+
// 1. Initial connection from TransactionStream::new
144+
// 2. First reconnection attempt after timeout
145+
// After that, reconnection fails and the test completes
146+
let max_connections = 2;
147+
148+
// Start the stalling mock server
149+
let server =
150+
StallingMockGrpcServer::new(stall_duration, connection_count.clone(), max_connections);
151+
let port = server.run().await.expect("Failed to start mock server");
152+
153+
// Create config with 5 second timeout and minimal retries for faster test
154+
let config = TransactionStreamConfig {
155+
indexer_grpc_data_service_address: Url::parse(&format!("http://127.0.0.1:{}", port))
156+
.unwrap(),
157+
starting_version: Some(0),
158+
request_ending_version: None,
159+
auth_token: "test_token".to_string(),
160+
request_name_header: "test".to_string(),
161+
additional_headers: Default::default(),
162+
indexer_grpc_http2_ping_interval_secs: 30,
163+
indexer_grpc_http2_ping_timeout_secs: 10,
164+
indexer_grpc_reconnection_timeout_secs: 5,
165+
indexer_grpc_response_item_timeout_secs: timeout_duration_secs,
166+
indexer_grpc_reconnection_max_retries: 2,
167+
transaction_filter: None,
168+
};
169+
170+
// Initialize the transaction stream (uses connection 1)
171+
let mut transaction_stream = TransactionStream::new(config)
172+
.await
173+
.expect("Failed to create transaction stream");
174+
175+
assert_eq!(
176+
connection_count.load(Ordering::SeqCst),
177+
1,
178+
"Should have 1 initial connection"
179+
);
180+
181+
let start = std::time::Instant::now();
182+
183+
// Try to get next batch - this should:
184+
// 1. Timeout after 5 seconds (stream stalls for 6s)
185+
// 2. Reconnect (uses connection 2, also stalls)
186+
// 3. Timeout again after 5 seconds
187+
// 4. Try to reconnect but fail (max connections reached)
188+
// 5. Return error
189+
let result = transaction_stream.get_next_transaction_batch().await;
190+
let elapsed = start.elapsed();
191+
192+
// Should have failed after timeouts and failed reconnection
193+
assert!(result.is_err(), "Expected error after failed reconnection");
194+
195+
// Verify we waited at least one timeout period
196+
assert!(
197+
elapsed >= Duration::from_secs(timeout_duration_secs),
198+
"Should have waited at least {} seconds. Elapsed: {:?}",
199+
timeout_duration_secs,
200+
elapsed
201+
);
202+
203+
// Verify reconnection was attempted (connection count > 1)
204+
let total_connections = connection_count.load(Ordering::SeqCst);
205+
assert!(
206+
total_connections > 1,
207+
"Should have attempted reconnection. Total connections: {}",
208+
total_connections
209+
);
210+
211+
println!(
212+
"Test completed in {:?} with {} total connections",
213+
elapsed, total_connections
214+
);
215+
}

0 commit comments

Comments
 (0)