Skip to content

Commit 8638b21

Browse files
authored
file watcher singleton thread pool (#18)
cp 00ee677 and cd8486a Signed-off-by: SpadeA <tangchenjie1210@gmail.com>
1 parent 5eadb2a commit 8638b21

File tree

5 files changed

+64
-38
lines changed

5 files changed

+64
-38
lines changed

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ tokenizer-api = { version = "0.2", path = "./tokenizer-api", package = "tantivy-
7070
sketches-ddsketch = { version = "0.2.1", features = ["use_serde"] }
7171
futures-util = { version = "0.3.28", optional = true }
7272
lazy_static = "1.5.0"
73-
tokio = { version = "1.29.1", features = ["rt-multi-thread"] }
73+
tokio = { version = "1.29.1", features = ["rt-multi-thread", "macros", "time"] }
7474
async-channel = "2.3.1"
7575

7676
[target.'cfg(windows)'.dependencies]
@@ -88,7 +88,6 @@ futures = "0.3.21"
8888
paste = "1.0.11"
8989
more-asserts = "0.3.1"
9090
rand_distr = "0.4.3"
91-
tokio = { version = "1.29.1", features = ["rt-multi-thread", "macros"] }
9291

9392
[target.'cfg(not(windows))'.dev-dependencies]
9493
criterion = "0.5"

src/directory/file_watcher.rs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
use std::io::BufRead;
22
use std::path::Path;
33
use std::sync::atomic::{AtomicUsize, Ordering};
4-
use std::sync::Arc;
4+
use std::sync::{Arc, RwLock};
55
use std::time::Duration;
66
use std::{fs, io, thread};
77

88
use crc32fast::Hasher;
9+
use tokio::task::JoinHandle;
910

1011
use crate::directory::{WatchCallback, WatchCallbackList, WatchHandle};
12+
use crate::indexer::TOKIO_FILE_WATCHER_WORKER_RUNTIME;
1113

1214
const POLLING_INTERVAL: Duration = Duration::from_millis(if cfg!(test) { 1 } else { 500 });
1315

@@ -16,6 +18,8 @@ pub struct FileWatcher {
1618
path: Arc<Path>,
1719
callbacks: Arc<WatchCallbackList>,
1820
state: Arc<AtomicUsize>, // 0: new, 1: runnable, 2: terminated
21+
watch_handle: RwLock<Option<JoinHandle<()>>>,
22+
wakeup_channel: RwLock<Option<async_channel::Sender<()>>>,
1923
}
2024

2125
impl FileWatcher {
@@ -24,6 +28,8 @@ impl FileWatcher {
2428
path: Arc::from(path),
2529
callbacks: Default::default(),
2630
state: Default::default(),
31+
watch_handle: RwLock::new(None),
32+
wakeup_channel: RwLock::new(None),
2733
}
2834
}
2935

@@ -40,29 +46,34 @@ impl FileWatcher {
4046
let callbacks = self.callbacks.clone();
4147
let state = self.state.clone();
4248

43-
thread::Builder::new()
44-
.name("thread-tantivy-meta-file-watcher".to_string())
45-
.spawn(move || {
46-
let mut current_checksum_opt = None;
47-
48-
while state.load(Ordering::SeqCst) == 1 {
49-
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
50-
let metafile_has_changed = current_checksum_opt
51-
.map(|current_checksum| current_checksum != checksum)
52-
.unwrap_or(true);
53-
if metafile_has_changed {
54-
info!("Meta file {:?} was modified", path);
55-
current_checksum_opt = Some(checksum);
56-
// We actually ignore callbacks failing here.
57-
// We just wait for the end of their execution.
58-
let _ = callbacks.broadcast().wait();
59-
}
49+
let (tx, rx) = async_channel::bounded(5);
50+
self.wakeup_channel.write().unwrap().replace(tx);
51+
let task = async move {
52+
let mut current_checksum_opt = None;
53+
while state.load(Ordering::SeqCst) == 1 {
54+
if let Ok(checksum) = FileWatcher::compute_checksum(&path) {
55+
let metafile_has_changed = current_checksum_opt
56+
.map(|current_checksum| current_checksum != checksum)
57+
.unwrap_or(true);
58+
if metafile_has_changed {
59+
info!("Meta file {:?} was modified", path);
60+
current_checksum_opt = Some(checksum);
61+
// We actually ignore callbacks failing here.
62+
// We just wait for the end of their execution.
63+
let _ = callbacks.broadcast().wait();
6064
}
65+
}
6166

62-
thread::sleep(POLLING_INTERVAL);
67+
tokio::select! {
68+
_ = tokio::time::sleep(POLLING_INTERVAL) => {},
69+
_ = rx.recv() => {
70+
// Early wake up from sleep
71+
}
6372
}
64-
})
65-
.expect("Failed to spawn meta file watcher thread");
73+
}
74+
};
75+
let watch_handle = TOKIO_FILE_WATCHER_WORKER_RUNTIME.spawn(task);
76+
self.watch_handle.write().unwrap().replace(watch_handle);
6677
}
6778

6879
pub fn watch(&self, callback: WatchCallback) -> WatchHandle {
@@ -91,6 +102,11 @@ impl FileWatcher {
91102

92103
pub fn graceful_stop(&self) {
93104
self.state.store(2, Ordering::SeqCst);
105+
if let Some(handle) = self.watch_handle.write().unwrap().take() {
106+
let _ = self.wakeup_channel.write().unwrap().take();
107+
handle.abort();
108+
info!("Meta file watcher thread joined/aborted");
109+
}
94110
}
95111
}
96112

src/directory/watch_event_router.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::sync::{Arc, RwLock, Weak};
22

3+
use crate::indexer::MISC_POOL;
34
use crate::FutureResult;
45

56
/// Cloneable wrapper for callbacks registered when watching files of a `Directory`.
@@ -86,20 +87,14 @@ impl WatchCallbackList {
8687
let _ = sender.send(Ok(()));
8788
return result;
8889
}
89-
let spawn_res = std::thread::Builder::new()
90-
.name("watch-callbacks".to_string())
91-
.spawn(move || {
92-
for callback in callbacks {
93-
callback.call();
94-
}
95-
let _ = sender.send(Ok(()));
96-
});
97-
if let Err(err) = spawn_res {
98-
error!(
99-
"Failed to spawn thread to call watch callbacks. Cause: {:?}",
100-
err
101-
);
102-
}
90+
91+
MISC_POOL.spawn(move || {
92+
for callback in callbacks {
93+
callback.call();
94+
}
95+
let _ = sender.send(Ok(()));
96+
});
97+
10398
result
10499
}
105100
}

src/indexer/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ pub use self::index_writer::IndexWriter;
2727
pub use self::log_merge_policy::LogMergePolicy;
2828
pub use self::merge_operation::MergeOperation;
2929
pub use self::merge_policy::{MergeCandidate, MergePolicy, NoMergePolicy};
30-
pub use self::pool::{TOKIO_DOCSTORE_COMPRESS_RUNTIME, TOKIO_RUNTIME};
30+
pub use self::pool::{
31+
MISC_POOL, TOKIO_DOCSTORE_COMPRESS_RUNTIME, TOKIO_FILE_WATCHER_WORKER_RUNTIME, TOKIO_RUNTIME,
32+
};
3133
pub use self::prepared_commit::PreparedCommit;
3234
pub use self::segment_entry::SegmentEntry;
3335
pub use self::segment_serializer::SegmentSerializer;

src/indexer/pool.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ const MILVUS_TANTIVY_WRITER_THREAD_NUM: &str = "MILVUS_TANTIVY_WRITER_THREAD_NUM
99
const MILVUS_TOKIO_THREAD_NUM: &str = "MILVUS_TOKIO_THREAD_NUM";
1010
const MILVUS_TOKIO_DOCSTORE_COMPRESS_THREAD_NUM: &str =
1111
"MILVUS_TANTIVY_DOCSTORE_COMPRESS_THREAD_NUM";
12+
const MILVUS_TOKIO_FILE_WATCHER_THREAD_NUM: &str = "MILVUS_TOKIO_FILE_WATCHER_THREAD_NUM";
13+
const MILVUS_MISC_POLL: &str = "MILVUS_MISC_POLL";
1214

1315
lazy_static! {
1416
pub static ref TOKIO_RUNTIME: tokio::runtime::Runtime =
@@ -35,6 +37,18 @@ lazy_static! {
3537
.enable_all()
3638
.build()
3739
.expect("Failed to create tokio runtime");
40+
pub static ref TOKIO_FILE_WATCHER_WORKER_RUNTIME: tokio::runtime::Runtime =
41+
tokio::runtime::Builder::new_multi_thread()
42+
.worker_threads(get_num_thread(MILVUS_TOKIO_FILE_WATCHER_THREAD_NUM))
43+
.thread_name("tantivy-file-watcher")
44+
.enable_all()
45+
.build()
46+
.expect("Failed to create tokio runtime");
47+
pub static ref MISC_POOL: ThreadPool = ThreadPoolBuilder::new()
48+
.num_threads(get_num_thread(MILVUS_MISC_POLL))
49+
.thread_name(|sz| format!("tantivy-misc{}", sz))
50+
.build()
51+
.expect("Failed to create tantivy-misc thread pool");
3852
}
3953

4054
fn default_num_thread() -> usize {

0 commit comments

Comments
 (0)