diff --git a/Cargo.lock b/Cargo.lock index 728b5a0..a0478ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2368,6 +2368,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "quote" version = "1.0.40" @@ -2749,7 +2770,9 @@ dependencies = [ "chrono", "clap", "flate2", + "lazy_static", "log", + "prometheus", "reqwest", "serde", "serde_json", @@ -3060,7 +3083,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "thiserror", + "thiserror 2.0.12", "tokio", "tokio-stream", "tracing", @@ -3143,7 +3166,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.12", "tracing", "whoami", ] @@ -3180,7 +3203,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror", + "thiserror 2.0.12", "tracing", "whoami", ] @@ -3302,13 +3325,33 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.12", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index f108839..b437d97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,4 +27,6 @@ async-trait = "0.1.77" flate2 = "1.0.28" urlencoding = "2.1.3" clap = { version = "4.5.0", features = ["derive"] } -reqwest = { version = "0.12.0", features = ["json", "gzip"] } \ No newline at end of file +reqwest = { version = "0.12.0", features = ["json", "gzip"] } +prometheus = "0.13.4" +lazy_static = "1.5.0" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7645d12..4002ac1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,6 +2,7 @@ version: '3.8' # MinIO and PostgreSQL setup for s3dedup development # Buckets are automatically created by the application on startup +# PostgreSQL creates both 's3dedup' (main) and 's3dedup_test' (for tests) databases services: minio: image: minio/minio:RELEASE.2024-10-02T17-50-41Z @@ -34,6 +35,7 @@ services: POSTGRES_DB: s3dedup volumes: - postgres_data:/var/lib/postgresql/data + - ./docker/init-test-db.sh:/docker-entrypoint-initdb.d/init-test-db.sh healthcheck: test: ["CMD-SHELL", "pg_isready -U postgres"] interval: 10s diff --git a/docker/init-test-db.sh b/docker/init-test-db.sh new file mode 100755 index 0000000..bca4a71 --- /dev/null +++ b/docker/init-test-db.sh @@ -0,0 +1,6 @@ +#!/bin/bash +set -e + +psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL + 
CREATE DATABASE s3dedup_test; +EOSQL diff --git a/src/lib.rs b/src/lib.rs index e452e09..bf6374a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod filetracker_client; pub mod kvstorage; pub mod locks; pub mod logging; +pub mod metrics; pub mod migration; pub mod routes; pub mod s3storage; @@ -18,6 +19,7 @@ pub struct AppState { pub locks: Arc<Mutex<locks::LocksStorage>>, pub s3storage: Arc<Mutex<s3storage::S3Storage>>, pub filetracker_client: Option<Arc<filetracker_client::FiletrackerClient>>, + pub metrics: Arc<metrics::Metrics>, } impl AppState { @@ -27,12 +29,14 @@ impl AppState { let kvstorage = kvstorage::KVStorage::new(config).await?; let locks = locks::LocksStorage::new(&config.locks_type); let s3storage = s3storage::S3Storage::new(config).await?; + let metrics = Arc::new(metrics::Metrics::new()); Ok(Self { bucket_name: config.name.clone(), kvstorage: Arc::new(Mutex::new(kvstorage)), locks: Arc::new(Mutex::new(locks)), s3storage: Arc::new(Mutex::new(s3storage)), filetracker_client: None, + metrics, }) } @@ -44,12 +48,18 @@ impl AppState { let locks = locks::LocksStorage::new(&config.locks_type); let s3storage = s3storage::S3Storage::new(config).await?; let filetracker_client = filetracker_client::FiletrackerClient::new(filetracker_url); + let metrics = Arc::new(metrics::Metrics::new()); + + // Mark migration as active + metrics::MIGRATION_ACTIVE.set(1); + Ok(Self { bucket_name: config.name.clone(), kvstorage: Arc::new(Mutex::new(kvstorage)), locks: Arc::new(Mutex::new(locks)), s3storage: Arc::new(Mutex::new(s3storage)), filetracker_client: Some(Arc::new(filetracker_client)), + metrics, }) } } diff --git a/src/main.rs b/src/main.rs index 9ce1751..374db81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -104,6 +104,11 @@ async fn run_s3dedup_server(config_path: Option<&str>, use_env: bool) { .put(ft_put_file) .delete(ft_delete_file), ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) .layer( // Logging middleware TraceLayer::new_for_http() @@ -252,6 +257,11 @@ async fn run_live_migrate(config_path: Option<&str>, use_env: bool, max_concurre .put(s3dedup::routes::ft::put_file::ft_put_file) .delete(s3dedup::routes::ft::delete_file::ft_delete_file), ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) .layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) @@ -316,6 +326,11 @@ .put(s3dedup::routes::ft::put_file::ft_put_file) .delete(s3dedup::routes::ft::delete_file::ft_delete_file), ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) .layer( TraceLayer::new_for_http() .make_span_with(DefaultMakeSpan::new().level(Level::INFO)) diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..86bb324 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,244 @@ +use lazy_static::lazy_static; +use prometheus::{ Encoder, Gauge, HistogramVec, IntCounterVec, IntGauge, TextEncoder, register_gauge, register_histogram_vec, register_int_counter_vec, register_int_gauge, }; +use serde_json::{Value, json}; + +lazy_static!
{ + // HTTP Request metrics + pub static ref HTTP_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_http_requests_total", + "Total number of HTTP requests", + &["method", "endpoint", "status"] + ) + .unwrap(); + + pub static ref HTTP_REQUEST_DURATION_SECONDS: HistogramVec = register_histogram_vec!( + "s3dedup_http_request_duration_seconds", + "HTTP request latencies in seconds", + &["method", "endpoint"] + ) + .unwrap(); + + // Storage metrics + pub static ref TOTAL_FILES: IntGauge = register_int_gauge!( + "s3dedup_total_files", + "Total number of logical files stored" + ) + .unwrap(); + + pub static ref TOTAL_BLOBS: IntGauge = register_int_gauge!( + "s3dedup_total_blobs", + "Total number of unique content blobs in S3" + ) + .unwrap(); + + pub static ref DEDUPLICATION_RATIO: Gauge = register_gauge!( + "s3dedup_deduplication_ratio", + "Ratio of total files to total blobs" + ) + .unwrap(); + + pub static ref TOTAL_LOGICAL_SIZE_BYTES: IntGauge = register_int_gauge!( + "s3dedup_total_logical_size_bytes", + "Sum of all logical file sizes" + ) + .unwrap(); + + pub static ref TOTAL_PHYSICAL_SIZE_BYTES: IntGauge = register_int_gauge!( + "s3dedup_total_physical_size_bytes", + "Actual storage used in S3" + ) + .unwrap(); + + pub static ref STORAGE_SAVINGS_RATIO: Gauge = register_gauge!( + "s3dedup_storage_savings_ratio", + "Storage savings ratio (logical - physical) / logical" + ) + .unwrap(); + + pub static ref AVERAGE_REFERENCE_COUNT: Gauge = register_gauge!( + "s3dedup_average_reference_count", + "Average number of files pointing to each blob" + ) + .unwrap(); + + // Deduplication metrics + pub static ref DEDUP_HITS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_dedup_hits_total", + "Number of PUT requests that matched existing blobs", + &["bucket"] + ) + .unwrap(); + + pub static ref DEDUP_MISSES_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_dedup_misses_total", + "Number of PUT requests requiring new blob storage", + &["bucket"] + ) + .unwrap(); + + pub static ref DEDUP_HIT_RATE: Gauge = register_gauge!( + "s3dedup_dedup_hit_rate", + "Deduplication hit rate (hits / (hits + misses))" + ) + .unwrap(); + + // Cleaner metrics + pub static ref CLEANER_LAST_RUN_TIMESTAMP: IntGauge = register_int_gauge!( + "s3dedup_cleaner_last_run_timestamp_seconds", + "Timestamp of last successful cleaner run" + ) + .unwrap(); + + pub static ref CLEANER_TOTAL_RUNS: IntCounterVec = register_int_counter_vec!( + "s3dedup_cleaner_total_runs", + "Total number of cleaner runs", + &["bucket"] + ) + .unwrap(); + + pub static ref CLEANER_DELETED_BLOBS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_cleaner_deleted_blobs_total", + "Total blobs deleted by cleaner", + &["bucket"] + ) + .unwrap(); + + pub static ref CLEANER_FREED_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_cleaner_freed_bytes_total", + "Total storage freed by cleaner", + &["bucket"] + ) + .unwrap(); + + pub static ref CLEANER_ERRORS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_cleaner_errors_total", + "Total cleaner errors", + &["bucket"] + ) + .unwrap(); + + // Migration metrics + pub static ref MIGRATION_ACTIVE: IntGauge = register_int_gauge!( + "s3dedup_migration_active", + "Whether migration is currently active (1 = active, 0 = inactive)" + ) + .unwrap(); + + pub static ref MIGRATION_FILES_MIGRATED: IntCounterVec = register_int_counter_vec!( + "s3dedup_migration_files_migrated_total", + "Total files migrated from old filetracker", + &["bucket"] + ) + 
.unwrap(); + + pub static ref FILETRACKER_FALLBACKS_TOTAL: IntCounterVec = register_int_counter_vec!( + "s3dedup_filetracker_fallbacks_total", + "Number of GET requests served from old filetracker", + &["bucket"] + ) + .unwrap(); + + // System health metrics + pub static ref LOCK_QUEUE_SIZE: IntGauge = register_int_gauge!( + "s3dedup_lock_queue_size", + "Number of requests waiting for locks" + ) + .unwrap(); + + pub static ref UPTIME_SECONDS: IntGauge = register_int_gauge!( + "s3dedup_uptime_seconds", + "Server uptime in seconds" + ) + .unwrap(); +} + +#[derive(Clone)] +pub struct Metrics { + start_time: std::time::Instant, +} + +impl Metrics { + pub fn new() -> Self { + Self { + start_time: std::time::Instant::now(), + } + } + + /// Update uptime metric + pub fn update_uptime(&self) { + UPTIME_SECONDS.set(self.start_time.elapsed().as_secs() as i64); + } + + /// Gather all metrics and return as Prometheus text format + pub fn gather(&self) -> Result<String, Box<dyn std::error::Error>> { + self.update_uptime(); + + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = vec![]; + encoder.encode(&metric_families, &mut buffer)?; + Ok(String::from_utf8(buffer)?) + } + + /// Gather all metrics and return as JSON + pub fn gather_json(&self) -> Result<Value, Box<dyn std::error::Error>> { + self.update_uptime(); + + let metric_families = prometheus::gather(); + let mut metrics = json!({}); + + for mf in metric_families { + let name = mf.get_name(); + let help = mf.get_help(); + let metric_type = format!("{:?}", mf.get_field_type()); + + let mut metric_values = vec![]; + + for m in mf.get_metric() { + let mut labels = json!({}); + for label in m.get_label() { + labels[label.get_name()] = json!(label.get_value()); + } + + let value = if m.has_counter() { + json!({ + "labels": labels, + "value": m.get_counter().get_value(), + }) + } else if m.has_gauge() { + json!({ + "labels": labels, + "value": m.get_gauge().get_value(), + }) + } else if m.has_histogram() { + json!({ + "labels": labels, + "sample_count": m.get_histogram().get_sample_count(), + "sample_sum": m.get_histogram().get_sample_sum(), + }) + } else { + continue; + }; + + metric_values.push(value); + } + + metrics[name] = json!({ + "help": help, + "type": metric_type, + "metrics": metric_values, + }); + } + + Ok(metrics) + } +} + +impl Default for Metrics { + fn default() -> Self { + Self::new() + } +} diff --git a/src/migration/mod.rs b/src/migration/mod.rs index b7d328b..fe06134 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -162,6 +162,20 @@ pub async fn migrate_single_file_from_metadata( let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); app_state.locks.lock().await.acquire_exclusive(&lock_key); + // Recheck if file was already migrated after acquiring lock (race condition protection) + let current_modified_after_lock = app_state + .kvstorage + .lock() + .await + .get_modified(&app_state.bucket_name, path) + .await?; + + if current_modified_after_lock >= file_metadata.last_modified { + // File was migrated by another concurrent task, skip + app_state.locks.lock().await.release(&lock_key); + return Ok(()); + } + // Check if blob already exists in S3 let blob_exists = app_state .s3storage @@ -294,6 +308,20 @@ async fn migrate_single_file( let lock_key = crate::locks::file_lock(&app_state.bucket_name, path); app_state.locks.lock().await.acquire_exclusive(&lock_key); + // Recheck if file was already migrated after acquiring lock (race condition protection) + let current_modified_after_lock = app_state + .kvstorage + .lock() +
.await + .get_modified(&app_state.bucket_name, path) + .await?; + + if current_modified_after_lock >= file_metadata.last_modified { + // File was migrated by another concurrent task, skip + app_state.locks.lock().await.release(&lock_key); + return Ok(false); + } + // Check if blob already exists in S3 let blob_exists = app_state .s3storage @@ -414,5 +442,7 @@ pub async fn live_migration_worker( } } - info!("Background migration worker finished"); + // Reset migration_active gauge to indicate migration is complete + crate::metrics::MIGRATION_ACTIVE.set(0); + info!("Background migration worker finished, migration_active set to 0"); } diff --git a/src/routes/ft/get_file.rs b/src/routes/ft/get_file.rs index 5fdc56e..b4b61d1 100644 --- a/src/routes/ft/get_file.rs +++ b/src/routes/ft/get_file.rs @@ -1,5 +1,5 @@ use crate::routes::ft::utils; -use crate::{AppState, locks}; +use crate::{AppState, locks, metrics}; use axum::body::Body; use axum::extract::{Path, State}; use axum::http::{Response, StatusCode}; @@ -11,6 +11,8 @@ pub async fn ft_get_file( State(state): State<Arc<AppState>>, Path(path): Path<String>, ) -> impl IntoResponse { + let start = std::time::Instant::now(); + // Remove leading slash from wildcard path let path = path.strip_prefix('/').unwrap_or(&path); debug!("Handling GET for path: {}", path); @@ -29,6 +31,14 @@ if modified_time.is_err() { error!("Failed to get modified time"); state.locks.lock().await.release(&lock_key); + + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "500"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -46,6 +56,11 @@ Ok(file_metadata) => { debug!("File {} found in filetracker, migrating on-the-fly", path); + // Track filetracker fallback + metrics::FILETRACKER_FALLBACKS_TOTAL + .with_label_values(&[&state.bucket_name]) + .inc(); + // Migrate the file on-the-fly using migration logic let result = crate::migration::migrate_single_file_from_metadata( &state, @@ -57,6 +72,14 @@ if let Err(e) = result { error!("Failed to migrate file on-the-fly: {}", e); state.locks.lock().await.release(&lock_key); + + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "500"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + return Response::builder() .status(StatusCode::INTERNAL_SERVER_ERROR) .body(Body::empty()) @@ -66,6 +89,13 @@ // Release lock state.locks.lock().await.release(&lock_key); + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "200"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + // Serve the file directly from filetracker response return Response::builder() .status(StatusCode::OK) @@ -95,6 +125,14 @@ debug!("File {} not found", path); state.locks.lock().await.release(&lock_key); + + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "404"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + return Response::builder() .status(StatusCode::NOT_FOUND) .body(Body::empty()) @@ -159,7 +197,15 @@ pub async fn
ft_get_file( // 6. Release lock state.locks.lock().await.release(&lock_key); - // 7. Return file with appropriate headers (matching original filetracker) + // 7. Record metrics + metrics::HTTP_REQUESTS_TOTAL + .with_label_values(&["GET", "/ft/files", "200"]) + .inc(); + metrics::HTTP_REQUEST_DURATION_SECONDS + .with_label_values(&["GET", "/ft/files"]) + .observe(start.elapsed().as_secs_f64()); + + // 8. Return file with appropriate headers (matching original filetracker) Response::builder() .status(StatusCode::OK) .header("Content-Type", "application/octet-stream") diff --git a/src/routes/metrics.rs b/src/routes/metrics.rs new file mode 100644 index 0000000..7ec0ad8 --- /dev/null +++ b/src/routes/metrics.rs @@ -0,0 +1,28 @@ +use crate::AppState; +use axum::Json; +use axum::extract::State; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use std::sync::Arc; + +pub async fn metrics_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse { + match state.metrics.gather() { + Ok(metrics_output) => (StatusCode::OK, metrics_output), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error gathering metrics: {}", e), + ), + } +} + +pub async fn metrics_json_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse { + match state.metrics.gather_json() { + Ok(metrics_json) => (StatusCode::OK, Json(metrics_json)), + Err(e) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(serde_json::json!({ + "error": format!("Error gathering metrics: {}", e) + })), + ), + } +} diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 8926510..1c4232d 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1 +1,2 @@ pub mod ft; +pub mod metrics; diff --git a/tests/integration_test.rs b/tests/integration_test.rs index 82767e9..baa2fa5 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -96,6 +96,7 @@ async fn create_test_app_with_state() -> (Router, Arc<AppState>) { locks: Arc::new(Mutex::new(locks)), s3storage: Arc::new(Mutex::new(s3storage)), filetracker_client: None, + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); app_state.kvstorage.lock().await.setup().await.unwrap(); diff --git a/tests/metrics_test.rs b/tests/metrics_test.rs new file mode 100644 index 0000000..30b8b0c --- /dev/null +++ b/tests/metrics_test.rs @@ -0,0 +1,363 @@ +use axum::Router; +use axum::body::Body; +use axum::http::{Request, StatusCode}; +use axum::routing::get; +use s3dedup::AppState; +use s3dedup::config::BucketConfig; +use std::sync::Arc; +use tower::util::ServiceExt; + +async fn create_test_app_state() -> Arc<AppState> { + // Create unique identifier to avoid conflicts + let thread_id = std::thread::current().id(); + let thread_id_str = format!("{:?}", thread_id) + .replace("ThreadId(", "") + .replace(")", ""); + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let unique_id = format!("{}{}{}", std::process::id(), thread_id_str, nanos); + let test_bucket = format!("test-metrics-{}", unique_id.to_lowercase()); + + std::fs::create_dir_all("db").ok(); + + let config = BucketConfig { + name: test_bucket, + address: "127.0.0.1".to_string(), + port: 3000, + kvstorage_type: s3dedup::config::KVStorageType::SQLite, + sqlite: Some(s3dedup::config::SQLiteConfig { + path: format!("db/test-metrics-{}.db", unique_id), + pool_size: 10, + }), + postgres: None, + locks_type: s3dedup::config::LocksType::Memory, + s3storage_type: s3dedup::config::S3StorageType::MinIO, + minio: Some(s3dedup::config::MinIOConfig { + endpoint: "http://localhost:9000".to_string(), +
access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + force_path_style: true, + }), + cleaner: Default::default(), + filetracker_url: None, + }; + + let app_state = AppState::new(&config).await.unwrap(); + app_state.kvstorage.lock().await.setup().await.unwrap(); + + Arc::new(app_state) +} + +#[tokio::test] +async fn test_metrics_endpoint_exists() { + let app_state = create_test_app_state().await; + + let app = Router::new() + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .with_state(app_state); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); +} + +#[tokio::test] +async fn test_metrics_format() { + let app_state = create_test_app_state().await; + + let app = Router::new() + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .with_state(app_state); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Check for Prometheus format - should have HELP and TYPE comments + assert!( + body_str.contains("# HELP"), + "Metrics should contain HELP comments" + ); + assert!( + body_str.contains("# TYPE"), + "Metrics should contain TYPE comments" + ); + + // Check for some expected metrics + assert!( + body_str.contains("s3dedup_uptime_seconds"), + "Should contain uptime metric" + ); + // Note: HTTP requests metric only appears after at least one request is made + // We just check that the metrics endpoint returns valid Prometheus format +} + +#[tokio::test] +async fn test_metrics_after_get_request() { + let app_state = create_test_app_state().await; + + // Create a router with both GET and metrics endpoints + let app = Router::new() + .route( + "/ft/files/{*path}", + get(s3dedup::routes::ft::get_file::ft_get_file), + ) + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .with_state(app_state); + + // Make a GET request that will return 404 + let app1 = app.clone(); + let _response = app1 + .oneshot( + Request::builder() + .uri("/ft/files/nonexistent.txt") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Check metrics endpoint + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Should have recorded the GET request with 404 status + assert!( + body_str.contains("s3dedup_http_requests_total"), + "Should contain HTTP requests metric" + ); + assert!( + body_str.contains("GET"), + "Should contain GET method in metrics" + ); + assert!( + body_str.contains("404"), + "Should contain 404 status in metrics" + ); +} + +#[tokio::test] +async fn test_metrics_uptime_increases() { + let app_state = create_test_app_state().await; + + let app = Router::new() + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .with_state(app_state.clone()); + + // First request + let response1 = app + .clone() + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + 
.await + .unwrap(); + + let body1 = axum::body::to_bytes(response1.into_body(), usize::MAX) + .await + .unwrap(); + let body1_str = String::from_utf8(body1.to_vec()).unwrap(); + + // Extract uptime value from first response + let uptime1 = body1_str + .lines() + .find(|line| line.starts_with("s3dedup_uptime_seconds ")) + .and_then(|line| line.split_whitespace().nth(1)) + .and_then(|val| val.parse::<f64>().ok()) + .expect("Should have uptime metric"); + + // Wait a bit + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // Second request + let response2 = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body2 = axum::body::to_bytes(response2.into_body(), usize::MAX) + .await + .unwrap(); + let body2_str = String::from_utf8(body2.to_vec()).unwrap(); + + let uptime2 = body2_str + .lines() + .find(|line| line.starts_with("s3dedup_uptime_seconds ")) + .and_then(|line| line.split_whitespace().nth(1)) + .and_then(|val| val.parse::<f64>().ok()) + .expect("Should have uptime metric"); + + assert!( + uptime2 >= uptime1, + "Uptime should increase or stay the same" + ); +} + +#[tokio::test] +async fn test_migration_active_metric() { + // Create unique identifier + let thread_id = std::thread::current().id(); + let thread_id_str = format!("{:?}", thread_id) + .replace("ThreadId(", "") + .replace(")", ""); + let nanos = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let unique_id = format!("{}{}{}", std::process::id(), thread_id_str, nanos); + let test_bucket = format!("test-migration-{}", unique_id.to_lowercase()); + + std::fs::create_dir_all("db").ok(); + + // Create app with filetracker client (migration mode) + let config = BucketConfig { + name: test_bucket, + address: "127.0.0.1".to_string(), + port: 3000, + kvstorage_type: s3dedup::config::KVStorageType::SQLite, + sqlite: Some(s3dedup::config::SQLiteConfig { + path: format!("db/test-migration-{}.db", unique_id), + pool_size: 10, + }), + postgres: None, + locks_type: s3dedup::config::LocksType::Memory, + s3storage_type: s3dedup::config::S3StorageType::MinIO, + minio: Some(s3dedup::config::MinIOConfig { + endpoint: "http://localhost:9000".to_string(), + access_key: "minioadmin".to_string(), + secret_key: "minioadmin".to_string(), + force_path_style: true, + }), + cleaner: Default::default(), + filetracker_url: Some("http://localhost:8000".to_string()), + }; + + let app_state = AppState::new_with_filetracker(&config, "http://localhost:8000".to_string()) + .await + .unwrap(); + app_state.kvstorage.lock().await.setup().await.unwrap(); + + let app = Router::new() + .route("/metrics", get(s3dedup::routes::metrics::metrics_handler)) + .with_state(Arc::new(app_state)); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Should have migration_active set to 1 + assert!( + body_str.contains("s3dedup_migration_active"), + "Should contain migration_active metric" + ); + + // Find the value + let migration_active = body_str + .lines() + .find(|line| line.starts_with("s3dedup_migration_active ")) + .and_then(|line| line.split_whitespace().nth(1)) + .and_then(|val| val.parse::<i64>().ok()); + + assert_eq!( + migration_active, + Some(1), + "Migration should be marked as active" + ); +} +
+#[tokio::test] +async fn test_metrics_json_endpoint() { + let app_state = create_test_app_state().await; + + let app = Router::new() + .route( + "/metrics/json", + get(s3dedup::routes::metrics::metrics_json_handler), + ) + .with_state(app_state); + + let response = app + .oneshot( + Request::builder() + .uri("/metrics/json") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let body = axum::body::to_bytes(response.into_body(), usize::MAX) + .await + .unwrap(); + let body_str = String::from_utf8(body.to_vec()).unwrap(); + + // Parse as JSON to verify it's valid JSON + let json: serde_json::Value = serde_json::from_str(&body_str).unwrap(); + + // Verify it's an object + assert!(json.is_object(), "Response should be a JSON object"); + + // Verify it contains uptime metric + assert!( + json.get("s3dedup_uptime_seconds").is_some(), + "Should contain uptime metric" + ); +} diff --git a/tests/migration_test.rs b/tests/migration_test.rs index 3cdf75e..96aca0f 100644 --- a/tests/migration_test.rs +++ b/tests/migration_test.rs @@ -204,6 +204,7 @@ async fn create_test_app_state() -> Arc<AppState> { locks: Arc::new(tokio::sync::Mutex::new(locks)), s3storage: Arc::new(tokio::sync::Mutex::new(s3storage)), filetracker_client: None, + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); app_state.kvstorage.lock().await.setup().await.unwrap(); @@ -406,6 +407,7 @@ async fn test_live_migration_get_fallback() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // Create router with live migration support @@ -452,6 +454,7 @@ async fn test_live_migration_put_dual_write() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // Create router with live migration support @@ -528,6 +531,7 @@ async fn test_live_migration_delete_dual_delete() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // Verify file exists in both before deletion @@ -598,6 +602,7 @@ async fn test_live_migration_get_not_found_in_both() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // Create router with live migration support @@ -640,6 +645,7 @@ async fn test_live_migration_get_fallback_response_data() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // Create router with live migration support @@ -703,6 +709,7 @@ async fn test_live_migration_subsequent_get_from_s3dedup() { locks: app_state.locks.clone(), s3storage: app_state.s3storage.clone(), filetracker_client: Some(Arc::new(FiletrackerClient::new(url))), + metrics: Arc::new(s3dedup::metrics::Metrics::new()), }); // First GET - should fallback to filetracker and migrate
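Follow-up note (not part of the patch): ft_get_file updates HTTP_REQUESTS_TOTAL and HTTP_REQUEST_DURATION_SECONDS by hand at every early return, which is easy to miss when new return paths are added. A tower/axum middleware could record both metrics once per request instead. Below is a minimal sketch, assuming axum 0.8 (implied by the `{*path}` route syntax in the patch) and the statics exported from src/metrics.rs; the `track_metrics` name is hypothetical and not defined anywhere in the patch.

use axum::{
    extract::{MatchedPath, Request},
    middleware::Next,
    response::Response,
};
use s3dedup::metrics::{HTTP_REQUEST_DURATION_SECONDS, HTTP_REQUESTS_TOTAL};

// Record request count and latency for every route in one place.
// Uses the matched route template (e.g. "/ft/files/{*path}") rather than
// the raw URI so label cardinality stays bounded.
pub async fn track_metrics(req: Request, next: Next) -> Response {
    let start = std::time::Instant::now();
    let method = req.method().to_string();
    let path = req
        .extensions()
        .get::<MatchedPath>()
        .map(|p| p.as_str().to_owned())
        .unwrap_or_else(|| "unmatched".to_owned());

    let response = next.run(req).await;

    let status = response.status().as_u16().to_string();
    HTTP_REQUESTS_TOTAL
        .with_label_values(&[&method, &path, &status])
        .inc();
    HTTP_REQUEST_DURATION_SECONDS
        .with_label_values(&[&method, &path])
        .observe(start.elapsed().as_secs_f64());

    response
}

Wired with `.layer(axum::middleware::from_fn(track_metrics))` on the Router, this would also cover the PUT and DELETE handlers, which this diff does not show being instrumented.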