End-to-end MLOps pipeline for real-time financial sentiment analysis
An end-to-end ML system demonstrating software engineering patterns for machine learning. It ingests live financial news headlines, performs sentiment analysis with FinBERT, and monitors system performance and model drift. Built to showcase async processing, comprehensive observability, statistical drift detection, and modern Python development practices.
- Real-time sentiment analysis using FinBERT for financial texts
- Async FastAPI service with a Redis caching layer
- Redis Streams architecture for decoupled producer-consumer data pipeline
- PostgreSQL storage with SQLAlchemy 2.0 async APIs and Alembic migration management
- Statistical drift detection using KL divergence with automatic baseline creation
- End-to-end latency monitoring from news scraping to sentiment inference
- 45+ Prometheus metrics across inference service, drift detector, and data producer
- 3 auto-provisioned Grafana dashboards (16-panel inference + 9-panel drift + 10-panel producer)
- 21 alerting rules monitoring system health, model drift, and data ingestion
- Structured JSON logging with unified configuration across all services
- Health checks at every layer with graceful shutdown handling
- Singleton model loading with thread-safe async initialization (sketched after this list)
- SHA256-based cache keys with configurable TTL (also sketched after this list)
- Abstract interfaces for pluggable scrapers and sentiment models
- Consumer group pattern for at-least-once message delivery
- Comprehensive test suite (27 tests: 10 unit + 17 integration, 42% coverage)
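The model loader below is a minimal sketch of the singleton pattern mentioned above, assuming the HuggingFace `transformers` pipeline API; the function and variable names are illustrative, not the project's actual module layout.

```python
import asyncio

from transformers import pipeline

# Module-level singleton state (hypothetical names; the real loader may differ).
_model = None
_lock = asyncio.Lock()


async def get_model():
    """Return the shared FinBERT pipeline, loading it at most once.

    The lock ensures that concurrent first requests do not each trigger a
    slow model load; later calls return the cached instance directly.
    """
    global _model
    if _model is None:
        async with _lock:
            if _model is None:  # double-checked after acquiring the lock
                # Model loading is blocking, so run it in a worker thread
                # to keep the event loop responsive.
                _model = await asyncio.to_thread(
                    pipeline,
                    "sentiment-analysis",
                    model="ProsusAI/finbert",
                )
    return _model
```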
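And a sketch of the SHA256-keyed caching, assuming `redis.asyncio` and a hypothetical `CACHE_TTL_SECONDS` default; the real key prefix and TTL come from the project's configuration.

```python
import hashlib
import json

import redis.asyncio as redis

CACHE_TTL_SECONDS = 3600  # assumed default; the project reads this from config


def cache_key(text: str) -> str:
    # Hash the input so arbitrary-length headlines map to a fixed-size key.
    return "sentiment:" + hashlib.sha256(text.encode("utf-8")).hexdigest()


async def cached_predict(r: redis.Redis, text: str, predict) -> dict:
    """Return a cached prediction if present, otherwise compute and cache it."""
    key = cache_key(text)
    hit = await r.get(key)
    if hit is not None:
        result = json.loads(hit)
        result["cached"] = True
        return result
    result = await predict(text)  # e.g. run FinBERT inference
    await r.set(key, json.dumps(result), ex=CACHE_TTL_SECONDS)
    result["cached"] = False
    return result
```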
```mermaid
graph LR
    A[News APIs] --> B[Producer Service]
    B --> C[Redis Streams]
    C --> D[Inference Service]
    D --> E[PostgreSQL]
    D --> F[Redis Cache]
    E --> G[Drift Monitor]
    G --> H[Prometheus]
    D --> H
    B --> H
    H --> I[Grafana Dashboards]

    style A fill:#e1f5ff
    style B fill:#fff4e1
    style C fill:#ffe1e1
    style D fill:#e1ffe1
    style E fill:#f0e1ff
    style F fill:#ffe1e1
    style G fill:#fff4e1
    style H fill:#ffe1f0
    style I fill:#e1f0ff
```
| Service | Purpose | Ports |
|---|---|---|
| Redis | Message queue (Redis Streams) + caching layer | 6379 |
| PostgreSQL | Persistent storage for predictions and drift checks | 5432 |
| Inference Service | FastAPI + FinBERT model with stream consumer | 8000 (API), metrics on /metrics |
| Producer | News scraper publishing headlines to Redis Streams | 8002 (metrics) |
| Drift Monitor | KL divergence checks every 5 minutes with auto-baseline (sketched below the table) | 8001 (metrics) |
| Prometheus | Metrics aggregation and alerting engine | 9090 |
| Grafana | Visualization dashboards (auto-provisioned) | 3000 |
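The Drift Monitor's check boils down to a KL-divergence comparison between the stored baseline label distribution and a recent window of predictions. A minimal sketch of that comparison, assuming raw positive/negative/neutral counts and a hypothetical alert threshold (the project's actual threshold, smoothing, and windowing may differ):

```python
import numpy as np

DRIFT_THRESHOLD = 0.1  # hypothetical; the real alert threshold may differ


def kl_divergence(baseline_counts: np.ndarray, current_counts: np.ndarray, eps: float = 1e-9) -> float:
    """KL(baseline || current) over positive/negative/neutral label proportions."""
    p = baseline_counts / baseline_counts.sum()
    q = current_counts / current_counts.sum()
    # Clip with a tiny epsilon so empty classes don't produce infinities.
    p = np.clip(p, eps, None)
    q = np.clip(q, eps, None)
    return float(np.sum(p * np.log(p / q)))


# Example: label counts from the stored baseline vs. a recent window.
baseline = np.array([420, 130, 450])  # positive, negative, neutral
recent = np.array([300, 260, 440])
score = kl_divergence(baseline, recent)
print(f"KL divergence: {score:.4f} (drifted: {score > DRIFT_THRESHOLD})")
```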
- Ingestion: Producer scrapes financial news APIs (NewsAPI/Mock) every 60 seconds
- Publishing: Headlines published to Redis Streams with timestamps
- Processing: Inference service consumes stream messages via a consumer group (see the sketch after this list)
- Inference: FinBERT sentiment classification (positive/negative/neutral)
- Caching: Results cached in Redis with SHA256 hash keys
- Storage: Predictions stored in PostgreSQL with source tracking
- Monitoring: Drift monitor compares current vs baseline distributions
- Alerting: Prometheus evaluates 21 alert rules, fires notifications
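A minimal sketch of the consumer-group step above, assuming `redis.asyncio` and hypothetical stream, group, and field names; the real consumer also handles reconnects, graceful shutdown, and metrics.

```python
import asyncio

import redis.asyncio as redis

STREAM = "headlines"   # hypothetical stream name; the project's may differ
GROUP = "inference"    # hypothetical consumer-group name
CONSUMER = "inference-1"


async def consume() -> None:
    r = redis.Redis(decode_responses=True)
    # Create the consumer group once; ignore the error if it already exists.
    try:
        await r.xgroup_create(STREAM, GROUP, id="0", mkstream=True)
    except redis.ResponseError:
        pass

    while True:
        # Block for up to 5s waiting for messages not yet delivered to the group.
        entries = await r.xreadgroup(GROUP, CONSUMER, {STREAM: ">"}, count=10, block=5000)
        for _stream, messages in entries:
            for message_id, fields in messages:
                headline = fields["text"]  # assumed field name
                # ... run FinBERT inference, cache, and persist here ...
                # Ack only after successful processing; unacked messages can be
                # re-delivered, which is what gives at-least-once delivery.
                await r.xack(STREAM, GROUP, message_id)


if __name__ == "__main__":
    asyncio.run(consume())
```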
- Web: FastAPI + Uvicorn (ASGI), Pydantic v2
- ML: Transformers (HuggingFace), PyTorch
- Infrastructure: Redis (streams + cache), PostgreSQL 16, Docker Compose
- Observability: Prometheus, Grafana, structlog (JSON logging; see the sketch after this list)
- Development: pytest + pytest-asyncio, ruff, pyright, uv
- Database: SQLAlchemy 2.0 async, Alembic migrations
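A minimal sketch of the unified JSON logging setup, assuming structlog's standard processors; the project's shared configuration in `common/` may bind additional fields.

```python
import logging

import structlog


def configure_logging(level: int = logging.INFO) -> None:
    """Emit one JSON object per log line, shared by every service."""
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            structlog.processors.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer(),
        ],
        wrapper_class=structlog.make_filtering_bound_logger(level),
    )


configure_logging()
log = structlog.get_logger(service="inference")
log.info("prediction_served", sentiment="positive", confidence=0.93, cached=False)
```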
- Docker & Docker Compose v2
- (Optional) NewsAPI key for live data ingestion; works with the mock scraper by default
1. Clone and configure:

   ```bash
   git clone <repo-url>
   cd sentinel-stream
   cp .env.example .env
   # Edit .env if needed (defaults work out of the box with mock scraper)
   ```

2. Start all services:

   ```bash
   make up
   ```

3. Wait 30-60 seconds for model loading, then verify:

   ```bash
   # Health check
   curl http://localhost:8000/api/v1/health

   # Test prediction
   curl -X POST http://localhost:8000/api/v1/predict \
     -H "Content-Type: application/json" \
     -d '{"text": "Apple stock surges on strong earnings"}'

   # Expected response:
   # {
   #   "sentiment": "positive",
   #   "confidence": 0.927,
   #   "model": "ProsusAI/finbert",
   #   "cached": false
   # }
   ```

4. Access dashboards:
   - API Documentation: http://localhost:8000/docs
   - Grafana: http://localhost:3000 (login: admin/admin)
     - SentinelStream Inference Dashboard
     - Drift Monitoring Dashboard
     - Producer Monitoring Dashboard
   - Prometheus: http://localhost:9090
   - Metrics Endpoints:
     - Inference: http://localhost:8000/metrics
     - Drift Monitor: http://localhost:8001/metrics
     - Producer: http://localhost:8002/metrics
```bash
# Install dependencies (ideally with uv installed)
make install

# Code Quality
make lint       # Run ruff linter
make format     # Format code with ruff
make typecheck  # Run pyright type checker

# Testing
make test                              # Run all tests (27 tests)
pytest tests/unit/test_scrapers.py -v  # Run specific test file
pytest --cov=. --cov-report=html       # Generate coverage report (42% on critical paths)

# Database Migrations
make migrate-up                        # Apply pending migrations
make migrate-down                      # Rollback last migration
make migrate-create MSG="description"  # Create new migration
make migrate-history                   # View migration history

# Docker Operations
make up                                # Start all services
make down                              # Stop all services
make dev                               # Start in development mode (with hot reload)
docker compose logs -f [service]       # View service logs
```

```
sentinel-stream/
├── common/ # Shared utilities (logging, config)
├── data_ingestion/ # News scrapers + Redis producer
│ ├── scrapers/ # Abstract interface + implementations
│ ├── producer.py # Stream publisher
│ └── metrics.py # Producer observability
├── inference_service/ # FastAPI + FinBERT + stream consumer
│ ├── main.py # FastAPI app with consumer lifecycle
│ ├── core.py # Shared prediction logic
│ ├── consumer.py # Redis Stream consumer
│ ├── models/ # Model loader + cache
│ └── api/ # REST endpoints + schemas
├── drift_detector/ # Statistical drift monitoring
│ ├── detector.py # KL divergence calculation
│ ├── baseline.py # Baseline CRUD operations
│ ├── service.py # Continuous monitoring service
│ └── metrics.py # Prometheus exporters
├── database/ # PostgreSQL setup
│ ├── models.py # SQLAlchemy ORM models
│ └── migrations/ # Alembic migrations
├── monitoring/ # Observability configs
│ ├── prometheus/ # Metrics + alerting rules
│ └── grafana/ # Auto-provisioned dashboards
├── tests/ # Unit + integration tests
└── docker-compose.yml # Multi-service orchestration
```
- Differentiate API vs. stream data points in the Grafana dashboards
- Likewise, separate inference vs. cache-hit metrics
- Refresh drift baselines periodically (they are currently set only once at startup)
- Add more drift metrics
- Support additional inference models