# Lablet Resource Manager - Architecture Design
| Attribute | Value |
|---|---|
| Document Version | 0.5.0 |
| Status | Draft |
| Created | 2026-01-15 |
| Last Updated | 2026-02-18 |
| Author | Architecture Team |
| Related | Requirements Specification, ADRs |
## 1. Architecture Overview
### 1.1 Design Principles
| Principle | Application |
|---|---|
| Declarative over Imperative | Users declare desired state; system reconciles |
| Separation of Concerns | API, Scheduling, Control each have distinct responsibilities |
| Event-Driven Integration | CloudEvents for async external communication |
| API-Centric State Management | Single source of truth via Control Plane API |
| Provider Abstraction | SPI pattern for cloud provider independence |
### 1.2 High-Level Architecture
```
                         EXTERNAL CLIENTS
  REST API Clients │ UI (SPA) │ Assessment Svc (CloudEvents) │ Audit/Compliance (CloudEvents)
                                 │
                                 ▼
 CML CLOUD MANAGER SYSTEM
 │
 ├─ CONTROL PLANE API
 │    Definition │ Session │ Worker │ Reservation endpoints
 │    SSE Stream │ Admission Control │ Rate Limiting │ Auth/RBAC
 │         │
 │         ▼
 ├─ DUAL STORAGE ARCHITECTURE
 │    STATE STORE (etcd)               SPEC STORE (MongoDB)
 │      • Instance states                • LabletDefinitions (full)
 │      • Worker states                  • WorkerTemplates (full)
 │      • Port allocations               • Audit events (CloudEvents)
 │      • Leader election keys           • Complex aggregates
 │      • Watch subscriptions            • Historical data
 │      [Native Watch Mechanism]         [Rich Query Capabilities]
 │         │ watch events
 │         ▼
 ├─ RESOURCE SCHEDULER          LABLET CONTROLLER           WORKER CONTROLLER
 │    • Watch for PENDING         • Watch for SCHEDULED       • Watch for Workers
 │    • Placement                 • Reconcile Instances       • Reconcile Workers
 │    • Queue Mgmt                  vs Labs                     vs EC2
 │    • Timeslots                 • DRAINING                  • License
 │    [Leader Election]           [Leader Election]           [Leader Election]
 │                                     │                           │
 │                                     ▼                           ▼
 │                                CML LABS SPI               CLOUD PROVIDER SPI
 │                                  • Labs API                 • AWS EC2
 │                                  • Nodes API                • CloudWatch
 │                                  • Links API                • CML System
 │                                  • Interfaces
 │         │
 │         ▼
 ├─ CLOUDEVENTS BUS (External Event Sink)
 │    [Persists events for audit/analytics - NOT primary write model]
 │         │
 │         ▼
 └─ CML WORKERS (Data Plane)
      Worker 1 (Personal)      Worker 2 (Enterprise)      Worker N (DRAINING)
        Session 1                Session 3                  Session 5  ◄─ completing
        Session 2                Session 4                             ◄─ no new assignments
                                 │
                                 ▼
                         EXTERNAL SERVICES
  Artifact Storage (S3/MinIO) │ Keycloak (Auth) │ OTEL Collector (Traces/Metrics/Logs)
```
### 1.3 Storage Architecture Decision
See ADR-005: Dual State Store Architecture for full rationale.
| Store | Purpose | Data Types | Key Feature |
|---|---|---|---|
| etcd | State coordination | Instance states, worker states, port allocations, leader keys | Native watch mechanism |
| MongoDB | Spec/document storage | LabletDefinitions, WorkerTemplates, Audit events | Rich queries, schema flexibility |
| Redis | UI Session storage | User sessions (httpOnly cookies) | Fast, ephemeral |
Why not just MongoDB?
- MongoDB Change Streams have limitations (cursor timeout, resumption complexity)
- No built-in leader election primitives
- etcd's watch mechanism is more reliable for reactive state propagation
Redis clarification:
- Redis stores UI session data (user authentication state via httpOnly cookies)
- NOT used for Resource Scheduler/Controller coordination (that's etcd)
- Could migrate to etcd, but Redis is simpler for session TTL management
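The routing rules in the table above can be captured as a simple lookup; a minimal sketch, where the data-type keys and `Store` enum are illustrative names, not taken from the codebase:

```python
from enum import Enum


class Store(Enum):
    ETCD = "etcd"        # coordination state, watchable
    MONGODB = "mongodb"  # documents, rich queries
    REDIS = "redis"      # ephemeral UI sessions with TTL


# Data-type -> store routing, mirroring the table above.
STORE_ROUTING = {
    "instance_state": Store.ETCD,
    "worker_state": Store.ETCD,
    "port_allocation": Store.ETCD,
    "leader_key": Store.ETCD,
    "lablet_definition": Store.MONGODB,
    "worker_template": Store.MONGODB,
    "audit_event": Store.MONGODB,
    "ui_session": Store.REDIS,
}


def store_for(data_type: str) -> Store:
    """Return the backing store responsible for a given data type."""
    return STORE_ROUTING[data_type]
```

Keeping this mapping explicit makes the dual-store boundary auditable: any new data type must be assigned to exactly one store.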
## 2. Component Design
### 2.1 Control Plane API
Responsibility: Central gateway for all state operations, authentication, and real-time updates.
Key Design Decision: The Control Plane API is the ONLY component that writes to MongoDB and etcd. All other services (Resource Scheduler, Lablet Controller, Worker Controller) read state and request mutations via the API.
```
CONTROL PLANE API
  REST API Endpoints │ Event API (Webhooks) │ SSE Stream (Real-time)
        │
        ▼
  ADMISSION CONTROL
    • Authentication (Keycloak JWT)
    • Authorization (RBAC)
    • Rate Limiting
    • Request Validation
        │
        ▼
  COMMAND/QUERY BUS
    (Neuroglia Mediator)
        │
        ▼
  DOMAIN LAYER
    LabletDefinition Aggregate │ LabletSession Aggregate │ CMLWorker (Extended) Aggregate
                                 (+ UserSession,
                                    GradingSession,
                                    ScoreReport)
        │
        ▼
  EVENT PUBLISHER
    (CloudEvents)
```
#### 2.1.1 API Endpoints
LabletDefinition Endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/definitions | Create/register new definition |
| GET | /api/v1/definitions | List all definitions |
| GET | /api/v1/definitions/{id} | Get definition by ID |
| GET | /api/v1/definitions/{id}/versions | List all versions |
| GET | /api/v1/definitions/{id}/versions/{version} | Get specific version |
| POST | /api/v1/definitions/{id}/sync | Trigger artifact sync |
| DELETE | /api/v1/definitions/{id} | Soft-delete definition |
LabletSession Endpoints:
| Method | Path | Description |
|---|---|---|
| POST | /api/v1/sessions | Create session (reservation) |
| GET | /api/v1/sessions | List sessions (with filters) |
| GET | /api/v1/sessions/{id} | Get session details |
| POST | /api/v1/sessions/{id}/start | Start stopped session |
| POST | /api/v1/sessions/{id}/stop | Stop running session |
| POST | /api/v1/sessions/{id}/collect | Trigger collection |
| DELETE | /api/v1/sessions/{id} | Terminate session |
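A session-creation request against `POST /api/v1/sessions` might look like the following. The payload field names are illustrative assumptions for this sketch; the authoritative schema lives with the API definition:

```python
import json

# Hypothetical payload for creating a session (reservation).
# Field names are assumptions for illustration only.
payload = {
    "definition_id": "lablet-bgp-basics",
    "timeslot": {
        "start": "2026-03-01T14:00:00Z",
        "end": "2026-03-01T16:00:00Z",
    },
}

body = json.dumps(payload)
# e.g. with an HTTP client:
#   httpx.post(f"{base_url}/api/v1/sessions", content=body,
#              headers={"Content-Type": "application/json"})
```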
UserSession Endpoints (LDS Integration):
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/sessions/{id}/user-session | Get UserSession details |
| GET | /api/v1/sessions/{id}/user-session/login-url | Get LDS IFRAME login URL |
GradingSession Endpoints:
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/sessions/{id}/grading-session | Get GradingSession details |
| POST | /api/v1/sessions/{id}/grade | Trigger grading |
ScoreReport Endpoints:
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/sessions/{id}/score-report | Get score report |
| GET | /api/v1/score-reports | List/query score reports (reporting) |
Worker Endpoints (Extended):
| Method | Path | Description |
|---|---|---|
| GET | /api/v1/workers/{id}/capacity | Get capacity details |
| GET | /api/v1/workers/{id}/instances | List instances on worker |
| GET | /api/v1/workers/{id}/ports | Get port allocations |
Internal Endpoints (for Controllers):
| Method | Path | Description |
|---|---|---|
| POST | /api/internal/sessions/{id}/schedule | Assign worker to session |
| POST | /api/internal/sessions/{id}/allocate-ports | Allocate ports |
| POST | /api/internal/sessions/{id}/transition | Transition state |
| POST | /api/internal/sessions/{id}/user-session | Create UserSession (LDS provisioned) |
| PUT | /api/internal/sessions/{id}/user-session/status | Update UserSession status |
| POST | /api/internal/sessions/{id}/grading-session | Create GradingSession |
| PUT | /api/internal/sessions/{id}/grading-session/status | Update GradingSession status |
| POST | /api/internal/sessions/{id}/score-report | Store ScoreReport |
| POST | /api/internal/workers/scale-up | Request new worker |
| POST | /api/internal/workers/{id}/scale-down | Stop/terminate worker |
### 2.2 Resource Scheduler
Responsibility: Make placement decisions and manage the scheduling queue.
Key Design Decision: Stateless service that reads state via etcd watches and writes decisions via Control Plane API. Uses leader election for HA (see ADR-006).
```
RESOURCE SCHEDULER SERVICE
  LEADER ELECTION (etcd)
    Only leader runs scheduling loop; standbys watch
        │
        ▼
  SCHEDULING LOOP
    Triggered by: etcd watch + periodic reconciliation (30s)
        │
        ▼
  PENDING QUEUE PROCESSOR     SCHEDULED QUEUE MONITOR     APPROACHING TIMESLOTS MONITOR
    [etcd watch: state=PEND]    [Verify assignments]        [35 min lead-time check]
        │
        ▼
  PLACEMENT ENGINE
    1. Filter: License Affinity
    2. Filter: Resource Requirements
    3. Filter: AMI Requirements
    4. Filter: Available Capacity
    5. Filter: Available Ports
    6. Filter: NOT DRAINING (exclude draining workers)
    7. Score:  Bin-Packing (prefer fuller workers)
    8. Select: Highest-scoring worker

    Outcome:
      • Worker found → call API to schedule instance
      • No worker    → signal Lablet Controller for scale-up
```
#### 2.2.0 Resource Scheduler High Availability
See ADR-006: Resource Scheduler HA Coordination for full details.
How multiple resource schedulers coordinate:
```python
import asyncio


class ResourceSchedulerService:
    """Resource Scheduler with leader election."""

    def __init__(self, etcd_client, api_client, instance_id: str):
        self.etcd = etcd_client
        self.api = api_client
        self.instance_id = instance_id
        self.leader_key = "/lcm/resource-scheduler/leader"
        self.is_leader = False

    async def start_async(self):
        """Start the resource scheduler service."""
        # Attempt to become leader
        self.is_leader = await self._campaign_for_leadership()
        if self.is_leader:
            # Start leadership maintenance and scheduling loop
            asyncio.create_task(self._maintain_leadership())
            asyncio.create_task(self._run_scheduling_loop())
        else:
            # Watch for leader changes
            asyncio.create_task(self._watch_leader())

    async def _campaign_for_leadership(self) -> bool:
        """Try to become leader via etcd lease."""
        lease = await self.etcd.lease(ttl=15)  # 15-second lease
        try:
            await self.etcd.put(
                self.leader_key,
                self.instance_id,
                lease=lease,
                prev_kv=False,
                create_only=True,  # Only succeeds if key doesn't exist
            )
            self._lease = lease
            return True
        except KeyExistsError:
            return False

    async def _watch_leader(self):
        """Watch leader key, campaign when leader fails."""
        async for event in self.etcd.watch(self.leader_key):
            if event.type == EventType.DELETE:
                # Leader lost; try to take over
                self.is_leader = await self._campaign_for_leadership()
                if self.is_leader:
                    asyncio.create_task(self._maintain_leadership())
                    asyncio.create_task(self._run_scheduling_loop())
```
Failover timeline:
- Leader crashes → lease expires in ~15 seconds → standby detects via watch → standby campaigns and wins → new leader starts scheduling
- Total failover time: ~15-20 seconds
#### 2.2.1 Scheduling Algorithm
```python
def schedule_session(session: LabletSession) -> SchedulingDecision:
    """
    Placement algorithm for LabletSession.
    Returns assigned worker or scale-up request.
    """
    definition = get_definition(session.definition_id)

    # Phase 1: Filter eligible workers
    candidates = []
    for worker in get_active_workers():
        if not matches_license_affinity(worker, definition):
            continue
        if not meets_resource_requirements(worker, definition):
            continue
        if not matches_ami_requirements(worker, definition):
            continue
        if not has_available_capacity(worker, definition):
            continue
        if not has_available_ports(worker, definition.port_count):
            continue
        candidates.append(worker)

    # Phase 2: No candidates - request scale-up
    if not candidates:
        return SchedulingDecision(
            action=ScaleUpRequired,
            worker_template=select_template(definition),
            reason="No worker with sufficient capacity"
        )

    # Phase 3: Score candidates (bin-packing)
    scored = []
    for worker in candidates:
        score = calculate_utilization_score(worker)  # Higher = fuller
        scored.append((worker, score))

    # Phase 4: Select best worker
    scored.sort(key=lambda x: x[1], reverse=True)
    selected_worker = scored[0][0]
    return SchedulingDecision(
        action=AssignWorker,
        worker_id=selected_worker.id,
        reason=f"Best fit with {scored[0][1]:.2f} utilization"
    )
```
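The bin-packing preference in Phase 3 is worth a worked example. The toy `calculate_utilization_score` below is an assumption for illustration; the real scorer may weight CPU, memory, and ports differently, but the selection logic is the same:

```python
def calculate_utilization_score(used: int, capacity: int) -> float:
    """Toy utilization score: fraction of capacity in use (higher = fuller)."""
    return used / capacity


# Three candidate workers with (used, capacity) session slots.
candidates = {"w1": (2, 10), "w2": (7, 10), "w3": (5, 10)}

# Bin-packing: pick the fullest eligible worker.
best = max(candidates, key=lambda w: calculate_utilization_score(*candidates[w]))
# "w2" wins; packing onto the fullest worker leaves the others free
# to drain and scale down sooner.
```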
#### 2.2.2 Timeslot Management
Timeline:
```
NOW             TIMESLOT_START                     TIMESLOT_END
 │                    │                                 │
 ▼                    ▼                                 ▼
─┼────────────────────┼─────────────────────────────────┼──────▶
 │                    │                                 │
 ├──── LEAD_TIME ─────┤                                 │
 │  (15 min buffer)   │                                 │
 │                    │                                 │
 │  INSTANTIATION     │             RUNNING             │
 │  (Import + Start)  │          (User Session)         │
```
The resource scheduler monitors approaching timeslots and triggers instantiation with LEAD_TIME buffer (default: 15 minutes to account for worker startup).
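The trigger point follows directly from the timeline: subtract the lead-time buffer from the timeslot start. A minimal sketch (function name is illustrative):

```python
from datetime import datetime, timedelta

LEAD_TIME_MINUTES = 15  # default buffer described above


def instantiation_trigger_time(timeslot_start: datetime) -> datetime:
    """When the scheduler should begin import + start for a session."""
    return timeslot_start - timedelta(minutes=LEAD_TIME_MINUTES)


start = datetime(2026, 3, 1, 14, 0)
# A 14:00 timeslot triggers instantiation at 13:45.
trigger = instantiation_trigger_time(start)
```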
### 2.3 Lablet Controller (src/lablet-controller/)
Responsibility: LabletSession reconciliation loop - reconciles desired session state (spec) against actual CML lab, LDS, and GradingEngine state.
Domain: Application-layer workload management. Talks exclusively to CML Labs SPI (labs/nodes/interfaces/links API), LDS SPI (sessions/devices), and GradingEngine SPI (sessions/parts/pods).
Key Design Decision: Stateless service operating on a periodic reconciliation cycle. Detects drift between desired LabletSession state and actual external system states. All mutations go through Control Plane API (ADR-001). Only service that communicates with LDS, GradingEngine, and CML Labs API. Receives CloudEvents from LDS and GradingEngine via Neuroglia CloudEventIngestor and proxies state updates to CPA.
```
LABLET CONTROLLER
(Application Layer - Workloads)

  LEADER ELECTION (etcd)
        │
        ▼
  RECONCILIATION LOOP (every 30 seconds)
    For each LabletSession:
      SPEC (desired) ◄─▶ OBSERVE (actual) → ACT
        │
        ▼
  CML LABS SPI (Service Provider Interface)
    /api/v0/labs
      • Create lab (import topology)
      • Start/stop/wipe lab
      • Get lab state
      • Delete lab
    /api/v0/labs/{id}/nodes
      • List nodes in lab
      • Get node state
      • Extract node configs
    /api/v0/labs/{id}/interfaces
      • Get console ports
      • Map external ports
    /api/v0/labs/{id}/links
      • Topology connectivity
        │
        ▼
  CONTROL PLANE API
    (All mutations via API - ADR-001)
```
#### 2.3.0 Lablet Controller Reconciliation Pattern
```
LABLET CONTROLLER - RECONCILIATION PATTERN

  SPEC (Desired)            OBSERVE (Actual)           ACT (Reconcile)
        │                         │                          │
        ▼                         ▼                          ▼
  LabletSession             CML Lab State              • Import lab
    • state=RUNNING     ◄──   • state=DEFINED          • Start nodes
    • worker_id=W1            • nodes stopped          • Allocate ports
    • ports={...}             • no ports               • Update state

  Source: MongoDB           Source: CML Labs API       Target: Both
  (via Control Plane)       (direct observation)       (via Control Plane)
```
Reconciliation Examples:
| Desired (Spec) | Actual (Observed) | Action |
|---|---|---|
| Session state=RUNNING | Lab not imported | Import topology, start lab |
| Session state=RUNNING | Lab state=DEFINED | Start lab nodes |
| Session state=RUNNING | Lab state=STARTED | No action (converged) |
| Session state=STOPPED | Lab state=STARTED | Stop lab nodes |
| Session state=TERMINATED | Lab exists | Wipe and delete lab |
#### 2.3.1 Scale-Up Logic
See ADR-008: Worker Draining State for draining behavior.
Critical Timing Considerations:
- Worker bootup time: 15-20 minutes (EC2 m5zn.metal + CML initialization)
- Lablet instantiation time: Up to 15 minutes (lab import + node startup)
- Total lead time: Up to 35 minutes before scheduled timeslot
```python
from datetime import datetime, timedelta

# Configurable timing parameters
WORKER_BOOTUP_DELAY_MINUTES = 20         # m5zn.metal EC2 + CML startup
LABLET_INSTANTIATION_DELAY_MINUTES = 15  # Lab import + node startup
TOTAL_LEAD_TIME_MINUTES = WORKER_BOOTUP_DELAY_MINUTES + LABLET_INSTANTIATION_DELAY_MINUTES


def check_scale_up_needed() -> list[ScaleUpAction]:
    """
    Determine if new workers are needed.
    Called by Lablet Controller reconciliation loop.

    Must account for:
    1. Worker bootup delay (15-20 min for m5zn.metal)
    2. Lablet instantiation delay (up to 15 min)
    """
    actions = []

    # Get scheduled sessions approaching timeslot.
    # Use TOTAL_LEAD_TIME to account for both delays.
    approaching = get_sessions_approaching_timeslot(
        lead_time_minutes=TOTAL_LEAD_TIME_MINUTES  # ~35 minutes
    )

    for session in approaching:
        if session.worker_id is None:
            # Session not yet assigned - resource scheduler couldn't place it
            definition = get_definition(session.definition_id)
            template = select_worker_template(definition)

            # Check if scale-up already in progress for this template
            pending_workers = get_workers_in_state(
                template=template,
                states=[WorkerStatus.PENDING, WorkerStatus.PROVISIONING]
            )
            if not pending_workers:
                actions.append(ScaleUpAction(
                    template=template,
                    reason=f"Session {session.id} approaching timeslot with no capacity",
                    estimated_ready_time=datetime.now()
                    + timedelta(minutes=WORKER_BOOTUP_DELAY_MINUTES)
                ))

    return actions
```
#### 2.3.2 Scale-Down Logic
IMPORTANT: Workers should enter DRAINING state before scale-down to allow running instances to complete gracefully.
```python
SCALE_DOWN_GRACE_PERIOD_MINUTES = 30  # Don't scale down if work approaching


def check_scale_down_candidates() -> list[ScaleDownAction]:
    """
    Identify workers eligible for scale-down.

    Process:
    1. Find idle workers (no running instances)
    2. Check for upcoming scheduled work
    3. Transition to DRAINING (not immediate stop)
    4. DRAINING workers complete existing work, accept no new assignments
    5. When DRAINING worker is empty -> STOPPING -> STOPPED
    """
    actions = []

    for worker in get_workers_in_state(states=[WorkerStatus.RUNNING]):
        # Check if worker has any active sessions
        active_sessions = get_sessions_on_worker(
            worker_id=worker.id,
            states=[
                SessionState.RUNNING,
                SessionState.COLLECTING,
                SessionState.GRADING
            ]
        )
        if active_sessions:
            continue  # Worker is active, cannot scale down

        # Check if worker has upcoming scheduled sessions
        scheduled_sessions = get_sessions_on_worker(
            worker_id=worker.id,
            states=[
                SessionState.SCHEDULED,
                SessionState.INSTANTIATING
            ]
        )
        if scheduled_sessions:
            continue  # Worker has pending work

        # Check approaching timeslots (any session scheduled to this worker)
        approaching = get_approaching_sessions_for_worker(
            worker_id=worker.id,
            lookahead_minutes=SCALE_DOWN_GRACE_PERIOD_MINUTES
        )
        if approaching:
            continue  # Work coming soon

        # Worker is idle - candidate for scale-down.
        # Prefer DRAINING transition over immediate stop.
        actions.append(ScaleDownAction(
            worker_id=worker.id,
            action=ScaleDownActionType.DRAIN,  # Start draining, not immediate stop
            reason="No running or scheduled sessions"
        ))

    # Also check DRAINING workers that can be stopped
    for worker in get_workers_in_state(states=[WorkerStatus.DRAINING]):
        sessions_on_worker = get_sessions_on_worker(
            worker_id=worker.id,
            states=ACTIVE_SESSION_STATES
        )
        if not sessions_on_worker:
            # DRAINING worker with no sessions -> stop it
            actions.append(ScaleDownAction(
                worker_id=worker.id,
                action=ScaleDownActionType.STOP,
                reason="Draining complete, no remaining sessions"
            ))

    return actions
```
#### 2.3.3 Worker State Machine with DRAINING
```
PENDING ──▶ PROVISIONING ──▶ RUNNING ──▶ DRAINING
                                │             │
                                │             │ all instances completed
                                ▼             ▼
                            STOPPING ◄───  (empty)
                                │
                                ▼
                             STOPPED
                                │
                                ▼
                            TERMINATED
```
DRAINING State Behavior:
- Continues running existing LabletSessions
- Does NOT accept new session assignments (Resource Scheduler skips)
- Transitions to STOPPING when last session terminates
- Has a configurable timeout (default 4 hours), after which the worker is force-stopped
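The stop/force-stop rule above can be expressed directly; a minimal sketch using the 4-hour default, with function and parameter names assumed for illustration:

```python
from datetime import datetime, timedelta

DRAIN_TIMEOUT = timedelta(hours=4)  # default from the list above


def should_stop_draining_worker(drain_started_at: datetime,
                                active_sessions: int,
                                now: datetime) -> bool:
    """A DRAINING worker moves to STOPPING when empty, or at timeout."""
    if active_sessions == 0:
        return True  # drained naturally
    return now - drain_started_at >= DRAIN_TIMEOUT  # force-stop


t0 = datetime(2026, 3, 1, 8, 0)
# Empty worker stops immediately; a busy one waits out the timeout.
```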
### 2.4 Worker Controller (src/worker-controller/)
Responsibility: CML Worker reconciliation loop - reconciles desired worker state (spec) against actual cloud infrastructure state.
Domain: Infrastructure-layer resource management. Talks exclusively to Cloud Provider SPI (AWS EC2, CloudWatch, CML system API).
Key Design Decision: Separate service from Lablet Controller to enable clear domain separation. Worker Controller reconciles CML Workers (infrastructure layer); Lablet Controller reconciles LabletSessions (application layer). All mutations go through Control Plane API (ADR-001).
```
WORKER CONTROLLER
(Infrastructure Layer - Compute)

  LEADER ELECTION (etcd)
        │
        ▼
  RECONCILIATION LOOP (every 30 seconds)
    For each CMLWorker:
      SPEC (desired) ◄─▶ OBSERVE (actual) → ACT
        │
        ▼
  CLOUD PROVIDER SPI (Service Provider Interface)
    AWS EC2 API
      • Describe instances (status, tags)
      • Start/stop/terminate instances
      • Create instances (scale-up)
    AWS CloudWatch API
      • Instance CPU/memory/network metrics
      • Disk I/O and utilization
    CML System API (worker-level)
      • /api/v0/system_information (no auth required)
      • /api/v0/system_stats (requires auth)
      • License registration/deregistration
        │
        ▼
  CONTROL PLANE API
    (All mutations via API - ADR-001)
```
#### 2.4.0 Worker Controller Reconciliation Pattern
```
WORKER CONTROLLER - RECONCILIATION PATTERN

  SPEC (Desired)            OBSERVE (Actual)           ACT (Reconcile)
        │                         │                          │
        ▼                         ▼                          ▼
  CMLWorker                 EC2 + CML State            • Launch EC2
    • status=RUNNING    ◄──   • EC2 running            • Register license
    • license=ENT             • CML ready              • Update status
    • region=us-e-1           • No license             • Collect metrics

  Source: MongoDB           Source: AWS + CML API      Target: Both
  (via Control Plane)       (direct observation)       (via Control Plane)
```
Reconciliation Examples:
| Desired (Spec) | Actual (Observed) | Action |
|---|---|---|
| Worker status=RUNNING | EC2 stopped | Start EC2 instance |
| Worker status=RUNNING | EC2 running, CML unlicensed | Register CML license |
| Worker status=RUNNING | EC2 running, CML licensed | Update metrics, no action |
| Worker status=STOPPED | EC2 running | Stop EC2 instance |
| Worker status=TERMINATED | EC2 exists | Terminate EC2 instance |
| Worker imported=false | EC2 tagged for import | Create worker record |
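As with the Lablet Controller, this table reduces to a decision function; a minimal sketch, with state strings per the table and action names assumed for illustration:

```python
def worker_reconcile_action(desired: str, ec2: str, licensed: bool) -> str:
    """Map (desired worker status, EC2 state, CML license state) to an action."""
    if desired == "RUNNING":
        if ec2 == "stopped":
            return "start_ec2"
        if ec2 == "running" and not licensed:
            return "register_license"
        return "update_metrics"  # converged: metrics refresh only
    if desired == "STOPPED" and ec2 == "running":
        return "stop_ec2"
    if desired == "TERMINATED" and ec2 != "terminated":
        return "terminate_ec2"
    return "noop"
```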
#### 2.4.1 Metrics Collection Job
The Worker Controller polls each active CML Worker for infrastructure metrics:
- CML System API: system stats via /api/v0/system_stats (worker-level, requires auth)
- EC2 CloudWatch: instance-level CPU, memory, network, disk
- EC2 Describe Instances: instance status, tags, metadata
```python
from datetime import datetime


class WorkerMetricsCollectionJob:
    """Collects infrastructure metrics from CML Workers and updates via API."""

    async def execute_async(self, worker_id: str) -> None:
        # 1. Get worker spec from API
        worker = await self.api_client.get_worker(worker_id)

        # 2. Observe actual EC2 state
        ec2_state = await self.ec2_client.describe_instance(worker.ec2_instance_id)

        # 3. Observe CML system metrics
        cml_stats = await self.cml_client.get_system_stats(
            host=worker.ip_address,
            username=settings.CML_WORKER_API_USERNAME,
            password=settings.CML_WORKER_API_PASSWORD
        )

        # 4. Collect CloudWatch metrics
        cloudwatch_metrics = await self.cloudwatch_client.get_instance_metrics(
            instance_id=worker.ec2_instance_id
        )

        # 5. Update via Control Plane API (ADR-001)
        await self.api_client.update_worker_metrics(
            worker_id=worker_id,
            metrics=WorkerMetrics(
                ec2_status=ec2_state.status,
                cpu_utilization=cml_stats.cpu_percent,
                memory_utilization=cml_stats.memory_percent,
                disk_utilization=cml_stats.disk_percent,
                network_in=cloudwatch_metrics.network_in,
                network_out=cloudwatch_metrics.network_out,
                collected_at=datetime.utcnow()
            )
        )
```
#### 2.4.2 License Management
Reconciles CML license state with desired configuration:
```python
class LicenseReconciler:
    """Ensures CML workers have correct license state."""

    async def reconcile(self, worker: CMLWorker) -> None:
        # Observe actual license state
        license_info = await self.cml_client.get_license_info(worker.ip_address)

        # Compare with desired spec
        if worker.license_required and not license_info.is_registered:
            # Action: Register license
            await self.cml_client.register_license(
                host=worker.ip_address,
                license_token=settings.CML_LICENSE_TOKEN
            )
            await self.api_client.update_worker_license_status(
                worker_id=worker.id,
                license_registered=True
            )
        elif not worker.license_required and license_info.is_registered:
            # Action: Deregister license (release for other workers)
            await self.cml_client.deregister_license(worker.ip_address)
            await self.api_client.update_worker_license_status(
                worker_id=worker.id,
                license_registered=False
            )
```
2.4.3 Auto-Import Workers¶
Discovers and imports EC2 instances tagged for CML management:
class AutoImportWorkersJob:
    """Discovers EC2 instances and creates worker records."""

    async def execute_async(self) -> None:
        # Observe: Find EC2 instances tagged for CML
        ec2_instances = await self.ec2_client.describe_instances(
            filters=[
                {"Name": "tag:cml-managed", "Values": ["true"]},
                {"Name": "instance-state-name", "Values": ["running"]}
            ]
        )

        # Get existing workers from spec
        existing_workers = await self.api_client.list_workers()
        existing_instance_ids = {w.ec2_instance_id for w in existing_workers}

        # Reconcile: Create workers for new instances
        for instance in ec2_instances:
            if instance.id not in existing_instance_ids:
                # Action: Import worker
                await self.api_client.import_worker(
                    ec2_instance_id=instance.id,
                    name=instance.tags.get("Name", f"imported-{instance.id}"),
                    ip_address=instance.private_ip
                )
2.5 Cloud Provider SPI¶
Responsibility: Abstract cloud-specific operations behind a common interface.
┌───────────────────────────────────────────────────────────────────┐
│                        CLOUD PROVIDER SPI                         │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│   ┌─────────────────────────────────────────────────────────┐     │
│   │                 ICloudProviderAdapter                   │     │
│   │                 (Abstract Interface)                    │     │
│   │                                                         │     │
│   │  + create_instance(template) -> InstanceId              │     │
│   │  + start_instance(instance_id) -> None                  │     │
│   │  + stop_instance(instance_id) -> None                   │     │
│   │  + terminate_instance(instance_id) -> None              │     │
│   │  + get_instance_status(instance_id) -> Status           │     │
│   │  + get_instance_metrics(instance_id) -> Metrics         │     │
│   │  + list_instances(filters) -> list[Instance]            │     │
│   └─────────────────────────────────────────────────────────┘     │
│                              △                                    │
│                              │                                    │
│             ┌────────────────┼────────────────┐                   │
│             │                │                │                   │
│     ┌───────┴──────┐  ┌──────┴──────┐  ┌─────┴───────┐            │
│     │   AWS EC2    │  │ GCP Compute │  │  Azure VMs  │            │
│     │   Adapter    │  │   Adapter   │  │   Adapter   │            │
│     │ (Implemented)│  │  (Future)   │  │  (Future)   │            │
│     └──────────────┘  └─────────────┘  └─────────────┘            │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
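The SPI interface above can be sketched as a Python abstract base class. A minimal sketch with only two of the methods, plus a toy in-memory adapter standing in for the AWS implementation (all names besides the interface methods are illustrative):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Instance:
    """Simplified stand-in for the provider-neutral Instance type."""
    id: str
    status: str


class ICloudProviderAdapter(ABC):
    """Subset of the abstract interface from the diagram above."""

    @abstractmethod
    def create_instance(self, template: dict) -> str: ...

    @abstractmethod
    def get_instance_status(self, instance_id: str) -> str: ...


class InMemoryAdapter(ICloudProviderAdapter):
    """Toy adapter for tests; the real AWS adapter wraps EC2 calls."""

    def __init__(self) -> None:
        self._instances: dict[str, Instance] = {}

    def create_instance(self, template: dict) -> str:
        instance_id = f"i-{len(self._instances):04d}"
        self._instances[instance_id] = Instance(instance_id, "running")
        return instance_id

    def get_instance_status(self, instance_id: str) -> str:
        return self._instances[instance_id].status
```

Because callers depend only on `ICloudProviderAdapter`, a GCP or Azure adapter can be added later without touching the controllers.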
3. Domain Model¶
3.1 Aggregate Relationships¶
┌─────────────────────────────────────────────────────────────────────────────┐
│                                DOMAIN MODEL                                 │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌────────────────────┐          ┌────────────────────────────────────────┐ │
│  │  LabletDefinition  │ 1      * │    LabletSession (Aggregate Root)      │ │
│  │  (Aggregate Root)  │─────────▶│                                        │ │
│  │                    │          │  • id                                  │ │
│  │  • id              │          │  • definition_id, definition_ver       │ │
│  │  • name            │          │  • worker_id ───────────────────────┐  │ │
│  │  • version         │          │  • lab_record_id (1:1)              │  │ │
│  │  • lab_artifact_uri│          │  • user_session_id → UserSession    │  │ │
│  │  • resource_reqs   │          │  • grading_session_id → GradingSess │  │ │
│  │  • license_affinity│          │  • score_report_id → ScoreReport    │  │ │
│  │  • port_template   │          │  • state (LabletSessionStatus)      │  │ │
│  │  • grading_rules   │          │  • allocated_ports                  │  │ │
│  │  • warm_pool_depth │          │  • timeslot_start, timeslot_end     │  │ │
│  └────────────────────┘          │  • started_at, ended_at             │  │ │
│                                  │  • duration_seconds                 │  │ │
│                                  │  • owner_id, reservation_id         │  │ │
│                                  └─────────────────────────────────────┘  │ │
│                                                                           │ │
│  ┌─────────────────────────────────────────────────────────────────────┐  │ │
│  │ CHILD ENTITIES (separate collections, linked by lablet_session_id)  │  │ │
│  │                                                                     │  │ │
│  │  ┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐   │  │ │
│  │  │   UserSession    │  │  GradingSession  │  │   ScoreReport    │   │  │ │
│  │  │  (Entity[str])   │  │  (Entity[str])   │  │  (Entity[str])   │   │  │ │
│  │  │                  │  │                  │  │                  │   │  │ │
│  │  │ • lds_session_id │  │ • grading_id     │  │ • grading_sess_id│   │  │ │
│  │  │ • lds_part_id    │  │ • grading_part_id│  │ • score          │   │  │ │
│  │  │ • form_qual_name │  │ • pod_id         │  │ • max_score      │   │  │ │
│  │  │ • login_url      │  │ • form_qual_name │  │ • cut_score      │   │  │ │
│  │  │ • devices[]      │  │ • devices[]      │  │ • passed         │   │  │ │
│  │  │ • status         │  │ • status         │  │ • sections[]     │   │  │ │
│  │  └──────────────────┘  └──────────────────┘  │ • submitted_at   │   │  │ │
│  │                                              │ • report_url     │   │  │ │
│  │  Collection:           Collection:           └──────────────────┘   │  │ │
│  │  user_sessions         grading_sessions      Collection:            │  │ │
│  │                                              score_reports          │  │ │
│  └─────────────────────────────────────────────────────────────────────┘  │ │
│                                                                           │ │
│                                  ┌────────────────────────────────────────┘ │
│                                  │                                          │
│                                  ▼ *                                        │
│  ┌────────────────────┐          ┌────────────────────┐                     │
│  │   WorkerTemplate   │ 1      * │     CMLWorker      │                     │
│  │   (Value Object)   │─────────▶│  (Aggregate Root)  │                     │
│  │                    │          │     [EXTENDED]     │                     │
│  │  • name            │          │                    │                     │
│  │  • instance_type   │          │  • id              │                     │
│  │  • capacity        │          │  • template_name   │                     │
│  │  • license_type    │          │  • status          │                     │
│  │  • ami_pattern     │          │  • capacity        │                     │
│  │  • region          │          │  • allocated_cap   │                     │
│  │  • port_range      │          │  • port_allocations│                     │
│  └────────────────────┘          │  • session_ids[]   │                     │
│                                  └────────────────────┘                     │
│                                                                             │
│  ELIMINATED (AD-39): LabletRecordRun, LabletLabBinding                      │
│  LabletSession absorbs: allocated_ports, started_at/ended_at/duration       │
│  LabletSession absorbs: lab_record_id as direct 1:1 reference (AD-43)       │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘
3.2 LabletDefinition Aggregate¶
@dataclass
class LabletDefinitionState(AggregateState[str]):
    """State for LabletDefinition aggregate."""
    id: str
    name: str
    version: str  # Semantic version
    # Artifact reference
    lab_artifact_uri: str  # S3/MinIO path
    lab_yaml_hash: str  # SHA-256 for change detection
    lab_yaml_cached: str | None  # Cached YAML content
    # Resource requirements
    resource_requirements: ResourceRequirements
    license_affinity: list[LicenseType]
    node_count: int
    # Port configuration
    port_template: PortTemplate  # Template with placeholders
    # Assessment integration
    grading_rules_uri: str | None
    max_duration_minutes: int
    # Warm pool
    warm_pool_depth: int
    # Ownership
    owner_notification: NotificationConfig | None
    created_by: str
    created_at: datetime

@dataclass
class ResourceRequirements:
    cpu_cores: int
    memory_gb: int
    storage_gb: int
    nested_virt: bool
    ami_requirements: list[AmiRequirement] | None

@dataclass
class PortTemplate:
    """Template for port allocation with placeholders."""
    ports: list[PortDefinition]
    # Example: [{"name": "serial_1", "protocol": "tcp"}, {"name": "vnc_1", "protocol": "tcp"}]

class LabletDefinition(AggregateRoot[LabletDefinitionState, str]):
    """LabletDefinition aggregate - immutable per version."""

    @staticmethod
    def create(
        name: str,
        version: str,
        lab_artifact_uri: str,
        resource_requirements: ResourceRequirements,
        license_affinity: list[LicenseType],
        port_template: PortTemplate,
        created_by: str,
        **kwargs
    ) -> "LabletDefinition":
        """Create a new LabletDefinition."""
        definition = LabletDefinition()
        definition.record_event(LabletDefinitionCreatedDomainEvent(
            aggregate_id=str(uuid4()),
            name=name,
            version=version,
            lab_artifact_uri=lab_artifact_uri,
            resource_requirements=resource_requirements,
            license_affinity=license_affinity,
            port_template=port_template,
            created_by=created_by,
            created_at=datetime.now(timezone.utc),
            **kwargs
        ))
        return definition
3.3 LabletSession Aggregate (renamed from LabletInstance per AD-38)¶
class LabletSessionStatus(Enum):
    PENDING = "pending"
    SCHEDULED = "scheduled"
    INSTANTIATING = "instantiating"
    READY = "ready"  # NEW: LDS provisioned, awaiting user login
    RUNNING = "running"
    COLLECTING = "collecting"
    GRADING = "grading"
    STOPPING = "stopping"
    STOPPED = "stopped"
    ARCHIVED = "archived"
    TERMINATED = "terminated"

@dataclass
class LabletSessionState(AggregateState[str]):
    """State for LabletSession aggregate."""
    id: str
    definition_id: str
    definition_version: str  # Pinned at creation
    # Assignment
    worker_id: str | None
    allocated_ports: dict[str, int] | None  # {"serial_1": 5041, "vnc_1": 5044}
    cml_lab_id: str | None  # Lab ID in CML after import
    # Lifecycle
    status: LabletSessionStatus
    state_history: list[StateTransition]
    # Timeslot
    timeslot_start: datetime
    timeslot_end: datetime
    # Ownership
    owner_id: str
    reservation_id: str | None  # External reservation reference
    # Lab Record binding (1:1, absorbed from LabletLabBinding per AD-39, AD-43)
    lab_record_id: str | None
    # Child entity references (separate collections per AD-45, AD-46, AD-47-R1)
    user_session_id: str | None  # → UserSession (user_sessions collection)
    grading_session_id: str | None  # → GradingSession (grading_sessions collection)
    score_report_id: str | None  # → ScoreReport (score_reports collection)
    # Timing (absorbed from LabletRecordRun per AD-39)
    started_at: datetime | None
    ended_at: datetime | None
    duration_seconds: int | None
    # Timestamps
    created_at: datetime
    scheduled_at: datetime | None
    terminated_at: datetime | None

class LabletSession(AggregateRoot[LabletSessionState, str]):
    """LabletSession aggregate - runtime lifecycle (renamed from LabletInstance)."""

    def schedule(self, worker_id: str, allocated_ports: dict[str, int]) -> None:
        """Assign session to worker with port allocation."""
        if self.state.status != LabletSessionStatus.PENDING:
            raise InvalidStateTransition(f"Cannot schedule from {self.state.status}")
        self.record_event(LabletSessionScheduledDomainEvent(
            aggregate_id=self.id(),
            worker_id=worker_id,
            allocated_ports=allocated_ports,
            scheduled_at=datetime.now(timezone.utc)
        ))

    def start_instantiation(self) -> None:
        """Begin lab import and startup."""
        if self.state.status != LabletSessionStatus.SCHEDULED:
            raise InvalidStateTransition(f"Cannot instantiate from {self.state.status}")
        self.record_event(LabletSessionInstantiatingDomainEvent(
            aggregate_id=self.id()
        ))

    def mark_ready(self, cml_lab_id: str, user_session_id: str) -> None:
        """Mark session as ready after LDS provisioning completes."""
        self.record_event(LabletSessionReadyDomainEvent(
            aggregate_id=self.id(),
            cml_lab_id=cml_lab_id,
            user_session_id=user_session_id,
        ))

    def mark_running(self) -> None:
        """Mark session as running when the user logs in (via LDS CloudEvent)."""
        self.record_event(LabletSessionRunningDomainEvent(
            aggregate_id=self.id(),
            started_at=datetime.now(timezone.utc)
        ))

    def start_collection(self) -> None:
        """Transition to collecting state."""
        if self.state.status != LabletSessionStatus.RUNNING:
            raise InvalidStateTransition(f"Cannot collect from {self.state.status}")
        self.record_event(LabletSessionCollectingDomainEvent(
            aggregate_id=self.id()
        ))

    def record_grading_result(self, score_report_id: str) -> None:
        """Record grading result and transition to stopping."""
        self.record_event(LabletSessionGradedDomainEvent(
            aggregate_id=self.id(),
            score_report_id=score_report_id
        ))
3.3.1 UserSession Entity (AD-45)¶
class UserSessionStatus(Enum):
    PROVISIONING = "provisioning"
    PROVISIONED = "provisioned"  # LDS session created, awaiting user login
    ACTIVE = "active"  # User logged in
    PAUSED = "paused"  # User paused session
    ENDED = "ended"  # Normal completion
    EXPIRED = "expired"  # Timeslot expired
    FAULTED = "faulted"  # LDS error

@dataclass
class UserSessionState(AggregateState[str]):
    """State for UserSession entity. Stored in 'user_sessions' collection."""
    id: str
    lablet_session_id: str  # FK to LabletSession
    # LDS references
    lds_session_id: str
    lds_part_id: str
    form_qualified_name: str
    # Access
    login_url: str | None
    devices: list[DeviceAccessInfo]
    # Lifecycle
    status: UserSessionStatus
    created_at: datetime
    started_at: datetime | None
    ended_at: datetime | None
3.3.2 GradingSession Entity (AD-46)¶
class GradingStatus(Enum):
    PENDING = "pending"
    COLLECTING = "collecting"
    GRADING = "grading"
    REVIEWING = "reviewing"
    SUBMITTED = "submitted"
    FAULTED = "faulted"

@dataclass
class GradingSessionState(AggregateState[str]):
    """State for GradingSession entity. Stored in 'grading_sessions' collection."""
    id: str
    lablet_session_id: str  # FK to LabletSession
    # Grading Engine references
    grading_session_id: str
    grading_part_id: str
    pod_id: str
    # Content
    form_qualified_name: str
    devices: list[DeviceAccessInfo]
    # Lifecycle
    status: GradingStatus
    created_at: datetime
    completed_at: datetime | None
3.3.3 ScoreReport Entity (AD-47-R1)¶
@dataclass
class ScoreSection:
    """Individual grading section within a score report."""
    criterion: str
    points: float
    max_points: float

@dataclass
class ScoreReportState(AggregateState[str]):
    """State for ScoreReport entity. Stored in 'score_reports' collection."""
    id: str
    lablet_session_id: str  # FK to LabletSession
    grading_session_id: str  # FK to GradingSession
    # Scores
    score: float
    max_score: float
    cut_score: float | None
    passed: bool
    sections: list[ScoreSection]
    # Metadata
    submitted_at: datetime
    report_url: str | None
3.4 CMLWorker Extensions¶
The existing CMLWorker aggregate needs extensions for capacity tracking:
@dataclass
class WorkerCapacity:
    """Capacity specification for a worker."""
    cpu_cores: int
    memory_gb: int
    storage_gb: int
    max_nodes: int  # License-based limit

@dataclass
class PortAllocation:
    """Port allocation on a worker."""
    session_id: str
    ports: dict[str, int]  # {"serial_1": 5041, "vnc_1": 5044}
    allocated_at: datetime

# Extensions to CMLWorkerState
class CMLWorkerState(AggregateState[str]):
    # ... existing fields ...

    # NEW: Capacity management
    template_name: str | None  # Reference to WorkerTemplate
    declared_capacity: WorkerCapacity
    allocated_capacity: WorkerCapacity  # Sum of running sessions

    # NEW: Port management
    port_range_start: int  # 2000
    port_range_end: int  # 9999
    port_allocations: list[PortAllocation]

    # NEW: Session tracking
    session_ids: list[str]  # Currently assigned sessions

    @property
    def available_capacity(self) -> WorkerCapacity:
        """Calculate remaining available capacity."""
        return WorkerCapacity(
            cpu_cores=self.declared_capacity.cpu_cores - self.allocated_capacity.cpu_cores,
            memory_gb=self.declared_capacity.memory_gb - self.allocated_capacity.memory_gb,
            storage_gb=self.declared_capacity.storage_gb - self.allocated_capacity.storage_gb,
            max_nodes=self.declared_capacity.max_nodes - self.allocated_capacity.max_nodes
        )

    @property
    def available_ports(self) -> int:
        """Calculate remaining available ports."""
        used_ports = sum(len(a.ports) for a in self.port_allocations)
        total_ports = self.port_range_end - self.port_range_start + 1
        return total_ports - used_ports
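The capacity and port arithmetic above is easy to get wrong by one, so it is worth checking in isolation. A standalone sketch with the capacity type simplified to two fields (helper names are illustrative):

```python
from dataclasses import dataclass


@dataclass
class Capacity:
    """Simplified two-field stand-in for WorkerCapacity."""
    cpu_cores: int
    memory_gb: int


def remaining(declared: Capacity, allocated: Capacity) -> Capacity:
    """Available capacity is a field-by-field difference, as in the property above."""
    return Capacity(
        cpu_cores=declared.cpu_cores - allocated.cpu_cores,
        memory_gb=declared.memory_gb - allocated.memory_gb,
    )


def available_ports(range_start: int, range_end: int, used_ports: int) -> int:
    """The port range is inclusive on both ends, hence the +1."""
    return (range_end - range_start + 1) - used_ports
```

With the default 2000-9999 range, a fresh worker exposes exactly 8000 allocatable ports.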
4. Data Flows¶
4.1 Reservation Request Flow¶
┌─────────┐            ┌───────────────┐           ┌─────────────────┐
│ Client  │            │ Control Plane │           │    Resource     │
│         │            │      API      │           │    Scheduler    │
└────┬────┘            └───────┬───────┘           └────────┬────────┘
     │                         │                            │
     │ POST /api/v1/sessions   │                            │
     │ {definition_id,         │                            │
     │  timeslot_start, ...}   │                            │
     │────────────────────────▶│                            │
     │                         │                            │
     │                         │ Create Session             │
     │                         │ (PENDING state)            │
     │                         │────────┐                   │
     │                         │        │                   │
     │                         │◀───────┘                   │
     │                         │                            │
     │                         │ Emit: SessionCreated       │
     │                         │───────────────────────────▶│
     │                         │                            │
     │ 201 Created             │                            │
     │ {session_id, state:     │                            │
     │  "pending"}             │                            │
     │◀────────────────────────│                            │
     │                         │                            │
     │                         │                            │ Scheduling
     │                         │                            │ Loop Runs
     │                         │                            │─────┐
     │                         │                            │     │ Find
     │                         │                            │     │ Worker
     │                         │                            │◀────┘
     │                         │                            │
     │                         │ POST /internal/schedule    │
     │                         │ {session_id, worker_id,    │
     │                         │  allocated_ports}          │
     │                         │◀───────────────────────────│
     │                         │                            │
     │                         │ Update Session             │
     │                         │ (SCHEDULED state)          │
     │                         │────────┐                   │
     │                         │        │                   │
     │                         │◀───────┘                   │
     │                         │                            │
     │ SSE: SessionScheduled   │                            │
     │◀────────────────────────│                            │
     │                         │                            │
4.2 Session Instantiation Flow¶
┌──────────┐      ┌───────────┐      ┌──────────┐      ┌──────────┐
│  Lablet  │      │  Control  │      │   CML    │      │ Artifact │
│Controller│      │ Plane API │      │  Worker  │      │ Storage  │
└────┬─────┘      └─────┬─────┘      └────┬─────┘      └────┬─────┘
     │                  │                 │                 │
     │ Reconcile Loop   │                 │                 │
     │ (Approaching     │                 │                 │
     │  Timeslot)       │                 │                 │
     │─────────────────▶│                 │                 │
     │                  │                 │                 │
     │ Get Session      │                 │                 │
     │◀─────────────────│                 │                 │
     │                  │                 │                 │
     │ Get Definition   │                 │                 │
     │◀─────────────────│                 │                 │
     │                  │                 │                 │
     │                  │                 │ Download        │
     │                  │                 │ Lab YAML        │
     │────────────────────────────────────────────────────▶│
     │                  │                 │                 │
     │◀──────────────────────────────────────Lab YAML──────│
     │                  │                 │                 │
     │ Rewrite YAML     │                 │                 │
     │ (Port mapping)   │                 │                 │
     │────────┐         │                 │                 │
     │        │         │                 │                 │
     │◀───────┘         │                 │                 │
     │                  │                 │                 │
     │ POST /internal/  │                 │                 │
     │ transition       │                 │                 │
     │ (INSTANTIATING)  │                 │                 │
     │─────────────────▶│                 │                 │
     │                  │                 │                 │
     │ Import Lab YAML  │                 │                 │
     │───────────────────────────────────▶│                 │
     │                  │                 │                 │
     │           Lab ID │                 │                 │
     │◀───────────────────────────────────│                 │
     │                  │                 │                 │
     │ Start Lab        │                 │                 │
     │───────────────────────────────────▶│                 │
     │                  │                 │                 │
     │      Lab Started │                 │                 │
     │◀───────────────────────────────────│                 │
     │                  │                 │                 │
     │ POST /internal/  │                 │                 │
     │ transition       │                 │                 │
     │ (RUNNING)        │                 │                 │
     │─────────────────▶│                 │                 │
     │                  │                 │                 │
     │                  │ Emit CloudEvent:│                 │
     │                  │ session.running │                 │
     │                  │────────────────▶ (to Assessment)  │
     │                  │                 │                 │
4.3 Port Rewriting Process¶
def rewrite_lab_yaml(
    lab_yaml: str,
    port_template: PortTemplate,
    allocated_ports: dict[str, int]
) -> str:
    """
    Rewrite lab YAML with allocated ports.

    Template placeholders in smart_annotations:
        tag: serial:${PORT_SERIAL_1}
    Becomes:
        tag: serial:5041
    """
    import yaml
    lab_data = yaml.safe_load(lab_yaml)

    # Build placeholder -> port mapping (e.g. "serial_1" -> "${PORT_SERIAL_1}")
    port_map = {}
    for port_def in port_template.ports:
        placeholder = f"${{PORT_{port_def.name.upper()}}}"
        port_map[placeholder] = allocated_ports[port_def.name]

    # Rewrite smart_annotations
    for annotation in lab_data.get("smart_annotations", []):
        tag = annotation.get("tag", "")
        for placeholder, port in port_map.items():
            if placeholder in tag:
                annotation["tag"] = tag.replace(placeholder, str(port))
                if "label" in annotation:
                    annotation["label"] = annotation["label"].replace(placeholder, str(port))

    # Also rewrite node tags
    for node in lab_data.get("nodes", []):
        new_tags = []
        for tag in node.get("tags", []):
            for placeholder, port in port_map.items():
                tag = tag.replace(placeholder, str(port))
            new_tags.append(tag)
        node["tags"] = new_tags

    return yaml.dump(lab_data)
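The placeholder substitution at the heart of the rewrite can be exercised in isolation. The helper below is hypothetical (not part of the codebase) but uses the same `${PORT_<NAME>}` convention as the docstring above:

```python
def substitute_ports(tag: str, allocated_ports: dict[str, int]) -> str:
    """Replace ${PORT_<NAME>} placeholders in a tag string with allocated
    port numbers, mirroring the rewrite_lab_yaml convention."""
    for name, port in allocated_ports.items():
        tag = tag.replace(f"${{PORT_{name.upper()}}}", str(port))
    return tag
```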
5. CloudEvents Schema¶
See ADR-003: CloudEvents for External Integration for rationale.
Important: CloudEvents are emitted for external integration and audit; they are NOT the primary persistence mechanism. State is persisted in etcd/MongoDB; events are a side effect for subscribers.
5.1 Complete Event Catalog¶
5.1.1 LabletDefinition Events¶
| Event Type | Trigger | Purpose |
|---|---|---|
| `ccm.lablet.definition.created` | New definition registered | Notify consumers of new lab type |
| `ccm.lablet.definition.version.created` | New version detected | Version management, cache invalidation |
| `ccm.lablet.definition.deprecated` | Definition marked deprecated | Prevent new sessions |
5.1.2 LabletSession Lifecycle Events (All States)¶
| Event Type | Trigger | Purpose |
|---|---|---|
| `ccm.lablet.session.pending` | Session created | Audit: request received |
| `ccm.lablet.session.scheduled` | Worker assigned | Audit: placement decision made |
| `ccm.lablet.session.instantiating` | Lab import begins | Audit: instantiation starting |
| `ccm.lablet.session.ready` | LDS provisioned, awaiting user | NEW: Session ready for user login |
| `ccm.lablet.session.running` | User logged in (LDS CloudEvent) | Assessment integration: session active |
| `ccm.lablet.session.collecting` | Collection triggered | Assessment integration: begin collection |
| `ccm.lablet.session.grading` | Grading in progress | Assessment integration: grading active |
| `ccm.lablet.session.graded` | Grading finished | Assessment integration: score available |
| `ccm.lablet.session.stopping` | Stop initiated | Audit: teardown starting |
| `ccm.lablet.session.stopped` | Lab stopped | Audit: lab inactive |
| `ccm.lablet.session.archived` | Resources cleaned | Audit: ready for deletion |
| `ccm.lablet.session.terminated` | Session deleted | Audit: final state |
5.1.2.1 Child Entity Events¶
| Event Type | Trigger | Purpose |
|---|---|---|
| `ccm.lablet.session.user-session.created` | LDS session provisioned | UserSession tracking |
| `ccm.lablet.session.user-session.active` | User logged in | Session started |
| `ccm.lablet.session.user-session.ended` | User ended session | Session completed |
| `ccm.lablet.session.grading-session.created` | Grading initiated | GradingSession tracking |
| `ccm.lablet.session.grading-session.completed` | Grading finished | Results available |
| `ccm.lablet.session.score-report.created` | Score recorded | ScoreReport created |
5.1.3 Worker Lifecycle Events¶
| Event Type | Trigger | Purpose |
|---|---|---|
| `ccm.worker.pending` | Scale-up initiated | Audit: worker requested |
| `ccm.worker.provisioning.started` | EC2 instance launching | Audit: cloud API called |
| `ccm.worker.running` | Worker ready for workload | Capacity management |
| `ccm.worker.draining` | Scale-down initiated | Capacity: no new assignments |
| `ccm.worker.stopping` | Worker shutdown started | Audit: EC2 stop in progress |
| `ccm.worker.stopped` | Worker stopped | Cost: compute paused |
| `ccm.worker.terminated` | Worker deleted | Audit: resources released |
5.1.4 Scaling Events¶
| Event Type | Trigger | Purpose |
|---|---|---|
| `ccm.scaling.up.requested` | Capacity shortage detected | Operations alerting |
| `ccm.scaling.up.completed` | New worker ready | Capacity confirmation |
| `ccm.scaling.down.requested` | Idle worker detected | Cost optimization tracking |
| `ccm.scaling.down.completed` | Worker stopped/terminated | Cost confirmation |
5.2 Event Payload Examples¶
# ccm.lablet.session.pending
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.pending",
  "source": "ccm/api",
  "id": "evt-12345",
  "time": "2026-01-15T10:30:00Z",
  "datacontenttype": "application/json",
  "data": {
    "session_id": "sess-abc123",
    "definition_id": "def-xyz789",
    "definition_version": "1.2.0",
    "owner_id": "user-456",
    "reservation_id": "res-789",
    "timeslot_start": "2026-01-15T11:00:00Z",
    "timeslot_end": "2026-01-15T12:00:00Z",
    "created_at": "2026-01-15T10:30:00Z"
  }
}

# ccm.lablet.session.instantiating
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.instantiating",
  "source": "ccm/controller",
  "id": "evt-12346",
  "time": "2026-01-15T10:35:00Z",
  "data": {
    "session_id": "sess-abc123",
    "worker_id": "worker-def456",
    "allocated_ports": {
      "serial_1": 5041,
      "vnc_1": 5044
    },
    "lab_yaml_hash": "sha256:abc123..."
  }
}

# ccm.lablet.session.running
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.running",
  "source": "ccm/controller",
  "id": "evt-12347",
  "time": "2026-01-15T10:45:00Z",
  "data": {
    "session_id": "sess-abc123",
    "worker_id": "worker-def456",
    "worker_hostname": "worker-def456.internal",
    "cml_lab_id": "lab-ghi789",
    "allocated_ports": {
      "serial_1": 5041,
      "serial_2": 5042,
      "vnc_1": 5044
    },
    "started_at": "2026-01-15T10:45:00Z"
  }
}

# ccm.lablet.session.collecting
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.collecting",
  "source": "ccm/api",
  "id": "evt-12348",
  "time": "2026-01-15T11:50:00Z",
  "data": {
    "session_id": "sess-abc123",
    "triggered_by": "user-456",      // or "system" for auto-collection
    "collection_reason": "manual"    // or "timeslot_end", "assessment_request"
  }
}

# ccm.lablet.session.grading
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.grading",
  "source": "ccm/controller",
  "id": "evt-12349",
  "time": "2026-01-15T11:52:00Z",
  "data": {
    "session_id": "sess-abc123",
    "grading_session_id": "grade-session-xyz"
  }
}

# ccm.lablet.session.graded
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.graded",
  "source": "ccm/controller",
  "id": "evt-12350",
  "time": "2026-01-15T12:00:00Z",
  "data": {
    "session_id": "sess-abc123",
    "score_report_id": "sr-abc123",
    "score": {
      "total": 85,
      "max": 100,
      "passed": true,
      "breakdown": [
        {"criterion": "Task 1", "points": 25, "max": 30},
        {"criterion": "Task 2", "points": 30, "max": 30},
        {"criterion": "Task 3", "points": 30, "max": 40}
      ]
    },
    "grading_duration_seconds": 120
  }
}

# ccm.lablet.session.terminated
{
  "specversion": "1.0",
  "type": "ccm.lablet.session.terminated",
  "source": "ccm/controller",
  "id": "evt-12355",
  "time": "2026-01-15T12:05:00Z",
  "data": {
    "session_id": "sess-abc123",
    "final_state": "archived",
    "score_report_id": "sr-abc123",
    "duration_minutes": 55
  }
}

# ccm.worker.draining (for scale-down visibility)
{
  "specversion": "1.0",
  "type": "ccm.worker.draining",
  "source": "ccm/controller",
  "id": "evt-worker-drain-1",
  "time": "2026-01-15T13:00:00Z",
  "data": {
    "worker_id": "worker-def456",
    "reason": "scale_down_idle",
    "running_sessions_count": 2,
    "estimated_drain_completion": "2026-01-15T14:00:00Z"
  }
}
5.3 Events Consumed by CCM¶
# lds.session.started (from LDS via CloudEventIngestor per AD-41)
{
  "specversion": "1.0",
  "type": "lds.session.started",
  "source": "lds",
  "id": "evt-lds-start-1",
  "time": "2026-01-15T10:46:00Z",
  "data": {
    "lds_session_id": "lds-sess-123",
    "session_id": "sess-abc123"
  }
}

# lds.session.ended (from LDS via CloudEventIngestor)
{
  "specversion": "1.0",
  "type": "lds.session.ended",
  "source": "lds",
  "id": "evt-lds-end-1",
  "time": "2026-01-15T11:50:00Z",
  "data": {
    "lds_session_id": "lds-sess-123",
    "session_id": "sess-abc123"
  }
}

# grading.session.completed (from Grading Engine via CloudEventIngestor)
{
  "specversion": "1.0",
  "type": "grading.session.completed",
  "source": "grading-engine",
  "id": "evt-grade-789",
  "time": "2026-01-15T12:02:00Z",
  "data": {
    "grading_session_id": "grade-session-xyz",
    "session_id": "sess-abc123",
    "score": {
      "total": 85,
      "max": 100,
      "breakdown": [
        {"criterion": "Task 1", "points": 25, "max": 30},
        {"criterion": "Task 2", "points": 30, "max": 30},
        {"criterion": "Task 3", "points": 30, "max": 40}
      ]
    },
    "passed": true
  }
}
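A minimal sketch of the consuming side: a dispatch-by-type router for the events above. Class and method names here are illustrative; the real CloudEventIngestor (AD-41) may differ:

```python
from typing import Any, Callable

Handler = Callable[[dict], None]


class EventDispatcher:
    """Routes consumed CloudEvents to handlers keyed by the `type` attribute."""

    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, event_type: str, handler: Handler) -> None:
        self._handlers[event_type] = handler

    def ingest(self, event: dict[str, Any]) -> bool:
        """Return True when a handler consumed the event, False when unknown."""
        handler = self._handlers.get(event.get("type", ""))
        if handler is None:
            return False  # unknown event types are skipped, not errors
        handler(event.get("data", {}))
        return True
```

A handler for `lds.session.started` would, for example, call `LabletSession.mark_running()` for the referenced `session_id`.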
6. Deployment Architecture¶
6.1 Component Deployment¶
┌───────────────────────────────────────────────────────────────────┐
│                        KUBERNETES CLUSTER                         │
├───────────────────────────────────────────────────────────────────┤
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                     Ingress Controller                      │  │
│  └───────────────────────────┬─────────────────────────────────┘  │
│                              │                                    │
│         ┌────────────────────┼─────────────────────┐              │
│         ▼                    ▼                     ▼              │
│  ┌──────────────┐     ┌──────────────┐      ┌──────────────┐      │
│  │   Control    │     │  Scheduler   │      │   Resource   │      │
│  │  Plane API   │     │   Service    │      │  Controller  │      │
│  │ (3 replicas) │     │ (2 replicas) │      │ (2 replicas) │      │
│  └──────┬───────┘     └──────────────┘      └──────────────┘      │
│         │           (Leader election)     (Leader election)       │
│         ▼                                                         │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                     etcd (State Store)                      │  │
│  │                     (3-node cluster)                        │  │
│  │  • Instance/Worker state  • Leader election  • Watches      │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                    MongoDB (Spec Store)                     │  │
│  │                   (3-node replica set)                      │  │
│  │  • LabletDefinitions  • WorkerTemplates  • Audit events     │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │                  Redis (UI Session Store)                   │  │
│  │  • User authentication sessions (httpOnly cookies)          │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                   │
│  ┌─────────────────────────────────────────────────────────────┐  │
│  │               CloudEvents Bus (External Sink)               │  │
│  │  • Event persistence for audit/analytics                    │  │
│  │  • External integration (Assessment Platform)               │  │
│  └─────────────────────────────────────────────────────────────┘  │
│                                                                   │
└───────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌───────────────────────────────────────────────────────────────────┐
│                        EXTERNAL SERVICES                          │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌───────────────┐      │
│  │ Keycloak │  │ S3/MinIO │  │   OTEL   │  │  Assessment   │      │
│  │          │  │          │  │ Collector│  │   Platform    │      │
│  └──────────┘  └──────────┘  └──────────┘  └───────────────┘      │
└───────────────────────────────────────────────────────────────────┘
6.2 Scaling Configuration¶
| Component | Min Replicas | Max Replicas | Scaling Metric |
|---|---|---|---|
| Control Plane API | 2 | 10 | CPU 70% |
| Resource Scheduler | 2 | 5 | Custom (queue depth) |
| Lablet Controller | 2 | 3 | N/A (leader election) |
| Worker Controller | 2 | 3 | N/A (leader election) |
7. Implementation Phases¶
Phase 1: Foundation (Weeks 1-4)¶
- [ ] Define LabletDefinition aggregate and repository
- [ ] Define LabletSession aggregate and repository (renamed from LabletInstance)
- [ ] Define UserSession, GradingSession, ScoreReport entities and repositories
- [ ] Extend CMLWorker with capacity tracking
- [ ] Implement basic CRUD APIs
- [ ] Implement port allocation service
Phase 2: Scheduling (Weeks 5-8)¶
- [ ] Implement Resource Scheduler (basic placement)
- [ ] Implement timeslot management
- [ ] Implement lab YAML rewriting
- [ ] Implement instantiation flow
- [ ] Add SSE updates for instance state
Phase 3: Auto-Scaling (Weeks 9-12)¶
- [ ] Implement Lablet Controller (src/lablet-controller/)
- [ ] Implement Worker Controller (src/worker-controller/)
- [ ] Implement scale-up logic
- [ ] Implement scale-down logic
- [ ] Implement Cloud Provider SPI (AWS)
- [ ] Add worker template configuration
Phase 4: Assessment Integration (Weeks 13-16)¶
- [ ] Implement CloudEvent publishing
- [ ] Implement CloudEvent consumption
- [ ] Integrate collection/grading states
- [ ] Add grading result handling
Phase 5: Production Hardening (Weeks 17-20)¶
- [ ] Add comprehensive observability
- [ ] Implement warm pool (if needed)
- [ ] Performance testing
- [ ] Documentation
- [ ] UI integration
8. Architectural Decisions Record¶
All architectural decisions are documented in the ADR folder.
Current ADRs¶
| ADR | Title | Status |
|---|---|---|
| ADR-001 | API-Centric State Management | Accepted |
| ADR-002 | Separate Resource Scheduler | Accepted |
| ADR-003 | CloudEvents for External Integration | Accepted |
| ADR-004 | Port Allocation per Worker | Accepted |
| ADR-005 | Dual State Store Architecture (etcd + MongoDB) | Proposed |
| ADR-006 | Scheduler High Availability Coordination | Proposed |
| ADR-007 | Worker Template Seeding and Management | Accepted |
| ADR-008 | Worker Draining State for Scale-Down | Proposed |
9. Assessment Integration: Pod Generation¶
Based on the Grading Engine API schema (docs/grading-engine_openapi.json).
9.0 Integration Configuration¶
Authentication: JWT tokens from shared Keycloak instance (same IDP as CCM).
Deployment: Grading Engine can be deployed in the same docker-compose stack for development/testing.
# docker-compose.yml (example addition)
services:
  grading-engine:
    image: grading-engine:latest
    environment:
      - KEYCLOAK_URL=http://keycloak:8080
      - KEYCLOAK_REALM=lablet-cloud-manager
      - KEYCLOAK_CLIENT_ID=grading-engine
    depends_on:
      - keycloak
9.1 Pod Schema Mapping¶
The Grading Engine expects a Pod definition when assigning lab resources to an assessment session:
// Grading Engine Pod Schema (confirmed)
{
  "id": "string",
  "devices": [
    {
      "label": "string",
      "hostname": "string",
      "collector": "string",
      "interfaces": [
        {
          "name": "string",
          "protocol": "string",     // ssh, telnet, console, vnc
          "host": "string",         // Worker IP/hostname
          "port": 5041,             // Allocated port
          "authentication": {},     // Credentials object
          "configuration": {}       // Protocol-specific config
        }
      ]
    }
  ]
}
9.2 CML Lab → Pod Mapping¶
When a LabletSession reaches READY state, the Lablet Controller generates a Pod definition from:
- CML Lab YAML (nodes with smart_annotations)
- Allocated Ports (from Scheduler)
- Worker Details (hostname/IP)
```python
import yaml  # PyYAML

# Pod, Device, DeviceInterface, LabletSession, CMLWorker and LabletDefinition
# are the CCM domain-model types (see the Pod schema in 9.1).


def generate_pod_from_session(
    session: LabletSession,
    worker: CMLWorker,
    definition: LabletDefinition,
) -> Pod:
    """
    Generate a Grading Engine Pod from a LabletSession in READY state.
    Called by the Lablet Controller during reconciliation.

    Mapping:
    - CML node → Pod device
    - smart_annotation serial:PORT → interface (protocol=console)
    - smart_annotation vnc:PORT → interface (protocol=vnc)
    """
    lab_yaml = yaml.safe_load(definition.lab_yaml_cached)
    devices = []
    for node in lab_yaml.get("nodes", []):
        device = Device(
            label=node["label"],
            hostname=node["label"],  # Or extract from node config
            collector="ccm",         # Collection agent identifier
            interfaces=[],
        )
        # Extract interfaces from node tags
        for tag in node.get("tags", []):
            if tag.startswith("serial:"):
                port = int(tag.split(":")[1])
                device.interfaces.append(DeviceInterface(
                    name=f"console-{node['label']}",
                    protocol="console",
                    host=worker.state.hostname,
                    port=port,
                    authentication={"type": "none"},  # CML console auth
                ))
            elif tag.startswith("vnc:"):
                port = int(tag.split(":")[1])
                device.interfaces.append(DeviceInterface(
                    name=f"vnc-{node['label']}",
                    protocol="vnc",
                    host=worker.state.hostname,
                    port=port,
                    authentication={"type": "vnc_password"},
                ))
        if device.interfaces:  # Only include nodes with external interfaces
            devices.append(device)
    return Pod(
        id=session.id,
        devices=devices,
    )
```
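The tag-to-interface mapping above can be exercised in isolation. A minimal self-contained sketch using plain dicts in place of the domain types, with a hypothetical two-node lab — node labels, ports, and the worker hostname are all illustrative:

```python
# Hypothetical lab definition: one router with console+VNC tags, one
# unmanaged switch with no external interfaces (excluded from the Pod).
lab = {
    "nodes": [
        {"label": "r1", "tags": ["serial:5041", "vnc:5901"]},
        {"label": "sw1", "tags": []},
    ]
}


def interfaces_from_tags(label: str, tags: list, host: str) -> list:
    """Translate serial:/vnc: tags into Pod interface dicts."""
    interfaces = []
    for tag in tags:
        if tag.startswith("serial:"):
            interfaces.append({"name": f"console-{label}", "protocol": "console",
                               "host": host, "port": int(tag.split(":")[1])})
        elif tag.startswith("vnc:"):
            interfaces.append({"name": f"vnc-{label}", "protocol": "vnc",
                               "host": host, "port": int(tag.split(":")[1])})
    return interfaces


devices = []
for node in lab["nodes"]:
    ifaces = interfaces_from_tags(node["label"], node.get("tags", []),
                                  "worker-1.example.com")
    if ifaces:  # mirror the "only nodes with external interfaces" rule
        devices.append({"label": node["label"], "interfaces": ifaces})
```

Only `r1` survives the filter, with a console interface on 5041 and a VNC interface on 5901.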
9.3 Pod Assignment Flow¶
```
Lablet Controller                         Grading Engine
        │                                        │
        │ Session reaches READY state            │
        │◄───────────────────────────────────────│
        │                                        │
        │ Generate Pod from Lab YAML             │
        │────────┐                               │
        │        │                               │
        │◄───────┘                               │
        │                                        │
        │ POST /api/v1/sessions/{id}/parts/{partId}/pod
        │ { pod: {...} }                         │
        │───────────────────────────────────────►│
        │                                        │
        │ 202 Accepted                           │
        │◄───────────────────────────────────────│
        │                                        │
        │ CloudEvent: ccm.lablet.session.ready   │
        │ { pod_assigned: true }                 │
        │───────────────────────────────────────►│
```
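The POST step in the flow above can be sketched with the standard library. The endpoint path comes from the diagram; the function names, bearer-token handling, and timeout are assumptions — the real controller presumably reuses its Keycloak client credentials to obtain the JWT:

```python
import json
import urllib.request


def build_pod_request(base_url: str, session_id: str, part_id: str,
                      pod: dict, token: str) -> urllib.request.Request:
    """Build the Pod-assignment request shown in the flow above."""
    url = f"{base_url}/api/v1/sessions/{session_id}/parts/{part_id}/pod"
    return urllib.request.Request(
        url,
        data=json.dumps({"pod": pod}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",  # JWT from shared Keycloak
        },
        method="POST",
    )


def assign_pod(req: urllib.request.Request) -> int:
    """Send the request; the Grading Engine replies 202 Accepted on success."""
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status
```

Splitting request construction from sending keeps the URL and header logic testable without a live Grading Engine.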
10. Open Questions for Implementation¶
Resolved¶
- ~~Warm Pool Priority: Should warm pool implementation be deferred?~~ → Deferred to later optimization phase
- ~~Worker Template Management: Should templates be stored in MongoDB or configuration files?~~ → Both: MongoDB aggregate seeded from config files (see ADR-007)
- ~~Multi-Region Strategy: How to handle region-specific worker templates?~~ → Regional isolation: one CCM deployment per region, no cross-region coordination
- ~~etcd vs MongoDB-only: Should we prototype with MongoDB Change Streams first?~~ → No, proceed with dual store (etcd + MongoDB); see ADR-005
- ~~Drain timeout configuration: Should drain timeout be per-worker-template or global?~~ → Per-template: `drain_timeout_hours` attribute on WorkerTemplate (see ADR-008)
- ~~Grading Engine integration: Confirm Pod assignment API endpoint and authentication?~~ → Confirmed: Device/Interface schema validated, JWT auth on shared Keycloak instance
- ~~Audit Log Retention: How long should CloudEvents be retained?~~ → Minimum 3 months, maximum 1 year (NFR-3.5.5)
- ~~Cost estimation: Should terminated events include cost estimates?~~ → No, cost estimation NOT included in event payload
Open¶
None - all questions resolved.
11. Revision History¶
| Version | Date | Author | Changes |
|---|---|---|---|
| 0.1.0 | 2026-01-15 | Architecture Team | Initial draft |
| 0.2.0 | 2026-01-15 | Architecture Team | Incorporated feedback: dual store architecture (etcd+MongoDB), worker DRAINING state, scale timing delays, separated ADRs to /docs/architecture/adr/, added intermediate CloudEvents, HA coordination with leader election, Pod generation for Grading Engine integration |
| 0.3.0 | 2026-01-16 | Architecture Team | Resolved all open questions: confirmed dual DB approach, drain timeout per-template with admin cancel + instance retry, Grading Engine JWT auth confirmed, audit retention 3mo-1yr, no cost in events |
| 0.5.0 | 2026-02-18 | Architecture Team | Major entity model redesign (AD-38 through AD-47-R1): Renamed LabletInstance → LabletSession, eliminated LabletRecordRun and LabletLabBinding, added UserSession/GradingSession/ScoreReport as separate Entity[str] with own collections. Added READY state between INSTANTIATING and RUNNING. Updated CloudEvents schema (event type prefix ccm.lablet.instance. → ccm.lablet.session.). Added LDS/GradingEngine CloudEvent consumption via Neuroglia CloudEventIngestor. Updated Lablet Controller responsibility to include LDS+GradingEngine+CloudEvent proxy. |