Skip to content

Commit 2ff9726

Browse files
committed
add index_routing_rules table and create / get methods
1 parent d0cc1c5 commit 2ff9726

File tree

12 files changed

+740
-42
lines changed

12 files changed

+740
-42
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DROP TABLE IF EXISTS index_routing_rules;
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
CREATE TABLE IF NOT EXISTS index_routing_rules (
2+
routing_table_id VARCHAR(50) NOT NULL,
3+
rank INTEGER NOT NULL,
4+
filter TEXT NOT NULL,
5+
index_id VARCHAR(50) NOT NULL,
6+
create_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),
7+
update_timestamp TIMESTAMP NOT NULL DEFAULT (CURRENT_TIMESTAMP AT TIME ZONE 'UTC'),
8+
9+
PRIMARY KEY (routing_table_id, rank),
10+
FOREIGN KEY(index_id) REFERENCES indexes(index_id)
11+
);

quickwit/quickwit-metastore/src/metastore/control_plane_metastore.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ use quickwit_common::uri::Uri;
1919
use quickwit_proto::control_plane::{ControlPlaneService, ControlPlaneServiceClient};
2020
use quickwit_proto::metastore::{
2121
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
22-
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
23-
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
24-
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse,
25-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
26-
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
22+
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
23+
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
24+
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
25+
DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse,
26+
GetClusterIdentityRequest, GetClusterIdentityResponse, GetIndexRoutingTableRequest,
27+
GetIndexRoutingTableResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
2728
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
2829
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
2930
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
@@ -289,4 +290,18 @@ impl MetastoreService for ControlPlaneMetastore {
289290
) -> MetastoreResult<GetClusterIdentityResponse> {
290291
self.metastore.get_cluster_identity(request).await
291292
}
293+
294+
async fn create_index_routing_table(
295+
&self,
296+
request: CreateIndexRoutingTableRequest,
297+
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
298+
self.metastore.create_index_routing_table(request).await
299+
}
300+
301+
async fn get_index_routing_table(
302+
&self,
303+
request: GetIndexRoutingTableRequest,
304+
) -> MetastoreResult<GetIndexRoutingTableResponse> {
305+
self.metastore.get_index_routing_table(request).await
306+
}
292307
}

quickwit/quickwit-metastore/src/metastore/file_backed/mod.rs

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,22 +41,24 @@ use quickwit_common::ServiceStream;
4141
use quickwit_config::IndexTemplate;
4242
use quickwit_proto::metastore::{
4343
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
44-
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
45-
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
46-
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
47-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
48-
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
49-
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
50-
IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
51-
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
52-
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
53-
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
54-
ListShardsRequest, ListShardsResponse, ListSplitsRequest, ListSplitsResponse,
55-
ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError, MetastoreResult,
56-
MetastoreService, MetastoreServiceStream, OpenShardSubrequest, OpenShardsRequest,
57-
OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest, ResetSourceCheckpointRequest,
58-
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest,
59-
UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse, serde_utils,
44+
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
45+
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
46+
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
47+
DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest,
48+
FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse,
49+
GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest,
50+
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
51+
IndexMetadataRequest, IndexMetadataResponse, IndexTemplateMatch, IndexesMetadataRequest,
52+
IndexesMetadataResponse, LastDeleteOpstampRequest, LastDeleteOpstampResponse,
53+
ListDeleteTasksRequest, ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
54+
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
55+
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
56+
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
57+
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
58+
OpenShardsRequest, OpenShardsResponse, PruneShardsRequest, PublishSplitsRequest,
59+
ResetSourceCheckpointRequest, StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest,
60+
UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest, UpdateSplitsDeleteOpstampResponse,
61+
serde_utils,
6062
};
6163
use quickwit_proto::types::{IndexId, IndexUid};
6264
use quickwit_storage::Storage;
@@ -1275,6 +1277,26 @@ impl MetastoreService for FileBackedMetastore {
12751277
uuid: state_wlock_guard.identity.hyphenated().to_string(),
12761278
})
12771279
}
1280+
1281+
async fn create_index_routing_table(
1282+
&self,
1283+
_request: CreateIndexRoutingTableRequest,
1284+
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
1285+
Err(MetastoreError::Internal {
1286+
message: "routing tables are not supported in file-backed metastore".to_string(),
1287+
cause: "unsupported operation".to_string(),
1288+
})
1289+
}
1290+
1291+
async fn get_index_routing_table(
1292+
&self,
1293+
_request: GetIndexRoutingTableRequest,
1294+
) -> MetastoreResult<GetIndexRoutingTableResponse> {
1295+
Err(MetastoreError::Internal {
1296+
message: "routing tables are not supported in file-backed metastore".to_string(),
1297+
cause: "unsupported operation".to_string(),
1298+
})
1299+
}
12781300
}
12791301

12801302
impl MetastoreServiceExt for FileBackedMetastore {}

quickwit/quickwit-metastore/src/metastore/postgres/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,13 @@ pub(super) fn convert_sqlx_err(index_id: &str, sqlx_error: sqlx::Error) -> Metas
4040
index_id: index_id.to_string(),
4141
})
4242
}
43+
(pg_error_codes::UNIQUE_VIOLATION, Some("index_routing_rules")) => {
44+
error!(error=?boxed_db_error, "postgresql-error");
45+
MetastoreError::Internal {
46+
message: "duplicate rank in routing table".to_string(),
47+
cause: format!("DB error {boxed_db_error:?}"),
48+
}
49+
}
4350
(pg_error_codes::UNIQUE_VIOLATION, _) => {
4451
error!(error=?boxed_db_error, "postgresql-error");
4552
MetastoreError::Internal {

quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,19 +29,21 @@ use quickwit_config::{
2929
use quickwit_proto::ingest::{Shard, ShardState};
3030
use quickwit_proto::metastore::{
3131
AcquireShardsRequest, AcquireShardsResponse, AddSourceRequest, CreateIndexRequest,
32-
CreateIndexResponse, CreateIndexTemplateRequest, DeleteIndexRequest,
33-
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteShardsResponse,
34-
DeleteSourceRequest, DeleteSplitsRequest, DeleteTask, EmptyResponse, EntityKind,
35-
FindIndexTemplateMatchesRequest, FindIndexTemplateMatchesResponse, GetClusterIdentityRequest,
36-
GetClusterIdentityResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
37-
IndexMetadataFailure, IndexMetadataFailureReason, IndexMetadataRequest, IndexMetadataResponse,
38-
IndexStats, IndexTemplateMatch, IndexesMetadataRequest, IndexesMetadataResponse,
39-
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
40-
ListDeleteTasksResponse, ListIndexStatsRequest, ListIndexStatsResponse,
41-
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
42-
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListShardsSubresponse,
43-
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
44-
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
32+
CreateIndexResponse, CreateIndexRoutingTableRequest, CreateIndexRoutingTableResponse,
33+
CreateIndexTemplateRequest, DeleteIndexRequest, DeleteIndexTemplatesRequest, DeleteQuery,
34+
DeleteShardsRequest, DeleteShardsResponse, DeleteSourceRequest, DeleteSplitsRequest,
35+
DeleteTask, EmptyResponse, EntityKind, FindIndexTemplateMatchesRequest,
36+
FindIndexTemplateMatchesResponse, GetClusterIdentityRequest, GetClusterIdentityResponse,
37+
GetIndexRoutingTableRequest, GetIndexRoutingTableResponse, GetIndexTemplateRequest,
38+
GetIndexTemplateResponse, IndexMetadataFailure, IndexMetadataFailureReason,
39+
IndexMetadataRequest, IndexMetadataResponse, IndexRoutingRule, IndexStats, IndexTemplateMatch,
40+
IndexesMetadataRequest, IndexesMetadataResponse, LastDeleteOpstampRequest,
41+
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
42+
ListIndexStatsRequest, ListIndexStatsResponse, ListIndexTemplatesRequest,
43+
ListIndexTemplatesResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
44+
ListShardsRequest, ListShardsResponse, ListShardsSubresponse, ListSplitsRequest,
45+
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
46+
MetastoreResult, MetastoreService, MetastoreServiceStream, OpenShardSubrequest,
4547
OpenShardSubresponse, OpenShardsRequest, OpenShardsResponse, PruneShardsRequest,
4648
PublishSplitsRequest, ResetSourceCheckpointRequest, SplitStats, StageSplitsRequest,
4749
ToggleSourceRequest, UpdateIndexRequest, UpdateSourceRequest, UpdateSplitsDeleteOpstampRequest,
@@ -57,7 +59,9 @@ use uuid::Uuid;
5759

5860
use super::error::convert_sqlx_err;
5961
use super::migrator::run_migrations;
60-
use super::model::{PgDeleteTask, PgIndex, PgIndexTemplate, PgShard, PgSplit, Splits};
62+
use super::model::{
63+
PgDeleteTask, PgIndex, PgIndexRoutingRule, PgIndexTemplate, PgShard, PgSplit, Splits,
64+
};
6165
use super::pool::TrackedPool;
6266
use super::split_stream::SplitStream;
6367
use super::utils::{append_query_filters_and_order_by, establish_connection};
@@ -1763,6 +1767,68 @@ impl MetastoreService for PostgresqlMetastore {
17631767
.await?;
17641768
Ok(GetClusterIdentityResponse { uuid })
17651769
}
1770+
1771+
async fn create_index_routing_table(
1772+
&self,
1773+
request: CreateIndexRoutingTableRequest,
1774+
) -> MetastoreResult<CreateIndexRoutingTableResponse> {
1775+
const INSERT_ROUTING_RULE_QUERY: &str = include_str!("queries/routing_rules/insert.sql");
1776+
1777+
// Validate request
1778+
if request.rules.is_empty() {
1779+
return Err(MetastoreError::InvalidArgument {
1780+
message: "routing table must contain at least one rule".to_string(),
1781+
});
1782+
}
1783+
1784+
// Generate routing_table_id
1785+
let routing_table_id = Uuid::new_v4().hyphenated().to_string();
1786+
1787+
// Use transaction for atomicity
1788+
run_with_tx!(self.connection_pool, tx, "create_routing_table", {
1789+
// Insert each rule with its array index as the rank
1790+
for (rank, rule) in request.rules.iter().enumerate() {
1791+
sqlx::query(INSERT_ROUTING_RULE_QUERY)
1792+
.bind(&routing_table_id)
1793+
.bind(rank as i32)
1794+
.bind(&rule.filter)
1795+
.bind(&rule.index_id)
1796+
.execute(tx.as_mut())
1797+
.await
1798+
.map_err(|e| convert_sqlx_err("routing_rules", e))?;
1799+
}
1800+
1801+
Ok(CreateIndexRoutingTableResponse { routing_table_id })
1802+
})
1803+
}
1804+
1805+
async fn get_index_routing_table(
1806+
&self,
1807+
request: GetIndexRoutingTableRequest,
1808+
) -> MetastoreResult<GetIndexRoutingTableResponse> {
1809+
const SELECT_ROUTING_RULES_QUERY: &str = include_str!("queries/routing_rules/select.sql");
1810+
1811+
let pg_rules: Vec<PgIndexRoutingRule> = sqlx::query_as(SELECT_ROUTING_RULES_QUERY)
1812+
.bind(&request.routing_table_id)
1813+
.fetch_all(&self.connection_pool)
1814+
.await?;
1815+
1816+
if pg_rules.is_empty() {
1817+
return Err(MetastoreError::NotFound(EntityKind::RoutingTable {
1818+
routing_table_id: request.routing_table_id.clone(),
1819+
}));
1820+
}
1821+
1822+
let rules = pg_rules
1823+
.into_iter()
1824+
.map(|pg_rule| IndexRoutingRule {
1825+
index_id: pg_rule.index_id,
1826+
filter: pg_rule.filter,
1827+
})
1828+
.collect();
1829+
1830+
Ok(GetIndexRoutingTableResponse { rules })
1831+
}
17661832
}
17671833

17681834
async fn open_or_fetch_shard<'e>(

quickwit/quickwit-metastore/src/metastore/postgres/model.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,3 +292,11 @@ impl From<PgShard> for Shard {
292292
pub(super) struct PgIndexTemplate {
293293
pub index_template_json: String,
294294
}
295+
296+
#[derive(sqlx::FromRow, Debug)]
297+
pub(super) struct PgIndexRoutingRule {
298+
pub routing_table_id: String,
299+
pub rank: i32,
300+
pub filter: String,
301+
pub index_id: String,
302+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
INSERT INTO index_routing_rules (routing_table_id, rank, filter, index_id)
2+
VALUES ($1, $2, $3, $4)
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SELECT routing_table_id, rank, filter, index_id
2+
FROM index_routing_rules
3+
WHERE routing_table_id = $1
4+
ORDER BY rank ASC

quickwit/quickwit-proto/protos/quickwit/metastore.proto

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,16 @@ service MetastoreService {
204204

205205
// Get cluster identity
206206
rpc GetClusterIdentity(GetClusterIdentityRequest) returns (GetClusterIdentityResponse);
207-
}
208207

209-
message EmptyResponse {
208+
// Creates index routing table
209+
rpc CreateIndexRoutingTable(CreateIndexRoutingTableRequest) returns (CreateIndexRoutingTableResponse);
210+
211+
// Get index routing table
212+
rpc GetIndexRoutingTable(GetIndexRoutingTableRequest) returns (GetIndexRoutingTableResponse);
210213
}
211214

215+
message EmptyResponse {}
216+
212217
message CreateIndexRequest {
213218
string index_config_json = 2;
214219
repeated string source_configs_json = 3;
@@ -229,7 +234,7 @@ message UpdateIndexRequest {
229234
}
230235

231236
message ListIndexesMetadataRequest {
232-
reserved 1;
237+
reserved 1;
233238
// List of patterns an index should match or not match to get considered
234239
// An index must match at least one positive pattern (a pattern not starting
235240
// with a '-'), and no negative pattern (a pattern starting with a '-').
@@ -550,8 +555,7 @@ message IndexTemplateMatch {
550555
string index_template_json = 3;
551556
}
552557

553-
message ListIndexTemplatesRequest {
554-
}
558+
message ListIndexTemplatesRequest {}
555559

556560
message ListIndexTemplatesResponse {
557561
repeated string index_templates_json = 1;
@@ -561,9 +565,29 @@ message DeleteIndexTemplatesRequest {
561565
repeated string template_ids = 1;
562566
}
563567

564-
message GetClusterIdentityRequest {
565-
}
568+
message GetClusterIdentityRequest {}
566569

567570
message GetClusterIdentityResponse {
568571
string uuid = 1;
569572
}
573+
574+
message CreateIndexRoutingTableRequest {
575+
repeated IndexRoutingRule rules = 1;
576+
}
577+
578+
message CreateIndexRoutingTableResponse {
579+
string routing_table_id = 1;
580+
}
581+
582+
message GetIndexRoutingTableRequest {
583+
string routing_table_id = 1;
584+
}
585+
586+
message GetIndexRoutingTableResponse {
587+
repeated IndexRoutingRule rules = 1;
588+
}
589+
590+
message IndexRoutingRule {
591+
string index_id = 1;
592+
string filter = 2;
593+
}

0 commit comments

Comments
 (0)