AI Agent Guide¶
This guide provides comprehensive instructions for AI coding agents (GitHub Copilot, Cursor, Cline, etc.) working on the CML Cloud Manager codebase.
Single Source of Truth
The content below is automatically included from .github/copilot-instructions.md - the single maintained source for AI agent instructions. Do not edit this page directly; instead, update the source file.
For Human Developers
While designed for AI agents, this guide is equally valuable for human developers new to the codebase. It covers essential architectural patterns, workflows, and conventions.
AI Agent Instructions¶
CML Cloud Manager - AI Agent Instructions¶
Quick Start for AI Agents¶
Critical First Steps:
- Always set `PYTHONPATH=src` or use Makefile commands
- Run `make build-ui` before `make run` (UI assets required)
- Commands/queries are self-contained (request + handler in same file)
- Use Neuroglia DI patterns - never instantiate services directly
Common Operations:
make test # Run tests (use this to validate changes)
make run # Start locally (needs make build-ui first)
make dev # Full Docker stack with logs
make lint && make format # Code quality checks
Architecture Overview¶
Stack: FastAPI + Neuroglia Framework (DDD/CQRS) + Bootstrap 5 SPA + Keycloak OAuth2/OIDC
Purpose: Manage AWS EC2-based Cisco Modeling Labs (CML) workers with monitoring, metrics collection, and lab orchestration
Key Pattern: Event-sourced aggregates with @dispatch handlers, CQRS via Mediator, dual authentication (cookie + JWT)
Multi-SubApp Pattern¶
The application uses Neuroglia's SubApp pattern with two mounted FastAPI apps:
- API SubApp (`/api/*`): JSON REST endpoints with JWT/cookie auth (see `src/api/`)
- UI SubApp (`/*`): Bootstrap 5 SPA with Server-Sent Events (see `src/ui/`)
Both apps are configured in src/main.py::create_app() via WebApplicationBuilder and mounted with route prefixes.
Layer Architecture (Clean Architecture/DDD)¶
domain/ # Pure entities (CMLWorker, Task, LabRecord) with AggregateRoot pattern
application/ # CQRS: commands/queries/handlers, settings, background services
integration/ # MongoDB repositories (Motor), AWS EC2/CloudWatch client
infrastructure/ # Session stores (Redis/in-memory), technical adapters
api/ # API controllers, dependencies (auth), OpenAPI config
ui/ # UI controllers, Parcel-built frontend (src/*, package.json)
Key Pattern: Commands/Queries are self-contained - each file contains both the request class and its handler (e.g., create_task_command.py has CreateTaskCommand + CreateTaskCommandHandler). No separate handlers/ directory.
Finding CQRS handlers: Don't look for a handlers/ directory - handlers are co-located with requests in application/commands/*.py and application/queries/*.py.
Example self-contained command:
# application/commands/create_task_command.py
@dataclass
class CreateTaskCommand(Command[OperationResult[TaskCreatedDto]]):
    title: str
    description: str

class CreateTaskCommandHandler(CommandHandler[CreateTaskCommand, OperationResult[TaskCreatedDto]]):
    def __init__(self, task_repository: TaskRepository):
        self._repository = task_repository

    async def handle_async(self, request: CreateTaskCommand, cancellation_token=None) -> OperationResult[TaskCreatedDto]:
        # Validate using helper methods
        if not request.title:
            return self.bad_request("Title is required")

        # Business logic
        task = Task.create(title=request.title, description=request.description)
        await self._repository.add_async(task, cancellation_token)

        # Return success using helper method
        return self.created(TaskCreatedDto(id=task.id(), title=task.state.title))
Critical Domain Concepts¶
CMLWorker Aggregate (domain/entities/cml_worker.py)¶
Represents an AWS EC2 instance running CML. Uses Neuroglia's AggregateRoot pattern with event-driven state transitions:
- Instance type: `m5zn.metal` (nested virtualization support) - expensive and slow to provision
- Idle management: Instances should be stopped after idle_timeout to reduce costs (detection mechanism TBD)
- Licensing: Valid CML license required for labs with >5 nodes
- Event sourcing: State changes via domain events (e.g., `CMLWorkerCreatedDomainEvent`, `CMLWorkerStatusUpdatedDomainEvent`)
- State reconstruction: `@dispatch` decorators handle events to update `CMLWorkerState`
- Lifecycle: created → provisioning → running → stopping → stopped → terminated
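The lifecycle above can be made explicit with a transition table that rejects illegal moves. This is a framework-free sketch; the enum values mirror the documented lifecycle, but the allowed-transition set (e.g. stopped → running for restarts) is an assumption, not the actual `CMLWorker` API:

```python
from enum import Enum

class WorkerStatus(str, Enum):
    CREATED = "created"
    PROVISIONING = "provisioning"
    RUNNING = "running"
    STOPPING = "stopping"
    STOPPED = "stopped"
    TERMINATED = "terminated"

# Allowed transitions for the worker state machine (illustrative).
TRANSITIONS = {
    WorkerStatus.CREATED: {WorkerStatus.PROVISIONING},
    WorkerStatus.PROVISIONING: {WorkerStatus.RUNNING, WorkerStatus.TERMINATED},
    WorkerStatus.RUNNING: {WorkerStatus.STOPPING, WorkerStatus.TERMINATED},
    WorkerStatus.STOPPING: {WorkerStatus.STOPPED},
    WorkerStatus.STOPPED: {WorkerStatus.RUNNING, WorkerStatus.TERMINATED},
    WorkerStatus.TERMINATED: set(),
}

def transition(current: WorkerStatus, target: WorkerStatus) -> WorkerStatus:
    """Validate and apply a status transition, raising on illegal moves."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

Encoding transitions as data keeps the guard logic in one place instead of scattering status checks across handlers.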
Lab Management & Resource Hierarchy¶
Critical relationships:
- Worker-to-Lab: One worker hosts multiple labs (user-created or imported)
- Lab lifecycle: Independent state machine - users can create/modify/start/wipe/stop/delete labs
- Resource consumption: Labs consume worker resources (CPU/memory/storage) - monitor via CML native telemetry API (preferred over CloudWatch)
- Node definitions: Workers include standard + custom node definitions, each with image definitions (features/state)
- Separation of concerns: Worker and lab lifecycles must be managed separately
Orchestration principle: Transparent - no worker-side changes required for management operations
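Since labs consume worker resources, capacity checks reduce to aggregating lab demand against worker capacity. A minimal sketch, with hypothetical names and units (the real metrics come from the CML telemetry API):

```python
from dataclasses import dataclass

@dataclass
class LabUsage:
    name: str
    cpu: float        # vCPUs consumed by the lab
    memory_gb: float  # memory consumed by the lab

@dataclass
class WorkerCapacity:
    cpu: float
    memory_gb: float

def remaining_capacity(worker: WorkerCapacity, labs: list[LabUsage]) -> WorkerCapacity:
    """Subtract aggregate lab demand from worker capacity (illustrative only)."""
    used_cpu = sum(lab.cpu for lab in labs)
    used_mem = sum(lab.memory_gb for lab in labs)
    return WorkerCapacity(cpu=worker.cpu - used_cpu, memory_gb=worker.memory_gb - used_mem)

headroom = remaining_capacity(
    WorkerCapacity(cpu=48, memory_gb=192),
    [LabUsage("lab-a", cpu=8, memory_gb=32), LabUsage("lab-b", cpu=4, memory_gb=16)],
)
```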
Worker Monitoring System¶
Background monitoring coordinated by WorkerMonitoringScheduler (see notes/WORKER_MONITORING_ARCHITECTURE.md):
- Auto-discovery: Scans for active workers on startup via `get_active_workers_async()`
- Job scheduling: Uses `BackgroundTaskScheduler` (APScheduler wrapper) to schedule `WorkerMetricsCollectionJob` per worker
- Reactive notifications: `WorkerNotificationHandler` observes metrics events and processes them
- Separation of concerns: Metrics collection (compute layer) vs. workload monitoring (CML labs)
- Telemetry sources: CML native API (preferred) for lab-level metrics; CloudWatch for EC2-level metrics
- Regular polling: Resource utilization must be monitored continuously (CPU/memory/storage at both worker and lab levels)
Startup lifecycle (src/main.py::lifespan_with_monitoring):
- Creates service scope to access repositories
- Instantiates monitoring scheduler with dependencies
- Auto-starts monitoring for active workers
- Schedules global labs refresh job (30-minute interval)
Background Jobs (see application/jobs/*.py):
- `WorkerMetricsCollectionJob`: Per-worker metrics polling (EC2 + CML + CloudWatch)
- `LabsRefreshJob`: Global lab data refresh every 30 minutes
Use @backgroundjob decorator with task_type="recurrent" for recurring tasks:
@backgroundjob(task_type="recurrent", interval=300)
class MyBackgroundJob(BackgroundJobBase):
    async def execute_async(self, context):
        # Job logic here
        ...
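The semantics of a recurrent job are essentially a loop-with-sleep. A framework-free asyncio sketch of what the scheduler does for you (the `RecurrentJob` class here is a stand-in, not the Neuroglia/APScheduler API):

```python
import asyncio

class RecurrentJob:
    """Minimal stand-in for a recurring background job (not the real base class)."""
    def __init__(self, interval: float):
        self.interval = interval
        self.runs = 0

    async def execute_async(self) -> None:
        self.runs += 1  # a real job would collect and publish metrics here

    async def run(self, max_runs: int) -> None:
        # Loop-with-sleep scheduling: the essence of task_type="recurrent".
        while self.runs < max_runs:
            await self.execute_async()
            await asyncio.sleep(self.interval)

job = RecurrentJob(interval=0.01)
asyncio.run(job.run(max_runs=3))
```

The scheduler adds what this sketch omits: persistence across restarts, error handling, and per-worker job registration.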
Authentication Architecture¶
Dual Authentication (api/services/auth.py::DualAuthService):
- Cookie-based (Primary): Backend-for-Frontend (BFF) pattern with httpOnly cookies, Keycloak OIDC flow
- Bearer Token (API): JWT tokens for programmatic access (Swagger UI, API clients)
Security Model (see notes/AUTHENTICATION_ARCHITECTURE.md):
- Tokens never exposed to browser JavaScript (httpOnly cookies prevent XSS)
- CSRF protection via SameSite cookies
- Session storage: Redis (production) or in-memory (dev) via `infrastructure/session_store.py`
- RBAC enforcement: Both UI and API must enforce role-based access control at the application layer
Auth Flow:
- Browser → `/api/auth/login` → redirect to Keycloak
- Keycloak callback → exchange code for tokens → store server-side
- Set httpOnly cookie with session ID
- API requests → validate session → retrieve tokens → call protected endpoints
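The cookie-first, bearer-second precedence can be sketched without FastAPI. Everything here is a simplified stand-in (the in-memory session dict and `decode_bearer` stub are hypothetical; real code validates JWTs against Keycloak):

```python
from typing import Optional

# Hypothetical server-side session store: session_id -> user claims.
SESSIONS = {"sess-123": {"sub": "alice", "roles": ["admin"]}}

def decode_bearer(token: str) -> Optional[dict]:
    """Stand-in for real JWT validation (signature, expiry, issuer checks)."""
    return {"sub": "api-client", "roles": ["reader"]} if token == "valid-jwt" else None

def get_current_user(session_id: Optional[str] = None,
                     bearer_token: Optional[str] = None) -> dict:
    """Resolve identity: cookie session first, then bearer token, else fail."""
    if session_id and session_id in SESSIONS:
        return SESSIONS[session_id]
    if bearer_token:
        claims = decode_bearer(bearer_token)
        if claims:
            return claims
    raise PermissionError("unauthenticated")
```

Checking the cookie session first keeps browser traffic on the BFF path; bearer tokens only matter for programmatic clients.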
Dependency Injection (api/dependencies.py):
async def get_current_user(
    session_id: Optional[str] = Cookie(None),
    credentials: Optional[HTTPAuthorizationCredentials] = Security(security_optional),
) -> dict:
    ...
Checks cookie first, then bearer token. Auth service retrieved from request.state.auth_service (injected by middleware).
Development Workflows¶
Essential Commands (Makefile)¶
make install # Install Python deps with Poetry
make install-ui # Install Node.js deps for UI
make build-ui # Build Parcel frontend → static/
make run # Run app locally (requires build-ui first)
make dev # Docker Compose: build + start services with logs
make up # Docker Compose: start services in background
make urls # Show all service URLs
make test # Run pytest suite
make test-cov # Run tests with coverage report
make lint # Run Ruff linting
make format # Format with Black
make install-hooks # Install pre-commit hooks
Local Development (without Docker):
- `make install && make install-ui` - Install dependencies
- `make build-ui` - Build frontend assets (required before running)
- `make run` - Starts Uvicorn with reload at `http://localhost:8000`
- Set `PYTHONPATH=src` when running Python commands directly
Docker Development:
- `make dev` - Full stack with live logs (app, MongoDB, Redis, Keycloak, OTEL Collector)
- App runs with debugpy on port 5678 (VS Code attach config available)
- Environment variables in `.env` override defaults in `docker-compose.yml`
Testing Patterns (pytest.ini)¶
Markers: @pytest.mark.unit, @pytest.mark.integration, @pytest.mark.asyncio, @pytest.mark.command, @pytest.mark.query
Test discovery: tests/ directory, PYTHONPATH=src configured in pytest.ini
Coverage: Run make test-cov to generate HTML report in htmlcov/
Neuroglia Framework Specifics¶
CRITICAL: Mediator & RequestHandler API Patterns¶
This is a recurring source of errors - READ CAREFULLY:
Mediator.execute_async() Signature¶
ALWAYS call with ONLY ONE ARGUMENT (the request):
# ✅ CORRECT - One argument only
result = await self.mediator.execute_async(GetWorkerQuery(worker_id="123"))
result = await self.mediator.execute_async(CreateTaskCommand(title="Task"))
# ❌ WRONG - DO NOT pass cancellation_token to mediator
result = await self.mediator.execute_async(query, cancellation_token) # TypeError!
Why this matters: Mediator.execute_async(request) takes only 1 parameter. The cancellation_token is passed to repository methods, not mediator calls.
RequestHandler Helper Methods (Commands, Queries, Events)¶
All handlers inherit from RequestHandler which provides HTTP status code helper methods. These are available in CommandHandler, QueryHandler, and EventHandler:
Available Methods (return OperationResult[T]):
- `self.ok(data)` - 200 OK with data
- `self.created(data)` - 201 Created
- `self.accepted(data)` - 202 Accepted
- `self.no_content()` - 204 No Content
- `self.bad_request(message)` - 400 Bad Request
- `self.unauthorized(message)` - 401 Unauthorized
- `self.forbidden(message)` - 403 Forbidden
- `self.not_found(title, detail)` - 404 Not Found
- `self.conflict(message)` - 409 Conflict
- `self.unprocessable_entity(message)` - 422 Unprocessable Entity
- `self.internal_server_error(message)` - 500 Internal Server Error
- `self.service_unavailable(message)` - 503 Service Unavailable
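To see why the helpers exist, here is a framework-free stand-in showing the pattern: each helper builds a result carrying a status code, data, or an error message. `Result` and `HandlerBase` are simplified illustrations, not the actual Neuroglia classes:

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Result:
    """Illustrative stand-in for OperationResult."""
    status_code: int
    data: Any = None
    error_message: Optional[str] = None

    @property
    def is_success(self) -> bool:
        return 200 <= self.status_code < 300

class HandlerBase:
    """Sketch of the helper-method pattern; the real base class lives in Neuroglia."""
    def ok(self, data=None): return Result(200, data=data)
    def created(self, data=None): return Result(201, data=data)
    def bad_request(self, msg): return Result(400, error_message=msg)
    def not_found(self, title, detail): return Result(404, error_message=f"{title}: {detail}")
    def conflict(self, msg): return Result(409, error_message=msg)
```

The helpers keep status-code logic out of handler bodies: a handler only decides the outcome, never constructs result objects by hand.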
Pattern - Command/Query Handlers:
from neuroglia.mediation import Command, CommandHandler
@dataclass
class CreateTaskCommand(Command[OperationResult[dict]]):
    title: str
    description: str

class CreateTaskCommandHandler(CommandHandler[CreateTaskCommand, OperationResult[dict]]):
    def __init__(self, task_repository: TaskRepository):
        self._repository = task_repository

    async def handle_async(self, request: CreateTaskCommand, cancellation_token=None) -> OperationResult[dict]:
        # ✅ Use helper methods, NOT OperationResult.success/fail

        # Validate input
        if not request.title:
            return self.bad_request("Title is required")

        # Check existence
        existing = await self._repository.get_by_title_async(request.title)
        if existing:
            return self.conflict("Task with this title already exists")

        # Create entity
        task = Task.create(title=request.title, description=request.description)

        # Save (repository methods DO accept cancellation_token)
        await self._repository.add_async(task, cancellation_token)

        # Return success
        return self.ok({"id": task.id(), "title": task.state.title})
Pattern - Checking Results:
# When calling commands/queries via mediator, check results:
result = await self.mediator.execute_async(GetWorkerQuery(worker_id="123"))

# ✅ CORRECT - Use .is_success (no "ful")
if result.is_success:
    data = result.data  # Access data via .data
    print(f"Success: {data}")
else:
    error = result.error_message  # Access error via .error_message
    print(f"Failed: {error}")

# ❌ WRONG - These don't exist
if result.is_successful:  # AttributeError!
    content = result.content  # AttributeError!
    errors = result.errors  # AttributeError!
OperationResult API Reference:
- `.is_success: bool` - Check if operation succeeded
- `.data: T` - Access result data (on success)
- `.error_message: str` - Access error message (on failure)
- `.status_code: int` - HTTP status code
- `.detail: str` - Additional error details
- `.type: str` - Error type
- `.instance: str` - Error instance identifier
Common Mistakes to Avoid¶
- ❌ DO NOT import or use `OperationResult` for construction:

from neuroglia.core import OperationResult  # This is ProblemDetails, not a result wrapper!
return OperationResult.success(data)  # AttributeError: no 'success' method
return OperationResult.fail(message)  # AttributeError: no 'fail' method
- ❌ DO NOT pass cancellation_token to mediator:
- ❌ DO NOT use wrong attribute names:
if result.is_successful:  # Should be is_success
    data = result.content  # Should be data
    errors = result.errors  # Should be error_message
- ✅ DO use helper methods consistently:
return self.ok({"key": "value"})
return self.not_found("Resource", "Resource not found")
return self.bad_request("Invalid input")
Repository vs Mediator Parameter Patterns¶
Repository methods accept cancellation_token:
worker = await self._repository.get_by_id_async(worker_id, cancellation_token)
await self._repository.update_async(worker, cancellation_token)
await self._repository.add_async(worker, cancellation_token)
Mediator calls do NOT accept cancellation_token:
result = await self.mediator.execute_async(GetWorkerQuery(worker_id="123"))
result = await self.mediator.execute_async(UpdateWorkerCommand(worker_id="123", name="Updated"))
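The single-argument mediator signature is easier to remember once you see what a mediator does: it routes a request object to its registered handler by type. This is a hypothetical minimal mediator, not Neuroglia's implementation:

```python
import asyncio
from dataclasses import dataclass

class Mediator:
    """Toy mediator: routes a request to the handler registered for its type."""
    def __init__(self):
        self._handlers = {}

    def register(self, request_type, handler):
        self._handlers[request_type] = handler

    async def execute_async(self, request):
        # Single-argument signature: no cancellation_token here.
        handler = self._handlers[type(request)]
        return await handler.handle_async(request)

@dataclass
class GetWorkerQuery:
    worker_id: str

class GetWorkerQueryHandler:
    async def handle_async(self, request, cancellation_token=None):
        # The handler, not the mediator, decides how to use cancellation_token.
        return {"id": request.worker_id, "status": "running"}

mediator = Mediator()
mediator.register(GetWorkerQuery, GetWorkerQueryHandler())
result = asyncio.run(mediator.execute_async(GetWorkerQuery(worker_id="123")))
```

Note the asymmetry: `cancellation_token` appears on the handler's `handle_async`, so repositories called inside the handler can receive it, but the mediator call site never passes it.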
Dependency Injection¶
Services registered in src/main.py::create_app():
builder.services.add_singleton(TaskRepository, MongoTaskRepository)
builder.services.add_scoped(CMLWorkerRepository, MongoCMLWorkerRepository)
builder.services.configure(DualAuthService.configure) # Factory method pattern
Scoped services: Repositories, Mediator (per-request lifecycle)
Singletons: Settings, AWS client, notification handlers, schedulers
Accessing services in code:
# In controllers/handlers - inject via __init__
def __init__(self, service_provider, mapper, mediator, repository: TaskRepository):
    self._repository = repository

# In background jobs - get from service provider
scope = self._service_provider.create_scope()
repository = scope.get_required_service(CMLWorkerRepository)
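The singleton/scoped distinction above can be demonstrated with a toy container (illustrative only; Neuroglia's `ServiceProvider` is richer, with constructor injection and disposal):

```python
class ServiceProvider:
    """Toy DI container showing singleton vs scoped lifetimes."""
    def __init__(self):
        self._singletons = {}
        self._scoped_factories = {}

    def add_singleton(self, key, instance):
        self._singletons[key] = instance

    def add_scoped(self, key, factory):
        self._scoped_factories[key] = factory

    def create_scope(self):
        return Scope(self)

class Scope:
    def __init__(self, provider):
        self._provider = provider
        self._instances = {}

    def get_required_service(self, key):
        if key in self._provider._singletons:
            return self._provider._singletons[key]  # shared across all scopes
        if key not in self._instances:
            self._instances[key] = self._provider._scoped_factories[key]()  # one per scope
        return self._instances[key]
```

This is why background jobs must call `create_scope()`: scoped services like repositories only exist inside a scope, while singletons are shared process-wide.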
Controller Routing (notes/NEUROGLIA_CONTROLLER_PREFIX_FINDINGS.md)¶
Neuroglia auto-generates route prefixes from class names:
- `TasksController` → `/tasks/*`
- `UIController` → `/ui/*` (not root!)
Override to serve at root:
class UIController(ControllerBase):
    def __init__(self, service_provider, mapper, mediator):
        super().__init__(service_provider, mapper, mediator)
        self.prefix = ""  # Override auto-generated prefix
CQRS with Mediator¶
Commands (write operations): application/commands/*.py
from neuroglia.mediation import Command, CommandHandler
@dataclass
class CreateTaskCommand(Command[OperationResult[TaskCreatedDto]]):
    title: str
    description: str

class CreateTaskCommandHandler(CommandHandler[CreateTaskCommand, OperationResult[TaskCreatedDto]]):
    async def handle_async(self, request: CreateTaskCommand, cancellation_token=None) -> OperationResult[TaskCreatedDto]:
        # Validate
        if not request.title:
            return self.bad_request("Title is required")

        # Business logic
        task = Task.create(title=request.title, description=request.description)
        await self._repository.add_async(task, cancellation_token)

        # Return using helper method
        return self.created(TaskCreatedDto(id=task.id(), title=task.state.title))
Queries (read operations): application/queries/*.py - same pattern
Usage in controllers:
# ✅ CORRECT - Single argument only
result = await self.mediator.execute_async(CreateTaskCommand(title="...", description="..."))
return self.process(result) # Controllers use process() to convert to HTTP response
Event Sourcing with @dispatch¶
Domain events handled via multipledispatch:
@dispatch(CMLWorkerCreatedDomainEvent)
def on(self, event: CMLWorkerCreatedDomainEvent) -> None:
    self.id = event.worker_id
    self.name = event.name
    self.status = CMLWorkerStatus.PENDING
Events automatically published via CloudEvent middleware when aggregate saved.
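The same replay idea can be sketched without the `multipledispatch` library: state is rebuilt by routing each event to a handler keyed on its type. All names here are illustrative, not the real domain events:

```python
from dataclasses import dataclass

@dataclass
class WorkerCreated:
    worker_id: str
    name: str

@dataclass
class StatusUpdated:
    status: str

class WorkerState:
    """Rebuild state by dispatching each event to a per-type handler
    (the @dispatch decorator selects handlers by signature instead)."""
    def __init__(self):
        self.id = None
        self.name = None
        self.status = "pending"

    def apply(self, event):
        handler = {
            WorkerCreated: self._on_created,
            StatusUpdated: self._on_status,
        }[type(event)]
        handler(event)

    def _on_created(self, e):
        self.id, self.name = e.worker_id, e.name

    def _on_status(self, e):
        self.status = e.status

state = WorkerState()
for event in [WorkerCreated("w-1", "lab-worker"), StatusUpdated("running")]:
    state.apply(event)
```

Replaying the full event stream through `apply` reconstructs the current state, which is why missing `@dispatch` handlers show up as stale aggregate state.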
AWS Integration¶
Client: integration/services/aws_ec2_api_client.py::AwsEc2Client
- EC2 instance management (create, start, stop, terminate)
- CloudWatch metrics (CPU, memory, network, disk)
- Tag-based instance discovery
- Singleton service with boto3 client pooling
CML API Client: integration/services/cml_api_client.py::CMLApiClient
- CML REST API integration for worker instances
- System information (`/api/v0/system_information`) - no auth required
- System stats (`/api/v0/system_stats`) - requires auth
- Lab management endpoints (list, create, start, stop, wipe, delete)
- Node definitions and image definitions queries
Required AWS Environment Variables (see notes/AWS_IAM_PERMISSIONS_REQUIRED.md):
AWS_ACCESS_KEY_ID
AWS_SECRET_ACCESS_KEY
CML_WORKER_AMI_NAME_DEFAULT # AMI name filter
CML_WORKER_INSTANCE_TYPE_DEFAULT # e.g., m5.xlarge
CML_WORKER_SECURITY_GROUP_ID
CML_WORKER_SUBNET_ID
Configuration & Settings¶
Main config: src/application/settings.py::Settings (extends ApplicationSettings)
- Environment variables override defaults (e.g., `APP_PORT`, `KEYCLOAK_URL`)
- Redis session store: Set `REDIS_ENABLED=true` and `REDIS_URL=redis://host:6379/0`
- Worker monitoring: `WORKER_MONITORING_ENABLED=true`, `WORKER_METRICS_POLL_INTERVAL=300`
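The env-override behavior can be sketched with a plain dataclass whose defaults are read from the environment at construction time. `MonitoringSettings` is illustrative; the real settings class extends `ApplicationSettings`:

```python
import os
from dataclasses import dataclass, field

@dataclass
class MonitoringSettings:
    """Illustrative env-driven settings: env vars override the coded defaults."""
    enabled: bool = field(
        default_factory=lambda: os.getenv("WORKER_MONITORING_ENABLED", "false").lower() == "true"
    )
    poll_interval: int = field(
        default_factory=lambda: int(os.getenv("WORKER_METRICS_POLL_INTERVAL", "300"))
    )

# Simulate the environment a deployment would provide.
os.environ["WORKER_MONITORING_ENABLED"] = "true"
os.environ["WORKER_METRICS_POLL_INTERVAL"] = "120"
settings = MonitoringSettings()
```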
Keycloak setup: deployment/keycloak/ contains realm import JSON
- Default realm: `cml-cloud-manager`
- Default client: `cml-cloud-manager-public` (public client for Authorization Code Flow)
UI Development¶
Frontend Stack: Bootstrap 5 + Vanilla JS + Parcel bundler
Source: src/ui/src/ (JS modules, SCSS)
Build: cd ui && npm run build → outputs to static/
Dev mode: cd ui && npm run dev (hot reload on port 1234)
Server-Sent Events (SSE): UI subscribes to `/api/events/stream` for real-time worker status/metrics updates (see `application/services/sse_event_relay.py`)
Real-time event broadcasting:
- SSE clients register with the `SSEEventRelay` service
- Optional filtering by worker_ids and event_types
- Events published to all matching subscribed clients
- Common event types: worker_status, worker_metrics, lab_status
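The fan-out-with-filters behavior can be sketched as follows (names are illustrative, not the real `SSEEventRelay` API; real SSE delivery happens over the HTTP stream rather than in-memory lists):

```python
from dataclasses import dataclass, field
from typing import Optional, Set

@dataclass
class Subscriber:
    worker_ids: Optional[Set[str]] = None   # None means "all workers"
    event_types: Optional[Set[str]] = None  # None means "all event types"
    received: list = field(default_factory=list)

class EventRelay:
    """Sketch of SSE-style fan-out with optional per-client filtering."""
    def __init__(self):
        self.subscribers = []

    def subscribe(self, sub: Subscriber):
        self.subscribers.append(sub)

    def publish(self, worker_id: str, event_type: str, payload: dict):
        for sub in self.subscribers:
            if sub.worker_ids is not None and worker_id not in sub.worker_ids:
                continue
            if sub.event_types is not None and event_type not in sub.event_types:
                continue
            sub.received.append((worker_id, event_type, payload))

relay = EventRelay()
everything = Subscriber()
only_w1_status = Subscriber(worker_ids={"w-1"}, event_types={"worker_status"})
relay.subscribe(everything)
relay.subscribe(only_w1_status)
relay.publish("w-1", "worker_status", {"status": "running"})
relay.publish("w-2", "worker_metrics", {"cpu": 42})
```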
Common Pitfalls¶
- Forgot to build UI: Run `make build-ui` before `make run` or the app won't serve static assets
- PYTHONPATH issues: Always run from the project root with `PYTHONPATH=src` or use `make` commands
- Neuroglia controllers at wrong path: Override `self.prefix` in `__init__` if not using default naming
- Auth middleware missing: Ensure `DualAuthService.configure()` is called in `create_app()` before mounting subapps
- Background jobs not running: Check `WORKER_MONITORING_ENABLED=true` and verify APScheduler logs
- Aggregate state not updating: Ensure `@dispatch` decorators are present and events are emitted via `record_event()`
Architecture Evolution & Migration Path¶
Current state: Imperative commands/queries (CQRS pattern)
Target state: Hybrid model with Resource-Oriented Architecture (ROA)
Migration strategy (implement gradually):
- Preserve existing command/query patterns for immediate operations
- Add declarative resource definitions (desired state specifications)
- Implement autonomous watchers for resource observation
- Build reconciliation loops to align actual state with desired state
- Maintain backward compatibility during transition
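The reconciliation-loop step can be sketched as a pure function that diffs desired state against actual state and emits actions (all names and the flat status-string model are illustrative assumptions):

```python
def reconcile(desired: dict, actual: dict) -> list:
    """Return (action, worker_id) pairs that would move actual toward desired."""
    actions = []
    for worker_id, want in desired.items():
        have = actual.get(worker_id)
        if have is None:
            actions.append(("create", worker_id))          # missing entirely
        elif have != want:
            actions.append((f"set-{want}", worker_id))     # drifted from spec
    for worker_id in actual:
        if worker_id not in desired:
            actions.append(("terminate", worker_id))       # no longer declared
    return actions

plan = reconcile(
    desired={"w-1": "running", "w-2": "stopped"},
    actual={"w-1": "stopped", "w-3": "running"},
)
```

A watcher would run this diff periodically (or on change events) and dispatch each action through the existing imperative commands, which is how the two models coexist during migration.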
Guiding principles:
- Portable: Cloud-native 12-factor app (local → Docker → Kubernetes deployment)
- Observable: OpenTelemetry integration (metrics, traces, logs)
- Actionable: CloudEvents for event-driven integration
- Maintainable: Clean Architecture with clear layer boundaries
Documentation¶
Required updates for any feature change:
- `CHANGELOG.md` - User-facing changes under the "Unreleased" section
- `README.md` - Update if user workflows change
- `Makefile` - Add/update commands if new workflows are introduced
- `docs/` (MkDocs) - Architectural decisions, feature guides, troubleshooting
Documentation as code: Architecture and design decisions must be documented continuously in notes/*.md and docs/
- Architecture diagrams: `docs/architecture/` (MkDocs site)
- Design decisions: `notes/*.md` (e.g., `AUTHENTICATION_ARCHITECTURE.md`, `WORKER_MONITORING_ARCHITECTURE.md`)
- API docs: Auto-generated at `/api/docs` (Swagger UI) when the app is running
- Build docs: `make docs-serve` (MkDocs dev server on port 8000)
Code Style & Contribution Guidelines¶
Formatters: Black (line length 120 - see pyproject.toml), isort (Black profile)
Linter: Ruff with rules E, F, W, I, UP
Pre-commit: Install with make install-hooks - runs Black, isort, Ruff, detect-secrets on commit
- Enforcement: Pre-commit hooks required; exceptions must be documented with rationale
Import Guidelines¶
All imports MUST be at module level (top of file):
- Place all `import` and `from ... import` statements at the beginning of the file
- Organize imports in standard order: stdlib → third-party → local (enforced by isort)
- No inline imports inside functions, methods, or classes, except for:
  - Circular dependency resolution (document with a comment explaining why)
  - `TYPE_CHECKING` imports for type hints (use an `if TYPE_CHECKING:` block)
Correct pattern:
# Standard library
import logging
from dataclasses import dataclass

# Third-party
from neuroglia.mediation import Command, CommandHandler

# Local application
from domain.repositories import CMLWorkerRepository
from integration.enums import AwsRegion

class CreateWorkerCommandHandler(CommandHandler):
    def handle_async(self, command):
        region = AwsRegion(command.region)  # ✅ Import at top
Incorrect pattern:
class CreateWorkerCommandHandler(CommandHandler):
    def handle_async(self, command):
        from integration.enums import AwsRegion  # ❌ Inline import
        region = AwsRegion(command.region)
TYPE_CHECKING exception (acceptable):
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from domain.entities import CMLWorker  # ✅ Acceptable for type hints only
Benefits of module-level imports:
- Explicit dependency declaration at file start
- Better readability and maintainability
- Easier to detect circular dependencies
- Faster repeated execution (no repeated import overhead)
- Clearer for static analysis tools
Commit conventions:
- Style: `<type>: <description>` (feat, fix, docs, refactor, test, chore)
- Message format: Max 5 lines, group related changes logically
- DCO: All commits must be signed off (`git commit -s`)
- Atomic commits: One logical change per commit (easier to review/revert)
Example commit:
feat: add idle timeout detection for workers
- Add idle_timeout field to CMLWorker entity
- Implement detection via last_activity timestamp
- Schedule periodic idle check background job
- Update settings to configure timeout threshold
Using This Guide¶
For AI Agents¶
This guide is automatically loaded by GitHub Copilot and other AI assistants that support .github/copilot-instructions.md. The instructions provide context for:
- Understanding the multi-SubApp architecture
- Following CQRS patterns with self-contained commands/queries
- Working with Neuroglia framework specifics
- Implementing authentication flows
- Managing background jobs and monitoring
- Maintaining code quality standards
For Human Developers¶
Use this guide as:
- Onboarding reference for new team members
- Quick reference for architectural decisions
- Convention checklist before submitting PRs
- Troubleshooting guide for common issues
Keeping Content Updated¶
When making architectural changes or adding new patterns:
- Update `.github/copilot-instructions.md` (single source of truth)
- This page will automatically reflect the changes
- Update `CHANGELOG.md` if user-facing
- Add detailed architectural decisions to `notes/*.md` if needed
Related Documentation¶
- Testing Guide - Comprehensive testing patterns and practices
- Makefile Reference - All available development commands
- Architecture Overview - Deep dive into system design
- Security - Authentication and authorization details
- Worker Monitoring - Background job orchestration