Control Plane API Architecture
Version: 1.3.0 (April 2026) Status: Current Implementation
Related Documentation
For development workflows and Makefile commands, see the Development Guide.
Revision History
| Version | Date | Changes |
|---|---|---|
| 1.3.0 | 2026-04 | SSE race condition fix (ADR-039), LDS CloudEvent direct ingestion (ADR-040), LDS integration settings |
| 1.2.0 | 2026-02 | Added CloudEvent ingestion endpoint, CollectAndGrade command, etcd state publishing (ADR-018) |
| 1.1.0 | 2026-02 | Added dual auth architecture, SSE event relay |
| 1.0.0 | 2026-01 | Initial architecture documentation |
1. Overview
The Control Plane API is the central gateway for the Lablet Cloud Manager system. It serves as the Backend-for-Frontend (BFF) providing:
- REST API for LabletDefinition, LabletInstance, and Worker CRUD operations
- Bootstrap 5 SPA with Server-Sent Events (SSE) for real-time updates
- OAuth2/OIDC Authentication via Keycloak
- CQRS Pattern with Commands (writes) and Queries (reads) via Mediator
- CloudEvents Publishing for external system integration (fire-and-forget)
- etcd State Publishing for controller watches
CQRS Location & Persistence Pattern
Control Plane API is the ONLY service that implements CQRS commands/queries. Controllers (resource-scheduler, worker-controller, lablet-controller) use Reconciliation Loops, not CQRS.
Event-Driven State-Based Persistence: The system uses a hybrid persistence pattern where:
- State-Based Storage: Aggregates are persisted as current state in MongoDB (not event-sourced)
- Domain Events: AggregateRoot emits domain events on state changes for in-process side effects
- CloudEvents: Domain events are automatically published to an external event bus (fire-and-forget)
- etcd Publishing: State changes are published to etcd for controller watching
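The hybrid pattern above can be sketched as follows. This is a minimal illustration under stated assumptions, not the project's implementation: `Worker`, `WorkerStatusChanged`, the `save` helper, and the in-memory `store` dict are hypothetical stand-ins for the real aggregate, its domain event, the repository, and MongoDB.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class WorkerStatusChanged:
    """Hypothetical domain event emitted when a worker changes status."""
    worker_id: str
    old_status: str
    new_status: str


@dataclass
class Worker:
    """Hypothetical aggregate: only the current state is stored, no event log."""
    id: str
    status: str = "PENDING"
    pending_events: list = field(default_factory=list)

    def set_status(self, new_status: str) -> None:
        if new_status == self.status:
            return  # no state change, so no event
        event = WorkerStatusChanged(self.id, self.status, new_status)
        self.status = new_status
        self.pending_events.append(event)


def save(worker: Worker, store: dict, publish) -> None:
    """Persist the current state, then hand pending events to a publisher."""
    store[worker.id] = {"id": worker.id, "status": worker.status}
    events, worker.pending_events = worker.pending_events, []
    for event in events:
        publish(event)  # stands in for the fire-and-forget CloudEvent publish


store: dict = {}
published: list = []
worker = Worker(id="w-1")
worker.set_status("RUNNING")
save(worker, store, published.append)
```

After `save`, the store holds only the latest snapshot of the worker, while the event that described the change has been handed to the publisher and is not persisted.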
ADR-015: No External API Calls
Control Plane API MUST NOT make external API calls (AWS, CML). It only interacts with MongoDB, etcd, and Redis (sessions). All external operations are delegated to controllers via state/intent in the database.
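One way to picture the delegation rule is an intent field that the API writes and a controller reads. The sketch below is illustrative only: `WorkerRecord`, `request_start`, `reconcile`, and the `start_instance` callback are hypothetical names, and the controller marks the worker as running immediately, which a real reconciliation loop would only do after observing the external system.

```python
from dataclasses import dataclass


@dataclass
class WorkerRecord:
    """Hypothetical persisted worker document."""
    id: str
    status: str          # last observed state, written by the controller
    desired_state: str   # intent, written by the Control Plane API


def request_start(record: WorkerRecord) -> None:
    """API side: record the intent only; no AWS or CML call is made here."""
    record.desired_state = "RUNNING"


def reconcile(record: WorkerRecord, start_instance) -> bool:
    """Controller side: act on the gap between intent and observed state.

    Returns True when an external action was taken.
    """
    if record.desired_state == "RUNNING" and record.status != "RUNNING":
        start_instance(record.id)  # the external call lives in the controller
        record.status = "RUNNING"  # simplification: assume the start succeeded
        return True
    return False


calls: list = []
record = WorkerRecord(id="w-1", status="STOPPED", desired_state="STOPPED")
request_start(record)
acted = reconcile(record, calls.append)
```

A second `reconcile` call on the same record does nothing, because intent and observed state already agree.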
2. Entity Hierarchy
erDiagram
CMLWorker ||--o{ LabRecord : hosts
CMLWorker ||--o{ NodeDefinition : contains
CMLWorker ||--o{ ImageDefinition : contains
LabletDefinition ||--o{ LabletInstance : instantiates
LabletInstance }o--|| CMLWorker : "assigned_to"
LabletInstance ||--o| LabRecord : "maps_to"
CMLWorker {
string id PK
string name
string region
string instance_type
string instance_id
WorkerStatus status
string ip_address
int idle_timeout
datetime created_at
}
LabRecord {
string id PK
string lab_id
string title
string owner
LabState state
int node_count
datetime created
}
LabletDefinition {
string id PK
string name
string description
string topology_yaml
string[] required_nodes
int estimated_duration
boolean is_public
}
LabletInstance {
string id PK
string definition_id FK
string worker_id FK
string user_id
InstanceState state
datetime scheduled_start
datetime scheduled_end
dict port_mappings
}
NodeDefinition {
string id PK
string node_definition_id
string label
string description
boolean is_custom
}
ImageDefinition {
string id PK
string image_id
string node_definition_id FK
string label
}
3. Storage Strategy
| Entity | Storage | Access Pattern |
|---|---|---|
| CMLWorker | MongoDB | State-based aggregate with optimistic concurrency |
| LabRecord | MongoDB | Denormalized snapshot from CML API |
| LabletDefinition | MongoDB | Immutable versioned entity (see note below) |
| LabletInstance | MongoDB | State-based aggregate with status machine |
| NodeDefinition | MongoDB | Cached from CML Worker discovery |
| ImageDefinition | MongoDB | Cached from CML Worker discovery |
| Sessions | Redis | User session tokens (httpOnly cookies) |
| State Keys | etcd | Worker/instance state for controller watches |
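The "optimistic concurrency" access pattern in the table can be illustrated with a version counter that every write must match. This is a sketch under assumptions: `InMemoryCollection` and `ConcurrencyError` are hypothetical, and the dict stands in for a MongoDB collection, where the same check is commonly expressed by including the expected version in the update filter.

```python
class ConcurrencyError(Exception):
    """Raised when a document changed since it was read."""


class InMemoryCollection:
    """Hypothetical stand-in for a MongoDB collection keyed by id."""

    def __init__(self) -> None:
        self._docs: dict = {}

    def insert(self, doc_id: str, data: dict) -> None:
        self._docs[doc_id] = {**data, "version": 1}

    def get(self, doc_id: str) -> dict:
        return dict(self._docs[doc_id])  # copy, so callers cannot mutate storage

    def update(self, doc_id: str, data: dict, expected_version: int) -> int:
        current = self._docs[doc_id]
        if current["version"] != expected_version:
            raise ConcurrencyError(
                f"{doc_id}: expected v{expected_version}, found v{current['version']}"
            )
        new_version = expected_version + 1
        self._docs[doc_id] = {**data, "version": new_version}
        return new_version
```

Two readers that load the same version can both attempt a write, but only the first succeeds; the second receives `ConcurrencyError` and has to reload before retrying.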
LabletDefinition Versioning (Decision Pending)
LabletDefinition MUST be versioned so that LabletInstance always refers to a specific version. When a command "changes" a definition's state, it MUST create a new entity in the database (maintaining old versions while enabling automatic cleanup over time).
Design Decision Required:
- Version numbering scheme (semantic versioning vs auto-increment)
- Retention policy for old versions
- Migration strategy for existing data
This will be documented in a separate ADR.
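Until that ADR exists, the requirement itself (a "change" creates a new entity and old versions stay readable) can be sketched as below. Everything here is an assumption for illustration: `DefinitionVersion` and `DefinitionStore` are hypothetical, and the auto-increment integer is only a placeholder for whichever numbering scheme is eventually chosen.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class DefinitionVersion:
    """Hypothetical immutable snapshot of a LabletDefinition."""
    name: str
    version: int
    topology_yaml: str


class DefinitionStore:
    """Keeps every version; an update appends instead of overwriting."""

    def __init__(self) -> None:
        self._versions: dict = {}  # name -> list of DefinitionVersion

    def create(self, name: str, topology_yaml: str) -> DefinitionVersion:
        first = DefinitionVersion(name, 1, topology_yaml)
        self._versions[name] = [first]
        return first

    def update(self, name: str, topology_yaml: str) -> DefinitionVersion:
        latest = self._versions[name][-1]
        new = replace(latest, version=latest.version + 1, topology_yaml=topology_yaml)
        self._versions[name].append(new)
        return new

    def get(self, name: str, version: int) -> DefinitionVersion:
        return next(v for v in self._versions[name] if v.version == version)
```

An instance that stores the pair `(name, version)` keeps resolving to the same topology even after later updates, which is the property the requirement asks for.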
4. Layer Architecture
control-plane-api/
├── api/                              # HTTP Layer
│   ├── controllers/                  # REST endpoints (auto-prefixed)
│   │   ├── workers_controller.py
│   │   ├── lablet_definitions_controller.py
│   │   ├── lablet_instances_controller.py
│   │   └── auth_controller.py
│   ├── dependencies.py               # DI for auth, sessions
│   └── services/
│       └── auth.py                   # DualAuthService (Cookie + JWT)
│
├── application/                      # Business Logic Layer
│   ├── commands/                     # Write operations (self-contained)
│   │   ├── create_worker_command.py
│   │   ├── start_worker_command.py
│   │   └── create_lablet_instance_command.py
│   ├── queries/                      # Read operations (self-contained)
│   │   ├── get_workers_query.py
│   │   └── get_lablet_instances_query.py
│   ├── dtos/                         # Data Transfer Objects
│   ├── services/                     # Application services
│   │   ├── worker_monitoring_scheduler.py
│   │   └── sse_event_relay.py
│   ├── jobs/                         # Background tasks
│   │   ├── worker_metrics_collection_job.py
│   │   └── labs_refresh_job.py
│   └── settings.py                   # Configuration (Pydantic Settings)
│
├── domain/                           # Core Domain Layer
│   ├── entities/                     # Aggregates
│   │   ├── cml_worker.py
│   │   ├── lab_record.py
│   │   ├── lablet_definition.py
│   │   └── lablet_instance.py
│   ├── events/                       # Domain events (@cloudevent)
│   ├── enums/                        # Value objects
│   │   ├── worker_status.py
│   │   └── instance_state.py
│   └── repositories/                 # Abstract interfaces
│
├── integration/                      # External Service Adapters
│   ├── repositories/                 # MongoDB implementations
│   │   ├── mongo_worker_repository.py
│   │   └── mongo_lablet_instance_repository.py
│   └── services/
│       ├── aws_ec2_api_client.py
│       ├── cml_api_client.py
│       └── control_plane_api_client.py
│
├── infrastructure/                   # Technical Adapters
│   └── session_store.py              # Redis/InMemory session storage
│
├── ui/                               # Frontend Assets
│   ├── src/                          # Parcel source (JS, SCSS)
│   └── package.json
│
└── observability/                    # Instrumentation
    └── otel_config.py
5. Authentication Architecture
The Control Plane API implements Dual Authentication supporting both browser-based and API-based access:
sequenceDiagram
participant Browser
participant API as Control Plane API
participant Redis
participant Keycloak
Note over Browser,Keycloak: Cookie-Based Flow (Browser)
Browser->>API: GET /api/auth/login
API->>Keycloak: Redirect to OIDC authorize
Keycloak-->>Browser: Login form
Browser->>Keycloak: Credentials
Keycloak->>API: Callback with code
API->>Keycloak: Exchange code for tokens
API->>Redis: Store tokens (session_id → tokens)
API-->>Browser: Set-Cookie: session_id (httpOnly)
Note over Browser,API: Subsequent requests
Browser->>API: GET /api/workers (Cookie: session_id)
API->>Redis: Lookup tokens by session_id
API->>API: Validate JWT
API-->>Browser: 200 OK
Note over Browser,Keycloak: Bearer Token Flow (API Clients)
Browser->>Keycloak: POST /token (client_credentials)
Keycloak-->>Browser: access_token
Browser->>API: GET /api/workers (Authorization: Bearer token)
API->>Keycloak: Validate JWT (JWKS)
API-->>Browser: 200 OK
Security Model
| Aspect | Implementation |
|---|---|
| Session Storage | Redis (production) or in-memory (dev) |
| Token Exposure | Never exposed to browser JS (httpOnly cookies) |
| CSRF Protection | SameSite cookie attribute |
| Token Refresh | Server-side refresh before expiry |
| RBAC | Keycloak realm roles mapped to permissions |
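The first three rows of the table (server-side session storage, httpOnly cookies, SameSite) can be sketched with an in-memory store. This is not the project's `session_store.py`: the class, the TTL handling, and the `Secure; SameSite=Lax` cookie attributes are assumptions chosen for illustration.

```python
import secrets
import time


class InMemorySessionStore:
    """Hypothetical dev-mode store: session_id -> tokens, with expiry."""

    def __init__(self, ttl_seconds: int = 3600) -> None:
        self._ttl = ttl_seconds
        self._sessions: dict = {}

    def create(self, tokens: dict) -> str:
        session_id = secrets.token_urlsafe(32)  # opaque, unguessable identifier
        self._sessions[session_id] = (time.monotonic() + self._ttl, tokens)
        return session_id

    def get(self, session_id: str):
        entry = self._sessions.get(session_id)
        if entry is None:
            return None
        expires_at, tokens = entry
        if time.monotonic() >= expires_at:
            del self._sessions[session_id]  # expired sessions are dropped
            return None
        return tokens


def session_cookie(session_id: str) -> str:
    """Build a Set-Cookie value; the tokens themselves never leave the server."""
    return f"session_id={session_id}; HttpOnly; Secure; SameSite=Lax; Path=/"
```

The browser only ever holds the opaque `session_id`; the access and refresh tokens stay in the store, which is what keeps them out of reach of page scripts.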
6. Background Services
The Control Plane API runs several background services:
Worker Monitoring Scheduler
Coordinates per-worker metrics collection:
class WorkerMonitoringScheduler:
    """
    Lifecycle:
    1. On startup: Query active workers
    2. For each worker: Schedule WorkerMetricsCollectionJob
    3. On worker state change: Add/remove jobs dynamically
    """

    async def start_async(self):
        workers = await self._get_active_workers()
        for worker in workers:
            self._scheduler.add_job(
                WorkerMetricsCollectionJob(worker.id),
                trigger='interval',
                seconds=self._settings.metrics_poll_interval
            )
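Step 3 of the lifecycle (adding and removing jobs as workers change state) is not shown in the excerpt. A minimal sketch of that behaviour, using plain asyncio tasks in place of whatever scheduler the service actually uses, could look like this; `PerWorkerScheduler` and the `collect` callback are hypothetical names.

```python
import asyncio


class PerWorkerScheduler:
    """Hypothetical scheduler: one periodic asyncio task per worker id."""

    def __init__(self, collect, interval_seconds: float) -> None:
        self._collect = collect
        self._interval = interval_seconds
        self._tasks: dict = {}

    def add_worker(self, worker_id: str) -> None:
        # Must be called from within a running event loop.
        if worker_id not in self._tasks:  # avoid duplicate jobs per worker
            self._tasks[worker_id] = asyncio.create_task(self._run(worker_id))

    async def remove_worker(self, worker_id: str) -> None:
        task = self._tasks.pop(worker_id, None)
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass  # expected: the job was cancelled on purpose

    async def _run(self, worker_id: str) -> None:
        while True:
            await self._collect(worker_id)
            await asyncio.sleep(self._interval)

    @property
    def worker_ids(self) -> set:
        return set(self._tasks)
```

Keying tasks by worker id makes `add_worker` idempotent, so a repeated "worker became active" notification does not start a second collection loop.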
SSE Event Relay
Broadcasts real-time updates to connected clients:
class SSEEventRelay:
    """
    - Clients subscribe with optional filters (worker_ids, event_types)
    - Events broadcast to all matching subscribers
    - Common events: worker_status, worker_metrics, lab_status
    """
7. API Design
Controller Routing
Neuroglia auto-generates route prefixes from controller class names:
| Controller Class | Route Prefix |
|---|---|
| `WorkersController` | `/workers/*` |
| `LabletDefinitionsController` | `/lablet-definitions/*` |
| `LabletInstancesController` | `/lablet-instances/*` |
| `AuthController` | `/auth/*` |
Prefix Convention
Do NOT add prefix in @route decorators - the controller name IS the prefix.
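The naming convention in the table can be reproduced with a few lines of string handling. The function below is a hypothetical illustration of the mapping only; it is not Neuroglia's code and makes no claim about how the framework derives prefixes internally.

```python
import re


def route_prefix(controller_class_name: str) -> str:
    """Derive a kebab-case prefix from a controller class name."""
    suffix = "Controller"
    base = controller_class_name
    if base.endswith(suffix):
        base = base[: -len(suffix)]
    # Insert a hyphen before every capital letter except the first one.
    kebab = re.sub(r"(?<!^)(?=[A-Z])", "-", base).lower()
    return f"/{kebab}"
```

For example, `route_prefix("LabletDefinitionsController")` yields `/lablet-definitions`, matching the table. Adding the same segment again in a route decorator would therefore duplicate it in the final path.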
CQRS Pattern
All operations use the Mediator pattern:
# In controller
result = await self.mediator.execute_async(
    GetWorkersQuery(status_filter=WorkerStatus.RUNNING)
)
return self.process(result)

# In handler (applying request.status_filter is omitted here for brevity)
class GetWorkersQueryHandler(QueryHandler[GetWorkersQuery, OperationResult[list[WorkerDto]]]):
    async def handle_async(self, request, cancellation_token=None):
        workers = await self._repository.get_all_async(cancellation_token)
        return self.ok([WorkerDto.from_entity(w) for w in workers])
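To make the dispatch step concrete, here is a minimal mediator that routes a request object to the handler registered for its type. It is a sketch of the general pattern, not the Neuroglia `Mediator`: `SimpleMediator`, `GetGreetingQuery`, and its handler are hypothetical and exist only for this example.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class GetGreetingQuery:
    """Hypothetical query used only for this illustration."""
    name: str


class GetGreetingQueryHandler:
    async def handle_async(self, request: GetGreetingQuery) -> str:
        return f"hello {request.name}"


class SimpleMediator:
    """Routes each request to the single handler registered for its type."""

    def __init__(self) -> None:
        self._handlers: dict = {}

    def register(self, request_type: type, handler) -> None:
        self._handlers[request_type] = handler

    async def execute_async(self, request):
        handler = self._handlers.get(type(request))
        if handler is None:
            raise LookupError(f"no handler for {type(request).__name__}")
        return await handler.handle_async(request)


mediator = SimpleMediator()
mediator.register(GetGreetingQuery, GetGreetingQueryHandler())
result = asyncio.run(mediator.execute_async(GetGreetingQuery("lab")))
```

The controller only knows the request type and the mediator, never the handler, which is what lets each command or query file stay self-contained.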
8. External Integrations
AWS EC2 Client
class AwsEc2Client:
    """
    Singleton service with boto3 client pooling.

    Operations:
    - describe_instances() - Query instance state
    - start_instances() - Start stopped instances
    - stop_instances() - Stop running instances
    - terminate_instances() - Terminate instances
    - get_cloudwatch_metrics() - CPU, memory, network, disk
    """
CML API Client
class CMLApiClient:
    """
    Per-worker client for CML REST API.

    Endpoints:
    - /api/v0/system_information - No auth, worker info
    - /api/v0/system_stats - Auth required, resource usage
    - /api/v0/labs - Lab lifecycle operations
    - /api/v0/node_definitions - Available node types
    """
CloudEvent Ingestion (LDS Integration, ADR-040)
The Control Plane API receives CloudEvents from external systems via its CloudEventMiddleware (mounted on the main app). Per ADR-040, CPA handles simple state-transition CloudEvents directly, while complex orchestration events are routed to lablet-controller per ADR-022.
sequenceDiagram
participant LDS as Lab Delivery System
participant CPA as Control Plane API
participant Handler as LdsSessionRunningHandler
participant MongoDB
participant SSE as SSE Event Relay
participant Browser
LDS->>CPA: POST / (CloudEvent: io.lablet.lds.session.running.v1)
CPA->>Handler: CloudEventIngestor dispatches
Handler->>Handler: Deduplication check
Handler->>MongoDB: Load LabletSession aggregate
Handler->>Handler: session.mark_running() (READY → RUNNING)
Handler->>MongoDB: Save aggregate (emits domain events)
MongoDB-->>SSE: Domain event → SSE handler
SSE-->>Browser: lablet.session.status.changed
CPA-->>LDS: 202 Accepted
Supported LDS CloudEvent Types
| Event Type | Handler | Action | State Transition |
|---|---|---|---|
| `io.lablet.lds.session.running.v1` | `LdsSessionRunningHandler` | Mark session running | READY → RUNNING |
| `io.lablet.lds.session.paused.v1` | `LdsSessionPausedHandler` | Informational log | None |
| `io.lablet.lds.session.ended.v1` | `LdsSessionEndedHandler` | Informational log | None |
Dual Routing Model
Events requiring external calls (e.g., lds.session.user-finished → GradingSPI) are routed to lablet-controller per ADR-022. Only simple aggregate mutations are handled directly by CPA.
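The deduplication check and the dual routing decision can be sketched together. The router below is hypothetical: `CloudEventRouter`, its `forward` callback (standing for whatever mechanism delivers an event to lablet-controller), and the in-memory set of seen events are assumptions. Using the `(source, id)` pair as the deduplication key follows the CloudEvents rule that this pair identifies a distinct event.

```python
class CloudEventRouter:
    """Hypothetical router: local handlers for simple events, forward the rest."""

    def __init__(self, forward) -> None:
        self._local: dict = {}
        self._forward = forward
        self._seen: set = set()

    def on(self, event_type: str, handler) -> None:
        self._local[event_type] = handler

    def dispatch(self, event: dict) -> str:
        key = (event["source"], event["id"])
        if key in self._seen:
            return "duplicate"  # redelivery: skip without side effects
        self._seen.add(key)
        handler = self._local.get(event["type"])
        if handler is None:
            self._forward(event)  # not a simple aggregate mutation
            return "forwarded"
        handler(event)
        return "handled"
```

Because the key is recorded before the handler runs, a redelivered event is ignored even if it arrives while the first delivery is still being processed; a production version would also need the seen-set to survive restarts.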
LDS Integration Settings
| Variable | Description | Default |
|---|---|---|
| `LDS_CLOUDEVENT_SOURCE` | Expected CloudEvent source URI | https://labs.lcm.io |
| `LDS_CLOUDEVENT_TYPE_PREFIX` | CloudEvent type prefix for LDS | io.lablet.lds |
| `LDS_CLOUDEVENT_ENABLED` | Feature toggle for LDS event processing | true |
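How these three settings might gate an incoming event is sketched below. The defaults come from the table above; the `LdsSettings` dataclass, the `should_process` function, and the exact matching rules (exact source match, type prefix followed by a dot) are assumptions, since the document does not state how the handlers apply them.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LdsSettings:
    """Mirrors the three LDS variables and their documented defaults."""
    source: str = "https://labs.lcm.io"
    type_prefix: str = "io.lablet.lds"
    enabled: bool = True


def should_process(event: dict, settings: LdsSettings) -> bool:
    """Accept an event only if LDS processing is on and it looks like LDS."""
    if not settings.enabled:
        return False
    if event.get("source") != settings.source:
        return False
    return str(event.get("type", "")).startswith(settings.type_prefix + ".")
```

With the defaults, an event of type `io.lablet.lds.session.running.v1` from `https://labs.lcm.io` passes, while the same type from another source, or any event when the toggle is off, is rejected.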
9. CQRS Commands Reference
Key commands implemented in Control Plane API:
| Command | Purpose | Outcome |
|---|---|---|
| `CreateWorkerCommand` | Register new CML worker | Worker in PENDING state |
| `StartWorkerCommand` | Request worker startup | desired_state = RUNNING |
| `StopWorkerCommand` | Request worker stop | desired_state = STOPPED |
| `CreateLabletInstanceCommand` | Create new instance | Instance in PENDING state |
| `ScheduleLabletInstanceCommand` | Assign worker to instance | Instance in SCHEDULED state |
| `InstantiateLabletInstanceCommand` | Import/start lab on worker | Instance in INSTANTIATING/READY |
| `CollectAndGradeCommand` | Trigger grading workflow | Instance in GRADED state |
| `TerminateLabletInstanceCommand` | Cleanup and terminate | Instance in TERMINATED state |
CollectAndGrade Command (FR-2.2.7)
Triggers the grading workflow for a running lablet instance:
@dataclass
class CollectAndGradeCommand(Command[OperationResult[GradingResultDto]]):
    """Collect lab state and submit for grading."""

    instance_id: str
    grading_criteria: dict | None = None  # Optional custom criteria


class CollectAndGradeCommandHandler(CommandHandler):
    async def handle_async(self, request, cancellation_token=None):
        # 1. Get instance from repository
        instance = await self._repository.get_by_id_async(request.instance_id)
        if not instance:
            return self.not_found("LabletInstance", f"Instance {request.instance_id} not found")
        if instance.state.status != LabletInstanceStatus.RUNNING:
            return self.bad_request("Instance must be RUNNING to collect and grade")

        # 2. Trigger grading (publishes domain event)
        instance.collect_and_grade(request.grading_criteria)

        # 3. Persist
        await self._repository.update_async(instance, cancellation_token)
        return self.accepted(GradingResultDto(instance_id=request.instance_id, status="GRADING"))
10. Configuration
Key environment variables:
| Variable | Description | Default |
|---|---|---|
| `KEYCLOAK_URL` | Keycloak server URL | Required |
| `KEYCLOAK_REALM` | OIDC realm | lablet-cloud-manager |
| `CONNECTION_STRINGS` | MongoDB connection | Required |
| `REDIS_URL` | Redis URL (sessions) | Optional |
| `WORKER_MONITORING_ENABLED` | Enable background monitoring | true |
| `WORKER_METRICS_POLL_INTERVAL` | Metrics collection interval (sec) | 300 |
| `ETCD_ENDPOINTS` | etcd cluster endpoints | localhost:2379 |
| `LDS_CLOUDEVENT_SOURCE` | Expected LDS CloudEvent source URI | https://labs.lcm.io |
| `LDS_CLOUDEVENT_TYPE_PREFIX` | LDS CloudEvent type prefix | io.lablet.lds |
| `LDS_CLOUDEVENT_ENABLED` | Enable LDS CloudEvent processing | true |
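The required and default values above can be read from the environment as sketched below. The service itself uses Pydantic Settings (see `application/settings.py` in the layer tree); this standard-library version is only a stand-in so the example stays self-contained. The `Settings` field names and the `load_settings` helper are hypothetical, `CONNECTION_STRINGS` is treated as an opaque string because its format is not documented here, and the LDS variables are left out for brevity.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _as_bool(value: str) -> bool:
    """Interpret common truthy spellings of an environment value."""
    return value.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    keycloak_url: str
    connection_strings: str
    keycloak_realm: str = "lablet-cloud-manager"
    redis_url: Optional[str] = None
    worker_monitoring_enabled: bool = True
    worker_metrics_poll_interval: int = 300
    etcd_endpoints: str = "localhost:2379"


def load_settings(env: Optional[Mapping[str, str]] = None) -> Settings:
    """Build Settings from a mapping, defaulting to the process environment."""
    env = os.environ if env is None else env
    missing = [k for k in ("KEYCLOAK_URL", "CONNECTION_STRINGS") if not env.get(k)]
    if missing:
        raise ValueError(f"missing required settings: {', '.join(missing)}")
    return Settings(
        keycloak_url=env["KEYCLOAK_URL"],
        connection_strings=env["CONNECTION_STRINGS"],
        keycloak_realm=env.get("KEYCLOAK_REALM", "lablet-cloud-manager"),
        redis_url=env.get("REDIS_URL"),
        worker_monitoring_enabled=_as_bool(env.get("WORKER_MONITORING_ENABLED", "true")),
        worker_metrics_poll_interval=int(env.get("WORKER_METRICS_POLL_INTERVAL", "300")),
        etcd_endpoints=env.get("ETCD_ENDPOINTS", "localhost:2379"),
    )
```

Failing fast on the two required variables surfaces a misconfigured deployment at startup rather than on the first request that needs Keycloak or MongoDB.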