
feat: a manager for SQLAlchemy engines #34826

Open
betodealmeida wants to merge 14 commits into master from engine-manager

Conversation

@betodealmeida
Member

@betodealmeida betodealmeida commented Aug 22, 2025

SUMMARY

This PR implements SIP-26: Connection Pooling for Analytics Database Connections (#8574).

Problem: Superset previously created and discarded database connections for each query without pooling, causing:

  • Latency issues from small delays accumulating across operations
  • Unbounded connections that could overwhelm databases under peak load
  • No predictability for concurrent database connections

Solution: A new EngineManager class that provides centralized SQLAlchemy engine management with two modes:

  1. NEW mode (default): preserves backward compatibility by creating a new engine for every connection using NullPool (the current behavior)
  2. SINGLETON mode: Enables connection pooling by reusing engines with configurable pool settings

Key Features:

  • Centralized engine creation: All SQLAlchemy engines are now created through EngineManager, consolidating logic previously scattered in Database.get_sqla_engine()
  • Configurable connection pools: Supports QueuePool, SingletonThreadPool, StaticPool, NullPool, and AssertionPool via database extra configuration
  • Thread-safe caching: Uses double-checked locking pattern with secure hash-based keys for engine caching
  • SSH tunnel management: Integrates SSH tunnel creation and lifecycle management (replaces SSHManager)
  • Background cleanup: Periodic cleanup thread to remove abandoned locks and prevent memory leaks
  • Flask extension pattern: EngineManagerExtension handles initialization and shutdown
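The double-checked locking mentioned above can be sketched as follows. This is a minimal illustration of the pattern, not the actual EngineManager implementation; all names here are hypothetical:

```python
import threading
from typing import Any, Callable


class SingletonCache:
    """Illustrative double-checked locking cache, similar in spirit to
    how EngineManager might cache engines per key (names are made up)."""

    def __init__(self) -> None:
        self._items: dict[str, Any] = {}
        self._lock = threading.Lock()

    def get_or_create(self, key: str, factory: Callable[[], Any]) -> Any:
        # First check without the lock: the common, contention-free path.
        item = self._items.get(key)
        if item is None:
            with self._lock:
                # Second check under the lock: another thread may have
                # created the item while we were waiting for the lock.
                item = self._items.get(key)
                if item is None:
                    item = factory()
                    self._items[key] = item
        return item
```

The first unlocked read keeps the hot path cheap; the second read under the lock prevents two racing threads from each creating (and leaking) an engine for the same key.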

Configuration Options (in config.py):

  • ENGINE_MANAGER_MODE: EngineModes.NEW (default) or EngineModes.SINGLETON
  • ENGINE_MANAGER_CLEANUP_INTERVAL: Interval for cleanup thread (default: 5 minutes)
  • ENGINE_MANAGER_AUTO_START_CLEANUP: Auto-start cleanup in SINGLETON mode (default: True)

Code Changes:

  • New superset/engines/manager.py with EngineManager class (~680 lines)
  • New superset/extensions/engine_manager.py Flask extension
  • Simplified Database.get_sqla_engine() to delegate to EngineManager
  • Removed superset/extensions/ssh.py (functionality merged into EngineManager)
  • Added type aliases for DBConnectionMutator and EngineContextManager
  • Comprehensive unit tests for LockManager and EngineManager

BEFORE/AFTER SCREENSHOTS OR ANIMATED GIF

N/A - Backend infrastructure change with no UI impact.

TESTING INSTRUCTIONS

  1. Verify backward compatibility (NEW mode - default):

    # Default config uses EngineModes.NEW
    # Verify all database connections work as before
    # Check SQL Lab queries, chart rendering, metadata exploration
  2. Test SINGLETON mode with connection pooling:

    # In superset_config.py:
    from superset.engines.manager import EngineModes
    ENGINE_MANAGER_MODE = EngineModes.SINGLETON
    
    # In database extra JSON:
    {"poolclass": "queue", "engine_params": {"pool_size": 5, "max_overflow": 10}}
  3. Test SSH tunnel connections:

    • Configure a database with SSH tunnel
    • Verify tunnel is created and reused appropriately
    • Check tunnel recreation when inactive
  4. Run unit tests:

    pytest tests/unit_tests/engines/manager_test.py -v

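The `"poolclass"` string in the database extra JSON presumably maps onto the SQLAlchemy pool classes listed in the feature summary. A hypothetical sketch of such a mapping (the real lookup lives in superset/engines/manager.py and its keys may differ):

```python
from sqlalchemy.pool import (
    AssertionPool,
    NullPool,
    QueuePool,
    SingletonThreadPool,
    StaticPool,
)

# Hypothetical name-to-class mapping; actual key names may differ.
POOL_CLASSES: dict[str, type] = {
    "queue": QueuePool,
    "singleton_thread": SingletonThreadPool,
    "static": StaticPool,
    "null": NullPool,
    "assertion": AssertionPool,
}


def resolve_poolclass(extra: dict) -> type:
    """Pick the pool class from a database's extra, defaulting to NullPool."""
    return POOL_CLASSES.get(extra.get("poolclass", "null"), NullPool)
```

Defaulting to NullPool matches the backward-compatible NEW mode, where no connections are held open between queries.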
ADDITIONAL INFORMATION

  • Has associated issue: Fixes [SIP-26] Proposal for Implementing Connection Pooling for Analytics Database Connections #8574
  • Required feature flags:
  • Changes UI
  • Includes DB Migration (follow approval process in SIP-59)
    • Migration is atomic, supports rollback & is backwards-compatible
    • Confirm DB migration upgrade and downgrade tested
    • Runtime estimates and downtime expectations provided
  • Introduces new feature or API
  • Removes existing feature or API (removes SSHManager in favor of integrated EngineManager)

@korbit-ai

korbit-ai bot commented Aug 22, 2025

Based on your review schedule, I'll hold off on reviewing this PR until it's marked as ready for review. If you'd like me to take a look now, comment /korbit-review.

Your admin can change your review schedule in the Korbit Console

@dpgaspar dpgaspar self-requested a review December 19, 2025 08:57
@netlify

netlify bot commented Jan 6, 2026

Deploy Preview for superset-docs-preview canceled.

Name Link
🔨 Latest commit 259eeb2
🔍 Latest deploy log https://app.netlify.com/projects/superset-docs-preview/deploys/695d49f6b8751f00089218a9

@betodealmeida betodealmeida marked this pull request as ready for review February 4, 2026 15:03
@dosubot dosubot bot added change:backend Requires changing the backend data:databases Related to database configurations and connections risk:breaking-change Issues or PRs that will introduce breaking changes labels Feb 4, 2026
- Use sshtunnel.open_tunnel() instead of SSHTunnelForwarder directly
  to properly handle debug_level parameter
- Fix keepalive parameter name (set_keepalive, not keepalive)
- Fix test assertions that were inside pytest.raises blocks and never
  executed - now check error_type instead of string messages
- Update SSH tunnel test mocks to patch open_tunnel

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
import pytest

from superset.engines.manager import _LockManager, EngineManager, EngineModes

Contributor

Suggestion: The tests call EngineManager._get_engine and get_engine without a Flask request or application context, but those methods internally use get_query_source_from_request and get_user_id, which rely on Flask's request and g and will raise "working outside of request context"; patching these functions in the EngineManager module for tests avoids spurious failures unrelated to engine logic. [logic error]

Severity Level: Major ⚠️
- ❌ Unit tests fail with Flask context errors.
- ⚠️ CI pipeline test suite blocked by failures.
- ⚠️ Developer workflow slowed by spurious test noise.
Suggested change

    @pytest.fixture(autouse=True)
    def _mock_engine_manager_dependencies(monkeypatch):
        """
        Avoid Flask request/g dependencies when calling EngineManager in unit tests.
        """
        import superset.engines.manager as manager

        monkeypatch.setattr(manager, "get_query_source_from_request", lambda: None)
        monkeypatch.setattr(manager, "get_user_id", lambda: None)
Steps of Reproduction ✅
1. Run the unit tests for the engine manager: pytest
tests/unit_tests/engines/manager_test.py. The test module imports EngineManager (file:
tests/unit_tests/engines/manager_test.py) and executes tests such as
test_get_engine_new_mode and test_engine_oauth2_error_handling.

2. In test_get_engine_new_mode (defined in the same file, added around lines 128-149 in
the PR), the test calls engine_manager._get_engine(mock_database, "catalog1", "schema1",
None). That code path inside EngineManager uses helper functions tied to Flask request
context (e.g., get_query_source_from_request / get_user_id) which access flask.request or
flask.g.

3. Because pytest runs these tests outside of any Flask request/app context, calling those
helpers triggers a "working outside of request context" runtime error coming from Flask
internals when EngineManager calls get_query_source_from_request/get_user_id. The failure
surface is seen during test collection/execution of the test functions in this file.

4. The suggested change (add an autouse fixture patching get_query_source_from_request and
get_user_id) prevents the Flask-dependent helpers from executing during tests so the
EngineManager logic is exercised without needing a Flask request context. This reproduces
locally by removing the fixture and re-running pytest to observe the Flask "working
outside of request context" error.
Prompt for AI Agent 🤖
This is a comment left during a code review.

**Path:** tests/unit_tests/engines/manager_test.py
**Line:** 27:27
**Comment:**
	*Logic Error: The tests call `EngineManager._get_engine` and `get_engine` without a Flask request or application context, but those methods internally use `get_query_source_from_request` and `get_user_id`, which rely on Flask's `request` and `g` and will raise "working outside of request context"; patching these functions in the EngineManager module for tests avoids spurious failures unrelated to engine logic.

Validate the correctness of the flagged issue. If correct, How can I resolve this? If you propose a fix, implement it and please make it concise.

Contributor

@bito-code-review bot left a comment
Code Review Agent Run #0c218e

Actionable Suggestions - 5
  • superset/extensions/engine_manager.py - 1
    • Incorrect teardown handler registration · Line 74-74
  • tests/unit_tests/engines/manager_test.py - 1
  • superset/engines/manager.py - 3
Additional Suggestions - 5
  • tests/unit_tests/engines/manager_test.py - 5
    • Incorrect Mock Setup · Line 165-165
      The mock for make_url_safe is incorrectly set to return the real_engine, but make_url_safe returns a URL object. This could cause the test to pass incorrectly or mask issues.
      Code suggestion
       @@ -162,6 +162,7 @@
               from sqlalchemy import create_engine
               from sqlalchemy.pool import StaticPool
      +        from sqlalchemy.engine.url import URL

               real_engine = create_engine("sqlite:///:memory:", poolclass=StaticPool)
               mock_create_engine.return_value = real_engine
      -        mock_make_url.return_value = real_engine
      +        mock_make_url.return_value = URL("sqlite:///:memory:")
    • Missing Test Implementation · Line 176-176
      The test comments about verifying different parameters create new engines but lacks the implementation.
      Code suggestion
       @@ -171,1 +171,5 @@
      -        # Call with different params - should create new engine
      +        # Call with different params - should create new engine
      +        result3 = engine_manager._get_engine(mock_database, "catalog2", "schema1", None)
      +        assert result3 is not result1  # Different engine
      +        assert mock_create_engine.call_count == 2  # Called twice now
    • Incorrect Mock Setup · Line 189-189
      The mock for make_url_safe should return a URL object instead of the engine; update mock_make_url.return_value to a URL instance.
    • Incomplete Test Assertions · Line 332-335
      The test does not verify the actual behavior of _get_engine_args, such as correct poolclass for NEW mode.
      Code suggestion
       @@ -334,2 +334,5 @@
      -        assert str(uri) == "trino://"
      -        assert "connect_args" in database.get_extra.return_value
      +        assert str(uri) == "trino://"
      +        assert "connect_args" in database.get_extra.return_value
      +        from sqlalchemy.pool import NullPool
      +        assert kwargs['poolclass'] == NullPool
      +        assert kwargs['connect_args'] == {"source": "Apache Superset"}
    • Incomplete Test Assertions · Line 374-381
      The test verifies the call but not the returned values from impersonation logic.
      Code suggestion
       @@ -381,1 +381,3 @@
      -         assert call_args[0][2] is None  # access_token (no OAuth2)
      +         assert call_args[0][2] is None  # access_token (no OAuth2)
      +         assert uri is mock_uri
      +         assert kwargs == {"connect_args": {"user": "alice", "source": "Apache Superset"}}
Review Details
  • Files reviewed - 15 · Commit Range: 5753dfb..c00fae5
    • superset/config.py
    • superset/engines/manager.py
    • superset/extensions/__init__.py
    • superset/extensions/engine_manager.py
    • superset/extensions/ssh.py
    • superset/initialization/__init__.py
    • superset/models/core.py
    • superset/superset_typing.py
    • tests/integration_tests/conftest.py
    • tests/integration_tests/databases/commands_tests.py
    • tests/integration_tests/model_tests.py
    • tests/unit_tests/engines/manager_test.py
    • tests/unit_tests/initialization_test.py
    • tests/unit_tests/models/core_test.py
    • tests/unit_tests/sql/execution/conftest.py
  • Files skipped - 1
    • UPDATING.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful

Bito Usage Guide

Commands

Type the following command in the pull request comment and save the comment.

  • /review - Manually triggers a full AI review.

  • /pause - Pauses automatic reviews on this pull request.

  • /resume - Resumes automatic reviews.

  • /resolve - Marks all Bito-posted review comments as resolved.

  • /abort - Cancels all in-progress reviews.

Refer to the documentation for additional commands.

Configuration

This repository uses Superset. You can customize the agent settings here or contact your Bito workspace admin at evan@preset.io.

Documentation & Help

AI Code Review powered by Bito

mock_database, "catalog1", "schema1", None
)
results.append(engine)
except Exception as e:
Contributor

Blind exception catch too broad

Replace broad Exception catch with specific exception types. Multiple similar issues exist (line 208). Catch specific exceptions like RuntimeError or AssertionError instead.

Code suggestion
Check the AI-generated fix before applying
Suggested change
except Exception as e:
except (RuntimeError, AssertionError) as e:

Code Review Run #0c218e


@bito-code-review
Contributor

bito-code-review bot commented Feb 4, 2026

Code Review Agent Run #741e2d

Actionable Suggestions - 0
Review Details
  • Files reviewed - 15 · Commit Range: c00fae5..3bb4b5f
    • superset/config.py
    • superset/engines/manager.py
    • superset/extensions/__init__.py
    • superset/extensions/engine_manager.py
    • superset/extensions/ssh.py
    • superset/initialization/__init__.py
    • superset/models/core.py
    • superset/superset_typing.py
    • tests/integration_tests/conftest.py
    • tests/integration_tests/databases/commands_tests.py
    • tests/integration_tests/model_tests.py
    • tests/unit_tests/engines/manager_test.py
    • tests/unit_tests/initialization_test.py
    • tests/unit_tests/models/core_test.py
    • tests/unit_tests/sql/execution/conftest.py
  • Files skipped - 1
    • UPDATING.md - Reason: Filter setting
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful
    • MyPy (Static Code Analysis) - ✔︎ Successful
    • Astral Ruff (Static Code Analysis) - ✔︎ Successful


@villebro
Member

villebro commented Feb 4, 2026

@betodealmeida I believe the original SIP has been abandoned, and before it was closed I left a comment about the design not considering typical distributed worker deployments, for which the singleton doesn't really work: #8574 (comment) With this design, connections will grow linearly with the number of pods, which makes multi-pod deployments more difficult to manage. So I think we need to revisit the architecture to come up with a design where we can coordinate connection pooling across the whole worker fleet.

@michael-s-molina
Member

So I think we need to revisit the architecture to come up with a design where we can coordinate connection pooling across the whole worker fleet.

Agree with @villebro. To illustrate the problem in more detail:

1. Connection Pool Multiplication

In SINGLETON mode, each pod/Celery worker has its own EngineManager instance:

Pod 1: EngineManager → Pool(size=5) → 5 connections
Pod 2: EngineManager → Pool(size=5) → 5 connections
Pod 3: EngineManager → Pool(size=5) → 5 connections
...

With 10 pods and pool size 5, you could hit 50 connections to the database, not 5. This can easily exhaust
database connection limits, especially for managed databases with strict limits.
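The arithmetic behind this multiplication is simple enough to write down explicitly (illustrative only, not Superset code):

```python
def total_connections(pods: int, pool_size: int, max_overflow: int = 0) -> int:
    """Worst-case database connections when each pod holds its own
    independent pool (illustrative back-of-the-envelope helper)."""
    return pods * (pool_size + max_overflow)


# 10 pods with pool_size=5 can open 50 connections, not 5.
assert total_connections(pods=10, pool_size=5) == 50

# With max_overflow=10 per pod, the ceiling rises to 150.
assert total_connections(pods=10, pool_size=5, max_overflow=10) == 150
```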

2. SSH Tunnel Multiplication

Same problem with SSH tunnels:

# Each pod creates its own tunnel cache   
self._tunnels: dict[TunnelKey, SSHTunnelForwarder] = {}   

If you have 20 Celery workers across multiple pods, you could have 20 separate SSH tunnels to the same bastion
host. Many SSH servers limit concurrent connections per user (often 10 by default).

3. Celery Prefork Workers Make It Worse

Celery's default prefork model spawns multiple worker processes:

Pod 1:
└── Celery Worker (prefork, concurrency=4)
    ├── Process 1 → EngineManager → Pool
    ├── Process 2 → EngineManager → Pool
    ├── Process 3 → EngineManager → Pool
    └── Process 4 → EngineManager → Pool

Each forked process gets its own memory space, so connection pools aren't shared even within a single pod.

4. No Cluster-Wide Coordination

The cache keys are deterministic but there's no coordination:

# manager.py:258
engine_key = self._get_engine_key(database, catalog, schema, source, user_id)

Each process independently decides to create connections without knowing cluster-wide state.

What's Missing for True Connection Pooling

For distributed connection pooling, you'd typically need:

  1. External connection pooler like PgBouncer, ProxySQL, or cloud-native poolers (RDS Proxy, Cloud SQL Proxy)
  2. Centralized state via Redis or similar for tunnel coordination
  3. Connection limits per-pod that account for cluster size
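Point 3 above, per-pod limits that account for cluster size, could be approached with back-of-the-envelope sizing like this (a hypothetical helper, not part of this PR):

```python
def per_pod_pool_size(
    db_max_connections: int,
    pods: int,
    processes_per_pod: int,
    headroom: int = 10,
) -> int:
    """Divide the database's connection limit, minus headroom reserved
    for admin sessions, across every pooled process in the cluster.
    Illustrative sizing heuristic only."""
    budget = db_max_connections - headroom
    return max(1, budget // (pods * processes_per_pod))


# A database allowing 100 connections, 10 pods x 4 prefork processes:
# (100 - 10) // 40 = 2 connections per process pool.
assert per_pod_pool_size(100, pods=10, processes_per_pod=4) == 2
```

The caveat is that any such static split must be recomputed whenever the cluster scales, which is exactly why an external pooler or centralized coordination is usually preferred.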

@betodealmeida
Member Author

@villebro @michael-s-molina I understand this is not a solution for every deployment out there, but it's still incredibly valuable for other architectures (like Preset, but also smaller Superset deployments). For distributed workers you can continue using EngineModes.NEW, or potentially implement a third engine mode that manages the pool between pods.

Regardless, IMHO this PR is a big win just for the fact that it centralizes the engine creation in a single place in a Flask extension.


Labels

change:backend Requires changing the backend data:databases Related to database configurations and connections preset-io review:draft risk:breaking-change Issues or PRs that will introduce breaking changes size/XXL


Development

Successfully merging this pull request may close these issues.

[SIP-26] Proposal for Implementing Connection Pooling for Analytics Database Connections

4 participants