Skip to content

Commit 6f86a22

Browse files
authored
feat: adjust some args to gc worker (#7469)
* chore: less stuff sent
  Signed-off-by: discord9 <discord9@163.com>
* after rebase fix
  Signed-off-by: discord9 <discord9@163.com>
* pcr
  Signed-off-by: discord9 <discord9@163.com>
* fix: clarify comment on manifest file removal for GC worker
  Signed-off-by: discord9 <discord9@163.com>
* per review
  Signed-off-by: discord9 <discord9@163.com>
---------
Signed-off-by: discord9 <discord9@163.com>
1 parent 5162c1d commit 6f86a22

File tree

3 files changed

+49
-18
lines changed

3 files changed

+49
-18
lines changed

src/meta-srv/src/gc/options.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,11 @@ impl Default for GcSchedulerOptions {
7373
retry_backoff_duration: Duration::from_secs(5),
7474
region_gc_concurrency: 16,
7575
min_region_size_threshold: 100 * 1024 * 1024, // 100MB
76-
sst_count_weight: 1.0,
77-
file_removed_count_weight: 0.5,
76+
sst_count_weight: 0.5, // more sst means could potentially remove more files, moderate priority
77+
file_removed_count_weight: 1.0, // more file to be deleted, higher priority
7878
gc_cooldown_period: Duration::from_secs(60 * 5), // 5 minutes
79-
regions_per_table_threshold: 20, // Select top 20 regions per table
80-
mailbox_timeout: Duration::from_secs(60), // 60 seconds
79+
regions_per_table_threshold: 20, // Select top 20 regions per table
80+
mailbox_timeout: Duration::from_secs(60), // 60 seconds
8181
// Perform full file listing every 24 hours to find orphan files
8282
full_file_listing_interval: Duration::from_secs(60 * 60 * 24),
8383
// Clean up stale tracker entries every 6 hours

src/mito2/src/gc.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ impl LocalGcWorker {
249249
.entry(*region_id)
250250
.or_insert_with(HashSet::new)
251251
.extend(file_refs.clone());
252+
// no need to include manifest files here, as they are already included in region manifest
252253
}
253254

254255
Ok(tmp_ref_files)
@@ -328,15 +329,34 @@ impl LocalGcWorker {
328329
let region_id = region.region_id();
329330

330331
debug!("Doing gc for region {}", region_id);
332+
333+
let manifest = region.manifest_ctx.manifest().await;
334+
// If the manifest version does not match, skip GC for this region to avoid deleting files that are still in use.
335+
let file_ref_manifest_version = self
336+
.file_ref_manifest
337+
.manifest_version
338+
.get(&region.region_id())
339+
.cloned();
340+
if file_ref_manifest_version != Some(manifest.manifest_version) {
341+
// This should be rare (only for a few seconds after the leader updates the manifest version), so just skip GC for this region.
342+
warn!(
343+
"Tmp ref files manifest version {:?} does not match region {} manifest version {}, skip gc for this region",
344+
file_ref_manifest_version,
345+
region.region_id(),
346+
manifest.manifest_version
347+
);
348+
return Ok(vec![]);
349+
}
350+
331351
// do the time consuming listing only when full_file_listing is true
332352
// and do it first to make sure we have the latest manifest etc.
333353
let all_entries = if self.full_file_listing {
334-
self.list_from_object_store(&region).await?
354+
self.list_from_object_store(region.region_id(), manifest.clone())
355+
.await?
335356
} else {
336357
vec![]
337358
};
338359

339-
let manifest = region.manifest_ctx.manifest().await;
340360
let region_id = manifest.metadata.region_id;
341361
let current_files = &manifest.files;
342362

@@ -509,10 +529,12 @@ impl LocalGcWorker {
509529
/// List all files in the region directory.
510530
/// Returns a vector of all file entries found.
511531
/// This might take a long time if there are many files in the region directory.
512-
async fn list_from_object_store(&self, region: &MitoRegionRef) -> Result<Vec<Entry>> {
532+
async fn list_from_object_store(
533+
&self,
534+
region_id: RegionId,
535+
manifest: Arc<RegionManifest>,
536+
) -> Result<Vec<Entry>> {
513537
let start = tokio::time::Instant::now();
514-
let region_id = region.region_id();
515-
let manifest = region.manifest_ctx.manifest().await;
516538
let current_files = &manifest.files;
517539
let concurrency = (current_files.len() / Self::CONCURRENCY_LIST_PER_FILES)
518540
.max(1)

src/mito2/src/sst/file_ref.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,28 +82,25 @@ impl FileReferenceManager {
8282

8383
/// Gets all ref files for the given regions, meaning all open FileHandles for those regions
8484
/// and from related regions' manifests.
85+
/// `query_regions_for_mem` queries for in-memory file handles.
86+
/// `related_regions_in_manifest` queries for related regions' manifests to get more file refs of given region ids.
8587
pub(crate) async fn get_snapshot_of_file_refs(
8688
&self,
87-
query_regions: Vec<MitoRegionRef>,
88-
related_regions: Vec<(MitoRegionRef, Vec<RegionId>)>,
89+
query_regions_for_mem: Vec<MitoRegionRef>,
90+
related_regions_in_manifest: Vec<(MitoRegionRef, Vec<RegionId>)>,
8991
) -> Result<FileRefsManifest> {
9092
let mut ref_files = HashMap::new();
9193
// get from in memory file handles
92-
for region_id in query_regions.iter().map(|r| r.region_id()) {
94+
for region_id in query_regions_for_mem.iter().map(|r| r.region_id()) {
9395
if let Some(files) = self.ref_file_set(region_id) {
9496
ref_files.insert(region_id, files);
9597
}
9698
}
9799

98100
let mut manifest_version = HashMap::new();
99101

100-
for r in &query_regions {
101-
let manifest = r.manifest_ctx.manifest().await;
102-
manifest_version.insert(r.region_id(), manifest.manifest_version);
103-
}
104-
105102
// get file refs from related regions' manifests
106-
for (related_region, queries) in &related_regions {
103+
for (related_region, queries) in &related_regions_in_manifest {
107104
let queries = queries.iter().cloned().collect::<HashSet<_>>();
108105
let manifest = related_region.manifest_ctx.manifest().await;
109106
for meta in manifest.files.values() {
@@ -125,6 +122,18 @@ impl FileReferenceManager {
125122
manifest_version.insert(related_region.region_id(), manifest.manifest_version);
126123
}
127124

125+
for r in &query_regions_for_mem {
126+
let manifest = r.manifest_ctx.manifest().await;
127+
// Remove files that are already in the manifest to keep the result small, since the GC worker reads them from the manifest later.
128+
ref_files.entry(r.region_id()).and_modify(|refs| {
129+
*refs = std::mem::take(refs)
130+
.into_iter()
131+
.filter(|f| !manifest.files.contains_key(&f.file_id))
132+
.collect();
133+
});
134+
manifest_version.insert(r.region_id(), manifest.manifest_version);
135+
}
136+
128137
// simply return all ref files, no manifest version filtering for now.
129138
Ok(FileRefsManifest {
130139
file_refs: ref_files,

0 commit comments

Comments
 (0)