Skip to content

Commit 4dbda19

Browse files
odesenfansnesitor
authored and committed
Perf: Batch systemd queries in list_executions endpoints
Replace per-VM systemd D-Bus calls with a single batch query using ListUnits(). This reduces the number of D-Bus calls from O(n) to O(1) for persistent VMs, significantly improving response times on CRNs with many instances.

- Add get_services_active_states() method to SystemDManager that queries all service states in one ListUnits() call
- Add _get_executions_running_states() helper in views to pre-fetch all running states efficiently
- Update list_executions and list_executions_v2 to use batch query
1 parent 34811d2 commit 4dbda19

File tree

2 files changed

+81
-2
lines changed

2 files changed

+81
-2
lines changed

src/aleph/vm/orchestrator/views/__init__.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,43 @@ async def about_executions(request: web.Request) -> web.Response:
188188
)
189189

190190

191+
def _get_executions_running_states(pool: VmPool) -> dict[ItemHash, bool]:
    """Return the running state of every execution in the pool.

    Executions that are persistent and have a systemd manager are resolved with a
    single ``get_services_active_states()`` call on ``pool.systemd_manager`` rather
    than one query per execution. Every other execution is considered running when
    it has a start time and no stop time.

    Args:
        pool: The VM pool whose ``executions`` mapping is inspected.

    Returns:
        Mapping of each execution's item hash to True if it is running.
    """
    # Classify each execution exactly once, so the batch query and the final
    # lookup below cannot disagree about which executions are systemd-backed.
    controller_services = {
        item_hash: execution.controller_service
        for item_hash, execution in pool.executions.items()
        if execution.persistent and execution.systemd_manager
    }

    # One batch query for all systemd-backed executions; skipped when there are none.
    service_states: dict[str, bool] = {}
    if controller_services:
        # dict.fromkeys() de-duplicates service names while keeping their order.
        service_names = list(dict.fromkeys(controller_services.values()))
        service_states = pool.systemd_manager.get_services_active_states(service_names)

    running_states: dict[ItemHash, bool] = {}
    for item_hash, execution in pool.executions.items():
        if item_hash in controller_services:
            # Systemd-backed: a service missing from the batch result counts as not running.
            running_states[item_hash] = service_states.get(controller_services[item_hash], False)
        else:
            # Not systemd-backed: fall back to the execution's own timestamps.
            running_states[item_hash] = bool(execution.times.starting_at and not execution.times.stopping_at)

    return running_states
219+
220+
191221
@cors_allow_all
192222
async def list_executions(request: web.Request) -> web.Response:
193223
pool: VmPool = request.app["vm_pool"]
224+
225+
# Get running states efficiently using batch systemd query
226+
running_states = _get_executions_running_states(pool)
227+
194228
return web.json_response(
195229
{
196230
item_hash: {
@@ -200,7 +234,7 @@ async def list_executions(request: web.Request) -> web.Response:
200234
},
201235
}
202236
for item_hash, execution in pool.executions.items()
203-
if execution.is_running
237+
if running_states.get(item_hash, False)
204238
},
205239
dumps=dumps_for_json,
206240
)
@@ -211,6 +245,9 @@ async def list_executions_v2(request: web.Request) -> web.Response:
211245
"""List all executions. Returning their status and ip"""
212246
pool: VmPool = request.app["vm_pool"]
213247

248+
# Get running states efficiently using batch systemd query
249+
running_states = _get_executions_running_states(pool)
250+
214251
return web.json_response(
215252
{
216253
item_hash: {
@@ -227,7 +264,7 @@ async def list_executions_v2(request: web.Request) -> web.Response:
227264
else {}
228265
),
229266
"status": execution.times,
230-
"running": execution.is_running,
267+
"running": running_states.get(item_hash, False),
231268
}
232269
for item_hash, execution in pool.executions.items()
233270
},

src/aleph/vm/systemd.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,48 @@ def is_service_active(self, service: str) -> bool:
130130
logger.error(error)
131131
return False
132132

133+
def get_services_active_states(self, services: list[str]) -> dict[str, bool]:
    """Get active state of multiple services in a single D-Bus call.

    This issues one ListUnits() call and looks the requested services up in its
    result, instead of querying each service individually.

    Args:
        services: List of service names to check (e.g., ["aleph-vm-controller@hash.service"])

    Returns:
        Dictionary mapping each requested service name to True if its reported
        active state is "active", and False otherwise. Services absent from the
        ListUnits() result, and all services when the D-Bus call fails, map to False.
    """
    if not services:
        return {}

    # Start with every requested service marked inactive. This one default covers
    # both units missing from ListUnits() and the D-Bus failure path below.
    active_states: dict[str, bool] = dict.fromkeys(services, False)

    # Keep the try body to the D-Bus interaction only, so unrelated errors in the
    # result processing are not masked by the DBusException handler.
    try:
        units = self._get_manager().ListUnits()
    except DBusException as error:
        logger.error(f"Failed to get services active states: {error}")
        return active_states

    # ListUnits returns: (name, description, load_state, active_state, sub_state,
    #                     following, unit_path, job_id, job_type, job_path)
    for unit in units:
        name = str(unit[0])
        if name in active_states:
            active_states[name] = str(unit[3]) == "active"

    return active_states
174+
133175
async def enable_and_start(self, service: str) -> None:
134176
if not self.is_service_enabled(service):
135177
self.enable(service)

0 commit comments

Comments
 (0)