Worker Monitoring System
Updated: 2026-01-19 (Post-refactoring to controller-based architecture)
This document describes the worker discovery and monitoring system after migration to declarative resource management per ADR-010 and ADR-011.
The Lablet Cloud Manager implements an automated worker discovery and monitoring system that tracks the health, status, and performance of CML Workers running on AWS EC2 instances.
Overview
The worker monitoring system provides:
- Automated Worker Discovery: Periodic scanning of AWS regions for EC2 instances matching AMI patterns
- Metrics Collection: Continuous polling of AWS EC2, CloudWatch, and CML System APIs
- Status Synchronization: Real-time sync between EC2 instance state and worker status
- Real-time Updates: SSE event broadcasting to connected browsers
- Idle Detection: Activity tracking for cost optimization (auto-pause)
Architecture
Post-Refactoring Architecture (Current)
```mermaid
graph TB
    subgraph "worker-controller"
        WR[WorkerReconciler<br/>LeaderElectedHostedService]
        WR -->|Discovery task| AWS_SCAN
        WR -->|Observe workers| RECONCILE
        subgraph "Discovery Loop (_run_discovery_loop)"
            AWS_SCAN[Scan AWS Regions]
            AWS_SCAN --> POST_IMPORT[POST /api/internal/workers/bulk-import]
        end
        subgraph "Reconciliation Loop"
            RECONCILE["Fetch workers needing attention"]
            RECONCILE --> METRICS["Collect EC2 + CloudWatch + CML metrics"]
            METRICS --> POST_METRICS["POST /api/internal/workers/:id/metrics"]
            POST_METRICS --> IDLE["Detect idle activity"]
            IDLE --> POST_IDLE["POST /api/internal/workers/:id/detect-idle"]
        end
    end
    subgraph "control-plane-api"
        INT_API[InternalController]
        REPO[(MongoDB)]
        SSE[SSEEventRelay]
        POST_IMPORT --> INT_API
        POST_METRICS --> INT_API
        POST_IDLE --> INT_API
        INT_API --> REPO
        INT_API --> DOMAIN_EVENTS[Domain Events]
        DOMAIN_EVENTS --> SSE
    end
    subgraph "Browser"
        UI[Frontend UI]
        SSE -->|SSE Stream| UI
    end
    subgraph "External APIs"
        EC2[AWS EC2 API]
        CW[AWS CloudWatch API]
        CML[CML System API]
        AWS_SCAN --> EC2
        METRICS --> EC2
        METRICS --> CW
        METRICS --> CML
    end
```
Key Principle: API-Centric State Management (ADR-001)
Per ADR-001:
- control-plane-api is the ONLY component that writes to MongoDB
- worker-controller observes and reports; does NOT persist directly
- All state mutations flow through internal API endpoints
- SSE events are triggered by domain events in control-plane-api
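As a rough illustration of this write path (all class and method names below are hypothetical stand-ins, not taken from the codebase): the controller only observes and reports, while the API layer is the single writer that persists state and emits the domain events that drive SSE.

```python
from dataclasses import dataclass, field


@dataclass
class ControlPlaneApi:
    """Stand-in for control-plane-api: the only component that writes state."""
    store: dict = field(default_factory=dict)           # plays the role of MongoDB
    emitted_events: list = field(default_factory=list)  # plays the role of domain events

    def update_worker_status(self, worker_id: str, status: str) -> None:
        self.store[worker_id] = status  # single writer
        self.emitted_events.append(("worker.status.updated", worker_id, status))


@dataclass
class WorkerController:
    """Stand-in for worker-controller: observes, then reports via the API."""
    api: ControlPlaneApi

    def observe(self, worker_id: str, ec2_state: str) -> None:
        # The controller never touches the store directly; it reports only.
        self.api.update_worker_status(worker_id, ec2_state.upper())


api = ControlPlaneApi()
WorkerController(api).observe("w-1", "running")
print(api.store)           # {'w-1': 'RUNNING'}
print(api.emitted_events)  # [('worker.status.updated', 'w-1', 'RUNNING')]
```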
Key Components
Worker Discovery (WorkerReconciler._run_discovery_loop)
Discovery runs as an independent asyncio task inside WorkerReconciler, under leader election.
Per AD-020, the standalone WorkerDiscoveryService was removed and its logic consolidated here
to prevent redundant AWS API calls when multiple replicas are running.
```python
# worker-controller/application/hosted_services/worker_reconciler.py
class WorkerReconciler(WatchTriggeredHostedService[CMLWorkerReadModel]):
    """Reconciles worker state; also runs discovery as an independent task."""

    async def _run_discovery_loop(self) -> None:
        """Discovery loop - scans AWS for EC2 instances matching AMI patterns."""
        while self._running:
            for region in self._discovery_regions:
                ami_ids = await ec2_client.get_ami_ids_by_name(ami_pattern)
                instances = await ec2_client.list_instances_by_ami(ami_ids)
                await self._api.bulk_import_workers(
                    discovered_instances=instances,
                    aws_region=region,
                    source="worker-controller-discovery",
                )
            await asyncio.sleep(self._discovery_interval)
```
Features:
- Configurable via `WORKER_DISCOVERY_REGIONS` and `WORKER_DISCOVERY_AMI_NAME`
- Dynamic region configuration via SystemSettings (see ADR-012)
- Runs under leader election (only one replica discovers)
- Submits discoveries to control-plane-api for persistence
- Includes orphan detection / garbage collection (see ADR-014)
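The AMI name pattern behaves like a shell-style wildcard. As a sketch (assuming the pattern is applied with `fnmatch`-style matching against AMI names returned by a describe-images call; the sample data is hypothetical):

```python
from fnmatch import fnmatch

ami_name_pattern = "CML-*"

# Hypothetical describe-images results: (ami_id, name) pairs.
images = [
    ("ami-0a1", "CML-2.7.0-base"),
    ("ami-0b2", "ubuntu-22.04-server"),
    ("ami-0c3", "CML-2.8.1-gpu"),
]

# Keep only AMIs whose name matches the configured wildcard pattern.
matching_ami_ids = [ami_id for ami_id, name in images if fnmatch(name, ami_name_pattern)]
print(matching_ami_ids)  # ['ami-0a1', 'ami-0c3']
```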
WorkerReconciler (worker-controller)
Leader-elected hosted service for worker lifecycle management and metrics collection:
```python
# worker-controller/application/hosted_services/worker_reconciler.py
class WorkerReconciler(WatchTriggeredHostedService[CMLWorkerReadModel]):
    """Reconciles worker state with actual EC2/CML state."""

    async def reconcile(self, worker: CMLWorkerReadModel) -> ReconciliationResult:
        """Reconcile a single worker."""
        if worker.status == "RUNNING":
            return await self._handle_running(worker)
        # ... handle other states ...

    async def _handle_running(self, worker: CMLWorkerReadModel) -> ReconciliationResult:
        """Handle RUNNING worker - collect metrics and detect activity."""
        # 1. Collect EC2 status and CloudWatch metrics
        ec2_metrics = await self._ec2.get_instance_metrics(worker.ec2_instance_id)
        # 2. Collect CML system stats
        cml_metrics = await self._cml.get_system_stats(worker.ip_address)
        # 3. Report metrics to Control Plane API
        await self._api.report_worker_metrics(worker.id, {
            "collected_at": datetime.now(timezone.utc).isoformat(),
            "ec2": ec2_metrics,
            "cml": cml_metrics,
        })
        # 4. Detect activity and idle state
        await self._detect_activity(worker)
        return ReconciliationResult.success()
```
Features:
- Leader election via etcd (only one instance reconciles)
- etcd watch for reactive reconciliation on state changes
- Periodic polling fallback (configurable interval)
- Handles full worker lifecycle: PENDING → PROVISIONING → RUNNING → STOPPING → STOPPED → TERMINATED
- Metrics collection and activity detection integrated
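The lifecycle above can be sketched as a small transition table. This is a simplification: the restart edge (STOPPED → RUNNING) and the absence of failure states are assumptions, not confirmed behavior of the real reconciler.

```python
# Allowed forward transitions in the worker lifecycle (simplified sketch).
TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"PROVISIONING"},
    "PROVISIONING": {"RUNNING"},
    "RUNNING": {"STOPPING"},
    "STOPPING": {"STOPPED"},
    "STOPPED": {"TERMINATED", "RUNNING"},  # assumption: a stopped worker may restart
    "TERMINATED": set(),                   # terminal state
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the lifecycle allows moving from current to target."""
    return target in TRANSITIONS.get(current, set())


print(can_transition("RUNNING", "STOPPING"))    # True
print(can_transition("TERMINATED", "RUNNING"))  # False
```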
InternalController (control-plane-api)
Internal API endpoints for service-to-service communication:
| Endpoint | Method | Purpose | Called By |
|---|---|---|---|
| `/api/internal/workers` | GET | List workers | worker-controller |
| `/api/internal/workers/{id}` | GET | Get worker details | worker-controller |
| `/api/internal/workers/bulk-import` | POST | Import discovered workers | WorkerReconciler |
| `/api/internal/workers/{id}/status` | POST | Update worker status | WorkerReconciler |
| `/api/internal/workers/{id}/metrics` | POST | Update worker metrics | WorkerReconciler |
| `/api/internal/workers/{id}/detect-idle` | POST | Execute idle detection | WorkerReconciler |
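For orientation, a plausible shape for the bulk-import request body, inferred from the arguments the discovery loop passes; the field names inside `discovered_instances` are illustrative, not a confirmed schema:

```python
import json

# Illustrative bulk-import payload (field names are assumptions).
bulk_import_request = {
    "aws_region": "us-east-1",
    "source": "worker-controller-discovery",
    "discovered_instances": [
        {
            "instance_id": "i-0abc123",
            "ami_id": "ami-0a1",
            "state": "running",
            "private_ip": "10.0.1.23",
        }
    ],
}

payload = json.dumps(bulk_import_request)
print(json.loads(payload)["aws_region"])  # us-east-1
```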
SSEEventRelay (control-plane-api)
Broadcasts domain events to connected browsers:
```python
# control-plane-api/application/services/sse_event_relay.py
class SSEEventRelay:
    """Relay service for broadcasting events to SSE clients."""

    async def broadcast_event(self, event_type: str, data: dict) -> None:
        """Broadcast event to all matching clients."""
        # Serialize once; both paths send the same payload
        payload = json.dumps({"type": event_type, "data": data})
        # Publish to Redis if enabled (multi-instance support)
        if self._redis_client:
            await self._redis_client.publish(self.REDIS_CHANNEL, payload)
        else:
            await self._broadcast_local(payload)
```
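A minimal sketch of the local fan-out path (`_broadcast_local`), assuming each connected SSE client is represented by an `asyncio.Queue` drained by its response stream — an assumption about the implementation, not its actual code:

```python
import asyncio
import json


class LocalBroadcaster:
    """Fan out events to every connected client's queue (sketch)."""

    def __init__(self) -> None:
        self._queues: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        """Register a new client; its SSE stream would drain this queue."""
        q: asyncio.Queue = asyncio.Queue()
        self._queues.append(q)
        return q

    async def broadcast_local(self, event_type: str, data: dict) -> None:
        """Deliver one serialized message to every subscriber."""
        message = json.dumps({"type": event_type, "data": data})
        for q in self._queues:
            await q.put(message)


async def main() -> None:
    broadcaster = LocalBroadcaster()
    client_a, client_b = broadcaster.subscribe(), broadcaster.subscribe()
    await broadcaster.broadcast_local("worker.metrics.updated", {"worker_id": "w-1"})
    print(await client_a.get())  # both clients receive the same message
    print(await client_b.get())


asyncio.run(main())
```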
Event flow:
1. InternalController receives metrics from WorkerReconciler
2. `UpdateCMLWorkerMetricsCommand` executed via Mediator
3. `CMLWorkerTelemetryUpdatedDomainEvent` emitted
4. `CMLWorkerTelemetryUpdatedDomainEventHandler` broadcasts SSE event
5. Connected browsers receive the `worker.metrics.updated` event
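On the wire, step 5 arrives as a standard Server-Sent Events frame: an `event:` line naming the event type, a `data:` line with the body, and a blank line terminating the frame. The payload fields below are illustrative:

```python
import json


def format_sse(event_type: str, data: dict) -> str:
    """Serialize a named event as a Server-Sent Events frame."""
    return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"


frame = format_sse("worker.metrics.updated", {"worker_id": "w-1", "cpu": 42.0})
print(frame)
# event: worker.metrics.updated
# data: {"worker_id": "w-1", "cpu": 42.0}
```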
Configuration
Environment Variables
```bash
# Worker Discovery (worker-controller)
WORKER_DISCOVERY_ENABLED=true
WORKER_DISCOVERY_INTERVAL=300                  # 5 minutes
WORKER_DISCOVERY_AMI_NAME="CML-*"              # AMI name pattern
WORKER_DISCOVERY_REGIONS="us-east-1,us-west-2" # Comma-separated

# Reconciliation (worker-controller)
WORKER_CONTROLLER_RECONCILE_INTERVAL=30        # seconds
METRICS_POLL_INTERVAL=60                       # seconds
IDLE_CHECK_INTERVAL=60                         # seconds
IDLE_THRESHOLD_MINUTES=30

# AWS Credentials
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_REGION=us-east-1

# CML Worker API
CML_WORKER_API_USERNAME=admin
CML_WORKER_API_PASSWORD=xxx

# Leader Election (etcd)
ETCD_HOST=etcd
ETCD_PORT=2379
WORKER_CONTROLLER_LEADER_LEASE_TTL=15

# Control Plane API Connection
CONTROL_PLANE_API_URL=http://control-plane-api:8030
CONTROL_PLANE_API_KEY=xxx
```
Dynamic Configuration via SystemSettings
Per ADR-012, discovery regions can be configured via admin UI:
```python
@dataclass
class DiscoverySettings:
    enabled: bool = True
    regions: list[str] = field(default_factory=lambda: ["us-east-1"])
    ami_name_pattern: str = "CML-*"
    scan_interval_seconds: int = 300
```
Worker-controller fetches these settings periodically and uses them with env var fallback.
Metrics Collected
| Source | Metric | Description |
|---|---|---|
| EC2 | Instance State | running, stopped, pending, terminated |
| EC2 | System Status | AWS system health checks |
| EC2 | Instance Status | Instance health checks |
| CloudWatch | CPU Utilization | Average % over period |
| CloudWatch | Network In/Out | Bytes transferred |
| CML | CPU Percent | CML-reported CPU usage |
| CML | Memory Percent | CML-reported memory usage |
| CML | Disk Percent | CML-reported disk usage |
| CML | Uptime Seconds | Time since CML started |
Real-Time Updates
SSE events broadcast to browsers (see ADR-013):
| Event Type | Trigger | Payload |
|---|---|---|
| `worker.created` | Worker aggregate created | Worker summary |
| `worker.imported` | Bulk import discovered | Worker summary |
| `worker.status.updated` | Status change | Worker ID, old/new status |
| `worker.metrics.updated` | Metrics collected | Worker ID, metrics snapshot |
| `worker.labs.updated` | Labs synced | Worker ID, lab count |
| `worker.terminated` | Worker terminated | Worker ID |
Migration from APScheduler
Per ADR-011, all APScheduler-based jobs have been migrated:
| Previous Job | New Location | Trigger |
|---|---|---|
| `AutoImportWorkersJob` | `WorkerReconciler._run_discovery_loop()` | Leader-elected asyncio task |
| `WorkerMetricsCollectionJob` | `WorkerReconciler` | Reconciliation loop |
| `ActivityDetectionJob` | `WorkerReconciler._detect_activity()` | Reconciliation loop |
Troubleshooting
Workers Not Discovered
- Check `WORKER_DISCOVERY_ENABLED=true`
- Verify `WORKER_DISCOVERY_AMI_NAME` matches EC2 AMI names
- Check worker-controller logs for discovery errors
- Verify AWS credentials have EC2 DescribeInstances permission
Metrics Not Updating
- Verify worker-controller is leader (check etcd lease)
- Check `CONTROL_PLANE_API_URL` is reachable
- Verify internal API key is configured
- Check CloudWatch permissions for GetMetricData
SSE Events Not Received
- Verify the `/api/events/stream` connection is established
- Check Redis Pub/Sub is working (if multi-instance)
- Review domain event handler logs
- Check browser DevTools Network tab for SSE frames
References
- ADR-001: API-Centric State Management
- ADR-010: Service Unification on Neuroglia
- ADR-011: APScheduler Removal
- ADR-012: Dynamic Region Configuration
- ADR-013: SSE Protocol Improvements
- Real-Time Updates Architecture
Metrics Collection Flow
Each metrics poll performs the following steps:
- EC2 Status Check

```python
status_checks = aws_client.get_instance_status_checks(
    aws_region=worker.state.aws_region,
    instance_id=worker.state.aws_instance_id,
)
ec2_state = status_checks["instance_state"]
```

- Status Mapping

```python
new_status = _map_ec2_state_to_cml_status(ec2_state)
# running -> RUNNING
# stopped -> STOPPED
# pending -> PENDING
# etc.
```

- CloudWatch Metrics (if running)

```python
metrics = aws_client.get_instance_resources_utilization(
    aws_region=worker.state.aws_region,
    instance_id=worker.state.aws_instance_id,
    relative_start_time=FIVE_MIN_AGO,
)
worker.update_telemetry(
    cpu_utilization=metrics.avg_cpu_utilization,
    memory_utilization=metrics.avg_memory_utilization,
    last_activity_at=datetime.now(timezone.utc),
)
```

- Event Emission

```python
metrics_data = {
    "worker_id": worker_id,
    "worker_name": worker.state.name,
    "status": new_status.value,
    "metrics": {
        "cpu_utilization": cpu_util,
        "memory_utilization": memory_util,
    },
}
for observer in self._observers:
    observer(metrics_data)
```
Configuration
Application Settings
```python
# src/application/settings.py
class Settings(ApplicationSettings):
    # Worker Monitoring
    worker_monitoring_enabled: bool = True
    worker_metrics_poll_interval: int = 300  # 5 minutes
    worker_notification_webhooks: list[str] = []

    # Background Job Store
    background_job_store: dict[str, Any] = {
        "redis_host": "redis",
        "redis_port": 6379,
        "redis_db": 1
    }
```
Environment Variables
```bash
# Enable/disable monitoring
WORKER_MONITORING_ENABLED=true

# Polling interval (seconds)
WORKER_METRICS_POLL_INTERVAL=300

# Webhook URLs for notifications (comma-separated)
WORKER_NOTIFICATION_WEBHOOKS=https://hooks.slack.com/services/xxx
```
Lifecycle Integration
Application Startup
```python
# main.py
def configure_worker_monitoring(app: FastAPI) -> None:
    """Configure worker monitoring on startup"""
    # Get services from DI container
    worker_repository = app.state.services.get_required_service(CMLWorkerRepository)
    aws_client = app.state.services.get_required_service(AwsEc2Client)
    background_task_bus = app.state.services.get_required_service(BackgroundTasksBus)
    background_task_scheduler = app.state.services.get_required_service(BackgroundTaskScheduler)

    # Create notification handler
    notification_handler = WorkerNotificationHandler(
        cpu_threshold=90.0,
        memory_threshold=90.0
    )

    # Create monitoring scheduler
    scheduler = WorkerMonitoringScheduler(
        worker_repository=worker_repository,
        aws_client=aws_client,
        notification_handler=notification_handler,
        background_task_bus=background_task_bus,
        background_task_scheduler=background_task_scheduler,
        poll_interval=app_settings.worker_metrics_poll_interval
    )

    # Add lifecycle hooks
    @app.on_event("startup")
    async def start_monitoring():
        await scheduler.start_async()

    @app.on_event("shutdown")
    async def stop_monitoring():
        await scheduler.stop_async()
```
Worker Registration
When a worker is created or imported, monitoring starts automatically:
```python
# Domain event triggers monitoring
@integration_event_handler(CMLWorkerCreatedDomainEvent)
async def on_worker_created(event: CMLWorkerCreatedDomainEvent):
    await monitoring_scheduler.start_monitoring_worker_async(event.aggregate_id)
```
Worker Termination
When a worker is terminated, monitoring stops automatically:
```python
# Job checks worker status
if worker.state.status == CMLWorkerStatus.TERMINATED:
    raise Exception("Worker terminated - stopping job")

# Scheduler removes job
await monitoring_scheduler.stop_monitoring_worker_async(worker_id)
```
Monitoring Data
Metrics Collected
- EC2 Instance Status: running, stopped, pending, terminated
- CPU Utilization: Average over 5 minutes (percentage)
- Memory Utilization: Average over 5 minutes (percentage)
- Instance Health: System status checks, instance status checks
- Timestamps: Last activity, last transition time
Data Storage
Metrics are stored in the worker aggregate:
```python
# domain/entities/cml_worker.py
class CMLWorkerState:
    cpu_utilization: float | None
    memory_utilization: float | None
    active_labs_count: int
    last_activity_at: datetime | None
```
Observability
OpenTelemetry Integration
All monitoring operations are traced:
```python
with tracer.start_as_current_span("collect_worker_metrics") as span:
    span.set_attribute("worker_id", worker_id)
    span.set_attribute("aws_instance_id", instance_id)
    span.set_attribute("cpu_utilization", cpu_util)
```
Logging
Comprehensive logging at each step:
```python
logger.info(f"Collected metrics for worker {worker_id}: CPU={cpu}%, Memory={mem}%")
logger.warning(f"HIGH CPU: Worker {worker_id} - {cpu}% (threshold: 90%)")
logger.error(f"Failed to collect metrics for worker {worker_id}: {error}")
```
Threshold Alerting
CPU/Memory Thresholds
```python
handler = WorkerNotificationHandler(
    cpu_threshold=90.0,    # Alert if CPU > 90%
    memory_threshold=90.0  # Alert if memory > 90%
)
```
Alert Processing
When thresholds are exceeded:
1. Log warning message
2. Call the `_handle_threshold_violation()` method
3. Future: Send webhook notifications, create tickets, etc.
```python
def _handle_threshold_violation(
    self,
    worker_id: str,
    worker_name: str,
    metric_type: str,  # "cpu" or "memory"
    value: float,
    threshold: float
) -> None:
    logger.warning(
        f"Threshold Violation: {worker_name} - "
        f"{metric_type.upper()} at {value:.1f}% exceeds {threshold}%"
    )
    # TODO: Future integrations
    # - await self._send_webhook_notification(...)
    # - await self._trigger_pagerduty_alert(...)
```
Error Handling
Missing Workers
Jobs gracefully handle missing workers:
```python
worker = await worker_repository.get_by_id_async(worker_id)
if not worker:
    logger.warning(f"Worker {worker_id} not found - stopping job")
    raise Exception("Worker not found - terminating job")
```
AWS API Failures
Transient AWS failures don't stop monitoring:
```python
try:
    status_checks = aws_client.get_instance_status_checks(...)
except Exception as e:
    logger.error(f"Failed to get EC2 status: {e}")
    # Job continues on next interval
```
Job Deserialization
Dependencies are re-injected after restart:
```python
def configure(self, service_provider=None, **kwargs):
    if service_provider:
        self.aws_ec2_client = service_provider.get_required_service(AwsEc2Client)
        self.worker_repository = service_provider.get_required_service(CMLWorkerRepository)
```
Best Practices
- Poll Interval: Balance between freshness and API costs (default: 5 minutes)
- Threshold Configuration: Adjust based on workload characteristics
- Job Cleanup: Ensure terminated workers stop monitoring jobs
- Error Recovery: Let jobs fail cleanly for APScheduler retry logic
- Observability: Use tracing and logging for debugging
Troubleshooting
Monitoring Not Starting
Check scheduler logs:
```bash
# Look for startup messages
grep "Starting worker monitoring" logs/app.log

# Check active jobs
grep "Started monitoring worker" logs/app.log
```
Metrics Not Updating
Verify AWS connectivity:
```python
# Check EC2 API access
status = aws_client.get_instance_status_checks(...)

# Check CloudWatch metrics
metrics = aws_client.get_instance_resources_utilization(...)
```
High Resource Usage
Reduce monitoring frequency by increasing `WORKER_METRICS_POLL_INTERVAL` (e.g., from 300 to 600 seconds).
Future Enhancements
- Webhook Notifications: Send alerts to external systems
- Auto-Remediation: Automatically restart failed workers
- Historical Metrics: Store time-series data in TimescaleDB
- Distributed Alerting: PagerDuty/Opsgenie integration
- Dashboard Integration: Real-time metrics visualization