Skip to content

Commit 1e9e164

Browse files
authored
Add S3 file caching via cache_httpfs extension (#149)
* Add cache_httpfs extension support for faster S3 reads Add optional S3 file caching via DuckDB's cache_httpfs extension. This improves query performance for CTEs/subqueries that read the same Parquet files multiple times (5-10x faster in benchmarks). Configuration (disabled by default): [query] enable_s3_cache = true Fixes #147 * Add S3 file caching via cache_httpfs extension - Add cache_httpfs extension support for faster S3 reads - Add [query] config section with enable_s3_cache, s3_cache_size, and s3_cache_ttl_seconds options - Add logging for cache setup (info on success, warn on failure) - Add config tests for defaults and environment variable overrides
1 parent 65e1d9c commit 1e9e164

File tree

5 files changed

+164
-2
lines changed

5 files changed

+164
-2
lines changed

arc.toml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,5 +257,36 @@ enabled = false
257257
# Enable MQTT subscription manager (subscriptions configured via API)
258258
enabled = false
259259

260+
# -----------------------------------------------------------------------------
261+
# Query Configuration
262+
# -----------------------------------------------------------------------------
263+
# Query execution optimizations
264+
[query]
265+
# Enable S3 file caching via DuckDB's cache_httpfs extension
266+
# When enabled, caches S3 Parquet files in memory for faster repeated reads.
267+
# This significantly improves query performance (5-10x) for:
268+
# - CTEs (Common Table Expressions) that read the same table multiple times
269+
# - Subqueries accessing the same data
270+
# - Grafana dashboards with multiple panels querying similar time ranges
271+
#
272+
# Trade-off: Increases memory usage based on cache size settings below
273+
# Only effective when storage.backend = "s3" and an S3 access key and secret key are configured
274+
#
275+
# Environment variable: ARC_QUERY_ENABLE_S3_CACHE
276+
# Default: false
277+
enable_s3_cache = false
278+
279+
# Maximum size for in-memory S3 cache
280+
# Supports: KB, MB, GB (e.g., "128MB", "256MB", "1GB")
281+
# Environment variable: ARC_QUERY_S3_CACHE_SIZE
282+
# Default: "128MB"
283+
s3_cache_size = "128MB"
284+
285+
# Cache entry TTL (time-to-live) in seconds
286+
# Cached blocks are evicted after this duration
287+
# Environment variable: ARC_QUERY_S3_CACHE_TTL_SECONDS
288+
# Default: 3600 (1 hour)
289+
s3_cache_ttl_seconds = 3600
290+
260291
[telemetry]
261292
enabled = true

cmd/arc/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,10 @@ func main() {
166166
AzureAccountName: cfg.Storage.AzureAccountName,
167167
AzureAccountKey: cfg.Storage.AzureAccountKey,
168168
AzureEndpoint: cfg.Storage.AzureEndpoint,
169+
// Query optimization
170+
EnableS3Cache: cfg.Query.EnableS3Cache,
171+
S3CacheSize: cfg.Query.S3CacheSize,
172+
S3CacheTTLSeconds: cfg.Query.S3CacheTTLSeconds,
169173
}
170174

171175
db, err := database.New(dbConfig, logger.Get("database"))

internal/config/config.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type Config struct {
2929
License LicenseConfig
3030
Scheduler SchedulerConfig
3131
Cluster ClusterConfig
32+
Query QueryConfig
3233
}
3334

3435
type ServerConfig struct {
@@ -161,6 +162,13 @@ type MQTTConfig struct {
161162
Enabled bool // Enable MQTT subscription manager feature
162163
}
163164

165+
// QueryConfig groups the settings that control query execution optimizations.
type QueryConfig struct {
	// EnableS3Cache turns on S3 file caching for faster repeated reads
	// (useful for CTEs/subqueries).
	EnableS3Cache bool
	// S3CacheSize is the cache size in bytes (parsed from "128MB", "256MB", etc.).
	S3CacheSize int64
	// S3CacheTTLSeconds is the cache entry TTL in seconds (default: 3600 = 1 hour).
	S3CacheTTLSeconds int
}
171+
164172
// LicenseConfig holds configuration for enterprise license validation
165173
type LicenseConfig struct {
166174
Enabled bool // Enable license validation (default: false)
@@ -263,6 +271,12 @@ func Load() (*Config, error) {
263271
return nil, fmt.Errorf("invalid server.max_payload_size: %w", err)
264272
}
265273

274+
// Parse S3 cache size
275+
s3CacheSize, err := ParseSize(v.GetString("query.s3_cache_size"))
276+
if err != nil {
277+
return nil, fmt.Errorf("invalid query.s3_cache_size: %w", err)
278+
}
279+
266280
// Build config from Viper (which includes defaults + env vars)
267281
cfg := &Config{
268282
Server: ServerConfig{
@@ -375,6 +389,11 @@ func Load() (*Config, error) {
375389
MQTT: MQTTConfig{
376390
Enabled: v.GetBool("mqtt.enabled"),
377391
},
392+
Query: QueryConfig{
393+
EnableS3Cache: v.GetBool("query.enable_s3_cache"),
394+
S3CacheSize: s3CacheSize,
395+
S3CacheTTLSeconds: v.GetInt("query.s3_cache_ttl_seconds"),
396+
},
378397
License: LicenseConfig{
379398
Enabled: v.GetBool("license.enabled"),
380399
Key: v.GetString("license.key"),
@@ -525,6 +544,11 @@ func setDefaults(v *viper.Viper) {
525544
// MQTT defaults (subscriptions are configured via REST API, stored in SQLite)
526545
v.SetDefault("mqtt.enabled", false) // Feature toggle only - disabled by default
527546

547+
// Query defaults
548+
v.SetDefault("query.enable_s3_cache", false) // Disabled by default (opt-in feature)
549+
v.SetDefault("query.s3_cache_size", "128MB") // 128MB cache (256 blocks × 512KB)
550+
v.SetDefault("query.s3_cache_ttl_seconds", 3600) // 1 hour
551+
528552
// License defaults (Enterprise features)
529553
// Note: Server URL and validation interval are hardcoded in internal/license/client.go
530554
v.SetDefault("license.enabled", false) // Disabled by default

internal/config/config_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,3 +657,71 @@ func TestLoad_ClusterSeedsEnvOverride(t *testing.T) {
657657
}
658658
}
659659
}
660+
661+
// QueryConfig Tests
662+
663+
func TestQueryConfig_Defaults(t *testing.T) {
664+
tmpDir, err := os.MkdirTemp("", "arc-config-test")
665+
if err != nil {
666+
t.Fatal(err)
667+
}
668+
defer os.RemoveAll(tmpDir)
669+
670+
oldWd, _ := os.Getwd()
671+
os.Chdir(tmpDir)
672+
defer os.Chdir(oldWd)
673+
674+
cfg, err := Load()
675+
if err != nil {
676+
t.Fatalf("Load() error = %v", err)
677+
}
678+
679+
// Test defaults
680+
if cfg.Query.EnableS3Cache != false {
681+
t.Errorf("Query.EnableS3Cache default = %v, want false", cfg.Query.EnableS3Cache)
682+
}
683+
expectedSize := int64(128 * 1024 * 1024) // 128MB in bytes
684+
if cfg.Query.S3CacheSize != expectedSize {
685+
t.Errorf("Query.S3CacheSize default = %d, want %d (128MB)", cfg.Query.S3CacheSize, expectedSize)
686+
}
687+
if cfg.Query.S3CacheTTLSeconds != 3600 {
688+
t.Errorf("Query.S3CacheTTLSeconds default = %d, want 3600", cfg.Query.S3CacheTTLSeconds)
689+
}
690+
}
691+
692+
func TestQueryConfig_EnvOverride(t *testing.T) {
693+
tmpDir, err := os.MkdirTemp("", "arc-config-test")
694+
if err != nil {
695+
t.Fatal(err)
696+
}
697+
defer os.RemoveAll(tmpDir)
698+
699+
oldWd, _ := os.Getwd()
700+
os.Chdir(tmpDir)
701+
defer os.Chdir(oldWd)
702+
703+
os.Setenv("ARC_QUERY_ENABLE_S3_CACHE", "true")
704+
os.Setenv("ARC_QUERY_S3_CACHE_SIZE", "256MB")
705+
os.Setenv("ARC_QUERY_S3_CACHE_TTL_SECONDS", "7200")
706+
defer func() {
707+
os.Unsetenv("ARC_QUERY_ENABLE_S3_CACHE")
708+
os.Unsetenv("ARC_QUERY_S3_CACHE_SIZE")
709+
os.Unsetenv("ARC_QUERY_S3_CACHE_TTL_SECONDS")
710+
}()
711+
712+
cfg, err := Load()
713+
if err != nil {
714+
t.Fatalf("Load() error = %v", err)
715+
}
716+
717+
if cfg.Query.EnableS3Cache != true {
718+
t.Errorf("Query.EnableS3Cache = %v, want true", cfg.Query.EnableS3Cache)
719+
}
720+
expectedSize := int64(256 * 1024 * 1024) // 256MB in bytes
721+
if cfg.Query.S3CacheSize != expectedSize {
722+
t.Errorf("Query.S3CacheSize = %d, want %d (256MB)", cfg.Query.S3CacheSize, expectedSize)
723+
}
724+
if cfg.Query.S3CacheTTLSeconds != 7200 {
725+
t.Errorf("Query.S3CacheTTLSeconds = %d, want 7200", cfg.Query.S3CacheTTLSeconds)
726+
}
727+
}

internal/database/duckdb.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type Config struct {
4949
AzureAccountName string
5050
AzureAccountKey string
5151
AzureEndpoint string // Custom endpoint (optional)
52+
// Query optimization configuration
53+
EnableS3Cache bool // Enable S3 file caching via cache_httpfs extension
54+
S3CacheSize int64 // Cache size in bytes
55+
S3CacheTTLSeconds int // Cache entry TTL in seconds (default: 3600)
5256
}
5357

5458
// New creates a new DuckDB instance
@@ -88,6 +92,7 @@ func New(cfg *Config, logger zerolog.Logger) (*DuckDB, error) {
8892
Bool("wal_enabled", cfg.EnableWAL).
8993
Bool("s3_enabled", s3Enabled).
9094
Str("s3_region", cfg.S3Region).
95+
Bool("s3_cache_enabled", cfg.EnableS3Cache).
9196
Bool("azure_enabled", azureEnabled).
9297
Str("azure_account", cfg.AzureAccountName).
9398
Msg("DuckDB initialized")
@@ -135,7 +140,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error {
135140

136141
// Configure httpfs extension for S3 access if credentials are provided
137142
if cfg.S3AccessKey != "" && cfg.S3SecretKey != "" {
138-
if err := configureS3Access(db, cfg); err != nil {
143+
if err := configureS3Access(db, cfg, logger); err != nil {
139144
return fmt.Errorf("failed to configure S3 access: %w", err)
140145
}
141146
}
@@ -152,7 +157,7 @@ func configureDatabase(db *sql.DB, cfg *Config, logger zerolog.Logger) error {
152157

153158
// configureS3Access sets up the httpfs extension for S3 access
154159
// Note: We use SET GLOBAL to ensure settings persist across all connections in the pool
155-
func configureS3Access(db *sql.DB, cfg *Config) error {
160+
func configureS3Access(db *sql.DB, cfg *Config, logger zerolog.Logger) error {
156161
// Install and load the httpfs extension
157162
if _, err := db.Exec("INSTALL httpfs"); err != nil {
158163
return fmt.Errorf("failed to install httpfs: %w", err)
@@ -201,6 +206,36 @@ func configureS3Access(db *sql.DB, cfg *Config) error {
201206
return fmt.Errorf("failed to set s3_use_ssl: %w", err)
202207
}
203208

209+
// Configure cache_httpfs extension for S3 file caching if enabled
210+
if cfg.EnableS3Cache {
211+
logger.Info().Msg("Enabling S3 file caching via cache_httpfs extension")
212+
if _, err := db.Exec("INSTALL cache_httpfs FROM community"); err != nil {
213+
logger.Warn().Err(err).Msg("Failed to install cache_httpfs extension, continuing without cache")
214+
} else if _, err := db.Exec("LOAD cache_httpfs"); err != nil {
215+
logger.Warn().Err(err).Msg("Failed to load cache_httpfs extension, continuing without cache")
216+
} else {
217+
db.Exec("SET cache_httpfs_type='in_memory'")
218+
// Calculate max blocks from cache size (each block is 512KB)
219+
if cfg.S3CacheSize > 0 {
220+
maxBlocks := cfg.S3CacheSize / (512 * 1024) // 512KB per block
221+
if maxBlocks > 0 {
222+
db.Exec(fmt.Sprintf("SET cache_httpfs_max_in_mem_cache_block_count=%d", maxBlocks))
223+
} else {
224+
logger.Warn().
225+
Int64("configured_bytes", cfg.S3CacheSize).
226+
Msg("S3 cache size too small (minimum 512KB), increase s3_cache_size for caching to take effect")
227+
}
228+
}
229+
if cfg.S3CacheTTLSeconds > 0 {
230+
db.Exec(fmt.Sprintf("SET cache_httpfs_in_mem_cache_block_timeout_millisec=%d", cfg.S3CacheTTLSeconds*1000))
231+
}
232+
logger.Info().
233+
Int64("cache_size_bytes", cfg.S3CacheSize).
234+
Int("ttl_seconds", cfg.S3CacheTTLSeconds).
235+
Msg("cache_httpfs extension loaded with in_memory mode")
236+
}
237+
}
238+
204239
return nil
205240
}
206241

0 commit comments

Comments
 (0)