Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,7 @@ analysis/*

# VSCode
*.code-workspace

# Internal
READONLY_API_REFERENCE.md
tests/manual/log.txt
26 changes: 26 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@

All notable changes to this project will be documented in this file.

## [1.0.5] - 2025-10-29

### Added - 2025-10-29
- **On-Demand MQTT Monitoring**
- New endpoint: `POST /v1/iot-service/api/user/device/realtime/start` - Start 2-minute MQTT monitoring session
- New endpoint: `GET /v1/iot-service/api/user/device/realtime?device_id={id}` - Get cached real-time MQTT data
- New endpoint: `GET /admin/mqtt` - View active MQTT session status
- Background cleanup thread automatically disconnects expired sessions
- Session-based model prevents resource exhaustion
- Only POST endpoint allowed in strict mode (doesn't modify printer state)

### Changed
- **Proxy Server (servers/proxy.py)**:
- Changed from automatic monitoring of all devices to on-demand session model
- Added `mqtt_sessions` dict to track active monitoring sessions
- Added configurable session duration (120 seconds) and cleanup interval (30 seconds)
- Updated `/health` endpoint to show active MQTT sessions count
- Modified strict mode check to allow MQTT session start POST endpoint

- **Test Suite (tests/manual/test_proxy_server.py)**:
- Updated MQTT tests to use on-demand session workflow
- Increased MQTT wait time from 3 to 5 seconds for better data retrieval
- Added detailed status output for MQTT session creation and data retrieval

---

## [1.0.4] - 2025-10-28

### Changed
Expand Down
274 changes: 269 additions & 5 deletions servers/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,17 @@
import os
import re
import copy
import time
import atexit
import threading
from flask import Flask, request, jsonify, Response
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Add parent directory to path for bambulab import
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from bambulab import BambuClient, TokenManager
from bambulab import BambuClient, TokenManager, MQTTClient
from bambulab.client import BambuAPIError

app = Flask(__name__)
Expand Down Expand Up @@ -73,8 +76,13 @@ def get_token_key():
swallow_errors=False # Show rate limit errors
)

# Global token manager
# Global token manager and MQTT cache
token_manager = None
mqtt_sessions = {} # device_id -> {'client': MQTTClient, 'data': {}, 'expires': timestamp}
mqtt_config = {
'session_duration': 120, # 2 minutes in seconds
'cleanup_interval': 30 # Check for expired sessions every 30s
}


def mask_sensitive_data(data, custom_token=None):
Expand Down Expand Up @@ -172,8 +180,16 @@ def mask_urls_and_ips(text, custom_token=None):
return text


# Global token manager
token_manager = None
def mqtt_message_handler(device_id):
"""Factory function to create message handler for specific device"""
def handler(dev_id, data):
"""Callback for MQTT messages - updates cache"""
global mqtt_sessions
if device_id in mqtt_sessions:
mqtt_sessions[device_id]['data'] = data
mqtt_sessions[device_id]['timestamp'] = time.time()
mqtt_sessions[device_id]['message_count'] = mqtt_sessions[device_id].get('message_count', 0) + 1
return handler


def init_token_manager():
Expand All @@ -183,9 +199,118 @@ def init_token_manager():
print(f"Loaded {token_manager.count()} token mappings")


def cleanup_expired_mqtt_sessions():
"""Background thread to cleanup expired MQTT sessions"""
global mqtt_sessions

while True:
try:
time.sleep(mqtt_config['cleanup_interval'])
now = time.time()
expired = []

for device_id, session in mqtt_sessions.items():
if now > session['expires']:
expired.append(device_id)

for device_id in expired:
session = mqtt_sessions[device_id]
try:
session['client'].disconnect()
print(f"[MQTT] Session expired for device {device_id}")
except:
pass
del mqtt_sessions[device_id]

except Exception as e:
print(f"[MQTT] Cleanup error: {e}")


def start_mqtt_session(device_id: str, real_token: str):
"""
Start MQTT monitoring session for a specific device.

Returns:
dict: Session info or error
"""
global mqtt_sessions

# Check if session already exists and is valid
if device_id in mqtt_sessions:
session = mqtt_sessions[device_id]
if time.time() < session['expires']:
# Extend existing session
session['expires'] = time.time() + mqtt_config['session_duration']
return {
'status': 'extended',
'device_id': device_id,
'expires_in': mqtt_config['session_duration'],
'message': 'Existing session extended'
}
else:
# Clean up expired session
try:
session['client'].disconnect()
except:
pass
del mqtt_sessions[device_id]

try:
# Get user profile for username
client = BambuClient(real_token)
profile = client.get(f"v1/user-service/my/profile")
username = profile.get('uid') or profile.get('user_id')

if not username:
return {'error': 'Failed to get user ID', 'status': 'failed'}

# Create MQTT client
mqtt_client = MQTTClient(
username=str(username),
access_token=real_token,
device_id=device_id,
on_message=mqtt_message_handler(device_id)
)

mqtt_client.connect(blocking=False)

# Wait briefly for connection
time.sleep(1)

if not mqtt_client.connected:
return {'error': 'MQTT connection failed', 'status': 'failed'}

# Request full status
mqtt_client.request_full_status()

# Create session
mqtt_sessions[device_id] = {
'client': mqtt_client,
'data': {},
'timestamp': None,
'expires': time.time() + mqtt_config['session_duration'],
'started': time.time(),
'message_count': 0
}

return {
'status': 'started',
'device_id': device_id,
'expires_in': mqtt_config['session_duration'],
'message': f'MQTT monitoring started for {mqtt_config["session_duration"]} seconds'
}

except Exception as e:
return {'error': str(e), 'status': 'failed'}


@app.before_request
def check_strict_mode():
"""Reject non-GET requests in strict mode."""
"""Reject non-GET requests in strict mode (except MQTT start endpoint)."""
# Allow POST to MQTT start endpoint even in strict mode
if request.path == '/v1/iot-service/api/user/device/realtime/start':
return None

if PROXY_MODE == "strict" and request.method != 'GET' and request.path not in ['/health', '/']:
return jsonify({
"error": "Method Not Allowed",
Expand All @@ -203,6 +328,8 @@ def health():
"mode": PROXY_MODE,
"backend": BambuClient.BASE_URL,
"tokens_configured": token_manager.count() if token_manager else 0,
"mqtt_active_sessions": len(mqtt_sessions),
"mqtt_session_duration": f"{mqtt_config['session_duration']} seconds",
"rate_limits": {
"device_queries": "30 per minute",
"user_endpoints": "15 per minute",
Expand Down Expand Up @@ -311,6 +438,100 @@ def proxy_v1(endpoint):
}), 502


@app.route('/v1/iot-service/api/user/device/realtime/start', methods=['POST'])
@limiter.limit(RATE_LIMITS["default"])
def start_realtime_monitoring():
"""Start MQTT monitoring session for a device"""
# Validate token
custom_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not custom_token:
return jsonify({"error": "Missing token"}), 401

real_token = token_manager.validate(custom_token)
if not real_token:
return jsonify({"error": "Invalid token"}), 401

# Get device_id from request body
try:
data = request.get_json()
device_id = data.get('device_id')
except:
return jsonify({"error": "Invalid JSON"}), 400

if not device_id:
return jsonify({"error": "device_id required"}), 400

# Start MQTT session
result = start_mqtt_session(device_id, real_token)

if result.get('status') in ['started', 'extended']:
return jsonify(result), 200
else:
return jsonify(result), 500


@app.route('/v1/iot-service/api/user/device/realtime', methods=['GET'])
@limiter.limit(RATE_LIMITS["default"])
def get_realtime_data():
"""Get cached real-time MQTT data for a device"""
# Validate token
custom_token = request.headers.get('Authorization', '').replace('Bearer ', '')
if not custom_token:
return jsonify({"error": "Missing token"}), 401

real_token = token_manager.validate(custom_token)
if not real_token:
return jsonify({"error": "Invalid token"}), 401

# Require device_id parameter
device_id = request.args.get('device_id')

if not device_id:
return jsonify({
"error": "device_id required",
"message": "Specify device_id parameter or start monitoring with POST to /v1/iot-service/api/user/device/realtime/start"
}), 400

# Check if session exists
if device_id not in mqtt_sessions:
return jsonify({
"error": "No active monitoring session",
"message": "Start monitoring by POST to /v1/iot-service/api/user/device/realtime/start with device_id",
"device_id": device_id
}), 404

session = mqtt_sessions[device_id]
now = time.time()

# Check if session expired
if now > session['expires']:
return jsonify({
"error": "Monitoring session expired",
"message": "Session expired. Start new session by POST to /v1/iot-service/api/user/device/realtime/start",
"device_id": device_id
}), 410 # 410 Gone

# Check if we have data yet
if not session.get('data'):
return jsonify({
"status": "waiting",
"message": "Waiting for first MQTT message",
"device_id": device_id,
"session_age": now - session['started'],
"expires_in": session['expires'] - now
}), 202 # 202 Accepted (processing)

# Return data
return jsonify({
"device_id": device_id,
"data": mask_sensitive_data(session['data'], custom_token),
"timestamp": session.get('timestamp'),
"age_seconds": now - session['timestamp'] if session.get('timestamp') else None,
"expires_in": session['expires'] - now,
"message_count": session.get('message_count', 0)
})


@app.route('/admin/tokens', methods=['GET'])
@limiter.limit(RATE_LIMITS["admin"])
def list_tokens():
Expand All @@ -328,6 +549,31 @@ def list_tokens():
})


@app.route('/admin/mqtt', methods=['GET'])
@limiter.limit(RATE_LIMITS["admin"])
def mqtt_status():
"""Get MQTT monitoring status"""
now = time.time()
sessions = {}

for device_id, session in mqtt_sessions.items():
sessions[device_id] = {
"connected": session['client'].connected,
"message_count": session.get('message_count', 0),
"last_update": session.get('timestamp'),
"has_data": bool(session.get('data')),
"expires_in": session['expires'] - now,
"session_age": now - session['started']
}

return jsonify({
"active_sessions": len(mqtt_sessions),
"session_duration": mqtt_config['session_duration'],
"cleanup_interval": mqtt_config['cleanup_interval'],
"sessions": sessions
})


@app.errorhandler(429)
def ratelimit_handler(e):
"""Handle rate limit exceeded"""
Expand Down Expand Up @@ -355,6 +601,24 @@ def main():
# Initialize token manager
init_token_manager()

# Start MQTT cleanup thread
cleanup_thread = threading.Thread(target=cleanup_expired_mqtt_sessions, daemon=True)
cleanup_thread.start()
print("MQTT session cleanup thread started")

# Register cleanup handler
def cleanup():
"""Cleanup on shutdown"""
print("\nShutting down...")
for device_id, session in mqtt_sessions.items():
try:
session['client'].disconnect()
print(f" Disconnected MQTT for {device_id}")
except:
pass

atexit.register(cleanup)

# Print banner
port = PORTS[PROXY_MODE]
print("=" * 80)
Expand Down
Loading