Skip to content

Commit c2dd2a3

Browse files
committed
fix cursor for v1 and v2 compat
1 parent 980f23b commit c2dd2a3

File tree

4 files changed

+73
-54
lines changed

4 files changed

+73
-54
lines changed

be/src/api_sql.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,10 +189,7 @@ async fn query(
189189
.iter()
190190
.map(|r| {
191191
query::sql(
192-
Some(query::Cursor::new(
193-
r.chain.unwrap_or_default(),
194-
r.block_height,
195-
)),
192+
&mut query::Cursor::new(r.chain.unwrap_or_default(), r.block_height),
196193
r.event_signatures.iter().map(|s| s.as_str()).collect(),
197194
&r.query,
198195
)
@@ -224,8 +221,8 @@ async fn query(
224221
.get::<usize, U64>(0)
225222
.to::<u64>();
226223
let mut result: Vec<Rows> = Vec::new();
227-
for eq in queries {
228-
result.push(handle_rows(pgtx.query(&eq.query, &[]).await?)?);
224+
for q in queries {
225+
result.push(handle_rows(pgtx.query(&q, &[]).await?)?);
229226
}
230227
Ok(Response {
231228
block_height,

be/src/api_sql2.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ use crate::{
3535
pub struct Request {
3636
#[serde(alias = "api-key")]
3737
pub api_key: Option<api::Key>,
38-
pub cursor: Option<query::Cursor>,
38+
#[serde(default)]
39+
pub cursor: query::Cursor,
3940
#[serde(default)]
4041
pub signatures: Vec<String>,
4142
pub query: String,
@@ -101,7 +102,7 @@ pub async fn handle_sse(
101102
.await
102103
{
103104
Ok(resp) if resp.len() == 1 => {
104-
req.cursor = Some(resp[0].cursor.clone());
105+
req.cursor = resp[0].cursor.clone();
105106
yield Ok(SSEvent::default().json_data(&resp).unwrap());
106107
}
107108
Err(err) => {
@@ -120,7 +121,7 @@ pub async fn handle_sse(
120121
match val {
121122
Ok(broadcast::Message::Close) => return,
122123
Ok(broadcast::Message::Block(new_block)) => {
123-
if req.cursor.as_ref().unwrap().contains(new_block.chain) {
124+
if req.cursor.contains(new_block.chain) {
124125
break;
125126
}
126127
},
@@ -163,7 +164,7 @@ impl RequestLog {
163164
for req in &log.requests {
164165
gafe.log_query(
165166
req.api_key.clone(),
166-
req.cursor.clone().unwrap_or_default(),
167+
req.cursor.clone(),
167168
req.signatures.clone(),
168169
req.query.clone(),
169170
latency,
@@ -208,14 +209,16 @@ async fn query(
208209
.await?;
209210
let mut result: Vec<Response> = Vec::new();
210211
for r in requests {
211-
let eq = query::sql(
212-
r.cursor.clone(),
212+
let mut cursor = r.cursor.clone();
213+
let q = query::sql(
214+
&mut cursor,
213215
r.signatures.iter().map(|s| s.as_str()).collect(),
214216
&r.query,
215217
)?;
216-
let rows = pgtx.query(&eq.query, &[]).await?;
218+
let rows = pgtx.query(&q, &[]).await?;
219+
update_cursor(&pgtx, &mut cursor).await?;
217220
result.push(Response {
218-
cursor: update_cursor(&pgtx, eq.cusror).await?,
221+
cursor,
219222
columns: get_columns(&rows),
220223
rows: get_rows(&rows),
221224
});
@@ -225,8 +228,8 @@ async fn query(
225228

226229
async fn update_cursor(
227230
pgtx: &tokio_postgres::Transaction<'_>,
228-
mut cursor: query::Cursor,
229-
) -> Result<query::Cursor, api::Error> {
231+
cursor: &mut query::Cursor,
232+
) -> Result<(), api::Error> {
230233
for c in cursor.chains() {
231234
let row = pgtx
232235
.query_one(
@@ -236,7 +239,7 @@ async fn update_cursor(
236239
.await?;
237240
cursor.set_block_height(c, row.get::<usize, U64>(0).to());
238241
}
239-
Ok(cursor)
242+
Ok(())
240243
}
241244

242245
fn get_columns(rows: &[tokio_postgres::Row]) -> HashMap<String, String> {

be/src/query.rs

Lines changed: 51 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,21 +35,18 @@ impl Cursor {
3535
Cursor(map)
3636
}
3737

38-
pub fn from_chains(chains: &HashSet<u64>) -> Self {
39-
let map = chains.iter().map(|&c| (c, None)).collect();
40-
Cursor(map)
38+
pub fn add_chains(&mut self, chains: &HashSet<u64>) {
39+
chains.iter().for_each(|&c| {
40+
self.0.entry(c).or_insert(None);
41+
})
4142
}
4243

4344
pub fn contains(&self, chain: u64) -> bool {
4445
self.0.keys().any(|c| *c == chain)
4546
}
4647

4748
pub fn chains(&self) -> Vec<u64> {
48-
self.0.keys().cloned().collect()
49-
}
50-
51-
pub fn set_block_height(&mut self, chain: u64, n: u64) {
52-
self.0.insert(chain, Some(n));
49+
self.0.keys().sorted().cloned().collect()
5350
}
5451

5552
pub fn chain(&self) -> u64 {
@@ -59,36 +56,42 @@ impl Cursor {
5956
}
6057
}
6158

62-
pub fn to_sql(&self) -> String {
63-
self.0
59+
pub fn set_block_height(&mut self, chain: u64, n: u64) {
60+
self.0.insert(chain, Some(n));
61+
}
62+
63+
pub fn to_sql(&self, col_name: &str) -> String {
64+
let predicates = self
65+
.0
6466
.iter()
67+
.sorted_by_key(|(chain, _)| *chain)
6568
.map(|(chain, block_num)| match block_num {
66-
Some(n) => format!("(chain = {} and block_num > {})", chain, n),
69+
Some(n) => format!("(chain = {} and {} > {})", chain, col_name, n),
6770
None => format!("chain = {}", chain),
6871
})
69-
.collect::<Vec<_>>()
70-
.join(" or ")
72+
.collect::<Vec<_>>();
73+
if predicates.len() == 1 {
74+
predicates[0].clone()
75+
} else {
76+
format!("({})", predicates.join(" or "))
77+
}
7178
}
7279
}
7380

74-
pub struct EnhancedQuery {
75-
pub query: String,
76-
pub cusror: Cursor,
77-
}
78-
7981
const PG: &sqlparser::dialect::PostgreSqlDialect = &sqlparser::dialect::PostgreSqlDialect {};
8082

8183
/// Parses the user supplied query into a SQL AST
8284
/// and validates the query against the provided event signatures.
8385
/// The SQL API implements onlny a subset of SQL so un-supported
8486
/// SQL results in an error.
8587
pub fn sql(
86-
cursor: Option<Cursor>,
88+
cursor: &mut Cursor,
8789
signatures: Vec<&str>,
8890
user_query: &str,
89-
) -> Result<EnhancedQuery, api::Error> {
91+
) -> Result<String, api::Error> {
9092
let mut q = UserQuery::new(signatures)?;
91-
let new_query = q.process(user_query)?;
93+
let rewritten_query = q.process(user_query)?;
94+
cursor.add_chains(&q.chains);
9295
let query = [
9396
"with".to_string(),
9497
q.relations
@@ -97,13 +100,10 @@ pub fn sql(
97100
.sorted_by_key(|s| s.table_name.to_string())
98101
.map(|rel| rel.to_sql(cursor.clone()))
99102
.join(","),
100-
new_query.to_string(),
103+
rewritten_query.to_string(),
101104
]
102105
.join(" ");
103-
Ok(EnhancedQuery {
104-
query,
105-
cusror: Cursor::from_chains(&q.chains),
106-
})
106+
Ok(query)
107107
}
108108

109109
/*
@@ -153,7 +153,7 @@ impl Relation {
153153
|| self.table_alias.contains(other)
154154
}
155155

156-
fn to_sql(&self, cursor: Option<Cursor>) -> String {
156+
fn to_sql(&self, cursor: Cursor) -> String {
157157
let mut res: Vec<String> = Vec::new();
158158
res.push(format!("{} as not materialized (", self.table_name));
159159
res.push("select".to_string());
@@ -182,8 +182,10 @@ impl Relation {
182182
res.push(select_list.join(","));
183183

184184
let mut predicates = vec![];
185-
if let Some(cursor) = cursor {
186-
predicates.push(cursor.to_sql());
185+
if self.table_name.value.to_lowercase() == "blocks" {
186+
predicates.push(cursor.to_sql("num"));
187+
} else {
188+
predicates.push(cursor.to_sql("block_num"));
187189
}
188190
if let Some(event) = self.event.as_ref() {
189191
predicates.push(event.sighash_sql_predicate());
@@ -950,10 +952,10 @@ mod tests {
950952
}
951953

952954
async fn check_sql(event_sigs: Vec<&str>, user_query: &str, want: &str) {
953-
let got = sql(Some(Cursor::new(1, None)), event_sigs, user_query)
955+
let got = sql(&mut Cursor::new(1, None), event_sigs, user_query)
954956
.unwrap_or_else(|e| panic!("unable to create sql for:\n{} error: {:?}", user_query, e));
955957
let (got, want) = (
956-
fmt_sql(&got.query).unwrap_or_else(|_| panic!("unable to format got: {}", got.query)),
958+
fmt_sql(&got).unwrap_or_else(|_| panic!("unable to format got: {}", got)),
957959
fmt_sql(want).unwrap_or_else(|_| panic!("unable to format want: {}", want)),
958960
);
959961
if got.to_lowercase().ne(&want.to_lowercase()) {
@@ -964,6 +966,24 @@ mod tests {
964966
pg.query(&got, &[]).await.expect("issue with query");
965967
}
966968

969+
#[test]
970+
fn test_cursor() {
971+
let mut cursor = Cursor::default();
972+
cursor.set_block_height(8453, 100);
973+
cursor.set_block_height(10, 42);
974+
let _ = sql(
975+
&mut cursor,
976+
vec![],
977+
"select hash from txs where chain in (8453, 10, 1)",
978+
)
979+
.unwrap();
980+
assert_eq!(cursor.chains(), vec![1, 10, 8453]);
981+
assert_eq!(
982+
cursor.to_sql("foo"),
983+
"(chain = 1 or (chain = 10 and foo > 42) or (chain = 8453 and foo > 100))"
984+
);
985+
}
986+
967987
#[tokio::test]
968988
async fn test_blocks_table() {
969989
check_sql(

fe/src/god_mode.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ time::serde::format_description!(
1919
#[derive(Clone, Debug, Serialize)]
2020
struct UserQuery {
2121
owner_email: Option<String>,
22-
cursor: Option<query::Cursor>,
22+
cursor: query::Cursor,
2323
events: Vec<String>,
2424
sql: String,
2525
latency: Option<u64>,
@@ -34,12 +34,11 @@ struct UserQuery {
3434
impl UserQuery {
3535
pub fn gen_sql(mut self) -> UserQuery {
3636
self.generated_sql = be::query::sql(
37-
self.cursor.clone(),
37+
&mut self.cursor.clone(),
3838
self.events.iter().map(AsRef::as_ref).collect(),
3939
&self.sql,
4040
)
41-
.ok()
42-
.map(|eq| eq.query);
41+
.ok();
4342
self
4443
}
4544
}
@@ -117,7 +116,7 @@ async fn log(
117116
.await?
118117
.into_iter()
119118
.map(|row| UserQuery {
120-
cursor: Some(query::Cursor::default()),
119+
cursor: query::Cursor::default(),
121120
owner_email: row.get("owner_email"),
122121
events: row.get("events"),
123122
sql: row.get("user_query"),
@@ -164,7 +163,7 @@ async fn top(
164163
.await?
165164
.into_iter()
166165
.map(|row| UserQuery {
167-
cursor: Some(query::Cursor::default()),
166+
cursor: query::Cursor::default(),
168167
owner_email: row.get("owner_email"),
169168
events: row.get("events"),
170169
sql: row.get("user_query"),

0 commit comments

Comments
 (0)