diff --git a/arc.toml b/arc.toml index 3d511c2..99c1d09 100644 --- a/arc.toml +++ b/arc.toml @@ -257,5 +257,36 @@ enabled = false # Enable MQTT subscription manager (subscriptions configured via API) enabled = false +# ----------------------------------------------------------------------------- +# Query Configuration +# ----------------------------------------------------------------------------- +# Query execution optimizations +[query] +# Enable S3 file caching via DuckDB's cache_httpfs extension +# When enabled, caches S3 Parquet files in memory for faster repeated reads. +# This significantly improves query performance (5-10x) for: +# - CTEs (Common Table Expressions) that read the same table multiple times +# - Subqueries accessing the same data +# - Grafana dashboards with multiple panels querying similar time ranges +# +# Trade-off: Increases memory usage based on cache size settings below +# Only effective when storage.backend = "s3" +# +# Environment variable: ARC_QUERY_ENABLE_S3_CACHE +# Default: false +enable_s3_cache = false + +# Maximum size for in-memory S3 cache +# Supports: KB, MB, GB (e.g., "128MB", "256MB", "1GB") +# Environment variable: ARC_QUERY_S3_CACHE_SIZE +# Default: "128MB" +s3_cache_size = "128MB" + +# Cache entry TTL (time-to-live) in seconds +# Cached blocks are evicted after this duration +# Environment variable: ARC_QUERY_S3_CACHE_TTL_SECONDS +# Default: 3600 (1 hour) +s3_cache_ttl_seconds = 3600 + [telemetry] enabled = true diff --git a/cmd/arc/main.go b/cmd/arc/main.go index 4b3ea16..80069bc 100644 --- a/cmd/arc/main.go +++ b/cmd/arc/main.go @@ -166,6 +166,10 @@ func main() { AzureAccountName: cfg.Storage.AzureAccountName, AzureAccountKey: cfg.Storage.AzureAccountKey, AzureEndpoint: cfg.Storage.AzureEndpoint, + // Query optimization + EnableS3Cache: cfg.Query.EnableS3Cache, + S3CacheSize: cfg.Query.S3CacheSize, + S3CacheTTLSeconds: cfg.Query.S3CacheTTLSeconds, } db, err := database.New(dbConfig, logger.Get("database")) diff --git a/internal/config/config.go b/internal/config/config.go index ddce7a8..709819e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -29,6 +29,7 @@ type Config struct { License LicenseConfig Scheduler SchedulerConfig Cluster ClusterConfig + Query QueryConfig } type ServerConfig struct { @@ -161,6 +162,13 @@ type MQTTConfig struct { Enabled bool // Enable MQTT subscription manager feature } +// QueryConfig holds configuration for query execution optimizations +type QueryConfig struct { + EnableS3Cache bool // Enable S3 file caching for faster repeated reads (useful for CTEs/subqueries) + S3CacheSize int64 // Cache size in bytes (parsed from "128MB", "256MB", etc.) + S3CacheTTLSeconds int // Cache entry TTL in seconds (default: 3600 = 1 hour) +} + // LicenseConfig holds configuration for enterprise license validation type LicenseConfig struct { Enabled bool // Enable license validation (default: false) @@ -263,6 +271,12 @@ func Load() (*Config, error) { return nil, fmt.Errorf("invalid server.max_payload_size: %w", err) } + // Parse S3 cache size + s3CacheSize, err := ParseSize(v.GetString("query.s3_cache_size")) + if err != nil { + return nil, fmt.Errorf("invalid query.s3_cache_size: %w", err) + } + // Build config from Viper (which includes defaults + env vars) cfg := &Config{ Server: ServerConfig{ @@ -375,6 +389,11 @@ func Load() (*Config, error) { MQTT: MQTTConfig{ Enabled: v.GetBool("mqtt.enabled"), }, + Query: QueryConfig{ + EnableS3Cache: v.GetBool("query.enable_s3_cache"), + S3CacheSize: s3CacheSize, + S3CacheTTLSeconds: v.GetInt("query.s3_cache_ttl_seconds"), + }, License: LicenseConfig{ Enabled: v.GetBool("license.enabled"), Key: v.GetString("license.key"), @@ -525,6 +544,11 @@ func setDefaults(v *viper.Viper) { // MQTT defaults (subscriptions are configured via REST API, stored in SQLite) v.SetDefault("mqtt.enabled", false) // Feature toggle only - disabled by default + // Query defaults + v.SetDefault("query.enable_s3_cache", false) // Disabled by default (opt-in feature) + v.SetDefault("query.s3_cache_size", "128MB") // 128MB cache (256 blocks × 512KB) + v.SetDefault("query.s3_cache_ttl_seconds", 3600) // 1 hour + // License defaults (Enterprise features) // Note: Server URL and validation interval are hardcoded in internal/license/client.go v.SetDefault("license.enabled", false) // Disabled by default diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 11795a1..03d6727 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -657,3 +657,71 @@ func TestLoad_ClusterSeedsEnvOverride(t *testing.T) { } } } + +// QueryConfig Tests + +func TestQueryConfig_Defaults(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "arc-config-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + oldWd, _ := os.Getwd() + os.Chdir(tmpDir) + defer os.Chdir(oldWd) + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error = %v", err) + } + + // Test defaults + if cfg.Query.EnableS3Cache != false { + t.Errorf("Query.EnableS3Cache default = %v, want false", cfg.Query.EnableS3Cache) + } + expectedSize := int64(128 * 1024 * 1024) // 128MB in bytes + if cfg.Query.S3CacheSize != expectedSize { + t.Errorf("Query.S3CacheSize default = %d, want %d (128MB)", cfg.Query.S3CacheSize, expectedSize) + } + if cfg.Query.S3CacheTTLSeconds != 3600 { + t.Errorf("Query.S3CacheTTLSeconds default = %d, want 3600", cfg.Query.S3CacheTTLSeconds) + } +} + +func TestQueryConfig_EnvOverride(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "arc-config-test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(tmpDir) + + oldWd, _ := os.Getwd() + os.Chdir(tmpDir) + defer os.Chdir(oldWd) + + os.Setenv("ARC_QUERY_ENABLE_S3_CACHE", "true") + os.Setenv("ARC_QUERY_S3_CACHE_SIZE", "256MB") + os.Setenv("ARC_QUERY_S3_CACHE_TTL_SECONDS", "7200") + defer func() { + os.Unsetenv("ARC_QUERY_ENABLE_S3_CACHE") + os.Unsetenv("ARC_QUERY_S3_CACHE_SIZE") + os.Unsetenv("ARC_QUERY_S3_CACHE_TTL_SECONDS") + }() + + cfg, err := Load() + if err != nil { + t.Fatalf("Load() error = %v", err) + } + + if cfg.Query.EnableS3Cache != true { + t.Errorf("Query.EnableS3Cache = %v, want true", cfg.Query.EnableS3Cache) + } + expectedSize := int64(256 * 1024 * 1024) // 256MB in bytes + if cfg.Query.S3CacheSize != expectedSize { + t.Errorf("Query.S3CacheSize = %d, want %d (256MB)", cfg.Query.S3CacheSize, expectedSize) + } + if cfg.Query.S3CacheTTLSeconds != 7200 { + t.Errorf("Query.S3CacheTTLSeconds = %d, want 7200", cfg.Query.S3CacheTTLSeconds) + } +} diff --git a/internal/database/duckdb.go b/internal/database/duckdb.go index d693a7a..bb395c9 100644 --- a/internal/database/duckdb.go +++ b/internal/database/duckdb.go @@ -49,6 +49,10 @@ type Config struct { AzureAccountName string AzureAccountKey string AzureEndpoint string // Custom endpoint (optional) + // Query optimization configuration + EnableS3Cache bool // Enable S3 file caching via cache_httpfs extension + S3CacheSize int64 // Cache size in bytes + S3CacheTTLSeconds int // Cache entry TTL in seconds (default: 3600) } // New creates a new DuckDB instance @@ -88,6 +92,7 @@ func New(cfg *Config, logger zerolog.Logger) (*DuckDB, error) { Bool("wal_enabled", cfg.EnableWAL). Bool("s3_enabled", s3Enabled). Str("s3_region", cfg.S3Region). + Bool("s3_cache_enabled", cfg.EnableS3Cache). Bool("azure_enabled", azureEnabled). Str("azure_account", cfg.AzureAccountName). Msg("DuckDB initialized") @@ -135,7 +140,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error { // Configure httpfs extension for S3 access if credentials are provided if cfg.S3AccessKey != "" && cfg.S3SecretKey != "" { - if err := configureS3Access(db, cfg); err != nil { + if err := configureS3Access(db, cfg, logger); err != nil { return fmt.Errorf("failed to configure S3 access: %w", err) } } @@ -152,7 +157,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error { // configureS3Access sets up the httpfs extension for S3 access // Note: We use SET GLOBAL to ensure settings persist across all connections in the pool -func configureS3Access(db *sql.DB, cfg *Config) error { +func configureS3Access(db *sql.DB, cfg *Config, logger zerolog.Logger) error { // Install and load the httpfs extension if _, err := db.Exec("INSTALL httpfs"); err != nil { return fmt.Errorf("failed to install httpfs: %w", err) @@ -201,6 +206,36 @@ func configureS3Access(db *sql.DB, cfg *Config) error { return fmt.Errorf("failed to set s3_use_ssl: %w", err) } + // Configure cache_httpfs extension for S3 file caching if enabled + if cfg.EnableS3Cache { + logger.Info().Msg("Enabling S3 file caching via cache_httpfs extension") + if _, err := db.Exec("INSTALL cache_httpfs FROM community"); err != nil { + logger.Warn().Err(err).Msg("Failed to install cache_httpfs extension, continuing without cache") + } else if _, err := db.Exec("LOAD cache_httpfs"); err != nil { + logger.Warn().Err(err).Msg("Failed to load cache_httpfs extension, continuing without cache") + } else { + db.Exec("SET cache_httpfs_type='in_memory'") + // Calculate max blocks from cache size (each block is 512KB) + if cfg.S3CacheSize > 0 { + maxBlocks := cfg.S3CacheSize / (512 * 1024) // 512KB per block + if maxBlocks > 0 { + db.Exec(fmt.Sprintf("SET cache_httpfs_max_in_mem_cache_block_count=%d", maxBlocks)) + } else { + logger.Warn(). + Int64("configured_bytes", cfg.S3CacheSize). + Msg("S3 cache size too small (minimum 512KB), increase s3_cache_size for caching to take effect") + } + } + if cfg.S3CacheTTLSeconds > 0 { + db.Exec(fmt.Sprintf("SET cache_httpfs_in_mem_cache_block_timeout_millisec=%d", cfg.S3CacheTTLSeconds*1000)) + } + logger.Info(). + Int64("cache_size_bytes", cfg.S3CacheSize). + Int("ttl_seconds", cfg.S3CacheTTLSeconds). + Msg("cache_httpfs extension loaded with in_memory mode") + } + } + return nil }