Skip to content

Commit b25f24c

Browse files
authored
feat(meta-srv): add repartition procedure skeleton (#7487)
Signed-off-by: WenyXu <[email protected]>
1 parent 7bc0934 commit b25f24c

File tree

11 files changed

+783
-4
lines changed

11 files changed

+783
-4
lines changed

src/meta-srv/src/error.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use common_error::ext::{BoxedError, ErrorExt};
1717
use common_error::status_code::StatusCode;
1818
use common_macro::stack_trace_debug;
1919
use common_meta::DatanodeId;
20+
use common_procedure::ProcedureId;
2021
use common_runtime::JoinError;
2122
use snafu::{Location, Snafu};
2223
use store_api::storage::RegionId;
@@ -768,6 +769,35 @@ pub enum Error {
768769
location: Location,
769770
},
770771

772+
#[snafu(display("Failed to create repartition subtasks"))]
773+
RepartitionCreateSubtasks {
774+
source: partition::error::Error,
775+
#[snafu(implicit)]
776+
location: Location,
777+
},
778+
779+
#[snafu(display(
780+
"Source partition expression '{}' does not match any existing region",
781+
expr
782+
))]
783+
RepartitionSourceExprMismatch {
784+
expr: String,
785+
#[snafu(implicit)]
786+
location: Location,
787+
},
788+
789+
#[snafu(display(
790+
"Failed to get the state receiver for repartition subprocedure {}",
791+
procedure_id
792+
))]
793+
RepartitionSubprocedureStateReceiver {
794+
procedure_id: ProcedureId,
795+
#[snafu(source)]
796+
source: common_procedure::Error,
797+
#[snafu(implicit)]
798+
location: Location,
799+
},
800+
771801
#[snafu(display("Unsupported operation {}", operation))]
772802
Unsupported {
773803
operation: String,
@@ -1113,7 +1143,8 @@ impl ErrorExt for Error {
11131143
| Error::LeaderPeerChanged { .. }
11141144
| Error::RepartitionSourceRegionMissing { .. }
11151145
| Error::RepartitionTargetRegionMissing { .. }
1116-
| Error::PartitionExprMismatch { .. } => StatusCode::InvalidArguments,
1146+
| Error::PartitionExprMismatch { .. }
1147+
| Error::RepartitionSourceExprMismatch { .. } => StatusCode::InvalidArguments,
11171148
Error::LeaseKeyFromUtf8 { .. }
11181149
| Error::LeaseValueFromUtf8 { .. }
11191150
| Error::InvalidRegionKeyFromUtf8 { .. }
@@ -1173,6 +1204,8 @@ impl ErrorExt for Error {
11731204

11741205
Error::BuildTlsOptions { source, .. } => source.status_code(),
11751206
Error::Other { source, .. } => source.status_code(),
1207+
Error::RepartitionCreateSubtasks { source, .. } => source.status_code(),
1208+
Error::RepartitionSubprocedureStateReceiver { source, .. } => source.status_code(),
11761209
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,
11771210

11781211
#[cfg(feature = "pg_kvbackend")]

src/meta-srv/src/procedure/repartition.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,63 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
pub mod allocate_region;
16+
pub mod collect;
17+
pub mod deallocate_region;
18+
pub mod dispatch;
1519
pub mod group;
1620
pub mod plan;
21+
pub mod repartition_end;
22+
pub mod repartition_start;
23+
24+
use std::any::Any;
25+
use std::fmt::Debug;
26+
27+
use common_meta::cache_invalidator::CacheInvalidatorRef;
28+
use common_meta::key::TableMetadataManagerRef;
29+
use common_procedure::{Context as ProcedureContext, Status};
30+
use serde::{Deserialize, Serialize};
31+
use store_api::storage::TableId;
32+
33+
use crate::error::Result;
34+
use crate::procedure::repartition::plan::RepartitionPlanEntry;
35+
use crate::service::mailbox::MailboxRef;
1736

1837
#[cfg(test)]
1938
pub mod test_util;
39+
40+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
41+
pub struct PersistentContext {
42+
pub catalog_name: String,
43+
pub schema_name: String,
44+
pub table_name: String,
45+
pub table_id: TableId,
46+
pub plans: Vec<RepartitionPlanEntry>,
47+
}
48+
49+
pub struct Context {
50+
pub persistent_ctx: PersistentContext,
51+
pub table_metadata_manager: TableMetadataManagerRef,
52+
pub mailbox: MailboxRef,
53+
pub server_addr: String,
54+
pub cache_invalidator: CacheInvalidatorRef,
55+
}
56+
57+
#[async_trait::async_trait]
58+
#[typetag::serde(tag = "repartition_state")]
59+
pub(crate) trait State: Sync + Send + Debug {
60+
fn name(&self) -> &'static str {
61+
let type_name = std::any::type_name::<Self>();
62+
// short name
63+
type_name.split("::").last().unwrap_or(type_name)
64+
}
65+
66+
/// Yields the next [State] and [Status].
67+
async fn next(
68+
&mut self,
69+
ctx: &mut Context,
70+
procedure_ctx: &ProcedureContext,
71+
) -> Result<(Box<dyn State>, Status)>;
72+
73+
fn as_any(&self) -> &dyn Any;
74+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
17+
use common_procedure::{Context as ProcedureContext, Status};
18+
use serde::{Deserialize, Serialize};
19+
20+
use crate::error::Result;
21+
use crate::procedure::repartition::dispatch::Dispatch;
22+
use crate::procedure::repartition::plan::{AllocationPlanEntry, RepartitionPlanEntry};
23+
use crate::procedure::repartition::{Context, State};
24+
25+
#[derive(Debug, Clone, Serialize, Deserialize)]
26+
pub struct AllocateRegion {
27+
plan_entries: Vec<AllocationPlanEntry>,
28+
}
29+
30+
impl AllocateRegion {
31+
pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
32+
Self { plan_entries }
33+
}
34+
}
35+
36+
#[async_trait::async_trait]
37+
#[typetag::serde]
38+
impl State for AllocateRegion {
39+
async fn next(
40+
&mut self,
41+
ctx: &mut Context,
42+
_procedure_ctx: &ProcedureContext,
43+
) -> Result<(Box<dyn State>, Status)> {
44+
let region_to_allocate = self
45+
.plan_entries
46+
.iter()
47+
.map(|p| p.regions_to_allocate)
48+
.sum::<usize>();
49+
50+
if region_to_allocate == 0 {
51+
let repartition_plan_entries = self
52+
.plan_entries
53+
.iter()
54+
.map(RepartitionPlanEntry::from_allocation_plan_entry)
55+
.collect::<Vec<_>>();
56+
ctx.persistent_ctx.plans = repartition_plan_entries;
57+
return Ok((Box::new(Dispatch), Status::executing(true)));
58+
}
59+
60+
// TODO(weny): allocate regions.
61+
todo!()
62+
}
63+
64+
fn as_any(&self) -> &dyn Any {
65+
self
66+
}
67+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
17+
use common_procedure::{Context as ProcedureContext, ProcedureId, Status, watcher};
18+
use common_telemetry::error;
19+
use serde::{Deserialize, Serialize};
20+
use snafu::ResultExt;
21+
22+
use crate::error::{RepartitionSubprocedureStateReceiverSnafu, Result};
23+
use crate::procedure::repartition::deallocate_region::DeallocateRegion;
24+
use crate::procedure::repartition::group::GroupId;
25+
use crate::procedure::repartition::{Context, State};
26+
27+
/// Metadata for tracking a dispatched sub-procedure.
28+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq)]
29+
pub struct ProcedureMeta {
30+
/// The index of the plan entry in the parent procedure's plan list.
31+
pub plan_index: usize,
32+
/// The group id of the repartition group.
33+
pub group_id: GroupId,
34+
/// The procedure id of the sub-procedure.
35+
pub procedure_id: ProcedureId,
36+
}
37+
38+
/// State for collecting results from dispatched sub-procedures.
39+
#[derive(Debug, Clone, Serialize, Deserialize)]
40+
pub struct Collect {
41+
/// Sub-procedures that are currently in-flight.
42+
pub inflight_procedures: Vec<ProcedureMeta>,
43+
/// Sub-procedures that have completed successfully.
44+
pub succeeded_procedures: Vec<ProcedureMeta>,
45+
/// Sub-procedures that have failed.
46+
pub failed_procedures: Vec<ProcedureMeta>,
47+
/// Sub-procedures whose state could not be determined.
48+
pub unknown_procedures: Vec<ProcedureMeta>,
49+
}
50+
51+
impl Collect {
52+
pub fn new(inflight_procedures: Vec<ProcedureMeta>) -> Self {
53+
Self {
54+
inflight_procedures,
55+
succeeded_procedures: Vec::new(),
56+
failed_procedures: Vec::new(),
57+
unknown_procedures: Vec::new(),
58+
}
59+
}
60+
}
61+
62+
#[async_trait::async_trait]
63+
#[typetag::serde]
64+
impl State for Collect {
65+
async fn next(
66+
&mut self,
67+
_ctx: &mut Context,
68+
procedure_ctx: &ProcedureContext,
69+
) -> Result<(Box<dyn State>, Status)> {
70+
for procedure_meta in self.inflight_procedures.iter() {
71+
let procedure_id = procedure_meta.procedure_id;
72+
let group_id = procedure_meta.group_id;
73+
let Some(mut receiver) = procedure_ctx
74+
.provider
75+
.procedure_state_receiver(procedure_id)
76+
.await
77+
.context(RepartitionSubprocedureStateReceiverSnafu { procedure_id })?
78+
else {
79+
error!(
80+
"failed to get procedure state receiver, procedure_id: {}, group_id: {}",
81+
procedure_id, group_id
82+
);
83+
self.unknown_procedures.push(*procedure_meta);
84+
continue;
85+
};
86+
87+
match watcher::wait(&mut receiver).await {
88+
Ok(_) => self.succeeded_procedures.push(*procedure_meta),
89+
Err(e) => {
90+
error!(e; "failed to wait for repartition subprocedure, procedure_id: {}, group_id: {}", procedure_id, group_id);
91+
self.failed_procedures.push(*procedure_meta);
92+
}
93+
}
94+
}
95+
96+
if !self.failed_procedures.is_empty() || !self.unknown_procedures.is_empty() {
97+
// TODO(weny): retry the failed or unknown procedures.
98+
}
99+
100+
Ok((Box::new(DeallocateRegion), Status::executing(true)))
101+
}
102+
103+
fn as_any(&self) -> &dyn Any {
104+
self
105+
}
106+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Copyright 2023 Greptime Team
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
17+
use common_procedure::{Context as ProcedureContext, Status};
18+
use serde::{Deserialize, Serialize};
19+
20+
use crate::error::Result;
21+
use crate::procedure::repartition::repartition_end::RepartitionEnd;
22+
use crate::procedure::repartition::{Context, State};
23+
24+
#[derive(Debug, Clone, Serialize, Deserialize)]
25+
pub struct DeallocateRegion;
26+
27+
#[async_trait::async_trait]
28+
#[typetag::serde]
29+
impl State for DeallocateRegion {
30+
async fn next(
31+
&mut self,
32+
ctx: &mut Context,
33+
_procedure_ctx: &ProcedureContext,
34+
) -> Result<(Box<dyn State>, Status)> {
35+
let region_to_deallocate = ctx
36+
.persistent_ctx
37+
.plans
38+
.iter()
39+
.map(|p| p.pending_deallocate_region_ids.len())
40+
.sum::<usize>();
41+
if region_to_deallocate == 0 {
42+
return Ok((Box::new(RepartitionEnd), Status::done()));
43+
}
44+
45+
// TODO(weny): deallocate regions.
46+
todo!()
47+
}
48+
49+
fn as_any(&self) -> &dyn Any {
50+
self
51+
}
52+
}

0 commit comments

Comments
 (0)