31 changes: 31 additions & 0 deletions arc.toml
@@ -257,5 +257,36 @@ enabled = false
# Enable MQTT subscription manager (subscriptions configured via API)
enabled = false

# -----------------------------------------------------------------------------
# Query Configuration
# -----------------------------------------------------------------------------
# Query execution optimizations
[query]
# Enable S3 file caching via the cache_httpfs community extension
# When enabled, caches S3 Parquet files in memory for faster repeated reads.
# This can significantly improve query performance (roughly 5-10x) for:
# - CTEs (Common Table Expressions) that read the same table multiple times
# - Subqueries accessing the same data
# - Grafana dashboards with multiple panels querying similar time ranges
#
# Trade-off: Increases memory usage based on cache size settings below
# Only effective when storage.backend = "s3"
#
# Environment variable: ARC_QUERY_ENABLE_S3_CACHE
# Default: false
enable_s3_cache = false

# Maximum size for in-memory S3 cache
# Supports: KB, MB, GB (e.g., "128MB", "256MB", "1GB")
# Environment variable: ARC_QUERY_S3_CACHE_SIZE
# Default: "128MB"
s3_cache_size = "128MB"

# Cache entry TTL (time-to-live) in seconds
# Cached blocks are evicted after this duration
# Environment variable: ARC_QUERY_S3_CACHE_TTL_SECONDS
# Default: 3600 (1 hour)
s3_cache_ttl_seconds = 3600

[telemetry]
enabled = true
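For illustration, a minimal sketch of the workload the comments above describe, assuming the marcboeker/go-duckdb driver, S3 credentials already configured as in `configureS3Access`, and a hypothetical `s3://my-bucket` path. The CTE is referenced twice, so DuckDB scans the same Parquet files twice; with `enable_s3_cache = true` the second scan can be served from the in-memory cache:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/marcboeker/go-duckdb" // assumed Go driver; registers "duckdb"
)

func main() {
	db, err := sql.Open("duckdb", "")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hypothetical bucket and path. The CTE "recent" is referenced twice,
	// so the same S3 Parquet blocks are read twice; with the cache enabled,
	// the second read is served from memory instead of S3.
	var rows int64
	var avg float64
	err = db.QueryRow(`
		WITH recent AS (
			SELECT value
			FROM read_parquet('s3://my-bucket/metrics/*.parquet')
		)
		SELECT (SELECT count(*) FROM recent),
		       (SELECT avg(value) FROM recent)`).Scan(&rows, &avg)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("rows=%d avg=%f\n", rows, avg)
}
```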
4 changes: 4 additions & 0 deletions cmd/arc/main.go
@@ -166,6 +166,10 @@ func main() {
AzureAccountName: cfg.Storage.AzureAccountName,
AzureAccountKey: cfg.Storage.AzureAccountKey,
AzureEndpoint: cfg.Storage.AzureEndpoint,
// Query optimization
EnableS3Cache: cfg.Query.EnableS3Cache,
S3CacheSize: cfg.Query.S3CacheSize,
S3CacheTTLSeconds: cfg.Query.S3CacheTTLSeconds,
}

db, err := database.New(dbConfig, logger.Get("database"))
24 changes: 24 additions & 0 deletions internal/config/config.go
@@ -29,6 +29,7 @@ type Config struct {
License LicenseConfig
Scheduler SchedulerConfig
Cluster ClusterConfig
Query QueryConfig
}

type ServerConfig struct {
@@ -161,6 +162,13 @@ type MQTTConfig struct {
Enabled bool // Enable MQTT subscription manager feature
}

// QueryConfig holds configuration for query execution optimizations
type QueryConfig struct {
EnableS3Cache bool // Enable S3 file caching for faster repeated reads (useful for CTEs/subqueries)
S3CacheSize int64 // Cache size in bytes (parsed from "128MB", "256MB", etc.)
S3CacheTTLSeconds int // Cache entry TTL in seconds (default: 3600 = 1 hour)
}

// LicenseConfig holds configuration for enterprise license validation
type LicenseConfig struct {
Enabled bool // Enable license validation (default: false)
@@ -263,6 +271,12 @@ func Load() (*Config, error) {
return nil, fmt.Errorf("invalid server.max_payload_size: %w", err)
}

// Parse S3 cache size
s3CacheSize, err := ParseSize(v.GetString("query.s3_cache_size"))
if err != nil {
return nil, fmt.Errorf("invalid query.s3_cache_size: %w", err)
}

// Build config from Viper (which includes defaults + env vars)
cfg := &Config{
Server: ServerConfig{
@@ -375,6 +389,11 @@
MQTT: MQTTConfig{
Enabled: v.GetBool("mqtt.enabled"),
},
Query: QueryConfig{
EnableS3Cache: v.GetBool("query.enable_s3_cache"),
S3CacheSize: s3CacheSize,
S3CacheTTLSeconds: v.GetInt("query.s3_cache_ttl_seconds"),
},
License: LicenseConfig{
Enabled: v.GetBool("license.enabled"),
Key: v.GetString("license.key"),
@@ -525,6 +544,11 @@ func setDefaults(v *viper.Viper) {
// MQTT defaults (subscriptions are configured via REST API, stored in SQLite)
v.SetDefault("mqtt.enabled", false) // Feature toggle only - disabled by default

// Query defaults
v.SetDefault("query.enable_s3_cache", false) // Disabled by default (opt-in feature)
v.SetDefault("query.s3_cache_size", "128MB") // 128MB cache (256 blocks × 512KB)
v.SetDefault("query.s3_cache_ttl_seconds", 3600) // 1 hour

// License defaults (Enterprise features)
// Note: Server URL and validation interval are hardcoded in internal/license/client.go
v.SetDefault("license.enabled", false) // Disabled by default
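`Load()` relies on `ParseSize` to turn the human-readable `"128MB"` into bytes before it reaches `QueryConfig.S3CacheSize`. The helper's implementation is not part of this diff; a minimal sketch of the behavior the call site and tests assume (case-insensitive KB/MB/GB suffixes, 1024-based multiples, so `"128MB"` == 128 × 1024 × 1024) might look like:

```go
package config

import (
	"fmt"
	"strconv"
	"strings"
)

// parseSize sketches the behavior the call sites assume of ParseSize:
// "128MB" -> 134217728. KB/MB/GB are binary multiples (1024-based),
// matching the tests' expectation that "128MB" == 128 * 1024 * 1024.
func parseSize(s string) (int64, error) {
	units := []struct {
		suffix string
		factor int64
	}{
		{"GB", 1 << 30},
		{"MB", 1 << 20},
		{"KB", 1 << 10},
	}
	s = strings.ToUpper(strings.TrimSpace(s))
	for _, u := range units {
		if strings.HasSuffix(s, u.suffix) {
			n, err := strconv.ParseInt(strings.TrimSpace(strings.TrimSuffix(s, u.suffix)), 10, 64)
			if err != nil {
				return 0, fmt.Errorf("invalid size %q: %w", s, err)
			}
			return n * u.factor, nil
		}
	}
	return 0, fmt.Errorf("invalid size %q: expected a KB, MB, or GB suffix", s)
}
```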
68 changes: 68 additions & 0 deletions internal/config/config_test.go
@@ -657,3 +657,71 @@ func TestLoad_ClusterSeedsEnvOverride(t *testing.T) {
}
}
}

// QueryConfig Tests

func TestQueryConfig_Defaults(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "arc-config-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)

oldWd, _ := os.Getwd()
os.Chdir(tmpDir)
defer os.Chdir(oldWd)

cfg, err := Load()
if err != nil {
t.Fatalf("Load() error = %v", err)
}

// Test defaults
if cfg.Query.EnableS3Cache != false {
t.Errorf("Query.EnableS3Cache default = %v, want false", cfg.Query.EnableS3Cache)
}
expectedSize := int64(128 * 1024 * 1024) // 128MB in bytes
if cfg.Query.S3CacheSize != expectedSize {
t.Errorf("Query.S3CacheSize default = %d, want %d (128MB)", cfg.Query.S3CacheSize, expectedSize)
}
if cfg.Query.S3CacheTTLSeconds != 3600 {
t.Errorf("Query.S3CacheTTLSeconds default = %d, want 3600", cfg.Query.S3CacheTTLSeconds)
}
}

func TestQueryConfig_EnvOverride(t *testing.T) {
tmpDir, err := os.MkdirTemp("", "arc-config-test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)

oldWd, _ := os.Getwd()
os.Chdir(tmpDir)
defer os.Chdir(oldWd)

os.Setenv("ARC_QUERY_ENABLE_S3_CACHE", "true")
os.Setenv("ARC_QUERY_S3_CACHE_SIZE", "256MB")
os.Setenv("ARC_QUERY_S3_CACHE_TTL_SECONDS", "7200")
defer func() {
os.Unsetenv("ARC_QUERY_ENABLE_S3_CACHE")
os.Unsetenv("ARC_QUERY_S3_CACHE_SIZE")
os.Unsetenv("ARC_QUERY_S3_CACHE_TTL_SECONDS")
}()

cfg, err := Load()
if err != nil {
t.Fatalf("Load() error = %v", err)
}

if cfg.Query.EnableS3Cache != true {
t.Errorf("Query.EnableS3Cache = %v, want true", cfg.Query.EnableS3Cache)
}
expectedSize := int64(256 * 1024 * 1024) // 256MB in bytes
if cfg.Query.S3CacheSize != expectedSize {
t.Errorf("Query.S3CacheSize = %d, want %d (256MB)", cfg.Query.S3CacheSize, expectedSize)
}
if cfg.Query.S3CacheTTLSeconds != 7200 {
t.Errorf("Query.S3CacheTTLSeconds = %d, want 7200", cfg.Query.S3CacheTTLSeconds)
}
}
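These tests set `ARC_QUERY_*` variables without any explicit binding calls, which implies the prefix and key mapping are wired globally inside `Load()`. The binding code is not shown in this diff, but a sketch of the standard Viper wiring that would produce this mapping (`query.s3_cache_size` -> `ARC_QUERY_S3_CACHE_SIZE`) is:

```go
package config

import (
	"strings"

	"github.com/spf13/viper"
)

// newViper sketches the env binding the tests above rely on: the ARC_
// prefix plus a dot-to-underscore replacer maps "query.s3_cache_size"
// to ARC_QUERY_S3_CACHE_SIZE.
func newViper() *viper.Viper {
	v := viper.New()
	v.SetEnvPrefix("ARC")                              // env vars start with ARC_
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // config dots become underscores
	v.AutomaticEnv()                                   // consult matching env vars on every Get*
	return v
}
```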
39 changes: 37 additions & 2 deletions internal/database/duckdb.go
@@ -49,6 +49,10 @@ type Config struct {
AzureAccountName string
AzureAccountKey string
AzureEndpoint string // Custom endpoint (optional)
// Query optimization configuration
EnableS3Cache bool // Enable S3 file caching via cache_httpfs extension
S3CacheSize int64 // Cache size in bytes
S3CacheTTLSeconds int // Cache entry TTL in seconds (default: 3600)
}

// New creates a new DuckDB instance
@@ -88,6 +92,7 @@ func New(cfg *Config, logger zerolog.Logger) (*DuckDB, error) {
Bool("wal_enabled", cfg.EnableWAL).
Bool("s3_enabled", s3Enabled).
Str("s3_region", cfg.S3Region).
Bool("s3_cache_enabled", cfg.EnableS3Cache).
Bool("azure_enabled", azureEnabled).
Str("azure_account", cfg.AzureAccountName).
Msg("DuckDB initialized")
@@ -135,7 +140,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error {

// Configure httpfs extension for S3 access if credentials are provided
if cfg.S3AccessKey != "" && cfg.S3SecretKey != "" {
if err := configureS3Access(db, cfg); err != nil {
if err := configureS3Access(db, cfg, logger); err != nil {
return fmt.Errorf("failed to configure S3 access: %w", err)
}
}
@@ -152,7 +157,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error {

// configureS3Access sets up the httpfs extension for S3 access
// Note: We use SET GLOBAL to ensure settings persist across all connections in the pool
func configureS3Access(db *sql.DB, cfg *Config) error {
func configureS3Access(db *sql.DB, cfg *Config, logger zerolog.Logger) error {
// Install and load the httpfs extension
if _, err := db.Exec("INSTALL httpfs"); err != nil {
return fmt.Errorf("failed to install httpfs: %w", err)
@@ -201,6 +206,36 @@ func configureS3Access(db *sql.DB, cfg *Config) error {
return fmt.Errorf("failed to set s3_use_ssl: %w", err)
}

// Configure cache_httpfs extension for S3 file caching if enabled
if cfg.EnableS3Cache {
logger.Info().Msg("Enabling S3 file caching via cache_httpfs extension")
if _, err := db.Exec("INSTALL cache_httpfs FROM community"); err != nil {
logger.Warn().Err(err).Msg("Failed to install cache_httpfs extension, continuing without cache")
} else if _, err := db.Exec("LOAD cache_httpfs"); err != nil {
logger.Warn().Err(err).Msg("Failed to load cache_httpfs extension, continuing without cache")
} else {
db.Exec("SET cache_httpfs_type='in_memory'")
// Calculate max blocks from cache size (each block is 512KB)
if cfg.S3CacheSize > 0 {
maxBlocks := cfg.S3CacheSize / (512 * 1024) // 512KB per block
if maxBlocks > 0 {
db.Exec(fmt.Sprintf("SET cache_httpfs_max_in_mem_cache_block_count=%d", maxBlocks))
} else {
logger.Warn().
Int64("configured_bytes", cfg.S3CacheSize).
Msg("S3 cache size too small (minimum 512KB), increase s3_cache_size for caching to take effect")
}
}
if cfg.S3CacheTTLSeconds > 0 {
db.Exec(fmt.Sprintf("SET cache_httpfs_in_mem_cache_block_timeout_millisec=%d", cfg.S3CacheTTLSeconds*1000))
}
logger.Info().
Int64("cache_size_bytes", cfg.S3CacheSize).
Int("ttl_seconds", cfg.S3CacheTTLSeconds).
Msg("cache_httpfs extension loaded with in_memory mode")
}
}

return nil
}

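Because the `SET` calls above deliberately ignore errors, the cache can end up silently inactive. A hedged sketch (not part of this PR) of how a caller alongside `configureS3Access` could confirm the settings took effect, assuming DuckDB's standard `current_setting()` accessor also covers the extension's settings:

```go
// verifyS3Cache logs the effective cache_httpfs mode so operators can
// confirm the extension loaded; the setting name mirrors the SET call
// in configureS3Access. Uses the same *sql.DB and zerolog imports as
// the rest of this file.
func verifyS3Cache(db *sql.DB, logger zerolog.Logger) {
	var cacheType string
	err := db.QueryRow("SELECT current_setting('cache_httpfs_type')").Scan(&cacheType)
	if err != nil {
		logger.Warn().Err(err).Msg("cache_httpfs settings not readable; cache likely inactive")
		return
	}
	logger.Info().Str("cache_httpfs_type", cacheType).Msg("S3 cache active")
}
```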