Skip to content

Commit d87007e

Browse files
committed
I noticed some missing logs and I think there is an issue where blocks
are available before logs in the provider api. This is a temp patch as it doesn't solve the problem entirely. The best way I know how to deal with this is to batch the block/logs requests. I might do this in a later pr.
1 parent 5504656 commit d87007e

File tree

1 file changed

+68
-69
lines changed

1 file changed

+68
-69
lines changed

be/src/sync.rs

Lines changed: 68 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ impl Downloader {
207207
return Ok(());
208208
}
209209
let block = match self.start_block {
210-
Some(n) => self.remote_block(U64::from(n)).await?,
211-
None => self.remote_block_latest().await?,
210+
Some(n) => remote_block(&self.jrpc_client, U64::from(n)).await?,
211+
None => remote_block_latest(&self.jrpc_client).await?,
212212
};
213213
tracing::info!("initializing blocks table at: {}", block.number);
214214
let mut pg = self.be_pool.get().await.wrap_err("getting pg")?;
@@ -297,7 +297,7 @@ impl Downloader {
297297
*/
298298
#[tracing::instrument(level="info" skip_all, fields(from, to, blocks, txs, logs))]
299299
async fn download(&mut self, batch_size: u16) -> Result<u64, Error> {
300-
let latest = self.remote_block_latest().await?;
300+
let latest = remote_block_latest(&self.jrpc_client).await?;
301301
let _ = self.broadcaster.json_updates.send(serde_json::json!({
302302
"new_block": "remote",
303303
"chain": self.chain.0,
@@ -320,9 +320,9 @@ impl Downloader {
320320
tracing::Span::current()
321321
.record("from", from)
322322
.record("to", to);
323-
let (mut logs, mut blocks) = (
324-
self.download_logs(from, to).await?,
325-
self.download_blocks(from, to).await?,
323+
let (mut blocks, mut logs) = (
324+
download_blocks(&self.jrpc_client, from, to).await?,
325+
download_logs(&self.jrpc_client, from, to).await?,
326326
);
327327
add_timestamp(&mut blocks, &mut logs);
328328
validate_blocks(from, to, &blocks)?;
@@ -345,45 +345,6 @@ impl Downloader {
345345
Ok(last_block.number.to())
346346
}
347347

348-
#[tracing::instrument(level="info" skip_all, fields(from, to))]
349-
async fn download_logs(&self, from: u64, to: u64) -> Result<Vec<jrpc::Log>, Error> {
350-
Ok(self
351-
.jrpc_client
352-
.send_one(serde_json::json!({
353-
"id": "1",
354-
"jsonrpc": "2.0",
355-
"method": "eth_getLogs",
356-
"params": [{"fromBlock": U64::from(from), "toBlock": U64::from(to)}],
357-
}))
358-
.await?
359-
.to()?)
360-
}
361-
362-
#[tracing::instrument(level="info" skip_all, fields(from, to))]
363-
async fn download_blocks(&self, from: u64, to: u64) -> Result<Vec<jrpc::Block>, Error> {
364-
Ok(self
365-
.jrpc_client
366-
.send(
367-
(from..=to)
368-
.map(|n| {
369-
serde_json::json!({
370-
"id": "1",
371-
"jsonrpc": "2.0",
372-
"method": "eth_getBlockByNumber",
373-
"params": [U64::from(n), true],
374-
})
375-
})
376-
.collect(),
377-
)
378-
.await?
379-
.into_iter()
380-
.map(|resp| resp.to::<jrpc::Block>())
381-
.collect::<Result<Vec<_>, _>>()?
382-
.into_iter()
383-
.sorted_by(|a, b| a.number.cmp(&b.number))
384-
.collect())
385-
}
386-
387348
async fn local_latest(&self) -> Result<(u64, BlockHash), Error> {
388349
let pg = self.be_pool.get().await.wrap_err("pg pool")?;
389350
let q = "SELECT num, hash from blocks where chain = $1 order by num desc limit 1";
@@ -396,32 +357,70 @@ impl Downloader {
396357
row.try_get("hash")?,
397358
))
398359
}
360+
}
399361

400-
async fn remote_block(&self, n: U64) -> Result<jrpc::Block, Error> {
401-
Ok(self
402-
.jrpc_client
403-
.send_one(serde_json::json!({
404-
"id": "1",
405-
"jsonrpc": "2.0",
406-
"method": "eth_getBlockByNumber",
407-
"params": [n, true],
408-
}))
409-
.await?
410-
.to()?)
411-
}
362+
async fn remote_block(client: &jrpc::Client, n: U64) -> Result<jrpc::Block, Error> {
363+
Ok(client
364+
.send_one(serde_json::json!({
365+
"id": "1",
366+
"jsonrpc": "2.0",
367+
"method": "eth_getBlockByNumber",
368+
"params": [n, true],
369+
}))
370+
.await?
371+
.to()?)
372+
}
412373

413-
async fn remote_block_latest(&self) -> Result<jrpc::Block, Error> {
414-
Ok(self
415-
.jrpc_client
416-
.send_one(serde_json::json!({
417-
"id": "1",
418-
"jsonrpc": "2.0",
419-
"method": "eth_getBlockByNumber",
420-
"params": ["latest", true],
421-
}))
422-
.await?
423-
.to()?)
424-
}
374+
async fn remote_block_latest(client: &jrpc::Client) -> Result<jrpc::Block, Error> {
375+
Ok(client
376+
.send_one(serde_json::json!({
377+
"id": "1",
378+
"jsonrpc": "2.0",
379+
"method": "eth_getBlockByNumber",
380+
"params": ["latest", true],
381+
}))
382+
.await?
383+
.to()?)
384+
}
385+
#[tracing::instrument(level="info" skip_all, fields(from, to))]
386+
async fn download_blocks(
387+
client: &jrpc::Client,
388+
from: u64,
389+
to: u64,
390+
) -> Result<Vec<jrpc::Block>, Error> {
391+
Ok(client
392+
.send(
393+
(from..=to)
394+
.map(|n| {
395+
serde_json::json!({
396+
"id": "1",
397+
"jsonrpc": "2.0",
398+
"method": "eth_getBlockByNumber",
399+
"params": [U64::from(n), true],
400+
})
401+
})
402+
.collect(),
403+
)
404+
.await?
405+
.into_iter()
406+
.map(|resp| resp.to::<jrpc::Block>())
407+
.collect::<Result<Vec<_>, _>>()?
408+
.into_iter()
409+
.sorted_by(|a, b| a.number.cmp(&b.number))
410+
.collect())
411+
}
412+
413+
#[tracing::instrument(level="info" skip_all, fields(from, to))]
414+
async fn download_logs(client: &jrpc::Client, from: u64, to: u64) -> Result<Vec<jrpc::Log>, Error> {
415+
Ok(client
416+
.send_one(serde_json::json!({
417+
"id": "1",
418+
"jsonrpc": "2.0",
419+
"method": "eth_getLogs",
420+
"params": [{"fromBlock": U64::from(from), "toBlock": U64::from(to)}],
421+
}))
422+
.await?
423+
.to()?)
425424
}
426425

427426
fn validate_blocks(from: u64, to: u64, blocks: &[jrpc::Block]) -> Result<(), Error> {

0 commit comments

Comments
 (0)